Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/click/testing.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 import contextlib | |
| 2 import os | |
| 3 import shlex | |
| 4 import shutil | |
| 5 import sys | |
| 6 import tempfile | |
| 7 | |
| 8 from . import formatting | |
| 9 from . import termui | |
| 10 from . import utils | |
| 11 from ._compat import iteritems | |
| 12 from ._compat import PY2 | |
| 13 from ._compat import string_types | |
| 14 | |
| 15 | |
| 16 if PY2: | |
| 17 from cStringIO import StringIO | |
| 18 else: | |
| 19 import io | |
| 20 from ._compat import _find_binary_reader | |
| 21 | |
| 22 | |
class EchoingStdin(object):
    """Proxy around an input stream that copies what is read to an output.

    Data obtained through :meth:`read`, :meth:`readline`, :meth:`readlines`
    or iteration is written to ``output`` before it is returned to the
    caller.  Any other attribute is looked up on the wrapped input stream.
    """

    def __init__(self, input, output):
        self._output = output
        self._input = input

    def __getattr__(self, x):
        # Only reached for attributes that are not defined on this class;
        # they are forwarded to the wrapped stream as-is (no echoing).
        wrapped = self._input
        return getattr(wrapped, x)

    def _echo(self, rv):
        """Write ``rv`` to the output stream and hand it back unchanged."""
        self._output.write(rv)
        return rv

    def read(self, n=-1):
        """Read up to ``n`` units from the input and echo them."""
        data = self._input.read(n)
        return self._echo(data)

    def readline(self, n=-1):
        """Read a single line from the input and echo it."""
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self):
        """Read all remaining lines, echoing each one in order."""
        echoed = []
        for line in self._input.readlines():
            echoed.append(self._echo(line))
        return echoed

    def __iter__(self):
        # A generator expression is already an iterator, so it can be
        # returned directly; each item is echoed as it is consumed.
        return (self._echo(line) for line in self._input)

    def __repr__(self):
        wrapped = self._input
        return repr(wrapped)
| 49 | |
| 50 | |
def make_input_stream(input, charset):
    """Turn ``input`` into a binary stream.

    :param input: ``None``, a bytes object, a text string, or an object
                  with a ``read`` attribute.
    :param charset: the encoding used to convert a non-bytes value.
    :raises TypeError: if ``input`` has a ``read`` attribute but no binary
                       reader can be found for it (non-PY2 only).
    """
    if not hasattr(input, "read"):
        # Plain data: normalise to bytes and wrap it in an in-memory stream.
        if input is None:
            data = b""
        elif isinstance(input, bytes):
            data = input
        else:
            data = input.encode(charset)
        return StringIO(data) if PY2 else io.BytesIO(data)

    # Is already an input stream.
    if PY2:
        return input
    reader = _find_binary_reader(input)
    if reader is None:
        raise TypeError("Could not find binary reader for input stream.")
    return reader
| 68 | |
| 69 | |
class Result(object):
    """Container for what was captured while invoking a CLI script."""

    def __init__(
        self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None
    ):
        #: The runner that created the result
        self.runner = runner
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or None if not available
        self.stderr_bytes = stderr_bytes

    @property
    def output(self):
        """The (standard) output as unicode string; same as :attr:`stdout`."""
        return self.stdout

    @property
    def stdout(self):
        """The standard output decoded with the runner's charset.

        Undecodable bytes are replaced and ``\\r\\n`` is normalised to
        ``\\n``.
        """
        text = self.stdout_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stderr(self):
        """The standard error decoded with the runner's charset.

        :raises ValueError: if ``stderr_bytes`` is ``None``.
        """
        raw = self.stderr_bytes
        if raw is None:
            raise ValueError("stderr not separately captured")
        text = raw.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    def __repr__(self):
        # A falsy ``exception`` is reported as "okay".
        if self.exception:
            summary = repr(self.exception)
        else:
            summary = "okay"
        return "<{} {}>".format(type(self).__name__, summary)
