comparison env/lib/python3.9/site-packages/prov/serializers/provxml.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 from __future__ import (absolute_import, division, print_function,
2 unicode_literals)
3
4 import datetime
5 import logging
6 from lxml import etree
7 import io
8 import warnings
9 import prov
10 import prov.identifier
11 from prov.model import DEFAULT_NAMESPACES, sorted_attributes
12 from prov.constants import * # NOQA
13
14
__author__ = "Lion Krischer"
__email__ = "krischer@geophysik.uni-muenchen.de"

logger = logging.getLogger(__name__)

# Lookup table from every top-level PROV record type to the local name of the
# PROV-XML element used for it (standard records plus the additional ones).
FULL_NAMES_MAP = dict(PROV_N_MAP)
FULL_NAMES_MAP.update(ADDITIONAL_N_MAP)
# Reverse lookup: XML element local name -> PROV record type. Built with
# dict() over pairs rather than a dict comprehension to stay Python 2.6
# compatible.
FULL_PROV_RECORD_IDS_MAP = dict(
    (xml_name, rec_type_id)
    for rec_type_id, xml_name in FULL_NAMES_MAP.items()
)

# XML Schema namespace URI as it appears in PROV-XML (no trailing '#').
XML_XSD_URI = "http://www.w3.org/2001/XMLSchema"
29
30
class ProvXMLException(prov.Error):
    """Error raised by this module when PROV-XML content cannot be handled."""
