comparison COBRAxy/src/marea.py @ 539:2fb97466e404 draft

Uploaded
author francesco_lapi
date Sat, 25 Oct 2025 14:55:13 +0000
parents
children fcdbc81feb45
comparison
equal deleted inserted replaced
538:fd53d42348bd 539:2fb97466e404
1 """
2 MAREA: Enrichment and map styling for RAS/RPS data.
3
4 This module compares groups of samples using RAS (Reaction Activity Scores) and/or
5 RPS (Reaction Propensity Scores), computes statistics (p-values, z-scores, fold change),
6 and applies visual styling to an SVG metabolic map (with optional PDF/PNG export).
7 """
8 from __future__ import division
9 import csv
10 from enum import Enum
11 import re
12 import sys
13 import numpy as np
14 import pandas as pd
15 import itertools as it
16 import scipy.stats as st
17 import lxml.etree as ET
18 import math
19 import utils.general_utils as utils
20 from PIL import Image
21 import os
22 import argparse
23 import pyvips
24 from typing import Tuple, Union, Optional, List, Dict
25 import copy
26
27 from pydeseq2.dds import DeseqDataSet
28 from pydeseq2.default_inference import DefaultInference
29 from pydeseq2.ds import DeseqStats
30
ERRORS = []  # reaction IDs whose SVG element could not be found during styling (see Arrow.applyTo)
########################## argparse ##########################################
ARGS :argparse.Namespace  # module-level namespace of parsed CLI arguments, set by the tool entry point
def process_args(args:List[str] = None) -> argparse.Namespace:
    """
    Parse command-line arguments exposed by the Galaxy frontend for this module.

    Args:
        args: Optional list of arguments, defaults to sys.argv when None.

    Returns:
        Namespace: Parsed arguments.
    """
    parser = argparse.ArgumentParser(
        usage = "%(prog)s [options]",
        description = "process some value's genes to create a comparison's map.")

    #General:
    parser.add_argument(
        '-td', '--tool_dir',
        type = str,
        required = True,
        help = 'your tool directory')

    parser.add_argument('-on', '--control', type = str)
    parser.add_argument('-ol', '--out_log', help = "Output log")

    #Computation details:
    parser.add_argument(
        '-co', '--comparison',
        type = str,
        default = 'manyvsmany',
        choices = ['manyvsmany', 'onevsrest', 'onevsmany'])

    parser.add_argument(
        '-te' ,'--test',
        type = str,
        default = 'ks',
        choices = ['ks', 'ttest_p', 'ttest_ind', 'wilcoxon', 'mw', 'DESeq'],
        help = 'Statistical test to use (default: %(default)s)')

    parser.add_argument(
        '-pv' ,'--pValue',
        type = float,
        default = 0.1,
        help = 'P-Value threshold (default: %(default)s)')

    parser.add_argument(
        '-adj' ,'--adjusted',
        type = utils.Bool("adjusted"), default = False,
        help = 'Apply the FDR (Benjamini-Hochberg) correction (default: %(default)s)')

    parser.add_argument(
        '-fc', '--fChange',
        type = float,
        default = 1.5,
        help = 'Fold-Change threshold (default: %(default)s)')

    parser.add_argument(
        "-ne", "--net",
        type = utils.Bool("net"), default = False,
        help = "choose if you want net enrichment for RPS")

    parser.add_argument(
        '-op', '--option',
        type = str,
        choices = ['datasets', 'dataset_class'],
        help='dataset or dataset and class')

    #RAS:
    parser.add_argument(
        "-ra", "--using_RAS",
        type = utils.Bool("using_RAS"), default = True,
        help = "choose whether to use RAS datasets.")

    parser.add_argument(
        '-id', '--input_data',
        type = str,
        help = 'input dataset')

    parser.add_argument(
        '-ic', '--input_class',
        type = str,
        help = 'sample group specification')

    parser.add_argument(
        '-ids', '--input_datas',
        type = str,
        nargs = '+',
        help = 'input datasets')

    parser.add_argument(
        '-na', '--names',
        type = str,
        nargs = '+',
        help = 'input names')

    #RPS:
    parser.add_argument(
        "-rp", "--using_RPS",
        type = utils.Bool("using_RPS"), default = False,
        help = "choose whether to use RPS datasets.")

    parser.add_argument(
        '-idr', '--input_data_rps',
        type = str,
        help = 'input dataset rps')

    parser.add_argument(
        '-icr', '--input_class_rps',
        type = str,
        help = 'sample group specification rps')

    parser.add_argument(
        '-idsr', '--input_datas_rps',
        type = str,
        nargs = '+',
        help = 'input datasets rps')

    parser.add_argument(
        '-nar', '--names_rps',
        type = str,
        nargs = '+',
        help = 'input names rps')

    #Output:
    # Fixed: help texts below were copy-pasted from --using_RAS and described the wrong option.
    parser.add_argument(
        "-gs", "--generate_svg",
        type = utils.Bool("generate_svg"), default = True,
        help = "choose whether to generate the SVG map output.")

    parser.add_argument(
        "-gp", "--generate_pdf",
        type = utils.Bool("generate_pdf"), default = True,
        help = "choose whether to generate the PDF map output.")

    parser.add_argument(
        '-cm', '--custom_map',
        type = str,
        help='custom map to use')

    parser.add_argument(
        '-idop', '--output_path',
        type = str,
        default='result',
        help = 'output path for maps')

    parser.add_argument(
        '-mc', '--choice_map',
        type = utils.Model, default = utils.Model.HMRcore,
        choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])

    args :argparse.Namespace = parser.parse_args(args)
    # Net enrichment only applies to RPS data: force it off when RAS is used without RPS.
    if args.using_RAS and not args.using_RPS: args.net = False

    return args
