comparison COBRAxy/ras_generator.py @ 489:97eea560a10f draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Mon, 29 Sep 2025 10:33:26 +0000 |
| parents | 187cee1a00e2 |
| children | c6ea189ea7e9 |
comparison
| 488:e0bcc61b2feb | 489:97eea560a10f |
|---|---|
| 1 """ | |
| 2 Generate Reaction Activity Scores (RAS) from a gene expression dataset and GPR rules. | |
| 3 | |
| 4 The script reads a tabular dataset (genes x samples) and a rules file (GPRs), | |
| 5 computes RAS per reaction for each sample/cell line, and writes a tabular output. | |
| 6 """ | |
| 1 from __future__ import division | 7 from __future__ import division |
| 2 # galaxy complains this ^^^ needs to be at the very beginning of the file, for some reason. | |
| 3 import sys | 8 import sys |
| 4 import argparse | 9 import argparse |
| 5 import collections | 10 import collections |
| 6 import pandas as pd | 11 import pandas as pd |
| 7 import pickle as pk | 12 import pickle as pk |
| 8 import utils.general_utils as utils | 13 import utils.general_utils as utils |
| 9 import utils.rule_parsing as ruleUtils | 14 import utils.rule_parsing as ruleUtils |
| 10 from typing import Union, Optional, List, Dict, Tuple, TypeVar | 15 from typing import Union, Optional, List, Dict, Tuple, TypeVar |
| 11 import os | |
| 12 | 16 |
| 13 ERRORS = [] | 17 ERRORS = [] |
| 14 ########################## argparse ########################################## | 18 ########################## argparse ########################################## |
| 15 ARGS :argparse.Namespace | 19 ARGS :argparse.Namespace |
| 16 def process_args(args:List[str] = None) -> argparse.Namespace: | 20 def process_args(args:List[str] = None) -> argparse.Namespace: |
| 25 """ | 29 """ |
| 26 parser = argparse.ArgumentParser( | 30 parser = argparse.ArgumentParser( |
| 27 usage = '%(prog)s [options]', | 31 usage = '%(prog)s [options]', |
| 28 description = "process some value's genes to create a comparison's map.") | 32 description = "process some value's genes to create a comparison's map.") |
| 29 | 33 |
| 30 parser.add_argument( | 34 parser.add_argument("-rl", "--model_upload", type = str, |
| 31 '-rs', '--rules_selector', | 35 help = "path to input file containing the rules") |
| 32 type = utils.Model, default = utils.Model.ENGRO2, choices = list(utils.Model), | 36 |
| 33 help = 'chose which type of dataset you want use') | 37 parser.add_argument("-rn", "--model_upload_name", type = str, help = "custom rules name") |
| 34 | 38 # Galaxy converts files into .dat, this helps infer the original extension when needed. |
| 35 parser.add_argument("-rl", "--rule_list", type = str, | |
| 36 help = "path to input file with custom rules, if provided") | |
| 37 | |
| 38 parser.add_argument("-rn", "--rules_name", type = str, help = "custom rules name") | |
| 39 # ^ I need this because galaxy converts my files into .dat but I need to know what extension they were in | |
| 40 | 39 |
| 41 parser.add_argument( | 40 parser.add_argument( |
| 42 '-n', '--none', | 41 '-n', '--none', |
| 43 type = utils.Bool("none"), default = True, | 42 type = utils.Bool("none"), default = True, |
| 44 help = 'compute Nan values') | 43 help = 'compute Nan values') |
| 52 '-ol', '--out_log', | 51 '-ol', '--out_log', |
| 53 type = str, | 52 type = str, |
| 54 help = "Output log") | 53 help = "Output log") |
| 55 | 54 |
| 56 parser.add_argument( | 55 parser.add_argument( |
| 57 '-in', '--input', # 'id' has become 'in' | 56 '-in', '--input', |
| 58 type = str, | 57 type = str, |
| 59 help = 'input dataset') | 58 help = 'input dataset') |
| 60 | 59 |
| 61 parser.add_argument( | 60 parser.add_argument( |
| 62 '-ra', '--ras_output', | 61 '-ra', '--ras_output', |
| 256 return (gene.set_index(gene.columns[0])).to_dict() | 255 return (gene.set_index(gene.columns[0])).to_dict() |
| 257 | 256 |
| 258 ############################ resolve ########################################## | 257 ############################ resolve ########################################## |
| 259 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]: | 258 def replace_gene_value(l :str, d :str) -> Tuple[Union[int, float], list]: |
| 260 """ | 259 """ |
| 261 Replace gene identifiers with corresponding values from a dictionary. | 260 Replace gene identifiers in a parsed rule expression with values from a dict. |
| 262 | 261 |
| 263 Args: | 262 Args: |
| 264 l (str): String of gene identifier. | 263 l: Parsed rule as a nested list structure (strings, lists, and operators). |
| 265 d (str): String corresponding to its value. | 264 d: Dict mapping gene IDs to numeric values. |
| 266 | 265 |
| 267 Returns: | 266 Returns: |
| 268 tuple: A tuple containing two lists: the first list contains replaced values, and the second list contains any errors encountered during replacement. | 267 tuple: (new_expression, not_found_genes) |
| 269 """ | 268 """ |
| 270 tmp = [] | 269 tmp = [] |
| 271 err = [] | 270 err = [] |
| 272 while l: | 271 while l: |
| 273 if isinstance(l[0], list): | 272 if isinstance(l[0], list): |
| 280 if value == None: | 279 if value == None: |
| 281 err.append(l[0]) | 280 err.append(l[0]) |
| 282 l = l[1:] | 281 l = l[1:] |
| 283 return (tmp, err) | 282 return (tmp, err) |
| 284 | 283 |
| 285 def replace_gene(l :str, d :str) -> Union[int, float]: | 284 def replace_gene(l: str, d: Dict[str, Union[int, float]]) -> Union[int, float, None]: |
| 286 """ | 285 """ |
| 287 Replace a single gene identifier with its corresponding value from a dictionary. | 286 Replace a single gene identifier with its corresponding value from a dictionary. |
| 288 | 287 |
| 289 Args: | 288 Args: |
| 290 l (str): Gene identifier to replace. | 289 l (str): Gene identifier to replace. |
| 291 d (str): String corresponding to its value. | 290 d (dict): Dict mapping gene IDs to numeric values. |
| 292 | 291 |
| 293 Returns: | 292 Returns: |
| 294 float/int: Corresponding value from the dictionary if found, None otherwise. | 293 float/int/None: Corresponding value from the dictionary if found, None otherwise. |
| 295 | 294 |
| 296 Raises: | 295 Raises: |
| 297 sys.exit: If the value associated with the gene identifier is not valid. | 296 sys.exit: If the value associated with the gene identifier is not valid. |
| 298 """ | 297 """ |
| 299 if l =='and' or l == 'or': | 298 if l =='and' or l == 'or': |
| 511 Generates the RAS scores for each cell line found in the dataset. | 510 Generates the RAS scores for each cell line found in the dataset. |
| 512 | 511 |
| 513 Args: | 512 Args: |
| 514 dataset (pd.DataFrame): Dataset containing gene values. | 513 dataset (pd.DataFrame): Dataset containing gene values. |
| 515 rules (dict): The dict containing reaction ids as keys and rules as values. | 514 rules (dict): The dict containing reaction ids as keys and rules as values. |
| 516 | 515 |
| 517 Side effects: | 516 Note: |
| 518 dataset : mut | 517 Modifies dataset in place by setting the first column as index. |
| 519 | 518 |
| 520 Returns: | 519 Returns: |
| 521 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary | 520 dict: A dictionary where each key corresponds to a cell line name and each value is a dictionary |
| 522 where each key corresponds to a reaction ID and each value is its computed RAS score. | 521 where each key corresponds to a reaction ID and each value is its computed RAS score. |
| 523 """ | 522 """ |
| 524 ras_values_by_cell_line = {} | 523 ras_values_by_cell_line = {} |
| 525 dataset.set_index(dataset.columns[0], inplace=True) | 524 dataset.set_index(dataset.columns[0], inplace=True) |
| 526 # Consider all columns except the first, which holds the HUGO ids and must be discarded | 525 |
| 527 for cell_line_name in dataset.columns[1:]: | 526 for cell_line_name in dataset.columns: #[1:]: |
| 528 cell_line = dataset[cell_line_name].to_dict() | 527 cell_line = dataset[cell_line_name].to_dict() |
| 529 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line) | 528 ras_values_by_cell_line[cell_line_name]= get_ras_values(rules, cell_line) |
| 530 return ras_values_by_cell_line | 529 return ras_values_by_cell_line |
| 531 | 530 |
| 532 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]: | 531 def get_ras_values(value_rules: Dict[str, ruleUtils.OpList], dataset: Dict[str, Expr]) -> Dict[str, Ras]: |
| 593 | 592 |
| 594 return ras_value | 593 return ras_value |
| 595 | 594 |
| 596 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None: | 595 def save_as_tsv(rasScores: Dict[str, Dict[str, Ras]], reactions :List[str]) -> None: |
| 597 """ | 596 """ |
| 598 Save computed ras scores to the given path, as a tsv file. | 597 Save computed RAS scores to ARGS.ras_output as a TSV file. |
| 599 | 598 |
| 600 Args: | 599 Args: |
| 601 rasScores : the computed ras scores. | 600 rasScores : the computed ras scores. |
| 602 path : the output tsv file's path. | 601 reactions : the list of reaction IDs, used as the first column. |
| 603 | 602 |
| 604 Returns: | 603 Returns: |
| 605 None | 604 None |
| 606 """ | 605 """ |
| 607 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe method, sadly | 606 for scores in rasScores.values(): # this is actually a lot faster than using the ootb dataframe method, sadly |
| 630 Returns: | 629 Returns: |
| 631 str: the gene in HugoID encoding. | 630 str: the gene in HugoID encoding. |
| 632 """ | 631 """ |
| 633 supportedGenesInEncoding = geneTranslator[encoding] | 632 supportedGenesInEncoding = geneTranslator[encoding] |
| 634 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName] | 633 if geneName in supportedGenesInEncoding: return supportedGenesInEncoding[geneName] |
| 635 raise ValueError(f"Gene \"{geneName}\" non trovato, verifica di star utilizzando il modello corretto!") | 634 raise ValueError(f"Gene '{geneName}' not found. Please verify you are using the correct model.") |
| 636 | 635 |
| 637 def load_custom_rules() -> Dict[str, ruleUtils.OpList]: | 636 def load_custom_rules() -> Dict[str, ruleUtils.OpList]: |
| 638 """ | 637 """ |
| 639 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be | 638 Opens custom rules file and extracts the rules. If the file is in .csv format an additional parsing step will be |
| 640 performed, significantly impacting the runtime. | 639 performed, significantly impacting the runtime. |
| 641 | 640 |
| 642 Returns: | 641 Returns: |
| 643 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. | 642 Dict[str, ruleUtils.OpList] : dict mapping reaction IDs to rules. |
| 644 """ | 643 """ |
| 645 datFilePath = utils.FilePath.fromStrPath(ARGS.rule_list) # actual file, stored in galaxy as a .dat | 644 datFilePath = utils.FilePath.fromStrPath(ARGS.model_upload) # actual file, stored in Galaxy as a .dat |
| 646 | 645 |
| 647 try: filenamePath = utils.FilePath.fromStrPath(ARGS.rules_name) # file's name in input, to determine its original ext | 646 dict_rule = {} |
| 648 except utils.PathErr as err: | 647 |
| 649 raise utils.PathErr(filenamePath, f"Please make sure your file's name is a valid file path, {err.msg}") | 648 try: |
| 650 | 649 rows = utils.readCsv(datFilePath, delimiter = "\t", skipHeader=False) |
| 651 if filenamePath.ext is utils.FileFormat.PICKLE: return utils.readPickle(datFilePath) | 650 if len(rows) <= 1: |
| 652 | 651 raise ValueError("Model tabular with 1 column is not supported.") |
| | 652 |
| | 653 if not rows: |
| | 654 raise ValueError("Model tabular file is empty.") |
| | 655 |
| | 656 id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") |
| | 657 |
| | 658 # First, try using a tab delimiter |
| | 659 for line in rows[1:]: |
| | 660 if len(line) <= idx_gpr: |
| | 661 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) |
| | 662 continue |
| | 663 |
| | 664 if line[idx_gpr] == "": |
| | 665 dict_rule[line[id_idx]] = ruleUtils.OpList([""]) |
| | 666 else: |
| | 667 dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr]) |
| | 668 |
| | 669 except Exception as e: |
| | 670 # If parsing with tabs fails, try comma delimiter |
| | 671 try: |
| | 672 rows = utils.readCsv(datFilePath, delimiter = ",", skipHeader=False) |
| | 673 |
| | 674 if len(rows) <= 1: |
| | 675 raise ValueError("Model tabular with 1 column is not supported.") |
| | 676 |
| | 677 if not rows: |
| | 678 raise ValueError("Model tabular file is empty.") |
| | 679 |
| | 680 id_idx, idx_gpr = utils.findIdxByName(rows[0], "GPR") |
| | 681 |
| | 682 # Try again parsing row content with the GPR column using comma-separated values |
| | 683 for line in rows[1:]: |
| | 684 if len(line) <= idx_gpr: |
| | 685 utils.logWarning(f"Skipping malformed line: {line}", ARGS.out_log) |
| | 686 continue |
| | 687 |
| | 688 if line[idx_gpr] == "": |
| | 689 dict_rule[line[id_idx]] = ruleUtils.OpList([""]) |
| | 690 else: |
| | 691 dict_rule[line[id_idx]] = ruleUtils.parseRuleToNestedList(line[idx_gpr]) |
| | 692 |
| | 693 except Exception as e2: |
| | 694 raise ValueError(f"Unable to parse rules file. Tried both tab and comma delimiters. Original errors: Tab: {e}, Comma: {e2}") |
| | 695 |
| | 696 if not dict_rule: |
| | 697 raise ValueError("No valid rules found in the uploaded file. Please check the file format.") |
| 653 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. | 698 # csv rules need to be parsed, those in a pickle format are taken to be pre-parsed. |
| 654 return { line[0] : ruleUtils.parseRuleToNestedList(line[1]) for line in utils.readCsv(datFilePath) } | 699 return dict_rule |
| | 700 |
| 655 | 701 |
| 656 def main(args:List[str] = None) -> None: | 702 def main(args:List[str] = None) -> None: |
| 657 """ | 703 """ |
| 658 Initializes everything and sets the program in motion based on the frontend input arguments. | 704 Initializes everything and sets the program in motion based on the frontend input arguments. |
| 659 | 705 |
| 669 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str) | 715 dataset.iloc[:, 0] = (dataset.iloc[:, 0]).astype(str) |
| 670 | 716 |
| 671 # remove versioning from gene names | 717 # remove versioning from gene names |
| 672 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0] | 718 dataset.iloc[:, 0] = dataset.iloc[:, 0].str.split('.').str[0] |
| 673 | 719 |
| 674 # handle custom models | 720 rules = load_custom_rules() |
| 675 model :utils.Model = ARGS.rules_selector | 721 reactions = list(rules.keys()) |
| 676 | 722 |
| 677 if model is utils.Model.Custom: | 723 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) |
| 678 rules = load_custom_rules() | 724 if ERRORS: utils.logWarning( |
| 679 reactions = list(rules.keys()) | 725 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", |
| 680 | 726 ARGS.out_log) |
| 681 save_as_tsv(ras_for_cell_lines(dataset, rules), reactions) | 727 |
| 682 if ERRORS: utils.logWarning( | 728 |
| 683 f"The following genes are mentioned in the rules but don't appear in the dataset: {ERRORS}", | 729 print("Execution succeeded") |
| 684 ARGS.out_log) | |
| 685 | |
| 686 return | |
| 687 | |
| 688 # This is the standard flow of the ras_generator program, for non-custom models. | |
| 689 name = "RAS Dataset" | |
| 690 type_gene = gene_type(dataset.iloc[0, 0], name) | |
| 691 | |
| 692 rules = model.getRules(ARGS.tool_dir) | |
| 693 genes = data_gene(dataset, type_gene, name, None) | |
| 694 ids, rules = load_id_rules(rules.get(type_gene)) | |
| 695 | |
| 696 resolve_rules, err = resolve(genes, rules, ids, ARGS.none, name) | |
| 697 create_ras(resolve_rules, name, rules, ids, ARGS.ras_output) | |
| 698 | |
| 699 if err: utils.logWarning( | |
| 700 f"Warning: gene(s) {err} not found in class \"{name}\", " + | |
| 701 "the expression level for this gene will be considered NaN", | |
| 702 ARGS.out_log) | |
| 703 | |
| 704 print("Execution succeded") | |
| 705 | 730 |
| 706 ############################################################################### | 731 ############################################################################### |
| 707 if __name__ == "__main__": | 732 if __name__ == "__main__": |
| 708 main() | 733 main() |
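
For orientation, here is a minimal sketch of how the reworked entry point might be driven after this change. It is an assumption-laden illustration, not part of the commit: it supposes that the flags visible in this comparison (`-rl/--model_upload`, `-rn/--model_upload_name`, `-in/--input`, `-ra/--ras_output`, `-ol/--out_log`) are sufficient, that `load_custom_rules()` only needs an id column plus a `GPR` column (as implied by `utils.findIdxByName(rows[0], "GPR")`), and that all file names, reaction ids and gene names are placeholders.

```python
# Hypothetical driver for the updated ras_generator entry point.
# All paths, reaction ids and gene names below are placeholders; other
# required flags may exist outside the lines shown in this comparison.
import csv

import ras_generator  # requires COBRAxy (and its utils package) on the path

# Minimal custom-rules ("model tabular") file: an id column plus a "GPR" column,
# which load_custom_rules() locates by header name.
with open("model_rules.tsv", "w", newline="") as f:
    writer = csv.writer(f, delimiter="\t")
    writer.writerow(["ReactionID", "GPR"])
    writer.writerow(["R1", "GeneA or GeneB"])   # OR rule
    writer.writerow(["R2", "GeneA and GeneC"])  # AND rule
    writer.writerow(["R3", ""])                 # reaction without a rule

ras_generator.main([
    "-rl", "model_rules.tsv",   # --model_upload: rules file (Galaxy stores it as .dat)
    "-rn", "model_rules.tsv",   # --model_upload_name: original name of the upload
    "-in", "expression.tsv",    # genes x samples expression dataset
    "-ra", "ras_scores.tsv",    # output RAS table
    "-ol", "run.log",           # log file
])
```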

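The aggregation step itself (`get_ras_values` and the operators it applies) is elided from this comparison, so the sketch below is purely illustrative: it assumes the common convention of resolving `and` with a minimum and `or` with a sum over expression values, and a nested-list rule shape like the one `ruleUtils.parseRuleToNestedList` appears to produce. The real operators, and the handling of missing genes controlled by `--none`, may differ.

```python
# Illustrative GPR -> RAS evaluation under an ASSUMED convention:
# "and" -> min of the operands, "or" -> sum of the operands.
from typing import Dict, List, Optional, Union

Expr = Optional[Union[int, float]]

def eval_gpr(rule: List, expression: Dict[str, Expr]) -> Expr:
    """Evaluate one nested GPR rule against one sample's expression values."""
    values, op = [], None
    for tok in rule:
        if tok in ("and", "or"):
            op = tok
            continue
        # A token is either a nested sub-rule (list) or a gene identifier (str).
        v = eval_gpr(tok, expression) if isinstance(tok, list) else expression.get(tok)
        if v is not None:  # missing genes are simply skipped in this sketch
            values.append(v)
    if not values:
        return None
    return min(values) if op == "and" else sum(values)

# One sample, one rule: (GeneA) or (GeneB and GeneC)
rule = ["GeneA", "or", ["GeneB", "and", "GeneC"]]
sample = {"GeneA": 2.0, "GeneB": 5.0, "GeneC": 1.0}
print(eval_gpr(rule, sample))  # 3.0 = 2.0 + min(5.0, 1.0) under the assumed convention
```

A single-gene rule (no operator) falls through to the `sum` branch over one value, i.e. it returns the gene's own expression level.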