| 114 | |
| 115 | |
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True):
        if charset is None:
            charset = "utf-8"
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The result is a fresh dictionary: a copy of ``self.env`` updated
        with ``overrides`` (if any), so ``self.env`` is never mutated.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        The context manager yields a 2-tuple: the stream capturing stdout
        and, when ``mix_stderr`` is `False`, the stream capturing stderr
        (otherwise the second item is `False`).

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.  A value of
                    `None` removes the variable for the duration of the
                    context.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        input = make_input_stream(input, self.charset)
        env = self.make_env(env)

        # Snapshot every piece of global state that gets patched below so
        # the ``finally`` clause can always put it back.
        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi
        old_env = {}

        # All patching happens inside the ``try`` so that a failure during
        # setup (e.g. an unknown charset) does not leave the interpreter
        # with replaced streams or a forced terminal width.
        try:
            formatting.FORCED_WIDTH = 80

            if PY2:
                bytes_output = StringIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                sys.stdout = bytes_output
                if not self.mix_stderr:
                    bytes_error = StringIO()
                    sys.stderr = bytes_error
            else:
                bytes_output = io.BytesIO()
                if self.echo_stdin:
                    input = EchoingStdin(input, bytes_output)
                input = io.TextIOWrapper(input, encoding=self.charset)
                sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset)
                if not self.mix_stderr:
                    bytes_error = io.BytesIO()
                    sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset)

            if self.mix_stderr:
                sys.stderr = sys.stdout

            sys.stdin = input

            def visible_input(prompt=None):
                # Echo the typed value after the prompt, as a terminal would.
                sys.stdout.write(prompt or "")
                val = input.readline().rstrip("\r\n")
                sys.stdout.write("{}\n".format(val))
                sys.stdout.flush()
                return val

            def hidden_input(prompt=None):
                # Hidden prompts print the prompt only, never the value.
                sys.stdout.write("{}\n".format(prompt or ""))
                sys.stdout.flush()
                return input.readline().rstrip("\r\n")

            def _getchar(echo):
                char = sys.stdin.read(1)
                if echo:
                    sys.stdout.write(char)
                    sys.stdout.flush()
                return char

            # ``should_strip_ansi`` has its own ``color`` parameter, so the
            # context-wide default needs a separate name.
            default_color = color

            def should_strip_ansi(stream=None, color=None):
                if color is None:
                    return not default_color
                return not color

            termui.visible_prompt_func = visible_input
            termui.hidden_prompt_func = hidden_input
            termui._getchar = _getchar
            utils.should_strip_ansi = should_strip_ansi

            for key, value in iteritems(env):
                # Remember the previous value (``None`` meaning "unset").
                old_env[key] = os.environ.get(key)
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            yield (bytes_output, not self.mix_stderr and bytes_error)
        finally:
            for key, value in iteritems(old_env):
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli,
        args=None,
        input=None,
        env=None,
        catch_exceptions=True,
        color=False,
        **extra
    ):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke.  It may be given as an iterable
                     or a string.  When given as string it will be interpreted
                     as a Unix shell command.  More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.  A
                      ``prog_name`` entry overrides the default program
                      name.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            exception = None
            exit_code = 0

            if isinstance(args, string_types):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                exit_code = e.code
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                # A non-integer exit code is printed and reported as
                # exit code 1.
                if not isinstance(exit_code, int):
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write("\n")
                    exit_code = 1

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # The text wrappers may still hold pending data; flush
                # before reading the underlying byte buffers.
                sys.stdout.flush()
                stdout = outstreams[0].getvalue()
                if self.mix_stderr:
                    stderr = None
                else:
                    # stderr is a separate wrapper here and must be flushed
                    # too, otherwise unflushed writes would be missing from
                    # the captured bytes.
                    sys.stderr.flush()
                    stderr = outstreams[1].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,
        )

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        The path of the temporary folder is yielded.  On exit the previous
        working directory is restored and the folder is removed; errors
        while removing it are ignored.
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        try:
            # Inside the ``try`` so the folder is cleaned up even when
            # changing into it fails.
            os.chdir(t)
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):  # noqa: B014
                pass
