comparison marea.py @ 283:813439d60f85 draft

Uploaded
author luca_milaz
date Mon, 08 Jul 2024 22:18:11 +0000
parents
children
comparison
equal deleted inserted replaced
282:d385c4df70c3 283:813439d60f85
1 from __future__ import division
2 import csv
3 from enum import Enum
4 import re
5 import sys
6 import numpy as np
7 import pandas as pd
8 import itertools as it
9 import scipy.stats as st
10 import lxml.etree as ET
11 import math
12 import os
13 import argparse
14 import pyvips
15 import utils.general_utils as utils
16 from PIL import Image
17 from typing import Tuple, Union, Optional, List, Dict
18
19 ERRORS = []
20 ########################## argparse ##########################################
21 ARGS :argparse.Namespace
22 def process_args() -> argparse.Namespace:
23 """
24 Interfaces the script of a module with its frontend, making the user's choices for various parameters available as values in code.
25
26 Args:
27 args : Always obtained (in file) from sys.argv
28
29 Returns:
30 Namespace : An object containing the parsed arguments
31 """
32 parser = argparse.ArgumentParser(
33 usage = "%(prog)s [options]",
34 description = "process some value's genes to create a comparison's map.")
35
36 #General:
37 parser.add_argument(
38 '-td', '--tool_dir',
39 type = str,
40 required = True,
41 help = 'your tool directory')
42
43 parser.add_argument('-on', '--control', type = str)
44 parser.add_argument('-ol', '--out_log', help = "Output log")
45
46 #Computation details:
47 parser.add_argument(
48 '-co', '--comparison',
49 type = str,
50 default = '1vs1',
51 choices = ['manyvsmany', 'onevsrest', 'onevsmany'])
52
53 parser.add_argument(
54 '-pv' ,'--pValue',
55 type = float,
56 default = 0.1,
57 help = 'P-Value threshold (default: %(default)s)')
58
59 parser.add_argument(
60 '-fc', '--fChange',
61 type = float,
62 default = 1.5,
63 help = 'Fold-Change threshold (default: %(default)s)')
64
65 parser.add_argument(
66 "-ne", "--net",
67 type = utils.Bool("net"), default = False,
68 help = "choose if you want net enrichment for RPS")
69
70 parser.add_argument(
71 '-op', '--option',
72 type = str,
73 choices = ['datasets', 'dataset_class'],
74 help='dataset or dataset and class')
75
76 #RAS:
77 parser.add_argument(
78 "-ra", "--using_RAS",
79 type = utils.Bool("using_RAS"), default = True,
80 help = "choose whether to use RAS datasets.")
81
82 parser.add_argument(
83 '-id', '--input_data',
84 type = str,
85 help = 'input dataset')
86
87 parser.add_argument(
88 '-ic', '--input_class',
89 type = str,
90 help = 'sample group specification')
91
92 parser.add_argument(
93 '-ids', '--input_datas',
94 type = str,
95 nargs = '+',
96 help = 'input datasets')
97
98 parser.add_argument(
99 '-na', '--names',
100 type = str,
101 nargs = '+',
102 help = 'input names')
103
104 #RPS:
105 parser.add_argument(
106 "-rp", "--using_RPS",
107 type = utils.Bool("using_RPS"), default = False,
108 help = "choose whether to use RPS datasets.")
109
110 parser.add_argument(
111 '-idr', '--input_data_rps',
112 type = str,
113 help = 'input dataset rps')
114
115 parser.add_argument(
116 '-icr', '--input_class_rps',
117 type = str,
118 help = 'sample group specification rps')
119
120 parser.add_argument(
121 '-idsr', '--input_datas_rps',
122 type = str,
123 nargs = '+',
124 help = 'input datasets rps')
125
126 parser.add_argument(
127 '-nar', '--names_rps',
128 type = str,
129 nargs = '+',
130 help = 'input names rps')
131
132 #Output:
133 parser.add_argument(
134 "-gs", "--generate_svg",
135 type = utils.Bool("generate_svg"), default = True,
136 help = "choose whether to use RAS datasets.")
137
138 parser.add_argument(
139 "-gp", "--generate_pdf",
140 type = utils.Bool("generate_pdf"), default = True,
141 help = "choose whether to use RAS datasets.")
142
143 parser.add_argument(
144 '-cm', '--custom_map',
145 type = str,
146 help='custom map to use')
147
148 parser.add_argument(
149 '-mc', '--choice_map',
150 type = utils.Model, default = utils.Model.HMRcore,
151 choices = [utils.Model.HMRcore, utils.Model.ENGRO2, utils.Model.Custom])
152
153 args :argparse.Namespace = parser.parse_args()
154 if args.using_RAS and not args.using_RPS: args.net = False
155
156 return args
157
158 ############################ dataset input ####################################
159 def read_dataset(data :str, name :str) -> pd.DataFrame:
160 """
161 Tries to read the dataset from its path (data) as a tsv and turns it into a DataFrame.
162
163 Args:
164 data : filepath of a dataset (from frontend input params or literals upon calling)
165 name : name associated with the dataset (from frontend input params or literals upon calling)
166
167 Returns:
168 pd.DataFrame : dataset in a runtime operable shape
169
170 Raises:
171 sys.exit : if there's no data (pd.errors.EmptyDataError) or if the dataset has less than 2 columns
172 """
173 try:
174 dataset = pd.read_csv(data, sep = '\t', header = 0, engine='python')
175 except pd.errors.EmptyDataError:
176 sys.exit('Execution aborted: wrong format of ' + name + '\n')
177 if len(dataset.columns) < 2:
178 sys.exit('Execution aborted: wrong format of ' + name + '\n')
179 return dataset
180
181 ############################ dataset name #####################################
182 def name_dataset(name_data :str, count :int) -> str:
183 """
184 Produces a unique name for a dataset based on what was provided by the user. The default name for any dataset is "Dataset", thus if the user didn't change it this function appends f"_{count}" to make it unique.
185
186 Args:
187 name_data : name associated with the dataset (from frontend input params)
188 count : counter from 1 to make these names unique (external)
189
190 Returns:
191 str : the name made unique
192 """
193 if str(name_data) == 'Dataset':
194 return str(name_data) + '_' + str(count)
195 else:
196 return str(name_data)
197
198 ############################ map_methods ######################################
199 FoldChange = Union[float, int, str] # Union[float, Literal[0, "-INF", "INF"]]
200 def fold_change(avg1 :float, avg2 :float) -> FoldChange:
201 """
202 Calculates the fold change between two gene expression values.
203
204 Args:
205 avg1 : average expression value from one dataset avg2 : average expression value from the other dataset
206
207 Returns:
208 FoldChange :
209 0 : when both input values are 0
210 "-INF" : when avg1 is 0
211 "INF" : when avg2 is 0
212 float : for any other combination of values
213 """
214 if avg1 == 0 and avg2 == 0:
215 return 0
216 elif avg1 == 0:
217 return '-INF'
218 elif avg2 == 0:
219 return 'INF'
220 else:
221 return math.log(avg1 / avg2, 2)
222
223 def fix_style(l :str, col :Optional[str], width :str, dash :str) -> str:
224 """
225 Produces a "fixed" style string to assign to a reaction arrow in the SVG map, assigning style properties to the corresponding values passed as input params.
226
227 Args:
228 l : current style string of an SVG element
229 col : new value for the "stroke" style property
230 width : new value for the "stroke-width" style property
231 dash : new value for the "stroke-dasharray" style property
232
233 Returns:
234 str : the fixed style string
235 """
236 tmp = l.split(';')
237 flag_col = False
238 flag_width = False
239 flag_dash = False
240 for i in range(len(tmp)):
241 if tmp[i].startswith('stroke:'):
242 tmp[i] = 'stroke:' + col
243 flag_col = True
244 if tmp[i].startswith('stroke-width:'):
245 tmp[i] = 'stroke-width:' + width
246 flag_width = True
247 if tmp[i].startswith('stroke-dasharray:'):
248 tmp[i] = 'stroke-dasharray:' + dash
249 flag_dash = True
250 if not flag_col:
251 tmp.append('stroke:' + col)
252 if not flag_width:
253 tmp.append('stroke-width:' + width)
254 if not flag_dash:
255 tmp.append('stroke-dasharray:' + dash)
256 return ';'.join(tmp)
257
258 # The type of d values is collapsed, losing precision, because the dict containst lists instead of tuples, please fix!
259 def fix_map(d :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, threshold_P_V :float, threshold_F_C :float, max_F_C :float) -> ET.ElementTree:
260 """
261 Edits the selected SVG map based on the p-value and fold change data (d) and some significance thresholds also passed as inputs.
262
263 Args:
264 d : dictionary mapping a p-value and a fold-change value (values) to each reaction ID as encoded in the SVG map (keys)
265 core_map : SVG map to modify
266 threshold_P_V : threshold for a p-value to be considered significant
267 threshold_F_C : threshold for a fold change value to be considered significant
268 max_F_C : highest fold change (absolute value)
269
270 Returns:
271 ET.ElementTree : the modified core_map
272
273 Side effects:
274 core_map : mut
275 """
276 maxT = 12
277 minT = 2
278 grey = '#BEBEBE'
279 blue = '#0000FF'
280 red = '#E41A1C'
281 for el in core_map.iter():
282 el_id = str(el.get('id'))
283 if el_id.startswith('R_'):
284 tmp = d.get(el_id[2:])
285 if tmp != None:
286 p_val :float = tmp[0]
287 f_c = tmp[1]
288 if p_val < threshold_P_V:
289 if not isinstance(f_c, str):
290 if abs(f_c) < math.log(threshold_F_C, 2):
291 col = grey
292 width = str(minT)
293 else:
294 if f_c < 0:
295 col = blue
296 elif f_c > 0:
297 col = red
298 width = str(max((abs(f_c) * maxT) / max_F_C, minT))
299 else:
300 if f_c == '-INF':
301 col = blue
302 elif f_c == 'INF':
303 col = red
304 width = str(maxT)
305 dash = 'none'
306 else:
307 dash = '5,5'
308 col = grey
309 width = str(minT)
310 el.set('style', fix_style(el.get('style', ""), col, width, dash))
311 return core_map
312
313 def getElementById(reactionId :str, metabMap :ET.ElementTree) -> utils.Result[ET.Element, utils.Result.ResultErr]:
314 """
315 Finds any element in the given map with the given ID. ID uniqueness in an svg file is recommended but
316 not enforced, if more than one element with the exact ID is found only the first will be returned.
317
318 Args:
319 reactionId (str): exact ID of the requested element.
320 metabMap (ET.ElementTree): metabolic map containing the element.
321
322 Returns:
323 utils.Result[ET.Element, ResultErr]: result of the search, either the first match found or a ResultErr.
324 """
325 return utils.Result.Ok(
326 f"//*[@id=\"{reactionId}\"]").map(
327 lambda xPath : metabMap.xpath(xPath)[0]).mapErr(
328 lambda _ : utils.Result.ResultErr(f"No elements with ID \"{reactionId}\" found in map"))
329 # ^^^ we shamelessly ignore the contents of the IndexError, it offers nothing to the user.
330
331 def styleMapElement(element :ET.Element, styleStr :str) -> None:
332 currentStyles :str = element.get("style", "")
333 if re.search(r";stroke:[^;]+;stroke-width:[^;]+;stroke-dasharray:[^;]+$", currentStyles):
334 currentStyles = ';'.join(currentStyles.split(';')[:-3])
335
336 element.set("style", currentStyles + styleStr)
337
338 class ReactionDirection(Enum):
339 Unknown = ""
340 Direct = "_F"
341 Inverse = "_B"
342
343 @classmethod
344 def fromDir(cls, s :str) -> "ReactionDirection":
345 # vvv as long as there's so few variants I actually condone the if spam:
346 if s == ReactionDirection.Direct.value: return ReactionDirection.Direct
347 if s == ReactionDirection.Inverse.value: return ReactionDirection.Inverse
348 return ReactionDirection.Unknown
349
350 @classmethod
351 def fromReactionId(cls, reactionId :str) -> "ReactionDirection":
352 return ReactionDirection.fromDir(reactionId[-2:])
353
354 def getArrowBodyElementId(reactionId :str) -> str:
355 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
356 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: reactionId = reactionId[:-2]
357 return f"R_{reactionId}"
358
359 def getArrowHeadElementId(reactionId :str) -> Tuple[str, str]:
360 """
361 We attempt extracting the direction information from the provided reaction ID, if unsuccessful we provide the IDs of both directions.
362
363 Args:
364 reactionId : the provided reaction ID.
365
366 Returns:
367 Tuple[str, str]: either a single str ID for the correct arrow head followed by an empty string or both options to try.
368 """
369 if reactionId.endswith("_RV"): reactionId = reactionId[:-3] #TODO: standardize _RV
370 elif ReactionDirection.fromReactionId(reactionId) is not ReactionDirection.Unknown: return reactionId[:-3:-1] + reactionId[:-2], ""
371 return f"F_{reactionId}", f"B_{reactionId}"
372
373 class ArrowColor(Enum):
374 """
375 Encodes possible arrow colors based on their meaning in the enrichment process.
376 """
377 Invalid = "#BEBEBE" # gray, fold-change under treshold
378 UpRegulated = "#E41A1C" # red, up-regulated reaction
379 DownRegulated = "#0000FF" # blue, down-regulated reaction
380
381 UpRegulatedInv = "#FF7A00"
382 # ^^^ different shade of red (actually orange), up-regulated net value for a reversible reaction with
383 # conflicting enrichment in the two directions.
384
385 DownRegulatedInv = "#B22CF1"
386 # ^^^ different shade of blue (actually purple), down-regulated net value for a reversible reaction with
387 # conflicting enrichment in the two directions.
388
389 @classmethod
390 def fromFoldChangeSign(cls, foldChange :float, *, useAltColor = False) -> "ArrowColor":
391 colors = (cls.DownRegulated, cls.DownRegulatedInv) if foldChange < 0 else (cls.UpRegulated, cls.UpRegulatedInv)
392 return colors[useAltColor]
393
394 def __str__(self) -> str: return self.value
395
396 class Arrow:
397 """
398 Models the properties of a reaction arrow that change based on enrichment.
399 """
400 MIN_W = 2
401 MAX_W = 12
402
403 def __init__(self, width :int, col: ArrowColor, *, isDashed = False) -> None:
404 """
405 (Private) Initializes an instance of Arrow.
406
407 Args:
408 width : width of the arrow, ideally to be kept within Arrow.MIN_W and Arrow.MAX_W (not enforced).
409 col : color of the arrow.
410 isDashed : whether the arrow should be dashed, meaning the associated pValue resulted not significant.
411
412 Returns:
413 None : practically, a Arrow instance.
414 """
415 self.w = width
416 self.col = col
417 self.dash = isDashed
418
419 def applyTo(self, reactionId :str, metabMap :ET.ElementTree, styleStr :str) -> None:
420 if getElementById(reactionId, metabMap).map(lambda el : styleMapElement(el, styleStr)).isErr:
421 ERRORS.append(reactionId)
422
423 def styleReactionElements(self, metabMap :ET.ElementTree, reactionId :str, *, mindReactionDir = True) -> None:
424 # If We're dealing with RAS data or in general don't care about the direction of the reaction we only style the arrow body
425 if not mindReactionDir:
426 return self.applyTo(getArrowBodyElementId(reactionId), metabMap, self.toStyleStr())
427
428 # Now we style the arrow head(s):
429 idOpt1, idOpt2 = getArrowHeadElementId(reactionId)
430 self.applyTo(idOpt1, metabMap, self.toStyleStr(downSizedForTips = True))
431 if idOpt2: self.applyTo(idOpt2, metabMap, self.toStyleStr(downSizedForTips = True))
432
433 def getMapReactionId(self, reactionId :str, mindReactionDir :bool) -> str:
434 """
435 Computes the reaction ID as encoded in the map for a given reaction ID from the dataset.
436
437 Args:
438 reactionId: the reaction ID, as encoded in the dataset.
439 mindReactionDir: if True forward (F_) and backward (B_) directions will be encoded in the result.
440
441 Returns:
442 str : the ID of an arrow's body or tips in the map.
443 """
444 # we assume the reactionIds also don't encode reaction dir if they don't mind it when styling the map.
445 if not mindReactionDir: return "R_" + reactionId
446
447 #TODO: this is clearly something we need to make consistent in RPS
448 return (reactionId[:-3:-1] + reactionId[:-2]) if reactionId[:-2] in ["_F", "_B"] else f"F_{reactionId}" # "Pyr_F" --> "F_Pyr"
449
450 def toStyleStr(self, *, downSizedForTips = False) -> str:
451 """
452 Collapses the styles of this Arrow into a str, ready to be applied as part of the "style" property on an svg element.
453
454 Returns:
455 str : the styles string.
456 """
457 width = self.w
458 if downSizedForTips: width *= 0.15
459 return f";stroke:{self.col};stroke-width:{width};stroke-dasharray:{'5,5' if self.dash else 'none'}"
460
461 # vvv These constants could be inside the class itself a static properties, but python
462 # was built by brainless organisms so here we are!
463 INVALID_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid)
464 INSIGNIFICANT_ARROW = Arrow(Arrow.MIN_W, ArrowColor.Invalid, isDashed = True)
465
466 def applyRpsEnrichmentToMap(rpsEnrichmentRes :Dict[str, Union[Tuple[float, FoldChange], Tuple[float, FoldChange, float, float]]], metabMap :ET.ElementTree, maxNumericFoldChange :float) -> None:
467 """
468 Applies RPS enrichment results to the provided metabolic map.
469
470 Args:
471 rpsEnrichmentRes : RPS enrichment results.
472 metabMap : the metabolic map to edit.
473 maxNumericFoldChange : biggest finite fold-change value found.
474
475 Side effects:
476 metabMap : mut
477
478 Returns:
479 None
480 """
481 for reactionId, values in rpsEnrichmentRes.items():
482 pValue = values[0]
483 foldChange = values[1]
484
485 if isinstance(foldChange, str): foldChange = float(foldChange)
486 if pValue >= ARGS.pValue: # pValue above tresh: dashed arrow
487 INSIGNIFICANT_ARROW.styleReactionElements(metabMap, reactionId)
488 continue
489
490 if abs(foldChange) < math.log(ARGS.fChange, 2):
491 INVALID_ARROW.styleReactionElements(metabMap, reactionId)
492 continue
493
494 width = Arrow.MAX_W
495 if not math.isinf(foldChange):
496 try: width = max(abs(foldChange * Arrow.MAX_W) / maxNumericFoldChange, Arrow.MIN_W)
497 except ZeroDivisionError: pass
498
499 if not reactionId.endswith("_RV"): # RV stands for reversible reactions
500 Arrow(width, ArrowColor.fromFoldChangeSign(foldChange)).styleReactionElements(metabMap, reactionId)
501 continue
502
503 reactionId = reactionId[:-3] # Remove "_RV"
504
505 inversionScore = (values[2] < 0) + (values[3] < 0) # Compacts the signs of averages into 1 easy to check score
506 if inversionScore == 2: foldChange *= -1
507 # ^^^ Style the inverse direction with the opposite sign netValue
508
509 # If the score is 1 (opposite signs) we use alternative colors vvv
510 arrow = Arrow(width, ArrowColor.fromFoldChangeSign(foldChange, useAltColor = inversionScore == 1))
511
512 # vvv These 2 if statements can both be true and can both happen
513 if ARGS.net: # style arrow head(s):
514 arrow.styleReactionElements(metabMap, reactionId + ("_B" if inversionScore == 2 else "_F"))
515
516 if not ARGS.using_RAS: # style arrow body
517 arrow.styleReactionElements(metabMap, reactionId, mindReactionDir = False)
518
519 ############################ split class ######################################
520 def split_class(classes :pd.DataFrame, resolve_rules :Dict[str, List[float]]) -> Dict[str, List[List[float]]]:
521 """
522 Generates a :dict that groups together data from a :DataFrame based on classes the data is related to.
523
524 Args:
525 classes : a :DataFrame of only string values, containing class information (rows) and keys to query the resolve_rules :dict
526 resolve_rules : a :dict containing :float data
527
528 Returns:
529 dict : the dict with data grouped by class
530
531 Side effects:
532 classes : mut
533 """
534 class_pat :Dict[str, List[List[float]]] = {}
535 for i in range(len(classes)):
536 classe :str = classes.iloc[i, 1]
537 if pd.isnull(classe): continue
538
539 l :List[List[float]] = []
540 for j in range(i, len(classes)):
541 if classes.iloc[j, 1] == classe:
542 pat_id :str = classes.iloc[j, 0]
543 tmp = resolve_rules.get(pat_id, None)
544 if tmp != None:
545 l.append(tmp)
546 classes.iloc[j, 1] = None
547
548 if l:
549 class_pat[classe] = list(map(list, zip(*l)))
550 continue
551
552 utils.logWarning(
553 f"Warning: no sample found in class \"{classe}\", the class has been disregarded", ARGS.out_log)
554
555 return class_pat
556
557 ############################ conversion ##############################################
558 #conversion from svg to png
559 def svg_to_png_with_background(svg_path :utils.FilePath, png_path :utils.FilePath, dpi :int = 72, scale :int = 1, size :Optional[float] = None) -> None:
560 """
561 Internal utility to convert an SVG to PNG (forced opaque) to aid in PDF conversion.
562
563 Args:
564 svg_path : path to SVG file
565 png_path : path for new PNG file
566 dpi : dots per inch of the generated PNG
567 scale : scaling factor for the generated PNG, computed internally when a size is provided
568 size : final effective width of the generated PNG
569
570 Returns:
571 None
572 """
573 if size:
574 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=1)
575 scale = size / image.width
576 image = image.resize(scale)
577 else:
578 image = pyvips.Image.new_from_file(svg_path.show(), dpi=dpi, scale=scale)
579
580 white_background = pyvips.Image.black(image.width, image.height).new_from_image([255, 255, 255])
581 white_background = white_background.affine([scale, 0, 0, scale])
582
583 if white_background.bands != image.bands:
584 white_background = white_background.extract_band(0)
585
586 composite_image = white_background.composite2(image, 'over')
587 composite_image.write_to_file(png_path.show())
588
589 #funzione unica, lascio fuori i file e li passo in input
590 #conversion from png to pdf
591 def convert_png_to_pdf(png_file :utils.FilePath, pdf_file :utils.FilePath) -> None:
592 """
593 Internal utility to convert a PNG to PDF to aid from SVG conversion.
594
595 Args:
596 png_file : path to PNG file
597 pdf_file : path to new PDF file
598
599 Returns:
600 None
601 """
602 image = Image.open(png_file.show())
603 image = image.convert("RGB")
604 image.save(pdf_file.show(), "PDF", resolution=100.0)
605
606 #function called to reduce redundancy in the code
607 def convert_to_pdf(file_svg :utils.FilePath, file_png :utils.FilePath, file_pdf :utils.FilePath) -> None:
608 """
609 Converts the SVG map at the provided path to PDF.
610
611 Args:
612 file_svg : path to SVG file
613 file_png : path to PNG file
614 file_pdf : path to new PDF file
615
616 Returns:
617 None
618 """
619 svg_to_png_with_background(file_svg, file_png)
620 try:
621 convert_png_to_pdf(file_png, file_pdf)
622 print(f'PDF file {file_pdf.filePath} successfully generated.')
623
624 except Exception as e:
625 raise utils.DataErr(file_pdf.show(), f'Error generating PDF file: {e}')
626
627 ############################ map ##############################################
628 def buildOutputPath(dataset1Name :str, dataset2Name = "rest", *, details = "", ext :utils.FileFormat) -> utils.FilePath:
629 """
630 Builds a FilePath instance from the names of confronted datasets ready to point to a location in the
631 "result/" folder, used by this tool for output files in collections.
632
633 Args:
634 dataset1Name : _description_
635 dataset2Name : _description_. Defaults to "rest".
636 details : _description_
637 ext : _description_
638
639 Returns:
640 utils.FilePath : _description_
641 """
642 # This function returns a util data structure but is extremely specific to this module.
643 # RAS also uses collections as output and as such might benefit from a method like this, but I'd wait
644 # TODO: until a third tool with multiple outputs appears before porting this to utils.
645 return utils.FilePath(
646 f"{dataset1Name}_vs_{dataset2Name}" + (f" ({details})" if details else ""),
647 # ^^^ yes this string is built every time even if the form is the same for the same 2 datasets in
648 # all output files: I don't care, this was never the performance bottleneck of the tool and
649 # there is no other net gain in saving and re-using the built string.
650 ext,
651 prefix = "result")
652
653 FIELD_NOT_AVAILABLE = '/'
654 def writeToCsv(rows: List[list], fieldNames :List[str], outPath :utils.FilePath) -> None:
655 fieldsAmt = len(fieldNames)
656 with open(outPath.show(), "w", newline = "") as fd:
657 writer = csv.DictWriter(fd, fieldnames = fieldNames, delimiter = '\t')
658 writer.writeheader()
659
660 for row in rows:
661 sizeMismatch = fieldsAmt - len(row)
662 if sizeMismatch > 0: row.extend([FIELD_NOT_AVAILABLE] * sizeMismatch)
663 writer.writerow({ field : data for field, data in zip(fieldNames, row) })
664
665 OldEnrichedScores = Dict[str, List[Union[float, FoldChange]]] #TODO: try to use Tuple whenever possible
666 def writeTabularResult(enrichedScores : OldEnrichedScores, ras_enrichment: bool, outPath :utils.FilePath) -> None:
667 fieldNames = ["ids", "P_Value", "Log2(fold change)"]
668 if not ras_enrichment: fieldNames.extend(["average_1", "average_2"])
669
670 writeToCsv([ [reactId] + values for reactId, values in enrichedScores.items() ], fieldNames, outPath)
671
672 def temp_thingsInCommon(tmp :Dict[str, List[Union[float, FoldChange]]], core_map :ET.ElementTree, max_F_C :float, dataset1Name :str, dataset2Name = "rest", ras_enrichment = True) -> None:
673 # this function compiles the things always in common between comparison modes after enrichment.
674 # TODO: organize, name better.
675 writeTabularResult(tmp, ras_enrichment, buildOutputPath(dataset1Name, dataset2Name, details = "Tabular Result", ext = utils.FileFormat.TSV))
676
677 if ras_enrichment:
678 fix_map(tmp, core_map, ARGS.pValue, ARGS.fChange, max_F_C)
679 return
680
681 for reactId, enrichData in tmp.items(): tmp[reactId] = tuple(enrichData)
682 applyRpsEnrichmentToMap(tmp, core_map, max_F_C)
683
684 def computePValue(dataset1Data :List[float], dataset2Data :List[float]) -> float:
685 """
686 Computes the statistical significance score (P-value) of the comparison between coherent data
687 from two datasets. The data is supposed to, in both datasets:
688 - be related to the same reaction ID;
689 - be ordered by sample, such that the item at position i in both lists is related to the
690 same sample or cell line.
691
692 Args:
693 dataset1Data : data from the 1st dataset.
694 dataset2Data : data from the 2nd dataset.
695
696 Returns:
697 float: P-value from a Kolmogorov-Smirnov test on the provided data.
698 """
699 return st.ks_2samp(dataset1Data, dataset2Data)[1]
700
701 def compareDatasetPair(dataset1Data :List[List[float]], dataset2Data :List[List[float]], ids :List[str]) -> Tuple[Dict[str, List[Union[float, FoldChange]]], float]:
702 #TODO: the following code still suffers from "dumbvarnames-osis"
703 tmp :Dict[str, List[Union[float, FoldChange]]] = {}
704 count = 0
705 max_F_C = 0
706
707 for l1, l2 in zip(dataset1Data, dataset2Data):
708 reactId = ids[count]
709 count += 1
710 if not reactId: continue # we skip ids that have already been processed
711
712 try: #TODO: identify the source of these errors and minimize code in the try block
713 reactDir = ReactionDirection.fromReactionId(reactId)
714 # Net score is computed only for reversible reactions when user wants it on arrow tips or when RAS datasets aren't used
715 if (ARGS.net or not ARGS.using_RAS) and reactDir is not ReactionDirection.Unknown:
716 try: position = ids.index(reactId[:-1] + ('B' if reactDir is ReactionDirection.Direct else 'F'))
717 except ValueError: continue # we look for the complementary id, if not found we skip
718
719 nets1 = np.subtract(l1, dataset1Data[position])
720 nets2 = np.subtract(l2, dataset2Data[position])
721
722 p_value = computePValue(nets1, nets2)
723 avg1 = sum(nets1) / len(nets1)
724 avg2 = sum(nets2) / len(nets2)
725 net = (avg1 - avg2) / abs(avg2)
726
727 if math.isnan(net): continue
728 tmp[reactId[:-1] + "RV"] = [p_value, net, avg1, avg2]
729
730 # vvv complementary directional ids are set to None once processed if net is to be applied to tips
731 if ARGS.net:
732 ids[position] = None
733 continue
734
735 # fallthrough is intended, regular scores need to be computed when tips aren't net but RAS datasets aren't used
736 p_value = computePValue(l1, l2)
737 avg = fold_change(sum(l1) / len(l1), sum(l2) / len(l2))
738 if not isinstance(avg, str) and max_F_C < abs(avg): max_F_C = abs(avg)
739 tmp[reactId] = [float(p_value), avg]
740
741 except (TypeError, ZeroDivisionError): continue
742
743 return tmp, max_F_C
744
745 def computeEnrichment(metabMap :ET.ElementTree, class_pat :Dict[str, List[List[float]]], ids :List[str], *, fromRAS = True) -> None:
746 """
747 Compares clustered data based on a given comparison mode and applies enrichment-based styling on the
748 provided metabolic map.
749
750 Args:
751 metabMap : SVG map to modify.
752 class_pat : the clustered data.
753 ids : ids for data association.
754 fromRAS : whether the data to enrich consists of RAS scores.
755
756 Returns:
757 None
758
759 Raises:
760 sys.exit : if there are less than 2 classes for comparison
761
762 Side effects:
763 metabMap : mut
764 ids : mut
765 """
766 class_pat = { k.strip() : v for k, v in class_pat.items() }
767 #TODO: simplfy this stuff vvv and stop using sys.exit (raise the correct utils error)
768 if (not class_pat) or (len(class_pat.keys()) < 2): sys.exit('Execution aborted: classes provided for comparisons are less than two\n')
769
770 if ARGS.comparison == "manyvsmany":
771 for i, j in it.combinations(class_pat.keys(), 2):
772 #TODO: these 2 functions are always called in pair and in this order and need common data,
773 # some clever refactoring would be appreciated.
774 comparisonDict, max_F_C = compareDatasetPair(class_pat.get(i), class_pat.get(j), ids)
775 temp_thingsInCommon(comparisonDict, metabMap, max_F_C, i, j, fromRAS)
776
777 elif ARGS.comparison == "onevsrest":
778 for single_cluster in class_pat.keys():
779 t :List[List[List[float]]] = []
780 for k in class_pat.keys():
781 if k != single_cluster:
782 t.append(class_pat.get(k))
783
784 rest :List[List[float]] = []
785 for i in t:
786 rest = rest + i
787
788 comparisonDict, max_F_C = compareDatasetPair(class_pat.get(single_cluster), rest, ids)
789 temp_thingsInCommon(comparisonDict, metabMap, max_F_C, single_cluster, fromRAS)
790
791 elif ARGS.comparison == "onevsmany":
792 controlItems = class_pat.get(ARGS.control)
793 for otherDataset in class_pat.keys():
794 if otherDataset == ARGS.control: continue
795
796 comparisonDict, max_F_C = compareDatasetPair(controlItems, class_pat.get(otherDataset), ids)
797 temp_thingsInCommon(comparisonDict, metabMap, max_F_C, ARGS.control, otherDataset, fromRAS)
798
799 def createOutputMaps(dataset1Name :str, dataset2Name :str, core_map :ET.ElementTree) -> None:
800 svgFilePath = buildOutputPath(dataset1Name, dataset2Name, details = "SVG Map", ext = utils.FileFormat.SVG)
801 utils.writeSvg(svgFilePath, core_map)
802
803 if ARGS.generate_pdf:
804 pngPath = buildOutputPath(dataset1Name, dataset2Name, details = "PNG Map", ext = utils.FileFormat.PNG)
805 pdfPath = buildOutputPath(dataset1Name, dataset2Name, details = "PDF Map", ext = utils.FileFormat.PDF)
806 convert_to_pdf(svgFilePath, pngPath, pdfPath)
807
808 if not ARGS.generate_svg: os.remove(svgFilePath.show())
809
810 ClassPat = Dict[str, List[List[float]]]
811 def getClassesAndIdsFromDatasets(datasetsPaths :List[str], datasetPath :str, classPath :str, names :List[str]) -> Tuple[List[str], ClassPat]:
812 # TODO: I suggest creating dicts with ids as keys instead of keeping class_pat and ids separate,
813 # for the sake of everyone's sanity.
814 class_pat :ClassPat = {}
815 if ARGS.option == 'datasets':
816 num = 1 #TODO: the dataset naming function could be a generator
817 for path, name in zip(datasetsPaths, names):
818 name = name_dataset(name, num)
819 resolve_rules_float, ids = getDatasetValues(path, name)
820 if resolve_rules_float != None:
821 class_pat[name] = list(map(list, zip(*resolve_rules_float.values())))
822
823 num += 1
824
825 elif ARGS.option == "dataset_class":
826 classes = read_dataset(classPath, "class")
827 classes = classes.astype(str)
828
829 resolve_rules_float, ids = getDatasetValues(datasetPath, "Dataset Class (not actual name)")
830 if resolve_rules_float != None: class_pat = split_class(classes, resolve_rules_float)
831
832 return ids, class_pat
833 #^^^ TODO: this could be a match statement over an enum, make it happen future marea dev with python 3.12! (it's why I kept the ifs)
834
835 #TODO: create these damn args as FilePath objects
836 def getDatasetValues(datasetPath :str, datasetName :str) -> Tuple[ClassPat, List[str]]:
837 """
838 Opens the dataset at the given path and extracts the values (expected nullable numerics) and the IDs.
839
840 Args:
841 datasetPath : path to the dataset
842 datasetName (str): dataset name, used in error reporting
843
844 Returns:
845 Tuple[ClassPat, List[str]]: values and IDs extracted from the dataset
846 """
847 dataset = read_dataset(datasetPath, datasetName)
848 IDs = pd.Series.tolist(dataset.iloc[:, 0].astype(str))
849
850 dataset = dataset.drop(dataset.columns[0], axis = "columns").to_dict("list")
851 return { id : list(map(utils.Float("Dataset values, not an argument"), values)) for id, values in dataset.items() }, IDs
852
853 ############################ MAIN #############################################
854 def main() -> None:
855 """
856 Initializes everything and sets the program in motion based on the fronted input arguments.
857
858 Returns:
859 None
860
861 Raises:
862 sys.exit : if a user-provided custom map is in the wrong format (ET.XMLSyntaxError, ET.XMLSchemaParseError)
863 """
864 global ARGS
865 ARGS = process_args()
866
867 if os.path.isdir('result') == False: os.makedirs('result')
868
869 core_map :ET.ElementTree = ARGS.choice_map.getMap(
870 ARGS.tool_dir,
871 utils.FilePath.fromStrPath(ARGS.custom_map) if ARGS.custom_map else None)
872 # TODO: ^^^ ugly but fine for now, the argument is None if the model isn't custom because no file was given.
873 # getMap will None-check the customPath and panic when the model IS custom but there's no file (good). A cleaner
874 # solution can be derived from my comment in FilePath.fromStrPath
875
876 if ARGS.using_RAS:
877 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas, ARGS.input_data, ARGS.input_class, ARGS.names)
878 computeEnrichment(core_map, class_pat, ids)
879
880 if ARGS.using_RPS:
881 ids, class_pat = getClassesAndIdsFromDatasets(ARGS.input_datas_rps, ARGS.input_data_rps, ARGS.input_class_rps, ARGS.names_rps)
882 computeEnrichment(core_map, class_pat, ids, fromRAS = False)
883
884 # create output files: TODO: this is the same comparison happening in "maps", find a better way to organize this
885 if ARGS.comparison == "manyvsmany":
886 for i, j in it.combinations(class_pat.keys(), 2): createOutputMaps(i, j, core_map)
887 return
888
889 if ARGS.comparison == "onevsrest":
890 for single_cluster in class_pat.keys(): createOutputMaps(single_cluster, "rest", core_map)
891 return
892
893 for otherDataset in class_pat.keys():
894 if otherDataset != ARGS.control: createOutputMaps(i, j, core_map)
895
896 if not ERRORS: return
897 utils.logWarning(
898 f"The following reaction IDs were mentioned in the dataset but weren't found in the map: {ERRORS}",
899 ARGS.out_log)
900
901 print('Execution succeded')
902
903 ###############################################################################
904 if __name__ == "__main__":
905 main()