Mercurial > repos > richard-burhans > segalign
view package_output.py @ 0:5c72425b7f1b draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 98a4dd44360447aa96d92143384d78e116d7581b
author | richard-burhans |
---|---|
date | Wed, 17 Apr 2024 18:06:54 +0000 |
parents | |
children | 36cafb694dd2 |
line wrap: on
line source
#!/usr/bin/env python

"""Bundle the files referenced by a list of shell command lines into a tarball.

The script reads ``lastz_commands.txt`` from the current directory, parses
each line with ``bashlex``, writes one JSON record per command to
``commands.json`` and stores that file, together with the input files named
on the command lines, in ``data_package.tgz``.
"""

import argparse
import configparser
import json
import os
import sys
import tarfile
import typing

import bashlex


class PackageFile:
    """Gzip-compressed tar archive that is opened lazily on first use.

    Data files are stored below ``<top_dir>/<data_dir>`` and the command
    configuration is stored as ``<top_dir>/<config_file>``.
    """

    def __init__(
        self,
        pathname: str = "data_package.tgz",
        top_dir: str = "galaxy",
        data_dir: str = "files",
        config_file: str = "commands.json",
    ) -> None:
        self.pathname: str = os.path.realpath(pathname)
        self.data_root: str = os.path.join(top_dir, data_dir)
        self.config_path: str = os.path.join(top_dir, config_file)
        self.config_file: str = config_file
        self.tarfile: typing.Optional[tarfile.TarFile] = None
        # archive member paths already written; used to skip duplicates
        self.name_cache: typing.Dict[typing.Any, typing.Any] = {}
        # directory against which explicit archive names are made relative
        self.working_dir: str = os.path.realpath(os.getcwd())

    def _initialize(self) -> None:
        """Open the archive for writing unless it is already open."""
        if self.tarfile is None:
            self.tarfile = tarfile.open(
                name=self.pathname,
                mode="w:gz",
                format=tarfile.GNU_FORMAT,
                compresslevel=1,
            )

    def _archive(self) -> tarfile.TarFile:
        """Return the open archive, opening it first if necessary."""
        if self.tarfile is None:
            self._initialize()
        return typing.cast(tarfile.TarFile, self.tarfile)

    def add_config(self, pathname: str) -> None:
        """Store the file at *pathname* in the archive as the config member."""
        archive = self._archive()
        source_path = os.path.realpath(pathname)
        archive.add(source_path, arcname=self.config_path, recursive=False)

    def add_file(self, pathname: str, arcname: typing.Optional[str] = None) -> None:
        """Store the file at *pathname* below the archive's data directory.

        Args:
            pathname: file to add; symlinks are resolved first.
            arcname: optional path that determines the member name. It is
                taken relative to the working directory recorded when this
                object was created. When omitted, the member is named after
                the basename of the resolved source file.

        Exits the process with ``"path fail"`` when *arcname* resolves to a
        location outside the working directory, and with a "missing source
        file" message when the source file does not exist. A member path
        that was already added is silently skipped.
        """
        archive = self._archive()
        source_path = os.path.realpath(pathname)

        if arcname is None:
            dest_path = os.path.join(self.data_root, os.path.basename(source_path))
        else:
            rel_path = os.path.relpath(os.path.realpath(arcname), self.working_dir)
            # Reject anything that climbs out of the working directory,
            # including a relative path that is exactly the parent directory.
            outside = (
                os.path.isabs(rel_path)
                or rel_path == os.pardir
                or rel_path.startswith(os.pardir + os.sep)
            )
            if outside:
                sys.exit("path fail")
            dest_path = os.path.join(self.data_root, rel_path)

        if dest_path in self.name_cache:
            return

        try:
            archive.add(source_path, arcname=dest_path, recursive=False)
        except FileNotFoundError:
            sys.exit(f"missing source file {source_path}")
        self.name_cache[dest_path] = 1

    def close(self) -> None:
        """Close the archive if it is open; safe to call more than once."""
        if self.tarfile is not None:
            self.tarfile.close()
            self.tarfile = None


