Mercurial > repos > richard-burhans > segalign
comparison runner.py @ 21:25fa179d9d0a draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit e4e05d23d9da18ea87bc352122ca9e6cfa73d1c7
author | richard-burhans |
---|---|
date | Fri, 09 Aug 2024 20:23:12 +0000 |
parents | 96ff17622b17 |
children |
comparison
equal
deleted
inserted
replaced
20:96ff17622b17 | 21:25fa179d9d0a |
---|---|
312 break | 312 break |
313 | 313 |
314 run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)] | 314 run_args = ["python", f"{args.tool_directory}/diagonal_partition.py", str(chunk_size)] |
315 for word in line.split(): | 315 for word in line.split(): |
316 run_args.append(word) | 316 run_args.append(word) |
317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | 317 process = subprocess.run(run_args, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, text=True) |
318 | 318 |
319 for line in process.stdout.splitlines(): | 319 for line in process.stdout.splitlines(): |
320 output_q.put(line) | 320 output_q.put(line) |
321 | 321 |
322 for line in process.stderr.splitlines(): | 322 for line in process.stderr.splitlines(): |
325 if process.returncode != 0: | 325 if process.returncode != 0: |
326 sys.exit(f"Error: diagonal partitioner {instance} exited with returncode {process.returncode}") | 326 sys.exit(f"Error: diagonal partitioner {instance} exited with returncode {process.returncode}") |
327 | 327 |
328 | 328 |
329 def estimate_chunk_size(args: argparse.Namespace) -> int: | 329 def estimate_chunk_size(args: argparse.Namespace) -> int: |
330 # only used when segment size is being estimated | |
331 MAX_CHUNK_SIZE = 50000 | |
330 chunk_size = -1 | 332 chunk_size = -1 |
331 line_size = -1 | 333 line_size = -1 |
332 | 334 |
333 if args.debug: | 335 if args.debug: |
334 r_beg = resource.getrusage(resource.RUSAGE_SELF) | 336 r_beg = resource.getrusage(resource.RUSAGE_SELF) |
351 except FileNotFoundError: | 353 except FileNotFoundError: |
352 continue | 354 continue |
353 | 355 |
354 fdict[entry.name.split(".split", 1)[0]] += file_size | 356 fdict[entry.name.split(".split", 1)[0]] += file_size |
355 | 357 |
356 # if not enough segment files for estimation, continue | 358 if len(fdict) < 7: |
357 if len(fdict) > 2: | 359 # outliers can heavily skew prediction if <7 data points |
360 # to be safe, use 50% quantile | |
361 chunk_size = int(statistics.quantiles(fdict.values())[1] // line_size) | |
362 else: | |
363 # otherwise use 75% quantile | |
358 chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size) | 364 chunk_size = int(statistics.quantiles(fdict.values())[-1] // line_size) |
365 | |
366 # if not enough data points, there is a chance of getting unlucky | |
367 # minimize worst case by using MAX_CHUNK_SIZE | |
368 | |
369 chunk_size = min(chunk_size, MAX_CHUNK_SIZE) | |
359 | 370 |
360 if args.debug: | 371 if args.debug: |
361 ns: int = time.monotonic_ns() - beg | 372 ns: int = time.monotonic_ns() - beg |
362 r_end = resource.getrusage(resource.RUSAGE_SELF) | 373 r_end = resource.getrusage(resource.RUSAGE_SELF) |
363 print(f"estimate chunk size clock time: {ns} ns", file=sys.stderr, flush=True) | 374 print(f"estimate chunk size clock time: {ns} ns", file=sys.stderr, flush=True) |