# COBRAxy/ras_generator.py

"""
Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.

The script reads a tabular dataset (genes x samples) and a rules file (GPRs),
computes RAS per reaction for each sample/cell line, and writes a tabular output.
"""
from __future__ import division
import sys
import argparse
import pandas as pd
import numpy as np
import utils.general_utils as utils
from typing import List, Dict
import ast

# Optional imports for AnnData mode (not used in ras_generator.py)
try:
    from progressbar import ProgressBar, Bar, Percentage
    from scanpy import AnnData
    from cobra.flux_analysis.variability import find_essential_reactions, find_essential_genes
except ImportError:
    # These are only needed for AnnData mode, not for ras_generator.py
    pass

ERRORS = []
########################## argparse ##########################################
ARGS :argparse.Namespace
def process_args(args:List[str] = None) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list): List of command-line arguments.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage = '%(prog)s [options]',
        description = "process some value's genes to create a comparison's map.")
    
    parser.add_argument("-rl", "--model_upload", type = str,
        help = "path to input file containing the rules")

    parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")
    # Galaxy converts files into .dat, this helps infer the original extension when needed.
    
    parser.add_argument(
        '-n', '--none',
        type = utils.Bool("none"), default = True,
        help = 'if True, ignore NaN values during GPR evaluation (e.g., A or NaN -> A)')
    
    parser.add_argument(
        '-td', '--tool_dir',
        type = str,
        required = True, help = 'your tool directory')
    
    parser.add_argument(
        '-ol', '--out_log',
        type = str,
        help = "Output log")    
    
    parser.add_argument(
        '-in', '--input',
        type = str,
        help = 'input dataset')
    
    parser.add_argument(
        '-ra', '--ras_output',
        type = str,
        required = True, help = 'ras output')

    
    return parser.parse_args(args)
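# Example invocation (an illustrative sketch; the paths below are placeholders):
#
#   python ras_generator.py \
#       -td /path/to/tool_dir \
#       -rl model_rules.tsv \
#       -in expression_dataset.tsv \
#       -ra ras_output.tsv \
#       -ol run.log
#
# Only --tool_dir and --ras_output are marked as required; --none defaults to True,
# so NaN values are ignored during GPR evaluation unless overridden.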

############################ dataset input ####################################
def read_dataset(data :str, name :str) -> pd.DataFrame:
    """
    Read a dataset from a CSV file and return it as a pandas DataFrame.

    Args:
        data (str): Path to the CSV file containing the dataset.
        name (str): Name of the dataset, used in error messages.

    Returns:
        pandas.DataFrame: DataFrame containing the dataset.

    Raises:
        pd.errors.EmptyDataError: If the CSV file is empty.
        sys.exit: If the CSV file has the wrong format, the execution is aborted.
    """
    try:
        dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python', index_col=0)
        dataset = dataset.astype(float)
    except pd.errors.EmptyDataError:
        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
    if len(dataset.columns) < 2:
        sys.exit('Execution aborted: wrong file format of ' + name + '\n')
    return dataset
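# Expected dataset layout (a minimal sketch; gene and sample names are invented):
#
#   GeneID   Sample1   Sample2
#   GENE_A   12.0      0.0
#   GENE_B   3.5       7.1
#
# The file must be tab-separated with a header row, gene identifiers in the first
# column, at least two sample columns, and all expression values convertible to float.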


def load_custom_rules() -> Dict[str,str]:
    """
    Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
    performed, significantly impacting the runtime.

    Returns:
        Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
    """
    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat

    dict_rule = {}

    # First, try parsing with a tab delimiter
    try:
        rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False)

        if not rows:
            raise ValueError("Model tabular file is empty.")
        if len(rows) <= 1:
            raise ValueError("Model tabular must contain at least one data row in addition to the header.")

        id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")

        for line in rows[1:]:
            if len(line) <= idx_gpr:
                utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
                continue

            dict_rule[line[id_idx]] = line[idx_gpr]

    except Exception as e:
        # If parsing with tabs fails, try comma delimiter
        try:
            rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False)

            if not rows:
                raise ValueError("Model tabular file is empty.")
            if len(rows) <= 1:
                raise ValueError("Model tabular must contain at least one data row in addition to the header.")

            id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")

            # Parse again, this time splitting rows on commas
            for line in rows[1:]:
                if len(line) <= idx_gpr:
                    utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
                    continue

                dict_rule[line[id_idx]] = line[idx_gpr]
                    
        except Exception as e2:
            raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}")

    if not dict_rule:
        raise ValueError("No valid rules found in the uploaded file. Please check the file format.")

    return dict_rule
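# Expected rules file layout (a minimal sketch; only the "GPR" header name is
# required by the code, the other identifiers are invented):
#
#   ReactionID    GPR
#   R00001        GENE_A and GENE_B
#   R00002        GENE_C or (GENE_D and GENE_E)
#
# The file may be tab- or comma-separated; utils.findIdxByName locates the
# reaction-ID and GPR columns from the header row.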


"""
Class to compute the RAS values

"""

class RAS_computation:

    def __init__(self, adata=None, model=None, dataset=None, gene_rules=None, rules_total_string=None):
        """
        Initialize RAS computation with two possible input modes:
        
        Mode 1 (Original - for sampling_main.py):
            adata: AnnData object with gene expression (cells × genes)
            model: COBRApy model object with reactions and GPRs
            
        Mode 2 (New - for ras_generator.py):
            dataset: pandas DataFrame with gene expression (genes × samples)
            gene_rules: dict mapping reaction IDs to GPR strings
            rules_total_string: list of all gene names in GPRs (for validation)
        """
        self._logic_operators = ['and', 'or', '(', ')']
        self.val_nan = np.nan
        
        # Determine which mode we're in
        if adata is not None and model is not None:
            # Mode 1: AnnData + COBRApy model (original)
            self._init_from_anndata(adata, model)
        elif dataset is not None and gene_rules is not None:
            # Mode 2: DataFrame + rules dict (ras_generator style)
            self._init_from_dataframe(dataset, gene_rules, rules_total_string)
        else:
            raise ValueError(
                "Invalid initialization. Provide either:\n"
                "  - adata + model (for AnnData input), or\n"
                "  - dataset + gene_rules (for DataFrame input)"
            )
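    # Construction sketches for the two modes (names such as my_adata are illustrative):
    #
    #   # Mode 1: AnnData + COBRApy model
    #   ras = RAS_computation(adata=my_adata, model=my_cobra_model)
    #
    #   # Mode 2: expression DataFrame + GPR dict (ras_generator style)
    #   ras = RAS_computation(dataset=expression_df,
    #                         gene_rules={"R00001": "GENE_A and GENE_B"})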
    
    def _normalize_gene_name(self, gene_name):
        """Normalize gene names by replacing special characters."""
        return gene_name.replace("-", "_").replace(":", "_")
    
    def _normalize_rule(self, rule):
        """Normalize GPR rule: lowercase operators, add spaces around parentheses, normalize gene names."""
        rule = rule.replace("OR", "or").replace("AND", "and")
        rule = rule.replace("(", "( ").replace(")", " )")
        # Normalize gene names in the rule
        tokens = rule.split()
        normalized_tokens = [token if token in self._logic_operators else self._normalize_gene_name(token) for token in tokens]
        return " ".join(normalized_tokens)
    
    def _init_from_anndata(self, adata, model):
        """Initialize from AnnData and COBRApy model (original mode)."""
        # Build the dictionary for the GPRs
        df_reactions = pd.DataFrame(index=[reaction.id for reaction in model.reactions])
        gene_rules = [self._normalize_rule(reaction.gene_reaction_rule) for reaction in model.reactions]
        df_reactions['rule'] = gene_rules
        df_reactions = df_reactions.reset_index()
        df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))
        
        self.dict_rule_reactions = df_reactions.to_dict()['index']

        # build useful structures for RAS computation
        self.model = model
        self.count_adata = adata.copy()
        
        # Normalize gene names in both model and dataset
        model_genes = [self._normalize_gene_name(gene.id) for gene in model.genes]
        dataset_genes = [self._normalize_gene_name(gene) for gene in self.count_adata.var.index]
        self.genes = pd.Index(dataset_genes).intersection(model_genes)
        
        if len(self.genes) == 0:
            raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
        
        self.cell_ids = list(self.count_adata.obs.index.values)
        # Get expression data with normalized gene names
        self.count_df_filtered = self.count_adata.to_df().T
        self.count_df_filtered.index = [self._normalize_gene_name(g) for g in self.count_df_filtered.index]
        self.count_df_filtered = self.count_df_filtered.loc[self.genes]
    
    def _init_from_dataframe(self, dataset, gene_rules, rules_total_string):
        """Initialize from DataFrame and rules dict (ras_generator mode)."""
        reactions = list(gene_rules.keys())
        
        # Build the dictionary for the GPRs
        df_reactions = pd.DataFrame(index=reactions)
        gene_rules_list = [self._normalize_rule(gene_rules[reaction_id]) for reaction_id in reactions]
        df_reactions['rule'] = gene_rules_list
        df_reactions = df_reactions.reset_index()
        df_reactions = df_reactions.groupby('rule').agg(lambda x: sorted(list(x)))

        self.dict_rule_reactions = df_reactions.to_dict()['index']

        # build useful structures for RAS computation
        self.model = None
        self.count_adata = None
        
        # Normalize gene names in dataset
        dataset_normalized = dataset.copy()
        dataset_normalized.index = [self._normalize_gene_name(g) for g in dataset_normalized.index]
        
        # Determine which genes are in both dataset and GPRs
        if rules_total_string is not None:
            rules_genes = [self._normalize_gene_name(g) for g in rules_total_string]
            self.genes = dataset_normalized.index.intersection(rules_genes)
        else:
            # Extract all genes from rules
            all_genes_in_rules = set()
            for rule in gene_rules_list:
                tokens = rule.split()
                for token in tokens:
                    if token not in self._logic_operators:
                        all_genes_in_rules.add(token)
            self.genes = dataset_normalized.index.intersection(all_genes_in_rules)
        
        if len(self.genes) == 0:
            raise ValueError("ERROR: No genes from the count matrix match the metabolic model. Check that gene annotations are consistent between model and dataset.")
        
        self.cell_ids = list(dataset_normalized.columns)
        self.count_df_filtered = dataset_normalized.loc[self.genes]
 
    def compute(self,
                or_expression=np.sum,       # operation to apply for an OR expression (sum, max, mean)
                and_expression=np.min,      # operation to apply for an AND expression (min, sum)
                drop_na_rows=False,         # if True, remove the NaN rows of the RAS matrix
                drop_duplicates=False,      # if True, remove duplicate rows
                ignore_nan=True,            # if True, ignore NaN values in GPR evaluation (e.g., A or NaN -> A)
                print_progressbar=True,     # if True, print the progress bar
                add_count_metadata=True,    # if True, add cell metadata to the RAS AnnData
                add_met_metadata=True,      # if True, add metadata from the metabolic model (GPRs and reaction compartments)
                add_essential_reactions=False,
                add_essential_genes=False
                ):

        self.or_function = or_expression
        self.and_function = and_expression
        
        ras_df = np.full((len(self.dict_rule_reactions), len(self.cell_ids)), np.nan)
        genes_not_mapped = set()  # Track genes not in dataset
        
        if print_progressbar:
            pbar = ProgressBar(widgets=[Percentage(), Bar()], maxval=len(self.dict_rule_reactions)).start()
        
        # Process each unique GPR rule
        for ind, (rule, reaction_ids) in enumerate(self.dict_rule_reactions.items()):
            if len(rule) == 0:
                # Empty rule - keep as NaN
                pass
            else:
                # Extract genes from rule
                rule_genes = [token for token in rule.split() if token not in self._logic_operators]
                rule_genes_unique = list(set(rule_genes))
                
                # Which genes are in the dataset?
                genes_present = [g for g in rule_genes_unique if g in self.genes]
                genes_missing = [g for g in rule_genes_unique if g not in self.genes]
                
                if genes_missing:
                    genes_not_mapped.update(genes_missing)
                
                if len(genes_present) == 0:
                    # No genes in dataset - keep as NaN
                    pass
                elif len(genes_missing) > 0 and not ignore_nan:
                    # Some genes missing and we don't ignore NaN - set to NaN
                    pass
                else:
                    # Evaluate the GPR expression.
                    # Rules containing only "and" (or only "or") are vectorized over all
                    # cells; everything else (including single genes) goes through the AST path.
                    check_only_and = ("and" in rule and "or" not in rule)  # only "and" operators
                    check_only_or = ("or" in rule and "and" not in rule)   # only "or" operators
                    if check_only_and or check_only_or:
                        #or/and sequence
                        matrix = self.count_df_filtered.loc[genes_present].values
                        #compute for all cells
                        if check_only_and: 
                            ras_df[ind] = self.and_function(matrix, axis=0)
                        else:
                            ras_df[ind] = self.or_function(matrix, axis=0)
                    else:
                        # complex expression (e.g. A or (B and C))
                        data = self.count_df_filtered.loc[genes_present]  # dataframe of genes in the GPRs
                        tree = ast.parse(rule, mode="eval").body
                        values_by_cell = [dict(zip(data.index, data[col].values)) for col in data.columns]
                        for j, values in enumerate(values_by_cell):
                            ras_df[ind, j] =self._evaluate_ast(tree, values, self.or_function, self.and_function, ignore_nan)

            if print_progressbar:
                pbar.update(ind + 1)
        
        if print_progressbar:
            pbar.finish()
        
        # Store genes not mapped for later use
        self.genes_not_mapped = sorted(genes_not_mapped)
        
        # create the dataframe of ras (rules x samples)
        ras_df = pd.DataFrame(data=ras_df, index=range(len(self.dict_rule_reactions)), columns=self.cell_ids)
        ras_df['Reactions'] = [reaction_ids for rule, reaction_ids in self.dict_rule_reactions.items()]
        
        reactions_common = pd.DataFrame()
        reactions_common["Reactions"] = ras_df['Reactions']
        reactions_common["proof2"] = ras_df['Reactions']
        reactions_common = reactions_common.explode('Reactions')
        reactions_common = reactions_common.set_index("Reactions")

        ras_df = ras_df.explode("Reactions")
        ras_df = ras_df.set_index("Reactions")

        if drop_na_rows:
            ras_df = ras_df.dropna(how="all")
            
        if drop_duplicates:
            ras_df = ras_df.drop_duplicates()
        
        # If initialized from DataFrame (ras_generator mode), return DataFrame instead of AnnData
        if self.count_adata is None:
            return ras_df, self.genes_not_mapped
        
        # Original AnnData mode: create AnnData structure for RAS
        ras_adata = AnnData(ras_df.T)

        #add metadata
        if add_count_metadata:
            ras_adata.var["common_gprs"] = reactions_common.loc[ras_df.index]
            ras_adata.var["common_gprs"] = ras_adata.var["common_gprs"].apply(lambda x: ",".join(x))
            for el in self.count_adata.obs.columns:
                ras_adata.obs["countmatrix_"+el]=self.count_adata.obs[el]

        if add_met_metadata:
            if self.model is not None and len(self.model.compartments)>0:
                  ras_adata.var['compartments']=[list(self.model.reactions.get_by_id(reaction).compartments) for reaction in ras_adata.var.index]  
                  ras_adata.var['compartments']=ras_adata.var["compartments"].apply(lambda x: ",".join(x))
            
            if self.model is not None:
                ras_adata.var['GPR rule'] = [self.model.reactions.get_by_id(reaction).gene_reaction_rule for reaction in ras_adata.var.index]

        if add_essential_reactions:
            if self.model is not None:
                essential_reactions=find_essential_reactions(self.model)
                essential_reactions=[el.id for el in essential_reactions]            
                ras_adata.var['essential reactions']=["yes" if el in essential_reactions else "no" for el in ras_adata.var.index]
        
        if add_essential_genes:
            if self.model is not None:
                essential_genes=find_essential_genes(self.model)
                essential_genes=[el.id for el in essential_genes]
                ras_adata.var['essential genes']=[" ".join([gene for gene in genes.split()  if gene in essential_genes]) for genes in ras_adata.var["GPR rule"]]
        
        return ras_adata

    def _evaluate_ast(self, node, values, or_function, and_function, ignore_nan):
        """
        Evaluate a boolean expression using AST (Abstract Syntax Tree).
        Handles all GPR types: single gene, simple (A and B), nested (A or (B and C)).
        
        Args:
            node: AST node to evaluate
            values: Dictionary mapping gene names to their expression values
            or_function: Function to apply for OR operations
            and_function: Function to apply for AND operations
            ignore_nan: If True, ignore None/NaN values (e.g., A or None -> A)
            
        Returns:
            Evaluated expression result (float or np.nan)
        """
        if isinstance(node, ast.BoolOp):
            # Boolean operation (and/or)
            vals = [self._evaluate_ast(v, values, or_function, and_function, ignore_nan) for v in node.values]
            
            if ignore_nan:
                # Filter out None/NaN values
                vals = [v for v in vals if v is not None and not (isinstance(v, float) and np.isnan(v))]
            
            if not vals:
                return np.nan
            
            if isinstance(node.op, ast.Or):
                return or_function(vals)
            elif isinstance(node.op, ast.And):
                return and_function(vals)

        elif isinstance(node, ast.Name):
            # Variable (gene name)
            return values.get(node.id, None)
        elif isinstance(node, ast.Constant):
            # Constant (shouldn't happen in GPRs, but handle it)
            return values.get(str(node.value), None)
        else:
            raise ValueError(f"Unexpected node type in GPR: {ast.dump(node)}")


# ============================================================================
# STANDALONE FUNCTION FOR RAS_GENERATOR COMPATIBILITY
# ============================================================================

def computeRAS(
    dataset, 
    gene_rules, 
    rules_total_string,
    or_function=np.sum,
    and_function=np.min,
    ignore_nan=True
):
    """
    Compute RAS from tabular data and GPR rules (ras_generator.py compatible).
    
    This is a standalone function that wraps the RAS_computation class
    to provide the same interface as ras_generator.py.
    
    Args:
        dataset: pandas DataFrame with gene expression (genes × samples)
        gene_rules: dict mapping reaction IDs to GPR strings
        rules_total_string: list of all gene names in GPRs
        or_function: function for OR operations (default: np.sum)
        and_function: function for AND operations (default: np.min)
        ignore_nan: if True, ignore NaN in GPR evaluation (default: True)
    
    Returns:
        tuple: (ras_df, genes_not_mapped)
            - ras_df: DataFrame with RAS values (reactions × samples)
            - genes_not_mapped: list of genes in GPRs not found in dataset
    """
    # Create RAS computation object in DataFrame mode
    ras_obj = RAS_computation(
        dataset=dataset,
        gene_rules=gene_rules,
        rules_total_string=rules_total_string
    )
    
    # Compute RAS
    result = ras_obj.compute(
        or_expression=or_function,
        and_expression=and_function,
        ignore_nan=ignore_nan,
        print_progressbar=False,  # No progress bar for ras_generator
        add_count_metadata=False,  # No metadata in DataFrame mode
        add_met_metadata=False,
        add_essential_reactions=False,
        add_essential_genes=False
    )
    
    # Result is a tuple (ras_df, genes_not_mapped) in DataFrame mode
    return result
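# Usage sketch for computeRAS (illustrative data, not executed by this module):
#
#   expression = pd.DataFrame(
#       {"Sample1": [2.0, 3.0, 1.0], "Sample2": [0.0, 5.0, 4.0]},
#       index=["GENE_A", "GENE_B", "GENE_C"])
#   rules = {"R00001": "GENE_A or (GENE_B and GENE_C)"}
#   ras_df, not_mapped = computeRAS(expression, rules,
#                                   ["GENE_A", "GENE_B", "GENE_C"])
#
# With the default or/and functions, R00001 in Sample1 evaluates to
# 2.0 + min(3.0, 1.0) = 3.0.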

def main(args:List[str] = None) -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.
    
    Returns:
        None
    """
    # get args from frontend (related xml)
    global ARGS
    ARGS = process_args(args)

    # read dataset and remove versioning from gene names
    dataset = read_dataset(ARGS.input, "dataset")
    orig_gene_list = dataset.index.copy()
    dataset.index = [str(el.split(".")[0]) for el in dataset.index]

    #load GPR rules
    rules = load_custom_rules()
    
    # build the list of all tokens (gene names and "and"/"or" operators) appearing in the GPRs
    rules_total_string = ""
    for _id, rule in rules.items():
        rules_total_string += rule.replace("(", "").replace(")", "") + " "
    rules_total_string = list(set(rules_total_string.split(" ")))

    if any(dataset.index.duplicated(keep=False)):
        genes_duplicates=orig_gene_list[dataset.index.duplicated(keep=False)]
        genes_duplicates_in_model=[elem for elem in genes_duplicates if elem in rules_total_string]

        if len(genes_duplicates_in_model) > 0:  # metabolic genes have duplicated entries in the dataset
            list_str = ", ".join(genes_duplicates_in_model)
            list_genes = "ERROR: Duplicate entries in the gene dataset are present in one or more GPRs. The following metabolic genes are duplicated: " + list_str
            raise ValueError(list_genes)
        else:
            list_str = ", ".join(genes_duplicates)
            list_genes = "INFO: Duplicate entries in the gene dataset. The following genes are duplicated in the dataset but not mentioned in the GPRs: " + list_str
            utils.logWarning(list_genes, ARGS.out_log)

    # check whether NaN values must be ignored during GPR evaluation
    if ARGS.none:
        # e.g. (A or nan --> A)
        ignore_nan = True
    else:
        # e.g. (A or nan --> nan)
        ignore_nan = False
    
    # compute RAS
    ras_df, genes_not_mapped = computeRAS(dataset, rules,
                    rules_total_string,
                    or_function=np.sum,    # operation to apply for an OR expression (max, sum, mean)
                    and_function=np.min,
                    ignore_nan=ignore_nan)
    
    # save to tabular output, replacing NaN with "None"
    ras_df.replace([np.nan, None], "None").to_csv(ARGS.ras_output, sep='\t')

    # report genes mentioned in the GPRs but absent from the dataset
    if len(genes_not_mapped) > 0:
        genes_not_mapped_str = ", ".join(genes_not_mapped)
        utils.logWarning(
            "INFO: The following genes are mentioned in the GPR rules but don't appear in the dataset: " + genes_not_mapped_str,
            ARGS.out_log)
 
    print("Execution succeeded")

###############################################################################
if __name__ == "__main__":
    main()