diff COBRAxy/custom_data_generator.py @ 406:187cee1a00e2 draft
Uploaded
author | francesco_lapi |
---|---|
date | Mon, 08 Sep 2025 14:44:15 +0000 |
parents | 08f1ff359397 |
children |
--- a/COBRAxy/custom_data_generator.py	Mon Sep 08 13:52:58 2025 +0000
+++ b/COBRAxy/custom_data_generator.py	Mon Sep 08 14:44:15 2025 +0000
@@ -10,40 +10,36 @@
 import utils.reaction_parsing as reactionUtils
 
 ARGS : argparse.Namespace
-def process_args(args: List[str] = None) -> argparse.Namespace:
+def process_args(args:List[str] = None) -> argparse.Namespace:
     """
-    Parse command-line arguments for CustomDataGenerator.
-    """
+    Interfaces the script of a module with its frontend, making the user's choices for
+    various parameters available as values in code.
 
-    parser = argparse.ArgumentParser(
-        usage="%(prog)s [options]",
-        description="Generate custom data from a given model"
-    )
-
-    parser.add_argument("--out_log", type=str, required=True,
-                        help="Output log file")
+    Args:
+        args : Always obtained (in file) from sys.argv
 
-    parser.add_argument("--model", type=str,
-                        help="Built-in model identifier (e.g., ENGRO2, Recon, HMRcore)")
-    parser.add_argument("--input", type=str,
-                        help="Custom model file (JSON or XML)")
-    parser.add_argument("--name", type=str, required=True,
-                        help="Model name (default or custom)")
+    Returns:
+        Namespace : An object containing the parsed arguments
+    """
+    parser = argparse.ArgumentParser(
+        usage = "%(prog)s [options]",
+        description = "generate custom data from a given model")
 
-    parser.add_argument("--medium_selector", type=str, required=True,
-                        help="Medium selection option")
+    parser.add_argument("-ol", "--out_log", type = str, required = True, help = "Output log")
 
-    parser.add_argument("--gene_format", type=str, default="Default",
-                        help="Gene nomenclature format: Default (original), ENSNG, HGNC_SYMBOL, HGNC_ID, ENTREZ")
-
-    parser.add_argument("--out_tabular", type=str,
-                        help="Output file for the merged dataset (CSV or XLSX)")
-
-    parser.add_argument("--tool_dir", type=str, default=os.path.dirname(__file__),
-                        help="Tool directory (passed from Galaxy as $__tool_directory__)")
+    parser.add_argument("-orules", "--out_rules", type = str, required = True, help = "Output rules")
+    parser.add_argument("-orxns", "--out_reactions", type = str, required = True, help = "Output reactions")
+    parser.add_argument("-omedium", "--out_medium", type = str, required = True, help = "Output medium")
+    parser.add_argument("-obnds", "--out_bounds", type = str, required = True, help = "Output bounds")
+    parser.add_argument("-id", "--input", type = str, required = True, help = "Input model")
+    parser.add_argument("-mn", "--name", type = str, required = True, help = "Input model name")
+    # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
+    parser.add_argument('-idop', '--output_path', type = str, default='result', help = 'output path for maps')
+    argsNamespace = parser.parse_args(args)
+    # ^ can't get this one to work from xml, there doesn't seem to be a way to get the directory attribute from the collection
 
-    return parser.parse_args(args)
+    return argsNamespace
 
 ################################- INPUT DATA LOADING -################################
 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
@@ -147,52 +143,6 @@
 
     return bounds
 
-
-def generate_compartments(model: cobra.Model) -> pd.DataFrame:
-    """
-    Generates a DataFrame containing compartment information for each reaction.
-    Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
-
-    Args:
-        model: the COBRA model to extract compartment data from.
-
-    Returns:
-        pd.DataFrame: DataFrame with ReactionID and compartment columns
-    """
-    pathway_data = []
-
-    # First pass: determine the maximum number of pathways any reaction has
-    max_pathways = 0
-    reaction_pathways = {}
-
-    for reaction in model.reactions:
-        # Get unique pathways from all metabolites in the reaction
-        if type(reaction.annotation['pathways']) == list:
-            reaction_pathways[reaction.id] = reaction.annotation['pathways']
-            max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
-        else:
-            reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
-
-    # Create column names for pathways
-    pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
-
-    # Second pass: create the data
-    for reaction_id, pathways in reaction_pathways.items():
-        row = {"ReactionID": reaction_id}
-
-        # Fill pathway columns
-        for i in range(max_pathways):
-            col_name = pathway_columns[i]
-            if i < len(pathways):
-                row[col_name] = pathways[i]
-            else:
-                row[col_name] = None  # or "" if you prefer empty strings
-
-        pathway_data.append(row)
-
-    return pd.DataFrame(pathway_data)
-
-
 ###############################- FILE SAVING -################################
 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
     """
@@ -232,14 +182,6 @@
         for key, value in data.items():
             writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
 
-def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
-    try:
-        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
-        df.to_csv(path, sep="\t", index=False)
-    except Exception as e:
-        raise utils.DataErr(path, f"failed writing tabular output: {e}")
-
-
 ###############################- ENTRY POINT -################################
 def main(args:List[str] = None) -> None:
     """
@@ -252,92 +194,24 @@
 
     global ARGS
     ARGS = process_args(args)
 
-
-    if ARGS.input:
-        # load custom model
-        model = load_custom_model(
-            utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
-    else:
-        # load built-in model
-
-        try:
-            model_enum = utils.Model[ARGS.model]  # e.g., Model['ENGRO2']
-        except KeyError:
-            raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)
-
-        # Load built-in model (Model.getCOBRAmodel uses tool_dir to locate local models)
-        try:
-            model = model_enum.getCOBRAmodel(toolDir=ARGS.tool_dir)
-        except Exception as e:
-            # Wrap/normalize load errors as DataErr for consistency
-            raise utils.DataErr(ARGS.model, f"failed loading built-in model: {e}")
+    # this is the worst thing I've seen so far, congrats to the former MaREA devs for suggesting this!
+    if os.path.isdir(ARGS.output_path) == False: os.makedirs(ARGS.output_path)
 
-    # Determine final model name: explicit --name overrides, otherwise use the model id
-
-    model_name = ARGS.name if ARGS.name else ARGS.model
-
-    if ARGS.name == "ENGRO2" and ARGS.medium_selector != "Default":
-        df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
-        ARGS.medium_selector = ARGS.medium_selector.replace("_", " ")
-        medium = df_mediums[[ARGS.medium_selector]]
-        medium = medium[ARGS.medium_selector].to_dict()
-
-        # Set all reactions to zero in the medium
-        for rxn_id, _ in model.medium.items():
-            model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
-
-        # Set medium conditions
-        for reaction, value in medium.items():
-            if value is not None:
-                model.reactions.get_by_id(reaction).lower_bound = -float(value)
-
-    if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default":
-
-        model = utils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC "))
+    # load custom model
+    model = load_custom_model(
+        utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext)
 
     # generate data
     rules = generate_rules(model, asParsed = False)
     reactions = generate_reactions(model, asParsed = False)
    bounds = generate_bounds(model)
     medium = get_medium(model)
 
-    if ARGS.name == "ENGRO2":
-        compartments = generate_compartments(model)
-
-    df_rules = pd.DataFrame(list(rules.items()), columns = ["ReactionID", "Rule"])
-    df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"])
-
-    df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"})
-    df_medium = medium.rename(columns = {"reaction": "ReactionID"})
-    df_medium["InMedium"] = True  # flag to indicate presence in the medium
-
-    merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer")
-    merged = merged.merge(df_bounds, on = "ReactionID", how = "outer")
-    if ARGS.name == "ENGRO2":
-        merged = merged.merge(compartments, on = "ReactionID", how = "outer")
-    merged = merged.merge(df_medium, on = "ReactionID", how = "left")
-
-    merged["InMedium"] = merged["InMedium"].fillna(False)
-
-    merged = merged.sort_values(by = "InMedium", ascending = False)
-
-    #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data")
-
-    #merged.to_csv(out_file, sep = '\t', index = False)
-
-
-    ####
-
-    if not ARGS.out_tabular:
-        raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
-    save_as_tabular_df(merged, ARGS.out_tabular)
-    expected = ARGS.out_tabular
-
-    # verify output exists and non-empty
-    if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
-        raise utils.DataErr(expected, "Output not created or empty")
-
-    print("CustomDataGenerator: completed successfully")
+    # save files out of collection: path coming from xml
+    save_as_csv(rules, ARGS.out_rules, ("ReactionID", "Rule"))
+    save_as_csv(reactions, ARGS.out_reactions, ("ReactionID", "Reaction"))
+    bounds.to_csv(ARGS.out_bounds, sep = '\t')
+    medium.to_csv(ARGS.out_medium, sep = '\t')
 
 if __name__ == '__main__':
     main()
\ No newline at end of file
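
With this change the tool no longer builds one merged `--out_tabular` dataset: rules, reactions, bounds, and medium each get their own required output flag, with the paths supplied by the Galaxy XML. As a minimal sketch of the reworked interface (every file name below is a placeholder, not part of the commit), the entry point can be driven directly with an argument list:

```python
# Hypothetical stand-alone run of the reworked entry point; all paths are
# placeholders, only the flags come from the diff above.
from custom_data_generator import main

main([
    "--out_log", "custom_data.log",      # required log file
    "--out_rules", "rules.tsv",          # ReactionID / Rule pairs
    "--out_reactions", "reactions.tsv",  # ReactionID / Reaction pairs
    "--out_medium", "medium.tsv",        # growth-medium table
    "--out_bounds", "bounds.tsv",        # flux-bounds table
    "--input", "model.dat",              # model file as staged by Galaxy
    "--name", "my_model.xml",            # original file name, see note below
    "--output_path", "result",
])
```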
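The `-mn/--name` flag exists only because Galaxy renames uploads to `.dat`: the model bytes are read from `--input`, while the format is recovered from the original name, exactly as the `load_custom_model` call in `main()` does. A sketch of that split, assuming `utils` resolves to the tool's `utils.general_utils` module:

```python
# Format detection mirrors the load_custom_model() call in main().
from custom_data_generator import load_custom_model
import utils.general_utils as utils  # assumed import path for the utils module

ext = utils.FilePath.fromStrPath("my_model.xml").ext   # original extension
model_path = utils.FilePath.fromStrPath("model.dat")   # Galaxy-staged file
model = load_custom_model(model_path, ext)
```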
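The directory check added in `main()` (the line the commit's own comment grumbles about) can be collapsed into the standard race-free idiom; a sketch:

```python
import os

output_path = "result"  # stands in for ARGS.output_path

# One call instead of isdir() + makedirs(): exist_ok=True makes this a no-op
# when the directory already exists, with no window between check and create.
os.makedirs(output_path, exist_ok=True)
```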
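For downstream checks: the bounds and medium tables are written with `DataFrame.to_csv(sep='\t')`, so the reaction IDs land in the index column of the bounds file; the rules and reactions files come from `save_as_csv`, whose delimiter is not visible in this hunk and is assumed to be a tab here:

```python
import pandas as pd

# bounds.to_csv(sep='\t') keeps the reaction IDs as the DataFrame index,
# so index_col=0 restores them on the way back in.
bounds = pd.read_csv("bounds.tsv", sep="\t", index_col=0)

# medium carries its own 'reaction' column; the written index is positional.
medium = pd.read_csv("medium.tsv", sep="\t", index_col=0)

# Assumption: save_as_csv() writes tab-separated rows; adjust sep if the
# csv.DictWriter dialect in save_as_csv_filePath differs.
rules = pd.read_csv("rules.tsv", sep="\t")
```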