Mercurial > repos > richard-burhans > batched_lastz
comparison run_lastz_tarball.py @ 0:103538753e81 draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/batched_lastz commit 7b119b432f721e228a73396d4e8f0d54350b0481
author | richard-burhans |
---|---|
date | Tue, 30 Apr 2024 21:06:58 +0000 |
parents | |
children | 007990f98551 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:103538753e81 |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 import argparse | |
4 import concurrent.futures | |
5 import json | |
6 import multiprocessing | |
7 import os | |
8 import queue | |
9 import re | |
10 import shutil | |
11 import sys | |
12 import subprocess | |
13 import tarfile | |
14 import tempfile | |
15 import typing | |
16 import time | |
17 | |
18 | |
# Output formats accepted by lastz's --format option: lav, lav+text, axt, axt+,
# maf, maf+, maf-, sam, softsam, sam-, softsam-, cigar, BLASTN, PAF, PAF:wfmash,
# differences, rdotplot, text, general[:<fields>], or general-[:<fields>].
# --format=none can be used when no alignment output is desired.
#
# The general/general- forms take an OPTIONAL ":<fields>" suffix, so bare
# "general" must match while something like "generalfoo" must not.
lastz_output_format_regex = re.compile(
    r"^(?:axt\+?|blastn|cigar|differences|general-?(?::.+)?|lav|lav\+text|maf[-+]?|none|paf(?::wfmash)?|rdotplot|sam-?|softsam-?|text)$",
    re.IGNORECASE,
)
27 | |
28 | |
def run_command(
    instance: int,
    input_queue: "queue.Queue[typing.Dict[str, typing.Any]]",
    output_queue: "queue.Queue[float]",
    debug: bool = False,
    workdir: str = "galaxy/files",
) -> None:
    """Worker loop: run queued lastz commands until an empty-dict sentinel arrives.

    Each command dict provides the lastz ``args`` plus optional ``stdin``,
    ``stdout`` and ``stderr`` paths. Those paths are resolved relative to
    ``workdir``, and lastz itself is started with ``cwd=workdir`` so relative
    paths inside ``args`` resolve the same way. The process-wide working
    directory is never changed, so a pool process that runs this function
    more than once behaves identically each time.

    :param instance: worker index (unused here; kept for the caller's interface).
    :param input_queue: source of command dicts; a falsy item (``{}``) stops the loop.
    :param output_queue: receives the runtime in seconds of each successful command.
    :param debug: print each runtime to stderr.
    :param workdir: directory that command paths are relative to.
    :raises SystemExit: if lastz exits non-zero, or its stderr file cannot be
        stat'ed or is not empty.
    """

    def in_workdir(path: str) -> str:
        # os.path.join returns absolute paths unchanged
        return os.path.join(workdir, path)

    while True:
        command_dict = input_queue.get()

        # the empty dict is the "no more work" sentinel
        if not command_dict:
            return

        args = ["lastz"]
        args.extend(command_dict["args"])
        command_line = " ".join(args)

        stdin_path = command_dict["stdin"]
        stdout_path = command_dict["stdout"]
        stderr_path = command_dict["stderr"]

        # open redirections; whatever was opened is closed even if a later
        # open() or subprocess.run() raises
        handles: typing.List[typing.Optional[typing.IO[str]]] = []
        try:
            for path, mode in ((stdin_path, "r"), (stdout_path, "w"), (stderr_path, "w")):
                handles.append(None if path is None else open(in_workdir(path), mode))
            stdin, stdout, stderr = handles

            begin = time.perf_counter()
            p = subprocess.run(
                args, stdin=stdin, stdout=stdout, stderr=stderr, cwd=workdir
            )
        finally:
            for handle in handles:
                if handle is not None:
                    handle.close()

        if p.returncode != 0:
            sys.exit(f"command failed: {command_line}")

        # lastz may exit 0 yet still report problems on stderr; treat any
        # stderr output as failure
        if stderr_path is not None:
            try:
                statinfo = os.stat(in_workdir(stderr_path), follow_symlinks=False)
            except OSError:
                sys.exit(f"unable to stat stderr file: {command_line}")

            if statinfo.st_size != 0:
                sys.exit(f"stderr file is not empty: {command_line}")

        elapsed = time.perf_counter() - begin
        output_queue.put(elapsed)

        if debug:
            print(f"runtime {elapsed}", file=sys.stderr, flush=True)
