Mercurial > repos > bimib > cobraxy
diff COBRAxy/utils/general_utils.py @ 392:f73d57641124 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 05 Sep 2025 10:53:36 +0000 |
parents | 4e53b178031a |
children |
line wrap: on
line diff
--- a/COBRAxy/utils/general_utils.py Fri Sep 05 10:50:12 2025 +0000 +++ b/COBRAxy/utils/general_utils.py Fri Sep 05 10:53:36 2025 +0000 @@ -1,701 +1,701 @@ -import math -import re -import sys -import csv -import pickle -import lxml.etree as ET - -from enum import Enum -from itertools import count -from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union - -import pandas as pd -import cobra - -import zipfile -import gzip -import bz2 -from io import StringIO - -class ValueErr(Exception): - def __init__(self, param_name, expected, actual): - super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") - -class PathErr(Exception): - def __init__(self, path, message): - super().__init__(f"Path error for '{path}': {message}") - -class FileFormat(Enum): - """ - Encodes possible file extensions to conditionally save data in a different format. - """ - DAT = ("dat",) # this is how galaxy treats all your files! - CSV = ("csv",) # this is how most editable input data is written - TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! - SVG = ("svg",) # this is how most metabolic maps are written - PNG = ("png",) # this is a common output format for images (such as metabolic maps) - PDF = ("pdf",) # this is also a common output format for images, as it's required in publications. - - # Updated to include compressed variants - XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed - JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed - MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed - YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed - - TXT = ("txt",) # this is how most output data is written - PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved - - def __init__(self, *extensions): - self.extensions = extensions - # Store original extension when set via fromExt - self._original_extension = None - - @classmethod - def fromExt(cls, ext: str) -> "FileFormat": - """ - Converts a file extension string to a FileFormat instance. - Args: - ext : The file extension as a string. - Returns: - FileFormat: The FileFormat instance corresponding to the file extension. - """ - variantName = ext.upper() - if variantName in FileFormat.__members__: - instance = FileFormat[variantName] - instance._original_extension = ext - return instance - - variantName = ext.lower() - for member in cls: - if variantName in member.value: - # Create a copy-like behavior by storing the original extension - member._original_extension = ext - return member - - raise ValueErr("ext", "a valid FileFormat file extension", ext) - - def __str__(self) -> str: - """ - (Private) converts to str representation. Good practice for usage with argparse. - Returns: - str : the string representation of the file extension. - """ - # If we have an original extension stored (for compressed files only), use it - if hasattr(self, '_original_extension') and self._original_extension: - return self._original_extension - - # For XML, JSON, MAT and YML without original extension, use the base extension - if self == FileFormat.XML: - return "xml" - elif self == FileFormat.JSON: - return "json" - elif self == FileFormat.MAT: - return "mat" - elif self == FileFormat.YML: - return "yml" - - return self.value[-1] - -class FilePath(): - """ - Represents a file path. View this as an attempt to standardize file-related operations by expecting - values of this type in any process requesting a file path. - """ - def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: - """ - (Private) Initializes an instance of FilePath. - Args: - path : the end of the path, containing the file name. - ext : the file's extension. - prefix : anything before path, if the last '/' isn't there it's added by the code. - Returns: - None : practically, a FilePath instance. - """ - self.ext = ext - self.filePath = filePath - - if prefix and prefix[-1] != '/': - prefix += '/' - self.prefix = prefix - - @classmethod - def fromStrPath(cls, path: str) -> "FilePath": - """ - Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. - It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models. - These double extensions are not supported for other file types such as .csv. - Args: - path : the string containing the path - Raises: - PathErr : if the provided string doesn't represent a valid path. - Returns: - FilePath : the constructed instance. - """ - result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path) - if not result or not result["name"] or not result["ext"]: - raise PathErr(path, "cannot recognize folder structure or extension in path") - - prefix = result["prefix"] if result["prefix"] else "" - name, ext = result["name"], result["ext"] - - # Check for double extensions (json.gz, xml.zip, etc.) - parts = path.split(".") - if len(parts) >= 3: - penultimate = parts[-2] - last = parts[-1] - double_ext = f"{penultimate}.{last}" - - # Try the double extension first - try: - ext_format = FileFormat.fromExt(double_ext) - name = ".".join(parts[:-2]) - # Extract prefix if it exists - if '/' in name: - prefix = name[:name.rfind('/') + 1] - name = name[name.rfind('/') + 1:] - return cls(name, ext_format, prefix=prefix) - except ValueErr: - # If double extension doesn't work, fall back to single extension - pass - - # Single extension fallback (original logic) - try: - ext_format = FileFormat.fromExt(ext) - return cls(name, ext_format, prefix=prefix) - except ValueErr: - raise PathErr(path, f"unsupported file extension: {ext}") - - def show(self) -> str: - """ - Shows the path as a string. - Returns: - str : the path shown as a string. - """ - return f"{self.prefix}{self.filePath}.{self.ext}" - - def __str__(self) -> str: - return self.show() - -# ERRORS -def terminate(msg :str) -> None: - """ - Terminate the execution of the script with an error message. - - Args: - msg (str): The error message to be displayed. - - Returns: - None - """ - sys.exit(f"Execution aborted: {msg}\n") - -def logWarning(msg :str, loggerPath :str) -> None: - """ - Log a warning message to an output log file and print it to the console. The final period and a - newline is added by the function. - - Args: - s (str): The warning message to be logged and printed. - loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and - immediately read back (beware relative expensive operation, log with caution). - - Returns: - None - """ - # building the path and then reading it immediately seems useless, but it's actually a way of - # validating that reduces repetition on the caller's side. Besides, logging a message by writing - # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from - # mindlessly logging whenever something comes up, log at the very end and tell the user everything - # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to - # the file only at the end of the program's execution. - with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") - -class CustomErr(Exception): - """ - Custom error class to handle exceptions in a structured way, with a unique identifier and a message. - """ - __idGenerator = count() - errName = "Custom Error" - def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None: - """ - (Private) Initializes an instance of CustomErr. - - Args: - msg (str): Error message to be displayed. - details (str): Informs the user more about the error encountered. Defaults to "". - explicitErrCode (int): Explicit error code to be used. Defaults to -1. - - Returns: - None : practically, a CustomErr instance. - """ - self.msg = msg - self.details = details - - self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) - - def throw(self, loggerPath = "") -> None: - """ - Raises the current CustomErr instance, logging a warning message before doing so. - - Raises: - self: The current CustomErr instance. - - Returns: - None - """ - if loggerPath: logWarning(str(self), loggerPath) - raise self - - def abort(self) -> None: - """ - Aborts the execution of the script. - - Returns: - None - """ - terminate(str(self)) - - def __str__(self) -> str: - """ - (Private) Returns a string representing the current CustomErr instance. - - Returns: - str: A string representing the current CustomErr instance. - """ - return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}." - -class ArgsErr(CustomErr): - """ - CustomErr subclass for UI arguments errors. - """ - errName = "Args Error" - def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None: - super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg) - -class DataErr(CustomErr): - """ - CustomErr subclass for data formatting errors. - """ - errName = "Data Format Error" - def __init__(self, fileName :str, msg = "no further details provided") -> None: - super().__init__(f"file \"{fileName}\" contains malformed data", msg) - -class PathErr(CustomErr): - """ - CustomErr subclass for filepath formatting errors. - """ - errName = "Path Error" - def __init__(self, path :FilePath, msg = "no further details provided") -> None: - super().__init__(f"path \"{path}\" is invalid", msg) - -class ValueErr(CustomErr): - """ - CustomErr subclass for any value error. - """ - errName = "Value Error" - def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None: - super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg) - -# RESULT -T = TypeVar('T') -E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened! -class Result(Generic[T, E]): - class ResultErr(CustomErr): - """ - CustomErr subclass for all Result errors. - """ - errName = "Result Error" - def __init__(self, msg = "no further details provided") -> None: - super().__init__(msg) - """ - Class to handle the result of an operation, with a value and a boolean flag to indicate - whether the operation was successful or not. - """ - def __init__(self, value :Union[T, E], isOk :bool) -> None: - """ - (Private) Initializes an instance of Result. - - Args: - value (Union[T, E]): The value to be stored in the Result instance. - isOk (bool): A boolean flag to indicate whether the operation was successful or not. - - Returns: - None : practically, a Result instance. - """ - self.isOk = isOk - self.isErr = not isOk - self.value = value - - @classmethod - def Ok(cls, value :T) -> "Result": - """ - Constructs a new Result instance with a successful operation. - - Args: - value (T): The value to be stored in the Result instance, set as successful. - - Returns: - Result: A new Result instance with a successful operation. - """ - return Result(value, isOk = True) - - @classmethod - def Err(cls, value :E) -> "Result": - """ - Constructs a new Result instance with a failed operation. - - Args: - value (E): The value to be stored in the Result instance, set as failed. - - Returns: - Result: A new Result instance with a failed operation. - """ - return Result(value, isOk = False) - - def unwrap(self) -> T: - """ - Unwraps the value of the Result instance, if the operation was successful. - - Raises: - ResultErr: If the operation was not successful. - - Returns: - T: The value of the Result instance, if the operation was successful. - """ - if self.isOk: return self.value - raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}") - - def unwrapOr(self, default :T) -> T: - """ - Unwraps the value of the Result instance, if the operation was successful, otherwise - it returns a default value. - - Args: - default (T): The default value to be returned if the operation was not successful. - - Returns: - T: The value of the Result instance, if the operation was successful, - otherwise the default value. - """ - return self.value if self.isOk else default - - def expect(self, err :"Result.ResultErr") -> T: - """ - Expects that the value of the Result instance is successful, otherwise it raises an error. - - Args: - err (Exception): The error to be raised if the operation was not successful. - - Raises: - err: The error raised if the operation was not successful. - - Returns: - T: The value of the Result instance, if the operation was successful. - """ - if self.isOk: return self.value - raise err - - U = TypeVar("U") - def map(self, mapper: Callable[[T], U]) -> "Result[U, E]": - """ - Maps the value of the current Result to whatever is returned by the mapper function. - If the Result contained an unsuccessful operation to begin with it remains unchanged - (a reference to the current instance is returned). - If the mapper function panics the returned result instance will be of the error kind. - - Args: - mapper (Callable[[T], U]): The mapper operation to be applied to the Result value. - - Returns: - Result[U, E]: The result of the mapper operation applied to the Result value. - """ - if self.isErr: return self - try: return Result.Ok(mapper(self.value)) - except Exception as e: return Result.Err(e) - - D = TypeVar("D", bound = "Result.ResultErr") - def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]": - """ - Maps the error of the current Result to whatever is returned by the mapper function. - If the Result contained a successful operation it remains unchanged - (a reference to the current instance is returned). - If the mapper function panics this method does as well. - - Args: - mapper (Callable[[E], D]): The mapper operation to be applied to the Result error. - - Returns: - Result[U, E]: The result of the mapper operation applied to the Result error. - """ - if self.isOk: return self - return Result.Err(mapper(self.value)) - - def __str__(self): - return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" - -# FILES -def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame: - """ - Reads a .csv or .tsv file and returns it as a Pandas DataFrame. - - Args: - path : the path to the dataset file. - datasetName : the name of the dataset. - - Raises: - DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if - it has less than 2 columns. - - Returns: - pandas.DataFrame: The dataset loaded as a Pandas DataFrame. - """ - # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than - # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code. - # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really - # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and - # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is - # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions. - try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python") - except: - try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python") - except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") - - if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns") - return dataset - -def readPickle(path :FilePath) -> Any: - """ - Reads the contents of a .pickle file, which needs to exist at the given path. - - Args: - path : the path to the .pickle file. - - Returns: - Any : the data inside a pickle file, could be anything. - """ - with open(path.show(), "rb") as fd: return pickle.load(fd) - -def writePickle(path :FilePath, data :Any) -> None: - """ - Saves any data in a .pickle file, created at the given path. - - Args: - path : the path to the .pickle file. - data : the data to be written to the file. - - Returns: - None - """ - with open(path.show(), "wb") as fd: pickle.dump(data, fd) - -def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]: - """ - Reads the contents of a .csv file, which needs to exist at the given path. - - Args: - path : the path to the .csv file. - delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter). - skipHeader : whether the first row of the file is a header and should be skipped. - - Returns: - List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas. - """ - with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:] - -def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree: - """ - Reads the contents of a .svg file, which needs to exist at the given path. - - Args: - path : the path to the .svg file. - - Raises: - DataErr : if the map is malformed. - - Returns: - Any : the data inside a svg file, could be anything. - """ - try: return ET.parse(path.show()) - except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err: - raise customErr if customErr else err - -def writeSvg(path :FilePath, data:ET.ElementTree) -> None: - """ - Saves svg data opened with lxml.etree in a .svg file, created at the given path. - - Args: - path : the path to the .svg file. - data : the data to be written to the file. - - Returns: - None - """ - with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) - -# UI ARGUMENTS -class Bool: - def __init__(self, argName :str) -> None: - self.argName = argName - - def __call__(self, s :str) -> bool: return self.check(s) - - def check(self, s :str) -> bool: - s = s.lower() - if s == "true" : return True - if s == "false": return False - raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") - -class Float: - def __init__(self, argName = "Dataset values, not an argument") -> None: - self.argName = argName - - def __call__(self, s :str) -> float: return self.check(s) - - def check(self, s :str) -> float: - try: return float(s) - except ValueError: - s = s.lower() - if s == "nan" or s == "none": return math.nan - raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"") - -# MODELS -OldRule = List[Union[str, "OldRule"]] -class Model(Enum): - """ - Represents a metabolic model, either custom or locally supported. Custom models don't point - to valid file paths. - """ - - Recon = "Recon" - ENGRO2 = "ENGRO2" - ENGRO2_no_legend = "ENGRO2_no_legend" - HMRcore = "HMRcore" - HMRcore_no_legend = "HMRcore_no_legend" - Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. - - def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: - if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") - - def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: - """ - Open "rules" file for this model. - - Returns: - Dict[str, Dict[str, OldRule]] : the rules for this model. - """ - path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") - self.__raiseMissingPathErr(path) - return readPickle(path) - - def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]: - """ - Open "gene translator (old: gene_in_rule)" file for this model. - - Returns: - Dict[str, Dict[str, str]] : the translator dict for this model. - """ - path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") - self.__raiseMissingPathErr(path) - return readPickle(path) - - def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: - path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") - self.__raiseMissingPathErr(path) - return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) - - def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: - if(self is Model.Custom): - return self.load_custom_model(customPath, customExtension) - else: - return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) - - def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: - ext = ext if ext else file_path.ext - try: - if str(ext) in FileFormat.XML.value: - return cobra.io.read_sbml_model(file_path.show()) - - if str(ext) in FileFormat.JSON.value: - # Compressed files are not automatically handled by cobra - if(ext == "json"): - return cobra.io.load_json_model(file_path.show()) - else: - return self.extract_model(file_path, ext, "json") - - if str(ext) in FileFormat.MAT.value: - # Compressed files are not automatically handled by cobra - if(ext == "mat"): - return cobra.io.load_matlab_model(file_path.show()) - else: - return self.extract_model(file_path, ext, "mat") - - if str(ext) in FileFormat.YML.value: - # Compressed files are not automatically handled by cobra - if(ext == "yml"): - return cobra.io.load_yaml_model(file_path.show()) - else: - return self.extract_model(file_path, ext, "yml") - - except Exception as e: raise DataErr(file_path, e.__str__()) - raise DataErr(file_path, - f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.") - - - def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model: - """ - Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2). - - Args: - file_path: File path of the model - ext: File extensions of class FileFormat (should be .zip, .gz or .bz2) - - Returns: - cobra.Model: COBRApy model - - Raises: - Exception: Extraction errors - """ - ext_str = str(ext) - - try: - if '.zip' in ext_str: - with zipfile.ZipFile(file_path.show(), 'r') as zip_ref: - with zip_ref.open(zip_ref.namelist()[0]) as json_file: - content = json_file.read().decode('utf-8') - if model_encoding == "json": - return cobra.io.load_json_model(StringIO(content)) - elif model_encoding == "mat": - return cobra.io.load_matlab_model(StringIO(content)) - elif model_encoding == "yml": - return cobra.io.load_yaml_model(StringIO(content)) - else: - raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml") - elif '.gz' in ext_str: - with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref: - if model_encoding == "json": - return cobra.io.load_json_model(gz_ref) - elif model_encoding == "mat": - return cobra.io.load_matlab_model(gz_ref) - elif model_encoding == "yml": - return cobra.io.load_yaml_model(gz_ref) - else: - raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml") - elif '.bz2' in ext_str: - with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref: - if model_encoding == "json": - return cobra.io.load_json_model(bz2_ref) - elif model_encoding == "mat": - return cobra.io.load_matlab_model(bz2_ref) - elif model_encoding == "yml": - return cobra.io.load_yaml_model(bz2_ref) - else: - raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml") - else: - raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2") - - except Exception as e: - raise Exception(f"Error during model extraction: {str(e)}") - - - +import math +import re +import sys +import csv +import pickle +import lxml.etree as ET + +from enum import Enum +from itertools import count +from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union + +import pandas as pd +import cobra + +import zipfile +import gzip +import bz2 +from io import StringIO + +class ValueErr(Exception): + def __init__(self, param_name, expected, actual): + super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") + +class PathErr(Exception): + def __init__(self, path, message): + super().__init__(f"Path error for '{path}': {message}") + +class FileFormat(Enum): + """ + Encodes possible file extensions to conditionally save data in a different format. + """ + DAT = ("dat",) # this is how galaxy treats all your files! + CSV = ("csv",) # this is how most editable input data is written + TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! + SVG = ("svg",) # this is how most metabolic maps are written + PNG = ("png",) # this is a common output format for images (such as metabolic maps) + PDF = ("pdf",) # this is also a common output format for images, as it's required in publications. + + # Updated to include compressed variants + XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed + JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed + MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed + YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed + + TXT = ("txt",) # this is how most output data is written + PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved + + def __init__(self, *extensions): + self.extensions = extensions + # Store original extension when set via fromExt + self._original_extension = None + + @classmethod + def fromExt(cls, ext: str) -> "FileFormat": + """ + Converts a file extension string to a FileFormat instance. + Args: + ext : The file extension as a string. + Returns: + FileFormat: The FileFormat instance corresponding to the file extension. + """ + variantName = ext.upper() + if variantName in FileFormat.__members__: + instance = FileFormat[variantName] + instance._original_extension = ext + return instance + + variantName = ext.lower() + for member in cls: + if variantName in member.value: + # Create a copy-like behavior by storing the original extension + member._original_extension = ext + return member + + raise ValueErr("ext", "a valid FileFormat file extension", ext) + + def __str__(self) -> str: + """ + (Private) converts to str representation. Good practice for usage with argparse. + Returns: + str : the string representation of the file extension. + """ + # If we have an original extension stored (for compressed files only), use it + if hasattr(self, '_original_extension') and self._original_extension: + return self._original_extension + + # For XML, JSON, MAT and YML without original extension, use the base extension + if self == FileFormat.XML: + return "xml" + elif self == FileFormat.JSON: + return "json" + elif self == FileFormat.MAT: + return "mat" + elif self == FileFormat.YML: + return "yml" + + return self.value[-1] + +class FilePath(): + """ + Represents a file path. View this as an attempt to standardize file-related operations by expecting + values of this type in any process requesting a file path. + """ + def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: + """ + (Private) Initializes an instance of FilePath. + Args: + path : the end of the path, containing the file name. + ext : the file's extension. + prefix : anything before path, if the last '/' isn't there it's added by the code. + Returns: + None : practically, a FilePath instance. + """ + self.ext = ext + self.filePath = filePath + + if prefix and prefix[-1] != '/': + prefix += '/' + self.prefix = prefix + + @classmethod + def fromStrPath(cls, path: str) -> "FilePath": + """ + Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. + It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models. + These double extensions are not supported for other file types such as .csv. + Args: + path : the string containing the path + Raises: + PathErr : if the provided string doesn't represent a valid path. + Returns: + FilePath : the constructed instance. + """ + result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path) + if not result or not result["name"] or not result["ext"]: + raise PathErr(path, "cannot recognize folder structure or extension in path") + + prefix = result["prefix"] if result["prefix"] else "" + name, ext = result["name"], result["ext"] + + # Check for double extensions (json.gz, xml.zip, etc.) + parts = path.split(".") + if len(parts) >= 3: + penultimate = parts[-2] + last = parts[-1] + double_ext = f"{penultimate}.{last}" + + # Try the double extension first + try: + ext_format = FileFormat.fromExt(double_ext) + name = ".".join(parts[:-2]) + # Extract prefix if it exists + if '/' in name: + prefix = name[:name.rfind('/') + 1] + name = name[name.rfind('/') + 1:] + return cls(name, ext_format, prefix=prefix) + except ValueErr: + # If double extension doesn't work, fall back to single extension + pass + + # Single extension fallback (original logic) + try: + ext_format = FileFormat.fromExt(ext) + return cls(name, ext_format, prefix=prefix) + except ValueErr: + raise PathErr(path, f"unsupported file extension: {ext}") + + def show(self) -> str: + """ + Shows the path as a string. + Returns: + str : the path shown as a string. + """ + return f"{self.prefix}{self.filePath}.{self.ext}" + + def __str__(self) -> str: + return self.show() + +# ERRORS +def terminate(msg :str) -> None: + """ + Terminate the execution of the script with an error message. + + Args: + msg (str): The error message to be displayed. + + Returns: + None + """ + sys.exit(f"Execution aborted: {msg}\n") + +def logWarning(msg :str, loggerPath :str) -> None: + """ + Log a warning message to an output log file and print it to the console. The final period and a + newline is added by the function. + + Args: + s (str): The warning message to be logged and printed. + loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and + immediately read back (beware relative expensive operation, log with caution). + + Returns: + None + """ + # building the path and then reading it immediately seems useless, but it's actually a way of + # validating that reduces repetition on the caller's side. Besides, logging a message by writing + # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from + # mindlessly logging whenever something comes up, log at the very end and tell the user everything + # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to + # the file only at the end of the program's execution. + with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") + +class CustomErr(Exception): + """ + Custom error class to handle exceptions in a structured way, with a unique identifier and a message. + """ + __idGenerator = count() + errName = "Custom Error" + def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None: + """ + (Private) Initializes an instance of CustomErr. + + Args: + msg (str): Error message to be displayed. + details (str): Informs the user more about the error encountered. Defaults to "". + explicitErrCode (int): Explicit error code to be used. Defaults to -1. + + Returns: + None : practically, a CustomErr instance. + """ + self.msg = msg + self.details = details + + self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) + + def throw(self, loggerPath = "") -> None: + """ + Raises the current CustomErr instance, logging a warning message before doing so. + + Raises: + self: The current CustomErr instance. + + Returns: + None + """ + if loggerPath: logWarning(str(self), loggerPath) + raise self + + def abort(self) -> None: + """ + Aborts the execution of the script. + + Returns: + None + """ + terminate(str(self)) + + def __str__(self) -> str: + """ + (Private) Returns a string representing the current CustomErr instance. + + Returns: + str: A string representing the current CustomErr instance. + """ + return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}." + +class ArgsErr(CustomErr): + """ + CustomErr subclass for UI arguments errors. + """ + errName = "Args Error" + def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None: + super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg) + +class DataErr(CustomErr): + """ + CustomErr subclass for data formatting errors. + """ + errName = "Data Format Error" + def __init__(self, fileName :str, msg = "no further details provided") -> None: + super().__init__(f"file \"{fileName}\" contains malformed data", msg) + +class PathErr(CustomErr): + """ + CustomErr subclass for filepath formatting errors. + """ + errName = "Path Error" + def __init__(self, path :FilePath, msg = "no further details provided") -> None: + super().__init__(f"path \"{path}\" is invalid", msg) + +class ValueErr(CustomErr): + """ + CustomErr subclass for any value error. + """ + errName = "Value Error" + def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None: + super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg) + +# RESULT +T = TypeVar('T') +E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened! +class Result(Generic[T, E]): + class ResultErr(CustomErr): + """ + CustomErr subclass for all Result errors. + """ + errName = "Result Error" + def __init__(self, msg = "no further details provided") -> None: + super().__init__(msg) + """ + Class to handle the result of an operation, with a value and a boolean flag to indicate + whether the operation was successful or not. + """ + def __init__(self, value :Union[T, E], isOk :bool) -> None: + """ + (Private) Initializes an instance of Result. + + Args: + value (Union[T, E]): The value to be stored in the Result instance. + isOk (bool): A boolean flag to indicate whether the operation was successful or not. + + Returns: + None : practically, a Result instance. + """ + self.isOk = isOk + self.isErr = not isOk + self.value = value + + @classmethod + def Ok(cls, value :T) -> "Result": + """ + Constructs a new Result instance with a successful operation. + + Args: + value (T): The value to be stored in the Result instance, set as successful. + + Returns: + Result: A new Result instance with a successful operation. + """ + return Result(value, isOk = True) + + @classmethod + def Err(cls, value :E) -> "Result": + """ + Constructs a new Result instance with a failed operation. + + Args: + value (E): The value to be stored in the Result instance, set as failed. + + Returns: + Result: A new Result instance with a failed operation. + """ + return Result(value, isOk = False) + + def unwrap(self) -> T: + """ + Unwraps the value of the Result instance, if the operation was successful. + + Raises: + ResultErr: If the operation was not successful. + + Returns: + T: The value of the Result instance, if the operation was successful. + """ + if self.isOk: return self.value + raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}") + + def unwrapOr(self, default :T) -> T: + """ + Unwraps the value of the Result instance, if the operation was successful, otherwise + it returns a default value. + + Args: + default (T): The default value to be returned if the operation was not successful. + + Returns: + T: The value of the Result instance, if the operation was successful, + otherwise the default value. + """ + return self.value if self.isOk else default + + def expect(self, err :"Result.ResultErr") -> T: + """ + Expects that the value of the Result instance is successful, otherwise it raises an error. + + Args: + err (Exception): The error to be raised if the operation was not successful. + + Raises: + err: The error raised if the operation was not successful. + + Returns: + T: The value of the Result instance, if the operation was successful. + """ + if self.isOk: return self.value + raise err + + U = TypeVar("U") + def map(self, mapper: Callable[[T], U]) -> "Result[U, E]": + """ + Maps the value of the current Result to whatever is returned by the mapper function. + If the Result contained an unsuccessful operation to begin with it remains unchanged + (a reference to the current instance is returned). + If the mapper function panics the returned result instance will be of the error kind. + + Args: + mapper (Callable[[T], U]): The mapper operation to be applied to the Result value. + + Returns: + Result[U, E]: The result of the mapper operation applied to the Result value. + """ + if self.isErr: return self + try: return Result.Ok(mapper(self.value)) + except Exception as e: return Result.Err(e) + + D = TypeVar("D", bound = "Result.ResultErr") + def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]": + """ + Maps the error of the current Result to whatever is returned by the mapper function. + If the Result contained a successful operation it remains unchanged + (a reference to the current instance is returned). + If the mapper function panics this method does as well. + + Args: + mapper (Callable[[E], D]): The mapper operation to be applied to the Result error. + + Returns: + Result[U, E]: The result of the mapper operation applied to the Result error. + """ + if self.isOk: return self + return Result.Err(mapper(self.value)) + + def __str__(self): + return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" + +# FILES +def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame: + """ + Reads a .csv or .tsv file and returns it as a Pandas DataFrame. + + Args: + path : the path to the dataset file. + datasetName : the name of the dataset. + + Raises: + DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if + it has less than 2 columns. + + Returns: + pandas.DataFrame: The dataset loaded as a Pandas DataFrame. + """ + # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than + # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code. + # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really + # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and + # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is + # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions. + try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python") + except: + try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python") + except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") + + if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns") + return dataset + +def readPickle(path :FilePath) -> Any: + """ + Reads the contents of a .pickle file, which needs to exist at the given path. + + Args: + path : the path to the .pickle file. + + Returns: + Any : the data inside a pickle file, could be anything. + """ + with open(path.show(), "rb") as fd: return pickle.load(fd) + +def writePickle(path :FilePath, data :Any) -> None: + """ + Saves any data in a .pickle file, created at the given path. + + Args: + path : the path to the .pickle file. + data : the data to be written to the file. + + Returns: + None + """ + with open(path.show(), "wb") as fd: pickle.dump(data, fd) + +def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]: + """ + Reads the contents of a .csv file, which needs to exist at the given path. + + Args: + path : the path to the .csv file. + delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter). + skipHeader : whether the first row of the file is a header and should be skipped. + + Returns: + List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas. + """ + with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:] + +def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree: + """ + Reads the contents of a .svg file, which needs to exist at the given path. + + Args: + path : the path to the .svg file. + + Raises: + DataErr : if the map is malformed. + + Returns: + Any : the data inside a svg file, could be anything. + """ + try: return ET.parse(path.show()) + except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err: + raise customErr if customErr else err + +def writeSvg(path :FilePath, data:ET.ElementTree) -> None: + """ + Saves svg data opened with lxml.etree in a .svg file, created at the given path. + + Args: + path : the path to the .svg file. + data : the data to be written to the file. + + Returns: + None + """ + with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) + +# UI ARGUMENTS +class Bool: + def __init__(self, argName :str) -> None: + self.argName = argName + + def __call__(self, s :str) -> bool: return self.check(s) + + def check(self, s :str) -> bool: + s = s.lower() + if s == "true" : return True + if s == "false": return False + raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") + +class Float: + def __init__(self, argName = "Dataset values, not an argument") -> None: + self.argName = argName + + def __call__(self, s :str) -> float: return self.check(s) + + def check(self, s :str) -> float: + try: return float(s) + except ValueError: + s = s.lower() + if s == "nan" or s == "none": return math.nan + raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"") + +# MODELS +OldRule = List[Union[str, "OldRule"]] +class Model(Enum): + """ + Represents a metabolic model, either custom or locally supported. Custom models don't point + to valid file paths. + """ + + Recon = "Recon" + ENGRO2 = "ENGRO2" + ENGRO2_no_legend = "ENGRO2_no_legend" + HMRcore = "HMRcore" + HMRcore_no_legend = "HMRcore_no_legend" + Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. + + def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: + if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") + + def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: + """ + Open "rules" file for this model. + + Returns: + Dict[str, Dict[str, OldRule]] : the rules for this model. + """ + path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") + self.__raiseMissingPathErr(path) + return readPickle(path) + + def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]: + """ + Open "gene translator (old: gene_in_rule)" file for this model. + + Returns: + Dict[str, Dict[str, str]] : the translator dict for this model. + """ + path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") + self.__raiseMissingPathErr(path) + return readPickle(path) + + def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: + path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") + self.__raiseMissingPathErr(path) + return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) + + def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: + if(self is Model.Custom): + return self.load_custom_model(customPath, customExtension) + else: + return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) + + def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: + ext = ext if ext else file_path.ext + try: + if str(ext) in FileFormat.XML.value: + return cobra.io.read_sbml_model(file_path.show()) + + if str(ext) in FileFormat.JSON.value: + # Compressed files are not automatically handled by cobra + if(ext == "json"): + return cobra.io.load_json_model(file_path.show()) + else: + return self.extract_model(file_path, ext, "json") + + if str(ext) in FileFormat.MAT.value: + # Compressed files are not automatically handled by cobra + if(ext == "mat"): + return cobra.io.load_matlab_model(file_path.show()) + else: + return self.extract_model(file_path, ext, "mat") + + if str(ext) in FileFormat.YML.value: + # Compressed files are not automatically handled by cobra + if(ext == "yml"): + return cobra.io.load_yaml_model(file_path.show()) + else: + return self.extract_model(file_path, ext, "yml") + + except Exception as e: raise DataErr(file_path, e.__str__()) + raise DataErr(file_path, + f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.") + + + def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model: + """ + Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2). + + Args: + file_path: File path of the model + ext: File extensions of class FileFormat (should be .zip, .gz or .bz2) + + Returns: + cobra.Model: COBRApy model + + Raises: + Exception: Extraction errors + """ + ext_str = str(ext) + + try: + if '.zip' in ext_str: + with zipfile.ZipFile(file_path.show(), 'r') as zip_ref: + with zip_ref.open(zip_ref.namelist()[0]) as json_file: + content = json_file.read().decode('utf-8') + if model_encoding == "json": + return cobra.io.load_json_model(StringIO(content)) + elif model_encoding == "mat": + return cobra.io.load_matlab_model(StringIO(content)) + elif model_encoding == "yml": + return cobra.io.load_yaml_model(StringIO(content)) + else: + raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml") + elif '.gz' in ext_str: + with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref: + if model_encoding == "json": + return cobra.io.load_json_model(gz_ref) + elif model_encoding == "mat": + return cobra.io.load_matlab_model(gz_ref) + elif model_encoding == "yml": + return cobra.io.load_yaml_model(gz_ref) + else: + raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml") + elif '.bz2' in ext_str: + with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref: + if model_encoding == "json": + return cobra.io.load_json_model(bz2_ref) + elif model_encoding == "mat": + return cobra.io.load_matlab_model(bz2_ref) + elif model_encoding == "yml": + return cobra.io.load_yaml_model(bz2_ref) + else: + raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml") + else: + raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2") + + except Exception as e: + raise Exception(f"Error during model extraction: {str(e)}") + + + def __str__(self) -> str: return self.value \ No newline at end of file