Mercurial > repos > bimib > cobraxy
comparison COBRAxy/src/utils/general_utils.py @ 539:2fb97466e404 draft
Uploaded
| author | francesco_lapi | 
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| 538:fd53d42348bd | 539:2fb97466e404 | 
|---|---|
| 1 """ | |
| 2 General utilities for COBRAxy. | |
| 3 | |
| 4 This module provides: | |
| 5 - File and path helpers (FileFormat, FilePath) | |
| 6 - Error and result handling utilities (CustomErr, Result) | |
| 7 - Basic I/O helpers (CSV/TSV, pickle, SVG) | |
| 8 - Lightweight CLI argument parsers (Bool, Float) | |
| 9 - Model loader utilities for COBRA models, including compressed formats | |
| 10 """ | |
| 11 import math | |
| 12 import re | |
| 13 import sys | |
| 14 import csv | |
| 15 import pickle | |
| 16 import lxml.etree as ET | |
| 17 | |
| 18 from enum import Enum | |
| 19 from itertools import count | |
| 20 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple | |
| 21 | |
| 22 import pandas as pd | |
| 23 import cobra | |
| 24 | |
| 25 import zipfile | |
| 26 import gzip | |
| 27 import bz2 | |
| 28 from io import StringIO | |
| 29 | |
| 30 | |
| 31 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple | |
class ValueErr(Exception):
    """
    Exception reporting that a parameter received an unexpected value.

    NOTE: a second class with the same name is defined later in this module and
    rebinds ``ValueErr`` once the module has finished loading.
    """
    def __init__(self, param_name, expected, actual):
        """
        Args:
            param_name: Name of the offending parameter.
            expected: Description of what was expected.
            actual: The value that was actually received.
        """
        message = f"Invalid value for {param_name}: expected {expected}, got {actual}"
        super().__init__(message)
| 35 | |
class PathErr(Exception):
    """
    Exception reporting a problem with a file path.

    NOTE: a second class with the same name is defined later in this module and
    rebinds ``PathErr`` once the module has finished loading.
    """
    def __init__(self, path, message):
        """
        Args:
            path: The path the error refers to.
            message: Explanation of what is wrong with the path.
        """
        text = f"Path error for '{path}': {message}"
        super().__init__(text)
| 39 | |
class FileFormat(Enum):
    """
    Encodes possible file extensions to conditionally save data in a different format.

    Each member's value is the tuple of extension spellings it accepts.
    """
    DAT = ("dat",)
    CSV = ("csv",)
    TSV = ("tsv",)
    SVG = ("svg",)
    PNG = ("png",)
    PDF = ("pdf",)

    # Compressed variants for common model formats
    XML = ("xml", "xml.gz", "xml.zip", "xml.bz2")
    JSON = ("json", "json.gz", "json.zip", "json.bz2")
    MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2")
    YML = ("yml", "yml.gz", "yml.zip", "yml.bz2")

    TXT = ("txt",)
    PICKLE = ("pickle", "pk", "p")

    def __init__(self, *extensions):
        # The member value tuple is unpacked into this initializer by Enum.
        self.extensions = extensions
        # Filled in by fromExt with the spelling the caller supplied.
        self._original_extension = None

    @classmethod
    def fromExt(cls, ext: str) -> "FileFormat":
        """
        Converts a file extension string to a FileFormat instance.

        The supplied spelling is recorded on the returned member and is what
        ``str()`` yields afterwards. Members are shared objects, so the most
        recent call for a given member wins everywhere that member is used.

        Args:
            ext : The file extension as a string.

        Raises:
            ValueErr : if no member matches the extension.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        # First attempt: the upper-cased extension is itself a member name.
        upper = ext.upper()
        if upper in FileFormat.__members__:
            matched = FileFormat[upper]
            matched._original_extension = ext
            return matched

        # Second attempt: the lower-cased extension is listed in a member's value tuple.
        lowered = ext.lower()
        for candidate in cls:
            if lowered not in candidate.value:
                continue
            candidate._original_extension = ext
            return candidate

        raise ValueErr("ext", "a valid FileFormat file extension", ext)

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the string representation of the file extension.
        """
        recorded = getattr(self, '_original_extension', None)
        if recorded:
            return recorded

        # Model formats default to their plain (uncompressed) extension, listed first.
        if self in (FileFormat.XML, FileFormat.JSON, FileFormat.MAT, FileFormat.YML):
            return self.value[0]

        # Every other format uses the last spelling in its tuple.
        return self.value[-1]
