view marea_2_0/utils/flux_sampling.py @ 229:8ea5adb25828 draft

Uploaded
author luca_milaz
date Mon, 08 Jul 2024 11:59:56 +0000
parents 35bc34faa7a4
children
line wrap: on
line source

import argparse
import utils.general_utils as utils
from typing import Optional, List
import os
import numpy as np
import pandas as pd
import cobra
import utils.CBS_backend as CBS_backend
from joblib import Parallel, delayed, cpu_count
from cobra.sampling import OptGPSampler
import sys

################################# process args ###############################
def process_args(args :List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
                                     description = 'process some value\'s')

    parser.add_argument('-ol', '--out_log', 
                        help = "Output log")
    
    parser.add_argument('-td', '--tool_dir',
                        type = str,
                        required = True,
                        help = 'your tool directory')
    
    
    parser.add_argument('-in', '--input',
                        required = True,
                        type=str,
                        help = 'inputs model')
    
    parser.add_argument('-nm', '--name',
                        required = True,
                        type=str,
                        help = 'inputs model ids')
    
    parser.add_argument('-a', '--algorithm',
                        type = str,
                        choices = ['OPTGP', 'CBS'],
                        required = True,
                        help = 'choose sampling algorithm')
    
    parser.add_argument('-th', '--thinning', 
                        type = int,
                        default= 100,
                        required=False,
                        help = 'choose thinning')
    
    parser.add_argument('-ns', '--n_samples', 
                        type = int,
                        required = True,
                        help = 'choose how many samples')
    
    parser.add_argument('-sd', '--seed', 
                        type = int,
                        required = True,
                        help = 'seed')
    
    parser.add_argument('-nb', '--n_batches', 
                        type = int,
                        required = True,
                        help = 'choose how many batches')
    
    parser.add_argument('-ot', '--output_type', 
                        type = str,
                        required = True,
                        help = 'output type')
    
    ARGS = parser.parse_args()
    return ARGS

