diff COBRAxy/ras_generator.py @ 489:97eea560a10f draft

Uploaded
author francesco_lapi
date Mon, 29 Sep 2025 10:33:26 +0000
parents 187cee1a00e2
children c6ea189ea7e9
line wrap: on
line diff
--- a/COBRAxy/ras_generator.py	Tue Sep 23 13:48:24 2025 +0000
+++ b/COBRAxy/ras_generator.py	Mon Sep 29 10:33:26 2025 +0000
@@ -1,5 +1,10 @@
+"""
+Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.
+
+The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
+computes RAS per reaction for each sample/cell line, and writes a tabular output.
+"""
 from __future__ import division
-# galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
 import sys
 import argparse
 import collections
@@ -8,7 +13,6 @@
 import utils.general_utils as utils
 import utils.rule_parsing as ruleUtils
 from typing import Union, Optional, List, Dict, Tuple, TypeVar
-import os
 
 ERRORS = []
 ########################## argparse ##########################################
@@ -27,16 +31,11 @@
         usage = '%(prog)s [options]',
         description = "process some value's genes to create a comparison's map.")
     
-    parser.add_argument(
-        '-rs', '--rules_selector', 
-        type = utils.Model, default = utils.Model.ENGRO2, choices = list(utils.Model),
-        help = 'chose which type of dataset you want use')
-    
-    parser.add_argument("-rl", "--rule_list", type = str,
-        help = "path to input file with custom rules, if provided")
+    parser.add_argument("-rl", "--model_upload", type = str,
+        help = "path to input file containing the rules")
 
-    parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
-    # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
+    parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
+    # Galaxy converts files into .dat, this helps infer the original extension when needed.
     
     parser.add_argument(
         '-n', '--none',
@@ -54,7 +53,7 @@
         help = "Output log")    
     
     parser.add_argument(
-        '-in', '--input', #id รจ diventato in
+        '-in', '--input',
         type = str,
         help = 'input dataset')
     
@@ -258,14 +257,14 @@
 ############################ resolve ##########################################
 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
     """
-    Replace gene identifiers with corresponding values from a dictionary.
+    Replace gene identifiers in a parsed rule expression with values from a dict.
 
     Args:
-        l (str): String of gene identifier.
-        d (str): String corresponding to its value.
+        l: Parsed rule as a nested list structure (strings, lists, and operators).
+        d: Dict mapping gene IDs to numeric values.
 
     Returns:
-        tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
+        tuple: (new_expression, not_found_genes)
     """
     tmp = []
     err = []
@@ -282,16 +281,16 @@
         l = l[1:]
     return (tmp, err)
 
-def replace_gene(l :str, d :str) -> Union[int, float]:
+def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, None]:
     """
     Replace a single gene identifier with its corresponding value from a dictionary.
 
     Args:
         l (str): Gene identifier to replace.
-        d (str): String corresponding to its value.
+        d (dict): Dict mapping gene IDs to numeric values.
 
     Returns:
-        float/int: Corresponding value from the dictionary if found, None otherwise.
+        float/int/None: Corresponding value from the dictionary if found, None otherwise.
 
     Raises:
         sys.exit: If the value associated with the gene identifier is not valid.
@@ -513,9 +512,9 @@
     Args:
         dataset (pd.DataFrame): Dataset containing gene values.
         rules (dict): The dict containing reaction ids as keys and rules as values.
-
-    Side effects:
-        dataset : mut
+    
+    Note:
+        Modifies dataset in place by setting the first column as index.
     
     Returns:
         dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
@@ -523,8 +522,8 @@
     """
     ras_values_by_cell_line = {}
     dataset.set_index(dataset.columns[0], inplace=True)
-    # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
-    for cell_line_name in dataset.columns[1:]:
+    
+    for cell_line_name in dataset.columns: #[1:]:
         cell_line = dataset[cell_line_name].to_dict()
         ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
     return ras_values_by_cell_line
@@ -595,11 +594,11 @@
 
 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
     """
-    Save computed ras scores to the given path, as a tsv file.
+    Save computed RAS scores to ARGS.ras_output as a TSV file.
 
     Args:
         rasScores : the computed ras scores.
-        path : the output tsv file's path.
+        reactions : the list of reaction IDs, used as the first column.
     
     Returns:
         None
@@ -632,7 +631,7 @@
     """
     supportedGenesInEncoding = geneTranslator[encoding]
     if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
-    raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
+    raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.")
 
 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
     """
@@ -642,16 +641,63 @@
     Returns:
         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
     """
-    datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
-    
-    try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
-    except utils.PathErr as err:
-        raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
-     
-    if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
+    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat
+
+    dict_rule = {}
+
+    try:
+        rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)
+        if len(rows) <= 1:
+            raise ValueError("Model tabular with 1 column is not supported.")
 
+        if not rows:
+            raise ValueError("Model tabular is file is empty.")
+        
+        id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
+        
+    # First, try using a tab delimiter
+        for line in rows[1:]:
+            if len(line) <= idx_gpr:
+                utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
+                continue
+            
+            if line[idx_gpr] == "":
+                dict_rule[line[id_idx]] = ruleUtils.OpList([""])
+            else:
+                dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
+                
+    except Exception as e:
+        # If parsing with tabs fails, try comma delimiter
+        try:
+            rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)
+            
+            if len(rows) <= 1:
+                raise ValueError("Model tabular with 1 column is not supported.")
+
+            if not rows:
+                raise ValueError("Model tabular is file is empty.")
+            
+            id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")
+            
+            # Try again parsing row content with the GPR column using comma-separated values
+            for line in rows[1:]:
+                if len(line) <= idx_gpr:
+                    utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
+                    continue
+                
+                if line[idx_gpr] == "":
+                    dict_rule[line[id_idx]] = ruleUtils.OpList([""])
+                else:
+                    dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])
+                    
+        except Exception as e2:
+            raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")
+
+    if not dict_rule:
+            raise ValueError("No valid rules found in the uploaded file. Please check the file format.")
     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
-    return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
+    return dict_rule
+
 
 def main(args:List[str] = None) -> None:
     """
@@ -671,37 +717,16 @@
     # remove versioning from gene names
     dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
 
-    # handle custom models
-    model :utils.Model = ARGS.rules_selector
-
-    if model is utils.Model.Custom:
-        rules = load_custom_rules()
-        reactions = list(rules.keys())
+    rules = load_custom_rules()
+    reactions = list(rules.keys())
 
-        save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
-        if ERRORS: utils.logWarning(
-            f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
-            ARGS.out_log)
-        
-        return
-    
-    # This is the standard flow of the ras_generator program, for non-custom models.
-    name = "RAS Dataset"
-    type_gene = gene_type(dataset.iloc[0, 0], name)
+    save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
+    if ERRORS: utils.logWarning(
+        f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
+        ARGS.out_log)  
 
-    rules      = model.getRules(ARGS.tool_dir)
-    genes      = data_gene(dataset, type_gene, name, None)
-    ids, rules = load_id_rules(rules.get(type_gene))
-    
-    resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
-    create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
-    
-    if err: utils.logWarning(
-        f"Warning: gene(s) {err} not found in class \"{name}\", " +
-        "the expression level for this gene will be considered NaN",
-        ARGS.out_log)
-    
-    print("Execution succeded")
+
+    print("Execution succeeded")
 
 ###############################################################################
 if __name__ == "__main__":