changeset 21:25fa179d9d0a draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit e4e05d23d9da18ea87bc352122ca9e6cfa73d1c7
author richard-burhans
date Fri, 09 Aug 2024 20:23:12 +0000
parents 96ff17622b17
children 860e1f6f3de4
files diagonal_partition.py runner.py system_options.xml
diffstat 3 files changed, 95 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/diagonal_partition.py	Wed Aug 07 20:34:45 2024 +0000
+++ b/diagonal_partition.py	Fri Aug 09 20:23:12 2024 +0000
@@ -1,16 +1,18 @@
 #!/usr/bin/env python
 
+
 """
 Diagonal partitioning for segment files output by SegAlign.
 
 Usage:
 diagonal_partition.py <max-segments> <lastz-command>
 
-set <max-segments> = 0 to skip partitioning, -1 to infer best parameter
+set <max-segments> = 0 to skip partitioning, -1 to estimate best parameter
 """
 
-
+import collections
 import os
+import statistics
 import sys
 import typing
 
@@ -22,19 +24,28 @@
 
 
 if __name__ == "__main__":
+    # TODO: make these optional user defined parameters
 
+    # deletes original segment file after splitting
     DELETE_AFTER_CHUNKING = True
-    MIN_CHUNK_SIZE = 5000  # don't partition segment files with line count below this value
+
+    # don't partition segment files with line count below this value
+    MIN_CHUNK_SIZE = 5000
 
-    # input_params = "10000 sad sadsa sad --segments=tmp21.block0.r0.minus.segments dsa sa --strand=plus --output=out.maf sadads 2> logging.err"
-    # sys.argv = [sys.argv[0]] + input_params.split(' ')
-    chunk_size = int(sys.argv[1])  # first parameter contains chunk size
+    # only used when segment size is being estimated
+    MAX_CHUNK_SIZE = 50000
+
+    # include chosen split size in file name
+    DEBUG = False
+
+    # first parameter contains chunk size
+    chunk_size = int(sys.argv[1])
     params = sys.argv[2:]
 
     # don't do anything if 0 chunk size
     if chunk_size == 0:
         print(" ".join(params), flush=True)
-        exit(0)
+        sys.exit(0)
 
     # Parsing command output from SegAlign
     segment_key = "--segments="
@@ -56,10 +67,12 @@
     if not os.path.isfile(input_file):
         sys.exit(f"Error: File {input_file} does not exist")
 
-    line_size = None  # each char in 1 byte
+    # each char is 1 byte
+    line_size = None
     file_size = os.path.getsize(input_file)
     with open(input_file, "r") as f:
-        line_size = len(f.readline())  # add 1 for newline
+        # add 1 for newline
+        line_size = len(f.readline())
 
     estimated_lines = file_size // line_size
 
@@ -68,29 +81,38 @@
         # optimization, do not need to get each file size in this case
         if estimated_lines < MIN_CHUNK_SIZE:
             print(" ".join(params), flush=True)
-            exit(0)
+            sys.exit(0)
 
-        from collections import defaultdict
-        from statistics import quantiles
         # get size of each segment assuming DELETE_AFTER_CHUNKING == True
         # takes into account already split segments
         files = [i for i in os.listdir(".") if i.endswith(".segments")]
-        # find . -maxdepth 1 -name "*.segments" -print0 | du -ba --files0-from=-
-        # if not enough segment files for estimation, continue
-        if len(files) <= 2:
-            print(" ".join(params), flush=True)
-            exit(0)
+
+        if len(files) < 2:
+            # if not enough segment files for estimation, use MAX_CHUNK_SIZE
+            chunk_size = MAX_CHUNK_SIZE
+        else:
+            fdict: typing.DefaultDict[str, int] = collections.defaultdict(int)
+            for filename in files:
+                size = os.path.getsize(filename)
+                f_ = filename.split(".split", 1)[0]
+                fdict[f_] += size
 
