comparison run_lastz_tarball.py @ 0:103538753e81 draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/batched_lastz commit 7b119b432f721e228a73396d4e8f0d54350b0481
author richard-burhans
date Tue, 30 Apr 2024 21:06:58 +0000
parents
children 007990f98551
comparison
equal deleted inserted replaced
-1:000000000000 0:103538753e81
1 #!/usr/bin/env python
2
import argparse
import concurrent.futures
import contextlib
import json
import multiprocessing
import os
import queue
import re
import shutil
import subprocess
import sys
import tarfile
import tempfile
import time
import typing
17
18
# Accepted values for lastz's --format option (matched case-insensitively):
# lav, lav+text, axt, axt+, maf, maf+, maf-, sam, softsam, sam-, softsam-,
# cigar, BLASTN, PAF, PAF:wfmash, differences, rdotplot, text,
# general[:<fields>], or general-[:<fields>].
# --format=none can be used when no alignment output is desired.
#
# The general/general- alternative only allows an optional ":<fields>"
# suffix, so plain "general" is accepted and "generalxyz" is rejected.
lastz_output_format_regex = re.compile(
    r"^(?:axt\+?|blastn|cigar|differences|general-?(?::.+)?|lav|lav\+text|maf[-+]?|none|paf(?::wfmash)?|rdotplot|sam-?|softsam-?|text)$",
    re.IGNORECASE,
)
27
28
def run_command(
    instance: int,
    input_queue: "queue.Queue[typing.Dict[str, typing.Any]]",
    output_queue: "queue.Queue[float]",
    debug: bool = False,
) -> None:
    """Worker loop: run lastz commands taken from *input_queue* until a sentinel.

    Each command dict supplies lastz's ``args`` and optional ``stdin``,
    ``stdout`` and ``stderr`` file names. lastz is run with ``galaxy/files``
    as its working directory, and relative file names are resolved against
    that directory. The wall-clock time of each successful command is put
    on *output_queue*.

    Args:
        instance: worker index; not used by the loop itself.
        input_queue: command dicts; an empty dict tells this worker to stop.
        output_queue: receives one elapsed time (seconds) per command.
        debug: print each command's runtime to stderr.

    Raises:
        SystemExit: if lastz exits non-zero, or if a ``stderr`` file was
            requested and cannot be stat'ed or is not empty.
        OSError: if a redirection file cannot be opened or lastz cannot be
            started.
    """
    # Resolve paths against galaxy/files instead of os.chdir()-ing into it:
    # the executor may reuse a worker process for another task, and a second
    # relative chdir from inside galaxy/files would fail. os.path.join keeps
    # absolute file names unchanged.
    workdir = "galaxy/files"

    while True:
        command_dict = input_queue.get()

        # an empty dict is the sentinel telling this worker to stop
        if not command_dict:
            return

        args = ["lastz", *command_dict["args"]]
        stdin_name = command_dict["stdin"]
        stdout_name = command_dict["stdout"]
        stderr_name = command_dict["stderr"]

        # ExitStack closes whichever files were opened, even if opening a
        # later one or starting lastz raises.
        with contextlib.ExitStack() as stack:
            stdin = None
            if stdin_name is not None:
                stdin = stack.enter_context(
                    open(os.path.join(workdir, stdin_name), "r")
                )

            stdout = None
            if stdout_name is not None:
                stdout = stack.enter_context(
                    open(os.path.join(workdir, stdout_name), "w")
                )

            stderr = None
            if stderr_name is not None:
                stderr = stack.enter_context(
                    open(os.path.join(workdir, stderr_name), "w")
                )

            begin = time.perf_counter()
            p = subprocess.run(
                args, stdin=stdin, stdout=stdout, stderr=stderr, cwd=workdir
            )

        if p.returncode != 0:
            sys.exit(f"command failed: {' '.join(args)}")

        # a successful run must also leave its captured stderr empty
        if stderr_name is not None:
            try:
                statinfo = os.stat(
                    os.path.join(workdir, stderr_name), follow_symlinks=False
                )
            except OSError:
                sys.exit(f"unable to stat stderr file: {' '.join(args)}")

            if statinfo.st_size != 0:
                sys.exit(f"stderr file is not empty: {' '.join(args)}")

        elapsed = time.perf_counter() - begin
        output_queue.put(elapsed)

        if debug:
            print(f"runtime {elapsed}", file=sys.stderr, flush=True)
