marea_2_0/flux_sampling.py @ 174:ea089c02b73e (draft), commit message: "Uploaded"
author | luca_milaz |
---|---|
date | Wed, 03 Jul 2024 18:48:53 +0000 |
parents | 82a1a4cfe1e0 |
children | d58974850ed0 |
import argparse
import utils.general_utils as utils
import utils.rule_parsing as rulesUtils
from typing import Optional, Tuple, Union, Dict, List
import utils.reaction_parsing as reactionUtils
import os
import numpy as np
import pandas as pd
import cobra
import CBS_backend
from joblib import Parallel, delayed, cpu_count
from cobra.sampling import OptGPSampler
#import utils.flux_analysis as flux_analysis
import sys

################################# process args ###############################
def process_args(args: List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
                                     description = 'sample fluxes from constraint-based metabolic models')

    parser.add_argument('-ol', '--out_log',
                        help = "Output log")

    parser.add_argument('-td', '--tool_dir',
                        type = str,
                        required = True,
                        help = 'your tool directory')

    parser.add_argument('-of', '--output_format',
                        type = utils.FileFormat.fromExt,
                        choices = [utils.FileFormat.CSV, utils.FileFormat.PICKLE],
                        required = True,
                        help = "Extension of all output files")

    parser.add_argument('-in', '--input',
                        required = True,
                        type = str,
                        help = 'input models (comma-separated)')

    parser.add_argument('-nm', '--name',
                        required = True,
                        type = str,
                        help = 'input model names (comma-separated)')

    parser.add_argument('-a', '--algorithm',
                        type = str,
                        choices = ['OPTGP', 'CBS'],
                        required = True,
                        help = 'choose sampling algorithm')

    parser.add_argument('-th', '--thinning',
                        type = int,
                        default = 100,
                        required = False,
                        help = 'choose thinning')

    parser.add_argument('-ns', '--n_samples',
                        type = int,
                        required = True,
                        help = 'choose how many samples')

    parser.add_argument('-sd', '--seed',
                        type = int,
                        required = True,
                        help = 'seed')

    parser.add_argument('-nb', '--n_batches',
                        type = int,
                        required = True,
                        help = 'choose how many batches')

    ARGS = parser.parse_args()
    return ARGS

########################### warning ###########################################
def warning(s: str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.

    Returns:
        None
    """
    with open(ARGS.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)


def write_to_file(dataset: pd.DataFrame, name: str) -> None:
    """
    Write a samples DataFrame to ARGS.output_folder in the requested format
    (pickle or tab-separated CSV).
    """
    if ARGS.output_format is utils.FileFormat.PICKLE:
        save_path = utils.FilePath(name, ARGS.output_format, prefix = ARGS.output_folder)
        utils.writePickle(save_path, dataset)
    elif ARGS.output_format is utils.FileFormat.CSV:
        dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = False)

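# Illustrative sketch, not called by the tool: cobrapy also ships a high-level
# helper, cobra.sampling.sample, which covers the single-batch OptGP case handled
# by OPTGP_sampler below. The function name and defaults here are examples only;
# the tool's batching, per-batch seeding and file handling are not replicated.
def _optgp_single_batch_example(model: cobra.Model, n_samples: int = 1000,
                                thinning: int = 100, seed: int = 0) -> pd.DataFrame:
    from cobra.sampling import sample
    # Returns a DataFrame with one row per sample and one column per reaction.
    return sample(model, n_samples, method = "optgp", thinning = thinning, seed = seed)
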
Returns: None """ with open(ARGS.out_log, 'a') as log: log.write(s + "\n\n") print(s) def write_to_file(dataset: pd.DataFrame, name: str)->None: if ARGS.output_format is utils.FileFormat.PICKLE: save_path = utils.FilePath(name, ARGS.output_format, prefix = ARGS.output_folder) utils.writePickle(save_path, dataset) elif ARGS.output_format is utils.FileFormat.CSV: dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = False) def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None: if not os.path.exists(ARGS.output_folder + "OPTGP/"): os.makedirs(ARGS.output_folder + "OPTGP/") for i in range(0, n_batches): optgp = OptGPSampler(model, thinning, seed) samples = optgp.sample(n_samples) samples.to_csv(ARGS.output_folder + "OPTGP/" + ARGS.model_name + '_'+ str(i)+'.csv') i+=1 seed+=1 samplesTotal = pd.DataFrame() for i in range(0, n_batches): samples_batch = pd.read_csv(ARGS.output_folder + "OPTGP/" + ARGS.model_name + '_'+ str(i)+'.csv') samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) write_to_file(samplesTotal, ARGS.output_folder + "OPTGP/" + ARGS.model_name) for i in range(0, n_batches): os.remove(ARGS.output_folder + "OPTGP/" + ARGS.model_name + '_'+ str(i)+'.csv') pass def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None: df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6) df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed) for i in range(0, n_batches): samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples)) try: CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) except Exception as e: utils.logWarning( "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface.", ARGS.out_log) CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples) samples.to_csv(ARGS.output_folder + model_name + '_'+ str(i)+'.csv') samplesTotal = pd.DataFrame() for i in range(0, n_batches): samples_batch = pd.read_csv(ARGS.output_folder + model_name + '_'+ str(i)+'.csv') samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True) write_to_file(samplesTotal, model_name) for i in range(0, n_batches): os.remove(ARGS.output_folder + model_name + '_'+ str(i)+'.csv') pass def model_sampler(model_input:str, model_name:str)->None: model = load_custom_model( utils.FilePath.fromStrPath(model_input), utils.FilePath.fromStrPath(model_name).ext) utils.logWarning( "Sampling model: " + model_name, ARGS.out_log) name = model_name.split('.')[0] if ARGS.algorithm == 'OPTGP': OPTGP_sampler(model, name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed) elif ARGS.algorithm == 'CBS': CBS_sampler(model, name, ARGS.n_samples, ARGS.n_batches, ARGS.seed) pass ################################- INPUT DATA LOADING -################################ def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: """ Loads a custom model from a file, either in JSON or XML format. Args: file_path : The path to the file containing the custom model. ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour. 
################################- INPUT DATA LOADING -################################
def load_custom_model(file_path: utils.FilePath, ext: Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file, either in JSON or XML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file extension. Necessary for standard use in Galaxy because of its weird behaviour.

    Raises:
        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.

    Returns:
        cobra.Model : the model, if successfully opened.
    """
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())

        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

    except Exception as e:
        raise utils.DataErr(file_path, e.__str__())

    raise utils.DataErr(file_path,
                        f"Format \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")

############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    Returns:
        None
    """
    if not os.path.exists('flux_sampling'):
        os.makedirs('flux_sampling')

    num_processors = cpu_count()

    global ARGS
    ARGS = process_args(sys.argv)

    ARGS.output_folder = 'flux_sampling/'

    utils.logWarning(
        ARGS.input,
        ARGS.out_log)

    utils.logWarning(
        ARGS.name,
        ARGS.out_log)

    models_input = ARGS.input.split(",")
    models_name = ARGS.name.split(",")

    #Parallel(n_jobs=num_processors)(delayed(model_sampler)(model_input, model_name) for model_input, model_name in zip(models_input, models_name))

    model_sampler(models_input[0], models_name[0])

##############################################################################
if __name__ == "__main__":
    main()
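# Example invocation, for reference only (the paths, model file and the exact
# token accepted by -of depend on the local setup and on utils.FileFormat.fromExt):
#   python flux_sampling.py -td /path/to/tool_dir -ol sampling.log \
#       -in model.xml -nm model.xml -a CBS -ns 1000 -nb 2 -sd 0 -of csv
# Several models can be sampled in one run by passing comma-separated lists to -in and -nm.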