changeset 9:08e987868f0f draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 062a761a340e095ea7ef7ed7cd1d3d55b1fdc5c4
author richard-burhans
date Wed, 10 Jul 2024 17:06:45 +0000
parents 150de8a3954a
children ec709ce3d91b
files diagonal_partition.py macros.xml package_output.py runner.py segalign.xml test-data/hg38.chr20.chunk.fa.gz test-data/mm39.chr2.chunk.fa.gz test-data/segalign-output.maf.gz
diffstat 8 files changed, 253 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/diagonal_partition.py	Wed Jul 10 17:06:45 2024 +0000
@@ -0,0 +1,236 @@
+#!/usr/bin/env python
+
+"""
+Diagonal partitioning for segment files output by SegAlign.
+
+Usage:
+diagonal_partition.py <max-segments> <lastz-command>
+
+set <max-segments> = 0 to skip partitioning, -1 to infer best parameter
+"""
+
+
+import os
+import sys
+import typing
+
+
+def chunks(lst: tuple[str, ...], n: int) -> typing.Iterator[tuple[str, ...]]:
+    """Yield successive n-sized chunks from list."""
+    for i in range(0, len(lst), n):
+        yield lst[i:i + n]
+
+
+if __name__ == "__main__":
+
+    DELETE_AFTER_CHUNKING = True
+    MIN_CHUNK_SIZE = 5000  # don't partition segment files with line count below this value
+
+    # input_params = "10000 sad sadsa sad --segments=tmp21.block0.r0.minus.segments dsa sa --strand=plus --output=out.maf sadads 2> logging.err"
+    # sys.argv = [sys.argv[0]] + input_params.split(' ')
+    chunk_size = int(sys.argv[1])  # first parameter contains chunk size
+    params = sys.argv[2:]
+
+    # don't do anything if 0 chunk size
+    if chunk_size == 0:
+        print(" ".join(params), flush=True)
+        exit(0)
+
+    # Parsing command output from SegAlign
+    segment_key = "--segments="
+    segment_index = None
+    input_file = None
+
+    for index, value in enumerate(params):
+        if value[:len(segment_key)] == segment_key:
+            segment_index = index
+            input_file = value[len(segment_key):]
+            break
+
+    if segment_index is None:
+        sys.exit(f"Error: could not get segment key {segment_key} from parameters {params}")
+
+    if input_file is None:
+        sys.exit(f"Error: could not get segment file from parameters {params}")
+
+    if not os.path.isfile(input_file):
+        sys.exit(f"Error: File {input_file} does not exist")
+
+    line_size = None  # bytes per line (assumes 1 byte per character)
+    file_size = os.path.getsize(input_file)
+    with open(input_file, "r") as f:
+        line_size = len(f.readline())  # length already includes the trailing newline
+
+    estimated_lines = file_size // line_size
+
+    # check if chunk size should be estimated
+    if chunk_size < 0:
+        # optimization, do not need to get each file size in this case
+        if estimated_lines < MIN_CHUNK_SIZE:
+            print(" ".join(params), flush=True)
+            exit(0)
+
+        from collections import defaultdict
+        from statistics import quantiles
+        # get size of each segment assuming DELETE_AFTER_CHUNKING == True
+        # takes into account already split segments
+        files = [i for i in os.listdir(".") if i.endswith(".segments")]
+        # find . -maxdepth 1 -name "*.segments" -print0 | du -ba --files0-from=-
+        # if not enough segment files for estimation, continue
+        if len(files) <= 2:
+            print(" ".join(params), flush=True)
+            exit(0)
+
+        fdict: typing.DefaultDict[str, int] = defaultdict(int)
+        for filename in files:
+            size = os.path.getsize(filename)
+            f_ = filename.split(".split", 1)[0]
+            fdict[f_] += size
+        chunk_size = int(quantiles(fdict.values())[-1] // line_size)
+
+    if file_size // line_size <= chunk_size:  # no need to sort if number of lines <= chunk_size
+        print(" ".join(params), flush=True)
+        exit(0)
+
+    # Find rest of relevant parameters
+    output_key = "--output="
+    output_index = None
+    output_alignment_file = None
+    output_alignment_file_base = None
+    output_format = None
+
+    strand_key = "--strand="
+    strand_index = None
+    for index, value in enumerate(params):
+        if value[:len(output_key)] == output_key:
+            output_index = index
+            output_alignment_file = value[len(output_key):]
+            output_alignment_file_base, output_format = output_alignment_file.rsplit(".", 1)
+        if value[:len(strand_key)] == strand_key:
+            strand_index = index
+
+    if output_index is None:
+        sys.exit(f"Error: could not get output key {output_key} from parameters {params}")
+
+    if output_alignment_file_base is None:
+        sys.exit(f"Error: could not get output alignment file base from parameters {params}")
+
+    if output_format is None:
+        sys.exit(f"Error: could not get output format from parameters {params}")
+
+    if strand_index is None:
+        sys.exit(f"Error: could not get strand key {strand_key} from parameters {params}")
+
+    err_index = -1  # error file is at very end
+    err_name_base = params[-1].split(".err", 1)[0]
+
+    data: dict[tuple[str, str], list[tuple[int, int, str]]] = {}  # dict of list of tuple (x, y, str)
+
+    direction = None
+    if "plus" in params[strand_index]:
+        direction = "f"
+    elif "minus" in params[strand_index]:
+        direction = "r"
+    else:
+        sys.exit(f"Error: could not figure out direction from strand value {params[strand_index]}")
+
+    for line in open(input_file, "r"):
+        if line == "":
+            continue
+        seq1_name, seq1_start, seq1_end, seq2_name, seq2_start, seq2_end, _dir, score = line.split()
+        # data.append((int(seq1_start), int(seq2_start), line))
+        half_dist = int((int(seq1_end) - int(seq1_start)) // 2)
+        assert int(seq1_end) > int(seq1_start)
+        assert int(seq2_end) > int(seq2_start)
+        seq1_mid = int(seq1_start) + half_dist
+        seq2_mid = int(seq2_start) + half_dist
+        data.setdefault((seq1_name, seq2_name), []).append((seq1_mid, seq2_mid, line))
+
+    # If there are chromosome pairs with segment count <= chunk_size
+    # then no need to sort and split these pairs into separate files.
+    # It is better to keep these pairs in a single segment file.
+    skip_pairs = []  # pairs that have count <= chunk_size. these will not be sorted
+
+    # save query key order
+    # for lastz segment files: 'Query sequence names must appear in the same
+    # order as they do in the query file'
+    query_key_order = list(dict.fromkeys([i[1] for i in data.keys()]))
+
+    # NOTE: assuming data.keys() preserves order of keys. Requires Python 3.7+
+
+    if len(data.keys()) > 1:
+        for pair in data.keys():
+            if len(data[pair]) <= chunk_size:
+                skip_pairs.append(pair)
+
+    # sorting for reverse (minus-strand) segments
+    if direction == "r":
+        for pair in data.keys():
+            if pair not in skip_pairs:
+                data[pair] = sorted(data[pair], key=lambda coord: (coord[1] - coord[0], coord[0]))
+    # sorting for forward (plus-strand) segments
+    elif direction == "f":
+        for pair in data.keys():
+            if pair not in skip_pairs:
+                data[pair] = sorted(data[pair], key=lambda coord: (coord[1] + coord[0], coord[0]))
+    else:
+        sys.exit(f"INVALID DIRECTION VALUE: {direction}")
+
+    # Writing file in chunks
+    ctr = 0
+    for pair in data.keys() - skip_pairs:  # [i for i in data_keys if i not in set(skip_pairs)]:
+        for chunk in chunks(list(zip(*data[pair]))[2], chunk_size):
+            ctr += 1
+            name_addition = f".split{ctr}"
+            fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
+
+            assert len(chunk) != 0
+            with open(fname, "w") as f:
+                f.writelines(chunk)
+            # update segment file in command
+            params[segment_index] = segment_key + fname
+            # update output file in command
+            params[output_index] = output_key + output_alignment_file_base + name_addition + "." + output_format
+            # update error file in command
+            params[-1] = err_name_base + name_addition + ".err"
+            print(" ".join(params), flush=True)
+
+    # writing unsorted skipped pairs
+    if len(skip_pairs) > 0:
+        skip_pairs_with_len = sorted([(len(data[p]), p) for p in skip_pairs])   # list of tuples of (pair length, pair)
+        # NOTE: This can violate lastz query key order requirement
+
+        query_key_order_table = {item: idx for idx, item in enumerate(query_key_order)}  # used for sorting
+
+        aggregated_skip_pairs: list[list[tuple[str, str]]] = []  # list of list of pair names
+        current_count = 0
+        aggregated_skip_pairs.append([])
+        for count, pair in skip_pairs_with_len:
+            if current_count + count <= chunk_size:
+                current_count += count
+                aggregated_skip_pairs[-1].append(pair)
+            else:
+                aggregated_skip_pairs.append([])
+                current_count = count
+                aggregated_skip_pairs[-1].append(pair)
+
+        for aggregate in aggregated_skip_pairs:
+            ctr += 1
+            name_addition = f".split{ctr}"
+            fname = input_file.split(".segments", 1)[0] + name_addition + ".segments"
+
+            with open(fname, "w") as f:
+                # fix possible lastz query key order violations
+                for pair in sorted(aggregate, key=lambda p: query_key_order_table[p[1]]):  # p[1] is query key
+                    chunk = list(zip(*data[pair]))[2]
+                    f.writelines(chunk)
+            # update segment file in command
+            params[segment_index] = segment_key + fname
+            # update output file in command
+            params[output_index] = output_key + output_alignment_file_base + name_addition + "." + output_format
+            # update error file in command
+            params[-1] = err_name_base + name_addition + ".err"
+            print(" ".join(params), flush=True)
+
+    if DELETE_AFTER_CHUNKING:
+        os.remove(input_file)
--- a/macros.xml	Tue Jul 09 17:37:53 2024 +0000
+++ b/macros.xml	Wed Jul 10 17:06:45 2024 +0000
@@ -1,7 +1,7 @@
 <macros>
     <xml name="requirements">
         <requirements>
-            <requirement type="package" version="@TOOL_VERSION@">segalign-full</requirement>
+            <requirement type="package" version="@TOOL_VERSION@">segalign-galaxy</requirement>
         </requirements>
     </xml>
     <token name="@TOOL_VERSION@">0.1.2.7</token>
--- a/package_output.py	Tue Jul 09 17:37:53 2024 +0000
+++ b/package_output.py	Wed Jul 10 17:06:45 2024 +0000
@@ -277,7 +277,7 @@
     config.read(lastz_command_config_file)
 
     package_file = PackageFile()
-    lastz_command_file = "lastz_commands.txt"
+    lastz_command_file = "lastz-commands.txt"
     bashCommandLineFile(lastz_command_file, config, package_file)
     package_file.close()
 
--- a/runner.py	Tue Jul 09 17:37:53 2024 +0000
+++ b/runner.py	Wed Jul 10 17:06:45 2024 +0000
@@ -301,17 +301,17 @@
 
     with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
         for i in range(num_workers):
-            executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i))
+            executor.submit(diagonal_partition_worker(args, input_q, output_q, chunk_size, i))
 
 
-def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None:
+def diagonal_partition_worker(args: argparse.Namespace, input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None:
     while True:
         line = input_q.get()
         if line == SENTINEL_VALUE:
             input_q.task_done()
             break
 
-        run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)]
+        run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)]
         for word in line.split():
             run_args.append(word)
         process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
