Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/schema_salad/sourceline.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:32:28 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| 0:d30785e31577 | 1:56ad4e20f292 |
|---|---|
| 1 from __future__ import absolute_import | |
| 2 | |
| 3 import itertools | |
| 4 import os | |
| 5 import re | |
| 6 import traceback | |
| 7 from typing import ( | |
| 8 Any, | |
| 9 AnyStr, | |
| 10 Dict, | |
| 11 List, | |
| 12 MutableMapping, | |
| 13 MutableSequence, | |
| 14 Optional, | |
| 15 Pattern, | |
| 16 Tuple, | |
| 17 Type, | |
| 18 Union, | |
| 19 ) | |
| 20 | |
| 21 import six | |
| 22 from future.utils import raise_from | |
| 23 from typing_extensions import Text # pylint: disable=unused-import | |
| 24 | |
| 25 import ruamel.yaml | |
| 26 from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq | |
| 27 | |
| 28 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 29 | |
| 30 | |
# Recognises a line that starts with a "<file>:<line>:<col>: " lead.
#   group(1): the lead itself, including its trailing space
#   group(2): everything after the lead
#   group(3): the run of spaces at the start of group(2)
#   group(4): the text that follows those spaces
lineno_re = re.compile(
    u"^(.*?:[0-9]+:[0-9]+: )"
    u"(( *)(.*))"
)
| 32 | |
| 33 | |
def regex_chunk(lines, regex):
    # type: (List[str], Pattern[str]) -> List[List[str]]
    """Split ``lines`` into chunks that each begin with a line matching ``regex``.

    Lines that come before the first matching line are discarded.  Every
    matching line opens a new chunk, and the non-matching lines that follow
    it are appended to that chunk.
    """
    chunks = []
    current = None
    for line in lines:
        if regex.match(line):
            # A matching line always starts a fresh chunk.
            current = [line]
            chunks.append(current)
        elif current is not None:
            current.append(line)
    return chunks
| 45 | |
| 46 | |
def chunk_messages(message):  # type: (str) -> List[Tuple[int, str]]
    """Break a multi-line message into ``(indent, "<place> <text>")`` tuples.

    The message is first chunked at lines of the form
    ``<something>:<digits>:<digits>:<whitespace><text>``.  ``indent`` is the
    length of the whitespace after that lead.  When a chunk contains lines
    starting with a ``*`` bullet, each bullet becomes its own tuple;
    otherwise the whole chunk becomes one tuple.  The bullet marker is
    removed and all whitespace runs (newlines included) collapse to a
    single space.
    """
    file_regex = re.compile(r"^(.+:\d+:\d+:)(\s+)(.+)$")
    item_regex = re.compile(r"^\s*\*\s+")
    results = []
    for chunk in regex_chunk(message.splitlines(), file_regex):
        head = file_regex.match(chunk[0])
        if not head:
            continue
        place, gap, first_text = head.groups()
        depth = len(gap)

        body = [first_text] + chunk[1:]
        if any(item_regex.match(entry) for entry in body):
            groups = regex_chunk(body, item_regex)
        else:
            groups = [body]
        for group in groups:
            text = item_regex.sub("", "\n".join(group))
            results.append((depth, place + " " + re.sub(r"[\n\s]+", " ", text)))
    return results
| 67 | |
| 68 | |
def reformat_yaml_exception_message(message):  # type: (str) -> str
    """Rewrite a YAML error message into ``file:line:column: text`` lines.

    Two shapes are recognised:

    * three lines: the first line is dropped, then a problem description
      followed by an ``in "<file>", line N, column M`` location line;
    * four lines: a context description and its location line, followed by
      a problem description and its location line.  The problem text is
      pushed right by two extra spaces.

    A ``file://<current working directory>/`` prefix is removed from file
    names.  A description whose location line does not match the expected
    format is omitted from the result.

    A message with any other number of lines is returned unchanged; before
    this guard such input raised ``NameError`` or ``IndexError``.
    """
    line_regex = re.compile(r'^\s+in "(.+)", line (\d+), column (\d+)$')
    fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/")
    msgs = message.splitlines()
    ret = []

    if len(msgs) == 3:
        msgs = msgs[1:]
        nblanks = 0
    elif len(msgs) == 4:
        c_msg = msgs[0]
        match = line_regex.match(msgs[1])
        if match:
            c_file, c_line, c_column = match.groups()
            c_file = re.sub(fname_regex, "", c_file)
            ret.append("{}:{}:{}: {}".format(c_file, c_line, c_column, c_msg))

        msgs = msgs[2:]
        nblanks = 2
    else:
        # Unknown layout: there is nothing reliable to reformat, so hand the
        # text back as-is rather than failing on an unbound ``nblanks``.
        return message

    p_msg = msgs[0]
    match = line_regex.match(msgs[1])
    if match:
        p_file, p_line, p_column = match.groups()
        p_file = re.sub(fname_regex, "", p_file)
        ret.append(
            "{}:{}:{}:{} {}".format(p_file, p_line, p_column, " " * nblanks, p_msg)
        )
    return "\n".join(ret)
