diff COBRAxy/utils/general_utils.py @ 456:a6e45049c1b9 draft default tip
description: Uploaded
author: francesco_lapi
date: Fri, 12 Sep 2025 17:28:45 +0000
parents: 4a385fdb9e58
children: (none)
--- a/COBRAxy/utils/general_utils.py	Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/general_utils.py	Fri Sep 12 17:28:45 2025 +0000
@@ -1,3 +1,13 @@
+"""
+General utilities for COBRAxy.
+
+This module provides:
+- File and path helpers (FileFormat, FilePath)
+- Error and result handling utilities (CustomErr, Result)
+- Basic I/O helpers (CSV/TSV, pickle, SVG)
+- Lightweight CLI argument parsers (Bool, Float)
+- Model loader utilities for COBRA models, including compressed formats
+"""
 import math
 import re
 import sys
@@ -7,11 +17,10 @@
 from enum import Enum
 from itertools import count
 
-from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple
+from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
 
 import pandas as pd
 import cobra
-from cobra import Model as cobraModel, Reaction, Metabolite
 
 import zipfile
 import gzip
@@ -19,7 +28,7 @@
 
 from io import StringIO
 
-
+from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple
 class ValueErr(Exception):
     def __init__(self, param_name, expected, actual):
         super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
@@ -32,21 +41,21 @@
     """
     Encodes possible file extensions to conditionally save data in a different format.
     """
-    DAT = ("dat",) # this is how galaxy treats all your files!
-    CSV = ("csv",) # this is how most editable input data is written
-    TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
-    SVG = ("svg",) # this is how most metabolic maps are written
-    PNG = ("png",) # this is a common output format for images (such as metabolic maps)
-    PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.
-
-    # Updated to include compressed variants
-    XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
-    JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
-    MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
-    YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
+    DAT = ("dat",)
+    CSV = ("csv",)
+    TSV = ("tsv",)
+    SVG = ("svg",)
+    PNG = ("png",)
+    PDF = ("pdf",)
 
-    TXT = ("txt",) # this is how most output data is written
-    PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
+    # Compressed variants for common model formats
+    XML = ("xml", "xml.gz", "xml.zip", "xml.bz2")
+    JSON = ("json", "json.gz", "json.zip", "json.bz2")
+    MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2")
+    YML = ("yml", "yml.gz", "yml.zip", "yml.bz2")
+
+    TXT = ("txt",)
+    PICKLE = ("pickle", "pk", "p")
 
     def __init__(self, *extensions):
         self.extensions = extensions
@@ -83,11 +92,9 @@
         Returns:
            str : the string representation of the file extension.
        """
-        # If we have an original extension stored (for compressed files only), use it
        if hasattr(self, '_original_extension') and self._original_extension:
            return self._original_extension
 
-        # For XML, JSON, MAT and YML without original extension, use the base extension
        if self == FileFormat.XML:
            return "xml"
        elif self == FileFormat.JSON:
@@ -101,18 +108,15 @@
 class FilePath():
     """
-    Represents a file path. View this as an attempt to standardize file-related operations by expecting
-    values of this type in any process requesting a file path.
+    Represents a file path with format-aware helpers.
     """
     def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
         """
-        (Private) Initializes an instance of FilePath.
+        Initialize FilePath.
 
         Args:
-            path : the end of the path, containing the file name.
-            ext : the file's extension.
-            prefix : anything before path, if the last '/' isn't there it's added by the code.
-        Returns:
-            None : practically, a FilePath instance.
+            path: File name stem.
+            ext: File extension (FileFormat).
+            prefix: Optional directory path (trailing '/' auto-added).
         """
         self.ext = ext
         self.filePath = filePath
@@ -124,9 +128,7 @@
     @classmethod
     def fromStrPath(cls, path: str) -> "FilePath":
         """
-        Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
-        It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
-        These double extensions are not supported for other file types such as .csv.
+        Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz).
         Args:
            path : the string containing the path
        Raises:
@@ -141,27 +143,22 @@
         prefix = result["prefix"] if result["prefix"] else ""
         name, ext = result["name"], result["ext"]
 
-        # Check for double extensions (json.gz, xml.zip, etc.)
         parts = path.split(".")
         if len(parts) >= 3:
             penultimate = parts[-2]
             last = parts[-1]
             double_ext = f"{penultimate}.{last}"
 
-            # Try the double extension first
             try:
                 ext_format = FileFormat.fromExt(double_ext)
                 name = ".".join(parts[:-2])
