Mercurial > repos > bimib > cobraxy
comparison COBRAxy/flux_simulation_beta.py @ 410:d660c5b03c14 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Mon, 08 Sep 2025 17:33:52 +0000 |
| parents | |
| children | 6b015d3184ab |
comparison
equal
deleted
inserted
replaced
| 409:71850bdf9e1e | 410:d660c5b03c14 |
|---|---|
| 1 import argparse | |
| 2 import utils.general_utils as utils | |
| 3 from typing import Optional, List | |
| 4 import os | |
| 5 import numpy as np | |
| 6 import pandas as pd | |
| 7 import cobra | |
| 8 import utils.CBS_backend as CBS_backend | |
| 9 from joblib import Parallel, delayed, cpu_count | |
| 10 from cobra.sampling import OptGPSampler | |
| 11 import sys | |
| 12 | |
| 13 | |
| 14 ################################# process args ############################### | |
| 15 def process_args(args :List[str] = None) -> argparse.Namespace: | |
| 16 """ | |
| 17 Processes command-line arguments. | |
| 18 | |
| 19 Args: | |
| 20 args (list): List of command-line arguments. | |
| 21 | |
| 22 Returns: | |
| 23 Namespace: An object containing parsed arguments. | |
| 24 """ | |
| 25 parser = argparse.ArgumentParser(usage = '%(prog)s [options]', | |
| 26 description = 'process some value\'s') | |
| 27 | |
| 28 parser.add_argument("-mo", "--model_upload", type = str, | |
| 29 help = "path to input file with custom rules, if provided") | |
| 30 | |
| 31 parser.add_argument('-ol', '--out_log', | |
| 32 help = "Output log") | |
| 33 | |
| 34 parser.add_argument('-td', '--tool_dir', | |
| 35 type = str, | |
| 36 required = True, | |
| 37 help = 'your tool directory') | |
| 38 | |
| 39 parser.add_argument('-in', '--input', | |
| 40 required = True, | |
| 41 type=str, | |
| 42 help = 'inputs bounds') | |
| 43 | |
| 44 parser.add_argument('-ni', '--names', | |
| 45 required = True, | |
| 46 type=str, | |
| 47 help = 'cell names') | |
| 48 | |
| 49 parser.add_argument('-a', '--algorithm', | |
| 50 type = str, | |
| 51 choices = ['OPTGP', 'CBS'], | |
| 52 required = True, | |
| 53 help = 'choose sampling algorithm') | |
| 54 | |
| 55 parser.add_argument('-th', '--thinning', | |
| 56 type = int, | |
| 57 default= 100, | |
| 58 required=False, | |
| 59 help = 'choose thinning') | |
| 60 | |
| 61 parser.add_argument('-ns', '--n_samples', | |
| 62 type = int, | |
| 63 required = True, | |
| 64 help = 'choose how many samples') | |
| 65 | |
| 66 parser.add_argument('-sd', '--seed', | |
| 67 type = int, | |
| 68 required = True, | |
| 69 help = 'seed') | |
| 70 | |
| 71 parser.add_argument('-nb', '--n_batches', | |
| 72 type = int, | |
| 73 required = True, | |
| 74 help = 'choose how many batches') | |
| 75 | |
| 76 parser.add_argument('-ot', '--output_type', | |
| 77 type = str, | |
| 78 required = True, | |
| 79 help = 'output type') | |
| 80 | |
| 81 parser.add_argument('-ota', '--output_type_analysis', | |
| 82 type = str, | |
| 83 required = False, | |
| 84 help = 'output type analysis') | |
| 85 | |
| 86 parser.add_argument('-idop', '--output_path', | |
| 87 type = str, | |
| 88 default='flux_simulation', | |
| 89 help = 'output path for maps') | |
| 90 | |
| 91 ARGS = parser.parse_args(args) | |
| 92 return ARGS | |
| 93 | |
| 94 ########################### warning ########################################### | |
| 95 def warning(s :str) -> None: | |
| 96 """ | |
| 97 Log a warning message to an output log file and print it to the console. | |
| 98 | |
| 99 Args: | |
| 100 s (str): The warning message to be logged and printed. | |
| 101 | |
| 102 Returns: | |
| 103 None | |
| 104 """ | |
| 105 with open(ARGS.out_log, 'a') as log: | |
| 106 log.write(s + "\n\n") | |
| 107 print(s) | |
| 108 | |
| 109 | |
| 110 def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None: | |
| 111 dataset.index.name = 'Reactions' | |
| 112 dataset.to_csv(ARGS.output_path + "/" + name + ".csv", sep = '\t', index = keep_index) | |
| 113 | |
| 114 ############################ dataset input #################################### | |
| 115 def read_dataset(data :str, name :str) -> pd.DataFrame: | |
| 116 """ | |
| 117 Read a dataset from a CSV file and return it as a pandas DataFrame. | |
| 118 | |
| 119 Args: | |
| 120 data (str): Path to the CSV file containing the dataset. | |
| 121 name (str): Name of the dataset, used in error messages. | |
| 122 | |
| 123 Returns: | |
| 124 pandas.DataFrame: DataFrame containing the dataset. | |
| 125 | |
| 126 Raises: | |
| 127 pd.errors.EmptyDataError: If the CSV file is empty. | |
| 128 sys.exit: If the CSV file has the wrong format, the execution is aborted. | |
| 129 """ | |
| 130 try: | |
| 131 dataset = pd.read_csv(data, sep = '\t', header = 0, index_col=0, engine='python') | |
| 132 except pd.errors.EmptyDataError: | |
| 133 sys.exit('Execution aborted: wrong format of ' + name + '\n') | |
| 134 if len(dataset.columns) < 2: | |
| 135 sys.exit('Execution aborted: wrong format of ' + name + '\n') | |
| 136 return dataset | |
| 137 | |
| 138 | |
| 139 | |
| 140 def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None: | |
| 141 """ | |
| 142 Samples from the OPTGP (Optimal Global Perturbation) algorithm and saves the results to CSV files. | |
| 143 | |
| 144 Args: | |
| 145 model (cobra.Model): The COBRA model to sample from. | |
| 146 model_name (str): The name of the model, used in naming output files. | |
| 147 n_samples (int, optional): Number of samples per batch. Default is 1000. | |
| 148 thinning (int, optional): Thinning parameter for the sampler. Default is 100. | |
| 149 n_batches (int, optional): Number of batches to run. Default is 1. | |
| 150 seed (int, optional): Random seed for reproducibility. Default is 0. | |
| 151 | |
| 152 Returns: | |
| 153 None | |
| 154 """ | |
| 155 | |
| 156 for i in range(0, n_batches): | |
| 157 optgp = OptGPSampler(model, thinning, seed) | |
| 158 samples = optgp.sample(n_samples) | |
| 159 samples.to_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv', index=False) | |
| 160 seed+=1 | |
| 161 samplesTotal = pd.DataFrame() | |
| 162 for i in range(0, n_batches): | |
| 163 samples_batch = pd.read_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv') | |
| 164 samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) | |
| 165 | |
| 166 write_to_file(samplesTotal.T, model_name, True) | |
| 167 | |
| 168 for i in range(0, n_batches): | |
| 169 os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv') | |
| 170 pass | |
| 171 | |
| 172 | |
| 173 def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None: | |
| 174 """ | |
| 175 Samples using the CBS (Constraint-based Sampling) algorithm and saves the results to CSV files. | |
| 176 | |
| 177 Args: | |
| 178 model (cobra.Model): The COBRA model to sample from. | |
| 179 model_name (str): The name of the model, used in naming output files. | |
| 180 n_samples (int, optional): Number of samples per batch. Default is 1000. | |
| 181 n_batches (int, optional): Number of batches to run. Default is 1. | |
| 182 seed (int, optional): Random seed for reproducibility. Default is 0. | |
| 183 | |
| 184 Returns: | |
| 185 None | |
| 186 """ | |
| 187 | |
| 188 df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6) | |
| 189 | |
| 190 df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed) | |
| 191 | |
| 192 for i in range(0, n_batches): | |
| 193 samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples)) | |
| 194 try: | |
| 195 CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) | |
| 196 except Exception as e: | |
| 197 utils.logWarning( | |
| 198 "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e), | |
| 199 ARGS.out_log) | |
| 200 CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], | |
| 201 samples) | |
| 202 utils.logWarning(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log) | |
| 203 samples.to_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv', index=False) | |
| 204 | |
| 205 samplesTotal = pd.DataFrame() | |
| 206 for i in range(0, n_batches): | |
| 207 samples_batch = pd.read_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') | |
| 208 samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) | |
| 209 | |
| 210 write_to_file(samplesTotal.T, model_name, True) | |
| 211 | |
| 212 for i in range(0, n_batches): | |
| 213 os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') | |
| 214 pass | |
| 215 | |
| 216 | |
| 217 def model_sampler(model_input_original:cobra.Model, bounds_path:str, cell_name:str)-> List[pd.DataFrame]: | |
| 218 """ | |
| 219 Prepares the model with bounds from the dataset and performs sampling and analysis based on the selected algorithm. | |
| 220 | |
| 221 Args: | |
| 222 model_input_original (cobra.Model): The original COBRA model. | |
| 223 bounds_path (str): Path to the CSV file containing the bounds dataset. | |
| 224 cell_name (str): Name of the cell, used to generate filenames for output. | |
| 225 | |
| 226 Returns: | |
| 227 List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results. | |
| 228 """ | |
| 229 | |
| 230 model_input = model_input_original.copy() | |
| 231 bounds_df = read_dataset(bounds_path, "bounds dataset") | |
| 232 for rxn_index, row in bounds_df.iterrows(): | |
| 233 model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound | |
| 234 model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound | |
| 235 | |
| 236 | |
| 237 if ARGS.algorithm == 'OPTGP': | |
| 238 OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) | |
| 239 | |
| 240 elif ARGS.algorithm == 'CBS': | |
| 241 CBS_sampler(model_input, cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) | |
| 242 | |
| 243 df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types) | |
| 244 | |
| 245 if("fluxes" not in ARGS.output_types): | |
| 246 os.remove(ARGS.output_path + "/" + cell_name + '.csv') | |
| 247 | |
| 248 returnList = [] | |
| 249 returnList.append(df_mean) | |
| 250 returnList.append(df_median) | |
| 251 returnList.append(df_quantiles) | |
| 252 | |
| 253 df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis) | |
| 254 | |
| 255 if("pFBA" in ARGS.output_type_analysis): | |
| 256 returnList.append(df_pFBA) | |
| 257 if("FVA" in ARGS.output_type_analysis): | |
| 258 returnList.append(df_FVA) | |
| 259 if("sensitivity" in ARGS.output_type_analysis): | |
| 260 returnList.append(df_sensitivity) | |
| 261 | |
| 262 return returnList | |
| 263 | |
| 264 def fluxes_statistics(model_name: str, output_types:List)-> List[pd.DataFrame]: | |
| 265 """ | |
| 266 Computes statistics (mean, median, quantiles) for the fluxes. | |
| 267 | |
| 268 Args: | |
| 269 model_name (str): Name of the model, used in filename for input. | |
| 270 output_types (List[str]): Types of statistics to compute (mean, median, quantiles). | |
| 271 | |
| 272 Returns: | |
| 273 List[pd.DataFrame]: List of DataFrames containing mean, median, and quantiles statistics. | |
| 274 """ | |
| 275 | |
| 276 df_mean = pd.DataFrame() | |
| 277 df_median= pd.DataFrame() | |
| 278 df_quantiles= pd.DataFrame() | |
| 279 | |
| 280 df_samples = pd.read_csv(ARGS.output_path + "/" + model_name + '.csv', sep = '\t', index_col = 0).T | |
| 281 df_samples = df_samples.round(8) | |
| 282 | |
| 283 for output_type in output_types: | |
| 284 if(output_type == "mean"): | |
| 285 df_mean = df_samples.mean() | |
| 286 df_mean = df_mean.to_frame().T | |
| 287 df_mean = df_mean.reset_index(drop=True) | |
| 288 df_mean.index = [model_name] | |
| 289 elif(output_type == "median"): | |
| 290 df_median = df_samples.median() | |
| 291 df_median = df_median.to_frame().T | |
| 292 df_median = df_median.reset_index(drop=True) | |
| 293 df_median.index = [model_name] | |
| 294 elif(output_type == "quantiles"): | |
| 295 newRow = [] | |
| 296 cols = [] | |
| 297 for rxn in df_samples.columns: | |
| 298 quantiles = df_samples[rxn].quantile([0.25, 0.50, 0.75]) | |
| 299 newRow.append(quantiles[0.25]) | |
| 300 cols.append(rxn + "_q1") | |
| 301 newRow.append(quantiles[0.5]) | |
| 302 cols.append(rxn + "_q2") | |
| 303 newRow.append(quantiles[0.75]) | |
| 304 cols.append(rxn + "_q3") | |
| 305 df_quantiles = pd.DataFrame(columns=cols) | |
| 306 df_quantiles.loc[0] = newRow | |
| 307 df_quantiles = df_quantiles.reset_index(drop=True) | |
| 308 df_quantiles.index = [model_name] | |
| 309 | |
| 310 return df_mean, df_median, df_quantiles | |
| 311 | |
| 312 def fluxes_analysis(model:cobra.Model, model_name:str, output_types:List)-> List[pd.DataFrame]: | |
| 313 """ | |
| 314 Performs flux analysis including pFBA, FVA, and sensitivity analysis. | |
| 315 | |
| 316 Args: | |
| 317 model (cobra.Model): The COBRA model to analyze. | |
| 318 model_name (str): Name of the model, used in filenames for output. | |
| 319 output_types (List[str]): Types of analysis to perform (pFBA, FVA, sensitivity). | |
| 320 | |
| 321 Returns: | |
| 322 List[pd.DataFrame]: List of DataFrames containing pFBA, FVA, and sensitivity analysis results. | |
| 323 """ | |
| 324 | |
| 325 df_pFBA = pd.DataFrame() | |
| 326 df_FVA= pd.DataFrame() | |
| 327 df_sensitivity= pd.DataFrame() | |
| 328 | |
| 329 for output_type in output_types: | |
| 330 if(output_type == "pFBA"): | |
| 331 model.objective = "Biomass" | |
| 332 solution = cobra.flux_analysis.pfba(model) | |
| 333 fluxes = solution.fluxes | |
| 334 df_pFBA.loc[0,[rxn._id for rxn in model.reactions]] = fluxes.tolist() | |
| 335 df_pFBA = df_pFBA.reset_index(drop=True) | |
| 336 df_pFBA.index = [model_name] | |
| 337 df_pFBA = df_pFBA.astype(float).round(6) | |
| 338 elif(output_type == "FVA"): | |
| 339 fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8) | |
| 340 columns = [] | |
| 341 for rxn in fva.index.to_list(): | |
| 342 columns.append(rxn + "_min") | |
| 343 columns.append(rxn + "_max") | |
| 344 df_FVA= pd.DataFrame(columns = columns) | |
| 345 for index_rxn, row in fva.iterrows(): | |
| 346 df_FVA.loc[0, index_rxn+ "_min"] = fva.loc[index_rxn, "minimum"] | |
| 347 df_FVA.loc[0, index_rxn+ "_max"] = fva.loc[index_rxn, "maximum"] | |
| 348 df_FVA = df_FVA.reset_index(drop=True) | |
| 349 df_FVA.index = [model_name] | |
| 350 df_FVA = df_FVA.astype(float).round(6) | |
| 351 elif(output_type == "sensitivity"): | |
| 352 model.objective = "Biomass" | |
| 353 solution_original = model.optimize().objective_value | |
| 354 reactions = model.reactions | |
| 355 single = cobra.flux_analysis.single_reaction_deletion(model) | |
| 356 newRow = [] | |
| 357 df_sensitivity = pd.DataFrame(columns = [rxn.id for rxn in reactions], index = [model_name]) | |
| 358 for rxn in reactions: | |
| 359 newRow.append(single.knockout[rxn.id].growth.values[0]/solution_original) | |
| 360 df_sensitivity.loc[model_name] = newRow | |
| 361 df_sensitivity = df_sensitivity.astype(float).round(6) | |
| 362 return df_pFBA, df_FVA, df_sensitivity | |
| 363 | |
| 364 ############################# main ########################################### | |
| 365 def main(args :List[str] = None) -> None: | |
| 366 """ | |
| 367 Initializes everything and sets the program in motion based on the fronted input arguments. | |
| 368 | |
| 369 Returns: | |
| 370 None | |
| 371 """ | |
| 372 | |
| 373 num_processors = cpu_count() | |
| 374 | |
| 375 global ARGS | |
| 376 ARGS = process_args(args) | |
| 377 | |
| 378 if not os.path.exists(ARGS.output_path): | |
| 379 os.makedirs(ARGS.output_path) | |
| 380 | |
| 381 #model_type :utils.Model = ARGS.model_selector | |
| 382 #if model_type is utils.Model.Custom: | |
| 383 # model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext) | |
| 384 #else: | |
| 385 # model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir) | |
| 386 | |
| 387 model = utils.build_cobra_model_from_csv(ARGS.model_upload) | |
| 388 | |
| 389 validation = utils.validate_model(model) | |
| 390 | |
| 391 print("\n=== VALIDAZIONE MODELLO ===") | |
| 392 for key, value in validation.items(): | |
| 393 print(f"{key}: {value}") | |
| 394 | |
| 395 #Set solver verbosity to 1 to see warning and error messages only. | |
| 396 model.solver.configuration.verbosity = 1 | |
| 397 | |
| 398 ARGS.bounds = ARGS.input.split(",") | |
| 399 ARGS.bounds_name = ARGS.names.split(",") | |
| 400 ARGS.output_types = ARGS.output_type.split(",") | |
| 401 ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") | |
| 402 | |
| 403 | |
| 404 results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model, bounds_path, cell_name) for bounds_path, cell_name in zip(ARGS.bounds, ARGS.bounds_name)) | |
| 405 | |
| 406 all_mean = pd.concat([result[0] for result in results], ignore_index=False) | |
| 407 all_median = pd.concat([result[1] for result in results], ignore_index=False) | |
| 408 all_quantiles = pd.concat([result[2] for result in results], ignore_index=False) | |
| 409 | |
| 410 if("mean" in ARGS.output_types): | |
| 411 all_mean = all_mean.fillna(0.0) | |
| 412 all_mean = all_mean.sort_index() | |
| 413 write_to_file(all_mean.T, "mean", True) | |
| 414 | |
| 415 if("median" in ARGS.output_types): | |
| 416 all_median = all_median.fillna(0.0) | |
| 417 all_median = all_median.sort_index() | |
| 418 write_to_file(all_median.T, "median", True) | |
| 419 | |
| 420 if("quantiles" in ARGS.output_types): | |
| 421 all_quantiles = all_quantiles.fillna(0.0) | |
| 422 all_quantiles = all_quantiles.sort_index() | |
| 423 write_to_file(all_quantiles.T, "quantiles", True) | |
| 424 | |
| 425 index_result = 3 | |
| 426 if("pFBA" in ARGS.output_type_analysis): | |
| 427 all_pFBA = pd.concat([result[index_result] for result in results], ignore_index=False) | |
| 428 all_pFBA = all_pFBA.sort_index() | |
| 429 write_to_file(all_pFBA.T, "pFBA", True) | |
| 430 index_result+=1 | |
| 431 if("FVA" in ARGS.output_type_analysis): | |
| 432 all_FVA= pd.concat([result[index_result] for result in results], ignore_index=False) | |
| 433 all_FVA = all_FVA.sort_index() | |
| 434 write_to_file(all_FVA.T, "FVA", True) | |
| 435 index_result+=1 | |
| 436 if("sensitivity" in ARGS.output_type_analysis): | |
| 437 all_sensitivity = pd.concat([result[index_result] for result in results], ignore_index=False) | |
| 438 all_sensitivity = all_sensitivity.sort_index() | |
| 439 write_to_file(all_sensitivity.T, "sensitivity", True) | |
| 440 | |
| 441 pass | |
| 442 | |
| 443 ############################################################################## | |
| 444 if __name__ == "__main__": | |
| 445 main() |
