changeset 291:7f3e66dd46fa draft default tip

Uploaded
author francesco_lapi
date Wed, 14 May 2025 11:26:59 +0000
parents 71ec0b6ddde9
children
files COBRAxy/marea.py
diffstat 1 files changed, 94 insertions(+), 99 deletions(-) [+]
line wrap: on
line diff
--- a/COBRAxy/marea.py	Wed May 14 10:43:04 2025 +0000
+++ b/COBRAxy/marea.py	Wed May 14 11:26:59 2025 +0000
@@ -48,7 +48,7 @@
     parser.add_argument(
         '-co', '--comparison',
         type = str, 
-        default = '1vs1',
+        default = 'manyvsmany',
         choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
     
     parser.add_argument(
@@ -185,23 +185,6 @@
         sys.exit('Execution aborted: wrong format of ' + name + '\n')
     return dataset
 
-############################ dataset name #####################################
-def name_dataset(name_data :str, count :int) -> str:
-    """
-    Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
-
-    Args:
-        name_data : name associated with the dataset (from frontend input params)
-        count : counter from 1 to make these names unique (external)
-
-    Returns:
-        str : the name made unique
-    """
-    if str(name_data) == 'Dataset':
-        return str(name_data) + '_' + str(count)
-    else:
-        return str(name_data)
-
 ############################ map_methods ######################################
 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
@@ -220,13 +203,17 @@
     """
     if avg1 == 0 and avg2 == 0:
         return 0
-    elif avg1 == 0:
-        return '-INF'
-    elif avg2 == 0:
+    
+    if avg1 == 0:
+        return '-INF' # TODO: maybe fix
+    
+    if avg2 == 0:
         return 'INF'
-    else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1
-        return (avg1 - avg2) / (abs(avg1) + abs(avg2))
     
+    # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1
+    return (avg1 - avg2) / (abs(avg1) + abs(avg2))
+
+# TODO: I would really like for this one to get the Thanos treatment
 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
     """
     Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
@@ -262,6 +249,7 @@
         tmp.append('stroke-dasharray:' + dash)
     return ';'.join(tmp)
 
+# TODO: remove, there's applyRPS whatever
 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix!
 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
     """
@@ -290,31 +278,33 @@
         if el_id.startswith('R_'):
             tmp = d.get(el_id[2:])
             if tmp != None:
-                p_val :float = tmp[0]
-                f_c = tmp[1]
-                z_score = tmp[2]
+                p_val, f_c, z_score, avg1, avg2 = tmp
                 
                 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue
 
-                if p_val <= threshold_P_V:
-                    if not isinstance(f_c, str):
-                        if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # 
+                if p_val <= threshold_P_V: # p-value is OK
+                    if not isinstance(f_c, str): # FC is finite
+                        if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK
                             col = grey
                             width = str(minT)
-                        else:
+                        else: # FC is OK
                             if f_c < 0:
                                 col = blue
                             elif f_c > 0:
                                 col = red
-                            width = str(max((abs(z_score) * maxT) / max_z_score, minT))
-                    else:
+                            width = str(
+                                min(
+                                    max(abs(z_score * maxT) / max_z_score, minT),
+                                    maxT))
+                    
+                    else: # FC is infinite
                         if f_c == '-INF':
                             col = blue
                         elif f_c == 'INF':
                             col = red
                         width = str(maxT)
                     dash = 'none'
-                else:
+                else: # p-value is not OK
                     dash = '5,5'
                     col = grey
                     width = str(minT)
@@ -342,10 +332,13 @@
 def styleMapElement(element :ET.Element, styleStr :str) -> None:
     currentStyles :str = element.get("style", "")
     if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
-        currentStyles = ';'.join(currentStyles.split(';')[:-3])
+        currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure?
+
+    #TODO: this is attempting to solve the styling override problem, not sure it does tho
 
     element.set("style", currentStyles + styleStr)
 
+# TODO: maybe remove vvv
 class ReactionDirection(Enum):
     Unknown = ""
     Direct  = "_F"
@@ -378,7 +371,9 @@
         Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
     """
     if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
-    elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
+    elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown:
+        return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_
+
     return f"F_{reactionId}", f"B_{reactionId}"
 
 class ArrowColor(Enum):
@@ -386,15 +381,15 @@
     Encodes possible arrow colors based on their meaning in the enrichment process.
     """
     Invalid       = "#BEBEBE" # gray, fold-change under treshold
-    UpRegulated   = "#ecac68" # red, up-regulated reaction
-    DownRegulated = "#6495ed" # blue, down-regulated reaction
+    UpRegulated   = "#ecac68" # orange, up-regulated reaction
+    DownRegulated = "#6495ed" # lightblue, down-regulated reaction
 
     UpRegulatedInv = "#FF0000"
-    # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with
+    # ^^^ bright red, up-regulated net value for a reversible reaction with
     # conflicting enrichment in the two directions.
 
     DownRegulatedInv = "#0000FF"
-    # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with
+    # ^^^ bright blue, down-regulated net value for a reversible reaction with
     # conflicting enrichment in the two directions.
 
     @classmethod
@@ -441,6 +436,7 @@
         self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
         if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
     
+    # TODO: this seems to be unused, remove
     def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
         """
         Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
@@ -474,6 +470,7 @@
 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
 
+# TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever
 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
     """
     Applies RPS enrichment results to the provided metabolic map.
@@ -501,13 +498,16 @@
             INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
             continue
 
-        if abs(foldChange) <  (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
+        if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
             INVALID_ARROW.styleReactionElements(metabMap, reactionId)
             continue
         
         width = Arrow.MAX_W
         if not math.isinf(foldChange):
-            try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W)
+            try: width = min(
+                max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W),
+                Arrow.MAX_W)
+            
             except ZeroDivisionError: pass
         
         if not reactionId.endswith("_RV"): # RV stands for reversible reactions
@@ -518,7 +518,6 @@
         
         inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score
         if inversionScore == 2: foldChange *= -1
-        # ^^^ Style the inverse direction with the opposite sign netValue
         
         # If the score is 1 (opposite signs) we use alternative colors vvv
         arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
@@ -531,13 +530,13 @@
             arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
 
 ############################ split class ######################################
-def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
+def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
     """
     Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
 
     Args:
         classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
-        resolve_rules : a :dict containing :float data
+        dataset_values : a :dict containing :float data
 
     Returns:
         dict : the dict with data grouped by class
@@ -553,11 +552,11 @@
         l :List[List[float]] = []
         for j in range(i, len(classes)):
             if classes.iloc[j, 1] == classe:
-                pat_id :str = classes.iloc[j, 0]
-                tmp = resolve_rules.get(pat_id, None)
-                if tmp != None:
-                    l.append(tmp)
-                classes.iloc[j, 1] = None
+                pat_id :str = classes.iloc[j, 0] # sample name
+                values = dataset_values.get(pat_id, None) # the column of values for that sample
+                if values != None:
+                    l.append(values)
+                classes.iloc[j, 1] = None # TODO: problems?
         
         if l:
             class_pat[classe] = list(map(list, zip(*l)))
@@ -600,24 +599,6 @@
     composite_image = white_background.composite2(image, 'over')
     composite_image.write_to_file(png_path.show())
 
-#funzione unica, lascio fuori i file e li passo in input
-#conversion from png to pdf
-def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
-    """
-    Internal utility to convert a PNG to PDF to aid from SVG conversion.
-
-    Args:
-        png_file : path to PNG file
-        pdf_file : path to new PDF file
-
-    Returns:
-        None
-    """
-    image = Image.open(png_file.show())
-    image = image.convert("RGB")
-    image.save(pdf_file.show(), "PDF", resolution=100.0)
-
-#function called to reduce redundancy in the code
 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
     """
     Converts the SVG map at the provided path to PDF.
@@ -632,7 +613,9 @@
     """
     svg_to_png_with_background(file_svg, file_png)
     try:
-        convert_png_to_pdf(file_png, file_pdf)
+        image = Image.open(file_png.show())
+        image = image.convert("RGB")
+        image.save(file_pdf.show(), "PDF", resolution=100.0)
         print(f'PDF file {file_pdf.filePath} successfully generated.')
     
     except Exception as e:
@@ -677,18 +660,14 @@
             writer.writerow({ field : data for field, data in zip(fieldNames, row) })
 
 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
-def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None:
-    fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"]
-
-    writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
-
-def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
+def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
     # this function compiles the things always in common between comparison modes after enrichment.
     # TODO: organize, name better.
     suffix = "RAS" if ras_enrichment else "RPS"
-    details = f"Tabular Result ({suffix})"
-
-    writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV))
+    writeToCsv(
+        [ [reactId] + values for reactId, values in tmp.items() ],
+        ["ids", "P_Value", "fold change", "average_1", "average_2"],
+        buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV))
     
     if ras_enrichment:
         fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score)
