changeset 289:f7812d713af5 draft default tip

Uploaded
author luca_milaz
date Tue, 09 Jul 2024 22:45:02 +0000
parents 38a41d36bbc9
children
files utils/utils/general_utils.py utils/utils/reaction_parsing.py utils/utils/rule_parsing.py
diffstat 3 files changed, 924 insertions(+), 0 deletions(-)
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/utils/utils/general_utils.py	Tue Jul 09 22:45:02 2024 +0000
@@ -0,0 +1,551 @@
+import math
+import re
+import sys
+import csv
+import pickle
+import lxml.etree as ET
+
+from enum import Enum
+from itertools import count
+from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union
+
+import pandas as pd
+
+# FILES
+class FileFormat(Enum):
+    """
+    Encodes possible file extensions to conditionally save data in a different format.
+    """
+    DAT    = ("dat",) # this is how galaxy treats all your files!
+    CSV    = ("csv",) # this is how most editable input data is written
+    TSV    = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
+    
+    SVG    = ("svg",) # this is how most metabolic maps are written
+    PNG    = ("png",) # this is a common output format for images (such as metabolic maps)
+    PDF    = ("pdf",) # this is also a common output format for images, as it's required in publications.
+
+    XML    = ("xml",) # this is one main way cobra models appear in
+    JSON   = ("json",) # this is the other
+    
+    PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
+    #TODO: we're in a pickle (ba dum tss), there's no point in supporting many extensions internally. The
+    # issue will never be solved for user-uploaded files and those are saved as .dat by galaxy anyway so it
+    # doesn't matter as long as we CAN recognize these 3 names as valid pickle extensions. We must however
+    # agree on an internal standard and use only that one, otherwise constructing usable paths becomes a nightmare.
+    @classmethod
+    def fromExt(cls, ext :str) -> "FileFormat":
+        """
+        Converts a file extension string to a FileFormat instance.
+
+        Args:
+            ext : The file extension as a string.
+
+        Returns:
+            FileFormat: The FileFormat instance corresponding to the file extension.
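+
+        Example (illustrative):
+            >>> FileFormat.fromExt("tsv") is FileFormat.TSV
+            True
+            >>> FileFormat.fromExt("pk") is FileFormat.PICKLE
+            True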
+        """
+        variantName = ext.upper()
+        if variantName in FileFormat.__members__: return FileFormat[variantName]
+        
+        variantName = variantName.lower()
+        for member in cls:
+            if variantName in member.value: return member
+        
+        raise ValueErr("ext", "a valid FileFormat file extension", ext)
+
+    def __str__(self) -> str:
+        """
+        (Private) converts to str representation. Good practice for usage with argparse.
+
+        Returns:
+            str : the string representation of the file extension.
+        """
+        return self.value[-1] #TODO: fix, it's the dumb pickle thing
+
+class FilePath():
+    """
+    Represents a file path. View this as an attempt to standardize file-related operations by expecting
+    values of this type in any process requesting a file path.
+    """
+    def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None:
+        """
+        (Private) Initializes an instance of FilePath.
+
+        Args:
+            filePath : the end of the path, containing the file name (without extension).
+            ext : the file's extension.
+            prefix : anything before filePath; a trailing '/' is added if missing.
+        
+        Returns:
+            None : practically, a FilePath instance.
+        """
+        self.ext      = ext
+        self.filePath = filePath
+
+        if prefix and prefix[-1] != '/': prefix += '/'
+        self.prefix = prefix
+    
+    @classmethod
+    def fromStrPath(cls, path :str) -> "FilePath":
+        """
+        Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
+
+        Args:
+            path : the string containing the path
+        
+        Raises:
+            PathErr : if the provided string doesn't represent a valid path.
+        
+        Returns:
+            FilePath : the constructed instance.
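+
+        Example (illustrative):
+            >>> FilePath.fromStrPath("some/folder/data.csv").show()
+            'some/folder/data.csv'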
+        """
+        # This method is often used to construct FilePath instances from ARGS UI arguments. These arguments *should*
+        # always be correct paths and could be used as raw strings, however most if not all functions that work with
+        # file paths request the FilePath objects specifically, which is a very good thing in any case other than this.
+        # What ends up happening is we spend time parsing a string into a FilePath so that the function accepts it, only
+        # to call show() immediately to bring back the string and open the file it points to.
+        # TODO: this is an indication that the arguments SHOULD BE OF TYPE FilePath if they are filepaths, this ENSURES
+        # their correctness when modifying the UI and avoids the pointless back-and-forth.
+        result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
+        if not result or not result["name"] or not result["ext"]:
+            raise PathErr(path, "cannot recognize folder structure or extension in path")
+
+        prefix = result["prefix"] if result["prefix"] else ""
+        return cls(result["name"], FileFormat.fromExt(result["ext"]), prefix = prefix)
+
+    def show(self) -> str:
+        """
+        Shows the path as a string.
+
+        Returns:
+            str : the path shown as a string.
+        """
+        return f"{self.prefix}{self.filePath}.{self.ext}"
+    
+    def __str__(self) -> str: return self.show()
+
+# ERRORS
+def terminate(msg :str) -> None:
+    """
+    Terminate the execution of the script with an error message.
+    
+    Args:
+        msg (str): The error message to be displayed.
+    
+    Returns:
+        None
+    """
+    sys.exit(f"Execution aborted: {msg}\n")
+
+def logWarning(msg :str, loggerPath :str) -> None:
+    """
+    Log a warning message by appending it to an output log file. The final period and a
+    newline are added by the function.
+
+    Args:
+        msg (str): The warning message to be logged.
+        loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
+        immediately read back (beware: relatively expensive operation, log with caution).
+
+    Returns:
+        None
+    """
+    # building the path and then reading it immediately seems useless, but it's actually a way of
+    # validating that reduces repetition on the caller's side. Besides, logging a message by writing
+    # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
+    # mindlessly logging whenever something comes up, log at the very end and tell the user everything
+    # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
+    # the file only at the end of the program's execution.
+    with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
+
+class CustomErr(Exception):
+    """
+    Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
+    """
+    __idGenerator = count()
+    errName = "Custom Error"
+    def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
+        """
+        (Private) Initializes an instance of CustomErr.
+
+        Args:
+            msg (str): Error message to be displayed.
+            details (str): Informs the user more about the error encountered. Defaults to "".
+            explicitErrCode (int): Explicit error code to be used. Defaults to -1.
+        
+        Returns:
+            None : practically, a CustomErr instance.
+        """
+        self.msg     = msg
+        self.details = details
+
+        self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
+
+    def throw(self, loggerPath = "") -> None:
+        """
+        Raises the current CustomErr instance, logging a warning message before doing so.
+
+        Raises:
+            self: The current CustomErr instance.
+        
+        Returns:
+            None
+        """
+        if loggerPath: logWarning(str(self), loggerPath)
+        raise self
+
+    def abort(self) -> None:
+        """
+        Aborts the execution of the script.
+        
+        Returns:
+            None
+        """
+        terminate(str(self))
+
+    def __str__(self) -> str:
+        """
+        (Private) Returns a string representing the current CustomErr instance.
+
+        Returns:
+            str: A string representing the current CustomErr instance.
+        """
+        return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
+
+class ArgsErr(CustomErr):
+    """
+    CustomErr subclass for UI arguments errors.
+    """
+    errName = "Args Error"
+    def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
+        super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
+
+class DataErr(CustomErr):
+    """
+    CustomErr subclass for data formatting errors.
+    """
+    errName = "Data Format Error"
+    def __init__(self, fileName :str, msg = "no further details provided") -> None:
+        super().__init__(f"file \"{fileName}\" contains malformed data", msg)
+
+class PathErr(CustomErr):
+    """
+    CustomErr subclass for filepath formatting errors.
+    """
+    errName = "Path Error"
+    def __init__(self, path :Union[str, FilePath], msg = "no further details provided") -> None:
+        super().__init__(f"path \"{path}\" is invalid", msg)
+
+class ValueErr(CustomErr):
+    """
+    CustomErr subclass for any value error.
+    """
+    errName = "Value Error"
+    def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
+        super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
+
+# RESULT
+T = TypeVar('T')
+E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
+class Result(Generic[T, E]):
+    """
+    Class to handle the result of an operation, with a value and a boolean flag to indicate
+    whether the operation was successful or not.
+    """
+    class ResultErr(CustomErr):
+        """
+        CustomErr subclass for all Result errors.
+        """
+        errName = "Result Error"
+        def __init__(self, msg = "no further details provided") -> None:
+            super().__init__(msg)
+    def __init__(self, value :Union[T, E], isOk :bool) -> None:
+        """
+        (Private) Initializes an instance of Result.
+
+        Args:
+            value (Union[T, E]): The value to be stored in the Result instance.
+            isOk (bool): A boolean flag to indicate whether the operation was successful or not.
+        
+        Returns:
+            None : practically, a Result instance.
+        """
+        self.isOk  = isOk
+        self.isErr = not isOk
+        self.value = value
+
+    @classmethod
+    def Ok(cls,  value :T) -> "Result":
+        """
+        Constructs a new Result instance with a successful operation.
+
+        Args:
+            value (T): The value to be stored in the Result instance, set as successful.
+
+        Returns:
+            Result: A new Result instance with a successful operation.
+        """
+        return Result(value, isOk = True)
+    
+    @classmethod
+    def Err(cls, value :E) -> "Result": 
+        """
+        Constructs a new Result instance with a failed operation.
+
+        Args:
+            value (E): The value to be stored in the Result instance, set as failed.
+
+        Returns:
+            Result: A new Result instance with a failed operation.
+        """
+        return Result(value, isOk = False)
+
+    def unwrap(self) -> T:
+        """
+        Unwraps the value of the Result instance, if the operation was successful.
+
+        Raises:
+            ResultErr: If the operation was not successful.
+
+        Returns:
+            T: The value of the Result instance, if the operation was successful.
+        """
+        if self.isOk: return self.value
+        raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
+
+    def unwrapOr(self, default :T) -> T:
+        """
+        Unwraps the value of the Result instance, if the operation was successful, otherwise
+        it returns a default value.
+
+        Args:
+            default (T): The default value to be returned if the operation was not successful.
+
+        Returns:
+            T: The value of the Result instance, if the operation was successful,
+            otherwise the default value.
+        """
+        return self.value if self.isOk else default
+    
+    def expect(self, err :"Result.ResultErr") -> T:
+        """
+        Expects that the value of the Result instance is successful, otherwise it raises an error.
+
+        Args:
+            err (Result.ResultErr): The error to be raised if the operation was not successful.
+
+        Raises:
+            err: The error raised if the operation was not successful.
+
+        Returns:
+            T: The value of the Result instance, if the operation was successful.
+        """
+        if self.isOk: return self.value
+        raise err
+
+    U = TypeVar("U")
+    def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
+        """
+        Maps the value of the current Result to whatever is returned by the mapper function.
+        If the Result contained an unsuccessful operation to begin with it remains unchanged
+        (a reference to the current instance is returned).
+        If the mapper function panics the returned result instance will be of the error kind.
+
+        Args:
+            mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
+
+        Returns:
+            Result[U, E]: The result of the mapper operation applied to the Result value.
+        """
+        if self.isErr: return self
+        try: return Result.Ok(mapper(self.value))
+        except Exception as e: return Result.Err(e)
+    
+    D = TypeVar("D", bound = "Result.ResultErr")
+    def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
+        """
+        Maps the error of the current Result to whatever is returned by the mapper function.
+        If the Result contained a successful operation it remains unchanged
+        (a reference to the current instance is returned).
+        If the mapper function panics this method does as well.
+
+        Args:
+            mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
+
+        Returns:
+            Result[T, D]: The result of the mapper operation applied to the Result error.
+        """
+        if self.isOk: return self
+        return Result.Err(mapper(self.value))
+
+    def __str__(self):
+        return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
+
+# FILES
+def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
+    """
+    Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
+
+    Args:
+        path : the path to the dataset file.
+        datasetName : the name of the dataset.
+
+    Raises:
+        DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
+        it has less than 2 columns.
+    
+    Returns:
+        pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
+    """
+    # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
+    # removing / replacing it, so as to introduce as few bugs as possible in the tools still relying on this code.
+    # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
+    # hard to implement anything better. Also, this function's name advertises it as a dataset-specific operation, yet
+    # it is really a file-opening function that takes on dubious extra responsibility (checking the column count). My suggestion is
+    # TODO: stop using dataframes at all and find a way to have tight control over file extensions.
+    try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
+    except Exception:
+        try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
+        except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
+    
+    if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
+    return dataset
+
+def readPickle(path :FilePath) -> Any:
+    """
+    Reads the contents of a .pickle file, which needs to exist at the given path.
+
+    Args:
+        path : the path to the .pickle file.
+    
+    Returns:
+        Any : the data inside a pickle file, could be anything.
+    """
+    with open(path.show(), "rb") as fd: return pickle.load(fd)
+
+def writePickle(path :FilePath, data :Any) -> None:
+    """
+    Saves any data in a .pickle file, created at the given path.
+
+    Args:
+        path : the path to the .pickle file.
+        data : the data to be written to the file.
+    
+    Returns:
+        None
+    """
+    with open(path.show(), "wb") as fd: pickle.dump(data, fd)
+
+def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
+    """
+    Reads the contents of a .csv file, which needs to exist at the given path.
+
+    Args:
+        path : the path to the .csv file.
+        delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
+        skipHeader : whether the first row of the file is a header and should be skipped.
+    
+    Returns:
+        List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
+    """
+    with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
+
+def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
+    """
+    Reads the contents of a .svg file, which needs to exist at the given path.
+
+    Args:
+        path : the path to the .svg file.
+        customErr : optional error to raise in place of the parser's own if the file is malformed.
+    
+    Raises:
+        customErr if provided, otherwise the underlying lxml parsing error : if the map is malformed.
+    
+    Returns:
+        ET.ElementTree : the parsed svg map.
+    """
+    try: return ET.parse(path.show())
+    except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
+        raise customErr if customErr else err
+
+def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
+    """
+    Saves svg data opened with lxml.etree in a .svg file, created at the given path.
+
+    Args:
+        path : the path to the .svg file.
+        data : the data to be written to the file.
+    
+    Returns:
+        None
+    """
+    with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
+
+# UI ARGUMENTS
+class Bool:
+    def __init__(self, argName :str) -> None:
+        self.argName = argName
+
+    def __call__(self, s :str) -> bool: return self.check(s)
+
+    def check(self, s :str) -> bool:
+        s = s.lower()
+        if s == "true" : return True
+        if s == "false": return False
+        raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
+
+class Float:
+    def __init__(self, argName = "Dataset values, not an argument") -> None:
+        self.argName = argName
+    
+    def __call__(self, s :str) -> float: return self.check(s)
+
+    def check(self, s :str) -> float:
+        try: return float(s)
+        except ValueError:
+            s = s.lower()
+            if s == "nan" or s == "none": return math.nan
+            raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
+
+# MODELS
+OldRule = List[Union[str, "OldRule"]]
+class Model(Enum):
+    """
+    Represents a metabolic model, either custom or locally supported. Custom models don't point
+    to valid file paths.
+    """
+
+    Recon   = "Recon"
+    ENGRO2  = "ENGRO2"
+    HMRcore = "HMRcore"
+    Custom  = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
+
+    def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
+        if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
+
+    def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
+        """
+        Open "rules" file for this model.
+
+        Returns:
+            Dict[str, Dict[str, OldRule]] : the rules for this model.
+        """
+        path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
+        self.__raiseMissingPathErr(path)
+        return readPickle(path)
+    
+    def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
+        """
+        Open "gene translator (old: gene_in_rule)" file for this model.
+
+        Returns:
+            Dict[str, Dict[str, str]] : the translator dict for this model.
+        """
+        path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
+        self.__raiseMissingPathErr(path)
+        return readPickle(path)
+    
+    def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
+        path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
+        self.__raiseMissingPathErr(path)
+        return readSvg(path, customErr = DataErr(path, "custom map in wrong format"))
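+
+    # Illustrative usage (tool directory is hypothetical):
+    #   Model.ENGRO2.getRules("/opt/COBRAxy") reads the pickled rules under "/opt/COBRAxy/local/pickle files/"
+    #   Model.Custom.getRules("/opt/COBRAxy", FilePath.fromStrPath("my_rules.p")) reads the given custom file instead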
+
+    def __str__(self) -> str: return self.value
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/utils/utils/reaction_parsing.py	Tue Jul 09 22:45:02 2024 +0000
@@ -0,0 +1,130 @@
+from enum import Enum
+import utils.general_utils as utils
+from typing import Dict
+import csv
+import re
+
+# Reaction direction encoding:
+class ReactionDir(Enum):
+  """
+  A reaction can go forwards, backwards or be reversible (able to proceed in both directions).
+  Models created / managed with cobrapy encode this information within the reaction's
+  formula using the arrows this enum keeps as values.
+  """
+  FORWARD    = "-->"
+  BACKWARD   = "<--"
+  REVERSIBLE = "<=>"
+
+  @classmethod
+  def fromReaction(cls, reaction :str) -> 'ReactionDir':
+    """
+    Takes a whole reaction formula string and looks for one of the arrows, returning the
+    corresponding reaction direction.
+
+    Args:
+      reaction : the reaction's formula.
+    
+    Raises:
+      ValueError : if no valid arrow is found.
+    
+    Returns:
+      ReactionDir : the corresponding reaction direction.
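+
+    Example (illustrative, metabolite ids are made up):
+      >>> ReactionDir.fromReaction("glc__D_e --> glc__D_c") is ReactionDir.FORWARD
+      True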
+    """
+    for member in cls:
+      if member.value in reaction: return member
+
+    raise ValueError("No valid arrow found within reaction string.")
+
+ReactionsDict = Dict[str, Dict[str, float]]
+
+
+def add_custom_reaction(reactionsDict :ReactionsDict, rId :str, reaction :str) -> None:
+  """
+  Adds an entry to the given reactionsDict. Each entry consists of a given unique reaction id
+  (key) and a :dict (value) matching each substrate in the reaction to its stoichiometric coefficient.
+  Keys and values are both obtained from the reaction's formula: if a substrate (custom metabolite id)
+  appears without an explicit coeff, the value 1.0 will be used instead.
+
+  Args:
+    reactionsDict : dictionary encoding custom reactions information.
+    rId : unique reaction id.
+    reaction : the reaction's formula.
+  
+  Returns:
+    None
+
+  Side effects:
+    reactionsDict : mut
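+
+  Example (illustrative, metabolite ids are made up):
+    >>> d = {}
+    >>> add_custom_reaction(d, "R1", "2.0 pyr_c + coa_c")
+    >>> d["R1"]
+    {'pyr_c': 2.0, 'coa_c': 1.0}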
+  """
+  reaction = reaction.strip()
+  if not reaction: return
+
+  reactionsDict[rId] = {}
+  # We assume the '+' separating consecutive metabs in a reaction is spaced from them,
+  # to avoid confusing it for electrical charge:
+  for word in reaction.split(" + "):
+    metabId, stoichCoeff = word, 1.0
+    # Implicit stoichiometric coeff is equal to 1, some coeffs are floats.
+
+    # Accepted coeffs can be integer or floats with a dot (.) decimal separator
+    # and must be separated from the metab with a space:
+    foundCoeff = re.search(r"^\d+(\.\d+)? ", word)
+    if foundCoeff:
+      wholeMatch  = foundCoeff.group(0)
+      metabId     = word[len(wholeMatch):].strip()
+      stoichCoeff = float(wholeMatch.strip())
+
+    reactionsDict[rId][metabId] = stoichCoeff
+
+  if not reactionsDict[rId]: del reactionsDict[rId] # Empty reactions are removed.
+
+
+def create_reaction_dict(unparsed_reactions: Dict[str, str]) -> ReactionsDict:
+    """
+    Parses the given dictionary into the correct format.
+
+    Args:
+        unparsed_reactions (Dict[str, str]): A dictionary where keys are reaction IDs and values are unparsed reaction strings.
+
+    Returns:
+        ReactionsDict: The correctly parsed dict.
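+
+    Example (illustrative, metabolite ids are made up):
+        >>> create_reaction_dict({"R1": "a_c <=> b_c"})
+        {'R1_F': {'a_c': 1.0}, 'R1_B': {'b_c': 1.0}}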
+    """
+    reactionsDict :ReactionsDict = {}
+    for rId, reaction in unparsed_reactions.items():
+        reactionDir = ReactionDir.fromReaction(reaction)
+        left, right = reaction.split(f" {reactionDir.value} ")
+
+        # Reversible reactions are split into distinct reactions, one for each direction.
+        # In general we only care about substrates, the product information is lost.
+        reactionIsReversible = reactionDir is ReactionDir.REVERSIBLE
+        if reactionDir is not ReactionDir.BACKWARD:
+            add_custom_reaction(reactionsDict, rId + "_F" * reactionIsReversible, left)
+        
+        if reactionDir is not ReactionDir.FORWARD:
+            add_custom_reaction(reactionsDict, rId + "_B" * reactionIsReversible, right)
+        
+        # ^^^ to further clarify: if a reaction is NOT reversible it will not be marked as _F or _B
+        # and whichever direction we DO keep (forward if --> and backward if <--) loses this information.
+        # This IS a small problem when coloring the map in marea.py because the arrow IDs in the map follow
+        # through with a similar convention on ALL reactions and correctly encode direction based on their
+        # model of origin. TODO: a proposed solution is to unify the standard in RPS to fully mimic the maps,
+        # which involves re-writing the "reactions" dictionary.
+    
+    return reactionsDict
+
+
+def parse_custom_reactions(customReactionsPath :str) -> ReactionsDict:
+  """
+  Creates a custom dictionary encoding reactions information from a csv file containing
+  data about these reactions, the path of which is given as input.
+
+  Args:
+    customReactionsPath : path to the reactions information file.
+  
+  Returns:
+    ReactionsDict : dictionary encoding custom reactions information.
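+
+  The file is expected to have a header row (skipped by readCsv) and at least two columns:
+  reaction id first, reaction formula second (e.g., illustrative values: "R1,a_c <=> b_c").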
+  """
+  reactionsData :Dict[str, str] = {row[0]: row[1] for row in utils.readCsv(utils.FilePath.fromStrPath(customReactionsPath))} 
+  
+  return create_reaction_dict(reactionsData)
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/utils/utils/rule_parsing.py	Tue Jul 09 22:45:02 2024 +0000
@@ -0,0 +1,243 @@
+from enum import Enum
+import utils.general_utils as utils
+from typing import List, Union, Optional
+
+class RuleErr(utils.CustomErr):
+    """
+    CustomErr subclass for rule syntax errors.
+    """
+    errName = "Rule Syntax Error"
+    def __init__(self, rule :str, msg = "no further details provided") -> None:
+        super().__init__(
+            f"rule \"{rule}\" is malformed, {msg}",
+            "please verify your input follows the validity guidelines")
+
+class RuleOp(Enum):
+    """
+    Encodes all operators valid in gene rules.
+    """
+    OR  = "or"
+    AND = "and"
+
+    @classmethod
+    def isOperator(cls, op :str) -> bool:
+        return op.upper() in cls.__members__
+
+    def __str__(self) -> str: return self.value
+
+class OpList(List[Union[str, "OpList"]]):
+    """
+    Represents a parsed rule and each of its nesting levels, including the operator that level uses.
+    """
+    def __init__(self, op :Optional[RuleOp] = None) -> None:
+        """
+        (Private) Initializes an instance of OpList.
+
+        Args:
+            op (str): Operator to be assigned to the OpList. Defaults to "".
+        
+        Returns:
+            None : practically, an OpList instance.
+        """
+        self.op = op
+
+    def setOpIfMissing(self, op :RuleOp) -> None:
+        """
+        Sets the operator of the OpList if it's missing.
+
+        Args:
+            op (RuleOp): Operator to be assigned to the OpList.
+        
+        Returns:
+            None
+        """
+        if not self.op: self.op = op
+
+    def __repr__(self, indent = "") -> str:
+        """
+        (Private) Returns a string representation of the current OpList instance.
+
+        Args:
+            indent (str): Indentation level. Defaults to "".
+
+        Returns:
+            str: A string representation of the current OpList instance.
+        """
+        nextIndent = indent + "  "
+        return f"<{self.op}>[\n" + ",\n".join([
+            f"{nextIndent}{item.__repr__(nextIndent) if isinstance(item, OpList) else item}"
+            for item in self ]) + f"\n{indent}]"
+
+class RuleStack:
+    """
+    FILO stack structure to save the intermediate representation of a Rule during parsing, with the
+    current nesting level at the top of the stack.
+    """
+    def __init__(self) -> None:
+        """
+        (Private) initializes an instance of RuleStack.
+
+        Returns:
+            None : practically, a RuleStack instance.
+        """
+        self.__stack = [OpList()] # the stack starts out with the result list already allocated
+        self.__updateCurrent()
+
+    def pop(self) -> None:
+        """
+        Removes the OpList on top of the stack, also flattening it once when possible.
+
+        Side Effects:
+            self : mut
+
+        Returns:
+            None
+        """
+        oldTop = self.__stack.pop()
+        if len(oldTop) == 1 and isinstance(oldTop[0], OpList): self.__stack[-1][-1] = oldTop[0]
+        self.__updateCurrent()
+
+    def push(self, operator = "") -> None:
+        """
+        Adds a new nesting level, in the form of a new OpList on top of the stack.
+
+        Args:
+            operator : the operator assigned to the new OpList.
+
+        Side Effects:
+            self : mut
+        
+        Returns:
+            None
+        """
+        newLevel = OpList(operator)
+        self.current.append(newLevel)
+        self.__stack.append(newLevel)
+        self.__updateCurrent()
+
+    def popForward(self) -> None:
+        """
+        Moves the last "actual" item from the 2nd to last list to the beginning of the top list, as per
+        the example below:
+        stack  : [list_a, list_b]
+        list_a : [item1, item2, list_b] --> [item1, list_b]
+        list_b : [item3, item4]         --> [item2, item3, item4]
+
+        This is essentially a "give back as needed" operation.
+
+        Side Effects:
+            self : mut
+        
+        Returns:
+            None
+        """
+        self.current.insert(0, self.__stack[-2].pop(-2))
+
+    def currentIsAnd(self) -> bool:
+        """
+        Checks if the current OpList's assigned operator is "and".
+
+        Returns:
+            bool : True if the current OpList's assigned operator is "and", False otherwise.
+        """
+        return self.current.op is RuleOp.AND
+
+    def obtain(self, err :Optional[utils.CustomErr] = None) -> Optional[OpList]:
+        """
+        Obtains the first OpList on the stack, only if it's the only element.
+
+        Args:
+            err : The error to raise if obtaining the result is not possible.
+
+        Side Effects:
+            self : mut    
+        
+        Raises:
+            err: If given, otherwise None is returned.
+
+        Returns:
+            Optional[OpList]: The first OpList on the stack, only if it's the only element.
+        """
+
+        if len(self.__stack) == 1: return self.__stack.pop()
+        if err: raise err
+        return None
+
+    def __updateCurrent(self) -> None:
+        """
+        (Private) Updates the current OpList to the one on top of the stack.
+
+        Side Effects:
+            self : mut
+        
+        Returns:
+            None
+        """
+        self.current = self.__stack[-1]
+
+def parseRuleToNestedList(rule :str) -> OpList:
+    """
+    Parse a single rule from its string representation to an OpList, making all priority explicit
+    through nesting levels.
+
+    Args:
+        rule : the string representation of a rule to be parsed.
+    
+    Raises:
+        RuleErr : whenever something goes wrong during parsing.
+    
+    Returns:
+        OpList : the parsed rule.
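+
+    Example (illustrative, gene names are made up):
+        >>> parseRuleToNestedList("A and B or C")
+        <or>[
+          <and>[
+            A,
+            B
+          ],
+          C
+        ]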
+    """
+    source = iter(rule
+        .replace("(", "( ").replace(")", " )") # Single out parens as words
+        .strip()  # remove whitespace at extremities
+        .split()) # split by spaces
+
+    stack = RuleStack()
+    nestingErr = RuleErr(rule, "mismatch between open and closed parentheses")
+    try:
+        while True: # keep reading until source ends
+            while True:
+                operand = next(source, None) # expected name or rule opening
+                if operand is None: raise RuleErr(rule, "found trailing open parentheses")
+                if operand == "and" or operand == "or" or operand == ")": # found operator instead, panic
+                    raise RuleErr(rule, f"found \"{operand}\" in unexpected position")
+
+                if operand != "(": break # found name
+
+                # found rule opening, we add new nesting level but don't know the operator
+                stack.push()
+
+            stack.current.append(operand)
+
+            while True: # keep reading until operator is found or source ends
+                operator = next(source, None) # expected operator or rule closing
+                if operator and operator != ")": break # found operator
+
+                if stack.currentIsAnd(): stack.pop() # we close the "and" chain
+
+                if not operator: break
+                stack.pop() # we close the parentheses
+
+            # we proceed with operator:
+            if not operator: break # there is no such thing as a double loop break.. yet
+            
+            if not RuleOp.isOperator(operator): raise RuleErr(
+                rule, f"found \"{operator}\" in unexpected position, expected operator")
+            
+            operator = RuleOp(operator)
+            if operator is RuleOp.OR and stack.currentIsAnd():
+                stack.pop()
+
+            elif operator is RuleOp.AND and not stack.currentIsAnd():
+                stack.push(operator)
+                stack.popForward()
+
+            stack.current.setOpIfMissing(operator) # buffer now knows what operator its data had
+
+    except RuleErr as err: raise err # bubble up proper errors
+    except Exception: raise nestingErr # everything else is interpreted as a nesting error.
+
+    parsedRule = stack.obtain(nestingErr)
+    return parsedRule[0] if len(parsedRule) == 1 and isinstance(parsedRule[0], list) else parsedRule
\ No newline at end of file