187
188 ############################ dataset input ####################################
def read_dataset(data :str, name :str) -> pd.DataFrame:
    """
    Reads a tab-separated dataset file into a DataFrame.

    Args:
        data : filepath of a dataset (from frontend input params or literals upon calling)
        name : name associated with the dataset, used in error messages

    Returns:
        pd.DataFrame : dataset in a runtime operable shape

    Raises:
        sys.exit : if the file is empty (pd.errors.EmptyDataError) or has fewer than 2 columns
    """
    try:
        frame = pd.read_csv(data, sep = '\t', header = 0, engine='python')
    except pd.errors.EmptyDataError:
        frame = None # empty input is treated exactly like a malformed one below

    if frame is None or frame.shape[1] < 2:
        sys.exit('Execution aborted: wrong format of ' + name + '\n')

    return frame
210
211 ############################ map_methods ######################################
FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
def fold_change(avg1 :float, avg2 :float) -> FoldChange:
    """
    Calculates a bounded fold-change-like score between two gene expression values.

    Args:
        avg1 : average expression value from one dataset
        avg2 : average expression value from the other dataset

    Returns:
        FoldChange :
            0 : when both input values are 0
            "-INF" : when avg1 is 0
            "INF" : when avg2 is 0
            float : for any other combination of values
    """
    if avg1 == 0:
        # Both averages zero means no change at all; a lone zero yields signed infinity.
        return 0 if avg2 == 0 else '-INF' # TODO: maybe fix

    if avg2 == 0:
        return 'INF'

    # Bounded in (-1, 1); thresholds are mapped via (t - 1) / (abs(t) + 1) with t > 1.
    return (avg1 - avg2) / (abs(avg1) + abs(avg2))
238
239 # TODO: I would really like for this one to get the Thanos treatment
def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
    """
    Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.

    Args:
        l : current style string of an SVG element
        col : new value for the "stroke" style property
        width : new value for the "stroke-width" style property
        dash : new value for the "stroke-dasharray" style property

    Returns:
        str : the fixed style string
    """
    # Properties to overwrite in place, or append when missing.
    replacements = {
        'stroke:': col,
        'stroke-width:': width,
        'stroke-dasharray:': dash,
    }
    found = {prefix: False for prefix in replacements}

    parts = l.split(';')
    for idx, part in enumerate(parts):
        for prefix, value in replacements.items():
            if part.startswith(prefix):
                parts[idx] = prefix + value
                found[prefix] = True

    # Append any property that was not already present, in the same fixed order.
    for prefix, value in replacements.items():
        if not found[prefix]:
            parts.append(prefix + value)

    return ';'.join(parts)
274
def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_z_score :float) -> ET.ElementTree:
    """
    Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.

    Args:
        d : dictionary mapping [p-value, fold change, z-score, average_1, average_2] (values) to each reaction ID as encoded in the SVG map (keys)
        core_map : SVG map to modify
        threshold_P_V : threshold for a p-value to be considered significant
        threshold_F_C : threshold for a fold change value to be considered significant
        max_z_score : highest z-score (absolute value)

    Returns:
        ET.ElementTree : the modified core_map

    Side effects:
        core_map : mut
    """
    maxT = 12   # maximum arrow stroke width
    minT = 2    # minimum arrow stroke width
    grey = '#BEBEBE'  # not significant / fold change under threshold
    blue = '#6495ed'  # down-regulated
    red = '#ecac68'   # up-regulated
    for el in core_map.iter():
        el_id = str(el.get('id'))
        if el_id.startswith('R_'):  # reaction arrow elements carry an "R_" prefix in the map
            tmp = d.get(el_id[2:])
            if tmp != None:
                p_val, f_c, z_score, avg1, avg2 = tmp

                # Skip reactions with undefined statistics entirely (style left untouched).
                if math.isnan(p_val) or (isinstance(f_c, float) and math.isnan(f_c)): continue

                if p_val <= threshold_P_V: # p-value is OK
                    if not isinstance(f_c, str): # FC is finite
                        # The threshold is mapped into the bounded (-1, 1) space used by fold_change():
                        # (threshold_F_C - 1) / (|threshold_F_C| + 1).
                        if abs(f_c) < ((threshold_F_C - 1) / (abs(threshold_F_C) + 1)): # FC is not OK
                            col = grey
                            width = str(minT)
                        else: # FC is OK
                            if f_c < 0:
                                col = blue
                            elif f_c > 0:
                                col = red
                            # Width scales with |z| relative to the map-wide max, clamped to [minT, maxT].
                            width = str(
                                min(
                                    max(abs(z_score * maxT) / max_z_score, minT),
                                    maxT))

                    else: # FC is infinite
                        if f_c == '-INF':
                            col = blue
                        elif f_c == 'INF':
                            col = red
                        width = str(maxT)
                    dash = 'none'
                else: # p-value is not OK: dashed grey arrow marks a non-significant result
                    dash = '5,5'
                    col = grey
                    width = str(minT)
                el.set('style', fix_style(el.get('style', ""), col, width, dash))
    return core_map
