changeset 21:25fa179d9d0a draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit e4e05d23d9da18ea87bc352122ca9e6cfa73d1c7
author        richard-burhans
date          Fri, 09 Aug 2024 20:23:12 +0000
parents       96ff17622b17
children      860e1f6f3de4
files         diagonal_partition.py runner.py system_options.xml
diffstat      3 files changed, 95 insertions(+), 42 deletions(-)
--- a/diagonal_partition.py	Wed Aug 07 20:34:45 2024 +0000
+++ b/diagonal_partition.py	Fri Aug 09 20:23:12 2024 +0000
@@ -1,16 +1,18 @@
 #!/usr/bin/env python
+
 """
 Diagonal partitioning for segment files output by SegAlign.
 
 Usage:
 diagonal_partition.py <max-segments> <lastz-command>
 
-set <max-segments> = 0 to skip partitioning, -1 to infer best parameter
+set <max-segments> = 0 to skip partitioning, -1 to estimate best parameter
 """
-
+import collections
 import os
+import statistics
 import sys
 import typing
@@ -22,19 +24,28 @@
 
 if __name__ == "__main__":
+    # TODO: make these optional user defined parameters
+
+    # deletes original segment file after splitting
     DELETE_AFTER_CHUNKING = True
-    MIN_CHUNK_SIZE = 5000  # don't partition segment files with line count below this value
+
+    # don't partition segment files with line count below this value
+    MIN_CHUNK_SIZE = 5000
 
-    # input_params = "10000 sad sadsa sad --segments=tmp21.block0.r0.minus.segments dsa sa --strand=plus --output=out.maf sadads 2> logging.err"
-    # sys.argv = [sys.argv[0]] + input_params.split(' ')
-    chunk_size = int(sys.argv[1])  # first parameter contains chunk size
+    # only used when segment size is being estimated
+    MAX_CHUNK_SIZE = 50000
+
+    # include chosen split size in file name
+    DEBUG = False
+
+    # first parameter contains chunk size
+    chunk_size = int(sys.argv[1])
     params = sys.argv[2:]
 
     # don't do anything if 0 chunk size
     if chunk_size == 0:
         print(" ".join(params), flush=True)
-        exit(0)
+        sys.exit(0)
 
     # Parsing command output from SegAlign
     segment_key = "--segments="
@@ -56,10 +67,12 @@
     if not os.path.isfile(input_file):
         sys.exit(f"Error: File {input_file} does not exist")
 
-    line_size = None  # each char in 1 byte
+    # each char is 1 byte
+    line_size = None
     file_size = os.path.getsize(input_file)
     with open(input_file, "r") as f:
-        line_size = len(f.readline())  # add 1 for newline
+        # add 1 for newline
+        line_size = len(f.readline())
 
     estimated_lines = file_size // line_size
@@ -68,29 +81,38 @@
     # optimization, do not need to get each file size in this case
     if estimated_lines < MIN_CHUNK_SIZE:
         print(" ".join(params), flush=True)
-        exit(0)
+        sys.exit(0)
 
-    from collections import defaultdict
-    from statistics import quantiles
     # get size of each segment assuming DELETE_AFTER_CHUNKING == True
     # takes into account already split segments
     files = [i for i in os.listdir(".") if i.endswith(".segments")]
     # find . -maxdepth 1 -name "*.segments" -print0 | du -ba --files0-from=-
-    # if not enough segment files for estimation, continue
-    if len(files) <= 2:
-        print(" ".join(params), flush=True)
-        exit(0)
+
+    if len(files) < 2:
+        # if not enough segment files for estimation, use MAX_CHUNK_SIZE
+        chunk_size = MAX_CHUNK_SIZE
+    else:
+        fdict: typing.DefaultDict[str, int] = collections.defaultdict(int)
+        for filename in files:
+            size = os.path.getsize(filename)
+            f_ = filename.split(".split", 1)[0]
+            fdict[f_] += size
 
-    fdict: typing.DefaultDict[str, int] = defaultdict(int)
-    for filename in files:
-        size = os.path.getsize(filename)
-        f_ = filename.split(".split", 1)[0]
-        fdict[f_] += size
-    chunk_size = int(quantiles(fdict.values())[-1] // line_size)
+        if len(fdict) < 7:
+            # outliers can heavily skew prediction if <7 data points
+            # to be safe, use 50% quantile
+            chunk_size = int(statistics.quantiles(fdict.values())[1] // line_size)
+        else:
+            # otherwise use 75% quantile
+            chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size)
+        # if not enough data points, there is a chance of getting unlucky
+        # minimize worst case by using MAX_CHUNK_SIZE
 
-    if file_size // line_size <= chunk_size:  # no need to sort if number of lines <= chunk_size
+        chunk_size = min(chunk_size, MAX_CHUNK_SIZE)
+
+    # no need to sort if number of lines <= chunk_size
+    if (estimated_lines <= chunk_size):
         print(" ".join(params), flush=True)
-        exit(0)
+        sys.exit(0)
 
     # Find rest of relevant parameters
     output_key = "--output="
@@ -106,25 +128,28 @@
             output_index = index
             output_alignment_file = value[len(output_key):]
             output_alignment_file_base, output_format = output_alignment_file.rsplit(".", 1)
+
         if value[:len(strand_key)] == strand_key:
             strand_index = index
 
-    if output_index is None:
-        sys.exit(f"Error: could not get output key {output_key} from parameters {params}")
-
     if output_alignment_file_base is None:
         sys.exit(f"Error: could not get output alignment file base from parameters {params}")
 
     if output_format is None:
         sys.exit(f"Error: could not get output format from parameters {params}")
 
+    if output_index is None:
+        sys.exit(f"Error: could not get output key {output_key} from parameters {params}")
+
     if strand_index is None:
         sys.exit(f"Error: could not get strand key {strand_key} from parameters {params}")
 
-    err_index = -1  # error file is at very end
+    # error file is at very end
+    err_index = -1
     err_name_base = params[-1].split(".err", 1)[0]
 
-    data: dict[tuple[str, str], list[tuple[int, int, str]]] = {}  # dict of list of tuple (x, y, str)
+    # dict of list of tuple (x, y, str)
+    data: dict[tuple[str, str], list[tuple[int, int, str]]] = {}
 
     direction = None
     if "plus" in params[strand_index]:
@@ -149,15 +174,18 @@
     # If there are chromosome pairs with segment count <= chunk_size
     # then no need to sort and split these pairs into separate files.
     # It is better to keep these pairs in a single segment file.
-    skip_pairs = []  # pairs that have count <= chunk_size. these will not be sorted
+
+    # pairs that have count <= chunk_size. these will not be sorted
+    skip_pairs = []
 
     # save query key order
     # for lastz segment files: 'Query sequence names must appear in the same
     # order as they do in the query file'
-    query_key_order = list(dict.fromkeys([i[1] for i in data.keys()]))  # NOTE: assuming data.keys() preserves order of keys. Requires Python 3.7+
+    query_key_order = list(dict.fromkeys([i[1] for i in data.keys()]))
+
     if len(data.keys()) > 1:
         for pair in data.keys():
             if len(data[pair]) <= chunk_size:
@@ -168,6 +196,7 @@
         for pair in data.keys():
             if pair not in skip_pairs:
                 data[pair] = sorted(data[pair], key=lambda coord: (coord[1] - coord[0], coord[0]))
+
     # sorting for reverse segments
     elif direction == "f":
         for pair in data.keys():
@@ -178,10 +207,15 @@
 
     # Writing file in chunks
     ctr = 0
-    for pair in data.keys() - skip_pairs:  # [i for i in data_keys if i not in set(skip_pairs)]:
+    # [i for i in data_keys if i not in set(skip_pairs)]:
+    for pair in (data.keys() - skip_pairs):
         for chunk in chunks(list(zip(*data[pair]))[2], chunk_size):
             ctr += 1
             name_addition = f".split{ctr}"
+
+            if DEBUG:
+                name_addition = f".{chunk_size}{name_addition}"
+
             fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
 
             assert len(chunk) != 0
@@ -197,12 +231,15 @@
 
     # writing unsorted skipped pairs
     if len(skip_pairs) > 0:
-        skip_pairs_with_len = sorted([(len(data[p]), p) for p in skip_pairs])  # list of tuples of (pair length, pair)
-        # NOTE: This can violate lastz query key order requirement
+        # list of tuples of (pair length, pair)
+        skip_pairs_with_len = sorted([(len(data[p]), p) for p in skip_pairs])
+        # NOTE: This sorting can violate lastz query key order requirement, this is fixed later
 
-        query_key_order_table = {item: idx for idx, item in enumerate(query_key_order)}  # used for sorting
+        # used for sorting
+        query_key_order_table = {item: idx for idx, item in enumerate(query_key_order)}
 
-        aggregated_skip_pairs: list[list[tuple[str, str]]] = []  # list of list of pair names
+        # list of list of pair names
+        aggregated_skip_pairs: list[list[tuple[str, str]]] = []
         current_count = 0
         aggregated_skip_pairs.append([])
         for count, pair in skip_pairs_with_len:
@@ -217,11 +254,16 @@
         for aggregate in aggregated_skip_pairs:
            ctr += 1
            name_addition = f".split{ctr}"
+
+            if DEBUG:
+                name_addition = f".{chunk_size}{name_addition}"
+
             fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
 
             with open(fname, "w") as f:
                 # fix possible lastz query key order violations
-                for pair in sorted(aggregate, key=lambda p: query_key_order_table[p[1]]):  # p[1] is query key
+                # p[1] is query key
+                for pair in sorted(aggregate, key=lambda p: query_key_order_table[p[1]]):
                     chunk = list(zip(*data[pair]))[2]
                     f.writelines(chunk)
     # update segment file in command
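The hunks above keep the script's partitioning scheme: each chromosome pair's segments are sorted by their diagonal offset (coord[1] - coord[0], i.e. y - x) and then written out in fixed-size chunks, so segments on nearby diagonals land in the same split file. Below is a minimal runnable sketch of that idea; the chunks helper imitates the one the script calls (its definition lies outside these hunks), and toy_segments is a made-up stand-in for the (x, y, raw_line) tuples parsed from a .segments file.

import typing

T = typing.TypeVar("T")


def chunks(lst: list[T], n: int) -> typing.Iterator[list[T]]:
    # yield successive n-sized chunks from lst, like the script's helper
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


# made-up (x, y, raw_line) segment tuples; not real SegAlign output
toy_segments = [
    (100, 140, "line-a\n"),
    (5, 20, "line-b\n"),
    (90, 95, "line-c\n"),
]

# same key as the diff: sort on the diagonal offset (y - x), tie-break on x
toy_segments.sort(key=lambda coord: (coord[1] - coord[0], coord[0]))

for ctr, chunk in enumerate(chunks(toy_segments, 2), start=1):
    print(f"split{ctr}:", [seg[2].strip() for seg in chunk])

With a chunk size of 2 this prints a split1 and a split2 grouping; the real script derives each output name from the original .segments file name plus the .split{ctr} suffix, as in the hunks above.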
--- a/runner.py	Wed Aug 07 20:34:45 2024 +0000
+++ b/runner.py	Fri Aug 09 20:23:12 2024 +0000
@@ -314,7 +314,7 @@
         run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)]
         for word in line.split():
             run_args.append(word)
-        process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+        process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, text=True)
 
         for line in process.stdout.splitlines():
             output_q.put(line)
@@ -327,6 +327,8 @@
 
 
 def estimate_chunk_size(args: argparse.Namespace) -> int:
+    # only used when segment size is being estimated
+    MAX_CHUNK_SIZE = 50000
     chunk_size = -1
     line_size = -1
 
@@ -353,10 +355,19 @@
 
             fdict[entry.name.split(".split", 1)[0]] += file_size
 
-    # if noot enough segment files for estimation, continue
-    if len(fdict) > 2:
+    if len(fdict) < 7:
+        # outliers can heavily skew prediction if <7 data points
+        # to be safe, use 50% quantile
+        chunk_size = int(statistics.quantiles(fdict.values())[1] // line_size)
+    else:
+        # otherwise use 75% quantile
        chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size)
+    # if not enough data points, there is a chance of getting unlucky
+    # minimize worst case by using MAX_CHUNK_SIZE
+
+    chunk_size = min(chunk_size, MAX_CHUNK_SIZE)
+
     if args.debug:
         ns: int = time.monotonic_ns() - beg
         r_end = resource.getrusage(resource.RUSAGE_SELF)
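After this change, diagonal_partition.py and runner.py pick the chunk size with the same quantile heuristic. statistics.quantiles() defaults to n=4 and returns the three quartile cut points, so index [1] is the median, used when fewer than 7 aggregated file sizes make outliers risky, and index [-1] is the 75% quantile; either estimate is then capped at MAX_CHUNK_SIZE. A minimal sketch of just that arithmetic, with fabricated file sizes and an assumed line_size of 40 bytes:

import statistics

MAX_CHUNK_SIZE = 50000  # same cap the changeset introduces
line_size = 40          # assumed bytes per segment line (illustrative)

# fabricated per-source-file totals of .segments bytes
aggregated_sizes = [800_000, 950_000, 1_200_000, 2_400_000]

if len(aggregated_sizes) < 7:
    # outliers can heavily skew the estimate with <7 data points,
    # so fall back to the 50% quantile (the median)
    chunk_size = int(statistics.quantiles(aggregated_sizes)[1] // line_size)
else:
    # otherwise use the 75% quantile
    chunk_size = int(statistics.quantiles(aggregated_sizes)[-1] // line_size)

# minimize the worst case if the estimate runs high
chunk_size = min(chunk_size, MAX_CHUNK_SIZE)
print(chunk_size)  # 26875 for these inputs, already under the cap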
--- a/system_options.xml	Wed Aug 07 20:34:45 2024 +0000
+++ b/system_options.xml	Fri Aug 09 20:23:12 2024 +0000
@@ -3,7 +3,7 @@
     <section name="system_options" title="System Options">
         <param argument="--wga_chunk_size" type="integer" value="250000" label="Chunk sizes for GPU calls for Xdrop - ⚠ change only if you are a developer"/>
         <param argument="--lastz_interval_size" type="integer" value="10000000" label="LASTZ interval for Ydrop - ⚠ change only if you are a developer"/>
-        <param argument="--seq_block_size" type="integer" value="500000000" label="LASTZ interval for Ydrop - ⚠ change only if you are a developer"/>
+        <param argument="--seq_block_size" type="integer" value="400000000" label="LASTZ interval for Ydrop - ⚠ change only if you are a developer"/>
         <param argument="--num_gpu" type="integer" value="-1" label="Specify number of GPUs to use - -1 if all the GPUs should be used"/>
         <param argument="--debug" type="boolean" value="false" label="Print debug messages"/>
     </section>