@@ -380,6 +380,7 @@
         run_args.extend(segalign_args)
         run_args.append("--num_threads")
         run_args.append(str(args.num_cpu))
+        run_args.append("work/")
 
         if args.debug:
             beg: int = time.monotonic_ns()
--- a/segalign.xml	Tue Jul 09 17:37:53 2024 +0000
+++ b/segalign.xml	Wed Jul 10 17:06:45 2024 +0000
@@ -33,21 +33,26 @@
 ## Mode ----------------------------------------------------------------
 
 #if str($mode.mode_selector) == "segalign"
+    #if str($mode.gapped_extension_options.nogapped) == "false"
+        mkdir -p "\$(pwd)/work" && 
+        faToTwoBit <(gzip -cdfq '$mode.target') "\$(pwd)/work/ref.2bit" &&
+        faToTwoBit <(gzip -cdfq '$mode.query') "\$(pwd)/work/query.2bit" &&
+    #end if
     #if str($mode.diagonal_partition_options.diagonal_partition) == "true"
         #set $segalign_mode = "segalign_diagonal_partition"
-        ## explicitly calling bash to bypass a pulsar bug
+        ## explicitly calling python to bypass a pulsar bug
         ## https://github.com/galaxyproject/pulsar/issues/341
-        bash '$__tool_directory__/runner.py'
+        python '$__tool_directory__/runner.py'
             --output-type tarball