334
def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
    """
    Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
    not enforced, if more than one element with the exact ID is found only the first will be returned.

    Args:
        reactionId (str): exact ID of the requested element.
        metabMap (ET.ElementTree): metabolic map containing the element.

    Returns:
        utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
    """
    # Build the XPath query inside a Result chain: presumably .map captures the IndexError
    # raised when the query matches nothing, and .mapErr rewrites it into a descriptive
    # ResultErr — confirm against utils.general_utils.Result.
    return utils.Result.Ok(
        f"//*[@id=\"{reactionId}\"]").map(
        lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
        lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
351
def styleMapElement(element :ET.Element, styleStr :str) -> None:
    """Append stroke-related styles to an SVG element, replacing a previously-appended triplet."""
    existing :str = element.get("style", "")
    # Strip a trailing ";stroke:...;stroke-width:...;stroke-dasharray:..." triplet, if any,
    # so repeated styling does not accumulate stale entries.
    existing = re.sub(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", "", existing)
    element.set("style", existing + styleStr)
359
class ReactionDirection(Enum):
    """Direction tag encoded in the suffix of a reaction ID."""
    Unknown = ""
    Direct = "_F"
    Inverse = "_B"

    @classmethod
    def fromDir(cls, s :str) -> "ReactionDirection":
        """Map a 2-character suffix to its direction; anything unrecognized is Unknown."""
        for member in (cls.Direct, cls.Inverse):
            if s == member.value:
                return member
        return cls.Unknown

    @classmethod
    def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
        """Extract the direction from the last two characters of a reaction ID."""
        return cls.fromDir(reactionId[-2:])
375
def getArrowBodyElementId(reactionId :str) -> str:
    """Return the SVG element id for a reaction arrow body, stripping direction suffixes."""
    if reactionId.endswith("_RV"):
        reactionId = reactionId[:-3] #TODO: standardize _RV
    elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown:
        # Drop the "_F"/"_B" tag: the body element is shared by both directions.
        reactionId = reactionId[:-2]
    return "R_" + reactionId
381
def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
    """
    We attempt extracting the direction information from the provided reaction ID, if unsuccessful we provide the IDs of both directions.

    Args:
        reactionId : the provided reaction ID.

    Returns:
        Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
    """
    if reactionId.endswith("_RV"):
        base = reactionId[:-3] #TODO: standardize _RV
        return f"F_{base}", f"B_{base}"

    if ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown:
        # Turn the trailing "_F"/"_B" into a leading "F_"/"B_" prefix on the bare id.
        return reactionId[-1] + "_" + reactionId[:-2], ""

    return f"F_{reactionId}", f"B_{reactionId}"
397
class ArrowColor(Enum):
    """
    Encodes possible arrow colors based on their meaning in the enrichment process.
    """
    Invalid = "#BEBEBE" # gray, fold-change under treshold or not significant p-value
    Transparent = "#ffffff00" # transparent, to make some arrow segments disappear
    UpRegulated = "#ecac68" # orange, up-regulated reaction
    DownRegulated = "#6495ed" # lightblue, down-regulated reaction

    UpRegulatedInv = "#FF0000" # bright red for reversible with conflicting directions

    DownRegulatedInv = "#0000FF" # bright blue for reversible with conflicting directions

    @classmethod
    def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
        """Pick the down/up color matching the sign of foldChange, optionally the alternative variant."""
        if foldChange < 0:
            return cls.DownRegulatedInv if useAltColor else cls.DownRegulated
        return cls.UpRegulatedInv if useAltColor else cls.UpRegulated

    def __str__(self) -> str:
        return self.value
417
class Arrow:
    """
    Models the properties of a reaction arrow that change based on enrichment.
    """
    MIN_W = 2  # minimum stroke width
    MAX_W = 12 # maximum stroke width

    def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
        """
        Initializes an instance of Arrow.

        Args:
            width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
            col : color of the arrow.
            isDashed : whether the arrow should be dashed, meaning the associated pValue resulted not significant.
        """
        self.w = width
        self.col = col
        self.dash = isDashed

    def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
        """Style the element with the given id, recording the id in ERRORS when it cannot be found."""
        outcome = getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr))
        if outcome.isErr:
            ERRORS.append(reactionId)

    def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
        """Style the body or the head(s) of the arrow identified by reactionId on the given map."""
        if not mindReactionDir:
            # Direction is irrelevant (e.g., RAS): style only the arrow body.
            return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())

        # Style the arrow head(s), trying both direction variants when the id carries no direction.
        headId, altHeadId = getArrowHeadElementId(reactionId)
        self.applyTo(headId, metabMap, self.toStyleStr(downSizedForTips = True))
        if altHeadId:
            self.applyTo(altHeadId, metabMap, self.toStyleStr(downSizedForTips = True))

    def toStyleStr(self, *, downSizedForTips = False) -> str:
        """
        Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.

        Returns:
            str : the styles string.
        """
        width = self.w * 0.8 if downSizedForTips else self.w
        dasharray = '5,5' if self.dash else 'none'
        return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{dasharray}"
