diff env/lib/python3.9/site-packages/click/_compat.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/click/_compat.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,786 @@
+# flake8: noqa
+import codecs
+import io
+import os
+import re
+import sys
+from weakref import WeakKeyDictionary
+
+PY2 = sys.version_info[0] == 2
+CYGWIN = sys.platform.startswith("cygwin")
+MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
+# Determine local App Engine environment, per Google's own suggestion
+APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get(
+    "SERVER_SOFTWARE", ""
+)
+WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2
+DEFAULT_COLUMNS = 80
+
+
+_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
+
+
+def get_filesystem_encoding():
+    return sys.getfilesystemencoding() or sys.getdefaultencoding()
+
+
+def _make_text_stream(
+    stream, encoding, errors, force_readable=False, force_writable=False
+):
+    if encoding is None:
+        encoding = get_best_encoding(stream)
+    if errors is None:
+        errors = "replace"
+    return _NonClosingTextIOWrapper(
+        stream,
+        encoding,
+        errors,
+        line_buffering=True,
+        force_readable=force_readable,
+        force_writable=force_writable,
+    )
+
+
+def is_ascii_encoding(encoding):
+    """Checks if a given encoding is ascii."""
+    try:
+        return codecs.lookup(encoding).name == "ascii"
+    except LookupError:
+        return False
+
+
+def get_best_encoding(stream):
+    """Returns the default stream encoding if not found."""
+    rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
+    if is_ascii_encoding(rv):
+        return "utf-8"
+    return rv
+
+
+class _NonClosingTextIOWrapper(io.TextIOWrapper):
+    def __init__(
+        self,
+        stream,
+        encoding,
+        errors,
+        force_readable=False,
+        force_writable=False,
+        **extra
+    ):
+        self._stream = stream = _FixupStream(stream, force_readable, force_writable)
+        io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)
+
+    # The io module is a place where the Python 3 text behavior
+    # was forced upon Python 2, so we need to unbreak
+    # it to look like Python 2.
+    if PY2:
+
+        def write(self, x):
+            if isinstance(x, str) or is_bytes(x):
+                try:
+                    self.flush()
+                except Exception:
+                    pass
+                return self.buffer.write(str(x))
+            return io.TextIOWrapper.write(self, x)
+
+        def writelines(self, lines):
+            for line in lines:
+                self.write(line)
+
+    def __del__(self):
+        try:
+            self.detach()
+        except Exception:
+            pass
+
+    def isatty(self):
+        # https://bitbucket.org/pypy/pypy/issue/1803
+        return self._stream.isatty()
+
+
+class _FixupStream(object):
+    """The new io interface needs more from streams than streams
+    traditionally implement.  As such, this fix-up code is necessary in
+    some circumstances.
+
+    The forcing of readable and writable flags are there because some tools
+    put badly patched objects on sys (one such offender are certain version
+    of jupyter notebook).
+    """
+
+    def __init__(self, stream, force_readable=False, force_writable=False):
+        self._stream = stream
+        self._force_readable = force_readable
+        self._force_writable = force_writable
+
+    def __getattr__(self, name):
+        return getattr(self._stream, name)
+
+    def read1(self, size):
+        f = getattr(self._stream, "read1", None)
+        if f is not None:
+            return f(size)
+        # We only dispatch to readline instead of read in Python 2 as we
+        # do not want cause problems with the different implementation
+        # of line buffering.
+        if PY2:
+            return self._stream.readline(size)
+        return self._stream.read(size)
+
+    def readable(self):
+        if self._force_readable:
+            return True
+        x = getattr(self._stream, "readable", None)
+        if x is not None:
+            return x()
+        try:
+            self._stream.read(0)
+        except Exception:
+            return False
+        return True
+
+    def writable(self):
+        if self._force_writable:
+            return True
+        x = getattr(self._stream, "writable", None)
+        if x is not None:
+            return x()
+        try:
+            self._stream.write("")
+        except Exception:
+            try:
+                self._stream.write(b"")
+            except Exception:
+                return False
+        return True
+
+    def seekable(self):
+        x = getattr(self._stream, "seekable", None)
+        if x is not None:
+            return x()
+        try:
+            self._stream.seek(self._stream.tell())
+        except Exception:
+            return False
+        return True
+
+
+if PY2:
+    text_type = unicode
+    raw_input = raw_input
+    string_types = (str, unicode)
+    int_types = (int, long)
+    iteritems = lambda x: x.iteritems()
+    range_type = xrange
+
+    def is_bytes(x):
+        return isinstance(x, (buffer, bytearray))
+
+    _identifier_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
+
+    # For Windows, we need to force stdout/stdin/stderr to binary if it's
+    # fetched for that.  This obviously is not the most correct way to do
+    # it as it changes global state.  Unfortunately, there does not seem to
+    # be a clear better way to do it as just reopening the file in binary
+    # mode does not change anything.
+    #
+    # An option would be to do what Python 3 does and to open the file as
+    # binary only, patch it back to the system, and then use a wrapper
+    # stream that converts newlines.  It's not quite clear what's the
+    # correct option here.
+    #
+    # This code also lives in _winconsole for the fallback to the console
+    # emulation stream.
+    #
+    # There are also Windows environments where the `msvcrt` module is not
+    # available (which is why we use try-catch instead of the WIN variable
+    # here), such as the Google App Engine development server on Windows. In
+    # those cases there is just nothing we can do.
+    def set_binary_mode(f):
+        return f
+
+    try:
+        import msvcrt
+    except ImportError:
+        pass
+    else:
+
+        def set_binary_mode(f):
+            try:
+                fileno = f.fileno()
+            except Exception:
+                pass
+            else:
+                msvcrt.setmode(fileno, os.O_BINARY)
+            return f
+
+    try:
+        import fcntl
+    except ImportError:
+        pass
+    else:
+
+        def set_binary_mode(f):
+            try:
+                fileno = f.fileno()
+            except Exception:
+                pass
+            else:
+                flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
+                fcntl.fcntl(fileno, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
+            return f
+
+    def isidentifier(x):
+        return _identifier_re.search(x) is not None
+
+    def get_binary_stdin():
+        return set_binary_mode(sys.stdin)
+
+    def get_binary_stdout():
+        _wrap_std_stream("stdout")
+        return set_binary_mode(sys.stdout)
+
+    def get_binary_stderr():
+        _wrap_std_stream("stderr")
+        return set_binary_mode(sys.stderr)
+
+    def get_text_stdin(encoding=None, errors=None):
+        rv = _get_windows_console_stream(sys.stdin, encoding, errors)
+        if rv is not None:
+            return rv
+        return _make_text_stream(sys.stdin, encoding, errors, force_readable=True)
+
+    def get_text_stdout(encoding=None, errors=None):
+        _wrap_std_stream("stdout")
+        rv = _get_windows_console_stream(sys.stdout, encoding, errors)
+        if rv is not None:
+            return rv
+        return _make_text_stream(sys.stdout, encoding, errors, force_writable=True)
+
+    def get_text_stderr(encoding=None, errors=None):
+        _wrap_std_stream("stderr")
+        rv = _get_windows_console_stream(sys.stderr, encoding, errors)
+        if rv is not None:
+            return rv
+        return _make_text_stream(sys.stderr, encoding, errors, force_writable=True)
+
+    def filename_to_ui(value):
+        if isinstance(value, bytes):
+            value = value.decode(get_filesystem_encoding(), "replace")
+        return value
+
+
+else:
+    import io
+
+    text_type = str
+    raw_input = input
+    string_types = (str,)
+    int_types = (int,)
+    range_type = range
+    isidentifier = lambda x: x.isidentifier()
+    iteritems = lambda x: iter(x.items())
+
+    def is_bytes(x):
+        return isinstance(x, (bytes, memoryview, bytearray))
+
+    def _is_binary_reader(stream, default=False):
+        try:
+            return isinstance(stream.read(0), bytes)
+        except Exception:
+            return default
+            # This happens in some cases where the stream was already
+            # closed.  In this case, we assume the default.
+
+    def _is_binary_writer(stream, default=False):
+        try:
+            stream.write(b"")
+        except Exception:
+            try:
+                stream.write("")
+                return False
+            except Exception:
+                pass
+            return default
+        return True
+
+    def _find_binary_reader(stream):
+        # We need to figure out if the given stream is already binary.
+        # This can happen because the official docs recommend detaching
+        # the streams to get binary streams.  Some code might do this, so
+        # we need to deal with this case explicitly.
+        if _is_binary_reader(stream, False):
+            return stream
+
+        buf = getattr(stream, "buffer", None)
+
+        # Same situation here; this time we assume that the buffer is
+        # actually binary in case it's closed.
+        if buf is not None and _is_binary_reader(buf, True):
+            return buf
+
+    def _find_binary_writer(stream):
+        # We need to figure out if the given stream is already binary.
+        # This can happen because the official docs recommend detatching
+        # the streams to get binary streams.  Some code might do this, so
+        # we need to deal with this case explicitly.
+        if _is_binary_writer(stream, False):
+            return stream
+
+        buf = getattr(stream, "buffer", None)
+
+        # Same situation here; this time we assume that the buffer is
+        # actually binary in case it's closed.
+        if buf is not None and _is_binary_writer(buf, True):
+            return buf
+
+    def _stream_is_misconfigured(stream):
+        """A stream is misconfigured if its encoding is ASCII."""
+        # If the stream does not have an encoding set, we assume it's set
+        # to ASCII.  This appears to happen in certain unittest
+        # environments.  It's not quite clear what the correct behavior is
+        # but this at least will force Click to recover somehow.
+        return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
+
+    def _is_compat_stream_attr(stream, attr, value):
+        """A stream attribute is compatible if it is equal to the
+        desired value or the desired value is unset and the attribute
+        has a value.
+        """
+        stream_value = getattr(stream, attr, None)
+        return stream_value == value or (value is None and stream_value is not None)
+
+    def _is_compatible_text_stream(stream, encoding, errors):
+        """Check if a stream's encoding and errors attributes are
+        compatible with the desired values.
+        """
+        return _is_compat_stream_attr(
+            stream, "encoding", encoding
+        ) and _is_compat_stream_attr(stream, "errors", errors)
+
+    def _force_correct_text_stream(
+        text_stream,
+        encoding,
+        errors,
+        is_binary,
+        find_binary,
+        force_readable=False,
+        force_writable=False,
+    ):
+        if is_binary(text_stream, False):
+            binary_reader = text_stream
+        else:
+            # If the stream looks compatible, and won't default to a
+            # misconfigured ascii encoding, return it as-is.
+            if _is_compatible_text_stream(text_stream, encoding, errors) and not (
+                encoding is None and _stream_is_misconfigured(text_stream)
+            ):
+                return text_stream
+
+            # Otherwise, get the underlying binary reader.
+            binary_reader = find_binary(text_stream)
+
+            # If that's not possible, silently use the original reader
+            # and get mojibake instead of exceptions.
+            if binary_reader is None:
+                return text_stream
+
+        # Default errors to replace instead of strict in order to get
+        # something that works.
+        if errors is None:
+            errors = "replace"
+
+        # Wrap the binary stream in a text stream with the correct
+        # encoding parameters.
+        return _make_text_stream(
+            binary_reader,
+            encoding,
+            errors,
+            force_readable=force_readable,
+            force_writable=force_writable,
+        )
+
+    def _force_correct_text_reader(text_reader, encoding, errors, force_readable=False):
+        return _force_correct_text_stream(
+            text_reader,
+            encoding,
+            errors,
+            _is_binary_reader,
+            _find_binary_reader,
+            force_readable=force_readable,
+        )
+
+    def _force_correct_text_writer(text_writer, encoding, errors, force_writable=False):
+        return _force_correct_text_stream(
+            text_writer,
+            encoding,
+            errors,
+            _is_binary_writer,
+            _find_binary_writer,
+            force_writable=force_writable,
+        )
+
+    def get_binary_stdin():
+        reader = _find_binary_reader(sys.stdin)
+        if reader is None:
+            raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
+        return reader
+
+    def get_binary_stdout():
+        writer = _find_binary_writer(sys.stdout)
+        if writer is None:
+            raise RuntimeError(
+                "Was not able to determine binary stream for sys.stdout."
+            )
+        return writer
+
+    def get_binary_stderr():
+        writer = _find_binary_writer(sys.stderr)
+        if writer is None:
+            raise RuntimeError(
+                "Was not able to determine binary stream for sys.stderr."
+            )
+        return writer
+
+    def get_text_stdin(encoding=None, errors=None):
+        rv = _get_windows_console_stream(sys.stdin, encoding, errors)
+        if rv is not None:
+            return rv
+        return _force_correct_text_reader(
+            sys.stdin, encoding, errors, force_readable=True
+        )
+
+    def get_text_stdout(encoding=None, errors=None):
+        rv = _get_windows_console_stream(sys.stdout, encoding, errors)
+        if rv is not None:
+            return rv
+        return _force_correct_text_writer(
+            sys.stdout, encoding, errors, force_writable=True
+        )
+
+    def get_text_stderr(encoding=None, errors=None):
+        rv = _get_windows_console_stream(sys.stderr, encoding, errors)
+        if rv is not None:
+            return rv
+        return _force_correct_text_writer(
+            sys.stderr, encoding, errors, force_writable=True
+        )
+
+    def filename_to_ui(value):
+        if isinstance(value, bytes):
+            value = value.decode(get_filesystem_encoding(), "replace")
+        else:
+            value = value.encode("utf-8", "surrogateescape").decode("utf-8", "replace")
+        return value
+
+
+def get_streerror(e, default=None):
+    if hasattr(e, "strerror"):
+        msg = e.strerror
+    else:
+        if default is not None:
+            msg = default
+        else:
+            msg = str(e)
+    if isinstance(msg, bytes):
+        msg = msg.decode("utf-8", "replace")
+    return msg
+
+
+def _wrap_io_open(file, mode, encoding, errors):
+    """On Python 2, :func:`io.open` returns a text file wrapper that
+    requires passing ``unicode`` to ``write``. Need to open the file in
+    binary mode then wrap it in a subclass that can write ``str`` and
+    ``unicode``.
+
+    Also handles not passing ``encoding`` and ``errors`` in binary mode.
+    """
+    binary = "b" in mode
+
+    if binary:
+        kwargs = {}
+    else:
+        kwargs = {"encoding": encoding, "errors": errors}
+
+    if not PY2 or binary:
+        return io.open(file, mode, **kwargs)
+
+    f = io.open(file, "{}b".format(mode.replace("t", "")))
+    return _make_text_stream(f, **kwargs)
+
+
+def open_stream(filename, mode="r", encoding=None, errors="strict", atomic=False):
+    binary = "b" in mode
+
+    # Standard streams first.  These are simple because they don't need
+    # special handling for the atomic flag.  It's entirely ignored.
+    if filename == "-":
+        if any(m in mode for m in ["w", "a", "x"]):
+            if binary:
+                return get_binary_stdout(), False
+            return get_text_stdout(encoding=encoding, errors=errors), False
+        if binary:
+            return get_binary_stdin(), False
+        return get_text_stdin(encoding=encoding, errors=errors), False
+
+    # Non-atomic writes directly go out through the regular open functions.
+    if not atomic:
+        return _wrap_io_open(filename, mode, encoding, errors), True
+
+    # Some usability stuff for atomic writes
+    if "a" in mode:
+        raise ValueError(
+            "Appending to an existing file is not supported, because that"
+            " would involve an expensive `copy`-operation to a temporary"
+            " file. Open the file in normal `w`-mode and copy explicitly"
+            " if that's what you're after."
+        )
+    if "x" in mode:
+        raise ValueError("Use the `overwrite`-parameter instead.")
+    if "w" not in mode:
+        raise ValueError("Atomic writes only make sense with `w`-mode.")
+
+    # Atomic writes are more complicated.  They work by opening a file
+    # as a proxy in the same folder and then using the fdopen
+    # functionality to wrap it in a Python file.  Then we wrap it in an
+    # atomic file that moves the file over on close.
+    import errno
+    import random
+
+    try:
+        perm = os.stat(filename).st_mode
+    except OSError:
+        perm = None
+
+    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
+
+    if binary:
+        flags |= getattr(os, "O_BINARY", 0)
+
+    while True:
+        tmp_filename = os.path.join(
+            os.path.dirname(filename),
+            ".__atomic-write{:08x}".format(random.randrange(1 << 32)),
+        )
+        try:
+            fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
+            break
+        except OSError as e:
+            if e.errno == errno.EEXIST or (
+                os.name == "nt"
+                and e.errno == errno.EACCES
+                and os.path.isdir(e.filename)
+                and os.access(e.filename, os.W_OK)
+            ):
+                continue
+            raise
+
+    if perm is not None:
+        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask
+
+    f = _wrap_io_open(fd, mode, encoding, errors)
+    return _AtomicFile(f, tmp_filename, os.path.realpath(filename)), True
+
+
+# Used in a destructor call, needs extra protection from interpreter cleanup.
+if hasattr(os, "replace"):
+    _replace = os.replace
+    _can_replace = True
+else:
+    _replace = os.rename
+    _can_replace = not WIN
+
+
+class _AtomicFile(object):
+    def __init__(self, f, tmp_filename, real_filename):
+        self._f = f
+        self._tmp_filename = tmp_filename
+        self._real_filename = real_filename
+        self.closed = False
+
+    @property
+    def name(self):
+        return self._real_filename
+
+    def close(self, delete=False):
+        if self.closed:
+            return
+        self._f.close()
+        if not _can_replace:
+            try:
+                os.remove(self._real_filename)
+            except OSError:
+                pass
+        _replace(self._tmp_filename, self._real_filename)
+        self.closed = True
+
+    def __getattr__(self, name):
+        return getattr(self._f, name)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, tb):
+        self.close(delete=exc_type is not None)
+
+    def __repr__(self):
+        return repr(self._f)
+
+
+auto_wrap_for_ansi = None
+colorama = None
+get_winterm_size = None
+
+
+def strip_ansi(value):
+    return _ansi_re.sub("", value)
+
+
+def _is_jupyter_kernel_output(stream):
+    if WIN:
+        # TODO: Couldn't test on Windows, should't try to support until
+        # someone tests the details wrt colorama.
+        return
+
+    while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
+        stream = stream._stream
+
+    return stream.__class__.__module__.startswith("ipykernel.")
+
+
+def should_strip_ansi(stream=None, color=None):
+    if color is None:
+        if stream is None:
+            stream = sys.stdin
+        return not isatty(stream) and not _is_jupyter_kernel_output(stream)
+    return not color
+
+
+# If we're on Windows, we provide transparent integration through
+# colorama.  This will make ANSI colors through the echo function
+# work automatically.
+if WIN:
+    # Windows has a smaller terminal
+    DEFAULT_COLUMNS = 79
+
+    from ._winconsole import _get_windows_console_stream, _wrap_std_stream
+
+    def _get_argv_encoding():
+        import locale
+
+        return locale.getpreferredencoding()
+
+    if PY2:
+
+        def raw_input(prompt=""):
+            sys.stderr.flush()
+            if prompt:
+                stdout = _default_text_stdout()
+                stdout.write(prompt)
+            stdin = _default_text_stdin()
+            return stdin.readline().rstrip("\r\n")
+
+    try:
+        import colorama
+    except ImportError:
+        pass
+    else:
+        _ansi_stream_wrappers = WeakKeyDictionary()
+
+        def auto_wrap_for_ansi(stream, color=None):
+            """This function wraps a stream so that calls through colorama
+            are issued to the win32 console API to recolor on demand.  It
+            also ensures to reset the colors if a write call is interrupted
+            to not destroy the console afterwards.
+            """
+            try:
+                cached = _ansi_stream_wrappers.get(stream)
+            except Exception:
+                cached = None
+            if cached is not None:
+                return cached
+            strip = should_strip_ansi(stream, color)
+            ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
+            rv = ansi_wrapper.stream
+            _write = rv.write
+
+            def _safe_write(s):
+                try:
+                    return _write(s)
+                except:
+                    ansi_wrapper.reset_all()
+                    raise
+
+            rv.write = _safe_write
+            try:
+                _ansi_stream_wrappers[stream] = rv
+            except Exception:
+                pass
+            return rv
+
+        def get_winterm_size():
+            win = colorama.win32.GetConsoleScreenBufferInfo(
+                colorama.win32.STDOUT
+            ).srWindow
+            return win.Right - win.Left, win.Bottom - win.Top
+
+
+else:
+
+    def _get_argv_encoding():
+        return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()
+
+    _get_windows_console_stream = lambda *x: None
+    _wrap_std_stream = lambda *x: None
+
+
+def term_len(x):
+    return len(strip_ansi(x))
+
+
+def isatty(stream):
+    try:
+        return stream.isatty()
+    except Exception:
+        return False
+
+
+def _make_cached_stream_func(src_func, wrapper_func):
+    cache = WeakKeyDictionary()
+
+    def func():
+        stream = src_func()
+        try:
+            rv = cache.get(stream)
+        except Exception:
+            rv = None
+        if rv is not None:
+            return rv
+        rv = wrapper_func()
+        try:
+            stream = src_func()  # In case wrapper_func() modified the stream
+            cache[stream] = rv
+        except Exception:
+            pass
+        return rv
+
+    return func
+
+
+_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
+_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
+_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
+
+
+binary_streams = {
+    "stdin": get_binary_stdin,
+    "stdout": get_binary_stdout,
+    "stderr": get_binary_stderr,
+}
+
+text_streams = {
+    "stdin": get_text_stdin,
+    "stdout": get_text_stdout,
+    "stderr": get_text_stderr,
+}