view tests/test_blast_annotations_processor.py @ 2:9ca209477dfd draft default tip

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
author onnodg
date Mon, 15 Dec 2025 16:43:36 +0000
parents a3989edf0a4a
children
line wrap: on
line source

"""
Test suite for BLAST annotation processor.
"""
import re
import ast
import pytest
import os
import sys
import json
import pandas as pd
from pathlib import Path

# Add the module to path for importing
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import (
    process_single_file,
    resolve_tax_majority,
    TAXONOMIC_LEVELS,
    check_header_string
)


class TestBlastAnnotationProcessor:
    """Test class for BLAST annotation processor"""

    @pytest.fixture(scope="class")
    def test_data_dir(self):
        """Setup test data directory structure"""
        base_dir = Path("test-data")
        base_dir.mkdir(exist_ok=True)

        for subdir in ["input", "expected", "output"]:
            (base_dir / subdir).mkdir(exist_ok=True)

        return base_dir

    @pytest.fixture(scope="class")
    def sample_files(self, test_data_dir):
        """Create sample input files for testing"""
        input_dir = test_data_dir / "input"

        blast_content = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
        read1(100)	subject2	id2	subject2	90.0	95	1e-45	180	database1	Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
read1(100)	subject1	id1	subject1	95.889	100	1e-50	200	database1	Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
read2(50)	subject3	id3	subject3    85.0	90	1e-40	160	database2	Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli
read3(25)	subject4	id4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
read4(25)	subject4	id4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla
read4(25)	subject4	id4.1	subject4   	80.0	85	1e-40	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi
read4(25)	subject4	id4	subject4   	80.0	85	1e-35	140	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi
read4(25)	subject4	id4.2	subject4   	90.0	87	1e-50	160	database1	Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
"""

        fasta_content = """>read1(100) count=100;
