Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/general_utils.py @ 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | 4a385fdb9e58 |
children |
comparison
equal
deleted
inserted
replaced
455:4e2bc80764b6 | 456:a6e45049c1b9 |
---|---|
1 """ | |
2 General utilities for COBRAxy. | |
3 | |
4 This module provides: | |
5 - File and path helpers (FileFormat, FilePath) | |
6 - Error and result handling utilities (CustomErr, Result) | |
7 - Basic I/O helpers (CSV/TSV, pickle, SVG) | |
8 - Lightweight CLI argument parsers (Bool, Float) | |
9 - Model loader utilities for COBRA models, including compressed formats | |
10 """ | |
1 import math | 11 import math |
2 import re | 12 import re |
3 import sys | 13 import sys |
4 import csv | 14 import csv |
5 import pickle | 15 import pickle |
6 import lxml.etree as ET | 16 import lxml.etree as ET |
7 | 17 |
8 from enum import Enum | 18 from enum import Enum |
9 from itertools import count | 19 from itertools import count |
10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple | 20 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple |
11 | 21 |
12 import pandas as pd | 22 import pandas as pd |
13 import cobra | 23 import cobra |
14 from cobra import Model as cobraModel, Reaction, Metabolite | |
15 | 24 |
16 import zipfile | 25 import zipfile |
17 import gzip | 26 import gzip |
18 import bz2 | 27 import bz2 |
19 from io import StringIO | 28 from io import StringIO |
20 | 29 |
21 | 30 |
22 | 31 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple |
23 class ValueErr(Exception): | 32 class ValueErr(Exception): |
24 def __init__(self, param_name, expected, actual): | 33 def __init__(self, param_name, expected, actual): |
25 super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") | 34 super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") |
26 | 35 |
27 class PathErr(Exception): | 36 class PathErr(Exception): |
30 | 39 |
31 class FileFormat(Enum): | 40 class FileFormat(Enum): |
32 """ | 41 """ |
33 Encodes possible file extensions to conditionally save data in a different format. | 42 Encodes possible file extensions to conditionally save data in a different format. |
34 """ | 43 """ |
35 DAT = ("dat",) # this is how galaxy treats all your files! | 44 DAT = ("dat",) |
36 CSV = ("csv",) # this is how most editable input data is written | 45 CSV = ("csv",) |
37 TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! | 46 TSV = ("tsv",) |
38 SVG = ("svg",) # this is how most metabolic maps are written | 47 SVG = ("svg",) |
39 PNG = ("png",) # this is a common output format for images (such as metabolic maps) | 48 PNG = ("png",) |
40 PDF = ("pdf",) # this is also a common output format for images, as it's required in publications. | 49 PDF = ("pdf",) |
41 | 50 |
42 # Updated to include compressed variants | 51 # Compressed variants for common model formats |
43 XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed | 52 XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") |
44 JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed | 53 JSON = ("json", "json.gz", "json.zip", "json.bz2") |
45 MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed | 54 MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") |
46 YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed | 55 YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") |
47 | 56 |
48 TXT = ("txt",) # this is how most output data is written | 57 TXT = ("txt",) |
49 PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved | 58 PICKLE = ("pickle", "pk", "p") |
50 | 59 |
51 def __init__(self, *extensions): | 60 def __init__(self, *extensions): |
52 self.extensions = extensions | 61 self.extensions = extensions |
53 # Store original extension when set via fromExt | 62 # Store original extension when set via fromExt |
54 self._original_extension = None | 63 self._original_extension = None |
81 """ | 90 """ |
82 (Private) converts to str representation. Good practice for usage with argparse. | 91 (Private) converts to str representation. Good practice for usage with argparse. |
83 Returns: | 92 Returns: |
84 str : the string representation of the file extension. | 93 str : the string representation of the file extension. |
85 """ | 94 """ |
86 # If we have an original extension stored (for compressed files only), use it | |
87 if hasattr(self, '_original_extension') and self._original_extension: | 95 if hasattr(self, '_original_extension') and self._original_extension: |
88 return self._original_extension | 96 return self._original_extension |
89 | 97 |
90 # For XML, JSON, MAT and YML without original extension, use the base extension | |
91 if self == FileFormat.XML: | 98 if self == FileFormat.XML: |
92 return "xml" | 99 return "xml" |
93 elif self == FileFormat.JSON: | 100 elif self == FileFormat.JSON: |
94 return "json" | 101 return "json" |
95 elif self == FileFormat.MAT: | 102 elif self == FileFormat.MAT: |
99 | 106 |
100 return self.value[-1] | 107 return self.value[-1] |
101 | 108 |
102 class FilePath(): | 109 class FilePath(): |
103 """ | 110 """ |
104 Represents a file path. View this as an attempt to standardize file-related operations by expecting | 111 Represents a file path with format-aware helpers. |
105 values of this type in any process requesting a file path. | |
106 """ | 112 """ |
107 def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: | 113 def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: |
108 """ | 114 """ |
109 (Private) Initializes an instance of FilePath. | 115 Initialize FilePath. |
110 Args: | 116 Args: |
111 path : the end of the path, containing the file name. | 117 path: File name stem. |
112 ext : the file's extension. | 118 ext: File extension (FileFormat). |
113 prefix : anything before path, if the last '/' isn't there it's added by the code. | 119 prefix: Optional directory path (trailing '/' auto-added). |
114 Returns: | |
115 None : practically, a FilePath instance. | |
116 """ | 120 """ |
117 self.ext = ext | 121 self.ext = ext |
118 self.filePath = filePath | 122 self.filePath = filePath |
119 | 123 |
120 if prefix and prefix[-1] != '/': | 124 if prefix and prefix[-1] != '/': |
122 self.prefix = prefix | 126 self.prefix = prefix |
123 | 127 |
124 @classmethod | 128 @classmethod |
125 def fromStrPath(cls, path: str) -> "FilePath": | 129 def fromStrPath(cls, path: str) -> "FilePath": |
126 """ | 130 """ |
127 Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. | 131 Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz). |
128 It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models. | |
129 These double extensions are not supported for other file types such as .csv. | |
130 Args: | 132 Args: |
131 path : the string containing the path | 133 path : the string containing the path |
132 Raises: | 134 Raises: |
133 PathErr : if the provided string doesn't represent a valid path. | 135 PathErr : if the provided string doesn't represent a valid path. |
134 Returns: | 136 Returns: |
139 raise PathErr(path, "cannot recognize folder structure or extension in path") | 141 raise PathErr(path, "cannot recognize folder structure or extension in path") |
140 | 142 |
141 prefix = result["prefix"] if result["prefix"] else "" | 143 prefix = result["prefix"] if result["prefix"] else "" |
142 name, ext = result["name"], result["ext"] | 144 name, ext = result["name"], result["ext"] |
143 | 145 |
144 # Check for double extensions (json.gz, xml.zip, etc.) | |
145 parts = path.split(".") | 146 parts = path.split(".") |
146 if len(parts) >= 3: | 147 if len(parts) >= 3: |
147 penultimate = parts[-2] | 148 penultimate = parts[-2] |
148 last = parts[-1] | 149 last = parts[-1] |
149 double_ext = f"{penultimate}.{last}" | 150 double_ext = f"{penultimate}.{last}" |
150 | 151 |
151 # Try the double extension first | |
152 try: | 152 try: |
153 ext_format = FileFormat.fromExt(double_ext) | 153 ext_format = FileFormat.fromExt(double_ext) |
154 name = ".".join(parts[:-2]) | 154 name = ".".join(parts[:-2]) |
155 # Extract prefix if it exists | |
156 if '/' in name: | 155 if '/' in name: |
157 prefix = name[:name.rfind('/') + 1] | 156 prefix = name[:name.rfind('/') + 1] |
158 name = name[name.rfind('/') + 1:] | 157 name = name[name.rfind('/') + 1:] |
159 return cls(name, ext_format, prefix=prefix) | 158 return cls(name, ext_format, prefix=prefix) |
160 except ValueErr: | 159 except ValueErr: |
161 # If double extension doesn't work, fall back to single extension | |
162 pass | 160 pass |
163 | 161 |
164 # Single extension fallback (original logic) | |
165 try: | 162 try: |
166 ext_format = FileFormat.fromExt(ext) | 163 ext_format = FileFormat.fromExt(ext) |
167 return cls(name, ext_format, prefix=prefix) | 164 return cls(name, ext_format, prefix=prefix) |
168 except ValueErr: | 165 except ValueErr: |
169 raise PathErr(path, f"unsupported file extension: {ext}") | 166 raise PathErr(path, f"unsupported file extension: {ext}") |
196 """ | 193 """ |
197 Log a warning message to an output log file and print it to the console. The final period and a | 194 Log a warning message to an output log file and print it to the console. The final period and a |
198 newline are added by the function. | 195 newline are added by the function. |
199 | 196 |
200 Args: | 197 Args: |
201 s (str): The warning message to be logged and printed. | 198 msg (str): The warning message to be logged and printed. |
202 loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and | 199 loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and |
203 immediately read back (beware: relatively expensive operation, log with caution). | 200 immediately read back (beware: relatively expensive operation, log with caution). |
204 | 201 |
205 Returns: | 202 Returns: |
206 None | 203 None |
207 """ | 204 """ |
208 # building the path and then reading it immediately seems useless, but it's actually a way of | 205 # Note: validates path via FilePath; keep logging minimal to avoid overhead. |
209 # validating that reduces repetition on the caller's side. Besides, logging a message by writing | |
210 # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from | |
211 # mindlessly logging whenever something comes up, log at the very end and tell the user everything | |
212 # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to | |
213 # the file only at the end of the program's execution. | |
214 with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") | 206 with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") |
215 | 207 |
216 class CustomErr(Exception): | 208 class CustomErr(Exception): |
217 """ | 209 """ |
218 Custom error class to handle exceptions in a structured way, with a unique identifier and a message. | 210 Custom error class to handle exceptions in a structured way, with a unique identifier and a message. |
236 | 228 |
237 self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) | 229 self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) |
238 | 230 |
239 def throw(self, loggerPath = "") -> None: | 231 def throw(self, loggerPath = "") -> None: |
240 """ | 232 """ |
241 Raises the current CustomErr instance, logging a warning message before doing so. | 233 Raises the current CustomErr instance, optionally logging it first. |
234 | |
235 Args: | |
236 loggerPath (str): Optional path to a log file to append this error before raising. | |
242 | 237 |
243 Raises: | 238 Raises: |
244 self: The current CustomErr instance. | 239 self: The current CustomErr instance. |
245 | 240 |
246 Returns: | 241 Returns: |
247 None | 242 None |
248 """ | 243 """ |
249 if loggerPath: logWarning(str(self), loggerPath) | 244 if loggerPath: |
245 logWarning(str(self), loggerPath) | |
250 raise self | 246 raise self |
251 | 247 |
252 def abort(self) -> None: | 248 def abort(self) -> None: |
253 """ | 249 """ |
254 Aborts the execution of the script. | 250 Aborts the execution of the script. |
314 Class to handle the result of an operation, with a value and a boolean flag to indicate | 310 Class to handle the result of an operation, with a value and a boolean flag to indicate |
315 whether the operation was successful or not. | 311 whether the operation was successful or not. |
316 """ | 312 """ |
317 def __init__(self, value :Union[T, E], isOk :bool) -> None: | 313 def __init__(self, value :Union[T, E], isOk :bool) -> None: |
318 """ | 314 """ |
319 (Private) Initializes an instance of Result. | 315 Initialize an instance of Result. |
320 | 316 |
321 Args: | 317 Args: |
322 value (Union[T, E]): The value to be stored in the Result instance. | 318 value (Union[T, E]): The value to be stored in the Result instance. |
323 isOk (bool): A boolean flag to indicate whether the operation was successful or not. | 319 isOk (bool): A boolean flag to indicate whether the operation was successful or not. |
324 | 320 |
330 self.value = value | 326 self.value = value |
331 | 327 |
332 @classmethod | 328 @classmethod |
333 def Ok(cls, value :T) -> "Result": | 329 def Ok(cls, value :T) -> "Result": |
334 """ | 330 """ |
335 Constructs a new Result instance with a successful operation. | 331 Construct a successful Result. |
336 | 332 |
337 Args: | 333 Args: |
338 value (T): The value to be stored in the Result instance, set as successful. | 334 value (T): The value to be stored in the Result instance, set as successful. |
339 | 335 |
340 Returns: | 336 Returns: |
343 return Result(value, isOk = True) | 339 return Result(value, isOk = True) |
344 | 340 |
345 @classmethod | 341 @classmethod |
346 def Err(cls, value :E) -> "Result": | 342 def Err(cls, value :E) -> "Result": |
347 """ | 343 """ |
348 Constructs a new Result instance with a failed operation. | 344 Construct a failed Result. |
349 | 345 |
350 Args: | 346 Args: |
351 value (E): The value to be stored in the Result instance, set as failed. | 347 value (E): The value to be stored in the Result instance, set as failed. |
352 | 348 |
353 Returns: | 349 Returns: |
435 | 431 |
436 def __str__(self): | 432 def __str__(self): |
437 return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" | 433 return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" |
438 | 434 |
439 # FILES | 435 # FILES |
440 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame: | |
441 """ | |
442 Reads a .csv or .tsv file and returns it as a Pandas DataFrame. | |
443 | |
444 Args: | |
445 path : the path to the dataset file. | |
446 datasetName : the name of the dataset. | |
447 | |
448 Raises: | |
449 DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if | |
450 it has less than 2 columns. | |
451 | |
452 Returns: | |
453 pandas.DataFrame: The dataset loaded as a Pandas DataFrame. | |
454 """ | |
455 # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than | |
456 # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code. | |
457 # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really | |
458 # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and | |
459 # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is | |
460 # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions. | |
461 try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python") | |
462 except: | |
463 try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python") | |
464 except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") | |
465 | |
466 if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns") | |
467 return dataset | |
468 | |
469 def readPickle(path :FilePath) -> Any: | 436 def readPickle(path :FilePath) -> Any: |
470 """ | 437 """ |
471 Reads the contents of a .pickle file, which needs to exist at the given path. | 438 Reads the contents of a .pickle file, which needs to exist at the given path. |
472 | 439 |
473 Args: | 440 Args: |
568 """ | 535 """ |
569 with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) | 536 with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) |
570 | 537 |
571 # UI ARGUMENTS | 538 # UI ARGUMENTS |
572 class Bool: | 539 class Bool: |
540 """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive).""" | |
573 def __init__(self, argName :str) -> None: | 541 def __init__(self, argName :str) -> None: |
574 self.argName = argName | 542 self.argName = argName |
575 | 543 |
576 def __call__(self, s :str) -> bool: return self.check(s) | 544 def __call__(self, s :str) -> bool: return self.check(s) |
577 | 545 |
580 if s == "true" : return True | 548 if s == "true" : return True |
581 if s == "false": return False | 549 if s == "false": return False |
582 raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") | 550 raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") |
583 | 551 |
584 class Float: | 552 class Float: |
553 """Float CLI argument parser supporting NaN and None keywords (case-insensitive).""" | |
585 def __init__(self, argName = "Dataset values, not an argument") -> None: | 554 def __init__(self, argName = "Dataset values, not an argument") -> None: |
586 self.argName = argName | 555 self.argName = argName |
587 | 556 |
588 def __call__(self, s :str) -> float: return self.check(s) | 557 def __call__(self, s :str) -> float: return self.check(s) |
589 | 558 |
605 Recon = "Recon" | 574 Recon = "Recon" |
606 ENGRO2 = "ENGRO2" | 575 ENGRO2 = "ENGRO2" |
607 ENGRO2_no_legend = "ENGRO2_no_legend" | 576 ENGRO2_no_legend = "ENGRO2_no_legend" |
608 HMRcore = "HMRcore" | 577 HMRcore = "HMRcore" |
609 HMRcore_no_legend = "HMRcore_no_legend" | 578 HMRcore_no_legend = "HMRcore_no_legend" |
610 Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. | 579 Custom = "Custom" |
611 | 580 |
612 def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: | 581 def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: |
613 if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") | 582 if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") |
614 | 583 |
615 def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: | 584 def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: |
633 path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") | 602 path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") |
634 self.__raiseMissingPathErr(path) | 603 self.__raiseMissingPathErr(path) |
635 return readPickle(path) | 604 return readPickle(path) |
636 | 605 |
637 def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: | 606 def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: |
607 """Open the SVG metabolic map for this model.""" | |
638 path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") | 608 path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") |
639 self.__raiseMissingPathErr(path) | 609 self.__raiseMissingPathErr(path) |
640 return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) | 610 return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) |
641 | 611 |
642 def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: | 612 def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: |
613 """Load the COBRA model for this enum variant (supports Custom with explicit path/extension).""" | |
643 if(self is Model.Custom): | 614 if(self is Model.Custom): |
644 return self.load_custom_model(customPath, customExtension) | 615 return self.load_custom_model(customPath, customExtension) |
645 else: | 616 else: |
646 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) | 617 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) |
647 | 618 |
648 def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: | 619 def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: |
620 """Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not).""" | |
649 ext = ext if ext else file_path.ext | 621 ext = ext if ext else file_path.ext |
650 try: | 622 try: |
651 if str(ext) in FileFormat.XML.value: | 623 if str(ext) in FileFormat.XML.value: |
652 return cobra.io.read_sbml_model(file_path.show()) | 624 return cobra.io.read_sbml_model(file_path.show()) |
653 | 625 |