Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/coloredlogs/tests.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/coloredlogs/tests.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,657 @@ +# Automated tests for the `coloredlogs' package. +# +# Author: Peter Odding <peter@peterodding.com> +# Last Change: December 10, 2020 +# URL: https://coloredlogs.readthedocs.io + +"""Automated tests for the `coloredlogs` package.""" + +# Standard library modules. +import contextlib +import logging +import logging.handlers +import os +import re +import subprocess +import sys +import tempfile + +# External dependencies. +from humanfriendly.compat import StringIO +from humanfriendly.terminal import ANSI_COLOR_CODES, ANSI_CSI, ansi_style, ansi_wrap +from humanfriendly.testing import PatchedAttribute, PatchedItem, TestCase, retry +from humanfriendly.text import format, random_string + +# The module we're testing. +import coloredlogs +import coloredlogs.cli +from coloredlogs import ( + CHROOT_FILES, + ColoredFormatter, + NameNormalizer, + decrease_verbosity, + find_defined_levels, + find_handler, + find_hostname, + find_program_name, + find_username, + get_level, + increase_verbosity, + install, + is_verbose, + level_to_number, + match_stream_handler, + parse_encoded_styles, + set_level, + walk_propagation_tree, +) +from coloredlogs.demo import demonstrate_colored_logging +from coloredlogs.syslog import SystemLogging, is_syslog_supported, match_syslog_handler +from coloredlogs.converter import ( + ColoredCronMailer, + EIGHT_COLOR_PALETTE, + capture, + convert, +) + +# External test dependencies. +from capturer import CaptureOutput +from verboselogs import VerboseLogger + +# Compiled regular expression that matches a single line of output produced by +# the default log format (does not include matching of ANSI escape sequences). +PLAIN_TEXT_PATTERN = re.compile(r''' + (?P<date> \d{4}-\d{2}-\d{2} ) + \s (?P<time> \d{2}:\d{2}:\d{2} ) + \s (?P<hostname> \S+ ) + \s (?P<logger_name> \w+ ) + \[ (?P<process_id> \d+ ) \] + \s (?P<severity> [A-Z]+ ) + \s (?P<message> .* ) +''', re.VERBOSE) + +# Compiled regular expression that matches a single line of output produced by +# the default log format with milliseconds=True. +PATTERN_INCLUDING_MILLISECONDS = re.compile(r''' + (?P<date> \d{4}-\d{2}-\d{2} ) + \s (?P<time> \d{2}:\d{2}:\d{2},\d{3} ) + \s (?P<hostname> \S+ ) + \s (?P<logger_name> \w+ ) + \[ (?P<process_id> \d+ ) \] + \s (?P<severity> [A-Z]+ ) + \s (?P<message> .* ) +''', re.VERBOSE) + + +def setUpModule(): + """Speed up the tests by disabling the demo's artificial delay.""" + os.environ['COLOREDLOGS_DEMO_DELAY'] = '0' + coloredlogs.demo.DEMO_DELAY = 0 + + +class ColoredLogsTestCase(TestCase): + + """Container for the `coloredlogs` tests.""" + + def find_system_log(self): + """Find the system log file or skip the current test.""" + filename = ('/var/log/system.log' if sys.platform == 'darwin' else ( + '/var/log/syslog' if 'linux' in sys.platform else None + )) + if not filename: + self.skipTest("Location of system log file unknown!") + elif not os.path.isfile(filename): + self.skipTest("System log file not found! (%s)" % filename) + elif not os.access(filename, os.R_OK): + self.skipTest("Insufficient permissions to read system log file! (%s)" % filename) + else: + return filename + + def test_level_to_number(self): + """Make sure :func:`level_to_number()` works as intended.""" + # Make sure the default levels are translated as expected. + assert level_to_number('debug') == logging.DEBUG + assert level_to_number('info') == logging.INFO + assert level_to_number('warning') == logging.WARNING + assert level_to_number('error') == logging.ERROR + assert level_to_number('fatal') == logging.FATAL + # Make sure bogus level names don't blow up. + assert level_to_number('bogus-level') == logging.INFO + + def test_find_hostname(self): + """Make sure :func:`~find_hostname()` works correctly.""" + assert find_hostname() + # Create a temporary file as a placeholder for e.g. /etc/debian_chroot. + fd, temporary_file = tempfile.mkstemp() + try: + with open(temporary_file, 'w') as handle: + handle.write('first line\n') + handle.write('second line\n') + CHROOT_FILES.insert(0, temporary_file) + # Make sure the chroot file is being read. + assert find_hostname() == 'first line' + finally: + # Clean up. + CHROOT_FILES.pop(0) + os.unlink(temporary_file) + # Test that unreadable chroot files don't break coloredlogs. + try: + CHROOT_FILES.insert(0, temporary_file) + # Make sure that a usable value is still produced. + assert find_hostname() + finally: + # Clean up. + CHROOT_FILES.pop(0) + + def test_host_name_filter(self): + """Make sure :func:`install()` integrates with :class:`~coloredlogs.HostNameFilter()`.""" + install(fmt='%(hostname)s') + with CaptureOutput() as capturer: + logging.info("A truly insignificant message ..") + output = capturer.get_text() + assert find_hostname() in output + + def test_program_name_filter(self): + """Make sure :func:`install()` integrates with :class:`~coloredlogs.ProgramNameFilter()`.""" + install(fmt='%(programname)s') + with CaptureOutput() as capturer: + logging.info("A truly insignificant message ..") + output = capturer.get_text() + assert find_program_name() in output + + def test_username_filter(self): + """Make sure :func:`install()` integrates with :class:`~coloredlogs.UserNameFilter()`.""" + install(fmt='%(username)s') + with CaptureOutput() as capturer: + logging.info("A truly insignificant message ..") + output = capturer.get_text() + assert find_username() in output + + def test_system_logging(self): + """Make sure the :class:`coloredlogs.syslog.SystemLogging` context manager works.""" + system_log_file = self.find_system_log() + expected_message = random_string(50) + with SystemLogging(programname='coloredlogs-test-suite') as syslog: + if not syslog: + return self.skipTest("couldn't connect to syslog daemon") + # When I tried out the system logging support on macOS 10.13.1 on + # 2018-01-05 I found that while WARNING and ERROR messages show up + # in the system log DEBUG and INFO messages don't. This explains + # the importance of the level of the log message below. + logging.error("%s", expected_message) + # Retry the following assertion (for up to 60 seconds) to give the + # logging daemon time to write our log message to disk. This + # appears to be needed on MacOS workers on Travis CI, see: + # https://travis-ci.org/xolox/python-coloredlogs/jobs/325245853 + retry(lambda: check_contents(system_log_file, expected_message, True)) + + def test_system_logging_override(self): + """Make sure the :class:`coloredlogs.syslog.is_syslog_supported` respects the override.""" + with PatchedItem(os.environ, 'COLOREDLOGS_SYSLOG', 'true'): + assert is_syslog_supported() is True + with PatchedItem(os.environ, 'COLOREDLOGS_SYSLOG', 'false'): + assert is_syslog_supported() is False + + def test_syslog_shortcut_simple(self): + """Make sure that ``coloredlogs.install(syslog=True)`` works.""" + system_log_file = self.find_system_log() + expected_message = random_string(50) + with cleanup_handlers(): + # See test_system_logging() for the importance of this log level. + coloredlogs.install(syslog=True) + logging.error("%s", expected_message) + # See the comments in test_system_logging() on why this is retried. + retry(lambda: check_contents(system_log_file, expected_message, True)) + + def test_syslog_shortcut_enhanced(self): + """Make sure that ``coloredlogs.install(syslog='warning')`` works.""" + system_log_file = self.find_system_log() + the_expected_message = random_string(50) + not_an_expected_message = random_string(50) + with cleanup_handlers(): + # See test_system_logging() for the importance of these log levels. + coloredlogs.install(syslog='error') + logging.warning("%s", not_an_expected_message) + logging.error("%s", the_expected_message) + # See the comments in test_system_logging() on why this is retried. + retry(lambda: check_contents(system_log_file, the_expected_message, True)) + retry(lambda: check_contents(system_log_file, not_an_expected_message, False)) + + def test_name_normalization(self): + """Make sure :class:`~coloredlogs.NameNormalizer` works as intended.""" + nn = NameNormalizer() + for canonical_name in ['debug', 'info', 'warning', 'error', 'critical']: + assert nn.normalize_name(canonical_name) == canonical_name + assert nn.normalize_name(canonical_name.upper()) == canonical_name + assert nn.normalize_name('warn') == 'warning' + assert nn.normalize_name('fatal') == 'critical' + + def test_style_parsing(self): + """Make sure :func:`~coloredlogs.parse_encoded_styles()` works as intended.""" + encoded_styles = 'debug=green;warning=yellow;error=red;critical=red,bold' + decoded_styles = parse_encoded_styles(encoded_styles, normalize_key=lambda k: k.upper()) + assert sorted(decoded_styles.keys()) == sorted(['debug', 'warning', 'error', 'critical']) + assert decoded_styles['debug']['color'] == 'green' + assert decoded_styles['warning']['color'] == 'yellow' + assert decoded_styles['error']['color'] == 'red' + assert decoded_styles['critical']['color'] == 'red' + assert decoded_styles['critical']['bold'] is True + + def test_is_verbose(self): + """Make sure is_verbose() does what it should :-).""" + set_level(logging.INFO) + assert not is_verbose() + set_level(logging.DEBUG) + assert is_verbose() + set_level(logging.VERBOSE) + assert is_verbose() + + def test_increase_verbosity(self): + """Make sure increase_verbosity() respects default and custom levels.""" + # Start from a known state. + set_level(logging.INFO) + assert get_level() == logging.INFO + # INFO -> VERBOSE. + increase_verbosity() + assert get_level() == logging.VERBOSE + # VERBOSE -> DEBUG. + increase_verbosity() + assert get_level() == logging.DEBUG + # DEBUG -> SPAM. + increase_verbosity() + assert get_level() == logging.SPAM + # SPAM -> NOTSET. + increase_verbosity() + assert get_level() == logging.NOTSET + # NOTSET -> NOTSET. + increase_verbosity() + assert get_level() == logging.NOTSET + + def test_decrease_verbosity(self): + """Make sure decrease_verbosity() respects default and custom levels.""" + # Start from a known state. + set_level(logging.INFO) + assert get_level() == logging.INFO + # INFO -> NOTICE. + decrease_verbosity() + assert get_level() == logging.NOTICE + # NOTICE -> WARNING. + decrease_verbosity() + assert get_level() == logging.WARNING + # WARNING -> SUCCESS. + decrease_verbosity() + assert get_level() == logging.SUCCESS + # SUCCESS -> ERROR. + decrease_verbosity() + assert get_level() == logging.ERROR + # ERROR -> CRITICAL. + decrease_verbosity() + assert get_level() == logging.CRITICAL + # CRITICAL -> CRITICAL. + decrease_verbosity() + assert get_level() == logging.CRITICAL + + def test_level_discovery(self): + """Make sure find_defined_levels() always reports the levels defined in Python's standard library.""" + defined_levels = find_defined_levels() + level_values = defined_levels.values() + for number in (0, 10, 20, 30, 40, 50): + assert number in level_values + + def test_walk_propagation_tree(self): + """Make sure walk_propagation_tree() properly walks the tree of loggers.""" + root, parent, child, grand_child = self.get_logger_tree() + # Check the default mode of operation. + loggers = list(walk_propagation_tree(grand_child)) + assert loggers == [grand_child, child, parent, root] + # Now change the propagation (non-default mode of operation). + child.propagate = False + loggers = list(walk_propagation_tree(grand_child)) + assert loggers == [grand_child, child] + + def test_find_handler(self): + """Make sure find_handler() works as intended.""" + root, parent, child, grand_child = self.get_logger_tree() + # Add some handlers to the tree. + stream_handler = logging.StreamHandler() + syslog_handler = logging.handlers.SysLogHandler() + child.addHandler(stream_handler) + parent.addHandler(syslog_handler) + # Make sure the first matching handler is returned. + matched_handler, matched_logger = find_handler(grand_child, lambda h: isinstance(h, logging.Handler)) + assert matched_handler is stream_handler + # Make sure the first matching handler of the given type is returned. + matched_handler, matched_logger = find_handler(child, lambda h: isinstance(h, logging.handlers.SysLogHandler)) + assert matched_handler is syslog_handler + + def get_logger_tree(self): + """Create and return a tree of loggers.""" + # Get the root logger. + root = logging.getLogger() + # Create a top level logger for ourselves. + parent_name = random_string() + parent = logging.getLogger(parent_name) + # Create a child logger. + child_name = '%s.%s' % (parent_name, random_string()) + child = logging.getLogger(child_name) + # Create a grand child logger. + grand_child_name = '%s.%s' % (child_name, random_string()) + grand_child = logging.getLogger(grand_child_name) + return root, parent, child, grand_child + + def test_support_for_milliseconds(self): + """Make sure milliseconds are hidden by default but can be easily enabled.""" + # Check that the default log format doesn't include milliseconds. + stream = StringIO() + install(reconfigure=True, stream=stream) + logging.info("This should not include milliseconds.") + assert all(map(PLAIN_TEXT_PATTERN.match, stream.getvalue().splitlines())) + # Check that milliseconds can be enabled via a shortcut. + stream = StringIO() + install(milliseconds=True, reconfigure=True, stream=stream) + logging.info("This should include milliseconds.") + assert all(map(PATTERN_INCLUDING_MILLISECONDS.match, stream.getvalue().splitlines())) + + def test_support_for_milliseconds_directive(self): + """Make sure milliseconds using the ``%f`` directive are supported.""" + stream = StringIO() + install(reconfigure=True, stream=stream, datefmt='%Y-%m-%dT%H:%M:%S.%f%z') + logging.info("This should be timestamped according to #45.") + assert re.match(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}[+-]\d{4}\s', stream.getvalue()) + + def test_plain_text_output_format(self): + """Inspect the plain text output of coloredlogs.""" + logger = VerboseLogger(random_string(25)) + stream = StringIO() + install(level=logging.NOTSET, logger=logger, stream=stream) + # Test that filtering on severity works. + logger.setLevel(logging.INFO) + logger.debug("No one should see this message.") + assert len(stream.getvalue().strip()) == 0 + # Test that the default output format looks okay in plain text. + logger.setLevel(logging.NOTSET) + for method, severity in ((logger.debug, 'DEBUG'), + (logger.info, 'INFO'), + (logger.verbose, 'VERBOSE'), + (logger.warning, 'WARNING'), + (logger.error, 'ERROR'), + (logger.critical, 'CRITICAL')): + # XXX Workaround for a regression in Python 3.7 caused by the + # Logger.isEnabledFor() method using stale cache entries. If we + # don't clear the cache then logger.isEnabledFor(logging.DEBUG) + # returns False and no DEBUG message is emitted. + try: + logger._cache.clear() + except AttributeError: + pass + # Prepare the text. + text = "This is a message with severity %r." % severity.lower() + # Log the message with the given severity. + method(text) + # Get the line of output generated by the handler. + output = stream.getvalue() + lines = output.splitlines() + last_line = lines[-1] + assert text in last_line + assert severity in last_line + assert PLAIN_TEXT_PATTERN.match(last_line) + + def test_force_enable(self): + """Make sure ANSI escape sequences can be forced (bypassing auto-detection).""" + interpreter = subprocess.Popen([ + sys.executable, "-c", ";".join([ + "import coloredlogs, logging", + "coloredlogs.install(isatty=True)", + "logging.info('Hello world')", + ]), + ], stderr=subprocess.PIPE) + stdout, stderr = interpreter.communicate() + assert ANSI_CSI in stderr.decode('UTF-8') + + def test_auto_disable(self): + """ + Make sure ANSI escape sequences are not emitted when logging output is being redirected. + + This is a regression test for https://github.com/xolox/python-coloredlogs/issues/100. + + It works as follows: + + 1. We mock an interactive terminal using 'capturer' to ensure that this + test works inside test drivers that capture output (like pytest). + + 2. We launch a subprocess (to ensure a clean process state) where + stderr is captured but stdout is not, emulating issue #100. + + 3. The output captured on stderr contained ANSI escape sequences after + this test was written and before the issue was fixed, so now this + serves as a regression test for issue #100. + """ + with CaptureOutput(): + interpreter = subprocess.Popen([ + sys.executable, "-c", ";".join([ + "import coloredlogs, logging", + "coloredlogs.install()", + "logging.info('Hello world')", + ]), + ], stderr=subprocess.PIPE) + stdout, stderr = interpreter.communicate() + assert ANSI_CSI not in stderr.decode('UTF-8') + + def test_env_disable(self): + """Make sure ANSI escape sequences can be disabled using ``$NO_COLOR``.""" + with PatchedItem(os.environ, 'NO_COLOR', 'I like monochrome'): + with CaptureOutput() as capturer: + subprocess.check_call([ + sys.executable, "-c", ";".join([ + "import coloredlogs, logging", + "coloredlogs.install()", + "logging.info('Hello world')", + ]), + ]) + output = capturer.get_text() + assert ANSI_CSI not in output + + def test_html_conversion(self): + """Check the conversion from ANSI escape sequences to HTML.""" + # Check conversion of colored text. + for color_name, ansi_code in ANSI_COLOR_CODES.items(): + ansi_encoded_text = 'plain text followed by %s text' % ansi_wrap(color_name, color=color_name) + expected_html = format( + '<code>plain text followed by <span style="color:{css}">{name}</span> text</code>', + css=EIGHT_COLOR_PALETTE[ansi_code], name=color_name, + ) + self.assertEqual(expected_html, convert(ansi_encoded_text)) + # Check conversion of bright colored text. + expected_html = '<code><span style="color:#FF0">bright yellow</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('bright yellow', color='yellow', bright=True))) + # Check conversion of text with a background color. + expected_html = '<code><span style="background-color:#DE382B">red background</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('red background', background='red'))) + # Check conversion of text with a bright background color. + expected_html = '<code><span style="background-color:#F00">bright red background</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('bright red background', background='red', bright=True))) + # Check conversion of text that uses the 256 color mode palette as a foreground color. + expected_html = '<code><span style="color:#FFAF00">256 color mode foreground</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('256 color mode foreground', color=214))) + # Check conversion of text that uses the 256 color mode palette as a background color. + expected_html = '<code><span style="background-color:#AF0000">256 color mode background</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('256 color mode background', background=124))) + # Check that invalid 256 color mode indexes don't raise exceptions. + expected_html = '<code>plain text expected</code>' + self.assertEqual(expected_html, convert('\x1b[38;5;256mplain text expected\x1b[0m')) + # Check conversion of bold text. + expected_html = '<code><span style="font-weight:bold">bold text</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('bold text', bold=True))) + # Check conversion of underlined text. + expected_html = '<code><span style="text-decoration:underline">underlined text</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('underlined text', underline=True))) + # Check conversion of strike-through text. + expected_html = '<code><span style="text-decoration:line-through">strike-through text</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('strike-through text', strike_through=True))) + # Check conversion of inverse text. + expected_html = '<code><span style="background-color:#FFC706;color:#000">inverse</span></code>' + self.assertEqual(expected_html, convert(ansi_wrap('inverse', color='yellow', inverse=True))) + # Check conversion of URLs. + for sample_text in 'www.python.org', 'http://coloredlogs.rtfd.org', 'https://coloredlogs.rtfd.org': + sample_url = sample_text if '://' in sample_text else ('http://' + sample_text) + expected_html = '<code><a href="%s" style="color:inherit">%s</a></code>' % (sample_url, sample_text) + self.assertEqual(expected_html, convert(sample_text)) + # Check that the capture pattern for URLs doesn't match ANSI escape + # sequences and also check that the short hand for the 0 reset code is + # supported. These are tests for regressions of bugs found in + # coloredlogs <= 8.0. + reset_short_hand = '\x1b[0m' + blue_underlined = ansi_style(color='blue', underline=True) + ansi_encoded_text = '<%shttps://coloredlogs.readthedocs.io%s>' % (blue_underlined, reset_short_hand) + expected_html = ( + '<code><<span style="color:#006FB8;text-decoration:underline">' + '<a href="https://coloredlogs.readthedocs.io" style="color:inherit">' + 'https://coloredlogs.readthedocs.io' + '</a></span>></code>' + ) + self.assertEqual(expected_html, convert(ansi_encoded_text)) + + def test_output_interception(self): + """Test capturing of output from external commands.""" + expected_output = 'testing, 1, 2, 3 ..' + actual_output = capture(['echo', expected_output]) + assert actual_output.strip() == expected_output.strip() + + def test_enable_colored_cron_mailer(self): + """Test that automatic ANSI to HTML conversion when running under ``cron`` can be enabled.""" + with PatchedItem(os.environ, 'CONTENT_TYPE', 'text/html'): + with ColoredCronMailer() as mailer: + assert mailer.is_enabled + + def test_disable_colored_cron_mailer(self): + """Test that automatic ANSI to HTML conversion when running under ``cron`` can be disabled.""" + with PatchedItem(os.environ, 'CONTENT_TYPE', 'text/plain'): + with ColoredCronMailer() as mailer: + assert not mailer.is_enabled + + def test_auto_install(self): + """Test :func:`coloredlogs.auto_install()`.""" + needle = random_string() + command_line = [sys.executable, '-c', 'import logging; logging.info(%r)' % needle] + # Sanity check that log messages aren't enabled by default. + with CaptureOutput() as capturer: + os.environ['COLOREDLOGS_AUTO_INSTALL'] = 'false' + subprocess.check_call(command_line) + output = capturer.get_text() + assert needle not in output + # Test that the $COLOREDLOGS_AUTO_INSTALL environment variable can be + # used to automatically call coloredlogs.install() during initialization. + with CaptureOutput() as capturer: + os.environ['COLOREDLOGS_AUTO_INSTALL'] = 'true' + subprocess.check_call(command_line) + output = capturer.get_text() + assert needle in output + + def test_cli_demo(self): + """Test the command line colored logging demonstration.""" + with CaptureOutput() as capturer: + main('coloredlogs', '--demo') + output = capturer.get_text() + # Make sure the output contains all of the expected logging level names. + for name in 'debug', 'info', 'warning', 'error', 'critical': + assert name.upper() in output + + def test_cli_conversion(self): + """Test the command line HTML conversion.""" + output = main('coloredlogs', '--convert', 'coloredlogs', '--demo', capture=True) + # Make sure the output is encoded as HTML. + assert '<span' in output + + def test_empty_conversion(self): + """ + Test that conversion of empty output produces no HTML. + + This test was added because I found that ``coloredlogs --convert`` when + used in a cron job could cause cron to send out what appeared to be + empty emails. On more careful inspection the body of those emails was + ``<code></code>``. By not emitting the wrapper element when no other + HTML is generated, cron will not send out an email. + """ + output = main('coloredlogs', '--convert', 'true', capture=True) + assert not output.strip() + + def test_implicit_usage_message(self): + """Test that the usage message is shown when no actions are given.""" + assert 'Usage:' in main('coloredlogs', capture=True) + + def test_explicit_usage_message(self): + """Test that the usage message is shown when ``--help`` is given.""" + assert 'Usage:' in main('coloredlogs', '--help', capture=True) + + def test_custom_record_factory(self): + """ + Test that custom LogRecord factories are supported. + + This test is a bit convoluted because the logging module suppresses + exceptions. We monkey patch the method suspected of encountering + exceptions so that we can tell after it was called whether any + exceptions occurred (despite the exceptions not propagating). + """ + if not hasattr(logging, 'getLogRecordFactory'): + return self.skipTest("this test requires Python >= 3.2") + + exceptions = [] + original_method = ColoredFormatter.format + original_factory = logging.getLogRecordFactory() + + def custom_factory(*args, **kwargs): + record = original_factory(*args, **kwargs) + record.custom_attribute = 0xdecafbad + return record + + def custom_method(*args, **kw): + try: + return original_method(*args, **kw) + except Exception as e: + exceptions.append(e) + raise + + with PatchedAttribute(ColoredFormatter, 'format', custom_method): + logging.setLogRecordFactory(custom_factory) + try: + demonstrate_colored_logging() + finally: + logging.setLogRecordFactory(original_factory) + + # Ensure that no exceptions were triggered. + assert not exceptions + + +def check_contents(filename, contents, match): + """Check if a line in a file contains an expected string.""" + with open(filename) as handle: + assert any(contents in line for line in handle) == match + + +def main(*arguments, **options): + """Wrap the command line interface to make it easier to test.""" + capture = options.get('capture', False) + saved_argv = sys.argv + saved_stdout = sys.stdout + try: + sys.argv = arguments + if capture: + sys.stdout = StringIO() + coloredlogs.cli.main() + if capture: + return sys.stdout.getvalue() + finally: + sys.argv = saved_argv + sys.stdout = saved_stdout + + +@contextlib.contextmanager +def cleanup_handlers(): + """Context manager to cleanup output handlers.""" + # There's nothing to set up so we immediately yield control. + yield + # After the with block ends we cleanup any output handlers. + for match_func in match_stream_handler, match_syslog_handler: + handler, logger = find_handler(logging.getLogger(), match_func) + if handler and logger: + logger.removeHandler(handler)