Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/prov/serializers/provrdf.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/prov/serializers/provrdf.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,593 @@ +"""PROV-RDF serializers for ProvDocument +""" +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +import base64 +from collections import OrderedDict +import datetime +import io + +import dateutil.parser +import six + +from rdflib.term import URIRef, BNode +from rdflib.term import Literal as RDFLiteral +from rdflib.graph import ConjunctiveGraph +from rdflib.namespace import RDF, RDFS, XSD + +import prov.model as pm +from prov.constants import ( + PROV, PROV_ID_ATTRIBUTES_MAP, PROV_N_MAP, PROV_BASE_CLS, XSD_QNAME, + PROV_END, PROV_START, PROV_USAGE, PROV_GENERATION, PROV_DERIVATION, PROV_INVALIDATION, + PROV_ALTERNATE, PROV_MENTION, PROV_DELEGATION, PROV_ACTIVITY, PROV_ATTR_STARTTIME, + PROV_ATTR_ENDTIME, PROV_LOCATION, PROV_ATTR_TIME, PROV_ROLE, PROV_COMMUNICATION, + PROV_ATTR_INFORMANT, PROV_ATTR_RESPONSIBLE, PROV_ATTR_TRIGGER, PROV_ATTR_ENDER, + PROV_ATTR_STARTER, PROV_ATTR_USED_ENTITY) +from prov.serializers import Serializer, Error + + +__author__ = 'Satrajit S. Ghosh' +__email__ = 'satra@mit.edu' + + +class ProvRDFException(Error): + pass + + +class AnonymousIDGenerator: + def __init__(self): + self._cache = {} + self._count = 0 + + def get_anon_id(self, obj, local_prefix="id"): + if obj not in self._cache: + self._count += 1 + self._cache[obj] = pm.Identifier( + '_:%s%d' % (local_prefix, self._count) + ).uri + return self._cache[obj] + + +# Reverse map for prov.model.XSD_DATATYPE_PARSERS +LITERAL_XSDTYPE_MAP = { + float: XSD['double'], + int: XSD['int'], + six.text_type: XSD['string'], + # boolean, string values are supported natively by PROV-RDF + # datetime values are converted separately +} + +# Add long on Python 2 +if six.integer_types[-1] not in LITERAL_XSDTYPE_MAP: + LITERAL_XSDTYPE_MAP[six.integer_types[-1]] = XSD['long'] + + +def attr2rdf(attr): + return URIRef(PROV[PROV_ID_ATTRIBUTES_MAP[attr].split('prov:')[1]].uri) + + +def valid_qualified_name(bundle, value, xsd_qname=False): + if value is None: + return None + qualified_name = bundle.valid_qualified_name(value) + return qualified_name if not xsd_qname else XSD_QNAME(qualified_name) + + +class ProvRDFSerializer(Serializer): + """ + PROV-O serializer for :class:`~prov.model.ProvDocument` + """ + + def serialize(self, stream=None, rdf_format='trig', **kwargs): + """ + Serializes a :class:`~prov.model.ProvDocument` instance to + `PROV-O <https://www.w3.org/TR/prov-o/>`_. + + :param stream: Where to save the output. + :param rdf_format: The RDF format of the output, default to TRiG. + """ + container = self.encode_document(self.document) + newargs = kwargs.copy() + newargs['format'] = rdf_format + + if six.PY2: + buf = io.BytesIO() + try: + container.serialize(buf, **newargs) + buf.seek(0, 0) + # Right now this is a bytestream. If the object to stream to is + # a text object is must be decoded. We assume utf-8 here which + # should be fine for almost every case. + if isinstance(stream, io.TextIOBase): + stream.write(buf.read().decode('utf-8')) + else: + stream.write(buf.read()) + finally: + buf.close() + else: + buf = io.BytesIO() + try: + container.serialize(buf, **newargs) + buf.seek(0, 0) + # Right now this is a bytestream. If the object to stream to is + # a text object is must be decoded. We assume utf-8 here which + # should be fine for almost every case. + if isinstance(stream, io.TextIOBase): + stream.write(buf.read().decode('utf-8')) + else: + stream.write(buf.read()) + finally: + buf.close() + + def deserialize(self, stream, rdf_format='trig', **kwargs): + """ + Deserialize from the `PROV-O <https://www.w3.org/TR/prov-o/>`_ + representation to a :class:`~prov.model.ProvDocument` instance. + + :param stream: Input data. + :param rdf_format: The RDF format of the input data, default: TRiG. + """ + newargs = kwargs.copy() + newargs['format'] = rdf_format + container = ConjunctiveGraph() + container.parse(stream, **newargs) + document = pm.ProvDocument() + self.document = document + self.decode_document(container, document) + return document + + def valid_identifier(self, value): + return self.document.valid_qualified_name(value) + + def encode_rdf_representation(self, value): + if isinstance(value, URIRef): + return value + elif isinstance(value, pm.Literal): + return literal_rdf_representation(value) + elif isinstance(value, datetime.datetime): + return RDFLiteral(value.isoformat(), datatype=XSD['dateTime']) + elif isinstance(value, pm.QualifiedName): + return URIRef(value.uri) + elif isinstance(value, pm.Identifier): + return RDFLiteral(value.uri, datatype=XSD['anyURI']) + elif type(value) in LITERAL_XSDTYPE_MAP: + return RDFLiteral(value, datatype=LITERAL_XSDTYPE_MAP[type(value)]) + else: + return RDFLiteral(value) + + def decode_rdf_representation(self, literal, graph): + if isinstance(literal, RDFLiteral): + value = literal.value if literal.value is not None else literal + datatype = literal.datatype if hasattr(literal, 'datatype') else None + langtag = literal.language if hasattr(literal, 'language') else None + if datatype and 'XMLLiteral' in datatype: + value = literal + if datatype and 'base64Binary' in datatype: + value = base64.standard_b64encode(value) + if datatype == XSD['QName']: + return pm.Literal(literal, datatype=XSD_QNAME) + if datatype == XSD['dateTime']: + return dateutil.parser.parse(literal) + if datatype == XSD['gYear']: + return pm.Literal(dateutil.parser.parse(literal).year, + datatype=self.valid_identifier(datatype)) + if datatype == XSD['gYearMonth']: + parsed_info = dateutil.parser.parse(literal) + return pm.Literal('{0}-{1:02d}'.format(parsed_info.year, parsed_info.month), + datatype=self.valid_identifier(datatype)) + else: + # The literal of standard Python types is not converted here + # It will be automatically converted when added to a record by + # _auto_literal_conversion() + return pm.Literal(value, self.valid_identifier(datatype), langtag) + elif isinstance(literal, URIRef): + rval = self.valid_identifier(literal) + if rval is None: + prefix, iri, _ = graph.namespace_manager.compute_qname(literal) + ns = self.document.add_namespace(prefix, iri) + rval = pm.QualifiedName(ns, literal.replace(ns.uri, '')) + return rval + else: + # simple type, just return it + return literal + + def encode_document(self, document): + container = self.encode_container(document) + for item in document.bundles: + # encoding the sub-bundle + bundle = self.encode_container(item, identifier=item.identifier.uri) + container.addN(bundle.quads()) + return container + + def encode_container(self, bundle, container=None, identifier=None): + if container is None: + container = ConjunctiveGraph(identifier=identifier) + nm = container.namespace_manager + nm.bind('prov', PROV.uri) + + for namespace in bundle.namespaces: + container.bind(namespace.prefix, namespace.uri) + + id_generator = AnonymousIDGenerator() + real_or_anon_id = lambda record: record._identifier.uri if \ + record._identifier else id_generator.get_anon_id(record) + + for record in bundle._records: + rec_type = record.get_type() + if hasattr(record, 'identifier') and record.identifier: + identifier = URIRef(six.text_type(real_or_anon_id(record))) + container.add((identifier, RDF.type, URIRef(rec_type.uri))) + else: + identifier = None + if record.attributes: + bnode = None + formal_objects = [] + used_objects = [] + all_attributes = list(record.formal_attributes) + list(record.attributes) + formal_qualifiers = False + for attrid, (attr, value) in enumerate(list(record.formal_attributes)): + if (identifier is not None and value is not None) or \ + (identifier is None and value is not None and attrid > 1): + formal_qualifiers = True + has_qualifiers = len(record.extra_attributes) > 0 or formal_qualifiers + for idx, (attr, value) in enumerate(all_attributes): + if record.is_relation(): + pred = URIRef(PROV[PROV_N_MAP[rec_type]].uri) + # create bnode relation + if bnode is None: + valid_formal_indices = set() + for idx, (key, val) in enumerate(record.formal_attributes): + formal_objects.append(key) + if val: + valid_formal_indices.add(idx) + used_objects = [record.formal_attributes[0][0]] + subj = None + if record.formal_attributes[0][1]: + subj = URIRef(record.formal_attributes[0][1].uri) + if identifier is None and subj is not None: + try: + obj_val = record.formal_attributes[1][1] + obj_attr = URIRef(record.formal_attributes[1][0].uri) + # TODO: Why is obj_attr above not used anywhere? + except IndexError: + obj_val = None + if obj_val and (rec_type not in {PROV_END, + PROV_START, + PROV_USAGE, + PROV_GENERATION, + PROV_DERIVATION, + PROV_INVALIDATION} or + (valid_formal_indices == {0, 1} and + len(record.extra_attributes) == 0)): + used_objects.append(record.formal_attributes[1][0]) + obj_val = self.encode_rdf_representation(obj_val) + if rec_type == PROV_ALTERNATE: + subj, obj_val = obj_val, subj + container.add((subj, pred, obj_val)) + if rec_type == PROV_MENTION: + if record.formal_attributes[2][1]: + used_objects.append(record.formal_attributes[2][0]) + obj_val = self.encode_rdf_representation(record.formal_attributes[2][1]) + container.add((subj, URIRef(PROV['asInBundle'].uri), obj_val)) + has_qualifiers = False + if rec_type in [PROV_ALTERNATE]: + continue + if subj and (has_qualifiers or identifier): + qualifier = rec_type._localpart + rec_uri = rec_type.uri + for attr_name, val in record.extra_attributes: + if attr_name == PROV['type']: + if PROV['Revision'] == val or \ + PROV['Quotation'] == val or \ + PROV['PrimarySource'] == val: + qualifier = val._localpart + rec_uri = val.uri + if identifier is not None: + container.remove((identifier, + RDF.type, + URIRef(rec_type.uri))) + QRole = URIRef(PROV['qualified' + qualifier].uri) + if identifier is not None: + container.add((subj, QRole, identifier)) + else: + bnode = identifier = BNode() + container.add((subj, QRole, identifier)) + container.add( + (identifier, RDF.type, URIRef(rec_uri)) + ) # reset identifier to BNode + if value is not None and attr not in used_objects: + if attr in formal_objects: + pred = attr2rdf(attr) + elif attr == PROV['role']: + pred = URIRef(PROV['hadRole'].uri) + elif attr == PROV['plan']: + pred = URIRef(PROV['hadPlan'].uri) + elif attr == PROV['type']: + pred = RDF.type + elif attr == PROV['label']: + pred = RDFS.label + elif isinstance(attr, pm.QualifiedName): + pred = URIRef(attr.uri) + else: + pred = self.encode_rdf_representation(attr) + if PROV['plan'].uri in pred: + pred = URIRef(PROV['hadPlan'].uri) + if PROV['informant'].uri in pred: + pred = URIRef(PROV['activity'].uri) + if PROV['responsible'].uri in pred: + pred = URIRef(PROV['agent'].uri) + if rec_type == PROV_DELEGATION and PROV['activity'].uri in pred: + pred = URIRef(PROV['hadActivity'].uri) + if (rec_type in [PROV_END, PROV_START] and PROV['trigger'].uri in pred) or\ + (rec_type in [PROV_USAGE] and PROV['used'].uri in pred): + pred = URIRef(PROV['entity'].uri) + if rec_type in [PROV_GENERATION, PROV_END, + PROV_START, PROV_USAGE, + PROV_INVALIDATION]: + if PROV['time'].uri in pred: + pred = URIRef(PROV['atTime'].uri) + if PROV['ender'].uri in pred: + pred = URIRef(PROV['hadActivity'].uri) + if PROV['starter'].uri in pred: + pred = URIRef(PROV['hadActivity'].uri) + if PROV['location'].uri in pred: + pred = URIRef(PROV['atLocation'].uri) + if rec_type in [PROV_ACTIVITY]: + if PROV_ATTR_STARTTIME in pred: + pred = URIRef(PROV['startedAtTime'].uri) + if PROV_ATTR_ENDTIME in pred: + pred = URIRef(PROV['endedAtTime'].uri) + if rec_type == PROV_DERIVATION: + if PROV['activity'].uri in pred: + pred = URIRef(PROV['hadActivity'].uri) + if PROV['generation'].uri in pred: + pred = URIRef(PROV['hadGeneration'].uri) + if PROV['usage'].uri in pred: + pred = URIRef(PROV['hadUsage'].uri) + if PROV['usedEntity'].uri in pred: + pred = URIRef(PROV['entity'].uri) + container.add((identifier, pred, + self.encode_rdf_representation(value))) + continue + if value is None: + continue + if isinstance(value, pm.ProvRecord): + obj = URIRef(six.text_type(real_or_anon_id(value))) + else: + # Assuming this is a datetime value + obj = self.encode_rdf_representation(value) + if attr == PROV['location']: + pred = URIRef(PROV['atLocation'].uri) + if False and isinstance(value, (URIRef, pm.QualifiedName)): + if isinstance(value, pm.QualifiedName): + value = URIRef(value.uri) + container.add((identifier, pred, value)) + else: + container.add((identifier, pred, + self.encode_rdf_representation(obj))) + continue + if attr == PROV['type']: + pred = RDF.type + elif attr == PROV['label']: + pred = RDFS.label + elif attr == PROV_ATTR_STARTTIME: + pred = URIRef(PROV['startedAtTime'].uri) + elif attr == PROV_ATTR_ENDTIME: + pred = URIRef(PROV['endedAtTime'].uri) + else: + pred = self.encode_rdf_representation(attr) + container.add((identifier, pred, obj)) + return container + + def decode_document(self, content, document): + for prefix, url in content.namespaces(): + document.add_namespace(prefix, six.text_type(url)) + if hasattr(content, 'contexts'): + for graph in content.contexts(): + if isinstance(graph.identifier, BNode): + self.decode_container(graph, document) + else: + bundle_id = six.text_type(graph.identifier) + bundle = document.bundle(bundle_id) + self.decode_container(graph, bundle) + else: + self.decode_container(content, document) + + def decode_container(self, graph, bundle): + ids = {} + PROV_CLS_MAP = {} + formal_attributes = {} + unique_sets = {} + for key, val in PROV_BASE_CLS.items(): + PROV_CLS_MAP[key.uri] = PROV_BASE_CLS[key] + relation_mapper = {URIRef(PROV['alternateOf'].uri): 'alternate', + URIRef(PROV['actedOnBehalfOf'].uri): 'delegation', + URIRef(PROV['specializationOf'].uri): 'specialization', + URIRef(PROV['mentionOf'].uri): 'mention', + URIRef(PROV['wasAssociatedWith'].uri): 'association', + URIRef(PROV['wasDerivedFrom'].uri): 'derivation', + URIRef(PROV['wasAttributedTo'].uri): 'attribution', + URIRef(PROV['wasInformedBy'].uri): 'communication', + URIRef(PROV['wasGeneratedBy'].uri): 'generation', + URIRef(PROV['wasInfluencedBy'].uri): 'influence', + URIRef(PROV['wasInvalidatedBy'].uri): 'invalidation', + URIRef(PROV['wasEndedBy'].uri): 'end', + URIRef(PROV['wasStartedBy'].uri): 'start', + URIRef(PROV['hadMember'].uri): 'membership', + URIRef(PROV['used'].uri): 'usage', + } + predicate_mapper = {RDFS.label: pm.PROV['label'], + URIRef(PROV['atLocation'].uri): PROV_LOCATION, + URIRef(PROV['startedAtTime'].uri): PROV_ATTR_STARTTIME, + URIRef(PROV['endedAtTime'].uri): PROV_ATTR_ENDTIME, + URIRef(PROV['atTime'].uri): PROV_ATTR_TIME, + URIRef(PROV['hadRole'].uri): PROV_ROLE, + URIRef(PROV['hadPlan'].uri): pm.PROV_ATTR_PLAN, + URIRef(PROV['hadUsage'].uri): pm.PROV_ATTR_USAGE, + URIRef(PROV['hadGeneration'].uri): pm.PROV_ATTR_GENERATION, + URIRef(PROV['hadActivity'].uri): pm.PROV_ATTR_ACTIVITY, + } + other_attributes = {} + for stmt in graph.triples((None, RDF.type, None)): + id = six.text_type(stmt[0]) + obj = six.text_type(stmt[2]) + if obj in PROV_CLS_MAP: + if not isinstance(stmt[0], BNode) and self.valid_identifier(id) is None: + prefix, iri, _ = graph.namespace_manager.compute_qname(id) + self.document.add_namespace(prefix, iri) + try: + prov_obj = PROV_CLS_MAP[obj] + except AttributeError: + prov_obj = None + add_attr = True + isderivation = pm.PROV['Revision'].uri in stmt[2] or \ + pm.PROV['Quotation'].uri in stmt[2] or \ + pm.PROV['PrimarySource'].uri in stmt[2] + if id not in ids and prov_obj and (prov_obj.uri == obj or + isderivation or + isinstance(stmt[0], BNode)): + ids[id] = prov_obj + klass = pm.PROV_REC_CLS[prov_obj] + formal_attributes[id] = OrderedDict([(key, None) for key in klass.FORMAL_ATTRIBUTES]) + unique_sets[id] = OrderedDict([(key, []) for key in klass.FORMAL_ATTRIBUTES]) + add_attr = False or ((isinstance(stmt[0], BNode) or isderivation) and prov_obj.uri != obj) + if add_attr: + if id not in other_attributes: + other_attributes[id] = [] + obj_formatted = self.decode_rdf_representation(stmt[2], graph) + other_attributes[id].append((pm.PROV['type'], obj_formatted)) + else: + if id not in other_attributes: + other_attributes[id] = [] + obj = self.decode_rdf_representation(stmt[2], graph) + other_attributes[id].append((pm.PROV['type'], obj)) + for id, pred, obj in graph: + id = six.text_type(id) + if id not in other_attributes: + other_attributes[id] = [] + if pred == RDF.type: + continue + if pred in relation_mapper: + if 'alternateOf' in pred: + getattr(bundle, relation_mapper[pred])(obj, id) + elif 'mentionOf' in pred: + mentionBundle = None + for stmt in graph.triples((URIRef(id), URIRef(pm.PROV['asInBundle'].uri), None)): + mentionBundle = stmt[2] + getattr(bundle, relation_mapper[pred])(id, six.text_type(obj), mentionBundle) + elif 'actedOnBehalfOf' in pred or 'wasAssociatedWith' in pred: + qualifier = 'qualified' + relation_mapper[pred].upper()[0] + relation_mapper[pred][1:] + qualifier_bnode = None + for stmt in graph.triples((URIRef(id), URIRef(pm.PROV[qualifier].uri), None)): + qualifier_bnode = stmt[2] + if qualifier_bnode is None: + getattr(bundle, relation_mapper[pred])(id, six.text_type(obj)) + else: + fakeys = list(formal_attributes[six.text_type(qualifier_bnode)].keys()) + formal_attributes[six.text_type(qualifier_bnode)][fakeys[0]] = id + formal_attributes[six.text_type(qualifier_bnode)][fakeys[1]] = six.text_type(obj) + else: + getattr(bundle, relation_mapper[pred])(id, six.text_type(obj)) + elif id in ids: + obj1 = self.decode_rdf_representation(obj, graph) + if obj is not None and obj1 is None: + raise ValueError(('Error transforming', obj)) + pred_new = pred + if pred in predicate_mapper: + pred_new = predicate_mapper[pred] + if ids[id] == PROV_COMMUNICATION and 'activity' in six.text_type(pred_new): + pred_new = PROV_ATTR_INFORMANT + if ids[id] == PROV_DELEGATION and 'agent' in six.text_type(pred_new): + pred_new = PROV_ATTR_RESPONSIBLE + if ids[id] in [PROV_END, PROV_START] and 'entity' in six.text_type(pred_new): + pred_new = PROV_ATTR_TRIGGER + if ids[id] in [PROV_END] and 'activity' in six.text_type(pred_new): + pred_new = PROV_ATTR_ENDER + if ids[id] in [PROV_START] and 'activity' in six.text_type(pred_new): + pred_new = PROV_ATTR_STARTER + if ids[id] == PROV_DERIVATION and 'entity' in six.text_type(pred_new): + pred_new = PROV_ATTR_USED_ENTITY + if six.text_type(pred_new) in [val.uri for val in formal_attributes[id]]: + qname_key = self.valid_identifier(pred_new) + formal_attributes[id][qname_key] = obj1 + unique_sets[id][qname_key].append(obj1) + if len(unique_sets[id][qname_key]) > 1: + formal_attributes[id][qname_key] = None + else: + if 'qualified' not in six.text_type(pred_new) and \ + 'asInBundle' not in six.text_type(pred_new): + other_attributes[id].append((six.text_type(pred_new), obj1)) + local_key = six.text_type(obj) + if local_key in ids: + if 'qualified' in pred: + formal_attributes[local_key][list(formal_attributes[local_key].keys())[0]] = id + for id in ids: + attrs = None + if id in other_attributes: + attrs = other_attributes[id] + items_to_walk = [] + for qname, values in unique_sets[id].items(): + if values and len(values) > 1: + items_to_walk.append((qname, values)) + if items_to_walk: + for subset in list(walk(items_to_walk)): + for key, value in subset.items(): + formal_attributes[id][key] = value + bundle.new_record(ids[id], id, formal_attributes[id], attrs) + else: + bundle.new_record(ids[id], id, formal_attributes[id], attrs) + ids[id] = None + if attrs is not None: + other_attributes[id] = [] + for key, val in other_attributes.items(): + if val: + ids[key].add_attributes(val) + + +def walk(children, level=0, path=None, usename=True): + """Generate all the full paths in a tree, as a dict. + + :Example: + + >>> from prov.serializers.provrdf import walk + >>> iterables = [('a', lambda: [1, 2]), ('b', lambda: [3, 4])] + >>> [val['a'] for val in walk(iterables)] + [1, 1, 2, 2] + >>> [val['b'] for val in walk(iterables)] + [3, 4, 3, 4] + """ + # Entry point + if level == 0: + path = {} + # Exit condition + if not children: + yield path.copy() + return + # Tree recursion + head, tail = children[0], children[1:] + name, func = head + for child in func: + # We can use the arg name or the tree level as a key + if usename: + path[name] = child + else: + path[level] = child + # Recurse into the next level + for child_paths in walk(tail, level + 1, path, usename): + yield child_paths + + +def literal_rdf_representation(literal): + value = six.text_type(literal.value) if literal.value else literal + if literal.langtag: + # a language tag can only go with prov:InternationalizedString + return RDFLiteral(value, lang=str(literal.langtag)) + else: + datatype = literal.datatype + if 'base64Binary' in datatype.uri: + if six.PY2: + value = base64.standard_b64encode(value) + else: + value = literal.value.encode() + return RDFLiteral(value, datatype=datatype.uri)