| 
491
 | 
     1 """
 | 
| 
 | 
Script to generate a tabular file of a metabolic model (built-in or custom).

This script loads a COBRA model (built-in or custom), optionally applies
medium and gene nomenclature settings, derives reaction-related metadata
(GPR rules, formulas, bounds, objective coefficients, medium membership,
and compartments for ENGRO2), and writes a tabular summary.
 | 
| 
 | 
     8 """
 | 
| 
 | 
     9 
 | 
| 
 | 
    10 import os
 | 
| 
 | 
    11 import csv
 | 
| 
 | 
    12 import cobra
 | 
| 
 | 
    13 import argparse
 | 
| 
 | 
    14 import pandas as pd
 | 
| 
 | 
    15 import utils.general_utils as utils
 | 
| 
 | 
    16 from typing import Optional, Tuple, List
 | 
| 
 | 
    17 import utils.model_utils as modelUtils
 | 
| 
 | 
    18 import logging
 | 
| 
 | 
    19 from pathlib import Path
 | 
| 
 | 
    20 
 | 
| 
 | 
    21 
 | 
| 
 | 
    22 ARGS : argparse.Namespace
 | 
| 
 | 
    23 def process_args(args: List[str] = None) -> argparse.Namespace:
 | 
| 
 | 
    24     """
 | 
| 
 | 
    25     Parse command-line arguments for metabolic_model_setting.
 | 
| 
 | 
    26     """
 | 
| 
 | 
    27 
 | 
| 
 | 
    28     parser = argparse.ArgumentParser(
 | 
| 
 | 
    29         usage="%(prog)s [options]",
 | 
| 
 | 
    30         description="Generate custom data from a given model"
 | 
| 
 | 
    31     )
 | 
| 
 | 
    32 
 | 
| 
 | 
    33     parser.add_argument("--out_log", type=str, required=True,
 | 
| 
 | 
    34                         help="Output log file")
 | 
| 
 | 
    35 
 | 
| 
 | 
    36     parser.add_argument("--model", type=str,
 | 
| 
 | 
    37                         help="Built-in model identifier (e.g., ENGRO2, Recon, HMRcore)")
 | 
| 
 | 
    38     parser.add_argument("--input", type=str,
 | 
| 
498
 | 
    39                         help="Custom model file (JSON, XML, MAT, YAML)")
 | 
| 
496
 | 
    40     parser.add_argument("--name", nargs='*', required=True,
 | 
| 
491
 | 
    41                         help="Model name (default or custom)")
 | 
| 
 | 
    42     
 | 
| 
 | 
    43     parser.add_argument("--medium_selector", type=str, required=True,
 | 
| 
 | 
    44                         help="Medium selection option")
 | 
| 
 | 
    45 
 | 
| 
 | 
    46     parser.add_argument("--gene_format", type=str, default="Default",
 | 
| 
 | 
    47                         help="Gene nomenclature format: Default (original), ENSNG, HGNC_SYMBOL, HGNC_ID, ENTREZ")
 | 
| 
 | 
    48     
 | 
| 
 | 
    49     parser.add_argument("--out_tabular", type=str,
 | 
| 
 | 
    50                         help="Output file for the merged dataset (CSV or XLSX)")
 | 
| 
 | 
    51     
 | 
| 
 | 
    52     parser.add_argument("--tool_dir", type=str, default=os.path.dirname(__file__),
 | 
| 
 | 
    53                         help="Tool directory (passed from Galaxy as $__tool_directory__)")
 | 
| 
 | 
    54 
 | 
| 
 | 
    55 
 | 
| 
 | 
    56     return parser.parse_args(args)
 | 
| 
 | 
    57 
 | 
| 
 | 
    58 ################################- INPUT DATA LOADING -################################
 | 
| 
498
 | 
    59 def detect_file_format(file_path: str) -> utils.FileFormat:
 | 
| 
 | 
    60     """
 | 
| 
 | 
    61     Detect file format by examining file content and extension.
 | 
| 
 | 
    62     Handles Galaxy .dat files by looking at content.
 | 
| 
 | 
    63     """
 | 
| 
 | 
    64     try:
 | 
| 
 | 
    65         with open(file_path, 'r') as f:
 | 
| 
 | 
    66             first_lines = ''.join([f.readline() for _ in range(5)])
 | 
| 
 | 
    67         
 | 
| 
 | 
    68         # Check for XML (SBML)
 | 
| 
 | 
    69         if '<?xml' in first_lines or '<sbml' in first_lines:
 | 
| 
 | 
    70             return utils.FileFormat.XML
 | 
| 
 | 
    71         
 | 
