diff env/lib/python3.7/site-packages/prov/serializers/provxml.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/prov/serializers/provxml.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,388 +0,0 @@
-from __future__ import (absolute_import, division, print_function,
-                        unicode_literals)
-
-import datetime
-import logging
-from lxml import etree
-import io
-import warnings
-import prov
-import prov.identifier
-from prov.model import DEFAULT_NAMESPACES, sorted_attributes
-from prov.constants import *  # NOQA
-
-
-__author__ = 'Lion Krischer'
-__email__ = 'krischer@geophysik.uni-muenchen.de'
-
-logger = logging.getLogger(__name__)
-
-# Create a dictionary containing all top-level PROV XML elements for an easy
-# mapping.
-FULL_NAMES_MAP = dict(PROV_N_MAP)
-FULL_NAMES_MAP.update(ADDITIONAL_N_MAP)
-# Inverse mapping.
-FULL_PROV_RECORD_IDS_MAP = dict((FULL_NAMES_MAP[rec_type_id], rec_type_id) for
-                                rec_type_id in FULL_NAMES_MAP)
-
-XML_XSD_URI = 'http://www.w3.org/2001/XMLSchema'
-
-
-class ProvXMLException(prov.Error):
-    pass
-
-
-class ProvXMLSerializer(prov.serializers.Serializer):
-    """PROV-XML serializer for :class:`~prov.model.ProvDocument`
-    """
-    def serialize(self, stream, force_types=False, **kwargs):
-        """
-        Serializes a :class:`~prov.model.ProvDocument` instance to `PROV-XML
-        <http://www.w3.org/TR/prov-xml/>`_.
-
-        :param stream: Where to save the output.
-        :type force_types: boolean, optional
-        :param force_types: Will force xsd:types to be written for most
-            attributes mainly PROV-"attributes", e.g. tags not in the
-            PROV namespace. Off by default meaning xsd:type attributes will
-            only be set for prov:type, prov:location, and prov:value as is
-            done in the official PROV-XML specification. Furthermore the
-            types will always be set if the Python type requires it. False
-            is a good default and it should rarely require changing.
-        """
-        xml_root = self.serialize_bundle(bundle=self.document,
-                                         force_types=force_types)
-        for bundle in self.document.bundles:
-            self.serialize_bundle(bundle=bundle, element=xml_root,
-                                  force_types=force_types)
-        # No encoding must be specified when writing to String object which
-        # does not have the concept of an encoding as it should already
-        # represent unicode code points.
-        et = etree.ElementTree(xml_root)
-        if isinstance(stream, io.TextIOBase):
-            stream.write(etree.tostring(et, xml_declaration=True,
-                                        pretty_print=True).decode('utf-8'))
-        else:
-            et.write(stream, pretty_print=True, xml_declaration=True,
-                     encoding="UTF-8")
-
-    def serialize_bundle(self, bundle, element=None, force_types=False):
-        """
-        Serializes a bundle or document to PROV XML.
-
-        :param bundle: The bundle or document.
-        :param element: The XML element to write to. Will be created if None.
-        :type force_types: boolean, optional
-        :param force_types: Will force xsd:types to be written for most
-            attributes mainly PROV-"attributes", e.g. tags not in the
-            PROV namespace. Off by default meaning xsd:type attributes will
-            only be set for prov:type, prov:location, and prov:value as is
-            done in the official PROV-XML specification. Furthermore the
-            types will always be set if the Python type requires it. False
-            is a good default and it should rarely require changing.
-        """
-        # Build the namespace map for lxml and attach it to the root XML
-        # element. No dictionary comprehension in Python 2.6!
-        nsmap = dict((ns.prefix, ns.uri) for ns in
-                     self.document._namespaces.get_registered_namespaces())
-        if self.document._namespaces._default:
-            nsmap[None] = self.document._namespaces._default.uri
-        for namespace in bundle.namespaces:
-            if namespace not in nsmap:
-                nsmap[namespace.prefix] = namespace.uri
-
-        for key, value in DEFAULT_NAMESPACES.items():
-            uri = value.uri
-            if value.prefix == "xsd":
-                # The XSD namespace for some reason has no hash at the end
-                # for PROV XML, but for all other serializations it does.
-                uri = uri.rstrip("#")
-            nsmap[value.prefix] = uri
-
-        if element is not None:
-            xml_bundle_root = etree.SubElement(
-                element, _ns_prov("bundleContent"), nsmap=nsmap)
-        else:
-            xml_bundle_root = etree.Element(_ns_prov("document"), nsmap=nsmap)
-
-        if bundle.identifier:
-            xml_bundle_root.attrib[_ns_prov("id")] = \
-                six.text_type(bundle.identifier)
-
-        for record in bundle._records:
-            rec_type = record.get_type()
-            identifier = six.text_type(record._identifier) \
-                if record._identifier else None
-
-            if identifier:
-                attrs = {_ns_prov("id"): identifier}
-            else:
-                attrs = None
-
-            # Derive the record label from its attributes which is sometimes
-            # needed.
-            attributes = list(record.attributes)
-            rec_label = self._derive_record_label(rec_type, attributes)
-
-            elem = etree.SubElement(xml_bundle_root,
-                                    _ns_prov(rec_label), attrs)
-
-            for attr, value in sorted_attributes(rec_type, attributes):
-                subelem = etree.SubElement(
-                    elem, _ns(attr.namespace.uri, attr.localpart))
-                if isinstance(value, prov.model.Literal):
-                    if value.datatype not in \
-                            [None, PROV["InternationalizedString"]]:
-                        subelem.attrib[_ns_xsi("type")] = "%s:%s" % (
-                            value.datatype.namespace.prefix,
-                            value.datatype.localpart)
-                    if value.langtag is not None:
-                        subelem.attrib[_ns_xml("lang")] = value.langtag
-                    v = value.value
-                elif isinstance(value, prov.model.QualifiedName):
-                    if attr not in PROV_ATTRIBUTE_QNAMES:
-                        subelem.attrib[_ns_xsi("type")] = "xsd:QName"
-                    v = six.text_type(value)
-                elif isinstance(value, datetime.datetime):
-                    v = value.isoformat()
-                else:
-                    v = six.text_type(value)
-
-                # xsd type inference.
-                #
-                # This is a bit messy and there are all kinds of special
-                # rules but it appears to get the job done.
-                #
-                # If it is a type element and does not yet have an
-                # associated xsi type, try to infer it from the value.
-                # The not startswith("prov:") check is a little bit hacky to
-                # avoid type interference when the type is a standard prov
-                # type.
-                #
-                # To enable a mapping of Python types to XML and back,
-                # the XSD type must be written for these types.
-                ALWAYS_CHECK = [bool, datetime.datetime, float,
-                                prov.identifier.Identifier]
-                # Add long and int on Python 2, only int on Python 3.
-                ALWAYS_CHECK.extend(six.integer_types)
-                ALWAYS_CHECK = tuple(ALWAYS_CHECK)
-                if (force_types or
-                        type(value) in ALWAYS_CHECK or
-                        attr in [PROV_TYPE, PROV_LOCATION, PROV_VALUE]) and \
-                        _ns_xsi("type") not in subelem.attrib and \
-                        not six.text_type(value).startswith("prov:") and \
-                        not (attr in PROV_ATTRIBUTE_QNAMES and v) and \
-                        attr not in [PROV_ATTR_TIME, PROV_LABEL]:
-                    xsd_type = None
-                    if isinstance(value, bool):
-                        xsd_type = XSD_BOOLEAN
-                        v = v.lower()
-                    elif isinstance(value, six.string_types):
-                        xsd_type = XSD_STRING
-                    elif isinstance(value, float):
-                        xsd_type = XSD_DOUBLE
-                    elif isinstance(value, six.integer_types):
-                        xsd_type = XSD_INT
-                    elif isinstance(value, datetime.datetime):
-                        # Exception of the exception, while technically
-                        # still correct, do not write XSD dateTime type for
-                        # attributes in the PROV namespaces as the type is
-                        # already declared in the XSD and PROV XML also does
-                        # not specify it in the docs.
-                        if attr.namespace.prefix != "prov" \
-                                or "time" not in attr.localpart.lower():
-                            xsd_type = XSD_DATETIME
-                    elif isinstance(value, prov.identifier.Identifier):
-                        xsd_type = XSD_ANYURI
-
-                    if xsd_type is not None:
-                        subelem.attrib[_ns_xsi("type")] = \
-                            six.text_type(xsd_type)
-
-                if attr in PROV_ATTRIBUTE_QNAMES and v:
-                    subelem.attrib[_ns_prov("ref")] = v
-                else:
-                    subelem.text = v
-        return xml_bundle_root
-
-    def deserialize(self, stream, **kwargs):
-        """
-        Deserialize from `PROV-XML <http://www.w3.org/TR/prov-xml/>`_
-        representation to a :class:`~prov.model.ProvDocument` instance.
-
-        :param stream: Input data.
-        """
-        if isinstance(stream, io.TextIOBase):
-            with io.BytesIO() as buf:
-                buf.write(stream.read().encode('utf-8'))
-                buf.seek(0, 0)
-                xml_doc = etree.parse(buf).getroot()
-        else:
-            xml_doc = etree.parse(stream).getroot()
-
-        # Remove all comments.
-        for c in xml_doc.xpath("//comment()"):
-            p = c.getparent()
-            p.remove(c)
-
-        document = prov.model.ProvDocument()
-        self.deserialize_subtree(xml_doc, document)
-        return document
-
-    def deserialize_subtree(self, xml_doc, bundle):
-        """
-        Deserialize an etree element containing a PROV document or a bundle
-        and write it to the provided internal object.
-
-        :param xml_doc: An etree element containing the information to read.
-        :param bundle: The bundle object to write to.
-        """
-
-        for element in xml_doc:
-            qname = etree.QName(element)
-            if qname.namespace != DEFAULT_NAMESPACES["prov"].uri:
-                raise ProvXMLException("Non PROV element discovered in "
-                                       "document or bundle.")
-            # Ignore the <prov:other> element storing non-PROV information.
-            if qname.localname == "other":
-                warnings.warn(
-                    "Document contains non-PROV information in "
-                    "<prov:other>. It will be ignored in this package.",
-                    UserWarning)
-                continue
-
-            id_tag = _ns_prov("id")
-            rec_id = element.attrib[id_tag] if id_tag in element.attrib \
-                else None
-
-            if rec_id is not None:
-                # Try to make a qualified name out of it!
-                rec_id = xml_qname_to_QualifiedName(element, rec_id)
-
-            # Recursively read bundles.
-            if qname.localname == "bundleContent":
-                b = bundle.bundle(identifier=rec_id)
-                self.deserialize_subtree(element, b)
-                continue
-
-            attributes = _extract_attributes(element)
-
-            # Map the record type to its base type.
-            q_prov_name = FULL_PROV_RECORD_IDS_MAP[qname.localname]
-            rec_type = PROV_BASE_CLS[q_prov_name]
-
-            if _ns_xsi("type") in element.attrib:
-                value = xml_qname_to_QualifiedName(
-                    element, element.attrib[_ns_xsi("type")]
-                )
-                attributes.append((PROV["type"], value))
-
-            rec = bundle.new_record(rec_type, rec_id, attributes)
-
-            # Add the actual type in case a base type has been used.
-            if rec_type != q_prov_name:
-                rec.add_asserted_type(q_prov_name)
-        return bundle
-
-    def _derive_record_label(self, rec_type, attributes):
-        """
-        Helper function trying to derive the record label taking care of
-        subtypes and what not. It will also remove the type declaration for
-        the attributes if it was used to specialize the type.
-
-        :param rec_type: The type of records.
-        :param attributes: The attributes of the record.
-        """
-        rec_label = FULL_NAMES_MAP[rec_type]
-
-        for key, value in list(attributes):
-            if key != PROV_TYPE:
-                continue
-            if isinstance(value, prov.model.Literal):
-                value = value.value
-            if value in PROV_BASE_CLS and PROV_BASE_CLS[value] != value:
-                attributes.remove((key, value))
-                rec_label = FULL_NAMES_MAP[value]
-                break
-        return rec_label
-
-
-def _extract_attributes(element):
-    """
-    Extract the PROV attributes from an etree element.
-
-    :param element: The lxml.etree.Element instance.
-    """
-    attributes = []
-    for subel in element:
-        sqname = etree.QName(subel)
-        _t = xml_qname_to_QualifiedName(
-            subel, "%s:%s" % (subel.prefix, sqname.localname)
-        )
-
-        for key, value in subel.attrib.items():
-            if key == _ns_xsi("type"):
-                datatype = xml_qname_to_QualifiedName(subel, value)
-                if datatype == XSD_QNAME:
-                    _v = xml_qname_to_QualifiedName(subel, subel.text)
-                else:
-                    _v = prov.model.Literal(subel.text, datatype)
-            elif key == _ns_prov("ref"):
-                _v = xml_qname_to_QualifiedName(subel, value)
-            elif key == _ns_xml("lang"):
-                _v = prov.model.Literal(subel.text, langtag=value)
-            else:
-                warnings.warn(
-                    "The element '%s' contains an attribute %s='%s' "
-                    "which is not representable in the prov module's "
-                    "internal data model and will thus be ignored." %
-                    (_t, six.text_type(key), six.text_type(value)),
-                    UserWarning)
-
-        if not subel.attrib:
-            _v = subel.text
-
-        attributes.append((_t, _v))
-
-    return attributes
-
-
-def xml_qname_to_QualifiedName(element, qname_str):
-    if ':' in qname_str:
-        prefix, localpart = qname_str.split(':', 1)
-        if prefix in element.nsmap:
-            ns_uri = element.nsmap[prefix]
-            if ns_uri == XML_XSD_URI:
-                ns = XSD  # use the standard xsd namespace (i.e. with #)
-            elif ns_uri == PROV.uri:
-                ns = PROV
-            else:
-                ns = Namespace(prefix, ns_uri)
-            return ns[localpart]
-    # case 1: no colon
-    # case 2: unknown prefix
-    if None in element.nsmap:
-        ns_uri = element.nsmap[None]
-        ns = Namespace('', ns_uri)
-        return ns[qname_str]
-    # no default namespace
-    raise ProvXMLException(
-        'Could not create a valid QualifiedName for "%s"' % qname_str
-    )
-
-
-def _ns(ns, tag):
-    return "{%s}%s" % (ns, tag)
-
-
-def _ns_prov(tag):
-    return _ns(DEFAULT_NAMESPACES['prov'].uri, tag)
-
-
-def _ns_xsi(tag):
-    return _ns(DEFAULT_NAMESPACES['xsi'].uri, tag)
-
-
-def _ns_xml(tag):
-    NS_XML = "http://www.w3.org/XML/1998/namespace"
-    return _ns(NS_XML, tag)