| 
392
 | 
     1 import math
 | 
| 
 | 
     2 import re
 | 
| 
 | 
     3 import sys
 | 
| 
 | 
     4 import csv
 | 
| 
 | 
     5 import pickle
 | 
| 
 | 
     6 import lxml.etree as ET
 | 
| 
 | 
     7 
 | 
| 
 | 
     8 from enum import Enum
 | 
| 
 | 
     9 from itertools import count
 | 
| 
408
 | 
    10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple
 | 
| 
392
 | 
    11 
 | 
| 
 | 
    12 import pandas as pd
 | 
| 
 | 
    13 import cobra
 | 
| 
409
 | 
    14 from cobra import Model as cobraModel, Reaction, Metabolite
 | 
| 
392
 | 
    15 
 | 
| 
 | 
    16 import zipfile
 | 
| 
 | 
    17 import gzip
 | 
| 
 | 
    18 import bz2
 | 
| 
 | 
    19 from io import StringIO
 | 
| 
 | 
    20 
 | 
| 
394
 | 
    21 
 | 
| 
 | 
    22 
 | 
| 
392
 | 
    23 class ValueErr(Exception):
 | 
| 
 | 
    24     def __init__(self, param_name, expected, actual):
 | 
| 
 | 
    25         super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
 | 
| 
 | 
    26 
 | 
| 
 | 
    27 class PathErr(Exception):
 | 
| 
 | 
    28     def __init__(self, path, message):
 | 
| 
 | 
    29         super().__init__(f"Path error for '{path}': {message}")
 | 
| 
 | 
    30 
 | 
| 
 | 
    31 class FileFormat(Enum):
 | 
| 
 | 
    32     """
 | 
| 
 | 
    33     Encodes possible file extensions to conditionally save data in a different format.
 | 
| 
 | 
    34     """
 | 
| 
 | 
    35     DAT    = ("dat",) # this is how galaxy treats all your files!
 | 
| 
 | 
    36     CSV    = ("csv",) # this is how most editable input data is written
 | 
| 
 | 
    37     TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
 | 
| 
 | 
    38     SVG    = ("svg",) # this is how most metabolic maps are written
 | 
| 
 | 
    39     PNG    = ("png",) # this is a common output format for images (such as metabolic maps)
 | 
| 
 | 
    40     PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications.
 | 
| 
 | 
    41     
 | 
| 
 | 
    42     # Updated to include compressed variants
 | 
| 
 | 
    43     XML    = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
 | 
| 
 | 
    44     JSON   = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
 | 
| 
 | 
    45     MAT    = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
 | 
| 
 | 
    46     YML    = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
 | 
| 
 | 
    47 
 | 
| 
 | 
    48     TXT    = ("txt",) # this is how most output data is written
 | 
| 
 | 
    49     PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
 | 
| 
 | 
    50 
 | 
| 
 | 
    51     def __init__(self, *extensions):
 | 
| 
 | 
    52         self.extensions = extensions
 | 
| 
 | 
    53         # Store original extension when set via fromExt
 | 
| 
 | 
    54         self._original_extension = None
 | 
| 
 | 
    55 
 | 
| 
 | 
    56     @classmethod
 | 
| 
 | 
    57     def fromExt(cls, ext: str) -> "FileFormat":
 | 
| 
 | 
    58         """
 | 
| 
 | 
    59         Converts a file extension string to a FileFormat instance.
 | 
| 
 | 
    60         Args:
 | 
| 
 | 
    61             ext : The file extension as a string.
 | 
| 
 | 
    62         Returns:
 | 
| 
 | 
    63             FileFormat: The FileFormat instance corresponding to the file extension.
 | 
| 
 | 
    64         """
 | 
| 
 | 
    65         variantName = ext.upper()
 | 
| 
 | 
    66         if variantName in FileFormat.__members__: 
 | 
| 
 | 
    67             instance = FileFormat[variantName]
 | 
| 
 | 
    68             instance._original_extension = ext
 | 
| 
 | 
    69             return instance
 | 
| 
 | 
    70         
 | 
| 
 | 
    71         variantName = ext.lower()
 | 
| 
 | 
    72         for member in cls:
 | 
| 
 | 
    73             if variantName in member.value: 
 | 
| 
 | 
    74                 # Create a copy-like behavior by storing the original extension
 | 
| 
 | 
    75                 member._original_extension = ext
 | 
| 
 | 
    76                 return member
 | 
| 
 | 
    77         
 | 
| 
 | 
    78         raise ValueErr("ext", "a valid FileFormat file extension", ext)
 | 
| 
 | 
    79 
 | 
| 
 | 
    80     def __str__(self) -> str:
 | 
| 
 | 
    81         """
 | 
| 
 | 
    82         (Private) converts to str representation. Good practice for usage with argparse.
 | 
| 
 | 
    83         Returns:
 | 
| 
 | 
    84             str : the string representation of the file extension.
 | 
| 
 | 
    85         """
 | 
| 
 | 
    86         # If we have an original extension stored (for compressed files only), use it
 | 
| 
 | 
    87         if hasattr(self, '_original_extension') and self._original_extension:
 | 
| 
 | 
    88             return self._original_extension
 | 
| 
 | 
    89         
 | 
| 
 | 
    90         # For XML, JSON, MAT and YML without original extension, use the base extension
 | 
| 
 | 
    91         if self == FileFormat.XML:
 | 
| 
 | 
    92             return "xml"
 | 
| 
 | 
    93         elif self == FileFormat.JSON:
 | 
| 
 | 
    94             return "json"
 | 
| 
 | 
    95         elif self == FileFormat.MAT:
 | 
| 
 | 
    96             return "mat"
 | 
| 
 | 
    97         elif self == FileFormat.YML:
 | 
| 
 | 
    98             return "yml"
 | 
| 
 | 
    99         
 | 
| 
 | 
   100         return self.value[-1]
 | 
| 
 | 
   101 
 | 
| 
 | 
   102 class FilePath():
 | 
| 
 | 
   103     """
 | 
| 
 | 
   104     Represents a file path. View this as an attempt to standardize file-related operations by expecting
 | 
| 
 | 
   105     values of this type in any process requesting a file path.
 | 
| 
 | 
   106     """
 | 
| 
 | 
   107     def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
 | 
| 
 | 
   108         """
 | 
| 
 | 
   109         (Private) Initializes an instance of FilePath.
 | 
| 
 | 
   110         Args:
 | 
| 
 | 
   111             path : the end of the path, containing the file name.
 | 
| 
 | 
   112             ext : the file's extension.
 | 
| 
 | 
   113             prefix : anything before path, if the last '/' isn't there it's added by the code.
 | 
| 
 | 
   114         Returns:
 | 
| 
 | 
   115             None : practically, a FilePath instance.
 | 
| 
 | 
   116         """
 | 
| 
 | 
   117         self.ext = ext
 | 
| 
 | 
   118         self.filePath = filePath
 | 
| 
 | 
   119 
 | 
| 
 | 
   120         if prefix and prefix[-1] != '/': 
 | 
| 
 | 
   121             prefix += '/'
 | 
