comparison COBRAxy/src/utils/model_utils.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children fcdbc81feb45
comparison
equal deleted inserted replaced
538:fd53d42348bd 539:2fb97466e404
1 """
2 Utilities for generating and manipulating COBRA models and related metadata.
3
4 This module includes helpers to:
5 - extract rules, reactions, bounds, objective coefficients, and compartments
6 - build a COBRA model from a tabular file
7 - set objective and medium from dataframes
8 - validate a model and convert gene identifiers
9 - translate model GPRs using mapping tables
10 """
import logging
import os
import re
import sys
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import cobra
import pandas as pd
from cobra import Model as cobraModel, Reaction, Metabolite

import utils.rule_parsing as rulesUtils
import utils.reaction_parsing as reactionUtils
22
23
24 ############################ check_methods ####################################
def gene_type(l :str, name :str) -> str:
    """
    Determine the type of gene ID.

    Args:
        l (str): The gene identifier to check.
        name (str): The name of the dataset, used in error messages.

    Returns:
        str: The type of gene ID ('HGNC_ID', 'ENSG', 'HGNC_symbol', or 'entrez_id').

    Raises:
        sys.exit: If the gene ID type is not supported, the execution is aborted.
    """
    if check_hgnc(l):
        return 'HGNC_ID'
    elif check_ensembl(l):
        return 'ENSG'
    elif check_symbol(l):
        return 'HGNC_symbol'
    elif check_entrez(l):
        return 'entrez_id'
    else:
        # Fixed user-facing message: the old concatenation produced
        # "Supported IDtypes" (missing space) and misspelled "Ensembl".
        sys.exit('Execution aborted:\n' +
                 'gene ID type in ' + name + ' not supported. Supported ID ' +
                 'types are: HUGO ID, Ensembl ID, HUGO symbol, Entrez ID\n')
51
def check_hgnc(l :str) -> bool:
    """
    Check if a gene identifier follows the HGNC format ('HGNC:' + digits).

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True if the gene identifier follows the HGNC format, False otherwise.
    """
    # An identifier must be longer than the bare 'HGNC:' prefix and carry
    # only digits after it.
    return len(l) > 5 and l.upper().startswith('HGNC:') and l[5:].isdigit()
69
def check_ensembl(l :str) -> bool:
    """
    Check if a gene identifier follows the Ensembl format (case-insensitive
    'ENS' prefix).

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True if the gene identifier follows the Ensembl format, False otherwise.
    """
    prefix = l[:3].upper()
    return prefix == 'ENS'
81
82
def check_symbol(l :str) -> bool:
    """
    Check if a gene identifier follows the symbol format: an alphabetic first
    character followed by at least one alphanumeric character.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True if the gene identifier follows the symbol format, False otherwise.
    """
    # Note: ''.isalnum() is False, so single-character strings are rejected,
    # matching the original behavior.
    return bool(l) and l[0].isalpha() and l[1:].isalnum()
100
def check_entrez(l :str) -> bool:
    """
    Check if a gene identifier follows the Entrez ID format (digits only,
    non-empty).

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True if the gene identifier follows the Entrez ID format, False otherwise.
    """
    return len(l) > 0 and l.isdigit()
115
116 ################################- DATA GENERATION -################################
117 ReactionId = str
def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
    """
    Generate a dictionary mapping reaction IDs to GPR rules from the model.

    Args:
        model: COBRA model to derive data from.
        asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.

    Returns:
        Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
        Dict[ReactionId, str]: Raw rules by reaction ID.
    """
    rules = {}
    for rxn in model.reactions:
        raw_rule = rxn.gene_reaction_rule
        if not raw_rule:
            # Reactions without a GPR are left out entirely.
            continue
        rules[rxn.id] = rulesUtils.parseRuleToNestedList(raw_rule) if asParsed else raw_rule
    return rules
138
def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
    """
    Generate a dictionary mapping reaction IDs to reaction formulas from the model.

    Args:
        model: COBRA model to derive data from.
        asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.

    Returns:
        Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
    """
    raw_formulas = {}
    for rxn in model.reactions:
        formula = rxn.reaction
        if formula:
            raw_formulas[rxn.id] = formula

    if asParsed:
        return reactionUtils.create_reaction_dict(raw_formulas)
    return raw_formulas
160
def get_medium(model:cobraModel) -> pd.DataFrame:
    """
    Extract the uptake reactions representing the model medium.

    A reaction qualifies when no metabolite has a positive stoichiometric
    coefficient (uptake only) and its lower bound is negative.

    Args:
        model: COBRA model to inspect.

    Returns:
        pd.DataFrame with a single column 'reaction' listing uptake reaction IDs.
    """
    # any() short-circuits on the first positive coefficient instead of
    # scanning every metabolite with a manual flag (old code also carried a
    # stray semicolon).
    uptake_ids = [
        rxn.id
        for rxn in model.reactions
        if rxn.lower_bound < 0
        and not any(rxn.get_coefficient(met.id) > 0 for met in rxn.metabolites)
    ]

    df_medium = pd.DataFrame()
    df_medium["reaction"] = uptake_ids
    return df_medium
