view tests/test_blast_annotations_processor.py @ 1:2acf82433aa4 draft default tip

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit d771f9fbfd42bcdeda1623d954550882a0863847-dirty
author onnodg
date Mon, 20 Oct 2025 12:26:51 +0000
parents a3989edf0a4a
children
line wrap: on
line source

"""
Test suite for BLAST annotation processor.
"""

import pytest
import os
import sys
import json
import pandas as pd
from pathlib import Path

# Add the module to path for importing
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import (
    process_single_file,
    resolve_taxon_majority,
    TAXONOMIC_LEVELS
)


class TestBlastAnnotationProcessor:
    """Test class for BLAST annotation processor"""

    @pytest.fixture(scope="class")
    def test_data_dir(self):
        """Setup test data directory structure"""
        base_dir = Path("test-data")
        base_dir.mkdir(exist_ok=True)

        # Create subdirectories
        for subdir in ["input", "expected", "output"]:
            (base_dir / subdir).mkdir(exist_ok=True)

        return base_dir

    @pytest.fixture(scope="class")
    def sample_files(self, test_data_dir):
        """Create sample input files for testing"""
        input_dir = test_data_dir / "input"

        # Sample annotated BLAST file
        blast_content = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
        read1(100)	subject2	subject2	subject2	90.0	95	1e-45	180	database1	Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
read1(100)	subject1	subject1	subject1	95.889	100	1e-50	200	database1	Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
read2(50)	subject3	subject3	subject3    85.0	90	1e-40	160	database2	Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli
read3(25)	subject4	subject4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
read4(25)	subject4	subject4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla
read4(25)	subject4	subject4	subject4   	80.0	85	1e-40	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi
read4(25)	subject4	subject4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi
read4(25)	subject4	subject4	subject4   	90.0	87	1e-50	160	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
"""

        # Sample unannotated FASTA file (headers must match BLAST q_id)
        fasta_content = """>read1(100) count=100;
