diff env/lib/python3.9/site-packages/prov/serializers/provxml.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/prov/serializers/provxml.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,388 @@
+from __future__ import (absolute_import, division, print_function,
+                        unicode_literals)
+
+import datetime
+import logging
+from lxml import etree
+import io
+import warnings
+import prov
+import prov.identifier
+from prov.model import DEFAULT_NAMESPACES, sorted_attributes
+from prov.constants import *  # NOQA
+
+
+__author__ = 'Lion Krischer'
+__email__ = 'krischer@geophysik.uni-muenchen.de'
+
+logger = logging.getLogger(__name__)
+
+# Create a dictionary containing all top-level PROV XML elements for an easy
+# mapping.
+FULL_NAMES_MAP = dict(PROV_N_MAP)
+FULL_NAMES_MAP.update(ADDITIONAL_N_MAP)
+# Inverse mapping.
+FULL_PROV_RECORD_IDS_MAP = dict((FULL_NAMES_MAP[rec_type_id], rec_type_id) for
+                                rec_type_id in FULL_NAMES_MAP)
+
+XML_XSD_URI = 'http://www.w3.org/2001/XMLSchema'
+
+
+class ProvXMLException(prov.Error):
+    pass
+
+
+class ProvXMLSerializer(prov.serializers.Serializer):
+    """PROV-XML serializer for :class:`~prov.model.ProvDocument`
+    """
+    def serialize(self, stream, force_types=False, **kwargs):
+        """
+        Serializes a :class:`~prov.model.ProvDocument` instance to `PROV-XML
+        <http://www.w3.org/TR/prov-xml/>`_.
+
+        :param stream: Where to save the output.
+        :type force_types: boolean, optional
+        :param force_types: Will force xsd:types to be written for most
+            attributes mainly PROV-"attributes", e.g. tags not in the
+            PROV namespace. Off by default meaning xsd:type attributes will
+            only be set for prov:type, prov:location, and prov:value as is
+            done in the official PROV-XML specification. Furthermore the
+            types will always be set if the Python type requires it. False
+            is a good default and it should rarely require changing.
+        """
+        xml_root = self.serialize_bundle(bundle=self.document,
+                                         force_types=force_types)
+        for bundle in self.document.bundles:
+            self.serialize_bundle(bundle=bundle, element=xml_root,
+                                  force_types=force_types)
+        # No encoding must be specified when writing to String object which
+        # does not have the concept of an encoding as it should already
+        # represent unicode code points.
+        et = etree.ElementTree(xml_root)
+        if isinstance(stream, io.TextIOBase):
+            stream.write(etree.tostring(et, xml_declaration=True,
+                                        pretty_print=True).decode('utf-8'))
+        else:
+            et.write(stream, pretty_print=True, xml_declaration=True,
+                     encoding="UTF-8")
+
+    def serialize_bundle(self, bundle, element=None, force_types=False):
+        """
+        Serializes a bundle or document to PROV XML.
+
+        :param bundle: The bundle or document.
+        :param element: The XML element to write to. Will be created if None.
+        :type force_types: boolean, optional
+        :param force_types: Will force xsd:types to be written for most
+            attributes mainly PROV-"attributes", e.g. tags not in the
+            PROV namespace. Off by default meaning xsd:type attributes will
+            only be set for prov:type, prov:location, and prov:value as is
+            done in the official PROV-XML specification. Furthermore the
+            types will always be set if the Python type requires it. False
+            is a good default and it should rarely require changing.
+        """
+        # Build the namespace map for lxml and attach it to the root XML
+        # element. No dictionary comprehension in Python 2.6!
+        nsmap = dict((ns.prefix, ns.uri) for ns in
+                     self.document._namespaces.get_registered_namespaces())
+        if self.document._namespaces._default:
+            nsmap[None] = self.document._namespaces._default.uri
+        for namespace in bundle.namespaces:
+            if namespace not in nsmap:
+                nsmap[namespace.prefix] = namespace.uri
+
+        for key, value in DEFAULT_NAMESPACES.items():
+            uri = value.uri
+            if value.prefix == "xsd":
+                # The XSD namespace for some reason has no hash at the end
+                # for PROV XML, but for all other serializations it does.
+                uri = uri.rstrip("#")
+            nsmap[value.prefix] = uri
+
+        if element is not None:
+            xml_bundle_root = etree.SubElement(
+                element, _ns_prov("bundleContent"), nsmap=nsmap)
+        else:
+            xml_bundle_root = etree.Element(_ns_prov("document"), nsmap=nsmap)
+
+        if bundle.identifier:
+            xml_bundle_root.attrib[_ns_prov("id")] = \
+                six.text_type(bundle.identifier)
+
+        for record in bundle._records:
+            rec_type = record.get_type()
+            identifier = six.text_type(record._identifier) \
+                if record._identifier else None
+
+            if identifier:
+                attrs = {_ns_prov("id"): identifier}
+            else:
+                attrs = None
+
+            # Derive the record label from its attributes which is sometimes
+            # needed.
+            attributes = list(record.attributes)
+            rec_label = self._derive_record_label(rec_type, attributes)
+
+            elem = etree.SubElement(xml_bundle_root,
+                                    _ns_prov(rec_label), attrs)
+
+            for attr, value in sorted_attributes(rec_type, attributes):
+                subelem = etree.SubElement(
+                    elem, _ns(attr.namespace.uri, attr.localpart))
+                if isinstance(value, prov.model.Literal):
+                    if value.datatype not in \
+                            [None, PROV["InternationalizedString"]]:
+                        subelem.attrib[_ns_xsi("type")] = "%s:%s" % (
+                            value.datatype.namespace.prefix,
+                            value.datatype.localpart)
+                    if value.langtag is not None:
+                        subelem.attrib[_ns_xml("lang")] = value.langtag
+                    v = value.value
+                elif isinstance(value, prov.model.QualifiedName):
+                    if attr not in PROV_ATTRIBUTE_QNAMES:
+                        subelem.attrib[_ns_xsi("type")] = "xsd:QName"
+                    v = six.text_type(value)
+                elif isinstance(value, datetime.datetime):
+                    v = value.isoformat()
+                else:
+                    v = six.text_type(value)
+
+                # xsd type inference.
+                #
+                # This is a bit messy and there are all kinds of special
+                # rules but it appears to get the job done.
+                #
+                # If it is a type element and does not yet have an
+                # associated xsi type, try to infer it from the value.
+                # The not startswith("prov:") check is a little bit hacky to
+                # avoid type interference when the type is a standard prov
+                # type.
+                #
+                # To enable a mapping of Python types to XML and back,
+                # the XSD type must be written for these types.
+                ALWAYS_CHECK = [bool, datetime.datetime, float,
+                                prov.identifier.Identifier]
+                # Add long and int on Python 2, only int on Python 3.
+                ALWAYS_CHECK.extend(six.integer_types)
+                ALWAYS_CHECK = tuple(ALWAYS_CHECK)
+                if (force_types or
+                        type(value) in ALWAYS_CHECK or
+                        attr in [PROV_TYPE, PROV_LOCATION, PROV_VALUE]) and \
+                        _ns_xsi("type") not in subelem.attrib and \
+                        not six.text_type(value).startswith("prov:") and \
+                        not (attr in PROV_ATTRIBUTE_QNAMES and v) and \
+                        attr not in [PROV_ATTR_TIME, PROV_LABEL]:
+                    xsd_type = None
+                    if isinstance(value, bool):
+                        xsd_type = XSD_BOOLEAN
+                        v = v.lower()
+                    elif isinstance(value, six.string_types):
+                        xsd_type = XSD_STRING
+                    elif isinstance(value, float):
+                        xsd_type = XSD_DOUBLE
+                    elif isinstance(value, six.integer_types):
+                        xsd_type = XSD_INT
+                    elif isinstance(value, datetime.datetime):
+                        # Exception of the exception, while technically
+                        # still correct, do not write XSD dateTime type for
+                        # attributes in the PROV namespaces as the type is
+                        # already declared in the XSD and PROV XML also does
+                        # not specify it in the docs.
+                        if attr.namespace.prefix != "prov" \
+                                or "time" not in attr.localpart.lower():
+                            xsd_type = XSD_DATETIME
+                    elif isinstance(value, prov.identifier.Identifier):
+                        xsd_type = XSD_ANYURI
+
+                    if xsd_type is not None:
+                        subelem.attrib[_ns_xsi("type")] = \
+                            six.text_type(xsd_type)
+
+                if attr in PROV_ATTRIBUTE_QNAMES and v:
+                    subelem.attrib[_ns_prov("ref")] = v
+                else:
+                    subelem.text = v
+        return xml_bundle_root
+
+    def deserialize(self, stream, **kwargs):
+        """
+        Deserialize from `PROV-XML <http://www.w3.org/TR/prov-xml/>`_
+        representation to a :class:`~prov.model.ProvDocument` instance.
+
+        :param stream: Input data.
+        """
+        if isinstance(stream, io.TextIOBase):
+            with io.BytesIO() as buf:
+                buf.write(stream.read().encode('utf-8'))
+                buf.seek(0, 0)
+                xml_doc = etree.parse(buf).getroot()
+        else:
+            xml_doc = etree.parse(stream).getroot()
+
+        # Remove all comments.
+        for c in xml_doc.xpath("//comment()"):
+            p = c.getparent()
+            p.remove(c)
+
+        document = prov.model.ProvDocument()
+        self.deserialize_subtree(xml_doc, document)
+        return document
+
+    def deserialize_subtree(self, xml_doc, bundle):
+        """
+        Deserialize an etree element containing a PROV document or a bundle
+        and write it to the provided internal object.
+
+        :param xml_doc: An etree element containing the information to read.
+        :param bundle: The bundle object to write to.
+        """
+
+        for element in xml_doc:
+            qname = etree.QName(element)
+            if qname.namespace != DEFAULT_NAMESPACES["prov"].uri:
+                raise ProvXMLException("Non PROV element discovered in "
+                                       "document or bundle.")
+            # Ignore the <prov:other> element storing non-PROV information.
+            if qname.localname == "other":
+                warnings.warn(
+                    "Document contains non-PROV information in "
+                    "<prov:other>. It will be ignored in this package.",
+                    UserWarning)
+                continue
+
+            id_tag = _ns_prov("id")
+            rec_id = element.attrib[id_tag] if id_tag in element.attrib \
+                else None
+
+            if rec_id is not None:
+                # Try to make a qualified name out of it!
+                rec_id = xml_qname_to_QualifiedName(element, rec_id)
+
+            # Recursively read bundles.
+            if qname.localname == "bundleContent":
+                b = bundle.bundle(identifier=rec_id)
+                self.deserialize_subtree(element, b)
+                continue
+
+            attributes = _extract_attributes(element)
+
+            # Map the record type to its base type.
+            q_prov_name = FULL_PROV_RECORD_IDS_MAP[qname.localname]
+            rec_type = PROV_BASE_CLS[q_prov_name]
+
+            if _ns_xsi("type") in element.attrib:
+                value = xml_qname_to_QualifiedName(
+                    element, element.attrib[_ns_xsi("type")]
+                )
+                attributes.append((PROV["type"], value))
+
+            rec = bundle.new_record(rec_type, rec_id, attributes)
+
+            # Add the actual type in case a base type has been used.
+            if rec_type != q_prov_name:
+                rec.add_asserted_type(q_prov_name)
+        return bundle
+
+    def _derive_record_label(self, rec_type, attributes):
+        """
+        Helper function trying to derive the record label taking care of
+        subtypes and what not. It will also remove the type declaration for
+        the attributes if it was used to specialize the type.
+
+        :param rec_type: The type of records.
+        :param attributes: The attributes of the record.
+        """
+        rec_label = FULL_NAMES_MAP[rec_type]
+
+        for key, value in list(attributes):
+            if key != PROV_TYPE:
+                continue
+            if isinstance(value, prov.model.Literal):
+                value = value.value
+            if value in PROV_BASE_CLS and PROV_BASE_CLS[value] != value:
+                attributes.remove((key, value))
+                rec_label = FULL_NAMES_MAP[value]
+                break
+        return rec_label
+
+
+def _extract_attributes(element):
+    """
+    Extract the PROV attributes from an etree element.
+
+    :param element: The lxml.etree.Element instance.
+    """
+    attributes = []
+    for subel in element:
+        sqname = etree.QName(subel)
+        _t = xml_qname_to_QualifiedName(
+            subel, "%s:%s" % (subel.prefix, sqname.localname)
+        )
+
+        for key, value in subel.attrib.items():
+            if key == _ns_xsi("type"):
+                datatype = xml_qname_to_QualifiedName(subel, value)
+                if datatype == XSD_QNAME:
+                    _v = xml_qname_to_QualifiedName(subel, subel.text)
+                else:
+                    _v = prov.model.Literal(subel.text, datatype)
+            elif key == _ns_prov("ref"):
+                _v = xml_qname_to_QualifiedName(subel, value)
+            elif key == _ns_xml("lang"):
+                _v = prov.model.Literal(subel.text, langtag=value)
+            else:
+                warnings.warn(
+                    "The element '%s' contains an attribute %s='%s' "
+                    "which is not representable in the prov module's "
+                    "internal data model and will thus be ignored." %
+                    (_t, six.text_type(key), six.text_type(value)),
+                    UserWarning)
+
+        if not subel.attrib:
+            _v = subel.text
+
+        attributes.append((_t, _v))
+
+    return attributes
+
+
+def xml_qname_to_QualifiedName(element, qname_str):
+    if ':' in qname_str:
+        prefix, localpart = qname_str.split(':', 1)
+        if prefix in element.nsmap:
+            ns_uri = element.nsmap[prefix]
+            if ns_uri == XML_XSD_URI:
+                ns = XSD  # use the standard xsd namespace (i.e. with #)
+            elif ns_uri == PROV.uri:
+                ns = PROV
+            else:
+                ns = Namespace(prefix, ns_uri)
+            return ns[localpart]
+    # case 1: no colon
+    # case 2: unknown prefix
+    if None in element.nsmap:
+        ns_uri = element.nsmap[None]
+        ns = Namespace('', ns_uri)
+        return ns[qname_str]
+    # no default namespace
+    raise ProvXMLException(
+        'Could not create a valid QualifiedName for "%s"' % qname_str
+    )
+
+
+def _ns(ns, tag):
+    return "{%s}%s" % (ns, tag)
+
+
+def _ns_prov(tag):
+    return _ns(DEFAULT_NAMESPACES['prov'].uri, tag)
+
+
+def _ns_xsi(tag):
+    return _ns(DEFAULT_NAMESPACES['xsi'].uri, tag)
+
+
+def _ns_xml(tag):
+    NS_XML = "http://www.w3.org/XML/1998/namespace"
+    return _ns(NS_XML, tag)