86
87
class BatchTar:
    """Extract a batched-lastz input tarball and load its command list.

    The tarball is extracted into the current working directory using the
    tarfile "data" extraction filter. It must contain
    ``galaxy/commands.json`` with one JSON object per line. Each object must
    provide ``executable`` (str), ``args`` (list of str) and ``stdin`` /
    ``stdout`` / ``stderr`` (str or null). Any problem terminates the process
    via ``sys.exit`` with an ERROR message.
    """

    def __init__(self, pathname: str, debug: bool = False) -> None:
        self.pathname = pathname
        self.debug = debug
        # the archive handle opened by _extract (closed once extracted)
        self.tarball: typing.Optional[tarfile.TarFile] = None
        self.commands: typing.List[typing.Dict[str, typing.Any]] = []
        self._extract()
        self._load_commands()

    def batch_commands(self) -> typing.Iterator[typing.Dict[str, typing.Any]]:
        """Yield the loaded command dicts in file order."""
        yield from self.commands

    def _load_commands(self) -> None:
        """Parse galaxy/commands.json (one JSON object per line).

        Blank lines are skipped; any other line that is not valid JSON or
        does not have the expected structure terminates the process.
        """
        try:
            f = open("galaxy/commands.json")
        except FileNotFoundError:
            sys.exit(
                f"ERROR: input tarball missing galaxy/commands.json: {self.pathname}"
            )

        begin = time.perf_counter()
        with f:
            for json_line in f:
                # tolerate empty lines, e.g. an extra trailing newline
                if not json_line.strip():
                    continue

                try:
                    command_dict = json.loads(json_line)
                except json.JSONDecodeError:
                    sys.exit(
                        f"ERROR: bad json line in galaxy/commands.json: {self.pathname}"
                    )

                self._load_command(command_dict)

        elapsed = time.perf_counter() - begin

        if self.debug:
            print(
                f"loaded {len(self.commands)} commands in {elapsed} seconds ",
                file=sys.stderr,
                flush=True,
            )

    def _load_command(self, command_dict: typing.Any) -> None:
        """Validate one decoded JSON value and append it to self.commands.

        *command_dict* is whatever json.loads produced, so it is checked to
        be a dict before any field lookups. Otherwise a JSON string would be
        probed with substring ``in`` tests and then indexed.
        """
        none_type = type(None)
        field_types: typing.Dict[str, typing.Tuple[type, ...]] = {
            "executable": (str,),
            "args": (list,),
            "stdin": (str, none_type),
            "stdout": (str, none_type),
            "stderr": (str, none_type),
        }

        bad_format = not isinstance(command_dict, dict) or any(
            field_name not in command_dict
            or not isinstance(command_dict[field_name], allowed)
            for field_name, allowed in field_types.items()
        )

        if not bad_format:
            # all args must be strings
            bad_format = not all(isinstance(arg, str) for arg in command_dict["args"])

        if bad_format:
            sys.exit(
                f"ERROR: unexpected json format in line in galaxy/commands.json: {self.pathname}"
            )

        self.commands.append(command_dict)

    def _extract(self) -> None:
        """Extract the whole tarball into the current working directory."""
        try:
            self.tarball = tarfile.open(
                name=self.pathname, mode="r:*", format=tarfile.GNU_FORMAT
            )
        except FileNotFoundError:
            sys.exit(f"ERROR: unable to find input tarball: {self.pathname}")
        except OSError as e:
            sys.exit(f"ERROR: unable to open input tarball: {self.pathname}: {e}")
        except tarfile.TarError:
            # ReadError (not a tar archive) or CompressionError
            sys.exit(f"ERROR: error reading input tarball: {self.pathname}")

        begin = time.perf_counter()
        # `with` guarantees the archive is closed even if extraction fails
        with self.tarball:
            try:
                # the "data" filter refuses absolute paths, links pointing
                # outside the destination, device files, etc.
                self.tarball.extractall(filter="data")
            except tarfile.TarError as e:
                sys.exit(
                    f"ERROR: error extracting input tarball: {self.pathname}: {e}"
                )
        elapsed = time.perf_counter() - begin

        if self.debug:
            print(
                f"Extracted tarball in {elapsed} seconds", file=sys.stderr, flush=True
            )
