view env/lib/python3.9/site-packages/humanfriendly/testing.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

# Human friendly input/output in Python.
#
# Author: Peter Odding <peter@peterodding.com>
# Last Change: March 6, 2020
# URL: https://humanfriendly.readthedocs.io

"""
Utility classes and functions that make it easy to write :mod:`unittest` compatible test suites.

Over the years I've developed the habit of writing test suites for Python
projects using the :mod:`unittest` module. During those years I've come to know
:pypi:`pytest` and in fact I use :pypi:`pytest` to run my test suites (due to
its much better error reporting) but I've yet to publish a test suite that
*requires* :pypi:`pytest`. I have several reasons for doing so:

- It's nice to keep my test suites as simple and accessible as possible and
  not requiring a specific test runner is part of that attitude.

- Whereas :mod:`unittest` is quite explicit, :pypi:`pytest` contains a lot of
  magic, which kind of contradicts the Python mantra "explicit is better than
  implicit" (IMHO).
"""

# Standard library module
import functools
import logging
import os
import pipes
import shutil
import sys
import tempfile
import time
import unittest

# Modules included in our package.
from humanfriendly.compat import StringIO
from humanfriendly.text import random_string

# Initialize a logger for this module.
logger = logging.getLogger(__name__)

# A unique object reference used to detect missing attributes.
NOTHING = object()

# Public identifiers that require documentation.
__all__ = (
    'CallableTimedOut',
    'CaptureBuffer',
    'CaptureOutput',
    'ContextManager',
    'CustomSearchPath',
    'MockedProgram',
    'PatchedAttribute',
    'PatchedItem',
    'TemporaryDirectory',
    'TestCase',
    'configure_logging',
    'make_dirs',
    'retry',
    'run_cli',
    'skip_on_raise',
    'touch',
)


def configure_logging(log_level=logging.DEBUG):
    """configure_logging(log_level=logging.DEBUG)
    Automatically configure logging to the terminal.

    :param log_level: The log verbosity (a number, defaults
                      to :mod:`logging.DEBUG <logging>`).

    When :mod:`coloredlogs` is installed :func:`coloredlogs.install()` will be
    used to configure logging to the terminal. When this fails with an
    :exc:`~exceptions.ImportError` then :func:`logging.basicConfig()` is used
    as a fall back.
    """
    try:
        import coloredlogs
        coloredlogs.install(level=log_level)
    except ImportError:
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S')


def make_dirs(pathname):
    """
    Create missing directories.

    :param pathname: The pathname of a directory (a string).
    """
    if not os.path.isdir(pathname):
        os.makedirs(pathname)


def retry(func, timeout=60, exc_type=AssertionError):
    """retry(func, timeout=60, exc_type=AssertionError)
    Retry a function until assertions no longer fail.

    :param func: A callable. When the callable returns
                 :data:`False` it will also be retried.
    :param timeout: The number of seconds after which to abort (a number,
                    defaults to 60).
    :param exc_type: The type of exceptions to retry (defaults
                     to :exc:`~exceptions.AssertionError`).
    :returns: The value returned by `func`.
    :raises: Once the timeout has expired :func:`retry()` will raise the
             previously retried assertion error. When `func` keeps returning
             :data:`False` until `timeout` expires :exc:`CallableTimedOut`
             will be raised.

    This function sleeps between retries to avoid claiming CPU cycles we don't
    need. It starts by sleeping for 0.1 second but adjusts this to one second
    as the number of retries grows.
    """
    pause = 0.1
    timeout += time.time()
    while True:
        try:
            result = func()
            if result is not False:
                return result
        except exc_type:
            if time.time() > timeout:
                raise
        else:
            if time.time() > timeout:
                raise CallableTimedOut()
        time.sleep(pause)
        if pause < 1:
            pause *= 2


