Mercurial > repos > bimib > cobraxy
changeset 299:8ae278b4a5d5 draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Fri, 16 May 2025 15:47:05 +0000 |
parents | bc8cbaacafd0 |
children | |
files | COBRAxy/marea.py |
diffstat | 1 files changed, 62 insertions(+), 23 deletions(-) [+] |
line wrap: on
line diff
--- a/COBRAxy/marea.py Fri May 16 10:57:02 2025 +0000 +++ b/COBRAxy/marea.py Fri May 16 15:47:05 2025 +0000 @@ -392,7 +392,8 @@ """ Encodes possible arrow colors based on their meaning in the enrichment process. """ - Invalid = "#BEBEBE" # gray, fold-change under treshold + Invalid = "#BEBEBE" # gray, fold-change under treshold or not significant p-value + Transparent = "#ffffff00" # transparent, to make some arrow segments disappear UpRegulated = "#ecac68" # orange, up-regulated reaction DownRegulated = "#6495ed" # lightblue, down-regulated reaction @@ -479,8 +480,9 @@ # vvv These constants could be inside the class itself a static properties, but python # was built by brainless organisms so here we are! -INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) +INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) +TRANSPARENT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Transparent) # Who cares how big it is if it's transparent # TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: @@ -710,12 +712,18 @@ # Perform Kolmogorov-Smirnov test _, p_value = st.ks_2samp(dataset1Data, dataset2Data) case "ttest_p": + # Datasets should have same size + if len(dataset1Data) != len(dataset2Data): + raise ValueError("Datasets must have the same size for paired t-test.") # Perform t-test for paired samples _, p_value = st.ttest_rel(dataset1Data, dataset2Data) case "ttest_ind": # Perform t-test for independent samples _, p_value = st.ttest_ind(dataset1Data, dataset2Data) case "wilcoxon": + # Datasets should have same size + if len(dataset1Data) != len(dataset2Data): + raise ValueError("Datasets must have the same size for Wilcoxon signed-rank test.") # Perform Wilcoxon signed-rank test _, p_value = st.wilcoxon(dataset1Data, dataset2Data) case "mw": @@ -736,8 +744,10 @@ return p_value, z_score -def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: +# TODO: the net RPS computation should be done in the RPS module +def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float, Dict[str, Tuple[np.ndarray, np.ndarray]]]: #TODO: the following code still suffers from "dumbvarnames-osis" + netRPS :Dict[str, Tuple[np.ndarray, np.ndarray]] = {} comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} count = 0 max_z_score = 0 @@ -756,6 +766,7 @@ nets1 = np.subtract(l1, dataset1Data[position]) nets2 = np.subtract(l2, dataset2Data[position]) + netRPS[reactId] = (nets1, nets2) p_value, z_score = computePValue(nets1, nets2) avg1 = sum(nets1) / len(nets1) @@ -787,13 +798,13 @@ pValues = [comparisonResult[reactId][0] for reactId in reactIds] # Apply the Benjamini-Hochberg correction and update - adjustedPValues = st.multipletests(pValues, method="fdr_bh")[1] + adjustedPValues = st.false_discovery_control(pValues)[1] for i, reactId in enumerate(reactIds): comparisonResult[reactId][0] = adjustedPValues[i] - return comparisonResult, max_z_score + return comparisonResult, max_z_score, netRPS -def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: +def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> Tuple[List[Tuple[str, str, dict, float]], dict]: """ Compares clustered data based on a given comparison mode and applies enrichment-based styling on the provided metabolic map. @@ -804,8 +815,10 @@ fromRAS : whether the data to enrich consists of RAS scores. Returns: - List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score. - + tuple: A tuple containing: + - List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary and max z-score. + - dict : net RPS values for each dataset's reactions + Raises: sys.exit : if there are less than 2 classes for comparison """ @@ -813,28 +826,37 @@ if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n') + # { datasetName : { reactId : netRPS, ... }, ... } + netRPSResults :Dict[str, Dict[str, np.ndarray]] = {} enrichment_results = [] if ARGS.comparison == "manyvsmany": for i, j in it.combinations(class_pat.keys(), 2): - comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) + comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids) enrichment_results.append((i, j, comparisonDict, max_z_score)) + netRPSResults[i] = { reactId : net[0] for reactId, net in netRPS.items() } + netRPSResults[j] = { reactId : net[1] for reactId, net in netRPS.items() } elif ARGS.comparison == "onevsrest": for single_cluster in class_pat.keys(): rest = [item for k, v in class_pat.items() if k != single_cluster for item in v] - comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids) + comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat.get(single_cluster), rest, ids) enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score)) + netRPSResults[single_cluster] = { reactId : net[0] for reactId, net in netRPS.items() } + netRPSResults["rest"] = { reactId : net[1] for reactId, net in netRPS.items() } elif ARGS.comparison == "onevsmany": controlItems = class_pat.get(ARGS.control) for otherDataset in class_pat.keys(): if otherDataset == ARGS.control: continue - comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) + + comparisonDict, max_z_score, netRPS = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids) enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score)) + netRPSResults[ARGS.control] = { reactId : net[0] for reactId, net in netRPS.items() } + netRPSResults[otherDataset] = { reactId : net[1] for reactId, net in netRPS.items() } - return enrichment_results + return enrichment_results, netRPSResults def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None: svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG) @@ -857,9 +879,10 @@ os.remove(svgFilePath.show()) ClassPat = Dict[str, List[List[float]]] -def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: +def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat, Dict[str, List[str]]]: # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, # for the sake of everyone's sanity. + columnNames :Dict[str, List[str]] = {} # { datasetName : [ columnName, ... ], ... } class_pat :ClassPat = {} if ARGS.option == 'datasets': num = 1 @@ -870,8 +893,8 @@ values, ids = getDatasetValues(path, name) if values != None: - class_pat[name] = list(map(list, zip(*values.values()))) - # TODO: ??? + class_pat[name] = list(map(list, zip(*values.values()))) # TODO: ??? + columnNames[name] = list(values.keys()) num += 1 @@ -880,9 +903,11 @@ classes = classes.astype(str) values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") - if values != None: class_pat = split_class(classes, values) + if values != None: + # TODO: add the columnNames thing, I didn't because I don't understand the whole "dataset classes" thing + class_pat = split_class(classes, values) - return ids, class_pat + return ids, class_pat, columnNames #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) #TODO: create these damn args as FilePath objects @@ -932,22 +957,25 @@ rps_results = [] # Compute RAS enrichment if requested - if ARGS.using_RAS: - ids_ras, class_pat_ras = getClassesAndIdsFromDatasets( + if ARGS.using_RAS: # vvv columnNames only matter with RPS data + ids_ras, class_pat_ras, _ = getClassesAndIdsFromDatasets( ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names) - ras_results = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True) + ras_results, _ = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True) + # ^^^ netRPS only matter with RPS data # Compute RPS enrichment if requested if ARGS.using_RPS: - ids_rps, class_pat_rps = getClassesAndIdsFromDatasets( + ids_rps, class_pat_rps, columnNames = getClassesAndIdsFromDatasets( ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) - rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) + + rps_results, netRPS = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) # Organize by comparison pairs comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} for i, j, comparison_data, max_z_score in ras_results: comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} - for i, j, comparison_data, max_z_score in rps_results: + + for i, j, comparison_data, max_z_score, in rps_results: comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) # For each comparison, create a styled map with RAS bodies and RPS heads @@ -967,6 +995,17 @@ # Output both SVG and PDF/PNG as configured createOutputMaps(i, j, map_copy) + + # Add net RPS output file + if ARGS.net or not ARGS.using_RAS: + for datasetName, rows in netRPS.items(): + writeToCsv( + [[reactId, *netValues] for reactId, netValues in rows.items()], + # vvv In weird comparison modes the dataset names are not recorded properly.. + columnNames.get(datasetName, ["Reactions", "other cool column names.."]), + utils.FilePath( + datasetName, ext = utils.FileFormat.CSV, prefix = ARGS.output_path)) + print('Execution succeeded') ############################################################################### if __name__ == "__main__":