########################### warning ###########################################
def warning(s :str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.
    
    Returns:
      None
    """
    with open(ARGS.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)


def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None:
    dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = keep_index)



def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None:

    for i in range(0, n_batches):
        optgp = OptGPSampler(model, thinning, seed)
        samples = optgp.sample(n_samples)
        samples.to_csv(ARGS.output_folder +  model_name + '_'+ str(i)+'_OPTGP.csv', index=False)
        seed+=1
    samplesTotal = pd.DataFrame()
    for i in range(0, n_batches):
        samples_batch = pd.read_csv(ARGS.output_folder  +  model_name + '_'+ str(i)+'_OPTGP.csv')
        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)

    write_to_file(samplesTotal, model_name)

    for i in range(0, n_batches):
        os.remove(ARGS.output_folder +   model_name + '_'+ str(i)+'_OPTGP.csv')
    pass


def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:

    df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6)
    
    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed)

    for i in range(0, n_batches):
        samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples))
        try:
            CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples)
        except Exception as e:
            utils.logWarning(
            "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e),
            ARGS.out_log)
            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], 
                                                    samples)
        samples.to_csv(ARGS.output_folder +  model_name + '_'+ str(i)+'_CBS.csv', index=False)

    samplesTotal = pd.DataFrame()
    for i in range(0, n_batches):
        samples_batch = pd.read_csv(ARGS.output_folder  +  model_name + '_'+ str(i)+'_CBS.csv')
        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)

    write_to_file(samplesTotal, model_name)

    for i in range(0, n_batches):
        os.remove(ARGS.output_folder +   model_name + '_'+ str(i)+'_CBS.csv')
    pass


def model_sampler(model_input:str, model_name:str)-> List[pd.DataFrame]:

    model = load_custom_model(
        utils.FilePath.fromStrPath(model_input), utils.FilePath.fromStrPath(model_name).ext)
    
    utils.logWarning(
        "Sampling model: " + model_name,
        ARGS.out_log)
    
    name = model_name.split('.')[0]
    
    if ARGS.algorithm == 'OPTGP':
        OPTGP_sampler(model, name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed)

    elif ARGS.algorithm == 'CBS':
        CBS_sampler(model,  name, ARGS.n_samples, ARGS.n_batches, ARGS.seed)

    df_mean, df_median, df_quantiles = fluxes_statistics(name, ARGS.output_types)

    if("fluxes" not in ARGS.output_types):
        os.remove(ARGS.output_folder  +  name + '.csv')

    return df_mean, df_median, df_quantiles

def fluxes_statistics(model_name: str,  output_types:List)-> List[pd.DataFrame]:

    df_mean = pd.DataFrame()
    df_median= pd.DataFrame()
    df_quantiles= pd.DataFrame()

    df_samples = pd.read_csv(ARGS.output_folder  +  model_name + '.csv', sep = '\t')
    for output_type in output_types:
        if(output_type == "mean"):
            df_mean = df_samples.mean()
            df_mean = df_mean.to_frame().T
            df_mean = df_mean.reset_index(drop=True)
            df_mean.index = [model_name]
        elif(output_type == "median"):
            df_median = df_samples.median()
            df_median = df_median.to_frame().T
            df_median = df_median.reset_index(drop=True)
            df_median.index = [model_name]
        elif(output_type == "quantiles"):
            df_quantile = df_samples.quantile([0.25, 0.5, 0.75])
            newRow = []
            cols = []
            for rxn in df_quantile.columns:
                newRow.append(df_quantile[rxn].loc[0.25])
                cols.append(rxn + "_q1")
                newRow.append(df_quantile[rxn].loc[0.5])
                cols.append(rxn + "_q2")
                newRow.append(df_quantile[rxn].loc[0.75])
                cols.append(rxn + "_q3")
            df_quantiles = pd.DataFrame(columns=cols)
            df_quantiles.loc[0] = newRow
            df_quantiles = df_quantiles.reset_index(drop=True)
            df_quantiles.index = [model_name]
    
    return df_mean, df_median, df_quantiles
    

################################- INPUT DATA LOADING -################################
def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file, either in JSON or XML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.

    Raises:
        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.    
    
    Returns:
        cobra.Model : the model, if successfully opened.
    """
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())
        
        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

    except Exception as e: raise utils.DataErr(file_path, e.__str__())
    raise utils.DataErr(file_path,
        f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")

############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the fronted input arguments.

    Returns:
        None
    """
    if not os.path.exists('flux_sampling'):
        os.makedirs('flux_sampling')

    num_processors = cpu_count()

    global ARGS
    ARGS = process_args(sys.argv)

    ARGS.output_folder = 'flux_sampling/'
    
    utils.logWarning(
        ARGS.output_type,
        ARGS.out_log)
    
    models_input = ARGS.input.split(",")
    models_name = ARGS.name.split(",")
    ARGS.output_types = ARGS.output_type.split(",")

 
    results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model_input, model_name) for model_input, model_name in zip(models_input, models_name))

    all_mean = pd.concat([result[0] for result in results], ignore_index=False)
    all_median = pd.concat([result[1] for result in results], ignore_index=False)
    all_quantiles = pd.concat([result[2] for result in results], ignore_index=False)

    if("mean" in ARGS.output_types):
        all_mean = all_mean.fillna(0.0)
        all_mean = all_mean.sort_index()
        write_to_file(all_mean, "mean", True)

    if("median" in ARGS.output_types):
        all_median = all_median.fillna(0.0)
        all_median = all_median.sort_index()
        write_to_file(all_median, "median", True)
    
    if("quantiles" in ARGS.output_types):
        all_quantiles = all_quantiles.fillna(0.0)
        all_quantiles = all_quantiles.sort_index()
        write_to_file(all_quantiles, "quantiles", True)
    pass
        
##############################################################################
if __name__ == "__main__":
    main()