def run_cli(entry_point, *arguments, **options):
    """
    Test a command line entry point.

    :param entry_point: The function that implements the command line interface
                        (a callable).
    :param arguments: Any positional arguments (strings) become the command
                      line arguments (:data:`sys.argv` items 1-N).
    :param options: The following keyword arguments are supported:

                    **capture**
                     Whether to use :class:`CaptureOutput`. Defaults
                     to :data:`True` but can be disabled by passing
                     :data:`False` instead.
                    **input**
                     Refer to :class:`CaptureOutput`.
                    **merged**
                     Refer to :class:`CaptureOutput`.
                    **program_name**
                     Used to set :data:`sys.argv` item 0.
    :returns: A tuple with two values:

              1. The return code (an integer).
              2. The captured output (a string).
    """
    # Add the `program_name' option to the arguments.
    arguments = list(arguments)
    arguments.insert(0, options.pop('program_name', sys.executable))
    # Log the command line arguments (and the fact that we're about to call the
    # command line entry point function).
    logger.debug("Calling command line entry point with arguments: %s", arguments)
    # Prepare to capture the return code and output even if the command line
    # interface raises an exception (whether the exception type is SystemExit
    # or something else).
    returncode = 0
    stdout = None
    stderr = None
    try:
        # Temporarily override sys.argv.
        with PatchedAttribute(sys, 'argv', arguments):
            # Manipulate the standard input/output/error streams?
            options['enabled'] = options.pop('capture', True)
            with CaptureOutput(**options) as capturer:
                try:
                    # Call the command line interface.
                    entry_point()
                finally:
                    # Get the output even if an exception is raised.
                    stdout = capturer.stdout.getvalue()
                    stderr = capturer.stderr.getvalue()
                    # Reconfigure logging to the terminal because it is very
                    # likely that the entry point function has changed the
                    # configured log level.
                    configure_logging()
    except BaseException as e:
        if isinstance(e, SystemExit):
            logger.debug("Intercepting return code %s from SystemExit exception.", e.code)
            returncode = e.code
        else:
            logger.warning("Defaulting return code to 1 due to raised exception.", exc_info=True)
            returncode = 1
    else:
        logger.debug("Command line entry point returned successfully!")
    # Always log the output captured on stdout/stderr, to make it easier to
    # diagnose test failures (but avoid duplicate logging when merged=True).
    is_merged = options.get('merged', False)
    merged_streams = [('merged streams', stdout)]
    separate_streams = [('stdout', stdout), ('stderr', stderr)]
    streams = merged_streams if is_merged else separate_streams
    for name, value in streams:
        if value:
            logger.debug("Output on %s:\n%s", name, value)
        else:
            logger.debug("No output on %s.", name)
    return returncode, stdout


def skip_on_raise(*exc_types):
    """
    Decorate a test function to translation specific exception types to :exc:`unittest.SkipTest`.

    :param exc_types: One or more positional arguments give the exception
                      types to be translated to :exc:`unittest.SkipTest`.
    :returns: A decorator function specialized to `exc_types`.
    """
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kw):
            try:
                return function(*args, **kw)
            except exc_types as e:
                logger.debug("Translating exception to unittest.SkipTest ..", exc_info=True)
                raise unittest.SkipTest("skipping test because %s was raised" % type(e))
        return wrapper
    return decorator


def touch(filename):
    """
    The equivalent of the UNIX :man:`touch` program in Python.

    :param filename: The pathname of the file to touch (a string).

    Note that missing directories are automatically created using
    :func:`make_dirs()`.
    """
    make_dirs(os.path.dirname(filename))
    with open(filename, 'a'):
        os.utime(filename, None)


class CallableTimedOut(Exception):

    """Raised by :func:`retry()` when the timeout expires."""


class ContextManager(object):

    """Base class to enable composition of context managers."""

    def __enter__(self):
        """Enable use as context managers."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Enable use as context managers."""


