| 93 | 1 from __future__ import division | 
|  | 2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason. | 
|  | 3 import sys | 
|  | 4 import argparse | 
|  | 5 import collections | 
|  | 6 import pandas as pd | 
|  | 7 import pickle as pk | 
|  | 8 import utils.general_utils as utils | 
|  | 9 import utils.rule_parsing as ruleUtils | 
|  | 10 from typing import Union, Optional, List, Dict, Tuple, TypeVar | 
| 309 | 11 import os | 
| 93 | 12 | 
# Names of genes that a rule mentions but the dataset lacks; appended to by
# get_gene_expr() and reported by main() at the end of the run.
ERRORS :List[str] = []
########################## argparse ##########################################
# Parsed command-line options. Only declared here: main() binds the actual value.
ARGS :argparse.Namespace
def process_args(args:List[str] = None) -> argparse.Namespace:
    """
    Build the tool's command-line parser and parse the given arguments with it.

    Args:
        args (list): Argument strings to parse. When None, argparse falls back to sys.argv.

    Returns:
        Namespace: Parsed options (model_upload, model_upload_name, none, tool_dir,
        out_log, input, ras_output).
    """
    cli = argparse.ArgumentParser(
        usage = '%(prog)s [options]',
        description = "process some value's genes to create a comparison's map.")

    # Custom rules file, as stored on disk.
    cli.add_argument(
        "-rl", "--model_upload",
        type = str,
        help = "path to input file with custom rules, if provided")

    # Galaxy converts uploads to .dat, so the original file name is passed
    # separately to recover the extension the file had.
    cli.add_argument(
        "-rn", "--model_upload_name",
        type = str,
        help = "custom rules name")

    cli.add_argument(
        '-n', '--none',
        type = utils.Bool("none"),
        default = True,
        help = 'compute Nan values')

    cli.add_argument(
        '-td', '--tool_dir',
        type = str,
        required = True,
        help = 'your tool directory')

    cli.add_argument(
        '-ol', '--out_log',
        type = str,
        help = "Output log")

    # This option used to be called "id"; it was renamed to "in".
    cli.add_argument(
        '-in', '--input',
        type = str,
        help = 'input dataset')

    cli.add_argument(
        '-ra', '--ras_output',
        type = str,
        required = True,
        help = 'ras output')

    return cli.parse_args(args)
| 93 | 63 | 
|  | 64 ############################ dataset input #################################### | 
def read_dataset(data :str, name :str) -> pd.DataFrame:
    """
    Load a tab-separated file, using its first row as header, into a DataFrame.

    Args:
        data (str): Path to the tab-separated file.
        name (str): Label of the dataset, only used in the abort message.

    Returns:
        pandas.DataFrame: The parsed table.

    Raises:
        SystemExit: If the file is empty or yields fewer than two columns.
    """
    def abort() -> None:
        # Both failure modes share the same message; it is built lazily so a
        # valid file never evaluates it.
        sys.exit('Execution aborted: wrong format of ' + name + '\n')

    try:
        table = pd.read_csv(data, sep = '\t', header = 0, engine='python')
    except pd.errors.EmptyDataError:
        abort()

    if len(table.columns) >= 2:
        return table
    abort()
|  | 87 | 
|  | 88 ############################ load id e rules ################################## | 
def load_id_rules(reactions :Dict[str, Dict[str, List[str]]]) -> Tuple[List[str], List[Dict[str, List[str]]]]:
    """
    Split a reactions dictionary into two parallel lists.

    Args:
        reactions (dict): Mapping from reaction ID to its rule.

    Returns:
        tuple: (ids, rules), where ids[i] is the key whose value is rules[i], in the
        dictionary's iteration order.
    """
    ids = list(reactions.keys())
    rules = list(reactions.values())
    return (ids, rules)
