| 
539
 | 
     1 """
 | 
| 
 | 
     2 Utilities for generating and manipulating COBRA models and related metadata.
 | 
| 
 | 
     3 
 | 
| 
 | 
     4 This module includes helpers to:
 | 
| 
 | 
     5 - extract rules, reactions, bounds, objective coefficients, and compartments
 | 
| 
 | 
     6 - build a COBRA model from a tabular file
 | 
| 
 | 
     7 - set objective and medium from dataframes
 | 
| 
 | 
     8 - validate a model and convert gene identifiers
 | 
| 
 | 
     9 - translate model GPRs using mapping tables
 | 
| 
 | 
    10 """
 | 
| 
 | 
    11 import os
 | 
| 
 | 
    12 import cobra
 | 
| 
 | 
    13 import pandas as pd
 | 
| 
 | 
    14 import re
 | 
| 
 | 
    15 import logging
 | 
| 
 | 
    16 from typing import Optional, Tuple, Union, List, Dict, Set
 | 
| 
 | 
    17 from collections import defaultdict
 | 
| 
 | 
    18 from cobra import Model as cobraModel, Reaction, Metabolite
 | 
| 
 | 
    19 import sys
 | 
| 
 | 
    20 
 | 
| 
542
 | 
    21 try:
 | 
| 
 | 
    22     from . import rule_parsing as rulesUtils
 | 
| 
 | 
    23     from . import reaction_parsing as reactionUtils
 | 
| 
 | 
    24 except:
 | 
| 
 | 
    25     import rule_parsing as rulesUtils
 | 
| 
 | 
    26     import reaction_parsing as reactionUtils
 | 
| 
 | 
    27 
 | 
| 
539
 | 
    28 
 | 
| 
 | 
    29 ############################ check_methods ####################################
 | 
| 
 | 
    30 def gene_type(l :str, name :str) -> str:
 | 
| 
 | 
    31     """
 | 
| 
 | 
    32     Determine the type of gene ID.
 | 
| 
 | 
    33 
 | 
| 
 | 
    34     Args:
 | 
| 
 | 
    35         l (str): The gene identifier to check.
 | 
| 
 | 
    36         name (str): The name of the dataset, used in error messages.
 | 
| 
 | 
    37 
 | 
| 
 | 
    38     Returns:
 | 
| 
 | 
    39         str: The type of gene ID ('HGNC_ID', 'ENSG', 'HGNC_symbol', or 'entrez_id').
 | 
| 
 | 
    40 
 | 
| 
 | 
    41     Raises:
 | 
| 
 | 
    42         sys.exit: If the gene ID type is not supported, the execution is aborted.
 | 
| 
 | 
    43     """
 | 
| 
 | 
    44     if check_hgnc(l):
 | 
| 
 | 
    45         return 'HGNC_ID'
 | 
| 
 | 
    46     elif check_ensembl(l):
 | 
| 
 | 
    47         return 'ENSG'
 | 
| 
 | 
    48     elif check_symbol(l):
 | 
| 
 | 
    49         return 'HGNC_symbol'
 | 
| 
 | 
    50     elif check_entrez(l):
 | 
| 
 | 
    51         return 'entrez_id'
 | 
| 
 | 
    52     else:
 | 
