view env/lib/python3.9/site-packages/cwltool/provenance_profile.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import copy
import datetime
import logging
import urllib
import uuid
from io import BytesIO
from pathlib import PurePath, PurePosixPath
from socket import getfqdn
from typing import List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast

from prov.identifier import Identifier
from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity
from schema_salad.sourceline import SourceLine
from typing_extensions import TYPE_CHECKING

from .errors import WorkflowException
from .job import CommandLineJob, JobBase
from .loghandler import _logger
from .process import Process, shortname
from .provenance_constants import (
    ACCOUNT_UUID,
    CWLPROV,
    ENCODING,
    FOAF,
    METADATA,
    ORE,
    PROVENANCE,
    RO,
    SCHEMA,
    SHA1,
    SHA256,
    TEXT_PLAIN,
    UUID,
    WF4EVER,
    WFDESC,
    WFPROV,
)
from .stdfsaccess import StdFsAccess
from .utils import (
    CWLObjectType,
    CWLOutputType,
    JobsType,
    get_listing,
    posix_path,
    versionstring,
)
from .workflow_job import WorkflowJob

if TYPE_CHECKING:
    from .provenance import ResearchObject


def copy_job_order(
    job: Union[Process, JobsType], job_order_object: CWLObjectType
) -> CWLObjectType:
    """Create copy of job object for provenance."""
    if not isinstance(job, WorkflowJob):
        # direct command line tool execution
        return job_order_object
    customised_job = {}  # type: CWLObjectType
    # new job object for RO
    for each, i in enumerate(job.tool["inputs"]):
        with SourceLine(
            job.tool["inputs"],
            each,
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            iid = shortname(i["id"])
            if iid in job_order_object:
                customised_job[iid] = copy.deepcopy(job_order_object[iid])
                # add the input element in dictionary for provenance
            elif "default" in i:
                customised_job[iid] = copy.deepcopy(i["default"])
                # add the default elements in the dictionary for provenance
            else:
                pass
    return customised_job


class ProvenanceProfile:
    """
    Provenance profile.

    Populated as the workflow runs.
    """

    def __init__(
        self,
        research_object: "ResearchObject",
        full_name: str,
        host_provenance: bool,
        user_provenance: bool,
        orcid: str,
        fsaccess: StdFsAccess,
        run_uuid: Optional[uuid.UUID] = None,
    ) -> None:
        """Initialize the provenance profile."""
        self.fsaccess = fsaccess
        self.orcid = orcid
        self.research_object = research_object
        self.folder = self.research_object.folder
        self.document = ProvDocument()
        self.host_provenance = host_provenance
        self.user_provenance = user_provenance
        self.engine_uuid = research_object.engine_uuid  # type: str
        self.add_to_manifest = self.research_object.add_to_manifest
        if self.orcid:
            _logger.debug("[provenance] Creator ORCID: %s", self.orcid)
        self.full_name = full_name
        if self.full_name:
            _logger.debug("[provenance] Creator Full name: %s", self.full_name)
        self.workflow_run_uuid = run_uuid or uuid.uuid4()
        self.workflow_run_uri = self.workflow_run_uuid.urn  # type: str
        self.generate_prov_doc()

    def __str__(self) -> str:
        """Represent this Provenvance profile as a string."""
        return "ProvenanceProfile <{}> in <{}>".format(
            self.workflow_run_uri,
            self.research_object,
        )

    def generate_prov_doc(self) -> Tuple[str, ProvDocument]:
        """Add basic namespaces."""

        def host_provenance(document: ProvDocument) -> None:
            """Record host provenance."""
            document.add_namespace(CWLPROV)
            document.add_namespace(UUID)
            document.add_namespace(FOAF)

            hostname = getfqdn()
            # won't have a foaf:accountServiceHomepage for unix hosts, but
            # we can at least provide hostname
            document.agent(
                ACCOUNT_UUID,
                {
                    PROV_TYPE: FOAF["OnlineAccount"],
                    "prov:location": hostname,
                    CWLPROV["hostname"]: hostname,
                },
            )

        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#")
        # document.add_namespace('prov', 'http://www.w3.org/ns/prov#')
        self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#")
        # TODO: Make this ontology. For now only has cwlprov:image
        self.document.add_namespace("cwlprov", "https://w3id.org/cwl/prov#")
        self.document.add_namespace("foaf", "http://xmlns.com/foaf/0.1/")
        self.document.add_namespace("schema", "http://schema.org/")
        self.document.add_namespace("orcid", "https://orcid.org/")
        self.document.add_namespace("id", "urn:uuid:")
        # NOTE: Internet draft expired 2004-03-04 (!)
        #  https://tools.ietf.org/html/draft-thiemann-hash-urn-01
        # TODO: Change to nih:sha-256; hashes
        #  https://tools.ietf.org/html/rfc6920#section-7
        self.document.add_namespace("data", "urn:hash::sha1:")
        # Also needed for docker images
        self.document.add_namespace(SHA256, "nih:sha-256;")

        # info only, won't really be used by prov as sub-resources use /
        self.document.add_namespace("researchobject", self.research_object.base_uri)
        # annotations
        self.metadata_ns = self.document.add_namespace(
            "metadata", self.research_object.base_uri + METADATA + "/"
        )
        # Pre-register provenance directory so we can refer to its files
        self.provenance_ns = self.document.add_namespace(
            "provenance", self.research_object.base_uri + posix_path(PROVENANCE) + "/"
        )
        ro_identifier_workflow = self.research_object.base_uri + "workflow/packed.cwl#"
        self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow)
        ro_identifier_input = (
            self.research_object.base_uri + "workflow/primary-job.json#"
        )
        self.document.add_namespace("input", ro_identifier_input)

        # More info about the account (e.g. username, fullname)
        # may or may not have been previously logged by user_provenance()
        # .. but we always know cwltool was launched (directly or indirectly)
        # by a user account, as cwltool is a command line tool
        account = self.document.agent(ACCOUNT_UUID)
        if self.orcid or self.full_name:
            person = {PROV_TYPE: PROV["Person"], "prov:type": SCHEMA["Person"]}
            if self.full_name:
                person["prov:label"] = self.full_name
                person["foaf:name"] = self.full_name
                person["schema:name"] = self.full_name
            else:
                # TODO: Look up name from ORCID API?
                pass
            agent = self.document.agent(self.orcid or uuid.uuid4().urn, person)
            self.document.actedOnBehalfOf(account, agent)
        else:
            if self.host_provenance:
                host_provenance(self.document)
            if self.user_provenance:
                self.research_object.user_provenance(self.document)
        # The execution of cwltool
        wfengine = self.document.agent(
            self.engine_uuid,
            {
                PROV_TYPE: PROV["SoftwareAgent"],
                "prov:type": WFPROV["WorkflowEngine"],
                "prov:label": self.cwltool_version,
            },
        )
        # FIXME: This datetime will be a bit too delayed, we should
        # capture when cwltool.py earliest started?
        self.document.wasStartedBy(wfengine, None, account, datetime.datetime.now())
        # define workflow run level activity
        self.document.activity(
            self.workflow_run_uri,
            datetime.datetime.now(),
            None,
            {
                PROV_TYPE: WFPROV["WorkflowRun"],
                "prov:label": "Run of workflow/packed.cwl#main",
            },
        )
        # association between SoftwareAgent and WorkflowRun
        main_workflow = "wf:main"
        self.document.wasAssociatedWith(
            self.workflow_run_uri, self.engine_uuid, main_workflow
        )
        self.document.wasStartedBy(
            self.workflow_run_uri, None, self.engine_uuid, datetime.datetime.now()
        )
        return (self.workflow_run_uri, self.document)

    def evaluate(
        self,
        process: Process,
        job: JobsType,
        job_order_object: CWLObjectType,
        research_obj: "ResearchObject",
    ) -> None:
        """Evaluate the nature of job."""
        if not hasattr(process, "steps"):
            # record provenance of independent commandline tool executions
            self.prospective_prov(job)
            customised_job = copy_job_order(job, job_order_object)
            self.used_artefacts(customised_job, self.workflow_run_uri)
            research_obj.create_job(customised_job)
        elif hasattr(job, "workflow"):
            # record provenance of workflow executions
            self.prospective_prov(job)
            customised_job = copy_job_order(job, job_order_object)
            self.used_artefacts(customised_job, self.workflow_run_uri)

    def record_process_start(
        self, process: Process, job: JobsType, process_run_id: Optional[str] = None
    ) -> Optional[str]:
        if not hasattr(process, "steps"):
            process_run_id = self.workflow_run_uri
        elif not hasattr(job, "workflow"):
            # commandline tool execution as part of workflow
            name = ""
            if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)):
                name = job.name
            process_name = urllib.parse.quote(name, safe=":/,#")
            process_run_id = self.start_process(process_name, datetime.datetime.now())
        return process_run_id

    def start_process(
        self,
        process_name: str,
        when: datetime.datetime,
        process_run_id: Optional[str] = None,
    ) -> str:
        """Record the start of each Process."""
        if process_run_id is None:
            process_run_id = uuid.uuid4().urn
        prov_label = "Run of workflow/packed.cwl#main/" + process_name
        self.document.activity(
            process_run_id,
            None,
            None,
            {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label},
        )
        self.document.wasAssociatedWith(
            process_run_id, self.engine_uuid, str("wf:main/" + process_name)
        )
        self.document.wasStartedBy(
            process_run_id, None, self.workflow_run_uri, when, None, None
        )
        return process_run_id

    def record_process_end(
        self,
        process_name: str,
        process_run_id: str,
        outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
        when: datetime.datetime,
    ) -> None:
        self.generate_output_prov(outputs, process_run_id, process_name)
        self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

    def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
        if value["class"] != "File":
            raise ValueError("Must have class:File: %s" % value)
        # Need to determine file hash aka RO filename
        entity = None  # type: Optional[ProvEntity]
        checksum = None
        if "checksum" in value:
            csum = cast(str, value["checksum"])
            (method, checksum) = csum.split("$", 1)
            if method == SHA1 and self.research_object.has_data_file(checksum):
                entity = self.document.entity("data:" + checksum)

        if not entity and "location" in value:
            location = str(value["location"])
            # If we made it here, we'll have to add it to the RO
            with self.fsaccess.open(location, "rb") as fhandle:
                relative_path = self.research_object.add_data_file(fhandle)
                # FIXME: This naively relies on add_data_file setting hash as filename
                checksum = PurePath(relative_path).name
                entity = self.document.entity(
                    "data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]}
                )
                if "checksum" not in value:
                    value["checksum"] = f"{SHA1}${checksum}"

        if not entity and "contents" in value:
            # Anonymous file, add content as string
            entity, checksum = self.declare_string(cast(str, value["contents"]))

        # By here one of them should have worked!
        if not entity or not checksum:
            raise ValueError(
                "class:File but missing checksum/location/content: %r" % value
            )

        # Track filename and extension, this is generally useful only for
        # secondaryFiles. Note that multiple uses of a file might thus record
        # different names for the same entity, so we'll
        # make/track a specialized entity by UUID
        file_id = value.setdefault("@id", uuid.uuid4().urn)
        # A specialized entity that has just these names
        file_entity = self.document.entity(
            file_id,
            [(PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, WF4EVER["File"])],
        )  # type: ProvEntity

        if "basename" in value:
            file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
        if "nameroot" in value:
            file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
        if "nameext" in value:
            file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
        self.document.specializationOf(file_entity, entity)

        # Check for secondaries
        for sec in cast(
            MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
        ):
            # TODO: Record these in a specializationOf entity with UUID?
            if sec["class"] == "File":
                (sec_entity, _, _) = self.declare_file(sec)
            elif sec["class"] == "Directory":
                sec_entity = self.declare_directory(sec)
            else:
                raise ValueError(f"Got unexpected secondaryFiles value: {sec}")
            # We don't know how/when/where the secondary file was generated,
            # but CWL convention is a kind of summary/index derived
            # from the original file. As its generally in a different format
            # then prov:Quotation is not appropriate.
            self.document.derivation(
                sec_entity,
                file_entity,
                other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]},
            )

        return file_entity, entity, checksum

    def declare_directory(self, value: CWLObjectType) -> ProvEntity:
        """Register any nested files/directories."""
        # FIXME: Calculate a hash-like identifier for directory
        # so we get same value if it's the same filenames/hashes
        # in a different location.
        # For now, mint a new UUID to identify this directory, but
        # attempt to keep it inside the value dictionary
        dir_id = cast(str, value.setdefault("@id", uuid.uuid4().urn))

        # New annotation file to keep the ORE Folder listing
        ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
        dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])

        coll = self.document.entity(
            dir_id,
            [
                (PROV_TYPE, WFPROV["Artifact"]),
                (PROV_TYPE, PROV["Collection"]),
                (PROV_TYPE, PROV["Dictionary"]),
                (PROV_TYPE, RO["Folder"]),
            ],
        )
        # ORE description of ro:Folder, saved separately
        coll_b = dir_bundle.entity(
            dir_id,
            [(PROV_TYPE, RO["Folder"]), (PROV_TYPE, ORE["Aggregation"])],
        )
        self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)

        # dir_manifest = dir_bundle.entity(
        #     dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
        #                             ORE["describes"]: coll_b.identifier})

        coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
        coll_b_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]

        # FIXME: .listing might not be populated yet - hopefully
        # a later call to this method will sort that
        is_empty = True

        if "listing" not in value:
            get_listing(self.fsaccess, value)
        for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
            is_empty = False
            # Declare child-artifacts
            entity = self.declare_artefact(entry)
            self.document.membership(coll, entity)
            # Membership relation aka our ORE Proxy
            m_id = uuid.uuid4().urn
            m_entity = self.document.entity(m_id)
            m_b = dir_bundle.entity(m_id)

            # PROV-O style Dictionary
            # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
            # ..as prov.py do not currently allow PROV-N extensions
            # like hadDictionaryMember(..)
            m_entity.add_asserted_type(PROV["KeyEntityPair"])

            m_entity.add_attributes(
                {
                    PROV["pairKey"]: entry["basename"],
                    PROV["pairEntity"]: entity,
                }
            )

            # As well as a being a
            # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
            m_b.add_asserted_type(RO["FolderEntry"])
            m_b.add_asserted_type(ORE["Proxy"])
            m_b.add_attributes(
                {
                    RO["entryName"]: entry["basename"],
                    ORE["proxyIn"]: coll,
                    ORE["proxyFor"]: entity,
                }
            )
            coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
            coll_b_attribs.append((ORE["aggregates"], m_b))

        coll.add_attributes(coll_attribs)
        coll_b.add_attributes(coll_b_attribs)

        # Also Save ORE Folder as annotation metadata
        ore_doc = ProvDocument()
        ore_doc.add_namespace(ORE)
        ore_doc.add_namespace(RO)
        ore_doc.add_namespace(UUID)
        ore_doc.add_bundle(dir_bundle)
        ore_doc = ore_doc.flattened()
        ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
        with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
            ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
        self.research_object.add_annotation(
            dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri
        )

        if is_empty:
            # Empty directory
            coll.add_asserted_type(PROV["EmptyCollection"])
            coll.add_asserted_type(PROV["EmptyDictionary"])
        self.research_object.add_uri(coll.identifier.uri)
        return coll

    def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
        """Save as string in UTF-8."""
        byte_s = BytesIO(str(value).encode(ENCODING))
        data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN)
        checksum = PurePosixPath(data_file).name
        # FIXME: Don't naively assume add_data_file uses hash in filename!
        data_id = "data:%s" % PurePosixPath(data_file).stem
        entity = self.document.entity(
            data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}
        )  # type: ProvEntity
        return entity, checksum

    def declare_artefact(self, value: Optional[CWLOutputType]) -> ProvEntity:
        """Create data artefact entities for all file objects."""
        if value is None:
            # FIXME: If this can happen in CWL, we'll
            # need a better way to represent this in PROV
            return self.document.entity(CWLPROV["None"], {PROV_LABEL: "None"})

        if isinstance(value, (bool, int, float)):
            # Typically used in job documents for flags

            # FIXME: Make consistent hash URIs for these
            # that somehow include the type
            # (so "1" != 1 != "1.0" != true)
            entity = self.document.entity(uuid.uuid4().urn, {PROV_VALUE: value})
            self.research_object.add_uri(entity.identifier.uri)
            return entity

        if isinstance(value, (str, str)):
            (entity, _) = self.declare_string(value)
            return entity

        if isinstance(value, bytes):
            # If we got here then we must be in Python 3
            byte_s = BytesIO(value)
            data_file = self.research_object.add_data_file(byte_s)
            # FIXME: Don't naively assume add_data_file uses hash in filename!
            data_id = "data:%s" % PurePosixPath(data_file).stem
            return self.document.entity(
                data_id,
                {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)},
            )

        if isinstance(value, MutableMapping):
            if "@id" in value:
                # Already processed this value, but it might not be in this PROV
                entities = self.document.get_record(value["@id"])
                if entities:
                    return entities[0]
                # else, unknown in PROV, re-add below as if it's fresh

            # Base case - we found a File we need to update
            if value.get("class") == "File":
                (entity, _, _) = self.declare_file(value)
                value["@id"] = entity.identifier.uri
                return entity

            if value.get("class") == "Directory":
                entity = self.declare_directory(value)
                value["@id"] = entity.identifier.uri
                return entity
            coll_id = value.setdefault("@id", uuid.uuid4().urn)
            # some other kind of dictionary?
            # TODO: also Save as JSON
            coll = self.document.entity(
                coll_id,
                [
                    (PROV_TYPE, WFPROV["Artifact"]),
                    (PROV_TYPE, PROV["Collection"]),
                    (PROV_TYPE, PROV["Dictionary"]),
                ],
            )

            if value.get("class"):
                _logger.warning("Unknown data class %s.", value["class"])
                # FIXME: The class might be "http://example.com/somethingelse"
                coll.add_asserted_type(CWLPROV[value["class"]])

            # Let's iterate and recurse
            coll_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
            for (key, val) in value.items():
                v_ent = self.declare_artefact(val)
                self.document.membership(coll, v_ent)
                m_entity = self.document.entity(uuid.uuid4().urn)
                # Note: only support PROV-O style dictionary
                # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
                # as prov.py do not easily allow PROV-N extensions
                m_entity.add_asserted_type(PROV["KeyEntityPair"])
                m_entity.add_attributes(
                    {PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent}
                )
                coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
            coll.add_attributes(coll_attribs)
            self.research_object.add_uri(coll.identifier.uri)
            return coll

        # some other kind of Collection?
        # TODO: also save as JSON
        try:
            members = []
            for each_input_obj in iter(value):
                # Recurse and register any nested objects
                e = self.declare_artefact(each_input_obj)
                members.append(e)

            # If we reached this, then we were allowed to iterate
            coll = self.document.entity(
                uuid.uuid4().urn,
                [
                    (PROV_TYPE, WFPROV["Artifact"]),
                    (PROV_TYPE, PROV["Collection"]),
                ],
            )
            if not members:
                coll.add_asserted_type(PROV["EmptyCollection"])
            else:
                for member in members:
                    # FIXME: This won't preserve order, for that
                    # we would need to use PROV.Dictionary
                    # with numeric keys
                    self.document.membership(coll, member)
            self.research_object.add_uri(coll.identifier.uri)
            # FIXME: list value does not support adding "@id"
            return coll
        except TypeError:
            _logger.warning("Unrecognized type %s of %r", type(value), value)
            # Let's just fall back to Python repr()
            entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)})
            self.research_object.add_uri(entity.identifier.uri)
            return entity

    def used_artefacts(
        self,
        job_order: Union[CWLObjectType, List[CWLObjectType]],
        process_run_id: str,
        name: Optional[str] = None,
    ) -> None:
        """Add used() for each data artefact."""
        if isinstance(job_order, list):
            for entry in job_order:
                self.used_artefacts(entry, process_run_id, name)
        else:
            # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
            base = "main"
            if name is not None:
                base += "/" + name
            for key, value in job_order.items():
                prov_role = self.wf_ns[f"{base}/{key}"]
                try:
                    entity = self.declare_artefact(value)
                    self.document.used(
                        process_run_id,
                        entity,
                        datetime.datetime.now(),
                        None,
                        {"prov:role": prov_role},
                    )
                except OSError:
                    pass

    def generate_output_prov(
        self,
        final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
        process_run_id: Optional[str],
        name: Optional[str],
    ) -> None:
        """Call wasGeneratedBy() for each output,copy the files into the RO."""
        if isinstance(final_output, MutableSequence):
            for entry in final_output:
                self.generate_output_prov(entry, process_run_id, name)
        elif final_output is not None:
            # Timestamp should be created at the earliest
            timestamp = datetime.datetime.now()

            # For each output, find/register the corresponding
            # entity (UUID) and document it as generated in
            # a role corresponding to the output
            for output, value in final_output.items():
                entity = self.declare_artefact(value)
                if name is not None:
                    name = urllib.parse.quote(str(name), safe=":/,#")
                    # FIXME: Probably not "main" in nested workflows
                    role = self.wf_ns[f"main/{name}/{output}"]
                else:
                    role = self.wf_ns["main/%s" % output]

                if not process_run_id:
                    process_run_id = self.workflow_run_uri

                self.document.wasGeneratedBy(
                    entity, process_run_id, timestamp, None, {"prov:role": role}
                )

    def prospective_prov(self, job: JobsType) -> None:
        """Create prospective prov recording as wfdesc prov:Plan."""
        if not isinstance(job, WorkflowJob):
            # direct command line tool execution
            self.document.entity(
                "wf:main",
                {
                    PROV_TYPE: WFDESC["Process"],
                    "prov:type": PROV["Plan"],
                    "prov:label": "Prospective provenance",
                },
            )
            return

        self.document.entity(
            "wf:main",
            {
                PROV_TYPE: WFDESC["Workflow"],
                "prov:type": PROV["Plan"],
                "prov:label": "Prospective provenance",
            },
        )

        for step in job.steps:
            stepnametemp = "wf:main/" + str(step.name)[5:]
            stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
            provstep = self.document.entity(
                stepname,
                {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]},
            )
            self.document.entity(
                "wf:main",
                {
                    "wfdesc:hasSubProcess": provstep,
                    "prov:label": "Prospective provenance",
                },
            )
        # TODO: Declare roles/parameters as well

    def activity_has_provenance(self, activity, prov_ids):
        # type: (str, List[Identifier]) -> None
        """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files."""
        # NOTE: The below will only work if the corresponding metadata/provenance arcp URI
        # is a pre-registered namespace in the PROV Document
        attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids]
        self.document.activity(activity, other_attributes=attribs)
        # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention
        # as prov:mentionOf() is only for entities, not activities
        uris = [i.uri for i in prov_ids]
        self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri)

    def finalize_prov_profile(self, name):
        # type: (Optional[str]) -> List[Identifier]
        """Transfer the provenance related files to the RO."""
        # NOTE: Relative posix path
        if name is None:
            # main workflow, fixed filenames
            filename = "primary.cwlprov"
        else:
            # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json
            wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_")
            # Note that the above could cause overlaps for similarly named
            # workflows, but that's OK as we'll also include run uuid
            # which also covers thhe case of this step being run in
            # multiple places or iterations
            filename = f"{wf_name}.{self.workflow_run_uuid}.cwlprov"

        basename = str(PurePosixPath(PROVENANCE) / filename)

        # TODO: Also support other profiles than CWLProv, e.g. ProvOne

        # list of prov identifiers of provenance files
        prov_ids = []

        # https://www.w3.org/TR/prov-xml/
        with self.research_object.write_bag_file(basename + ".xml") as provenance_file:
            self.document.serialize(provenance_file, format="xml", indent=4)
            prov_ids.append(self.provenance_ns[filename + ".xml"])

        # https://www.w3.org/TR/prov-n/
        with self.research_object.write_bag_file(
            basename + ".provn"
        ) as provenance_file:
            self.document.serialize(provenance_file, format="provn", indent=2)
            prov_ids.append(self.provenance_ns[filename + ".provn"])

        # https://www.w3.org/Submission/prov-json/
        with self.research_object.write_bag_file(basename + ".json") as provenance_file:
            self.document.serialize(provenance_file, format="json", indent=2)
            prov_ids.append(self.provenance_ns[filename + ".json"])

        # "rdf" aka https://www.w3.org/TR/prov-o/
        # which can be serialized to ttl/nt/jsonld (and more!)

        # https://www.w3.org/TR/turtle/
        with self.research_object.write_bag_file(basename + ".ttl") as provenance_file:
            self.document.serialize(provenance_file, format="rdf", rdf_format="turtle")
            prov_ids.append(self.provenance_ns[filename + ".ttl"])

        # https://www.w3.org/TR/n-triples/
        with self.research_object.write_bag_file(basename + ".nt") as provenance_file:
            self.document.serialize(
                provenance_file, format="rdf", rdf_format="ntriples"
            )
            prov_ids.append(self.provenance_ns[filename + ".nt"])

        # https://www.w3.org/TR/json-ld/
        # TODO: Use a nice JSON-LD context
        # see also https://eprints.soton.ac.uk/395985/
        # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :(
        with self.research_object.write_bag_file(
            basename + ".jsonld"
        ) as provenance_file:
            self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld")
            prov_ids.append(self.provenance_ns[filename + ".jsonld"])

        _logger.debug("[provenance] added provenance: %s", prov_ids)
        return prov_ids