Mercurial > repos > richard-burhans > segalign
comparison diagonal_partition.py @ 4:36cafb694dd2 draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit b8aa943b38b865defab8a27e4404bb8a2131f919
author | richard-burhans |
---|---|
date | Tue, 23 Apr 2024 22:39:23 +0000 |
parents | 9e34b25a8670 |
children |
comparison
equal
deleted
inserted
replaced
3:6f46cebc9ed8 | 4:36cafb694dd2 |
---|---|
7 diagonal_partition <max-segments> <lastz-command> | 7 diagonal_partition <max-segments> <lastz-command> |
8 """ | 8 """ |
9 | 9 |
10 import os | 10 import os |
11 import sys | 11 import sys |
12 | 12 import typing |
13 | 13 |
14 def chunks(lst, n): | 14 T = typing.TypeVar("T", bound="_Sliceable") |
15 | |
16 | |
17 class _Sliceable(typing.Protocol): | |
18 def __len__(self) -> int: | |
19 ... | |
20 | |
21 def __getitem__(self: T, i: slice) -> T: | |
22 ... | |
23 | |
24 | |
25 def chunks(lst: T, n: int) -> typing.Iterator[T]: | |
15 """Yield successive n-sized chunks from list.""" | 26 """Yield successive n-sized chunks from list.""" |
16 for i in range(0, len(lst), n): | 27 for i in range(0, len(lst), n): |
17 yield lst[i: i + n] | 28 yield lst[i: i + n] |
18 | 29 |
19 | 30 |
27 params = sys.argv[2:] | 38 params = sys.argv[2:] |
28 | 39 |
29 # Parsing command output from SegAlign | 40 # Parsing command output from SegAlign |
30 segment_key = "--segments=" | 41 segment_key = "--segments=" |
31 segment_index = None | 42 segment_index = None |
32 input_file = None | 43 input_file: typing.Optional[str] = None |
33 | 44 |
34 for index, value in enumerate(params): | 45 for index, value in enumerate(params): |
35 if value[: len(segment_key)] == segment_key: | 46 if value[: len(segment_key)] == segment_key: |
36 segment_index = index | 47 segment_index = index |
37 input_file = value[len(segment_key):] | 48 input_file = value[len(segment_key):] |
38 break | 49 break |
39 if segment_index is None: | 50 if segment_index is None: |
40 print(f"Error: could not segment key {segment_key} in parameters {params}") | 51 print(f"Error: could not segment key {segment_key} in parameters {params}") |
41 exit(0) | 52 exit(0) |
42 | 53 |
43 if not os.path.isfile(input_file): | 54 if input_file is None or not os.path.isfile(input_file): |
44 print(f"Error: File {input_file} does not exist") | 55 print(f"Error: File {input_file} does not exist") |
45 exit(0) | 56 exit(0) |
46 | 57 |
47 if ( | 58 if ( |
48 chunk_size == 0 or sum(1 for _ in open(input_file)) <= chunk_size | 59 chunk_size == 0 or sum(1 for _ in open(input_file)) <= chunk_size |
50 print(" ".join(params), flush=True) | 61 print(" ".join(params), flush=True) |
51 exit(0) | 62 exit(0) |
52 | 63 |
53 # Find rest of relevant parameters | 64 # Find rest of relevant parameters |
54 output_key = "--output=" | 65 output_key = "--output=" |
55 output_index = None | 66 output_index: typing.Optional[int] = None |
56 output_alignment_file = None | 67 output_alignment_file = None |
57 output_alignment_file_base = None | 68 output_alignment_file_base: typing.Optional[str] = None |
58 output_format = None | 69 output_format = None |
59 | 70 |
60 strand_key = "--strand=" | 71 strand_key = "--strand=" |
61 strand_index = None | 72 strand_index = None |
62 for index, value in enumerate(params): | 73 for index, value in enumerate(params): |
76 exit(0) | 87 exit(0) |
77 | 88 |
78 err_index = -1 # error file is at very end | 89 err_index = -1 # error file is at very end |
79 err_name_base = params[-1].split(".err", 1)[0] | 90 err_name_base = params[-1].split(".err", 1)[0] |
80 | 91 |
81 data = {} # dict of list of tuple (x, y, str) | 92 data: typing.Dict[ |
93 typing.Tuple[str, str], typing.List[typing.Tuple[int, int, str]] | |
94 ] = {} # dict of list of tuple (x, y, str) | |
82 | 95 |
83 direction = None | 96 direction = None |
84 if "plus" in params[strand_index]: | 97 if "plus" in params[strand_index]: |
85 direction = "f" | 98 direction = "f" |
86 elif "minus" in params[strand_index]: | 99 elif "minus" in params[strand_index]: |
151 with open(fname, "w") as f: | 164 with open(fname, "w") as f: |
152 f.writelines(chunk) | 165 f.writelines(chunk) |
153 # update segment file in command | 166 # update segment file in command |
154 params[segment_index] = segment_key + fname | 167 params[segment_index] = segment_key + fname |
155 # update output file in command | 168 # update output file in command |
156 params[output_index] = ( | 169 if output_index is not None: |
157 output_key | 170 params[output_index] = ( |
158 + output_alignment_file_base | 171 f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" |
159 + name_addition | 172 ) |
160 + "." | |
161 + output_format | |
162 ) | |
163 # update error file in command | 173 # update error file in command |
164 params[-1] = err_name_base + name_addition + ".err" | 174 params[-1] = err_name_base + name_addition + ".err" |
165 print(" ".join(params), flush=True) | 175 print(" ".join(params), flush=True) |
166 | 176 |
167 # writing unsorted skipped pairs | 177 # writing unsorted skipped pairs |
168 if len(skip_pairs) > 0: | 178 if len(skip_pairs) > 0: |
169 skip_pairs_with_len = sorted( | 179 skip_pairs_with_len = sorted( |
170 [(len(data[p]), p) for p in skip_pairs] | 180 [(len(data[p]), p) for p in skip_pairs] |
171 ) # list of tuples of (pair length, pair) | 181 ) # list of tuples of (pair length, pair) |
172 aggregated_skip_pairs = [] # list of list of pair names | 182 aggregated_skip_pairs: typing.List[typing.List[typing.Any]] = ( |
183 [] | |
184 ) # list of list of pair names | |
173 current_count = 0 | 185 current_count = 0 |
174 aggregated_skip_pairs.append([]) | 186 aggregated_skip_pairs.append([]) |
175 for count, pair in skip_pairs_with_len: | 187 for count, pair in skip_pairs_with_len: |
176 if current_count + count <= chunk_size: | 188 if current_count + count <= chunk_size: |
177 current_count += count | 189 current_count += count |
190 chunk = list(zip(*data[pair]))[2] | 202 chunk = list(zip(*data[pair]))[2] |
191 f.writelines(chunk) | 203 f.writelines(chunk) |
192 # update segment file in command | 204 # update segment file in command |
193 params[segment_index] = segment_key + fname | 205 params[segment_index] = segment_key + fname |
194 # update output file in command | 206 # update output file in command |
195 params[output_index] = ( | 207 if output_index is not None: |
196 output_key | 208 params[output_index] = ( |
197 + output_alignment_file_base | 209 f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" |
198 + name_addition | 210 ) |
199 + "." | |
200 + output_format | |
201 ) | |
202 # update error file in command | 211 # update error file in command |
203 params[-1] = err_name_base + name_addition + ".err" | 212 params[-1] = err_name_base + name_addition + ".err" |
204 print(" ".join(params), flush=True) | 213 print(" ".join(params), flush=True) |
205 | 214 |
206 if DELETE_AFTER_CHUNKING: | 215 if DELETE_AFTER_CHUNKING: |