-        fdict: typing.DefaultDict[str, int] = defaultdict(int)
-        for filename in files:
-            size = os.path.getsize(filename)
-            f_ = filename.split(".split", 1)[0]
-            fdict[f_] += size
-        chunk_size = int(quantiles(fdict.values())[-1] // line_size)
+            if len(fdict) < 7:
+                # outliers can heavily skew prediction if <7 data points
+                # to be safe, use 50% quantile
+                chunk_size = int(statistics.quantiles(fdict.values())[1] // line_size)
+            else:
+                # otherwise use 75% quantile
+                chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size)
+            # if not enough data points, there is a chance of getting unlucky
+            # minimize worst case by using MAX_CHUNK_SIZE
 
-    if file_size // line_size <= chunk_size:  # no need to sort if number of lines <= chunk_size
+            chunk_size = min(chunk_size, MAX_CHUNK_SIZE)
+
+    # no need to sort if number of lines <= chunk_size
+    if (estimated_lines <= chunk_size):
         print(" ".join(params), flush=True)
-        exit(0)
+        sys.exit(0)
 
     # Find rest of relevant parameters
     output_key = "--output="
@@ -106,25 +128,28 @@
             output_index = index
             output_alignment_file = value[len(output_key):]
             output_alignment_file_base, output_format = output_alignment_file.rsplit(".", 1)
+
         if value[:len(strand_key)] == strand_key:
             strand_index = index
 
-    if output_index is None:
-        sys.exit(f"Error: could not get output key {output_key} from parameters {params}")
-
     if output_alignment_file_base is None:
         sys.exit(f"Error: could not get output alignment file base from parameters {params}")
 
     if output_format is None:
         sys.exit(f"Error: could not get output format from parameters {params}")
 
+    if output_index is None:
+        sys.exit(f"Error: could not get output key {output_key} from parameters {params}")
+
     if strand_index is None:
         sys.exit(f"Error: could not get strand key {strand_key} from parameters {params}")
 
-    err_index = -1  # error file is at very end
+    # error file is at very end
+    err_index = -1
     err_name_base = params[-1].split(".err", 1)[0]
 
-    data: dict[tuple[str, str], list[tuple[int, int, str]]] = {}  # dict of list of tuple (x, y, str)
+    # dict of list of tuple (x, y, str)
+    data: dict[tuple[str, str], list[tuple[int, int, str]]] = {}
 
     direction = None
     if "plus" in params[strand_index]:
@@ -149,15 +174,18 @@
     # If there are chromosome pairs with segment count <= chunk_size
     # then no need to sort and split these pairs into separate files.
     # It is better to keep these pairs in a single segment file.
-    skip_pairs = []  # pairs that have count <= chunk_size. these will not be sorted
+
+    # pairs that have count <= chunk_size. these will not be sorted
+    skip_pairs = []
 
     # save query key order
     # for lastz segment files: 'Query sequence names must appear in the same
     # order as they do in the query file'
-    query_key_order = list(dict.fromkeys([i[1] for i in data.keys()]))
 
     # NOTE: assuming data.keys() preserves order of keys. Requires Python 3.7+
 
+    query_key_order = list(dict.fromkeys([i[1] for i in data.keys()]))
+
     if len(data.keys()) > 1:
         for pair in data.keys():
             if len(data[pair]) <= chunk_size:
@@ -168,6 +196,7 @@
         for pair in data.keys():
             if pair not in skip_pairs:
                 data[pair] = sorted(data[pair], key=lambda coord: (coord[1] - coord[0], coord[0]))
+
     # sorting for reverse segments
     elif direction == "f":
         for pair in data.keys():
@@ -178,10 +207,15 @@
 
     # Writing file in chunks
     ctr = 0
-    for pair in data.keys() - skip_pairs:  # [i for i in data_keys if i not in set(skip_pairs)]:
+    # [i for i in data_keys if i not in set(skip_pairs)]:
+    for pair in (data.keys() - skip_pairs):
         for chunk in chunks(list(zip(*data[pair]))[2], chunk_size):
             ctr += 1
             name_addition = f".split{ctr}"
+
+            if DEBUG:
+                name_addition = f".{chunk_size}{name_addition}"
+
             fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
 
             assert len(chunk) != 0
@@ -197,12 +231,15 @@
 
     # writing unsorted skipped pairs
     if len(skip_pairs) > 0:
-        skip_pairs_with_len = sorted([(len(data[p]), p) for p in skip_pairs])   # list of tuples of (pair length, pair)
-        # NOTE: This can violate lastz query key order requirement
+        # list of tuples of (pair length, pair)
+        skip_pairs_with_len = sorted([(len(data[p]), p) for p in skip_pairs])
+        # NOTE: This sorting can violate lastz query key order requirement, this is fixed later
 
-        query_key_order_table = {item: idx for idx, item in enumerate(query_key_order)}  # used for sorting
+        # used for sorting
+        query_key_order_table = {item: idx for idx, item in enumerate(query_key_order)}
 
-        aggregated_skip_pairs: list[list[tuple[str, str]]] = []  # list of list of pair names
+        # list of list of pair names
+        aggregated_skip_pairs: list[list[tuple[str, str]]] = []
         current_count = 0
         aggregated_skip_pairs.append([])
         for count, pair in skip_pairs_with_len:
@@ -217,11 +254,16 @@
         for aggregate in aggregated_skip_pairs:
             ctr += 1
             name_addition = f".split{ctr}"
+
+            if DEBUG:
+                name_addition = f".{chunk_size}{name_addition}"
+
             fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
 
             with open(fname, "w") as f:
                 # fix possible lastz query key order violations
-                for pair in sorted(aggregate, key=lambda p: query_key_order_table[p[1]]):  # p[1] is query key
+                # p[1] is query key
+                for pair in sorted(aggregate, key=lambda p: query_key_order_table[p[1]]):
                     chunk = list(zip(*data[pair]))[2]
                     f.writelines(chunk)
             # update segment file in command
--- a/runner.py	Wed Aug 07 20:34:45 2024 +0000
+++ b/runner.py	Fri Aug 09 20:23:12 2024 +0000
@@ -314,7 +314,7 @@
         run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)]
         for word in line.split():
             run_args.append(word)
