comparison tests/test_cdhit_analysis.py @ 0:00d56396b32a draft

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_clusters_tool commit c944fd5685f295acba06679e85b67973c173b137
author onnodg
date Tue, 14 Oct 2025 09:09:46 +0000
parents
children ff68835adb2b
comparison
equal deleted inserted replaced
-1:000000000000 0:00d56396b32a
1 """
2 Test suite for CD-HIT cluster analysis processor.
3 """
4
5 import pytest
6 from pathlib import Path
7 import pandas as pd
8 import os
9 import sys
10
11 # Add module path
12 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
13 from Stage_1_translated.NLOOR_scripts.process_clusters_tool.cdhit_analysis import (
14 parse_cluster_file,
15 process_cluster_data,
16 calculate_cluster_taxa,
17 write_similarity_output,
18 write_evalue_output,
19 write_count_output,
20 write_taxa_clusters_output,
21 write_taxa_processed_output,
22 )
23
class TestCDHitAnalysis:
    """Test class for CD-HIT cluster analysis processor using real XLSX test data.

    Input fixtures (cluster file + annotation workbook) are read from the
    repository's test-data directory; any files *generated* by a test are
    written to pytest's per-test ``tmp_path`` so the repository stays clean.
    """

    @pytest.fixture(scope="class")
    def test_data_dir(self):
        """Return path to the test-data directory with real input files."""
        base_dir = Path("Stage_1_translated/NLOOR_scripts/process_clusters_tool/test-data")
        assert base_dir.exists(), f"Test data directory does not exist: {base_dir}"
        return base_dir

    @pytest.fixture(scope="class")
    def sample_cluster_file(self, test_data_dir):
        """Return path to the sample CD-HIT cluster text file."""
        cluster_file = test_data_dir / "29-test.clstr.txt"
        assert cluster_file.exists(), f"Sample cluster file not found: {cluster_file}"
        return str(cluster_file)

    @pytest.fixture(scope="class")
    def sample_annotation_file(self, test_data_dir):
        """Return path to the sample annotation XLSX file."""
        annotation_file = test_data_dir / "header_anno_29_test.xlsx"
        assert annotation_file.exists(), f"Sample annotation file not found: {annotation_file}"
        return str(annotation_file)

    @pytest.fixture(scope="class")
    def parsed_clusters(self, sample_cluster_file, sample_annotation_file):
        """Parse the sample cluster file with annotations."""
        return parse_cluster_file(sample_cluster_file, sample_annotation_file)

    def test_cluster_parsing_structure(self, parsed_clusters):
        """
        Test 1: Cluster File Parsing Structure

        Verifies that cluster files are correctly parsed into the expected data structure
        with proper extraction of headers, counts, similarities, and cluster groupings.
        """
        # Should have 24 clusters based on the sample data
        assert len(parsed_clusters) == 24, f"Expected 24 clusters, got {len(parsed_clusters)}"

        # Test Cluster 0 structure (41 members)
        cluster_0 = parsed_clusters[0]
        assert len(cluster_0) == 41, "Cluster 0 should have 41 members"
        cluster_3 = parsed_clusters[3]
        assert len(cluster_3) == 4, "Cluster 3 should have 4 members"

        # Check specific member data
        assert 'M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS' in cluster_0, "this read should be in cluster 0"
        read1_data = cluster_0['M01687:476:000000000-LL5F5:1:2119:23468:21624_CONS']
        assert read1_data['count'] == 1, "read1 count should be 1"
        assert read1_data['similarity'] == 97.78, "read1 similarity should be 97.78%"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in read1_data['taxa'], "read1 should have this taxa"

        # Check non-representative member
        assert 'M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS' in cluster_0, "this read should be in cluster 0"
        read2_data = cluster_0['M01687:476:000000000-LL5F5:1:1107:11168:7701_CONS']
        assert read2_data['count'] == 1, "read2 count should be 1"
        assert read2_data['similarity'] == 100, "read2 similarity should be 100%"
        assert read2_data['taxa'] == "Unannotated read"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        assert len(cluster_2) == 1, "Cluster 2 should have 1 member"
        assert 'M01687:476:000000000-LL5F5:1:2108:17627:10678_CONS' in cluster_2, "this read should be in cluster 2"

        print("✓ Test 1 PASSED: Cluster file parsing structure correct")

    def test_annotation_integration(self, parsed_clusters):
        """
        Test 2: Annotation Integration

        Verifies that annotations from the separate annotation file are correctly
        matched to cluster members based on header names.
        """
        # Check that annotations were properly integrated
        cluster_0 = parsed_clusters[0]

        # Verify e-values are correctly assigned
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['evalue'] == 1.41e-39, "read1 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['evalue'] == 2.32e-37, "read2 e-value incorrect"
        assert cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['evalue'] == 2.32e-37, "read3 e-value incorrect"

        # Verify taxa assignments
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:8813:1648_CONS']['taxa'], "read1 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:23329:6743_CONS']['taxa'], "read2 taxa incorrect"
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa' in cluster_0['M01687:476:000000000-LL5F5:1:1102:22397:8283_CONS']['taxa'], "read3 taxa incorrect"

        # Test missing annotation handling: an unannotated e-value must be
        # paired with an unannotated taxa label.
        for cluster in parsed_clusters:
            for header, data in cluster.items():
                if data['evalue'] == 'Unannotated read':
                    assert data['taxa'] == 'Unannotated read', "Unannotated handling incorrect"

        print("✓ Test 2 PASSED: Annotations correctly integrated with cluster data")

    def test_cluster_data_processing(self, parsed_clusters):
        """
        Test 3: Cluster Data Processing

        Tests the processing of individual clusters to extract evaluation lists,
        similarity lists, and taxa dictionaries with correct count aggregation.
        """
        # Test processing of Cluster 0 (mixed taxa)
        cluster_0 = parsed_clusters[0]
        eval_list, simi_list, taxa_dict = process_cluster_data(cluster_0)

        # eval_list[0] carries the aggregated unannotated-read count;
        # the remaining entries are one e-value per annotated read.
        assert eval_list[0] == 2, "Two unannotated reads in this cluster, should be 2"
        assert len(eval_list) == 409, "Should have 409 entries: 408 annotated e-values + 1 unannotated-count slot"

        # Check that e-values are correctly converted and repeated by count
        eval_values = eval_list[1:]  # Skip unannotated count
        read1_evals = [e for e in eval_values if e == 1.41e-39]
        assert len(read1_evals) == 365, "Should have 365 instances of read1's e-value"

        # Check similarity list
        assert len(simi_list) == 410, "Should have 410 similarity values"
        read1_similarities = [s for s in simi_list if s == 100.0]
        assert len(read1_similarities) == 2, "Should have 2 instances of 100% similarity"

        assert taxa_dict['Unannotated read'] == 2, "Unannotated reads should be 2"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Uncertain taxa / Uncertain taxa'] == 406, "taxa should be 406"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Uncertain taxa / Uncertain taxa / Uncertain taxa'] == 1, "taxa should be 1"
        assert taxa_dict['Viridiplantae / Streptophyta / Magnoliopsida / Ericales / Actinidiaceae / Actinidia / Actinidia kolomikta'] == 1, "taxa should be 1"
        print("✓ Test 3 PASSED: Cluster data processing produces correct aggregated data")

    def test_taxa_calculation_simple_case(self, parsed_clusters):
        """
        Test 4: Taxa Calculation - Simple Case

        Tests taxonomic resolution for clusters with clear dominant taxa
        (single taxa or overwhelming majority).
        """

        # Minimal stand-in for the parsed CLI arguments object
        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        # Test Cluster 5 (clearly dominated by Juglandaceae)
        cluster_5 = parsed_clusters[5]
        _, _, taxa_dict_5 = process_cluster_data(cluster_5)

        result_5 = calculate_cluster_taxa(taxa_dict_5, args)
        # Should return a single taxa group for the dominant family
        assert len(result_5) == 1, "Single dominant taxa should not split"
        dominant_taxa = list(result_5[0].keys())[0]
        assert 'Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / ' \
               'Uncertain taxa / Uncertain taxa' in dominant_taxa, "Should identify Juglandaceae as dominant"

        # Test single-member cluster (Cluster 2)
        cluster_2 = parsed_clusters[2]
        _, _, taxa_dict_2 = process_cluster_data(cluster_2)

        result_2 = calculate_cluster_taxa(taxa_dict_2, args)
        total = sum(value for d in result_2 for value in d.values())
        assert total == 1, "Single member cluster should not split"

        print("✓ Test 4 PASSED: Simple taxa calculation cases work correctly")

    def test_taxa_calculation_complex_splitting(self, parsed_clusters):
        """
        Test 5: Taxa Calculation - Complex Splitting

        Tests the recursive taxonomic resolution algorithm for clusters with
        multiple competing taxa that should be split based on thresholds.
        """

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.30  # Lower threshold to encourage splitting
            min_count_to_split = 5  # Lower threshold to encourage splitting

        args = TestArgs()

        # Test Cluster 3 (mixed taxa across several reads)
        cluster_3 = parsed_clusters[3]
        _, _, taxa_dict_3 = process_cluster_data(cluster_3)

        # Manual check of expected taxa distribution
        expected_taxa = {}
        for header, data in cluster_3.items():
            taxa = data['taxa']
            count = data['count']
            expected_taxa[taxa] = expected_taxa.get(taxa, 0) + count

        result_3 = calculate_cluster_taxa(taxa_dict_3, args)

        # With mixed taxa and low thresholds, the cluster may split; whatever
        # the algorithm decides, the total read count must be preserved.
        total_result_count = sum(sum(group.values()) for group in result_3)
        expected_total = sum(expected_taxa.values())

        assert total_result_count == expected_total, "Total counts should be preserved after splitting"

        print("✓ Test 5 PASSED: Complex taxa splitting preserves counts and follows thresholds")

    def test_statistical_calculations(self, parsed_clusters):
        """
        Test 6: Statistical Calculations

        Verifies that similarity and e-value statistics are calculated correctly
        including averages, standard deviations, and distributions.
        """
        eval_list, simi_list, _ = process_cluster_data(parsed_clusters[5])

        # Test similarity statistics (guard on simi_list: that is the list we
        # divide by, so guarding on eval_list could divide by zero).
        if simi_list:
            expected_avg = sum(simi_list) / len(simi_list)

            # Manual verification against the known cluster-5 distribution:
            # 166 reads at 100%, 9 at 98.88%, and 1 at 98.86%.
            total_similarity_sum = (100.0 * 166) + (98.88 * 9) + 98.86
            total_count = 176
            manual_avg = total_similarity_sum / total_count

            assert abs(
                expected_avg - manual_avg) < 0.01, f"Similarity average mismatch: expected ~{manual_avg}, got {expected_avg}"

        # Test e-value data structure (slot 0 is the unannotated count)
        annotated_evals = eval_list[1:]
        assert all(isinstance(e, (int, float)) for e in annotated_evals), "All e-values should be numeric"
        assert all(e > 0 for e in annotated_evals), "All e-values should be positive"

        print("✓ Test 6 PASSED: Statistical calculations are mathematically correct")

    def test_output_file_formats(self, test_data_dir, sample_cluster_file, sample_annotation_file, tmp_path):
        """
        Test 7: Output File Formats

        Tests that all output files are created with correct structure and content,
        including text files, Excel files with multiple sheets, and plot files.
        Generated files go to tmp_path so the test-data directory is not polluted.
        """
        output_dir = tmp_path

        # Parse data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)

        # Process all clusters
        cluster_data_list = []
        all_eval_data = [0]
        all_simi_data = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))
            all_eval_data[0] += eval_list[0]
            all_eval_data.extend(eval_list[1:])
            all_simi_data.extend(simi_list)

        # Test similarity output
        simi_output = output_dir / "test_similarity.txt"
        write_similarity_output(all_simi_data, str(simi_output))

        assert simi_output.exists(), "Similarity output file not created"
        with open(simi_output, 'r') as f:
            content = f.read()
            assert "# Average similarity:" in content, "Missing average similarity in output"
            assert "# Standard deviation:" in content, "Missing standard deviation in output"
            assert "similarity\tcount" in content, "Missing header in similarity output"

        # Test e-value output
        eval_output = output_dir / "test_evalue.txt"
        write_evalue_output(all_eval_data, str(eval_output))

        assert eval_output.exists(), "E-value output file not created"
        with open(eval_output, 'r') as f:
            content = f.read()
            assert "evalue\tcount" in content, "Missing header in e-value output"

        # Test count output
        count_output = output_dir / "test_count.txt"
        write_count_output(all_eval_data, cluster_data_list, str(count_output))

        assert count_output.exists(), "Count output file not created"
        with open(count_output, 'r') as f:
            content = f.read()
            assert "cluster\tunannotated\tannotated" in content, "Missing header in count output"
            assert "TOTAL\t" in content, "Missing total row in count output"

        # Test taxa clusters Excel output
        taxa_clusters_output = output_dir / "test_taxa_clusters.xlsx"
        write_taxa_clusters_output(cluster_data_list, str(taxa_clusters_output))

        assert taxa_clusters_output.exists(), "Taxa clusters Excel file not created"
        df = pd.read_excel(taxa_clusters_output, sheet_name='Raw_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
                            'species']
        assert all(col in df.columns for col in expected_columns), "Missing columns in taxa clusters output"

        print("✓ Test 7 PASSED: All output file formats are correct and complete")

    def test_taxa_processed_output_structure(self, test_data_dir, sample_cluster_file, sample_annotation_file, tmp_path):
        """
        Test 8: Processed Taxa Output Structure

        Tests the complex processed taxa Excel output with multiple sheets
        and parameter tracking. Output is written to tmp_path.
        """
        output_dir = tmp_path

        class TestArgs:
            uncertain_taxa_use_ratio = 0.6
            min_to_split = 0.35
            min_count_to_split = 15
            show_unannotated_clusters = True

        args = TestArgs()

        # Parse and process data
        clusters = parse_cluster_file(sample_cluster_file, sample_annotation_file)
        cluster_data_list = []

        for cluster in clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)
            cluster_data_list.append((eval_list, simi_list, taxa_dict))

        # Test processed taxa output
        processed_output = output_dir / "test_processed_taxa.xlsx"
        write_taxa_processed_output(cluster_data_list, args, str(processed_output))

        assert processed_output.exists(), "Processed taxa Excel file not created"

        # Check multiple sheets exist
        xl_file = pd.ExcelFile(processed_output)
        expected_sheets = ['Processed_Taxa_Clusters', 'Settings']
        assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing sheets in processed taxa output"

        # Check main data sheet
        df_main = pd.read_excel(processed_output, sheet_name='Processed_Taxa_Clusters')
        expected_columns = ['cluster', 'count', 'taxa_full', 'kingdom', 'phylum', 'class', 'order', 'family', 'genus',
                            'species']
        assert all(col in df_main.columns for col in expected_columns), "Missing columns in processed taxa sheet"

        # Check settings sheet
        df_settings = pd.read_excel(processed_output, sheet_name='Settings')
        assert 'Parameter' in df_settings.columns, "Missing Parameter column in settings"
        assert 'Value' in df_settings.columns, "Missing Value column in settings"

        # Verify settings values are recorded
        settings_dict = dict(zip(df_settings['Parameter'], df_settings['Value']))
        assert settings_dict['uncertain_taxa_use_ratio'] == 0.6, "Settings not correctly recorded"
        assert settings_dict['min_to_split'] == 0.35, "Settings not correctly recorded"

        print("✓ Test 8 PASSED: Processed taxa output has correct structure and settings tracking")

    def test_edge_cases(self, test_data_dir, tmp_path):
        """
        Test 9: Edge Cases and Error Handling

        Tests handling of edge cases like empty files, missing annotations,
        single-member clusters, and malformed input data. Synthetic cluster
        files are created in tmp_path; the annotation workbook is read from
        the test-data directory.
        """
        input_dir = test_data_dir

        # Test empty cluster file
        empty_cluster = tmp_path / "empty_cluster.clstr"
        with open(empty_cluster, 'w') as f:
            f.write("")

        clusters_empty = parse_cluster_file(str(empty_cluster))
        assert len(clusters_empty) == 0, "Empty cluster file should produce no clusters"

        # Test cluster file with no annotations
        simple_cluster = tmp_path / "simple_cluster.clstr"
        simple_cluster_content = """>Cluster 0
0	100nt, >read_no_anno:50... *
"""
        with open(simple_cluster, 'w') as f:
            f.write(simple_cluster_content)

        with pytest.raises(UnboundLocalError):
            parse_cluster_file(str(simple_cluster), raise_on_error=True)

        # Test malformed cluster entries (missing parts)
        malformed_cluster = tmp_path / "malformed_cluster.clstr"
        malformed_content = """>Cluster 0
0	100nt, >read1:50..._CONS(50) *
invalid_line_without_proper_format
1	90nt, >read2:25..._CONS(25) at /+/95%
"""
        annotations_malformed = input_dir / "test_pytest.xlsx"
        with open(malformed_cluster, 'w') as f:
            f.write(malformed_content)

        clusters_malformed = parse_cluster_file(str(malformed_cluster), str(annotations_malformed))
        # Should still parse valid entries and skip invalid ones
        assert len(clusters_malformed) == 1, "Should parse valid entries from malformed file"
        assert len(clusters_malformed[0]) == 2, "Should have 2 valid reads"
        assert clusters_malformed[0]['read1:50..._CONS']['evalue'] == 1.0e-50
        assert clusters_malformed[0]['read2:25..._CONS']['count'] == 25

        print("✓ Test 9 PASSED: Edge cases handled gracefully without crashes")

    def test_count_preservation_across_processing(self, parsed_clusters):
        """
        Test 10: Count Preservation Across Processing Pipeline

        Verifies that read counts are preserved throughout the entire processing
        pipeline from cluster parsing through taxa calculation to final output.
        """
        # Calculate expected total counts from original data
        expected_total = 0
        for cluster in parsed_clusters:
            for header, data in cluster.items():
                expected_total += data['count']

        # Process through pipeline and verify counts at each stage
        total_from_processing = 0
        taxa_processing_totals = []

        class TestArgs:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.45
            min_count_to_split = 10

        args = TestArgs()

        for cluster in parsed_clusters:
            eval_list, simi_list, taxa_dict = process_cluster_data(cluster)

            # Check that cluster processing preserves counts
            cluster_total = eval_list[0] + len(eval_list[1:])  # unannotated + annotated
            cluster_expected = sum(data['count'] for data in cluster.values())
            assert cluster_total == cluster_expected, f"Count mismatch in cluster processing: expected {cluster_expected}, got {cluster_total}"

            total_from_processing += cluster_total

            # Check taxa calculation preserves counts
            taxa_results = calculate_cluster_taxa(taxa_dict, args)
            taxa_total = sum(sum(group.values()) for group in taxa_results)
            taxa_processing_totals.append(taxa_total)

            # Taxa calculation may drop reads (e.g. unannotated) but never add
            taxa_dict_total = sum(taxa_dict.values())
            assert taxa_total <= taxa_dict_total, f"Count mismatch in taxa calculation: expected {taxa_dict_total}, got {taxa_total}"

        # Final verification
        assert total_from_processing == expected_total, f"Total count preservation failed: expected {expected_total}, got {total_from_processing}"

        # Verify sum of all taxa processing never exceeds the original total
        total_taxa_processed = sum(taxa_processing_totals)
        assert total_taxa_processed <= expected_total, f"Taxa processing total mismatch: expected {expected_total}, got {total_taxa_processed}"

        print("✓ Test 10 PASSED: Read counts preserved throughout entire processing pipeline")

    def test_11_parse_arguments_all_flags(self, tmp_path):
        """
        Test 11: Argument Parsing with All Flags

        Ensures parse_arguments correctly handles all optional flags and values.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        args = ca.parse_arguments([
            '--input_cluster', str(tmp_path / "dummy.clstr"),
            '--simi_plot_y_min', '90',
            '--simi_plot_y_max', '99',
            '--uncertain_taxa_use_ratio', '0.3',
            '--min_to_split', '0.2',
            '--min_count_to_split', '5',
            '--show_unannotated_clusters',
            '--make_taxa_in_cluster_split',
            '--print_empty_files'
        ])
        assert args.simi_plot_y_min == 90
        assert args.print_empty_files is True

    def test_12_process_cluster_data_valueerror(self):
        """
        Test 12: Process Cluster Data with Bad E-value

        Ensures ValueError branches are handled and unannotated counts increase.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        cluster = {
            "seq1": {"count": 1, "similarity": 95.0, "taxa": "taxonA", "evalue": "not_a_number"}
        }
        eval_list, simi_list, taxa_dict = ca.process_cluster_data(cluster)
        assert eval_list[0] == 1  # unannotated read

    def test_13_write_similarity_and_evalue_empty(self, tmp_path):
        """
        Test 13: Output Writers with Empty Data
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        sim_file = tmp_path / "sim.txt"
        eval_file = tmp_path / "eval.txt"

        ca.write_similarity_output([], str(sim_file))
        assert not sim_file.exists() or sim_file.read_text() == ""

        ca.write_evalue_output([5], str(eval_file))
        assert "unannotated" in eval_file.read_text()

    def test_14_write_count_zero_and_taxa_clusters_incomplete(self, tmp_path):
        """
        Test 14: Count Writer with Zero Data and Taxa Clusters with Incomplete Taxa
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        count_file = tmp_path / "count.txt"
        taxa_file = tmp_path / "taxa.xlsx"

        ca.write_count_output([0], [], str(count_file))
        assert "TOTAL" in count_file.read_text()

        cluster_data = [([0], [], {"bad": 1})]
        ca.write_taxa_clusters_output(cluster_data, str(taxa_file))
        assert taxa_file.exists()

    def test_15_write_taxa_processed_uncertain_and_settings(self, tmp_path):
        """
        Test 15: Processed Taxa Output with Settings
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca

        class Args:
            uncertain_taxa_use_ratio = 0.5
            min_to_split = 0.2
            min_count_to_split = 2
            show_unannotated_clusters = True

        out_file = tmp_path / "processed.xlsx"
        cluster_data = [([0], [], {"Unannotated read": 2})]
        ca.write_taxa_processed_output(cluster_data, Args(), str(out_file))
        assert out_file.exists()

    def test_16_create_evalue_plot_edge_cases(self, tmp_path):
        """
        Test 16: E-value Plot Edge Cases
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "plot.png"

        # Only unannotated
        ca.create_evalue_plot([0], [0], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # Empty after filtering
        ca.create_evalue_plot([0, ], [], str(out))
        assert not out.exists() or out.stat().st_size == 0

        # With valid values
        ca.create_evalue_plot([0, 1e-5, 1e-3], [2], str(out))
        assert out.exists()

    def test_17_main_runs_and_prints(self, tmp_path, capsys):
        """
        Test 17: Main Entry Point
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        clstr = tmp_path / "simple.clstr"
        clstr.write_text(">Cluster 0\n0	100nt, >seq1... *\n")

        out = tmp_path / "sim.txt"
        args = [
            '--input_cluster', str(clstr),
            '--output_similarity_txt', str(out)
        ]
        ca.main(args)
        captured = capsys.readouterr()
        assert "Processing complete" in captured.out

    def test_16a_prepare_evalue_histogram_valid_data(self):
        """
        Test 16a: prepare_evalue_histogram returns correct counts/bins.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([1e-5, 1e-3, 0.5], [])
        assert counts.sum() == 3  # 3 entries counted
        assert len(bins) == 51  # 50 bins => 51 edges

    def test_16b_prepare_evalue_histogram_empty(self):
        """
        Test 16b: prepare_evalue_histogram with empty/invalid data returns (None, None).
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        counts, bins = ca.prepare_evalue_histogram([0, None, "bad"], [])
        assert counts is None
        assert bins is None

    def test_16c_create_evalue_plot_creates_file_and_returns_data(self, tmp_path):
        """
        Test 16c: create_evalue_plot saves a PNG and returns numeric data.
        """
        from Stage_1_translated.NLOOR_scripts.process_clusters_tool import cdhit_analysis as ca
        out = tmp_path / "eval.png"
        counts, bins = ca.create_evalue_plot_test([1e-5, 1e-3, 0.5], [], str(out))
        assert out.exists()
        assert counts.sum() == 3
        assert len(bins) == 51
622
623
if __name__ == "__main__":
    # Allow running this test module directly (e.g. `python test_cdhit_analysis.py`)
    # by delegating to pytest; normally pytest discovers this file itself.
    pytest.main([__file__])