Mercurial > repos > bimib > cobraxy
changeset 290:71ec0b6ddde9 draft
Uploaded
| author | francesco_lapi | 
|---|---|
| date | Wed, 14 May 2025 10:43:04 +0000 | 
| parents | 3f5cfcf4d9eb | 
| children | 7f3e66dd46fa | 
| files | COBRAxy/marea.py | 
| diffstat | 1 files changed, 102 insertions(+), 97 deletions(-) [+] | 
line wrap: on
 line diff
--- a/COBRAxy/marea.py Wed May 14 10:08:09 2025 +0000 +++ b/COBRAxy/marea.py Wed May 14 10:43:04 2025 +0000 @@ -48,7 +48,7 @@ parser.add_argument( '-co', '--comparison', type = str, - default = 'manyvsmany', + default = '1vs1', choices = ['manyvsmany', 'onevsrest', 'onevsmany']) parser.add_argument( @@ -185,6 +185,23 @@ sys.exit('Execution aborted: wrong format of ' + name + '\n') return dataset +############################ dataset name ##################################### +def name_dataset(name_data :str, count :int) -> str: + """ + Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. + + Args: + name_data : name associated with the dataset (from frontend input params) + count : counter from 1 to make these names unique (external) + + Returns: + str : the name made unique + """ + if str(name_data) == 'Dataset': + return str(name_data) + '_' + str(count) + else: + return str(name_data) + ############################ map_methods ###################################### FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] def fold_change(avg1 :float, avg2 :float) -> FoldChange: @@ -203,19 +220,13 @@ """ if avg1 == 0 and avg2 == 0: return 0 - - if avg1 == 0: - return '-INF' # TODO: maybe fix - - if avg2 == 0: + elif avg1 == 0: + return '-INF' + elif avg2 == 0: return 'INF' + else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 + return (avg1 - avg2) / (abs(avg1) + abs(avg2)) - # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 - fc = (avg1 - avg2) / (abs(avg1) + abs(avg2)) - if avg1 < 0 and avg2 < 0: fc *= -1 - return fc - -# TODO: I would really like for this one to get the Thanos treatment def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: """ Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. @@ -251,7 +262,6 @@ tmp.append('stroke-dasharray:' + dash) return ';'.join(tmp) -# TODO: remove, there's applyRPS whatever # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: """ @@ -280,33 +290,31 @@ if el_id.startswith('R_'): tmp = d.get(el_id[2:]) if tmp != None: - p_val, f_c, z_score, avg1, avg2 = tmp + p_val :float = tmp[0] + f_c = tmp[1] + z_score = tmp[2] if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue - if p_val <= threshold_P_V: # p-value is OK - if not isinstance(f_c, str): # FC is finite - if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK + if p_val <= threshold_P_V: + if not isinstance(f_c, str): + if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # col = grey width = str(minT) - else: # FC is OK + else: if f_c < 0: col = blue elif f_c > 0: col = red - width = str( - min( - max(abs(z_score * maxT) / max_z_score, minT), - maxT)) - - else: # FC is infinite + width = str(max((abs(z_score) * maxT) / max_z_score, minT)) + else: if f_c == '-INF': col = blue elif f_c == 'INF': col = red width = str(maxT) dash = 'none' - else: # p-value is not OK + else: dash = '5,5' col = grey width = str(minT) @@ -326,20 +334,18 @@ utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. """ return utils.Result.Ok( - lambda: metabMap.xpath(f"//*[@id=\"{reactionId}\"]")[0]).mapErr( + f"//*[@id=\"{reactionId}\"]").map( + lambda xPath : metabMap.xpath(xPath)[0]).mapErr( lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. def styleMapElement(element :ET.Element, styleStr :str) -> None: currentStyles :str = element.get("style", "") if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): - currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? - - #TODO: this is attempting to solve the styling override problem, not sure it does tho + currentStyles = ';'.join(currentStyles.split(';')[:-3]) element.set("style", currentStyles + styleStr) -# TODO: maybe remove vvv class ReactionDirection(Enum): Unknown = "" Direct = "_F" @@ -372,9 +378,7 @@ Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. """ if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV - elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: - return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_ - + elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], "" return f"F_{reactionId}", f"B_{reactionId}" class ArrowColor(Enum): @@ -382,15 +386,15 @@ Encodes possible arrow colors based on their meaning in the enrichment process. """ Invalid = "#BEBEBE" # gray, fold-change under treshold - UpRegulated = "#ecac68" # orange, up-regulated reaction - DownRegulated = "#6495ed" # lightblue, down-regulated reaction + UpRegulated = "#ecac68" # red, up-regulated reaction + DownRegulated = "#6495ed" # blue, down-regulated reaction UpRegulatedInv = "#FF0000" - # ^^^ bright red, up-regulated net value for a reversible reaction with + # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with # conflicting enrichment in the two directions. DownRegulatedInv = "#0000FF" - # ^^^ bright blue, down-regulated net value for a reversible reaction with + # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with # conflicting enrichment in the two directions. @classmethod @@ -437,7 +441,6 @@ self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) - # TODO: this seems to be unused, remove def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: """ Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. @@ -471,7 +474,6 @@ INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) -# TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: """ Applies RPS enrichment results to the provided metabolic map. @@ -499,16 +501,13 @@ INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) continue - if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): + if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): INVALID_ARROW.styleReactionElements(metabMap, reactionId) continue width = Arrow.MAX_W if not math.isinf(foldChange): - try: width = min( - max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), - Arrow.MAX_W) - + try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) except ZeroDivisionError: pass if not reactionId.endswith("_RV"): # RV stands for reversible reactions @@ -518,6 +517,8 @@ reactionId = reactionId[:-3] # Remove "_RV" inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score + if inversionScore == 2: foldChange *= -1 + # ^^^ Style the inverse direction with the opposite sign netValue # If the score is 1 (opposite signs) we use alternative colors vvv arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) @@ -530,13 +531,13 @@ arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) ############################ split class ###################################### -def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: +def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: """ Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. Args: classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict - dataset_values : a :dict containing :float data + resolve_rules : a :dict containing :float data Returns: dict : the dict with data grouped by class @@ -552,11 +553,11 @@ l :List[List[float]] = [] for j in range(i, len(classes)): if classes.iloc[j, 1] == classe: - pat_id :str = classes.iloc[j, 0] # sample name - values = dataset_values.get(pat_id, None) # the column of values for that sample - if values != None: - l.append(values) - classes.iloc[j, 1] = None # TODO: problems? + pat_id :str = classes.iloc[j, 0] + tmp = resolve_rules.get(pat_id, None) + if tmp != None: + l.append(tmp) + classes.iloc[j, 1] = None if l: class_pat[classe] = list(map(list, zip(*l))) @@ -599,6 +600,24 @@ composite_image = white_background.composite2(image, 'over') composite_image.write_to_file(png_path.show()) +#funzione unica, lascio fuori i file e li passo in input +#conversion from png to pdf +def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None: + """ + Internal utility to convert a PNG to PDF to aid from SVG conversion. + + Args: + png_file : path to PNG file + pdf_file : path to new PDF file + + Returns: + None + """ + image = Image.open(png_file.show()) + image = image.convert("RGB") + image.save(pdf_file.show(), "PDF", resolution=100.0) + +#function called to reduce redundancy in the code def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: """ Converts the SVG map at the provided path to PDF. @@ -613,9 +632,7 @@ """ svg_to_png_with_background(file_svg, file_png) try: - image = Image.open(file_png.show()) - image = image.convert("RGB") - image.save(file_pdf.show(), "PDF", resolution=100.0) + convert_png_to_pdf(file_png, file_pdf) print(f'PDF file {file_pdf.filePath} successfully generated.') except Exception as e: @@ -660,14 +677,18 @@ writer.writerow({ field : data for field, data in zip(fieldNames, row) }) OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible -def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: +def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None: + fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"] + + writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) + +def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: # this function compiles the things always in common between comparison modes after enrichment. # TODO: organize, name better. suffix = "RAS" if ras_enrichment else "RPS" - writeToCsv( - [ [reactId] + values for reactId, values in tmp.items() ], - ["ids", "P_Value", "fold change", "average_1", "average_2"], - buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV)) + details = f"Tabular Result ({suffix})" + + writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV)) if ras_enrichment: fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) @@ -712,7 +733,7 @@ def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: #TODO: the following code still suffers from "dumbvarnames-osis" - comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} + tmp :Dict[str, List[Union[float, FoldChange]]] = {} count = 0 max_z_score = 0 @@ -737,23 +758,22 @@ net = fold_change(avg1, avg2) if math.isnan(net): continue - comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] + tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] # vvv complementary directional ids are set to None once processed if net is to be applied to tips - if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows + if ARGS.net: ids[position] = None continue # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used p_value, z_score = computePValue(l1, l2) avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) - # vvv TODO: Check numpy version compatibility - if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) - comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] + if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) + tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] except (TypeError, ZeroDivisionError): continue - return comparisonResult, max_z_score + return tmp, max_z_score def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: """ @@ -805,17 +825,9 @@ if ARGS.generate_pdf: pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) - svg_to_png_with_background(svgFilePath, pngPath) - try: - image = Image.open(pngPath.show()) - image = image.convert("RGB") - image.save(pdfPath.show(), "PDF", resolution=100.0) - print(f'PDF file {pdfPath.filePath} successfully generated.') - - except Exception as e: - raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') + convert_to_pdf(svgFilePath, pngPath, pdfPath) - if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not + if not ARGS.generate_svg: os.remove(svgFilePath.show()) ClassPat = Dict[str, List[List[float]]] @@ -824,25 +836,21 @@ # for the sake of everyone's sanity. class_pat :ClassPat = {} if ARGS.option == 'datasets': - num = 1 + num = 1 #TODO: the dataset naming function could be a generator for path, name in zip(datasetsPaths, names): - name = str(name) - if name == 'Dataset': - name += '_' + str(num) - - values, ids = getDatasetValues(path, name) - if values != None: - class_pat[name] = list(map(list, zip(*values.values()))) - # TODO: ??? - + name = name_dataset(name, num) + resolve_rules_float, ids = getDatasetValues(path, name) + if resolve_rules_float != None: + class_pat[name] = list(map(list, zip(*resolve_rules_float.values()))) + num += 1 elif ARGS.option == "dataset_class": classes = read_dataset(classPath, "class") classes = classes.astype(str) - values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") - if values != None: class_pat = split_class(classes, values) + resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") + if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float) return ids, class_pat #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) @@ -879,7 +887,6 @@ global ARGS ARGS = process_args(args) - # Create output folder if not os.path.isdir(ARGS.output_path): os.makedirs(ARGS.output_path, exist_ok=True) @@ -887,8 +894,6 @@ ARGS.tool_dir, utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) - # TODO: in the future keep the indices WITH the data and fix the code below. - # Prepare enrichment results containers ras_results = [] rps_results = [] @@ -907,10 +912,10 @@ # Organize by comparison pairs comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} - for i, j, comparison_data, max_z_score in ras_results: - comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} - for i, j, comparison_data, max_z_score in rps_results: - comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) + for i, j, d, mz in ras_results: + comparisons[(i, j)] = {'ras': (d, mz), 'rps': None} + for i, j, d, mz in rps_results: + comparisons.setdefault((i, j), {}).update({'rps': (d, mz)}) # For each comparison, create a styled map with RAS bodies and RPS heads for (i, j), res in comparisons.items(): @@ -924,7 +929,7 @@ # Apply RPS styling to arrow heads if res.get('rps'): tmp_rps, max_z_rps = res['rps'] - # applyRpsEnrichmentToMap styles only heads unless only RPS are used + # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) # Output both SVG and PDF/PNG as configured @@ -932,4 +937,4 @@ print('Execution succeeded') ############################################################################### if __name__ == "__main__": - main() \ No newline at end of file + main()