|  | 104 | 
|  | 105 ############################ check_methods #################################### | 
def gene_type(l :str, name :str) -> str:
    """
    Classify a gene identifier by trying each supported format in turn.

    The checks run in a fixed order (HGNC, Ensembl, symbol, Entrez) and the first
    one that accepts the identifier decides the result.

    Args:
        l (str): The gene identifier to classify.
        name (str): The name of the dataset, used in the abort message.

    Returns:
        str: One of 'hugo_id', 'ensembl_gene_id', 'symbol' or 'entrez_id'.

    Raises:
        SystemExit: If none of the supported formats matches.
    """
    checks = (
        (check_hgnc,    'hugo_id'),
        (check_ensembl, 'ensembl_gene_id'),
        (check_symbol,  'symbol'),
        (check_entrez,  'entrez_id'),
    )
    for matches, id_type in checks:
        if matches(l):
            return id_type

    # The original message concatenated 'Supported ID' and 'types are' without
    # a separating space ("Supported IDtypes are").
    sys.exit('Execution aborted:\n' +
             'gene ID type in ' + name + ' not supported. Supported ID ' +
             'types are: HUGO ID, Ensemble ID, HUGO symbol, Entrez ID\n')
|  | 132 | 
def check_hgnc(l :str) -> bool:
    """
    Tell whether a gene identifier looks like an HGNC ID.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True when the identifier is longer than five characters, starts with
        'HGNC:' (case-insensitive) and everything after the fifth character is digits.
    """
    has_payload = len(l) > 5
    return has_payload and l.upper().startswith('HGNC:') and l[5:].isdigit()
|  | 150 | 
def check_ensembl(l :str) -> bool:
    """
    Tell whether a gene identifier looks like an Ensembl ID.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True when the upper-cased identifier begins with 'ENS', False otherwise.
    """
    prefix = l.upper()[:3]
    return prefix == 'ENS'
|  | 162 | 
|  | 163 | 
def check_symbol(l :str) -> bool:
    """
    Tell whether a gene identifier looks like a gene symbol.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True when the first character is a letter and the rest is a non-empty
        alphanumeric string. A single-character identifier is therefore rejected.
    """
    if len(l) == 0:
        return False
    first, rest = l[0], l[1:]
    return first.isalpha() and rest.isalnum()
|  | 181 | 
def check_entrez(l :str) -> bool:
    """
    Tell whether a gene identifier looks like an Entrez ID.

    Args:
        l (str): The gene identifier to check.

    Returns:
        bool: True when the identifier is non-empty and made of digits only.
    """
    return len(l) > 0 and l.isdigit()
|  | 196 | 
|  | 197 ############################ gene ############################################# | 
def data_gene(gene: pd.DataFrame, type_gene: str, name: str, gene_custom: Optional[Dict[str, str]]) -> Dict[str, str]:
    """
    Normalise the gene IDs of a dataset, reject harmful duplicates and convert it to a dict.

    Args:
        gene (DataFrame): Gene data; the first column holds the gene IDs. It is modified
            in place (IDs are stripped and truncated at the first '.').
        type_gene (str): Encoding of the gene IDs (e.g., 'hugo_id', 'ensembl_gene_id',
            'symbol', 'entrez_id'), used to pick the gene list of a built-in model.
        name (str): Name of the dataset, used in messages.
        gene_custom (dict or None): Gene lookup to use instead of a built-in model's one.

    Returns:
        dict: Result of DataFrame.to_dict() after indexing by the gene ID column, i.e.
        one entry per remaining column, each mapping gene ID to its value.

    Raises:
        SystemExit: If a duplicated gene ID is marked 'ok' in the gene lookup, or if no
            gene list exists for the selected built-in model.
    """
    # Trim whitespace and drop the version suffix from every gene ID in one pass.
    gene.iloc[:, 0] = [gene_id.strip().split('.')[0] for gene_id in gene.iloc[:, 0]]

    gene_dup = [item for item, count in
                collections.Counter(gene.iloc[:, 0]).items() if count > 1]
    pat_dup = [item for item, count in
               collections.Counter(list(gene.columns)).items() if count > 1]

    if gene_dup:
        if gene_custom is None:
            # process_args() does not define --rules_selector, so read it defensively
            # and abort with a clear message instead of an AttributeError.
            selector = str(getattr(ARGS, 'rules_selector', None))
            if selector not in ('HMRcore', 'Recon', 'ENGRO2'):
                sys.exit('Execution aborted: no gene list available for model '
                         + selector + '\n')

            pickle_path = ARGS.tool_dir + '/local/pickle files/' + selector + '_genes.p'
            # NOTE(review): pickle can run arbitrary code on load; this is only safe
            # because the file is expected to ship inside the tool directory.
            with open(pickle_path, 'rb') as handle:
                genes_by_encoding = pk.load(handle)

            # Report the file that was actually loaded (the old message always named
            # the ENGRO2 pickle, whatever the model).
            utils.logWarning(f"Loaded gene list from '{pickle_path}'", ARGS.out_log)
            gene_in_rule = genes_by_encoding.get(type_gene)

        else:
            gene_in_rule = gene_custom

        # Only duplicates the lookup marks 'ok' are fatal.
        # NOTE(review): 'ok' presumably flags genes used by the rules; confirm.
        relevant_dup = [item for item in gene_dup if gene_in_rule.get(item) == 'ok']
        if relevant_dup:
            sys.exit('Execution aborted because gene ID '
                     +str(relevant_dup)+' in '+name+' is duplicated\n')

    if pat_dup:
        utils.logWarning(f"Warning: duplicated label\n{pat_dup} in {name}", ARGS.out_log)

    return (gene.set_index(gene.columns[0])).to_dict()
