comparison env/lib/python3.9/site-packages/click/_compat.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # flake8: noqa
2 import codecs
3 import io
4 import os
5 import re
6 import sys
7 from weakref import WeakKeyDictionary
8
# Interpreter / platform detection flags used throughout this module.
PY2 = sys.version_info[0] == 2
CYGWIN = sys.platform.startswith("cygwin")
MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
# Determine local App Engine environment, per Google's own suggestion
APP_ENGINE = (
    "APPENGINE_RUNTIME" in os.environ
    and "Development/" in os.environ.get("SERVER_SOFTWARE", "")
)
# Plain Windows only: the App Engine dev server and MSYS2 are excluded.
WIN = sys.platform.startswith("win") and not (APP_ENGINE or MSYS2)
DEFAULT_COLUMNS = 80


# Matches an ANSI escape sequence: ESC, "[", parameter characters, one letter.
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
21
22
def get_filesystem_encoding():
    """Return the filesystem encoding, or the default encoding if it is unset."""
    fs_encoding = sys.getfilesystemencoding()
    if fs_encoding:
        return fs_encoding
    return sys.getdefaultencoding()
25
26
def _make_text_stream(
    stream, encoding, errors, force_readable=False, force_writable=False
):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A missing ``encoding`` is resolved with ``get_best_encoding(stream)`` and
    a missing ``errors`` falls back to ``"replace"``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    chosen_errors = "replace" if errors is None else errors
    return _NonClosingTextIOWrapper(
        stream,
        chosen_encoding,
        chosen_errors,
        line_buffering=True,
        force_readable=force_readable,
        force_writable=force_writable,
    )
42
43
def is_ascii_encoding(encoding):
    """Check whether ``encoding`` names the ASCII codec.

    Aliases are resolved through :func:`codecs.lookup`. Unknown codec names,
    as well as ``None`` or other non-string values, yield ``False`` instead
    of raising.
    """
    try:
        return codecs.lookup(encoding).name == "ascii"
    except (LookupError, TypeError):
        # LookupError: no such codec. TypeError: the argument was not a
        # string at all (e.g. a stream whose ``encoding`` is None).
        return False
50
51
def get_best_encoding(stream):
    """Return the encoding to use for ``stream``.

    Uses the stream's ``encoding`` attribute, falling back to the
    interpreter default when it is missing or empty. An ASCII result is
    replaced by ``"utf-8"``.
    """
    stream_encoding = getattr(stream, "encoding", None)
    candidate = stream_encoding or sys.getdefaultencoding()
    return "utf-8" if is_ascii_encoding(candidate) else candidate
58
59
class _NonClosingTextIOWrapper(io.TextIOWrapper):
    """Text wrapper that detaches from its stream on deletion.

    The wrapped stream is first adapted through ``_FixupStream``.
    """

    def __init__(
        self,
        stream,
        encoding,
        errors,
        force_readable=False,
        force_writable=False,
        **extra
    ):
        fixed_stream = _FixupStream(stream, force_readable, force_writable)
        self._stream = fixed_stream
        io.TextIOWrapper.__init__(self, fixed_stream, encoding, errors, **extra)

    # The io module is a place where the Python 3 text behavior
    # was forced upon Python 2, so we need to unbreak
    # it to look like Python 2.
    if PY2:

        def write(self, x):
            """Write ``x``; byte-like values go straight to the buffer."""
            if not (isinstance(x, str) or is_bytes(x)):
                return io.TextIOWrapper.write(self, x)
            # Best-effort flush of pending text before the raw write.
            try:
                self.flush()
            except Exception:
                pass
            return self.buffer.write(str(x))

        def writelines(self, lines):
            """Write every item of ``lines`` through :meth:`write`."""
            for line in lines:
                self.write(line)

    def __del__(self):
        # Detach rather than close; failures are ignored.
        try:
            self.detach()
        except Exception:
            pass

    def isatty(self):
        """Ask the wrapped stream directly."""
        # https://bitbucket.org/pypy/pypy/issue/1803
        return self._stream.isatty()
