comparison runner.py @ 9:08e987868f0f draft

planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit 062a761a340e095ea7ef7ed7cd1d3d55b1fdc5c4
author richard-burhans
date Wed, 10 Jul 2024 17:06:45 +0000
parents 150de8a3954a
children ae2cd39594eb
comparison
equal deleted inserted replaced
8:150de8a3954a 9:08e987868f0f
299 if args.debug: 299 if args.debug:
300 print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True) 300 print(f"estimated chunk size: {chunk_size}", file=sys.stderr, flush=True)
301 301
302 with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor: 302 with concurrent.futures.ProcessPoolExecutor(max_workers=num_workers) as executor:
303 for i in range(num_workers): 303 for i in range(num_workers):
304 executor.submit(diagonal_partition_worker(input_q, output_q, chunk_size, i)) 304 executor.submit(diagonal_partition_worker(args, input_q, output_q, chunk_size, i))
305 305
306 306
307 def diagonal_partition_worker(input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None: 307 def diagonal_partition_worker(args: argparse.Namespace, input_q: queue.Queue[str], output_q: queue.Queue[str], chunk_size: int, instance: int) -> None:
308 while True: 308 while True:
309 line = input_q.get() 309 line = input_q.get()
310 if line == SENTINEL_VALUE: 310 if line == SENTINEL_VALUE:
311 input_q.task_done() 311 input_q.task_done()
312 break 312 break
313 313
314 run_args = ["python", "/jetstream2/scratch/rico/job-dir/tool_files/diagonal_partition.py", str(chunk_size)] 314 run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)]
315 for word in line.split(): 315 for word in line.split():
316 run_args.append(word) 316 run_args.append(word)
317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) 317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
318 318
319 for line in process.stdout.splitlines(): 319 for line in process.stdout.splitlines():
378 if not skip_segalign: 378 if not skip_segalign:
379 run_args = ["segalign"] 379 run_args = ["segalign"]
380 run_args.extend(segalign_args) 380 run_args.extend(segalign_args)
381 run_args.append("--num_threads") 381 run_args.append("--num_threads")
382 run_args.append(str(args.num_cpu)) 382 run_args.append(str(args.num_cpu))
383 run_args.append("work/")
383 384
384 if args.debug: 385 if args.debug:
385 beg: int = time.monotonic_ns() 386 beg: int = time.monotonic_ns()
386 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN) 387 r_beg = resource.getrusage(resource.RUSAGE_CHILDREN)
387 388