class bashCommandLineFile:
    """Parse a file of shell command lines and package the files they use.

    Constructing an instance does all the work: every line of *pathname* is
    parsed, a JSON record per command is written to ``commands.json`` in the
    current directory, and that file plus the referenced inputs are added to
    *package_file*.
    """

    def __init__(
        self,
        pathname: str,
        config: configparser.ConfigParser,
        package_file: PackageFile,
    ) -> None:
        self.pathname: str = pathname
        self.config = config
        self.package_file = package_file
        # executable name of the most recently parsed command line
        self.executable: typing.Optional[str] = None
        # the option parser depends only on the config, so build it once
        self._parser: argparse.ArgumentParser = self._build_parser()
        self._parse_lines()

    def _build_parser(self) -> argparse.ArgumentParser:
        """Create the option parser described by the ``arguments`` config section."""
        parser = argparse.ArgumentParser(add_help=False)
        if "arguments" not in self.config:
            return parser

        section = self.config["arguments"]
        name: str
        if "flag_args" in section:
            for name in section["flag_args"].split():
                parser.add_argument(f"--{name}", action="store_true")
        if "str_args" in section:
            for name in section["str_args"].split():
                parser.add_argument(f"--{name}", type=str)
        if "bool_str_args" in section:
            # usable either as a bare flag or with an explicit value
            for name in section["bool_str_args"].split():
                parser.add_argument(f"--{name}", nargs="?", const=True, default=False)
        if "int_args" in section:
            for name in section["int_args"].split():
                parser.add_argument(f"--{name}", type=int)
        if "bool_int_args" in section:
            # usable either as a bare flag or with an explicit value
            for name in section["bool_int_args"].split():
                parser.add_argument(f"--{name}", nargs="?", const=True, default=False)
        return parser

    def _parse_lines(self) -> None:
        """Translate every command line to JSON and package its input files."""
        with open("commands.json", "w") as ofh, open(self.pathname) as f:
            line: str
            for line in f:
                line = line.rstrip("\n")
                if not line.strip():
                    # a blank line holds no command to parse
                    continue
                command_dict = self._parse_line(line)
                new_args_list = []
                for arg in command_dict.get("args", []):
                    new_args_list.append(self._package_arg(arg))
                command_dict["args"] = new_args_list
                print(json.dumps(command_dict), file=ofh)

        # commands.json is closed (and therefore complete) at this point
        self.package_file.add_config("commands.json")

    def _package_arg(self, arg: str) -> str:
        """Package the file(s) named by *arg* and return the argument to emit."""
        if arg.startswith(("--target=", "--query=")):
            self._package_sequence_spec(arg.partition("=")[2])
        elif arg.startswith("--segments="):
            self.package_file.add_file(arg[len("--segments="):])
        elif arg.startswith("--scores="):
            # the scores file is stored under a fixed name, so the argument
            # is rewritten to point at that name
            self.package_file.add_file(arg[len("--scores="):], "data/scores.txt")
            return "--scores=data/scores.txt"
        return arg

    def _package_sequence_spec(self, spec: str) -> None:
        """Package a ``file[action][subset=file]...`` sequence specifier."""
        if "[" not in spec:
            # NOTE(review): a specifier without bracketed actions is not
            # packaged at all; kept as in the original - confirm intended.
            return

        sequence_file, *actions = spec.split("[")
        self.package_file.add_file(sequence_file, sequence_file)
        for action in actions:
            if action.endswith("]"):
                action = action[:-1]
            if action.startswith("subset="):
                self.package_file.add_file(action[len("subset="):])

    def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Parse one shell command line into a command dictionary.

        Redirections are cut out of the line and reported under the
        ``stdin``/``stdout``/``stderr`` keys; the remainder is parsed as
        executable plus arguments.
        """
        # resolve shell redirects
        trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False)  # type: ignore[attr-defined]
        positions: typing.List[typing.Tuple[int, int]] = []
        # bound up front so the redirect lookups below work for an empty parse
        visitor = nodevisitor(positions)
        for tree in trees:
            visitor = nodevisitor(positions)
            visitor.visit(tree)

        # remove the spans from the end so the earlier indices stay valid
        processed = list(line)
        for start, end in sorted(positions, reverse=True):
            del processed[start:end]
        processed_line: str = "".join(processed)

        command_dict = self._parse_processed_line(processed_line)
        command_dict["stdin"] = visitor.stdin
        command_dict["stdout"] = visitor.stdout
        command_dict["stderr"] = visitor.stderr
        return command_dict

    def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Split a redirect-free command line into executable and arguments.

        Recognised options are normalised to ``--name`` or ``--name=value``.
        The first two leftover arguments, when present, become ``--target=``
        and ``--query=``.
        """
        argv: typing.List[str] = list(bashlex.split(line))  # type: ignore[attr-defined]
        if not argv:
            sys.exit(f"no command found in line: {line!r}")
        self.executable = argv.pop(0)

        namespace, rest = self._parser.parse_known_intermixed_args(argv)

        # NOTE(review): argparse turns "-" in option names into "_" for the
        # namespace keys, so such options would be re-emitted with "_".
        args: typing.List[str] = []
        for name, value in vars(namespace).items():
            if value is None:
                continue
            if isinstance(value, bool):
                if value:
                    args.append(f"--{name}")
            else:
                args.append(f"--{name}={value}")

        # leftover arguments are optional: only consume what is there
        if rest:
            args.append(f"--target={rest.pop(0)}")
        if rest:
            args.append(f"--query={rest.pop(0)}")

        return {"executable": self.executable, "args": args}