33
34
class ProvXMLSerializer(prov.serializers.Serializer):
    """PROV-XML serializer for :class:`~prov.model.ProvDocument`
    """
    def serialize(self, stream, force_types=False, **kwargs):
        """
        Serializes a :class:`~prov.model.ProvDocument` instance to `PROV-XML
        <http://www.w3.org/TR/prov-xml/>`_.

        The document itself becomes the root element and every bundle of the
        document is appended to that root as a child element.

        :param stream: Where to save the output. Text streams
            (:class:`io.TextIOBase`) receive decoded text, anything else is
            handed to lxml which writes UTF-8 encoded bytes.
        :type force_types: boolean, optional
        :param force_types: Will force xsd:types to be written for most
            attributes mainly PROV-"attributes", e.g. tags not in the
            PROV namespace. Off by default meaning xsd:type attributes will
            only be set for prov:type, prov:location, and prov:value as is
            done in the official PROV-XML specification. Furthermore the
            types will always be set if the Python type requires it. False
            is a good default and it should rarely require changing.
        """
        xml_root = self.serialize_bundle(bundle=self.document,
                                         force_types=force_types)
        for bundle in self.document.bundles:
            self.serialize_bundle(bundle=bundle, element=xml_root,
                                  force_types=force_types)
        # No encoding must be specified when writing to String object which
        # does not have the concept of an encoding as it should already
        # represent unicode code points.
        et = etree.ElementTree(xml_root)
        if isinstance(stream, io.TextIOBase):
            stream.write(etree.tostring(et, xml_declaration=True,
                                        pretty_print=True).decode('utf-8'))
        else:
            et.write(stream, pretty_print=True, xml_declaration=True,
                     encoding="UTF-8")

    def serialize_bundle(self, bundle, element=None, force_types=False):
        """
        Serializes a bundle or document to PROV XML.

        :param bundle: The bundle or document.
        :param element: The XML element to write to. Will be created if None.
        :type force_types: boolean, optional
        :param force_types: Will force xsd:types to be written for most
            attributes mainly PROV-"attributes", e.g. tags not in the
            PROV namespace. Off by default meaning xsd:type attributes will
            only be set for prov:type, prov:location, and prov:value as is
            done in the official PROV-XML specification. Furthermore the
            types will always be set if the Python type requires it. False
            is a good default and it should rarely require changing.
        :returns: The XML element holding the serialized records: a new
            ``prov:document`` element if ``element`` is None, otherwise the
            ``prov:bundleContent`` child that was appended to ``element``.
        """
        # Build the namespace map for lxml and attach it to the root XML
        # element. No dictionary comprehension in Python 2.6!
        nsmap = dict((ns.prefix, ns.uri) for ns in
                     self.document._namespaces.get_registered_namespaces())
        if self.document._namespaces._default:
            nsmap[None] = self.document._namespaces._default.uri
        for namespace in bundle.namespaces:
            # NOTE(review): nsmap is keyed by prefix while ``namespace`` is a
            # namespace object, so this test presumably never filters
            # anything and bundle namespaces always win - confirm the intent
            # before changing it, as it decides which prefix binding is used.
            if namespace not in nsmap:
                nsmap[namespace.prefix] = namespace.uri

        for _unused_key, default_ns in DEFAULT_NAMESPACES.items():
            uri = default_ns.uri
            if default_ns.prefix == "xsd":
                # The XSD namespace for some reason has no hash at the end
                # for PROV XML, but for all other serializations it does.
                uri = uri.rstrip("#")
            nsmap[default_ns.prefix] = uri

        if element is not None:
            xml_bundle_root = etree.SubElement(
                element, _ns_prov("bundleContent"), nsmap=nsmap)
        else:
            xml_bundle_root = etree.Element(_ns_prov("document"), nsmap=nsmap)

        if bundle.identifier:
            xml_bundle_root.attrib[_ns_prov("id")] = \
                six.text_type(bundle.identifier)

        # To enable a mapping of Python types to XML and back, the XSD type
        # must always be written for these types. six.integer_types adds
        # long and int on Python 2, only int on Python 3. Built once here
        # instead of once per attribute.
        ALWAYS_CHECK = (bool, datetime.datetime, float,
                        prov.identifier.Identifier) + \
            tuple(six.integer_types)

        for record in bundle._records:
            rec_type = record.get_type()
            identifier = six.text_type(record._identifier) \
                if record._identifier else None

            if identifier:
                attrs = {_ns_prov("id"): identifier}
            else:
                attrs = None

            # Derive the record label from its attributes which is sometimes
            # needed. Work on a copy as the helper may drop the prov:type
            # entry that was folded into the label.
            attributes = list(record.attributes)
            rec_label = self._derive_record_label(rec_type, attributes)

            elem = etree.SubElement(xml_bundle_root,
                                    _ns_prov(rec_label), attrs)

            for attr, value in sorted_attributes(rec_type, attributes):
                subelem = etree.SubElement(
                    elem, _ns(attr.namespace.uri, attr.localpart))
                if isinstance(value, prov.model.Literal):
                    if value.datatype not in \
                            [None, PROV["InternationalizedString"]]:
                        subelem.attrib[_ns_xsi("type")] = "%s:%s" % (
                            value.datatype.namespace.prefix,
                            value.datatype.localpart)
                    if value.langtag is not None:
                        subelem.attrib[_ns_xml("lang")] = value.langtag
                    v = value.value
                elif isinstance(value, prov.model.QualifiedName):
                    if attr not in PROV_ATTRIBUTE_QNAMES:
                        subelem.attrib[_ns_xsi("type")] = "xsd:QName"
                    v = six.text_type(value)
                elif isinstance(value, datetime.datetime):
                    v = value.isoformat()
                else:
                    v = six.text_type(value)

                # xsd type inference.
                #
                # This is a bit messy and there are all kinds of special
                # rules but it appears to get the job done.
                #
                # If it is a type element and does not yet have an
                # associated xsi type, try to infer it from the value.
                # The not startswith("prov:") check is a little bit hacky to
                # avoid type interference when the type is a standard prov
                # type.
                #
                # The exact type() comparison (rather than isinstance) is
                # kept on purpose so that subclasses of the listed types are
                # not forced into this branch.
                if (force_types or
                        type(value) in ALWAYS_CHECK or
                        attr in [PROV_TYPE, PROV_LOCATION, PROV_VALUE]) and \
                        _ns_xsi("type") not in subelem.attrib and \
                        not six.text_type(value).startswith("prov:") and \
                        not (attr in PROV_ATTRIBUTE_QNAMES and v) and \
                        attr not in [PROV_ATTR_TIME, PROV_LABEL]:
                    xsd_type = None
                    # bool must be tested before the integer types because
                    # bool is a subclass of int.
                    if isinstance(value, bool):
                        xsd_type = XSD_BOOLEAN
                        v = v.lower()
                    elif isinstance(value, six.string_types):
                        xsd_type = XSD_STRING
                    elif isinstance(value, float):
                        xsd_type = XSD_DOUBLE
                    elif isinstance(value, six.integer_types):
                        xsd_type = XSD_INT
                    elif isinstance(value, datetime.datetime):
                        # Exception of the exception, while technically
                        # still correct, do not write XSD dateTime type for
                        # attributes in the PROV namespaces as the type is
                        # already declared in the XSD and PROV XML also does
                        # not specify it in the docs.
                        if attr.namespace.prefix != "prov" \
                                or "time" not in attr.localpart.lower():
                            xsd_type = XSD_DATETIME
                    elif isinstance(value, prov.identifier.Identifier):
                        xsd_type = XSD_ANYURI

                    if xsd_type is not None:
                        subelem.attrib[_ns_xsi("type")] = \
                            six.text_type(xsd_type)

                # References to other records go into a prov:ref attribute,
                # everything else becomes the text of the element.
                if attr in PROV_ATTRIBUTE_QNAMES and v:
                    subelem.attrib[_ns_prov("ref")] = v
                else:
                    subelem.text = v
        return xml_bundle_root

    def deserialize(self, stream, **kwargs):
        """
        Deserialize from `PROV-XML <http://www.w3.org/TR/prov-xml/>`_
        representation to a :class:`~prov.model.ProvDocument` instance.

        :param stream: Input data. Text streams (:class:`io.TextIOBase`) are
            read completely and re-encoded as UTF-8 before parsing, anything
            else is passed to lxml directly.
        :returns: The newly created :class:`~prov.model.ProvDocument`.
        """
        if isinstance(stream, io.TextIOBase):
            with io.BytesIO() as buf:
                buf.write(stream.read().encode('utf-8'))
                buf.seek(0, 0)
                xml_doc = etree.parse(buf).getroot()
        else:
            xml_doc = etree.parse(stream).getroot()

        # Remove all comments.
        for c in xml_doc.xpath("//comment()"):
            p = c.getparent()
            # Comments placed outside of the root element have no parent;
            # they are never visited while walking the tree so they can
            # simply be left alone instead of failing on ``None.remove``.
            if p is not None:
                p.remove(c)

        document = prov.model.ProvDocument()
        self.deserialize_subtree(xml_doc, document)
        return document

    def deserialize_subtree(self, xml_doc, bundle):
        """
        Deserialize an etree element containing a PROV document or a bundle
        and write it to the provided internal object.

        :param xml_doc: An etree element containing the information to read.
        :param bundle: The bundle object to write to.
        :returns: The ``bundle`` object that was passed in.
        :raises ProvXMLException: If a child element is not in the PROV
            namespace.
        :raises KeyError: If a PROV element name has no entry in
            ``FULL_PROV_RECORD_IDS_MAP``.
        """

        for element in xml_doc:
            qname = etree.QName(element)
            if qname.namespace != DEFAULT_NAMESPACES["prov"].uri:
                raise ProvXMLException("Non PROV element discovered in "
                                       "document or bundle.")
            # Ignore the <prov:other> element storing non-PROV information.
            if qname.localname == "other":
                warnings.warn(
                    "Document contains non-PROV information in "
                    "<prov:other>. It will be ignored in this package.",
                    UserWarning)
                continue

            id_tag = _ns_prov("id")
            rec_id = element.attrib[id_tag] if id_tag in element.attrib \
                else None

            if rec_id is not None:
                # Try to make a qualified name out of it!
                rec_id = xml_qname_to_QualifiedName(element, rec_id)

            # Recursively read bundles.
            if qname.localname == "bundleContent":
                b = bundle.bundle(identifier=rec_id)
                self.deserialize_subtree(element, b)
                continue

            attributes = _extract_attributes(element)

            # Map the record type to its base type.
            q_prov_name = FULL_PROV_RECORD_IDS_MAP[qname.localname]
            rec_type = PROV_BASE_CLS[q_prov_name]

            # An xsi:type on the record element itself is stored as an
            # additional prov:type attribute.
            if _ns_xsi("type") in element.attrib:
                value = xml_qname_to_QualifiedName(
                    element, element.attrib[_ns_xsi("type")]
                )
                attributes.append((PROV["type"], value))

            rec = bundle.new_record(rec_type, rec_id, attributes)

            # Add the actual type in case a base type has been used.
            if rec_type != q_prov_name:
                rec.add_asserted_type(q_prov_name)
        return bundle

    def _derive_record_label(self, rec_type, attributes):
        """
        Helper function trying to derive the record label taking care of
        subtypes and what not. It will also remove the type declaration for
        the attributes if it was used to specialize the type.

        :param rec_type: The type of records.
        :param attributes: The attributes of the record as a list of
            ``(name, value)`` pairs. The list is modified in place: the
            prov:type entry used to specialize the label is removed.
        :returns: The label to use as the XML element name of the record.
        """
        rec_label = FULL_NAMES_MAP[rec_type]

        # Iterate over a copy because the list may be modified below.
        for entry in list(attributes):
            key, value = entry
            if key != PROV_TYPE:
                continue
            if isinstance(value, prov.model.Literal):
                value = value.value
            if value in PROV_BASE_CLS and PROV_BASE_CLS[value] != value:
                # Remove the entry exactly as it is stored. Rebuilding the
                # pair from the unwrapped literal value would not match the
                # stored pair and make list.remove() raise ValueError.
                attributes.remove(entry)
                rec_label = FULL_NAMES_MAP[value]
                break
        return rec_label