| 
 | 
    53         sys.exit('Execution aborted:\n' +
 | 
| 
 | 
    54                  'gene ID type in ' + name + ' not supported. Supported ID'+
 | 
| 
 | 
    55                  'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
 | 
| 
 | 
    56 
 | 
| 
 | 
    57 def check_hgnc(l :str) -> bool:
 | 
| 
 | 
    58     """
 | 
| 
 | 
    59     Check if a gene identifier follows the HGNC format.
 | 
| 
 | 
    60 
 | 
| 
 | 
    61     Args:
 | 
| 
 | 
    62         l (str): The gene identifier to check.
 | 
| 
 | 
    63 
 | 
| 
 | 
    64     Returns:
 | 
| 
 | 
    65         bool: True if the gene identifier follows the HGNC format, False otherwise.
 | 
| 
 | 
    66     """
 | 
| 
 | 
    67     if len(l) > 5:
 | 
| 
 | 
    68         if (l.upper()).startswith('HGNC:'):
 | 
| 
 | 
    69             return l[5:].isdigit()
 | 
| 
 | 
    70         else:
 | 
| 
 | 
    71             return False
 | 
| 
 | 
    72     else:
 | 
| 
 | 
    73         return False
 | 
| 
 | 
    74 
 | 
| 
 | 
    75 def check_ensembl(l :str) -> bool:
 | 
| 
 | 
    76     """
 | 
| 
 | 
    77     Check if a gene identifier follows the Ensembl format.
 | 
| 
 | 
    78 
 | 
| 
 | 
    79     Args:
 | 
| 
 | 
    80         l (str): The gene identifier to check.
 | 
| 
 | 
    81 
 | 
| 
 | 
    82     Returns:
 | 
| 
 | 
    83         bool: True if the gene identifier follows the Ensembl format, False otherwise.
 | 
| 
 | 
    84     """
 | 
| 
 | 
    85     return l.upper().startswith('ENS')
 | 
| 
 | 
    86  
 | 
| 
 | 
    87 
 | 
| 
 | 
    88 def check_symbol(l :str) -> bool:
 | 
| 
 | 
    89     """
 | 
| 
 | 
    90     Check if a gene identifier follows the symbol format.
 | 
| 
 | 
    91 
 | 
| 
 | 
    92     Args:
 | 
| 
 | 
    93         l (str): The gene identifier to check.
 | 
| 
 | 
    94 
 | 
| 
 | 
    95     Returns:
 | 
| 
 | 
    96         bool: True if the gene identifier follows the symbol format, False otherwise.
 | 
| 
 | 
    97     """
 | 
| 
 | 
    98     if len(l) > 0:
 | 
| 
 | 
    99         if l[0].isalpha() and l[1:].isalnum():
 | 
| 
 | 
   100             return True
 | 
| 
 | 
   101         else:
 | 
| 
 | 
   102             return False
 | 
| 
 | 
   103     else:
 | 
| 
 | 
   104         return False
 | 
| 
 | 
   105 
 | 
| 
 | 
   106 def check_entrez(l :str) -> bool:
 | 
| 
 | 
   107     """
 | 
| 
 | 
   108     Check if a gene identifier follows the Entrez ID format.
 | 
| 
 | 
   109 
 | 
| 
 | 
   110     Args:
 | 
| 
 | 
   111         l (str): The gene identifier to check.
 | 
| 
 | 
   112 
 | 
| 
 | 
   113     Returns:
 | 
| 
 | 
   114         bool: True if the gene identifier follows the Entrez ID format, False otherwise.
 | 
| 
 | 
   115     """ 
 | 
| 
 | 
   116     if len(l) > 0:
 | 
| 
 | 
   117         return l.isdigit()
 | 
| 
 | 
   118     else: 
 | 
| 
 | 
   119         return False
 | 
| 
 | 
   120 
 | 
| 
 | 
   121 ################################- DATA GENERATION -################################
 | 
| 
 | 
   122 ReactionId = str
 | 
| 
 | 
   123 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
 | 
| 
 | 
   124     """
 | 
| 
 | 
   125     Generate a dictionary mapping reaction IDs to GPR rules from the model.
 | 
| 
 | 
   126 
 | 
| 
 | 
   127     Args:
 | 
| 
 | 
   128         model: COBRA model to derive data from.
 | 
| 
 | 
   129         asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings.
 | 
| 
 | 
   130 
 | 
| 
 | 
   131     Returns:
 | 
| 
 | 
   132         Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID.
 | 
| 
 | 
   133         Dict[ReactionId, str]: Raw rules by reaction ID.
 | 
| 
 | 
   134     """
 | 
| 
 | 
   135     _ruleGetter   =  lambda reaction : reaction.gene_reaction_rule
 | 
| 
 | 
   136     ruleExtractor = (lambda reaction :
 | 
| 
 | 
   137         rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
 | 
| 
 | 
   138 
 | 
| 
 | 
   139     return {
 | 
| 
 | 
   140         reaction.id : ruleExtractor(reaction)
 | 
| 
 | 
   141         for reaction in model.reactions
 | 
| 
 | 
   142         if reaction.gene_reaction_rule }
 | 
| 
 | 
   143 
 | 
| 
 | 
   144 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
 | 
| 
 | 
   145     """
 | 
| 
 | 
   146     Generate a dictionary mapping reaction IDs to reaction formulas from the model.
 | 
| 
 | 
   147 
 | 
| 
 | 
   148     Args:
 | 
| 
 | 
   149         model: COBRA model to derive data from.
 | 
| 
 | 
   150         asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings.
 | 
| 
 | 
   151 
 | 
| 
 | 
   152     Returns:
 | 
| 
 | 
   153         Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested).
 | 
| 
 | 
   154     """
 | 
| 
 | 
   155 
 | 
| 
 | 
   156     unparsedReactions = {
 | 
| 
 | 
   157         reaction.id : reaction.reaction
 | 
| 
 | 
   158         for reaction in model.reactions
 | 
| 
 | 
   159         if reaction.reaction 
 | 
| 
 | 
   160     }
 | 
| 
 | 
   161 
 | 
| 
 | 
   162     if not asParsed: return unparsedReactions
 | 
| 
 | 
   163     
 | 
| 
 | 
   164     return reactionUtils.create_reaction_dict(unparsedReactions)
 | 
| 
 | 
   165 
 | 
| 
 | 
   166 def get_medium(model:cobraModel) -> pd.DataFrame:
 | 
| 
 | 
   167     """
 | 
| 
 | 
   168     Extract the uptake reactions representing the model medium.
 | 
| 
 | 
   169 
 | 
| 
 | 
   170     Returns a DataFrame with a single column 'reaction' listing exchange reactions
 | 
| 
 | 
   171     with negative lower bound and no positive stoichiometric coefficients (uptake only).
 | 
| 
 | 
   172     """
 | 
| 
 | 
   173     trueMedium=[]
 | 
| 
 | 
   174     for r in model.reactions:
 | 
| 
 | 
   175         positiveCoeff=0
 | 
| 
 | 
   176         for m in r.metabolites:
 | 
| 
 | 
   177             if r.get_coefficient(m.id)>0:
 | 
| 
 | 
   178                 positiveCoeff=1;
 | 
| 
 | 
   179         if (positiveCoeff==0 and r.lower_bound<0):
 | 
| 
 | 
   180             trueMedium.append(r.id)
 | 
| 
 | 
   181 
 | 
| 
 | 
   182     df_medium = pd.DataFrame()
 | 
| 
 | 
   183     df_medium["reaction"] = trueMedium
 | 
| 
 | 
   184     return df_medium
 | 
| 
 | 
   185 
 | 
| 
 | 
   186 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
 | 
| 
 | 
   187     """
 | 
| 
 | 
   188     Extract objective coefficients for each reaction.
 | 
| 
 | 
   189 
 | 
| 
 | 
   190     Args:
 | 
| 
 | 
   191         model: COBRA model
 | 
| 
 | 
   192 
 | 
| 
 | 
   193     Returns:
 | 
| 
 | 
   194         pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
 | 
| 
 | 
   195     """
 | 
| 
 | 
   196     coeffs = []
 | 
| 
 | 
   197     # model.objective.expression is a linear expression
 | 
| 
 | 
   198     objective_expr = model.objective.expression.as_coefficients_dict()
 | 
| 
 | 
   199     
 | 
| 
 | 
   200     for reaction in model.reactions:
 | 
| 
 | 
   201         coeff = objective_expr.get(reaction.forward_variable, 0.0)
 | 
| 
 | 
   202         coeffs.append({
 | 
| 
 | 
   203             "ReactionID": reaction.id,
 | 
| 
 | 
   204             "ObjectiveCoefficient": coeff
 | 
| 
 | 
   205         })
 | 
| 
 | 
   206     
 | 
| 
 | 
   207     return pd.DataFrame(coeffs)
 | 
| 
 | 
   208 
 | 
| 
 | 
   209 def generate_bounds(model:cobraModel) -> pd.DataFrame:
 | 
| 
 | 
   210     """
 | 
| 
 | 
   211     Build a DataFrame of lower/upper bounds for all reactions.
 | 
| 
 | 
   212 
 | 
| 
 | 
   213     Returns:
 | 
| 
 | 
   214         pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound'].
 | 
| 
 | 
   215     """
 | 
| 
 | 
   216 
 | 
| 
 | 
   217     rxns = []
 | 
| 
 | 
   218     for reaction in model.reactions:
 | 
| 
 | 
   219         rxns.append(reaction.id)
 | 
| 
 | 
   220 
 | 
| 
 | 
   221     bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
 | 
| 
 | 
   222 
 | 
| 
 | 
   223     for reaction in model.reactions:
 | 
| 
 | 
   224         bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
 | 
| 
 | 
   225     return bounds
 | 
| 
 | 
   226 
 | 
| 
 | 
   227 
 | 
| 
 | 
   228 
 | 
| 
 | 
   229 def generate_compartments(model: cobraModel) -> pd.DataFrame:
 | 
| 
 | 
   230     """
 | 
| 
 | 
   231     Generates a DataFrame containing pathway information for each reaction.
 | 
| 
 | 
   232     Creates columns for each pathway position (Pathway_1, Pathway_2, etc.) only if pathways exist.
 | 
| 
 | 
   233     
 | 
| 
 | 
   234     Args:
 | 
| 
 | 
   235         model: the COBRA model to extract pathway data from.
 | 
| 
 | 
   236         
 | 
| 
 | 
   237     Returns:
 | 
| 
 | 
   238         pd.DataFrame: DataFrame with ReactionID and pathway columns (if any pathways exist)
 | 
| 
 | 
   239     """
 | 
| 
 | 
   240     pathway_data = []
 | 
| 
 | 
   241 
 | 
| 
 | 
   242     # First pass: determine the maximum number of pathways any reaction has
 | 
| 
 | 
   243     max_pathways = 0
 | 
| 
 | 
   244     reaction_pathways = {}
 | 
| 
 | 
   245     has_any_pathways = False
 | 
| 
 | 
   246 
 | 
| 
 | 
   247     for reaction in model.reactions:
 | 
| 
 | 
   248         # Get unique pathways from all metabolites in the reaction
 | 
| 
 | 
   249         if 'pathways' in reaction.annotation and reaction.annotation['pathways']:
 | 
| 
 | 
   250             if type(reaction.annotation['pathways']) == list:
 | 
| 
 | 
   251                 # Filter out empty/None values
 | 
| 
 | 
   252                 valid_pathways = [p for p in reaction.annotation['pathways'] if p]
 | 
| 
 | 
   253                 if valid_pathways:
 | 
| 
 | 
   254                     reaction_pathways[reaction.id] = valid_pathways
 | 
| 
 | 
   255                     max_pathways = max(max_pathways, len(valid_pathways))
 | 
| 
 | 
   256                     has_any_pathways = True
 | 
| 
 | 
   257                 else:
 | 
| 
 | 
   258                     reaction_pathways[reaction.id] = []
 | 
| 
 | 
   259             else:
 | 
| 
 | 
   260                 # Single pathway value
 | 
| 
 | 
   261                 if reaction.annotation['pathways']:
 | 
| 
 | 
   262                     reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
 | 
| 
 | 
   263                     max_pathways = max(max_pathways, 1)
 | 
| 
 | 
   264                     has_any_pathways = True
 | 
| 
 | 
   265                 else:
 | 
| 
 | 
   266                     reaction_pathways[reaction.id] = []
 | 
| 
 | 
   267         else:
 | 
| 
 | 
   268             # No pathway annotation - use empty list
 | 
| 
 | 
   269             reaction_pathways[reaction.id] = []
 | 
| 
 | 
   270 
 | 
| 
 | 
   271     # If no pathways exist in the model, return DataFrame with only ReactionID
 | 
| 
 | 
   272     if not has_any_pathways:
 | 
| 
 | 
   273         return None
 | 
| 
 | 
   274 
 | 
| 
 | 
   275     # Create column names for pathways only if they exist
 | 
| 
 | 
   276     pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
 | 
| 
 | 
   277 
 | 
| 
 | 
   278     # Second pass: create the data with pathway columns
 | 
| 
 | 
   279     for reaction_id, pathways in reaction_pathways.items():
 | 
| 
 | 
   280         row = {"ReactionID": reaction_id}
 | 
| 
 | 
   281         
 | 
| 
 | 
   282         # Fill pathway columns
 | 
| 
 | 
   283         for i in range(max_pathways):
 | 
| 
 | 
   284             col_name = pathway_columns[i]
 | 
| 
 | 
   285             if i < len(pathways):
 | 
| 
 | 
   286                 row[col_name] = pathways[i]
 | 
| 
 | 
   287             else:
 | 
| 
 | 
   288                 row[col_name] = None
 | 
| 
 | 
   289 
 | 
| 
 | 
   290         pathway_data.append(row)
 | 
| 
 | 
   291 
 | 
| 
 | 
   292     return pd.DataFrame(pathway_data)
 | 
| 
 | 
   293 
 | 
| 
 | 
   294 def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame):
 | 
| 
 | 
   295     """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... columns in the dataframe."""
 | 
| 
 | 
   296     pathway_cols = [col for col in df.columns if col.startswith('Pathway_')]
 | 
| 
 | 
   297     if not pathway_cols:
 | 
| 
 | 
   298         print("No 'Pathway_' columns found, skipping pathway annotation")
 | 
| 
 | 
   299         return
 | 
| 
 | 
   300     
 | 
| 
 | 
   301     pathway_data = defaultdict(list)
 | 
| 
 | 
   302     
 | 
| 
 | 
   303     for idx, row in df.iterrows():
 | 
| 
 | 
   304         reaction_id = str(row['ReactionID']).strip()
 | 
| 
 | 
   305         if reaction_id not in model.reactions:
 | 
| 
 | 
   306             continue
 | 
| 
 | 
   307         
 | 
| 
 | 
   308         pathways = []
 | 
| 
 | 
   309         for col in pathway_cols:
 | 
| 
 | 
   310             if pd.notna(row[col]) and str(row[col]).strip():
 | 
| 
 | 
   311                 pathways.append(str(row[col]).strip())
 | 
| 
 | 
   312         
 | 
| 
 | 
   313         if pathways:
 | 
| 
 | 
   314 
 | 
| 
 | 
   315             reaction = model.reactions.get_by_id(reaction_id)
 | 
| 
 | 
   316             if len(pathways) == 1:
 | 
| 
 | 
   317                 reaction.annotation['pathways'] = pathways[0]
 | 
| 
 | 
   318             else:
 | 
| 
 | 
   319                 reaction.annotation['pathways'] = pathways
 | 
| 
 | 
   320 
 | 
| 
 | 
   321             pathway_data[reaction_id] = pathways
 | 
| 
 | 
   322     
 | 
| 
 | 
   323     print(f"Annotated {len(pathway_data)} reactions with pathways.")
 | 
| 
 | 
   324 
 | 
| 
 | 
def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
    """
    Build a COBRApy model from a tabular file with reaction data.

    Args:
        csv_path: Path to the tab-separated file.
        model_id: ID for the newly created model.

    Returns:
        cobra.Model: The constructed COBRApy model.

    Raises:
        ValueError: If 'ReactionID'/'Formula' columns are missing, or if a row
            has an empty formula during the reaction-building pass.

    NOTE(review): rows are also expected to provide 'lower_bound',
    'upper_bound' and 'GPR' columns — they are accessed unconditionally
    below; confirm this assumption against callers.
    """
    
    # Try to detect separator
    with open(csv_path, 'r') as f:
        first_line = f.readline()
        sep = '\t' if '\t' in first_line else ','
    
    df = pd.read_csv(csv_path, sep=sep)
    
    # Check required columns
    required_cols = ['ReactionID', 'Formula']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}. Available columns: {list(df.columns)}")
    
    model = cobraModel(model_id)
    
    metabolites_dict = {}
    compartments_dict = {}
    
    print(f"Building model from {len(df)} reactions...")
    
    # First pass: scan all formulas to collect every metabolite and its
    # compartment so they can be registered on the model before reactions.
    for idx, row in df.iterrows():
        reaction_formula = str(row['Formula']).strip()
        # str(NaN) yields the literal 'nan', hence the string comparison.
        if not reaction_formula or reaction_formula == 'nan':
            continue
            
        metabolites = extract_metabolites_from_reaction(reaction_formula)
        
        for met_id in metabolites:
            compartment = extract_compartment_from_metabolite(met_id)
            
            if compartment not in compartments_dict:
                compartments_dict[compartment] = compartment
            
            if met_id not in metabolites_dict:
                # Display name: metabolite ID with its compartment suffix stripped.
                metabolites_dict[met_id] = Metabolite(
                    id=met_id,
                    compartment=compartment,
                    name=met_id.replace(f"_{compartment}", "").replace("__", "_")
                )
    
    model.compartments = compartments_dict
    
    model.add_metabolites(list(metabolites_dict.values()))
    
    print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments")
    
    reactions_added = 0
    reactions_skipped = 0
    
    # Second pass: build one Reaction per row.
    for idx, row in df.iterrows():

        reaction_id = str(row['ReactionID']).strip()
        reaction_formula = str(row['Formula']).strip()
        
        # NOTE(review): the first pass silently skips empty formulas while
        # this pass raises — confirm the inconsistency is intentional.
        if not reaction_formula or reaction_formula == 'nan':
            raise ValueError(f"Missing reaction formula for {reaction_id}")
        
        reaction = Reaction(reaction_id)
        reaction.name = reaction_id
        
        # Fall back to the conventional +/-1000 bounds when cells are NaN.
        reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
        reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
        
        if pd.notna(row['GPR']) and str(row['GPR']).strip():
            reaction.gene_reaction_rule = str(row['GPR']).strip()
        
        try:
            parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
        except Exception as e:
            # Unparsable rows are reported and skipped rather than fatal.
            print(f"Error parsing reaction {reaction_id}: {e}")
            reactions_skipped += 1
            continue
        
        model.add_reactions([reaction])
        reactions_added += 1
            
    
    print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions")
    
    # set objective function
    set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient")

    set_medium_from_data(model, df)

    set_annotation_pathways_from_data(model, df)
    
    print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites")
    
    return model
 | 