class PatchedAttribute(ContextManager):

    """Context manager that temporary replaces an object attribute using :func:`setattr()`."""

    def __init__(self, obj, name, value):
        """
        Initialize a :class:`PatchedAttribute` object.

        :param obj: The object to patch.
        :param name: An attribute name.
        :param value: The value to set.
        """
        self.object_to_patch = obj
        self.attribute_to_patch = name
        self.patched_value = value
        self.original_value = NOTHING

    def __enter__(self):
        """
        Replace (patch) the attribute.

        :returns: The object whose attribute was patched.
        """
        # Enable composition of context managers.
        super(PatchedAttribute, self).__enter__()
        # Patch the object's attribute.
        self.original_value = getattr(self.object_to_patch, self.attribute_to_patch, NOTHING)
        setattr(self.object_to_patch, self.attribute_to_patch, self.patched_value)
        return self.object_to_patch

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Restore the attribute to its original value."""
        # Enable composition of context managers.
        super(PatchedAttribute, self).__exit__(exc_type, exc_value, traceback)
        # Restore the object's attribute.
        if self.original_value is NOTHING:
            delattr(self.object_to_patch, self.attribute_to_patch)
        else:
            setattr(self.object_to_patch, self.attribute_to_patch, self.original_value)


class PatchedItem(ContextManager):

    """Context manager that temporary replaces an object item using :meth:`~object.__setitem__()`."""

    def __init__(self, obj, item, value):
        """
        Initialize a :class:`PatchedItem` object.

        :param obj: The object to patch.
        :param item: The item to patch.
        :param value: The value to set.
        """
        self.object_to_patch = obj
        self.item_to_patch = item
        self.patched_value = value
        self.original_value = NOTHING

    def __enter__(self):
        """
        Replace (patch) the item.

        :returns: The object whose item was patched.
        """
        # Enable composition of context managers.
        super(PatchedItem, self).__enter__()
        # Patch the object's item.
        try:
            self.original_value = self.object_to_patch[self.item_to_patch]
        except KeyError:
            self.original_value = NOTHING
        self.object_to_patch[self.item_to_patch] = self.patched_value
        return self.object_to_patch

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Restore the item to its original value."""
        # Enable composition of context managers.
        super(PatchedItem, self).__exit__(exc_type, exc_value, traceback)
        # Restore the object's item.
        if self.original_value is NOTHING:
            del self.object_to_patch[self.item_to_patch]
        else:
            self.object_to_patch[self.item_to_patch] = self.original_value


