changeset 410:d660c5b03c14 draft default tip

Uploaded
author francesco_lapi
date Mon, 08 Sep 2025 17:33:52 +0000
parents 71850bdf9e1e
children
files COBRAxy/flux_simulation_beta.py COBRAxy/flux_simulation_beta.xml
diffstat 2 files changed, 572 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/COBRAxy/flux_simulation_beta.py	Mon Sep 08 17:33:52 2025 +0000
@@ -0,0 +1,445 @@
+import argparse
+import utils.general_utils as utils
+from typing import Optional, List
+import os
+import numpy as np
+import pandas as pd
+import cobra
+import utils.CBS_backend as CBS_backend
+from joblib import Parallel, delayed, cpu_count
+from cobra.sampling import OptGPSampler
+import sys
+
+
+################################# process args ###############################
+def process_args(args :List[str] = None) -> argparse.Namespace:
+    """
+    Processes command-line arguments.
+
+    Args:
+        args (list): List of command-line arguments.
+
+    Returns:
+        Namespace: An object containing parsed arguments.
+    """
+    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
+                                     description = 'process some value\'s')
+    
+    parser.add_argument("-mo", "--model_upload", type = str,
+        help = "path to input file with custom rules, if provided")
+
+    parser.add_argument('-ol', '--out_log', 
+                        help = "Output log")
+    
+    parser.add_argument('-td', '--tool_dir',
+                        type = str,
+                        required = True,
+                        help = 'your tool directory')
+    
+    parser.add_argument('-in', '--input',
+                        required = True,
+                        type=str,
+                        help = 'inputs bounds')
+    
+    parser.add_argument('-ni', '--names',
+                        required = True,
+                        type=str,
+                        help = 'cell names')
+    
+    parser.add_argument('-a', '--algorithm',
+                        type = str,
+                        choices = ['OPTGP', 'CBS'],
+                        required = True,
+                        help = 'choose sampling algorithm')
+    
+    parser.add_argument('-th', '--thinning', 
+                        type = int,
+                        default= 100,
+                        required=False,
+                        help = 'choose thinning')
+    
+    parser.add_argument('-ns', '--n_samples', 
+                        type = int,
+                        required = True,
+                        help = 'choose how many samples')
+    
+    parser.add_argument('-sd', '--seed', 
+                        type = int,
+                        required = True,
+                        help = 'seed')
+    
+    parser.add_argument('-nb', '--n_batches', 
+                        type = int,
+                        required = True,
+                        help = 'choose how many batches')
+    
+    parser.add_argument('-ot', '--output_type', 
+                        type = str,
+                        required = True,
+                        help = 'output type')
+    
+    parser.add_argument('-ota', '--output_type_analysis', 
+                        type = str,
+                        required = False,
+                        help = 'output type analysis')
+    
+    parser.add_argument('-idop', '--output_path', 
+                        type = str,
+                        default='flux_simulation',
+                        help = 'output path for maps')
+    
+    ARGS = parser.parse_args(args)
+    return ARGS
+
+########################### warning ###########################################
+def warning(s :str) -> None:
+    """
+    Log a warning message to an output log file and print it to the console.
+
+    Args:
+        s (str): The warning message to be logged and printed.
+    
+    Returns:
+      None
+    """
+    with open(ARGS.out_log, 'a') as log:
+        log.write(s + "\n\n")
+    print(s)
+
+
+def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None:
+    dataset.index.name = 'Reactions'
+    dataset.to_csv(ARGS.output_path + "/" + name + ".csv", sep = '\t', index = keep_index)
+
+############################ dataset input ####################################
+def read_dataset(data :str, name :str) -> pd.DataFrame:
+    """
+    Read a dataset from a CSV file and return it as a pandas DataFrame.
+
+    Args:
+        data (str): Path to the CSV file containing the dataset.
+        name (str): Name of the dataset, used in error messages.
+
+    Returns:
+        pandas.DataFrame: DataFrame containing the dataset.
+
+    Raises:
+        pd.errors.EmptyDataError: If the CSV file is empty.
+        sys.exit: If the CSV file has the wrong format, the execution is aborted.
+    """
+    try:
+        dataset = pd.read_csv(data, sep = '\t', header = 0, index_col=0, engine='python')
+    except pd.errors.EmptyDataError:
+        sys.exit('Execution aborted: wrong format of ' + name + '\n')
+    if len(dataset.columns) < 2:
+        sys.exit('Execution aborted: wrong format of ' + name + '\n')
+    return dataset
+
+
+
+def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None:
+    """
+    Samples from the OPTGP (Optimal Global Perturbation) algorithm and saves the results to CSV files.
+
+    Args:
+        model (cobra.Model): The COBRA model to sample from.
+        model_name (str): The name of the model, used in naming output files.
+        n_samples (int, optional): Number of samples per batch. Default is 1000.
+        thinning (int, optional): Thinning parameter for the sampler. Default is 100.
+        n_batches (int, optional): Number of batches to run. Default is 1.
+        seed (int, optional): Random seed for reproducibility. Default is 0.
+    
+    Returns:
+        None
+    """
+
+    for i in range(0, n_batches):
+        optgp = OptGPSampler(model, thinning, seed)
+        samples = optgp.sample(n_samples)
+        samples.to_csv(ARGS.output_path + "/" +  model_name + '_'+ str(i)+'_OPTGP.csv', index=False)
+        seed+=1
+    samplesTotal = pd.DataFrame()
+    for i in range(0, n_batches):
+        samples_batch = pd.read_csv(ARGS.output_path + "/"  +  model_name + '_'+ str(i)+'_OPTGP.csv')
+        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
+
+    write_to_file(samplesTotal.T, model_name, True)
+
+    for i in range(0, n_batches):
+        os.remove(ARGS.output_path + "/" +   model_name + '_'+ str(i)+'_OPTGP.csv')
+    pass
+
+
+def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:
+    """
+    Samples using the CBS (Constraint-based Sampling) algorithm and saves the results to CSV files.
+
+    Args:
+        model (cobra.Model): The COBRA model to sample from.
+        model_name (str): The name of the model, used in naming output files.
+        n_samples (int, optional): Number of samples per batch. Default is 1000.
+        n_batches (int, optional): Number of batches to run. Default is 1.
+        seed (int, optional): Random seed for reproducibility. Default is 0.
+    
+    Returns:
+        None
+    """
+
+    df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6)
+    
+    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed)
+
+    for i in range(0, n_batches):
+        samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples))
+        try:
+            CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples)
+        except Exception as e:
+            utils.logWarning(
+            "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e),
+            ARGS.out_log)
+            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], 
+                                                    samples)
+        utils.logWarning(ARGS.output_path + "/" +  model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log)
+        samples.to_csv(ARGS.output_path + "/" +  model_name + '_'+ str(i)+'_CBS.csv', index=False)
+
+    samplesTotal = pd.DataFrame()
+    for i in range(0, n_batches):
+        samples_batch = pd.read_csv(ARGS.output_path + "/"  +  model_name + '_'+ str(i)+'_CBS.csv')
+        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
+
+    write_to_file(samplesTotal.T, model_name, True)
+
+    for i in range(0, n_batches):
+        os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv')
+    pass
+
+
+def model_sampler(model_input_original:cobra.Model, bounds_path:str, cell_name:str)-> List[pd.DataFrame]:
+    """
+    Prepares the model with bounds from the dataset and performs sampling and analysis based on the selected algorithm.
+
+    Args:
+        model_input_original (cobra.Model): The original COBRA model.
+        bounds_path (str): Path to the CSV file containing the bounds dataset.
+        cell_name (str): Name of the cell, used to generate filenames for output.
+
+    Returns:
+        List[pd.DataFrame]: A list of DataFrames containing statistics and analysis results.
+    """
+
+    model_input = model_input_original.copy()
+    bounds_df = read_dataset(bounds_path, "bounds dataset")
+    for rxn_index, row in bounds_df.iterrows():
+        model_input.reactions.get_by_id(rxn_index).lower_bound = row.lower_bound
+        model_input.reactions.get_by_id(rxn_index).upper_bound = row.upper_bound
+    
+    
+    if ARGS.algorithm == 'OPTGP':
+        OPTGP_sampler(model_input, cell_name, ARGS.n_samples, ARGS.thinning, ARGS.n_batches, ARGS.seed)
+
+    elif ARGS.algorithm == 'CBS':
+        CBS_sampler(model_input,  cell_name, ARGS.n_samples, ARGS.n_batches, ARGS.seed)
+
+    df_mean, df_median, df_quantiles = fluxes_statistics(cell_name, ARGS.output_types)
+
+    if("fluxes" not in ARGS.output_types):
+        os.remove(ARGS.output_path + "/"  +  cell_name + '.csv')
+
+    returnList = []
+    returnList.append(df_mean)
+    returnList.append(df_median)
+    returnList.append(df_quantiles)
+
+    df_pFBA, df_FVA, df_sensitivity = fluxes_analysis(model_input, cell_name, ARGS.output_type_analysis)
+
+    if("pFBA" in ARGS.output_type_analysis):
+        returnList.append(df_pFBA)
+    if("FVA" in ARGS.output_type_analysis):
+        returnList.append(df_FVA)
+    if("sensitivity" in ARGS.output_type_analysis):
+        returnList.append(df_sensitivity)
+
+    return returnList
+
+def fluxes_statistics(model_name: str,  output_types:List)-> List[pd.DataFrame]:
+    """
+    Computes statistics (mean, median, quantiles) for the fluxes.
+
+    Args:
+        model_name (str): Name of the model, used in filename for input.
+        output_types (List[str]): Types of statistics to compute (mean, median, quantiles).
+
+    Returns:
+        List[pd.DataFrame]: List of DataFrames containing mean, median, and quantiles statistics.
+    """
+
+    df_mean = pd.DataFrame()
+    df_median= pd.DataFrame()
+    df_quantiles= pd.DataFrame()
+
+    df_samples = pd.read_csv(ARGS.output_path + "/"  +  model_name + '.csv', sep = '\t', index_col = 0).T
+    df_samples = df_samples.round(8)
+
+    for output_type in output_types:
+        if(output_type == "mean"):
+            df_mean = df_samples.mean()
+            df_mean = df_mean.to_frame().T
+            df_mean = df_mean.reset_index(drop=True)
+            df_mean.index = [model_name]
+        elif(output_type == "median"):
+            df_median = df_samples.median()
+            df_median = df_median.to_frame().T
+            df_median = df_median.reset_index(drop=True)
+            df_median.index = [model_name]
+        elif(output_type == "quantiles"):
+            newRow = []
+            cols = []
+            for rxn in df_samples.columns:
+                quantiles = df_samples[rxn].quantile([0.25, 0.50, 0.75])
+                newRow.append(quantiles[0.25])
+                cols.append(rxn + "_q1")
+                newRow.append(quantiles[0.5])
+                cols.append(rxn + "_q2")
+                newRow.append(quantiles[0.75])
+                cols.append(rxn + "_q3")
+            df_quantiles = pd.DataFrame(columns=cols)
+            df_quantiles.loc[0] = newRow
+            df_quantiles = df_quantiles.reset_index(drop=True)
+            df_quantiles.index = [model_name]
+    
+    return df_mean, df_median, df_quantiles
+
+def fluxes_analysis(model:cobra.Model,  model_name:str, output_types:List)-> List[pd.DataFrame]:
+    """
+    Performs flux analysis including pFBA, FVA, and sensitivity analysis.
+
+    Args:
+        model (cobra.Model): The COBRA model to analyze.
+        model_name (str): Name of the model, used in filenames for output.
+        output_types (List[str]): Types of analysis to perform (pFBA, FVA, sensitivity).
+
+    Returns:
+        List[pd.DataFrame]: List of DataFrames containing pFBA, FVA, and sensitivity analysis results.
+    """
+
+    df_pFBA = pd.DataFrame()
+    df_FVA= pd.DataFrame()
+    df_sensitivity= pd.DataFrame()
+
+    for output_type in output_types:
+        if(output_type == "pFBA"):
+            model.objective = "Biomass"
+            solution = cobra.flux_analysis.pfba(model)
+            fluxes = solution.fluxes
+            df_pFBA.loc[0,[rxn._id for rxn in model.reactions]] = fluxes.tolist()
+            df_pFBA = df_pFBA.reset_index(drop=True)
+            df_pFBA.index = [model_name]
+            df_pFBA = df_pFBA.astype(float).round(6)
+        elif(output_type == "FVA"):
+            fva = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0, processes=1).round(8)
+            columns = []
+            for rxn in fva.index.to_list():
+                columns.append(rxn + "_min")
+                columns.append(rxn + "_max")
+            df_FVA= pd.DataFrame(columns = columns)
+            for index_rxn, row in fva.iterrows():
+                df_FVA.loc[0, index_rxn+ "_min"] = fva.loc[index_rxn, "minimum"]
+                df_FVA.loc[0, index_rxn+ "_max"] = fva.loc[index_rxn, "maximum"]
+            df_FVA = df_FVA.reset_index(drop=True)
+            df_FVA.index = [model_name]
+            df_FVA = df_FVA.astype(float).round(6)
+        elif(output_type == "sensitivity"):
+            model.objective = "Biomass"
+            solution_original = model.optimize().objective_value
+            reactions = model.reactions
+            single = cobra.flux_analysis.single_reaction_deletion(model)
+            newRow = []
+            df_sensitivity = pd.DataFrame(columns = [rxn.id for rxn in reactions], index = [model_name])
+            for rxn in reactions:
+                newRow.append(single.knockout[rxn.id].growth.values[0]/solution_original)
+            df_sensitivity.loc[model_name] = newRow
+            df_sensitivity = df_sensitivity.astype(float).round(6)
+    return df_pFBA, df_FVA, df_sensitivity
+
+############################# main ###########################################
+def main(args :List[str] = None) -> None:
+    """
+    Initializes everything and sets the program in motion based on the fronted input arguments.
+
+    Returns:
+        None
+    """
+
+    num_processors = cpu_count()
+
+    global ARGS
+    ARGS = process_args(args)
+
+    if not os.path.exists(ARGS.output_path):
+        os.makedirs(ARGS.output_path)
+    
+    #model_type :utils.Model = ARGS.model_selector
+    #if model_type is utils.Model.Custom:
+    #    model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext)
+    #else:
+    #    model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir)
+
+    model = utils.build_cobra_model_from_csv(ARGS.model_upload)
+
+    validation = utils.validate_model(model)
+
+    print("\n=== VALIDAZIONE MODELLO ===")
+    for key, value in validation.items():
+        print(f"{key}: {value}")
+
+    #Set solver verbosity to 1 to see warning and error messages only.
+    model.solver.configuration.verbosity = 1
+    
+    ARGS.bounds = ARGS.input.split(",")
+    ARGS.bounds_name = ARGS.names.split(",")
+    ARGS.output_types = ARGS.output_type.split(",")
+    ARGS.output_type_analysis = ARGS.output_type_analysis.split(",")
+
+
+    results = Parallel(n_jobs=num_processors)(delayed(model_sampler)(model, bounds_path, cell_name) for bounds_path, cell_name in zip(ARGS.bounds, ARGS.bounds_name))
+
+    all_mean = pd.concat([result[0] for result in results], ignore_index=False)
+    all_median = pd.concat([result[1] for result in results], ignore_index=False)
+    all_quantiles = pd.concat([result[2] for result in results], ignore_index=False)
+
+    if("mean" in ARGS.output_types):
+        all_mean = all_mean.fillna(0.0)
+        all_mean = all_mean.sort_index()
+        write_to_file(all_mean.T, "mean", True)
+
+    if("median" in ARGS.output_types):
+        all_median = all_median.fillna(0.0)
+        all_median = all_median.sort_index()
+        write_to_file(all_median.T, "median", True)
+    
+    if("quantiles" in ARGS.output_types):
+        all_quantiles = all_quantiles.fillna(0.0)
+        all_quantiles = all_quantiles.sort_index()
+        write_to_file(all_quantiles.T, "quantiles", True)
+
+    index_result = 3
+    if("pFBA" in ARGS.output_type_analysis):
+        all_pFBA = pd.concat([result[index_result] for result in results], ignore_index=False)
+        all_pFBA = all_pFBA.sort_index()
+        write_to_file(all_pFBA.T, "pFBA", True)
+        index_result+=1
+    if("FVA" in ARGS.output_type_analysis):
+        all_FVA= pd.concat([result[index_result] for result in results], ignore_index=False)
+        all_FVA = all_FVA.sort_index()
+        write_to_file(all_FVA.T, "FVA", True)
+        index_result+=1
+    if("sensitivity" in ARGS.output_type_analysis):
+        all_sensitivity = pd.concat([result[index_result] for result in results], ignore_index=False)
+        all_sensitivity = all_sensitivity.sort_index()
+        write_to_file(all_sensitivity.T, "sensitivity", True)
+
+    pass
+        
+##############################################################################
+if __name__ == "__main__":
+    main()
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/COBRAxy/flux_simulation_beta.xml	Mon Sep 08 17:33:52 2025 +0000
@@ -0,0 +1,127 @@
+<tool id="fluxSimulation - Beta" name="Flux Simulation - BETA" version="2.0.0">
+    
+    <macros>
+        <import>marea_macros.xml</import>
+    </macros>
+
+	<requirements>
+        <requirement type="package" version="1.24.4">numpy</requirement>
+        <requirement type="package" version="2.0.3">pandas</requirement>
+		<requirement type="package" version="0.29.0">cobra</requirement>
+        <requirement type="package" version="5.2.2">lxml</requirement>
+        <requirement type="package" version="1.4.2">joblib</requirement>
+        <requirement type="package" version="1.11">scipy</requirement>
+	</requirements>
+
+    <command detect_errors="exit_code">
+        <![CDATA[
+        python $__tool_directory__/flux_simulation_beta.py
+        --tool_dir $__tool_directory__
+        --model_upload $model_upload
+        --input "${",".join(map(str, $inputs))}"
+        #set $names = ""
+        #for $input_temp in $inputs:
+            #set $names = $names + $input_temp.element_identifier + ","
+        #end for
+        --name $names
+        --thinning 0
+        #if $algorithm_param.algorithm == 'OPTGP':
+        	--thinning $algorithm_param.thinning
+        #end if
+        --algorithm $algorithm_param.algorithm
+        --n_batches $n_batches
+        --n_samples $n_samples
+        --seed $seed
+        --output_type "${",".join(map(str, $output_types))}"
+        --output_type_analysis "${",".join(map(str, $output_types_analysis))}"
+        --out_log $log
+        ]]>
+    </command>
+    <inputs>
+
+         <param name="model_upload" argument="--model_upload" type="data" format="csv,tsv,tabular" 
+            label="Model rules file:" help="Upload a CSV/TSV file containing reaction rules generated by the Model Initialization tool." />
+
+
+        <param name="inputs" argument="--inputs" multiple="true" type="data" format="tabular, csv, tsv" label="Bound(s):" />
+        
+        
+        <conditional name="algorithm_param">
+			<param name="algorithm" argument="--algorithm" type="select" label="Choose sampling algorithm:">
+                    <option value="CBS" selected="true">CBS</option>
+                	<option value="OPTGP">OPTGP</option>
+        	</param>
+        	<when value="OPTGP">
+        		<param name="thinning" argument="--thinning" type="integer" label="Thinning:"  value="100" help="Number of iterations to wait before taking a sample."/>
+        	</when>
+
+		</conditional>
+
+
+        <param name="n_samples" argument="--n_samples" type="integer" label="Samples:" value="1000"/>
+
+        <param name="n_batches" argument="--n_batches" type="integer" label="Batches:" value="1" help="This is useful for computational perfomances."/>
+
+        <param name="seed" argument="--seed" type="integer" label="Seed:" value="0" helph="Random seed."/>
+
+        <param type="select" argument="--output_types" multiple="true" name="output_types" label="Desired outputs from sampling">
+            <option value="mean" selected="true">Mean</option>
+            <option value="median" selected="true">Median</option>
+            <option value="quantiles" selected="true">Quantiles</option>
+            <option value="fluxes" selected="false">All fluxes</option>
+        </param>
+
+        <param type="select" argument="--output_types_analysis" multiple="true" name="output_types_analysis" label="Desired outputs from flux analysis">
+            <option value="pFBA" selected="false">pFBA</option>
+            <option value="FVA" selected="false">FVA</option>
+            <option value="sensitivity" selected="false">Sensitivity reaction knock-out (Biomass)</option>
+        </param>
+    </inputs>
+
+        		
+    <outputs>
+        <data format="txt" name="log" label="Flux Simulation - Log" />
+
+        <data name="output" format="tabular" label="Flux Simulation - Output">
+            <discover_datasets pattern="__name_and_ext__"
+                directory="flux_simulation" visible="true" />
+        </data>
+
+    </outputs>
+
+    <help>
+    <![CDATA[
+What it does
+-------------
+
+This tool generates flux samples starting from a model in JSON or XML format by using CBS (Corner-based sampling) or OPTGP (Improved Artificial Centering Hit-and-Run sampler) sampling algorithms.
+
+It can return sampled fluxes by appliying summary statistics: 
+   - mean
+   - median
+   - quantiles (0.25, 0.50, 0.75). 
+
+Flux analysis can be perfomed over the metabolic model:
+   - parsimoniuos-FBA (optimized by Biomass)
+   - FVA
+   - Biomass sensitivity analysis (single reaction knock-out). It is the ratio between the optimal of the Biomass reaction computed by FBA after knocking-out a reaction and the same over the complete model.
+
+Accepted files:
+   - A model: JSON, XML, MAT or YAML (.yml) file reporting reactions and rules contained in the model. Supported compressed formats: .zip, .gz and .bz2. Filename must follow the pattern: {model_name}.{extension}.[zip|gz|bz2]
+   - Context-specific bounds: generated by RAS to Bounds tool. This can be a collection of bounds too (one bounds file per context).
+
+Output:
+-------------
+
+The tool generates:
+   - Samples: reporting the sampled fluxes for each reaction (reaction names on the rows and sample names on the columns). Format: tab-separated.
+   - a log file (.txt).
+
+**TIP**: The Batches parameter is useful to mantain in memory just a batch of samples at time. For example, if you wish to sample 10.000 points, than it is suggested to select n_samples = 1.000 and n_batches=10.
+**TIP**: The Thinning parameter of the OPTGP algorithm is useful to converge to a stationary distribution (see cited articles by Galuzzi, Milazzo and Damiani).
+
+]]>
+    </help>
+    <expand macro="citations_fluxes" />
+ 
+</tool>
\ No newline at end of file