| 
 | 
   426 
 | 
| 
 | 
   427 
 | 
| 
 | 
# Extracts all metabolite IDs from the formula (handles numeric prefixes + underscores)
 | 
| 
 | 
   429 #def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   430 #    """
 | 
| 
 | 
   431 #    Extract metabolite IDs from a reaction formula.
 | 
| 
 | 
   432 #    Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e),
 | 
| 
 | 
   433 #    allowing leading digits/underscores.
 | 
| 
 | 
   434 #    """
 | 
| 
 | 
   435 #    metabolites = set()
 | 
| 
 | 
   436 #    # optional coefficient followed by a token ending with _<letters>
 | 
| 
 | 
   437 #    if reaction_formula[-1] == ']' and reaction_formula[-3] == '[':
 | 
| 
 | 
   438 #        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)'
 | 
| 
 | 
   439 #    else:
 | 
| 
 | 
   440 #        pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)'
 | 
| 
 | 
   441 #    matches = re.findall(pattern, reaction_formula)
 | 
| 
 | 
   442 #    metabolites.update(matches)
 | 
| 
 | 
   443 #    return metabolites
 | 
| 
 | 
   444 
 | 
| 
 | 
   445 
 | 
| 
 | 
   446 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
 | 
| 
 | 
   447     """
 | 
| 
 | 
   448     Extract metabolite IDs from a reaction formula.
 | 
| 
 | 
   449 
 | 
| 
 | 
   450     Handles:
 | 
| 
 | 
   451       - optional stoichiometric coefficients (integers or decimals)
 | 
| 
 | 
   452       - compartment tags at the end of the metabolite, either [c] or _c
 | 
| 
 | 
   453 
 | 
| 
 | 
   454     Returns the IDs including the compartment suffix exactly as written.
 | 
| 
 | 
   455     """
 | 
| 
 | 
   456     pattern = re.compile(
 | 
| 
 | 
   457         r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))'              # left boundary (start, space, +, comma, =, :)
 | 
| 
 | 
   458         r'(?:\d+(?:\.\d+)?\s+)?'                                   # optional coefficient (requires space after)
 | 
| 
 | 
   459         r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))'  # metabolite + compartment (can start with number)
 | 
| 
 | 
   460     )
 | 
| 
 | 
   461     return {m.group(1) for m in pattern.finditer(reaction_formula)}
 | 
| 
 | 
   462 
 | 
| 
 | 
   463 
 | 
| 
 | 
   464 
 | 
| 
 | 
   465 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
 | 
| 
 | 
   466     """Extract the compartment from a metabolite ID."""
 | 
