418
|
1 import os
|
|
2 import csv
|
|
3 import cobra
|
|
4 import pickle
|
|
5 import argparse
|
|
6 import pandas as pd
|
419
|
7 import re
|
|
8 from typing import Optional, Tuple, Union, List, Dict, Set
|
418
|
9 import utils.general_utils as utils
|
|
10 import utils.rule_parsing as rulesUtils
|
419
|
11 import utils.reaction_parsing as reactionUtils
|
|
12 from cobra import Model as cobraModel, Reaction, Metabolite
|
418
|
13
|
|
14 ################################- DATA GENERATION -################################
|
|
15 ReactionId = str
|
419
|
16 def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
|
418
|
17 """
|
|
18 Generates a dictionary mapping reaction ids to rules from the model.
|
|
19
|
|
20 Args:
|
|
21 model : the model to derive data from.
|
|
22 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
|
|
23
|
|
24 Returns:
|
|
25 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
|
|
26 Dict[ReactionId, str] : the generated dictionary of raw rules.
|
|
27 """
|
|
28 # Is the below approach convoluted? yes
|
|
29 # Ok but is it inefficient? probably
|
|
30 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
|
|
31 _ruleGetter = lambda reaction : reaction.gene_reaction_rule
|
|
32 ruleExtractor = (lambda reaction :
|
|
33 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
|
|
34
|
|
35 return {
|
|
36 reaction.id : ruleExtractor(reaction)
|
|
37 for reaction in model.reactions
|
|
38 if reaction.gene_reaction_rule }
|
|
39
|
419
|
40 def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]:
|
418
|
41 """
|
|
42 Generates a dictionary mapping reaction ids to reaction formulas from the model.
|
|
43
|
|
44 Args:
|
|
45 model : the model to derive data from.
|
|
46 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
|
|
47
|
|
48 Returns:
|
|
49 Dict[ReactionId, str] : the generated dictionary.
|
|
50 """
|
|
51
|
|
52 unparsedReactions = {
|
|
53 reaction.id : reaction.reaction
|
|
54 for reaction in model.reactions
|
|
55 if reaction.reaction
|
|
56 }
|
|
57
|
|
58 if not asParsed: return unparsedReactions
|
|
59
|
|
60 return reactionUtils.create_reaction_dict(unparsedReactions)
|
|
61
|
419
|
62 def get_medium(model:cobraModel) -> pd.DataFrame:
|
418
|
63 trueMedium=[]
|
|
64 for r in model.reactions:
|
|
65 positiveCoeff=0
|
|
66 for m in r.metabolites:
|
|
67 if r.get_coefficient(m.id)>0:
|
|
68 positiveCoeff=1;
|
|
69 if (positiveCoeff==0 and r.lower_bound<0):
|
|
70 trueMedium.append(r.id)
|
|
71
|
|
72 df_medium = pd.DataFrame()
|
|
73 df_medium["reaction"] = trueMedium
|
|
74 return df_medium
|
|
75
|
419
|
76 def generate_bounds(model:cobraModel) -> pd.DataFrame:
|
418
|
77
|
|
78 rxns = []
|
|
79 for reaction in model.reactions:
|
|
80 rxns.append(reaction.id)
|
|
81
|
|
82 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
|
|
83
|
|
84 for reaction in model.reactions:
|
|
85 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
|
|
86 return bounds
|
|
87
|
|
88
|
|
89
|
419
|
90 def generate_compartments(model: cobraModel) -> pd.DataFrame:
|
418
|
91 """
|
|
92 Generates a DataFrame containing compartment information for each reaction.
|
|
93 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
|
|
94
|
|
95 Args:
|
|
96 model: the COBRA model to extract compartment data from.
|
|
97
|
|
98 Returns:
|
|
99 pd.DataFrame: DataFrame with ReactionID and compartment columns
|
|
100 """
|
|
101 pathway_data = []
|
|
102
|
|
103 # First pass: determine the maximum number of pathways any reaction has
|
|
104 max_pathways = 0
|
|
105 reaction_pathways = {}
|
|
106
|
|
107 for reaction in model.reactions:
|
|
108 # Get unique pathways from all metabolites in the reaction
|
|
109 if type(reaction.annotation['pathways']) == list:
|
|
110 reaction_pathways[reaction.id] = reaction.annotation['pathways']
|
|
111 max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
|
|
112 else:
|
|
113 reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
|
|
114
|
|
115 # Create column names for pathways
|
|
116 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
|
|
117
|
|
118 # Second pass: create the data
|
|
119 for reaction_id, pathways in reaction_pathways.items():
|
|
120 row = {"ReactionID": reaction_id}
|
|
121
|
|
122 # Fill pathway columns
|
|
123 for i in range(max_pathways):
|
|
124 col_name = pathway_columns[i]
|
|
125 if i < len(pathways):
|
|
126 row[col_name] = pathways[i]
|
|
127 else:
|
|
128 row[col_name] = None # or "" if you prefer empty strings
|
|
129
|
|
130 pathway_data.append(row)
|
|
131
|
419
|
132 return pd.DataFrame(pathway_data)
|
|
133
|
|
134
|
|
135
|
|
136 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel:
|
|
137 """
|
|
138 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
|
|
139
|
|
140 Args:
|
|
141 csv_path: Path al file CSV (separato da tab)
|
|
142 model_id: ID del modello da creare
|
|
143
|
|
144 Returns:
|
|
145 cobra.Model: Il modello COBRApy costruito
|
|
146 """
|
|
147
|
|
148 # Leggi i dati dal CSV
|
|
149 df = pd.read_csv(csv_path, sep='\t')
|
|
150
|
|
151 # Crea il modello vuoto
|
|
152 model = cobraModel(model_id)
|
|
153
|
|
154 # Dict per tenere traccia di metaboliti e compartimenti
|
|
155 metabolites_dict = {}
|
|
156 compartments_dict = {}
|
|
157
|
|
158 print(f"Costruendo modello da {len(df)} reazioni...")
|
|
159
|
|
160 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
|
|
161 for idx, row in df.iterrows():
|
|
162 reaction_formula = str(row['Reaction']).strip()
|
|
163 if not reaction_formula or reaction_formula == 'nan':
|
|
164 continue
|
|
165
|
|
166 # Estrai metaboliti dalla formula della reazione
|
|
167 metabolites = extract_metabolites_from_reaction(reaction_formula)
|
|
168
|
|
169 for met_id in metabolites:
|
|
170 compartment = extract_compartment_from_metabolite(met_id)
|
|
171
|
|
172 # Aggiungi compartimento se non esiste
|
|
173 if compartment not in compartments_dict:
|
|
174 compartments_dict[compartment] = compartment
|
|
175
|
|
176 # Aggiungi metabolita se non esiste
|
|
177 if met_id not in metabolites_dict:
|
|
178 metabolites_dict[met_id] = Metabolite(
|
|
179 id=met_id,
|
|
180 compartment=compartment,
|
|
181 name=met_id.replace(f"_{compartment}", "").replace("__", "_")
|
|
182 )
|
|
183
|
|
184 # Aggiungi compartimenti al modello
|
|
185 model.compartments = compartments_dict
|
|
186
|
|
187 # Aggiungi metaboliti al modello
|
|
188 model.add_metabolites(list(metabolites_dict.values()))
|
|
189
|
|
190 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
|
|
191
|
|
192 # Seconda passata: aggiungi le reazioni
|
|
193 reactions_added = 0
|
|
194 reactions_skipped = 0
|
|
195
|
|
196 for idx, row in df.iterrows():
|
|
197
|
|
198 reaction_id = str(row['ReactionID']).strip()
|
|
199 reaction_formula = str(row['Reaction']).strip()
|
|
200
|
|
201 # Salta reazioni senza formula
|
|
202 if not reaction_formula or reaction_formula == 'nan':
|
|
203 raise ValueError(f"Formula della reazione mancante {reaction_id}")
|
|
204
|
|
205 # Crea la reazione
|
|
206 reaction = Reaction(reaction_id)
|
|
207 reaction.name = reaction_id
|
|
208
|
|
209 # Imposta bounds
|
|
210 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
|
|
211 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
|
|
212
|
|
213 # Aggiungi gene rule se presente
|
|
214 if pd.notna(row['Rule']) and str(row['Rule']).strip():
|
|
215 reaction.gene_reaction_rule = str(row['Rule']).strip()
|
|
216
|
|
217 # Parse della formula della reazione
|
|
218 try:
|
|
219 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
|
|
220 except Exception as e:
|
|
221 print(f"Errore nel parsing della reazione {reaction_id}: {e}")
|
|
222 reactions_skipped += 1
|
|
223 continue
|
|
224
|
|
225 # Aggiungi la reazione al modello
|
|
226 model.add_reactions([reaction])
|
|
227 reactions_added += 1
|
|
228
|
|
229
|
|
230 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
|
|
231
|
|
232 # Imposta l'obiettivo di biomassa
|
|
233 set_biomass_objective(model)
|
|
234
|
|
235 # Imposta il medium
|
|
236 set_medium_from_data(model, df)
|
|
237
|
|
238 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
|
|
239
|
|
240 return model
|
|
241
|
|
242
|
|
243 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
|
|
244 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
|
|
245 """
|
|
246 Estrae gli ID dei metaboliti da una formula di reazione.
|
|
247 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
|
|
248 e permette che comincino con cifre o underscore.
|
|
249 """
|
|
250 metabolites = set()
|
|
251 # coefficiente opzionale seguito da un token che termina con _<letters>
|
|
252 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
|
|
253 matches = re.findall(pattern, reaction_formula)
|
|
254 metabolites.update(matches)
|
|
255 return metabolites
|
|
256
|
|
257
|
|
258 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
|
|
259 """
|
|
260 Estrae il compartimento dall'ID del metabolita.
|
|
261 """
|
|
262 # Il compartimento è solitamente l'ultima lettera dopo l'underscore
|
|
263 if '_' in metabolite_id:
|
|
264 return metabolite_id.split('_')[-1]
|
|
265 return 'c' # default cytoplasm
|
|
266
|
|
267
|
|
268 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
|
|
269 """
|
|
270 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
|
|
271 """
|
|
272
|
|
273 if reaction.id == 'EX_thbpt_e':
|
|
274 print(reaction.id)
|
|
275 print(formula)
|
|
276 # Dividi in parte sinistra e destra
|
|
277 if '<=>' in formula:
|
|
278 left, right = formula.split('<=>')
|
|
279 reversible = True
|
|
280 elif '<--' in formula:
|
|
281 left, right = formula.split('<--')
|
|
282 reversible = False
|
|
283 left, right = left, right
|
|
284 elif '-->' in formula:
|
|
285 left, right = formula.split('-->')
|
|
286 reversible = False
|
|
287 elif '<-' in formula:
|
|
288 left, right = formula.split('<-')
|
|
289 reversible = False
|
|
290 left, right = left, right
|
|
291 else:
|
|
292 raise ValueError(f"Formato reazione non riconosciuto: {formula}")
|
|
293
|
|
294 # Parse dei metaboliti e coefficienti
|
|
295 reactants = parse_metabolites_side(left.strip())
|
|
296 products = parse_metabolites_side(right.strip())
|
|
297
|
|
298 # Aggiungi metaboliti alla reazione
|
|
299 metabolites_to_add = {}
|
|
300
|
|
301 # Reagenti (coefficienti negativi)
|
|
302 for met_id, coeff in reactants.items():
|
|
303 if met_id in metabolites_dict:
|
|
304 metabolites_to_add[metabolites_dict[met_id]] = -coeff
|
|
305
|
|
306 # Prodotti (coefficienti positivi)
|
|
307 for met_id, coeff in products.items():
|
|
308 if met_id in metabolites_dict:
|
|
309 metabolites_to_add[metabolites_dict[met_id]] = coeff
|
|
310
|
|
311 reaction.add_metabolites(metabolites_to_add)
|
|
312
|
|
313
|
|
314 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
|
|
315 """
|
|
316 Parsa un lato della reazione per estrarre metaboliti e coefficienti.
|
|
317 """
|
|
318 metabolites = {}
|
|
319 if not side_str or side_str.strip() == '':
|
|
320 return metabolites
|
|
321
|
|
322 terms = side_str.split('+')
|
|
323 for term in terms:
|
|
324 term = term.strip()
|
|
325 if not term:
|
|
326 continue
|
|
327
|
|
328 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
|
|
329 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
|
|
330 if match:
|
|
331 coeff_str, met_id = match.groups()
|
|
332 coeff = float(coeff_str) if coeff_str else 1.0
|
|
333 metabolites[met_id] = coeff
|
|
334
|
|
335 return metabolites
|
|
336
|
|
337
|
|
338
|
|
339 def set_biomass_objective(model: cobraModel):
|
|
340 """
|
|
341 Imposta la reazione di biomassa come obiettivo.
|
|
342 """
|
|
343 biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
|
|
344
|
|
345 if biomass_reactions:
|
|
346 model.objective = biomass_reactions[0].id
|
|
347 print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
|
|
348 else:
|
|
349 print("Nessuna reazione di biomassa trovata")
|
|
350
|
|
351
|
|
352 def set_medium_from_data(model: cobraModel, df: pd.DataFrame):
|
|
353 """
|
|
354 Imposta il medium basato sulla colonna InMedium.
|
|
355 """
|
|
356 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
|
|
357
|
|
358 medium_dict = {}
|
|
359 for rxn_id in medium_reactions:
|
|
360 if rxn_id in [r.id for r in model.reactions]:
|
|
361 reaction = model.reactions.get_by_id(rxn_id)
|
|
362 if reaction.lower_bound < 0: # Solo reazioni di uptake
|
|
363 medium_dict[rxn_id] = abs(reaction.lower_bound)
|
|
364
|
|
365 if medium_dict:
|
|
366 model.medium = medium_dict
|
|
367 print(f"Medium impostato con {len(medium_dict)} componenti")
|
|
368
|
|
369
|
|
370 def validate_model(model: cobraModel) -> Dict[str, any]:
|
|
371 """
|
|
372 Valida il modello e fornisce statistiche di base.
|
|
373 """
|
|
374 validation = {
|
|
375 'num_reactions': len(model.reactions),
|
|
376 'num_metabolites': len(model.metabolites),
|
|
377 'num_genes': len(model.genes),
|
|
378 'num_compartments': len(model.compartments),
|
|
379 'objective': str(model.objective),
|
|
380 'medium_size': len(model.medium),
|
|
381 'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
|
|
382 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
|
|
383 }
|
|
384
|
|
385 try:
|
|
386 # Test di crescita
|
|
387 solution = model.optimize()
|
|
388 validation['growth_rate'] = solution.objective_value
|
|
389 validation['status'] = solution.status
|
|
390 except Exception as e:
|
|
391 validation['growth_rate'] = None
|
|
392 validation['status'] = f"Error: {e}"
|
|
393
|
|
394 return validation
|
|
395
|
|
396 def convert_genes(model,annotation):
|
|
397 from cobra.manipulation import rename_genes
|
|
398 model2=model.copy()
|
|
399 try:
|
|
400 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes}
|
|
401 except:
|
|
402 print("No annotation in gene dict!")
|
|
403 return -1
|
|
404 rename_genes(model2,dict_genes)
|
|
405
|
|
406 return model2 |