diff tests/test_blast_annotations_processor.py @ 0:a3989edf0a4a draft

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit c944fd5685f295acba06679e85b67973c173b137
author onnodg
date Tue, 14 Oct 2025 09:08:30 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test_blast_annotations_processor.py	Tue Oct 14 09:08:30 2025 +0000
@@ -0,0 +1,432 @@
+"""
+Test suite for BLAST annotation processor.
+"""
+
+import pytest
+import os
+import sys
+import json
+import pandas as pd
+from pathlib import Path
+
+# Add the module to path for importing
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import (
+    process_single_file,
+    resolve_taxon_majority,
+    TAXONOMIC_LEVELS
+)
+
+
+class TestBlastAnnotationProcessor:
+    """Test class for BLAST annotation processor"""
+
+    @pytest.fixture(scope="class")
+    def test_data_dir(self):
+        """Setup test data directory structure"""
+        base_dir = Path("test-data")
+        base_dir.mkdir(exist_ok=True)
+
+        # Create subdirectories
+        for subdir in ["input", "expected", "output"]:
+            (base_dir / subdir).mkdir(exist_ok=True)
+
+        return base_dir
+
+    @pytest.fixture(scope="class")
+    def sample_files(self, test_data_dir):
+        """Create sample input files for testing"""
+        input_dir = test_data_dir / "input"
+
+        # Sample annotated BLAST file
+        blast_content = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
+        read1(100)	subject2	subject2	subject2	90.0	95	1e-45	180	database1	Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
+read1(100)	subject1	subject1	subject1	95.889	100	1e-50	200	database1	Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
+read2(50)	subject3	subject3	subject3    85.0	90	1e-40	160	database2	Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli
+read3(25)	subject4	subject4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
+read4(25)	subject4	subject4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla
+read4(25)	subject4	subject4	subject4   	80.0	85	1e-40	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi
+read4(25)	subject4	subject4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi
+read4(25)	subject4	subject4	subject4   	90.0	87	1e-50	160	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
+"""
+
+        # Sample unannotated FASTA file (headers must match BLAST q_id)
+        fasta_content = """>read1(100) count=100;
+ATCGATCGATCGATCG
+>read2(50) count=50;
+GCTAGCTAGCTAGCTA
+>read3(25) count=25;
+TGACTGACTGACTGAC
+>read4(25) count=25;
+TGAAAAAAACACCAC
+"""
+
+        blast_file = input_dir / "test_blast.tabular"
+        fasta_file = input_dir / "test_sequences.fasta"
+
+        with open(blast_file, 'w') as f:
+            f.write(blast_content)
+        with open(fasta_file, 'w') as f:
+            f.write(fasta_content)
+
+        return {
+            'blast': str(blast_file),
+            'fasta': str(fasta_file)
+        }
+
+    @pytest.fixture(scope="class")
+    def processed_output(self, test_data_dir, sample_files):
+        """Run the processor on sample files and return output paths"""
+        output_dir = test_data_dir / "output"
+
+        # Create arguments object
+        class Args:
+            def __init__(self):
+                self.input_anno = sample_files['blast']
+                self.input_unanno = sample_files['fasta']
+                self.eval_plot = str(output_dir / "eval_plot.png")
+                self.taxa_output = str(output_dir / "taxa_output.txt")
+                self.circle_data = str(output_dir / "circle_data.json")
+                self.header_anno = str(output_dir / "header_anno.xlsx")
+                self.anno_stats = str(output_dir / "anno_stats.txt")
+                self.uncertain_threshold = 0.9
+                self.eval_threshold = 1e-10
+                self.use_counts = True
+
+        args = Args()
+
+        # Process the files
+        process_single_file(args.input_anno, args.input_unanno, args)
+
+        return args
+
+    def test_data_integrity_best_values(self, processed_output):
+        """
+        Test 1: Data Integrity - Best Values Selection
+
+        Verifies that for each read, the best e-value corresponds to the correct
+        bitscore, identity, coverage, and taxonomic annotation.
+        """
+        # Read the Excel output to verify the best values are correctly selected
+        df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
+        print(df)
+        # For read1(100), verify best e-value (1e-50) corresponds to correct values
+        read1_row = df[df['header'].str.contains('read1')].iloc[0]
+        assert read1_row['bitscore'] == float(200), "best bitscore doesn't match"
+        assert read1_row['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-49), "Best e-value not correctly selected for read1"
+        assert read1_row['identity percentage'] == float(95.889), "Identity doesn't match best bitscore for read1"
+        assert 'Bacillus_subtilis' in read1_row['taxa'], "Taxa doesn't match best hit for read1"
+
+        read4_row = df[df['header'].str.contains('read4')].iloc[0]
+        assert read4_row['bitscore'] == float(160), "best bitscore doesn't match"
+        assert 'Methanobrevibacter_smithii' in read4_row['taxa'], "Taxa doesn't match best hit for read1"
+        print("✓ Test 1 PASSED: Best values correctly associated for each read")
+
+    def test_read_count_consistency(self, processed_output):
+        """
+        Test 2: Read Count Consistency
+
+        Verifies that read counts from FASTA headers are correctly preserved
+        and aggregated in all output files.
+        """
+        # Check Excel output
+        df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
+        # Verify counts are correctly extracted and preserved
+        expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25}
+
+        skipped_reads = []
+
+        for read_name, expected_count in expected_counts.items():
+            subset = df.loc[df['header'] == read_name]
+            if subset.empty:
+                skipped_reads.append(read_name)  # remember we skip this read
+                continue
+            row = subset.iloc[0]
+            assert row['count'] == expected_count, f"Count mismatch for {read_name}"
+
+        # Check annotation stats
+        with open(processed_output.anno_stats, 'r') as f:
+            stats_content = f.read()
+        # Total unique count should be 175 (100+50+25)
+        assert 'total_unique\t200' in stats_content, "Total unique count incorrect in stats"
+        if skipped_reads:
+            assert all(read not in df['header'].values for read in skipped_reads)
+        print("✓ Test 2 PASSED: Read counts consistent across all outputs")
+
+    def test_lowest_common_ancester(self, processed_output):
+        """
+        Test 3: Big Input Files
+
+        Tests the functioning of lowest common ancestor selection with realistic inputfile sizes
+        """
+        # Test the function directly with known conflicts
+        test_conflicts = {
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 187,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12,
+            'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
+        }
+        resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9)
+        assert 'Ciceronia a' in resolved_short, "Conflict not resolved to uncertain taxa"
+
+        test_90_precent_conflicts = {
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90,
+            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10,
+            'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
+        }
+        resolved_short, resolved_long = resolve_taxon_majority(test_90_precent_conflicts, 0.9)
+        assert 'Cicerbita a' in resolved_short, "Conflict not resolved to uncertain taxa"
+
+        print("✓ Test 3 PASSED: Lowest common ancestor works correctly")
+
+
+    def test_taxonomic_conflict_resolution(self, processed_output):
+        """
+        Test 4: Taxonomic Conflict Resolution
+
+        Tests the uncertainty threshold mechanism for resolving taxonomic conflicts.
+        Uses a controlled scenario where multiple hits have different taxa.
+        """
+        # Test the function directly with known conflicts
+        test_conflicts = {
+            'Bacteria / Firmicutes / Bacilli': 2,
+            'Bacteria / Proteobacteria / Gammaproteobacteria': 1
+        }
+
+        resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9)
+
+        # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain)
+        assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa"
+
+        # Test with higher confidence
+        test_high_confidence = {
+            'Bacteria / Firmicutes / Bacilli': 9,
+            'Bacteria / Proteobacteria / Gammaproteobacteria': 1
+        }
+
+        resolved_short, resolved_long = resolve_taxon_majority(test_high_confidence, 0.9)
+        assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly"
+
+        print("✓ Test 4 PASSED: Taxonomic conflict resolution working correctly")
+
+    def test_output_file_structures(self, processed_output):
+        """
+        Test 5: Output File Structure Validation
+
+        Verifies that all output files are created with correct structure and format.
+        """
+        # Test Excel file structure
+        excel_file = processed_output.header_anno
+        assert os.path.exists(excel_file), "Excel output file not created"
+
+        # Check both sheets exist
+        xl_file = pd.ExcelFile(excel_file)
+        expected_sheets = ['Individual_Reads', 'Merged_by_Taxa']
+        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets"
+
+        # Test Individual_Reads sheet structure
+        df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads')
+        expected_cols = ['header', 'e_value', 'identity percentage', 'coverage',
+                         'bitscore', 'count', 'source', 'taxa']
+        assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads"
+
+        # Test taxa output structure
+        with open(processed_output.taxa_output, 'r') as f:
+            taxa_lines = f.readlines()
+
+        # Should have header line and data lines
+        assert len(taxa_lines) > 2, "Taxa output too short"
+        assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect"
+
+        # Test circle data JSON structure
+        with open(processed_output.circle_data, 'r') as f:
+            circle_data = json.load(f)
+
+        assert isinstance(circle_data, list), "Circle data should be a list"
+        assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level"
+
+        print("✓ Test 5 PASSED: All output files have correct structure")
+
+    def test_evalue_filtering(self, test_data_dir):
+        """
+        Test 6: E-value Threshold Filtering
+
+        Tests that hits above the e-value threshold are correctly filtered out.
+        """
+        input_dir = test_data_dir / "input"
+        output_dir = test_data_dir / "output"
+
+        # Create test file with mix of good and bad e-values
+        blast_content_mixed = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
+        read1(100)	subject1	95.0	100	50	75	1e-50	200	database1	Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / Uncertain taxa / Uncertain taxa
+read1(100)	subject2	90.0	95	45	70	1e-5	180	database1	Viridiplantae / Streptophyta / Magnoliopsida / Rosales / Rosaceae / Sorbus / Sorbus aucuparia
+read2(50)	subject3	85.0	90	40	65	1e-3	160	database2	Viridiplantae / Streptophyta / Magnoliopsida / Solanales / Solanaceae / Uncertain taxa / Uncertain taxa
+"""
+
+        fasta_content = """>read1(100) count=100;
+ATCG
+>read2(50) count=50;
+GCTA
+"""
+
+        blast_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.tabular"
+        fasta_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.fasta"
+
+        with open(blast_file, 'w') as f:
+            f.write(blast_content_mixed)
+        with open(fasta_file, 'w') as f:
+            f.write(fasta_content)
+
+        # Process with strict e-value threshold
+        class Args:
+            def __init__(self):
+                self.input_anno = str(blast_file)
+                self.input_unanno = str(fasta_file)
+                self.header_anno = str(output_dir / "evalue_test.xlsx")
+                self.eval_plot = None
+                self.taxa_output = None
+                self.circle_data = None
+                self.anno_stats = None
+                self.uncertain_threshold = 0.9
+                self.eval_threshold = 1e-10  # Very strict threshold
+                self.use_counts = True
+
+        args = Args()
+        process_single_file(args.input_anno, args.input_unanno, args)
+
+        # Check that only the 1e-50 hit remains
+        df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')
+
+        # Should only have read1 (with 1e-50), read2 should be filtered out
+        assert len(df) == 1, f"Expected 1 read after filtering, got {len(df)}"
+        assert df.iloc[0]['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-12), "Wrong hit survived e-value filtering"
+
+        print("✓ Test 6 PASSED: E-value filtering working correctly")
+
+    def test_header_synchronization(self, test_data_dir):
+        """
+        Test 7: Header Synchronization Between Files
+
+        Tests that the processor correctly handles mismatched headers between
+        annotated and unannotated files.
+        """
+        input_dir = test_data_dir / "input"
+        output_dir = test_data_dir / "output"
+
+        # Create mismatched files
+        blast_content = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
+read1(100)	source=NCBI   sequenceID=KR738003   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	100.000	100	7.35e-30	54.7	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
+read1(100)	source=NCBI   sequenceID=KR738670   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	100.000	100	7.35e-14	54.7	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
+read3(25)	source=NCBI   sequenceID=KR737595   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	97.561	87	1.68e-14	71.3	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
+"""
+
+        fasta_content = """>read1(100) count=100;
+ATCG
+>read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; 
+gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg
+>read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; 
+gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg
+"""
+
+        blast_file = input_dir / "test_sync.tabular"
+        fasta_file = input_dir / "test_sync.fasta"
+
+        with open(blast_file, 'w') as f:
+            f.write(blast_content)
+        with open(fasta_file, 'w') as f:
+            f.write(fasta_content)
+
+        class Args:
+            def __init__(self):
+                self.input_anno = blast_file
+                self.input_unanno = fasta_file
+                self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx"
+                self.eval_plot = None
+                self.taxa_output = None
+                self.circle_data = None
+                self.anno_stats = str(output_dir / "sync_stats.txt")
+                self.uncertain_threshold = 0.9
+                self.eval_threshold = 1e-10
+                self.use_counts = True
+
+        args = Args()
+        process_single_file(args.input_anno, args.input_unanno, args)
+        # Check that processing handled the mismatch correctly
+        df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')
+        extracted = df['header'].str.extract(r'(read\d+)')
+        # final list
+        headers = extracted[0].tolist()
+        # Should have read1 and read3, read2 should be skipped
+        assert 'read1' in headers, "read1 should be present"
+        assert 'read3' in headers, "read3 should be present"
+
+        print("✓ Test 7 PASSED: Header synchronization handled correctly")
+
+    def test_excel_merged_vs_individual(self, processed_output):
+        """
+        Test 8: Excel Merged vs Individual Sheet Consistency
+
+        Verifies that the merged sheet correctly aggregates data from the individual sheet.
+        """
+        df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
+        df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa')
+
+        # Count unique taxa in individual sheet
+        individual_taxa = df_individual['taxa'].nunique()
+
+        # Should match number of rows in merged sheet
+        assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count"
+
+        # Check that counts are properly aggregated
+        # For taxa with multiple reads, counts should be summed
+        for _, merged_row in df_merged.iterrows():
+            taxa = merged_row['taxa']
+            individual_rows = df_individual[df_individual['taxa'] == taxa]
+
+            expected_count = individual_rows['count'].sum()
+            actual_count = merged_row['count']
+
+            assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}"
+
+        print("✓ Test 8 PASSED: Excel merged sheet correctly aggregates individual data")
+
+    def test_annotation_statistics_accuracy(self, processed_output, sample_files):
+        """
+        Test 9: Annotation Statistics Accuracy
+
+        Verifies that calculated annotation statistics match the actual data.
+        """
+        # Read stats file
+        stats = {}
+        with open(processed_output.anno_stats, 'r') as f:
+            lines = f.readlines()[1:]  # Skip header
+            for line in lines:
+                key, value = line.strip().split('\t')
+                try:
+                    stats[key] = float(value)
+                except ValueError:
+                    stats[key] = value
+
+        # Manual verification
+        assert stats['total_sequences'] == 4.0, "Total sequences count incorrect"
+        assert stats['annotated_sequences'] == 3.0, "Annotated sequences count incorrect"
+        assert stats['total_unique'] == 200, "Total unique count incorrect"
+        assert stats['unique_annotated'] == 150, "Unique annotated count incorrect"
+        assert stats['percentage_annotated'] == 75.0, "Percentage annotated incorrect"
+        assert stats['percentage_unique_annotated'] == 75.0, "Percentage unique annotated incorrect"
+
+        print("✓ Test 9 PASSED: Annotation statistics are accurate")
+
+
+
+if __name__ == "__main__":
+    # Run all tests in this file
+    pytest.main([__file__])
\ No newline at end of file