Mercurial > repos > onnodg > blast_annotations_processor
view blast_annotations_processor.py @ 2:9ca209477dfd draft default tip
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
| author | onnodg |
|---|---|
| date | Mon, 15 Dec 2025 16:43:36 +0000 |
| parents | a3989edf0a4a |
| children |
line wrap: on
line source
"""Galaxy-compatible BLAST annotation processor. This script processes a single annotated BLAST file along with a FASTA file containing unannotated reads. It generates multiple types of outputs for integration with Galaxy workflows: - E-value distribution plots (PNG) - Taxonomic composition reports (text) - Circular taxonomy diagram data (JSON) - Header annotations with merged and per-read information (Excel) - Annotation statistics summary (text) Main workflow: 1. Parse command-line arguments. 2. Load annotated BLAST results and unannotated FASTA headers. 3. Group BLAST hits per query (q_id), filter by thresholds. 4. Resolve taxonomic conflicts with uncertainty rules. 5. Generate requested outputs. Notes: - Headers in BLAST and FASTA should correspond. - Uses matplotlib, pandas, and openpyxl for visualization and reporting. """ import argparse import json import os from collections import OrderedDict from collections import defaultdict import matplotlib.pyplot as plt import numpy as np import pandas as pd # Default taxonomic levels TAXONOMIC_LEVELS = ["K", "P", "C", "O", "F", "G", "S"] def parse_arguments(arg_list=None): """Parse command line arguments for cluster processing.""" parser = argparse.ArgumentParser(description='Process BLAST annotation results for Galaxy') parser.add_argument('--input-anno', required=True, help='Annotated BLAST output file (tabular format)') parser.add_argument('--input-unanno', required=True, help='Unannotated sequences file (FASTA format)') parser.add_argument('--eval-plot', help='Output path for E-value plot (PNG)') parser.add_argument('--taxa-output', help='Output path for taxa report (tabular)') parser.add_argument('--circle-data', help='Output path for circular taxonomy data (txt)') parser.add_argument('--header-anno', help='Output path for header annotations (tabular/xlsx)') parser.add_argument('--log', help='Output path for log file (txt)', required=True) parser.add_argument('--filtered-fasta', required=True, help='Filtered fasta file (fasta format) for downstream analysis') parser.add_argument('--uncertain-threshold', type=float, default=0.9, required=True, help='Threshold for resolving taxonomic conflicts (default: 0.9)') parser.add_argument('--eval-threshold', default='1e-10', type=float, required=True, help='E-value threshold for filtering results (default: 1e-10)') parser.add_argument('--use-counts', action='store_true', default=False, required=False, help='Use read counts in circular data') parser.add_argument('--ignore-rank', default='unknown', required=False, help='Ignore rank when containing this text') parser.add_argument('--ignore-taxonomy', default='environmental', required=False, help="Don't use taxonomy containing this taxonomy") parser.add_argument('--bitscore-perc-cutoff', type=float, default=8, required=True, help='Bitscore percentage cutoff for considered hits') parser.add_argument('--min-bitscore', type=int, default=0, required=True, help='Minimum bitscore threshold for hits') parser.add_argument('-iot', '--ignore-obiclean-type', type=str, default='singleton', required=False, help='Ignore sequences with this obiclean type') parser.add_argument('-iit', '--ignore-illuminapairend-type', type=str, default='pairend', required=False, help='Ignore sequences with this illumina paired end output type') parser.add_argument('--min-identity', type=int, default=0, required=True, help='Minimum sequence identity to consider a hit') parser.add_argument('--min-coverage', type=int, default=0, required=True, help='Minimum sequence coverage to consider a hit') parser.add_argument('--ignore-seqids', type=str, default='', required=False, help='Ignore sequences with this sequence identifier') parser.add_argument('--min-support', type=int, default=0, required=True, help='A taxon is kept only if it (or its descendants) have at least N reads assigned.') return parser.parse_args(arg_list) def log_message(log_messages, msg): """ Helper to both print and collect log messages. """ if log_messages is not None: log_messages.append(msg) def list_to_string(x): """ Convert a list, pandas Series, or numpy array to a comma-separated string. """ if isinstance(x, (list, pd.Series, np.ndarray)): return ", ".join(map(str, x)) return str(x) def make_eval_plot(e_val_sets, output_path, log_messages): """ Generate an E-value distribution plot. This function aggregates E-values per read, transforms them onto a log axis and produces a visual summary of the distribution of best hits across reads. If no E-values are available, no plot is created. :param e_val_sets: Set of E-values per read. :type e_val_sets: list[set[str]] :param output_path: Output PNG file. :type output_path: str :param log_messages: Log collection list. :type log_messages: list[str] :return: None :rtype: None """ if not e_val_sets: log_message(log_messages, "No E-values to plot") return None processed_sets = sorted([sorted(float(e_val) for e_val in e_val_set) for e_val_set in e_val_sets]) bar_positions = [] bar_heights = [] for i, e_set in enumerate(processed_sets): if len(e_set) == 1: e_set.append(0) for j, e_val in enumerate(e_set): bar_positions.append(i + j * 0.29) if e_val != 0: bar_heights.append(1 / e_val) else: bar_heights.append(e_val) plt.figure(figsize=(21, 9)) plt.bar(bar_positions, bar_heights, width=0.29, color=['blue', 'red']) plt.yscale('log') plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7) plt.ylabel("e-values") plt.xticks(ticks=range(len(processed_sets)), labels=[f'{i + 1}' for i in range(len(processed_sets))]) plt.tight_layout() plt.savefig(output_path, format='png') plt.close() return None def calculate_annotation_stats(anno_count, unanno_file_path, unique_anno_count, total_unique_count): """ Compute annotation statistics for a dataset. This function calculates summary statistics for annotated and unannotated sequences in a dataset. It counts the total number of sequences in the unannotated file (FASTA-style, based on lines starting with '>'), and computes the percentage of annotated sequences and unique annotated sequences relative to their totals. :param anno_count: Total number of annotated sequences. :type anno_count: int :param unanno_file_path: Path to a FASTA file containing unannotated sequences. :type unanno_file_path: str :param unique_anno_count: Number of unique annotated sequences. :type unique_anno_count: int :param total_unique_count: Total number of unique sequences in the dataset. :type total_unique_count: int :return: Dictionary containing: - 'percentage_annotated' (float): Percentage of annotated sequences. - 'annotated_sequences' (int): Number of annotated sequences. - 'total_sequences' (int): Total number of sequences (annotated + unannotated). - 'percentage_unique_annotated' (float): Percentage of unique annotated sequences. - 'unique_annotated' (int): Number of unique annotated sequences. - 'total_unique' (int): Total number of unique sequences. :rtype: dict[str, float | int] """ total_sequences = 0 with open(unanno_file_path, 'r') as f: total_sequences = sum(1 for line in f if line.startswith('>')) percentage_annotated = (anno_count / total_sequences * 100) if total_sequences > 0 else 0 percentage_unique_annotated = (unique_anno_count / total_unique_count * 100) if total_unique_count > 0 else 0 return {'percentage_annotated': percentage_annotated, 'annotated_sequences': anno_count, 'total_sequences': total_sequences, 'percentage_unique_annotated': percentage_unique_annotated, 'unique_annotated': unique_anno_count, 'total_unique': total_unique_count} def resolve_tax_majority(taxon_counts, uncertain_threshold): total_count = sum(taxon_counts.values()) if total_count == 0: return "Uncertain taxa", "Uncertain taxa" most_common_taxon, count = max(taxon_counts.items(), key=lambda x: x[1]) # Use most common if above threshold if count / total_count >= uncertain_threshold: return most_common_taxon, most_common_taxon conflicting_taxa = list(taxon_counts.keys()) conflicting_levels = [taxon.split(" / ") for taxon in conflicting_taxa] # Resolve uncertainty per level for level_idx in range(min(len(level) for level in conflicting_levels)): current_level_names = {level[level_idx] for level in conflicting_levels} if len(current_level_names) > 1: lowest_common_path = conflicting_levels[0][:level_idx + 1] lowest_common_path[-1] = 'Uncertain taxa' small_output = " / ".join(lowest_common_path) min_conflicting_level = min(len(level) for level in conflicting_levels) uncertain_path = conflicting_levels[0][:level_idx + 1] uncertain_path[-1] = 'Uncertain taxa' for _ in range(level_idx + 1, min_conflicting_level): uncertain_path.append('Uncertain taxa') return small_output, " / ".join(uncertain_path) return conflicting_taxa[0], conflicting_taxa[0] def process_taxa_output(taxa_dicts, output_path, uncertain_threshold): """ Generate a hierarchical taxa summary file. This function resolves best taxonomic assignments per read and aggregates counts into taxonomic levels, producing a readable text report listing percentages, counts and resolved taxonomy. :param taxa_dicts: Best taxa dictionaries per read. :type taxa_dicts: list[dict] :param output_path: Path of output report. :type output_path: str :param uncertain_threshold: Confidence value for majority voting. :type uncertain_threshold: float :return: None :rtype: None """ uncertain_dict = {level: 0 for level in TAXONOMIC_LEVELS} aggregated_counts = defaultdict(int) for read_taxa in taxa_dicts: if not read_taxa: continue resolved_taxon, _ = resolve_tax_majority(read_taxa, uncertain_threshold) # Add counts for resolved taxon levels = resolved_taxon.split(" / ") for i in range(1, len(levels) + 1): aggregated_counts[" / ".join(levels[:i])] += 1 total_count = sum(value for key, value in aggregated_counts.items() if len(key.split(" / ")) == 1) report_lines = [] for taxonomy, count in sorted(aggregated_counts.items(), key=lambda x: x[0]): levels = taxonomy.split(" / ") indent = " " * (len(levels) - 1) * 2 taxon_name = levels[-1] taxon_level_code = TAXONOMIC_LEVELS[len(levels) - 1] if len(levels) <= len(TAXONOMIC_LEVELS) else "U" percentage = (count / total_count) * 100 if total_count > 0 else 0 report_lines.append(f"{percentage:.2f}\t{count}\t{total_count}\t{taxon_level_code}\t{indent}{taxon_name}") if taxon_name == 'Uncertain taxa': uncertain_dict[taxon_level_code] += count with open(output_path, 'w') as f: f.write("Uncertain count per taxonomic level" + str(uncertain_dict) + '\n') f.write('percentage_rooted\tnumber_rooted\ttotal_num\ttaxon_level\tidentification\n') f.write("\n".join(report_lines)) def process_header_annotations(taxa_dicts, headers, output_path, uncertain_threshold, source_list, seq_id_list, log_messages): """ Create an Excel report summarizing individual and merged read annotations. This function converts best taxonomy assignments into a per-read table, generates an aggregated table grouped by taxon, and saves both tables to separate sheets in a single Excel file. :param taxa_dicts: Taxonomy assignments per read. :type taxa_dicts: list[dict] :param headers: Original FASTA headers. :type headers: list[str] :param output_path: Output Excel file name. :type output_path: str :param uncertain_threshold: Confidence level for taxonomy conflict resolution. :type uncertain_threshold: float :param source_list: Source identifiers per read. :type source_list: list[list[str]] :param seq_id_list: Sequence identifiers per read. :type seq_id_list: list[list[str]] :param log_messages: Log collection list. :type log_messages: list[str] :return: None :rtype: None """ report_lines = [] for i, read_taxa in enumerate(taxa_dicts): if not read_taxa: continue _, resolved_taxon_long = resolve_tax_majority(read_taxa, uncertain_threshold) source = source_list[i][0] if source_list[i] else "N/A" seq_id = seq_id_list[i][0] if seq_id_list[i] else "N/A" header = headers[i] if i < len(headers) else f"Header missing" # Extract count try: header_base, count_str = header.rsplit("(", 1) count = int(count_str.rstrip(")")) except ValueError as e: log_message(log_messages, f'Failed extracting count: {e}') header_base = header count = 1 report_lines.append(f'{header_base}\t{seq_id}\t{source}\t{count}\t{resolved_taxon_long}') # temp path is needed to write to a xlsx from galaxy temp_tsv_path = 'temp.tsv' try: with open(temp_tsv_path, 'w') as f: f.write('header\tseq_id\tsource\tcount\ttaxa\n') f.write("\n".join(report_lines)) except PermissionError as e: log_message(log_messages, f"Unable to write to file, error: {e} file might be opened") df = pd.read_csv(temp_tsv_path, sep='\t', encoding="latin1") if not df.empty: taxa_split = df["taxa"].str.split(" / ", expand=True) max_levels = taxa_split.shape[1] if taxa_split.shape[1] is not None else 7 level_names = ["kingdom", "phylum", "class", "order", "family", "genus", "species"][:max_levels] taxa_split.columns = level_names df_individual = pd.concat([df, taxa_split], axis=1) df_individual = df_individual.sort_values(['species', 'genus', 'family'], ascending=[True, True, True]) df_for_merge = df_individual.copy() group_cols = ["taxa"] agg_dict = {"seq_id": lambda x: list_to_string(list(x.unique())), "count": "sum", "source": lambda x: list_to_string(list(x.unique())), } for level in level_names: if level in df_for_merge.columns: agg_dict[level] = "first" df_merged = df_for_merge.groupby(group_cols, as_index=False).agg(agg_dict) sort_columns = [] sort_ascending = [] for level in ['species', 'genus', 'family']: if level in df_merged.columns: sort_columns.append(level) sort_ascending.append(True) df_merged = df_merged.sort_values(sort_columns, ascending=sort_ascending) temp_path = output_path + ".xlsx" os.makedirs(os.path.dirname(temp_path), exist_ok=True) with pd.ExcelWriter(temp_path, engine='openpyxl', mode='w') as writer: df_individual.to_excel(writer, sheet_name='Individual_Reads', index=False) df_merged.to_excel(writer, sheet_name='Merged_by_Taxa', index=False) for sheet_name in writer.sheets: worksheet = writer.sheets[sheet_name] for column in worksheet.columns: max_length = 0 column_letter = column[0].column_letter for cell in column: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except AttributeError as e: log_message(log_messages, f"Error {e}, {cell} has no value, something probably went wrong") pass adjusted_width = min(max_length + 2, 50) worksheet.column_dimensions[column_letter].width = adjusted_width os.replace(temp_path, output_path) # Clean up temporary file os.remove(temp_tsv_path) else: log_message(log_messages, "Dataframe empty, no annotation results") def create_circle_data(taxa_dicts, output_path, use_counts, uncertain_threshold): """ Generate circular taxonomy layer data. The function converts resolved taxonomy per read into hierarchical cumulative counts that can be visualized as concentric circles. :param taxa_dicts: Per-read taxonomy assignments. :type taxa_dicts: list[dict] :param output_path: JSON destination file. :type output_path: str :param use_counts: Whether to count total reads or unique taxa. :type use_counts: bool :param uncertain_threshold: Majority resolution threshold. :type uncertain_threshold: float :return: None :rtype: None """ aggregated_counts = defaultdict(int) seen_taxa = set() for read_taxa in taxa_dicts: if not read_taxa: continue _, resolved_taxon_long = resolve_tax_majority(read_taxa, uncertain_threshold) levels = resolved_taxon_long.split(" / ") if use_counts: for i in range(1, len(levels) + 1): aggregated_counts[" / ".join(levels[:i])] += 1 else: if resolved_taxon_long not in seen_taxa: for i in range(1, len(levels) + 1): aggregated_counts[" / ".join(levels[:i])] += 1 seen_taxa.add(resolved_taxon_long) circle_data = [] for level in range(len(TAXONOMIC_LEVELS)): circle_data.append({"labels": [], "sizes": []}) for taxonomy, count in sorted(aggregated_counts.items(), key=lambda x: x[0]): levels = taxonomy.split(" / ") if len(levels) <= len(TAXONOMIC_LEVELS): circle_data[len(levels) - 1]["labels"].append(levels[-1].strip()) circle_data[len(levels) - 1]["sizes"].append(count) with open(output_path, "w") as f: json.dump(circle_data, f, indent=2) def read_unannotated_fasta(unanno_file_path, ignore_illuminapairend_type, ignore_obiclean_type, min_support, filtered_fasta_path, log_messages): """ Read, filter and optionally rewrite an unannotated FASTA file. This function extracts dereplicated sequence counts, applies header filters and a minimum support threshold, collects statistics and, if requested, writes a filtered FASTA file for downstream processing. :param unanno_file_path: Path to FASTA file. :type unanno_file_path: str :param ignore_illuminapairend_type: Illumina pair-end status to remove. :type ignore_illuminapairend_type: str :param ignore_obiclean_type: Obiclean label to remove. :type ignore_obiclean_type: str :param min_support: Minimum dereplicated read count retained. :type min_support: int :param filtered_fasta_path: Optional path for filtered FASTA output. :type filtered_fasta_path: str | None :param log_messages: Log collection list. :type log_messages: list[str] :return: (headers, total_unique_count, invalid_count) :rtype: tuple[list[str], int, int] """ headers = [] total_unique_count = 0 invalid_count = 0 total_headers = 0 low_support_count = 0 header_filter_count = 0 written_fasta_count = 0 if not os.path.exists(unanno_file_path): log_message(log_messages, f"Warning: Unannotated file {unanno_file_path} not found") return headers, total_unique_count, invalid_count write_fasta = filtered_fasta_path is not None filtered_fasta = None if write_fasta: try: filtered_fasta = open(filtered_fasta_path, "w") except OSError as e: log_message(log_messages, f"Error: Cannot open {filtered_fasta_path} for writing: {e}") write_fasta = False write_flag = False with open(unanno_file_path, 'r') as f: for line in f: if line.startswith('>'): total_headers += 1 header_line = line.rstrip("\n") header = header_line.split()[0].strip('>') if "count=" in header_line: count = int(header_line.split("count=")[1].split(";")[0]) else: count = 0 passes_header_filter = check_header_string(header_line, ignore_illuminapairend_type, ignore_obiclean_type) if passes_header_filter and int(count) >= int(min_support): headers.append(header) total_unique_count += count write_flag = True if write_fasta: filtered_fasta.write(line) written_fasta_count += 1 else: # Invalid header (either header filter or low support) if not passes_header_filter: header_filter_count += 1 elif int(count) < int(min_support): low_support_count += 1 invalid_count += 1 write_flag = False else: if write_flag and write_fasta: filtered_fasta.write(line) if write_fasta and filtered_fasta is not None: filtered_fasta.close() log_message(log_messages, f"Filtered FASTA written succesfully" f"({written_fasta_count} sequences)") log_message(log_messages, f"FASTA: total headers: {total_headers}") log_message(log_messages, f"FASTA: headers kept after filters and min_support={min_support}: {len(headers)}") log_message(log_messages, f"FASTA: removed due to header filters (illumina/obiclean/etc.): {header_filter_count}") log_message(log_messages, f"FASTA: removed due to low dereplicated count (<{min_support}): {low_support_count}") log_message(log_messages, f"FASTA: total invalid (header filter + low support): {invalid_count}") return headers, total_unique_count, invalid_count def parse_blast_output(anno_file_path, eval_threshold, ignore_taxonomy, ignore_rank, min_coverage, min_identity, min_bitscore, ignore_seqids, log_messages): """ Parse a BLAST tabular file and retain high-quality hits. This function reads BLAST annotations, applies E-value and quality filters, excludes invalid taxa, groups remaining hits by query identifier, and logs summary statistics. :param anno_file_path: Input BLAST result file. :type anno_file_path: str :param eval_threshold: Maximum allowed E-value. :type eval_threshold: float :param ignore_taxonomy: Taxonomy substring filter. :type ignore_taxonomy: str :param ignore_rank: Rank substring filter. :type ignore_rank: str :param min_coverage: Minimum coverage threshold. :type min_coverage: int :param min_identity: Minimum identity threshold. :type min_identity: int :param min_bitscore: Minimum bitscore threshold. :type min_bitscore: int :param ignore_seqids: Comma-separated seqids to ignore. :type ignore_seqids: str :param log_messages: Log collection list. :type log_messages: list[str] :return: Mapping of q_id → hits :rtype: OrderedDict[str, list[dict]] """ if not os.path.exists(anno_file_path): log_message(log_messages, f"Error: Input file {anno_file_path} not found") return OrderedDict() blast_groups = OrderedDict() total_hits = 0 kept_hits = 0 filtered_hits = 0 invalid_taxon_count = 0 ignored_seqid_count = 0 try: with open(anno_file_path, 'r') as f: for line in f: if line.startswith("#"): continue parts = line.strip().split('\t') if len(parts) < 7: log_message(log_messages, f"less than 7 parts in line, skipped faulty line: {line}") continue q_id = parts[0] total_hits += 1 try: e_val = float(parts[6]) except ValueError as e: log_message(log_messages, f"Error {e}, while extracting evalue") filtered_hits += 1 continue taxon = parts[-1].strip() if is_invalid_taxon(taxon, ignore_taxonomy, ignore_rank): invalid_taxon_count += 1 filtered_hits += 1 continue seq_id = parts[2] if len(parts) > 2 else '' ignore_seqid_set = set(parse_list_param(ignore_seqids)) if seq_id in ignore_seqid_set: ignored_seqid_count += 1 filtered_hits += 1 continue identity = parts[4] if len(parts) > 4 else '' cov = parts[5] if len(parts) > 5 else '' bitscore = float(parts[7]) if len(parts) > 7 and parts[7] else 0.0 source = parts[8] if len(parts) > 8 else '' hit = {'e_val': e_val, 'identity': identity, 'cov': cov, 'bitscore': bitscore, 'source': source, 'taxon': taxon, 'seq_id': seq_id, } if not is_insufficient_hit(hit, min_coverage, min_identity, min_bitscore, eval_threshold): blast_groups.setdefault(q_id, []).append(hit) kept_hits += 1 else: filtered_hits += 1 except Exception as e: log_message(log_messages, f"Error reading BLAST file {anno_file_path}: {e}") return OrderedDict() log_message(log_messages, f"BLAST: total hits read: {total_hits}") log_message(log_messages, f"BLAST: hits kept after quality filters: {kept_hits}") log_message(log_messages, f"BLAST: hits filtered (evalue/coverage/identity/bitscore): {filtered_hits}") log_message(log_messages, f"BLAST: hits removed due to invalid taxon: {invalid_taxon_count}") log_message(log_messages, f"BLAST: hits removed due to ignored seqids: {ignored_seqid_count}") return blast_groups def process_single_query(header, hits, bitscore_perc_cutoff): """ Process all BLAST hits for a single query/header and return a summary dict. """ current_read_info = {'taxa_dict': {}, 'header': header, 'e_values': set(), 'identities': set(), 'coverages': set(), 'sources': set(), 'bitscores': set(), 'seq_id': set(), 'best_taxa_dict': {}} max_bitscore = max(h['bitscore'] for h in hits) if bitscore_perc_cutoff > 0: top_bitscore = float(max_bitscore) * (1 - (float(bitscore_perc_cutoff) / 100.0)) else: top_bitscore = float(max_bitscore) current_read_info['best_taxa_dict'] = defaultdict(int) for hit in hits: e_val = hit['e_val'] identity = hit['identity'] cov = hit['cov'] bitscore = hit['bitscore'] source = hit['source'] taxon = hit['taxon'] seq_id = hit['seq_id'] # Keep track of hits that pass bitscore threshold if bitscore >= top_bitscore: current_read_info['best_taxa_dict'][taxon] += 1 current_read_info['e_values'].add(str(e_val)) current_read_info['identities'].add(identity) current_read_info['coverages'].add(cov) current_read_info['sources'].add(source) current_read_info['bitscores'].add(bitscore) current_read_info['seq_id'].add(seq_id) return current_read_info def process_single_file(anno_file_path, unanno_file_path, args, log_messages): """ Execute the complete BLAST annotation processing workflow. This function performs FASTA parsing, BLAST parsing, taxonomy resolution, per-read and aggregated reporting, summary statistics computation, and generation of optional plot and table outputs. All validations, errors and counters are written into the log collector. :param anno_file_path: BLAST annotation file. :type anno_file_path: str :param unanno_file_path: Input FASTA file. :type unanno_file_path: str :param args: Parsed arguments namespace. :type args: argparse.Namespace :param log_messages: Log collection list. :type log_messages: list[str] :return: None :rtype: None """ log_message(log_messages, f"Starting processing for FASTA") log_message(log_messages, "=== PARAMETERS USED ===") skip_keys = { "input_anno", "input_unanno", "eval_plot", "taxa_output", "circle_data", "header_anno", "log", "filtered_fasta", } for key, value in vars(args).items(): if key in skip_keys: continue log_message(log_messages, f"{key}: {value}") log_message(log_messages, "=== END PARAMETERS ===") unanno_headers_ordered, total_unique_count, invalid_count = read_unannotated_fasta(unanno_file_path, args.ignore_illuminapairend_type, args.ignore_obiclean_type, args.min_support, args.filtered_fasta, log_messages) unanno_set = set(unanno_headers_ordered) if not os.path.exists(anno_file_path): log_message(log_messages, f"Error: Input file {anno_file_path} not found") with open(args.log, 'w') as f: print('gaat nog niet goed hoor') f.write("\n".join(log_messages)) return log_message(log_messages, f"Reading BLAST annotations") blast_groups = parse_blast_output(anno_file_path, args.eval_threshold, args.ignore_taxonomy, args.ignore_rank, args.min_coverage, args.min_identity, args.min_bitscore, args.ignore_seqids, log_messages) extra_blast_qids = [q for q in blast_groups.keys() if q not in unanno_set] if extra_blast_qids: log_message(log_messages, f"Note: {len(extra_blast_qids)} BLAST q_ids not in FASTA (showing up to 10): {extra_blast_qids[:10]}") read_data = [] headers = [] seen_reads = set() annotated_unique_count = 0 total_headers_for_annotation = len(unanno_headers_ordered) reads_with_hits = 0 for header in unanno_headers_ordered: if header not in blast_groups: continue hits = blast_groups[header] current_read_info = process_single_query(header, hits, args.bitscore_perc_cutoff) read_data.append(current_read_info) reads_with_hits += 1 if header not in seen_reads: seen_reads.add(header) headers.append(header) try: count = int(header.split('(')[1].split(')')[0]) except (IndexError, ValueError, AttributeError) as e: log_message(log_messages, f"Error {e}: could not parse count in header: {header}") count = 0 annotated_unique_count += count reads_without_hits = total_headers_for_annotation - reads_with_hits log_message(log_messages, f"ANNOTATION: total FASTA headers considered: {total_headers_for_annotation}") log_message(log_messages, f"ANNOTATION: reads with BLAST hits: {reads_with_hits}") log_message(log_messages, f"ANNOTATION: reads without BLAST hits: {reads_without_hits}") log_message(log_messages, f"ANNOTATION: unique annotated count (from header counts): {annotated_unique_count}") log_message(log_messages, f"ANNOTATION: total unique count (from FASTA): {total_unique_count}") taxa_dicts = [read['taxa_dict'] for read in read_data] td = [read['best_taxa_dict'] for read in read_data] if args.eval_plot: e_val_sets = [read['e_values'] for read in read_data] make_eval_plot(e_val_sets, args.eval_plot, log_messages) log_message(log_messages, f"E-value plot written succesfully") if args.taxa_output: process_taxa_output(td, args.taxa_output, args.uncertain_threshold) log_message(log_messages, f"Taxa summary written succesfully") if args.header_anno: source_for_headers = [[read['sources']] for read in read_data] seq_id_for_headers = [[read['seq_id']] for read in read_data] process_header_annotations(td, headers, args.header_anno, args.uncertain_threshold, source_for_headers, seq_id_for_headers, log_messages) log_message(log_messages, f"Header annotations written succesfully") if args.circle_data: create_circle_data(td, args.circle_data, args.use_counts, args.uncertain_threshold) log_message(log_messages, f"Circle diagram JSON written succesfully") stats = calculate_annotation_stats(len(taxa_dicts), unanno_file_path, annotated_unique_count, int(total_unique_count)) log_message(log_messages, "=== ANNOTATION STATISTICS ===") for key, value in stats.items(): log_message(log_messages, f"{key}: {value}") with open(args.log, 'w') as f: f.write("\n".join(log_messages)) def check_header_string(header_line, invalid_header_string, invalid_line_string): """ Checks the header input for string that it needs to be filtered on """ invalid_string_list = parse_list_param(invalid_header_string) + parse_list_param(invalid_line_string) for string in invalid_string_list: if string.lower() == 'singleton': string = "obiclean_status={'XXX': 's'}" elif string.lower() == 'variant': string = "obiclean_status={'XXX': 'i'}" elif string.lower() == 'head': string = "obiclean_status={'XXX': 'h'}" elif string.lower() == 'pairend' or string.lower() == 'paired end': string = "PairEnd" elif string.lower() == 'consensus' or string.lower() == 'cons': string = "CONS" if string in header_line: return False return True def is_insufficient_hit(hit, min_cov, min_id, min_bit, min_eval): """ Checks if hit has values that pass the thresholds """ if float(hit['cov']) < min_cov: return True elif float(hit['identity']) < min_id: return True elif float(hit['bitscore']) < min_bit: return True elif int(hit['e_val']) > min_eval: return True else: return False def is_invalid_taxon(taxon: str, invalid_taxa, invalid_ranks) -> bool: """ Determine whether a given taxonomic path should be considered invalid and excluded from analysis. """ invalid_string_list = parse_list_param(invalid_ranks) + parse_list_param(invalid_taxa) taxon_lower = taxon.lower() for string in invalid_string_list: if string in taxon_lower: return True return False def parse_list_param(param): """ Safely convert a comma-separated Galaxy text parameter into a list. """ if not param: return [] return [x.strip() for x in param.strip(', \n').split(',') if x.strip()] def main(arg_list=None): """ Entry point for Galaxy-compatible BLAST annotation processing. :param arg_list: Optional list of command-line arguments to override ``sys.argv``. Primarily used for testing or programmatic execution. If ``None``, arguments are read directly from the command line. :type arg_list: list[str] | None :return: None :rtype: None Notes: Calls `process_single_file` with parsed arguments. """ log_messages = [] args = parse_arguments(arg_list) process_single_file(args.input_anno, args.input_unanno, args, log_messages) print("Processing completed successfully") if __name__ == "__main__": main()
