comparison COBRAxy/src/flux_to_map.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children fcdbc81feb45
1 from __future__ import division
2 import csv
3 from enum import Enum
4 import re
5 import sys
6 import numpy as np
7 import pandas as pd
8 import itertools as it
9 import scipy.stats as st
10 import lxml.etree as ET
11 import math
12 import utils.general_utils as utils
13 from PIL import Image
14 import os
15 import copy
16 import argparse
17 import pyvips
19 from typing import Tuple, Union, Optional, List, Dict
20 import matplotlib.pyplot as plt
21
22 ERRORS = []
23 ########################## argparse ##########################################
24 ARGS :argparse.Namespace
25 def process_args(args:List[str] = None) -> argparse.Namespace:
26 """
27 Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
28
29 Args:
30 args : Always obtained (in file) from sys.argv
31
32 Returns:
33 Namespace : An object containing the parsed arguments
34 """
35 parser = argparse.ArgumentParser(
36 usage = "%(prog)s [options]",
description = "compares fluxes between two or more datasets to create an enriched comparison map.")
38
39 #General:
40 parser.add_argument(
41 '-td', '--tool_dir',
42 type = str,
43 required = True,
44 help = 'your tool directory')
45
46 parser.add_argument('-on', '--control', type = str)
47 parser.add_argument('-ol', '--out_log', help = "Output log")
48
49 #Computation details:
50 parser.add_argument(
51 '-co', '--comparison',
52 type = str,
53 default = 'manyvsmany',
54 choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
55
56 parser.add_argument(
57 '-te' ,'--test',
58 type = str,
59 default = 'ks',
60 choices = ['ks', 'ttest_p', 'ttest_ind', 'wilcoxon', 'mw'],
61 help = 'Statistical test to use (default: %(default)s)')
62
63 parser.add_argument(
64 '-pv' ,'--pValue',
65 type = float,
66 default = 0.1,
67 help = 'P-Value threshold (default: %(default)s)')
68
69 parser.add_argument(
70 '-adj' ,'--adjusted',
71 type = utils.Bool("adjusted"), default = False,
72 help = 'Apply the FDR (Benjamini-Hochberg) correction (default: %(default)s)')
73
74 parser.add_argument(
75 '-fc', '--fChange',
76 type = float,
77 default = 1.5,
78 help = 'Fold-Change threshold (default: %(default)s)')
79
80 parser.add_argument(
81 '-op', '--option',
82 type = str,
83 choices = ['datasets', 'dataset_class'],
help='input mode: separate datasets, or one dataset plus a class file')
85
86 parser.add_argument(
87 '-idf', '--input_data_fluxes',
88 type = str,
89 help = 'input dataset fluxes')
90
91 parser.add_argument(
92 '-icf', '--input_class_fluxes',
93 type = str,
94 help = 'sample group specification fluxes')
95
96 parser.add_argument(
97 '-idsf', '--input_datas_fluxes',
98 type = str,
99 nargs = '+',
100 help = 'input datasets fluxes')
101
102 parser.add_argument(
103 '-naf', '--names_fluxes',
104 type = str,
105 nargs = '+',
106 help = 'input names fluxes')
107
108 #Output:
109 parser.add_argument(
110 "-gs", "--generate_svg",
111 type = utils.Bool("generate_svg"), default = True,
112 help = "choose whether to generate svg")
113
114 parser.add_argument(
115 "-gp", "--generate_pdf",
116 type = utils.Bool("generate_pdf"), default = True,
117 help = "choose whether to generate pdf")
118
119 parser.add_argument(
120 '-cm', '--custom_map',
121 type = str,
122 help='custom map to use')
123
124 parser.add_argument(
125 '-mc', '--choice_map',
126 type = utils.Model, default = utils.Model.HMRcore,
127 choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
128
129 parser.add_argument(
130 '-colorm', '--color_map',
131 type = str,
132 choices = ["jet", "viridis"])
133
134 parser.add_argument(
135 '-idop', '--output_path',
136 type = str,
137 default='result',
138 help = 'output path for maps')
139
140 args :argparse.Namespace = parser.parse_args(args)
141 args.net = True # TODO: since fluxes can also be negative, they are always treated as net values
142
143 return args
144
145 ############################ dataset input ####################################
146 def read_dataset(data :str, name :str) -> pd.DataFrame:
147 """
148 Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
149
150 Args:
151 data : filepath of a dataset (from frontend input params or literals upon calling)
152 name : name associated with the dataset (from frontend input params or literals upon calling)
153
154 Returns:
155 pd.DataFrame : dataset in a runtime operable shape
156
157 Raises:
158 sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has fewer than 2 columns
159 """
160 try:
161 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
162 except pd.errors.EmptyDataError:
163 sys.exit('Execution aborted: wrong format of ' + name + '\n')
164 if len(dataset.columns) < 2:
165 sys.exit('Execution aborted: wrong format of ' + name + '\n')
166 return dataset
167
168 ############################ dataset name #####################################
169 def name_dataset(name_data :str, count :int) -> str:
170 """
171 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset"; if the user didn't change it, this function appends f"_{count}" to make it unique.
172
173 Args:
174 name_data : name associated with the dataset (from frontend input params)
175 count : counter from 1 to make these names unique (external)
176
177 Returns:
178 str : the name made unique
179 """
180 if str(name_data) == 'Dataset':
181 return str(name_data) + '_' + str(count)
182 else:
183 return str(name_data)
184
185 ############################ map_methods ######################################
186 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
187 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
188 """
189 Calculates a normalized fold-change score between two average flux values.
190
191 Args:
192 avg1 : average flux value from one dataset; avg2 : average flux value from the other dataset
193
194 Returns:
195 FoldChange :
196 0 : when both input values are 0
197 "-INF" : when avg1 is 0
198 "INF" : when avg2 is 0
199 float : for any other combination of values
200 """
201 if avg1 == 0 and avg2 == 0:
202 return 0
203 elif avg1 == 0:
204 return '-INF'
205 elif avg2 == 0:
206 return 'INF'
207 else: # (threshold_F_C - 1) / (abs(threshold_F_C) + 1) with threshold_F_C > 1
208 return (avg1 - avg2) / (abs(avg1) + abs(avg2))
209
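# vvv A minimal, illustrative sketch (never called anywhere): despite the name,
# the score returned by fold_change in the final branch is a normalized
# difference bounded in [-1, 1], not a classic ratio. Input values are made up.
def _example_fold_change_scores() -> None:
    assert fold_change(2.0, 1.0) == (2.0 - 1.0) / (2.0 + 1.0) # ~0.333, up-regulated
    assert fold_change(-1.0, 1.0) == -1.0 # opposite signs saturate the score
    assert fold_change(0.0, 0.0) == 0 # both averages are zero
    assert fold_change(0.0, 3.0) == '-INF' # first average is zero
    assert fold_change(3.0, 0.0) == 'INF' # second average is zero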
210 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
211 """
212 Finds any element in the given map with the given ID. ID uniqueness in an SVG file is recommended but
213 not enforced; if more than one element with the exact ID is found, only the first will be returned.
214
215 Args:
216 reactionId (str): exact ID of the requested element.
217 metabMap (ET.ElementTree): metabolic map containing the element.
218
219 Returns:
220 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
221 """
222 return utils.Result.Ok(
223 f"//*[@id=\"{reactionId}\"]").map(
224 lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
225 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
226 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user.
227
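# vvv For readers unfamiliar with the project's Result type: the lookup above is
# equivalent to a plain XPath query with the IndexError swallowed. A rough,
# never-called standalone equivalent using only lxml:
def _find_by_id_plain(svgTree :ET.ElementTree, elementId :str) -> Optional[ET.Element]:
    matches = svgTree.xpath(f'//*[@id="{elementId}"]') # all elements with that exact ID
    return matches[0] if matches else None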
228 def styleMapElement(element :ET.Element, styleStr :str) -> None:
229 currentStyles :str = element.get("style", "")
230 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
231 currentStyles = ';'.join(currentStyles.split(';')[:-3])
232
233 element.set("style", currentStyles + styleStr)
234
235 class ReactionDirection(Enum):
236 Unknown = ""
237 Direct = "_F"
238 Inverse = "_B"
239
240 @classmethod
241 def fromDir(cls, s :str) -> "ReactionDirection":
242 # vvv as long as there's so few variants I actually condone the if spam:
243 if s == ReactionDirection.Direct.value: return ReactionDirection.Direct
244 if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
245 return ReactionDirection.Unknown
246
247 @classmethod
248 def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
249 return ReactionDirection.fromDir(reactionId[-2:])
250
251 def getArrowBodyElementId(reactionId :str) -> str:
252 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
253 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
254 return f"R_{reactionId}"
255
256 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
257 """
258 We attempt to extract the direction information from the provided reaction ID; if unsuccessful, we provide the IDs of both directions.
259
260 Args:
261 reactionId : the provided reaction ID.
262
263 Returns:
264 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
265 """
266 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
267 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
268 return f"F_{reactionId}", f"B_{reactionId}"
269
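# vvv The reversed slice above deserves a note: reactionId[:-3:-1] walks the string
# backwards, taking the last two characters in reverse order, which turns the
# trailing "_F"/"_B" suffix into the "F_"/"B_" prefix used by arrow-head IDs in the
# map. An illustrative, never-called sketch with a hypothetical reaction ID:
def _example_arrow_head_id() -> None:
    reactionId = "Pyr_F" # hypothetical ID with an explicit direction suffix
    assert reactionId[:-3:-1] == "F_" # "_F" reversed
    assert reactionId[:-3:-1] + reactionId[:-2] == "F_Pyr"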
270 class ArrowColor(Enum):
271 """
272 Encodes possible arrow colors based on their meaning in the enrichment process.
273 """
274 Invalid = "#BEBEBE" # gray, fold-change under threshold or non-significant p-value
275 Transparent = "#ffffff00" # transparent, to make some arrow segments disappear
276 UpRegulated = "#ecac68" # light orange, up-regulated reaction
277 DownRegulated = "#6495ed" # cornflower blue, down-regulated reaction
278
279 UpRegulatedInv = "#FF0000"
280 # ^^^ pure red, up-regulated net value for a reversible reaction with
281 # conflicting enrichment in the two directions.
282
283 DownRegulatedInv = "#0000FF"
284 # ^^^ pure blue, down-regulated net value for a reversible reaction with
285 # conflicting enrichment in the two directions.
286
287 @classmethod
288 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
289 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
290 return colors[useAltColor]
291
292 def __str__(self) -> str: return self.value
293
294 class Arrow:
295 """
296 Models the properties of a reaction arrow that change based on enrichment.
297 """
298 MIN_W = 2
299 MAX_W = 12
300
301 def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
302 """
303 (Private) Initializes an instance of Arrow.
304
305 Args:
306 width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
307 col : color of the arrow.
308 isDashed : whether the arrow should be dashed, meaning the associated pValue was not significant.
309
310 Returns:
311 None : practically, an Arrow instance.
312 """
313 self.w = width
314 self.col = col
315 self.dash = isDashed
316
317 def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
318 if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
319 ERRORS.append(reactionId)
320
321 def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
322 if not mindReactionDir:
323 return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
324
325 # Now we style the arrow head(s):
326 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
327 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
328 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
329
330 def styleReactionElementsMeanMedian(self, metabMap :ET.ElementTree, reactionId :str, isNegative:bool) -> None:
331
332 self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
333 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
334
335 if isNegative:
336 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
337 self.col = ArrowColor.Transparent
338 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True)) # transparent
339 else:
340 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
341 self.col = ArrowColor.Transparent
342 self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True)) # transparent
343
344
345
346 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
347 """
348 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
349
350 Args:
351 reactionId: the reaction ID, as encoded in the dataset.
352 mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
353
354 Returns:
355 str : the ID of an arrow's body or tips in the map.
356 """
357 # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
358 if not mindReactionDir: return "R_" + reactionId
359
360 #TODO: this is clearly something we need to make consistent in fluxes
361 return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[-2:] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
362
363 def toStyleStr(self, *, downSizedForTips = False) -> str:
364 """
365 Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
366
367 Returns:
368 str : the styles string.
369 """
370 width = self.w
371 if downSizedForTips: width *= 0.8
372 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
373
374 # vvv These constants could be inside the class itself as static properties, but python
375 # was built by brainless organisms so here we are!
376 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
377 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
378
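# vvv A quick, never-called look at the style strings these constants expand to;
# the expected values follow directly from toStyleStr above:
def _example_constant_styles() -> None:
    assert INVALID_ARROW.toStyleStr() == ";stroke:#BEBEBE;stroke-width:2;stroke-dasharray:none"
    assert INSIGNIFICANT_ARROW.toStyleStr() == ";stroke:#BEBEBE;stroke-width:2;stroke-dasharray:5,5"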
379 def applyFluxesEnrichmentToMap(fluxesEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
380 """
381 Applies fluxes enrichment results to the provided metabolic map.
382
383 Args:
384 fluxesEnrichmentRes : fluxes enrichment results.
385 metabMap : the metabolic map to edit.
386 maxNumericZScore : biggest finite z-score value found.
387
388 Side effects:
389 metabMap : mut
390
391 Returns:
392 None
393 """
394 for reactionId, values in fluxesEnrichmentRes.items():
395 pValue = values[0]
396 foldChange = values[1]
397 z_score = values[2]
398
399 if math.isnan(pValue) or (isinstance(foldChange, float) and math.isnan(foldChange)):
400 continue
401
402 if isinstance(foldChange, str): foldChange = float(foldChange)
403 if pValue > ARGS.pValue: # pValue above threshold: dashed arrow
404 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
405 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
406
407 continue
408
409 if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
410 INVALID_ARROW.styleReactionElements(metabMap, reactionId)
411 INVALID_ARROW.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
412
413 continue
414
415 width = Arrow.MAX_W
416 if not math.isinf(z_score):
417 try:
418 width = min(
419 max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W),
420 Arrow.MAX_W)
421
422 except ZeroDivisionError: pass
423 # TODO CHECK RV
424 #if not reactionId.endswith("_RV"): # RV stands for reversible reactions
425 # Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
426 # continue
427
428 #reactionId = reactionId[:-3] # Remove "_RV"
429
430 inversionScore = (values[3] < 0) + (values[4] < 0) # Compacts the signs of the two averages into one easy-to-check score
431 if inversionScore == 2: foldChange *= -1
432 # ^^^ Style the inverse direction with the opposite sign netValue
433
434 # If the score is 1 (opposite signs) we use alternative colors vvv
435 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
436
437 # vvv The head styling (net mode) and the body styling below are independent and can both apply
438 if ARGS.net: # style arrow head(s):
439 arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
440 arrow.applyTo(("F_" if inversionScore == 2 else "B_") + reactionId, metabMap, f";stroke:{ArrowColor.Transparent};stroke-width:0;stroke-dasharray:none")
441
442 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
443
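# vvv The fold-change comparison above maps the user-facing threshold onto the
# bounded score computed by fold_change: score_threshold = (t - 1) / (|t| + 1).
# A worked, never-called example for the default --fChange of 1.5:
def _example_fold_change_threshold() -> None:
    t = 1.5 # default value of ARGS.fChange
    assert abs((t - 1) / (abs(t) + 1) - 0.2) < 1e-12 # 0.5 / 2.5 = 0.2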
444
445 ############################ split class ######################################
446 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
447 """
448 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
449
450 Args:
451 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
452 resolve_rules : a :dict containing :float data
453
454 Returns:
455 dict : the dict with data grouped by class
456
457 Side effects:
458 classes : mut
459 """
460 class_pat :Dict[str, List[List[float]]] = {}
461 for i in range(len(classes)):
462 classe :str = classes.iloc[i, 1]
463 if pd.isnull(classe): continue
464
465 l :List[List[float]] = []
466 for j in range(i, len(classes)):
467 if classes.iloc[j, 1] == classe:
468 pat_id :str = classes.iloc[j, 0]
469 tmp = resolve_rules.get(pat_id, None)
470 if tmp is not None:
471 l.append(tmp)
472 classes.iloc[j, 1] = None
473
474 if l:
475 class_pat[classe] = list(map(list, zip(*l)))
476 continue
477
478 utils.logWarning(
479 f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
480
481 return class_pat
482
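# vvv The zip(*l) transposition above turns the per-sample rows collected for a
# class into per-reaction lists. A minimal, never-called illustration with
# made-up numbers (two samples, three reactions):
def _example_class_transposition() -> None:
    samples = [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]
    assert list(map(list, zip(*samples))) == [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]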
483 ############################ conversion ##############################################
484 #conversion from svg to png
485 def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
486 """
487 Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
488
489 Args:
490 svg_path : path to SVG file
491 png_path : path for new PNG file
492 dpi : dots per inch of the generated PNG
493 scale : scaling factor for the generated PNG, computed internally when a size is provided
494 size : final effective width of the generated PNG
495
496 Returns:
497 None
498 """
499 if size:
500 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
501 scale = size / image.width
502 image = image.resize(scale)
503 else:
504 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)
505
506 white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
507 white_background = white_background.affine([scale, 0, 0, scale])
508
509 if white_background.bands != image.bands:
510 white_background = white_background.extract_band(0)
511
512 composite_image = white_background.composite2(image, 'over')
513 composite_image.write_to_file(png_path.show())
514
515 # single conversion function; the files are left out and passed in as inputs
516 #conversion from png to pdf
517 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
518 """
519 Internal utility to convert a PNG to PDF, used as part of the SVG-to-PDF conversion.
520
521 Args:
522 png_file : path to PNG file
523 pdf_file : path to new PDF file
524
525 Returns:
526 None
527 """
528 image = Image.open(png_file.show())
529 image = image.convert("RGB")
530 image.save(pdf_file.show(), "PDF", resolution=100.0)
531
532 #function called to reduce redundancy in the code
533 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
534 """
535 Converts the SVG map at the provided path to PDF.
536
537 Args:
538 file_svg : path to SVG file
539 file_png : path to PNG file
540 file_pdf : path to new PDF file
541
542 Returns:
543 None
544 """
545 svg_to_png_with_background(file_svg, file_png)
546 try:
547 convert_png_to_pdf(file_png, file_pdf)
548 print(f'PDF file {file_pdf.filePath} successfully generated.')
549
550 except Exception as e:
551 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
552
553 ############################ map ##############################################
554 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
555 """
556 Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
557 "result/" folder, used by this tool for output files in collections.
558
559 Args:
560 dataset1Name : name of the first dataset in the comparison.
561 dataset2Name : name of the second dataset. Defaults to "rest".
562 details : extra detail to embed in the output file name, e.g. the kind of output.
563 ext : file extension for the output file.
564
565 Returns:
566 utils.FilePath : path pointing to the output file inside the output folder.
567 """
568 # This function returns a util data structure but is extremely specific to this module.
569 # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
570 # TODO: until a third tool with multiple outputs appears before porting this to utils.
571 return utils.FilePath(
572 f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
573 # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
574 # all output files: I don't care, this was never the performance bottleneck of the tool and
575 # there is no other net gain in saving and re-using the built string.
576 ext,
577 prefix = ARGS.output_path)
578
579 FIELD_NOT_AVAILABLE = '/'
580 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
581 fieldsAmt = len(fieldNames)
582 with open(outPath.show(), "w", newline = "") as fd:
583 writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
584 writer.writeheader()
585
586 for row in rows:
587 sizeMismatch = fieldsAmt - len(row)
588 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
589 writer.writerow({ field : data for field, data in zip(fieldNames, row) })
590
591 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
592 def writeTabularResult(enrichedScores : OldEnrichedScores, outPath :utils.FilePath) -> None:
593 fieldNames = ["ids", "P_Value", "fold change", "z-score"]
594 fieldNames.extend(["average_1", "average_2"])
595
596 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
597
598 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest") -> None:
599 # this function compiles the things always in common between comparison modes after enrichment.
600 # TODO: organize, name better.
601 writeTabularResult(tmp, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
602 for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
603 applyFluxesEnrichmentToMap(tmp, core_map, max_z_score)
604
605 def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
606 """
607 Computes the statistical significance score (P-value) of the comparison between coherent data
608 from two datasets. In both datasets, the data is expected to:
609 - be related to the same reaction ID;
610 - be ordered by sample, such that the item at position i in both lists is related to the
611 same sample or cell line.
612
613 Args:
614 dataset1Data : data from the 1st dataset.
615 dataset2Data : data from the 2nd dataset.
616
617 Returns:
618 tuple: (P-value, Z-score)
619 - P-value from the selected test on the provided data.
620 - Z-score of the difference between means of the two datasets.
621 """
622
623 match ARGS.test:
624 case "ks":
625 # Perform Kolmogorov-Smirnov test
626 _, p_value = st.ks_2samp(dataset1Data, dataset2Data)
627 case "ttest_p":
628 # Datasets should have same size
629 if len(dataset1Data) != len(dataset2Data):
630 raise ValueError("Datasets must have the same size for paired t-test.")
631 # Perform t-test for paired samples
632 _, p_value = st.ttest_rel(dataset1Data, dataset2Data)
633 case "ttest_ind":
634 # Perform t-test for independent samples
635 _, p_value = st.ttest_ind(dataset1Data, dataset2Data)
636 case "wilcoxon":
637 # Datasets should have same size
638 if len(dataset1Data) != len(dataset2Data):
639 raise ValueError("Datasets must have the same size for Wilcoxon signed-rank test.")
640 # Perform Wilcoxon signed-rank test
641 np.random.seed(42) # Ensure reproducibility since zsplit method is used
642 _, p_value = st.wilcoxon(dataset1Data, dataset2Data, zero_method="zsplit")
643 case "mw":
644 # Perform Mann-Whitney U test
645 _, p_value = st.mannwhitneyu(dataset1Data, dataset2Data)
646
647 # Calculate means and standard deviations
648 mean1 = np.nanmean(dataset1Data)
649 mean2 = np.nanmean(dataset2Data)
650 std1 = np.nanstd(dataset1Data, ddof=1)
651 std2 = np.nanstd(dataset2Data, ddof=1)
652
653 n1 = len(dataset1Data)
654 n2 = len(dataset2Data)
655
656 # Calculate Z-score
657 z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))
658
659 return p_value, z_score
660
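# vvv The z-score reported above is the standard two-sample z for a difference of
# means with a Welch-style denominator. A self-contained, never-called sketch on
# synthetic data (np and st are the module-level numpy/scipy imports):
def _example_p_and_z() -> None:
    rng = np.random.default_rng(0) # synthetic, illustrative samples
    a, b = rng.normal(0.0, 1.0, 50), rng.normal(0.5, 1.0, 50)
    z = (np.nanmean(a) - np.nanmean(b)) / np.sqrt(
        np.nanstd(a, ddof=1)**2 / len(a) + np.nanstd(b, ddof=1)**2 / len(b))
    _, p = st.ks_2samp(a, b) # the tool's default test
    print(f"p = {p:.4f}, z = {z:.3f}")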
661 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
662 #TODO: the following code still suffers from "dumbvarnames-osis"
663 comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {}
664 count = 0
665 max_z_score = 0
666 for l1, l2 in zip(dataset1Data, dataset2Data):
667 reactId = ids[count]
668 count += 1
669 if not reactId: continue # we skip ids that have already been processed
670
671 try:
672 p_value, z_score = computePValue(l1, l2)
673 avg1 = sum(l1) / len(l1)
674 avg2 = sum(l2) / len(l2)
675 f_c = fold_change(avg1, avg2)
676 if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score)
677
678 comparisonResult[reactId] = [float(p_value), f_c, z_score, avg1, avg2]
679 except (TypeError, ZeroDivisionError): continue
680
681 # Apply multiple testing correction if set by the user
682 if ARGS.adjusted:
683
684 # Retrieve the p-values from the comparisonResult dictionary, excluding NaN values
685 validPValues = [(reactId, result[0]) for reactId, result in comparisonResult.items() if not np.isnan(result[0])]
686
687 if not validPValues:
688 return comparisonResult, max_z_score
689
690 # Unpack the valid p-values
691 reactIds, pValues = zip(*validPValues)
692 # Adjust the p-values using the Benjamini-Hochberg method
693 adjustedPValues = st.false_discovery_control(pValues)
694 # Update the comparisonResult dictionary with the adjusted p-values
695 for reactId , adjustedPValue in zip(reactIds, adjustedPValues):
696 comparisonResult[reactId][0] = adjustedPValue
697
698 return comparisonResult, max_z_score
699
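# vvv st.false_discovery_control (available in SciPy >= 1.11) implements the
# Benjamini-Hochberg procedure used above; adjusted p-values are never smaller
# than the raw ones. A minimal, never-called demonstration on made-up p-values:
def _example_bh_correction() -> None:
    pValues = np.array([0.001, 0.008, 0.039, 0.041, 0.042, 0.060, 0.074, 0.205])
    adjusted = st.false_discovery_control(pValues) # method="bh" is the default
    assert np.all(adjusted >= pValues)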
700 def computeEnrichment(class_pat :Dict[str, List[List[float]]], ids :List[str]) -> List[Tuple[str, str, dict, float]]:
701 """
702 Compares clustered data based on the given comparison mode, producing the enrichment results
703 later used to apply styling to the metabolic map.
704
705 Args:
706 class_pat : the clustered data.
707 ids : ids for data association.
708
709
710 Returns:
711 List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary, and max z-score.
712
713 Raises:
714 sys.exit : if fewer than 2 classes are available for comparison
715
716 """
717 class_pat = { k.strip() : v for k, v in class_pat.items() }
718 #TODO: simplify this stuff vvv and stop using sys.exit (raise the correct utils error)
719 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: fewer than two classes were provided for comparison\n')
720
721 enrichment_results = []
722
723
724 if ARGS.comparison == "manyvsmany":
725 for i, j in it.combinations(class_pat.keys(), 2):
726 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
727 enrichment_results.append((i, j, comparisonDict, max_z_score))
728
729 elif ARGS.comparison == "onevsrest":
730 for single_cluster in class_pat.keys():
731 rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]
732
733 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
734 enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
735
743 elif ARGS.comparison == "onevsmany":
744 controlItems = class_pat.get(ARGS.control)
745 for otherDataset in class_pat.keys():
746 if otherDataset == ARGS.control:
747 continue
748 comparisonDict, max_z_score = compareDatasetPair(class_pat.get(otherDataset), controlItems, ids)
749 enrichment_results.append((otherDataset, ARGS.control, comparisonDict, max_z_score))
750
751 return enrichment_results
752
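# vvv The three comparison modes above differ only in how dataset pairs are
# generated; manyvsmany enumerates all unordered pairs via it.combinations.
# An illustrative, never-called sketch with hypothetical class names:
def _example_pairings() -> None:
    classes = ["A", "B", "C"]
    assert list(it.combinations(classes, 2)) == [("A", "B"), ("A", "C"), ("B", "C")]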
753 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
754 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
755 utils.writeSvg(svgFilePath, core_map)
756
757 if ARGS.generate_pdf:
758 pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
759 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
760 convert_to_pdf(svgFilePath, pngPath, pdfPath)
761
762 if not ARGS.generate_svg:
763 os.remove(svgFilePath.show())
764
765 ClassPat = Dict[str, List[List[float]]]
766 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
767 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
768 # for the sake of everyone's sanity.
769 class_pat :ClassPat = {}
770 if ARGS.option == 'datasets':
771 num = 1 #TODO: the dataset naming function could be a generator
772 for path, name in zip(datasetsPaths, names):
773 name = name_dataset(name, num)
774 resolve_rules_float, ids = getDatasetValues(path, name)
775 if resolve_rules_float is not None:
776 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
777
778 num += 1
779
780 elif ARGS.option == "dataset_class":
781 classes = read_dataset(classPath, "class")
782 classes = classes.astype(str)
783 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
784 #check if classes have match on ids
785 if not all(classes.iloc[:, 0].isin(ids)):
786 utils.logWarning(
787 "No match between classes and sample IDs", ARGS.out_log)
788 if resolve_rules_float is not None: class_pat = split_class(classes, resolve_rules_float)
789
790 return ids, class_pat
791 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
792
793 #TODO: create these damn args as FilePath objects
794 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
795 """
796 Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
797
798 Args:
799 datasetPath : path to the dataset
800 datasetName (str): dataset name, used in error reporting
801
802 Returns:
803 Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
804 """
805 dataset = read_dataset(datasetPath, datasetName)
806
807 # Ensure the first column is treated as the reaction name
808 dataset = dataset.set_index(dataset.columns[0])
809
810 # Check if required reactions exist in the dataset
811 required_reactions = ['EX_lac__L_e', 'EX_glc__D_e', 'EX_gln__L_e', 'EX_glu__L_e', 'EX_o2_e']
812 missing_reactions = [reaction for reaction in required_reactions if reaction not in dataset.index]
813
814 if missing_reactions:
815 sys.exit(f'Execution aborted: Missing required reactions {missing_reactions} in {datasetName}\n')
816
817 # Calculate new rows using safe division
818 lact_glc = np.divide(
819 np.clip(dataset.loc['EX_lac__L_e'].to_numpy(), a_min=0, a_max=None),
820 np.clip(dataset.loc['EX_glc__D_e'].to_numpy(), a_min=None, a_max=0),
821 out=np.full_like(dataset.loc['EX_lac__L_e'].to_numpy(), np.nan), # prepare an all-NaN array as the default output
822 where=dataset.loc['EX_glc__D_e'].to_numpy() != 0 # guard against division by zero
823 )
824 lact_gln = np.divide(
825 np.clip(dataset.loc['EX_lac__L_e'].to_numpy(), a_min=0, a_max=None),
826 np.clip(dataset.loc['EX_gln__L_e'].to_numpy(), a_min=None, a_max=0),
827 out=np.full_like(dataset.loc['EX_lac__L_e'].to_numpy(), np.nan),
828 where=dataset.loc['EX_gln__L_e'].to_numpy() != 0
829 )
830 lact_o2 = np.divide(
831 np.clip(dataset.loc['EX_lac__L_e'].to_numpy(), a_min=0, a_max=None),
832 np.clip(dataset.loc['EX_o2_e'].to_numpy(), a_min=None, a_max=0),
833 out=np.full_like(dataset.loc['EX_lac__L_e'].to_numpy(), np.nan),
834 where=dataset.loc['EX_o2_e'].to_numpy() != 0
835 )
836 glu_gln = np.divide(
837 dataset.loc['EX_glu__L_e'].to_numpy(),
838 np.clip(dataset.loc['EX_gln__L_e'].to_numpy(), a_min=None, a_max=0),
839 out=np.full_like(dataset.loc['EX_lac__L_e'].to_numpy(), np.nan),
840 where=dataset.loc['EX_gln__L_e'].to_numpy() != 0
841 )
842
843
844 values = {'lact_glc': lact_glc, 'lact_gln': lact_gln, 'lact_o2': lact_o2, 'glu_gln': glu_gln}
845
846 # Replace inf and NaN with 0 where needed
847 for key in values:
848 values[key] = np.nan_to_num(values[key], nan=0.0, posinf=0.0, neginf=0.0)
849
850 # Build the new rows to append to the dataset
851 new_rows = pd.DataFrame({
852 dataset.index.name: ['LactGlc', 'LactGln', 'LactO2', 'GluGln'],
853 **{col: [values['lact_glc'][i], values['lact_gln'][i], values['lact_o2'][i], values['glu_gln'][i]]
854 for i, col in enumerate(dataset.columns)}
855 })
856
858
859 # Return the original dataset with the new rows appended
860 dataset.reset_index(inplace=True)
861 dataset = pd.concat([dataset, new_rows], ignore_index=True)
862
863 IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
864
865 dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
866 return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
867
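# vvv The np.divide(..., out=..., where=...) pattern above avoids divide-by-zero
# warnings by writing NaN wherever the denominator is zero. A compact,
# never-called example with made-up secretion/uptake fluxes:
def _example_safe_ratio() -> None:
    lactate = np.array([2.0, 3.0, 1.0]) # secretion fluxes (positive)
    glucose = np.array([-1.0, 0.0, -4.0]) # uptake fluxes (negative), one zero
    ratio = np.divide(
        np.clip(lactate, a_min=0, a_max=None), # keep only secretion
        np.clip(glucose, a_min=None, a_max=0), # keep only uptake
        out=np.full_like(lactate, np.nan), # NaN wherever division is skipped
        where=glucose != 0)
    assert np.isnan(ratio[1]) and ratio[0] == -2.0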
868 def rgb_to_hex(rgb):
869 """
870 Convert RGB values (0-1 range) to hexadecimal color format.
871
872 Args:
873 rgb (numpy.ndarray): An array of RGB color components (in the range [0, 1]).
874
875 Returns:
876 str: The color in hexadecimal format (e.g., '#ff0000' for red).
877 """
878 # Convert RGB values (0-1 range) to hexadecimal format
879 rgb = (np.array(rgb) * 255).astype(int)
880 return '#{:02x}{:02x}{:02x}'.format(rgb[0], rgb[1], rgb[2])
881
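# vvv A quick, never-called sanity check of the conversion on endpoint colors:
def _example_rgb_to_hex() -> None:
    assert rgb_to_hex(np.array([1.0, 0.0, 0.0])) == '#ff0000' # red
    assert rgb_to_hex(np.array([0.0, 0.0, 0.0])) == '#000000' # black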
882 def save_colormap_image(min_value: float, max_value: float, path: utils.FilePath, colorMap:str="viridis"):
883 """
884 Create and save an image of the colormap showing the gradient and its range.
885
886 Args:
887 min_value (float): The minimum value of the colormap range.
888 max_value (float): The maximum value of the colormap range.
889 path (utils.FilePath): destination path for the image. colorMap (str): name of the matplotlib colormap to use.
890 """
891
892 # Create a colormap using matplotlib
893 cmap = plt.get_cmap(colorMap)
894
895 # Create a figure and axis
896 fig, ax = plt.subplots(figsize=(6, 1))
897 fig.subplots_adjust(bottom=0.5)
898
899 # Create a gradient image
900 gradient = np.linspace(0, 1, 256)
901 gradient = np.vstack((gradient, gradient))
902
903 # Add min and max value annotations
904 ax.text(0, 0.5, f'{np.round(min_value, 3)}', va='center', ha='right', transform=ax.transAxes, fontsize=12, color='black')
905 ax.text(1, 0.5, f'{np.round(max_value, 3)}', va='center', ha='left', transform=ax.transAxes, fontsize=12, color='black')
906
907
908 # Display the gradient image
909 ax.imshow(gradient, aspect='auto', cmap=cmap)
910 ax.set_axis_off()
911
912 # Save the image
913 plt.savefig(path.show(), bbox_inches='tight', pad_inches=0)
914 plt.close()
916
917 def min_nonzero_abs(arr):
918 # Flatten the array and filter out zeros, then find the minimum of the remaining values
919 non_zero_elements = np.abs(arr)[np.abs(arr) > 0]
920 return np.min(non_zero_elements) if non_zero_elements.size > 0 else None
921
922 def computeEnrichmentMeanMedian(metabMap: ET.ElementTree, class_pat: Dict[str, List[List[float]]], ids: List[str], colormap:str) -> None:
923 """
924 Compute and visualize the metabolic map based on mean and median of the input fluxes.
925 The fluxes are normalised across classes/datasets and visualised using the given colormap.
926
927 Args:
928 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
929 class_pat (Dict[str, List[List[float]]]): A dictionary where keys are class names and values are lists of enrichment values.
930 ids (List[str]): A list of reaction IDs to be used for coloring arrows.
931
932 Returns:
933 None
934 """
935 # Create copies only if they are needed
936 metabMap_mean = copy.deepcopy(metabMap)
937 metabMap_median = copy.deepcopy(metabMap)
938
939 # Compute medians and means
940 medians = {key: np.round(np.nanmedian(np.array(value), axis=1), 6) for key, value in class_pat.items()}
941 means = {key: np.round(np.nanmean(np.array(value), axis=1),6) for key, value in class_pat.items()}
942
943 # Normalize medians and means
944 max_flux_medians = max(np.max(np.abs(arr)) for arr in medians.values())
945 max_flux_means = max(np.max(np.abs(arr)) for arr in means.values())
946
947 min_flux_medians = min(min_nonzero_abs(arr) for arr in medians.values())
948 min_flux_means = min(min_nonzero_abs(arr) for arr in means.values())
949
950 medians = {key: median/max_flux_medians for key, median in medians.items()}
951 means = {key: mean/max_flux_means for key, mean in means.items()}
952
953 save_colormap_image(min_flux_medians, max_flux_medians, utils.FilePath("Color map median", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
954 save_colormap_image(min_flux_means, max_flux_means, utils.FilePath("Color map mean", ext=utils.FileFormat.PNG, prefix=ARGS.output_path), colormap)
955
956 cmap = plt.get_cmap(colormap)
957
958 min_width = 2.0 # Minimum arrow width
959 max_width = 15.0 # Maximum arrow width
960
961 for key in class_pat:
962 # Create color mappings for median and mean
963 colors_median = {
964 rxn_id: rgb_to_hex(cmap(abs(medians[key][i]))) if medians[key][i] != 0 else '#bebebe' #grey blocked
965 for i, rxn_id in enumerate(ids)
966 }
967
968 colors_mean = {
969 rxn_id: rgb_to_hex(cmap(abs(means[key][i]))) if means[key][i] != 0 else '#bebebe' #grey blocked
970 for i, rxn_id in enumerate(ids)
971 }
972
973 for i, rxn_id in enumerate(ids):
974 # Calculate arrow width for median
975 width_median = np.interp(abs(medians[key][i]), [0, 1], [min_width, max_width])
976 isNegative = medians[key][i] < 0
977 apply_arrow(metabMap_median, rxn_id, colors_median[rxn_id], isNegative, width_median)
978
979 # Calculate arrow width for mean
980 width_mean = np.interp(abs(means[key][i]), [0, 1], [min_width, max_width])
981 isNegative = means[key][i] < 0
982 apply_arrow(metabMap_mean, rxn_id, colors_mean[rxn_id], isNegative, width_mean)
983
984 # Save and convert the SVG files
985 save_and_convert(metabMap_mean, "mean", key)
986 save_and_convert(metabMap_median, "median", key)
987
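# vvv Arrow widths above come from linearly interpolating the normalized flux
# magnitude onto [min_width, max_width]. In isolation (never called):
def _example_width_interpolation() -> None:
    min_width, max_width = 2.0, 15.0
    for magnitude, expected in ((0.0, 2.0), (0.5, 8.5), (1.0, 15.0)):
        assert np.interp(magnitude, [0, 1], [min_width, max_width]) == expected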
988 def apply_arrow(metabMap, rxn_id, color, isNegative, width=5):
989 """
990 Apply an arrow to a specific reaction in the metabolic map with a given color.
991
992 Args:
993 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
994 rxn_id (str): The ID of the reaction to which the arrow will be applied.
995 color (str): The color of the arrow in hexadecimal format.
996 isNegative (bool): A boolean indicating if the arrow represents a negative value.
997 width (int): The width of the arrow.
998
999 Returns:
1000 None
1001 """
1002 arrow = Arrow(width=width, col=color)
1003 arrow.styleReactionElementsMeanMedian(metabMap, rxn_id, isNegative)
1005
1006 def save_and_convert(metabMap, map_type, key):
1007 """
1008 Save the metabolic map as an SVG file and optionally convert it to PNG and PDF formats.
1009
1010 Args:
1011 metabMap (ET.ElementTree): An XML tree representing the metabolic map.
1012 map_type (str): The type of map ('mean' or 'median').
1013 key (str): The key identifying the specific map.
1014
1015 Returns:
1016 None
1017 """
1018 svgFilePath = utils.FilePath(f"SVG Map {map_type} - {key}", ext=utils.FileFormat.SVG, prefix=ARGS.output_path)
1019 utils.writeSvg(svgFilePath, metabMap)
1020 if ARGS.generate_pdf:
1021 pngPath = utils.FilePath(f"PNG Map {map_type} - {key}", ext=utils.FileFormat.PNG, prefix=ARGS.output_path)
1022 pdfPath = utils.FilePath(f"PDF Map {map_type} - {key}", ext=utils.FileFormat.PDF, prefix=ARGS.output_path)
1023 convert_to_pdf(svgFilePath, pngPath, pdfPath)
1024 if not ARGS.generate_svg:
1025 os.remove(svgFilePath.show())
1026
1027 ############################ MAIN #############################################
1028 def main(args:List[str] = None) -> None:
1029 """
1030 Initializes everything and sets the program in motion based on the frontend input arguments.
1031
1032 Returns:
1033 None
1034
1035 Raises:
1036 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
1037 """
1038
1039 global ARGS
1040 ARGS = process_args(args)
1041
1042 if ARGS.custom_map == 'None':
1043 ARGS.custom_map = None
1044
1045 if not os.path.isdir(ARGS.output_path): os.makedirs(ARGS.output_path)
1046
1047 core_map :ET.ElementTree = ARGS.choice_map.getMap(
1048 ARGS.tool_dir,
1049 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
1050 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
1051 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
1052 # solution can be derived from my comment in FilePath.fromStrPath
1053
1054 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_fluxes, ARGS.input_data_fluxes, ARGS.input_class_fluxes, ARGS.names_fluxes)
1055
1056 if ARGS.choice_map == utils.Model.HMRcore:
1057 temp_map = utils.Model.HMRcore_no_legend
1058 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1059 elif ARGS.choice_map == utils.Model.ENGRO2:
1060 temp_map = utils.Model.ENGRO2_no_legend
1061 computeEnrichmentMeanMedian(temp_map.getMap(ARGS.tool_dir), class_pat, ids, ARGS.color_map)
1062 else:
1063 computeEnrichmentMeanMedian(core_map, class_pat, ids, ARGS.color_map)
1064
1065
1066 enrichment_results = computeEnrichment(class_pat, ids)
1067 for i, j, comparisonDict, max_z_score in enrichment_results:
1068 map_copy = copy.deepcopy(core_map)
1069 temp_thingsInCommon(comparisonDict, map_copy, max_z_score, i, j)
1070 createOutputMaps(i, j, map_copy)
1071
1072 if not ERRORS: return
1073 utils.logWarning(
1074 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
1075 ARGS.out_log)
1076
1077 print('Execution succeeded')
1078
1079 ###############################################################################
1080 if __name__ == "__main__":
1081 main()
1082