Mercurial > repos > richard-burhans > segalign
comparison package_output.py @ 0:5c72425b7f1b draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 98a4dd44360447aa96d92143384d78e116d7581b
| author | richard-burhans | 
|---|---|
| date | Wed, 17 Apr 2024 18:06:54 +0000 | 
| parents | |
| children | 36cafb694dd2 | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:5c72425b7f1b | 
|---|---|
| 1 #!/usr/bin/env python | |
| 2 | |
| 3 import argparse | |
| 4 import configparser | |
| 5 import json | |
| 6 import os | |
| 7 import sys | |
| 8 import tarfile | |
| 9 import typing | |
| 10 | |
| 11 import bashlex | |
| 12 | |
| 13 | |
class PackageFile:
    """Gzip-compressed tar archive holding a command config and its data files.

    The archive is opened lazily by the first ``add_*`` call, so constructing
    an instance does not touch the output file.  Inside the archive the config
    is stored at ``<top_dir>/<config_file>`` and data files are stored under
    ``<top_dir>/<data_dir>/``.

    Instances may be used as context managers; leaving the ``with`` block
    closes the archive.
    """

    def __init__(
        self,
        pathname: str = "data_package.tgz",
        top_dir: str = "galaxy",
        data_dir: str = "files",
        config_file: str = "commands.json",
    ) -> None:
        """Record where the archive will be written and how it is laid out.

        Args:
            pathname: Output archive path (resolved with ``os.path.realpath``).
            top_dir: Top-level directory name inside the archive.
            data_dir: Directory below ``top_dir`` that receives data files.
            config_file: Name of the config member below ``top_dir``.
        """
        self.pathname: str = os.path.realpath(pathname)
        self.data_root: str = os.path.join(top_dir, data_dir)
        self.config_path: str = os.path.join(top_dir, config_file)
        self.config_file: str = config_file
        self.tarfile: typing.Optional[tarfile.TarFile] = None
        # Archive member names already written; used to skip duplicates.
        self.name_cache: typing.Dict[typing.Any, typing.Any] = {}
        # Explicit archive names are interpreted relative to this directory.
        self.working_dir: str = os.path.realpath(os.getcwd())

    def __enter__(self) -> "PackageFile":
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]],
        exc_value: typing.Optional[BaseException],
        traceback: typing.Any,
    ) -> None:
        self.close()

    def _archive(self) -> tarfile.TarFile:
        """Return the open archive, creating it on first use."""
        if self.tarfile is None:
            self.tarfile = tarfile.open(
                name=self.pathname,
                mode="w:gz",
                format=tarfile.GNU_FORMAT,
                compresslevel=1,
            )
        return self.tarfile

    def _initialize(self) -> None:
        """Open the output archive for writing if it is not open yet."""
        self._archive()

    def add_config(self, pathname: str) -> None:
        """Store the file at ``pathname`` as the archive's config member."""
        archive = self._archive()
        source_path = os.path.realpath(pathname)
        archive.add(source_path, arcname=self.config_path, recursive=False)

    def add_file(self, pathname: str, arcname: typing.Optional[str] = None) -> None:
        """Add one data file below the archive's data directory.

        Args:
            pathname: File to add; symlinks are resolved first.
            arcname: Optional path, relative to the working directory captured
                at construction, to use below the data directory.  When
                omitted, the base name of the resolved source file is used.

        Raises:
            SystemExit: If ``arcname`` resolves outside the working directory,
                or if the source file does not exist.

        A destination that was already added is skipped silently.
        """
        archive = self._archive()
        source_path = os.path.realpath(pathname)

        if arcname is None:
            rel_path = os.path.basename(source_path)
        else:
            rel_path = os.path.relpath(os.path.realpath(arcname), self.working_dir)
            # A bare ".." must be rejected as well as "../something";
            # otherwise the member would land outside the data directory.
            escapes = rel_path == os.pardir or rel_path.startswith(
                os.pardir + os.sep
            )
            if os.path.isabs(rel_path) or escapes:
                sys.exit("path fail")

        dest_path = os.path.join(self.data_root, rel_path)
        if dest_path in self.name_cache:
            return

        try:
            archive.add(source_path, arcname=dest_path, recursive=False)
        except FileNotFoundError:
            sys.exit(f"missing source file {source_path}")

        self.name_cache[dest_path] = 1

    def close(self) -> None:
        """Finish and close the archive; safe to call when nothing was opened."""
        if self.tarfile is not None:
            self.tarfile.close()
            self.tarfile = None
