diff tests/test_cdhit_analysis.py @ 4:e64af72e1b8f draft default tip
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
| | |
|---|---|
| author | onnodg |
| date | Mon, 15 Dec 2025 16:44:40 +0000 |
| parents | ff68835adb2b |
| children | |
--- a/tests/test_cdhit_analysis.py	Fri Oct 24 09:38:24 2025 +0000
+++ b/tests/test_cdhit_analysis.py	Mon Dec 15 16:44:40 2025 +0000
@@ -1,626 +1,303 @@
 """
 Test suite for CD-HIT cluster analysis processor.
 """
-
 import pytest
 from pathlib import Path
 import pandas as pd
 import os
 import sys
 
-# Add module path
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
 from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
     parse_cluster_file,
     process_cluster_data,
     calculate_cluster_taxa,
     write_similarity_output,
-    write_evalue_output,
     write_count_output,
-    write_taxa_clusters_output,
-    write_taxa_processed_output,
+    write_taxa_excel,
 )
 
+
 class TestCDHitAnalysis:
-    """Test class for CD-HIT cluster analysis processor using real XLSX test data."""
 
     @pytest.fixture(scope="class")
     def test_data_dir(self):
-        """Return path to the test-data directory with real XLSX files."""
-        base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
-        assert base_dir.exists(), f"Test data directory does not exist: {base_dir}"
-        return base_dir
+        base = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
+        assert base.exists()
+        return base
 
     @pytest.fixture(scope="class")
     def sample_cluster_file(self, test_data_dir):
-        """Return path to the sample cluster XLSX file."""
-        cluster_file = test_data_dir / "29-test.clstr.txt"
-        assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}"
-        return str(cluster_file)
+        f = test_data_dir / "prev_anno.txt"
+        assert f.exists()
+        return str(f)
 
     @pytest.fixture(scope="class")
     def sample_annotation_file(self, test_data_dir):
-        """Return path to the sample annotation XLSX file."""
-        annotation_file = test_data_dir / "header_anno_29_test.xlsx"
-        assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}"
-        return str(annotation_file)
+        f = test_data_dir / "prev4.xlsx"
+        assert f.exists()
+        return str(f)
 
     @pytest.fixture(scope="class")
     def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
-        """Parse the sample cluster file with annotations."""
         return parse_cluster_file(sample_cluster_file, sample_annotation_file)
 
+
     def test_cluster_parsing_structure(self, parsed_clusters):
