diff COBRAxy/utils/model_utils.py @ 456:a6e45049c1b9 draft default tip

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 17:28:45 +0000
parents 4e2bc80764b6
children
line wrap: on
line diff
--- a/COBRAxy/utils/model_utils.py	Fri Sep 12 15:05:54 2025 +0000
+++ b/COBRAxy/utils/model_utils.py	Fri Sep 12 17:28:45 2025 +0000
@@ -1,14 +1,20 @@
+"""
+Utilities for generating and manipulating COBRA models and related metadata.
+
+This module includes helpers to:
+- extract rules, reactions, bounds, objective coefficients, and compartments
+- build a COBRA model from a tabular file
+- set objective and medium from dataframes
+- validate a model and convert gene identifiers
+- translate model GPRs using mapping tables
+"""
 import os
-import csv
 import cobra
-import pickle
-import argparse
 import pandas as pd
 import re
 import logging
 from typing import Optional, Tuple, Union, List, Dict, Set
 from collections import defaultdict
-import utils.general_utils as utils
 import utils.rule_parsing  as rulesUtils
 import utils.reaction_parsing as reactionUtils
 from cobra import Model as cobraModel, Reaction, Metabolite
@@ -17,19 +23,16 @@
 ReactionId = str
 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
     """
-    Generates a dictionary mapping reaction ids to rules from the model.
+    Generate a dictionary mapping reaction IDs to GPR rules from the model.
 
     Args:
-        model : the model to derive data from.
-        asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
+        model: COBRA model to derive data from.
+        asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
 
     Returns:
-        Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
-        Dict[ReactionId, str] : the generated dictionary of raw rules.
+        Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
+        Dict[ReactionId, str]: Raw rules by reaction ID.
     """