| 
 | 
   467     if '_' == metabolite_id[-2]:
 | 
| 
 | 
   468         return metabolite_id.split('_')[-1]
 | 
| 
 | 
   469     if metabolite_id[-1] == ']' and metabolite_id[-3] == '[':
 | 
| 
 | 
   470         return metabolite_id[-2]
 | 
| 
 | 
   471     return 'c'  # default cytoplasm
 | 
| 
 | 
   472 
 | 
| 
 | 
   473 
 | 
| 
 | 
   474 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
 | 
| 
 | 
   475     """Parse a reaction formula and set metabolites with their coefficients."""
 | 
| 
 | 
   476 
 | 
| 
 | 
   477     if '<=>' in formula:
 | 
| 
 | 
   478         parts = formula.split('<=>')
 | 
| 
 | 
   479         reversible = True
 | 
| 
 | 
   480     elif '<--' in formula:
 | 
| 
 | 
   481         parts = formula.split('<--')
 | 
| 
 | 
   482         reversible = False
 | 
| 
 | 
   483     elif '-->' in formula:
 | 
| 
 | 
   484         parts = formula.split('-->')
 | 
| 
 | 
   485         reversible = False
 | 
| 
 | 
   486     elif '<-' in formula:
 | 
| 
 | 
   487         parts = formula.split('<-')
 | 
| 
 | 
   488         reversible = False
 | 
| 
 | 
   489     else:
 | 
| 
 | 
   490         raise ValueError(f"Unrecognized reaction format: {formula}")
 | 
| 
 | 
   491     
 | 
| 
 | 
   492     # Handle cases where one side might be empty (exchange reactions)
 | 
| 
 | 
   493     if len(parts) != 2:
 | 
| 
 | 
   494         raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}")
 | 
| 
 | 
   495     
 | 
| 
 | 
   496     left, right = parts[0].strip(), parts[1].strip()
 | 
| 
 | 
   497     
 | 
| 
 | 
   498     reactants = parse_metabolites_side(left) if left else {}
 | 
| 
 | 
   499     products = parse_metabolites_side(right) if right else {}
 | 
| 
 | 
   500     
 | 
| 
 | 
   501     metabolites_to_add = {}
 | 
| 
 | 
   502     
 | 
| 
 | 
   503     for met_id, coeff in reactants.items():
 | 
| 
 | 
   504         if met_id in metabolites_dict:
 | 
| 
 | 
   505             metabolites_to_add[metabolites_dict[met_id]] = -coeff
 | 
| 
 | 
   506     
 | 
| 
 | 
   507     for met_id, coeff in products.items():
 | 
| 
 | 
   508         if met_id in metabolites_dict:
 | 
| 
 | 
   509             metabolites_to_add[metabolites_dict[met_id]] = coeff
 | 
| 
 | 
   510     
 | 
| 
 | 
   511     reaction.add_metabolites(metabolites_to_add)
 | 
| 
 | 
   512 
 | 
| 
 | 
   513 
 | 
| 
 | 
   514 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
 | 
| 
 | 
   515     """Parse one side of a reaction and extract metabolites with coefficients."""
 | 
| 
 | 
   516     metabolites = {}
 | 
| 
 | 
   517     if not side_str or side_str.strip() == '':
 | 
| 
 | 
   518         return metabolites
 | 
| 
 | 
   519 
 | 
| 
 | 
   520     terms = side_str.split('+')
 | 
| 
 | 
   521     for term in terms:
 | 
| 
 | 
   522         term = term.strip()
 | 
| 
 | 
   523         if not term:
 | 
| 
 | 
   524             continue
 | 
| 
 | 
   525 
 | 
| 
 | 
   526         # First check if term has space-separated coefficient and metabolite
 | 
| 
 | 
   527         parts = term.split()
 | 
| 
 | 
   528         if len(parts) == 2:
 | 
| 
 | 
   529             # Two parts: potential coefficient + metabolite
 | 
| 
 | 
   530             try:
 | 
| 
 | 
   531                 coeff = float(parts[0])
 | 
| 
 | 
   532                 met_id = parts[1]
 | 
| 
 | 
   533                 # Verify the second part looks like a metabolite with compartment
 | 
| 
 | 
   534                 if re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', met_id):
 | 
| 
 | 
   535                     metabolites[met_id] = coeff
 | 
| 
 | 
   536                     continue
 | 
| 
 | 
   537             except ValueError:
 | 
| 
 | 
   538                 pass
 | 
| 
 | 
   539         
 | 
| 
 | 
   540         # Single term - check if it's a metabolite (no coefficient)  
 | 
| 
 | 
   541         # Updated pattern to include metabolites starting with numbers
 | 
| 
 | 
   542         if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term):
 | 
| 
 | 
   543             metabolites[term] = 1.0
 | 
| 
 | 
   544         else:
 | 
| 
 | 
   545             print(f"Warning: Could not parse metabolite term: '{term}'")
 | 
| 
 | 
   546 
 | 
| 
 | 
   547     return metabolites
 | 
| 
 | 
   548 
 | 
| 
 | 
   549 
 | 
| 
 | 
   550 
 | 
| 
 | 
   551 def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"):
 | 
| 
 | 
   552     """
 | 
| 
 | 
   553     Sets the model's objective function based on a column of coefficients in the CSV.
 | 
| 
 | 
   554     Can be any reaction(s), not necessarily biomass.
 | 
| 
 | 
   555     """
 | 
| 
 | 
   556     obj_dict = {}
 | 
| 
 | 
   557     
 | 
| 
 | 
   558     for idx, row in df.iterrows():
 | 
| 
 | 
   559         reaction_id = str(row['ReactionID']).strip()
 | 
| 
 | 
   560         coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0
 | 
| 
 | 
   561         if coeff != 0:
 | 
| 
 | 
   562             if reaction_id in model.reactions:
 | 
| 
 | 
   563                 obj_dict[model.reactions.get_by_id(reaction_id)] = coeff
 | 
| 
 | 
   564             else:
 | 
| 
 | 
   565                 print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.")
 | 
| 
 | 
   566 
 | 
| 
 | 
   567     if not obj_dict:
 | 
| 
 | 
   568         raise ValueError("No reactions found with non-zero objective coefficient.")
 | 
| 
 | 
   569 
 | 
| 
 | 
   570     model.objective = obj_dict
 | 
| 
 | 
   571     print(f"Objective set with {len(obj_dict)} reactions.")
 | 
| 
 | 
   572 
 | 
| 
 | 
   573 
 | 
| 
 | 
   574 
 | 
| 
 | 
   575 
 | 
| 
 | 
   576 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
 | 
| 
 | 
   577     """Set the medium based on the 'InMedium' column in the dataframe."""
 | 
| 
 | 
   578     if 'InMedium' not in df.columns:
 | 
| 
 | 
   579         print("No 'InMedium' column found, skipping medium setup")
 | 
| 
 | 
   580         return
 | 
| 
 | 
   581         
 | 
| 
 | 
   582     medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
 | 
| 
 | 
   583     
 | 
| 
 | 
   584     medium_dict = {}
 | 
| 
 | 
   585     for rxn_id in medium_reactions:
 | 
| 
 | 
   586         if rxn_id in [r.id for r in model.reactions]:
 | 
| 
 | 
   587             reaction = model.reactions.get_by_id(rxn_id)
 | 
| 
 | 
   588             if reaction.lower_bound < 0: 
 | 
| 
 | 
   589                 medium_dict[rxn_id] = abs(reaction.lower_bound)
 | 
| 
 | 
   590     
 | 
| 
 | 
   591     if medium_dict:
 | 
| 
 | 
   592         model.medium = medium_dict
 | 
| 
 | 
   593         print(f"Medium set with {len(medium_dict)} components")
 | 
| 
 | 
   594     else:
 | 
| 
 | 
   595         print("No medium components found")
 | 
| 
 | 
