Mercurial > repos > onnodg > blast_annotations_processor
view tests/test_blast_annotations_processor.py @ 1:2acf82433aa4 draft default tip
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit d771f9fbfd42bcdeda1623d954550882a0863847-dirty
| author | onnodg |
|---|---|
| date | Mon, 20 Oct 2025 12:26:51 +0000 |
| parents | a3989edf0a4a |
| children |
line wrap: on
line source
""" Test suite for BLAST annotation processor. """ import pytest import os import sys import json import pandas as pd from pathlib import Path # Add the module to path for importing sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import ( process_single_file, resolve_taxon_majority, TAXONOMIC_LEVELS ) class TestBlastAnnotationProcessor: """Test class for BLAST annotation processor""" @pytest.fixture(scope="class") def test_data_dir(self): """Setup test data directory structure""" base_dir = Path("test-data") base_dir.mkdir(exist_ok=True) # Create subdirectories for subdir in ["input", "expected", "output"]: (base_dir / subdir).mkdir(exist_ok=True) return base_dir @pytest.fixture(scope="class") def sample_files(self, test_data_dir): """Create sample input files for testing""" input_dir = test_data_dir / "input" # Sample annotated BLAST file blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy read1(100) subject2 subject2 subject2 90.0 95 1e-45 180 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis read1(100) subject1 subject1 subject1 95.889 100 1e-50 200 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis read2(50) subject3 subject3 subject3 85.0 90 1e-40 160 database2 Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli read3(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii read4(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla read4(25) subject4 subject4 subject4 80.0 85 1e-40 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi read4(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi read4(25) subject4 subject4 subject4 90.0 87 1e-50 160 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii """ # Sample unannotated FASTA file (headers must match BLAST q_id) fasta_content = """>read1(100) count=100; ATCGATCGATCGATCG >read2(50) count=50; GCTAGCTAGCTAGCTA >read3(25) count=25; TGACTGACTGACTGAC >read4(25) count=25; TGAAAAAAACACCAC """ blast_file = input_dir / "test_blast.tabular" fasta_file = input_dir / "test_sequences.fasta" with open(blast_file, 'w') as f: f.write(blast_content) with open(fasta_file, 'w') as f: f.write(fasta_content) return { 'blast': str(blast_file), 'fasta': str(fasta_file) } @pytest.fixture(scope="class") def processed_output(self, test_data_dir, sample_files): """Run the processor on sample files and return output paths""" output_dir = test_data_dir / "output" # Create arguments object class Args: def __init__(self): self.input_anno = sample_files['blast'] self.input_unanno = sample_files['fasta'] self.eval_plot = str(output_dir / "eval_plot.png") self.taxa_output = str(output_dir / "taxa_output.txt") self.circle_data = str(output_dir / "circle_data.json") self.header_anno = str(output_dir / "header_anno.xlsx") self.anno_stats = str(output_dir / "anno_stats.txt") self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 self.use_counts = True args = Args() # Process the files process_single_file(args.input_anno, args.input_unanno, args) return args def test_data_integrity_best_values(self, processed_output): """ Test 1: Data Integrity - Best Values Selection Verifies that for each read, the best e-value corresponds to the correct bitscore, identity, coverage, and taxonomic annotation. """ # Read the Excel output to verify the best values are correctly selected df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') print(df) # For read1(100), verify best e-value (1e-50) corresponds to correct values read1_row = df[df['header'].str.contains('read1')].iloc[0] assert read1_row['bitscore'] == float(200), "best bitscore doesn't match" assert read1_row['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-49), "Best e-value not correctly selected for read1" assert read1_row['identity percentage'] == float(95.889), "Identity doesn't match best bitscore for read1" assert 'Bacillus_subtilis' in read1_row['taxa'], "Taxa doesn't match best hit for read1" read4_row = df[df['header'].str.contains('read4')].iloc[0] assert read4_row['bitscore'] == float(160), "best bitscore doesn't match" assert 'Methanobrevibacter_smithii' in read4_row['taxa'], "Taxa doesn't match best hit for read1" print("✓ Test 1 PASSED: Best values correctly associated for each read") def test_read_count_consistency(self, processed_output): """ Test 2: Read Count Consistency Verifies that read counts from FASTA headers are correctly preserved and aggregated in all output files. """ # Check Excel output df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') # Verify counts are correctly extracted and preserved expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25} skipped_reads = [] for read_name, expected_count in expected_counts.items(): subset = df.loc[df['header'] == read_name] if subset.empty: skipped_reads.append(read_name) # remember we skip this read continue row = subset.iloc[0] assert row['count'] == expected_count, f"Count mismatch for {read_name}" # Check annotation stats with open(processed_output.anno_stats, 'r') as f: stats_content = f.read() # Total unique count should be 175 (100+50+25) assert 'total_unique\t200' in stats_content, "Total unique count incorrect in stats" if skipped_reads: assert all(read not in df['header'].values for read in skipped_reads) print("✓ Test 2 PASSED: Read counts consistent across all outputs") def test_lowest_common_ancester(self, processed_output): """ Test 3: Big Input Files Tests the functioning of lowest common ancestor selection with realistic inputfile sizes """ # Test the function directly with known conflicts test_conflicts = { 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 187, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12, 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6 } resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9) assert 'Ciceronia a' in resolved_short, "Conflict not resolved to uncertain taxa" test_90_precent_conflicts = { 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90, 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10, 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6 } resolved_short, resolved_long = resolve_taxon_majority(test_90_precent_conflicts, 0.9) assert 'Cicerbita a' in resolved_short, "Conflict not resolved to uncertain taxa" print("✓ Test 3 PASSED: Lowest common ancestor works correctly") def test_taxonomic_conflict_resolution(self, processed_output): """ Test 4: Taxonomic Conflict Resolution Tests the uncertainty threshold mechanism for resolving taxonomic conflicts. Uses a controlled scenario where multiple hits have different taxa. """ # Test the function directly with known conflicts test_conflicts = { 'Bacteria / Firmicutes / Bacilli': 2, 'Bacteria / Proteobacteria / Gammaproteobacteria': 1 } resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9) # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain) assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa" # Test with higher confidence test_high_confidence = { 'Bacteria / Firmicutes / Bacilli': 9, 'Bacteria / Proteobacteria / Gammaproteobacteria': 1 } resolved_short, resolved_long = resolve_taxon_majority(test_high_confidence, 0.9) assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly" print("✓ Test 4 PASSED: Taxonomic conflict resolution working correctly") def test_output_file_structures(self, processed_output): """ Test 5: Output File Structure Validation Verifies that all output files are created with correct structure and format. """ # Test Excel file structure excel_file = processed_output.header_anno assert os.path.exists(excel_file), "Excel output file not created" # Check both sheets exist xl_file = pd.ExcelFile(excel_file) expected_sheets = ['Individual_Reads', 'Merged_by_Taxa'] assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets" # Test Individual_Reads sheet structure df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads') expected_cols = ['header', 'e_value', 'identity percentage', 'coverage', 'bitscore', 'count', 'source', 'taxa'] assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads" # Test taxa output structure with open(processed_output.taxa_output, 'r') as f: taxa_lines = f.readlines() # Should have header line and data lines assert len(taxa_lines) > 2, "Taxa output too short" assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect" # Test circle data JSON structure with open(processed_output.circle_data, 'r') as f: circle_data = json.load(f) assert isinstance(circle_data, list), "Circle data should be a list" assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level" print("✓ Test 5 PASSED: All output files have correct structure") def test_evalue_filtering(self, test_data_dir): """ Test 6: E-value Threshold Filtering Tests that hits above the e-value threshold are correctly filtered out. """ input_dir = test_data_dir / "input" output_dir = test_data_dir / "output" # Create test file with mix of good and bad e-values blast_content_mixed = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy read1(100) subject1 95.0 100 50 75 1e-50 200 database1 Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / Uncertain taxa / Uncertain taxa read1(100) subject2 90.0 95 45 70 1e-5 180 database1 Viridiplantae / Streptophyta / Magnoliopsida / Rosales / Rosaceae / Sorbus / Sorbus aucuparia read2(50) subject3 85.0 90 40 65 1e-3 160 database2 Viridiplantae / Streptophyta / Magnoliopsida / Solanales / Solanaceae / Uncertain taxa / Uncertain taxa """ fasta_content = """>read1(100) count=100; ATCG >read2(50) count=50; GCTA """ blast_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.tabular" fasta_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.fasta" with open(blast_file, 'w') as f: f.write(blast_content_mixed) with open(fasta_file, 'w') as f: f.write(fasta_content) # Process with strict e-value threshold class Args: def __init__(self): self.input_anno = str(blast_file) self.input_unanno = str(fasta_file) self.header_anno = str(output_dir / "evalue_test.xlsx") self.eval_plot = None self.taxa_output = None self.circle_data = None self.anno_stats = None self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 # Very strict threshold self.use_counts = True args = Args() process_single_file(args.input_anno, args.input_unanno, args) # Check that only the 1e-50 hit remains df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads') # Should only have read1 (with 1e-50), read2 should be filtered out assert len(df) == 1, f"Expected 1 read after filtering, got {len(df)}" assert df.iloc[0]['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-12), "Wrong hit survived e-value filtering" print("✓ Test 6 PASSED: E-value filtering working correctly") def test_header_synchronization(self, test_data_dir): """ Test 7: Header Synchronization Between Files Tests that the processor correctly handles mismatched headers between annotated and unannotated files. """ input_dir = test_data_dir / "input" output_dir = test_data_dir / "output" # Create mismatched files blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy read1(100) source=NCBI sequenceID=KR738003 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-30 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum read1(100) source=NCBI sequenceID=KR738670 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-14 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum read3(25) source=NCBI sequenceID=KR737595 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 97.561 87 1.68e-14 71.3 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum """ fasta_content = """>read1(100) count=100; ATCG >read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg >read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg """ blast_file = input_dir / "test_sync.tabular" fasta_file = input_dir / "test_sync.fasta" with open(blast_file, 'w') as f: f.write(blast_content) with open(fasta_file, 'w') as f: f.write(fasta_content) class Args: def __init__(self): self.input_anno = blast_file self.input_unanno = fasta_file self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx" self.eval_plot = None self.taxa_output = None self.circle_data = None self.anno_stats = str(output_dir / "sync_stats.txt") self.uncertain_threshold = 0.9 self.eval_threshold = 1e-10 self.use_counts = True args = Args() process_single_file(args.input_anno, args.input_unanno, args) # Check that processing handled the mismatch correctly df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads') extracted = df['header'].str.extract(r'(read\d+)') # final list headers = extracted[0].tolist() # Should have read1 and read3, read2 should be skipped assert 'read1' in headers, "read1 should be present" assert 'read3' in headers, "read3 should be present" print("✓ Test 7 PASSED: Header synchronization handled correctly") def test_excel_merged_vs_individual(self, processed_output): """ Test 8: Excel Merged vs Individual Sheet Consistency Verifies that the merged sheet correctly aggregates data from the individual sheet. """ df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa') # Count unique taxa in individual sheet individual_taxa = df_individual['taxa'].nunique() # Should match number of rows in merged sheet assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count" # Check that counts are properly aggregated # For taxa with multiple reads, counts should be summed for _, merged_row in df_merged.iterrows(): taxa = merged_row['taxa'] individual_rows = df_individual[df_individual['taxa'] == taxa] expected_count = individual_rows['count'].sum() actual_count = merged_row['count'] assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}" print("✓ Test 8 PASSED: Excel merged sheet correctly aggregates individual data") def test_annotation_statistics_accuracy(self, processed_output, sample_files): """ Test 9: Annotation Statistics Accuracy Verifies that calculated annotation statistics match the actual data. """ # Read stats file stats = {} with open(processed_output.anno_stats, 'r') as f: lines = f.readlines()[1:] # Skip header for line in lines: key, value = line.strip().split('\t') try: stats[key] = float(value) except ValueError: stats[key] = value # Manual verification assert stats['total_sequences'] == 4.0, "Total sequences count incorrect" assert stats['annotated_sequences'] == 3.0, "Annotated sequences count incorrect" assert stats['total_unique'] == 200, "Total unique count incorrect" assert stats['unique_annotated'] == 150, "Unique annotated count incorrect" assert stats['percentage_annotated'] == 75.0, "Percentage annotated incorrect" assert stats['percentage_unique_annotated'] == 75.0, "Percentage unique annotated incorrect" print("✓ Test 9 PASSED: Annotation statistics are accurate") if __name__ == "__main__": # Run all tests in this file pytest.main([__file__])