| 83 | |
| 84 | |
class bashCommandLineFile:
    """Translate a file of shell command lines into a packaged JSON job list.

    Each non-blank line of the input file is parsed with ``bashlex``; file
    redirections are stripped from the command and recorded separately, the
    remaining words are normalised into ``--name[=value]`` arguments, and one
    JSON object per command is written to ``commands.json`` in the current
    directory.  Files named by ``--target``, ``--query``, ``--segments`` and
    ``--scores`` are added to the supplied package, and ``commands.json``
    itself is added as the package config once all lines are processed.

    All of this work happens in the constructor.
    """

    def __init__(
        self,
        pathname: str,
        config: configparser.ConfigParser,
        package_file: "PackageFile",
    ) -> None:
        """Parse ``pathname`` and populate ``package_file``.

        Args:
            pathname: Text file with one shell command per line.
            config: Parsed configuration; its optional ``arguments`` section
                lists the option names the commands may use, grouped by kind
                (``flag_args``, ``str_args``, ``bool_str_args``, ``int_args``,
                ``bool_int_args``), each a whitespace-separated list.
            package_file: Archive that receives the data files and the config.
        """
        self.pathname: str = pathname
        self.config = config
        self.package_file = package_file
        # Executable of the most recently parsed command line.
        self.executable: typing.Optional[str] = None
        # The option set depends only on the config, so build the parser once.
        self._parser: argparse.ArgumentParser = self._build_parser()
        self._parse_lines()

    def _build_parser(self) -> argparse.ArgumentParser:
        """Create the option parser described by the ``arguments`` section."""
        parser = argparse.ArgumentParser(add_help=False)
        if "arguments" not in self.config:
            return parser

        section = self.config["arguments"]

        # Registration order is preserved because it determines the order in
        # which recognised options are emitted later.
        if "flag_args" in section:
            for name in section["flag_args"].split():
                parser.add_argument(f"--{name}", action="store_true")

        if "str_args" in section:
            for name in section["str_args"].split():
                parser.add_argument(f"--{name}", type=str)

        if "bool_str_args" in section:
            for name in section["bool_str_args"].split():
                # Usable either as a bare flag or with a value.
                parser.add_argument(f"--{name}", nargs="?", const=True, default=False)

        if "int_args" in section:
            for name in section["int_args"].split():
                parser.add_argument(f"--{name}", type=int)

        if "bool_int_args" in section:
            for name in section["bool_int_args"].split():
                parser.add_argument(f"--{name}", nargs="?", const=True, default=False)

        return parser

    def _parse_lines(self) -> None:
        """Convert every command line to JSON and package the files it names."""
        with open("commands.json", "w") as ofh, open(self.pathname) as f:
            for line in f:
                line = line.rstrip("\n")
                if not line.strip():
                    # A blank line has no executable to extract; skip it.
                    continue
                command_dict = self._parse_line(line)
                command_dict["args"] = self._package_args(
                    command_dict.get("args", [])
                )
                print(json.dumps(command_dict), file=ofh)

        # Added only after the file above is closed, so the archived copy is
        # complete.
        self.package_file.add_config("commands.json")

    def _package_args(self, args_list: typing.List[str]) -> typing.List[str]:
        """Package the files named by ``args_list`` and return the new args.

        Only ``--scores`` is rewritten; it always points at
        ``data/scores.txt`` in the output.  Every other argument is passed
        through unchanged.
        """
        new_args_list: typing.List[str] = []
        for arg in args_list:
            if arg.startswith("--target="):
                self._package_sequence_spec(arg[len("--target="):])
                new_args_list.append(arg)
            elif arg.startswith("--query="):
                self._package_sequence_spec(arg[len("--query="):])
                new_args_list.append(arg)
            elif arg.startswith("--segments="):
                self.package_file.add_file(arg[len("--segments="):])
                new_args_list.append(arg)
            elif arg.startswith("--scores="):
                self.package_file.add_file(
                    arg[len("--scores="):], "data/scores.txt"
                )
                new_args_list.append("--scores=data/scores.txt")
            else:
                new_args_list.append(arg)
        return new_args_list

    def _package_sequence_spec(self, spec: str) -> None:
        """Package the files referenced by a ``file[action]...`` specifier.

        The sequence file keeps its relative path inside the package; a file
        named by a ``subset=`` action is stored under its base name.
        """
        # NOTE(review): a specifier without any "[...]" action is not
        # packaged at all -- confirm this is intended.
        if "[" not in spec:
            return

        sequence_file, *actions = spec.split("[")
        self.package_file.add_file(sequence_file, sequence_file)
        for action in actions:
            if action.endswith("]"):
                action = action[:-1]
            if action.startswith("subset="):
                self.package_file.add_file(action[len("subset="):])

    def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Parse one shell command line into a command dictionary.

        Returns a dict with ``executable`` and ``args`` plus ``stdin``,
        ``stdout`` and ``stderr`` (each a redirect target or ``None``).
        """
        # resolve shell redirects
        trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False)  # type: ignore[attr-defined]
        positions: typing.List[typing.Tuple[int, int]] = []

        # One visitor for all trees: it stays defined when there are no trees
        # and keeps redirects found in any of them.
        visitor = nodevisitor(positions)
        for tree in trees:
            visitor.visit(tree)

        # Cut the redirects out from the end so earlier indices stay correct.
        processed = list(line)
        for start, end in sorted(positions, reverse=True):
            processed[start:end] = ""

        processed_line: str = "".join(processed)

        command_dict = self._parse_processed_line(processed_line)
        command_dict["stdin"] = visitor.stdin
        command_dict["stdout"] = visitor.stdout
        command_dict["stderr"] = visitor.stderr

        return command_dict

    def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Split a redirect-free command line into executable and arguments.

        Recognised options are emitted in parser registration order as
        ``--name`` (flags) or ``--name=value``.  Of the words the parser does
        not recognise, the first becomes ``--target=`` and the second
        ``--query=``; any further ones are dropped.
        """
        argv: typing.List[str] = list(bashlex.split(line))  # type: ignore[attr-defined]
        self.executable = argv.pop(0)

        namespace, rest = self._parser.parse_known_intermixed_args(argv)

        command_dict: typing.Dict[str, typing.Any] = {
            "executable": self.executable,
            "args": [],
        }

        for var, value in vars(namespace).items():
            if value is None:
                continue
            if isinstance(value, bool):
                # False means the option was not given on this line.
                if value:
                    command_dict["args"].append(f"--{var}")
            else:
                command_dict["args"].append(f"--{var}={value}")

        # Only pop while something is left: a command without positional
        # words must not raise IndexError.
        if rest:
            command_dict["args"].append(f"--target={rest.pop(0)}")

        if rest:
            command_dict["args"].append(f"--query={rest.pop(0)}")

        return command_dict
