comparison COBRAxy/utils/model_utils.py @ 426:00a78da611ba draft

Uploaded
author francesco_lapi
date Wed, 10 Sep 2025 09:25:32 +0000
parents ed2c1f9e20ba
children 4a385fdb9e58
425:233d5d1e6bb2 426:00a78da611ba
3 import cobra
4 import pickle
5 import argparse
6 import pandas as pd
7 import re
8 import logging
9 from typing import Optional, Tuple, Union, List, Dict, Set
10 from collections import defaultdict
11 import utils.general_utils as utils
12 import utils.rule_parsing as rulesUtils
13 import utils.reaction_parsing as reactionUtils
14 from cobra import Model as cobraModel, Reaction, Metabolite
15
72 trueMedium.append(r.id)
73
74 df_medium = pd.DataFrame()
75 df_medium["reaction"] = trueMedium
76 return df_medium
77
78 def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame:
79 """
80 Extract the objective-function coefficient of each reaction in the model.
81
82 Args:
83 model : cobra.Model
84
85 Returns:
86 pd.DataFrame with columns: ReactionID, ObjectiveCoefficient
87 """
88 coeffs = []
89 # model.objective.expression is a linear expression
90 objective_expr = model.objective.expression.as_coefficients_dict()
91
92 for reaction in model.reactions:
93 coeff = objective_expr.get(reaction.forward_variable, 0.0)
94 coeffs.append({
95 "ReactionID": reaction.id,
96 "ObjectiveCoefficient": coeff
97 })
98
99 return pd.DataFrame(coeffs)
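
A minimal usage sketch for the helper above (the bundled cobrapy "textbook" model and the import path are placeholders, assuming the module is importable as utils.model_utils):

    import cobra
    from cobra.io import load_model
    # from utils.model_utils import extract_objective_coefficients  # import path is an assumption

    model = load_model("textbook")                      # any cobra.Model works here
    df = extract_objective_coefficients(model)
    # keep only reactions that actually contribute to the objective
    print(df[df["ObjectiveCoefficient"] != 0.0])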
100
101 def generate_bounds(model:cobraModel) -> pd.DataFrame:
102
103 rxns = []
104 for reaction in model.reactions:
427 print("No annotation in gene dict!")
428 return -1
429 rename_genes(model2,dict_genes)
430
431 return model2
432
433
434
435 # ---------- Utility helpers ----------
436 def _normalize_colname(col: str) -> str:
437 return col.strip().lower().replace(' ', '_')
438
439 def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]:
440 """
441 Look for useful mapping columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}.
442 Returns an empty dict when no useful column is found (the caller raises ValueError).
443 """
444 cols = { _normalize_colname(c): c for c in mapping_df.columns }
445 chosen = {}
446 # candidate column names for each category
447 candidates = {
448 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'],
449 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'],
450 'hgnc_symbol': ['hgnc_symbol', 'symbol'],
451 'entrez_id': ['entrez', 'entrez_id', 'entrezgene']
452 }
453 for key, names in candidates.items():
454 for n in names:
455 if n in cols:
456 chosen[key] = cols[n]
457 break
458 return chosen
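
For illustration, given a mapping table with hypothetical column headers, the detection above resolves them like this:

    import pandas as pd

    mapping_df = pd.DataFrame(columns=["Ensembl_Gene_ID", "HGNC_ID", "symbol", "entrez"])
    print(_choose_columns(mapping_df))
    # {'ensg': 'Ensembl_Gene_ID', 'hgnc_id': 'HGNC_ID', 'hgnc_symbol': 'symbol', 'entrez_id': 'entrez'}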
459
460 def _validate_target_uniqueness(mapping_df: 'pd.DataFrame',
461 source_col: str,
462 target_col: str,
463 model_source_genes: Optional[Set[str]] = None,
464 logger: Optional[logging.Logger] = None) -> None:
465 """
466 Check that, in mapping_df (optionally already filtered to the source genes of interest),
467 each target is associated with at most one source. If any target maps back to more
468 than one source, raise ValueError and show examples.
469
470 - mapping_df: DataFrame with columns source_col, target_col
471 - model_source_genes: if provided, the set of normalized source IDs being translated
472 (if None, the whole mapping_df is used)
473 """
474 if logger is None:
475 logger = logging.getLogger(__name__)
476
477 if mapping_df.empty:
478 logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.")
479 return
480
481 # build temporary normalized columns for grouping (without modifying the original df)
482 tmp = mapping_df[[source_col, target_col]].copy()
483 tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id)
484 tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip()
485
486 # if a set of model source genes was provided, filter here (already done by the caller, but a double-check is harmless)
487 if model_source_genes is not None:
488 tmp = tmp[tmp['_src_norm'].isin(model_source_genes)]
489
490 if tmp.empty:
491 logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.")
492 return
493
494 # build the reverse mapping target -> set(sources)
495 grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna()))
496 # find targets associated with more than one source
497 problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1}
498
499 if problematic:
500 # build the error message with examples (up to 20)
501 sample_items = list(problematic.items())[:20]
502 msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."]
503 for tgt, sources in sample_items:
504 msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}")
505 if len(problematic) > len(sample_items):
506 msg_lines.append(f" ... and {len(problematic) - len(sample_items)} more cases.")
507 full_msg = "\n".join(msg_lines)
508 # log and raise the error
509 logger.error(full_msg)
510 raise ValueError(full_msg)
511
512 # everything is consistent
513 logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
514
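
A quick sketch of the failure mode this guard catches (two different sources pointing at the same target, column names are illustrative):

    import pandas as pd

    df = pd.DataFrame({
        "hgnc_id": ["HGNC:5", "HGNC:7"],
        "ensg":    ["ENSG00000001", "ENSG00000001"],   # same target reached from two sources
    })
    try:
        _validate_target_uniqueness(df, "hgnc_id", "ensg")
    except ValueError as err:
        print(err)   # lists ENSG00000001 together with its conflicting sources 5 and 7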
515
516 def _normalize_gene_id(g: str) -> str:
517 """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip)."""
518 if g is None:
519 return ""
520 g = str(g).strip()
521 # remove common prefixes
522 g = re.sub(r'^(HGNC:)', '', g, flags=re.IGNORECASE)
523 g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
524 return g
525
526 # ---------- Main public function ----------
527 def translate_model_genes(model: 'cobra.Model',
528 mapping_df: 'pd.DataFrame',
529 target_nomenclature: str,
530 source_nomenclature: str = 'hgnc_id',
531 logger: Optional[logging.Logger] = None) -> 'cobra.Model':
532 """
533 Translate model genes from source_nomenclature to target_nomenclature.
534 mapping_df should contain at least columns that allow the mapping
535 (e.g. ensg, hgnc_id, hgnc_symbol, entrez).
536 """
537 if logger is None:
538 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
539 logger = logging.getLogger(__name__)
540
541 logger.info(f"Translating genes from '{source_nomenclature}' to '{target_nomenclature}'")
542
543 # normalize column names and choose relevant columns
544 chosen = _choose_columns(mapping_df)
545 if not chosen:
546 raise ValueError("Could not detect useful columns in mapping_df. Expected at least one of: ensg, hgnc_id, hgnc_symbol, entrez.")
547
548 # map source/target to actual dataframe column names (allow user-specified source/target keys)
549 # normalize input args
550 src_key = source_nomenclature.strip().lower()
551 tgt_key = target_nomenclature.strip().lower()
552
553 # try to find the actual column names for requested keys
554 # support synonyms: user may pass "ensg" or "ENSG" etc.
555 col_for_src = None
556 col_for_tgt = None
557 # first, try exact match
558 for k, actual in chosen.items():
559 if k == src_key:
560 col_for_src = actual
561 if k == tgt_key:
562 col_for_tgt = actual
563
564 # if not found, try mapping common names
565 if col_for_src is None:
566 # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful
567 # we require at least the source column to exist
568 possible_src_names = {k: v for k, v in chosen.items()}
569 # try to match by contained substring
570 for k, actual in possible_src_names.items():
571 if src_key in k:
572 col_for_src = actual
573 break
574
575 if col_for_tgt is None:
576 for k, actual in chosen.items():
577 if tgt_key in k:
578 col_for_tgt = actual
579 break
580
581 if col_for_src is None:
582 raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
583 if col_for_tgt is None:
584 raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
585
586
587 model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
588 logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
589
590 tmp_map = mapping_df[[col_for_src, col_for_tgt]].dropna().copy()
591 tmp_map[col_for_src + "_norm"] = tmp_map[col_for_src].astype(str).map(_normalize_gene_id)
592
593 filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
594
595 # If there are no relevant rows, warn (there may simply be no mapping for the genes present in the model)
596 if filtered_map.empty:
597 logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
598
599 # --- VALIDATION: no target may be mapped from more than one source (within the filtered set) ---
600 # To validate the whole dataframe (not just the model's genes), pass model_source_genes=None.
601 _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
602
603 # Now build the mapping on the filtered subset only (more efficient)
604 # NOTE: _create_gene_mapping expects the original column names, so filtered_map is passed
605 # as-is; the helper selects only col_for_src and col_for_tgt, ignoring the extra "_norm" column
606 gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
607
608 # copy model
609 model_copy = model.copy()
610
611 # statistics
612 stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0}
613 unmapped = []
614 multi = []
615
616 original_genes = {g.id for g in model_copy.genes}
617 logger.info(f"Original genes count: {len(original_genes)}")
618
619 # translate GPRs
620 for rxn in model_copy.reactions:
621 gpr = rxn.gene_reaction_rule
622 if gpr and gpr.strip():
623 new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
624 if new_gpr != gpr:
625 rxn.gene_reaction_rule = new_gpr
626 logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'")
627
628 # update model genes based on new GPRs
629 _update_model_genes(model_copy, logger)
630
631 # final logging
632 _log_translation_statistics(stats, unmapped, multi, original_genes, model_copy.genes, logger)
633
634 logger.info("Translation finished")
635 return model_copy
636
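
End-to-end, the translation is typically driven like this (file names and the mapping table layout are placeholders):

    import cobra
    import pandas as pd

    model = cobra.io.read_sbml_model("model_hgnc.xml")      # GPRs expressed with HGNC IDs
    mapping_df = pd.read_csv("gene_mapping.csv")            # must contain e.g. 'hgnc_id' and 'ensg' columns
    translated = translate_model_genes(model, mapping_df,
                                       target_nomenclature="ensg",
                                       source_nomenclature="hgnc_id")
    cobra.io.write_sbml_model(translated, "model_ensg.xml")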
637
638 # ---------- helper functions ----------
639 def _create_gene_mapping(mapping_df, source_col: str, target_col: str, logger: logging.Logger) -> Dict[str, List[str]]:
640 """
641 Build mapping dict: source_id -> list of target_ids
642 Normalizes IDs (removes prefixes like 'HGNC:' etc).
643 """
644 df = mapping_df[[source_col, target_col]].dropna().copy()
645 # normalize to string
646 df[source_col] = df[source_col].astype(str).map(_normalize_gene_id)
647 df[target_col] = df[target_col].astype(str).str.strip()
648
649 df = df.drop_duplicates()
650
651 logger.info(f"Creating mapping from {len(df)} rows")
652
653 mapping = defaultdict(list)
654 for _, row in df.iterrows():
655 s = row[source_col]
656 t = row[target_col]
657 if t not in mapping[s]:
658 mapping[s].append(t)
659
660 # stats
661 one_to_one = sum(1 for v in mapping.values() if len(v) == 1)
662 one_to_many = sum(1 for v in mapping.values() if len(v) > 1)
663 logger.info(f"Mapping: {len(mapping)} source keys, {one_to_one} 1:1, {one_to_many} 1:many")
664 return dict(mapping)
665
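
A small sketch of the dictionary this produces, including a 1:many case (column names are illustrative):

    import logging
    import pandas as pd

    df = pd.DataFrame({"hgnc_id": ["HGNC:1", "HGNC:2", "HGNC:2"],
                       "ensg":    ["ENSG01", "ENSG02", "ENSG03"]})
    print(_create_gene_mapping(df, "hgnc_id", "ensg", logging.getLogger(__name__)))
    # {'1': ['ENSG01'], '2': ['ENSG02', 'ENSG03']}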
666
667 def _translate_gpr(gpr_string: str,
668 gene_mapping: Dict[str, List[str]],
669 stats: Dict[str, int],
670 unmapped_genes: List[str],
671 multi_mapping_genes: List[Tuple[str, List[str]]],
672 logger: logging.Logger) -> str:
673 """
674 Translate genes inside a GPR string using gene_mapping.
675 Returns new GPR string.
676 """
677 # Generic token pattern: letters, digits, ':', '_', '-', '.' (captures HGNC:1234, ENSG000..., gene symbols)
678 token_pattern = r'\b[A-Za-z0-9:_.-]+\b'
679 tokens = re.findall(token_pattern, gpr_string)
680
681 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
682 tokens = [t for t in tokens if t not in logical]
683
684 new_gpr = gpr_string
685
686 for token in sorted(set(tokens), key=lambda x: -len(x)): # longer tokens first to avoid partial replacement
687 norm = _normalize_gene_id(token)
688 if norm in gene_mapping:
689 targets = gene_mapping[norm]
690 stats['translated'] += 1
691 if len(targets) == 1:
692 stats['one_to_one'] += 1
693 replacement = targets[0]
694 else:
695 stats['one_to_many'] += 1
696 multi_mapping_genes.append((token, targets))
697 replacement = "(" + " or ".join(targets) + ")"
698
699 pattern = r'\b' + re.escape(token) + r'\b'
700 new_gpr = re.sub(pattern, replacement, new_gpr)
701 else:
702 stats['not_found'] += 1
703 if token not in unmapped_genes:
704 unmapped_genes.append(token)
705 logger.debug(f"Token not found in mapping (left as-is): {token}")
706
707 return new_gpr
708
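
For example, with a two-entry mapping the GPR rewriting behaves as follows (unmapped tokens are left untouched):

    import logging

    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0}
    unmapped, multi = [], []
    gene_mapping = {"1100": ["ENSG01"], "1101": ["ENSG02", "ENSG03"]}
    new_rule = _translate_gpr("HGNC:1100 and (HGNC:1101 or HGNC:9999)",
                              gene_mapping, stats, unmapped, multi,
                              logging.getLogger(__name__))
    print(new_rule)   # ENSG01 and ((ENSG02 or ENSG03) or HGNC:9999)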
709
710 def _update_model_genes(model: 'cobra.Model', logger: logging.Logger):
711 """
712 Rebuild model.genes from gene_reaction_rule content.
713 Removes genes not referenced and adds missing ones.
714 """
715 # collect genes in GPRs
716 gene_pattern = r'\b[A-Za-z0-9:_.-]+\b'
717 logical = {'and', 'or', 'AND', 'OR', '(', ')'}
718 genes_in_gpr: Set[str] = set()
719
720 for rxn in model.reactions:
721 gpr = rxn.gene_reaction_rule
722 if gpr and gpr.strip():
723 toks = re.findall(gene_pattern, gpr)
724 toks = [t for t in toks if t not in logical]
725 # normalize IDs consistent with mapping normalization
726 toks = [_normalize_gene_id(t) for t in toks]
727 genes_in_gpr.update(toks)
728
729 # existing gene ids
730 existing = {g.id for g in model.genes}
731
732 # remove obsolete genes
733 to_remove = [gid for gid in existing if gid not in genes_in_gpr]
734 removed = 0
735 for gid in to_remove:
736 try:
737 gene_obj = model.genes.get_by_id(gid)
738 model.genes.remove(gene_obj)
739 removed += 1
740 except Exception:
741 # safe to ignore: the gene may already have been removed
742 pass
743
744 # add new genes
745 added = 0
746 for gid in genes_in_gpr:
747 if gid not in existing:
748 new_gene = cobra.Gene(gid)
749 try:
750 model.genes.add(new_gene)
751 except Exception:
752 # fallback: if model.genes doesn't support add, try append or model.add_genes
753 try:
754 model.genes.append(new_gene)
755 except Exception:
756 try:
757 model.add_genes([new_gene])
758 except Exception:
759 logger.warning(f"Could not add gene object for {gid}")
760 added += 1
761
762 logger.info(f"Model genes updated: removed {removed}, added {added}")
763
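
A rough sketch of the clean-up this performs (the demo model and gene IDs are hypothetical; an orphan gene object is dropped while GPR-referenced genes survive):

    import logging
    import cobra

    m = cobra.Model("demo")
    rxn = cobra.Reaction("R1")
    m.add_reactions([rxn])
    rxn.gene_reaction_rule = "ENSG01 or ENSG02"     # cobra creates the Gene objects
    m.genes.append(cobra.Gene("ORPHAN"))            # referenced by no reaction
    _update_model_genes(m, logging.getLogger(__name__))
    print([g.id for g in m.genes])                  # ORPHAN is gone; ENSG01 and ENSG02 remain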
764
765 def _log_translation_statistics(stats: Dict[str, int],
766 unmapped_genes: List[str],
767 multi_mapping_genes: List[Tuple[str, List[str]]],
768 original_genes: Set[str],
769 final_genes,
770 logger: logging.Logger):
771 logger.info("=== TRANSLATION STATISTICS ===")
772 logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
773 logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
774
775 final_ids = {g.id for g in final_genes}
776 logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")
777
778 if unmapped_genes:
779 logger.warning(f"Unmapped tokens ({len(unmapped_genes)}): {', '.join(unmapped_genes[:20])}{(' ...' if len(unmapped_genes)>20 else '')}")
780 if multi_mapping_genes:
781 logger.info(f"Multi-mapping examples ({len(multi_mapping_genes)}):")
782 for orig, targets in multi_mapping_genes[:10]:
783 logger.info(f" {orig} -> {', '.join(targets)}")