| 
93
 | 
     1 from __future__ import division
 | 
| 
 | 
     2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason.
 | 
| 
 | 
     3 import sys
 | 
| 
 | 
     4 import argparse
 | 
| 
 | 
     5 import collections
 | 
| 
 | 
     6 import pandas as pd
 | 
| 
 | 
     7 import pickle as pk
 | 
| 
 | 
     8 import utils.general_utils as utils
 | 
| 
 | 
     9 import utils.rule_parsing as ruleUtils
 | 
| 
 | 
    10 from typing import Union, Optional, List, Dict, Tuple, TypeVar
 | 
| 
309
 | 
    11 import os
 | 
| 
93
 | 
    12 
 | 
| 
 | 
    13 ERRORS = []
 | 
| 
 | 
    14 ########################## argparse ##########################################
 | 
| 
 | 
    15 ARGS :argparse.Namespace
 | 
| 
147
 | 
    16 def process_args(args:List[str] = None) -> argparse.Namespace:
 | 
| 
93
 | 
    17     """
 | 
| 
 | 
    18     Processes command-line arguments.
 | 
| 
 | 
    19 
 | 
| 
 | 
    20     Args:
 | 
| 
 | 
    21         args (list): List of command-line arguments.
 | 
| 
 | 
    22 
 | 
| 
 | 
    23     Returns:
 | 
| 
 | 
    24         Namespace: An object containing parsed arguments.
 | 
| 
 | 
    25     """
 | 
| 
 | 
    26     parser = argparse.ArgumentParser(
 | 
| 
 | 
    27         usage = '%(prog)s [options]',
 | 
| 
 | 
    28         description = "process some value's genes to create a comparison's map.")
 | 
| 
 | 
    29     
 | 
| 
 | 
    30     parser.add_argument(
 | 
| 
 | 
    31         '-rs', '--rules_selector', 
 | 
| 
265
 | 
    32         type = utils.Model, default = utils.Model.ENGRO2, choices = list(utils.Model),
 | 
| 
93
 | 
    33         help = 'chose which type of dataset you want use')
 | 
| 
 | 
    34     
 | 
| 
 | 
    35     parser.add_argument("-rl", "--rule_list", type = str,
 | 
| 
 | 
    36         help = "path to input file with custom rules, if provided")
 | 
| 
 | 
    37 
 | 
| 
 | 
    38     parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name")
 | 
| 
 | 
    39     # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in
 | 
| 
 | 
    40     
 | 
| 
 | 
    41     parser.add_argument(
 | 
| 
 | 
    42         '-n', '--none',
 | 
| 
 | 
    43         type = utils.Bool("none"), default = True,
 | 
| 
 | 
    44         help = 'compute Nan values')
 | 
| 
 | 
    45     
 | 
| 
 | 
    46     parser.add_argument(
 | 
| 
 | 
    47         '-td', '--tool_dir',
 | 
| 
 | 
    48         type = str,
 | 
| 
 | 
    49         required = True, help = 'your tool directory')
 | 
| 
 | 
    50     
 | 
| 
 | 
    51     parser.add_argument(
 | 
| 
 | 
    52         '-ol', '--out_log',
 | 
| 
 | 
    53         type = str,
 | 
| 
 | 
    54         help = "Output log")    
 | 
| 
 | 
    55     
 | 
| 
 | 
    56     parser.add_argument(
 | 
| 
 | 
    57         '-in', '--input', #id รจ diventato in
 | 
| 
 | 
    58         type = str,
 | 
| 
 | 
    59         help = 'input dataset')
 | 
| 
 | 
    60     
 | 
| 
 | 
    61     parser.add_argument(
 | 
| 
 | 
    62         '-ra', '--ras_output',
 | 
| 
 | 
    63         type = str,
 | 
| 
 | 
    64         required = True, help = 'ras output')
 | 
| 
147
 | 
    65 
 | 
| 
93
 | 
    66     
 | 
| 
147
 | 
    67     return parser.parse_args(args)
 | 
| 
93
 | 
    68 
 | 
| 
 | 
    69 ############################ dataset input ####################################
 | 
| 
 | 
    70 def read_dataset(data :str, name :str) -> pd.DataFrame:
 | 
| 
 | 
    71     """
 | 
| 
 | 
    72     Read a dataset from a CSV file and return it as a pandas DataFrame.
 | 
| 
 | 
    73 
 | 
| 
 | 
    74     Args:
 | 
| 
 | 
    75         data (str): Path to the CSV file containing the dataset.
 | 
| 
 | 
    76         name (str): Name of the dataset, used in error messages.
 | 
| 
 | 
    77 
 | 
| 
 | 
    78     Returns:
 | 
| 
 | 
    79         pandas.DataFrame: DataFrame containing the dataset.
 | 
| 
 | 
    80 
 | 
| 
 | 
    81     Raises:
 | 
| 
 | 
    82         pd.errors.EmptyDataError: If the CSV file is empty.
 | 
| 
 | 
    83         sys.exit: If the CSV file has the wrong format, the execution is aborted.
 | 
| 
 | 
    84     """
 | 
| 
 | 
    85     try:
 | 
| 
 | 
    86         dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
 | 
