changeset 8:150de8a3954a draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 11132ef8b4103731b6f5860ea736bf332a06e303
author richard-burhans
date Tue, 09 Jul 2024 17:37:53 +0000
parents 39c21be8f8b9
children 08e987868f0f
files diagonal_partition.py macros.xml runner.py segalign.xml
diffstat 4 files changed, 494 insertions(+), 225 deletions(-) [+]
line wrap: on
line diff
--- a/diagonal_partition.py	Tue May 07 15:28:36 2024 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,216 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Diagonal partitioning for segment files output by SegAlign.
-
-Usage:
-diagonal_partition <max-segments> <lastz-command>
-"""
-
-import os
-import sys
-import typing
-
-T = typing.TypeVar("T", bound="_Sliceable")
-
-
-class _Sliceable(typing.Protocol):
-    def __len__(self) -> int:
-        ...
-
-    def __getitem__(self: T, i: slice) -> T:
-        ...
-
-
-def chunks(lst: T, n: int) -> typing.Iterator[T]:
-    """Yield successive n-sized chunks from list."""
-    for i in range(0, len(lst), n):
-        yield lst[i: i + n]
-
-
-if __name__ == "__main__":
-
-    DELETE_AFTER_CHUNKING = True
-
-    # input_params = "10000 sad sadsa sad --segments=tmp10.block5.r1239937044.plus.segments dsa sa --strand=plus --output=out.maf sadads 2> logging.err"
-    # sys.argv = [sys.argv[0]] + input_params.split(' ')
-    chunk_size = int(sys.argv[1])  # first parameter contains chunk size
-    params = sys.argv[2:]
-
-    # Parsing command output from SegAlign
-    segment_key = "--segments="
-    segment_index = None
-    input_file: typing.Optional[str] = None
-
-    for index, value in enumerate(params):
-        if value[: len(segment_key)] == segment_key:
-            segment_index = index
-            input_file = value[len(segment_key):]
-            break
-    if segment_index is None:
-        print(f"Error: could not segment key {segment_key} in parameters {params}")
-        exit(0)
-
-    if input_file is None or not os.path.isfile(input_file):
-        print(f"Error: File {input_file} does not exist")
-        exit(0)
-
-    if (
-        chunk_size == 0 or sum(1 for _ in open(input_file)) <= chunk_size
-    ):  # no need to sort if number of lines <= chunk_size
-        print(" ".join(params), flush=True)
-        exit(0)
-
-    # Find rest of relevant parameters
-    output_key = "--output="
-    output_index: typing.Optional[int] = None
-    output_alignment_file = None
-    output_alignment_file_base: typing.Optional[str] = None
-    output_format = None
-
-    strand_key = "--strand="
-    strand_index = None
-    for index, value in enumerate(params):
-        if value[: len(output_key)] == output_key:
-            output_index = index
-            output_alignment_file = value[len(output_key):]
-            output_alignment_file_base, output_format = output_alignment_file.rsplit(
-                ".", 1
-            )
-        if value[: len(strand_key)] == strand_key:
-            strand_index = index
-    if segment_index is None:
-        print(f"Error: could not output key {output_key} in parameters {params}")
-        exit(0)
-    if strand_index is None:
-        print(f"Error: could not output key {strand_key} in parameters {params}")
-        exit(0)
-
-    err_index = -1  # error file is at very end
-    err_name_base = params[-1].split(".err", 1)[0]
-
-    data: typing.Dict[
-        typing.Tuple[str, str], typing.List[typing.Tuple[int, int, str]]
-    ] = {}  # dict of list of tuple (x, y, str)
-
-    direction = None
-    if "plus" in params[strand_index]:
-        direction = "f"
-    elif "minus" in params[strand_index]:
-        direction = "r"
-    else:
-        print(
-            f"Error: could not figure out direction from strand value {params[strand_index]}"
-        )
-        exit(0)
-
-    for line in open(input_file, "r"):
-        if line == "":
-            continue
-        (
-            seq1_name,
-            seq1_start,
-            seq1_end,
-            seq2_name,
-            seq2_start,
-            seq2_end,
-            _dir,
-            score,
-        ) = line.split()
-        # data.append((int(seq1_start), int(seq2_start), line))
-        half_dist = int((int(seq1_end) - int(seq1_start)) // 2)
-        assert int(seq1_end) > int(seq1_start)
-        assert int(seq2_end) > int(seq2_start)
-        seq1_mid = int(seq1_start) + half_dist
-        seq2_mid = int(seq2_start) + half_dist
-        data.setdefault((seq1_name, seq2_name), []).append((seq1_mid, seq2_mid, line))
-
-    # If there are chromosome pairs with segment count <= chunk_size
-    # then no need to sort and split these pairs into separate files.
-    # It is better to keep these pairs in a single segment file.
-    skip_pairs = []  # pairs that have count <= chunk_size.
-    # these will not be sorted
-    if len(data.keys()) > 1:
-        for pair in data.keys():
-            if len(data[pair]) <= chunk_size:
-                skip_pairs.append(pair)
-
-    # sorting for forward segments
-    if direction == "r":
-        for pair in data.keys():
-            if pair not in skip_pairs:
-                data[pair] = sorted(
-                    data[pair], key=lambda coord: (coord[1] - coord[0], coord[0])
-                )
-    # sorting for reverse segments
-    elif direction == "f":
-        for pair in data.keys():
-            if pair not in skip_pairs:
-                data[pair] = sorted(
-                    data[pair], key=lambda coord: (coord[1] + coord[0], coord[0])
-                )
-    else:
-        print(f"INVALID DIRECTION VALUE: {direction}")
-        exit(0)
-
-    # Writing file in chunks
-    ctr = 0
-    for pair in data.keys() - skip_pairs:
-        for chunk in chunks(list(zip(*data[pair]))[2], chunk_size):
-            ctr += 1
-            name_addition = f".split{ctr}"
-            fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
-
-            with open(fname, "w") as f:
-                f.writelines(chunk)
-            # update segment file in command
-            params[segment_index] = segment_key + fname
-            # update output file in command
-            if output_index is not None:
-                params[output_index] = (
-                    f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}"
-                )
-            # update error file in command
-            params[-1] = err_name_base + name_addition + ".err"
-            print(" ".join(params), flush=True)
-
-    # writing unsorted skipped pairs
-    if len(skip_pairs) > 0:
-        skip_pairs_with_len = sorted(
-            [(len(data[p]), p) for p in skip_pairs]
-        )  # list of tuples of (pair length, pair)
-        aggregated_skip_pairs: typing.List[typing.List[typing.Any]] = (
-            []
-        )  # list of list of pair names
-        current_count = 0
-        aggregated_skip_pairs.append([])
-        for count, pair in skip_pairs_with_len:
-            if current_count + count <= chunk_size:
-                current_count += count
-                aggregated_skip_pairs[-1].append(pair)
-            else:
-                aggregated_skip_pairs.append([])
-                current_count = count
-                aggregated_skip_pairs[-1].append(pair)
-
-        for aggregate in aggregated_skip_pairs:
-            ctr += 1
-            name_addition = f".split{ctr}"
-            fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
-            with open(fname, "w") as f:
-                for pair in sorted(aggregate, key=lambda p: (p[1], p[0])):
-                    chunk = list(zip(*data[pair]))[2]
-                    f.writelines(chunk)
-            # update segment file in command
-            params[segment_index] = segment_key + fname
-            # update output file in command
-            if output_index is not None:
-                params[output_index] = (
-                    f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}"
-                )
-            # update error file in command
-            params[-1] = err_name_base + name_addition + ".err"
-            print(" ".join(params), flush=True)
-
-    if DELETE_AFTER_CHUNKING:
-        os.remove(input_file)
--- a/macros.xml	Tue May 07 15:28:36 2024 +0000
+++ b/macros.xml	Tue Jul 09 17:37:53 2024 +0000
@@ -2,11 +2,9 @@
     <xml name="requirements">
         <requirements>
             <requirement type="package" version="@TOOL_VERSION@">segalign-full</requirement>
-            <requirement type="package" version="0.18">bashlex</requirement>
-            <requirement type="package" version="3.12">python</requirement>
         </requirements>
     </xml>
-    <token name="@TOOL_VERSION@">0.1.2.2</token>
+    <token name="@TOOL_VERSION@">0.1.2.7</token>
     <token name="@VERSION_SUFFIX@">0</token>
     <token name="@PROFILE@">21.05</token>
     <xml name="edam_ontology">
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/runner.py	Tue Jul 09 17:37:53 2024 +0000
@@ -0,0 +1,482 @@
+#!/usr/bin/env python
+
+import argparse
+import collections
+import concurrent.futures
+import multiprocessing
+import os
+import queue
+import re
+import resource
+import statistics
+import subprocess
+import sys
+import time
+import typing
+
+SENTINEL_VALUE: typing.Final = "SENTINEL"
+# CA_SENTEL_VALUE: typing.Final = ChunkAddress(0, 0, 0, 0, 0, SENTINEL_VALUE)
+RUSAGE_ATTRS: typing.Final = ["ru_utime", "ru_stime", "ru_maxrss", "ru_minflt", "ru_majflt", "ru_inblock", "ru_oublock", "ru_nvcsw", "ru_nivcsw"]
+
+
+class LastzCommands:
+    def __init__(self) -> None:
+        self.commands: dict[str, LastzCommand] = {}
+        self.segalign_segments: SegAlignSegments = SegAlignSegments()
+
+    def add(self, line: str) -> None:
+        if line not in self.commands:
+            self.commands[line] = LastzCommand(line)
+
+        command = self.commands[line]
+        self.segalign_segments.add(command.segments_filename)
+
+    def segments(self) -> typing.Iterator["SegAlignSegment"]:
+        for segment in self.segalign_segments:
+            yield segment
+
+
+class LastzCommand:
+    lastz_command_regex = re.compile(r"lastz (.+?)?ref\.2bit\[nameparse=darkspace\]\[multiple\]\[subset=ref_block(\d+)\.name\] (.+?)?query\.2bit\[nameparse=darkspace\]\[subset=query_block(\d+)\.name] --format=(\S+) --ydrop=(\d+) --gappedthresh=(\d+) --strand=(minus|plus)(?: --ambiguous=(\S+))?(?: --(notrivial))?(?: --scoring=(\S+))? --segments=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments --output=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.(\S+) 2> tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.err")
+
+    def __init__(self, line: str) -> None:
+        self.line = line
+        self.args: list[str] = []
+        self.target_filename: str = ''
+        self.query_filename: str = ''
+        self.data_folder: str = ''
+        self.ref_block: int = 0
+        self.query_block: int = 0
+        self.output_format: str = ''
+        self.ydrop: int = 0
+        self.gappedthresh: int = 0
+        self.strand: int | None = None
+        self.ambiguous: bool = False
+        self.nontrivial: bool = False
+        self.scoring: str | None = None
+        self.segments_filename: str = ''
+        self.output_filename: str = ''
+        self.error_filename: str = ''
+
+        self._parse_command()
+
+    def _parse_command(self) -> None:
+        match = self.lastz_command_regex.match(self.line)
+        if not match:
+            sys.exit(f"Unkown lastz command format: {self.line}")
+
+        self.data_folder = match.group(1)
+        self.ref_block = int(match.group(2))
+        self.query_block = int(match.group(4))
+        self.output_format = match.group(5)
+        self.ydrop = int(match.group(6))
+        self.gappedthresh = int(match.group(7))
+        strand = match.group(8)
+
+        if strand == 'plus':
+            self.strand = 0
+        elif strand == 'minus':
+            self.strand = 1
+
+        self.target_filename = f"{self.data_folder}ref.2bit[nameparse=darkspace][multiple][subset=ref_block{self.ref_block}.name]"
+        self.query_filename = f"{self.data_folder}query.2bit[nameparse=darkspace][subset=query_block{self.query_block}.name]"
+
+        self.args = [
+            "lastz",
+            self.target_filename,
+            self.query_filename,
+            f"--format={self.output_format}",
+            f"--ydrop={self.ydrop}",
+            f"--gappedthresh={self.gappedthresh}",
+            f"--strand={strand}"
+        ]
+
+        ambiguous = match.group(9)
+        if ambiguous is not None:
+            self.ambiguous = True
+            self.args.append(f"--ambiguous={ambiguous}")
+
+        nontrivial = match.group(10)
+        if nontrivial is not None:
+            self.nontrivial = True
+            self.args.append("--nontrivial")
+
+        scoring = match.group(11)
+        if scoring is not None:
+            self.scoring = scoring
+            self.args.append(f"--scoring={scoring}")
+
+        tmp_no = int(match.group(12))
+        block_no = int(match.group(13))
+        r_no = int(match.group(14))
+        split = match.group(16)
+
+        base_filename = f"tmp{tmp_no}.block{block_no}.r{r_no}.{strand}"
+
+        if split is not None:
+            base_filename = f"{base_filename}.split{split}"
+
+        self.segments_filename = f"{base_filename}.segments"
+        self.args.append(f"--segments={self.segments_filename}")
+
+        self.output_filename = f"{base_filename}.{self.output_format}"
+        self.args.append(f"--output={self.output_filename}")
+
+        self.error_filename = f"{base_filename}.err"
+
+
+class SegAlignSegments:
+    def __init__(self) -> None:
+        self.segments: dict[str, SegAlignSegment] = {}
+
+    def add(self, filename: str) -> None:
+        if filename not in self.segments:
+            self.segments[filename] = SegAlignSegment(filename)
+
+    def __iter__(self) -> "SegAlignSegments":
+        return self
+
+    def __next__(self) -> typing.Generator["SegAlignSegment", None, None]:
+        for segment in sorted(self.segments.values()):
+            yield segment
+
+
+class SegAlignSegment:
+    def __init__(self, filename: str):
+        self.filename = filename
+        self.tmp: int = 0
+        self.block: int = 0
+        self.r: int = 0
+        self.strand: int = 0
+        self.split: int | None = None
+        self.fmt: str = ""
+        self._parse_filename()
+
+    def _parse_filename(self) -> None:
+        match = re.match(r"tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments$", self.filename)
+        if not match:
+            sys.exit(f"Unkown segment filename format: {self.filename}")
+
+        self.tmp = int(match.group(1))
+        self.block = int(match.group(2))
+        self.r = int(match.group(3))
+
+        strand = match.group(4)
+        if strand == 'plus':
+            self.strand = 0
+        if strand == 'minus':
+            self.strand = 1
+
+        split = match.group(5)
+        if split is None:
+            self.split = None
+        else:
+            self.split = int(split)
+
+    def __lt__(self, other: "SegAlignSegment") -> bool:
+        for attr in ['strand', 'tmp', 'block', 'r', 'split']:
+            self_value = getattr(self, attr)
+            other_value = getattr(other, attr)
+            if self_value < other_value:
+                return True
+            elif self_value > other_value:
+                return False
+
+        return False
+
+
+def main() -> None:
+    args, segalign_args = parse_args()
+    lastz_commands = LastzCommands()
+
+    if args.diagonal_partition:
+        num_diagonal_partitioners = args.num_cpu
+    else:
+        num_diagonal_partitioners = 0
+
+    with multiprocessing.Manager() as manager:
+        segalign_q: queue.Queue[str] = manager.Queue()
+        skip_segalign = run_segalign(args, num_diagonal_partitioners, segalign_args, segalign_q, lastz_commands)
+
+        if num_diagonal_partitioners > 0:
+            diagonal_partition_q = segalign_q
+            segalign_q = manager.Queue()
+            run_diagonal_partitioners(args, num_diagonal_partitioners, diagonal_partition_q, segalign_q)
+
+        segalign_q.put(SENTINEL_VALUE)
+        output_q = segalign_q
+        segalign_q = manager.Queue()
+
+        output_filename = "lastz-commands.txt"
+        if args.output_type == "commands":
+            output_filename = args.output_file
+
+        with open(output_filename, "w") as f:
+            while True:
+                line = output_q.get()
+                if line == SENTINEL_VALUE:
+                    output_q.task_done()
+                    break
+
+                # messy, fix this
+                if skip_segalign:
+                    lastz_commands.add(line)
+
+                if args.output_type != "commands":
+                    segalign_q.put(line)
+
+                print(line, file=f)
+
+        if args.output_type == "output":
+            run_lastz(args, segalign_q, lastz_commands)
+
+            with open(args.output_file, 'w') as of:
+                print("##maf version=1", file=of)
+                for lastz_command in lastz_commands.commands.values():
+                    with open(lastz_command.output_filename) as f:
+                        for line in f:
+                            of.write(line)
+
+        if args.output_type == "tarball":
+            pass
+
+
+def run_lastz(args: argparse.Namespace, input_q: queue.Queue[str], lastz_commands: LastzCommands) -> None:
+    num_lastz_workers = args.num_cpu
+
+    for _ in range(num_lastz_workers):
+        input_q.put(SENTINEL_VALUE)
+
+    if args.debug:
+        r_beg = resource.getrusage(resource.RUSAGE_CHILDREN)
+        beg: int = time.monotonic_ns()
+
+    try:
+        with concurrent.futures.ProcessPoolExecutor(max_workers=num_lastz_workers) as executor:
+            for i in range(num_lastz_workers):
+                executor.submit(lastz_worker, input_q, i, lastz_commands)
+    except Exception as e:
+        sys.exit(f"Error: lastz failed: {e}")
+
+    if args.debug:
+        ns: int = time.monotonic_ns() - beg
+        r_end = resource.getrusage(resource.RUSAGE_CHILDREN)
+        print(f"lastz clock time: {ns} ns", file=sys.stderr, flush=True)
+        for rusage_attr in RUSAGE_ATTRS:
+            value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
+            print(f"  lastz {rusage_attr}: {value}", file=sys.stderr, flush=True)
+
+
+def lastz_worker(input_q: queue.Queue[str], instance: int, lastz_commands: LastzCommands) -> None:
+    while True:
+        line = input_q.get()
+        if line == SENTINEL_VALUE:
+            input_q.task_done()
+            break
+
+        if line not in lastz_commands.commands:
+            sys.exit(f"Error: lastz worker {instance} unexpected command format: {line}")
+
+        command = lastz_commands.commands[line]
+
+        if not os.path.exists(command.output_filename):
+            process = subprocess.run(command.args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+
+            for line in process.stdout.splitlines():
+                print(line, file=sys.stdout, flush=True)
+
+            if len(process.stderr) != 0:
+                for line in process.stderr.splitlines():
+                    print(line, file=sys.stderr, flush=True)
+
+            if process.returncode != 0:
+                sys.exit(f"Error: lastz {instance} exited with returncode {process.returncode}")
+
+
+def run_diagonal_partitioners(args: argparse.Namespace, num_workers: int, input_q: queue.Queue[str], output_q: queue.Queue[str]) -> None:
+    chunk_size = estimate_chunk_size(args)
+
+    if args.debug:
+        print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True)
+
+    with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
+        for i in range(num_workers):
+            executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i))
+
+
+def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None:
+    while True:
+        line = input_q.get()
+        if line == SENTINEL_VALUE:
+            input_q.task_done()
+            break
+
+        run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)]
+        for word in line.split():
+            run_args.append(word)
+        process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+
+        for line in process.stdout.splitlines():
+            output_q.put(line)
+
+        for line in process.stderr.splitlines():
+            print(line, file=sys.stderr, flush=True)
+
+        if process.returncode != 0:
+            sys.exit(f"Error: diagonal partitioner {instance} exited with returncode {process.returncode}")
+
+
+def estimate_chunk_size(args: argparse.Namespace) -> int:
+    chunk_size = -1
+    line_size = -1
+
+    if args.debug:
+        r_beg = resource.getrusage(resource.RUSAGE_SELF)
+        beg: int = time.monotonic_ns()
+
+    # get size of each segment assuming DELETE_AFTER_CHUNKING == True
+    # takes into account already split segments
+    fdict: typing.DefaultDict[str, int] = collections.defaultdict(int)
+    for entry in os.scandir("."):
+        if entry.name.endswith(".segments"):
+            try:
+                file_size = entry.stat().st_size
+            except FileNotFoundError:
+                continue
+
+            if line_size == -1:
+                try:
+                    with open(entry.name) as f:
+                        line_size = len(f.readline())  # add 1 for newline
+                except FileNotFoundError:
+                    continue
+
+            fdict[entry.name.split(".split", 1)[0]] += file_size
+
+    # if noot enough segment files for estimation, continue
+    if len(fdict) > 2:
+        chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size)
+
+    if args.debug:
+        ns: int = time.monotonic_ns() - beg
+        r_end = resource.getrusage(resource.RUSAGE_SELF)
+        print(f"estimate chunk size clock time: {ns} ns", file=sys.stderr, flush=True)
+        for rusage_attr in RUSAGE_ATTRS:
+            value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
+            print(f"  estimate chunk size {rusage_attr}: {value}", file=sys.stderr, flush=True)
+
+    return chunk_size
+
+
+def run_segalign(args: argparse.Namespace, num_sentinel: int, segalign_args: list[str], segalign_q: queue.Queue[str], commands: LastzCommands) -> bool:
+    skip_segalign: bool = False
+
+    # use the currently existing output file if it exists
+    if args.debug:
+        skip_segalign = load_segalign_output("lastz-commands.txt", segalign_q)
+
+    if not skip_segalign:
+        run_args = ["segalign"]
+        run_args.extend(segalign_args)
+        run_args.append("--num_threads")
+        run_args.append(str(args.num_cpu))
+
+        if args.debug:
+            beg: int = time.monotonic_ns()
+            r_beg = resource.getrusage(resource.RUSAGE_CHILDREN)
+
+        process = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+
+        for line in process.stdout.splitlines():
+            commands.add(line)
+            segalign_q.put(line)
+
+        if len(process.stderr) != 0:
+            for line in process.stderr.splitlines():
+                print(line, file=sys.stderr, flush=True)
+
+        if process.returncode != 0:
+            sys.exit(f"Error: segalign exited with returncode {process.returncode}")
+
+        if args.debug:
+            ns: int = time.monotonic_ns() - beg
+            r_end = resource.getrusage(resource.RUSAGE_CHILDREN)
+            print(f"segalign clock time: {ns} ns", file=sys.stderr, flush=True)
+            for rusage_attr in RUSAGE_ATTRS:
+                value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
+                print(f"  segalign {rusage_attr}: {value}", flush=True)
+
+    for _ in range(num_sentinel):
+        segalign_q.put(SENTINEL_VALUE)
+
+    return skip_segalign
+
+
+def load_segalign_output(filename: str, segalign_q: queue.Queue[str]) -> bool:
+    load_success = False
+
+    r_beg = resource.getrusage(resource.RUSAGE_SELF)
+    beg: int = time.monotonic_ns()
+
+    try:
+        with open(filename) as f:
+            for line in f:
+                line = line.rstrip("\n")
+                segalign_q.put(line)
+        load_success = True
+    except FileNotFoundError:
+        pass
+
+    if load_success:
+        ns: int = time.monotonic_ns() - beg
+        r_end = resource.getrusage(resource.RUSAGE_SELF)
+        print(f"load output clock time: {ns} ns", file=sys.stderr, flush=True)
+        for rusage_attr in RUSAGE_ATTRS:
+            value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
+            print(f"  load output {rusage_attr}: {value}", flush=True)
+
+    return load_success
+
+
+def parse_args() -> tuple[argparse.Namespace, list[str]]:
+    parser = argparse.ArgumentParser(allow_abbrev=False)
+
+    parser.add_argument("--output-type", nargs="?", const="commands", default="commands", type=str, choices=["commands", "output", "tarball"], help="output type (default: %(default)s)")
+    parser.add_argument("--output-file", type=str, required=True, help="output pathname")
+    parser.add_argument("--diagonal-partition", action="store_true", help="run diagonal partition optimization")
+    parser.add_argument("--nogapped", action="store_true", help="don't perform gapped extension stage")
+    parser.add_argument("--markend", action="store_true", help="write a marker line just before completion")
+    parser.add_argument("--num-gpu", default=-1, type=int, help="number of GPUs to use (default: %(default)s [use all GPUs])")
+    parser.add_argument("--num-cpu", default=-1, type=int, help="number of CPUs to use (default: %(default)s [use all CPUs])")
+    parser.add_argument("--debug", action="store_true", help="print debug messages")
+    parser.add_argument("--tool_directory", type=str, required=True, help="tool directory")
+
+    if not sys.argv[1:]:
+        parser.print_help()
+        sys.exit(0)
+
+    args, segalign_args = parser.parse_known_args(sys.argv[1:])
+
+    cpus_available = len(os.sched_getaffinity(0))
+    if args.num_cpu == -1:
+        args.num_cpu = cpus_available
+    elif args.num_cpu > cpus_available:
+        sys.exit(f"Error: additional {args.num_cpu - cpus_available} CPUs")
+
+    if args.nogapped:
+        segalign_args.append("--nogapped")
+
+    if args.markend:
+        segalign_args.append("--markend")
+
+    if args.num_gpu != -1:
+        segalign_args.extend(["--num_gpu", f"{args.num_gpu}"])
+
+    if args.debug:
+        segalign_args.append("--debug")
+
+    return args, segalign_args
+
+
+if __name__ == "__main__":
+    main()
--- a/segalign.xml	Tue May 07 15:28:36 2024 +0000
+++ b/segalign.xml	Tue Jul 09 17:37:53 2024 +0000
@@ -14,10 +14,9 @@
     <expand macro="edam_ontology"/>
     <expand macro="requirements"/>
     <required_files>
-        <include path="diagonal_partition.py"/>
         <include path="lastz-cmd.ini"/>
         <include path="package_output.py"/>
-        <include path="run_segalign_diagonal_partition"/>
+        <include path="runner.py"/>
     </required_files>
     <command detect_errors="exit_code"><![CDATA[
 ##
@@ -38,12 +37,19 @@
         #set $segalign_mode = "segalign_diagonal_partition"
         ## explicitly calling bash to bypass a pulsar bug
         ## https://github.com/galaxyproject/pulsar/issues/341
-        bash '$__tool_directory__/run_segalign_diagonal_partition'
+        bash '$__tool_directory__/runner.py'
+            --output-type tarball
+            --output-filename fake
+            --diagonal-partition
+            --num-cpu \${GALAXY_SLOTS:-2}
             --tool_directory '$__tool_directory__'
-            --max_segments '$mode.diagonal_partition_options.max_segments'
     #else
         #set $segalign_mode = "segalign"
-        run_segalign
+        bash '$__tool_directory__/runner.py'
+            --output-type output
+            --output-filename foo.maf
+            --num-cpu \${GALAXY_SLOTS:-2}
+            --tool_directory '$__tool_directory__'
     #end if
             '$mode.target'
             '$mode.query'
@@ -187,7 +193,6 @@
                 <expand macro="system_options"/>
                 <section name="diagonal_partition_options" expanded="false" title="Diagonal Partition Options">
                     <param argument="--diagonal_partition" type="boolean" value="false" label="Enable diagonal partition optimization"/>
-                    <param argument="--max_segments" type="integer" value="20000" label="Max segments"/>
                 </section>
             </when>
             <when value="segalign_repeat_masker">