Mercurial > repos > bimib > cobraxy
comparison COBRAxy/src/test/test_marea.py @ 539:2fb97466e404 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 |
| parents | |
| children | 7d5b35c715e8 |
comparison
equal
deleted
inserted
replaced
| 538:fd53d42348bd | 539:2fb97466e404 |
|---|---|
| 1 """ | |
| 2 Unit tests for MAREA, flux_simulation, and related visualization modules. | |
| 3 | |
| 4 Run with: python -m pytest test_marea.py -v | |
| 5 Or: python test_marea.py | |
| 6 """ | |
| 7 | |
| 8 import sys | |
| 9 import os | |
| 10 import pandas as pd | |
| 11 import numpy as np | |
| 12 import tempfile | |
| 13 from pathlib import Path | |
| 14 | |
| 15 # Try to import pytest, but don't fail if not available | |
| 16 try: | |
| 17 import pytest | |
| 18 HAS_PYTEST = True | |
| 19 except ImportError: | |
| 20 HAS_PYTEST = False | |
| 21 class _DummyPytest: | |
| 22 class raises: | |
| 23 def __init__(self, *args, **kwargs): | |
| 24 self.expected_exceptions = args | |
| 25 def __enter__(self): | |
| 26 return self | |
| 27 def __exit__(self, exc_type, exc_val, exc_tb): | |
| 28 if exc_type is None: | |
| 29 raise AssertionError("Expected an exception but none was raised") | |
| 30 if not any(issubclass(exc_type, e) for e in self.expected_exceptions): | |
| 31 return False | |
| 32 return True | |
| 33 pytest = _DummyPytest() | |
| 34 | |
| 35 # Add parent directory to path | |
| 36 sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) | |
| 37 | |
| 38 import marea | |
| 39 import flux_simulation | |
| 40 import flux_to_map | |
| 41 import ras_to_bounds | |
| 42 import utils.general_utils as utils | |
| 43 | |
| 44 # Get the tool directory | |
| 45 TOOL_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) | |
| 46 | |
| 47 | |
| 48 class TestMAREA: | |
| 49 """Tests for marea module""" | |
| 50 | |
| 51 def test_process_args(self): | |
| 52 """Test argument processing for MAREA""" | |
| 53 # Create minimal args for testing | |
| 54 with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: | |
| 55 f.write("reaction_id,value\n") | |
| 56 f.write("r1,1.5\n") | |
| 57 temp_file = f.name | |
| 58 | |
| 59 try: | |
| 60 args = marea.process_args([ | |
| 61 '-td', TOOL_DIR, | |
| 62 '--tool_dir', TOOL_DIR | |
| 63 ]) | |
| 64 assert hasattr(args, 'tool_dir') | |
| 65 assert args.tool_dir == TOOL_DIR | |
| 66 finally: | |
| 67 if os.path.exists(temp_file): | |
| 68 os.unlink(temp_file) | |
| 69 | |
| 70 def test_comparison_types(self): | |
| 71 """Test that comparison type enum exists and is correct""" | |
| 72 # Check that the ComparisonType enum has expected values | |
| 73 assert hasattr(marea, 'ComparisonType') or hasattr(marea, 'GroupingCriterion') | |
| 74 | |
| 75 def test_ras_transformation(self): | |
| 76 """Test RAS transformation logic""" | |
| 77 # Create sample RAS data | |
| 78 ras_data = pd.DataFrame({ | |
| 79 'reaction': ['r1', 'r2', 'r3'], | |
| 80 'value': [1.5, 0.5, 2.0] | |
| 81 }) | |
| 82 | |
| 83 # Test that data can be processed | |
| 84 assert len(ras_data) == 3 | |
| 85 assert ras_data['value'].max() == 2.0 | |
| 86 | |
| 87 | |
| 88 class TestFluxSimulation: | |
| 89 """Tests for flux_simulation module""" | |
| 90 | |
| 91 def test_process_args(self): | |
| 92 """Test argument processing for flux simulation""" | |
| 93 args = flux_simulation.process_args([ | |
| 94 '-td', TOOL_DIR | |
| 95 ]) | |
| 96 assert hasattr(args, 'tool_dir') | |
| 97 | |
| 98 def test_flux_balance_setup(self): | |
| 99 """Test that FBA setup functions exist""" | |
| 100 # Check that key functions exist | |
| 101 assert hasattr(flux_simulation, 'process_args') | |
| 102 assert hasattr(flux_simulation, 'main') | |
| 103 | |
| 104 | |
| 105 class TestFluxToMap: | |
| 106 """Tests for flux_to_map module""" | |
| 107 | |
| 108 def test_process_args(self): | |
| 109 """Test argument processing for flux to map""" | |
| 110 args = flux_to_map.process_args([ | |
| 111 '-td', TOOL_DIR | |
| 112 ]) | |
| 113 assert hasattr(args, 'tool_dir') | |
| 114 | |
| 115 def test_color_map_options(self): | |
| 116 """Test that color map options are available""" | |
| 117 # The module should have color map functionality | |
| 118 assert hasattr(flux_to_map, 'process_args') | |
| 119 | |
| 120 | |
| 121 class TestRasToBounds: | |
| 122 """Tests for ras_to_bounds module""" | |
| 123 | |
| 124 def test_process_args(self): | |
| 125 """Test argument processing for RAS to bounds""" | |
| 126 args = ras_to_bounds.process_args([ | |
| 127 '-td', TOOL_DIR | |
| 128 ]) | |
| 129 assert hasattr(args, 'tool_dir') | |
| 130 | |
| 131 def test_bounds_conversion(self): | |
| 132 """Test that bounds conversion logic exists""" | |
| 133 # Create sample RAS data | |
| 134 ras_data = { | |
| 135 'r1': 1.5, | |
| 136 'r2': 0.5, | |
| 137 'r3': 2.0 | |
| 138 } | |
| 139 | |
| 140 # Test basic transformation logic | |
| 141 # Reactions with higher RAS should have higher bounds | |
| 142 assert ras_data['r3'] > ras_data['r1'] > ras_data['r2'] | |
| 143 | |
| 144 | |
| 145 class TestModelConversion: | |
| 146 """Tests for model conversion tools""" | |
| 147 | |
| 148 def test_tabular_to_model(self): | |
| 149 """Test tabular to model conversion""" | |
| 150 import tabular2MetabolicModel | |
| 151 | |
| 152 args = tabular2MetabolicModel.process_args([]) | |
| 153 assert hasattr(args, 'tool_dir') | |
| 154 | |
| 155 def test_model_to_tabular(self): | |
| 156 """Test model to tabular conversion""" | |
| 157 import metabolicModel2Tabular | |
| 158 | |
| 159 args = metabolicModel2Tabular.process_args([]) | |
| 160 assert hasattr(args, 'tool_dir') | |
| 161 | |
| 162 | |
| 163 class TestDataProcessing: | |
| 164 """Tests for data processing utilities used across tools""" | |
| 165 | |
| 166 def test_ras_data_format(self): | |
| 167 """Test RAS data format validation""" | |
| 168 # Create valid RAS data | |
| 169 ras_df = pd.DataFrame({ | |
| 170 'reaction_id': ['r1', 'r2', 'r3'], | |
| 171 'group1': [1.5, 0.5, 2.0], | |
| 172 'group2': [1.8, 0.3, 2.2] | |
| 173 }) | |
| 174 | |
| 175 assert 'reaction_id' in ras_df.columns | |
| 176 assert len(ras_df) > 0 | |
| 177 | |
| 178 def test_rps_data_format(self): | |
| 179 """Test RPS data format validation""" | |
| 180 # Create valid RPS data | |
| 181 rps_df = pd.DataFrame({ | |
| 182 'reaction_id': ['r1', 'r2', 'r3'], | |
| 183 'sample1': [100.5, 50.3, 200.1], | |
| 184 'sample2': [150.2, 30.8, 250.5] | |
| 185 }) | |
| 186 | |
| 187 assert 'reaction_id' in rps_df.columns | |
| 188 assert len(rps_df) > 0 | |
| 189 | |
| 190 def test_flux_data_format(self): | |
| 191 """Test flux data format validation""" | |
| 192 # Create valid flux data | |
| 193 flux_df = pd.DataFrame({ | |
| 194 'reaction_id': ['r1', 'r2', 'r3'], | |
| 195 'flux': [1.5, -0.5, 2.0], | |
| 196 'lower_bound': [-10, -10, 0], | |
| 197 'upper_bound': [10, 10, 10] | |
| 198 }) | |
| 199 | |
| 200 assert 'reaction_id' in flux_df.columns | |
| 201 assert 'flux' in flux_df.columns | |
| 202 | |
| 203 | |
| 204 class TestStatistics: | |
| 205 """Tests for statistical operations in MAREA""" | |
| 206 | |
| 207 def test_fold_change_calculation(self): | |
| 208 """Test fold change calculation""" | |
| 209 # Simple fold change test | |
| 210 group1_mean = 2.0 | |
| 211 group2_mean = 4.0 | |
| 212 fold_change = group2_mean / group1_mean | |
| 213 | |
| 214 assert fold_change == 2.0 | |
| 215 | |
| 216 def test_log_fold_change(self): | |
| 217 """Test log fold change calculation""" | |
| 218 group1_mean = 2.0 | |
| 219 group2_mean = 8.0 | |
| 220 log_fc = np.log2(group2_mean / group1_mean) | |
| 221 | |
| 222 assert log_fc == 2.0 # log2(8/2) = log2(4) = 2 | |
| 223 | |
| 224 def test_pvalue_correction(self): | |
| 225 """Test that statistical functions handle edge cases""" | |
| 226 # Test with identical values (should give p-value close to 1) | |
| 227 group1 = [1.0, 1.0, 1.0] | |
| 228 group2 = [1.0, 1.0, 1.0] | |
| 229 | |
| 230 from scipy import stats | |
| 231 t_stat, p_value = stats.ttest_ind(group1, group2) | |
| 232 | |
| 233 # p-value should be NaN or close to 1 for identical groups | |
| 234 assert np.isnan(p_value) or p_value > 0.9 | |
| 235 | |
| 236 | |
| 237 class TestMapVisualization: | |
| 238 """Tests for SVG map visualization""" | |
| 239 | |
| 240 def test_svg_maps_exist(self): | |
| 241 """Test that SVG maps exist""" | |
| 242 map_dir = os.path.join(TOOL_DIR, "local", "svg metabolic maps") | |
| 243 assert os.path.exists(map_dir) | |
| 244 | |
| 245 # Check for at least one map | |
| 246 maps = [f for f in os.listdir(map_dir) if f.endswith('.svg')] | |
| 247 assert len(maps) > 0, "No SVG maps found" | |
| 248 | |
| 249 def test_model_has_map(self): | |
| 250 """Test that models have associated maps""" | |
| 251 # ENGRO2 should have a map | |
| 252 engro2_map = os.path.join(TOOL_DIR, "local", "svg metabolic maps", "ENGRO2_map.svg") | |
| 253 if os.path.exists(engro2_map): | |
| 254 assert os.path.getsize(engro2_map) > 0 | |
| 255 | |
| 256 def test_color_gradient(self): | |
| 257 """Test color gradient generation""" | |
| 258 # Test that we can generate colors for a range of values | |
| 259 values = [-2.0, -1.0, 0.0, 1.0, 2.0] | |
| 260 | |
| 261 # All values should be processable | |
| 262 for val in values: | |
| 263 # Simple color mapping test | |
| 264 if val < 0: | |
| 265 # Negative values should map to one color scheme | |
| 266 assert val < 0 | |
| 267 elif val > 0: | |
| 268 # Positive values should map to another | |
| 269 assert val > 0 | |
| 270 else: | |
| 271 # Zero should be neutral | |
| 272 assert val == 0 | |
| 273 | |
| 274 | |
| 275 class TestIntegration: | |
| 276 """Integration tests for complete workflows""" | |
| 277 | |
| 278 def test_ras_to_marea_workflow(self): | |
| 279 """Test that RAS data can flow into MAREA""" | |
| 280 # Create sample RAS data | |
| 281 ras_data = pd.DataFrame({ | |
| 282 'reaction_id': ['r1', 'r2', 'r3'], | |
| 283 'control': [1.5, 0.8, 1.2], | |
| 284 'treatment': [2.0, 0.5, 1.8] | |
| 285 }) | |
| 286 | |
| 287 # Calculate fold changes | |
| 288 ras_data['fold_change'] = ras_data['treatment'] / ras_data['control'] | |
| 289 | |
| 290 assert 'fold_change' in ras_data.columns | |
| 291 assert len(ras_data) == 3 | |
| 292 | |
| 293 def test_rps_to_flux_workflow(self): | |
| 294 """Test that RPS data can be used for flux simulation""" | |
| 295 # Create sample RPS data | |
| 296 rps_data = pd.DataFrame({ | |
| 297 'reaction_id': ['r1', 'r2', 'r3'], | |
| 298 'rps': [100.0, 50.0, 200.0] | |
| 299 }) | |
| 300 | |
| 301 # RPS can be used to set bounds | |
| 302 rps_data['upper_bound'] = rps_data['rps'] / 10 | |
| 303 | |
| 304 assert 'upper_bound' in rps_data.columns | |
| 305 | |
| 306 | |
| 307 class TestErrorHandling: | |
| 308 """Tests for error handling across modules""" | |
| 309 | |
| 310 def test_invalid_model_name(self): | |
| 311 """Test handling of invalid model names""" | |
| 312 with pytest.raises((ValueError, KeyError, AttributeError)): | |
| 313 utils.Model("INVALID_MODEL") | |
| 314 | |
| 315 def test_missing_required_column(self): | |
| 316 """Test handling of missing required columns""" | |
| 317 # Create incomplete data | |
| 318 incomplete_data = pd.DataFrame({ | |
| 319 'wrong_column': [1, 2, 3] | |
| 320 }) | |
| 321 | |
| 322 # Should fail when looking for required columns | |
| 323 with pytest.raises(KeyError): | |
| 324 value = incomplete_data['reaction_id'] | |
| 325 | |
| 326 | |
| 327 if __name__ == "__main__": | |
| 328 # Run tests with pytest if available | |
| 329 if HAS_PYTEST: | |
| 330 pytest.main([__file__, "-v"]) | |
| 331 else: | |
| 332 print("pytest not available, running basic tests...") | |
| 333 | |
| 334 test_classes = [ | |
| 335 TestMAREA(), | |
| 336 TestFluxSimulation(), | |
| 337 TestFluxToMap(), | |
| 338 TestRasToBounds(), | |
| 339 TestModelConversion(), | |
| 340 TestDataProcessing(), | |
| 341 TestStatistics(), | |
| 342 TestMapVisualization(), | |
| 343 TestIntegration(), | |
| 344 TestErrorHandling() | |
| 345 ] | |
| 346 | |
| 347 failed = 0 | |
| 348 passed = 0 | |
| 349 | |
| 350 for test_class in test_classes: | |
| 351 class_name = test_class.__class__.__name__ | |
| 352 print(f"\n{class_name}:") | |
| 353 | |
| 354 for method_name in dir(test_class): | |
| 355 if method_name.startswith("test_"): | |
| 356 try: | |
| 357 method = getattr(test_class, method_name) | |
| 358 method() | |
| 359 print(f" ✓ {method_name}") | |
| 360 passed += 1 | |
| 361 except Exception as e: | |
| 362 print(f" ✗ {method_name}: {str(e)}") | |
| 363 import traceback | |
| 364 traceback.print_exc() | |
| 365 failed += 1 | |
| 366 | |
| 367 print(f"\n{'='*60}") | |
| 368 print(f"Results: {passed} passed, {failed} failed") | |
| 369 if failed > 0: | |
| 370 sys.exit(1) |
