comparison env/lib/python3.9/site-packages/schema_salad/python_codegen_support.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import copy
2 import os
3 import pathlib
4 import re
5 import tempfile
6 import uuid as _uuid__ # pylint: disable=unused-import # noqa: F401
7 from io import StringIO
8 from typing import (
9 Any,
10 Dict,
11 List,
12 MutableMapping,
13 MutableSequence,
14 Optional,
15 Sequence,
16 Tuple,
17 Type,
18 Union,
19 )
20 from urllib.parse import quote, urlsplit, urlunsplit
21 from urllib.request import pathname2url
22
23 from ruamel import yaml
24 from ruamel.yaml.comments import CommentedMap
25
26 from schema_salad.exceptions import SchemaSaladException, ValidationException
27 from schema_salad.fetcher import DefaultFetcher, Fetcher
28 from schema_salad.sourceline import SourceLine, add_lc_filename
29
# Module-wide default vocabulary tables; LoadingOptions uses them as-is or
# copies them before adding namespace entries.
_vocab: Dict[str, str] = {}
_rvocab: Dict[str, str] = {}
32
33
class Savable:
    """Interface for objects that can be built from and saved back to documents.

    Both methods here are stubs that return ``None``; presumably concrete
    (generated) classes override them.
    """

    @classmethod
    def fromDoc(cls, _doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Savable
        """Build an instance from ``_doc``; the base implementation does nothing."""
        return None

    def save(self, top=False, base_url="", relative_uris=True):
        # type: (bool, str, bool) -> Dict[str, str]
        """Serialize this object; the base implementation does nothing."""
        return None
43
44
class LoadingOptions:
    """Per-document loading state: fetcher, document index, namespaces, vocabulary.

    ``__init__`` sets ``idx``, ``fileuri``, ``namespaces``, ``schemas``,
    ``original_doc``, ``fetcher``, ``vocab`` and ``rvocab``.
    """

    def __init__(
        self,
        fetcher=None,  # type: Optional[Fetcher]
        namespaces=None,  # type: Optional[Dict[str, str]]
        schemas=None,  # type: Optional[Dict[str, str]]
        fileuri=None,  # type: Optional[str]
        copyfrom=None,  # type: Optional[LoadingOptions]
        original_doc=None,  # type: Optional[Any]
    ):  # type: (...) -> None
        """Create loading options, optionally inheriting from ``copyfrom``.

        Any of ``fetcher``, ``fileuri``, ``namespaces`` and ``schemas`` left as
        ``None`` is taken from ``copyfrom`` when one is given. The document
        index ``idx`` is shared (same object) with ``copyfrom``.
        """
        self.idx = {}  # type: Dict[str, Dict[str, Any]]
        self.fileuri = fileuri  # type: Optional[str]
        self.namespaces = namespaces
        self.schemas = schemas
        self.original_doc = original_doc
        if copyfrom is not None:
            self.idx = copyfrom.idx
            if fetcher is None:
                fetcher = copyfrom.fetcher
            if fileuri is None:
                self.fileuri = copyfrom.fileuri
            if namespaces is None:
                self.namespaces = copyfrom.namespaces
            if schemas is None:
                self.schemas = copyfrom.schemas

        if fetcher is None:
            # Imported lazily so these packages are only needed when no
            # fetcher is supplied.
            import requests
            from cachecontrol.caches import FileCache
            from cachecontrol.wrapper import CacheControl

            root = pathlib.Path(os.environ.get("HOME", tempfile.gettempdir()))
            session = CacheControl(
                requests.Session(),
                cache=FileCache(root / ".cache" / "salad"),
            )
            self.fetcher: Fetcher = DefaultFetcher({}, session)
        else:
            self.fetcher = fetcher

        self.vocab = _vocab
        self.rvocab = _rvocab

        # Use the effective namespaces (including those inherited from
        # ``copyfrom``) so that derived options keep resolving prefixes;
        # checking only the ``namespaces`` argument dropped inherited ones.
        if self.namespaces is not None:
            # Copy before extending so the module-level tables stay untouched.
            self.vocab = self.vocab.copy()
            self.rvocab = self.rvocab.copy()
            for k, v in self.namespaces.items():
                self.vocab[k] = v
                self.rvocab[v] = k
94
95
def load_field(val, fieldtype, baseuri, loadingOptions):
    # type: (Union[str, Dict[str, str]], _Loader, str, LoadingOptions) -> Any
    """Load one field value with ``fieldtype``, honouring ``$import``/``$include``.

    A mapping containing ``$import`` is loaded as a separate document from the
    referenced URL. A mapping containing ``$include`` is replaced by the
    fetched text of the referenced URL and then loaded normally. Anything else
    is passed straight to ``fieldtype.load``.

    Raises:
        SchemaSaladException: if a directive is used while
            ``loadingOptions.fileuri`` is ``None`` (no base to resolve against).
    """
    if isinstance(val, MutableMapping):
        if "$import" in val:
            if loadingOptions.fileuri is None:
                raise SchemaSaladException("Cannot load $import without fileuri")
            return _document_load_by_url(
                fieldtype,
                loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]),
                loadingOptions,
            )
        if "$include" in val:
            if loadingOptions.fileuri is None:
                # The message previously named $import, which was misleading.
                raise SchemaSaladException("Cannot load $include without fileuri")
            val = loadingOptions.fetcher.fetch_text(
                loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"])
            )
    return fieldtype.load(val, baseuri, loadingOptions)