-        """
-        Test 1: Cluster File Parsing Structure
-
-        Verifies that cluster files are correctly parsed into the expected data structure
-        with proper extraction of headers, counts, similarities, and cluster groupings.
- """ - # Should have 4 clusters based on sample data - # for x in parsed_clusters: print(x); - assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}" - - # Test Cluster 0 structure (3 members) + assert len(parsed_clusters) == 514 cluster_0 = parsed_clusters[0] - assert len(cluster_0) == 41, "Cluster 0 should have 41 members" - cluster_3 = parsed_clusters[3] - assert len(cluster_3) == 4, "Cluster 3 should have 4 members" - - # Check specific member data - assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0" - read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS'] - assert read1_data['count'] == 1, "read1 count should be 1" - assert read1_data['similarity'] == 97.78, "read1 should be representative (100% similarity)" - assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa" + assert len(cluster_0) == 430 - # Check non-representative member - assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0" - read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS'] - assert read2_data['count'] == 1, "read2 count should be 50" - assert read2_data['similarity'] == 100, "read2 similarity should be 100%" - assert read2_data['taxa'] == "Unannotated read" + read = cluster_0["M01687:460:000000000-LGY9G:1:1101:8356:6156_CONS"] + assert read["count"] == 19 + assert isinstance(read["similarity"], float) - # Test single-member cluster (Cluster 2) - cluster_2 = parsed_clusters[2] - assert len(cluster_2) == 1, "Cluster 2 should have 1 member" - assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2" - - print("✓ Test 1 PASSED: Cluster file parsing structure correct") - - def test_annotation_integration(self, parsed_clusters): - """ - Test 2: Annotation Integration - - Verifies that annotations from the separate annotation file are correctly - matched to cluster members based on header names. 
- """ - # Check that annotations were properly integrated + def test_annotation_integration_basic(self, parsed_clusters): cluster_0 = parsed_clusters[0] - # Verify e-values are correctly assigned - assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect" - assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect" - assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect" - - # Verify taxa assignments - assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect" - assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect" - assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect" + annotated_found = any( + data["taxa"] != "Unannotated read" for data in cluster_0.values() + ) + assert annotated_found, "At least one annotated read expected" - # Test missing annotation handling (if any reads lack annotations) - # All our test reads have annotations, so this tests the default case - for cluster in parsed_clusters: - for header, data in cluster.items(): - if data['evalue'] == 'Unannotated read': - assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect" - print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data") - - def test_cluster_data_processing(self, parsed_clusters): - """ - Test 3: Cluster Data Processing - - Tests the processing of individual clusters to extract evaluation lists, - similarity lists, and taxa dictionaries with correct count aggregation. 
- """ - # Test processing of Cluster 0 (mixed taxa) - cluster_0 = parsed_clusters[0] - eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0) + def test_process_cluster_data_counts_and_taxa_map(self, parsed_clusters): + sim, taxa_map, annotated, unannotated = process_cluster_data(parsed_clusters[0]) - # Check eval_list structure - # for x in eval_list: print(x) - assert eval_list[0] == 2, "Two unannotated reads in this cluster, should be 2" - assert len(eval_list) == 409, "Should have 409 annotated reads + 2 unnanotated reads (counted as 1)" + assert isinstance(sim, list) + assert annotated + unannotated == sum(d["count"] for d in parsed_clusters[0].values()) + assert isinstance(taxa_map, dict) + assert annotated == 47004 and unannotated == 9 - # Check that e-values are correctly converted and repeated by count - eval_values = eval_list[1:] # Skip unannotated count - read1_evals = [e for e in eval_values if e == 1.41e-39] - assert len(read1_evals) == 365, "Should have 100 instances of read1's e-value" - - # # Check similarity list - # for x in simi_list: print(x) - assert len(simi_list) == 410, "Should have 410 similarity values" - read1_similarities = [s for s in simi_list if s == 100.0] - assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity" - assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2" - assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406" - assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1" - assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1" - print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data") + def test_weighted_lca_splitting_on_uncertain_taxa(self): + taxa_dict = { + "K / P / C / O / F / G1 / S1": 60, + "K / P / C / O / F / Uncertain taxa / Uncertain taxa": 60, + } - def test_taxa_calculation_simple_case(self, parsed_clusters): - """ - Test 4: Taxa Calculation - Simple Case - - Tests taxonomic resolution for clusters with clear dominant taxa - (single taxa or overwhelming majority). 
- """ - - # Create test arguments - class TestArgs: + class ArgsLow: uncertain_taxa_use_ratio = 0.5 min_to_split = 0.45 min_count_to_split = 10 - args = TestArgs() - - # Test Cluster 1 (should be clear Archaea) - cluster_5 = parsed_clusters[5] - _, _, taxa_dict_5 = process_cluster_data(cluster_5) - - result_5 = calculate_cluster_taxa(taxa_dict_5, args) - # Should return single taxa group for Archaea - assert len(result_5) == 1, "Single dominant taxa should not split" - dominant_taxa = list(result_5[0].keys())[0] - assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \ - 'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant" + class ArgsHigh: + uncertain_taxa_use_ratio = 1.0 + min_to_split = 0.45 + min_count_to_split = 10 - # Test single-member cluster (Cluster 2) - cluster_2 = parsed_clusters[2] - _, _, taxa_dict_2 = process_cluster_data(cluster_2) - - result_2 = calculate_cluster_taxa(taxa_dict_2, args) - total = sum(value for d in result_2 for value in d.values()) - assert total == 1, "Single member cluster should not split" - - print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly") - - def test_taxa_calculation_complex_splitting(self, parsed_clusters): - """ - Test 5: Taxa Calculation - Complex Splitting - - Tests the recursive taxonomic resolution algorithm for clusters with - multiple competing taxa that should be split based on thresholds. - """ + # LOW weight → uncertain counts half → G1 wins → no split + res_low = calculate_cluster_taxa(taxa_dict, ArgsLow()) + assert len(res_low) == 1 + assert sum(res_low[0].values()) == 60 # total preserved - class TestArgs: - uncertain_taxa_use_ratio = 0.5 - min_to_split = 0.30 # Lower threshold to encourage splitting - min_count_to_split = 5 # Lower threshold to encourage splitting - - args = TestArgs() - - # Test Cluster 3 (mixed Firmicutes and Proteobacteria) - cluster_3 = parsed_clusters[3] - _, _, taxa_dict_3 = process_cluster_data(cluster_3) - - # Manual check of expected taxa distribution - expected_taxa = {} - for header, data in cluster_3.items(): - taxa = data['taxa'] - count = data['count'] - expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count + # HIGH weight → uncertain = full weight → equal → split + res_high = calculate_cluster_taxa(taxa_dict, ArgsHigh()) + assert len(res_high) == 2 + total = sum(sum(g.values()) for g in res_high) + assert total == 120 - result_3 = calculate_cluster_taxa(taxa_dict_3, args) - - # With mixed taxa and low thresholds, should potentially split - # The exact behavior depends on the algorithm implementation - total_result_count = sum(sum(group.values()) for group in result_3) - expected_total = sum(expected_taxa.values()) - - assert total_result_count == expected_total, "Total counts should be preserved after splitting" - print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds") + def test_calculate_cluster_taxa_preserves_counts_real_cluster(self, parsed_clusters): + sim, taxa_map, annotated, unannotated = process_cluster_data(parsed_clusters[3]) - def test_statistical_calculations(self, parsed_clusters): - """ - Test 6: Statistical Calculations - - Verifies that similarity and e-value statistics are calculated correctly - including averages, standard deviations, and distributions. 
- """ - # Process all clusters to get combined data - eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5]) - # Test similarity statistics - if eval_list: - expected_avg = sum(simi_list) / len(simi_list) - - # Manual verification of a few key values - # From our test data: read1=100% (100 times), read2=96.67% (50 times), etc. - total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86 - total_count = 176 - manual_avg = total_similarity_sum / total_count - - assert abs( - expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}" + raw_total = annotated + unannotated + taxa_map_total = sum(info["count"] for info in taxa_map.values()) + assert raw_total == taxa_map_total - # Test e-value data structure - annotated_evals = eval_list[1:] - assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric" - assert all(e > 0 for e in annotated_evals), "All e-values should be positive" - - print("✓ Test 6 PASSED: Statistical calculations are mathematically correct") + class Args: + uncertain_taxa_use_ratio = 0.5 + min_to_split = 0.3 + min_count_to_split = 5 - def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file): - """ - Test 7: Output File Formats - Tests that all output files are created with correct structure and content, - including text files, Excel files with multiple sheets, and plot files. - """ - output_dir = test_data_dir - - # Parse data - clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file) + results = calculate_cluster_taxa({t: i["count"] for t, i in taxa_map.items()}, Args()) - # Process all clusters - cluster_data_list = [] - all_eval_data = [0] - all_simi_data = [] - for cluster in clusters: - eval_list, simi_list, taxa_dict = process_cluster_data(cluster) - cluster_data_list.append((eval_list, simi_list, taxa_dict)) - all_eval_data[0] += eval_list[0] - all_eval_data.extend(eval_list[1:]) - all_simi_data.extend(simi_list) + resolved_total = sum(sum(group.values()) for group in results) + assert resolved_total <= raw_total + assert resolved_total > 0 - # Test similarity output - simi_output = output_dir / "test_similarity.txt" - write_similarity_output(all_simi_data, str(simi_output)) - assert simi_output.exists(), "Similarity output file not created" - with open(simi_output, 'r') as f: - content = f.read() - assert "# Average similarity:" in content, "Missing average similarity in output" - assert "# Standard deviation:" in content, "Missing standard deviation in output" - assert "similarity\tcount" in content, "Missing header in similarity output" + def test_write_similarity_and_count_outputs(self, tmp_path, parsed_clusters): + out_simi = tmp_path / "simi.txt" + out_count = tmp_path / "count.txt" - # Test e-value output - eval_output = output_dir / "test_evalue.txt" - write_evalue_output(all_eval_data, str(eval_output)) - - assert eval_output.exists(), "E-value output file not created" - with open(eval_output, 'r') as f: - content = f.read() - assert "evalue\tcount" in content, "Missing header in e-value output" - - # Test count output - count_output = output_dir / "test_count.txt" - write_count_output(all_eval_data, cluster_data_list, str(count_output)) + cluster_data_list = [] + all_simi = [] - assert count_output.exists(), "Count output file not created" - with open(count_output, 'r') as f: - content = f.read() - assert "cluster\tunannotated\tannotated" in content, "Missing header in count output" - assert "TOTAL\t" in 
content, "Missing total row in count output" - - # Test taxa clusters Excel output - taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx" - write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output)) - - assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created" - df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters') - expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus', - 'species'] - assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output" - - print("✓ Test 7 PASSED: All output file formats are correct and complete") - - def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file): - """ - Test 8: Processed Taxa Output Structure - - Tests the complex processed taxa Excel output with multiple sheets - and parameter tracking. - """ - output_dir = test_data_dir + for c in parsed_clusters: + sim, taxa_map, annotated, unannotated = process_cluster_data(c) + cluster_data_list.append( + { + "similarities": sim, + "taxa_map": taxa_map, + "annotated": annotated, + "unannotated": unannotated, + } + ) + all_simi.extend(sim) - class TestArgs: - uncertain_taxa_use_ratio = 0.6 - min_to_split = 0.35 - min_count_to_split = 15 - show_unannotated_clusters = True - - args = TestArgs() - - # Parse and process data - clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file) - cluster_data_list = [] - - for cluster in clusters: - eval_list, simi_list, taxa_dict = process_cluster_data(cluster) - cluster_data_list.append((eval_list, simi_list, taxa_dict)) + write_similarity_output(cluster_data_list, str(out_simi)) + assert out_simi.exists() - # Test processed taxa output - processed_output = output_dir / "test_processed_taxa.xlsx" - write_taxa_processed_output(cluster_data_list, args, str(processed_output)) - - assert processed_output.exists(), "Processed taxa Excel file not created" - - # Check multiple sheets exist - xl_file = pd.ExcelFile(processed_output) - expected_sheets = ['Processed_Taxa_Clusters', 'Settings'] - assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output" + write_count_output(cluster_data_list, str(out_count)) + assert out_count.exists() - # Check main data sheet - df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters') - expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus', - 'species'] - assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet" - # Check settings sheet - df_settings = pd.read_excel(processed_output, sheet_name='Settings') - assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings" - assert 'Value' in df_settings.columns, "Missing Value column in settings" + def test_write_taxa_excel_raw_and_processed(self, tmp_path, parsed_clusters): - # Verify settings values are recorded - settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value'])) - assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded" - assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded" - - print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking") - - def test_edge_cases(self, test_data_dir): - """ - Test 9: Edge Cases and Error Handling - - Tests handling of edge cases like empty 
files, missing annotations, - single-member clusters, and malformed input data. - """ - input_dir = test_data_dir - - # Test empty cluster file - empty_cluster = input_dir / "empty_cluster.clstr" - with open(empty_cluster, 'w') as f: - f.write("") - - clusters_empty = parse_cluster_file(str(empty_cluster)) - assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters" - - # Test cluster file with no annotations - simple_cluster = input_dir / "simple_cluster.clstr" - simple_cluster_content = """>Cluster 0 -0 100nt, >read_no_anno:50... * -""" - with open(simple_cluster, 'w') as f: - f.write(simple_cluster_content) - - with pytest.raises(UnboundLocalError): - parse_cluster_file(str(simple_cluster), raise_on_error=True) + class Args: + uncertain_taxa_use_ratio = 0.5 + min_to_split = 0.45 + min_count_to_split = 10 + min_cluster_support = 1 + make_taxa_in_cluster_split = False - # Test malformed cluster entries (missing parts) - malformed_cluster = input_dir / "malformed_cluster.clstr" - malformed_content = """>Cluster 0 -0 100nt, >read1:50..._CONS(50) * -invalid_line_without_proper_format -1 90nt, >read2:25..._CONS(25) at /+/95% -""" - annotations_malformed = input_dir / "test_pytest.xlsx" - with open(malformed_cluster, 'w') as f: - f.write(malformed_content) - - clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed)) - # Should still parse valid entries and skip invalid ones - assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file" - assert len(clusters_malformed[0]) == 2, "Should have 2 valid read" - assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50 - assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25 + cluster_data_list = [] + for c in parsed_clusters: + sim, taxa_map, annotated, unannotated = process_cluster_data(c) + cluster_data_list.append( + { + "similarities": sim, + "taxa_map": taxa_map, + "annotated": annotated, + "unannotated": unannotated, + } + ) - print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes") - - def test_count_preservation_across_processing(self, parsed_clusters): - """ - Test 10: Count Preservation Across Processing Pipeline + out = tmp_path / "taxa.xlsx" + write_taxa_excel( + cluster_data_list, Args(), str(out), write_raw=True, write_processed=True + ) - Verifies that read counts are preserved throughout the entire processing - pipeline from cluster parsing through taxa calculation to final output. 
- """ - # Calculate expected total counts from original data - expected_total = 0 - for cluster in parsed_clusters: - for header, data in cluster.items(): - expected_total += data['count'] + xl = pd.ExcelFile(out) + assert "Raw_Taxa_Clusters" in xl.sheet_names + assert "Processed_Taxa_Clusters" in xl.sheet_names + assert "Settings" in xl.sheet_names - # Process through pipeline and verify counts at each stage - total_from_processing = 0 - taxa_processing_totals = [] + def test_write_taxa_excel_only_raw_or_only_processed(self, tmp_path, parsed_clusters): - class TestArgs: + class Args: uncertain_taxa_use_ratio = 0.5 min_to_split = 0.45 min_count_to_split = 10 - - args = TestArgs() - - for cluster in parsed_clusters: - eval_list, simi_list, taxa_dict = process_cluster_data(cluster) + min_cluster_support = 1 + make_taxa_in_cluster_split = False - # Check that cluster processing preserves counts - cluster_total = eval_list[0] + len(eval_list[1:]) # unannotated + annotated - cluster_expected = sum(data['count'] for data in cluster.values()) - assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}" - - total_from_processing += cluster_total - - # Check taxa calculation preserves counts - taxa_results = calculate_cluster_taxa(taxa_dict, args) - taxa_total = sum(sum(group.values()) for group in taxa_results) - taxa_processing_totals.append(taxa_total) + cluster_data_list = [] + for c in parsed_clusters: + sim, taxa_map, annotated, unannotated = process_cluster_data(c) + cluster_data_list.append( + { + "similarities": sim, + "taxa_map": taxa_map, + "annotated": annotated, + "unannotated": unannotated, + } + ) - # Verify taxa dict total matches - taxa_dict_total = sum(taxa_dict.values()) - assert taxa_total <= taxa_dict_total, f"Count mismatch in taxa calculation: expected {taxa_dict_total}, got {taxa_total}" - # Final verification - assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}" + out_raw = tmp_path / "raw.xlsx" + write_taxa_excel(cluster_data_list, Args(), str(out_raw), write_raw=True, write_processed=False) + xl_raw = pd.ExcelFile(out_raw) + assert "Raw_Taxa_Clusters" in xl_raw.sheet_names + assert "Processed_Taxa_Clusters" not in xl_raw.sheet_names + - # Verify sum of all taxa processing equals original - total_taxa_processed = sum(taxa_processing_totals) - assert total_taxa_processed <= expected_total, f"Taxa processing total mismatch: expected {expected_total}, got {total_taxa_processed}" - - print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline") + out_proc = tmp_path / "proc.xlsx" + write_taxa_excel(cluster_data_list, Args(), str(out_proc), write_raw=False, write_processed=True) + xl_proc = pd.ExcelFile(out_proc) + assert "Processed_Taxa_Clusters" in xl_proc.sheet_names - def test_11_parse_arguments_all_flags(self, tmp_path): - """ - Test 11: Argument Parsing with All Flags - Ensures parse_arguments correctly handles all optional flags and values. 
- """ + def test_parse_arguments_all_flags(self, tmp_path): from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca args = ca.parse_arguments([ - '--input_cluster', str(tmp_path / "dummy.clstr"), - '--simi_plot_y_min', '90', - '--simi_plot_y_max', '99', - '--uncertain_taxa_use_ratio', '0.3', - '--min_to_split', '0.2', - '--min_count_to_split', '5', - '--show_unannotated_clusters', - '--make_taxa_in_cluster_split', - '--print_empty_files' + "--input_cluster", str(tmp_path / "dummy.clstr"), + "--simi_plot_y_min", "90", + "--simi_plot_y_max", "99", + "--uncertain_taxa_use_ratio", "0.3", + "--min_to_split", "0.2", + "--min_count_to_split", "5", + "--output_excel", str(tmp_path / "report.xlsx"), ]) assert args.simi_plot_y_min == 90 - assert args.print_empty_files is True + assert args.simi_plot_y_max == 99 - def test_12_process_cluster_data_valueerror(self): - """ - Test 12: Process Cluster Data with Bad E-value - - Ensures ValueError branches are handled and unannotated counts increase. - """ + def test_main_runs_and_creates_outputs(self, tmp_path): from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - cluster = { - "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"} - } - eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster) - assert eval_list[0] == 1 # unannotated read + + clstr = tmp_path / "simple.clstr" + clstr.write_text(">Cluster 0\n0\t88nt, >read1_CONS(3)... *\n") - def test_13_write_similarity_and_evalue_empty(self, tmp_path): - """ - Test 13: Output Writers with Empty Data - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - sim_file = tmp_path / "sim.txt" - eval_file = tmp_path / "eval.txt" + anno = tmp_path / "anno.xlsx" + df = pd.DataFrame([ + { + "header": "read1_CONS", + "seq_id": "SEQ001", + "source": "Genbank", + "taxa": "K / P / C / O / F / G / S", + } + ]) + with pd.ExcelWriter(anno) as w: + df.to_excel(w, sheet_name="Individual_Reads", index=False) - ca.write_similarity_output([], str(sim_file)) - assert not sim_file.exists() or sim_file.read_text() == "" + sim_file = tmp_path / "sim.txt" + excel_file = tmp_path / "taxa.xlsx" + args = [ + "--input_cluster", str(clstr), + "--input_annotation", str(anno), + "--output_similarity_txt", str(sim_file), + "--output_excel", str(excel_file), + '--output_taxa_clusters', + '--output_taxa_processed', + '--log_file', 'test-data/new_logs.txt', + '--simi_plot_y_min', '95', + '--simi_plot_y_max', '100', + '--uncertain_taxa_use_ratio', '0.5', + '--min_to_split', '0.45', + '--min_count_to_split', '10', + '--min_cluster_support', '1' + ] - ca.write_evalue_output([5], str(eval_file)) - assert "unannotated" in eval_file.read_text() + ca.main(args) + assert sim_file.exists() + assert excel_file.exists() - def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path): - """ - Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - count_file = tmp_path / "count.txt" - taxa_file = tmp_path / "taxa.xlsx" + def test_parse_cluster_file_empty_and_no_annotation(self, tmp_path): + from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis2 as ca + + empty = tmp_path / "empty.clstr" + empty.write_text("") + + clusters = ca.parse_cluster_file(str(empty), annotation_file=None, log_messages=[]) + assert clusters == [] + + def 
test_create_similarity_plot_creates_file(self, tmp_path, parsed_clusters): + from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis2 as ca + - ca.write_count_output([0], [], str(count_file)) - assert "TOTAL" in count_file.read_text() + cluster_data_list = [] + all_simi = [] + lengths = [] - cluster_data = [([0], [], {"bad": 1})] - ca.write_taxa_clusters_output(cluster_data, str(taxa_file)) - assert taxa_file.exists() - - def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path): - """ - Test 15: Processed Taxa Output with Settings - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca + for c in parsed_clusters[:5]: + sim, taxa_map, annotated, unannotated = process_cluster_data(c) + cluster_data_list.append( + {"similarities": sim, "taxa_map": taxa_map, + "annotated": annotated, "unannotated": unannotated} + ) + if sim: + all_simi.extend(sim) + lengths.append(len(sim)) class Args: - uncertain_taxa_use_ratio = 0.5 - min_to_split = 0.2 - min_count_to_split = 2 - show_unannotated_clusters = True - - out_file = tmp_path / "processed.xlsx" - cluster_data = [([0], [], {"Unannotated read": 2})] - ca.write_taxa_processed_output(cluster_data, Args(), str(out_file)) - assert out_file.exists() - - def test_16_create_evalue_plot_edge_cases(self, tmp_path): - """ - Test 16: E-value Plot Edge Cases - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - out = tmp_path / "plot.png" - - # Only unannotated - ca.create_evalue_plot([0], [0], str(out)) - assert not out.exists() or out.stat().st_size == 0 - - # Empty after filtering - ca.create_evalue_plot([0, ], [], str(out)) - assert not out.exists() or out.stat().st_size == 0 - - # With valid values - ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out)) - assert out.exists() - - def test_17_main_runs_and_prints(self, tmp_path, capsys): - """ - Test 17: Main Entry Point - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - clstr = tmp_path / "simple.clstr" - clstr.write_text(">Cluster 0\n0 100nt, >seq1... *\n") + simi_plot_y_min = 95.0 + simi_plot_y_max = 100.0 - out = tmp_path / "sim.txt" - args = [ - '--input_cluster', str(clstr), - '--output_similarity_txt', str(out) - ] - ca.main(args) - captured = capsys.readouterr() - assert "Processing complete" in captured.out - - - def test_18a_prepare_evalue_histogram_valid_data(self): - """ - Test 18a: prepare_evalue_histogram returns correct counts/bins. - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], []) - assert counts.sum() == 3 # 3 entries counted - assert len(bins) == 51 # 50 bins => 51 edges - - def test_18b_prepare_evalue_histogram_empty(self): - """ - Test 18b: prepare_evalue_histogram with empty/invalid data returns (None, None). - """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], []) - assert counts is None - assert bins is None - - def test_18c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path): - """ - Test 18c: create_evalue_plot saves a PNG and returns numeric data. 
- """ - from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca - out = tmp_path / "eval.png" - counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out)) - assert out.exists() - assert counts.sum() == 3 - assert len(bins) == 51 - - -if __name__ == "__main__": - # Run all tests in this file - pytest.main([__file__]) \ No newline at end of file + out_png = tmp_path / "sim.png" + ca.create_similarity_plot(all_simi, lengths, Args(), str(out_png)) + if all_simi: + assert out_png.exists() \ No newline at end of file