| 108 | |
class FilePath():
    """
    Represents a file path with format-aware helpers.
    """
    def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
        """
        Initialize FilePath.

        Args:
            filePath: File name stem (no directory, no extension).
            ext: File extension (FileFormat).
            prefix: Optional directory path (trailing '/' auto-added).
        """
        if prefix and prefix[-1] != '/':
            prefix += '/'

        self.prefix = prefix
        self.filePath = filePath
        self.ext = ext

    @classmethod
    def fromStrPath(cls, path: str) -> "FilePath":
        """
        Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz).

        Args:
            path : the string containing the path

        Raises:
            PathErr : if the provided string doesn't represent a valid path.

        Returns:
            FilePath : the constructed instance.
        """
        match = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
        if not (match and match["name"] and match["ext"]):
            raise PathErr(path, "cannot recognize folder structure or extension in path")

        prefix = match["prefix"] or ""
        name = match["name"]
        ext = match["ext"]

        # Try the last two dot-separated pieces as a compound extension first.
        pieces = path.split(".")
        if len(pieces) >= 3:
            double_ext = ".".join(pieces[-2:])
            try:
                ext_format = FileFormat.fromExt(double_ext)
            except ValueErr:
                # Not a known compound extension: fall back to the single one below.
                pass
            else:
                stem = ".".join(pieces[:-2])
                slash = stem.rfind('/')
                if slash != -1:
                    prefix = stem[:slash + 1]
                    stem = stem[slash + 1:]
                return cls(stem, ext_format, prefix=prefix)

        try:
            ext_format = FileFormat.fromExt(ext)
        except ValueErr:
            raise PathErr(path, f"unsupported file extension: {ext}")
        else:
            return cls(name, ext_format, prefix=prefix)

    def show(self) -> str:
        """
        Shows the path as a string.

        Returns:
            str : the path shown as a string.
        """
        return "".join((f"{self.prefix}", f"{self.filePath}", ".", f"{self.ext}"))

    def __str__(self) -> str:
        """Same text as show()."""
        return self.show()
| 178 | |
| 179 # ERRORS | |
def terminate(msg :str) -> None:
    """
    Stop the script by raising SystemExit with an error message.

    Args:
        msg (str): The error message to be displayed.

    Returns:
        None
    """
    exit_text = f"Execution aborted: {msg}\n"
    sys.exit(exit_text)
| 191 | |
def logWarning(msg :str, loggerPath :str) -> None:
    """
    Append a warning message to an output log file. The final period and a
    newline are added by the function.

    Args:
        msg (str): The warning message to be logged.
        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
        immediately rendered back to a string (a relatively expensive operation, log with caution).

    Returns:
        None
    """
    # Round-tripping through FilePath validates the path before the file is opened.
    target = FilePath.fromStrPath(loggerPath).show()
    with open(target, 'a') as log:
        log.write(f"{msg}.\n")