180
def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
    """
    Extract objective coefficients for each reaction.

    Args:
        model: COBRA model

    Returns:
        pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
    """
    # model.objective.expression is a linear expression; reactions absent
    # from it get a 0.0 coefficient.
    objective_coeffs = model.objective.expression.as_coefficients_dict()

    records = [
        {
            "ReactionID": rxn.id,
            "ObjectiveCoefficient": objective_coeffs.get(rxn.forward_variable, 0.0),
        }
        for rxn in model.reactions
    ]
    return pd.DataFrame(records)
203
def generate_bounds(model:cobraModel) -> pd.DataFrame:
    """
    Build a DataFrame of lower/upper bounds for all reactions.

    Args:
        model: COBRA model to read bounds from.

    Returns:
        pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
    """
    # The index is built inline; the old code iterated model.reactions twice,
    # once just to collect IDs.
    bounds = pd.DataFrame(
        columns=["lower_bound", "upper_bound"],
        index=[reaction.id for reaction in model.reactions],
    )

    for reaction in model.reactions:
        bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
    return bounds
221
222
223
def generate_compartments(model: cobraModel) -> pd.DataFrame:
    """
    Generates a DataFrame containing pathway information for each reaction.
    Creates columns for each pathway position (Pathway_1, Pathway_2, etc.).

    Args:
        model: the COBRA model to extract pathway data from.

    Returns:
        pd.DataFrame: DataFrame with ReactionID and Pathway_i columns, or
        None when no reaction carries a 'pathways' annotation (the previous
        docstring incorrectly promised a ReactionID-only DataFrame).
    """
    # First pass: collect pathways per reaction and the widest pathway list.
    reaction_pathways = {}
    max_pathways = 0

    for reaction in model.reactions:
        annotation = reaction.annotation.get('pathways')
        if isinstance(annotation, list):
            # Filter out empty/None values.
            valid_pathways = [p for p in annotation if p]
            reaction_pathways[reaction.id] = valid_pathways
            max_pathways = max(max_pathways, len(valid_pathways))
        elif annotation:
            # Single truthy pathway value.
            reaction_pathways[reaction.id] = [annotation]
            max_pathways = max(max_pathways, 1)
        else:
            # Missing, None, or empty annotation.
            reaction_pathways[reaction.id] = []

    # No pathway anywhere in the model: keep the legacy None return so
    # callers can detect "nothing to export".
    if max_pathways == 0:
        return None

    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]

    # Second pass: one row per reaction, padding short lists with None.
    pathway_data = []
    for reaction_id, pathways in reaction_pathways.items():
        row = {"ReactionID": reaction_id}
        for i, col_name in enumerate(pathway_columns):
            row[col_name] = pathways[i] if i < len(pathways) else None
        pathway_data.append(row)

    return pd.DataFrame(pathway_data)
288
def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
    """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... columns in the dataframe."""
    pathway_cols = [c for c in df.columns if c.startswith('Pathway_')]
    if not pathway_cols:
        print("No 'Pathway_' columns found, skipping pathway annotation")
        return

    annotated = {}

    for _, row in df.iterrows():
        rxn_id = str(row['ReactionID']).strip()
        if rxn_id not in model.reactions:
            continue

        # Collect non-empty pathway cells for this reaction.
        values = [
            str(row[col]).strip()
            for col in pathway_cols
            if pd.notna(row[col]) and str(row[col]).strip()
        ]
        if not values:
            continue

        rxn = model.reactions.get_by_id(rxn_id)
        # A single pathway is stored as a scalar, multiple as a list.
        rxn.annotation['pathways'] = values[0] if len(values) == 1 else values
        annotated[rxn_id] = values

    print(f"Annotated {len(annotated)} reactions with pathways.")