class TemporaryDirectory(ContextManager):

    """
    Easy temporary directory creation & cleanup using the :keyword:`with` statement.

    Here's an example of how to use this:

    .. code-block:: python

       with TemporaryDirectory() as directory:
           # Do something useful here.
           assert os.path.isdir(directory)
    """

    def __init__(self, **options):
        """
        Initialize a :class:`TemporaryDirectory` object.

        :param options: Any keyword arguments are passed on to
                        :func:`tempfile.mkdtemp()`.
        """
        self.mkdtemp_options = options
        self.temporary_directory = None

    def __enter__(self):
        """
        Create the temporary directory using :func:`tempfile.mkdtemp()`.

        :returns: The pathname of the directory (a string).
        """
        # Enable composition of context managers.
        super(TemporaryDirectory, self).__enter__()
        # Create the temporary directory.
        self.temporary_directory = tempfile.mkdtemp(**self.mkdtemp_options)
        return self.temporary_directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Cleanup the temporary directory using :func:`shutil.rmtree()`."""
        # Enable composition of context managers.
        super(TemporaryDirectory, self).__exit__(exc_type, exc_value, traceback)
        # Cleanup the temporary directory.
        if self.temporary_directory is not None:
            shutil.rmtree(self.temporary_directory)
            self.temporary_directory = None


class MockedHomeDirectory(PatchedItem, TemporaryDirectory):

    """
    Context manager to temporarily change ``$HOME`` (the current user's profile directory).

    This class is a composition of the :class:`PatchedItem` and
    :class:`TemporaryDirectory` context managers.
    """

    def __init__(self):
        """Initialize a :class:`MockedHomeDirectory` object."""
        PatchedItem.__init__(self, os.environ, 'HOME', os.environ.get('HOME'))
        TemporaryDirectory.__init__(self)

    def __enter__(self):
        """
        Activate the custom ``$PATH``.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        # Get the temporary directory.
        directory = TemporaryDirectory.__enter__(self)
        # Override the value to patch now that we have
        # the pathname of the temporary directory.
        self.patched_value = directory
        # Temporary patch $HOME.
        PatchedItem.__enter__(self)
        # Pass the pathname of the temporary directory to the caller.
        return directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the custom ``$HOME``."""
        super(MockedHomeDirectory, self).__exit__(exc_type, exc_value, traceback)


class CustomSearchPath(PatchedItem, TemporaryDirectory):

    """
    Context manager to temporarily customize ``$PATH`` (the executable search path).

    This class is a composition of the :class:`PatchedItem` and
    :class:`TemporaryDirectory` context managers.
    """

    def __init__(self, isolated=False):
        """
        Initialize a :class:`CustomSearchPath` object.

        :param isolated: :data:`True` to clear the original search path,
                         :data:`False` to add the temporary directory to the
                         start of the search path.
        """
        # Initialize our own instance variables.
        self.isolated_search_path = isolated
        # Selectively initialize our superclasses.
        PatchedItem.__init__(self, os.environ, 'PATH', self.current_search_path)
        TemporaryDirectory.__init__(self)

    def __enter__(self):
        """
        Activate the custom ``$PATH``.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        # Get the temporary directory.
        directory = TemporaryDirectory.__enter__(self)
        # Override the value to patch now that we have
        # the pathname of the temporary directory.
        self.patched_value = (
            directory if self.isolated_search_path
            else os.pathsep.join([directory] + self.current_search_path.split(os.pathsep))
        )
        # Temporary patch the $PATH.
        PatchedItem.__enter__(self)
        # Pass the pathname of the temporary directory to the caller
        # because they may want to `install' custom executables.
        return directory

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Deactivate the custom ``$PATH``."""
        super(CustomSearchPath, self).__exit__(exc_type, exc_value, traceback)

    @property
    def current_search_path(self):
        """The value of ``$PATH`` or :data:`os.defpath` (a string)."""
        return os.environ.get('PATH', os.defpath)


class MockedProgram(CustomSearchPath):

    """
    Context manager to mock the existence of a program (executable).

    This class extends the functionality of :class:`CustomSearchPath`.
    """

    def __init__(self, name, returncode=0, script=None):
        """
        Initialize a :class:`MockedProgram` object.

        :param name: The name of the program (a string).
        :param returncode: The return code that the program should emit (a
                           number, defaults to zero).
        :param script: Shell script code to include in the mocked program (a
                       string or :data:`None`). This can be used to mock a
                       program that is expected to generate specific output.
        """
        # Initialize our own instance variables.
        self.program_name = name
        self.program_returncode = returncode
        self.program_script = script
        self.program_signal_file = None
        # Initialize our superclasses.
        super(MockedProgram, self).__init__()

    def __enter__(self):
        """
        Create the mock program.

        :returns: The pathname of the directory that has
                  been added to ``$PATH`` (a string).
        """
        directory = super(MockedProgram, self).__enter__()
        self.program_signal_file = os.path.join(directory, 'program-was-run-%s' % random_string(10))
        pathname = os.path.join(directory, self.program_name)
        with open(pathname, 'w') as handle:
            handle.write('#!/bin/sh\n')
            handle.write('echo > %s\n' % pipes.quote(self.program_signal_file))
            if self.program_script:
                handle.write('%s\n' % self.program_script.strip())
            handle.write('exit %i\n' % self.program_returncode)
        os.chmod(pathname, 0o755)
        return directory

    def __exit__(self, *args, **kw):
        """
        Ensure that the mock program was run.

        :raises: :exc:`~exceptions.AssertionError` when
                 the mock program hasn't been run.
        """
        try:
            assert self.program_signal_file and os.path.isfile(self.program_signal_file), \
                ("It looks like %r was never run!" % self.program_name)
        finally:
            return super(MockedProgram, self).__exit__(*args, **kw)


class CaptureOutput(ContextManager):

    """
    Context manager that captures what's written to :data:`sys.stdout` and :data:`sys.stderr`.

    .. attribute:: stdin

       The :class:`~humanfriendly.compat.StringIO` object used to feed the standard input stream.

    .. attribute:: stdout

       The :class:`CaptureBuffer` object used to capture the standard output stream.

    .. attribute:: stderr

       The :class:`CaptureBuffer` object used to capture the standard error stream.
    """

    def __init__(self, merged=False, input='', enabled=True):
        """
        Initialize a :class:`CaptureOutput` object.

        :param merged: :data:`True` to merge the streams,
                       :data:`False` to capture them separately.
        :param input: The data that reads from :data:`sys.stdin`
                      should return (a string).
        :param enabled: :data:`True` to enable capturing (the default),
                        :data:`False` otherwise. This makes it easy to
                        unconditionally use :class:`CaptureOutput` in
                        a :keyword:`with` block while preserving the
                        choice to opt out of capturing output.
        """
        self.stdin = StringIO(input)
        self.stdout = CaptureBuffer()
        self.stderr = self.stdout if merged else CaptureBuffer()
        self.patched_attributes = []
        if enabled:
            self.patched_attributes.extend(
                PatchedAttribute(sys, name, getattr(self, name))
                for name in ('stdin', 'stdout', 'stderr')
            )

    def __enter__(self):
        """Start capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
        super(CaptureOutput, self).__enter__()
        for context in self.patched_attributes:
            context.__enter__()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Stop capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
        super(CaptureOutput, self).__exit__(exc_type, exc_value, traceback)
        for context in self.patched_attributes:
            context.__exit__(exc_type, exc_value, traceback)

    def get_lines(self):
        """Get the contents of :attr:`stdout` split into separate lines."""
        return self.get_text().splitlines()

    def get_text(self):
        """Get the contents of :attr:`stdout` as a Unicode string."""
        return self.stdout.get_text()

    def getvalue(self):
        """Get the text written to :data:`sys.stdout`."""
        return self.stdout.getvalue()


class CaptureBuffer(StringIO):

    """
    Helper for :class:`CaptureOutput` to provide an easy to use API.

    The two methods defined by this subclass were specifically chosen to match
    the names of the methods provided by my :pypi:`capturer` package which
    serves a similar role as :class:`CaptureOutput` but knows how to simulate
    an interactive terminal (tty).
    """

    def get_lines(self):
        """Get the contents of the buffer split into separate lines."""
        return self.get_text().splitlines()

    def get_text(self):
        """Get the contents of the buffer as a Unicode string."""
        return self.getvalue()


class TestCase(unittest.TestCase):

    """Subclass of :class:`unittest.TestCase` with automatic logging and other miscellaneous features."""

    def __init__(self, *args, **kw):
        """
        Initialize a :class:`TestCase` object.

        Any positional and/or keyword arguments are passed on to the
        initializer of the superclass.
        """
        super(TestCase, self).__init__(*args, **kw)

    def setUp(self, log_level=logging.DEBUG):
        """setUp(log_level=logging.DEBUG)
        Automatically configure logging to the terminal.

        :param log_level: Refer to :func:`configure_logging()`.

        The :func:`setUp()` method is automatically called by
        :class:`unittest.TestCase` before each test method starts.
        It does two things:

        - Logging to the terminal is configured using
          :func:`configure_logging()`.

        - Before the test method starts a newline is emitted, to separate the
          name of the test method (which will be printed to the terminal by
          :mod:`unittest` or :pypi:`pytest`) from the first line of logging
          output that the test method is likely going to generate.
        """
        # Configure logging to the terminal.
        configure_logging(log_level)
        # Separate the name of the test method (printed by the superclass
        # and/or py.test without a newline at the end) from the first line of
        # logging output that the test method is likely going to generate.
        sys.stderr.write("\n")