Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
comparison
equal
deleted
inserted
replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 from __future__ import absolute_import | |
| 2 | |
| 3 import gzip | |
| 4 import io | |
| 5 import logging | |
| 6 import os | |
| 7 import tarfile | |
| 8 import zipfile | |
| 9 | |
| 10 from galaxy.util.path import safe_relpath | |
| 11 from .checkers import ( | |
| 12 bz2, | |
| 13 is_bz2, | |
| 14 is_gzip | |
| 15 ) | |
| 16 | |
| 17 log = logging.getLogger(__name__) | |
| 18 | |
| 19 | |
def get_fileobj(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and hand back only the file object.

    Compressed files are opened through a matching reader; text mode always
    decodes as 'utf-8'. This is a convenience wrapper that discards the
    detected compression format reported by ``get_fileobj_raw``.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    """
    _detected_format, fileobj = get_fileobj_raw(filename, mode, compressed_formats)
    return fileobj
| 31 | |
| 32 | |
def get_fileobj_raw(filename, mode="r", compressed_formats=None):
    """
    Open ``filename`` and report which compression format, if any, was used.

    In text mode the returned object always decodes as 'utf-8'.

    :param filename: path to file that should be opened
    :param mode: mode to pass to opener. The 't' flag and the deprecated 'U'
      flag are stripped before the file is opened.
    :param compressed_formats: list of allowed compressed file formats among
      'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed
    :returns: a ``(compressed_format, fileobj)`` tuple, where
      ``compressed_format`` is 'gzip', 'bz2', 'zip', or None when the file is
      opened as a plain file
    """
    if compressed_formats is None:
        compressed_formats = ['bz2', 'gzip', 'zip']
    # Remove 't' from mode, which may cause an error for compressed files.
    mode = mode.replace('t', '')
    # 'U' mode is deprecated and rejected by the compressed-file openers (and
    # by open() on recent Pythons), so drop it wherever it appears, not only
    # when the whole mode is exactly 'U'.
    mode = mode.replace('U', '')
    if not any(flag in mode for flag in 'rwxa'):
        # Nothing but modifiers left (e.g. '' or 'b'): default to reading.
        mode = 'r' + mode
    compressed_format = None
    if 'gzip' in compressed_formats and is_gzip(filename):
        fh = gzip.GzipFile(filename, mode)
        compressed_format = 'gzip'
    elif 'bz2' in compressed_formats and is_bz2(filename):
        fh = bz2.BZ2File(filename, mode)
        compressed_format = 'bz2'
    elif 'zip' in compressed_formats and zipfile.is_zipfile(filename):
        # Return fileobj for the first file in a zip file.
        # 'b' is not allowed in the ZipFile mode argument
        # since it always opens files in binary mode.
        # For emulating text mode, we will be returning the binary fh in a
        # TextIOWrapper.
        zf_mode = mode.replace('b', '')
        with zipfile.ZipFile(filename, zf_mode) as zh:
            fh = zh.open(zh.namelist()[0], zf_mode)
        compressed_format = 'zip'
    elif 'b' in mode:
        return compressed_format, open(filename, mode)
    else:
        return compressed_format, io.open(filename, mode, encoding='utf-8')
    if 'b' not in mode:
        # The compressed readers are binary; wrap them to emulate text mode.
        return compressed_format, io.TextIOWrapper(fh, encoding='utf-8')
    return compressed_format, fh
| 66 | |
| 67 | |
class CompressedFile(object):
    """
    Uniform wrapper around tar and zip archives.

    The archive type is detected at construction time and every generic
    method (``getmembers``, ``getname``, ``isdir``, ...) dispatches to the
    matching ``*_tar`` / ``*_zip`` implementation.
    """

    @staticmethod
    def can_decompress(file_path):
        """Return True if ``file_path`` is recognized as a tar or zip archive."""
        return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path)

    def __init__(self, file_path, mode='r'):
        """
        Detect the archive type of ``file_path`` and open it.

        :param file_path: path to the archive
        :param mode: mode passed to the tar/zip opener
        :raises NameError: if no ``open_<type>`` method exists for the type
        """
        if tarfile.is_tarfile(file_path):
            self.file_type = 'tar'
        elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'):
            self.file_type = 'zip'
        # NOTE(review): when neither branch matches (unrecognized file, or a
        # zip named '*.jar'), ``file_type`` is never set and the attribute
        # access below raises AttributeError. Kept as-is for compatibility.
        # Archive name without its extension ('.tar.gz' loses both parts).
        self.file_name = os.path.splitext(os.path.basename(file_path))[0]
        if self.file_name.endswith('.tar'):
            self.file_name = os.path.splitext(self.file_name)[0]
        self.type = self.file_type
        method = 'open_%s' % self.file_type
        if hasattr(self, method):
            self.archive = getattr(self, method)(file_path, mode)
        else:
            raise NameError('File type %s specified, no open method found.' % self.file_type)

    @property
    def common_prefix_dir(self):
        """
        Get the common prefix directory for all the files in the archive, if any.

        Returns '' if the archive contains multiple files and/or directories at
        the root of the archive.
        """
        contents = self.getmembers()
        common_prefix = ''
        if len(contents) > 1:
            common_prefix = os.path.commonprefix([self.getname(item) for item in contents])
            # If the common_prefix does not end with a slash, check that is a
            # directory and all other files are contained in it
            if common_prefix and not common_prefix.endswith(os.sep):
                # commonprefix() works character by character, so the result
                # may be a partial name (e.g. 'file' for 'file1' and 'file2')
                # that matches no member; getmember() then returns None.
                prefix_member = self.getmember(common_prefix)
                if prefix_member is not None and self.isdir(prefix_member) \
                        and all(self.getname(item).startswith(common_prefix + os.sep)
                                for item in contents if self.isfile(item)):
                    common_prefix += os.sep
            if not common_prefix.endswith(os.sep):
                common_prefix = ''
        return common_prefix

    def extract(self, path):
        """
        Extract the archive below ``path`` and return the resulting location.

        An archive holding a single file, or several entries without a common
        prefix directory, is extracted into ``<path>/<archive name>``;
        otherwise it is extracted directly into ``path``.

        :param path: directory under which the archive is extracted
        :returns: absolute path of the extraction directory joined with the
          common prefix directory of the archive (if any)
        """
        contents = self.getmembers()
        extraction_path = path
        common_prefix_dir = self.common_prefix_dir
        if len(contents) == 1:
            # The archive contains a single file, return the extraction path.
            if self.isfile(contents[0]):
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
                self.archive.extractall(extraction_path, members=self.safemembers())
        else:
            if not common_prefix_dir:
                extraction_path = os.path.join(path, self.file_name)
                if not os.path.exists(extraction_path):
                    os.makedirs(extraction_path)
            self.archive.extractall(extraction_path, members=self.safemembers())
        # Since .zip files store unix permissions separately, we need to iterate through the zip file
        # and set permissions on extracted members.
        if self.file_type == 'zip':
            for zipped_file in contents:
                filename = self.getname(zipped_file)
                absolute_filepath = os.path.join(extraction_path, filename)
                external_attributes = self.archive.getinfo(filename).external_attr
                # The 2 least significant bytes are irrelevant, the next two contain unix permissions.
                unix_permissions = external_attributes >> 16
                if unix_permissions != 0:
                    if os.path.exists(absolute_filepath):
                        os.chmod(absolute_filepath, unix_permissions)
                    else:
                        log.warning("Unable to change permission on extracted file '%s' as it does not exist", absolute_filepath)
        return os.path.abspath(os.path.join(extraction_path, common_prefix_dir))

    def safemembers(self):
        """
        Yield the archive members after checking that their paths are safe.

        Tar archives yield member objects, zip archives yield member names.

        :raises Exception: if a member path fails ``safe_relpath``, or a tar
          link target fails it or falls outside the common prefix directory
        """
        members = self.archive
        common_prefix_dir = self.common_prefix_dir
        if self.file_type == "tar":
            for finfo in members:
                if not safe_relpath(finfo.name):
                    raise Exception("Path '%s' is blocked (illegal path)." % finfo.name)
                if finfo.issym() or finfo.islnk():
                    # Resolve the link target relative to the link's own directory.
                    link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname)
                    if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir):
                        raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname))
                yield finfo
        elif self.file_type == "zip":
            for name in members.namelist():
                if not safe_relpath(name):
                    raise Exception(name + " is blocked (illegal path).")
                yield name

    def getmembers_tar(self):
        """Return the member objects of the tar archive."""
        return self.archive.getmembers()

    def getmembers_zip(self):
        """Return the member info objects of the zip archive."""
        return self.archive.infolist()

    def getname_tar(self, item):
        """Return the path of a tar member."""
        return item.name

    def getname_zip(self, item):
        """Return the path of a zip member."""
        return item.filename

    def getmember(self, name):
        """Return the member whose path equals ``name``, or None if there is none."""
        for member in self.getmembers():
            if self.getname(member) == name:
                return member
        return None

    def getmembers(self):
        """Return all members of the archive, whatever its type."""
        return getattr(self, 'getmembers_%s' % self.type)()

    def getname(self, member):
        """Return the path of ``member``, whatever the archive type."""
        return getattr(self, 'getname_%s' % self.type)(member)

    def isdir(self, member):
        """Return True if ``member`` is a directory entry."""
        return getattr(self, 'isdir_%s' % self.type)(member)

    def isdir_tar(self, member):
        """Return True if the tar ``member`` is a directory."""
        return member.isdir()

    def isdir_zip(self, member):
        """Return True if the zip ``member`` name ends with the path separator."""
        return member.filename.endswith(os.sep)

    def isfile(self, member):
        """Return True if ``member`` is not a directory entry."""
        return not self.isdir(member)

    def open_tar(self, filepath, mode):
        """Open ``filepath`` as a tar archive with ``errorlevel=0``."""
        return tarfile.open(filepath, mode, errorlevel=0)

    def open_zip(self, filepath, mode):
        """Open ``filepath`` as a zip archive."""
        return zipfile.ZipFile(filepath, mode)

    def zipfile_ok(self, path_to_archive):
        """
        This function is a bit pedantic and not functionally necessary. It checks whether there is
        no file pointing outside of the extraction, because ZipFile.extractall() has some potential
        security holes. See python zipfile documentation for more details.

        :param path_to_archive: path to the zip archive to inspect
        :returns: False if any member resolves outside the directory that
          contains the archive, True otherwise
        """
        basename = os.path.realpath(os.path.dirname(path_to_archive))
        # Compare against the directory *with* a trailing separator so that a
        # sibling such as '/data/submarine' is not accepted for '/data/sub'.
        # Joining with '' appends the separator only when it is missing.
        base_prefix = os.path.join(basename, '')
        with zipfile.ZipFile(path_to_archive) as zip_archive:
            for member in zip_archive.namelist():
                member_path = os.path.realpath(os.path.join(basename, member))
                if member_path != basename and not member_path.startswith(base_prefix):
                    return False
        return True
