# HG changeset patch # User luca_milaz # Date 1721587673 0 # Node ID d9b29c64efc98cf576d58cbd910c51b78613ee13 # Parent df1eee095863f1946dce7b30eb1acd8308882b02 Uploaded diff -r df1eee095863 -r d9b29c64efc9 marea_2/ras_to_bounds.py --- a/marea_2/ras_to_bounds.py Sun Jul 21 18:47:41 2024 +0000 +++ b/marea_2/ras_to_bounds.py Sun Jul 21 18:47:53 2024 +0000 @@ -39,8 +39,6 @@ parser.add_argument("-meo", "--medium", type = str, help = "path to input file with custom medium, if provided") - - parser.add_argument("-men", "--medium_name", type = str, help = "custom medium name") parser.add_argument('-ol', '--out_log', help = "Output log") @@ -56,7 +54,7 @@ parser.add_argument('-rs', '--ras_selector', required = True, - type=str, + type=bool, help = 'ras selector') ARGS = parser.parse_args() @@ -77,10 +75,42 @@ log.write(s + "\n\n") print(s) +def generate_bounds(model:cobra.Model, medium:dict, ras=None) -> pd.DataFrame: + model_new = model.copy() + rxns_ids = [] + for rxn in model.reactions: + rxns_ids.append(rxn.id) + for reaction in medium.keys(): + if(medium[reaction] != None): + model_new.reactions.get_by_id(reaction).lower_bound=-float(medium[reaction]) + df_FVA = cobra.flux_analysis.flux_variability_analysis(model_new,fraction_of_optimum=0,processes=1).round(8) + for reaction in rxns_ids: + model_new.reactions.get_by_id(reaction).lower_bound=float(df_FVA.loc[reaction,"minimum"]) + model_new.reactions.get_by_id(reaction).upper_bound=float(df_FVA.loc[reaction,"maximum"]) + + if(ras is not None): + for reaction in rxns_ids: + if reaction in ras.keys(): + lower_bound=model_new.reactions.get_by_id(reaction).lower_bound + upper_bound=model_new.reactions.get_by_id(reaction).upper_bound + valMax=float((upper_bound)*ras[reaction]) + valMin=float((lower_bound)*ras[reaction]) + if upper_bound!=0 and lower_bound==0: + model_new.reactions.get_by_id(reaction).upper_bound=valMax + if upper_bound==0 and lower_bound!=0: + model_new.reactions.get_by_id(reaction).lower_bound=valMin + if upper_bound!=0 and lower_bound!=0: + model_new.reactions.get_by_id(reaction).lower_bound=valMin + model_new.reactions.get_by_id(reaction).upper_bound=valMax + rxns = [] + for reaction in model_new.reactions: + rxns.append(reaction.id) -def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None: - dataset.to_csv(ARGS.output_folder + name + ".csv", sep = '\t', index = keep_index) + bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns) + for reaction in model.reactions: + bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound] + return bounds ############################# main ########################################### @@ -91,35 +121,46 @@ Returns: None """ - if not os.path.exists('model_generator'): - os.makedirs('model_generator') + if not os.path.exists('ras_to_bounds'): + os.makedirs('ras_to_bounds') - num_processors = cpu_count() + ARGS.output_folder = 'ras_to_bounds/' global ARGS ARGS = process_args(sys.argv) - - ARGS.output_folder = 'model_generator/' ARGS.output_types = ARGS.output_type.split(",") - ras = pd.read_table(ARGS.input_ras, header=0, sep=r'\s+', index_col = 0).T - ras.replace("None", None, inplace=True) - ras = ras.astype(float) + boundsPath = utils.FilePath("bounds", ".csv", prefix = ARGS.output_folder) + mediumPath = utils.FilePath("medium", ".csv", prefix = ARGS.output_folder) - #medium has rows cells and columns medium reactions, not common reactions set to None - medium = pd.read_csv(ARGS.input_medium, sep = '\t', header = 0, engine='python', index_col = 0) - medium = ras.astype(float) - + if(ARGS.ras_selector == True): + ras = pd.read_table(ARGS.input_ras, header=0, sep=r'\s+', index_col = 0).T + ras.replace("None", None, inplace=True) + ras = ras.astype(float) + model_type :utils.Model = ARGS.model_selector if model_type is utils.Model.Custom: model = model_type.getCOBRAmodel(customPath = utils.FilePath.fromStrPath(ARGS.model), customExtension = utils.FilePath.fromStrPath(ARGS.model_name).ext) else: model = model_type.getCOBRAmodel(toolDir=ARGS.tool_dir) - '''for index, row in ras.iterrows(): #iterate over cells RAS - generate_model(model, index, row, medium.loc[index], ARGS.output_model_format)''' + if(ARGS.medium_selector == "Custom"): + medium = pd.read_csv(ARGS.input_medium, index_col = 0) + medium = medium.astype(float) + medium = medium['medium'].to_dict() + else: + df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0) + medium = df_mediums[[ARGS.medium_selector]] + medium = medium[ARGS.medium_selector].to_dict() + if(ARGS.ras_selector == True): + bounds = generate_bounds(model, medium, ras = ras) + else: + bounds = generate_bounds(model, medium) + + bounds.to_csv(boundsPath.show()) + medium.to_csv(mediumPath.show()) pass ##############################################################################