Mercurial > repos > richard-burhans > segalign
comparison runner.py @ 8:150de8a3954a draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 11132ef8b4103731b6f5860ea736bf332a06e303
author | richard-burhans |
---|---|
date | Tue, 09 Jul 2024 17:37:53 +0000 |
parents | |
children | 08e987868f0f |
comparison
equal
deleted
inserted
replaced
7:39c21be8f8b9 | 8:150de8a3954a |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 import argparse | |
4 import collections | |
5 import concurrent.futures | |
6 import multiprocessing | |
7 import os | |
8 import queue | |
9 import re | |
10 import resource | |
11 import statistics | |
12 import subprocess | |
13 import sys | |
14 import time | |
15 import typing | |
16 | |
17 SENTINEL_VALUE: typing.Final = "SENTINEL" | |
18 # CA_SENTEL_VALUE: typing.Final = ChunkAddress(0, 0, 0, 0, 0, SENTINEL_VALUE) | |
19 RUSAGE_ATTRS: typing.Final = ["ru_utime", "ru_stime", "ru_maxrss", "ru_minflt", "ru_majflt", "ru_inblock", "ru_oublock", "ru_nvcsw", "ru_nivcsw"] | |
20 | |
21 | |
22 class LastzCommands: | |
23 def __init__(self) -> None: | |
24 self.commands: dict[str, LastzCommand] = {} | |
25 self.segalign_segments: SegAlignSegments = SegAlignSegments() | |
26 | |
27 def add(self, line: str) -> None: | |
28 if line not in self.commands: | |
29 self.commands[line] = LastzCommand(line) | |
30 | |
31 command = self.commands[line] | |
32 self.segalign_segments.add(command.segments_filename) | |
33 | |
34 def segments(self) -> typing.Iterator["SegAlignSegment"]: | |
35 for segment in self.segalign_segments: | |
36 yield segment | |
37 | |
38 | |
39 class LastzCommand: | |
40 lastz_command_regex = re.compile(r"lastz (.+?)?ref\.2bit\[nameparse=darkspace\]\[multiple\]\[subset=ref_block(\d+)\.name\] (.+?)?query\.2bit\[nameparse=darkspace\]\[subset=query_block(\d+)\.name] --format=(\S+) --ydrop=(\d+) --gappedthresh=(\d+) --strand=(minus|plus)(?: --ambiguous=(\S+))?(?: --(notrivial))?(?: --scoring=(\S+))? --segments=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments --output=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.(\S+) 2> tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.err") | |
41 | |
42 def __init__(self, line: str) -> None: | |
43 self.line = line | |
44 self.args: list[str] = [] | |
45 self.target_filename: str = '' | |
46 self.query_filename: str = '' | |
47 self.data_folder: str = '' | |
48 self.ref_block: int = 0 | |
49 self.query_block: int = 0 | |
50 self.output_format: str = '' | |
51 self.ydrop: int = 0 | |
52 self.gappedthresh: int = 0 | |
53 self.strand: int | None = None | |
54 self.ambiguous: bool = False | |
55 self.nontrivial: bool = False | |
56 self.scoring: str | None = None | |
57 self.segments_filename: str = '' | |
58 self.output_filename: str = '' | |
59 self.error_filename: str = '' | |
60 | |
61 self._parse_command() | |
62 | |
63 def _parse_command(self) -> None: | |
64 match = self.lastz_command_regex.match(self.line) | |
65 if not match: | |
66 sys.exit(f"Unkown lastz command format: {self.line}") | |
67 | |
68 self.data_folder = match.group(1) | |
69 self.ref_block = int(match.group(2)) | |
70 self.query_block = int(match.group(4)) | |
71 self.output_format = match.group(5) | |
72 self.ydrop = int(match.group(6)) | |
73 self.gappedthresh = int(match.group(7)) | |
74 strand = match.group(8) | |
75 | |
76 if strand == 'plus': | |
77 self.strand = 0 | |
78 elif strand == 'minus': | |
79 self.strand = 1 | |
80 | |
81 self.target_filename = f"{self.data_folder}ref.2bit[nameparse=darkspace][multiple][subset=ref_block{self.ref_block}.name]" | |
82 self.query_filename = f"{self.data_folder}query.2bit[nameparse=darkspace][subset=query_block{self.query_block}.name]" | |
83 | |
84 self.args = [ | |
85 "lastz", | |
86 self.target_filename, | |
87 self.query_filename, | |
88 f"--format={self.output_format}", | |
89 f"--ydrop={self.ydrop}", | |
90 f"--gappedthresh={self.gappedthresh}", | |
91 f"--strand={strand}" | |
92 ] | |
93 | |
94 ambiguous = match.group(9) | |
95 if ambiguous is not None: | |
96 self.ambiguous = True | |
97 self.args.append(f"--ambiguous={ambiguous}") | |
98 | |
99 nontrivial = match.group(10) | |
100 if nontrivial is not None: | |
101 self.nontrivial = True | |
102 self.args.append("--nontrivial") | |
103 | |
104 scoring = match.group(11) | |
105 if scoring is not None: | |
106 self.scoring = scoring | |
107 self.args.append(f"--scoring={scoring}") | |
108 | |
109 tmp_no = int(match.group(12)) | |
110 block_no = int(match.group(13)) | |
111 r_no = int(match.group(14)) | |
112 split = match.group(16) | |
113 | |
114 base_filename = f"tmp{tmp_no}.block{block_no}.r{r_no}.{strand}" | |
115 | |
116 if split is not None: | |
117 base_filename = f"{base_filename}.split{split}" | |
118 | |
119 self.segments_filename = f"{base_filename}.segments" | |
120 self.args.append(f"--segments={self.segments_filename}") | |
121 | |
122 self.output_filename = f"{base_filename}.{self.output_format}" | |
123 self.args.append(f"--output={self.output_filename}") | |
124 | |
125 self.error_filename = f"{base_filename}.err" | |
126 | |
127 | |
128 class SegAlignSegments: | |
129 def __init__(self) -> None: | |
130 self.segments: dict[str, SegAlignSegment] = {} | |
131 | |
132 def add(self, filename: str) -> None: | |
133 if filename not in self.segments: | |
134 self.segments[filename] = SegAlignSegment(filename) | |
135 | |
136 def __iter__(self) -> "SegAlignSegments": | |
137 return self | |
138 | |
139 def __next__(self) -> typing.Generator["SegAlignSegment", None, None]: | |
140 for segment in sorted(self.segments.values()): | |
141 yield segment | |
142 | |
143 | |
144 class SegAlignSegment: | |
145 def __init__(self, filename: str): | |
146 self.filename = filename | |
147 self.tmp: int = 0 | |
148 self.block: int = 0 | |
149 self.r: int = 0 | |
150 self.strand: int = 0 | |
151 self.split: int | None = None | |
152 self.fmt: str = "" | |
153 self._parse_filename() | |
154 | |
155 def _parse_filename(self) -> None: | |
156 match = re.match(r"tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments$", self.filename) | |
157 if not match: | |
158 sys.exit(f"Unkown segment filename format: {self.filename}") | |
159 | |
160 self.tmp = int(match.group(1)) | |
161 self.block = int(match.group(2)) | |
162 self.r = int(match.group(3)) | |
163 | |
164 strand = match.group(4) | |
165 if strand == 'plus': | |
166 self.strand = 0 | |
167 if strand == 'minus': | |
168 self.strand = 1 | |
169 | |
170 split = match.group(5) | |
171 if split is None: | |
172 self.split = None | |
173 else: | |
174 self.split = int(split) | |
175 | |
176 def __lt__(self, other: "SegAlignSegment") -> bool: | |
177 for attr in ['strand', 'tmp', 'block', 'r', 'split']: | |
178 self_value = getattr(self, attr) | |
179 other_value = getattr(other, attr) | |
180 if self_value < other_value: | |
181 return True | |
182 elif self_value > other_value: | |
183 return False | |
184 | |
185 return False | |
186 | |
187 | |
188 def main() -> None: | |
189 args, segalign_args = parse_args() | |
190 lastz_commands = LastzCommands() | |
191 | |
192 if args.diagonal_partition: | |
193 num_diagonal_partitioners = args.num_cpu | |
194 else: | |
195 num_diagonal_partitioners = 0 | |
196 | |
197 with multiprocessing.Manager() as manager: | |
198 segalign_q: queue.Queue[str] = manager.Queue() | |
199 skip_segalign = run_segalign(args, num_diagonal_partitioners, segalign_args, segalign_q, lastz_commands) | |
200 | |
201 if num_diagonal_partitioners > 0: | |
202 diagonal_partition_q = segalign_q | |
203 segalign_q = manager.Queue() | |
204 run_diagonal_partitioners(args, num_diagonal_partitioners, diagonal_partition_q, segalign_q) | |
205 | |
206 segalign_q.put(SENTINEL_VALUE) | |
207 output_q = segalign_q | |
208 segalign_q = manager.Queue() | |
209 | |
210 output_filename = "lastz-commands.txt" | |
211 if args.output_type == "commands": | |
212 output_filename = args.output_file | |
213 | |
214 with open(output_filename, "w") as f: | |
215 while True: | |
216 line = output_q.get() | |
217 if line == SENTINEL_VALUE: | |
218 output_q.task_done() | |
219 break | |
220 | |
221 # messy, fix this | |
222 if skip_segalign: | |
223 lastz_commands.add(line) | |
224 | |
225 if args.output_type != "commands": | |
226 segalign_q.put(line) | |
227 | |
228 print(line, file=f) | |
229 | |
230 if args.output_type == "output": | |
231 run_lastz(args, segalign_q, lastz_commands) | |
232 | |
233 with open(args.output_file, 'w') as of: | |
234 print("##maf version=1", file=of) | |
235 for lastz_command in lastz_commands.commands.values(): | |
236 with open(lastz_command.output_filename) as f: | |
237 for line in f: | |
238 of.write(line) | |
239 | |
240 if args.output_type == "tarball": | |
241 pass | |
242 | |
243 | |
244 def run_lastz(args: argparse.Namespace, input_q: queue.Queue[str], lastz_commands: LastzCommands) -> None: | |
245 num_lastz_workers = args.num_cpu | |
246 | |
247 for _ in range(num_lastz_workers): | |
248 input_q.put(SENTINEL_VALUE) | |
249 | |
250 if args.debug: | |
251 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) | |
252 beg: int = time.monotonic_ns() | |
253 | |
254 try: | |
255 with concurrent.futures.ProcessPoolExecutor(max_workers=num_lastz_workers) as executor: | |
256 for i in range(num_lastz_workers): | |
257 executor.submit(lastz_worker, input_q, i, lastz_commands) | |
258 except Exception as e: | |
259 sys.exit(f"Error: lastz failed: {e}") | |
260 | |
261 if args.debug: | |
262 ns: int = time.monotonic_ns() - beg | |
263 r_end = resource.getrusage(resource.RUSAGE_CHILDREN) | |
264 print(f"lastz clock time: {ns} ns", file=sys.stderr, flush=True) | |
265 for rusage_attr in RUSAGE_ATTRS: | |
266 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) | |
267 print(f" lastz {rusage_attr}: {value}", file=sys.stderr, flush=True) | |
268 | |
269 | |
270 def lastz_worker(input_q: queue.Queue[str], instance: int, lastz_commands: LastzCommands) -> None: | |
271 while True: | |
272 line = input_q.get() | |
273 if line == SENTINEL_VALUE: | |
274 input_q.task_done() | |
275 break | |
276 | |
277 if line not in lastz_commands.commands: | |
278 sys.exit(f"Error: lastz worker {instance} unexpected command format: {line}") | |
279 | |
280 command = lastz_commands.commands[line] | |
281 | |
282 if not os.path.exists(command.output_filename): | |
283 process = subprocess.run(command.args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
284 | |
285 for line in process.stdout.splitlines(): | |
286 print(line, file=sys.stdout, flush=True) | |
287 | |
288 if len(process.stderr) != 0: | |
289 for line in process.stderr.splitlines(): | |
290 print(line, file=sys.stderr, flush=True) | |
291 | |
292 if process.returncode != 0: | |
293 sys.exit(f"Error: lastz {instance} exited with returncode {process.returncode}") | |
294 | |
295 | |
296 def run_diagonal_partitioners(args: argparse.Namespace, num_workers: int, input_q: queue.Queue[str], output_q: queue.Queue[str]) -> None: | |
297 chunk_size = estimate_chunk_size(args) | |
298 | |
299 if args.debug: | |
300 print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True) | |
301 | |
302 with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: | |
303 for i in range(num_workers): | |
304 executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i)) | |
305 | |
306 | |
307 def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: | |
308 while True: | |
309 line = input_q.get() | |
310 if line == SENTINEL_VALUE: | |
311 input_q.task_done() | |
312 break | |
313 | |
314 run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)] | |
315 for word in line.split(): | |
316 run_args.append(word) | |
317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
318 | |
319 for line in process.stdout.splitlines(): | |
320 output_q.put(line) | |
321 | |
322 for line in process.stderr.splitlines(): | |
323 print(line, file=sys.stderr, flush=True) | |
324 | |
325 if process.returncode != 0: | |
326 sys.exit(f"Error: diagonal partitioner {instance} exited with returncode {process.returncode}") | |
327 | |
328 | |
329 def estimate_chunk_size(args: argparse.Namespace) -> int: | |
330 chunk_size = -1 | |
331 line_size = -1 | |
332 | |
333 if args.debug: | |
334 r_beg = resource.getrusage(resource.RUSAGE_SELF) | |
335 beg: int = time.monotonic_ns() | |
336 | |
337 # get size of each segment assuming DELETE_AFTER_CHUNKING == True | |
338 # takes into account already split segments | |
339 fdict: typing.DefaultDict[str, int] = collections.defaultdict(int) | |
340 for entry in os.scandir("."): | |
341 if entry.name.endswith(".segments"): | |
342 try: | |
343 file_size = entry.stat().st_size | |
344 except FileNotFoundError: | |
345 continue | |
346 | |
347 if line_size == -1: | |
348 try: | |
349 with open(entry.name) as f: | |
350 line_size = len(f.readline()) # add 1 for newline | |
351 except FileNotFoundError: | |
352 continue | |
353 | |
354 fdict[entry.name.split(".split", 1)[0]] += file_size | |
355 | |
356 # if noot enough segment files for estimation, continue | |
357 if len(fdict) > 2: | |
358 chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size) | |
359 | |
360 if args.debug: | |
361 ns: int = time.monotonic_ns() - beg | |
362 r_end = resource.getrusage(resource.RUSAGE_SELF) | |
363 print(f"estimate chunk size clock time: {ns} ns", file=sys.stderr, flush=True) | |
364 for rusage_attr in RUSAGE_ATTRS: | |
365 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) | |
366 print(f" estimate chunk size {rusage_attr}: {value}", file=sys.stderr, flush=True) | |
367 | |
368 return chunk_size | |
369 | |
370 | |
371 def run_segalign(args: argparse.Namespace, num_sentinel: int, segalign_args: list[str], segalign_q: queue.Queue[str], commands: LastzCommands) -> bool: | |
372 skip_segalign: bool = False | |
373 | |
374 # use the currently existing output file if it exists | |
375 if args.debug: | |
376 skip_segalign = load_segalign_output("lastz-commands.txt", segalign_q) | |
377 | |
378 if not skip_segalign: | |
379 run_args = ["segalign"] | |
380 run_args.extend(segalign_args) | |
381 run_args.append("--num_threads") | |
382 run_args.append(str(args.num_cpu)) | |
383 | |
384 if args.debug: | |
385 beg: int = time.monotonic_ns() | |
386 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) | |
387 | |
388 process = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
389 | |
390 for line in process.stdout.splitlines(): | |
391 commands.add(line) | |
392 segalign_q.put(line) | |
393 | |
394 if len(process.stderr) != 0: | |
395 for line in process.stderr.splitlines(): | |
396 print(line, file=sys.stderr, flush=True) | |
397 | |
398 if process.returncode != 0: | |
399 sys.exit(f"Error: segalign exited with returncode {process.returncode}") | |
400 | |
401 if args.debug: | |
402 ns: int = time.monotonic_ns() - beg | |
403 r_end = resource.getrusage(resource.RUSAGE_CHILDREN) | |
404 print(f"segalign clock time: {ns} ns", file=sys.stderr, flush=True) | |
405 for rusage_attr in RUSAGE_ATTRS: | |
406 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) | |
407 print(f" segalign {rusage_attr}: {value}", flush=True) | |
408 | |
409 for _ in range(num_sentinel): | |
410 segalign_q.put(SENTINEL_VALUE) | |
411 | |
412 return skip_segalign | |
413 | |
414 | |
415 def load_segalign_output(filename: str, segalign_q: queue.Queue[str]) -> bool: | |
416 load_success = False | |
417 | |
418 r_beg = resource.getrusage(resource.RUSAGE_SELF) | |
419 beg: int = time.monotonic_ns() | |
420 | |
421 try: | |
422 with open(filename) as f: | |
423 for line in f: | |
424 line = line.rstrip("\n") | |
425 segalign_q.put(line) | |
426 load_success = True | |
427 except FileNotFoundError: | |
428 pass | |
429 | |
430 if load_success: | |
431 ns: int = time.monotonic_ns() - beg | |
432 r_end = resource.getrusage(resource.RUSAGE_SELF) | |
433 print(f"load output clock time: {ns} ns", file=sys.stderr, flush=True) | |
434 for rusage_attr in RUSAGE_ATTRS: | |
435 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr) | |
436 print(f" load output {rusage_attr}: {value}", flush=True) | |
437 | |
438 return load_success | |
439 | |
440 | |
441 def parse_args() -> tuple[argparse.Namespace, list[str]]: | |
442 parser = argparse.ArgumentParser(allow_abbrev=False) | |
443 | |
444 parser.add_argument("--output-type", nargs="?", const="commands", default="commands", type=str, choices=["commands", "output", "tarball"], help="output type (default: %(default)s)") | |
445 parser.add_argument("--output-file", type=str, required=True, help="output pathname") | |
446 parser.add_argument("--diagonal-partition", action="store_true", help="run diagonal partition optimization") | |
447 parser.add_argument("--nogapped", action="store_true", help="don't perform gapped extension stage") | |
448 parser.add_argument("--markend", action="store_true", help="write a marker line just before completion") | |
449 parser.add_argument("--num-gpu", default=-1, type=int, help="number of GPUs to use (default: %(default)s [use all GPUs])") | |
450 parser.add_argument("--num-cpu", default=-1, type=int, help="number of CPUs to use (default: %(default)s [use all CPUs])") | |
451 parser.add_argument("--debug", action="store_true", help="print debug messages") | |
452 parser.add_argument("--tool_directory", type=str, required=True, help="tool directory") | |
453 | |
454 if not sys.argv[1:]: | |
455 parser.print_help() | |
456 sys.exit(0) | |
457 | |
458 args, segalign_args = parser.parse_known_args(sys.argv[1:]) | |
459 | |
460 cpus_available = len(os.sched_getaffinity(0)) | |
461 if args.num_cpu == -1: | |
462 args.num_cpu = cpus_available | |
463 elif args.num_cpu > cpus_available: | |
464 sys.exit(f"Error: additional {args.num_cpu - cpus_available} CPUs") | |
465 | |
466 if args.nogapped: | |
467 segalign_args.append("--nogapped") | |
468 | |
469 if args.markend: | |
470 segalign_args.append("--markend") | |
471 | |
472 if args.num_gpu != -1: | |
473 segalign_args.extend(["--num_gpu", f"{args.num_gpu}"]) | |
474 | |
475 if args.debug: | |
476 segalign_args.append("--debug") | |
477 | |
478 return args, segalign_args | |
479 | |
480 | |
481 if __name__ == "__main__": | |
482 main() |