comparison COBRAxy/utils/model_utils.py @ 455:4e2bc80764b6 draft
Uploaded
author | francesco_lapi
date | Fri, 12 Sep 2025 15:05:54 +0000
parents | f8b1761eee37
children | a6e45049c1b9
454:3654c08668f1 | 455:4e2bc80764b6 |
---|---|
458 # possible names for each category | 458 # possible names for each category |
459 candidates = { | 459 candidates = { |
460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], | 460 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], |
461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], | 461 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], |
462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], | 462 'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'], |
463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'] | 463 'entrez_id': ['entrez', 'entrez_id', 'entrezgene'], |
| 464 'gene_number': ['gene_number'] |
464 } | 465 } |
465 for key, names in candidates.items(): | 466 for key, names in candidates.items(): |
466 for n in names: | 467 for n in names: |
467 if n in cols: | 468 if n in cols: |
468 chosen[key] = cols[n] | 469 chosen[key] = cols[n] |
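
For orientation, here is a minimal, self-contained sketch of how this synonym table resolves actual dataframe columns. The example `cols` dict is hypothetical, built the way the surrounding code implies (lowercased name -> original column name); note that without a break, a later synonym overwrites an earlier match:

    import pandas as pd

    # Hypothetical dataframe columns; only the lowercased names are matched.
    df = pd.DataFrame(columns=["ENSG", "HGNC_ID", "Hgnc Symbol", "entrezgene"])
    cols = {c.lower(): c for c in df.columns}

    candidates = {
        'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
        'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
        'hgnc_symbol': ['hgnc_symbol', 'hgnc symbol', 'symbol'],
        'entrez_id': ['entrez', 'entrez_id', 'entrezgene'],
        'gene_number': ['gene_number'],
    }

    chosen = {}
    for key, names in candidates.items():
        for n in names:
            if n in cols:
                chosen[key] = cols[n]  # last matching synonym wins

    print(chosen)
    # {'ensg': 'ENSG', 'hgnc_id': 'HGNC_ID', 'hgnc_symbol': 'Hgnc Symbol', 'entrez_id': 'entrezgene'}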
508 # find targets with more than one source | 509 # find targets with more than one source |
509 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} | 510 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} |
510 | 511 |
511 if problematic: | 512 if problematic: |
512 # prepare the error message with examples (up to 20) | 513 # prepare the error message with examples (up to 20) |
513 sample_items = list(problematic.items())[:20] | 514 sample_items = list(problematic.items()) |
514 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] | 515 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] |
515 for tgt, sources in sample_items: | 516 for tgt, sources in sample_items: |
516 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") | 517 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") |
517 if len(problematic) > len(sample_items): | |
518 msg_lines.append(f" ... and {len(problematic) - len(sample_items)} more cases.") | |
519 full_msg = "\n".join(msg_lines) | 518 full_msg = "\n".join(msg_lines) |
520 # log and raise the error | 519 # log and raise the error |
521 logger.error(full_msg) | 520 logger.warning(full_msg) |
522 raise ValueError(full_msg) | |
523 | 521 |
524 # if everything is ok | 522 # if everything is ok |
525 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") | 523 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") |
526 | 524 |
527 | 525 |
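As a side note, the many-to-one condition this validation detects is easy to reproduce in isolation. `grouped` here is a hypothetical reconstruction of the helper's internal target -> sources index, which is built outside this hunk:

    from collections import defaultdict

    # Hypothetical (source, target) pairs; two sources collide on ENSG000A.
    pairs = [("HGNC:1", "ENSG000A"), ("HGNC:2", "ENSG000A"), ("HGNC:3", "ENSG000B")]

    grouped = defaultdict(set)
    for src, tgt in pairs:
        grouped[tgt].add(src)

    # find targets with more than one source (mirrors the hunk above)
    problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
    print(problematic)  # {'ENSG000A': ['HGNC:1', 'HGNC:2']}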
533 # remove common prefixes | 531 # remove common prefixes |
534 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) | 532 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE) |
535 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) | 533 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE) |
536 return g | 534 return g |
537 | 535 |
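The effect of the prefix stripping above, in a self-contained sketch. The enclosing `_normalize_gene_id` presumably also trims the string; that step is assumed here since the full function body lies outside this hunk:

    import re

    def normalize_gene_id_sketch(g: str) -> str:
        g = g.strip()  # assumed pre-processing, not shown in the hunk
        # remove common prefixes
        g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
        g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
        return g

    print(normalize_gene_id_sketch("HGNC:5"))                # -> '5'
    print(normalize_gene_id_sketch("ensg:ENSG00000139618"))  # -> 'ENSG00000139618'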
| 536 def _simplify_boolean_expression(expr: str) -> str: |
| 537     """ |
| 538     Simplify a boolean expression by removing duplicates and reducing redundancy. |
| 539     Handles expressions with 'and' and 'or'. |
| 540     """ |
| 541     if not expr or not expr.strip(): |
| 542         return expr |
| 543 |
| 544     # Normalize the operators |
| 545     expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') |
| 546 |
| 547     # Recursive function to process expressions |
| 548     def process_expression(s: str) -> str: |
| 549         s = s.strip() |
| 550         if not s: |
| 551             return s |
| 552 |
| 553         # Handle parentheses |
| 554         while '(' in s: |
| 555             # Find the innermost parentheses |
| 556             start = -1 |
| 557             for i, c in enumerate(s): |
| 558                 if c == '(': |
| 559                     start = i |
| 560                 elif c == ')' and start != -1: |
| 561                     # Process the content inside the parentheses |
| 562                     inner = s[start+1:i] |
| 563                     processed_inner = process_expression(inner) |
| 564                     s = s[:start] + processed_inner + s[i+1:] |
| 565                     break |
| 566             else: |
| 567                 break |
| 568 |
| 569         # Split on 'or' at the top level |
| 570         or_parts = [] |
| 571         current_part = "" |
| 572         paren_count = 0 |
| 573 |
| 574         tokens = s.split() |
| 575         i = 0 |
| 576         while i < len(tokens): |
| 577             token = tokens[i] |
| 578             if token == 'or' and paren_count == 0: |
| 579                 if current_part.strip(): |
| 580                     or_parts.append(current_part.strip()) |
| 581                     current_part = "" |
| 582             else: |
| 583                 if token.count('(') > token.count(')'): |
| 584                     paren_count += token.count('(') - token.count(')') |
| 585                 elif token.count(')') > token.count('('): |
| 586                     paren_count -= token.count(')') - token.count('(') |
| 587                 current_part += token + " " |
| 588             i += 1 |
| 589 |
| 590         if current_part.strip(): |
| 591             or_parts.append(current_part.strip()) |
| 592 |
| 593         # Process each OR part |
| 594         processed_or_parts = [] |
| 595         for or_part in or_parts: |
| 596             # Split on 'and' within each OR part |
| 597             and_parts = [] |
| 598             current_and = "" |
| 599             paren_count = 0 |
| 600 |
| 601             and_tokens = or_part.split() |
| 602             j = 0 |
| 603             while j < len(and_tokens): |
| 604                 token = and_tokens[j] |
| 605                 if token == 'and' and paren_count == 0: |
| 606                     if current_and.strip(): |
| 607                         and_parts.append(current_and.strip()) |
| 608                         current_and = "" |
| 609                 else: |
| 610                     if token.count('(') > token.count(')'): |
| 611                         paren_count += token.count('(') - token.count(')') |
| 612                     elif token.count(')') > token.count('('): |
| 613                         paren_count -= token.count(')') - token.count('(') |
| 614                     current_and += token + " " |
| 615                 j += 1 |
| 616 |
| 617             if current_and.strip(): |
| 618                 and_parts.append(current_and.strip()) |
| 619 |
| 620             # Remove duplicates among the AND parts |
| 621             unique_and_parts = list(dict.fromkeys(and_parts))  # preserves order |
| 622 |
| 623             if len(unique_and_parts) == 1: |
| 624                 processed_or_parts.append(unique_and_parts[0]) |
| 625             elif len(unique_and_parts) > 1: |
| 626                 processed_or_parts.append(" and ".join(unique_and_parts)) |
| 627 |
| 628         # Remove duplicates among the OR parts |
| 629         unique_or_parts = list(dict.fromkeys(processed_or_parts)) |
| 630 |
| 631         if len(unique_or_parts) == 1: |
| 632             return unique_or_parts[0] |
| 633         elif len(unique_or_parts) > 1: |
| 634             return " or ".join(unique_or_parts) |
| 635         else: |
| 636             return "" |
| 637 |
| 638     try: |
| 639         return process_expression(expr) |
| 640     except Exception: |
| 641         # If simplification fails, return the original expression |
| 642         return expr |
| 643 |
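The motivation for this new helper: once several source genes may translate to the same target (see allow_many_to_one below), a translated GPR can contain duplicate tokens such as 'X and X'. A short usage sketch of the function as committed above, with expected outputs in comments:

    # A GPR like "(A and B) or C", where A and B both map to X, first becomes
    # "(X and X) or C" after translation; the simplifier collapses the duplicates.
    print(_simplify_boolean_expression("(X and X) or C"))  # -> 'X or C'
    print(_simplify_boolean_expression("X or X"))          # -> 'X'
    print(_simplify_boolean_expression("A AND B AND A"))   # -> 'A and B'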
538 # ---------- Main public function ---------- | 644 # ---------- Main public function ---------- |
539 def translate_model_genes(model: 'cobra.Model', | 645 def translate_model_genes(model: 'cobra.Model', |
540 mapping_df: 'pd.DataFrame', | 646 mapping_df: 'pd.DataFrame', |
541 target_nomenclature: str, | 647 target_nomenclature: str, |
542 source_nomenclature: str = 'hgnc_id', | 648 source_nomenclature: str = 'hgnc_id', |
| 649 allow_many_to_one: bool = False, |
543 logger: Optional[logging.Logger] = None) -> 'cobra.Model': | 650 logger: Optional[logging.Logger] = None) -> 'cobra.Model': |
544 """ | 651 """ |
545 Translate model genes from source_nomenclature to target_nomenclature. | 652 Translate model genes from source_nomenclature to target_nomenclature. |
546 mapping_df should contain at least columns that allow the mapping | 653 mapping_df should contain at least columns that allow the mapping |
547 (e.g. ensg, hgnc_id, hgnc_symbol, entrez). | 654 (e.g. ensg, hgnc_id, hgnc_symbol, entrez). |
| 655 |
| 656 Args: |
| 657 allow_many_to_one: If True, allows multiple source IDs to map to the same target |
| 658 and handles the duplicates in the GPRs. If False, validates target uniqueness. |
548 """ | 659 """ |
549 if logger is None: | 660 if logger is None: |
550 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | 661 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
551 logger = logging.getLogger(__name__) | 662 logger = logging.getLogger(__name__) |
552 | 663 |
561 # normalize input args | 672 # normalize input args |
562 src_key = source_nomenclature.strip().lower() | 673 src_key = source_nomenclature.strip().lower() |
563 tgt_key = target_nomenclature.strip().lower() | 674 tgt_key = target_nomenclature.strip().lower() |
564 | 675 |
565 # try to find the actual column names for requested keys | 676 # try to find the actual column names for requested keys |
566 # support synonyms: user may pass "ensg" or "ENSG" etc. | |
567 col_for_src = None | 677 col_for_src = None |
568 col_for_tgt = None | 678 col_for_tgt = None |
569 # first, try exact match | 679 # first, try exact match |
570 for k, actual in chosen.items(): | 680 for k, actual in chosen.items(): |
571 if k == src_key: | 681 if k == src_key: |
573 if k == tgt_key: | 683 if k == tgt_key: |
574 col_for_tgt = actual | 684 col_for_tgt = actual |
575 | 685 |
576 # if not found, try mapping common names | 686 # if not found, try mapping common names |
577 if col_for_src is None: | 687 if col_for_src is None: |
578 # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful | |
579 # we require at least the source column to exist | |
580 possible_src_names = {k: v for k, v in chosen.items()} | 688 possible_src_names = {k: v for k, v in chosen.items()} |
581 # try to match by contained substring | 689 # try to match by contained substring |
582 for k, actual in possible_src_names.items(): | 690 for k, actual in possible_src_names.items(): |
583 if src_key in k: | 691 if src_key in k: |
584 col_for_src = actual | 692 col_for_src = actual |
592 | 700 |
593 if col_for_src is None: | 701 if col_for_src is None: |
594 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.") | 702 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.") |
595 if col_for_tgt is None: | 703 if col_for_tgt is None: |
596 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.") | 704 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.") |
597 | |
598 | 705 |
599 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes } | 706 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes } |
600 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).") | 707 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).") |
601 | 708 |
602 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() | 709 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy() |
603 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) | 710 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id) |
604 | 711 |
605 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() | 712 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() |
606 | 713 |
607 # If there are no relevant rows, warn (there may be no mappings for the genes present) | 714 # If there are no relevant rows, warn |
608 if filtered_map.empty: | 715 if filtered_map.empty: |
609 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") | 716 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") |
610 | 717 |
611 # --- VALIDATION: no target may be mapped by more than one source (within the filtered set) --- | 718 # --- VALIDATION: optional, depending on the allow_many_to_one parameter --- |
612 # To run the check on the whole dataframe (not just the model's genes), pass model_source_genes=None. | 719 if not allow_many_to_one: |
613 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) | 720 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) |
614 | 721 |
615 # Now build the mapping only on the filtered subset (more efficient) | 722 # Build the mapping |
616 # NOTE: _create_gene_mapping expects the original column names |
617 # so we pass filtered_map with the remapped columns (without col_for_src + "_norm") |
618 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) | 723 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger) |
619 | 724 |
620 # copy model | 725 # copy model |
621 model_copy = model.copy() | 726 model_copy = model.copy() |
622 | 727 |
623 # statistics | 728 # statistics |
624 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0} | 729 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0} |
625 unmapped = [] | 730 unmapped = [] |
626 multi = [] | 731 multi = [] |
627 | 732 |
628 original_genes = {g.id for g in model_copy.genes} | 733 original_genes = {g.id for g in model_copy.genes} |
629 logger.info(f"Original genes count: {len(original_genes)}") | 734 logger.info(f"Original genes count: {len(original_genes)}") |
632 for rxn in model_copy.reactions: | 737 for rxn in model_copy.reactions: |
633 gpr = rxn.gene_reaction_rule | 738 gpr = rxn.gene_reaction_rule |
634 if gpr and gpr.strip(): | 739 if gpr and gpr.strip(): |
635 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) | 740 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) |
636 if new_gpr != gpr: | 741 if new_gpr != gpr: |
637 rxn.gene_reaction_rule = new_gpr | 742 # Simplify the boolean expression to remove duplicates |
638 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'") | 743 simplified_gpr = _simplify_boolean_expression(new_gpr) |
| 744 if simplified_gpr != new_gpr: |
| 745 stats['simplified_gprs'] += 1 |
| 746 logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'") |
| 747 rxn.gene_reaction_rule = simplified_gpr |
| 748 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'") |
639 | 749 |
640 # update model genes based on new GPRs | 750 # update model genes based on new GPRs |
641 _update_model_genes(model_copy, logger) | 751 _update_model_genes(model_copy, logger) |
642 | 752 |
643 # final logging | 753 # final logging |
781 final_genes, | 891 final_genes, |
782 logger: logging.Logger): | 892 logger: logging.Logger): |
783 logger.info("=== TRANSLATION STATISTICS ===") | 893 logger.info("=== TRANSLATION STATISTICS ===") |
784 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})") | 894 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})") |
785 logger.info(f"Not found tokens: {stats.get('not_found', 0)}") | 895 logger.info(f"Not found tokens: {stats.get('not_found', 0)}") |
| 896 logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}") |
786 | 897 |
787 final_ids = {g.id for g in final_genes} | 898 final_ids = {g.id for g in final_genes} |
788 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}") | 899 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}") |
789 | 900 |
790 if unmapped_genes: | 901 if unmapped_genes: |
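
Finally, a hedged end-to-end sketch of how the public entry point is meant to be called after this changeset; the file names and mapping columns are illustrative, not part of the repository:

    import logging
    import cobra
    import pandas as pd

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("model_translation")

    # Hypothetical inputs: a model whose genes use HGNC IDs, and a mapping table
    # with at least the columns 'hgnc_id' and 'ensg'.
    model = cobra.io.read_sbml_model("model.xml")
    mapping_df = pd.read_csv("gene_mapping.csv")

    translated = translate_model_genes(
        model,
        mapping_df,
        target_nomenclature="ensg",
        source_nomenclature="hgnc_id",
        allow_many_to_one=False,  # with this changeset, collisions log a warning instead of raising
        logger=logger,
    )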