| 
 | 
   122         self.prefix = prefix
 | 
| 
 | 
   123     
 | 
| 
 | 
   124     @classmethod
 | 
| 
 | 
   125     def fromStrPath(cls, path: str) -> "FilePath":
 | 
| 
 | 
   126         """
 | 
| 
 | 
   127         Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
 | 
| 
 | 
   128         It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
 | 
| 
 | 
   129         These double extensions are not supported for other file types such as .csv.
 | 
| 
 | 
   130         Args:
 | 
| 
 | 
   131             path : the string containing the path
 | 
| 
 | 
   132         Raises:
 | 
| 
 | 
   133             PathErr : if the provided string doesn't represent a valid path.
 | 
| 
 | 
   134         Returns:
 | 
| 
 | 
   135             FilePath : the constructed instance.
 | 
| 
 | 
   136         """
 | 
| 
 | 
   137         result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
 | 
| 
 | 
   138         if not result or not result["name"] or not result["ext"]:
 | 
| 
 | 
   139             raise PathErr(path, "cannot recognize folder structure or extension in path")
 | 
| 
 | 
   140 
 | 
| 
 | 
   141         prefix = result["prefix"] if result["prefix"] else ""
 | 
| 
 | 
   142         name, ext = result["name"], result["ext"]
 | 
| 
 | 
   143 
 | 
| 
 | 
   144         # Check for double extensions (json.gz, xml.zip, etc.)
 | 
| 
 | 
   145         parts = path.split(".")
 | 
| 
 | 
   146         if len(parts) >= 3:  
 | 
| 
 | 
   147             penultimate = parts[-2]
 | 
| 
 | 
   148             last = parts[-1]
 | 
| 
 | 
   149             double_ext = f"{penultimate}.{last}"
 | 
| 
 | 
   150             
 | 
| 
 | 
   151             # Try the double extension first
 | 
| 
 | 
   152             try:
 | 
| 
 | 
   153                 ext_format = FileFormat.fromExt(double_ext)
 | 
| 
 | 
   154                 name = ".".join(parts[:-2])
 | 
| 
 | 
   155                 # Extract prefix if it exists
 | 
| 
 | 
   156                 if '/' in name:
 | 
| 
 | 
   157                     prefix = name[:name.rfind('/') + 1]
 | 
| 
 | 
   158                     name = name[name.rfind('/') + 1:]
 | 
| 
 | 
   159                 return cls(name, ext_format, prefix=prefix)
 | 
| 
 | 
   160             except ValueErr:
 | 
| 
 | 
   161                 # If double extension doesn't work, fall back to single extension
 | 
| 
 | 
   162                 pass
 | 
| 
 | 
   163 
 | 
| 
 | 
   164         # Single extension fallback (original logic)
 | 
| 
 | 
   165         try:
 | 
| 
 | 
   166             ext_format = FileFormat.fromExt(ext)
 | 
| 
 | 
   167             return cls(name, ext_format, prefix=prefix)
 | 
| 
 | 
   168         except ValueErr:
 | 
| 
 | 
   169             raise PathErr(path, f"unsupported file extension: {ext}")
 | 
| 
 | 
   170 
 | 
| 
 | 
   171     def show(self) -> str:
 | 
| 
 | 
   172         """
 | 
| 
 | 
   173         Shows the path as a string.
 | 
| 
 | 
   174         Returns:
 | 
| 
 | 
   175             str : the path shown as a string.
 | 
| 
 | 
   176         """
 | 
| 
 | 
   177         return f"{self.prefix}{self.filePath}.{self.ext}"
 | 
| 
 | 
   178     
 | 
| 
 | 
   179     def __str__(self) -> str: 
 | 
| 
 | 
   180         return self.show()
 | 
| 
 | 
   181 
 | 
| 
 | 
   182 # ERRORS
 | 
| 
 | 
   183 def terminate(msg :str) -> None:
 | 
| 
 | 
   184     """
 | 
| 
 | 
   185     Terminate the execution of the script with an error message.
 | 
| 
 | 
   186     
 | 
| 
 | 
   187     Args:
 | 
| 
 | 
   188         msg (str): The error message to be displayed.
 | 
| 
 | 
   189     
 | 
| 
 | 
   190     Returns:
 | 
| 
 | 
   191         None
 | 
| 
 | 
   192     """
 | 
| 
 | 
   193     sys.exit(f"Execution aborted: {msg}\n")
 | 
| 
 | 
   194 
 | 
| 
 | 
   195 def logWarning(msg :str, loggerPath :str) -> None:
 | 
| 
 | 
   196     """
 | 
| 
 | 
   197     Log a warning message to an output log file and print it to the console. The final period and a
 | 
| 
 | 
   198     newline is added by the function.
 | 
| 
 | 
   199 
 | 
| 
 | 
   200     Args:
 | 
| 
 | 
   201         s (str): The warning message to be logged and printed.
 | 
| 
 | 
   202         loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
 | 
| 
 | 
   203         immediately read back (beware relative expensive operation, log with caution).
 | 
| 
 | 
   204 
 | 
| 
 | 
   205     Returns:
 | 
| 
 | 
   206         None
 | 
| 
 | 
   207     """
 | 
| 
 | 
   208     # building the path and then reading it immediately seems useless, but it's actually a way of
 | 
| 
 | 
   209     # validating that reduces repetition on the caller's side. Besides, logging a message by writing
 | 
| 
 | 
   210     # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
 | 
| 
 | 
   211     # mindlessly logging whenever something comes up, log at the very end and tell the user everything
 | 
| 
 | 
   212     # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
 | 
| 
 | 
   213     # the file only at the end of the program's execution.
 | 
| 
 | 
   214     with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
 | 
| 
 | 
   215 
 | 
| 
 | 
   216 class CustomErr(Exception):
 | 
| 
 | 
   217     """
 | 
| 
 | 
   218     Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
 | 
| 
 | 
   219     """
 | 
| 
 | 
   220     __idGenerator = count()
 | 
| 
 | 
   221     errName = "Custom Error"
 | 
| 
 | 
   222     def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
 | 
| 
 | 
   223         """
 | 
| 
 | 
   224         (Private) Initializes an instance of CustomErr.
 | 
| 
 | 
   225 
 | 
| 
 | 
   226         Args:
 | 
| 
 | 
   227             msg (str): Error message to be displayed.
 | 
| 
 | 
   228             details (str): Informs the user more about the error encountered. Defaults to "".
 | 
| 
 | 
   229             explicitErrCode (int): Explicit error code to be used. Defaults to -1.
 | 
| 
 | 
   230         
 | 
| 
 | 
   231         Returns:
 | 
| 
 | 
   232             None : practically, a CustomErr instance.
 | 
| 
 | 
   233         """
 | 
| 
 | 
   234         self.msg     = msg
 | 
| 
 | 
   235         self.details = details
 | 
| 
 | 
   236 
 | 
| 
 | 
   237         self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
 | 
| 
 | 
   238 
 | 
| 
 | 
   239     def throw(self, loggerPath = "") -> None:
 | 
