diff COBRAxy/utils/model_utils.py @ 455:4e2bc80764b6 draft

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 15:05:54 +0000
parents f8b1761eee37
children a6e45049c1b9
line wrap: on
line diff
--- a/COBRAxy/utils/model_utils.py	Thu Sep 11 21:02:09 2025 +0000
+++ b/COBRAxy/utils/model_utils.py	Fri Sep 12 15:05:54 2025 +0000
@@ -460,7 +460,8 @@
         'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
         'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
         'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
-        'entrez_id': ['entrez', 'entrez_id', 'entrezgene']
+        'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
+        'gene_number': ['gene_number']
     }
     for key, names in candidates.items():
         for n in names:
@@ -510,16 +511,13 @@
 
     if problematic:
         # prepara messaggio di errore con esempi (fino a 20)
-        sample_items = list(problematic.items())[:20]
+        sample_items = list(problematic.items())
         msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
         for tgt, sources in sample_items:
             msg_lines.append(f"  - target '{tgt}' <- sources: {', '.join(sources)}")
-        if len(problematic) > len(sample_items):
-            msg_lines.append(f"  ... and {len(problematic) - len(sample_items)} more cases.")
         full_msg = "\n".join(msg_lines)
         # loggare e sollevare errore
-        logger.error(full_msg)
-        raise ValueError(full_msg)
+        logger.warning(full_msg)
 
     # se tutto ok
     logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
@@ -535,16 +533,129 @@
     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
     return g
 
+def _simplify_boolean_expression(expr: str) -> str:
+    """
+    Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze.
+    Gestisce espressioni con 'and' e 'or'.
+    """
+    if not expr or not expr.strip():
+        return expr
+    
+    # Normalizza gli operatori
+    expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
+    
+    # Funzione ricorsiva per processare le espressioni
+    def process_expression(s: str) -> str:
+        s = s.strip()
+        if not s:
+            return s
+            
+        # Gestisci le parentesi
+        while '(' in s:
+            # Trova la parentesi più interna
+            start = -1
+            for i, c in enumerate(s):
+                if c == '(':
+                    start = i
+                elif c == ')' and start != -1:
+                    # Processa il contenuto tra parentesi
+                    inner = s[start+1:i]
+                    processed_inner = process_expression(inner)
+                    s = s[:start] + processed_inner + s[i+1:]
+                    break
+            else:
+                break
+        
+        # Dividi per 'or' al livello più alto
+        or_parts = []
+        current_part = ""
+        paren_count = 0
+        
+        tokens = s.split()
+        i = 0
+        while i < len(tokens):
+            token = tokens[i]
+            if token == 'or' and paren_count == 0:
+                if current_part.strip():
+                    or_parts.append(current_part.strip())
+                current_part = ""
+            else:
+                if token.count('(') > token.count(')'):
+                    paren_count += token.count('(') - token.count(')')
+                elif token.count(')') > token.count('('):
+                    paren_count -= token.count(')') - token.count('(')
+                current_part += token + " "
+            i += 1
+        
+        if current_part.strip():
+            or_parts.append(current_part.strip())
+        
+        # Processa ogni parte OR
+        processed_or_parts = []
+        for or_part in or_parts:
+            # Dividi per 'and' all'interno di ogni parte OR
+            and_parts = []
+            current_and = ""
+            paren_count = 0
+            
+            and_tokens = or_part.split()
+            j = 0
+            while j < len(and_tokens):
+                token = and_tokens[j]
+                if token == 'and' and paren_count == 0:
+                    if current_and.strip():
+                        and_parts.append(current_and.strip())
+                    current_and = ""
+                else:
+                    if token.count('(') > token.count(')'):
+                        paren_count += token.count('(') - token.count(')')
+                    elif token.count(')') > token.count('('):
+                        paren_count -= token.count(')') - token.count('(')
+                    current_and += token + " "
+                j += 1
+            
+            if current_and.strip():
+                and_parts.append(current_and.strip())
+            
+            # Rimuovi duplicati nelle parti AND
+            unique_and_parts = list(dict.fromkeys(and_parts))  # mantiene l'ordine
+            
+            if len(unique_and_parts) == 1:
+                processed_or_parts.append(unique_and_parts[0])
+            elif len(unique_and_parts) > 1:
+                processed_or_parts.append(" and ".join(unique_and_parts))
+        
+        # Rimuovi duplicati nelle parti OR
+        unique_or_parts = list(dict.fromkeys(processed_or_parts))
+        
+        if len(unique_or_parts) == 1:
+            return unique_or_parts[0]
+        elif len(unique_or_parts) > 1:
+            return " or ".join(unique_or_parts)
+        else:
+            return ""
+    
+    try:
+        return process_expression(expr)
+    except Exception:
+        # Se la semplificazione fallisce, ritorna l'espressione originale
+        return expr
+
 # ---------- Main public function ----------
 def translate_model_genes(model: 'cobra.Model',
                          mapping_df: 'pd.DataFrame',
                          target_nomenclature: str,
                          source_nomenclature: str = 'hgnc_id',
+                         allow_many_to_one: bool = False,
                          logger: Optional[logging.Logger] = None) -> 'cobra.Model':
     """
     Translate model genes from source_nomenclature to target_nomenclature.
     mapping_df should contain at least columns that allow the mapping
     (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
+    
+    Args:
+        allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target
+                          e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target.
     """
     if logger is None:
         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -563,7 +674,6 @@
     tgt_key = target_nomenclature.strip().lower()
 
     # try to find the actual column names for requested keys
