comparison COBRAxy/utils/general_utils.py @ 456:a6e45049c1b9 draft default tip

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 17:28:45 +0000
parents 4a385fdb9e58
children
comparison
equal deleted inserted replaced
455:4e2bc80764b6 456:a6e45049c1b9
1 """
2 General utilities for COBRAxy.
3
4 This module provides:
5 - File and path helpers (FileFormat, FilePath)
6 - Error and result handling utilities (CustomErr, Result)
7 - Basic I/O helpers (CSV/TSV, pickle, SVG)
8 - Lightweight CLI argument parsers (Bool, Float)
9 - Model loader utilities for COBRA models, including compressed formats
10 """
1 import math 11 import math
2 import re 12 import re
3 import sys 13 import sys
4 import csv 14 import csv
5 import pickle 15 import pickle
6 import lxml.etree as ET 16 import lxml.etree as ET
7 17
8 from enum import Enum 18 from enum import Enum
9 from itertools import count 19 from itertools import count
10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple 20 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
11 21
12 import pandas as pd 22 import pandas as pd
13 import cobra 23 import cobra
14 from cobra import Model as cobraModel, Reaction, Metabolite
15 24
16 import zipfile 25 import zipfile
17 import gzip 26 import gzip
18 import bz2 27 import bz2
19 from io import StringIO 28 from io import StringIO
20 29
21 30
22 31 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
23 class ValueErr(Exception): 32 class ValueErr(Exception):
24 def __init__(self, param_name, expected, actual): 33 def __init__(self, param_name, expected, actual):
25 super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") 34 super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
26 35
27 class PathErr(Exception): 36 class PathErr(Exception):
30 39
31 class FileFormat(Enum): 40 class FileFormat(Enum):
32 """ 41 """
33 Encodes possible file extensions to conditionally save data in a different format. 42 Encodes possible file extensions to conditionally save data in a different format.
34 """ 43 """
35 DAT = ("dat",) # this is how galaxy treats all your files! 44 DAT = ("dat",)
36 CSV = ("csv",) # this is how most editable input data is written 45 CSV = ("csv",)
37 TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! 46 TSV = ("tsv",)
38 SVG = ("svg",) # this is how most metabolic maps are written 47 SVG = ("svg",)
39 PNG = ("png",) # this is a common output format for images (such as metabolic maps) 48 PNG = ("png",)
40 PDF = ("pdf",) # this is also a common output format for images, as it's required in publications. 49 PDF = ("pdf",)
41 50
42 # Updated to include compressed variants 51 # Compressed variants for common model formats
43 XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed 52 XML = ("xml", "xml.gz", "xml.zip", "xml.bz2")
44 JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed 53 JSON = ("json", "json.gz", "json.zip", "json.bz2")
45 MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed 54 MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2")
46 YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed 55 YML = ("yml", "yml.gz", "yml.zip", "yml.bz2")
47 56
48 TXT = ("txt",) # this is how most output data is written 57 TXT = ("txt",)
49 PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved 58 PICKLE = ("pickle", "pk", "p")
50 59
51 def __init__(self, *extensions): 60 def __init__(self, *extensions):
52 self.extensions = extensions 61 self.extensions = extensions
53 # Store original extension when set via fromExt 62 # Store original extension when set via fromExt
54 self._original_extension = None 63 self._original_extension = None
81 """ 90 """
82 (Private) converts to str representation. Good practice for usage with argparse. 91 (Private) converts to str representation. Good practice for usage with argparse.
83 Returns: 92 Returns:
84 str : the string representation of the file extension. 93 str : the string representation of the file extension.
85 """ 94 """
86 # If we have an original extension stored (for compressed files only), use it
87 if hasattr(self, '_original_extension') and self._original_extension: 95 if hasattr(self, '_original_extension') and self._original_extension:
88 return self._original_extension 96 return self._original_extension
89 97
90 # For XML, JSON, MAT and YML without original extension, use the base extension
91 if self == FileFormat.XML: 98 if self == FileFormat.XML:
92 return "xml" 99 return "xml"
93 elif self == FileFormat.JSON: 100 elif self == FileFormat.JSON:
94 return "json" 101 return "json"
95 elif self == FileFormat.MAT: 102 elif self == FileFormat.MAT:
99 106
100 return self.value[-1] 107 return self.value[-1]
101 108
102 class FilePath(): 109 class FilePath():
103 """ 110 """
104 Represents a file path. View this as an attempt to standardize file-related operations by expecting 111 Represents a file path with format-aware helpers.
105 values of this type in any process requesting a file path.
106 """ 112 """
107 def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: 113 def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
108 """ 114 """
109 (Private) Initializes an instance of FilePath. 115 Initialize FilePath.
110 Args: 116 Args:
111 path : the end of the path, containing the file name. 117 path: File name stem.
112 ext : the file's extension. 118 ext: File extension (FileFormat).
113 prefix : anything before path, if the last '/' isn't there it's added by the code. 119 prefix: Optional directory path (trailing '/' auto-added).
114 Returns:
115 None : practically, a FilePath instance.
116 """ 120 """
117 self.ext = ext 121 self.ext = ext
118 self.filePath = filePath 122 self.filePath = filePath
119 123
120 if prefix and prefix[-1] != '/': 124 if prefix and prefix[-1] != '/':
122 self.prefix = prefix 126 self.prefix = prefix
123 127
124 @classmethod 128 @classmethod
125 def fromStrPath(cls, path: str) -> "FilePath": 129 def fromStrPath(cls, path: str) -> "FilePath":
126 """ 130 """
127 Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. 131 Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz).
128 It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
129 These double extensions are not supported for other file types such as .csv.
130 Args: 132 Args:
131 path : the string containing the path 133 path : the string containing the path
132 Raises: 134 Raises:
133 PathErr : if the provided string doesn't represent a valid path. 135 PathErr : if the provided string doesn't represent a valid path.
134 Returns: 136 Returns:
139 raise PathErr(path, "cannot recognize folder structure or extension in path") 141 raise PathErr(path, "cannot recognize folder structure or extension in path")
140 142
141 prefix = result["prefix"] if result["prefix"] else "" 143 prefix = result["prefix"] if result["prefix"] else ""
142 name, ext = result["name"], result["ext"] 144 name, ext = result["name"], result["ext"]
143 145
144 # Check for double extensions (json.gz, xml.zip, etc.)
145 parts = path.split(".") 146 parts = path.split(".")
146 if len(parts) >= 3: 147 if len(parts) >= 3:
147 penultimate = parts[-2] 148 penultimate = parts[-2]
148 last = parts[-1] 149 last = parts[-1]
149 double_ext = f"{penultimate}.{last}" 150 double_ext = f"{penultimate}.{last}"
150 151
151 # Try the double extension first
152 try: 152 try:
153 ext_format = FileFormat.fromExt(double_ext) 153 ext_format = FileFormat.fromExt(double_ext)
154 name = ".".join(parts[:-2]) 154 name = ".".join(parts[:-2])
155 # Extract prefix if it exists
156 if '/' in name: 155 if '/' in name:
157 prefix = name[:name.rfind('/') + 1] 156 prefix = name[:name.rfind('/') + 1]
158 name = name[name.rfind('/') + 1:] 157 name = name[name.rfind('/') + 1:]
159 return cls(name, ext_format, prefix=prefix) 158 return cls(name, ext_format, prefix=prefix)
160 except ValueErr: 159 except ValueErr:
161 # If double extension doesn't work, fall back to single extension
162 pass 160 pass
163 161
164 # Single extension fallback (original logic)
165 try: 162 try:
166 ext_format = FileFormat.fromExt(ext) 163 ext_format = FileFormat.fromExt(ext)
167 return cls(name, ext_format, prefix=prefix) 164 return cls(name, ext_format, prefix=prefix)
168 except ValueErr: 165 except ValueErr:
169 raise PathErr(path, f"unsupported file extension: {ext}") 166 raise PathErr(path, f"unsupported file extension: {ext}")
196 """ 193 """
197 Log a warning message to an output log file and print it to the console. The final period and a 194 Log a warning message to an output log file and print it to the console. The final period and a
198 newline are added by the function. 195 newline are added by the function.
199 196
200 Args: 197 Args:
201 s (str): The warning message to be logged and printed. 198 msg (str): The warning message to be logged and printed.
202 loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and 199 loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
203 immediately read back (beware: relatively expensive operation, log with caution). 200 immediately read back (beware: relatively expensive operation, log with caution).
204 201
205 Returns: 202 Returns:
206 None 203 None
207 """ 204 """
208 # building the path and then reading it immediately seems useless, but it's actually a way of 205 # Note: validates path via FilePath; keep logging minimal to avoid overhead.
209 # validating that reduces repetition on the caller's side. Besides, logging a message by writing
210 # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
211 # mindlessly logging whenever something comes up, log at the very end and tell the user everything
212 # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
213 # the file only at the end of the program's execution.
214 with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") 206 with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
215 207
216 class CustomErr(Exception): 208 class CustomErr(Exception):
217 """ 209 """
218 Custom error class to handle exceptions in a structured way, with a unique identifier and a message. 210 Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
236 228
237 self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) 229 self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
238 230
239 def throw(self, loggerPath = "") -> None: 231 def throw(self, loggerPath = "") -> None:
240 """ 232 """
241 Raises the current CustomErr instance, logging a warning message before doing so. 233 Raises the current CustomErr instance, optionally logging it first.
234
235 Args:
236 loggerPath (str): Optional path to a log file to append this error before raising.
242 237
243 Raises: 238 Raises:
244 self: The current CustomErr instance. 239 self: The current CustomErr instance.
245 240
246 Returns: 241 Returns:
247 None 242 None
248 """ 243 """
249 if loggerPath: logWarning(str(self), loggerPath) 244 if loggerPath:
245 logWarning(str(self), loggerPath)
250 raise self 246 raise self
251 247
252 def abort(self) -> None: 248 def abort(self) -> None:
253 """ 249 """
254 Aborts the execution of the script. 250 Aborts the execution of the script.
314 Class to handle the result of an operation, with a value and a boolean flag to indicate 310 Class to handle the result of an operation, with a value and a boolean flag to indicate
315 whether the operation was successful or not. 311 whether the operation was successful or not.
316 """ 312 """
317 def __init__(self, value :Union[T, E], isOk :bool) -> None: 313 def __init__(self, value :Union[T, E], isOk :bool) -> None:
318 """ 314 """
319 (Private) Initializes an instance of Result. 315 Initialize an instance of Result.
320 316
321 Args: 317 Args:
322 value (Union[T, E]): The value to be stored in the Result instance. 318 value (Union[T, E]): The value to be stored in the Result instance.
323 isOk (bool): A boolean flag to indicate whether the operation was successful or not. 319 isOk (bool): A boolean flag to indicate whether the operation was successful or not.
324 320
330 self.value = value 326 self.value = value
331 327
332 @classmethod 328 @classmethod
333 def Ok(cls, value :T) -> "Result": 329 def Ok(cls, value :T) -> "Result":
334 """ 330 """
335 Constructs a new Result instance with a successful operation. 331 Construct a successful Result.
336 332
337 Args: 333 Args:
338 value (T): The value to be stored in the Result instance, set as successful. 334 value (T): The value to be stored in the Result instance, set as successful.
339 335
340 Returns: 336 Returns:
343 return Result(value, isOk = True) 339 return Result(value, isOk = True)
344 340
345 @classmethod 341 @classmethod
346 def Err(cls, value :E) -> "Result": 342 def Err(cls, value :E) -> "Result":
347 """ 343 """
348 Constructs a new Result instance with a failed operation. 344 Construct a failed Result.
349 345
350 Args: 346 Args:
351 value (E): The value to be stored in the Result instance, set as failed. 347 value (E): The value to be stored in the Result instance, set as failed.
352 348
353 Returns: 349 Returns:
435 431
436 def __str__(self): 432 def __str__(self):
437 return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" 433 return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
438 434
439 # FILES 435 # FILES
440 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
441 """
442 Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
443
444 Args:
445 path : the path to the dataset file.
446 datasetName : the name of the dataset.
447
448 Raises:
449 DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
450 it has less than 2 columns.
451
452 Returns:
453 pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
454 """
455 # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
456 # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
457 # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
458 # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
459 # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
460 # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
461 try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
462 except:
463 try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
464 except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
465
466 if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
467 return dataset
468
469 def readPickle(path :FilePath) -> Any: 436 def readPickle(path :FilePath) -> Any:
470 """ 437 """
471 Reads the contents of a .pickle file, which needs to exist at the given path. 438 Reads the contents of a .pickle file, which needs to exist at the given path.
472 439
473 Args: 440 Args:
568 """ 535 """
569 with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) 536 with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
570 537
571 # UI ARGUMENTS 538 # UI ARGUMENTS
572 class Bool: 539 class Bool:
540 """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive)."""
573 def __init__(self, argName :str) -> None: 541 def __init__(self, argName :str) -> None:
574 self.argName = argName 542 self.argName = argName
575 543
576 def __call__(self, s :str) -> bool: return self.check(s) 544 def __call__(self, s :str) -> bool: return self.check(s)
577 545
580 if s == "true" : return True 548 if s == "true" : return True
581 if s == "false": return False 549 if s == "false": return False
582 raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") 550 raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
583 551
584 class Float: 552 class Float:
553 """Float CLI argument parser supporting NaN and None keywords (case-insensitive)."""
585 def __init__(self, argName = "Dataset values, not an argument") -> None: 554 def __init__(self, argName = "Dataset values, not an argument") -> None:
586 self.argName = argName 555 self.argName = argName
587 556
588 def __call__(self, s :str) -> float: return self.check(s) 557 def __call__(self, s :str) -> float: return self.check(s)
589 558
605 Recon = "Recon" 574 Recon = "Recon"
606 ENGRO2 = "ENGRO2" 575 ENGRO2 = "ENGRO2"
607 ENGRO2_no_legend = "ENGRO2_no_legend" 576 ENGRO2_no_legend = "ENGRO2_no_legend"
608 HMRcore = "HMRcore" 577 HMRcore = "HMRcore"
609 HMRcore_no_legend = "HMRcore_no_legend" 578 HMRcore_no_legend = "HMRcore_no_legend"
610 Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. 579 Custom = "Custom"
611 580
612 def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: 581 def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
613 if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") 582 if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
614 583
615 def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: 584 def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
633 path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") 602 path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
634 self.__raiseMissingPathErr(path) 603 self.__raiseMissingPathErr(path)
635 return readPickle(path) 604 return readPickle(path)
636 605
637 def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: 606 def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
607 """Open the SVG metabolic map for this model."""
638 path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") 608 path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
639 self.__raiseMissingPathErr(path) 609 self.__raiseMissingPathErr(path)
640 return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) 610 return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
641 611
642 def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: 612 def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
613 """Load the COBRA model for this enum variant (supports Custom with explicit path/extension)."""
643 if(self is Model.Custom): 614 if(self is Model.Custom):
644 return self.load_custom_model(customPath, customExtension) 615 return self.load_custom_model(customPath, customExtension)
645 else: 616 else:
646 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) 617 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
647 618
648 def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: 619 def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
620 """Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not)."""
649 ext = ext if ext else file_path.ext 621 ext = ext if ext else file_path.ext
650 try: 622 try:
651 if str(ext) in FileFormat.XML.value: 623 if str(ext) in FileFormat.XML.value:
652 return cobra.io.read_sbml_model(file_path.show()) 624 return cobra.io.read_sbml_model(file_path.show())
653 625