114
115
# Type of the values produced by ``save``: a mapping, a list of mappings /
# nested lists / None, or None.
save_type = Optional[
    Union[
        Dict[str, str],
        List[Optional[Union[Dict[str, str], List[Any]]]],
    ]
]
117
118
def save(
    val,  # type: Optional[Union[Savable, MutableSequence[Savable]]]
    top=True,  # type: bool
    base_url="",  # type: str
    relative_uris=True,  # type: bool
):  # type: (...) -> save_type
    """Recursively convert ``val`` into plain data.

    ``Savable`` objects are asked to save themselves; sequences and mappings
    are rebuilt with every element saved (with ``top=False``); any other value
    is returned unchanged.
    """
    if isinstance(val, Savable):
        return val.save(top=top, base_url=base_url, relative_uris=relative_uris)

    def nested(item):
        # Children are never the top-level object.
        return save(item, top=False, base_url=base_url, relative_uris=relative_uris)

    if isinstance(val, MutableSequence):
        return list(map(nested, val))
    if isinstance(val, MutableMapping):
        return {key: nested(val[key]) for key in val}
    return val
141
142
def expand_url(
    url,  # type: str
    base_url,  # type: str
    loadingOptions,  # type: LoadingOptions
    scoped_id=False,  # type: bool
    vocab_term=False,  # type: bool
    scoped_ref=None,  # type: Optional[int]
):
    # type: (...) -> str
    """Expand ``url`` against ``base_url`` and the vocabulary in ``loadingOptions``.

    ``@id``/``@type`` and (when ``vocab_term``) known vocabulary terms are
    returned untouched. A ``prefix:rest`` form whose prefix is in the
    vocabulary is expanded first. http/https/file URLs and strings starting
    with ``$(`` or ``${`` are then left alone; otherwise the result is built
    from the base URL's fragment (``scoped_id`` / ``scoped_ref``) or by the
    fetcher's ``urljoin``.

    Raises:
        ValidationException: when ``vocab_term`` is set and the expanded value
            has no URL scheme.
    """
    if url in ("@id", "@type") or (vocab_term and url in loadingOptions.vocab):
        return url

    if loadingOptions.vocab and ":" in url:
        prefix, _, remainder = url.partition(":")
        if prefix in loadingOptions.vocab:
            url = loadingOptions.vocab[prefix] + remainder

    parts = urlsplit(url)
    lacks_fragment = not parts.fragment

    if parts.scheme in ("http", "https", "file") or url.startswith(("$(", "${")):
        # Already absolute, or an expression: keep verbatim.
        pass
    elif scoped_id and lacks_fragment:
        base = urlsplit(base_url)
        if base.fragment:
            fragment = base.fragment + "/" + parts.path
        else:
            fragment = parts.path
        url = urlunsplit(
            (base.scheme, base.netloc, base.path or "/", base.query, fragment)
        )
    elif scoped_ref is not None and lacks_fragment:
        base = urlsplit(base_url)
        segments = base.fragment.split("/")
        # Drop up to ``scoped_ref`` trailing segments, never more than exist.
        kept = segments[: max(len(segments) - max(scoped_ref, 0), 0)]
        kept.append(url)
        url = urlunsplit(
            (base.scheme, base.netloc, base.path, base.query, "/".join(kept))
        )
    else:
        url = loadingOptions.fetcher.urljoin(base_url, url)

    if not vocab_term:
        return url
    if not urlsplit(url).scheme:
        raise ValidationException(f"Term '{url}' not in vocabulary")
    # Prefer the short vocabulary name when the full URL is registered.
    return loadingOptions.rvocab.get(url, url)