| 
 | 
    87     except pd.errors.EmptyDataError:
 | 
| 
 | 
    88         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
    89     if len(dataset.columns) < 2:
 | 
| 
 | 
    90         sys.exit('Execution aborted: wrong format of ' + name + '\n')
 | 
| 
 | 
    91     return dataset
 | 
| 
 | 
    92 
 | 
| 
 | 
    93 ############################ load id e rules ##################################
 | 
| 
 | 
    94 def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
 | 
| 
 | 
    95     """
 | 
| 
 | 
    96     Load IDs and rules from a dictionary of reactions.
 | 
| 
 | 
    97 
 | 
| 
 | 
    98     Args:
 | 
| 
 | 
    99         reactions (dict): A dictionary where keys are IDs and values are rules.
 | 
| 
 | 
   100 
 | 
| 
 | 
   101     Returns:
 | 
| 
 | 
   102         tuple: A tuple containing two lists, the first list containing IDs and the second list containing rules.
 | 
| 
 | 
   103     """
 | 
| 
 | 
   104     ids, rules = [], []
 | 
| 
 | 
   105     for key, value in reactions.items():
 | 
| 
 | 
   106             ids.append(key)
 | 
| 
 | 
   107             rules.append(value)
 | 
| 
 | 
   108     return (ids, rules)
 | 
| 
 | 
   109 
 | 
| 
 | 
   110 ############################ check_methods ####################################
 | 
| 
 | 
   111 def gene_type(l :str, name :str) -> str:
 | 
| 
 | 
   112     """
 | 
| 
 | 
   113     Determine the type of gene ID.
 | 
| 
 | 
   114 
 | 
| 
 | 
   115     Args:
 | 
| 
 | 
   116         l (str): The gene identifier to check.
 | 
| 
 | 
   117         name (str): The name of the dataset, used in error messages.
 | 
| 
 | 
   118 
 | 
| 
 | 
   119     Returns:
 | 
| 
 | 
   120         str: The type of gene ID ('hugo_id', 'ensembl_gene_id', 'symbol', or 'entrez_id').
 | 
| 
 | 
   121 
 | 
| 
 | 
   122     Raises:
 | 
| 
 | 
   123         sys.exit: If the gene ID type is not supported, the execution is aborted.
 | 
| 
 | 
   124     """
 | 
| 
 | 
   125     if check_hgnc(l):
 | 
| 
 | 
   126         return 'hugo_id'
 | 
| 
 | 
   127     elif check_ensembl(l):
 | 
| 
 | 
   128         return 'ensembl_gene_id'
 | 
| 
 | 
   129     elif check_symbol(l):
 | 
| 
 | 
   130         return 'symbol'
 | 
| 
 | 
   131     elif check_entrez(l):
 | 
| 
 | 
   132         return 'entrez_id'
 | 
| 
 | 
   133     else:
 | 
