392
|
1 import math
|
|
2 import re
|
|
3 import sys
|
|
4 import csv
|
|
5 import pickle
|
|
6 import lxml.etree as ET
|
|
7
|
|
8 from enum import Enum
|
|
9 from itertools import count
|
408
|
10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union, Set, Tuple
|
392
|
11
|
|
12 import pandas as pd
|
|
13 import cobra
|
409
|
14 from cobra import Model as cobraModel, Reaction, Metabolite
|
392
|
15
|
|
16 import zipfile
|
|
17 import gzip
|
|
18 import bz2
|
|
19 from io import StringIO
|
412
|
20 import utils.rule_parsing as rulesUtils
|
|
21 import utils.reaction_parsing as reactionUtils
|
392
|
22
|
394
|
23
|
|
24
|
392
|
25 class ValueErr(Exception):
|
|
26 def __init__(self, param_name, expected, actual):
|
|
27 super().__init__(f"Invalid value for {param_name}: expected {expected}, got {actual}")
|
|
28
|
|
29 class PathErr(Exception):
|
|
30 def __init__(self, path, message):
|
|
31 super().__init__(f"Path error for '{path}': {message}")
|
|
32
|
|
33 class FileFormat(Enum):
|
|
34 """
|
|
35 Encodes possible file extensions to conditionally save data in a different format.
|
|
36 """
|
|
37 DAT = ("dat",) # this is how galaxy treats all your files!
|
|
38 CSV = ("csv",) # this is how most editable input data is written
|
|
39 TSV = ("tsv",) # this is how most editable input data is ACTUALLY written TODO:more support pls!!
|
|
40 SVG = ("svg",) # this is how most metabolic maps are written
|
|
41 PNG = ("png",) # this is a common output format for images (such as metabolic maps)
|
|
42 PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.
|
|
43
|
|
44 # Updated to include compressed variants
|
|
45 XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
|
|
46 JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed
|
|
47 MAT = ("mat", "mat.gz", "mat.zip", "mat.bz2") # COBRA models can be stored as MAT files, sometimes compressed
|
|
48 YML = ("yml", "yml.gz", "yml.zip", "yml.bz2") # COBRA models can be stored as YML files, sometimes compressed
|
|
49
|
|
50 TXT = ("txt",) # this is how most output data is written
|
|
51 PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved
|
|
52
|
|
53 def __init__(self, *extensions):
|
|
54 self.extensions = extensions
|
|
55 # Store original extension when set via fromExt
|
|
56 self._original_extension = None
|
|
57
|
|
58 @classmethod
|
|
59 def fromExt(cls, ext: str) -> "FileFormat":
|
|
60 """
|
|
61 Converts a file extension string to a FileFormat instance.
|
|
62 Args:
|
|
63 ext : The file extension as a string.
|
|
64 Returns:
|
|
65 FileFormat: The FileFormat instance corresponding to the file extension.
|
|
66 """
|
|
67 variantName = ext.upper()
|
|
68 if variantName in FileFormat.__members__:
|
|
69 instance = FileFormat[variantName]
|
|
70 instance._original_extension = ext
|
|
71 return instance
|
|
72
|
|
73 variantName = ext.lower()
|
|
74 for member in cls:
|
|
75 if variantName in member.value:
|
|
76 # Create a copy-like behavior by storing the original extension
|
|
77 member._original_extension = ext
|
|
78 return member
|
|
79
|
|
80 raise ValueErr("ext", "a valid FileFormat file extension", ext)
|
|
81
|
|
82 def __str__(self) -> str:
|
|
83 """
|
|
84 (Private) converts to str representation. Good practice for usage with argparse.
|
|
85 Returns:
|
|
86 str : the string representation of the file extension.
|
|
87 """
|
|
88 # If we have an original extension stored (for compressed files only), use it
|
|
89 if hasattr(self, '_original_extension') and self._original_extension:
|
|
90 return self._original_extension
|
|
91
|
|
92 # For XML, JSON, MAT and YML without original extension, use the base extension
|
|
93 if self == FileFormat.XML:
|
|
94 return "xml"
|
|
95 elif self == FileFormat.JSON:
|
|
96 return "json"
|
|
97 elif self == FileFormat.MAT:
|
|
98 return "mat"
|
|
99 elif self == FileFormat.YML:
|
|
100 return "yml"
|
|
101
|
|
102 return self.value[-1]
|
|
103
|
|
104 class FilePath():
|
|
105 """
|
|
106 Represents a file path. View this as an attempt to standardize file-related operations by expecting
|
|
107 values of this type in any process requesting a file path.
|
|
108 """
|
|
109 def __init__(self, filePath: str, ext: FileFormat, *, prefix="") -> None:
|
|
110 """
|
|
111 (Private) Initializes an instance of FilePath.
|
|
112 Args:
|
|
113 path : the end of the path, containing the file name.
|
|
114 ext : the file's extension.
|
|
115 prefix : anything before path, if the last '/' isn't there it's added by the code.
|
|
116 Returns:
|
|
117 None : practically, a FilePath instance.
|
|
118 """
|
|
119 self.ext = ext
|
|
120 self.filePath = filePath
|
|
121
|
|
122 if prefix and prefix[-1] != '/':
|
|
123 prefix += '/'
|
|
124 self.prefix = prefix
|
|
125
|
|
126 @classmethod
|
|
127 def fromStrPath(cls, path: str) -> "FilePath":
|
|
128 """
|
|
129 Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
|
|
130 It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
|
|
131 These double extensions are not supported for other file types such as .csv.
|
|
132 Args:
|
|
133 path : the string containing the path
|
|
134 Raises:
|
|
135 PathErr : if the provided string doesn't represent a valid path.
|
|
136 Returns:
|
|
137 FilePath : the constructed instance.
|
|
138 """
|
|
139 result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
|
|
140 if not result or not result["name"] or not result["ext"]:
|
|
141 raise PathErr(path, "cannot recognize folder structure or extension in path")
|
|
142
|
|
143 prefix = result["prefix"] if result["prefix"] else ""
|
|
144 name, ext = result["name"], result["ext"]
|
|
145
|
|
146 # Check for double extensions (json.gz, xml.zip, etc.)
|
|
147 parts = path.split(".")
|
|
148 if len(parts) >= 3:
|
|
149 penultimate = parts[-2]
|
|
150 last = parts[-1]
|
|
151 double_ext = f"{penultimate}.{last}"
|
|
152
|
|
153 # Try the double extension first
|
|
154 try:
|
|
155 ext_format = FileFormat.fromExt(double_ext)
|
|
156 name = ".".join(parts[:-2])
|
|
157 # Extract prefix if it exists
|
|
158 if '/' in name:
|
|
159 prefix = name[:name.rfind('/') + 1]
|
|
160 name = name[name.rfind('/') + 1:]
|
|
161 return cls(name, ext_format, prefix=prefix)
|
|
162 except ValueErr:
|
|
163 # If double extension doesn't work, fall back to single extension
|
|
164 pass
|
|
165
|
|
166 # Single extension fallback (original logic)
|
|
167 try:
|
|
168 ext_format = FileFormat.fromExt(ext)
|
|
169 return cls(name, ext_format, prefix=prefix)
|
|
170 except ValueErr:
|
|
171 raise PathErr(path, f"unsupported file extension: {ext}")
|
|
172
|
|
173 def show(self) -> str:
|
|
174 """
|
|
175 Shows the path as a string.
|
|
176 Returns:
|
|
177 str : the path shown as a string.
|
|
178 """
|
|
179 return f"{self.prefix}{self.filePath}.{self.ext}"
|
|
180
|
|
181 def __str__(self) -> str:
|
|
182 return self.show()
|
|
183
|
|
184 # ERRORS
|
|
185 def terminate(msg :str) -> None:
|
|
186 """
|
|
187 Terminate the execution of the script with an error message.
|
|
188
|
|
189 Args:
|
|
190 msg (str): The error message to be displayed.
|
|
191
|
|
192 Returns:
|
|
193 None
|
|
194 """
|
|
195 sys.exit(f"Execution aborted: {msg}\n")
|
|
196
|
|
197 def logWarning(msg :str, loggerPath :str) -> None:
|
|
198 """
|
|
199 Log a warning message to an output log file and print it to the console. The final period and a
|
|
200 newline is added by the function.
|
|
201
|
|
202 Args:
|
|
203 s (str): The warning message to be logged and printed.
|
|
204 loggerPath : The file path of the output log file. Given as a string, parsed to a FilePath and
|
|
205 immediately read back (beware relative expensive operation, log with caution).
|
|
206
|
|
207 Returns:
|
|
208 None
|
|
209 """
|
|
210 # building the path and then reading it immediately seems useless, but it's actually a way of
|
|
211 # validating that reduces repetition on the caller's side. Besides, logging a message by writing
|
|
212 # to a file is supposed to be computationally expensive anyway, so this is also a good deterrent from
|
|
213 # mindlessly logging whenever something comes up, log at the very end and tell the user everything
|
|
214 # that went wrong. If you don't like it: implement a persistent runtime buffer that gets dumped to
|
|
215 # the file only at the end of the program's execution.
|
|
216 with open(FilePath.fromStrPath(loggerPath).show(), 'a') as log: log.write(f"{msg}.\n")
|
|
217
|
|
218 class CustomErr(Exception):
|
|
219 """
|
|
220 Custom error class to handle exceptions in a structured way, with a unique identifier and a message.
|
|
221 """
|
|
222 __idGenerator = count()
|
|
223 errName = "Custom Error"
|
|
224 def __init__(self, msg :str, details = "", explicitErrCode = -1) -> None:
|
|
225 """
|
|
226 (Private) Initializes an instance of CustomErr.
|
|
227
|
|
228 Args:
|
|
229 msg (str): Error message to be displayed.
|
|
230 details (str): Informs the user more about the error encountered. Defaults to "".
|
|
231 explicitErrCode (int): Explicit error code to be used. Defaults to -1.
|
|
232
|
|
233 Returns:
|
|
234 None : practically, a CustomErr instance.
|
|
235 """
|
|
236 self.msg = msg
|
|
237 self.details = details
|
|
238
|
|
239 self.id = max(explicitErrCode, next(CustomErr.__idGenerator))
|
|
240
|
|
241 def throw(self, loggerPath = "") -> None:
|
|
242 """
|
|
243 Raises the current CustomErr instance, logging a warning message before doing so.
|
|
244
|
|
245 Raises:
|
|
246 self: The current CustomErr instance.
|
|
247
|
|
248 Returns:
|
|
249 None
|
|
250 """
|
|
251 if loggerPath: logWarning(str(self), loggerPath)
|
|
252 raise self
|
|
253
|
|
254 def abort(self) -> None:
|
|
255 """
|
|
256 Aborts the execution of the script.
|
|
257
|
|
258 Returns:
|
|
259 None
|
|
260 """
|
|
261 terminate(str(self))
|
|
262
|
|
263 def __str__(self) -> str:
|
|
264 """
|
|
265 (Private) Returns a string representing the current CustomErr instance.
|
|
266
|
|
267 Returns:
|
|
268 str: A string representing the current CustomErr instance.
|
|
269 """
|
|
270 return f"{CustomErr.errName} #{self.id}: {self.msg}, {self.details}."
|
|
271
|
|
272 class ArgsErr(CustomErr):
|
|
273 """
|
|
274 CustomErr subclass for UI arguments errors.
|
|
275 """
|
|
276 errName = "Args Error"
|
|
277 def __init__(self, argName :str, expected :Any, actual :Any, msg = "no further details provided") -> None:
|
|
278 super().__init__(f"argument \"{argName}\" expected {expected} but got {actual}", msg)
|
|
279
|
|
280 class DataErr(CustomErr):
|
|
281 """
|
|
282 CustomErr subclass for data formatting errors.
|
|
283 """
|
|
284 errName = "Data Format Error"
|
|
285 def __init__(self, fileName :str, msg = "no further details provided") -> None:
|
|
286 super().__init__(f"file \"{fileName}\" contains malformed data", msg)
|
|
287
|
|
288 class PathErr(CustomErr):
|
|
289 """
|
|
290 CustomErr subclass for filepath formatting errors.
|
|
291 """
|
|
292 errName = "Path Error"
|
|
293 def __init__(self, path :FilePath, msg = "no further details provided") -> None:
|
|
294 super().__init__(f"path \"{path}\" is invalid", msg)
|
|
295
|
|
296 class ValueErr(CustomErr):
|
|
297 """
|
|
298 CustomErr subclass for any value error.
|
|
299 """
|
|
300 errName = "Value Error"
|
|
301 def __init__(self, valueName: str, expected :Any, actual :Any, msg = "no further details provided") -> None:
|
|
302 super().__init__("value " + f"\"{valueName}\" " * bool(valueName) + f"was supposed to be {expected}, but got {actual} instead", msg)
|
|
303
|
|
304 # RESULT
|
|
305 T = TypeVar('T')
|
|
306 E = TypeVar('E', bound = CustomErr) # should bind to Result.ResultErr but python happened!
|
|
307 class Result(Generic[T, E]):
|
|
308 class ResultErr(CustomErr):
|
|
309 """
|
|
310 CustomErr subclass for all Result errors.
|
|
311 """
|
|
312 errName = "Result Error"
|
|
313 def __init__(self, msg = "no further details provided") -> None:
|
|
314 super().__init__(msg)
|
|
315 """
|
|
316 Class to handle the result of an operation, with a value and a boolean flag to indicate
|
|
317 whether the operation was successful or not.
|
|
318 """
|
|
319 def __init__(self, value :Union[T, E], isOk :bool) -> None:
|
|
320 """
|
|
321 (Private) Initializes an instance of Result.
|
|
322
|
|
323 Args:
|
|
324 value (Union[T, E]): The value to be stored in the Result instance.
|
|
325 isOk (bool): A boolean flag to indicate whether the operation was successful or not.
|
|
326
|
|
327 Returns:
|
|
328 None : practically, a Result instance.
|
|
329 """
|
|
330 self.isOk = isOk
|
|
331 self.isErr = not isOk
|
|
332 self.value = value
|
|
333
|
|
334 @classmethod
|
|
335 def Ok(cls, value :T) -> "Result":
|
|
336 """
|
|
337 Constructs a new Result instance with a successful operation.
|
|
338
|
|
339 Args:
|
|
340 value (T): The value to be stored in the Result instance, set as successful.
|
|
341
|
|
342 Returns:
|
|
343 Result: A new Result instance with a successful operation.
|
|
344 """
|
|
345 return Result(value, isOk = True)
|
|
346
|
|
347 @classmethod
|
|
348 def Err(cls, value :E) -> "Result":
|
|
349 """
|
|
350 Constructs a new Result instance with a failed operation.
|
|
351
|
|
352 Args:
|
|
353 value (E): The value to be stored in the Result instance, set as failed.
|
|
354
|
|
355 Returns:
|
|
356 Result: A new Result instance with a failed operation.
|
|
357 """
|
|
358 return Result(value, isOk = False)
|
|
359
|
|
360 def unwrap(self) -> T:
|
|
361 """
|
|
362 Unwraps the value of the Result instance, if the operation was successful.
|
|
363
|
|
364 Raises:
|
|
365 ResultErr: If the operation was not successful.
|
|
366
|
|
367 Returns:
|
|
368 T: The value of the Result instance, if the operation was successful.
|
|
369 """
|
|
370 if self.isOk: return self.value
|
|
371 raise Result.ResultErr(f"Unwrapped Result.Err : {self.value}")
|
|
372
|
|
373 def unwrapOr(self, default :T) -> T:
|
|
374 """
|
|
375 Unwraps the value of the Result instance, if the operation was successful, otherwise
|
|
376 it returns a default value.
|
|
377
|
|
378 Args:
|
|
379 default (T): The default value to be returned if the operation was not successful.
|
|
380
|
|
381 Returns:
|
|
382 T: The value of the Result instance, if the operation was successful,
|
|
383 otherwise the default value.
|
|
384 """
|
|
385 return self.value if self.isOk else default
|
|
386
|
|
387 def expect(self, err :"Result.ResultErr") -> T:
|
|
388 """
|
|
389 Expects that the value of the Result instance is successful, otherwise it raises an error.
|
|
390
|
|
391 Args:
|
|
392 err (Exception): The error to be raised if the operation was not successful.
|
|
393
|
|
394 Raises:
|
|
395 err: The error raised if the operation was not successful.
|
|
396
|
|
397 Returns:
|
|
398 T: The value of the Result instance, if the operation was successful.
|
|
399 """
|
|
400 if self.isOk: return self.value
|
|
401 raise err
|
|
402
|
|
403 U = TypeVar("U")
|
|
404 def map(self, mapper: Callable[[T], U]) -> "Result[U, E]":
|
|
405 """
|
|
406 Maps the value of the current Result to whatever is returned by the mapper function.
|
|
407 If the Result contained an unsuccessful operation to begin with it remains unchanged
|
|
408 (a reference to the current instance is returned).
|
|
409 If the mapper function panics the returned result instance will be of the error kind.
|
|
410
|
|
411 Args:
|
|
412 mapper (Callable[[T], U]): The mapper operation to be applied to the Result value.
|
|
413
|
|
414 Returns:
|
|
415 Result[U, E]: The result of the mapper operation applied to the Result value.
|
|
416 """
|
|
417 if self.isErr: return self
|
|
418 try: return Result.Ok(mapper(self.value))
|
|
419 except Exception as e: return Result.Err(e)
|
|
420
|
|
421 D = TypeVar("D", bound = "Result.ResultErr")
|
|
422 def mapErr(self, mapper :Callable[[E], D]) -> "Result[T, D]":
|
|
423 """
|
|
424 Maps the error of the current Result to whatever is returned by the mapper function.
|
|
425 If the Result contained a successful operation it remains unchanged
|
|
426 (a reference to the current instance is returned).
|
|
427 If the mapper function panics this method does as well.
|
|
428
|
|
429 Args:
|
|
430 mapper (Callable[[E], D]): The mapper operation to be applied to the Result error.
|
|
431
|
|
432 Returns:
|
|
433 Result[U, E]: The result of the mapper operation applied to the Result error.
|
|
434 """
|
|
435 if self.isOk: return self
|
|
436 return Result.Err(mapper(self.value))
|
|
437
|
|
438 def __str__(self):
|
|
439 return f"Result::{'Ok' if self.isOk else 'Err'}({self.value})"
|
|
440
|
|
441 # FILES
|
|
442 def read_dataset(path :FilePath, datasetName = "Dataset (not actual file name!)") -> pd.DataFrame:
|
|
443 """
|
|
444 Reads a .csv or .tsv file and returns it as a Pandas DataFrame.
|
|
445
|
|
446 Args:
|
|
447 path : the path to the dataset file.
|
|
448 datasetName : the name of the dataset.
|
|
449
|
|
450 Raises:
|
|
451 DataErr: If anything goes wrong when trying to open the file, if pandas thinks the dataset is empty or if
|
|
452 it has less than 2 columns.
|
|
453
|
|
454 Returns:
|
|
455 pandas.DataFrame: The dataset loaded as a Pandas DataFrame.
|
|
456 """
|
|
457 # I advise against the use of this function. This is an attempt at standardizing bad legacy code rather than
|
|
458 # removing / replacing it to avoid introducing as many bugs as possible in the tools still relying on this code.
|
|
459 # First off, this is not the best way to distinguish between .csv and .tsv files and Galaxy itself makes it really
|
|
460 # hard to implement anything better. Also, this function's name advertizes it as a dataset-specific operation and
|
|
461 # contains dubious responsibility (how many columns..) while being a file-opening function instead. My suggestion is
|
|
462 # TODO: stop using dataframes ever at all in anything and find a way to have tight control over file extensions.
|
|
463 try: dataset = pd.read_csv(path.show(), sep = '\t', header = None, engine = "python")
|
|
464 except:
|
|
465 try: dataset = pd.read_csv(path.show(), sep = ',', header = 0, engine = "python")
|
|
466 except Exception as err: raise DataErr(datasetName, f"encountered empty or wrongly formatted data: {err}")
|
|
467
|
|
468 if len(dataset.columns) < 2: raise DataErr(datasetName, "a dataset is always meant to have at least 2 columns")
|
|
469 return dataset
|
|
470
|
|
471 def readPickle(path :FilePath) -> Any:
|
|
472 """
|
|
473 Reads the contents of a .pickle file, which needs to exist at the given path.
|
|
474
|
|
475 Args:
|
|
476 path : the path to the .pickle file.
|
|
477
|
|
478 Returns:
|
|
479 Any : the data inside a pickle file, could be anything.
|
|
480 """
|
|
481 with open(path.show(), "rb") as fd: return pickle.load(fd)
|
|
482
|
|
483 def writePickle(path :FilePath, data :Any) -> None:
|
|
484 """
|
|
485 Saves any data in a .pickle file, created at the given path.
|
|
486
|
|
487 Args:
|
|
488 path : the path to the .pickle file.
|
|
489 data : the data to be written to the file.
|
|
490
|
|
491 Returns:
|
|
492 None
|
|
493 """
|
|
494 with open(path.show(), "wb") as fd: pickle.dump(data, fd)
|
|
495
|
|
496 def readCsv(path :FilePath, delimiter = ',', *, skipHeader = True) -> List[List[str]]:
|
|
497 """
|
|
498 Reads the contents of a .csv file, which needs to exist at the given path.
|
|
499
|
|
500 Args:
|
|
501 path : the path to the .csv file.
|
|
502 delimiter : allows other subformats such as .tsv to be opened by the same method (\\t delimiter).
|
|
503 skipHeader : whether the first row of the file is a header and should be skipped.
|
|
504
|
|
505 Returns:
|
|
506 List[List[str]] : list of rows from the file, each parsed as a list of strings originally separated by commas.
|
|
507 """
|
|
508 with open(path.show(), "r", newline = "") as fd: return list(csv.reader(fd, delimiter = delimiter))[skipHeader:]
|
|
509
|
|
510 def readSvg(path :FilePath, customErr :Optional[Exception] = None) -> ET.ElementTree:
|
|
511 """
|
|
512 Reads the contents of a .svg file, which needs to exist at the given path.
|
|
513
|
|
514 Args:
|
|
515 path : the path to the .svg file.
|
|
516
|
|
517 Raises:
|
|
518 DataErr : if the map is malformed.
|
|
519
|
|
520 Returns:
|
|
521 Any : the data inside a svg file, could be anything.
|
|
522 """
|
|
523 try: return ET.parse(path.show())
|
|
524 except (ET.XMLSyntaxError, ET.XMLSchemaParseError) as err:
|
|
525 raise customErr if customErr else err
|
|
526
|
|
527 def writeSvg(path :FilePath, data:ET.ElementTree) -> None:
|
|
528 """
|
|
529 Saves svg data opened with lxml.etree in a .svg file, created at the given path.
|
|
530
|
|
531 Args:
|
|
532 path : the path to the .svg file.
|
|
533 data : the data to be written to the file.
|
|
534
|
|
535 Returns:
|
|
536 None
|
|
537 """
|
|
538 with open(path.show(), "wb") as fd: fd.write(ET.tostring(data))
|
|
539
|
|
540 # UI ARGUMENTS
|
|
541 class Bool:
|
|
542 def __init__(self, argName :str) -> None:
|
|
543 self.argName = argName
|
|
544
|
|
545 def __call__(self, s :str) -> bool: return self.check(s)
|
|
546
|
|
547 def check(self, s :str) -> bool:
|
|
548 s = s.lower()
|
|
549 if s == "true" : return True
|
|
550 if s == "false": return False
|
|
551 raise ArgsErr(self.argName, "boolean string (true or false, not case sensitive)", f"\"{s}\"")
|
|
552
|
|
553 class Float:
|
|
554 def __init__(self, argName = "Dataset values, not an argument") -> None:
|
|
555 self.argName = argName
|
|
556
|
|
557 def __call__(self, s :str) -> float: return self.check(s)
|
|
558
|
|
559 def check(self, s :str) -> float:
|
|
560 try: return float(s)
|
|
561 except ValueError:
|
|
562 s = s.lower()
|
|
563 if s == "nan" or s == "none": return math.nan
|
|
564 raise ArgsErr(self.argName, "numeric string or \"None\" or \"NaN\" (not case sensitive)", f"\"{s}\"")
|
|
565
|
|
566 # MODELS
|
|
567 OldRule = List[Union[str, "OldRule"]]
|
|
568 class Model(Enum):
|
|
569 """
|
|
570 Represents a metabolic model, either custom or locally supported. Custom models don't point
|
|
571 to valid file paths.
|
|
572 """
|
|
573
|
|
574 Recon = "Recon"
|
|
575 ENGRO2 = "ENGRO2"
|
|
576 ENGRO2_no_legend = "ENGRO2_no_legend"
|
|
577 HMRcore = "HMRcore"
|
|
578 HMRcore_no_legend = "HMRcore_no_legend"
|
|
579 Custom = "Custom" # Exists as a valid variant in the UI, but doesn't point to valid file paths.
|
|
580
|
|
581 def __raiseMissingPathErr(self, path :Optional[FilePath]) -> None:
|
|
582 if not path: raise PathErr("<<MISSING>>", "it's necessary to provide a custom path when retrieving files from a custom model")
|
|
583
|
|
584 def getRules(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, OldRule]]:
|
|
585 """
|
|
586 Open "rules" file for this model.
|
|
587
|
|
588 Returns:
|
|
589 Dict[str, Dict[str, OldRule]] : the rules for this model.
|
|
590 """
|
|
591 path = customPath if self is Model.Custom else FilePath(f"{self.name}_rules", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
|
|
592 self.__raiseMissingPathErr(path)
|
|
593 return readPickle(path)
|
|
594
|
|
595 def getTranslator(self, toolDir :str, customPath :Optional[FilePath] = None) -> Dict[str, Dict[str, str]]:
|
|
596 """
|
|
597 Open "gene translator (old: gene_in_rule)" file for this model.
|
|
598
|
|
599 Returns:
|
|
600 Dict[str, Dict[str, str]] : the translator dict for this model.
|
|
601 """
|
|
602 path = customPath if self is Model.Custom else FilePath(f"{self.name}_genes", FileFormat.PICKLE, prefix = f"{toolDir}/local/pickle files/")
|
|
603 self.__raiseMissingPathErr(path)
|
|
604 return readPickle(path)
|
|
605
|
|
606 def getMap(self, toolDir = ".", customPath :Optional[FilePath] = None) -> ET.ElementTree:
|
|
607 path = customPath if self is Model.Custom else FilePath(f"{self.name}_map", FileFormat.SVG, prefix = f"{toolDir}/local/svg metabolic maps/")
|
|
608 self.__raiseMissingPathErr(path)
|
|
609 return readSvg(path, customErr = DataErr(path, f"custom map in wrong format"))
|
|
610
|
|
611 def getCOBRAmodel(self, toolDir = ".", customPath :Optional[FilePath] = None, customExtension :Optional[FilePath]=None)->cobra.Model:
|
|
612 if(self is Model.Custom):
|
|
613 return self.load_custom_model(customPath, customExtension)
|
|
614 else:
|
|
615 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show())
|
|
616
|
|
617 def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
|
|
618 ext = ext if ext else file_path.ext
|
|
619 try:
|
|
620 if str(ext) in FileFormat.XML.value:
|
|
621 return cobra.io.read_sbml_model(file_path.show())
|
|
622
|
|
623 if str(ext) in FileFormat.JSON.value:
|
|
624 # Compressed files are not automatically handled by cobra
|
|
625 if(ext == "json"):
|
|
626 return cobra.io.load_json_model(file_path.show())
|
|
627 else:
|
|
628 return self.extract_model(file_path, ext, "json")
|
|
629
|
|
630 if str(ext) in FileFormat.MAT.value:
|
|
631 # Compressed files are not automatically handled by cobra
|
|
632 if(ext == "mat"):
|
|
633 return cobra.io.load_matlab_model(file_path.show())
|
|
634 else:
|
|
635 return self.extract_model(file_path, ext, "mat")
|
|
636
|
|
637 if str(ext) in FileFormat.YML.value:
|
|
638 # Compressed files are not automatically handled by cobra
|
|
639 if(ext == "yml"):
|
|
640 return cobra.io.load_yaml_model(file_path.show())
|
|
641 else:
|
|
642 return self.extract_model(file_path, ext, "yml")
|
|
643
|
|
644 except Exception as e: raise DataErr(file_path, e.__str__())
|
|
645 raise DataErr(file_path,
|
|
646 f"Fomat \"{file_path.ext}\" is not recognized, only JSON, XML, MAT and YAML (.yml) files are supported.")
|
|
647
|
|
648
|
|
649 def extract_model(self, file_path:FilePath, ext :FileFormat, model_encoding:Literal["json", "mat", "yml"]) -> cobra.Model:
|
|
650 """
|
|
651 Extract JSON, MAT and YAML COBRA model from a compressed file (zip, gz, bz2).
|
|
652
|
|
653 Args:
|
|
654 file_path: File path of the model
|
|
655 ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)
|
|
656
|
|
657 Returns:
|
|
658 cobra.Model: COBRApy model
|
|
659
|
|
660 Raises:
|
|
661 Exception: Extraction errors
|
|
662 """
|
|
663 ext_str = str(ext)
|
|
664
|
|
665 try:
|
|
666 if '.zip' in ext_str:
|
|
667 with zipfile.ZipFile(file_path.show(), 'r') as zip_ref:
|
|
668 with zip_ref.open(zip_ref.namelist()[0]) as json_file:
|
|
669 content = json_file.read().decode('utf-8')
|
|
670 if model_encoding == "json":
|
|
671 return cobra.io.load_json_model(StringIO(content))
|
|
672 elif model_encoding == "mat":
|
|
673 return cobra.io.load_matlab_model(StringIO(content))
|
|
674 elif model_encoding == "yml":
|
|
675 return cobra.io.load_yaml_model(StringIO(content))
|
|
676 else:
|
|
677 raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
|
|
678 elif '.gz' in ext_str:
|
|
679 with gzip.open(file_path.show(), 'rt', encoding='utf-8') as gz_ref:
|
|
680 if model_encoding == "json":
|
|
681 return cobra.io.load_json_model(gz_ref)
|
|
682 elif model_encoding == "mat":
|
|
683 return cobra.io.load_matlab_model(gz_ref)
|
|
684 elif model_encoding == "yml":
|
|
685 return cobra.io.load_yaml_model(gz_ref)
|
|
686 else:
|
|
687 raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
|
|
688 elif '.bz2' in ext_str:
|
|
689 with bz2.open(file_path.show(), 'rt', encoding='utf-8') as bz2_ref:
|
|
690 if model_encoding == "json":
|
|
691 return cobra.io.load_json_model(bz2_ref)
|
|
692 elif model_encoding == "mat":
|
|
693 return cobra.io.load_matlab_model(bz2_ref)
|
|
694 elif model_encoding == "yml":
|
|
695 return cobra.io.load_yaml_model(bz2_ref)
|
|
696 else:
|
|
697 raise ValueError(f"Unsupported model encoding: {model_encoding}. Supported: json, mat, yml")
|
|
698 else:
|
|
699 raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")
|
|
700
|
|
701 except Exception as e:
|
|
702 raise Exception(f"Error during model extraction: {str(e)}")
|
|
703
|
|
704
|
|
705
|
394
|
706 def __str__(self) -> str: return self.value
|
|
707
|
|
708
|
|
709 def convert_genes(model,annotation):
|
|
710 from cobra.manipulation import rename_genes
|
|
711 model2=model.copy()
|
|
712 try:
|
|
713 dict_genes={gene.id:gene.notes[annotation] for gene in model2.genes}
|
|
714 except:
|
|
715 print("No annotation in gene dict!")
|
|
716 return -1
|
|
717 rename_genes(model2,dict_genes)
|
|
718
|
408
|
719 return model2
|
|
720
|
|
721
|
409
|
722 def build_cobra_model_from_csv(csv_path: str, model_id: str = "new_model") -> cobra.Model:
|
408
|
723 """
|
|
724 Costruisce un modello COBRApy a partire da un file CSV con i dati delle reazioni.
|
|
725
|
|
726 Args:
|
|
727 csv_path: Path al file CSV (separato da tab)
|
|
728 model_id: ID del modello da creare
|
|
729
|
|
730 Returns:
|
|
731 cobra.Model: Il modello COBRApy costruito
|
|
732 """
|
|
733
|
|
734 # Leggi i dati dal CSV
|
|
735 df = pd.read_csv(csv_path, sep='\t')
|
|
736
|
|
737 # Crea il modello vuoto
|
409
|
738 model = cobraModel(model_id)
|
408
|
739
|
|
740 # Dict per tenere traccia di metaboliti e compartimenti
|
|
741 metabolites_dict = {}
|
|
742 compartments_dict = {}
|
|
743
|
|
744 print(f"Costruendo modello da {len(df)} reazioni...")
|
|
745
|
|
746 # Prima passata: estrai metaboliti e compartimenti dalle formule delle reazioni
|
|
747 for idx, row in df.iterrows():
|
|
748 reaction_formula = str(row['Reaction']).strip()
|
|
749 if not reaction_formula or reaction_formula == 'nan':
|
|
750 continue
|
|
751
|
|
752 # Estrai metaboliti dalla formula della reazione
|
|
753 metabolites = extract_metabolites_from_reaction(reaction_formula)
|
|
754
|
|
755 for met_id in metabolites:
|
|
756 compartment = extract_compartment_from_metabolite(met_id)
|
|
757
|
|
758 # Aggiungi compartimento se non esiste
|
|
759 if compartment not in compartments_dict:
|
|
760 compartments_dict[compartment] = compartment
|
|
761
|
|
762 # Aggiungi metabolita se non esiste
|
|
763 if met_id not in metabolites_dict:
|
|
764 metabolites_dict[met_id] = Metabolite(
|
|
765 id=met_id,
|
|
766 compartment=compartment,
|
|
767 name=met_id.replace(f"_{compartment}", "").replace("__", "_")
|
|
768 )
|
|
769
|
|
770 # Aggiungi compartimenti al modello
|
|
771 model.compartments = compartments_dict
|
|
772
|
|
773 # Aggiungi metaboliti al modello
|
|
774 model.add_metabolites(list(metabolites_dict.values()))
|
|
775
|
|
776 print(f"Aggiunti {len(metabolites_dict)} metaboliti e {len(compartments_dict)} compartimenti")
|
|
777
|
|
778 # Seconda passata: aggiungi le reazioni
|
|
779 reactions_added = 0
|
|
780
|
|
781 for idx, row in df.iterrows():
|
412
|
782 reaction_id = str(row['ReactionID']).strip()
|
|
783 reaction_formula = str(row['Reaction']).strip()
|
|
784
|
|
785 # Salta reazioni senza formula
|
|
786 if not reaction_formula or reaction_formula == 'nan':
|
|
787 raise ValueError(f"Formula della reazione mancante {reaction_id}")
|
|
788
|
|
789 # Crea la reazione
|
|
790 reaction = Reaction(reaction_id)
|
|
791 reaction.name = reaction_id
|
|
792
|
|
793 # Imposta bounds
|
|
794 reaction.lower_bound = float(row['lower_bound']) if pd.notna(row['lower_bound']) else -1000.0
|
|
795 reaction.upper_bound = float(row['upper_bound']) if pd.notna(row['upper_bound']) else 1000.0
|
|
796
|
|
797 # Aggiungi gene rule se presente
|
|
798 if pd.notna(row['Rule']) and str(row['Rule']).strip():
|
|
799 reaction.gene_reaction_rule = str(row['Rule']).strip()
|
|
800
|
|
801 # Parse della formula della reazione
|
408
|
802 try:
|
412
|
803 parse_reaction_formula(reaction, reaction_formula, metabolites_dict)
|
408
|
804 except Exception as e:
|
412
|
805 print(f"Errore nel parsing della reazione {reaction_id}: {e}")
|
408
|
806 reactions_skipped += 1
|
|
807 continue
|
412
|
808
|
|
809 # Aggiungi la reazione al modello
|
|
810 model.add_reactions([reaction])
|
|
811 reactions_added += 1
|
|
812
|
408
|
813
|
|
814 print(f"Aggiunte {reactions_added} reazioni, saltate {reactions_skipped} reazioni")
|
|
815
|
|
816 # Imposta l'obiettivo di biomassa
|
|
817 set_biomass_objective(model)
|
|
818
|
|
819 # Imposta il medium
|
|
820 set_medium_from_data(model, df)
|
|
821
|
|
822 print(f"Modello completato: {len(model.reactions)} reazioni, {len(model.metabolites)} metaboliti")
|
|
823
|
|
824 return model
|
|
825
|
|
826
|
|
827 # Estrae tutti gli ID metaboliti nella formula (gestisce prefissi numerici + underscore)
|
|
828 def extract_metabolites_from_reaction(reaction_formula: str) -> Set[str]:
|
|
829 """
|
|
830 Estrae gli ID dei metaboliti da una formula di reazione.
|
|
831 Pattern robusto: cattura token che terminano con _<compartimento> (es. _c, _m, _e)
|
|
832 e permette che comincino con cifre o underscore.
|
|
833 """
|
|
834 metabolites = set()
|
|
835 # coefficiente opzionale seguito da un token che termina con _<letters>
|
|
836 pattern = r'(?:\d+(?:\.\d+)?\s+)?([A-Za-z0-9_]+_[a-z]+)'
|
|
837 matches = re.findall(pattern, reaction_formula)
|
|
838 metabolites.update(matches)
|
|
839 return metabolites
|
|
840
|
|
841
|
|
842 def extract_compartment_from_metabolite(metabolite_id: str) -> str:
|
|
843 """
|
|
844 Estrae il compartimento dall'ID del metabolita.
|
|
845 """
|
|
846 # Il compartimento è solitamente l'ultima lettera dopo l'underscore
|
|
847 if '_' in metabolite_id:
|
|
848 return metabolite_id.split('_')[-1]
|
|
849 return 'c' # default cytoplasm
|
|
850
|
|
851
|
|
852 def parse_reaction_formula(reaction: Reaction, formula: str, metabolites_dict: Dict[str, Metabolite]):
|
|
853 """
|
|
854 Parsa una formula di reazione e imposta i metaboliti con i loro coefficienti.
|
|
855 """
|
|
856
|
|
857 if reaction.id == 'EX_thbpt_e':
|
|
858 print(reaction.id)
|
|
859 print(formula)
|
|
860 # Dividi in parte sinistra e destra
|
|
861 if '<=>' in formula:
|
|
862 left, right = formula.split('<=>')
|
|
863 reversible = True
|
|
864 elif '<--' in formula:
|
|
865 left, right = formula.split('<--')
|
|
866 reversible = False
|
|
867 left, right = left, right
|
|
868 elif '-->' in formula:
|
|
869 left, right = formula.split('-->')
|
|
870 reversible = False
|
|
871 elif '<-' in formula:
|
|
872 left, right = formula.split('<-')
|
|
873 reversible = False
|
|
874 left, right = left, right
|
|
875 else:
|
|
876 raise ValueError(f"Formato reazione non riconosciuto: {formula}")
|
|
877
|
|
878 # Parse dei metaboliti e coefficienti
|
|
879 reactants = parse_metabolites_side(left.strip())
|
|
880 products = parse_metabolites_side(right.strip())
|
|
881
|
|
882 # Aggiungi metaboliti alla reazione
|
|
883 metabolites_to_add = {}
|
|
884
|
|
885 # Reagenti (coefficienti negativi)
|
|
886 for met_id, coeff in reactants.items():
|
|
887 if met_id in metabolites_dict:
|
|
888 metabolites_to_add[metabolites_dict[met_id]] = -coeff
|
|
889
|
|
890 # Prodotti (coefficienti positivi)
|
|
891 for met_id, coeff in products.items():
|
|
892 if met_id in metabolites_dict:
|
|
893 metabolites_to_add[metabolites_dict[met_id]] = coeff
|
|
894
|
|
895 reaction.add_metabolites(metabolites_to_add)
|
|
896
|
|
897
|
|
898 def parse_metabolites_side(side_str: str) -> Dict[str, float]:
|
|
899 """
|
|
900 Parsa un lato della reazione per estrarre metaboliti e coefficienti.
|
|
901 """
|
|
902 metabolites = {}
|
|
903 if not side_str or side_str.strip() == '':
|
|
904 return metabolites
|
|
905
|
|
906 terms = side_str.split('+')
|
|
907 for term in terms:
|
|
908 term = term.strip()
|
|
909 if not term:
|
|
910 continue
|
|
911
|
|
912 # pattern allineato: coefficiente opzionale + id che termina con _<compartimento>
|
|
913 match = re.match(r'(?:(\d+\.?\d*)\s+)?([A-Za-z0-9_]+_[a-z]+)', term)
|
|
914 if match:
|
|
915 coeff_str, met_id = match.groups()
|
|
916 coeff = float(coeff_str) if coeff_str else 1.0
|
|
917 metabolites[met_id] = coeff
|
|
918
|
|
919 return metabolites
|
|
920
|
|
921
|
|
922
|
|
923 def set_biomass_objective(model: Model):
|
|
924 """
|
|
925 Imposta la reazione di biomassa come obiettivo.
|
|
926 """
|
|
927 biomass_reactions = [r for r in model.reactions if 'biomass' in r.id.lower()]
|
|
928
|
|
929 if biomass_reactions:
|
|
930 model.objective = biomass_reactions[0].id
|
|
931 print(f"Obiettivo impostato su: {biomass_reactions[0].id}")
|
|
932 else:
|
|
933 print("Nessuna reazione di biomassa trovata")
|
|
934
|
|
935
|
|
936 def set_medium_from_data(model: Model, df: pd.DataFrame):
|
|
937 """
|
|
938 Imposta il medium basato sulla colonna InMedium.
|
|
939 """
|
|
940 medium_reactions = df[df['InMedium'] == True]['ReactionID'].tolist()
|
|
941
|
|
942 medium_dict = {}
|
|
943 for rxn_id in medium_reactions:
|
|
944 if rxn_id in [r.id for r in model.reactions]:
|
|
945 reaction = model.reactions.get_by_id(rxn_id)
|
|
946 if reaction.lower_bound < 0: # Solo reazioni di uptake
|
|
947 medium_dict[rxn_id] = abs(reaction.lower_bound)
|
|
948
|
|
949 if medium_dict:
|
|
950 model.medium = medium_dict
|
|
951 print(f"Medium impostato con {len(medium_dict)} componenti")
|
|
952
|
|
953
|
|
954 def validate_model(model: Model) -> Dict[str, any]:
|
|
955 """
|
|
956 Valida il modello e fornisce statistiche di base.
|
|
957 """
|
|
958 validation = {
|
|
959 'num_reactions': len(model.reactions),
|
|
960 'num_metabolites': len(model.metabolites),
|
|
961 'num_genes': len(model.genes),
|
|
962 'num_compartments': len(model.compartments),
|
|
963 'objective': str(model.objective),
|
|
964 'medium_size': len(model.medium),
|
|
965 'reversible_reactions': len([r for r in model.reactions if r.reversibility]),
|
|
966 'exchange_reactions': len([r for r in model.reactions if r.id.startswith('EX_')]),
|
|
967 }
|
|
968
|
|
969 try:
|
|
970 # Test di crescita
|
|
971 solution = model.optimize()
|
|
972 validation['growth_rate'] = solution.objective_value
|
|
973 validation['status'] = solution.status
|
|
974 except Exception as e:
|
|
975 validation['growth_rate'] = None
|
|
976 validation['status'] = f"Error: {e}"
|
|
977
|
|
978 return validation
|
411
|
979
|
|
980
|
|
981 ################################- DATA GENERATION -################################
|
|
982 ReactionId = str
|
|
983 def generate_rules(model: cobra.Model, *, asParsed = True) -> Union[Dict[ReactionId, rulesUtils.OpList], Dict[ReactionId, str]]:
|
|
984 """
|
|
985 Generates a dictionary mapping reaction ids to rules from the model.
|
|
986
|
|
987 Args:
|
|
988 model : the model to derive data from.
|
|
989 asParsed : if True parses the rules to an optimized runtime format, otherwise leaves them as strings.
|
|
990
|
|
991 Returns:
|
|
992 Dict[ReactionId, rulesUtils.OpList] : the generated dictionary of parsed rules.
|
|
993 Dict[ReactionId, str] : the generated dictionary of raw rules.
|
|
994 """
|
|
995 # Is the below approach convoluted? yes
|
|
996 # Ok but is it inefficient? probably
|
|
997 # Ok but at least I don't have to repeat the check at every rule (I'm clinically insane)
|
|
998 _ruleGetter = lambda reaction : reaction.gene_reaction_rule
|
|
999 ruleExtractor = (lambda reaction :
|
|
1000 rulesUtils.parseRuleToNestedList(_ruleGetter(reaction))) if asParsed else _ruleGetter
|
|
1001
|
|
1002 return {
|
|
1003 reaction.id : ruleExtractor(reaction)
|
|
1004 for reaction in model.reactions
|
|
1005 if reaction.gene_reaction_rule }
|
|
1006
|
|
1007 def generate_reactions(model :cobra.Model, *, asParsed = True) -> Dict[ReactionId, str]:
|
|
1008 """
|
|
1009 Generates a dictionary mapping reaction ids to reaction formulas from the model.
|
|
1010
|
|
1011 Args:
|
|
1012 model : the model to derive data from.
|
|
1013 asParsed : if True parses the reactions to an optimized runtime format, otherwise leaves them as they are.
|
|
1014
|
|
1015 Returns:
|
|
1016 Dict[ReactionId, str] : the generated dictionary.
|
|
1017 """
|
|
1018
|
|
1019 unparsedReactions = {
|
|
1020 reaction.id : reaction.reaction
|
|
1021 for reaction in model.reactions
|
|
1022 if reaction.reaction
|
|
1023 }
|
|
1024
|
|
1025 if not asParsed: return unparsedReactions
|
|
1026
|
|
1027 return reactionUtils.create_reaction_dict(unparsedReactions)
|
|
1028
|
|
1029 def get_medium(model:cobra.Model) -> pd.DataFrame:
|
|
1030 trueMedium=[]
|
|
1031 for r in model.reactions:
|
|
1032 positiveCoeff=0
|
|
1033 for m in r.metabolites:
|
|
1034 if r.get_coefficient(m.id)>0:
|
|
1035 positiveCoeff=1;
|
|
1036 if (positiveCoeff==0 and r.lower_bound<0):
|
|
1037 trueMedium.append(r.id)
|
|
1038
|
|
1039 df_medium = pd.DataFrame()
|
|
1040 df_medium["reaction"] = trueMedium
|
|
1041 return df_medium
|
|
1042
|
|
1043 def generate_bounds(model:cobra.Model) -> pd.DataFrame:
|
|
1044
|
|
1045 rxns = []
|
|
1046 for reaction in model.reactions:
|
|
1047 rxns.append(reaction.id)
|
|
1048
|
|
1049 bounds = pd.DataFrame(columns = ["lower_bound", "upper_bound"], index=rxns)
|
|
1050
|
|
1051 for reaction in model.reactions:
|
|
1052 bounds.loc[reaction.id] = [reaction.lower_bound, reaction.upper_bound]
|
|
1053 return bounds
|
|
1054
|
|
1055
|
|
1056
|
|
1057 def generate_compartments(model: cobra.Model) -> pd.DataFrame:
|
|
1058 """
|
|
1059 Generates a DataFrame containing compartment information for each reaction.
|
|
1060 Creates columns for each compartment position (Compartment_1, Compartment_2, etc.)
|
|
1061
|
|
1062 Args:
|
|
1063 model: the COBRA model to extract compartment data from.
|
|
1064
|
|
1065 Returns:
|
|
1066 pd.DataFrame: DataFrame with ReactionID and compartment columns
|
|
1067 """
|
|
1068 pathway_data = []
|
|
1069
|
|
1070 # First pass: determine the maximum number of pathways any reaction has
|
|
1071 max_pathways = 0
|
|
1072 reaction_pathways = {}
|
|
1073
|
|
1074 for reaction in model.reactions:
|
|
1075 # Get unique pathways from all metabolites in the reaction
|
|
1076 if type(reaction.annotation['pathways']) == list:
|
|
1077 reaction_pathways[reaction.id] = reaction.annotation['pathways']
|
|
1078 max_pathways = max(max_pathways, len(reaction.annotation['pathways']))
|
|
1079 else:
|
|
1080 reaction_pathways[reaction.id] = [reaction.annotation['pathways']]
|
|
1081
|
|
1082 # Create column names for pathways
|
|
1083 pathway_columns = [f"Pathway_{i+1}" for i in range(max_pathways)]
|
|
1084
|
|
1085 # Second pass: create the data
|
|
1086 for reaction_id, pathways in reaction_pathways.items():
|
|
1087 row = {"ReactionID": reaction_id}
|
|
1088
|
|
1089 # Fill pathway columns
|
|
1090 for i in range(max_pathways):
|
|
1091 col_name = pathway_columns[i]
|
|
1092 if i < len(pathways):
|
|
1093 row[col_name] = pathways[i]
|
|
1094 else:
|
|
1095 row[col_name] = None # or "" if you prefer empty strings
|
|
1096
|
|
1097 pathway_data.append(row)
|
|
1098
|
|
1099 return pd.DataFrame(pathway_data) |