view tests/test_cdhit_analysis.py @ 3:c6981ea453ae draft default tip

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit ef31054ae26e19eff2f1b1f6c7979e39c47c0d5b-dirty
author onnodg
date Fri, 24 Oct 2025 09:38:24 +0000
parents ff68835adb2b
children
line wrap: on
line source

"""
Test suite for CD-HIT cluster analysis processor.
"""

import pytest
from pathlib import Path
import pandas as pd
import os
import sys

# Add module path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
    parse_cluster_file,
    process_cluster_data,
    calculate_cluster_taxa,
    write_similarity_output,
    write_evalue_output,
    write_count_output,
    write_taxa_clusters_output,
    write_taxa_processed_output,
)

class TestCDHitAnalysis:
    """Test class for CD-HIT cluster analysis processor using real XLSX test data.

    Tests 1-10 exercise the full pipeline against the real cluster/annotation
    files shipped in ``test-data``; tests 11-18 cover individual functions
    with small synthetic inputs.
    """

    @pytest.fixture(scope="class")
    def test_data_dir(self):
        """Return path to the test-data directory with real XLSX files."""
        base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
        assert base_dir.exists(), f"Test data directory does not exist: {base_dir}"
        return base_dir

    @pytest.fixture(scope="class")
    def sample_cluster_file(self, test_data_dir):
        """Return path to the sample CD-HIT cluster file."""
        cluster_file = test_data_dir / "29-test.clstr.txt"
        assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}"
        return str(cluster_file)

    @pytest.fixture(scope="class")
    def sample_annotation_file(self, test_data_dir):
        """Return path to the sample annotation XLSX file."""
        annotation_file = test_data_dir / "header_anno_29_test.xlsx"
        assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}"
        return str(annotation_file)

    @pytest.fixture(scope="class")
    def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
        """Parse the sample cluster file with annotations (shared across tests)."""
        return parse_cluster_file(sample_cluster_file, sample_annotation_file)

    def test_cluster_parsing_structure(self, parsed_clusters):
        """
        Test 1: Cluster File Parsing Structure

        Verifies that cluster files are correctly parsed into the expected data structure
        with proper extraction of headers, counts, similarities, and cluster groupings.
        """
        # The sample data contains 24 clusters.
        assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}"

        # Test Cluster 0 structure (41 members)
        cluster_0 = parsed_clusters[0]
        assert len(cluster_0) == 41, "Cluster 0 should have 41 members"
        cluster_3 = parsed_clusters[3]
        assert len(cluster_3) == 4, "Cluster 3 should have 4 members"

        # Check specific member data
        assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0"
        read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS']
        assert read1_data['count'] == 1, "read1 count should be 1"
        assert read1_data['similarity'] == 97.78, "read1 similarity should be 97.78%"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa"

        # Check non-representative member
        assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0"
        read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS']
        assert read2_data['count'] == 1, "read2 count should be 1"
        assert read2_data['similarity'] == 100, "read2 similarity should be 100%"
        assert read2_data['taxa'] == "Unannotated read"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        assert len(cluster_2) == 1, "Cluster 2 should have 1 member"
        assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2"

        print("✓ Test 1 PASSED: Cluster file parsing structure correct")

    def test_annotation_integration(self, parsed_clusters):
        """
        Test 2: Annotation Integration

        Verifies that annotations from the separate annotation file are correctly
        matched to cluster members based on header names.
        """
        # Check that annotations were properly integrated
        cluster_0 = parsed_clusters[0]

        # Verify e-values are correctly assigned
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect"

        # Verify taxa assignments
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect"

        # Test missing annotation handling: any member whose e-value is the
        # sentinel string must carry the matching sentinel taxa as well.
        for cluster in parsed_clusters:
            for data in cluster.values():
                if data['evalue'] == 'Unannotated read':
                    assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect"

        print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data")

    def test_cluster_data_processing(self, parsed_clusters):
        """
        Test 3: Cluster Data Processing

        Tests the processing of individual clusters to extract evaluation lists,
        similarity lists, and taxa dictionaries with correct count aggregation.
        """
        # Test processing of Cluster 0 (mixed taxa)
        cluster_0 = parsed_clusters[0]
        eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0)

        # Check eval_list structure: element 0 is the aggregated unannotated
        # count, the remaining elements are annotated e-values.
        assert eval_list[0] == 2, "Two unannotated reads in this cluster, should be 2"
        assert len(eval_list) == 409, "Should have 408 annotated e-values plus the leading unannotated count"

        # Check that e-values are correctly converted and repeated by count
        eval_values = eval_list[1:]  # Skip unannotated count
        read1_evals = [e for e in eval_values if e == 1.41e-39]
        assert len(read1_evals) == 365, "Should have 365 instances of read1's e-value"

        # Check similarity list
        assert len(simi_list) == 410, "Should have 410 similarity values"
        read1_similarities = [s for s in simi_list if s == 100.0]
        assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity"

        assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1"
        print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data")

    def test_taxa_calculation_simple_case(self, parsed_clusters):
        """
        Test 4: Taxa Calculation - Simple Case

        Tests taxonomic resolution for clusters with clear dominant taxa
        (single taxa or overwhelming majority).
        """

        # Create test arguments
        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        # Test Cluster 5 (clear dominant Juglandaceae)
        cluster_5 = parsed_clusters[5]
        _, _, taxa_dict_5 = process_cluster_data(cluster_5)

        result_5 = calculate_cluster_taxa(taxa_dict_5, args)
        # Should return a single taxa group (no split)
        assert len(result_5) == 1, "Single dominant taxa should not split"
        dominant_taxa = list(result_5[0].keys())[0]
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \
        'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        _, _, taxa_dict_2 = process_cluster_data(cluster_2)

        result_2 = calculate_cluster_taxa(taxa_dict_2, args)
        total = sum(value for d in result_2 for value in d.values())
        assert total == 1, "Single member cluster should not split"

        print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly")

    def test_taxa_calculation_complex_splitting(self, parsed_clusters):
        """
        Test 5: Taxa Calculation - Complex Splitting

        Tests the recursive taxonomic resolution algorithm for clusters with
        multiple competing taxa that should be split based on thresholds.
        """

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.30  # Lower threshold to encourage splitting
            min_count_to_split = 5  # Lower threshold to encourage splitting

        args = TestArgs()

        # Test Cluster 3 (mixed taxa)
        cluster_3 = parsed_clusters[3]
        _, _, taxa_dict_3 = process_cluster_data(cluster_3)

        # Manual check of expected taxa distribution
        expected_taxa = {}
        for data in cluster_3.values():
            taxa = data['taxa']
            count = data['count']
            expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count

        result_3 = calculate_cluster_taxa(taxa_dict_3, args)

        # With mixed taxa and low thresholds, should potentially split.
        # The exact grouping depends on the algorithm implementation, but
        # the total read count must always be preserved.
        total_result_count = sum(sum(group.values()) for group in result_3)
        expected_total = sum(expected_taxa.values())

        assert total_result_count == expected_total, "Total counts should be preserved after splitting"

        print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds")

    def test_statistical_calculations(self, parsed_clusters):
        """
        Test 6: Statistical Calculations

        Verifies that similarity and e-value statistics are calculated correctly
        including averages, standard deviations, and distributions.
        """
        eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5])
        # Test similarity statistics (guard against an empty similarity list
        # before dividing by its length).
        if simi_list:
            expected_avg = sum(simi_list) / len(simi_list)

            # Manual verification against the known cluster 5 distribution:
            # 166 reads at 100%, 9 reads at 98.88%, 1 read at 98.86%.
            total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86
            total_count = 176
            manual_avg = total_similarity_sum / total_count

            assert abs(
                expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}"

        # Test e-value data structure (skip the leading unannotated count)
        annotated_evals = eval_list[1:]
        assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric"
        assert all(e > 0 for e in annotated_evals), "All e-values should be positive"

        print("✓ Test 6 PASSED: Statistical calculations are mathematically correct")

    def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file):
        """
        Test 7: Output File Formats

        Tests that all output files are created with correct structure and content,
        including text files, Excel files with multiple sheets, and plot files.
        """
        # NOTE(review): outputs are written into the shared test-data directory,
        # which pollutes the repo between runs — consider tmp_path instead.
        output_dir = test_data_dir

        # Parse data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)

        # Process all clusters
        cluster_data_list = []
        all_eval_data = [0]
        all_simi_data = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))
            all_eval_data[0] += eval_list[0]
            all_eval_data.extend(eval_list[1:])
            all_simi_data.extend(simi_list)

        # Test similarity output
        simi_output = output_dir / "test_similarity.txt"
        write_similarity_output(all_simi_data, str(simi_output))

        assert simi_output.exists(), "Similarity output file not created"
        with open(simi_output, 'r') as f:
            content = f.read()
            assert "# Average similarity:" in content, "Missing average similarity in output"
            assert "# Standard deviation:" in content, "Missing standard deviation in output"
            assert "similarity\tcount" in content, "Missing header in similarity output"

        # Test e-value output
        eval_output = output_dir / "test_evalue.txt"
        write_evalue_output(all_eval_data, str(eval_output))

        assert eval_output.exists(), "E-value output file not created"
        with open(eval_output, 'r') as f:
            content = f.read()
            assert "evalue\tcount" in content, "Missing header in e-value output"

        # Test count output
        count_output = output_dir / "test_count.txt"
        write_count_output(all_eval_data, cluster_data_list, str(count_output))

        assert count_output.exists(), "Count output file not created"
        with open(count_output, 'r') as f:
            content = f.read()
            assert "cluster\tunannotated\tannotated" in content, "Missing header in count output"
            assert "TOTAL\t" in content, "Missing total row in count output"

        # Test taxa clusters Excel output
        taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx"
        write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output))

        assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created"
        df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
                            'species']
        assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output"

        print("✓ Test 7 PASSED: All output file formats are correct and complete")

    def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file):
        """
        Test 8: Processed Taxa Output Structure

        Tests the complex processed taxa Excel output with multiple sheets
        and parameter tracking.
        """
        output_dir = test_data_dir

        class TestArgs:
            uncertain_taxa_use_ratio = 0.6
            min_to_split = 0.35
            min_count_to_split = 15
            show_unannotated_clusters = True

        args = TestArgs()

        # Parse and process data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
        cluster_data_list = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))

        # Test processed taxa output
        processed_output = output_dir / "test_processed_taxa.xlsx"
        write_taxa_processed_output(cluster_data_list, args, str(processed_output))

        assert processed_output.exists(), "Processed taxa Excel file not created"

        # Check multiple sheets exist
        xl_file = pd.ExcelFile(processed_output)
        expected_sheets = ['Processed_Taxa_Clusters', 'Settings']
        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output"

        # Check main data sheet
        df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
                            'species']
        assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet"

        # Check settings sheet
        df_settings = pd.read_excel(processed_output, sheet_name='Settings')
        assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings"
        assert 'Value' in df_settings.columns, "Missing Value column in settings"

        # Verify settings values are recorded
        settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value']))
        assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded"
        assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded"

        print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking")

    def test_edge_cases(self, test_data_dir):
        """
        Test 9: Edge Cases and Error Handling

        Tests handling of edge cases like empty files, missing annotations,
        single-member clusters, and malformed input data.
        """
        input_dir = test_data_dir

        # Test empty cluster file
        empty_cluster = input_dir / "empty_cluster.clstr"
        with open(empty_cluster, 'w') as f:
            f.write("")

        clusters_empty = parse_cluster_file(str(empty_cluster))
        assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters"

        # Test cluster file with no annotations
        simple_cluster = input_dir / "simple_cluster.clstr"
        simple_cluster_content = """>Cluster 0
0	100nt, >read_no_anno:50... *
"""
        with open(simple_cluster, 'w') as f:
            f.write(simple_cluster_content)

        with pytest.raises(UnboundLocalError):
            parse_cluster_file(str(simple_cluster),  raise_on_error=True)

        # Test malformed cluster entries (missing parts)
        malformed_cluster = input_dir / "malformed_cluster.clstr"
        malformed_content = """>Cluster 0
0	100nt, >read1:50..._CONS(50) *
invalid_line_without_proper_format
1	90nt, >read2:25..._CONS(25) at /+/95%
"""
        annotations_malformed = input_dir / "test_pytest.xlsx"
        with open(malformed_cluster, 'w') as f:
            f.write(malformed_content)

        clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed))
        # Should still parse valid entries and skip invalid ones
        assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file"
        assert len(clusters_malformed[0]) == 2, "Should have 2 valid reads"
        assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50
        assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25

        print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes")

    def test_count_preservation_across_processing(self, parsed_clusters):
        """
        Test 10: Count Preservation Across Processing Pipeline

        Verifies that read counts are preserved throughout the entire processing
        pipeline from cluster parsing through taxa calculation to final output.
        """
        # Calculate expected total counts from original data
        expected_total = 0
        for cluster in parsed_clusters:
            for data in cluster.values():
                expected_total += data['count']

        # Process through pipeline and verify counts at each stage
        total_from_processing = 0
        taxa_processing_totals = []

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        for cluster in parsed_clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)

            # Check that cluster processing preserves counts
            cluster_total = eval_list[0] + len(eval_list[1:])  # unannotated + annotated
            cluster_expected = sum(data['count'] for data in cluster.values())
            assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}"

            total_from_processing += cluster_total

            # Check taxa calculation preserves counts
            taxa_results = calculate_cluster_taxa(taxa_dict, args)
            taxa_total = sum(sum(group.values()) for group in taxa_results)
            taxa_processing_totals.append(taxa_total)

            # Verify taxa dict total matches (taxa calculation may drop
            # reads, so the result may be at most the dict total)
            taxa_dict_total = sum(taxa_dict.values())
            assert taxa_total <= taxa_dict_total, f"Count mismatch in taxa calculation: expected {taxa_dict_total}, got {taxa_total}"

        # Final verification
        assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}"

        # Verify sum of all taxa processing does not exceed the original
        total_taxa_processed = sum(taxa_processing_totals)
        assert total_taxa_processed <= expected_total, f"Taxa processing total mismatch: expected {expected_total}, got {total_taxa_processed}"

        print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline")

    def test_11_parse_arguments_all_flags(self, tmp_path):
        """
        Test 11: Argument Parsing with All Flags

        Ensures parse_arguments correctly handles all optional flags and values.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        args = ca.parse_arguments([
            '--input_cluster', str(tmp_path / "dummy.clstr"),
            '--simi_plot_y_min', '90',
            '--simi_plot_y_max', '99',
            '--uncertain_taxa_use_ratio', '0.3',
            '--min_to_split', '0.2',
            '--min_count_to_split', '5',
            '--show_unannotated_clusters',
            '--make_taxa_in_cluster_split',
            '--print_empty_files'
        ])
        assert args.simi_plot_y_min == 90
        assert args.print_empty_files is True

    def test_12_process_cluster_data_valueerror(self):
        """
        Test 12: Process Cluster Data with Bad E-value

        Ensures ValueError branches are handled and unannotated counts increase.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        cluster = {
            "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"}
        }
        eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster)
        assert eval_list[0] == 1  # unannotated read

    def test_13_write_similarity_and_evalue_empty(self, tmp_path):
        """
        Test 13: Output Writers with Empty Data
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        sim_file = tmp_path / "sim.txt"
        eval_file = tmp_path / "eval.txt"

        ca.write_similarity_output([], str(sim_file))
        assert not sim_file.exists() or sim_file.read_text() == ""

        ca.write_evalue_output([5], str(eval_file))
        assert "unannotated" in eval_file.read_text()

    def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path):
        """
        Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        count_file = tmp_path / "count.txt"
        taxa_file = tmp_path / "taxa.xlsx"

        ca.write_count_output([0], [], str(count_file))
        assert "TOTAL" in count_file.read_text()

        # A taxa string without the full 7-rank hierarchy must still be written
        cluster_data = [([0], [], {"bad": 1})]
        ca.write_taxa_clusters_output(cluster_data, str(taxa_file))
        assert taxa_file.exists()

    def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path):
        """
        Test 15: Processed Taxa Output with Settings
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca

        class Args:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.2
            min_count_to_split = 2
            show_unannotated_clusters = True

        out_file = tmp_path / "processed.xlsx"
        cluster_data = [([0], [], {"Unannotated read": 2})]
        ca.write_taxa_processed_output(cluster_data, Args(), str(out_file))
        assert out_file.exists()

    def test_16_create_evalue_plot_edge_cases(self, tmp_path):
        """
        Test 16: E-value Plot Edge Cases
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "plot.png"

        # Only unannotated
        ca.create_evalue_plot([0], [0], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # Empty after filtering
        ca.create_evalue_plot([0, ], [], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # With valid values
        ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out))
        assert out.exists()

    def test_17_main_runs_and_prints(self, tmp_path, capsys):
        """
        Test 17: Main Entry Point
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        clstr = tmp_path / "simple.clstr"
        clstr.write_text(">Cluster 0\n0   100nt, >seq1... *\n")

        out = tmp_path / "sim.txt"
        args = [
            '--input_cluster', str(clstr),
            '--output_similarity_txt', str(out)
        ]
        ca.main(args)
        captured = capsys.readouterr()
        assert "Processing complete" in captured.out

    def test_18a_prepare_evalue_histogram_valid_data(self):
        """
        Test 18a: prepare_evalue_histogram returns correct counts/bins.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], [])
        assert counts.sum() == 3  # 3 entries counted
        assert len(bins) == 51  # 50 bins => 51 edges

    def test_18b_prepare_evalue_histogram_empty(self):
        """
        Test 18b: prepare_evalue_histogram with empty/invalid data returns (None, None).
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], [])
        assert counts is None
        assert bins is None

    def test_18c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path):
        """
        Test 18c: create_evalue_plot saves a PNG and returns numeric data.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "eval.png"
        counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out))
        assert out.exists()
        assert counts.sum() == 3
        assert len(bins) == 51


if __name__ == "__main__":
    # Run all tests in this file and propagate pytest's exit status so a
    # caller (e.g. CI) can detect failures; previously the status was dropped.
    sys.exit(pytest.main([__file__]))