|  | 252 | 
|  | 253 ############################ resolve ########################################## | 
def replace_gene_value(l :List[Union[str, list]], d :Dict[str, Union[int, float]]) -> Tuple[list, list]:
    """
    Replace every gene identifier of a (possibly nested) rule with its value.

    Args:
        l (list): Rule tokens: gene identifiers, the operators 'and'/'or', or nested lists.
        d (dict): Mapping from gene identifier to its value.

    Returns:
        tuple: (resolved, missing). ``resolved`` mirrors the nesting of ``l`` with each
        token replaced by the result of replace_gene(); ``missing`` lists the tokens
        for which replace_gene() returned None.
    """
    resolved = []
    missing = []
    # Plain iteration: the previous `l = l[1:]` loop copied the list at every step.
    for token in l:
        if isinstance(token, list):
            sub_resolved, sub_missing = replace_gene_value(token, d)
            resolved.append(sub_resolved)
            missing.extend(sub_missing)
            continue

        value = replace_gene(token, d)
        resolved.append(value)
        if value is None:
            missing.append(token)
    return (resolved, missing)
|  | 279 | 
def replace_gene(l :str, d :Dict[str, Union[int, float]]) -> Optional[Union[int, float, str]]:
    """
    Replace a single rule token with its value.

    Args:
        l (str): Rule token: a gene identifier or one of the operators 'and'/'or'.
        d (dict): Mapping from gene identifier to its value.

    Returns:
        The operator itself for 'and'/'or'; otherwise the gene's int/float value, or
        None when the gene is not in ``d``.

    Raises:
        SystemExit: If the value stored for the gene is neither None nor a number.
    """
    if l in ('and', 'or'):
        return l

    value = d.get(l, None)
    if value is None or isinstance(value, (int, float)):
        return value

    # Format the value instead of concatenating it: `str + value` raised a
    # TypeError for any invalid value that was not itself a string.
    sys.exit(f'Execution aborted: {value} value not valid\n')
|  | 301 | 
T = TypeVar("T", bound = Optional[Union[int, float]])
def computes(val1 :T, op :str, val2 :T, cn :bool) -> T:
    """
    Combine two values with a rule operator.

    With both values present, 'and' yields their minimum and any other operator their
    sum. With a missing value (None), the present one is returned, except for 'and'
    when ``cn`` is not True, which yields None.

    Args:
        val1 (Optional[Union[float, int]]): First value.
        op (str): Operator ('and' or 'or').
        val2 (Optional[Union[float, int]]): Second value.
        cn (bool): Whether 'and' may ignore a missing operand.

    Returns:
        Optional[Union[float, int]]: Result of the computation.
    """
    if val1 is not None and val2 is not None:
        return min(val1, val2) if op == 'and' else val1 + val2

    # From here on at least one operand is missing.
    if op == 'and' and cn is not True:
        return None
    return val1 if val1 is not None else val2
|  | 338 | 
# ris should be Literal[None] but Literal is not supported in Python 3.7
def control(ris, l :List[Union[int, float, list]], cn :bool) -> Union[bool, int, float]: #Union[Literal[False], int, float]:
    """
    Validate an expression and reduce it to a single value.

    Args:
        ris: Intermediate result, forwarded to control_list().
        l (list): Expression to evaluate.
        cn (bool): Control boolean value, forwarded to the evaluation.

    Returns:
        Union[Literal[False], int, float]: The value of the expression (possibly None),
        or False when the expression is malformed.
    """
    size = len(l)
    if size > 2:
        return control_list(ris, l, cn)
    if size != 1:
        # Empty and two-item expressions can never be well formed.
        return False

    only = l[0]
    if only is None or isinstance(only, (float, int)):
        return only
    if isinstance(only, list):
        # A single nested expression: unwrap it.
        return control(None, only, cn)
    return False
