diff COBRAxy/flux_to_map.py @ 293:7b8d9de81a86 draft

Uploaded
author francesco_lapi
date Thu, 15 May 2025 18:23:52 +0000
parents e87aeb3a33cd
children 626b6d1de075
line wrap: on
line diff
--- a/COBRAxy/flux_to_map.py	Thu May 15 18:21:58 2025 +0000
+++ b/COBRAxy/flux_to_map.py	Thu May 15 18:23:52 2025 +0000
@@ -15,7 +15,7 @@
 import copy
 import argparse
 import pyvips
-from PIL import Image, ImageDraw, ImageFont
+from PIL import Image
 from typing import Tuple, Union, Optional, List, Dict
 import matplotlib.pyplot as plt
 
@@ -50,8 +50,15 @@
     parser.add_argument(
         '-co', '--comparison',
         type = str, 
-        default = '1vs1',
+        default = 'manyvsmany',
         choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
+
+    parser.add_argument(
+        '-te' ,'--test',
+        type = str, 
+        default = 'ks', 
+        choices = ['ks', 'ttest_p', 'ttest_ind', 'wilcoxon', 'mw'],
+        help = 'Statistical test to use (default: %(default)s)')
     
     parser.add_argument(
         '-pv' ,'--pValue',
@@ -130,7 +137,7 @@
     args.net = True # TODO SICCOME I FLUSSI POSSONO ESSERE ANCHE NEGATIVI SONO SEMPRE CONSIDERATI NETTI
 
     return args
-          
+
 ############################ dataset input ####################################
 def read_dataset(data :str, name :str) -> pd.DataFrame:
     """
@@ -195,100 +202,6 @@
         return 'INF'
     else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1
         return (avg1 - avg2) / (abs(avg1) + abs(avg2))
-    
-def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
-    """
-    Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
-
-    Args:
-        l : current style string of an SVG element
-        col : new value for the "stroke" style property
-        width : new value for the "stroke-width" style property
-        dash : new value for the "stroke-dasharray" style property
-
-    Returns:
-        str : the fixed style string
-    """
-    tmp = l.split(';')
-    flag_col = False
-    flag_width = False
-    flag_dash = False
-    for i in range(len(tmp)):
-        if tmp[i].startswith('stroke:'):
-            tmp[i] = 'stroke:' + col
-            flag_col = True
-        if tmp[i].startswith('stroke-width:'):
-            tmp[i] = 'stroke-width:' + width
-            flag_width = True
-        if tmp[i].startswith('stroke-dasharray:'):
-            tmp[i] = 'stroke-dasharray:' + dash
-            flag_dash = True
-    if not flag_col:
-        tmp.append('stroke:' + col)
-    if not flag_width:
-        tmp.append('stroke-width:' + width)
-    if not flag_dash:
-        tmp.append('stroke-dasharray:' + dash)
-    return ';'.join(tmp)
-
-# The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix!
-def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
-    """
-    Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
-
-    Args:
-        d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
-        core_map : SVG map to modify
-        threshold_P_V : threshold for a p-value to be considered significant
-        threshold_F_C : threshold for a fold change value to be considered significant
-        max_z_score : highest z-score (absolute value)
-    
-    Returns:
-        ET.ElementTree : the modified core_map
-
-    Side effects:
-        core_map : mut
-    """
-    maxT = 12
-    minT = 2
-    grey = '#BEBEBE'
-    blue = '#6495ed' # azzurrino
-    red = '#ecac68' # arancione
-    for el in core_map.iter():
-        el_id = str(el.get('id'))
-        if el_id.startswith('R_'):
-            tmp = d.get(el_id[2:])
-            if tmp != None:
-                p_val :float = tmp[0]
-                f_c = tmp[1]
-                z_score = tmp[2]
-
-                if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue
-                
-                if p_val < threshold_P_V:
-                    if not isinstance(f_c, str):
-                        if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # 
-                            col = grey
-                            width = str(minT)
-                        else:
-                            if f_c < 0:
-                                col = blue
-                            elif f_c > 0:
-                                col = red
-                            width = str(max((abs(z_score) * maxT) / max_z_score, minT))
-                    else:
-                        if f_c == '-INF':
-                            col = blue
-                        elif f_c == 'INF':
-                            col = red
-                        width = str(maxT)
-                    dash = 'none'
-                else:
-                    dash = '5,5'
-                    col = grey
-                    width = str(minT)
-                el.set('style', fix_style(el.get('style', ""), col, width, dash))
-    return core_map
 
 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
     """
@@ -496,9 +409,11 @@
             continue
         
         width = Arrow.MAX_W
-        if not math.isinf(foldChange):
+        if not math.isinf(z_score):
             try: 
-                width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) 
+                width = min(
+                    max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W),
+                    Arrow.MAX_W) 
 
             except ZeroDivisionError: pass
         # TODO CHECK RV
@@ -697,11 +612,26 @@
 
     Returns:
         tuple: (P-value, Z-score)
-            - P-value from a Kolmogorov-Smirnov test on the provided data.
+            - P-value from the selected test on the provided data.
             - Z-score of the difference between means of the two datasets.
     """
-    # Perform Kolmogorov-Smirnov test
-    ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data)
+
+    match ARGS.test:
+        case "ks":
+            # Perform Kolmogorov-Smirnov test
+            _, p_value = st.ks_2samp(dataset1Data, dataset2Data)
+        case "ttest_p":
+            # Perform t-test for paired samples
+            _, p_value = st.ttest_rel(dataset1Data, dataset2Data)
+        case "ttest_ind":
+            # Perform t-test for independent samples
+            _, p_value = st.ttest_ind(dataset1Data, dataset2Data)
+        case "wilcoxon":
+            # Perform Wilcoxon signed-rank test
+            _, p_value = st.wilcoxon(dataset1Data, dataset2Data)
+        case "mw":
+            # Perform Mann-Whitney U test
+            _, p_value = st.mannwhitneyu(dataset1Data, dataset2Data)
     
     # Calculate means and standard deviations
     mean1 = np.nanmean(dataset1Data)
@@ -732,7 +662,7 @@
             avg1 = sum(l1) / len(l1)
             avg2 = sum(l2) / len(l2)
             f_c = fold_change(avg1, avg2)
-            if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
+            if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score)
             
             tmp[reactId] = [float(p_value), f_c, z_score, avg1, avg2]
         except (TypeError, ZeroDivisionError): continue
@@ -815,7 +745,7 @@
         classes = read_dataset(classPath, "class")
         classes = classes.astype(str)
         resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
-        #check if classes have mathc on ids
+        #check if classes have match on ids
         if not all(classes.iloc[:, 0].isin(ids)):
             utils.logWarning(
             "No match between classes and sample IDs", ARGS.out_log)
@@ -888,7 +818,7 @@
            for i, col in enumerate(dataset.columns)}
     })
 
-    print(new_rows)
+    #print(new_rows)
 
     # Ritorna il dataset originale con le nuove righe
     dataset.reset_index(inplace=True)
@@ -913,8 +843,6 @@
     rgb = (np.array(rgb) * 255).astype(int)
     return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])
 
-
-
 def save_colormap_image(min_value: float, max_value: float, path: utils.FilePath, colorMap:str="viridis"):
     """
     Create and save an image of the colormap showing the gradient and its range.
@@ -1060,7 +988,6 @@
     if not ARGS.generate_svg:
         os.remove(svgFilePath.show())
 
-    
 ############################ MAIN #############################################
 def main(args:List[str] = None) -> None:
     """