ATCGATCGATCGATCG
>read2(50) count=50;
GCTAGCTAGCTAGCTA
>read3(25) count=25;
TGACTGACTGACTGAC
>read4(25) count=25;
TGAAAAAAACACCAC
"""

        blast_file = input_dir / "test_blast.tabular"
        fasta_file = input_dir / "test_sequences.fasta"

        with open(blast_file, 'w') as f:
            f.write(blast_content)
        with open(fasta_file, 'w') as f:
            f.write(fasta_content)

        return {
            'blast': str(blast_file),
            'fasta': str(fasta_file)
        }

    @pytest.fixture(scope="class")
    def processed_output(self, test_data_dir, sample_files):
        """Run the processor on sample files and return output paths"""
        output_dir = test_data_dir / "output"

        class Args:
            def __init__(self):
                self.input_anno = sample_files['blast']
                self.input_unanno = sample_files['fasta']
                self.eval_plot = str(output_dir / "eval_plot.png")
                self.taxa_output = str(output_dir / "taxa_output.txt")
                self.circle_data = str(output_dir / "circle_data.json")
                self.header_anno = str(output_dir / "header_anno.xlsx")
                self.anno_stats = str(output_dir / "anno_stats.txt")
                self.filtered_fasta = str(output_dir / "filtered_fasta.fasta")
                self.log = str(output_dir / "log.txt")
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10
                self.min_bitscore = 60
                self.min_support = 1
                self.ignore_rank = 'unknown'
                self.ignore_taxonomy = 'environmental'
                self.bitscore_perc_cutoff = 10
                self.ignore_obiclean_type ='singleton'
                self.ignore_illuminapairend_type = 'pairend'
                self.min_identity = 70
                self.min_coverage = 70
                self.ignore_seqids = ''
                self.use_counts = True

        args = Args()
        log_messages = []
        process_single_file(args.input_anno, args.input_unanno, args, log_messages)

        return args


    def test_read_count_consistency(self, processed_output):
        """
        Test 1: Read Count Consistency

        Verifies that read counts from FASTA headers are correctly preserved
        and aggregated in all output files.
        """
        df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
        expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25}

        skipped_reads = []

        for read_name, expected_count in expected_counts.items():
            subset = df.loc[df['header'] == read_name]
            if subset.empty:
                skipped_reads.append(read_name)  # remember we skip this read
                continue
            row = subset.iloc[0]
            assert row['count'] == expected_count, f"Count mismatch for {read_name}"

        with open(processed_output.anno_stats, 'r') as f:
            stats_content = f.read()
        # Total unique count should be 175 (100+50+25)
        assert 'total_unique: 200' in stats_content, "Total unique count incorrect in stats"
        if skipped_reads:
            assert all(read not in df['header'].values for read in skipped_reads)
        print("✓ Test 1 PASSED: Read counts consistent across all outputs")

    def test_lowest_common_ancester(self, processed_output):
        """
        Test 2: Big Input Files

        Tests the functioning of lowest common ancestor selection with realistic inputfile sizes
        """
        test_conflicts = {
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 450,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12,
            'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
        }
        resolved_short1, resolved_long1 = resolve_tax_majority(test_conflicts, 0.9)
        assert 'Ciceronia a' in resolved_short1, "Conflict not resolved to uncertain taxa"

        test_90_precent_conflicts = {
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90,
            'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10,
            'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
        }
        resolved_short, resolved_long = resolve_tax_majority(test_90_precent_conflicts, 0.9)
        assert 'Viridiplantae / Streptophyta / Uncertain taxa' in resolved_long, "Conflict not resolved to uncertain taxa"

        print("✓ Test 2 PASSED: Lowest common ancestor works correctly")


    def test_taxonomic_conflict_resolution(self, processed_output):
        """
        Test 3: Taxonomic Conflict Resolution

        Tests the uncertainty threshold mechanism for resolving taxonomic conflicts.
        Uses a controlled scenario where multiple hits have different taxa.
        """
        test_conflicts = {
            'Bacteria / Firmicutes / Bacilli': 2,
            'Bacteria / Proteobacteria / Gammaproteobacteria': 1
        }

        resolved_short, resolved_long = resolve_tax_majority(test_conflicts, 0.9)

        # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain)
        assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa"

        test_high_confidence = {
            'Bacteria / Firmicutes / Bacilli': 9,
            'Bacteria / Proteobacteria / Gammaproteobacteria': 1
        }

        resolved_short, resolved_long = resolve_tax_majority(test_high_confidence, 0.9)
        assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly"

        print("✓ Test 3 PASSED: Taxonomic conflict resolution working correctly")

    def test_output_file_structures(self, processed_output):
        """
        Test 4: Output File Structure Validation

        Verifies that all output files are created with correct structure and format.
        """
        excel_file = processed_output.header_anno
        assert os.path.exists(excel_file), "Excel output file not created"

        xl_file = pd.ExcelFile(excel_file)
        expected_sheets = ['Individual_Reads', 'Merged_by_Taxa']
        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets"

        df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads')
        expected_cols = ['header', 'seq_id', 'source', 'count', 'taxa', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species']
        assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads"

        with open(processed_output.taxa_output, 'r') as f:
            taxa_lines = f.readlines()
        assert len(taxa_lines) == 2, "Taxa output too short"
        assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect"

        with open(processed_output.anno_stats, 'r') as f:
            anno_stats = f.readlines()
            assert 'FASTA: headers kept after filters and min_support=1: 4\n' in anno_stats, "Taxa output header incorrect"
            filter_f = 4


        with open(processed_output.circle_data, 'r') as f:
            circle_data = json.load(f)

        assert isinstance(circle_data, list), "Circle data should be a list"
        assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level"

        with open(processed_output.filtered_fasta, 'r') as f:
            filtered_fasta = f.readlines()
            assert len(filtered_fasta) == filter_f * 2

        print("✓ Test 4 PASSED: All output files have correct structure")



    def test_header_synchronization(self, test_data_dir):
        """
        Test 5: Header Synchronization Between Files

        Tests that the processor correctly handles mismatched headers between
        annotated and unannotated files.
        """
        input_dir = test_data_dir / "input"
        output_dir = test_data_dir / "output"

        # Create mismatched files
        blast_content = """#Query ID	#Subject	#Subject accession	#Subject Taxonomy ID	#Identity percentage	#Coverage	#evalue	#bitscore	#Source	#Taxonomy
read1(100)	source=NCBI   sequenceID=KR738003   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	100.000	100	7.35e-30	54.7	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
read1(100)	source=NCBI   sequenceID=KR738670   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	100.000	100	7.35e-14	54.7	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
read2.1(50) 1   2   3   4   5   6   7   8   9
read3(25)	source=NCBI   sequenceID=KR737595   superkingdom=Eukaryota   kingdom=Viridiplantae   phylum=Streptophyta   subphylum=Streptophytina   class=Magnoliopsida   subclass=NA   infraclass=NA   order=Malvales   suborder=NA   infraorder=NA   superfamily=NA   family=Malvaceae   genus=Hibiscus   species=Hibiscus trionum   markercode=trnL   lat=0.304   lon=36.87	source=NCBI	N/A	97.561	87	1.68e-14	71.3	Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
"""

        fasta_content = """>read1(100) count=100;