319
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
    """
    Build a COBRApy model from a tabular file with reaction data.

    Required columns: 'ReactionID', 'Formula'. Also read (without existence
    checks): 'lower_bound', 'upper_bound', 'GPR', and — via the helpers
    called at the end — 'ObjectiveCoefficient', 'InMedium', 'Pathway_*'.

    Args:
        csv_path: Path to the tab-separated (or comma-separated) file.
        model_id: ID for the newly created model.

    Returns:
        cobra.Model: The constructed COBRApy model.
    """

    # Try to detect separator: tab wins if present in the header line.
    with open(csv_path, 'r') as f:
        first_line = f.readline()
        sep = '\t' if '\t' in first_line else ','

    df = pd.read_csv(csv_path, sep=sep)

    # Check required columns
    required_cols = ['ReactionID', 'Formula']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")

    model = cobraModel(model_id)

    # met_id -> Metabolite, and compartment code -> compartment code
    # (cobra accepts an id->name mapping; here id and name coincide).
    metabolites_dict = {}
    compartments_dict = {}

    print(f"Building model from {len(df)} reactions...")

    # Pass 1: harvest every metabolite/compartment mentioned by any formula,
    # so all Metabolite objects exist before reactions are parsed.
    for idx, row in df.iterrows():
        reaction_formula = str(row['Formula']).strip()
        # str(NaN) == 'nan': blank formulas are silently skipped here.
        if not reaction_formula or reaction_formula == 'nan':
            continue

        metabolites = extract_metabolites_from_reaction(reaction_formula)

        for met_id in metabolites:
            compartment = extract_compartment_from_metabolite(met_id)

            if compartment not in compartments_dict:
                compartments_dict[compartment] = compartment

            if met_id not in metabolites_dict:
                # Display name: ID with the compartment suffix stripped and
                # doubled underscores collapsed.
                metabolites_dict[met_id] = Metabolite(
                    id=met_id,
                    compartment=compartment,
                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
                )

    model.compartments = compartments_dict

    model.add_metabolites(list(metabolites_dict.values()))

    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")

    reactions_added = 0
    reactions_skipped = 0

    # Pass 2: create one Reaction per row.
    for idx, row in df.iterrows():

        reaction_id = str(row['ReactionID']).strip()
        reaction_formula = str(row['Formula']).strip()

        # NOTE(review): pass 1 tolerates blank formulas (continue) but this
        # raises for the same rows — confirm whether the raise is intended.
        if not reaction_formula or reaction_formula == 'nan':
            raise ValueError(f"Missing reaction formula for {reaction_id}")

        reaction = Reaction(reaction_id)
        reaction.name = reaction_id

        # Missing bounds default to the conventional ±1000.
        reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
        reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0

        if pd.notna(row['GPR']) and str(row['GPR']).strip():
            reaction.gene_reaction_rule = str(row['GPR']).strip()

        # A malformed formula skips the reaction instead of aborting the build.
        try:
            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
        except Exception as e:
            print(f"Error parsing reaction {reaction_id}: {e}")
            reactions_skipped += 1
            continue

        model.add_reactions([reaction])
        reactions_added += 1


    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")

    # set objective function
    set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")

    set_medium_from_data(model, df)

    set_annotation_pathways_from_data(model, df)

    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")

    return model
421
422
423 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
424 #def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
425 # """
426 # Extract metabolite IDs from a reaction formula.
427 # Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
428 # allowing leading digits/underscores.
429 # """
430 # metabolites = set()
431 # # optional coefficient followed by a token ending with _<letters>
432 # if reaction_formula[-1] == ']' and reaction_formula[-3] == '[':
433 # pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)'
434 # else:
435 # pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)'
436 # matches = re.findall(pattern, reaction_formula)
437 # metabolites.update(matches)
438 # return metabolites
439
440
def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
    """
    Extract metabolite IDs from a reaction formula.

    Handles:
    - optional stoichiometric coefficients (integers or decimals)
    - compartment tags at the end of the metabolite, either [c] or _c

    Returns the IDs including the compartment suffix exactly as written.
    """
    met_pattern = re.compile(
        r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))'  # left boundary (start, space, +, comma, =, :)
        r'(?:\d+(?:\.\d+)?\s+)?'  # optional coefficient (requires space after)
        r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))'  # metabolite + compartment (can start with number)
    )
    # findall with one capture group yields the metabolite tokens directly.
    return set(met_pattern.findall(reaction_formula))
457
458
459
def extract_compartment_from_metabolite(metabolite_id: str) -> str:
    """Extract the compartment from a metabolite ID.

    Supports single-character suffix styles '<met>_c' and '<met>[c]'.
    Anything else — including IDs too short to carry a tag — falls back to
    the cytoplasm.

    Args:
        metabolite_id: Metabolite identifier, e.g. 'glc__D_c' or 'glc[c]'.

    Returns:
        str: Compartment code, 'c' when undetectable.
    """
    # Length guards: the previous negative indexing raised IndexError on
    # identifiers shorter than 2 (or 3) characters.
    if len(metabolite_id) >= 2 and metabolite_id[-2] == '_':
        return metabolite_id.split('_')[-1]
    if len(metabolite_id) >= 3 and metabolite_id[-1] == ']' and metabolite_id[-3] == '[':
        return metabolite_id[-2]
    return 'c'  # default cytoplasm
467
468
def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
    """Parse a reaction formula and set metabolites with their coefficients.

    Args:
        reaction: Reaction object to populate (metabolites added in place).
        formula: Text formula using one of the arrows '<=>', '<--', '-->', '<-'.
        metabolites_dict: Known metabolites by ID; IDs absent from this dict
            are silently ignored.

    Raises:
        ValueError: If no recognized arrow is present, or splitting on the
            arrow does not yield exactly two sides.
    """
    # Longer arrows must be tested before '<-' so they win; the dead
    # 'reversible' local of the old version (assigned, never read) is gone.
    for arrow in ('<=>', '<--', '-->', '<-'):
        if arrow in formula:
            parts = formula.split(arrow)
            break
    else:
        raise ValueError(f"Unrecognized reaction format: {formula}")

    # Handle cases where one side might be empty (exchange reactions)
    if len(parts) != 2:
        raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")

    left, right = parts[0].strip(), parts[1].strip()

    reactants = parse_metabolites_side(left) if left else {}
    products = parse_metabolites_side(right) if right else {}

    # Reactants get negative coefficients, products positive.
    metabolites_to_add = {}

    for met_id, coeff in reactants.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = -coeff

    for met_id, coeff in products.items():
        if met_id in metabolites_dict:
            metabolites_to_add[metabolites_dict[met_id]] = coeff

    reaction.add_metabolites(metabolites_to_add)
