Mercurial > repos > bimib > marea_2_0
changeset 134:54f816f17f56 draft
Uploaded
author | luca_milaz |
---|---|
date | Tue, 02 Jul 2024 22:00:47 +0000 |
parents | 17f3c6ab2425 |
children | 47b8ed94b70c |
files | marea_2_0/utils/flux_sampling.py |
diffstat | 1 files changed, 231 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
"""Flux sampling utilities (marea_2_0/utils/flux_sampling.py).

Provides command-line parsing, two sampling back-ends (OPTGP through cobrapy
and CBS through the project's ``CBS_backend`` module), model loading helpers
and the script entry point.
"""
import argparse
import utils.general_utils as utils
import utils.rule_parsing as rulesUtils
from typing import Optional, Tuple, Union, Dict, List
import utils.reaction_parsing as reactionUtils
import os
import numpy as np
import pandas as pd
import cobra
from cobra.sampling import OptGPSampler
#import CBS_backend
#import utils.flux_analysis as flux_analysis
import sys

# Parsed command-line arguments; bound by main() before any helper runs.
ARGS: argparse.Namespace


################################# process args ###############################
def process_args(args: List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments. NOTE: this parameter is
            not consulted; ``parse_args()`` is called without arguments and
            therefore reads ``sys.argv[1:]`` itself.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage='%(prog)s [options]',
        description='process some value\'s')

    parser.add_argument('-ol', '--out_log',
                        help="Output log")

    parser.add_argument('-td', '--tool_dir',
                        type=str,
                        required=True,
                        help='your tool directory')

    parser.add_argument(
        "-of", "--output_format",
        type=utils.FileFormat.fromExt, default=utils.FileFormat.PICKLE,
        choices=[utils.FileFormat.CSV, utils.FileFormat.PICKLE],
        required=True, help="Extension of all output files")

    parser.add_argument('-in', '--inputs',
                        required=True,
                        help='inputs model')

    parser.add_argument('-a', '--algorithm',
                        type=str,
                        choices=['OPTGP', 'CBS'],
                        default='OPTGP',
                        help='choose sampling algorithm')

    parser.add_argument('-th', '--thinning',
                        type=int,
                        default=100,
                        help='choose thinning')

    parser.add_argument('-ns', '--n_samples',
                        type=int,
                        default=1000,
                        help='choose how many samples')

    parser.add_argument('-sd', '--seed',
                        type=int,
                        default=0,
                        help='seed')

    parser.add_argument('-nb', '--n_batches',
                        type=int,
                        default=1,
                        help='choose how many batches')

    ARGS = parser.parse_args()
    return ARGS