209
210
class _Loader:
    """Base class for loaders; ``load`` is a stub that returns ``None``."""

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Load ``doc``; subclasses provide the real behaviour."""
        return None
215
216
class _AnyLoader(_Loader):
    """Loader that accepts any value except ``None``."""

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Return ``doc`` unchanged, or raise ValidationException if it is None."""
        if doc is None:
            raise ValidationException("Expected non-null")
        return doc
223
224
class _PrimitiveLoader(_Loader):
    """Loader that accepts a document only if it is an instance of ``tp``."""

    def __init__(self, tp):
        # type: (Union[type, Tuple[Type[str], Type[str]]]) -> None
        self.tp = tp

    def _expected_names(self):
        # type: () -> str
        """Return the accepted type name(s) for use in error messages."""
        candidates = self.tp if isinstance(self.tp, tuple) else (self.tp,)
        # dict.fromkeys removes duplicates while keeping order, e.g. (str, str).
        return " or ".join(dict.fromkeys(t.__name__ for t in candidates))

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Return ``doc`` if it has the expected type.

        Raises:
            ValidationException: naming the expected and the actual type.
        """
        if not isinstance(doc, self.tp):
            # ``self.tp.__class__.__name__`` would always read "type"/"tuple";
            # report the name of the expected type itself.
            raise ValidationException(
                "Expected a {} but got {}".format(
                    self._expected_names(), doc.__class__.__name__
                )
            )
        return doc

    def __repr__(self):  # type: () -> str
        return str(self.tp)
242
243
class _ArrayLoader(_Loader):
    """Loader for lists whose elements are loaded with ``items``."""

    def __init__(self, items):
        # type: (_Loader) -> None
        self.items = items

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Load every element of ``doc``, flattening elements that load as lists.

        Raises:
            ValidationException: if ``doc`` is not a sequence, or wrapping all
                per-element validation errors once every element was tried.
        """
        if not isinstance(doc, MutableSequence):
            raise ValidationException("Expected a list")
        loaded = []  # type: List[Any]
        failures = []  # type: List[SchemaSaladException]
        # Each element may itself be a nested array or a single item.
        element_loader = _UnionLoader((self, self.items))
        for index, element in enumerate(doc):
            try:
                value = load_field(element, element_loader, baseuri, loadingOptions)
            except ValidationException as exc:
                failures.append(exc.with_sourceline(SourceLine(doc, index, str)))
                continue
            if isinstance(value, MutableSequence):
                loaded.extend(value)
            else:
                loaded.append(value)
        if failures:
            raise ValidationException("", None, failures)
        return loaded

    def __repr__(self):  # type: () -> str
        return f"array<{self.items}>"
272
273
class _EnumLoader(_Loader):
    """Loader that accepts only values contained in ``symbols``."""

    def __init__(self, symbols):
        # type: (Sequence[str]) -> None
        self.symbols = symbols

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Return ``doc`` if it is an allowed symbol, else raise ValidationException."""
        if doc not in self.symbols:
            raise ValidationException(f"Expected one of {self.symbols}")
        return doc
285
286
class _SecondaryDSLLoader(_Loader):
    """Loader that normalizes the secondaryFiles shorthand before delegating.

    Strings become ``{"pattern": ...}`` mappings (a trailing ``?`` is stripped
    and recorded as ``"required": False``); mapping entries are validated and
    normalized. The resulting list is passed to ``inner``.
    """

    def __init__(self, inner):
        # type: (_Loader) -> None
        self.inner = inner

    @staticmethod
    def _from_string(text):
        # type: (str) -> Dict[str, Any]
        """Convert a ``pattern`` / ``pattern?`` string into a mapping."""
        if text.endswith("?"):
            return {"pattern": text[:-1], "required": False}
        return {"pattern": text}

    @staticmethod
    def _from_mapping(entry):
        # type: (Dict[str, Any]) -> Dict[str, Any]
        """Validate a mapping entry and return its normalized form."""
        # Work on a copy: popping from the caller's document would corrupt it
        # for any later attempt to load the same data.
        remaining = dict(entry)
        if "pattern" not in remaining:
            raise ValidationException(
                "Missing pattern in secondaryFiles specification entry: {}".format(
                    entry
                )
            )
        new_dict = {
            "pattern": remaining.pop("pattern"),
            "required": remaining.pop("required", None),
        }  # type: Dict[str, Any]
        if remaining:
            raise ValidationException(
                "Unallowed values in secondaryFiles specification entry: {}".format(
                    remaining
                )
            )
        return new_dict

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Normalize ``doc`` and load the result with the inner loader.

        Raises:
            ValidationException: for entries of an unsupported type, mappings
                without ``pattern``, or mappings with unexpected keys.
        """
        r: List[Dict[str, Any]] = []
        if isinstance(doc, MutableSequence):
            for d in doc:
                if isinstance(d, str):
                    r.append(self._from_string(d))
                elif isinstance(d, dict):
                    # Previously the normalized mapping was built but never
                    # added to the result, silently dropping the entry.
                    r.append(self._from_mapping(d))
                else:
                    raise ValidationException(
                        "Expected a string or sequence of (strings or mappings)."
                    )
        elif isinstance(doc, str):
            r.append(self._from_string(doc))
        else:
            raise ValidationException("Expected str or sequence of str")
        return self.inner.load(r, baseuri, loadingOptions, docRoot)
