comparison env/lib/python3.9/site-packages/distlib/_backport/shutil.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2012 The Python Software Foundation.
4 # See LICENSE.txt and CONTRIBUTORS.txt.
5 #
6 """Utility functions for copying and archiving files and directory trees.
7
8 XXX The functions here don't copy the resource fork or other metadata on Mac.
9
10 """
11
12 import os
13 import sys
14 import stat
15 from os.path import abspath
16 import fnmatch
17 try:
18 from collections.abc import Callable
19 except ImportError:
20 from collections import Callable
21 import errno
22 from . import tarfile
23
24 try:
25 import bz2
26 _BZ2_SUPPORTED = True
27 except ImportError:
28 _BZ2_SUPPORTED = False
29
30 try:
31 from pwd import getpwnam
32 except ImportError:
33 getpwnam = None
34
35 try:
36 from grp import getgrnam
37 except ImportError:
38 getgrnam = None
39
40 __all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2",
41 "copytree", "move", "rmtree", "Error", "SpecialFileError",
42 "ExecError", "make_archive", "get_archive_formats",
43 "register_archive_format", "unregister_archive_format",
44 "get_unpack_formats", "register_unpack_format",
45 "unregister_unpack_format", "unpack_archive", "ignore_patterns"]
46
47 class Error(EnvironmentError):
48 pass
49
50 class SpecialFileError(EnvironmentError):
51 """Raised when trying to do a kind of operation (e.g. copying) which is
52 not supported on a special file (e.g. a named pipe)"""
53
54 class ExecError(EnvironmentError):
55 """Raised when a command could not be executed"""
56
57 class ReadError(EnvironmentError):
58 """Raised when an archive cannot be read"""
59
60 class RegistryError(Exception):
61 """Raised when a registry operation with the archiving
62 and unpacking registries fails"""
63
64
65 try:
66 WindowsError
67 except NameError:
68 WindowsError = None
69
70 def copyfileobj(fsrc, fdst, length=16*1024):
71 """copy data from file-like object fsrc to file-like object fdst"""
72 while 1:
73 buf = fsrc.read(length)
74 if not buf:
75 break
76 fdst.write(buf)
77
78 def _samefile(src, dst):
79 # Macintosh, Unix.
80 if hasattr(os.path, 'samefile'):
81 try:
82 return os.path.samefile(src, dst)
83 except OSError:
84 return False
85
86 # All other platforms: check for same pathname.
87 return (os.path.normcase(os.path.abspath(src)) ==
88 os.path.normcase(os.path.abspath(dst)))
89
90 def copyfile(src, dst):
91 """Copy data from src to dst"""
92 if _samefile(src, dst):
93 raise Error("`%s` and `%s` are the same file" % (src, dst))
94
95 for fn in [src, dst]:
96 try:
97 st = os.stat(fn)
98 except OSError:
99 # File most likely does not exist
100 pass
101 else:
102 # XXX What about other special files? (sockets, devices...)
103 if stat.S_ISFIFO(st.st_mode):
104 raise SpecialFileError("`%s` is a named pipe" % fn)
105
106 with open(src, 'rb') as fsrc:
107 with open(dst, 'wb') as fdst:
108 copyfileobj(fsrc, fdst)
109
110 def copymode(src, dst):
111 """Copy mode bits from src to dst"""
112 if hasattr(os, 'chmod'):
113 st = os.stat(src)
114 mode = stat.S_IMODE(st.st_mode)
115 os.chmod(dst, mode)
116
117 def copystat(src, dst):
118 """Copy all stat info (mode bits, atime, mtime, flags) from src to dst"""
119 st = os.stat(src)
120 mode = stat.S_IMODE(st.st_mode)
121 if hasattr(os, 'utime'):
122 os.utime(dst, (st.st_atime, st.st_mtime))
123 if hasattr(os, 'chmod'):
124 os.chmod(dst, mode)
125 if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
126 try:
127 os.chflags(dst, st.st_flags)
128 except OSError as why:
129 if (not hasattr(errno, 'EOPNOTSUPP') or
130 why.errno != errno.EOPNOTSUPP):
131 raise
132
133 def copy(src, dst):
134 """Copy data and mode bits ("cp src dst").
135
136 The destination may be a directory.
137
138 """
139 if os.path.isdir(dst):
140 dst = os.path.join(dst, os.path.basename(src))
141 copyfile(src, dst)
142 copymode(src, dst)
143
144 def copy2(src, dst):
145 """Copy data and all stat info ("cp -p src dst").
146
147 The destination may be a directory.
148
149 """
150 if os.path.isdir(dst):
151 dst = os.path.join(dst, os.path.basename(src))
152 copyfile(src, dst)
153 copystat(src, dst)
154
155 def ignore_patterns(*patterns):
156 """Function that can be used as copytree() ignore parameter.
157
158 Patterns is a sequence of glob-style patterns
159 that are used to exclude files"""
160 def _ignore_patterns(path, names):
161 ignored_names = []
162 for pattern in patterns:
163 ignored_names.extend(fnmatch.filter(names, pattern))
164 return set(ignored_names)
165 return _ignore_patterns
166
167 def copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2,
168 ignore_dangling_symlinks=False):
169 """Recursively copy a directory tree.
170
171 The destination directory must not already exist.
172 If exception(s) occur, an Error is raised with a list of reasons.
173
174 If the optional symlinks flag is true, symbolic links in the
175 source tree result in symbolic links in the destination tree; if
176 it is false, the contents of the files pointed to by symbolic
177 links are copied. If the file pointed by the symlink doesn't
178 exist, an exception will be added in the list of errors raised in
179 an Error exception at the end of the copy process.
180
181 You can set the optional ignore_dangling_symlinks flag to true if you
182 want to silence this exception. Notice that this has no effect on
183 platforms that don't support os.symlink.
184
185 The optional ignore argument is a callable. If given, it
186 is called with the `src` parameter, which is the directory
187 being visited by copytree(), and `names` which is the list of
188 `src` contents, as returned by os.listdir():
189
190 callable(src, names) -> ignored_names
191
192 Since copytree() is called recursively, the callable will be
193 called once for each directory that is copied. It returns a
194 list of names relative to the `src` directory that should
195 not be copied.
196
197 The optional copy_function argument is a callable that will be used
198 to copy each file. It will be called with the source path and the
199 destination path as arguments. By default, copy2() is used, but any
200 function that supports the same signature (like copy()) can be used.
201
202 """
203 names = os.listdir(src)
204 if ignore is not None:
205 ignored_names = ignore(src, names)
206 else:
207 ignored_names = set()
208
209 os.makedirs(dst)
210 errors = []
211 for name in names:
212 if name in ignored_names:
213 continue
214 srcname = os.path.join(src, name)
215 dstname = os.path.join(dst, name)
216 try:
217 if os.path.islink(srcname):
218 linkto = os.readlink(srcname)
219 if symlinks:
220 os.symlink(linkto, dstname)
221 else:
222 # ignore dangling symlink if the flag is on
223 if not os.path.exists(linkto) and ignore_dangling_symlinks:
224 continue
225 # otherwise let the copy occurs. copy2 will raise an error
226 copy_function(srcname, dstname)
227 elif os.path.isdir(srcname):
228 copytree(srcname, dstname, symlinks, ignore, copy_function)
229 else:
230 # Will raise a SpecialFileError for unsupported file types
231 copy_function(srcname, dstname)
232 # catch the Error from the recursive copytree so that we can
233 # continue with other files
234 except Error as err:
235 errors.extend(err.args[0])
236 except EnvironmentError as why:
237 errors.append((srcname, dstname, str(why)))
238 try:
239 copystat(src, dst)
240 except OSError as why:
241 if WindowsError is not None and isinstance(why, WindowsError):
242 # Copying file access times may fail on Windows
243 pass
244 else:
245 errors.extend((src, dst, str(why)))
246 if errors:
247 raise Error(errors)
248
249 def rmtree(path, ignore_errors=False, onerror=None):
250 """Recursively delete a directory tree.
251
252 If ignore_errors is set, errors are ignored; otherwise, if onerror
253 is set, it is called to handle the error with arguments (func,
254 path, exc_info) where func is os.listdir, os.remove, or os.rmdir;
255 path is the argument to that function that caused it to fail; and
256 exc_info is a tuple returned by sys.exc_info(). If ignore_errors
257 is false and onerror is None, an exception is raised.
258
259 """
260 if ignore_errors:
261 def onerror(*args):
262 pass
263 elif onerror is None:
264 def onerror(*args):
265 raise
266 try:
267 if os.path.islink(path):
268 # symlinks to directories are forbidden, see bug #1669
269 raise OSError("Cannot call rmtree on a symbolic link")
270 except OSError:
271 onerror(os.path.islink, path, sys.exc_info())
272 # can't continue even if onerror hook returns
273 return
274 names = []
275 try:
276 names = os.listdir(path)
277 except os.error:
278 onerror(os.listdir, path, sys.exc_info())
279 for name in names:
280 fullname = os.path.join(path, name)
281 try:
282 mode = os.lstat(fullname).st_mode
283 except os.error:
284 mode = 0
285 if stat.S_ISDIR(mode):
286 rmtree(fullname, ignore_errors, onerror)
287 else:
288 try:
289 os.remove(fullname)
290 except os.error:
291 onerror(os.remove, fullname, sys.exc_info())
292 try:
293 os.rmdir(path)
294 except os.error:
295 onerror(os.rmdir, path, sys.exc_info())
296
297
298 def _basename(path):
299 # A basename() variant which first strips the trailing slash, if present.
300 # Thus we always get the last component of the path, even for directories.
301 return os.path.basename(path.rstrip(os.path.sep))
302
303 def move(src, dst):
304 """Recursively move a file or directory to another location. This is
305 similar to the Unix "mv" command.
306
307 If the destination is a directory or a symlink to a directory, the source
308 is moved inside the directory. The destination path must not already
309 exist.
310
311 If the destination already exists but is not a directory, it may be
312 overwritten depending on os.rename() semantics.
313
314 If the destination is on our current filesystem, then rename() is used.
315 Otherwise, src is copied to the destination and then removed.
316 A lot more could be done here... A look at a mv.c shows a lot of
317 the issues this implementation glosses over.
318
319 """
320 real_dst = dst
321 if os.path.isdir(dst):
322 if _samefile(src, dst):
323 # We might be on a case insensitive filesystem,
324 # perform the rename anyway.
325 os.rename(src, dst)
326 return
327
328 real_dst = os.path.join(dst, _basename(src))
329 if os.path.exists(real_dst):
330 raise Error("Destination path '%s' already exists" % real_dst)
331 try:
332 os.rename(src, real_dst)
333 except OSError:
334 if os.path.isdir(src):
335 if _destinsrc(src, dst):
336 raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
337 copytree(src, real_dst, symlinks=True)
338 rmtree(src)
339 else:
340 copy2(src, real_dst)
341 os.unlink(src)
342
343 def _destinsrc(src, dst):
344 src = abspath(src)
345 dst = abspath(dst)
346 if not src.endswith(os.path.sep):
347 src += os.path.sep
348 if not dst.endswith(os.path.sep):
349 dst += os.path.sep
350 return dst.startswith(src)
351
352 def _get_gid(name):
353 """Returns a gid, given a group name."""
354 if getgrnam is None or name is None:
355 return None
356 try:
357 result = getgrnam(name)
358 except KeyError:
359 result = None
360 if result is not None:
361 return result[2]
362 return None
363
364 def _get_uid(name):
365 """Returns an uid, given a user name."""
366 if getpwnam is None or name is None:
367 return None
368 try:
369 result = getpwnam(name)
370 except KeyError:
371 result = None
372 if result is not None:
373 return result[2]
374 return None
375
376 def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
377 owner=None, group=None, logger=None):
378 """Create a (possibly compressed) tar file from all the files under
379 'base_dir'.
380
381 'compress' must be "gzip" (the default), "bzip2", or None.
382
383 'owner' and 'group' can be used to define an owner and a group for the
384 archive that is being built. If not provided, the current owner and group
385 will be used.
386
387 The output tar file will be named 'base_name' + ".tar", possibly plus
388 the appropriate compression extension (".gz", or ".bz2").
389
390 Returns the output filename.
391 """
392 tar_compression = {'gzip': 'gz', None: ''}
393 compress_ext = {'gzip': '.gz'}
394
395 if _BZ2_SUPPORTED:
396 tar_compression['bzip2'] = 'bz2'
397 compress_ext['bzip2'] = '.bz2'
398
399 # flags for compression program, each element of list will be an argument
400 if compress is not None and compress not in compress_ext:
401 raise ValueError("bad value for 'compress', or compression format not "
402 "supported : {0}".format(compress))
403
404 archive_name = base_name + '.tar' + compress_ext.get(compress, '')
405 archive_dir = os.path.dirname(archive_name)
406
407 if not os.path.exists(archive_dir):
408 if logger is not None:
409 logger.info("creating %s", archive_dir)
410 if not dry_run:
411 os.makedirs(archive_dir)
412
413 # creating the tarball
414 if logger is not None:
415 logger.info('Creating tar archive')
416
417 uid = _get_uid(owner)
418 gid = _get_gid(group)
419
420 def _set_uid_gid(tarinfo):
421 if gid is not None:
422 tarinfo.gid = gid
423 tarinfo.gname = group
424 if uid is not None:
425 tarinfo.uid = uid
426 tarinfo.uname = owner
427 return tarinfo
428
429 if not dry_run:
430 tar = tarfile.open(archive_name, 'w|%s' % tar_compression[compress])
431 try:
432 tar.add(base_dir, filter=_set_uid_gid)
433 finally:
434 tar.close()
435
436 return archive_name
437
438 def _call_external_zip(base_dir, zip_filename, verbose=False, dry_run=False):
439 # XXX see if we want to keep an external call here
440 if verbose:
441 zipoptions = "-r"
442 else:
443 zipoptions = "-rq"
444 from distutils.errors import DistutilsExecError
445 from distutils.spawn import spawn
446 try:
447 spawn(["zip", zipoptions, zip_filename, base_dir], dry_run=dry_run)
448 except DistutilsExecError:
449 # XXX really should distinguish between "couldn't find
450 # external 'zip' command" and "zip failed".
451 raise ExecError("unable to create zip file '%s': "
452 "could neither import the 'zipfile' module nor "
453 "find a standalone zip utility") % zip_filename
454
455 def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0, logger=None):
456 """Create a zip file from all the files under 'base_dir'.
457
458 The output zip file will be named 'base_name' + ".zip". Uses either the
459 "zipfile" Python module (if available) or the InfoZIP "zip" utility
460 (if installed and found on the default search path). If neither tool is
461 available, raises ExecError. Returns the name of the output zip
462 file.
463 """
464 zip_filename = base_name + ".zip"
465 archive_dir = os.path.dirname(base_name)
466
467 if not os.path.exists(archive_dir):
468 if logger is not None:
469 logger.info("creating %s", archive_dir)
470 if not dry_run:
471 os.makedirs(archive_dir)
472
473 # If zipfile module is not available, try spawning an external 'zip'
474 # command.
475 try:
476 import zipfile
477 except ImportError:
478 zipfile = None
479
480 if zipfile is None:
481 _call_external_zip(base_dir, zip_filename, verbose, dry_run)
482 else:
483 if logger is not None:
484 logger.info("creating '%s' and adding '%s' to it",
485 zip_filename, base_dir)
486
487 if not dry_run:
488 zip = zipfile.ZipFile(zip_filename, "w",
489 compression=zipfile.ZIP_DEFLATED)
490
491 for dirpath, dirnames, filenames in os.walk(base_dir):
492 for name in filenames:
493 path = os.path.normpath(os.path.join(dirpath, name))
494 if os.path.isfile(path):
495 zip.write(path, path)
496 if logger is not None:
497 logger.info("adding '%s'", path)
498 zip.close()
499
500 return zip_filename
501
502 _ARCHIVE_FORMATS = {
503 'gztar': (_make_tarball, [('compress', 'gzip')], "gzip'ed tar-file"),
504 'bztar': (_make_tarball, [('compress', 'bzip2')], "bzip2'ed tar-file"),
505 'tar': (_make_tarball, [('compress', None)], "uncompressed tar file"),
506 'zip': (_make_zipfile, [], "ZIP file"),
507 }
508
509 if _BZ2_SUPPORTED:
510 _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
511 "bzip2'ed tar-file")
512
513 def get_archive_formats():
514 """Returns a list of supported formats for archiving and unarchiving.
515
516 Each element of the returned sequence is a tuple (name, description)
517 """
518 formats = [(name, registry[2]) for name, registry in
519 _ARCHIVE_FORMATS.items()]
520 formats.sort()
521 return formats
522
523 def register_archive_format(name, function, extra_args=None, description=''):
524 """Registers an archive format.
525
526 name is the name of the format. function is the callable that will be
527 used to create archives. If provided, extra_args is a sequence of
528 (name, value) tuples that will be passed as arguments to the callable.
529 description can be provided to describe the format, and will be returned
530 by the get_archive_formats() function.
531 """
532 if extra_args is None:
533 extra_args = []
534 if not isinstance(function, Callable):
535 raise TypeError('The %s object is not callable' % function)
536 if not isinstance(extra_args, (tuple, list)):
537 raise TypeError('extra_args needs to be a sequence')
538 for element in extra_args:
539 if not isinstance(element, (tuple, list)) or len(element) !=2:
540 raise TypeError('extra_args elements are : (arg_name, value)')
541
542 _ARCHIVE_FORMATS[name] = (function, extra_args, description)
543
544 def unregister_archive_format(name):
545 del _ARCHIVE_FORMATS[name]
546
547 def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
548 dry_run=0, owner=None, group=None, logger=None):
549 """Create an archive file (eg. zip or tar).
550
551 'base_name' is the name of the file to create, minus any format-specific
552 extension; 'format' is the archive format: one of "zip", "tar", "bztar"
553 or "gztar".
554
555 'root_dir' is a directory that will be the root directory of the
556 archive; ie. we typically chdir into 'root_dir' before creating the
557 archive. 'base_dir' is the directory where we start archiving from;
558 ie. 'base_dir' will be the common prefix of all files and
559 directories in the archive. 'root_dir' and 'base_dir' both default
560 to the current directory. Returns the name of the archive file.
561
562 'owner' and 'group' are used when creating a tar archive. By default,
563 uses the current owner and group.
564 """
565 save_cwd = os.getcwd()
566 if root_dir is not None:
567 if logger is not None:
568 logger.debug("changing into '%s'", root_dir)
569 base_name = os.path.abspath(base_name)
570 if not dry_run:
571 os.chdir(root_dir)
572
573 if base_dir is None:
574 base_dir = os.curdir
575
576 kwargs = {'dry_run': dry_run, 'logger': logger}
577
578 try:
579 format_info = _ARCHIVE_FORMATS[format]
580 except KeyError:
581 raise ValueError("unknown archive format '%s'" % format)
582
583 func = format_info[0]
584 for arg, val in format_info[1]:
585 kwargs[arg] = val
586
587 if format != 'zip':
588 kwargs['owner'] = owner
589 kwargs['group'] = group
590
591 try:
592 filename = func(base_name, base_dir, **kwargs)
593 finally:
594 if root_dir is not None:
595 if logger is not None:
596 logger.debug("changing back to '%s'", save_cwd)
597 os.chdir(save_cwd)
598
599 return filename
600
601
602 def get_unpack_formats():
603 """Returns a list of supported formats for unpacking.
604
605 Each element of the returned sequence is a tuple
606 (name, extensions, description)
607 """
608 formats = [(name, info[0], info[3]) for name, info in
609 _UNPACK_FORMATS.items()]
610 formats.sort()
611 return formats
612
613 def _check_unpack_options(extensions, function, extra_args):
614 """Checks what gets registered as an unpacker."""
615 # first make sure no other unpacker is registered for this extension
616 existing_extensions = {}
617 for name, info in _UNPACK_FORMATS.items():
618 for ext in info[0]:
619 existing_extensions[ext] = name
620
621 for extension in extensions:
622 if extension in existing_extensions:
623 msg = '%s is already registered for "%s"'
624 raise RegistryError(msg % (extension,
625 existing_extensions[extension]))
626
627 if not isinstance(function, Callable):
628 raise TypeError('The registered function must be a callable')
629
630
631 def register_unpack_format(name, extensions, function, extra_args=None,
632 description=''):
633 """Registers an unpack format.
634
635 `name` is the name of the format. `extensions` is a list of extensions
636 corresponding to the format.
637
638 `function` is the callable that will be
639 used to unpack archives. The callable will receive archives to unpack.
640 If it's unable to handle an archive, it needs to raise a ReadError
641 exception.
642
643 If provided, `extra_args` is a sequence of
644 (name, value) tuples that will be passed as arguments to the callable.
645 description can be provided to describe the format, and will be returned
646 by the get_unpack_formats() function.
647 """
648 if extra_args is None:
649 extra_args = []
650 _check_unpack_options(extensions, function, extra_args)
651 _UNPACK_FORMATS[name] = extensions, function, extra_args, description
652
653 def unregister_unpack_format(name):
654 """Removes the pack format from the registry."""
655 del _UNPACK_FORMATS[name]
656
657 def _ensure_directory(path):
658 """Ensure that the parent directory of `path` exists"""
659 dirname = os.path.dirname(path)
660 if not os.path.isdir(dirname):
661 os.makedirs(dirname)
662
663 def _unpack_zipfile(filename, extract_dir):
664 """Unpack zip `filename` to `extract_dir`
665 """
666 try:
667 import zipfile
668 except ImportError:
669 raise ReadError('zlib not supported, cannot unpack this archive.')
670
671 if not zipfile.is_zipfile(filename):
672 raise ReadError("%s is not a zip file" % filename)
673
674 zip = zipfile.ZipFile(filename)
675 try:
676 for info in zip.infolist():
677 name = info.filename
678
679 # don't extract absolute paths or ones with .. in them
680 if name.startswith('/') or '..' in name:
681 continue
682
683 target = os.path.join(extract_dir, *name.split('/'))
684 if not target:
685 continue
686
687 _ensure_directory(target)
688 if not name.endswith('/'):
689 # file
690 data = zip.read(info.filename)
691 f = open(target, 'wb')
692 try:
693 f.write(data)
694 finally:
695 f.close()
696 del data
697 finally:
698 zip.close()
699
700 def _unpack_tarfile(filename, extract_dir):
701 """Unpack tar/tar.gz/tar.bz2 `filename` to `extract_dir`
702 """
703 try:
704 tarobj = tarfile.open(filename)
705 except tarfile.TarError:
706 raise ReadError(
707 "%s is not a compressed or uncompressed tar file" % filename)
708 try:
709 tarobj.extractall(extract_dir)
710 finally:
711 tarobj.close()
712
713 _UNPACK_FORMATS = {
714 'gztar': (['.tar.gz', '.tgz'], _unpack_tarfile, [], "gzip'ed tar-file"),
715 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"),
716 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file")
717 }
718
719 if _BZ2_SUPPORTED:
720 _UNPACK_FORMATS['bztar'] = (['.bz2'], _unpack_tarfile, [],
721 "bzip2'ed tar-file")
722
723 def _find_unpack_format(filename):
724 for name, info in _UNPACK_FORMATS.items():
725 for extension in info[0]:
726 if filename.endswith(extension):
727 return name
728 return None
729
730 def unpack_archive(filename, extract_dir=None, format=None):
731 """Unpack an archive.
732
733 `filename` is the name of the archive.
734
735 `extract_dir` is the name of the target directory, where the archive
736 is unpacked. If not provided, the current working directory is used.
737
738 `format` is the archive format: one of "zip", "tar", or "gztar". Or any
739 other registered format. If not provided, unpack_archive will use the
740 filename extension and see if an unpacker was registered for that
741 extension.
742
743 In case none is found, a ValueError is raised.
744 """
745 if extract_dir is None:
746 extract_dir = os.getcwd()
747
748 if format is not None:
749 try:
750 format_info = _UNPACK_FORMATS[format]
751 except KeyError:
752 raise ValueError("Unknown unpack format '{0}'".format(format))
753
754 func = format_info[1]
755 func(filename, extract_dir, **dict(format_info[2]))
756 else:
757 # we need to look at the registered unpackers supported extensions
758 format = _find_unpack_format(filename)
759 if format is None:
760 raise ReadError("Unknown archive format '{0}'".format(filename))
761
762 func = _UNPACK_FORMATS[format][1]
763 kwargs = dict(_UNPACK_FORMATS[format][2])
764 func(filename, extract_dir, **kwargs)