Mercurial > repos > bimib > cobraxy
comparison COBRAxy/marea.py @ 290:71ec0b6ddde9 draft
Uploaded
| author | francesco_lapi |
|---|---|
| date | Wed, 14 May 2025 10:43:04 +0000 |
| parents | 3f5cfcf4d9eb |
| children | 7f3e66dd46fa |
comparison
equal
deleted
inserted
replaced
| 289:3f5cfcf4d9eb | 290:71ec0b6ddde9 |
|---|---|
| 46 | 46 |
| 47 #Computation details: | 47 #Computation details: |
| 48 parser.add_argument( | 48 parser.add_argument( |
| 49 '-co', '--comparison', | 49 '-co', '--comparison', |
| 50 type = str, | 50 type = str, |
| 51 default = 'manyvsmany', | 51 default = '1vs1', |
| 52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) | 52 choices = ['manyvsmany', 'onevsrest', 'onevsmany']) |
| 53 | 53 |
| 54 parser.add_argument( | 54 parser.add_argument( |
| 55 '-pv' ,'--pValue', | 55 '-pv' ,'--pValue', |
| 56 type = float, | 56 type = float, |
| 183 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 183 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
| 184 if len(dataset.columns) < 2: | 184 if len(dataset.columns) < 2: |
| 185 sys.exit('Execution aborted: wrong format of ' + name + '\n') | 185 sys.exit('Execution aborted: wrong format of ' + name + '\n') |
| 186 return dataset | 186 return dataset |
| 187 | 187 |
| 188 ############################ dataset name ##################################### | |
| 189 def name_dataset(name_data :str, count :int) -> str: | |
| 190 """ | |
| 191 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique. | |
| 192 | |
| 193 Args: | |
| 194 name_data : name associated with the dataset (from frontend input params) | |
| 195 count : counter from 1 to make these names unique (external) | |
| 196 | |
| 197 Returns: | |
| 198 str : the name made unique | |
| 199 """ | |
| 200 if str(name_data) == 'Dataset': | |
| 201 return str(name_data) + '_' + str(count) | |
| 202 else: | |
| 203 return str(name_data) | |
| 204 | |
| 188 ############################ map_methods ###################################### | 205 ############################ map_methods ###################################### |
| 189 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] | 206 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]] |
| 190 def fold_change(avg1 :float, avg2 :float) -> FoldChange: | 207 def fold_change(avg1 :float, avg2 :float) -> FoldChange: |
| 191 """ | 208 """ |
| 192 Calculates the fold change between two gene expression values. | 209 Calculates the fold change between two gene expression values. |
| 201 "INF" : when avg2 is 0 | 218 "INF" : when avg2 is 0 |
| 202 float : for any other combination of values | 219 float : for any other combination of values |
| 203 """ | 220 """ |
| 204 if avg1 == 0 and avg2 == 0: | 221 if avg1 == 0 and avg2 == 0: |
| 205 return 0 | 222 return 0 |
| 206 | 223 elif avg1 == 0: |
| 207 if avg1 == 0: | 224 return '-INF' |
| 208 return '-INF' # TODO: maybe fix | 225 elif avg2 == 0: |
| 209 | |
| 210 if avg2 == 0: | |
| 211 return 'INF' | 226 return 'INF' |
| 212 | 227 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 |
| 213 # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) con threshold_F_C > 1 | 228 return (avg1 - avg2) / (abs(avg1) + abs(avg2)) |
| 214 fc = (avg1 - avg2) / (abs(avg1) + abs(avg2)) | 229 |
| 215 if avg1 < 0 and avg2 < 0: fc *= -1 | |
| 216 return fc | |
| 217 | |
| 218 # TODO: I would really like for this one to get the Thanos treatment | |
| 219 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: | 230 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str: |
| 220 """ | 231 """ |
| 221 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. | 232 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params. |
| 222 | 233 |
| 223 Args: | 234 Args: |
| 249 tmp.append('stroke-width:' + width) | 260 tmp.append('stroke-width:' + width) |
| 250 if not flag_dash: | 261 if not flag_dash: |
| 251 tmp.append('stroke-dasharray:' + dash) | 262 tmp.append('stroke-dasharray:' + dash) |
| 252 return ';'.join(tmp) | 263 return ';'.join(tmp) |
| 253 | 264 |
| 254 # TODO: remove, there's applyRPS whatever | |
| 255 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! | 265 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix! |
| 256 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: | 266 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree: |
| 257 """ | 267 """ |
| 258 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. | 268 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs. |
| 259 | 269 |
| 278 for el in core_map.iter(): | 288 for el in core_map.iter(): |
| 279 el_id = str(el.get('id')) | 289 el_id = str(el.get('id')) |
| 280 if el_id.startswith('R_'): | 290 if el_id.startswith('R_'): |
| 281 tmp = d.get(el_id[2:]) | 291 tmp = d.get(el_id[2:]) |
| 282 if tmp != None: | 292 if tmp != None: |
| 283 p_val, f_c, z_score, avg1, avg2 = tmp | 293 p_val :float = tmp[0] |
| 294 f_c = tmp[1] | |
| 295 z_score = tmp[2] | |
| 284 | 296 |
| 285 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue | 297 if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue |
| 286 | 298 |
| 287 if p_val <= threshold_P_V: # p-value is OK | 299 if p_val <= threshold_P_V: |
| 288 if not isinstance(f_c, str): # FC is finite | 300 if not isinstance(f_c, str): |
| 289 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK | 301 if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # |
| 290 col = grey | 302 col = grey |
| 291 width = str(minT) | 303 width = str(minT) |
| 292 else: # FC is OK | 304 else: |
| 293 if f_c < 0: | 305 if f_c < 0: |
| 294 col = blue | 306 col = blue |
| 295 elif f_c > 0: | 307 elif f_c > 0: |
| 296 col = red | 308 col = red |
| 297 width = str( | 309 width = str(max((abs(z_score) * maxT) / max_z_score, minT)) |
| 298 min( | 310 else: |
| 299 max(abs(z_score * maxT) / max_z_score, minT), | |
| 300 maxT)) | |
| 301 | |
| 302 else: # FC is infinite | |
| 303 if f_c == '-INF': | 311 if f_c == '-INF': |
| 304 col = blue | 312 col = blue |
| 305 elif f_c == 'INF': | 313 elif f_c == 'INF': |
| 306 col = red | 314 col = red |
| 307 width = str(maxT) | 315 width = str(maxT) |
| 308 dash = 'none' | 316 dash = 'none' |
| 309 else: # p-value is not OK | 317 else: |
| 310 dash = '5,5' | 318 dash = '5,5' |
| 311 col = grey | 319 col = grey |
| 312 width = str(minT) | 320 width = str(minT) |
| 313 el.set('style', fix_style(el.get('style', ""), col, width, dash)) | 321 el.set('style', fix_style(el.get('style', ""), col, width, dash)) |
| 314 return core_map | 322 return core_map |
| 324 | 332 |
| 325 Returns: | 333 Returns: |
| 326 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. | 334 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr. |
| 327 """ | 335 """ |
| 328 return utils.Result.Ok( | 336 return utils.Result.Ok( |
| 329 lambda: metabMap.xpath(f"//*[@id=\"{reactionId}\"]")[0]).mapErr( | 337 f"//*[@id=\"{reactionId}\"]").map( |
| 338 lambda xPath : metabMap.xpath(xPath)[0]).mapErr( | |
| 330 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) | 339 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map")) |
| 331 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. | 340 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user. |
| 332 | 341 |
| 333 def styleMapElement(element :ET.Element, styleStr :str) -> None: | 342 def styleMapElement(element :ET.Element, styleStr :str) -> None: |
| 334 currentStyles :str = element.get("style", "") | 343 currentStyles :str = element.get("style", "") |
| 335 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): | 344 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles): |
| 336 currentStyles = ';'.join(currentStyles.split(';')[:-3]) # TODO: why the last 3? Are we sure? | 345 currentStyles = ';'.join(currentStyles.split(';')[:-3]) |
| 337 | |
| 338 #TODO: this is attempting to solve the styling override problem, not sure it does tho | |
| 339 | 346 |
| 340 element.set("style", currentStyles + styleStr) | 347 element.set("style", currentStyles + styleStr) |
| 341 | 348 |
| 342 # TODO: maybe remove vvv | |
| 343 class ReactionDirection(Enum): | 349 class ReactionDirection(Enum): |
| 344 Unknown = "" | 350 Unknown = "" |
| 345 Direct = "_F" | 351 Direct = "_F" |
| 346 Inverse = "_B" | 352 Inverse = "_B" |
| 347 | 353 |
| 370 | 376 |
| 371 Returns: | 377 Returns: |
| 372 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. | 378 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try. |
| 373 """ | 379 """ |
| 374 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV | 380 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV |
| 375 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: | 381 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], "" |
| 376 return reactionId[:-3:-1] + reactionId[:-2], "" # ^^^ Invert _F to F_ | |
| 377 | |
| 378 return f"F_{reactionId}", f"B_{reactionId}" | 382 return f"F_{reactionId}", f"B_{reactionId}" |
| 379 | 383 |
| 380 class ArrowColor(Enum): | 384 class ArrowColor(Enum): |
| 381 """ | 385 """ |
| 382 Encodes possible arrow colors based on their meaning in the enrichment process. | 386 Encodes possible arrow colors based on their meaning in the enrichment process. |
| 383 """ | 387 """ |
| 384 Invalid = "#BEBEBE" # gray, fold-change under treshold | 388 Invalid = "#BEBEBE" # gray, fold-change under treshold |
| 385 UpRegulated = "#ecac68" # orange, up-regulated reaction | 389 UpRegulated = "#ecac68" # red, up-regulated reaction |
| 386 DownRegulated = "#6495ed" # lightblue, down-regulated reaction | 390 DownRegulated = "#6495ed" # blue, down-regulated reaction |
| 387 | 391 |
| 388 UpRegulatedInv = "#FF0000" | 392 UpRegulatedInv = "#FF0000" |
| 389 # ^^^ bright red, up-regulated net value for a reversible reaction with | 393 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with |
| 390 # conflicting enrichment in the two directions. | 394 # conflicting enrichment in the two directions. |
| 391 | 395 |
| 392 DownRegulatedInv = "#0000FF" | 396 DownRegulatedInv = "#0000FF" |
| 393 # ^^^ bright blue, down-regulated net value for a reversible reaction with | 397 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with |
| 394 # conflicting enrichment in the two directions. | 398 # conflicting enrichment in the two directions. |
| 395 | 399 |
| 396 @classmethod | 400 @classmethod |
| 397 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": | 401 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor": |
| 398 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) | 402 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv) |
| 435 # Now we style the arrow head(s): | 439 # Now we style the arrow head(s): |
| 436 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) | 440 idOpt1, idOpt2 = getArrowHeadElementId(reactionId) |
| 437 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) | 441 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) |
| 438 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) | 442 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) |
| 439 | 443 |
| 440 # TODO: this seems to be unused, remove | |
| 441 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: | 444 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str: |
| 442 """ | 445 """ |
| 443 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. | 446 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset. |
| 444 | 447 |
| 445 Args: | 448 Args: |
| 469 # vvv These constants could be inside the class itself a static properties, but python | 472 # vvv These constants could be inside the class itself a static properties, but python |
| 470 # was built by brainless organisms so here we are! | 473 # was built by brainless organisms so here we are! |
| 471 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) | 474 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) |
| 472 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) | 475 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) |
| 473 | 476 |
| 474 # TODO: A more general version of this can be used for RAS as well, we don't need "fix map" or whatever | |
| 475 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: | 477 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None: |
| 476 """ | 478 """ |
| 477 Applies RPS enrichment results to the provided metabolic map. | 479 Applies RPS enrichment results to the provided metabolic map. |
| 478 | 480 |
| 479 Args: | 481 Args: |
| 497 if isinstance(foldChange, str): foldChange = float(foldChange) | 499 if isinstance(foldChange, str): foldChange = float(foldChange) |
| 498 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow | 500 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow |
| 499 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) | 501 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId) |
| 500 continue | 502 continue |
| 501 | 503 |
| 502 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): | 504 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1): |
| 503 INVALID_ARROW.styleReactionElements(metabMap, reactionId) | 505 INVALID_ARROW.styleReactionElements(metabMap, reactionId) |
| 504 continue | 506 continue |
| 505 | 507 |
| 506 width = Arrow.MAX_W | 508 width = Arrow.MAX_W |
| 507 if not math.isinf(foldChange): | 509 if not math.isinf(foldChange): |
| 508 try: width = min( | 510 try: width = max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W) |
| 509 max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W), | |
| 510 Arrow.MAX_W) | |
| 511 | |
| 512 except ZeroDivisionError: pass | 511 except ZeroDivisionError: pass |
| 513 | 512 |
| 514 if not reactionId.endswith("_RV"): # RV stands for reversible reactions | 513 if not reactionId.endswith("_RV"): # RV stands for reversible reactions |
| 515 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) | 514 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId) |
| 516 continue | 515 continue |
| 517 | 516 |
| 518 reactionId = reactionId[:-3] # Remove "_RV" | 517 reactionId = reactionId[:-3] # Remove "_RV" |
| 519 | 518 |
| 520 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score | 519 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of averages into 1 easy to check score |
| 520 if inversionScore == 2: foldChange *= -1 | |
| 521 # ^^^ Style the inverse direction with the opposite sign netValue | |
| 521 | 522 |
| 522 # If the score is 1 (opposite signs) we use alternative colors vvv | 523 # If the score is 1 (opposite signs) we use alternative colors vvv |
| 523 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) | 524 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1)) |
| 524 | 525 |
| 525 # vvv These 2 if statements can both be true and can both happen | 526 # vvv These 2 if statements can both be true and can both happen |
| 528 | 529 |
| 529 if not ARGS.using_RAS: # style arrow body | 530 if not ARGS.using_RAS: # style arrow body |
| 530 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) | 531 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False) |
| 531 | 532 |
| 532 ############################ split class ###################################### | 533 ############################ split class ###################################### |
| 533 def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: | 534 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]: |
| 534 """ | 535 """ |
| 535 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. | 536 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to. |
| 536 | 537 |
| 537 Args: | 538 Args: |
| 538 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict | 539 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict |
| 539 dataset_values : a :dict containing :float data | 540 resolve_rules : a :dict containing :float data |
| 540 | 541 |
| 541 Returns: | 542 Returns: |
| 542 dict : the dict with data grouped by class | 543 dict : the dict with data grouped by class |
| 543 | 544 |
| 544 Side effects: | 545 Side effects: |
| 550 if pd.isnull(classe): continue | 551 if pd.isnull(classe): continue |
| 551 | 552 |
| 552 l :List[List[float]] = [] | 553 l :List[List[float]] = [] |
| 553 for j in range(i, len(classes)): | 554 for j in range(i, len(classes)): |
| 554 if classes.iloc[j, 1] == classe: | 555 if classes.iloc[j, 1] == classe: |
| 555 pat_id :str = classes.iloc[j, 0] # sample name | 556 pat_id :str = classes.iloc[j, 0] |
| 556 values = dataset_values.get(pat_id, None) # the column of values for that sample | 557 tmp = resolve_rules.get(pat_id, None) |
| 557 if values != None: | 558 if tmp != None: |
| 558 l.append(values) | 559 l.append(tmp) |
| 559 classes.iloc[j, 1] = None # TODO: problems? | 560 classes.iloc[j, 1] = None |
| 560 | 561 |
| 561 if l: | 562 if l: |
| 562 class_pat[classe] = list(map(list, zip(*l))) | 563 class_pat[classe] = list(map(list, zip(*l))) |
| 563 continue | 564 continue |
| 564 | 565 |
| 597 white_background = white_background.extract_band(0) | 598 white_background = white_background.extract_band(0) |
| 598 | 599 |
| 599 composite_image = white_background.composite2(image, 'over') | 600 composite_image = white_background.composite2(image, 'over') |
| 600 composite_image.write_to_file(png_path.show()) | 601 composite_image.write_to_file(png_path.show()) |
| 601 | 602 |
| 603 #funzione unica, lascio fuori i file e li passo in input | |
| 604 #conversion from png to pdf | |
| 605 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None: | |
| 606 """ | |
| 607 Internal utility to convert a PNG to PDF to aid from SVG conversion. | |
| 608 | |
| 609 Args: | |
| 610 png_file : path to PNG file | |
| 611 pdf_file : path to new PDF file | |
| 612 | |
| 613 Returns: | |
| 614 None | |
| 615 """ | |
| 616 image = Image.open(png_file.show()) | |
| 617 image = image.convert("RGB") | |
| 618 image.save(pdf_file.show(), "PDF", resolution=100.0) | |
| 619 | |
| 620 #function called to reduce redundancy in the code | |
| 602 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: | 621 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None: |
| 603 """ | 622 """ |
| 604 Converts the SVG map at the provided path to PDF. | 623 Converts the SVG map at the provided path to PDF. |
| 605 | 624 |
| 606 Args: | 625 Args: |
| 611 Returns: | 630 Returns: |
| 612 None | 631 None |
| 613 """ | 632 """ |
| 614 svg_to_png_with_background(file_svg, file_png) | 633 svg_to_png_with_background(file_svg, file_png) |
| 615 try: | 634 try: |
| 616 image = Image.open(file_png.show()) | 635 convert_png_to_pdf(file_png, file_pdf) |
| 617 image = image.convert("RGB") | |
| 618 image.save(file_pdf.show(), "PDF", resolution=100.0) | |
| 619 print(f'PDF file {file_pdf.filePath} successfully generated.') | 636 print(f'PDF file {file_pdf.filePath} successfully generated.') |
| 620 | 637 |
| 621 except Exception as e: | 638 except Exception as e: |
| 622 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') | 639 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}') |
| 623 | 640 |
| 658 sizeMismatch = fieldsAmt - len(row) | 675 sizeMismatch = fieldsAmt - len(row) |
| 659 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) | 676 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch) |
| 660 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) | 677 writer.writerow({ field : data for field, data in zip(fieldNames, row) }) |
| 661 | 678 |
| 662 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible | 679 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible |
| 663 def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: | 680 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None: |
| 681 fieldNames = ["ids", "P_Value", "fold change", "average_1", "average_2"] | |
| 682 | |
| 683 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath) | |
| 684 | |
| 685 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None: | |
| 664 # this function compiles the things always in common between comparison modes after enrichment. | 686 # this function compiles the things always in common between comparison modes after enrichment. |
| 665 # TODO: organize, name better. | 687 # TODO: organize, name better. |
| 666 suffix = "RAS" if ras_enrichment else "RPS" | 688 suffix = "RAS" if ras_enrichment else "RPS" |
| 667 writeToCsv( | 689 details = f"Tabular Result ({suffix})" |
| 668 [ [reactId] + values for reactId, values in tmp.items() ], | 690 |
| 669 ["ids", "P_Value", "fold change", "average_1", "average_2"], | 691 writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = details, ext = utils.FileFormat.TSV)) |
| 670 buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV)) | |
| 671 | 692 |
| 672 if ras_enrichment: | 693 if ras_enrichment: |
| 673 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) | 694 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score) |
| 674 return | 695 return |
| 675 | 696 |
| 710 | 731 |
| 711 return p_value, z_score | 732 return p_value, z_score |
| 712 | 733 |
| 713 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: | 734 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]: |
| 714 #TODO: the following code still suffers from "dumbvarnames-osis" | 735 #TODO: the following code still suffers from "dumbvarnames-osis" |
| 715 comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {} | 736 tmp :Dict[str, List[Union[float, FoldChange]]] = {} |
| 716 count = 0 | 737 count = 0 |
| 717 max_z_score = 0 | 738 max_z_score = 0 |
| 718 | 739 |
| 719 for l1, l2 in zip(dataset1Data, dataset2Data): | 740 for l1, l2 in zip(dataset1Data, dataset2Data): |
| 720 reactId = ids[count] | 741 reactId = ids[count] |
| 735 avg1 = sum(nets1) / len(nets1) | 756 avg1 = sum(nets1) / len(nets1) |
| 736 avg2 = sum(nets2) / len(nets2) | 757 avg2 = sum(nets2) / len(nets2) |
| 737 net = fold_change(avg1, avg2) | 758 net = fold_change(avg1, avg2) |
| 738 | 759 |
| 739 if math.isnan(net): continue | 760 if math.isnan(net): continue |
| 740 comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] | 761 tmp[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2] |
| 741 | 762 |
| 742 # vvv complementary directional ids are set to None once processed if net is to be applied to tips | 763 # vvv complementary directional ids are set to None once processed if net is to be applied to tips |
| 743 if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows | 764 if ARGS.net: |
| 744 ids[position] = None | 765 ids[position] = None |
| 745 continue | 766 continue |
| 746 | 767 |
| 747 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used | 768 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used |
| 748 p_value, z_score = computePValue(l1, l2) | 769 p_value, z_score = computePValue(l1, l2) |
| 749 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) | 770 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2)) |
| 750 # vvv TODO: Check numpy version compatibility | 771 if not isinstance(z_score, str) and max_z_score < abs(z_score): max_z_score = abs(z_score) |
| 751 if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score) | 772 tmp[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] |
| 752 comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)] | |
| 753 | 773 |
| 754 except (TypeError, ZeroDivisionError): continue | 774 except (TypeError, ZeroDivisionError): continue |
| 755 | 775 |
| 756 return comparisonResult, max_z_score | 776 return tmp, max_z_score |
| 757 | 777 |
| 758 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: | 778 def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> List[Tuple[str, str, dict, float]]: |
| 759 """ | 779 """ |
| 760 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the | 780 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the |
| 761 provided metabolic map. | 781 provided metabolic map. |
| 803 utils.writeSvg(svgFilePath, core_map) | 823 utils.writeSvg(svgFilePath, core_map) |
| 804 | 824 |
| 805 if ARGS.generate_pdf: | 825 if ARGS.generate_pdf: |
| 806 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) | 826 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG) |
| 807 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) | 827 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF) |
| 808 svg_to_png_with_background(svgFilePath, pngPath) | 828 convert_to_pdf(svgFilePath, pngPath, pdfPath) |
| 809 try: | 829 |
| 810 image = Image.open(pngPath.show()) | 830 if not ARGS.generate_svg: |
| 811 image = image.convert("RGB") | |
| 812 image.save(pdfPath.show(), "PDF", resolution=100.0) | |
| 813 print(f'PDF file {pdfPath.filePath} successfully generated.') | |
| 814 | |
| 815 except Exception as e: | |
| 816 raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}') | |
| 817 | |
| 818 if not ARGS.generate_svg: # This argument is useless, who cares if the user wants the svg or not | |
| 819 os.remove(svgFilePath.show()) | 831 os.remove(svgFilePath.show()) |
| 820 | 832 |
| 821 ClassPat = Dict[str, List[List[float]]] | 833 ClassPat = Dict[str, List[List[float]]] |
| 822 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: | 834 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]: |
| 823 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, | 835 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate, |
| 824 # for the sake of everyone's sanity. | 836 # for the sake of everyone's sanity. |
| 825 class_pat :ClassPat = {} | 837 class_pat :ClassPat = {} |
| 826 if ARGS.option == 'datasets': | 838 if ARGS.option == 'datasets': |
| 827 num = 1 | 839 num = 1 #TODO: the dataset naming function could be a generator |
| 828 for path, name in zip(datasetsPaths, names): | 840 for path, name in zip(datasetsPaths, names): |
| 829 name = str(name) | 841 name = name_dataset(name, num) |
| 830 if name == 'Dataset': | 842 resolve_rules_float, ids = getDatasetValues(path, name) |
| 831 name += '_' + str(num) | 843 if resolve_rules_float != None: |
| 832 | 844 class_pat[name] = list(map(list, zip(*resolve_rules_float.values()))) |
| 833 values, ids = getDatasetValues(path, name) | 845 |
| 834 if values != None: | |
| 835 class_pat[name] = list(map(list, zip(*values.values()))) | |
| 836 # TODO: ??? | |
| 837 | |
| 838 num += 1 | 846 num += 1 |
| 839 | 847 |
| 840 elif ARGS.option == "dataset_class": | 848 elif ARGS.option == "dataset_class": |
| 841 classes = read_dataset(classPath, "class") | 849 classes = read_dataset(classPath, "class") |
| 842 classes = classes.astype(str) | 850 classes = classes.astype(str) |
| 843 | 851 |
| 844 values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") | 852 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)") |
| 845 if values != None: class_pat = split_class(classes, values) | 853 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float) |
| 846 | 854 |
| 847 return ids, class_pat | 855 return ids, class_pat |
| 848 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) | 856 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs) |
| 849 | 857 |
| 850 #TODO: create these damn args as FilePath objects | 858 #TODO: create these damn args as FilePath objects |
| 877 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) | 885 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError) |
| 878 """ | 886 """ |
| 879 global ARGS | 887 global ARGS |
| 880 ARGS = process_args(args) | 888 ARGS = process_args(args) |
| 881 | 889 |
| 882 # Create output folder | |
| 883 if not os.path.isdir(ARGS.output_path): | 890 if not os.path.isdir(ARGS.output_path): |
| 884 os.makedirs(ARGS.output_path, exist_ok=True) | 891 os.makedirs(ARGS.output_path, exist_ok=True) |
| 885 | 892 |
| 886 core_map: ET.ElementTree = ARGS.choice_map.getMap( | 893 core_map: ET.ElementTree = ARGS.choice_map.getMap( |
| 887 ARGS.tool_dir, | 894 ARGS.tool_dir, |
| 888 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) | 895 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None) |
| 889 | 896 |
| 890 # TODO: in the future keep the indices WITH the data and fix the code below. | |
| 891 | |
| 892 # Prepare enrichment results containers | 897 # Prepare enrichment results containers |
| 893 ras_results = [] | 898 ras_results = [] |
| 894 rps_results = [] | 899 rps_results = [] |
| 895 | 900 |
| 896 # Compute RAS enrichment if requested | 901 # Compute RAS enrichment if requested |
| 905 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) | 910 ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps) |
| 906 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) | 911 rps_results = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False) |
| 907 | 912 |
| 908 # Organize by comparison pairs | 913 # Organize by comparison pairs |
| 909 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} | 914 comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {} |
| 910 for i, j, comparison_data, max_z_score in ras_results: | 915 for i, j, d, mz in ras_results: |
| 911 comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None} | 916 comparisons[(i, j)] = {'ras': (d, mz), 'rps': None} |
| 912 for i, j, comparison_data, max_z_score in rps_results: | 917 for i, j, d, mz in rps_results: |
| 913 comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)}) | 918 comparisons.setdefault((i, j), {}).update({'rps': (d, mz)}) |
| 914 | 919 |
| 915 # For each comparison, create a styled map with RAS bodies and RPS heads | 920 # For each comparison, create a styled map with RAS bodies and RPS heads |
| 916 for (i, j), res in comparisons.items(): | 921 for (i, j), res in comparisons.items(): |
| 917 map_copy = copy.deepcopy(core_map) | 922 map_copy = copy.deepcopy(core_map) |
| 918 | 923 |
| 922 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) | 927 temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True) |
| 923 | 928 |
| 924 # Apply RPS styling to arrow heads | 929 # Apply RPS styling to arrow heads |
| 925 if res.get('rps'): | 930 if res.get('rps'): |
| 926 tmp_rps, max_z_rps = res['rps'] | 931 tmp_rps, max_z_rps = res['rps'] |
| 927 # applyRpsEnrichmentToMap styles only heads unless only RPS are used | 932 # Ensure applyRpsEnrichmentToMap styles only heads; adjust internals if needed |
| 928 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) | 933 temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False) |
| 929 | 934 |
| 930 # Output both SVG and PDF/PNG as configured | 935 # Output both SVG and PDF/PNG as configured |
| 931 createOutputMaps(i, j, map_copy) | 936 createOutputMaps(i, j, map_copy) |
| 932 print('Execution succeeded') | 937 print('Execution succeeded') |