507
508
def parse_metabolites_side(side_str: str) -> Dict[str, float]:
    """Parse one side of a reaction and extract metabolites with coefficients."""
    # Metabolite following an explicit coefficient (may start with '_').
    coeff_met_re = re.compile(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)')
    # Bare metabolite with implicit coefficient 1 (may start with a digit).
    bare_met_re = re.compile(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)')

    parsed: Dict[str, float] = {}
    if not side_str or not side_str.strip():
        return parsed

    for raw_term in side_str.split('+'):
        term = raw_term.strip()
        if not term:
            continue

        # "<coeff> <metabolite>" form: exactly two whitespace-separated pieces.
        pieces = term.split()
        if len(pieces) == 2:
            coeff_text, candidate = pieces
            try:
                coefficient = float(coeff_text)
            except ValueError:
                coefficient = None
            if coefficient is not None and coeff_met_re.match(candidate):
                parsed[candidate] = coefficient
                continue

        # Bare metabolite (no coefficient); otherwise warn and skip the term.
        if bare_met_re.match(term):
            parsed[term] = 1.0
        else:
            print(f"Warning: Could not parse metabolite term: '{term}'")

    return parsed
543
544
545
def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
    """
    Sets the model's objective function based on a column of coefficients in the CSV.
    Can be any reaction(s), not necessarily biomass.
    """
    objective = {}

    for _, row in df.iterrows():
        rxn_id = str(row['ReactionID']).strip()
        coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
        if coeff == 0:
            continue
        if rxn_id not in model.reactions:
            print(f"Warning: reaction {rxn_id} not found in model, skipping for objective.")
            continue
        objective[model.reactions.get_by_id(rxn_id)] = coeff

    if not objective:
        raise ValueError("No reactions found with non-zero objective coefficient.")

    model.objective = objective
    print(f"Objective set with {len(objective)} reactions.")
567
568
569
570
def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
    """Set the medium based on the 'InMedium' column in the dataframe.

    Only reactions present in the model whose lower bound is negative are
    included; the medium value is the absolute lower bound.
    """
    if 'InMedium' not in df.columns:
        print("No 'InMedium' column found, skipping medium setup")
        return

    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()

    # Hoisted ID set: the old code rebuilt [r.id for r in model.reactions]
    # inside the loop, making the membership test O(n^2).
    model_rxn_ids = {r.id for r in model.reactions}

    medium_dict = {}
    for rxn_id in medium_reactions:
        if rxn_id in model_rxn_ids:
            reaction = model.reactions.get_by_id(rxn_id)
            if reaction.lower_bound < 0:
                medium_dict[rxn_id] = abs(reaction.lower_bound)

    if medium_dict:
        model.medium = medium_dict
        print(f"Medium set with {len(medium_dict)} components")
    else:
        print("No medium components found")
def validate_model(model: cobraModel) -> Dict[str, Any]:
    """Validate the model and return basic statistics.

    Args:
        model: COBRA model to inspect.

    Returns:
        Dict with structural counts (reactions, metabolites, genes,
        compartments), objective description, medium size, reversible and
        exchange reaction counts, plus the optimization outcome in
        'growth_rate'/'status' (status carries the error text on failure).
    """
    # Annotation fix: the old signature used the builtin any() instead of
    # typing.Any.
    validation = {
        'num_reactions': len(model.reactions),
        'num_metabolites': len(model.metabolites),
        'num_genes': len(model.genes),
        'num_compartments': len(model.compartments),
        'objective': str(model.objective),
        'medium_size': len(model.medium),
        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
    }

    try:
        # Growth test. The broad catch is deliberate: a failed optimization
        # must not abort validation; the error is reported via 'status'.
        solution = model.optimize()
        validation['growth_rate'] = solution.objective_value
        validation['status'] = solution.status
    except Exception as e:
        validation['growth_rate'] = None
        validation['status'] = f"Error: {e}"

    return validation
614
def convert_genes(model, annotation):
    """Rename genes using a selected annotation key in gene.notes; returns a model copy.

    Args:
        model: COBRA model whose genes should be renamed.
        annotation: Key to look up in each gene's ``notes`` dict.

    Returns:
        A renamed copy of the model, or -1 when the annotation key is missing
        (legacy sentinel kept for backward compatibility).
    """
    from cobra.manipulation import rename_genes
    model_copy = model.copy()
    try:
        gene_map = {gene.id: gene.notes[annotation] for gene in model_copy.genes}
    except KeyError:
        # Narrowed from a bare except: only a missing annotation key is an
        # expected failure here; anything else should surface.
        print("No annotation in gene dict!")
        return -1
    rename_genes(model_copy, gene_map)

    return model_copy
