comparison run_lastz_tarball.py @ 11:aa5586d83ae3 draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/batched_lastz commit e93d50e4ea6522b591e24c3b0ddbe8f9d322a93a
author richard-burhans
date Tue, 13 Aug 2024 23:38:09 +0000
parents d10e7502aba8
children
comparison
equal deleted inserted replaced
10:d10e7502aba8 11:aa5586d83ae3
29 def run_command( 29 def run_command(
30 instance: int, 30 instance: int,
31 input_queue: "queue.Queue[typing.Dict[str, typing.Any]]", 31 input_queue: "queue.Queue[typing.Dict[str, typing.Any]]",
32 output_queue: "queue.Queue[float]", 32 output_queue: "queue.Queue[float]",
33 debug: bool = False, 33 debug: bool = False,
34 ) -> None: 34 ) -> str | None:
35 os.chdir("galaxy/files") 35 os.chdir("galaxy/files")
36 36
37 # These are not considered errors even though 37 # These are not considered errors even though
38 # we will end up with a segmented alignment 38 # we will end up with a segmented alignment
39 truncation_regex = re.compile( 39 truncation_regex = re.compile(
43 43
44 while True: 44 while True:
45 command_dict = input_queue.get() 45 command_dict = input_queue.get()
46 46
47 if not command_dict: 47 if not command_dict:
48 return 48 return None
49 49
50 args = ["lastz", "--allocate:traceback=1.99G"] 50 args = ["lastz", "--allocate:traceback=1.99G"]
51 args.extend(command_dict["args"]) 51 args.extend(command_dict["args"])
52 52
53 stdin = command_dict["stdin"] 53 stdin = command_dict["stdin"]
67 67
68 for var in [stdin, stdout, stderr]: 68 for var in [stdin, stdout, stderr]:
69 if var is not None: 69 if var is not None:
70 var.close() 70 var.close()
71 71
72 if p.returncode == 0: 72 # if there is a stderr_file, make sure it is
73 stderr = command_dict["stderr"] 73 # empty or only contains truncation messages
74 if stderr is not None: 74 stderr_ok = True
75 try: 75 stderr_file = command_dict["stderr"]
76 statinfo = os.stat(stderr, follow_symlinks=False) 76
77 except Exception: 77 if stderr_file is not None:
78 statinfo = None 78 try:
79 79 statinfo = os.stat(stderr_file, follow_symlinks=False)
80 if statinfo is None:
81 sys.exit(f"unable to stat stderr file: {' '.join(args)}")
82
83 if statinfo.st_size != 0: 80 if statinfo.st_size != 0:
84 sys.exit(f"stderr file is not empty: {' '.join(args)}") 81 with open(stderr_file) as f:
85 82 for stderr_line in f:
83 stderr_line = stderr_line.strip()
84 if (not truncation_regex.match(stderr_line) and stderr_line != truncation_msg):
85 stderr_ok = False
86 except Exception:
87 stderr_ok = False
88
89 if p.returncode in [0, 1] and stderr_ok:
86 elapsed = time.perf_counter() - begin 90 elapsed = time.perf_counter() - begin
87 output_queue.put(elapsed) 91 output_queue.put(elapsed)
88
89 elif p.returncode == 1:
90 traceback_warning = True
91
92 stderr_file = command_dict["stderr"]
93 if stderr_file is None:
94 traceback_warning = False
95 else:
96 with open(stderr_file) as f:
97 for stderr_line in f:
98 stderr_line = stderr_line.strip()
99 if (
100 not truncation_regex.match(stderr_line)
101 and stderr_line != truncation_msg
102 ):
103 traceback_warning = False
104
105 if traceback_warning:
106 elapsed = time.perf_counter() - begin
107 output_queue.put(elapsed)
108 else:
109 sys.exit(f"command failed: {' '.join(args)}")
110
111 else: 92 else:
112 sys.exit(f"command failed: {' '.join(args)}") 93 return f"command failed (rc={p.returncode}): {' '.join(args)}"
113
114 if debug:
115 print(f"runtime {elapsed}", file=sys.stderr, flush=True)
116 94
117 95
118 class BatchTar: 96 class BatchTar:
119 def __init__(self, pathname: str, debug: bool = False) -> None: 97 def __init__(self, pathname: str, debug: bool = False) -> None:
120 self.pathname = pathname 98 self.pathname = pathname
331 debug=self.debug, 309 debug=self.debug,
332 ) 310 )
333 for instance in range(self.parallel) 311 for instance in range(self.parallel)
334 ] 312 ]
335 313
314 found_falures = False
315
336 for f in concurrent.futures.as_completed(futures): 316 for f in concurrent.futures.as_completed(futures):
317 result = f.result()
318 if result is not None:
319 print(f"lastz: {result}", file=sys.stderr, flush=True)
320
337 if not f.done() or f.cancelled() or f.exception() is not None: 321 if not f.done() or f.cancelled() or f.exception() is not None:
338 sys.exit("lastz command failed") 322 found_falures = True
339 323
340 while not output_queue.empty(): 324 while not output_queue.empty():
341 run_time = output_queue.get() 325 run_time = output_queue.get()
342 run_times.append(run_time) 326 run_times.append(run_time)
327
328 if found_falures:
329 sys.exit("lastz command failed")
343 330
344 elapsed = time.perf_counter() - begin 331 elapsed = time.perf_counter() - begin
345 332
346 if self.debug: 333 if self.debug:
347 print(f"elapsed {elapsed}", file=sys.stderr, flush=True) 334 print(f"elapsed {elapsed}", file=sys.stderr, flush=True)
357 344
358 for file_type, file_list in self.output_files.items(): 345 for file_type, file_list in self.output_files.items():
359 with open(f"output.{final_output_format}", "w") as ofh: 346 with open(f"output.{final_output_format}", "w") as ofh:
360 if final_output_format == "maf": 347 if final_output_format == "maf":
361 print("##maf version=1", file=ofh) 348 print("##maf version=1", file=ofh)
349
362 for filename in file_list: 350 for filename in file_list:
363 with open(f"galaxy/files/{filename}") as ifh: 351 with open(f"galaxy/files/{filename}") as ifh:
364 for line in ifh: 352 for line in ifh:
365 ofh.write(line) 353 ofh.write(line)
366 354