@@ -733,7 +712,7 @@
 
 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
     #TODO: the following code still suffers from "dumbvarnames-osis"
-    tmp :Dict[str, List[Union[float, FoldChange]]] = {}
+    comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {}
     count   = 0
     max_z_score = 0
 
@@ -758,22 +737,23 @@
                 net = fold_change(avg1, avg2)
                 
                 if math.isnan(net): continue
-                tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2]
+                comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2]
                 
                 # vvv complementary directional ids are set to None once processed if net is to be applied to tips
-                if ARGS.net:
+                if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows
                     ids[position] = None
                     continue
 
             # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used
             p_value, z_score = computePValue(l1, l2)
             avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2))
-            if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score)
-            tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)]
+            # vvv TODO: Check numpy version compatibility
+            if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score)
+            comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)]
         
         except (TypeError, ZeroDivisionError): continue
     
-    return tmp, max_z_score
+    return comparisonResult, max_z_score
 
 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]:
     """
@@ -825,9 +805,17 @@
     if ARGS.generate_pdf:
         pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
         pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
-        convert_to_pdf(svgFilePath, pngPath, pdfPath)
+        svg_to_png_with_background(svgFilePath, pngPath)
+        try:
+            image = Image.open(pngPath.show())
+            image = image.convert("RGB")
+            image.save(pdfPath.show(), "PDF", resolution=100.0)
+            print(f'PDF file {pdfPath.filePath} successfully generated.')
+        
+        except Exception as e:
+            raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}')
 
-    if not ARGS.generate_svg:
+    if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not
         os.remove(svgFilePath.show())
 
 ClassPat = Dict[str, List[List[float]]]
@@ -836,21 +824,25 @@
     # for the sake of everyone's sanity.
     class_pat :ClassPat = {}
     if ARGS.option == 'datasets':
-        num = 1 #TODO: the dataset naming function could be a generator
+        num = 1
         for path, name in zip(datasetsPaths, names):
-            name = name_dataset(name, num)
-            resolve_rules_float, ids = getDatasetValues(path, name)
-            if resolve_rules_float != None:
-                class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
-        
+            name = str(name)
+            if name == 'Dataset':
+                name += '_' + str(num)
+            
+            values, ids = getDatasetValues(path, name)
+            if values != None:
+                class_pat[name] = list(map(list, zip(*values.values())))
+                # TODO: ???
+            
             num += 1
     
     elif ARGS.option == "dataset_class":
         classes = read_dataset(classPath, "class")
         classes = classes.astype(str)
 
-        resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
-        if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float)
+        values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
+        if values != None: class_pat = split_class(classes, values)
     
     return ids, class_pat
     #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
@@ -887,6 +879,7 @@
     global ARGS
     ARGS = process_args(args)
 
+    # Create output folder
     if not os.path.isdir(ARGS.output_path):
         os.makedirs(ARGS.output_path, exist_ok=True)
     
@@ -894,6 +887,8 @@
         ARGS.tool_dir,
         utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
     
+    # TODO: in the future keep the indices WITH the data and fix the code below.
+
     # Prepare enrichment results containers
     ras_results = []
     rps_results = []
@@ -912,10 +907,10 @@
 
     # Organize by comparison pairs
     comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {}
-    for i, j, d, mz in ras_results:
-        comparisons[(i, j)] = {'ras': (d, mz), 'rps': None}
-    for i, j, d, mz in rps_results:
-        comparisons.setdefault((i, j), {}).update({'rps': (d, mz)})
+    for i, j, comparison_data, max_z_score in ras_results:
+        comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None}
+    for i, j, comparison_data, max_z_score in rps_results:
+        comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)})
 
     # For each comparison, create a styled map with RAS bodies and RPS heads
     for (i, j), res in comparisons.items():
@@ -929,7 +924,7 @@
         # Apply RPS styling to arrow heads
         if res.get('rps'):
             tmp_rps, max_z_rps = res['rps']
-            # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed
+            # applyRpsEnrichmentToMap styles only heads unless only RPS are used
             temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False)
 
         # Output both SVG and PDF/PNG as configured
@@ -937,4 +932,4 @@
     print('Execution succeeded')
 ###############################################################################
 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file