comparison COBRAxy/src/ras_generator.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children fcdbc81feb45
comparison
equal deleted inserted replaced
538:fd53d42348bd 539:2fb97466e404
1 """
2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.
3
4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
5 computes RAS per reaction for each sample/cell line, and writes a tabular output.
6 """
7 from __future__ import division
8 import sys
9 import argparse
10 import pandas as pd
11 import numpy as np
12 import utils.general_utils as utils
13 from typing import List, Dict
14 import ast
15
16 # Optional imports for AnnData mode (not used in ras_generator.py)
17 try:
18 from progressbar import ProgressBar, Bar, Percentage
19 from scanpy import AnnData
20 from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes
21 except ImportError:
22 # These are only needed for AnnData mode, not for ras_generator.py
23 pass
24
25 ERRORS = []
26 ########################## argparse ##########################################
27 ARGS :argparse.Namespace
28 def process_args(args:List[str] = None) -> argparse.Namespace:
29 """
30 Processes command-line arguments.
31
32 Args:
33 args (list): List of command-line arguments.
34
35 Returns:
36 Namespace: An object containing parsed arguments.
37 """
38 parser = argparse.ArgumentParser(
39 usage = '%(prog)s [options]',
40 description = "process some value's genes to create a comparison's map.")
41
42 parser.add_argument("-rl", "--model_upload", type = str,
43 help = "path to input file containing the rules")
44
45 parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
46 # Galaxy converts files into .dat, this helps infer the original extension when needed.
47
48 parser.add_argument(
49 '-n', '--none',
50 type = utils.Bool("none"), default = True,
51 help = 'compute Nan values')
52
53 parser.add_argument(
54 '-td', '--tool_dir',
55 type = str,
56 required = True, help = 'your tool directory')
57
58 parser.add_argument(
59 '-ol', '--out_log',
60 type = str,
61 help = "Output log")
62
63 parser.add_argument(
64 '-in', '--input',
65 type = str,
66 help = 'input dataset')
67
68 parser.add_argument(
69 '-ra', '--ras_output',
70 type = str,
71 required = True, help = 'ras output')
72
73
74 return parser.parse_args(args)
75
76 ############################ dataset input ####################################
77 def read_dataset(data :str, name :str) -> pd.DataFrame:
78 """
79 Read a dataset from a CSV file and return it as a pandas DataFrame.
80
81 Args:
82 data (str): Path to the CSV file containing the dataset.
83 name (str): Name of the dataset, used in error messages.
84
85 Returns:
86 pandas.DataFrame: DataFrame containing the dataset.
87
88 Raises:
89 pd.errors.EmptyDataError: If the CSV file is empty.
90 sys.exit: If the CSV file has the wrong format, the execution is aborted.
91 """
92 try:
93 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0)
94 dataset = dataset.astype(float)
95 except pd.errors.EmptyDataError:
96 sys.exit('Execution aborted: wrong file format of ' + name + '\n')
97 if len(dataset.columns) < 2:
98 sys.exit('Execution aborted: wrong file format of ' + name + '\n')
99 return dataset
100
101
102 def load_custom_rules() -> Dict[str,str]:
103 """
104 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
105 performed, significantly impacting the runtime.
106
107 Returns:
108 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
109 """
110 datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in Galaxy as a .dat
111
112 dict_rule = {}
113
114 try:
115 rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)
116 if len(rows) <= 1:
117 raise ValueError("Model tabular with 1 column is not supported.")
118
119 if not rows:
120 raise ValueError("Model tabular is file is empty.")
121
122 id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
123
124 # First, try using a tab delimiter
125 for line in rows[1:]:
126 if len(line) <= idx_gpr:
127 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
128 continue
129
130 dict_rule[line[id_idx]] = line[idx_gpr]
131
132 except Exception as e:
133 # If parsing with tabs fails, try comma delimiter
134 try:
135 rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)
136
137 if len(rows) <= 1:
138 raise ValueError("Model tabular with 1 column is not supported.")
139
140 if not rows:
141 raise ValueError("Model tabular is file is empty.")
142
143 id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
144
145 # Try again parsing row content with the GPR column using comma-separated values
146 for line in rows[1:]:
147 if len(line) <= idx_gpr:
148 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
149 continue
150
151 dict_rule[line[id_idx]] =line[idx_gpr]
152
153 except Exception as e2:
154 raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
155
156 if not dict_rule:
157 raise ValueError("No valid rules found in the uploaded file. Please check the file format.")
158 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
159 return dict_rule
160
161
162 """
163 Class to compute the RAS values
164
165 """
166
167 class RAS_computation:
168
169 def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None):
170 """
171 Initialize RAS computation with two possible input modes:
172
173 Mode 1 (Original - for sampling_main.py):
174 adata: AnnData object with gene expression (cells × genes)
175 model: COBRApy model object with reactions and GPRs
176
177 Mode 2 (New - for ras_generator.py):
178 dataset: pandas DataFrame with gene expression (genes × samples)
179 gene_rules: dict mapping reaction IDs to GPR strings
180 rules_total_string: list of all gene names in GPRs (for validation)
181 """
182 self._logic_operators = ['and', 'or', '(', ')']
183 self.val_nan = np.nan
184
185 # Determine which mode we're in
186 if adata is not None and model is not None:
187 # Mode 1: AnnData + COBRApy model (original)
188 self._init_from_anndata(adata, model)
189 elif dataset is not None and gene_rules is not None:
190 # Mode 2: DataFrame + rules dict (ras_generator style)
191 self._init_from_dataframe(dataset, gene_rules, rules_total_string)
192 else:
193 raise ValueError(
194 "Invalid initialization. Provide either:\n"
195 " - adata + model (for AnnData input), or\n"
196 " - dataset + gene_rules (for DataFrame input)"
197 )
198
199 def _normalize_gene_name(self, gene_name):
200 """Normalize gene names by replacing special characters."""
201 return gene_name.replace("-", "_").replace(":", "_")
202
203 def _normalize_rule(self, rule):
204 """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names."""
205 rule = rule.replace("OR", "or").replace("AND", "and")
206 rule = rule.replace("(", "( ").replace(")", " )")
207 # Normalize gene names in the rule
208 tokens = rule.split()
209 normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens]
210 return " ".join(normalized_tokens)
211
212 def _init_from_anndata(self, adata, model):
213 """Initialize from AnnData and COBRApy model (original mode)."""
214 # Build the dictionary for the GPRs
215 df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions])
216 gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions]
217 df_reactions['rule'] = gene_rules
218 df_reactions = df_reactions.reset_index()
219 df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
220
221 self.dict_rule_reactions = df_reactions.to_dict()['index']
222
223 # build useful structures for RAS computation
224 self.model = model
225 self.count_adata = adata.copy()
226
227 # Normalize gene names in both model and dataset
228 model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes]
229 dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index]
230 self.genes = pd.Index(dataset_genes).intersection(model_genes)
231
232 if len(self.genes) == 0:
233 raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
234
235 self.cell_ids = list(self.count_adata.obs.index.values)
236 # Get expression data with normalized gene names
237 self.count_df_filtered = self.count_adata.to_df().T
238 self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index]
239 self.count_df_filtered = self.count_df_filtered.loc[self.genes]
240
241 def _init_from_dataframe(self, dataset, gene_rules, rules_total_string):
242 """Initialize from DataFrame and rules dict (ras_generator mode)."""
243 reactions = list(gene_rules.keys())
244
245 # Build the dictionary for the GPRs
246 df_reactions = pd.DataFrame(index=reactions)
247 gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions]
248 df_reactions['rule'] = gene_rules_list
249 df_reactions = df_reactions.reset_index()
250 df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
251
252 self.dict_rule_reactions = df_reactions.to_dict()['index']
253
254 # build useful structures for RAS computation
255 self.model = None
256 self.count_adata = None
257
258 # Normalize gene names in dataset
259 dataset_normalized = dataset.copy()
260 dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index]
261
262 # Determine which genes are in both dataset and GPRs
263 if rules_total_string is not None:
264 rules_genes = [self._normalize_gene_name(g) for g in rules_total_string]
265 self.genes = dataset_normalized.index.intersection(rules_genes)
266 else:
267 # Extract all genes from rules
268 all_genes_in_rules = set()
269 for rule in gene_rules_list:
270 tokens = rule.split()
271 for token in tokens:
272 if token not in self._logic_operators:
273 all_genes_in_rules.add(token)
274 self.genes = dataset_normalized.index.intersection(all_genes_in_rules)
275
276 if len(self.genes) == 0:
277 raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
278
279 self.cell_ids = list(dataset_normalized.columns)
280 self.count_df_filtered = dataset_normalized.loc[self.genes]
281
282 def compute(self,
283 or_expression=np.sum, # type of operation to do in case of an or expression (sum, max, mean)
284 and_expression=np.min, # type of operation to do in case of an and expression(min, sum)
285 drop_na_rows=False, # if True remove the nan rows of the ras matrix
286 drop_duplicates=False, # if true, remove duplicates rows
287 ignore_nan=True, # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A)
288 print_progressbar=True, # if True, print the progress bar
289 add_count_metadata=True, # if True add metadata of cells in the ras adata
290 add_met_metadata=True, # if True add metadata from the metabolic model (gpr and compartments of reactions)
291 add_essential_reactions=False,
292 add_essential_genes=False
293 ):
294
295 self.or_function = or_expression
296 self.and_function = and_expression
297
298 ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan)
299 genes_not_mapped = set() # Track genes not in dataset
300
301 if print_progressbar:
302 pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start()
303
304 # Process each unique GPR rule
305 for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()):
306 if len(rule) == 0:
307 # Empty rule - keep as NaN
308 pass
309 else:
310 # Extract genes from rule
311 rule_genes = [token for token in rule.split() if token not in self._logic_operators]
312 rule_genes_unique = list(set(rule_genes))
313
314 # Which genes are in the dataset?
315 genes_present = [g for g in rule_genes_unique if g in self.genes]
316 genes_missing = [g for g in rule_genes_unique if g not in self.genes]
317
318 if genes_missing:
319 genes_not_mapped.update(genes_missing)
320
321 if len(genes_present) == 0:
322 # No genes in dataset - keep as NaN
323 pass
324 elif len(genes_missing) > 0 and not ignore_nan:
325 # Some genes missing and we don't ignore NaN - set to NaN
326 pass
327 else:
328 # Evaluate the GPR expression using AST
329 # For single gene, AST handles it fine: ast.parse("GENE_A") works
330 # more genes in the formula
331 check_only_and=("and" in rule and "or" not in rule) #only and
332 check_only_or=("or" in rule and "and" not in rule) #only or
333 if check_only_and or check_only_or:
334 #or/and sequence
335 matrix = self.count_df_filtered.loc[genes_present].values
336 #compute for all cells
337 if check_only_and:
338 ras_df[ind] = self.and_function(matrix, axis=0)
339 else:
340 ras_df[ind] = self.or_function(matrix, axis=0)
341 else:
342 # complex expression (e.g. A or (B and C))
343 data = self.count_df_filtered.loc[genes_present] # dataframe of genes in the GPRs
344 tree = ast.parse(rule, mode="eval").body
345 values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns]
346 for j, values in enumerate(values_by_cell):
347 ras_df[ind, j] =self._evaluate_ast(tree, values, self.or_function, self.and_function, ignore_nan)
348
349 if print_progressbar:
350 pbar.update(ind + 1)
351
352 if print_progressbar:
353 pbar.finish()
354
355 # Store genes not mapped for later use
356 self.genes_not_mapped = sorted(genes_not_mapped)
357
358 # create the dataframe of ras (rules x samples)
359 ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids)
360 ras_df['Reactions'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()]
361
362 reactions_common = pd.DataFrame()
363 reactions_common["Reactions"] = ras_df['Reactions']
364 reactions_common["proof2"] = ras_df['Reactions']
365 reactions_common = reactions_common.explode('Reactions')
366 reactions_common = reactions_common.set_index("Reactions")
367
368 ras_df = ras_df.explode("Reactions")
369 ras_df = ras_df.set_index("Reactions")
370
371 if drop_na_rows:
372 ras_df = ras_df.dropna(how="all")
373
374 if drop_duplicates:
375 ras_df = ras_df.drop_duplicates()
376
377 # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData
378 if self.count_adata is None:
379 return ras_df, self.genes_not_mapped
380
381 # Original AnnData mode: create AnnData structure for RAS
382 ras_adata = AnnData(ras_df.T)
383
384 #add metadata
385 if add_count_metadata:
386 ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index]
387 ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x))
388 for el in self.count_adata.obs.columns:
389 ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el]
390
391 if add_met_metadata:
392 if self.model is not None and len(self.model.compartments)>0:
393 ras_adata.var['compartments']=[list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index]
394 ras_adata.var['compartments']=ras_adata.var["compartments"].apply(lambda x: ",".join(x))
395
396 if self.model is not None:
397 ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index]
398
399 if add_essential_reactions:
400 if self.model is not None:
401 essential_reactions=find_essential_reactions(self.model)
402 essential_reactions=[el.id for el in essential_reactions]
403 ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index]
404
405 if add_essential_genes:
406 if self.model is not None:
407 essential_genes=find_essential_genes(self.model)
408 essential_genes=[el.id for el in essential_genes]
409 ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split() if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]]
410
411 return ras_adata
412
413 def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan):
414 """
415 Evaluate a boolean expression using AST (Abstract Syntax Tree).
416 Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)).
417
418 Args:
419 node: AST node to evaluate
420 values: Dictionary mapping gene names to their expression values
421 or_function: Function to apply for OR operations
422 and_function: Function to apply for AND operations
423 ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A)
424
425 Returns:
426 Evaluated expression result (float or np.nan)
427 """
428 if isinstance(node, ast.BoolOp):
429 # Boolean operation (and/or)
430 vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values]
431
432 if ignore_nan:
433 # Filter out None/NaN values
434 vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))]
435
436 if not vals:
437 return np.nan
438
439 if isinstance(node.op, ast.Or):
440 return or_function(vals)
441 elif isinstance(node.op, ast.And):
442 return and_function(vals)
443
444 elif isinstance(node, ast.Name):
445 # Variable (gene name)
446 return values.get(node.id, None)
447 elif isinstance(node, ast.Constant):
448 # Constant (shouldn't happen in GPRs, but handle it)
449 return values.get(str(node.value), None)
450 else:
451 raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}")
452
453
454 # ============================================================================
455 # STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY
456 # ============================================================================
457
458 def computeRAS(
459 dataset,
460 gene_rules,
461 rules_total_string,
462 or_function=np.sum,
463 and_function=np.min,
464 ignore_nan=True
465 ):
466 """
467 Compute RAS from tabular data and GPR rules (ras_generator.py compatible).
468
469 This is a standalone function that wraps the RAS_computation class
470 to provide the same interface as ras_generator.py.
471
472 Args:
473 dataset: pandas DataFrame with gene expression (genes × samples)
474 gene_rules: dict mapping reaction IDs to GPR strings
475 rules_total_string: list of all gene names in GPRs
476 or_function: function for OR operations (default: np.sum)
477 and_function: function for AND operations (default: np.min)
478 ignore_nan: if True, ignore NaN in GPR evaluation (default: True)
479
480 Returns:
481 tuple: (ras_df, genes_not_mapped)
482 - ras_df: DataFrame with RAS values (reactions × samples)
483 - genes_not_mapped: list of genes in GPRs not found in dataset
484 """
485 # Create RAS computation object in DataFrame mode
486 ras_obj = RAS_computation(
487 dataset=dataset,
488 gene_rules=gene_rules,
489 rules_total_string=rules_total_string
490 )
491
492 # Compute RAS
493 result = ras_obj.compute(
494 or_expression=or_function,
495 and_expression=and_function,
496 ignore_nan=ignore_nan,
497 print_progressbar=False, # No progress bar for ras_generator
498 add_count_metadata=False, # No metadata in DataFrame mode
499 add_met_metadata=False,
500 add_essential_reactions=False,
501 add_essential_genes=False
502 )
503
504 # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode
505 return result
506
507 def main(args:List[str] = None) -> None:
508 """
509 Initializes everything and sets the program in motion based on the fronted input arguments.
510
511 Returns:
512 None
513 """
514 # get args from frontend (related xml)
515 global ARGS
516 ARGS = process_args(args)
517
518 # read dataset and remove versioning from gene names
519 dataset = read_dataset(ARGS.input, "dataset")
520 orig_gene_list=dataset.index.copy()
521 dataset.index = [str(el.split(".")[0]) for el in dataset.index]
522
523 #load GPR rules
524 rules = load_custom_rules()
525
526 #create a list of all the gpr
527 rules_total_string=""
528 for id,rule in rules.items():
529 rules_total_string+=rule.replace("(","").replace(")","") + " "
530 rules_total_string=list(set(rules_total_string.split(" ")))
531
532 if any(dataset.index.duplicated(keep=False)):
533 genes_duplicates=orig_gene_list[dataset.index.duplicated(keep=False)]
534 genes_duplicates_in_model=[elem for elem in genes_duplicates if elem in rules_total_string]
535
536 if len(genes_duplicates_in_model)>0:#metabolic genes have duplicated entries in the dataset
537 list_str=", ".join(genes_duplicates_in_model)
538 list_genes=f"ERROR: Duplicate entries in the gene dataset present in one or more GPR. The following metabolic genes are duplicated: "+list_str
539 raise ValueError(list_genes)
540 else:
541 list_str=", ".join(genes_duplicates)
542 list_genes=f"INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in the GPRs: "+list_str
543 utils.logWarning(list_genes,ARGS.out_log)
544
545 #check if nan value must be ignored in the GPR
546 if ARGS.none:
547 # #e.g. (A or nan --> A)
548 ignore_nan = True
549 else:
550 #e.g. (A or nan --> nan)
551 ignore_nan = False
552
553 #compure ras
554 ras_df,genes_not_mapped=computeRAS(dataset,rules,
555 rules_total_string,
556 or_function=np.sum, # type of operation to do in case of an or expression (max, sum, mean)
557 and_function=np.min,
558 ignore_nan=ignore_nan)
559
560 #save to csv and replace nan with None
561 ras_df.replace([np.nan,None],"None").to_csv(ARGS.ras_output, sep = '\t')
562
563 #report genes not present in the data
564 if len(genes_not_mapped)>0:
565 genes_not_mapped_str=", ".join(genes_not_mapped)
566 utils.logWarning(
567 f"INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: "+genes_not_mapped_str,
568 ARGS.out_log)
569
570 print("Execution succeeded")
571
572 ###############################################################################
573 if __name__ == "__main__":
574 main()