627
628 # ---------- Utility helpers ----------
629 def _normalize_colname(col: str) -> str:
630 return col.strip().lower().replace(' ', '_')
631
632 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
633 """
634 Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
635 Raise ValueError if no suitable mapping is found.
636 """
637 cols = { _normalize_colname(c): c for c in mapping_df.columns }
638 chosen = {}
639 # candidate names for each category
640 candidates = {
641 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
642 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
643 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
644 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
645 'gene_number': ['gene_number']
646 }
647 for key, names in candidates.items():
648 for n in names:
649 if n in cols:
650 chosen[key] = cols[n]
651 break
652 return chosen
653
def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
                                source_col: str,
                                target_col: str,
                                model_source_genes: Optional[Set[str]] = None,
                                logger: Optional[logging.Logger] = None) -> None:
    """
    Check that, within the filtered mapping_df, each target maps to at most one source.
    Logs all duplicates and returns early when any are found.

    Args:
        mapping_df: Mapping table containing the source and target columns.
        source_col: Column holding source gene IDs.
        target_col: Column holding target gene IDs.
        model_source_genes: Optional normalized source IDs to restrict the check to.
        logger: Optional logger; the module logger is used when omitted.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    if mapping_df.empty:
        logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
        return

    # normalize temporary columns for grouping (without altering the original df)
    tmp = mapping_df[[source_col, target_col]].copy()
    tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id)
    tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()

    # optionally filter to the set of model source genes
    if model_source_genes is not None:
        tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]

    if tmp.empty:
        logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
        return

    # build reverse mapping: target -> set(sources)
    grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
    # find targets with more than one source
    problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}

    if problematic:
        # report every offending target with its colliding sources
        msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
        for tgt, sources in problematic.items():
            msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
        logger.warning("\n".join(msg_lines))
        # BUG FIX: previously control fell through and the success message
        # below was logged even when duplicates were found.
        return

    # if everything is fine
    logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
700
701
702 def _normalize_gene_id(g: str) -> str:
703 """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
704 if g is None:
705 return ""
706 g = str(g).strip()
707 # remove common prefixes
708 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
709 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
710 return g
711
712 def _is_or_only_expression(expr: str) -> bool:
713 """
714 Check if a GPR expression contains only OR operators (no AND operators).
715
716 Args:
717 expr: GPR expression string
718
719 Returns:
720 bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise
721 """
722 if not expr or not expr.strip():
723 return False
724
725 # Normalize the expression
726 normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
727
728 # Check if it contains any AND operators
729 has_and = ' and ' in normalized.lower()
730
731 # Check if it contains OR operators
732 has_or = ' or ' in normalized.lower()
733
734 # Must have OR operators and no AND operators
735 return has_or and not has_and
736
737
738 def _flatten_or_only_gpr(expr: str) -> str:
739 """
740 Flatten a GPR expression that contains only OR operators by:
741 1. Removing all parentheses
742 2. Extracting unique gene names
743 3. Joining them with ' or '
744
745 Args:
746 expr: GPR expression string with only OR operators
747
748 Returns:
749 str: Flattened GPR expression
750 """
751 if not expr or not expr.strip():
752 return expr
753
754 # Extract all gene tokens (exclude logical operators and parentheses)
755 gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
756 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
757
758 tokens = re.findall(gene_pattern, expr)
759 genes = [t for t in tokens if t not in logical]
760
761 # Create set to remove duplicates, then convert back to list to maintain some order
762 unique_genes = list(dict.fromkeys(genes)) # Preserves insertion order
763
764 if len(unique_genes) == 0:
765 return expr
766 elif len(unique_genes) == 1:
767 return unique_genes[0]
768 else:
769 return ' or '.join(unique_genes)
770
771
772 def _simplify_boolean_expression(expr: str) -> str:
773 """
774 Simplify a boolean expression by removing duplicates while strictly preserving semantics.
775 This function handles simple duplicates within parentheses while being conservative about
776 complex expressions that could change semantics.
777 """
778 if not expr or not expr.strip():
779 return expr
780
781 # Normalize operators and whitespace
782 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
783 expr = ' '.join(expr.split()) # Normalize whitespace
784
785 def simplify_parentheses_content(match_obj):
786 """Helper function to simplify content within parentheses."""
787 content = match_obj.group(1) # Content inside parentheses
788
789 # Only simplify if it's a pure OR or pure AND chain
790 if ' or ' in content and ' and ' not in content:
791 # Pure OR chain - safe to deduplicate
792 parts = [p.strip() for p in content.split(' or ') if p.strip()]
793 unique_parts = []
794 seen = set()
795 for part in parts:
796 if part not in seen:
797 unique_parts.append(part)
798 seen.add(part)
799
800 if len(unique_parts) == 1:
801 return unique_parts[0] # Remove unnecessary parentheses for single items
802 else:
803 return '(' + ' or '.join(unique_parts) + ')'
804
805 elif ' and ' in content and ' or ' not in content:
806 # Pure AND chain - safe to deduplicate
807 parts = [p.strip() for p in content.split(' and ') if p.strip()]
808 unique_parts = []
809 seen = set()
810 for part in parts:
811 if part not in seen:
812 unique_parts.append(part)
813 seen.add(part)
814
815 if len(unique_parts) == 1:
816 return unique_parts[0] # Remove unnecessary parentheses for single items
817 else:
818 return '(' + ' and '.join(unique_parts) + ')'
819 else:
820 # Mixed operators or single item - return with parentheses as-is
821 return '(' + content + ')'
822
823 def remove_duplicates_simple(parts_str: str, separator: str) -> str:
824 """Remove duplicates from a simple chain of operations."""
825 parts = [p.strip() for p in parts_str.split(separator) if p.strip()]
826
827 # Remove duplicates while preserving order
828 unique_parts = []
829 seen = set()
830 for part in parts:
831 if part not in seen:
832 unique_parts.append(part)
833 seen.add(part)
834
835 if len(unique_parts) == 1:
836 return unique_parts[0]
837 else:
838 return f' {separator} '.join(unique_parts)
839
840 try:
841 import re
842
843 # First, simplify content within parentheses
844 # This handles cases like (A or A) -> A and (B and B) -> B
845 expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
846
847 # Check if the resulting expression has mixed operators
848 has_and = ' and ' in expr_simplified
849 has_or = ' or ' in expr_simplified
850
851 # Only simplify top-level if it's pure AND or pure OR
852 if has_and and not has_or and '(' not in expr_simplified:
853 # Pure AND chain at top level - safe to deduplicate
854 return remove_duplicates_simple(expr_simplified, 'and')
855 elif has_or and not has_and and '(' not in expr_simplified:
856 # Pure OR chain at top level - safe to deduplicate
857 return remove_duplicates_simple(expr_simplified, 'or')
858 else:
859 # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned)
860 return expr_simplified
861
862 except Exception:
863 # If anything goes wrong, return the original expression
864 return expr
865
866
867 def translate_model_genes(model: 'cobra.Model',
868 mapping_df: 'pd.DataFrame',
869 target_nomenclature: str,
870 source_nomenclature: str = 'hgnc_id',
871 allow_many_to_one: bool = False,
872 logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
873 """
874 Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
875 mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
876
877 Args:
878 model: COBRA model to translate.
879 mapping_df: DataFrame containing the mapping information.
880 target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
881 source_nomenclature: Current source key in the model (default 'hgnc_id').
882 allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
883 logger: Optional logger.
884
885 Returns:
886 Tuple containing:
887 - Translated COBRA model
888 - Dictionary mapping reaction IDs to translation issue descriptions
889 """
890 if logger is None:
891 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
892 logger = logging.getLogger(__name__)
893
894 logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
895
896 # normalize column names and choose relevant columns
897 chosen = _choose_columns(mapping_df)
898 if not chosen:
899 raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
900
901 # map source/target to actual dataframe column names (allow user-specified source/target keys)
902 # normalize input args
903 src_key = source_nomenclature.strip().lower()
904 tgt_key = target_nomenclature.strip().lower()
905
906 # try to find the actual column names for requested keys
907 col_for_src = None
908 col_for_tgt = None
909 # first, try exact match
910 for k, actual in chosen.items():
911 if k == src_key:
912 col_for_src = actual
913 if k == tgt_key:
914 col_for_tgt = actual
915
916 # if not found, try mapping common names
917 if col_for_src is None:
918 possible_src_names = {k: v for k, v in chosen.items()}
919 # try to match by contained substring
920 for k, actual in possible_src_names.items():
921 if src_key in k:
922 col_for_src = actual
923 break
924
925 if col_for_tgt is None:
926 for k, actual in chosen.items():
927 if tgt_key in k:
928 col_for_tgt = actual
929 break
930
931 if col_for_src is None:
932 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
933 if col_for_tgt is None:
934 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
935
936 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
937 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
938
939 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
940 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)
941
942 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
943
944 if filtered_map.empty:
945 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
946
947 if not allow_many_to_one:
948 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
949
    # Build the source -> target gene mapping
