Mercurial > repos > bimib > cobraxy
changeset 291:7f3e66dd46fa draft default tip
Uploaded
author | francesco_lapi |
---|---|
date | Wed, 14 May 2025 11:26:59 +0000 |
parents | 71ec0b6ddde9 |
children | |
files | COBRAxy/marea.py |
diffstat | 1 files changed, 94 insertions(+), 99 deletions(-) [+] |
line wrap: on
line diff
--- a/COBRAxy/marea.py Wed May 14 10:43:04 2025 +0000 +++ b/COBRAxy/marea.py Wed May 14 11:26:59 2025 +0000 @@ -48,7 +48,7 @@ parser.add_argument( '-co', '--comparison', type = str, - default = '1vs1', + default = 'manyvsmany', choices = ['manyvsmany', 'onevsrest', 'onevsmany']) parser.add_argument( @@ -185,23 +185,6 @@ sys.exit('Execution aborted: wrong format of ' + name + '\n') return dataset -############################ dataset name ##################################### -def name_dataset(name_data :str, count :int) -> str: - """ - Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. - - Args: - name_data : name associated with the dataset (from frontend input params) - count : counter from 1 to make these names unique (external) - - Returns: - str : the name made unique - """ - if str(name_data) == 'Dataset': - return str(name_data) + '_' + str(count) - else: - return str(name_data) - ############################ map_methods ###################################### FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] def fold_change(avg1 :float, avg2 :float) -> FoldChange: @@ -220,13 +203,17 @@ """ if avg1 == 0 and avg2 == 0: return 0 - elif avg1 == 0: - return '-INF' - elif avg2 == 0: + + if avg1 == 0: + return '-INF' # TODO: maybe fix + + if avg2 == 0: return 'INF' - else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 - return (avg1 - avg2) / (abs(avg1) + abs(avg2)) + # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 + return (avg1 - avg2) / (abs(avg1) + abs(avg2)) + +# TODO: I would really like for this one to get the Thanos treatment def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: """ Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. @@ -262,6 +249,7 @@ tmp.append('stroke-dasharray:' + dash) return ';'.join(tmp) +# TODO: remove, there's applyRPS whatever # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: """ @@ -290,31 +278,33 @@ if el_id.startswith('R_'): tmp = d.get(el_id[2:]) if tmp != None: - p_val :float = tmp[0] - f_c = tmp[1] - z_score = tmp[2] + p_val, f_c, z_score, avg1, avg2 = tmp if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue - if p_val <= threshold_P_V: - if not isinstance(f_c, str): - if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # + if p_val <= threshold_P_V: # p-value is OK + if not isinstance(f_c, str): # FC is finite + if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK col = grey width = str(minT) - else: + else: # FC is OK if f_c < 0: col = blue elif f_c > 0: col = red - width = str(max((abs(z_score) * maxT) / max_z_score, minT)) - else: + width = str( + min( + max(abs(z_score * maxT) / max_z_score, minT), + maxT)) + + else: # FC is infinite if f_c == '-INF': col = blue elif f_c == 'INF': col = red width = str(maxT) dash = 'none' - else: + else: # p-value is not OK dash = '5,5' col = grey width = str(minT) @@ -342,10 +332,13 @@ def styleMapElement(element :ET.Element, styleStr :str) -> None: currentStyles :str = element.get("style", "") if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): - currentStyles = ';'.join(currentStyles.split(';')[:-3]) + currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? + + #TODO: this is attempting to solve the styling override problem, not sure it does tho element.set("style", currentStyles + styleStr) +# TODO: maybe remove vvv class ReactionDirection(Enum): Unknown = "" Direct = "_F" @@ -378,7 +371,9 @@ Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. """ if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV - elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], "" + elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: + return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_ + return f"F_{reactionId}", f"B_{reactionId}" class ArrowColor(Enum): @@ -386,15 +381,15 @@ Encodes possible arrow colors based on their meaning in the enrichment process. """ Invalid = "#BEBEBE" # gray, fold-change under treshold - UpRegulated = "#ecac68" # red, up-regulated reaction - DownRegulated = "#6495ed" # blue, down-regulated reaction + UpRegulated = "#ecac68" # orange, up-regulated reaction + DownRegulated = "#6495ed" # lightblue, down-regulated reaction UpRegulatedInv = "#FF0000" - # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with + # ^^^ bright red, up-regulated net value for a reversible reaction with # conflicting enrichment in the two directions. DownRegulatedInv = "#0000FF" - # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with + # ^^^ bright blue, down-regulated net value for a reversible reaction with # conflicting enrichment in the two directions. @classmethod @@ -441,6 +436,7 @@ self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) + # TODO: this seems to be unused, remove def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: """ Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. @@ -474,6 +470,7 @@ INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) +# TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: """ Applies RPS enrichment results to the provided metabolic map. @@ -501,13 +498,16 @@ INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) continue - if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): + if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): INVALID_ARROW.styleReactionElements(metabMap, reactionId) continue width = Arrow.MAX_W if not math.isinf(foldChange): - try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) + try: width = min( + max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), + Arrow.MAX_W) + except ZeroDivisionError: pass if not reactionId.endswith("_RV"): # RV stands for reversible reactions @@ -518,7 +518,6 @@ inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score if inversionScore == 2: foldChange *= -1 - # ^^^ Style the inverse direction with the opposite sign netValue # If the score is 1 (opposite signs) we use alternative colors vvv arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) @@ -531,13 +530,13 @@ arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) ############################ split class ###################################### -def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: +def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: """ Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. Args: classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict - resolve_rules : a :dict containing :float data + dataset_values : a :dict containing :float data Returns: dict : the dict with data grouped by class @@ -553,11 +552,11 @@ l :List[List[float]] = [] for j in range(i, len(classes)): if classes.iloc[j, 1] == classe: - pat_id :str = classes.iloc[j, 0] - tmp = resolve_rules.get(pat_id, None) - if tmp != None: - l.append(tmp) - classes.iloc[j, 1] = None + pat_id :str = classes.iloc[j, 0] # sample name + values = dataset_values.get(pat_id, None) # the column of values for that sample + if values != None: + l.append(values) + classes.iloc[j, 1] = None # TODO: problems? if l: class_pat[classe] = list(map(list, zip(*l))) @@ -600,24 +599,6 @@ composite_image = white_background.composite2(image, 'over') composite_image.write_to_file(png_path.show()) -#funzione unica, lascio fuori i file e li passo in input -#conversion from png to pdf -def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None: - """ - Internal utility to convert a PNG to PDF to aid from SVG conversion. - - Args: - png_file : path to PNG file - pdf_file : path to new PDF file - - Returns: - None - """ - image = Image.open(png_file.show()) - image = image.convert("RGB") - image.save(pdf_file.show(), "PDF", resolution=100.0) - -#function called to reduce redundancy in the code def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: """ Converts the SVG map at the provided path to PDF. @@ -632,7 +613,9 @@ """ svg_to_png_with_background(file_svg, file_png) try: - convert_png_to_pdf(file_png, file_pdf) + image = Image.open(file_png.show()) + image = image.convert("RGB") + image.save(file_pdf.show(), "PDF", resolution=100.0) print(f'PDF file {file_pdf.filePath} successfully generated.') except Exception as e: @@ -677,18 +660,14 @@ writer.writerow({ field : data for field, data in zip(fieldNames, row) }) OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible -def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None: - fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"] - - writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) - -def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: +def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: # this function compiles the things always in common between comparison modes after enrichment. # TODO: organize, name better. suffix = "RAS" if ras_enrichment else "RPS" - details = f"Tabular Result ({suffix})" - - writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV)) + writeToCsv( + [ [reactId] + values for reactId, values in tmp.items() ], + ["ids", "P_Value", "fold change", "average_1", "average_2"], + buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV)) if ras_enrichment: fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) @@ -733,7 +712,7 @@ def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: #TODO: the following code still suffers from "dumbvarnames-osis" - tmp :Dict[str, List[Union[float, FoldChange]]] = {} + comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} count = 0 max_z_score = 0 @@ -758,22 +737,23 @@ net = fold_change(avg1, avg2) if math.isnan(net): continue - tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] + comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] # vvv complementary directional ids are set to None once processed if net is to be applied to tips - if ARGS.net: + if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows ids[position] = None continue # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used p_value, z_score = computePValue(l1, l2) avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) - if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) - tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] + # vvv TODO: Check numpy version compatibility + if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) + comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] except (TypeError, ZeroDivisionError): continue - return tmp, max_z_score + return comparisonResult, max_z_score def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: """ @@ -825,9 +805,17 @@ if ARGS.generate_pdf: pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) - convert_to_pdf(svgFilePath, pngPath, pdfPath) + svg_to_png_with_background(svgFilePath, pngPath) + try: + image = Image.open(pngPath.show()) + image = image.convert("RGB") + image.save(pdfPath.show(), "PDF", resolution=100.0) + print(f'PDF file {pdfPath.filePath} successfully generated.') + + except Exception as e: + raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') - if not ARGS.generate_svg: + if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not os.remove(svgFilePath.show()) ClassPat = Dict[str, List[List[float]]] @@ -836,21 +824,25 @@ # for the sake of everyone's sanity. class_pat :ClassPat = {} if ARGS.option == 'datasets': - num = 1 #TODO: the dataset naming function could be a generator + num = 1 for path, name in zip(datasetsPaths, names): - name = name_dataset(name, num) - resolve_rules_float, ids = getDatasetValues(path, name) - if resolve_rules_float != None: - class_pat[name] = list(map(list, zip(*resolve_rules_float.values()))) - + name = str(name) + if name == 'Dataset': + name += '_' + str(num) + + values, ids = getDatasetValues(path, name) + if values != None: + class_pat[name] = list(map(list, zip(*values.values()))) + # TODO: ??? + num += 1 elif ARGS.option == "dataset_class": classes = read_dataset(classPath, "class") classes = classes.astype(str) - resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") - if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float) + values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") + if values != None: class_pat = split_class(classes, values) return ids, class_pat #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) @@ -887,6 +879,7 @@ global ARGS ARGS = process_args(args) + # Create output folder if not os.path.isdir(ARGS.output_path): os.makedirs(ARGS.output_path, exist_ok=True) @@ -894,6 +887,8 @@ ARGS.tool_dir, utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) + # TODO: in the future keep the indices WITH the data and fix the code below. + # Prepare enrichment results containers ras_results = [] rps_results = [] @@ -912,10 +907,10 @@ # Organize by comparison pairs comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} - for i, j, d, mz in ras_results: - comparisons[(i, j)] = {'ras': (d, mz), 'rps': None} - for i, j, d, mz in rps_results: - comparisons.setdefault((i, j), {}).update({'rps': (d, mz)}) + for i, j, comparison_data, max_z_score in ras_results: + comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} + for i, j, comparison_data, max_z_score in rps_results: + comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) # For each comparison, create a styled map with RAS bodies and RPS heads for (i, j), res in comparisons.items(): @@ -929,7 +924,7 @@ # Apply RPS styling to arrow heads if res.get('rps'): tmp_rps, max_z_rps = res['rps'] - # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed + # applyRpsEnrichmentToMap styles only heads unless only RPS are used temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) # Output both SVG and PDF/PNG as configured @@ -937,4 +932,4 @@ print('Execution succeeded') ############################################################################### if __name__ == "__main__": - main() + main() \ No newline at end of file