diff tests/test_cdhit_analysis.py @ 4:e64af72e1b8f draft default tip

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
author onnodg
date Mon, 15 Dec 2025 16:44:40 +0000
parents ff68835adb2b
children
--- a/tests/test_cdhit_analysis.py	Fri Oct 24 09:38:24 2025 +0000
+++ b/tests/test_cdhit_analysis.py	Mon Dec 15 16:44:40 2025 +0000
@@ -1,626 +1,303 @@
 """
 Test suite for CD-HIT cluster analysis processor.
 """
-
 import pytest
 from pathlib import Path
 import pandas as pd
 import os
 import sys
 
-# Add module path
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
     parse_cluster_file,
     process_cluster_data,
     calculate_cluster_taxa,
     write_similarity_output,
-    write_evalue_output,
     write_count_output,
-    write_taxa_clusters_output,
-    write_taxa_processed_output,
+    write_taxa_excel,
 )
 
+
 class TestCDHitAnalysis:
-    """Test class for CD-HIT cluster analysis processor using real XLSX test data."""
 
     @pytest.fixture(scope="class")
     def test_data_dir(self):
-        """Return path to the test-data directory with real XLSX files."""
-        base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
-        assert base_dir.exists(), f"Test data directory does not exist: {base_dir}"
-        return base_dir
+        base = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
+        assert base.exists()
+        return base
 
     @pytest.fixture(scope="class")
     def sample_cluster_file(self, test_data_dir):
-        """Return path to the sample cluster XLSX file."""
-        cluster_file = test_data_dir / "29-test.clstr.txt"
-        assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}"
-        return str(cluster_file)
+        f = test_data_dir / "prev_anno.txt"
+        assert f.exists()
+        return str(f)
 
     @pytest.fixture(scope="class")
     def sample_annotation_file(self, test_data_dir):
-        """Return path to the sample annotation XLSX file."""
-        annotation_file = test_data_dir / "header_anno_29_test.xlsx"
-        assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}"
-        return str(annotation_file)
+        f = test_data_dir / "prev4.xlsx"
+        assert f.exists()
+        return str(f)
 
     @pytest.fixture(scope="class")
     def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
-        """Parse the sample cluster file with annotations."""
         return parse_cluster_file(sample_cluster_file, sample_annotation_file)
 
+
     def test_cluster_parsing_structure(self, parsed_clusters):
-        """
-        Test 1: Cluster File Parsing Structure
-
-        Verifies that cluster files are correctly parsed into the expected data structure
-        with proper extraction of headers, counts, similarities, and cluster groupings.
-        """
-        # Should have 24 clusters based on sample data
-        # for x in parsed_clusters: print(x);
-        assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}"
-
-        # Test Cluster 0 structure (41 members)
+        assert len(parsed_clusters) == 514
         cluster_0 = parsed_clusters[0]
-        assert len(cluster_0) == 41, "Cluster 0 should have 41 members"
-        cluster_3 = parsed_clusters[3]
-        assert len(cluster_3) == 4, "Cluster 3 should have 4 members"
-
-        # Check specific member data
-        assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0"
-        read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS']
-        assert read1_data['count'] == 1, "read1 count should be 1"
-        assert read1_data['similarity'] == 97.78, "read1 similarity should be 97.78%"
-        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa"
+        assert len(cluster_0) == 430
 
-        # Check non-representative member
-        assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0"
-        read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS']
-        assert read2_data['count'] == 1, "read2 count should be 1"
-        assert read2_data['similarity'] == 100, "read2 similarity should be 100%"
-        assert read2_data['taxa'] == "Unannotated read"
+        read = cluster_0["M01687:460:000000000-LGY9G:1:1101:8356:6156_CONS"]
+        assert read["count"] == 19
+        assert isinstance(read["similarity"], float)
 
-        # Test single-member cluster (Cluster 2)
-        cluster_2 = parsed_clusters[2]
-        assert len(cluster_2) == 1, "Cluster 2 should have 1 member"
-        assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2"
-
-        print("✓ Test 1 PASSED: Cluster file parsing structure correct")
-
-    def test_annotation_integration(self, parsed_clusters):
-        """
-        Test 2: Annotation Integration
-
-        Verifies that annotations from the separate annotation file are correctly
-        matched to cluster members based on header names.
-        """
-        # Check that annotations were properly integrated
+    def test_annotation_integration_basic(self, parsed_clusters):
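+        """Cluster 0 of the fixture data should contain at least one annotated read."""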
         cluster_0 = parsed_clusters[0]
 
-        # Verify e-values are correctly assigned
-        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect"
-        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect"
-        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect"
-
-        # Verify taxa assignments
-        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect"
-        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect"
-        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect"
+        annotated_found = any(
+            data["taxa"] != "Unannotated read" for data in cluster_0.values()
+        )
+        assert annotated_found, "At least one annotated read expected"
 
-        # Test missing annotation handling (if any reads lack annotations)
-        # All our test reads have annotations, so this tests the default case
-        for cluster in parsed_clusters:
-            for header, data in cluster.items():
-                if data['evalue'] == 'Unannotated read':
-                    assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect"
 
-        print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data")
-
-    def test_cluster_data_processing(self, parsed_clusters):
-        """
-        Test 3: Cluster Data Processing
-
-        Tests the processing of individual clusters to extract evaluation lists,
-        similarity lists, and taxa dictionaries with correct count aggregation.
-        """
-        # Test processing of Cluster 0 (mixed taxa)
-        cluster_0 = parsed_clusters[0]
-        eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0)
+    def test_process_cluster_data_counts_and_taxa_map(self, parsed_clusters):
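+        """process_cluster_data should return similarities plus a taxa map whose counts match the raw reads."""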
+        sim, taxa_map, annotated, unannotated = process_cluster_data(parsed_clusters[0])
 
-        # Check eval_list structure
-        # for x in eval_list: print(x)
-        assert eval_list[0] == 2, "Two unannotated reads in this cluster, should be 2"
-        assert len(eval_list) == 409, "Should have 408 annotated e-values plus one leading entry for the 2 unannotated reads"
+        assert isinstance(sim, list)
+        assert annotated + unannotated == sum(d["count"] for d in parsed_clusters[0].values())
+        assert isinstance(taxa_map, dict)
+        assert annotated == 47004 and unannotated == 9
 
-        # Check that e-values are correctly converted and repeated by count
-        eval_values = eval_list[1:]  # Skip unannotated count
-        read1_evals = [e for e in eval_values if e == 1.41e-39]
-        assert len(read1_evals) == 365, "Should have 365 instances of read1's e-value"
-
-        # # Check similarity list
-        # for x in simi_list: print(x)
-        assert len(simi_list) == 410, "Should have 410 similarity values"
-        read1_similarities = [s for s in simi_list if s == 100.0]
-        assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity"
 
-        assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2"
-        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406"
-        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1"
-        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1"
-        print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data")
+    def test_weighted_lca_splitting_on_uncertain_taxa(self):
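+        """A 60/60 tie with 'Uncertain taxa' should only split when the uncertain weight is high."""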
+        taxa_dict = {
+            "K / P / C / O / F / G1 / S1": 60,
+            "K / P / C / O / F / Uncertain taxa / Uncertain taxa": 60,
+        }
 
-    def test_taxa_calculation_simple_case(self, parsed_clusters):
-        """
-        Test 4: Taxa Calculation - Simple Case
-
-        Tests taxonomic resolution for clusters with clear dominant taxa
-        (single taxa or overwhelming majority).
-        """
-
-        # Create test arguments
-        class TestArgs:
+        class ArgsLow:
             uncertain_taxa_use_ratio = 0.5
             min_to_split = 0.45
             min_count_to_split = 10
 
-        args = TestArgs()
-
-        # Test Cluster 1 (should be clear Archaea)
-        cluster_5 = parsed_clusters[5]
-        _, _, taxa_dict_5 = process_cluster_data(cluster_5)
-
-        result_5 = calculate_cluster_taxa(taxa_dict_5, args)
-        # Should return single taxa group for Archaea
-        assert len(result_5) == 1, "Single dominant taxa should not split"
-        dominant_taxa = list(result_5[0].keys())[0]
-        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \
-        'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant"
+        class ArgsHigh:
+            uncertain_taxa_use_ratio = 1.0
+            min_to_split = 0.45
+            min_count_to_split = 10
 
-        # Test single-member cluster (Cluster 2)
-        cluster_2 = parsed_clusters[2]
-        _, _, taxa_dict_2 = process_cluster_data(cluster_2)
-
-        result_2 = calculate_cluster_taxa(taxa_dict_2, args)
-        total = sum(value for d in result_2 for value in d.values())
-        assert total == 1, "Single member cluster should not split"
-
-        print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly")
-
-    def test_taxa_calculation_complex_splitting(self, parsed_clusters):
-        """
-        Test 5: Taxa Calculation - Complex Splitting
-
-        Tests the recursive taxonomic resolution algorithm for clusters with
-        multiple competing taxa that should be split based on thresholds.
-        """
+        # LOW weight → uncertain counts half → G1 wins → no split
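+        # (with ratio 0.5 the 60 uncertain reads weigh 60 * 0.5 = 30 against 60 for G1)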
+        res_low = calculate_cluster_taxa(taxa_dict, ArgsLow())
+        assert len(res_low) == 1
+        assert sum(res_low[0].values()) == 60  # dominant G1 / S1 group keeps its 60 reads
 
-        class TestArgs:
-            uncertain_taxa_use_ratio = 0.5
-            min_to_split = 0.30  # Lower threshold to encourage splitting
-            min_count_to_split = 5  # Lower threshold to encourage splitting
-
-        args = TestArgs()
-
-        # Test Cluster 3 (mixed Firmicutes and Proteobacteria)
-        cluster_3 = parsed_clusters[3]
-        _, _, taxa_dict_3 = process_cluster_data(cluster_3)
-
-        # Manual check of expected taxa distribution
-        expected_taxa = {}
-        for header, data in cluster_3.items():
-            taxa = data['taxa']
-            count = data['count']
-            expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count
+        # HIGH weight → uncertain = full weight → equal → split
+        res_high = calculate_cluster_taxa(taxa_dict, ArgsHigh())
+        assert len(res_high) == 2
+        total = sum(sum(g.values()) for g in res_high)
+        assert total == 120
 
-        result_3 = calculate_cluster_taxa(taxa_dict_3, args)
-
-        # With mixed taxa and low thresholds, should potentially split
-        # The exact behavior depends on the algorithm implementation
-        total_result_count = sum(sum(group.values()) for group in result_3)
-        expected_total = sum(expected_taxa.values())
-
-        assert total_result_count == expected_total, "Total counts should be preserved after splitting"
 
-        print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds")
+    def test_calculate_cluster_taxa_preserves_counts_real_cluster(self, parsed_clusters):
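+        """Taxa resolution on a real cluster should never yield more reads than went in."""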
+        sim, taxa_map, annotated, unannotated = process_cluster_data(parsed_clusters[3])
 
-    def test_statistical_calculations(self, parsed_clusters):
-        """
-        Test 6: Statistical Calculations
-
-        Verifies that similarity and e-value statistics are calculated correctly
-        including averages, standard deviations, and distributions.
-        """
-        # Process all clusters to get combined data
 
-        eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5])
-        # Test similarity statistics
-        if eval_list:
-            expected_avg = sum(simi_list) / len(simi_list)
-
-            # Manual verification of a few key values
-            # From our test data: 100% occurs 166 times, 98.88% occurs 9 times, 98.86% once
-            total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86
-            total_count = 176
-            manual_avg = total_similarity_sum / total_count
-
-            assert abs(
-                expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}"
+        raw_total = annotated + unannotated
+        taxa_map_total = sum(info["count"] for info in taxa_map.values())
+        assert raw_total == taxa_map_total
 
-        # Test e-value data structure
-        annotated_evals = eval_list[1:]
-        assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric"
-        assert all(e > 0 for e in annotated_evals), "All e-values should be positive"
-
-        print("✓ Test 6 PASSED: Statistical calculations are mathematically correct")
+        class Args:
+            uncertain_taxa_use_ratio = 0.5
+            min_to_split = 0.3
+            min_count_to_split = 5
 
-    def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file):
-        """
-        Test 7: Output File Formats
 
-        Tests that all output files are created with correct structure and content,
-        including text files, Excel files with multiple sheets, and plot files.
-        """
-        output_dir = test_data_dir
-
-        # Parse data
-        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
+        results = calculate_cluster_taxa({t: i["count"] for t, i in taxa_map.items()}, Args())
 
-        # Process all clusters
-        cluster_data_list = []
-        all_eval_data = [0]
-        all_simi_data = []
 
-        for cluster in clusters:
-            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
-            cluster_data_list.append((eval_list, simi_list, taxa_dict))
-            all_eval_data[0] += eval_list[0]
-            all_eval_data.extend(eval_list[1:])
-            all_simi_data.extend(simi_list)
+        resolved_total = sum(sum(group.values()) for group in results)
+        assert resolved_total <= raw_total
+        assert resolved_total > 0
 
-        # Test similarity output
-        simi_output = output_dir / "test_similarity.txt"
-        write_similarity_output(all_simi_data, str(simi_output))
 
-        assert simi_output.exists(), "Similarity output file not created"
-        with open(simi_output, 'r') as f:
-            content = f.read()
-            assert "# Average similarity:" in content, "Missing average similarity in output"
-            assert "# Standard deviation:" in content, "Missing standard deviation in output"
-            assert "similarity\tcount" in content, "Missing header in similarity output"
+    def test_write_similarity_and_count_outputs(self, tmp_path, parsed_clusters):
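+        """write_similarity_output and write_count_output should create their text files."""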
+        out_simi = tmp_path / "simi.txt"
+        out_count = tmp_path / "count.txt"
 
-        # Test e-value output
-        eval_output = output_dir / "test_evalue.txt"
-        write_evalue_output(all_eval_data, str(eval_output))
-
-        assert eval_output.exists(), "E-value output file not created"
-        with open(eval_output, 'r') as f:
-            content = f.read()
-            assert "evalue\tcount" in content, "Missing header in e-value output"
-
-        # Test count output
-        count_output = output_dir / "test_count.txt"
-        write_count_output(all_eval_data, cluster_data_list, str(count_output))
+        cluster_data_list = []
+        all_simi = []
 
-        assert count_output.exists(), "Count output file not created"
-        with open(count_output, 'r') as f:
-            content = f.read()
-            assert "cluster\tunannotated\tannotated" in content, "Missing header in count output"
-            assert "TOTAL\t" in content, "Missing total row in count output"
-
-        # Test taxa clusters Excel output
-        taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx"
-        write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output))
-
-        assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created"
-        df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters')
-        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
-                            'species']
-        assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output"
-
-        print("✓ Test 7 PASSED: All output file formats are correct and complete")
-
-    def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file):
-        """
-        Test 8: Processed Taxa Output Structure
-
-        Tests the complex processed taxa Excel output with multiple sheets
-        and parameter tracking.
-        """
-        output_dir = test_data_dir
+        for c in parsed_clusters:
+            sim, taxa_map, annotated, unannotated = process_cluster_data(c)
+            cluster_data_list.append(
+                {
+                    "similarities": sim,
+                    "taxa_map": taxa_map,
+                    "annotated": annotated,
+                    "unannotated": unannotated,
+                }
+            )
+            all_simi.extend(sim)
 
-        class TestArgs:
-            uncertain_taxa_use_ratio = 0.6
-            min_to_split = 0.35
-            min_count_to_split = 15
-            show_unannotated_clusters = True
-
-        args = TestArgs()
-
-        # Parse and process data
-        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
-        cluster_data_list = []
-
-        for cluster in clusters:
-            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
-            cluster_data_list.append((eval_list, simi_list, taxa_dict))
+        write_similarity_output(cluster_data_list, str(out_simi))
+        assert out_simi.exists()
 
-        # Test processed taxa output
-        processed_output = output_dir / "test_processed_taxa.xlsx"
-        write_taxa_processed_output(cluster_data_list, args, str(processed_output))
-
-        assert processed_output.exists(), "Processed taxa Excel file not created"
-
-        # Check multiple sheets exist
-        xl_file = pd.ExcelFile(processed_output)
-        expected_sheets = ['Processed_Taxa_Clusters', 'Settings']
-        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output"
+        write_count_output(cluster_data_list, str(out_count))
+        assert out_count.exists()
 
-        # Check main data sheet
-        df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters')
-        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
-                            'species']
-        assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet"
 
-        # Check settings sheet
-        df_settings = pd.read_excel(processed_output, sheet_name='Settings')
-        assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings"
-        assert 'Value' in df_settings.columns, "Missing Value column in settings"
+    def test_write_taxa_excel_raw_and_processed(self, tmp_path, parsed_clusters):
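+        """Writing raw and processed data should produce the expected Excel sheets."""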
 
-        # Verify settings values are recorded
-        settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value']))
-        assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded"
-        assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded"
-
-        print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking")
-
-    def test_edge_cases(self, test_data_dir):
-        """
-        Test 9: Edge Cases and Error Handling
-
-        Tests handling of edge cases like empty files, missing annotations,
-        single-member clusters, and malformed input data.
-        """
-        input_dir = test_data_dir
-
-        # Test empty cluster file
-        empty_cluster = input_dir / "empty_cluster.clstr"
-        with open(empty_cluster, 'w') as f:
-            f.write("")
-
-        clusters_empty = parse_cluster_file(str(empty_cluster))
-        assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters"
-
-        # Test cluster file with no annotations
-        simple_cluster = input_dir / "simple_cluster.clstr"
-        simple_cluster_content = """>Cluster 0
-0	100nt, >read_no_anno:50... *
-"""
-        with open(simple_cluster, 'w') as f:
-            f.write(simple_cluster_content)
-
-        with pytest.raises(UnboundLocalError):
-            parse_cluster_file(str(simple_cluster),  raise_on_error=True)
+        class Args:
+            uncertain_taxa_use_ratio = 0.5
+            min_to_split = 0.45
+            min_count_to_split = 10
+            min_cluster_support = 1
+            make_taxa_in_cluster_split = False
 
-        # Test malformed cluster entries (missing parts)
-        malformed_cluster = input_dir / "malformed_cluster.clstr"
-        malformed_content = """>Cluster 0
-0	100nt, >read1:50..._CONS(50) *
-invalid_line_without_proper_format
-1	90nt, >read2:25..._CONS(25) at /+/95%
-"""
-        annotations_malformed = input_dir / "test_pytest.xlsx"
-        with open(malformed_cluster, 'w') as f:
-            f.write(malformed_content)
-
-        clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed))
-        # Should still parse valid entries and skip invalid ones
-        assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file"
-        assert len(clusters_malformed[0]) == 2, "Should have 2 valid read"
-        assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50
-        assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25
+        cluster_data_list = []
+        for c in parsed_clusters:
+            sim, taxa_map, annotated, unannotated = process_cluster_data(c)
+            cluster_data_list.append(
+                {
+                    "similarities": sim,
+                    "taxa_map": taxa_map,
+                    "annotated": annotated,
+                    "unannotated": unannotated,
+                }
+            )
 
-        print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes")
-
-    def test_count_preservation_across_processing(self, parsed_clusters):
-        """
-        Test 10: Count Preservation Across Processing Pipeline
+        out = tmp_path / "taxa.xlsx"
+        write_taxa_excel(
+            cluster_data_list, Args(), str(out), write_raw=True, write_processed=True
+        )
 
-        Verifies that read counts are preserved throughout the entire processing
-        pipeline from cluster parsing through taxa calculation to final output.
-        """
-        # Calculate expected total counts from original data
-        expected_total = 0
-        for cluster in parsed_clusters:
-            for header, data in cluster.items():
-                expected_total += data['count']
+        xl = pd.ExcelFile(out)
+        assert "Raw_Taxa_Clusters" in xl.sheet_names
+        assert "Processed_Taxa_Clusters" in xl.sheet_names
+        assert "Settings" in xl.sheet_names
 
-        # Process through pipeline and verify counts at each stage
-        total_from_processing = 0
-        taxa_processing_totals = []
+    def test_write_taxa_excel_only_raw_or_only_processed(self, tmp_path, parsed_clusters):
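+        """The write_raw / write_processed flags should control which sheets appear in the workbook."""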
 
-        class TestArgs:
+        class Args:
             uncertain_taxa_use_ratio = 0.5
             min_to_split = 0.45
             min_count_to_split = 10
-
-        args = TestArgs()
-
-        for cluster in parsed_clusters:
-            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
+            min_cluster_support = 1
+            make_taxa_in_cluster_split = False
 
-            # Check that cluster processing preserves counts
-            cluster_total = eval_list[0] + len(eval_list[1:])  # unannotated + annotated
-            cluster_expected = sum(data['count'] for data in cluster.values())
-            assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}"
-
-            total_from_processing += cluster_total
-
-            # Check taxa calculation preserves counts
-            taxa_results = calculate_cluster_taxa(taxa_dict, args)
-            taxa_total = sum(sum(group.values()) for group in taxa_results)
-            taxa_processing_totals.append(taxa_total)
+        cluster_data_list = []
+        for c in parsed_clusters:
+            sim, taxa_map, annotated, unannotated = process_cluster_data(c)
+            cluster_data_list.append(
+                {
+                    "similarities": sim,
+                    "taxa_map": taxa_map,
+                    "annotated": annotated,
+                    "unannotated": unannotated,
+                }
+            )
 
-            # Verify taxa dict total matches
-            taxa_dict_total = sum(taxa_dict.values())
-            assert taxa_total <= taxa_dict_total, f"Count mismatch in taxa calculation: expected {taxa_dict_total}, got {taxa_total}"
 
-        # Final verification
-        assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}"
+        out_raw = tmp_path / "raw.xlsx"
+        write_taxa_excel(cluster_data_list, Args(), str(out_raw), write_raw=True, write_processed=False)
+        xl_raw = pd.ExcelFile(out_raw)
+        assert "Raw_Taxa_Clusters" in xl_raw.sheet_names
+        assert "Processed_Taxa_Clusters" not in xl_raw.sheet_names
+
 
-        # Verify sum of all taxa processing equals original
-        total_taxa_processed = sum(taxa_processing_totals)
-        assert total_taxa_processed <= expected_total, f"Taxa processing total mismatch: expected {expected_total}, got {total_taxa_processed}"
-
-        print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline")
+        out_proc = tmp_path / "proc.xlsx"
+        write_taxa_excel(cluster_data_list, Args(), str(out_proc), write_raw=False, write_processed=True)
+        xl_proc = pd.ExcelFile(out_proc)
+        assert "Processed_Taxa_Clusters" in xl_proc.sheet_names
 
-    def test_11_parse_arguments_all_flags(self, tmp_path):
-        """
-        Test 11: Argument Parsing with All Flags
 
-        Ensures parse_arguments correctly handles all optional flags and values.
-        """
+    def test_parse_arguments_all_flags(self, tmp_path):
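+        """parse_arguments should pick up plot limits and splitting thresholds from the CLI."""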
         from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
         args = ca.parse_arguments([
-            '--input_cluster', str(tmp_path / "dummy.clstr"),
-            '--simi_plot_y_min', '90',
-            '--simi_plot_y_max', '99',
-            '--uncertain_taxa_use_ratio', '0.3',
-            '--min_to_split', '0.2',
-            '--min_count_to_split', '5',
-            '--show_unannotated_clusters',
-            '--make_taxa_in_cluster_split',
-            '--print_empty_files'
+            "--input_cluster", str(tmp_path / "dummy.clstr"),
+            "--simi_plot_y_min", "90",
+            "--simi_plot_y_max", "99",
+            "--uncertain_taxa_use_ratio", "0.3",
+            "--min_to_split", "0.2",
+            "--min_count_to_split", "5",
+            "--output_excel", str(tmp_path / "report.xlsx"),
         ])
         assert args.simi_plot_y_min == 90
-        assert args.print_empty_files is True
+        assert args.simi_plot_y_max == 99
 
-    def test_12_process_cluster_data_valueerror(self):
-        """
-        Test 12: Process Cluster Data with Bad E-value
-
-        Ensures ValueError branches are handled and unannotated counts increase.
-        """
+    def test_main_runs_and_creates_outputs(self, tmp_path):
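+        """An end-to-end main() run on a minimal dataset should create the requested outputs."""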
         from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        cluster = {
-            "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"}
-        }
-        eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster)
-        assert eval_list[0] == 1  # unannotated read
+
+        clstr = tmp_path / "simple.clstr"
+        clstr.write_text(">Cluster 0\n0\t88nt, >read1_CONS(3)... *\n")
 
-    def test_13_write_similarity_and_evalue_empty(self, tmp_path):
-        """
-        Test 13: Output Writers with Empty Data
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        sim_file = tmp_path / "sim.txt"
-        eval_file = tmp_path / "eval.txt"
+        anno = tmp_path / "anno.xlsx"
+        df = pd.DataFrame([
+            {
+                "header": "read1_CONS",
+                "seq_id": "SEQ001",
+                "source": "Genbank",
+                "taxa": "K / P / C / O / F / G / S",
+            }
+        ])
+        with pd.ExcelWriter(anno) as w:
+            df.to_excel(w, sheet_name="Individual_Reads", index=False)
 
-        ca.write_similarity_output([], str(sim_file))
-        assert not sim_file.exists() or sim_file.read_text() == ""
+        sim_file = tmp_path / "sim.txt"
+        excel_file = tmp_path / "taxa.xlsx"
+        args = [
+            "--input_cluster", str(clstr),
+            "--input_annotation", str(anno),
+            "--output_similarity_txt", str(sim_file),
+            "--output_excel", str(excel_file),
+            "--output_taxa_clusters",
+            "--output_taxa_processed",
+            "--log_file", str(tmp_path / "new_logs.txt"),
+            "--simi_plot_y_min", "95",
+            "--simi_plot_y_max", "100",
+            "--uncertain_taxa_use_ratio", "0.5",
+            "--min_to_split", "0.45",
+            "--min_count_to_split", "10",
+            "--min_cluster_support", "1"
+        ]
 
-        ca.write_evalue_output([5], str(eval_file))
-        assert "unannotated" in eval_file.read_text()
+        ca.main(args)
+        assert sim_file.exists()
+        assert excel_file.exists()
 
-    def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path):
-        """
-        Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        count_file = tmp_path / "count.txt"
-        taxa_file = tmp_path / "taxa.xlsx"
+    def test_parse_cluster_file_empty_and_no_annotation(self, tmp_path):
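+        """An empty cluster file should parse to an empty list of clusters."""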
+        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis2 as ca
+
+        empty = tmp_path / "empty.clstr"
+        empty.write_text("")
+
+        clusters = ca.parse_cluster_file(str(empty), annotation_file=None, log_messages=[])
+        assert clusters == []
+
+    def test_create_similarity_plot_creates_file(self, tmp_path, parsed_clusters):
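+        """create_similarity_plot should save a PNG when similarity data is available."""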
+        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis2 as ca
+
 
-        ca.write_count_output([0], [], str(count_file))
-        assert "TOTAL" in count_file.read_text()
+        cluster_data_list = []
+        all_simi = []
+        lengths = []
 
-        cluster_data = [([0], [], {"bad": 1})]
-        ca.write_taxa_clusters_output(cluster_data, str(taxa_file))
-        assert taxa_file.exists()
-
-    def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path):
-        """
-        Test 15: Processed Taxa Output with Settings
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
+        for c in parsed_clusters[:5]:
+            sim, taxa_map, annotated, unannotated = process_cluster_data(c)
+            cluster_data_list.append(
+                {"similarities": sim, "taxa_map": taxa_map,
+                 "annotated": annotated, "unannotated": unannotated}
+            )
+            if sim:
+                all_simi.extend(sim)
+                lengths.append(len(sim))
 
         class Args:
-            uncertain_taxa_use_ratio = 0.5
-            min_to_split = 0.2
-            min_count_to_split = 2
-            show_unannotated_clusters = True
-
-        out_file = tmp_path / "processed.xlsx"
-        cluster_data = [([0], [], {"Unannotated read": 2})]
-        ca.write_taxa_processed_output(cluster_data, Args(), str(out_file))
-        assert out_file.exists()
-
-    def test_16_create_evalue_plot_edge_cases(self, tmp_path):
-        """
-        Test 16: E-value Plot Edge Cases
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        out = tmp_path / "plot.png"
-
-        # Only unannotated
-        ca.create_evalue_plot([0], [0], str(out))
-        assert not out.exists() or out.stat().st_size == 0
-
-        # Empty after filtering
-        ca.create_evalue_plot([0, ], [], str(out))
-        assert not out.exists() or out.stat().st_size == 0
-
-        # With valid values
-        ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out))
-        assert out.exists()
-
-    def test_17_main_runs_and_prints(self, tmp_path, capsys):
-        """
-        Test 17: Main Entry Point
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        clstr = tmp_path / "simple.clstr"
-        clstr.write_text(">Cluster 0\n0   100nt, >seq1... *\n")
+            simi_plot_y_min = 95.0
+            simi_plot_y_max = 100.0
 
-        out = tmp_path / "sim.txt"
-        args = [
-            '--input_cluster', str(clstr),
-            '--output_similarity_txt', str(out)
-        ]
-        ca.main(args)
-        captured = capsys.readouterr()
-        assert "Processing complete" in captured.out
-
-
-    def test_18a_prepare_evalue_histogram_valid_data(self):
-        """
-        Test 18a: prepare_evalue_histogram returns correct counts/bins.
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], [])
-        assert counts.sum() == 3  # 3 entries counted
-        assert len(bins) == 51  # 50 bins => 51 edges
-
-    def test_18b_prepare_evalue_histogram_empty(self):
-        """
-        Test 18b: prepare_evalue_histogram with empty/invalid data returns (None, None).
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], [])
-        assert counts is None
-        assert bins is None
-
-    def test_18c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path):
-        """
-        Test 18c: create_evalue_plot saves a PNG and returns numeric data.
-        """
-        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
-        out = tmp_path / "eval.png"
-        counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out))
-        assert out.exists()
-        assert counts.sum() == 3
-        assert len(bins) == 51
-
-
-if __name__ == "__main__":
-    # Run all tests in this file
-    pytest.main([__file__])
\ No newline at end of file
+        out_png = tmp_path / "sim.png"
+        ca.create_similarity_plot(all_simi, lengths, Args(), str(out_png))
+        if all_simi:
+            assert out_png.exists()
\ No newline at end of file