| 98 | |
| 99 | |
def _add_lc_filename(
    r, source
):  # type: (ruamel.yaml.comments.CommentedBase, AnyStr) -> None
    """Record ``source`` as ``lc.filename`` on ``r`` and everything nested in it.

    Only ``CommentedBase`` instances get the attribute.  Mutable sequences
    are descended into item by item, mutable mappings value by value.
    """
    if isinstance(r, ruamel.yaml.comments.CommentedBase):
        r.lc.filename = source

    if isinstance(r, MutableSequence):
        children = r
    elif isinstance(r, MutableMapping):
        children = six.itervalues(r)
    else:
        return
    for child in children:
        _add_lc_filename(child, source)
| 111 | |
| 112 | |
def relname(source):  # type: (Text) -> Text
    """Turn a ``file://`` URI into a path relative to the working directory.

    Input without the ``file://`` prefix is returned untouched.
    """
    prefix = "file://"
    if not source.startswith(prefix):
        return source
    return os.path.relpath(source[len(prefix):])
| 118 | |
| 119 | |
def add_lc_filename(
    r, source
):  # type: (ruamel.yaml.comments.CommentedBase, Text) -> None
    """Tag ``r`` and its nested contents with the ``relname`` form of ``source``."""
    shortened = relname(source)
    _add_lc_filename(r, shortened)
| 124 | |
| 125 | |
def reflow_all(text, maxline=None):  # type: (Text, Optional[int]) -> Text
    """Wrap every ``file:line:col: `` prefixed line of ``text`` to ``maxline``.

    ``maxline`` defaults to the ``COLUMNS`` environment variable, or 100 when
    that is unset.  Leads are padded to the width of the longest lead and
    repeated in front of each wrapped piece.  Lines without a lead are
    passed through unchanged.
    """
    if maxline is None:
        maxline = int(os.environ.get("COLUMNS", "100"))

    rows = text.splitlines()
    matches = [lineno_re.match(row) for row in rows]
    lead_width = max([len(m.group(1)) for m in matches if m] + [0])
    body_width = maxline - lead_width

    out = []
    for row, found in zip(rows, matches):
        if not found:
            out.append(row)
            continue
        lead = found.group(1).ljust(lead_width, " ")
        pieces = reflow(found.group(2), body_width, found.group(3)).splitlines()
        out.extend(lead + piece for piece in pieces)
    return "\n".join(out)
| 146 | |
| 147 | |
def reflow(text, maxline, shift=""):  # type: (Text, int, Text) -> Text
    """Wrap ``text`` at spaces so that lines fit in ``maxline`` where possible.

    ``maxline`` values below 20 are treated as 20.  Each line is broken at
    the last space before ``maxline``; when there is no usable space there,
    the break moves to the next space further right, so a single long word
    is never split.  Every continuation line is prefixed with ``shift``.

    The wrapping is done in a loop rather than by recursion, so the number
    of output lines is not limited by the interpreter's recursion depth.
    """
    maxline = max(maxline, 20)
    pieces = []
    rest = text
    while len(rest) > maxline:
        sp = rest.rfind(" ", 0, maxline)
        if sp < 1:
            # No space inside the window (or only a leading one): fall back
            # to the first space after it.
            sp = rest.find(" ", sp + 1)
            if sp == -1:
                # Nowhere left to break; keep the remainder as one line.
                break
        pieces.append(rest[:sp])
        rest = rest[sp + 1:]
    pieces.append(rest)
    return ("\n" + shift).join(pieces)
| 162 | |
| 163 | |
def indent(
    v, nolead=False, shift=u" ", bullet=u" "
):  # type: (Text, bool, Text, Text) -> Text
    """Indent the lines of ``v``.

    With ``nolead`` true, the first line is left as-is and every later line
    is prefixed with ``shift``.  The lines are joined with newlines, and an
    empty ``v`` yields an empty string.

    Otherwise the first line gets ``bullet`` and later lines get ``shift``.
    For a line that starts with a ``file:line:col: `` lead, the prefix is
    inserted after the lead instead of in front of it.
    """
    lines = v.splitlines()
    if nolead:
        # Join the first line with the rest so the line break between line
        # one and line two is kept; slicing also copes with no lines at all.
        return u"\n".join(lines[:1] + [shift + rest for rest in lines[1:]])

    def lineno(i, line):  # type: (int, Text) -> Text
        prefix = bullet if i == 0 else shift
        found = lineno_re.match(line)
        if found is not None:
            return found.group(1) + prefix + found.group(2)
        return prefix + line

    return u"\n".join([lineno(i, line) for i, line in enumerate(lines)])