308
309
def _extract_attributes(element):
    """
    Extract the PROV attributes from an etree element.

    Every child of ``element`` yields one ``(name, value)`` pair. The value
    defaults to the text of the child and is refined by the XML attributes
    found on it: ``xsi:type`` (typed literal or qualified name),
    ``prov:ref`` (qualified name) and ``xml:lang`` (literal with a language
    tag). Any other XML attribute triggers a :class:`UserWarning` and is
    ignored.

    :param element: The lxml.etree.Element instance.
    :returns: A list of ``(name, value)`` tuples, one per child element.
    """
    attributes = []
    for subel in element:
        sqname = etree.QName(subel)
        # Elements in the default namespace have no prefix; resolve them by
        # their bare local name so that the default namespace is used rather
        # than a bogus "None:<name>" local part.
        if subel.prefix is None:
            tag_qname = sqname.localname
        else:
            tag_qname = "%s:%s" % (subel.prefix, sqname.localname)
        _t = xml_qname_to_QualifiedName(subel, tag_qname)

        # Start from the plain text so that the value is always defined,
        # also when the element only carries attributes that are ignored.
        _v = subel.text

        for key, value in subel.attrib.items():
            if key == _ns_xsi("type"):
                datatype = xml_qname_to_QualifiedName(subel, value)
                if datatype == XSD_QNAME:
                    _v = xml_qname_to_QualifiedName(subel, subel.text)
                else:
                    _v = prov.model.Literal(subel.text, datatype)
            elif key == _ns_prov("ref"):
                _v = xml_qname_to_QualifiedName(subel, value)
            elif key == _ns_xml("lang"):
                _v = prov.model.Literal(subel.text, langtag=value)
            else:
                warnings.warn(
                    "The element '%s' contains an attribute %s='%s' "
                    "which is not representable in the prov module's "
                    "internal data model and will thus be ignored." %
                    (_t, six.text_type(key), six.text_type(value)),
                    UserWarning)

        attributes.append((_t, _v))

    return attributes
