Mercurial > repos > bimib > marea_2_0
changeset 289:f7812d713af5 draft default tip
Uploaded
| author | luca_milaz | 
|---|---|
| date | Tue, 09 Jul 2024 22:45:02 +0000 | 
| parents | 38a41d36bbc9 | 
| children | |
| files | utils/utils/general_utils.py utils/utils/reaction_parsing.py utils/utils/rule_parsing.py | 
| diffstat | 3 files changed, 924 insertions(+), 0 deletions(-) [+] | 
line wrap: on
 line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/utils/general_utils.py Tue Jul 09 22:45:02 2024 +0000 @@ -0,0 +1,551 @@ +import math +import re +import sys +import csv +import pickle +import lxml.etree as ET + +from enum import Enum +from itertools import count +from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union + +import pandas as pd + +# FILES +class FileFormat(Enum): + """ + Encodes possible file extensions to conditionally save data in a different format. + """ + DAT = ("dat",) # this is how galaxy treats all your files! + CSV = ("csv",) # this is how most editable input data is written + TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! + + SVG = ("svg",) # this is how most metabolic maps are written + PNG = ("png",) # this is a common output format for images (such as metabolic maps) + PDF = ("pdf",) # this is also a common output format for images, as it's required in publications. + + XML = ("xml",) # this is one main way cobra models appear in + JSON = ("json",) # this is the other + + PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved + #TODO: we're in a pickle (ba dum tss), there's no point in supporting many extensions internally. The + # issue will never be solved for user-uploaded files and those are saved as .dat by galaxy anyway so it + # doesn't matter as long as we CAN recognize these 3 names as valid pickle extensions. We must however + # agree on an internal standard and use only that one, otherwise constructing usable paths becomes a nightmare. + @classmethod + def fromExt(cls, ext :str) -> "FileFormat": + """ + Converts a file extension string to a FileFormat instance. + + Args: + ext : The file extension as a string. + + Returns: + FileFormat: The FileFormat instance corresponding to the file extension. + """ + variantName = ext.upper() + if variantName in FileFormat.__members__: return FileFormat[variantName] + + variantName = variantName.lower() + for member in cls: + if variantName in member.value: return member + + raise ValueErr("ext", "a valid FileFormat file extension", ext) + + def __str__(self) -> str: + """ + (Private) converts to str representation. Good practice for usage with argparse. + + Returns: + str : the string representation of the file extension. + """ + return self.value[-1] #TODO: fix, it's the dumb pickle thing + +class FilePath(): + """ + Represents a file path. View this as an attempt to standardize file-related operations by expecting + values of this type in any process requesting a file path. + """ + def __init__(self, filePath :str, ext :FileFormat, *, prefix = "") -> None: + """ + (Private) Initializes an instance of FilePath. + + Args: + path : the end of the path, containing the file name. + ext : the file's extension. + prefix : anything before path, if the last '/' isn't there it's added by the code. + + Returns: + None : practically, a FilePath instance. + """ + self.ext = ext + self.filePath = filePath + + if prefix and prefix[-1] != '/': prefix += '/' + self.prefix = prefix + + @classmethod + def fromStrPath(cls, path :str) -> "FilePath": + """ + Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. + + Args: + path : the string containing the path + + Raises: + PathErr : if the provided string doesn't represent a valid path. + + Returns: + FilePath : the constructed instance. + """ + # This method is often used to construct FilePath instances from ARGS UI arguments. These arguments *should* + # always be correct paths and could be used as raw strings, however most if not all functions that work with + # file paths request the FilePath objects specifically, which is a very good thing in any case other than this. + # What ends up happening is we spend time parsing a string into a FilePath so that the function accepts it, only + # to call show() immediately to bring back the string and open the file it points to. + # TODO: this is an indication that the arguments SHOULD BE OF TYPE FilePath if they are filepaths, this ENSURES + # their correctness when modifying the UI and avoids the pointless back-and-forth. + result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path) + if not result or not result["name"] or not result["ext"]: + raise PathErr(path, "cannot recognize folder structure or extension in path") + + prefix = result["prefix"] if result["prefix"] else "" + return cls(result["name"], FileFormat.fromExt(result["ext"]), prefix = prefix) + + def show(self) -> str: + """ + Shows the path as a string. + + Returns: + str : the path shown as a string. + """ + return f"{self.prefix}{self.filePath}.{self.ext}" + + def __str__(self) -> str: return self.show() + +# ERRORS +def terminate(msg :str) -> None: + """ + Terminate the execution of the script with an error message. + + Args: + msg (str): The error message to be displayed. + + Returns: + None + """ + sys.exit(f"Execution aborted: {msg}\n") + +def logWarning(msg :str, loggerPath :str) -> None: + """ + Log a warning message to an output log file and print it to the console. The final period and a + newline is added by the function. + + Args: + s (str): The warning message to be logged and printed. + loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and + immediately read back (beware relative expensive operation, log with caution). + + Returns: + None + """ + # building the path and then reading it immediately seems useless, but it's actually a way of + # validating that reduces repetition on the caller's side. Besides, logging a message by writing + # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from + # mindlessly logging whenever something comes up, log at the very end and tell the user everything + # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to + # the file only at the end of the program's execution. + with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") + +class CustomErr(Exception): + """ + Custom error class to handle exceptions in a structured way, with a unique identifier and a message. + """ + __idGenerator = count() + errName = "Custom Error" + def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None: + """ + (Private) Initializes an instance of CustomErr. + + Args: + msg (str): Error message to be displayed. + details (str): Informs the user more about the error encountered. Defaults to "". + explicitErrCode (int): Explicit error code to be used. Defaults to -1. + + Returns: + None : practically, a CustomErr instance. + """ + self.msg = msg + self.details = details + + self.id = max(explicitErrCode, next(CustomErr.__idGenerator)) + + def throw(self, loggerPath = "") -> None: + """ + Raises the current CustomErr instance, logging a warning message before doing so. + + Raises: + self: The current CustomErr instance. + + Returns: + None + """ + if loggerPath: logWarning(str(self), loggerPath) + raise self + + def abort(self) -> None: + """ + Aborts the execution of the script. + + Returns: + None + """ + terminate(str(self)) + + def __str__(self) -> str: + """ + (Private) Returns a string representing the current CustomErr instance. + + Returns: + str: A string representing the current CustomErr instance. + """ + return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}." + +class ArgsErr(CustomErr): + """ + CustomErr subclass for UI arguments errors. + """ + errName = "Args Error" + def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None: + super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg) + +class DataErr(CustomErr): + """ + CustomErr subclass for data formatting errors. + """ + errName = "Data Format Error" + def __init__(self, fileName :str, msg = "no further details provided") -> None: + super().__init__(f"file \"{fileName}\" contains malformed data", msg) + +class PathErr(CustomErr): + """ + CustomErr subclass for filepath formatting errors. + """ + errName = "Path Error" + def __init__(self, path :FilePath, msg = "no further details provided") -> None: + super().__init__(f"path \"{path}\" is invalid", msg) + +class ValueErr(CustomErr): + """ + CustomErr subclass for any value error. + """ + errName = "Value Error" + def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None: + super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg) + +# RESULT +T = TypeVar('T') +E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened! +class Result(Generic[T, E]): + class ResultErr(CustomErr): + """ + CustomErr subclass for all Result errors. + """ + errName = "Result Error" + def __init__(self, msg = "no further details provided") -> None: + super().__init__(msg) + """ + Class to handle the result of an operation, with a value and a boolean flag to indicate + whether the operation was successful or not. + """ + def __init__(self, value :Union[T, E], isOk :bool) -> None: + """ + (Private) Initializes an instance of Result. + + Args: + value (Union[T, E]): The value to be stored in the Result instance. + isOk (bool): A boolean flag to indicate whether the operation was successful or not. + + Returns: + None : practically, a Result instance. + """ + self.isOk = isOk + self.isErr = not isOk + self.value = value + + @classmethod + def Ok(cls, value :T) -> "Result": + """ + Constructs a new Result instance with a successful operation. + + Args: + value (T): The value to be stored in the Result instance, set as successful. + + Returns: + Result: A new Result instance with a successful operation. + """ + return Result(value, isOk = True) + + @classmethod + def Err(cls, value :E) -> "Result": + """ + Constructs a new Result instance with a failed operation. + + Args: + value (E): The value to be stored in the Result instance, set as failed. + + Returns: + Result: A new Result instance with a failed operation. + """ + return Result(value, isOk = False) + + def unwrap(self) -> T: + """ + Unwraps the value of the Result instance, if the operation was successful. + + Raises: + ResultErr: If the operation was not successful. + + Returns: + T: The value of the Result instance, if the operation was successful. + """ + if self.isOk: return self.value + raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}") + + def unwrapOr(self, default :T) -> T: + """ + Unwraps the value of the Result instance, if the operation was successful, otherwise + it returns a default value. + + Args: + default (T): The default value to be returned if the operation was not successful. + + Returns: + T: The value of the Result instance, if the operation was successful, + otherwise the default value. + """ + return self.value if self.isOk else default + + def expect(self, err :"Result.ResultErr") -> T: + """ + Expects that the value of the Result instance is successful, otherwise it raises an error. + + Args: + err (Exception): The error to be raised if the operation was not successful. + + Raises: + err: The error raised if the operation was not successful. + + Returns: + T: The value of the Result instance, if the operation was successful. + """ + if self.isOk: return self.value + raise err + + U = TypeVar("U") + def map(self, mapper: Callable[[T], U]) -> "Result[U, E]": + """ + Maps the value of the current Result to whatever is returned by the mapper function. + If the Result contained an unsuccessful operation to begin with it remains unchanged + (a reference to the current instance is returned). + If the mapper function panics the returned result instance will be of the error kind. + + Args: + mapper (Callable[[T], U]): The mapper operation to be applied to the Result value. + + Returns: + Result[U, E]: The result of the mapper operation applied to the Result value. + """ + if self.isErr: return self + try: return Result.Ok(mapper(self.value)) + except Exception as e: return Result.Err(e) + + D = TypeVar("D", bound = "Result.ResultErr") + def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]": + """ + Maps the error of the current Result to whatever is returned by the mapper function. + If the Result contained a successful operation it remains unchanged + (a reference to the current instance is returned). + If the mapper function panics this method does as well. + + Args: + mapper (Callable[[E], D]): The mapper operation to be applied to the Result error. + + Returns: + Result[U, E]: The result of the mapper operation applied to the Result error. + """ + if self.isOk: return self + return Result.Err(mapper(self.value)) + + def __str__(self): + return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" + +# FILES +def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame: + """ + Reads a .csv or .tsv file and returns it as a Pandas DataFrame. + + Args: + path : the path to the dataset file. + datasetName : the name of the dataset. + + Raises: + DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if + it has less than 2 columns. + + Returns: + pandas.DataFrame: The dataset loaded as a Pandas DataFrame. + """ + # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than + # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code. + # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really + # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and + # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is + # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions. + try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python") + except: + try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python") + except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") + + if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns") + return dataset + +def readPickle(path :FilePath) -> Any: + """ + Reads the contents of a .pickle file, which needs to exist at the given path. + + Args: + path : the path to the .pickle file. + + Returns: + Any : the data inside a pickle file, could be anything. + """ + with open(path.show(), "rb") as fd: return pickle.load(fd) + +def writePickle(path :FilePath, data :Any) -> None: + """ + Saves any data in a .pickle file, created at the given path. + + Args: + path : the path to the .pickle file. + data : the data to be written to the file. + + Returns: + None + """ + with open(path.show(), "wb") as fd: pickle.dump(data, fd) + +def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]: + """ + Reads the contents of a .csv file, which needs to exist at the given path. + + Args: + path : the path to the .csv file. + delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter). + skipHeader : whether the first row of the file is a header and should be skipped. + + Returns: + List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas. + """ + with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:] + +def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree: + """ + Reads the contents of a .svg file, which needs to exist at the given path. + + Args: + path : the path to the .svg file. + + Raises: + DataErr : if the map is malformed. + + Returns: + Any : the data inside a svg file, could be anything. + """ + try: return ET.parse(path.show()) + except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err: + raise customErr if customErr else err + +def writeSvg(path :FilePath, data:ET.ElementTree) -> None: + """ + Saves svg data opened with lxml.etree in a .svg file, created at the given path. + + Args: + path : the path to the .svg file. + data : the data to be written to the file. + + Returns: + None + """ + with open(path.show(), "wb") as fd: fd.write(ET.tostring(data)) + +# UI ARGUMENTS +class Bool: + def __init__(self, argName :str) -> None: + self.argName = argName + + def __call__(self, s :str) -> bool: return self.check(s) + + def check(self, s :str) -> bool: + s = s.lower() + if s == "true" : return True + if s == "false": return False + raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") + +class Float: + def __init__(self, argName = "Dataset values, not an argument") -> None: + self.argName = argName + + def __call__(self, s :str) -> float: return self.check(s) + + def check(self, s :str) -> float: + try: return float(s) + except ValueError: + s = s.lower() + if s == "nan" or s == "none": return math.nan + raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"") + +# MODELS +OldRule = List[Union[str, "OldRule"]] +class Model(Enum): + """ + Represents a metabolic model, either custom or locally supported. Custom models don't point + to valid file paths. + """ + + Recon = "Recon" + ENGRO2 = "ENGRO2" + HMRcore = "HMRcore" + Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. + + def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: + if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") + + def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]: + """ + Open "rules" file for this model. + + Returns: + Dict[str, Dict[str, OldRule]] : the rules for this model. + """ + path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") + self.__raiseMissingPathErr(path) + return readPickle(path) + + def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]: + """ + Open "gene translator (old: gene_in_rule)" file for this model. + + Returns: + Dict[str, Dict[str, str]] : the translator dict for this model. + """ + path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/") + self.__raiseMissingPathErr(path) + return readPickle(path) + + def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: + path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") + self.__raiseMissingPathErr(path) + return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) + + def __str__(self) -> str: return self.value \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/utils/reaction_parsing.py Tue Jul 09 22:45:02 2024 +0000 @@ -0,0 +1,130 @@ +from enum import Enum +import utils.general_utils as utils +from typing import Dict +import csv +import re + +# Reaction direction encoding: +class ReactionDir(Enum): + """ + A reaction can go forwards, backwards or be reversible (able to proceed in both directions). + Models created / managed with cobrapy encode this information within the reaction's + formula using the arrows this enum keeps as values. + """ + FORWARD = "-->" + BACKWARD = "<--" + REVERSIBLE = "<=>" + + @classmethod + def fromReaction(cls, reaction :str) -> 'ReactionDir': + """ + Takes a whole reaction formula string and looks for one of the arrows, returning the + corresponding reaction direction. + + Args: + reaction : the reaction's formula. + + Raises: + ValueError : if no valid arrow is found. + + Returns: + ReactionDir : the corresponding reaction direction. + """ + for member in cls: + if member.value in reaction: return member + + raise ValueError("No valid arrow found within reaction string.") + +ReactionsDict = Dict[str, Dict[str, float]] + + +def add_custom_reaction(reactionsDict :ReactionsDict, rId :str, reaction :str) -> None: + """ + Adds an entry to the given reactionsDict. Each entry consists of a given unique reaction id + (key) and a :dict (value) matching each substrate in the reaction to its stoichiometric coefficient. + Keys and values are both obtained from the reaction's formula: if a substrate (custom metabolite id) + appears without an explicit coeff, the value 1.0 will be used instead. + + Args: + reactionsDict : dictionary encoding custom reactions information. + rId : unique reaction id. + reaction : the reaction's formula. + + Returns: + None + + Side effects: + reactionsDict : mut + """ + reaction = reaction.strip() + if not reaction: return + + reactionsDict[rId] = {} + # We assume the '+' separating consecutive metabs in a reaction is spaced from them, + # to avoid confusing it for electrical charge: + for word in reaction.split(" + "): + metabId, stoichCoeff = word, 1.0 + # Implicit stoichiometric coeff is equal to 1, some coeffs are floats. + + # Accepted coeffs can be integer or floats with a dot (.) decimal separator + # and must be separated from the metab with a space: + foundCoeff = re.search(r"\d+(\.\d+)? ", word) + if foundCoeff: + wholeMatch = foundCoeff.group(0) + metabId = word[len(wholeMatch):].strip() + stoichCoeff = float(wholeMatch.strip()) + + reactionsDict[rId][metabId] = stoichCoeff + + if not reactionsDict[rId]: del reactionsDict[rId] # Empty reactions are removed. + + +def create_reaction_dict(unparsed_reactions: Dict[str, str]) -> ReactionsDict: + """ + Parses the given dictionary into the correct format. + + Args: + unparsed_reactions (Dict[str, str]): A dictionary where keys are reaction IDs and values are unparsed reaction strings. + + Returns: + ReactionsDict: The correctly parsed dict. + """ + reactionsDict :ReactionsDict = {} + for rId, reaction in unparsed_reactions.items(): + reactionDir = ReactionDir.fromReaction(reaction) + left, right = reaction.split(f" {reactionDir.value} ") + + # Reversible reactions are split into distinct reactions, one for each direction. + # In general we only care about substrates, the product information is lost. + reactionIsReversible = reactionDir is ReactionDir.REVERSIBLE + if reactionDir is not ReactionDir.BACKWARD: + add_custom_reaction(reactionsDict, rId + "_F" * reactionIsReversible, left) + + if reactionDir is not ReactionDir.FORWARD: + add_custom_reaction(reactionsDict, rId + "_B" * reactionIsReversible, right) + + # ^^^ to further clarify: if a reaction is NOT reversible it will not be marked as _F or _B + # and whichever direction we DO keep (forward if --> and backward if <--) loses this information. + # This IS a small problem when coloring the map in marea.py because the arrow IDs in the map follow + # through with a similar convention on ALL reactions and correctly encode direction based on their + # model of origin. TODO: a proposed solution is to unify the standard in RPS to fully mimic the maps, + # which involves re-writing the "reactions" dictionary. + + return reactionsDict + + +def parse_custom_reactions(customReactionsPath :str) -> ReactionsDict: + """ + Creates a custom dictionary encoding reactions information from a csv file containing + data about these reactions, the path of which is given as input. + + Args: + customReactionsPath : path to the reactions information file. + + Returns: + ReactionsDict : dictionary encoding custom reactions information. + """ + reactionsData :Dict[str, str] = {row[0]: row[1] for row in utils.readCsv(utils.FilePath.fromStrPath(customReactionsPath))} + + return create_reaction_dict(reactionsData) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/utils/rule_parsing.py Tue Jul 09 22:45:02 2024 +0000 @@ -0,0 +1,243 @@ +from enum import Enum +import utils.general_utils as utils +from typing import List, Union, Optional + +class RuleErr(utils.CustomErr): + """ + CustomErr subclass for rule syntax errors. + """ + errName = "Rule Syntax Error" + def __init__(self, rule :str, msg = "no further details provided") -> None: + super().__init__( + f"rule \"{rule}\" is malformed, {msg}", + "please verify your input follows the validity guidelines") + +class RuleOp(Enum): + """ + Encodes all operators valid in gene rules. + """ + OR = "or" + AND = "and" + + @classmethod + def isOperator(cls, op :str) -> bool: + return op.upper() in cls.__members__ + + def __str__(self) -> str: return self.value + +class OpList(List[Union[str, "OpList"]]): + """ + Represents a parsed rule and each of its nesting levels, including the operator that level uses. + """ + def __init__(self, op :Optional[RuleOp] = None) -> None: + """ + (Private) Initializes an instance of OpList. + + Args: + op (str): Operator to be assigned to the OpList. Defaults to "". + + Returns: + None : practically, an OpList instance. + """ + self.op = op + + def setOpIfMissing(self, op :RuleOp) -> None: + """ + Sets the operator of the OpList if it's missing. + + Args: + op (str): Operator to be assigned to the OpList. + + Returns: + None + """ + if not self.op: self.op = op + + def __repr__(self, indent = "") -> str: + """ + (Private) Returns a string representation of the current OpList instance. + + Args: + indent (str): Indentation level . Defaults to "". + + Returns: + str: A string representation of the current OpList instance. + """ + nextIndent = indent + " " + return f"<{self.op}>[\n" + ",\n".join([ + f"{nextIndent}{item.__repr__(nextIndent) if isinstance(item, OpList) else item}" + for item in self ]) + f"\n{indent}]" + +class RuleStack: + """ + FILO stack structure to save the intermediate representation of a Rule during parsing, with the + current nesting level at the top of the stack. + """ + def __init__(self) -> None: + """ + (Private) initializes an instance of RuleStack. + + Returns: + None : practically, a RuleStack instance. + """ + self.__stack = [OpList()] # the stack starts out with the result list already allocated + self.__updateCurrent() + + def pop(self) -> None: + """ + Removes the OpList on top of the stack, also flattening it once when possible. + + Side Effects: + self : mut + + Returns: + None + """ + oldTop = self.__stack.pop() + if len(oldTop) == 1 and isinstance(oldTop[0], OpList): self.__stack[-1][-1] = oldTop[0] + self.__updateCurrent() + + def push(self, operator = "") -> None: + """ + Adds a new nesting level, in the form of a new OpList on top of the stack. + + Args: + operator : the operator assigned to the new OpList. + + Side Effects: + self : mut + + Returns: + None + """ + newLevel = OpList(operator) + self.current.append(newLevel) + self.__stack.append(newLevel) + self.__updateCurrent() + + def popForward(self) -> None: + """ + Moves the last "actual" item from the 2nd to last list to the beginning of the top list, as per + the example below: + stack : [list_a, list_b] + list_a : [item1, item2, list_b] --> [item1, list_b] + list_b : [item3, item4] --> [item2, item3, item4] + + This is essentially a "give back as needed" operation. + + Side Effects: + self : mut + + Returns: + None + """ + self.current.insert(0, self.__stack[-2].pop(-2)) + + def currentIsAnd(self) -> bool: + """ + Checks if the current OpList's assigned operator is "and". + + Returns: + bool : True if the current OpList's assigned operator is "and", False otherwise. + """ + return self.current.op is RuleOp.AND + + def obtain(self, err :Optional[utils.CustomErr] = None) -> Optional[OpList]: + """ + Obtains the first OpList on the stack, only if it's the only element. + + Args: + err : The error to raise if obtaining the result is not possible. + + Side Effects: + self : mut + + Raises: + err: If given, otherwise None is returned. + + Returns: + Optional[OpList]: The first OpList on the stack, only if it's the only element. + """ + + if len(self.__stack) == 1: return self.__stack.pop() + if err: raise err + return None + + def __updateCurrent(self) -> None: + """ + (Private) Updates the current OpList to the one on top of the stack. + + Side Effects: + self : mut + + Returns: + None + """ + self.current = self.__stack[-1] + +def parseRuleToNestedList(rule :str) -> OpList: + """ + Parse a single rule from its string representation to an OpList, making all priority explicit + through nesting levels. + + Args: + rule : the string representation of a rule to be parsed. + + Raises: + RuleErr : whenever something goes wrong during parsing. + + Returns: + OpList : the parsed rule. + """ + source = iter(rule + .replace("(", "( ").replace(")", " )") # Single out parens as words + .strip() # remove whitespace at extremities + .split()) # split by spaces + + stack = RuleStack() + nestingErr = RuleErr(rule, "mismatch between open and closed parentheses") + try: + while True: # keep reading until source ends + while True: + operand = next(source, None) # expected name or rule opening + if operand is None: raise RuleErr(rule, "found trailing open parentheses") + if operand == "and" or operand == "or" or operand == ")": # found operator instead, panic + raise RuleErr(rule, f"found \"{operand}\" in unexpected position") + + if operand != "(": break # found name + + # found rule opening, we add new nesting level but don't know the operator + stack.push() + + stack.current.append(operand) + + while True: # keep reading until operator is found or source ends + operator = next(source, None) # expected operator or rule closing + if operator and operator != ")": break # found operator + + if stack.currentIsAnd(): stack.pop() # we close the "and" chain + + if not operator: break + stack.pop() # we close the parentheses + + # we proceed with operator: + if not operator: break # there is no such thing as a double loop break.. yet + + if not RuleOp.isOperator(operator): raise RuleErr( + rule, f"found \"{operator}\" in unexpected position, expected operator") + + operator = RuleOp(operator) + if operator is RuleOp.OR and stack.currentIsAnd(): + stack.pop() + + elif operator is RuleOp.AND and not stack.currentIsAnd(): + stack.push(operator) + stack.popForward() + + stack.current.setOpIfMissing(operator) # buffer now knows what operator its data had + + except RuleErr as err: raise err # bubble up proper errors + except: raise nestingErr # everything else is interpreted as a nesting error. + + parsedRule = stack.obtain(nestingErr) + return parsedRule[0] if len(parsedRule) == 1 and isinstance(parsedRule[0], list) else parsedRule \ No newline at end of file