465
# Default arrows used for different significance states
INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid) # fold change under threshold
INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True) # p-value not significant
TRANSPARENT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Transparent) # Who cares how big it is if it's transparent
470
def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericZScore :float) -> None:
    """
    Applies RPS enrichment results to the provided metabolic map.

    Args:
        rpsEnrichmentRes : RPS enrichment results, (p-value, fold change, z-score[, average_1, average_2]) per reaction id.
        metabMap : the metabolic map to edit.
        maxNumericZScore : biggest finite z-score value found.

    Side effects:
        metabMap : mut

    Returns:
        None
    """
    for reactionId, values in rpsEnrichmentRes.items():
        pValue = values[0]
        foldChange = values[1]
        z_score = values[2]

        # Skip entries with undefined statistics entirely (arrow left untouched).
        if math.isnan(pValue) or (isinstance(foldChange, float) and math.isnan(foldChange)): continue

        # "INF"/"-INF" sentinels parse to +/-inf, so the magnitude check below still works.
        if isinstance(foldChange, str): foldChange = float(foldChange)
        if pValue > ARGS.pValue: # pValue above tresh: dashed arrow
            INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
            continue

        # Threshold mapped into the bounded (-1, 1) space produced by fold_change().
        if abs(foldChange) < (ARGS.fChange - 1) / (abs(ARGS.fChange) + 1):
            INVALID_ARROW.styleReactionElements(metabMap, reactionId)
            continue

        # Width scales with |z| relative to the maximum, clamped to [MIN_W, MAX_W];
        # an infinite z or a zero maximum falls back to MAX_W.
        width = Arrow.MAX_W
        if not math.isinf(z_score):
            try: width = min(
                max(abs(z_score * Arrow.MAX_W) / maxNumericZScore, Arrow.MIN_W),
                Arrow.MAX_W)

            except ZeroDivisionError: pass

        if not reactionId.endswith("_RV"): # RV stands for reversible reactions
            Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
            continue

        reactionId = reactionId[:-3] # Remove "_RV"

        # Compacts the signs of averages into 1 easy to check score:
        # 0 = both non-negative, 1 = opposite signs, 2 = both negative.
        inversionScore = (values[3] < 0) + (values[4] < 0)
        if inversionScore == 2: foldChange *= -1

        # If the score is 1 (opposite signs) we use alternative colors vvv
        arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))

        # vvv These 2 if statements can both be true and can both happen
        if ARGS.net: # style arrow head(s):
            arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))

        if not ARGS.using_RAS: # style arrow body
            arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
528
529 ############################ split class ######################################
def split_class(classes :pd.DataFrame, dataset_values :Dict[str, List[float]]) -> Dict[str, Dict[str, list]]:
    """
    Groups dataset values by the class each sample belongs to.

    Args:
        classes : DataFrame whose first column holds sample names and whose second column holds class labels.
        dataset_values : mapping from sample name to its list of values.

    Returns:
        dict : maps each class label to {"values": transposed data (one row per reaction), "samples": sample names}.
               (The original return annotation claimed List[List[float]] values; the actual shape is this dict.)

    Side effects:
        classes : mut (processed class labels are overwritten with None)
    """
    class_pat :Dict[str, Dict[str, list]] = {}
    for i in range(len(classes)):
        classe :str = classes.iloc[i, 1]
        if pd.isnull(classe): continue

        l :List[List[float]] = []
        sample_ids: List[str] = []

        for j in range(i, len(classes)):
            if classes.iloc[j, 1] == classe:
                pat_id :str = classes.iloc[j, 0] # sample name
                values = dataset_values.get(pat_id) # the column of values for that sample
                # Fixed: "values != None" replaced with the identity check — the correct
                # None test, immune to elementwise __ne__ semantics of array-like values.
                if values is not None:
                    l.append(values)
                    sample_ids.append(pat_id)
                classes.iloc[j, 1] = None # mark as processed so the outer loop skips it

        if l:
            class_pat[classe] = {
                "values": list(map(list, zip(*l))), # transpose: samples x reactions -> reactions x samples
                "samples": sample_ids
            }
            continue

        utils.logWarning(
            f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)

    return class_pat
572
573 ############################ conversion ##############################################
574 # Conversion from SVG to PNG
def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
    """
    Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.

    Args:
        svg_path : path to SVG file
        png_path : path for new PNG file
        dpi : dots per inch of the generated PNG
        scale : scaling factor for the generated PNG, computed internally when a size is provided
        size : final effective width of the generated PNG

    Returns:
        None
    """
    if size:
        # Load at native size first, then derive the scale needed to reach the requested width.
        image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
        scale = size / image.width
        image = image.resize(scale)
    else:
        image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)

    # Compose the (possibly transparent) render over a solid white canvas of matching size.
    background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
    background = background.affine([scale, 0, 0, scale])

    if background.bands != image.bands:
        background = background.extract_band(0)

    background.composite2(image, 'over').write_to_file(png_path.show())
604
def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
    """
    Converts the SVG map at the provided path to PDF, via an intermediate opaque PNG.

    Args:
        file_svg : path to SVG file
        file_png : path for the intermediate PNG file
        file_pdf : path to new PDF file

    Raises:
        utils.DataErr : if the PDF conversion fails for any reason.

    Returns:
        None
    """
    svg_to_png_with_background(file_svg, file_png)
    try:
        # Fixed: the original never closed the PNG file handle opened by Image.open;
        # the context manager guarantees it is released.
        with Image.open(file_png.show()) as image:
            image.convert("RGB").save(file_pdf.show(), "PDF", resolution=100.0)
        print(f'PDF file {file_pdf.filePath} successfully generated.')

    except Exception as e:
        raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