| 
 | 
   134         sys.exit('Execution aborted:\n' +
 | 
| 
 | 
   135                  'gene ID type in ' + name + ' not supported. Supported ID'+
 | 
| 
 | 
   136                  'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
 | 
| 
 | 
   137 
 | 
| 
 | 
   138 def check_hgnc(l :str) -> bool:
 | 
| 
 | 
   139     """
 | 
| 
 | 
   140     Check if a gene identifier follows the HGNC format.
 | 
| 
 | 
   141 
 | 
| 
 | 
   142     Args:
 | 
| 
 | 
   143         l (str): The gene identifier to check.
 | 
| 
 | 
   144 
 | 
| 
 | 
   145     Returns:
 | 
| 
 | 
   146         bool: True if the gene identifier follows the HGNC format, False otherwise.
 | 
| 
 | 
   147     """
 | 
| 
 | 
   148     if len(l) > 5:
 | 
| 
 | 
   149         if (l.upper()).startswith('HGNC:'):
 | 
| 
 | 
   150             return l[5:].isdigit()
 | 
| 
 | 
   151         else:
 | 
| 
 | 
   152             return False
 | 
| 
 | 
   153     else:
 | 
| 
 | 
   154         return False
 | 
| 
 | 
   155 
 | 
| 
 | 
   156 def check_ensembl(l :str) -> bool:
 | 
| 
 | 
   157     """
 | 
| 
 | 
   158     Check if a gene identifier follows the Ensembl format.
 | 
| 
 | 
   159 
 | 
| 
 | 
   160     Args:
 | 
| 
 | 
   161         l (str): The gene identifier to check.
 | 
| 
 | 
   162 
 | 
| 
 | 
   163     Returns:
 | 
| 
 | 
   164         bool: True if the gene identifier follows the Ensembl format, False otherwise.
 | 
| 
 | 
   165     """
 | 
| 
 | 
   166     return l.upper().startswith('ENS')
 | 
| 
 | 
   167  
 | 
| 
 | 
   168 
 | 
| 
 | 
   169 def check_symbol(l :str) -> bool:
 | 
| 
 | 
   170     """
 | 
| 
 | 
   171     Check if a gene identifier follows the symbol format.
 | 
| 
 | 
   172 
 | 
| 
 | 
   173     Args:
 | 
| 
 | 
   174         l (str): The gene identifier to check.
 | 
| 
 | 
   175 
 | 
| 
 | 
   176     Returns:
 | 
| 
 | 
   177         bool: True if the gene identifier follows the symbol format, False otherwise.
 | 
| 
 | 
   178     """
 | 
| 
 | 
   179     if len(l) > 0:
 | 
| 
 | 
   180         if l[0].isalpha() and l[1:].isalnum():
 | 
| 
 | 
   181             return True
 | 
| 
 | 
   182         else:
 | 
| 
 | 
   183             return False
 | 
| 
 | 
   184     else:
 | 
| 
 | 
   185         return False
 | 
| 
 | 
   186 
 | 
| 
 | 
   187 def check_entrez(l :str) -> bool:
 | 
| 
 | 
   188     """
 | 
| 
 | 
   189     Check if a gene identifier follows the Entrez ID format.
 | 
| 
 | 
   190 
 | 
| 
 | 
   191     Args:
 | 
| 
 | 
   192         l (str): The gene identifier to check.
 | 
| 
 | 
   193 
 | 
| 
 | 
   194     Returns:
 | 
| 
 | 
   195         bool: True if the gene identifier follows the Entrez ID format, False otherwise.
 | 
| 
 | 
   196     """ 
 | 
| 
 | 
   197     if len(l) > 0:
 | 
| 
 | 
   198         return l.isdigit()
 | 
| 
 | 
   199     else: 
 | 
| 
 | 
   200         return False
 | 
| 
 | 
   201 
 | 
| 
 | 
   202 ############################ gene #############################################
 | 
| 
 | 
   203 def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
 | 
| 
 | 
   204     """
 | 
| 
 | 
   205     Process gene data to ensure correct formatting and handle duplicates.
 | 
| 
 | 
   206 
 | 
| 
 | 
   207     Args:
 | 
| 
 | 
   208         gene (DataFrame): DataFrame containing gene data.
 | 
| 
 | 
   209         type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
 | 
| 
 | 
   210         name (str): Name of the dataset.
 | 
| 
 | 
   211         gene_custom (dict or None): Custom gene data dictionary if provided.
 | 
| 
 | 
   212 
 | 
| 
 | 
   213     Returns:
 | 
| 
 | 
   214         dict: A dictionary containing gene data with gene IDs as keys and corresponding values.
 | 
| 
 | 
   215     """
 | 
| 
309
 | 
   216  
 | 
| 
93
 | 
   217     for i in range(len(gene)):
 | 
| 
 | 
   218         tmp = gene.iloc[i, 0]
 | 
| 
 | 
   219         gene.iloc[i, 0] = tmp.strip().split('.')[0]
 | 
| 
 | 
   220 
 | 
| 
 | 
   221     gene_dup = [item for item, count in 
 | 
| 
 | 
   222                collections.Counter(gene[gene.columns[0]]).items() if count > 1]
 | 
| 
 | 
   223     pat_dup = [item for item, count in 
 | 
| 
 | 
   224                collections.Counter(list(gene.columns)).items() if count > 1]
 | 
| 
260
 | 
   225     
 | 
| 
 | 
   226     gene_in_rule = None
 | 
| 
259
 | 
   227 
 | 
| 
93
 | 
   228     if gene_dup:
 | 
| 
 | 
   229         if gene_custom == None:
 | 
| 
264
 | 
   230 
 | 
| 
309
 | 
   231             if str(ARGS.rules_selector) == 'HMRcore':
 | 
| 
 | 
   232                 gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/HMRcore_genes.p', 'rb'))
 | 
| 
93
 | 
   233             
 | 
| 
309
 | 
   234             elif str(ARGS.rules_selector) == 'Recon':
 | 
| 
 | 
   235                 gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/Recon_genes.p', 'rb'))
 | 
| 
93
 | 
   236             
 | 
| 
309
 | 
   237             elif str(ARGS.rules_selector) == 'ENGRO2':
 | 
| 
 | 
   238                 gene_in_rule = pk.load(open(ARGS.tool_dir + '/local/pickle files/ENGRO2_genes.p', 'rb'))
 | 
| 
263
 | 
   239 
 | 
| 
309
 | 
   240             utils.logWarning(f"{ARGS.tool_dir}'/local/pickle files/ENGRO2_genes.p'", ARGS.out_log)
 | 
| 
259
 | 
   241 
 | 
| 
93
 | 
   242             gene_in_rule = gene_in_rule.get(type_gene)
 | 
| 
 | 
   243         
 | 
| 
 | 
   244         else:
 | 
| 
 | 
   245             gene_in_rule = gene_custom
 | 
| 
260
 | 
   246 
 | 
| 
93
 | 
   247         tmp = []
 | 
| 
 | 
   248         for i in gene_dup:
 | 
| 
 | 
   249             if gene_in_rule.get(i) == 'ok':
 | 
| 
 | 
   250                 tmp.append(i)
 | 
| 
 | 
   251         if tmp:
 | 
| 
 | 
   252             sys.exit('Execution aborted because gene ID '
 | 
| 
 | 
   253                      +str(tmp)+' in '+name+' is duplicated\n')
 | 
| 
 | 
   254     
 | 
| 
 | 
   255     if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
 | 
| 
 | 
   256     return (gene.set_index(gene.columns[0])).to_dict()
 | 
| 
 | 
   257 
 | 
| 
 | 
   258 ############################ resolve ##########################################
 | 
| 
 | 
   259 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]:
 | 