| 207 | |
class CustomErr(Exception):
    """
    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.

    Subclasses override ``errName`` to change the label shown by ``__str__``.
    """
    __idGenerator = count()
    errName = "Custom Error"

    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
        """
        (Private) Initializes an instance of CustomErr.

        Args:
            msg (str): Error message to be displayed.
            details (str): Informs the user more about the error encountered. Defaults to "".
            explicitErrCode (int): Explicit error code to be used. Defaults to -1.

        Returns:
            None : practically, a CustomErr instance.
        """
        self.msg = msg
        self.details = details

        # The shared counter advances on every construction; the larger of the
        # explicit code and the counter value becomes the id.
        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))

    def throw(self, loggerPath = "") -> None:
        """
        Raises the current CustomErr instance, optionally logging it first.

        Args:
            loggerPath (str): Optional path to a log file to append this error before raising.

        Raises:
            self: The current CustomErr instance.

        Returns:
            None
        """
        if loggerPath:
            logWarning(str(self), loggerPath)
        raise self

    def abort(self) -> None:
        """
        Aborts the execution of the script.

        Returns:
            None
        """
        terminate(str(self))

    def __str__(self) -> str:
        """
        (Private) Returns a string representing the current CustomErr instance.

        Returns:
            str: A string representing the current CustomErr instance.
        """
        # Read errName through the instance so that a subclass override is the label shown.
        return f"{self.errName} #{self.id}: {self.msg}, {self.details}."