100
101
102 class _FixupStream(object):
103 """The new io interface needs more from streams than streams
104 traditionally implement. As such, this fix-up code is necessary in
105 some circumstances.
106
107 The forcing of readable and writable flags are there because some tools
108 put badly patched objects on sys (one such offender are certain version
109 of jupyter notebook).
110 """
111
112 def __init__(self, stream, force_readable=False, force_writable=False):
113 self._stream = stream
114 self._force_readable = force_readable
115 self._force_writable = force_writable
116
117 def __getattr__(self, name):
118 return getattr(self._stream, name)
119
120 def read1(self, size):
121 f = getattr(self._stream, "read1", None)
122 if f is not None:
123 return f(size)
124 # We only dispatch to readline instead of read in Python 2 as we
125 # do not want cause problems with the different implementation
126 # of line buffering.
127 if PY2:
128 return self._stream.readline(size)
129 return self._stream.read(size)
130
131 def readable(self):
132 if self._force_readable:
133 return True
134 x = getattr(self._stream, "readable", None)
135 if x is not None:
136 return x()
137 try:
138 self._stream.read(0)
139 except Exception:
140 return False
141 return True
142
143 def writable(self):
144 if self._force_writable:
145 return True
146 x = getattr(self._stream, "writable", None)
147 if x is not None:
148 return x()
149 try:
150 self._stream.write("")
151 except Exception:
152 try:
153 self._stream.write(b"")
154 except Exception:
155 return False
156 return True
157
158 def seekable(self):
159 x = getattr(self._stream, "seekable", None)
160 if x is not None:
161 return x()
162 try:
163 self._stream.seek(self._stream.tell())
164 except Exception:
165 return False
166 return True
167
168
169 if PY2:
# Python 2 spellings of the compatibility aliases.
text_type = unicode
raw_input = raw_input
string_types = (str, unicode)
int_types = (int, long)
range_type = xrange


def iteritems(x):
    """Iterate over the ``(key, value)`` pairs of mapping ``x``."""
    return x.iteritems()
176
def is_bytes(x):
    """Return True if ``x`` is a ``buffer`` or ``bytearray`` (Python 2)."""
    byte_like_types = (buffer, bytearray)
    return isinstance(x, byte_like_types)
179
# A letter or underscore followed by letters, digits or underscores;
# used by the Python 2 ``isidentifier`` helper.
_identifier_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
181
# For Windows, we need to force stdout/stdin/stderr to binary if it's
# fetched for that.  This obviously is not the most correct way to do
# it as it changes global state.  Unfortunately, there does not seem to
# be a clearly better way to do it, as just reopening the file in binary
# mode does not change anything.
#
# An option would be to do what Python 3 does and to open the file as
# binary only, patch it back to the system, and then use a wrapper
# stream that converts newlines.  It's not quite clear what the
# correct option is here.
#
# This code also lives in _winconsole for the fallback to the console
# emulation stream.
#
# There are also Windows environments where the `msvcrt` module is not
# available (which is why we use try/except instead of the WIN variable
# here), such as the Google App Engine development server on Windows.  In
# those cases there is just nothing we can do.
def set_binary_mode(f):
    """Fallback: return ``f`` unchanged."""
    return f

# Each later definition replaces the earlier one when its module imports.
try:
    import msvcrt
except ImportError:
    pass
else:

    def set_binary_mode(f):
        """Switch ``f``'s descriptor to ``os.O_BINARY`` mode and return ``f``."""
        try:
            fileno = f.fileno()
        except Exception:
            # No usable descriptor: leave the stream untouched.
            pass
        else:
            msvcrt.setmode(fileno, os.O_BINARY)
        return f

try:
    import fcntl
except ImportError:
    pass