| 
 | 
   260     """
 | 
| 
 | 
   261     Replace gene identifiers with corresponding values from a dictionary.
 | 
| 
 | 
   262 
 | 
| 
 | 
   263     Args:
 | 
| 
 | 
   264         l (str): String of gene identifier.
 | 
| 
 | 
   265         d (str): String corresponding to its value.
 | 
| 
 | 
   266 
 | 
| 
 | 
   267     Returns:
 | 
| 
 | 
   268         tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement.
 | 
| 
 | 
   269     """
 | 
| 
 | 
   270     tmp = []
 | 
| 
 | 
   271     err = []
 | 
| 
 | 
   272     while l:
 | 
| 
 | 
   273         if isinstance(l[0], list):
 | 
| 
 | 
   274             tmp_rules, tmp_err = replace_gene_value(l[0], d)
 | 
| 
 | 
   275             tmp.append(tmp_rules)
 | 
| 
 | 
   276             err.extend(tmp_err)
 | 
| 
 | 
   277         else:
 | 
| 
 | 
   278             value = replace_gene(l[0], d)
 | 
| 
 | 
   279             tmp.append(value)
 | 
| 
 | 
   280             if value == None:
 | 
| 
 | 
   281                 err.append(l[0])
 | 
| 
 | 
   282         l = l[1:]
 | 
| 
 | 
   283     return (tmp, err)
 | 
| 
 | 
   284 
 | 
| 
 | 
   285 def replace_gene(l :str, d :str) -> Union[int, float]:
 | 
| 
 | 
   286     """
 | 
| 
 | 
   287     Replace a single gene identifier with its corresponding value from a dictionary.
 | 
| 
 | 
   288 
 | 
| 
 | 
   289     Args:
 | 
| 
 | 
   290         l (str): Gene identifier to replace.
 | 
| 
 | 
   291         d (str): String corresponding to its value.
 | 
| 
 | 
   292 
 | 
| 
 | 
   293     Returns:
 | 
| 
 | 
   294         float/int: Corresponding value from the dictionary if found, None otherwise.
 | 
| 
 | 
   295 
 | 
| 
 | 
   296     Raises:
 | 
| 
 | 
   297         sys.exit: If the value associated with the gene identifier is not valid.
 | 
| 
 | 
   298     """
 | 
| 
 | 
   299     if l =='and' or l == 'or':
 | 
| 
 | 
   300         return l
 | 
| 
 | 
   301     else:
 | 
| 
 | 
   302         value = d.get(l, None)
 | 
| 
 | 
   303         if not(value == None or isinstance(value, (int, float))):
 | 
| 
 | 
   304             sys.exit('Execution aborted: ' + value + ' value not valid\n')
 | 
| 
 | 
   305         return value
 | 
| 
 | 
   306 
 | 
| 
 | 
   307 T = TypeVar("T", bound = Optional[Union[int, float]])
 | 
| 
 | 
   308 def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
 | 
| 
 | 
   309     """
 | 
| 
 | 
   310     Compute the RAS value between two value and an operator ('and' or 'or').
 | 
| 
 | 
   311 
 | 
| 
 | 
   312     Args:
 | 
| 
 | 
   313         val1(Optional(Union[float, int])): First value.
 | 
| 
 | 
   314         op (str): Operator ('and' or 'or').
 | 
| 
 | 
   315         val2(Optional(Union[float, int])): Second value.
 | 
| 
 | 
   316         cn (bool): Control boolean value.
 | 
| 
 | 
   317 
 | 
| 
 | 
   318     Returns:
 | 
| 
 | 
   319         Optional(Union[float, int]): Result of the computation.
 | 
| 
 | 
   320     """
 | 
| 
 | 
   321     if val1 != None and val2 != None:
 | 
| 
 | 
   322         if op == 'and':
 | 
| 
 | 
   323             return min(val1, val2)
 | 
| 
 | 
   324         else:
 | 
| 
 | 
   325             return val1 + val2
 | 
| 
 | 
   326     elif op == 'and':
 | 
| 
 | 
   327         if cn is True:
 | 
| 
 | 
   328             if val1 != None:
 | 
| 
 | 
   329                 return val1
 | 
| 
 | 
   330             elif val2 != None:
 | 
| 
 | 
   331                 return val2
 | 
| 
 | 
   332             else:
 | 
| 
 | 
   333                 return None
 | 
| 
 | 
   334         else:
 | 
| 
 | 
   335             return None
 | 
| 
 | 
   336     else:
 | 
| 
 | 
   337         if val1 != None:
 | 
| 
 | 
   338             return val1
 | 
| 
 | 
   339         elif val2 != None:
 | 
| 
 | 
   340             return val2
 | 
| 
 | 
   341         else:
 | 
| 
 | 
   342             return None
 | 
| 
 | 
   343 
 | 
| 
 | 
   344 # ris should be Literal[None] but Literal is not supported in Python 3.7
 | 
| 
 | 
   345 def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
 | 
