Mercurial > repos > bimib > cobraxy
comparison COBRAxy/marea.py @ 286:eaa623b4b703 draft
Uploaded
author | francesco_lapi |
---|---|
date | Wed, 14 May 2025 08:44:03 +0000 |
parents | 216a796a967a |
children | c8622bca2480 |
comparison
equal
deleted
inserted
replaced
285:216a796a967a | 286:eaa623b4b703 |
---|---|
46 | 46 |
47 #Computation details: | 47 #Computation details: |
48 parser.add_argument( | 48 parser.add_argument( |
49 '-co', '--comparison', | 49 '-co', '--comparison', |
50 type = str, | 50 type = str, |
51 default = '1vs1', | 51 default = 'manyvsmany', |
52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) | 52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) |
53 | 53 |
54 parser.add_argument( | 54 parser.add_argument( |
55 '-pv' ,'--pValue', | 55 '-pv' ,'--pValue', |
56 type = float, | 56 type = float, |
183 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 183 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
184 if len(dataset.columns) < 2: | 184 if len(dataset.columns) < 2: |
185 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 185 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
186 return dataset | 186 return dataset |
187 | 187 |
188 ############################ dataset name ##################################### | |
189 def name_dataset(name_data :str, count :int) -> str: | |
190 """ | |
191 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | |
192 | |
193 Args: | |
194 name_data : name associated with the dataset (from frontend input params) | |
195 count : counter from 1 to make these names unique (external) | |
196 | |
197 Returns: | |
198 str : the name made unique | |
199 """ | |
200 if str(name_data) == 'Dataset': | |
201 return str(name_data) + '_' + str(count) | |
202 else: | |
203 return str(name_data) | |
204 | |
205 ############################ map_methods ###################################### | 188 ############################ map_methods ###################################### |
206 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] | 189 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] |
207 def fold_change(avg1 :float, avg2 :float) -> FoldChange: | 190 def fold_change(avg1 :float, avg2 :float) -> FoldChange: |
208 """ | 191 """ |
209 Calculates the fold change between two gene expression values. | 192 Calculates the fold change between two gene expression values. |
218 "INF" : when avg2 is 0 | 201 "INF" : when avg2 is 0 |
219 float : for any other combination of values | 202 float : for any other combination of values |
220 """ | 203 """ |
221 if avg1 == 0 and avg2 == 0: | 204 if avg1 == 0 and avg2 == 0: |
222 return 0 | 205 return 0 |
223 elif avg1 == 0: | 206 |
224 return '-INF' | 207 if avg1 == 0: |
225 elif avg2 == 0: | 208 return '-INF' # TODO: maybe fix |
209 | |
210 if avg2 == 0: | |
226 return 'INF' | 211 return 'INF' |
227 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 | 212 |
228 return (avg1 - avg2) / (abs(avg1) + abs(avg2)) | 213 # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 |
229 | 214 fc = (avg1 - avg2) / (abs(avg1) + abs(avg2)) |
215 if avg1 < 0 and avg2 < 0: fc *= -1 | |
216 return fc | |
217 | |
218 # TODO: I would really like for this one to get the Thanos treatment | |
230 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: | 219 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: |
231 """ | 220 """ |
232 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. | 221 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. |
233 | 222 |
234 Args: | 223 Args: |
260 tmp.append('stroke-width:' + width) | 249 tmp.append('stroke-width:' + width) |
261 if not flag_dash: | 250 if not flag_dash: |
262 tmp.append('stroke-dasharray:' + dash) | 251 tmp.append('stroke-dasharray:' + dash) |
263 return ';'.join(tmp) | 252 return ';'.join(tmp) |
264 | 253 |
254 # TODO: remove, there's applyRPS whatever | |
265 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! | 255 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! |
266 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: | 256 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: |
267 """ | 257 """ |
268 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. | 258 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. |
269 | 259 |
288 for el in core_map.iter(): | 278 for el in core_map.iter(): |
289 el_id = str(el.get('id')) | 279 el_id = str(el.get('id')) |
290 if el_id.startswith('R_'): | 280 if el_id.startswith('R_'): |
291 tmp = d.get(el_id[2:]) | 281 tmp = d.get(el_id[2:]) |
292 if tmp != None: | 282 if tmp != None: |
293 p_val :float = tmp[0] | 283 p_val, f_c, z_score, avg1, avg2 = tmp |
294 f_c = tmp[1] | |
295 z_score = tmp[2] | |
296 | 284 |
297 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue | 285 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue |
298 | 286 |
299 if p_val <= threshold_P_V: | 287 if p_val <= threshold_P_V: # p-value is OK |
300 if not isinstance(f_c, str): | 288 if not isinstance(f_c, str): # FC is finite |
301 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # | 289 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK |
302 col = grey | 290 col = grey |
303 width = str(minT) | 291 width = str(minT) |
304 else: | 292 else: # FC is OK |
305 if f_c < 0: | 293 if f_c < 0: |
306 col = blue | 294 col = blue |
307 elif f_c > 0: | 295 elif f_c > 0: |
308 col = red | 296 col = red |
309 width = str(max((abs(z_score) * maxT) / max_z_score, minT)) | 297 width = str( |
310 else: | 298 min( |
299 max(abs(z_score * maxT) / max_z_score, minT), | |
300 maxT)) | |
301 | |
302 else: # FC is infinite | |
311 if f_c == '-INF': | 303 if f_c == '-INF': |
312 col = blue | 304 col = blue |
313 elif f_c == 'INF': | 305 elif f_c == 'INF': |
314 col = red | 306 col = red |
315 width = str(maxT) | 307 width = str(maxT) |
316 dash = 'none' | 308 dash = 'none' |
317 else: | 309 else: # p-value is not OK |
318 dash = '5,5' | 310 dash = '5,5' |
319 col = grey | 311 col = grey |
320 width = str(minT) | 312 width = str(minT) |
321 el.set('style', fix_style(el.get('style', ""), col, width, dash)) | 313 el.set('style', fix_style(el.get('style', ""), col, width, dash)) |
322 return core_map | 314 return core_map |
332 | 324 |
333 Returns: | 325 Returns: |
334 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. | 326 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. |
335 """ | 327 """ |
336 return utils.Result.Ok( | 328 return utils.Result.Ok( |
337 f"//*[@id=\"{reactionId}\"]").map( | 329 lambda: metabMap.xpath(f"//*[@id=\"{reactionId}\"]")[0]).mapErr( |
338 lambda xPath : metabMap.xpath(xPath)[0]).mapErr( | |
339 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) | 330 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) |
340 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. | 331 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. |
341 | 332 |
342 def styleMapElement(element :ET.Element, styleStr :str) -> None: | 333 def styleMapElement(element :ET.Element, styleStr :str) -> None: |
343 currentStyles :str = element.get("style", "") | 334 currentStyles :str = element.get("style", "") |
344 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): | 335 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): |
345 currentStyles = ';'.join(currentStyles.split(';')[:-3]) | 336 currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? |
337 | |
338 #TODO: this is attempting to solve the styling override problem, not sure it does tho | |
346 | 339 |
347 element.set("style", currentStyles + styleStr) | 340 element.set("style", currentStyles + styleStr) |
348 | 341 |
342 # TODO: maybe remove vvv | |
349 class ReactionDirection(Enum): | 343 class ReactionDirection(Enum): |
350 Unknown = "" | 344 Unknown = "" |
351 Direct = "_F" | 345 Direct = "_F" |
352 Inverse = "_B" | 346 Inverse = "_B" |
353 | 347 |
376 | 370 |
377 Returns: | 371 Returns: |
378 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. | 372 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. |
379 """ | 373 """ |
380 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV | 374 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV |
381 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], "" | 375 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: |
376 return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_ | |
377 | |
382 return f"F_{reactionId}", f"B_{reactionId}" | 378 return f"F_{reactionId}", f"B_{reactionId}" |
383 | 379 |
384 class ArrowColor(Enum): | 380 class ArrowColor(Enum): |
385 """ | 381 """ |
386 Encodes possible arrow colors based on their meaning in the enrichment process. | 382 Encodes possible arrow colors based on their meaning in the enrichment process. |
387 """ | 383 """ |
388 Invalid = "#BEBEBE" # gray, fold-change under treshold | 384 Invalid = "#BEBEBE" # gray, fold-change under treshold |
389 UpRegulated = "#ecac68" # red, up-regulated reaction | 385 UpRegulated = "#ecac68" # orange, up-regulated reaction |
390 DownRegulated = "#6495ed" # blue, down-regulated reaction | 386 DownRegulated = "#6495ed" # lightblue, down-regulated reaction |
391 | 387 |
392 UpRegulatedInv = "#FF0000" | 388 UpRegulatedInv = "#FF0000" |
393 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with | 389 # ^^^ bright red, up-regulated net value for a reversible reaction with |
394 # conflicting enrichment in the two directions. | 390 # conflicting enrichment in the two directions. |
395 | 391 |
396 DownRegulatedInv = "#0000FF" | 392 DownRegulatedInv = "#0000FF" |
397 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with | 393 # ^^^ bright blue, down-regulated net value for a reversible reaction with |
398 # conflicting enrichment in the two directions. | 394 # conflicting enrichment in the two directions. |
399 | 395 |
400 @classmethod | 396 @classmethod |
401 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": | 397 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": |
402 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) | 398 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) |
439 # Now we style the arrow head(s): | 435 # Now we style the arrow head(s): |
440 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) | 436 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) |
441 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) | 437 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) |
442 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) | 438 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) |
443 | 439 |
440 # TODO: this seems to be unused, remove | |
444 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: | 441 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: |
445 """ | 442 """ |
446 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. | 443 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. |
447 | 444 |
448 Args: | 445 Args: |
472 # vvv These constants could be inside the class itself a static properties, but python | 469 # vvv These constants could be inside the class itself a static properties, but python |
473 # was built by brainless organisms so here we are! | 470 # was built by brainless organisms so here we are! |
474 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) | 471 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) |
475 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) | 472 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) |
476 | 473 |
474 # TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever | |
477 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: | 475 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: |
478 """ | 476 """ |
479 Applies RPS enrichment results to the provided metabolic map. | 477 Applies RPS enrichment results to the provided metabolic map. |
480 | 478 |
481 Args: | 479 Args: |
499 if isinstance(foldChange, str): foldChange = float(foldChange) | 497 if isinstance(foldChange, str): foldChange = float(foldChange) |
500 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow | 498 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow |
501 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) | 499 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) |
502 continue | 500 continue |
503 | 501 |
504 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): | 502 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): |
505 INVALID_ARROW.styleReactionElements(metabMap, reactionId) | 503 INVALID_ARROW.styleReactionElements(metabMap, reactionId) |
506 continue | 504 continue |
507 | 505 |
508 width = Arrow.MAX_W | 506 width = Arrow.MAX_W |
509 if not math.isinf(foldChange): | 507 if not math.isinf(foldChange): |
510 try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) | 508 try: width = min( |
509 max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), | |
510 Arrow.MAX_W) | |
511 | |
511 except ZeroDivisionError: pass | 512 except ZeroDivisionError: pass |
512 | 513 |
513 if not reactionId.endswith("_RV"): # RV stands for reversible reactions | 514 if not reactionId.endswith("_RV"): # RV stands for reversible reactions |
514 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) | 515 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) |
515 continue | 516 continue |
516 | 517 |
517 reactionId = reactionId[:-3] # Remove "_RV" | 518 reactionId = reactionId[:-3] # Remove "_RV" |
518 | 519 |
519 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score | 520 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score |
520 if inversionScore == 2: foldChange *= -1 | |
521 # ^^^ Style the inverse direction with the opposite sign netValue | |
522 | 521 |
523 # If the score is 1 (opposite signs) we use alternative colors vvv | 522 # If the score is 1 (opposite signs) we use alternative colors vvv |
524 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) | 523 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) |
525 | 524 |
526 # vvv These 2 if statements can both be true and can both happen | 525 # vvv These 2 if statements can both be true and can both happen |
529 | 528 |
530 if not ARGS.using_RAS: # style arrow body | 529 if not ARGS.using_RAS: # style arrow body |
531 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) | 530 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) |
532 | 531 |
533 ############################ split class ###################################### | 532 ############################ split class ###################################### |
534 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: | 533 def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: |
535 """ | 534 """ |
536 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. | 535 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. |
537 | 536 |
538 Args: | 537 Args: |
539 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict | 538 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict |
540 resolve_rules : a :dict containing :float data | 539 dataset_values : a :dict containing :float data |
541 | 540 |
542 Returns: | 541 Returns: |
543 dict : the dict with data grouped by class | 542 dict : the dict with data grouped by class |
544 | 543 |
545 Side effects: | 544 Side effects: |
551 if pd.isnull(classe): continue | 550 if pd.isnull(classe): continue |
552 | 551 |
553 l :List[List[float]] = [] | 552 l :List[List[float]] = [] |
554 for j in range(i, len(classes)): | 553 for j in range(i, len(classes)): |
555 if classes.iloc[j, 1] == classe: | 554 if classes.iloc[j, 1] == classe: |
556 pat_id :str = classes.iloc[j, 0] | 555 pat_id :str = classes.iloc[j, 0] # sample name |
557 tmp = resolve_rules.get(pat_id, None) | 556 values = dataset_values.get(pat_id, None) # the column of values for that sample |
558 if tmp != None: | 557 if values != None: |
559 l.append(tmp) | 558 l.append(values) |
560 classes.iloc[j, 1] = None | 559 classes.iloc[j, 1] = None # TODO: problems? |
561 | 560 |
562 if l: | 561 if l: |
563 class_pat[classe] = list(map(list, zip(*l))) | 562 class_pat[classe] = list(map(list, zip(*l))) |
564 continue | 563 continue |
565 | 564 |
598 white_background = white_background.extract_band(0) | 597 white_background = white_background.extract_band(0) |
599 | 598 |
600 composite_image = white_background.composite2(image, 'over') | 599 composite_image = white_background.composite2(image, 'over') |
601 composite_image.write_to_file(png_path.show()) | 600 composite_image.write_to_file(png_path.show()) |
602 | 601 |
603 #funzione unica, lascio fuori i file e li passo in input | |
604 #conversion from png to pdf | |
605 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None: | |
606 """ | |
607 Internal utility to convert a PNG to PDF to aid from SVG conversion. | |
608 | |
609 Args: | |
610 png_file : path to PNG file | |
611 pdf_file : path to new PDF file | |
612 | |
613 Returns: | |
614 None | |
615 """ | |
616 image = Image.open(png_file.show()) | |
617 image = image.convert("RGB") | |
618 image.save(pdf_file.show(), "PDF", resolution=100.0) | |
619 | |
620 #function called to reduce redundancy in the code | |
621 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: | 602 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: |
622 """ | 603 """ |
623 Converts the SVG map at the provided path to PDF. | 604 Converts the SVG map at the provided path to PDF. |
624 | 605 |
625 Args: | 606 Args: |
630 Returns: | 611 Returns: |
631 None | 612 None |
632 """ | 613 """ |
633 svg_to_png_with_background(file_svg, file_png) | 614 svg_to_png_with_background(file_svg, file_png) |
634 try: | 615 try: |
635 convert_png_to_pdf(file_png, file_pdf) | 616 image = Image.open(file_png.show()) |
617 image = image.convert("RGB") | |
618 image.save(file_pdf.show(), "PDF", resolution=100.0) | |
636 print(f'PDF file {file_pdf.filePath} successfully generated.') | 619 print(f'PDF file {file_pdf.filePath} successfully generated.') |
637 | 620 |
638 except Exception as e: | 621 except Exception as e: |
639 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') | 622 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') |
640 | 623 |
675 sizeMismatch = fieldsAmt - len(row) | 658 sizeMismatch = fieldsAmt - len(row) |
676 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) | 659 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) |
677 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) | 660 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) |
678 | 661 |
679 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible | 662 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible |
680 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None: | 663 def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: |
681 fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"] | |
682 | |
683 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) | |
684 | |
685 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: | |
686 # this function compiles the things always in common between comparison modes after enrichment. | 664 # this function compiles the things always in common between comparison modes after enrichment. |
687 # TODO: organize, name better. | 665 # TODO: organize, name better. |
688 suffix = "RAS" if ras_enrichment else "RPS" | 666 suffix = "RAS" if ras_enrichment else "RPS" |
689 details = f"Tabular Result ({suffix})" | 667 writeToCsv( |
690 | 668 [ [reactId] + values for reactId, values in tmp.items() ], |
691 writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV)) | 669 ["ids", "P_Value", "fold change", "average_1", "average_2"], |
670 buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV)) | |
692 | 671 |
693 if ras_enrichment: | 672 if ras_enrichment: |
694 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) | 673 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) |
695 return | 674 return |
696 | 675 |
731 | 710 |
732 return p_value, z_score | 711 return p_value, z_score |
733 | 712 |
734 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: | 713 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: |
735 #TODO: the following code still suffers from "dumbvarnames-osis" | 714 #TODO: the following code still suffers from "dumbvarnames-osis" |
736 tmp :Dict[str, List[Union[float, FoldChange]]] = {} | 715 comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} |
737 count = 0 | 716 count = 0 |
738 max_z_score = 0 | 717 max_z_score = 0 |
739 | 718 |
740 for l1, l2 in zip(dataset1Data, dataset2Data): | 719 for l1, l2 in zip(dataset1Data, dataset2Data): |
741 reactId = ids[count] | 720 reactId = ids[count] |
756 avg1 = sum(nets1) / len(nets1) | 735 avg1 = sum(nets1) / len(nets1) |
757 avg2 = sum(nets2) / len(nets2) | 736 avg2 = sum(nets2) / len(nets2) |
758 net = fold_change(avg1, avg2) | 737 net = fold_change(avg1, avg2) |
759 | 738 |
760 if math.isnan(net): continue | 739 if math.isnan(net): continue |
761 tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] | 740 comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] |
762 | 741 |
763 # vvv complementary directional ids are set to None once processed if net is to be applied to tips | 742 # vvv complementary directional ids are set to None once processed if net is to be applied to tips |
764 if ARGS.net: | 743 if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows |
765 ids[position] = None | 744 ids[position] = None |
766 continue | 745 continue |
767 | 746 |
768 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used | 747 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used |
769 p_value, z_score = computePValue(l1, l2) | 748 p_value, z_score = computePValue(l1, l2) |
770 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) | 749 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) |
771 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) | 750 # vvv TODO: Check numpy version compatibility |
772 tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] | 751 if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) |
752 comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] | |
773 | 753 |
774 except (TypeError, ZeroDivisionError): continue | 754 except (TypeError, ZeroDivisionError): continue |
775 | 755 |
776 return tmp, max_z_score | 756 return comparisonResult, max_z_score |
777 | 757 |
778 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: | 758 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: |
779 """ | 759 """ |
780 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the | 760 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the |
781 provided metabolic map. | 761 provided metabolic map. |
823 utils.writeSvg(svgFilePath, core_map) | 803 utils.writeSvg(svgFilePath, core_map) |
824 | 804 |
825 if ARGS.generate_pdf: | 805 if ARGS.generate_pdf: |
826 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) | 806 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) |
827 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) | 807 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) |
828 convert_to_pdf(svgFilePath, pngPath, pdfPath) | 808 svg_to_png_with_background(svgFilePath, pngPath) |
829 | 809 try: |
830 if not ARGS.generate_svg: | 810 image = Image.open(pngPath.show()) |
811 image = image.convert("RGB") | |
812 image.save(pdfPath.show(), "PDF", resolution=100.0) | |
813 print(f'PDF file {pdfPath.filePath} successfully generated.') | |
814 | |
815 except Exception as e: | |
816 raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') | |
817 | |
818 if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not | |
831 os.remove(svgFilePath.show()) | 819 os.remove(svgFilePath.show()) |
832 | 820 |
833 ClassPat = Dict[str, List[List[float]]] | 821 ClassPat = Dict[str, List[List[float]]] |
834 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: | 822 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: |
835 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, | 823 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, |
836 # for the sake of everyone's sanity. | 824 # for the sake of everyone's sanity. |
837 class_pat :ClassPat = {} | 825 class_pat :ClassPat = {} |
838 if ARGS.option == 'datasets': | 826 if ARGS.option == 'datasets': |
839 num = 1 #TODO: the dataset naming function could be a generator | 827 num = 1 |
840 for path, name in zip(datasetsPaths, names): | 828 for path, name in zip(datasetsPaths, names): |
841 name = name_dataset(name, num) | 829 name = str(name) |
842 resolve_rules_float, ids = getDatasetValues(path, name) | 830 if name == 'Dataset': |
843 if resolve_rules_float != None: | 831 name += '_' + str(num) |
844 class_pat[name] = list(map(list, zip(*resolve_rules_float.values()))) | 832 |
845 | 833 values, ids = getDatasetValues(path, name) |
834 if values != None: | |
835 class_pat[name] = list(map(list, zip(*values.values()))) | |
836 # TODO: ??? | |
837 | |
846 num += 1 | 838 num += 1 |
847 | 839 |
848 elif ARGS.option == "dataset_class": | 840 elif ARGS.option == "dataset_class": |
849 classes = read_dataset(classPath, "class") | 841 classes = read_dataset(classPath, "class") |
850 classes = classes.astype(str) | 842 classes = classes.astype(str) |
851 | 843 |
852 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") | 844 values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") |
853 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float) | 845 if values != None: class_pat = split_class(classes, values) |
854 | 846 |
855 return ids, class_pat | 847 return ids, class_pat |
856 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) | 848 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) |
857 | 849 |
858 #TODO: create these damn args as FilePath objects | 850 #TODO: create these damn args as FilePath objects |
885 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) | 877 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) |
886 """ | 878 """ |
887 global ARGS | 879 global ARGS |
888 ARGS = process_args(args) | 880 ARGS = process_args(args) |
889 | 881 |
882 # Create output folder | |
890 if not os.path.isdir(ARGS.output_path): | 883 if not os.path.isdir(ARGS.output_path): |
891 os.makedirs(ARGS.output_path) | 884 os.makedirs(ARGS.output_path, exist_ok=True) |
892 | 885 |
893 core_map: ET.ElementTree = ARGS.choice_map.getMap( | 886 core_map: ET.ElementTree = ARGS.choice_map.getMap( |
894 ARGS.tool_dir, | 887 ARGS.tool_dir, |
895 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) | 888 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) |
896 | 889 |
897 # Prepare enrichment results containers | 890 # TODO: in the future keep the indices WITH the data and fix the code below. |
891 | |
892 # Prepare enrichment results containers | |
898 ras_results = [] | 893 ras_results = [] |
899 rps_results = [] | 894 rps_results = [] |
900 | 895 |
901 # Compute RAS enrichment if requested | 896 # Compute RAS enrichment if requested |
902 if ARGS.using_RAS: | 897 if ARGS.using_RAS: |
910 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) | 905 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) |
911 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) | 906 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) |
912 | 907 |
913 # Organize by comparison pairs | 908 # Organize by comparison pairs |
914 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} | 909 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} |
915 for i, j, d, mz in ras_results: | 910 for i, j, comparison_data, max_z_score in ras_results: |
916 comparisons[(i, j)] = {'ras': (d, mz), 'rps': None} | 911 comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} |
917 for i, j, d, mz in rps_results: | 912 for i, j, comparison_data, max_z_score in rps_results: |
918 comparisons.setdefault((i, j), {}).update({'rps': (d, mz)}) | 913 comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) |
919 | 914 |
920 # For each comparison, create a styled map with RAS bodies and RPS heads | 915 # For each comparison, create a styled map with RAS bodies and RPS heads |
921 for (i, j), res in comparisons.items(): | 916 for (i, j), res in comparisons.items(): |
922 map_copy = copy.deepcopy(core_map) | 917 map_copy = copy.deepcopy(core_map) |
923 | 918 |
927 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) | 922 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) |
928 | 923 |
929 # Apply RPS styling to arrow heads | 924 # Apply RPS styling to arrow heads |
930 if res.get('rps'): | 925 if res.get('rps'): |
931 tmp_rps, max_z_rps = res['rps'] | 926 tmp_rps, max_z_rps = res['rps'] |
932 # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed | 927 # applyRpsEnrichmentToMap styles only heads unless only RPS are used |
933 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) | 928 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) |
934 | 929 |
935 # Output both SVG and PDF/PNG as configured | 930 # Output both SVG and PDF/PNG as configured |
936 createOutputMaps(i, j, map_copy) | 931 createOutputMaps(i, j, map_copy) |
937 print('Execution succeeded') | 932 print('Execution succeeded') |