-        process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+        process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, text=True)
 
         for line in process.stdout.splitlines():
             output_q.put(line)
@@ -327,6 +327,8 @@
 
 
 def estimate_chunk_size(args: argparse.Namespace) -> int:
+    # only used when segment size is being estimated
+    MAX_CHUNK_SIZE = 50000
     chunk_size = -1
     line_size = -1
 
@@ -353,10 +355,19 @@
 
             fdict[entry.name.split(".split", 1)[0]] += file_size
 
-    # if noot enough segment files for estimation, continue
-    if len(fdict) > 2:
+    if len(fdict) < 7:
+        # outliers can heavily skew prediction if <7 data points
+        # to be safe, use 50% quantile
+        chunk_size = int(statistics.quantiles(fdict.values())[1] // line_size)
+    else:
+        # otherwise use 75% quantile
         chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size)
 
+    # if not enough data points, there is a chance of getting unlucky
+    # minimize worst case by using MAX_CHUNK_SIZE
+
+    chunk_size = min(chunk_size, MAX_CHUNK_SIZE)
+
     if args.debug:
         ns: int = time.monotonic_ns() - beg
         r_end = resource.getrusage(resource.RUSAGE_SELF)
--- a/system_options.xml	Wed Aug 07 20:34:45 2024 +0000
+++ b/system_options.xml	Fri Aug 09 20:23:12 2024 +0000
@@ -3,7 +3,7 @@
         <section name="system_options" title="System Options">
             <param argument="--wga_chunk_size" type="integer" value="250000" label="Chunk sizes for GPU calls for Xdrop - ⚠ change only if you are a developer"/>
             <param argument="--lastz_interval_size" type="integer" value="10000000" label="LASTZ interval for Ydrop - ⚠ change only if you are a developer"/>
-            <param argument="--seq_block_size" type="integer" value="500000000" label="LASTZ interval for Ydrop - ⚠ change only if you are a developer"/>
+            <param argument="--seq_block_size" type="integer" value="400000000" label="LASTZ interval for Ydrop - ⚠ change only if you are a developer"/>
             <param argument="--num_gpu" type="integer" value="-1" label="Specify number of GPUs to use - -1 if all the GPUs should be used"/>
             <param argument="--debug" type="boolean" value="false" label="Print debug messages"/>
         </section>