| 
 | 
   240         """
 | 
| 
 | 
   241         Raises the current CustomErr instance, logging a warning message before doing so.
 | 
| 
 | 
   242 
 | 
| 
 | 
   243         Raises:
 | 
| 
 | 
   244             self: The current CustomErr instance.
 | 
| 
 | 
   245         
 | 
| 
 | 
   246         Returns:
 | 
| 
 | 
   247             None
 | 
| 
 | 
   248         """
 | 
| 
 | 
   249         if loggerPath: logWarning(str(self), loggerPath)
 | 
| 
 | 
   250         raise self
 | 
| 
 | 
   251 
 | 
| 
 | 
   252     def abort(self) -> None:
 | 
| 
 | 
   253         """
 | 
| 
 | 
   254         Aborts the execution of the script.
 | 
| 
 | 
   255         
 | 
| 
 | 
   256         Returns:
 | 
| 
 | 
   257             None
 | 
| 
 | 
   258         """
 | 
| 
 | 
   259         terminate(str(self))
 | 
| 
 | 
   260 
 | 
| 
 | 
   261     def __str__(self) -> str:
 | 
| 
 | 
   262         """
 | 
| 
 | 
   263         (Private) Returns a string representing the current CustomErr instance.
 | 
| 
 | 
   264 
 | 
| 
 | 
   265         Returns:
 | 
| 
 | 
   266             str: A string representing the current CustomErr instance.
 | 
| 
 | 
   267         """
 | 
| 
 | 
   268         return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
 | 
| 
 | 
   269 
 | 
| 
 | 
   270 class ArgsErr(CustomErr):
 | 
| 
 | 
   271     """
 | 
| 
 | 
   272     CustomErr subclass for UI arguments errors.
 | 
| 
 | 
   273     """
 | 
| 
 | 
   274     errName = "Args Error"
 | 
| 
 | 
   275     def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
 | 
| 
 | 
   276         super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
 | 
| 
 | 
   277 
 | 
| 
 | 
   278 class DataErr(CustomErr):
 | 
| 
 | 
   279     """
 | 
| 
 | 
   280     CustomErr subclass for data formatting errors.
 | 
| 
 | 
   281     """
 | 
| 
 | 
   282     errName = "Data Format Error"
 | 
| 
 | 
   283     def __init__(self, fileName :str, msg = "no further details provided") -> None:
 | 
| 
 | 
   284         super().__init__(f"file \"{fileName}\" contains malformed data", msg)
 | 
| 
 | 
   285 
 | 
| 
 | 
   286 class PathErr(CustomErr):
 | 
| 
 | 
   287     """
 | 
| 
 | 
   288     CustomErr subclass for filepath formatting errors.
 | 
| 
 | 
   289     """
 | 
| 
 | 
   290     errName = "Path Error"
 | 
| 
 | 
   291     def __init__(self, path :FilePath, msg = "no further details provided") -> None:
 | 
| 
 | 
   292         super().__init__(f"path \"{path}\" is invalid", msg)
 | 
| 
 | 
   293 
 | 
| 
 | 
   294 class ValueErr(CustomErr):
 | 
| 
 | 
   295     """
 | 
| 
 | 
   296     CustomErr subclass for any value error.
 | 
| 
 | 
   297     """
 | 
| 
 | 
   298     errName = "Value Error"
 | 
| 
 | 
   299     def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
 | 
| 
 | 
   300         super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
 | 
| 
 | 
   301 
 | 
| 
 | 
   302 # RESULT
 | 
| 
 | 
   303 T = TypeVar('T')
 | 
| 
 | 
   304 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
 | 
| 
 | 
   305 class Result(Generic[T, E]):
 | 
| 
 | 
   306     class ResultErr(CustomErr):
 | 
| 
 | 
   307         """
 | 
| 
 | 
   308         CustomErr subclass for all Result errors.
 | 
| 
 | 
   309         """
 | 
| 
 | 
   310         errName = "Result Error"
 | 
| 
 | 
   311         def __init__(self, msg = "no further details provided") -> None:
 | 
| 
 | 
   312             super().__init__(msg)
 | 
| 
 | 
   313     """
 | 
| 
 | 
   314     Class to handle the result of an operation, with a value and a boolean flag to indicate
 | 
| 
 | 
   315     whether the operation was successful or not.
 | 
| 
 | 
   316     """
 | 
| 
 | 
   317     def __init__(self, value :Union[T, E], isOk :bool) -> None:
 | 
| 
 | 
   318         """
 | 
| 
 | 
   319         (Private) Initializes an instance of Result.
 | 
| 
 | 
   320 
 | 
| 
 | 
   321         Args:
 | 
| 
 | 
   322             value (Union[T, E]): The value to be stored in the Result instance.
 | 
| 
 | 
   323             isOk (bool): A boolean flag to indicate whether the operation was successful or not.
 | 
| 
 | 
   324         
 | 
| 
 | 
   325             Returns:
 | 
| 
 | 
   326                 None : practically, a Result instance.
 | 
| 
 | 
   327         """
 | 
| 
 | 
   328         self.isOk  = isOk
 | 
| 
 | 
   329         self.isErr = not isOk
 | 
| 
 | 
   330         self.value = value
 | 
| 
 | 
   331 
 | 
| 
 | 
   332     @classmethod
 | 
| 
 | 
   333     def Ok(cls,  value :T) -> "Result":
 | 
| 
 | 
   334         """
 | 
| 
 | 
   335         Constructs a new Result instance with a successful operation.
 | 
| 
 | 
   336 
 | 
| 
 | 
   337         Args:
 | 
| 
 | 
   338             value (T): The value to be stored in the Result instance, set as successful.
 | 
| 
 | 
   339 
 | 
| 
 | 
   340         Returns:
 | 
| 
 | 
   341             Result: A new Result instance with a successful operation.
 | 
| 
 | 
   342         """
 | 
| 
 | 
   343         return Result(value, isOk = True)
 | 
| 
 | 
   344     
 | 
| 
 | 
   345     @classmethod
 | 
| 
 | 
   346     def Err(cls, value :E) -> "Result": 
 | 
| 
 | 
   347         """
 | 
| 
 | 
   348         Constructs a new Result instance with a failed operation.
 | 
| 
 | 
   349 
 | 
| 
 | 
   350         Args:
 | 
| 
 | 
   351             value (E): The value to be stored in the Result instance, set as failed.
 | 
| 
 | 
   352 
 | 
| 
 | 
   353         Returns:
 | 
| 
 | 
   354             Result: A new Result instance with a failed operation.
 | 
| 
 | 
   355         """
 | 
| 
 | 
   356         return Result(value, isOk = False)
 | 
| 
 | 
   357 
 | 
| 
 | 
   358     def unwrap(self) -> T:
 | 
| 
 | 
   359         """
 | 
| 
 | 
   360         Unwraps the value of the Result instance, if the operation was successful.
 | 
| 
 | 
   361 
 | 
| 
 | 
   362         Raises:
 | 
| 
 | 
   363             ResultErr: If the operation was not successful.
 | 
| 
 | 
   364 
 | 
| 
 | 
   365         Returns:
 | 
| 
 | 
   366             T: The value of the Result instance, if the operation was successful.
 | 
| 
 | 
   367         """
 | 
