Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/provenance_profile.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
import copy import datetime import logging import urllib import uuid from io import BytesIO from pathlib import PurePath, PurePosixPath from socket import getfqdn from typing import List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast from prov.identifier import Identifier from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity from schema_salad.sourceline import SourceLine from typing_extensions import TYPE_CHECKING from .errors import WorkflowException from .job import CommandLineJob, JobBase from .loghandler import _logger from .process import Process, shortname from .provenance_constants import ( ACCOUNT_UUID, CWLPROV, ENCODING, FOAF, METADATA, ORE, PROVENANCE, RO, SCHEMA, SHA1, SHA256, TEXT_PLAIN, UUID, WF4EVER, WFDESC, WFPROV, ) from .stdfsaccess import StdFsAccess from .utils import ( CWLObjectType, CWLOutputType, JobsType, get_listing, posix_path, versionstring, ) from .workflow_job import WorkflowJob if TYPE_CHECKING: from .provenance import ResearchObject def copy_job_order( job: Union[Process, JobsType], job_order_object: CWLObjectType ) -> CWLObjectType: """Create copy of job object for provenance.""" if not isinstance(job, WorkflowJob): # direct command line tool execution return job_order_object customised_job = {} # type: CWLObjectType # new job object for RO for each, i in enumerate(job.tool["inputs"]): with SourceLine( job.tool["inputs"], each, WorkflowException, _logger.isEnabledFor(logging.DEBUG), ): iid = shortname(i["id"]) if iid in job_order_object: customised_job[iid] = copy.deepcopy(job_order_object[iid]) # add the input element in dictionary for provenance elif "default" in i: customised_job[iid] = copy.deepcopy(i["default"]) # add the default elements in the dictionary for provenance else: pass return customised_job class ProvenanceProfile: """ Provenance profile. Populated as the workflow runs. """ def __init__( self, research_object: "ResearchObject", full_name: str, host_provenance: bool, user_provenance: bool, orcid: str, fsaccess: StdFsAccess, run_uuid: Optional[uuid.UUID] = None, ) -> None: """Initialize the provenance profile.""" self.fsaccess = fsaccess self.orcid = orcid self.research_object = research_object self.folder = self.research_object.folder self.document = ProvDocument() self.host_provenance = host_provenance self.user_provenance = user_provenance self.engine_uuid = research_object.engine_uuid # type: str self.add_to_manifest = self.research_object.add_to_manifest if self.orcid: _logger.debug("[provenance] Creator ORCID: %s", self.orcid) self.full_name = full_name if self.full_name: _logger.debug("[provenance] Creator Full name: %s", self.full_name) self.workflow_run_uuid = run_uuid or uuid.uuid4() self.workflow_run_uri = self.workflow_run_uuid.urn # type: str self.generate_prov_doc() def __str__(self) -> str: """Represent this Provenvance profile as a string.""" return "ProvenanceProfile <{}> in <{}>".format( self.workflow_run_uri, self.research_object, ) def generate_prov_doc(self) -> Tuple[str, ProvDocument]: """Add basic namespaces.""" def host_provenance(document: ProvDocument) -> None: """Record host provenance.""" document.add_namespace(CWLPROV) document.add_namespace(UUID) document.add_namespace(FOAF) hostname = getfqdn() # won't have a foaf:accountServiceHomepage for unix hosts, but # we can at least provide hostname document.agent( ACCOUNT_UUID, { PROV_TYPE: FOAF["OnlineAccount"], "prov:location": hostname, CWLPROV["hostname"]: hostname, }, ) self.cwltool_version = "cwltool %s" % versionstring().split()[-1] self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#") # document.add_namespace('prov', 'http://www.w3.org/ns/prov#') self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#") # TODO: Make this ontology. For now only has cwlprov:image self.document.add_namespace("cwlprov", "https://w3id.org/cwl/prov#") self.document.add_namespace("foaf", "http://xmlns.com/foaf/0.1/") self.document.add_namespace("schema", "http://schema.org/") self.document.add_namespace("orcid", "https://orcid.org/") self.document.add_namespace("id", "urn:uuid:") # NOTE: Internet draft expired 2004-03-04 (!) # https://tools.ietf.org/html/draft-thiemann-hash-urn-01 # TODO: Change to nih:sha-256; hashes # https://tools.ietf.org/html/rfc6920#section-7 self.document.add_namespace("data", "urn:hash::sha1:") # Also needed for docker images self.document.add_namespace(SHA256, "nih:sha-256;") # info only, won't really be used by prov as sub-resources use / self.document.add_namespace("researchobject", self.research_object.base_uri) # annotations self.metadata_ns = self.document.add_namespace( "metadata", self.research_object.base_uri + METADATA + "/" ) # Pre-register provenance directory so we can refer to its files self.provenance_ns = self.document.add_namespace( "provenance", self.research_object.base_uri + posix_path(PROVENANCE) + "/" ) ro_identifier_workflow = self.research_object.base_uri + "workflow/packed.cwl#" self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow) ro_identifier_input = ( self.research_object.base_uri + "workflow/primary-job.json#" ) self.document.add_namespace("input", ro_identifier_input) # More info about the account (e.g. username, fullname) # may or may not have been previously logged by user_provenance() # .. but we always know cwltool was launched (directly or indirectly) # by a user account, as cwltool is a command line tool account = self.document.agent(ACCOUNT_UUID) if self.orcid or self.full_name: person = {PROV_TYPE: PROV["Person"], "prov:type": SCHEMA["Person"]} if self.full_name: person["prov:label"] = self.full_name person["foaf:name"] = self.full_name person["schema:name"] = self.full_name else: # TODO: Look up name from ORCID API? pass agent = self.document.agent(self.orcid or uuid.uuid4().urn, person) self.document.actedOnBehalfOf(account, agent) else: if self.host_provenance: host_provenance(self.document) if self.user_provenance: self.research_object.user_provenance(self.document) # The execution of cwltool wfengine = self.document.agent( self.engine_uuid, { PROV_TYPE: PROV["SoftwareAgent"], "prov:type": WFPROV["WorkflowEngine"], "prov:label": self.cwltool_version, }, ) # FIXME: This datetime will be a bit too delayed, we should # capture when cwltool.py earliest started? self.document.wasStartedBy(wfengine, None, account, datetime.datetime.now()) # define workflow run level activity self.document.activity( self.workflow_run_uri, datetime.datetime.now(), None, { PROV_TYPE: WFPROV["WorkflowRun"], "prov:label": "Run of workflow/packed.cwl#main", }, ) # association between SoftwareAgent and WorkflowRun main_workflow = "wf:main" self.document.wasAssociatedWith( self.workflow_run_uri, self.engine_uuid, main_workflow ) self.document.wasStartedBy( self.workflow_run_uri, None, self.engine_uuid, datetime.datetime.now() ) return (self.workflow_run_uri, self.document) def evaluate( self, process: Process, job: JobsType, job_order_object: CWLObjectType, research_obj: "ResearchObject", ) -> None: """Evaluate the nature of job.""" if not hasattr(process, "steps"): # record provenance of independent commandline tool executions self.prospective_prov(job) customised_job = copy_job_order(job, job_order_object) self.used_artefacts(customised_job, self.workflow_run_uri) research_obj.create_job(customised_job) elif hasattr(job, "workflow"): # record provenance of workflow executions self.prospective_prov(job) customised_job = copy_job_order(job, job_order_object) self.used_artefacts(customised_job, self.workflow_run_uri) def record_process_start( self, process: Process, job: JobsType, process_run_id: Optional[str] = None ) -> Optional[str]: if not hasattr(process, "steps"): process_run_id = self.workflow_run_uri elif not hasattr(job, "workflow"): # commandline tool execution as part of workflow name = "" if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): name = job.name process_name = urllib.parse.quote(name, safe=":/,#") process_run_id = self.start_process(process_name, datetime.datetime.now()) return process_run_id def start_process( self, process_name: str, when: datetime.datetime, process_run_id: Optional[str] = None, ) -> str: """Record the start of each Process.""" if process_run_id is None: process_run_id = uuid.uuid4().urn prov_label = "Run of workflow/packed.cwl#main/" + process_name self.document.activity( process_run_id, None, None, {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label}, ) self.document.wasAssociatedWith( process_run_id, self.engine_uuid, str("wf:main/" + process_name) ) self.document.wasStartedBy( process_run_id, None, self.workflow_run_uri, when, None, None ) return process_run_id def record_process_end( self, process_name: str, process_run_id: str, outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None], when: datetime.datetime, ) -> None: self.generate_output_prov(outputs, process_run_id, process_name) self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]: if value["class"] != "File": raise ValueError("Must have class:File: %s" % value) # Need to determine file hash aka RO filename entity = None # type: Optional[ProvEntity] checksum = None if "checksum" in value: csum = cast(str, value["checksum"]) (method, checksum) = csum.split("$", 1) if method == SHA1 and self.research_object.has_data_file(checksum): entity = self.document.entity("data:" + checksum) if not entity and "location" in value: location = str(value["location"]) # If we made it here, we'll have to add it to the RO with self.fsaccess.open(location, "rb") as fhandle: relative_path = self.research_object.add_data_file(fhandle) # FIXME: This naively relies on add_data_file setting hash as filename checksum = PurePath(relative_path).name entity = self.document.entity( "data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]} ) if "checksum" not in value: value["checksum"] = f"{SHA1}${checksum}" if not entity and "contents" in value: # Anonymous file, add content as string entity, checksum = self.declare_string(cast(str, value["contents"])) # By here one of them should have worked! if not entity or not checksum: raise ValueError( "class:File but missing checksum/location/content: %r" % value ) # Track filename and extension, this is generally useful only for # secondaryFiles. Note that multiple uses of a file might thus record # different names for the same entity, so we'll # make/track a specialized entity by UUID file_id = value.setdefault("@id", uuid.uuid4().urn) # A specialized entity that has just these names file_entity = self.document.entity( file_id, [(PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, WF4EVER["File"])], ) # type: ProvEntity if "basename" in value: file_entity.add_attributes({CWLPROV["basename"]: value["basename"]}) if "nameroot" in value: file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]}) if "nameext" in value: file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]}) self.document.specializationOf(file_entity, entity) # Check for secondaries for sec in cast( MutableSequence[CWLObjectType], value.get("secondaryFiles", []) ): # TODO: Record these in a specializationOf entity with UUID? if sec["class"] == "File": (sec_entity, _, _) = self.declare_file(sec) elif sec["class"] == "Directory": sec_entity = self.declare_directory(sec) else: raise ValueError(f"Got unexpected secondaryFiles value: {sec}") # We don't know how/when/where the secondary file was generated, # but CWL convention is a kind of summary/index derived # from the original file. As its generally in a different format # then prov:Quotation is not appropriate. self.document.derivation( sec_entity, file_entity, other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]}, ) return file_entity, entity, checksum def declare_directory(self, value: CWLObjectType) -> ProvEntity: """Register any nested files/directories.""" # FIXME: Calculate a hash-like identifier for directory # so we get same value if it's the same filenames/hashes # in a different location. # For now, mint a new UUID to identify this directory, but # attempt to keep it inside the value dictionary dir_id = cast(str, value.setdefault("@id", uuid.uuid4().urn)) # New annotation file to keep the ORE Folder listing ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl" dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn]) coll = self.document.entity( dir_id, [ (PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, PROV["Collection"]), (PROV_TYPE, PROV["Dictionary"]), (PROV_TYPE, RO["Folder"]), ], ) # ORE description of ro:Folder, saved separately coll_b = dir_bundle.entity( dir_id, [(PROV_TYPE, RO["Folder"]), (PROV_TYPE, ORE["Aggregation"])], ) self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier) # dir_manifest = dir_bundle.entity( # dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"], # ORE["describes"]: coll_b.identifier}) coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)] coll_b_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] # FIXME: .listing might not be populated yet - hopefully # a later call to this method will sort that is_empty = True if "listing" not in value: get_listing(self.fsaccess, value) for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])): is_empty = False # Declare child-artifacts entity = self.declare_artefact(entry) self.document.membership(coll, entity) # Membership relation aka our ORE Proxy m_id = uuid.uuid4().urn m_entity = self.document.entity(m_id) m_b = dir_bundle.entity(m_id) # PROV-O style Dictionary # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition # ..as prov.py do not currently allow PROV-N extensions # like hadDictionaryMember(..) m_entity.add_asserted_type(PROV["KeyEntityPair"]) m_entity.add_attributes( { PROV["pairKey"]: entry["basename"], PROV["pairEntity"]: entity, } ) # As well as a being a # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry m_b.add_asserted_type(RO["FolderEntry"]) m_b.add_asserted_type(ORE["Proxy"]) m_b.add_attributes( { RO["entryName"]: entry["basename"], ORE["proxyIn"]: coll, ORE["proxyFor"]: entity, } ) coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) coll_b_attribs.append((ORE["aggregates"], m_b)) coll.add_attributes(coll_attribs) coll_b.add_attributes(coll_b_attribs) # Also Save ORE Folder as annotation metadata ore_doc = ProvDocument() ore_doc.add_namespace(ORE) ore_doc.add_namespace(RO) ore_doc.add_namespace(UUID) ore_doc.add_bundle(dir_bundle) ore_doc = ore_doc.flattened() ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn)) with self.research_object.write_bag_file(ore_doc_path) as provenance_file: ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle") self.research_object.add_annotation( dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri ) if is_empty: # Empty directory coll.add_asserted_type(PROV["EmptyCollection"]) coll.add_asserted_type(PROV["EmptyDictionary"]) self.research_object.add_uri(coll.identifier.uri) return coll def declare_string(self, value: str) -> Tuple[ProvEntity, str]: """Save as string in UTF-8.""" byte_s = BytesIO(str(value).encode(ENCODING)) data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN) checksum = PurePosixPath(data_file).name # FIXME: Don't naively assume add_data_file uses hash in filename! data_id = "data:%s" % PurePosixPath(data_file).stem entity = self.document.entity( data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)} ) # type: ProvEntity return entity, checksum def declare_artefact(self, value: Optional[CWLOutputType]) -> ProvEntity: """Create data artefact entities for all file objects.""" if value is None: # FIXME: If this can happen in CWL, we'll # need a better way to represent this in PROV return self.document.entity(CWLPROV["None"], {PROV_LABEL: "None"}) if isinstance(value, (bool, int, float)): # Typically used in job documents for flags # FIXME: Make consistent hash URIs for these # that somehow include the type # (so "1" != 1 != "1.0" != true) entity = self.document.entity(uuid.uuid4().urn, {PROV_VALUE: value}) self.research_object.add_uri(entity.identifier.uri) return entity if isinstance(value, (str, str)): (entity, _) = self.declare_string(value) return entity if isinstance(value, bytes): # If we got here then we must be in Python 3 byte_s = BytesIO(value) data_file = self.research_object.add_data_file(byte_s) # FIXME: Don't naively assume add_data_file uses hash in filename! data_id = "data:%s" % PurePosixPath(data_file).stem return self.document.entity( data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}, ) if isinstance(value, MutableMapping): if "@id" in value: # Already processed this value, but it might not be in this PROV entities = self.document.get_record(value["@id"]) if entities: return entities[0] # else, unknown in PROV, re-add below as if it's fresh # Base case - we found a File we need to update if value.get("class") == "File": (entity, _, _) = self.declare_file(value) value["@id"] = entity.identifier.uri return entity if value.get("class") == "Directory": entity = self.declare_directory(value) value["@id"] = entity.identifier.uri return entity coll_id = value.setdefault("@id", uuid.uuid4().urn) # some other kind of dictionary? # TODO: also Save as JSON coll = self.document.entity( coll_id, [ (PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, PROV["Collection"]), (PROV_TYPE, PROV["Dictionary"]), ], ) if value.get("class"): _logger.warning("Unknown data class %s.", value["class"]) # FIXME: The class might be "http://example.com/somethingelse" coll.add_asserted_type(CWLPROV[value["class"]]) # Let's iterate and recurse coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] for (key, val) in value.items(): v_ent = self.declare_artefact(val) self.document.membership(coll, v_ent) m_entity = self.document.entity(uuid.uuid4().urn) # Note: only support PROV-O style dictionary # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition # as prov.py do not easily allow PROV-N extensions m_entity.add_asserted_type(PROV["KeyEntityPair"]) m_entity.add_attributes( {PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent} ) coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) coll.add_attributes(coll_attribs) self.research_object.add_uri(coll.identifier.uri) return coll # some other kind of Collection? # TODO: also save as JSON try: members = [] for each_input_obj in iter(value): # Recurse and register any nested objects e = self.declare_artefact(each_input_obj) members.append(e) # If we reached this, then we were allowed to iterate coll = self.document.entity( uuid.uuid4().urn, [ (PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, PROV["Collection"]), ], ) if not members: coll.add_asserted_type(PROV["EmptyCollection"]) else: for member in members: # FIXME: This won't preserve order, for that # we would need to use PROV.Dictionary # with numeric keys self.document.membership(coll, member) self.research_object.add_uri(coll.identifier.uri) # FIXME: list value does not support adding "@id" return coll except TypeError: _logger.warning("Unrecognized type %s of %r", type(value), value) # Let's just fall back to Python repr() entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)}) self.research_object.add_uri(entity.identifier.uri) return entity def used_artefacts( self, job_order: Union[CWLObjectType, List[CWLObjectType]], process_run_id: str, name: Optional[str] = None, ) -> None: """Add used() for each data artefact.""" if isinstance(job_order, list): for entry in job_order: self.used_artefacts(entry, process_run_id, name) else: # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows base = "main" if name is not None: base += "/" + name for key, value in job_order.items(): prov_role = self.wf_ns[f"{base}/{key}"] try: entity = self.declare_artefact(value) self.document.used( process_run_id, entity, datetime.datetime.now(), None, {"prov:role": prov_role}, ) except OSError: pass def generate_output_prov( self, final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None], process_run_id: Optional[str], name: Optional[str], ) -> None: """Call wasGeneratedBy() for each output,copy the files into the RO.""" if isinstance(final_output, MutableSequence): for entry in final_output: self.generate_output_prov(entry, process_run_id, name) elif final_output is not None: # Timestamp should be created at the earliest timestamp = datetime.datetime.now() # For each output, find/register the corresponding # entity (UUID) and document it as generated in # a role corresponding to the output for output, value in final_output.items(): entity = self.declare_artefact(value) if name is not None: name = urllib.parse.quote(str(name), safe=":/,#") # FIXME: Probably not "main" in nested workflows role = self.wf_ns[f"main/{name}/{output}"] else: role = self.wf_ns["main/%s" % output] if not process_run_id: process_run_id = self.workflow_run_uri self.document.wasGeneratedBy( entity, process_run_id, timestamp, None, {"prov:role": role} ) def prospective_prov(self, job: JobsType) -> None: """Create prospective prov recording as wfdesc prov:Plan.""" if not isinstance(job, WorkflowJob): # direct command line tool execution self.document.entity( "wf:main", { PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"], "prov:label": "Prospective provenance", }, ) return self.document.entity( "wf:main", { PROV_TYPE: WFDESC["Workflow"], "prov:type": PROV["Plan"], "prov:label": "Prospective provenance", }, ) for step in job.steps: stepnametemp = "wf:main/" + str(step.name)[5:] stepname = urllib.parse.quote(stepnametemp, safe=":/,#") provstep = self.document.entity( stepname, {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, ) self.document.entity( "wf:main", { "wfdesc:hasSubProcess": provstep, "prov:label": "Prospective provenance", }, ) # TODO: Declare roles/parameters as well def activity_has_provenance(self, activity, prov_ids): # type: (str, List[Identifier]) -> None """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files.""" # NOTE: The below will only work if the corresponding metadata/provenance arcp URI # is a pre-registered namespace in the PROV Document attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids] self.document.activity(activity, other_attributes=attribs) # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention # as prov:mentionOf() is only for entities, not activities uris = [i.uri for i in prov_ids] self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri) def finalize_prov_profile(self, name): # type: (Optional[str]) -> List[Identifier] """Transfer the provenance related files to the RO.""" # NOTE: Relative posix path if name is None: # main workflow, fixed filenames filename = "primary.cwlprov" else: # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_") # Note that the above could cause overlaps for similarly named # workflows, but that's OK as we'll also include run uuid # which also covers thhe case of this step being run in # multiple places or iterations filename = f"{wf_name}.{self.workflow_run_uuid}.cwlprov" basename = str(PurePosixPath(PROVENANCE) / filename) # TODO: Also support other profiles than CWLProv, e.g. ProvOne # list of prov identifiers of provenance files prov_ids = [] # https://www.w3.org/TR/prov-xml/ with self.research_object.write_bag_file(basename + ".xml") as provenance_file: self.document.serialize(provenance_file, format="xml", indent=4) prov_ids.append(self.provenance_ns[filename + ".xml"]) # https://www.w3.org/TR/prov-n/ with self.research_object.write_bag_file( basename + ".provn" ) as provenance_file: self.document.serialize(provenance_file, format="provn", indent=2) prov_ids.append(self.provenance_ns[filename + ".provn"]) # https://www.w3.org/Submission/prov-json/ with self.research_object.write_bag_file(basename + ".json") as provenance_file: self.document.serialize(provenance_file, format="json", indent=2) prov_ids.append(self.provenance_ns[filename + ".json"]) # "rdf" aka https://www.w3.org/TR/prov-o/ # which can be serialized to ttl/nt/jsonld (and more!) # https://www.w3.org/TR/turtle/ with self.research_object.write_bag_file(basename + ".ttl") as provenance_file: self.document.serialize(provenance_file, format="rdf", rdf_format="turtle") prov_ids.append(self.provenance_ns[filename + ".ttl"]) # https://www.w3.org/TR/n-triples/ with self.research_object.write_bag_file(basename + ".nt") as provenance_file: self.document.serialize( provenance_file, format="rdf", rdf_format="ntriples" ) prov_ids.append(self.provenance_ns[filename + ".nt"]) # https://www.w3.org/TR/json-ld/ # TODO: Use a nice JSON-LD context # see also https://eprints.soton.ac.uk/395985/ # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :( with self.research_object.write_bag_file( basename + ".jsonld" ) as provenance_file: self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld") prov_ids.append(self.provenance_ns[filename + ".jsonld"]) _logger.debug("[provenance] added provenance: %s", prov_ids) return prov_ids