else:

    def set_binary_mode(f):
        """Clear ``os.O_NONBLOCK`` on ``f``'s descriptor and return ``f``."""
        try:
            fileno = f.fileno()
        except Exception:
            # No usable descriptor: leave the stream untouched.
            pass
        else:
            flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
            fcntl.fcntl(fileno, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
        return f
233
def isidentifier(x):
    """Return True if ``x`` matches the identifier pattern (Python 2)."""
    match = _identifier_re.search(x)
    return match is not None
236
def get_binary_stdin():
    """Return ``sys.stdin`` after passing it through ``set_binary_mode`` (Python 2)."""
    stdin = sys.stdin
    return set_binary_mode(stdin)
239
def get_binary_stdout():
    """Return ``sys.stdout`` after passing it through ``set_binary_mode`` (Python 2)."""
    _wrap_std_stream("stdout")
    # Read sys.stdout only after the wrap call above has run.
    stdout = sys.stdout
    return set_binary_mode(stdout)
243
def get_binary_stderr():
    """Return ``sys.stderr`` after passing it through ``set_binary_mode`` (Python 2)."""
    _wrap_std_stream("stderr")
    # Read sys.stderr only after the wrap call above has run.
    stderr = sys.stderr
    return set_binary_mode(stderr)
247
def get_text_stdin(encoding=None, errors=None):
    """Return a text stream for stdin (Python 2).

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stdin`` is wrapped as a forced-readable text stream.
    """
    console_stream = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console_stream is None:
        return _make_text_stream(sys.stdin, encoding, errors, force_readable=True)
    return console_stream
253
def get_text_stdout(encoding=None, errors=None):
    """Return a text stream for stdout (Python 2).

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stdout`` is wrapped as a forced-writable text stream.
    """
    _wrap_std_stream("stdout")
    console_stream = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console_stream is None:
        return _make_text_stream(sys.stdout, encoding, errors, force_writable=True)
    return console_stream
260
def get_text_stderr(encoding=None, errors=None):
    """Return a text stream for stderr (Python 2).

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stderr`` is wrapped as a forced-writable text stream.
    """
    _wrap_std_stream("stderr")
    console_stream = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console_stream is None:
        return _make_text_stream(sys.stderr, encoding, errors, force_writable=True)
    return console_stream
267
def filename_to_ui(value):
    """Return ``value`` for display, decoding bytes leniently (Python 2)."""
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), "replace")
272
273
274 else:
275 import io
276
# Python 3 spellings of the compatibility aliases.
text_type = str
raw_input = input
string_types = (str,)
int_types = (int,)
range_type = range


def isidentifier(x):
    """Return True if the string ``x`` is a valid identifier."""
    return x.isidentifier()


def iteritems(x):
    """Iterate over the ``(key, value)`` pairs of mapping ``x``."""
    return iter(x.items())
284
def is_bytes(x):
    """Return True if ``x`` is ``bytes``, ``memoryview`` or ``bytearray``."""
    byte_like_types = (bytes, memoryview, bytearray)
    return isinstance(x, byte_like_types)
287
288 def _is_binary_reader(stream, default=False):
289 try:
290 return isinstance(stream.read(0), bytes)
291 except Exception:
292 return default
293 # This happens in some cases where the stream was already
294 # closed. In this case, we assume the default.
295
296 def _is_binary_writer(stream, default=False):
297 try:
298 stream.write(b"")
299 except Exception:
300 try:
301 stream.write("")
302 return False
303 except Exception:
304 pass
305 return default
306 return True
307
def _find_binary_reader(stream):
    """Return a binary reader for ``stream``, or None if none is found."""
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams.  Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_reader(stream, False):
        return stream

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    candidate = getattr(stream, "buffer", None)
    if candidate is None or not _is_binary_reader(candidate, True):
        return None
    return candidate
322
def _find_binary_writer(stream):
    """Return a binary writer for ``stream``, or None if none is found."""
    # We need to figure out if the given stream is already binary.
    # This can happen because the official docs recommend detaching
    # the streams to get binary streams.  Some code might do this, so
    # we need to deal with this case explicitly.
    if _is_binary_writer(stream, False):
        return stream

    # Same situation here; this time we assume that the buffer is
    # actually binary in case it's closed.
    candidate = getattr(stream, "buffer", None)
    if candidate is None or not _is_binary_writer(candidate, True):
        return None
    return candidate
337
def _stream_is_misconfigured(stream):
    """A stream is misconfigured if its encoding is ASCII."""
    # If the stream does not have an encoding set, we assume it's set
    # to ASCII.  This appears to happen in certain unittest
    # environments.  It's not quite clear what the correct behavior is
    # but this at least will force Click to recover somehow.
    declared = getattr(stream, "encoding", None)
    return is_ascii_encoding(declared or "ascii")