951 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
952
953 # copy model
954 model_copy = model.copy()
955
956 # statistics
957 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
958 unmapped = []
959 multi = []
960
961 # Dictionary to store translation issues per reaction
962 reaction_translation_issues = {}
963
964 original_genes = {g.id for g in model_copy.genes}
965 logger.info(f"Original genes count: {len(original_genes)}")
966
967 # translate GPRs
968 for rxn in model_copy.reactions:
969 gpr = rxn.gene_reaction_rule
970 if gpr and gpr.strip():
971 new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
972 if rxn_issues:
973 reaction_translation_issues[rxn.id] = rxn_issues
974
975 if new_gpr != gpr:
976 # Check if this GPR has translation issues and contains only OR operators
977 if rxn_issues and _is_or_only_expression(new_gpr):
978 # Flatten the GPR: remove parentheses and create set of unique genes
979 flattened_gpr = _flatten_or_only_gpr(new_gpr)
980 if flattened_gpr != new_gpr:
981 stats['flattened_or_gprs'] += 1
982 logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
983 new_gpr = flattened_gpr
984
985 simplified_gpr = _simplify_boolean_expression(new_gpr)
986 if simplified_gpr != new_gpr:
987 stats['simplified_gprs'] += 1
988 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
989 rxn.gene_reaction_rule = simplified_gpr
990 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
991
992 # update model genes based on new GPRs
993 _update_model_genes(model_copy, logger)
994
995 # final logging
996 _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
997
998 logger.info("Translation finished")
999 return model_copy, reaction_translation_issues
1000
1001
1002 # ---------- helper functions ----------
def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
    """
    Build a mapping dict from source gene IDs to lists of target gene IDs.

    Source IDs are normalized via _normalize_gene_id (strips prefixes such as
    'HGNC:'); target IDs are only whitespace-trimmed. Duplicate (source, target)
    pairs collapse to a single entry, and first-seen order of targets is kept.
    """
    pairs = mapping_df[[source_col, target_col]].dropna().copy()
    # Normalize source IDs; targets keep their original form minus whitespace.
    pairs[source_col] = pairs[source_col].astype(str).apply(_normalize_gene_id)
    pairs[target_col] = pairs[target_col].astype(str).str.strip()
    pairs = pairs.drop_duplicates()

    logger.info(f"Creating mapping from {len(pairs)} rows")

    mapping: Dict[str, List[str]] = {}
    for src, tgt in zip(pairs[source_col], pairs[target_col]):
        targets = mapping.setdefault(src, [])
        if tgt not in targets:
            targets.append(tgt)

    # Summary stats for the log: how many keys map uniquely vs. to several IDs.
    one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
    one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
    logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
    return mapping