ATCG
>read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; 
gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg
>read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; 
gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg
"""

        blast_file = input_dir / "test_sync.tabular"
        fasta_file = input_dir / "test_sync.fasta"

        with open(blast_file, 'w') as f:
            f.write(blast_content)
        with open(fasta_file, 'w') as f:
            f.write(fasta_content)

        class Args:
            def __init__(self):
                self.input_anno = blast_file
                self.input_unanno = fasta_file
                self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx"
                self.eval_plot = None
                self.taxa_output = None
                self.circle_data = None
                self.filtered_fasta = str(output_dir / "filtered_fasta.fasta")
                self.anno_stats = str(output_dir / "sync_stats.txt")
                self.log = str(output_dir / "log.txt")
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10
                self.use_counts = True
                self.min_bitscore = 50
                self.min_support = 1
                self.ignore_rank = 'unknown'
                self.ignore_taxonomy = 'environmental'
                self.bitscore_perc_cutoff = 10
                self.ignore_obiclean_type = 'singleton'
                self.ignore_illuminapairend_type = 'pairend'
                self.min_identity = 30
                self.min_coverage = 30
                self.ignore_seqids = ''

        args = Args()
        process_single_file(args.input_anno, args.input_unanno, args, log_messages=[])
        df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')
        extracted = df['header'].str.extract(r'(read\d+)')
        headers = extracted[0].tolist()
        # Should have read1 and read3, read2 should be skipped
        assert 'read1' in headers, "read1 should be present"
        assert 'read2' not in headers, "read2 should not be present"
        assert 'read2.1' not in headers, "read2 should not be present"
        assert 'read3' in headers, "read3 should be present"

        print("✓ Test 5 PASSED: Header synchronization handled correctly")

    def test_check_header_string_all_behaviors(self):
        from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import \
            check_header_string

        # clean header — allowed
        assert check_header_string(">readA count=10;", "", "") is True
        # blocks singleton
        assert check_header_string(">r obiclean_status={'XXX': 's'}", "singleton", "") is False
        # blocks variant
        assert check_header_string(">r obiclean_status={'XXX': 'i'}", "variant", "") is False
        # blocks head
        assert check_header_string(">r obiclean_status={'XXX': 'h'}", "head", "") is False
        # blocks pairend
        assert check_header_string(">r PairEnd", "pairend", "") is False
        # blocks consensus
        assert check_header_string(">r CONS", "consensus", "") is False
        # blocks custom string
        assert check_header_string(">r FooBar", "FooBar", "") is False
        # blocks when string is in second param
        assert check_header_string(">r blah", "", "blah") is False
        # blocks when multiple ignore values contain it
        assert check_header_string(">r PairEnd obiclean_status={'XXX': 's'}", "pairend,singleton", "") is False
        # allows when no match
        assert check_header_string(">r something", "pairend", "") is True

    def test_excel_merged_vs_individual(self, processed_output):
        """
        Test 6: Excel Merged vs Individual Sheet Consistency

        Verifies that the merged sheet correctly aggregates data from the individual sheet.
        """
        df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
        df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa')

        individual_taxa = df_individual['taxa'].nunique()

        assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count"

        # Check that counts are properly aggregated
        # For taxa with multiple reads, counts should be summed
        for _, merged_row in df_merged.iterrows():
            taxa = merged_row['taxa']
            individual_rows = df_individual[df_individual['taxa'] == taxa]

            expected_count = individual_rows['count'].sum()
            actual_count = merged_row['count']

            assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}"

        print("✓ Test 6 PASSED: Excel merged sheet correctly aggregates individual data")

    def test_annotation_statistics_accuracy(self, processed_output, sample_files):
        """
        Test 7: Annotation Statistics Accuracy

        Verifies that calculated annotation statistics match the actual data.
        Adapted for the new plain-text log file instead of tab-separated output.
        """
        stats = {}

        with open(processed_output.anno_stats, 'r') as f:
            lines = f.readlines()

        for line in lines:
            line = line.strip()
            if not line or ":" not in line:
                continue

            key, value = line.split(":", 1)
            key = key.strip()
            value = value.strip()

            try:
                stats[key] = float(value)
            except ValueError:
                stats[key] = value

        assert stats["total_sequences"] == 4.0, "Total sequences count incorrect"
        assert stats["annotated_sequences"] == 3.0, "Annotated sequence count incorrect"
        assert stats["total_unique"] == 200.0, "Total unique count incorrect"
        assert stats["unique_annotated"] == 150.0, "Unique annotated count incorrect"
        assert stats["percentage_annotated"] == 75.0, "Percentage annotated incorrect"
        assert stats["percentage_unique_annotated"] == 75.0, "Percentage unique annotated incorrect"

        print("✓ Test 7 PASSED: Annotation statistics are accurate")

    def test_combined_all_filters(self, test_data_dir):
        """
        Single integrated test that validates all FASTA + BLAST filter rules.
        Every read is designed to fail exactly one filter, except readOK.
        """
        input_dir = test_data_dir / "input"
        output_dir = test_data_dir / "output"

        fasta = input_dir / "combined_filters.fasta"
        blast = input_dir / "combined_filters.tabular"

        fasta.write_text(
            ">lowSupport(1) count=1;\nACGT\n"
            ">obicleanFail(10) count=10; obiclean_status={'XXX': 's'};\nACGT\n"
            ">pairendFail_PairEnd(10) count=10;\nACGT\n"
            ">identityFail(10) count=10;\nACGT\n"
            ">coverageFail(10) count=10;\nACGT\n"
            ">bitscoreFail(10) count=10;\nACGT\n"
            ">bscutoffHigh(10) count=10;\nACGT\n"
            ">envTaxFail(10) count=10;\nACGT\n"
            ">rankFail(10) count=10;\nACGT\n"
            ">seqidFail(10) count=10;\nACGT\n"
            ">readOK(10) count=10;\nACGT\n"
            ">readOK_multiple_id(10) count=10;\nACGT\n"
        )

        blast.write_text(
            # min_support (count=1 < 5)
            "lowSupport(1)\ts\tid1\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"

            # ignore_obiclean_type = singleton
            "obicleanFail(10)\ts\tid2\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"

            # ignore_illuminapairedend_type = pairend
            "pairendFail_PairEnd(10)\ts\tid3\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"

            # min_identity = 90 → identity = 50 fails
            "identityFail(10)\ts\tid4\t123\t50\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"

            # min_coverage = 50 → coverage = 20 fails
            "coverageFail(10)\ts\tid5\t123\t99\t20\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"

            # min_bitscore = 60 → bitscore = 10 fails
            "bitscoreFail(10)\ts\tid6\t123\t99\t99\t1e-50\t10\tsrc\tA / B / C / D / E / F / G\n"

            # bitscore_perc_cutoff: best = 200 → cutoff = 180 → bitscore 150 fails
            "bscutoffHigh(10)\ts\tid7.1\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G.1\n"
            "bscutoffHigh(10)\ts\tid7.2\t123\t99\t99\t1e-50\t150\tsrc\tA / B / C / D / E / F / G.2\n"

            # ignore_taxonomy = 'environmental'
            "envTaxFail(10)\ts\tid8\t123\t99\t99\t1e-50\t200\tsrc\tEnvironmental / B / C / D / E / F / G\n"

            # ignore_rank = 'unknown'
            "rankFail(10)\ts\tid9\t123\t99\t99\t1e-50\t200\tsrc\tUnknown / B / C / D / E / F / G\n"

            # ignore_seqids = BADSEQ
            "seqidFail(10)\ts\tBADSEQ\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"

            # readOK (valid, full taxonomy)
            "readOK(10)\ts\tidGood\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"
        
            # readOK_multiple_id (valid, full taxonomy, multiple id's)
            "readOK_multiple_id(10)\ts\tidGood.1\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"
            "readOK_multiple_id(10)\ts\tidGood.2\t123\t99\t99\t1e-50\t200\tsrc\tA / B / C / D / E / F / G\n"
        )

        class Args:
            def __init__(self):
                self.input_anno = str(blast)
                self.input_unanno = str(fasta)
                self.header_anno = str(output_dir / "combined.xlsx")
                self.filtered_fasta = str(output_dir / "combined.fasta")
                self.anno_stats = str(output_dir / "combined_stats.txt")
                self.eval_plot = None
                self.taxa_output = None
                self.circle_data = None
                self.log = str(output_dir / "log.txt")
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10
                self.use_counts = True
                self.min_bitscore = 60
                self.min_support = 5
                self.ignore_rank = 'unknown'
                self.ignore_taxonomy = 'environmental'
                self.bitscore_perc_cutoff = 10
                self.ignore_obiclean_type = 'singleton'
                self.ignore_illuminapairend_type = 'pairend'
                self.min_identity = 90
                self.min_coverage = 50
                self.ignore_seqids = 'BADSEQ'

        args = Args()
        process_single_file(args.input_anno, args.input_unanno, args, log_messages=[])

        with open(args.filtered_fasta) as f:
            headers = [l.strip() for l in f if l.startswith(">")]
        assert '>obicleanFail(10) count=10;' not in headers
        assert '>pairendFail_PairEnd(10) count=10;' not in headers
        assert len(headers) == 9, "FASTA filtering only applies to header-based rules"

        df = pd.read_excel(args.header_anno, sheet_name="Individual_Reads")
        seq_ids = {
            sid
            for val in df["seq_id"]
            for sid in (ast.literal_eval(val) if isinstance(val, str) else val)
        }

        expected = {'idGood.1', 'idGood', 'id7.1', 'idGood.2'}

        assert seq_ids == expected, f"Expected surviving seq_ids {expected}, got {seq_ids}"


    def test_log_filters_count(self, processed_output):
        """
        Verify that the BLAST filter counters in the log file match expected structure.
        """
        with open(processed_output.anno_stats) as f:
            log = f.read()

        assert "=== PARAMETERS USED ===" in log
        assert "input_anno:" in log
        assert "input_unanno:" in log

        assert "FASTA: total headers: 4" in log
        assert "FASTA: headers kept after filters" in log

        assert "BLAST: total hits read: 8" in log
        assert "BLAST: hits kept after quality filters: 7" in log

        assert "ANNOTATION: total FASTA headers considered: 4" in log
        assert "ANNOTATION: reads with BLAST hits: 3" in log
        assert "ANNOTATION: reads without BLAST hits: 1" in log

        assert "E-value plot written to:" in log
        assert "Taxa summary written to:" in log
        assert "Header annotations written to:" in log
        assert "Circle diagram JSON written to:" in log

        assert "=== ANNOTATION STATISTICS ===" in log
        assert "percentage_annotated: 75.0" in log
        assert "unique_annotated: 150" in log
        assert "total_unique: 200" in log

    def test_missing_blast_file_graceful(self, test_data_dir):
        """
        Crash / robustness test.

        When the BLAST file does NOT exist, the processor should:
        - not crash
        - write an anno_stats log mentioning the error
        - return without creating header_anno
        """
        input_dir = test_data_dir / "input"
        output_dir = test_data_dir / "output"

        fasta = input_dir / "missing_blast_test.fasta"
        fasta.write_text(">read1(10) count=10;\nACGT\n")

        missing_blast = input_dir / "this_file_does_not_exist.tabular"

        class Args:
            def __init__(self):
                self.input_anno = str(missing_blast)
                self.input_unanno = str(fasta)
                self.header_anno = str(output_dir / "missing_blast_header.xlsx")
                self.filtered_fasta = str(output_dir / "missing_blast_filtered.fasta")
                self.anno_stats = str(output_dir / "missing_blast_stats.txt")
                self.eval_plot = None
                self.taxa_output = None
                self.circle_data = None
                self.log = str(output_dir / "log.txt")
                self.uncertain_threshold = 0.9
                self.eval_threshold = 1e-10
                self.use_counts = True
                self.min_bitscore = 0
                self.min_support = 1
                self.ignore_rank = 'unknown'
                self.ignore_taxonomy = 'environmental'
                self.bitscore_perc_cutoff = 10
                self.ignore_obiclean_type = 'singleton'
                self.ignore_illuminapairend_type = 'pairend'
                self.min_identity = 0
                self.min_coverage = 0
                self.ignore_seqids = ''

        args = Args()

        process_single_file(args.input_anno, args.input_unanno, args, log_messages=[])

        assert not os.path.exists(args.header_anno), "Header file should not be created when BLAST is missing"

        assert os.path.exists(args.anno_stats), "anno_stats log should be created on error"

        from pathlib import Path
        log_text = Path(args.anno_stats).read_text()
        assert "Error: Input file" in log_text, "Missing BLAST file error not logged"
        assert "Starting processing for FASTA" in log_text, "FASTA processing log missing"


if __name__ == "__main__":
    pytest.main([__file__])