| 179 | |
| 180 | |
def bullets(textlist, bul):  # type: (List[Text], Text) -> Text
    """Render ``textlist`` as a bulleted list using ``bul`` as the marker.

    A list with exactly one entry is returned as that entry, unbulleted.
    """
    if len(textlist) != 1:
        return "\n".join([indent(entry, bullet=bul) for entry in textlist])
    return textlist[0]
| 186 | |
| 187 | |
def strip_duplicated_lineno(text):  # type: (Text) -> Text
    """Blank out a ``file:line:col: `` lead that repeats the previous one.

    Works like ``strip_dup_lineno`` but does not re-wrap the lines.  The
    repeated lead is replaced by the same number of spaces.  Lines without
    a lead are kept unchanged and do not reset the remembered lead.
    """
    previous = None
    out = []
    for row in text.splitlines():
        found = lineno_re.match(row)
        if found and found.group(1) == previous:
            out.append(" " * len(found.group(1)) + found.group(2))
            continue
        if found:
            previous = found.group(1)
        out.append(row)
    return "\n".join(out)
| 203 | |
| 204 | |
def strip_dup_lineno(text, maxline=None):  # type: (Text, Optional[int]) -> Text
    """Wrap prefixed lines and blank out leads that repeat the previous one.

    ``maxline`` defaults to the ``COLUMNS`` environment variable, or 100 when
    that is unset.  Leads are padded to the width of the longest lead.  A
    line whose lead equals the previous lead has it replaced by spaces.
    Continuation lines produced by wrapping are indented under the message
    text.  Lines without a lead are passed through unchanged.
    """
    if maxline is None:
        maxline = int(os.environ.get("COLUMNS", "100"))

    rows = text.splitlines()
    matches = [lineno_re.match(row) for row in rows]
    lead_width = max([len(m.group(1)) for m in matches if m] + [0])

    previous = None
    out = []
    for row, found in zip(rows, matches):
        if not found:
            out.append(row)
            continue
        lead, body, pad = found.group(1), found.group(2), found.group(3)
        hang = lead_width + len(pad)
        if lead != previous:
            previous = lead
            wrapped = reflow(body, maxline - hang, " " * hang)
            out.append(lead + " " * (lead_width - len(lead)) + wrapped)
        else:
            # The width passed here is relative to the lead only, unlike the
            # branch above; this mirrors the existing behaviour.
            wrapped = reflow(body, maxline - lead_width, " " * hang)
            out.append(" " * lead_width + wrapped)
    return "\n".join(out)
| 231 | |
| 232 | |
def cmap(
    d,  # type: Union[int, float, str, Text, Dict[Text, Any], List[Dict[Text, Any]]]
    lc=None,  # type: Optional[List[int]]
    fn=None,  # type: Optional[Text]
):  # type: (...) -> Union[int, float, str, Text, CommentedMap, CommentedSeq]
    """Recursively convert ``d`` into ``CommentedMap``/``CommentedSeq`` form.

    Existing ``CommentedMap``/``CommentedSeq`` objects are updated in place:
    each child is converted using the line/column data recorded for its key
    or index when there is some, else the inherited ``lc``.  Plain mutable
    mappings and sequences are copied into new commented containers
    (mapping keys in sorted order) with a line/column entry per child.
    Anything else is returned unchanged.

    ``lc`` defaults to ``[0, 0, 0, 0]`` and ``fn`` to ``"test"``.
    """
    lc = [0, 0, 0, 0] if lc is None else lc
    fn = "test" if fn is None else fn

    if isinstance(d, (CommentedMap, CommentedSeq)):
        # Already a commented container: rewrite its children in place.
        fn = d.lc.filename if hasattr(d.lc, "filename") else fn
        if isinstance(d, CommentedMap):
            entries = six.iteritems(d)
        else:
            entries = enumerate(d)
        for key, value in entries:
            if d.lc.data is not None and key in d.lc.data:
                d[key] = cmap(value, lc=d.lc.data[key], fn=fn)
            else:
                d[key] = cmap(value, lc, fn=fn)
        return d

    if isinstance(d, MutableMapping):
        new_map = CommentedMap()
        for key in sorted(d.keys()):
            value = d[key]
            if isinstance(value, CommentedBase):
                child_lc = [value.lc.line, value.lc.col, value.lc.line, value.lc.col]
                child_fn = value.lc.filename if hasattr(value.lc, "filename") else fn
            else:
                child_lc = lc
                child_fn = fn
            new_map[key] = cmap(value, lc=child_lc, fn=child_fn)
            new_map.lc.add_kv_line_col(key, child_lc)
            # NOTE(review): indentation was not recoverable from the source
            # view; this assignment is assumed to sit inside the loop - confirm.
            new_map.lc.filename = fn
        return new_map

    if isinstance(d, MutableSequence):
        new_seq = CommentedSeq()
        for index, value in enumerate(d):
            if isinstance(value, CommentedBase):
                child_lc = [value.lc.line, value.lc.col, value.lc.line, value.lc.col]
                child_fn = value.lc.filename if hasattr(value.lc, "filename") else fn
            else:
                child_lc = lc
                child_fn = fn
            new_seq.append(cmap(value, lc=child_lc, fn=child_fn))
            new_seq.lc.add_kv_line_col(index, child_lc)
            # NOTE(review): same assumption as in the mapping branch - confirm.
            new_seq.lc.filename = fn
        return new_seq

    return d
