comparison runner.py @ 8:150de8a3954a draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 11132ef8b4103731b6f5860ea736bf332a06e303
author richard-burhans
date Tue, 09 Jul 2024 17:37:53 +0000
parents
children 08e987868f0f
comparison
equal deleted inserted replaced
7:39c21be8f8b9 8:150de8a3954a
1 #!/usr/bin/env python
2
3 import argparse
4 import collections
5 import concurrent.futures
6 import multiprocessing
7 import os
8 import queue
9 import re
10 import resource
11 import statistics
12 import subprocess
13 import sys
14 import time
15 import typing
16
17 SENTINEL_VALUE: typing.Final = "SENTINEL"
18 # CA_SENTEL_VALUE: typing.Final = ChunkAddress(0, 0, 0, 0, 0, SENTINEL_VALUE)
19 RUSAGE_ATTRS: typing.Final = ["ru_utime", "ru_stime", "ru_maxrss", "ru_minflt", "ru_majflt", "ru_inblock", "ru_oublock", "ru_nvcsw", "ru_nivcsw"]
20
21
22 class LastzCommands:
23 def __init__(self) -> None:
24 self.commands: dict[str, LastzCommand] = {}
25 self.segalign_segments: SegAlignSegments = SegAlignSegments()
26
27 def add(self, line: str) -> None:
28 if line not in self.commands:
29 self.commands[line] = LastzCommand(line)
30
31 command = self.commands[line]
32 self.segalign_segments.add(command.segments_filename)
33
34 def segments(self) -> typing.Iterator["SegAlignSegment"]:
35 for segment in self.segalign_segments:
36 yield segment
37
38
39 class LastzCommand:
40 lastz_command_regex = re.compile(r"lastz (.+?)?ref\.2bit\[nameparse=darkspace\]\[multiple\]\[subset=ref_block(\d+)\.name\] (.+?)?query\.2bit\[nameparse=darkspace\]\[subset=query_block(\d+)\.name] --format=(\S+) --ydrop=(\d+) --gappedthresh=(\d+) --strand=(minus|plus)(?: --ambiguous=(\S+))?(?: --(notrivial))?(?: --scoring=(\S+))? --segments=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments --output=tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.(\S+) 2> tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.err")
41
42 def __init__(self, line: str) -> None:
43 self.line = line
44 self.args: list[str] = []
45 self.target_filename: str = ''
46 self.query_filename: str = ''
47 self.data_folder: str = ''
48 self.ref_block: int = 0
49 self.query_block: int = 0
50 self.output_format: str = ''
51 self.ydrop: int = 0
52 self.gappedthresh: int = 0
53 self.strand: int | None = None
54 self.ambiguous: bool = False
55 self.nontrivial: bool = False
56 self.scoring: str | None = None
57 self.segments_filename: str = ''
58 self.output_filename: str = ''
59 self.error_filename: str = ''
60
61 self._parse_command()
62
63 def _parse_command(self) -> None:
64 match = self.lastz_command_regex.match(self.line)
65 if not match:
66 sys.exit(f"Unkown lastz command format: {self.line}")
67
68 self.data_folder = match.group(1)
69 self.ref_block = int(match.group(2))
70 self.query_block = int(match.group(4))
71 self.output_format = match.group(5)
72 self.ydrop = int(match.group(6))
73 self.gappedthresh = int(match.group(7))
74 strand = match.group(8)
75
76 if strand == 'plus':
77 self.strand = 0
78 elif strand == 'minus':
79 self.strand = 1
80
81 self.target_filename = f"{self.data_folder}ref.2bit[nameparse=darkspace][multiple][subset=ref_block{self.ref_block}.name]"
82 self.query_filename = f"{self.data_folder}query.2bit[nameparse=darkspace][subset=query_block{self.query_block}.name]"
83
84 self.args = [
85 "lastz",
86 self.target_filename,
87 self.query_filename,
88 f"--format={self.output_format}",
89 f"--ydrop={self.ydrop}",
90 f"--gappedthresh={self.gappedthresh}",
91 f"--strand={strand}"
92 ]
93
94 ambiguous = match.group(9)
95 if ambiguous is not None:
96 self.ambiguous = True
97 self.args.append(f"--ambiguous={ambiguous}")
98
99 nontrivial = match.group(10)
100 if nontrivial is not None:
101 self.nontrivial = True
102 self.args.append("--nontrivial")
103
104 scoring = match.group(11)
105 if scoring is not None:
106 self.scoring = scoring
107 self.args.append(f"--scoring={scoring}")
108
109 tmp_no = int(match.group(12))
110 block_no = int(match.group(13))
111 r_no = int(match.group(14))
112 split = match.group(16)
113
114 base_filename = f"tmp{tmp_no}.block{block_no}.r{r_no}.{strand}"
115
116 if split is not None:
117 base_filename = f"{base_filename}.split{split}"
118
119 self.segments_filename = f"{base_filename}.segments"
120 self.args.append(f"--segments={self.segments_filename}")
121
122 self.output_filename = f"{base_filename}.{self.output_format}"
123 self.args.append(f"--output={self.output_filename}")
124
125 self.error_filename = f"{base_filename}.err"
126
127
128 class SegAlignSegments:
129 def __init__(self) -> None:
130 self.segments: dict[str, SegAlignSegment] = {}
131
132 def add(self, filename: str) -> None:
133 if filename not in self.segments:
134 self.segments[filename] = SegAlignSegment(filename)
135
136 def __iter__(self) -> "SegAlignSegments":
137 return self
138
139 def __next__(self) -> typing.Generator["SegAlignSegment", None, None]:
140 for segment in sorted(self.segments.values()):
141 yield segment
142
143
144 class SegAlignSegment:
145 def __init__(self, filename: str):
146 self.filename = filename
147 self.tmp: int = 0
148 self.block: int = 0
149 self.r: int = 0
150 self.strand: int = 0
151 self.split: int | None = None
152 self.fmt: str = ""
153 self._parse_filename()
154
155 def _parse_filename(self) -> None:
156 match = re.match(r"tmp(\d+)\.block(\d+)\.r(\d+)\.(minus|plus)(?:\.split(\d+))?\.segments$", self.filename)
157 if not match:
158 sys.exit(f"Unkown segment filename format: {self.filename}")
159
160 self.tmp = int(match.group(1))
161 self.block = int(match.group(2))
162 self.r = int(match.group(3))
163
164 strand = match.group(4)
165 if strand == 'plus':
166 self.strand = 0
167 if strand == 'minus':
168 self.strand = 1
169
170 split = match.group(5)
171 if split is None:
172 self.split = None
173 else:
174 self.split = int(split)
175
176 def __lt__(self, other: "SegAlignSegment") -> bool:
177 for attr in ['strand', 'tmp', 'block', 'r', 'split']:
178 self_value = getattr(self, attr)
179 other_value = getattr(other, attr)
180 if self_value < other_value:
181 return True
182 elif self_value > other_value:
183 return False
184
185 return False
186
187
188 def main() -> None:
189 args, segalign_args = parse_args()
190 lastz_commands = LastzCommands()
191
192 if args.diagonal_partition:
193 num_diagonal_partitioners = args.num_cpu
194 else:
195 num_diagonal_partitioners = 0
196
197 with multiprocessing.Manager() as manager:
198 segalign_q: queue.Queue[str] = manager.Queue()
199 skip_segalign = run_segalign(args, num_diagonal_partitioners, segalign_args, segalign_q, lastz_commands)
200
201 if num_diagonal_partitioners > 0:
202 diagonal_partition_q = segalign_q
203 segalign_q = manager.Queue()
204 run_diagonal_partitioners(args, num_diagonal_partitioners, diagonal_partition_q, segalign_q)
205
206 segalign_q.put(SENTINEL_VALUE)
207 output_q = segalign_q
208 segalign_q = manager.Queue()
209
210 output_filename = "lastz-commands.txt"
211 if args.output_type == "commands":
212 output_filename = args.output_file
213
214 with open(output_filename, "w") as f:
215 while True:
216 line = output_q.get()
217 if line == SENTINEL_VALUE:
218 output_q.task_done()
219 break
220
221 # messy, fix this
222 if skip_segalign:
223 lastz_commands.add(line)
224
225 if args.output_type != "commands":
226 segalign_q.put(line)
227
228 print(line, file=f)
229
230 if args.output_type == "output":
231 run_lastz(args, segalign_q, lastz_commands)
232
233 with open(args.output_file, 'w') as of:
234 print("##maf version=1", file=of)
235 for lastz_command in lastz_commands.commands.values():
236 with open(lastz_command.output_filename) as f:
237 for line in f:
238 of.write(line)
239
240 if args.output_type == "tarball":
241 pass
242
243
244 def run_lastz(args: argparse.Namespace, input_q: queue.Queue[str], lastz_commands: LastzCommands) -> None:
245 num_lastz_workers = args.num_cpu
246
247 for _ in range(num_lastz_workers):
248 input_q.put(SENTINEL_VALUE)
249
250 if args.debug:
251 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN)
252 beg: int = time.monotonic_ns()
253
254 try:
255 with concurrent.futures.ProcessPoolExecutor(max_workers=num_lastz_workers) as executor:
256 for i in range(num_lastz_workers):
257 executor.submit(lastz_worker, input_q, i, lastz_commands)
258 except Exception as e:
259 sys.exit(f"Error: lastz failed: {e}")
260
261 if args.debug:
262 ns: int = time.monotonic_ns() - beg
263 r_end = resource.getrusage(resource.RUSAGE_CHILDREN)
264 print(f"lastz clock time: {ns} ns", file=sys.stderr, flush=True)
265 for rusage_attr in RUSAGE_ATTRS:
266 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
267 print(f" lastz {rusage_attr}: {value}", file=sys.stderr, flush=True)
268
269
270 def lastz_worker(input_q: queue.Queue[str], instance: int, lastz_commands: LastzCommands) -> None:
271 while True:
272 line = input_q.get()
273 if line == SENTINEL_VALUE:
274 input_q.task_done()
275 break
276
277 if line not in lastz_commands.commands:
278 sys.exit(f"Error: lastz worker {instance} unexpected command format: {line}")
279
280 command = lastz_commands.commands[line]
281
282 if not os.path.exists(command.output_filename):
283 process = subprocess.run(command.args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
284
285 for line in process.stdout.splitlines():
286 print(line, file=sys.stdout, flush=True)
287
288 if len(process.stderr) != 0:
289 for line in process.stderr.splitlines():
290 print(line, file=sys.stderr, flush=True)
291
292 if process.returncode != 0:
293 sys.exit(f"Error: lastz {instance} exited with returncode {process.returncode}")
294
295
296 def run_diagonal_partitioners(args: argparse.Namespace, num_workers: int, input_q: queue.Queue[str], output_q: queue.Queue[str]) -> None:
297 chunk_size = estimate_chunk_size(args)
298
299 if args.debug:
300 print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True)
301
302 with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
303 for i in range(num_workers):
304 executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i))
305
306
307 def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None:
308 while True:
309 line = input_q.get()
310 if line == SENTINEL_VALUE:
311 input_q.task_done()
312 break
313
314 run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)]
315 for word in line.split():
316 run_args.append(word)
317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
318
319 for line in process.stdout.splitlines():
320 output_q.put(line)
321
322 for line in process.stderr.splitlines():
323 print(line, file=sys.stderr, flush=True)
324
325 if process.returncode != 0:
326 sys.exit(f"Error: diagonal partitioner {instance} exited with returncode {process.returncode}")
327
328
329 def estimate_chunk_size(args: argparse.Namespace) -> int:
330 chunk_size = -1
331 line_size = -1
332
333 if args.debug:
334 r_beg = resource.getrusage(resource.RUSAGE_SELF)
335 beg: int = time.monotonic_ns()
336
337 # get size of each segment assuming DELETE_AFTER_CHUNKING == True
338 # takes into account already split segments
339 fdict: typing.DefaultDict[str, int] = collections.defaultdict(int)
340 for entry in os.scandir("."):
341 if entry.name.endswith(".segments"):
342 try:
343 file_size = entry.stat().st_size
344 except FileNotFoundError:
345 continue
346
347 if line_size == -1:
348 try:
349 with open(entry.name) as f:
350 line_size = len(f.readline()) # add 1 for newline
351 except FileNotFoundError:
352 continue
353
354 fdict[entry.name.split(".split", 1)[0]] += file_size
355
356 # if noot enough segment files for estimation, continue
357 if len(fdict) > 2:
358 chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size)
359
360 if args.debug:
361 ns: int = time.monotonic_ns() - beg
362 r_end = resource.getrusage(resource.RUSAGE_SELF)
363 print(f"estimate chunk size clock time: {ns} ns", file=sys.stderr, flush=True)
364 for rusage_attr in RUSAGE_ATTRS:
365 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
366 print(f" estimate chunk size {rusage_attr}: {value}", file=sys.stderr, flush=True)
367
368 return chunk_size
369
370
371 def run_segalign(args: argparse.Namespace, num_sentinel: int, segalign_args: list[str], segalign_q: queue.Queue[str], commands: LastzCommands) -> bool:
372 skip_segalign: bool = False
373
374 # use the currently existing output file if it exists
375 if args.debug:
376 skip_segalign = load_segalign_output("lastz-commands.txt", segalign_q)
377
378 if not skip_segalign:
379 run_args = ["segalign"]
380 run_args.extend(segalign_args)
381 run_args.append("--num_threads")
382 run_args.append(str(args.num_cpu))
383
384 if args.debug:
385 beg: int = time.monotonic_ns()
386 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN)
387
388 process = subprocess.run(run_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
389
390 for line in process.stdout.splitlines():
391 commands.add(line)
392 segalign_q.put(line)
393
394 if len(process.stderr) != 0:
395 for line in process.stderr.splitlines():
396 print(line, file=sys.stderr, flush=True)
397
398 if process.returncode != 0:
399 sys.exit(f"Error: segalign exited with returncode {process.returncode}")
400
401 if args.debug:
402 ns: int = time.monotonic_ns() - beg
403 r_end = resource.getrusage(resource.RUSAGE_CHILDREN)
404 print(f"segalign clock time: {ns} ns", file=sys.stderr, flush=True)
405 for rusage_attr in RUSAGE_ATTRS:
406 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
407 print(f" segalign {rusage_attr}: {value}", flush=True)
408
409 for _ in range(num_sentinel):
410 segalign_q.put(SENTINEL_VALUE)
411
412 return skip_segalign
413
414
415 def load_segalign_output(filename: str, segalign_q: queue.Queue[str]) -> bool:
416 load_success = False
417
418 r_beg = resource.getrusage(resource.RUSAGE_SELF)
419 beg: int = time.monotonic_ns()
420
421 try:
422 with open(filename) as f:
423 for line in f:
424 line = line.rstrip("\n")
425 segalign_q.put(line)
426 load_success = True
427 except FileNotFoundError:
428 pass
429
430 if load_success:
431 ns: int = time.monotonic_ns() - beg
432 r_end = resource.getrusage(resource.RUSAGE_SELF)
433 print(f"load output clock time: {ns} ns", file=sys.stderr, flush=True)
434 for rusage_attr in RUSAGE_ATTRS:
435 value = getattr(r_end, rusage_attr) - getattr(r_beg, rusage_attr)
436 print(f" load output {rusage_attr}: {value}", flush=True)
437
438 return load_success
439
440
441 def parse_args() -> tuple[argparse.Namespace, list[str]]:
442 parser = argparse.ArgumentParser(allow_abbrev=False)
443
444 parser.add_argument("--output-type", nargs="?", const="commands", default="commands", type=str, choices=["commands", "output", "tarball"], help="output type (default: %(default)s)")
445 parser.add_argument("--output-file", type=str, required=True, help="output pathname")
446 parser.add_argument("--diagonal-partition", action="store_true", help="run diagonal partition optimization")
447 parser.add_argument("--nogapped", action="store_true", help="don't perform gapped extension stage")
448 parser.add_argument("--markend", action="store_true", help="write a marker line just before completion")
449 parser.add_argument("--num-gpu", default=-1, type=int, help="number of GPUs to use (default: %(default)s [use all GPUs])")
450 parser.add_argument("--num-cpu", default=-1, type=int, help="number of CPUs to use (default: %(default)s [use all CPUs])")
451 parser.add_argument("--debug", action="store_true", help="print debug messages")
452 parser.add_argument("--tool_directory", type=str, required=True, help="tool directory")
453
454 if not sys.argv[1:]:
455 parser.print_help()
456 sys.exit(0)
457
458 args, segalign_args = parser.parse_known_args(sys.argv[1:])
459
460 cpus_available = len(os.sched_getaffinity(0))
461 if args.num_cpu == -1:
462 args.num_cpu = cpus_available
463 elif args.num_cpu > cpus_available:
464 sys.exit(f"Error: additional {args.num_cpu - cpus_available} CPUs")
465
466 if args.nogapped:
467 segalign_args.append("--nogapped")
468
469 if args.markend:
470 segalign_args.append("--markend")
471
472 if args.num_gpu != -1:
473 segalign_args.extend(["--num_gpu", f"{args.num_gpu}"])
474
475 if args.debug:
476 segalign_args.append("--debug")
477
478 return args, segalign_args
479
480
481 if __name__ == "__main__":
482 main()