| 
 | 
    72         # Check for JSON
 | 
| 
 | 
    73         if first_lines.strip().startswith('{'):
 | 
| 
 | 
    74             return utils.FileFormat.JSON
 | 
| 
 | 
    75             
 | 
| 
 | 
    76         # Check for YAML
 | 
| 
 | 
    77         if any(line.strip().endswith(':') for line in first_lines.split('\n')[:3]):
 | 
| 
 | 
    78             return utils.FileFormat.YML
 | 
| 
 | 
    79             
 | 
| 
 | 
    80     except:
 | 
| 
 | 
    81         pass
 | 
| 
 | 
    82     
 | 
| 
 | 
    83     # Fall back to extension-based detection
 | 
| 
 | 
    84     if file_path.endswith('.xml') or file_path.endswith('.sbml'):
 | 
| 
 | 
    85         return utils.FileFormat.XML
 | 
| 
 | 
    86     elif file_path.endswith('.json'):
 | 
| 
 | 
    87         return utils.FileFormat.JSON
 | 
| 
 | 
    88     elif file_path.endswith('.mat'):
 | 
| 
 | 
    89         return utils.FileFormat.MAT
 | 
| 
 | 
    90     elif file_path.endswith('.yml') or file_path.endswith('.yaml'):
 | 
| 
 | 
    91         return utils.FileFormat.YML
 | 
| 
 | 
    92     
 | 
| 
 | 
    93     # Default to XML for unknown extensions
 | 
| 
 | 
    94     return utils.FileFormat.XML
 | 
| 
 | 
    95 
 | 
| 
491
 | 
    96 def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model:
 | 
| 
 | 
    97     """
 | 
| 
 | 
    98     Loads a custom model from a file, either in JSON, XML, MAT, or YML format.
 | 
| 
 | 
    99 
 | 
| 
 | 
   100     Args:
 | 
| 
 | 
   101         file_path : The path to the file containing the custom model.
 | 
| 
 | 
   102         ext : explicit file extension. Necessary for standard use in galaxy because of its weird behaviour.
 | 
| 
 | 
   103 
 | 
| 
 | 
   104     Raises:
 | 
| 
 | 
   105         DataErr : if the file is in an invalid format or cannot be opened for whatever reason.    
 | 
| 
 | 
   106     
 | 
| 
 | 
   107     Returns:
 | 
| 
 | 
   108         cobra.Model : the model, if successfully opened.
 | 
| 
 | 
   109     """
 | 
| 
 | 
   110     ext = ext if ext else file_path.ext
 | 
| 
 | 
   111     try:
 | 
| 
 | 
   112         if ext is utils.FileFormat.XML:
 | 
| 
 | 
   113             return cobra.io.read_sbml_model(file_path.show())
 | 
| 
 | 
   114         
 | 
| 
 | 
   115         if ext is utils.FileFormat.JSON:
 | 
| 
 | 
   116             return cobra.io.load_json_model(file_path.show())
 | 
| 
 | 
   117 
 | 
| 
 | 
   118         if ext is utils.FileFormat.MAT:
 | 
| 
 | 
   119             return cobra.io.load_matlab_model(file_path.show())
 | 
| 
 | 
   120 
 | 
| 
 | 
   121         if ext is utils.FileFormat.YML:
 | 
| 
 | 
   122             return cobra.io.load_yaml_model(file_path.show())
 | 
| 
 | 
   123 
 | 
| 
 | 
   124     except Exception as e: raise utils.DataErr(file_path, e.__str__())
 | 
| 
 | 
   125     raise utils.DataErr(
 | 
| 
 | 
   126         file_path,
 | 
| 
 | 
   127         f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported."
 | 
| 
 | 
   128     )
 | 
| 
 | 
   129 
 | 
| 
 | 
   130 
 | 
| 
 | 
   131 ###############################- FILE SAVING -################################
 | 
| 
 | 
   132 def save_as_csv_filePath(data :dict, file_path :utils.FilePath, fieldNames :Tuple[str, str]) -> None:
 | 
| 
 | 
   133     """
 | 
| 
 | 
   134     Saves any dictionary-shaped data in a .csv file created at the given file_path as FilePath.
 | 
| 
 | 
   135 
 | 
| 
 | 
   136     Args:
 | 
| 
 | 
   137         data : the data to be written to the file.
 | 
| 
 | 
   138         file_path : the path to the .csv file.
 | 
| 
 | 
   139         fieldNames : the names of the fields (columns) in the .csv file.
 | 
| 
 | 
   140     
 | 
| 
 | 
   141     Returns:
 | 
| 
 | 
   142         None
 | 
| 
 | 
   143     """
 | 
