Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/galaxy/util/compression_utils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/galaxy/util/compression_utils.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,219 +0,0 @@ -from __future__ import absolute_import - -import gzip -import io -import logging -import os -import tarfile -import zipfile - -from galaxy.util.path import safe_relpath -from .checkers import ( - bz2, - is_bz2, - is_gzip -) - -log = logging.getLogger(__name__) - - -def get_fileobj(filename, mode="r", compressed_formats=None): - """ - Returns a fileobj. If the file is compressed, return an appropriate file - reader. In text mode, always use 'utf-8' encoding. - - :param filename: path to file that should be opened - :param mode: mode to pass to opener - :param compressed_formats: list of allowed compressed file formats among - 'bz2', 'gzip' and 'zip'. If left to None, all 3 formats are allowed - """ - return get_fileobj_raw(filename, mode, compressed_formats)[1] - - -def get_fileobj_raw(filename, mode="r", compressed_formats=None): - if compressed_formats is None: - compressed_formats = ['bz2', 'gzip', 'zip'] - # Remove 't' from mode, which may cause an error for compressed files - mode = mode.replace('t', '') - # 'U' mode is deprecated, we open in 'r'. - if mode == 'U': - mode = 'r' - compressed_format = None - if 'gzip' in compressed_formats and is_gzip(filename): - fh = gzip.GzipFile(filename, mode) - compressed_format = 'gzip' - elif 'bz2' in compressed_formats and is_bz2(filename): - fh = bz2.BZ2File(filename, mode) - compressed_format = 'bz2' - elif 'zip' in compressed_formats and zipfile.is_zipfile(filename): - # Return fileobj for the first file in a zip file. - # 'b' is not allowed in the ZipFile mode argument - # since it always opens files in binary mode. - # For emulating text mode, we will be returning the binary fh in a - # TextIOWrapper. - zf_mode = mode.replace('b', '') - with zipfile.ZipFile(filename, zf_mode) as zh: - fh = zh.open(zh.namelist()[0], zf_mode) - compressed_format = 'zip' - elif 'b' in mode: - return compressed_format, open(filename, mode) - else: - return compressed_format, io.open(filename, mode, encoding='utf-8') - if 'b' not in mode: - return compressed_format, io.TextIOWrapper(fh, encoding='utf-8') - else: - return compressed_format, fh - - -class CompressedFile(object): - - @staticmethod - def can_decompress(file_path): - return tarfile.is_tarfile(file_path) or zipfile.is_zipfile(file_path) - - def __init__(self, file_path, mode='r'): - if tarfile.is_tarfile(file_path): - self.file_type = 'tar' - elif zipfile.is_zipfile(file_path) and not file_path.endswith('.jar'): - self.file_type = 'zip' - self.file_name = os.path.splitext(os.path.basename(file_path))[0] - if self.file_name.endswith('.tar'): - self.file_name = os.path.splitext(self.file_name)[0] - self.type = self.file_type - method = 'open_%s' % self.file_type - if hasattr(self, method): - self.archive = getattr(self, method)(file_path, mode) - else: - raise NameError('File type %s specified, no open method found.' % self.file_type) - - @property - def common_prefix_dir(self): - """ - Get the common prefix directory for all the files in the archive, if any. - - Returns '' if the archive contains multiple files and/or directories at - the root of the archive. - """ - contents = self.getmembers() - common_prefix = '' - if len(contents) > 1: - common_prefix = os.path.commonprefix([self.getname(item) for item in contents]) - # If the common_prefix does not end with a slash, check that is a - # directory and all other files are contained in it - if len(common_prefix) >= 1 and not common_prefix.endswith(os.sep) and self.isdir(self.getmember(common_prefix)) \ - and all(self.getname(item).startswith(common_prefix + os.sep) for item in contents if self.isfile(item)): - common_prefix += os.sep - if not common_prefix.endswith(os.sep): - common_prefix = '' - return common_prefix - - def extract(self, path): - '''Determine the path to which the archive should be extracted.''' - contents = self.getmembers() - extraction_path = path - common_prefix_dir = self.common_prefix_dir - if len(contents) == 1: - # The archive contains a single file, return the extraction path. - if self.isfile(contents[0]): - extraction_path = os.path.join(path, self.file_name) - if not os.path.exists(extraction_path): - os.makedirs(extraction_path) - self.archive.extractall(extraction_path, members=self.safemembers()) - else: - if not common_prefix_dir: - extraction_path = os.path.join(path, self.file_name) - if not os.path.exists(extraction_path): - os.makedirs(extraction_path) - self.archive.extractall(extraction_path, members=self.safemembers()) - # Since .zip files store unix permissions separately, we need to iterate through the zip file - # and set permissions on extracted members. - if self.file_type == 'zip': - for zipped_file in contents: - filename = self.getname(zipped_file) - absolute_filepath = os.path.join(extraction_path, filename) - external_attributes = self.archive.getinfo(filename).external_attr - # The 2 least significant bytes are irrelevant, the next two contain unix permissions. - unix_permissions = external_attributes >> 16 - if unix_permissions != 0: - if os.path.exists(absolute_filepath): - os.chmod(absolute_filepath, unix_permissions) - else: - log.warning("Unable to change permission on extracted file '%s' as it does not exist" % absolute_filepath) - return os.path.abspath(os.path.join(extraction_path, common_prefix_dir)) - - def safemembers(self): - members = self.archive - common_prefix_dir = self.common_prefix_dir - if self.file_type == "tar": - for finfo in members: - if not safe_relpath(finfo.name): - raise Exception("Path '%s' is blocked (illegal path)." % finfo.name) - if finfo.issym() or finfo.islnk(): - link_target = os.path.join(os.path.dirname(finfo.name), finfo.linkname) - if not safe_relpath(link_target) or not os.path.normpath(link_target).startswith(common_prefix_dir): - raise Exception("Link '%s' to '%s' is blocked." % (finfo.name, finfo.linkname)) - yield finfo - elif self.file_type == "zip": - for name in members.namelist(): - if not safe_relpath(name): - raise Exception(name + " is blocked (illegal path).") - yield name - - def getmembers_tar(self): - return self.archive.getmembers() - - def getmembers_zip(self): - return self.archive.infolist() - - def getname_tar(self, item): - return item.name - - def getname_zip(self, item): - return item.filename - - def getmember(self, name): - for member in self.getmembers(): - if self.getname(member) == name: - return member - - def getmembers(self): - return getattr(self, 'getmembers_%s' % self.type)() - - def getname(self, member): - return getattr(self, 'getname_%s' % self.type)(member) - - def isdir(self, member): - return getattr(self, 'isdir_%s' % self.type)(member) - - def isdir_tar(self, member): - return member.isdir() - - def isdir_zip(self, member): - if member.filename.endswith(os.sep): - return True - return False - - def isfile(self, member): - if not self.isdir(member): - return True - return False - - def open_tar(self, filepath, mode): - return tarfile.open(filepath, mode, errorlevel=0) - - def open_zip(self, filepath, mode): - return zipfile.ZipFile(filepath, mode) - - def zipfile_ok(self, path_to_archive): - """ - This function is a bit pedantic and not functionally necessary. It checks whether there is - no file pointing outside of the extraction, because ZipFile.extractall() has some potential - security holes. See python zipfile documentation for more details. - """ - basename = os.path.realpath(os.path.dirname(path_to_archive)) - zip_archive = zipfile.ZipFile(path_to_archive) - for member in zip_archive.namelist(): - member_path = os.path.realpath(os.path.join(basename, member)) - if not member_path.startswith(basename): - return False - return True
