diff env/lib/python3.9/site-packages/boltons/excutils.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/boltons/excutils.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,258 @@
+# -*- coding: utf-8 -*-
+
+import sys
+import traceback
+import linecache
+from collections import namedtuple
+
+# TODO: last arg or first arg?  (last arg makes it harder to *args
+#       into, but makes it more readable in the default exception
+#       __repr__ output)
+# TODO: Multiexception wrapper
+
+
+__all__ = ['ExceptionCauseMixin']
+
+
+class ExceptionCauseMixin(Exception):
+    """
+    A mixin class for wrapping an exception in another exception, or
+    otherwise indicating an exception was caused by another exception.
+
+    This is most useful in concurrent or failure-intolerant scenarios,
+    where just because one operation failed, doesn't mean the remainder
+    should be aborted, or that it's the appropriate time to raise
+    exceptions.
+
+    This is still a work in progress, but an example use case at the
+    bottom of this module.
+
+    NOTE: when inheriting, you will probably want to put the
+    ExceptionCauseMixin first. Builtin exceptions are not good about
+    calling super()
+    """
+
+    cause = None
+
+    def __new__(cls, *args, **kw):
+        cause = None
+        if args and isinstance(args[0], Exception):
+            cause, args = args[0], args[1:]
+        ret = super(ExceptionCauseMixin, cls).__new__(cls, *args, **kw)
+        ret.cause = cause
+        if cause is None:
+            return ret
+        root_cause = getattr(cause, 'root_cause', None)
+        if root_cause is None:
+            ret.root_cause = cause
+        else:
+            ret.root_cause = root_cause
+
+        full_trace = getattr(cause, 'full_trace', None)
+        if full_trace is not None:
+            ret.full_trace = list(full_trace)
+            ret._tb = list(cause._tb)
+            ret._stack = list(cause._stack)
+            return ret
+
+        try:
+            exc_type, exc_value, exc_tb = sys.exc_info()
+            if exc_type is None and exc_value is None:
+                return ret
+            if cause is exc_value or root_cause is exc_value:
+                # handles when cause is the current exception or when
+                # there are multiple wraps while handling the original
+                # exception, but a cause was never provided
+                ret._tb = _extract_from_tb(exc_tb)
+                ret._stack = _extract_from_frame(exc_tb.tb_frame)
+                ret.full_trace = ret._stack[:-1] + ret._tb
+        finally:
+            del exc_tb
+        return ret
+
+    def get_str(self):
+        """
+        Get formatted the formatted traceback and exception
+        message. This function exists separately from __str__()
+        because __str__() is somewhat specialized for the built-in
+        traceback module's particular usage.
+        """
+        ret = []
+        trace_str = self._get_trace_str()
+        if trace_str:
+            ret.extend(['Traceback (most recent call last):\n', trace_str])
+        ret.append(self._get_exc_str())
+        return ''.join(ret)
+
+    def _get_message(self):
+        args = getattr(self, 'args', [])
+        if self.cause:
+            args = args[1:]
+        if args and args[0]:
+            return args[0]
+        return ''
+
+    def _get_trace_str(self):
+        if not self.cause:
+            return super(ExceptionCauseMixin, self).__repr__()
+        if self.full_trace:
+            return ''.join(traceback.format_list(self.full_trace))
+        return ''
+
+    def _get_exc_str(self, incl_name=True):
+        cause_str = _format_exc(self.root_cause)
+        message = self._get_message()
+        ret = []
+        if incl_name:
+            ret = [self.__class__.__name__, ': ']
+        if message:
+            ret.extend([message, ' (caused by ', cause_str, ')'])
+        else:
+            ret.extend([' caused by ', cause_str])
+        return ''.join(ret)
+
+    def __str__(self):
+        if not self.cause:
+            return super(ExceptionCauseMixin, self).__str__()
+        trace_str = self._get_trace_str()
+        ret = []
+        if trace_str:
+            message = self._get_message()
+            if message:
+                ret.extend([message, ' --- '])
+            ret.extend(['Wrapped traceback (most recent call last):\n',
+                        trace_str,
+                        self._get_exc_str(incl_name=True)])
+            return ''.join(ret)
+        else:
+            return self._get_exc_str(incl_name=False)
+
+
+def _format_exc(exc, message=None):
+    if message is None:
+        message = exc
+    exc_str = traceback._format_final_exc_line(exc.__class__.__name__, message)
+    return exc_str.rstrip()
+
+
+_BaseTBItem = namedtuple('_BaseTBItem', 'filename, lineno, name, line')
+
+
+class _TBItem(_BaseTBItem):
+    def __repr__(self):
+        ret = super(_TBItem, self).__repr__()
+        ret += ' <%r>' % self.frame_id
+        return ret
+
+
+class _DeferredLine(object):
+    def __init__(self, filename, lineno, module_globals=None):
+        self.filename = filename
+        self.lineno = lineno
+        module_globals = module_globals or {}
+        self.module_globals = dict([(k, v) for k, v in module_globals.items()
+                                    if k in ('__name__', '__loader__')])
+
+    def __eq__(self, other):
+        return (self.lineno, self.filename) == (other.lineno, other.filename)
+
+    def __ne__(self, other):
+        return (self.lineno, self.filename) != (other.lineno, other.filename)
+
+    def __str__(self):
+        if hasattr(self, '_line'):
+            return self._line
+        linecache.checkcache(self.filename)
+        line = linecache.getline(self.filename,
+                                 self.lineno,
+                                 self.module_globals)
+        if line:
+            line = line.strip()
+        else:
+            line = None
+        self._line = line
+        return line
+
+    def __repr__(self):
+        return repr(str(self))
+
+    def __len__(self):
+        return len(str(self))
+
+    def strip(self):
+        return str(self).strip()
+
+
+def _extract_from_frame(f=None, limit=None):
+    ret = []
+    if f is None:
+        f = sys._getframe(1)  # cross-impl yadayada
+    if limit is None:
+        limit = getattr(sys, 'tracebacklimit', 1000)
+    n = 0
+    while f is not None and n < limit:
+        filename = f.f_code.co_filename
+        lineno = f.f_lineno
+        name = f.f_code.co_name
+        line = _DeferredLine(filename, lineno, f.f_globals)
+        item = _TBItem(filename, lineno, name, line)
+        item.frame_id = id(f)
+        ret.append(item)
+        f = f.f_back
+        n += 1
+    ret.reverse()
+    return ret
+
+
+def _extract_from_tb(tb, limit=None):
+    ret = []
+    if limit is None:
+        limit = getattr(sys, 'tracebacklimit', 1000)
+    n = 0
+    while tb is not None and n < limit:
+        filename = tb.tb_frame.f_code.co_filename
+        lineno = tb.tb_lineno
+        name = tb.tb_frame.f_code.co_name
+        line = _DeferredLine(filename, lineno, tb.tb_frame.f_globals)
+        item = _TBItem(filename, lineno, name, line)
+        item.frame_id = id(tb.tb_frame)
+        ret.append(item)
+        tb = tb.tb_next
+        n += 1
+    return ret
+
+
+# An Example/Prototest:
+
+
+class MathError(ExceptionCauseMixin, ValueError):
+    pass
+
+
+def whoops_math():
+    return 1/0
+
+
+def math_lol(n=0):
+    if n < 3:
+        return math_lol(n=n+1)
+    try:
+        return whoops_math()
+    except ZeroDivisionError as zde:
+        exc = MathError(zde, 'ya done messed up')
+        raise exc
+
+def main():
+    try:
+        math_lol()
+    except ValueError as me:
+        exc = MathError(me, 'hi')
+        raise exc
+
+
+if __name__ == '__main__':
+    try:
+        main()
+    except Exception:
+        import pdb;pdb.post_mortem()
+        raise