view package_output.py @ 0:5c72425b7f1b draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 98a4dd44360447aa96d92143384d78e116d7581b
author richard-burhans
date Wed, 17 Apr 2024 18:06:54 +0000
parents
children 36cafb694dd2
line wrap: on
line source

#!/usr/bin/env python

import argparse
import configparser
import json
import os
import sys
import tarfile
import typing

import bashlex


class PackageFile:
    """Incrementally build a gzip-compressed tar archive of data files.

    The archive is opened lazily on the first ``add_*`` call.  Data files are
    stored under ``<top_dir>/<data_dir>/`` and the configuration file under
    ``<top_dir>/<config_file>``.  The object may be used as a context manager,
    in which case the archive is closed on exit.
    """

    def __init__(
        self,
        pathname: str = "data_package.tgz",
        top_dir: str = "galaxy",
        data_dir: str = "files",
        config_file: str = "commands.json",
    ) -> None:
        """Record archive locations; nothing is written until a file is added.

        Args:
            pathname: Path of the archive to create.
            top_dir: Top-level directory name inside the archive.
            data_dir: Directory (below ``top_dir``) that receives data files.
            config_file: Archive name (below ``top_dir``) of the config file.
        """
        self.pathname: str = os.path.realpath(pathname)
        self.data_root: str = os.path.join(top_dir, data_dir)
        self.config_path: str = os.path.join(top_dir, config_file)
        self.config_file: str = config_file
        self.tarfile: typing.Optional[tarfile.TarFile] = None
        # Archive member names already written; used to skip duplicates.
        self.name_cache: typing.Dict[typing.Any, typing.Any] = {}
        # Explicit archive names must resolve to somewhere below this directory.
        self.working_dir: str = os.path.realpath(os.getcwd())

    def __enter__(self) -> "PackageFile":
        return self

    def __exit__(self, exc_type: typing.Any, exc: typing.Any, tb: typing.Any) -> None:
        self.close()

    def _open_archive(self) -> tarfile.TarFile:
        """Return the open archive, creating it on first use."""
        if self.tarfile is None:
            self.tarfile = tarfile.open(
                name=self.pathname,
                mode="w:gz",
                format=tarfile.GNU_FORMAT,
                compresslevel=1,
            )
        return self.tarfile

    def _initialize(self) -> None:
        """Open the archive for writing if it is not open yet."""
        self._open_archive()

    def add_config(self, pathname: str) -> None:
        """Add ``pathname`` to the archive under the configured config path."""
        archive = self._open_archive()
        archive.add(
            os.path.realpath(pathname), arcname=self.config_path, recursive=False
        )

    def add_file(self, pathname: str, arcname: typing.Optional[str] = None) -> None:
        """Add a data file to the archive, skipping names already present.

        Args:
            pathname: Source file on disk (symlinks are resolved).
            arcname: Optional path that determines the member name.  It is
                resolved and made relative to the working directory captured
                at construction time.  When omitted, the member is named after
                the basename of the resolved source path.

        Exits the process (``sys.exit``) when ``arcname`` resolves outside the
        working directory or when the source file does not exist.
        """
        archive = self._open_archive()
        source_path = os.path.realpath(pathname)

        if arcname is None:
            dest_path = os.path.join(self.data_root, os.path.basename(source_path))
        else:
            rel_path = os.path.relpath(os.path.realpath(arcname), self.working_dir)
            # A relative path escapes the working directory when it *is* the
            # parent directory or starts with a parent-directory component.
            escapes = rel_path == os.pardir or rel_path.startswith(
                os.pardir + os.sep
            )
            if os.path.isabs(rel_path) or escapes:
                sys.exit("path fail")
            dest_path = os.path.join(self.data_root, rel_path)

        if dest_path in self.name_cache:
            return

        try:
            archive.add(source_path, arcname=dest_path, recursive=False)
        except FileNotFoundError:
            sys.exit(f"missing source file {source_path}")

        self.name_cache[dest_path] = 1

    def close(self) -> None:
        """Finish writing and close the archive; safe to call repeatedly."""
        if self.tarfile is not None:
            self.tarfile.close()
            self.tarfile = None


