Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/schema_salad/sourceline.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/schema_salad/sourceline.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,368 +0,0 @@ -from __future__ import absolute_import - -import itertools -import os -import re -import traceback -from typing import ( - Any, - AnyStr, - Dict, - List, - MutableMapping, - MutableSequence, - Optional, - Pattern, - Tuple, - Type, - Union, -) - -import six -from future.utils import raise_from -from typing_extensions import Text # pylint: disable=unused-import - -import ruamel.yaml -from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq - -# move to a regular typing import when Python 3.3-3.6 is no longer supported - - -lineno_re = re.compile(u"^(.*?:[0-9]+:[0-9]+: )(( *)(.*))") - - -def regex_chunk(lines, regex): - # type: (List[str], Pattern[str]) -> List[List[str]] - lst = list(itertools.dropwhile(lambda x: not regex.match(x), lines)) - arr = [] - while lst: - ret = [lst[0]] + list( - itertools.takewhile(lambda x: not regex.match(x), lst[1:]) - ) - arr.append(ret) - lst = list(itertools.dropwhile(lambda x: not regex.match(x), lst[1:])) - return arr - - -def chunk_messages(message): # type: (str) -> List[Tuple[int, str]] - file_regex = re.compile(r"^(.+:\d+:\d+:)(\s+)(.+)$") - item_regex = re.compile(r"^\s*\*\s+") - arr = [] - for chun in regex_chunk(message.splitlines(), file_regex): - fst = chun[0] - mat = file_regex.match(fst) - if mat: - place = mat.group(1) - indent = len(mat.group(2)) - - lst = [mat.group(3)] + chun[1:] - if [x for x in lst if item_regex.match(x)]: - for item in regex_chunk(lst, item_regex): - msg = re.sub(item_regex, "", "\n".join(item)) - arr.append((indent, place + " " + re.sub(r"[\n\s]+", " ", msg))) - else: - msg = re.sub(item_regex, "", "\n".join(lst)) - arr.append((indent, place + " " + re.sub(r"[\n\s]+", " ", msg))) - return arr - - -def reformat_yaml_exception_message(message): # type: (str) -> str - line_regex = re.compile(r'^\s+in "(.+)", line (\d+), column (\d+)$') - fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/") - msgs = message.splitlines() - ret = [] - - if len(msgs) == 3: - msgs = msgs[1:] - nblanks = 0 - elif len(msgs) == 4: - c_msg = msgs[0] - match = line_regex.match(msgs[1]) - if match: - c_file, c_line, c_column = match.groups() - c_file = re.sub(fname_regex, "", c_file) - ret.append("{}:{}:{}: {}".format(c_file, c_line, c_column, c_msg)) - - msgs = msgs[2:] - nblanks = 2 - - p_msg = msgs[0] - match = line_regex.match(msgs[1]) - if match: - p_file, p_line, p_column = match.groups() - p_file = re.sub(fname_regex, "", p_file) - ret.append( - "{}:{}:{}:{} {}".format(p_file, p_line, p_column, " " * nblanks, p_msg) - ) - return "\n".join(ret) - - -def _add_lc_filename( - r, source -): # type: (ruamel.yaml.comments.CommentedBase, AnyStr) -> None - if isinstance(r, ruamel.yaml.comments.CommentedBase): - r.lc.filename = source - if isinstance(r, MutableSequence): - for d in r: - _add_lc_filename(d, source) - elif isinstance(r, MutableMapping): - for d in six.itervalues(r): - _add_lc_filename(d, source) - - -def relname(source): # type: (Text) -> Text - if source.startswith("file://"): - source = source[7:] - source = os.path.relpath(source) - return source - - -def add_lc_filename( - r, source -): # type: (ruamel.yaml.comments.CommentedBase, Text) -> None - _add_lc_filename(r, relname(source)) - - -def reflow_all(text, maxline=None): # type: (Text, Optional[int]) -> Text - if maxline is None: - maxline = int(os.environ.get("COLUMNS", "100")) - maxno = 0 - for l in text.splitlines(): - g = lineno_re.match(l) - if not g: - continue - maxno = max(maxno, len(g.group(1))) - maxno_text = maxline - maxno - msg = [] - for l in text.splitlines(): - g = lineno_re.match(l) - if not g: - msg.append(l) - continue - pre = g.group(1) - reflowed = reflow(g.group(2), maxno_text, g.group(3)).splitlines() - msg.extend([pre.ljust(maxno, " ") + r for r in reflowed]) - return "\n".join(msg) - - -def reflow(text, maxline, shift=""): # type: (Text, int, Text) -> Text - if maxline < 20: - maxline = 20 - if len(text) > maxline: - sp = text.rfind(" ", 0, maxline) - if sp < 1: - sp = text.find(" ", sp + 1) - if sp == -1: - sp = len(text) - if sp < len(text): - return "{}\n{}{}".format( - text[0:sp], shift, reflow(text[sp + 1 :], maxline, shift) - ) - return text - - -def indent( - v, nolead=False, shift=u" ", bullet=u" " -): # type: (Text, bool, Text, Text) -> Text - if nolead: - return v.splitlines()[0] + u"\n".join([shift + l for l in v.splitlines()[1:]]) - else: - - def lineno(i, l): # type: (int, Text) -> Text - r = lineno_re.match(l) - if r is not None: - return r.group(1) + (bullet if i == 0 else shift) + r.group(2) - else: - return (bullet if i == 0 else shift) + l - - return u"\n".join([lineno(i, l) for i, l in enumerate(v.splitlines())]) - - -def bullets(textlist, bul): # type: (List[Text], Text) -> Text - if len(textlist) == 1: - return textlist[0] - else: - return "\n".join(indent(t, bullet=bul) for t in textlist) - - -def strip_duplicated_lineno(text): # type: (Text) -> Text - """Same as `strip_dup_lineno` but without reflow""" - pre = None - msg = [] - for l in text.splitlines(): - g = lineno_re.match(l) - if not g: - msg.append(l) - continue - elif g.group(1) != pre: - msg.append(l) - pre = g.group(1) - else: - msg.append(" " * len(g.group(1)) + g.group(2)) - return "\n".join(msg) - - -def strip_dup_lineno(text, maxline=None): # type: (Text, Optional[int]) -> Text - if maxline is None: - maxline = int(os.environ.get("COLUMNS", "100")) - pre = None - msg = [] - maxno = 0 - for l in text.splitlines(): - g = lineno_re.match(l) - if not g: - continue - maxno = max(maxno, len(g.group(1))) - - for l in text.splitlines(): - g = lineno_re.match(l) - if not g: - msg.append(l) - continue - if g.group(1) != pre: - shift = maxno + len(g.group(3)) - g2 = reflow(g.group(2), maxline - shift, " " * shift) - pre = g.group(1) - msg.append(pre + " " * (maxno - len(g.group(1))) + g2) - else: - g2 = reflow(g.group(2), maxline - maxno, " " * (maxno + len(g.group(3)))) - msg.append(" " * maxno + g2) - return "\n".join(msg) - - -def cmap( - d, # type: Union[int, float, str, Text, Dict[Text, Any], List[Dict[Text, Any]]] - lc=None, # type: Optional[List[int]] - fn=None, # type: Optional[Text] -): # type: (...) -> Union[int, float, str, Text, CommentedMap, CommentedSeq] - if lc is None: - lc = [0, 0, 0, 0] - if fn is None: - fn = "test" - - if isinstance(d, CommentedMap): - fn = d.lc.filename if hasattr(d.lc, "filename") else fn - for k, v in six.iteritems(d): - if d.lc.data is not None and k in d.lc.data: - d[k] = cmap(v, lc=d.lc.data[k], fn=fn) - else: - d[k] = cmap(v, lc, fn=fn) - return d - if isinstance(d, CommentedSeq): - fn = d.lc.filename if hasattr(d.lc, "filename") else fn - for k2, v2 in enumerate(d): - if d.lc.data is not None and k2 in d.lc.data: - d[k2] = cmap(v2, lc=d.lc.data[k2], fn=fn) - else: - d[k2] = cmap(v2, lc, fn=fn) - return d - if isinstance(d, MutableMapping): - cm = CommentedMap() - for k in sorted(d.keys()): - v = d[k] - if isinstance(v, CommentedBase): - uselc = [v.lc.line, v.lc.col, v.lc.line, v.lc.col] - vfn = v.lc.filename if hasattr(v.lc, "filename") else fn - else: - uselc = lc - vfn = fn - cm[k] = cmap(v, lc=uselc, fn=vfn) - cm.lc.add_kv_line_col(k, uselc) - cm.lc.filename = fn - return cm - if isinstance(d, MutableSequence): - cs = CommentedSeq() - for k3, v3 in enumerate(d): - if isinstance(v3, CommentedBase): - uselc = [v3.lc.line, v3.lc.col, v3.lc.line, v3.lc.col] - vfn = v3.lc.filename if hasattr(v3.lc, "filename") else fn - else: - uselc = lc - vfn = fn - cs.append(cmap(v3, lc=uselc, fn=vfn)) - cs.lc.add_kv_line_col(k3, uselc) - cs.lc.filename = fn - return cs - else: - return d - - -class SourceLine(object): - def __init__( - self, - item, # type: Any - key=None, # type: Optional[Any] - raise_type=six.text_type, # type: Union[Type[six.text_type], Type[Exception]] - include_traceback=False, # type: bool - ): # type: (...) -> None - self.item = item - self.key = key - self.raise_type = raise_type - self.include_traceback = include_traceback - - def __enter__(self): # type: () -> SourceLine - return self - - def __exit__( - self, - exc_type, # type: Any - exc_value, # type: Any - tb, # type: Any - ): # type: (...) -> None - if not exc_value: - return - if self.include_traceback and six.PY2: - # Python2 doesn't actually have chained exceptions, so - # fake it by injecting the backtrace into the message. - raise_from( - self.makeError( - "\n".join(traceback.format_exception(exc_type, exc_value, tb)) - ), - exc_value, - ) - else: - raise_from(self.makeError(six.text_type(exc_value)), exc_value) - - def file(self): # type: () -> Optional[Text] - if hasattr(self.item, "lc") and hasattr(self.item.lc, "filename"): - return Text(self.item.lc.filename) - else: - return None - - def start(self): # type: () -> Optional[Tuple[int, int]] - if self.file() is None: - return None - elif ( - self.key is None - or self.item.lc.data is None - or self.key not in self.item.lc.data - ): - return ((self.item.lc.line or 0) + 1, (self.item.lc.col or 0) + 1) - else: - return ( - (self.item.lc.data[self.key][0] or 0) + 1, - (self.item.lc.data[self.key][1] or 0) + 1, - ) - - def end(self): # type: () -> Optional[Tuple[int, int]] - return None - - def makeLead(self): # type: () -> Text - if self.file(): - lcol = self.start() - line, col = lcol if lcol else ("", "") - return "{}:{}:{}:".format(self.file(), line, col) - else: - return "" - - def makeError(self, msg): # type: (Text) -> Any - if not isinstance(self.item, ruamel.yaml.comments.CommentedBase): - return self.raise_type(msg) - errs = [] - lead = self.makeLead() - for m in msg.splitlines(): - if bool(lineno_re.match(m)): - errs.append(m) - else: - errs.append("{} {}".format(lead, m)) - return self.raise_type("\n".join(errs))
