changeset 0:103538753e81 draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/batched_lastz commit 7b119b432f721e228a73396d4e8f0d54350b0481
author richard-burhans
date Tue, 30 Apr 2024 21:06:58 +0000
parents
children ad3554614aad
files batched_lastz.xml macros.xml run_lastz_tarball.py
diffstat 3 files changed, 381 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/batched_lastz.xml	Tue Apr 30 21:06:58 2024 +0000
@@ -0,0 +1,20 @@
+<tool id="batched_lastz" name="Batched Lastz" version="@TOOL_VERSION@+galaxy@VERSION_SUFFIX@" profile="@PROFILE@">
+    <description>: align batches of sequences</description>
+    <macros>
+        <import>macros.xml</import>
+    </macros>
+    <expand macro="requirements"/>
+    <command detect_errors="exit_code"><![CDATA[
+        run_lastz_tarball.py '--input=$input' '--output=$output' '--parallel=\${GALAXY_SLOTS:-2}'
+    ]]></command>
+    <inputs>
+        <param argument="--tarball" type="data" format="tgz" label="Tarball"/>
+    </inputs>
+    <outputs>
+        <data name="output" label="Output"/>
+    </outputs>
+    <help><![CDATA[
+    TODO: Fill in help.
+    ]]></help>
+    <expand macro="citations"/>
+</tool>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/macros.xml	Tue Apr 30 21:06:58 2024 +0000
@@ -0,0 +1,26 @@
+<macros>
+    <xml name="requirements">
+        <requirements>
+            <requirement type="package" version="@TOOL_VERSION@">lastz</requirement>
+            <yield/>
+        </requirements>
+    </xml>
+    <token name="@TOOL_VERSION@">1.04.22</token>
+    <token name="@VERSION_SUFFIX@">0</token>
+    <token name="@PROFILE@">21.05</token>
+    <xml name="citations">
+        <citations>
+            <citation type="bibtex">
+            @misc{
+                githublastz,
+                author = {Harris, Robert},
+                year = {2007},
+                title = {Improved pairwise alignment of genomic DNA},
+                publisher = {The Pennsylvania State University},
+                journal = {Ph. D. Thesis},
+                url = {http://www.bx.psu.edu/~rsharris/rsharris_phd_thesis_2007.pdf},
+                }
+            </citation>
+        </citations>
+    </xml>
+</macros>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/run_lastz_tarball.py	Tue Apr 30 21:06:58 2024 +0000
@@ -0,0 +1,335 @@
+#!/usr/bin/env python
+
+import argparse
+import concurrent.futures
+import json
+import multiprocessing
+import os
+import queue
+import re
+import shutil
+import sys
+import subprocess
+import tarfile
+import tempfile
+import typing
+import time
+
+
+lastz_output_format_regex = re.compile(
+    r"^(?:axt\+?|blastn|cigar|differences|general-?.+|lav|lav\+text|maf[-+]?|none|paf(?::wfmash)?|rdotplot|sam-?|softsam-?|text)$",
+    re.IGNORECASE,
+)
+
+
+# Specifies the output format: lav, lav+text, axt, axt+, maf, maf+, maf-, sam, softsam, sam-, softsam-, cigar, BLASTN, PAF, PAF:wfmash, differences, rdotplot, text, general[:<fields>], or general-[:<fields>].
+# ‑‑format=none can be used when no alignment output is desired.
+
+
+def run_command(
+    instance: int,
+    input_queue: "queue.Queue[typing.Dict[str, typing.Any]]",
+    output_queue: "queue.Queue[float]",
+    debug: bool = False,
+) -> None:
+    os.chdir("galaxy/files")
+
+    while True:
+        command_dict = input_queue.get()
+
+        if not command_dict:
+            return
+
+        args = ["lastz"]
+        args.extend(command_dict["args"])
+
+        stdin = command_dict["stdin"]
+        if stdin is not None:
+            stdin = open(stdin, "r")
+
+        stdout = command_dict["stdout"]
+        if stdout is not None:
+            stdout = open(stdout, "w")
+
+        stderr = command_dict["stderr"]
+        if stderr is not None:
+            stderr = open(stderr, "w")
+
+        begin = time.perf_counter()
+        p = subprocess.run(args, stdin=stdin, stdout=stdout, stderr=stderr)
+
+        for var in [stdin, stdout, stderr]:
+            if var is not None:
+                var.close()
+
+        if p.returncode != 0:
+            sys.exit(f"command failed: {' '.join(args)}")
+        else:
+            stderr = command_dict["stderr"]
+            if stderr is not None:
+                try:
+                    statinfo = os.stat(stderr, follow_symlinks=False)
+                except:
+                    statinfo = None
+
+                if statinfo is None:
+                    sys.exit(f"unable to stat stderr file: {' '.join(args)}")
+
+                if statinfo.st_size != 0:
+                    sys.exit(f"stderr file is not empty: {' '.join(args)}")
+
+            elapsed = time.perf_counter() - begin
+            output_queue.put(elapsed)
+
+        if debug:
+            print(f"runtime {elapsed}", file=sys.stderr, flush=True)
+
+
+class BatchTar:
+    def __init__(self, pathname: str, debug: bool = False) -> None:
+        self.pathname = pathname
+        self.debug = debug
+        self.tarfile = None
+        self.commands: typing.List[typing.Dict[str, typing.Any]] = []
+        self._extract()
+        self._load_commands()
+
+    def batch_commands(self) -> typing.Iterator[typing.Dict[str, typing.Any]]:
+        for command in self.commands:
+            yield command
+
+    def _load_commands(self) -> None:
+        try:
+            f = open("galaxy/commands.json")
+        except FileNotFoundError:
+            sys.exit(
+                f"ERROR: input tarball missing galaxy/commands.json: {self.pathname}"
+            )
+
+        begin = time.perf_counter()
+        for json_line in f:
+            json_line = json_line.rstrip("\n")
+            try:
+                command_dict = json.loads(json_line)
+            except json.JSONDecodeError:
+                sys.exit(
+                    f"ERROR: bad json line in galaxy/commands.json: {self.pathname}"
+                )
+
+            self._load_command(command_dict)
+
+        f.close()
+        elapsed = time.perf_counter() - begin
+
+        if self.debug:
+            print(
+                f"loaded {len(self.commands)} commands in {elapsed} seconds ",
+                file=sys.stderr,
+                flush=True,
+            )
+
+    def _load_command(self, command_dict: typing.Dict[str, typing.Any]) -> None:
+        # check command_dict structure
+        field_types: typing.Dict[str, typing.List[typing.Any]] = {
+            "executable": [str],
+            "args": [list],
+            "stdin": [str, "None"],
+            "stdout": [str, "None"],
+            "stderr": [str, "None"],
+        }
+
+        bad_format = False
+        for field_name in field_types.keys():
+            # missing field
+            if field_name not in command_dict:
+                bad_format = True
+                break
+
+            # incorrect field type
+            good_type = False
+            for field_type in field_types[field_name]:
+                if isinstance(field_type, str) and field_type == "None":
+                    if command_dict[field_name] is None:
+                        good_type = True
+                        break
+                elif isinstance(command_dict[field_name], field_type):
+                    good_type = True
+                    break
+
+            if good_type is False:
+                bad_format = True
+
+        if not bad_format:
+            # all args must be strings
+            for arg in command_dict["args"]:
+                if not isinstance(arg, str):
+                    bad_format = True
+                    break
+
+        if bad_format:
+            sys.exit(
+                f"ERROR: unexpected json format in line in galaxy/commands.json: {self.pathname}"
+            )
+
+        self.commands.append(command_dict)
+
+    def _extract(self) -> None:
+        try:
+            self.tarball = tarfile.open(
+                name=self.pathname, mode="r:*", format=tarfile.GNU_FORMAT
+            )
+        except FileNotFoundError:
+            sys.exit(f"ERROR: unable to find input tarball: {self.pathname}")
+        except tarfile.ReadError:
+            sys.exit(f"ERROR: error reading input tarball: {self.pathname}")
+
+        begin = time.perf_counter()
+        self.tarball.extractall(filter="data")
+        self.tarball.close()
+        elapsed = time.perf_counter() - begin
+
+        if self.debug:
+            print(
+                f"Extracted tarball in {elapsed} seconds", file=sys.stderr, flush=True
+            )
+
+
+class TarRunner:
+    def __init__(
+        self,
+        input_pathname: str,
+        output_pathname: str,
+        parallel: int,
+        debug: bool = False,
+    ) -> None:
+        self.input_pathname = input_pathname
+        self.output_pathname = output_pathname
+        self.parallel = parallel
+        self.debug = debug
+        self.batch_tar = BatchTar(self.input_pathname, debug=self.debug)
+        self.output_file_format: typing.Dict[str, str] = {}
+        self.output_files: typing.Dict[str, typing.List[str]] = {}
+        self._set_output()
+        self._set_target_query()
+
+    def _set_output(self) -> None:
+        for command_dict in self.batch_tar.batch_commands():
+            output_file = None
+            output_format = None
+
+            for arg in command_dict["args"]:
+                if arg.startswith("--format="):
+                    output_format = arg[9:]
+                elif arg.startswith("--output="):
+                    output_file = arg[9:]
+
+            if output_file is None:
+                f = tempfile.NamedTemporaryFile(dir="galaxy/files", delete=False)
+                output_file = os.path.basename(f.name)
+                f.close()
+                command_dict["args"].append(f"--output={output_file}")
+
+            if output_format is None:
+                output_format = "lav"
+                command_dict["args"].append(f"--format={output_format}")
+
+            if not lastz_output_format_regex.match(output_format):
+                sys.exit(f"ERROR: invalid output format: {output_format}")
+
+            self.output_file_format[output_file] = output_format
+
+        for output_file, output_format in self.output_file_format.items():
+            self.output_files.setdefault(output_format, [])
+            self.output_files[output_format].append(output_file)
+
+    def _set_target_query(self) -> None:
+        for command_dict in self.batch_tar.batch_commands():
+            new_args: typing.List[str] = []
+
+            for arg in command_dict["args"]:
+                if arg.startswith("--target="):
+                    new_args.insert(0, arg[9:])
+                elif arg.startswith("--query="):
+                    new_args.insert(1, arg[8:])
+                else:
+                    new_args.append(arg)
+
+            command_dict["args"] = new_args
+
+    def run(self) -> None:
+        run_times = []
+        begin = time.perf_counter()
+
+        with multiprocessing.Manager() as manager:
+            input_queue: queue.Queue[typing.Dict[str, typing.Any]] = manager.Queue()
+            output_queue: queue.Queue[float] = manager.Queue()
+
+            for command_dict in self.batch_tar.batch_commands():
+                input_queue.put(command_dict)
+
+            # use the empty dict as a sentinel
+            for _ in range(self.parallel):
+                input_queue.put({})
+
+            with concurrent.futures.ProcessPoolExecutor(
+                max_workers=self.parallel
+            ) as executor:
+                futures = [
+                    executor.submit(
+                        run_command,
+                        instance,
+                        input_queue,
+                        output_queue,
+                        debug=self.debug,
+                    )
+                    for instance in range(self.parallel)
+                ]
+
+            for f in concurrent.futures.as_completed(futures):
+                if not f.done() or f.cancelled() or f.exception() is not None:
+                    sys.exit("lastz command failed")
+
+            while not output_queue.empty():
+                run_time = output_queue.get()
+                run_times.append(run_time)
+
+        elapsed = time.perf_counter() - begin
+
+        if self.debug:
+            print(f"elapsed {elapsed}", file=sys.stderr, flush=True)
+
+        self._cleanup()
+
+    def _cleanup(self) -> None:
+        num_output_files = len(self.output_files.keys())
+
+        for file_type, file_list in self.output_files.items():
+            with open(f"output.{file_type}", "w") as ofh:
+                for filename in file_list:
+                    with open(f"galaxy/files/{filename}") as ifh:
+                        for line in ifh:
+                            ofh.write(line)
+
+        if num_output_files == 1:
+            file_type = list(self.output_files.keys())[0]
+            src_filename = f"output.{file_type}"
+            shutil.copy2(src_filename, self.output_pathname)
+
+
+def main() -> None:
+    if not hasattr(tarfile, "data_filter"):
+        sys.exit("ERROR: extracting may be unsafe; consider updating Python")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--input", type=str, required=True)
+    parser.add_argument("--output", type=str, required=True)
+    parser.add_argument("--parallel", type=int, default=1, required=False)
+    parser.add_argument("--debug", action="store_true", required=False)
+
+    args = parser.parse_args()
+    runner = TarRunner(args.input, args.output, args.parallel, args.debug)
+    runner.run()
+
+
+if __name__ == "__main__":
+    main()