| 
 | 
   368         if self.isOk: return self.value
 | 
| 
 | 
   369         raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
 | 
| 
 | 
   370 
 | 
| 
 | 
   371     def unwrapOr(self, default :T) -> T:
 | 
| 
 | 
   372         """
 | 
| 
 | 
   373         Unwraps the value of the Result instance, if the operation was successful, otherwise
 | 
| 
 | 
   374         it returns a default value.
 | 
| 
 | 
   375 
 | 
| 
 | 
   376         Args:
 | 
| 
 | 
   377             default (T): The default value to be returned if the operation was not successful.
 | 
| 
 | 
   378 
 | 
| 
 | 
   379         Returns:
 | 
| 
 | 
   380             T: The value of the Result instance, if the operation was successful,
 | 
| 
 | 
   381             otherwise the default value.
 | 
| 
 | 
   382         """
 | 
| 
 | 
   383         return self.value if self.isOk else default
 | 
| 
 | 
   384     
 | 
| 
 | 
   385     def expect(self, err :"Result.ResultErr") -> T:
 | 
| 
 | 
   386         """
 | 
| 
 | 
   387         Expects that the value of the Result instance is successful, otherwise it raises an error.
 | 
| 
 | 
   388 
 | 
| 
 | 
   389         Args:
 | 
| 
 | 
   390             err (Exception): The error to be raised if the operation was not successful.
 | 
| 
 | 
   391 
 | 
| 
 | 
   392         Raises:
 | 
| 
 | 
   393             err: The error raised if the operation was not successful.
 | 
| 
 | 
   394 
 | 
| 
 | 
   395         Returns:
 | 
| 
 | 
   396             T: The value of the Result instance, if the operation was successful.
 | 
| 
 | 
   397         """
 | 
| 
 | 
   398         if self.isOk: return self.value
 | 
| 
 | 
   399         raise err
 | 
| 
 | 
   400 
 | 
| 
 | 
   401     U = TypeVar("U")
 | 
| 
 | 
   402     def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
 | 
| 
 | 
   403         """
 | 
| 
 | 
   404         Maps the value of the current Result to whatever is returned by the mapper function.
 | 
| 
 | 
   405         If the Result contained an unsuccessful operation to begin with it remains unchanged
 | 
| 
 | 
   406         (a reference to the current instance is returned).
 | 
| 
 | 
   407         If the mapper function panics the returned result instance will be of the error kind.
 | 
| 
 | 
   408 
 | 
| 
 | 
   409         Args:
 | 
| 
 | 
   410             mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
 | 
| 
 | 
   411 
 | 
| 
 | 
   412         Returns:
 | 
| 
 | 
   413             Result[U, E]: The result of the mapper operation applied to the Result value.
 | 
| 
 | 
   414         """
 | 
| 
 | 
   415         if self.isErr: return self
 | 
| 
 | 
   416         try: return Result.Ok(mapper(self.value))
 | 
| 
 | 
   417         except Exception as e: return Result.Err(e)
 | 
| 
 | 
   418     
 | 
| 
 | 
   419     D = TypeVar("D", bound = "Result.ResultErr")
 | 
| 
 | 
   420     def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
 | 
| 
 | 
   421         """
 | 
| 
 | 
   422         Maps the error of the current Result to whatever is returned by the mapper function.
 | 
| 
 | 
   423         If the Result contained a successful operation it remains unchanged
 | 
| 
 | 
   424         (a reference to the current instance is returned).
 | 
| 
 | 
   425         If the mapper function panics this method does as well.
 | 
| 
 | 
   426 
 | 
| 
 | 
   427         Args:
 | 
| 
 | 
   428             mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
 | 
| 
 | 
   429 
 | 
| 
 | 
   430         Returns:
 | 
| 
 | 
   431             Result[U, E]: The result of the mapper operation applied to the Result error.
 | 
| 
 | 
   432         """
 | 
| 
 | 
   433         if self.isOk: return self
 | 
| 
 | 
   434         return Result.Err(mapper(self.value))
 | 
| 
 | 
   435 
 | 
| 
 | 
   436     def __str__(self):
 | 
| 
 | 
   437         return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
 | 
| 
 | 
   438 
 | 
| 
 | 
   439 # FILES
 | 
| 
 | 
   440 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
 | 
| 
 | 
   441     """
 | 
| 
 | 
   442     Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
 | 
| 
 | 
   443 
 | 
| 
 | 
   444     Args:
 | 
| 
 | 
   445         path : the path to the dataset file.
 | 
| 
 | 
   446         datasetName : the name of the dataset.
 | 
| 
 | 
   447 
 | 
| 
 | 
   448     Raises:
 | 
| 
 | 
   449         DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
 | 
| 
 | 
   450         it has less than 2 columns.
 | 
| 
 | 
   451     
 | 
| 
 | 
   452     Returns:
 | 
| 
 | 
   453         pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
 | 
| 
 | 
   454     """
 | 
| 
 | 
   455     # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
 | 
| 
 | 
   456     # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
 | 
| 
 | 
   457     # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
 | 
| 
 | 
   458     # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
 | 
| 
 | 
   459     # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
 | 
| 
 | 
   460     # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
 | 
| 
 | 
   461     try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
 | 
| 
 | 
   462     except:
 | 
| 
 | 
   463         try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
 | 
| 
 | 
   464         except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
 | 
| 
 | 
   465     
 | 
| 
 | 
   466     if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
 | 
| 
 | 
   467     return dataset
 | 
| 
 | 
   468 
 | 
| 
 | 
   469 def readPickle(path :FilePath) -> Any:
 | 
| 
 | 
   470     """
 | 
| 
 | 
   471     Reads the contents of a .pickle file, which needs to exist at the given path.
 | 
| 
 | 
   472 
 | 
| 
 | 
   473     Args:
 | 
| 
 | 
   474         path : the path to the .pickle file.
 | 
| 
 | 
   475     
 | 
| 
 | 
   476     Returns:
 | 
| 
 | 
   477         Any : the data inside a pickle file, could be anything.
 | 
| 
 | 
   478     """
 | 
| 
 | 
   479     with open(path.show(), "rb") as fd: return pickle.load(fd)
 | 
| 
 | 
   480 
 | 
| 
 | 
   481 def writePickle(path :FilePath, data :Any) -> None:
 | 
| 
 | 
   482     """
 | 
| 
 | 
   483     Saves any data in a .pickle file, created at the given path.
 | 
| 
 | 
   484 
 | 
| 
 | 
   485     Args:
 | 
| 
 | 
   486         path : the path to the .pickle file.
 | 
| 
 | 
   487         data : the data to be written to the file.
 | 
| 
 | 
   488     
 | 
| 
 | 
   489     Returns:
 | 
| 
 | 
   490         None
 | 
| 
 | 
   491     """
 | 
| 
 | 
   492     with open(path.show(), "wb") as fd: pickle.dump(data, fd)
 | 
