Mercurial > repos > richard-burhans > batched_lastz
comparison run_lastz_tarball.py @ 11:aa5586d83ae3 draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/batched_lastz commit e93d50e4ea6522b591e24c3b0ddbe8f9d322a93a
author | richard-burhans |
---|---|
date | Tue, 13 Aug 2024 23:38:09 +0000 |
parents | d10e7502aba8 |
children |
comparison
legend: equal | deleted | inserted | replaced
10:d10e7502aba8 | 11:aa5586d83ae3 |
---|---|
29 def run_command( | 29 def run_command( |
30 instance: int, | 30 instance: int, |
31 input_queue: "queue.Queue[typing.Dict[str, typing.Any]]", | 31 input_queue: "queue.Queue[typing.Dict[str, typing.Any]]", |
32 output_queue: "queue.Queue[float]", | 32 output_queue: "queue.Queue[float]", |
33 debug: bool = False, | 33 debug: bool = False, |
34 ) -> None: | 34 ) -> str | None: |
35 os.chdir("galaxy/files") | 35 os.chdir("galaxy/files") |
36 | 36 |
37 # These are not considered errors even though | 37 # These are not considered errors even though |
38 # we will end up with a segmented alignment | 38 # we will end up with a segmented alignment |
39 truncation_regex = re.compile( | 39 truncation_regex = re.compile( |
43 | 43 |
44 while True: | 44 while True: |
45 command_dict = input_queue.get() | 45 command_dict = input_queue.get() |
46 | 46 |
47 if not command_dict: | 47 if not command_dict: |
48 return | 48 return None |
49 | 49 |
50 args = ["lastz", "--allocate:traceback=1.99G"] | 50 args = ["lastz", "--allocate:traceback=1.99G"] |
51 args.extend(command_dict["args"]) | 51 args.extend(command_dict["args"]) |
52 | 52 |
53 stdin = command_dict["stdin"] | 53 stdin = command_dict["stdin"] |
67 | 67 |
68 for var in [stdin, stdout, stderr]: | 68 for var in [stdin, stdout, stderr]: |
69 if var is not None: | 69 if var is not None: |
70 var.close() | 70 var.close() |
71 | 71 |
72 if p.returncode == 0: | 72 # if there is a stderr_file, make sure it is |
73 stderr = command_dict["stderr"] | 73 # empty or only contains truncation messages |
74 if stderr is not None: | 74 stderr_ok = True |
75 try: | 75 stderr_file = command_dict["stderr"] |
76 statinfo = os.stat(stderr, follow_symlinks=False) | 76 |
77 except Exception: | 77 if stderr_file is not None: |
78 statinfo = None | 78 try: |
79 | 79 statinfo = os.stat(stderr_file, follow_symlinks=False) |
80 if statinfo is None: | |
81 sys.exit(f"unable to stat stderr file: {' '.join(args)}") | |
82 | |
83 if statinfo.st_size != 0: | 80 if statinfo.st_size != 0: |
84 sys.exit(f"stderr file is not empty: {' '.join(args)}") | 81 with open(stderr_file) as f: |
85 | 82 for stderr_line in f: |
83 stderr_line = stderr_line.strip() | |
84 if (not truncation_regex.match(stderr_line) and stderr_line != truncation_msg): | |
85 stderr_ok = False | |
86 except Exception: | |
87 stderr_ok = False | |
88 | |
89 if p.returncode in [0, 1] and stderr_ok: | |
86 elapsed = time.perf_counter() - begin | 90 elapsed = time.perf_counter() - begin |
87 output_queue.put(elapsed) | 91 output_queue.put(elapsed) |
88 | |
89 elif p.returncode == 1: | |
90 traceback_warning = True | |
91 | |
92 stderr_file = command_dict["stderr"] | |
93 if stderr_file is None: | |
94 traceback_warning = False | |
95 else: | |
96 with open(stderr_file) as f: | |
97 for stderr_line in f: | |
98 stderr_line = stderr_line.strip() | |
99 if ( | |
100 not truncation_regex.match(stderr_line) | |
101 and stderr_line != truncation_msg | |
102 ): | |
103 traceback_warning = False | |
104 | |
105 if traceback_warning: | |
106 elapsed = time.perf_counter() - begin | |
107 output_queue.put(elapsed) | |
108 else: | |
109 sys.exit(f"command failed: {' '.join(args)}") | |
110 | |
111 else: | 92 else: |
112 sys.exit(f"command failed: {' '.join(args)}") | 93 return f"command failed (rc={p.returncode}): {' '.join(args)}" |
113 | |
114 if debug: | |
115 print(f"runtime {elapsed}", file=sys.stderr, flush=True) | |
116 | 94 |
117 | 95 |
118 class BatchTar: | 96 class BatchTar: |
119 def __init__(self, pathname: str, debug: bool = False) -> None: | 97 def __init__(self, pathname: str, debug: bool = False) -> None: |
120 self.pathname = pathname | 98 self.pathname = pathname |
331 debug=self.debug, | 309 debug=self.debug, |
332 ) | 310 ) |
333 for instance in range(self.parallel) | 311 for instance in range(self.parallel) |
334 ] | 312 ] |
335 | 313 |
314 found_falures = False | |
315 | |
336 for f in concurrent.futures.as_completed(futures): | 316 for f in concurrent.futures.as_completed(futures): |
317 result = f.result() | |
318 if result is not None: | |
319 print(f"lastz: {result}", file=sys.stderr, flush=True) | |
320 | |
337 if not f.done() or f.cancelled() or f.exception() is not None: | 321 if not f.done() or f.cancelled() or f.exception() is not None: |
338 sys.exit("lastz command failed") | 322 found_falures = True |
339 | 323 |
340 while not output_queue.empty(): | 324 while not output_queue.empty(): |
341 run_time = output_queue.get() | 325 run_time = output_queue.get() |
342 run_times.append(run_time) | 326 run_times.append(run_time) |
327 | |
328 if found_falures: | |
329 sys.exit("lastz command failed") | |
343 | 330 |
344 elapsed = time.perf_counter() - begin | 331 elapsed = time.perf_counter() - begin |
345 | 332 |
346 if self.debug: | 333 if self.debug: |
347 print(f"elapsed {elapsed}", file=sys.stderr, flush=True) | 334 print(f"elapsed {elapsed}", file=sys.stderr, flush=True) |
357 | 344 |
358 for file_type, file_list in self.output_files.items(): | 345 for file_type, file_list in self.output_files.items(): |
359 with open(f"output.{final_output_format}", "w") as ofh: | 346 with open(f"output.{final_output_format}", "w") as ofh: |
360 if final_output_format == "maf": | 347 if final_output_format == "maf": |
361 print("##maf version=1", file=ofh) | 348 print("##maf version=1", file=ofh) |
349 | |
362 for filename in file_list: | 350 for filename in file_list: |
363 with open(f"galaxy/files/{filename}") as ifh: | 351 with open(f"galaxy/files/{filename}") as ifh: |
364 for line in ifh: | 352 for line in ifh: |
365 ofh.write(line) | 353 ofh.write(line) |
366 | 354 |