| 237 | |
| 238 | |
class nodevisitor(bashlex.ast.nodevisitor):  # type: ignore[name-defined,misc]
    """AST visitor that collects file redirections of descriptors 0, 1 and 2.

    For every redirect it records the node's source span in ``positions`` (so
    the caller can cut it out of the command line) and remembers the target
    word as ``stdin``, ``stdout`` or ``stderr``.  Any other redirect aborts
    the program.
    """

    def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:
        """Store the caller-owned list that receives redirect spans."""
        self.positions = positions
        self.stdin: typing.Optional[str] = None
        self.stdout: typing.Optional[str] = None
        self.stderr: typing.Optional[str] = None

    def visitredirect(
        self,
        n: bashlex.ast.node,  # type: ignore[name-defined]
        n_input: int,
        n_type: str,
        output: typing.Any,
        heredoc: typing.Any,
    ) -> None:
        """Record one redirect node.

        Raises:
            SystemExit: If ``n_input`` is not an integer in 0..2, or if the
                redirect target is not a word node.
        """
        # NOTE(review): bashlex appears to pass None as the descriptor for a
        # bare "> file" / "< file"; such redirects would exit here -- confirm.
        if not (isinstance(n_input, int) and 0 <= n_input <= 2):
            sys.exit(f"oops 2: {type(n_input)}")

        if not (isinstance(output, bashlex.ast.node) and output.kind == "word"):  # type: ignore[attr-defined]
            # Report the offending target; the descriptor is known to be a
            # valid int at this point.
            sys.exit(f"oops 1: {type(output)}")

        self.positions.append(n.pos)
        if n_input == 0:
            self.stdin = output.word
        elif n_input == 1:
            self.stdout = output.word
        else:
            self.stderr = output.word

    def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None:  # type: ignore[name-defined]
        """Ignore here-documents."""
        pass
| 270 | |
| 271 | |
def main() -> None:
    """Package the commands listed in ``lastz_commands.txt``.

    Reads ``lastz-cmd.ini`` from the directory containing this script,
    converts ``lastz_commands.txt`` from the current directory, and writes the
    resulting archive using ``PackageFile``'s default settings.
    """
    our_dirname: str = os.path.dirname(os.path.realpath(__file__))
    lastz_command_config_file: str = os.path.join(our_dirname, "lastz-cmd.ini")

    config: configparser.ConfigParser = configparser.ConfigParser()
    config.read(lastz_command_config_file)

    package_file = PackageFile()
    lastz_command_file = "lastz_commands.txt"
    try:
        bashCommandLineFile(lastz_command_file, config, package_file)
    finally:
        # Close the archive even when parsing aborts (e.g. via sys.exit), so
        # the output file handle is not leaked.
        package_file.close()


if __name__ == "__main__":
    main()
