diff COBRAxy/utils/general_utils.py @ 408:f413b78d61bf draft

Uploaded
author francesco_lapi
date Mon, 08 Sep 2025 17:12:35 +0000
parents a0b53ccc73a8
children 71850bdf9e1e
line wrap: on
line diff
--- a/COBRAxy/utils/general_utils.py	Mon Sep 08 16:52:46 2025 +0000
+++ b/COBRAxy/utils/general_utils.py	Mon Sep 08 17:12:35 2025 +0000
@@ -7,10 +7,11 @@
 
 from enum import Enum
 from itertools import count
-from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union
+from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple
 
 import pandas as pd
 import cobra
+from cobra import Model, Reaction, Metabolite
 
 import zipfile
 import gzip
@@ -713,4 +714,270 @@
         return -1
     rename_genes(model2,dict_genes)
 
-    return model2
\ No newline at end of file
+    return model2
+
+
+def build_cobra_model_from_csv(csv_path: str, model_id: str = "ENGRO2_custom") -> cobra.Model:
+    """
+    Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
+    
+    Args:
+        csv_path: Path al file CSV (separato da tab)
+        model_id: ID del modello da creare
+        
+    Returns:
+        cobra.Model: Il modello COBRApy costruito
+    """
+    
+    # Leggi i dati dal CSV
+    df = pd.read_csv(csv_path, sep='\t')
+    
+    # Crea il modello vuoto
+    model = Model(model_id)
+    
+    # Dict per tenere traccia di metaboliti e compartimenti
+    metabolites_dict = {}
+    compartments_dict = {}
+    
+    print(f"Costruendo modello da {len(df)} reazioni...")
+    
+    # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
+    for idx, row in df.iterrows():
+        reaction_formula = str(row['Reaction']).strip()
+        if not reaction_formula or reaction_formula == 'nan':
+            continue
+            
+        # Estrai metaboliti dalla formula della reazione
+        metabolites = extract_metabolites_from_reaction(reaction_formula)
+        
+        for met_id in metabolites:
+            compartment = extract_compartment_from_metabolite(met_id)
+            
+            # Aggiungi compartimento se non esiste
+            if compartment not in compartments_dict:
+                compartments_dict[compartment] = compartment
+            
+            # Aggiungi metabolita se non esiste
+            if met_id not in metabolites_dict:
+                metabolites_dict[met_id] = Metabolite(
+                    id=met_id,
+                    compartment=compartment,
+                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
+                )
+    
+    # Aggiungi compartimenti al modello
+    model.compartments = compartments_dict
+    
+    # Aggiungi metaboliti al modello  
+    model.add_metabolites(list(metabolites_dict.values()))
+    
+    print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
+    
+    # Seconda passata: aggiungi le reazioni
+    reactions_added = 0
+    reactions_skipped = 0
+    
+    for idx, row in df.iterrows():
+        try:
+            reaction_id = str(row['ReactionID']).strip()
+            reaction_formula = str(row['Reaction']).strip()
+            
+            # Salta reazioni senza formula
+            if not reaction_formula or reaction_formula == 'nan':
+                reactions_skipped += 1
+                continue
+            
+            # Crea la reazione
+            reaction = Reaction(reaction_id)
+            reaction.name = reaction_id
+            
+            # Imposta bounds
+            reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
+            reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
+            
+            # Aggiungi gene rule se presente
+            if pd.notna(row['Rule']) and str(row['Rule']).strip():
+                reaction.gene_reaction_rule = str(row['Rule']).strip()
+            
+            # Parse della formula della reazione
+            try:
+                parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
+            except Exception as e:
+                print(f"Errore nel parsing della reazione {reaction_id}: {e}")
+                reactions_skipped += 1
+                continue
+            
+            # Aggiungi la reazione al modello
+            model.add_reactions([reaction])
+            reactions_added += 1
+            
+        except Exception as e:
+            print(f"Errore nell'aggiungere la reazione {reaction_id}: {e}")
+            reactions_skipped += 1
+            continue
+    
+    print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
+    
+    # Imposta l'obiettivo di biomassa
+    set_biomass_objective(model)
+    
+    # Imposta il medium
+    set_medium_from_data(model, df)
+    
+    print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
+    
+    return model
+
+
+# Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
+def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
+    """
+    Estrae gli ID dei metaboliti da una formula di reazione.
+    Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
+    e permette che comincino con cifre o underscore.
+    """
+    metabolites = set()
+    # coefficiente opzionale seguito da un token che termina con _<letters>
+    pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
+    matches = re.findall(pattern, reaction_formula)
+    metabolites.update(matches)
+    return metabolites
+
+
+def extract_compartment_from_metabolite(metabolite_id: str) -> str:
+    """
+    Estrae il compartimento dall'ID del metabolita.
+    """
+    # Il compartimento รจ solitamente l'ultima lettera dopo l'underscore
+    if '_' in metabolite_id:
+        return metabolite_id.split('_')[-1]
+    return 'c'  # default cytoplasm
+
+
+def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
+    """
+    Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
+    """
+
+    if reaction.id == 'EX_thbpt_e':
+        print(reaction.id)
+        print(formula)
+    # Dividi in parte sinistra e destra
+    if '<=>' in formula:
+        left, right = formula.split('<=>')
+        reversible = True
+    elif '<--' in formula:
+        left, right = formula.split('<--')
+        reversible = False
+        left, right = left, right
+    elif '-->' in formula:
+        left, right = formula.split('-->')
+        reversible = False
+    elif '<-' in formula:
+        left, right = formula.split('<-')
+        reversible = False
+        left, right = left, right
+    else:
+        raise ValueError(f"Formato reazione non riconosciuto: {formula}")
+    
+    # Parse dei metaboliti e coefficienti
+    reactants = parse_metabolites_side(left.strip())
+    products = parse_metabolites_side(right.strip())
+    
+    # Aggiungi metaboliti alla reazione
+    metabolites_to_add = {}
+    
+    # Reagenti (coefficienti negativi)
+    for met_id, coeff in reactants.items():
+        if met_id in metabolites_dict:
+            metabolites_to_add[metabolites_dict[met_id]] = -coeff
+    
+    # Prodotti (coefficienti positivi)
+    for met_id, coeff in products.items():
+        if met_id in metabolites_dict:
+            metabolites_to_add[metabolites_dict[met_id]] = coeff
+    
+    reaction.add_metabolites(metabolites_to_add)
+
+
+def parse_metabolites_side(side_str: str) -> Dict[str, float]:
+    """
+    Parsa un lato della reazione per estrarre metaboliti e coefficienti.
+    """
+    metabolites = {}
+    if not side_str or side_str.strip() == '':
+        return metabolites
+
+    terms = side_str.split('+')
+    for term in terms:
+        term = term.strip()
+        if not term:
+            continue
+
+        # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
+        match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
+        if match:
+            coeff_str, met_id = match.groups()
+            coeff = float(coeff_str) if coeff_str else 1.0
+            metabolites[met_id] = coeff
+
+    return metabolites
+
+
+
+def set_biomass_objective(model: Model):
+    """
+    Imposta la reazione di biomassa come obiettivo.
+    """
+    biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
+    
+    if biomass_reactions:
+        model.objective = biomass_reactions[0].id
+        print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
+    else:
+        print("Nessuna reazione di biomassa trovata")
+
+
+def set_medium_from_data(model: Model, df: pd.DataFrame):
+    """
+    Imposta il medium basato sulla colonna InMedium.
+    """
+    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
+    
+    medium_dict = {}
+    for rxn_id in medium_reactions:
+        if rxn_id in [r.id for r in model.reactions]:
+            reaction = model.reactions.get_by_id(rxn_id)
+            if reaction.lower_bound < 0:  # Solo reazioni di uptake
+                medium_dict[rxn_id] = abs(reaction.lower_bound)
+    
+    if medium_dict:
+        model.medium = medium_dict
+        print(f"Medium impostato con {len(medium_dict)} componenti")
+
+
+def validate_model(model: Model) -> Dict[str, any]:
+    """
+    Valida il modello e fornisce statistiche di base.
+    """
+    validation = {
+        'num_reactions': len(model.reactions),
+        'num_metabolites': len(model.metabolites),
+        'num_genes': len(model.genes),
+        'num_compartments': len(model.compartments),
+        'objective': str(model.objective),
+        'medium_size': len(model.medium),
+        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
+        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
+    }
+    
+    try:
+        # Test di crescita
+        solution = model.optimize()
+        validation['growth_rate'] = solution.objective_value
+        validation['status'] = solution.status
+    except Exception as e:
+        validation['growth_rate'] = None
+        validation['status'] = f"Error: {e}"
+    
+    return validation