335
336
class _RecordLoader(_Loader):
    """Loader that builds ``classtype`` instances from mappings."""

    def __init__(self, classtype):
        # type: (Type[Savable]) -> None
        self.classtype = classtype

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Delegate to ``classtype.fromDoc``; reject anything that is not a mapping."""
        if isinstance(doc, MutableMapping):
            return self.classtype.fromDoc(
                doc, baseuri, loadingOptions, docRoot=docRoot
            )
        raise ValidationException("Expected a dict")

    def __repr__(self):  # type: () -> str
        return str(self.classtype)
350
351
class _ExpressionLoader(_Loader):
    """Loader that accepts any string and returns it unchanged."""

    def __init__(self, items: Type[str]) -> None:
        self.items = items

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Return ``doc`` if it is a ``str``, else raise ValidationException."""
        if isinstance(doc, str):
            return doc
        raise ValidationException("Expected a str")
361
362
class _UnionLoader(_Loader):
    """Loader that tries each alternate in order and returns the first success."""

    def __init__(self, alternates):
        # type: (Sequence[_Loader]) -> None
        self.alternates = alternates

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Return the result of the first alternate that validates ``doc``.

        Raises:
            ValidationException: collecting one error per alternate when none
                of them accepts the document.
        """
        failures = []
        for alternate in self.alternates:
            try:
                return alternate.load(doc, baseuri, loadingOptions, docRoot=docRoot)
            except ValidationException as exc:
                tried = alternate.__class__.__name__
                failures.append(ValidationException(f"tried {tried} but", None, [exc]))
        raise ValidationException("", None, failures, "-")

    def __repr__(self):  # type: () -> str
        return " | ".join(map(str, self.alternates))
382
383
class _URILoader(_Loader):
    """Loader that expands string URIs with ``expand_url`` before delegating."""

    def __init__(self, inner, scoped_id, vocab_term, scoped_ref):
        # type: (_Loader, bool, bool, Union[int, None]) -> None
        self.inner = inner
        self.scoped_id = scoped_id
        self.vocab_term = vocab_term
        self.scoped_ref = scoped_ref

    def _expand(self, item, baseuri, loadingOptions):
        # type: (Any, str, LoadingOptions) -> Any
        """Expand ``item`` if it is a string; return other values untouched."""
        if not isinstance(item, str):
            return item
        return expand_url(
            item,
            baseuri,
            loadingOptions,
            self.scoped_id,
            self.vocab_term,
            self.scoped_ref,
        )

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Expand a string, or each string in a list, then load with ``inner``."""
        if isinstance(doc, MutableSequence):
            doc = [self._expand(item, baseuri, loadingOptions) for item in doc]
        elif isinstance(doc, str):
            doc = self._expand(doc, baseuri, loadingOptions)
        return self.inner.load(doc, baseuri, loadingOptions)
