comparison tests/test_blast_annotations_processor.py @ 0:a3989edf0a4a draft

planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit c944fd5685f295acba06679e85b67973c173b137
author onnodg
date Tue, 14 Oct 2025 09:08:30 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:a3989edf0a4a
1 """
2 Test suite for BLAST annotation processor.
3 """
4
5 import pytest
6 import os
7 import sys
8 import json
9 import pandas as pd
10 from pathlib import Path
11
12 # Add the module to path for importing
13 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
14 from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import (
15 process_single_file,
16 resolve_taxon_majority,
17 TAXONOMIC_LEVELS
18 )
19
20
21 class TestBlastAnnotationProcessor:
22 """Test class for BLAST annotation processor"""
23
24 @pytest.fixture(scope="class")
25 def test_data_dir(self):
26 """Setup test data directory structure"""
27 base_dir = Path("test-data")
28 base_dir.mkdir(exist_ok=True)
29
30 # Create subdirectories
31 for subdir in ["input", "expected", "output"]:
32 (base_dir / subdir).mkdir(exist_ok=True)
33
34 return base_dir
35
36 @pytest.fixture(scope="class")
37 def sample_files(self, test_data_dir):
38 """Create sample input files for testing"""
39 input_dir = test_data_dir / "input"
40
41 # Sample annotated BLAST file
42 blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy
43 read1(100) subject2 subject2 subject2 90.0 95 1e-45 180 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
44 read1(100) subject1 subject1 subject1 95.889 100 1e-50 200 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
45 read2(50) subject3 subject3 subject3 85.0 90 1e-40 160 database2 Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli
46 read3(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
47 read4(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla
48 read4(25) subject4 subject4 subject4 80.0 85 1e-40 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi
49 read4(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi
50 read4(25) subject4 subject4 subject4 90.0 87 1e-50 160 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
51 """
52
53 # Sample unannotated FASTA file (headers must match BLAST q_id)
54 fasta_content = """>read1(100) count=100;
55 ATCGATCGATCGATCG
56 >read2(50) count=50;
57 GCTAGCTAGCTAGCTA
58 >read3(25) count=25;
59 TGACTGACTGACTGAC
60 >read4(25) count=25;
61 TGAAAAAAACACCAC
62 """
63
64 blast_file = input_dir / "test_blast.tabular"
65 fasta_file = input_dir / "test_sequences.fasta"
66
67 with open(blast_file, 'w') as f:
68 f.write(blast_content)
69 with open(fasta_file, 'w') as f:
70 f.write(fasta_content)
71
72 return {
73 'blast': str(blast_file),
74 'fasta': str(fasta_file)
75 }
76
77 @pytest.fixture(scope="class")
78 def processed_output(self, test_data_dir, sample_files):
79 """Run the processor on sample files and return output paths"""
80 output_dir = test_data_dir / "output"
81
82 # Create arguments object
83 class Args:
84 def __init__(self):
85 self.input_anno = sample_files['blast']
86 self.input_unanno = sample_files['fasta']
87 self.eval_plot = str(output_dir / "eval_plot.png")
88 self.taxa_output = str(output_dir / "taxa_output.txt")
89 self.circle_data = str(output_dir / "circle_data.json")
90 self.header_anno = str(output_dir / "header_anno.xlsx")
91 self.anno_stats = str(output_dir / "anno_stats.txt")
92 self.uncertain_threshold = 0.9
93 self.eval_threshold = 1e-10
94 self.use_counts = True
95
96 args = Args()
97
98 # Process the files
99 process_single_file(args.input_anno, args.input_unanno, args)
100
101 return args
102
103 def test_data_integrity_best_values(self, processed_output):
104 """
105 Test 1: Data Integrity - Best Values Selection
106
107 Verifies that for each read, the best e-value corresponds to the correct
108 bitscore, identity, coverage, and taxonomic annotation.
109 """
110 # Read the Excel output to verify the best values are correctly selected
111 df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
112 print(df)
113 # For read1(100), verify best e-value (1e-50) corresponds to correct values
114 read1_row = df[df['header'].str.contains('read1')].iloc[0]
115 assert read1_row['bitscore'] == float(200), "best bitscore doesn't match"
116 assert read1_row['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-49), "Best e-value not correctly selected for read1"
117 assert read1_row['identity percentage'] == float(95.889), "Identity doesn't match best bitscore for read1"
118 assert 'Bacillus_subtilis' in read1_row['taxa'], "Taxa doesn't match best hit for read1"
119
120 read4_row = df[df['header'].str.contains('read4')].iloc[0]
121 assert read4_row['bitscore'] == float(160), "best bitscore doesn't match"
122 assert 'Methanobrevibacter_smithii' in read4_row['taxa'], "Taxa doesn't match best hit for read1"
123 print("✓ Test 1 PASSED: Best values correctly associated for each read")
124
125 def test_read_count_consistency(self, processed_output):
126 """
127 Test 2: Read Count Consistency
128
129 Verifies that read counts from FASTA headers are correctly preserved
130 and aggregated in all output files.
131 """
132 # Check Excel output
133 df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
134 # Verify counts are correctly extracted and preserved
135 expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25}
136
137 skipped_reads = []
138
139 for read_name, expected_count in expected_counts.items():
140 subset = df.loc[df['header'] == read_name]
141 if subset.empty:
142 skipped_reads.append(read_name) # remember we skip this read
143 continue
144 row = subset.iloc[0]
145 assert row['count'] == expected_count, f"Count mismatch for {read_name}"
146
147 # Check annotation stats
148 with open(processed_output.anno_stats, 'r') as f:
149 stats_content = f.read()
150 # Total unique count should be 175 (100+50+25)
151 assert 'total_unique\t200' in stats_content, "Total unique count incorrect in stats"
152 if skipped_reads:
153 assert all(read not in df['header'].values for read in skipped_reads)
154 print("✓ Test 2 PASSED: Read counts consistent across all outputs")
155
156 def test_lowest_common_ancester(self, processed_output):
157 """
158 Test 3: Big Input Files
159
160 Tests the functioning of lowest common ancestor selection with realistic inputfile sizes
161 """
162 # Test the function directly with known conflicts
163 test_conflicts = {
164 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10,
165 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1,
166 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1,
167 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1,
168 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1,
169 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 187,
170 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2,
171 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2,
172 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2,
173 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2,
174 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12,
175 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
176 }
177 resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9)
178 assert 'Ciceronia a' in resolved_short, "Conflict not resolved to uncertain taxa"
179
180 test_90_precent_conflicts = {
181 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90,
182 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10,
183 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6
184 }
185 resolved_short, resolved_long = resolve_taxon_majority(test_90_precent_conflicts, 0.9)
186 assert 'Cicerbita a' in resolved_short, "Conflict not resolved to uncertain taxa"
187
188 print("✓ Test 3 PASSED: Lowest common ancestor works correctly")
189
190
191 def test_taxonomic_conflict_resolution(self, processed_output):
192 """
193 Test 4: Taxonomic Conflict Resolution
194
195 Tests the uncertainty threshold mechanism for resolving taxonomic conflicts.
196 Uses a controlled scenario where multiple hits have different taxa.
197 """
198 # Test the function directly with known conflicts
199 test_conflicts = {
200 'Bacteria / Firmicutes / Bacilli': 2,
201 'Bacteria / Proteobacteria / Gammaproteobacteria': 1
202 }
203
204 resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9)
205
206 # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain)
207 assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa"
208
209 # Test with higher confidence
210 test_high_confidence = {
211 'Bacteria / Firmicutes / Bacilli': 9,
212 'Bacteria / Proteobacteria / Gammaproteobacteria': 1
213 }
214
215 resolved_short, resolved_long = resolve_taxon_majority(test_high_confidence, 0.9)
216 assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly"
217
218 print("✓ Test 4 PASSED: Taxonomic conflict resolution working correctly")
219
220 def test_output_file_structures(self, processed_output):
221 """
222 Test 5: Output File Structure Validation
223
224 Verifies that all output files are created with correct structure and format.
225 """
226 # Test Excel file structure
227 excel_file = processed_output.header_anno
228 assert os.path.exists(excel_file), "Excel output file not created"
229
230 # Check both sheets exist
231 xl_file = pd.ExcelFile(excel_file)
232 expected_sheets = ['Individual_Reads', 'Merged_by_Taxa']
233 assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets"
234
235 # Test Individual_Reads sheet structure
236 df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads')
237 expected_cols = ['header', 'e_value', 'identity percentage', 'coverage',
238 'bitscore', 'count', 'source', 'taxa']
239 assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads"
240
241 # Test taxa output structure
242 with open(processed_output.taxa_output, 'r') as f:
243 taxa_lines = f.readlines()
244
245 # Should have header line and data lines
246 assert len(taxa_lines) > 2, "Taxa output too short"
247 assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect"
248
249 # Test circle data JSON structure
250 with open(processed_output.circle_data, 'r') as f:
251 circle_data = json.load(f)
252
253 assert isinstance(circle_data, list), "Circle data should be a list"
254 assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level"
255
256 print("✓ Test 5 PASSED: All output files have correct structure")
257
258 def test_evalue_filtering(self, test_data_dir):
259 """
260 Test 6: E-value Threshold Filtering
261
262 Tests that hits above the e-value threshold are correctly filtered out.
263 """
264 input_dir = test_data_dir / "input"
265 output_dir = test_data_dir / "output"
266
267 # Create test file with mix of good and bad e-values
268 blast_content_mixed = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy
269 read1(100) subject1 95.0 100 50 75 1e-50 200 database1 Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / Uncertain taxa / Uncertain taxa
270 read1(100) subject2 90.0 95 45 70 1e-5 180 database1 Viridiplantae / Streptophyta / Magnoliopsida / Rosales / Rosaceae / Sorbus / Sorbus aucuparia
271 read2(50) subject3 85.0 90 40 65 1e-3 160 database2 Viridiplantae / Streptophyta / Magnoliopsida / Solanales / Solanaceae / Uncertain taxa / Uncertain taxa
272 """
273
274 fasta_content = """>read1(100) count=100;
275 ATCG
276 >read2(50) count=50;
277 GCTA
278 """
279
280 blast_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.tabular"
281 fasta_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.fasta"
282
283 with open(blast_file, 'w') as f:
284 f.write(blast_content_mixed)
285 with open(fasta_file, 'w') as f:
286 f.write(fasta_content)
287
288 # Process with strict e-value threshold
289 class Args:
290 def __init__(self):
291 self.input_anno = str(blast_file)
292 self.input_unanno = str(fasta_file)
293 self.header_anno = str(output_dir / "evalue_test.xlsx")
294 self.eval_plot = None
295 self.taxa_output = None
296 self.circle_data = None
297 self.anno_stats = None
298 self.uncertain_threshold = 0.9
299 self.eval_threshold = 1e-10 # Very strict threshold
300 self.use_counts = True
301
302 args = Args()
303 process_single_file(args.input_anno, args.input_unanno, args)
304
305 # Check that only the 1e-50 hit remains
306 df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')
307
308 # Should only have read1 (with 1e-50), read2 should be filtered out
309 assert len(df) == 1, f"Expected 1 read after filtering, got {len(df)}"
310 assert df.iloc[0]['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-12), "Wrong hit survived e-value filtering"
311
312 print("✓ Test 6 PASSED: E-value filtering working correctly")
313
314 def test_header_synchronization(self, test_data_dir):
315 """
316 Test 7: Header Synchronization Between Files
317
318 Tests that the processor correctly handles mismatched headers between
319 annotated and unannotated files.
320 """
321 input_dir = test_data_dir / "input"
322 output_dir = test_data_dir / "output"
323
324 # Create mismatched files
325 blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy
326 read1(100) source=NCBI sequenceID=KR738003 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-30 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
327 read1(100) source=NCBI sequenceID=KR738670 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-14 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
328 read3(25) source=NCBI sequenceID=KR737595 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 97.561 87 1.68e-14 71.3 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum
329 """
330
331 fasta_content = """>read1(100) count=100;
332 ATCG
333 >read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0;
334 gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg
335 >read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0;
336 gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg
337 """
338
339 blast_file = input_dir / "test_sync.tabular"
340 fasta_file = input_dir / "test_sync.fasta"
341
342 with open(blast_file, 'w') as f:
343 f.write(blast_content)
344 with open(fasta_file, 'w') as f:
345 f.write(fasta_content)
346
347 class Args:
348 def __init__(self):
349 self.input_anno = blast_file
350 self.input_unanno = fasta_file
351 self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx"
352 self.eval_plot = None
353 self.taxa_output = None
354 self.circle_data = None
355 self.anno_stats = str(output_dir / "sync_stats.txt")
356 self.uncertain_threshold = 0.9
357 self.eval_threshold = 1e-10
358 self.use_counts = True
359
360 args = Args()
361 process_single_file(args.input_anno, args.input_unanno, args)
362 # Check that processing handled the mismatch correctly
363 df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads')
364 extracted = df['header'].str.extract(r'(read\d+)')
365 # final list
366 headers = extracted[0].tolist()
367 # Should have read1 and read3, read2 should be skipped
368 assert 'read1' in headers, "read1 should be present"
369 assert 'read3' in headers, "read3 should be present"
370
371 print("✓ Test 7 PASSED: Header synchronization handled correctly")
372
373 def test_excel_merged_vs_individual(self, processed_output):
374 """
375 Test 8: Excel Merged vs Individual Sheet Consistency
376
377 Verifies that the merged sheet correctly aggregates data from the individual sheet.
378 """
379 df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads')
380 df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa')
381
382 # Count unique taxa in individual sheet
383 individual_taxa = df_individual['taxa'].nunique()
384
385 # Should match number of rows in merged sheet
386 assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count"
387
388 # Check that counts are properly aggregated
389 # For taxa with multiple reads, counts should be summed
390 for _, merged_row in df_merged.iterrows():
391 taxa = merged_row['taxa']
392 individual_rows = df_individual[df_individual['taxa'] == taxa]
393
394 expected_count = individual_rows['count'].sum()
395 actual_count = merged_row['count']
396
397 assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}"
398
399 print("✓ Test 8 PASSED: Excel merged sheet correctly aggregates individual data")
400
401 def test_annotation_statistics_accuracy(self, processed_output, sample_files):
402 """
403 Test 9: Annotation Statistics Accuracy
404
405 Verifies that calculated annotation statistics match the actual data.
406 """
407 # Read stats file
408 stats = {}
409 with open(processed_output.anno_stats, 'r') as f:
410 lines = f.readlines()[1:] # Skip header
411 for line in lines:
412 key, value = line.strip().split('\t')
413 try:
414 stats[key] = float(value)
415 except ValueError:
416 stats[key] = value
417
418 # Manual verification
419 assert stats['total_sequences'] == 4.0, "Total sequences count incorrect"
420 assert stats['annotated_sequences'] == 3.0, "Annotated sequences count incorrect"
421 assert stats['total_unique'] == 200, "Total unique count incorrect"
422 assert stats['unique_annotated'] == 150, "Unique annotated count incorrect"
423 assert stats['percentage_annotated'] == 75.0, "Percentage annotated incorrect"
424 assert stats['percentage_unique_annotated'] == 75.0, "Percentage unique annotated incorrect"
425
426 print("✓ Test 9 PASSED: Annotation statistics are accurate")
427
428
429
430 if __name__ == "__main__":
431 # Run all tests in this file
432 pytest.main([__file__])