Mercurial > repos > richard-burhans > segalign
diff package_output.py @ 0:5c72425b7f1b draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 98a4dd44360447aa96d92143384d78e116d7581b
author | richard-burhans |
---|---|
date | Wed, 17 Apr 2024 18:06:54 +0000 |
parents | |
children | 36cafb694dd2 |
line wrap: on
line diff
# --- /dev/null Thu Jan 01 00:00:00 1970 +0000
# +++ b/package_output.py Wed Apr 17 18:06:54 2024 +0000
# @@ -0,0 +1,286 @@
#!/usr/bin/env python
"""Package a file of lastz command lines and the files they reference.

Each line of ``lastz_commands.txt`` is parsed into a JSON record
(executable, normalized ``--option`` arguments, redirect targets).  The
records are written to ``commands.json`` and stored, together with the
sequence, subset, segment and score files named on the command lines, in a
gzip-compressed tar archive.
"""

import argparse
import configparser
import json
import os
import sys
import tarfile
import typing

import bashlex


class PackageFile:
    """Lazily created ``.tgz`` archive holding a config file and data files.

    Data files are stored under ``<top_dir>/<data_dir>`` and the config file
    at ``<top_dir>/<config_file>``.  The archive is only opened when the
    first member is added.
    """

    def __init__(
        self,
        pathname: str = "data_package.tgz",
        top_dir: str = "galaxy",
        data_dir: str = "files",
        config_file: str = "commands.json",
    ) -> None:
        self.pathname: str = os.path.realpath(pathname)
        self.data_root: str = os.path.join(top_dir, data_dir)
        self.config_path: str = os.path.join(top_dir, config_file)
        self.config_file: str = config_file
        self.tarfile: typing.Optional[tarfile.TarFile] = None
        # archive member names already written, used to skip duplicates
        self.name_cache: typing.Dict[typing.Any, typing.Any] = {}
        # captured once so relative archive names stay stable
        self.working_dir: str = os.path.realpath(os.getcwd())

    def _initialize(self) -> None:
        """Open the archive for writing if it is not open yet."""
        if self.tarfile is None:
            self.tarfile = tarfile.open(
                name=self.pathname,
                mode="w:gz",
                format=tarfile.GNU_FORMAT,
                compresslevel=1,
            )

    def _archive(self) -> tarfile.TarFile:
        """Return the open archive, opening it on first use."""
        self._initialize()
        if self.tarfile is None:  # pragma: no cover - _initialize always sets it
            sys.exit(f"unable to open archive {self.pathname}")
        return self.tarfile

    def add_config(self, pathname: str) -> None:
        """Store ``pathname`` in the archive under the configured config path."""
        archive = self._archive()
        source_path = os.path.realpath(pathname)
        archive.add(source_path, arcname=self.config_path, recursive=False)

    def add_file(self, pathname: str, arcname: typing.Optional[str] = None) -> None:
        """Store ``pathname`` below the archive's data root.

        Without ``arcname`` the member is named after the basename of the
        resolved source path.  With ``arcname`` the member keeps that path
        relative to the working directory; the process exits with
        ``"path fail"`` if it would land outside the working directory.
        A member name is only written once; later requests for the same
        name are ignored.  Exits if the source file does not exist.
        """
        archive = self._archive()
        source_path = os.path.realpath(pathname)

        if arcname is None:
            dest_path = os.path.join(self.data_root, os.path.basename(source_path))
        else:
            arc_path = os.path.realpath(arcname)
            rel_path = os.path.relpath(arc_path, self.working_dir)
            # reject the parent directory itself as well as anything below it
            escapes = rel_path == os.pardir or rel_path.startswith(
                os.pardir + os.sep
            )
            if os.path.isabs(rel_path) or escapes:
                sys.exit("path fail")
            dest_path = os.path.join(self.data_root, rel_path)

        if dest_path in self.name_cache:
            return

        try:
            archive.add(source_path, arcname=dest_path, recursive=False)
        except FileNotFoundError:
            sys.exit(f"missing source file {source_path}")

        self.name_cache[dest_path] = 1

    def close(self) -> None:
        """Close the archive if it was opened; safe to call repeatedly."""
        if self.tarfile is not None:
            self.tarfile.close()
            self.tarfile = None


