comparison env/lib/python3.9/site-packages/schema_salad/sourceline.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import os
2 import re
3 from typing import (
4 Any,
5 AnyStr,
6 Callable,
7 Dict,
8 List,
9 MutableMapping,
10 MutableSequence,
11 Optional,
12 Tuple,
13 Union,
14 )
15
16 import ruamel.yaml
17 from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq
18
# Matches a message line that starts with a "<file>:<line>:<col>: " prefix.
#   group 1: the prefix, including the single space after the last colon
#   group 2: everything after the prefix
#   group 3: the run of spaces (possibly empty) at the start of group 2
#   group 4: the rest of the line after those spaces
lineno_re = re.compile(r"^(.*?:[0-9]+:[0-9]+: )(( *)(.*))")
20
21
def _add_lc_filename(r: ruamel.yaml.comments.CommentedBase, source: AnyStr) -> None:
    """Recursively store *source* as ``lc.filename`` on commented nodes.

    ``r`` itself is tagged when it is a ``CommentedBase``; its children are
    then visited when it is a mutable sequence or a mutable mapping (values
    only). Anything else ends the recursion.
    """
    if isinstance(r, ruamel.yaml.comments.CommentedBase):
        r.lc.filename = source

    # Work out which children to descend into, if any.
    if isinstance(r, MutableSequence):
        children = r
    elif isinstance(r, MutableMapping):
        children = r.values()
    else:
        return

    for child in children:
        _add_lc_filename(child, source)
31
32
def relname(source: str) -> str:
    """Return a ``file://`` source as a path relative to the current directory.

    Sources that do not start with ``file://`` are returned unchanged.
    """
    scheme = "file://"
    if not source.startswith(scheme):
        return source
    local_path = source[len(scheme) :]
    return os.path.relpath(local_path)
38
39
def add_lc_filename(r: ruamel.yaml.comments.CommentedBase, source: str) -> None:
    """Tag *r* and its nested nodes with the name ``relname(source)`` yields."""
    shortened = relname(source)
    _add_lc_filename(r, shortened)
42
43
def reflow_all(text: str, maxline: Optional[int] = None) -> str:
    """Re-wrap every ``file:line:col:`` prefixed line of *text*.

    Prefixes are padded to the width of the longest one and the message part
    of each prefixed line is wrapped by ``reflow`` into the remaining width;
    every wrapped piece repeats the padded prefix. Lines without a prefix are
    passed through untouched. When *maxline* is None the width is read from
    the ``COLUMNS`` environment variable, falling back to 100.
    """
    if maxline is None:
        maxline = int(os.environ.get("COLUMNS", "100"))

    # Match each line once and reuse the result in both passes.
    # Groups 1-3 of lineno_re take part in every successful match.
    scanned = [(raw, lineno_re.match(raw)) for raw in text.splitlines()]
    maxno = max((len(hit.group(1)) for _, hit in scanned if hit), default=0)
    maxno_text = maxline - maxno

    msg = []  # type: List[str]
    for raw, hit in scanned:
        if not hit:
            msg.append(raw)
            continue
        padded = hit.group(1).ljust(maxno, " ")
        wrapped = reflow(hit.group(2), maxno_text, hit.group(3))
        for piece in wrapped.splitlines():
            msg.append(padded + piece)
    return "\n".join(msg)
69
70
def reflow(text: str, maxline: int, shift: Optional[str] = "") -> str:
    """Wrap *text* at spaces so that lines stay within *maxline* where possible.

    *maxline* values below 20 are raised to 20. Each break is made at the last
    space found before the limit; when there is none (or only a leading one),
    the break moves to the next space further on, so an over-long word is kept
    whole. The space at the break is dropped and every continuation line is
    prefixed with *shift*.

    The wrapping is done iteratively, so the number of output lines is not
    limited by the interpreter's recursion depth.
    """
    if maxline < 20:
        maxline = 20

    pieces = []  # type: List[str]
    rest = text
    while len(rest) > maxline:
        sp = rest.rfind(" ", 0, maxline)
        if sp < 1:
            # No usable space inside the limit: break at the next one instead.
            sp = rest.find(" ", sp + 1)
            if sp == -1:
                sp = len(rest)
        if sp >= len(rest):
            # Nothing left to break on; keep the remainder as one line.
            break
        pieces.append(rest[:sp])
        rest = rest[sp + 1 :]
    pieces.append(rest)
    return f"\n{shift}".join(pieces)
85
86
def indent(v: str, nolead: bool = False, shift: str = " ", bullet: str = " ") -> str:
    """Indent the lines of *v*.

    With ``nolead`` set, the first line is left as it is and every following
    line is prefixed with *shift*; an empty *v* yields an empty string.

    Otherwise the first line is prefixed with *bullet* and the others with
    *shift*. On lines that start with a ``file:line:col:`` prefix the
    indentation is inserted after that prefix rather than before it.
    """
    lines = v.splitlines()
    if nolead:
        if not lines:
            return ""
        # Join the first line with the rest so each line stays on its own row.
        return "\n".join([lines[0]] + [shift + line for line in lines[1:]])

    def lineno(i: int, line: str) -> str:
        lead = bullet if i == 0 else shift
        r = lineno_re.match(line)
        if r is None:
            return lead + line
        # Groups 1 and 2 take part in every successful match.
        return r.group(1) + lead + r.group(2)

    return "\n".join([lineno(i, line) for i, line in enumerate(lines)])
106
107
def bullets(textlist: List[str], bul: str) -> str:
    """Join the entries of *textlist*, marking each one with *bul*.

    A list holding exactly one entry is returned as that entry, unmarked.
    Otherwise every entry goes through ``indent`` with *bul* as its bullet
    and the results are joined with newlines.
    """
    if len(textlist) != 1:
        return "\n".join([indent(entry, bullet=bul) for entry in textlist])
    return textlist[0]
113
114
def strip_duplicated_lineno(text: str) -> str:
    """Same as `strip_dup_lineno` but without reflow"""
    previous = None  # type: Optional[str]
    kept = []  # type: List[str]
    for raw in text.splitlines():
        hit = lineno_re.match(raw)
        if hit and hit.group(1) == previous:
            # Same position as the last prefixed line: blank the prefix out
            # with spaces of equal width so the message stays aligned.
            kept.append(" " * len(hit.group(1)) + hit.group(2))
            continue
        if hit:
            previous = hit.group(1)
        kept.append(raw)
    return "\n".join(kept)
134
135
def strip_dup_lineno(text: str, maxline: Optional[int] = None) -> str:
    """Blank out repeated ``file:line:col:`` prefixes and re-wrap the messages.

    Prefixes are aligned to the width of the longest one. A prefixed line
    whose prefix equals that of the previous prefixed line has the prefix
    replaced by spaces. Message parts are wrapped with ``reflow``;
    continuation lines are indented past the prefix column plus the message's
    own leading spaces. Lines without a prefix are passed through untouched.
    When *maxline* is None the width is read from the ``COLUMNS`` environment
    variable, falling back to 100.
    """
    if maxline is None:
        maxline = int(os.environ.get("COLUMNS", "100"))

    # Match each line once and reuse the result in both passes.
    # Groups 1-3 of lineno_re take part in every successful match.
    scanned = [(raw, lineno_re.match(raw)) for raw in text.splitlines()]
    maxno = max((len(hit.group(1)) for _, hit in scanned if hit), default=0)

    previous = None  # type: Optional[str]
    out = []  # type: List[str]
    for raw, hit in scanned:
        if not hit:
            out.append(raw)
            continue
        prefix, body, pad = hit.group(1), hit.group(2), hit.group(3)
        hang = maxno + len(pad)
        if prefix != previous:
            previous = prefix
            wrapped = reflow(body, maxline - hang, " " * hang)
            out.append(prefix + " " * (maxno - len(prefix)) + wrapped)
        else:
            # Repeated prefix: the wrap width here leaves out the message's
            # own leading spaces, unlike the branch above.
            wrapped = reflow(body, maxline - maxno, " " * hang)
            out.append(" " * maxno + wrapped)
    return "\n".join(out)