-                # Extract prefix if it exists
                 if '/' in name:
                     prefix = name[:name.rfind('/') + 1]
                     name = name[name.rfind('/') + 1:]
                 return cls(name, ext_format, prefix=prefix)
             except ValueErr:
-                # If double extension doesn't work, fall back to single extension
                 pass
 
-        # Single extension fallback (original logic)
         try:
             ext_format = FileFormat.fromExt(ext)
             return cls(name, ext_format, prefix=prefix)
@@ -198,19 +195,14 @@
     newline is added by the function.
 
     Args:
-        s (str): The warning message to be logged and printed.
+        msg (str): The warning message to be logged and printed.
         loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
         immediately read back (beware relative expensive operation, log with caution).
 
    Returns:
        None
    """
-    # building the path and then reading it immediately seems useless, but it's actually a way of
-    # validating that reduces repetition on the caller's side. Besides, logging a message by writing
-    # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
-    # mindlessly logging whenever something comes up, log at the very end and tell the user everything
-    # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
-    # the file only at the end of the program's execution.
+    # Note: validates path via FilePath; keep logging minimal to avoid overhead.
    with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log:
        log.write(f"{msg}.\n")
 class CustomErr(Exception):
@@ -238,15 +230,19 @@
 
     def throw(self, loggerPath = "") -> None:
         """
-        Raises the current CustomErr instance, logging a warning message before doing so.
+        Raises the current CustomErr instance, optionally logging it first.
+
+        Args:
+            loggerPath (str): Optional path to a log file to append this error before raising.
 
         Raises:
             self: The current CustomErr instance.
-
+
         Returns:
             None
         """
-        if loggerPath: logWarning(str(self), loggerPath)
+        if loggerPath:
+            logWarning(str(self), loggerPath)
         raise self
 
     def abort(self) -> None:
@@ -316,7 +312,7 @@
     """
     def __init__(self, value :Union[T, E], isOk :bool) -> None:
         """
-        (Private) Initializes an instance of Result.
+        Initialize an instance of Result.
 
         Args:
             value (Union[T, E]): The value to be stored in the Result instance.
@@ -332,7 +328,7 @@
     @classmethod
     def Ok(cls, value :T) -> "Result":
         """
-        Constructs a new Result instance with a successful operation.
+        Construct a successful Result.
 
         Args:
             value (T): The value to be stored in the Result instance, set as successful.
@@ -345,7 +341,7 @@
     @classmethod
     def Err(cls, value :E) -> "Result":
         """
-        Constructs a new Result instance with a failed operation.
+        Construct a failed Result.
 
         Args:
             value (E): The value to be stored in the Result instance, set as failed.
@@ -437,35 +433,6 @@
         return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
 
 # FILES
-def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
-    """
-    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
-
-    Args:
-        path : the path to the dataset file.
-        datasetName : the name of the dataset.
-
-    Raises:
-        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
-        it has less than 2 columns.
-
-    Returns:
-        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
-    """
-    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
-    # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
-    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
-    # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
-    # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
-    # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
-    try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
-    except:
-        try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
-        except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
-
-    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
-    return dataset
-
 def readPickle(path :FilePath) -> Any:
     """
     Reads the contents of a .pickle file, which needs to exist at the given path.
@@ -570,6 +537,7 @@
 
 # UI ARGUMENTS
 class Bool:
+    """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive)."""
     def __init__(self, argName :str) -> None:
         self.argName = argName
 
@@ -582,6 +550,7 @@
         raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
 
 class Float:
+    """Float CLI argument parser supporting NaN and None keywords (case-insensitive)."""
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        self.argName = argName
 
@@ -607,7 +576,7 @@
     ENGRO2_no_legend = "ENGRO2_no_legend"
     HMRcore = "HMRcore"
     HMRcore_no_legend = "HMRcore_no_legend"
-    Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
+    Custom = "Custom"
 
     def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
         if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
@@ -635,17 +604,20 @@
         return readPickle(path)
 
     def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
+        """Open the SVG metabolic map for this model."""
         path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
         self.__raiseMissingPathErr(path)
         return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
 
     def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
+        """Load the COBRA model for this enum variant (supports Custom with explicit path/extension)."""
         if(self is Model.Custom):
             return self.load_custom_model(customPath, customExtension)
         else:
             return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
 
     def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
+        """Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not)."""
         ext = ext if ext else file_path.ext
         try:
             if str(ext) in FileFormat.XML.value: