| 542 | 1 """ | 
|  | 2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules. | 
|  | 3 | 
|  | 4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs), | 
|  | 5 computes RAS per reaction for each sample/cell line, and writes a tabular output. | 
|  | 6 """ | 
| 544 | 7 from __future__ import division | 
| 543 | 8 import os | 
| 542 | 9 import sys | 
|  | 10 import argparse | 
|  | 11 import pandas as pd | 
|  | 12 import numpy as np | 
|  | 13 try: | 
|  | 14     from .utils import general_utils as utils | 
|  | 15 except: | 
|  | 16     import utils.general_utils as utils | 
|  | 17 from typing import List, Dict | 
|  | 18 import ast | 
|  | 19 | 
|  | 20 # Optional imports for AnnData mode (not used in ras_generator.py) | 
|  | 21 try: | 
|  | 22     from progressbar import ProgressBar, Bar, Percentage | 
|  | 23     from scanpy import AnnData | 
|  | 24     from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes | 
|  | 25 except ImportError: | 
|  | 26     # These are only needed for AnnData mode, not for ras_generator.py | 
|  | 27     pass | 
|  | 28 | 
|  | 29 ERRORS = [] | 
|  | 30 ########################## argparse ########################################## | 
|  | 31 ARGS :argparse.Namespace | 
|  | 32 def process_args(args:List[str] = None) -> argparse.Namespace: | 
|  | 33     """ | 
|  | 34     Processes command-line arguments. | 
|  | 35 | 
|  | 36     Args: | 
|  | 37         args (list): List of command-line arguments. | 
|  | 38 | 
|  | 39     Returns: | 
|  | 40         Namespace: An object containing parsed arguments. | 
|  | 41     """ | 
|  | 42     parser = argparse.ArgumentParser( | 
|  | 43         usage = '%(prog)s [options]', | 
|  | 44         description = "process some value's genes to create a comparison's map.") | 
|  | 45 | 
|  | 46     parser.add_argument("-rl", "--model_upload", type = str, | 
|  | 47         help = "path to input file containing the rules") | 
|  | 48 | 
|  | 49     parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name") | 
|  | 50     # Galaxy converts files into .dat, this helps infer the original extension when needed. | 
|  | 51 | 
|  | 52     parser.add_argument( | 
|  | 53         '-n', '--none', | 
|  | 54         type = utils.Bool("none"), default = True, | 
|  | 55         help = 'compute Nan values') | 
|  | 56 | 
|  | 57     parser.add_argument( | 
|  | 58         '-td', '--tool_dir', | 
|  | 59         type = str, | 
|  | 60         default = os.path.dirname(os.path.abspath(__file__)), | 
|  | 61         help = 'your tool directory (default: auto-detected package location)') | 
|  | 62 | 
|  | 63     parser.add_argument( | 
|  | 64         '-ol', '--out_log', | 
|  | 65         type = str, | 
|  | 66         help = "Output log") | 
|  | 67 | 
|  | 68     parser.add_argument( | 
|  | 69         '-in', '--input', | 
|  | 70         type = str, | 
|  | 71         help = 'input dataset') | 
|  | 72 | 
|  | 73     parser.add_argument( | 
|  | 74         '-ra', '--ras_output', | 
|  | 75         type = str, | 
|  | 76         required = True, help = 'ras output') | 
|  | 77 | 
|  | 78 | 
|  | 79     return parser.parse_args(args) | 
|  | 80 | 
|  | 81 ############################ dataset input #################################### | 
|  | 82 def read_dataset(data :str, name :str) -> pd.DataFrame: | 
|  | 83     """ | 
|  | 84     Read a dataset from a CSV file and return it as a pandas DataFrame. | 
|  | 85 | 
|  | 86     Args: | 
|  | 87         data (str): Path to the CSV file containing the dataset. | 
|  | 88         name (str): Name of the dataset, used in error messages. | 
|  | 89 | 
|  | 90     Returns: | 
|  | 91         pandas.DataFrame: DataFrame containing the dataset. | 
|  | 92 | 
|  | 93     Raises: | 
|  | 94         pd.errors.EmptyDataError: If the CSV file is empty. | 
|  | 95         sys.exit: If the CSV file has the wrong format, the execution is aborted. | 
|  | 96     """ | 
|  | 97     try: | 
|  | 98         dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0) | 
|  | 99         dataset = dataset.astype(float) | 
|  | 100     except pd.errors.EmptyDataError: | 
|  | 101         sys.exit('Execution aborted: wrong file format of ' + name + '\n') | 
|  | 102     if len(dataset.columns) < 2: | 
|  | 103         sys.exit('Execution aborted: wrong file format of ' + name + '\n') | 
|  | 104     return dataset | 
|  | 105 | 
|  | 106 | 
|  | 107 def load_custom_rules() -> Dict[str,str]: | 
|  | 108     """ | 
|  | 109     Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be | 
|  | 110     performed, significantly impacting the runtime. | 
|  | 111 | 
|  | 112     Returns: | 
|  | 113         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. | 
|  | 114     """ | 
|  | 115     datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat | 
|  | 116 | 
|  | 117     dict_rule = {} | 
|  | 118 | 
|  | 119     try: | 
|  | 120         rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False) | 
|  | 121         if len(rows) <= 1: | 
|  | 122             raise ValueError("Model tabular with 1 column is not supported.") | 
|  | 123 | 
|  | 124         if not rows: | 
|  | 125             raise ValueError("Model tabular is file is empty.") | 
|  | 126 | 
|  | 127         id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") | 
|  | 128 | 
|  | 129     # First, try using a tab delimiter | 
|  | 130         for line in rows[1:]: | 
|  | 131             if len(line) <= idx_gpr: | 
|  | 132                 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) | 
|  | 133                 continue | 
|  | 134 | 
|  | 135             dict_rule[line[id_idx]] = line[idx_gpr] | 
|  | 136 | 
|  | 137     except Exception as e: | 
|  | 138         # If parsing with tabs fails, try comma delimiter | 
|  | 139         try: | 
|  | 140             rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False) | 
|  | 141 | 
|  | 142             if len(rows) <= 1: | 
|  | 143                 raise ValueError("Model tabular with 1 column is not supported.") | 
|  | 144 | 
|  | 145             if not rows: | 
|  | 146                 raise ValueError("Model tabular is file is empty.") | 
|  | 147 | 
|  | 148             id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") | 
|  | 149 | 
|  | 150             # Try again parsing row content with the GPR column using comma-separated values | 
|  | 151             for line in rows[1:]: | 
|  | 152                 if len(line) <= idx_gpr: | 
|  | 153                     utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) | 
|  | 154                     continue | 
|  | 155 | 
|  | 156                 dict_rule[line[id_idx]] =line[idx_gpr] | 
|  | 157 | 
|  | 158         except Exception as e2: | 
|  | 159             raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}") | 
|  | 160 | 
|  | 161     if not dict_rule: | 
|  | 162             raise ValueError("No valid rules found in the uploaded file. Please check the file format.") | 
|  | 163     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. | 
|  | 164     return dict_rule | 
|  | 165 | 
|  | 166 | 
|  | 167 """ | 
|  | 168 Class to compute the RAS values | 
|  | 169 | 
|  | 170 """ | 
|  | 171 | 
|  | 172 class RAS_computation: | 
|  | 173 | 
|  | 174     def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None): | 
|  | 175         """ | 
|  | 176         Initialize RAS computation with two possible input modes: | 
|  | 177 | 
|  | 178         Mode 1 (Original - for sampling_main.py): | 
|  | 179             adata: AnnData object with gene expression (cells × genes) | 
|  | 180             model: COBRApy model object with reactions and GPRs | 
|  | 181 | 
|  | 182         Mode 2 (New - for ras_generator.py): | 
|  | 183             dataset: pandas DataFrame with gene expression (genes × samples) | 
|  | 184             gene_rules: dict mapping reaction IDs to GPR strings | 
|  | 185             rules_total_string: list of all gene names in GPRs (for validation) | 
|  | 186         """ | 
|  | 187         self._logic_operators = ['and', 'or', '(', ')'] | 
|  | 188         self.val_nan = np.nan | 
|  | 189 | 
|  | 190         # Determine which mode we're in | 
|  | 191         if adata is not None and model is not None: | 
|  | 192             # Mode 1: AnnData + COBRApy model (original) | 
|  | 193             self._init_from_anndata(adata, model) | 
|  | 194         elif dataset is not None and gene_rules is not None: | 
|  | 195             # Mode 2: DataFrame + rules dict (ras_generator style) | 
|  | 196             self._init_from_dataframe(dataset, gene_rules, rules_total_string) | 
|  | 197         else: | 
|  | 198             raise ValueError( | 
|  | 199                 "Invalid initialization. Provide either:\n" | 
|  | 200                 "  - adata + model (for AnnData input), or\n" | 
|  | 201                 "  - dataset + gene_rules (for DataFrame input)" | 
|  | 202             ) | 
|  | 203 | 
|  | 204     def _normalize_gene_name(self, gene_name): | 
|  | 205         """Normalize gene names by replacing special characters.""" | 
|  | 206         return gene_name.replace("-", "_").replace(":", "_") | 
|  | 207 | 
|  | 208     def _normalize_rule(self, rule): | 
|  | 209         """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names.""" | 
|  | 210         rule = rule.replace("OR", "or").replace("AND", "and") | 
|  | 211         rule = rule.replace("(", "( ").replace(")", " )") | 
|  | 212         # Normalize gene names in the rule | 
|  | 213         tokens = rule.split() | 
|  | 214         normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens] | 
|  | 215         return " ".join(normalized_tokens) | 
|  | 216 | 
|  | 217     def _init_from_anndata(self, adata, model): | 
|  | 218         """Initialize from AnnData and COBRApy model (original mode).""" | 
|  | 219         # Build the dictionary for the GPRs | 
|  | 220         df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions]) | 
|  | 221         gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions] | 
|  | 222         df_reactions['rule'] = gene_rules | 
|  | 223         df_reactions = df_reactions.reset_index() | 
|  | 224         df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | 
|  | 225 | 
|  | 226         self.dict_rule_reactions = df_reactions.to_dict()['index'] | 
|  | 227 | 
|  | 228         # build useful structures for RAS computation | 
|  | 229         self.model = model | 
|  | 230         self.count_adata = adata.copy() | 
|  | 231 | 
|  | 232         # Normalize gene names in both model and dataset | 
|  | 233         model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes] | 
|  | 234         dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index] | 
|  | 235         self.genes = pd.Index(dataset_genes).intersection(model_genes) | 
|  | 236 | 
|  | 237         if len(self.genes) == 0: | 
|  | 238             raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | 
|  | 239 | 
|  | 240         self.cell_ids = list(self.count_adata.obs.index.values) | 
|  | 241         # Get expression data with normalized gene names | 
|  | 242         self.count_df_filtered = self.count_adata.to_df().T | 
|  | 243         self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index] | 
|  | 244         self.count_df_filtered = self.count_df_filtered.loc[self.genes] | 
|  | 245 | 
|  | 246     def _init_from_dataframe(self, dataset, gene_rules, rules_total_string): | 
|  | 247         """Initialize from DataFrame and rules dict (ras_generator mode).""" | 
|  | 248         reactions = list(gene_rules.keys()) | 
|  | 249 | 
|  | 250         # Build the dictionary for the GPRs | 
|  | 251         df_reactions = pd.DataFrame(index=reactions) | 
|  | 252         gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions] | 
|  | 253         df_reactions['rule'] = gene_rules_list | 
|  | 254         df_reactions = df_reactions.reset_index() | 
|  | 255         df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | 
|  | 256 | 
|  | 257         self.dict_rule_reactions = df_reactions.to_dict()['index'] | 
|  | 258 | 
|  | 259         # build useful structures for RAS computation | 
|  | 260         self.model = None | 
|  | 261         self.count_adata = None | 
|  | 262 | 
|  | 263         # Normalize gene names in dataset | 
|  | 264         dataset_normalized = dataset.copy() | 
|  | 265         dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index] | 
|  | 266 | 
|  | 267         # Determine which genes are in both dataset and GPRs | 
|  | 268         if rules_total_string is not None: | 
|  | 269             rules_genes = [self._normalize_gene_name(g) for g in rules_total_string] | 
|  | 270             self.genes = dataset_normalized.index.intersection(rules_genes) | 
|  | 271         else: | 
|  | 272             # Extract all genes from rules | 
|  | 273             all_genes_in_rules = set() | 
|  | 274             for rule in gene_rules_list: | 
|  | 275                 tokens = rule.split() | 
|  | 276                 for token in tokens: | 
|  | 277                     if token not in self._logic_operators: | 
|  | 278                         all_genes_in_rules.add(token) | 
|  | 279             self.genes = dataset_normalized.index.intersection(all_genes_in_rules) | 
|  | 280 | 
|  | 281         if len(self.genes) == 0: | 
|  | 282             raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | 
|  | 283 | 
|  | 284         self.cell_ids = list(dataset_normalized.columns) | 
|  | 285         self.count_df_filtered = dataset_normalized.loc[self.genes] | 
|  | 286 | 
|  | 287     def compute(self, | 
|  | 288                 or_expression=np.sum,       # type of operation to do in case of an or expression (sum, max, mean) | 
|  | 289                 and_expression=np.min,      # type of operation to do in case of an and expression(min, sum) | 
|  | 290                 drop_na_rows=False,          # if True remove the nan rows of the ras  matrix | 
|  | 291                 drop_duplicates=False,      # if true, remove duplicates rows | 
|  | 292                 ignore_nan=True,            # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A) | 
|  | 293                 print_progressbar=True,     # if True, print the progress bar | 
|  | 294                 add_count_metadata=True,    # if True add metadata of cells in the ras adata | 
|  | 295                 add_met_metadata=True,      # if True add metadata from the metabolic model (gpr and compartments of reactions) | 
|  | 296                 add_essential_reactions=False, | 
|  | 297                 add_essential_genes=False | 
|  | 298                 ): | 
|  | 299 | 
|  | 300         self.or_function = or_expression | 
|  | 301         self.and_function = and_expression | 
|  | 302 | 
|  | 303         ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan) | 
|  | 304         genes_not_mapped = set()  # Track genes not in dataset | 
|  | 305 | 
|  | 306         if print_progressbar: | 
|  | 307             pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start() | 
|  | 308 | 
|  | 309         # Process each unique GPR rule | 
|  | 310         for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()): | 
|  | 311             if len(rule) == 0: | 
|  | 312                 # Empty rule - keep as NaN | 
|  | 313                 pass | 
|  | 314             else: | 
|  | 315                 # Extract genes from rule | 
|  | 316                 rule_genes = [token for token in rule.split() if token not in self._logic_operators] | 
|  | 317                 rule_genes_unique = list(set(rule_genes)) | 
|  | 318 | 
|  | 319                 # Which genes are in the dataset? | 
|  | 320                 genes_present = [g for g in rule_genes_unique if g in self.genes] | 
|  | 321                 genes_missing = [g for g in rule_genes_unique if g not in self.genes] | 
|  | 322 | 
|  | 323                 if genes_missing: | 
|  | 324                     genes_not_mapped.update(genes_missing) | 
|  | 325 | 
|  | 326                 if len(genes_present) == 0: | 
|  | 327                     # No genes in dataset - keep as NaN | 
|  | 328                     pass | 
|  | 329                 elif len(genes_missing) > 0 and not ignore_nan: | 
|  | 330                     # Some genes missing and we don't ignore NaN - set to NaN | 
|  | 331                     pass | 
|  | 332                 else: | 
|  | 333                     # Evaluate the GPR expression using AST | 
|  | 334                     # For single gene, AST handles it fine: ast.parse("GENE_A") works | 
|  | 335                     # more genes in the formula | 
|  | 336                     check_only_and=("and" in rule and "or" not in rule) #only and | 
|  | 337                     check_only_or=("or" in rule and "and" not in rule)  #only or | 
|  | 338                     if check_only_and or check_only_or: | 
|  | 339                         #or/and sequence | 
|  | 340                         matrix = self.count_df_filtered.loc[genes_present].values | 
|  | 341                         #compute for all cells | 
|  | 342                         if check_only_and: | 
|  | 343                             ras_df[ind] = self.and_function(matrix, axis=0) | 
|  | 344                         else: | 
|  | 345                             ras_df[ind] = self.or_function(matrix, axis=0) | 
|  | 346                     else: | 
|  | 347                         # complex expression (e.g. A or (B and C)) | 
|  | 348                         data = self.count_df_filtered.loc[genes_present]  # dataframe of genes in the GPRs | 
|  | 349                         tree = ast.parse(rule, mode="eval").body | 
|  | 350                         values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns] | 
|  | 351                         for j, values in enumerate(values_by_cell): | 
|  | 352                             ras_df[ind, j] =self._evaluate_ast(tree, values, self.or_function, self.and_function, ignore_nan) | 
|  | 353 | 
|  | 354             if print_progressbar: | 
|  | 355                 pbar.update(ind + 1) | 
|  | 356 | 
|  | 357         if print_progressbar: | 
|  | 358             pbar.finish() | 
|  | 359 | 
|  | 360         # Store genes not mapped for later use | 
|  | 361         self.genes_not_mapped = sorted(genes_not_mapped) | 
|  | 362 | 
|  | 363         # create the dataframe of ras (rules x samples) | 
|  | 364         ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids) | 
|  | 365         ras_df['Reactions'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()] | 
|  | 366 | 
|  | 367         reactions_common = pd.DataFrame() | 
|  | 368         reactions_common["Reactions"] = ras_df['Reactions'] | 
|  | 369         reactions_common["proof2"] = ras_df['Reactions'] | 
|  | 370         reactions_common = reactions_common.explode('Reactions') | 
|  | 371         reactions_common = reactions_common.set_index("Reactions") | 
|  | 372 | 
|  | 373         ras_df = ras_df.explode("Reactions") | 
|  | 374         ras_df = ras_df.set_index("Reactions") | 
|  | 375 | 
|  | 376         if drop_na_rows: | 
|  | 377             ras_df = ras_df.dropna(how="all") | 
|  | 378 | 
|  | 379         if drop_duplicates: | 
|  | 380             ras_df = ras_df.drop_duplicates() | 
|  | 381 | 
|  | 382         # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData | 
|  | 383         if self.count_adata is None: | 
|  | 384             return ras_df, self.genes_not_mapped | 
|  | 385 | 
|  | 386         # Original AnnData mode: create AnnData structure for RAS | 
|  | 387         ras_adata = AnnData(ras_df.T) | 
|  | 388 | 
|  | 389         #add metadata | 
|  | 390         if add_count_metadata: | 
|  | 391             ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index] | 
|  | 392             ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x)) | 
|  | 393             for el in self.count_adata.obs.columns: | 
|  | 394                 ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el] | 
|  | 395 | 
|  | 396         if add_met_metadata: | 
|  | 397             if self.model is not None and len(self.model.compartments)>0: | 
|  | 398                   ras_adata.var['compartments']=[list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index] | 
|  | 399                   ras_adata.var['compartments']=ras_adata.var["compartments"].apply(lambda x: ",".join(x)) | 
|  | 400 | 
|  | 401             if self.model is not None: | 
|  | 402                 ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index] | 
|  | 403 | 
|  | 404         if add_essential_reactions: | 
|  | 405             if self.model is not None: | 
|  | 406                 essential_reactions=find_essential_reactions(self.model) | 
|  | 407                 essential_reactions=[el.id for el in essential_reactions] | 
|  | 408                 ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index] | 
|  | 409 | 
|  | 410         if add_essential_genes: | 
|  | 411             if self.model is not None: | 
|  | 412                 essential_genes=find_essential_genes(self.model) | 
|  | 413                 essential_genes=[el.id for el in essential_genes] | 
|  | 414                 ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split()  if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]] | 
|  | 415 | 
|  | 416         return ras_adata | 
|  | 417 | 
|  | 418     def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan): | 
|  | 419         """ | 
|  | 420         Evaluate a boolean expression using AST (Abstract Syntax Tree). | 
|  | 421         Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)). | 
|  | 422 | 
|  | 423         Args: | 
|  | 424             node: AST node to evaluate | 
|  | 425             values: Dictionary mapping gene names to their expression values | 
|  | 426             or_function: Function to apply for OR operations | 
|  | 427             and_function: Function to apply for AND operations | 
|  | 428             ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A) | 
|  | 429 | 
|  | 430         Returns: | 
|  | 431             Evaluated expression result (float or np.nan) | 
|  | 432         """ | 
|  | 433         if isinstance(node, ast.BoolOp): | 
|  | 434             # Boolean operation (and/or) | 
|  | 435             vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values] | 
|  | 436 | 
|  | 437             if ignore_nan: | 
|  | 438                 # Filter out None/NaN values | 
|  | 439                 vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))] | 
|  | 440 | 
|  | 441             if not vals: | 
|  | 442                 return np.nan | 
|  | 443 | 
|  | 444             if isinstance(node.op, ast.Or): | 
|  | 445                 return or_function(vals) | 
|  | 446             elif isinstance(node.op, ast.And): | 
|  | 447                 return and_function(vals) | 
|  | 448 | 
|  | 449         elif isinstance(node, ast.Name): | 
|  | 450             # Variable (gene name) | 
|  | 451             return values.get(node.id, None) | 
|  | 452         elif isinstance(node, ast.Constant): | 
|  | 453             # Constant (shouldn't happen in GPRs, but handle it) | 
|  | 454             return values.get(str(node.value), None) | 
|  | 455         else: | 
|  | 456             raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}") | 
|  | 457 | 
|  | 458 | 
|  | 459 # ============================================================================ | 
|  | 460 # STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY | 
|  | 461 # ============================================================================ | 
|  | 462 | 
|  | 463 def computeRAS( | 
|  | 464     dataset, | 
|  | 465     gene_rules, | 
|  | 466     rules_total_string, | 
|  | 467     or_function=np.sum, | 
|  | 468     and_function=np.min, | 
|  | 469     ignore_nan=True | 
|  | 470 ): | 
|  | 471     """ | 
|  | 472     Compute RAS from tabular data and GPR rules (ras_generator.py compatible). | 
|  | 473 | 
|  | 474     This is a standalone function that wraps the RAS_computation class | 
|  | 475     to provide the same interface as ras_generator.py. | 
|  | 476 | 
|  | 477     Args: | 
|  | 478         dataset: pandas DataFrame with gene expression (genes × samples) | 
|  | 479         gene_rules: dict mapping reaction IDs to GPR strings | 
|  | 480         rules_total_string: list of all gene names in GPRs | 
|  | 481         or_function: function for OR operations (default: np.sum) | 
|  | 482         and_function: function for AND operations (default: np.min) | 
|  | 483         ignore_nan: if True, ignore NaN in GPR evaluation (default: True) | 
|  | 484 | 
|  | 485     Returns: | 
|  | 486         tuple: (ras_df, genes_not_mapped) | 
|  | 487             - ras_df: DataFrame with RAS values (reactions × samples) | 
|  | 488             - genes_not_mapped: list of genes in GPRs not found in dataset | 
|  | 489     """ | 
|  | 490     # Create RAS computation object in DataFrame mode | 
|  | 491     ras_obj = RAS_computation( | 
|  | 492         dataset=dataset, | 
|  | 493         gene_rules=gene_rules, | 
|  | 494         rules_total_string=rules_total_string | 
|  | 495     ) | 
|  | 496 | 
|  | 497     # Compute RAS | 
|  | 498     result = ras_obj.compute( | 
|  | 499         or_expression=or_function, | 
|  | 500         and_expression=and_function, | 
|  | 501         ignore_nan=ignore_nan, | 
|  | 502         print_progressbar=False,  # No progress bar for ras_generator | 
|  | 503         add_count_metadata=False,  # No metadata in DataFrame mode | 
|  | 504         add_met_metadata=False, | 
|  | 505         add_essential_reactions=False, | 
|  | 506         add_essential_genes=False | 
|  | 507     ) | 
|  | 508 | 
|  | 509     # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode | 
|  | 510     return result | 
|  | 511 | 
|  | 512 def main(args:List[str] = None) -> None: | 
|  | 513     """ | 
|  | 514     Initializes everything and sets the program in motion based on the fronted input arguments. | 
|  | 515 | 
|  | 516     Returns: | 
|  | 517         None | 
|  | 518     """ | 
|  | 519     # get args from frontend (related xml) | 
|  | 520     global ARGS | 
|  | 521     ARGS = process_args(args) | 
|  | 522 | 
|  | 523     # read dataset and remove versioning from gene names | 
|  | 524     dataset = read_dataset(ARGS.input, "dataset") | 
|  | 525     orig_gene_list=dataset.index.copy() | 
|  | 526     dataset.index =  [str(el.split(".")[0]) for el in dataset.index] | 
|  | 527 | 
|  | 528     #load GPR rules | 
|  | 529     rules = load_custom_rules() | 
|  | 530 | 
|  | 531     #create a list of all the gpr | 
|  | 532     rules_total_string="" | 
|  | 533     for id,rule in rules.items(): | 
|  | 534         rules_total_string+=rule.replace("(","").replace(")","") + " " | 
|  | 535     rules_total_string=list(set(rules_total_string.split(" "))) | 
|  | 536 | 
|  | 537     if any(dataset.index.duplicated(keep=False)): | 
|  | 538         genes_duplicates=orig_gene_list[dataset.index.duplicated(keep=False)] | 
|  | 539         genes_duplicates_in_model=[elem for elem in genes_duplicates if elem in rules_total_string] | 
|  | 540 | 
|  | 541         if len(genes_duplicates_in_model)>0:#metabolic genes have duplicated entries in the dataset | 
|  | 542             list_str=", ".join(genes_duplicates_in_model) | 
|  | 543             list_genes=f"ERROR: Duplicate entries in the gene dataset present in one or more GPR. The following metabolic genes are duplicated: "+list_str | 
|  | 544             raise ValueError(list_genes) | 
|  | 545         else: | 
|  | 546             list_str=", ".join(genes_duplicates) | 
|  | 547             list_genes=f"INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in the GPRs: "+list_str | 
|  | 548             utils.logWarning(list_genes,ARGS.out_log) | 
|  | 549 | 
|  | 550     #check if nan value must be ignored in the GPR | 
|  | 551     if ARGS.none: | 
|  | 552     #    #e.g. (A or nan --> A) | 
|  | 553         ignore_nan = True | 
|  | 554     else: | 
|  | 555         #e.g. (A or nan --> nan) | 
|  | 556         ignore_nan = False | 
|  | 557 | 
|  | 558     #compure ras | 
|  | 559     ras_df,genes_not_mapped=computeRAS(dataset,rules, | 
|  | 560                     rules_total_string, | 
|  | 561                     or_function=np.sum,    # type of operation to do in case of an or expression (max, sum, mean) | 
|  | 562                     and_function=np.min, | 
|  | 563                     ignore_nan=ignore_nan) | 
|  | 564 | 
|  | 565     #save to csv and replace nan with None | 
|  | 566     ras_df.replace([np.nan,None],"None").to_csv(ARGS.ras_output, sep = '\t') | 
|  | 567 | 
|  | 568     #report genes not present in the data | 
|  | 569     if len(genes_not_mapped)>0: | 
|  | 570         genes_not_mapped_str=", ".join(genes_not_mapped) | 
|  | 571         utils.logWarning( | 
|  | 572         f"INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: "+genes_not_mapped_str, | 
|  | 573         ARGS.out_log) | 
|  | 574 | 
|  | 575     print("Execution succeeded") | 
|  | 576 | 
|  | 577 ############################################################################### | 
|  | 578 if __name__ == "__main__": | 
| 539 | 579     main() |