Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/schema_salad/python_codegen_support.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
import copy import os import pathlib import re import tempfile import uuid as _uuid__ # pylint: disable=unused-import # noqa: F401 from io import StringIO from typing import ( Any, Dict, List, MutableMapping, MutableSequence, Optional, Sequence, Tuple, Type, Union, ) from urllib.parse import quote, urlsplit, urlunsplit from urllib.request import pathname2url from ruamel import yaml from ruamel.yaml.comments import CommentedMap from schema_salad.exceptions import SchemaSaladException, ValidationException from schema_salad.fetcher import DefaultFetcher, Fetcher from schema_salad.sourceline import SourceLine, add_lc_filename _vocab = {} # type: Dict[str, str] _rvocab = {} # type: Dict[str, str] class Savable: @classmethod def fromDoc(cls, _doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Savable pass def save(self, top=False, base_url="", relative_uris=True): # type: (bool, str, bool) -> Dict[str, str] pass class LoadingOptions: def __init__( self, fetcher=None, # type: Optional[Fetcher] namespaces=None, # type: Optional[Dict[str, str]] schemas=None, # type: Optional[Dict[str, str]] fileuri=None, # type: Optional[str] copyfrom=None, # type: Optional[LoadingOptions] original_doc=None, # type: Optional[Any] ): # type: (...) -> None self.idx = {} # type: Dict[str, Dict[str, Any]] self.fileuri = fileuri # type: Optional[str] self.namespaces = namespaces self.schemas = schemas self.original_doc = original_doc if copyfrom is not None: self.idx = copyfrom.idx if fetcher is None: fetcher = copyfrom.fetcher if fileuri is None: self.fileuri = copyfrom.fileuri if namespaces is None: self.namespaces = copyfrom.namespaces if schemas is None: self.schemas = copyfrom.schemas if fetcher is None: import requests from cachecontrol.caches import FileCache from cachecontrol.wrapper import CacheControl root = pathlib.Path(os.environ.get("HOME", tempfile.gettempdir())) session = CacheControl( requests.Session(), cache=FileCache(root / ".cache" / "salad"), ) self.fetcher: Fetcher = DefaultFetcher({}, session) else: self.fetcher = fetcher self.vocab = _vocab self.rvocab = _rvocab if namespaces is not None: self.vocab = self.vocab.copy() self.rvocab = self.rvocab.copy() for k, v in namespaces.items(): self.vocab[k] = v self.rvocab[v] = k def load_field(val, fieldtype, baseuri, loadingOptions): # type: (Union[str, Dict[str, str]], _Loader, str, LoadingOptions) -> Any if isinstance(val, MutableMapping): if "$import" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") return _document_load_by_url( fieldtype, loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]), loadingOptions, ) elif "$include" in val: if loadingOptions.fileuri is None: raise SchemaSaladException("Cannot load $import without fileuri") val = loadingOptions.fetcher.fetch_text( loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) ) return fieldtype.load(val, baseuri, loadingOptions) save_type = Union[Dict[str, str], List[Union[Dict[str, str], List[Any], None]], None] def save( val, # type: Optional[Union[Savable, MutableSequence[Savable]]] top=True, # type: bool base_url="", # type: str relative_uris=True, # type: bool ): # type: (...) -> save_type if isinstance(val, Savable): return val.save(top=top, base_url=base_url, relative_uris=relative_uris) if isinstance(val, MutableSequence): return [ save(v, top=False, base_url=base_url, relative_uris=relative_uris) for v in val ] if isinstance(val, MutableMapping): newdict = {} for key in val: newdict[key] = save( val[key], top=False, base_url=base_url, relative_uris=relative_uris ) return newdict return val def expand_url( url, # type: str base_url, # type: str loadingOptions, # type: LoadingOptions scoped_id=False, # type: bool vocab_term=False, # type: bool scoped_ref=None, # type: Optional[int] ): # type: (...) -> str if url in ("@id", "@type"): return url if vocab_term and url in loadingOptions.vocab: return url if bool(loadingOptions.vocab) and ":" in url: prefix = url.split(":")[0] if prefix in loadingOptions.vocab: url = loadingOptions.vocab[prefix] + url[len(prefix) + 1 :] split = urlsplit(url) if ( (bool(split.scheme) and split.scheme in ["http", "https", "file"]) or url.startswith("$(") or url.startswith("${") ): pass elif scoped_id and not bool(split.fragment): splitbase = urlsplit(base_url) frg = "" if bool(splitbase.fragment): frg = splitbase.fragment + "/" + split.path else: frg = split.path pt = splitbase.path if splitbase.path != "" else "/" url = urlunsplit((splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)) elif scoped_ref is not None and not bool(split.fragment): splitbase = urlsplit(base_url) sp = splitbase.fragment.split("/") n = scoped_ref while n > 0 and len(sp) > 0: sp.pop() n -= 1 sp.append(url) url = urlunsplit( ( splitbase.scheme, splitbase.netloc, splitbase.path, splitbase.query, "/".join(sp), ) ) else: url = loadingOptions.fetcher.urljoin(base_url, url) if vocab_term: split = urlsplit(url) if bool(split.scheme): if url in loadingOptions.rvocab: return loadingOptions.rvocab[url] else: raise ValidationException(f"Term '{url}' not in vocabulary") return url class _Loader: def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any pass class _AnyLoader(_Loader): def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if doc is not None: return doc raise ValidationException("Expected non-null") class _PrimitiveLoader(_Loader): def __init__(self, tp): # type: (Union[type, Tuple[Type[str], Type[str]]]) -> None self.tp = tp def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if not isinstance(doc, self.tp): raise ValidationException( "Expected a {} but got {}".format( self.tp.__class__.__name__, doc.__class__.__name__ ) ) return doc def __repr__(self): # type: () -> str return str(self.tp) class _ArrayLoader(_Loader): def __init__(self, items): # type: (_Loader) -> None self.items = items def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if not isinstance(doc, MutableSequence): raise ValidationException("Expected a list") r = [] # type: List[Any] errors = [] # type: List[SchemaSaladException] for i in range(0, len(doc)): try: lf = load_field( doc[i], _UnionLoader((self, self.items)), baseuri, loadingOptions ) if isinstance(lf, MutableSequence): r.extend(lf) else: r.append(lf) except ValidationException as e: errors.append(e.with_sourceline(SourceLine(doc, i, str))) if errors: raise ValidationException("", None, errors) return r def __repr__(self): # type: () -> str return f"array<{self.items}>" class _EnumLoader(_Loader): def __init__(self, symbols): # type: (Sequence[str]) -> None self.symbols = symbols def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if doc in self.symbols: return doc else: raise ValidationException(f"Expected one of {self.symbols}") class _SecondaryDSLLoader(_Loader): def __init__(self, inner): # type: (_Loader) -> None self.inner = inner def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any r: List[Dict[str, Any]] = [] if isinstance(doc, MutableSequence): for d in doc: if isinstance(d, str): if d.endswith("?"): r.append({"pattern": d[:-1], "required": False}) else: r.append({"pattern": d}) elif isinstance(d, dict): new_dict: Dict[str, Any] = {} if "pattern" in d: new_dict["pattern"] = d.pop("pattern") else: raise ValidationException( "Missing pattern in secondaryFiles specification entry: {}".format( d ) ) new_dict["required"] = ( d.pop("required") if "required" in d else None ) if len(d): raise ValidationException( "Unallowed values in secondaryFiles specification entry: {}".format( d ) ) else: raise ValidationException( "Expected a string or sequence of (strings or mappings)." ) elif isinstance(doc, str): if doc.endswith("?"): r.append({"pattern": doc[:-1], "required": False}) else: r.append({"pattern": doc}) else: raise ValidationException("Expected str or sequence of str") return self.inner.load(r, baseuri, loadingOptions, docRoot) class _RecordLoader(_Loader): def __init__(self, classtype): # type: (Type[Savable]) -> None self.classtype = classtype def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if not isinstance(doc, MutableMapping): raise ValidationException("Expected a dict") return self.classtype.fromDoc(doc, baseuri, loadingOptions, docRoot=docRoot) def __repr__(self): # type: () -> str return str(self.classtype) class _ExpressionLoader(_Loader): def __init__(self, items: Type[str]) -> None: self.items = items def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if not isinstance(doc, str): raise ValidationException("Expected a str") return doc class _UnionLoader(_Loader): def __init__(self, alternates): # type: (Sequence[_Loader]) -> None self.alternates = alternates def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any errors = [] for t in self.alternates: try: return t.load(doc, baseuri, loadingOptions, docRoot=docRoot) except ValidationException as e: errors.append( ValidationException(f"tried {t.__class__.__name__} but", None, [e]) ) raise ValidationException("", None, errors, "-") def __repr__(self): # type: () -> str return " | ".join(str(a) for a in self.alternates) class _URILoader(_Loader): def __init__(self, inner, scoped_id, vocab_term, scoped_ref): # type: (_Loader, bool, bool, Union[int, None]) -> None self.inner = inner self.scoped_id = scoped_id self.vocab_term = vocab_term self.scoped_ref = scoped_ref def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if isinstance(doc, MutableSequence): newdoc = [] for i in doc: if isinstance(i, str): newdoc.append( expand_url( i, baseuri, loadingOptions, self.scoped_id, self.vocab_term, self.scoped_ref, ) ) else: newdoc.append(i) doc = newdoc elif isinstance(doc, str): doc = expand_url( doc, baseuri, loadingOptions, self.scoped_id, self.vocab_term, self.scoped_ref, ) return self.inner.load(doc, baseuri, loadingOptions) class _TypeDSLLoader(_Loader): typeDSLregex = re.compile(r"^([^[?]+)(\[\])?(\?)?$") def __init__(self, inner, refScope): # type: (_Loader, Union[int, None]) -> None self.inner = inner self.refScope = refScope def resolve( self, doc, # type: str baseuri, # type: str loadingOptions, # type: LoadingOptions ): # type: (...) -> Union[List[Union[Dict[str, str], str]], Dict[str, str], str] m = self.typeDSLregex.match(doc) if m: group1 = m.group(1) assert group1 is not None # nosec first = expand_url( group1, baseuri, loadingOptions, False, True, self.refScope ) second = third = None if bool(m.group(2)): second = {"type": "array", "items": first} # second = CommentedMap((("type", "array"), # ("items", first))) # second.lc.add_kv_line_col("type", lc) # second.lc.add_kv_line_col("items", lc) # second.lc.filename = filename if bool(m.group(3)): third = ["null", second or first] # third = CommentedSeq(["null", second or first]) # third.lc.add_kv_line_col(0, lc) # third.lc.add_kv_line_col(1, lc) # third.lc.filename = filename return third or second or first return doc def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if isinstance(doc, MutableSequence): r = [] # type: List[Any] for d in doc: if isinstance(d, str): resolved = self.resolve(d, baseuri, loadingOptions) if isinstance(resolved, MutableSequence): for i in resolved: if i not in r: r.append(i) else: if resolved not in r: r.append(resolved) else: r.append(d) doc = r elif isinstance(doc, str): doc = self.resolve(doc, baseuri, loadingOptions) return self.inner.load(doc, baseuri, loadingOptions) class _IdMapLoader(_Loader): def __init__(self, inner, mapSubject, mapPredicate): # type: (_Loader, str, Union[str, None]) -> None self.inner = inner self.mapSubject = mapSubject self.mapPredicate = mapPredicate def load(self, doc, baseuri, loadingOptions, docRoot=None): # type: (Any, str, LoadingOptions, Optional[str]) -> Any if isinstance(doc, MutableMapping): r = [] # type: List[Any] for k in sorted(doc.keys()): val = doc[k] if isinstance(val, CommentedMap): v = copy.copy(val) v.lc.data = val.lc.data v.lc.filename = val.lc.filename v[self.mapSubject] = k r.append(v) elif isinstance(val, MutableMapping): v2 = copy.copy(val) v2[self.mapSubject] = k r.append(v2) else: if self.mapPredicate: v3 = {self.mapPredicate: val} v3[self.mapSubject] = k r.append(v3) else: raise ValidationException("No mapPredicate") doc = r return self.inner.load(doc, baseuri, loadingOptions) def _document_load(loader, doc, baseuri, loadingOptions): # type: (_Loader, Any, str, LoadingOptions) -> Any if isinstance(doc, str): return _document_load_by_url( loader, loadingOptions.fetcher.urljoin(baseuri, doc), loadingOptions ) if isinstance(doc, MutableMapping): if "$namespaces" in doc or "$schemas" in doc: loadingOptions = LoadingOptions( copyfrom=loadingOptions, namespaces=doc.get("$namespaces", None), schemas=doc.get("$schemas", None), ) doc = {k: v for k, v in doc.items() if k not in ["$namespaces", "$schemas"]} if "$base" in doc: baseuri = doc["$base"] if "$graph" in doc: return loader.load(doc["$graph"], baseuri, loadingOptions) else: return loader.load(doc, baseuri, loadingOptions, docRoot=baseuri) if isinstance(doc, MutableSequence): return loader.load(doc, baseuri, loadingOptions) raise ValidationException("Oops, we shouldn't be here!") def _document_load_by_url(loader, url, loadingOptions): # type: (_Loader, str, LoadingOptions) -> Any if url in loadingOptions.idx: return _document_load(loader, loadingOptions.idx[url], url, loadingOptions) text = loadingOptions.fetcher.fetch_text(url) if isinstance(text, bytes): textIO = StringIO(text.decode("utf-8")) else: textIO = StringIO(text) textIO.name = str(url) result = yaml.main.round_trip_load(textIO, preserve_quotes=True) add_lc_filename(result, url) loadingOptions.idx[url] = result loadingOptions = LoadingOptions(copyfrom=loadingOptions, fileuri=url) return _document_load(loader, result, url, loadingOptions) def file_uri(path, split_frag=False): # type: (str, bool) -> str if path.startswith("file://"): return path if split_frag: pathsp = path.split("#", 2) frag = "#" + quote(str(pathsp[1])) if len(pathsp) == 2 else "" urlpath = pathname2url(str(pathsp[0])) else: urlpath = pathname2url(path) frag = "" if urlpath.startswith("//"): return f"file:{urlpath}{frag}" else: return f"file://{urlpath}{frag}" def prefix_url(url, namespaces): # type: (str, Dict[str, str]) -> str for k, v in namespaces.items(): if url.startswith(v): return k + ":" + url[len(v) :] return url def save_relative_uri(uri, base_url, scoped_id, ref_scope, relative_uris): # type: (str, str, bool, Optional[int], bool) -> Union[str, List[str]] if not relative_uris or uri == base_url: return uri if isinstance(uri, MutableSequence): return [ save_relative_uri(u, base_url, scoped_id, ref_scope, relative_uris) for u in uri ] elif isinstance(uri, str): urisplit = urlsplit(uri) basesplit = urlsplit(base_url) if urisplit.scheme == basesplit.scheme and urisplit.netloc == basesplit.netloc: if urisplit.path != basesplit.path: p = os.path.relpath(urisplit.path, os.path.dirname(basesplit.path)) if urisplit.fragment: p = p + "#" + urisplit.fragment return p basefrag = basesplit.fragment + "/" if ref_scope: sp = basefrag.split("/") i = 0 while i < ref_scope: sp.pop() i += 1 basefrag = "/".join(sp) if urisplit.fragment.startswith(basefrag): return urisplit.fragment[len(basefrag) :] else: return urisplit.fragment return uri else: return save(uri, top=False, base_url=base_url)