Mercurial > repos > bimib > marea_2_0
view marea_2_0/utils/flux_sampling.py @ 229:8ea5adb25828 draft
Uploaded
author | luca_milaz |
---|---|
date | Mon, 08 Jul 2024 11:59:56 +0000 |
parents | 35bc34faa7a4 |
children |
line wrap: on
line source
import argparse import utils.general_utils as utils from typing import Optional, List import os import numpy as np import pandas as pd import cobra import utils.CBS_backend as CBS_backend from joblib import Parallel, delayed, cpu_count from cobra.sampling import OptGPSampler import sys ################################# process args ############################### def process_args(args :List[str]) -> argparse.Namespace: """ Processes command-line arguments. Args: args (list): List of command-line arguments. Returns: Namespace: An object containing parsed arguments. """ parser = argparse.ArgumentParser(usage = '%(prog)s [options]', description = 'process some value\'s') parser.add_argument('-ol', '--out_log', help = "Output log") parser.add_argument('-td', '--tool_dir', type = str, required = True, help = 'your tool directory') parser.add_argument('-in', '--input', required = True, type=str, help = 'inputs model') parser.add_argument('-nm', '--name', required = True, type=str, help = 'inputs model ids') parser.add_argument('-a', '--algorithm', type = str, choices = ['OPTGP', 'CBS'], required = True, help = 'choose sampling algorithm') parser.add_argument('-th', '--thinning', type = int, default= 100, required=False, help = 'choose thinning') parser.add_argument('-ns', '--n_samples', type = int, required = True, help = 'choose how many samples') parser.add_argument('-sd', '--seed', type = int, required = True, help = 'seed') parser.add_argument('-nb', '--n_batches', type = int, required = True, help = 'choose how many batches') parser.add_argument('-ot', '--output_type', type = str, required = True, help = 'output type') ARGS = parser.parse_args() return ARGS ########################### warning ########################################### def warning(s :str) -> None: """ Log a warning message to an output log file and print it to the console. Args: s (str): The warning message to be logged and printed. Returns: None """ with open(ARGS.out_log, 'a') as log: log.write(s + "\n\n") print(s) def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None: dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = keep_index) def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None: for i in range(0, n_batches): optgp = OptGPSampler(model, thinning, seed) samples = optgp.sample(n_samples) samples.to_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_OPTGP.csv', index=False) seed+=1 samplesTotal = pd.DataFrame() for i in range(0, n_batches): samples_batch = pd.read_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_OPTGP.csv') samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) write_to_file(samplesTotal, model_name) for i in range(0, n_batches): os.remove(ARGS.output_folder + model_name + '_'+ str(i)+'_OPTGP.csv') pass def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None: df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6) df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed) for i in range(0, n_batches): samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples)) try: CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) except Exception as e: utils.logWarning( "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e), ARGS.out_log) CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) samples.to_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_CBS.csv', index=False) samplesTotal = pd.DataFrame() for i in range(0, n_batches): samples_batch = pd.read_csv(ARGS.output_folder + model_name + '_'+ str(i)+'_CBS.csv') samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) write_to_file(samplesTotal, model_name) for i in range(0, n_batches): os.remove(ARGS.output_folder + model_name + '_'+ str(i)+'_CBS.csv') pass def model_sampler(model_input:str, model_name:str)-> List[pd.DataFrame]: model = load_custom_model( utils.FilePath.fromStrPath(model_input), utils.FilePath.fromStrPath(model_name).ext) utils.logWarning( "Sampling model: " + model_name, ARGS.out_log) name = model_name.split('.')[0] if ARGS.algorithm == 'OPTGP': OPTGP_sampler(model, name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) elif ARGS.algorithm == 'CBS': CBS_sampler(model, name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) df_mean, df_median, df_quantiles = fluxes_statistics(name, ARGS.output_types) if("fluxes" not in ARGS.output_types): os.remove(ARGS.output_folder + name + '.csv') return df_mean, df_median, df_quantiles def fluxes_statistics(model_name: str, output_types:List)-> List[pd.DataFrame]: df_mean = pd.DataFrame() df_median= pd.DataFrame() df_quantiles= pd.DataFrame() df_samples = pd.read_csv(ARGS.output_folder + model_name + '.csv', sep = '\t') for output_type in output_types: if(output_type == "mean"): df_mean = df_samples.mean() df_mean = df_mean.to_frame().T df_mean = df_mean.reset_index(drop=True) df_mean.index = [model_name] elif(output_type == "median"): df_median = df_samples.median() df_median = df_median.to_frame().T df_median = df_median.reset_index(drop=True) df_median.index = [model_name] elif(output_type == "quantiles"): df_quantile = df_samples.quantile([0.25, 0.5, 0.75]) newRow = [] cols = [] for rxn in df_quantile.columns: newRow.append(df_quantile[rxn].loc[0.25]) cols.append(rxn + "_q1") newRow.append(df_quantile[rxn].loc[0.5]) cols.append(rxn + "_q2") newRow.append(df_quantile[rxn].loc[0.75]) cols.append(rxn + "_q3") df_quantiles = pd.DataFrame(columns=cols) df_quantiles.loc[0] = newRow df_quantiles = df_quantiles.reset_index(drop=True) df_quantiles.index = [model_name] return df_mean, df_median, df_quantiles ################################- INPUT DATA LOADING -################################ def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: """ Loads a custom model from a file, either in JSON or XML format. Args: file_path : The path to the file containing the custom model. ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. Raises: DataErr : if the file is in an invalid format or cannot be opened for whatever reason. Returns: cobra.Model : the model, if successfully opened. """ ext = ext if ext else file_path.ext try: if ext is utils.FileFormat.XML: return cobra.io.read_sbml_model(file_path.show()) if ext is utils.FileFormat.JSON: return cobra.io.load_json_model(file_path.show()) except Exception as e: raise utils.DataErr(file_path, e.__str__()) raise utils.DataErr(file_path, f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.") ############################# main ########################################### def main() -> None: """ Initializes everything and sets the program in motion based on the fronted input arguments. Returns: None """ if not os.path.exists('flux_sampling'): os.makedirs('flux_sampling') num_processors = cpu_count() global ARGS ARGS = process_args(sys.argv) ARGS.output_folder = 'flux_sampling/' utils.logWarning( ARGS.output_type, ARGS.out_log) models_input = ARGS.input.split(",") models_name = ARGS.name.split(",") ARGS.output_types = ARGS.output_type.split(",") results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model_input, model_name) for model_input, model_name in zip(models_input, models_name)) all_mean = pd.concat([result[0] for result in results], ignore_index=False) all_median = pd.concat([result[1] for result in results], ignore_index=False) all_quantiles = pd.concat([result[2] for result in results], ignore_index=False) if("mean" in ARGS.output_types): all_mean = all_mean.fillna(0.0) all_mean = all_mean.sort_index() write_to_file(all_mean, "mean", True) if("median" in ARGS.output_types): all_median = all_median.fillna(0.0) all_median = all_median.sort_index() write_to_file(all_median, "median", True) if("quantiles" in ARGS.output_types): all_quantiles = all_quantiles.fillna(0.0) all_quantiles = all_quantiles.sort_index() write_to_file(all_quantiles, "quantiles", True) pass ############################################################################## if __name__ == "__main__": main()