|  | 363 | 
def _control_operand(item, cn :bool) -> Tuple[bool, Optional[Union[float, int]]]:
    """Evaluate one operand; return (is_valid, value)."""
    if item is None or isinstance(item, (float, int)):
        return True, item
    if isinstance(item, list):
        value = control(None, item, cn)
        return value is not False, value
    return False, None

def control_list(ris, l :List[Optional[Union[float, int, list]]], cn :bool) -> Union[bool, int, float, None]: #Union[Literal[False], int, float, None]:
    """
    Validate and evaluate a flat sequence of operands and operators, left to right.

    Two shapes are consumed at each step: "<operand> <op> <operand>", which starts a
    new result, and "<op> <operand>", which combines the running result with the
    operand. Operands are numbers, None, or nested lists evaluated through control().

    Args:
        ris: Intermediate result.
        l (list): List of expressions to control.
        cn (bool): Control boolean value.

    Returns:
        The computed value (possibly None), or False when the sequence is malformed.
        Truncated sequences such as [1, 'and'] are reported as malformed instead of
        raising IndexError.
    """
    operators = ('and', 'or')
    pos, size = 0, len(l)
    while pos < size:
        remaining = size - pos
        if remaining == 1:
            return False

        head = l[pos]
        if head in operators:
            # "<op> <operand>": continue from the running result.
            left_ok, left = True, ris
            op, right_item, consumed = head, l[pos + 1], 2
        elif l[pos + 1] in operators and (head is None or isinstance(head, (float, int, list))):
            # "<operand> <op> <operand>": needs a third item to be complete.
            if remaining < 3:
                return False
            left_ok, left = _control_operand(head, cn)
            op, right_item, consumed = l[pos + 1], l[pos + 2], 3
        else:
            return False

        right_ok, right = _control_operand(right_item, cn)
        if not (left_ok and right_ok):
            return False

        ris = computes(left, op, right, cn)
        pos += consumed
    return ris
|  | 424 | 
ResolvedRules = Dict[str, List[Optional[Union[float, int]]]]
def resolve(genes: Dict[str, str], rules: List[str], ids: List[str], resolve_none: bool, name: str) -> Tuple[Optional[ResolvedRules], Optional[list]]:
    """
    Score every rule against every entry of the gene data.

    Args:
        genes (dict): Gene data; each value is the gene-to-value mapping used to score the rules.
        rules (list): List of rules to resolve.
        ids (list): List of IDs corresponding to the rules (not used by the computation).
        resolve_none (bool): Flag forwarded to control() to decide how None values are handled.
        name (str): Name of the dataset, used in the warning message.

    Returns:
        tuple: (scores, missing). ``scores`` maps each key of ``genes`` to a list with one
        score (or None) per rule; ``missing`` holds the distinct gene IDs not found in the
        data. Both are None when not a single score could be computed.
    """
    scores_by_key = {}
    missing_genes = []
    any_score = False

    for key, gene_values in genes.items():
        scores = []
        for rule in rules:
            if not rule:
                # Empty rule: nothing to compute.
                scores.append(None)
                continue

            substituted, unknown = replace_gene_value(rule, gene_values)
            if unknown:
                missing_genes.extend(unknown)

            score = control(None, substituted, resolve_none)
            if score is False or score is None:
                scores.append(None)
            else:
                scores.append(score)
                any_score = True
        scores_by_key[key] = scores

    if not any_score:
        utils.logWarning(
            f"Warning: no computable score (due to missing gene values) for class {name}, the class has been disregarded",
            ARGS.out_log)

        return (None, None)

    return (scores_by_key, list(set(missing_genes)))