########################### warning ###########################################
def warning(s: str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    The log file is only written when ``--out_log`` was supplied; the option
    is not required, so ``ARGS.out_log`` may be ``None``.

    Args:
        s (str): The warning message to be logged and printed.

    Returns:
        None
    """
    if ARGS.out_log:
        with open(ARGS.out_log, 'a') as log:
            log.write(s + "\n\n")
    print(s)


def write_to_file(dataset: pd.DataFrame, dest: str) -> None:
    """
    Write a dataset to ``dest`` in the format selected by ``--output_format``.

    Args:
        dataset: The data to write.
        dest: Destination path.

    Returns:
        None
    """
    if ARGS.output_format is utils.FileFormat.PICKLE:
        utils.writePickle(dest, dataset)
    elif ARGS.output_format is utils.FileFormat.CSV:
        dataset.to_csv(dest, sep='\t', index=False)


def _batch_path(out_dir: str, model_name: str, batch: int) -> str:
    """Return the path of the temporary CSV holding one sampling batch."""
    return os.path.join(out_dir, f"{model_name}_{batch}.csv")


def _merge_batches(out_dir: str, model_name: str, n_batches: int) -> None:
    """Concatenate the per-batch CSVs, write the merged result, delete the batches."""
    batch_files = [_batch_path(out_dir, model_name, i) for i in range(n_batches)]
    if batch_files:
        # A single concat avoids re-copying the accumulated frame per batch.
        merged = pd.concat(
            [pd.read_csv(path) for path in batch_files], ignore_index=True)
    else:
        merged = pd.DataFrame()

    write_to_file(merged, os.path.join(out_dir, model_name))

    for path in batch_files:
        os.remove(path)


def OPTGP_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000,
                  thinning: int = 100, n_batches: int = 1, seed: int = 0) -> None:
    """
    Sample the model with cobrapy's OptGPSampler and write the merged samples.

    Each batch is sampled with its own seed (``seed``, ``seed + 1``, ...),
    stored as a temporary CSV under ``<output_folder>/OPTGP`` and finally
    merged into one file named after the model.

    Args:
        model: The model to sample.
        model_name: Name used for the output files.
        n_samples: Number of samples per batch.
        thinning: Thinning factor handed to the sampler.
        n_batches: Number of batches to generate.
        seed: Seed of the first batch.

    Returns:
        None
    """
    out_dir = os.path.join(ARGS.output_folder, "OPTGP")
    os.makedirs(out_dir, exist_ok=True)

    for i in range(n_batches):
        # Keywords are required: OptGPSampler's third positional parameter is
        # not the seed, so a positional seed would never reach the generator.
        optgp = OptGPSampler(model, thinning=thinning, seed=seed + i)
        samples = optgp.sample(n_samples)
        # index=False keeps the row index from coming back as a data column.
        samples.to_csv(_batch_path(out_dir, model_name, i), index=False)

    _merge_batches(out_dir, model_name, n_batches)


def CBS_sampler(model: cobra.Model, n_samples: int = 1000, n_batches: int = 1,
                seed: int = 0, model_name: Optional[str] = None) -> None:
    """
    Sample the model with the CBS back-end and write the merged samples.

    Random objective coefficients are generated once for all batches; each
    batch then uses its own slice of the coefficient columns. If the primary
    sampling routine raises, the cobrapy-based routine is used instead.

    Args:
        model: The model to sample.
        n_samples: Number of samples per batch.
        n_batches: Number of batches to generate.
        seed: Seed handed to the coefficient generator.
        model_name: Name used for the output files; falls back to
            ``ARGS.model_name`` when omitted.

    Returns:
        None
    """
    # NOTE(review): the module-level import of CBS_backend is commented out in
    # this revision; confirm the module path before relying on this sampler.
    import CBS_backend

    if model_name is None:
        model_name = ARGS.model_name

    out_dir = os.path.join(ARGS.output_folder, "CBS")
    os.makedirs(out_dir, exist_ok=True)

    df_FVA = cobra.flux_analysis.flux_variability_analysis(
        model, fraction_of_optimum=0).round(6)

    df_coefficients = CBS_backend.randomObjectiveFunction(
        model, n_samples * n_batches, df_FVA, seed=seed)

    reaction_ids = [reaction.id for reaction in model.reactions]
    for i in range(n_batches):
        samples = pd.DataFrame(columns=reaction_ids, index=range(n_samples))
        batch_coefficients = df_coefficients.iloc[:, i * n_samples:(i + 1) * n_samples]
        try:
            CBS_backend.randomObjectiveFunctionSampling(
                model, n_samples, batch_coefficients, samples)
        except Exception:
            # solver has failed, try sampling with cobrapy
            utils.logWarning(
                "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface.",
                ARGS.out_log)
            CBS_backend.randomObjectiveFunctionSampling_cobrapy(
                model, n_samples, batch_coefficients, samples)
        samples.to_csv(_batch_path(out_dir, model_name, i), index=False)

    _merge_batches(out_dir, model_name, n_batches)


################################- INPUT DATA LOADING -################################
def load_custom_model(file_path: utils.FilePath, ext: Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file, either in JSON or XML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.

    Raises:
        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.

    Returns:
        cobra.Model : the model, if successfully opened.
    """
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())

        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

    except Exception as e:
        raise utils.DataErr(file_path, str(e)) from e

    # Report the extension that was actually checked, which may be the
    # explicit override rather than the one carried by the path.
    raise utils.DataErr(
        file_path,
        f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")


############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    In this revision the function only creates the output folder, parses the
    arguments, logs the received inputs and writes two placeholder CSV files.

    Returns:
        None
    """
    global ARGS

    os.makedirs('flux_sampling', exist_ok=True)

    ARGS = process_args(sys.argv)
    ARGS.output_folder = 'flux_sampling'

    utils.logWarning(
        ARGS.inputs,
        ARGS.out_log)

    # TODO(review): model loading and the OPTGP/CBS dispatch are disabled in
    # this revision. The disabled code read ARGS.model_name,
    # ARGS.sampling_algorithm and ARGS.out_dir, none of which process_args
    # defines (the algorithm option is stored as ARGS.algorithm).

    # Placeholder outputs.
    for file_name in ("test1.csv", "test2.csv"):
        placeholder = pd.DataFrame({"a": ["sss"]})
        placeholder.to_csv(os.path.join(ARGS.output_folder, file_name))


##############################################################################
if __name__ == "__main__":
    main()