Mercurial > repos > bimib > cobraxy
diff COBRAxy/flux_to_map.py @ 293:7b8d9de81a86 draft
Uploaded
author | francesco_lapi |
---|---|
date | Thu, 15 May 2025 18:23:52 +0000 |
parents | e87aeb3a33cd |
children | 626b6d1de075 |
line wrap: on
line diff
--- a/COBRAxy/flux_to_map.py Thu May 15 18:21:58 2025 +0000 +++ b/COBRAxy/flux_to_map.py Thu May 15 18:23:52 2025 +0000 @@ -15,7 +15,7 @@ import copy import argparse import pyvips -from PIL import Image, ImageDraw, ImageFont +from PIL import Image from typing import Tuple, Union, Optional, List, Dict import matplotlib.pyplot as plt @@ -50,8 +50,15 @@ parser.add_argument( '-co', '--comparison', type = str, - default = '1vs1', + default = 'manyvsmany', choices = ['manyvsmany', 'onevsrest', 'onevsmany']) + + parser.add_argument( + '-te' ,'--test', + type = str, + default = 'ks', + choices = ['ks', 'ttest_p', 'ttest_ind', 'wilcoxon', 'mw'], + help = 'Statistical test to use (default: %(default)s)') parser.add_argument( '-pv' ,'--pValue', @@ -130,7 +137,7 @@ args.net = True # TODO SICCOME I FLUSSI POSSONO ESSERE ANCHE NEGATIVI SONO SEMPRE CONSIDERATI NETTI return args - + ############################ dataset input #################################### def read_dataset(data :str, name :str) -> pd.DataFrame: """ @@ -195,100 +202,6 @@ return 'INF' else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 return (avg1 - avg2) / (abs(avg1) + abs(avg2)) - -def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: - """ - Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. - - Args: - l : current style string of an SVG element - col : new value for the "stroke" style property - width : new value for the "stroke-width" style property - dash : new value for the "stroke-dasharray" style property - - Returns: - str : the fixed style string - """ - tmp = l.split(';') - flag_col = False - flag_width = False - flag_dash = False - for i in range(len(tmp)): - if tmp[i].startswith('stroke:'): - tmp[i] = 'stroke:' + col - flag_col = True - if tmp[i].startswith('stroke-width:'): - tmp[i] = 'stroke-width:' + width - flag_width = True - if tmp[i].startswith('stroke-dasharray:'): - tmp[i] = 'stroke-dasharray:' + dash - flag_dash = True - if not flag_col: - tmp.append('stroke:' + col) - if not flag_width: - tmp.append('stroke-width:' + width) - if not flag_dash: - tmp.append('stroke-dasharray:' + dash) - return ';'.join(tmp) - -# The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! -def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: - """ - Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. - - Args: - d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys) - core_map : SVG map to modify - threshold_P_V : threshold for a p-value to be considered significant - threshold_F_C : threshold for a fold change value to be considered significant - max_z_score : highest z-score (absolute value) - - Returns: - ET.ElementTree : the modified core_map - - Side effects: - core_map : mut - """ - maxT = 12 - minT = 2 - grey = '#BEBEBE' - blue = '#6495ed' # azzurrino - red = '#ecac68' # arancione - for el in core_map.iter(): - el_id = str(el.get('id')) - if el_id.startswith('R_'): - tmp = d.get(el_id[2:]) - if tmp != None: - p_val :float = tmp[0] - f_c = tmp[1] - z_score = tmp[2] - - if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue - - if p_val < threshold_P_V: - if not isinstance(f_c, str): - if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # - col = grey - width = str(minT) - else: - if f_c < 0: - col = blue - elif f_c > 0: - col = red - width = str(max((abs(z_score) * maxT) / max_z_score, minT)) - else: - if f_c == '-INF': - col = blue - elif f_c == 'INF': - col = red - width = str(maxT) - dash = 'none' - else: - dash = '5,5' - col = grey - width = str(minT) - el.set('style', fix_style(el.get('style', ""), col, width, dash)) - return core_map def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]: """ @@ -496,9 +409,11 @@ continue width = Arrow.MAX_W - if not math.isinf(foldChange): + if not math.isinf(z_score): try: - width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) + width = min( + max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), + Arrow.MAX_W) except ZeroDivisionError: pass # TODO CHECK RV @@ -697,11 +612,26 @@ Returns: tuple: (P-value, Z-score) - - P-value from a Kolmogorov-Smirnov test on the provided data. + - P-value from the selected test on the provided data. - Z-score of the difference between means of the two datasets. """ - # Perform Kolmogorov-Smirnov test - ks_statistic, p_value = st.ks_2samp(dataset1Data, dataset2Data) + + match ARGS.test: + case "ks": + # Perform Kolmogorov-Smirnov test + _, p_value = st.ks_2samp(dataset1Data, dataset2Data) + case "ttest_p": + # Perform t-test for paired samples + _, p_value = st.ttest_rel(dataset1Data, dataset2Data) + case "ttest_ind": + # Perform t-test for independent samples + _, p_value = st.ttest_ind(dataset1Data, dataset2Data) + case "wilcoxon": + # Perform Wilcoxon signed-rank test + _, p_value = st.wilcoxon(dataset1Data, dataset2Data) + case "mw": + # Perform Mann-Whitney U test + _, p_value = st.mannwhitneyu(dataset1Data, dataset2Data) # Calculate means and standard deviations mean1 = np.nanmean(dataset1Data) @@ -732,7 +662,7 @@ avg1 = sum(l1) / len(l1) avg2 = sum(l2) / len(l2) f_c = fold_change(avg1, avg2) - if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) + if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) tmp[reactId] = [float(p_value), f_c, z_score, avg1, avg2] except (TypeError, ZeroDivisionError): continue @@ -815,7 +745,7 @@ classes = read_dataset(classPath, "class") classes = classes.astype(str) resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") - #check if classes have mathc on ids + #check if classes have match on ids if not all(classes.iloc[:, 0].isin(ids)): utils.logWarning( "No match between classes and sample IDs", ARGS.out_log) @@ -888,7 +818,7 @@ for i, col in enumerate(dataset.columns)} }) - print(new_rows) + #print(new_rows) # Ritorna il dataset originale con le nuove righe dataset.reset_index(inplace=True) @@ -913,8 +843,6 @@ rgb = (np.array(rgb) * 255).astype(int) return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2]) - - def save_colormap_image(min_value: float, max_value: float, path: utils.FilePath, colorMap:str="viridis"): """ Create and save an image of the colormap showing the gradient and its range. @@ -1060,7 +988,6 @@ if not ARGS.generate_svg: os.remove(svgFilePath.show()) - ############################ MAIN ############################################# def main(args:List[str] = None) -> None: """