-    # Is the below approach convoluted? yes
-    # Ok but is it inefficient? probably
-    # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
     _ruleGetter   =  lambda reaction : reaction.gene_reaction_rule
     ruleExtractor = (lambda reaction :
         rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
@@ -41,14 +44,14 @@
 
 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
     """
-    Generates a dictionary mapping reaction ids to reaction formulas from the model.
+    Generate a dictionary mapping reaction IDs to reaction formulas from the model.
 
     Args:
-        model : the model to derive data from.
-        asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
+        model: COBRA model to derive data from.
+        asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
 
     Returns:
-        Dict[ReactionId, str] : the generated dictionary.
+        Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
     """
 
     unparsedReactions = {
@@ -62,6 +65,12 @@
     return reactionUtils.create_reaction_dict(unparsedReactions)
 
 def get_medium(model:cobraModel) -> pd.DataFrame:
+    """
+    Extract the uptake reactions representing the model medium.
+
+    Returns a DataFrame with a single column 'reaction' listing exchange reactions
+    with negative lower bound and no positive stoichiometric coefficients (uptake only).
+    """
     trueMedium=[]
     for r in model.reactions:
         positiveCoeff=0
@@ -77,16 +86,16 @@
 
 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
     """
-    Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello.
-    
+    Extract objective coefficients for each reaction.
+
     Args:
-        model : cobra.Model
-        
+        model: COBRA model
+
     Returns:
-        pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient
+        pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
     """
     coeffs = []
-    # model.objective.expression è un'espressione lineare
+    # model.objective.expression is a linear expression
     objective_expr = model.objective.expression.as_coefficients_dict()
     
     for reaction in model.reactions:
@@ -99,6 +108,12 @@
     return pd.DataFrame(coeffs)
 
 def generate_bounds(model:cobraModel) -> pd.DataFrame:
+    """
+    Build a DataFrame of lower/upper bounds for all reactions.
+
+    Returns:
+        pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
+    """
 
     rxns = []
     for reaction in model.reactions:
@@ -160,45 +175,38 @@
 
 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
     """
-    Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
-    
+    Build a COBRApy model from a tabular file with reaction data.
+
     Args:
-        csv_path: Path al file CSV (separato da tab)
-        model_id: ID del modello da creare
-        
+        csv_path: Path to the tab-separated file.
+        model_id: ID for the newly created model.
+
     Returns:
-        cobra.Model: Il modello COBRApy costruito
+        cobra.Model: The constructed COBRApy model.
     """
     
-    # Leggi i dati dal CSV
     df = pd.read_csv(csv_path, sep='\t')
     
-    # Crea il modello vuoto
     model = cobraModel(model_id)
     
-    # Dict per tenere traccia di metaboliti e compartimenti
     metabolites_dict = {}
     compartments_dict = {}
     
-    print(f"Costruendo modello da {len(df)} reazioni...")
+    print(f"Building model from {len(df)} reactions...")
     
-    # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
     for idx, row in df.iterrows():
         reaction_formula = str(row['Formula']).strip()
         if not reaction_formula or reaction_formula == 'nan':
             continue
             
-        # Estrai metaboliti dalla formula della reazione
         metabolites = extract_metabolites_from_reaction(reaction_formula)
         
         for met_id in metabolites:
             compartment = extract_compartment_from_metabolite(met_id)
             
-            # Aggiungi compartimento se non esiste
             if compartment not in compartments_dict:
                 compartments_dict[compartment] = compartment
             
-            # Aggiungi metabolita se non esiste
             if met_id not in metabolites_dict:
                 metabolites_dict[met_id] = Metabolite(
                     id=met_id,
@@ -206,15 +214,12 @@
                     name=met_id.replace(f"_{compartment}", "").replace("__", "_")
                 )
     
-    # Aggiungi compartimenti al modello
     model.compartments = compartments_dict
     
-    # Aggiungi metaboliti al modello  
     model.add_metabolites(list(metabolites_dict.values()))
     
-    print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
+    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
     
-    # Seconda passata: aggiungi le reazioni
     reactions_added = 0
     reactions_skipped = 0
     
@@ -223,44 +228,37 @@
         reaction_id = str(row['ReactionID']).strip()
         reaction_formula = str(row['Formula']).strip()
         
-        # Salta reazioni senza formula
         if not reaction_formula or reaction_formula == 'nan':
-            raise ValueError(f"Formula della reazione mancante {reaction_id}")
+            raise ValueError(f"Missing reaction formula for {reaction_id}")
         
-        # Crea la reazione
         reaction = Reaction(reaction_id)
         reaction.name = reaction_id
         
-        # Imposta bounds
         reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
         reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
         
-        # Aggiungi gene rule se presente
         if pd.notna(row['GPR']) and str(row['GPR']).strip():
             reaction.gene_reaction_rule = str(row['GPR']).strip()
         
-        # Parse della formula della reazione
         try:
             parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
         except Exception as e:
-            print(f"Errore nel parsing della reazione {reaction_id}: {e}")
+            print(f"Error parsing reaction {reaction_id}: {e}")
             reactions_skipped += 1
             continue
         
-        # Aggiungi la reazione al modello
         model.add_reactions([reaction])
         reactions_added += 1
             
     
-    print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
+    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
     
     # set objective function
     set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
 
-    # Imposta il medium
     set_medium_from_data(model, df)
     
-    print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
+    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
     
     return model
 
@@ -268,12 +266,12 @@
 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
     """
-    Estrae gli ID dei metaboliti da una formula di reazione.
-    Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
-    e permette che comincino con cifre o underscore.
+    Extract metabolite IDs from a reaction formula.
+    Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
+    allowing leading digits/underscores.
     """
     metabolites = set()
-    # coefficiente opzionale seguito da un token che termina con _<letters>
+    # optional coefficient followed by a token ending with _<letters>
     pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
     matches = re.findall(pattern, reaction_formula)
     metabolites.update(matches)
@@ -281,54 +279,39 @@
 
 
 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
-    """
-    Estrae il compartimento dall'ID del metabolita.
-    """
-    # Il compartimento è solitamente l'ultima lettera dopo l'underscore
+    """Extract the compartment from a metabolite ID."""
     if '_' in metabolite_id:
         return metabolite_id.split('_')[-1]
     return 'c'  # default cytoplasm
 
 
 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
-    """
-    Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
-    """
+    """Parse a reaction formula and set metabolites with their coefficients."""
 
-    if reaction.id == 'EX_thbpt_e':
-        print(reaction.id)
-        print(formula)
-    # Dividi in parte sinistra e destra
     if '<=>' in formula:
         left, right = formula.split('<=>')
         reversible = True
     elif '<--' in formula:
         left, right = formula.split('<--')
         reversible = False
-        left, right = left, right
     elif '-->' in formula:
         left, right = formula.split('-->')
         reversible = False
     elif '<-' in formula:
         left, right = formula.split('<-')
         reversible = False
-        left, right = left, right
     else:
-        raise ValueError(f"Formato reazione non riconosciuto: {formula}")
+        raise ValueError(f"Unrecognized reaction format: {formula}")
     
-    # Parse dei metaboliti e coefficienti
     reactants = parse_metabolites_side(left.strip())
     products = parse_metabolites_side(right.strip())
     
-    # Aggiungi metaboliti alla reazione
     metabolites_to_add = {}
     
-    # Reagenti (coefficienti negativi)
     for met_id, coeff in reactants.items():
         if met_id in metabolites_dict:
             metabolites_to_add[metabolites_dict[met_id]] = -coeff
     
-    # Prodotti (coefficienti positivi)
     for met_id, coeff in products.items():
         if met_id in metabolites_dict:
             metabolites_to_add[metabolites_dict[met_id]] = coeff
@@ -337,9 +320,7 @@
 
 
 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
-    """
-    Parsa un lato della reazione per estrarre metaboliti e coefficienti.
-    """
+    """Parse one side of a reaction and extract metabolites with coefficients."""
     metabolites = {}
     if not side_str or side_str.strip() == '':
         return metabolites
@@ -350,7 +331,7 @@
         if not term:
             continue
 
-        # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
+        # optional coefficient + id ending with _<compartment>
         match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
         if match:
             coeff_str, met_id = match.groups()
@@ -372,7 +353,6 @@
         reaction_id = str(row['ReactionID']).strip()
         coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
         if coeff != 0:
-            # Assicuriamoci che la reazione esista nel modello
             if reaction_id in model.reactions:
                 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
             else:
@@ -381,7 +361,6 @@
     if not obj_dict:
         raise ValueError("No reactions found with non-zero objective coefficient.")
 
-    # Imposta la funzione obiettivo
     model.objective = obj_dict
     print(f"Objective set with {len(obj_dict)} reactions.")
 
@@ -389,9 +368,7 @@
 
 
 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
-    """
-    Imposta il medium basato sulla colonna InMedium.
-    """
+    """Set the medium based on the 'InMedium' column in the dataframe."""
     medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
     
     medium_dict = {}
@@ -403,13 +380,11 @@
     
     if medium_dict:
         model.medium = medium_dict
-        print(f"Medium impostato con {len(medium_dict)} componenti")
+        print(f"Medium set with {len(medium_dict)} components")
 
 
 def validate_model(model: cobraModel) -> Dict[str, any]:
-    """
-    Valida il modello e fornisce statistiche di base.
-    """
+    """Validate the model and return basic statistics."""
     validation = {
         'num_reactions': len(model.reactions),
         'num_metabolites': len(model.metabolites),
@@ -422,7 +397,7 @@
     }
     
     try:
-        # Test di crescita
+        # Growth test
         solution = model.optimize()
         validation['growth_rate'] = solution.objective_value
         validation['status'] = solution.status
@@ -432,7 +407,8 @@
     
     return validation
 
-def convert_genes(model,annotation):
+def convert_genes(model, annotation):
+    """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
     from cobra.manipulation import rename_genes
     model2=model.copy()
     try:
@@ -450,12 +426,12 @@
 
 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
     """
-    Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...}
-    Lancia ValueError se non trova almeno un mapping utile.
+    Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
+    Raise ValueError if no suitable mapping is found.
     """
     cols = { _normalize_colname(c): c for c in mapping_df.columns }
     chosen = {}
-    # possibili nomi per ciascuna categoria
+    # candidate names for each category
     candidates = {
         'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
         'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
@@ -476,13 +452,8 @@
                                 model_source_genes: Optional[Set[str]] = None,
                                 logger: Optional[logging.Logger] = None) -> None:
     """
-    Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse),
-    ogni target sia associato ad al massimo un source. Se trova target associati a
-    >1 source solleva ValueError mostrando esempi.
-
-    - mapping_df: DataFrame con colonne source_col, target_col
-    - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo
-      (se None, si usa tutto mapping_df)
+    Check that, within the filtered mapping_df, each target maps to at most one source.
+    Log examples if duplicates are found.
     """
     if logger is None:
         logger = logging.getLogger(__name__)
@@ -491,12 +462,12 @@
         logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
         return
 
-    # normalizza le colonne temporanee per gruppi (senza modificare il df originale)
+    # normalize temporary columns for grouping (without altering the original df)
     tmp = mapping_df[[source_col, target_col]].copy()
     tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id)
     tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
 
-    # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok)
+    # optionally filter to the set of model source genes
     if model_source_genes is not None:
         tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
 
@@ -504,27 +475,27 @@
         logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
         return
 
-    # costruisci il reverse mapping target -> set(sources)
+    # build reverse mapping: target -> set(sources)
     grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
-    # trova target con più di 1 source
+    # find targets with more than one source
     problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
 
     if problematic:
-        # prepara messaggio di errore con esempi (fino a 20)
+        # prepare warning message with examples (limited subset)
         sample_items = list(problematic.items())
         msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
         for tgt, sources in sample_items:
             msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
         full_msg = "\n".join(msg_lines)
-        # loggare e sollevare errore
+        # log warning
         logger.warning(full_msg)
 
-    # se tutto ok
+    # if everything is fine
     logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
 
 
 def _normalize_gene_id(g: str) -> str:
-    """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip)."""
+    """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
     if g is None:
         return ""
     g = str(g).strip()
