diff blast_annotations_processor.py @ 2:9ca209477dfd draft default tip
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
| author | onnodg |
|---|---|
| date | Mon, 15 Dec 2025 16:43:36 +0000 |
| parents | a3989edf0a4a |
| children | |
--- a/blast_annotations_processor.py Mon Oct 20 12:26:51 2025 +0000 +++ b/blast_annotations_processor.py Mon Dec 15 16:43:36 2025 +0000 @@ -23,74 +23,103 @@ """ import argparse -from collections import defaultdict import json import os +from collections import OrderedDict +from collections import defaultdict + import matplotlib.pyplot as plt +import numpy as np import pandas as pd -import numpy as np -import openpyxl # Default taxonomic levels TAXONOMIC_LEVELS = ["K", "P", "C", "O", "F", "G", "S"] + def parse_arguments(arg_list=None): """Parse command line arguments for cluster processing.""" - parser = argparse.ArgumentParser( - description='Process BLAST annotation results for Galaxy' - ) - # Required inputs - parser.add_argument('--input-anno', required=True, - help='Annotated BLAST output file (tabular format)') - parser.add_argument('--input-unanno', required=True, - help='Unannotated sequences file (FASTA format)') - # Optional outputs - parser.add_argument('--eval-plot', - help='Output path for E-value plot (PNG)') - parser.add_argument('--taxa-output', - help='Output path for taxa report (tabular)') - parser.add_argument('--circle-data', - help='Output path for circular taxonomy data (txt)') - parser.add_argument('--header-anno', - help='Output path for header annotations (tabular/xlsx)') - parser.add_argument('--anno-stats', - help='Output path for annotation statistics (txt)') - # Parameters - parser.add_argument('--uncertain-threshold', type=float, default=0.9, + parser = argparse.ArgumentParser(description='Process BLAST annotation results for Galaxy') + + parser.add_argument('--input-anno', required=True, help='Annotated BLAST output file (tabular format)') + parser.add_argument('--input-unanno', required=True, help='Unannotated sequences file (FASTA format)') + + parser.add_argument('--eval-plot', help='Output path for E-value plot (PNG)') + parser.add_argument('--taxa-output', help='Output path for taxa report (tabular)') + parser.add_argument('--circle-data', help='Output path for circular taxonomy data (txt)') + parser.add_argument('--header-anno', help='Output path for header annotations (tabular/xlsx)') + parser.add_argument('--log', help='Output path for log file (txt)', required=True) + parser.add_argument('--filtered-fasta', required=True, + help='Filtered fasta file (fasta format) for downstream analysis') + + parser.add_argument('--uncertain-threshold', type=float, default=0.9, required=True, help='Threshold for resolving taxonomic conflicts (default: 0.9)') - parser.add_argument('--eval-threshold', type=float, default=1e-10, + parser.add_argument('--eval-threshold', default='1e-10', type=float, required=True, help='E-value threshold for filtering results (default: 1e-10)') - parser.add_argument('--use-counts', action='store_true', default=True, + parser.add_argument('--use-counts', action='store_true', default=False, required=False, help='Use read counts in circular data') + parser.add_argument('--ignore-rank', default='unknown', required=False, + help='Ignore rank when containing this text') + parser.add_argument('--ignore-taxonomy', default='environmental', required=False, + help="Don't use taxonomy containing this taxonomy") + parser.add_argument('--bitscore-perc-cutoff', type=float, default=8, required=True, + help='Bitscore percentage cutoff for considered hits') + parser.add_argument('--min-bitscore', type=int, default=0, required=True, + help='Minimum bitscore threshold for hits') + parser.add_argument('-iot', '--ignore-obiclean-type', type=str, 
default='singleton', required=False, + help='Ignore sequences with this obiclean type') + parser.add_argument('-iit', '--ignore-illuminapairend-type', type=str, default='pairend', required=False, + help='Ignore sequences with this illumina paired end output type') + parser.add_argument('--min-identity', type=int, default=0, required=True, + help='Minimum sequence identity to consider a hit') + parser.add_argument('--min-coverage', type=int, default=0, required=True, + help='Minimum sequence coverage to consider a hit') + parser.add_argument('--ignore-seqids', type=str, default='', required=False, + help='Ignore sequences with this sequence identifier') + parser.add_argument('--min-support', type=int, default=0, required=True, + help='A taxon is kept only if it (or its descendants) have at least N reads assigned.') return parser.parse_args(arg_list) +def log_message(log_messages, msg): + """ + Helper to both print and collect log messages. + """ + if log_messages is not None: + log_messages.append(msg) + + def list_to_string(x): """ - Convert a list, pandas Series, or numpy array to a comma-separated string.""" + Convert a list, pandas Series, or numpy array to a comma-separated string. + """ if isinstance(x, (list, pd.Series, np.ndarray)): return ", ".join(map(str, x)) return str(x) -def make_eval_plot(e_val_sets, output_path): + +def make_eval_plot(e_val_sets, output_path, log_messages): """ - Create a bar plot of E-values per read. + Generate an E-value distribution plot. - :param e_val_sets: List of sets containing E-values per read. + This function aggregates E-values per read, transforms them onto a log axis + and produces a visual summary of the distribution of best hits across reads. + If no E-values are available, no plot is created. + + :param e_val_sets: Set of E-values per read. :type e_val_sets: list[set[str]] - :param output_path: Path to save the plot (PNG). + :param output_path: Output PNG file. :type output_path: str + :param log_messages: Log collection list. + :type log_messages: list[str] :return: None :rtype: None """ if not e_val_sets: - print("No E-values to plot") + log_message(log_messages, "No E-values to plot") return None - # Convert string E-values to floats and sort - processed_sets = sorted([sorted(float(e_val) for e_val in e_val_set) - for e_val_set in e_val_sets]) + processed_sets = sorted([sorted(float(e_val) for e_val in e_val_set) for e_val_set in e_val_sets]) bar_positions = [] bar_heights = [] @@ -111,8 +140,7 @@ plt.yscale('log') plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7) plt.ylabel("e-values") - plt.xticks(ticks=range(len(processed_sets)), - labels=[f'{i + 1}' for i in range(len(processed_sets))]) + plt.xticks(ticks=range(len(processed_sets)), labels=[f'{i + 1}' for i in range(len(processed_sets))]) plt.tight_layout() plt.savefig(output_path, format='png') @@ -147,7 +175,6 @@ - 'total_unique' (int): Total number of unique sequences. 
:rtype: dict[str, float | int] """ - # Count total sequences in unannotated file total_sequences = 0 with open(unanno_file_path, 'r') as f: total_sequences = sum(1 for line in f if line.startswith('>')) @@ -155,222 +182,69 @@ percentage_annotated = (anno_count / total_sequences * 100) if total_sequences > 0 else 0 percentage_unique_annotated = (unique_anno_count / total_unique_count * 100) if total_unique_count > 0 else 0 - return { - 'percentage_annotated': percentage_annotated, - 'annotated_sequences': anno_count, - 'total_sequences': total_sequences, - 'percentage_unique_annotated': percentage_unique_annotated, - 'unique_annotated': unique_anno_count, - 'total_unique': total_unique_count - } - -def _choose_taxon(level_counts: dict, threshold: float): - """ - Determine the most representative taxonomic level based on counts and a threshold. - - This function evaluates a dictionary of taxonomic level counts and returns the - level name that holds a strict majority. If no level reaches the threshold, or if - there is a tie for the highest count, the function returns ``None``. - - :param level_counts: A dictionary mapping taxonomic level names (e.g., "species", - "genus") to their respective counts. - :type level_counts: dict[str, int] - :param threshold: The minimum fraction (between 0 and 1) of the total counts that - the top taxonomic level must have to be considered dominant. - :type threshold: float - - :return: The name of the taxonomic level that meets or exceeds the threshold and - is not tied with another level. Returns ``None`` if no level qualifies or - if there is a tie. - :rtype: str | None - - :example: - _choose_taxon({"species": 6, "genus": 3, "family": 1}, 0.5) - 'species' - _choose_taxon({"species": 4, "genus": 4}, 0.6) - None - _choose_taxon({"species": 2, "genus": 3}, 0.6) - 'genus' - """ - total = sum(level_counts.values()) - if total == 0: - return None - items = sorted(level_counts.items(), key=lambda x: x[1], reverse=True) - top_name, top_count = items[0] - # tie -> no clear majority - if sum(1 for _, c in items if c == top_count) > 1: - return None - if top_count / total >= threshold: - return top_name - return None + return {'percentage_annotated': percentage_annotated, 'annotated_sequences': anno_count, + 'total_sequences': total_sequences, 'percentage_unique_annotated': percentage_unique_annotated, + 'unique_annotated': unique_anno_count, 'total_unique': total_unique_count} -def _resolve_conflicts_recursive(conflicting: list, level_idx: int, threshold: float, max_levels: int, prefix=None): - """ - Recursively resolve taxonomic classification conflicts across multiple levels. - - This function attempts to resolve conflicts between multiple taxonomic paths by - recursively evaluating each taxonomic level. At each level, it determines whether - there is a clear majority (based on a specified threshold) for a taxon name. If - a majority is found, the function continues to the next level using only the - taxa that match that name. If no majority exists, the function constructs - "uncertain" outputs to indicate ambiguous classification. - - :param conflicting: List of tuples containing full taxonomic paths and their - associated counts, e.g. ``[("Bacteria / Proteobacteria / Gammaproteobacteria", 10), - ("Bacteria / Proteobacteria / Alphaproteobacteria", 5)]``. - :type conflicting: list[tuple[str, int]] - :param level_idx: The current taxonomic level index being evaluated (0-based). 
- :type level_idx: int - :param threshold: The minimum fraction (between 0 and 1) required for a strict - majority decision at a given taxonomic level. - :type threshold: float - :param max_levels: The maximum number of taxonomic levels to evaluate before - stopping recursion. - :type max_levels: int - :param prefix: A list of already-resolved taxon names up to the previous level. - Defaults to ``None``, in which case an empty list is used. - :type prefix: list[str] | None - - :return: A tuple containing: - - **short_output** (str): A simplified taxonomic path with "Uncertain taxa" - appended if no consensus was reached at the current level. - - **full_output** (str): The full taxonomic path, filled with additional - "Uncertain taxa" entries up to the minimal conflicting level depth. - :rtype: tuple[str, str] - - :example: - conflicts = [ - ... ("Bacteria / Proteobacteria / Gammaproteobacteria", 10), - ... ("Bacteria / Proteobacteria / Alphaproteobacteria", 5) - ... ] - _resolve_conflicts_recursive(conflicts, level_idx=0, threshold=0.6, max_levels=5) - ('Bacteria / Proteobacteria / Uncertain taxa', - 'Bacteria / Proteobacteria / Uncertain taxa / Uncertain taxa') - """ - if prefix is None: - prefix = [] - - if not conflicting: - return "", "" - - # split paths (keeps sync) - conflicting_levels = [t.split(" / ") for t, _ in conflicting] - min_conflicting_level = min(len(p) for p in conflicting_levels) - - # If only one full path remains -> return it (both short & full) - if len(conflicting) == 1: - return conflicting[0][0], conflicting[0][0] +def resolve_tax_majority(taxon_counts, uncertain_threshold): + total_count = sum(taxon_counts.values()) + if total_count == 0: + return "Uncertain taxa", "Uncertain taxa" + most_common_taxon, count = max(taxon_counts.items(), key=lambda x: x[1]) + # Use most common if above threshold + if count / total_count >= uncertain_threshold: + return most_common_taxon, most_common_taxon + conflicting_taxa = list(taxon_counts.keys()) + conflicting_levels = [taxon.split(" / ") for taxon in conflicting_taxa] - # If we've reached deepest comparable level or max_levels -> pick most supported full path - if level_idx >= min_conflicting_level or level_idx >= max_levels: - full_counts = defaultdict(int) - for t, c in conflicting: - full_counts[t] += c - chosen_full = max(full_counts.items(), key=lambda x: x[1])[0] - return chosen_full, chosen_full - - # Count names at this level - level_counts = defaultdict(int) - for taxon, count in conflicting: - parts = taxon.split(" / ") - if level_idx < len(parts): - level_counts[parts[level_idx]] += count - - # If everyone agrees at this level, append that name and go deeper - if len(level_counts) == 1: - name = next(iter(level_counts)) - return _resolve_conflicts_recursive(conflicting, level_idx + 1, threshold, max_levels, prefix + [name]) - - # Check for a strict majority at this level - chosen_level_name = _choose_taxon(level_counts, threshold) - if chosen_level_name is not None: - # Keep only taxa that match the chosen name at this level and go deeper - filtered = [(t, c) for (t, c) in conflicting - if level_idx < len(t.split(" / ")) and t.split(" / ")[level_idx] == chosen_level_name] - return _resolve_conflicts_recursive(filtered, level_idx + 1, threshold, max_levels, prefix + [chosen_level_name]) - - # No majority at this level -> construct uncertain outputs. 
- # short: prefix + 'Uncertain taxa' (stop here) - short_path = prefix + ['Uncertain taxa'] - short_output = " / ".join(short_path) - - # full: prefix + 'Uncertain taxa' + fill until min_conflicting_level - full_path = short_path + ['Uncertain taxa'] * (min_conflicting_level - (level_idx + 1)) - full_output = " / ".join(full_path) - - return short_output, full_output - - -def resolve_taxon_majority(taxon_counts: dict, threshold: float = 0.90, max_levels: int = 7): - """ - Resolve the majority consensus taxonomic classification from a set of annotated paths. - - :param taxon_counts: A dictionary mapping full taxonomic paths to their respective - occurrence counts. For example: - ``{"Bacteria / Proteobacteria / Gammaproteobacteria": 10, - "Bacteria / Proteobacteria / Alphaproteobacteria": 5}``. - :type taxon_counts: dict[str, int] - :param threshold: Minimum fraction (between 0 and 1) required for a strict majority - at each taxonomic level. Defaults to ``0.90``. - :type threshold: float, optional - :param max_levels: Maximum number of taxonomic levels to evaluate before stopping - recursion. Defaults to ``7``. - :type max_levels: int, optional - - :return: A tuple containing: - - **short_output** (str): The consensus taxonomic path, possibly ending with - "Uncertain taxa" if ambiguity remains. - - **full_output** (str): The expanded taxonomic path including placeholder - "Uncertain taxa" labels up to the minimal conflicting depth. - :rtype: tuple[str, str] - """ - conflicting = list(taxon_counts.items()) # list of (path, count) keeps taxa<->count synced - return _resolve_conflicts_recursive(conflicting, level_idx=0, threshold=threshold, max_levels=max_levels) + # Resolve uncertainty per level + for level_idx in range(min(len(level) for level in conflicting_levels)): + current_level_names = {level[level_idx] for level in conflicting_levels} + if len(current_level_names) > 1: + lowest_common_path = conflicting_levels[0][:level_idx + 1] + lowest_common_path[-1] = 'Uncertain taxa' + small_output = " / ".join(lowest_common_path) + min_conflicting_level = min(len(level) for level in conflicting_levels) + uncertain_path = conflicting_levels[0][:level_idx + 1] + uncertain_path[-1] = 'Uncertain taxa' + for _ in range(level_idx + 1, min_conflicting_level): + uncertain_path.append('Uncertain taxa') + return small_output, " / ".join(uncertain_path) + return conflicting_taxa[0], conflicting_taxa[0] def process_taxa_output(taxa_dicts, output_path, uncertain_threshold): """ - Generate a tabular taxa report from a list of read taxonomies. + Generate a hierarchical taxa summary file. - This function processes taxonomic assignments for each read in a dataset - and produces a summary report similar in style to Kraken2 output. The - report includes: + This function resolves best taxonomic assignments per read and aggregates + counts into taxonomic levels, producing a readable text report listing + percentages, counts and resolved taxonomy. - - The number of reads that were marked as "Uncertain taxa" at each - taxonomic rank. - - For all other taxa, the number of reads assigned and their percentage - relative to the sample total. - - The taxonomic rank for each assignment, indicated in the report with - indentation proportional to the rank depth. - - :param taxa_dicts: List of taxon count dictionaries, one per read, where - keys are taxon names and values are counts of how often - they were assigned to the read. - :type taxa_dicts: list[dict[str, int]] - :param output_path: Path to save the tabular taxa report. 
+ :param taxa_dicts: Best taxa dictionaries per read. + :type taxa_dicts: list[dict] + :param output_path: Path of output report. :type output_path: str - :param uncertain_threshold: Threshold for resolving conflicting taxonomic - assignments (values below the threshold are - marked as "Uncertain taxa"). + :param uncertain_threshold: Confidence value for majority voting. :type uncertain_threshold: float - :return: None. The function writes the taxa report to ``output_path``. + :return: None :rtype: None """ uncertain_dict = {level: 0 for level in TAXONOMIC_LEVELS} aggregated_counts = defaultdict(int) for read_taxa in taxa_dicts: - resolved_taxon, _ = resolve_taxon_majority(read_taxa, uncertain_threshold) + if not read_taxa: + continue + resolved_taxon, _ = resolve_tax_majority(read_taxa, uncertain_threshold) # Add counts for resolved taxon levels = resolved_taxon.split(" / ") for i in range(1, len(levels) + 1): aggregated_counts[" / ".join(levels[:i])] += 1 - # Calculate total count - total_count = sum(value for key, value in aggregated_counts.items() - if len(key.split(" / ")) == 1) + total_count = sum(value for key, value in aggregated_counts.items() if len(key.split(" / ")) == 1) report_lines = [] @@ -381,83 +255,50 @@ taxon_level_code = TAXONOMIC_LEVELS[len(levels) - 1] if len(levels) <= len(TAXONOMIC_LEVELS) else "U" percentage = (count / total_count) * 100 if total_count > 0 else 0 - report_lines.append( - f"{percentage:.2f}\t{count}\t{total_count}\t{taxon_level_code}\t{indent}{taxon_name}" - ) + report_lines.append(f"{percentage:.2f}\t{count}\t{total_count}\t{taxon_level_code}\t{indent}{taxon_name}") if taxon_name == 'Uncertain taxa': uncertain_dict[taxon_level_code] += count - # Write output with open(output_path, 'w') as f: - f.write("Uncertain count per taxonomie level" + str(uncertain_dict) + '\n') - f.write('percentage_rooted\tnumber_rooted\ttotal_num\ttaxon_level\tindentificatie\n') + f.write("Uncertain count per taxonomic level" + str(uncertain_dict) + '\n') + f.write('percentage_rooted\tnumber_rooted\ttotal_num\ttaxon_level\tidentification\n') f.write("\n".join(report_lines)) -def process_header_annotations(taxa_dicts, headers, e_values, output_path, uncertain_threshold, - identity_list, coverage_list, source_list, bitscore_list): +def process_header_annotations(taxa_dicts, headers, output_path, uncertain_threshold, source_list, seq_id_list, + log_messages): """ - Generate an Excel report with per-read and aggregated taxonomic annotations. - - This function processes taxonomic assignments per read, along with associated - metrics (e-value, identity, coverage, bitscore, source), and generates an Excel - file with two sheets: + Create an Excel report summarizing individual and merged read annotations. - 1. **Individual_Reads** – One row per sequence, including all metrics and - the resolved taxonomic path. - 2. **Merged_by_Taxa** – Aggregated results per unique taxonomic path, including - merged metrics and counts. + This function converts best taxonomy assignments into a per-read table, + generates an aggregated table grouped by taxon, and saves both tables to + separate sheets in a single Excel file. - Internally, the function creates a temporary TSV file, processes it into - a pandas DataFrame, expands the taxonomic path into separate columns - (e.g., kingdom, phylum, class, …), and then writes the results into a - multi-sheet Excel file. Column widths are automatically adjusted for readability. 
- - :param taxa_dicts: List of taxon count dictionaries, one per read, where keys - are taxa and values are their counts in the read. - :type taxa_dicts: list[dict[str, int]] - :param headers: Sequence headers corresponding to each read. Used to extract - read identifiers and counts. + :param taxa_dicts: Taxonomy assignments per read. + :type taxa_dicts: list[dict] + :param headers: Original FASTA headers. :type headers: list[str] - :param e_values: List of e-values per read. Each element is a list of values - associated with that read. - :type e_values: list[list[float]] - :param output_path: Path to the Excel file to be created. + :param output_path: Output Excel file name. :type output_path: str - :param uncertain_threshold: Threshold for resolving conflicting taxonomic - assignments. Values below the threshold are - replaced with "Uncertain taxa". + :param uncertain_threshold: Confidence level for taxonomy conflict resolution. :type uncertain_threshold: float - :param identity_list: Percent identity values per read. - :type identity_list: list[list[str]] - :param coverage_list: Coverage percentage values per read. - :type coverage_list: list[list[str]] - :param source_list: Source identifiers per read (e.g., database names). + :param source_list: Source identifiers per read. :type source_list: list[list[str]] - :param bitscore_list: Bitscore values per read. - :type bitscore_list: list[list[str]] - :return: None. The function writes results to an Excel file at ``output_path``. + :param seq_id_list: Sequence identifiers per read. + :type seq_id_list: list[list[str]] + :param log_messages: Log collection list. + :type log_messages: list[str] + :return: None :rtype: None - :raises PermissionError: If the temporary TSV or output Excel file cannot be written - (e.g., file is open in another program). - :raises IndexError: If the input lists (headers, e_values, etc.) are not aligned - with ``taxa_dicts``. - :raises ValueError: If sequence headers are not formatted as expected for extracting counts. """ report_lines = [] for i, read_taxa in enumerate(taxa_dicts): - _, resolved_taxon_long = resolve_taxon_majority(read_taxa, uncertain_threshold) - try: - e_val = e_values[i][0] if e_values[i] else "N/A" - identity = identity_list[i][0] if identity_list[i] else "N/A" - coverage = coverage_list[i][0] if coverage_list[i] else "N/A" - source = source_list[i][0] if source_list[i] else "N/A" - bitscore = bitscore_list[i][0] if bitscore_list[i] else "N/A" - except (IndexError, TypeError) as e: - print(f"Mismatch while extracting values: {e}. 
Check that your annotated and unannotated input files correspond correctly.") + if not read_taxa: continue - + _, resolved_taxon_long = resolve_tax_majority(read_taxa, uncertain_threshold) + source = source_list[i][0] if source_list[i] else "N/A" + seq_id = seq_id_list[i][0] if seq_id_list[i] else "N/A" header = headers[i] if i < len(headers) else f"Header missing" # Extract count @@ -465,47 +306,35 @@ header_base, count_str = header.rsplit("(", 1) count = int(count_str.rstrip(")")) except ValueError as e: - print(f'Failed extracting count: {e}') + log_message(log_messages, f'Failed extracting count: {e}') header_base = header count = 1 - report_lines.append( - f'{header_base}\t{e_val}\t{identity}\t{coverage}\t{bitscore}\t{count}\t{source}\t{resolved_taxon_long}') - # Create temporary TSV for processing + report_lines.append(f'{header_base}\t{seq_id}\t{source}\t{count}\t{resolved_taxon_long}') + # temp path is needed to write to a xlsx from galaxy temp_tsv_path = 'temp.tsv' try: with open(temp_tsv_path, 'w') as f: - f.write('header\te_value\tidentity percentage\tcoverage\tbitscore\tcount\tsource\ttaxa\n') + f.write('header\tseq_id\tsource\tcount\ttaxa\n') f.write("\n".join(report_lines)) except PermissionError as e: - print(f"Unable to write to file, error: {e} file might be opened") + log_message(log_messages, f"Unable to write to file, error: {e} file might be opened") - # Process the data df = pd.read_csv(temp_tsv_path, sep='\t', encoding="latin1") if not df.empty: - # Split taxa into separate columns taxa_split = df["taxa"].str.split(" / ", expand=True) max_levels = taxa_split.shape[1] if taxa_split.shape[1] is not None else 7 level_names = ["kingdom", "phylum", "class", "order", "family", "genus", "species"][:max_levels] taxa_split.columns = level_names - # Add taxa columns to main dataframe df_individual = pd.concat([df, taxa_split], axis=1) df_individual = df_individual.sort_values(['species', 'genus', 'family'], ascending=[True, True, True]) - # Create merged dataframe - first add taxa columns, then aggregate df_for_merge = df_individual.copy() group_cols = ["taxa"] - agg_dict = { - "e_value": lambda x: f"{x.min()}–{x.max()}", - "identity percentage": lambda x: list_to_string(list(x.unique())), - "coverage": lambda x: list_to_string(list(x.unique())), - "bitscore": lambda x: list_to_string(list(x.unique())), - "count": "sum", - "source": "first" - } + agg_dict = {"seq_id": lambda x: list_to_string(list(x.unique())), "count": "sum", + "source": lambda x: list_to_string(list(x.unique())), } - # Add aggregation for taxa levels that actually exist for level in level_names: if level in df_for_merge.columns: agg_dict[level] = "first" @@ -515,7 +344,6 @@ sort_columns = [] sort_ascending = [] - # Add family, genus, species if they exist for level in ['species', 'genus', 'family']: if level in df_merged.columns: sort_columns.append(level) @@ -523,14 +351,12 @@ df_merged = df_merged.sort_values(sort_columns, ascending=sort_ascending) - # Write both datasets to single Excel file with multiple sheets temp_path = output_path + ".xlsx" os.makedirs(os.path.dirname(temp_path), exist_ok=True) with pd.ExcelWriter(temp_path, engine='openpyxl', mode='w') as writer: df_individual.to_excel(writer, sheet_name='Individual_Reads', index=False) df_merged.to_excel(writer, sheet_name='Merged_by_Taxa', index=False) - # Auto-adjust column widths for both sheets for sheet_name in writer.sheets: worksheet = writer.sheets[sheet_name] for column in worksheet.columns: @@ -540,54 +366,43 @@ try: if 
len(str(cell.value)) > max_length: max_length = len(str(cell.value)) - except: + except AttributeError as e: + log_message(log_messages, f"Error {e}, {cell} has no value, something probably went wrong") pass - adjusted_width = min(max_length + 2, 50) # Cap at 50 characters + adjusted_width = min(max_length + 2, 50) worksheet.column_dimensions[column_letter].width = adjusted_width os.replace(temp_path, output_path) # Clean up temporary file os.remove(temp_tsv_path) else: - print("Dataframe empty, no annotation results") + log_message(log_messages, "Dataframe empty, no annotation results") -def create_circle_diagram(taxa_dicts, output_path, use_counts, uncertain_threshold): +def create_circle_data(taxa_dicts, output_path, use_counts, uncertain_threshold): """ - Generate hierarchical JSON data for a circular taxonomy diagram. - - This function aggregates taxonomic classification results from multiple reads - and prepares hierarchical count data suitable for circular visualization. + Generate circular taxonomy layer data. - :param taxa_dicts: A list of dictionaries, where each dictionary represents a - mapping from full taxonomic paths to their corresponding counts for a single - read. Each key should be a taxonomic path in the format - :type taxa_dicts: list[dict[str, int]] - :param output_path: File path to write the generated JSON output containing - circle diagram data. + The function converts resolved taxonomy per read into hierarchical + cumulative counts that can be visualized as concentric circles. + + :param taxa_dicts: Per-read taxonomy assignments. + :type taxa_dicts: list[dict] + :param output_path: JSON destination file. :type output_path: str - :param use_counts: Whether to aggregate counts by total read frequency - (``True``) or to count only unique taxa once (``False``). - :type use_counts: boolean - :param uncertain_threshold: Fraction (between 0 and 1) representing the minimum - majority threshold used for resolving conflicting taxonomic assignments. + :param use_counts: Whether to count total reads or unique taxa. + :type use_counts: bool + :param uncertain_threshold: Majority resolution threshold. :type uncertain_threshold: float - - :return: None. The resulting JSON file will be written to ``output_path``. The - file contains a list of dictionaries, one per taxonomic level, with: - - **labels** (*list[str]*): Names of taxa at that level. - - **sizes** (*list[int]*): Corresponding aggregated counts. + :return: None :rtype: None - - :note: - - Ambiguous classifications are labeled as "Uncertain taxa". - - The number of hierarchical levels is determined by the length of - ``TAXONOMIC_LEVELS``. 
""" aggregated_counts = defaultdict(int) seen_taxa = set() for read_taxa in taxa_dicts: - _, resolved_taxon_long = resolve_taxon_majority(read_taxa, uncertain_threshold) + if not read_taxa: + continue + _, resolved_taxon_long = resolve_tax_majority(read_taxa, uncertain_threshold) levels = resolved_taxon_long.split(" / ") @@ -600,7 +415,6 @@ aggregated_counts[" / ".join(levels[:i])] += 1 seen_taxa.add(resolved_taxon_long) - # Prepare circle data circle_data = [] for level in range(len(TAXONOMIC_LEVELS)): circle_data.append({"labels": [], "sizes": []}) @@ -614,279 +428,429 @@ json.dump(circle_data, f, indent=2) -def process_single_file(anno_file_path, unanno_file_path, args): +def read_unannotated_fasta(unanno_file_path, ignore_illuminapairend_type, ignore_obiclean_type, min_support, + filtered_fasta_path, log_messages): """ - Process a single annotated BLAST file and generate all requested analytical outputs. + Read, filter and optionally rewrite an unannotated FASTA file. - This function integrates multiple steps for analyzing annotated BLAST output - files and corresponding unannotated FASTA sequences. It organizes BLAST hits - per query, extracts the best-scoring matches, and produces various types of - output files (plots, tables, diagrams, and statistics) according to user - arguments. + This function extracts dereplicated sequence counts, applies header filters + and a minimum support threshold, collects statistics and, if requested, + writes a filtered FASTA file for downstream processing. - :param anno_file_path: Path to the annotated BLAST results file. - :type anno_file_path: str - :param unanno_file_path: Path to a FASTA file containing unannotated sequences. - Used to determine sequence order and total unique sequence counts. + :param unanno_file_path: Path to FASTA file. :type unanno_file_path: str - :param args: Parsed command-line arguments containing the following attributes: - - **eval_threshold** (*float*): Maximum allowed E-value for valid hits. - - **eval_plot** (*str | None*): Output path for E-value distribution plot. - - **taxa_output** (*str | None*): Output path for taxonomic summary table. - - **header_anno** (*str | None*): Output path for annotated header table. - - **circle_data** (*str | None*): Output path for circular diagram JSON data. - - **anno_stats** (*str | None*): Output path for annotation statistics file. - - **use_counts** (*bool*): Whether to use read counts or unique taxa only. - - **uncertain_threshold** (*float*): Majority threshold for resolving taxonomic ambiguity. + :param ignore_illuminapairend_type: Illumina pair-end status to remove. + :type ignore_illuminapairend_type: str + :param ignore_obiclean_type: Obiclean label to remove. + :type ignore_obiclean_type: str + :param min_support: Minimum dereplicated read count retained. + :type min_support: int + :param filtered_fasta_path: Optional path for filtered FASTA output. + :type filtered_fasta_path: str | None + :param log_messages: Log collection list. 
+ :type log_messages: list[str] + :return: (headers, total_unique_count, invalid_count) + :rtype: tuple[list[str], int, int] + """ + headers = [] + total_unique_count = 0 + invalid_count = 0 + + total_headers = 0 + low_support_count = 0 + header_filter_count = 0 + written_fasta_count = 0 + + if not os.path.exists(unanno_file_path): + log_message(log_messages, f"Warning: Unannotated file {unanno_file_path} not found") + return headers, total_unique_count, invalid_count + + write_fasta = filtered_fasta_path is not None + filtered_fasta = None + if write_fasta: + try: + filtered_fasta = open(filtered_fasta_path, "w") + except OSError as e: + log_message(log_messages, f"Error: Cannot open {filtered_fasta_path} for writing: {e}") + write_fasta = False + + write_flag = False + + with open(unanno_file_path, 'r') as f: + for line in f: + if line.startswith('>'): + total_headers += 1 + header_line = line.rstrip("\n") + header = header_line.split()[0].strip('>') + if "count=" in header_line: + count = int(header_line.split("count=")[1].split(";")[0]) + else: + count = 0 + + passes_header_filter = check_header_string(header_line, ignore_illuminapairend_type, + ignore_obiclean_type) + + if passes_header_filter and int(count) >= int(min_support): + headers.append(header) + total_unique_count += count + + write_flag = True + if write_fasta: + filtered_fasta.write(line) + written_fasta_count += 1 + else: + # Invalid header (either header filter or low support) + if not passes_header_filter: + header_filter_count += 1 + elif int(count) < int(min_support): + low_support_count += 1 + + invalid_count += 1 + write_flag = False + else: + if write_flag and write_fasta: + filtered_fasta.write(line) + + if write_fasta and filtered_fasta is not None: + filtered_fasta.close() + log_message(log_messages, f"Filtered FASTA written succesfully" + f"({written_fasta_count} sequences)") + + log_message(log_messages, f"FASTA: total headers: {total_headers}") + log_message(log_messages, f"FASTA: headers kept after filters and min_support={min_support}: {len(headers)}") + log_message(log_messages, f"FASTA: removed due to header filters (illumina/obiclean/etc.): {header_filter_count}") + log_message(log_messages, f"FASTA: removed due to low dereplicated count (<{min_support}): {low_support_count}") + log_message(log_messages, f"FASTA: total invalid (header filter + low support): {invalid_count}") + + return headers, total_unique_count, invalid_count + + +def parse_blast_output(anno_file_path, eval_threshold, ignore_taxonomy, ignore_rank, min_coverage, min_identity, + min_bitscore, ignore_seqids, log_messages): + """ + Parse a BLAST tabular file and retain high-quality hits. + + This function reads BLAST annotations, applies E-value and quality filters, + excludes invalid taxa, groups remaining hits by query identifier, and logs + summary statistics. + + :param anno_file_path: Input BLAST result file. + :type anno_file_path: str + :param eval_threshold: Maximum allowed E-value. + :type eval_threshold: float + :param ignore_taxonomy: Taxonomy substring filter. + :type ignore_taxonomy: str + :param ignore_rank: Rank substring filter. + :type ignore_rank: str + :param min_coverage: Minimum coverage threshold. + :type min_coverage: int + :param min_identity: Minimum identity threshold. + :type min_identity: int + :param min_bitscore: Minimum bitscore threshold. + :type min_bitscore: int + :param ignore_seqids: Comma-separated seqids to ignore. + :type ignore_seqids: str + :param log_messages: Log collection list. 
+ :type log_messages: list[str] + :return: Mapping of q_id → hits + :rtype: OrderedDict[str, list[dict]] + """ + if not os.path.exists(anno_file_path): + log_message(log_messages, f"Error: Input file {anno_file_path} not found") + return OrderedDict() + + blast_groups = OrderedDict() + + total_hits = 0 + kept_hits = 0 + filtered_hits = 0 + invalid_taxon_count = 0 + ignored_seqid_count = 0 + + try: + with open(anno_file_path, 'r') as f: + for line in f: + if line.startswith("#"): + continue + + parts = line.strip().split('\t') + if len(parts) < 7: + log_message(log_messages, f"less than 7 parts in line, skipped faulty line: {line}") + continue + + q_id = parts[0] + total_hits += 1 + + try: + e_val = float(parts[6]) + except ValueError as e: + log_message(log_messages, f"Error {e}, while extracting evalue") + filtered_hits += 1 + continue + + taxon = parts[-1].strip() + if is_invalid_taxon(taxon, ignore_taxonomy, ignore_rank): + invalid_taxon_count += 1 + filtered_hits += 1 + continue + + seq_id = parts[2] if len(parts) > 2 else '' + ignore_seqid_set = set(parse_list_param(ignore_seqids)) + if seq_id in ignore_seqid_set: + ignored_seqid_count += 1 + filtered_hits += 1 + continue + + identity = parts[4] if len(parts) > 4 else '' + cov = parts[5] if len(parts) > 5 else '' + bitscore = float(parts[7]) if len(parts) > 7 and parts[7] else 0.0 + source = parts[8] if len(parts) > 8 else '' + + hit = {'e_val': e_val, 'identity': identity, 'cov': cov, 'bitscore': bitscore, 'source': source, + 'taxon': taxon, 'seq_id': seq_id, } + + if not is_insufficient_hit(hit, min_coverage, min_identity, min_bitscore, eval_threshold): + blast_groups.setdefault(q_id, []).append(hit) + kept_hits += 1 + else: + filtered_hits += 1 + except Exception as e: + log_message(log_messages, f"Error reading BLAST file {anno_file_path}: {e}") + return OrderedDict() + + log_message(log_messages, f"BLAST: total hits read: {total_hits}") + log_message(log_messages, f"BLAST: hits kept after quality filters: {kept_hits}") + log_message(log_messages, f"BLAST: hits filtered (evalue/coverage/identity/bitscore): {filtered_hits}") + log_message(log_messages, f"BLAST: hits removed due to invalid taxon: {invalid_taxon_count}") + log_message(log_messages, f"BLAST: hits removed due to ignored seqids: {ignored_seqid_count}") + + return blast_groups + + +def process_single_query(header, hits, bitscore_perc_cutoff): + """ + Process all BLAST hits for a single query/header and return a summary dict. 
+ """ + current_read_info = {'taxa_dict': {}, 'header': header, 'e_values': set(), 'identities': set(), 'coverages': set(), + 'sources': set(), 'bitscores': set(), 'seq_id': set(), 'best_taxa_dict': {}} + + max_bitscore = max(h['bitscore'] for h in hits) + if bitscore_perc_cutoff > 0: + top_bitscore = float(max_bitscore) * (1 - (float(bitscore_perc_cutoff) / 100.0)) + else: + top_bitscore = float(max_bitscore) + current_read_info['best_taxa_dict'] = defaultdict(int) + + for hit in hits: + e_val = hit['e_val'] + identity = hit['identity'] + cov = hit['cov'] + bitscore = hit['bitscore'] + source = hit['source'] + taxon = hit['taxon'] + seq_id = hit['seq_id'] + + # Keep track of hits that pass bitscore threshold + if bitscore >= top_bitscore: + current_read_info['best_taxa_dict'][taxon] += 1 + current_read_info['e_values'].add(str(e_val)) + current_read_info['identities'].add(identity) + current_read_info['coverages'].add(cov) + current_read_info['sources'].add(source) + current_read_info['bitscores'].add(bitscore) + current_read_info['seq_id'].add(seq_id) + + return current_read_info + + +def process_single_file(anno_file_path, unanno_file_path, args, log_messages): + """ + Execute the complete BLAST annotation processing workflow. + + This function performs FASTA parsing, BLAST parsing, taxonomy resolution, + per-read and aggregated reporting, summary statistics computation, and + generation of optional plot and table outputs. + + All validations, errors and counters are written into the log collector. + + :param anno_file_path: BLAST annotation file. + :type anno_file_path: str + :param unanno_file_path: Input FASTA file. + :type unanno_file_path: str + :param args: Parsed arguments namespace. :type args: argparse.Namespace - - :return: None. This function writes multiple output files to the paths - specified in ``args`` + :param log_messages: Log collection list. + :type log_messages: list[str] + :return: None :rtype: None + """ + log_message(log_messages, f"Starting processing for FASTA") + log_message(log_messages, "=== PARAMETERS USED ===") - :raises FileNotFoundError: If the annotated BLAST or unannotated FASTA file cannot be found. - :raises ValueError: If file content is malformed or missing required columns. + skip_keys = { + "input_anno", + "input_unanno", + "eval_plot", + "taxa_output", + "circle_data", + "header_anno", + "log", + "filtered_fasta", + } + + for key, value in vars(args).items(): + if key in skip_keys: + continue + log_message(log_messages, f"{key}: {value}") - :notes: - - The function first groups BLAST hits by query ID and filters out invalid or - low-confidence taxa. - - For each query, the best BLAST hit is determined based on bitscore and E-value. - - The total number of unique annotated sequences is inferred from sequence - headers containing a ``count=`` field. - - Progress and warnings are printed to standard output for traceability. 
- """ - # Core data structures - one per read - read_data = [] # List of dicts, one per read + log_message(log_messages, "=== END PARAMETERS ===") + unanno_headers_ordered, total_unique_count, invalid_count = read_unannotated_fasta(unanno_file_path, + args.ignore_illuminapairend_type, args.ignore_obiclean_type, args.min_support, args.filtered_fasta, + log_messages) + + unanno_set = set(unanno_headers_ordered) + + if not os.path.exists(anno_file_path): + log_message(log_messages, f"Error: Input file {anno_file_path} not found") + with open(args.log, 'w') as f: + print('gaat nog niet goed hoor') + f.write("\n".join(log_messages)) + return + + log_message(log_messages, f"Reading BLAST annotations") + blast_groups = parse_blast_output(anno_file_path, args.eval_threshold, args.ignore_taxonomy, args.ignore_rank, + args.min_coverage, args.min_identity, args.min_bitscore, args.ignore_seqids, log_messages) + + extra_blast_qids = [q for q in blast_groups.keys() if q not in unanno_set] + if extra_blast_qids: + log_message(log_messages, + f"Note: {len(extra_blast_qids)} BLAST q_ids not in FASTA (showing up to 10): {extra_blast_qids[:10]}") + + read_data = [] headers = [] seen_reads = set() annotated_unique_count = 0 - total_unique_count = 0 - # Current read processing - current_q_id = '' - current_read_info = None - - # Check if file exists and has content - if not os.path.exists(anno_file_path): - print(f"Error: Input file {anno_file_path} not found") - return - - unanno_headers_ordered = [] - if os.path.exists(unanno_file_path): - with open(unanno_file_path, 'r') as f: - for line in f: - if line.startswith('>'): - header = line.split()[0].strip('>') - unanno_headers_ordered.append(header) - print(f"Found {len(unanno_headers_ordered)} headers in unannotated file") - else: - print(f"Warning: Unannotated file {unanno_file_path} not found") - - # Counter to track which read we're currently expecting - unanno_set = set(unanno_headers_ordered) - read_counter = 0 + total_headers_for_annotation = len(unanno_headers_ordered) + reads_with_hits = 0 - # Count total unique sequences from unannotated file - if os.path.exists(unanno_file_path): - with open(unanno_file_path, 'r') as f: - for line in f: - if "count=" in line: - count = int(line.split("count=")[1].split(";")[0]) - total_unique_count += count - - # Process annotated file - from collections import OrderedDict - blast_groups = OrderedDict() # q_id -> list of hit dicts - - # Group BLAST hits per q_id - try: - with open(anno_file_path, 'r') as f: - for line in f: - if line.startswith("#"): - continue - parts = line.strip().split('\t') - if len(parts) < 7: - continue - - q_id = parts[0] - try: - e_val = float(parts[6]) - except Exception: - continue - if e_val > args.eval_threshold: - continue - valid_taxa = True - taxon = parts[-1].strip() - if is_invalid_taxon(taxon): - print(f"Skipping {q_id}: invalid taxon: {taxon}") - valid_taxa = False + for header in unanno_headers_ordered: + if header not in blast_groups: + continue + hits = blast_groups[header] + current_read_info = process_single_query(header, hits, args.bitscore_perc_cutoff) + read_data.append(current_read_info) + reads_with_hits += 1 - identity = parts[4] if len(parts) > 4 else '' - cov = parts[5] if len(parts) > 5 else '' - bitscore = float(parts[7] if len(parts) > 7 else '') - source = parts[8] if len(parts) > 8 else '' - if valid_taxa: - hit = { - 'e_val': e_val, - 'identity': identity, - 'cov': cov, - 'bitscore': bitscore, - 'source': source, - 'taxon': taxon - } - 
blast_groups.setdefault(q_id, []).append(hit) - except Exception as e: - print(f"Error reading BLAST file {anno_file_path}: {e}") - return - - # BLAST-reads not in fasta - extra_blast_qids = [q for q in blast_groups.keys() if q not in unanno_set] - if extra_blast_qids: - print(f"Note: {len(extra_blast_qids)} BLAST q_ids not in FASTA: {extra_blast_qids[:10]}...") - - # Process fasta order - for header in unanno_headers_ordered: - if header not in blast_groups: - print(f"Skipping {header}: no BLAST hits") - continue - current_read_info = { - 'taxa_dict': {}, - 'header': header, - 'e_values': set(), - 'identities': set(), - 'coverages': set(), - 'sources': set(), - 'bitscores': set(), - 'best_eval': float(), - 'best_identity': '', - 'best_coverage': '', - 'best_source': '', - 'best_bitscore': -1, - 'best_taxa_dict' : {} - } + if header not in seen_reads: + seen_reads.add(header) + headers.append(header) + try: + count = int(header.split('(')[1].split(')')[0]) + except (IndexError, ValueError, AttributeError) as e: + log_message(log_messages, f"Error {e}: could not parse count in header: {header}") + count = 0 + annotated_unique_count += count - # process all hits for this header - for hit in blast_groups[header]: - e_val = hit['e_val'] - identity = hit['identity'] - cov = hit['cov'] - bitscore = hit['bitscore'] - source = hit['source'] - taxon = hit['taxon'] - - current_read_info['e_values'].add(str(e_val)) - current_read_info['identities'].add(identity) - current_read_info['coverages'].add(cov) - current_read_info['sources'].add(source) - current_read_info['bitscores'].add(bitscore) + reads_without_hits = total_headers_for_annotation - reads_with_hits + log_message(log_messages, f"ANNOTATION: total FASTA headers considered: {total_headers_for_annotation}") + log_message(log_messages, f"ANNOTATION: reads with BLAST hits: {reads_with_hits}") + log_message(log_messages, f"ANNOTATION: reads without BLAST hits: {reads_without_hits}") + log_message(log_messages, f"ANNOTATION: unique annotated count (from header counts): {annotated_unique_count}") + log_message(log_messages, f"ANNOTATION: total unique count (from FASTA): {total_unique_count}") - # Keep track of best hit - if (bitscore > current_read_info['best_bitscore'] or - (bitscore == current_read_info['best_bitscore'] and e_val < current_read_info['best_eval'])): - # New best set → reset - current_read_info['best_bitscore'] = bitscore - current_read_info['best_eval'] = e_val - current_read_info['best_identity'] = identity - current_read_info['best_coverage'] = cov - current_read_info['best_source'] = source - current_read_info['best_taxa_dict'] = {taxon: 1} - - # If hit is of same quality: add to existing dictionary - elif bitscore == current_read_info['best_bitscore'] and e_val == current_read_info['best_eval']: - td = current_read_info['best_taxa_dict'] - td[taxon] = td.get(taxon, 0) + 1 - # Might be unnecesary, best_taxa_dict seems to be the same as taxa_dict - # in output. Too scared to remove. 
- if len(current_read_info['e_values']) == 1: - taxa_dict = current_read_info['taxa_dict'] - taxa_dict[taxon] = taxa_dict.get(taxon, 0) + 1 - read_data.append(current_read_info) - - if header not in seen_reads: - seen_reads.add(header) - headers.append(header) - try: - count = int(header.split('(')[1].split(')')[0]) - except IndexError or ValueError or AttributeError: - count = 0 - annotated_unique_count += count - - # Extract data for different functions from unified structure taxa_dicts = [read['taxa_dict'] for read in read_data] td = [read['best_taxa_dict'] for read in read_data] - # Generate outputs based on arguments + if args.eval_plot: e_val_sets = [read['e_values'] for read in read_data] - make_eval_plot(e_val_sets, args.eval_plot) + make_eval_plot(e_val_sets, args.eval_plot, log_messages) + log_message(log_messages, f"E-value plot written succesfully") if args.taxa_output: process_taxa_output(td, args.taxa_output, args.uncertain_threshold) + log_message(log_messages, f"Taxa summary written succesfully") if args.header_anno: - # Extract best values efficiently - single list comprehension per metric - e_values_for_headers = [[read['best_eval']] for read in read_data] - identity_for_headers = [[read['best_identity']] for read in read_data] - coverage_for_headers = [[read['best_coverage']] for read in read_data] - source_for_headers = [[read['best_source']] for read in read_data] - bitscore_for_headers= [[read['best_bitscore']] for read in read_data] - process_header_annotations(td, headers, e_values_for_headers, - args.header_anno, args.uncertain_threshold, - identity_for_headers, coverage_for_headers, source_for_headers, bitscore_for_headers) + source_for_headers = [[read['sources']] for read in read_data] + seq_id_for_headers = [[read['seq_id']] for read in read_data] + process_header_annotations(td, headers, args.header_anno, args.uncertain_threshold, source_for_headers, + seq_id_for_headers, log_messages) + log_message(log_messages, f"Header annotations written succesfully") if args.circle_data: - create_circle_diagram(td, args.circle_data, args.use_counts, - args.uncertain_threshold) + create_circle_data(td, args.circle_data, args.use_counts, args.uncertain_threshold) + log_message(log_messages, f"Circle diagram JSON written succesfully") + + stats = calculate_annotation_stats(len(taxa_dicts), unanno_file_path, annotated_unique_count, + int(total_unique_count)) + + log_message(log_messages, "=== ANNOTATION STATISTICS ===") + for key, value in stats.items(): + log_message(log_messages, f"{key}: {value}") + + with open(args.log, 'w') as f: + f.write("\n".join(log_messages)) + - if args.anno_stats: - stats = calculate_annotation_stats(len(taxa_dicts), unanno_file_path, - annotated_unique_count, total_unique_count) - with open(args.anno_stats, 'w') as f: - f.write('metric\tvalue\n') - for key, value in stats.items(): - f.write(f'{key}\t{value}\n') +def check_header_string(header_line, invalid_header_string, invalid_line_string): + """ + Checks the header input for string that it needs to be filtered on + """ + invalid_string_list = parse_list_param(invalid_header_string) + parse_list_param(invalid_line_string) + for string in invalid_string_list: + if string.lower() == 'singleton': + string = "obiclean_status={'XXX': 's'}" + elif string.lower() == 'variant': + string = "obiclean_status={'XXX': 'i'}" + elif string.lower() == 'head': + string = "obiclean_status={'XXX': 'h'}" + elif string.lower() == 'pairend' or string.lower() == 'paired end': + string = "PairEnd" + elif 
string.lower() == 'consensus' or string.lower() == 'cons': + string = "CONS" + if string in header_line: + return False + return True -def is_invalid_taxon(taxon: str) -> bool: + +def is_insufficient_hit(hit, min_cov, min_id, min_bit, min_eval): + """ + Checks if hit has values that pass the thresholds + """ + if float(hit['cov']) < min_cov: + return True + elif float(hit['identity']) < min_id: + return True + elif float(hit['bitscore']) < min_bit: + return True + elif int(hit['e_val']) > min_eval: + return True + else: + return False + + +def is_invalid_taxon(taxon: str, invalid_taxa, invalid_ranks) -> bool: """ Determine whether a given taxonomic path should be considered invalid and excluded from analysis. - - This function identifies and filters out taxonomic strings that contain unreliable, - incomplete, or placeholder taxonomic information. It is used to clean BLAST or - classification outputs before aggregation and visualization. - - :param taxon: Full taxonomic path as a string - :type taxon: str - :return: ``True`` if the taxon is considered invalid according to the criteria below; - otherwise ``False``. - :rtype: bool - - :criteria: - A taxon is considered invalid if: - - It contains the substring ``"invalid taxid"`` or ``"GenBank invalid taxon"``. - - It includes the term ``"environmental sample"`` (non-specific environmental sequence). - - It has any hierarchical level labeled as ``"unknown"`` followed by a more - specific level (e.g., *unknown genus / Homo sapiens*). - - :note: - This function performs a **case-insensitive** check and assumes the - input taxonomic path uses ``" / "`` as a level delimiter. """ + invalid_string_list = parse_list_param(invalid_ranks) + parse_list_param(invalid_taxa) taxon_lower = taxon.lower() - - if "invalid taxid" in taxon_lower or "genbank invalid taxon" in taxon_lower: - return True - - if "environmental sample" in taxon_lower: - return True - - # check for "unknown" followed by something deeper - parts = taxon.split(" / ") - for i, part in enumerate(parts): - if "unknown" in part.lower() and i < len(parts) - 1: - # there is something more specific after an unknown level + for string in invalid_string_list: + if string in taxon_lower: return True return False +def parse_list_param(param): + """ + Safely convert a comma-separated Galaxy text parameter into a list. + """ + if not param: + return [] + return [x.strip() for x in param.strip(', \n').split(',') if x.strip()] + + def main(arg_list=None): """ Entry point for Galaxy-compatible BLAST annotation processing. @@ -901,13 +865,11 @@ Notes: Calls `process_single_file` with parsed arguments. """ + log_messages = [] args = parse_arguments(arg_list) - for output_file in [args.eval_plot, args.header_anno, args.taxa_output, args.circle_data, args.anno_stats]: - if output_file: - os.makedirs(os.path.dirname(output_file), exist_ok=True) - process_single_file(args.input_anno, args.input_unanno, args) + process_single_file(args.input_anno, args.input_unanno, args, log_messages) print("Processing completed successfully") if __name__ == "__main__": - main() \ No newline at end of file + main()
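
Reviewer note: the new `--ignore-rank`, `--ignore-taxonomy` and `--ignore-seqids` options all flow through the `parse_list_param` and `is_invalid_taxon` helpers added in this revision. A minimal usage sketch, assuming the committed file is importable as `blast_annotations_processor` and using made-up taxonomy strings:

```python
from blast_annotations_processor import parse_list_param, is_invalid_taxon

# Comma-separated Galaxy text parameters become plain Python lists.
print(parse_list_param('unknown, environmental'))  # ['unknown', 'environmental']
print(parse_list_param(''))                        # []

# A hit is dropped when any filter substring occurs in the lower-cased taxonomy path.
taxon = 'Bacteria / environmental samples / uncultured bacterium'
print(is_invalid_taxon(taxon, invalid_taxa='environmental', invalid_ranks='unknown'))  # True

taxon = 'Bacteria / Firmicutes / Bacillus subtilis'
print(is_invalid_taxon(taxon, invalid_taxa='environmental', invalid_ranks='unknown'))  # False
```

Note that only the taxonomy path is lower-cased before matching, so filter strings passed to these options should themselves be lower case.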
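The `--bitscore-perc-cutoff` parameter defines which hits take part in the per-read taxon vote: `process_single_query` keeps every hit whose bitscore lies within that percentage of the best hit for the query. A worked illustration of the arithmetic, with made-up bitscores:

```python
# Illustration of the --bitscore-perc-cutoff window used in process_single_query.
# The bitscores and taxa below are invented for the example.
hits = [
    {'taxon': 'Bacteria / Firmicutes / Bacillus subtilis',       'bitscore': 500.0},
    {'taxon': 'Bacteria / Firmicutes / Bacillus licheniformis',  'bitscore': 470.0},
    {'taxon': 'Bacteria / Firmicutes / Bacillus cereus',         'bitscore': 410.0},
]
bitscore_perc_cutoff = 8.0  # tool default

max_bitscore = max(h['bitscore'] for h in hits)
top_bitscore = max_bitscore * (1 - bitscore_perc_cutoff / 100.0)  # 500 * 0.92 = 460.0

kept = [h['taxon'] for h in hits if h['bitscore'] >= top_bitscore]
print(top_bitscore)  # 460.0
print(kept)          # the 500.0 and 470.0 hits vote; the 410.0 hit is excluded
```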
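The recursive conflict resolver was replaced by the flat `resolve_tax_majority(taxon_counts, uncertain_threshold)`. A small sketch of its behaviour with invented per-read counts (again assuming the module is importable under its file name):

```python
from blast_annotations_processor import resolve_tax_majority

counts = {
    'Bacteria / Proteobacteria / Gammaproteobacteria': 6,
    'Bacteria / Proteobacteria / Alphaproteobacteria': 4,
}

# 6/10 = 0.6 is below the 0.9 threshold, so the first rank where the paths
# disagree collapses to 'Uncertain taxa'.
print(resolve_tax_majority(counts, uncertain_threshold=0.9))
# ('Bacteria / Proteobacteria / Uncertain taxa', 'Bacteria / Proteobacteria / Uncertain taxa')

# With a clear majority (9/10 >= 0.9) the dominant path is returned unchanged.
counts['Bacteria / Proteobacteria / Gammaproteobacteria'] = 9
counts['Bacteria / Proteobacteria / Alphaproteobacteria'] = 1
print(resolve_tax_majority(counts, uncertain_threshold=0.9))
# ('Bacteria / Proteobacteria / Gammaproteobacteria', 'Bacteria / Proteobacteria / Gammaproteobacteria')
```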
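Because this revision marks several parameters as `required=True` (including ones that also carry defaults), a local smoke test has to pass them all explicitly. A hedged example call through `main(arg_list=...)`; the file names are placeholders, not shipped test data, so the call only succeeds if matching inputs exist locally:

```python
from blast_annotations_processor import main

main(arg_list=[
    '--input-anno', 'blast_annotated.tabular',     # placeholder BLAST annotation file
    '--input-unanno', 'dereplicated_reads.fasta',  # placeholder FASTA file
    '--log', 'run_log.txt',
    '--filtered-fasta', 'reads_filtered.fasta',
    '--uncertain-threshold', '0.9',
    '--eval-threshold', '1e-10',
    '--bitscore-perc-cutoff', '8',
    '--min-bitscore', '0',
    '--min-identity', '0',
    '--min-coverage', '0',
    '--min-support', '0',
    # optional report outputs
    '--taxa-output', 'taxa_report.tabular',
    '--circle-data', 'circle_data.txt',
])
```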
