Mercurial > repos > bimib > cobraxy
diff COBRAxy/utils/model_utils.py @ 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | 4e2bc80764b6 |
children |
line wrap: on
line diff
--- a/COBRAxy/utils/model_utils.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/utils/model_utils.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,14 +1,20 @@ +""" +Utilities for generating and manipulating COBRA models and related metadata. + +This module includes helpers to: +- extract rules, reactions, bounds, objective coefficients, and compartments +- build a COBRA model from a tabular file +- set objective and medium from dataframes +- validate a model and convert gene identifiers +- translate model GPRs using mapping tables +""" import os -import csv import cobra -import pickle -import argparse import pandas as pd import re import logging from typing import Optional, Tuple, Union, List, Dict, Set from collections import defaultdict -import utils.general_utils as utils import utils.rule_parsing as rulesUtils import utils.reaction_parsing as reactionUtils from cobra import Model as cobraModel, Reaction, Metabolite @@ -17,19 +23,16 @@ ReactionId = str def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: """ - Generates a dictionary mapping reaction ids to rules from the model. + Generate a dictionary mapping reaction IDs to GPR rules from the model. Args: - model : the model to derive data from. - asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. + model: COBRA model to derive data from. + asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings. Returns: - Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. - Dict[ReactionId, str] : the generated dictionary of raw rules. + Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID. + Dict[ReactionId, str]: Raw rules by reaction ID. """ - # Is the below approach convoluted? yes - # Ok but is it inefficient? probably - # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) _ruleGetter = lambda reaction : reaction.gene_reaction_rule ruleExtractor = (lambda reaction : rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter @@ -41,14 +44,14 @@ def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: """ - Generates a dictionary mapping reaction ids to reaction formulas from the model. + Generate a dictionary mapping reaction IDs to reaction formulas from the model. Args: - model : the model to derive data from. - asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. + model: COBRA model to derive data from. + asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings. Returns: - Dict[ReactionId, str] : the generated dictionary. + Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested). """ unparsedReactions = { @@ -62,6 +65,12 @@ return reactionUtils.create_reaction_dict(unparsedReactions) def get_medium(model:cobraModel) -> pd.DataFrame: + """ + Extract the uptake reactions representing the model medium. + + Returns a DataFrame with a single column 'reaction' listing exchange reactions + with negative lower bound and no positive stoichiometric coefficients (uptake only). + """ trueMedium=[] for r in model.reactions: positiveCoeff=0 @@ -77,16 +86,16 @@ def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: """ - Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello. - + Extract objective coefficients for each reaction. + Args: - model : cobra.Model - + model: COBRA model + Returns: - pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient + pd.DataFrame with columns: ReactionID, ObjectiveCoefficient """ coeffs = [] - # model.objective.expression è un'espressione lineare + # model.objective.expression is a linear expression objective_expr = model.objective.expression.as_coefficients_dict() for reaction in model.reactions: @@ -99,6 +108,12 @@ return pd.DataFrame(coeffs) def generate_bounds(model:cobraModel) -> pd.DataFrame: + """ + Build a DataFrame of lower/upper bounds for all reactions. + + Returns: + pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound']. + """ rxns = [] for reaction in model.reactions: @@ -160,45 +175,38 @@ def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: """ - Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. - + Build a COBRApy model from a tabular file with reaction data. + Args: - csv_path: Path al file CSV (separato da tab) - model_id: ID del modello da creare - + csv_path: Path to the tab-separated file. + model_id: ID for the newly created model. + Returns: - cobra.Model: Il modello COBRApy costruito + cobra.Model: The constructed COBRApy model. """ - # Leggi i dati dal CSV df = pd.read_csv(csv_path, sep='\t') - # Crea il modello vuoto model = cobraModel(model_id) - # Dict per tenere traccia di metaboliti e compartimenti metabolites_dict = {} compartments_dict = {} - print(f"Costruendo modello da {len(df)} reazioni...") + print(f"Building model from {len(df)} reactions...") - # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni for idx, row in df.iterrows(): reaction_formula = str(row['Formula']).strip() if not reaction_formula or reaction_formula == 'nan': continue - # Estrai metaboliti dalla formula della reazione metabolites = extract_metabolites_from_reaction(reaction_formula) for met_id in metabolites: compartment = extract_compartment_from_metabolite(met_id) - # Aggiungi compartimento se non esiste if compartment not in compartments_dict: compartments_dict[compartment] = compartment - # Aggiungi metabolita se non esiste if met_id not in metabolites_dict: metabolites_dict[met_id] = Metabolite( id=met_id, @@ -206,15 +214,12 @@ name=met_id.replace(f"_{compartment}", "").replace("__", "_") ) - # Aggiungi compartimenti al modello model.compartments = compartments_dict - # Aggiungi metaboliti al modello model.add_metabolites(list(metabolites_dict.values())) - print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") + print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments") - # Seconda passata: aggiungi le reazioni reactions_added = 0 reactions_skipped = 0 @@ -223,44 +228,37 @@ reaction_id = str(row['ReactionID']).strip() reaction_formula = str(row['Formula']).strip() - # Salta reazioni senza formula if not reaction_formula or reaction_formula == 'nan': - raise ValueError(f"Formula della reazione mancante {reaction_id}") + raise ValueError(f"Missing reaction formula for {reaction_id}") - # Crea la reazione reaction = Reaction(reaction_id) reaction.name = reaction_id - # Imposta bounds reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 - # Aggiungi gene rule se presente if pd.notna(row['GPR']) and str(row['GPR']).strip(): reaction.gene_reaction_rule = str(row['GPR']).strip() - # Parse della formula della reazione try: parse_reaction_formula(reaction, reaction_formula, metabolites_dict) except Exception as e: - print(f"Errore nel parsing della reazione {reaction_id}: {e}") + print(f"Error parsing reaction {reaction_id}: {e}") reactions_skipped += 1 continue - # Aggiungi la reazione al modello model.add_reactions([reaction]) reactions_added += 1 - print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") + print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions") # set objective function set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient") - # Imposta il medium set_medium_from_data(model, df) - print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") + print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites") return model @@ -268,12 +266,12 @@ # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: """ - Estrae gli ID dei metaboliti da una formula di reazione. - Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) - e permette che comincino con cifre o underscore. + Extract metabolite IDs from a reaction formula. + Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e), + allowing leading digits/underscores. """ metabolites = set() - # coefficiente opzionale seguito da un token che termina con _<letters> + # optional coefficient followed by a token ending with _<letters> pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' matches = re.findall(pattern, reaction_formula) metabolites.update(matches) @@ -281,54 +279,39 @@ def extract_compartment_from_metabolite(metabolite_id: str) -> str: - """ - Estrae il compartimento dall'ID del metabolita. - """ - # Il compartimento è solitamente l'ultima lettera dopo l'underscore + """Extract the compartment from a metabolite ID.""" if '_' in metabolite_id: return metabolite_id.split('_')[-1] return 'c' # default cytoplasm def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): - """ - Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. - """ + """Parse a reaction formula and set metabolites with their coefficients.""" - if reaction.id == 'EX_thbpt_e': - print(reaction.id) - print(formula) - # Dividi in parte sinistra e destra if '<=>' in formula: left, right = formula.split('<=>') reversible = True elif '<--' in formula: left, right = formula.split('<--') reversible = False - left, right = left, right elif '-->' in formula: left, right = formula.split('-->') reversible = False elif '<-' in formula: left, right = formula.split('<-') reversible = False - left, right = left, right else: - raise ValueError(f"Formato reazione non riconosciuto: {formula}") + raise ValueError(f"Unrecognized reaction format: {formula}") - # Parse dei metaboliti e coefficienti reactants = parse_metabolites_side(left.strip()) products = parse_metabolites_side(right.strip()) - # Aggiungi metaboliti alla reazione metabolites_to_add = {} - # Reagenti (coefficienti negativi) for met_id, coeff in reactants.items(): if met_id in metabolites_dict: metabolites_to_add[metabolites_dict[met_id]] = -coeff - # Prodotti (coefficienti positivi) for met_id, coeff in products.items(): if met_id in metabolites_dict: metabolites_to_add[metabolites_dict[met_id]] = coeff @@ -337,9 +320,7 @@ def parse_metabolites_side(side_str: str) -> Dict[str, float]: - """ - Parsa un lato della reazione per estrarre metaboliti e coefficienti. - """ + """Parse one side of a reaction and extract metabolites with coefficients.""" metabolites = {} if not side_str or side_str.strip() == '': return metabolites @@ -350,7 +331,7 @@ if not term: continue - # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> + # optional coefficient + id ending with _<compartment> match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) if match: coeff_str, met_id = match.groups() @@ -372,7 +353,6 @@ reaction_id = str(row['ReactionID']).strip() coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 if coeff != 0: - # Assicuriamoci che la reazione esista nel modello if reaction_id in model.reactions: obj_dict[model.reactions.get_by_id(reaction_id)] = coeff else: @@ -381,7 +361,6 @@ if not obj_dict: raise ValueError("No reactions found with non-zero objective coefficient.") - # Imposta la funzione obiettivo model.objective = obj_dict print(f"Objective set with {len(obj_dict)} reactions.") @@ -389,9 +368,7 @@ def set_medium_from_data(model: cobraModel, df: pd.DataFrame): - """ - Imposta il medium basato sulla colonna InMedium. - """ + """Set the medium based on the 'InMedium' column in the dataframe.""" medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() medium_dict = {} @@ -403,13 +380,11 @@ if medium_dict: model.medium = medium_dict - print(f"Medium impostato con {len(medium_dict)} componenti") + print(f"Medium set with {len(medium_dict)} components") def validate_model(model: cobraModel) -> Dict[str, any]: - """ - Valida il modello e fornisce statistiche di base. - """ + """Validate the model and return basic statistics.""" validation = { 'num_reactions': len(model.reactions), 'num_metabolites': len(model.metabolites), @@ -422,7 +397,7 @@ } try: - # Test di crescita + # Growth test solution = model.optimize() validation['growth_rate'] = solution.objective_value validation['status'] = solution.status @@ -432,7 +407,8 @@ return validation -def convert_genes(model,annotation): +def convert_genes(model, annotation): + """Rename genes using a selected annotation key in gene.notes; returns a model copy.""" from cobra.manipulation import rename_genes model2=model.copy() try: @@ -450,12 +426,12 @@ def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: """ - Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...} - Lancia ValueError se non trova almeno un mapping utile. + Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}. + Raise ValueError if no suitable mapping is found. """ cols = { _normalize_colname(c): c for c in mapping_df.columns } chosen = {} - # possibili nomi per ciascuna categoria + # candidate names for each category candidates = { 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], @@ -476,13 +452,8 @@ model_source_genes: Optional[Set[str]] = None, logger: Optional[logging.Logger] = None) -> None: """ - Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse), - ogni target sia associato ad al massimo un source. Se trova target associati a - >1 source solleva ValueError mostrando esempi. - - - mapping_df: DataFrame con colonne source_col, target_col - - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo - (se None, si usa tutto mapping_df) + Check that, within the filtered mapping_df, each target maps to at most one source. + Log examples if duplicates are found. """ if logger is None: logger = logging.getLogger(__name__) @@ -491,12 +462,12 @@ logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") return - # normalizza le colonne temporanee per gruppi (senza modificare il df originale) + # normalize temporary columns for grouping (without altering the original df) tmp = mapping_df[[source_col, target_col]].copy() tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() - # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok) + # optionally filter to the set of model source genes if model_source_genes is not None: tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] @@ -504,27 +475,27 @@ logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") return - # costruisci il reverse mapping target -> set(sources) + # build reverse mapping: target -> set(sources) grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) - # trova target con più di 1 source + # find targets with more than one source problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} if problematic: - # prepara messaggio di errore con esempi (fino a 20) + # prepare warning message with examples (limited subset) sample_items = list(problematic.items()) msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] for tgt, sources in sample_items: msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") full_msg = "\n".join(msg_lines) - # loggare e sollevare errore + # log warning logger.warning(full_msg) - # se tutto ok + # if everything is fine logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") def _normalize_gene_id(g: str) -> str: - """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip).""" + """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" if g is None: return "" g = str(g).strip() @@ -535,30 +506,30 @@ def _simplify_boolean_expression(expr: str) -> str: """ - Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze. - Gestisce espressioni con 'and' e 'or'. + Simplify a boolean expression by removing duplicates and redundancies. + Handles expressions with 'and' and 'or'. """ if not expr or not expr.strip(): return expr - # Normalizza gli operatori + # normalize operators expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') - # Funzione ricorsiva per processare le espressioni + # recursive helper to process expressions def process_expression(s: str) -> str: s = s.strip() if not s: return s - # Gestisci le parentesi + # handle parentheses while '(' in s: - # Trova la parentesi più interna + # find the innermost parentheses start = -1 for i, c in enumerate(s): if c == '(': start = i elif c == ')' and start != -1: - # Processa il contenuto tra parentesi + # process inner content inner = s[start+1:i] processed_inner = process_expression(inner) s = s[:start] + processed_inner + s[i+1:] @@ -566,7 +537,7 @@ else: break - # Dividi per 'or' al livello più alto + # split by 'or' at top level or_parts = [] current_part = "" paren_count = 0 @@ -590,10 +561,10 @@ if current_part.strip(): or_parts.append(current_part.strip()) - # Processa ogni parte OR + # process each OR part processed_or_parts = [] for or_part in or_parts: - # Dividi per 'and' all'interno di ogni parte OR + # split by 'and' within each OR part and_parts = [] current_and = "" paren_count = 0 @@ -617,7 +588,7 @@ if current_and.strip(): and_parts.append(current_and.strip()) - # Rimuovi duplicati nelle parti AND + # deduplicate AND parts unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine if len(unique_and_parts) == 1: @@ -625,7 +596,7 @@ elif len(unique_and_parts) > 1: processed_or_parts.append(" and ".join(unique_and_parts)) - # Rimuovi duplicati nelle parti OR + # deduplicate OR parts unique_or_parts = list(dict.fromkeys(processed_or_parts)) if len(unique_or_parts) == 1: @@ -638,7 +609,7 @@ try: return process_expression(expr) except Exception: - # Se la semplificazione fallisce, ritorna l'espressione originale + # if simplification fails, return the original expression return expr # ---------- Main public function ---------- @@ -649,13 +620,16 @@ allow_many_to_one: bool = False, logger: Optional[logging.Logger] = None) -> 'cobra.Model': """ - Translate model genes from source_nomenclature to target_nomenclature. - mapping_df should contain at least columns that allow the mapping - (e.g. ensg, hgnc_id, hgnc_symbol, entrez). - + Translate model genes from source_nomenclature to target_nomenclature using a mapping table. + mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez). + Args: - allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target - e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target. + model: COBRA model to translate. + mapping_df: DataFrame containing the mapping information. + target_nomenclature: Desired target key (e.g., 'hgnc_symbol'). + source_nomenclature: Current source key in the model (default 'hgnc_id'). + allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs. + logger: Optional logger. """ if logger is None: logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -711,11 +685,9 @@ filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() - # Se non ci sono righe rilevanti, avvisa if filtered_map.empty: logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") - # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one --- if not allow_many_to_one: _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) @@ -739,7 +711,6 @@ if gpr and gpr.strip(): new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) if new_gpr != gpr: - # Semplifica l'espressione booleana per rimuovere duplicati simplified_gpr = _simplify_boolean_expression(new_gpr) if simplified_gpr != new_gpr: stats['simplified_gprs'] += 1