Mercurial > repos > bimib > cobraxy
comparison COBRAxy/flux_simulation_beta.py @ 410:d660c5b03c14 draft
Uploaded
author | francesco_lapi |
---|---|
date | Mon, 08 Sep 2025 17:33:52 +0000 |
parents | |
children | 6b015d3184ab |
comparison
equal
deleted
inserted
replaced
409:71850bdf9e1e | 410:d660c5b03c14 |
---|---|
1 import argparse | |
2 import utils.general_utils as utils | |
3 from typing import Optional, List | |
4 import os | |
5 import numpy as np | |
6 import pandas as pd | |
7 import cobra | |
8 import utils.CBS_backend as CBS_backend | |
9 from joblib import Parallel, delayed, cpu_count | |
10 from cobra.sampling import OptGPSampler | |
11 import sys | |
12 | |
13 | |
14 ################################# process args ############################### | |
15 def process_args(args :List[str] = None) -> argparse.Namespace: | |
16 """ | |
17 Processes command-line arguments. | |
18 | |
19 Args: | |
20 args (list): List of command-line arguments. | |
21 | |
22 Returns: | |
23 Namespace: An object containing parsed arguments. | |
24 """ | |
25 parser = argparse.ArgumentParser(usage = '%(prog)s [options]', | |
26 description = 'process some value\'s') | |
27 | |
28 parser.add_argument("-mo", "--model_upload", type = str, | |
29 help = "path to input file with custom rules, if provided") | |
30 | |
31 parser.add_argument('-ol', '--out_log', | |
32 help = "Output log") | |
33 | |
34 parser.add_argument('-td', '--tool_dir', | |
35 type = str, | |
36 required = True, | |
37 help = 'your tool directory') | |
38 | |
39 parser.add_argument('-in', '--input', | |
40 required = True, | |
41 type=str, | |
42 help = 'inputs bounds') | |
43 | |
44 parser.add_argument('-ni', '--names', | |
45 required = True, | |
46 type=str, | |
47 help = 'cell names') | |
48 | |
49 parser.add_argument('-a', '--algorithm', | |
50 type = str, | |
51 choices = ['OPTGP', 'CBS'], | |
52 required = True, | |
53 help = 'choose sampling algorithm') | |
54 | |
55 parser.add_argument('-th', '--thinning', | |
56 type = int, | |
57 default= 100, | |
58 required=False, | |
59 help = 'choose thinning') | |
60 | |
61 parser.add_argument('-ns', '--n_samples', | |
62 type = int, | |
63 required = True, | |
64 help = 'choose how many samples') | |
65 | |
66 parser.add_argument('-sd', '--seed', | |
67 type = int, | |
68 required = True, | |
69 help = 'seed') | |
70 | |
71 parser.add_argument('-nb', '--n_batches', | |
72 type = int, | |
73 required = True, | |
74 help = 'choose how many batches') | |
75 | |
76 parser.add_argument('-ot', '--output_type', | |
77 type = str, | |
78 required = True, | |
79 help = 'output type') | |
80 | |
81 parser.add_argument('-ota', '--output_type_analysis', | |
82 type = str, | |
83 required = False, | |
84 help = 'output type analysis') | |
85 | |
86 parser.add_argument('-idop', '--output_path', | |
87 type = str, | |
88 default='flux_simulation', | |
89 help = 'output path for maps') | |
90 | |
91 ARGS = parser.parse_args(args) | |
92 return ARGS | |
93 | |
94 ########################### warning ########################################### | |
95 def warning(s :str) -> None: | |
96 """ | |
97 Log a warning message to an output log file and print it to the console. | |
98 | |
99 Args: | |
100 s (str): The warning message to be logged and printed. | |
101 | |
102 Returns: | |
103 None | |
104 """ | |
105 with open(ARGS.out_log, 'a') as log: | |
106 log.write(s + "\n\n") | |
107 print(s) | |
108 | |
109 | |
110 def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None: | |
111 dataset.index.name = 'Reactions' | |
112 dataset.to_csv(ARGS.output_path + "/" + name + ".csv", sep = '\t', index = keep_index) | |
113 | |
114 ############################ dataset input #################################### | |
115 def read_dataset(data :str, name :str) -> pd.DataFrame: | |
116 """ | |
117 Read a dataset from a CSV file and return it as a pandas DataFrame. | |
118 | |
119 Args: | |
120 data (str): Path to the CSV file containing the dataset. | |
121 name (str): Name of the dataset, used in error messages. | |
122 | |
123 Returns: | |
124 pandas.DataFrame: DataFrame containing the dataset. | |
125 | |
126 Raises: | |
127 pd.errors.EmptyDataError: If the CSV file is empty. | |
128 sys.exit: If the CSV file has the wrong format, the execution is aborted. | |
129 """ | |
130 try: | |
131 dataset = pd.read_csv(data, sep = '\t', header = 0, index_col=0, engine='python') | |
132 except pd.errors.EmptyDataError: | |
133 sys.exit('Execution aborted: wrong format of ' + name + '\n') | |
134 if len(dataset.columns) < 2: | |
135 sys.exit('Execution aborted: wrong format of ' + name + '\n') | |
136 return dataset | |
137 | |
138 | |
139 | |
140 def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None: | |
141 """ | |
142 Samples from the OPTGP (Optimal Global Perturbation) algorithm and saves the results to CSV files. | |
143 | |
144 Args: | |
145 model (cobra.Model): The COBRA model to sample from. | |
146 model_name (str): The name of the model, used in naming output files. | |
147 n_samples (int, optional): Number of samples per batch. Default is 1000. | |
148 thinning (int, optional): Thinning parameter for the sampler. Default is 100. | |
149 n_batches (int, optional): Number of batches to run. Default is 1. | |
150 seed (int, optional): Random seed for reproducibility. Default is 0. | |
151 | |
152 Returns: | |
153 None | |
154 """ | |
155 | |
156 for i in range(0, n_batches): | |
157 optgp = OptGPSampler(model, thinning, seed) | |
158 samples = optgp.sample(n_samples) | |
159 samples.to_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv', index=False) | |
160 seed+=1 | |
161 samplesTotal = pd.DataFrame() | |
162 for i in range(0, n_batches): | |
163 samples_batch = pd.read_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv') | |
164 samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) | |
165 | |
166 write_to_file(samplesTotal.T, model_name, True) | |
167 | |
168 for i in range(0, n_batches): | |
169 os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv') | |
170 pass | |
171 | |
172 | |
173 def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None: | |
174 """ | |
175 Samples using the CBS (Constraint-based Sampling) algorithm and saves the results to CSV files. | |
176 | |
177 Args: | |
178 model (cobra.Model): The COBRA model to sample from. | |
179 model_name (str): The name of the model, used in naming output files. | |
180 n_samples (int, optional): Number of samples per batch. Default is 1000. | |
181 n_batches (int, optional): Number of batches to run. Default is 1. | |
182 seed (int, optional): Random seed for reproducibility. Default is 0. | |
183 | |
184 Returns: | |
185 None | |
186 """ | |
187 | |
188 df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6) | |
189 | |
190 df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed) | |
191 | |
192 for i in range(0, n_batches): | |
193 samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples)) | |
194 try: | |
195 CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) | |
196 except Exception as e: | |
197 utils.logWarning( | |
198 "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e), | |
199 ARGS.out_log) | |
200 CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], | |
201 samples) | |
202 utils.logWarning(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log) | |
203 samples.to_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv', index=False) | |
204 | |
205 samplesTotal = pd.DataFrame() | |
206 for i in range(0, n_batches): | |
207 samples_batch = pd.read_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') | |
208 samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) | |
209 | |
210 write_to_file(samplesTotal.T, model_name, True) | |
211 | |
212 for i in range(0, n_batches): | |
213 os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') | |
214 pass | |
215 | |
216 | |
217 def model_sampler(model_input_original:cobra.Model, bounds_path:str, cell_name:str)-> List[pd.DataFrame]: | |
218 """ | |
219 Prepares the model with bounds from the dataset and performs sampling and analysis based on the selected algorithm. | |
220 | |
221 Args: | |
222 model_input_original (cobra.Model): The original COBRA model. | |
223 bounds_path (str): Path to the CSV file containing the bounds dataset. | |
224 cell_name (str): Name of the cell, used to generate filenames for output. | |
225 | |
226 Returns: | |
227 List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results. | |
228 """ | |
229 | |
230 model_input = model_input_original.copy() | |
231 bounds_df = read_dataset(bounds_path, "bounds dataset") | |
232 for rxn_index, row in bounds_df.iterrows(): | |
233 model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound | |
234 model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound | |
235 | |
236 | |
237 if ARGS.algorithm == 'OPTGP': | |
238 OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) | |
239 | |
240 elif ARGS.algorithm == 'CBS': | |
241 CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) | |
242 | |
243 df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types) | |
244 | |
245 if("fluxes" not in ARGS.output_types): | |
246 os.remove(ARGS.output_path + "/" + cell_name + '.csv') | |
247 | |
248 returnList = [] | |
249 returnList.append(df_mean) | |
250 returnList.append(df_median) | |
251 returnList.append(df_quantiles) | |
252 | |
253 df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis) | |
254 | |
255 if("pFBA" in ARGS.output_type_analysis): | |
256 returnList.append(df_pFBA) | |
257 if("FVA" in ARGS.output_type_analysis): | |
258 returnList.append(df_FVA) | |
259 if("sensitivity" in ARGS.output_type_analysis): | |
260 returnList.append(df_sensitivity) | |
261 | |
262 return returnList | |
263 | |
264 def fluxes_statistics(model_name: str, output_types:List)-> List[pd.DataFrame]: | |
265 """ | |
266 Computes statistics (mean, median, quantiles) for the fluxes. | |
267 | |
268 Args: | |
269 model_name (str): Name of the model, used in filename for input. | |
270 output_types (List[str]): Types of statistics to compute (mean, median, quantiles). | |
271 | |
272 Returns: | |
273 List[pd.DataFrame]: List of DataFrames containing mean, median, and quantiles statistics. | |
274 """ | |
275 | |
276 df_mean = pd.DataFrame() | |
277 df_median= pd.DataFrame() | |
278 df_quantiles= pd.DataFrame() | |
279 | |
280 df_samples = pd.read_csv(ARGS.output_path + "/" + model_name + '.csv', sep = '\t', index_col = 0).T | |
281 df_samples = df_samples.round(8) | |
282 | |
283 for output_type in output_types: | |
284 if(output_type == "mean"): | |
285 df_mean = df_samples.mean() | |
286 df_mean = df_mean.to_frame().T | |
287 df_mean = df_mean.reset_index(drop=True) | |
288 df_mean.index = [model_name] | |
289 elif(output_type == "median"): | |
290 df_median = df_samples.median() | |
291 df_median = df_median.to_frame().T | |
292 df_median = df_median.reset_index(drop=True) | |
293 df_median.index = [model_name] | |
294 elif(output_type == "quantiles"): | |
295 newRow = [] | |
296 cols = [] | |
297 for rxn in df_samples.columns: | |
298 quantiles = df_samples[rxn].quantile([0.25, 0.50, 0.75]) | |
299 newRow.append(quantiles[0.25]) | |
300 cols.append(rxn + "_q1") | |
301 newRow.append(quantiles[0.5]) | |
302 cols.append(rxn + "_q2") | |
303 newRow.append(quantiles[0.75]) | |
304 cols.append(rxn + "_q3") | |
305 df_quantiles = pd.DataFrame(columns=cols) | |
306 df_quantiles.loc[0] = newRow | |
307 df_quantiles = df_quantiles.reset_index(drop=True) | |
308 df_quantiles.index = [model_name] | |
309 | |
310 return df_mean, df_median, df_quantiles | |
311 | |
312 def fluxes_analysis(model:cobra.Model, model_name:str, output_types:List)-> List[pd.DataFrame]: | |
313 """ | |
314 Performs flux analysis including pFBA, FVA, and sensitivity analysis. | |
315 | |
316 Args: | |
317 model (cobra.Model): The COBRA model to analyze. | |
318 model_name (str): Name of the model, used in filenames for output. | |
319 output_types (List[str]): Types of analysis to perform (pFBA, FVA, sensitivity). | |
320 | |
321 Returns: | |
322 List[pd.DataFrame]: List of DataFrames containing pFBA, FVA, and sensitivity analysis results. | |
323 """ | |
324 | |
325 df_pFBA = pd.DataFrame() | |
326 df_FVA= pd.DataFrame() | |
327 df_sensitivity= pd.DataFrame() | |
328 | |
329 for output_type in output_types: | |
330 if(output_type == "pFBA"): | |
331 model.objective = "Biomass" | |
332 solution = cobra.flux_analysis.pfba(model) | |
333 fluxes = solution.fluxes | |
334 df_pFBA.loc[0,[rxn._id for rxn in model.reactions]] = fluxes.tolist() | |
335 df_pFBA = df_pFBA.reset_index(drop=True) | |
336 df_pFBA.index = [model_name] | |
337 df_pFBA = df_pFBA.astype(float).round(6) | |
338 elif(output_type == "FVA"): | |
339 fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8) | |
340 columns = [] | |
341 for rxn in fva.index.to_list(): | |
342 columns.append(rxn + "_min") | |
343 columns.append(rxn + "_max") | |
344 df_FVA= pd.DataFrame(columns = columns) | |
345 for index_rxn, row in fva.iterrows(): | |
346 df_FVA.loc[0, index_rxn+ "_min"] = fva.loc[index_rxn, "minimum"] | |
347 df_FVA.loc[0, index_rxn+ "_max"] = fva.loc[index_rxn, "maximum"] | |
348 df_FVA = df_FVA.reset_index(drop=True) | |
349 df_FVA.index = [model_name] | |
350 df_FVA = df_FVA.astype(float).round(6) | |
351 elif(output_type == "sensitivity"): | |
352 model.objective = "Biomass" | |
353 solution_original = model.optimize().objective_value | |
354 reactions = model.reactions | |
355 single = cobra.flux_analysis.single_reaction_deletion(model) | |
356 newRow = [] | |
357 df_sensitivity = pd.DataFrame(columns = [rxn.id for rxn in reactions], index = [model_name]) | |
358 for rxn in reactions: | |
359 newRow.append(single.knockout[rxn.id].growth.values[0]/solution_original) | |
360 df_sensitivity.loc[model_name] = newRow | |
361 df_sensitivity = df_sensitivity.astype(float).round(6) | |
362 return df_pFBA, df_FVA, df_sensitivity | |
363 | |
364 ############################# main ########################################### | |
365 def main(args :List[str] = None) -> None: | |
366 """ | |
367 Initializes everything and sets the program in motion based on the fronted input arguments. | |
368 | |
369 Returns: | |
370 None | |
371 """ | |
372 | |
373 num_processors = cpu_count() | |
374 | |
375 global ARGS | |
376 ARGS = process_args(args) | |
377 | |
378 if not os.path.exists(ARGS.output_path): | |
379 os.makedirs(ARGS.output_path) | |
380 | |
381 #model_type :utils.Model = ARGS.model_selector | |
382 #if model_type is utils.Model.Custom: | |
383 # model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext) | |
384 #else: | |
385 # model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir) | |
386 | |
387 model = utils.build_cobra_model_from_csv(ARGS.model_upload) | |
388 | |
389 validation = utils.validate_model(model) | |
390 | |
391 print("\n=== VALIDAZIONE MODELLO ===") | |
392 for key, value in validation.items(): | |
393 print(f"{key}: {value}") | |
394 | |
395 #Set solver verbosity to 1 to see warning and error messages only. | |
396 model.solver.configuration.verbosity = 1 | |
397 | |
398 ARGS.bounds = ARGS.input.split(",") | |
399 ARGS.bounds_name = ARGS.names.split(",") | |
400 ARGS.output_types = ARGS.output_type.split(",") | |
401 ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") | |
402 | |
403 | |
404 results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model, bounds_path, cell_name) for bounds_path, cell_name in zip(ARGS.bounds, ARGS.bounds_name)) | |
405 | |
406 all_mean = pd.concat([result[0] for result in results], ignore_index=False) | |
407 all_median = pd.concat([result[1] for result in results], ignore_index=False) | |
408 all_quantiles = pd.concat([result[2] for result in results], ignore_index=False) | |
409 | |
410 if("mean" in ARGS.output_types): | |
411 all_mean = all_mean.fillna(0.0) | |
412 all_mean = all_mean.sort_index() | |
413 write_to_file(all_mean.T, "mean", True) | |
414 | |
415 if("median" in ARGS.output_types): | |
416 all_median = all_median.fillna(0.0) | |
417 all_median = all_median.sort_index() | |
418 write_to_file(all_median.T, "median", True) | |
419 | |
420 if("quantiles" in ARGS.output_types): | |
421 all_quantiles = all_quantiles.fillna(0.0) | |
422 all_quantiles = all_quantiles.sort_index() | |
423 write_to_file(all_quantiles.T, "quantiles", True) | |
424 | |
425 index_result = 3 | |
426 if("pFBA" in ARGS.output_type_analysis): | |
427 all_pFBA = pd.concat([result[index_result] for result in results], ignore_index=False) | |
428 all_pFBA = all_pFBA.sort_index() | |
429 write_to_file(all_pFBA.T, "pFBA", True) | |
430 index_result+=1 | |
431 if("FVA" in ARGS.output_type_analysis): | |
432 all_FVA= pd.concat([result[index_result] for result in results], ignore_index=False) | |
433 all_FVA = all_FVA.sort_index() | |
434 write_to_file(all_FVA.T, "FVA", True) | |
435 index_result+=1 | |
436 if("sensitivity" in ARGS.output_type_analysis): | |
437 all_sensitivity = pd.concat([result[index_result] for result in results], ignore_index=False) | |
438 all_sensitivity = all_sensitivity.sort_index() | |
439 write_to_file(all_sensitivity.T, "sensitivity", True) | |
440 | |
441 pass | |
442 | |
443 ############################################################################## | |
444 if __name__ == "__main__": | |
445 main() |