comparison tests/test_cdhit_analysis.py @ 4:e64af72e1b8f draft default tip

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit 4017d38cf327c48a6252e488ba792527dae97a70-dirty
author onnodg
date Mon, 15 Dec 2025 16:44:40 +0000
parents ff68835adb2b
children
comparison of revisions 3:c6981ea453ae and 4:e64af72e1b8f
1 """ 1 """
2 Test suite for CD-HIT cluster analysis processor. 2 Test suite for CD-HIT cluster analysis processor.
3 """ 3 """
4
5 import pytest 4 import pytest
6 from pathlib import Path 5 from pathlib import Path
7 import pandas as pd 6 import pandas as pd
8 import os 7 import os
9 import sys 8 import sys
10 9
11 # Add module path
12 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) 10 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
13 from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import ( 11 from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
14 parse_cluster_file, 12 parse_cluster_file,
15 process_cluster_data, 13 process_cluster_data,
16 calculate_cluster_taxa, 14 calculate_cluster_taxa,
17 write_similarity_output, 15 write_similarity_output,
18 write_evalue_output,
19 write_count_output, 16 write_count_output,
20 write_taxa_clusters_output, 17 write_taxa_excel,
21 write_taxa_processed_output,
22 ) 18 )
23 19
20
24 class TestCDHitAnalysis: 21 class TestCDHitAnalysis:
25 """Test class for CD-HIT cluster analysis processor using real XLSX test data."""
26 22
27 @pytest.fixture(scope="class") 23 @pytest.fixture(scope="class")
28 def test_data_dir(self): 24 def test_data_dir(self):
29 """Return path to the test-data directory with real XLSX files.""" 25 base = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
30 base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data") 26 assert base.exists()
31 assert base_dir.exists(), f"Test data directory does not exist: {base_dir}" 27 return base
32 return base_dir
33 28
34 @pytest.fixture(scope="class") 29 @pytest.fixture(scope="class")
35 def sample_cluster_file(self, test_data_dir): 30 def sample_cluster_file(self, test_data_dir):
36 """Return path to the sample cluster XLSX file.""" 31 f = test_data_dir / "prev_anno.txt"
37 cluster_file = test_data_dir / "29-test.clstr.txt" 32 assert f.exists()
38 assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}" 33 return str(f)
39 return str(cluster_file)
40 34
41 @pytest.fixture(scope="class") 35 @pytest.fixture(scope="class")
42 def sample_annotation_file(self, test_data_dir): 36 def sample_annotation_file(self, test_data_dir):
43 """Return path to the sample annotation XLSX file.""" 37 f = test_data_dir / "prev4.xlsx"
44 annotation_file = test_data_dir / "header_anno_29_test.xlsx" 38 assert f.exists()
45 assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}" 39 return str(f)
46 return str(annotation_file)
47 40
48 @pytest.fixture(scope="class") 41 @pytest.fixture(scope="class")
49 def parsed_clusters(self, sample_cluster_file, sample_annotation_file): 42 def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
50 """Parse the sample cluster file with annotations."""
51 return parse_cluster_file(sample_cluster_file, sample_annotation_file) 43 return parse_cluster_file(sample_cluster_file, sample_annotation_file)
52 44
45
53 def test_cluster_parsing_structure(self, parsed_clusters): 46 def test_cluster_parsing_structure(self, parsed_clusters):
54 """ 47 assert len(parsed_clusters) == 514
55 Test 1: Cluster File Parsing Structure
56
57 Verifies that cluster files are correctly parsed into the expected data structure
58 with proper extraction of headers, counts, similarities, and cluster groupings.
59 """
60 # Should have 24 clusters based on the sample data
61 # for x in parsed_clusters: print(x);
62 assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}"
63
64 # Test Cluster 0 structure (41 members)
65 cluster_0 = parsed_clusters[0] 48 cluster_0 = parsed_clusters[0]
66 assert len(cluster_0) == 41, "Cluster 0 should have 41 members" 49 assert len(cluster_0) == 430
67 cluster_3 = parsed_clusters[3] 50
68 assert len(cluster_3) == 4, "Cluster 3 should have 4 members" 51 read = cluster_0["M01687:460:000000000-LGY9G:1:1101:8356:6156_CONS"]
69 52 assert read["count"] == 19
70 # Check specific member data 53 assert isinstance(read["similarity"], float)
71 assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0" 54
72 read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS'] 55 def test_annotation_integration_basic(self, parsed_clusters):
73 assert read1_data['count'] == 1, "read1 count should be 1"
74 assert read1_data['similarity'] == 97.78, "read1 similarity should be 97.78%"
75 assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa"
76
77 # Check non-representative member
78 assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0"
79 read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS']
80 assert read2_data['count'] == 1, "read2 count should be 1"
81 assert read2_data['similarity'] == 100, "read2 similarity should be 100%"
82 assert read2_data['taxa'] == "Unannotated read"
83
84 # Test single-member cluster (Cluster 2)
85 cluster_2 = parsed_clusters[2]
86 assert len(cluster_2) == 1, "Cluster 2 should have 1 member"
87 assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2"
88
89 print("✓ Test 1 PASSED: Cluster file parsing structure correct")
90
91 def test_annotation_integration(self, parsed_clusters):
92 """
93 Test 2: Annotation Integration
94
95 Verifies that annotations from the separate annotation file are correctly
96 matched to cluster members based on header names.
97 """
98 # Check that annotations were properly integrated
99 cluster_0 = parsed_clusters[0] 56 cluster_0 = parsed_clusters[0]
100 57
101 # Verify e-values are correctly assigned 58 annotated_found = any(
102 assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect" 59 data["taxa"] != "Unannotated read" for data in cluster_0.values()
103 assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect" 60 )
104 assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect" 61 assert annotated_found, "At least one annotated read expected"
105 62
106 # Verify taxa assignments 63
107 assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect" 64 def test_process_cluster_data_counts_and_taxa_map(self, parsed_clusters):
108 assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect" 65 sim, taxa_map, annotated, unannotated = process_cluster_data(parsed_clusters[0])
109 assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect" 66
110 67 assert isinstance(sim, list)
111 # Test missing annotation handling (if any reads lack annotations) 68 assert annotated + unannotated == sum(d["count"] for d in parsed_clusters[0].values())
112 # All our test reads have annotations, so this tests the default case 69 assert isinstance(taxa_map, dict)
113 for cluster in parsed_clusters: 70 assert annotated == 47004 and unannotated == 9
114 for header, data in cluster.items(): 71
115 if data['evalue'] == 'Unannotated read': 72
116 assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect" 73 def test_weighted_lca_splitting_on_uncertain_taxa(self):
117 74 taxa_dict = {
118 print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data") 75 "K / P / C / O / F / G1 / S1": 60,
119 76 "K / P / C / O / F / Uncertain taxa / Uncertain taxa": 60,
120 def test_cluster_data_processing(self, parsed_clusters): 77 }
121 """ 78
122 Test 3: Cluster Data Processing 79 class ArgsLow:
123
124 Tests the processing of individual clusters to extract e-value lists,
125 similarity lists, and taxa dictionaries with correct count aggregation.
126 """
127 # Test processing of Cluster 0 (mixed taxa)
128 cluster_0 = parsed_clusters[0]
129 eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0)
130
131 # Check eval_list structure
132 # for x in eval_list: print(x)
133 assert eval_list[0] == 2, "Two unannotated reads in this cluster, should be 2"
134 assert len(eval_list) == 409, "Should have 408 annotated e-values plus 1 entry for the 2 unannotated reads"
135
136 # Check that e-values are correctly converted and repeated by count
137 eval_values = eval_list[1:] # Skip unannotated count
138 read1_evals = [e for e in eval_values if e == 1.41e-39]
139 assert len(read1_evals) == 365, "Should have 365 instances of read1's e-value"
140
141 # # Check similarity list
142 # for x in simi_list: print(x)
143 assert len(simi_list) == 410, "Should have 410 similarity values"
144 read1_similarities = [s for s in simi_list if s == 100.0]
145 assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity"
146
147 assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2"
148 assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406"
149 assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1"
150 assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1"
151 print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data")
152
153 def test_taxa_calculation_simple_case(self, parsed_clusters):
154 """
155 Test 4: Taxa Calculation - Simple Case
156
157 Tests taxonomic resolution for clusters with clear dominant taxa
158 (single taxa or overwhelming majority).
159 """
160
161 # Create test arguments
162 class TestArgs:
163 uncertain_taxa_use_ratio = 0.5 80 uncertain_taxa_use_ratio = 0.5
164 min_to_split = 0.45 81 min_to_split = 0.45
165 min_count_to_split = 10 82 min_count_to_split = 10
166 83
167 args = TestArgs() 84 class ArgsHigh:
168 85 uncertain_taxa_use_ratio = 1.0
169 # Test Cluster 5 (should be clearly Juglandaceae) 86 min_to_split = 0.45
170 cluster_5 = parsed_clusters[5] 87 min_count_to_split = 10
171 _, _, taxa_dict_5 = process_cluster_data(cluster_5) 88
172 89 # LOW weight → uncertain counts half → G1 wins → no split
173 result_5 = calculate_cluster_taxa(taxa_dict_5, args) 90 res_low = calculate_cluster_taxa(taxa_dict, ArgsLow())
174 # Should return a single taxa group (Juglandaceae dominant) 91 assert len(res_low) == 1
175 assert len(result_5) == 1, "Single dominant taxa should not split" 92 assert sum(res_low[0].values()) == 60 # total preserved
176 dominant_taxa = list(result_5[0].keys())[0] 93
177 assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \ 94 # HIGH weight → uncertain = full weight → equal → split
178 'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant" 95 res_high = calculate_cluster_taxa(taxa_dict, ArgsHigh())
179 96 assert len(res_high) == 2
180 # Test single-member cluster (Cluster 2) 97 total = sum(sum(g.values()) for g in res_high)
181 cluster_2 = parsed_clusters[2] 98 assert total == 120
182 _, _, taxa_dict_2 = process_cluster_data(cluster_2) 99
183 100
184 result_2 = calculate_cluster_taxa(taxa_dict_2, args) 101 def test_calculate_cluster_taxa_preserves_counts_real_cluster(self, parsed_clusters):
185 total = sum(value for d in result_2 for value in d.values()) 102 sim, taxa_map, annotated, unannotated = process_cluster_data(parsed_clusters[3])
186 assert total == 1, "Single member cluster should not split" 103
187 104
188 print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly") 105 raw_total = annotated + unannotated
189 106 taxa_map_total = sum(info["count"] for info in taxa_map.values())
190 def test_taxa_calculation_complex_splitting(self, parsed_clusters): 107 assert raw_total == taxa_map_total
191 """ 108
192 Test 5: Taxa Calculation - Complex Splitting 109 class Args:
193
194 Tests the recursive taxonomic resolution algorithm for clusters with
195 multiple competing taxa that should be split based on thresholds.
196 """
197
198 class TestArgs:
199 uncertain_taxa_use_ratio = 0.5 110 uncertain_taxa_use_ratio = 0.5
200 min_to_split = 0.30 # Lower threshold to encourage splitting 111 min_to_split = 0.3
201 min_count_to_split = 5 # Lower threshold to encourage splitting 112 min_count_to_split = 5
202 113
203 args = TestArgs() 114
204 115 results = calculate_cluster_taxa({t: i["count"] for t, i in taxa_map.items()}, Args())
205 # Test Cluster 3 (mixed taxa) 116
206 cluster_3 = parsed_clusters[3] 117
207 _, _, taxa_dict_3 = process_cluster_data(cluster_3) 118 resolved_total = sum(sum(group.values()) for group in results)
208 119 assert resolved_total <= raw_total
209 # Manual check of expected taxa distribution 120 assert resolved_total > 0
210 expected_taxa = {} 121
211 for header, data in cluster_3.items(): 122
212 taxa = data['taxa'] 123 def test_write_similarity_and_count_outputs(self, tmp_path, parsed_clusters):
213 count = data['count'] 124 out_simi = tmp_path / "simi.txt"
214 expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count 125 out_count = tmp_path / "count.txt"
215 126
216 result_3 = calculate_cluster_taxa(taxa_dict_3, args)
217
218 # With mixed taxa and low thresholds, should potentially split
219 # The exact behavior depends on the algorithm implementation
220 total_result_count = sum(sum(group.values()) for group in result_3)
221 expected_total = sum(expected_taxa.values())
222
223 assert total_result_count == expected_total, "Total counts should be preserved after splitting"
224
225 print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds")
226
227 def test_statistical_calculations(self, parsed_clusters):
228 """
229 Test 6: Statistical Calculations
230
231 Verifies that similarity and e-value statistics are calculated correctly
232 including averages, standard deviations, and distributions.
233 """
234 # Process all clusters to get combined data
235
236 eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5])
237 # Test similarity statistics
238 if eval_list:
239 expected_avg = sum(simi_list) / len(simi_list)
240
241 # Manual verification of a few key values
242 # From our test data: 100% (166 times), 98.88% (9 times), 98.86% (1 time)
243 total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86
244 total_count = 176
245 manual_avg = total_similarity_sum / total_count
246
247 assert abs(
248 expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}"
249
250 # Test e-value data structure
251 annotated_evals = eval_list[1:]
252 assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric"
253 assert all(e > 0 for e in annotated_evals), "All e-values should be positive"
254
255 print("✓ Test 6 PASSED: Statistical calculations are mathematically correct")
256
257 def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file):
258 """
259 Test 7: Output File Formats
260
261 Tests that all output files are created with correct structure and content,
262 including text files, Excel files with multiple sheets, and plot files.
263 """
264 output_dir = test_data_dir
265
266 # Parse data
267 clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
268
269 # Process all clusters
270 cluster_data_list = [] 127 cluster_data_list = []
271 all_eval_data = [0] 128 all_simi = []
272 all_simi_data = [] 129
273 130 for c in parsed_clusters:
274 for cluster in clusters: 131 sim, taxa_map, annotated, unannotated = process_cluster_data(c)
275 eval_list, simi_list, taxa_dict = process_cluster_data(cluster) 132 cluster_data_list.append(
276 cluster_data_list.append((eval_list, simi_list, taxa_dict)) 133 {
277 all_eval_data[0] += eval_list[0] 134 "similarities": sim,
278 all_eval_data.extend(eval_list[1:]) 135 "taxa_map": taxa_map,
279 all_simi_data.extend(simi_list) 136 "annotated": annotated,
280 137 "unannotated": unannotated,
281 # Test similarity output 138 }
282 simi_output = output_dir / "test_similarity.txt" 139 )
283 write_similarity_output(all_simi_data, str(simi_output)) 140 all_simi.extend(sim)
284 141
285 assert simi_output.exists(), "Similarity output file not created" 142 write_similarity_output(cluster_data_list, str(out_simi))
286 with open(simi_output, 'r') as f: 143 assert out_simi.exists()
287 content = f.read() 144
288 assert "# Average similarity:" in content, "Missing average similarity in output" 145 write_count_output(cluster_data_list, str(out_count))
289 assert "# Standard deviation:" in content, "Missing standard deviation in output" 146 assert out_count.exists()
290 assert "similarity\tcount" in content, "Missing header in similarity output" 147
291 148
292 # Test e-value output 149 def test_write_taxa_excel_raw_and_processed(self, tmp_path, parsed_clusters):
293 eval_output = output_dir / "test_evalue.txt" 150
294 write_evalue_output(all_eval_data, str(eval_output)) 151 class Args:
295
296 assert eval_output.exists(), "E-value output file not created"
297 with open(eval_output, 'r') as f:
298 content = f.read()
299 assert "evalue\tcount" in content, "Missing header in e-value output"
300
301 # Test count output
302 count_output = output_dir / "test_count.txt"
303 write_count_output(all_eval_data, cluster_data_list, str(count_output))
304
305 assert count_output.exists(), "Count output file not created"
306 with open(count_output, 'r') as f:
307 content = f.read()
308 assert "cluster\tunannotated\tannotated" in content, "Missing header in count output"
309 assert "TOTAL\t" in content, "Missing total row in count output"
310
311 # Test taxa clusters Excel output
312 taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx"
313 write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output))
314
315 assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created"
316 df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters')
317 expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
318 'species']
319 assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output"
320
321 print("✓ Test 7 PASSED: All output file formats are correct and complete")
322
323 def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file):
324 """
325 Test 8: Processed Taxa Output Structure
326
327 Tests the complex processed taxa Excel output with multiple sheets
328 and parameter tracking.
329 """
330 output_dir = test_data_dir
331
332 class TestArgs:
333 uncertain_taxa_use_ratio = 0.6
334 min_to_split = 0.35
335 min_count_to_split = 15
336 show_unannotated_clusters = True
337
338 args = TestArgs()
339
340 # Parse and process data
341 clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
342 cluster_data_list = []
343
344 for cluster in clusters:
345 eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
346 cluster_data_list.append((eval_list, simi_list, taxa_dict))
347
348 # Test processed taxa output
349 processed_output = output_dir / "test_processed_taxa.xlsx"
350 write_taxa_processed_output(cluster_data_list, args, str(processed_output))
351
352 assert processed_output.exists(), "Processed taxa Excel file not created"
353
354 # Check multiple sheets exist
355 xl_file = pd.ExcelFile(processed_output)
356 expected_sheets = ['Processed_Taxa_Clusters', 'Settings']
357 assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output"
358
359 # Check main data sheet
360 df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters')
361 expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
362 'species']
363 assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet"
364
365 # Check settings sheet
366 df_settings = pd.read_excel(processed_output, sheet_name='Settings')
367 assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings"
368 assert 'Value' in df_settings.columns, "Missing Value column in settings"
369
370 # Verify settings values are recorded
371 settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value']))
372 assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded"
373 assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded"
374
375 print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking")
376
377 def test_edge_cases(self, test_data_dir):
378 """
379 Test 9: Edge Cases and Error Handling
380
381 Tests handling of edge cases like empty files, missing annotations,
382 single-member clusters, and malformed input data.
383 """
384 input_dir = test_data_dir
385
386 # Test empty cluster file
387 empty_cluster = input_dir / "empty_cluster.clstr"
388 with open(empty_cluster, 'w') as f:
389 f.write("")
390
391 clusters_empty = parse_cluster_file(str(empty_cluster))
392 assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters"
393
394 # Test cluster file with no annotations
395 simple_cluster = input_dir / "simple_cluster.clstr"
396 simple_cluster_content = """>Cluster 0
397 0 100nt, >read_no_anno:50... *
398 """
399 with open(simple_cluster, 'w') as f:
400 f.write(simple_cluster_content)
401
402 with pytest.raises(UnboundLocalError):
403 parse_cluster_file(str(simple_cluster), raise_on_error=True)
404
405 # Test malformed cluster entries (missing parts)
406 malformed_cluster = input_dir / "malformed_cluster.clstr"
407 malformed_content = """>Cluster 0
408 0 100nt, >read1:50..._CONS(50) *
409 invalid_line_without_proper_format
410 1 90nt, >read2:25..._CONS(25) at /+/95%
411 """
412 annotations_malformed = input_dir / "test_pytest.xlsx"
413 with open(malformed_cluster, 'w') as f:
414 f.write(malformed_content)
415
416 clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed))
417 # Should still parse valid entries and skip invalid ones
418 assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file"
419 assert len(clusters_malformed[0]) == 2, "Should have 2 valid reads"
420 assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50
421 assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25
422
423 print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes")
424
425 def test_count_preservation_across_processing(self, parsed_clusters):
426 """
427 Test 10: Count Preservation Across Processing Pipeline
428
429 Verifies that read counts are preserved throughout the entire processing
430 pipeline from cluster parsing through taxa calculation to final output.
431 """
432 # Calculate expected total counts from original data
433 expected_total = 0
434 for cluster in parsed_clusters:
435 for header, data in cluster.items():
436 expected_total += data['count']
437
438 # Process through pipeline and verify counts at each stage
439 total_from_processing = 0
440 taxa_processing_totals = []
441
442 class TestArgs:
443 uncertain_taxa_use_ratio = 0.5 152 uncertain_taxa_use_ratio = 0.5
444 min_to_split = 0.45 153 min_to_split = 0.45
445 min_count_to_split = 10 154 min_count_to_split = 10
446 155 min_cluster_support = 1
447 args = TestArgs() 156 make_taxa_in_cluster_split = False
448 157
449 for cluster in parsed_clusters: 158 cluster_data_list = []
450 eval_list, simi_list, taxa_dict = process_cluster_data(cluster) 159 for c in parsed_clusters:
451 160 sim, taxa_map, annotated, unannotated = process_cluster_data(c)
452 # Check that cluster processing preserves counts 161 cluster_data_list.append(
453 cluster_total = eval_list[0] + len(eval_list[1:]) # unannotated + annotated 162 {
454 cluster_expected = sum(data['count'] for data in cluster.values()) 163 "similarities": sim,
455 assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}" 164 "taxa_map": taxa_map,
456 165 "annotated": annotated,
457 total_from_processing += cluster_total 166 "unannotated": unannotated,
458 167 }
459 # Check taxa calculation preserves counts 168 )
460 taxa_results = calculate_cluster_taxa(taxa_dict, args) 169
461 taxa_total = sum(sum(group.values()) for group in taxa_results) 170 out = tmp_path / "taxa.xlsx"
462 taxa_processing_totals.append(taxa_total) 171 write_taxa_excel(
463 172 cluster_data_list, Args(), str(out), write_raw=True, write_processed=True
464 # Verify taxa dict total matches 173 )
465 taxa_dict_total = sum(taxa_dict.values()) 174
466 assert taxa_total <= taxa_dict_total, f"Count mismatch in taxa calculation: expected {taxa_dict_total}, got {taxa_total}" 175 xl = pd.ExcelFile(out)
467 176 assert "Raw_Taxa_Clusters" in xl.sheet_names
468 # Final verification 177 assert "Processed_Taxa_Clusters" in xl.sheet_names
469 assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}" 178 assert "Settings" in xl.sheet_names
470 179
471 # Verify sum of all taxa processing equals original 180 def test_write_taxa_excel_only_raw_or_only_processed(self, tmp_path, parsed_clusters):
472 total_taxa_processed = sum(taxa_processing_totals) 181
473 assert total_taxa_processed <= expected_total, f"Taxa processing total mismatch: expected {expected_total}, got {total_taxa_processed}" 182 class Args:
474 183 uncertain_taxa_use_ratio = 0.5
475 print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline") 184 min_to_split = 0.45
476 185 min_count_to_split = 10
477 def test_11_parse_arguments_all_flags(self, tmp_path): 186 min_cluster_support = 1
478 """ 187 make_taxa_in_cluster_split = False
479 Test 11: Argument Parsing with All Flags 188
480 189 cluster_data_list = []
481 Ensures parse_arguments correctly handles all optional flags and values. 190 for c in parsed_clusters:
482 """ 191 sim, taxa_map, annotated, unannotated = process_cluster_data(c)
192 cluster_data_list.append(
193 {
194 "similarities": sim,
195 "taxa_map": taxa_map,
196 "annotated": annotated,
197 "unannotated": unannotated,
198 }
199 )
200
201
202 out_raw = tmp_path / "raw.xlsx"
203 write_taxa_excel(cluster_data_list, Args(), str(out_raw), write_raw=True, write_processed=False)
204 xl_raw = pd.ExcelFile(out_raw)
205 assert "Raw_Taxa_Clusters" in xl_raw.sheet_names
206 assert "Processed_Taxa_Clusters" not in xl_raw.sheet_names
207
208
209 out_proc = tmp_path / "proc.xlsx"
210 write_taxa_excel(cluster_data_list, Args(), str(out_proc), write_raw=False, write_processed=True)
211 xl_proc = pd.ExcelFile(out_proc)
212 assert "Processed_Taxa_Clusters" in xl_proc.sheet_names
213
214
215 def test_parse_arguments_all_flags(self, tmp_path):
483 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca 216 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
484 args = ca.parse_arguments([ 217 args = ca.parse_arguments([
485 '--input_cluster', str(tmp_path / "dummy.clstr"), 218 "--input_cluster", str(tmp_path / "dummy.clstr"),
486 '--simi_plot_y_min', '90', 219 "--simi_plot_y_min", "90",
487 '--simi_plot_y_max', '99', 220 "--simi_plot_y_max", "99",
488 '--uncertain_taxa_use_ratio', '0.3', 221 "--uncertain_taxa_use_ratio", "0.3",
489 '--min_to_split', '0.2', 222 "--min_to_split", "0.2",
490 '--min_count_to_split', '5', 223 "--min_count_to_split", "5",
491 '--show_unannotated_clusters', 224 "--output_excel", str(tmp_path / "report.xlsx"),
492 '--make_taxa_in_cluster_split',
493 '--print_empty_files'
494 ]) 225 ])
495 assert args.simi_plot_y_min == 90 226 assert args.simi_plot_y_min == 90
496 assert args.print_empty_files is True 227 assert args.simi_plot_y_max == 99
497 228
498 def test_12_process_cluster_data_valueerror(self): 229 def test_main_runs_and_creates_outputs(self, tmp_path):
499 """
500 Test 12: Process Cluster Data with Bad E-value
501
502 Ensures ValueError branches are handled and unannotated counts increase.
503 """
504 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca 230 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
505 cluster = { 231
506 "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"} 232 clstr = tmp_path / "simple.clstr"
507 } 233 clstr.write_text(">Cluster 0\n0\t88nt, >read1_CONS(3)... *\n")
508 eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster) 234
509 assert eval_list[0] == 1 # unannotated read 235 anno = tmp_path / "anno.xlsx"
510 236 df = pd.DataFrame([
511 def test_13_write_similarity_and_evalue_empty(self, tmp_path): 237 {
512 """ 238 "header": "read1_CONS",
513 Test 13: Output Writers with Empty Data 239 "seq_id": "SEQ001",
514 """ 240 "source": "Genbank",
515 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca 241 "taxa": "K / P / C / O / F / G / S",
242 }
243 ])
244 with pd.ExcelWriter(anno) as w:
245 df.to_excel(w, sheet_name="Individual_Reads", index=False)
246
516 sim_file = tmp_path / "sim.txt" 247 sim_file = tmp_path / "sim.txt"
517 eval_file = tmp_path / "eval.txt" 248 excel_file = tmp_path / "taxa.xlsx"
518 249 args = [
519 ca.write_similarity_output([], str(sim_file)) 250 "--input_cluster", str(clstr),
520 assert not sim_file.exists() or sim_file.read_text() == "" 251 "--input_annotation", str(anno),
521 252 "--output_similarity_txt", str(sim_file),
522 ca.write_evalue_output([5], str(eval_file)) 253 "--output_excel", str(excel_file),
523 assert "unannotated" in eval_file.read_text() 254 '--output_taxa_clusters',
524 255 '--output_taxa_processed',
525 def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path): 256 '--log_file', 'test-data/new_logs.txt',
526 """ 257 '--simi_plot_y_min', '95',
527 Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa 258 '--simi_plot_y_max', '100',
528 """ 259 '--uncertain_taxa_use_ratio', '0.5',
529 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca 260 '--min_to_split', '0.45',
530 count_file = tmp_path / "count.txt" 261 '--min_count_to_split', '10',
531 taxa_file = tmp_path / "taxa.xlsx" 262 '--min_cluster_support', '1'
532 263 ]
533 ca.write_count_output([0], [], str(count_file)) 264
534 assert "TOTAL" in count_file.read_text() 265 ca.main(args)
535 266 assert sim_file.exists()
536 cluster_data = [([0], [], {"bad": 1})] 267 assert excel_file.exists()
537 ca.write_taxa_clusters_output(cluster_data, str(taxa_file)) 268
538 assert taxa_file.exists() 269 def test_parse_cluster_file_empty_and_no_annotation(self, tmp_path):
539 270 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis2 as ca
540 def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path): 271
541 """ 272 empty = tmp_path / "empty.clstr"
542 Test 15: Processed Taxa Output with Settings 273 empty.write_text("")
543 """ 274
544 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca 275 clusters = ca.parse_cluster_file(str(empty), annotation_file=None, log_messages=[])
276 assert clusters == []
277
278 def test_create_similarity_plot_creates_file(self, tmp_path, parsed_clusters):
279 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis2 as ca
280
281
282 cluster_data_list = []
283 all_simi = []
284 lengths = []
285
286 for c in parsed_clusters[:5]:
287 sim, taxa_map, annotated, unannotated = process_cluster_data(c)
288 cluster_data_list.append(
289 {"similarities": sim, "taxa_map": taxa_map,
290 "annotated": annotated, "unannotated": unannotated}
291 )
292 if sim:
293 all_simi.extend(sim)
294 lengths.append(len(sim))
545 295
546 class Args: 296 class Args:
547 uncertain_taxa_use_ratio = 0.5 297 simi_plot_y_min = 95.0
548 min_to_split = 0.2 298 simi_plot_y_max = 100.0
549 min_count_to_split = 2 299
550 show_unannotated_clusters = True 300 out_png = tmp_path / "sim.png"
551 301 ca.create_similarity_plot(all_simi, lengths, Args(), str(out_png))
552 out_file = tmp_path / "processed.xlsx" 302 if all_simi:
553 cluster_data = [([0], [], {"Unannotated read": 2})] 303 assert out_png.exists()
554 ca.write_taxa_processed_output(cluster_data, Args(), str(out_file))
555 assert out_file.exists()
556
557 def test_16_create_evalue_plot_edge_cases(self, tmp_path):
558 """
559 Test 16: E-value Plot Edge Cases
560 """
561 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
562 out = tmp_path / "plot.png"
563
564 # Only unannotated
565 ca.create_evalue_plot([0], [0], str(out))
566 assert not out.exists() or out.stat().st_size == 0
567
568 # Empty after filtering
569 ca.create_evalue_plot([0, ], [], str(out))
570 assert not out.exists() or out.stat().st_size == 0
571
572 # With valid values
573 ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out))
574 assert out.exists()
575
576 def test_17_main_runs_and_prints(self, tmp_path, capsys):
577 """
578 Test 17: Main Entry Point
579 """
580 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
581 clstr = tmp_path / "simple.clstr"
582 clstr.write_text(">Cluster 0\n0 100nt, >seq1... *\n")
583
584 out = tmp_path / "sim.txt"
585 args = [
586 '--input_cluster', str(clstr),
587 '--output_similarity_txt', str(out)
588 ]
589 ca.main(args)
590 captured = capsys.readouterr()
591 assert "Processing complete" in captured.out
592
593
594 def test_18a_prepare_evalue_histogram_valid_data(self):
595 """
596 Test 18a: prepare_evalue_histogram returns correct counts/bins.
597 """
598 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
599 counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], [])
600 assert counts.sum() == 3 # 3 entries counted
601 assert len(bins) == 51 # 50 bins => 51 edges
602
603 def test_18b_prepare_evalue_histogram_empty(self):
604 """
605 Test 18b: prepare_evalue_histogram with empty/invalid data returns (None, None).
606 """
607 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
608 counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], [])
609 assert counts is None
610 assert bins is None
611
612 def test_18c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path):
613 """
614 Test 18c: create_evalue_plot saves a PNG and returns numeric data.
615 """
616 from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
617 out = tmp_path / "eval.png"
618 counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out))
619 assert out.exists()
620 assert counts.sum() == 3
621 assert len(bins) == 51
622
623
624 if __name__ == "__main__":
625 # Run all tests in this file
626 pytest.main([__file__])
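
Two short sketches for readers of the listing above; both are illustrative assumptions, not code taken from cdhit_analysis itself.

First, the shape of a single cluster_data_list entry as the rewritten tests build it before calling write_similarity_output, write_count_output, and write_taxa_excel. The keys are the ones the tests themselves use; the concrete values here are invented:

    # Hypothetical entry of cluster_data_list (values are illustrative only)
    cluster_entry = {
        "similarities": [100.0, 98.88, 98.86],                    # per-read similarity values
        "taxa_map": {"K / P / C / O / F / G / S": {"count": 3}},  # taxon -> info carrying a "count"
        "annotated": 3,                                           # reads that received an annotation
        "unannotated": 0,                                         # reads without one
    }
    cluster_data_list = [cluster_entry]

Second, a hand-check of the thresholds exercised in test_weighted_lca_splitting_on_uncertain_taxa, assuming (an assumption, not read from the tool) that calculate_cluster_taxa weights "Uncertain taxa" counts by uncertain_taxa_use_ratio before comparing the smaller branch against min_to_split and min_count_to_split:

    # Stand-alone arithmetic sketch; would_split is a hypothetical helper, not part of cdhit_analysis
    taxa_counts = {
        "K / P / C / O / F / G1 / S1": 60,
        "K / P / C / O / F / Uncertain taxa / Uncertain taxa": 60,
    }

    def would_split(counts, uncertain_ratio, min_to_split, min_count_to_split):
        # weight uncertain branches, then test the smaller branch against both thresholds
        weighted = {t: n * (uncertain_ratio if "Uncertain taxa" in t else 1.0)
                    for t, n in counts.items()}
        smallest = min(weighted.values())
        return smallest >= min_count_to_split and smallest / sum(weighted.values()) >= min_to_split

    # ArgsLow: uncertain branch counts 60 * 0.5 = 30 of 90 -> 0.33 < 0.45 -> no split expected
    assert not would_split(taxa_counts, 0.5, 0.45, 10)
    # ArgsHigh: uncertain branch counts 60 of 120 -> 0.50 >= 0.45 -> split into two groups expected
    assert would_split(taxa_counts, 1.0, 0.45, 10)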