class nodevisitor(bashlex.ast.nodevisitor):  # type: ignore[name-defined,misc]
    """AST visitor that records redirections of file descriptors 0, 1 and 2."""

    def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:
        # shared list that receives the source span of every redirect found
        self.positions = positions
        self.stdin: typing.Optional[str] = None
        self.stdout: typing.Optional[str] = None
        self.stderr: typing.Optional[str] = None

    def visitredirect(
        self,
        n: bashlex.ast.node,  # type: ignore[name-defined]
        n_input: int,
        n_type: str,
        output: typing.Any,
        heredoc: typing.Any,
    ) -> None:
        """Record a redirect to or from a plain word.

        Exits the process when the redirected descriptor is not 0, 1 or 2,
        or when the redirect target is not a word node.
        """
        if not (isinstance(n_input, int) and 0 <= n_input <= 2):
            sys.exit(f"oops 2: {type(n_input)}")
        if not (isinstance(output, bashlex.ast.node) and output.kind == "word"):  # type: ignore[attr-defined]
            sys.exit(f"oops 1: {type(n_input)}")

        self.positions.append(n.pos)
        if n_input == 0:
            self.stdin = output.word
        elif n_input == 1:
            self.stdout = output.word
        else:
            self.stderr = output.word

    def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None:  # type: ignore[name-defined]
        """Ignore here-documents."""
        pass


def main() -> None:
    """Package the commands in ``lastz_commands.txt`` into the default archive."""
    our_dirname: str = os.path.dirname(os.path.realpath(__file__))
    lastz_command_config_file: str = os.path.join(our_dirname, "lastz-cmd.ini")

    config: configparser.ConfigParser = configparser.ConfigParser()
    config.read(lastz_command_config_file)

    package_file = PackageFile()
    lastz_command_file = "lastz_commands.txt"
    try:
        bashCommandLineFile(lastz_command_file, config, package_file)
    finally:
        # always finalise the archive, even when processing exits early
        package_file.close()


if __name__ == "__main__":
    main()