def validate_model(model: cobraModel) -> Dict[str, object]:
    """
    Validate the model and return basic statistics.

    Args:
        model (cobraModel): Model to inspect.

    Returns:
        Dict[str, object]: Counts (reactions, metabolites, genes,
        compartments, reversible and exchange reactions), the objective
        expression as a string, the medium size, and the outcome of a test
        optimization: 'growth_rate' (objective value, or None on failure)
        and 'status' (solver status, or an "Error: ..." message).
    """
    validation = {
        'num_reactions': len(model.reactions),
        'num_metabolites': len(model.metabolites),
        'num_genes': len(model.genes),
        'num_compartments': len(model.compartments),
        'objective': str(model.objective),
        'medium_size': len(model.medium),
        'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
        # Exchange reactions are detected by the 'EX_' id prefix convention.
        'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
    }

    try:
        # Growth test: verify the model can be optimized at all.
        solution = model.optimize()
        validation['growth_rate'] = solution.objective_value
        validation['status'] = solution.status
    except Exception as e:
        # Best-effort validation: record the failure instead of propagating it.
        validation['growth_rate'] = None
        validation['status'] = f"Error: {e}"

    return validation
 | 
| 
 | 
   619 
 | 
| 
 | 
   620 def convert_genes(model, annotation):
 | 
| 
 | 
   621     """Rename genes using a selected annotation key in gene.notes; returns a model copy."""
 | 
| 
 | 
   622     from cobra.manipulation import rename_genes
 | 
| 
 | 
   623     model2=model.copy()
 | 
| 
 | 
   624     try:
 | 
| 
 | 
   625         dict_genes={gene.id:gene.notes[annotation]  for gene in model2.genes}
 | 
| 
 | 
   626     except:
 | 
| 
 | 
   627         print("No annotation in gene dict!")
 | 
| 
 | 
   628         return -1
 | 
| 
 | 
   629     rename_genes(model2,dict_genes)
 | 
| 
 | 
   630 
 | 
| 
 | 
   631     return model2
 | 
| 
 | 
   632 
 | 
| 
 | 
   633 # ---------- Utility helpers ----------
 | 
| 
 | 
   634 def _normalize_colname(col: str) -> str:
 | 
| 
 | 
   635     return col.strip().lower().replace(' ', '_')
 | 
| 
 | 
   636 
 | 
| 
 | 
def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
    """
    Detect useful gene-ID columns in mapping_df.

    Returns:
        Dict[str, str]: canonical key ('ensg', 'hgnc_id', 'hgnc_symbol',
        'entrez_id', 'gene_number') -> actual column name found in the
        dataframe. Keys with no recognized column are simply absent; an
        empty dict means nothing was recognized (callers must check — this
        function itself raises nothing).
    """
    # normalized column name -> original column name
    cols = { _normalize_colname(c): c for c in mapping_df.columns }
    chosen = {}
    # candidate (normalized) names for each category; first match wins
    candidates = {
        'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
        # NOTE(review): 'hgnc symbol' below can never match because
        # _normalize_colname replaces spaces with underscores — confirm intent.
        'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
        'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
        'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
        'gene_number': ['gene_number']
    }
    for key, names in candidates.items():
        for n in names:
            if n in cols:
                chosen[key] = cols[n]
                break
    return chosen
 | 
| 
 | 
   658 
 | 
| 
 | 
   659 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
 | 
| 
 | 
   660                                 source_col: str,
 | 
| 
 | 
   661                                 target_col: str,
 | 
| 
 | 
   662                                 model_source_genes: Optional[Set[str]] = None,
 | 
| 
 | 
   663                                 logger: Optional[logging.Logger] = None) -> None:
 | 
| 
 | 
   664     """
 | 
| 
 | 
   665         Check that, within the filtered mapping_df, each target maps to at most one source.
 | 
| 
 | 
   666         Log examples if duplicates are found.
 | 
| 
 | 
   667     """
 | 
| 
 | 
   668     if logger is None:
 | 
| 
 | 
   669         logger = logging.getLogger(__name__)
 | 
| 
 | 
   670 
 | 
| 
 | 
   671     if mapping_df.empty:
 | 
| 
 | 
   672         logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
 | 
| 
 | 
   673         return
 | 
| 
 | 
   674 
 | 
| 
 | 
   675     # normalize temporary columns for grouping (without altering the original df)
 | 
| 
 | 
   676     tmp = mapping_df[[source_col, target_col]].copy()
 | 
| 
 | 
   677     tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id)
 | 
| 
 | 
   678     tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
 | 
| 
 | 
   679 
 | 
| 
 | 
   680     # optionally filter to the set of model source genes
 | 
| 
 | 
   681     if model_source_genes is not None:
 | 
| 
 | 
   682         tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
 | 
| 
 | 
   683 
 | 
| 
 | 
   684     if tmp.empty:
 | 
| 
 | 
   685         logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
 | 
| 
 | 
   686         return
 | 
| 
 | 
   687 
 | 
| 
 | 
   688     # build reverse mapping: target -> set(sources)
 | 
| 
 | 
   689     grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
 | 
| 
 | 
   690     # find targets with more than one source
 | 
| 
 | 
   691     problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
 | 
| 
 | 
   692 
 | 
| 
 | 
   693     if problematic:
 | 
| 
 | 
   694     # prepare warning message with examples (limited subset)
 | 
| 
 | 
   695         sample_items = list(problematic.items())
 | 
| 
 | 
   696         msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
 | 
| 
 | 
   697         for tgt, sources in sample_items:
 | 
| 
 | 
   698             msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
 | 
| 
 | 
   699         full_msg = "\n".join(msg_lines)
 | 
| 
 | 
   700     # log warning
 | 
| 
 | 
   701         logger.warning(full_msg)
 | 
| 
 | 
   702 
 | 
| 
 | 
   703     # if everything is fine
 | 
| 
 | 
   704     logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
 | 
| 
 | 
   705 
 | 
| 
 | 
   706 
 | 
| 
 | 
   707 def _normalize_gene_id(g: str) -> str:
 | 
| 
 | 
   708     """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips)."""
 | 
| 
 | 
   709     if g is None:
 | 
| 
 | 
   710         return ""
 | 
| 
 | 
   711     g = str(g).strip()
 | 
| 
 | 
   712     # remove common prefixes
 | 
| 
 | 
   713     g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
 | 
| 
 | 
   714     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
 | 
| 
 | 
   715     return g
 | 
| 
 | 
   716 
 | 
| 
 | 
   717 def _is_or_only_expression(expr: str) -> bool:
 | 
| 
 | 
   718     """
 | 
| 
 | 
   719     Check if a GPR expression contains only OR operators (no AND operators).
 | 
| 
 | 
   720     
 | 
| 
 | 
   721     Args:
 | 
| 
 | 
   722         expr: GPR expression string
 | 
| 
 | 
   723         
 | 
| 
 | 
   724     Returns:
 | 
| 
 | 
   725         bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise
 | 
| 
 | 
   726     """
 | 
| 
 | 
   727     if not expr or not expr.strip():
 | 
| 
 | 
   728         return False
 | 
| 
 | 
   729         
 | 
| 
 | 
   730     # Normalize the expression
 | 
| 
 | 
   731     normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
 | 
| 
 | 
   732     
 | 
| 
 | 
   733     # Check if it contains any AND operators
 | 
| 
 | 
   734     has_and = ' and ' in normalized.lower()
 | 
| 
 | 
   735     
 | 
| 
 | 
   736     # Check if it contains OR operators
 | 
| 
 | 
   737     has_or = ' or ' in normalized.lower()
 | 
| 
 | 
   738     
 | 
| 
 | 
   739     # Must have OR operators and no AND operators
 | 
| 
 | 
   740     return has_or and not has_and
 | 
| 
 | 
   741 
 | 
| 
 | 
   742 
 | 
| 
 | 
   743 def _flatten_or_only_gpr(expr: str) -> str:
 | 
| 
 | 
   744     """
 | 
| 
 | 
   745     Flatten a GPR expression that contains only OR operators by:
 | 
| 
 | 
   746     1. Removing all parentheses
 | 
| 
 | 
   747     2. Extracting unique gene names
 | 
| 
 | 
   748     3. Joining them with ' or '
 | 
| 
 | 
   749     
 | 
| 
 | 
   750     Args:
 | 
| 
 | 
   751         expr: GPR expression string with only OR operators
 | 
| 
 | 
   752         
 | 
| 
 | 
   753     Returns:
 | 
| 
 | 
   754         str: Flattened GPR expression
 | 
| 
 | 
   755     """
 | 
| 
 | 
   756     if not expr or not expr.strip():
 | 
| 
 | 
   757         return expr
 | 
| 
 | 
   758         
 | 
| 
 | 
   759     # Extract all gene tokens (exclude logical operators and parentheses)
 | 
| 
 | 
   760     gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
 | 