|  | 469 ############################ create_ras ####################################### | 
def create_ras(resolve_rules: "Optional[ResolvedRules]", dataset_name: str, rules: List[str], ids: List[str], file: str) -> None:
    """
    Write resolved rules to a tab-separated RAS (Reaction Activity Score) file.

    Args:
        resolve_rules (dict or None): Resolved rules, one list of scores per key. The lists
            are modified in place: None scores are replaced by the string 'None'.
        dataset_name (str): Name of the dataset, used in the warning message.
        rules (list): List of rules (not used when writing the file).
        ids (list): Reaction IDs, written as the leading 'Reactions' column.
        file (str): Path to the output RAS file.

    Returns:
        None
    """
    if resolve_rules is None:
        utils.logWarning(f"Couldn't generate RAS for current dataset: {dataset_name}", ARGS.out_log)
        # Nothing to write: without this return the code below crashed on None.
        return

    for scores in resolve_rules.values():
        for i, score in enumerate(scores):
            if score is None:
                scores[i] = 'None'

    output_ras = pd.DataFrame.from_dict(resolve_rules)
    output_ras.insert(0, 'Reactions', ids)

    # The context manager closes the file even if writing fails.
    with open(file, "w") as text_file:
        text_file.write(output_ras.to_csv(sep = '\t', index = False))
|  | 500 | 
|  | 501 ################################- NEW RAS COMPUTATION -################################ | 
Expr = Optional[Union[int, float]]
Ras  = Expr
def ras_for_cell_lines(dataset: pd.DataFrame, rules: Dict[str, ruleUtils.OpList]) -> Dict[str, Dict[str, Ras]]:
    """
    Compute the RAS scores of every cell line (column) of the dataset.

    Args:
        dataset (pd.DataFrame): Gene values; the first column holds the gene names.
        rules (dict): The dict containing reaction ids as keys and rules as values.

    Side effects:
        dataset : mutated in place, its first column becomes the index.

    Returns:
        dict: Maps each cell line name to a dictionary from reaction ID to the RAS score
        computed for that cell line.
    """
    gene_column = dataset.columns[0]
    dataset.set_index(gene_column, inplace=True)

    # After re-indexing, every remaining column is one cell line.
    return {
        cell_line_name: get_ras_values(rules, dataset[cell_line_name].to_dict())
        for cell_line_name in dataset.columns
    }
|  | 526 | 
def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]:
    """
    Compute the RAS (Reaction Activity Score) of every rule for one cell line.

    Args:
        value_rules (dict): A dictionary where keys are reaction ids and values are OpLists.
        dataset : gene expression data of one cell line.

    Returns:
        dict: Maps each reaction id to the RAS value computed from its rule.
    """
    ras_by_reaction = {}
    for reaction_id, op_list in value_rules.items():
        ras_by_reaction[reaction_id] = ras_op_list(op_list, dataset)
    return ras_by_reaction
|  | 539 | 
def get_gene_expr(dataset :Dict[str, Expr], name :str) -> Expr:
    """
    Look up the expression value of a gene in a cell line's data.

    A gene with no value (absent, or stored as None) is recorded in the module-level
    ERRORS list.

    Args:
        dataset : gene expression data of one cell line.
        name : gene name.

    Returns:
        Expr : the gene's expression value, or None when there is none.
    """
    found = dataset.get(name)
    if found is not None:
        return found

    ERRORS.append(name)
    return None
|  | 555 | 
def ras_op_list(op_list: ruleUtils.OpList, dataset: Dict[str, Expr]) -> Ras:
    """
    Recursively compute the RAS (Reaction Activity Score) of an OpList.

    Items are combined left to right: summed under OR, reduced with min() otherwise.
    Items without a value are skipped, unless the operator is AND and ARGS.none is
    falsy, in which case the whole result is None.

    Args:
        op_list (OpList): The OpList representing a rule with gene values.
        dataset : gene expression data of one cell line.

    Returns:
        Ras: The computed RAS value for the given OpList.
    """
    operator = op_list.op
    if not operator:
        # No operator: the OpList wraps a single gene.
        return get_gene_expr(dataset, op_list[0])

    # True when a missing operand must void an AND rule.
    strict_and = operator is ruleUtils.RuleOp.AND and not ARGS.none
    if strict_and and None in op_list:
        return None

    total :Ras = None
    for member in op_list:
        if isinstance(member, ruleUtils.OpList):
            value = ras_op_list(member, dataset)
        else:
            value = get_gene_expr(dataset, member)

        if value is None:
            if strict_and:
                return None
            continue

        if total is None:
            total = value
        elif operator is ruleUtils.RuleOp.OR:
            total = total + value
        else:
            total = min(total, value)

    return total