ATCGATCGATCGATCG
>read2(50) count=50;
GCTAGCTAGCTAGCTA
>read3(25) count=25;
TGACTGACTGACTGAC
>read4(25) count=25;
TGAAAAAAACACCAC
"""

        blast_file = input_dir / "test_blast.tabular"
        fasta_file = input_dir / "test_sequences.fasta"

        with open(blast_file, 'w') as f:
            f.write(blast_content)
        with open(fasta_file, 'w') as f:
            f.write(fasta_content)

        return {
            'blast': str(blast_file),
            'fasta': str(fasta_file)
        }

    @pytest.fixture(scope="class")
    def processed_output(self, test_data_dir, sample_files):
        """Run the processor on sample files and return output paths"""
        output_dir = test_data_dir / "output"

        # Create arguments object
        class Args:
            def __init__(self):
                self.input_anno = sample_files['blast']
                self.input_unanno = sample_files['fasta']
                self.eval_plot = str(output_dir / "eval_plot.png")
                self.taxa_output = str(output_dir / "taxa_output.txt")
                self.circle_data = str(output_dir / "circle_data.json")
                self.header_anno = str(output_dir / "header_anno.xlsx")
                self.anno_stats = str(output_dir / "anno_stats.txt")
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10
                self.use_counts = True

        args = Args()

        # Process the files
        process_single_file(args.input_anno, args.input_unanno, args)

        return args

    def test_data_integrity_best_values(self, processed_output):
        """
        Test 1: Data Integrity - Best Values Selection

        Verifies that for each read, the best e-value corresponds to the correct
        bitscore, identity, coverage, and taxonomic annotation.
        """
        # Read the Excel output to verify the best values are correctly selected
        df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
        print(df)
        # For read1(100), verify best e-value (1e-50) corresponds to correct values
        read1_row = df[df['header'].str.contains('read1')].iloc[0]
        assert read1_row['bitscore'] == float(200), "best bitscore doesn't match"
        assert read1_row['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-49), "Best e-value not correctly selected for read1"
        assert read1_row['identity percentage'] == float(95.889), "Identity doesn't match best bitscore for read1"
        assert 'Bacillus_subtilis' in read1_row['taxa'], "Taxa doesn't match best hit for read1"

        read4_row = df[df['header'].str.contains('read4')].iloc[0]
        assert read4_row['bitscore'] == float(160), "best bitscore doesn't match"
        assert 'Methanobrevibacter_smithii' in read4_row['taxa'], "Taxa doesn't match best hit for read1"
        print("✓ Test 1 PASSED: Best values correctly associated for each read")

    def test_read_count_consistency(self, processed_output):
        """
        Test 2: Read Count Consistency

        Verifies that read counts from FASTA headers are correctly preserved
        and aggregated in all output files.
        """
        # Check Excel output
        df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
        # Verify counts are correctly extracted and preserved
        expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25}

        skipped_reads = []

        for read_name, expected_count in expected_counts.items():
            subset = df.loc[df['header'] == read_name]
            if subset.empty:
                skipped_reads.append(read_name)  # remember we skip this read
                continue
            row = subset.iloc[0]
            assert row['count'] == expected_count, f"Count mismatch for {read_name}"

        # Check annotation stats
        with open(processed_output.anno_stats, 'r') as f:
            stats_content = f.read()
        # Total unique count should be 175 (100+50+25)
        assert 'total_unique\t200' in stats_content, "Total unique count incorrect in stats"
        if skipped_reads:
            assert all(read not in df['header'].values for read in skipped_reads)
        print("✓ Test 2 PASSED: Read counts consistent across all outputs")

    def test_lowest_common_ancester(self, processed_output):
        """
        Test 3: Big Input Files

        Tests the functioning of lowest common ancestor selection with realistic inputfile sizes
        """
        # Test the function directly with known conflicts
        test_conflicts = {
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 187,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12,
            'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
        }
        resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9)
        assert 'Ciceronia a' in resolved_short, "Conflict not resolved to uncertain taxa"

        test_90_precent_conflicts = {
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10,
            'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
        }
        resolved_short, resolved_long = resolve_taxon_majority(test_90_precent_conflicts, 0.9)
        assert 'Cicerbita a' in resolved_short, "Conflict not resolved to uncertain taxa"

        print("✓ Test 3 PASSED: Lowest common ancestor works correctly")


    def test_taxonomic_conflict_resolution(self, processed_output):
        """
        Test 4: Taxonomic Conflict Resolution

        Tests the uncertainty threshold mechanism for resolving taxonomic conflicts.
        Uses a controlled scenario where multiple hits have different taxa.
        """
        # Test the function directly with known conflicts
        test_conflicts = {
            'Bacteria / Firmicutes / Bacilli': 2,
            'Bacteria / Proteobacteria / Gammaproteobacteria': 1
        }

        resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9)

        # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain)
        assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa"

        # Test with higher confidence
        test_high_confidence = {
            'Bacteria / Firmicutes / Bacilli': 9,
            'Bacteria / Proteobacteria / Gammaproteobacteria': 1
        }

        resolved_short, resolved_long = resolve_taxon_majority(test_high_confidence, 0.9)
        assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly"

        print("✓ Test 4 PASSED: Taxonomic conflict resolution working correctly")

    def test_output_file_structures(self, processed_output):
        """
        Test 5: Output File Structure Validation

        Verifies that all output files are created with correct structure and format.
        """
        # Test Excel file structure
        excel_file = processed_output.header_anno
        assert os.path.exists(excel_file), "Excel output file not created"

        # Check both sheets exist
        xl_file = pd.ExcelFile(excel_file)
        expected_sheets = ['Individual_Reads', 'Merged_by_Taxa']
        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets"

        # Test Individual_Reads sheet structure
        df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads')
        expected_cols = ['header', 'e_value', 'identity percentage', 'coverage',
                         'bitscore', 'count', 'source', 'taxa']
        assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads"

        # Test taxa output structure
        with open(processed_output.taxa_output, 'r') as f:
            taxa_lines = f.readlines()

        # Should have header line and data lines
        assert len(taxa_lines) > 2, "Taxa output too short"
        assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect"

        # Test circle data JSON structure
        with open(processed_output.circle_data, 'r') as f:
            circle_data = json.load(f)

        assert isinstance(circle_data, list), "Circle data should be a list"
        assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level"

        print("✓ Test 5 PASSED: All output files have correct structure")

    def test_evalue_filtering(self, test_data_dir):
        """
        Test 6: E-value Threshold Filtering

        Tests that hits above the e-value threshold are correctly filtered out.
        """
        input_dir = test_data_dir / "input"
        output_dir = test_data_dir / "output"

        # Create test file with mix of good and bad e-values
        blast_content_mixed = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
        read1(100)	subject1	95.0	100	50	75	1e-50	200	database1	Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / Uncertain taxa / Uncertain taxa
read1(100)	subject2	90.0	95	45	70	1e-5	180	database1	Viridiplantae / Streptophyta / Magnoliopsida / Rosales / Rosaceae / Sorbus / Sorbus aucuparia
read2(50)	subject3	85.0	90	40	65	1e-3	160	database2	Viridiplantae / Streptophyta / Magnoliopsida / Solanales / Solanaceae / Uncertain taxa / Uncertain taxa
"""

        fasta_content = """>read1(100) count=100;
ATCG
>read2(50) count=50;
GCTA
"""

        blast_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.tabular"
        fasta_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.fasta"

        with open(blast_file, 'w') as f:
            f.write(blast_content_mixed)
        with open(fasta_file, 'w') as f:
            f.write(fasta_content)

        # Process with strict e-value threshold
        class Args:
            def __init__(self):
                self.input_anno = str(blast_file)
                self.input_unanno = str(fasta_file)
                self.header_anno = str(output_dir / "evalue_test.xlsx")
                self.eval_plot = None
                self.taxa_output = None
                self.circle_data = None
                self.anno_stats = None
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10  # Very strict threshold
                self.use_counts = True

        args = Args()
        process_single_file(args.input_anno, args.input_unanno, args)

        # Check that only the 1e-50 hit remains
        df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')

        # Should only have read1 (with 1e-50), read2 should be filtered out
        assert len(df) == 1, f"Expected 1 read after filtering, got {len(df)}"
        assert df.iloc[0]['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-12), "Wrong hit survived e-value filtering"

        print("✓ Test 6 PASSED: E-value filtering working correctly")

    def test_header_synchronization(self, test_data_dir):
        """
        Test 7: Header Synchronization Between Files

        Tests that the processor correctly handles mismatched headers between
        annotated and unannotated files.
        """
        input_dir = test_data_dir / "input"
        output_dir = test_data_dir / "output"

        # Create mismatched files
        blast_content = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
read1(100)	source=NCBI   sequenceID=KR738003   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	100.000	100	7.35e-30	54.7	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
read1(100)	source=NCBI   sequenceID=KR738670   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	100.000	100	7.35e-14	54.7	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
read3(25)	source=NCBI   sequenceID=KR737595   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	97.561	87	1.68e-14	71.3	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
"""

        fasta_content = """>read1(100) count=100;
ATCG
>read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; 
gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg
>read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; 
gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg
"""

        blast_file = input_dir / "test_sync.tabular"
        fasta_file = input_dir / "test_sync.fasta"

        with open(blast_file, 'w') as f:
            f.write(blast_content)
        with open(fasta_file, 'w') as f:
            f.write(fasta_content)

        class Args:
            def __init__(self):
                self.input_anno = blast_file
                self.input_unanno = fasta_file
                self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx"
                self.eval_plot = None
                self.taxa_output = None
                self.circle_data = None
                self.anno_stats = str(output_dir / "sync_stats.txt")
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10
                self.use_counts = True

        args = Args()
        process_single_file(args.input_anno, args.input_unanno, args)
        # Check that processing handled the mismatch correctly
        df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')
        extracted = df['header'].str.extract(r'(read\d+)')
        # final list
        headers = extracted[0].tolist()
        # Should have read1 and read3, read2 should be skipped
        assert 'read1' in headers, "read1 should be present"
        assert 'read3' in headers, "read3 should be present"

        print("✓ Test 7 PASSED: Header synchronization handled correctly")

    def test_excel_merged_vs_individual(self, processed_output):
        """
        Test 8: Excel Merged vs Individual Sheet Consistency

        Verifies that the merged sheet correctly aggregates data from the individual sheet.
        """
        df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
        df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa')

        # Count unique taxa in individual sheet
        individual_taxa = df_individual['taxa'].nunique()

        # Should match number of rows in merged sheet
        assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count"

        # Check that counts are properly aggregated
        # For taxa with multiple reads, counts should be summed
        for _, merged_row in df_merged.iterrows():
            taxa = merged_row['taxa']
            individual_rows = df_individual[df_individual['taxa'] == taxa]

            expected_count = individual_rows['count'].sum()
            actual_count = merged_row['count']

            assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}"

        print("✓ Test 8 PASSED: Excel merged sheet correctly aggregates individual data")

    def test_annotation_statistics_accuracy(self, processed_output, sample_files):
        """
        Test 9: Annotation Statistics Accuracy

        Verifies that calculated annotation statistics match the actual data.
        """
        # Read stats file
        stats = {}
        with open(processed_output.anno_stats, 'r') as f:
            lines = f.readlines()[1:]  # Skip header
            for line in lines:
                key, value = line.strip().split('\t')
                try:
                    stats[key] = float(value)
                except ValueError:
                    stats[key] = value

        # Manual verification
        assert stats['total_sequences'] == 4.0, "Total sequences count incorrect"
        assert stats['annotated_sequences'] == 3.0, "Annotated sequences count incorrect"
        assert stats['total_unique'] == 200, "Total unique count incorrect"
        assert stats['unique_annotated'] == 150, "Unique annotated count incorrect"
        assert stats['percentage_annotated'] == 75.0, "Percentage annotated incorrect"
        assert stats['percentage_unique_annotated'] == 75.0, "Percentage unique annotated incorrect"

        print("✓ Test 9 PASSED: Annotation statistics are accurate")



if __name__ == "__main__":
    # Run all tests in this file
    pytest.main([__file__])