348
349
def xml_qname_to_QualifiedName(element, qname_str):
    """
    Turn an XML qualified name string into a qualified name object.

    A ``prefix:local`` string whose prefix is declared for ``element`` is
    resolved against that declaration; the XML Schema and PROV URIs map to
    the shared ``XSD`` and ``PROV`` namespace objects. Strings without a
    colon, or with a prefix that is not declared, are placed unchanged into
    the default namespace of ``element``.

    :param element: The lxml element providing the namespace declarations.
    :param qname_str: The string to convert.
    :raises ProvXMLException: If the string cannot be resolved and the
        element has no default namespace.
    """
    if ':' in qname_str:
        prefix, _colon, localpart = qname_str.partition(':')
        declared = element.nsmap
        if prefix in declared:
            uri = declared[prefix]
            if uri == XML_XSD_URI:
                # use the standard xsd namespace (i.e. with #)
                namespace = XSD
            elif uri == PROV.uri:
                namespace = PROV
            else:
                namespace = Namespace(prefix, uri)
            return namespace[localpart]

    # Reached when there is no colon or the prefix is unknown: the only
    # remaining option is the default namespace.
    declared = element.nsmap
    if None not in declared:
        raise ProvXMLException(
            'Could not create a valid QualifiedName for "%s"' % qname_str
        )
    default_namespace = Namespace('', declared[None])
    return default_namespace[qname_str]
372
373
374 def _ns(ns, tag):
375 return "{%s}%s" % (ns, tag)
376
377
def _ns_prov(tag):
    """Return ``tag`` qualified with the URI of the default PROV namespace."""
    prov_uri = DEFAULT_NAMESPACES['prov'].uri
    return _ns(prov_uri, tag)
380
381
def _ns_xsi(tag):
    """Return ``tag`` qualified with the URI of the default XSI namespace."""
    xsi_uri = DEFAULT_NAMESPACES['xsi'].uri
    return _ns(xsi_uri, tag)
384
385
def _ns_xml(tag):
    """Return ``tag`` qualified with the XML namespace URI (e.g. for xml:lang)."""
    return _ns("http://www.w3.org/XML/1998/namespace", tag)