345
346 def _is_compat_stream_attr(stream, attr, value):
347 """A stream attribute is compatible if it is equal to the
348 desired value or the desired value is unset and the attribute
349 has a value.
350 """
351 stream_value = getattr(stream, attr, None)
352 return stream_value == value or (value is None and stream_value is not None)
353
def _is_compatible_text_stream(stream, encoding, errors):
    """Return True if both the ``encoding`` and ``errors`` attributes of
    ``stream`` are compatible with the desired values.
    """
    if not _is_compat_stream_attr(stream, "encoding", encoding):
        return False
    return _is_compat_stream_attr(stream, "errors", errors)
361
def _force_correct_text_stream(
    text_stream,
    encoding,
    errors,
    is_binary,
    find_binary,
    force_readable=False,
    force_writable=False,
):
    """Return a text stream with the requested encoding settings.

    ``is_binary`` and ``find_binary`` are the reader or writer flavoured
    probes. A stream that already looks right is returned unchanged;
    otherwise its binary layer is re-wrapped via ``_make_text_stream``.
    """
    binary_stream = text_stream

    if not is_binary(text_stream, False):
        # If the stream looks compatible, and won't default to a
        # misconfigured ascii encoding, return it as-is.
        usable_as_is = _is_compatible_text_stream(text_stream, encoding, errors) and (
            encoding is not None or not _stream_is_misconfigured(text_stream)
        )
        if usable_as_is:
            return text_stream

        # Otherwise, get the underlying binary stream.
        binary_stream = find_binary(text_stream)

        # If that's not possible, silently use the original stream
        # and get mojibake instead of exceptions.
        if binary_stream is None:
            return text_stream

    # Default errors to replace instead of strict in order to get
    # something that works.
    effective_errors = "replace" if errors is None else errors

    # Wrap the binary stream in a text stream with the correct
    # encoding parameters.
    return _make_text_stream(
        binary_stream,
        encoding,
        effective_errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )
403
def _force_correct_text_reader(text_reader, encoding, errors, force_readable=False):
    """Reader flavour of ``_force_correct_text_stream``."""
    return _force_correct_text_stream(
        text_reader,
        encoding,
        errors,
        is_binary=_is_binary_reader,
        find_binary=_find_binary_reader,
        force_readable=force_readable,
    )
413
def _force_correct_text_writer(text_writer, encoding, errors, force_writable=False):
    """Writer flavour of ``_force_correct_text_stream``."""
    return _force_correct_text_stream(
        text_writer,
        encoding,
        errors,
        is_binary=_is_binary_writer,
        find_binary=_find_binary_writer,
        force_writable=force_writable,
    )
423
def get_binary_stdin():
    """Return the binary reader behind ``sys.stdin``.

    Raises:
        RuntimeError: if no binary stream can be determined.
    """
    binary_stdin = _find_binary_reader(sys.stdin)
    if binary_stdin is not None:
        return binary_stdin
    raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
429
def get_binary_stdout():
    """Return the binary writer behind ``sys.stdout``.

    Raises:
        RuntimeError: if no binary stream can be determined.
    """
    binary_stdout = _find_binary_writer(sys.stdout)
    if binary_stdout is not None:
        return binary_stdout
    raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
437
def get_binary_stderr():
    """Return the binary writer behind ``sys.stderr``.

    Raises:
        RuntimeError: if no binary stream can be determined.
    """
    binary_stderr = _find_binary_writer(sys.stderr)
    if binary_stderr is not None:
        return binary_stderr
    raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
445
def get_text_stdin(encoding=None, errors=None):
    """Return a text stream for stdin.

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stdin`` is corrected as a forced-readable text reader.
    """
    console_stream = _get_windows_console_stream(sys.stdin, encoding, errors)
    if console_stream is None:
        return _force_correct_text_reader(
            sys.stdin, encoding, errors, force_readable=True
        )
    return console_stream
453
def get_text_stdout(encoding=None, errors=None):
    """Return a text stream for stdout.

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stdout`` is corrected as a forced-writable text writer.
    """
    console_stream = _get_windows_console_stream(sys.stdout, encoding, errors)
    if console_stream is None:
        return _force_correct_text_writer(
            sys.stdout, encoding, errors, force_writable=True
        )
    return console_stream
