# HG changeset patch
# User francesco_lapi
# Date 1757689554 0
# Node ID 4e2bc80764b6a56510970c01560977993e50cb8a
# Parent 3654c08668f10289c8b577b189558461349dc7e4
Uploaded
diff -r 3654c08668f1 -r 4e2bc80764b6 COBRAxy/fromCSVtoCOBRA_beta.py
--- a/COBRAxy/fromCSVtoCOBRA_beta.py Thu Sep 11 21:02:09 2025 +0000
+++ b/COBRAxy/fromCSVtoCOBRA_beta.py Fri Sep 12 15:05:54 2025 +0000
@@ -63,6 +63,7 @@
         logging.error('Input file not found: %s', ARGS.input)
     out_dir = os.path.dirname(os.path.abspath(ARGS.output))
+
     if out_dir and not os.path.isdir(out_dir):
         try:
             os.makedirs(out_dir, exist_ok=True)
@@ -70,7 +71,6 @@
         except Exception as e:
             logging.exception('Cannot create output directory: %s', out_dir)
-    # Build the model from the CSV (NOTE: use ARGS.input here)
     model = modelUtils.build_cobra_model_from_csv(ARGS.input)
     # Save model in requested format
diff -r 3654c08668f1 -r 4e2bc80764b6 COBRAxy/local/svg metabolic maps/ENGRO2_map.svg
--- a/COBRAxy/local/svg metabolic maps/ENGRO2_map.svg Thu Sep 11 21:02:09 2025 +0000
+++ b/COBRAxy/local/svg metabolic maps/ENGRO2_map.svg Fri Sep 12 15:05:54 2025 +0000
@@ -203,7 +203,7 @@
y="42.726181">Down-regulated in dataset2Down-regulated in dataset1Down-regulated in dataset2Down-regulated in dataset1 len(sample_items):
- msg_lines.append(f" ... and {len(problematic) - len(sample_items)} more cases.")
full_msg = "\n".join(msg_lines)
# loggare e sollevare errore
- logger.error(full_msg)
- raise ValueError(full_msg)
+ logger.warning(full_msg)
# se tutto ok
logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).")
@@ -535,16 +533,129 @@
     g = re.sub(r'^(ENSG:)', '', g, flags=re.IGNORECASE)
     return g
+
+def _simplify_boolean_expression(expr: str) -> str:
+    """
+    Simplify a boolean expression by removing duplicate terms and redundancies.
+    Handles expressions combined with 'and' and 'or'.
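+
+    Examples (duplicate removal only; no distributive rewriting is attempted):
+        >>> _simplify_boolean_expression("G1 and G1 and G2")
+        'G1 and G2'
+        >>> _simplify_boolean_expression("G1 or (G1)")
+        'G1'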
+ """
+ if not expr or not expr.strip():
+ return expr
+
+ # Normalizza gli operatori
+ expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ')
+
+ # Funzione ricorsiva per processare le espressioni
+ def process_expression(s: str) -> str:
+ s = s.strip()
+ if not s:
+ return s
+
+ # Gestisci le parentesi
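+        # (note: groups are simplified and flattened in place, so grouping
+        # parentheses are dropped and mixed and/or groups may reassociate)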
+        while '(' in s:
+            # Find the innermost parenthesised group
+            start = -1
+            for i, c in enumerate(s):
+                if c == '(':
+                    start = i
+                elif c == ')' and start != -1:
+                    # Process the content between the parentheses
+                    inner = s[start+1:i]
+                    processed_inner = process_expression(inner)
+                    s = s[:start] + processed_inner + s[i+1:]
+                    break
+            else:
+                break
+
+        # Split on 'or' at the top level
+        or_parts = []
+        current_part = ""
+        paren_count = 0
+
+        tokens = s.split()
+        i = 0
+        while i < len(tokens):
+            token = tokens[i]
+            if token == 'or' and paren_count == 0:
+                if current_part.strip():
+                    or_parts.append(current_part.strip())
+                current_part = ""
+            else:
+                if token.count('(') > token.count(')'):
+                    paren_count += token.count('(') - token.count(')')
+                elif token.count(')') > token.count('('):
+                    paren_count -= token.count(')') - token.count('(')
+                current_part += token + " "
+            i += 1
+
+        if current_part.strip():
+            or_parts.append(current_part.strip())
+
+        # Process each OR part
+        processed_or_parts = []
+        for or_part in or_parts:
+            # Split on 'and' within each OR part
+            and_parts = []
+            current_and = ""
+            paren_count = 0
+
+            and_tokens = or_part.split()
+            j = 0
+            while j < len(and_tokens):
+                token = and_tokens[j]
+                if token == 'and' and paren_count == 0:
+                    if current_and.strip():
+                        and_parts.append(current_and.strip())
+                    current_and = ""
+                else:
+                    if token.count('(') > token.count(')'):
+                        paren_count += token.count('(') - token.count(')')
+                    elif token.count(')') > token.count('('):
+                        paren_count -= token.count(')') - token.count('(')
+                    current_and += token + " "
+                j += 1
+
+            if current_and.strip():
+                and_parts.append(current_and.strip())
+
+            # Remove duplicates among the AND parts
+            unique_and_parts = list(dict.fromkeys(and_parts))  # preserves insertion order
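+            # e.g. ['G1', 'G2', 'G1'] -> ['G1', 'G2']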
+
+            if len(unique_and_parts) == 1:
+                processed_or_parts.append(unique_and_parts[0])
+            elif len(unique_and_parts) > 1:
+                processed_or_parts.append(" and ".join(unique_and_parts))
+
+        # Remove duplicates among the OR parts
+        unique_or_parts = list(dict.fromkeys(processed_or_parts))
+
+        if len(unique_or_parts) == 1:
+            return unique_or_parts[0]
+        elif len(unique_or_parts) > 1:
+            return " or ".join(unique_or_parts)
+        else:
+            return ""
+
+    try:
+        return process_expression(expr)
+    except Exception:
+        # If simplification fails, return the original expression unchanged
+        return expr
+
# ---------- Main public function ----------
 def translate_model_genes(model: 'cobra.Model',
                           mapping_df: 'pd.DataFrame',
                           target_nomenclature: str,
                           source_nomenclature: str = 'hgnc_id',
+                          allow_many_to_one: bool = False,
                           logger: Optional[logging.Logger] = None) -> 'cobra.Model':
"""
Translate model genes from source_nomenclature to target_nomenclature.
mapping_df should contain at least columns that allow the mapping
(e.g. ensg, hgnc_id, hgnc_symbol, entrez).
+
+ Args:
+ allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target
+ e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target.
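+
+    Example (illustrative; assumes a loaded `model` and a `mapping_df` with
+    'hgnc_id' and 'ensg' columns):
+        >>> translated = translate_model_genes(model, mapping_df,
+        ...                                    target_nomenclature='ensg',
+        ...                                    source_nomenclature='hgnc_id',
+        ...                                    allow_many_to_one=True)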
"""
     if logger is None:
         logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -563,7 +674,6 @@
     tgt_key = target_nomenclature.strip().lower()
     # try to find the actual column names for requested keys
-    # support synonyms: user may pass "ensg" or "ENSG" etc.
     col_for_src = None
     col_for_tgt = None
     # first, try exact match
@@ -575,8 +685,6 @@
     # if not found, try mapping common names
     if col_for_src is None:
-        # fallback: if user passed 'hgnc_id' but chosen has only 'hgnc_symbol', it's not useful
-        # we require at least the source column to exist
         possible_src_names = {k: v for k, v in chosen.items()}
         # try to match by contained substring
         for k, actual in possible_src_names.items():
@@ -594,7 +702,6 @@
raise ValueError(f"Source column for '{source_nomenclature}' not found in mapping dataframe.")
if col_for_tgt is None:
raise ValueError(f"Target column for '{target_nomenclature}' not found in mapping dataframe.")
-
model_source_genes = { _normalize_gene_id(g.id) for g in model.genes }
logger.info(f"Filtering mapping to {len(model_source_genes)} source genes present in model (normalized).")
@@ -604,24 +711,22 @@
     filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy()
-    # If there are no relevant rows, warn (there may simply be no mappings for the model's genes)
+    # If there are no relevant rows, warn
     if filtered_map.empty:
         logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).")
-    # --- VALIDATION: no target may be mapped from more than one source (within the filtered set) ---
-    # To run the check on the whole dataframe (not only on model genes), pass model_source_genes=None.
-    _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
+    # --- VALIDATION: optional, controlled by allow_many_to_one ---
+    if not allow_many_to_one:
+        _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger)
-    # Now build the mapping only on the filtered subset (more efficient)
-    # NOTE: _create_gene_mapping expects the original column names,
-    # so we pass filtered_map with the columns remapped (without col_for_src + "_norm")
+    # Build the gene mapping
     gene_mapping = _create_gene_mapping(filtered_map, col_for_src, col_for_tgt, logger)
     # copy model
     model_copy = model.copy()
     # statistics
-    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0}
+    stats = {'translated': 0, 'one_to_one': 0, 'one_to_many': 0, 'not_found': 0, 'simplified_gprs': 0}
     unmapped = []
     multi = []
@@ -634,8 +739,13 @@
         if gpr and gpr.strip():
             new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger)
             if new_gpr != gpr:
-                rxn.gene_reaction_rule = new_gpr
-                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{new_gpr}'")
+                # Simplify the boolean expression to remove duplicate genes
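+                # (e.g. a many-to-one mapping can turn 'A and B' into
+                # 'G and G', which collapses to 'G')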
+                simplified_gpr = _simplify_boolean_expression(new_gpr)
+                if simplified_gpr != new_gpr:
+                    stats['simplified_gprs'] += 1
+                    logger.debug(f"Simplified GPR for {rxn.id}: '{new_gpr}' -> '{simplified_gpr}'")
+                rxn.gene_reaction_rule = simplified_gpr
+                logger.debug(f"Reaction {rxn.id}: '{gpr}' -> '{simplified_gpr}'")
     # update model genes based on new GPRs
     _update_model_genes(model_copy, logger)
@@ -783,6 +893,7 @@
logger.info("=== TRANSLATION STATISTICS ===")
logger.info(f"Translated: {stats.get('translated', 0)} (1:1 = {stats.get('one_to_one', 0)}, 1:many = {stats.get('one_to_many', 0)})")
logger.info(f"Not found tokens: {stats.get('not_found', 0)}")
+ logger.info(f"Simplified GPRs: {stats.get('simplified_gprs', 0)}")
final_ids = {g.id for g in final_genes}
logger.info(f"Genes in model: {len(original_genes)} -> {len(final_ids)}")