changeset 134:54f816f17f56 draft

Uploaded
author luca_milaz
date Tue, 02 Jul 2024 22:00:47 +0000
parents 17f3c6ab2425
children 47b8ed94b70c
files marea_2_0/utils/flux_sampling.py
diffstat 1 files changed, 231 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/marea_2_0/utils/flux_sampling.py	Tue Jul 02 22:00:47 2024 +0000
@@ -0,0 +1,231 @@
+import argparse
+import utils.general_utils as utils
+import utils.rule_parsing  as rulesUtils
+from typing import Optional, Tuple, Union, Dict, List
+import utils.reaction_parsing as reactionUtils
+import os
+import numpy as np
+import pandas as pd
+import cobra
+from cobra.sampling import OptGPSampler
+#import CBS_backend
+#import utils.flux_analysis as flux_analysis
+import sys
+
+################################# process args ###############################
+def process_args(args :List[str]) -> argparse.Namespace:
+    """
+    Processes command-line arguments.
+
+    Args:
+        args (list): List of command-line arguments.
+
+    Returns:
+        Namespace: An object containing parsed arguments.
+    """
+    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
+                                     description = 'process some value\'s')
+
+    parser.add_argument('-ol', '--out_log', 
+                        help = "Output log")
+    
+    parser.add_argument('-td', '--tool_dir',
+                        type = str,
+                        required = True,
+                        help = 'your tool directory')
+    
+    parser.add_argument(
+        "-of", "--output_format",
+        type = utils.FileFormat.fromExt, default = utils.FileFormat.PICKLE,
+        choices = [utils.FileFormat.CSV, utils.FileFormat.PICKLE],
+        required = True, help = "Extension of all output files")
+    
+    parser.add_argument('-in', '--inputs',
+                        required = True,
+                        help = 'inputs model')
+    
+    parser.add_argument('-a', '--algorithm',
+                        type = str,
+                        choices = ['OPTGP', 'CBS'],
+                        default = 'OPTGP',
+                        help = 'choose sampling algorithm')
+    
+    parser.add_argument('-th', '--thinning', 
+                        type = int,
+                        default = 100,
+                        help = 'choose thinning')
+    
+    parser.add_argument('-ns', '--n_samples', 
+                        type = int,
+                        default = 1000,
+                        help = 'choose how many samples')
+    
+    parser.add_argument('-sd', '--seed', 
+                        type = int,
+                        default = 0,
+                        help = 'seed')
+    
+    parser.add_argument('-nb', '--n_batches', 
+                        type = int,
+                        default = 1,
+                        help = 'choose how many batches')
+    
+    ARGS = parser.parse_args()
+    return ARGS
+
+########################### warning ###########################################
+def warning(s :str) -> None:
+    """
+    Log a warning message to an output log file and print it to the console.
+
+    Args:
+        s (str): The warning message to be logged and printed.
+    
+    Returns:
+      None
+    """
+    with open(ARGS.out_log, 'a') as log:
+        log.write(s + "\n\n")
+    print(s)
+
+
+def write_to_file(dataset: pd.DataFrame, dest: str)->None:
+
+    if ARGS.output_format is utils.FileFormat.PICKLE:
+        utils.writePickle(dest,     dataset)
+    elif ARGS.output_format is utils.FileFormat.CSV:
+        dataset.to_csv(dest, sep = '\t', index = False)
+
+def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None:
+
+    if not os.path.exists(ARGS.output_folder + "OPTGP/"):
+        os.makedirs(ARGS.output_folder + "OPTGP/")
+
+    for i in range(0, n_batches):
+        optgp = OptGPSampler(model, thinning, seed)
+        samples = optgp.sample(n_samples)
+        samples.to_csv(ARGS.output_folder + "OPTGP/" +  ARGS.model_name + '_'+ str(i)+'.csv')
+        i+=1
+        seed+=1
+    samplesTotal = pd.DataFrame()
+    for i in range(0, n_batches):
+        samples_batch = pd.read_csv(ARGS.output_folder + "OPTGP/" +  ARGS.model_name + '_'+ str(i)+'.csv')
+        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
+    write_to_file(samplesTotal, ARGS.output_folder + "OPTGP/" + ARGS.model_name)
+    for i in range(0, n_batches):
+        os.remove(ARGS.output_folder + "OPTGP/" +  ARGS.model_name + '_'+ str(i)+'.csv')
+    pass
+
+
+def CBS_sampler(model:cobra.Model, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:
+
+    if not os.path.exists(ARGS.output_folder + "CBS/" + ARGS.model_name):
+        os.makedirs(ARGS.output_folder + "CBS/" + ARGS.model_name)
+
+    df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6)
+    
+    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed)
+
+    for i in range(0, n_batches):
+        samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples))
+        try:
+            CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples)
+        except Exception as e:
+        # solver has failed, try sampling with cobrapy
+            utils.logWarning(
+            "Warning: GLPK solver has failed for " + ARGS.model_name + ". Trying with COBRA interface.",
+            ARGS.out_log)
+            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], 
+                                                    samples)
+        samples.to_csv(ARGS.output_folder + "CBS/" +  ARGS.model_name + '_'+ str(i)+'.csv')
+
+    for i in range(0, n_batches):
+        samples_batch = pd.read_csv(ARGS.output_folder + "CBS/" +  ARGS.model_name + '_'+ str(i)+'.csv')
+        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
+    write_to_file(samplesTotal, ARGS.output_folder + "CBS/" + ARGS.model_name)
+    for i in range(0, n_batches):
+        os.remove(ARGS.output_folder + "CBS/" +  ARGS.model_name + '_'+ str(i)+'.csv')
+    pass
+
+
+################################- INPUT DATA LOADING -################################
+def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
+    """
+    Loads a custom model from a file, either in JSON or XML format.
+
+    Args:
+        file_path : The path to the file containing the custom model.
+        ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.
+
+    Raises:
+        DataErr : if the file is in an invalid format or cannot be opened for whatever reason.    
+    
+    Returns:
+        cobra.Model : the model, if successfully opened.
+    """
+    ext = ext if ext else file_path.ext
+    try:
+        if ext is utils.FileFormat.XML:
+            return cobra.io.read_sbml_model(file_path.show())
+        
+        if ext is utils.FileFormat.JSON:
+            return cobra.io.load_json_model(file_path.show())
+
+    except Exception as e: raise utils.DataErr(file_path, e.__str__())
+    raise utils.DataErr(file_path,
+        f"Fomat \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")
+
+############################# main ###########################################
+def main() -> None:
+    """
+    Initializes everything and sets the program in motion based on the fronted input arguments.
+
+    Returns:
+        None
+    """
+    if not os.path.exists('flux_sampling'):
+        os.makedirs('flux_sampling')
+
+    
+    global ARGS
+    ARGS = process_args(sys.argv)
+
+    ARGS.output_folder = 'flux_sampling'
+
+    utils.logWarning(
+        ARGS.inputs,
+        ARGS.out_log)
+
+
+    # load custom model
+    '''model1 = load_custom_model(
+        utils.FilePath.fromStrPath(ARGS.inputs[0]), ARGS.model_name[0].ext)
+    model2 = load_custom_model(
+        utils.FilePath.fromStrPath(ARGS.input[1]), ARGS.model_name[1].ext)'''
+    
+
+    
+
+    a = pd.DataFrame(columns=["a"])
+    a.loc[0] = ["sss"]
+    a.to_csv(ARGS.output_folder+"/test1.csv")
+
+    a = pd.DataFrame(columns=["a"])
+    a.loc[0] = ["sss"]
+    a.to_csv(ARGS.output_folder+"/test2.csv")
+
+
+    '''
+    
+    if ARGS.sampling_algorithm == 'OPTGP':
+        OPTGP_sampler(model, ARGS.model_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed, ARGS.out_dir)
+
+    elif ARGS.sampling_algorithm == 'CBS':
+        CBS_sampler(model, ARGS.model_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed, ARGS.out_dir)
+    else:
+        raise utils.ValueErr(ARGS.sampling_algorithm,
+        f"Algorithm \"{ARGS.sampling_algorithm}\" is not recognized, only OPTGP and CBS are supported.")'''
+        
+##############################################################################
+# Start the tool only when this file is executed as a script, not when it is imported.
+if __name__ == "__main__":
+    main()
\ No newline at end of file