comparison COBRAxy/utils/model_utils.py @ 455:4e2bc80764b6 draft

Uploaded
author francesco_lapi
date Fri, 12 Sep 2025 15:05:54 +0000
parents f8b1761eee37
children a6e45049c1b9
comparison
equal deleted inserted replaced
454:3654c08668f1 455:4e2bc80764b6
458 # possibili nomi per ciascuna categoria 458 # possibili nomi per ciascuna categoria
459 candidates = { 459 candidates = {
460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], 460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], 461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], 462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'] 463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
464 'gene_number': ['gene_number']
464 } 465 }
465 for key, names in candidates.items(): 466 for key, names in candidates.items():
466 for n in names: 467 for n in names:
467 if n in cols: 468 if n in cols:
468 chosen[key] = cols[n] 469 chosen[key] = cols[n]
508 # trova target con più di 1 source 509 # trova target con più di 1 source
509 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} 510 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
510 511
511 if problematic: 512 if problematic:
512 # prepara messaggio di errore con esempi (fino a 20) 513 # prepara messaggio di errore con esempi (fino a 20)
513 sample_items = list(problematic.items())[:20] 514 sample_items = list(problematic.items())
514 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] 515 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
515 for tgt, sources in sample_items: 516 for tgt, sources in sample_items:
516 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") 517 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
517 if len(problematic) > len(sample_items):
518 msg_lines.append(f" ... and {len(problematic) - len(sample_items)} more cases.")
519 full_msg = "\n".join(msg_lines) 518 full_msg = "\n".join(msg_lines)
520 # loggare e sollevare errore 519 # loggare e sollevare errore
521 logger.error(full_msg) 520 logger.warning(full_msg)
522 raise ValueError(full_msg)
523 521
524 # se tutto ok 522 # se tutto ok
525 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") 523 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
526 524
527 525
533 # remove common prefixes 531 # remove common prefixes
534 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) 532 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
535 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) 533 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
536 return g 534 return g
537 535
536 def _simplify_boolean_expression(expr: str) -> str:
537 """
538 Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze.
539 Gestisce espressioni con 'and' e 'or'.
540 """
541 if not expr or not expr.strip():
542 return expr
543
544 # Normalizza gli operatori
545 expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
546
547 # Funzione ricorsiva per processare le espressioni
548 def process_expression(s: str) -> str:
549 s = s.strip()
550 if not s:
551 return s
552
553 # Gestisci le parentesi
554 while '(' in s:
555 # Trova la parentesi più interna
556 start = -1
557 for i, c in enumerate(s):
558 if c == '(':
559 start = i
560 elif c == ')' and start != -1:
561 # Processa il contenuto tra parentesi
562 inner = s[start+1:i]
563 processed_inner = process_expression(inner)
564 s = s[:start] + processed_inner + s[i+1:]
565 break
566 else:
567 break
568
569 # Dividi per 'or' al livello più alto
570 or_parts = []
571 current_part = ""
572 paren_count = 0
573
574 tokens = s.split()
575 i = 0
576 while i < len(tokens):
577 token = tokens[i]
578 if token == 'or' and paren_count == 0:
579 if current_part.strip():
580 or_parts.append(current_part.strip())
581 current_part = ""
582 else:
583 if token.count('(') > token.count(')'):
584 paren_count += token.count('(') - token.count(')')
585 elif token.count(')') > token.count('('):
586 paren_count -= token.count(')') - token.count('(')
587 current_part += token + " "
588 i += 1
589
590 if current_part.strip():
591 or_parts.append(current_part.strip())
592
593 # Processa ogni parte OR
594 processed_or_parts = []
595 for or_part in or_parts:
596 # Dividi per 'and' all'interno di ogni parte OR
597 and_parts = []
598 current_and = ""
599 paren_count = 0
600
601 and_tokens = or_part.split()
602 j = 0
603 while j < len(and_tokens):
604 token = and_tokens[j]
605 if token == 'and' and paren_count == 0:
606 if current_and.strip():
607 and_parts.append(current_and.strip())
608 current_and = ""
609 else:
610 if token.count('(') > token.count(')'):
611 paren_count += token.count('(') - token.count(')')
612 elif token.count(')') > token.count('('):
613 paren_count -= token.count(')') - token.count('(')
614 current_and += token + " "
615 j += 1
616
617 if current_and.strip():
618 and_parts.append(current_and.strip())
619
620 # Rimuovi duplicati nelle parti AND
621 unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine
622
623 if len(unique_and_parts) == 1:
624 processed_or_parts.append(unique_and_parts[0])
625 elif len(unique_and_parts) > 1:
626 processed_or_parts.append(" and ".join(unique_and_parts))
627
628 # Rimuovi duplicati nelle parti OR
629 unique_or_parts = list(dict.fromkeys(processed_or_parts))
630
631 if len(unique_or_parts) == 1:
632 return unique_or_parts[0]
633 elif len(unique_or_parts) > 1:
634 return " or ".join(unique_or_parts)
635 else:
636 return ""
637
638 try:
639 return process_expression(expr)
640 except Exception:
641 # Se la semplificazione fallisce, ritorna l'espressione originale
642 return expr
643
538 # ---------- Main public function ---------- 644 # ---------- Main public function ----------
539 def translate_model_genes(model: 'cobra.Model', 645 def translate_model_genes(model: 'cobra.Model',
540 mapping_df: 'pd.DataFrame', 646 mapping_df: 'pd.DataFrame',
541 target_nomenclature: str, 647 target_nomenclature: str,
542 source_nomenclature: str = 'hgnc_id', 648 source_nomenclature: str = 'hgnc_id',
649 allow_many_to_one: bool = False,
543 logger: Optional[logging.Logger] = None) -> 'cobra.Model': 650 logger: Optional[logging.Logger] = None) -> 'cobra.Model':
544 """ 651 """
545 Translate model genes from source_nomenclature to target_nomenclature. 652 Translate model genes from source_nomenclature to target_nomenclature.
546 mapping_df should contain at least columns that allow the mapping 653 mapping_df should contain at least columns that allow the mapping
547 (e.g. ensg, hgnc_id, hgnc_symbol, entrez). 654 (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
655
656 Args:
657 allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target
658 e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target.
548 """ 659 """
549 if logger is None: 660 if logger is None:
550 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') 661 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
551 logger = logging.getLogger(__name__) 662 logger = logging.getLogger(__name__)
552 663
561 # normalize input args 672 # normalize input args
562 src_key = source_nomenclature.strip().lower() 673 src_key = source_nomenclature.strip().lower()
563 tgt_key = target_nomenclature.strip().lower() 674 tgt_key = target_nomenclature.strip().lower()
564 675
565 # try to find the actual column names for requested keys 676 # try to find the actual column names for requested keys
566 # support synonyms: user may pass "ensg" or "ENSG" etc.
567 col_for_src = None 677 col_for_src = None
568 col_for_tgt = None 678 col_for_tgt = None
569 # first, try exact match 679 # first, try exact match
570 for k, actual in chosen.items(): 680 for k, actual in chosen.items():
571 if k == src_key: 681 if k == src_key:
573 if k == tgt_key: 683 if k == tgt_key:
574 col_for_tgt = actual 684 col_for_tgt = actual
575 685
576 # if not found, try mapping common names 686 # if not found, try mapping common names
577 if col_for_src is None: 687 if col_for_src is None:
578 # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful
579 # we require at least the source column to exist
580 possible_src_names = {k: v for k, v in chosen.items()} 688 possible_src_names = {k: v for k, v in chosen.items()}
581 # try to match by contained substring 689 # try to match by contained substring
582 for k, actual in possible_src_names.items(): 690 for k, actual in possible_src_names.items():
583 if src_key in k: 691 if src_key in k:
584 col_for_src = actual 692 col_for_src = actual
592 700
593 if col_for_src is None: 701 if col_for_src is None:
594 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.") 702 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
595 if col_for_tgt is None: 703 if col_for_tgt is None:
596 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.") 704 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
597
598 705
599 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes } 706 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
600 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).") 707 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
601 708
602 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() 709 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
603 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) 710 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id)
604 711
605 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() 712 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
606 713
607 # Se non ci sono righe rilevanti, avvisa (possono non esserci mapping per i geni presenti) 714 # Se non ci sono righe rilevanti, avvisa
608 if filtered_map.empty: 715 if filtered_map.empty:
609 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") 716 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
610 717
611 # --- VALIDAZIONE: nessun target deve essere mappato da piu' di un source (nell'insieme filtrato) --- 718 # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one ---
612 # Se vuoi la verifica su tutto il dataframe (non solo sui geni del modello), passa model_source_genes=None. 719 if not allow_many_to_one:
613 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) 720 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
614 721
615 # Ora crea il mapping solo sul sottoinsieme filtrato (piu' efficiente) 722 # Crea il mapping
616 # ATTENZIONE: _create_gene_mapping si aspetta i nomi originali delle colonne
617 # quindi passiamo filtered_map con le colonne rimappate (senza la col_for_src + "_norm")
618 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) 723 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
619 724
620 # copy model 725 # copy model
621 model_copy = model.copy() 726 model_copy = model.copy()
622 727
623 # statistics 728 # statistics
624 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0} 729 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
625 unmapped = [] 730 unmapped = []
626 multi = [] 731 multi = []
627 732
628 original_genes = {g.id for g in model_copy.genes} 733 original_genes = {g.id for g in model_copy.genes}
629 logger.info(f"Original genes count: {len(original_genes)}") 734 logger.info(f"Original genes count: {len(original_genes)}")
632 for rxn in model_copy.reactions: 737 for rxn in model_copy.reactions:
633 gpr = rxn.gene_reaction_rule 738 gpr = rxn.gene_reaction_rule
634 if gpr and gpr.strip(): 739 if gpr and gpr.strip():
635 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) 740 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
636 if new_gpr != gpr: 741 if new_gpr != gpr:
637 rxn.gene_reaction_rule = new_gpr 742 # Semplifica l'espressione booleana per rimuovere duplicati
638 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'") 743 simplified_gpr = _simplify_boolean_expression(new_gpr)
744 if simplified_gpr != new_gpr:
745 stats['simplified_gprs'] += 1
746 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
747 rxn.gene_reaction_rule = simplified_gpr
748 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
639 749
640 # update model genes based on new GPRs 750 # update model genes based on new GPRs
641 _update_model_genes(model_copy, logger) 751 _update_model_genes(model_copy, logger)
642 752
643 # final logging 753 # final logging
781 final_genes, 891 final_genes,
782 logger: logging.Logger): 892 logger: logging.Logger):
783 logger.info("=== TRANSLATION STATISTICS ===") 893 logger.info("=== TRANSLATION STATISTICS ===")
784 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})") 894 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
785 logger.info(f"Not found tokens: {stats.get('not_found', 0)}") 895 logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
896 logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
786 897
787 final_ids = {g.id for g in final_genes} 898 final_ids = {g.id for g in final_genes}
788 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}") 899 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
789 900
790 if unmapped_genes: 901 if unmapped_genes: