comparison env/lib/python3.9/site-packages/prov/serializers/provjson.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 from __future__ import (absolute_import, division, print_function,
2 unicode_literals)
3
4 from collections import defaultdict
5 import datetime
6 import io
7 import json
8
9 from prov.serializers import Serializer, Error
10 from prov.constants import *
11 from prov.model import (Literal, Identifier, QualifiedName,
12 Namespace, ProvDocument, ProvBundle, first,
13 parse_xsd_datetime)
14
15 import logging
16 logger = logging.getLogger(__name__)
17
18 __author__ = 'Trung Dong Huynh'
19 __email__ = 'trungdong@donggiang.com'
20
21
class ProvJSONException(Error):
    """Error type raised for PROV-JSON content this module cannot handle."""
24
25
class AnonymousIDGenerator:
    """Hands out stable ``_:<prefix><n>`` identifiers for arbitrary objects.

    The same object always receives the identifier it was first given; a
    running counter numbers objects in the order they are first seen.
    """

    def __init__(self):
        # Number of identifiers generated so far.
        self._count = 0
        # Maps each seen object to the identifier generated for it.
        self._cache = {}

    def get_anon_id(self, obj, local_prefix='id'):
        """Return the anonymous identifier for ``obj``, creating it if needed.

        :param obj: The (hashable) object that needs an identifier.
        :param local_prefix: Text placed between ``_:`` and the counter when
            a new identifier is created; ignored for already-seen objects.
        :return: The :class:`Identifier` associated with ``obj``.
        """
        if obj in self._cache:
            return self._cache[obj]

        # Bump the counter first so that numbering starts at 1.
        self._count += 1
        anon_id = Identifier('_:%s%d' % (local_prefix, self._count))
        self._cache[obj] = anon_id
        return anon_id
38
39
# Reverse map for prov.model.XSD_DATATYPE_PARSERS
# (boolean and string values are supported natively by PROV-JSON, and
# datetime values are converted separately, so neither is listed here).
LITERAL_XSDTYPE_MAP = {
    float: 'xsd:double',
    int: 'xsd:int',
}

# Add long on Python 2: the last entry of six.integer_types is only
# registered when it is not already a key of the map.
LITERAL_XSDTYPE_MAP.setdefault(six.integer_types[-1], 'xsd:long')
51
52
class ProvJSONSerializer(Serializer):
    """
    PROV-JSON serializer for :class:`~prov.model.ProvDocument`
    """

    def serialize(self, stream, **kwargs):
        """
        Serializes a :class:`~prov.model.ProvDocument` instance to
        `PROV-JSON <https://provenance.ecs.soton.ac.uk/prov-json/>`_.

        The document is first dumped into an in-memory buffer and then
        written to ``stream`` in a single call.

        :param stream: Where to save the output.
        :param kwargs: Extra keyword arguments passed on to ``json.dump``.
        """
        if six.PY2:
            # The buffer holds bytes here. If the target is a text stream
            # the content must be decoded; UTF-8 is assumed, which should
            # be fine for almost every case.
            with io.BytesIO() as buf:
                json.dump(self.document, buf, cls=ProvJSONEncoder, **kwargs)
                payload = buf.getvalue()
            if isinstance(stream, io.TextIOBase):
                stream.write(payload.decode('utf-8'))
            else:
                stream.write(payload)
        else:
            # The buffer holds text here. If the target is not a text
            # stream the content must be encoded; UTF-8 is assumed.
            with io.StringIO() as buf:
                json.dump(self.document, buf, cls=ProvJSONEncoder, **kwargs)
                payload = buf.getvalue()
            if isinstance(stream, io.TextIOBase):
                stream.write(payload)
            else:
                stream.write(payload.encode('utf-8'))

    def deserialize(self, stream, **kwargs):
        """
        Deserialize from the `PROV JSON
        <https://provenance.ecs.soton.ac.uk/prov-json/>`_ representation to a
        :class:`~prov.model.ProvDocument` instance.

        :param stream: Input data. Anything that is not an
            ``io.TextIOBase`` is read in full and decoded as UTF-8.
        :param kwargs: Extra keyword arguments passed on to ``json.load``.
        :return: Whatever ``json.load`` produces with
            :class:`ProvJSONDecoder` as the decoder class.
        """
        if isinstance(stream, io.TextIOBase):
            text_stream = stream
        else:
            text_stream = io.StringIO(stream.read().decode('utf-8'))
        return json.load(text_stream, cls=ProvJSONDecoder, **kwargs)
107
108
class ProvJSONEncoder(json.JSONEncoder):
    """JSON encoder that can serialise :class:`ProvDocument` instances."""

    def default(self, o):
        """Return a JSON-serialisable stand-in for ``o``.

        :param o: An object the standard encoder could not serialise.
        :return: The PROV-JSON container built by ``encode_json_document``
            when ``o`` is a :class:`ProvDocument`.
        :raises TypeError: For any other object (raised by the base class).
        """
        if isinstance(o, ProvDocument):
            return encode_json_document(o)
        # Defer to the base implementation, which raises TypeError for
        # unsupported objects. Calling encode() here instead would re-enter
        # default() for the same object and recurse without bound.
        return super(ProvJSONEncoder, self).default(o)
115
116
class ProvJSONDecoder(json.JSONDecoder):
    """JSON decoder that turns PROV-JSON text into a :class:`ProvDocument`."""

    def decode(self, s, *args, **kwargs):
        """Parse ``s`` as JSON and load the result into a new document.

        :param s: The JSON text to decode.
        :return: A fresh :class:`ProvDocument` populated by
            ``decode_json_document`` from the parsed JSON structure.
        """
        parsed = super(ProvJSONDecoder, self).decode(s, *args, **kwargs)
        result = ProvDocument()
        decode_json_document(parsed, result)
        return result
123
124
125 # Encoding/decoding functions
def valid_qualified_name(bundle, value):
    """Resolve ``value`` through ``bundle.valid_qualified_name``.

    :param bundle: Object providing a ``valid_qualified_name`` method.
    :param value: The name to resolve, or ``None``.
    :return: ``None`` when ``value`` is ``None`` (the bundle is not
        consulted); otherwise whatever the bundle's method returns.
    """
    return None if value is None else bundle.valid_qualified_name(value)
131
132
def encode_json_document(document):
    """Encode a document and all of its bundles as one JSON container.

    :param document: The document to encode; its ``bundles`` attribute is
        iterated to find the sub-bundles.
    :return: The container produced by ``encode_json_container`` for the
        document, with each sub-bundle stored under ``'bundle'`` keyed by
        the text form of the bundle's identifier.
    """
    container = encode_json_container(document)
    for sub_bundle in document.bundles:
        encoded_bundle = encode_json_container(sub_bundle)
        # The 'bundle' entry is created on first access only, so a
        # document without sub-bundles never gets that key.
        bundle_key = six.text_type(sub_bundle.identifier)
        container['bundle'][bundle_key] = encoded_bundle
    return container
140
141
def encode_json_container(bundle):
    """Encode the namespaces and records of ``bundle`` as a JSON container.

    :param bundle: The bundle (or document) whose ``_namespaces`` and
        ``_records`` are encoded.
    :return: A ``defaultdict(dict)`` holding an optional ``'prefix'`` entry
        plus one entry per record label, each mapping record identifiers
        to the encoded record (or to a list of encoded records when several
        records share one identifier).
    """
    container = defaultdict(dict)

    # Namespace declarations; the default namespace uses the 'default' key.
    prefixes = {
        namespace.prefix: namespace.uri
        for namespace in bundle._namespaces.get_registered_namespaces()
    }
    default_namespace = bundle._namespaces._default
    if default_namespace:
        prefixes['default'] = default_namespace.uri
    if prefixes:
        container['prefix'] = prefixes

    id_generator = AnonymousIDGenerator()

    def real_or_anon_id(r):
        # Records without an identifier receive a generated anonymous one.
        if r._identifier:
            return r._identifier
        return id_generator.get_anon_id(r)

    for record in bundle._records:
        rec_type = record.get_type()
        rec_label = PROV_N_MAP[rec_type]
        identifier = six.text_type(real_or_anon_id(record))

        record_json = {}
        for attr, values in (record._attributes or {}).items():
            if not values:
                continue
            attr_name = six.text_type(attr)
            if attr in PROV_ATTRIBUTE_QNAMES:
                # TODO: QName export
                encoded = six.text_type(first(values))
            elif attr in PROV_ATTRIBUTE_LITERALS:
                encoded = first(values).isoformat()
            elif len(values) == 1:
                # single value
                encoded = encode_json_representation(first(values))
            else:
                # multiple values
                encoded = [
                    encode_json_representation(value) for value in values
                ]
            record_json[attr_name] = encoded

        same_label = container[rec_label]
        if identifier not in same_label:
            # First record with this identifier: store it directly.
            same_label[identifier] = record_json
            continue

        # One or more records with this identifier are already stored.
        existing = same_label[identifier]
        if hasattr(existing, 'items'):
            # Only a single dict so far: turn it into a singleton list.
            existing = same_label[identifier] = [existing]
        existing.append(record_json)

    return container
200
201
def decode_json_document(content, document):
    """Populate ``document`` from a decoded PROV-JSON structure.

    Note that ``content`` is modified: its ``'bundle'`` entry, when
    present, is removed before the remainder is decoded into ``document``.

    :param content: The parsed JSON object of the whole document.
    :param document: The document that receives the records and bundles.
    """
    if 'bundle' in content:
        bundles = content.pop('bundle')
    else:
        bundles = {}

    # Everything that is left belongs to the document itself.
    decode_json_container(content, document)

    for bundle_id, bundle_content in bundles.items():
        sub_bundle = ProvBundle(document=document)
        decode_json_container(bundle_content, sub_bundle)
        document.add_bundle(
            sub_bundle, sub_bundle.valid_qualified_name(bundle_id)
        )
214
215
def decode_json_container(jc, bundle):
    """Load the namespaces and records of a JSON container into ``bundle``.

    Note that ``jc`` is modified: its ``'prefix'`` entry, when present, is
    deleted once the namespaces have been registered.

    :param jc: The parsed JSON object of one container (document or bundle).
    :param bundle: The bundle that receives the namespaces and records.
    :raises ProvJSONException: If a PROV attribute carries more than one
        value, except for the entity attribute of a membership relation.
    """
    if 'prefix' in jc:
        for prefix, uri in jc['prefix'].items():
            if prefix == 'default':
                bundle.set_default_namespace(uri)
            else:
                bundle.add_namespace(Namespace(prefix, uri))
        del jc['prefix']

    for rec_type_str in jc:
        rec_type = PROV_RECORD_IDS_MAP[rec_type_str]
        for rec_id, content in jc[rec_type_str].items():
            # A dict is a single record; otherwise expect a list of dicts.
            elements = [content] if hasattr(content, 'items') else content

            for element in elements:
                attributes = {}
                other_attributes = []
                # this is for the multiple-entity membership hack to come
                membership_extra_members = None

                for attr_name, values in element.items():
                    if attr_name in PROV_ATTRIBUTES_ID_MAP:
                        attr = PROV_ATTRIBUTES_ID_MAP[attr_name]
                    else:
                        attr = valid_qualified_name(bundle, attr_name)

                    if attr not in PROV_ATTRIBUTES:
                        # Non-PROV attribute: keep every value it has.
                        if isinstance(values, list):
                            raw_values = values
                        else:
                            raw_values = [values]
                        other_attributes.extend(
                            (attr, decode_json_representation(raw, bundle))
                            for raw in raw_values
                        )
                        continue

                    # PROV attribute: only one value is allowed ...
                    if not isinstance(values, list):
                        value = values
                    elif len(values) <= 1:
                        value = values[0]
                    elif (rec_type == PROV_MEMBERSHIP
                            and attr == PROV_ATTR_ENTITY):
                        # ... unless this is a membership relation with
                        # multiple entities.
                        # HACK: create multiple membership relations, one
                        # for each entity. The first entity goes into the
                        # normal record; the others are stored for later.
                        membership_extra_members = values[1:]
                        value = values[0]
                    else:
                        error_msg = (
                            'The prov package does not support PROV'
                            ' attributes having multiple values.'
                        )
                        logger.error(error_msg)
                        raise ProvJSONException(error_msg)

                    if attr in PROV_ATTRIBUTE_QNAMES:
                        attributes[attr] = valid_qualified_name(bundle, value)
                    else:
                        attributes[attr] = parse_xsd_datetime(value)

                bundle.new_record(
                    rec_type, rec_id, attributes, other_attributes
                )

                # HACK: creating extra (unidentified) membership relations
                if membership_extra_members:
                    collection = attributes[PROV_ATTR_COLLECTION]
                    for member in membership_extra_members:
                        bundle.membership(
                            collection, valid_qualified_name(bundle, member)
                        )
308
309
def encode_json_representation(value):
    """Return the PROV-JSON form of a single attribute value.

    :param value: The attribute value to encode.
    :return: A ``{'$': ..., 'type': ...}`` (or ``'lang'``) dictionary for
        literals, datetimes, qualified names, identifiers and the Python
        types listed in ``LITERAL_XSDTYPE_MAP``; any other value is
        returned unchanged.
    """
    if isinstance(value, Literal):
        return literal_json_representation(value)
    if isinstance(value, datetime.datetime):
        return {'$': value.isoformat(), 'type': 'xsd:dateTime'}
    if isinstance(value, QualifiedName):
        # TODO Manage prefix in the whole structure consistently
        # TODO QName export
        return {'$': str(value), 'type': PROV_QUALIFIEDNAME._str}
    if isinstance(value, Identifier):
        return {'$': value.uri, 'type': 'xsd:anyURI'}

    # The lookup is on the exact type, not via isinstance(), so subclasses
    # of the mapped types (e.g. bool for int) are not given an xsd type.
    xsd_type = LITERAL_XSDTYPE_MAP.get(type(value))
    if type(value) in LITERAL_XSDTYPE_MAP:
        return {'$': value, 'type': xsd_type}
    return value
325
326
def decode_json_representation(literal, bundle):
    """Convert one PROV-JSON attribute value back into a Python object.

    :param literal: The decoded JSON value; either a simple value or a
        dictionary with a ``'$'`` entry and optional ``'type'``/``'lang'``.
    :param bundle: The bundle used to resolve qualified names.
    :return: ``literal`` itself for simple values; otherwise an
        :class:`Identifier`, a qualified name or a :class:`Literal`
        depending on the declared datatype.
    """
    if not isinstance(literal, dict):
        # simple type, just return it
        return literal

    # complex type
    value = literal['$']
    datatype = valid_qualified_name(bundle, literal.get('type'))
    langtag = literal.get('lang')

    if datatype == XSD_ANYURI:
        return Identifier(value)
    if datatype == PROV_QUALIFIEDNAME:
        return valid_qualified_name(bundle, value)
    # The literal of standard Python types is not converted here
    # It will be automatically converted when added to a record by
    # _auto_literal_conversion()
    return Literal(value, datatype, langtag)
346
347
def literal_json_representation(literal):
    """Return the PROV-JSON dictionary for a literal.

    :param literal: Object exposing ``value``, ``datatype`` and ``langtag``.
    :return: ``{'$': value, 'lang': langtag}`` when a language tag is set,
        otherwise ``{'$': value, 'type': <text form of the datatype>}``.
    """
    # TODO: QName export
    encoded = {'$': literal.value}
    datatype = literal.datatype
    langtag = literal.langtag
    if langtag:
        # A language tag takes precedence; the datatype is not emitted.
        encoded['lang'] = langtag
    else:
        encoded['type'] = six.text_type(datatype)
    return encoded