| 288 | |
| 289 | |
class SourceLine(object):
    """Context manager that adds a source position to errors raised inside it.

    When the ``with`` body raises, the exception is re-raised (chained via
    ``raise_from``) as whatever ``makeError`` builds from its message.  If
    ``item`` is a ruamel ``CommentedBase``, message lines that lack a
    ``file:line:col: `` lead are prefixed with the position of ``item``, or
    of ``key`` within ``item`` when line/column data exists for that key.
    """

    def __init__(
        self,
        item,  # type: Any
        key=None,  # type: Optional[Any]
        raise_type=six.text_type,  # type: Union[Type[six.text_type], Type[Exception]]
        include_traceback=False,  # type: bool
    ):  # type: (...) -> None
        """Store the item, the optional key and the error construction options."""
        self.item = item
        self.key = key
        self.raise_type = raise_type
        self.include_traceback = include_traceback

    def __enter__(self):  # type: () -> SourceLine
        """Return this object as the context value."""
        return self

    def __exit__(
        self,
        exc_type,  # type: Any
        exc_value,  # type: Any
        tb,  # type: Any
    ):  # type: (...) -> None
        """Re-raise a pending exception with the source position added."""
        if not exc_value:
            return
        if self.include_traceback and six.PY2:
            # Python2 doesn't actually have chained exceptions, so
            # fake it by injecting the backtrace into the message.
            detail = "\n".join(traceback.format_exception(exc_type, exc_value, tb))
        else:
            detail = six.text_type(exc_value)
        raise_from(self.makeError(detail), exc_value)

    def file(self):  # type: () -> Optional[Text]
        """Return ``item.lc.filename`` as text, or ``None`` when it is absent."""
        if hasattr(self.item, "lc") and hasattr(self.item.lc, "filename"):
            return Text(self.item.lc.filename)
        return None

    def start(self):  # type: () -> Optional[Tuple[int, int]]
        """Return the 1-based ``(line, column)`` of the key or item, if known.

        ``None`` is returned when no filename is available.  Missing line or
        column values count as 0 before the +1 adjustment.
        """
        if self.file() is None:
            return None
        lc = self.item.lc
        if self.key is not None and lc.data is not None and self.key in lc.data:
            entry = lc.data[self.key]
            return ((entry[0] or 0) + 1, (entry[1] or 0) + 1)
        return ((lc.line or 0) + 1, (lc.col or 0) + 1)

    def end(self):  # type: () -> Optional[Tuple[int, int]]
        """Always return ``None``; no end position is computed."""
        return None

    def makeLead(self):  # type: () -> Text
        """Build the ``file:line:col:`` lead, or ``""`` without a filename."""
        source = self.file()
        if not source:
            return ""
        position = self.start()
        line, col = position if position else ("", "")
        return "{}:{}:{}:".format(source, line, col)

    def makeError(self, msg):  # type: (Text) -> Any
        """Construct ``raise_type`` from ``msg`` with leads added where missing.

        When ``item`` is not a ``CommentedBase`` the message is used as-is.
        """
        if not isinstance(self.item, ruamel.yaml.comments.CommentedBase):
            return self.raise_type(msg)
        lead = self.makeLead()
        errs = [
            m if lineno_re.match(m) else "{} {}".format(lead, m)
            for m in msg.splitlines()
        ]
        return self.raise_type("\n".join(errs))