class bashCommandLineFile:
    """Parse a file of shell command lines and package what they reference.

    Construction does all the work: every line of ``pathname`` is parsed,
    written as one JSON object per line to ``commands.json`` in the current
    directory, and the referenced input files plus ``commands.json`` itself
    are added to ``package_file``.
    """

    # config keys in the "arguments" section mapped to the argparse keyword
    # arguments used for every option name listed under that key
    _ARGUMENT_KINDS: typing.Tuple[
        typing.Tuple[str, typing.Dict[str, typing.Any]], ...
    ] = (
        ("flag_args", {"action": "store_true"}),
        ("str_args", {"type": str}),
        ("bool_str_args", {"nargs": "?", "const": True, "default": False}),
        ("int_args", {"type": int}),
        ("bool_int_args", {"nargs": "?", "const": True, "default": False}),
    )

    # archive-relative name every --scores file is rewritten to
    _SCORES_ARCNAME = "data/scores.txt"

    def __init__(
        self,
        pathname: str,
        config: configparser.ConfigParser,
        package_file: PackageFile,
    ) -> None:
        self.pathname: str = pathname
        self.config = config
        self.package_file = package_file
        self.executable: typing.Optional[str] = None
        # the option set only depends on the config, so build the parser once
        self._parser: argparse.ArgumentParser = self._build_parser()
        self._parse_lines()

    def _build_parser(self) -> argparse.ArgumentParser:
        """Create the option parser described by the config's "arguments" section."""
        parser = argparse.ArgumentParser(add_help=False)
        if "arguments" not in self.config:
            return parser

        arguments_section = self.config["arguments"]
        for key, kwargs in self._ARGUMENT_KINDS:
            if key not in arguments_section:
                continue
            for arg in arguments_section[key].split():
                parser.add_argument(f"--{arg}", **kwargs)
        return parser

    def _parse_lines(self) -> None:
        """Convert every command line to JSON and package referenced files."""
        commands_path = "commands.json"
        with open(commands_path, "w") as ofh, open(self.pathname) as ifh:
            for raw_line in ifh:
                line = raw_line.rstrip("\n")
                if not line.strip():
                    # a blank line carries no command to parse
                    continue

                command_dict = self._parse_line(line)
                new_args_list: typing.List[str] = []
                for arg in command_dict.get("args", []):
                    new_args_list.append(self._package_arg(arg))

                command_dict["args"] = new_args_list
                print(json.dumps(command_dict), file=ofh)

        self.package_file.add_config(commands_path)

    def _package_arg(self, arg: str) -> str:
        """Package any file named by ``arg`` and return the argument to emit."""
        option, separator, value = arg.partition("=")
        if not separator:
            return arg

        if option in ("--target", "--query"):
            self._package_sequence_spec(value)
        elif option == "--segments":
            self.package_file.add_file(value)
        elif option == "--scores":
            # the scores file is stored under a fixed name, so the command
            # line is rewritten to point at that name
            self.package_file.add_file(value, self._SCORES_ARCNAME)
            return f"--scores={self._SCORES_ARCNAME}"

        return arg

    def _package_sequence_spec(self, spec: str) -> None:
        """Package the sequence file of a ``file[action]...`` spec and its subset files."""
        sequence_file, *actions = spec.split("[")
        if sequence_file:
            # the sequence file is needed whether or not actions follow it
            self.package_file.add_file(sequence_file, sequence_file)

        for action in actions:
            if action.endswith("]"):
                action = action[:-1]
            if action.startswith("subset="):
                self.package_file.add_file(action[len("subset="):])

    def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Parse one shell command line into a command dictionary.

        Redirections are cut out of the line and reported under the
        ``stdin``/``stdout``/``stderr`` keys; the remaining text is parsed
        as executable plus arguments.
        """
        # resolve shell redirects
        trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False)  # type: ignore[attr-defined]
        positions: typing.List[typing.Tuple[int, int]] = []

        # one visitor for all trees so redirects from every tree are kept
        # and the visitor exists even when nothing was parsed
        visitor = nodevisitor(positions)
        for tree in trees:
            visitor.visit(tree)

        # delete from the end so earlier indices stay valid
        processed = list(line)
        for start, end in sorted(positions, reverse=True):
            del processed[start:end]

        processed_line: str = "".join(processed)

        command_dict = self._parse_processed_line(processed_line)
        command_dict["stdin"] = visitor.stdin
        command_dict["stdout"] = visitor.stdout
        command_dict["stderr"] = visitor.stderr

        return command_dict

    def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Split a redirect-free command line into executable and arguments.

        Known options are normalized to ``--name`` or ``--name=value``.
        The first two leftover positionals, when present, become
        ``--target=`` and ``--query=``; any further leftovers are dropped.
        """
        argv: typing.List[str] = list(bashlex.split(line))  # type: ignore[attr-defined]
        if not argv:
            sys.exit(f"no executable found in command line: {line!r}")
        self.executable = argv.pop(0)

        namespace, rest = self._parser.parse_known_intermixed_args(argv)

        args: typing.List[str] = []
        for var, value in vars(namespace).items():
            if value is None:
                continue
            if isinstance(value, bool):
                # False means the option was absent from the command line
                if value:
                    args.append(f"--{var}")
            else:
                args.append(f"--{var}={value}")

        # positionals are optional: only consume what is actually there
        if rest:
            args.append(f"--target={rest.pop(0)}")
        if rest:
            args.append(f"--query={rest.pop(0)}")

        command_dict: typing.Dict[str, typing.Any] = {
            "executable": self.executable,
            "args": args,
        }
        return command_dict