461
def get_text_stderr(encoding=None, errors=None):
    """Return a text stream for stderr.

    A Windows console stream is preferred when one is available;
    otherwise ``sys.stderr`` is corrected as a forced-writable text writer.
    """
    console_stream = _get_windows_console_stream(sys.stderr, encoding, errors)
    if console_stream is None:
        return _force_correct_text_writer(
            sys.stderr, encoding, errors, force_writable=True
        )
    return console_stream
469
def filename_to_ui(value):
    """Return ``value`` as text that is safe to display.

    Bytes are decoded with the filesystem encoding. Text is round-tripped
    through UTF-8 so that surrogate escapes become replacement characters.
    """
    if isinstance(value, bytes):
        return value.decode(get_filesystem_encoding(), "replace")
    raw = value.encode("utf-8", "surrogateescape")
    return raw.decode("utf-8", "replace")
476
477
def get_streerror(e, default=None):
    """Return a human-readable message for exception ``e``.

    The exception's ``strerror`` is used when it is set. Otherwise
    ``default`` is used if given, and ``str(e)`` as a last resort. This
    fallback also applies when ``strerror`` exists but is ``None`` (for
    example ``OSError("msg")``), so the result is never ``None``.
    Byte messages are decoded as UTF-8 with replacement.
    """
    msg = getattr(e, "strerror", None)
    if msg is None:
        msg = default if default is not None else str(e)
    if isinstance(msg, bytes):
        msg = msg.decode("utf-8", "replace")
    return msg
489
490
def _wrap_io_open(file, mode, encoding, errors):
    """Open ``file`` with :func:`io.open`, papering over Python 2 quirks.

    On Python 2, :func:`io.open` returns a text file wrapper that
    requires passing ``unicode`` to ``write``. Text files are therefore
    opened in binary mode and wrapped in a subclass that can write both
    ``str`` and ``unicode``.

    ``encoding`` and ``errors`` are not passed on in binary mode.
    """
    binary = "b" in mode
    kwargs = {} if binary else {"encoding": encoding, "errors": errors}

    if binary or not PY2:
        return io.open(file, mode, **kwargs)

    # Python 2 text mode: open the raw bytes, then wrap them ourselves.
    raw_file = io.open(file, "{}b".format(mode.replace("t", "")))
    return _make_text_stream(raw_file, **kwargs)
511
512
def open_stream(filename, mode="r", encoding=None, errors="strict", atomic=False):
    """Open ``filename`` and return ``(stream, flag)``.

    ``"-"`` selects a standard stream: stdout for ``w``/``a``/``x`` modes,
    stdin otherwise. The flag is False for those and True for files opened
    here (presumably "the caller should close it"; confirm against callers).

    With ``atomic=True`` the data goes to a temporary file in the same
    directory, wrapped in ``_AtomicFile``. If preparing that file fails,
    the temporary file is removed again.

    Raises:
        ValueError: if ``atomic`` is combined with a mode other than ``w``.
    """
    binary = "b" in mode

    # Standard streams first.  These are simple because they don't need
    # special handling for the atomic flag.  It's entirely ignored.
    if filename == "-":
        if any(m in mode for m in ["w", "a", "x"]):
            if binary:
                return get_binary_stdout(), False
            return get_text_stdout(encoding=encoding, errors=errors), False
        if binary:
            return get_binary_stdin(), False
        return get_text_stdin(encoding=encoding, errors=errors), False

    # Non-atomic writes directly go out through the regular open functions.
    if not atomic:
        return _wrap_io_open(filename, mode, encoding, errors), True

    # Some usability stuff for atomic writes
    if "a" in mode:
        raise ValueError(
            "Appending to an existing file is not supported, because that"
            " would involve an expensive `copy`-operation to a temporary"
            " file. Open the file in normal `w`-mode and copy explicitly"
            " if that's what you're after."
        )
    if "x" in mode:
        raise ValueError("Use the `overwrite`-parameter instead.")
    if "w" not in mode:
        raise ValueError("Atomic writes only make sense with `w`-mode.")

    # Atomic writes are more complicated.  They work by opening a file
    # as a proxy in the same folder and then using the fdopen
    # functionality to wrap it in a Python file.  Then we wrap it in an
    # atomic file that moves the file over on close.
    import errno
    import random

    try:
        perm = os.stat(filename).st_mode
    except OSError:
        perm = None

    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL

    if binary:
        flags |= getattr(os, "O_BINARY", 0)

    while True:
        tmp_filename = os.path.join(
            os.path.dirname(filename),
            ".__atomic-write{:08x}".format(random.randrange(1 << 32)),
        )
        try:
            fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
            break
        except OSError as e:
            # Name collision (or its Windows equivalent): pick another name.
            if e.errno == errno.EEXIST or (
                os.name == "nt"
                and e.errno == errno.EACCES
                and os.path.isdir(e.filename)
                and os.access(e.filename, os.W_OK)
            ):
                continue
            raise

    # Until the descriptor is handed to io.open we are responsible for it.
    fd_handed_over = False
    try:
        if perm is not None:
            os.chmod(tmp_filename, perm)  # in case perm includes bits in umask

        fd_handed_over = True
        f = _wrap_io_open(fd, mode, encoding, errors)
    except BaseException:
        # Don't leave the descriptor or the temp file behind.
        # NOTE(review): after hand-over we rely on io.open to close the
        # descriptor when it fails -- confirm for the targeted runtimes.
        if not fd_handed_over:
            try:
                os.close(fd)
            except OSError:
                pass
        try:
            os.remove(tmp_filename)
        except OSError:
            pass
        raise

    return _AtomicFile(f, tmp_filename, os.path.realpath(filename)), True