1029
1030
def _translate_gpr(gpr_string: str,
                   gene_mapping: Dict[str, List[str]],
                   stats: Dict[str, int],
                   unmapped_genes: List[str],
                   multi_mapping_genes: List[Tuple[str, List[str]]],
                   logger: logging.Logger,
                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
    """
    Translate gene identifiers inside a GPR string using gene_mapping.

    Args:
        gpr_string: Original gene_reaction_rule text.
        gene_mapping: Normalized source ID -> list of target IDs.
        stats: Mutated in place; counters 'translated', 'one_to_one',
            'one_to_many', 'not_found' are incremented per unique token.
        unmapped_genes: Mutated in place; tokens with no mapping are appended
            once (left as-is in the output GPR).
        multi_mapping_genes: Mutated in place; (token, targets) pairs for
            1:many translations.
        logger: Logger for debug messages.
        track_issues: When True, also return a ';'-joined description of
            1:many and many:1 translation issues found in this GPR.

    Returns:
        The translated GPR string, or (new_gpr, issue_text) if track_issues.
    """
    # Generic token pattern: letters, digits, :, _, -, . (captures HGNC:1234, ENSG000..., symbols)
    token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    tokens = re.findall(token_pattern, gpr_string)

    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    tokens = [t for t in tokens if t not in logical]

    issues = []
    replacements: Dict[str, str] = {}

    # Decide the replacement for each unique token first (no string edits yet).
    for token in sorted(set(tokens), key=lambda x: -len(x)):
        norm = _normalize_gene_id(token)
        if norm in gene_mapping:
            targets = gene_mapping[norm]
            stats['translated'] += 1
            if len(targets) == 1:
                stats['one_to_one'] += 1
                replacements[token] = targets[0]
            else:
                stats['one_to_many'] += 1
                multi_mapping_genes.append((token, targets))
                replacements[token] = "(" + " or ".join(targets) + ")"
                if track_issues:
                    issues.append(f"{token} -> {' or '.join(targets)}")
        else:
            stats['not_found'] += 1
            if token not in unmapped_genes:
                unmapped_genes.append(token)
            logger.debug(f"Token not found in mapping (left as-is): {token}")

    # Single-pass substitution: each token of the ORIGINAL string is replaced
    # at most once. The previous sequential re.sub approach re-scanned the
    # partially translated string, so a target ID produced by one replacement
    # could be matched and translated again by a later token's rule.
    new_gpr = re.sub(token_pattern,
                     lambda m: replacements.get(m.group(0), m.group(0)),
                     gpr_string)

    # Check for many-to-one cases (multiple source genes mapping to same target)
    if track_issues:
        original_to_target: Dict[str, List[str]] = {}
        for orig_token in tokens:
            norm = _normalize_gene_id(orig_token)
            if norm in gene_mapping:
                for target in gene_mapping[norm]:
                    sources = original_to_target.setdefault(target, [])
                    if orig_token not in sources:
                        sources.append(orig_token)

        # Identify many-to-one mappings in this specific GPR
        for target, original_genes in original_to_target.items():
            if len(original_genes) > 1:
                issues.append(f"{' or '.join(original_genes)} -> {target}")

        return new_gpr, "; ".join(issues) if issues else ""

    return new_gpr
1101
1102
def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
    """
    Synchronize model.genes with the genes referenced by gene_reaction_rule strings.

    Removes gene objects no longer mentioned in any GPR and creates gene
    objects for newly referenced (normalized) IDs.
    """
    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}

    # Collect every gene ID mentioned in a GPR, normalized the same way as
    # the mapping so comparisons against model gene IDs are consistent.
    referenced: Set[str] = set()
    for rxn in model.reactions:
        rule = rxn.gene_reaction_rule
        if not (rule and rule.strip()):
            continue
        referenced.update(
            _normalize_gene_id(tok)
            for tok in re.findall(gene_pattern, rule)
            if tok not in logical
        )

    current_ids = {g.id for g in model.genes}

    # Drop genes that no GPR references any more.
    removed = 0
    for gid in [g for g in current_ids if g not in referenced]:
        try:
            model.genes.remove(model.genes.get_by_id(gid))
            removed += 1
        except Exception:
            # safe-ignore: gene may already be gone or container may refuse
            pass

    # Create gene objects for IDs present in GPRs but absent from the model.
    added = 0
    for gid in referenced:
        if gid in current_ids:
            continue
        gene = cobra.Gene(gid)
        try:
            model.genes.add(gene)
        except Exception:
            # fallback: if model.genes doesn't support add, try append or model.add_genes
            try:
                model.genes.append(gene)
            except Exception:
                try:
                    model.add_genes([gene])
                except Exception:
                    logger.warning(f"Could not add gene object for {gid}")
        added += 1

    logger.info(f"Model genes updated: removed {removed}, added {added}")