| 
 | 
   493 
 | 
| 
 | 
   494 def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
 | 
| 
 | 
   495     """
 | 
| 
 | 
   496     Reads the contents of a .csv file, which needs to exist at the given path.
 | 
| 
 | 
   497 
 | 
| 
 | 
   498     Args:
 | 
| 
 | 
   499         path : the path to the .csv file.
 | 
| 
 | 
   500         delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
 | 
| 
 | 
   501         skipHeader : whether the first row of the file is a header and should be skipped.
 | 
| 
 | 
   502     
 | 
| 
 | 
   503     Returns:
 | 
| 
 | 
   504         List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
 | 
| 
 | 
   505     """
 | 
| 
 | 
   506     with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
 | 
| 
 | 
   507 
 | 
| 
 | 
   508 def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
 | 
| 
 | 
   509     """
 | 
| 
 | 
   510     Reads the contents of a .svg file, which needs to exist at the given path.
 | 
| 
 | 
   511 
 | 
| 
 | 
   512     Args:
 | 
| 
 | 
   513         path : the path to the .svg file.
 | 
| 
 | 
   514     
 | 
| 
 | 
   515     Raises:
 | 
| 
 | 
   516         DataErr : if the map is malformed.
 | 
| 
 | 
   517     
 | 
| 
 | 
   518     Returns:
 | 
| 
 | 
   519         Any : the data inside a svg file, could be anything.
 | 
| 
 | 
   520     """
 | 
| 
 | 
   521     try: return ET.parse(path.show())
 | 
| 
 | 
   522     except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
 | 
| 
 | 
   523         raise customErr if customErr else err
 | 
| 
 | 
   524 
 | 
| 
 | 
   525 def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
 | 
| 
 | 
   526     """
 | 
| 
 | 
   527     Saves svg data opened with lxml.etree in a .svg file, created at the given path.
 | 
| 
 | 
   528 
 | 
| 
 | 
   529     Args:
 | 
| 
 | 
   530         path : the path to the .svg file.
 | 
| 
 | 
   531         data : the data to be written to the file.
 | 
| 
 | 
   532     
 | 
| 
 | 
   533     Returns:
 | 
| 
 | 
   534         None
 | 
| 
 | 
   535     """
 | 
| 
 | 
   536     with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
 | 
| 
 | 
   537 
 | 
| 
 | 
   538 # UI ARGUMENTS
 | 
| 
 | 
   539 class Bool:
 | 
| 
 | 
   540     def __init__(self, argName :str) -> None:
 | 
| 
 | 
   541         self.argName = argName
 | 
| 
 | 
   542 
 | 
| 
 | 
   543     def __call__(self, s :str) -> bool: return self.check(s)
 | 
| 
 | 
   544 
 | 
| 
 | 
   545     def check(self, s :str) -> bool:
 | 
| 
 | 
   546         s = s.lower()
 | 
| 
 | 
   547         if s == "true" : return True
 | 
| 
 | 
   548         if s == "false": return False
 | 
| 
 | 
   549         raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
 | 
| 
 | 
   550 
 | 
| 
 | 
   551 class Float:
 | 
| 
 | 
   552     def __init__(self, argName = "Dataset values, not an argument") -> None:
 | 
| 
 | 
   553         self.argName = argName
 | 
| 
 | 
   554     
 | 
| 
 | 
   555     def __call__(self, s :str) -> float: return self.check(s)
 | 
| 
 | 
   556 
 | 
| 
 | 
   557     def check(self, s :str) -> float:
 | 
| 
 | 
   558         try: return float(s)
 | 
| 
 | 
   559         except ValueError:
 | 
| 
 | 
   560             s = s.lower()
 | 
| 
 | 
   561             if s == "nan" or s == "none": return math.nan
 | 
| 
 | 
   562             raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
 | 
| 
 | 
   563 
 | 
| 
 | 
   564 # MODELS
 | 
| 
 | 
   565 OldRule = List[Union[str, "OldRule"]]
 | 
| 
 | 
   566 class Model(Enum):
 | 
| 
 | 
   567     """
 | 
| 
 | 
   568     Represents a metabolic model, either custom or locally supported. Custom models don't point
 | 
| 
 | 
   569     to valid file paths.
 | 
| 
 | 
   570     """
 | 
| 
 | 
   571 
 | 
| 
 | 
   572     Recon   = "Recon"
 | 
| 
 | 
   573     ENGRO2  = "ENGRO2"
 | 
| 
 | 
   574     ENGRO2_no_legend = "ENGRO2_no_legend"
 | 
| 
 | 
   575     HMRcore = "HMRcore"
 | 
| 
 | 
   576     HMRcore_no_legend = "HMRcore_no_legend"
 | 
| 
 | 
   577     Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
 | 
| 
 | 
   578 
 | 
| 
 | 
   579     def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
 | 
| 
 | 
   580         if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
 | 
| 
 | 
   581 
 | 
| 
 | 
   582     def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
 | 
| 
 | 
   583         """
 | 
| 
 | 
   584         Open "rules" file for this model.
 | 
| 
 | 
   585 
 | 
| 
 | 
   586         Returns:
 | 
| 
 | 
   587             Dict[str, Dict[str, OldRule]] : the rules for this model.
 | 
| 
 | 
   588         """
 | 
| 
 | 
   589         path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
 | 
| 
 | 
   590         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   591         return readPickle(path)
 | 
| 
 | 
   592     
 | 
| 
 | 
   593     def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
 | 
| 
 | 
   594         """
 | 
| 
 | 
   595         Open "gene translator (old: gene_in_rule)" file for this model.
 | 
| 
 | 
   596 
 | 
| 
 | 
   597         Returns:
 | 
| 
 | 
   598             Dict[str, Dict[str, str]] : the translator dict for this model.
 | 
| 
 | 
   599         """
 | 
| 
 | 
   600         path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
 | 
| 
 | 
   601         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   602         return readPickle(path)
 | 
| 
 | 
   603     
 | 
| 
 | 
   604     def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
 | 
| 
 | 
   605         path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
 | 
| 
 | 
   606         self.__raiseMissingPathErr(path)
 | 
| 
 | 
   607         return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
 | 
| 
 | 
   608     
 | 
| 
 | 
   609     def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
 | 
| 
 | 
   610         if(self is Model.Custom):
 | 
| 
 | 
   611             return self.load_custom_model(customPath, customExtension)
 | 
| 
 | 
   612         else:
 | 
| 
 | 
   613             return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
 | 
| 
 | 
   614         
 | 
| 
 | 
   615     def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
 | 
| 
 | 
   616         ext = ext if ext else file_path.ext
 | 
| 
 | 
   617         try:
 | 
| 
 | 
   618             if str(ext) in FileFormat.XML.value:
 | 
| 
 | 
   619                 return cobra.io.read_sbml_model(file_path.show())
 | 
| 
 | 
   620             
 | 
| 
 | 
   621             if str(ext) in FileFormat.JSON.value:
 | 
| 
 | 
   622                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   623                 if(ext == "json"):
 | 
