view env/lib/python3.9/site-packages/schema_salad/jsonld_context.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import logging
import unicodedata
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Tuple,
    Union,
    cast,
)
from urllib.parse import urldefrag, urlsplit

import rdflib
import rdflib.namespace
from rdflib import Graph, URIRef
from rdflib.namespace import RDF, RDFS
from ruamel.yaml.comments import CommentedMap, CommentedSeq

from .exceptions import SchemaException
from .utils import ContextType, aslist, json_dumps

_logger = logging.getLogger("salad")


def pred(
    datatype: MutableMapping[str, Union[Dict[str, str], str]],
    field: Optional[Dict[str, Any]],
    name: str,
    context: ContextType,
    defaultBase: str,
    namespaces: Dict[str, rdflib.namespace.Namespace],
) -> Union[Dict[str, Union[str, None]], str]:
    split = urlsplit(name)

    vee = None  # type: Optional[str]

    if split.scheme != "":
        vee = name
        (ns, ln) = rdflib.namespace.split_uri(str(vee))
        name = ln
        if ns[0:-1] in namespaces:
            vee = str(namespaces[ns[0:-1]][ln])
        _logger.debug("name, v %s %s", name, vee)

    v = None  # type: Optional[Union[Dict[str, Union[str, None]], str]]

    if field is not None and "jsonldPredicate" in field:
        if isinstance(field["jsonldPredicate"], MutableMapping):
            v = {}
            for k, val in field["jsonldPredicate"].items():
                v[("@" + k[1:] if k.startswith("_") else k)] = val
            if "@id" not in v:
                v["@id"] = vee
        else:
            v = field["jsonldPredicate"]
    elif "jsonldPredicate" in datatype:
        if isinstance(datatype["jsonldPredicate"], Iterable):
            for d in datatype["jsonldPredicate"]:
                if isinstance(d, MutableMapping):
                    if d["symbol"] == name:
                        v = d["predicate"]
                else:
                    raise SchemaException(
                        "entries in the jsonldPredicate List must be " "Dictionaries"
                    )
        else:
            raise SchemaException("jsonldPredicate must be a List of Dictionaries.")

    ret = v or vee

    if not ret:
        ret = defaultBase + name

    if name in context:
        if context[name] != ret:
            raise SchemaException(
                "Predicate collision on {}, '{}' != '{}'".format(
                    name, context[name], ret
                )
            )
    else:
        _logger.debug("Adding to context '%s' %s (%s)", name, ret, type(ret))
        context[name] = ret

    return ret


def process_type(
    t: MutableMapping[str, Any],
    g: Graph,
    context: ContextType,
    defaultBase: str,
    namespaces: Dict[str, rdflib.namespace.Namespace],
    defaultPrefix: str,
) -> None:
    if t["type"] not in ("record", "enum"):
        return

    if "name" in t:
        recordname = t["name"]

        _logger.debug("Processing %s %s\n", t.get("type"), t)

        classnode = URIRef(recordname)
        g.add((classnode, RDF.type, RDFS.Class))

        split = urlsplit(recordname)
        predicate = recordname
        if t.get("inVocab", True):
            if split.scheme:
                (ns, ln) = rdflib.namespace.split_uri(str(recordname))
                predicate = recordname
                recordname = ln
            else:
                predicate = f"{defaultPrefix}:{recordname}"

        if context.get(recordname, predicate) != predicate:
            raise SchemaException(
                "Predicate collision on '{}', '{}' != '{}'".format(
                    recordname, context[recordname], predicate
                )
            )

        if not recordname:
            raise SchemaException(f"Unable to find/derive recordname for {t}")

        _logger.debug(
            "Adding to context '%s' %s (%s)", recordname, predicate, type(predicate)
        )
        context[recordname] = predicate

    if t["type"] == "record":
        for i in t.get("fields", []):
            fieldname = i["name"]

            _logger.debug("Processing field %s", i)

            v = pred(
                t, i, fieldname, context, defaultPrefix, namespaces
            )  # type: Union[Dict[Any, Any], str, None]

            if isinstance(v, str):
                v = v if v[0] != "@" else None
            elif v is not None:
                v = v["_@id"] if v.get("_@id", "@")[0] != "@" else None

            if bool(v):
                try:
                    (ns, ln) = rdflib.namespace.split_uri(str(v))
                except ValueError:
                    # rdflib 5.0.0 compatibility
                    uri = str(v)
                    colon_index = str(v).rfind(":")

                    if colon_index < 0:
                        raise
                    split_start = rdflib.namespace.SPLIT_START_CATEGORIES
                    for j in range(-1 - colon_index, len(uri)):
                        if unicodedata.category(uri[j]) in split_start or uri[j] == "_":
                            # _ prevents early split, roundtrip not generate
                            ns = uri[:j]
                            if not ns:
                                break
                            ln = uri[j:]
                            break
                    if not ns or not ln:
                        raise

                if ns[0:-1] in namespaces:
                    propnode = namespaces[ns[0:-1]][ln]
                else:
                    propnode = URIRef(v)

                g.add((propnode, RDF.type, RDF.Property))
                g.add((propnode, RDFS.domain, classnode))

                # TODO generate range from datatype.

            if isinstance(i["type"], MutableMapping):
                process_type(
                    i["type"], g, context, defaultBase, namespaces, defaultPrefix
                )

        if "extends" in t:
            for e in aslist(t["extends"]):
                g.add((classnode, RDFS.subClassOf, URIRef(e)))
    elif t["type"] == "enum":
        _logger.debug("Processing enum %s", t.get("name"))

        for i in t["symbols"]:
            pred(t, None, i, context, defaultBase, namespaces)


