Mercurial > repos > bimib > cobraxy
diff COBRAxy/src/utils/model_utils.py @ 539:2fb97466e404 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Sat, 25 Oct 2025 14:55:13 +0000 |
| parents | |
| children | fcdbc81feb45 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/COBRAxy/src/utils/model_utils.py Sat Oct 25 14:55:13 2025 +0000 @@ -0,0 +1,1256 @@ +""" +Utilities for generating and manipulating COBRA models and related metadata. + +This module includes helpers to: +- extract rules, reactions, bounds, objective coefficients, and compartments +- build a COBRA model from a tabular file +- set objective and medium from dataframes +- validate a model and convert gene identifiers +- translate model GPRs using mapping tables +""" +import os +import cobra +import pandas as pd +import re +import logging +from typing import Optional, Tuple, Union, List, Dict, Set +from collections import defaultdict +import utils.rule_parsing as rulesUtils +import utils.reaction_parsing as reactionUtils +from cobra import Model as cobraModel, Reaction, Metabolite +import sys + + +############################ check_methods #################################### +def gene_type(l :str, name :str) -> str: + """ + Determine the type of gene ID. + + Args: + l (str): The gene identifier to check. + name (str): The name of the dataset, used in error messages. + + Returns: + str: The type of gene ID ('HGNC_ID', 'ENSG', 'HGNC_symbol', or 'entrez_id'). + + Raises: + sys.exit: If the gene ID type is not supported, the execution is aborted. + """ + if check_hgnc(l): + return 'HGNC_ID' + elif check_ensembl(l): + return 'ENSG' + elif check_symbol(l): + return 'HGNC_symbol' + elif check_entrez(l): + return 'entrez_id' + else: + sys.exit('Execution aborted:\n' + + 'gene ID type in ' + name + ' not supported. Supported ID'+ + 'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n') + +def check_hgnc(l :str) -> bool: + """ + Check if a gene identifier follows the HGNC format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the HGNC format, False otherwise. 
+ """ + if len(l) > 5: + if (l.upper()).startswith('HGNC:'): + return l[5:].isdigit() + else: + return False + else: + return False + +def check_ensembl(l :str) -> bool: + """ + Check if a gene identifier follows the Ensembl format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the Ensembl format, False otherwise. + """ + return l.upper().startswith('ENS') + + +def check_symbol(l :str) -> bool: + """ + Check if a gene identifier follows the symbol format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the symbol format, False otherwise. + """ + if len(l) > 0: + if l[0].isalpha() and l[1:].isalnum(): + return True + else: + return False + else: + return False + +def check_entrez(l :str) -> bool: + """ + Check if a gene identifier follows the Entrez ID format. + + Args: + l (str): The gene identifier to check. + + Returns: + bool: True if the gene identifier follows the Entrez ID format, False otherwise. + """ + if len(l) > 0: + return l.isdigit() + else: + return False + +################################- DATA GENERATION -################################ +ReactionId = str +def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: + """ + Generate a dictionary mapping reaction IDs to GPR rules from the model. + + Args: + model: COBRA model to derive data from. + asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings. + + Returns: + Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID. + Dict[ReactionId, str]: Raw rules by reaction ID. 
+ """ + _ruleGetter = lambda reaction : reaction.gene_reaction_rule + ruleExtractor = (lambda reaction : + rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter + + return { + reaction.id : ruleExtractor(reaction) + for reaction in model.reactions + if reaction.gene_reaction_rule } + +def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: + """ + Generate a dictionary mapping reaction IDs to reaction formulas from the model. + + Args: + model: COBRA model to derive data from. + asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings. + + Returns: + Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested). + """ + + unparsedReactions = { + reaction.id : reaction.reaction + for reaction in model.reactions + if reaction.reaction + } + + if not asParsed: return unparsedReactions + + return reactionUtils.create_reaction_dict(unparsedReactions) + +def get_medium(model:cobraModel) -> pd.DataFrame: + """ + Extract the uptake reactions representing the model medium. + + Returns a DataFrame with a single column 'reaction' listing exchange reactions + with negative lower bound and no positive stoichiometric coefficients (uptake only). + """ + trueMedium=[] + for r in model.reactions: + positiveCoeff=0 + for m in r.metabolites: + if r.get_coefficient(m.id)>0: + positiveCoeff=1; + if (positiveCoeff==0 and r.lower_bound<0): + trueMedium.append(r.id) + + df_medium = pd.DataFrame() + df_medium["reaction"] = trueMedium + return df_medium + +def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: + """ + Extract objective coefficients for each reaction. 
+ + Args: + model: COBRA model + + Returns: + pd.DataFrame with columns: ReactionID, ObjectiveCoefficient + """ + coeffs = [] + # model.objective.expression is a linear expression + objective_expr = model.objective.expression.as_coefficients_dict() + + for reaction in model.reactions: + coeff = objective_expr.get(reaction.forward_variable, 0.0) + coeffs.append({ + "ReactionID": reaction.id, + "ObjectiveCoefficient": coeff + }) + + return pd.DataFrame(coeffs) + +def generate_bounds(model:cobraModel) -> pd.DataFrame: + """ + Build a DataFrame of lower/upper bounds for all reactions. + + Returns: + pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound']. + """ + + rxns = [] + for reaction in model.reactions: + rxns.append(reaction.id) + + bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns) + + for reaction in model.reactions: + bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] + return bounds + + + +def generate_compartments(model: cobraModel) -> pd.DataFrame: + """ + Generates a DataFrame containing pathway information for each reaction. + Creates columns for each pathway position (Pathway_1, Pathway_2, etc.) only if pathways exist. + + Args: + model: the COBRA model to extract pathway data from. 
+ + Returns: + pd.DataFrame: DataFrame with ReactionID and pathway columns (if any pathways exist) + """ + pathway_data = [] + + # First pass: determine the maximum number of pathways any reaction has + max_pathways = 0 + reaction_pathways = {} + has_any_pathways = False + + for reaction in model.reactions: + # Get unique pathways from all metabolites in the reaction + if 'pathways' in reaction.annotation and reaction.annotation['pathways']: + if type(reaction.annotation['pathways']) == list: + # Filter out empty/None values + valid_pathways = [p for p in reaction.annotation['pathways'] if p] + if valid_pathways: + reaction_pathways[reaction.id] = valid_pathways + max_pathways = max(max_pathways, len(valid_pathways)) + has_any_pathways = True + else: + reaction_pathways[reaction.id] = [] + else: + # Single pathway value + if reaction.annotation['pathways']: + reaction_pathways[reaction.id] = [reaction.annotation['pathways']] + max_pathways = max(max_pathways, 1) + has_any_pathways = True + else: + reaction_pathways[reaction.id] = [] + else: + # No pathway annotation - use empty list + reaction_pathways[reaction.id] = [] + + # If no pathways exist in the model, return DataFrame with only ReactionID + if not has_any_pathways: + return None + + # Create column names for pathways only if they exist + pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)] + + # Second pass: create the data with pathway columns + for reaction_id, pathways in reaction_pathways.items(): + row = {"ReactionID": reaction_id} + + # Fill pathway columns + for i in range(max_pathways): + col_name = pathway_columns[i] + if i < len(pathways): + row[col_name] = pathways[i] + else: + row[col_name] = None + + pathway_data.append(row) + + return pd.DataFrame(pathway_data) + +def set_annotation_pathways_from_data(model: cobraModel, df: pd.DataFrame): + """Set reaction pathways based on 'Pathway_1', 'Pathway_2', ... 
columns in the dataframe.""" + pathway_cols = [col for col in df.columns if col.startswith('Pathway_')] + if not pathway_cols: + print("No 'Pathway_' columns found, skipping pathway annotation") + return + + pathway_data = defaultdict(list) + + for idx, row in df.iterrows(): + reaction_id = str(row['ReactionID']).strip() + if reaction_id not in model.reactions: + continue + + pathways = [] + for col in pathway_cols: + if pd.notna(row[col]) and str(row[col]).strip(): + pathways.append(str(row[col]).strip()) + + if pathways: + + reaction = model.reactions.get_by_id(reaction_id) + if len(pathways) == 1: + reaction.annotation['pathways'] = pathways[0] + else: + reaction.annotation['pathways'] = pathways + + pathway_data[reaction_id] = pathways + + print(f"Annotated {len(pathway_data)} reactions with pathways.") + +def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: + """ + Build a COBRApy model from a tabular file with reaction data. + + Args: + csv_path: Path to the tab-separated file. + model_id: ID for the newly created model. + + Returns: + cobra.Model: The constructed COBRApy model. + """ + + # Try to detect separator + with open(csv_path, 'r') as f: + first_line = f.readline() + sep = '\t' if '\t' in first_line else ',' + + df = pd.read_csv(csv_path, sep=sep) + + # Check required columns + required_cols = ['ReactionID', 'Formula'] + missing_cols = [col for col in required_cols if col not in df.columns] + if missing_cols: + raise ValueError(f"Missing required columns: {missing_cols}. 
Available columns: {list(df.columns)}") + + model = cobraModel(model_id) + + metabolites_dict = {} + compartments_dict = {} + + print(f"Building model from {len(df)} reactions...") + + for idx, row in df.iterrows(): + reaction_formula = str(row['Formula']).strip() + if not reaction_formula or reaction_formula == 'nan': + continue + + metabolites = extract_metabolites_from_reaction(reaction_formula) + + for met_id in metabolites: + compartment = extract_compartment_from_metabolite(met_id) + + if compartment not in compartments_dict: + compartments_dict[compartment] = compartment + + if met_id not in metabolites_dict: + metabolites_dict[met_id] = Metabolite( + id=met_id, + compartment=compartment, + name=met_id.replace(f"_{compartment}", "").replace("__", "_") + ) + + model.compartments = compartments_dict + + model.add_metabolites(list(metabolites_dict.values())) + + print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments") + + reactions_added = 0 + reactions_skipped = 0 + + for idx, row in df.iterrows(): + + reaction_id = str(row['ReactionID']).strip() + reaction_formula = str(row['Formula']).strip() + + if not reaction_formula or reaction_formula == 'nan': + raise ValueError(f"Missing reaction formula for {reaction_id}") + + reaction = Reaction(reaction_id) + reaction.name = reaction_id + + reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 + reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 + + if pd.notna(row['GPR']) and str(row['GPR']).strip(): + reaction.gene_reaction_rule = str(row['GPR']).strip() + + try: + parse_reaction_formula(reaction, reaction_formula, metabolites_dict) + except Exception as e: + print(f"Error parsing reaction {reaction_id}: {e}") + reactions_skipped += 1 + continue + + model.add_reactions([reaction]) + reactions_added += 1 + + + print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions") + + # 
set objective function + set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient") + + set_medium_from_data(model, df) + + set_annotation_pathways_from_data(model, df) + + print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites") + + return model + + +# Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) +#def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: +# """ +# Extract metabolite IDs from a reaction formula. +# Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e), +# allowing leading digits/underscores. +# """ +# metabolites = set() +# # optional coefficient followed by a token ending with _<letters> +# if reaction_formula[-1] == ']' and reaction_formula[-3] == '[': +# pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+[[A-Za-z0-9]]+)' +# else: +# pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[A-Za-z0-9]+)' +# matches = re.findall(pattern, reaction_formula) +# metabolites.update(matches) +# return metabolites + + +def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: + """ + Extract metabolite IDs from a reaction formula. + + Handles: + - optional stoichiometric coefficients (integers or decimals) + - compartment tags at the end of the metabolite, either [c] or _c + + Returns the IDs including the compartment suffix exactly as written. + """ + pattern = re.compile( + r'(?:^|(?<=\s)|(?<=\+)|(?<=,)|(?<==)|(?<=:))' # left boundary (start, space, +, comma, =, :) + r'(?:\d+(?:\.\d+)?\s+)?' 
# optional coefficient (requires space after) + r'([A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+))' # metabolite + compartment (can start with number) + ) + return {m.group(1) for m in pattern.finditer(reaction_formula)} + + + +def extract_compartment_from_metabolite(metabolite_id: str) -> str: + """Extract the compartment from a metabolite ID.""" + if '_' == metabolite_id[-2]: + return metabolite_id.split('_')[-1] + if metabolite_id[-1] == ']' and metabolite_id[-3] == '[': + return metabolite_id[-2] + return 'c' # default cytoplasm + + +def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): + """Parse a reaction formula and set metabolites with their coefficients.""" + + if '<=>' in formula: + parts = formula.split('<=>') + reversible = True + elif '<--' in formula: + parts = formula.split('<--') + reversible = False + elif '-->' in formula: + parts = formula.split('-->') + reversible = False + elif '<-' in formula: + parts = formula.split('<-') + reversible = False + else: + raise ValueError(f"Unrecognized reaction format: {formula}") + + # Handle cases where one side might be empty (exchange reactions) + if len(parts) != 2: + raise ValueError(f"Invalid reaction format, expected 2 parts: {formula}") + + left, right = parts[0].strip(), parts[1].strip() + + reactants = parse_metabolites_side(left) if left else {} + products = parse_metabolites_side(right) if right else {} + + metabolites_to_add = {} + + for met_id, coeff in reactants.items(): + if met_id in metabolites_dict: + metabolites_to_add[metabolites_dict[met_id]] = -coeff + + for met_id, coeff in products.items(): + if met_id in metabolites_dict: + metabolites_to_add[metabolites_dict[met_id]] = coeff + + reaction.add_metabolites(metabolites_to_add) + + +def parse_metabolites_side(side_str: str) -> Dict[str, float]: + """Parse one side of a reaction and extract metabolites with coefficients.""" + metabolites = {} + if not side_str or 
side_str.strip() == '': + return metabolites + + terms = side_str.split('+') + for term in terms: + term = term.strip() + if not term: + continue + + # First check if term has space-separated coefficient and metabolite + parts = term.split() + if len(parts) == 2: + # Two parts: potential coefficient + metabolite + try: + coeff = float(parts[0]) + met_id = parts[1] + # Verify the second part looks like a metabolite with compartment + if re.match(r'[A-Za-z0-9_]+(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', met_id): + metabolites[met_id] = coeff + continue + except ValueError: + pass + + # Single term - check if it's a metabolite (no coefficient) + # Updated pattern to include metabolites starting with numbers + if re.match(r'[A-Za-z0-9][A-Za-z0-9_]*(?:\[[A-Za-z0-9]+\]|_[A-Za-z0-9]+)', term): + metabolites[term] = 1.0 + else: + print(f"Warning: Could not parse metabolite term: '{term}'") + + return metabolites + + + +def set_objective_from_csv(model: cobra.Model, df: pd.DataFrame, obj_col: str = "ObjectiveCoefficient"): + """ + Sets the model's objective function based on a column of coefficients in the CSV. + Can be any reaction(s), not necessarily biomass. 
+ """ + obj_dict = {} + + for idx, row in df.iterrows(): + reaction_id = str(row['ReactionID']).strip() + coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 + if coeff != 0: + if reaction_id in model.reactions: + obj_dict[model.reactions.get_by_id(reaction_id)] = coeff + else: + print(f"Warning: reaction {reaction_id} not found in model, skipping for objective.") + + if not obj_dict: + raise ValueError("No reactions found with non-zero objective coefficient.") + + model.objective = obj_dict + print(f"Objective set with {len(obj_dict)} reactions.") + + + + +def set_medium_from_data(model: cobraModel, df: pd.DataFrame): + """Set the medium based on the 'InMedium' column in the dataframe.""" + if 'InMedium' not in df.columns: + print("No 'InMedium' column found, skipping medium setup") + return + + medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() + + medium_dict = {} + for rxn_id in medium_reactions: + if rxn_id in [r.id for r in model.reactions]: + reaction = model.reactions.get_by_id(rxn_id) + if reaction.lower_bound < 0: + medium_dict[rxn_id] = abs(reaction.lower_bound) + + if medium_dict: + model.medium = medium_dict + print(f"Medium set with {len(medium_dict)} components") + else: + print("No medium components found") +def validate_model(model: cobraModel) -> Dict[str, any]: + """Validate the model and return basic statistics.""" + validation = { + 'num_reactions': len(model.reactions), + 'num_metabolites': len(model.metabolites), + 'num_genes': len(model.genes), + 'num_compartments': len(model.compartments), + 'objective': str(model.objective), + 'medium_size': len(model.medium), + 'reversible_reactions': len([r for r in model.reactions if r.reversibility]), + 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]), + } + + try: + # Growth test + solution = model.optimize() + validation['growth_rate'] = solution.objective_value + validation['status'] = solution.status + except Exception as e: + 
validation['growth_rate'] = None + validation['status'] = f"Error: {e}" + + return validation + +def convert_genes(model, annotation): + """Rename genes using a selected annotation key in gene.notes; returns a model copy.""" + from cobra.manipulation import rename_genes + model2=model.copy() + try: + dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes} + except: + print("No annotation in gene dict!") + return -1 + rename_genes(model2,dict_genes) + + return model2 + +# ---------- Utility helpers ---------- +def _normalize_colname(col: str) -> str: + return col.strip().lower().replace(' ', '_') + +def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: + """ + Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}. + Raise ValueError if no suitable mapping is found. + """ + cols = { _normalize_colname(c): c for c in mapping_df.columns } + chosen = {} + # candidate names for each category + candidates = { + 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], + 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], + 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], + 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], + 'gene_number': ['gene_number'] + } + for key, names in candidates.items(): + for n in names: + if n in cols: + chosen[key] = cols[n] + break + return chosen + +def _validate_target_uniqueness(mapping_df: 'pd.DataFrame', + source_col: str, + target_col: str, + model_source_genes: Optional[Set[str]] = None, + logger: Optional[logging.Logger] = None) -> None: + """ + Check that, within the filtered mapping_df, each target maps to at most one source. + Log examples if duplicates are found. 
+ """ + if logger is None: + logger = logging.getLogger(__name__) + + if mapping_df.empty: + logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") + return + + # normalize temporary columns for grouping (without altering the original df) + tmp = mapping_df[[source_col, target_col]].copy() + tmp['_src_norm'] = tmp[source_col].astype(str).apply(_normalize_gene_id) + tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() + + # optionally filter to the set of model source genes + if model_source_genes is not None: + tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] + + if tmp.empty: + logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") + return + + # build reverse mapping: target -> set(sources) + grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) + # find targets with more than one source + problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} + + if problematic: + # prepare warning message with examples (limited subset) + sample_items = list(problematic.items()) + msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] + for tgt, sources in sample_items: + msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") + full_msg = "\n".join(msg_lines) + # log warning + logger.warning(full_msg) + + # if everything is fine + logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") + + +def _normalize_gene_id(g: str) -> str: + """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" + if g is None: + return "" + g = str(g).strip() + # remove common prefixes + g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) + g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) + return g + +def _is_or_only_expression(expr: str) -> bool: + """ + Check if a GPR expression contains 
only OR operators (no AND operators). + + Args: + expr: GPR expression string + + Returns: + bool: True if expression contains only OR (and parentheses) and has multiple genes, False otherwise + """ + if not expr or not expr.strip(): + return False + + # Normalize the expression + normalized = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') + + # Check if it contains any AND operators + has_and = ' and ' in normalized.lower() + + # Check if it contains OR operators + has_or = ' or ' in normalized.lower() + + # Must have OR operators and no AND operators + return has_or and not has_and + + +def _flatten_or_only_gpr(expr: str) -> str: + """ + Flatten a GPR expression that contains only OR operators by: + 1. Removing all parentheses + 2. Extracting unique gene names + 3. Joining them with ' or ' + + Args: + expr: GPR expression string with only OR operators + + Returns: + str: Flattened GPR expression + """ + if not expr or not expr.strip(): + return expr + + # Extract all gene tokens (exclude logical operators and parentheses) + gene_pattern = r'\b[A-Za-z0-9:_.-]+\b' + logical = {'and', 'or', 'AND', 'OR', '(', ')'} + + tokens = re.findall(gene_pattern, expr) + genes = [t for t in tokens if t not in logical] + + # Create set to remove duplicates, then convert back to list to maintain some order + unique_genes = list(dict.fromkeys(genes)) # Preserves insertion order + + if len(unique_genes) == 0: + return expr + elif len(unique_genes) == 1: + return unique_genes[0] + else: + return ' or '.join(unique_genes) + + +def _simplify_boolean_expression(expr: str) -> str: + """ + Simplify a boolean expression by removing duplicates while strictly preserving semantics. + This function handles simple duplicates within parentheses while being conservative about + complex expressions that could change semantics. 
+ """ + if not expr or not expr.strip(): + return expr + + # Normalize operators and whitespace + expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') + expr = ' '.join(expr.split()) # Normalize whitespace + + def simplify_parentheses_content(match_obj): + """Helper function to simplify content within parentheses.""" + content = match_obj.group(1) # Content inside parentheses + + # Only simplify if it's a pure OR or pure AND chain + if ' or ' in content and ' and ' not in content: + # Pure OR chain - safe to deduplicate + parts = [p.strip() for p in content.split(' or ') if p.strip()] + unique_parts = [] + seen = set() + for part in parts: + if part not in seen: + unique_parts.append(part) + seen.add(part) + + if len(unique_parts) == 1: + return unique_parts[0] # Remove unnecessary parentheses for single items + else: + return '(' + ' or '.join(unique_parts) + ')' + + elif ' and ' in content and ' or ' not in content: + # Pure AND chain - safe to deduplicate + parts = [p.strip() for p in content.split(' and ') if p.strip()] + unique_parts = [] + seen = set() + for part in parts: + if part not in seen: + unique_parts.append(part) + seen.add(part) + + if len(unique_parts) == 1: + return unique_parts[0] # Remove unnecessary parentheses for single items + else: + return '(' + ' and '.join(unique_parts) + ')' + else: + # Mixed operators or single item - return with parentheses as-is + return '(' + content + ')' + + def remove_duplicates_simple(parts_str: str, separator: str) -> str: + """Remove duplicates from a simple chain of operations.""" + parts = [p.strip() for p in parts_str.split(separator) if p.strip()] + + # Remove duplicates while preserving order + unique_parts = [] + seen = set() + for part in parts: + if part not in seen: + unique_parts.append(part) + seen.add(part) + + if len(unique_parts) == 1: + return unique_parts[0] + else: + return f' {separator} '.join(unique_parts) + + try: + import re + + # First, simplify content within parentheses + # 
This handles cases like (A or A) -> A and (B and B) -> B + expr_simplified = re.sub(r'\(([^()]+)\)', simplify_parentheses_content, expr) + + # Check if the resulting expression has mixed operators + has_and = ' and ' in expr_simplified + has_or = ' or ' in expr_simplified + + # Only simplify top-level if it's pure AND or pure OR + if has_and and not has_or and '(' not in expr_simplified: + # Pure AND chain at top level - safe to deduplicate + return remove_duplicates_simple(expr_simplified, 'and') + elif has_or and not has_and and '(' not in expr_simplified: + # Pure OR chain at top level - safe to deduplicate + return remove_duplicates_simple(expr_simplified, 'or') + else: + # Mixed operators or has parentheses - return the simplified version (with parentheses content cleaned) + return expr_simplified + + except Exception: + # If anything goes wrong, return the original expression + return expr + + +def translate_model_genes(model: 'cobra.Model', + mapping_df: 'pd.DataFrame', + target_nomenclature: str, + source_nomenclature: str = 'hgnc_id', + allow_many_to_one: bool = False, + logger: Optional[logging.Logger] = None) -> Tuple['cobra.Model', Dict[str, str]]: + """ + Translate model genes from source_nomenclature to target_nomenclature using a mapping table. + mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez). + + Args: + model: COBRA model to translate. + mapping_df: DataFrame containing the mapping information. + target_nomenclature: Desired target key (e.g., 'hgnc_symbol'). + source_nomenclature: Current source key in the model (default 'hgnc_id'). + allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs. + logger: Optional logger. 
+ + Returns: + Tuple containing: + - Translated COBRA model + - Dictionary mapping reaction IDs to translation issue descriptions + """ + if logger is None: + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + logger = logging.getLogger(__name__) + + logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'") + + # normalize column names and choose relevant columns + chosen = _choose_columns(mapping_df) + if not chosen: + raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.") + + # map source/target to actual dataframe column names (allow user-specified source/target keys) + # normalize input args + src_key = source_nomenclature.strip().lower() + tgt_key = target_nomenclature.strip().lower() + + # try to find the actual column names for requested keys + col_for_src = None + col_for_tgt = None + # first, try exact match + for k, actual in chosen.items(): + if k == src_key: + col_for_src = actual + if k == tgt_key: + col_for_tgt = actual + + # if not found, try mapping common names + if col_for_src is None: + possible_src_names = {k: v for k, v in chosen.items()} + # try to match by contained substring + for k, actual in possible_src_names.items(): + if src_key in k: + col_for_src = actual + break + + if col_for_tgt is None: + for k, actual in chosen.items(): + if tgt_key in k: + col_for_tgt = actual + break + + if col_for_src is None: + raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.") + if col_for_tgt is None: + raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.") + + model_source_genes = { _normalize_gene_id(g.id) for g in model.genes } + logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).") + + tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() + tmp_map[col_for_src + 
"_norm"] = tmp_map[col_for_src].astype(str).apply(_normalize_gene_id) + + filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() + + if filtered_map.empty: + logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") + + if not allow_many_to_one: + _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) + + # Crea il mapping + gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) + + # copy model + model_copy = model.copy() + + # statistics + stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0, 'flattened_or_gprs': 0} + unmapped = [] + multi = [] + + # Dictionary to store translation issues per reaction + reaction_translation_issues = {} + + original_genes = {g.id for g in model_copy.genes} + logger.info(f"Original genes count: {len(original_genes)}") + + # translate GPRs + for rxn in model_copy.reactions: + gpr = rxn.gene_reaction_rule + if gpr and gpr.strip(): + new_gpr, rxn_issues = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger, track_issues=True) + if rxn_issues: + reaction_translation_issues[rxn.id] = rxn_issues + + if new_gpr != gpr: + # Check if this GPR has translation issues and contains only OR operators + if rxn_issues and _is_or_only_expression(new_gpr): + # Flatten the GPR: remove parentheses and create set of unique genes + flattened_gpr = _flatten_or_only_gpr(new_gpr) + if flattened_gpr != new_gpr: + stats['flattened_or_gprs'] += 1 + logger.debug(f"Flattened OR-only GPR with issues for {rxn.id}: '{new_gpr}' -> '{flattened_gpr}'") + new_gpr = flattened_gpr + + simplified_gpr = _simplify_boolean_expression(new_gpr) + if simplified_gpr != new_gpr: + stats['simplified_gprs'] += 1 + logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") + 
                rxn.gene_reaction_rule = simplified_gpr
                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")

    # update model genes based on new GPRs (drop orphans, add newly referenced ones)
    _update_model_genes(model_copy, logger)

    # final logging of translation statistics
    _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)

    logger.info("Translation finished")
    return model_copy, reaction_translation_issues


# ---------- helper functions ----------
def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
    """
    Build mapping dict: source_id -> list of target_ids.
    Normalizes source IDs (removes prefixes like 'HGNC:' etc) via _normalize_gene_id;
    target IDs are only stripped of surrounding whitespace.

    Args:
        mapping_df: DataFrame holding at least source_col and target_col.
        source_col: Column with source-nomenclature gene IDs.
        target_col: Column with target-nomenclature gene IDs.
        logger: Logger for progress/statistics messages.

    Returns:
        Dict[str, List[str]]: normalized source ID -> unique target IDs (insertion order).
    """
    df = mapping_df[[source_col, target_col]].dropna().copy()
    # normalize to string before ID normalization
    df[source_col] = df[source_col].astype(str).apply(_normalize_gene_id)
    df[target_col] = df[target_col].astype(str).str.strip()

    # drop exact duplicate (source, target) rows
    df = df.drop_duplicates()

    logger.info(f"Creating mapping from {len(df)} rows")

    mapping = defaultdict(list)
    for _, row in df.iterrows():
        s = row[source_col]
        t = row[target_col]
        # keep each target once per source, preserving first-seen order
        if t not in mapping[s]:
            mapping[s].append(t)

    # stats: how many sources map 1:1 vs 1:many
    one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
    one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
    logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
    return dict(mapping)


def _translate_gpr(gpr_string: str,
                   gene_mapping: Dict[str, List[str]],
                   stats: Dict[str, int],
                   unmapped_genes: List[str],
                   multi_mapping_genes: List[Tuple[str, List[str]]],
                   logger: logging.Logger,
                   track_issues: bool = False) -> Union[str, Tuple[str, str]]:
    """
    Translate genes inside a GPR string using gene_mapping.
    Returns new GPR string, and optionally translation issues if track_issues=True.

    A 1:many mapping is expanded to "(t1 or t2 ...)"; unmapped tokens are left
    as-is. Side effects: mutates stats, unmapped_genes, multi_mapping_genes.

    Args:
        gpr_string: Original gene_reaction_rule expression.
        gene_mapping: Normalized source ID -> list of target IDs.
        stats: Shared counters updated in place.
        unmapped_genes: Accumulator for tokens with no mapping (deduplicated).
        multi_mapping_genes: Accumulator of (token, targets) for 1:many cases.
        logger: Logger for debug messages.
        track_issues: When True, also return a ";"-joined issue description.

    Returns:
        str, or (str, str) when track_issues=True.
    """
    # Generic token pattern: letters, digits, :, _, -, ., (captures HGNC:1234, ENSG000..., symbols)
    token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    tokens = re.findall(token_pattern, gpr_string)

    # strip boolean operators so only gene tokens remain
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    tokens = [t for t in tokens if t not in logical]

    new_gpr = gpr_string
    issues = []

    # NOTE(review): stats['translated'] counts unique tokens per GPR, so totals
    # across reactions count a gene once per reaction it appears in — confirm
    # this is the intended semantics for the reported statistics.
    for token in sorted(set(tokens), key=lambda x: -len(x)):  # longer tokens first to avoid partial replacement
        norm = _normalize_gene_id(token)
        if norm in gene_mapping:
            targets = gene_mapping[norm]
            stats['translated'] += 1
            if len(targets) == 1:
                stats['one_to_one'] += 1
                replacement = targets[0]
            else:
                stats['one_to_many'] += 1
                multi_mapping_genes.append((token, targets))
                replacement = "(" + " or ".join(targets) + ")"
                # only 1:many expansions are recorded as issues here;
                # many-to-one collisions are detected in the pass below
                if track_issues:
                    issues.append(f"{token} -> {' or '.join(targets)}")

            # word-boundary substitution of every occurrence of this token
            # NOTE(review): a replacement inserted here could itself match a
            # later (shorter) token's pattern — verify target/source ID spaces
            # are disjoint, otherwise a cascade rewrite is possible.
            pattern = r'\b' + re.escape(token) + r'\b'
            new_gpr = re.sub(pattern, replacement, new_gpr)
        else:
            stats['not_found'] += 1
            if token not in unmapped_genes:
                unmapped_genes.append(token)
            logger.debug(f"Token not found in mapping (left as-is): {token}")

    # Check for many-to-one cases (multiple source genes mapping to same target)
    if track_issues:
        # Build reverse mapping to detect many-to-one cases from original tokens
        original_to_target = {}

        for orig_token in tokens:
            norm = _normalize_gene_id(orig_token)
            if norm in gene_mapping:
                targets = gene_mapping[norm]
                for target in targets:
                    if target not in original_to_target:
                        original_to_target[target] = []
                    if orig_token not in original_to_target[target]:
                        original_to_target[target].append(orig_token)

        # Identify many-to-one mappings in this specific GPR
        for target, original_genes in original_to_target.items():
            if len(original_genes) > 1:
                issues.append(f"{' or '.join(original_genes)} -> {target}")

    issue_text = "; ".join(issues) if issues else ""

    if track_issues:
        return new_gpr, issue_text
    else:
        return new_gpr

def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
    """
    Rebuild model.genes from gene_reaction_rule content.
    Removes genes not referenced and adds missing ones.

    Args:
        model: COBRA model mutated in place.
        logger: Logger for the removed/added summary.
    """
    # collect the set of (normalized) gene tokens referenced by any GPR
    gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
    logical = {'and', 'or', 'AND', 'OR', '(', ')'}
    genes_in_gpr: Set[str] = set()

    for rxn in model.reactions:
        gpr = rxn.gene_reaction_rule
        if gpr and gpr.strip():
            toks = re.findall(gene_pattern, gpr)
            toks = [t for t in toks if t not in logical]
            # normalize IDs consistent with mapping normalization
            toks = [_normalize_gene_id(t) for t in toks]
            genes_in_gpr.update(toks)

    # existing gene ids (snapshot taken before removal; still valid for the
    # add pass below because removal only drops IDs absent from genes_in_gpr)
    existing = {g.id for g in model.genes}

    # remove genes no longer referenced by any GPR
    to_remove = [gid for gid in existing if gid not in genes_in_gpr]
    removed = 0
    for gid in to_remove:
        try:
            gene_obj = model.genes.get_by_id(gid)
            model.genes.remove(gene_obj)
            removed += 1
        except Exception:
            # safe-ignore: gene already gone or container refused removal
            pass

    # add genes referenced by GPRs but missing from the model
    added = 0
    for gid in genes_in_gpr:
        if gid not in existing:
            new_gene = cobra.Gene(gid)
            # NOTE(review): cobra's model.genes is a DictList; which of these
            # mutation methods exists varies by cobra version, hence the chain.
            try:
                model.genes.add(new_gene)
            except Exception:
                # fallback: if model.genes doesn't support add, try append or model.add_genes
                try:
                    model.genes.append(new_gene)
                except Exception:
                    try:
                        model.add_genes([new_gene])
                    except Exception:
                        logger.warning(f"Could not add gene object for {gid}")
            # NOTE(review): 'added' is incremented even when every add attempt
            # failed, so the summary may overcount — confirm acceptable.
            added += 1

    logger.info(f"Model genes updated: removed {removed}, added {added}")


def export_model_to_tabular(model: cobraModel,
                            output_path: str,
                            translation_issues: Dict = None,
                            include_objective: bool = True,
                            save_function = None) -> pd.DataFrame:
    """
    Export a COBRA model to tabular format with optional components.

    Args:
        model: COBRA model to export
        output_path: Path where to save the tabular file
        translation_issues: Optional dict of {reaction_id: issues} from gene translation
        include_objective: Whether to include objective coefficient column
        save_function: Optional custom save function, if None uses pd.DataFrame.to_csv

    Returns:
        pd.DataFrame: The merged tabular data
    """
    # Generate model data (GPRs, formulas, bounds, medium, compartments)
    rules = generate_rules(model, asParsed=False)

    reactions = generate_reactions(model, asParsed=False)
    bounds = generate_bounds(model)
    medium = get_medium(model)
    compartments = generate_compartments(model)

    # Create base DataFrames keyed by ReactionID
    df_rules = pd.DataFrame(list(rules.items()), columns=["ReactionID", "GPR"])
    df_reactions = pd.DataFrame(list(reactions.items()), columns=["ReactionID", "Formula"])
    # bounds is indexed by reaction ID; promote the index to a column
    df_bounds = bounds.reset_index().rename(columns={"index": "ReactionID"})
    df_medium = medium.rename(columns={"reaction": "ReactionID"})
    df_medium["InMedium"] = True

    # Start merging (outer joins keep reactions missing from either side)
    merged = df_reactions.merge(df_rules, on="ReactionID", how="outer")
    merged = merged.merge(df_bounds, on="ReactionID", how="outer")

    # Add objective coefficients if requested
    if include_objective:
        objective_function = extract_objective_coefficients(model)
        merged = merged.merge(objective_function, on="ReactionID", how="outer")

    # Add compartments/pathways if they exist
    if compartments is not None:
        merged = merged.merge(compartments, on="ReactionID", how="outer")

    # Add medium information (left join: non-medium reactions get NaN here)
    merged = merged.merge(df_medium, on="ReactionID", how="left")

    # Add translation issues if provided
    if translation_issues:
        df_translation_issues = pd.DataFrame([
            {"ReactionID": rxn_id, "TranslationIssues": issues}
            for rxn_id, issues in translation_issues.items()
        ])
        if not df_translation_issues.empty:
            merged = merged.merge(df_translation_issues, on="ReactionID", how="left")
            merged["TranslationIssues"] = merged["TranslationIssues"].fillna("")

    # Final processing: medium reactions sorted to the top of the table
    merged["InMedium"] = merged["InMedium"].fillna(False)
    merged = merged.sort_values(by="InMedium", ascending=False)

    # Save the file (tab-separated by default)
    if save_function:
        save_function(merged, output_path)
    else:
        merged.to_csv(output_path, sep="\t", index=False)

    return merged


def _log_translation_statistics(stats: Dict[str, int],
                                unmapped_genes: List[str],
                                multi_mapping_genes: List[Tuple[str, List[str]]],
                                original_genes: Set[str],
                                final_genes,
                                logger: logging.Logger):
    """
    Log a summary of a GPR translation run: counters, gene-count delta,
    a sample of unmapped tokens and multi-mapping examples.

    Args:
        stats: Counter dict filled by _translate_gpr / the translation loop.
        unmapped_genes: Tokens that had no mapping (first 20 are shown).
        multi_mapping_genes: (source, targets) pairs for 1:many mappings.
        original_genes: Gene IDs present before translation.
        final_genes: Iterable of gene objects after translation.
        logger: Destination logger.
    """
    logger.info("=== TRANSLATION STATISTICS ===")
    logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
    logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
    logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
    logger.info(f"Flattened OR-only GPRs with issues: {stats.get('flattened_or_gprs', 0)}")

    final_ids = {g.id for g in final_genes}
    logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")

    if unmapped_genes:
        logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
    if multi_mapping_genes:
        logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
        for orig, targets in multi_mapping_genes[:10]:
            logger.info(f" {orig} -> {', '.join(targets)}")

    # Log summary of flattened GPRs if any
    if stats.get('flattened_or_gprs', 0) > 0:
        logger.info(f"Flattened {stats['flattened_or_gprs']} OR-only GPRs that had translation issues (removed parentheses, created unique gene sets)")