584
585
586 # Used in a destructor call, needs extra protection from interpreter cleanup.
587 if hasattr(os, "replace"):
588 _replace = os.replace
589 _can_replace = True
590 else:
591 _replace = os.rename
592 _can_replace = not WIN
593
594
595 class _AtomicFile(object):
596 def __init__(self, f, tmp_filename, real_filename):
597 self._f = f
598 self._tmp_filename = tmp_filename
599 self._real_filename = real_filename
600 self.closed = False
601
602 @property
603 def name(self):
604 return self._real_filename
605
606 def close(self, delete=False):
607 if self.closed:
608 return
609 self._f.close()
610 if not _can_replace:
611 try:
612 os.remove(self._real_filename)
613 except OSError:
614 pass
615 _replace(self._tmp_filename, self._real_filename)
616 self.closed = True
617
618 def __getattr__(self, name):
619 return getattr(self._f, name)
620
621 def __enter__(self):
622 return self
623
624 def __exit__(self, exc_type, exc_value, tb):
625 self.close(delete=exc_type is not None)
626
627 def __repr__(self):
628 return repr(self._f)
629
630
# Defaults for the optional colorama integration.  They stay None unless
# the Windows-only section below rebinds them after importing colorama.
auto_wrap_for_ansi = None
colorama = None
get_winterm_size = None
634
635
def strip_ansi(value):
    """Return ``value`` with ANSI escape sequences removed."""
    stripped = _ansi_re.sub("", value)
    return stripped
638
639
def _is_jupyter_kernel_output(stream):
    """Return True if ``stream``, once unwrapped, comes from ``ipykernel``.

    Returns None on Windows, where the check is skipped.
    """
    if WIN:
        # TODO: Couldn't test on Windows, shouldn't try to support until
        # someone tests the details wrt colorama.
        return None

    # Peel off our own wrapper layers to reach the original stream.
    wrapper_types = (_FixupStream, _NonClosingTextIOWrapper)
    while isinstance(stream, wrapper_types):
        stream = stream._stream

    module_name = stream.__class__.__module__
    return module_name.startswith("ipykernel.")
650
651
def should_strip_ansi(stream=None, color=None):
    """Decide whether ANSI codes should be stripped.

    An explicit ``color`` wins: strip exactly when it is falsy. Otherwise
    strip unless ``stream`` (``sys.stdin`` when omitted) is a TTY or a
    Jupyter kernel output.
    """
    if color is not None:
        return not color
    if stream is None:
        stream = sys.stdin
    return not (isatty(stream) or _is_jupyter_kernel_output(stream))