| 
 | 
   144     with open(file_path.show(), 'w', newline='') as csvfile:
 | 
| 
 | 
   145         writer = csv.DictWriter(csvfile, fieldnames = fieldNames, dialect="excel-tab")
 | 
| 
 | 
   146         writer.writeheader()
 | 
| 
 | 
   147 
 | 
| 
 | 
   148         for key, value in data.items():
 | 
| 
 | 
   149             writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
 | 
| 
 | 
   150 
 | 
| 
 | 
   151 def save_as_csv(data :dict, file_path :str, fieldNames :Tuple[str, str]) -> None:
 | 
| 
 | 
   152     """
 | 
| 
 | 
   153     Saves any dictionary-shaped data in a .csv file created at the given file_path as string.
 | 
| 
 | 
   154 
 | 
| 
 | 
   155     Args:
 | 
| 
 | 
   156         data : the data to be written to the file.
 | 
| 
 | 
   157         file_path : the path to the .csv file.
 | 
| 
 | 
   158         fieldNames : the names of the fields (columns) in the .csv file.
 | 
| 
 | 
   159     
 | 
| 
 | 
   160     Returns:
 | 
| 
 | 
   161         None
 | 
| 
 | 
   162     """
 | 
| 
 | 
   163     with open(file_path, 'w', newline='') as csvfile:
 | 
| 
 | 
   164         writer = csv.DictWriter(csvfile, fieldnames = fieldNames, dialect="excel-tab")
 | 
| 
 | 
   165         writer.writeheader()
 | 
| 
 | 
   166 
 | 
| 
 | 
   167         for key, value in data.items():
 | 
| 
 | 
   168             writer.writerow({ fieldNames[0] : key, fieldNames[1] : value })
 | 
| 
 | 
   169 
 | 
| 
 | 
   170 def save_as_tabular_df(df: pd.DataFrame, path: str) -> None:
 | 
| 
 | 
   171     """
 | 
| 
 | 
   172     Save a pandas DataFrame as a tab-separated file, creating directories as needed.
 | 
| 
 | 
   173 
 | 
| 
 | 
   174     Args:
 | 
| 
 | 
   175         df: The DataFrame to write.
 | 
| 
 | 
   176         path: Destination file path (will be written as TSV).
 | 
| 
 | 
   177 
 | 
| 
 | 
   178     Raises:
 | 
| 
 | 
   179         DataErr: If writing the output fails for any reason.
 | 
| 
 | 
   180 
 | 
| 
 | 
   181     Returns:
 | 
| 
 | 
   182         None
 | 
| 
 | 
   183     """
 | 
| 
 | 
   184     try:
 | 
| 
 | 
   185         os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
 | 
| 
 | 
   186         df.to_csv(path, sep="\t", index=False)
 | 
| 
 | 
   187     except Exception as e:
 | 
| 
 | 
   188         raise utils.DataErr(path, f"failed writing tabular output: {e}")
 | 
| 
 | 
   189     
 | 
| 
 | 
   190 def is_placeholder(gid) -> bool:
 | 
| 
 | 
   191     """Return True if the gene id looks like a placeholder (e.g., 0/NA/NAN/empty)."""
 | 
| 
 | 
   192     if gid is None:
 | 
| 
 | 
   193         return True
 | 
| 
 | 
   194     s = str(gid).strip().lower()
 | 
| 
 | 
   195     return s in {"0", "", "na", "nan"}  # lowercase for simple matching
 | 
| 
 | 
   196 
 | 
| 
 | 
   197 def sample_valid_gene_ids(genes, limit=10):
 | 
| 
 | 
   198     """Yield up to `limit` valid gene IDs, skipping placeholders (e.g., the first 0 in RECON)."""
 | 
| 
 | 
   199     out = []
 | 
| 
 | 
   200     for g in genes:
 | 
| 
 | 
   201         gid = getattr(g, "id", getattr(g, "gene_id", g))
 | 
| 
 | 
   202         if not is_placeholder(gid):
 | 
| 
 | 
   203             out.append(str(gid))
 | 
| 
 | 
   204             if len(out) >= limit:
 | 
| 
 | 
   205                 break
 | 
| 
 | 
   206     return out
 | 
| 
 | 
   207 
 | 
| 
 | 
   208 
 | 
| 
 | 
   209 ###############################- ENTRY POINT -################################
 | 
| 
 | 
   210 def main(args:List[str] = None) -> None:
 | 
| 
 | 
   211     """
 | 
| 
 | 
   212     Initialize and generate custom data based on the frontend input arguments.
 | 
| 
 | 
   213     
 | 
| 
 | 
   214     Returns:
 | 
| 
 | 
   215         None
 | 
| 
 | 
   216     """
 | 
