changeset 299:8ae278b4a5d5 draft default tip

Uploaded
author francesco_lapi
date Fri, 16 May 2025 15:47:05 +0000
parents bc8cbaacafd0
children
files COBRAxy/marea.py
diffstat 1 files changed, 62 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/COBRAxy/marea.py	Fri May 16 10:57:02 2025 +0000
+++ b/COBRAxy/marea.py	Fri May 16 15:47:05 2025 +0000
@@ -392,7 +392,8 @@
     """
     Encodes possible arrow colors based on their meaning in the enrichment process.
     """
-    Invalid       = "#BEBEBE" # gray, fold-change under treshold
+    Invalid       = "#BEBEBE" # gray, fold-change under threshold or not significant p-value
+    Transparent   = "#ffffff00" # transparent, to make some arrow segments disappear
     UpRegulated   = "#ecac68" # orange, up-regulated reaction
     DownRegulated = "#6495ed" # lightblue, down-regulated reaction
 
@@ -479,8 +480,9 @@
 
 # vvv These constants could be inside the class itself a static properties, but python
 # was built by brainless organisms so here we are!
-INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
+INVALID_ARROW       = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
+TRANSPARENT_ARROW   = Arrow(Arrow.MIN_W, ArrowColor.Transparent) # Who cares how big it is if it's transparent
 
 # TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever
 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
@@ -710,12 +712,18 @@
             # Perform Kolmogorov-Smirnov test
             _, p_value = st.ks_2samp(dataset1Data, dataset2Data)
         case "ttest_p":
+            # Datasets should have same size
+            if len(dataset1Data) != len(dataset2Data):
+                raise ValueError("Datasets must have the same size for paired t-test.")
             # Perform t-test for paired samples
             _, p_value = st.ttest_rel(dataset1Data, dataset2Data)
         case "ttest_ind":
             # Perform t-test for independent samples
             _, p_value = st.ttest_ind(dataset1Data, dataset2Data)
         case "wilcoxon":
+            # Datasets should have same size
+            if len(dataset1Data) != len(dataset2Data):
+                raise ValueError("Datasets must have the same size for Wilcoxon signed-rank test.")
             # Perform Wilcoxon signed-rank test
             _, p_value = st.wilcoxon(dataset1Data, dataset2Data)
         case "mw":
@@ -736,8 +744,10 @@
     
     return p_value, z_score
 
-def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
+# TODO: the net RPS computation should be done in the RPS module
+def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float, Dict[str, Tuple[np.ndarray, np.ndarray]]]:
     #TODO: the following code still suffers from "dumbvarnames-osis"
+    netRPS :Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
     comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {}
     count   = 0
     max_z_score = 0
@@ -756,6 +766,7 @@
 
                 nets1 = np.subtract(l1, dataset1Data[position])
                 nets2 = np.subtract(l2, dataset2Data[position])
+                netRPS[reactId] = (nets1, nets2)
 
                 p_value, z_score = computePValue(nets1, nets2)
                 avg1 = sum(nets1)   / len(nets1)
@@ -787,13 +798,13 @@
         pValues = [comparisonResult[reactId][0] for reactId in reactIds]
         
         # Apply the Benjamini-Hochberg correction and update
-        adjustedPValues = st.multipletests(pValues, method="fdr_bh")[1]
+        adjustedPValues = st.false_discovery_control(pValues)
         for i, reactId in enumerate(reactIds):
             comparisonResult[reactId][0] = adjustedPValues[i]
 
-    return comparisonResult, max_z_score
+    return comparisonResult, max_z_score, netRPS
 
-def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]:
+def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> Tuple[List[Tuple[str, str, dict, float]], dict]:
     """
     Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
     provided metabolic map.
@@ -804,8 +815,10 @@
         fromRAS : whether the data to enrich consists of RAS scores.
 
     Returns:
-        List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score.
-        
+        tuple: A tuple containing:
+        - List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary and max z-score.
+        - dict : net RPS values for each dataset's reactions
+    
     Raises:
         sys.exit : if there are less than 2 classes for comparison
     """
@@ -813,28 +826,37 @@
     if (not class_pat) or (len(class_pat.keys()) < 2):
         sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
     
+    # { datasetName : { reactId : netRPS, ... }, ... }
+    netRPSResults :Dict[str, Dict[str, np.ndarray]] = {}
     enrichment_results = []
 
     if ARGS.comparison == "manyvsmany":
         for i, j in it.combinations(class_pat.keys(), 2):
-            comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
+            comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
             enrichment_results.append((i, j, comparisonDict, max_z_score))
