Mercurial > repos > onnodg > blast_annotations_processor
comparison tests/test_blast_annotations_processor.py @ 0:a3989edf0a4a draft
planemo upload for repository https://github.com/Onnodg/Naturalis_NLOOR/tree/main/NLOOR_scripts/process_annotations_tool commit c944fd5685f295acba06679e85b67973c173b137
| author | onnodg |
|---|---|
| date | Tue, 14 Oct 2025 09:08:30 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:a3989edf0a4a |
|---|---|
| 1 """ | |
| 2 Test suite for BLAST annotation processor. | |
| 3 """ | |
| 4 | |
| 5 import pytest | |
| 6 import os | |
| 7 import sys | |
| 8 import json | |
| 9 import pandas as pd | |
| 10 from pathlib import Path | |
| 11 | |
| 12 # Add the module to path for importing | |
| 13 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| 14 from Stage_1_translated.NLOOR_scripts.process_annotations_tool.blast_annotations_processor import ( | |
| 15 process_single_file, | |
| 16 resolve_taxon_majority, | |
| 17 TAXONOMIC_LEVELS | |
| 18 ) | |
| 19 | |
| 20 | |
| 21 class TestBlastAnnotationProcessor: | |
| 22 """Test class for BLAST annotation processor""" | |
| 23 | |
| 24 @pytest.fixture(scope="class") | |
| 25 def test_data_dir(self): | |
| 26 """Setup test data directory structure""" | |
| 27 base_dir = Path("test-data") | |
| 28 base_dir.mkdir(exist_ok=True) | |
| 29 | |
| 30 # Create subdirectories | |
| 31 for subdir in ["input", "expected", "output"]: | |
| 32 (base_dir / subdir).mkdir(exist_ok=True) | |
| 33 | |
| 34 return base_dir | |
| 35 | |
    @pytest.fixture(scope="class")
    def sample_files(self, test_data_dir):
        """Write a small annotated BLAST table plus a matching FASTA file.

        Returns:
            dict: {'blast': <path>, 'fasta': <path>} as strings.
        """
        input_dir = test_data_dir / "input"

        # Sample annotated BLAST file.
        # read1 has two hits (best e-value 1e-50 / bitscore 200);
        # read4 has several conflicting species-level annotations, with the
        # 1e-50 / bitscore-160 Methanobrevibacter_smithii hit being best.
        # NOTE(review): columns appear space-separated in this dump; the
        # .tabular reader presumably expects tab delimiters — confirm against
        # the processor's parser.
        blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy
read1(100) subject2 subject2 subject2 90.0 95 1e-45 180 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
read1(100) subject1 subject1 subject1 95.889 100 1e-50 200 database1 Bacteria / Firmicutes / Bacilli / Bacillales / Bacillaceae / Bacillus / Bacillus_subtilis
read2(50) subject3 subject3 subject3 85.0 90 1e-40 160 database2 Bacteria / Proteobacteria / Gammaproteobacteria / Enterobacterales / Enterobacteriaceae / Escherichia / Escherichia_coli
read3(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
read4(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_blabla
read4(25) subject4 subject4 subject4 80.0 85 1e-40 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_eclhi
read4(25) subject4 subject4 subject4 80.0 85 1e-35 140 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_elchi
read4(25) subject4 subject4 subject4 90.0 87 1e-50 160 database1 Archaea / Euryarchaeota / Methanobacteria / Methanobacteriales / Methanobacteriaceae / Methanobrevibacter / Methanobrevibacter_smithii
"""

        # Sample unannotated FASTA file (headers must match BLAST q_id);
        # the count=N; attribute is what the processor aggregates.
        fasta_content = """>read1(100) count=100;
ATCGATCGATCGATCG
>read2(50) count=50;
GCTAGCTAGCTAGCTA
>read3(25) count=25;
TGACTGACTGACTGAC
>read4(25) count=25;
TGAAAAAAACACCAC
"""

        blast_file = input_dir / "test_blast.tabular"
        fasta_file = input_dir / "test_sequences.fasta"

        with open(blast_file, 'w') as f:
            f.write(blast_content)
        with open(fasta_file, 'w') as f:
            f.write(fasta_content)

        return {
            'blast': str(blast_file),
            'fasta': str(fasta_file)
        }
| 76 | |
| 77 @pytest.fixture(scope="class") | |
| 78 def processed_output(self, test_data_dir, sample_files): | |
| 79 """Run the processor on sample files and return output paths""" | |
| 80 output_dir = test_data_dir / "output" | |
| 81 | |
| 82 # Create arguments object | |
| 83 class Args: | |
| 84 def __init__(self): | |
| 85 self.input_anno = sample_files['blast'] | |
| 86 self.input_unanno = sample_files['fasta'] | |
| 87 self.eval_plot = str(output_dir / "eval_plot.png") | |
| 88 self.taxa_output = str(output_dir / "taxa_output.txt") | |
| 89 self.circle_data = str(output_dir / "circle_data.json") | |
| 90 self.header_anno = str(output_dir / "header_anno.xlsx") | |
| 91 self.anno_stats = str(output_dir / "anno_stats.txt") | |
| 92 self.uncertain_threshold = 0.9 | |
| 93 self.eval_threshold = 1e-10 | |
| 94 self.use_counts = True | |
| 95 | |
| 96 args = Args() | |
| 97 | |
| 98 # Process the files | |
| 99 process_single_file(args.input_anno, args.input_unanno, args) | |
| 100 | |
| 101 return args | |
| 102 | |
| 103 def test_data_integrity_best_values(self, processed_output): | |
| 104 """ | |
| 105 Test 1: Data Integrity - Best Values Selection | |
| 106 | |
| 107 Verifies that for each read, the best e-value corresponds to the correct | |
| 108 bitscore, identity, coverage, and taxonomic annotation. | |
| 109 """ | |
| 110 # Read the Excel output to verify the best values are correctly selected | |
| 111 df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') | |
| 112 print(df) | |
| 113 # For read1(100), verify best e-value (1e-50) corresponds to correct values | |
| 114 read1_row = df[df['header'].str.contains('read1')].iloc[0] | |
| 115 assert read1_row['bitscore'] == float(200), "best bitscore doesn't match" | |
| 116 assert read1_row['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-49), "Best e-value not correctly selected for read1" | |
| 117 assert read1_row['identity percentage'] == float(95.889), "Identity doesn't match best bitscore for read1" | |
| 118 assert 'Bacillus_subtilis' in read1_row['taxa'], "Taxa doesn't match best hit for read1" | |
| 119 | |
| 120 read4_row = df[df['header'].str.contains('read4')].iloc[0] | |
| 121 assert read4_row['bitscore'] == float(160), "best bitscore doesn't match" | |
| 122 assert 'Methanobrevibacter_smithii' in read4_row['taxa'], "Taxa doesn't match best hit for read1" | |
| 123 print("✓ Test 1 PASSED: Best values correctly associated for each read") | |
| 124 | |
| 125 def test_read_count_consistency(self, processed_output): | |
| 126 """ | |
| 127 Test 2: Read Count Consistency | |
| 128 | |
| 129 Verifies that read counts from FASTA headers are correctly preserved | |
| 130 and aggregated in all output files. | |
| 131 """ | |
| 132 # Check Excel output | |
| 133 df = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') | |
| 134 # Verify counts are correctly extracted and preserved | |
| 135 expected_counts = {'read1': 100, 'read2': 50, 'read3': 25, 'read4':25} | |
| 136 | |
| 137 skipped_reads = [] | |
| 138 | |
| 139 for read_name, expected_count in expected_counts.items(): | |
| 140 subset = df.loc[df['header'] == read_name] | |
| 141 if subset.empty: | |
| 142 skipped_reads.append(read_name) # remember we skip this read | |
| 143 continue | |
| 144 row = subset.iloc[0] | |
| 145 assert row['count'] == expected_count, f"Count mismatch for {read_name}" | |
| 146 | |
| 147 # Check annotation stats | |
| 148 with open(processed_output.anno_stats, 'r') as f: | |
| 149 stats_content = f.read() | |
| 150 # Total unique count should be 175 (100+50+25) | |
| 151 assert 'total_unique\t200' in stats_content, "Total unique count incorrect in stats" | |
| 152 if skipped_reads: | |
| 153 assert all(read not in df['header'].values for read in skipped_reads) | |
| 154 print("✓ Test 2 PASSED: Read counts consistent across all outputs") | |
| 155 | |
| 156 def test_lowest_common_ancester(self, processed_output): | |
| 157 """ | |
| 158 Test 3: Big Input Files | |
| 159 | |
| 160 Tests the functioning of lowest common ancestor selection with realistic inputfile sizes | |
| 161 """ | |
| 162 # Test the function directly with known conflicts | |
| 163 test_conflicts = { | |
| 164 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 10, | |
| 165 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 1, | |
| 166 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita c': 1, | |
| 167 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita d': 1, | |
| 168 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita e': 1, | |
| 169 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia a': 187, | |
| 170 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia b': 2, | |
| 171 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia c': 2, | |
| 172 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia d': 2, | |
| 173 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia e': 2, | |
| 174 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Ciceronia / Ciceronia f': 12, | |
| 175 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6 | |
| 176 } | |
| 177 resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9) | |
| 178 assert 'Ciceronia a' in resolved_short, "Conflict not resolved to uncertain taxa" | |
| 179 | |
| 180 test_90_precent_conflicts = { | |
| 181 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita a': 90, | |
| 182 'Viridiplantae / Streptophyta / Magnoliopsida / Asterales / Asteraceae / Cicerbita / Cicerbita b': 10, | |
| 183 'Viridiplantae / Streptophyta / Bryopsida / Funariales / Funariaceae / Funaria / Uncertain taxa': 6 | |
| 184 } | |
| 185 resolved_short, resolved_long = resolve_taxon_majority(test_90_precent_conflicts, 0.9) | |
| 186 assert 'Cicerbita a' in resolved_short, "Conflict not resolved to uncertain taxa" | |
| 187 | |
| 188 print("✓ Test 3 PASSED: Lowest common ancestor works correctly") | |
| 189 | |
| 190 | |
| 191 def test_taxonomic_conflict_resolution(self, processed_output): | |
| 192 """ | |
| 193 Test 4: Taxonomic Conflict Resolution | |
| 194 | |
| 195 Tests the uncertainty threshold mechanism for resolving taxonomic conflicts. | |
| 196 Uses a controlled scenario where multiple hits have different taxa. | |
| 197 """ | |
| 198 # Test the function directly with known conflicts | |
| 199 test_conflicts = { | |
| 200 'Bacteria / Firmicutes / Bacilli': 2, | |
| 201 'Bacteria / Proteobacteria / Gammaproteobacteria': 1 | |
| 202 } | |
| 203 | |
| 204 resolved_short, resolved_long = resolve_taxon_majority(test_conflicts, 0.9) | |
| 205 | |
| 206 # With threshold 0.9, should resolve to most common (2/3 = 0.67 < 0.9, so uncertain) | |
| 207 assert 'Uncertain taxa' in resolved_short, "Conflict not resolved to uncertain taxa" | |
| 208 | |
| 209 # Test with higher confidence | |
| 210 test_high_confidence = { | |
| 211 'Bacteria / Firmicutes / Bacilli': 9, | |
| 212 'Bacteria / Proteobacteria / Gammaproteobacteria': 1 | |
| 213 } | |
| 214 | |
| 215 resolved_short, resolved_long = resolve_taxon_majority(test_high_confidence, 0.9) | |
| 216 assert 'Firmicutes' in resolved_short, "High confidence case not resolved correctly" | |
| 217 | |
| 218 print("✓ Test 4 PASSED: Taxonomic conflict resolution working correctly") | |
| 219 | |
| 220 def test_output_file_structures(self, processed_output): | |
| 221 """ | |
| 222 Test 5: Output File Structure Validation | |
| 223 | |
| 224 Verifies that all output files are created with correct structure and format. | |
| 225 """ | |
| 226 # Test Excel file structure | |
| 227 excel_file = processed_output.header_anno | |
| 228 assert os.path.exists(excel_file), "Excel output file not created" | |
| 229 | |
| 230 # Check both sheets exist | |
| 231 xl_file = pd.ExcelFile(excel_file) | |
| 232 expected_sheets = ['Individual_Reads', 'Merged_by_Taxa'] | |
| 233 assert all(sheet in xl_file.sheet_names for sheet in expected_sheets), "Missing Excel sheets" | |
| 234 | |
| 235 # Test Individual_Reads sheet structure | |
| 236 df_individual = pd.read_excel(excel_file, sheet_name='Individual_Reads') | |
| 237 expected_cols = ['header', 'e_value', 'identity percentage', 'coverage', | |
| 238 'bitscore', 'count', 'source', 'taxa'] | |
| 239 assert all(col in df_individual.columns for col in expected_cols), "Missing columns in Individual_Reads" | |
| 240 | |
| 241 # Test taxa output structure | |
| 242 with open(processed_output.taxa_output, 'r') as f: | |
| 243 taxa_lines = f.readlines() | |
| 244 | |
| 245 # Should have header line and data lines | |
| 246 assert len(taxa_lines) > 2, "Taxa output too short" | |
| 247 assert 'percentage_rooted\tnumber_rooted' in taxa_lines[1], "Taxa output header incorrect" | |
| 248 | |
| 249 # Test circle data JSON structure | |
| 250 with open(processed_output.circle_data, 'r') as f: | |
| 251 circle_data = json.load(f) | |
| 252 | |
| 253 assert isinstance(circle_data, list), "Circle data should be a list" | |
| 254 assert len(circle_data) == len(TAXONOMIC_LEVELS), "Circle data should have entry per taxonomic level" | |
| 255 | |
| 256 print("✓ Test 5 PASSED: All output files have correct structure") | |
| 257 | |
| 258 def test_evalue_filtering(self, test_data_dir): | |
| 259 """ | |
| 260 Test 6: E-value Threshold Filtering | |
| 261 | |
| 262 Tests that hits above the e-value threshold are correctly filtered out. | |
| 263 """ | |
| 264 input_dir = test_data_dir / "input" | |
| 265 output_dir = test_data_dir / "output" | |
| 266 | |
| 267 # Create test file with mix of good and bad e-values | |
| 268 blast_content_mixed = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy | |
| 269 read1(100) subject1 95.0 100 50 75 1e-50 200 database1 Viridiplantae / Streptophyta / Magnoliopsida / Fagales / Juglandaceae / Uncertain taxa / Uncertain taxa | |
| 270 read1(100) subject2 90.0 95 45 70 1e-5 180 database1 Viridiplantae / Streptophyta / Magnoliopsida / Rosales / Rosaceae / Sorbus / Sorbus aucuparia | |
| 271 read2(50) subject3 85.0 90 40 65 1e-3 160 database2 Viridiplantae / Streptophyta / Magnoliopsida / Solanales / Solanaceae / Uncertain taxa / Uncertain taxa | |
| 272 """ | |
| 273 | |
| 274 fasta_content = """>read1(100) count=100; | |
| 275 ATCG | |
| 276 >read2(50) count=50; | |
| 277 GCTA | |
| 278 """ | |
| 279 | |
| 280 blast_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.tabular" | |
| 281 fasta_file = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sorted_test.fasta" | |
| 282 | |
| 283 with open(blast_file, 'w') as f: | |
| 284 f.write(blast_content_mixed) | |
| 285 with open(fasta_file, 'w') as f: | |
| 286 f.write(fasta_content) | |
| 287 | |
| 288 # Process with strict e-value threshold | |
| 289 class Args: | |
| 290 def __init__(self): | |
| 291 self.input_anno = str(blast_file) | |
| 292 self.input_unanno = str(fasta_file) | |
| 293 self.header_anno = str(output_dir / "evalue_test.xlsx") | |
| 294 self.eval_plot = None | |
| 295 self.taxa_output = None | |
| 296 self.circle_data = None | |
| 297 self.anno_stats = None | |
| 298 self.uncertain_threshold = 0.9 | |
| 299 self.eval_threshold = 1e-10 # Very strict threshold | |
| 300 self.use_counts = True | |
| 301 | |
| 302 args = Args() | |
| 303 process_single_file(args.input_anno, args.input_unanno, args) | |
| 304 | |
| 305 # Check that only the 1e-50 hit remains | |
| 306 df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads') | |
| 307 | |
| 308 # Should only have read1 (with 1e-50), read2 should be filtered out | |
| 309 assert len(df) == 1, f"Expected 1 read after filtering, got {len(df)}" | |
| 310 assert df.iloc[0]['e_value'] == pytest.approx(1e-50, rel=1e-8, abs=1e-12), "Wrong hit survived e-value filtering" | |
| 311 | |
| 312 print("✓ Test 6 PASSED: E-value filtering working correctly") | |
| 313 | |
| 314 def test_header_synchronization(self, test_data_dir): | |
| 315 """ | |
| 316 Test 7: Header Synchronization Between Files | |
| 317 | |
| 318 Tests that the processor correctly handles mismatched headers between | |
| 319 annotated and unannotated files. | |
| 320 """ | |
| 321 input_dir = test_data_dir / "input" | |
| 322 output_dir = test_data_dir / "output" | |
| 323 | |
| 324 # Create mismatched files | |
| 325 blast_content = """#Query ID #Subject #Subject accession #Subject Taxonomy ID #Identity percentage #Coverage #evalue #bitscore #Source #Taxonomy | |
| 326 read1(100) source=NCBI sequenceID=KR738003 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-30 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum | |
| 327 read1(100) source=NCBI sequenceID=KR738670 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 100.000 100 7.35e-14 54.7 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum | |
| 328 read3(25) source=NCBI sequenceID=KR737595 superkingdom=Eukaryota kingdom=Viridiplantae phylum=Streptophyta subphylum=Streptophytina class=Magnoliopsida subclass=NA infraclass=NA order=Malvales suborder=NA infraorder=NA superfamily=NA family=Malvaceae genus=Hibiscus species=Hibiscus trionum markercode=trnL lat=0.304 lon=36.87 source=NCBI N/A 97.561 87 1.68e-14 71.3 Viridiplantae / Streptophyta / Magnoliopsida / Malvales / Malvaceae / Hibiscus / Hibiscus trionum | |
| 329 """ | |
| 330 | |
| 331 fasta_content = """>read1(100) count=100; | |
| 332 ATCG | |
| 333 >read2(50) merged_sample={}; count=1011; direction=right; seq_b_insertion=0; sminR=40.0; ali_length=53; seq_b_deletion=248; seq_a_deletion=248; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; | |
| 334 gggcaatcctgagccaagtgactggagttcagataggtgcagagactcaatgg | |
| 335 >read3(25) merged_sample={}; count=179; direction=right; sminR=40.0; ali_length=49; seq_b_deletion=252; seq_a_deletion=252; seq_b_insertion=0; seq_a_insertion=0; mode=alignment; sminL=40.0; seq_a_single=0; seq_b_single=0; | |
| 336 gggcaatcctgagccaactggagttcagataggtgcagagactcaatgg | |
| 337 """ | |
| 338 | |
| 339 blast_file = input_dir / "test_sync.tabular" | |
| 340 fasta_file = input_dir / "test_sync.fasta" | |
| 341 | |
| 342 with open(blast_file, 'w') as f: | |
| 343 f.write(blast_content) | |
| 344 with open(fasta_file, 'w') as f: | |
| 345 f.write(fasta_content) | |
| 346 | |
| 347 class Args: | |
| 348 def __init__(self): | |
| 349 self.input_anno = blast_file | |
| 350 self.input_unanno = fasta_file | |
| 351 self.header_anno = "Stage_1_translated/NLOOR_scripts/process_annotations_tool/test-data/sync_test.xlsx" | |
| 352 self.eval_plot = None | |
| 353 self.taxa_output = None | |
| 354 self.circle_data = None | |
| 355 self.anno_stats = str(output_dir / "sync_stats.txt") | |
| 356 self.uncertain_threshold = 0.9 | |
| 357 self.eval_threshold = 1e-10 | |
| 358 self.use_counts = True | |
| 359 | |
| 360 args = Args() | |
| 361 process_single_file(args.input_anno, args.input_unanno, args) | |
| 362 # Check that processing handled the mismatch correctly | |
| 363 df = pd.read_excel(args.header_anno, sheet_name='Individual_Reads') | |
| 364 extracted = df['header'].str.extract(r'(read\d+)') | |
| 365 # final list | |
| 366 headers = extracted[0].tolist() | |
| 367 # Should have read1 and read3, read2 should be skipped | |
| 368 assert 'read1' in headers, "read1 should be present" | |
| 369 assert 'read3' in headers, "read3 should be present" | |
| 370 | |
| 371 print("✓ Test 7 PASSED: Header synchronization handled correctly") | |
| 372 | |
| 373 def test_excel_merged_vs_individual(self, processed_output): | |
| 374 """ | |
| 375 Test 8: Excel Merged vs Individual Sheet Consistency | |
| 376 | |
| 377 Verifies that the merged sheet correctly aggregates data from the individual sheet. | |
| 378 """ | |
| 379 df_individual = pd.read_excel(processed_output.header_anno, sheet_name='Individual_Reads') | |
| 380 df_merged = pd.read_excel(processed_output.header_anno, sheet_name='Merged_by_Taxa') | |
| 381 | |
| 382 # Count unique taxa in individual sheet | |
| 383 individual_taxa = df_individual['taxa'].nunique() | |
| 384 | |
| 385 # Should match number of rows in merged sheet | |
| 386 assert len(df_merged) == individual_taxa, "Merged sheet doesn't match unique taxa count" | |
| 387 | |
| 388 # Check that counts are properly aggregated | |
| 389 # For taxa with multiple reads, counts should be summed | |
| 390 for _, merged_row in df_merged.iterrows(): | |
| 391 taxa = merged_row['taxa'] | |
| 392 individual_rows = df_individual[df_individual['taxa'] == taxa] | |
| 393 | |
| 394 expected_count = individual_rows['count'].sum() | |
| 395 actual_count = merged_row['count'] | |
| 396 | |
| 397 assert actual_count == expected_count, f"Count mismatch for taxa {taxa}: expected {expected_count}, got {actual_count}" | |
| 398 | |
| 399 print("✓ Test 8 PASSED: Excel merged sheet correctly aggregates individual data") | |
| 400 | |
| 401 def test_annotation_statistics_accuracy(self, processed_output, sample_files): | |
| 402 """ | |
| 403 Test 9: Annotation Statistics Accuracy | |
| 404 | |
| 405 Verifies that calculated annotation statistics match the actual data. | |
| 406 """ | |
| 407 # Read stats file | |
| 408 stats = {} | |
| 409 with open(processed_output.anno_stats, 'r') as f: | |
| 410 lines = f.readlines()[1:] # Skip header | |
| 411 for line in lines: | |
| 412 key, value = line.strip().split('\t') | |
| 413 try: | |
| 414 stats[key] = float(value) | |
| 415 except ValueError: | |
| 416 stats[key] = value | |
| 417 | |
| 418 # Manual verification | |
| 419 assert stats['total_sequences'] == 4.0, "Total sequences count incorrect" | |
| 420 assert stats['annotated_sequences'] == 3.0, "Annotated sequences count incorrect" | |
| 421 assert stats['total_unique'] == 200, "Total unique count incorrect" | |
| 422 assert stats['unique_annotated'] == 150, "Unique annotated count incorrect" | |
| 423 assert stats['percentage_annotated'] == 75.0, "Percentage annotated incorrect" | |
| 424 assert stats['percentage_unique_annotated'] == 75.0, "Percentage unique annotated incorrect" | |
| 425 | |
| 426 print("✓ Test 9 PASSED: Annotation statistics are accurate") | |
| 427 | |
| 428 | |
| 429 | |
if __name__ == "__main__":
    # Run all tests in this file and propagate pytest's exit status so
    # CI detects failures (pytest.main returns the exit code).
    sys.exit(pytest.main([__file__]))