class bashCommandLineFile:
    """Parse a file of shell command lines and package the files they use.

    Each non-blank line of ``pathname`` is parsed, its redirections are
    separated from its arguments, and the result is written as one JSON object
    per line to ``commands.json`` in the current directory.  Files referenced
    by selected arguments are added to ``package_file`` along the way, and
    ``commands.json`` itself is added once all lines have been processed.
    All of this happens during construction.
    """

    def __init__(
        self,
        pathname: str,
        config: configparser.ConfigParser,
        package_file: PackageFile,
    ) -> None:
        """Store the inputs and immediately process the command file.

        Args:
            pathname: File containing one shell command per line.
            config: Parsed configuration; its optional ``arguments`` section
                lists the option names the command accepts, grouped by kind.
            package_file: Archive that receives the referenced files.
        """
        self.pathname: str = pathname
        self.config = config
        self.package_file = package_file
        # Executable of the most recently parsed command line.
        self.executable: typing.Optional[str] = None
        self._parse_lines()

    def _parse_lines(self) -> None:
        """Convert every command line to JSON and package referenced files."""
        with open("commands.json", "w") as ofh, open(self.pathname) as f:
            for line in f:
                line = line.rstrip("\n")
                if not line.strip():
                    # A blank line carries no command to parse.
                    continue

                command_dict = self._parse_line(line)
                command_dict["args"] = [
                    self._package_arg(arg) for arg in command_dict.get("args", [])
                ]
                print(json.dumps(command_dict), file=ofh)

        self.package_file.add_config("commands.json")

    def _package_arg(self, arg: str) -> str:
        """Package any file referenced by ``arg`` and return the arg to emit."""
        if arg.startswith("--target="):
            self._package_sequence_spec(arg[len("--target="):])
        elif arg.startswith("--query="):
            self._package_sequence_spec(arg[len("--query="):])
        elif arg.startswith("--segments="):
            self.package_file.add_file(arg[len("--segments="):])
        elif arg.startswith("--scores="):
            # The scores file is stored under a fixed name, so the argument
            # is rewritten to point at that name.
            self.package_file.add_file(arg[len("--scores="):], "data/scores.txt")
            return "--scores=data/scores.txt"
        return arg

    def _package_sequence_spec(self, spec: str) -> None:
        """Package the files named by a ``file[action]...`` sequence spec.

        NOTE(review): a spec without any ``[...]`` suffix is not packaged at
        all; this mirrors the original behavior -- confirm it is intended.
        """
        if "[" not in spec:
            return

        sequence_file, *actions = spec.split("[")
        self.package_file.add_file(sequence_file, sequence_file)
        for action in actions:
            if action.endswith("]"):
                action = action[:-1]
                if action.startswith("subset="):
                    self.package_file.add_file(action[len("subset="):])

    def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Parse one command line into a dict describing the command.

        Returns:
            A dict with ``executable``, ``args``, ``stdin``, ``stdout`` and
            ``stderr`` keys; the last three are ``None`` when not redirected.
        """
        # resolve shell redirects
        trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False)  # type: ignore[attr-defined]
        positions: typing.List[typing.Tuple[int, int]] = []

        # One visitor for all trees, so it is always bound and redirects found
        # in any tree are retained.
        visitor = nodevisitor(positions)
        for tree in trees:
            visitor.visit(tree)

        # Delete redirect spans from the end of the line backwards so that the
        # offsets of the spans still to be removed stay valid.
        processed = list(line)
        for start, end in sorted(positions, reverse=True):
            processed[start:end] = ""

        command_dict = self._parse_processed_line("".join(processed))
        command_dict["stdin"] = visitor.stdin
        command_dict["stdout"] = visitor.stdout
        command_dict["stderr"] = visitor.stderr

        return command_dict

    def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]:
        """Split a redirect-free command line into executable and arguments.

        Options named in the ``arguments`` config section are normalized to
        ``--name`` or ``--name=value``.  Of the remaining tokens, the first
        becomes ``--target=`` and the second ``--query=``; any further tokens
        are dropped.
        """
        argv: typing.List[str] = list(bashlex.split(line))  # type: ignore[attr-defined]
        if not argv:
            sys.exit(f"no command found in line: {line!r}")
        self.executable = argv.pop(0)

        # Config key -> add_argument() keyword arguments.  The order is kept
        # because it determines the order of the emitted arguments.
        option_kinds: typing.Tuple[
            typing.Tuple[str, typing.Dict[str, typing.Any]], ...
        ] = (
            ("flag_args", {"action": "store_true"}),
            ("str_args", {"type": str}),
            ("bool_str_args", {"nargs": "?", "const": True, "default": False}),
            ("int_args", {"type": int}),
            ("bool_int_args", {"nargs": "?", "const": True, "default": False}),
        )

        parser: argparse.ArgumentParser = argparse.ArgumentParser(add_help=False)
        if "arguments" in self.config:
            arguments_section = self.config["arguments"]
            for key, kwargs in option_kinds:
                if key in arguments_section:
                    for name in arguments_section[key].split():
                        parser.add_argument(f"--{name}", **kwargs)

        namespace, rest = parser.parse_known_intermixed_args(argv)

        command_dict: typing.Dict[str, typing.Any] = {
            "executable": self.executable,
            "args": [],
        }

        for var, value in vars(namespace).items():
            if value is None:
                continue
            if isinstance(value, bool):
                # False means the option was absent; True means a bare flag.
                if value:
                    command_dict["args"].append(f"--{var}")
            else:
                command_dict["args"].append(f"--{var}={value}")

        # Positional arguments are optional: only consume what is present.
        if rest:
            command_dict["args"].append(f"--target={rest.pop(0)}")

        if rest:
            command_dict["args"].append(f"--query={rest.pop(0)}")

        return command_dict


class nodevisitor(bashlex.ast.nodevisitor):  # type: ignore[name-defined,misc]
    """AST visitor that collects redirections of file descriptors 0, 1 and 2.

    For every redirect visited, the node's ``pos`` is appended to the shared
    ``positions`` list and the redirect's target word is stored in ``stdin``,
    ``stdout`` or ``stderr`` according to the descriptor number.
    """

    def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None:
        """Attach the caller-owned list that receives redirect positions."""
        self.positions = positions
        self.stdin: typing.Optional[str] = None
        self.stdout: typing.Optional[str] = None
        self.stderr: typing.Optional[str] = None

    def visitredirect(
        self,
        n: bashlex.ast.node,  # type: ignore[name-defined]
        n_input: int,
        n_type: str,
        output: typing.Any,
        heredoc: typing.Any,
    ) -> None:
        """Record one redirect, exiting the process on unsupported forms.

        Only redirects with an integer descriptor in the range 0-2 and a
        ``word`` node as the target are accepted; anything else terminates
        the process via ``sys.exit``.
        """
        if not (isinstance(n_input, int) and 0 <= n_input <= 2):
            sys.exit(f"oops 2: {type(n_input)}")

        if not (isinstance(output, bashlex.ast.node) and output.kind == "word"):  # type: ignore[attr-defined]
            # Report the value that failed the check: the redirect target.
            sys.exit(f"oops 1: {type(output)}")

        self.positions.append(n.pos)
        if n_input == 0:
            self.stdin = output.word
        elif n_input == 1:
            self.stdout = output.word
        else:
            self.stderr = output.word

    def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None:  # type: ignore[name-defined]
        """Ignore here-documents."""
        pass


def main() -> None:
    """Package the files referenced by ``lastz_commands.txt``.

    Reads ``lastz-cmd.ini`` from the directory containing this script,
    processes ``lastz_commands.txt`` from the current directory, and writes
    the archive using the default ``PackageFile`` settings.
    """
    our_dirname: str = os.path.dirname(os.path.realpath(__file__))
    lastz_command_config_file: str = os.path.join(our_dirname, "lastz-cmd.ini")

    config: configparser.ConfigParser = configparser.ConfigParser()
    # ConfigParser.read() skips files it cannot open, so a missing ini file
    # yields an empty configuration rather than an error.
    config.read(lastz_command_config_file)

    package_file = PackageFile()
    lastz_command_file = "lastz_commands.txt"
    try:
        # All parsing and packaging happens in the constructor.
        bashCommandLineFile(lastz_command_file, config, package_file)
    finally:
        # Close the archive even when parsing aborts (including sys.exit).
        package_file.close()


# Run the packaging step only when this file is executed as a script.
if __name__ == "__main__":
    main()