tests/test_cdhit_analysis.py @ 0:00d56396b32a (draft)
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit c944fd5685f295acba06679e85b67973c173b137
| author | onnodg |
|---|---|
| date | Tue, 14 Oct 2025 09:09:46 +0000 |
| parents | |
| children | ff68835adb2b |
| 1 """ | |
| 2 Test suite for CD-HIT cluster analysis processor. | |
| 3 """ | |
| 4 | |
| 5 import pytest | |
| 6 from pathlib import Path | |
| 7 import pandas as pd | |
| 8 import os | |
| 9 import sys | |
| 10 | |
| 11 # Add module path | |
| 12 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
    parse_cluster_file,
    process_cluster_data,
    calculate_cluster_taxa,
    write_similarity_output,
    write_evalue_output,
    write_count_output,
    write_taxa_clusters_output,
    write_taxa_processed_output,
)
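
# Shape of the parsed data the tests below rely on (illustrative sketch,
# inferred from the assertions in this file, not from the implementation):
# parse_cluster_file returns a list with one dict per cluster, mapping each
# read header to a record such as
#     {'count': 1,            # read count, e.g. from a _CONS(<n>) suffix
#      'similarity': 97.78,   # percent identity to the cluster representative
#      'taxa': '...',         # lineage string, or 'Unannotated read'
#      'evalue': 1.41e-39}    # annotation e-value, or 'Unannotated read'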
| 23 | |
| 24 class TestCDHitAnalysis: | |
| 25 """Test class for CD-HIT cluster analysis processor using real XLSX test data.""" | |
| 26 | |
| 27 @pytest.fixture(scope="class") | |
| 28 def test_data_dir(self): | |
| 29 """Return path to the test-data directory with real XLSX files.""" | |
| 30 base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data") | |
| 31 assert base_dir.exists(), f"Test data directory does not exist: {base_dir}" | |
| 32 return base_dir | |
| 33 | |
| 34 @pytest.fixture(scope="class") | |
| 35 def sample_cluster_file(self, test_data_dir): | |
| 36 """Return path to the sample cluster XLSX file.""" | |
        cluster_file = test_data_dir / "29-test.clstr.txt"
        assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}"
        return str(cluster_file)

    @pytest.fixture(scope="class")
    def sample_annotation_file(self, test_data_dir):
        """Return path to the sample annotation XLSX file."""
        annotation_file = test_data_dir / "header_anno_29_test.xlsx"
        assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}"
        return str(annotation_file)

    @pytest.fixture(scope="class")
    def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
        """Parse the sample cluster file with annotations."""
        return parse_cluster_file(sample_cluster_file, sample_annotation_file)

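    # Note: the class-scoped fixtures above parse the sample file exactly once;
    # the same parsed structure is shared by every test below, so no test
    # should mutate it in place.
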
    def test_cluster_parsing_structure(self, parsed_clusters):
        """
        Test 1: Cluster File Parsing Structure

        Verifies that cluster files are correctly parsed into the expected data structure
        with proper extraction of headers, counts, similarities, and cluster groupings.
        """
        # Should have 24 clusters based on the sample data
        assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}"

        # Test Cluster 0 structure (41 members)
        cluster_0 = parsed_clusters[0]
        assert len(cluster_0) == 41, "Cluster 0 should have 41 members"
        cluster_3 = parsed_clusters[3]
        assert len(cluster_3) == 4, "Cluster 3 should have 4 members"

        # Check specific member data
        assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0"
        read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS']
        assert read1_data['count'] == 1, "read1 count should be 1"
        assert read1_data['similarity'] == 97.78, "read1 similarity should be 97.78%"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa"

        # Check non-representative member
        assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0"
        read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS']
        assert read2_data['count'] == 1, "read2 count should be 1"
        assert read2_data['similarity'] == 100, "read2 similarity should be 100%"
        assert read2_data['taxa'] == "Unannotated read"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        assert len(cluster_2) == 1, "Cluster 2 should have 1 member"
        assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2"

        print("✓ Test 1 PASSED: Cluster file parsing structure correct")

    def test_annotation_integration(self, parsed_clusters):
        """
        Test 2: Annotation Integration

        Verifies that annotations from the separate annotation file are correctly
        matched to cluster members based on header names.
        """
        # Check that annotations were properly integrated
        cluster_0 = parsed_clusters[0]

        # Verify e-values are correctly assigned
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect"

        # Verify taxa assignments
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect"

        # Test missing annotation handling: any read whose e-value is the
        # 'Unannotated read' placeholder must carry the same placeholder as taxa
        for cluster in parsed_clusters:
            for header, data in cluster.items():
                if data['evalue'] == 'Unannotated read':
                    assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect"

        print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data")

    def test_cluster_data_processing(self, parsed_clusters):
        """
        Test 3: Cluster Data Processing

        Tests the processing of individual clusters to extract evaluation lists,
        similarity lists, and taxa dictionaries with correct count aggregation.
        """
        # Test processing of Cluster 0 (mixed taxa)
        cluster_0 = parsed_clusters[0]
        eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0)

        # Check eval_list structure
        assert eval_list[0] == 2, "eval_list[0] should hold the 2 unannotated reads"
        assert len(eval_list) == 409, "Should have 408 annotated e-values plus the unannotated-count slot"

        # Check that e-values are correctly converted and repeated by count
        eval_values = eval_list[1:]  # Skip unannotated count
        read1_evals = [e for e in eval_values if e == 1.41e-39]
        assert len(read1_evals) == 365, "Should have 365 instances of read1's e-value"

        # Check similarity list
        assert len(simi_list) == 410, "Should have 410 similarity values"
        read1_similarities = [s for s in simi_list if s == 100.0]
        assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity"

        assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1"
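
        # Sanity check of the bookkeeping above: the cluster holds
        # 2 + 406 + 1 + 1 = 410 reads; simi_list has one value per read (410),
        # while eval_list has one entry per annotated read (408) plus the
        # leading unannotated-count slot, i.e. 409 entries.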
        print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data")

    def test_taxa_calculation_simple_case(self, parsed_clusters):
        """
        Test 4: Taxa Calculation - Simple Case

        Tests taxonomic resolution for clusters with clear dominant taxa
        (single taxa or overwhelming majority).
        """

        # Create test arguments
        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()
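        # TestArgs stands in for the argparse namespace the CLI would build;
        # argparse.Namespace(uncertain_taxa_use_ratio=0.5, min_to_split=0.45,
        # min_count_to_split=10) would be an equivalent construction.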

        # Test Cluster 5 (clear Juglandaceae dominance)
        cluster_5 = parsed_clusters[5]
        _, _, taxa_dict_5 = process_cluster_data(cluster_5)

        result_5 = calculate_cluster_taxa(taxa_dict_5, args)
        # Should return a single taxa group dominated by Juglandaceae
        assert len(result_5) == 1, "Single dominant taxa should not split"
        dominant_taxa = list(result_5[0].keys())[0]
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \
               'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        _, _, taxa_dict_2 = process_cluster_data(cluster_2)

        result_2 = calculate_cluster_taxa(taxa_dict_2, args)
        total = sum(value for d in result_2 for value in d.values())
        assert total == 1, "Single member cluster should not split"

        print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly")

    def test_taxa_calculation_complex_splitting(self, parsed_clusters):
        """
        Test 5: Taxa Calculation - Complex Splitting

        Tests the recursive taxonomic resolution algorithm for clusters with
        multiple competing taxa that should be split based on thresholds.
        """

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.30  # Lower threshold to encourage splitting
            min_count_to_split = 5  # Lower threshold to encourage splitting

        args = TestArgs()
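        # Threshold semantics (as the parameter names suggest; the exact rule
        # lives in calculate_cluster_taxa): min_to_split is the fraction of a
        # cluster a competing lineage must reach, and min_count_to_split the
        # absolute read count, before the cluster is split.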

        # Test Cluster 3 (mixed taxa competing within one cluster)
        cluster_3 = parsed_clusters[3]
        _, _, taxa_dict_3 = process_cluster_data(cluster_3)

        # Manual check of expected taxa distribution
        expected_taxa = {}
        for header, data in cluster_3.items():
            taxa = data['taxa']
            count = data['count']
            expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count

        result_3 = calculate_cluster_taxa(taxa_dict_3, args)

        # With mixed taxa and low thresholds, the cluster should potentially split;
        # the exact behavior depends on the algorithm implementation
        total_result_count = sum(sum(group.values()) for group in result_3)
        expected_total = sum(expected_taxa.values())

        assert total_result_count == expected_total, "Total counts should be preserved after splitting"

        print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds")

    def test_statistical_calculations(self, parsed_clusters):
        """
        Test 6: Statistical Calculations

        Verifies that similarity and e-value statistics are calculated correctly
        including averages, standard deviations, and distributions.
        """
        # Process cluster 5 to get its similarity and e-value data
        eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5])

        # Test similarity statistics (guard on simi_list, which is what we average)
        if simi_list:
            expected_avg = sum(simi_list) / len(simi_list)

            # Manual verification against cluster 5's key values:
            # 100% (166 reads), 98.88% (9 reads), 98.86% (1 read)
            total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86
            total_count = 176
            manual_avg = total_similarity_sum / total_count
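            # Worked out: (16600 + 889.92 + 98.86) / 176 = 17588.78 / 176 ≈ 99.94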

            assert abs(expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}"

            # Test e-value data structure
            annotated_evals = eval_list[1:]
            assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric"
            assert all(e > 0 for e in annotated_evals), "All e-values should be positive"

        print("✓ Test 6 PASSED: Statistical calculations are mathematically correct")

    def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file):
        """
        Test 7: Output File Formats

        Tests that all output files are created with correct structure and content,
        including text files, Excel files with multiple sheets, and plot files.
        """
        output_dir = test_data_dir

        # Parse data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)

        # Process all clusters
        cluster_data_list = []
        all_eval_data = [0]
        all_simi_data = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))
            all_eval_data[0] += eval_list[0]
            all_eval_data.extend(eval_list[1:])
            all_simi_data.extend(simi_list)
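
        # Aggregation convention used above: all_eval_data[0] accumulates every
        # cluster's unannotated count, and the annotated e-values are
        # concatenated after it.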

        # Test similarity output
        simi_output = output_dir / "test_similarity.txt"
        write_similarity_output(all_simi_data, str(simi_output))

        assert simi_output.exists(), "Similarity output file not created"
        with open(simi_output, 'r') as f:
            content = f.read()
            assert "# Average similarity:" in content, "Missing average similarity in output"
            assert "# Standard deviation:" in content, "Missing standard deviation in output"
            assert "similarity\tcount" in content, "Missing header in similarity output"

        # Test e-value output
        eval_output = output_dir / "test_evalue.txt"
        write_evalue_output(all_eval_data, str(eval_output))

        assert eval_output.exists(), "E-value output file not created"
        with open(eval_output, 'r') as f:
            content = f.read()
            assert "evalue\tcount" in content, "Missing header in e-value output"

        # Test count output
        count_output = output_dir / "test_count.txt"
        write_count_output(all_eval_data, cluster_data_list, str(count_output))

        assert count_output.exists(), "Count output file not created"
        with open(count_output, 'r') as f:
            content = f.read()
            assert "cluster\tunannotated\tannotated" in content, "Missing header in count output"
            assert "TOTAL\t" in content, "Missing total row in count output"

        # Test taxa clusters Excel output
        taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx"
        write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output))

        assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created"
        df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species']
        assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output"

        print("✓ Test 7 PASSED: All output file formats are correct and complete")

    def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file):
        """
        Test 8: Processed Taxa Output Structure

        Tests the complex processed taxa Excel output with multiple sheets
        and parameter tracking.
        """
        output_dir = test_data_dir

        class TestArgs:
            uncertain_taxa_use_ratio = 0.6
            min_to_split = 0.35
            min_count_to_split = 15
            show_unannotated_clusters = True

        args = TestArgs()

        # Parse and process data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
        cluster_data_list = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))

        # Test processed taxa output
        processed_output = output_dir / "test_processed_taxa.xlsx"
        write_taxa_processed_output(cluster_data_list, args, str(processed_output))

        assert processed_output.exists(), "Processed taxa Excel file not created"

        # Check multiple sheets exist
        xl_file = pd.ExcelFile(processed_output)
        expected_sheets = ['Processed_Taxa_Clusters', 'Settings']
        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output"

        # Check main data sheet
        df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species']
        assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet"

        # Check settings sheet
        df_settings = pd.read_excel(processed_output, sheet_name='Settings')
        assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings"
        assert 'Value' in df_settings.columns, "Missing Value column in settings"

        # Verify settings values are recorded
        settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value']))
        assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded"
        assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded"

        print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking")

    def test_edge_cases(self, test_data_dir):
        """
        Test 9: Edge Cases and Error Handling

        Tests handling of edge cases like empty files, missing annotations,
        single-member clusters, and malformed input data.
        """
        input_dir = test_data_dir

        # Test empty cluster file
        empty_cluster = input_dir / "empty_cluster.clstr"
        with open(empty_cluster, 'w') as f:
            f.write("")

        clusters_empty = parse_cluster_file(str(empty_cluster))
        assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters"

        # Test cluster file with no annotations
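        # Minimal .clstr layout assumed by these fixtures: a '>Cluster <n>'
        # header, then one member per line, e.g.
        #   0  100nt, >read1:50..._CONS(50) *            <- representative
        #   1  90nt, >read2:25..._CONS(25) at /+/95%     <- member at 95% identity
        # The _CONS(<n>) suffix carries the consensus read count.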
        simple_cluster = input_dir / "simple_cluster.clstr"
        simple_cluster_content = """>Cluster 0
0 100nt, >read_no_anno:50... *
"""
        with open(simple_cluster, 'w') as f:
            f.write(simple_cluster_content)

        with pytest.raises(UnboundLocalError):
            parse_cluster_file(str(simple_cluster), raise_on_error=True)

        # Test malformed cluster entries (missing parts)
        malformed_cluster = input_dir / "malformed_cluster.clstr"
        malformed_content = """>Cluster 0
0 100nt, >read1:50..._CONS(50) *
invalid_line_without_proper_format
1 90nt, >read2:25..._CONS(25) at /+/95%
"""
        annotations_malformed = input_dir / "test_pytest.xlsx"
        with open(malformed_cluster, 'w') as f:
            f.write(malformed_content)

        clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed))
        # Should still parse valid entries and skip invalid ones
        assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file"
        assert len(clusters_malformed[0]) == 2, "Should have 2 valid reads"
        assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50
        assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25

        print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes")

    def test_count_preservation_across_processing(self, parsed_clusters):
        """
        Test 10: Count Preservation Across Processing Pipeline

        Verifies that read counts are preserved throughout the entire processing
        pipeline from cluster parsing through taxa calculation to final output.
        """
        # Calculate expected total counts from original data
        expected_total = 0
        for cluster in parsed_clusters:
            for header, data in cluster.items():
                expected_total += data['count']

        # Process through pipeline and verify counts at each stage
        total_from_processing = 0
        taxa_processing_totals = []

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        for cluster in parsed_clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)

            # Check that cluster processing preserves counts
            cluster_total = eval_list[0] + len(eval_list[1:])  # unannotated + annotated
            cluster_expected = sum(data['count'] for data in cluster.values())
            assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}"

            total_from_processing += cluster_total

            # Check taxa calculation preserves counts
            taxa_results = calculate_cluster_taxa(taxa_dict, args)
            taxa_total = sum(sum(group.values()) for group in taxa_results)
            taxa_processing_totals.append(taxa_total)

            # Verify the calculated total never exceeds the taxa dict total
            taxa_dict_total = sum(taxa_dict.values())
            assert taxa_total <= taxa_dict_total, f"Taxa calculation total {taxa_total} exceeds taxa dict total {taxa_dict_total}"

        # Final verification
        assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}"

        # Verify the sum over all taxa processing stays within the original total
        total_taxa_processed = sum(taxa_processing_totals)
        assert total_taxa_processed <= expected_total, f"Taxa processing total {total_taxa_processed} exceeds original total {expected_total}"

        print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline")

    def test_11_parse_arguments_all_flags(self, tmp_path):
        """
        Test 11: Argument Parsing with All Flags

        Ensures parse_arguments correctly handles all optional flags and values.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        args = ca.parse_arguments([
            '--input_cluster', str(tmp_path / "dummy.clstr"),
            '--simi_plot_y_min', '90',
            '--simi_plot_y_max', '99',
            '--uncertain_taxa_use_ratio', '0.3',
            '--min_to_split', '0.2',
            '--min_count_to_split', '5',
            '--show_unannotated_clusters',
            '--make_taxa_in_cluster_split',
            '--print_empty_files'
        ])
        assert args.simi_plot_y_min == 90
        assert args.print_empty_files is True

    def test_12_process_cluster_data_valueerror(self):
        """
        Test 12: Process Cluster Data with Bad E-value

        Ensures ValueError branches are handled and unannotated counts increase.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        cluster = {
            "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"}
        }
        eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster)
        assert eval_list[0] == 1  # unannotated read

    def test_13_write_similarity_and_evalue_empty(self, tmp_path):
        """
        Test 13: Output Writers with Empty Data
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        sim_file = tmp_path / "sim.txt"
        eval_file = tmp_path / "eval.txt"

        ca.write_similarity_output([], str(sim_file))
        assert not sim_file.exists() or sim_file.read_text() == ""

        ca.write_evalue_output([5], str(eval_file))
        assert "unannotated" in eval_file.read_text()

    def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path):
        """
        Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        count_file = tmp_path / "count.txt"
        taxa_file = tmp_path / "taxa.xlsx"

        ca.write_count_output([0], [], str(count_file))
        assert "TOTAL" in count_file.read_text()

        cluster_data = [([0], [], {"bad": 1})]
        ca.write_taxa_clusters_output(cluster_data, str(taxa_file))
        assert taxa_file.exists()

    def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path):
        """
        Test 15: Processed Taxa Output with Settings
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca

        class Args:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.2
            min_count_to_split = 2
            show_unannotated_clusters = True

        out_file = tmp_path / "processed.xlsx"
        cluster_data = [([0], [], {"Unannotated read": 2})]
        ca.write_taxa_processed_output(cluster_data, Args(), str(out_file))
        assert out_file.exists()

    def test_16_create_evalue_plot_edge_cases(self, tmp_path):
        """
        Test 16: E-value Plot Edge Cases
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "plot.png"

        # Only unannotated
        ca.create_evalue_plot([0], [0], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # Empty after filtering
        ca.create_evalue_plot([0], [], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # With valid values
        ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out))
        assert out.exists()

    def test_17_main_runs_and_prints(self, tmp_path, capsys):
        """
        Test 17: Main Entry Point
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        clstr = tmp_path / "simple.clstr"
        clstr.write_text(">Cluster 0\n0 100nt, >seq1... *\n")

        out = tmp_path / "sim.txt"
        args = [
            '--input_cluster', str(clstr),
            '--output_similarity_txt', str(out)
        ]
        ca.main(args)
        captured = capsys.readouterr()
        assert "Processing complete" in captured.out

    def test_16a_prepare_evalue_histogram_valid_data(self):
        """
        Test 16a: prepare_evalue_histogram returns correct counts/bins.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], [])
        assert counts.sum() == 3  # 3 entries counted
        assert len(bins) == 51  # 50 bins => 51 edges

    def test_16b_prepare_evalue_histogram_empty(self):
        """
        Test 16b: prepare_evalue_histogram with empty/invalid data returns (None, None).
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], [])
        assert counts is None
        assert bins is None

    def test_16c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path):
        """
        Test 16c: create_evalue_plot saves a PNG and returns numeric data.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "eval.png"
        counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out))
        assert out.exists()
        assert counts.sum() == 3
        assert len(bins) == 51


if __name__ == "__main__":
    # Run all tests in this file
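    # (tip: `pytest tests/test_cdhit_analysis.py -k "taxa"` would run just the
    # taxa-related tests; -k filters by test name substring)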
    pytest.main([__file__])
```
