# env/lib/python3.9/site-packages/schema_salad/ref_resolver.py
# sam_consensus_v3 repository, revision 0:4f3585e2f14b
# (shellac, Mon, 22 Mar 2021 18:12:50 +0000)
# "planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"

import copy
import logging
import os
import pathlib
import re
import tempfile
import xml.sax  # nosec
from io import StringIO
from typing import (
    Any,
    Callable,
    Dict,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Set,
    TypeVar,
    Union,
    cast,
)

import requests
from cachecontrol.caches import FileCache
from cachecontrol.wrapper import CacheControl
from rdflib.graph import Graph
from rdflib.namespace import OWL, RDF, RDFS
from rdflib.plugins.parsers.notation3 import BadSyntax
from ruamel import yaml
from ruamel.yaml.comments import CommentedMap, CommentedSeq, LineCol
from six.moves import urllib

from .exceptions import SchemaSaladException, ValidationException
from .fetcher import DefaultFetcher
from .sourceline import SourceLine, add_lc_filename, relname
from .utils import (
    AttachmentsType,
    CacheType,
    ContextType,
    FetcherCallableType,
    IdxResultType,
    IdxType,
    ResolvedRefType,
    ResolveType,
    aslist,
    onWindows,
)

_logger = logging.getLogger("salad")

typeDSLregex = re.compile(r"^([^[?]+)(\[\])?(\?)?$")


def file_uri(path: str, split_frag: bool = False) -> str:
    if path.startswith("file://"):
        return path
    if split_frag:
        pathsp = path.split("#", 2)
        if len(pathsp) == 2:
            frag = "#" + urllib.parse.quote(str(pathsp[1]))
        else:
            frag = ""
        urlpath = urllib.request.pathname2url(str(pathsp[0]))
    else:
        urlpath = urllib.request.pathname2url(path)
        frag = ""
    if urlpath.startswith("//"):
        return f"file:{urlpath}{frag}"
    return f"file://{urlpath}{frag}"


def uri_file_path(url: str) -> str:
    split = urllib.parse.urlsplit(url)
    if split.scheme == "file":
        return urllib.request.url2pathname(str(split.path)) + (
            "#" + urllib.parse.unquote(str(split.fragment))
            if bool(split.fragment)
            else ""
        )
    raise ValidationException(f"Not a file URI: {url}")


def to_validation_exception(e: yaml.error.MarkedYAMLError) -> ValidationException:
    fname_regex = re.compile(r"^file://" + re.escape(os.getcwd()) + "/")

    exc = ValidationException(e.problem)
    mark = e.problem_mark
    exc.file = re.sub(fname_regex, "", mark.name)
    exc.start = (mark.line + 1, mark.column + 1)
    exc.end = None

    if e.context:
        parent = ValidationException(e.context)
        mark = e.context_mark
        parent.file = re.sub(fname_regex, "", mark.name)
        parent.start = (mark.line + 1, mark.column + 1)
        parent.end = None
        parent.children = [exc]
        return parent
    return exc


class NormDict(Dict[str, Union[CommentedMap, CommentedSeq, str, None]]):
    """A Dict where all keys are normalized using the provided function."""

    def __init__(self, normalize: Callable[[str], str] = str) -> None:
        super().__init__()
        self.normalize = normalize

    def __eq__(self, other: Any) -> bool:
        return super().__eq__(other)

    def __getitem__(self, key):  # type: (Any) -> Any
        return super().__getitem__(self.normalize(key))

    def __setitem__(self, key, value):  # type: (Any, Any) -> Any
        return super().__setitem__(self.normalize(key), value)

    def __delitem__(self, key):  # type: (Any) -> Any
        return super().__delitem__(self.normalize(key))

    def __contains__(self, key: Any) -> bool:
        return super().__contains__(self.normalize(key))


def SubLoader(loader: "Loader") -> "Loader":
    return Loader(
        loader.ctx,
        schemagraph=loader.graph,
        foreign_properties=loader.foreign_properties,
        idx=loader.idx,
        cache=loader.cache,
        fetcher_constructor=loader.fetcher_constructor,
        skip_schemas=loader.skip_schemas,
        url_fields=loader.url_fields,
        allow_attachments=loader.allow_attachments,
        session=loader.session,
    )
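
# Illustrative note (not part of the original module): ``file_uri`` and
# ``uri_file_path`` are intended to be inverses for local paths.  A sketch
# of the expected behaviour, assuming a POSIX filesystem:
#
#     file_uri("/tmp/job.yml#main")                  -> "file:///tmp/job.yml%23main"
#     file_uri("/tmp/job.yml#main", split_frag=True) -> "file:///tmp/job.yml#main"
#     uri_file_path("file:///tmp/job.yml#main")      -> "/tmp/job.yml#main"
#
# With ``split_frag=True`` the "#" is kept as a URI fragment separator;
# otherwise ``pathname2url`` percent-encodes it as part of the path.
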
class Loader:
    def __init__(
        self,
        ctx: ContextType,
        schemagraph: Optional[Graph] = None,
        foreign_properties: Optional[Set[str]] = None,
        idx: Optional[IdxType] = None,
        cache: Optional[CacheType] = None,
        session: Optional[requests.sessions.Session] = None,
        fetcher_constructor: Optional[FetcherCallableType] = None,
        skip_schemas: Optional[bool] = None,
        url_fields: Optional[Set[str]] = None,
        allow_attachments: Optional[AttachmentsType] = None,
        doc_cache: Union[str, bool] = True,
    ) -> None:
        self.idx = (
            NormDict(lambda url: urllib.parse.urlsplit(url).geturl())
            if idx is None
            else idx
        )  # type: IdxType
        self.ctx = {}  # type: ContextType
        self.graph = schemagraph if schemagraph is not None else Graph()
        self.foreign_properties = (
            set(foreign_properties) if foreign_properties is not None else set()
        )
        self.cache = cache if cache is not None else {}
        self.skip_schemas = skip_schemas if skip_schemas is not None else False

        if session is None:
            if doc_cache is False:
                self.session = requests.Session()
            elif doc_cache is True:
                root = pathlib.Path(os.environ.get("HOME", tempfile.gettempdir()))
                self.session = CacheControl(
                    requests.Session(),
                    cache=FileCache(root / ".cache" / "salad"),
                )
            elif isinstance(doc_cache, str):
                self.session = CacheControl(
                    requests.Session(), cache=FileCache(doc_cache)
                )
        else:
            self.session = session

        self.fetcher_constructor = (
            fetcher_constructor if fetcher_constructor is not None else DefaultFetcher
        )
        self.fetcher = self.fetcher_constructor(self.cache, self.session)
        self.fetch_text = self.fetcher.fetch_text
        self.check_exists = self.fetcher.check_exists

        self.url_fields = (
            set() if url_fields is None else set(url_fields)
        )  # type: Set[str]
        self.scoped_ref_fields = {}  # type: Dict[str, int]
        self.vocab_fields = set()  # type: Set[str]
        self.identifiers = []  # type: List[str]
        self.identity_links = set()  # type: Set[str]
        self.standalone = None  # type: Optional[Set[str]]
        self.nolinkcheck = set()  # type: Set[str]
        self.vocab = {}  # type: Dict[str, str]
        self.rvocab = {}  # type: Dict[str, str]
        self.idmap = {}  # type: Dict[str, str]
        self.mapPredicate = {}  # type: Dict[str, str]
        self.type_dsl_fields = set()  # type: Set[str]
        self.subscopes = {}  # type: Dict[str, str]
        self.secondaryFile_dsl_fields = set()  # type: Set[str]
        self.allow_attachments = allow_attachments

        self.add_context(ctx)

    def expand_url(
        self,
        url: str,
        base_url: str,
        scoped_id: bool = False,
        vocab_term: bool = False,
        scoped_ref: Optional[int] = None,
    ) -> str:
        if url in ("@id", "@type"):
            return url

        if vocab_term and url in self.vocab:
            return url

        if url.startswith("_:"):
            return url

        if bool(self.vocab) and ":" in url:
            prefix = url.split(":")[0]
            if prefix in self.vocab:
                url = self.vocab[prefix] + url[len(prefix) + 1 :]
            elif prefix not in self.fetcher.supported_schemes():
                _logger.warning(
                    "URI prefix '%s' of '%s' not recognized, are you missing a "
                    "$namespaces section?",
                    prefix,
                    url,
                )

        split = urllib.parse.urlsplit(url)

        if (
            (bool(split.scheme) and split.scheme in ["http", "https", "file"])
            or url.startswith("$(")
            or url.startswith("${")
        ):
            pass
        elif scoped_id and not bool(split.fragment):
            splitbase = urllib.parse.urlsplit(base_url)
            frg = (
                splitbase.fragment + "/" + split.path
                if bool(splitbase.fragment)
                else split.path
            )
            pt = splitbase.path if splitbase.path != "" else "/"
            url = urllib.parse.urlunsplit(
                (splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)
            )
        elif scoped_ref is not None and not split.fragment:
            pass
        else:
            url = self.fetcher.urljoin(base_url, url)

        if vocab_term and url in self.rvocab:
            return self.rvocab[url]

        return url
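
    # Illustrative note (not part of the original module): a rough sketch of
    # ``expand_url``, assuming a hypothetical vocab entry
    # {"name": "https://example.com/schema#name"} and
    # base_url = "file:///work/doc.yml#step1":
    #
    #     expand_url("name", base_url, vocab_term=True)
    #         -> "name"  (already a vocabulary term, returned unchanged)
    #     expand_url("foo", base_url, scoped_id=True)
    #         -> "file:///work/doc.yml#step1/foo"
    #            (scoped identifiers nest under the base document's fragment)
    #     expand_url("other.yml", base_url)
    #         -> "file:///work/other.yml"  (plain refs join against the base URL)
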
str(rng).startswith("http://www.w3.org/2001/XMLSchema#") and str(rng) != "http://www.w3.org/2001/XMLSchema#anyURI" ) or str(rng) == "http://www.w3.org/2000/01/rdf-schema#Literal" if not literal: self.url_fields.add(str(s)) self.foreign_properties.add(str(s)) def add_namespaces(self, ns: Dict[str, str]) -> None: self.vocab.update(ns) def add_schemas(self, ns: Union[List[str], str], base_url: str) -> None: if self.skip_schemas: return for sch in aslist(ns): try: fetchurl = self.fetcher.urljoin(base_url, sch) if fetchurl not in self.cache or self.cache[fetchurl] is True: _logger.debug("Getting external schema %s", fetchurl) content = self.fetch_text(fetchurl) self.cache[fetchurl] = newGraph = Graph() for fmt in ["xml", "turtle", "rdfa"]: try: newGraph.parse( data=content, format=fmt, publicID=str(fetchurl) ) self.graph += self.cache[fetchurl] break except xml.sax.SAXParseException: pass except TypeError: pass except BadSyntax: pass except Exception as e: _logger.warning( "Could not load extension schema %s: %s", fetchurl, str(e) ) for s, _, _ in self.graph.triples((None, RDF.type, RDF.Property)): self._add_properties(s) for s, _, o in self.graph.triples((None, RDFS.subPropertyOf, None)): self._add_properties(s) self._add_properties(o) for s, _, _ in self.graph.triples((None, RDFS.range, None)): self._add_properties(s) for s, _, _ in self.graph.triples((None, RDF.type, OWL.ObjectProperty)): self._add_properties(s) for s, _, _ in self.graph.triples((None, None, None)): self.idx[str(s)] = None def add_context(self, newcontext: ContextType) -> None: if bool(self.vocab): raise ValidationException("Refreshing context that already has stuff in it") self.url_fields = {"$schemas"} self.scoped_ref_fields.clear() self.vocab_fields.clear() self.identifiers.clear() self.identity_links.clear() self.standalone = set() self.nolinkcheck.clear() self.idmap.clear() self.mapPredicate.clear() self.vocab.clear() self.rvocab.clear() self.type_dsl_fields.clear() self.secondaryFile_dsl_fields.clear() self.subscopes.clear() self.ctx.update(_copy_dict_without_key(newcontext, "@context")) _logger.debug("ctx is %s", self.ctx) for key, value in self.ctx.items(): if value == "@id": self.identifiers.append(key) self.identity_links.add(key) elif isinstance(value, MutableMapping): if value.get("@type") == "@id": self.url_fields.add(key) if "refScope" in value: self.scoped_ref_fields[key] = value["refScope"] if value.get("identity", False): self.identity_links.add(key) if value.get("@type") == "@vocab": self.url_fields.add(key) self.vocab_fields.add(key) if "refScope" in value: self.scoped_ref_fields[key] = value["refScope"] if value.get("typeDSL"): self.type_dsl_fields.add(key) if value.get("secondaryFilesDSL"): self.secondaryFile_dsl_fields.add(key) if value.get("noLinkCheck"): self.nolinkcheck.add(key) if value.get("mapSubject"): self.idmap[key] = value["mapSubject"] if value.get("mapPredicate"): self.mapPredicate[key] = value["mapPredicate"] if value.get("@id"): self.vocab[key] = value["@id"] if value.get("subscope"): self.subscopes[key] = value["subscope"] elif isinstance(value, str): self.vocab[key] = value for k, v in self.vocab.items(): self.rvocab[self.expand_url(v, "", scoped_id=False)] = k self.identifiers.sort() _logger.debug("identifiers is %s", self.identifiers) _logger.debug("identity_links is %s", self.identity_links) _logger.debug("url_fields is %s", self.url_fields) _logger.debug("vocab_fields is %s", self.vocab_fields) _logger.debug("vocab is %s", self.vocab) def resolve_ref( self, ref: ResolveType, 
    def resolve_ref(
        self,
        ref: ResolveType,
        base_url: Optional[str] = None,
        checklinks: bool = True,
        strict_foreign_properties: bool = False,
        content_types: Optional[List[str]] = None,  # Expected content-types
    ) -> ResolvedRefType:
        lref = ref
        obj = None  # type: Optional[CommentedMap]
        resolved_obj = None  # type: ResolveType
        inc = False
        mixin = None  # type: Optional[MutableMapping[str, str]]

        if not base_url:
            base_url = file_uri(os.getcwd()) + "/"

        sl = SourceLine(None, None)
        # If `ref` is a dict, look for special directives.
        if isinstance(lref, CommentedMap):
            obj = lref
            if "$import" in obj:
                sl = SourceLine(obj, "$import")
                if len(obj) == 1:
                    lref = obj["$import"]
                    obj = None
                else:
                    raise ValidationException(
                        f"'$import' must be the only field in {obj}", sl
                    )
            elif "$include" in obj:
                sl = SourceLine(obj, "$include")
                if len(obj) == 1:
                    lref = obj["$include"]
                    inc = True
                    obj = None
                else:
                    raise ValidationException(
                        f"'$include' must be the only field in {obj}", sl
                    )
            elif "$mixin" in obj:
                sl = SourceLine(obj, "$mixin")
                lref = obj["$mixin"]
                mixin = obj
                obj = None
            else:
                lref = None
                for identifier in self.identifiers:
                    if identifier in obj:
                        lref = obj[identifier]
                        break
                if not lref:
                    raise ValidationException(
                        "Object `{}` does not have identifier field in {}".format(
                            obj, self.identifiers
                        ),
                        sl,
                    )

        if not isinstance(lref, str):
            raise ValidationException(
                "Expected CommentedMap or string, got {}: `{}`".format(
                    type(lref), lref
                )
            )

        if isinstance(lref, str) and os.sep == "\\":
            # Convert Windows path separator in ref
            lref = lref.replace("\\", "/")

        url = self.expand_url(lref, base_url, scoped_id=(obj is not None))
        # Has this reference been loaded already?
        if url in self.idx and (not mixin):
            resolved_obj = self.idx[url]
            if isinstance(resolved_obj, MutableMapping):
                metadata = self.idx.get(
                    urllib.parse.urldefrag(url)[0], CommentedMap()
                )  # type: Union[CommentedMap, CommentedSeq, str, None]
                if isinstance(metadata, MutableMapping):
                    if "$graph" in resolved_obj:
                        metadata = _copy_dict_without_key(resolved_obj, "$graph")
                        return resolved_obj["$graph"], metadata
                    else:
                        return resolved_obj, metadata
                else:
                    raise ValidationException(
                        "Expected CommentedMap, got {}: `{}`".format(
                            type(metadata), metadata
                        )
                    )
            elif isinstance(resolved_obj, MutableSequence):
                metadata = self.idx.get(urllib.parse.urldefrag(url)[0], CommentedMap())
                if isinstance(metadata, MutableMapping):
                    return resolved_obj, metadata
                else:
                    return resolved_obj, CommentedMap()
            elif isinstance(resolved_obj, str):
                return resolved_obj, CommentedMap()
            else:
                raise ValidationException(
                    "Expected MutableMapping or MutableSequence, got {}: `{}`".format(
                        type(resolved_obj), resolved_obj
                    )
                )

        # "$include" directive means load raw text
        if inc:
            return self.fetch_text(url), CommentedMap()

        doc = None
        if isinstance(obj, MutableMapping):
            for identifier in self.identifiers:
                obj[identifier] = url
            doc_url = url
        else:
            # Load structured document
            doc_url, frg = urllib.parse.urldefrag(url)
            if doc_url in self.idx and (not mixin):
                # If the base document is in the index, it was already loaded,
                # so if we didn't find the reference earlier then it must not
                # exist.
                raise ValidationException(
                    f"Reference `#{frg}` not found in file `{doc_url}`.", sl
                )
            doc = self.fetch(
                doc_url, inject_ids=(not mixin), content_types=content_types
            )

        # Recursively expand urls and resolve directives
        if bool(mixin):
            doc = copy.deepcopy(doc)
            if isinstance(doc, CommentedMap) and mixin is not None:
                doc.update(mixin)
                del doc["$mixin"]
            resolved_obj, metadata = self.resolve_all(
                doc,
                base_url,
                file_base=doc_url,
                checklinks=checklinks,
                strict_foreign_properties=strict_foreign_properties,
            )
        else:
            resolved_obj, metadata = self.resolve_all(
                doc or obj,
                doc_url,
                checklinks=checklinks,
                strict_foreign_properties=strict_foreign_properties,
            )

        # Requested reference should be in the index now, otherwise it's a bad
        # reference
        if not bool(mixin):
            if url in self.idx:
                resolved_obj = self.idx[url]
            else:
                raise ValidationException(
                    "Reference `{}` is not in the index. Index contains: {}".format(
                        url, ", ".join(self.idx)
                    )
                )

        if isinstance(resolved_obj, CommentedMap):
            if "$graph" in resolved_obj:
                metadata = _copy_dict_without_key(resolved_obj, "$graph")
                return resolved_obj["$graph"], metadata
            else:
                return resolved_obj, metadata
        else:
            return resolved_obj, metadata
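
    # Illustrative note (not part of the original module): the three
    # directives handled above, sketched with hypothetical file names:
    #
    #     {"$import": "types.yml"}   -> parsed, resolved content of types.yml
    #     {"$include": "script.sh"}  -> raw text of script.sh (no parsing)
    #     {"$mixin": "base.yml", "label": "x"}
    #                                -> copy of base.yml with "label" merged over it
    #
    # $import and $include must be the only key in their dict; $mixin may
    # carry extra keys, which override fields of the imported document.
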
    def _resolve_idmap(
        self,
        document: CommentedMap,
        loader: "Loader",
    ) -> None:
        # Convert fields with mapSubject into lists;
        # use mapPredicate if the mapped value isn't a dict.
        for idmapField in loader.idmap:
            if idmapField in document:
                idmapFieldValue = document[idmapField]
                if (
                    isinstance(idmapFieldValue, MutableMapping)
                    and "$import" not in idmapFieldValue
                    and "$include" not in idmapFieldValue
                ):
                    ls = CommentedSeq()
                    for k in sorted(idmapFieldValue.keys()):
                        val = idmapFieldValue[k]
                        v = None  # type: Optional[CommentedMap]
                        if not isinstance(val, CommentedMap):
                            if idmapField in loader.mapPredicate:
                                v = CommentedMap(
                                    ((loader.mapPredicate[idmapField], val),)
                                )
                                v.lc.add_kv_line_col(
                                    loader.mapPredicate[idmapField],
                                    document[idmapField].lc.data[k],
                                )
                                v.lc.filename = document.lc.filename
                            else:
                                raise ValidationException(
                                    "mapSubject '{}' value '{}' is not a dict "
                                    "and does not have a mapPredicate.".format(k, val)
                                )
                        else:
                            v = val

                        v[loader.idmap[idmapField]] = k
                        v.lc.add_kv_line_col(
                            loader.idmap[idmapField], document[idmapField].lc.data[k]
                        )
                        v.lc.filename = document.lc.filename

                        ls.lc.add_kv_line_col(len(ls), document[idmapField].lc.data[k])
                        ls.lc.filename = document.lc.filename
                        ls.append(v)

                    document[idmapField] = ls

    def _type_dsl(
        self,
        t: Union[str, CommentedMap, CommentedSeq],
        lc: LineCol,
        filename: str,
    ) -> Union[str, CommentedMap, CommentedSeq]:
        if not isinstance(t, str):
            return t

        m = typeDSLregex.match(t)
        if not m:
            return t
        first = m.group(1)
        assert first  # nosec
        second = third = None

        if bool(m.group(2)):
            second = CommentedMap((("type", "array"), ("items", first)))
            second.lc.add_kv_line_col("type", lc)
            second.lc.add_kv_line_col("items", lc)
            second.lc.filename = filename

        if bool(m.group(3)):
            third = CommentedSeq(["null", second or first])
            third.lc.add_kv_line_col(0, lc)
            third.lc.add_kv_line_col(1, lc)
            third.lc.filename = filename
        return third or second or first

    def _secondaryFile_dsl(
        self,
        t: Union[str, CommentedMap, CommentedSeq],
        lc: LineCol,
        filename: str,
    ) -> Union[str, CommentedMap, CommentedSeq]:
        if not isinstance(t, str):
            return t
        pat = t[0:-1] if t.endswith("?") else t
        req = False if t.endswith("?") else None  # type: Optional[bool]

        second = CommentedMap((("pattern", pat), ("required", req)))
        second.lc.add_kv_line_col("pattern", lc)
        second.lc.add_kv_line_col("required", lc)
        second.lc.filename = filename
        return second
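
    # Illustrative note (not part of the original module): sketches of the
    # transformations above, with hypothetical field names.
    #
    # mapSubject/mapPredicate (a field mapped with subject "id" and
    # predicate "type"):
    #
    #     {"inputs": {"x": "string"}}
    #         -> {"inputs": [{"id": "x", "type": "string"}]}
    #
    # type DSL (for fields marked typeDSL in the context):
    #
    #     "string"    -> "string"
    #     "string[]"  -> {"type": "array", "items": "string"}
    #     "string[]?" -> ["null", {"type": "array", "items": "string"}]
    #
    # secondaryFiles DSL: ".fai?" -> {"pattern": ".fai", "required": False}
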
    def _apply_dsl(
        self,
        datum: Union[str, CommentedMap, CommentedSeq],
        d: str,
        loader: "Loader",
        lc: LineCol,
        filename: str,
    ) -> Union[str, CommentedMap, CommentedSeq]:
        if d in loader.type_dsl_fields:
            return self._type_dsl(datum, lc, filename)
        elif d in loader.secondaryFile_dsl_fields:
            return self._secondaryFile_dsl(datum, lc, filename)
        else:
            return datum

    def _resolve_dsl(
        self,
        document: CommentedMap,
        loader: "Loader",
    ) -> None:
        fields = list(loader.type_dsl_fields)
        fields.extend(loader.secondaryFile_dsl_fields)

        for d in fields:
            if d in document:
                datum2 = datum = document[d]
                if isinstance(datum, str):
                    datum2 = self._apply_dsl(
                        datum, d, loader, document.lc.data[d], document.lc.filename
                    )
                elif isinstance(datum, CommentedSeq):
                    datum2 = CommentedSeq()
                    for n, t in enumerate(datum):
                        if datum.lc and datum.lc.data:
                            datum2.lc.add_kv_line_col(len(datum2), datum.lc.data[n])
                            datum2.append(
                                self._apply_dsl(
                                    t, d, loader, datum.lc.data[n], document.lc.filename
                                )
                            )
                        else:
                            datum2.append(self._apply_dsl(t, d, loader, LineCol(), ""))
                if isinstance(datum2, CommentedSeq):
                    datum3 = CommentedSeq()
                    seen = []  # type: List[str]
                    for i, item in enumerate(datum2):
                        if isinstance(item, CommentedSeq):
                            for j, v in enumerate(item):
                                if v not in seen:
                                    datum3.lc.add_kv_line_col(
                                        len(datum3), item.lc.data[j]
                                    )
                                    datum3.append(v)
                                    seen.append(v)
                        else:
                            if item not in seen:
                                if datum2.lc and datum2.lc.data:
                                    datum3.lc.add_kv_line_col(
                                        len(datum3), datum2.lc.data[i]
                                    )
                                datum3.append(item)
                                seen.append(item)
                    document[d] = datum3
                else:
                    document[d] = datum2

    def _resolve_identifier(
        self, document: CommentedMap, loader: "Loader", base_url: str
    ) -> str:
        # Expand identifier field (usually 'id') to resolve scope
        for identifer in loader.identifiers:
            if identifer in document:
                if isinstance(document[identifer], str):
                    document[identifer] = loader.expand_url(
                        document[identifer], base_url, scoped_id=True
                    )
                    if document[identifer] not in loader.idx or isinstance(
                        loader.idx[document[identifer]], str
                    ):
                        loader.idx[document[identifer]] = document
                    base_url = document[identifer]
                else:
                    raise ValidationException(
                        "identifier field '{}' must be a string".format(
                            document[identifer]
                        )
                    )
        return base_url

    def _resolve_identity(
        self,
        document: Dict[str, Union[str, MutableSequence[Union[str, CommentedMap]]]],
        loader: "Loader",
        base_url: str,
    ) -> None:
        # Resolve scope for identity fields (fields where the value is the
        # identity of a standalone node, such as enum symbols)
        for identifer in loader.identity_links:
            if identifer in document and isinstance(
                document[identifer], MutableSequence
            ):
                for n, v in enumerate(document[identifer]):
                    if isinstance(v, str):
                        document[identifer][n] = loader.expand_url(  # type: ignore
                            v, base_url, scoped_id=True
                        )
                        if document[identifer][n] not in loader.idx:
                            loader.idx[document[identifer][n]] = v

    def _normalize_fields(self, document: CommentedMap, loader: "Loader") -> None:
        # Normalize fields which are prefixed or full URIs to vocabulary terms
        for d in list(document.keys()):
            if isinstance(d, str):
                d2 = loader.expand_url(d, "", scoped_id=False, vocab_term=True)
                if d != d2:
                    document[d2] = document[d]
                    document.lc.add_kv_line_col(d2, document.lc.data[d])
                    del document[d]
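
    # Illustrative note (not part of the original module): assuming a
    # hypothetical vocab entry mapping "type" to
    # "https://example.com/schema#type", ``_normalize_fields`` rewrites a
    # document key "https://example.com/schema#type" (or a prefixed form
    # such as "ex:type", once the "ex" namespace is registered) back to the
    # bare vocabulary term "type", preserving line/column info for errors.
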
    def _resolve_uris(
        self,
        document: Dict[str, Union[str, MutableSequence[Union[str, CommentedMap]]]],
        loader: "Loader",
        base_url: str,
    ) -> None:
        # Resolve remaining URLs based on document base
        for d in loader.url_fields:
            if d in document:
                datum = document[d]
                if isinstance(datum, str):
                    document[d] = loader.expand_url(
                        datum,
                        base_url,
                        scoped_id=False,
                        vocab_term=(d in loader.vocab_fields),
                        scoped_ref=loader.scoped_ref_fields.get(d),
                    )
                elif isinstance(datum, MutableSequence):
                    for i, url in enumerate(datum):
                        if isinstance(url, str):
                            datum[i] = loader.expand_url(
                                url,
                                base_url,
                                scoped_id=False,
                                vocab_term=(d in loader.vocab_fields),
                                scoped_ref=loader.scoped_ref_fields.get(d),
                            )

    def resolve_all(
        self,
        document: ResolveType,
        base_url: str,
        file_base: Optional[str] = None,
        checklinks: bool = True,
        strict_foreign_properties: bool = False,
    ) -> ResolvedRefType:
        loader = self
        metadata = CommentedMap()
        if file_base is None:
            file_base = base_url

        if isinstance(document, CommentedMap):
            # Handle $import and $include
            if "$import" in document or "$include" in document:
                return self.resolve_ref(
                    document,
                    base_url=file_base,
                    checklinks=checklinks,
                    strict_foreign_properties=strict_foreign_properties,
                )
            elif "$mixin" in document:
                return self.resolve_ref(
                    document,
                    base_url=base_url,
                    checklinks=checklinks,
                    strict_foreign_properties=strict_foreign_properties,
                )
        elif isinstance(document, CommentedSeq):
            pass
        elif isinstance(document, (list, dict)):
            raise ValidationException(
                "Expected CommentedMap or CommentedSeq, got {}: `{}`".format(
                    type(document), document
                )
            )
        else:
            return (document, metadata)

        newctx = None  # type: Optional["Loader"]
        if isinstance(document, CommentedMap):
            # Handle $base, $profile, $namespaces, $schemas and $graph
            if "$base" in document:
                base_url = document["$base"]

            if "$profile" in document:
                if newctx is None:
                    newctx = SubLoader(self)
                newctx.add_namespaces(document.get("$namespaces", CommentedMap()))
                newctx.add_schemas(document.get("$schemas", []), document["$profile"])

            if "$namespaces" in document:
                if newctx is None:
                    newctx = SubLoader(self)
                newctx.add_namespaces(document["$namespaces"])

            if "$schemas" in document:
                if newctx is None:
                    newctx = SubLoader(self)
                newctx.add_schemas(document["$schemas"], file_base)

            if newctx is not None:
                loader = newctx

            for identifer in loader.identity_links:
                if identifer in document:
                    if isinstance(document[identifer], str):
                        document[identifer] = loader.expand_url(
                            document[identifer], base_url, scoped_id=True
                        )
                        loader.idx[document[identifer]] = document

            metadata = document
            if "$graph" in document:
                document = document["$graph"]

        if isinstance(document, CommentedMap):
            self._normalize_fields(document, loader)
            self._resolve_idmap(document, loader)
            self._resolve_dsl(document, loader)
            base_url = self._resolve_identifier(document, loader, base_url)
            self._resolve_identity(document, loader, base_url)
            self._resolve_uris(document, loader, base_url)

            try:
                for key, val in document.items():
                    subscope = ""  # type: str
                    if key in loader.subscopes:
                        subscope = "/" + loader.subscopes[key]
                    document[key], _ = loader.resolve_all(
                        val, base_url + subscope, file_base=file_base, checklinks=False
                    )
            except ValidationException as v:
                _logger.warning("loader is %s", id(loader), exc_info=True)
                raise ValidationException(
                    "({}) ({}) Validation error in field {}:".format(
                        id(loader), file_base, key
                    ),
                    None,
                    [v],
                ) from v

        elif isinstance(document, CommentedSeq):
            i = 0
            try:
                while i < len(document):
                    val = document[i]
                    if isinstance(val, CommentedMap) and (
                        "$import" in val or "$mixin" in val
                    ):
                        l, import_metadata = loader.resolve_ref(
                            val, base_url=file_base, checklinks=False
                        )
                        metadata.setdefault("$import_metadata", {})
                        for identifier in loader.identifiers:
                            if identifier in import_metadata:
                                metadata["$import_metadata"][
                                    import_metadata[identifier]
                                ] = import_metadata
                        if isinstance(l, CommentedSeq):
                            lc = document.lc.data[i]
                            del document[i]
                            llen = len(l)
                            for j in range(len(document) + llen, i + llen, -1):
                                document.lc.data[j - 1] = document.lc.data[j - llen]
                            for item in l:
                                cast(CommentedSeq, document).insert(i, item)
                                document.lc.data[i] = lc
                                i += 1
                        else:
                            document[i] = l
                            i += 1
                    else:
                        document[i], _ = loader.resolve_all(
                            val, base_url, file_base=file_base, checklinks=False
                        )
                        i += 1
            except ValidationException as v:
                _logger.warning("failed", exc_info=True)
                raise ValidationException(
                    "({}) ({}) Validation error in position {}:".format(
                        id(loader), file_base, i
                    ),
                    None,
                    [v],
                ) from v

        if checklinks:
            all_doc_ids = {}  # type: Dict[str, str]
            loader.validate_links(
                document,
                "",
                all_doc_ids,
                strict_foreign_properties=strict_foreign_properties,
            )

        return document, metadata
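
    # Illustrative note (not part of the original module): a sketch of the
    # $base/$graph handling in ``resolve_all``, with made-up URLs:
    #
    #     {"$base": "https://example.com/ns#",
    #      "$graph": [{"id": "first"}, {"id": "second"}]}
    #
    # The enclosing map (minus "$graph") becomes the metadata, the list
    # under "$graph" becomes the document, and each "id" is expanded
    # against $base before being entered into the index.
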
    def fetch(
        self,
        url: str,
        inject_ids: bool = True,
        content_types: Optional[List[str]] = None,
    ) -> IdxResultType:
        if url in self.idx:
            return self.idx[url]
        try:
            text = self.fetch_text(url, content_types=content_types)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode("utf-8"))
            else:
                textIO = StringIO(text)
            textIO.name = str(url)
            attachments = yaml.main.round_trip_load_all(textIO, preserve_quotes=True)
            result = cast(CommentedMap, next(attachments))

            if self.allow_attachments is not None and self.allow_attachments(result):
                i = 1
                for a in attachments:
                    self.idx[f"{url}#attachment-{i}"] = a
                    i += 1
            add_lc_filename(result, url)
        except yaml.error.MarkedYAMLError as e:
            raise to_validation_exception(e) from e
        if isinstance(result, CommentedMap) and inject_ids and bool(self.identifiers):
            for identifier in self.identifiers:
                if identifier not in result:
                    result[identifier] = url
                self.idx[
                    self.expand_url(result[identifier], url, scoped_id=True)
                ] = result
        self.idx[url] = result
        return result

    def validate_scoped(self, field: str, link: str, docid: str) -> str:
        split = urllib.parse.urlsplit(docid)
        sp = split.fragment.split("/")
        n = self.scoped_ref_fields[field]
        while n > 0 and len(sp) > 0:
            sp.pop()
            n -= 1
        tried = []
        while True:
            sp.append(link)
            url = urllib.parse.urlunsplit(
                (split.scheme, split.netloc, split.path, split.query, "/".join(sp))
            )
            tried.append(url)
            if url in self.idx:
                return url
            sp.pop()
            if len(sp) == 0:
                break
            sp.pop()
        if onWindows() and link.startswith("file:"):
            link = link.lower()
        raise ValidationException(
            "Field `{}` references unknown identifier `{}`, tried {}".format(
                field, link, ", ".join(tried)
            )
        )

    def validate_link(
        self,
        field: str,
        link: Union[str, CommentedSeq, CommentedMap],
        docid: str,
        all_doc_ids: Dict[str, str],
    ) -> Union[str, CommentedSeq, CommentedMap]:
        if field in self.nolinkcheck:
            return link
        if isinstance(link, str):
            if field in self.vocab_fields:
                if (
                    link not in self.vocab
                    and link not in self.idx
                    and link not in self.rvocab
                ):
                    if field in self.scoped_ref_fields:
                        return self.validate_scoped(field, link, docid)
                    elif not self.check_exists(link):
                        raise ValidationException(
                            "Field `{}` contains undefined reference to `{}`".format(
                                field, link
                            )
                        )
            elif link not in self.idx and link not in self.rvocab:
                if field in self.scoped_ref_fields:
                    return self.validate_scoped(field, link, docid)
                elif not self.check_exists(link):
                    raise ValidationException(
                        "Field `{}` contains undefined reference to `{}`".format(
                            field, link
                        )
                    )
        elif isinstance(link, CommentedSeq):
            errors = []
            for n, i in enumerate(link):
                try:
                    link[n] = self.validate_link(field, i, docid, all_doc_ids)
                except ValidationException as v:
                    errors.append(v)
            if bool(errors):
                raise ValidationException("", None, errors)
        elif isinstance(link, CommentedMap):
            self.validate_links(link, docid, all_doc_ids)
        elif link is None:
            return link
        else:
            raise ValidationException(
                "`{}` field is {}, expected string, list, or a dict.".format(
                    field, type(link).__name__
                )
            )
        return link
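
    # Illustrative note (not part of the original module): ``validate_scoped``
    # walks outward through fragment scopes.  With a hypothetical docid of
    # "file:///work/doc.yml#main/step1" and refScope 1, a link "out" is
    # tried as:
    #
    #     file:///work/doc.yml#main/out
    #     file:///work/doc.yml#out
    #
    # and the first candidate present in the index is returned.
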
    def getid(self, d: Any) -> Optional[str]:
        if isinstance(d, MutableMapping):
            for i in self.identifiers:
                if i in d:
                    idd = d[i]
                    if isinstance(idd, str):
                        return idd
        return None

    def validate_links(
        self,
        document: ResolveType,
        base_url: str,
        all_doc_ids: Dict[str, str],
        strict_foreign_properties: bool = False,
    ) -> None:
        docid = self.getid(document) or base_url

        errors = []  # type: List[SchemaSaladException]
        iterator = None  # type: Any
        if isinstance(document, MutableSequence):
            iterator = enumerate(document)
        elif isinstance(document, MutableMapping):
            for d in self.url_fields:
                sl = SourceLine(document, d, str)
                try:
                    if d in document and d not in self.identity_links:
                        document[d] = self.validate_link(
                            d, document[d], docid, all_doc_ids
                        )
                except SchemaSaladException as v:
                    v = v.with_sourceline(sl)
                    if d == "$schemas" or (
                        d in self.foreign_properties and not strict_foreign_properties
                    ):
                        _logger.warning(v.as_warning())
                    else:
                        errors.append(v)
            # TODO: The validator should use local scope only, in which
            # duplicated keys are prohibited.
            # See also https://github.com/common-workflow-language/common-workflow-language/issues/734  # noqa: B950
            # In the future, it should raise ValidationException instead of
            # _logger.warning
            try:
                # validate that each id is defined uniquely
                for identifier in self.identifiers:
                    if identifier in document:
                        sl = SourceLine(document, identifier, str)
                        if (
                            document[identifier] in all_doc_ids
                            and sl.makeLead() != all_doc_ids[document[identifier]]
                        ):
                            _logger.warning(
                                "%s object %s `%s` previously defined",
                                all_doc_ids[document[identifier]],
                                identifier,
                                relname(document[identifier]),
                            )
                        else:
                            all_doc_ids[document[identifier]] = sl.makeLead()
                            break
            except ValidationException as v:
                errors.append(v.with_sourceline(sl))

            iterator = list(document.items())
        else:
            return

        for key, val in iterator:
            sl = SourceLine(document, key, str)
            try:
                self.validate_links(
                    val,
                    docid,
                    all_doc_ids,
                    strict_foreign_properties=strict_foreign_properties,
                )
            except ValidationException as v:
                if key in self.nolinkcheck or (isinstance(key, str) and ":" in key):
                    _logger.warning(v.as_warning())
                else:
                    docid2 = self.getid(val)
                    if docid2 is not None:
                        errors.append(
                            ValidationException(
                                "checking object `{}`".format(relname(docid2)), sl, [v]
                            )
                        )
                    else:
                        if isinstance(key, str):
                            errors.append(
                                ValidationException(f"checking field `{key}`", sl, [v])
                            )
                        else:
                            errors.append(
                                ValidationException("checking item", sl, [v])
                            )
        if bool(errors):
            if len(errors) > 1:
                raise ValidationException("", None, errors)
            else:
                raise errors[0]
        return


D = TypeVar("D", CommentedMap, ContextType)


def _copy_dict_without_key(from_dict: D, filtered_key: str) -> D:
    new_dict = CommentedMap(from_dict.items())
    if filtered_key in new_dict:
        del new_dict[filtered_key]
    if isinstance(from_dict, CommentedMap):
        new_dict.lc.data = copy.copy(from_dict.lc.data)
        new_dict.lc.filename = from_dict.lc.filename
    return new_dict
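

# Illustrative usage (not part of the original module), assuming a file
# "workflow.yml" on disk and the hypothetical context sketched in the
# notes above:
#
#     loader = Loader(ctx)
#     doc, metadata = loader.resolve_ref("workflow.yml")
#
# ``resolve_ref`` fetches and parses the document, applies the context's
# idmap/DSL rewrites, expands identifier and URL fields to absolute URIs,
# and (by default) validates links, raising ValidationException on
# dangling references.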