| 265 | |
class ArgsErr(CustomErr):
    """
    CustomErr subclass for UI arguments errors.
    """
    errName = "Args Error"

    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        Args:
            argName: Name of the offending argument.
            expected: Description of the accepted value(s).
            actual: The value that was received.
            msg: Extra details, forwarded as the error's details text.
        """
        summary = f"argument \"{argName}\" expected {expected} but got {actual}"
        super().__init__(summary, msg)
| 273 | |
class DataErr(CustomErr):
    """
    CustomErr subclass for data formatting errors.
    """
    errName = "Data Format Error"

    def __init__(self, fileName :str, msg = "no further details provided") -> None:
        """
        Args:
            fileName: Name (or path) of the file holding the malformed data.
            msg: Extra details, forwarded as the error's details text.
        """
        summary = f"file \"{fileName}\" contains malformed data"
        super().__init__(summary, msg)
| 281 | |
class PathErr(CustomErr):
    """
    CustomErr subclass for filepath formatting errors.
    """
    errName = "Path Error"

    def __init__(self, path :FilePath, msg = "no further details provided") -> None:
        """
        Args:
            path: The invalid path; it is interpolated into the message as text.
            msg: Extra details, forwarded as the error's details text.
        """
        summary = f"path \"{path}\" is invalid"
        super().__init__(summary, msg)
| 289 | |
class ValueErr(CustomErr):
    """
    CustomErr subclass for any value error.
    """
    errName = "Value Error"

    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
        """
        Args:
            valueName: Name of the offending value; omitted from the message when empty.
            expected: Description of the accepted value(s).
            actual: The value that was received.
            msg: Extra details, forwarded as the error's details text.
        """
        # The quoted name (plus trailing space) is only included when a name was given.
        quoted_name = f"\"{valueName}\" " if valueName else ""
        summary = "value " + quoted_name + f"was supposed to be {expected}, but got {actual} instead"
        super().__init__(summary, msg)
| 297 | |
| 298 # RESULT | |
T = TypeVar('T')
E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
class Result(Generic[T, E]):
    """
    Class to handle the result of an operation, with a value and a boolean flag to indicate
    whether the operation was successful or not.
    """
    class ResultErr(CustomErr):
        """
        CustomErr subclass for all Result errors.
        """
        errName = "Result Error"
        def __init__(self, msg = "no further details provided") -> None:
            super().__init__(msg)

    def __init__(self, value :Union[T, E], isOk :bool) -> None:
        """
        Initialize an instance of Result.

        Args:
            value (Union[T, E]): The value to be stored in the Result instance.
            isOk (bool): A boolean flag to indicate whether the operation was successful or not.

        Returns:
            None : practically, a Result instance.
        """
        self.isOk = isOk
        self.isErr = not isOk
        self.value = value

    @classmethod
    def Ok(cls, value :T) -> "Result":
        """
        Construct a successful Result.

        Args:
            value (T): The value to be stored in the Result instance, set as successful.

        Returns:
            Result: A new Result instance with a successful operation.
        """
        return Result(value, isOk = True)

    @classmethod
    def Err(cls, value :E) -> "Result":
        """
        Construct a failed Result.

        Args:
            value (E): The value to be stored in the Result instance, set as failed.

        Returns:
            Result: A new Result instance with a failed operation.
        """
        return Result(value, isOk = False)

    def unwrap(self) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful.

        Raises:
            ResultErr: If the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")

    def unwrapOr(self, default :T) -> T:
        """
        Unwraps the value of the Result instance, if the operation was successful, otherwise
        it returns a default value.

        Args:
            default (T): The default value to be returned if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful,
            otherwise the default value.
        """
        return self.value if self.isOk else default

    def expect(self, err :"Result.ResultErr") -> T:
        """
        Expects that the value of the Result instance is successful, otherwise it raises an error.

        Args:
            err (Exception): The error to be raised if the operation was not successful.

        Raises:
            err: The error raised if the operation was not successful.

        Returns:
            T: The value of the Result instance, if the operation was successful.
        """
        if self.isOk: return self.value
        raise err

    U = TypeVar("U")
    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
        """
        Maps the value of the current Result to whatever is returned by the mapper function.
        If the Result contained an unsuccessful operation to begin with it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, the returned result instance will be of the error kind
        and hold the raised exception.

        Args:
            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.

        Returns:
            Result[U, E]: The result of the mapper operation applied to the Result value.
        """
        if self.isErr: return self
        try: return Result.Ok(mapper(self.value))
        except Exception as e: return Result.Err(e)

    D = TypeVar("D", bound = "Result.ResultErr")
    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
        """
        Maps the error of the current Result to whatever is returned by the mapper function.
        If the Result contained a successful operation it remains unchanged
        (a reference to the current instance is returned).
        If the mapper function raises, this method raises as well.

        Args:
            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.

        Returns:
            Result[T, D]: The result of the mapper operation applied to the Result error.
        """
        if self.isOk: return self
        return Result.Err(mapper(self.value))

    def __str__(self):
        """Render as ``Result::Ok(value)`` or ``Result::Err(value)``."""
        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
| 434 | |
| 435 # FILES | |
def readPickle(path :FilePath) -> Any:
    """
    Load and return the object stored in a .pickle file, which needs to exist at the given path.

    Args:
        path : the path to the .pickle file.

    Returns:
        Any : the data inside a pickle file, could be anything.
    """
    location = path.show()
    with open(location, "rb") as fd:
        payload = pickle.load(fd)
    return payload
| 447 | |
def writePickle(path :FilePath, data :Any) -> None:
    """
    Serialize any data into a .pickle file, created at the given path.

    Args:
        path : the path to the .pickle file.
        data : the data to be written to the file.

    Returns:
        None
    """
    location = path.show()
    with open(location, "wb") as fd:
        pickle.dump(data, fd)
| 460 | |
def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
    """
    Reads the contents of a .csv file, which needs to exist at the given path.

    Args:
        path : the path to the .csv file.
        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
        skipHeader : whether the first row of the file is a header and should be skipped.

    Returns:
        List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by the delimiter.
    """
    with open(path.show(), "r", newline = "") as fd:
        rows = list(csv.reader(fd, delimiter = delimiter))

    # skipHeader doubles as the slice start: True drops the first row, False keeps everything.
    return rows[skipHeader:]
| 474 | |
def findIdxByName(header: List[str], name: str, colName="name") -> Tuple[int, int]:
    """
    Find the indices of the 'ReactionID' column and a user-specified column name
    within the header row of a tabular file.

    Args:
        header (List[str]): The header row, as a list of column names.
        name (str): The name of the column to look for (e.g. 'GPR').
        colName (str, optional): Currently unused; kept so existing callers that pass it
            keep working. Defaults to "name".

    Returns:
        Tuple[int, int]: A tuple containing:
            - The index of the 'ReactionID' column.
            - The index of the requested column `name`.

    Raises:
        ValueError: If 'ReactionID' or the requested column `name` is not found in the header.

    Notes:
        Both 'ReactionID' and the requested column are mandatory for downstream processing.
        When a column name is repeated in the header, the index of its last occurrence is used.
    """
    # Later duplicates overwrite earlier ones, so the last occurrence wins.
    col_index = {col_name: idx for idx, col_name in enumerate(header)}

    if name not in col_index or "ReactionID" not in col_index:
        raise ValueError(f"Tabular file must contain 'ReactionID' and {name} columns.")

    return col_index["ReactionID"], col_index[name]
| 506 | |
| 507 | |
def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
    """
    Parses the .svg file at the given path, which needs to exist.

    Args:
        path : the path to the .svg file.
        customErr : optional exception raised in place of the parser's own error
            when the file cannot be parsed.

    Raises:
        customErr : when provided and parsing fails.
        ET.XMLSyntaxError, ET.XMLSchemaParseError : the parser's error, when no customErr is provided.

    Returns:
        ET.ElementTree : the parsed document.
    """
    try:
        tree = ET.parse(path.show())
    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
        if customErr:
            raise customErr
        raise err
    return tree
| 524 | |
def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
    """
    Serializes svg data opened with lxml.etree into a .svg file, created at the given path.

    Args:
        path : the path to the .svg file.
        data : the data to be written to the file.

    Returns:
        None
    """
    with open(path.show(), "wb") as fd:
        serialized = ET.tostring(data)
        fd.write(serialized)
| 537 | |
| 538 # UI ARGUMENTS | |
class Bool:
    """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive)."""
    def __init__(self, argName :str) -> None:
        # Only used to label the error raised on invalid input.
        self.argName = argName

    def __call__(self, s :str) -> bool:
        """Make the instance usable as a converter callable; delegates to check."""
        return self.check(s)

    def check(self, s :str) -> bool:
        """
        Convert the text to a bool.

        Args:
            s : the text to convert.

        Raises:
            ArgsErr : if the lower-cased text is neither "true" nor "false".

        Returns:
            bool : the parsed value.
        """
        lowered = s.lower()
        accepted = {"true": True, "false": False}
        if lowered in accepted:
            return accepted[lowered]
        raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{lowered}\"")
| 551 | |
class Float:
    """Float CLI argument parser supporting NaN and None keywords (case-insensitive)."""
    def __init__(self, argName = "Dataset values, not an argument") -> None:
        # Only used to label the error raised on invalid input.
        self.argName = argName

    def __call__(self, s :str) -> float:
        """Make the instance usable as a converter callable; delegates to check."""
        return self.check(s)

    def check(self, s :str) -> float:
        """
        Convert the text to a float.

        Args:
            s : the text to convert.

        Raises:
            ArgsErr : if the text is not numeric and is not "nan"/"none" (any casing).

        Returns:
            float : the parsed number, or NaN for the "nan"/"none" keywords.
        """
        try:
            return float(s)
        except ValueError:
            lowered = s.lower()
            if lowered in ("nan", "none"):
                return math.nan
            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{lowered}\"")
| 565 | |
| 566 # MODELS | |
OldRule = List[Union[str, "OldRule"]]
class Model(Enum):
    """
    Represents a metabolic model, either custom or locally supported. Custom models don't point
    to valid file paths.
    """

    Recon = "Recon"
    ENGRO2 = "ENGRO2"
    ENGRO2_no_legend = "ENGRO2_no_legend"
    HMRcore = "HMRcore"
    HMRcore_no_legend = "HMRcore_no_legend"
    Custom = "Custom"

    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
        """Raise PathErr when no path is available (a Custom model used without a custom path)."""
        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")

    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
        """
        Open "rules" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder of bundled models.
            customPath : path to the rules pickle, used only by Model.Custom.

        Raises:
            PathErr : if this is Model.Custom and no customPath was given.

        Returns:
            Dict[str, Dict[str, OldRule]] : the rules for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
        """
        Open "gene translator (old: gene_in_rule)" file for this model.

        Args:
            toolDir : directory containing the "local/pickle files/" folder of bundled models.
            customPath : path to the translator pickle, used only by Model.Custom.

        Raises:
            PathErr : if this is Model.Custom and no customPath was given.

        Returns:
            Dict[str, Dict[str, str]] : the translator dict for this model.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
        self.__raiseMissingPathErr(path)
        return readPickle(path)

    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
        """
        Open the SVG metabolic map for this model.

        Raises:
            PathErr : if this is Model.Custom and no customPath was given.
            DataErr : if the map cannot be parsed.
        """
        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
        self.__raiseMissingPathErr(path)
        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))

    def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FileFormat]=None)->cobra.Model:
        """Load the COBRA model for this enum variant (supports Custom with explicit path/extension)."""
        if self is Model.Custom:
            return self.load_custom_model(customPath, customExtension)

        return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())

    def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
        """
        Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not).

        Args:
            file_path : location of the model file.
            ext : format to assume instead of the one carried by file_path.

        Raises:
            DataErr : if loading fails or the format is not one of the supported ones.

        Returns:
            cobra.Model : the loaded model.
        """
        ext = ext if ext else file_path.ext
        # A FileFormat member never compares equal to a plain string, so every check
        # below works on the textual extension (lower-cased to ignore casing).
        ext_str = str(ext).lower()
        try:
            if ext_str in FileFormat.XML.value:
                return cobra.io.read_sbml_model(file_path.show())

            # Compressed JSON/MAT/YML files are not automatically handled by cobra.
            if ext_str in FileFormat.JSON.value:
                if ext_str == "json":
                    return cobra.io.load_json_model(file_path.show())
                return self.extract_model(file_path, ext, "json")

            if ext_str in FileFormat.MAT.value:
                if ext_str == "mat":
                    return cobra.io.load_matlab_model(file_path.show())
                return self.extract_model(file_path, ext, "mat")

            if ext_str in FileFormat.YML.value:
                if ext_str == "yml":
                    return cobra.io.load_yaml_model(file_path.show())
                return self.extract_model(file_path, ext, "yml")

        except Exception as e:
            raise DataErr(file_path, str(e)) from e

        raise DataErr(file_path,
            f"Format \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")

    def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
        """
        Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).

        For zip archives only the first entry listed in the archive is read.

        Args:
            file_path: File path of the model
            ext: File extension (should contain .zip, .gz or .bz2)
            model_encoding: Format of the decompressed model ("json", "mat" or "yml").

        Returns:
            cobra.Model: COBRApy model

        Raises:
            Exception: Extraction errors
        """
        from io import BytesIO

        ext_str = str(ext).lower()

        try:
            # Step 1: get the decompressed bytes, whatever the container is.
            if '.zip' in ext_str:
                with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
                    raw = zip_ref.read(zip_ref.namelist()[0])
            elif '.gz' in ext_str:
                with gzip.open(file_path.show(), 'rb') as gz_ref:
                    raw = gz_ref.read()
            elif '.bz2' in ext_str:
                with bz2.open(file_path.show(), 'rb') as bz2_ref:
                    raw = bz2_ref.read()
            else:
                raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")

            # Step 2: hand the content to the loader matching the model encoding.
            if model_encoding == "json":
                return cobra.io.load_json_model(StringIO(raw.decode('utf-8')))
            elif model_encoding == "mat":
                # MAT files are binary: they must not be decoded as text.
                return cobra.io.load_matlab_model(BytesIO(raw))
            elif model_encoding == "yml":
                return cobra.io.load_yaml_model(StringIO(raw.decode('utf-8')))
            else:
                raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")

        except Exception as e:
            raise Exception(f"Error during model extraction: {str(e)}") from e

    def __str__(self) -> str: return self.value
| 710 | |
| 711 | 
