diff env/lib/python3.9/site-packages/humanfriendly/testing.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/humanfriendly/testing.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,669 @@
+# Human friendly input/output in Python.
+#
+# Author: Peter Odding <peter@peterodding.com>
+# Last Change: March 6, 2020
+# URL: https://humanfriendly.readthedocs.io
+
+"""
+Utility classes and functions that make it easy to write :mod:`unittest` compatible test suites.
+
+Over the years I've developed the habit of writing test suites for Python
+projects using the :mod:`unittest` module. During those years I've come to know
+:pypi:`pytest` and in fact I use :pypi:`pytest` to run my test suites (due to
+its much better error reporting) but I've yet to publish a test suite that
+*requires* :pypi:`pytest`. I have several reasons for doing so:
+
+- It's nice to keep my test suites as simple and accessible as possible and
+  not requiring a specific test runner is part of that attitude.
+
+- Whereas :mod:`unittest` is quite explicit, :pypi:`pytest` contains a lot of
+  magic, which kind of contradicts the Python mantra "explicit is better than
+  implicit" (IMHO).
+"""
+
+# Standard library module
+import functools
+import logging
+import os
+import pipes
+import shutil
+import sys
+import tempfile
+import time
+import unittest
+
+# Modules included in our package.
+from humanfriendly.compat import StringIO
+from humanfriendly.text import random_string
+
+# Initialize a logger for this module.
+logger = logging.getLogger(__name__)
+
+# A unique object reference used to detect missing attributes.
+NOTHING = object()
+
+# Public identifiers that require documentation.
+__all__ = (
+    'CallableTimedOut',
+    'CaptureBuffer',
+    'CaptureOutput',
+    'ContextManager',
+    'CustomSearchPath',
+    'MockedProgram',
+    'PatchedAttribute',
+    'PatchedItem',
+    'TemporaryDirectory',
+    'TestCase',
+    'configure_logging',
+    'make_dirs',
+    'retry',
+    'run_cli',
+    'skip_on_raise',
+    'touch',
+)
+
+
+def configure_logging(log_level=logging.DEBUG):
+    """configure_logging(log_level=logging.DEBUG)
+    Automatically configure logging to the terminal.
+
+    :param log_level: The log verbosity (a number, defaults
+                      to :mod:`logging.DEBUG <logging>`).
+
+    When :mod:`coloredlogs` is installed :func:`coloredlogs.install()` will be
+    used to configure logging to the terminal. When this fails with an
+    :exc:`~exceptions.ImportError` then :func:`logging.basicConfig()` is used
+    as a fall back.
+    """
+    try:
+        import coloredlogs
+        coloredlogs.install(level=log_level)
+    except ImportError:
+        logging.basicConfig(
+            level=log_level,
+            format='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s',
+            datefmt='%Y-%m-%d %H:%M:%S')
+
+
+def make_dirs(pathname):
+    """
+    Create missing directories.
+
+    :param pathname: The pathname of a directory (a string).
+    """
+    if not os.path.isdir(pathname):
+        os.makedirs(pathname)
+
+
+def retry(func, timeout=60, exc_type=AssertionError):
+    """retry(func, timeout=60, exc_type=AssertionError)
+    Retry a function until assertions no longer fail.
+
+    :param func: A callable. When the callable returns
+                 :data:`False` it will also be retried.
+    :param timeout: The number of seconds after which to abort (a number,
+                    defaults to 60).
+    :param exc_type: The type of exceptions to retry (defaults
+                     to :exc:`~exceptions.AssertionError`).
+    :returns: The value returned by `func`.
+    :raises: Once the timeout has expired :func:`retry()` will raise the
+             previously retried assertion error. When `func` keeps returning
+             :data:`False` until `timeout` expires :exc:`CallableTimedOut`
+             will be raised.
+
+    This function sleeps between retries to avoid claiming CPU cycles we don't
+    need. It starts by sleeping for 0.1 second but adjusts this to one second
+    as the number of retries grows.
+    """
+    pause = 0.1
+    timeout += time.time()
+    while True:
+        try:
+            result = func()
+            if result is not False:
+                return result
+        except exc_type:
+            if time.time() > timeout:
+                raise
+        else:
+            if time.time() > timeout:
+                raise CallableTimedOut()
+        time.sleep(pause)
+        if pause < 1:
+            pause *= 2
+
+
+def run_cli(entry_point, *arguments, **options):
+    """
+    Test a command line entry point.
+
+    :param entry_point: The function that implements the command line interface
+                        (a callable).
+    :param arguments: Any positional arguments (strings) become the command
+                      line arguments (:data:`sys.argv` items 1-N).
+    :param options: The following keyword arguments are supported:
+
+                    **capture**
+                     Whether to use :class:`CaptureOutput`. Defaults
+                     to :data:`True` but can be disabled by passing
+                     :data:`False` instead.
+                    **input**
+                     Refer to :class:`CaptureOutput`.
+                    **merged**
+                     Refer to :class:`CaptureOutput`.
+                    **program_name**
+                     Used to set :data:`sys.argv` item 0.
+    :returns: A tuple with two values:
+
+              1. The return code (an integer).
+              2. The captured output (a string).
+    """
+    # Add the `program_name' option to the arguments.
+    arguments = list(arguments)
+    arguments.insert(0, options.pop('program_name', sys.executable))
+    # Log the command line arguments (and the fact that we're about to call the
+    # command line entry point function).
+    logger.debug("Calling command line entry point with arguments: %s", arguments)
+    # Prepare to capture the return code and output even if the command line
+    # interface raises an exception (whether the exception type is SystemExit
+    # or something else).
+    returncode = 0
+    stdout = None
+    stderr = None
+    try:
+        # Temporarily override sys.argv.
+        with PatchedAttribute(sys, 'argv', arguments):
+            # Manipulate the standard input/output/error streams?
+            options['enabled'] = options.pop('capture', True)
+            with CaptureOutput(**options) as capturer:
+                try:
+                    # Call the command line interface.
+                    entry_point()
+                finally:
+                    # Get the output even if an exception is raised.
+                    stdout = capturer.stdout.getvalue()
+                    stderr = capturer.stderr.getvalue()
+                    # Reconfigure logging to the terminal because it is very
+                    # likely that the entry point function has changed the
+                    # configured log level.
+                    configure_logging()
+    except BaseException as e:
+        if isinstance(e, SystemExit):
+            logger.debug("Intercepting return code %s from SystemExit exception.", e.code)
+            returncode = e.code
+        else:
+            logger.warning("Defaulting return code to 1 due to raised exception.", exc_info=True)
+            returncode = 1
+    else:
+        logger.debug("Command line entry point returned successfully!")
+    # Always log the output captured on stdout/stderr, to make it easier to
+    # diagnose test failures (but avoid duplicate logging when merged=True).
+    is_merged = options.get('merged', False)
+    merged_streams = [('merged streams', stdout)]
+    separate_streams = [('stdout', stdout), ('stderr', stderr)]
+    streams = merged_streams if is_merged else separate_streams
+    for name, value in streams:
+        if value:
+            logger.debug("Output on %s:\n%s", name, value)
+        else:
+            logger.debug("No output on %s.", name)
+    return returncode, stdout
+
+
+def skip_on_raise(*exc_types):
+    """
+    Decorate a test function to translation specific exception types to :exc:`unittest.SkipTest`.
+
+    :param exc_types: One or more positional arguments give the exception
+                      types to be translated to :exc:`unittest.SkipTest`.
+    :returns: A decorator function specialized to `exc_types`.
+    """
+    def decorator(function):
+        @functools.wraps(function)
+        def wrapper(*args, **kw):
+            try:
+                return function(*args, **kw)
+            except exc_types as e:
+                logger.debug("Translating exception to unittest.SkipTest ..", exc_info=True)
+                raise unittest.SkipTest("skipping test because %s was raised" % type(e))
+        return wrapper
+    return decorator
+
+
+def touch(filename):
+    """
+    The equivalent of the UNIX :man:`touch` program in Python.
+
+    :param filename: The pathname of the file to touch (a string).
+
+    Note that missing directories are automatically created using
+    :func:`make_dirs()`.
+    """
+    make_dirs(os.path.dirname(filename))
+    with open(filename, 'a'):
+        os.utime(filename, None)
+
+
+class CallableTimedOut(Exception):
+
+    """Raised by :func:`retry()` when the timeout expires."""
+
+
+class ContextManager(object):
+
+    """Base class to enable composition of context managers."""
+
+    def __enter__(self):
+        """Enable use as context managers."""
+        return self
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Enable use as context managers."""
+
+
+class PatchedAttribute(ContextManager):
+
+    """Context manager that temporary replaces an object attribute using :func:`setattr()`."""
+
+    def __init__(self, obj, name, value):
+        """
+        Initialize a :class:`PatchedAttribute` object.
+
+        :param obj: The object to patch.
+        :param name: An attribute name.
+        :param value: The value to set.
+        """
+        self.object_to_patch = obj
+        self.attribute_to_patch = name
+        self.patched_value = value
+        self.original_value = NOTHING
+
+    def __enter__(self):
+        """
+        Replace (patch) the attribute.
+
+        :returns: The object whose attribute was patched.
+        """
+        # Enable composition of context managers.
+        super(PatchedAttribute, self).__enter__()
+        # Patch the object's attribute.
+        self.original_value = getattr(self.object_to_patch, self.attribute_to_patch, NOTHING)
+        setattr(self.object_to_patch, self.attribute_to_patch, self.patched_value)
+        return self.object_to_patch
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Restore the attribute to its original value."""
+        # Enable composition of context managers.
+        super(PatchedAttribute, self).__exit__(exc_type, exc_value, traceback)
+        # Restore the object's attribute.
+        if self.original_value is NOTHING:
+            delattr(self.object_to_patch, self.attribute_to_patch)
+        else:
+            setattr(self.object_to_patch, self.attribute_to_patch, self.original_value)
+
+
+class PatchedItem(ContextManager):
+
+    """Context manager that temporary replaces an object item using :meth:`~object.__setitem__()`."""
+
+    def __init__(self, obj, item, value):
+        """
+        Initialize a :class:`PatchedItem` object.
+
+        :param obj: The object to patch.
+        :param item: The item to patch.
+        :param value: The value to set.
+        """
+        self.object_to_patch = obj
+        self.item_to_patch = item
+        self.patched_value = value
+        self.original_value = NOTHING
+
+    def __enter__(self):
+        """
+        Replace (patch) the item.
+
+        :returns: The object whose item was patched.
+        """
+        # Enable composition of context managers.
+        super(PatchedItem, self).__enter__()
+        # Patch the object's item.
+        try:
+            self.original_value = self.object_to_patch[self.item_to_patch]
+        except KeyError:
+            self.original_value = NOTHING
+        self.object_to_patch[self.item_to_patch] = self.patched_value
+        return self.object_to_patch
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Restore the item to its original value."""
+        # Enable composition of context managers.
+        super(PatchedItem, self).__exit__(exc_type, exc_value, traceback)
+        # Restore the object's item.
+        if self.original_value is NOTHING:
+            del self.object_to_patch[self.item_to_patch]
+        else:
+            self.object_to_patch[self.item_to_patch] = self.original_value
+
+
+class TemporaryDirectory(ContextManager):
+
+    """
+    Easy temporary directory creation & cleanup using the :keyword:`with` statement.
+
+    Here's an example of how to use this:
+
+    .. code-block:: python
+
+       with TemporaryDirectory() as directory:
+           # Do something useful here.
+           assert os.path.isdir(directory)
+    """
+
+    def __init__(self, **options):
+        """
+        Initialize a :class:`TemporaryDirectory` object.
+
+        :param options: Any keyword arguments are passed on to
+                        :func:`tempfile.mkdtemp()`.
+        """
+        self.mkdtemp_options = options
+        self.temporary_directory = None
+
+    def __enter__(self):
+        """
+        Create the temporary directory using :func:`tempfile.mkdtemp()`.
+
+        :returns: The pathname of the directory (a string).
+        """
+        # Enable composition of context managers.
+        super(TemporaryDirectory, self).__enter__()
+        # Create the temporary directory.
+        self.temporary_directory = tempfile.mkdtemp(**self.mkdtemp_options)
+        return self.temporary_directory
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Cleanup the temporary directory using :func:`shutil.rmtree()`."""
+        # Enable composition of context managers.
+        super(TemporaryDirectory, self).__exit__(exc_type, exc_value, traceback)
+        # Cleanup the temporary directory.
+        if self.temporary_directory is not None:
+            shutil.rmtree(self.temporary_directory)
+            self.temporary_directory = None
+
+
+class MockedHomeDirectory(PatchedItem, TemporaryDirectory):
+
+    """
+    Context manager to temporarily change ``$HOME`` (the current user's profile directory).
+
+    This class is a composition of the :class:`PatchedItem` and
+    :class:`TemporaryDirectory` context managers.
+    """
+
+    def __init__(self):
+        """Initialize a :class:`MockedHomeDirectory` object."""
+        PatchedItem.__init__(self, os.environ, 'HOME', os.environ.get('HOME'))
+        TemporaryDirectory.__init__(self)
+
+    def __enter__(self):
+        """
+        Activate the custom ``$PATH``.
+
+        :returns: The pathname of the directory that has
+                  been added to ``$PATH`` (a string).
+        """
+        # Get the temporary directory.
+        directory = TemporaryDirectory.__enter__(self)
+        # Override the value to patch now that we have
+        # the pathname of the temporary directory.
+        self.patched_value = directory
+        # Temporary patch $HOME.
+        PatchedItem.__enter__(self)
+        # Pass the pathname of the temporary directory to the caller.
+        return directory
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Deactivate the custom ``$HOME``."""
+        super(MockedHomeDirectory, self).__exit__(exc_type, exc_value, traceback)
+
+
+class CustomSearchPath(PatchedItem, TemporaryDirectory):
+
+    """
+    Context manager to temporarily customize ``$PATH`` (the executable search path).
+
+    This class is a composition of the :class:`PatchedItem` and
+    :class:`TemporaryDirectory` context managers.
+    """
+
+    def __init__(self, isolated=False):
+        """
+        Initialize a :class:`CustomSearchPath` object.
+
+        :param isolated: :data:`True` to clear the original search path,
+                         :data:`False` to add the temporary directory to the
+                         start of the search path.
+        """
+        # Initialize our own instance variables.
+        self.isolated_search_path = isolated
+        # Selectively initialize our superclasses.
+        PatchedItem.__init__(self, os.environ, 'PATH', self.current_search_path)
+        TemporaryDirectory.__init__(self)
+
+    def __enter__(self):
+        """
+        Activate the custom ``$PATH``.
+
+        :returns: The pathname of the directory that has
+                  been added to ``$PATH`` (a string).
+        """
+        # Get the temporary directory.
+        directory = TemporaryDirectory.__enter__(self)
+        # Override the value to patch now that we have
+        # the pathname of the temporary directory.
+        self.patched_value = (
+            directory if self.isolated_search_path
+            else os.pathsep.join([directory] + self.current_search_path.split(os.pathsep))
+        )
+        # Temporary patch the $PATH.
+        PatchedItem.__enter__(self)
+        # Pass the pathname of the temporary directory to the caller
+        # because they may want to `install' custom executables.
+        return directory
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Deactivate the custom ``$PATH``."""
+        super(CustomSearchPath, self).__exit__(exc_type, exc_value, traceback)
+
+    @property
+    def current_search_path(self):
+        """The value of ``$PATH`` or :data:`os.defpath` (a string)."""
+        return os.environ.get('PATH', os.defpath)
+
+
+class MockedProgram(CustomSearchPath):
+
+    """
+    Context manager to mock the existence of a program (executable).
+
+    This class extends the functionality of :class:`CustomSearchPath`.
+    """
+
+    def __init__(self, name, returncode=0, script=None):
+        """
+        Initialize a :class:`MockedProgram` object.
+
+        :param name: The name of the program (a string).
+        :param returncode: The return code that the program should emit (a
+                           number, defaults to zero).
+        :param script: Shell script code to include in the mocked program (a
+                       string or :data:`None`). This can be used to mock a
+                       program that is expected to generate specific output.
+        """
+        # Initialize our own instance variables.
+        self.program_name = name
+        self.program_returncode = returncode
+        self.program_script = script
+        self.program_signal_file = None
+        # Initialize our superclasses.
+        super(MockedProgram, self).__init__()
+
+    def __enter__(self):
+        """
+        Create the mock program.
+
+        :returns: The pathname of the directory that has
+                  been added to ``$PATH`` (a string).
+        """
+        directory = super(MockedProgram, self).__enter__()
+        self.program_signal_file = os.path.join(directory, 'program-was-run-%s' % random_string(10))
+        pathname = os.path.join(directory, self.program_name)
+        with open(pathname, 'w') as handle:
+            handle.write('#!/bin/sh\n')
+            handle.write('echo > %s\n' % pipes.quote(self.program_signal_file))
+            if self.program_script:
+                handle.write('%s\n' % self.program_script.strip())
+            handle.write('exit %i\n' % self.program_returncode)
+        os.chmod(pathname, 0o755)
+        return directory
+
+    def __exit__(self, *args, **kw):
+        """
+        Ensure that the mock program was run.
+
+        :raises: :exc:`~exceptions.AssertionError` when
+                 the mock program hasn't been run.
+        """
+        try:
+            assert self.program_signal_file and os.path.isfile(self.program_signal_file), \
+                ("It looks like %r was never run!" % self.program_name)
+        finally:
+            return super(MockedProgram, self).__exit__(*args, **kw)
+
+
+class CaptureOutput(ContextManager):
+
+    """
+    Context manager that captures what's written to :data:`sys.stdout` and :data:`sys.stderr`.
+
+    .. attribute:: stdin
+
+       The :class:`~humanfriendly.compat.StringIO` object used to feed the standard input stream.
+
+    .. attribute:: stdout
+
+       The :class:`CaptureBuffer` object used to capture the standard output stream.
+
+    .. attribute:: stderr
+
+       The :class:`CaptureBuffer` object used to capture the standard error stream.
+    """
+
+    def __init__(self, merged=False, input='', enabled=True):
+        """
+        Initialize a :class:`CaptureOutput` object.
+
+        :param merged: :data:`True` to merge the streams,
+                       :data:`False` to capture them separately.
+        :param input: The data that reads from :data:`sys.stdin`
+                      should return (a string).
+        :param enabled: :data:`True` to enable capturing (the default),
+                        :data:`False` otherwise. This makes it easy to
+                        unconditionally use :class:`CaptureOutput` in
+                        a :keyword:`with` block while preserving the
+                        choice to opt out of capturing output.
+        """
+        self.stdin = StringIO(input)
+        self.stdout = CaptureBuffer()
+        self.stderr = self.stdout if merged else CaptureBuffer()
+        self.patched_attributes = []
+        if enabled:
+            self.patched_attributes.extend(
+                PatchedAttribute(sys, name, getattr(self, name))
+                for name in ('stdin', 'stdout', 'stderr')
+            )
+
+    def __enter__(self):
+        """Start capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
+        super(CaptureOutput, self).__enter__()
+        for context in self.patched_attributes:
+            context.__enter__()
+        return self
+
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
+        """Stop capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
+        super(CaptureOutput, self).__exit__(exc_type, exc_value, traceback)
+        for context in self.patched_attributes:
+            context.__exit__(exc_type, exc_value, traceback)
+
+    def get_lines(self):
+        """Get the contents of :attr:`stdout` split into separate lines."""
+        return self.get_text().splitlines()
+
+    def get_text(self):
+        """Get the contents of :attr:`stdout` as a Unicode string."""
+        return self.stdout.get_text()
+
+    def getvalue(self):
+        """Get the text written to :data:`sys.stdout`."""
+        return self.stdout.getvalue()
+
+
+class CaptureBuffer(StringIO):
+
+    """
+    Helper for :class:`CaptureOutput` to provide an easy to use API.
+
+    The two methods defined by this subclass were specifically chosen to match
+    the names of the methods provided by my :pypi:`capturer` package which
+    serves a similar role as :class:`CaptureOutput` but knows how to simulate
+    an interactive terminal (tty).
+    """
+
+    def get_lines(self):
+        """Get the contents of the buffer split into separate lines."""
+        return self.get_text().splitlines()
+
+    def get_text(self):
+        """Get the contents of the buffer as a Unicode string."""
+        return self.getvalue()
+
+
+class TestCase(unittest.TestCase):
+
+    """Subclass of :class:`unittest.TestCase` with automatic logging and other miscellaneous features."""
+
+    def __init__(self, *args, **kw):
+        """
+        Initialize a :class:`TestCase` object.
+
+        Any positional and/or keyword arguments are passed on to the
+        initializer of the superclass.
+        """
+        super(TestCase, self).__init__(*args, **kw)
+
+    def setUp(self, log_level=logging.DEBUG):
+        """setUp(log_level=logging.DEBUG)
+        Automatically configure logging to the terminal.
+
+        :param log_level: Refer to :func:`configure_logging()`.
+
+        The :func:`setUp()` method is automatically called by
+        :class:`unittest.TestCase` before each test method starts.
+        It does two things:
+
+        - Logging to the terminal is configured using
+          :func:`configure_logging()`.
+
+        - Before the test method starts a newline is emitted, to separate the
+          name of the test method (which will be printed to the terminal by
+          :mod:`unittest` or :pypi:`pytest`) from the first line of logging
+          output that the test method is likely going to generate.
+        """
+        # Configure logging to the terminal.
+        configure_logging(log_level)
+        # Separate the name of the test method (printed by the superclass
+        # and/or py.test without a newline at the end) from the first line of
+        # logging output that the test method is likely going to generate.
+        sys.stderr.write("\n")