view marea_2_0/flux_sampling.py @ 123:a11a2b97b87b draft

Uploaded
author luca_milaz
date Tue, 02 Jul 2024 20:49:03 +0000
parents ad5250c72a7e
children b99c5230b928
line wrap: on
line source

import argparse
import utils.general_utils as utils
import utils.rule_parsing  as rulesUtils
from typing import Optional, Tuple, Union, Dict, List
import utils.reaction_parsing as reactionUtils
import os
import numpy as np
import pandas as pd
import cobra
from cobra.sampling import OptGPSampler
import CBS_backend
#import utils.flux_analysis as flux_analysis
import sys

################################# process args ###############################
def process_args(args :List[str]) -> argparse.Namespace:
    """
    Builds the command-line parser of the flux sampling tool and parses the options.

    Args:
        args (list): List of command-line arguments. It is accepted for
            interface reasons but is not forwarded to the parser, which reads
            the process command line on its own.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    cli = argparse.ArgumentParser(
        usage = '%(prog)s [options]',
        description = "process some value's")

    # One (flags, settings) entry per option, registered in this exact order.
    option_table = (
        (('-ol', '--out_log'), dict(
            help = "Output log")),

        (("-of", "--output_format"), dict(
            type = utils.FileFormat.fromExt,
            default = utils.FileFormat.PICKLE,
            choices = [utils.FileFormat.CSV, utils.FileFormat.PICKLE],
            required = True,
            help = "Extension of all output files")),

        (('-in', '--input'), dict(
            required = True,
            help = 'inputs model')),

        (('-mn', '--model_name'), dict(
            required = True,
            help = 'model_names')),

        (('-a', '--algorithm'), dict(
            type = str,
            choices = ['OPTGP', 'CBS'],
            default = 'OPTGP',
            help = 'choose sampling algorithm')),

        (('-th', '--thinning'), dict(
            type = int,
            default = 100,
            help = 'choose thinning')),

        (('-ns', '--n_samples'), dict(
            type = int,
            default = 1000,
            help = 'choose how many samples')),

        (('-sd', '--seed'), dict(
            type = int,
            default = 0,
            help = 'seed')),

        (('-nb', '--n_batches'), dict(
            type = int,
            default = 1,
            help = 'choose how many batches')),
    )

    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    return cli.parse_args()

########################### warning ###########################################
def warning(s :str) -> None:
    """
    Log a warning message to the output log file, when one was configured,
    and print it to the console.

    Args:
        s (str): The warning message to be logged and printed.
    
    Returns:
      None
    """
    # --out_log is an optional flag, so ARGS.out_log is None when it is omitted;
    # passing None to open() would raise a TypeError and hide the warning itself.
    log_path = ARGS.out_log
    if log_path:
        with open(log_path, 'a') as log:
            log.write(s + "\n\n")
    print(s)


def write_to_file(dataset: pd.DataFrame, dest: str)->None:
    """
    Saves a dataset to `dest` in the output format selected on the command line.

    Nothing is written when the selected format is neither PICKLE nor CSV.

    Args:
        dataset (pd.DataFrame): the table to save.
        dest (str): destination path, used exactly as given.

    Returns:
        None
    """
    chosen_format = ARGS.output_format

    if chosen_format is utils.FileFormat.PICKLE:
        utils.writePickle(dest, dataset)
        return

    if chosen_format is utils.FileFormat.CSV:
        # tab-separated text, index column left out
        dataset.to_csv(dest, sep = '\t', index = False)

def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None:
    """
    Samples a model with the OptGP algorithm and saves all batches as one dataset.

    A new sampler is built for every batch, each with its own seed
    (seed, seed + 1, ...), and asked for n_samples samples. The batches are
    concatenated and written to "<output_folder>/OPTGP/<model_name>" through
    write_to_file, i.e. in the format chosen with --output_format.

    Args:
        model : the model to sample.
        model_name : name of the merged output file.
        n_samples : number of samples requested from the sampler in each batch.
        thinning : thinning factor handed to the sampler.
        n_batches : number of batches to draw.
        seed : seed of the first batch; each following batch uses the next integer.

    Returns:
        None
    """
    out_dir = os.path.join(ARGS.output_folder, "OPTGP")
    os.makedirs(out_dir, exist_ok = True)

    batches = []
    for batch_seed in range(seed, seed + n_batches):
        # thinning and seed are passed by keyword: the third positional slot of
        # the sampler constructor is not the seed, so a positional seed would
        # silently configure something else.
        optgp = OptGPSampler(model, thinning = thinning, seed = batch_seed)
        batches.append(optgp.sample(n_samples))

    # Batches are merged in memory; going through intermediate CSV files
    # written with their index added a spurious "Unnamed: 0" column.
    samplesTotal = pd.concat(batches, ignore_index = True) if batches else pd.DataFrame()
    write_to_file(samplesTotal, os.path.join(out_dir, model_name))


def CBS_sampler(model:cobra.Model, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:
    """
    Samples a model with the CBS backend and saves all batches as one dataset.

    Flux variability analysis (fraction_of_optimum = 0, rounded to 6 decimals)
    is run once and used to draw n_samples * n_batches random objective
    functions. Each batch consumes its own slice of n_samples coefficient
    columns. When the primary backend call raises, a warning is logged and the
    batch is retried with the cobrapy-based backend call. The merged result is
    written to "<output_folder>/CBS/<ARGS.model_name>" through write_to_file.

    Args:
        model : the model to sample.
        n_samples : number of samples per batch.
        n_batches : number of batches to draw.
        seed : seed handed to the random objective function generator.

    Returns:
        None
    """
    # Only the "CBS" folder is created: the merged output is a *file* named
    # after the model, so a directory with that same path must not exist.
    out_dir = os.path.join(ARGS.output_folder, "CBS")
    os.makedirs(out_dir, exist_ok = True)

    df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6)

    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed)

    reaction_ids = [reaction.id for reaction in model.reactions]
    batch_paths = [os.path.join(out_dir, f"{ARGS.model_name}_{i}.csv") for i in range(n_batches)]

    for i, batch_path in enumerate(batch_paths):
        # Empty frame (one column per reaction) handed to the backend;
        # presumably filled in place, since nothing is returned — confirm in CBS_backend.
        samples = pd.DataFrame(columns = reaction_ids, index = range(n_samples))
        batch_coefficients = df_coefficients.iloc[:, i*n_samples:(i+1)*n_samples]
        try:
            CBS_backend.randomObjectiveFunctionSampling(model, n_samples, batch_coefficients, samples)
        except Exception:
            # solver has failed, try sampling with cobrapy
            utils.logWarning(
                "Warning: GLPK solver has failed for " + ARGS.model_name + ". Trying with COBRA interface.",
                ARGS.out_log)
            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, batch_coefficients, samples)

        # index = False: otherwise reading the file back adds an "Unnamed: 0" column.
        samples.to_csv(batch_path, index = False)

    # NOTE(review): the CSV round trip is kept on purpose; it may be what turns
    # the initially empty (object-typed) frame into numeric columns — confirm.
    batches = [pd.read_csv(batch_path) for batch_path in batch_paths]
    samplesTotal = pd.concat(batches, ignore_index = True) if batches else pd.DataFrame()
    write_to_file(samplesTotal, os.path.join(out_dir, ARGS.model_name))

    for batch_path in batch_paths:
        os.remove(batch_path)


################################- INPUT DATA LOADING -################################
def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
    """
    Loads a custom model from a file, either in JSON or XML format.

    Args:
        file_path : The path to the file containing the custom model.
        ext : explicit file extension, used instead of the one carried by file_path when given.
            Necessary for standard use in galaxy because of its weird behaviour.

    Raises:
        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.
    
    Returns:
        cobra.Model : the model, if successfully opened.
    """
    ext = ext if ext else file_path.ext
    try:
        if ext is utils.FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())
        
        if ext is utils.FileFormat.JSON:
            return cobra.io.load_json_model(file_path.show())

    except Exception as e:
        # Any reader failure is reported as a DataErr; the original error stays attached as the cause.
        raise utils.DataErr(file_path, str(e)) from e

    # Report the extension that actually drove the dispatch (it may be the explicit override).
    raise utils.DataErr(file_path,
        f"Format \"{ext}\" is not recognized, only JSON and XML files are supported.")

############################# main ###########################################
def main() -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    The arguments are parsed into the module-level ARGS, the output folder
    'flux_sampling/' is created if missing, the model given with --input is
    loaded and the sampler selected with --algorithm is run on it.

    NOTE(review): the file format is taken from the extension of the --input
    path; if the frontend hands over paths without a meaningful extension an
    explicit format has to be passed to load_custom_model — confirm.

    Raises:
        ValueError : if the selected algorithm is neither OPTGP nor CBS.

    Returns:
        None
    """
    global ARGS
    ARGS = process_args(sys.argv)

    ARGS.output_folder = 'flux_sampling/'
    os.makedirs(ARGS.output_folder, exist_ok = True)

    # load custom model
    model = load_custom_model(utils.FilePath.fromStrPath(ARGS.input))

    if ARGS.algorithm == 'OPTGP':
        OPTGP_sampler(model, ARGS.model_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed)

    elif ARGS.algorithm == 'CBS':
        CBS_sampler(model, ARGS.n_samples, ARGS.n_batches, ARGS.seed)

    else:
        # The parser already restricts the choices; this guards direct edits of ARGS.
        raise ValueError(
            f"Algorithm \"{ARGS.algorithm}\" is not recognized, only OPTGP and CBS are supported.")
        
##############################################################################
# Run the tool only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()