Mercurial > repos > bimib > cobraxy
comparison COBRAxy/ras_generator.py @ 529:6acd64232dad draft
Uploaded
author | francesco_lapi |
---|---|
date | Wed, 22 Oct 2025 12:05:30 +0000 |
parents | b02cfa3b36dd |
children | 352c51a39e23 |
comparison
equal
deleted
inserted
replaced
528:698802db290d | 529:6acd64232dad |
---|---|
8 import sys | 8 import sys |
9 import argparse | 9 import argparse |
10 import pandas as pd | 10 import pandas as pd |
11 import numpy as np | 11 import numpy as np |
12 import utils.general_utils as utils | 12 import utils.general_utils as utils |
13 from typing import List, Dict | 13 from typing import List, Dict |
14 import ast | 14 import ast |
15 | |
16 # Optional imports for AnnData mode (not used in ras_generator.py) | |
17 try: | |
18 from progressbar import ProgressBar, Bar, Percentage | |
19 from scanpy import AnnData | |
20 from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes | |
21 except ImportError: | |
22 # These are only needed for AnnData mode, not for ras_generator.py | |
23 pass | |
15 | 24 |
16 ERRORS = [] | 25 ERRORS = [] |
17 ########################## argparse ########################################## | 26 ########################## argparse ########################################## |
18 ARGS :argparse.Namespace | 27 ARGS :argparse.Namespace |
19 def process_args(args:List[str] = None) -> argparse.Namespace: | 28 def process_args(args:List[str] = None) -> argparse.Namespace: |
148 raise ValueError("No valid rules found in the uploaded file. Please check the file format.") | 157 raise ValueError("No valid rules found in the uploaded file. Please check the file format.") |
149 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. | 158 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. |
150 return dict_rule | 159 return dict_rule |
151 | 160 |
152 | 161 |
162 """ | |
163 Class to compute the RAS values | |
164 | |
165 """ | |
166 | |
167 class RAS_computation: | |
168 | |
169 def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None): | |
170 """ | |
171 Initialize RAS computation with two possible input modes: | |
172 | |
173 Mode 1 (Original - for sampling_main.py): | |
174 adata: AnnData object with gene expression (cells × genes) | |
175 model: COBRApy model object with reactions and GPRs | |
176 | |
177 Mode 2 (New - for ras_generator.py): | |
178 dataset: pandas DataFrame with gene expression (genes × samples) | |
179 gene_rules: dict mapping reaction IDs to GPR strings | |
180 rules_total_string: list of all gene names in GPRs (for validation) | |
181 """ | |
182 self._logic_operators = ['and', 'or', '(', ')'] | |
183 self.val_nan = np.nan | |
184 | |
185 # Determine which mode we're in | |
186 if adata is not None and model is not None: | |
187 # Mode 1: AnnData + COBRApy model (original) | |
188 self._init_from_anndata(adata, model) | |
189 elif dataset is not None and gene_rules is not None: | |
190 # Mode 2: DataFrame + rules dict (ras_generator style) | |
191 self._init_from_dataframe(dataset, gene_rules, rules_total_string) | |
192 else: | |
193 raise ValueError( | |
194 "Invalid initialization. Provide either:\n" | |
195 " - adata + model (for AnnData input), or\n" | |
196 " - dataset + gene_rules (for DataFrame input)" | |
197 ) | |
198 | |
199 def _normalize_gene_name(self, gene_name): | |
200 """Normalize gene names by replacing special characters.""" | |
201 return gene_name.replace("-", "_").replace(":", "_") | |
202 | |
203 def _normalize_rule(self, rule): | |
204 """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names.""" | |
205 rule = rule.replace("OR", "or").replace("AND", "and") | |
206 rule = rule.replace("(", "( ").replace(")", " )") | |
207 # Normalize gene names in the rule | |
208 tokens = rule.split() | |
209 normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens] | |
210 return " ".join(normalized_tokens) | |
211 | |
212 def _init_from_anndata(self, adata, model): | |
213 """Initialize from AnnData and COBRApy model (original mode).""" | |
214 # Build the dictionary for the GPRs | |
215 df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions]) | |
216 gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions] | |
217 df_reactions['rule'] = gene_rules | |
218 df_reactions = df_reactions.reset_index() | |
219 df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | |
220 | |
221 self.dict_rule_reactions = df_reactions.to_dict()['index'] | |
222 | |
223 # build useful structures for RAS computation | |
224 self.model = model | |
225 self.count_adata = adata.copy() | |
226 | |
227 # Normalize gene names in both model and dataset | |
228 model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes] | |
229 dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index] | |
230 self.genes = pd.Index(dataset_genes).intersection(model_genes) | |
231 | |
232 if len(self.genes) == 0: | |
233 raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | |
234 | |
235 self.cell_ids = list(self.count_adata.obs.index.values) | |
236 # Get expression data with normalized gene names | |
237 self.count_df_filtered = self.count_adata.to_df().T | |
238 self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index] | |
239 self.count_df_filtered = self.count_df_filtered.loc[self.genes] | |
240 | |
241 def _init_from_dataframe(self, dataset, gene_rules, rules_total_string): | |
242 """Initialize from DataFrame and rules dict (ras_generator mode).""" | |
243 reactions = list(gene_rules.keys()) | |
244 | |
245 # Build the dictionary for the GPRs | |
246 df_reactions = pd.DataFrame(index=reactions) | |
247 gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions] | |
248 df_reactions['rule'] = gene_rules_list | |
249 df_reactions = df_reactions.reset_index() | |
250 df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | |
251 | |
252 self.dict_rule_reactions = df_reactions.to_dict()['index'] | |
253 | |
254 # build useful structures for RAS computation | |
255 self.model = None | |
256 self.count_adata = None | |
257 | |
258 # Normalize gene names in dataset | |
259 dataset_normalized = dataset.copy() | |
260 dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index] | |
261 | |
262 # Determine which genes are in both dataset and GPRs | |
263 if rules_total_string is not None: | |
264 rules_genes = [self._normalize_gene_name(g) for g in rules_total_string] | |
265 self.genes = dataset_normalized.index.intersection(rules_genes) | |
266 else: | |
267 # Extract all genes from rules | |
268 all_genes_in_rules = set() | |
269 for rule in gene_rules_list: | |
270 tokens = rule.split() | |
271 for token in tokens: | |
272 if token not in self._logic_operators: | |
273 all_genes_in_rules.add(token) | |
274 self.genes = dataset_normalized.index.intersection(all_genes_in_rules) | |
275 | |
276 if len(self.genes) == 0: | |
277 raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | |
278 | |
279 self.cell_ids = list(dataset_normalized.columns) | |
280 self.count_df_filtered = dataset_normalized.loc[self.genes] | |
281 | |
282 def compute(self, | |
283 or_expression=np.sum, # type of operation to do in case of an or expression (sum, max, mean) | |
284 and_expression=np.min, # type of operation to do in case of an and expression(min, sum) | |
285 drop_na_rows=True, # if True remove the nan rows of the ras matrix | |
286 drop_duplicates=False, # if true, remove duplicates rows | |
287 ignore_nan=True, # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A) | |
288 print_progressbar=True, # if True, print the progress bar | |
289 add_count_metadata=True, # if True add metadata of cells in the ras adata | |
290 add_met_metadata=True, # if True add metadata from the metabolic model (gpr and compartments of reactions) | |
291 add_essential_reactions=False, | |
292 add_essential_genes=False | |
293 ): | |
294 | |
295 self.or_function = or_expression | |
296 self.and_function = and_expression | |
297 | |
298 ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan) | |
299 genes_not_mapped = set() # Track genes not in dataset | |
300 | |
301 if print_progressbar: | |
302 pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start() | |
303 | |
304 # Process each unique GPR rule | |
305 for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()): | |
306 if len(rule) == 0: | |
307 # Empty rule - keep as NaN | |
308 pass | |
309 else: | |
310 # Extract genes from rule | |
311 rule_genes = [token for token in rule.split() if token not in self._logic_operators] | |
312 rule_genes_unique = list(set(rule_genes)) | |
313 | |
314 # Which genes are in the dataset? | |
315 genes_present = [g for g in rule_genes_unique if g in self.genes] | |
316 genes_missing = [g for g in rule_genes_unique if g not in self.genes] | |
317 | |
318 if genes_missing: | |
319 genes_not_mapped.update(genes_missing) | |
320 | |
321 if len(genes_present) == 0: | |
322 # No genes in dataset - keep as NaN | |
323 pass | |
324 elif len(genes_missing) > 0 and not ignore_nan: | |
325 # Some genes missing and we don't ignore NaN - set to NaN | |
326 pass | |
327 else: | |
328 # Evaluate the GPR expression using AST | |
329 # For single gene, AST handles it fine: ast.parse("GENE_A") works | |
330 try: | |
331 tree = ast.parse(rule, mode="eval").body | |
332 data = self.count_df_filtered.loc[genes_present] | |
333 | |
334 # Evaluate for each cell/sample | |
335 for j, col in enumerate(data.columns): | |
336 gene_values = dict(zip(data.index, data[col].values)) | |
337 ras_df[ind, j] = self._evaluate_ast(tree, gene_values, self.or_function, self.and_function, ignore_nan) | |
338 except: | |
339 # If parsing fails, keep as NaN | |
340 pass | |
341 | |
342 if print_progressbar: | |
343 pbar.update(ind + 1) | |
344 | |
345 if print_progressbar: | |
346 pbar.finish() | |
347 | |
348 # Store genes not mapped for later use | |
349 self.genes_not_mapped = sorted(genes_not_mapped) | |
350 | |
351 # create the dataframe of ras (rules x samples) | |
352 ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids) | |
353 ras_df['REACTIONS'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()] | |
354 | |
355 reactions_common = pd.DataFrame() | |
356 reactions_common["REACTIONS"] = ras_df['REACTIONS'] | |
357 reactions_common["proof2"] = ras_df['REACTIONS'] | |
358 reactions_common = reactions_common.explode('REACTIONS') | |
359 reactions_common = reactions_common.set_index("REACTIONS") | |
360 | |
361 ras_df = ras_df.explode("REACTIONS") | |
362 ras_df = ras_df.set_index("REACTIONS") | |
363 | |
364 if drop_na_rows: | |
365 ras_df = ras_df.dropna(how="all") | |
366 | |
367 if drop_duplicates: | |
368 ras_df = ras_df.drop_duplicates() | |
369 | |
370 # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData | |
371 if self.count_adata is None: | |
372 return ras_df, self.genes_not_mapped | |
373 | |
374 # Original AnnData mode: create AnnData structure for RAS | |
375 ras_adata = AnnData(ras_df.T) | |
376 | |
377 #add metadata | |
378 if add_count_metadata: | |
379 ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index] | |
380 ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x)) | |
381 for el in self.count_adata.obs.columns: | |
382 ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el] | |
383 | |
384 if add_met_metadata: | |
385 if self.model is not None and len(self.model.compartments)>0: | |
386 ras_adata.var['compartments']=[list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index] | |
387 ras_adata.var['compartments']=ras_adata.var["compartments"].apply(lambda x: ",".join(x)) | |
388 | |
389 if self.model is not None: | |
390 ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index] | |
391 | |
392 if add_essential_reactions: | |
393 if self.model is not None: | |
394 essential_reactions=find_essential_reactions(self.model) | |
395 essential_reactions=[el.id for el in essential_reactions] | |
396 ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index] | |
397 | |
398 if add_essential_genes: | |
399 if self.model is not None: | |
400 essential_genes=find_essential_genes(self.model) | |
401 essential_genes=[el.id for el in essential_genes] | |
402 ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split() if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]] | |
403 | |
404 return ras_adata | |
405 | |
406 def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan): | |
407 """ | |
408 Evaluate a boolean expression using AST (Abstract Syntax Tree). | |
409 Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)). | |
410 | |
411 Args: | |
412 node: AST node to evaluate | |
413 values: Dictionary mapping gene names to their expression values | |
414 or_function: Function to apply for OR operations | |
415 and_function: Function to apply for AND operations | |
416 ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A) | |
417 | |
418 Returns: | |
419 Evaluated expression result (float or np.nan) | |
420 """ | |
421 if isinstance(node, ast.BoolOp): | |
422 # Boolean operation (and/or) | |
423 vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values] | |
424 | |
425 if ignore_nan: | |
426 # Filter out None/NaN values | |
427 vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))] | |
428 | |
429 if not vals: | |
430 return np.nan | |
431 | |
432 if isinstance(node.op, ast.Or): | |
433 return or_function(vals) | |
434 elif isinstance(node.op, ast.And): | |
435 return and_function(vals) | |
436 | |
437 elif isinstance(node, ast.Name): | |
438 # Variable (gene name) | |
439 return values.get(node.id, None) | |
440 elif isinstance(node, ast.Constant): | |
441 # Constant (shouldn't happen in GPRs, but handle it) | |
442 return values.get(str(node.value), None) | |
443 else: | |
444 raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}") | |
445 | |
446 | |
447 # ============================================================================ | |
448 # STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY | |
449 # ============================================================================ | |
153 | 450 |
154 def computeRAS( | 451 def computeRAS( |
155 dataset,gene_rules,rules_total_string, | 452 dataset, |
156 or_function=np.sum, # type of operation to do in case of an or expression (max, sum, mean) | 453 gene_rules, |
157 and_function=np.min, # type of operation to do in case of an and expression(min, sum) | 454 rules_total_string, |
158 ignore_nan = True | 455 or_function=np.sum, |
159 ): | 456 and_function=np.min, |
160 | 457 ignore_nan=True |
161 | 458 ): |
162 logic_operators = ['and', 'or', '(', ')'] | 459 """ |
163 reactions=list(gene_rules.keys()) | 460 Compute RAS from tabular data and GPR rules (ras_generator.py compatible). |
164 | 461 |
165 # Build the dictionary for the GPRs | 462 This is a standalone function that wraps the RAS_computation class |
166 df_reactions = pd.DataFrame(index=reactions) | 463 to provide the same interface as ras_generator.py. |
167 gene_rules=[gene_rules[reaction_id].replace("OR","or").replace("AND","and").replace("(","( ").replace(")"," )") for reaction_id in reactions] | 464 |
168 df_reactions['rule'] = gene_rules | 465 Args: |
169 df_reactions = df_reactions.reset_index() | 466 dataset: pandas DataFrame with gene expression (genes × samples) |
170 df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x))) | 467 gene_rules: dict mapping reaction IDs to GPR strings |
171 | 468 rules_total_string: list of all gene names in GPRs |
172 dict_rule_reactions = df_reactions.to_dict()['index'] | 469 or_function: function for OR operations (default: np.sum) |
173 | 470 and_function: function for AND operations (default: np.min) |
174 # build useful structures for RAS computation | 471 ignore_nan: if True, ignore NaN in GPR evaluation (default: True) |
175 genes =dataset.index.intersection(rules_total_string) | 472 |
176 | 473 Returns: |
177 #check if there is one gene at least | 474 tuple: (ras_df, genes_not_mapped) |
178 if len(genes)==0: | 475 - ras_df: DataFrame with RAS values (reactions × samples) |
179 raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.") | 476 - genes_not_mapped: list of genes in GPRs not found in dataset |
180 | 477 """ |
181 cell_ids = list(dataset.columns) | 478 # Create RAS computation object in DataFrame mode |
182 count_df_filtered = dataset.loc[genes] | 479 ras_obj = RAS_computation( |
183 count_df_filtered = count_df_filtered.rename(index=lambda x: x.replace("-", "_").replace(":", "_")) | 480 dataset=dataset, |
184 | 481 gene_rules=gene_rules, |
185 ras_df=np.full((len(dict_rule_reactions), len(cell_ids)), np.nan) | 482 rules_total_string=rules_total_string |
186 | 483 ) |
187 # for loop on rules | 484 |
188 genes_not_mapped=[] | 485 # Compute RAS |
189 ind = 0 | 486 result = ras_obj.compute( |
190 for rule, reaction_ids in dict_rule_reactions.items(): | 487 or_expression=or_function, |
191 if len(rule) != 0: | 488 and_expression=and_function, |
192 # there is one gene at least in the formula | 489 ignore_nan=ignore_nan, |
193 warning_rule="_" | 490 print_progressbar=False, # No progress bar for ras_generator |
194 if "-" in rule: | 491 add_count_metadata=False, # No metadata in DataFrame mode |
195 warning_rule="-" | 492 add_met_metadata=False, |
196 if ":" in rule: | 493 add_essential_reactions=False, |
197 warning_rule=":" | 494 add_essential_genes=False |
198 rule_orig=rule.split().copy() #original rule in list | 495 ) |
199 rule = rule.replace(warning_rule,"_") | 496 |
200 #modified rule | 497 # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode |
201 rule_split = rule.split() | 498 return result |
202 rule_split_elements = list(filter(lambda x: x not in logic_operators, rule_split)) # remove of all logical operators | |
203 rule_split_elements = list(set(rule_split_elements)) # genes in formula | |
204 | |
205 # which genes are in the count matrix? | |
206 genes_in_count_matrix = [el for el in rule_split_elements if el in genes] | |
207 genes_notin_count_matrix = [] | |
208 for el in rule_split_elements: | |
209 if el not in genes: #not present in original dataset | |
210 if el.replace("_",warning_rule) in rule_orig: | |
211 genes_notin_count_matrix.append(el.replace("_",warning_rule)) | |
212 else: | |
213 genes_notin_count_matrix.append(el) | |
214 genes_not_mapped.extend(genes_notin_count_matrix) | |
215 | |
216 # add genes not present in the data | |
217 if len(genes_in_count_matrix) > 0: #there is at least one gene in the count matrix | |
218 if len(rule_split) == 1: | |
219 #one gene --> one reaction | |
220 ras_df[ind] = count_df_filtered.loc[genes_in_count_matrix] | |
221 else: | |
222 if len(genes_notin_count_matrix) > 0 and ignore_nan == False: | |
223 ras_df[ind] = np.nan | |
224 else: | |
225 # more genes in the formula | |
226 check_only_and=("and" in rule and "or" not in rule) #only and | |
227 check_only_or=("or" in rule and "and" not in rule) #only or | |
228 if check_only_and or check_only_or: | |
229 #or/and sequence | |
230 matrix = count_df_filtered.loc[genes_in_count_matrix].values | |
231 #compute for all cells | |
232 if check_only_and: | |
233 ras_df[ind] = and_function(matrix, axis=0) | |
234 else: | |
235 ras_df[ind] = or_function(matrix, axis=0) | |
236 else: | |
237 # complex expression (e.g. A or (B and C)) | |
238 data = count_df_filtered.loc[genes_in_count_matrix] # dataframe of genes in the GPRs | |
239 tree = ast.parse(rule, mode="eval").body | |
240 values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns] | |
241 for j, values in enumerate(values_by_cell): | |
242 ras_df[ind, j] = _evaluate_ast(tree, values, or_function, and_function) | |
243 | |
244 ind +=1 | |
245 | |
246 #create the dataframe of ras (rules x samples) | |
247 ras_df= pd.DataFrame(data=ras_df,index=range(len(dict_rule_reactions)), columns=cell_ids) | |
248 ras_df['Reactions'] = [reaction_ids for rule,reaction_ids in dict_rule_reactions.items()] | |
249 | |
250 #create the reaction dataframe for ras (reactions x samples) | |
251 ras_df = ras_df.explode("Reactions").set_index("Reactions") | |
252 | |
253 #total genes not mapped from the gpr | |
254 genes_not_mapped = sorted(set(genes_not_mapped)) | |
255 | |
256 return ras_df,genes_not_mapped | |
257 | |
258 # function to evalute a complex boolean expression e.g. A or (B and C) | |
259 # function to evalute a complex boolean expression e.g. A or (B and C) | |
260 def _evaluate_ast( node, values,or_function,and_function): | |
261 if isinstance(node, ast.BoolOp): | |
262 | |
263 vals = [_evaluate_ast(v, values,or_function,and_function) for v in node.values] | |
264 | |
265 vals = [v for v in vals if v is not None] | |
266 if not vals: | |
267 return np.nan | |
268 | |
269 vals = [np.array(v) if isinstance(v, (list, np.ndarray)) else v for v in vals] | |
270 | |
271 if isinstance(node.op, ast.Or): | |
272 return or_function(vals) | |
273 elif isinstance(node.op, ast.And): | |
274 return and_function(vals) | |
275 | |
276 elif isinstance(node, ast.Name): | |
277 return values.get(node.id, None) | |
278 elif isinstance(node, ast.Constant): | |
279 key = str(node.value) #convert in str | |
280 return values.get(key, None) | |
281 else: | |
282 raise ValueError(f"Error in boolean expression: {ast.dump(node)}") | |
283 | 499 |
284 def main(args:List[str] = None) -> None: | 500 def main(args:List[str] = None) -> None: |
285 """ | 501 """ |
286 Initializes everything and sets the program in motion based on the fronted input arguments. | 502 Initializes everything and sets the program in motion based on the fronted input arguments. |
287 | 503 |