-            --output-filename fake
+            --output-file '$segalign_diagonal_partition_output'
             --diagonal-partition
             --num-cpu \${GALAXY_SLOTS:-2}
             --tool_directory '$__tool_directory__'
     #else
         #set $segalign_mode = "segalign"
-        bash '$__tool_directory__/runner.py'
+        python '$__tool_directory__/runner.py'
             --output-type output
-            --output-filename foo.maf
+            --output-file '$segalign_output'
             --num-cpu \${GALAXY_SLOTS:-2}
             --tool_directory '$__tool_directory__'
     #end if
@@ -57,6 +62,7 @@
     #set $segalign_mode = "segalign_repeat_masker"
         run_segalign_repeat_masker
             '$mode.seq_file'
+            --num_cpu \${GALAXY_SLOTS:-2}
 #end if
 
 ## Sequence Options ----------------------------------------------------
@@ -144,9 +150,6 @@
     ##      &&
     ##      Rscript $r_plot > /dev/null 2>&1
     ##  .end if
-    #if $segalign_mode == "segalign"
-            --output '$segalign_output'
-    #end if
 #else if $segalign_mode == "segalign_repeat_masker"
             --M '$mode.output_options.M'
             --output '$segalign_repeat_masker_output'
Binary file test-data/hg38.chr20.chunk.fa.gz has changed
Binary file test-data/mm39.chr2.chunk.fa.gz has changed
Binary file test-data/segalign-output.maf.gz has changed