Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/schema_salad/jsonld_context.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/schema_salad/jsonld_context.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,280 @@ +import logging +import unicodedata +from typing import ( + Any, + Dict, + Iterable, + List, + MutableMapping, + MutableSequence, + Optional, + Tuple, + Union, + cast, +) +from urllib.parse import urldefrag, urlsplit + +import rdflib +import rdflib.namespace +from rdflib import Graph, URIRef +from rdflib.namespace import RDF, RDFS +from ruamel.yaml.comments import CommentedMap, CommentedSeq + +from .exceptions import SchemaException +from .utils import ContextType, aslist, json_dumps + +_logger = logging.getLogger("salad") + + +def pred( + datatype: MutableMapping[str, Union[Dict[str, str], str]], + field: Optional[Dict[str, Any]], + name: str, + context: ContextType, + defaultBase: str, + namespaces: Dict[str, rdflib.namespace.Namespace], +) -> Union[Dict[str, Union[str, None]], str]: + split = urlsplit(name) + + vee = None # type: Optional[str] + + if split.scheme != "": + vee = name + (ns, ln) = rdflib.namespace.split_uri(str(vee)) + name = ln + if ns[0:-1] in namespaces: + vee = str(namespaces[ns[0:-1]][ln]) + _logger.debug("name, v %s %s", name, vee) + + v = None # type: Optional[Union[Dict[str, Union[str, None]], str]] + + if field is not None and "jsonldPredicate" in field: + if isinstance(field["jsonldPredicate"], MutableMapping): + v = {} + for k, val in field["jsonldPredicate"].items(): + v[("@" + k[1:] if k.startswith("_") else k)] = val + if "@id" not in v: + v["@id"] = vee + else: + v = field["jsonldPredicate"] + elif "jsonldPredicate" in datatype: + if isinstance(datatype["jsonldPredicate"], Iterable): + for d in datatype["jsonldPredicate"]: + if isinstance(d, MutableMapping): + if d["symbol"] == name: + v = d["predicate"] + else: + raise SchemaException( + "entries in the jsonldPredicate List must be " "Dictionaries" + ) + else: + raise SchemaException("jsonldPredicate must be a List of Dictionaries.") + + ret = v or vee + + if not ret: + ret = defaultBase + name + + if name in context: + if context[name] != ret: + raise SchemaException( + "Predicate collision on {}, '{}' != '{}'".format( + name, context[name], ret + ) + ) + else: + _logger.debug("Adding to context '%s' %s (%s)", name, ret, type(ret)) + context[name] = ret + + return ret + + +def process_type( + t: MutableMapping[str, Any], + g: Graph, + context: ContextType, + defaultBase: str, + namespaces: Dict[str, rdflib.namespace.Namespace], + defaultPrefix: str, +) -> None: + if t["type"] not in ("record", "enum"): + return + + if "name" in t: + recordname = t["name"] + + _logger.debug("Processing %s %s\n", t.get("type"), t) + + classnode = URIRef(recordname) + g.add((classnode, RDF.type, RDFS.Class)) + + split = urlsplit(recordname) + predicate = recordname + if t.get("inVocab", True): + if split.scheme: + (ns, ln) = rdflib.namespace.split_uri(str(recordname)) + predicate = recordname + recordname = ln + else: + predicate = f"{defaultPrefix}:{recordname}" + + if context.get(recordname, predicate) != predicate: + raise SchemaException( + "Predicate collision on '{}', '{}' != '{}'".format( + recordname, context[recordname], predicate + ) + ) + + if not recordname: + raise SchemaException(f"Unable to find/derive recordname for {t}") + + _logger.debug( + "Adding to context '%s' %s (%s)", recordname, predicate, type(predicate) + ) + context[recordname] = predicate + + if t["type"] == "record": + for i in t.get("fields", []): + fieldname = i["name"] + + _logger.debug("Processing field %s", i) + + v = pred( + t, i, fieldname, context, defaultPrefix, namespaces + ) # type: Union[Dict[Any, Any], str, None] + + if isinstance(v, str): + v = v if v[0] != "@" else None + elif v is not None: + v = v["_@id"] if v.get("_@id", "@")[0] != "@" else None + + if bool(v): + try: + (ns, ln) = rdflib.namespace.split_uri(str(v)) + except ValueError: + # rdflib 5.0.0 compatibility + uri = str(v) + colon_index = str(v).rfind(":") + + if colon_index < 0: + raise + split_start = rdflib.namespace.SPLIT_START_CATEGORIES + for j in range(-1 - colon_index, len(uri)): + if unicodedata.category(uri[j]) in split_start or uri[j] == "_": + # _ prevents early split, roundtrip not generate + ns = uri[:j] + if not ns: + break + ln = uri[j:] + break + if not ns or not ln: + raise + + if ns[0:-1] in namespaces: + propnode = namespaces[ns[0:-1]][ln] + else: + propnode = URIRef(v) + + g.add((propnode, RDF.type, RDF.Property)) + g.add((propnode, RDFS.domain, classnode)) + + # TODO generate range from datatype. + + if isinstance(i["type"], MutableMapping): + process_type( + i["type"], g, context, defaultBase, namespaces, defaultPrefix + ) + + if "extends" in t: + for e in aslist(t["extends"]): + g.add((classnode, RDFS.subClassOf, URIRef(e))) + elif t["type"] == "enum": + _logger.debug("Processing enum %s", t.get("name")) + + for i in t["symbols"]: + pred(t, None, i, context, defaultBase, namespaces) + + +def salad_to_jsonld_context( + j: Iterable[MutableMapping[str, Any]], schema_ctx: MutableMapping[str, Any] +) -> Tuple[ContextType, Graph]: + context = {} # type: ContextType + namespaces = {} + g = Graph() + defaultPrefix = "" + + for k, v in schema_ctx.items(): + context[k] = v + namespaces[k] = rdflib.namespace.Namespace(v) + + if "@base" in context: + defaultBase = cast(str, context["@base"]) + del context["@base"] + else: + defaultBase = "" + + for k, v in namespaces.items(): + g.bind(str(k), v) + + for t in j: + process_type(t, g, context, defaultBase, namespaces, defaultPrefix) + + return (context, g) + + +def fix_jsonld_ids( + obj: Union[CommentedMap, float, str, CommentedSeq], ids: List[str] +) -> None: + if isinstance(obj, MutableMapping): + for i in ids: + if i in obj: + obj["@id"] = obj[i] + for v in obj.values(): + fix_jsonld_ids(v, ids) + if isinstance(obj, MutableSequence): + for entry in obj: + fix_jsonld_ids(entry, ids) + + +def makerdf( + workflow: Optional[str], + wf: Union[CommentedMap, float, str, CommentedSeq], + ctx: ContextType, + graph: Optional[Graph] = None, +) -> Graph: + prefixes = {} + idfields = [] + for k, v in ctx.items(): + if isinstance(v, MutableMapping): + url = v["@id"] + else: + url = v + if url == "@id": + idfields.append(k) + doc_url, frg = urldefrag(url) + if "/" in frg: + p = frg.split("/")[0] + prefixes[p] = f"{doc_url}#{p}/" + + fix_jsonld_ids(wf, idfields) + + g = Graph() if graph is None else graph + + if isinstance(wf, MutableSequence): + for w in wf: + w["@context"] = ctx + g.parse(data=json_dumps(w), format="json-ld", publicID=str(workflow)) + elif isinstance(wf, MutableMapping): + wf["@context"] = ctx + g.parse(data=json_dumps(wf), format="json-ld", publicID=str(workflow)) + else: + raise SchemaException(f"{wf} is not a workflow") + + # Bug in json-ld loader causes @id fields to be added to the graph + for sub, pred, obj in g.triples((None, URIRef("@id"), None)): + g.remove((sub, pred, obj)) + + for k2, v2 in prefixes.items(): + g.namespace_manager.bind(k2, v2) + + return g