@@ -535,30 +506,30 @@
 
 def _simplify_boolean_expression(expr: str) -> str:
     """
-    Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze.
-    Gestisce espressioni con 'and' e 'or'.
+    Simplify a boolean expression by removing duplicates and redundancies.
+    Handles expressions with 'and' and 'or'.
     """
     if not expr or not expr.strip():
         return expr
     
-    # Normalizza gli operatori
+    # normalize operators
     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
     
-    # Funzione ricorsiva per processare le espressioni
+    # recursive helper to process expressions
     def process_expression(s: str) -> str:
         s = s.strip()
         if not s:
             return s
             
-        # Gestisci le parentesi
+        # handle parentheses
         while '(' in s:
-            # Trova la parentesi più interna
+            # find the innermost parentheses
             start = -1
             for i, c in enumerate(s):
                 if c == '(':
                     start = i
                 elif c == ')' and start != -1:
-                    # Processa il contenuto tra parentesi
+                    # process inner content
                     inner = s[start+1:i]
                     processed_inner = process_expression(inner)
                     s = s[:start] + processed_inner + s[i+1:]
@@ -566,7 +537,7 @@
             else:
                 break
         
-        # Dividi per 'or' al livello più alto
+        # split by 'or' at top level
         or_parts = []
         current_part = ""
         paren_count = 0
