comparison env/lib/python3.9/site-packages/setuptools/archive_util.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Utilities for extracting common archive formats"""
2
3 import zipfile
4 import tarfile
5 import os
6 import shutil
7 import posixpath
8 import contextlib
9 from distutils.errors import DistutilsError
10
11 from pkg_resources import ensure_directory
12
13 __all__ = [
14 "unpack_archive", "unpack_zipfile", "unpack_tarfile", "default_filter",
15 "UnrecognizedFormat", "extraction_drivers", "unpack_directory",
16 ]
17
18
19 class UnrecognizedFormat(DistutilsError):
20 """Couldn't recognize the archive type"""
21
22
23 def default_filter(src, dst):
24 """The default progress/filter callback; returns True for all files"""
25 return dst
26
27
28 def unpack_archive(
29 filename, extract_dir, progress_filter=default_filter,
30 drivers=None):
31 """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``
32
33 `progress_filter` is a function taking two arguments: a source path
34 internal to the archive ('/'-separated), and a filesystem path where it
35 will be extracted. The callback must return the desired extract path
36 (which may be the same as the one passed in), or else ``None`` to skip
37 that file or directory. The callback can thus be used to report on the
38 progress of the extraction, as well as to filter the items extracted or
39 alter their extraction paths.
40
41 `drivers`, if supplied, must be a non-empty sequence of functions with the
42 same signature as this function (minus the `drivers` argument), that raise
43 ``UnrecognizedFormat`` if they do not support extracting the designated
44 archive type. The `drivers` are tried in sequence until one is found that
45 does not raise an error, or until all are exhausted (in which case
46 ``UnrecognizedFormat`` is raised). If you do not supply a sequence of
47 drivers, the module's ``extraction_drivers`` constant will be used, which
48 means that ``unpack_zipfile`` and ``unpack_tarfile`` will be tried, in that
49 order.
50 """
51 for driver in drivers or extraction_drivers:
52 try:
53 driver(filename, extract_dir, progress_filter)
54 except UnrecognizedFormat:
55 continue
56 else:
57 return
58 else:
59 raise UnrecognizedFormat(
60 "Not a recognized archive type: %s" % filename
61 )
62
63
64 def unpack_directory(filename, extract_dir, progress_filter=default_filter):
65 """"Unpack" a directory, using the same interface as for archives
66
67 Raises ``UnrecognizedFormat`` if `filename` is not a directory
68 """
69 if not os.path.isdir(filename):
70 raise UnrecognizedFormat("%s is not a directory" % filename)
71
72 paths = {
73 filename: ('', extract_dir),
74 }
75 for base, dirs, files in os.walk(filename):
76 src, dst = paths[base]
77 for d in dirs:
78 paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
79 for f in files:
80 target = os.path.join(dst, f)
81 target = progress_filter(src + f, target)
82 if not target:
83 # skip non-files
84 continue
85 ensure_directory(target)
86 f = os.path.join(base, f)
87 shutil.copyfile(f, target)
88 shutil.copystat(f, target)
89
90
91 def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
92 """Unpack zip `filename` to `extract_dir`
93
94 Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as determined
95 by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an explanation
96 of the `progress_filter` argument.
97 """
98
99 if not zipfile.is_zipfile(filename):
100 raise UnrecognizedFormat("%s is not a zip file" % (filename,))
101
102 with zipfile.ZipFile(filename) as z:
103 for info in z.infolist():
104 name = info.filename
105
106 # don't extract absolute paths or ones with .. in them
107 if name.startswith('/') or '..' in name.split('/'):
108 continue
109
110 target = os.path.join(extract_dir, *name.split('/'))
111 target = progress_filter(name, target)
112 if not target:
113 continue
114 if name.endswith('/'):
115 # directory
116 ensure_directory(target)
117 else:
118 # file
119 ensure_directory(target)
120 data = z.read(info.filename)
121 with open(target, 'wb') as f:
122 f.write(data)
123 unix_attributes = info.external_attr >> 16
124 if unix_attributes:
125 os.chmod(target, unix_attributes)
126
127
128 def _resolve_tar_file_or_dir(tar_obj, tar_member_obj):
129 """Resolve any links and extract link targets as normal files."""
130 while tar_member_obj is not None and (
131 tar_member_obj.islnk() or tar_member_obj.issym()):
132 linkpath = tar_member_obj.linkname
133 if tar_member_obj.issym():
134 base = posixpath.dirname(tar_member_obj.name)
135 linkpath = posixpath.join(base, linkpath)
136 linkpath = posixpath.normpath(linkpath)
137 tar_member_obj = tar_obj._getmember(linkpath)
138
139 is_file_or_dir = (
140 tar_member_obj is not None and
141 (tar_member_obj.isfile() or tar_member_obj.isdir())
142 )
143 if is_file_or_dir:
144 return tar_member_obj
145
146 raise LookupError('Got unknown file type')
147
148
149 def _iter_open_tar(tar_obj, extract_dir, progress_filter):
150 """Emit member-destination pairs from a tar archive."""
151 # don't do any chowning!
152 tar_obj.chown = lambda *args: None
153
154 with contextlib.closing(tar_obj):
155 for member in tar_obj:
156 name = member.name
157 # don't extract absolute paths or ones with .. in them
158 if name.startswith('/') or '..' in name.split('/'):
159 continue
160
161 prelim_dst = os.path.join(extract_dir, *name.split('/'))
162
163 try:
164 member = _resolve_tar_file_or_dir(tar_obj, member)
165 except LookupError:
166 continue
167
168 final_dst = progress_filter(name, prelim_dst)
169 if not final_dst:
170 continue
171
172 if final_dst.endswith(os.sep):
173 final_dst = final_dst[:-1]
174
175 yield member, final_dst
176
177
178 def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
179 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
180
181 Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as determined
182 by ``tarfile.open()``). See ``unpack_archive()`` for an explanation
183 of the `progress_filter` argument.
184 """
185 try:
186 tarobj = tarfile.open(filename)
187 except tarfile.TarError as e:
188 raise UnrecognizedFormat(
189 "%s is not a compressed or uncompressed tar file" % (filename,)
190 ) from e
191
192 for member, final_dst in _iter_open_tar(
193 tarobj, extract_dir, progress_filter,
194 ):
195 try:
196 # XXX Ugh
197 tarobj._extract_member(member, final_dst)
198 except tarfile.ExtractError:
199 # chown/chmod/mkfifo/mknode/makedev failed
200 pass
201
202 return True
203
204
205 extraction_drivers = unpack_directory, unpack_zipfile, unpack_tarfile