1156
1157
def export_model_to_tabular(model: cobraModel,
                            output_path: str,
                            translation_issues: Dict = None,
                            include_objective: bool = True,
                            save_function = None) -> pd.DataFrame:
    """
    Export a COBRA model to a tab-separated tabular file.

    Args:
        model: COBRA model to export.
        output_path: Destination path for the tabular file.
        translation_issues: Optional {reaction_id: issues} dict from gene
            translation; when given, adds a TranslationIssues column.
        include_objective: Whether to include the objective coefficient column.
        save_function: Optional callable (df, path) used to persist the table;
            defaults to DataFrame.to_csv with a tab separator.

    Returns:
        pd.DataFrame: The merged tabular data.
    """
    # Extract each model component into its own frame keyed on ReactionID.
    df_rules = pd.DataFrame(list(generate_rules(model, asParsed=False).items()),
                            columns=["ReactionID", "GPR"])
    df_reactions = pd.DataFrame(list(generate_reactions(model, asParsed=False).items()),
                                columns=["ReactionID", "Formula"])
    df_bounds = generate_bounds(model).reset_index().rename(columns={"index": "ReactionID"})
    df_medium = get_medium(model).rename(columns={"reaction": "ReactionID"})
    df_medium["InMedium"] = True
    compartments = generate_compartments(model)

    # Assemble the table: reactions + rules + bounds form the core.
    merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
    merged = merged.merge(df_bounds, on="ReactionID", how="outer")

    if include_objective:
        merged = merged.merge(extract_objective_coefficients(model),
                              on="ReactionID", how="outer")

    # Compartments/pathways column is optional (None when absent).
    if compartments is not None:
        merged = merged.merge(compartments, on="ReactionID", how="outer")

    merged = merged.merge(df_medium, on="ReactionID", how="left")

    if translation_issues:
        issue_rows = [{"ReactionID": rxn_id, "TranslationIssues": issues}
                      for rxn_id, issues in translation_issues.items()]
        df_issues = pd.DataFrame(issue_rows)
        if not df_issues.empty:
            merged = merged.merge(df_issues, on="ReactionID", how="left")
            merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")

    # Medium reactions first, then everything else.
    merged["InMedium"] = merged["InMedium"].fillna(False)
    merged = merged.sort_values(by="InMedium", ascending=False)

    if save_function:
        save_function(merged, output_path)
    else:
        merged.to_csv(output_path, sep="\t", index=False)

    return merged
1228
1229
1230 def _log_translation_statistics(stats: Dict[str, int],
1231 unmapped_genes: List[str],
1232 multi_mapping_genes: List[Tuple[str, List[str]]],
1233 original_genes: Set[str],
1234 final_genes,
1235 logger: logging.Logger):
1236 logger.info("=== TRANSLATION STATISTICS ===")
1237 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
1238 logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
1239 logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
1240 logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}")
1241
1242 final_ids = {g.id for g in final_genes}
1243 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
1244
1245 if unmapped_genes:
1246 logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
1247 if multi_mapping_genes:
1248 logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
1249 for orig, targets in multi_mapping_genes[:10]:
1250 logger.info(f" {orig} -> {', '.join(targets)}")
1251
1252 # Log summary of flattened GPRs if any
1253 if stats.get('flattened_or_gprs', 0) > 0:
1254 logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
1255
1256