| 
 | 
   217     # Parse args from frontend (Galaxy XML)
 | 
| 
 | 
   218     global ARGS
 | 
| 
 | 
   219     ARGS = process_args(args)
 | 
| 
 | 
   220 
 | 
| 
496
 | 
   221     # Convert name from list to string (handles names with spaces)
 | 
| 
 | 
   222     if isinstance(ARGS.name, list):
 | 
| 
 | 
   223         ARGS.name = ' '.join(ARGS.name)
 | 
| 
491
 | 
   224 
 | 
| 
 | 
   225     if ARGS.input:
 | 
| 
498
 | 
   226         # Load a custom model from file with auto-detected format
 | 
| 
 | 
   227         detected_format = detect_file_format(ARGS.input)
 | 
| 
 | 
   228         model = load_custom_model(utils.FilePath.fromStrPath(ARGS.input), detected_format)
 | 
| 
491
 | 
   229     else:
 | 
| 
 | 
   230         # Load a built-in model
 | 
| 
495
 | 
   231         if not ARGS.model:
 | 
| 
 | 
   232             raise utils.ArgsErr("model", "either --model or --input must be provided", "None")
 | 
| 
491
 | 
   233 
 | 
| 
 | 
   234         try:
 | 
| 
 | 
   235             model_enum = utils.Model[ARGS.model]  # e.g., Model['ENGRO2']
 | 
| 
 | 
   236         except KeyError:
 | 
| 
 | 
   237             raise utils.ArgsErr("model", "one of Recon/ENGRO2/HMRcore/Custom_model", ARGS.model)
 | 
| 
 | 
   238 
 | 
| 
 | 
   239         # Load built-in model (Model.getCOBRAmodel uses tool_dir to locate local models)
 | 
| 
 | 
   240         try:
 | 
| 
 | 
   241             model = model_enum.getCOBRAmodel(toolDir=ARGS.tool_dir)
 | 
| 
 | 
   242         except Exception as e:
 | 
| 
 | 
   243             # Wrap/normalize load errors as DataErr for consistency
 | 
| 
 | 
   244             raise utils.DataErr(ARGS.model, f"failed loading built-in model: {e}")
 | 
| 
 | 
   245 
 | 
| 
 | 
   246     # Determine final model name: explicit --name overrides, otherwise use the model id
 | 
| 
 | 
   247     
 | 
| 
 | 
   248     if ARGS.name == "ENGRO2" and ARGS.medium_selector != "Default":
 | 
| 
 | 
   249         df_mediums = pd.read_csv(ARGS.tool_dir + "/local/medium/medium.csv", index_col = 0)
 | 
| 
528
 | 
   250         #ARGS.medium_selector = ARGS.medium_selector.replace("_", " ") medium.csv uses underscores now
 | 
| 
491
 | 
   251         medium = df_mediums[[ARGS.medium_selector]]
 | 
| 
 | 
   252         medium = medium[ARGS.medium_selector].to_dict()
 | 
| 
 | 
   253 
 | 
| 
 | 
   254         # Reset all medium reactions lower bound to zero
 | 
| 
 | 
   255         for rxn_id, _ in model.medium.items():
 | 
| 
 | 
   256             model.reactions.get_by_id(rxn_id).lower_bound = float(0.0)
 | 
| 
 | 
   257         
 | 
| 
 | 
   258         # Apply selected medium uptake bounds (negative for uptake)
 | 
| 
 | 
   259         for reaction, value in medium.items():
 | 
| 
 | 
   260             if value is not None:
 | 
| 
 | 
   261                 model.reactions.get_by_id(reaction).lower_bound = -float(value)
 | 
| 
 | 
   262 
 | 
| 
 | 
   263     # Initialize translation_issues dictionary
 | 
| 
 | 
   264     translation_issues = {}
 | 
| 
 | 
   265     
 | 
| 
 | 
   266     if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default":
 | 
| 
 | 
   267         logging.basicConfig(level=logging.INFO)
 | 
| 
 | 
   268         logger = logging.getLogger(__name__)
 | 
| 
 | 
   269 
 | 
| 
 | 
   270         model, translation_issues = modelUtils.translate_model_genes(
 | 
| 
 | 
   271             model=model,
 | 
| 
 | 
   272             mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv", dtype={'entrez_id': str}),
 | 
| 
 | 
   273             target_nomenclature=ARGS.gene_format,
 | 
| 
 | 
   274             source_nomenclature='HGNC_symbol',
 | 
| 
 | 
   275             logger=logger
 | 
| 
 | 
   276         )
 | 