421
422
class _TypeDSLLoader(_Loader):
    """Loader that expands the type shorthand (``T[]``, ``T?``, ``T[]?``)."""

    typeDSLregex = re.compile(r"^([^[?]+)(\[\])?(\?)?$")

    def __init__(self, inner, refScope):
        # type: (_Loader, Union[int, None]) -> None
        self.inner = inner
        self.refScope = refScope

    def resolve(
        self,
        doc,  # type: str
        baseuri,  # type: str
        loadingOptions,  # type: LoadingOptions
    ):
        # type: (...) -> Union[List[Union[Dict[str, str], str]], Dict[str, str], str]
        """Expand one shorthand string.

        The base name is expanded as a vocabulary term. A ``[]`` suffix wraps
        it in an array mapping, and a ``?`` suffix yields ``["null", <type>]``.
        Strings that do not match the shorthand pattern are returned as-is.
        """
        match = self.typeDSLregex.match(doc)
        if not match:
            return doc
        base_name, array_suffix, optional_suffix = match.groups()
        assert base_name is not None  # nosec
        expanded = expand_url(
            base_name, baseuri, loadingOptions, False, True, self.refScope
        )
        resolved = expanded
        if array_suffix:
            resolved = {"type": "array", "items": expanded}
        if optional_suffix:
            resolved = ["null", resolved]
        return resolved

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Resolve shorthand in ``doc`` (string or list) and load with ``inner``.

        In a list, resolved string entries are merged without duplicates;
        non-string entries are kept unconditionally.
        """
        if isinstance(doc, MutableSequence):
            merged = []  # type: List[Any]
            for entry in doc:
                if not isinstance(entry, str):
                    merged.append(entry)
                    continue
                resolved = self.resolve(entry, baseuri, loadingOptions)
                if isinstance(resolved, MutableSequence):
                    candidates = resolved
                else:
                    candidates = [resolved]
                for candidate in candidates:
                    if candidate not in merged:
                        merged.append(candidate)
            doc = merged
        elif isinstance(doc, str):
            doc = self.resolve(doc, baseuri, loadingOptions)

        return self.inner.load(doc, baseuri, loadingOptions)
483
484
class _IdMapLoader(_Loader):
    """Loader that turns a mapping keyed by subject into a list of entries."""

    def __init__(self, inner, mapSubject, mapPredicate):
        # type: (_Loader, str, Union[str, None]) -> None
        self.inner = inner
        self.mapSubject = mapSubject
        self.mapPredicate = mapPredicate

    def load(self, doc, baseuri, loadingOptions, docRoot=None):
        # type: (Any, str, LoadingOptions, Optional[str]) -> Any
        """Convert a mapping into a key-sorted list, then load with ``inner``.

        Mapping values are shallow-copied and get ``mapSubject`` set to their
        key. Other values are wrapped as ``{mapPredicate: value}`` plus the
        subject. Non-mapping documents are passed through untouched.

        Raises:
            ValidationException: for a non-mapping value when no
                ``mapPredicate`` is configured.
        """
        if isinstance(doc, MutableMapping):
            entries = []  # type: List[Any]
            for key in sorted(doc):
                value = doc[key]
                if isinstance(value, CommentedMap):
                    entry = copy.copy(value)
                    # Carry the line/column bookkeeping over to the copy.
                    entry.lc.data = value.lc.data
                    entry.lc.filename = value.lc.filename
                elif isinstance(value, MutableMapping):
                    entry = copy.copy(value)
                elif self.mapPredicate:
                    entry = {self.mapPredicate: value}
                else:
                    raise ValidationException("No mapPredicate")
                entry[self.mapSubject] = key
                entries.append(entry)
            doc = entries
        return self.inner.load(doc, baseuri, loadingOptions)
517
518
def _document_load(loader, doc, baseuri, loadingOptions):
    # type: (_Loader, Any, str, LoadingOptions) -> Any
    """Load a whole document given as a URL string, a mapping or a list.

    A string is resolved against ``baseuri`` and loaded by URL. For a mapping,
    ``$namespaces``/``$schemas`` create derived loading options and are
    removed, ``$base`` overrides the base URI, and ``$graph`` (if present) is
    what gets loaded. A list is loaded directly.

    Raises:
        ValidationException: if ``doc`` is none of the supported kinds.
    """
    if isinstance(doc, str):
        target = loadingOptions.fetcher.urljoin(baseuri, doc)
        return _document_load_by_url(loader, target, loadingOptions)

    if isinstance(doc, MutableMapping):
        directives = ("$namespaces", "$schemas")
        if any(name in doc for name in directives):
            loadingOptions = LoadingOptions(
                copyfrom=loadingOptions,
                namespaces=doc.get("$namespaces", None),
                schemas=doc.get("$schemas", None),
            )
            doc = {k: v for k, v in doc.items() if k not in directives}

        if "$base" in doc:
            baseuri = doc["$base"]

        if "$graph" not in doc:
            return loader.load(doc, baseuri, loadingOptions, docRoot=baseuri)
        return loader.load(doc["$graph"], baseuri, loadingOptions)

    if isinstance(doc, MutableSequence):
        return loader.load(doc, baseuri, loadingOptions)

    raise ValidationException("Oops, we shouldn't be here!")
547
548
def _document_load_by_url(loader, url, loadingOptions):
    # type: (_Loader, str, LoadingOptions) -> Any
    """Fetch, parse and load the document at ``url``.

    Documents already present in ``loadingOptions.idx`` are reused; otherwise
    the text is fetched, parsed as round-trip YAML, stored in the index and
    loaded with options whose ``fileuri`` is ``url``.
    """
    index = loadingOptions.idx
    if url in index:
        return _document_load(loader, index[url], url, loadingOptions)

    fetched = loadingOptions.fetcher.fetch_text(url)
    content = fetched.decode("utf-8") if isinstance(fetched, bytes) else fetched
    stream = StringIO(content)
    stream.name = str(url)
    parsed = yaml.main.round_trip_load(stream, preserve_quotes=True)
    add_lc_filename(parsed, url)

    index[url] = parsed

    derived_options = LoadingOptions(copyfrom=loadingOptions, fileuri=url)
    return _document_load(loader, parsed, url, derived_options)
568
569
def file_uri(path, split_frag=False):  # type: (str, bool) -> str
    """Convert a filesystem path into a ``file://`` URI.

    Paths that already start with ``file://`` are returned unchanged. With
    ``split_frag`` the part after a single ``#`` is kept as a quoted fragment
    instead of being escaped as part of the path.
    """
    if path.startswith("file://"):
        return path

    target = path
    fragment = ""
    if split_frag:
        pieces = path.split("#", 2)
        target = str(pieces[0])
        # Only an unambiguous "path#fragment" pair yields a fragment.
        if len(pieces) == 2:
            fragment = "#" + quote(str(pieces[1]))

    urlpath = pathname2url(target)
    # A leading "//" already carries the authority part.
    scheme = "file:" if urlpath.startswith("//") else "file://"
    return f"{scheme}{urlpath}{fragment}"
584
585
def prefix_url(url, namespaces):  # type: (str, Dict[str, str]) -> str
    """Abbreviate ``url`` as ``prefix:rest`` using the first matching namespace.

    Namespaces are tried in mapping order; ``url`` is returned unchanged when
    none of the namespace URIs is a prefix of it.
    """
    for prefix, namespace in namespaces.items():
        if not url.startswith(namespace):
            continue
        return prefix + ":" + url[len(namespace) :]
    return url
591
592
def save_relative_uri(uri, base_url, scoped_id, ref_scope, relative_uris):
    # type: (str, str, bool, Optional[int], bool) -> Union[str, List[str]]
    """Shorten ``uri`` relative to ``base_url`` for saving.

    Returns ``uri`` unchanged when ``relative_uris`` is false, when it equals
    ``base_url``, or when scheme/host differ. Lists are processed element by
    element. For the same document the fragment is returned, with the base
    fragment (optionally trimmed by ``ref_scope`` segments) stripped when it is
    a prefix; for a different path a relative path is returned. Values that
    are neither lists nor strings are passed to ``save``.
    """
    if not relative_uris or uri == base_url:
        return uri
    if isinstance(uri, MutableSequence):
        return [
            save_relative_uri(item, base_url, scoped_id, ref_scope, relative_uris)
            for item in uri
        ]
    if not isinstance(uri, str):
        return save(uri, top=False, base_url=base_url)

    target = urlsplit(uri)
    base = urlsplit(base_url)
    if (target.scheme, target.netloc) != (base.scheme, base.netloc):
        return uri

    if target.path != base.path:
        relative = os.path.relpath(target.path, os.path.dirname(base.path))
        if target.fragment:
            relative = relative + "#" + target.fragment
        return relative

    scope = base.fragment + "/"
    if ref_scope:
        segments = scope.split("/")
        for _ in range(ref_scope):
            segments.pop()
        scope = "/".join(segments)

    if target.fragment.startswith(scope):
        return target.fragment[len(scope) :]
    return target.fragment