Mercurial > repos > richard-burhans > segalign
comparison runner.py @ 9:08e987868f0f draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 062a761a340e095ea7ef7ed7cd1d3d55b1fdc5c4
author | richard-burhans |
---|---|
date | Wed, 10 Jul 2024 17:06:45 +0000 |
parents | 150de8a3954a |
children | ae2cd39594eb |
comparison
equal
deleted
inserted
replaced
8:150de8a3954a | 9:08e987868f0f |
---|---|
299 if args.debug: | 299 if args.debug: |
300 print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True) | 300 print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True) |
301 | 301 |
302 with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: | 302 with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: |
303 for i in range(num_workers): | 303 for i in range(num_workers): |
304 executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i)) | 304 executor.submit(diagonal_partition_worker(args, input_q, output_q, chunk_size, i)) |
305 | 305 |
306 | 306 |
307 def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: | 307 def diagonal_partition_worker(args: argparse.Namespace, input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: |
308 while True: | 308 while True: |
309 line = input_q.get() | 309 line = input_q.get() |
310 if line == SENTINEL_VALUE: | 310 if line == SENTINEL_VALUE: |
311 input_q.task_done() | 311 input_q.task_done() |
312 break | 312 break |
313 | 313 |
314 run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)] | 314 run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)] |
315 for word in line.split(): | 315 for word in line.split(): |
316 run_args.append(word) | 316 run_args.append(word) |
317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | 317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) |
318 | 318 |
319 for line in process.stdout.splitlines(): | 319 for line in process.stdout.splitlines(): |
378 if not skip_segalign: | 378 if not skip_segalign: |
379 run_args = ["segalign"] | 379 run_args = ["segalign"] |
380 run_args.extend(segalign_args) | 380 run_args.extend(segalign_args) |
381 run_args.append("--num_threads") | 381 run_args.append("--num_threads") |
382 run_args.append(str(args.num_cpu)) | 382 run_args.append(str(args.num_cpu)) |
383 run_args.append("work/") | |
383 | 384 |
384 if args.debug: | 385 if args.debug: |
385 beg: int = time.monotonic_ns() | 386 beg: int = time.monotonic_ns() |
386 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) | 387 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) |
387 | 388 |