| 
 | 
   277 
 | 
| 
502
 | 
   278     if ARGS.input and ARGS.gene_format != "Default":
 | 
| 
491
 | 
   279         logging.basicConfig(level=logging.INFO)
 | 
| 
 | 
   280         logger = logging.getLogger(__name__)
 | 
| 
 | 
   281 
 | 
| 
 | 
   282         # Take a small, clean sample of gene IDs (skipping placeholders like 0)
 | 
| 
 | 
   283         ids_sample = sample_valid_gene_ids(model.genes, limit=10)
 | 
| 
 | 
   284         if not ids_sample:
 | 
| 
 | 
   285             raise utils.DataErr(
 | 
| 
 | 
   286                 "Custom_model",
 | 
| 
 | 
   287                 "No valid gene IDs found (many may be placeholders like 0)."
 | 
| 
 | 
   288             )
 | 
| 
 | 
   289 
 | 
| 
 | 
   290         # Detect source nomenclature on the sample
 | 
| 
 | 
   291         types = []
 | 
| 
 | 
   292         for gid in ids_sample:
 | 
| 
 | 
   293             try:
 | 
| 
 | 
   294                 t = modelUtils.gene_type(gid, "Custom_model")
 | 
| 
 | 
   295             except Exception as e:
 | 
| 
 | 
   296                 # Keep it simple: skip problematic IDs
 | 
| 
 | 
   297                 logger.debug(f"gene_type failed for {gid}: {e}")
 | 
| 
 | 
   298                 t = None
 | 
| 
 | 
   299             if t:
 | 
| 
 | 
   300                 types.append(t)
 | 
| 
 | 
   301 
 | 
| 
 | 
   302         if not types:
 | 
| 
 | 
   303             raise utils.DataErr(
 | 
| 
 | 
   304                 "Custom_model",
 | 
| 
 | 
   305                 "Could not detect a known gene nomenclature from the sample."
 | 
| 
 | 
   306             )
 | 
| 
 | 
   307 
 | 
| 
 | 
   308         unique_types = set(types)
 | 
| 
 | 
   309         if len(unique_types) > 1:
 | 
| 
 | 
   310             raise utils.DataErr(
 | 
| 
 | 
   311                 "Custom_model",
 | 
| 
 | 
   312                 "Mixed or inconsistent gene nomenclatures detected. "
 | 
| 
 | 
   313                 "Please unify them before converting."
 | 
| 
 | 
   314             )
 | 
| 
 | 
   315 
 | 
| 
 | 
   316         source_nomenclature = types[0]
 | 
| 
 | 
   317 
 | 
| 
 | 
   318         # Convert only if needed
 | 
| 
 | 
   319         if source_nomenclature != ARGS.gene_format:
 | 
| 
 | 
   320             model, translation_issues = modelUtils.translate_model_genes(
 | 
| 
 | 
   321                 model=model,
 | 
| 
 | 
   322                 mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv", dtype={'entrez_id': str}),
 | 
| 
 | 
   323                 target_nomenclature=ARGS.gene_format,
 | 
| 
 | 
   324                 source_nomenclature=source_nomenclature,
 | 
| 
 | 
   325                 logger=logger
 | 
| 
 | 
   326             )
 | 
| 
 | 
   327 
 | 
| 
508
 | 
   328     # generate data using unified function
 | 
| 
491
 | 
   329     if not ARGS.out_tabular:
 | 
| 
 | 
   330         raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular)
 | 
| 
508
 | 
   331     
 | 
| 
 | 
   332     merged = modelUtils.export_model_to_tabular(
 | 
| 
 | 
   333         model=model,
 | 
| 
 | 
   334         output_path=ARGS.out_tabular,
 | 
| 
 | 
   335         translation_issues=translation_issues,
 | 
| 
 | 
   336         include_objective=True,
 | 
| 
 | 
   337         save_function=save_as_tabular_df
 | 
| 
 | 
   338     )
 | 
| 
491
 | 
   339     expected = ARGS.out_tabular
 | 
| 
 | 
   340 
 | 
| 
 | 
   341     # verify output exists and non-empty
 | 
| 
 | 
   342     if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0:
 | 
| 
 | 
   343         raise utils.DataErr(expected, "Output not created or empty")
 | 
| 
 | 
   344 
 | 
| 
 | 
   345     print("Metabolic_model_setting: completed successfully")
 | 
| 
 | 
   346 
 | 
| 
 | 
   347 if __name__ == '__main__':
 | 
| 
 | 
   348 
 | 
| 
 | 
   349     main()
 |