| 489 | 1 """ | 
|  | 2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules. | 
|  | 3 | 
|  | 4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs), | 
|  | 5 computes RAS per reaction for each sample/cell line, and writes a tabular output. | 
|  | 6 """ | 
| 93 | 7 from __future__ import division | 
|  | 8 import sys | 
|  | 9 import argparse | 
|  | 10 import collections | 
|  | 11 import pandas as pd | 
|  | 12 import pickle as pk | 
|  | 13 import utils.general_utils as utils | 
|  | 14 import utils.rule_parsing as ruleUtils | 
|  | 15 from typing import Union, Optional, List, Dict, Tuple, TypeVar | 
|  | 16 | 
# Names of genes that were looked up but not found in the dataset; get_gene_expr appends to it.
ERRORS = []
########################## argparse ##########################################
# Parsed command-line arguments; declared here and assigned in main().
ARGS :argparse.Namespace
def process_args(args:Optional[List[str]] = None) -> argparse.Namespace:
    """
    Processes command-line arguments.

    Args:
        args (list, optional): List of command-line arguments. When None,
            argparse reads them from sys.argv.

    Returns:
        Namespace: An object containing parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage = '%(prog)s [options]',
        description = "Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules.")

    parser.add_argument("-rl", "--model_upload", type = str,
        help = "path to input file containing the rules")

    # Galaxy converts files into .dat, this helps infer the original extension when needed.
    parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name")

    parser.add_argument(
        '-n', '--none',
        type = utils.Bool("none"), default = True,
        help = 'when true, missing gene values are ignored inside AND rules; '
               'when false, an AND rule with a missing gene value yields None')

    parser.add_argument(
        '-td', '--tool_dir',
        type = str,
        required = True, help = 'your tool directory')

    parser.add_argument(
        '-ol', '--out_log',
        type = str,
        help = "Output log")

    parser.add_argument(
        '-in', '--input',
        type = str,
        help = 'input dataset')

    parser.add_argument(
        '-ra', '--ras_output',
        type = str,
        required = True, help = 'ras output')

    return parser.parse_args(args)
| 93 | 67 | 
|  | 68 ############################ dataset input #################################### | 
def read_dataset(data :str, name :str) -> pd.DataFrame:
    """
    Load a tab-separated dataset into a pandas DataFrame.

    Args:
        data (str): Path to the tab-separated file containing the dataset.
        name (str): Name of the dataset, used in error messages.

    Returns:
        pandas.DataFrame: DataFrame containing the dataset (first row used as header).

    Raises:
        SystemExit: If the file is empty or has fewer than two columns.
    """
    def abort() -> None:
        # Both failure modes report the same message.
        sys.exit('Execution aborted: wrong format of ' + name + '\n')

    try:
        table = pd.read_csv(data, sep = '\t', header = 0, engine='python')
    except pd.errors.EmptyDataError:
        abort()

    if len(table.columns) >= 2:
        return table
    abort()
|  | 91 | 
############################ load ids and rules ###############################
def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
    """
    Split a reactions dictionary into parallel lists of IDs and rules.

    Args:
        reactions (dict): A dictionary where keys are IDs and values are rules.

    Returns:
        tuple: (ids, rules), two lists in the dictionary's iteration order.
    """
    ids = list(reactions.keys())
    rules = list(reactions.values())
    return (ids, rules)
|  | 108 | 
|  | 109 | 
|  | 110 ############################ gene ############################################# | 
def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
    """
    Process gene data to ensure correct formatting and handle duplicates.

    The first column of ``gene`` is normalized in place: surrounding whitespace
    is stripped and everything after the first '.' (the version suffix) is dropped.

    Args:
        gene (DataFrame): DataFrame containing gene data; gene IDs are in the first column.
        type_gene (str): Type of gene data (e.g., 'hugo_id', 'ensembl_gene_id', 'symbol', 'entrez_id').
        name (str): Name of the dataset, used in messages.
        gene_custom (dict or None): Custom gene dictionary; when None the gene list of the
            built-in model named by ARGS.rules_selector is loaded from the tool directory.

    Returns:
        dict: The dataset converted with ``to_dict()`` after setting the gene ID column as index.

    Raises:
        SystemExit: If a duplicated gene ID is marked 'ok' in the gene dictionary, or if no
            gene dictionary can be determined for the current selector / ID type.
    """
    # Normalize the gene IDs (same in-place effect on the caller's DataFrame as before).
    gene.iloc[:, 0] = [gene_id.strip().split('.')[0] for gene_id in gene.iloc[:, 0]]

    gene_dup = [item for item, count in
                collections.Counter(gene.iloc[:, 0]).items() if count > 1]
    pat_dup = [item for item, count in
               collections.Counter(list(gene.columns)).items() if count > 1]

    if gene_dup:
        if gene_custom is None:
            pickle_names = {
                'HMRcore': 'HMRcore_genes.p',
                'Recon':   'Recon_genes.p',
                'ENGRO2':  'ENGRO2_genes.p',
            }
            # The selector is not guaranteed to be among the parsed arguments.
            selector = str(getattr(ARGS, 'rules_selector', None))
            if selector not in pickle_names:
                sys.exit('Execution aborted: no gene list available for rules selector '
                         + selector + '\n')

            # NOTE: pickle is only acceptable here because the files ship with the tool;
            # never point this at user-supplied data.
            pickle_path = ARGS.tool_dir + '/local/pickle files/' + pickle_names[selector]
            with open(pickle_path, 'rb') as pickle_file:
                gene_in_rule = pk.load(pickle_file).get(type_gene)

            if gene_in_rule is None:
                sys.exit('Execution aborted: gene ID type ' + str(type_gene)
                         + ' is not available for ' + selector + '\n')
        else:
            gene_in_rule = gene_custom

        # A duplicate only matters when the gene dictionary marks that ID as 'ok'.
        relevant_dup = [gene_id for gene_id in gene_dup if gene_in_rule.get(gene_id) == 'ok']
        if relevant_dup:
            sys.exit('Execution aborted because gene ID '
                     + str(relevant_dup) + ' in ' + name + ' is duplicated\n')

    if pat_dup: utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)
    return (gene.set_index(gene.columns[0])).to_dict()
|  | 165 | 
|  | 166 ############################ resolve ########################################## | 
def replace_gene_value(l :list, d :Dict[str, Union[int, float]]) -> Tuple[list, list]:
    """
    Replace gene identifiers in a parsed rule expression with values from a dict.

    Args:
        l: Parsed rule as a nested list structure (strings, lists, and operators).
        d: Dict mapping gene IDs to numeric values.

    Returns:
        tuple: (new_expression, not_found_genes), where new_expression mirrors the
        nesting of ``l`` and not_found_genes lists every item that resolved to None.
    """
    # An empty (or missing) expression resolves to nothing.
    if not l:
        return ([], [])

    resolved = []
    not_found = []
    # Plain iteration: the input is never modified and no sub-lists are copied.
    for item in l:
        if isinstance(item, list):
            nested, nested_not_found = replace_gene_value(item, d)
            resolved.append(nested)
            not_found.extend(nested_not_found)
        else:
            value = replace_gene(item, d)
            resolved.append(value)
            if value is None:
                not_found.append(item)
    return (resolved, not_found)
|  | 192 | 
def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, str, None]:
    """
    Replace a single gene identifier with its corresponding value from a dictionary.

    Args:
        l (str): Gene identifier to replace, or one of the operators 'and' / 'or'.
        d (dict): Dict mapping gene IDs to numeric values.

    Returns:
        float/int/str/None: The operator itself when ``l`` is 'and' or 'or'; otherwise the
        value from the dictionary if found, None otherwise.

    Raises:
        SystemExit: If the value associated with the gene identifier is not an int or float.
    """
    if l == 'and' or l == 'or':
        return l

    value = d.get(l, None)
    if value is not None and not isinstance(value, (int, float)):
        # str() is required: the offending value is by definition not a number and
        # may not be a string either.
        sys.exit('Execution aborted: ' + str(value) + ' value not valid\n')
    return value
|  | 214 | 
T = TypeVar("T", bound = Optional[Union[int, float]])
def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
    """
    Combine two (possibly missing) values with a rule operator.

    With both values present, 'and' yields the minimum and any other operator yields
    the sum. With at least one value missing, 'and' yields None unless ``cn`` is True;
    in every remaining case the value that is present (if any) is returned.

    Args:
        val1(Optional(Union[float, int])): First value.
        op (str): Operator ('and' or 'or').
        val2(Optional(Union[float, int])): Second value.
        cn (bool): Control boolean value; when True a missing operand of 'and' is ignored.

    Returns:
        Optional(Union[float, int]): Result of the computation.
    """
    first_missing = val1 is None
    second_missing = val2 is None

    if not first_missing and not second_missing:
        return min(val1, val2) if op == 'and' else val1 + val2

    # From here on at least one operand is missing.
    if op == 'and' and cn is not True:
        return None

    return val2 if first_missing else val1
|  | 251 | 
# ris should be Literal[None] but Literal is not supported in Python 3.7
def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
    """
    Validate and evaluate an expression.

    A single-element expression evaluates to that element when it is a number or None,
    and is evaluated recursively when it is a list. Expressions longer than two elements
    are delegated to control_list. Anything else is rejected.

    Args:
        ris: Intermediate result.
        l (list): Expression to control.
        cn (bool): Control boolean value.

    Returns:
        Union[Literal[False], int, float]: Result of the control, False when the
        expression is malformed.
    """
    size = len(l)
    if size > 2:
        return control_list(ris, l, cn)
    if size != 1:
        # Empty or two-element expressions are malformed.
        return False

    only = l[0]
    if only is None or isinstance(only, (float, int)):
        return only
    if isinstance(only, list):
        return control(None, only, cn)
    return False
|  | 276 | 
def _resolve_operand(item, cn :bool) -> Tuple[bool, Optional[Union[float, int]]]:
    """Evaluate one operand of an expression and return (is_valid, value)."""
    if item is None or isinstance(item, (float, int)):
        return (True, item)
    if isinstance(item, list):
        value = control(None, item, cn)
        # control signals a malformed sub-expression with the literal False.
        return (value is not False, value)
    return (False, None)

def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Optional[bool]: #Optional[Literal[False]]:
    """
    Control the format of a list of expressions and evaluate it left to right.

    The list is consumed in chunks of either ``<operand> <operator> <operand>``
    (which starts a new partial result) or ``<operator> <operand>`` (which combines
    the operand with the running result). Operands are numbers, None, or nested lists.

    Args:
        ris: Intermediate result.
        l (list): List of expressions to control.
        cn (bool): Control boolean value.

    Returns:
        Optional[Literal[False]]: False if the list is malformed (including a dangling
        ``<operand> <operator>`` tail), otherwise the computed result.
    """
    operators = ('and', 'or')
    length = len(l) if l else 0
    position = 0

    while position < length:
        remaining = length - position
        if remaining == 1:
            return False

        head = l[position]
        if head is None or isinstance(head, (float, int, list)):
            # "<operand> <operator> <operand>": all three parts must be present.
            if l[position + 1] not in operators or remaining < 3:
                return False
            left_ok, left = _resolve_operand(head, cn)
            operator = l[position + 1]
            right_item = l[position + 2]
            consumed = 3
        elif head in operators:
            # "<operator> <operand>": continue from the running result.
            left_ok, left = True, ris
            operator = head
            right_item = l[position + 1]
            consumed = 2
        else:
            return False

        right_ok, right = _resolve_operand(right_item, cn)
        if not (left_ok and right_ok):
            return False

        ris = computes(left, operator, right, cn)
        position += consumed

    return ris
|  | 337 | 
ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
    """
    Evaluate every rule against every entry of the gene data.

    Args:
        genes (dict): Dictionary whose values are the gene-value mappings used to evaluate the rules.
        rules (list): List of rules to resolve; empty rules score None.
        ids (list): List of IDs corresponding to the rules (not used by the computation).
        resolve_none (bool): Flag indicating whether to resolve None values in the rules.
        name (str): Name of the dataset, used in the warning message.

    Returns:
        tuple: (resolved_rules, not_found_genes). resolved_rules maps each key of ``genes``
        to the list of scores (one per rule, None when not computable); not_found_genes holds
        the distinct gene IDs missing from the data. (None, None) when no score at all
        could be computed.
    """
    scores_by_key = {}
    missing_genes = []
    any_score_computed = False

    for key, gene_values in genes.items():
        scores = []
        for rule in rules:
            if not rule:
                scores.append(None)
                continue

            expression, unknown = replace_gene_value(rule, gene_values)
            if unknown:
                missing_genes.extend(unknown)

            outcome = control(None, expression, resolve_none)
            if outcome is False or outcome is None:
                scores.append(None)
            else:
                scores.append(outcome)
                any_score_computed = True
        scores_by_key[key] = scores

    if not any_score_computed:
        utils.logWarning(
            f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
            ARGS.out_log)

        return (None, None)

    return (scores_by_key, list(set(missing_genes)))
|  | 382 ############################ create_ras ####################################### | 
def create_ras(resolve_rules: Optional[Dict[str, List[Optional[Union[float, int]]]]], dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
    """
    Create a RAS (Reaction Activity Score) file from resolved rules.

    Missing scores are replaced in place with the string 'None' before writing, so the
    lists inside ``resolve_rules`` are modified.

    Args:
        resolve_rules (dict): Dictionary containing resolved rules, or None when nothing
            could be computed (a warning is logged and no file is written).
        dataset_name (str): Name of the dataset, used in the warning message.
        rules (list): List of rules (unused, kept for interface compatibility).
        ids (list): Reaction IDs, written as the first column ('Reactions').
        file (str): Path to the output RAS file (tab-separated).

    Returns:
        None
    """
    if resolve_rules is None:
        utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
        # Nothing to write: stop here instead of dereferencing None below.
        return

    for scores in resolve_rules.values():
        for i, score in enumerate(scores):
            if score is None:
                scores[i] = 'None'

    output_ras = pd.DataFrame.from_dict(resolve_rules)
    output_ras.insert(0, 'Reactions', ids)

    with open(file, "w") as text_file:
        text_file.write(output_ras.to_csv(sep = '\t', index = False))
|  | 413 | 
|  | 414 ################################- NEW RAS COMPUTATION -################################ | 
Expr = Optional[Union[int, float]]
Ras  = Expr
def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
    """
    Compute the RAS scores of every cell line (column) in the dataset.

    Args:
        dataset (pd.DataFrame): Dataset containing gene values; the first column holds the gene IDs.
        rules (dict): The dict containing reaction ids as keys and rules as values.

    Note:
        Modifies dataset in place by setting the first column as index.

    Returns:
        dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary
        where each key corresponds to a reaction ID and each value is its computed RAS score.
    """
    dataset.set_index(dataset.columns[0], inplace=True)

    # After re-indexing, every remaining column is one cell line.
    return {
        cell_line_name: get_ras_values(rules, dataset[cell_line_name].to_dict())
        for cell_line_name in dataset.columns
    }
|  | 439 | 
def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
    """
    Evaluate every rule against the gene expression data of a single cell line.

    Args:
        value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
        dataset : gene expression data of one cell line.

    Returns:
        dict: A dictionary where keys are reaction ids and values are the computed RAS values for each rule.
    """
    ras_by_reaction = {}
    for reaction_id, op_list in value_rules.items():
        ras_by_reaction[reaction_id] = ras_op_list(op_list, dataset)
    return ras_by_reaction
|  | 452 | 
def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
    """
    Look up the expression value of a gene in a cell line dataset.

    A gene that is absent (or whose stored value is None) is recorded in the
    module-level ERRORS list.

    Args:
        dataset : gene expression data of one cell line.
        name : gene name.

    Returns:
        Expr : the gene's expression value, or None when it is not available.
    """
    value = dataset.get(name)
    if value is None:
        ERRORS.append(name)
    return value
|  | 468 | 
def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
    """
    Recursively compute the RAS (Reaction Activity Score) of an OpList.

    An OpList without operator is treated as a single gene. Otherwise the operands are
    combined by sum (OR) or minimum (any other operator). Operands that evaluate to None
    are skipped, except that an AND rule yields None as soon as one operand is None when
    ARGS.none is false.

    Args:
        op_list (OpList): The OpList representing a rule with gene values.
        dataset : gene expression data of one cell line.

    Returns:
        Ras: The computed RAS value for the given OpList.
    """
    op = op_list.op
    if not op:
        return get_gene_expr(dataset, op_list[0])

    is_and = op is ruleUtils.RuleOp.AND
    if is_and and not ARGS.none and None in op_list:
        return None

    ras_value :Ras = None
    for position in range(len(op_list)):
        operand = op_list[position]
        if isinstance(operand, ruleUtils.OpList):
            value = ras_op_list(operand, dataset)
        else:
            value = get_gene_expr(dataset, operand)

        if value is None:
            if is_and and not ARGS.none:
                return None
            continue

        if ras_value is None:
            # First usable operand seeds the accumulator.
            ras_value = value
        elif op is ruleUtils.RuleOp.OR:
            ras_value = ras_value + value
        else:
            ras_value = min(ras_value, value)

    return ras_value
|  | 503 | 
def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
    """
    Write the computed RAS scores to ARGS.ras_output as a tab-separated file.

    Scores that are None are replaced in place with the string "None" before writing.

    Args:
        rasScores : the computed ras scores.
        reactions : the list of reaction IDs, used as the first column.

    Returns:
        None
    """
    # Replacing by hand is much faster than the built-in DataFrame method.
    for scores in rasScores.values():
        for reactId in scores:
            if scores[reactId] is None:
                scores[reactId] = "None"

    table = pd.DataFrame.from_dict(rasScores)
    table.insert(0, 'Reactions', reactions)
    table.to_csv(ARGS.ras_output, sep = '\t', index = False)
|  | 522 | 
|  | 523 ############################ MAIN ############################################# | 
#TODO: not used but keep, it will be when the new translator dicts will be used.
def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
    """
    Translate gene from any supported encoding to HugoID.

    Args:
        geneName (str): the name of the gene in its current encoding.
        encoding (str): the encoding.
        geneTranslator (Dict[str, Dict[str, str]]): the dict containing all supported gene names
        and encodings in the current model, mapping each to the corresponding HugoID encoding.

    Raises:
        ValueError: When the gene isn't supported in the model.

    Returns:
        str: the gene in HugoID encoding.
    """
    translations = geneTranslator[encoding]
    if geneName not in translations:
        raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.")
    return translations[geneName]
| 93 | 544 | 
def _read_rules_table(path, delimiter :str) -> dict:
    """Read the rules file with the given delimiter and return a dict mapping reaction IDs to parsed rules."""
    rows = utils.readCsv(path, delimiter = delimiter, skipHeader=False)
    if not rows:
        raise ValueError("Model tabular file is empty.")

    # The first row is the header, so at least one more row is needed.
    if len(rows) <= 1:
        raise ValueError("Model tabular file contains no rows besides the header.")

    id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR")

    parsed_rules = {}
    for line in rows[1:]:
        if len(line) <= idx_gpr:
            utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log)
            continue

        if line[idx_gpr] == "":
            # Reactions without a GPR get a placeholder rule.
            parsed_rules[line[id_idx]] = ruleUtils.OpList([""])
        else:
            parsed_rules[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr])

    return parsed_rules

def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
    """
    Opens the custom rules file (ARGS.model_upload) and extracts the rules from its "GPR" column.

    The file is first read as tab-separated; if that attempt fails for any reason it is
    read again as comma-separated. Results of a failed attempt are discarded.

    Raises:
        ValueError: If the file cannot be parsed with either delimiter, or if no rule is found.

    Returns:
        Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
    """
    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload)  # actual file, stored in Galaxy as a .dat

    try:
        dict_rule = _read_rules_table(datFilePath, "\t")
    except Exception as tab_error:
        # Broad on purpose: any failure of the tab attempt triggers the comma fallback.
        try:
            dict_rule = _read_rules_table(datFilePath, ",")
        except Exception as comma_error:
            raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {tab_error}, Comma: {comma_error}") from comma_error

    if not dict_rule:
        raise ValueError("No valid rules found in the uploaded file. Please check the file format.")

    return dict_rule
|  | 609 | 
| 401 | 610 | 
def main(args:Optional[List[str]] = None) -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    Args:
        args (list, optional): Command-line arguments; when None they are read from sys.argv.

    Returns:
        None
    """
    # get args from frontend (related xml)
    global ARGS
    ARGS = process_args(args)

    # ERRORS is module-level state: start clean so a repeated call does not report
    # genes from a previous run.
    ERRORS.clear()

    # read dataset
    dataset = read_dataset(ARGS.input, "dataset")

    # Gene IDs as strings with versioning removed. The column is replaced as a whole
    # rather than written through iloc, so a numeric ID column is not upcast in place.
    gene_column = dataset.columns[0]
    dataset[gene_column] = dataset[gene_column].astype(str).str.split('.').str[0]

    rules = load_custom_rules()
    reactions = list(rules.keys())

    save_as_tsv(ras_for_cell_lines(dataset, rules), reactions)

    # Report each missing gene once, in first-seen order. Empty names are dropped:
    # presumably they come from the placeholder rule of reactions without a GPR.
    missing_genes = [gene for gene in dict.fromkeys(ERRORS) if gene]
    if missing_genes: utils.logWarning(
        f"The following genes are mentioned in the rules but don't appear in the dataset: {missing_genes}",
        ARGS.out_log)

    print("Execution succeeded")
| 93 | 639 | 
|  | 640 ############################################################################### | 
# Run the tool only when executed as a script; arguments are then taken from sys.argv.
if __name__ == "__main__":
    main()