626
627 ############################ map ##############################################
def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
    """
    Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
    "result/" folder, used by this tool for output files in collections.

    Args:
        dataset1Name : name of the first dataset of the comparison.
        dataset2Name : name of the second dataset; defaults to "rest" for one-vs-rest comparisons.
        details : optional description appended in parentheses to the file name.
        ext : file format of the output file (keyword-only, required).

    Returns:
        utils.FilePath : a path of the form "<output_path>/<ds1>_vs_<ds2> (<details>).<ext>".
    """
    return utils.FilePath(
        f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
        ext,
        prefix = ARGS.output_path)
646
FIELD_NOT_AVAILABLE = '/'
def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
    """Write rows as a tab-separated file with the given header, padding short rows with FIELD_NOT_AVAILABLE."""
    expectedLen = len(fieldNames)
    with open(outPath.show(), "w", newline = "") as fd:
        writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
        writer.writeheader()

        for row in rows:
            # Pad in place so every row covers all the declared fields.
            deficit = expectedLen - len(row)
            if deficit > 0:
                row.extend([FIELD_NOT_AVAILABLE] * deficit)
            writer.writerow(dict(zip(fieldNames, row)))
658
OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]]
def temp_thingsInCommon(tmp :OldEnrichedScores, core_map :ET.ElementTree, max_z_score :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
    """
    Writes the tabular enrichment result for one comparison and applies the styling to the SVG map.

    Args:
        tmp : enrichment scores per reaction id, [p-value, fold change, z-score, average_1, average_2].
        core_map : SVG map to style (mutated in place).
        max_z_score : highest finite |z-score| of the comparison, used to scale arrow widths.
        dataset1Name : name of the first dataset of the comparison.
        dataset2Name : name of the second dataset; defaults to "rest".
        ras_enrichment : True to style RAS results (arrow bodies), False for RPS results.

    Returns:
        None
    """
    suffix = "RAS" if ras_enrichment else "RPS"
    writeToCsv(
        [ [reactId] + values for reactId, values in tmp.items() ],
        ["ids", "P_Value", "fold change", "z-score", "average_1", "average_2"],
        buildOutputPath(dataset1Name, dataset2Name, details = f"Tabular Result ({suffix})", ext = utils.FileFormat.TSV))

    if ras_enrichment:
        fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_z_score)
        return

    # RPS styling expects tuple values, so convert in place before applying.
    for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
    applyRpsEnrichmentToMap(tmp, core_map, max_z_score)
673
def computePValue(dataset1Data: List[float], dataset2Data: List[float]) -> Tuple[float, float]:
    """
    Computes the statistical significance score (P-value) of the comparison between coherent data
    from two datasets. The data is supposed to, in both datasets:
    - be related to the same reaction ID;
    - be ordered by sample, such that the item at position i in both lists is related to the
      same sample or cell line.

    Args:
        dataset1Data : data from the 1st dataset.
        dataset2Data : data from the 2nd dataset.

    Returns:
        tuple: (P-value, Z-score)
            - P-value from the selected test on the provided data.
            - Z-score of the difference between means of the two datasets.
    """
    test = ARGS.test
    if test == "ks":
        # Kolmogorov-Smirnov test
        _, p_value = st.ks_2samp(dataset1Data, dataset2Data)
    elif test == "ttest_p":
        # Paired t-test requires datasets of equal size
        if len(dataset1Data) != len(dataset2Data):
            raise ValueError("Datasets must have the same size for paired t-test.")
        _, p_value = st.ttest_rel(dataset1Data, dataset2Data)
    elif test == "ttest_ind":
        # t-test for independent samples
        _, p_value = st.ttest_ind(dataset1Data, dataset2Data)
    elif test == "wilcoxon":
        # Wilcoxon signed-rank test requires datasets of equal size
        if len(dataset1Data) != len(dataset2Data):
            raise ValueError("Datasets must have the same size for Wilcoxon signed-rank test.")
        np.random.seed(42) # Ensure reproducibility since zsplit method is used
        _, p_value = st.wilcoxon(dataset1Data, dataset2Data, zero_method='zsplit')
    elif test == "mw":
        # Mann-Whitney U test
        _, p_value = st.mannwhitneyu(dataset1Data, dataset2Data)
    else:
        # No valid test selected (e.g. DESeq, whose p-values are computed elsewhere)
        p_value = np.nan

    # Z-score of the difference between means, using sample standard deviations.
    mean1, mean2 = np.mean(dataset1Data), np.mean(dataset2Data)
    std1, std2 = np.std(dataset1Data, ddof=1), np.std(dataset2Data, ddof=1)
    n1, n2 = len(dataset1Data), len(dataset2Data)
    z_score = (mean1 - mean2) / np.sqrt((std1**2 / n1) + (std2**2 / n2))

    return p_value, z_score
