Mercurial > repos > bimib > cobraxy
comparison COBRAxy/marea.py @ 143:507efdc9d226 draft
Uploaded
| author | luca_milaz |
|---|---|
| date | Tue, 05 Nov 2024 21:42:17 +0000 |
| parents | 41f35c2f0c7b |
| children | a9a490ae198d |
comparison
equal
deleted
inserted
replaced
| 142:accda943dfb9 | 143:507efdc9d226 |
|---|---|
| 13 from PIL import Image | 13 from PIL import Image |
| 14 import os | 14 import os |
| 15 import argparse | 15 import argparse |
| 16 import pyvips | 16 import pyvips |
| 17 from typing import Tuple, Union, Optional, List, Dict | 17 from typing import Tuple, Union, Optional, List, Dict |
| 18 import copy | |
| 18 | 19 |
| 19 ERRORS = [] | 20 ERRORS = [] |
| 20 ########################## argparse ########################################## | 21 ########################## argparse ########################################## |
| 21 ARGS :argparse.Namespace | 22 ARGS :argparse.Namespace |
| 22 def process_args() -> argparse.Namespace: | 23 def process_args() -> argparse.Namespace: |
| 759 | 760 |
| 760 except (TypeError, ZeroDivisionError): continue | 761 except (TypeError, ZeroDivisionError): continue |
| 761 | 762 |
| 762 return tmp, max_z_score | 763 return tmp, max_z_score |
| 763 | 764 |
| 764 def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None: | 765 def computeEnrichment(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: |
| 765 """ | 766 """ |
| 766 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the | 767 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the |
| 767 provided metabolic map. | 768 provided metabolic map. |
| 768 | 769 |
| 769 Args: | 770 Args: |
| 771 class_pat : the clustered data. | 772 class_pat : the clustered data. |
| 772 ids : ids for data association. | 773 ids : ids for data association. |
| 773 fromRAS : whether the data to enrich consists of RAS scores. | 774 fromRAS : whether the data to enrich consists of RAS scores. |
| 774 | 775 |
| 775 Returns: | 776 Returns: |
| 776 None | 777 List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score. |
| 777 | 778 |
| 778 Raises: | 779 Raises: |
| 779 sys.exit : if there are less than 2 classes for comparison | 780 sys.exit : if there are less than 2 classes for comparison |
| 780 | 781 |
| 781 Side effects: | 782 Side effects: |
| 782 metabMap : mut | 783 metabMap : mutates based on calculated enrichment |
| 783 ids : mut | 784 """ |
| 784 """ | 785 class_pat = {k.strip(): v for k, v in class_pat.items()} |
| 785 class_pat = { k.strip() : v for k, v in class_pat.items() } | 786 if (not class_pat) or (len(class_pat.keys()) < 2): |
| 786 #TODO: simplfy this stuff vvv and stop using sys.exit (raise the correct utils error) | 787 sys.exit('Execution aborted: classes provided for comparisons are less than two\n') |
| 787 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n') | 788 |
| 789 enrichment_results = [] | |
| 788 | 790 |
| 789 if ARGS.comparison == "manyvsmany": | 791 if ARGS.comparison == "manyvsmany": |
| 790 for i, j in it.combinations(class_pat.keys(), 2): | 792 for i, j in it.combinations(class_pat.keys(), 2): |
| 791 #TODO: these 2 functions are always called in pair and in this order and need common data, | |
| 792 # some clever refactoring would be appreciated. | |
| 793 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) | 793 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) |
| 794 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j, fromRAS) | 794 enrichment_results.append((i, j, comparisonDict, max_z_score)) |
| 795 | 795 |
| 796 elif ARGS.comparison == "onevsrest": | 796 elif ARGS.comparison == "onevsrest": |
| 797 for single_cluster in class_pat.keys(): | 797 for single_cluster in class_pat.keys(): |
| 798 t :List[List[List[float]]] = [] | 798 rest = [item for k, v in class_pat.items() if k != single_cluster for item in v] |
| 799 for k in class_pat.keys(): | |
| 800 if k != single_cluster: | |
| 801 t.append(class_pat.get(k)) | |
| 802 | |
| 803 rest :List[List[float]] = [] | |
| 804 for i in t: | |
| 805 rest = rest + i | |
| 806 | |
| 807 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids) | 799 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids) |
| 808 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster, fromRAS) | 800 enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score)) |
| 809 | 801 |
| 810 elif ARGS.comparison == "onevsmany": | 802 elif ARGS.comparison == "onevsmany": |
| 811 controlItems = class_pat.get(ARGS.control) | 803 controlItems = class_pat.get(ARGS.control) |
| 812 for otherDataset in class_pat.keys(): | 804 for otherDataset in class_pat.keys(): |
| 813 if otherDataset == ARGS.control: continue | 805 if otherDataset == ARGS.control: |
| 814 | 806 continue |
| 815 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) | 807 comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) |
| 816 temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset, fromRAS) | 808 enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score)) |
| 817 | 809 |
| 818 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None: | 810 return enrichment_results |
| 819 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG) | 811 |
| 812 def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None: | |
| 813 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG) | |
| 820 utils.writeSvg(svgFilePath, core_map) | 814 utils.writeSvg(svgFilePath, core_map) |
| 821 | 815 |
| 822 if ARGS.generate_pdf: | 816 if ARGS.generate_pdf: |
| 823 pngPath = buildOutputPath(dataset1Name, dataset2Name, details = "PNG Map", ext = utils.FileFormat.PNG) | 817 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) |
| 824 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details = "PDF Map", ext = utils.FileFormat.PDF) | 818 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) |
| 825 convert_to_pdf(svgFilePath, pngPath, pdfPath) | 819 convert_to_pdf(svgFilePath, pngPath, pdfPath) |
| 826 | 820 |
| 827 if not ARGS.generate_svg: os.remove(svgFilePath.show()) | 821 if not ARGS.generate_svg: |
| 822 os.remove(svgFilePath) | |
| 828 | 823 |
| 829 ClassPat = Dict[str, List[List[float]]] | 824 ClassPat = Dict[str, List[List[float]]] |
| 830 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: | 825 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: |
| 831 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, | 826 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, |
| 832 # for the sake of everyone's sanity. | 827 # for the sake of everyone's sanity. |
| 878 None | 873 None |
| 879 | 874 |
| 880 Raises: | 875 Raises: |
| 881 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) | 876 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) |
| 882 """ | 877 """ |
| 883 | |
| 884 global ARGS | 878 global ARGS |
| 885 ARGS = process_args() | 879 ARGS = process_args() |
| 886 | 880 |
| 887 if os.path.isdir('result') == False: os.makedirs('result') | 881 if not os.path.isdir('result'): |
| 888 | 882 os.makedirs('result') |
| 889 core_map :ET.ElementTree = ARGS.choice_map.getMap( | 883 |
| 884 core_map: ET.ElementTree = ARGS.choice_map.getMap( | |
| 890 ARGS.tool_dir, | 885 ARGS.tool_dir, |
| 891 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) | 886 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) |
| 892 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given. | 887 |
| 893 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner | |
| 894 # solution can be derived from my comment in FilePath.fromStrPath | |
| 895 | |
| 896 if ARGS.using_RAS: | 888 if ARGS.using_RAS: |
| 897 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names) | 889 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names) |
| 898 computeEnrichment(core_map, class_pat, ids) | 890 enrichment_results = computeEnrichment(core_map, class_pat, ids) |
| 891 for i, j, comparisonDict, max_z_score in enrichment_results: | |
| 892 map_copy = copy.deepcopy(core_map) | |
| 893 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j, fromRAS=True) | |
| 894 createOutputMaps(i, j, map_copy) | |
| 899 | 895 |
| 900 if ARGS.using_RPS: | 896 if ARGS.using_RPS: |
| 901 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) | 897 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) |
| 902 computeEnrichment(core_map, class_pat, ids, fromRAS = False) | 898 enrichment_results = computeEnrichment(core_map, class_pat, ids, fromRAS=False) |
| 903 | 899 for i, j, comparisonDict, max_z_score in enrichment_results: |
| 904 # create output files: TODO: this is the same comparison happening in "maps", find a better way to organize this | 900 map_copy = copy.deepcopy(core_map) |
| 905 if ARGS.comparison == "manyvsmany": | 901 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j, fromRAS=False) |
| 906 for i, j in it.combinations(class_pat.keys(), 2): createOutputMaps(i, j, core_map) | 902 createOutputMaps(i, j, map_copy) |
| 907 return | 903 |
| 908 | 904 print('Execution succeeded') |
| 909 if ARGS.comparison == "onevsrest": | |
| 910 for single_cluster in class_pat.keys(): createOutputMaps(single_cluster, "rest", core_map) | |
| 911 return | |
| 912 | |
| 913 for otherDataset in class_pat.keys(): | |
| 914 if otherDataset != ARGS.control: createOutputMaps(i, j, core_map) | |
| 915 | |
| 916 if not ERRORS: return | |
| 917 utils.logWarning( | |
| 918 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}", | |
| 919 ARGS.out_log) | |
| 920 | |
| 921 print('Execution succeded') | |
| 922 | |
| 923 ############################################################################### | 905 ############################################################################### |
| 924 if __name__ == "__main__": | 906 if __name__ == "__main__": |
| 925 main() | 907 main() |
