comparison env/lib/python3.9/site-packages/click/testing.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import contextlib
2 import os
3 import shlex
4 import shutil
5 import sys
6 import tempfile
7
8 from . import formatting
9 from . import termui
10 from . import utils
11 from ._compat import iteritems
12 from ._compat import PY2
13 from ._compat import string_types
14
15
16 if PY2:
17 from cStringIO import StringIO
18 else:
19 import io
20 from ._compat import _find_binary_reader
21
22
class EchoingStdin(object):
    """Input-stream proxy that copies everything read from it to an output.

    Data returned by ``read``, ``readline``, ``readlines`` or by iterating
    over the proxy is first written to ``output`` and then handed back to
    the caller unchanged.  Any attribute not defined here is looked up on
    the wrapped input stream.
    """

    def __init__(self, input, output):
        self._input = input
        self._output = output

    def __getattr__(self, x):
        # Only consulted for names this class does not define itself, so
        # everything else is forwarded to the wrapped input stream.
        return getattr(self._input, x)

    def _echo(self, chunk):
        """Write ``chunk`` to the output stream and return it unchanged."""
        self._output.write(chunk)
        return chunk

    def read(self, n=-1):
        data = self._input.read(n)
        return self._echo(data)

    def readline(self, n=-1):
        line = self._input.readline(n)
        return self._echo(line)

    def readlines(self):
        # All lines are read from the input first, then echoed one by one.
        return list(map(self._echo, self._input.readlines()))

    def __iter__(self):
        # Lazy: each line is echoed only when the consumer asks for it.
        return (self._echo(line) for line in self._input)

    def __repr__(self):
        return repr(self._input)
49
50
def make_input_stream(input, charset):
    """Turn ``input`` into a binary stream usable as stdin.

    :param input: an object with a ``read`` attribute, ``None``, bytes, or
                  an object with an ``encode`` method (e.g. text).
    :param charset: the encoding applied to non-bytes input.
    :raises TypeError: if ``input`` looks like a stream but no binary
                       reader can be found for it (Python 3 only).
    """
    if hasattr(input, "read"):
        # Already a stream: Python 2 uses it as is, Python 3 needs the
        # binary reader behind it.
        if PY2:
            return input
        reader = _find_binary_reader(input)
        if reader is None:
            raise TypeError("Could not find binary reader for input stream.")
        return reader

    if input is None:
        data = b""
    elif isinstance(input, bytes):
        data = input
    else:
        data = input.encode(charset)

    # Only the selected branch is evaluated, so the name that is not
    # imported for the running Python version is never looked up.
    stream_factory = StringIO if PY2 else io.BytesIO
    return stream_factory(data)
68
69
class Result(object):
    """Holds the captured result of an invoked CLI script."""

    def __init__(
        self, runner, stdout_bytes, stderr_bytes, exit_code, exception, exc_info=None
    ):
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or None if not available
        self.stderr_bytes = stderr_bytes
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    @property
    def output(self):
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self):
        """The standard output as unicode string.

        Decoded with the runner's charset (undecodable bytes are replaced)
        and with ``\\r\\n`` line endings normalized to ``\\n``.
        """
        text = self.stdout_bytes.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    @property
    def stderr(self):
        """The standard error as unicode string.

        :raises ValueError: if stderr was not captured separately.
        """
        raw = self.stderr_bytes
        if raw is None:
            raise ValueError("stderr not separately captured")
        text = raw.decode(self.runner.charset, "replace")
        return text.replace("\r\n", "\n")

    def __repr__(self):
        if self.exception:
            status = repr(self.exception)
        else:
            status = "okay"
        return "<{} {}>".format(type(self).__name__, status)
