diff COBRAxy/src/utils/model_utils.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children fcdbc81feb45
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/COBRAxy/src/utils/model_utils.py	Sat Oct 25 14:55:13 2025 +0000
@@ -0,0 +1,1256 @@
+"""
+Utilities for generating and manipulating COBRA models and related metadata.
+
+This module includes helpers to:
+- extract rules, reactions, bounds, objective coefficients, and compartments
+- build a COBRA model from a tabular file
+- set objective and medium from dataframes
+- validate a model and convert gene identifiers
+- translate model GPRs using mapping tables
+"""
+import os
+import cobra
+import pandas as pd
+import re
+import logging
+from typing import Optional, Tuple, Union, List, Dict, Set
+from collections import defaultdict
+import utils.rule_parsing  as rulesUtils
+import utils.reaction_parsing as reactionUtils
+from cobra import Model as cobraModel, Reaction, Metabolite
+import sys
+
+
+############################ check_methods ####################################
+def gene_type(l :str, name :str) -> str:
+    """
+    Determine the type of gene ID.
+
+    Args:
+        l (str): The gene identifier to check.
+        name (str): The name of the dataset, used in error messages.
+
+    Returns:
+        str: The type of gene ID ('HGNC_ID', 'ENSG', 'HGNC_symbol', or 'entrez_id').
+
+    Raises:
+        sys.exit: If the gene ID type is not supported, the execution is aborted.
+    """
+    if check_hgnc(l):
+        return 'HGNC_ID'
+    elif check_ensembl(l):
+        return 'ENSG'
+    elif check_symbol(l):
+        return 'HGNC_symbol'
+    elif check_entrez(l):
+        return 'entrez_id'
+    else:
+        sys.exit('Execution aborted:\n' +
+                 'gene ID type in ' + name + ' not supported. Supported ID'+
+                 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
+
+def check_hgnc(l :str) -> bool:
+    """
+    Check if a gene identifier follows the HGNC format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the HGNC format, False otherwise.
+    """
+    if len(l) > 5:
+        if (l.upper()).startswith('HGNC:'):
+            return l[5:].isdigit()
+        else:
+            return False
+    else:
+        return False
+
+def check_ensembl(l :str) -> bool:
+    """
+    Check if a gene identifier follows the Ensembl format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the Ensembl format, False otherwise.
+    """
+    return l.upper().startswith('ENS')
+ 
+
+def check_symbol(l :str) -> bool:
+    """
+    Check if a gene identifier follows the symbol format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the symbol format, False otherwise.
+    """
+    if len(l) > 0:
+        if l[0].isalpha() and l[1:].isalnum():
+            return True
+        else:
+            return False
+    else:
+        return False
+
+def check_entrez(l :str) -> bool:
+    """
+    Check if a gene identifier follows the Entrez ID format.
+
+    Args:
+        l (str): The gene identifier to check.
+
+    Returns:
+        bool: True if the gene identifier follows the Entrez ID format, False otherwise.
+    """ 
+    if len(l) > 0:
+        return l.isdigit()
+    else: 
+        return False
+
+################################- DATA GENERATION -################################
+ReactionId = str
+def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
+    """
+    Generate a dictionary mapping reaction IDs to GPR rules from the model.
+
+    Args:
+        model: COBRA model to derive data from.
+        asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
+
+    Returns:
+        Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
+        Dict[ReactionId, str]: Raw rules by reaction ID.
+    """
+    _ruleGetter   =  lambda reaction : reaction.gene_reaction_rule
+    ruleExtractor = (lambda reaction :
+        rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
+
+    return {
+        reaction.id : ruleExtractor(reaction)
+        for reaction in model.reactions
+        if reaction.gene_reaction_rule }
+
+def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
+    """
+    Generate a dictionary mapping reaction IDs to reaction formulas from the model.
+
+    Args:
+        model: COBRA model to derive data from.
+        asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
+
+    Returns:
+        Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
+    """
+
+    unparsedReactions = {
+        reaction.id : reaction.reaction
+        for reaction in model.reactions
+        if reaction.reaction 
+    }
+
+    if not asParsed: return unparsedReactions
+    
+    return reactionUtils.create_reaction_dict(unparsedReactions)
+
+def get_medium(model:cobraModel) -> pd.DataFrame:
+    """
+    Extract the uptake reactions representing the model medium.
+
+    Returns a DataFrame with a single column 'reaction' listing exchange reactions
+    with negative lower bound and no positive stoichiometric coefficients (uptake only).
+    """
+    trueMedium=[]
+    for r in model.reactions:
+        positiveCoeff=0
+        for m in r.metabolites:
+            if r.get_coefficient(m.id)>0:
+                positiveCoeff=1;
+        if (positiveCoeff==0 and r.lower_bound<0):
+            trueMedium.append(r.id)
+
+    df_medium = pd.DataFrame()
+    df_medium["reaction"] = trueMedium
+    return df_medium
+
+def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
+    """
+    Extract objective coefficients for each reaction.
+
+    Args:
+        model: COBRA model
+
+    Returns:
+        pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
+    """
+    coeffs = []
+    # model.objective.expression is a linear expression
+    objective_expr = model.objective.expression.as_coefficients_dict()
+    
+    for reaction in model.reactions:
+        coeff = objective_expr.get(reaction.forward_variable, 0.0)
+        coeffs.append({
+            "ReactionID": reaction.id,
+            "ObjectiveCoefficient": coeff
+        })
+    
+    return pd.DataFrame(coeffs)
+
+def generate_bounds(model:cobraModel) -> pd.DataFrame:
+    """
+    Build a DataFrame of lower/upper bounds for all reactions.
+
+    Returns:
+        pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
+    """
+
+    rxns = []
+    for reaction in model.reactions:
+        rxns.append(reaction.id)
+
+    bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
+
+    for reaction in model.reactions:
+        bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
+    return bounds
+
+
+
+def generate_compartments(model: cobraModel) -> pd.DataFrame:
+    """
+    Generates a DataFrame containing pathway information for each reaction.
+    Creates columns for each pathway position (Pathway_1, Pathway_2, etc.) only if pathways exist.
+    
+    Args:
+        model: the COBRA model to extract pathway data from.
+        
+    Returns:
+        pd.DataFrame: DataFrame with ReactionID and pathway columns (if any pathways exist)
+    """
+    pathway_data = []
+
+    # First pass: determine the maximum number of pathways any reaction has
+    max_pathways = 0
+    reaction_pathways = {}
+    has_any_pathways = False
+
+    for reaction in model.reactions:
+        # Get unique pathways from all metabolites in the reaction
+        if 'pathways' in reaction.annotation and reaction.annotation['pathways']:
+            if type(reaction.annotation['pathways']) == list:
+                # Filter out empty/None values
+                valid_pathways = [p for p in reaction.annotation['pathways'] if p]
+                if valid_pathways:
+                    reaction_pathways[reaction.id] = valid_pathways
+                    max_pathways = max(max_pathways, len(valid_pathways))
+                    has_any_pathways = True
+                else:
+                    reaction_pathways[reaction.id] = []
+            else:
+                # Single pathway value
+                if reaction.annotation['pathways']:
+                    reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
+                    max_pathways = max(max_pathways, 1)
+                    has_any_pathways = True
+                else:
+                    reaction_pathways[reaction.id] = []
+        else:
+            # No pathway annotation - use empty list
+            reaction_pathways[reaction.id] = []
+
+    # If no pathways exist in the model, return DataFrame with only ReactionID
+    if not has_any_pathways:
+        return None
+
+    # Create column names for pathways only if they exist
+    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
+
+    # Second pass: create the data with pathway columns
+    for reaction_id, pathways in reaction_pathways.items():
+        row = {"ReactionID": reaction_id}
+        
+        # Fill pathway columns
+        for i in range(max_pathways):
+            col_name = pathway_columns[i]
+            if i < len(pathways):
+                row[col_name] = pathways[i]
+            else:
+                row[col_name] = None
+
+        pathway_data.append(row)
+
+    return pd.DataFrame(pathway_data)
+
+def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
+    """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... columns in the dataframe."""
+    pathway_cols = [col for col in df.columns if col.startswith('Pathway_')]
+    if not pathway_cols:
+        print("No 'Pathway_' columns found, skipping pathway annotation")
+        return
+    
+    pathway_data = defaultdict(list)
+    
+    for idx, row in df.iterrows():
+        reaction_id = str(row['ReactionID']).strip()
+        if reaction_id not in model.reactions:
+            continue
+        
+        pathways = []
+        for col in pathway_cols:
+            if pd.notna(row[col]) and str(row[col]).strip():
+                pathways.append(str(row[col]).strip())
+        
+        if pathways:
+
+            reaction = model.reactions.get_by_id(reaction_id)
+            if len(pathways) == 1:
+                reaction.annotation['pathways'] = pathways[0]
+            else:
+                reaction.annotation['pathways'] = pathways
+
+            pathway_data[reaction_id] = pathways
+    
+    print(f"Annotated {len(pathway_data)} reactions with pathways.")
+
+def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
+    """
+    Build a COBRApy model from a tabular file with reaction data.
+
+    Args:
+        csv_path: Path to the tab-separated file.
+        model_id: ID for the newly created model.
+
+    Returns:
+        cobra.Model: The constructed COBRApy model.
+    """
+    
+    # Try to detect separator
+    with open(csv_path, 'r') as f:
+        first_line = f.readline()
+        sep = '\t' if '\t' in first_line else ','
+    
+    df = pd.read_csv(csv_path, sep=sep)
+    
+    # Check required columns
+    required_cols = ['ReactionID', 'Formula']
+    missing_cols = [col for col in required_cols if col not in df.columns]
+    if missing_cols:
+        raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")
+    
+    model = cobraModel(model_id)
+    
+    metabolites_dict = {}
+    compartments_dict = {}
+    
+    print(f"Building model from {len(df)} reactions...")
+    
+    for idx, row in df.iterrows():
+        reaction_formula = str(row['Formula']).strip()
+        if not reaction_formula or reaction_formula == 'nan':
+            continue
+            
+        metabolites = extract_metabolites_from_reaction(reaction_formula)
+        
+        for met_id in metabolites:
+            compartment = extract_compartment_from_metabolite(met_id)
+            
+            if compartment not in compartments_dict:
+                compartments_dict[compartment] = compartment
+            
+            if met_id not in metabolites_dict:
+                metabolites_dict[met_id] = Metabolite(
+                    id=met_id,
+                    compartment=compartment,
+                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
+                )
+    
+    model.compartments = compartments_dict
+    
+    model.add_metabolites(list(metabolites_dict.values()))
+    
+    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
+    
+    reactions_added = 0
+    reactions_skipped = 0
+    
+    for idx, row in df.iterrows():
+
+        reaction_id = str(row['ReactionID']).strip()
+        reaction_formula = str(row['Formula']).strip()
+        
+        if not reaction_formula or reaction_formula == 'nan':
+            raise ValueError(f"Missing reaction formula for {reaction_id}")
+        
+        reaction = Reaction(reaction_id)
+        reaction.name = reaction_id
+        
+        reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
+        reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
+        
+        if pd.notna(row['GPR']) and str(row['GPR']).strip():
+            reaction.gene_reaction_rule = str(row['GPR']).strip()
+        
+        try:
+            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
+        except Exception as e:
+            print(f"Error parsing reaction {reaction_id}: {e}")
+            reactions_skipped += 1
+            continue
+        
+        model.add_reactions([reaction])
+        reactions_added += 1
+            
+    
+    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
+    
+    # set objective function
+    set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")
+
+    set_medium_from_data(model, df)
+
+    set_annotation_pathways_from_data(model, df)
+    
+    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
+    
+    return model
+
+
+# Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
+#def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
+#    """
+#    Extract metabolite IDs from a reaction formula.
+#    Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
+#    allowing leading digits/underscores.
+#    """
+#    metabolites = set()
+#    # optional coefficient followed by a token ending with _<letters>
+#    if reaction_formula[-1] == ']' and reaction_formula[-3] == '[':
+#        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)'
+#    else:
+#        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)'
+#    matches = re.findall(pattern, reaction_formula)
+#    metabolites.update(matches)
+#    return metabolites
+
+
+def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
+    """
+    Extract metabolite IDs from a reaction formula.
+
+    Handles:
+      - optional stoichiometric coefficients (integers or decimals)
+      - compartment tags at the end of the metabolite, either [c] or _c
+
+    Returns the IDs including the compartment suffix exactly as written.
+    """
+    pattern = re.compile(
+        r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))'              # left boundary (start, space, +, comma, =, :)
+        r'(?:\d+(?:\.\d+)?\s+)?'                                   # optional coefficient (requires space after)
+        r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))'  # metabolite + compartment (can start with number)
+    )
+    return {m.group(1) for m in pattern.finditer(reaction_formula)}
+
+
+
+def extract_compartment_from_metabolite(metabolite_id: str) -> str:
+    """Extract the compartment from a metabolite ID."""
+    if '_' == metabolite_id[-2]:
+        return metabolite_id.split('_')[-1]
+    if metabolite_id[-1] == ']' and metabolite_id[-3] == '[':
+        return metabolite_id[-2]
+    return 'c'  # default cytoplasm
+
+
+def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
+    """Parse a reaction formula and set metabolites with their coefficients."""
+
+    if '<=>' in formula:
+        parts = formula.split('<=>')
+        reversible = True
+    elif '<--' in formula:
+        parts = formula.split('<--')
+        reversible = False
+    elif '-->' in formula:
+        parts = formula.split('-->')
+        reversible = False
+    elif '<-' in formula:
+        parts = formula.split('<-')
+        reversible = False
+    else:
+        raise ValueError(f"Unrecognized reaction format: {formula}")
+    
+    # Handle cases where one side might be empty (exchange reactions)
+    if len(parts) != 2:
+        raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")
+    
+    left, right = parts[0].strip(), parts[1].strip()
+    
+    reactants = parse_metabolites_side(left) if left else {}
+    products = parse_metabolites_side(right) if right else {}
+    
+    metabolites_to_add = {}
+    
+    for met_id, coeff in reactants.items():
+        if met_id in metabolites_dict:
+            metabolites_to_add[metabolites_dict[met_id]] = -coeff
+    
+    for met_id, coeff in products.items():
+        if met_id in metabolites_dict:
+            metabolites_to_add[metabolites_dict[met_id]] = coeff
+    
+    reaction.add_metabolites(metabolites_to_add)
+
+
+def parse_metabolites_side(side_str: str) -> Dict[str, float]:
+    """Parse one side of a reaction and extract metabolites with coefficients."""
+    metabolites = {}
+    if not side_str or side_str.strip() == '':
+        return metabolites
+
+    terms = side_str.split('+')
+    for term in terms:
+        term = term.strip()
+        if not term:
+            continue
+
+        # First check if term has space-separated coefficient and metabolite
+        parts = term.split()
+        if len(parts) == 2:
+            # Two parts: potential coefficient + metabolite
+            try:
+                coeff = float(parts[0])
+                met_id = parts[1]
+                # Verify the second part looks like a metabolite with compartment
+                if re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', met_id):
+                    metabolites[met_id] = coeff
+                    continue
+            except ValueError:
+                pass
+        
+        # Single term - check if it's a metabolite (no coefficient)  
+        # Updated pattern to include metabolites starting with numbers
+        if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term):
+            metabolites[term] = 1.0
+        else:
+            print(f"Warning: Could not parse metabolite term: '{term}'")
+
+    return metabolites
+
+
+
+def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
+    """
+    Sets the model's objective function based on a column of coefficients in the CSV.
+    Can be any reaction(s), not necessarily biomass.
+    """
+    obj_dict = {}
+    
+    for idx, row in df.iterrows():
+        reaction_id = str(row['ReactionID']).strip()
+        coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
+        if coeff != 0:
+            if reaction_id in model.reactions:
+                obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
+            else:
+                print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
+
+    if not obj_dict:
+        raise ValueError("No reactions found with non-zero objective coefficient.")
+
+    model.objective = obj_dict
+    print(f"Objective set with {len(obj_dict)} reactions.")
+
+
+
+
+def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
+    """Set the medium based on the 'InMedium' column in the dataframe."""
+    if 'InMedium' not in df.columns:
+        print("No 'InMedium' column found, skipping medium setup")
+        return
+        
+    medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
+    
+    medium_dict = {}
+    for rxn_id in medium_reactions:
+        if rxn_id in [r.id for r in model.reactions]:
+            reaction = model.reactions.get_by_id(rxn_id)
+            if reaction.lower_bound < 0: 
+                medium_dict[rxn_id] = abs(reaction.lower_bound)
+    
+    if medium_dict:
+        model.medium = medium_dict
+        print(f"Medium set with {len(medium_dict)} components")
+    else:
+        print("No medium components found")
+def validate_model(model: cobraModel) -> Dict[str, any]:
+    """Validate the model and return basic statistics."""
+    validation = {
+        'num_reactions': len(model.reactions),
+        'num_metabolites': len(model.metabolites),
+        'num_genes': len(model.genes),
+        'num_compartments': len(model.compartments),
+        'objective': str(model.objective),
+        'medium_size': len(model.medium),
+        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
+        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
+    }
+    
+    try:
+        # Growth test
+        solution = model.optimize()
+        validation['growth_rate'] = solution.objective_value
+        validation['status'] = solution.status
+    except Exception as e:
+        validation['growth_rate'] = None
+        validation['status'] = f"Error: {e}"
+    
+    return validation
+
+def convert_genes(model, annotation):
+    """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
+    from cobra.manipulation import rename_genes
+    model2=model.copy()
+    try:
+        dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
+    except:
+        print("No annotation in gene dict!")
+        return -1
+    rename_genes(model2,dict_genes)
+
+    return model2
+
+# ---------- Utility helpers ----------
+def _normalize_colname(col: str) -> str:
+    return col.strip().lower().replace(' ', '_')
+
+def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
+    """
+    Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
+    Raise ValueError if no suitable mapping is found.
+    """
+    cols = { _normalize_colname(c): c for c in mapping_df.columns }
+    chosen = {}
+    # candidate names for each category
+    candidates = {
+        'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
+        'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
+        'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
+        'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
+        'gene_number': ['gene_number']
+    }
+    for key, names in candidates.items():
+        for n in names:
+            if n in cols:
+                chosen[key] = cols[n]
+                break
+    return chosen
+
+def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
+                                source_col: str,
+                                target_col: str,
+                                model_source_genes: Optional[Set[str]] = None,
+                                logger: Optional[logging.Logger] = None) -> None:
+    """
+        Check that, within the filtered mapping_df, each target maps to at most one source.
+        Log examples if duplicates are found.
+    """
+    if logger is None:
+        logger = logging.getLogger(__name__)
+
+    if mapping_df.empty:
+        logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
+        return
+
+    # normalize temporary columns for grouping (without altering the original df)
+    tmp = mapping_df[[source_col, target_col]].copy()
+    tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id)
+    tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
+
+    # optionally filter to the set of model source genes
+    if model_source_genes is not None:
+        tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
+
+    if tmp.empty:
+        logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
+        return
+
+    # build reverse mapping: target -> set(sources)
+    grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
+    # find targets with more than one source
+    problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
+
+    if problematic:
+    # prepare warning message with examples (limited subset)
+        sample_items = list(problematic.items())
+        msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
+        for tgt, sources in sample_items:
+            msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
+        full_msg = "\n".join(msg_lines)
+    # log warning
+        logger.warning(full_msg)
+
+    # if everything is fine
+    logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
+
+
+def _normalize_gene_id(g: str) -> str:
+    """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
+    if g is None:
+        return ""
+    g = str(g).strip()
+    # remove common prefixes
+    g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
+    g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
+    return g
+
+def _is_or_only_expression(expr: str) -> bool:
+    """
+    Check if a GPR expression contains only OR operators (no AND operators).
+    
+    Args:
+        expr: GPR expression string
+        
+    Returns:
+        bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise
+    """
+    if not expr or not expr.strip():
+        return False
+        
+    # Normalize the expression
+    normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
+    
+    # Check if it contains any AND operators
+    has_and = ' and ' in normalized.lower()
+    
+    # Check if it contains OR operators
+    has_or = ' or ' in normalized.lower()
+    
+    # Must have OR operators and no AND operators
+    return has_or and not has_and
+
+
+def _flatten_or_only_gpr(expr: str) -> str:
+    """
+    Flatten a GPR expression that contains only OR operators by:
+    1. Removing all parentheses
+    2. Extracting unique gene names
+    3. Joining them with ' or '
+    
+    Args:
+        expr: GPR expression string with only OR operators
+        
+    Returns:
+        str: Flattened GPR expression
+    """
+    if not expr or not expr.strip():
+        return expr
+        
+    # Extract all gene tokens (exclude logical operators and parentheses)
+    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
+    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
+    
+    tokens = re.findall(gene_pattern, expr)
+    genes = [t for t in tokens if t not in logical]
+    
+    # Create set to remove duplicates, then convert back to list to maintain some order
+    unique_genes = list(dict.fromkeys(genes))  # Preserves insertion order
+    
+    if len(unique_genes) == 0:
+        return expr
+    elif len(unique_genes) == 1:
+        return unique_genes[0]
+    else:
+        return ' or '.join(unique_genes)
+
+
+def _simplify_boolean_expression(expr: str) -> str:
+    """
+    Simplify a boolean expression by removing duplicates while strictly preserving semantics.
+    This function handles simple duplicates within parentheses while being conservative about
+    complex expressions that could change semantics.
+    """
+    if not expr or not expr.strip():
+        return expr
+    
+    # Normalize operators and whitespace
+    expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
+    expr = ' '.join(expr.split())  # Normalize whitespace
+    
+    def simplify_parentheses_content(match_obj):
+        """Helper function to simplify content within parentheses."""
+        content = match_obj.group(1)  # Content inside parentheses
+        
+        # Only simplify if it's a pure OR or pure AND chain
+        if ' or ' in content and ' and ' not in content:
+            # Pure OR chain - safe to deduplicate
+            parts = [p.strip() for p in content.split(' or ') if p.strip()]
+            unique_parts = []
+            seen = set()
+            for part in parts:
+                if part not in seen:
+                    unique_parts.append(part)
+                    seen.add(part)
+            
+            if len(unique_parts) == 1:
+                return unique_parts[0]  # Remove unnecessary parentheses for single items
+            else:
+                return '(' + ' or '.join(unique_parts) + ')'
+                
+        elif ' and ' in content and ' or ' not in content:
+            # Pure AND chain - safe to deduplicate  
+            parts = [p.strip() for p in content.split(' and ') if p.strip()]
+            unique_parts = []
+            seen = set()
+            for part in parts:
+                if part not in seen:
+                    unique_parts.append(part)
+                    seen.add(part)
+            
+            if len(unique_parts) == 1:
+                return unique_parts[0]  # Remove unnecessary parentheses for single items
+            else:
+                return '(' + ' and '.join(unique_parts) + ')'
+        else:
+            # Mixed operators or single item - return with parentheses as-is
+            return '(' + content + ')'
+    
+    def remove_duplicates_simple(parts_str: str, separator: str) -> str:
+        """Remove duplicates from a simple chain of operations."""
+        parts = [p.strip() for p in parts_str.split(separator) if p.strip()]
+        
+        # Remove duplicates while preserving order
+        unique_parts = []
+        seen = set()
+        for part in parts:
+            if part not in seen:
+                unique_parts.append(part)
+                seen.add(part)
+        
+        if len(unique_parts) == 1:
+            return unique_parts[0]
+        else:
+            return f' {separator} '.join(unique_parts)
+    
+    try:
+        import re
+        
+        # First, simplify content within parentheses
+        # This handles cases like (A or A) -> A and (B and B) -> B
+        expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
+        
+        # Check if the resulting expression has mixed operators  
+        has_and = ' and ' in expr_simplified
+        has_or = ' or ' in expr_simplified
+        
+        # Only simplify top-level if it's pure AND or pure OR
+        if has_and and not has_or and '(' not in expr_simplified:
+            # Pure AND chain at top level - safe to deduplicate
+            return remove_duplicates_simple(expr_simplified, 'and')
+        elif has_or and not has_and and '(' not in expr_simplified:
+            # Pure OR chain at top level - safe to deduplicate  
+            return remove_duplicates_simple(expr_simplified, 'or')
+        else:
+            # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned)
+            return expr_simplified
+            
+    except Exception:
+        # If anything goes wrong, return the original expression
+        return expr
+
+
+def translate_model_genes(model: 'cobra.Model',
+                         mapping_df: 'pd.DataFrame',
+                         target_nomenclature: str,
+                         source_nomenclature: str = 'hgnc_id',
+                         allow_many_to_one: bool = False,
+                         logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
+    """
+    Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
+    mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).
+
+    Args:
+        model: COBRA model to translate.
+        mapping_df: DataFrame containing the mapping information.
+        target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
+        source_nomenclature: Current source key in the model (default 'hgnc_id').
+        allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs.
+        logger: Optional logger.
+    
+    Returns:
+        Tuple containing:
+        - Translated COBRA model
+        - Dictionary mapping reaction IDs to translation issue descriptions
+    """
+    if logger is None:
+        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+        logger = logging.getLogger(__name__)
+
+    logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
+
+    # normalize column names and choose relevant columns
+    chosen = _choose_columns(mapping_df)
+    if not chosen:
+        raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
+
+    # map source/target to actual dataframe column names (allow user-specified source/target keys)
+    # normalize input args
+    src_key = source_nomenclature.strip().lower()
+    tgt_key = target_nomenclature.strip().lower()
+
+    # try to find the actual column names for requested keys
+    col_for_src = None
+    col_for_tgt = None
+    # first, try exact match
+    for k, actual in chosen.items():
+        if k == src_key:
+            col_for_src = actual
+        if k == tgt_key:
+            col_for_tgt = actual
+
+    # if not found, try mapping common names
+    if col_for_src is None:
+        possible_src_names = {k: v for k, v in chosen.items()}
+        # try to match by contained substring
+        for k, actual in possible_src_names.items():
+            if src_key in k:
+                col_for_src = actual
+                break
+
+    if col_for_tgt is None:
+        for k, actual in chosen.items():
+            if tgt_key in k:
+                col_for_tgt = actual
+                break
+
+    if col_for_src is None:
+        raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
+    if col_for_tgt is None:
+        raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
+
+    model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
+    logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
+
+    tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
+    tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)
+
+    filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
+
+    if filtered_map.empty:
+        logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
+
+    if not allow_many_to_one:
+        _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
+
+    # Crea il mapping
+    gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
+
+    # copy model
+    model_copy = model.copy()
+
+    # statistics
+    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
+    unmapped = []
+    multi = []
+    
+    # Dictionary to store translation issues per reaction
+    reaction_translation_issues = {}
+
+    original_genes = {g.id for g in model_copy.genes}
+    logger.info(f"Original genes count: {len(original_genes)}")
+
+    # translate GPRs
+    for rxn in model_copy.reactions:
+        gpr = rxn.gene_reaction_rule
+        if gpr and gpr.strip():
+            new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
+            if rxn_issues:
+                reaction_translation_issues[rxn.id] = rxn_issues
+            
+            if new_gpr != gpr:
+                # Check if this GPR has translation issues and contains only OR operators
+                if rxn_issues and _is_or_only_expression(new_gpr):
+                    # Flatten the GPR: remove parentheses and create set of unique genes
+                    flattened_gpr = _flatten_or_only_gpr(new_gpr)
+                    if flattened_gpr != new_gpr:
+                        stats['flattened_or_gprs'] += 1
+                        logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
+                        new_gpr = flattened_gpr
+                
+                simplified_gpr = _simplify_boolean_expression(new_gpr)
+                if simplified_gpr != new_gpr:
+                    stats['simplified_gprs'] += 1
+                    logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
+                rxn.gene_reaction_rule = simplified_gpr
+                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
+
+    # update model genes based on new GPRs
+    _update_model_genes(model_copy, logger)
+
+    # final logging
+    _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
+
+    logger.info("Translation finished")
+    return model_copy, reaction_translation_issues
+
+
+# ---------- helper functions ----------
+def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
+    """
+    Build mapping dict: source_id -> list of target_ids
+    Normalizes IDs (removes prefixes like 'HGNC:' etc).
+    """
+    df = mapping_df[[source_col, target_col]].dropna().copy()
+    # normalize to string
+    df[source_col] = df[source_col].astype(str).apply(_normalize_gene_id)
+    df[target_col] = df[target_col].astype(str).str.strip()
+
+    df = df.drop_duplicates()
+
+    logger.info(f"Creating mapping from {len(df)} rows")
+
+    mapping = defaultdict(list)
+    for _, row in df.iterrows():
+        s = row[source_col]
+        t = row[target_col]
+        if t not in mapping[s]:
+            mapping[s].append(t)
+
+    # stats
+    one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
+    one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
+    logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
+    return dict(mapping)
+
+
+def _translate_gpr(gpr_string: str,
+                   gene_mapping: Dict[str, List[str]],
+                   stats: Dict[str, int],
+                   unmapped_genes: List[str],
+                   multi_mapping_genes: List[Tuple[str, List[str]]],
+                   logger: logging.Logger,
+                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
+    """
+    Translate genes inside a GPR string using gene_mapping.
+    Returns new GPR string, and optionally translation issues if track_issues=True.
+    """
+    # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
+    token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
+    tokens = re.findall(token_pattern, gpr_string)
+
+    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
+    tokens = [t for t in tokens if t not in logical]
+
+    new_gpr = gpr_string
+    issues = []
+
+    for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
+        norm = _normalize_gene_id(token)
+        if norm in gene_mapping:
+            targets = gene_mapping[norm]
+            stats['translated'] += 1
+            if len(targets) == 1:
+                stats['one_to_one'] += 1
+                replacement = targets[0]
+            else:
+                stats['one_to_many'] += 1
+                multi_mapping_genes.append((token, targets))
+                replacement = "(" + " or ".join(targets) + ")"
+                if track_issues:
+                    issues.append(f"{token} -> {' or '.join(targets)}")
+
+            pattern = r'\b' + re.escape(token) + r'\b'
+            new_gpr = re.sub(pattern, replacement, new_gpr)
+        else:
+            stats['not_found'] += 1
+            if token not in unmapped_genes:
+                unmapped_genes.append(token)
+            logger.debug(f"Token not found in mapping (left as-is): {token}")
+
+    # Check for many-to-one cases (multiple source genes mapping to same target)
+    if track_issues:
+        # Build reverse mapping to detect many-to-one cases from original tokens
+        original_to_target = {}
+        
+        for orig_token in tokens:
+            norm = _normalize_gene_id(orig_token)
+            if norm in gene_mapping:
+                targets = gene_mapping[norm]
+                for target in targets:
+                    if target not in original_to_target:
+                        original_to_target[target] = []
+                    if orig_token not in original_to_target[target]:
+                        original_to_target[target].append(orig_token)
+        
+        # Identify many-to-one mappings in this specific GPR
+        for target, original_genes in original_to_target.items():
+            if len(original_genes) > 1:
+                issues.append(f"{' or '.join(original_genes)} -> {target}")
+    
+    issue_text = "; ".join(issues) if issues else ""
+    
+    if track_issues:
+        return new_gpr, issue_text
+    else:
+        return new_gpr
+
+
+def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
+    """
+    Rebuild model.genes from gene_reaction_rule content.
+    Removes genes not referenced and adds missing ones.
+    """
+    # collect genes in GPRs
+    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
+    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
+    genes_in_gpr: Set[str] = set()
+
+    for rxn in model.reactions:
+        gpr = rxn.gene_reaction_rule
+        if gpr and gpr.strip():
+            toks = re.findall(gene_pattern, gpr)
+            toks = [t for t in toks if t not in logical]
+            # normalize IDs consistent with mapping normalization
+            toks = [_normalize_gene_id(t) for t in toks]
+            genes_in_gpr.update(toks)
+
+    # existing gene ids
+    existing = {g.id for g in model.genes}
+
+    # remove obsolete genes
+    to_remove = [gid for gid in existing if gid not in genes_in_gpr]
+    removed = 0
+    for gid in to_remove:
+        try:
+            gene_obj = model.genes.get_by_id(gid)
+            model.genes.remove(gene_obj)
+            removed += 1
+        except Exception:
+            # safe-ignore
+            pass
+
+    # add new genes
+    added = 0
+    for gid in genes_in_gpr:
+        if gid not in existing:
+            new_gene = cobra.Gene(gid)
+            try:
+                model.genes.add(new_gene)
+            except Exception:
+                # fallback: if model.genes doesn't support add, try append or model.add_genes
+                try:
+                    model.genes.append(new_gene)
+                except Exception:
+                    try:
+                        model.add_genes([new_gene])
+                    except Exception:
+                        logger.warning(f"Could not add gene object for {gid}")
+            added += 1
+
+    logger.info(f"Model genes updated: removed {removed}, added {added}")
+
+
+def export_model_to_tabular(model: cobraModel, 
+                           output_path: str,
+                           translation_issues: Dict = None,
+                           include_objective: bool = True,
+                           save_function = None) -> pd.DataFrame:
+    """
+    Export a COBRA model to tabular format with optional components.
+    
+    Args:
+        model: COBRA model to export
+        output_path: Path where to save the tabular file
+        translation_issues: Optional dict of {reaction_id: issues} from gene translation
+        include_objective: Whether to include objective coefficient column
+        save_function: Optional custom save function, if None uses pd.DataFrame.to_csv
+        
+    Returns:
+        pd.DataFrame: The merged tabular data
+    """
+    # Generate model data
+    rules = generate_rules(model, asParsed=False)
+    
+    reactions = generate_reactions(model, asParsed=False)
+    bounds = generate_bounds(model)
+    medium = get_medium(model)
+    compartments = generate_compartments(model)
+    
+    # Create base DataFrames
+    df_rules = pd.DataFrame(list(rules.items()), columns=["ReactionID", "GPR"])
+    df_reactions = pd.DataFrame(list(reactions.items()), columns=["ReactionID", "Formula"])
+    df_bounds = bounds.reset_index().rename(columns={"index": "ReactionID"})
+    df_medium = medium.rename(columns={"reaction": "ReactionID"})
+    df_medium["InMedium"] = True
+    
+    # Start merging
+    merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
+    merged = merged.merge(df_bounds, on="ReactionID", how="outer")
+    
+    # Add objective coefficients if requested
+    if include_objective:
+        objective_function = extract_objective_coefficients(model)
+        merged = merged.merge(objective_function, on="ReactionID", how="outer")
+    
+    # Add compartments/pathways if they exist
+    if compartments is not None:
+        merged = merged.merge(compartments, on="ReactionID", how="outer")
+    
+    # Add medium information
+    merged = merged.merge(df_medium, on="ReactionID", how="left")
+    
+    # Add translation issues if provided
+    if translation_issues:
+        df_translation_issues = pd.DataFrame([
+            {"ReactionID": rxn_id, "TranslationIssues": issues}
+            for rxn_id, issues in translation_issues.items()
+        ])
+        if not df_translation_issues.empty:
+            merged = merged.merge(df_translation_issues, on="ReactionID", how="left")
+            merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")
+    
+    # Final processing
+    merged["InMedium"] = merged["InMedium"].fillna(False)
+    merged = merged.sort_values(by="InMedium", ascending=False)
+    
+    # Save the file
+    if save_function:
+        save_function(merged, output_path)
+    else:
+        merged.to_csv(output_path, sep="\t", index=False)
+    
+    return merged
+
+
+def _log_translation_statistics(stats: Dict[str, int],
+                               unmapped_genes: List[str],
+                               multi_mapping_genes: List[Tuple[str, List[str]]],
+                               original_genes: Set[str],
+                               final_genes,
+                               logger: logging.Logger):
+    logger.info("=== TRANSLATION STATISTICS ===")
+    logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
+    logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
+    logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
+    logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}")
+
+    final_ids = {g.id for g in final_genes}
+    logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
+
+    if unmapped_genes:
+        logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
+    if multi_mapping_genes:
+        logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
+        for orig, targets in multi_mapping_genes[:10]:
+            logger.info(f"  {orig} -> {', '.join(targets)}")
+    
+    # Log summary of flattened GPRs if any
+    if stats.get('flattened_or_gprs', 0) > 0:
+        logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
+
+    
\ No newline at end of file