658
659
660 # If we're on Windows, we provide transparent integration through
661 # colorama. This will make ANSI colors through the echo function
662 # work automatically.
663 if WIN:
664 # Windows has a smaller terminal
665 DEFAULT_COLUMNS = 79
666
667 from ._winconsole import _get_windows_console_stream, _wrap_std_stream
668
669 def _get_argv_encoding():
670 import locale
671
672 return locale.getpreferredencoding()
673
674 if PY2:
675
def raw_input(prompt=""):
    """Prompt on the default text stdout and read one line from stdin.

    The trailing line ending is stripped (Python 2 on Windows variant).
    """
    sys.stderr.flush()
    if prompt:
        _default_text_stdout().write(prompt)
    line = _default_text_stdin().readline()
    return line.rstrip("\r\n")
683
684 try:
685 import colorama
686 except ImportError:
687 pass
688 else:
689 _ansi_stream_wrappers = WeakKeyDictionary()
690
def auto_wrap_for_ansi(stream, color=None):
    """This function wraps a stream so that calls through colorama
    are issued to the win32 console API to recolor on demand.  It
    also ensures that the colors are reset if a write call is
    interrupted, so the console is not left in a broken state.
    """
    # Reuse an existing wrapper for this stream when there is one.
    try:
        existing = _ansi_stream_wrappers.get(stream)
    except Exception:
        existing = None
    if existing is not None:
        return existing

    ansi_wrapper = colorama.AnsiToWin32(
        stream, strip=should_strip_ansi(stream, color)
    )
    wrapped = ansi_wrapper.stream
    original_write = wrapped.write

    def _safe_write(s):
        try:
            return original_write(s)
        except:
            # Reset the colors on any interruption, then propagate it.
            ansi_wrapper.reset_all()
            raise

    wrapped.write = _safe_write
    # Caching is best-effort; a stream that cannot be a key is skipped.
    try:
        _ansi_stream_wrappers[stream] = wrapped
    except Exception:
        pass
    return wrapped
721
def get_winterm_size():
    """Return ``(Right - Left, Bottom - Top)`` of the console window rect."""
    buffer_info = colorama.win32.GetConsoleScreenBufferInfo(colorama.win32.STDOUT)
    window = buffer_info.srWindow
    width = window.Right - window.Left
    height = window.Bottom - window.Top
    return width, height
727
728
729 else:
730
def _get_argv_encoding():
    """Return stdin's encoding, else the filesystem encoding (non-Windows)."""
    stdin_encoding = getattr(sys.stdin, "encoding", None)
    if stdin_encoding:
        return stdin_encoding
    return get_filesystem_encoding()
733
734 _get_windows_console_stream = lambda *x: None
735 _wrap_std_stream = lambda *x: None
736
737
def term_len(x):
    """Return the length of ``x`` after its ANSI codes are stripped."""
    visible_text = strip_ansi(x)
    return len(visible_text)
740
741
def isatty(stream):
    """Return ``stream.isatty()``, or False if that call fails."""
    try:
        result = stream.isatty()
    except Exception:
        return False
    return result
747
748
749 def _make_cached_stream_func(src_func, wrapper_func):
750 cache = WeakKeyDictionary()
751
752 def func():
753 stream = src_func()
754 try:
755 rv = cache.get(stream)
756 except Exception:
757 rv = None
758 if rv is not None:
759 return rv
760 rv = wrapper_func()
761 try:
762 stream = src_func() # In case wrapper_func() modified the stream
763 cache[stream] = rv
764 except Exception:
765 pass
766 return rv
767
768 return func
769
770
# Cached accessors for the text versions of the standard streams.  The
# lambdas read sys.std* at call time, so a replaced stream is picked up.
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
774
775
# Standard stream name -> factory returning its binary stream.
binary_streams = dict(
    stdin=get_binary_stdin,
    stdout=get_binary_stdout,
    stderr=get_binary_stderr,
)
781
# Standard stream name -> factory returning its text stream.
text_streams = dict(
    stdin=get_text_stdin,
    stdout=get_text_stdout,
    stderr=get_text_stderr,
)