+            netRPSResults[i] = { reactId : net[0] for reactId, net in netRPS.items() }
+            netRPSResults[j] = { reactId : net[1] for reactId, net in netRPS.items() }
     
     elif ARGS.comparison == "onevsrest":
         for single_cluster in class_pat.keys():
             rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]
-            comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
+            comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
             enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
+            netRPSResults[single_cluster] = { reactId : net[0] for reactId, net in netRPS.items() }
+            netRPSResults["rest"]         = { reactId : net[1] for reactId, net in netRPS.items() }
     
     elif ARGS.comparison == "onevsmany":
         controlItems = class_pat.get(ARGS.control)
         for otherDataset in class_pat.keys():
             if otherDataset == ARGS.control:
                 continue
-            comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
+            
+            comparisonDict, max_z_score, netRPS = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
             enrichment_results.append((ARGS.control, otherDataset, comparisonDict, max_z_score))
+            netRPSResults[ARGS.control] = { reactId : net[0] for reactId, net in netRPS.items() }
+            netRPSResults[otherDataset] = { reactId : net[1] for reactId, net in netRPS.items() }
     
-    return enrichment_results
+    return enrichment_results, netRPSResults
 
 def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None:
     svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
@@ -857,9 +879,10 @@
         os.remove(svgFilePath.show())
 
 ClassPat = Dict[str, List[List[float]]]
-def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
+def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat, Dict[str, List[str]]]:
     # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
     # for the sake of everyone's sanity.
+    columnNames :Dict[str, List[str]] = {} # { datasetName : [ columnName, ... ], ... }
     class_pat :ClassPat = {}
     if ARGS.option == 'datasets':
         num = 1
@@ -870,8 +893,8 @@
             
             values, ids = getDatasetValues(path, name)
             if values != None:
-                class_pat[name] = list(map(list, zip(*values.values())))
-                # TODO: ???
+                class_pat[name]   = list(map(list, zip(*values.values()))) # TODO: ???
+                columnNames[name] = list(values.keys())
             
             num += 1
     
@@ -880,9 +903,11 @@
         classes = classes.astype(str)
 
         values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
-        if values != None: class_pat = split_class(classes, values)
+        if values != None:
+            # TODO: add the columnNames thing, I didn't because I don't understand the whole "dataset classes" thing
+            class_pat = split_class(classes, values)
     
-    return ids, class_pat
+    return ids, class_pat, columnNames
     #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
 
 #TODO: create these damn args as FilePath objects
@@ -932,22 +957,25 @@
     rps_results = []
 
     # Compute RAS enrichment if requested
-    if ARGS.using_RAS:
-        ids_ras, class_pat_ras = getClassesAndIdsFromDatasets(
+    if ARGS.using_RAS: #       vvv columnNames only matter with RPS data
+        ids_ras, class_pat_ras, _ = getClassesAndIdsFromDatasets(
             ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
-        ras_results = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True)
+        ras_results, _ = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True)
+        #           ^^^ netRPS only matter with RPS data
 
     # Compute RPS enrichment if requested
     if ARGS.using_RPS:
-        ids_rps, class_pat_rps = getClassesAndIdsFromDatasets(
+        ids_rps, class_pat_rps, columnNames = getClassesAndIdsFromDatasets(
             ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps)
-        rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False)
+        
+        rps_results, netRPS = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False)
 
     # Organize by comparison pairs
     comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {}
     for i, j, comparison_data, max_z_score in ras_results:
         comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None}
-    for i, j, comparison_data, max_z_score in rps_results:
+    
+    for i, j, comparison_data, max_z_score in rps_results:
         comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)})
 
     # For each comparison, create a styled map with RAS bodies and RPS heads
@@ -967,6 +995,17 @@
 
         # Output both SVG and PDF/PNG as configured
         createOutputMaps(i, j, map_copy)
+    
+    # Add net RPS output file
+    if ARGS.using_RPS and (ARGS.net or not ARGS.using_RAS): # netRPS/columnNames only exist when RPS data was processed
+        for datasetName, rows in netRPS.items():
+            writeToCsv(
+                [[reactId, *netValues] for reactId, netValues in rows.items()],
+                # vvv In weird comparison modes the dataset names are not recorded properly..
+                columnNames.get(datasetName, ["Reactions", "other cool column names.."]),
+                utils.FilePath(
+                    datasetName, ext = utils.FileFormat.CSV, prefix = ARGS.output_path))
+
     print('Execution succeeded')
 ###############################################################################
 if __name__ == "__main__":