| 
 | 
   346     """
 | 
| 
 | 
   347     Control the format of the expression.
 | 
| 
 | 
   348 
 | 
| 
 | 
   349     Args:
 | 
| 
 | 
   350         ris: Intermediate result.
 | 
| 
 | 
   351         l (list): Expression to control.
 | 
| 
 | 
   352         cn (bool): Control boolean value.
 | 
| 
 | 
   353 
 | 
| 
 | 
   354     Returns:
 | 
| 
 | 
   355         Union[Literal[False], int, float]: Result of the control.
 | 
| 
 | 
   356     """
 | 
| 
 | 
   357     if len(l) == 1:
 | 
| 
 | 
   358         if isinstance(l[0], (float, int)) or l[0] == None:
 | 
| 
 | 
   359             return l[0]
 | 
| 
 | 
   360         elif isinstance(l[0], list):
 | 
| 
 | 
   361             return control(None, l[0], cn)
 | 
| 
 | 
   362         else:
 | 
| 
 | 
   363             return False
 | 
| 
 | 
   364     elif len(l) > 2:
 | 
| 
 | 
   365         return control_list(ris, l, cn)
 | 
| 
 | 
   366     else:
 | 
| 
 | 
   367         return False
 | 
| 
 | 
   368 
 | 
| 
 | 
def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
    """
    Control the format of a list of expressions and evaluate it left to right.

    The list is consumed from the front, one pattern at a time:
      * `<value> <op> <operand>`  : three items, result is computes(value, op, operand);
      * `<op> <operand>`          : two items, result is computes(ris, op, operand),
                                    i.e. the running result is extended;
      * `<list> <op> <operand>`   : three items, the nested list is evaluated via control first.
    `<value>` is a number or None, `<op>` is 'and' or 'or', and `<operand>` is a
    number, None, or a nested list (evaluated via control).

    NOTE(review): the two three-item patterns overwrite `ris` without using its
    previous value, so a running result is only carried across `<op> <operand>` steps.
    NOTE(review): only len(l) == 1 is guarded; if exactly two items remain and the
    first one is a value or a list followed by an operator, `l[2]` raises IndexError
    instead of returning False.

    Args:
        ris: Intermediate result.
        l (list): List of expressions to control.
        cn (bool): Control boolean value, forwarded to computes and control.

    Returns:
        Optional[Literal[False]]: False when the list is malformed; otherwise the final
        value of `ris`, which may be a number or None despite the annotation.
    """
    while l:
        if len(l) == 1:
            # A single leftover item cannot form any of the patterns.
            return False
        elif (isinstance(l[0], (float, int)) or
              l[0] == None) and l[1] in ['and', 'or']:
            # Pattern: <value> <op> <operand>
            if isinstance(l[2], (float, int)) or l[2] == None:
                ris = computes(l[0], l[1], l[2], cn)
            elif isinstance(l[2], list):
                tmp = control(None, l[2], cn)
                if tmp is False:
                    return False
                else:
                    ris = computes(l[0], l[1], tmp, cn)
            else:
                return False
            l = l[3:]
        elif l[0] in ['and', 'or']:
            # Pattern: <op> <operand>, combined with the running result
            if isinstance(l[1], (float, int)) or l[1] == None:
                ris = computes(ris, l[0], l[1], cn)
            elif isinstance(l[1], list):
                tmp = control(None,l[1], cn)
                if tmp is False:
                    return False
                else:
                    ris = computes(ris, l[0], tmp, cn)
            else:
                return False
            l = l[2:]
        elif isinstance(l[0], list) and l[1] in ['and', 'or']:
            # Pattern: <list> <op> <operand>
            if isinstance(l[2], (float, int)) or l[2] == None:
                tmp = control(None, l[0], cn)
                if tmp is False:
                    return False
                else:
                    ris = computes(tmp, l[1], l[2], cn)
            elif isinstance(l[2], list):
                tmp = control(None, l[0], cn)
                tmp2 = control(None, l[2], cn)
                if tmp is False or tmp2 is False:
                    return False
                else:
                    ris = computes(tmp, l[1], tmp2, cn)
            else:
                return False
            l = l[3:]
        else:
            return False
    return ris
 | 
| 
 | 
   429 
 | 
| 
 | 
   430 ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
 | 
| 
 | 
   431 def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
 | 
| 
 | 
   432     """
 | 
| 
 | 
   433     Resolve rules using gene data to compute scores for each rule.
 | 
| 
 | 
   434 
 | 
| 
 | 
   435     Args:
 | 
| 
 | 
   436         genes (dict): Dictionary containing gene data with gene IDs as keys and corresponding values.
 | 
| 
 | 
   437         rules (list): List of rules to resolve.
 | 
| 
 | 
   438         ids (list): List of IDs corresponding to the rules.
 | 
| 
 | 
   439         resolve_none (bool): Flag indicating whether to resolve None values in the rules.
 | 
| 
 | 
   440         name (str): Name of the dataset.
 | 
| 
 | 
   441 
 | 
| 
 | 
   442     Returns:
 | 
| 
 | 
   443         tuple: A tuple containing resolved rules as a dictionary and a list of gene IDs not found in the data.
 | 
| 
 | 
   444     """
 | 
| 
 | 
   445     resolve_rules = {}
 | 