86 | |
87 | |
88 class BatchTar: | |
89 def __init__(self, pathname: str, debug: bool = False) -> None: | |
90 self.pathname = pathname | |
91 self.debug = debug | |
92 self.tarfile = None | |
93 self.commands: typing.List[typing.Dict[str, typing.Any]] = [] | |
94 self._extract() | |
95 self._load_commands() | |
96 | |
97 def batch_commands(self) -> typing.Iterator[typing.Dict[str, typing.Any]]: | |
98 for command in self.commands: | |
99 yield command | |
100 | |
101 def _load_commands(self) -> None: | |
102 try: | |
103 f = open("galaxy/commands.json") | |
104 except FileNotFoundError: | |
105 sys.exit( | |
106 f"ERROR: input tarball missing galaxy/commands.json: {self.pathname}" | |
107 ) | |
108 | |
109 begin = time.perf_counter() | |
110 for json_line in f: | |
111 json_line = json_line.rstrip("\n") | |
112 try: | |
113 command_dict = json.loads(json_line) | |
114 except json.JSONDecodeError: | |
115 sys.exit( | |
116 f"ERROR: bad json line in galaxy/commands.json: {self.pathname}" | |
117 ) | |
118 | |
119 self._load_command(command_dict) | |
120 | |
121 f.close() | |
122 elapsed = time.perf_counter() - begin | |
123 | |
124 if self.debug: | |
125 print( | |
126 f"loaded {len(self.commands)} commands in {elapsed} seconds ", | |
127 file=sys.stderr, | |
128 flush=True, | |
129 ) | |
130 | |
131 def _load_command(self, command_dict: typing.Dict[str, typing.Any]) -> None: | |
132 # check command_dict structure | |
133 field_types: typing.Dict[str, typing.List[typing.Any]] = { | |
134 "executable": [str], | |
135 "args": [list], | |
136 "stdin": [str, "None"], | |
137 "stdout": [str, "None"], | |
138 "stderr": [str, "None"], | |
139 } | |
140 | |
141 bad_format = False | |
142 for field_name in field_types.keys(): | |
143 # missing field | |
144 if field_name not in command_dict: | |
145 bad_format = True | |
146 break | |
147 | |
148 # incorrect field type | |
149 good_type = False | |
150 for field_type in field_types[field_name]: | |
151 if isinstance(field_type, str) and field_type == "None": | |
152 if command_dict[field_name] is None: | |
153 good_type = True | |
154 break | |
155 elif isinstance(command_dict[field_name], field_type): | |
156 good_type = True | |
157 break | |
158 | |
159 if good_type is False: | |
160 bad_format = True | |
161 | |
162 if not bad_format: | |
163 # all args must be strings | |
164 for arg in command_dict["args"]: | |
165 if not isinstance(arg, str): | |
166 bad_format = True | |
167 break | |
168 | |
169 if bad_format: | |
170 sys.exit( | |
171 f"ERROR: unexpected json format in line in galaxy/commands.json: {self.pathname}" | |
172 ) | |
173 | |
174 self.commands.append(command_dict) | |
175 | |
176 def _extract(self) -> None: | |
177 try: | |
178 self.tarball = tarfile.open( | |
179 name=self.pathname, mode="r:*", format=tarfile.GNU_FORMAT | |
180 ) | |
181 except FileNotFoundError: | |
182 sys.exit(f"ERROR: unable to find input tarball: {self.pathname}") | |
183 except tarfile.ReadError: | |
184 sys.exit(f"ERROR: error reading input tarball: {self.pathname}") | |
185 | |
186 begin = time.perf_counter() | |
187 self.tarball.extractall(filter="data") | |
188 self.tarball.close() | |
189 elapsed = time.perf_counter() - begin | |
190 | |
191 if self.debug: | |
192 print( | |
193 f"Extracted tarball in {elapsed} seconds", file=sys.stderr, flush=True | |
194 ) | |
195 | |
196 | |
class TarRunner:
    """Run every lastz command from a batch tarball in parallel and merge the outputs.

    On construction the input tarball is extracted (via BatchTar) and each
    command's args are normalised:

    - ``--output=`` (a temporary file in galaxy/files) is added when missing.
    - ``--format=`` (``lav``) is added when missing.
    - ``--target=``/``--query=`` become the leading positional arguments.
    """

    def __init__(
        self,
        input_pathname: str,
        output_pathname: str,
        parallel: int,
        debug: bool = False,
    ) -> None:
        self.input_pathname = input_pathname
        self.output_pathname = output_pathname
        self.parallel = parallel
        self.debug = debug
        self.batch_tar = BatchTar(self.input_pathname, debug=self.debug)
        # output file name -> lastz format it was written in
        self.output_file_format: typing.Dict[str, str] = {}
        # lastz format -> output file names in that format
        self.output_files: typing.Dict[str, typing.List[str]] = {}
        self._set_output()
        self._set_target_query()

    def _set_output(self) -> None:
        """Ensure each command has --output/--format and group output files by format.

        :raises SystemExit: if a command requests a format lastz does not support.
        """
        for command_dict in self.batch_tar.batch_commands():
            output_file = None
            output_format = None

            for arg in command_dict["args"]:
                if arg.startswith("--format="):
                    output_format = arg[len("--format="):]
                elif arg.startswith("--output="):
                    output_file = arg[len("--output="):]

            if output_file is None:
                f = tempfile.NamedTemporaryFile(dir="galaxy/files", delete=False)
                # relative to galaxy/files, where lastz is run
                output_file = os.path.basename(f.name)
                f.close()
                command_dict["args"].append(f"--output={output_file}")

            if output_format is None:
                output_format = "lav"
                command_dict["args"].append(f"--format={output_format}")

            if not lastz_output_format_regex.match(output_format):
                sys.exit(f"ERROR: invalid output format: {output_format}")

            self.output_file_format[output_file] = output_format

        for output_file, output_format in self.output_file_format.items():
            self.output_files.setdefault(output_format, []).append(output_file)

    def _set_target_query(self) -> None:
        """Move --target=/--query= values to the front of each command's args.

        The resulting order is ``[target, query, *other_args]`` (each included
        only if present), with the relative order of the other args preserved,
        regardless of where the options appeared originally.
        """
        for command_dict in self.batch_tar.batch_commands():
            targets: typing.List[str] = []
            queries: typing.List[str] = []
            rest: typing.List[str] = []

            for arg in command_dict["args"]:
                if arg.startswith("--target="):
                    targets.append(arg[len("--target="):])
                elif arg.startswith("--query="):
                    queries.append(arg[len("--query="):])
                else:
                    rest.append(arg)

            command_dict["args"] = targets + queries + rest

    def run(self) -> None:
        """Execute all commands with ``self.parallel`` worker processes, then merge outputs.

        :raises SystemExit: if any worker fails.
        """
        run_times = []
        begin = time.perf_counter()

        with multiprocessing.Manager() as manager:
            input_queue: queue.Queue[typing.Dict[str, typing.Any]] = manager.Queue()
            output_queue: queue.Queue[float] = manager.Queue()

            for command_dict in self.batch_tar.batch_commands():
                input_queue.put(command_dict)

            # use the empty dict as a sentinel, one per worker
            for _ in range(self.parallel):
                input_queue.put({})

            with concurrent.futures.ProcessPoolExecutor(
                max_workers=self.parallel
            ) as executor:
                futures = [
                    executor.submit(
                        run_command,
                        instance,
                        input_queue,
                        output_queue,
                        debug=self.debug,
                    )
                    for instance in range(self.parallel)
                ]

                for f in concurrent.futures.as_completed(futures):
                    if f.cancelled():
                        sys.exit("lastz command failed")
                    exc = f.exception()
                    if exc is not None:
                        # surface the worker's reason (e.g. which command failed)
                        print(f"ERROR: {exc}", file=sys.stderr, flush=True)
                        sys.exit("lastz command failed")

            while not output_queue.empty():
                run_time = output_queue.get()
                run_times.append(run_time)

            elapsed = time.perf_counter() - begin

            if self.debug:
                print(f"elapsed {elapsed}", file=sys.stderr, flush=True)

        self._cleanup()

    def _cleanup(self) -> None:
        """Concatenate per-command outputs into ``output.<format>`` files.

        When exactly one format was produced, its merged file is also copied
        to ``self.output_pathname``.
        """
        for file_type, file_list in self.output_files.items():
            with open(f"output.{file_type}", "w") as ofh:
                for filename in file_list:
                    with open(f"galaxy/files/{filename}") as ifh:
                        shutil.copyfileobj(ifh, ofh)

        if len(self.output_files) == 1:
            (file_type,) = self.output_files
            src_filename = f"output.{file_type}"
            shutil.copy2(src_filename, self.output_pathname)