@@ -590,10 +561,10 @@
         if current_part.strip():
             or_parts.append(current_part.strip())
         
-        # Processa ogni parte OR
+        # process each OR part
         processed_or_parts = []
         for or_part in or_parts:
-            # Dividi per 'and' all'interno di ogni parte OR
+            # split by 'and' within each OR part
             and_parts = []
             current_and = ""
             paren_count = 0
@@ -617,7 +588,7 @@
             if current_and.strip():
                 and_parts.append(current_and.strip())
             
-            # Rimuovi duplicati nelle parti AND
+            # deduplicate AND parts
             unique_and_parts = list(dict.fromkeys(and_parts))  # mantiene l'ordine
             
             if len(unique_and_parts) == 1:
@@ -625,7 +596,7 @@
             elif len(unique_and_parts) > 1:
                 processed_or_parts.append(" and ".join(unique_and_parts))
         
-        # Rimuovi duplicati nelle parti OR
+        # deduplicate OR parts
         unique_or_parts = list(dict.fromkeys(processed_or_parts))
         
         if len(unique_or_parts) == 1:
@@ -638,7 +609,7 @@
     try:
         return process_expression(expr)
     except Exception:
-        # Se la semplificazione fallisce, ritorna l'espressione originale
+        # if simplification fails, return the original expression
         return expr
 
 # ---------- Main public function ----------
@@ -649,13 +620,16 @@
                          allow_many_to_one: bool = False,
                          logger: Optional[logging.Logger] = None) -> 'cobra.Model':
     """
-    Translate model genes from source_nomenclature to target_nomenclature.
-    mapping_df should contain at least columns that allow the mapping
-    (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
-    
+    Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
+    mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
+
     Args:
-        allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target
-                          e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target.
+        model: COBRA model to translate.
+        mapping_df: DataFrame containing the mapping information.
+        target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
+        source_nomenclature: Current source key in the model (default 'hgnc_id').
+        allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
+        logger: Optional logger.
     """
     if logger is None:
         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -711,11 +685,9 @@
 
     filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
 
-    # Se non ci sono righe rilevanti, avvisa
     if filtered_map.empty:
         logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
 
-    # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one ---
     if not allow_many_to_one:
         _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
 
@@ -739,7 +711,6 @@
         if gpr and gpr.strip():
             new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
             if new_gpr != gpr:
-                # Semplifica l'espressione booleana per rimuovere duplicati
                 simplified_gpr = _simplify_boolean_expression(new_gpr)
                 if simplified_gpr != new_gpr:
                     stats['simplified_gprs'] += 1