Mercurial > repos > bimib > cobraxy
comparison COBRAxy/utils/general_utils.py @ 335:2b7911a8366f draft
Uploaded
| author | luca_milaz |
|---|---|
| date | Thu, 04 Sep 2025 12:05:10 +0000 |
| parents | 63f5078627a9 |
| children | b89091ae2484 |
comparison
equal
deleted
inserted
replaced
| 334:c561c060a55f | 335:2b7911a8366f |
|---|---|
| 5 import pickle | 5 import pickle |
| 6 import lxml.etree as ET | 6 import lxml.etree as ET |
| 7 | 7 |
| 8 from enum import Enum | 8 from enum import Enum |
| 9 from itertools import count | 9 from itertools import count |
| 10 from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Union | 10 from typing import Any, Callable, Dict, Generic, List, Literal, Optional, TypeVar, Union |
| 11 | 11 |
| 12 import pandas as pd | 12 import pandas as pd |
| 13 import cobra | 13 import cobra |
| 14 | 14 |
| 15 import zipfile | |
| 16 import gzip | |
| 17 import bz2 | |
| 18 from io import StringIO | |
| 19 | |
| 15 # FILES | 20 # FILES |
class FileFormat(Enum):
    """
    Encodes possible file extensions to conditionally save data in a different format.

    Every member's value is a tuple holding all the extension spellings recognized for
    that format. XML and JSON additionally list their compressed variants.
    """
    DAT = ("dat",) # this is how galaxy treats all your files!
    CSV = ("csv",) # this is how most editable input data is written
    TSV = ("tsv",) # this is how most editable input data is ACTUALLY written

    SVG = ("svg",) # this is how most metabolic maps are written
    PNG = ("png",) # this is a common output format for images (such as metabolic maps)
    PDF = ("pdf",) # this is also a common output format for images, as it's required in publications.

    XML = ("xml", "xml.gz", "xml.zip", "xml.bz2") # SBML files are XML files, sometimes compressed
    JSON = ("json", "json.gz", "json.zip", "json.bz2") # COBRA models can be stored as JSON files, sometimes compressed

    TXT = ("txt",) # this is how most output data is written

    PICKLE = ("pickle", "pk", "p") # this is how all runtime data structures are saved

    def __init__(self, *extensions :str) -> None:
        """
        (Private) initializes the per-member bookkeeping.

        Args:
            *extensions : the elements of the member's tuple value. Enum unpacks a tuple value into
                positional arguments, so they must be accepted here even though they are not used.
        """
        # Last extension string resolved to this member by fromExt ("" until that happens).
        # NOTE(review): members are singletons, so this is shared by every user of the member.
        self.original_extension = ""

    @classmethod
    def fromExt(cls, ext :str) -> "FileFormat":
        """
        Converts a file extension string to a FileFormat instance.

        The extension is first matched (case-insensitively) against the member names, then against
        every extension listed in the members' values. The string received is stored in the
        member's original_extension attribute.

        Args:
            ext : the file extension, without the leading dot (e.g. "csv", "json.gz").

        Raises:
            ValueErr : if the extension doesn't correspond to any member.

        Returns:
            FileFormat: The FileFormat instance corresponding to the file extension.
        """
        variantName = ext.upper()
        if variantName in cls.__members__:
            instance = cls[variantName]
            instance.original_extension = ext
            return instance

        loweredExt = ext.lower()
        for member in cls:
            if loweredExt in member.value:
                member.original_extension = ext
                return member

        raise ValueErr("ext", "a valid FileFormat file extension", ext)

    def __str__(self) -> str:
        """
        (Private) converts to str representation. Good practice for usage with argparse.

        Returns:
            str : the string representation of the file extension.
        """
        # The first element of the value is the plain, uncompressed extension of the format.
        if self.value[0] in ("json", "xml"):
            # Return the original string extension for (possibly) compressed files, falling back
            # to the plain one when the member was never resolved through fromExt.
            return self.original_extension or self.value[0]

        return self.value[-1] # for all other formats and pickle