| 
 | 
   761     logical = {'and', 'or', 'AND', 'OR', '(', ')'}
 | 
| 
 | 
   762     
 | 
| 
 | 
   763     tokens = re.findall(gene_pattern, expr)
 | 
| 
 | 
   764     genes = [t for t in tokens if t not in logical]
 | 
| 
 | 
   765     
 | 
| 
 | 
   766     # Create set to remove duplicates, then convert back to list to maintain some order
 | 
| 
 | 
   767     unique_genes = list(dict.fromkeys(genes))  # Preserves insertion order
 | 
| 
 | 
   768     
 | 
| 
 | 
   769     if len(unique_genes) == 0:
 | 
| 
 | 
   770         return expr
 | 
| 
 | 
   771     elif len(unique_genes) == 1:
 | 
| 
 | 
   772         return unique_genes[0]
 | 
| 
 | 
   773     else:
 | 
| 
 | 
   774         return ' or '.join(unique_genes)
 | 
| 
 | 
   775 
 | 
| 
 | 
   776 
 | 
| 
 | 
   777 def _simplify_boolean_expression(expr: str) -> str:
 | 
| 
 | 
   778     """
 | 
| 
 | 
   779     Simplify a boolean expression by removing duplicates while strictly preserving semantics.
 | 
| 
 | 
   780     This function handles simple duplicates within parentheses while being conservative about
 | 
| 
 | 
   781     complex expressions that could change semantics.
 | 
| 
 | 
   782     """
 | 
| 
 | 
   783     if not expr or not expr.strip():
 | 
| 
 | 
   784         return expr
 | 
| 
 | 
   785     
 | 
| 
 | 
   786     # Normalize operators and whitespace
 | 
| 
 | 
   787     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
 | 
| 
 | 
   788     expr = ' '.join(expr.split())  # Normalize whitespace
 | 
| 
 | 
   789     
 | 
| 
 | 
   790     def simplify_parentheses_content(match_obj):
 | 
| 
 | 
   791         """Helper function to simplify content within parentheses."""
 | 
| 
 | 
   792         content = match_obj.group(1)  # Content inside parentheses
 | 
| 
 | 
   793         
 | 
| 
 | 
   794         # Only simplify if it's a pure OR or pure AND chain
 | 
| 
 | 
   795         if ' or ' in content and ' and ' not in content:
 | 
| 
 | 
   796             # Pure OR chain - safe to deduplicate
 | 
| 
 | 
   797             parts = [p.strip() for p in content.split(' or ') if p.strip()]
 | 
| 
 | 
   798             unique_parts = []
 | 
| 
 | 
   799             seen = set()
 | 
| 
 | 
   800             for part in parts:
 | 
| 
 | 
   801                 if part not in seen:
 | 
| 
 | 
   802                     unique_parts.append(part)
 | 
| 
 | 
   803                     seen.add(part)
 | 
| 
 | 
   804             
 | 
| 
 | 
   805             if len(unique_parts) == 1:
 | 
| 
 | 
   806                 return unique_parts[0]  # Remove unnecessary parentheses for single items
 | 
| 
 | 
   807             else:
 | 
| 
 | 
   808                 return '(' + ' or '.join(unique_parts) + ')'
 | 
| 
 | 
   809                 
 | 
| 
 | 
   810         elif ' and ' in content and ' or ' not in content:
 | 
| 
 | 
   811             # Pure AND chain - safe to deduplicate  
 | 
| 
 | 
   812             parts = [p.strip() for p in content.split(' and ') if p.strip()]
 | 
| 
 | 
   813             unique_parts = []
 | 
| 
 | 
   814             seen = set()
 | 
| 
 | 
   815             for part in parts:
 | 
| 
 | 
   816                 if part not in seen:
 | 
| 
 | 
   817                     unique_parts.append(part)
 | 
| 
 | 
   818                     seen.add(part)
 | 
| 
 | 
   819             
 | 
| 
 | 
   820             if len(unique_parts) == 1:
 | 
| 
 | 
   821                 return unique_parts[0]  # Remove unnecessary parentheses for single items
 | 
| 
 | 
   822             else:
 | 
| 
 | 
   823                 return '(' + ' and '.join(unique_parts) + ')'
 | 
| 
 | 
   824         else:
 | 
| 
 | 
   825             # Mixed operators or single item - return with parentheses as-is
 | 
| 
 | 
   826             return '(' + content + ')'
 | 
| 
 | 
   827     
 | 
| 
 | 
   828     def remove_duplicates_simple(parts_str: str, separator: str) -> str:
 | 
| 
 | 
   829         """Remove duplicates from a simple chain of operations."""
 | 
| 
 | 
   830         parts = [p.strip() for p in parts_str.split(separator) if p.strip()]
 | 
| 
 | 
   831         
 | 
| 
 | 
   832         # Remove duplicates while preserving order
 | 
| 
 | 
   833         unique_parts = []
 | 
| 
 | 
   834         seen = set()
 | 
| 
 | 
   835         for part in parts:
 | 
| 
 | 
   836             if part not in seen:
 | 
| 
 | 
   837                 unique_parts.append(part)
 | 
| 
 | 
   838                 seen.add(part)
 | 
| 
 | 
   839         
 | 
| 
 | 
   840         if len(unique_parts) == 1:
 | 
| 
 | 
   841             return unique_parts[0]
 | 
| 
 | 
   842         else:
 | 
| 
 | 
   843             return f' {separator} '.join(unique_parts)
 | 
| 
 | 
   844     
 | 
| 
 | 
   845     try:
 | 
| 
 | 
   846         import re
 | 
| 
 | 
   847         
 | 
| 
 | 
   848         # First, simplify content within parentheses
 | 
| 
 | 
   849         # This handles cases like (A or A) -> A and (B and B) -> B
 | 
| 
 | 
   850         expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr)
 | 
| 
 | 
   851         
 | 
| 
 | 
   852         # Check if the resulting expression has mixed operators  
 | 
| 
 | 
   853         has_and = ' and ' in expr_simplified
 | 
| 
 | 
   854         has_or = ' or ' in expr_simplified
 | 
| 
 | 
   855         
 | 
| 
 | 
   856         # Only simplify top-level if it's pure AND or pure OR
 | 
| 
 | 
   857         if has_and and not has_or and '(' not in expr_simplified:
 | 
| 
 | 
   858             # Pure AND chain at top level - safe to deduplicate
 | 
| 
 | 
   859             return remove_duplicates_simple(expr_simplified, 'and')
 | 
| 
 | 
   860         elif has_or and not has_and and '(' not in expr_simplified:
 | 
| 
 | 
   861             # Pure OR chain at top level - safe to deduplicate  
 | 
| 
 | 
   862             return remove_duplicates_simple(expr_simplified, 'or')
 | 
| 
 | 
   863         else:
 | 
| 
 | 
   864             # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned)
 | 
| 
 | 
   865             return expr_simplified
 | 
| 
 | 
   866             
 | 
| 
 | 
   867     except Exception:
 | 
| 
 | 
   868         # If anything goes wrong, return the original expression
 | 
| 
 | 
   869         return expr
 | 
| 
 | 
   870 
 | 
| 
 | 
   871 
 | 
| 
 | 
