Mercurial > repos > bimib > cobraxy
changeset 456:a6e45049c1b9 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 12 Sep 2025 17:28:45 +0000 |
parents | 4e2bc80764b6 |
children | |
files | COBRAxy/README.md COBRAxy/custom_data_generator_beta.py COBRAxy/flux_simulation_beta.py COBRAxy/fromCSVtoCOBRA_beta.py COBRAxy/local/readme.txt COBRAxy/marea.py COBRAxy/ras_generator_beta.py COBRAxy/ras_to_bounds_beta.py COBRAxy/rps_generator_beta.py COBRAxy/utils/CBS_backend.py COBRAxy/utils/general_utils.py COBRAxy/utils/model_utils.py COBRAxy/utils/reaction_parsing.py COBRAxy/utils/rule_parsing.py |
diffstat | 14 files changed, 827 insertions(+), 544 deletions(-) [+] |
line wrap: on
line diff
--- a/COBRAxy/README.md Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/README.md Fri Sep 12 17:28:45 2025 +0000 @@ -1,11 +1,289 @@ -# Official repository for the COBRAxy toolset -> COBRAxy (COBRApy in Galaxy) is a user-friendly tool that allows a user to user to characterize and to graphically compare simulated fluxomics coming from groups of samples with different transcriptional regulation of metabolism. -It extends the MaREA 2 (Metabolic Reaction Enrichment Analysis) tool that enables users to compare groups in terms of RAS and RPS only. The tool is available as plug-in for the widely-used Galaxy platform for comparative genomics and bioinformatics analyses. +<p align="center"> + <img src="https://opencobra.github.io/cobrapy/_static/img/cobrapy_logo.png" alt="COBRApy logo" width="120"/> +</p> + +# COBRAxy — Metabolic analysis and visualization toolkit (Galaxy-ready) + +COBRAxy (COBRApy in Galaxy) is a toolkit to compute, analyze, and visualize metabolism at the reaction level from transcriptomics and metabolomics data. It enables users to: + +- derive Reaction Activity Scores (RAS) from gene expression and Reaction Propensity Scores (RPS) from metabolite abundances, +- integrate RAS into model bounds, +- perform flux sampling with either CBS (constraint-based sampling) or OPTGP, +- compute statistics (pFBA, FVA, sensitivity) and generate styled SVG/PDF metabolic maps, +- run all tools as Galaxy wrappers or via CLI on any machine. + +It extends the MaREA 2 (Metabolic Reaction Enrichment Analysis) concept by adding sampling-based flux comparison and rich visualization. The repository ships both Python CLIs and Galaxy tool XMLs. + +## Table of contents + +- Overview and features +- Requirements +- Installation (pip/conda) +- Quick start (CLI) +- Tools and usage + - custom_data_generator + - ras_generator (RAS) + - rps_generator (RPS) + - ras_to_bounds + - flux_simulation (CBS/OPTGP) + - marea (enrichment + maps) + - flux_to_map (maps from fluxes) + - marea_cluster (clustering auxiliaries) +- Typical workflow +- Input/output formats +- Galaxy usage +- Troubleshooting +- Contributing +- License and citations +- Useful links + +## Overview and features + +COBRAxy builds on COBRApy to deliver end‑to‑end analysis from expression/metabolite data to flux statistics and map rendering: + +- RAS and RPS computation from tabular inputs +- Bounds integration and model preparation +- Flux sampling: CBS (GLPK backend) with automatic fallback to a COBRApy interface, or OPTGP +- Flux statistics: mean/median/quantiles, pFBA, FVA, sensitivity +- Map styling/export: SVG with optional PDF/PNG export +- Ready-made Galaxy wrappers for all tools + +Bundled resources in `local/` include example models (ENGRO2, Recon), gene mappings, a default medium, and SVG maps. + +## Requirements + +- OS: Linux, macOS, or Windows (Linux recommended; Galaxy typically runs on Linux) +- Python: 3.8.20 ≤ version < 3.12 (as per `setup.py`) +- Python packages (installed automatically by `pip install .`): + - cobra==0.29.0, numpy==1.24.4, pandas==2.0.3, scipy==1.11, scikit-learn==1.3.2, seaborn==0.13.0 + - matplotlib==3.7.3, lxml==5.2.2, cairosvg==2.7.1, svglib==1.5.1, pyvips==2.2.3, Pillow + - joblib==1.4.2, anndata==0.8.0, pydeseq2==0.5.1 +- Optional but recommended for CBS sampling performance: + - GLPK solver and Python bindings + - System library: glpk (e.g., Ubuntu: `apt-get install glpk-utils libglpk40`) + - Python: `swiglpk` (note: CBS falls back to a COBRApy interface if GLPK is unavailable) +- For pyvips: system libvips (e.g., Ubuntu: `apt-get install libvips`) + +Notes: +- If you hit system-level library errors for SVG/PDF/PNG conversion or vips, install the corresponding OS packages. +- GPU is not required. + +## Installation + +Python virtual environment is strongly recommended. + +### Install from source (pip) + +1) Clone the repo and install: + +```bash +git clone https://github.com/CompBtBs/COBRAxy.git +cd COBRAxy +python3 -m venv .venv && source .venv/bin/activate +pip install --upgrade pip +pip install . +``` + +This installs console entry points: `custom_data_generator`, `ras_generator`, `rps_generator`, `ras_to_bounds`, `flux_simulation`, `flux_to_map`, `marea`, `marea_cluster`. + +### Install with conda (alternative) + +```bash +conda create -n cobraxy python=3.10 -y +conda activate cobraxy +pip install . +# Optional system deps (Ubuntu): sudo apt-get install libvips libxml2 libxslt1.1 glpk-utils +# Optional Python bindings for GLPK: pip install swiglpk +``` + +## Quick start (CLI) + +All tools provide `-h/--help` for details. Outputs are TSV/CSV and SVG/PDF files depending on the tool and flags. + +Example minimal flow (using built-in ENGRO2 model and provided assets): + +```bash +# 1) Generate rules/reactions/bounds/medium from a model (optional if using bundled ones) +custom_data_generator \ + -id local/models/ENGRO2.xml \ + -mn ENGRO2.xml \ + -orules out/ENGRO2_rules.tsv \ + -orxns out/ENGRO2_reactions.tsv \ + -omedium out/ENGRO2_medium.tsv \ + -obnds out/ENGRO2_bounds.tsv + +# 2) Compute RAS from expression data +ras_generator \ + -td $(pwd) \ + -in my_expression.tsv \ + -ra out/ras.tsv \ + -rs ENGRO2 + +# 3) Integrate RAS into bounds +ras_to_bounds \ + -td $(pwd) \ + -ms ENGRO2 \ + -ir out/ras.tsv \ + -rs true \ + -idop out/ras_bounds + +# 4) Flux sampling (CBS) +flux_simulation \ + -td $(pwd) \ + -ms ENGRO2 \ + -in out/ras_bounds/sample1.tsv,out/ras_bounds/sample2.tsv \ + -ni sample1,sample2 \ + -a CBS -ns 500 -sd 0 -nb 1 \ + -ot mean,median,quantiles \ + -ota pFBA,FVA,sensitivity \ + -idop out/flux -## Useful links: +# 5) Enrichment + map styling (RAS/RPS or fluxes) +marea \ + -td $(pwd) \ + -using_RAS true -input_data out/ras.tsv \ + -comparison manyvsmany -test ks \ + -generate_svg true -generate_pdf true \ + -choice_map ENGRO2 -idop out/maps +``` + +## Tools and usage + +Below is a high‑level summary of each CLI. Use `--help` for the full list of options. + +### 1) custom_data_generator + +Generate model‑derived assets. + +Required inputs: +- `-id/--input`: model file (XML or JSON; gz/zip/bz2 also supported via extension) +- `-mn/--name`: the original file name including extension (Galaxy renames files; this preserves the true format) +- `-orules`, `-orxns`, `-omedium`, `-obnds`: output paths + +Outputs: +- TSV with rules, reactions, exchange medium, and bounds. + +### 2) ras_generator (Reaction Activity Scores) + +Compute RAS from a gene expression table. + +Key inputs: +- `-td/--tool_dir`: repository root path (used to locate `local/` assets) +- `-in/--input`: expression TSV (rows: genes; columns: samples) +- `-rs/--rules_selector`: model/rules choice, e.g. `ENGRO2` or `Custom` with `-rl` and `-rn` +- Optional: `-rl/--rule_list` custom rules TSV, `-rn/--rules_name` its original name/extension +- Output: `-ra/--ras_output` TSV + +### 3) rps_generator (Reaction Propensity Scores) + +Compute RPS from a metabolite abundance table. + +Key inputs: +- `-td/--tool_dir`: repository root +- `-id/--input`: metabolite TSV (rows: metabolites; columns: samples) +- `-rc/--reaction_choice`: `default` or `custom` with `-cm/--custom` reactions TSV +- Output: `-rp/--rps_output` TSV + +### 4) ras_to_bounds + +Integrate RAS into reaction bounds for a given model and medium. + +Key inputs: +- `-td/--tool_dir`: repository root +- `-ms/--model_selector`: one of `ENGRO2` or `Custom` with `-mo/--model` and `-mn/--model_name` +- Medium: `-mes/--medium_selector` (default `allOpen`) or `-meo/--medium` custom TSV +- RAS: `-ir/--input_ras` and `-rs/--ras_selector` (true/false) +- Output folder: `-idop/--output_path` + +Outputs: +- One bounds TSV per sample in the RAS table. + +### 5) flux_simulation + +Flux sampling with CBS or OPTGP and downstream statistics. + +Key inputs: +- `-td/--tool_dir` +- Model: `-ms/--model_selector` (ENGRO2 or Custom with `-mo`/`-mn`) +- Bounds files: `-in` (comma‑separated list) and `-ni/--names` (comma‑separated sample names) +- Algorithm: `-a CBS|OPTGP`; CBS uses GLPK if available and falls back to a COBRApy interface +- Sampling params: `-ns/--n_samples`, `-th/--thinning` (OPTGP), `-nb/--n_batches`, `-sd/--seed` +- Outputs: `-ot/--output_type` (mean,median,quantiles) and `-ota/--output_type_analysis` (pFBA,FVA,sensitivity) +- Output path: `-idop/--output_path` + +Outputs: +- Per‑sample or aggregated CSV/TSV with flux samples and statistics. + +### 6) marea + +Statistical enrichment and map styling for RAS and/or RPS groups with optional DESeq2‑style testing via `pydeseq2`. + +Key inputs: +- `-td/--tool_dir` +- Comparison: `-co manyvsmany|onevsrest|onevsmany` +- Test: `-te ks|ttest_p|ttest_ind|wilcoxon|mw|DESeq` +- Thresholds: `-pv`, `-adj` (FDR), `-fc` +- Data: RAS `-using_RAS` plus `-input_data` or multiple datasets with names; similarly for RPS with `-using_RPS` +- Map: `-choice_map HMRcore|ENGRO2|Custom` or `-custom_map` SVG +- Output: `-gs/--generate_svg`, `-gp/--generate_pdf`, output dir `-idop` + +Outputs: +- Styled SVG (and optional PDF/PNG) highlighting enriched reactions by color/width per your thresholds. + +### 7) flux_to_map + +Like `marea`, but driven by fluxes instead of RAS/RPS. Accepts single or multiple flux datasets and produces styled maps. + +### 8) marea_cluster + +Convenience clustering utilities (k‑means, DBSCAN, hierarchical) for grouping samples; produces labels and optional plots. + +## Typical workflow + +1. Prepare a model and generate its assets (optional if using bundled assets): `custom_data_generator` +2. Compute RAS from expression: `ras_generator` (and/or compute RPS via `rps_generator`) +3. Integrate RAS into bounds: `ras_to_bounds` +4. Sample fluxes: `flux_simulation` with CBS or OPTGP +5. Analyze and visualize: `marea` or `flux_to_map` to render SVG/PDF metabolic maps +6. Optionally cluster or further analyze results: `marea_cluster` + +## Input/output formats + +Unless otherwise stated, inputs are tab‑separated (TSV) text files with headers. + +- Expression (RAS): rows = genes (HGNC/Ensembl/symbol/Entrez supported), columns = samples +- Metabolite table (RPS): rows = metabolites, columns = samples +- Rules/Reactions: TSV with two columns: ReactionID, Rule/Reaction +- Bounds: TSV with index = reaction IDs, columns = lower_bound, upper_bound +- Medium: single‑column TSV listing exchange reactions +- Flux samples/statistics: CSV/TSV with reactions as rows and samples/statistics as columns + +## Galaxy usage + +Each CLI has a corresponding Galaxy tool XML in the repository (e.g., `marea.xml`, `flux_simulation.xml`). Use `shed.yml` to publish to a Galaxy toolshed. The `local/` directory provides models, mappings, and maps for out‑of‑the‑box runs inside Galaxy. + +## Troubleshooting + +- GLPK/CBS issues: if `swiglpk` or GLPK is missing, `flux_simulation` will attempt a COBRApy fallback. Install GLPK + `swiglpk` for best performance. +- pyvips errors: install `libvips` on your system. Reinstall the `pyvips` wheel afterward if needed. +- PDF/SVG conversions: ensure `cairosvg`, `svglib`, and system libraries (`libxml2`, `libxslt`) are installed. +- Python version: stick to Python ≥3.8.20 and <3.12. +- Memory/time: reduce `-ns` (samples) or `-nb` (batches); consider OPTGP if CBS is slow for your model. + +## Contributing + +Pull requests are welcome. Please: +- keep changes focused and documented, +- add concise docstrings/comments in English, +- preserve public CLI parameters and file formats. + +## License and citations + +This project is distributed under the MIT License. If you use COBRAxy in academic work, please cite COBRApy and MaREA, and reference this repository. + +## Useful links + - COBRAxy Google Summer of Code 2024: https://summerofcode.withgoogle.com/programs/2024/projects/LSrCKfq7 - COBRApy: https://opencobra.github.io/cobrapy/ - MaREA4Galaxy: https://galaxyproject.org/use/marea4galaxy/ - Galaxy project: https://usegalaxy.org/ - -## Documentation: \ No newline at end of file
--- a/COBRAxy/custom_data_generator_beta.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/custom_data_generator_beta.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,13 +1,19 @@ +""" +Custom data generator for COBRA models. + +This script loads a COBRA model (built-in or custom), optionally applies +medium and gene nomenclature settings, derives reaction-related metadata +(GPR rules, formulas, bounds, objective coefficients, medium membership, +and compartments for ENGRO2), and writes a tabular summary. +""" + import os import csv import cobra -import pickle import argparse import pandas as pd import utils.general_utils as utils -import utils.rule_parsing as rulesUtils -from typing import Optional, Tuple, Union, List, Dict -import utils.reaction_parsing as reactionUtils +from typing import Optional, Tuple, List import utils.model_utils as modelUtils import logging @@ -50,7 +56,7 @@ ################################- INPUT DATA LOADING -################################ def load_custom_model(file_path :utils.FilePath, ext :Optional[utils.FileFormat] = None) -> cobra.Model: """ - Loads a custom model from a file, either in JSON or XML format. + Loads a custom model from a file, either in JSON, XML, MAT, or YML format. Args: file_path : The path to the file containing the custom model. @@ -70,9 +76,17 @@ if ext is utils.FileFormat.JSON: return cobra.io.load_json_model(file_path.show()) + if ext is utils.FileFormat.MAT: + return cobra.io.load_matlab_model(file_path.show()) + + if ext is utils.FileFormat.YML: + return cobra.io.load_yaml_model(file_path.show()) + except Exception as e: raise utils.DataErr(file_path, e.__str__()) - raise utils.DataErr(file_path, - f"Formato \"{file_path.ext}\" non riconosciuto, sono supportati solo file JSON e XML") + raise utils.DataErr( + file_path, + f"Unrecognized format '{file_path.ext}'. Only JSON, XML, MAT, YML are supported." + ) ###############################- FILE SAVING -################################ @@ -115,6 +129,19 @@ writer.writerow({ fieldNames[0] : key, fieldNames[1] : value }) def save_as_tabular_df(df: pd.DataFrame, path: str) -> None: + """ + Save a pandas DataFrame as a tab-separated file, creating directories as needed. + + Args: + df: The DataFrame to write. + path: Destination file path (will be written as TSV). + + Raises: + DataErr: If writing the output fails for any reason. + + Returns: + None + """ try: os.makedirs(os.path.dirname(path) or ".", exist_ok=True) df.to_csv(path, sep="\t", index=False) @@ -125,22 +152,22 @@ ###############################- ENTRY POINT -################################ def main(args:List[str] = None) -> None: """ - Initializes everything and sets the program in motion based on the fronted input arguments. + Initialize and generate custom data based on the frontend input arguments. Returns: None """ - # get args from frontend (related xml) + # Parse args from frontend (Galaxy XML) global ARGS ARGS = process_args(args) if ARGS.input: - # load custom model + # Load a custom model from file model = load_custom_model( utils.FilePath.fromStrPath(ARGS.input), utils.FilePath.fromStrPath(ARGS.name).ext) else: - # load built-in model + # Load a built-in model try: model_enum = utils.Model[ARGS.model] # e.g., Model['ENGRO2'] @@ -164,28 +191,15 @@ medium = df_mediums[[ARGS.medium_selector]] medium = medium[ARGS.medium_selector].to_dict() - # Set all reactions to zero in the medium + # Reset all medium reactions lower bound to zero for rxn_id, _ in model.medium.items(): model.reactions.get_by_id(rxn_id).lower_bound = float(0.0) - # Set medium conditions + # Apply selected medium uptake bounds (negative for uptake) for reaction, value in medium.items(): if value is not None: model.reactions.get_by_id(reaction).lower_bound = -float(value) - #if ARGS.name == "ENGRO2" and ARGS.gene_format != "Default": - # logging.basicConfig(level=logging.INFO) - # logger = logging.getLogger(__name__) - - #model = modelUtils.translate_model_genes( - # model=model, - # mapping_df= pd.read_csv(ARGS.tool_dir + "/local/mappings/genes_human.csv"), dtype={'entrez_id': str}, - # target_nomenclature=ARGS.gene_format.replace("HGNC_", "HGNC "), - # source_nomenclature='HGNC_ID', - # logger=logger - #) - #model = modelUtils.convert_genes(model, ARGS.gene_format.replace("HGNC_", "HGNC ")) - if (ARGS.name == "Recon" or ARGS.name == "ENGRO2") and ARGS.gene_format != "Default": logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -213,7 +227,7 @@ df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) df_medium = medium.rename(columns = {"reaction": "ReactionID"}) - df_medium["InMedium"] = True # flag per indicare la presenza nel medium + df_medium["InMedium"] = True merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") @@ -226,12 +240,6 @@ merged = merged.sort_values(by = "InMedium", ascending = False) - #out_file = os.path.join(ARGS.output_path, f"{os.path.basename(ARGS.name).split('.')[0]}_custom_data") - - #merged.to_csv(out_file, sep = '\t', index = False) - - #### - if not ARGS.out_tabular: raise utils.ArgsErr("out_tabular", "output path (--out_tabular) is required when output_format == tabular", ARGS.out_tabular) save_as_tabular_df(merged, ARGS.out_tabular) @@ -239,7 +247,7 @@ # verify output exists and non-empty if not expected or not os.path.exists(expected) or os.path.getsize(expected) == 0: - raise utils.DataErr(expected, "Output non creato o vuoto") + raise utils.DataErr(expected, "Output not created or empty") print("CustomDataGenerator: completed successfully")
--- a/COBRAxy/flux_simulation_beta.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/flux_simulation_beta.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,8 +1,19 @@ +""" +Flux sampling and analysis utilities for COBRA models. + +This script supports two modes: +- Mode 1 (model_and_bounds=True): load a base model and apply bounds from + separate files before sampling. +- Mode 2 (model_and_bounds=False): load complete models and sample directly. + +Sampling algorithms supported: OPTGP and CBS. Outputs include flux samples +and optional analyses (pFBA, FVA, sensitivity), saved as tabular files. +""" + import argparse import utils.general_utils as utils -from typing import Optional, List +from typing import List import os -import numpy as np import pandas as pd import cobra import utils.CBS_backend as CBS_backend @@ -121,6 +132,17 @@ def write_to_file(dataset: pd.DataFrame, name: str, keep_index:bool=False)->None: + """ + Write a DataFrame to a TSV file under ARGS.output_path with a given base name. + + Args: + dataset: The DataFrame to write. + name: Base file name (without extension). + keep_index: Whether to keep the DataFrame index in the file. + + Returns: + None + """ dataset.index.name = 'Reactions' dataset.to_csv(ARGS.output_path + "/" + name + ".csv", sep = '\t', index = keep_index) @@ -180,7 +202,6 @@ for i in range(0, n_batches): os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_OPTGP.csv') - pass def CBS_sampler(model:cobra.Model, model_name:str, n_samples:int=1000, n_batches:int=1, seed:int=0)-> None: @@ -224,7 +245,6 @@ for i in range(0, n_batches): os.remove(ARGS.output_path + "/" + model_name + '_'+ str(i)+'_CBS.csv') - pass @@ -393,7 +413,7 @@ ############################# main ########################################### def main(args :List[str] = None) -> None: """ - Initializes everything and sets the program in motion based on the fronted input arguments. + Initialize and run sampling/analysis based on the frontend input arguments. Returns: None @@ -407,11 +427,6 @@ if not os.path.exists(ARGS.output_path): os.makedirs(ARGS.output_path) - #ARGS.bounds = ARGS.input.split(",") - #ARGS.bounds_name = ARGS.name.split(",") - #ARGS.output_types = ARGS.output_type.split(",") - #ARGS.output_type_analysis = ARGS.output_type_analysis.split(",") - # --- Normalize inputs (the tool may pass comma-separated --input and either --name or --names) --- ARGS.input_files = ARGS.input.split(",") if ARGS.input else [] ARGS.file_names = ARGS.name.split(",") @@ -439,14 +454,14 @@ validation = model_utils.validate_model(base_model) - print("\n=== VALIDAZIONE MODELLO ===") + print("\n=== MODEL VALIDATION ===") for key, value in validation.items(): print(f"{key}: {value}") - #Set solver verbosity to 1 to see warning and error messages only. + # Set solver verbosity to 1 to see warning and error messages only. base_model.solver.configuration.verbosity = 1 - # Process each bounds file with the base model + # Process each bounds file with the base model results = Parallel(n_jobs=num_processors)( delayed(model_sampler_with_bounds)(base_model, bounds_file, cell_name) for bounds_file, cell_name in zip(ARGS.input_files, ARGS.file_names) @@ -498,7 +513,7 @@ all_sensitivity = all_sensitivity.sort_index() write_to_file(all_sensitivity.T, "sensitivity", True) - pass + return ############################################################################## if __name__ == "__main__":
--- a/COBRAxy/fromCSVtoCOBRA_beta.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/fromCSVtoCOBRA_beta.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,18 +1,25 @@ +""" +Convert a tabular (CSV/TSV/Tabular) description of a COBRA model into a COBRA file. + +Supported output formats: SBML, JSON, MATLAB (.mat), YAML. +The script logs to a user-provided file for easier debugging in Galaxy. +""" + import os -import csv import cobra -import pickle import argparse -import pandas as pd -import utils.general_utils as utils -from typing import Optional, Tuple, Union, List, Dict +from typing import List import logging -import utils.rule_parsing as rulesUtils -import utils.reaction_parsing as reactionUtils import utils.model_utils as modelUtils ARGS : argparse.Namespace def process_args(args: List[str] = None) -> argparse.Namespace: + """ + Parse command-line arguments for the CSV-to-COBRA conversion tool. + + Returns: + argparse.Namespace: Parsed arguments. + """ parser = argparse.ArgumentParser( usage="%(prog)s [options]", description="Convert a tabular/CSV file to a COBRA model" @@ -45,6 +52,13 @@ ###############################- ENTRY POINT -################################ def main(args: List[str] = None) -> None: + """ + Entry point: parse arguments, build the COBRA model from a CSV/TSV file, + and save it in the requested format. + + Returns: + None + """ global ARGS ARGS = process_args(args)
--- a/COBRAxy/local/readme.txt Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/local/readme.txt Fri Sep 12 17:28:45 2025 +0000 @@ -4,6 +4,4 @@ {keys = possibili codifiche dei geni : value = { keys = nome dei geni nella codifica corrispondente : 'ok' } } Regole: -{keys = possibili codifiche dei geni : value = { keys = nome della reazione/metabolita : [ lista di stringhe contenente la regola nella codifica corrispondente alla keys ] } } - -README DA RIVEDERE \ No newline at end of file +{keys = possibili codifiche dei geni : value = { keys = nome della reazione/metabolita : [ lista di stringhe contenente la regola nella codifica corrispondente alla keys ] } } \ No newline at end of file
--- a/COBRAxy/marea.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/marea.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,3 +1,10 @@ +""" +MAREA: Enrichment and map styling for RAS/RPS data. + +This module compares groups of samples using RAS (Reaction Activity Scores) and/or +RPS (Reaction Propensity Scores), computes statistics (p-values, z-scores, fold change), +and applies visual styling to an SVG metabolic map (with optional PDF/PNG export). +""" from __future__ import division import csv from enum import Enum @@ -26,13 +33,13 @@ ARGS :argparse.Namespace def process_args(args:List[str] = None) -> argparse.Namespace: """ - Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code. + Parse command-line arguments exposed by the Galaxy frontend for this module. Args: - args : Always obtained (in file) from sys.argv + args: Optional list of arguments, defaults to sys.argv when None. Returns: - Namespace : An object containing the parsed arguments + Namespace: Parsed arguments. """ parser = argparse.ArgumentParser( usage = "%(prog)s [options]", @@ -265,8 +272,6 @@ tmp.append('stroke-dasharray:' + dash) return ';'.join(tmp) -# TODO: remove, there's applyRPS whatever -# The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: """ Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. @@ -343,18 +348,15 @@ f"//*[@id=\"{reactionId}\"]").map( lambda xPath : metabMap.xpath(xPath)[0]).mapErr( lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) - # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. def styleMapElement(element :ET.Element, styleStr :str) -> None: + """Append/override stroke-related styles on a given SVG element.""" currentStyles :str = element.get("style", "") if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): - currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? - - #TODO: this is attempting to solve the styling override problem, not sure it does tho + currentStyles = ';'.join(currentStyles.split(';')[:-3]) element.set("style", currentStyles + styleStr) -# TODO: maybe remove vvv class ReactionDirection(Enum): Unknown = "" Direct = "_F" @@ -372,6 +374,7 @@ return ReactionDirection.fromDir(reactionId[-2:]) def getArrowBodyElementId(reactionId :str) -> str: + """Return the SVG element id for a reaction arrow body, normalizing direction tags.""" if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2] return f"R_{reactionId}" @@ -401,13 +404,9 @@ UpRegulated = "#ecac68" # orange, up-regulated reaction DownRegulated = "#6495ed" # lightblue, down-regulated reaction - UpRegulatedInv = "#FF0000" - # ^^^ bright red, up-regulated net value for a reversible reaction with - # conflicting enrichment in the two directions. + UpRegulatedInv = "#FF0000" # bright red for reversible with conflicting directions - DownRegulatedInv = "#0000FF" - # ^^^ bright blue, down-regulated net value for a reversible reaction with - # conflicting enrichment in the two directions. + DownRegulatedInv = "#0000FF" # bright blue for reversible with conflicting directions @classmethod def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": @@ -444,7 +443,7 @@ ERRORS.append(reactionId) def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None: - # If We're dealing with RAS data or in general don't care about the direction of the reaction we only style the arrow body + # If direction is irrelevant (e.g., RAS), style only the arrow body if not mindReactionDir: return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr()) @@ -453,24 +452,6 @@ self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) - # TODO: this seems to be unused, remove - def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: - """ - Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. - - Args: - reactionId: the reaction ID, as encoded in the dataset. - mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result. - - Returns: - str : the ID of an arrow's body or tips in the map. - """ - # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map. - if not mindReactionDir: return "R_" + reactionId - - #TODO: this is clearly something we need to make consistent in RPS - return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[:-2] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr" - def toStyleStr(self, *, downSizedForTips = False) -> str: """ Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element. @@ -482,13 +463,11 @@ if downSizedForTips: width *= 0.8 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}" -# vvv These constants could be inside the class itself a static properties, but python -# was built by brainless organisms so here we are! +# Default arrows used for different significance states INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) TRANSPARENT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Transparent) # Who cares how big it is if it's transparent -# TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: """ Applies RPS enrichment results to the provided metabolic map. @@ -581,7 +560,7 @@ if l: class_pat[classe] = { - "values": list(map(list, zip(*l))), # trasposta + "values": list(map(list, zip(*l))), # transpose "samples": sample_ids } continue @@ -592,7 +571,7 @@ return class_pat ############################ conversion ############################################## -#conversion from svg to png +# Conversion from SVG to PNG def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None: """ Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion. @@ -660,14 +639,8 @@ Returns: utils.FilePath : _description_ """ - # This function returns a util data structure but is extremely specific to this module. - # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait - # TODO: until a third tool with multiple outputs appears before porting this to utils. return utils.FilePath( f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""), - # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in - # all output files: I don't care, this was never the performance bottleneck of the tool and - # there is no other net gain in saving and re-using the built string. ext, prefix = ARGS.output_path) @@ -683,10 +656,8 @@ if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) writer.writerow({ field : data for field, data in zip(fieldNames, row) }) -OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible +OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: - # this function compiles the things always in common between comparison modes after enrichment. - # TODO: organize, name better. suffix = "RAS" if ras_enrichment else "RPS" writeToCsv( [ [reactId] + values for reactId, values in tmp.items() ], @@ -809,7 +780,6 @@ # TODO: the net RPS computation should be done in the RPS module def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float, Dict[str, Tuple[np.ndarray, np.ndarray]]]: - #TODO: the following code still suffers from "dumbvarnames-osis" netRPS :Dict[str, Tuple[np.ndarray, np.ndarray]] = {} comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} count = 0 @@ -818,7 +788,7 @@ for l1, l2 in zip(dataset1Data, dataset2Data): reactId = ids[count] count += 1 - if not reactId: continue # we skip ids that have already been processed + if not reactId: continue try: #TODO: identify the source of these errors and minimize code in the try block reactDir = ReactionDirection.fromReactionId(reactId) @@ -947,13 +917,11 @@ except Exception as e: raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') - if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not + if not ARGS.generate_svg: os.remove(svgFilePath.show()) ClassPat = Dict[str, List[List[float]]] def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat, Dict[str, List[str]]]: - # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, - # for the sake of everyone's sanity. columnNames :Dict[str, List[str]] = {} # { datasetName : [ columnName, ... ], ... } class_pat :ClassPat = {} if ARGS.option == 'datasets': @@ -983,9 +951,7 @@ columnNames[clas] = ["Reactions", *values_and_samples_id["samples"]] return ids, class_pat, columnNames - #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) -#TODO: create these damn args as FilePath objects def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]: """ Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs. @@ -1025,18 +991,16 @@ ARGS.tool_dir, utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) - # TODO: in the future keep the indices WITH the data and fix the code below. - # Prepare enrichment results containers ras_results = [] rps_results = [] # Compute RAS enrichment if requested - if ARGS.using_RAS: # vvv columnNames only matter with RPS data + if ARGS.using_RAS: ids_ras, class_pat_ras, _ = getClassesAndIdsFromDatasets( ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names) ras_results, _ = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True) - # ^^^ netRPS only matter with RPS data + # Compute RPS enrichment if requested if ARGS.using_RPS: @@ -1065,7 +1029,7 @@ # Apply RPS styling to arrow heads if res.get('rps'): tmp_rps, max_z_rps = res['rps'] - # applyRpsEnrichmentToMap styles only heads unless only RPS are used + temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) # Output both SVG and PDF/PNG as configured @@ -1076,7 +1040,6 @@ for datasetName, rows in netRPS.items(): writeToCsv( [[reactId, *netValues] for reactId, netValues in rows.items()], - # vvv In weird comparison modes the dataset names are not recorded properly.. columnNames.get(datasetName, ["Reactions"]), utils.FilePath( "Net_RPS_" + datasetName,
--- a/COBRAxy/ras_generator_beta.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/ras_generator_beta.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,5 +1,10 @@ +""" +Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules. + +The script reads a tabular dataset (genes x samples) and a rules file (GPRs), +computes RAS per reaction for each sample/cell line, and writes a tabular output. +""" from __future__ import division -# galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason. import sys import argparse import collections @@ -8,7 +13,6 @@ import utils.general_utils as utils import utils.rule_parsing as ruleUtils from typing import Union, Optional, List, Dict, Tuple, TypeVar -import os ERRORS = [] ########################## argparse ########################################## @@ -31,7 +35,7 @@ help = "path to input file containing the rules") parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name") - # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in + # Galaxy converts files into .dat, this helps infer the original extension when needed. parser.add_argument( '-n', '--none', @@ -49,7 +53,7 @@ help = "Output log") parser.add_argument( - '-in', '--input', #id è diventato in + '-in', '--input', type = str, help = 'input dataset') @@ -253,14 +257,14 @@ ############################ resolve ########################################## def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]: """ - Replace gene identifiers with corresponding values from a dictionary. + Replace gene identifiers in a parsed rule expression with values from a dict. Args: - l (str): String of gene identifier. - d (str): String corresponding to its value. + l: Parsed rule as a nested list structure (strings, lists, and operators). + d: Dict mapping gene IDs to numeric values. Returns: - tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement. + tuple: (new_expression, not_found_genes) """ tmp = [] err = [] @@ -277,16 +281,16 @@ l = l[1:] return (tmp, err) -def replace_gene(l :str, d :str) -> Union[int, float]: +def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, None]: """ Replace a single gene identifier with its corresponding value from a dictionary. Args: l (str): Gene identifier to replace. - d (str): String corresponding to its value. + d (dict): Dict mapping gene IDs to numeric values. Returns: - float/int: Corresponding value from the dictionary if found, None otherwise. + float/int/None: Corresponding value from the dictionary if found, None otherwise. Raises: sys.exit: If the value associated with the gene identifier is not valid. @@ -508,9 +512,9 @@ Args: dataset (pd.DataFrame): Dataset containing gene values. rules (dict): The dict containing reaction ids as keys and rules as values. - - Side effects: - dataset : mut + + Note: + Modifies dataset in place by setting the first column as index. Returns: dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary @@ -590,11 +594,11 @@ def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None: """ - Save computed ras scores to the given path, as a tsv file. + Save computed RAS scores to ARGS.ras_output as a TSV file. Args: rasScores : the computed ras scores. - path : the output tsv file's path. + reactions : the list of reaction IDs, used as the first column. Returns: None @@ -627,7 +631,7 @@ """ supportedGenesInEncoding = geneTranslator[encoding] if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName] - raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!") + raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.") def load_custom_rules() -> Dict[str, ruleUtils.OpList]: """ @@ -637,14 +641,7 @@ Returns: Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. """ - datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in galaxy as a .dat - - #try: filenamePath = utils.FilePath.fromStrPath(ARGS.model_upload_name) # file's name in input, to determine its original ext - #except utils.PathErr as err: - # utils.logWarning(f"Cannot determine file extension from filename '{ARGS.model_upload_name}'. Assuming tabular format.", ARGS.out_log) - # filenamePath = None - - #if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath) + datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in Galaxy as a .dat dict_rule = {} @@ -658,7 +655,7 @@ id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") - # Proviamo prima con delimitatore tab + # First, try using a tab delimiter for line in rows[1:]: if len(line) <= idx_gpr: utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) @@ -670,7 +667,7 @@ dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr]) except Exception as e: - # Se fallisce con tab, proviamo con virgola + # If parsing with tabs fails, try comma delimiter try: rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False) @@ -682,7 +679,7 @@ id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") - # Proviamo prima con delimitatore tab + # Try again parsing row content with the GPR column using comma-separated values for line in rows[1:]: if len(line) <= idx_gpr: utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) @@ -729,39 +726,7 @@ ARGS.out_log) - ############ - - # handle custom models - #model :utils.Model = ARGS.rules_selector - - #if model is utils.Model.Custom: - # rules = load_custom_rules() - # reactions = list(rules.keys()) - - # save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) - # if ERRORS: utils.logWarning( - # f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", - # ARGS.out_log) - - # return - - # This is the standard flow of the ras_generator program, for non-custom models. - #name = "RAS Dataset" - #type_gene = gene_type(dataset.iloc[0, 0], name) - - #rules = model.getRules(ARGS.tool_dir) - #genes = data_gene(dataset, type_gene, name, None) - #ids, rules = load_id_rules(rules.get(type_gene)) - - #resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name) - #create_ras(resolve_rules, name, rules, ids, ARGS.ras_output) - - #if err: utils.logWarning( - # f"Warning: gene(s) {err} not found in class \"{name}\", " + - # "the expression level for this gene will be considered NaN", - # ARGS.out_log) - - print("Execution succeded") + print("Execution succeeded") ############################################################################### if __name__ == "__main__":
--- a/COBRAxy/ras_to_bounds_beta.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/ras_to_bounds_beta.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,3 +1,13 @@ +""" +Apply RAS-based scaling to reaction bounds and optionally save updated models. + +Workflow: +- Read one or more RAS matrices (patients/samples x reactions) +- Normalize and merge them, optionally adding class suffixes to sample IDs +- Build a COBRA model from a tabular CSV +- Run FVA to initialize bounds, then scale per-sample based on RAS values +- Save bounds per sample and optionally export updated models in chosen formats +""" import argparse import utils.general_utils as utils from typing import Optional, Dict, Set, List, Tuple, Union @@ -5,13 +15,9 @@ import numpy as np import pandas as pd import cobra -from cobra import Model, Reaction, Metabolite -import re +from cobra import Model import sys -import csv from joblib import Parallel, delayed, cpu_count -import utils.rule_parsing as rulesUtils -import utils.reaction_parsing as reactionUtils import utils.model_utils as modelUtils ################################# process args ############################### @@ -181,7 +187,7 @@ df_reactions = pd.DataFrame(list(reactions.items()), columns = ["ReactionID", "Reaction"]) df_bounds = bounds.reset_index().rename(columns = {"index": "ReactionID"}) df_medium = medium.rename(columns = {"reaction": "ReactionID"}) - df_medium["InMedium"] = True # flag per indicare la presenza nel medium + df_medium["InMedium"] = True merged = df_reactions.merge(df_rules, on = "ReactionID", how = "outer") merged = merged.merge(df_bounds, on = "ReactionID", how = "outer") @@ -264,7 +270,7 @@ modified_model = apply_bounds_to_model(model, new_bounds) save_model(modified_model, cellName, save_models_path, save_models_format) - pass + return def generate_bounds_model(model: cobra.Model, ras=None, output_folder='output/', save_models=False, save_models_path='saved_models/', save_models_format='csv') -> pd.DataFrame: """ @@ -298,12 +304,12 @@ ) for cellName, ras_row in ras.iterrows()) else: raise ValueError("RAS DataFrame is None. Cannot generate bounds without RAS data.") - pass + return ############################# main ########################################### def main(args:List[str] = None) -> None: """ - Initializes everything and sets the program in motion based on the fronted input arguments. + Initialize and execute RAS-to-bounds pipeline based on the frontend input arguments. Returns: None @@ -321,7 +327,7 @@ error_message = "Duplicated file names in the uploaded RAS matrices." warning(error_message) raise ValueError(error_message) - pass + ras_class_names = [] for file in ras_file_names: ras_class_names.append(file.rsplit(".", 1)[0]) @@ -334,7 +340,7 @@ ras = ras.T ras = ras.astype(float) if(len(ras_file_list)>1): - #append class name to patient id (dataframe index) + # Append class name to patient id (DataFrame index) ras.index = [f"{idx}_{ras_class_name}" for idx in ras.index] else: ras.index = [f"{idx}" for idx in ras.index] @@ -343,9 +349,9 @@ class_assignments.loc[class_assignments.shape[0]] = [patient_id, ras_class_name] - # Concatenate all ras DataFrames into a single DataFrame + # Concatenate all RAS DataFrames into a single DataFrame ras_combined = pd.concat(ras_list, axis=0) - # Normalize the RAS values by max RAS + # Normalize RAS values column-wise by max RAS ras_combined = ras_combined.div(ras_combined.max(axis=0)) ras_combined.dropna(axis=1, how='all', inplace=True) @@ -353,7 +359,7 @@ validation = modelUtils.validate_model(model) - print("\n=== VALIDAZIONE MODELLO ===") + print("\n=== MODEL VALIDATION ===") for key, value in validation.items(): print(f"{key}: {value}") @@ -364,7 +370,7 @@ class_assignments.to_csv(ARGS.cell_class, sep='\t', index=False) - pass + return ############################################################################## if __name__ == "__main__":
--- a/COBRAxy/rps_generator_beta.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/rps_generator_beta.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,3 +1,10 @@ +""" +Compute Reaction Propensity Scores (RPS) from metabolite abundances and reaction stoichiometry. + +Given a tabular dataset (metabolites x samples) and a reaction set, this script +maps metabolite names via synonyms, fills missing metabolites, and computes RPS +per reaction for each sample using a log-normalized formula. +""" import math import argparse @@ -22,14 +29,14 @@ Returns: Namespace: An object containing parsed arguments. """ - parser = argparse.ArgumentParser(usage = '%(prog)s [options]', - description = 'process some value\'s'+ - ' abundances and reactions to create RPS scores.') + parser = argparse.ArgumentParser( + usage='%(prog)s [options]', + description='Process abundances and reactions to create RPS scores.' + ) parser.add_argument("-rl", "--model_upload", type = str, help = "path to input file containing the reactions") - # model_upload custom parser.add_argument('-td', '--tool_dir', type = str, required = True, @@ -54,8 +61,8 @@ Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. Args: - name_data : name associated with the dataset (from frontend input params) - count : counter from 1 to make these names unique (external) + name_data: Name associated with the dataset (from frontend input params). + count: Counter starting at 1 to make names unique when default. Returns: str : the name made unique @@ -81,7 +88,7 @@ Returns None if the cell index is invalid. """ if cell_line_index < 0 or cell_line_index >= len(dataset.index): - print(f"Errore: This cell line index: '{cell_line_index}' is not valid.") + print(f"Error: cell line index '{cell_line_index}' is not valid.") return None cell_line_name = dataset.columns[cell_line_index] @@ -138,7 +145,7 @@ list[str] : list of metabolite names that were missing in the original abundances dictionary and thus their aboundances were set to 1. Side effects: - dataset_by_rows : mut + dataset_by_rows: mutated to include missing metabolites with default abundances. """ missing_list = [] for reaction in reactions.values(): @@ -203,8 +210,8 @@ abundances_dict = {} for row in dataset[1:]: - id = get_metabolite_id(row[0], syn_dict) #if translationIsApplied else row[0] - if id: + id = get_metabolite_id(row[0], syn_dict) + if id: abundances_dict[id] = list(map(utils.Float(), row[1:])) missing_list = check_missing_metab(reactions, abundances_dict, len((cell_lines))) @@ -217,7 +224,8 @@ df = pd.DataFrame.from_dict(rps_scores) df = df.loc[list(reactions.keys()),:] - print(df.head(10)) + # Optional preview: first 10 rows + # print(df.head(10)) df.index.name = 'Reactions' df.to_csv(ARGS.rps_output, sep='\t', na_rep='None', index=True) @@ -232,20 +240,17 @@ global ARGS ARGS = process_args(args) - # TODO:use utils functions vvv + # Load support data (black list and synonyms) with open(ARGS.tool_dir + '/local/pickle files/black_list.pickle', 'rb') as bl: black_list = pk.load(bl) with open(ARGS.tool_dir + '/local/pickle files/synonyms.pickle', 'rb') as sd: syn_dict = pk.load(sd) - dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader = False) + dataset = utils.readCsv(utils.FilePath.fromStrPath(ARGS.input), '\t', skipHeader=False) tmp_dict = None - #if ARGS.reaction_choice == 'default': - # reactions = pk.load(open(ARGS.tool_dir + '/local/pickle files/reactions.pickle', 'rb')) - # substrateFreqTable = pk.load(open(ARGS.tool_dir + '/local/pickle files/substrate_frequencies.pickle', 'rb')) - - #elif ARGS.reaction_choice == 'custom': + + # Parse custom reactions from uploaded file reactions = reactionUtils.parse_custom_reactions(ARGS.model_upload) for r, s in reactions.items(): tmp_list = list(s.keys()) @@ -258,20 +263,21 @@ if substrateName not in substrateFreqTable: substrateFreqTable[substrateName] = 0 substrateFreqTable[substrateName] += 1 - print(f"Reactions: {reactions}") - print(f"Substrate Frequencies: {substrateFreqTable}") - print(f"Synonyms: {syn_dict}") + # Debug prints (can be enabled during troubleshooting) + # print(f"Reactions: {reactions}") + # print(f"Substrate Frequencies: {substrateFreqTable}") + # print(f"Synonyms: {syn_dict}") tmp_dict = {} for metabName, freq in substrateFreqTable.items(): tmp_metabName = clean_metabolite_name(metabName) for syn_key, syn_list in syn_dict.items(): if tmp_metabName in syn_list or tmp_metabName == clean_metabolite_name(syn_key): - print(f"Mapping {tmp_metabName} to {syn_key}") + # print(f"Mapping {tmp_metabName} to {syn_key}") tmp_dict[syn_key] = syn_list tmp_dict[syn_key].append(tmp_metabName) rps_for_cell_lines(dataset, reactions, black_list, syn_dict, substrateFreqTable) - print('Execution succeded') + print('Execution succeeded') ############################################################################## if __name__ == "__main__": main()
--- a/COBRAxy/utils/CBS_backend.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/utils/CBS_backend.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,3 +1,10 @@ +""" +CBS backend utilities using GLPK for constraint-based sampling. + +This module builds and solves LP problems from COBRA models, supports random +objective function generation (CBS), and provides both swiglpk-based and +COBRApy-based sampling fallbacks. +""" from swiglpk import * import random import pandas as pd @@ -6,6 +13,20 @@ # Initialize LP problem def initialize_lp_problem(S): + """ + Prepare sparse matrix structures for GLPK given a stoichiometric matrix. + + Args: + S: Stoichiometric matrix (DOK or similar) as returned by cobra.util.create_stoichiometric_matrix. + + Returns: + tuple: (len_vector, values, indexes, ia, ja, ar, nrows, ncol) + - len_vector: number of non-zero entries + - values: list of non-zero values + - indexes: list of (row, col) indices for non-zero entries + - ia, ja, ar: GLPK-ready arrays for the sparse matrix + - nrows, ncol: matrix shape + """ len_vector=len(S.keys()) values=list(S.values()) @@ -29,9 +50,22 @@ -# Solve LP problem from the structure of the metabolic model def create_and_solve_lp_problem(lb,ub,nrows, ncol, len_vector, ia, ja, ar, obj_coefs,reactions,return_lp=False): + """ + Create and solve a GLPK LP problem for a metabolic model. + + Args: + lb, ub: Lower/upper bounds per reaction (lists of floats). + nrows, ncol: Dimensions of the S matrix. + len_vector, ia, ja, ar: Sparse matrix data prepared for GLPK. + obj_coefs: Objective function coefficients (list of floats). + reactions: Reaction identifiers (list of str), used for output mapping. + return_lp: If True, also return the GLPK problem object (caller must delete). + + Returns: + tuple: (fluxes, Z) or (fluxes, Z, lp) if return_lp=True. + """ lp = glp_create_prob(); @@ -60,8 +94,18 @@ raise Exception(e) -# Solve LP problem from the structure of the metabolic model def solve_lp_problem(lp,obj_coefs,reactions): + """ + Configure and solve an LP with GLPK using provided objective coefficients. + + Args: + lp: GLPK problem handle. + obj_coefs: Objective coefficients. + reactions: Reaction identifiers (unused in computation; length used for output). + + Returns: + tuple: (fluxes, Z) where fluxes is a list of primal column values and Z is the objective value. + """ # Set the coefficients of the objective function i=1 @@ -101,8 +145,16 @@ # Re-enable terminal output after solving glp_term_out(GLP_ON) -# Create LP structure def create_lp_structure(model): + """ + Extract the LP structure (S matrix, bounds, objective) from a COBRA model. + + Args: + model (cobra.Model): The COBRA model. + + Returns: + tuple: (S, lb, ub, coefs_obj, reactions) + """ reactions=[el.id for el in model.reactions] coefs_obj=[reaction.objective_coefficient for reaction in model.reactions] @@ -116,8 +168,19 @@ return S,lb,ub,coefs_obj,reactions -# CBS sampling interface def randomObjectiveFunctionSampling(model, nsample, coefficients_df, df_sample): + """ + Sample fluxes using GLPK by iterating over random objective functions. + + Args: + model: COBRA model. + nsample: Number of samples to generate. + coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem). + df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions). + + Returns: + None + """ S,lb,ub,coefs_obj,reactions = create_lp_structure(model) len_vector, values, indexes, ia, ja, ar, nrow, ncol = initialize_lp_problem(S) @@ -126,7 +189,7 @@ coefs_obj=coefficients_df.iloc[:,i].values - if coefs_obj[-1]==1: #minimize + if coefs_obj[-1]==1: # minimize coefs_obj= coefs_obj[0:-1] * -1 else: coefs_obj=coefs_obj[0:-1] @@ -134,17 +197,29 @@ fluxes,Z = create_and_solve_lp_problem(lb,ub, nrow, ncol, len_vector, ia, ja, ar, coefs_obj,reactions,return_lp=False) df_sample.loc[i] = fluxes - pass + return def randomObjectiveFunctionSampling_cobrapy(model, nsample, coefficients_df, df_sample): + """ + Fallback sampling using COBRApy's optimize with per-sample randomized objectives. + + Args: + model: COBRA model. + nsample: Number of samples to generate. + coefficients_df: DataFrame of objective coefficients (columns are samples; last row is type_of_problem). + df_sample: Output DataFrame to fill with flux samples (index: sample, columns: reactions). + + Returns: + None + """ for i in range(nsample): dict_coeff={} if(coefficients_df.iloc[-1][i]==1): - type_problem = -1 #minimize + type_problem = -1 # minimize else: - type_problem = 1 + type_problem = 1 # maximize for rxn in [reaction.id for reaction in model.reactions]: dict_coeff[model.reactions.get_by_id(rxn)] = coefficients_df.loc[rxn][i] * type_problem @@ -154,57 +229,69 @@ for rxn, flux in solution.items(): df_sample.loc[i][rxn] = flux - pass + return + +def randomObjectiveFunction(model, n_samples, df_fva, seed=0): + """ + Create random objective function coefficients for CBS sampling. -# Create random coefficients for CBS -def randomObjectiveFunction(model, n_samples, df_fva, seed=0): - + The last row 'type_of_problem' encodes 0 for maximize and 1 for minimize. + + Args: + model: COBRA model. + n_samples: Number of random objectives to generate. + df_fva: Flux Variability Analysis results with 'minimum' and 'maximum' per reaction. + seed: Seed for reproducibility. - #reactions = model.reactions - reactions = [reaction.id for reaction in model.reactions] - cont=seed - list_ex=reactions.copy() - list_ex.append("type_of_problem") - coefficients_df = pd.DataFrame(index=list_ex,columns=[str(i) for i in range(n_samples)]) + Returns: + pandas.DataFrame: Coefficients DataFrame indexed by reaction IDs plus 'type_of_problem'. + """ + # reactions = model.reactions + reactions = [reaction.id for reaction in model.reactions] + cont = seed + list_ex = reactions.copy() + list_ex.append("type_of_problem") + coefficients_df = pd.DataFrame(index=list_ex, columns=[str(i) for i in range(n_samples)]) + + for i in range(0, n_samples): - for i in range(0, n_samples): - - cont=cont+1 + cont = cont + 1 + random.seed(cont) + + # Generate a random threshold in [0, 1] + threshold = random.random() + + for reaction in reactions: + + cont = cont + 1 random.seed(cont) - - # Genera un numero casuale tra 0 e 1 - threshold = random.random() #coefficiente tra 0 e 1 - - for reaction in reactions: + + val = random.random() - cont=cont+1 + if val > threshold: + + cont = cont + 1 random.seed(cont) - - val=random.random() - - if val>threshold: + + # Coefficient in [-1, 1] + c = 2 * random.random() - 1 - cont=cont+1 - random.seed(cont) - - c=2*random.random()-1 #coefficiente tra -1 e 1 - - val_max = np.max([abs(df_fva.loc[reaction,"minimum"]), abs(df_fva.loc[reaction,"maximum"])]) + val_max = np.max([abs(df_fva.loc[reaction, "minimum"]), abs(df_fva.loc[reaction, "maximum"])]) + + if val_max != 0: # only if FVA is non-zero + coefficients_df.loc[reaction, str(i)] = c / val_max # scale by FVA + else: + coefficients_df.loc[reaction, str(i)] = 0 - if val_max!=0: #solo se la fva è diversa da zero - coefficients_df.loc[reaction,str(i)] = c/val_max #divido per la fva - else: - coefficients_df.loc[reaction,str(i)] = 0 + else: + coefficients_df.loc[reaction, str(i)] = 0 - else: - coefficients_df.loc[reaction,str(i)] = 0 + cont = cont + 1 + random.seed(cont) - cont=cont+1 - random.seed(cont) - - if random.random()<0.5: - coefficients_df.loc["type_of_problem",str(i)] = 0 #maximize - else: - coefficients_df.loc["type_of_problem",str(i)] = 1 #minimize - - return coefficients_df + if random.random() < 0.5: + coefficients_df.loc["type_of_problem", str(i)] = 0 # maximize + else: + coefficients_df.loc["type_of_problem", str(i)] = 1 # minimize + + return coefficients_df
--- a/COBRAxy/utils/general_utils.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/utils/general_utils.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,3 +1,13 @@ +""" +General utilities for COBRAxy. + +This module provides: +- File and path helpers (FileFormat, FilePath) +- Error and result handling utilities (CustomErr, Result) +- Basic I/O helpers (CSV/TSV, pickle, SVG) +- Lightweight CLI argument parsers (Bool, Float) +- Model loader utilities for COBRA models, including compressed formats +""" import math import re import sys @@ -7,11 +17,10 @@ from enum import Enum from itertools import count -from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple +from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple import pandas as pd import cobra -from cobra import Model as cobraModel, Reaction, Metabolite import zipfile import gzip @@ -19,7 +28,7 @@ from io import StringIO - +from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Tuple class ValueErr(Exception): def __init__(self, param_name, expected, actual): super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}") @@ -32,21 +41,21 @@ """ Encodes possible file extensions to conditionally save data in a different format. """ - DAT = ("dat",) # this is how galaxy treats all your files! - CSV = ("csv",) # this is how most editable input data is written - TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!! - SVG = ("svg",) # this is how most metabolic maps are written - PNG = ("png",) # this is a common output format for images (such as metabolic maps) - PDF = ("pdf",) # this is also a common output format for images, as it's required in publications. - - # Updated to include compressed variants - XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed - JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed - MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed - YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed + DAT = ("dat",) + CSV = ("csv",) + TSV = ("tsv",) + SVG = ("svg",) + PNG = ("png",) + PDF = ("pdf",) - TXT = ("txt",) # this is how most output data is written - PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved + # Compressed variants for common model formats + XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") + JSON = ("json", "json.gz", "json.zip", "json.bz2") + MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") + YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") + + TXT = ("txt",) + PICKLE = ("pickle", "pk", "p") def __init__(self, *extensions): self.extensions = extensions @@ -83,11 +92,9 @@ Returns: str : the string representation of the file extension. """ - # If we have an original extension stored (for compressed files only), use it if hasattr(self, '_original_extension') and self._original_extension: return self._original_extension - # For XML, JSON, MAT and YML without original extension, use the base extension if self == FileFormat.XML: return "xml" elif self == FileFormat.JSON: @@ -101,18 +108,15 @@ class FilePath(): """ - Represents a file path. View this as an attempt to standardize file-related operations by expecting - values of this type in any process requesting a file path. + Represents a file path with format-aware helpers. """ def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None: """ - (Private) Initializes an instance of FilePath. + Initialize FilePath. Args: - path : the end of the path, containing the file name. - ext : the file's extension. - prefix : anything before path, if the last '/' isn't there it's added by the code. - Returns: - None : practically, a FilePath instance. + path: File name stem. + ext: File extension (FileFormat). + prefix: Optional directory path (trailing '/' auto-added). """ self.ext = ext self.filePath = filePath @@ -124,9 +128,7 @@ @classmethod def fromStrPath(cls, path: str) -> "FilePath": """ - Factory method to parse a string from which to obtain, if possible, a valid FilePath instance. - It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models. - These double extensions are not supported for other file types such as .csv. + Parse a string path into a FilePath, supporting double extensions for models (e.g., .json.gz). Args: path : the string containing the path Raises: @@ -141,27 +143,22 @@ prefix = result["prefix"] if result["prefix"] else "" name, ext = result["name"], result["ext"] - # Check for double extensions (json.gz, xml.zip, etc.) parts = path.split(".") if len(parts) >= 3: penultimate = parts[-2] last = parts[-1] double_ext = f"{penultimate}.{last}" - # Try the double extension first try: ext_format = FileFormat.fromExt(double_ext) name = ".".join(parts[:-2]) - # Extract prefix if it exists if '/' in name: prefix = name[:name.rfind('/') + 1] name = name[name.rfind('/') + 1:] return cls(name, ext_format, prefix=prefix) except ValueErr: - # If double extension doesn't work, fall back to single extension pass - # Single extension fallback (original logic) try: ext_format = FileFormat.fromExt(ext) return cls(name, ext_format, prefix=prefix) @@ -198,19 +195,14 @@ newline is added by the function. Args: - s (str): The warning message to be logged and printed. + msg (str): The warning message to be logged and printed. loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and immediately read back (beware relative expensive operation, log with caution). Returns: None """ - # building the path and then reading it immediately seems useless, but it's actually a way of - # validating that reduces repetition on the caller's side. Besides, logging a message by writing - # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from - # mindlessly logging whenever something comes up, log at the very end and tell the user everything - # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to - # the file only at the end of the program's execution. + # Note: validates path via FilePath; keep logging minimal to avoid overhead. with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n") class CustomErr(Exception): @@ -238,15 +230,19 @@ def throw(self, loggerPath = "") -> None: """ - Raises the current CustomErr instance, logging a warning message before doing so. + Raises the current CustomErr instance, optionally logging it first. + + Args: + loggerPath (str): Optional path to a log file to append this error before raising. Raises: self: The current CustomErr instance. - + Returns: None """ - if loggerPath: logWarning(str(self), loggerPath) + if loggerPath: + logWarning(str(self), loggerPath) raise self def abort(self) -> None: @@ -316,7 +312,7 @@ """ def __init__(self, value :Union[T, E], isOk :bool) -> None: """ - (Private) Initializes an instance of Result. + Initialize an instance of Result. Args: value (Union[T, E]): The value to be stored in the Result instance. @@ -332,7 +328,7 @@ @classmethod def Ok(cls, value :T) -> "Result": """ - Constructs a new Result instance with a successful operation. + Construct a successful Result. Args: value (T): The value to be stored in the Result instance, set as successful. @@ -345,7 +341,7 @@ @classmethod def Err(cls, value :E) -> "Result": """ - Constructs a new Result instance with a failed operation. + Construct a failed Result. Args: value (E): The value to be stored in the Result instance, set as failed. @@ -437,35 +433,6 @@ return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})" # FILES -def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame: - """ - Reads a .csv or .tsv file and returns it as a Pandas DataFrame. - - Args: - path : the path to the dataset file. - datasetName : the name of the dataset. - - Raises: - DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if - it has less than 2 columns. - - Returns: - pandas.DataFrame: The dataset loaded as a Pandas DataFrame. - """ - # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than - # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code. - # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really - # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and - # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is - # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions. - try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python") - except: - try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python") - except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}") - - if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns") - return dataset - def readPickle(path :FilePath) -> Any: """ Reads the contents of a .pickle file, which needs to exist at the given path. @@ -570,6 +537,7 @@ # UI ARGUMENTS class Bool: + """Simple boolean CLI argument parser accepting 'true' or 'false' (case-insensitive).""" def __init__(self, argName :str) -> None: self.argName = argName @@ -582,6 +550,7 @@ raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"") class Float: + """Float CLI argument parser supporting NaN and None keywords (case-insensitive).""" def __init__(self, argName = "Dataset values, not an argument") -> None: self.argName = argName @@ -607,7 +576,7 @@ ENGRO2_no_legend = "ENGRO2_no_legend" HMRcore = "HMRcore" HMRcore_no_legend = "HMRcore_no_legend" - Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths. + Custom = "Custom" def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None: if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model") @@ -635,17 +604,20 @@ return readPickle(path) def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree: + """Open the SVG metabolic map for this model.""" path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/") self.__raiseMissingPathErr(path) return readSvg(path, customErr = DataErr(path, f"custom map in wrong format")) def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model: + """Load the COBRA model for this enum variant (supports Custom with explicit path/extension).""" if(self is Model.Custom): return self.load_custom_model(customPath, customExtension) else: return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model: + """Load a COBRA model from a custom path, supporting XML, JSON, MAT, and YML (compressed or not).""" ext = ext if ext else file_path.ext try: if str(ext) in FileFormat.XML.value:
--- a/COBRAxy/utils/model_utils.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/utils/model_utils.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,14 +1,20 @@ +""" +Utilities for generating and manipulating COBRA models and related metadata. + +This module includes helpers to: +- extract rules, reactions, bounds, objective coefficients, and compartments +- build a COBRA model from a tabular file +- set objective and medium from dataframes +- validate a model and convert gene identifiers +- translate model GPRs using mapping tables +""" import os -import csv import cobra -import pickle -import argparse import pandas as pd import re import logging from typing import Optional, Tuple, Union, List, Dict, Set from collections import defaultdict -import utils.general_utils as utils import utils.rule_parsing as rulesUtils import utils.reaction_parsing as reactionUtils from cobra import Model as cobraModel, Reaction, Metabolite @@ -17,19 +23,16 @@ ReactionId = str def generate_rules(model: cobraModel, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]: """ - Generates a dictionary mapping reaction ids to rules from the model. + Generate a dictionary mapping reaction IDs to GPR rules from the model. Args: - model : the model to derive data from. - asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings. + model: COBRA model to derive data from. + asParsed: If True, parse rules into a nested list structure; otherwise keep raw strings. Returns: - Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules. - Dict[ReactionId, str] : the generated dictionary of raw rules. + Dict[ReactionId, rulesUtils.OpList]: Parsed rules by reaction ID. + Dict[ReactionId, str]: Raw rules by reaction ID. """ - # Is the below approach convoluted? yes - # Ok but is it inefficient? probably - # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane) _ruleGetter = lambda reaction : reaction.gene_reaction_rule ruleExtractor = (lambda reaction : rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter @@ -41,14 +44,14 @@ def generate_reactions(model :cobraModel, *, asParsed = True) -> Dict[ReactionId, str]: """ - Generates a dictionary mapping reaction ids to reaction formulas from the model. + Generate a dictionary mapping reaction IDs to reaction formulas from the model. Args: - model : the model to derive data from. - asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are. + model: COBRA model to derive data from. + asParsed: If True, convert formulas into a parsed representation; otherwise keep raw strings. Returns: - Dict[ReactionId, str] : the generated dictionary. + Dict[ReactionId, str]: Reactions by reaction ID (parsed if requested). """ unparsedReactions = { @@ -62,6 +65,12 @@ return reactionUtils.create_reaction_dict(unparsedReactions) def get_medium(model:cobraModel) -> pd.DataFrame: + """ + Extract the uptake reactions representing the model medium. + + Returns a DataFrame with a single column 'reaction' listing exchange reactions + with negative lower bound and no positive stoichiometric coefficients (uptake only). + """ trueMedium=[] for r in model.reactions: positiveCoeff=0 @@ -77,16 +86,16 @@ def extract_objective_coefficients(model: cobraModel) -> pd.DataFrame: """ - Estrae i coefficienti della funzione obiettivo per ciascuna reazione del modello. - + Extract objective coefficients for each reaction. + Args: - model : cobra.Model - + model: COBRA model + Returns: - pd.DataFrame con colonne: ReactionID, ObjectiveCoefficient + pd.DataFrame with columns: ReactionID, ObjectiveCoefficient """ coeffs = [] - # model.objective.expression è un'espressione lineare + # model.objective.expression is a linear expression objective_expr = model.objective.expression.as_coefficients_dict() for reaction in model.reactions: @@ -99,6 +108,12 @@ return pd.DataFrame(coeffs) def generate_bounds(model:cobraModel) -> pd.DataFrame: + """ + Build a DataFrame of lower/upper bounds for all reactions. + + Returns: + pd.DataFrame indexed by reaction IDs with columns ['lower_bound', 'upper_bound']. + """ rxns = [] for reaction in model.reactions: @@ -160,45 +175,38 @@ def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobraModel: """ - Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni. - + Build a COBRApy model from a tabular file with reaction data. + Args: - csv_path: Path al file CSV (separato da tab) - model_id: ID del modello da creare - + csv_path: Path to the tab-separated file. + model_id: ID for the newly created model. + Returns: - cobra.Model: Il modello COBRApy costruito + cobra.Model: The constructed COBRApy model. """ - # Leggi i dati dal CSV df = pd.read_csv(csv_path, sep='\t') - # Crea il modello vuoto model = cobraModel(model_id) - # Dict per tenere traccia di metaboliti e compartimenti metabolites_dict = {} compartments_dict = {} - print(f"Costruendo modello da {len(df)} reazioni...") + print(f"Building model from {len(df)} reactions...") - # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni for idx, row in df.iterrows(): reaction_formula = str(row['Formula']).strip() if not reaction_formula or reaction_formula == 'nan': continue - # Estrai metaboliti dalla formula della reazione metabolites = extract_metabolites_from_reaction(reaction_formula) for met_id in metabolites: compartment = extract_compartment_from_metabolite(met_id) - # Aggiungi compartimento se non esiste if compartment not in compartments_dict: compartments_dict[compartment] = compartment - # Aggiungi metabolita se non esiste if met_id not in metabolites_dict: metabolites_dict[met_id] = Metabolite( id=met_id, @@ -206,15 +214,12 @@ name=met_id.replace(f"_{compartment}", "").replace("__", "_") ) - # Aggiungi compartimenti al modello model.compartments = compartments_dict - # Aggiungi metaboliti al modello model.add_metabolites(list(metabolites_dict.values())) - print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti") + print(f"Added {len(metabolites_dict)} metabolites and {len(compartments_dict)} compartments") - # Seconda passata: aggiungi le reazioni reactions_added = 0 reactions_skipped = 0 @@ -223,44 +228,37 @@ reaction_id = str(row['ReactionID']).strip() reaction_formula = str(row['Formula']).strip() - # Salta reazioni senza formula if not reaction_formula or reaction_formula == 'nan': - raise ValueError(f"Formula della reazione mancante {reaction_id}") + raise ValueError(f"Missing reaction formula for {reaction_id}") - # Crea la reazione reaction = Reaction(reaction_id) reaction.name = reaction_id - # Imposta bounds reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0 - # Aggiungi gene rule se presente if pd.notna(row['GPR']) and str(row['GPR']).strip(): reaction.gene_reaction_rule = str(row['GPR']).strip() - # Parse della formula della reazione try: parse_reaction_formula(reaction, reaction_formula, metabolites_dict) except Exception as e: - print(f"Errore nel parsing della reazione {reaction_id}: {e}") + print(f"Error parsing reaction {reaction_id}: {e}") reactions_skipped += 1 continue - # Aggiungi la reazione al modello model.add_reactions([reaction]) reactions_added += 1 - print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni") + print(f"Added {reactions_added} reactions, skipped {reactions_skipped} reactions") # set objective function set_objective_from_csv(model, df, obj_col="ObjectiveCoefficient") - # Imposta il medium set_medium_from_data(model, df) - print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti") + print(f"Model completed: {len(model.reactions)} reactions, {len(model.metabolites)} metabolites") return model @@ -268,12 +266,12 @@ # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore) def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]: """ - Estrae gli ID dei metaboliti da una formula di reazione. - Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e) - e permette che comincino con cifre o underscore. + Extract metabolite IDs from a reaction formula. + Robust pattern: tokens ending with _<compartment> (e.g., _c, _m, _e), + allowing leading digits/underscores. """ metabolites = set() - # coefficiente opzionale seguito da un token che termina con _<letters> + # optional coefficient followed by a token ending with _<letters> pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)' matches = re.findall(pattern, reaction_formula) metabolites.update(matches) @@ -281,54 +279,39 @@ def extract_compartment_from_metabolite(metabolite_id: str) -> str: - """ - Estrae il compartimento dall'ID del metabolita. - """ - # Il compartimento è solitamente l'ultima lettera dopo l'underscore + """Extract the compartment from a metabolite ID.""" if '_' in metabolite_id: return metabolite_id.split('_')[-1] return 'c' # default cytoplasm def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]): - """ - Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti. - """ + """Parse a reaction formula and set metabolites with their coefficients.""" - if reaction.id == 'EX_thbpt_e': - print(reaction.id) - print(formula) - # Dividi in parte sinistra e destra if '<=>' in formula: left, right = formula.split('<=>') reversible = True elif '<--' in formula: left, right = formula.split('<--') reversible = False - left, right = left, right elif '-->' in formula: left, right = formula.split('-->') reversible = False elif '<-' in formula: left, right = formula.split('<-') reversible = False - left, right = left, right else: - raise ValueError(f"Formato reazione non riconosciuto: {formula}") + raise ValueError(f"Unrecognized reaction format: {formula}") - # Parse dei metaboliti e coefficienti reactants = parse_metabolites_side(left.strip()) products = parse_metabolites_side(right.strip()) - # Aggiungi metaboliti alla reazione metabolites_to_add = {} - # Reagenti (coefficienti negativi) for met_id, coeff in reactants.items(): if met_id in metabolites_dict: metabolites_to_add[metabolites_dict[met_id]] = -coeff - # Prodotti (coefficienti positivi) for met_id, coeff in products.items(): if met_id in metabolites_dict: metabolites_to_add[metabolites_dict[met_id]] = coeff @@ -337,9 +320,7 @@ def parse_metabolites_side(side_str: str) -> Dict[str, float]: - """ - Parsa un lato della reazione per estrarre metaboliti e coefficienti. - """ + """Parse one side of a reaction and extract metabolites with coefficients.""" metabolites = {} if not side_str or side_str.strip() == '': return metabolites @@ -350,7 +331,7 @@ if not term: continue - # pattern allineato: coefficiente opzionale + id che termina con _<compartimento> + # optional coefficient + id ending with _<compartment> match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term) if match: coeff_str, met_id = match.groups() @@ -372,7 +353,6 @@ reaction_id = str(row['ReactionID']).strip() coeff = float(row[obj_col]) if pd.notna(row[obj_col]) else 0.0 if coeff != 0: - # Assicuriamoci che la reazione esista nel modello if reaction_id in model.reactions: obj_dict[model.reactions.get_by_id(reaction_id)] = coeff else: @@ -381,7 +361,6 @@ if not obj_dict: raise ValueError("No reactions found with non-zero objective coefficient.") - # Imposta la funzione obiettivo model.objective = obj_dict print(f"Objective set with {len(obj_dict)} reactions.") @@ -389,9 +368,7 @@ def set_medium_from_data(model: cobraModel, df: pd.DataFrame): - """ - Imposta il medium basato sulla colonna InMedium. - """ + """Set the medium based on the 'InMedium' column in the dataframe.""" medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist() medium_dict = {} @@ -403,13 +380,11 @@ if medium_dict: model.medium = medium_dict - print(f"Medium impostato con {len(medium_dict)} componenti") + print(f"Medium set with {len(medium_dict)} components") def validate_model(model: cobraModel) -> Dict[str, any]: - """ - Valida il modello e fornisce statistiche di base. - """ + """Validate the model and return basic statistics.""" validation = { 'num_reactions': len(model.reactions), 'num_metabolites': len(model.metabolites), @@ -422,7 +397,7 @@ } try: - # Test di crescita + # Growth test solution = model.optimize() validation['growth_rate'] = solution.objective_value validation['status'] = solution.status @@ -432,7 +407,8 @@ return validation -def convert_genes(model,annotation): +def convert_genes(model, annotation): + """Rename genes using a selected annotation key in gene.notes; returns a model copy.""" from cobra.manipulation import rename_genes model2=model.copy() try: @@ -450,12 +426,12 @@ def _choose_columns(mapping_df: 'pd.DataFrame') -> Dict[str, str]: """ - Cerca colonne utili e ritorna dict {ensg: colname1, hgnc_id: colname2, ...} - Lancia ValueError se non trova almeno un mapping utile. + Find useful columns and return a dict {ensg: colname1, hgnc_id: colname2, ...}. + Raise ValueError if no suitable mapping is found. """ cols = { _normalize_colname(c): c for c in mapping_df.columns } chosen = {} - # possibili nomi per ciascuna categoria + # candidate names for each category candidates = { 'ensg': ['ensg', 'ensembl_gene_id', 'ensembl'], 'hgnc_id': ['hgnc_id', 'hgnc', 'hgnc:'], @@ -476,13 +452,8 @@ model_source_genes: Optional[Set[str]] = None, logger: Optional[logging.Logger] = None) -> None: """ - Verifica che, nel mapping_df (eventualmente già filtrato sui source di interesse), - ogni target sia associato ad al massimo un source. Se trova target associati a - >1 source solleva ValueError mostrando esempi. - - - mapping_df: DataFrame con colonne source_col, target_col - - model_source_genes: se fornito, è un set di source normalizzati che stiamo traducendo - (se None, si usa tutto mapping_df) + Check that, within the filtered mapping_df, each target maps to at most one source. + Log examples if duplicates are found. """ if logger is None: logger = logging.getLogger(__name__) @@ -491,12 +462,12 @@ logger.warning("Mapping dataframe is empty for the requested source genes; skipping uniqueness validation.") return - # normalizza le colonne temporanee per gruppi (senza modificare il df originale) + # normalize temporary columns for grouping (without altering the original df) tmp = mapping_df[[source_col, target_col]].copy() tmp['_src_norm'] = tmp[source_col].astype(str).map(_normalize_gene_id) tmp['_tgt_norm'] = tmp[target_col].astype(str).str.strip() - # se è passato un insieme di geni modello, filtra qui (già fatto nella chiamata, ma doppio-check ok) + # optionally filter to the set of model source genes if model_source_genes is not None: tmp = tmp[tmp['_src_norm'].isin(model_source_genes)] @@ -504,27 +475,27 @@ logger.warning("After filtering to model source genes, mapping table is empty — nothing to validate.") return - # costruisci il reverse mapping target -> set(sources) + # build reverse mapping: target -> set(sources) grouped = tmp.groupby('_tgt_norm')['_src_norm'].agg(lambda s: set(s.dropna())) - # trova target con più di 1 source + # find targets with more than one source problematic = {t: sorted(list(s)) for t, s in grouped.items() if len(s) > 1} if problematic: - # prepara messaggio di errore con esempi (fino a 20) + # prepare warning message with examples (limited subset) sample_items = list(problematic.items()) msg_lines = ["Mapping validation failed: some target IDs are associated with multiple source IDs."] for tgt, sources in sample_items: msg_lines.append(f" - target '{tgt}' <- sources: {', '.join(sources)}") full_msg = "\n".join(msg_lines) - # loggare e sollevare errore + # log warning logger.warning(full_msg) - # se tutto ok + # if everything is fine logger.info("Mapping validation passed: no target ID is associated with multiple source IDs (within filtered set).") def _normalize_gene_id(g: str) -> str: - """Rendi consistente un gene id per l'uso come chiave (rimuove prefissi come 'HGNC:' e strip).""" + """Normalize a gene ID for use as a key (removes prefixes like 'HGNC:' and strips).""" if g is None: return "" g = str(g).strip() @@ -535,30 +506,30 @@ def _simplify_boolean_expression(expr: str) -> str: """ - Semplifica un'espressione booleana rimuovendo duplicati e riducendo ridondanze. - Gestisce espressioni con 'and' e 'or'. + Simplify a boolean expression by removing duplicates and redundancies. + Handles expressions with 'and' and 'or'. """ if not expr or not expr.strip(): return expr - # Normalizza gli operatori + # normalize operators expr = expr.replace(' AND ', ' and ').replace(' OR ', ' or ') - # Funzione ricorsiva per processare le espressioni + # recursive helper to process expressions def process_expression(s: str) -> str: s = s.strip() if not s: return s - # Gestisci le parentesi + # handle parentheses while '(' in s: - # Trova la parentesi più interna + # find the innermost parentheses start = -1 for i, c in enumerate(s): if c == '(': start = i elif c == ')' and start != -1: - # Processa il contenuto tra parentesi + # process inner content inner = s[start+1:i] processed_inner = process_expression(inner) s = s[:start] + processed_inner + s[i+1:] @@ -566,7 +537,7 @@ else: break - # Dividi per 'or' al livello più alto + # split by 'or' at top level or_parts = [] current_part = "" paren_count = 0 @@ -590,10 +561,10 @@ if current_part.strip(): or_parts.append(current_part.strip()) - # Processa ogni parte OR + # process each OR part processed_or_parts = [] for or_part in or_parts: - # Dividi per 'and' all'interno di ogni parte OR + # split by 'and' within each OR part and_parts = [] current_and = "" paren_count = 0 @@ -617,7 +588,7 @@ if current_and.strip(): and_parts.append(current_and.strip()) - # Rimuovi duplicati nelle parti AND + # deduplicate AND parts unique_and_parts = list(dict.fromkeys(and_parts)) # mantiene l'ordine if len(unique_and_parts) == 1: @@ -625,7 +596,7 @@ elif len(unique_and_parts) > 1: processed_or_parts.append(" and ".join(unique_and_parts)) - # Rimuovi duplicati nelle parti OR + # deduplicate OR parts unique_or_parts = list(dict.fromkeys(processed_or_parts)) if len(unique_or_parts) == 1: @@ -638,7 +609,7 @@ try: return process_expression(expr) except Exception: - # Se la semplificazione fallisce, ritorna l'espressione originale + # if simplification fails, return the original expression return expr # ---------- Main public function ---------- @@ -649,13 +620,16 @@ allow_many_to_one: bool = False, logger: Optional[logging.Logger] = None) -> 'cobra.Model': """ - Translate model genes from source_nomenclature to target_nomenclature. - mapping_df should contain at least columns that allow the mapping - (e.g. ensg, hgnc_id, hgnc_symbol, entrez). - + Translate model genes from source_nomenclature to target_nomenclature using a mapping table. + mapping_df should contain columns enabling mapping (e.g., ensg, hgnc_id, hgnc_symbol, entrez). + Args: - allow_many_to_one: Se True, permette che più source vengano mappati allo stesso target - e gestisce i duplicati nelle GPR. Se False, valida l'unicità dei target. + model: COBRA model to translate. + mapping_df: DataFrame containing the mapping information. + target_nomenclature: Desired target key (e.g., 'hgnc_symbol'). + source_nomenclature: Current source key in the model (default 'hgnc_id'). + allow_many_to_one: If True, allow many-to-one mappings and handle duplicates in GPRs. + logger: Optional logger. """ if logger is None: logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -711,11 +685,9 @@ filtered_map = tmp_map[tmp_map[col_for_src + "_norm"].isin(model_source_genes)].copy() - # Se non ci sono righe rilevanti, avvisa if filtered_map.empty: logger.warning("No mapping rows correspond to source genes present in the model after filtering. Proceeding with empty mapping (no translation will occur).") - # --- VALIDAZIONE: opzionale in base al parametro allow_many_to_one --- if not allow_many_to_one: _validate_target_uniqueness(filtered_map, col_for_src, col_for_tgt, model_source_genes=model_source_genes, logger=logger) @@ -739,7 +711,6 @@ if gpr and gpr.strip(): new_gpr = _translate_gpr(gpr, gene_mapping, stats, unmapped, multi, logger) if new_gpr != gpr: - # Semplifica l'espressione booleana per rimuovere duplicati simplified_gpr = _simplify_boolean_expression(new_gpr) if simplified_gpr != new_gpr: stats['simplified_gprs'] += 1
--- a/COBRAxy/utils/reaction_parsing.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/utils/reaction_parsing.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,15 +1,22 @@ +""" +Helpers to parse reaction strings into structured dictionaries. + +Features: +- Reaction direction detection (forward, backward, reversible) +- Parsing of custom reaction strings into stoichiometric maps +- Conversion of a dict of raw reactions into a directional reactions dict +- Loading custom reactions from a tabular file (TSV) +""" from enum import Enum import utils.general_utils as utils from typing import Dict -import csv import re # Reaction direction encoding: class ReactionDir(Enum): """ - A reaction can go forwards, backwards or be reversible (able to proceed in both directions). - Models created / managed with cobrapy encode this information within the reaction's - formula using the arrows this enum keeps as values. + A reaction can go forward, backward, or be reversible (both directions). + Cobrapy-style formulas encode direction using specific arrows handled here. """ FORWARD = "-->" BACKWARD = "<--" @@ -40,34 +47,29 @@ def add_custom_reaction(reactionsDict :ReactionsDict, rId :str, reaction :str) -> None: """ - Adds an entry to the given reactionsDict. Each entry consists of a given unique reaction id - (key) and a :dict (value) matching each substrate in the reaction to its stoichiometric coefficient. - Keys and values are both obtained from the reaction's formula: if a substrate (custom metabolite id) - appears without an explicit coeff, the value 1.0 will be used instead. + Add one reaction entry to reactionsDict. + + The entry maps each substrate ID to its stoichiometric coefficient. + If a substrate appears without an explicit coefficient, 1.0 is assumed. Args: - reactionsDict : dictionary encoding custom reactions information. - rId : unique reaction id. - reaction : the reaction's formula. + reactionsDict: Dict to update in place. + rId: Unique reaction ID. + reaction: Reaction formula string. Returns: None - Side effects: - reactionsDict : mut + Side effects: updates reactionsDict in place. """ reaction = reaction.strip() if not reaction: return reactionsDict[rId] = {} - # We assume the '+' separating consecutive metabs in a reaction is spaced from them, - # to avoid confusing it for electrical charge: + # Assumes ' + ' is spaced to avoid confusion with charge symbols. for word in reaction.split(" + "): metabId, stoichCoeff = word, 1.0 - # Implicit stoichiometric coeff is equal to 1, some coeffs are floats. - - # Accepted coeffs can be integer or floats with a dot (.) decimal separator - # and must be separated from the metab with a space: + # Coefficient can be integer or float (dot decimal) and must be space-separated. foundCoeff = re.search(r"\d+(\.\d+)? ", word) if foundCoeff: wholeMatch = foundCoeff.group(0) @@ -81,48 +83,39 @@ def create_reaction_dict(unparsed_reactions: Dict[str, str]) -> ReactionsDict: """ - Parses the given dictionary into the correct format. + Parse a dict of raw reaction strings into a directional reactions dict. Args: - unparsed_reactions (Dict[str, str]): A dictionary where keys are reaction IDs and values are unparsed reaction strings. + unparsed_reactions: Mapping reaction ID -> raw reaction string. Returns: - ReactionsDict: The correctly parsed dict. + ReactionsDict: Parsed dict. Reversible reactions produce two entries with _F and _B suffixes. """ reactionsDict :ReactionsDict = {} for rId, reaction in unparsed_reactions.items(): reactionDir = ReactionDir.fromReaction(reaction) left, right = reaction.split(f" {reactionDir.value} ") - # Reversible reactions are split into distinct reactions, one for each direction. - # In general we only care about substrates, the product information is lost. + # Reversible reactions are split into two: forward (_F) and backward (_B). reactionIsReversible = reactionDir is ReactionDir.REVERSIBLE if reactionDir is not ReactionDir.BACKWARD: add_custom_reaction(reactionsDict, rId + "_F" * reactionIsReversible, left) if reactionDir is not ReactionDir.FORWARD: add_custom_reaction(reactionsDict, rId + "_B" * reactionIsReversible, right) - - # ^^^ to further clarify: if a reaction is NOT reversible it will not be marked as _F or _B - # and whichever direction we DO keep (forward if --> and backward if <--) loses this information. - # This IS a small problem when coloring the map in marea.py because the arrow IDs in the map follow - # through with a similar convention on ALL reactions and correctly encode direction based on their - # model of origin. TODO: a proposed solution is to unify the standard in RPS to fully mimic the maps, - # which involves re-writing the "reactions" dictionary. return reactionsDict def parse_custom_reactions(customReactionsPath :str) -> ReactionsDict: """ - Creates a custom dictionary encoding reactions information from a csv file containing - data about these reactions, the path of which is given as input. + Load custom reactions from a tabular file and parse into a reactions dict. Args: - customReactionsPath : path to the reactions information file. + customReactionsPath: Path to the reactions file (TSV or CSV-like). Returns: - ReactionsDict : dictionary encoding custom reactions information. + ReactionsDict: Parsed reactions dictionary. """ try: rows = utils.readCsv(utils.FilePath.fromStrPath(customReactionsPath), delimiter = "\t", skipHeader=False) @@ -132,7 +125,7 @@ id_idx, idx_formula = utils.findIdxByName(rows[0], "Formula") except Exception as e: - + # Fallback re-read with same settings; preserves original behavior rows = utils.readCsv(utils.FilePath.fromStrPath(customReactionsPath), delimiter = "\t", skipHeader=False) if len(rows) <= 1: raise ValueError("The custom reactions file must contain at least one reaction.")
--- a/COBRAxy/utils/rule_parsing.py Fri Sep 12 15:05:54 2025 +0000 +++ b/COBRAxy/utils/rule_parsing.py Fri Sep 12 17:28:45 2025 +0000 @@ -1,10 +1,20 @@ +""" +Parsing utilities for gene rules (GPRs). + +This module provides: +- RuleErr: structured errors for malformed rules +- RuleOp: valid logical operators (AND/OR) +- OpList: nested list structure representing parsed rules with explicit operator +- RuleStack: helper stack to build nested OpLists during parsing +- parseRuleToNestedList: main entry to parse a rule string into an OpList +""" from enum import Enum import utils.general_utils as utils from typing import List, Union, Optional class RuleErr(utils.CustomErr): """ - CustomErr subclass for rule syntax errors. + Error type for rule syntax errors. """ errName = "Rule Syntax Error" def __init__(self, rule :str, msg = "no further details provided") -> None: @@ -14,7 +24,7 @@ class RuleOp(Enum): """ - Encodes all operators valid in gene rules. + Valid logical operators for gene rules. """ OR = "or" AND = "and" @@ -27,7 +37,7 @@ class OpList(List[Union[str, "OpList"]]): """ - Represents a parsed rule and each of its nesting levels, including the operator that level uses. + Parsed rule structure: a list with an associated operator for that level. """ def __init__(self, op :Optional[RuleOp] = None) -> None: """ @@ -70,8 +80,7 @@ class RuleStack: """ - FILO stack structure to save the intermediate representation of a Rule during parsing, with the - current nesting level at the top of the stack. + FILO stack used during parsing to build nested OpLists; the top is the current level. """ def __init__(self) -> None: """ @@ -177,51 +186,49 @@ def parseRuleToNestedList(rule :str) -> OpList: """ - Parse a single rule from its string representation to an OpList, making all priority explicit - through nesting levels. + Parse a rule string into an OpList, making operator precedence explicit via nesting. Args: - rule : the string representation of a rule to be parsed. + rule: Rule string to parse (supports parentheses, 'and', 'or'). Raises: - RuleErr : whenever something goes wrong during parsing. + RuleErr: If the rule is malformed (e.g., mismatched parentheses or misplaced operators). Returns: - OpList : the parsed rule. + OpList: Parsed rule as an OpList structure. """ source = iter(rule - .replace("(", "( ").replace(")", " )") # Single out parens as words - .strip() # remove whitespace at extremities + .replace("(", "( ").replace(")", " )") # single out parentheses as words + .strip() # trim edges .split()) # split by spaces stack = RuleStack() nestingErr = RuleErr(rule, "mismatch between open and closed parentheses") try: - while True: # keep reading until source ends + while True: # read until source ends while True: - operand = next(source, None) # expected name or rule opening + operand = next(source, None) # expect operand or '(' if operand is None: raise RuleErr(rule, "found trailing open parentheses") - if operand == "and" or operand == "or" or operand == ")": # found operator instead, panic + if operand in ("and", "or", ")"): # unexpected operator position raise RuleErr(rule, f"found \"{operand}\" in unexpected position") - if operand != "(": break # found name + if operand != "(": break # got a name - # found rule opening, we add new nesting level but don't know the operator + # found rule opening: add a new nesting level stack.push() stack.current.append(operand) - while True: # keep reading until operator is found or source ends - operator = next(source, None) # expected operator or rule closing - if operator and operator != ")": break # found operator + while True: # read until operator found or source ends + operator = next(source, None) # expect operator or ')' + if operator and operator != ")": break # got operator - if stack.currentIsAnd(): stack.pop() # we close the "and" chain + if stack.currentIsAnd(): stack.pop() # close current AND chain if not operator: break - stack.pop() # we close the parentheses + stack.pop() # close parentheses - # we proceed with operator: - if not operator: break # there is no such thing as a double loop break.. yet + if not operator: break if not RuleOp.isOperator(operator): raise RuleErr( rule, f"found \"{operator}\" in unexpected position, expected operator") @@ -234,7 +241,7 @@ stack.push(operator) stack.popForward() - stack.current.setOpIfMissing(operator) # buffer now knows what operator its data had + stack.current.setOpIfMissing(operator) except RuleErr as err: raise err # bubble up proper errors except: raise nestingErr # everything else is interpreted as a nesting error.