# HG changeset patch # User richard-burhans # Date 1720546673 0 # Node ID 150de8a3954a1e7baf510d94aa3feee6c7b60a1b # Parent 39c21be8f8b9ed7f3ef103e99c041e09bcd1eb21 planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 11132ef8b4103731b6f5860ea736bf332a06e303 diff -r 39c21be8f8b9 -r 150de8a3954a diagonal_partition.py --- a/diagonal_partition.py Tue May 07 15:28:36 2024 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,216 +0,0 @@ -#!/usr/bin/env python - -""" -Diagonal partitioning for segment files output by SegAlign. - -Usage: -diagonal_partition -""" - -import os -import sys -import typing - -T = typing.TypeVar("T", bound="_Sliceable") - - -class _Sliceable(typing.Protocol): - def __len__(self) -> int: - ... - - def __getitem__(self: T, i: slice) -> T: - ... - - -def chunks(lst: T, n: int) -> typing.Iterator[T]: - """Yield successive n-sized chunks from list.""" - for i in range(0, len(lst), n): - yield lst[i: i + n] - - -if __name__ == "__main__": - - DELETE_AFTER_CHUNKING = True - - # input_params = "10000 sad sadsa sad --segments=tmp10.block5.r1239937044.plus.segments dsa sa --strand=plus --output=out.maf sadads 2> logging.err" - # sys.argv = [sys.argv[0]] + input_params.split(' ') - chunk_size = int(sys.argv[1]) # first parameter contains chunk size - params = sys.argv[2:] - - # Parsing command output from SegAlign - segment_key = "--segments=" - segment_index = None - input_file: typing.Optional[str] = None - - for index, value in enumerate(params): - if value[: len(segment_key)] == segment_key: - segment_index = index - input_file = value[len(segment_key):] - break - if segment_index is None: - print(f"Error: could not segment key {segment_key} in parameters {params}") - exit(0) - - if input_file is None or not os.path.isfile(input_file): - print(f"Error: File {input_file} does not exist") - exit(0) - - if ( - chunk_size == 0 or sum(1 for _ in open(input_file)) <= chunk_size - ): # no need to sort if number of lines <= chunk_size - print(" ".join(params), flush=True) - exit(0) - - # Find rest of relevant parameters - output_key = "--output=" - output_index: typing.Optional[int] = None - output_alignment_file = None - output_alignment_file_base: typing.Optional[str] = None - output_format = None - - strand_key = "--strand=" - strand_index = None - for index, value in enumerate(params): - if value[: len(output_key)] == output_key: - output_index = index - output_alignment_file = value[len(output_key):] - output_alignment_file_base, output_format = output_alignment_file.rsplit( - ".", 1 - ) - if value[: len(strand_key)] == strand_key: - strand_index = index - if segment_index is None: - print(f"Error: could not output key {output_key} in parameters {params}") - exit(0) - if strand_index is None: - print(f"Error: could not output key {strand_key} in parameters {params}") - exit(0) - - err_index = -1 # error file is at very end - err_name_base = params[-1].split(".err", 1)[0] - - data: typing.Dict[ - typing.Tuple[str, str], typing.List[typing.Tuple[int, int, str]] - ] = {} # dict of list of tuple (x, y, str) - - direction = None - if "plus" in params[strand_index]: - direction = "f" - elif "minus" in params[strand_index]: - direction = "r" - else: - print( - f"Error: could not figure out direction from strand value {params[strand_index]}" - ) - exit(0) - - for line in open(input_file, "r"): - if line == "": - continue - ( - seq1_name, - seq1_start, - seq1_end, - seq2_name, - seq2_start, - seq2_end, - _dir, - score, - ) = line.split() - # data.append((int(seq1_start), int(seq2_start), line)) - half_dist = int((int(seq1_end) - int(seq1_start)) // 2) - assert int(seq1_end) > int(seq1_start) - assert int(seq2_end) > int(seq2_start) - seq1_mid = int(seq1_start) + half_dist - seq2_mid = int(seq2_start) + half_dist - data.setdefault((seq1_name, seq2_name), []).append((seq1_mid, seq2_mid, line)) - - # If there are chromosome pairs with segment count <= chunk_size - # then no need to sort and split these pairs into separate files. - # It is better to keep these pairs in a single segment file. - skip_pairs = [] # pairs that have count <= chunk_size. - # these will not be sorted - if len(data.keys()) > 1: - for pair in data.keys(): - if len(data[pair]) <= chunk_size: - skip_pairs.append(pair) - - # sorting for forward segments - if direction == "r": - for pair in data.keys(): - if pair not in skip_pairs: - data[pair] = sorted( - data[pair], key=lambda coord: (coord[1] - coord[0], coord[0]) - ) - # sorting for reverse segments - elif direction == "f": - for pair in data.keys(): - if pair not in skip_pairs: - data[pair] = sorted( - data[pair], key=lambda coord: (coord[1] + coord[0], coord[0]) - ) - else: - print(f"INVALID DIRECTION VALUE: {direction}") - exit(0) - - # Writing file in chunks - ctr = 0 - for pair in data.keys() - skip_pairs: - for chunk in chunks(list(zip(*data[pair]))[2], chunk_size): - ctr += 1 - name_addition = f".split{ctr}" - fname = input_file.split(".segments", 1)[0] + name_addition + ".segments" - - with open(fname, "w") as f: - f.writelines(chunk) - # update segment file in command - params[segment_index] = segment_key + fname - # update output file in command - if output_index is not None: - params[output_index] = ( - f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" - ) - # update error file in command - params[-1] = err_name_base + name_addition + ".err" - print(" ".join(params), flush=True) - - # writing unsorted skipped pairs - if len(skip_pairs) > 0: - skip_pairs_with_len = sorted( - [(len(data[p]), p) for p in skip_pairs] - ) # list of tuples of (pair length, pair) - aggregated_skip_pairs: typing.List[typing.List[typing.Any]] = ( - [] - ) # list of list of pair names - current_count = 0 - aggregated_skip_pairs.append([]) - for count, pair in skip_pairs_with_len: - if current_count + count <= chunk_size: - current_count += count - aggregated_skip_pairs[-1].append(pair) - else: - aggregated_skip_pairs.append([]) - current_count = count - aggregated_skip_pairs[-1].append(pair) - - for aggregate in aggregated_skip_pairs: - ctr += 1 - name_addition = f".split{ctr}" - fname = input_file.split(".segments", 1)[0] + name_addition + ".segments" - with open(fname, "w") as f: - for pair in sorted(aggregate, key=lambda p: (p[1], p[0])): - chunk = list(zip(*data[pair]))[2] - f.writelines(chunk) - # update segment file in command - params[segment_index] = segment_key + fname - # update output file in command - if output_index is not None: - params[output_index] = ( - f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" - ) - # update error file in command - params[-1] = err_name_base + name_addition + ".err" - print(" ".join(params), flush=True) - - if DELETE_AFTER_CHUNKING: - os.remove(input_file) diff -r 39c21be8f8b9 -r 150de8a3954a macros.xml --- a/macros.xml Tue May 07 15:28:36 2024 +0000 +++ b/macros.xml Tue Jul 09 17:37:53 2024 +0000 @@ -2,11 +2,9 @@ segalign-full - bashlex - python - 0.1.2.2 + 0.1.2.7 0 21.05 diff -r 39c21be8f8b9 -r 150de8a3954a runner.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/runner.py Tue Jul 09 17:37:53 2024 +0000 @@ -0,0 +1,482 @@ +#!/usr/bin/env python + +import argparse +import collections +import concurrent.futures +import multiprocessing +import os +import queue +import re +import resource +import statistics +import subprocess +import sys +import time +import typing + +SENTINEL_VALUE: typing.Final = "SENTINEL" +# CA_SENTEL_VALUE: typing.Final = ChunkAddress(0, 0, 0, 0, 0, SENTINEL_VALUE) +RUSAGE_ATTRS: typing.Final = ["ru_utime", "ru_stime", "ru_maxrss", "ru_minflt", "ru_majflt", "ru_inblock", "ru_oublock", "ru_nvcsw", "ru_nivcsw"] + + +class LastzCommands: + def __init__(self) -> None: + self.commands: dict[str, LastzCommand] = {} + self.segalign_segments: SegAlignSegments = SegAlignSegments() + + def add(self, line: str) -> None: + if line not in self.commands: + self.commands[line] = LastzCommand(line) + + command = self.commands[line] + self.segalign_segments.add(command.segments_filename) + + def segments(self) -> typing.Iterator["SegAlignSegment"]: + for segment in self.segalign_segments: + yield segment + + +class LastzCommand: + lastz_command_regex = re.compile(r"lastz (.+?)?ref\.2bit\[nameparse=darkspace\]\[multiple\]\[subset=ref_block(\d+)\.name\] (.+?)?query\.2bit\[nameparse=darkspace\]\[subset=query_block(\d+)\.name] --format=(\S+) --ydrop=(\d+) --gappedthresh=(\d+) --strand=(minus|plus)(?: --ambiguous=(\S+))?(?: --(notrivial))?(?: --scoring=(\S+))? --segments=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments --output=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.(\S+) 2> tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.err") + + def __init__(self, line: str) -> None: + self.line = line + self.args: list[str] = [] + self.target_filename: str = '' + self.query_filename: str = '' + self.data_folder: str = '' + self.ref_block: int = 0 + self.query_block: int = 0 + self.output_format: str = '' + self.ydrop: int = 0 + self.gappedthresh: int = 0 + self.strand: int | None = None + self.ambiguous: bool = False + self.nontrivial: bool = False + self.scoring: str | None = None + self.segments_filename: str = '' + self.output_filename: str = '' + self.error_filename: str = '' + + self._parse_command() + + def _parse_command(self) -> None: + match = self.lastz_command_regex.match(self.line) + if not match: + sys.exit(f"Unkown lastz command format: {self.line}") + + self.data_folder = match.group(1) + self.ref_block = int(match.group(2)) + self.query_block = int(match.group(4)) + self.output_format = match.group(5) + self.ydrop = int(match.group(6)) + self.gappedthresh = int(match.group(7)) + strand = match.group(8) + + if strand == 'plus': + self.strand = 0 + elif strand == 'minus': + self.strand = 1 + + self.target_filename = f"{self.data_folder}ref.2bit[nameparse=darkspace][multiple][subset=ref_block{self.ref_block}.name]" + self.query_filename = f"{self.data_folder}query.2bit[nameparse=darkspace][subset=query_block{self.query_block}.name]" + + self.args = [ + "lastz", + self.target_filename, + self.query_filename, + f"--format={self.output_format}", + f"--ydrop={self.ydrop}", + f"--gappedthresh={self.gappedthresh}", + f"--strand={strand}" + ] + + ambiguous = match.group(9) + if ambiguous is not None: + self.ambiguous = True + self.args.append(f"--ambiguous={ambiguous}") + + nontrivial = match.group(10) + if nontrivial is not None: + self.nontrivial = True + self.args.append("--nontrivial") + + scoring = match.group(11) + if scoring is not None: + self.scoring = scoring + self.args.append(f"--scoring={scoring}") + + tmp_no = int(match.group(12)) + block_no = int(match.group(13)) + r_no = int(match.group(14)) + split = match.group(16) + + base_filename = f"tmp{tmp_no}.block{block_no}.r{r_no}.{strand}" + + if split is not None: + base_filename = f"{base_filename}.split{split}" + + self.segments_filename = f"{base_filename}.segments" + self.args.append(f"--segments={self.segments_filename}") + + self.output_filename = f"{base_filename}.{self.output_format}" + self.args.append(f"--output={self.output_filename}") + + self.error_filename = f"{base_filename}.err" + + +class SegAlignSegments: + def __init__(self) -> None: + self.segments: dict[str, SegAlignSegment] = {} + + def add(self, filename: str) -> None: + if filename not in self.segments: + self.segments[filename] = SegAlignSegment(filename) + + def __iter__(self) -> "SegAlignSegments": + return self + + def __next__(self) -> typing.Generator["SegAlignSegment", None, None]: + for segment in sorted(self.segments.values()): + yield segment + + +class SegAlignSegment: + def __init__(self, filename: str): + self.filename = filename + self.tmp: int = 0 + self.block: int = 0 + self.r: int = 0 + self.strand: int = 0 + self.split: int | None = None + self.fmt: str = "" + self._parse_filename() + + def _parse_filename(self) -> None: + match = re.match(r"tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments$", self.filename) + if not match: + sys.exit(f"Unkown segment filename format: {self.filename}") + + self.tmp = int(match.group(1)) + self.block = int(match.group(2)) + self.r = int(match.group(3)) + + strand = match.group(4) + if strand == 'plus': + self.strand = 0 + if strand == 'minus': + self.strand = 1 + + split = match.group(5) + if split is None: + self.split = None + else: + self.split = int(split) + + def __lt__(self, other: "SegAlignSegment") -> bool: + for attr in ['strand', 'tmp', 'block', 'r', 'split']: + self_value = getattr(self, attr) + other_value = getattr(other, attr) + if self_value < other_value: + return True + elif self_value > other_value: + return False + + return False + + +def main() -> None: + args, segalign_args = parse_args() + lastz_commands = LastzCommands() + + if args.diagonal_partition: + num_diagonal_partitioners = args.num_cpu + else: + num_diagonal_partitioners = 0 + + with multiprocessing.Manager() as manager: + segalign_q: queue.Queue[str] = manager.Queue() + skip_segalign = run_segalign(args, num_diagonal_partitioners, segalign_args, segalign_q, lastz_commands) + + if num_diagonal_partitioners > 0: + diagonal_partition_q = segalign_q + segalign_q = manager.Queue() + run_diagonal_partitioners(args, num_diagonal_partitioners, diagonal_partition_q, segalign_q) + + segalign_q.put(SENTINEL_VALUE) + output_q = segalign_q + segalign_q = manager.Queue() + + output_filename = "lastz-commands.txt" + if args.output_type == "commands": + output_filename = args.output_file + + with open(output_filename, "w") as f: + while True: + line = output_q.get() + if line == SENTINEL_VALUE: + output_q.task_done() + break + + # messy, fix this + if skip_segalign: + lastz_commands.add(line) + + if args.output_type != "commands": + segalign_q.put(line) + + print(line, file=f) + + if args.output_type == "output": + run_lastz(args, segalign_q, lastz_commands) + + with open(args.output_file, 'w') as of: + print("##maf version=1", file=of) + for lastz_command in lastz_commands.commands.values(): + with open(lastz_command.output_filename) as f: + for line in f: + of.write(line) + + if args.output_type == "tarball": + pass + + +def run_lastz(args: argparse.Namespace, input_q: queue.Queue[str], lastz_commands: LastzCommands) -> None: + num_lastz_workers = args.num_cpu + + for _ in range(num_lastz_workers): + input_q.put(SENTINEL_VALUE) + + if args.debug: + r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) + beg: int = time.monotonic_ns() + + try: + with concurrent.futures.ProcessPoolExecutor(max_workers=num_lastz_workers) as executor: + for i in range(num_lastz_workers): + executor.submit(lastz_worker, input_q, i, lastz_commands) + except Exception as e: + sys.exit(f"Error: lastz failed: {e}") + + if args.debug: + ns: int = time.monotonic_ns() - beg + r_end = resource.getrusage(resource.RUSAGE_CHILDREN) + print(f"lastz clock time: {ns} ns", file=sys.stderr, flush=True) + for rusage_attr in RUSAGE_ATTRS: + value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) + print(f" lastz {rusage_attr}: {value}", file=sys.stderr, flush=True) + + +def lastz_worker(input_q: queue.Queue[str], instance: int, lastz_commands: LastzCommands) -> None: + while True: + line = input_q.get() + if line == SENTINEL_VALUE: + input_q.task_done() + break + + if line not in lastz_commands.commands: + sys.exit(f"Error: lastz worker {instance} unexpected command format: {line}") + + command = lastz_commands.commands[line] + + if not os.path.exists(command.output_filename): + process = subprocess.run(command.args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + for line in process.stdout.splitlines(): + print(line, file=sys.stdout, flush=True) + + if len(process.stderr) != 0: + for line in process.stderr.splitlines(): + print(line, file=sys.stderr, flush=True) + + if process.returncode != 0: + sys.exit(f"Error: lastz {instance} exited with returncode {process.returncode}") + + +def run_diagonal_partitioners(args: argparse.Namespace, num_workers: int, input_q: queue.Queue[str], output_q: queue.Queue[str]) -> None: + chunk_size = estimate_chunk_size(args) + + if args.debug: + print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True) + + with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: + for i in range(num_workers): + executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i)) + + +def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: + while True: + line = input_q.get() + if line == SENTINEL_VALUE: + input_q.task_done() + break + + run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)] + for word in line.split(): + run_args.append(word) + process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + for line in process.stdout.splitlines(): + output_q.put(line) + + for line in process.stderr.splitlines(): + print(line, file=sys.stderr, flush=True) + + if process.returncode != 0: + sys.exit(f"Error: diagonal partitioner {instance} exited with returncode {process.returncode}") + + +def estimate_chunk_size(args: argparse.Namespace) -> int: + chunk_size = -1 + line_size = -1 + + if args.debug: + r_beg = resource.getrusage(resource.RUSAGE_SELF) + beg: int = time.monotonic_ns() + + # get size of each segment assuming DELETE_AFTER_CHUNKING == True + # takes into account already split segments + fdict: typing.DefaultDict[str, int] = collections.defaultdict(int) + for entry in os.scandir("."): + if entry.name.endswith(".segments"): + try: + file_size = entry.stat().st_size + except FileNotFoundError: + continue + + if line_size == -1: + try: + with open(entry.name) as f: + line_size = len(f.readline()) # add 1 for newline + except FileNotFoundError: + continue + + fdict[entry.name.split(".split", 1)[0]] += file_size + + # if noot enough segment files for estimation, continue + if len(fdict) > 2: + chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size) + + if args.debug: + ns: int = time.monotonic_ns() - beg + r_end = resource.getrusage(resource.RUSAGE_SELF) + print(f"estimate chunk size clock time: {ns} ns", file=sys.stderr, flush=True) + for rusage_attr in RUSAGE_ATTRS: + value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) + print(f" estimate chunk size {rusage_attr}: {value}", file=sys.stderr, flush=True) + + return chunk_size + + +def run_segalign(args: argparse.Namespace, num_sentinel: int, segalign_args: list[str], segalign_q: queue.Queue[str], commands: LastzCommands) -> bool: + skip_segalign: bool = False + + # use the currently existing output file if it exists + if args.debug: + skip_segalign = load_segalign_output("lastz-commands.txt", segalign_q) + + if not skip_segalign: + run_args = ["segalign"] + run_args.extend(segalign_args) + run_args.append("--num_threads") + run_args.append(str(args.num_cpu)) + + if args.debug: + beg: int = time.monotonic_ns() + r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) + + process = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + + for line in process.stdout.splitlines(): + commands.add(line) + segalign_q.put(line) + + if len(process.stderr) != 0: + for line in process.stderr.splitlines(): + print(line, file=sys.stderr, flush=True) + + if process.returncode != 0: + sys.exit(f"Error: segalign exited with returncode {process.returncode}") + + if args.debug: + ns: int = time.monotonic_ns() - beg + r_end = resource.getrusage(resource.RUSAGE_CHILDREN) + print(f"segalign clock time: {ns} ns", file=sys.stderr, flush=True) + for rusage_attr in RUSAGE_ATTRS: + value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) + print(f" segalign {rusage_attr}: {value}", flush=True) + + for _ in range(num_sentinel): + segalign_q.put(SENTINEL_VALUE) + + return skip_segalign + + +def load_segalign_output(filename: str, segalign_q: queue.Queue[str]) -> bool: + load_success = False + + r_beg = resource.getrusage(resource.RUSAGE_SELF) + beg: int = time.monotonic_ns() + + try: + with open(filename) as f: + for line in f: + line = line.rstrip("\n") + segalign_q.put(line) + load_success = True + except FileNotFoundError: + pass + + if load_success: + ns: int = time.monotonic_ns() - beg + r_end = resource.getrusage(resource.RUSAGE_SELF) + print(f"load output clock time: {ns} ns", file=sys.stderr, flush=True) + for rusage_attr in RUSAGE_ATTRS: + value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) + print(f" load output {rusage_attr}: {value}", flush=True) + + return load_success + + +def parse_args() -> tuple[argparse.Namespace, list[str]]: + parser = argparse.ArgumentParser(allow_abbrev=False) + + parser.add_argument("--output-type", nargs="?", const="commands", default="commands", type=str, choices=["commands", "output", "tarball"], help="output type (default: %(default)s)") + parser.add_argument("--output-file", type=str, required=True, help="output pathname") + parser.add_argument("--diagonal-partition", action="store_true", help="run diagonal partition optimization") + parser.add_argument("--nogapped", action="store_true", help="don't perform gapped extension stage") + parser.add_argument("--markend", action="store_true", help="write a marker line just before completion") + parser.add_argument("--num-gpu", default=-1, type=int, help="number of GPUs to use (default: %(default)s [use all GPUs])") + parser.add_argument("--num-cpu", default=-1, type=int, help="number of CPUs to use (default: %(default)s [use all CPUs])") + parser.add_argument("--debug", action="store_true", help="print debug messages") + parser.add_argument("--tool_directory", type=str, required=True, help="tool directory") + + if not sys.argv[1:]: + parser.print_help() + sys.exit(0) + + args, segalign_args = parser.parse_known_args(sys.argv[1:]) + + cpus_available = len(os.sched_getaffinity(0)) + if args.num_cpu == -1: + args.num_cpu = cpus_available + elif args.num_cpu > cpus_available: + sys.exit(f"Error: additional {args.num_cpu - cpus_available} CPUs") + + if args.nogapped: + segalign_args.append("--nogapped") + + if args.markend: + segalign_args.append("--markend") + + if args.num_gpu != -1: + segalign_args.extend(["--num_gpu", f"{args.num_gpu}"]) + + if args.debug: + segalign_args.append("--debug") + + return args, segalign_args + + +if __name__ == "__main__": + main() diff -r 39c21be8f8b9 -r 150de8a3954a segalign.xml --- a/segalign.xml Tue May 07 15:28:36 2024 +0000 +++ b/segalign.xml Tue Jul 09 17:37:53 2024 +0000 @@ -14,10 +14,9 @@ - - +
-