Mercurial > repos > onnodg > cdhit_analysis
diff tests/test_cdhit_analysis.py @ 0:00d56396b32a draft
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit c944fd5685f295acba06679e85b67973c173b137
| author | onnodg |
|---|---|
| date | Tue, 14 Oct 2025 09:09:46 +0000 |
| parents | |
| children | ff68835adb2b |
line wrap: on
line diff
"""
Test suite for CD-HIT cluster analysis processor.
"""

import pytest
from pathlib import Path
import pandas as pd
import os
import sys

# Add module path so the package under test is importable when running pytest
# from the repository root.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
    parse_cluster_file,
    process_cluster_data,
    calculate_cluster_taxa,
    write_similarity_output,
    write_evalue_output,
    write_count_output,
    write_taxa_clusters_output,
    write_taxa_processed_output,
)


class TestCDHitAnalysis:
    """Test class for CD-HIT cluster analysis processor using real XLSX test data."""

    @pytest.fixture(scope="class")
    def test_data_dir(self):
        """Return path to the test-data directory with real XLSX files."""
        base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
        assert base_dir.exists(), f"Test data directory does not exist: {base_dir}"
        return base_dir

    @pytest.fixture(scope="class")
    def sample_cluster_file(self, test_data_dir):
        """Return path to the sample cluster file."""
        cluster_file = test_data_dir / "29-test.clstr.txt"
        assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}"
        return str(cluster_file)

    @pytest.fixture(scope="class")
    def sample_annotation_file(self, test_data_dir):
        """Return path to the sample annotation XLSX file."""
        annotation_file = test_data_dir / "header_anno_29_test.xlsx"
        assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}"
        return str(annotation_file)

    @pytest.fixture(scope="class")
    def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
        """Parse the sample cluster file with annotations."""
        return parse_cluster_file(sample_cluster_file, sample_annotation_file)

    def test_cluster_parsing_structure(self, parsed_clusters):
        """
        Test 1: Cluster File Parsing Structure

        Verifies that cluster files are correctly parsed into the expected data structure
        with proper extraction of headers, counts, similarities, and cluster groupings.
        """
        # Should have 24 clusters based on sample data
        assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}"

        # Test Cluster 0 structure (41 members)
        cluster_0 = parsed_clusters[0]
        assert len(cluster_0) == 41, "Cluster 0 should have 41 members"
        cluster_3 = parsed_clusters[3]
        assert len(cluster_3) == 4, "Cluster 3 should have 4 members"

        # Check specific member data
        assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0"
        read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS']
        assert read1_data['count'] == 1, "read1 count should be 1"
        assert read1_data['similarity'] == 97.78, "read1 similarity should be 97.78%"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa"

        # Check non-representative member
        assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0"
        read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS']
        assert read2_data['count'] == 1, "read2 count should be 1"
        assert read2_data['similarity'] == 100, "read2 similarity should be 100%"
        assert read2_data['taxa'] == "Unannotated read"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        assert len(cluster_2) == 1, "Cluster 2 should have 1 member"
        assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2"

        print("✓ Test 1 PASSED: Cluster file parsing structure correct")

    def test_annotation_integration(self, parsed_clusters):
        """
        Test 2: Annotation Integration

        Verifies that annotations from the separate annotation file are correctly
        matched to cluster members based on header names.
        """
        # Check that annotations were properly integrated
        cluster_0 = parsed_clusters[0]

        # Verify e-values are correctly assigned
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect"

        # Verify taxa assignments
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect"

        # Test missing annotation handling (if any reads lack annotations)
        # All our test reads have annotations, so this tests the default case
        for cluster in parsed_clusters:
            for header, data in cluster.items():
                if data['evalue'] == 'Unannotated read':
                    assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect"

        print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data")

    def test_cluster_data_processing(self, parsed_clusters):
        """
        Test 3: Cluster Data Processing

        Tests the processing of individual clusters to extract evaluation lists,
        similarity lists, and taxa dictionaries with correct count aggregation.
        """
        # Test processing of Cluster 0 (mixed taxa)
        cluster_0 = parsed_clusters[0]
        eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0)

        # Check eval_list structure: slot 0 holds the unannotated-read count
        assert eval_list[0] == 2, "Two unannotated reads in this cluster, should be 2"
        assert len(eval_list) == 409, "Should have 409 entries: annotated reads + unannotated reads (counted as 1)"

        # Check that e-values are correctly converted and repeated by count
        eval_values = eval_list[1:]  # Skip unannotated count
        read1_evals = [e for e in eval_values if e == 1.41e-39]
        assert len(read1_evals) == 365, "Should have 365 instances of read1's e-value"

        # Check similarity list
        assert len(simi_list) == 410, "Should have 410 similarity values"
        read1_similarities = [s for s in simi_list if s == 100.0]
        assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity"

        assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1"
        print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data")

    def test_taxa_calculation_simple_case(self, parsed_clusters):
        """
        Test 4: Taxa Calculation - Simple Case

        Tests taxonomic resolution for clusters with clear dominant taxa
        (single taxa or overwhelming majority).
        """
        # Create test arguments
        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        # Test Cluster 5 (should be clearly Juglandaceae)
        cluster_5 = parsed_clusters[5]
        _, _, taxa_dict_5 = process_cluster_data(cluster_5)

        result_5 = calculate_cluster_taxa(taxa_dict_5, args)
        # Should return a single taxa group
        assert len(result_5) == 1, "Single dominant taxa should not split"
        dominant_taxa = list(result_5[0].keys())[0]
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \
               'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        _, _, taxa_dict_2 = process_cluster_data(cluster_2)

        result_2 = calculate_cluster_taxa(taxa_dict_2, args)
        total = sum(value for d in result_2 for value in d.values())
        assert total == 1, "Single member cluster should not split"

        print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly")

    def test_taxa_calculation_complex_splitting(self, parsed_clusters):
        """
        Test 5: Taxa Calculation - Complex Splitting

        Tests the recursive taxonomic resolution algorithm for clusters with
        multiple competing taxa that should be split based on thresholds.
        """
        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.30  # Lower threshold to encourage splitting
            min_count_to_split = 5  # Lower threshold to encourage splitting

        args = TestArgs()

        # Test Cluster 3 (mixed taxa)
        cluster_3 = parsed_clusters[3]
        _, _, taxa_dict_3 = process_cluster_data(cluster_3)

        # Manual check of expected taxa distribution
        expected_taxa = {}
        for header, data in cluster_3.items():
            taxa = data['taxa']
            count = data['count']
            expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count

        result_3 = calculate_cluster_taxa(taxa_dict_3, args)

        # With mixed taxa and low thresholds, should potentially split.
        # The exact behavior depends on the algorithm implementation.
        total_result_count = sum(sum(group.values()) for group in result_3)
        expected_total = sum(expected_taxa.values())

        assert total_result_count == expected_total, "Total counts should be preserved after splitting"

        print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds")

    def test_statistical_calculations(self, parsed_clusters):
        """
        Test 6: Statistical Calculations

        Verifies that similarity and e-value statistics are calculated correctly
        including averages, standard deviations, and distributions.
        """
        eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5])
        # Test similarity statistics. Guard on simi_list (not eval_list) since
        # the average is computed over simi_list and would divide by zero otherwise.
        if simi_list:
            expected_avg = sum(simi_list) / len(simi_list)

            # Manual verification against the known distribution of cluster 5:
            # 166 reads at 100%, 9 reads at 98.88%, 1 read at 98.86%
            total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86
            total_count = 176
            manual_avg = total_similarity_sum / total_count

            assert abs(
                expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}"

        # Test e-value data structure (slot 0 is the unannotated count, skip it)
        annotated_evals = eval_list[1:]
        assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric"
        assert all(e > 0 for e in annotated_evals), "All e-values should be positive"

        print("✓ Test 6 PASSED: Statistical calculations are mathematically correct")

    def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file):
        """
        Test 7: Output File Formats

        Tests that all output files are created with correct structure and content,
        including text files, Excel files with multiple sheets, and plot files.
        """
        output_dir = test_data_dir

        # Parse data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)

        # Process all clusters
        cluster_data_list = []
        all_eval_data = [0]
        all_simi_data = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))
            all_eval_data[0] += eval_list[0]
            all_eval_data.extend(eval_list[1:])
            all_simi_data.extend(simi_list)

        # Test similarity output
        simi_output = output_dir / "test_similarity.txt"
        write_similarity_output(all_simi_data, str(simi_output))

        assert simi_output.exists(), "Similarity output file not created"
        with open(simi_output, 'r') as f:
            content = f.read()
            assert "# Average similarity:" in content, "Missing average similarity in output"
            assert "# Standard deviation:" in content, "Missing standard deviation in output"
            assert "similarity\tcount" in content, "Missing header in similarity output"

        # Test e-value output
        eval_output = output_dir / "test_evalue.txt"
        write_evalue_output(all_eval_data, str(eval_output))

        assert eval_output.exists(), "E-value output file not created"
        with open(eval_output, 'r') as f:
            content = f.read()
            assert "evalue\tcount" in content, "Missing header in e-value output"

        # Test count output
        count_output = output_dir / "test_count.txt"
        write_count_output(all_eval_data, cluster_data_list, str(count_output))

        assert count_output.exists(), "Count output file not created"
        with open(count_output, 'r') as f:
            content = f.read()
            assert "cluster\tunannotated\tannotated" in content, "Missing header in count output"
            assert "TOTAL\t" in content, "Missing total row in count output"

        # Test taxa clusters Excel output
        taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx"
        write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output))

        assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created"
        df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
                            'species']
        assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output"

        print("✓ Test 7 PASSED: All output file formats are correct and complete")

    def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file):
        """
        Test 8: Processed Taxa Output Structure

        Tests the complex processed taxa Excel output with multiple sheets
        and parameter tracking.
        """
        output_dir = test_data_dir

        class TestArgs:
            uncertain_taxa_use_ratio = 0.6
            min_to_split = 0.35
            min_count_to_split = 15
            show_unannotated_clusters = True

        args = TestArgs()

        # Parse and process data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
        cluster_data_list = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))

        # Test processed taxa output
        processed_output = output_dir / "test_processed_taxa.xlsx"
        write_taxa_processed_output(cluster_data_list, args, str(processed_output))

        assert processed_output.exists(), "Processed taxa Excel file not created"

        # Check multiple sheets exist
        xl_file = pd.ExcelFile(processed_output)
        expected_sheets = ['Processed_Taxa_Clusters', 'Settings']
        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output"

        # Check main data sheet
        df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
                            'species']
        assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet"

        # Check settings sheet
        df_settings = pd.read_excel(processed_output, sheet_name='Settings')
        assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings"
        assert 'Value' in df_settings.columns, "Missing Value column in settings"

        # Verify settings values are recorded
        settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value']))
        assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded"
        assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded"

        print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking")

    def test_edge_cases(self, test_data_dir):
        """
        Test 9: Edge Cases and Error Handling

        Tests handling of edge cases like empty files, missing annotations,
        single-member clusters, and malformed input data.
        """
        input_dir = test_data_dir

        # Test empty cluster file
        empty_cluster = input_dir / "empty_cluster.clstr"
        with open(empty_cluster, 'w') as f:
            f.write("")

        clusters_empty = parse_cluster_file(str(empty_cluster))
        assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters"

        # Test cluster file with no annotations
        simple_cluster = input_dir / "simple_cluster.clstr"
        simple_cluster_content = """>Cluster 0
0 100nt, >read_no_anno:50... *
"""
        with open(simple_cluster, 'w') as f:
            f.write(simple_cluster_content)

        with pytest.raises(UnboundLocalError):
            parse_cluster_file(str(simple_cluster), raise_on_error=True)

        # Test malformed cluster entries (missing parts)
        malformed_cluster = input_dir / "malformed_cluster.clstr"
        malformed_content = """>Cluster 0
0 100nt, >read1:50..._CONS(50) *
invalid_line_without_proper_format
1 90nt, >read2:25..._CONS(25) at /+/95%
"""
        annotations_malformed = input_dir / "test_pytest.xlsx"
        with open(malformed_cluster, 'w') as f:
            f.write(malformed_content)

        clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed))
        # Should still parse valid entries and skip invalid ones
        assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file"
        assert len(clusters_malformed[0]) == 2, "Should have 2 valid reads"
        assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50
        assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25

        print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes")

    def test_count_preservation_across_processing(self, parsed_clusters):
        """
        Test 10: Count Preservation Across Processing Pipeline

        Verifies that read counts are preserved throughout the entire processing
        pipeline from cluster parsing through taxa calculation to final output.
        """
        # Calculate expected total counts from original data
        expected_total = 0
        for cluster in parsed_clusters:
            for header, data in cluster.items():
                expected_total += data['count']

        # Process through pipeline and verify counts at each stage
        total_from_processing = 0
        taxa_processing_totals = []

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        for cluster in parsed_clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)

            # Check that cluster processing preserves counts
            cluster_total = eval_list[0] + len(eval_list[1:])  # unannotated + annotated
            cluster_expected = sum(data['count'] for data in cluster.values())
            assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}"

            total_from_processing += cluster_total

            # Check taxa calculation preserves counts
            taxa_results = calculate_cluster_taxa(taxa_dict, args)
            taxa_total = sum(sum(group.values()) for group in taxa_results)
            taxa_processing_totals.append(taxa_total)

            # Verify taxa dict total matches
            taxa_dict_total = sum(taxa_dict.values())
            assert taxa_total <= taxa_dict_total, f"Count mismatch in taxa calculation: expected {taxa_dict_total}, got {taxa_total}"

        # Final verification
        assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}"

        # Verify sum of all taxa processing equals original
        total_taxa_processed = sum(taxa_processing_totals)
        assert total_taxa_processed <= expected_total, f"Taxa processing total mismatch: expected {expected_total}, got {total_taxa_processed}"

        print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline")

    def test_11_parse_arguments_all_flags(self, tmp_path):
        """
        Test 11: Argument Parsing with All Flags

        Ensures parse_arguments correctly handles all optional flags and values.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        args = ca.parse_arguments([
            '--input_cluster', str(tmp_path / "dummy.clstr"),
            '--simi_plot_y_min', '90',
            '--simi_plot_y_max', '99',
            '--uncertain_taxa_use_ratio', '0.3',
            '--min_to_split', '0.2',
            '--min_count_to_split', '5',
            '--show_unannotated_clusters',
            '--make_taxa_in_cluster_split',
            '--print_empty_files'
        ])
        assert args.simi_plot_y_min == 90
        assert args.print_empty_files is True

    def test_12_process_cluster_data_valueerror(self):
        """
        Test 12: Process Cluster Data with Bad E-value

        Ensures ValueError branches are handled and unannotated counts increase.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        cluster = {
            "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"}
        }
        eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster)
        assert eval_list[0] == 1  # unannotated read

    def test_13_write_similarity_and_evalue_empty(self, tmp_path):
        """
        Test 13: Output Writers with Empty Data
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        sim_file = tmp_path / "sim.txt"
        eval_file = tmp_path / "eval.txt"

        ca.write_similarity_output([], str(sim_file))
        assert not sim_file.exists() or sim_file.read_text() == ""

        ca.write_evalue_output([5], str(eval_file))
        assert "unannotated" in eval_file.read_text()

    def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path):
        """
        Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        count_file = tmp_path / "count.txt"
        taxa_file = tmp_path / "taxa.xlsx"

        ca.write_count_output([0], [], str(count_file))
        assert "TOTAL" in count_file.read_text()

        cluster_data = [([0], [], {"bad": 1})]
        ca.write_taxa_clusters_output(cluster_data, str(taxa_file))
        assert taxa_file.exists()

    def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path):
        """
        Test 15: Processed Taxa Output with Settings
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca

        class Args:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.2
            min_count_to_split = 2
            show_unannotated_clusters = True

        out_file = tmp_path / "processed.xlsx"
        cluster_data = [([0], [], {"Unannotated read": 2})]
        ca.write_taxa_processed_output(cluster_data, Args(), str(out_file))
        assert out_file.exists()

    def test_16_create_evalue_plot_edge_cases(self, tmp_path):
        """
        Test 16: E-value Plot Edge Cases
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "plot.png"

        # Only unannotated
        ca.create_evalue_plot([0], [0], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # Empty after filtering
        ca.create_evalue_plot([0, ], [], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # With valid values
        ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out))
        assert out.exists()

    def test_17_main_runs_and_prints(self, tmp_path, capsys):
        """
        Test 17: Main Entry Point
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        clstr = tmp_path / "simple.clstr"
        clstr.write_text(">Cluster 0\n0 100nt, >seq1... *\n")

        out = tmp_path / "sim.txt"
        args = [
            '--input_cluster', str(clstr),
            '--output_similarity_txt', str(out)
        ]
        ca.main(args)
        captured = capsys.readouterr()
        assert "Processing complete" in captured.out

    def test_16a_prepare_evalue_histogram_valid_data(self):
        """
        Test 16a: prepare_evalue_histogram returns correct counts/bins.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], [])
        assert counts.sum() == 3  # 3 entries counted
        assert len(bins) == 51  # 50 bins => 51 edges

    def test_16b_prepare_evalue_histogram_empty(self):
        """
        Test 16b: prepare_evalue_histogram with empty/invalid data returns (None, None).
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], [])
        assert counts is None
        assert bins is None

    def test_16c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path):
        """
        Test 16c: create_evalue_plot saves a PNG and returns numeric data.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "eval.png"
        counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out))
        assert out.exists()
        assert counts.sum() == 3
        assert len(bins) == 51


if __name__ == "__main__":
    # Run all tests in this file
    pytest.main([__file__])
