Mercurial > repos > richard-burhans > segalign
diff diagonal_partition.py @ 4:36cafb694dd2 draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit b8aa943b38b865defab8a27e4404bb8a2131f919
author | richard-burhans |
---|---|
date | Tue, 23 Apr 2024 22:39:23 +0000 |
parents | 9e34b25a8670 |
children |
line wrap: on
line diff
--- a/diagonal_partition.py Thu Apr 18 17:22:46 2024 +0000 +++ b/diagonal_partition.py Tue Apr 23 22:39:23 2024 +0000 @@ -9,9 +9,20 @@ import os import sys +import typing + +T = typing.TypeVar("T", bound="_Sliceable") -def chunks(lst, n): +class _Sliceable(typing.Protocol): + def __len__(self) -> int: + ... + + def __getitem__(self: T, i: slice) -> T: + ... + + +def chunks(lst: T, n: int) -> typing.Iterator[T]: """Yield successive n-sized chunks from list.""" for i in range(0, len(lst), n): yield lst[i: i + n] @@ -29,7 +40,7 @@ # Parsing command output from SegAlign segment_key = "--segments=" segment_index = None - input_file = None + input_file: typing.Optional[str] = None for index, value in enumerate(params): if value[: len(segment_key)] == segment_key: @@ -40,7 +51,7 @@ print(f"Error: could not segment key {segment_key} in parameters {params}") exit(0) - if not os.path.isfile(input_file): + if input_file is None or not os.path.isfile(input_file): print(f"Error: File {input_file} does not exist") exit(0) @@ -52,9 +63,9 @@ # Find rest of relevant parameters output_key = "--output=" - output_index = None + output_index: typing.Optional[int] = None output_alignment_file = None - output_alignment_file_base = None + output_alignment_file_base: typing.Optional[str] = None output_format = None strand_key = "--strand=" @@ -78,7 +89,9 @@ err_index = -1 # error file is at very end err_name_base = params[-1].split(".err", 1)[0] - data = {} # dict of list of tuple (x, y, str) + data: typing.Dict[ + typing.Tuple[str, str], typing.List[typing.Tuple[int, int, str]] + ] = {} # dict of list of tuple (x, y, str) direction = None if "plus" in params[strand_index]: @@ -153,13 +166,10 @@ # update segment file in command params[segment_index] = segment_key + fname # update output file in command - params[output_index] = ( - output_key - + output_alignment_file_base - + name_addition - + "." - + output_format - ) + if output_index is not None: + params[output_index] = ( + f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" + ) # update error file in command params[-1] = err_name_base + name_addition + ".err" print(" ".join(params), flush=True) @@ -169,7 +179,9 @@ skip_pairs_with_len = sorted( [(len(data[p]), p) for p in skip_pairs] ) # list of tuples of (pair length, pair) - aggregated_skip_pairs = [] # list of list of pair names + aggregated_skip_pairs: typing.List[typing.List[typing.Any]] = ( + [] + ) # list of list of pair names current_count = 0 aggregated_skip_pairs.append([]) for count, pair in skip_pairs_with_len: @@ -192,13 +204,10 @@ # update segment file in command params[segment_index] = segment_key + fname # update output file in command - params[output_index] = ( - output_key - + output_alignment_file_base - + name_addition - + "." - + output_format - ) + if output_index is not None: + params[output_index] = ( + f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" + ) # update error file in command params[-1] = err_name_base + name_addition + ".err" print(" ".join(params), flush=True)