730
731
def DESeqPValue(comparisonResult :Dict[str, List[Union[float, FoldChange]]], dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> None:
    """
    Computes the p-value for each reaction in the comparisonResult dictionary using DESeq2.

    Args:
        comparisonResult : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
        dataset1Data : data from the 1st dataset.
        dataset2Data : data from the 2nd dataset.
        ids : list of reaction IDs.

    Returns:
        None : mutates the comparisonResult dictionary in place with the p-values.
    """

    # pyDESeq2 needs at least 2 replicates per sample so I check this
    if len(dataset1Data[0]) < 2 or len(dataset2Data[0]) < 2:
        raise ValueError("Datasets must have at least 2 replicates each")

    # pyDESeq2 is based on pandas, so we need to convert the data into a DataFrame and clean it from NaN values
    dataframe1 = pd.DataFrame(dataset1Data, index=ids)
    dataframe2 = pd.DataFrame(dataset2Data, index=ids)

    # pyDESeq2 requires datasets to be samples x reactions and integer values
    # NOTE(review): .astype(int) truncates fractional scores — presumably inputs are count-like; confirm upstream.
    dataframe1_clean = dataframe1.dropna(axis=0, how="any").T.astype(int)
    dataframe2_clean = dataframe2.dropna(axis=0, how="any").T.astype(int)
    dataframe1_clean.index = [f"ds1_rep{i+1}" for i in range(dataframe1_clean.shape[0])]
    dataframe2_clean.index = [f"ds2_rep{j+1}" for j in range(dataframe2_clean.shape[0])]

    # pyDESeq2 works on a DataFrame with values and another with infos about how samples are split (like dataset class)
    dataframe = pd.concat([dataframe1_clean, dataframe2_clean], axis=0)
    metadata = pd.DataFrame({"dataset": (["dataset1"]*dataframe1_clean.shape[0] + ["dataset2"]*dataframe2_clean.shape[0])}, index=dataframe.index)

    # Ensure the index of the metadata matches the index of the dataframe
    if not dataframe.index.equals(metadata.index):
        raise ValueError("The index of the metadata DataFrame must match the index of the counts DataFrame.")

    # Prepare and run pyDESeq2
    inference = DefaultInference()
    dds = DeseqDataSet(counts=dataframe, metadata=metadata, design="~dataset", inference=inference, quiet=True, low_memory=True)
    dds.deseq2()
    ds = DeseqStats(dds, contrast=["dataset", "dataset1", "dataset2"], inference=inference, quiet=True)
    ds.summary()

    # Retrieve the p-values from the DESeq2 results
    # Only ids surviving the NaN filter get an updated p-value; the rest keep their previous value.
    for reactId in ds.results_df.index:
        comparisonResult[reactId][0] = ds.results_df["pvalue"][reactId]
778
779
780 # TODO: the net RPS computation should be done in the RPS module
def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float, Dict[str, Tuple[np.ndarray, np.ndarray]]]:
    """
    Compares two aligned datasets reaction-by-reaction, producing enrichment statistics per reaction.

    Args:
        dataset1Data : rows of values (one row per reaction id) from the 1st dataset.
        dataset2Data : rows of values from the 2nd dataset, aligned with dataset1Data.
        ids : reaction ids aligned with the rows; entries may be set to None in place (see side effects).

    Returns:
        tuple:
            - dict mapping reaction id to [p-value, fold change, z-score, average_1, average_2]
            - the highest finite |z-score| encountered
            - dict mapping reaction id to its (net values dataset1, net values dataset2) pair

    Side effects:
        ids : mut — complementary directional ids are set to None once their net score is consumed.
    """

    netRPS :Dict[str, Tuple[np.ndarray, np.ndarray]] = {}
    comparisonResult :Dict[str, List[Union[float, FoldChange]]] = {}
    count = 0
    max_z_score = 0

    for l1, l2 in zip(dataset1Data, dataset2Data):
        reactId = ids[count]
        count += 1
        if not reactId: continue # id was consumed (set to None) by a previous net computation

        try: #TODO: identify the source of these errors and minimize code in the try block
            reactDir = ReactionDirection.fromReactionId(reactId)
            # Net score is computed only for reversible reactions when user wants it on arrow tips or when RAS datasets aren't used
            if (ARGS.net or not ARGS.using_RAS) and reactDir is not ReactionDirection.Unknown:
                try: position = ids.index(reactId[:-1] + ('B' if reactDir is ReactionDirection.Direct else 'F'))
                except ValueError: continue # we look for the complementary id, if not found we skip

                # Net score: forward minus backward values, per sample.
                nets1 = np.subtract(l1, dataset1Data[position])
                nets2 = np.subtract(l2, dataset2Data[position])
                netRPS[reactId] = (nets1, nets2)

                # Compute p-value and z-score for the RPS scores, if the pyDESeq option is set, p-values will be computed after and this function will return p_value = 0
                p_value, z_score = computePValue(nets1, nets2)
                avg1 = sum(nets1) / len(nets1)
                avg2 = sum(nets2) / len(nets2)
                net = fold_change(avg1, avg2)

                if math.isnan(net): continue
                # Reversible reactions get a synthetic "<base>RV" id in the result.
                comparisonResult[reactId[:-1] + "RV"] = [p_value, net, z_score, avg1, avg2]

                # vvv complementary directional ids are set to None once processed if net is to be applied to tips
                if ARGS.net: # If only using RPS, we cannot delete the inverse, as it's needed to color the arrows
                    ids[position] = None
                continue

            # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used
            # Compute p-value and z-score for the RAS scores, if the pyDESeq option is set, p-values will be computed after and this function will return p_value = 0
            p_value, z_score = computePValue(l1, l2)
            avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2))
            # vvv TODO: Check numpy version compatibility
            if np.isfinite(z_score) and max_z_score < abs(z_score): max_z_score = abs(z_score)
            comparisonResult[reactId] = [float(p_value), avg, z_score, sum(l1) / len(l1), sum(l2) / len(l2)]

        except (TypeError, ZeroDivisionError): continue

    if ARGS.test == "DESeq":
        # Compute p-values using DESeq2
        DESeqPValue(comparisonResult, dataset1Data, dataset2Data, ids)

    # Apply multiple testing correction if set by the user
    if ARGS.adjusted:

        # Retrieve the p-values from the comparisonResult dictionary, they have to be different from NaN
        validPValues = [(reactId, result[0]) for reactId, result in comparisonResult.items() if not np.isnan(result[0])]
        # Unpack the valid p-values
        reactIds, pValues = zip(*validPValues)
        # Adjust the p-values using the Benjamini-Hochberg method
        adjustedPValues = st.false_discovery_control(pValues)
        # Update the comparisonResult dictionary with the adjusted p-values
        for reactId , adjustedPValue in zip(reactIds, adjustedPValues):
            comparisonResult[reactId][0] = adjustedPValue

    return comparisonResult, max_z_score, netRPS
