Mercurial > repos > richard-burhans > segalign
changeset 4:36cafb694dd2 draft
planemo upload for repository https://github.com/richard-burhans/galaxytools/tree/main/tools/segalign commit b8aa943b38b865defab8a27e4404bb8a2131f919
author | richard-burhans |
---|---|
date | Tue, 23 Apr 2024 22:39:23 +0000 |
parents | 6f46cebc9ed8 |
children | 75e15ba3b4c1 |
files | diagonal_partition.py package_output.py run_segalign_diagonal_partition segalign.xml |
diffstat | 4 files changed, 39 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/diagonal_partition.py Thu Apr 18 17:22:46 2024 +0000 +++ b/diagonal_partition.py Tue Apr 23 22:39:23 2024 +0000 @@ -9,9 +9,20 @@ import os import sys +import typing + +T = typing.TypeVar("T", bound="_Sliceable") -def chunks(lst, n): +class _Sliceable(typing.Protocol): + def __len__(self) -> int: + ... + + def __getitem__(self: T, i: slice) -> T: + ... + + +def chunks(lst: T, n: int) -> typing.Iterator[T]: """Yield successive n-sized chunks from list.""" for i in range(0, len(lst), n): yield lst[i: i + n] @@ -29,7 +40,7 @@ # Parsing command output from SegAlign segment_key = "--segments=" segment_index = None - input_file = None + input_file: typing.Optional[str] = None for index, value in enumerate(params): if value[: len(segment_key)] == segment_key: @@ -40,7 +51,7 @@ print(f"Error: could not segment key {segment_key} in parameters {params}") exit(0) - if not os.path.isfile(input_file): + if input_file is None or not os.path.isfile(input_file): print(f"Error: File {input_file} does not exist") exit(0) @@ -52,9 +63,9 @@ # Find rest of relevant parameters output_key = "--output=" - output_index = None + output_index: typing.Optional[int] = None output_alignment_file = None - output_alignment_file_base = None + output_alignment_file_base: typing.Optional[str] = None output_format = None strand_key = "--strand=" @@ -78,7 +89,9 @@ err_index = -1 # error file is at very end err_name_base = params[-1].split(".err", 1)[0] - data = {} # dict of list of tuple (x, y, str) + data: typing.Dict[ + typing.Tuple[str, str], typing.List[typing.Tuple[int, int, str]] + ] = {} # dict of list of tuple (x, y, str) direction = None if "plus" in params[strand_index]: @@ -153,13 +166,10 @@ # update segment file in command params[segment_index] = segment_key + fname # update output file in command - params[output_index] = ( - output_key - + output_alignment_file_base - + name_addition - + "." - + output_format - ) + if output_index is not None: + params[output_index] = ( + f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" + ) # update error file in command params[-1] = err_name_base + name_addition + ".err" print(" ".join(params), flush=True) @@ -169,7 +179,9 @@ skip_pairs_with_len = sorted( [(len(data[p]), p) for p in skip_pairs] ) # list of tuples of (pair length, pair) - aggregated_skip_pairs = [] # list of list of pair names + aggregated_skip_pairs: typing.List[typing.List[typing.Any]] = ( + [] + ) # list of list of pair names current_count = 0 aggregated_skip_pairs.append([]) for count, pair in skip_pairs_with_len: @@ -192,13 +204,10 @@ # update segment file in command params[segment_index] = segment_key + fname # update output file in command - params[output_index] = ( - output_key - + output_alignment_file_base - + name_addition - + "." - + output_format - ) + if output_index is not None: + params[output_index] = ( + f"{output_key}{output_alignment_file_base}{name_addition}.{output_format}" + ) # update error file in command params[-1] = err_name_base + name_addition + ".err" print(" ".join(params), flush=True)
--- a/package_output.py Thu Apr 18 17:22:46 2024 +0000 +++ b/package_output.py Tue Apr 23 22:39:23 2024 +0000 @@ -33,7 +33,7 @@ name=self.pathname, mode="w:gz", format=tarfile.GNU_FORMAT, - compresslevel=1, + compresslevel=6, ) def add_config(self, pathname: str) -> None: @@ -152,7 +152,7 @@ def _parse_line(self, line: str) -> typing.Dict[str, typing.Any]: # resolve shell redirects - trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False) # type: ignore[attr-defined] + trees: typing.List[typing.Any] = bashlex.parse(line, strictmode=False) positions: typing.List[typing.Tuple[int, int]] = [] for tree in trees: @@ -176,7 +176,7 @@ return command_dict def _parse_processed_line(self, line: str) -> typing.Dict[str, typing.Any]: - argv: typing.List[str] = list(bashlex.split(line)) # type: ignore[attr-defined] + argv: typing.List[str] = list(bashlex.split(line)) self.executable = argv.pop(0) parser: argparse.ArgumentParser = argparse.ArgumentParser(add_help=False) @@ -236,7 +236,7 @@ return command_dict -class nodevisitor(bashlex.ast.nodevisitor): # type: ignore[name-defined,misc] +class nodevisitor(bashlex.ast.nodevisitor): # type: ignore[misc] def __init__(self, positions: typing.List[typing.Tuple[int, int]]) -> None: self.positions = positions self.stdin = None @@ -245,14 +245,14 @@ def visitredirect( self, - n: bashlex.ast.node, # type: ignore[name-defined] + n: bashlex.ast.node, n_input: int, n_type: str, output: typing.Any, heredoc: typing.Any, ) -> None: if isinstance(n_input, int) and 0 <= n_input <= 2: - if isinstance(output, bashlex.ast.node) and output.kind == "word": # type: ignore[attr-defined] + if isinstance(output, bashlex.ast.node) and output.kind == "word": self.positions.append(n.pos) if n_input == 0: self.stdin = output.word @@ -265,7 +265,7 @@ else: sys.exit(f"oops 2: {type(n_input)}") - def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None: # type: ignore[name-defined] + def visitheredoc(self, n: bashlex.ast.node, value: typing.Any) -> None: pass
--- a/run_segalign_diagonal_partition Thu Apr 18 17:22:46 2024 +0000 +++ b/run_segalign_diagonal_partition Tue Apr 23 22:39:23 2024 +0000 @@ -93,7 +93,7 @@ time { while IFS= read -r line; do - "$TOOL_DIRECTORY/diagonal_partition.py" $MAX_SEGMENT_SIZE $line >> $LASTZ_COMMAND_FILE + python "$TOOL_DIRECTORY/diagonal_partition.py" $MAX_SEGMENT_SIZE $line >> $LASTZ_COMMAND_FILE # segalign sort writes out the partitioned segment files to the working # directory and prints the modified lastz commands. done < <(stdbuf -oL segalign $ref_path $query_path "${DATA_FOLDER}/" "$@" ) # segalign begins running in this line,