import argparse
import utils.general_utils as utils
import utils.rule_parsing  as rulesUtils
from typing import Optional, Tuple, Union, Dict, List
import utils.reaction_parsing as reactionUtils
import os
import numpy as np
import pandas as pd
import cobra
import CBS_backend
from joblib import Parallel, delayed, cpu_count
from cobra.sampling import OptGPSampler
#import utils.flux_analysis as flux_analysis
import sys

################################# process args ###############################
def process_args(args :List[str]) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
                                     description = 'Sample fluxes from constraint-based metabolic models')

    parser.add_argument('-ol', '--out_log', 
                        help = "Output log")
    
    parser.add_argument('-td', '--tool_dir',
                        type = str,
                        required = True,
                        help = 'your tool directory')
    
    parser.add_argument(
        "-of", "--output_format",
        type = utils.FileFormat.fromExt,
        choices = [utils.FileFormat.CSV, utils.FileFormat.PICKLE],
        required = True, help = "Extension of all output files")
    
    parser.add_argument('-in', '--input',
                        required = True,
                        type=str,
                        help = 'input model files, comma-separated')
    
    parser.add_argument('-nm', '--name',
                        required = True,
                        type=str,
                        help = 'names of the input models, comma-separated')
    
    parser.add_argument('-a', '--algorithm',
                        type = str,
                        choices = ['OPTGP', 'CBS'],
                        required = True,
                        help = 'sampling algorithm to use')
    
    parser.add_argument('-th', '--thinning', 
                        type = int,
                        default= 100,
                        required=False,
                        help = 'thinning interval for the OPTGP sampler')
    
    parser.add_argument('-ns', '--n_samples', 
                        type = int,
                        required = True,
                        help = 'number of samples per batch')
    
    parser.add_argument('-sd', '--seed', 
                        type = int,
                        required = True,
                        help = 'random seed')
    
    parser.add_argument('-nb', '--n_batches', 
                        type = int,
                        required = True,
                        help = 'number of batches')
    
    ARGS = parser.parse_args(args)
    return ARGS
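
# A hypothetical invocation, for illustration only (paths and values are placeholders,
# not taken from the tool's Galaxy wrapper):
#
#   python flux_sampling.py -td /path/to/tool_dir -ol sampling.log -of csv \
#       -in model1.xml,model2.json -nm model1.xml,model2.json \
#       -a CBS -ns 1000 -sd 0 -nb 2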

########################### warning ###########################################
def warning(s :str) -> None:
    """
    Log a warning message to an output log file and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.
    
    Returns:
      None
    """
    with open(ARGS.out_log, 'a') as log:
        log.write(s + "\n\n")
    print(s)


def write_to_file(dataset: pd.DataFrame, name: str) -> None:
    """
    Save a dataset in the output format chosen on the command line: a pickle via
    utils.writePickle or a tab-separated CSV, both rooted in ARGS.output_folder.
    """
    if ARGS.output_format is utils.FileFormat.PICKLE:
        save_path = utils.FilePath(name, ARGS.output_format, prefix = ARGS.output_folder)
        utils.writePickle(save_path, dataset)
    elif ARGS.output_format is utils.FileFormat.CSV:
        dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = False)
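
# For example, with ARGS.output_folder == 'flux_sampling/' (as set in main()) and CSV output,
# write_to_file(df, "my_model") would produce the tab-separated file
# 'flux_sampling/my_model.csv'; the names here are illustrative only.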

def OPTGP_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000, thinning: int = 100, n_batches: int = 1, seed: int = 0) -> None:
    """
    Sample fluxes with the OptGP sampler, one batch at a time (batch i uses seed + i),
    then merge the batches into a single output file and delete the intermediate
    per-batch CSV files.
    """
    if not os.path.exists(ARGS.output_folder + "OPTGP/"):
        os.makedirs(ARGS.output_folder + "OPTGP/")

    # Sample each batch with its own seed and save it to a temporary CSV file.
    for i in range(n_batches):
        optgp = OptGPSampler(model, thinning=thinning, seed=seed + i)
        samples = optgp.sample(n_samples)
        samples.to_csv(ARGS.output_folder + "OPTGP/" + model_name + '_' + str(i) + '.csv')

    # Merge the batches into a single dataset and write it out.
    samplesTotal = pd.DataFrame()
    for i in range(n_batches):
        samples_batch = pd.read_csv(ARGS.output_folder + "OPTGP/" + model_name + '_' + str(i) + '.csv')
        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
    write_to_file(samplesTotal, model_name)

    # Remove the temporary per-batch files.
    for i in range(n_batches):
        os.remove(ARGS.output_folder + "OPTGP/" + model_name + '_' + str(i) + '.csv')
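
# Minimal usage sketch (assumes ARGS has already been populated by main(); the model file
# name is a placeholder, not one of this tool's fixed inputs):
#
#   model = cobra.io.load_json_model("e_coli_core.json")
#   OPTGP_sampler(model, "e_coli_core", n_samples=500, thinning=100, n_batches=2, seed=0)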


def CBS_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000, n_batches: int = 1, seed: int = 0) -> None:
    """
    Sample fluxes with the CBS strategy: generate random objective functions for all
    batches up front, optimize each batch's slice of coefficients, then merge the
    per-batch results into a single output file and delete the intermediate CSV files.
    """
    df_FVA = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0).round(6)

    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples * n_batches, df_FVA, seed=seed)

    for i in range(n_batches):
        samples = pd.DataFrame(columns = [reaction.id for reaction in model.reactions], index = range(n_samples))
        try:
            CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:, i*n_samples:(i+1)*n_samples], samples)
        except Exception as e:
            utils.logWarning(
                "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error: " + str(e),
                ARGS.out_log)
            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:, i*n_samples:(i+1)*n_samples],
                                                                samples)
        samples.to_csv(ARGS.output_folder + model_name + '_' + str(i) + '.csv')

    # Merge the batches into a single dataset and write it out.
    samplesTotal = pd.DataFrame()
    for i in range(n_batches):
        samples_batch = pd.read_csv(ARGS.output_folder + model_name + '_' + str(i) + '.csv')
        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)

    write_to_file(samplesTotal, model_name)

    # Remove the temporary per-batch files.
    for i in range(n_batches):
        os.remove(ARGS.output_folder + model_name + '_' + str(i) + '.csv')
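
# Batch i reads columns [i*n_samples, (i+1)*n_samples) of df_coefficients: for example, with
# n_samples=1000 and n_batches=2 the coefficient table has 2000 columns and the second batch
# uses columns 1000-1999 (illustrative numbers).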


def model_sampler(model_input: str, model_name: str) -> None:
    """
    Load a model from file and sample its flux space with the algorithm selected
    on the command line.

    Args:
        model_input (str): Path to the model file.
        model_name (str): Name of the model, used to label the output files.

    Returns:
        None
    """
    model = load_custom_model(
        utils.FilePath.fromStrPath(model_input), utils.FilePath.fromStrPath(model_name).ext)

    utils.logWarning(
        "Sampling model: " + model_name,
        ARGS.out_log)

    name = model_name.split('.')[0]

    if ARGS.algorithm == 'OPTGP':
        OPTGP_sampler(model, name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed)

    elif ARGS.algorithm == 'CBS':
        CBS_sampler(model, name, ARGS.n_samples, ARGS.n_batches, ARGS.seed)



################################- INPUT DATA LOADING -################################
def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file, either in JSON or XML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.

    Raises:
        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.    
    
    Returns:
        cobra.Model : the model, if successfully opened.
    """
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())
        
        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

    except Exception as e:
        raise utils.DataErr(file_path, str(e))
    raise utils.DataErr(file_path,
        f"Format \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")

############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    Returns:
        None
    """
    if not os.path.exists('flux_sampling'):
        os.makedirs('flux_sampling')

    num_processors = cpu_count()

    global ARGS
    ARGS = process_args(sys.argv[1:])

    ARGS.output_folder = 'flux_sampling/'

    utils.logWarning(
        ARGS.input,
        ARGS.out_log)
    
    utils.logWarning(
        ARGS.name,
        ARGS.out_log)
    
    models_input = ARGS.input.split(",")
    models_name = ARGS.name.split(",")

    #Parallel(n_jobs=num_processors)(delayed(model_sampler)(model_input, model_name) for model_input, model_name in zip(models_input, models_name))
    for model_input, model_name in zip(models_input, models_name):
        model_sampler(model_input, model_name)
        
##############################################################################
if __name__ == "__main__":
    main()