Mercurial > repos > bimib > cobraxy
comparison COBRAxy/marea.py @ 286:eaa623b4b703 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Wed, 14 May 2025 08:44:03 +0000 |
| parents | 216a796a967a |
| children | c8622bca2480 |
comparison
equal
deleted
inserted
replaced
| 285:216a796a967a | 286:eaa623b4b703 |
|---|---|
| 46 | 46 |
| 47 #Computation details: | 47 #Computation details: |
| 48 parser.add_argument( | 48 parser.add_argument( |
| 49 '-co', '--comparison', | 49 '-co', '--comparison', |
| 50 type = str, | 50 type = str, |
| 51 default = '1vs1', | 51 default = 'manyvsmany', |
| 52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) | 52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) |
| 53 | 53 |
| 54 parser.add_argument( | 54 parser.add_argument( |
| 55 '-pv' ,'--pValue', | 55 '-pv' ,'--pValue', |
| 56 type = float, | 56 type = float, |
| 183 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 183 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
| 184 if len(dataset.columns) < 2: | 184 if len(dataset.columns) < 2: |
| 185 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 185 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
| 186 return dataset | 186 return dataset |
| 187 | 187 |
| 188 ############################ dataset name ##################################### | |
| 189 def name_dataset(name_data :str, count :int) -> str: | |
| 190 """ | |
| 191 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | |
| 192 | |
| 193 Args: | |
| 194 name_data : name associated with the dataset (from frontend input params) | |
| 195 count : counter from 1 to make these names unique (external) | |
| 196 | |
| 197 Returns: | |
| 198 str : the name made unique | |
| 199 """ | |
| 200 if str(name_data) == 'Dataset': | |
| 201 return str(name_data) + '_' + str(count) | |
| 202 else: | |
| 203 return str(name_data) | |
| 204 | |
| 205 ############################ map_methods ###################################### | 188 ############################ map_methods ###################################### |
| 206 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] | 189 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] |
| 207 def fold_change(avg1 :float, avg2 :float) -> FoldChange: | 190 def fold_change(avg1 :float, avg2 :float) -> FoldChange: |
| 208 """ | 191 """ |
| 209 Calculates the fold change between two gene expression values. | 192 Calculates the fold change between two gene expression values. |
| 218 "INF" : when avg2 is 0 | 201 "INF" : when avg2 is 0 |
| 219 float : for any other combination of values | 202 float : for any other combination of values |
| 220 """ | 203 """ |
| 221 if avg1 == 0 and avg2 == 0: | 204 if avg1 == 0 and avg2 == 0: |
| 222 return 0 | 205 return 0 |
| 223 elif avg1 == 0: | 206 |
| 224 return '-INF' | 207 if avg1 == 0: |
| 225 elif avg2 == 0: | 208 return '-INF' # TODO: maybe fix |
| 209 | |
| 210 if avg2 == 0: | |
| 226 return 'INF' | 211 return 'INF' |
| 227 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 | 212 |
| 228 return (avg1 - avg2) / (abs(avg1) + abs(avg2)) | 213 # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 |
| 229 | 214 fc = (avg1 - avg2) / (abs(avg1) + abs(avg2)) |
| 215 if avg1 < 0 and avg2 < 0: fc *= -1 | |
| 216 return fc | |
| 217 | |
| 218 # TODO: I would really like for this one to get the Thanos treatment | |
| 230 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: | 219 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: |
| 231 """ | 220 """ |
| 232 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. | 221 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. |
| 233 | 222 |
| 234 Args: | 223 Args: |
| 260 tmp.append('stroke-width:' + width) | 249 tmp.append('stroke-width:' + width) |
| 261 if not flag_dash: | 250 if not flag_dash: |
| 262 tmp.append('stroke-dasharray:' + dash) | 251 tmp.append('stroke-dasharray:' + dash) |
| 263 return ';'.join(tmp) | 252 return ';'.join(tmp) |
| 264 | 253 |
| 254 # TODO: remove, there's applyRPS whatever | |
| 265 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! | 255 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! |
| 266 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: | 256 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: |
| 267 """ | 257 """ |
| 268 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. | 258 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. |
| 269 | 259 |
| 288 for el in core_map.iter(): | 278 for el in core_map.iter(): |
| 289 el_id = str(el.get('id')) | 279 el_id = str(el.get('id')) |
| 290 if el_id.startswith('R_'): | 280 if el_id.startswith('R_'): |
| 291 tmp = d.get(el_id[2:]) | 281 tmp = d.get(el_id[2:]) |
| 292 if tmp != None: | 282 if tmp != None: |
| 293 p_val :float = tmp[0] | 283 p_val, f_c, z_score, avg1, avg2 = tmp |
| 294 f_c = tmp[1] | |
| 295 z_score = tmp[2] | |
| 296 | 284 |
| 297 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue | 285 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue |
| 298 | 286 |
| 299 if p_val <= threshold_P_V: | 287 if p_val <= threshold_P_V: # p-value is OK |
| 300 if not isinstance(f_c, str): | 288 if not isinstance(f_c, str): # FC is finite |
| 301 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # | 289 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK |
| 302 col = grey | 290 col = grey |
| 303 width = str(minT) | 291 width = str(minT) |
| 304 else: | 292 else: # FC is OK |
| 305 if f_c < 0: | 293 if f_c < 0: |
| 306 col = blue | 294 col = blue |
| 307 elif f_c > 0: | 295 elif f_c > 0: |
| 308 col = red | 296 col = red |
| 309 width = str(max((abs(z_score) * maxT) / max_z_score, minT)) | 297 width = str( |
| 310 else: | 298 min( |
| 299 max(abs(z_score * maxT) / max_z_score, minT), | |
| 300 maxT)) | |
| 301 | |
| 302 else: # FC is infinite | |
| 311 if f_c == '-INF': | 303 if f_c == '-INF': |
| 312 col = blue | 304 col = blue |
| 313 elif f_c == 'INF': | 305 elif f_c == 'INF': |
| 314 col = red | 306 col = red |
| 315 width = str(maxT) | 307 width = str(maxT) |
| 316 dash = 'none' | 308 dash = 'none' |
| 317 else: | 309 else: # p-value is not OK |
| 318 dash = '5,5' | 310 dash = '5,5' |
| 319 col = grey | 311 col = grey |
| 320 width = str(minT) | 312 width = str(minT) |
| 321 el.set('style', fix_style(el.get('style', ""), col, width, dash)) | 313 el.set('style', fix_style(el.get('style', ""), col, width, dash)) |
| 322 return core_map | 314 return core_map |
| 332 | 324 |
| 333 Returns: | 325 Returns: |
| 334 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. | 326 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. |
| 335 """ | 327 """ |
| 336 return utils.Result.Ok( | 328 return utils.Result.Ok( |
| 337 f"//*[@id=\"{reactionId}\"]").map( | 329 lambda: metabMap.xpath(f"//*[@id=\"{reactionId}\"]")[0]).mapErr( |
| 338 lambda xPath : metabMap.xpath(xPath)[0]).mapErr( | |
| 339 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) | 330 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) |
| 340 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. | 331 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. |
| 341 | 332 |
| 342 def styleMapElement(element :ET.Element, styleStr :str) -> None: | 333 def styleMapElement(element :ET.Element, styleStr :str) -> None: |
| 343 currentStyles :str = element.get("style", "") | 334 currentStyles :str = element.get("style", "") |
| 344 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): | 335 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): |
| 345 currentStyles = ';'.join(currentStyles.split(';')[:-3]) | 336 currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? |
| 337 | |
| 338 #TODO: this is attempting to solve the styling override problem, not sure it does tho | |
| 346 | 339 |
| 347 element.set("style", currentStyles + styleStr) | 340 element.set("style", currentStyles + styleStr) |
| 348 | 341 |
| 342 # TODO: maybe remove vvv | |
| 349 class ReactionDirection(Enum): | 343 class ReactionDirection(Enum): |
| 350 Unknown = "" | 344 Unknown = "" |
| 351 Direct = "_F" | 345 Direct = "_F" |
| 352 Inverse = "_B" | 346 Inverse = "_B" |
| 353 | 347 |
| 376 | 370 |
| 377 Returns: | 371 Returns: |
| 378 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. | 372 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. |
| 379 """ | 373 """ |
| 380 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV | 374 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV |
| 381 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], "" | 375 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: |
| 376 return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_ | |
| 377 | |
| 382 return f"F_{reactionId}", f"B_{reactionId}" | 378 return f"F_{reactionId}", f"B_{reactionId}" |
| 383 | 379 |
| 384 class ArrowColor(Enum): | 380 class ArrowColor(Enum): |
| 385 """ | 381 """ |
| 386 Encodes possible arrow colors based on their meaning in the enrichment process. | 382 Encodes possible arrow colors based on their meaning in the enrichment process. |
| 387 """ | 383 """ |
| 388 Invalid = "#BEBEBE" # gray, fold-change under treshold | 384 Invalid = "#BEBEBE" # gray, fold-change under treshold |
| 389 UpRegulated = "#ecac68" # red, up-regulated reaction | 385 UpRegulated = "#ecac68" # orange, up-regulated reaction |
| 390 DownRegulated = "#6495ed" # blue, down-regulated reaction | 386 DownRegulated = "#6495ed" # lightblue, down-regulated reaction |
| 391 | 387 |
| 392 UpRegulatedInv = "#FF0000" | 388 UpRegulatedInv = "#FF0000" |
| 393 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with | 389 # ^^^ bright red, up-regulated net value for a reversible reaction with |
| 394 # conflicting enrichment in the two directions. | 390 # conflicting enrichment in the two directions. |
| 395 | 391 |
| 396 DownRegulatedInv = "#0000FF" | 392 DownRegulatedInv = "#0000FF" |
| 397 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with | 393 # ^^^ bright blue, down-regulated net value for a reversible reaction with |
| 398 # conflicting enrichment in the two directions. | 394 # conflicting enrichment in the two directions. |
| 399 | 395 |
| 400 @classmethod | 396 @classmethod |
| 401 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": | 397 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": |
| 402 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) | 398 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) |
| 439 # Now we style the arrow head(s): | 435 # Now we style the arrow head(s): |
| 440 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) | 436 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) |
| 441 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) | 437 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) |
| 442 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) | 438 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) |
| 443 | 439 |
| 440 # TODO: this seems to be unused, remove | |
| 444 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: | 441 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: |
| 445 """ | 442 """ |
| 446 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. | 443 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. |
| 447 | 444 |
| 448 Args: | 445 Args: |
| 472 # vvv These constants could be inside the class itself a static properties, but python | 469 # vvv These constants could be inside the class itself a static properties, but python |
| 473 # was built by brainless organisms so here we are! | 470 # was built by brainless organisms so here we are! |
| 474 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) | 471 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) |
| 475 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) | 472 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) |
| 476 | 473 |
| 474 # TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever | |
| 477 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: | 475 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: |
| 478 """ | 476 """ |
| 479 Applies RPS enrichment results to the provided metabolic map. | 477 Applies RPS enrichment results to the provided metabolic map. |
| 480 | 478 |
| 481 Args: | 479 Args: |
| 499 if isinstance(foldChange, str): foldChange = float(foldChange) | 497 if isinstance(foldChange, str): foldChange = float(foldChange) |
| 500 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow | 498 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow |
| 501 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) | 499 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) |
| 502 continue | 500 continue |
| 503 | 501 |
| 504 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): | 502 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): |
| 505 INVALID_ARROW.styleReactionElements(metabMap, reactionId) | 503 INVALID_ARROW.styleReactionElements(metabMap, reactionId) |
| 506 continue | 504 continue |
| 507 | 505 |
| 508 width = Arrow.MAX_W | 506 width = Arrow.MAX_W |
| 509 if not math.isinf(foldChange): | 507 if not math.isinf(foldChange): |
| 510 try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) | 508 try: width = min( |
| 509 max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), | |
| 510 Arrow.MAX_W) | |
| 511 | |
| 511 except ZeroDivisionError: pass | 512 except ZeroDivisionError: pass |
| 512 | 513 |
| 513 if not reactionId.endswith("_RV"): # RV stands for reversible reactions | 514 if not reactionId.endswith("_RV"): # RV stands for reversible reactions |
| 514 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) | 515 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) |
| 515 continue | 516 continue |
| 516 | 517 |
| 517 reactionId = reactionId[:-3] # Remove "_RV" | 518 reactionId = reactionId[:-3] # Remove "_RV" |
| 518 | 519 |
| 519 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score | 520 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score |
| 520 if inversionScore == 2: foldChange *= -1 | |
| 521 # ^^^ Style the inverse direction with the opposite sign netValue | |
| 522 | 521 |
| 523 # If the score is 1 (opposite signs) we use alternative colors vvv | 522 # If the score is 1 (opposite signs) we use alternative colors vvv |
| 524 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) | 523 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) |
| 525 | 524 |
| 526 # vvv These 2 if statements can both be true and can both happen | 525 # vvv These 2 if statements can both be true and can both happen |
| 529 | 528 |
| 530 if not ARGS.using_RAS: # style arrow body | 529 if not ARGS.using_RAS: # style arrow body |
| 531 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) | 530 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) |
| 532 | 531 |
| 533 ############################ split class ###################################### | 532 ############################ split class ###################################### |
| 534 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: | 533 def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: |
| 535 """ | 534 """ |
| 536 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. | 535 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. |
| 537 | 536 |
| 538 Args: | 537 Args: |
| 539 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict | 538 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict |
| 540 resolve_rules : a :dict containing :float data | 539 dataset_values : a :dict containing :float data |
| 541 | 540 |
| 542 Returns: | 541 Returns: |
| 543 dict : the dict with data grouped by class | 542 dict : the dict with data grouped by class |
| 544 | 543 |
| 545 Side effects: | 544 Side effects: |
| 551 if pd.isnull(classe): continue | 550 if pd.isnull(classe): continue |
| 552 | 551 |
| 553 l :List[List[float]] = [] | 552 l :List[List[float]] = [] |
| 554 for j in range(i, len(classes)): | 553 for j in range(i, len(classes)): |
| 555 if classes.iloc[j, 1] == classe: | 554 if classes.iloc[j, 1] == classe: |
| 556 pat_id :str = classes.iloc[j, 0] | 555 pat_id :str = classes.iloc[j, 0] # sample name |
| 557 tmp = resolve_rules.get(pat_id, None) | 556 values = dataset_values.get(pat_id, None) # the column of values for that sample |
| 558 if tmp != None: | 557 if values != None: |
| 559 l.append(tmp) | 558 l.append(values) |
| 560 classes.iloc[j, 1] = None | 559 classes.iloc[j, 1] = None # TODO: problems? |
| 561 | 560 |
| 562 if l: | 561 if l: |
| 563 class_pat[classe] = list(map(list, zip(*l))) | 562 class_pat[classe] = list(map(list, zip(*l))) |
| 564 continue | 563 continue |
| 565 | 564 |
| 598 white_background = white_background.extract_band(0) | 597 white_background = white_background.extract_band(0) |
| 599 | 598 |
| 600 composite_image = white_background.composite2(image, 'over') | 599 composite_image = white_background.composite2(image, 'over') |
| 601 composite_image.write_to_file(png_path.show()) | 600 composite_image.write_to_file(png_path.show()) |
| 602 | 601 |
| 603 #funzione unica, lascio fuori i file e li passo in input | |
| 604 #conversion from png to pdf | |
| 605 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None: | |
| 606 """ | |
| 607 Internal utility to convert a PNG to PDF to aid from SVG conversion. | |
| 608 | |
| 609 Args: | |
| 610 png_file : path to PNG file | |
| 611 pdf_file : path to new PDF file | |
| 612 | |
| 613 Returns: | |
| 614 None | |
| 615 """ | |
| 616 image = Image.open(png_file.show()) | |
| 617 image = image.convert("RGB") | |
| 618 image.save(pdf_file.show(), "PDF", resolution=100.0) | |
| 619 | |
| 620 #function called to reduce redundancy in the code | |
| 621 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: | 602 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: |
| 622 """ | 603 """ |
| 623 Converts the SVG map at the provided path to PDF. | 604 Converts the SVG map at the provided path to PDF. |
| 624 | 605 |
| 625 Args: | 606 Args: |
| 630 Returns: | 611 Returns: |
| 631 None | 612 None |
| 632 """ | 613 """ |
| 633 svg_to_png_with_background(file_svg, file_png) | 614 svg_to_png_with_background(file_svg, file_png) |
| 634 try: | 615 try: |
| 635 convert_png_to_pdf(file_png, file_pdf) | 616 image = Image.open(file_png.show()) |
| 617 image = image.convert("RGB") | |
| 618 image.save(file_pdf.show(), "PDF", resolution=100.0) | |
| 636 print(f'PDF file {file_pdf.filePath} successfully generated.') | 619 print(f'PDF file {file_pdf.filePath} successfully generated.') |
| 637 | 620 |
| 638 except Exception as e: | 621 except Exception as e: |
| 639 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') | 622 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') |
| 640 | 623 |
| 675 sizeMismatch = fieldsAmt - len(row) | 658 sizeMismatch = fieldsAmt - len(row) |
| 676 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) | 659 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) |
| 677 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) | 660 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) |
| 678 | 661 |
| 679 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible | 662 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible |
| 680 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None: | 663 def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: |
| 681 fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"] | |
| 682 | |
| 683 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) | |
| 684 | |
| 685 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: | |
| 686 # this function compiles the things always in common between comparison modes after enrichment. | 664 # this function compiles the things always in common between comparison modes after enrichment. |
| 687 # TODO: organize, name better. | 665 # TODO: organize, name better. |
| 688 suffix = "RAS" if ras_enrichment else "RPS" | 666 suffix = "RAS" if ras_enrichment else "RPS" |
| 689 details = f"Tabular Result ({suffix})" | 667 writeToCsv( |
| 690 | 668 [ [reactId] + values for reactId, values in tmp.items() ], |
| 691 writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV)) | 669 ["ids", "P_Value", "fold change", "average_1", "average_2"], |
| 670 buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV)) | |
| 692 | 671 |
| 693 if ras_enrichment: | 672 if ras_enrichment: |
| 694 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) | 673 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) |
| 695 return | 674 return |
| 696 | 675 |
| 731 | 710 |
| 732 return p_value, z_score | 711 return p_value, z_score |
| 733 | 712 |
| 734 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: | 713 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: |
| 735 #TODO: the following code still suffers from "dumbvarnames-osis" | 714 #TODO: the following code still suffers from "dumbvarnames-osis" |
| 736 tmp :Dict[str, List[Union[float, FoldChange]]] = {} | 715 comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} |
| 737 count = 0 | 716 count = 0 |
| 738 max_z_score = 0 | 717 max_z_score = 0 |
| 739 | 718 |
| 740 for l1, l2 in zip(dataset1Data, dataset2Data): | 719 for l1, l2 in zip(dataset1Data, dataset2Data): |
| 741 reactId = ids[count] | 720 reactId = ids[count] |
| 756 avg1 = sum(nets1) / len(nets1) | 735 avg1 = sum(nets1) / len(nets1) |
| 757 avg2 = sum(nets2) / len(nets2) | 736 avg2 = sum(nets2) / len(nets2) |
| 758 net = fold_change(avg1, avg2) | 737 net = fold_change(avg1, avg2) |
| 759 | 738 |
| 760 if math.isnan(net): continue | 739 if math.isnan(net): continue |
| 761 tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] | 740 comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] |
| 762 | 741 |
| 763 # vvv complementary directional ids are set to None once processed if net is to be applied to tips | 742 # vvv complementary directional ids are set to None once processed if net is to be applied to tips |
| 764 if ARGS.net: | 743 if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows |
| 765 ids[position] = None | 744 ids[position] = None |
| 766 continue | 745 continue |
| 767 | 746 |
| 768 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used | 747 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used |
| 769 p_value, z_score = computePValue(l1, l2) | 748 p_value, z_score = computePValue(l1, l2) |
| 770 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) | 749 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) |
| 771 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) | 750 # vvv TODO: Check numpy version compatibility |
| 772 tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] | 751 if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) |
| 752 comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] | |
| 773 | 753 |
| 774 except (TypeError, ZeroDivisionError): continue | 754 except (TypeError, ZeroDivisionError): continue |
| 775 | 755 |
| 776 return tmp, max_z_score | 756 return comparisonResult, max_z_score |
| 777 | 757 |
| 778 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: | 758 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: |
| 779 """ | 759 """ |
| 780 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the | 760 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the |
| 781 provided metabolic map. | 761 provided metabolic map. |
| 823 utils.writeSvg(svgFilePath, core_map) | 803 utils.writeSvg(svgFilePath, core_map) |
| 824 | 804 |
| 825 if ARGS.generate_pdf: | 805 if ARGS.generate_pdf: |
| 826 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) | 806 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) |
| 827 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) | 807 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) |
| 828 convert_to_pdf(svgFilePath, pngPath, pdfPath) | 808 svg_to_png_with_background(svgFilePath, pngPath) |
| 829 | 809 try: |
| 830 if not ARGS.generate_svg: | 810 image = Image.open(pngPath.show()) |
| 811 image = image.convert("RGB") | |
| 812 image.save(pdfPath.show(), "PDF", resolution=100.0) | |
| 813 print(f'PDF file {pdfPath.filePath} successfully generated.') | |
| 814 | |
| 815 except Exception as e: | |
| 816 raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') | |
| 817 | |
| 818 if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not | |
| 831 os.remove(svgFilePath.show()) | 819 os.remove(svgFilePath.show()) |
| 832 | 820 |
| 833 ClassPat = Dict[str, List[List[float]]] | 821 ClassPat = Dict[str, List[List[float]]] |
| 834 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: | 822 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: |
| 835 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, | 823 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, |
| 836 # for the sake of everyone's sanity. | 824 # for the sake of everyone's sanity. |
| 837 class_pat :ClassPat = {} | 825 class_pat :ClassPat = {} |
| 838 if ARGS.option == 'datasets': | 826 if ARGS.option == 'datasets': |
| 839 num = 1 #TODO: the dataset naming function could be a generator | 827 num = 1 |
| 840 for path, name in zip(datasetsPaths, names): | 828 for path, name in zip(datasetsPaths, names): |
| 841 name = name_dataset(name, num) | 829 name = str(name) |
| 842 resolve_rules_float, ids = getDatasetValues(path, name) | 830 if name == 'Dataset': |
| 843 if resolve_rules_float != None: | 831 name += '_' + str(num) |
| 844 class_pat[name] = list(map(list, zip(*resolve_rules_float.values()))) | 832 |
| 845 | 833 values, ids = getDatasetValues(path, name) |
| 834 if values != None: | |
| 835 class_pat[name] = list(map(list, zip(*values.values()))) | |
| 836 # TODO: ??? | |
| 837 | |
| 846 num += 1 | 838 num += 1 |
| 847 | 839 |
| 848 elif ARGS.option == "dataset_class": | 840 elif ARGS.option == "dataset_class": |
| 849 classes = read_dataset(classPath, "class") | 841 classes = read_dataset(classPath, "class") |
| 850 classes = classes.astype(str) | 842 classes = classes.astype(str) |
| 851 | 843 |
| 852 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") | 844 values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") |
| 853 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float) | 845 if values != None: class_pat = split_class(classes, values) |
| 854 | 846 |
| 855 return ids, class_pat | 847 return ids, class_pat |
| 856 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) | 848 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) |
| 857 | 849 |
| 858 #TODO: create these damn args as FilePath objects | 850 #TODO: create these damn args as FilePath objects |
| 885 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) | 877 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) |
| 886 """ | 878 """ |
| 887 global ARGS | 879 global ARGS |
| 888 ARGS = process_args(args) | 880 ARGS = process_args(args) |
| 889 | 881 |
| 882 # Create output folder | |
| 890 if not os.path.isdir(ARGS.output_path): | 883 if not os.path.isdir(ARGS.output_path): |
| 891 os.makedirs(ARGS.output_path) | 884 os.makedirs(ARGS.output_path, exist_ok=True) |
| 892 | 885 |
| 893 core_map: ET.ElementTree = ARGS.choice_map.getMap( | 886 core_map: ET.ElementTree = ARGS.choice_map.getMap( |
| 894 ARGS.tool_dir, | 887 ARGS.tool_dir, |
| 895 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) | 888 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) |
| 896 | 889 |
| 897 # Prepare enrichment results containers | 890 # TODO: in the future keep the indices WITH the data and fix the code below. |
| 891 | |
| 892 # Prepare enrichment results containers | |
| 898 ras_results = [] | 893 ras_results = [] |
| 899 rps_results = [] | 894 rps_results = [] |
| 900 | 895 |
| 901 # Compute RAS enrichment if requested | 896 # Compute RAS enrichment if requested |
| 902 if ARGS.using_RAS: | 897 if ARGS.using_RAS: |
| 910 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) | 905 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) |
| 911 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) | 906 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) |
| 912 | 907 |
| 913 # Organize by comparison pairs | 908 # Organize by comparison pairs |
| 914 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} | 909 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} |
| 915 for i, j, d, mz in ras_results: | 910 for i, j, comparison_data, max_z_score in ras_results: |
| 916 comparisons[(i, j)] = {'ras': (d, mz), 'rps': None} | 911 comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} |
| 917 for i, j, d, mz in rps_results: | 912 for i, j, comparison_data, max_z_score in rps_results: |
| 918 comparisons.setdefault((i, j), {}).update({'rps': (d, mz)}) | 913 comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) |
| 919 | 914 |
| 920 # For each comparison, create a styled map with RAS bodies and RPS heads | 915 # For each comparison, create a styled map with RAS bodies and RPS heads |
| 921 for (i, j), res in comparisons.items(): | 916 for (i, j), res in comparisons.items(): |
| 922 map_copy = copy.deepcopy(core_map) | 917 map_copy = copy.deepcopy(core_map) |
| 923 | 918 |
| 927 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) | 922 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) |
| 928 | 923 |
| 929 # Apply RPS styling to arrow heads | 924 # Apply RPS styling to arrow heads |
| 930 if res.get('rps'): | 925 if res.get('rps'): |
| 931 tmp_rps, max_z_rps = res['rps'] | 926 tmp_rps, max_z_rps = res['rps'] |
| 932 # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed | 927 # applyRpsEnrichmentToMap styles only heads unless only RPS are used |
| 933 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) | 928 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) |
| 934 | 929 |
| 935 # Output both SVG and PDF/PNG as configured | 930 # Output both SVG and PDF/PNG as configured |
| 936 createOutputMaps(i, j, map_copy) | 931 createOutputMaps(i, j, map_copy) |
| 937 print('Execution succeeded') | 932 print('Execution succeeded') |
