Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/setuptools/archive_util.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Utilities for extracting common archive formats""" | |
2 | |
3 import zipfile | |
4 import tarfile | |
5 import os | |
6 import shutil | |
7 import posixpath | |
8 import contextlib | |
9 from distutils.errors import DistutilsError | |
10 | |
11 from pkg_resources import ensure_directory | |
12 | |
# Public names exported by ``from <this module> import *``.
__all__ = [
    "unpack_archive",
    "unpack_zipfile",
    "unpack_tarfile",
    "default_filter",
    "UnrecognizedFormat",
    "extraction_drivers",
    "unpack_directory",
]
17 | |
18 | |
class UnrecognizedFormat(DistutilsError):
    """Raised when a path is not an archive type that can be unpacked"""
21 | |
22 | |
def default_filter(src, dst):
    """The default progress/filter callback; accepts every entry as-is

    `src` is the entry's path inside the archive and `dst` is the proposed
    filesystem destination.  `dst` is handed back unchanged, so no entry is
    skipped and no extraction path is altered.
    """
    return dst
26 | |
27 | |
def unpack_archive(
        filename, extract_dir, progress_filter=default_filter,
        drivers=None):
    """Unpack `filename` to `extract_dir`, or raise ``UnrecognizedFormat``

    `progress_filter` is called with two arguments: the '/'-separated path of
    an entry inside the archive, and the filesystem path it is about to be
    extracted to.  It must return the path to extract to (possibly the one it
    was given), or ``None`` to skip that file or directory.  This makes the
    callback usable for progress reporting, for filtering entries, and for
    redirecting where entries are written.

    `drivers`, if supplied, must be a non-empty sequence of functions taking
    the same arguments as this function (minus `drivers`) that raise
    ``UnrecognizedFormat`` when they cannot handle the given archive.  They
    are tried in order until one completes without raising that error; if
    every driver rejects the archive, ``UnrecognizedFormat`` is raised.  When
    `drivers` is omitted (or empty), the module-level ``extraction_drivers``
    sequence is used instead.
    """
    candidates = drivers or extraction_drivers
    for attempt in candidates:
        try:
            attempt(filename, extract_dir, progress_filter)
        except UnrecognizedFormat:
            # not this driver's format; move on to the next candidate
            continue
        # the first driver that does not reject the archive ends the search
        return

    # every candidate rejected the archive
    raise UnrecognizedFormat(
        "Not a recognized archive type: %s" % filename
    )
62 | |
63 | |
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
    """Copy a directory tree through the same interface used for archives

    Every file found under `filename` is offered to `progress_filter` (see
    ``unpack_archive()``) and, unless the filter returns a false value,
    copied together with its stat information to the path the filter
    returned.

    Raises ``UnrecognizedFormat`` if `filename` is not a directory
    """
    if not os.path.isdir(filename):
        raise UnrecognizedFormat("%s is not a directory" % filename)

    # Maps every directory seen so far to a pair: its '/'-separated path
    # relative to `filename` (empty for the root, otherwise ending in '/')
    # and the destination directory that mirrors it.
    locations = {filename: ('', extract_dir)}

    for current, subdirs, filenames in os.walk(filename):
        rel_prefix, dest_dir = locations[current]

        # record the children now so they can be looked up when walked
        for sub in subdirs:
            locations[os.path.join(current, sub)] = (
                rel_prefix + sub + '/',
                os.path.join(dest_dir, sub),
            )

        for leaf in filenames:
            proposed = os.path.join(dest_dir, leaf)
            destination = progress_filter(rel_prefix + leaf, proposed)
            if not destination:
                # the filter asked for this file to be skipped
                continue
            ensure_directory(destination)
            origin = os.path.join(current, leaf)
            shutil.copyfile(origin, destination)
            shutil.copystat(origin, destination)
89 | |
90 | |
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack zip `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as
    determined by ``zipfile.is_zipfile()``).  See ``unpack_archive()`` for an
    explanation of the `progress_filter` argument.

    Entries with an absolute name or a ``..`` path component are silently
    skipped.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))

    with zipfile.ZipFile(filename) as archive:
        for entry in archive.infolist():
            member = entry.filename
            segments = member.split('/')

            # refuse absolute paths and anything containing a '..' component
            if member.startswith('/') or '..' in segments:
                continue

            proposed = os.path.join(extract_dir, *segments)
            destination = progress_filter(member, proposed)
            if not destination:
                continue

            # done for directory entries and file entries alike
            ensure_directory(destination)
            if not member.endswith('/'):
                # a file entry: write out its contents
                payload = archive.read(entry.filename)
                with open(destination, 'wb') as out:
                    out.write(payload)

            # the high 16 bits of external_attr are applied as the mode
            # whenever they are non-zero
            mode_bits = entry.external_attr >> 16
            if mode_bits:
                os.chmod(destination, mode_bits)
126 | |
127 | |
128 def _resolve_tar_file_or_dir(tar_obj, tar_member_obj): | |
129 """Resolve any links and extract link targets as normal files.""" | |
130 while tar_member_obj is not None and ( | |
131 tar_member_obj.islnk() or tar_member_obj.issym()): | |
132 linkpath = tar_member_obj.linkname | |
133 if tar_member_obj.issym(): | |
134 base = posixpath.dirname(tar_member_obj.name) | |
135 linkpath = posixpath.join(base, linkpath) | |
136 linkpath = posixpath.normpath(linkpath) | |
137 tar_member_obj = tar_obj._getmember(linkpath) | |
138 | |
139 is_file_or_dir = ( | |
140 tar_member_obj is not None and | |
141 (tar_member_obj.isfile() or tar_member_obj.isdir()) | |
142 ) | |
143 if is_file_or_dir: | |
144 return tar_member_obj | |
145 | |
146 raise LookupError('Got unknown file type') | |
147 | |
148 | |
149 def _iter_open_tar(tar_obj, extract_dir, progress_filter): | |
150 """Emit member-destination pairs from a tar archive.""" | |
151 # don't do any chowning! | |
152 tar_obj.chown = lambda *args: None | |
153 | |
154 with contextlib.closing(tar_obj): | |
155 for member in tar_obj: | |
156 name = member.name | |
157 # don't extract absolute paths or ones with .. in them | |
158 if name.startswith('/') or '..' in name.split('/'): | |
159 continue | |
160 | |
161 prelim_dst = os.path.join(extract_dir, *name.split('/')) | |
162 | |
163 try: | |
164 member = _resolve_tar_file_or_dir(tar_obj, member) | |
165 except LookupError: | |
166 continue | |
167 | |
168 final_dst = progress_filter(name, prelim_dst) | |
169 if not final_dst: | |
170 continue | |
171 | |
172 if final_dst.endswith(os.sep): | |
173 final_dst = final_dst[:-1] | |
174 | |
175 yield member, final_dst | |
176 | |
177 | |
def unpack_tarfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`

    Raises ``UnrecognizedFormat`` if `filename` is not a tarfile (as
    determined by ``tarfile.open()``).  See ``unpack_archive()`` for an
    explanation of the `progress_filter` argument.

    Returns ``True`` once every selected member has been processed.
    """
    try:
        archive = tarfile.open(filename)
    except tarfile.TarError as exc:
        raise UnrecognizedFormat(
            "%s is not a compressed or uncompressed tar file" % (filename,)
        ) from exc

    for member, final_dst in _iter_open_tar(
            archive, extract_dir, progress_filter):
        # chown/chmod/mkfifo/mknode/makedev failures are tolerated
        with contextlib.suppress(tarfile.ExtractError):
            # XXX Ugh: goes through a private TarFile method
            archive._extract_member(member, final_dst)

    return True
203 | |
204 | |
# Drivers that ``unpack_archive`` falls back to, tried in this order.
extraction_drivers = (unpack_directory, unpack_zipfile, unpack_tarfile)