114
115
class CliRunner(object):
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in an isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.  This is
                    UTF-8 by default and should not be changed currently as
                    the reporting to Click only works in Python 2 properly.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(self, charset=None, env=None, echo_stdin=False, mix_stderr=True):
        if charset is None:
            charset = "utf-8"
        self.charset = charset
        self.env = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli):
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(self, overrides=None):
        """Returns the environment overrides for invoking a script.

        The result is a fresh dictionary: the runner's own ``env`` updated
        with ``overrides`` (if any).  ``self.env`` is not modified.
        """
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(self, input=None, env=None, color=False):
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        The context manager yields a two-item tuple.  The first item is the
        buffer that collects stdout.  The second item is the buffer that
        collects stderr when ``mix_stderr`` is false, and ``False``
        otherwise.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.  A value of
                    ``None`` removes the variable for the duration.
        :param color: whether the output should contain color codes.  The
                      application can still override this explicitly.
        """
        input = make_input_stream(input, self.charset)

        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        formatting.FORCED_WIDTH = 80

        env = self.make_env(env)

        if PY2:
            bytes_output = StringIO()
            if self.echo_stdin:
                input = EchoingStdin(input, bytes_output)
            sys.stdout = bytes_output
            if not self.mix_stderr:
                bytes_error = StringIO()
                sys.stderr = bytes_error
        else:
            bytes_output = io.BytesIO()
            if self.echo_stdin:
                input = EchoingStdin(input, bytes_output)
            # Text layers on top of the byte buffers; the buffers themselves
            # are what gets yielded to the caller.
            input = io.TextIOWrapper(input, encoding=self.charset)
            sys.stdout = io.TextIOWrapper(bytes_output, encoding=self.charset)
            if not self.mix_stderr:
                bytes_error = io.BytesIO()
                sys.stderr = io.TextIOWrapper(bytes_error, encoding=self.charset)

        if self.mix_stderr:
            sys.stderr = sys.stdout

        sys.stdin = input

        def visible_input(prompt=None):
            # Prompt replacement that echoes the value read from the input.
            sys.stdout.write(prompt or "")
            val = input.readline().rstrip("\r\n")
            sys.stdout.write("{}\n".format(val))
            sys.stdout.flush()
            return val

        def hidden_input(prompt=None):
            # Prompt replacement that does not echo the value read.
            sys.stdout.write("{}\n".format(prompt or ""))
            sys.stdout.flush()
            return input.readline().rstrip("\r\n")

        def _getchar(echo):
            char = sys.stdin.read(1)
            if echo:
                sys.stdout.write(char)
                sys.stdout.flush()
            return char

        default_color = color

        def should_strip_ansi(stream=None, color=None):
            # An explicit ``color`` wins; otherwise fall back to the value
            # given to ``isolation``.
            if color is None:
                return not default_color
            return not color

        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi
        termui.visible_prompt_func = visible_input
        termui.hidden_prompt_func = hidden_input
        termui._getchar = _getchar
        utils.should_strip_ansi = should_strip_ansi

        old_env = {}
        try:
            for key, value in iteritems(env):
                # Remember the previous value (None if unset) for restoring.
                old_env[key] = os.environ.get(key)
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            yield (bytes_output, not self.mix_stderr and bytes_error)
        finally:
            for key, value in iteritems(old_env):
                if value is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli,
        args=None,
        input=None,
        env=None,
        catch_exceptions=True,
        color=False,
        **extra
    ):
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        .. versionadded:: 3.0
           The ``catch_exceptions`` parameter was added.

        .. versionchanged:: 3.0
           The result object now has an `exc_info` attribute with the
           traceback if available.

        .. versionadded:: 4.0
           The ``color`` parameter was added.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.  A
                      ``prog_name`` entry overrides the default program name.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            exception = None
            exit_code = 0

            if isinstance(args, string_types):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                exit_code = e.code
                if exit_code is None:
                    exit_code = 0

                if exit_code != 0:
                    exception = e

                if not isinstance(exit_code, int):
                    # A non-integer exit code is written to the output and
                    # reported as exit code 1.
                    sys.stdout.write(str(exit_code))
                    sys.stdout.write("\n")
                    exit_code = 1

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # The text wrappers buffer writes, so both streams have to be
                # flushed before the byte buffers are read.  Without the
                # stderr flush, text written to a separately captured stderr
                # and never flushed by the command would be missing.
                sys.stdout.flush()
                sys.stderr.flush()
                stdout = outstreams[0].getvalue()
                if self.mix_stderr:
                    stderr = None
                else:
                    stderr = outstreams[1].getvalue()

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,
        )

    @contextlib.contextmanager
    def isolated_filesystem(self):
        """A context manager that creates a temporary folder and changes
        the current working directory to it for isolated filesystem tests.

        Yields the path of the temporary folder.  On exit the previous
        working directory is restored and the folder is removed; errors
        while removing it are ignored.
        """
        cwd = os.getcwd()
        t = tempfile.mkdtemp()
        try:
            # Inside the ``try`` so the folder is still cleaned up if
            # changing into it fails.
            os.chdir(t)
            yield t
        finally:
            os.chdir(cwd)
            try:
                shutil.rmtree(t)
            except (OSError, IOError):  # noqa: B014
                pass