comparison blast_annotations_processor.py @ 0:a3989edf0a4a draft

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit c944fd5685f295acba06679e85b67973c173b137
author onnodg
date Tue, 14 Oct 2025 09:08:30 +0000
1 """Galaxy-compatible BLAST annotation processor.
2
3 This script processes a single annotated BLAST file along with a FASTA file
4 containing unannotated reads. It generates multiple types of outputs for
5 integration with Galaxy workflows:
6
7 - E-value distribution plots (PNG)
8 - Taxonomic composition reports (text)
9 - Circular taxonomy diagram data (JSON)
10 - Header annotations with merged and per-read information (Excel)
11 - Annotation statistics summary (text)
12
13 Main workflow:
14 1. Parse command-line arguments.
15 2. Load annotated BLAST results and unannotated FASTA headers.
16 3. Group BLAST hits per query (q_id), filter by thresholds.
17 4. Resolve taxonomic conflicts with uncertainty rules.
18 5. Generate requested outputs.
19
20 Notes:
21 - Headers in BLAST and FASTA should correspond.
22 - Uses matplotlib, pandas, and openpyxl for visualization and reporting.
23 """

import argparse
from collections import OrderedDict, defaultdict
import json
import os

import matplotlib.pyplot as plt
import numpy as np
import openpyxl  # noqa: F401  (engine used by pandas' ExcelWriter)
import pandas as pd

# Default taxonomic levels
TAXONOMIC_LEVELS = ["K", "P", "C", "O", "F", "G", "S"]

def parse_arguments(arg_list=None):
    """Parse command line arguments for cluster processing."""
    parser = argparse.ArgumentParser(
        description='Process BLAST annotation results for Galaxy'
    )
    # Required inputs
    parser.add_argument('--input-anno', required=True,
                        help='Annotated BLAST output file (tabular format)')
    parser.add_argument('--input-unanno', required=True,
                        help='Unannotated sequences file (FASTA format)')
    # Optional outputs
    parser.add_argument('--eval-plot',
                        help='Output path for E-value plot (PNG)')
    parser.add_argument('--taxa-output',
                        help='Output path for taxa report (tabular)')
    parser.add_argument('--circle-data',
                        help='Output path for circular taxonomy data (txt)')
    parser.add_argument('--header-anno',
                        help='Output path for header annotations (tabular/xlsx)')
    parser.add_argument('--anno-stats',
                        help='Output path for annotation statistics (txt)')
    # Parameters
    parser.add_argument('--uncertain-threshold', type=float, default=0.9,
                        help='Threshold for resolving taxonomic conflicts (default: 0.9)')
    parser.add_argument('--eval-threshold', type=float, default=1e-10,
                        help='E-value threshold for filtering results (default: 1e-10)')
    # Note: with action='store_true' and default=True this flag is effectively
    # always on; it is kept as a switch for Galaxy wrapper compatibility.
    parser.add_argument('--use-counts', action='store_true', default=True,
                        help='Use read counts in circular data')

    return parser.parse_args(arg_list)

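# Example (illustrative): invoking the parser programmatically, e.g. from a
# test. The file names below are hypothetical.
#
#   args = parse_arguments(['--input-anno', 'hits.tab',
#                           '--input-unanno', 'reads.fasta',
#                           '--eval-threshold', '1e-20'])
#   args.eval_threshold  # -> 1e-20
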
def list_to_string(x):
    """Convert a list, pandas Series, or numpy array to a comma-separated string."""
    if isinstance(x, (list, pd.Series, np.ndarray)):
        return ", ".join(map(str, x))
    return str(x)

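# Example:
#   list_to_string([97.5, 98.1])      # -> '97.5, 98.1'
#   list_to_string('already a str')   # -> 'already a str'
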
def make_eval_plot(e_val_sets, output_path):
    """Create a bar plot of E-values per read.

    :param e_val_sets: List of sets containing E-values per read.
    :type e_val_sets: list[set[str]]
    :param output_path: Path to save the plot (PNG).
    :type output_path: str
    :return: None
    :rtype: None
    """
    if not e_val_sets:
        print("No E-values to plot")
        return None

    # Convert string E-values to floats and sort
    processed_sets = sorted([sorted(float(e_val) for e_val in e_val_set)
                             for e_val_set in e_val_sets])

    bar_positions = []
    bar_heights = []

    for i, e_set in enumerate(processed_sets):
        if len(e_set) == 1:
            e_set.append(0)
        for j, e_val in enumerate(e_set):
            bar_positions.append(i + j * 0.29)
            # Plot reciprocal E-values so smaller (better) E-values give taller
            # bars; the 0 padding value keeps a zero-height placeholder bar.
            if e_val != 0:
                bar_heights.append(1 / e_val)
            else:
                bar_heights.append(e_val)

    plt.figure(figsize=(21, 9))
    plt.bar(bar_positions, bar_heights, width=0.29, color=['blue', 'red'])

    plt.yscale('log')
    plt.grid(axis='y', linestyle='--', color='gray', alpha=0.7)
    plt.ylabel("e-values")
    plt.xticks(ticks=range(len(processed_sets)),
               labels=[f'{i + 1}' for i in range(len(processed_sets))])

    plt.tight_layout()
    plt.savefig(output_path, format='png')
    plt.close()
    return None

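# Example (illustrative; 'evalue_plot.png' is a hypothetical output path).
# E-values arrive as strings, one set per read:
#
#   make_eval_plot([{'1e-30', '1e-12'}, {'1e-5'}], 'evalue_plot.png')
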
def calculate_annotation_stats(anno_count, unanno_file_path, unique_anno_count, total_unique_count):
    """Compute annotation statistics for a dataset.

    This function calculates summary statistics for annotated and unannotated
    sequences in a dataset. It counts the total number of sequences in the
    unannotated file (FASTA-style, based on lines starting with '>'), and
    computes the percentage of annotated sequences and unique annotated
    sequences relative to their totals.

    :param anno_count: Total number of annotated sequences.
    :type anno_count: int
    :param unanno_file_path: Path to a FASTA file containing unannotated sequences.
    :type unanno_file_path: str
    :param unique_anno_count: Number of unique annotated sequences.
    :type unique_anno_count: int
    :param total_unique_count: Total number of unique sequences in the dataset.
    :type total_unique_count: int
    :return: Dictionary containing:
        - 'percentage_annotated' (float): Percentage of annotated sequences.
        - 'annotated_sequences' (int): Number of annotated sequences.
        - 'total_sequences' (int): Total number of sequences (annotated + unannotated).
        - 'percentage_unique_annotated' (float): Percentage of unique annotated sequences.
        - 'unique_annotated' (int): Number of unique annotated sequences.
        - 'total_unique' (int): Total number of unique sequences.
    :rtype: dict[str, float | int]
    """
    # Count total sequences in unannotated file
    with open(unanno_file_path, 'r') as f:
        total_sequences = sum(1 for line in f if line.startswith('>'))

    percentage_annotated = (anno_count / total_sequences * 100) if total_sequences > 0 else 0
    percentage_unique_annotated = (unique_anno_count / total_unique_count * 100) if total_unique_count > 0 else 0

    return {
        'percentage_annotated': percentage_annotated,
        'annotated_sequences': anno_count,
        'total_sequences': total_sequences,
        'percentage_unique_annotated': percentage_unique_annotated,
        'unique_annotated': unique_anno_count,
        'total_unique': total_unique_count
    }

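# Example (illustrative; 'reads.fasta' is a hypothetical FASTA file with,
# say, 200 '>' headers):
#
#   stats = calculate_annotation_stats(150, 'reads.fasta', 120, 180)
#   stats['percentage_annotated']         # -> 75.0
#   stats['percentage_unique_annotated']  # -> 66.66...
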
def _choose_taxon(level_counts: dict, threshold: float):
    """
    Determine the most representative taxonomic level based on counts and a threshold.

    This function evaluates a dictionary of taxonomic level counts and returns the
    level name that holds a strict majority. If no level reaches the threshold, or if
    there is a tie for the highest count, the function returns ``None``.

    :param level_counts: A dictionary mapping taxonomic level names (e.g., "species",
        "genus") to their respective counts.
    :type level_counts: dict[str, int]
    :param threshold: The minimum fraction (between 0 and 1) of the total counts that
        the top taxonomic level must have to be considered dominant.
    :type threshold: float

    :return: The name of the taxonomic level that meets or exceeds the threshold and
        is not tied with another level. Returns ``None`` if no level qualifies or
        if there is a tie.
    :rtype: str | None

    :example:
        _choose_taxon({"species": 6, "genus": 3, "family": 1}, 0.5)
        'species'
        _choose_taxon({"species": 4, "genus": 4}, 0.6)
        None
        _choose_taxon({"species": 2, "genus": 3}, 0.6)
        'genus'
    """
    total = sum(level_counts.values())
    if total == 0:
        return None
    items = sorted(level_counts.items(), key=lambda x: x[1], reverse=True)
    top_name, top_count = items[0]
    # A tie for the top count -> no clear majority
    if sum(1 for _, c in items if c == top_count) > 1:
        return None
    if top_count / total >= threshold:
        return top_name
    return None


def _resolve_conflicts_recursive(conflicting: list, level_idx: int, threshold: float, max_levels: int, prefix=None):
    """
    Recursively resolve taxonomic classification conflicts across multiple levels.

    This function attempts to resolve conflicts between multiple taxonomic paths by
    recursively evaluating each taxonomic level. At each level, it determines whether
    there is a clear majority (based on a specified threshold) for a taxon name. If
    a majority is found, the function continues to the next level using only the
    taxa that match that name. If no majority exists, the function constructs
    "uncertain" outputs to indicate ambiguous classification.

    :param conflicting: List of tuples containing full taxonomic paths and their
        associated counts, e.g. ``[("Bacteria / Proteobacteria / Gammaproteobacteria", 10),
        ("Bacteria / Proteobacteria / Alphaproteobacteria", 5)]``.
    :type conflicting: list[tuple[str, int]]
    :param level_idx: The current taxonomic level index being evaluated (0-based).
    :type level_idx: int
    :param threshold: The minimum fraction (between 0 and 1) required for a strict
        majority decision at a given taxonomic level.
    :type threshold: float
    :param max_levels: The maximum number of taxonomic levels to evaluate before
        stopping recursion.
    :type max_levels: int
    :param prefix: A list of already-resolved taxon names up to the previous level.
        Defaults to ``None``, in which case an empty list is used.
    :type prefix: list[str] | None

    :return: A tuple containing:
        - **short_output** (str): A simplified taxonomic path with "Uncertain taxa"
          appended if no consensus was reached at the current level.
        - **full_output** (str): The full taxonomic path, filled with additional
          "Uncertain taxa" entries up to the minimal conflicting level depth.
    :rtype: tuple[str, str]

    :example:
        conflicts = [
            ("Bacteria / Proteobacteria / Gammaproteobacteria", 10),
            ("Bacteria / Proteobacteria / Alphaproteobacteria", 5),
        ]
        _resolve_conflicts_recursive(conflicts, level_idx=0, threshold=0.7, max_levels=5)
        ('Bacteria / Proteobacteria / Uncertain taxa',
         'Bacteria / Proteobacteria / Uncertain taxa')
    """
    if prefix is None:
        prefix = []

    if not conflicting:
        return "", ""

    # Split paths (kept in sync with their counts)
    conflicting_levels = [t.split(" / ") for t, _ in conflicting]
    min_conflicting_level = min(len(p) for p in conflicting_levels)

    # If only one full path remains -> return it (both short & full)
    if len(conflicting) == 1:
        return conflicting[0][0], conflicting[0][0]

    # If we've reached the deepest comparable level or max_levels -> pick the most supported full path
    if level_idx >= min_conflicting_level or level_idx >= max_levels:
        full_counts = defaultdict(int)
        for t, c in conflicting:
            full_counts[t] += c
        chosen_full = max(full_counts.items(), key=lambda x: x[1])[0]
        return chosen_full, chosen_full

    # Count names at this level
    level_counts = defaultdict(int)
    for taxon, count in conflicting:
        parts = taxon.split(" / ")
        if level_idx < len(parts):
            level_counts[parts[level_idx]] += count

    # If everyone agrees at this level, append that name and go deeper
    if len(level_counts) == 1:
        name = next(iter(level_counts))
        return _resolve_conflicts_recursive(conflicting, level_idx + 1, threshold, max_levels, prefix + [name])

    # Check for a strict majority at this level
    chosen_level_name = _choose_taxon(level_counts, threshold)
    if chosen_level_name is not None:
        # Keep only taxa that match the chosen name at this level and go deeper
        filtered = [(t, c) for (t, c) in conflicting
                    if level_idx < len(t.split(" / ")) and t.split(" / ")[level_idx] == chosen_level_name]
        return _resolve_conflicts_recursive(filtered, level_idx + 1, threshold, max_levels, prefix + [chosen_level_name])

    # No majority at this level -> construct uncertain outputs.
    # short: prefix + 'Uncertain taxa' (stop here)
    short_path = prefix + ['Uncertain taxa']
    short_output = " / ".join(short_path)

    # full: prefix + 'Uncertain taxa', padded with 'Uncertain taxa' up to min_conflicting_level
    full_path = short_path + ['Uncertain taxa'] * (min_conflicting_level - (level_idx + 1))
    full_output = " / ".join(full_path)

    return short_output, full_output


def resolve_taxon_majority(taxon_counts: dict, threshold: float = 0.90, max_levels: int = 7):
    """
    Resolve the majority consensus taxonomic classification from a set of annotated paths.

    :param taxon_counts: A dictionary mapping full taxonomic paths to their respective
        occurrence counts. For example:
        ``{"Bacteria / Proteobacteria / Gammaproteobacteria": 10,
        "Bacteria / Proteobacteria / Alphaproteobacteria": 5}``.
    :type taxon_counts: dict[str, int]
    :param threshold: Minimum fraction (between 0 and 1) required for a strict majority
        at each taxonomic level. Defaults to ``0.90``.
    :type threshold: float, optional
    :param max_levels: Maximum number of taxonomic levels to evaluate before stopping
        recursion. Defaults to ``7``.
    :type max_levels: int, optional

    :return: A tuple containing:
        - **short_output** (str): The consensus taxonomic path, possibly ending with
          "Uncertain taxa" if ambiguity remains.
        - **full_output** (str): The expanded taxonomic path including placeholder
          "Uncertain taxa" labels up to the minimal conflicting depth.
    :rtype: tuple[str, str]
    """
    conflicting = list(taxon_counts.items())  # list of (path, count) keeps taxa<->count synced
    return _resolve_conflicts_recursive(conflicting, level_idx=0, threshold=threshold, max_levels=max_levels)

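# Example:
#   resolve_taxon_majority({"A / B / C": 9, "A / B / D": 1}, threshold=0.9)
#   # -> ('A / B / C', 'A / B / C'); C holds a 90% majority at the third level.
#   resolve_taxon_majority({"A / B": 1, "A / C": 1})
#   # -> ('A / Uncertain taxa', 'A / Uncertain taxa'); tie at the second level.
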
def process_taxa_output(taxa_dicts, output_path, uncertain_threshold):
    """
    Generate a tabular taxa report from a list of read taxonomies.

    This function processes taxonomic assignments for each read in a dataset
    and produces a summary report similar in style to Kraken2 output. The
    report includes:

    - The number of reads that were marked as "Uncertain taxa" at each
      taxonomic rank.
    - For all other taxa, the number of reads assigned and their percentage
      relative to the sample total.
    - The taxonomic rank for each assignment, indicated in the report with
      indentation proportional to the rank depth.

    :param taxa_dicts: List of taxon count dictionaries, one per read, where
        keys are taxon names and values are counts of how often they were
        assigned to the read.
    :type taxa_dicts: list[dict[str, int]]
    :param output_path: Path to save the tabular taxa report.
    :type output_path: str
    :param uncertain_threshold: Threshold for resolving conflicting taxonomic
        assignments (values below the threshold are marked as "Uncertain taxa").
    :type uncertain_threshold: float
    :return: None. The function writes the taxa report to ``output_path``.
    :rtype: None
    """
    uncertain_dict = {level: 0 for level in TAXONOMIC_LEVELS}
    aggregated_counts = defaultdict(int)

    for read_taxa in taxa_dicts:
        resolved_taxon, _ = resolve_taxon_majority(read_taxa, uncertain_threshold)

        # Add counts for the resolved taxon at every prefix depth
        levels = resolved_taxon.split(" / ")
        for i in range(1, len(levels) + 1):
            aggregated_counts[" / ".join(levels[:i])] += 1

    # Total count = sum over top-level (depth 1) taxa
    total_count = sum(value for key, value in aggregated_counts.items()
                      if len(key.split(" / ")) == 1)

    report_lines = []

    for taxonomy, count in sorted(aggregated_counts.items(), key=lambda x: x[0]):
        levels = taxonomy.split(" / ")
        indent = " " * (len(levels) - 1) * 2
        taxon_name = levels[-1]
        taxon_level_code = TAXONOMIC_LEVELS[len(levels) - 1] if len(levels) <= len(TAXONOMIC_LEVELS) else "U"
        percentage = (count / total_count) * 100 if total_count > 0 else 0

        report_lines.append(
            f"{percentage:.2f}\t{count}\t{total_count}\t{taxon_level_code}\t{indent}{taxon_name}"
        )

        if taxon_name == 'Uncertain taxa' and taxon_level_code in uncertain_dict:
            uncertain_dict[taxon_level_code] += count

    # Write output
    with open(output_path, 'w') as f:
        f.write("Uncertain count per taxonomy level: " + str(uncertain_dict) + '\n')
        f.write('percentage_rooted\tnumber_rooted\ttotal_num\ttaxon_level\tidentification\n')
        f.write("\n".join(report_lines))

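# Example (illustrative; 'taxa_report.tsv' is a hypothetical output path).
# The second read ties between two species-level paths, so it is reported
# as 'A / B / Uncertain taxa':
#
#   process_taxa_output([{'A / B / C': 3}, {'A / B / C': 2, 'A / B / D': 2}],
#                       'taxa_report.tsv', uncertain_threshold=0.9)
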
def process_header_annotations(taxa_dicts, headers, e_values, output_path, uncertain_threshold,
                               identity_list, coverage_list, source_list, bitscore_list):
    """
    Generate an Excel report with per-read and aggregated taxonomic annotations.

    This function processes taxonomic assignments per read, along with associated
    metrics (e-value, identity, coverage, bitscore, source), and generates an Excel
    file with two sheets:

    1. **Individual_Reads** – One row per sequence, including all metrics and
       the resolved taxonomic path.
    2. **Merged_by_Taxa** – Aggregated results per unique taxonomic path, including
       merged metrics and counts.

    Internally, the function creates a temporary TSV file, processes it into
    a pandas DataFrame, expands the taxonomic path into separate columns
    (e.g., kingdom, phylum, class, …), and then writes the results into a
    multi-sheet Excel file. Column widths are automatically adjusted for readability.

    :param taxa_dicts: List of taxon count dictionaries, one per read, where keys
        are taxa and values are their counts in the read.
    :type taxa_dicts: list[dict[str, int]]
    :param headers: Sequence headers corresponding to each read. Used to extract
        read identifiers and counts.
    :type headers: list[str]
    :param e_values: List of e-values per read. Each element is a list of values
        associated with that read.
    :type e_values: list[list[float]]
    :param output_path: Path to the Excel file to be created.
    :type output_path: str
    :param uncertain_threshold: Threshold for resolving conflicting taxonomic
        assignments. Values below the threshold are replaced with "Uncertain taxa".
    :type uncertain_threshold: float
    :param identity_list: Percent identity values per read.
    :type identity_list: list[list[str]]
    :param coverage_list: Coverage percentage values per read.
    :type coverage_list: list[list[str]]
    :param source_list: Source identifiers per read (e.g., database names).
    :type source_list: list[list[str]]
    :param bitscore_list: Bitscore values per read.
    :type bitscore_list: list[list[str]]
    :return: None. The function writes results to an Excel file at ``output_path``.
    :rtype: None
    :raises PermissionError: If the temporary TSV or output Excel file cannot be written
        (e.g., the file is open in another program).
    :raises IndexError: If the input lists (headers, e_values, etc.) are not aligned
        with ``taxa_dicts``.
    :raises ValueError: If sequence headers are not formatted as expected for extracting counts.
    """
    report_lines = []
    for i, read_taxa in enumerate(taxa_dicts):
        _, resolved_taxon_long = resolve_taxon_majority(read_taxa, uncertain_threshold)
        try:
            e_val = e_values[i][0] if e_values[i] else "N/A"
            identity = identity_list[i][0] if identity_list[i] else "N/A"
            coverage = coverage_list[i][0] if coverage_list[i] else "N/A"
            source = source_list[i][0] if source_list[i] else "N/A"
            bitscore = bitscore_list[i][0] if bitscore_list[i] else "N/A"
        except (IndexError, TypeError) as e:
            print(f"Mismatch while extracting values: {e}. "
                  f"Check that your annotated and unannotated input files correspond correctly.")
            continue

        header = headers[i] if i < len(headers) else "Header missing"

        # Extract count from a trailing "(N)" suffix in the header
        try:
            header_base, count_str = header.rsplit("(", 1)
            count = int(count_str.rstrip(")"))
        except ValueError as e:
            print(f'Failed extracting count: {e}')
            header_base = header
            count = 1

        report_lines.append(
            f'{header_base}\t{e_val}\t{identity}\t{coverage}\t{bitscore}\t{count}\t{source}\t{resolved_taxon_long}')

    # Create temporary TSV for processing
    temp_tsv_path = 'temp.tsv'
    try:
        with open(temp_tsv_path, 'w') as f:
            f.write('header\te_value\tidentity percentage\tcoverage\tbitscore\tcount\tsource\ttaxa\n')
            f.write("\n".join(report_lines))
    except PermissionError as e:
        print(f"Unable to write to file, error: {e}; file might be open in another program")

    # Process the data
    df = pd.read_csv(temp_tsv_path, sep='\t', encoding="latin1")
    if not df.empty:
        # Split taxa into separate columns (capped at the seven canonical ranks)
        taxa_split = df["taxa"].str.split(" / ", expand=True)
        max_levels = min(taxa_split.shape[1], 7)
        taxa_split = taxa_split.iloc[:, :max_levels]
        level_names = ["kingdom", "phylum", "class", "order", "family", "genus", "species"][:max_levels]
        taxa_split.columns = level_names

        # Add taxa columns to main dataframe; sort only by rank columns that exist
        df_individual = pd.concat([df, taxa_split], axis=1)
        sort_columns = [level for level in ['species', 'genus', 'family'] if level in df_individual.columns]
        if sort_columns:
            df_individual = df_individual.sort_values(sort_columns, ascending=[True] * len(sort_columns))

        # Create merged dataframe - first add taxa columns, then aggregate
        df_for_merge = df_individual.copy()

        group_cols = ["taxa"]
        agg_dict = {
            "e_value": lambda x: f"{x.min()}–{x.max()}",
            "identity percentage": lambda x: list_to_string(list(x.unique())),
            "coverage": lambda x: list_to_string(list(x.unique())),
            "bitscore": lambda x: list_to_string(list(x.unique())),
            "count": "sum",
            "source": "first"
        }

        # Add aggregation for taxa levels that actually exist
        for level in level_names:
            if level in df_for_merge.columns:
                agg_dict[level] = "first"

        df_merged = df_for_merge.groupby(group_cols, as_index=False).agg(agg_dict)

        # Sort by species, genus, family where those columns exist
        sort_columns = [level for level in ['species', 'genus', 'family'] if level in df_merged.columns]
        if sort_columns:
            df_merged = df_merged.sort_values(sort_columns, ascending=[True] * len(sort_columns))

        # Write both datasets to a single Excel file with multiple sheets
        temp_path = output_path + ".xlsx"
        out_dir = os.path.dirname(temp_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with pd.ExcelWriter(temp_path, engine='openpyxl', mode='w') as writer:
            df_individual.to_excel(writer, sheet_name='Individual_Reads', index=False)
            df_merged.to_excel(writer, sheet_name='Merged_by_Taxa', index=False)

            # Auto-adjust column widths for both sheets
            for sheet_name in writer.sheets:
                worksheet = writer.sheets[sheet_name]
                for column in worksheet.columns:
                    max_length = 0
                    column_letter = column[0].column_letter
                    for cell in column:
                        try:
                            max_length = max(max_length, len(str(cell.value)))
                        except Exception:
                            pass
                    adjusted_width = min(max_length + 2, 50)  # Cap at 50 characters
                    worksheet.column_dimensions[column_letter].width = adjusted_width
        os.replace(temp_path, output_path)
    else:
        print("Dataframe empty, no annotation results")
    # Clean up temporary file
    os.remove(temp_tsv_path)

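# Example (illustrative; all per-read lists must be index-aligned with
# taxa_dicts, and 'annotations.xlsx' is a hypothetical output path):
#
#   process_header_annotations([{'A / B': 2}], ['read_1(2)'], [[1e-30]],
#                              'annotations.xlsx', 0.9,
#                              [['99.1']], [['100']], [['db1']], [['480']])
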
def create_circle_diagram(taxa_dicts, output_path, use_counts, uncertain_threshold):
    """
    Generate hierarchical JSON data for a circular taxonomy diagram.

    This function aggregates taxonomic classification results from multiple reads
    and prepares hierarchical count data suitable for circular visualization.

    :param taxa_dicts: A list of dictionaries, where each dictionary represents a
        mapping from full taxonomic paths to their corresponding counts for a single
        read. Each key should be a taxonomic path with levels separated by ``" / "``.
    :type taxa_dicts: list[dict[str, int]]
    :param output_path: File path to write the generated JSON output containing
        circle diagram data.
    :type output_path: str
    :param use_counts: Whether to aggregate counts by total read frequency
        (``True``) or to count only unique taxa once (``False``).
    :type use_counts: bool
    :param uncertain_threshold: Fraction (between 0 and 1) representing the minimum
        majority threshold used for resolving conflicting taxonomic assignments.
    :type uncertain_threshold: float

    :return: None. The resulting JSON file will be written to ``output_path``. The
        file contains a list of dictionaries, one per taxonomic level, with:
        - **labels** (*list[str]*): Names of taxa at that level.
        - **sizes** (*list[int]*): Corresponding aggregated counts.
    :rtype: None

    :note:
        - Ambiguous classifications are labeled as "Uncertain taxa".
        - The number of hierarchical levels is determined by the length of
          ``TAXONOMIC_LEVELS``.
    """
    aggregated_counts = defaultdict(int)
    seen_taxa = set()

    for read_taxa in taxa_dicts:
        _, resolved_taxon_long = resolve_taxon_majority(read_taxa, uncertain_threshold)

        levels = resolved_taxon_long.split(" / ")

        if use_counts:
            for i in range(1, len(levels) + 1):
                aggregated_counts[" / ".join(levels[:i])] += 1
        else:
            if resolved_taxon_long not in seen_taxa:
                for i in range(1, len(levels) + 1):
                    aggregated_counts[" / ".join(levels[:i])] += 1
                seen_taxa.add(resolved_taxon_long)

    # Prepare circle data: one {labels, sizes} entry per taxonomic level
    circle_data = []
    for level in range(len(TAXONOMIC_LEVELS)):
        circle_data.append({"labels": [], "sizes": []})

    for taxonomy, count in sorted(aggregated_counts.items(), key=lambda x: x[0]):
        levels = taxonomy.split(" / ")
        if len(levels) <= len(TAXONOMIC_LEVELS):
            circle_data[len(levels) - 1]["labels"].append(levels[-1].strip())
            circle_data[len(levels) - 1]["sizes"].append(count)

    with open(output_path, "w") as f:
        json.dump(circle_data, f, indent=2)

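# Example (illustrative; 'circle.json' is a hypothetical output path):
#
#   create_circle_diagram([{'A / B': 1}, {'A / C': 1}], 'circle.json',
#                         use_counts=True, uncertain_threshold=0.9)
#
# circle.json then holds one {"labels": [...], "sizes": [...]} entry per rank.
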
def process_single_file(anno_file_path, unanno_file_path, args):
    """
    Process a single annotated BLAST file and generate all requested analytical outputs.

    This function integrates multiple steps for analyzing annotated BLAST output
    files and corresponding unannotated FASTA sequences. It organizes BLAST hits
    per query, extracts the best-scoring matches, and produces various types of
    output files (plots, tables, diagrams, and statistics) according to user
    arguments.

    :param anno_file_path: Path to the annotated BLAST results file.
    :type anno_file_path: str
    :param unanno_file_path: Path to a FASTA file containing unannotated sequences.
        Used to determine sequence order and total unique sequence counts.
    :type unanno_file_path: str
    :param args: Parsed command-line arguments containing the following attributes:
        - **eval_threshold** (*float*): Maximum allowed E-value for valid hits.
        - **eval_plot** (*str | None*): Output path for E-value distribution plot.
        - **taxa_output** (*str | None*): Output path for taxonomic summary table.
        - **header_anno** (*str | None*): Output path for annotated header table.
        - **circle_data** (*str | None*): Output path for circular diagram JSON data.
        - **anno_stats** (*str | None*): Output path for annotation statistics file.
        - **use_counts** (*bool*): Whether to use read counts or unique taxa only.
        - **uncertain_threshold** (*float*): Majority threshold for resolving taxonomic ambiguity.
    :type args: argparse.Namespace

    :return: None. This function writes multiple output files to the paths
        specified in ``args``.
    :rtype: None

    :raises FileNotFoundError: If the annotated BLAST or unannotated FASTA file cannot be found.
    :raises ValueError: If file content is malformed or missing required columns.

    :notes:
        - The function first groups BLAST hits by query ID and filters out invalid or
          low-confidence taxa.
        - For each query, the best BLAST hit is determined based on bitscore and E-value.
        - The total unique sequence count is read from ``count=`` fields in the FASTA
          headers; the annotated unique count comes from the trailing ``(N)`` suffix
          of each annotated header.
        - Progress and warnings are printed to standard output for traceability.
    """
    # Core data structures - one per read
    read_data = []  # List of dicts, one per read
    headers = []
    seen_reads = set()
    annotated_unique_count = 0
    total_unique_count = 0

    # Check if file exists and has content
    if not os.path.exists(anno_file_path):
        print(f"Error: Input file {anno_file_path} not found")
        return

    unanno_headers_ordered = []
    if os.path.exists(unanno_file_path):
        with open(unanno_file_path, 'r') as f:
            for line in f:
                if line.startswith('>'):
                    header = line.split()[0].strip('>')
                    unanno_headers_ordered.append(header)
        print(f"Found {len(unanno_headers_ordered)} headers in unannotated file")
    else:
        print(f"Warning: Unannotated file {unanno_file_path} not found")

    # Set for fast membership checks against FASTA headers
    unanno_set = set(unanno_headers_ordered)

    # Count total unique sequences from unannotated file
    if os.path.exists(unanno_file_path):
        with open(unanno_file_path, 'r') as f:
            for line in f:
                if "count=" in line:
                    count = int(line.split("count=")[1].split(";")[0])
                    total_unique_count += count

    # Process annotated file
    blast_groups = OrderedDict()  # q_id -> list of hit dicts

    # Group BLAST hits per q_id
    try:
        with open(anno_file_path, 'r') as f:
            for line in f:
                if line.startswith("#"):
                    continue
                parts = line.strip().split('\t')
                if len(parts) < 7:
                    continue

                q_id = parts[0]
                try:
                    e_val = float(parts[6])
                except Exception:
                    continue
                if e_val > args.eval_threshold:
                    continue
                valid_taxa = True
                taxon = parts[-1].strip()
                if is_invalid_taxon(taxon):
                    print(f"Skipping {q_id}: invalid taxon: {taxon}")
                    valid_taxa = False

                identity = parts[4] if len(parts) > 4 else ''
                cov = parts[5] if len(parts) > 5 else ''
                try:
                    bitscore = float(parts[7]) if len(parts) > 7 else 0.0
                except ValueError:
                    bitscore = 0.0
                source = parts[8] if len(parts) > 8 else ''
                if valid_taxa:
                    hit = {
                        'e_val': e_val,
                        'identity': identity,
                        'cov': cov,
                        'bitscore': bitscore,
                        'source': source,
                        'taxon': taxon
                    }
                    blast_groups.setdefault(q_id, []).append(hit)
    except Exception as e:
        print(f"Error reading BLAST file {anno_file_path}: {e}")
        return

    # BLAST reads not in FASTA
    extra_blast_qids = [q for q in blast_groups.keys() if q not in unanno_set]
    if extra_blast_qids:
        print(f"Note: {len(extra_blast_qids)} BLAST q_ids not in FASTA: {extra_blast_qids[:10]}...")

    # Process reads in FASTA order
    for header in unanno_headers_ordered:
        if header not in blast_groups:
            print(f"Skipping {header}: no BLAST hits")
            continue
        current_read_info = {
            'taxa_dict': {},
            'header': header,
            'e_values': set(),
            'identities': set(),
            'coverages': set(),
            'sources': set(),
            'bitscores': set(),
            'best_eval': 0.0,
            'best_identity': '',
            'best_coverage': '',
            'best_source': '',
            'best_bitscore': -1,
            'best_taxa_dict': {}
        }

        # Process all hits for this header
        for hit in blast_groups[header]:
            e_val = hit['e_val']
            identity = hit['identity']
            cov = hit['cov']
            bitscore = hit['bitscore']
            source = hit['source']
            taxon = hit['taxon']

            current_read_info['e_values'].add(str(e_val))
            current_read_info['identities'].add(identity)
            current_read_info['coverages'].add(cov)
            current_read_info['sources'].add(source)
            current_read_info['bitscores'].add(bitscore)

            # Keep track of best hit
            if (bitscore > current_read_info['best_bitscore'] or
                    (bitscore == current_read_info['best_bitscore'] and e_val < current_read_info['best_eval'])):
                # New best hit -> reset best values
                current_read_info['best_bitscore'] = bitscore
                current_read_info['best_eval'] = e_val
                current_read_info['best_identity'] = identity
                current_read_info['best_coverage'] = cov
                current_read_info['best_source'] = source
                current_read_info['best_taxa_dict'] = {taxon: 1}

            # If hit is of same quality: add to existing dictionary
            elif bitscore == current_read_info['best_bitscore'] and e_val == current_read_info['best_eval']:
                td = current_read_info['best_taxa_dict']
                td[taxon] = td.get(taxon, 0) + 1
            # Possibly redundant: best_taxa_dict appears identical to taxa_dict
            # in the output, but it is retained for safety.
            if len(current_read_info['e_values']) == 1:
                taxa_dict = current_read_info['taxa_dict']
                taxa_dict[taxon] = taxa_dict.get(taxon, 0) + 1
        read_data.append(current_read_info)

        if header not in seen_reads:
            seen_reads.add(header)
            headers.append(header)
            try:
                count = int(header.split('(')[1].split(')')[0])
            except (IndexError, ValueError, AttributeError):
                count = 0
            annotated_unique_count += count

    # Extract data for different functions from the unified structure
    taxa_dicts = [read['taxa_dict'] for read in read_data]
    td = [read['best_taxa_dict'] for read in read_data]

    # Generate outputs based on arguments
    if args.eval_plot:
        e_val_sets = [read['e_values'] for read in read_data]
        make_eval_plot(e_val_sets, args.eval_plot)

    if args.taxa_output:
        process_taxa_output(td, args.taxa_output, args.uncertain_threshold)

    if args.header_anno:
        # Extract best values - one single-element list per read
        e_values_for_headers = [[read['best_eval']] for read in read_data]
        identity_for_headers = [[read['best_identity']] for read in read_data]
        coverage_for_headers = [[read['best_coverage']] for read in read_data]
        source_for_headers = [[read['best_source']] for read in read_data]
        bitscore_for_headers = [[read['best_bitscore']] for read in read_data]
        process_header_annotations(td, headers, e_values_for_headers,
                                   args.header_anno, args.uncertain_threshold,
                                   identity_for_headers, coverage_for_headers,
                                   source_for_headers, bitscore_for_headers)

    if args.circle_data:
        create_circle_diagram(td, args.circle_data, args.use_counts,
                              args.uncertain_threshold)

    if args.anno_stats:
        stats = calculate_annotation_stats(len(taxa_dicts), unanno_file_path,
                                           annotated_unique_count, total_unique_count)
        with open(args.anno_stats, 'w') as f:
            f.write('metric\tvalue\n')
            for key, value in stats.items():
                f.write(f'{key}\t{value}\n')

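# Example (illustrative; the input files are hypothetical and must exist):
#
#   args = parse_arguments(['--input-anno', 'hits.tab',
#                           '--input-unanno', 'reads.fasta',
#                           '--taxa-output', 'taxa_report.tsv'])
#   process_single_file(args.input_anno, args.input_unanno, args)
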
def is_invalid_taxon(taxon: str) -> bool:
    """
    Determine whether a given taxonomic path should be considered invalid and excluded from analysis.

    This function identifies and filters out taxonomic strings that contain unreliable,
    incomplete, or placeholder taxonomic information. It is used to clean BLAST or
    classification outputs before aggregation and visualization.

    :param taxon: Full taxonomic path as a string.
    :type taxon: str
    :return: ``True`` if the taxon is considered invalid according to the criteria below;
        otherwise ``False``.
    :rtype: bool

    :criteria:
        A taxon is considered invalid if:
        - It contains the substring ``"invalid taxid"`` or ``"GenBank invalid taxon"``.
        - It includes the term ``"environmental sample"`` (non-specific environmental sequence).
        - It has any hierarchical level labeled as ``"unknown"`` followed by a more
          specific level (e.g., *unknown genus / Homo sapiens*).

    :note:
        This function performs a **case-insensitive** check and assumes the
        input taxonomic path uses ``" / "`` as a level delimiter.
    """
    taxon_lower = taxon.lower()

    if "invalid taxid" in taxon_lower or "genbank invalid taxon" in taxon_lower:
        return True

    if "environmental sample" in taxon_lower:
        return True

    # Check for "unknown" followed by something deeper
    parts = taxon.split(" / ")
    for i, part in enumerate(parts):
        if "unknown" in part.lower() and i < len(parts) - 1:
            # There is something more specific after an unknown level
            return True
    return False

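# Example:
#   is_invalid_taxon("Bacteria / unknown genus / Homo sapiens")  # -> True
#   is_invalid_taxon("Bacteria / Proteobacteria")                # -> False
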

def main(arg_list=None):
    """
    Entry point for Galaxy-compatible BLAST annotation processing.

    :param arg_list: Optional list of command-line arguments to override
        ``sys.argv``. Primarily used for testing or programmatic execution.
        If ``None``, arguments are read directly from the command line.
    :type arg_list: list[str] | None
    :return: None
    :rtype: None

    Notes:
        Calls `process_single_file` with parsed arguments.
    """
    args = parse_arguments(arg_list)
    for output_file in [args.eval_plot, args.header_anno, args.taxa_output, args.circle_data, args.anno_stats]:
        if output_file:
            # Guard against paths without a directory component
            out_dir = os.path.dirname(output_file)
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)
    process_single_file(args.input_anno, args.input_unanno, args)
    print("Processing completed successfully")


if __name__ == "__main__":
    main()
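
# Example command line (illustrative; file names are hypothetical):
#   python blast_annotations_processor.py \
#       --input-anno hits.tab --input-unanno reads.fasta \
#       --eval-plot evals.png --taxa-output taxa_report.tsv \
#       --anno-stats stats.txt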