| 
 | 
   446     not_found = []
 | 
| 
 | 
   447     flag = False
 | 
| 
 | 
   448     for key, value in genes.items():
 | 
| 
 | 
   449         tmp_resolve = []
 | 
| 
 | 
   450         for i in range(len(rules)):
 | 
| 
 | 
   451             tmp = rules[i]
 | 
| 
 | 
   452             if tmp:
 | 
| 
 | 
   453                 tmp, err = replace_gene_value(tmp, value)
 | 
| 
 | 
   454                 if err:
 | 
| 
 | 
   455                     not_found.extend(err)
 | 
| 
 | 
   456                 ris = control(None, tmp, resolve_none)
 | 
| 
 | 
   457                 if ris is False or ris == None:
 | 
| 
 | 
   458                     tmp_resolve.append(None)
 | 
| 
 | 
   459                 else:
 | 
| 
 | 
   460                     tmp_resolve.append(ris)
 | 
| 
 | 
   461                     flag = True
 | 
| 
 | 
   462             else:
 | 
| 
 | 
   463                 tmp_resolve.append(None)    
 | 
| 
 | 
   464         resolve_rules[key] = tmp_resolve
 | 
| 
 | 
   465     
 | 
| 
 | 
   466     if flag is False:
 | 
| 
 | 
   467         utils.logWarning(
 | 
| 
 | 
   468             f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
 | 
| 
 | 
   469             ARGS.out_log)
 | 
| 
 | 
   470         
 | 
| 
 | 
   471         return (None, None)
 | 
| 
 | 
   472     
 | 
| 
 | 
   473     return (resolve_rules, list(set(not_found)))
 | 
| 
 | 
   474 ############################ create_ras #######################################
 | 
| 
 | 
   475 def create_ras(resolve_rules: Optional[ResolvedRules], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
 | 
| 
 | 
   476     """
 | 
| 
 | 
   477     Create a RAS (Reaction Activity Score) file from resolved rules.
 | 
| 
 | 
   478 
 | 
| 
 | 
   479     Args:
 | 
| 
 | 
   480         resolve_rules (dict): Dictionary containing resolved rules.
 | 
| 
 | 
   481         dataset_name (str): Name of the dataset.
 | 
| 
 | 
   482         rules (list): List of rules.
 | 
| 
 | 
   483         file (str): Path to the output RAS file.
 | 
| 
 | 
   484 
 | 
| 
 | 
   485     Returns:
 | 
| 
 | 
   486         None
 | 
| 
 | 
   487     """
 | 
| 
 | 
   488     if resolve_rules is None:
 | 
| 
 | 
   489         utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
 | 
| 
 | 
   490 
 | 
| 
 | 
   491     for geni in resolve_rules.values():
 | 
| 
 | 
   492         for i, valori in enumerate(geni):
 | 
| 
 | 
   493             if valori == None:
 | 
| 
 | 
   494                 geni[i] = 'None'
 | 
| 
 | 
   495                 
 | 
| 
 | 
   496     output_ras = pd.DataFrame.from_dict(resolve_rules)
 | 
| 
 | 
   497     
 | 
| 
 | 
   498     output_ras.insert(0, 'Reactions', ids)
 | 
| 
 | 
   499     output_to_csv = pd.DataFrame.to_csv(output_ras, sep = '\t', index = False)
 | 
| 
 | 
   500     
 | 
| 
 | 
   501     text_file = open(file, "w")
 | 
| 
 | 
   502     
 | 
| 
 | 
   503     text_file.write(output_to_csv)
 | 
| 
 | 
   504     text_file.close()
 | 
| 
 | 
   505 
 | 
| 
 | 
   506 ################################- NEW RAS COMPUTATION -################################
 | 
| 
 | 
   507 Expr = Optional[Union[int, float]]
 | 
| 
 | 
   508 Ras  = Expr
 | 
| 
 | 
   509 def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
 | 
| 
 | 
   510     """
 | 
| 
 | 
   511     Generates the RAS scores for each cell line found in the dataset.
 | 
| 
 | 
   512 
 | 
| 
 | 
   513     Args:
 | 
| 
 | 
   514         dataset (pd.DataFrame): Dataset containing gene values.
 | 
| 
 | 
   515         rules (dict): The dict containing reaction ids as keys and rules as values.
 | 
| 
 | 
   516 
 | 
| 
 | 
   517     Side effects:
 | 
| 
 | 
   518         dataset : mut
 | 
| 
 | 
   519     
 | 
| 
 | 
   520     Returns:
 | 
| 
 | 
   521         dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
 | 
| 
 | 
   522         where each key corresponds to a reaction ID and each value is its computed RAS score.
 | 
| 
 | 
   523     """
 | 
| 
 | 
   524     ras_values_by_cell_line = {}
 | 
| 
 | 
   525     dataset.set_index(dataset.columns[0], inplace=True)
 | 
| 
 | 
   526     # Considera tutte le colonne tranne la prima in cui ci sono gli hugo quindi va scartata
 | 
| 
 | 
   527     for cell_line_name in dataset.columns[1:]:
 | 
| 
 | 
   528         cell_line = dataset[cell_line_name].to_dict()
 | 
| 
 | 
   529         ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line)
 | 
