changeset 461:73f02860f7d7 draft
Uploaded
| author   | luca_milaz |
|----------|------------|
| date     | Mon, 22 Sep 2025 13:51:19 +0000 |
| parents  | 6a7010997b32 |
| children | 227a1031ae47 |
| files    | COBRAxy/flux_simulation_beta.py |
| diffstat | 1 files changed, 198 insertions(+), 124 deletions(-) |
```diff
--- a/COBRAxy/flux_simulation_beta.py	Mon Sep 22 13:42:03 2025 +0000
+++ b/COBRAxy/flux_simulation_beta.py	Mon Sep 22 13:51:19 2025 +0000
@@ -15,6 +15,7 @@
 from typing import List
 import os
 import pandas as pd
+import numpy as np
 import cobra
 import utils.CBS_backend as CBS_backend
 from joblib import Parallel, delayed, cpu_count
@@ -24,97 +25,95 @@
 ################################# process args ###############################
-def process_args(args :List[str] = None) -> argparse.Namespace:
+def process_args(args: List[str] = None) -> argparse.Namespace:
     """
     Processes command-line arguments.
-    
+
     Args:
         args (list): List of command-line arguments.
-    
+
     Returns:
         Namespace: An object containing parsed arguments.
     """
-    parser = argparse.ArgumentParser(usage = '%(prog)s [options]',
-                                     description = 'process some value\'s')
+    parser = argparse.ArgumentParser(usage='%(prog)s [options]',
+                                     description='process some value\'s')
+
+    parser.add_argument("-mo", "--model_upload", type=str,
+                        help="path to input file with custom rules, if provided")
 
-    parser.add_argument("-mo", "--model_upload", type = str,
-                        help = "path to input file with custom rules, if provided")
-    
-    parser.add_argument("-mab", "--model_and_bounds", type = str,
-                        choices = ['True', 'False'],
-                        required = True,
-                        help = "upload mode: True for model+bounds, False for complete models")
-    
-    
-    parser.add_argument('-ol', '--out_log',
-                        help = "Output log")
+    parser.add_argument("-mab", "--model_and_bounds", type=str,
+                        choices=['True', 'False'],
+                        required=True,
+                        help="upload mode: True for model+bounds, False for complete models")
+
+    parser.add_argument('-ol', '--out_log',
+                        help="Output log")
 
     parser.add_argument('-td', '--tool_dir',
-                        type = str,
-                        required = True,
-                        help = 'your tool directory')
+                        type=str,
+                        required=True,
+                        help='your tool directory')
 
     parser.add_argument('-in', '--input',
-                        required = True,
-                        type=str,
-                        help = 'input bounds files or complete model files')
+                        required=True,
+                        type=str,
+                        help='input bounds files or complete model files')
 
     parser.add_argument('-ni', '--name',
-                        required = True,
+                        required=True,
                         type=str,
-                        help = 'cell names')
+                        help='cell names')
 
     parser.add_argument('-a', '--algorithm',
-                        type = str,
-                        choices = ['OPTGP', 'CBS'],
-                        required = True,
-                        help = 'choose sampling algorithm')
+                        type=str,
+                        choices=['OPTGP', 'CBS'],
+                        required=True,
+                        help='choose sampling algorithm')
 
-    parser.add_argument('-th', '--thinning',
-                        type = int,
-                        default= 100,
+    parser.add_argument('-th', '--thinning',
+                        type=int,
+                        default=100,
                         required=False,
-                        help = 'choose thinning')
+                        help='choose thinning')
 
-    parser.add_argument('-ns', '--n_samples',
-                        type = int,
-                        required = True,
-                        help = 'choose how many samples')
+    parser.add_argument('-ns', '--n_samples',
+                        type=int,
+                        required=True,
+                        help='choose how many samples (set to 0 for optimization only)')
 
-    parser.add_argument('-sd', '--seed',
-                        type = int,
-                        required = True,
-                        help = 'seed')
+    parser.add_argument('-sd', '--seed',
+                        type=int,
+                        required=True,
+                        help='seed for random number generation')
 
-    parser.add_argument('-nb', '--n_batches',
-                        type = int,
-                        required = True,
-                        help = 'choose how many batches')
+    parser.add_argument('-nb', '--n_batches',
+                        type=int,
+                        required=True,
+                        help='choose how many batches')
 
     parser.add_argument('-opt', '--perc_opt',
-                        type = float,
+                        type=float,
                         default=0.9,
-                        required = False,
-                        help = 'choose the fraction of optimality for FVA (0-1)')
+                        required=False,
+                        help='choose the fraction of optimality for FVA (0-1)')
 
-
-    parser.add_argument('-ot', '--output_type',
-                        type = str,
-                        required = True,
-                        help = 'output type')
+    parser.add_argument('-ot', '--output_type',
+                        type=str,
+                        required=True,
+                        help='output type for sampling results')
 
-    parser.add_argument('-ota', '--output_type_analysis',
-                        type = str,
-                        required = False,
-                        help = 'output type analysis')
+    parser.add_argument('-ota', '--output_type_analysis',
+                        type=str,
+                        required=False,
+                        help='output type analysis (optimization methods)')
 
-    parser.add_argument('-idop', '--output_path',
-                        type = str,
+    parser.add_argument('-idop', '--output_path',
+                        type=str,
                         default='flux_simulation',
-                        help = 'output path for maps')
+                        help='output path for maps')
 
     ARGS = parser.parse_args(args)
     return ARGS
 
-
 ########################### warning ###########################################
 def warning(s :str) -> None:
     """
@@ -172,10 +171,10 @@
 
 
 
-def OPTGP_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, thinning:int=100, n_batches:int=1, seed:int=0)-> None:
+def OPTGP_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000, thinning: int = 100, n_batches: int = 1, seed: int = 0) -> None:
     """
     Samples from the OPTGP (Optimal Global Perturbation) algorithm and saves the results to CSV files.
-    
+
     Args:
         model (cobra.Model): The COBRA model to sample from.
         model_name (str): The name of the model, used in naming output files.
@@ -183,68 +182,125 @@
         thinning (int, optional): Thinning parameter for the sampler. Default is 100.
         n_batches (int, optional): Number of batches to run. Default is 1.
         seed (int, optional): Random seed for reproducibility. Default is 0.
-    
+
     Returns:
         None
     """
-    
-    for i in range(0, n_batches):
+    import numpy as np
+
+    # Get reaction IDs for consistent column ordering
+    reaction_ids = [rxn.id for rxn in model.reactions]
+
+    # Sample and save each batch as numpy file
+    for i in range(n_batches):
         optgp = OptGPSampler(model, thinning, seed)
         samples = optgp.sample(n_samples)
-        samples.to_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv', index=False)
-        seed+=1
-    samplesTotal = pd.DataFrame()
-    for i in range(0, n_batches):
-        samples_batch = pd.read_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv')
-        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
-    
+
+        # Save as numpy array (more memory efficient)
+        batch_filename = f"{ARGS.output_path}/{model_name}_{i}_OPTGP.npy"
+        np.save(batch_filename, samples.values)
+
+        seed += 1
+
+    # Merge all batches into a single DataFrame
+    all_samples = []
+
+    for i in range(n_batches):
+        batch_filename = f"{ARGS.output_path}/{model_name}_{i}_OPTGP.npy"
+        batch_data = np.load(batch_filename)
+        all_samples.append(batch_data)
+
+    # Concatenate all batches
+    samplesTotal_array = np.vstack(all_samples)
+
+    # Convert back to DataFrame with proper column names
+    samplesTotal = pd.DataFrame(samplesTotal_array, columns=reaction_ids)
+
+    # Save the final merged result as CSV
     write_to_file(samplesTotal.T, model_name, True)
-    
-    for i in range(0, n_batches):
-        os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv')
+
+    # Clean up temporary numpy files
+    for i in range(n_batches):
+        batch_filename = f"{ARGS.output_path}/{model_name}_{i}_OPTGP.npy"
+        if os.path.exists(batch_filename):
+            os.remove(batch_filename)
 
 
-def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None:
+def CBS_sampler(model: cobra.Model, model_name: str, n_samples: int = 1000, n_batches: int = 1, seed: int = 0) -> None:
     """
     Samples using the CBS (Constraint-based Sampling) algorithm and saves the results to CSV files.
-    
+
     Args:
         model (cobra.Model): The COBRA model to sample from.
         model_name (str): The name of the model, used in naming output files.
         n_samples (int, optional): Number of samples per batch. Default is 1000.
         n_batches (int, optional): Number of batches to run. Default is 1.
        seed (int, optional): Random seed for reproducibility. Default is 0.
-    
+
     Returns:
         None
     """
-    
-    df_FVA = cobra.flux_analysis.flux_variability_analysis(model,fraction_of_optimum=0).round(6)
+    import numpy as np
+
+    # Get reaction IDs for consistent column ordering
+    reaction_ids = [reaction.id for reaction in model.reactions]
+
+    # Perform FVA analysis once for all batches
+    df_FVA = cobra.flux_analysis.flux_variability_analysis(model, fraction_of_optimum=0).round(6)
+
+    # Generate random objective functions for all samples across all batches
+    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples * n_batches, df_FVA, seed=seed)
 
-    df_coefficients = CBS_backend.randomObjectiveFunction(model, n_samples*n_batches, df_FVA, seed=seed)
-
-    for i in range(0, n_batches):
-        samples = pd.DataFrame(columns =[reaction.id for reaction in model.reactions], index = range(n_samples))
+    # Sample and save each batch as numpy file
+    for i in range(n_batches):
+        samples = pd.DataFrame(columns=reaction_ids, index=range(n_samples))
+
         try:
-            CBS_backend.randomObjectiveFunctionSampling(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples], samples)
+            CBS_backend.randomObjectiveFunctionSampling(
+                model,
+                n_samples,
+                df_coefficients.iloc[:, i * n_samples:(i + 1) * n_samples],
+                samples
+            )
         except Exception as e:
             utils.logWarning(
-                "Warning: GLPK solver has failed for " + model_name + ". Trying with COBRA interface. Error:" + str(e),
-                ARGS.out_log)
-            CBS_backend.randomObjectiveFunctionSampling_cobrapy(model, n_samples, df_coefficients.iloc[:,i*n_samples:(i+1)*n_samples],
-                                                    samples)
-        utils.logWarning(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv', ARGS.out_log)
-        samples.to_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv', index=False)
-
-    samplesTotal = pd.DataFrame()
-    for i in range(0, n_batches):
-        samples_batch = pd.read_csv(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv')
-        samplesTotal = pd.concat([samplesTotal, samples_batch], ignore_index = True)
-
+                f"Warning: GLPK solver has failed for {model_name}. Trying with COBRA interface. Error: {str(e)}",
+                ARGS.out_log
+            )
+            CBS_backend.randomObjectiveFunctionSampling_cobrapy(
+                model,
+                n_samples,
+                df_coefficients.iloc[:, i * n_samples:(i + 1) * n_samples],
+                samples
+            )
+
+        # Save as numpy array (more memory efficient)
+        batch_filename = f"{ARGS.output_path}/{model_name}_{i}_CBS.npy"
+        utils.logWarning(batch_filename, ARGS.out_log)
+        np.save(batch_filename, samples.values)
+
+    # Merge all batches into a single DataFrame
+    all_samples = []
+
+    for i in range(n_batches):
+        batch_filename = f"{ARGS.output_path}/{model_name}_{i}_CBS.npy"
+        batch_data = np.load(batch_filename)
+        all_samples.append(batch_data)
+
+    # Concatenate all batches
+    samplesTotal_array = np.vstack(all_samples)
+
+    # Convert back to DataFrame with proper column names
+    samplesTotal = pd.DataFrame(samplesTotal_array, columns=reaction_ids)
+
+    # Save the final merged result as CSV
     write_to_file(samplesTotal.T, model_name, True)
-
-    for i in range(0, n_batches):
-        os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv')
+
+    # Clean up temporary numpy files
+    for i in range(n_batches):
+        batch_filename = f"{ARGS.output_path}/{model_name}_{i}_CBS.npy"
+        if os.path.exists(batch_filename):
+            os.remove(batch_filename)
@@ -411,7 +467,7 @@
     return df_pFBA, df_FVA, df_sensitivity
 
 ############################# main ###########################################
-def main(args :List[str] = None) -> None:
+def main(args: List[str] = None) -> None:
     """
     Initialize and run sampling/analysis based on the frontend input arguments.
 
@@ -435,12 +491,16 @@
     # optional analysis output types -> list or empty
     ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") if ARGS.output_type_analysis else []
 
+    # Determine if sampling should be performed
+    perform_sampling = ARGS.n_samples > 0
+
     print("=== INPUT FILES ===")
     print(f"{ARGS.input_files}")
     print(f"{ARGS.file_names}")
     print(f"{ARGS.output_type}")
     print(f"{ARGS.output_types}")
     print(f"{ARGS.output_type_analysis}")
+    print(f"Sampling enabled: {perform_sampling} (n_samples: {ARGS.n_samples})")
 
     if ARGS.model_and_bounds == "True":
         # MODE 1: Model + bounds (separate files)
@@ -477,38 +537,52 @@
         for model_file, cell_name in zip(ARGS.input_files, ARGS.file_names)
     )
 
-
-    all_mean = pd.concat([result[0] for result in results], ignore_index=False)
-    all_median = pd.concat([result[1] for result in results], ignore_index=False)
-    all_quantiles = pd.concat([result[2] for result in results], ignore_index=False)
+    # Handle sampling outputs (only if sampling was performed)
+    if perform_sampling:
+        print("=== PROCESSING SAMPLING RESULTS ===")
+
+        all_mean = pd.concat([result[0] for result in results], ignore_index=False)
+        all_median = pd.concat([result[1] for result in results], ignore_index=False)
+        all_quantiles = pd.concat([result[2] for result in results], ignore_index=False)
 
-    if("mean" in ARGS.output_types):
-        all_mean = all_mean.fillna(0.0)
-        all_mean = all_mean.sort_index()
-        write_to_file(all_mean.T, "mean", True)
+        if "mean" in ARGS.output_types:
+            all_mean = all_mean.fillna(0.0)
+            all_mean = all_mean.sort_index()
+            write_to_file(all_mean.T, "mean", True)
 
-    if("median" in ARGS.output_types):
-        all_median = all_median.fillna(0.0)
-        all_median = all_median.sort_index()
-        write_to_file(all_median.T, "median", True)
+        if "median" in ARGS.output_types:
+            all_median = all_median.fillna(0.0)
+            all_median = all_median.sort_index()
+            write_to_file(all_median.T, "median", True)
+
+        if "quantiles" in ARGS.output_types:
+            all_quantiles = all_quantiles.fillna(0.0)
+            all_quantiles = all_quantiles.sort_index()
+            write_to_file(all_quantiles.T, "quantiles", True)
+    else:
+        print("=== SAMPLING SKIPPED (n_samples = 0) ===")
+
+    # Handle optimization analysis outputs (always available)
+    print("=== PROCESSING OPTIMIZATION RESULTS ===")
 
-    if("quantiles" in ARGS.output_types):
-        all_quantiles = all_quantiles.fillna(0.0)
-        all_quantiles = all_quantiles.sort_index()
-        write_to_file(all_quantiles.T, "quantiles", True)
-
-    index_result = 3
-    if("pFBA" in ARGS.output_type_analysis):
+    # Determine the starting index for optimization results
+    # If sampling was performed, optimization results start at index 3
+    # If no sampling, optimization results start at index 0
+    index_result = 3 if perform_sampling else 0
+
+    if "pFBA" in ARGS.output_type_analysis:
         all_pFBA = pd.concat([result[index_result] for result in results], ignore_index=False)
         all_pFBA = all_pFBA.sort_index()
         write_to_file(all_pFBA.T, "pFBA", True)
-        index_result+=1
-    if("FVA" in ARGS.output_type_analysis):
-        all_FVA= pd.concat([result[index_result] for result in results], ignore_index=False)
+        index_result += 1
+
+    if "FVA" in ARGS.output_type_analysis:
+        all_FVA = pd.concat([result[index_result] for result in results], ignore_index=False)
        all_FVA = all_FVA.sort_index()
         write_to_file(all_FVA.T, "FVA", True)
-        index_result+=1
-    if("sensitivity" in ARGS.output_type_analysis):
+        index_result += 1
+
+    if "sensitivity" in ARGS.output_type_analysis:
         all_sensitivity = pd.concat([result[index_result] for result in results], ignore_index=False)
         all_sensitivity = all_sensitivity.sort_index()
         write_to_file(all_sensitivity.T, "sensitivity", True)
```
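The core refactor in this changeset replaces the per-batch CSV round-trip with NumPy `.npy` files that are stacked and relabeled once at the end. Below is a minimal, self-contained sketch of that merge pattern; `merge_npy_batches`, the reaction IDs, and the temporary paths are illustrative stand-ins, not COBRAxy code.

```python
import os
import tempfile

import numpy as np
import pandas as pd


def merge_npy_batches(batch_files, columns):
    """Load per-batch .npy arrays, stack them row-wise, and restore column labels."""
    arrays = [np.load(path) for path in batch_files]  # one (n_samples, n_reactions) array per batch
    merged = np.vstack(arrays)                        # (n_batches * n_samples, n_reactions)
    return pd.DataFrame(merged, columns=columns)


# Illustrative use: three fake batches of 4 samples over 2 hypothetical reactions.
reaction_ids = ["R00001", "R00002"]
out_dir = tempfile.mkdtemp()
rng = np.random.default_rng(seed=0)

batch_files = []
for i in range(3):
    path = os.path.join(out_dir, f"model_{i}_OPTGP.npy")
    np.save(path, rng.random((4, len(reaction_ids))))  # stand-in for sampler output
    batch_files.append(path)

samples_total = merge_npy_batches(batch_files, reaction_ids)
assert samples_total.shape == (12, 2)

# Remove the temporary batch files, mirroring the changeset's cleanup step.
for path in batch_files:
    if os.path.exists(path):
        os.remove(path)
```

Compared with the old `pd.read_csv`/`pd.concat` loop, this avoids serializing floats to text between batches; the trade-off is that `.npy` files carry no header, so the column order must be captured up front, which is why both new samplers snapshot the reaction IDs from `model.reactions` before the batch loop.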