def translate_model_genes(model: 'cobra.Model',
                         mapping_df: 'pd.DataFrame',
                         target_nomenclature: str,
                         source_nomenclature: str = 'hgnc_id',
                         allow_many_to_one: bool = False,
                         logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]:
    """
    Translate model genes from source_nomenclature to target_nomenclature using a mapping table.
    mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez).

    The input model is never modified; a copy is translated and returned.

    Args:
        model: COBRA model to translate.
        mapping_df: DataFrame containing the mapping information.
        target_nomenclature: Desired target key (e.g., 'hgnc_symbol').
        source_nomenclature: Current source key in the model (default 'hgnc_id').
        allow_many_to_one: If True, allow many-to-one mappings and handle duplicates
            in GPRs; if False, uniqueness problems are logged (not raised).
        logger: Optional logger; when None, a module logger with basicConfig is used.

    Returns:
        Tuple containing:
        - Translated COBRA model (a copy of the input)
        - Dictionary mapping reaction IDs to translation issue descriptions
          (one-to-many and many-to-one gene substitutions)

    Raises:
        ValueError: If no usable columns, or no column for the requested
            source/target nomenclature, can be found in mapping_df.
    """
    if logger is None:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        logger = logging.getLogger(__name__)

    logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")

    # Detect usable ID columns in the mapping table (normalized names).
    chosen = _choose_columns(mapping_df)
    if not chosen:
        raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")

    # Resolve the requested source/target keys to actual dataframe columns.
    src_key = source_nomenclature.strip().lower()
    tgt_key = target_nomenclature.strip().lower()

    col_for_src = None
    col_for_tgt = None
    # First pass: exact match against the canonical keys.
    for k, actual in chosen.items():
        if k == src_key:
            col_for_src = actual
        if k == tgt_key:
            col_for_tgt = actual

    # Second pass: substring match (e.g. 'hgnc' matches 'hgnc_id').
    if col_for_src is None:
        possible_src_names = {k: v for k, v in chosen.items()}
        for k, actual in possible_src_names.items():
            if src_key in k:
                col_for_src = actual
                break

    if col_for_tgt is None:
        for k, actual in chosen.items():
            if tgt_key in k:
                col_for_tgt = actual
                break

    if col_for_src is None:
        raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
    if col_for_tgt is None:
        raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")

    # Restrict the mapping to source genes actually present in the model.
    model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
    logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")

    tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
    tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id)

    filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()

    if filtered_map.empty:
        logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")

    # Unless many-to-one is explicitly allowed, warn about targets with
    # multiple sources (logged, not raised).
    if not allow_many_to_one:
        _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)

    # Create the gene mapping: normalized source id -> list of target ids.
    gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)

    # Work on a copy so the caller's model stays untouched.
    model_copy = model.copy()

    # Translation statistics accumulated by _translate_gpr.
    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0}
    unmapped = []
    multi = []

    # reaction id -> human-readable description of translation issues
    reaction_translation_issues = {}

    original_genes = {g.id for g in model_copy.genes}
    logger.info(f"Original genes count: {len(original_genes)}")

    # Translate each reaction's GPR in place on the copy.
    for rxn in model_copy.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True)
            if rxn_issues:
                reaction_translation_issues[rxn.id] = rxn_issues

            if new_gpr != gpr:
                # GPRs with translation issues that are pure OR chains can be
                # flattened safely (parentheses removed, duplicates dropped).
                if rxn_issues and _is_or_only_expression(new_gpr):
                    flattened_gpr = _flatten_or_only_gpr(new_gpr)
                    if flattened_gpr != new_gpr:
                        stats['flattened_or_gprs'] += 1
                        logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'")
                        new_gpr = flattened_gpr

                # Conservative duplicate removal for pure AND/OR chains.
                simplified_gpr = _simplify_boolean_expression(new_gpr)
                if simplified_gpr != new_gpr:
                    stats['simplified_gprs'] += 1
                    logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
                rxn.gene_reaction_rule = simplified_gpr
                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")

    # Rebuild the gene list so model.genes matches the rewritten GPRs.
    _update_model_genes(model_copy, logger)

    # Final summary logging.
    _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)

    logger.info("Translation finished")
    return model_copy, reaction_translation_issues
 | 
| 
 | 
  1005 
 | 
| 
 | 
  1006 
 | 
| 
 | 
  1007 # ---------- helper functions ----------
 | 
| 
 | 
  1008 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
 | 
| 
 | 
  1009     """
 | 
| 
 | 
  1010     Build mapping dict: source_id -> list of target_ids
 | 
| 
 | 
  1011     Normalizes IDs (removes prefixes like 'HGNC:' etc).
 | 
| 
 | 
  1012     """
 | 
| 
 | 
  1013     df = mapping_df[[source_col, target_col]].dropna().copy()
 | 
| 
 | 
  1014     # normalize to string
 | 
| 
 | 
  1015     df[source_col] = df[source_col].astype(str).apply(_normalize_gene_id)
 | 
| 
 | 
  1016     df[target_col] = df[target_col].astype(str).str.strip()
 | 
| 
 | 
  1017 
 | 
| 
 | 
  1018     df = df.drop_duplicates()
 | 
| 
 | 
  1019 
 | 
| 
 | 
  1020     logger.info(f"Creating mapping from {len(df)} rows")
 | 
| 
 | 
  1021 
 | 
| 
 | 
  1022     mapping = defaultdict(list)
 | 
| 
 | 
  1023     for _, row in df.iterrows():
 | 
| 
 | 
  1024         s = row[source_col]
 | 
| 
 | 
  1025         t = row[target_col]
 | 
| 
 | 
  1026         if t not in mapping[s]:
 | 
| 
 | 
  1027             mapping[s].append(t)
 | 
| 
 | 
  1028 
 | 
| 
 | 
  1029     # stats
 | 
| 
 | 
  1030     one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
 | 
| 
 | 
  1031     one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
 | 
| 
 | 
  1032     logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
 | 
| 
 | 
  1033     return dict(mapping)
 | 
| 
 | 
  1034 
 | 
| 
 | 
  1035 
 | 
| 
 | 