302 | |
303 def _cleanup(self) -> None: | |
304 num_output_files = len(self.output_files.keys()) | |
305 | |
306 for file_type, file_list in self.output_files.items(): | |
307 with open(f"output.{file_type}", "w") as ofh: | |
308 for filename in file_list: | |
309 with open(f"galaxy/files/{filename}") as ifh: | |
310 for line in ifh: | |
311 ofh.write(line) | |
312 | |
313 if num_output_files == 1: | |
314 file_type = list(self.output_files.keys())[0] | |
315 src_filename = f"output.{file_type}" | |
316 shutil.copy2(src_filename, self.output_pathname) | |
317 | |
318 | |
319 def main() -> None: | |
320 if not hasattr(tarfile, "data_filter"): | |
321 sys.exit("ERROR: extracting may be unsafe; consider updating Python") | |
322 | |
323 parser = argparse.ArgumentParser() | |
324 parser.add_argument("--input", type=str, required=True) | |
325 parser.add_argument("--output", type=str, required=True) | |
326 parser.add_argument("--parallel", type=int, default=1, required=False) | |
327 parser.add_argument("--debug", action="store_true", required=False) | |
328 | |
329 args = parser.parse_args() | |
330 runner = TarRunner(args.input, args.output, args.parallel, args.debug) | |
331 runner.run() | |
332 | |
333 | |
334 if __name__ == "__main__": | |
335 main() |