# HG changeset patch # User richard-burhans # Date 1720631205 0 # Node ID 08e987868f0fe07374d08f7d480042611219f2b0 # Parent 150de8a3954a1e7baf510d94aa3feee6c7b60a1b planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 062a761a340e095ea7ef7ed7cd1d3d55b1fdc5c4 diff -r 150de8a3954a -r 08e987868f0f diagonal_partition.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/diagonal_partition.py Wed Jul 10 17:06:45 2024 +0000 @@ -0,0 +1,236 @@ +#!/usr/bin/env python + +""" +Diagonal partitioning for segment files output by SegAlign. + +Usage: +diagonal_partition.py + +set = 0 to skip partitioning, -1 to infer best parameter +""" + + +import os +import sys +import typing + + +def chunks(lst: tuple[str, ...], n: int) -> typing.Iterator[tuple[str, ...]]: + """Yield successive n-sized chunks from list.""" + for i in range(0, len(lst), n): + yield lst[i:i + n] + + +if __name__ == "__main__": + + DELETE_AFTER_CHUNKING = True + MIN_CHUNK_SIZE = 5000 # don't partition segment files with line count below this value + + # input_params = "10000 sad sadsa sad --segments=tmp21.block0.r0.minus.segments dsa sa --strand=plus --output=out.maf sadads 2> logging.err" + # sys.argv = [sys.argv[0]] + input_params.split(' ') + chunk_size = int(sys.argv[1]) # first parameter contains chunk size + params = sys.argv[2:] + + # don't do anything if 0 chunk size + if chunk_size == 0: + print(" ".join(params), flush=True) + exit(0) + + # Parsing command output from SegAlign + segment_key = "--segments=" + segment_index = None + input_file = None + + for index, value in enumerate(params): + if value[:len(segment_key)] == segment_key: + segment_index = index + input_file = value[len(segment_key):] + break + + if segment_index is None: + sys.exit(f"Error: could not get segment key {segment_key} from parameters {params}") + + if input_file is None: + sys.exit(f"Error: could not get segment file from parameters {params}") + + if not os.path.isfile(input_file): + sys.exit(f"Error: File {input_file} does not exist") + + line_size = None # each char in 1 byte + file_size = os.path.getsize(input_file) + with open(input_file, "r") as f: + line_size = len(f.readline()) # add 1 for newline + + estimated_lines = file_size // line_size + + # check if chunk size should be estimated + if chunk_size < 0: + # optimization, do not need to get each file size in this case + if estimated_lines < MIN_CHUNK_SIZE: + print(" ".join(params), flush=True) + exit(0) + + from collections import defaultdict + from statistics import quantiles + # get size of each segment assuming DELETE_AFTER_CHUNKING == True + # takes into account already split segments + files = [i for i in os.listdir(".") if i.endswith(".segments")] + # find . -maxdepth 1 -name "*.segments" -print0 | du -ba --files0-from=- + # if not enough segment files for estimation, continue + if len(files) <= 2: + print(" ".join(params), flush=True) + exit(0) + + fdict: typing.DefaultDict[str, int] = defaultdict(int) + for filename in files: + size = os.path.getsize(filename) + f_ = filename.split(".split", 1)[0] + fdict[f_] += size + chunk_size = int(quantiles(fdict.values())[-1] // line_size) + + if file_size // line_size <= chunk_size: # no need to sort if number of lines <= chunk_size + print(" ".join(params), flush=True) + exit(0) + + # Find rest of relevant parameters + output_key = "--output=" + output_index = None + output_alignment_file = None + output_alignment_file_base = None + output_format = None + + strand_key = "--strand=" + strand_index = None + for index, value in enumerate(params): + if value[:len(output_key)] == output_key: + output_index = index + output_alignment_file = value[len(output_key):] + output_alignment_file_base, output_format = output_alignment_file.rsplit(".", 1) + if value[:len(strand_key)] == strand_key: + strand_index = index + + if output_index is None: + sys.exit(f"Error: could not get output key {output_key} from parameters {params}") + + if output_alignment_file_base is None: + sys.exit(f"Error: could not get output alignment file base from parameters {params}") + + if output_format is None: + sys.exit(f"Error: could not get output format from parameters {params}") + + if strand_index is None: + sys.exit(f"Error: could not get strand key {strand_key} from parameters {params}") + + err_index = -1 # error file is at very end + err_name_base = params[-1].split(".err", 1)[0] + + data: dict[tuple[str, str], list[tuple[int, int, str]]] = {} # dict of list of tuple (x, y, str) + + direction = None + if "plus" in params[strand_index]: + direction = "f" + elif "minus" in params[strand_index]: + direction = "r" + else: + sys.exit(f"Error: could not figure out direction from strand value {params[strand_index]}") + + for line in open(input_file, "r"): + if line == "": + continue + seq1_name, seq1_start, seq1_end, seq2_name, seq2_start, seq2_end, _dir, score = line.split() + # data.append((int(seq1_start), int(seq2_start), line)) + half_dist = int((int(seq1_end) - int(seq1_start)) // 2) + assert int(seq1_end) > int(seq1_start) + assert int(seq2_end) > int(seq2_start) + seq1_mid = int(seq1_start) + half_dist + seq2_mid = int(seq2_start) + half_dist + data.setdefault((seq1_name, seq2_name), []).append((seq1_mid, seq2_mid, line)) + + # If there are chromosome pairs with segment count <= chunk_size + # then no need to sort and split these pairs into separate files. + # It is better to keep these pairs in a single segment file. + skip_pairs = [] # pairs that have count <= chunk_size. these will not be sorted + + # save query key order + # for lastz segment files: 'Query sequence names must appear in the same + # order as they do in the query file' + query_key_order = list(dict.fromkeys([i[1] for i in data.keys()])) + + # NOTE: assuming data.keys() preserves order of keys. Requires Python 3.7+ + + if len(data.keys()) > 1: + for pair in data.keys(): + if len(data[pair]) <= chunk_size: + skip_pairs.append(pair) + + # sorting for forward segments + if direction == "r": + for pair in data.keys(): + if pair not in skip_pairs: + data[pair] = sorted(data[pair], key=lambda coord: (coord[1] - coord[0], coord[0])) + # sorting for reverse segments + elif direction == "f": + for pair in data.keys(): + if pair not in skip_pairs: + data[pair] = sorted(data[pair], key=lambda coord: (coord[1] + coord[0], coord[0])) + else: + sys.exit(f"INVALID DIRECTION VALUE: {direction}") + + # Writing file in chunks + ctr = 0 + for pair in data.keys() - skip_pairs: # [i for i in data_keys if i not in set(skip_pairs)]: + for chunk in chunks(list(zip(*data[pair]))[2], chunk_size): + ctr += 1 + name_addition = f".split{ctr}" + fname = input_file.split(".segments", 1)[0] + name_addition + ".segments" + + assert len(chunk) != 0 + with open(fname, "w") as f: + f.writelines(chunk) + # update segment file in command + params[segment_index] = segment_key + fname + # update output file in command + params[output_index] = output_key + output_alignment_file_base + name_addition + "." + output_format + # update error file in command + params[-1] = err_name_base + name_addition + ".err" + print(" ".join(params), flush=True) + + # writing unsorted skipped pairs + if len(skip_pairs) > 0: + skip_pairs_with_len = sorted([(len(data[p]), p) for p in skip_pairs]) # list of tuples of (pair length, pair) + # NOTE: This can violate lastz query key order requirement + + query_key_order_table = {item: idx for idx, item in enumerate(query_key_order)} # used for sorting + + aggregated_skip_pairs: list[list[tuple[str, str]]] = [] # list of list of pair names + current_count = 0 + aggregated_skip_pairs.append([]) + for count, pair in skip_pairs_with_len: + if current_count + count <= chunk_size: + current_count += count + aggregated_skip_pairs[-1].append(pair) + else: + aggregated_skip_pairs.append([]) + current_count = count + aggregated_skip_pairs[-1].append(pair) + + for aggregate in aggregated_skip_pairs: + ctr += 1 + name_addition = f".split{ctr}" + fname = input_file.split(".segments", 1)[0] + name_addition + ".segments" + + with open(fname, "w") as f: + # fix possible lastz query key order violations + for pair in sorted(aggregate, key=lambda p: query_key_order_table[p[1]]): # p[1] is query key + chunk = list(zip(*data[pair]))[2] + f.writelines(chunk) + # update segment file in command + params[segment_index] = segment_key + fname + # update output file in command + params[output_index] = output_key + output_alignment_file_base + name_addition + "." + output_format + # update error file in command + params[-1] = err_name_base + name_addition + ".err" + print(" ".join(params), flush=True) + + if DELETE_AFTER_CHUNKING: + os.remove(input_file) diff -r 150de8a3954a -r 08e987868f0f macros.xml --- a/macros.xml Tue Jul 09 17:37:53 2024 +0000 +++ b/macros.xml Wed Jul 10 17:06:45 2024 +0000 @@ -1,7 +1,7 @@ - segalign-full + segalign-galaxy 0.1.2.7 diff -r 150de8a3954a -r 08e987868f0f package_output.py --- a/package_output.py Tue Jul 09 17:37:53 2024 +0000 +++ b/package_output.py Wed Jul 10 17:06:45 2024 +0000 @@ -277,7 +277,7 @@ config.read(lastz_command_config_file) package_file = PackageFile() - lastz_command_file = "lastz_commands.txt" + lastz_command_file = "lastz-commands.txt" bashCommandLineFile(lastz_command_file, config, package_file) package_file.close() diff -r 150de8a3954a -r 08e987868f0f runner.py --- a/runner.py Tue Jul 09 17:37:53 2024 +0000 +++ b/runner.py Wed Jul 10 17:06:45 2024 +0000 @@ -301,17 +301,17 @@ with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: for i in range(num_workers): - executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i)) + executor.submit(diagonal_partition_worker(args, input_q, output_q, chunk_size, i)) -def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: +def diagonal_partition_worker(args: argparse.Namespace, input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: while True: line = input_q.get() if line == SENTINEL_VALUE: input_q.task_done() break - run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)] + run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)] for word in line.split(): run_args.append(word) process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) @@ -380,6 +380,7 @@ run_args.extend(segalign_args) run_args.append("--num_threads") run_args.append(str(args.num_cpu)) + run_args.append("work/") if args.debug: beg: int = time.monotonic_ns() diff -r 150de8a3954a -r 08e987868f0f segalign.xml --- a/segalign.xml Tue Jul 09 17:37:53 2024 +0000 +++ b/segalign.xml Wed Jul 10 17:06:45 2024 +0000 @@ -33,21 +33,26 @@ ## Mode ---------------------------------------------------------------- #if str($mode.mode_selector) == "segalign" + #if str($mode.gapped_extension_options.nogapped) == "false" + mkdir -p "\$(pwd)/work" && + faToTwoBit <(gzip -cdfq '$mode.target') "\$(pwd)/work/ref.2bit" && + faToTwoBit <(gzip -cdfq '$mode.query') "\$(pwd)/work/query.2bit" && + #end if #if str($mode.diagonal_partition_options.diagonal_partition) == "true" #set $segalign_mode = "segalign_diagonal_partition" - ## explicitly calling bash to bypass a pulsar bug + ## explicitly calling python to bypass a pulsar bug ## https://github.com/galaxyproject/pulsar/issues/341 - bash '$__tool_directory__/runner.py' + python '$__tool_directory__/runner.py' --output-type tarball - --output-filename fake + --output-file '$segalign_diagonal_partition_output' --diagonal-partition --num-cpu \${GALAXY_SLOTS:-2} --tool_directory '$__tool_directory__' #else #set $segalign_mode = "segalign" - bash '$__tool_directory__/runner.py' + python '$__tool_directory__/runner.py' --output-type output - --output-filename foo.maf + --output-file '$segalign_output' --num-cpu \${GALAXY_SLOTS:-2} --tool_directory '$__tool_directory__' #end if @@ -57,6 +62,7 @@ #set $segalign_mode = "segalign_repeat_masker" run_segalign_repeat_masker '$mode.seq_file' + --num_cpu \${GALAXY_SLOTS:-2} #end if ## Sequence Options ---------------------------------------------------- @@ -144,9 +150,6 @@ ## && ## Rscript $r_plot > /dev/null 2>&1 ## .end if - #if $segalign_mode == "segalign" - --output '$segalign_output' - #end if #else if $segalign_mode == "segalign_repeat_masker" --M '$mode.output_options.M' --output '$segalign_repeat_masker_output' diff -r 150de8a3954a -r 08e987868f0f test-data/hg38.chr20.chunk.fa.gz Binary file test-data/hg38.chr20.chunk.fa.gz has changed diff -r 150de8a3954a -r 08e987868f0f test-data/mm39.chr2.chunk.fa.gz Binary file test-data/mm39.chr2.chunk.fa.gz has changed diff -r 150de8a3954a -r 08e987868f0f test-data/segalign-output.maf.gz Binary file test-data/segalign-output.maf.gz has changed