class nodevisitor(bashlex.ast.nodevisitor):  # type: ignore[name-defined,misc]
    """AST visitor that records redirect spans and their target words."""

    def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:
        # shared with the caller, which uses the spans to cut redirects out
        self.positions = positions
        self.stdin: typing.Optional[str] = None
        self.stdout: typing.Optional[str] = None
        self.stderr: typing.Optional[str] = None

    def visitredirect(
        self,
        n: bashlex.ast.node,  # type: ignore[name-defined]
        n_input: int,
        n_type: str,
        output: typing.Any,
        heredoc: typing.Any,
    ) -> None:
        """Record a redirect of descriptor 0, 1 or 2 to a plain word.

        Any other redirect terminates the program.
        """
        # NOTE(review): a redirect written without an explicit descriptor may
        # arrive with a non-int n_input and would take the exit below --
        # confirm against bashlex before relying on bare "> file" redirects.
        if not (isinstance(n_input, int) and 0 <= n_input <= 2):
            sys.exit(f"oops 2: {type(n_input)}")

        if not (isinstance(output, bashlex.ast.node) and output.kind == "word"):  # type: ignore[attr-defined]
            sys.exit(f"oops 1: {type(n_input)}")

        self.positions.append(n.pos)
        if n_input == 0:
            self.stdin = output.word
        elif n_input == 1:
            self.stdout = output.word
        else:
            self.stderr = output.word

    def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None:  # type: ignore[name-defined]
        """Ignore here-documents."""
        pass


def main() -> None:
    """Package ``lastz_commands.txt`` using ``lastz-cmd.ini`` next to this script."""
    our_dirname: str = os.path.dirname(os.path.realpath(__file__))
    lastz_command_config_file: str = os.path.join(our_dirname, "lastz-cmd.ini")

    config: configparser.ConfigParser = configparser.ConfigParser()
    config.read(lastz_command_config_file)

    package_file = PackageFile()
    lastz_command_file = "lastz_commands.txt"
    try:
        bashCommandLineFile(lastz_command_file, config, package_file)
    finally:
        # close the archive even when processing exits early
        package_file.close()


if __name__ == "__main__":
    main()