diff env/lib/python3.9/site-packages/cwltool/provenance_profile.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author   | shellac
date     | Mon, 22 Mar 2021 18:12:50 +0000
parents  |
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/provenance_profile.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,792 @@
+import copy
+import datetime
+import logging
+import urllib
+import uuid
+from io import BytesIO
+from pathlib import PurePath, PurePosixPath
+from socket import getfqdn
+from typing import List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast
+
+from prov.identifier import Identifier
+from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity
+from schema_salad.sourceline import SourceLine
+from typing_extensions import TYPE_CHECKING
+
+from .errors import WorkflowException
+from .job import CommandLineJob, JobBase
+from .loghandler import _logger
+from .process import Process, shortname
+from .provenance_constants import (
+    ACCOUNT_UUID,
+    CWLPROV,
+    ENCODING,
+    FOAF,
+    METADATA,
+    ORE,
+    PROVENANCE,
+    RO,
+    SCHEMA,
+    SHA1,
+    SHA256,
+    TEXT_PLAIN,
+    UUID,
+    WF4EVER,
+    WFDESC,
+    WFPROV,
+)
+from .stdfsaccess import StdFsAccess
+from .utils import (
+    CWLObjectType,
+    CWLOutputType,
+    JobsType,
+    get_listing,
+    posix_path,
+    versionstring,
+)
+from .workflow_job import WorkflowJob
+
+if TYPE_CHECKING:
+    from .provenance import ResearchObject
+
+
+def copy_job_order(
+    job: Union[Process, JobsType], job_order_object: CWLObjectType
+) -> CWLObjectType:
+    """Create copy of job object for provenance."""
+    if not isinstance(job, WorkflowJob):
+        # direct command line tool execution
+        return job_order_object
+    customised_job = {}  # type: CWLObjectType
+    # new job object for RO
+    for each, i in enumerate(job.tool["inputs"]):
+        with SourceLine(
+            job.tool["inputs"],
+            each,
+            WorkflowException,
+            _logger.isEnabledFor(logging.DEBUG),
+        ):
+            iid = shortname(i["id"])
+            if iid in job_order_object:
+                customised_job[iid] = copy.deepcopy(job_order_object[iid])
+                # add the input element in dictionary for provenance
+            elif "default" in i:
+                customised_job[iid] = copy.deepcopy(i["default"])
+                # add the default elements in the dictionary for provenance
+            else:
+                pass
+    return customised_job
+
+
+class ProvenanceProfile:
+    """
+    Provenance profile.
+
+    Populated as the workflow runs.
+ """ + + def __init__( + self, + research_object: "ResearchObject", + full_name: str, + host_provenance: bool, + user_provenance: bool, + orcid: str, + fsaccess: StdFsAccess, + run_uuid: Optional[uuid.UUID] = None, + ) -> None: + """Initialize the provenance profile.""" + self.fsaccess = fsaccess + self.orcid = orcid + self.research_object = research_object + self.folder = self.research_object.folder + self.document = ProvDocument() + self.host_provenance = host_provenance + self.user_provenance = user_provenance + self.engine_uuid = research_object.engine_uuid # type: str + self.add_to_manifest = self.research_object.add_to_manifest + if self.orcid: + _logger.debug("[provenance] Creator ORCID: %s", self.orcid) + self.full_name = full_name + if self.full_name: + _logger.debug("[provenance] Creator Full name: %s", self.full_name) + self.workflow_run_uuid = run_uuid or uuid.uuid4() + self.workflow_run_uri = self.workflow_run_uuid.urn # type: str + self.generate_prov_doc() + + def __str__(self) -> str: + """Represent this Provenvance profile as a string.""" + return "ProvenanceProfile <{}> in <{}>".format( + self.workflow_run_uri, + self.research_object, + ) + + def generate_prov_doc(self) -> Tuple[str, ProvDocument]: + """Add basic namespaces.""" + + def host_provenance(document: ProvDocument) -> None: + """Record host provenance.""" + document.add_namespace(CWLPROV) + document.add_namespace(UUID) + document.add_namespace(FOAF) + + hostname = getfqdn() + # won't have a foaf:accountServiceHomepage for unix hosts, but + # we can at least provide hostname + document.agent( + ACCOUNT_UUID, + { + PROV_TYPE: FOAF["OnlineAccount"], + "prov:location": hostname, + CWLPROV["hostname"]: hostname, + }, + ) + + self.cwltool_version = "cwltool %s" % versionstring().split()[-1] + self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#") + # document.add_namespace('prov', 'http://www.w3.org/ns/prov#') + self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#") + # TODO: Make this ontology. For now only has cwlprov:image + self.document.add_namespace("cwlprov", "https://w3id.org/cwl/prov#") + self.document.add_namespace("foaf", "http://xmlns.com/foaf/0.1/") + self.document.add_namespace("schema", "http://schema.org/") + self.document.add_namespace("orcid", "https://orcid.org/") + self.document.add_namespace("id", "urn:uuid:") + # NOTE: Internet draft expired 2004-03-04 (!) + # https://tools.ietf.org/html/draft-thiemann-hash-urn-01 + # TODO: Change to nih:sha-256; hashes + # https://tools.ietf.org/html/rfc6920#section-7 + self.document.add_namespace("data", "urn:hash::sha1:") + # Also needed for docker images + self.document.add_namespace(SHA256, "nih:sha-256;") + + # info only, won't really be used by prov as sub-resources use / + self.document.add_namespace("researchobject", self.research_object.base_uri) + # annotations + self.metadata_ns = self.document.add_namespace( + "metadata", self.research_object.base_uri + METADATA + "/" + ) + # Pre-register provenance directory so we can refer to its files + self.provenance_ns = self.document.add_namespace( + "provenance", self.research_object.base_uri + posix_path(PROVENANCE) + "/" + ) + ro_identifier_workflow = self.research_object.base_uri + "workflow/packed.cwl#" + self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow) + ro_identifier_input = ( + self.research_object.base_uri + "workflow/primary-job.json#" + ) + self.document.add_namespace("input", ro_identifier_input) + + # More info about the account (e.g. 
username, fullname) + # may or may not have been previously logged by user_provenance() + # .. but we always know cwltool was launched (directly or indirectly) + # by a user account, as cwltool is a command line tool + account = self.document.agent(ACCOUNT_UUID) + if self.orcid or self.full_name: + person = {PROV_TYPE: PROV["Person"], "prov:type": SCHEMA["Person"]} + if self.full_name: + person["prov:label"] = self.full_name + person["foaf:name"] = self.full_name + person["schema:name"] = self.full_name + else: + # TODO: Look up name from ORCID API? + pass + agent = self.document.agent(self.orcid or uuid.uuid4().urn, person) + self.document.actedOnBehalfOf(account, agent) + else: + if self.host_provenance: + host_provenance(self.document) + if self.user_provenance: + self.research_object.user_provenance(self.document) + # The execution of cwltool + wfengine = self.document.agent( + self.engine_uuid, + { + PROV_TYPE: PROV["SoftwareAgent"], + "prov:type": WFPROV["WorkflowEngine"], + "prov:label": self.cwltool_version, + }, + ) + # FIXME: This datetime will be a bit too delayed, we should + # capture when cwltool.py earliest started? + self.document.wasStartedBy(wfengine, None, account, datetime.datetime.now()) + # define workflow run level activity + self.document.activity( + self.workflow_run_uri, + datetime.datetime.now(), + None, + { + PROV_TYPE: WFPROV["WorkflowRun"], + "prov:label": "Run of workflow/packed.cwl#main", + }, + ) + # association between SoftwareAgent and WorkflowRun + main_workflow = "wf:main" + self.document.wasAssociatedWith( + self.workflow_run_uri, self.engine_uuid, main_workflow + ) + self.document.wasStartedBy( + self.workflow_run_uri, None, self.engine_uuid, datetime.datetime.now() + ) + return (self.workflow_run_uri, self.document) + + def evaluate( + self, + process: Process, + job: JobsType, + job_order_object: CWLObjectType, + research_obj: "ResearchObject", + ) -> None: + """Evaluate the nature of job.""" + if not hasattr(process, "steps"): + # record provenance of independent commandline tool executions + self.prospective_prov(job) + customised_job = copy_job_order(job, job_order_object) + self.used_artefacts(customised_job, self.workflow_run_uri) + research_obj.create_job(customised_job) + elif hasattr(job, "workflow"): + # record provenance of workflow executions + self.prospective_prov(job) + customised_job = copy_job_order(job, job_order_object) + self.used_artefacts(customised_job, self.workflow_run_uri) + + def record_process_start( + self, process: Process, job: JobsType, process_run_id: Optional[str] = None + ) -> Optional[str]: + if not hasattr(process, "steps"): + process_run_id = self.workflow_run_uri + elif not hasattr(job, "workflow"): + # commandline tool execution as part of workflow + name = "" + if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): + name = job.name + process_name = urllib.parse.quote(name, safe=":/,#") + process_run_id = self.start_process(process_name, datetime.datetime.now()) + return process_run_id + + def start_process( + self, + process_name: str, + when: datetime.datetime, + process_run_id: Optional[str] = None, + ) -> str: + """Record the start of each Process.""" + if process_run_id is None: + process_run_id = uuid.uuid4().urn + prov_label = "Run of workflow/packed.cwl#main/" + process_name + self.document.activity( + process_run_id, + None, + None, + {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label}, + ) + self.document.wasAssociatedWith( + process_run_id, self.engine_uuid, str("wf:main/" + process_name) 
+        )
+        self.document.wasStartedBy(
+            process_run_id, None, self.workflow_run_uri, when, None, None
+        )
+        return process_run_id
+
+    def record_process_end(
+        self,
+        process_name: str,
+        process_run_id: str,
+        outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
+        when: datetime.datetime,
+    ) -> None:
+        self.generate_output_prov(outputs, process_run_id, process_name)
+        self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
+
+    def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
+        if value["class"] != "File":
+            raise ValueError("Must have class:File: %s" % value)
+        # Need to determine file hash aka RO filename
+        entity = None  # type: Optional[ProvEntity]
+        checksum = None
+        if "checksum" in value:
+            csum = cast(str, value["checksum"])
+            (method, checksum) = csum.split("$", 1)
+            if method == SHA1 and self.research_object.has_data_file(checksum):
+                entity = self.document.entity("data:" + checksum)
+
+        if not entity and "location" in value:
+            location = str(value["location"])
+            # If we made it here, we'll have to add it to the RO
+            with self.fsaccess.open(location, "rb") as fhandle:
+                relative_path = self.research_object.add_data_file(fhandle)
+                # FIXME: This naively relies on add_data_file setting hash as filename
+                checksum = PurePath(relative_path).name
+                entity = self.document.entity(
+                    "data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]}
+                )
+                if "checksum" not in value:
+                    value["checksum"] = f"{SHA1}${checksum}"
+
+        if not entity and "contents" in value:
+            # Anonymous file, add content as string
+            entity, checksum = self.declare_string(cast(str, value["contents"]))
+
+        # By here one of them should have worked!
+        if not entity or not checksum:
+            raise ValueError(
+                "class:File but missing checksum/location/content: %r" % value
+            )
+
+        # Track filename and extension, this is generally useful only for
+        # secondaryFiles. Note that multiple uses of a file might thus record
+        # different names for the same entity, so we'll
+        # make/track a specialized entity by UUID
+        file_id = value.setdefault("@id", uuid.uuid4().urn)
+        # A specialized entity that has just these names
+        file_entity = self.document.entity(
+            file_id,
+            [(PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, WF4EVER["File"])],
+        )  # type: ProvEntity
+
+        if "basename" in value:
+            file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
+        if "nameroot" in value:
+            file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
+        if "nameext" in value:
+            file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
+        self.document.specializationOf(file_entity, entity)
+
+        # Check for secondaries
+        for sec in cast(
+            MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
+        ):
+            # TODO: Record these in a specializationOf entity with UUID?
+            if sec["class"] == "File":
+                (sec_entity, _, _) = self.declare_file(sec)
+            elif sec["class"] == "Directory":
+                sec_entity = self.declare_directory(sec)
+            else:
+                raise ValueError(f"Got unexpected secondaryFiles value: {sec}")
+            # We don't know how/when/where the secondary file was generated,
+            # but CWL convention is a kind of summary/index derived
+            # from the original file. As it's generally in a different format,
+            # prov:Quotation is not appropriate.
+            self.document.derivation(
+                sec_entity,
+                file_entity,
+                other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]},
+            )
+
+        return file_entity, entity, checksum
+
+    def declare_directory(self, value: CWLObjectType) -> ProvEntity:
+        """Register any nested files/directories."""
+        # FIXME: Calculate a hash-like identifier for directory
+        # so we get same value if it's the same filenames/hashes
+        # in a different location.
+        # For now, mint a new UUID to identify this directory, but
+        # attempt to keep it inside the value dictionary
+        dir_id = cast(str, value.setdefault("@id", uuid.uuid4().urn))
+
+        # New annotation file to keep the ORE Folder listing
+        ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
+        dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])
+
+        coll = self.document.entity(
+            dir_id,
+            [
+                (PROV_TYPE, WFPROV["Artifact"]),
+                (PROV_TYPE, PROV["Collection"]),
+                (PROV_TYPE, PROV["Dictionary"]),
+                (PROV_TYPE, RO["Folder"]),
+            ],
+        )
+        # ORE description of ro:Folder, saved separately
+        coll_b = dir_bundle.entity(
+            dir_id,
+            [(PROV_TYPE, RO["Folder"]), (PROV_TYPE, ORE["Aggregation"])],
+        )
+        self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)
+
+        # dir_manifest = dir_bundle.entity(
+        #     dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
+        #                             ORE["describes"]: coll_b.identifier})
+
+        coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
+        coll_b_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
+
+        # FIXME: .listing might not be populated yet - hopefully
+        # a later call to this method will sort that
+        is_empty = True
+
+        if "listing" not in value:
+            get_listing(self.fsaccess, value)
+        for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
+            is_empty = False
+            # Declare child-artifacts
+            entity = self.declare_artefact(entry)
+            self.document.membership(coll, entity)
+            # Membership relation aka our ORE Proxy
+            m_id = uuid.uuid4().urn
+            m_entity = self.document.entity(m_id)
+            m_b = dir_bundle.entity(m_id)
+
+            # PROV-O style Dictionary
+            # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
+            # ..as prov.py does not currently allow PROV-N extensions
+            # like hadDictionaryMember(..)
+            m_entity.add_asserted_type(PROV["KeyEntityPair"])
+
+            m_entity.add_attributes(
+                {
+                    PROV["pairKey"]: entry["basename"],
+                    PROV["pairEntity"]: entity,
+                }
+            )
+
+            # As well as being a
+            # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
+            m_b.add_asserted_type(RO["FolderEntry"])
+            m_b.add_asserted_type(ORE["Proxy"])
+            m_b.add_attributes(
+                {
+                    RO["entryName"]: entry["basename"],
+                    ORE["proxyIn"]: coll,
+                    ORE["proxyFor"]: entity,
+                }
+            )
+            coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
+            coll_b_attribs.append((ORE["aggregates"], m_b))
+
+        coll.add_attributes(coll_attribs)
+        coll_b.add_attributes(coll_b_attribs)
+
+        # Also Save ORE Folder as annotation metadata
+        ore_doc = ProvDocument()
+        ore_doc.add_namespace(ORE)
+        ore_doc.add_namespace(RO)
+        ore_doc.add_namespace(UUID)
+        ore_doc.add_bundle(dir_bundle)
+        ore_doc = ore_doc.flattened()
+        ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
+        with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
+            ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
+        self.research_object.add_annotation(
+            dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri
+        )
+
+        if is_empty:
+            # Empty directory
+            coll.add_asserted_type(PROV["EmptyCollection"])
+            coll.add_asserted_type(PROV["EmptyDictionary"])
+        self.research_object.add_uri(coll.identifier.uri)
+        return coll
+
+    def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
+        """Save as string in UTF-8."""
+        byte_s = BytesIO(str(value).encode(ENCODING))
+        data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN)
+        checksum = PurePosixPath(data_file).name
+        # FIXME: Don't naively assume add_data_file uses hash in filename!
+        data_id = "data:%s" % PurePosixPath(data_file).stem
+        entity = self.document.entity(
+            data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}
+        )  # type: ProvEntity
+        return entity, checksum
+
+    def declare_artefact(self, value: Optional[CWLOutputType]) -> ProvEntity:
+        """Create data artefact entities for all file objects."""
+        if value is None:
+            # FIXME: If this can happen in CWL, we'll
+            # need a better way to represent this in PROV
+            return self.document.entity(CWLPROV["None"], {PROV_LABEL: "None"})
+
+        if isinstance(value, (bool, int, float)):
+            # Typically used in job documents for flags
+
+            # FIXME: Make consistent hash URIs for these
+            # that somehow include the type
+            # (so "1" != 1 != "1.0" != true)
+            entity = self.document.entity(uuid.uuid4().urn, {PROV_VALUE: value})
+            self.research_object.add_uri(entity.identifier.uri)
+            return entity
+
+        if isinstance(value, str):
+            (entity, _) = self.declare_string(value)
+            return entity
+
+        if isinstance(value, bytes):
+            # If we got here then we must be in Python 3
+            byte_s = BytesIO(value)
+            data_file = self.research_object.add_data_file(byte_s)
+            # FIXME: Don't naively assume add_data_file uses hash in filename!
+ data_id = "data:%s" % PurePosixPath(data_file).stem + return self.document.entity( + data_id, + {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}, + ) + + if isinstance(value, MutableMapping): + if "@id" in value: + # Already processed this value, but it might not be in this PROV + entities = self.document.get_record(value["@id"]) + if entities: + return entities[0] + # else, unknown in PROV, re-add below as if it's fresh + + # Base case - we found a File we need to update + if value.get("class") == "File": + (entity, _, _) = self.declare_file(value) + value["@id"] = entity.identifier.uri + return entity + + if value.get("class") == "Directory": + entity = self.declare_directory(value) + value["@id"] = entity.identifier.uri + return entity + coll_id = value.setdefault("@id", uuid.uuid4().urn) + # some other kind of dictionary? + # TODO: also Save as JSON + coll = self.document.entity( + coll_id, + [ + (PROV_TYPE, WFPROV["Artifact"]), + (PROV_TYPE, PROV["Collection"]), + (PROV_TYPE, PROV["Dictionary"]), + ], + ) + + if value.get("class"): + _logger.warning("Unknown data class %s.", value["class"]) + # FIXME: The class might be "http://example.com/somethingelse" + coll.add_asserted_type(CWLPROV[value["class"]]) + + # Let's iterate and recurse + coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] + for (key, val) in value.items(): + v_ent = self.declare_artefact(val) + self.document.membership(coll, v_ent) + m_entity = self.document.entity(uuid.uuid4().urn) + # Note: only support PROV-O style dictionary + # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition + # as prov.py do not easily allow PROV-N extensions + m_entity.add_asserted_type(PROV["KeyEntityPair"]) + m_entity.add_attributes( + {PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent} + ) + coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) + coll.add_attributes(coll_attribs) + self.research_object.add_uri(coll.identifier.uri) + return coll + + # some other kind of Collection? 
+        # TODO: also save as JSON
+        try:
+            members = []
+            for each_input_obj in iter(value):
+                # Recurse and register any nested objects
+                e = self.declare_artefact(each_input_obj)
+                members.append(e)
+
+            # If we reached this, then we were allowed to iterate
+            coll = self.document.entity(
+                uuid.uuid4().urn,
+                [
+                    (PROV_TYPE, WFPROV["Artifact"]),
+                    (PROV_TYPE, PROV["Collection"]),
+                ],
+            )
+            if not members:
+                coll.add_asserted_type(PROV["EmptyCollection"])
+            else:
+                for member in members:
+                    # FIXME: This won't preserve order, for that
+                    # we would need to use PROV.Dictionary
+                    # with numeric keys
+                    self.document.membership(coll, member)
+            self.research_object.add_uri(coll.identifier.uri)
+            # FIXME: list value does not support adding "@id"
+            return coll
+        except TypeError:
+            _logger.warning("Unrecognized type %s of %r", type(value), value)
+            # Let's just fall back to Python repr()
+            entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)})
+            self.research_object.add_uri(entity.identifier.uri)
+            return entity
+
+    def used_artefacts(
+        self,
+        job_order: Union[CWLObjectType, List[CWLObjectType]],
+        process_run_id: str,
+        name: Optional[str] = None,
+    ) -> None:
+        """Add used() for each data artefact."""
+        if isinstance(job_order, list):
+            for entry in job_order:
+                self.used_artefacts(entry, process_run_id, name)
+        else:
+            # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
+            base = "main"
+            if name is not None:
+                base += "/" + name
+            for key, value in job_order.items():
+                prov_role = self.wf_ns[f"{base}/{key}"]
+                try:
+                    entity = self.declare_artefact(value)
+                    self.document.used(
+                        process_run_id,
+                        entity,
+                        datetime.datetime.now(),
+                        None,
+                        {"prov:role": prov_role},
+                    )
+                except OSError:
+                    pass
+
+    def generate_output_prov(
+        self,
+        final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
+        process_run_id: Optional[str],
+        name: Optional[str],
+    ) -> None:
+        """Call wasGeneratedBy() for each output and copy the files into the RO."""
+        if isinstance(final_output, MutableSequence):
+            for entry in final_output:
+                self.generate_output_prov(entry, process_run_id, name)
+        elif final_output is not None:
+            # Timestamp should be created at the earliest
+            timestamp = datetime.datetime.now()
+
+            # For each output, find/register the corresponding
+            # entity (UUID) and document it as generated in
+            # a role corresponding to the output
+            for output, value in final_output.items():
+                entity = self.declare_artefact(value)
+                if name is not None:
+                    name = urllib.parse.quote(str(name), safe=":/,#")
+                    # FIXME: Probably not "main" in nested workflows
+                    role = self.wf_ns[f"main/{name}/{output}"]
+                else:
+                    role = self.wf_ns["main/%s" % output]
+
+                if not process_run_id:
+                    process_run_id = self.workflow_run_uri
+
+                self.document.wasGeneratedBy(
+                    entity, process_run_id, timestamp, None, {"prov:role": role}
+                )
+
+    def prospective_prov(self, job: JobsType) -> None:
+        """Create prospective prov recording as wfdesc prov:Plan."""
+        if not isinstance(job, WorkflowJob):
+            # direct command line tool execution
+            self.document.entity(
+                "wf:main",
+                {
+                    PROV_TYPE: WFDESC["Process"],
+                    "prov:type": PROV["Plan"],
+                    "prov:label": "Prospective provenance",
+                },
+            )
+            return
+
+        self.document.entity(
+            "wf:main",
+            {
+                PROV_TYPE: WFDESC["Workflow"],
+                "prov:type": PROV["Plan"],
+                "prov:label": "Prospective provenance",
+            },
+        )
+
+        for step in job.steps:
+            stepnametemp = "wf:main/" + str(step.name)[5:]
+            stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
+            provstep = self.document.entity(
+                stepname,
+                {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]},
+            )
+            self.document.entity(
+                "wf:main",
+                {
+                    "wfdesc:hasSubProcess": provstep,
+                    "prov:label": "Prospective provenance",
+                },
+            )
+        # TODO: Declare roles/parameters as well
+
+    def activity_has_provenance(self, activity, prov_ids):
+        # type: (str, List[Identifier]) -> None
+        """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files."""
+        # NOTE: The below will only work if the corresponding metadata/provenance arcp URI
+        # is a pre-registered namespace in the PROV Document
+        attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids]
+        self.document.activity(activity, other_attributes=attribs)
+        # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention
+        # as prov:mentionOf() is only for entities, not activities
+        uris = [i.uri for i in prov_ids]
+        self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri)
+
+    def finalize_prov_profile(self, name):
+        # type: (Optional[str]) -> List[Identifier]
+        """Transfer the provenance related files to the RO."""
+        # NOTE: Relative posix path
+        if name is None:
+            # main workflow, fixed filenames
+            filename = "primary.cwlprov"
+        else:
+            # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json
+            wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_")
+            # Note that the above could cause overlaps for similarly named
+            # workflows, but that's OK as we'll also include run uuid
+            # which also covers the case of this step being run in
+            # multiple places or iterations
+            filename = f"{wf_name}.{self.workflow_run_uuid}.cwlprov"
+
+        basename = str(PurePosixPath(PROVENANCE) / filename)
+
+        # TODO: Also support other profiles than CWLProv, e.g. ProvOne
+
+        # list of prov identifiers of provenance files
+        prov_ids = []
+
+        # https://www.w3.org/TR/prov-xml/
+        with self.research_object.write_bag_file(basename + ".xml") as provenance_file:
+            self.document.serialize(provenance_file, format="xml", indent=4)
+            prov_ids.append(self.provenance_ns[filename + ".xml"])
+
+        # https://www.w3.org/TR/prov-n/
+        with self.research_object.write_bag_file(
+            basename + ".provn"
+        ) as provenance_file:
+            self.document.serialize(provenance_file, format="provn", indent=2)
+            prov_ids.append(self.provenance_ns[filename + ".provn"])
+
+        # https://www.w3.org/Submission/prov-json/
+        with self.research_object.write_bag_file(basename + ".json") as provenance_file:
+            self.document.serialize(provenance_file, format="json", indent=2)
+            prov_ids.append(self.provenance_ns[filename + ".json"])
+
+        # "rdf" aka https://www.w3.org/TR/prov-o/
+        # which can be serialized to ttl/nt/jsonld (and more!)
+
+        # https://www.w3.org/TR/turtle/
+        with self.research_object.write_bag_file(basename + ".ttl") as provenance_file:
+            self.document.serialize(provenance_file, format="rdf", rdf_format="turtle")
+            prov_ids.append(self.provenance_ns[filename + ".ttl"])
+
+        # https://www.w3.org/TR/n-triples/
+        with self.research_object.write_bag_file(basename + ".nt") as provenance_file:
+            self.document.serialize(
+                provenance_file, format="rdf", rdf_format="ntriples"
+            )
+            prov_ids.append(self.provenance_ns[filename + ".nt"])
+
+        # https://www.w3.org/TR/json-ld/
+        # TODO: Use a nice JSON-LD context
+        # see also https://eprints.soton.ac.uk/395985/
+        # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :(
+        with self.research_object.write_bag_file(
+            basename + ".jsonld"
+        ) as provenance_file:
+            self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld")
+            prov_ids.append(self.provenance_ns[filename + ".jsonld"])
+
+        _logger.debug("[provenance] added provenance: %s", prov_ids)
+        return prov_ids
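
For orientation, below is a minimal driving sketch of the class added in this diff. It is not part of the upload, only an illustration of the call sequence the module expects: the constructor runs generate_prov_doc(), each step gets start_process()/record_process_end(), and finalize_prov_profile() serializes the PROV document into the Research Object. It assumes ResearchObject can be constructed directly from an StdFsAccess (as in cwltool 3.0.x; its constructor takes further optional arguments), and the step name "step1" and the person name are made-up values.

    import datetime
    import uuid

    from cwltool.provenance import ResearchObject  # assumed constructor signature
    from cwltool.provenance_profile import ProvenanceProfile
    from cwltool.stdfsaccess import StdFsAccess

    fsaccess = StdFsAccess("")  # plain local-filesystem access, empty basedir
    ro = ResearchObject(fsaccess)  # assumption: defaults suffice for a scratch RO

    # Constructing the profile also emits the workflow-run activity
    # and all namespace declarations (generate_prov_doc()).
    profile = ProvenanceProfile(
        research_object=ro,
        full_name="Jane Doe",  # illustrative value
        host_provenance=False,
        user_provenance=False,
        orcid="",  # no ORCID in this sketch
        fsaccess=fsaccess,
        run_uuid=uuid.uuid4(),
    )

    # One hypothetical step activity nested under the workflow run.
    run_id = profile.start_process("step1", datetime.datetime.now())
    profile.record_process_end("step1", run_id, {}, datetime.datetime.now())

    # Serialize PROV in every supported format (.xml/.provn/.json/.ttl/.nt/.jsonld)
    # into the Research Object; returns their identifiers.
    prov_ids = profile.finalize_prov_profile(name=None)
    print(prov_ids)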