def salad_to_jsonld_context(
    j: Iterable[MutableMapping[str, Any]], schema_ctx: MutableMapping[str, Any]
) -> Tuple[ContextType, Graph]:
    context = {}  # type: ContextType
    namespaces = {}
    g = Graph()
    defaultPrefix = ""

    for k, v in schema_ctx.items():
        context[k] = v
        namespaces[k] = rdflib.namespace.Namespace(v)

    if "@base" in context:
        defaultBase = cast(str, context["@base"])
        del context["@base"]
    else:
        defaultBase = ""

    for k, v in namespaces.items():
        g.bind(str(k), v)

    for t in j:
        process_type(t, g, context, defaultBase, namespaces, defaultPrefix)

    return (context, g)


def fix_jsonld_ids(
    obj: Union[CommentedMap, float, str, CommentedSeq], ids: List[str]
) -> None:
    if isinstance(obj, MutableMapping):
        for i in ids:
            if i in obj:
                obj["@id"] = obj[i]
        for v in obj.values():
            fix_jsonld_ids(v, ids)
    if isinstance(obj, MutableSequence):
        for entry in obj:
            fix_jsonld_ids(entry, ids)


def makerdf(
    workflow: Optional[str],
    wf: Union[CommentedMap, float, str, CommentedSeq],
    ctx: ContextType,
    graph: Optional[Graph] = None,
) -> Graph:
    prefixes = {}
    idfields = []
    for k, v in ctx.items():
        if isinstance(v, MutableMapping):
            url = v["@id"]
        else:
            url = v
        if url == "@id":
            idfields.append(k)
        doc_url, frg = urldefrag(url)
        if "/" in frg:
            p = frg.split("/")[0]
            prefixes[p] = f"{doc_url}#{p}/"

    fix_jsonld_ids(wf, idfields)

    g = Graph() if graph is None else graph

    if isinstance(wf, MutableSequence):
        for w in wf:
            w["@context"] = ctx
            g.parse(data=json_dumps(w), format="json-ld", publicID=str(workflow))
    elif isinstance(wf, MutableMapping):
        wf["@context"] = ctx
        g.parse(data=json_dumps(wf), format="json-ld", publicID=str(workflow))
    else:
        raise SchemaException(f"{wf} is not a workflow")

    # Bug in json-ld loader causes @id fields to be added to the graph
    for sub, pred, obj in g.triples((None, URIRef("@id"), None)):
        g.remove((sub, pred, obj))

    for k2, v2 in prefixes.items():
        g.namespace_manager.bind(k2, v2)

    return g