| 
 | 
   530     return ras_values_by_cell_line
 | 
| 
 | 
   531 
 | 
| 
 | 
   532 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
 | 
| 
 | 
   533     """
 | 
| 
 | 
   534     Computes the RAS (Reaction Activity Score) values for each rule in the given dict.
 | 
| 
 | 
   535 
 | 
| 
 | 
   536     Args:
 | 
| 
 | 
   537         value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
 | 
| 
 | 
   538         dataset : gene expression data of one cell line.
 | 
| 
 | 
   539 
 | 
| 
 | 
   540     Returns:
 | 
| 
 | 
   541         dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
 | 
| 
 | 
   542     """
 | 
| 
 | 
   543     return {key: ras_op_list(op_list, dataset) for key, op_list in value_rules.items()}
 | 
| 
 | 
   544 
 | 
| 
 | 
   545 def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
 | 
| 
 | 
   546     """
 | 
| 
 | 
   547     Extracts the gene expression of the given gene from a cell line dataset.
 | 
| 
 | 
   548 
 | 
| 
 | 
   549     Args:
 | 
| 
 | 
   550         dataset : gene expression data of one cell line.
 | 
| 
 | 
   551         name : gene name.
 | 
| 
 | 
   552     
 | 
| 
 | 
   553     Returns:
 | 
| 
 | 
   554         Expr : the gene's expression value.
 | 
| 
 | 
   555     """
 | 
| 
 | 
   556     expr = dataset.get(name, None)
 | 
| 
 | 
   557     if expr is None: ERRORS.append(name)
 | 
| 
 | 
   558   
 | 
| 
 | 
   559     return expr
 | 
| 
 | 
   560 
 | 
| 
 | 
   561 def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
 | 
| 
 | 
   562     """
 | 
| 
 | 
   563     Computes recursively the RAS (Reaction Activity Score) value for the given OpList, considering the specified flag to control None behavior.
 | 
| 
 | 
   564 
 | 
| 
 | 
   565     Args:
 | 
| 
 | 
   566         op_list (OpList): The OpList representing a rule with gene values.
 | 
| 
 | 
   567         dataset : gene expression data of one cell line.
 | 
| 
 | 
   568 
 | 
| 
 | 
   569     Returns:
 | 
| 
 | 
   570         Ras: The computed RAS value for the given OpList.
 | 
| 
 | 
   571     """
 | 
| 
 | 
   572     op = op_list.op
 | 
| 
 | 
   573     ras_value :Ras = None
 | 
| 
 | 
   574     if not op: return get_gene_expr(dataset, op_list[0])
 | 
| 
 | 
   575     if op is ruleUtils.RuleOp.AND and not ARGS.none and None in op_list: return None
 | 
| 
 | 
   576 
 | 
| 
 | 
   577     for i in range(len(op_list)):
 | 
| 
 | 
   578         item = op_list[i]
 | 
| 
 | 
   579         if isinstance(item, ruleUtils.OpList):
 | 
| 
 | 
   580             item = ras_op_list(item, dataset)
 | 
| 
 | 
   581 
 | 
| 
 | 
   582         else:
 | 
| 
 | 
   583           item = get_gene_expr(dataset, item)
 | 
| 
 | 
   584 
 | 
| 
 | 
   585         if item is None:
 | 
| 
 | 
   586           if op is ruleUtils.RuleOp.AND and not ARGS.none: return None
 | 
| 
 | 
   587           continue
 | 
| 
 | 
   588 
 | 
| 
 | 
   589         if ras_value is None:
 | 
| 
 | 
   590           ras_value = item
 | 
| 
 | 
   591         else:
 | 
| 
 | 
   592           ras_value = ras_value + item if op is ruleUtils.RuleOp.OR else min(ras_value, item)
 | 
| 
 | 
   593 
 | 
| 
 | 
   594     return ras_value
 | 
| 
 | 
   595 
 | 
| 
 | 
   596 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
 | 
| 
 | 
   597     """
 | 
| 
 | 
   598     Save computed ras scores to the given path, as a tsv file.
 | 
| 
 | 
   599 
 | 
| 
 | 
   600     Args:
 | 
| 
 | 
   601         rasScores : the computed ras scores.
 | 
| 
 | 
   602         path : the output tsv file's path.
 | 
| 
 | 
   603     
 | 
| 
 | 
   604     Returns:
 | 
| 
 | 
   605         None
 | 
| 
 | 
   606     """
 | 
| 
 | 
   607     for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe metod, sadly
 | 
| 
 | 
   608         for reactId, score in scores.items():
 | 
| 
 | 
   609             if score is None: scores[reactId] = "None"
 | 
| 
 | 
   610 
 | 
| 
 | 
   611     output_ras = pd.DataFrame.from_dict(rasScores)
 | 
| 
 | 
   612     output_ras.insert(0, 'Reactions', reactions)
 | 
| 
 | 
   613     output_ras.to_csv(ARGS.ras_output, sep = '\t', index = False)
 | 
| 
 | 
   614 
 | 
| 
 | 
   615 ############################ MAIN #############################################
 | 
| 
 | 
# TODO: currently unused, but keep it: it will be needed once the new translator dicts are in use.
 | 
| 
 | 
   617 def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
 | 