846
def computeEnrichment(class_pat: Dict[str, List[List[float]]], ids: List[str], *, fromRAS=True) -> Tuple[List[Tuple[str, str, dict, float]], dict]:
    """
    Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
    provided metabolic map.

    Args:
        class_pat : the clustered data.
        ids : ids for data association.
        fromRAS : whether the data to enrich consists of RAS scores.
            NOTE(review): currently unused in this function's body; kept for interface stability.

    Returns:
        tuple: A tuple containing:
            - List[Tuple[str, str, dict, float]]: List of tuples with pairs of dataset names, comparison dictionary and max z-score.
            - dict : net RPS values for each dataset's reactions

    Raises:
        sys.exit : if there are less than 2 classes for comparison, or if the requested
            control class is missing in "onevsmany" mode.
    """
    class_pat = {k.strip(): v for k, v in class_pat.items()}
    if (not class_pat) or (len(class_pat.keys()) < 2):
        sys.exit('Execution aborted: classes provided for comparisons are less than two\n')

    # { datasetName : { reactId : netRPS, ... }, ... }
    netRPSResults :Dict[str, Dict[str, np.ndarray]] = {}
    enrichment_results = []

    if ARGS.comparison == "manyvsmany":
        # All unordered pairs of classes.
        for i, j in it.combinations(class_pat.keys(), 2):
            comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat[i], class_pat[j], ids)
            enrichment_results.append((i, j, comparisonDict, max_z_score))
            # compareDatasetPair returns (net1, net2) per reaction: split per dataset.
            netRPSResults[i] = { reactId : net[0] for reactId, net in netRPS.items() }
            netRPSResults[j] = { reactId : net[1] for reactId, net in netRPS.items() }

    elif ARGS.comparison == "onevsrest":
        # Each class against the pooled samples of all other classes.
        for single_cluster in class_pat.keys():
            rest = [item for k, v in class_pat.items() if k != single_cluster for item in v]

            comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat[single_cluster], rest, ids)
            enrichment_results.append((single_cluster, "rest", comparisonDict, max_z_score))
            netRPSResults[single_cluster] = { reactId : net[0] for reactId, net in netRPS.items() }
            netRPSResults["rest"] = { reactId : net[1] for reactId, net in netRPS.items() }

    elif ARGS.comparison == "onevsmany":
        # Fail fast with a clear message instead of a cryptic TypeError downstream.
        controlItems = class_pat.get(ARGS.control)
        if controlItems is None:
            sys.exit(f'Execution aborted: control class "{ARGS.control}" not found among the provided classes\n')

        for otherDataset in class_pat.keys():
            if otherDataset == ARGS.control:
                continue

            # Each non-control class is compared against the control (other vs control).
            comparisonDict, max_z_score, netRPS = compareDatasetPair(class_pat[otherDataset], controlItems, ids)
            enrichment_results.append((otherDataset, ARGS.control, comparisonDict, max_z_score))
            netRPSResults[otherDataset] = { reactId : net[0] for reactId, net in netRPS.items() }
            netRPSResults[ARGS.control] = { reactId : net[1] for reactId, net in netRPS.items() }

    return enrichment_results, netRPSResults
902
def createOutputMaps(dataset1Name: str, dataset2Name: str, core_map: ET.ElementTree) -> None:
    """
    Write the styled SVG map for a dataset pair and, if requested, render PNG/PDF copies.

    Args:
        dataset1Name : name of the first dataset of the comparison (used in output file names).
        dataset2Name : name of the second dataset of the comparison.
        core_map : the styled SVG map to serialize.

    Returns:
        None

    Raises:
        utils.DataErr : if the PDF conversion fails.
    """
    svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details="SVG Map", ext=utils.FileFormat.SVG)
    utils.writeSvg(svgFilePath, core_map)

    if ARGS.generate_pdf:
        pngPath = buildOutputPath(dataset1Name, dataset2Name, details="PNG Map", ext=utils.FileFormat.PNG)
        pdfPath = buildOutputPath(dataset1Name, dataset2Name, details="PDF Map", ext=utils.FileFormat.PDF)
        svg_to_png_with_background(svgFilePath, pngPath)
        try:
            # Use a context manager so the PNG file handle is always released,
            # even if the conversion to PDF fails.
            with Image.open(pngPath.show()) as image:
                image = image.convert("RGB")
                image.save(pdfPath.show(), "PDF", resolution=100.0)
            print(f'PDF file {pdfPath.filePath} successfully generated.')

        except Exception as e:
            raise utils.DataErr(pdfPath.show(), f'Error generating PDF file: {e}')

    # The SVG is always written first (needed for PNG/PDF); drop it if not requested.
    if not ARGS.generate_svg:
        os.remove(svgFilePath.show())