173
174
def cmap(
    d: Union[int, float, str, Dict[str, Any], List[Any], None],
    lc: Optional[List[int]] = None,
    fn: Optional[str] = None,
) -> Union[int, float, str, CommentedMap, CommentedSeq, None]:
    """Recursively turn plain containers into ``CommentedMap``/``CommentedSeq``.

    An existing ``CommentedMap`` or ``CommentedSeq`` is updated in place (its
    children are converted) and returned. Other mutable mappings and
    sequences are copied into new commented containers, with mapping keys
    visited in sorted order. Any other value is returned unchanged.

    *lc* is the line/column record used for children that carry no position
    of their own (default ``[0, 0, 0, 0]``); *fn* is the fallback file name
    (default ``"test"``).
    """
    if lc is None:
        lc = [0, 0, 0, 0]
    if fn is None:
        fn = "test"

    def origin(child: Any) -> Tuple[List[int], Optional[str]]:
        # Position and file name to record for one child of a plain container.
        if isinstance(child, CommentedBase):
            here = [child.lc.line, child.lc.col, child.lc.line, child.lc.col]
            return here, getattr(child.lc, "filename", fn)
        return lc, fn

    if isinstance(d, CommentedMap):
        fn = getattr(d.lc, "filename", fn)
        for key, value in d.items():
            recorded = d.lc.data is not None and key in d.lc.data
            d[key] = cmap(value, d.lc.data[key] if recorded else lc, fn=fn)
        return d

    if isinstance(d, CommentedSeq):
        fn = getattr(d.lc, "filename", fn)
        for index, element in enumerate(d):
            recorded = d.lc.data is not None and index in d.lc.data
            d[index] = cmap(element, d.lc.data[index] if recorded else lc, fn=fn)
        return d

    if isinstance(d, MutableMapping):
        cm = CommentedMap()
        for key in sorted(d.keys()):
            value = d[key]
            uselc, vfn = origin(value)
            cm[key] = cmap(value, lc=uselc, fn=vfn)
            cm.lc.add_kv_line_col(key, uselc)
            cm.lc.filename = fn
        return cm

    if isinstance(d, MutableSequence):
        cs = CommentedSeq()
        for index, element in enumerate(d):
            uselc, vfn = origin(element)
            cs.append(cmap(element, lc=uselc, fn=vfn))
            cs.lc.add_kv_line_col(index, uselc)
            cs.lc.filename = fn
        return cs

    return d
230
231
class SourceLine:
    """Context manager that re-raises errors with a source position prefix.

    ``item`` is the object whose ``lc`` attribute supplies the file name and
    line/column; ``key`` optionally selects a per-entry position from
    ``item.lc.data``. ``raise_type`` builds the object raised by ``__exit__``
    and returned by ``makeError``.
    """

    def __init__(
        self,
        item: Any,
        key: Optional[Any] = None,
        raise_type: Callable[[str], Any] = str,
        include_traceback: bool = False,
    ) -> None:
        self.item = item
        self.key = key
        self.raise_type = raise_type
        self.include_traceback = include_traceback

    def __enter__(self) -> "SourceLine":
        return self

    def __exit__(
        self,
        exc_type: Any,
        exc_value: Any,
        tb: Any,
    ) -> None:
        """Re-raise a pending exception as ``makeError`` of its message."""
        if exc_value:
            raise self.makeError(str(exc_value)) from exc_value

    def file(self) -> Optional[str]:
        """Return ``item.lc.filename`` as a string, or None when it is absent."""
        absent = object()
        name = getattr(getattr(self.item, "lc", absent), "filename", absent)
        return None if name is absent else str(name)

    def start(self) -> Optional[Tuple[int, int]]:
        """Return the 1-based (line, column), or None when no file is known."""
        if self.file() is None:
            return None
        lc = self.item.lc
        if self.key is not None and lc.data is not None and self.key in lc.data:
            # A position was recorded for this particular key.
            entry = lc.data[self.key]
            return ((entry[0] or 0) + 1, (entry[1] or 0) + 1)
        # Fall back to the position of the item itself.
        return ((lc.line or 0) + 1, (lc.col or 0) + 1)

    def end(self) -> Optional[Tuple[int, int]]:
        return None

    def makeLead(self) -> str:
        """Return the ``file:line:col:`` prefix, or "" when no file is known."""
        if not self.file():
            return ""
        lcol = self.start()
        line, col = lcol if lcol else ("", "")
        return f"{self.file()}:{line}:{col}:"

    def makeError(self, msg: str) -> Any:
        """Build a ``raise_type`` object from *msg* with positions added.

        When ``item`` is not a ``CommentedBase`` the message is used as is.
        Otherwise each line that does not already start with a
        ``file:line:col:`` prefix gets this object's lead and a space put in
        front of it.
        """
        if not isinstance(self.item, ruamel.yaml.comments.CommentedBase):
            return self.raise_type(msg)
        lead = self.makeLead()
        errs = [
            m if lineno_re.match(m) else f"{lead} {m}" for m in msg.splitlines()
        ]
        return self.raise_type("\n".join(errs))