-    # support synonyms: user may pass "ensg" or "ENSG" etc.
     col_for_src = None
     col_for_tgt = None
     # first, try exact match
@@ -575,8 +685,6 @@
 
     # if not found, try mapping common names
     if col_for_src is None:
-        # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful
-        # we require at least the source column to exist
         possible_src_names = {k: v for k, v in chosen.items()}
         # try to match by contained substring
         for k, actual in possible_src_names.items():
@@ -594,7 +702,6 @@
         raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
     if col_for_tgt is None:
         raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
-    
 
     model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
     logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
@@ -604,24 +711,22 @@
 
     filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
 
-    # Se non ci sono righe rilevanti, avvisa (possono non esserci mapping per i geni presenti)
+    # Se non ci sono righe rilevanti, avvisa
     if filtered_map.empty:
         logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
 
-    # --- VALIDAZIONE: nessun target deve essere mappato da piu' di un source (nell'insieme filtrato) ---
-    # Se vuoi la verifica su tutto il dataframe (non solo sui geni del modello), passa model_source_genes=None.
-    _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
+    # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one ---
+    if not allow_many_to_one:
+        _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
 
-    # Ora crea il mapping solo sul sottoinsieme filtrato (piu' efficiente)
-    # ATTENZIONE: _create_gene_mapping si aspetta i nomi originali delle colonne
-    # quindi passiamo filtered_map con le colonne rimappate (senza la col_for_src + "_norm")
+    # Crea il mapping
     gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
 
     # copy model
     model_copy = model.copy()
 
     # statistics
-    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0}
+    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
     unmapped = []
     multi = []
 
@@ -634,8 +739,13 @@
         if gpr and gpr.strip():
             new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
             if new_gpr != gpr:
-                rxn.gene_reaction_rule = new_gpr
-                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'")
+                # Semplifica l'espressione booleana per rimuovere duplicati
+                simplified_gpr = _simplify_boolean_expression(new_gpr)
+                if simplified_gpr != new_gpr:
+                    stats['simplified_gprs'] += 1
+                    logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
+                rxn.gene_reaction_rule = simplified_gpr
+                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
 
     # update model genes based on new GPRs
     _update_model_genes(model_copy, logger)
@@ -783,6 +893,7 @@
     logger.info("=== TRANSLATION STATISTICS ===")
     logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
     logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
+    logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
 
     final_ids = {g.id for g in final_genes}
     logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")