922
ClassPat = Dict[str, List[List[float]]]
def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat, Dict[str, List[str]]]:
    """
    Load input datasets and group their values into classes, according to ARGS.option.

    Args:
        datasetsPaths : dataset file paths (used when option == 'datasets').
        datasetPath : single dataset file path (used when option == 'dataset_class').
        classPath : class-assignment file path (used when option == 'dataset_class').
        names : dataset display names, parallel to datasetsPaths.

    Returns:
        Tuple[List[str], ClassPat, Dict[str, List[str]]]:
            reaction ids, per-class value matrices, and per-class column (sample) names.
    """
    columnNames :Dict[str, List[str]] = {} # { datasetName : [ columnName, ... ], ... }
    class_pat :ClassPat = {}
    if ARGS.option == 'datasets':
        num = 1
        for path, name in zip(datasetsPaths, names):
            name = str(name)
            # Disambiguate the default placeholder name by appending a counter.
            if name == 'Dataset':
                name += '_' + str(num)

            values, ids = getDatasetValues(path, name)
            if values is not None:
                # Transpose { sample : [values per reaction] } into rows of per-sample values.
                class_pat[name] = list(map(list, zip(*values.values()))) # TODO: ???
                columnNames[name] = ["Reactions", *values.keys()]

            num += 1

    elif ARGS.option == "dataset_class":
        classes = read_dataset(classPath, "class")
        classes = classes.astype(str)

        values, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
        if values is not None:
            class_pat_with_samples_id = split_class(classes, values)

            for clas, values_and_samples_id in class_pat_with_samples_id.items():
                class_pat[clas] = values_and_samples_id["values"]
                columnNames[clas] = ["Reactions", *values_and_samples_id["samples"]]

    return ids, class_pat, columnNames
954
def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
    """
    Load the dataset at the given path, returning its numeric values per column and its row IDs.

    Args:
        datasetPath : path to the dataset
        datasetName (str): dataset name, used in error reporting

    Returns:
        Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
    """
    dataset = read_dataset(datasetPath, datasetName)

    # First column holds the reaction identifiers.
    IDs = dataset.iloc[:, 0].astype(str).tolist()

    # Remaining columns are per-sample values; coerce each entry to a nullable float.
    toFloat = utils.Float("Dataset values, not an argument")
    columns = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
    values = { colName : [toFloat(v) for v in colValues] for colName, colValues in columns.items() }
    return values, IDs
971
############################ MAIN #############################################
def main(args:List[str] = None) -> None:
    """
    Initializes everything and sets the program in motion based on the frontend input arguments.

    Returns:
        None

    Raises:
        sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
    """
    global ARGS
    ARGS = process_args(args)

    # Create output folder
    if not os.path.isdir(ARGS.output_path):
        os.makedirs(ARGS.output_path, exist_ok=True)

    core_map: ET.ElementTree = ARGS.choice_map.getMap(
        ARGS.tool_dir,
        utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)

    # Prepare enrichment results containers
    ras_results = []
    rps_results = []

    # Initialized here so the net-RPS export below never hits a NameError
    # when RPS enrichment was not requested.
    netRPS: Dict[str, Dict[str, np.ndarray]] = {}
    columnNames: Dict[str, List[str]] = {}

    # Compute RAS enrichment if requested
    if ARGS.using_RAS:
        ids_ras, class_pat_ras, _ = getClassesAndIdsFromDatasets(
            ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
        ras_results, _ = computeEnrichment(class_pat_ras, ids_ras, fromRAS=True)

    # Compute RPS enrichment if requested
    if ARGS.using_RPS:
        ids_rps, class_pat_rps, columnNames = getClassesAndIdsFromDatasets(
            ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps)

        rps_results, netRPS = computeEnrichment(class_pat_rps, ids_rps, fromRAS=False)

    # Organize by comparison pairs
    comparisons: Dict[Tuple[str, str], Dict[str, Tuple]] = {}
    for i, j, comparison_data, max_z_score in ras_results:
        comparisons[(i, j)] = {'ras': (comparison_data, max_z_score), 'rps': None}

    for i, j, comparison_data, max_z_score in rps_results:
        comparisons.setdefault((i, j), {}).update({'rps': (comparison_data, max_z_score)})

    # For each comparison, create a styled map with RAS bodies and RPS heads
    for (i, j), res in comparisons.items():
        map_copy = copy.deepcopy(core_map)

        # Apply RAS styling to arrow bodies
        if res.get('ras'):
            tmp_ras, max_z_ras = res['ras']
            temp_thingsInCommon(tmp_ras, map_copy, max_z_ras, i, j, ras_enrichment=True)

        # Apply RPS styling to arrow heads
        if res.get('rps'):
            tmp_rps, max_z_rps = res['rps']
            temp_thingsInCommon(tmp_rps, map_copy, max_z_rps, i, j, ras_enrichment=False)

        # Output both SVG and PDF/PNG as configured
        createOutputMaps(i, j, map_copy)

    # Add net RPS output file (only meaningful when RPS enrichment ran)
    if ARGS.net or not ARGS.using_RAS:
        for datasetName, rows in netRPS.items():
            writeToCsv(
                [[reactId, *netValues] for reactId, netValues in rows.items()],
                columnNames.get(datasetName, ["Reactions"]),
                utils.FilePath(
                    "Net_RPS_" + datasetName,
                    ext = utils.FileFormat.CSV,
                    prefix = ARGS.output_path))

    print('Execution succeeded')
1050 ###############################################################################
1051 if __name__ == "__main__":
1052 main()