def _translate_gpr(gpr_string: str,
                   gene_mapping: Dict[str, List[str]],
                   stats: Dict[str, int],
                   unmapped_genes: List[str],
                   multi_mapping_genes: List[Tuple[str, List[str]]],
                   logger: logging.Logger,
                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
    """
    Translate genes inside a GPR string using gene_mapping.

    Mutates stats/unmapped_genes/multi_mapping_genes in place as counters
    and accumulators shared across calls.

    Args:
        gpr_string: Original gene_reaction_rule text.
        gene_mapping: Normalized source id -> list of target ids.
        stats: Counter dict; 'translated', 'one_to_one', 'one_to_many' and
            'not_found' are incremented here.
        unmapped_genes: Accumulates tokens with no mapping (deduplicated).
        multi_mapping_genes: Accumulates (token, targets) for 1:many cases.
        logger: Logger for per-token debug messages.
        track_issues: When True, also return a textual issue summary.

    Returns:
        The translated GPR string, or (new_gpr, issue_text) when
        track_issues is True (issue_text is "" when there are none).
    """
    # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
    token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    tokens = re.findall(token_pattern, gpr_string)

    # Drop logical keywords; what remains are candidate gene identifiers.
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    tokens = [t for t in tokens if t not in logical]

    new_gpr = gpr_string
    issues = []

    for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
        norm = _normalize_gene_id(token)
        if norm in gene_mapping:
            targets = gene_mapping[norm]
            stats['translated'] += 1
            if len(targets) == 1:
                stats['one_to_one'] += 1
                replacement = targets[0]
            else:
                # 1:many — expand to an OR group of all targets.
                stats['one_to_many'] += 1
                multi_mapping_genes.append((token, targets))
                replacement = "(" + " or ".join(targets) + ")"
                if track_issues:
                    issues.append(f"{token} -> {' or '.join(targets)}")

            # Word-boundary substitution of every occurrence of this token.
            # NOTE(review): substitutions run sequentially on new_gpr, so a
            # token matching text introduced by an earlier replacement would
            # also be rewritten — confirm mapping targets cannot collide with
            # remaining source tokens.
            pattern = r'\b' + re.escape(token) + r'\b'
            new_gpr = re.sub(pattern, replacement, new_gpr)
        else:
            stats['not_found'] += 1
            if token not in unmapped_genes:
                unmapped_genes.append(token)
            logger.debug(f"Token not found in mapping (left as-is): {token}")

    # Check for many-to-one cases (multiple source genes mapping to same target)
    if track_issues:
        # Build reverse mapping (target -> original tokens) for this GPR only.
        original_to_target = {}

        for orig_token in tokens:
            norm = _normalize_gene_id(orig_token)
            if norm in gene_mapping:
                targets = gene_mapping[norm]
                for target in targets:
                    if target not in original_to_target:
                        original_to_target[target] = []
                    if orig_token not in original_to_target[target]:
                        original_to_target[target].append(orig_token)

        # Report targets produced by more than one source token.
        for target, original_genes in original_to_target.items():
            if len(original_genes) > 1:
                issues.append(f"{' or '.join(original_genes)} -> {target}")

    issue_text = "; ".join(issues) if issues else ""

    if track_issues:
        return new_gpr, issue_text
    else:
        return new_gpr
 | 
| 
 | 
  1106 
 | 
| 
 | 
  1107 
 | 
| 
 | 
def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
    """
    Rebuild model.genes from gene_reaction_rule content.

    Scans every reaction GPR, collects the (normalized) gene tokens it
    references, removes gene objects that no GPR references anymore, and
    adds bare cobra.Gene objects for newly referenced IDs. The model is
    modified in place.

    Args:
        model: Model whose gene list should be synchronized with its GPRs.
        logger: Logger for the removed/added summary.
    """
    # Collect every gene token referenced by any GPR.
    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    genes_in_gpr: Set[str] = set()

    for rxn in model.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            toks = re.findall(gene_pattern, gpr)
            toks = [t for t in toks if t not in logical]
            # normalize IDs consistent with mapping normalization
            toks = [_normalize_gene_id(t) for t in toks]
            genes_in_gpr.update(toks)

    # Existing gene ids before the update.
    existing = {g.id for g in model.genes}

    # Remove genes that no GPR references anymore.
    to_remove = [gid for gid in existing if gid not in genes_in_gpr]
    removed = 0
    for gid in to_remove:
        try:
            gene_obj = model.genes.get_by_id(gid)
            model.genes.remove(gene_obj)
            removed += 1
        except Exception:
            # safe-ignore: removal is best-effort
            pass

    # Add gene objects for IDs that appear in GPRs but not in model.genes.
    added = 0
    for gid in genes_in_gpr:
        if gid not in existing:
            new_gene = cobra.Gene(gid)
            try:
                model.genes.add(new_gene)
            except Exception:
                # fallback: if model.genes doesn't support add, try append or model.add_genes
                try:
                    model.genes.append(new_gene)
                except Exception:
                    try:
                        model.add_genes([new_gene])
                    except Exception:
                        logger.warning(f"Could not add gene object for {gid}")
            added += 1

    logger.info(f"Model genes updated: removed {removed}, added {added}")
 | 
| 
 | 
  1161 
 | 
| 
 | 
  1162 
 | 
| 
 | 
  1163 def export_model_to_tabular(model: cobraModel, 
 | 
| 
 | 
  1164                            output_path: str,
 | 
| 
 | 
  1165                            translation_issues: Dict = None,
 | 
| 
 | 
  1166                            include_objective: bool = True,
 | 
| 
 | 
  1167                            save_function = None) -> pd.DataFrame:
 | 
| 
 | 
  1168     """
 | 
| 
 | 
  1169     Export a COBRA model to tabular format with optional components.
 | 
| 
 | 
  1170     
 | 
| 
 | 
  1171     Args:
 | 
| 
 | 
  1172         model: COBRA model to export
 | 
| 
 | 
  1173         output_path: Path where to save the tabular file
 | 
| 
 | 
  1174         translation_issues: Optional dict of {reaction_id: issues} from gene translation
 | 
| 
 | 
  1175         include_objective: Whether to include objective coefficient column
 | 
| 
 | 
  1176         save_function: Optional custom save function, if None uses pd.DataFrame.to_csv
 | 
| 
 | 
  1177         
 | 
| 
 | 
  1178     Returns:
 | 
| 
 | 
  1179         pd.DataFrame: The merged tabular data
 | 
| 
 | 
  1180     """
 | 
| 
 | 
  1181     # Generate model data
 | 
| 
 | 
  1182     rules = generate_rules(model, asParsed=False)
 | 
| 
 | 
  1183     
 | 
| 
 | 
  1184     reactions = generate_reactions(model, asParsed=False)
 | 
| 
 | 
  1185     bounds = generate_bounds(model)
 | 
| 
 | 
  1186     medium = get_medium(model)
 | 
| 
 | 
  1187     compartments = generate_compartments(model)
 | 
| 
 | 
  1188     
 | 
| 
 | 
  1189     # Create base DataFrames
 | 
| 
 | 
  1190     df_rules = pd.DataFrame(list(rules.items()), columns=["ReactionID", "GPR"])
 | 
| 
 | 
  1191     df_reactions = pd.DataFrame(list(reactions.items()), columns=["ReactionID", "Formula"])
 | 
| 
 | 
  1192     df_bounds = bounds.reset_index().rename(columns={"index": "ReactionID"})
 | 
| 
 | 
  1193     df_medium = medium.rename(columns={"reaction": "ReactionID"})
 | 
| 
 | 
  1194     df_medium["InMedium"] = True
 | 
| 
 | 
  1195     
 | 
| 
 | 
  1196     # Start merging
 | 
| 
 | 
  1197     merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
 | 
| 
 | 
  1198     merged = merged.merge(df_bounds, on="ReactionID", how="outer")
 | 
| 
 | 
  1199     
 | 
| 
 | 
  1200     # Add objective coefficients if requested
 | 
| 
 | 
  1201     if include_objective:
 | 
| 
 | 
  1202         objective_function = extract_objective_coefficients(model)
 | 
| 
 | 
  1203         merged = merged.merge(objective_function, on="ReactionID", how="outer")
 | 
| 
 | 
  1204     
 | 
| 
 | 
  1205     # Add compartments/pathways if they exist
 | 
| 
 | 
  1206     if compartments is not None:
 | 
| 
 | 
  1207         merged = merged.merge(compartments, on="ReactionID", how="outer")
 | 
| 
 | 
  1208     
 | 
| 
 | 
  1209     # Add medium information
 | 
| 
 | 
  1210     merged = merged.merge(df_medium, on="ReactionID", how="left")
 | 
| 
 | 
  1211     
 | 
| 
 | 
  1212     # Add translation issues if provided
 | 
| 
 | 
  1213     if translation_issues:
 | 
| 
 | 
  1214         df_translation_issues = pd.DataFrame([
 | 
| 
 | 
  1215             {"ReactionID": rxn_id, "TranslationIssues": issues}
 | 
| 
 | 
  1216             for rxn_id, issues in translation_issues.items()
 | 
| 
 | 
  1217         ])
 | 
| 
 | 
  1218         if not df_translation_issues.empty:
 | 
| 
 | 
  1219             merged = merged.merge(df_translation_issues, on="ReactionID", how="left")
 | 
| 
 | 
  1220             merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")
 | 
| 
 | 
  1221     
 | 
| 
 | 
  1222     # Final processing
 | 
| 
 | 
  1223     merged["InMedium"] = merged["InMedium"].fillna(False)
 | 
| 
 | 
  1224     merged = merged.sort_values(by="InMedium", ascending=False)
 | 
| 
 | 
  1225     
 | 
| 
 | 
  1226     # Save the file
 | 
| 
 | 
  1227     if save_function:
 | 
| 
 | 
  1228         save_function(merged, output_path)
 | 
| 
 | 
  1229     else:
 | 
| 
 | 
  1230         merged.to_csv(output_path, sep="\t", index=False)
 | 
| 
 | 
  1231     
 | 
| 
 | 
  1232     return merged
 | 
| 
 | 
  1233 
 | 
| 
 | 
  1234 
 | 
| 
 | 
  1235 def _log_translation_statistics(stats: Dict[str, int],
 | 
| 
 | 
  1236                                unmapped_genes: List[str],
 | 
| 
 | 
  1237                                multi_mapping_genes: List[Tuple[str, List[str]]],
 | 
| 
 | 
  1238                                original_genes: Set[str],
 | 
| 
 | 
  1239                                final_genes,
 | 
| 
 | 
  1240                                logger: logging.Logger):
 | 
| 
 | 
  1241     logger.info("=== TRANSLATION STATISTICS ===")
 | 
| 
 | 
  1242     logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
 | 
| 
 | 
  1243     logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
 | 
| 
 | 
  1244     logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
 | 
| 
 | 
  1245     logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}")
 | 
| 
 | 
  1246 
 | 
| 
 | 
  1247     final_ids = {g.id for g in final_genes}
 | 
| 
 | 
  1248     logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
 | 
| 
 | 
  1249 
 | 
| 
 | 
  1250     if unmapped_genes:
 | 
| 
 | 
  1251         logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
 | 
| 
 | 
  1252     if multi_mapping_genes:
 | 
| 
 | 
  1253         logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
 | 
| 
 | 
  1254         for orig, targets in multi_mapping_genes[:10]:
 | 
| 
 | 
  1255             logger.info(f"  {orig} -> {', '.join(targets)}")
 | 
| 
 | 
  1256     
 | 
| 
 | 
  1257     # Log summary of flattened GPRs if any
 | 
| 
 | 
  1258     if stats.get('flattened_or_gprs', 0) > 0:
 | 
| 
 | 
  1259         logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
 | 
| 
 | 
  1260 
 | 
| 
 | 
  1261      |