|  | 590 | 
def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None:
    """
    Write the computed ras scores as a tsv file to the path in ARGS.ras_output.

    Args:
        rasScores : the computed ras scores; modified in place, None scores become
            the string "None".
        reactions : reaction IDs, written as the leading 'Reactions' column.

    Returns:
        None
    """
    # Patching the dicts by hand is a lot faster than the equivalent dataframe method.
    for scores in rasScores.values():
        unscored = [reactId for reactId, score in scores.items() if score is None]
        for reactId in unscored:
            scores[reactId] = "None"

    table = pd.DataFrame.from_dict(rasScores)
    table.insert(0, 'Reactions', reactions)
    table.to_csv(ARGS.ras_output, sep = '\t', index = False)
|  | 609 | 
|  | 610 ############################ MAIN ############################################# | 
|  | 611 #TODO: not used but keep, it will be when the new translator dicts will be used. | 
def translateGene(geneName :str, encoding :str, geneTranslator :Dict[str, Dict[str, str]]) -> str:
    """
    Translate a gene name from the given encoding through the translator dict.

    Args:
        geneName (str): the name of the gene in its current encoding.
        encoding (str): the encoding.
        geneTranslator (Dict[str, Dict[str, str]]): for each encoding, the mapping from
        the supported gene names to their HugoID.

    Raises:
        KeyError: When the encoding is not a key of the translator.
        ValueError: When the gene isn't listed under the given encoding.

    Returns:
        str: the value the translator stores for the gene.
    """
    knownGenes = geneTranslator[encoding]
    if geneName not in knownGenes:
        raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!")

    return knownGenes[geneName]
|  | 631 | 
def load_custom_rules() -> Dict[str, ruleUtils.OpList]:
    """
    Opens the custom rules file and extracts the rules.

    Pickle files are returned as loaded (they are taken to be pre-parsed). Any other file
    is read as tab-delimited rows where column 0 is the reaction ID and column 2 the rule
    text; each rule is parsed, which significantly impacts the runtime.

    Raises:
        utils.PathErr: When the uploaded file's name is not a valid file path.

    Returns:
        Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules.
    """
    datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in galaxy as a .dat

    # The file's original name is only used to determine its original extension.
    try:
        filenamePath = utils.FilePath.fromStrPath(ARGS.model_upload_name)
    except utils.PathErr as err:
        # filenamePath is unbound when the conversion fails, so report the raw name.
        # NOTE(review): assumes PathErr accepts a plain string as the path - confirm.
        raise utils.PathErr(
            ARGS.model_upload_name,
            f"Please make sure your file's name is a valid file path, {err.msg}") from err

    if filenamePath.ext is utils.FileFormat.PICKLE:
        return utils.readPickle(datFilePath)

    dict_rule = {}
    for line in utils.readCsv(datFilePath, delimiter = "\t"):
        if line[2] == "":
            # A reaction without a rule gets an OpList holding one empty name.
            dict_rule[line[0]] = ruleUtils.OpList([""])
        else:
            dict_rule[line[0]] = ruleUtils.parseRuleToNestedList(line[2])

    return dict_rule
| 93 | 657 | 
def main(args:List[str] = None) -> None:
    """
    Entry point: parse the frontend arguments, compute the RAS scores and save them.

    Args:
        args (list): Command-line arguments, forwarded to process_args().

    Returns:
        None
    """
    # Arguments come from the frontend (related xml) and are shared module-wide.
    global ARGS
    ARGS = process_args(args)

    # Load the dataset and make sure the gene names are strings.
    dataset = read_dataset(ARGS.input, "dataset")
    dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str)

    # Drop the version suffix (everything from the first '.') from gene names.
    dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0]

    rules = load_custom_rules()
    reactions = list(rules.keys())

    ras_scores = ras_for_cell_lines(dataset, rules)
    save_as_tsv(ras_scores, reactions)

    if ERRORS:
        utils.logWarning(
            f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}",
            ARGS.out_log)

    # NOTE: the standard flow for non-custom models is currently disabled. It selected
    # the model from ARGS.rules_selector, detected the gene encoding with gene_type(),
    # then ran data_gene() -> load_id_rules() -> resolve() -> create_ras() and logged
    # the genes that were not found. Only the custom-rules flow above is active.

    print("Execution succeded")
|  | 718 | 
|  | 719 ############################################################################### | 
# Run the tool only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()