comparison env/lib/python3.9/site-packages/humanfriendly/testing.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Human friendly input/output in Python.
2 #
3 # Author: Peter Odding <peter@peterodding.com>
4 # Last Change: March 6, 2020
5 # URL: https://humanfriendly.readthedocs.io
6
7 """
8 Utility classes and functions that make it easy to write :mod:`unittest` compatible test suites.
9
10 Over the years I've developed the habit of writing test suites for Python
11 projects using the :mod:`unittest` module. During those years I've come to know
12 :pypi:`pytest` and in fact I use :pypi:`pytest` to run my test suites (due to
13 its much better error reporting) but I've yet to publish a test suite that
14 *requires* :pypi:`pytest`. I have several reasons for this:
15
16 - It's nice to keep my test suites as simple and accessible as possible and
17 not requiring a specific test runner is part of that attitude.
18
19 - Whereas :mod:`unittest` is quite explicit, :pypi:`pytest` contains a lot of
20 magic, which kind of contradicts the Python mantra "explicit is better than
21 implicit" (IMHO).
22 """
23
24 # Standard library modules.
25 import functools
26 import logging
27 import os
28 import pipes
29 import shutil
30 import sys
31 import tempfile
32 import time
33 import unittest
34
35 # Modules included in our package.
36 from humanfriendly.compat import StringIO
37 from humanfriendly.text import random_string
38
39 # Initialize a logger for this module.
40 logger = logging.getLogger(__name__)
41
42 # A unique object reference used to detect missing attributes.
43 NOTHING = object()
44
45 # Public identifiers that require documentation.
46 __all__ = (
47 'CallableTimedOut',
48 'CaptureBuffer',
49 'CaptureOutput',
50 'ContextManager',
51 'CustomSearchPath',
52 'MockedProgram',
53 'PatchedAttribute',
54 'PatchedItem',
55 'TemporaryDirectory',
56 'TestCase',
57 'configure_logging',
58 'make_dirs',
59 'retry',
60 'run_cli',
61 'skip_on_raise',
62 'touch',
63 )
64
65
def configure_logging(log_level=logging.DEBUG):
    """configure_logging(log_level=logging.DEBUG)
    Set up logging to the terminal with a single call.

    :param log_level: The log verbosity (a number, defaults
                      to :mod:`logging.DEBUG <logging>`).

    The :mod:`coloredlogs` package is tried first: when it can be imported
    :func:`coloredlogs.install()` takes care of the configuration. When an
    :exc:`~exceptions.ImportError` is raised instead the function falls
    back to :func:`logging.basicConfig()` with a fixed format.
    """
    # Options for the plain `logging' fall back, prepared up front so that
    # the try/except block below stays focused on the import attempt.
    fallback_options = dict(
        level=log_level,
        format='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    try:
        import coloredlogs
        coloredlogs.install(level=log_level)
    except ImportError:
        logging.basicConfig(**fallback_options)
86
87
def make_dirs(pathname):
    """
    Create missing directories.

    :param pathname: The pathname of a directory (a string). An empty
                     string is ignored because there is nothing to create.
    :raises: :exc:`~exceptions.OSError` when the directory could not be
             created and still doesn't exist afterwards (for example
             because `pathname` refers to an existing file).
    """
    # An empty pathname (e.g. the result of os.path.dirname('file.txt'))
    # would make os.makedirs() fail, so there's nothing to do for it.
    if not pathname or os.path.isdir(pathname):
        return
    try:
        os.makedirs(pathname)
    except OSError:
        # Another process or thread may have created the directory between
        # our isdir() check and the makedirs() call; that is not an error.
        if not os.path.isdir(pathname):
            raise
96
97
def retry(func, timeout=60, exc_type=AssertionError):
    """retry(func, timeout=60, exc_type=AssertionError)
    Retry a function until assertions no longer fail.

    :param func: A callable. When the callable returns
                 :data:`False` it will also be retried.
    :param timeout: The number of seconds after which to abort (a number,
                    defaults to 60).
    :param exc_type: The type of exceptions to retry (defaults
                     to :exc:`~exceptions.AssertionError`).
    :returns: The value returned by `func`.
    :raises: Once the timeout has expired :func:`retry()` will raise the
             previously retried assertion error. When `func` keeps returning
             :data:`False` until `timeout` expires :exc:`CallableTimedOut`
             will be raised.

    This function sleeps between retries to avoid claiming CPU cycles we don't
    need. It starts by sleeping for 0.1 second and doubles the pause after
    each retry, up to a maximum of one second.
    """
    deadline = time.time() + timeout
    pause = 0.1
    while True:
        try:
            result = func()
        except exc_type:
            # Keep retrying until the deadline has passed, then let the
            # most recent exception propagate to the caller.
            if time.time() > deadline:
                raise
        else:
            # Any value other than False (including None) means success.
            if result is not False:
                return result
            if time.time() > deadline:
                raise CallableTimedOut()
        time.sleep(pause)
        # Back off exponentially but never sleep longer than one second
        # (plain doubling would overshoot from 0.8 to 1.6 seconds).
        pause = min(pause * 2, 1)
134
135
def run_cli(entry_point, *arguments, **options):
    """
    Test a command line entry point.

    :param entry_point: The function that implements the command line interface
                        (a callable).
    :param arguments: Any positional arguments (strings) become the command
                      line arguments (:data:`sys.argv` items 1-N).
    :param options: The following keyword arguments are supported:

                    **capture**
                     Whether to use :class:`CaptureOutput`. Defaults
                     to :data:`True` but can be disabled by passing
                     :data:`False` instead.
                    **input**
                     Refer to :class:`CaptureOutput`.
                    **merged**
                     Refer to :class:`CaptureOutput`.
                    **program_name**
                     Used to set :data:`sys.argv` item 0.
    :returns: A tuple with two values:

              1. The return code (an integer).
              2. The captured output (a string).

    When the entry point raises :exc:`~exceptions.SystemExit` the exit code is
    translated the way the Python interpreter does: :data:`None` becomes
    zero, integers are used as is and any other value (e.g. an error message)
    becomes one. Any other exception results in return code one.
    """
    # Add the `program_name' option to the arguments.
    arguments = list(arguments)
    arguments.insert(0, options.pop('program_name', sys.executable))
    # Log the command line arguments (and the fact that we're about to call the
    # command line entry point function).
    logger.debug("Calling command line entry point with arguments: %s", arguments)
    # Prepare to capture the return code and output even if the command line
    # interface raises an exception (whether the exception type is SystemExit
    # or something else).
    returncode = 0
    stdout = None
    stderr = None
    try:
        # Temporarily override sys.argv.
        with PatchedAttribute(sys, 'argv', arguments):
            # Manipulate the standard input/output/error streams?
            options['enabled'] = options.pop('capture', True)
            with CaptureOutput(**options) as capturer:
                try:
                    # Call the command line interface.
                    entry_point()
                finally:
                    # Get the output even if an exception is raised.
                    stdout = capturer.stdout.getvalue()
                    stderr = capturer.stderr.getvalue()
                    # Reconfigure logging to the terminal because it is very
                    # likely that the entry point function has changed the
                    # configured log level.
                    configure_logging()
    except BaseException as e:
        if isinstance(e, SystemExit):
            logger.debug("Intercepting return code %s from SystemExit exception.", e.code)
            # SystemExit.code isn't necessarily an integer: sys.exit()
            # without arguments gives None (success) and sys.exit("message")
            # gives a string (failure). Normalize to an integer so callers
            # always get what the documentation promises.
            if e.code is None:
                returncode = 0
            elif isinstance(e.code, int):
                returncode = e.code
            else:
                returncode = 1
        else:
            logger.warning("Defaulting return code to 1 due to raised exception.", exc_info=True)
            returncode = 1
    else:
        logger.debug("Command line entry point returned successfully!")
    # Always log the output captured on stdout/stderr, to make it easier to
    # diagnose test failures (but avoid duplicate logging when merged=True).
    is_merged = options.get('merged', False)
    merged_streams = [('merged streams', stdout)]
    separate_streams = [('stdout', stdout), ('stderr', stderr)]
    streams = merged_streams if is_merged else separate_streams
    for name, value in streams:
        if value:
            logger.debug("Output on %s:\n%s", name, value)
        else:
            logger.debug("No output on %s.", name)
    return returncode, stdout
211
212
def skip_on_raise(*exc_types):
    """
    Decorate a test function to translate specific exception types to :exc:`unittest.SkipTest`.

    :param exc_types: One or more positional arguments give the exception
                      types to be translated to :exc:`unittest.SkipTest`.
    :returns: A decorator function specialized to `exc_types`.

    Exceptions of other types raised by the decorated function are not
    intercepted.
    """
    def decorator(function):
        def wrapper(*args, **kw):
            try:
                outcome = function(*args, **kw)
            except exc_types as error:
                logger.debug("Translating exception to unittest.SkipTest ..", exc_info=True)
                reason = "skipping test because %s was raised" % type(error)
                raise unittest.SkipTest(reason)
            else:
                return outcome
        # Make the wrapper look like the function it replaces.
        return functools.wraps(function)(wrapper)
    return decorator
231
232
def touch(filename):
    """
    The equivalent of the UNIX :man:`touch` program in Python.

    :param filename: The pathname of the file to touch (a string).

    Note that missing directories are automatically created using
    :func:`make_dirs()`. When `filename` has no directory component (it
    refers to a file in the current working directory) there are no
    directories to create and this step is skipped.
    """
    directory = os.path.dirname(filename)
    # os.path.dirname('file.txt') is an empty string, which can't be passed
    # on as a directory to create.
    if directory:
        make_dirs(directory)
    # Opening in append mode creates the file when it doesn't exist yet
    # without truncating existing contents.
    with open(filename, 'a'):
        os.utime(filename, None)
245
246
class CallableTimedOut(Exception):

    """Raised by :func:`retry()` when the callable kept returning :data:`False` until the timeout expired."""
250
251
class ContextManager(object):

    """
    Base class to enable composition of context managers.

    Both methods are deliberately trivial, they exist so that subclasses
    can always call the superclass implementation.
    """

    def __enter__(self):
        """Enter the context and hand the context manager itself to the caller."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Leave the context without suppressing any exception."""
        return None
262
263
class PatchedAttribute(ContextManager):

    """Context manager that temporarily replaces an object attribute using :func:`setattr()`."""

    def __init__(self, obj, name, value):
        """
        Initialize a :class:`PatchedAttribute` object.

        :param obj: The object to patch.
        :param name: An attribute name.
        :param value: The value to set.
        """
        # Nothing has been saved until the context is entered.
        self.original_value = NOTHING
        self.object_to_patch = obj
        self.attribute_to_patch = name
        self.patched_value = value

    def __enter__(self):
        """
        Replace (patch) the attribute.

        :returns: The object whose attribute was patched.
        """
        # Enable composition of context managers.
        super(PatchedAttribute, self).__enter__()
        target, attribute = self.object_to_patch, self.attribute_to_patch
        # Remember the current value (or the fact that there is none).
        self.original_value = getattr(target, attribute, NOTHING)
        setattr(target, attribute, self.patched_value)
        return target

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Restore the attribute to its original value (or remove it when there was none)."""
        # Enable composition of context managers.
        super(PatchedAttribute, self).__exit__(exc_type, exc_value, traceback)
        target, attribute = self.object_to_patch, self.attribute_to_patch
        if self.original_value is not NOTHING:
            setattr(target, attribute, self.original_value)
        else:
            # The attribute didn't exist before we patched it.
            delattr(target, attribute)
303
304
class PatchedItem(ContextManager):

    """Context manager that temporarily replaces an object item using :meth:`~object.__setitem__()`."""

    def __init__(self, obj, item, value):
        """
        Initialize a :class:`PatchedItem` object.

        :param obj: The object to patch.
        :param item: The item to patch.
        :param value: The value to set.
        """
        # Nothing has been saved until the context is entered.
        self.original_value = NOTHING
        self.object_to_patch = obj
        self.item_to_patch = item
        self.patched_value = value

    def __enter__(self):
        """
        Replace (patch) the item.

        :returns: The object whose item was patched.
        """
        # Enable composition of context managers.
        super(PatchedItem, self).__enter__()
        container, key = self.object_to_patch, self.item_to_patch
        # Remember the current value (or the fact that there is none).
        try:
            previous = container[key]
        except KeyError:
            previous = NOTHING
        self.original_value = previous
        container[key] = self.patched_value
        return container

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Restore the item to its original value (or remove it when there was none)."""
        # Enable composition of context managers.
        super(PatchedItem, self).__exit__(exc_type, exc_value, traceback)
        container, key = self.object_to_patch, self.item_to_patch
        if self.original_value is not NOTHING:
            container[key] = self.original_value
        else:
            # The item didn't exist before we patched it.
            del container[key]
347
348
class TemporaryDirectory(ContextManager):

    """
    Easy temporary directory creation & cleanup using the :keyword:`with` statement.

    Here's an example of how to use this:

    .. code-block:: python

       with TemporaryDirectory() as directory:
           # Do something useful here.
           assert os.path.isdir(directory)
    """

    def __init__(self, **options):
        """
        Initialize a :class:`TemporaryDirectory` object.

        :param options: Any keyword arguments are passed on to
                        :func:`tempfile.mkdtemp()`.
        """
        # No directory exists until the context is entered.
        self.temporary_directory = None
        self.mkdtemp_options = options

    def __enter__(self):
        """
        Create the temporary directory using :func:`tempfile.mkdtemp()`.

        :returns: The pathname of the directory (a string).
        """
        # Enable composition of context managers.
        super(TemporaryDirectory, self).__enter__()
        directory = tempfile.mkdtemp(**self.mkdtemp_options)
        self.temporary_directory = directory
        return directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Cleanup the temporary directory using :func:`shutil.rmtree()`."""
        # Enable composition of context managers.
        super(TemporaryDirectory, self).__exit__(exc_type, exc_value, traceback)
        directory = self.temporary_directory
        if directory is None:
            # The context was never entered (or was already cleaned up).
            return
        shutil.rmtree(directory)
        self.temporary_directory = None
393
394
class MockedHomeDirectory(PatchedItem, TemporaryDirectory):

    """
    Context manager to temporarily change ``$HOME`` (the current user's profile directory).

    This class is a composition of the :class:`PatchedItem` and
    :class:`TemporaryDirectory` context managers.
    """

    def __init__(self):
        """Initialize a :class:`MockedHomeDirectory` object."""
        # The value given here is a placeholder: the real value is only
        # known once the temporary directory has been created.
        placeholder = os.environ.get('HOME')
        PatchedItem.__init__(self, os.environ, 'HOME', placeholder)
        TemporaryDirectory.__init__(self)

    def __enter__(self):
        """
        Activate the custom ``$HOME``.

        :returns: The pathname of the temporary directory that
                  ``$HOME`` now points to (a string).
        """
        # Create the temporary directory first, because its
        # pathname is the value that $HOME will be set to.
        home = TemporaryDirectory.__enter__(self)
        self.patched_value = home
        # Temporarily patch $HOME.
        PatchedItem.__enter__(self)
        return home

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the custom ``$HOME``."""
        super(MockedHomeDirectory, self).__exit__(exc_type, exc_value, traceback)
429
430
class CustomSearchPath(PatchedItem, TemporaryDirectory):

    """
    Context manager to temporarily customize ``$PATH`` (the executable search path).

    This class is a composition of the :class:`PatchedItem` and
    :class:`TemporaryDirectory` context managers.
    """

    def __init__(self, isolated=False):
        """
        Initialize a :class:`CustomSearchPath` object.

        :param isolated: :data:`True` to clear the original search path,
                         :data:`False` to add the temporary directory to the
                         start of the search path.
        """
        # Initialize our own instance variables.
        self.isolated_search_path = isolated
        # Selectively initialize our superclasses. The value given to
        # PatchedItem is a placeholder that __enter__() replaces.
        PatchedItem.__init__(self, os.environ, 'PATH', self.current_search_path)
        TemporaryDirectory.__init__(self)

    def __enter__(self):
        """
        Activate the custom ``$PATH``.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        # Create the temporary directory first, because the
        # new search path depends on its pathname.
        directory = TemporaryDirectory.__enter__(self)
        if self.isolated_search_path:
            # The temporary directory becomes the whole search path.
            self.patched_value = directory
        else:
            # The temporary directory takes precedence over existing entries.
            entries = [directory] + self.current_search_path.split(os.pathsep)
            self.patched_value = os.pathsep.join(entries)
        # Temporarily patch the $PATH.
        PatchedItem.__enter__(self)
        # Pass the pathname of the temporary directory to the caller
        # because they may want to `install' custom executables.
        return directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the custom ``$PATH``."""
        super(CustomSearchPath, self).__exit__(exc_type, exc_value, traceback)

    @property
    def current_search_path(self):
        """The value of ``$PATH`` or :data:`os.defpath` (a string)."""
        return os.environ.get('PATH', os.defpath)
483
484
class MockedProgram(CustomSearchPath):

    """
    Context manager to mock the existence of a program (executable).

    This class extends the functionality of :class:`CustomSearchPath`.
    """

    def __init__(self, name, returncode=0, script=None):
        """
        Initialize a :class:`MockedProgram` object.

        :param name: The name of the program (a string).
        :param returncode: The return code that the program should emit (a
                           number, defaults to zero).
        :param script: Shell script code to include in the mocked program (a
                       string or :data:`None`). This can be used to mock a
                       program that is expected to generate specific output.
        """
        # Initialize our own instance variables.
        self.program_name = name
        self.program_returncode = returncode
        self.program_script = script
        self.program_signal_file = None
        # Initialize our superclasses.
        super(MockedProgram, self).__init__()

    def __enter__(self):
        """
        Create the mock program.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        directory = super(MockedProgram, self).__enter__()
        # The mocked program creates this file when it runs, which is how
        # __exit__() finds out whether the program was actually called.
        self.program_signal_file = os.path.join(directory, 'program-was-run-%s' % random_string(10))
        pathname = os.path.join(directory, self.program_name)
        with open(pathname, 'w') as handle:
            handle.write('#!/bin/sh\n')
            handle.write('echo > %s\n' % pipes.quote(self.program_signal_file))
            if self.program_script:
                handle.write('%s\n' % self.program_script.strip())
            handle.write('exit %i\n' % self.program_returncode)
        os.chmod(pathname, 0o755)
        return directory

    def __exit__(self, *args, **kw):
        """
        Ensure that the mock program was run.

        :raises: :exc:`~exceptions.AssertionError` when
                 the mock program hasn't been run.

        The temporary directory and ``$PATH`` are always cleaned up before
        the assertion error is raised. When the :keyword:`with` block is
        already being left because of an exception no assertion error is
        raised, so that the original exception isn't masked.
        """
        # Check for the signal file before cleanup, because the file lives
        # in the temporary directory that's about to be removed.
        was_run = bool(self.program_signal_file and os.path.isfile(self.program_signal_file))
        exc_type = args[0] if args else kw.get('exc_type')
        # Note that a `return' statement inside a `finally' block would
        # silently discard the assertion error, which is why the cleanup is
        # performed first and the error is raised afterwards.
        result = super(MockedProgram, self).__exit__(*args, **kw)
        if exc_type is None and not was_run:
            raise AssertionError("It looks like %r was never run!" % self.program_name)
        return result
543
544
class CaptureOutput(ContextManager):

    """
    Context manager that captures what's written to :data:`sys.stdout` and :data:`sys.stderr`.

    .. attribute:: stdin

       The :class:`~humanfriendly.compat.StringIO` object used to feed the standard input stream.

    .. attribute:: stdout

       The :class:`CaptureBuffer` object used to capture the standard output stream.

    .. attribute:: stderr

       The :class:`CaptureBuffer` object used to capture the standard error stream.
    """

    def __init__(self, merged=False, input='', enabled=True):
        """
        Initialize a :class:`CaptureOutput` object.

        :param merged: :data:`True` to merge the streams,
                       :data:`False` to capture them separately.
        :param input: The data that reads from :data:`sys.stdin`
                      should return (a string).
        :param enabled: :data:`True` to enable capturing (the default),
                        :data:`False` otherwise. This makes it easy to
                        unconditionally use :class:`CaptureOutput` in
                        a :keyword:`with` block while preserving the
                        choice to opt out of capturing output.
        """
        self.stdin = StringIO(input)
        self.stdout = CaptureBuffer()
        if merged:
            # Both streams share a single buffer.
            self.stderr = self.stdout
        else:
            self.stderr = CaptureBuffer()
        # When capturing is disabled there is simply nothing to patch, the
        # buffers above still exist but stay empty.
        self.patched_attributes = [
            PatchedAttribute(sys, stream, getattr(self, stream))
            for stream in ('stdin', 'stdout', 'stderr')
        ] if enabled else []

    def __enter__(self):
        """Start capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
        super(CaptureOutput, self).__enter__()
        for patch in self.patched_attributes:
            patch.__enter__()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Stop capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
        super(CaptureOutput, self).__exit__(exc_type, exc_value, traceback)
        for patch in self.patched_attributes:
            patch.__exit__(exc_type, exc_value, traceback)

    def get_lines(self):
        """Get the contents of :attr:`stdout` split into separate lines."""
        text = self.get_text()
        return text.splitlines()

    def get_text(self):
        """Get the contents of :attr:`stdout` as a Unicode string."""
        buffer = self.stdout
        return buffer.get_text()

    def getvalue(self):
        """Get the text written to :data:`sys.stdout`."""
        buffer = self.stdout
        return buffer.getvalue()
611
612
class CaptureBuffer(StringIO):

    """
    Helper for :class:`CaptureOutput` to provide an easy to use API.

    The two methods defined by this subclass were specifically chosen to match
    the names of the methods provided by my :pypi:`capturer` package which
    serves a similar role as :class:`CaptureOutput` but knows how to simulate
    an interactive terminal (tty).
    """

    def get_lines(self):
        """Get the contents of the buffer split into separate lines."""
        text = self.get_text()
        return text.splitlines()

    def get_text(self):
        """Get the contents of the buffer as a Unicode string."""
        contents = self.getvalue()
        return contents
631
632
class TestCase(unittest.TestCase):

    """Subclass of :class:`unittest.TestCase` with automatic logging and other miscellaneous features."""

    def __init__(self, *args, **kw):
        """
        Initialize a :class:`TestCase` object.

        Any positional and/or keyword arguments are passed on to the
        initializer of the superclass.
        """
        super(TestCase, self).__init__(*args, **kw)

    def setUp(self, log_level=logging.DEBUG):
        """setUp(log_level=logging.DEBUG)
        Automatically configure logging to the terminal.

        :param log_level: Refer to :func:`configure_logging()`.

        The :func:`setUp()` method is automatically called by
        :class:`unittest.TestCase` before each test method starts.
        It does two things:

        - Logging to the terminal is configured using
          :func:`configure_logging()`.

        - Before the test method starts a newline is emitted, to separate the
          name of the test method (which will be printed to the terminal by
          :mod:`unittest` or :pypi:`pytest`) from the first line of logging
          output that the test method is likely going to generate.
        """
        # Step one: configure logging to the terminal.
        configure_logging(log_level)
        # Step two: the test runner prints the name of the test method
        # without a trailing newline, so emit one ourselves to keep the
        # first line of logging output on a line of its own.
        newline = "\n"
        sys.stderr.write(newline)