Mercurial > repos > bimib > cobraxy
diff COBRAxy/utils/model_utils.py @ 455:4e2bc80764b6 draft
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 15:05:54 +0000 |
parents | f8b1761eee37 |
children | a6e45049c1b9 |
line wrap: on
line diff
--- a/COBRAxy/utils/model_utils.py Thu Sep 11 21:02:09 2025 +0000 +++ b/COBRAxy/utils/model_utils.py Fri Sep 12 15:05:54 2025 +0000 @@ -460,7 +460,8 @@ 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], - 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'] + 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], + 'gene_number': ['gene_number'] } for key, names in candidates.items(): for n in names: @@ -510,16 +511,13 @@ if problematic: # prepara messaggio di errore con esempi (fino a 20) - sample_items = list(problematic.items())[:20] + sample_items = list(problematic.items()) msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] for tgt, sources in sample_items: msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") - if len(problematic) > len(sample_items): - msg_lines.append(f" ... and {len(problematic) - len(sample_items)} more cases.") full_msg = "\n".join(msg_lines) # loggare e sollevare errore - logger.error(full_msg) - raise ValueError(full_msg) + logger.warning(full_msg) # se tutto ok logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") @@ -535,16 +533,129 @@ g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) return g +def _simplify_boolean_expression(expr: str) -> str: + """ + Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze. + Gestisce espressioni con 'and' e 'or'. + """ + if not expr or not expr.strip(): + return expr + + # Normalizza gli operatori + expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') + + # Funzione ricorsiva per processare le espressioni + def process_expression(s: str) -> str: + s = s.strip() + if not s: + return s + + # Gestisci le parentesi + while '(' in s: + # Trova la parentesi più interna + start = -1 + for i, c in enumerate(s): + if c == '(': + start = i + elif c == ')' and start != -1: + # Processa il contenuto tra parentesi + inner = s[start+1:i] + processed_inner = process_expression(inner) + s = s[:start] + processed_inner + s[i+1:] + break + else: + break + + # Dividi per 'or' al livello più alto + or_parts = [] + current_part = "" + paren_count = 0 + + tokens = s.split() + i = 0 + while i < len(tokens): + token = tokens[i] + if token == 'or' and paren_count == 0: + if current_part.strip(): + or_parts.append(current_part.strip()) + current_part = "" + else: + if token.count('(') > token.count(')'): + paren_count += token.count('(') - token.count(')') + elif token.count(')') > token.count('('): + paren_count -= token.count(')') - token.count('(') + current_part += token + " " + i += 1 + + if current_part.strip(): + or_parts.append(current_part.strip()) + + # Processa ogni parte OR + processed_or_parts = [] + for or_part in or_parts: + # Dividi per 'and' all'interno di ogni parte OR + and_parts = [] + current_and = "" + paren_count = 0 + + and_tokens = or_part.split() + j = 0 + while j < len(and_tokens): + token = and_tokens[j] + if token == 'and' and paren_count == 0: + if current_and.strip(): + and_parts.append(current_and.strip()) + current_and = "" + else: + if token.count('(') > token.count(')'): + paren_count += token.count('(') - token.count(')') + elif token.count(')') > token.count('('): + paren_count -= token.count(')') - token.count('(') + current_and += token + " " + j += 1 + + if current_and.strip(): + and_parts.append(current_and.strip()) + + # Rimuovi duplicati nelle parti AND + unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine + + if len(unique_and_parts) == 1: + processed_or_parts.append(unique_and_parts[0]) + elif len(unique_and_parts) > 1: + processed_or_parts.append(" and ".join(unique_and_parts)) + + # Rimuovi duplicati nelle parti OR + unique_or_parts = list(dict.fromkeys(processed_or_parts)) + + if len(unique_or_parts) == 1: + return unique_or_parts[0] + elif len(unique_or_parts) > 1: + return " or ".join(unique_or_parts) + else: + return "" + + try: + return process_expression(expr) + except Exception: + # Se la semplificazione fallisce, ritorna l'espressione originale + return expr + # ---------- Main public function ---------- def translate_model_genes(model: 'cobra.Model', mapping_df: 'pd.DataFrame', target_nomenclature: str, source_nomenclature: str = 'hgnc_id', + allow_many_to_one: bool = False, logger: Optional[logging.Logger] = None) -> 'cobra.Model': """ Translate model genes from source_nomenclature to target_nomenclature. mapping_df should contain at least columns that allow the mapping (e.g. ensg, hgnc_id, hgnc_symbol, entrez). + + Args: + allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target + e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target. """ if logger is None: logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -563,7 +674,6 @@ tgt_key = target_nomenclature.strip().lower() # try to find the actual column names for requested keys - # support synonyms: user may pass "ensg" or "ENSG" etc. col_for_src = None col_for_tgt = None # first, try exact match @@ -575,8 +685,6 @@ # if not found, try mapping common names if col_for_src is None: - # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful - # we require at least the source column to exist possible_src_names = {k: v for k, v in chosen.items()} # try to match by contained substring for k, actual in possible_src_names.items(): @@ -594,7 +702,6 @@ raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.") if col_for_tgt is None: raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.") - model_source_genes = { _normalize_gene_id(g.id) for g in model.genes } logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).") @@ -604,24 +711,22 @@ filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() - # Se non ci sono righe rilevanti, avvisa (possono non esserci mapping per i geni presenti) + # Se non ci sono righe rilevanti, avvisa if filtered_map.empty: logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") - # --- VALIDAZIONE: nessun target deve essere mappato da piu' di un source (nell'insieme filtrato) --- - # Se vuoi la verifica su tutto il dataframe (non solo sui geni del modello), passa model_source_genes=None. - _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) + # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one --- + if not allow_many_to_one: + _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) - # Ora crea il mapping solo sul sottoinsieme filtrato (piu' efficiente) - # ATTENZIONE: _create_gene_mapping si aspetta i nomi originali delle colonne - # quindi passiamo filtered_map con le colonne rimappate (senza la col_for_src + "_norm") + # Crea il mapping gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) # copy model model_copy = model.copy() # statistics - stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0} + stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0} unmapped = [] multi = [] @@ -634,8 +739,13 @@ if gpr and gpr.strip(): new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) if new_gpr != gpr: - rxn.gene_reaction_rule = new_gpr - logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'") + # Semplifica l'espressione booleana per rimuovere duplicati + simplified_gpr = _simplify_boolean_expression(new_gpr) + if simplified_gpr != new_gpr: + stats['simplified_gprs'] += 1 + logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") + rxn.gene_reaction_rule = simplified_gpr + logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'") # update model genes based on new GPRs _update_model_genes(model_copy, logger) @@ -783,6 +893,7 @@ logger.info("=== TRANSLATION STATISTICS ===") logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})") logger.info(f"Not found tokens: {stats.get('not_found', 0)}") + logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}") final_ids = {g.id for g in final_genes} logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")