| 
 | 
   618     """
 | 
| 
 | 
   619     Translate gene from any supported encoding to HugoID.
 | 
| 
 | 
   620 
 | 
| 
 | 
   621     Args:
 | 
| 
 | 
   622         geneName (str): the name of the gene in its current encoding.
 | 
| 
 | 
   623         encoding (str): the encoding.
 | 
| 
 | 
   624         geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
 | 
| 
 | 
   625         and encodings in the current model, mapping each to the corresponding HugoID encoding.
 | 
| 
 | 
   626 
 | 
| 
 | 
   627     Raises:
 | 
| 
 | 
   628         ValueError: When the gene isn't supported in the model.
 | 
| 
 | 
   629 
 | 
| 
 | 
   630     Returns:
 | 
| 
 | 
   631         str: the gene in HugoID encoding.
 | 
| 
 | 
   632     """
 | 
| 
 | 
   633     supportedGenesInEncoding = geneTranslator[encoding]
 | 
| 
 | 
   634     if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName]
 | 
| 
 | 
   635     raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")
 | 
| 
 | 
   636 
 | 
| 
 | 
   637 def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
 | 
| 
 | 
   638     """
 | 
| 
 | 
   639     Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be
 | 
| 
 | 
   640     performed, significantly impacting the runtime.
 | 
| 
 | 
   641 
 | 
| 
 | 
   642     Returns:
 | 
| 
 | 
   643         Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
 | 
| 
 | 
   644     """
 | 
| 
 | 
   645     datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat
 | 
| 
 | 
   646     
 | 
| 
 | 
   647     try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext
 | 
| 
 | 
   648     except utils.PathErr as err:
 | 
| 
 | 
   649         raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}")
 | 
| 
 | 
   650      
 | 
| 
 | 
   651     if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath)
 | 
| 
 | 
   652 
 | 
| 
 | 
   653     # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed.
 | 
| 
 | 
   654     return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) }
 | 
| 
 | 
   655 
 | 
| 
147
 | 
   656 def main(args:List[str] = None) -> None:
 | 
| 
93
 | 
   657     """
 | 
| 
 | 
   658     Initializes everything and sets the program in motion based on the fronted input arguments.
 | 
| 
 | 
   659     
 | 
| 
 | 
   660     Returns:
 | 
| 
 | 
   661         None
 | 
| 
 | 
   662     """
 | 
| 
 | 
   663     # get args from frontend (related xml)
 | 
| 
 | 
   664     global ARGS
 | 
| 
147
 | 
   665     ARGS = process_args(args)
 | 
| 
309
 | 
   666 
 | 
| 
93
 | 
   667     # read dataset
 | 
| 
 | 
   668     dataset = read_dataset(ARGS.input, "dataset")
 | 
| 
 | 
   669     dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)
 | 
| 
 | 
   670 
 | 
| 
 | 
   671     # remove versioning from gene names
 | 
| 
 | 
   672     dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]
 | 
| 
 | 
   673 
 | 
| 
 | 
   674     # handle custom models
 | 
| 
 | 
   675     model :utils.Model = ARGS.rules_selector
 | 
| 
309
 | 
   676 
 | 
| 
93
 | 
   677     if model is utils.Model.Custom:
 | 
| 
 | 
   678         rules = load_custom_rules()
 | 
| 
 | 
   679         reactions = list(rules.keys())
 | 
| 
 | 
   680 
 | 
| 
 | 
   681         save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)
 | 
| 
 | 
   682         if ERRORS: utils.logWarning(
 | 
| 
 | 
   683             f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
 | 
| 
 | 
   684             ARGS.out_log)
 | 
| 
 | 
   685         
 | 
| 
 | 
   686         return
 | 
| 
 | 
   687     
 | 
| 
 | 
   688     # This is the standard flow of the ras_generator program, for non-custom models.
 | 
| 
 | 
   689     name = "RAS Dataset"
 | 
| 
 | 
   690     type_gene = gene_type(dataset.iloc[0, 0], name)
 | 
| 
 | 
   691 
 | 
| 
 | 
   692     rules      = model.getRules(ARGS.tool_dir)
 | 
| 
 | 
   693     genes      = data_gene(dataset, type_gene, name, None)
 | 
| 
 | 
   694     ids, rules = load_id_rules(rules.get(type_gene))
 | 
| 
 | 
   695     
 | 
| 
 | 
   696     resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name)
 | 
| 
 | 
   697     create_ras(resolve_rules, name, rules, ids, ARGS.ras_output)
 | 
| 
 | 
   698     
 | 
| 
 | 
   699     if err: utils.logWarning(
 | 
| 
 | 
   700         f"Warning: gene(s) {err} not found in class \"{name}\", " +
 | 
| 
 | 
   701         "the expression level for this gene will be considered NaN",
 | 
| 
 | 
   702         ARGS.out_log)
 | 
| 
 | 
   703     
 | 
| 
 | 
   704     print("Execution succeded")
 | 
| 
 | 
   705 
 | 
| 
 | 
   706 ###############################################################################
 | 
| 
 | 
# Run the program only when this file is executed as a script, main() reads the arguments by itself.
if __name__ == "__main__":
    main()
 |