Mercurial > repos > onnodg > blast_annotations_processor
view tests/test_blast_annotations_processor.py @ 2:9ca209477dfd draft default tip
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
| author | onnodg |
|---|---|
| date | Mon, 15 Dec 2025 16:43:36 +0000 |
| parents | a3989edf0a4a |
| children |
line wrap: on
line source
""" Test suite for BLAST annotation processor. """ import re import ast import pytest import os import sys import json import pandas as pd from pathlib import Path # Add the module to path for importing sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import ( process_single_file, resolve_tax_majority, TAXONOMIC_LEVELS, check_header_string ) class TestBlastAnnotationProcessor: """Test class for BLAST annotation processor""" @pytest.fixture(scope="class") def test_data_dir(self): """Setup test data directory structure""" base_dir = Path("test-data") base_dir.mkdir(exist_ok=True) for subdir in ["input", "expected", "output"]: (base_dir / subdir).mkdir(exist_ok=True) return base_dir @pytest.fixture(scope="class") def sample_files(self, test_data_dir): """Create sample input files for testing""" input_dir = test_data_dir / "input" blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy read1(100) subject2 id2 subject2 90.0 95 1e-45 180 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis read1(100) subject1 id1 subject1 95.889 100 1e-50 200 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis read2(50) subject3 id3 subject3 85.0 90 1e-40 160 database2 Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli read3(25) subject4 id4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii read4(25) subject4 id4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla read4(25) subject4 id4.1 subject4 80.0 85 1e-40 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi read4(25) subject4 id4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi read4(25) subject4 id4.2 subject4 90.0 87 1e-50 160 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii """ fasta_content = """>read1(100) count=100; ATCGATCGATCGATCG >read2(50) count=50; GCTAGCTAGCTAGCTA >read3(25) count=25; TGACTGACTGACTGAC >read4(25) count=25; TGAAAAAAACACCAC """ blast_file = input_dir / "test_blast.tabular" fasta_file = input_dir / "test_sequences.fasta" with open(blast_file, 'w') as f: f.write(blast_content) with open(fasta_file, 'w') as f: f.write(fasta_content) return { 'blast': str(blast_file), 'fasta': str(fasta_file) } @pytest.fixture(scope="class") def processed_output(self, test_data_dir, sample_files): """Run the processor on sample files and return output paths""" output_dir = test_data_dir / "output" class Args: def __init__(self): self.input_anno = sample_files['blast'] self.input_unanno = sample_files['fasta'] self.eval_plot = str(output_dir / "eval_plot.png") self.taxa_output = str(output_dir / "taxa_output.txt") self.circle_data = str(output_dir / "circle_data.json") self.header_anno = str(output_dir / "header_anno.xlsx") self.anno_stats = str(output_dir / "anno_stats.txt") self.filtered_fasta = str(output_dir / "filtered_fasta.fasta") self.log = str(output_dir / "log.txt") self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 self.min_bitscore = 60 self.min_support = 1 self.ignore_rank = 'unknown' self.ignore_taxonomy = 'environmental' self.bitscore_perc_cutoff = 10 self.ignore_obiclean_type ='singleton' self.ignore_illuminapairend_type = 'pairend' self.min_identity = 70 self.min_coverage = 70 self.ignore_seqids = '' self.use_counts = True args = Args() log_messages = [] process_single_file(args.input_anno, args.input_unanno, args, log_messages) return args def test_read_count_consistency(self, processed_output): """ Test 1: Read Count Consistency Verifies that read counts from FASTA headers are correctly preserved and aggregated in all output files. """ df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25} skipped_reads = [] for read_name, expected_count in expected_counts.items(): subset = df.loc[df['header'] == read_name] if subset.empty: skipped_reads.append(read_name) # remember we skip this read continue row = subset.iloc[0] assert row['count'] == expected_count, f"Count mismatch for {read_name}" with open(processed_output.anno_stats, 'r') as f: stats_content = f.read() # Total unique count should be 175 (100+50+25) assert 'total_unique: 200' in stats_content, "Total unique count incorrect in stats" if skipped_reads: assert all(read not in df['header'].values for read in skipped_reads) print("✓ Test 1 PASSED: Read counts consistent across all outputs") def test_lowest_common_ancester(self, processed_output): """ Test 2: Big Input Files Tests the functioning of lowest common ancestor selection with realistic inputfile sizes """ test_conflicts = { 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 450, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12, 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6 } resolved_short1, resolved_long1 = resolve_tax_majority(test_conflicts, 0.9) assert 'Ciceronia a' in resolved_short1, "Conflict not resolved to uncertain taxa" test_90_precent_conflicts = { 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10, 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6 } resolved_short, resolved_long = resolve_tax_majority(test_90_precent_conflicts, 0.9) assert 'Viridiplantae / Streptophyta / Uncertain taxa' in resolved_long, "Conflict not resolved to uncertain taxa" print("✓ Test 2 PASSED: Lowest common ancestor works correctly") def test_taxonomic_conflict_resolution(self, processed_output): """ Test 3: Taxonomic Conflict Resolution Tests the uncertainty threshold mechanism for resolving taxonomic conflicts. Uses a controlled scenario where multiple hits have different taxa. """ test_conflicts = { 'Bacteria / Firmicutes / Bacilli': 2, 'Bacteria / Proteobacteria / Gammaproteobacteria': 1 } resolved_short, resolved_long = resolve_tax_majority(test_conflicts, 0.9) # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain) assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa" test_high_confidence = { 'Bacteria / Firmicutes / Bacilli': 9, 'Bacteria / Proteobacteria / Gammaproteobacteria': 1 } resolved_short, resolved_long = resolve_tax_majority(test_high_confidence, 0.9) assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly" print("✓ Test 3 PASSED: Taxonomic conflict resolution working correctly") def test_output_file_structures(self, processed_output): """ Test 4: Output File Structure Validation Verifies that all output files are created with correct structure and format. """ excel_file = processed_output.header_anno assert os.path.exists(excel_file), "Excel output file not created" xl_file = pd.ExcelFile(excel_file) expected_sheets = ['Individual_Reads', 'Merged_by_Taxa'] assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets" df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads') expected_cols = ['header', 'seq_id', 'source', 'count', 'taxa', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species'] assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads" with open(processed_output.taxa_output, 'r') as f: taxa_lines = f.readlines() assert len(taxa_lines) == 2, "Taxa output too short" assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect" with open(processed_output.anno_stats, 'r') as f: anno_stats = f.readlines() assert 'FASTA: headers kept after filters and min_support=1: 4\n' in anno_stats, "Taxa output header incorrect" filter_f = 4 with open(processed_output.circle_data, 'r') as f: circle_data = json.load(f) assert isinstance(circle_data, list), "Circle data should be a list" assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level" with open(processed_output.filtered_fasta, 'r') as f: filtered_fasta = f.readlines() assert len(filtered_fasta) == filter_f * 2 print("✓ Test 4 PASSED: All output files have correct structure") def test_header_synchronization(self, test_data_dir): """ Test 5: Header Synchronization Between Files Tests that the processor correctly handles mismatched headers between annotated and unannotated files. """ input_dir = test_data_dir / "input" output_dir = test_data_dir / "output" # Create mismatched files blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy read1(100) source=NCBI sequenceID=KR738003 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-30 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum read1(100) source=NCBI sequenceID=KR738670 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-14 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum read2.1(50) 1 2 3 4 5 6 7 8 9 read3(25) source=NCBI sequenceID=KR737595 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 97.561 87 1.68e-14 71.3 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum """ fasta_content = """>read1(100) count=100; ATCG >read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg >read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg """ blast_file = input_dir / "test_sync.tabular" fasta_file = input_dir / "test_sync.fasta" with open(blast_file, 'w') as f: f.write(blast_content) with open(fasta_file, 'w') as f: f.write(fasta_content) class Args: def __init__(self): self.input_anno = blast_file self.input_unanno = fasta_file self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx" self.eval_plot = None self.taxa_output = None self.circle_data = None self.filtered_fasta = str(output_dir / "filtered_fasta.fasta") self.anno_stats = str(output_dir / "sync_stats.txt") self.log = str(output_dir / "log.txt") self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 self.use_counts = True self.min_bitscore = 50 self.min_support = 1 self.ignore_rank = 'unknown' self.ignore_taxonomy = 'environmental' self.bitscore_perc_cutoff = 10 self.ignore_obiclean_type = 'singleton' self.ignore_illuminapairend_type = 'pairend' self.min_identity = 30 self.min_coverage = 30 self.ignore_seqids = '' args = Args() process_single_file(args.input_anno, args.input_unanno, args, log_messages=[]) df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads') extracted = df['header'].str.extract(r'(read\d+)') headers = extracted[0].tolist() # Should have read1 and read3, read2 should be skipped assert 'read1' in headers, "read1 should be present" assert 'read2' not in headers, "read2 should not be present" assert 'read2.1' not in headers, "read2 should not be present" assert 'read3' in headers, "read3 should be present" print("✓ Test 5 PASSED: Header synchronization handled correctly") def test_check_header_string_all_behaviors(self): from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import \ check_header_string # clean header — allowed assert check_header_string(">readA count=10;", "", "") is True # blocks singleton assert check_header_string(">r obiclean_status={'XXX': 's'}", "singleton", "") is False # blocks variant assert check_header_string(">r obiclean_status={'XXX': 'i'}", "variant", "") is False # blocks head assert check_header_string(">r obiclean_status={'XXX': 'h'}", "head", "") is False # blocks pairend assert check_header_string(">r PairEnd", "pairend", "") is False # blocks consensus assert check_header_string(">r CONS", "consensus", "") is False # blocks custom string assert check_header_string(">r FooBar", "FooBar", "") is False # blocks when string is in second param assert check_header_string(">r blah", "", "blah") is False # blocks when multiple ignore values contain it assert check_header_string(">r PairEnd obiclean_status={'XXX': 's'}", "pairend,singleton", "") is False # allows when no match assert check_header_string(">r something", "pairend", "") is True def test_excel_merged_vs_individual(self, processed_output): """ Test 6: Excel Merged vs Individual Sheet Consistency Verifies that the merged sheet correctly aggregates data from the individual sheet. """ df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa') individual_taxa = df_individual['taxa'].nunique() assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count" # Check that counts are properly aggregated # For taxa with multiple reads, counts should be summed for _, merged_row in df_merged.iterrows(): taxa = merged_row['taxa'] individual_rows = df_individual[df_individual['taxa'] == taxa] expected_count = individual_rows['count'].sum() actual_count = merged_row['count'] assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}" print("✓ Test 6 PASSED: Excel merged sheet correctly aggregates individual data") def test_annotation_statistics_accuracy(self, processed_output, sample_files): """ Test 7: Annotation Statistics Accuracy Verifies that calculated annotation statistics match the actual data. Adapted for the new plain-text log file instead of tab-separated output. """ stats = {} with open(processed_output.anno_stats, 'r') as f: lines = f.readlines() for line in lines: line = line.strip() if not line or ":" not in line: continue key, value = line.split(":", 1) key = key.strip() value = value.strip() try: stats[key] = float(value) except ValueError: stats[key] = value assert stats["total_sequences"] == 4.0, "Total sequences count incorrect" assert stats["annotated_sequences"] == 3.0, "Annotated sequence count incorrect" assert stats["total_unique"] == 200.0, "Total unique count incorrect" assert stats["unique_annotated"] == 150.0, "Unique annotated count incorrect" assert stats["percentage_annotated"] == 75.0, "Percentage annotated incorrect" assert stats["percentage_unique_annotated"] == 75.0, "Percentage unique annotated incorrect" print("✓ Test 7 PASSED: Annotation statistics are accurate") def test_combined_all_filters(self, test_data_dir): """ Single integrated test that validates all FASTA + BLAST filter rules. Every read is designed to fail exactly one filter, except readOK. """ input_dir = test_data_dir / "input" output_dir = test_data_dir / "output" fasta = input_dir / "combined_filters.fasta" blast = input_dir / "combined_filters.tabular" fasta.write_text( ">lowSupport(1) count=1;\nACGT\n" ">obicleanFail(10) count=10; obiclean_status={'XXX': 's'};\nACGT\n" ">pairendFail_PairEnd(10) count=10;\nACGT\n" ">identityFail(10) count=10;\nACGT\n" ">coverageFail(10) count=10;\nACGT\n" ">bitscoreFail(10) count=10;\nACGT\n" ">bscutoffHigh(10) count=10;\nACGT\n" ">envTaxFail(10) count=10;\nACGT\n" ">rankFail(10) count=10;\nACGT\n" ">seqidFail(10) count=10;\nACGT\n" ">readOK(10) count=10;\nACGT\n" ">readOK_multiple_id(10) count=10;\nACGT\n" ) blast.write_text( # min_support (count=1 < 5) "lowSupport(1)\ts\tid1\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # ignore_obiclean_type = singleton "obicleanFail(10)\ts\tid2\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # ignore_illuminapairedend_type = pairend "pairendFail_PairEnd(10)\ts\tid3\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # min_identity = 90 → identity = 50 fails "identityFail(10)\ts\tid4\t123\t50\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # min_coverage = 50 → coverage = 20 fails "coverageFail(10)\ts\tid5\t123\t99\t20\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # min_bitscore = 60 → bitscore = 10 fails "bitscoreFail(10)\ts\tid6\t123\t99\t99\t1e-50\t10\tsrc\tA / B / C / D / E / F / G\n" # bitscore_perc_cutoff: best = 200 → cutoff = 180 → bitscore 150 fails "bscutoffHigh(10)\ts\tid7.1\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G.1\n" "bscutoffHigh(10)\ts\tid7.2\t123\t99\t99\t1e-50\t150\tsrc\tA / B / C / D / E / F / G.2\n" # ignore_taxonomy = 'environmental' "envTaxFail(10)\ts\tid8\t123\t99\t99\t1e-50\t200\tsrc\tEnvironmental / B / C / D / E / F / G\n" # ignore_rank = 'unknown' "rankFail(10)\ts\tid9\t123\t99\t99\t1e-50\t200\tsrc\tUnknown / B / C / D / E / F / G\n" # ignore_seqids = BADSEQ "seqidFail(10)\ts\tBADSEQ\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # readOK (valid, full taxonomy) "readOK(10)\ts\tidGood\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" # readOK_multiple_id (valid, full taxonomy, multiple id's) "readOK_multiple_id(10)\ts\tidGood.1\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" "readOK_multiple_id(10)\ts\tidGood.2\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n" ) class Args: def __init__(self): self.input_anno = str(blast) self.input_unanno = str(fasta) self.header_anno = str(output_dir / "combined.xlsx") self.filtered_fasta = str(output_dir / "combined.fasta") self.anno_stats = str(output_dir / "combined_stats.txt") self.eval_plot = None self.taxa_output = None self.circle_data = None self.log = str(output_dir / "log.txt") self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 self.use_counts = True self.min_bitscore = 60 self.min_support = 5 self.ignore_rank = 'unknown' self.ignore_taxonomy = 'environmental' self.bitscore_perc_cutoff = 10 self.ignore_obiclean_type = 'singleton' self.ignore_illuminapairend_type = 'pairend' self.min_identity = 90 self.min_coverage = 50 self.ignore_seqids = 'BADSEQ' args = Args() process_single_file(args.input_anno, args.input_unanno, args, log_messages=[]) with open(args.filtered_fasta) as f: headers = [l.strip() for l in f if l.startswith(">")] assert '>obicleanFail(10) count=10;' not in headers assert '>pairendFail_PairEnd(10) count=10;' not in headers assert len(headers) == 9, "FASTA filtering only applies to header-based rules" df = pd.read_excel(args.header_anno, sheet_name="Individual_Reads") seq_ids = { sid for val in df["seq_id"] for sid in (ast.literal_eval(val) if isinstance(val, str) else val) } expected = {'idGood.1', 'idGood', 'id7.1', 'idGood.2'} assert seq_ids == expected, f"Expected surviving seq_ids {expected}, got {seq_ids}" def test_log_filters_count(self, processed_output): """ Verify that the BLAST filter counters in the log file match expected structure. """ with open(processed_output.anno_stats) as f: log = f.read() assert "=== PARAMETERS USED ===" in log assert "input_anno:" in log assert "input_unanno:" in log assert "FASTA: total headers: 4" in log assert "FASTA: headers kept after filters" in log assert "BLAST: total hits read: 8" in log assert "BLAST: hits kept after quality filters: 7" in log assert "ANNOTATION: total FASTA headers considered: 4" in log assert "ANNOTATION: reads with BLAST hits: 3" in log assert "ANNOTATION: reads without BLAST hits: 1" in log assert "E-value plot written to:" in log assert "Taxa summary written to:" in log assert "Header annotations written to:" in log assert "Circle diagram JSON written to:" in log assert "=== ANNOTATION STATISTICS ===" in log assert "percentage_annotated: 75.0" in log assert "unique_annotated: 150" in log assert "total_unique: 200" in log def test_missing_blast_file_graceful(self, test_data_dir): """ Crash / robustness test. When the BLAST file does NOT exist, the processor should: - not crash - write an anno_stats log mentioning the error - return without creating header_anno """ input_dir = test_data_dir / "input" output_dir = test_data_dir / "output" fasta = input_dir / "missing_blast_test.fasta" fasta.write_text(">read1(10) count=10;\nACGT\n") missing_blast = input_dir / "this_file_does_not_exist.tabular" class Args: def __init__(self): self.input_anno = str(missing_blast) self.input_unanno = str(fasta) self.header_anno = str(output_dir / "missing_blast_header.xlsx") self.filtered_fasta = str(output_dir / "missing_blast_filtered.fasta") self.anno_stats = str(output_dir / "missing_blast_stats.txt") self.eval_plot = None self.taxa_output = None self.circle_data = None self.log = str(output_dir / "log.txt") self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 self.use_counts = True self.min_bitscore = 0 self.min_support = 1 self.ignore_rank = 'unknown' self.ignore_taxonomy = 'environmental' self.bitscore_perc_cutoff = 10 self.ignore_obiclean_type = 'singleton' self.ignore_illuminapairend_type = 'pairend' self.min_identity = 0 self.min_coverage = 0 self.ignore_seqids = '' args = Args() process_single_file(args.input_anno, args.input_unanno, args, log_messages=[]) assert not os.path.exists(args.header_anno), "Header file should not be created when BLAST is missing" assert os.path.exists(args.anno_stats), "anno_stats log should be created on error" from pathlib import Path log_text = Path(args.anno_stats).read_text() assert "Error: Input file" in log_text, "Missing BLAST file error not logged" assert "Starting processing for FASTA" in log_text, "FASTA processing log missing" if __name__ == "__main__": pytest.main([__file__])
