changeset 165:244e2f302c68 draft

Uploaded
author francesco_lapi
date Wed, 24 Jul 2024 16:13:10 +0000
parents ce6b7021c3d1
children e19d829063b2
files marea_2/marea.py
diffstat 1 files changed, 48 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/marea_2/marea.py	Tue Jul 23 14:10:17 2024 +0000
+++ b/marea_2/marea.py	Wed Jul 24 16:13:10 2024 +0000
@@ -256,7 +256,7 @@
     return ';'.join(tmp)
 
 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix!
-def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_F_C :float) -> ET.ElementTree:
+def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
     """
     Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
 
@@ -265,7 +265,7 @@
         core_map : SVG map to modify
         threshold_P_V : threshold for a p-value to be considered significant
         threshold_F_C : threshold for a fold change value to be considered significant
-        max_F_C : highest fold change (absolute value)
+        max_z_score : highest z-score (absolute value)
     
     Returns:
         ET.ElementTree : the modified core_map
@@ -276,8 +276,8 @@
     maxT = 12
     minT = 2
     grey = '#BEBEBE'
-    blue = '#0000FF'
-    red = '#E41A1C'
+    blue = '#6495ed'
+    red = '#ecac68'
     for el in core_map.iter():
         el_id = str(el.get('id'))
         if el_id.startswith('R_'):
@@ -285,6 +285,7 @@
             if tmp != None:
                 p_val :float = tmp[0]
                 f_c = tmp[1]
+                z_score = tmp[2]
                 if p_val < threshold_P_V:
                     if not isinstance(f_c, str):
                         if abs(f_c) < math.log(threshold_F_C, 2):
@@ -295,7 +296,7 @@
                                 col = blue
                             elif f_c > 0:
                                 col = red
-                            width = str(max((abs(f_c) * maxT) / max_F_C, minT))
+                            width = str(max((abs(z_score) * maxT) / max_z_score, minT))
                     else:
                         if f_c == '-INF':
                             col = blue
@@ -375,8 +376,8 @@
     Encodes possible arrow colors based on their meaning in the enrichment process.
     """
     Invalid       = "#BEBEBE" # gray, fold-change under treshold
-    UpRegulated   = "#E41A1C" # red, up-regulated reaction
-    DownRegulated = "#0000FF" # blue, down-regulated reaction
+    UpRegulated   = "#ecac68" # red, up-regulated reaction
+    DownRegulated = "#6495ed" # blue, down-regulated reaction
 
     UpRegulatedInv = "#FF7A00"
     # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with
@@ -463,14 +464,14 @@
 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
 
-def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericFoldChange :float) -> None:
+def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
     """
     Applies RPS enrichment results to the provided metabolic map.
 
     Args:
         rpsEnrichmentRes : RPS enrichment results.
         metabMap : the metabolic map to edit.
-        maxNumericFoldChange : biggest finite fold-change value found.
+        maxNumericZScore : biggest finite z-score value found.
     
     Side effects:
         metabMap : mut
@@ -481,6 +482,7 @@
     for reactionId, values in rpsEnrichmentRes.items():
         pValue = values[0]
         foldChange = values[1]
+        z_score = values[2]
 
         if isinstance(foldChange, str): foldChange = float(foldChange)
         if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow
@@ -493,7 +495,7 @@
         
         width = Arrow.MAX_W
         if not math.isinf(foldChange):
-            try: width = max(abs(foldChange * Arrow.MAX_W) / maxNumericFoldChange, Arrow.MIN_W)
+            try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
             except ZeroDivisionError: pass
         
         if not reactionId.endswith("_RV"): # RV stands for reversible reactions
@@ -669,19 +671,19 @@
 
     writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
 
-def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_F_C :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
+def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
     # this function compiles the things always in common between comparison modes after enrichment.
     # TODO: organize, name better.
     writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
     
     if ras_enrichment:
-        fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_F_C)
+        fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score)
         return
 
     for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
-    applyRpsEnrichmentToMap(tmp, core_map, max_F_C)
+    applyRpsEnrichmentToMap(tmp, core_map, max_z_score)
 
-def computePValue(dataset1Data :List[float], dataset2Data :List[float]) -> float:
+def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
     """
     Computes the statistical significance score (P-value) of the comparison between coherent data
     from two datasets. The data is supposed to, in both datasets:
@@ -694,15 +696,32 @@
         dataset2Data : data from the 2nd dataset.
 
     Returns:
-        float: P-value from a Kolmogorov-Smirnov test on the provided data.
+        tuple: (P-value, Z-score)
+            - P-value from a Kolmogorov-Smirnov test on the provided data.
+            - Z-score of the difference between means of the two datasets.
     """
-    return st.ks_2samp(dataset1Data, dataset2Data)[1]
+    # Perform Kolmogorov-Smirnov test
+    ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
+    
+    # Calculate means and standard deviations
+    mean1 = np.mean(dataset1Data)
+    mean2 = np.mean(dataset2Data)
+    std1 = np.std(dataset1Data, ddof=1)
+    std2 = np.std(dataset2Data, ddof=1)
+    
+    n1 = len(dataset1Data)
+    n2 = len(dataset2Data)
+    
+    # Calculate Z-score
+    z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
+    
+    return p_value, z_score
 
 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
     #TODO: the following code still suffers from "dumbvarnames-osis"
     tmp :Dict[str, List[Union[float, FoldChange]]] = {}
     count   = 0
-    max_F_C = 0
+    max_z_score = 0
 
     for l1, l2 in zip(dataset1Data, dataset2Data):
         reactId = ids[count]
@@ -719,13 +738,13 @@
                 nets1 = np.subtract(l1, dataset1Data[position])
                 nets2 = np.subtract(l2, dataset2Data[position])
 
-                p_value = computePValue(nets1, nets2)
+                p_value, z_score = computePValue(nets1, nets2)
                 avg1 = sum(nets1)   / len(nets1)
                 avg2 = sum(nets2)   / len(nets2)
                 net = fold_change(avg1, avg2)
                 
                 if math.isnan(net): continue
-                tmp[reactId[:-1] + "RV"] = [p_value, net, avg1, avg2]
+                tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2]
                 
                 # vvv complementary directional ids are set to None once processed if net is to be applied to tips
                 if ARGS.net:
@@ -733,14 +752,14 @@
                     continue
 
             # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used
-            p_value = computePValue(l1, l2)
+            p_value, z_score = computePValue(l1, l2)
             avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2))
-            if not isinstance(avg, str) and max_F_C < abs(avg): max_F_C = abs(avg)
-            tmp[reactId] = [float(p_value), avg]
+            if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
+            tmp[reactId] = [float(p_value), avg, z_score]
         
         except (TypeError, ZeroDivisionError): continue
     
-    return tmp, max_F_C
+    return tmp, max_z_score
 
 def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None:
     """
@@ -771,8 +790,8 @@
         for i, j in it.combinations(class_pat.keys(), 2):
             #TODO: these 2 functions are always called in pair and in this order and need common data,
             # some clever refactoring would be appreciated.
-            comparisonDict, max_F_C = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
-            temp_thingsInCommon(comparisonDict, metabMap, max_F_C, i, j, fromRAS)
+            comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
+            temp_thingsInCommon(comparisonDict, metabMap, max_z_score, i, j, fromRAS)
     
     elif ARGS.comparison == "onevsrest":
         for single_cluster in class_pat.keys():
@@ -785,16 +804,16 @@
             for i in t:
                 rest = rest + i
             
-            comparisonDict, max_F_C = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
-            temp_thingsInCommon(comparisonDict, metabMap, max_F_C, single_cluster, fromRAS)
+            comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
+            temp_thingsInCommon(comparisonDict, metabMap, max_z_score, single_cluster, fromRAS)
     
     elif ARGS.comparison == "onevsmany":
         controlItems = class_pat.get(ARGS.control)
         for otherDataset in class_pat.keys():
             if otherDataset == ARGS.control: continue
             
-            comparisonDict, max_F_C = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
-            temp_thingsInCommon(comparisonDict, metabMap, max_F_C, ARGS.control, otherDataset, fromRAS)
+            comparisonDict, max_z_score = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
+            temp_thingsInCommon(comparisonDict, metabMap, max_z_score, ARGS.control, otherDataset, fromRAS)
 
 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
     svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG)