| 489 | 1 """ | 
|  | 2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules. | 
|  | 3 | 
|  | 4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs), | 
|  | 5 computes RAS per reaction for each sample/cell line, and writes a tabular output. | 
|  | 6 """ | 
| 93 | 7 from __future__ import division | 
|  | 8 import sys | 
|  | 9 import argparse | 
|  | 10 import pandas as pd | 
| 505 | 11 import numpy as np | 
| 93 | 12 import utils.general_utils as utils | 
| 529 | 13 from typing import List, Dict | 
| 505 | 14 import ast | 
| 93 | 15 | 
| 529 | 16 # Optional imports for AnnData mode (not used in ras_generator.py) | 
|  | 17 try: | 
|  | 18     from progressbar import ProgressBar, Bar, Percentage | 
|  | 19     from scanpy import AnnData | 
|  | 20     from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes | 
|  | 21 except ImportError: | 
|  | 22     # These are only needed for AnnData mode, not for ras_generator.py | 
|  | 23     pass | 
|  | 24 | 
| 93 | 25 ERRORS = [] | 
|  | 26 ########################## argparse ########################################## | 
|  | 27 ARGS :argparse.Namespace | 
| 147 | 28 def process_args(args:List[str] = None) -> argparse.Namespace: | 
| 93 | 29     """ | 
|  | 30     Processes command-line arguments. | 
|  | 31 | 
|  | 32     Args: | 
|  | 33         args (list): List of command-line arguments. | 
|  | 34 | 
|  | 35     Returns: | 
|  | 36         Namespace: An object containing parsed arguments. | 
|  | 37     """ | 
|  | 38     parser = argparse.ArgumentParser( | 
|  | 39         usage = '%(prog)s [options]', | 
|  | 40         description = "process some value's genes to create a comparison's map.") | 
|  | 41 | 
| 489 | 42     parser.add_argument("-rl", "--model_upload", type = str, | 
|  | 43         help = "path to input file containing the rules") | 
| 93 | 44 | 
| 489 | 45     parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name") | 
|  | 46     # Galaxy converts files into .dat, this helps infer the original extension when needed. | 
| 93 | 47 | 
|  | 48     parser.add_argument( | 
|  | 49         '-n', '--none', | 
|  | 50         type = utils.Bool("none"), default = True, | 
|  | 51         help = 'compute Nan values') | 
|  | 52 | 
|  | 53     parser.add_argument( | 
|  | 54         '-td', '--tool_dir', | 
|  | 55         type = str, | 
|  | 56         required = True, help = 'your tool directory') | 
|  | 57 | 
|  | 58     parser.add_argument( | 
|  | 59         '-ol', '--out_log', | 
|  | 60         type = str, | 
|  | 61         help = "Output log") | 
|  | 62 | 
|  | 63     parser.add_argument( | 
| 489 | 64         '-in', '--input', | 
| 93 | 65         type = str, | 
|  | 66         help = 'input dataset') | 
|  | 67 | 
|  | 68     parser.add_argument( | 
|  | 69         '-ra', '--ras_output', | 
|  | 70         type = str, | 
|  | 71         required = True, help = 'ras output') | 
| 147 | 72 | 
| 93 | 73 | 
| 147 | 74     return parser.parse_args(args) | 
| 93 | 75 | 
|  | 76 ############################ dataset input #################################### | 
|  | 77 def read_dataset(data :str, name :str) -> pd.DataFrame: | 
|  | 78     """ | 
|  | 79     Read a dataset from a CSV file and return it as a pandas DataFrame. | 
|  | 80 | 
|  | 81     Args: | 
|  | 82         data (str): Path to the CSV file containing the dataset. | 
|  | 83         name (str): Name of the dataset, used in error messages. | 
|  | 84 | 
|  | 85     Returns: | 
|  | 86         pandas.DataFrame: DataFrame containing the dataset. | 
|  | 87 | 
|  | 88     Raises: | 
|  | 89         pd.errors.EmptyDataError: If the CSV file is empty. | 
|  | 90         sys.exit: If the CSV file has the wrong format, the execution is aborted. | 
|  | 91     """ | 
|  | 92     try: | 
| 505 | 93         dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0) | 
|  | 94         dataset = dataset.astype(float) | 
| 93 | 95     except pd.errors.EmptyDataError: | 
| 505 | 96         sys.exit('Execution aborted: wrong file format of ' + name + '\n') | 
| 93 | 97     if len(dataset.columns) < 2: | 
| 505 | 98         sys.exit('Execution aborted: wrong file format of ' + name + '\n') | 
| 93 | 99     return dataset | 
|  | 100 | 
|  | 101 | 
| 505 | 102 def load_custom_rules() -> Dict[str,str]: | 
| 93 | 103     """ | 
|  | 104     Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be | 
|  | 105     performed, significantly impacting the runtime. | 
|  | 106 | 
|  | 107     Returns: | 
|  | 108         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. | 
|  | 109     """ | 
| 489 | 110     datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat | 
|  | 111 | 
|  | 112     dict_rule = {} | 
|  | 113 | 
|  | 114     try: | 
|  | 115         rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False) | 
|  | 116         if len(rows) <= 1: | 
|  | 117             raise ValueError("Model tabular with 1 column is not supported.") | 
| 381 | 118 | 
| 489 | 119         if not rows: | 
|  | 120             raise ValueError("Model tabular is file is empty.") | 
|  | 121 | 
|  | 122         id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") | 
|  | 123 | 
|  | 124     # First, try using a tab delimiter | 
|  | 125         for line in rows[1:]: | 
|  | 126             if len(line) <= idx_gpr: | 
|  | 127                 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) | 
|  | 128                 continue | 
|  | 129 | 
| 505 | 130             dict_rule[line[id_idx]] = line[idx_gpr] | 
|  | 131 | 
| 489 | 132     except Exception as e: | 
|  | 133         # If parsing with tabs fails, try comma delimiter | 
|  | 134         try: | 
|  | 135             rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False) | 
|  | 136 | 
|  | 137             if len(rows) <= 1: | 
|  | 138                 raise ValueError("Model tabular with 1 column is not supported.") | 
|  | 139 | 
|  | 140             if not rows: | 
|  | 141                 raise ValueError("Model tabular is file is empty.") | 
|  | 142 | 
|  | 143             id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") | 
|  | 144 | 
|  | 145             # Try again parsing row content with the GPR column using comma-separated values | 
|  | 146             for line in rows[1:]: | 
|  | 147                 if len(line) <= idx_gpr: | 
|  | 148                     utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) | 
|  | 149                     continue | 
|  | 150 | 
| 505 | 151                 dict_rule[line[id_idx]] =line[idx_gpr] | 
| 489 | 152 | 
|  | 153         except Exception as e2: | 
|  | 154             raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}") | 
|  | 155 | 
|  | 156     if not dict_rule: | 
|  | 157             raise ValueError("No valid rules found in the uploaded file. Please check the file format.") | 
| 93 | 158     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. | 
| 489 | 159     return dict_rule | 
|  | 160 | 
| 401 | 161 | 
| 529 | 162 """ | 
|  | 163 Class to compute the RAS values | 
| 505 | 164 | 
| 529 | 165 """ | 
|  | 166 | 
|  | 167 class RAS_computation: | 
|  | 168 | 
|  | 169     def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None): | 
|  | 170         """ | 
|  | 171         Initialize RAS computation with two possible input modes: | 
|  | 172 | 
|  | 173         Mode 1 (Original - for sampling_main.py): | 
|  | 174             adata: AnnData object with gene expression (cells × genes) | 
|  | 175             model: COBRApy model object with reactions and GPRs | 
|  | 176 | 
|  | 177         Mode 2 (New - for ras_generator.py): | 
|  | 178             dataset: pandas DataFrame with gene expression (genes × samples) | 
|  | 179             gene_rules: dict mapping reaction IDs to GPR strings | 
|  | 180             rules_total_string: list of all gene names in GPRs (for validation) | 
|  | 181         """ | 
|  | 182         self._logic_operators = ['and', 'or', '(', ')'] | 
|  | 183         self.val_nan = np.nan | 
|  | 184 | 
|  | 185         # Determine which mode we're in | 
|  | 186         if adata is not None and model is not None: | 
|  | 187             # Mode 1: AnnData + COBRApy model (original) | 
|  | 188             self._init_from_anndata(adata, model) | 
|  | 189         elif dataset is not None and gene_rules is not None: | 
|  | 190             # Mode 2: DataFrame + rules dict (ras_generator style) | 
|  | 191             self._init_from_dataframe(dataset, gene_rules, rules_total_string) | 
|  | 192         else: | 
|  | 193             raise ValueError( | 
|  | 194                 "Invalid initialization. Provide either:\n" | 
|  | 195                 "  - adata + model (for AnnData input), or\n" | 
|  | 196                 "  - dataset + gene_rules (for DataFrame input)" | 
|  | 197             ) | 
|  | 198 | 
|  | 199     def _normalize_gene_name(self, gene_name): | 
|  | 200         """Normalize gene names by replacing special characters.""" | 
|  | 201         return gene_name.replace("-", "_").replace(":", "_") | 
|  | 202 | 
|  | 203     def _normalize_rule(self, rule): | 
|  | 204         """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names.""" | 
|  | 205         rule = rule.replace("OR", "or").replace("AND", "and") | 
|  | 206         rule = rule.replace("(", "( ").replace(")", " )") | 
|  | 207         # Normalize gene names in the rule | 
|  | 208         tokens = rule.split() | 
|  | 209         normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens] | 
|  | 210         return " ".join(normalized_tokens) | 
|  | 211 | 
|  | 212     def _init_from_anndata(self, adata, model): | 
|  | 213         """Initialize from AnnData and COBRApy model (original mode).""" | 
|  | 214         # Build the dictionary for the GPRs | 
|  | 215         df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions]) | 
|  | 216         gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions] | 
|  | 217         df_reactions['rule'] = gene_rules | 
|  | 218         df_reactions = df_reactions.reset_index() | 
|  | 219         df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | 
|  | 220 | 
|  | 221         self.dict_rule_reactions = df_reactions.to_dict()['index'] | 
|  | 222 | 
|  | 223         # build useful structures for RAS computation | 
|  | 224         self.model = model | 
|  | 225         self.count_adata = adata.copy() | 
|  | 226 | 
|  | 227         # Normalize gene names in both model and dataset | 
|  | 228         model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes] | 
|  | 229         dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index] | 
|  | 230         self.genes = pd.Index(dataset_genes).intersection(model_genes) | 
|  | 231 | 
|  | 232         if len(self.genes) == 0: | 
|  | 233             raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | 
|  | 234 | 
|  | 235         self.cell_ids = list(self.count_adata.obs.index.values) | 
|  | 236         # Get expression data with normalized gene names | 
|  | 237         self.count_df_filtered = self.count_adata.to_df().T | 
|  | 238         self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index] | 
|  | 239         self.count_df_filtered = self.count_df_filtered.loc[self.genes] | 
|  | 240 | 
|  | 241     def _init_from_dataframe(self, dataset, gene_rules, rules_total_string): | 
|  | 242         """Initialize from DataFrame and rules dict (ras_generator mode).""" | 
|  | 243         reactions = list(gene_rules.keys()) | 
|  | 244 | 
|  | 245         # Build the dictionary for the GPRs | 
|  | 246         df_reactions = pd.DataFrame(index=reactions) | 
|  | 247         gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions] | 
|  | 248         df_reactions['rule'] = gene_rules_list | 
|  | 249         df_reactions = df_reactions.reset_index() | 
|  | 250         df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | 
|  | 251 | 
|  | 252         self.dict_rule_reactions = df_reactions.to_dict()['index'] | 
|  | 253 | 
|  | 254         # build useful structures for RAS computation | 
|  | 255         self.model = None | 
|  | 256         self.count_adata = None | 
|  | 257 | 
|  | 258         # Normalize gene names in dataset | 
|  | 259         dataset_normalized = dataset.copy() | 
|  | 260         dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index] | 
|  | 261 | 
|  | 262         # Determine which genes are in both dataset and GPRs | 
|  | 263         if rules_total_string is not None: | 
|  | 264             rules_genes = [self._normalize_gene_name(g) for g in rules_total_string] | 
|  | 265             self.genes = dataset_normalized.index.intersection(rules_genes) | 
|  | 266         else: | 
|  | 267             # Extract all genes from rules | 
|  | 268             all_genes_in_rules = set() | 
|  | 269             for rule in gene_rules_list: | 
|  | 270                 tokens = rule.split() | 
|  | 271                 for token in tokens: | 
|  | 272                     if token not in self._logic_operators: | 
|  | 273                         all_genes_in_rules.add(token) | 
|  | 274             self.genes = dataset_normalized.index.intersection(all_genes_in_rules) | 
|  | 275 | 
|  | 276         if len(self.genes) == 0: | 
|  | 277             raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | 
|  | 278 | 
|  | 279         self.cell_ids = list(dataset_normalized.columns) | 
|  | 280         self.count_df_filtered = dataset_normalized.loc[self.genes] | 
|  | 281 | 
|  | 282     def compute(self, | 
|  | 283                 or_expression=np.sum,       # type of operation to do in case of an or expression (sum, max, mean) | 
|  | 284                 and_expression=np.min,      # type of operation to do in case of an and expression(min, sum) | 
|  | 285                 drop_na_rows=True,          # if True remove the nan rows of the ras  matrix | 
|  | 286                 drop_duplicates=False,      # if true, remove duplicates rows | 
|  | 287                 ignore_nan=True,            # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A) | 
|  | 288                 print_progressbar=True,     # if True, print the progress bar | 
|  | 289                 add_count_metadata=True,    # if True add metadata of cells in the ras adata | 
|  | 290                 add_met_metadata=True,      # if True add metadata from the metabolic model (gpr and compartments of reactions) | 
|  | 291                 add_essential_reactions=False, | 
|  | 292                 add_essential_genes=False | 
|  | 293                 ): | 
|  | 294 | 
|  | 295         self.or_function = or_expression | 
|  | 296         self.and_function = and_expression | 
|  | 297 | 
|  | 298         ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan) | 
|  | 299         genes_not_mapped = set()  # Track genes not in dataset | 
|  | 300 | 
|  | 301         if print_progressbar: | 
|  | 302             pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start() | 
|  | 303 | 
|  | 304         # Process each unique GPR rule | 
|  | 305         for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()): | 
|  | 306             if len(rule) == 0: | 
|  | 307                 # Empty rule - keep as NaN | 
|  | 308                 pass | 
|  | 309             else: | 
|  | 310                 # Extract genes from rule | 
|  | 311                 rule_genes = [token for token in rule.split() if token not in self._logic_operators] | 
|  | 312                 rule_genes_unique = list(set(rule_genes)) | 
|  | 313 | 
|  | 314                 # Which genes are in the dataset? | 
|  | 315                 genes_present = [g for g in rule_genes_unique if g in self.genes] | 
|  | 316                 genes_missing = [g for g in rule_genes_unique if g not in self.genes] | 
|  | 317 | 
|  | 318                 if genes_missing: | 
|  | 319                     genes_not_mapped.update(genes_missing) | 
|  | 320 | 
|  | 321                 if len(genes_present) == 0: | 
|  | 322                     # No genes in dataset - keep as NaN | 
|  | 323                     pass | 
|  | 324                 elif len(genes_missing) > 0 and not ignore_nan: | 
|  | 325                     # Some genes missing and we don't ignore NaN - set to NaN | 
|  | 326                     pass | 
|  | 327                 else: | 
|  | 328                     # Evaluate the GPR expression using AST | 
|  | 329                     # For single gene, AST handles it fine: ast.parse("GENE_A") works | 
|  | 330                     try: | 
|  | 331                         tree = ast.parse(rule, mode="eval").body | 
|  | 332                         data = self.count_df_filtered.loc[genes_present] | 
|  | 333 | 
|  | 334                         # Evaluate for each cell/sample | 
|  | 335                         for j, col in enumerate(data.columns): | 
|  | 336                             gene_values = dict(zip(data.index, data[col].values)) | 
|  | 337                             ras_df[ind, j] = self._evaluate_ast(tree, gene_values, self.or_function, self.and_function, ignore_nan) | 
|  | 338                     except: | 
|  | 339                         # If parsing fails, keep as NaN | 
|  | 340                         pass | 
|  | 341 | 
|  | 342             if print_progressbar: | 
|  | 343                 pbar.update(ind + 1) | 
|  | 344 | 
|  | 345         if print_progressbar: | 
|  | 346             pbar.finish() | 
|  | 347 | 
|  | 348         # Store genes not mapped for later use | 
|  | 349         self.genes_not_mapped = sorted(genes_not_mapped) | 
|  | 350 | 
|  | 351         # create the dataframe of ras (rules x samples) | 
|  | 352         ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids) | 
|  | 353         ras_df['REACTIONS'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()] | 
|  | 354 | 
|  | 355         reactions_common = pd.DataFrame() | 
|  | 356         reactions_common["REACTIONS"] = ras_df['REACTIONS'] | 
|  | 357         reactions_common["proof2"] = ras_df['REACTIONS'] | 
|  | 358         reactions_common = reactions_common.explode('REACTIONS') | 
|  | 359         reactions_common = reactions_common.set_index("REACTIONS") | 
|  | 360 | 
|  | 361         ras_df = ras_df.explode("REACTIONS") | 
|  | 362         ras_df = ras_df.set_index("REACTIONS") | 
|  | 363 | 
|  | 364         if drop_na_rows: | 
|  | 365             ras_df = ras_df.dropna(how="all") | 
|  | 366 | 
|  | 367         if drop_duplicates: | 
|  | 368             ras_df = ras_df.drop_duplicates() | 
|  | 369 | 
|  | 370         # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData | 
|  | 371         if self.count_adata is None: | 
|  | 372             return ras_df, self.genes_not_mapped | 
|  | 373 | 
|  | 374         # Original AnnData mode: create AnnData structure for RAS | 
|  | 375         ras_adata = AnnData(ras_df.T) | 
|  | 376 | 
|  | 377         #add metadata | 
|  | 378         if add_count_metadata: | 
|  | 379             ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index] | 
|  | 380             ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x)) | 
|  | 381             for el in self.count_adata.obs.columns: | 
|  | 382                 ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el] | 
|  | 383 | 
|  | 384         if add_met_metadata: | 
|  | 385             if self.model is not None and len(self.model.compartments)>0: | 
|  | 386                   ras_adata.var['compartments']=[list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index] | 
|  | 387                   ras_adata.var['compartments']=ras_adata.var["compartments"].apply(lambda x: ",".join(x)) | 
|  | 388 | 
|  | 389             if self.model is not None: | 
|  | 390                 ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index] | 
|  | 391 | 
|  | 392         if add_essential_reactions: | 
|  | 393             if self.model is not None: | 
|  | 394                 essential_reactions=find_essential_reactions(self.model) | 
|  | 395                 essential_reactions=[el.id for el in essential_reactions] | 
|  | 396                 ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index] | 
|  | 397 | 
|  | 398         if add_essential_genes: | 
|  | 399             if self.model is not None: | 
|  | 400                 essential_genes=find_essential_genes(self.model) | 
|  | 401                 essential_genes=[el.id for el in essential_genes] | 
|  | 402                 ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split()  if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]] | 
|  | 403 | 
|  | 404         return ras_adata | 
|  | 405 | 
|  | 406     def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan): | 
|  | 407         """ | 
|  | 408         Evaluate a boolean expression using AST (Abstract Syntax Tree). | 
|  | 409         Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)). | 
|  | 410 | 
|  | 411         Args: | 
|  | 412             node: AST node to evaluate | 
|  | 413             values: Dictionary mapping gene names to their expression values | 
|  | 414             or_function: Function to apply for OR operations | 
|  | 415             and_function: Function to apply for AND operations | 
|  | 416             ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A) | 
|  | 417 | 
|  | 418         Returns: | 
|  | 419             Evaluated expression result (float or np.nan) | 
|  | 420         """ | 
|  | 421         if isinstance(node, ast.BoolOp): | 
|  | 422             # Boolean operation (and/or) | 
|  | 423             vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values] | 
|  | 424 | 
|  | 425             if ignore_nan: | 
|  | 426                 # Filter out None/NaN values | 
|  | 427                 vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))] | 
|  | 428 | 
|  | 429             if not vals: | 
|  | 430                 return np.nan | 
|  | 431 | 
|  | 432             if isinstance(node.op, ast.Or): | 
|  | 433                 return or_function(vals) | 
|  | 434             elif isinstance(node.op, ast.And): | 
|  | 435                 return and_function(vals) | 
|  | 436 | 
|  | 437         elif isinstance(node, ast.Name): | 
|  | 438             # Variable (gene name) | 
|  | 439             return values.get(node.id, None) | 
|  | 440         elif isinstance(node, ast.Constant): | 
|  | 441             # Constant (shouldn't happen in GPRs, but handle it) | 
|  | 442             return values.get(str(node.value), None) | 
|  | 443         else: | 
|  | 444             raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}") | 
| 505 | 445 | 
|  | 446 | 
| 529 | 447 # ============================================================================ | 
|  | 448 # STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY | 
|  | 449 # ============================================================================ | 
| 505 | 450 | 
| 529 | 451 def computeRAS( | 
|  | 452     dataset, | 
|  | 453     gene_rules, | 
|  | 454     rules_total_string, | 
|  | 455     or_function=np.sum, | 
|  | 456     and_function=np.min, | 
|  | 457     ignore_nan=True | 
|  | 458 ): | 
|  | 459     """ | 
|  | 460     Compute RAS from tabular data and GPR rules (ras_generator.py compatible). | 
|  | 461 | 
|  | 462     This is a standalone function that wraps the RAS_computation class | 
|  | 463     to provide the same interface as ras_generator.py. | 
|  | 464 | 
|  | 465     Args: | 
|  | 466         dataset: pandas DataFrame with gene expression (genes × samples) | 
|  | 467         gene_rules: dict mapping reaction IDs to GPR strings | 
|  | 468         rules_total_string: list of all gene names in GPRs | 
|  | 469         or_function: function for OR operations (default: np.sum) | 
|  | 470         and_function: function for AND operations (default: np.min) | 
|  | 471         ignore_nan: if True, ignore NaN in GPR evaluation (default: True) | 
| 505 | 472 | 
| 529 | 473     Returns: | 
|  | 474         tuple: (ras_df, genes_not_mapped) | 
|  | 475             - ras_df: DataFrame with RAS values (reactions × samples) | 
|  | 476             - genes_not_mapped: list of genes in GPRs not found in dataset | 
|  | 477     """ | 
|  | 478     # Create RAS computation object in DataFrame mode | 
|  | 479     ras_obj = RAS_computation( | 
|  | 480         dataset=dataset, | 
|  | 481         gene_rules=gene_rules, | 
|  | 482         rules_total_string=rules_total_string | 
|  | 483     ) | 
| 505 | 484 | 
| 529 | 485     # Compute RAS | 
|  | 486     result = ras_obj.compute( | 
|  | 487         or_expression=or_function, | 
|  | 488         and_expression=and_function, | 
|  | 489         ignore_nan=ignore_nan, | 
|  | 490         print_progressbar=False,  # No progress bar for ras_generator | 
|  | 491         add_count_metadata=False,  # No metadata in DataFrame mode | 
|  | 492         add_met_metadata=False, | 
|  | 493         add_essential_reactions=False, | 
|  | 494         add_essential_genes=False | 
|  | 495     ) | 
|  | 496 | 
|  | 497     # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode | 
|  | 498     return result | 
| 505 | 499 | 
| 147 | 500 def main(args:List[str] = None) -> None: | 
| 93 | 501     """ | 
|  | 502     Initializes everything and sets the program in motion based on the fronted input arguments. | 
|  | 503 | 
|  | 504     Returns: | 
|  | 505         None | 
|  | 506     """ | 
|  | 507     # get args from frontend (related xml) | 
|  | 508     global ARGS | 
| 147 | 509     ARGS = process_args(args) | 
| 309 | 510 | 
| 505 | 511     # read dataset and remove versioning from gene names | 
| 93 | 512     dataset = read_dataset(ARGS.input, "dataset") | 
| 510 | 513     orig_gene_list=dataset.index.copy() | 
|  | 514     dataset.index =  [str(el.split(".")[0]) for el in dataset.index] | 
|  | 515 | 
| 505 | 516     #load GPR rules | 
| 489 | 517     rules = load_custom_rules() | 
| 505 | 518 | 
|  | 519     #create a list of all the gpr | 
|  | 520     rules_total_string="" | 
|  | 521     for id,rule in rules.items(): | 
|  | 522         rules_total_string+=rule.replace("(","").replace(")","") + " " | 
|  | 523     rules_total_string=list(set(rules_total_string.split(" "))) | 
| 93 | 524 | 
| 512 | 525     if any(dataset.index.duplicated(keep=False)): | 
|  | 526         genes_duplicates=orig_gene_list[dataset.index.duplicated(keep=False)] | 
|  | 527         genes_duplicates_in_model=[elem for elem in genes_duplicates if elem in rules_total_string] | 
| 513 | 528 | 
| 512 | 529         if len(genes_duplicates_in_model)>0:#metabolic genes have duplicated entries in the dataset | 
|  | 530             list_str=", ".join(genes_duplicates_in_model) | 
| 513 | 531             list_genes=f"ERROR: Duplicate entries in the gene dataset present in one or more GPR. The following metabolic genes are duplicated: "+list_str | 
|  | 532             raise ValueError(list_genes) | 
|  | 533         else: | 
|  | 534             list_str=", ".join(genes_duplicates) | 
|  | 535             list_genes=f"INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in the GPRs: "+list_str | 
|  | 536             utils.logWarning(list_genes,ARGS.out_log) | 
| 512 | 537 | 
| 505 | 538     #check if nan value must be ignored in the GPR | 
|  | 539     if ARGS.none: | 
|  | 540     #    #e.g. (A or nan --> A) | 
|  | 541         ignore_nan = True | 
|  | 542     else: | 
|  | 543         #e.g. (A or nan --> nan) | 
|  | 544         ignore_nan = False | 
|  | 545 | 
|  | 546     #compure ras | 
| 510 | 547     ras_df,genes_not_mapped=computeRAS(dataset,rules, | 
| 505 | 548                     rules_total_string, | 
|  | 549                     or_function=np.sum,    # type of operation to do in case of an or expression (max, sum, mean) | 
|  | 550                     and_function=np.min, | 
|  | 551                     ignore_nan=ignore_nan) | 
|  | 552 | 
|  | 553     #save to csv and replace nan with None | 
| 510 | 554     ras_df.replace([np.nan,None],"None").to_csv(ARGS.ras_output, sep = '\t') | 
| 381 | 555 | 
| 510 | 556     #report genes not present in the data | 
|  | 557     if len(genes_not_mapped)>0: | 
|  | 558         genes_not_mapped_str=", ".join(genes_not_mapped) | 
|  | 559         utils.logWarning( | 
| 513 | 560         f"INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: "+genes_not_mapped_str, | 
| 510 | 561         ARGS.out_log) | 
|  | 562 | 
| 489 | 563     print("Execution succeeded") | 
| 93 | 564 | 
|  | 565 ############################################################################### | 
|  | 566 if __name__ == "__main__": | 
| 505 | 567     main() |