marea_2_0/flux_sampling.py @ 146:a7df92bbb04f (draft)
Uploaded
author:   luca_milaz
date:     Tue, 02 Jul 2024 22:48:45 +0000
parents:  c2f20844b3e3
children: 54e018e82447
import argparse
import utils.general_utils as utils
import utils.rule_parsing as rulesUtils
from typing import Optional, Tuple, Union, Dict, List
import utils.reaction_parsing as reactionUtils
import os
import numpy as np
import pandas as pd
import cobra
from cobra.sampling import OptGPSampler
#import CBS_backend  # required by CBS_sampler, left disabled in this draft
#import utils.flux_analysis as flux_analysis
import sys

################################# process args ###############################
def process_args(args :List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
                                     description = 'process some values')
    parser.add_argument('-ol', '--out_log',
                        help = "Output log")
    parser.add_argument('-td', '--tool_dir',
                        type = str,
                        required = True,
                        help = 'your tool directory')
    parser.add_argument("-of", "--output_format",
                        type = utils.FileFormat.fromExt,
                        default = utils.FileFormat.PICKLE,
                        choices = [utils.FileFormat.CSV, utils.FileFormat.PICKLE],
                        required = True,
                        help = "Extension of all output files")
    parser.add_argument('-in', '--inputs',
                        required = True,
                        type = str,
                        help = 'input models')
    parser.add_argument('-ni', '--names',   # '-ni' avoids clashing with the '-in' flag of --inputs
                        required = True,
                        type = str,
                        help = 'input model ids')
    parser.add_argument('-a', '--algorithm',
                        type = str,
                        choices = ['OPTGP', 'CBS'],
                        default = 'OPTGP',
                        help = 'choose sampling algorithm')
    parser.add_argument('-th', '--thinning',
                        type = int,
                        default = 100,
                        help = 'choose thinning')
    parser.add_argument('-ns', '--n_samples',
                        type = int,
                        default = 1000,
                        help = 'choose how many samples')
    parser.add_argument('-sd', '--seed',
                        type = int,
                        default = 0,
                        help = 'seed')
    parser.add_argument('-nb', '--n_batches',
                        type = int,
                        default = 1,
                        help = 'choose how many batches')
    ARGS = parser.parse_args()
    return ARGS
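# Illustrative invocation only (not from the original file): the file names, the
# log path and the value passed to -of are placeholders; the exact token accepted
# for the output format depends on utils.FileFormat.fromExt.
#
#   python flux_sampling.py -td /path/to/tool_dir -ol sampling.log \
#       -in model.xml -ni my_model -of csv -a OPTGP -ns 1000 -th 100 -nb 1 -sd 0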
########################### warning ###########################################
def warning(s :str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.

    Returns:
        None
    """
    with open(ARGS.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)


def write_to_file(dataset: pd.DataFrame, dest: str) -> None:
    """Write a samples DataFrame to `dest` in the output format chosen on the command line."""
    if ARGS.output_format is utils.FileFormat.PICKLE:
        utils.writePickle(dest, dataset)
    elif ARGS.output_format is utils.FileFormat.CSV:
        dataset.to_csv(dest, sep = '\t', index = False)


def OPTGP_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000, thinning: int = 100,
                  n_batches: int = 1, seed: int = 0) -> None:
    """Sample fluxes with cobra's OptGP sampler, writing one CSV per batch and then merging the batches."""
    if not os.path.exists(ARGS.output_folder + "OPTGP/"):
        os.makedirs(ARGS.output_folder + "OPTGP/")

    for i in range(0, n_batches):
        optgp = OptGPSampler(model, thinning = thinning, seed = seed)
        samples = optgp.sample(n_samples)
        samples.to_csv(ARGS.output_folder + "OPTGP/" + model_name + '_' + str(i) + '.csv')
        seed += 1   # different seed for every batch

    samplesTotal = pd.DataFrame()
    for i in range(0, n_batches):
        samples_batch = pd.read_csv(ARGS.output_folder + "OPTGP/" + model_name + '_' + str(i) + '.csv')
        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)

    write_to_file(samplesTotal, ARGS.output_folder + "OPTGP/" + model_name)

    for i in range(0, n_batches):
        os.remove(ARGS.output_folder + "OPTGP/" + model_name + '_' + str(i) + '.csv')


def CBS_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000, n_batches: int = 1, seed: int = 0) -> None:
    """Sample fluxes by optimizing random objective functions built from an FVA run (requires CBS_backend)."""
    if not os.path.exists(ARGS.output_folder + "CBS/"):
        os.makedirs(ARGS.output_folder + "CBS/")

    df_FVA = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum = 0).round(6)

    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples * n_batches, df_FVA, seed = seed)

    for i in range(0, n_batches):
        samples = pd.DataFrame(columns = [reaction.id for reaction in model.reactions], index = range(n_samples))
        try:
            CBS_backend.randomObjectiveFunctionSampling(model, n_samples,
                df_coefficients.iloc[:, i * n_samples:(i + 1) * n_samples], samples)
        except Exception as e:
            # solver has failed, try sampling with cobrapy
            utils.logWarning(
                "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface.",
                ARGS.out_log)
            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples,
                df_coefficients.iloc[:, i * n_samples:(i + 1) * n_samples], samples)
        samples.to_csv(ARGS.output_folder + "CBS/" + model_name + '_' + str(i) + '.csv')

    samplesTotal = pd.DataFrame()
    for i in range(0, n_batches):
        samples_batch = pd.read_csv(ARGS.output_folder + "CBS/" + model_name + '_' + str(i) + '.csv')
        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)

    write_to_file(samplesTotal, ARGS.output_folder + "CBS/" + model_name)

    for i in range(0, n_batches):
        os.remove(ARGS.output_folder + "CBS/" + model_name + '_' + str(i) + '.csv')
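# --- illustrative sketch, not part of the original tool ----------------------
# Minimal example of the cobra API that OPTGP_sampler wraps: build one
# OptGPSampler and draw a single batch of samples. The helper name below is
# local to this example and is never referenced by the rest of the script.
def _optgp_single_batch_example(model: cobra.Model, n_samples: int = 100) -> pd.DataFrame:
    sampler = OptGPSampler(model, thinning = 100, seed = 0)   # thinning/seed mirror the tool's defaults
    return sampler.sample(n_samples)                          # DataFrame with one column per reaction id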
""" ext = ext if ext else file_path.ext try: if ext is utils.FileFormat.XML: return cobra.io.read_sbml_model(file_path.show()) if ext is utils.FileFormat.JSON: return cobra.io.load_json_model(file_path.show()) except Exception as e: raise utils.DataErr(file_path, e.__str__()) raise utils.DataErr(file_path, f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.") ############################# main ########################################### def main() -> None: """ Initializes everything and sets the program in motion based on the fronted input arguments. Returns: None """ if not os.path.exists('flux_sampling'): os.makedirs('flux_sampling') global ARGS ARGS = process_args(sys.argv) ARGS.output_folder = 'flux_sampling' utils.logWarning( utils.FilePath.fromStrPath(ARGS.inputs), ARGS.out_log) utils.logWarning( utils.FilePath.fromStrPath(ARGS.names), ARGS.out_log) # load custom model #model1 = load_custom_model(utils.FilePath.fromStrPath(ARGS.inputs), "xml") #model = load_custom_model( #utils.FilePath.fromStrPath(ARGS.inputs), utils.FilePath.fromStrPath(ARGS.name).ext) a = pd.DataFrame(columns=["a"]) a.loc[0] = ["sss"] a.to_csv(ARGS.output_folder+"/test1.csv") a = pd.DataFrame(columns=["a"]) a.loc[0] = ["sss"] a.to_csv(ARGS.output_folder+"/test2.csv") ''' if ARGS.sampling_algorithm == 'OPTGP': OPTGP_sampler(model, ARGS.model_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed, ARGS.out_dir) elif ARGS.sampling_algorithm == 'CBS': CBS_sampler(model, ARGS.model_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed, ARGS.out_dir) else: raise utils.ValueErr(ARGS.sampling_algorithm, f"Algorithm \"{ARGS.sampling_algorithm}\" is not recognized, only OPTGP and CBS are supported.")''' ############################################################################## if __name__ == "__main__": main()