| 
 | 
   624                     return cobra.io.load_json_model(file_path.show())
 | 
| 
 | 
   625                 else: 
 | 
| 
 | 
   626                     return self.extract_model(file_path, ext, "json")
 | 
| 
 | 
   627 
 | 
| 
 | 
   628             if str(ext) in FileFormat.MAT.value:
 | 
| 
 | 
   629                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   630                 if(ext == "mat"):
 | 
| 
 | 
   631                     return cobra.io.load_matlab_model(file_path.show())
 | 
| 
 | 
   632                 else: 
 | 
| 
 | 
   633                     return self.extract_model(file_path, ext, "mat")
 | 
| 
 | 
   634 
 | 
| 
 | 
   635             if str(ext) in FileFormat.YML.value:
 | 
| 
 | 
   636                 # Compressed files are not automatically handled by cobra
 | 
| 
 | 
   637                 if(ext == "yml"):
 | 
| 
 | 
   638                     return cobra.io.load_yaml_model(file_path.show())
 | 
| 
 | 
   639                 else: 
 | 
| 
 | 
   640                     return self.extract_model(file_path, ext, "yml")
 | 
| 
 | 
   641 
 | 
| 
 | 
   642         except Exception as e: raise DataErr(file_path, e.__str__())
 | 
| 
 | 
   643         raise DataErr(file_path,
 | 
| 
 | 
   644             f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")
 | 
| 
 | 
   645     
 | 
| 
 | 
   646 
 | 
| 
 | 
   647     def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
 | 
| 
 | 
   648         """
 | 
| 
 | 
   649         Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).
 | 
| 
 | 
   650         
 | 
| 
 | 
   651         Args:
 | 
| 
 | 
   652             file_path: File path of the model
 | 
| 
 | 
   653             ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
 | 
| 
 | 
   654             
 | 
| 
 | 
   655         Returns:
 | 
| 
 | 
   656             cobra.Model: COBRApy model 
 | 
| 
 | 
   657             
 | 
| 
 | 
   658         Raises:
 | 
| 
 | 
   659             Exception: Extraction errors
 | 
| 
 | 
   660         """
 | 
| 
 | 
   661         ext_str = str(ext)
 | 
| 
 | 
   662 
 | 
| 
 | 
   663         try:
 | 
| 
 | 
   664             if '.zip' in ext_str:
 | 
| 
 | 
   665                 with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
 | 
| 
 | 
   666                     with zip_ref.open(zip_ref.namelist()[0]) as json_file:
 | 
| 
 | 
   667                         content = json_file.read().decode('utf-8')
 | 
| 
 | 
   668                         if model_encoding == "json":
 | 
| 
 | 
   669                             return cobra.io.load_json_model(StringIO(content))
 | 
| 
 | 
   670                         elif model_encoding == "mat":
 | 
| 
 | 
   671                             return cobra.io.load_matlab_model(StringIO(content))
 | 
| 
 | 
   672                         elif model_encoding == "yml":
 | 
| 
 | 
   673                             return cobra.io.load_yaml_model(StringIO(content))
 | 
| 
 | 
   674                         else:
 | 
| 
 | 
   675                             raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
 | 
| 
 | 
   676             elif '.gz' in ext_str:
 | 
| 
 | 
   677                 with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
 | 
| 
 | 
   678                     if model_encoding == "json":
 | 
| 
 | 
   679                         return cobra.io.load_json_model(gz_ref)
 | 
| 
 | 
   680                     elif model_encoding == "mat":
 | 
| 
 | 
   681                         return cobra.io.load_matlab_model(gz_ref)
 | 
| 
 | 
   682                     elif model_encoding == "yml":
 | 
| 
 | 
   683                         return cobra.io.load_yaml_model(gz_ref)
 | 
| 
 | 
   684                     else:
 | 
| 
 | 
   685                         raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
 | 
| 
 | 
   686             elif '.bz2' in ext_str:
 | 
| 
 | 
   687                 with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
 | 
| 
 | 
   688                     if model_encoding == "json":
 | 
| 
 | 
   689                         return cobra.io.load_json_model(bz2_ref)
 | 
| 
 | 
   690                     elif model_encoding == "mat":
 | 
| 
 | 
   691                         return cobra.io.load_matlab_model(bz2_ref)
 | 
| 
 | 
   692                     elif model_encoding == "yml":
 | 
| 
 | 
   693                         return cobra.io.load_yaml_model(bz2_ref)
 | 
| 
 | 
   694                     else:
 | 
| 
 | 
   695                         raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
 | 
| 
 | 
   696             else:
 | 
| 
 | 
   697                 raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
 | 
| 
 | 
   698             
 | 
| 
 | 
   699         except Exception as e:
 | 
| 
 | 
   700             raise Exception(f"Error during model extraction: {str(e)}")
 | 
| 
 | 
   701         
 | 
| 
 | 
   702 
 | 
| 
 | 
   703 
 | 
| 
394
 | 
   704     def __str__(self) -> str: return self.value
 | 
| 
 | 
   705 
 | 
| 
 | 
   706 
 | 
| 
 | 
   707 def convert_genes(model,annotation):
 | 
| 
 | 
   708     from cobra.manipulation import rename_genes
 | 
| 
 | 
   709     model2=model.copy()
 | 
| 
 | 
   710     try:
 | 
| 
 | 
   711         dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
 | 
| 
 | 
   712     except:
 | 
| 
 | 
   713         print("No annotation in gene dict!")
 | 
| 
 | 
   714         return -1
 | 
| 
 | 
   715     rename_genes(model2,dict_genes)
 | 
| 
 | 
   716 
 | 
| 
408
 | 
   717     return model2
 | 
| 
 | 
   718 
 | 
| 
 | 
   719 
 | 
| 
409
 | 
   720 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobra.Model:
 | 
| 
408
 | 
   721     """
 | 
| 
 | 
   722     Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
 | 
| 
 | 
   723     
 | 
| 
 | 
   724     Args:
 | 
| 
 | 
   725         csv_path: Path al file CSV (separato da tab)
 | 
| 
 | 
   726         model_id: ID del modello da creare
 | 
| 
 | 
   727         
 | 
| 
 | 
   728     Returns:
 | 
| 
 | 
   729         cobra.Model: Il modello COBRApy costruito
 | 
| 
 | 
   730     """
 | 
| 
 | 
   731     
 | 
| 
 | 
   732     # Leggi i dati dal CSV
 | 
| 
 | 
   733     df = pd.read_csv(csv_path, sep='\t')
 | 
| 
 | 
   734     
 | 
| 
 | 
   735     # Crea il modello vuoto
 | 
| 
409
 | 
   736     model = cobraModel(model_id)
 | 
| 
408
 | 
   737     
 | 
| 
 | 
   738     # Dict per tenere traccia di metaboliti e compartimenti
 | 
| 
 | 
   739     metabolites_dict = {}
 | 
| 
 | 
   740     compartments_dict = {}
 | 
| 
 | 
   741     
 | 
| 
 | 
   742     print(f"Costruendo modello da {len(df)} reazioni...")
 | 
| 
 | 
   743     
 | 
| 
 | 
   744     # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
 | 
| 
 | 
   745     for idx, row in df.iterrows():
 | 
| 
 | 
   746         reaction_formula = str(row['Reaction']).strip()
 | 
| 
 | 
   747         if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   748             continue
 | 
| 
 | 
   749             
 | 
| 
 | 
   750         # Estrai metaboliti dalla formula della reazione
 | 
| 
 | 
   751         metabolites = extract_metabolites_from_reaction(reaction_formula)
 | 
| 
 | 
   752         
 | 
| 
 | 
   753         for met_id in metabolites:
 | 
| 
 | 
   754             compartment = extract_compartment_from_metabolite(met_id)
 | 
| 
 | 
   755             
 | 
| 
 | 
   756             # Aggiungi compartimento se non esiste
 | 
| 
 | 
   757             if compartment not in compartments_dict:
 | 
| 
 | 
   758                 compartments_dict[compartment] = compartment
 | 
| 
 | 
   759             
 | 
| 
 | 
   760             # Aggiungi metabolita se non esiste
 | 
| 
 | 
   761             if met_id not in metabolites_dict:
 | 
| 
 | 
   762                 metabolites_dict[met_id] = Metabolite(
 | 
| 
 | 
   763                     id=met_id,
 | 
| 
 | 
   764                     compartment=compartment,
 | 
| 
 | 
   765                     name=met_id.replace(f"_{compartment}", "").replace("__", "_")
 | 
| 
 | 
   766                 )
 | 
| 
 | 
   767     
 | 
| 
 | 
   768     # Aggiungi compartimenti al modello
 | 
| 
 | 
   769     model.compartments = compartments_dict
 | 
| 
 | 
   770     
 | 
| 
 | 
   771     # Aggiungi metaboliti al modello  
 | 
| 
 | 
   772     model.add_metabolites(list(metabolites_dict.values()))
 | 
| 
 | 
   773     
 | 
| 
 | 
   774     print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
 | 
| 
 | 
   775     
 | 
| 
 | 
   776     # Seconda passata: aggiungi le reazioni
 | 
| 
 | 
   777     reactions_added = 0
 | 
| 
 | 
   778     reactions_skipped = 0
 | 
| 
 | 
   779     
 | 
| 
 | 
   780     for idx, row in df.iterrows():
 | 
| 
 | 
   781         try:
 | 
| 
 | 
   782             reaction_id = str(row['ReactionID']).strip()
 | 
| 
 | 
   783             reaction_formula = str(row['Reaction']).strip()
 | 
| 
 | 
   784             
 | 
| 
 | 
   785             # Salta reazioni senza formula
 | 
| 
 | 
   786             if not reaction_formula or reaction_formula == 'nan':
 | 
| 
 | 
   787                 reactions_skipped += 1
 | 
| 
 | 
   788                 continue
 | 
| 
 | 
   789             
 | 
| 
 | 
   790             # Crea la reazione
 | 
| 
 | 
   791             reaction = Reaction(reaction_id)
 | 
| 
 | 
   792             reaction.name = reaction_id
 | 
| 
 | 
   793             
 | 
| 
 | 
   794             # Imposta bounds
 | 
| 
 | 
   795             reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
 | 
| 
 | 
   796             reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
 | 
| 
 | 
   797             
 | 
| 
 | 
   798             # Aggiungi gene rule se presente
 | 
| 
 | 
   799             if pd.notna(row['Rule']) and str(row['Rule']).strip():
 | 
| 
 | 
   800                 reaction.gene_reaction_rule = str(row['Rule']).strip()
 | 
| 
 | 
   801             
 | 
| 
 | 
   802             # Parse della formula della reazione
 | 
| 
 | 
   803             try:
 | 
| 
 | 
   804                 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
 | 
| 
 | 
   805             except Exception as e:
 | 
| 
 | 
   806                 print(f"Errore nel parsing della reazione {reaction_id}: {e}")
 | 
| 
 | 
   807                 reactions_skipped += 1
 | 
| 
 | 
   808                 continue
 | 
| 
 | 
   809             
 | 
| 
 | 
   810             # Aggiungi la reazione al modello
 | 
| 
 | 
   811             model.add_reactions([reaction])
 | 
| 
 | 
   812             reactions_added += 1
 | 
| 
 | 
   813             
 | 
| 
 | 
   814         except Exception as e:
 | 
| 
 | 
   815             print(f"Errore nell'aggiungere la reazione {reaction_id}: {e}")
 | 
| 
 | 
   816             reactions_skipped += 1
 | 
| 
 | 
   817             continue
 | 
| 
 | 
   818     
 | 
| 
 | 
   819     print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
 | 
| 
 | 
   820     
 | 
| 
 | 
   821     # Imposta l'obiettivo di biomassa
 | 
| 
 | 
   822     set_biomass_objective(model)
 | 
| 
 | 
   823     
 | 
| 
 | 
   824     # Imposta il medium
 | 
| 
 | 
   825     set_medium_from_data(model, df)
 | 
| 
 | 
   826     
 | 
| 
 | 
   827     print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
 | 
| 
 | 
   828     
 | 
| 
 | 
   829     return model
 | 
| 
 | 
   830 
 | 
| 
 | 
   831 
 | 
| 
 | 
   832 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
 | 
| 
 | 
   833 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   834     """
 | 
| 
 | 
   835     Estrae gli ID dei metaboliti da una formula di reazione.
 | 
| 
 | 
   836     Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
 | 
| 
 | 
   837     e permette che comincino con cifre o underscore.
 | 
| 
 | 
   838     """
 | 
| 
 | 
   839     metabolites = set()
 | 
| 
 | 
   840     # coefficiente opzionale seguito da un token che termina con _<letters>
 | 
| 
 | 
   841     pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
 | 
| 
 | 
   842     matches = re.findall(pattern, reaction_formula)
 | 
| 
 | 
   843     metabolites.update(matches)
 | 
| 
 | 
   844     return metabolites
 | 
| 
 | 
   845 
 | 
| 
 | 
   846 
 | 
| 
 | 
   847 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
 | 
| 
 | 
   848     """
 | 
| 
 | 
   849     Estrae il compartimento dall'ID del metabolita.
 | 
| 
 | 
   850     """
 | 
| 
 | 
   851     # Il compartimento è solitamente l'ultima lettera dopo l'underscore
 | 
| 
 | 
   852     if '_' in metabolite_id:
 | 
| 
 | 
   853         return metabolite_id.split('_')[-1]
 | 
| 
 | 
   854     return 'c'  # default cytoplasm
 | 
| 
 | 
   855 
 | 
| 
 | 
   856 
 | 
| 
 | 
   857 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
 | 
| 
 | 
   858     """
 | 
| 
 | 
   859     Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
 | 
| 
 | 
   860     """
 | 
| 
 | 
   861 
 | 
| 
 | 
   862     if reaction.id == 'EX_thbpt_e':
 | 
| 
 | 
   863         print(reaction.id)
 | 
| 
 | 
   864         print(formula)
 | 
| 
 | 
   865     # Dividi in parte sinistra e destra
 | 
| 
 | 
   866     if '<=>' in formula:
 | 
| 
 | 
   867         left, right = formula.split('<=>')
 | 
| 
 | 
   868         reversible = True
 | 
| 
 | 
   869     elif '<--' in formula:
 | 
| 
 | 
   870         left, right = formula.split('<--')
 | 
| 
 | 
   871         reversible = False
 | 
| 
 | 
   872         left, right = left, right
 | 
| 
 | 
   873     elif '-->' in formula:
 | 
| 
 | 
   874         left, right = formula.split('-->')
 | 
| 
 | 
   875         reversible = False
 | 
| 
 | 
   876     elif '<-' in formula:
 | 
| 
 | 
   877         left, right = formula.split('<-')
 | 
| 
 | 
   878         reversible = False
 | 
| 
 | 
   879         left, right = left, right
 | 
| 
 | 
   880     else:
 | 
| 
 | 
   881         raise ValueError(f"Formato reazione non riconosciuto: {formula}")
 | 
| 
 | 
   882     
 | 
| 
 | 
   883     # Parse dei metaboliti e coefficienti
 | 
| 
 | 
   884     reactants = parse_metabolites_side(left.strip())
 | 
| 
 | 
   885     products = parse_metabolites_side(right.strip())
 | 
| 
 | 
   886     
 | 
| 
 | 
   887     # Aggiungi metaboliti alla reazione
 | 
| 
 | 
   888     metabolites_to_add = {}
 | 
| 
 | 
   889     
 | 
| 
 | 
   890     # Reagenti (coefficienti negativi)
 | 
| 
 | 
   891     for met_id, coeff in reactants.items():
 | 
| 
 | 
   892         if met_id in metabolites_dict:
 | 
| 
 | 
   893             metabolites_to_add[metabolites_dict[met_id]] = -coeff
 | 
| 
 | 
   894     
 | 
| 
 | 
   895     # Prodotti (coefficienti positivi)
 | 
| 
 | 
   896     for met_id, coeff in products.items():
 | 
| 
 | 
   897         if met_id in metabolites_dict:
 | 
| 
 | 
   898             metabolites_to_add[metabolites_dict[met_id]] = coeff
 | 
| 
 | 
   899     
 | 
| 
 | 
   900     reaction.add_metabolites(metabolites_to_add)
 | 
| 
 | 
   901 
 | 
| 
 | 
   902 
 | 
| 
 | 
   903 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
 | 
| 
 | 
   904     """
 | 
| 
 | 
   905     Parsa un lato della reazione per estrarre metaboliti e coefficienti.
 | 
| 
 | 
   906     """
 | 
| 
 | 
   907     metabolites = {}
 | 
| 
 | 
   908     if not side_str or side_str.strip() == '':
 | 
| 
 | 
   909         return metabolites
 | 
| 
 | 
   910 
 | 
| 
 | 
   911     terms = side_str.split('+')
 | 
| 
 | 
   912     for term in terms:
 | 
| 
 | 
   913         term = term.strip()
 | 
| 
 | 
   914         if not term:
 | 
| 
 | 
   915             continue
 | 
| 
 | 
   916 
 | 
| 
 | 
   917         # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
 | 
| 
 | 
   918         match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
 | 
| 
 | 
   919         if match:
 | 
| 
 | 
   920             coeff_str, met_id = match.groups()
 | 
| 
 | 
   921             coeff = float(coeff_str) if coeff_str else 1.0
 | 
| 
 | 
   922             metabolites[met_id] = coeff
 | 
| 
 | 
   923 
 | 
| 
 | 
   924     return metabolites
 | 
| 
 | 
   925 
 | 
| 
 | 
   926 
 | 
| 
 | 
   927 
 | 
| 
 | 
   928 def set_biomass_objective(model: Model):
 | 
| 
 | 
   929     """
 | 
| 
 | 
   930     Imposta la reazione di biomassa come obiettivo.
 | 
| 
 | 
   931     """
 | 
| 
 | 
   932     biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
 | 
| 
 | 
   933     
 | 
| 
 | 
   934     if biomass_reactions:
 | 
| 
 | 
   935         model.objective = biomass_reactions[0].id
 | 
| 
 | 
   936         print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
 | 
| 
 | 
   937     else:
 | 
| 
 | 
   938         print("Nessuna reazione di biomassa trovata")
 | 
| 
 | 
   939 
 | 
| 
 | 
   940 
 | 
| 
 | 
   941 def set_medium_from_data(model: Model, df: pd.DataFrame):
 | 
| 
 | 
   942     """
 | 
| 
 | 
   943     Imposta il medium basato sulla colonna InMedium.
 | 
| 
 | 
   944     """
 | 
| 
 | 
   945     medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
 | 
| 
 | 
   946     
 | 
| 
 | 
   947     medium_dict = {}
 | 
| 
 | 
   948     for rxn_id in medium_reactions:
 | 
| 
 | 
   949         if rxn_id in [r.id for r in model.reactions]:
 | 
| 
 | 
   950             reaction = model.reactions.get_by_id(rxn_id)
 | 
| 
 | 
   951             if reaction.lower_bound < 0:  # Solo reazioni di uptake
 | 
| 
 | 
   952                 medium_dict[rxn_id] = abs(reaction.lower_bound)
 | 
| 
 | 
   953     
 | 
| 
 | 
   954     if medium_dict:
 | 
| 
 | 
   955         model.medium = medium_dict
 | 
| 
 | 
   956         print(f"Medium impostato con {len(medium_dict)} componenti")
 | 
| 
 | 
   957 
 | 
| 
 | 
   958 
 | 
| 
 | 
   959 def validate_model(model: Model) -> Dict[str, any]:
 | 
| 
 | 
   960     """
 | 
| 
 | 
   961     Valida il modello e fornisce statistiche di base.
 | 
| 
 | 
   962     """
 | 
| 
 | 
   963     validation = {
 | 
| 
 | 
   964         'num_reactions': len(model.reactions),
 | 
| 
 | 
   965         'num_metabolites': len(model.metabolites),
 | 
| 
 | 
   966         'num_genes': len(model.genes),
 | 
| 
 | 
   967         'num_compartments': len(model.compartments),
 | 
| 
 | 
   968         'objective': str(model.objective),
 | 
| 
 | 
   969         'medium_size': len(model.medium),
 | 
| 
 | 
   970         'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
 | 
| 
 | 
   971         'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
 | 
| 
 | 
   972     }
 | 
| 
 | 
   973     
 | 
| 
 | 
   974     try:
 | 
| 
 | 
   975         # Test di crescita
 | 
| 
 | 
   976         solution = model.optimize()
 | 
| 
 | 
   977         validation['growth_rate'] = solution.objective_value
 | 
| 
 | 
   978         validation['status'] = solution.status
 | 
| 
 | 
   979     except Exception as e:
 | 
| 
 | 
   980         validation['growth_rate'] = None
 | 
| 
 | 
   981         validation['status'] = f"Error: {e}"
 | 
| 
 | 
   982     
 | 
| 
 | 
   983     return validation
 |