| 66 | 81 |
| 67 class FilePath(): | 82 class FilePath(): |
| 68 """ | 83 """ |
| 69 Represents a file path. View this as an attempt to standardize file-related operations by expecting | 84 Represents a file path. View this as an attempt to standardize file-related operations by expecting |
| 70 values of this type in any process requesting a file path. | 85 values of this type in any process requesting a file path. |
| 89 | 104 |
@classmethod
def fromStrPath(cls, path :str) -> "FilePath":
    """
    Factory method to parse a string from which to obtain, if possible, a valid FilePath instance.
    It detects double extensions such as .json.gz and .xml.bz2, which are common in COBRA models.
    These double extensions are not supported for other file types such as .csv.

    Args:
        path : the string containing the path

    Raises:
        PathErr : if the path has no recognizable file name or extension.
        ValueErr : raised by FileFormat.fromExt when the extension is not a supported one.

    Returns:
        FilePath : the instance built from the folder prefix, file name and extension found in the path.
    """
    result = re.search(r"^(?P<prefix>.*\/)?(?P<name>.*)\.(?P<ext>[^.]*)$", path)
    if not result or not result["name"] or not result["ext"]:
        raise PathErr(path, "cannot recognize folder structure or extension in path")

    prefix = result["prefix"] if result["prefix"] else ""
    name, ext = result["name"], result["ext"]

    # Look for a double extension in the file name only: splitting the whole path would drag the
    # folder prefix into the name and dots in folder names would shift the parts.
    baseName, dot, innerExt = name.rpartition(".")
    isCompressedModel = (
        bool(dot and baseName)
        and innerExt.lower() in {"json", "xml"}
        and ext.lower() in {"gz", "zip", "bz2"})

    if isCompressedModel:
        name = baseName
        ext = f"{innerExt}.{ext}"

    return cls(name, FileFormat.fromExt(ext), prefix=prefix)
| 117 | 145 |
| 118 def show(self) -> str: | 146 def show(self) -> str: |
| 119 """ | 147 """ |
| 120 Shows the path as a string. | 148 Shows the path as a string. |
| 121 | 149 |
| 560 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) | 588 return cobra.io.read_sbml_model(FilePath(f"{self.name}", FileFormat.XML, prefix = f"{toolDir}/local/models/").show()) |
| 561 | 589 |
def load_custom_model(self, file_path :FilePath, ext :Optional[FileFormat] = None) -> cobra.Model:
    """
    Loads a COBRA model from a user-provided file, in SBML (XML) or JSON format.

    Args:
        file_path : path of the file containing the model.
        ext : format to read the file as, when omitted the extension of file_path is used.

    Raises:
        DataErr : if the format is neither XML nor JSON or if anything goes wrong while reading the model.

    Returns:
        cobra.Model : the model read from the file.
    """
    ext = ext if ext is not None else file_path.ext
    try:
        # Enum members are not containers: the format must be compared by identity.
        if ext is FileFormat.XML:
            return cobra.io.read_sbml_model(file_path.show())

        if ext is FileFormat.JSON:
            # Compressed files are not automatically handled by cobra, the extension string
            # recorded on the format tells plain JSON apart from its compressed variants.
            extStr = (getattr(ext, "original_extension", "") or "json").lower()
            if extStr == "json":
                return cobra.io.load_json_model(file_path.show())

            return self.extract_json_model(file_path, ext)

    except Exception as e: raise DataErr(file_path, e.__str__()) from e
    raise DataErr(file_path,
        f"Format \"{file_path.ext}\" is not recognized, only JSON and XML files are supported.")
| 606 | |
| 607 | |
@staticmethod
def extract_json_model(file_path :FilePath, ext :FileFormat) -> cobra.Model:
    """
    Extract json COBRA model from a compressed file (zip, gz, bz2).

    Declared static because it takes no instance argument: this way it can be called both
    from the class and through an instance.

    Args:
        file_path: File path of the model
        ext: File extensions of class FileFormat (should be .zip, .gz or .bz2)

    Returns:
        cobra.Model: COBRApy model

    Raises:
        Exception: Extraction errors
    """
    # The compression is read from the extension string recorded on the format; when none
    # was recorded the string is empty and the file is reported as unsupported below.
    ext_str = getattr(ext, "original_extension", "").lower()
    path = file_path.show()

    try:
        if ext_str.endswith('.zip'):
            with zipfile.ZipFile(path, 'r') as zip_ref:
                # Directory entries end with "/" and hold no data.
                members = [entry for entry in zip_ref.namelist() if not entry.endswith("/")]
                if not members:
                    raise ValueError("the zip archive contains no files")

                # Prefer an actual .json entry, otherwise fall back to the first file.
                target = next((entry for entry in members if entry.lower().endswith(".json")), members[0])
                content = zip_ref.read(target).decode('utf-8')

            return cobra.io.load_json_model(StringIO(content))

        if ext_str.endswith('.gz'):
            with gzip.open(path, 'rt', encoding='utf-8') as gz_ref:
                return cobra.io.load_json_model(gz_ref)

        if ext_str.endswith('.bz2'):
            with bz2.open(path, 'rt', encoding='utf-8') as bz2_ref:
                return cobra.io.load_json_model(bz2_ref)

        raise ValueError(f"Compression format not supported: {ext_str}. Supported: .zip, .gz and .bz2")

    except Exception as e:
        # Same exception type and message as before, now chained to keep the original traceback.
        raise Exception(f"Error during model extraction: {str(e)}") from e
| 641 | |
| 642 | |
| 574 | 643 |
| 575 def __str__(self) -> str: return self.value | 644 def __str__(self) -> str: return self.value |
