Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/schema_salad/jsonld_context.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Build a JSON-LD context and an RDFS graph from a Schema Salad schema."""
import logging
import unicodedata
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Tuple,
    Union,
    cast,
)
from urllib.parse import urldefrag, urlsplit

import rdflib
import rdflib.namespace
from rdflib import Graph, URIRef
from rdflib.namespace import RDF, RDFS
from ruamel.yaml.comments import CommentedMap, CommentedSeq

from .exceptions import SchemaException
from .utils import ContextType, aslist, json_dumps

_logger = logging.getLogger("salad")


def pred(
    datatype: MutableMapping[str, Union[Dict[str, str], str]],
    field: Optional[Dict[str, Any]],
    name: str,
    context: ContextType,
    defaultBase: str,
    namespaces: Dict[str, rdflib.namespace.Namespace],
) -> Union[Dict[str, Union[str, None]], str]:
    """Work out the predicate for ``name`` and register it in ``context``.

    The predicate is chosen in this order:

    1. ``field["jsonldPredicate"]`` when ``field`` is given and has one.  A
       mapping is copied with a leading ``_`` in each key replaced by ``@``
       and ``@id`` defaulted to the URI derived from ``name``; any other
       value is used unchanged.
    2. Otherwise the ``predicate`` of the entry in
       ``datatype["jsonldPredicate"]`` whose ``symbol`` equals ``name``.
    3. Otherwise the URI derived from ``name`` (only when ``name`` has a URL
       scheme).
    4. Otherwise ``defaultBase + name``.

    When ``name`` has a URL scheme it is split with ``rdflib`` and only its
    local part is used as the context key.

    :param datatype: the enclosing type definition.
    :param field: the field definition, or ``None`` (e.g. for enum symbols).
    :param name: field or symbol name, possibly a full URI.
    :param context: context being built; updated in place.
    :param defaultBase: prefix used when nothing else yields a predicate.
    :param namespaces: known namespaces keyed by prefix.
    :returns: the predicate stored in ``context`` for the (local) name.
    :raises SchemaException: if ``datatype["jsonldPredicate"]`` is not an
        iterable of mappings, or if the name is already in ``context`` with
        a different predicate.
    """
    split = urlsplit(name)

    vee = None  # type: Optional[str]

    if split.scheme != "":
        vee = name
        (ns, ln) = rdflib.namespace.split_uri(str(vee))
        name = ln
        # ``ns`` ends with its separator character; drop it for the lookup.
        if ns[0:-1] in namespaces:
            vee = str(namespaces[ns[0:-1]][ln])
        _logger.debug("name, v %s %s", name, vee)

    v = None  # type: Optional[Union[Dict[str, Union[str, None]], str]]

    if field is not None and "jsonldPredicate" in field:
        if isinstance(field["jsonldPredicate"], MutableMapping):
            v = {}
            for k, val in field["jsonldPredicate"].items():
                v[("@" + k[1:] if k.startswith("_") else k)] = val
            if "@id" not in v:
                v["@id"] = vee
        else:
            v = field["jsonldPredicate"]
    elif "jsonldPredicate" in datatype:
        if isinstance(datatype["jsonldPredicate"], Iterable):
            for d in datatype["jsonldPredicate"]:
                if isinstance(d, MutableMapping):
                    if d["symbol"] == name:
                        v = d["predicate"]
                else:
                    raise SchemaException(
                        "entries in the jsonldPredicate List must be Dictionaries"
                    )
        else:
            raise SchemaException("jsonldPredicate must be a List of Dictionaries.")

    ret = v or vee

    if not ret:
        ret = defaultBase + name

    if name in context:
        if context[name] != ret:
            raise SchemaException(
                "Predicate collision on {}, '{}' != '{}'".format(
                    name, context[name], ret
                )
            )
    else:
        _logger.debug("Adding to context '%s' %s (%s)", name, ret, type(ret))
        context[name] = ret

    return ret


def _fallback_split_uri(uri: str) -> Optional[Tuple[str, str]]:
    """Split ``uri`` into ``(namespace, local name)`` without ``split_uri``.

    Used when ``rdflib.namespace.split_uri`` raises ``ValueError`` (rdflib
    5.0.0 compatibility).  Returns ``None`` when ``uri`` has no colon or no
    usable split point is found, so the caller never sees partial or stale
    results.
    """
    colon_index = uri.rfind(":")
    if colon_index < 0:
        return None

    split_start = rdflib.namespace.SPLIT_START_CATEGORIES
    for j in range(-1 - colon_index, len(uri)):
        # "_" is accepted too: it prevents an early split (the round trip
        # would otherwise not regenerate the same URI).
        if unicodedata.category(uri[j]) in split_start or uri[j] == "_":
            ns = uri[:j]
            if not ns:
                return None
            return ns, uri[j:]
    return None


def process_type(
    t: MutableMapping[str, Any],
    g: Graph,
    context: ContextType,
    defaultBase: str,
    namespaces: Dict[str, rdflib.namespace.Namespace],
    defaultPrefix: str,
) -> None:
    """Add one ``record`` or ``enum`` type to the context and the graph.

    Types of any other kind are ignored.  A named type is added to ``g`` as
    an ``RDFS.Class`` and registered in ``context``.  For records every
    field is resolved with :func:`pred`; fields whose predicate is a usable
    URI are added as ``RDF.Property`` with the record as ``RDFS.domain``,
    inline (mapping) field types are processed recursively, and ``extends``
    entries become ``RDFS.subClassOf`` triples.  For enums every symbol is
    registered through :func:`pred`.

    :param t: the type definition.
    :param g: graph to add triples to.
    :param context: context being built; updated in place.
    :param defaultBase: base passed to :func:`pred` for enum symbols.
    :param namespaces: known namespaces keyed by prefix.
    :param defaultPrefix: prefix for names without a URL scheme; also passed
        to :func:`pred` as the base for record fields.
    :raises SchemaException: on a predicate collision or an empty record name.
    :raises ValueError: if a field predicate cannot be split into namespace
        and local name.
    """
    if t["type"] not in ("record", "enum"):
        return

    if "name" in t:
        recordname = t["name"]

        _logger.debug("Processing %s %s\n", t.get("type"), t)

        classnode = URIRef(recordname)
        g.add((classnode, RDF.type, RDFS.Class))

        split = urlsplit(recordname)
        predicate = recordname
        if t.get("inVocab", True):
            if split.scheme:
                (ns, ln) = rdflib.namespace.split_uri(str(recordname))
                predicate = recordname
                recordname = ln
            else:
                predicate = f"{defaultPrefix}:{recordname}"

        if context.get(recordname, predicate) != predicate:
            raise SchemaException(
                "Predicate collision on '{}', '{}' != '{}'".format(
                    recordname, context[recordname], predicate
                )
            )

        if not recordname:
            raise SchemaException(f"Unable to find/derive recordname for {t}")

        _logger.debug(
            "Adding to context '%s' %s (%s)", recordname, predicate, type(predicate)
        )
        context[recordname] = predicate

    if t["type"] == "record":
        for i in t.get("fields", []):
            fieldname = i["name"]

            _logger.debug("Processing field %s", i)

            v = pred(
                t, i, fieldname, context, defaultPrefix, namespaces
            )  # type: Union[Dict[Any, Any], str, None]

            if isinstance(v, str):
                # JSON-LD keywords such as "@id" are not property URIs.
                v = v if v[0] != "@" else None
            elif v is not None:
                # NOTE(review): a mapping built by pred() from a field's
                # jsonldPredicate has its leading "_" rewritten to "@", so it
                # is keyed by "@id"; this "_@id" lookup looks like it never
                # matches for those mappings -- confirm intent before changing.
                v = v["_@id"] if v.get("_@id", "@")[0] != "@" else None

            if bool(v):
                try:
                    (ns, ln) = rdflib.namespace.split_uri(str(v))
                except ValueError:
                    # rdflib 5.0.0 compatibility.  The helper returns None
                    # on failure so that ns/ln left over from the record
                    # name or a previous field are never reused.
                    parts = _fallback_split_uri(str(v))
                    if parts is None:
                        raise
                    ns, ln = parts

                if ns[0:-1] in namespaces:
                    propnode = namespaces[ns[0:-1]][ln]
                else:
                    propnode = URIRef(v)

                g.add((propnode, RDF.type, RDF.Property))
                g.add((propnode, RDFS.domain, classnode))

                # TODO generate range from datatype.

            if isinstance(i["type"], MutableMapping):
                process_type(
                    i["type"], g, context, defaultBase, namespaces, defaultPrefix
                )

        if "extends" in t:
            for e in aslist(t["extends"]):
                g.add((classnode, RDFS.subClassOf, URIRef(e)))
    elif t["type"] == "enum":
        _logger.debug("Processing enum %s", t.get("name"))

        for i in t["symbols"]:
            pred(t, None, i, context, defaultBase, namespaces)


def salad_to_jsonld_context(
    j: Iterable[MutableMapping[str, Any]], schema_ctx: MutableMapping[str, Any]
) -> Tuple[ContextType, Graph]:
    """Build the JSON-LD context and RDFS graph for a list of type definitions.

    Every entry of ``schema_ctx`` is copied into the context and bound in the
    graph as a namespace.  An ``@base`` entry is removed from the returned
    context and used as the default base instead.

    :param j: the type definitions to process.
    :param schema_ctx: initial context entries (prefix -> value).
    :returns: ``(context, graph)``.
    """
    context = {}  # type: ContextType
    namespaces = {}  # type: Dict[str, rdflib.namespace.Namespace]
    g = Graph()
    defaultPrefix = ""

    for k, v in schema_ctx.items():
        context[k] = v
        namespaces[k] = rdflib.namespace.Namespace(v)

    if "@base" in context:
        defaultBase = cast(str, context["@base"])
        del context["@base"]
    else:
        defaultBase = ""

    for k, v in namespaces.items():
        g.bind(str(k), v)

    for t in j:
        process_type(t, g, context, defaultBase, namespaces, defaultPrefix)

    return (context, g)


def fix_jsonld_ids(
    obj: Union[CommentedMap, float, str, CommentedSeq], ids: List[str]
) -> None:
    """Recursively copy identifier fields to ``@id``, in place.

    For every mapping found in ``obj``, each key listed in ``ids`` that is
    present has its value assigned to ``"@id"`` (when several are present the
    last one in ``ids`` wins).  Mapping values and sequence entries are
    visited recursively; other values are left alone.
    """
    if isinstance(obj, MutableMapping):
        for i in ids:
            if i in obj:
                obj["@id"] = obj[i]
        for v in obj.values():
            fix_jsonld_ids(v, ids)
    if isinstance(obj, MutableSequence):
        for entry in obj:
            fix_jsonld_ids(entry, ids)


def makerdf(
    workflow: Optional[str],
    wf: Union[CommentedMap, float, str, CommentedSeq],
    ctx: ContextType,
    graph: Optional[Graph] = None,
) -> Graph:
    """Parse a document (or list of documents) into an RDF graph as JSON-LD.

    ``wf`` is modified in place: identifier fields are copied to ``@id`` and
    ``ctx`` is attached as ``@context`` to the document (or to each document
    of a sequence) before it is serialized and parsed.

    :param workflow: identifier passed (as ``str``) as the parse ``publicID``.
    :param wf: a mapping, or a sequence of mappings, to convert.
    :param ctx: JSON-LD context to attach.
    :param graph: graph to parse into; a new one is created when ``None``.
    :returns: the graph that was parsed into.
    :raises SchemaException: if ``wf`` is neither a mapping nor a sequence.
    """
    prefixes = {}
    idfields = []
    for k, v in ctx.items():
        if isinstance(v, MutableMapping):
            url = v["@id"]
        else:
            url = v
        if url == "@id":
            idfields.append(k)
        doc_url, frg = urldefrag(url)
        if "/" in frg:
            # A fragment such as "Foo/bar" yields the prefix "Foo".
            p = frg.split("/")[0]
            prefixes[p] = f"{doc_url}#{p}/"

    fix_jsonld_ids(wf, idfields)

    g = Graph() if graph is None else graph

    if isinstance(wf, MutableSequence):
        for w in wf:
            w["@context"] = ctx
            g.parse(data=json_dumps(w), format="json-ld", publicID=str(workflow))
    elif isinstance(wf, MutableMapping):
        wf["@context"] = ctx
        g.parse(data=json_dumps(wf), format="json-ld", publicID=str(workflow))
    else:
        raise SchemaException(f"{wf} is not a workflow")

    # Bug in json-ld loader causes @id fields to be added to the graph
    for sub, predicate, obj in g.triples((None, URIRef("@id"), None)):
        g.remove((sub, predicate, obj))

    for k2, v2 in prefixes.items():
        g.namespace_manager.bind(k2, v2)

    return g