195
196
class TarRunner:
    """Run every lastz command from a batched input tarball and merge outputs.

    On construction the tarball is extracted and loaded (via BatchTar) and
    each command's arguments are normalised. Every command gets explicit
    ``--output=`` and ``--format=`` options. ``--target=`` / ``--query=``
    options are turned into the leading positional arguments of the
    command line.
    """

    def __init__(
        self,
        input_pathname: str,
        output_pathname: str,
        parallel: int,
        debug: bool = False,
    ) -> None:
        self.input_pathname = input_pathname
        self.output_pathname = output_pathname
        self.parallel = parallel
        self.debug = debug
        self.batch_tar = BatchTar(self.input_pathname, debug=self.debug)
        # output file name (as given to --output=) -> format string
        self.output_file_format: typing.Dict[str, str] = {}
        # format string -> output file names, in command order
        self.output_files: typing.Dict[str, typing.List[str]] = {}
        self._set_output()
        self._set_target_query()

    def _set_output(self) -> None:
        """Make every command's output file and output format explicit.

        A command without ``--output=`` gets a new empty file created in
        galaxy/files. A command without ``--format=`` gets ``--format=lav``.
        Exits if a format is not matched by lastz_output_format_regex.
        """
        for command_dict in self.batch_tar.batch_commands():
            output_file = None
            output_format = None

            # if an option is repeated, the last occurrence is recorded here
            for arg in command_dict["args"]:
                if arg.startswith("--format="):
                    output_format = arg[len("--format="):]
                elif arg.startswith("--output="):
                    output_file = arg[len("--output="):]

            if output_file is None:
                f = tempfile.NamedTemporaryFile(dir="galaxy/files", delete=False)
                output_file = os.path.basename(f.name)
                f.close()
                command_dict["args"].append(f"--output={output_file}")

            if output_format is None:
                output_format = "lav"
                command_dict["args"].append(f"--format={output_format}")

            if not lastz_output_format_regex.match(output_format):
                sys.exit(f"ERROR: invalid output format: {output_format}")

            self.output_file_format[output_file] = output_format

        for output_file, output_format in self.output_file_format.items():
            self.output_files.setdefault(output_format, []).append(output_file)

    def _set_target_query(self) -> None:
        """Turn --target=/--query= options into leading positional arguments.

        The result is ``[target..., query..., other args...]``, with other
        arguments kept in their original order. Collecting first and
        assembling afterwards keeps the query directly after the target no
        matter where either option appeared in the original list.
        """
        for command_dict in self.batch_tar.batch_commands():
            targets: typing.List[str] = []
            queries: typing.List[str] = []
            options: typing.List[str] = []

            for arg in command_dict["args"]:
                if arg.startswith("--target="):
                    targets.append(arg[len("--target="):])
                elif arg.startswith("--query="):
                    queries.append(arg[len("--query="):])
                else:
                    options.append(arg)

            command_dict["args"] = targets + queries + options

    def run(self) -> None:
        """Run all commands on ``self.parallel`` worker processes, then merge.

        Commands are fed to run_command workers through a managed queue,
        followed by one empty-dict sentinel per worker. Exits if any worker
        fails; the worker's error is printed to stderr first.
        """
        run_times: typing.List[float] = []
        begin = time.perf_counter()

        with multiprocessing.Manager() as manager:
            input_queue: queue.Queue[typing.Dict[str, typing.Any]] = manager.Queue()
            output_queue: queue.Queue[float] = manager.Queue()

            for command_dict in self.batch_tar.batch_commands():
                input_queue.put(command_dict)

            # use the empty dict as a sentinel, one per worker
            for _ in range(self.parallel):
                input_queue.put({})

            with concurrent.futures.ProcessPoolExecutor(
                max_workers=self.parallel
            ) as executor:
                futures = [
                    executor.submit(
                        run_command,
                        instance,
                        input_queue,
                        output_queue,
                        debug=self.debug,
                    )
                    for instance in range(self.parallel)
                ]

                for future in concurrent.futures.as_completed(futures):
                    if future.cancelled():
                        sys.exit("lastz command failed")
                    exc = future.exception()
                    if exc is not None:
                        # surface the worker's own message (e.g. which
                        # command failed) instead of discarding it
                        print(f"ERROR: {exc}", file=sys.stderr, flush=True)
                        sys.exit("lastz command failed")

            while not output_queue.empty():
                run_times.append(output_queue.get())

        elapsed = time.perf_counter() - begin

        if self.debug:
            print(f"elapsed {elapsed}", file=sys.stderr, flush=True)
            print(
                f"ran {len(run_times)} commands, {sum(run_times)} seconds of command time",
                file=sys.stderr,
                flush=True,
            )

        self._cleanup()

    def _cleanup(self) -> None:
        """Concatenate the per-command outputs into one output.<format> file each.

        Files are copied byte-for-byte, in command order. When all commands
        used a single format, that merged file is also copied to
        ``self.output_pathname``.
        """
        for file_type, file_list in self.output_files.items():
            with open(f"output.{file_type}", "wb") as ofh:
                for filename in file_list:
                    # output names are relative to galaxy/files (lastz's
                    # working directory); os.path.join keeps absolute names
                    with open(os.path.join("galaxy/files", filename), "rb") as ifh:
                        shutil.copyfileobj(ifh, ofh)

        if len(self.output_files) == 1:
            (file_type,) = self.output_files
            shutil.copy2(f"output.{file_type}", self.output_pathname)
317
318
def main() -> None:
    """Parse command-line arguments and run the batched lastz tarball.

    Exits early if this Python's tarfile lacks extraction filters, since
    BatchTar relies on ``extractall(filter="data")`` for safe extraction.
    """
    if not hasattr(tarfile, "data_filter"):
        sys.exit("ERROR: extracting may be unsafe; consider updating Python")

    parser = argparse.ArgumentParser(
        description="Run the lastz commands contained in a batched input tarball."
    )
    parser.add_argument("--input", type=str, required=True, help="input tarball")
    parser.add_argument("--output", type=str, required=True, help="output file")
    parser.add_argument(
        "--parallel",
        type=int,
        default=1,
        required=False,
        help="number of lastz processes to run concurrently (>= 1)",
    )
    parser.add_argument("--debug", action="store_true", required=False)

    args = parser.parse_args()

    # ProcessPoolExecutor rejects max_workers < 1; report it as a usage error
    if args.parallel < 1:
        parser.error("--parallel must be at least 1")

    runner = TarRunner(args.input, args.output, args.parallel, args.debug)
    runner.run()


if __name__ == "__main__":
    main()