Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/schema_salad/sourceline.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import os | |
2 import re | |
3 from typing import ( | |
4 Any, | |
5 AnyStr, | |
6 Callable, | |
7 Dict, | |
8 List, | |
9 MutableMapping, | |
10 MutableSequence, | |
11 Optional, | |
12 Tuple, | |
13 Union, | |
14 ) | |
15 | |
16 import ruamel.yaml | |
17 from ruamel.yaml.comments import CommentedBase, CommentedMap, CommentedSeq | |
18 | |
19 lineno_re = re.compile("^(.*?:[0-9]+:[0-9]+: )(( *)(.*))") | |
20 | |
21 | |
22 def _add_lc_filename(r: ruamel.yaml.comments.CommentedBase, source: AnyStr) -> None: | |
23 if isinstance(r, ruamel.yaml.comments.CommentedBase): | |
24 r.lc.filename = source | |
25 if isinstance(r, MutableSequence): | |
26 for d in r: | |
27 _add_lc_filename(d, source) | |
28 elif isinstance(r, MutableMapping): | |
29 for d in r.values(): | |
30 _add_lc_filename(d, source) | |
31 | |
32 | |
33 def relname(source: str) -> str: | |
34 if source.startswith("file://"): | |
35 source = source[7:] | |
36 source = os.path.relpath(source) | |
37 return source | |
38 | |
39 | |
40 def add_lc_filename(r: ruamel.yaml.comments.CommentedBase, source: str) -> None: | |
41 _add_lc_filename(r, relname(source)) | |
42 | |
43 | |
44 def reflow_all(text: str, maxline: Optional[int] = None) -> str: | |
45 if maxline is None: | |
46 maxline = int(os.environ.get("COLUMNS", "100")) | |
47 maxno = 0 | |
48 for line in text.splitlines(): | |
49 g = lineno_re.match(line) | |
50 if not g: | |
51 continue | |
52 group = g.group(1) | |
53 assert group is not None # nosec | |
54 maxno = max(maxno, len(group)) | |
55 maxno_text = maxline - maxno | |
56 msg = [] # type: List[str] | |
57 for line in text.splitlines(): | |
58 g = lineno_re.match(line) | |
59 if not g: | |
60 msg.append(line) | |
61 continue | |
62 pre = g.group(1) | |
63 assert pre is not None # nosec | |
64 group2 = g.group(2) | |
65 assert group2 is not None # nosec | |
66 reflowed = reflow(group2, maxno_text, g.group(3)).splitlines() | |
67 msg.extend([pre.ljust(maxno, " ") + r for r in reflowed]) | |
68 return "\n".join(msg) | |
69 | |
70 | |
71 def reflow(text: str, maxline: int, shift: Optional[str] = "") -> str: | |
72 if maxline < 20: | |
73 maxline = 20 | |
74 if len(text) > maxline: | |
75 sp = text.rfind(" ", 0, maxline) | |
76 if sp < 1: | |
77 sp = text.find(" ", sp + 1) | |
78 if sp == -1: | |
79 sp = len(text) | |
80 if sp < len(text): | |
81 return "{}\n{}{}".format( | |
82 text[0:sp], shift, reflow(text[sp + 1 :], maxline, shift) | |
83 ) | |
84 return text | |
85 | |
86 | |
87 def indent(v: str, nolead: bool = False, shift: str = " ", bullet: str = " ") -> str: | |
88 if nolead: | |
89 return v.splitlines()[0] + "\n".join( | |
90 [shift + line for line in v.splitlines()[1:]] | |
91 ) | |
92 else: | |
93 | |
94 def lineno(i: int, line: str) -> str: | |
95 r = lineno_re.match(line) | |
96 if r is not None: | |
97 group1 = r.group(1) | |
98 group2 = r.group(2) | |
99 assert group1 is not None # nosec | |
100 assert group2 is not None # nosec | |
101 return group1 + (bullet if i == 0 else shift) + group2 | |
102 else: | |
103 return (bullet if i == 0 else shift) + line | |
104 | |
105 return "\n".join([lineno(i, line) for i, line in enumerate(v.splitlines())]) | |
106 | |
107 | |
108 def bullets(textlist: List[str], bul: str) -> str: | |
109 if len(textlist) == 1: | |
110 return textlist[0] | |
111 else: | |
112 return "\n".join(indent(t, bullet=bul) for t in textlist) | |
113 | |
114 | |
115 def strip_duplicated_lineno(text: str) -> str: | |
116 """Same as `strip_dup_lineno` but without reflow""" | |
117 pre = None # type: Optional[str] | |
118 msg = [] | |
119 for line in text.splitlines(): | |
120 g = lineno_re.match(line) | |
121 if not g: | |
122 msg.append(line) | |
123 continue | |
124 elif g.group(1) != pre: | |
125 msg.append(line) | |
126 pre = g.group(1) | |
127 else: | |
128 group1 = g.group(1) | |
129 group2 = g.group(2) | |
130 assert group1 is not None # nosec | |
131 assert group2 is not None # nosec | |
132 msg.append(" " * len(group1) + group2) | |
133 return "\n".join(msg) | |
134 | |
135 | |
136 def strip_dup_lineno(text: str, maxline: Optional[int] = None) -> str: | |
137 if maxline is None: | |
138 maxline = int(os.environ.get("COLUMNS", "100")) | |
139 pre = None # type: Optional[str] | |
140 msg = [] | |
141 maxno = 0 | |
142 for line in text.splitlines(): | |
143 g = lineno_re.match(line) | |
144 if not g: | |
145 continue | |
146 group1 = g.group(1) | |
147 assert group1 is not None # nosec | |
148 maxno = max(maxno, len(group1)) | |
149 | |
150 for line in text.splitlines(): | |
151 g = lineno_re.match(line) | |
152 if not g: | |
153 msg.append(line) | |
154 continue | |
155 if g.group(1) != pre: | |
156 group3 = g.group(3) | |
157 assert group3 is not None # nosec | |
158 shift = maxno + len(group3) | |
159 group2 = g.group(2) | |
160 assert group2 is not None # nosec | |
161 g2 = reflow(group2, maxline - shift, " " * shift) | |
162 pre = g.group(1) | |
163 assert pre is not None # nosec | |
164 msg.append(pre + " " * (maxno - len(pre)) + g2) | |
165 else: | |
166 group2 = g.group(2) | |
167 assert group2 is not None # nosec | |
168 group3 = g.group(3) | |
169 assert group3 is not None # nosec | |
170 g2 = reflow(group2, maxline - maxno, " " * (maxno + len(group3))) | |
171 msg.append(" " * maxno + g2) | |
172 return "\n".join(msg) | |
173 | |
174 | |
175 def cmap( | |
176 d: Union[int, float, str, Dict[str, Any], List[Any], None], | |
177 lc: Optional[List[int]] = None, | |
178 fn: Optional[str] = None, | |
179 ) -> Union[int, float, str, CommentedMap, CommentedSeq, None]: | |
180 if lc is None: | |
181 lc = [0, 0, 0, 0] | |
182 if fn is None: | |
183 fn = "test" | |
184 | |
185 if isinstance(d, CommentedMap): | |
186 fn = d.lc.filename if hasattr(d.lc, "filename") else fn | |
187 for k, v in d.items(): | |
188 if d.lc.data is not None and k in d.lc.data: | |
189 d[k] = cmap(v, lc=d.lc.data[k], fn=fn) | |
190 else: | |
191 d[k] = cmap(v, lc, fn=fn) | |
192 return d | |
193 if isinstance(d, CommentedSeq): | |
194 fn = d.lc.filename if hasattr(d.lc, "filename") else fn | |
195 for k2, v2 in enumerate(d): | |
196 if d.lc.data is not None and k2 in d.lc.data: | |
197 d[k2] = cmap(v2, lc=d.lc.data[k2], fn=fn) | |
198 else: | |
199 d[k2] = cmap(v2, lc, fn=fn) | |
200 return d | |
201 if isinstance(d, MutableMapping): | |
202 cm = CommentedMap() | |
203 for k in sorted(d.keys()): | |
204 v = d[k] | |
205 if isinstance(v, CommentedBase): | |
206 uselc = [v.lc.line, v.lc.col, v.lc.line, v.lc.col] | |
207 vfn = v.lc.filename if hasattr(v.lc, "filename") else fn | |
208 else: | |
209 uselc = lc | |
210 vfn = fn | |
211 cm[k] = cmap(v, lc=uselc, fn=vfn) | |
212 cm.lc.add_kv_line_col(k, uselc) | |
213 cm.lc.filename = fn | |
214 return cm | |
215 if isinstance(d, MutableSequence): | |
216 cs = CommentedSeq() | |
217 for k3, v3 in enumerate(d): | |
218 if isinstance(v3, CommentedBase): | |
219 uselc = [v3.lc.line, v3.lc.col, v3.lc.line, v3.lc.col] | |
220 vfn = v3.lc.filename if hasattr(v3.lc, "filename") else fn | |
221 else: | |
222 uselc = lc | |
223 vfn = fn | |
224 cs.append(cmap(v3, lc=uselc, fn=vfn)) | |
225 cs.lc.add_kv_line_col(k3, uselc) | |
226 cs.lc.filename = fn | |
227 return cs | |
228 else: | |
229 return d | |
230 | |
231 | |
232 class SourceLine: | |
233 def __init__( | |
234 self, | |
235 item: Any, | |
236 key: Optional[Any] = None, | |
237 raise_type: Callable[[str], Any] = str, | |
238 include_traceback: bool = False, | |
239 ) -> None: | |
240 self.item = item | |
241 self.key = key | |
242 self.raise_type = raise_type | |
243 self.include_traceback = include_traceback | |
244 | |
245 def __enter__(self) -> "SourceLine": | |
246 return self | |
247 | |
248 def __exit__( | |
249 self, | |
250 exc_type: Any, | |
251 exc_value: Any, | |
252 tb: Any, | |
253 ) -> None: | |
254 if not exc_value: | |
255 return | |
256 raise self.makeError(str(exc_value)) from exc_value | |
257 | |
258 def file(self) -> Optional[str]: | |
259 if hasattr(self.item, "lc") and hasattr(self.item.lc, "filename"): | |
260 return str(self.item.lc.filename) | |
261 else: | |
262 return None | |
263 | |
264 def start(self) -> Optional[Tuple[int, int]]: | |
265 if self.file() is None: | |
266 return None | |
267 elif ( | |
268 self.key is None | |
269 or self.item.lc.data is None | |
270 or self.key not in self.item.lc.data | |
271 ): | |
272 return ((self.item.lc.line or 0) + 1, (self.item.lc.col or 0) + 1) | |
273 else: | |
274 return ( | |
275 (self.item.lc.data[self.key][0] or 0) + 1, | |
276 (self.item.lc.data[self.key][1] or 0) + 1, | |
277 ) | |
278 | |
279 def end(self) -> Optional[Tuple[int, int]]: | |
280 return None | |
281 | |
282 def makeLead(self) -> str: | |
283 if self.file(): | |
284 lcol = self.start() | |
285 line, col = lcol if lcol else ("", "") | |
286 return f"{self.file()}:{line}:{col}:" | |
287 else: | |
288 return "" | |
289 | |
290 def makeError(self, msg: str) -> Any: | |
291 if not isinstance(self.item, ruamel.yaml.comments.CommentedBase): | |
292 return self.raise_type(msg) | |
293 errs = [] | |
294 lead = self.makeLead() | |
295 for m in msg.splitlines(): | |
296 if bool(lineno_re.match(m)): | |
297 errs.append(m) | |
298 else: | |
299 errs.append(f"{lead} {m}") | |
300 return self.raise_type("\n".join(errs)) |