diff env/lib/python3.7/site-packages/cwltool/provenance.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
--- a/env/lib/python3.7/site-packages/cwltool/provenance.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1679 +0,0 @@
-"""Stores Research Object including provenance."""
-from __future__ import absolute_import
-
-import copy
-import datetime
-import hashlib
-import logging
-import os
-import re
-import shutil
-import tempfile
-import uuid
-from collections import OrderedDict
-from getpass import getuser
-from io import BytesIO, FileIO, TextIOWrapper, open
-from socket import getfqdn
-from typing import (IO, Any, Callable, Dict, List, Generator,
-                    MutableMapping, Optional, Set, Tuple, Union, cast)
-from types import ModuleType
-
-import prov.model as provM
-import six
-from prov.identifier import Identifier, Namespace
-from prov.model import (PROV, ProvActivity,  # pylint: disable=unused-import
-                        ProvDocument, ProvEntity)
-from pathlib2 import Path, PurePosixPath, PurePath
-from ruamel import yaml
-from schema_salad.sourceline import SourceLine
-from six.moves import urllib
-from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
-                               Text)
-# move to a regular typing import when Python 3.3-3.6 is no longer supported
-
-from .context import RuntimeContext  # pylint: disable=unused-import
-from .errors import WorkflowException
-from .loghandler import _logger
-from .pathmapper import get_listing
-from .process import Process, shortname  # pylint: disable=unused-import
-from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
-from .utils import json_dumps, versionstring, onWindows
-
-
-# imports needed for retrieving user data
-if onWindows():
-    import ctypes  # pylint: disable=unused-import
-else:
-    try:
-        import pwd  # pylint: disable=unused-import
-    except ImportError:
-        pass
-
-if TYPE_CHECKING:
-    from .command_line_tool import CommandLineTool, ExpressionTool  # pylint: disable=unused-import
-    from .workflow import Workflow  # pylint: disable=unused-import
-
-if six.PY2:
-    class PermissionError(OSError):  # pylint: disable=redefined-builtin
-        """Needed for Python 2."""
-__citation__ = "https://doi.org/10.5281/zenodo.1208477"
-
-# NOTE: Semantic versioning of the CWLProv Research Object
-# **and** the cwlprov files
-#
-# Rough guide (major.minor.patch):
-# 1. Bump major number if removing/"breaking" resources or PROV statements
-# 2. Bump minor number if adding resources or PROV statements
-# 3. Bump patch number for non-breaking non-adding changes,
-#    e.g. fixing broken relative paths
-CWLPROV_VERSION = "https://w3id.org/cwl/prov/0.6.0"
-
-# Research Object folders
-METADATA = "metadata"
-DATA = "data"
-WORKFLOW = "workflow"
-SNAPSHOT = "snapshot"
-# sub-folders
-MAIN = os.path.join(WORKFLOW, "main")
-PROVENANCE = os.path.join(METADATA, "provenance")
-LOGS = os.path.join(METADATA, "logs")
-WFDESC = Namespace("wfdesc", 'http://purl.org/wf4ever/wfdesc#')
-WFPROV = Namespace("wfprov", 'http://purl.org/wf4ever/wfprov#')
-WF4EVER = Namespace("wf4ever", 'http://purl.org/wf4ever/wf4ever#')
-RO = Namespace("ro", 'http://purl.org/wf4ever/ro#')
-ORE = Namespace("ore", 'http://www.openarchives.org/ore/terms/')
-FOAF = Namespace("foaf", 'http://xmlns.com/foaf/0.1/')
-SCHEMA = Namespace("schema", 'http://schema.org/')
-CWLPROV = Namespace('cwlprov', 'https://w3id.org/cwl/prov#')
-ORCID = Namespace("orcid", "https://orcid.org/")
-UUID = Namespace("id", "urn:uuid:")
-
-# BagIt and YAML always use UTF-8
-ENCODING = "UTF-8"
-TEXT_PLAIN = 'text/plain; charset="%s"' % ENCODING
-
-# sha1, compatible with the File type's "checksum" field
-# e.g. "checksum" = "sha1$47a013e660d408619d894b20806b1d5086aab03b"
-# See ./cwltool/schemas/v1.0/Process.yml
-Hasher = hashlib.sha1
-SHA1 = "sha1"
-SHA256 = "sha256"
-SHA512 = "sha512"
-
-# TODO: Better identifiers for user, at least
-# these should be preserved in ~/.config/cwl for every execution
-# on this host
-USER_UUID = uuid.uuid4().urn
-ACCOUNT_UUID = uuid.uuid4().urn
-
-
-def _posix_path(local_path):
-    # type: (Text) -> Text
-    """Convert a local OS path to a POSIX-style path string."""
-    return str(PurePosixPath(Path(local_path)))
-
-
-def _local_path(posix_path):
-    # type: (Text) -> Text
-    """Convert a POSIX-style path to a local OS path string."""
-    return str(Path(posix_path))
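-# For illustration (a sketch; behaviour depends on the host OS):
-#   on Windows, _posix_path("data\\de\\abc") == "data/de/abc"
-#   on POSIX hosts both conversions are effectively no-ops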
-
-
-def _whoami():
-    # type: () -> Tuple[Text,Text]
-    """Return the current operating system account as (username, fullname)."""
-    username = getuser()
-    try:
-        if onWindows():
-            get_user_name = ctypes.windll.secur32.GetUserNameExW  # type: ignore
-            size = ctypes.pointer(ctypes.c_ulong(0))
-            get_user_name(3, None, size)
-
-            name_buffer = ctypes.create_unicode_buffer(size.contents.value)
-            get_user_name(3, name_buffer, size)
-            fullname = str(name_buffer.value)
-        else:
-            fullname = pwd.getpwuid(os.getuid())[4].split(',')[0]
-    except (KeyError, IndexError):
-        fullname = username
-
-    return (username, fullname)
-
-
-class WritableBagFile(FileIO):
-    """Writes files in research object."""
-
-    def __init__(self, research_object, rel_path):
-        # type: (ResearchObject, Text) -> None
-        """Initialize an ROBagIt."""
-        self.research_object = research_object
-        if Path(rel_path).is_absolute():
-            raise ValueError("rel_path must be relative: %s" % rel_path)
-        self.rel_path = rel_path
-        self.hashes = {SHA1: hashlib.sha1(),  # nosec
-                       SHA256: hashlib.sha256(),
-                       SHA512: hashlib.sha512()}
-        # Open file in Research Object folder
-        path = os.path.abspath(os.path.join(research_object.folder, _local_path(rel_path)))
-        if not path.startswith(os.path.abspath(research_object.folder)):
-            raise ValueError("Path is outside Research Object: %s" % path)
-        super(WritableBagFile, self).__init__(str(path), mode="w")
-
-    def write(self, b):
-        # type: (Union[bytes, Text]) -> int
-        if isinstance(b, bytes):
-            real_b = b
-        else:
-            real_b = b.encode('utf-8')
-        total = 0
-        length = len(real_b)
-        while total < length:
-            # Only write the not-yet-written remainder;
-            # FileIO.write() may do a partial write
-            ret = super(WritableBagFile, self).write(real_b[total:])
-            if ret:
-                total += ret
-        for hash_obj in self.hashes.values():
-            hash_obj.update(real_b)
-        return total
-
-    def close(self):  # type: () -> None
-        # FIXME: Convert below block to a ResearchObject method?
-        if self.rel_path.startswith("data/"):
-            self.research_object.bagged_size[self.rel_path] = self.tell()
-        else:
-            self.research_object.tagfiles.add(self.rel_path)
-
-        super(WritableBagFile, self).close()
-        # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" }
-        checksums = {}
-        for name in self.hashes:
-            checksums[name] = self.hashes[name].hexdigest().lower()
-        self.research_object.add_to_manifest(self.rel_path, checksums)
-
-    # To simplify our hash calculation we won't support
-    # seeking, reading or truncating, as we can't perform
-    # the equivalent operations on the running hash.
-    # TODO: Support these? At the expense of invalidating
-    # the current hash, then having to recalculate at close()
-    def seekable(self):  # type: () -> bool
-        return False
-
-    def readable(self):  # type: () -> bool
-        return False
-
-    def truncate(self, size=None):
-        # type: (Optional[int]) -> int
-        # FIXME: This breaks the contract of IOBase,
-        # as it means we would have to recalculate the hash
-        if size is not None:
-            raise IOError("WritableBagFile can't truncate")
-        return self.tell()
-
-
-def _check_mod_11_2(numeric_string):
-    # type: (Text) -> bool
-    """
-    Validate numeric_string for its MOD-11-2 checksum.
-
-    Any "-" in the numeric_string are ignored.
-
-    The last digit of numeric_string is assumed to be the checksum, 0-9 or X.
-
-    See ISO/IEC 7064:2003 and
-    https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
-    """
-    # Strip -
-    nums = numeric_string.replace("-", "")
-    total = 0
-    # skip last (check)digit
-    for num in nums[:-1]:
-        digit = int(num)
-        total = (total+digit)*2
-    remainder = total % 11
-    result = (12-remainder) % 11
-    if result == 10:
-        checkdigit = "X"
-    else:
-        checkdigit = str(result)
-    # Compare against last digit or X
-    return nums[-1].upper() == checkdigit
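-    # For example (a sketch):
-    #   _check_mod_11_2("0000-0002-1825-0097")  # True  (check digit 7)
-    #   _check_mod_11_2("0000-0002-1694-233X")  # True  (checksum 10 -> "X")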
-
-
-def _valid_orcid(orcid):  # type: (Optional[Text]) -> Text
-    """
-    Ensure orcid is a valid ORCID identifier.
-
-    The string must be equivalent to one of these forms:
-
-    0000-0002-1825-0097
-    orcid.org/0000-0002-1825-0097
-    http://orcid.org/0000-0002-1825-0097
-    https://orcid.org/0000-0002-1825-0097
-
-    If the ORCID number or prefix is invalid, a ValueError is raised.
-
-    The returned ORCID string is always in the form of:
-    https://orcid.org/0000-0002-1825-0097
-    """
-    if not orcid:
-        raise ValueError(u'ORCID cannot be unspecified')
-    # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x
-    orcid = orcid.lower()
-    match = re.match(
-        # Note: concatenated r"" r"" below so we can add comments to pattern
-
-        # Optional hostname, with or without protocol
-        r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?"
-        # alternative pattern, but probably messier
-        # r"^((https?://)?orcid.org/)?"
-
-        # ORCID number is always 4x4 numerical digits,
-        # but last digit (modulus 11 checksum)
-        # can also be X (but we made it lowercase above).
-        # e.g. 0000-0002-1825-0097
-        # or   0000-0002-1694-233x
-        r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$",
-        orcid)
-
-    help_url = u"https://support.orcid.org/knowledgebase/articles/"\
-               "116780-structure-of-the-orcid-identifier"
-    if not match:
-        raise ValueError(u"Invalid ORCID: %s\n%s" % (orcid, help_url))
-
-    # Conservative in what we produce:
-    # a) Ensure any checksum digit is uppercase
-    orcid_num = match.group("orcid").upper()
-    # b) ..and correct
-    if not _check_mod_11_2(orcid_num):
-        raise ValueError(
-            u"Invalid ORCID checksum: %s\n%s" % (orcid_num, help_url))
-
-    # c) Re-add the official prefix https://orcid.org/
-    return u"https://orcid.org/%s" % orcid_num
-
-
-class ProvenanceProfile():
-    """
-    Provenance profile.
-
-    Populated as the workflow runs.
-    """
-
-    def __init__(self,
-                 research_object,        # type: ResearchObject
-                 full_name,              # type: str
-                 host_provenance,        # type: bool
-                 user_provenance,        # type: bool
-                 orcid,                  # type: str
-                 fsaccess,               # type: StdFsAccess
-                 run_uuid=None           # type: Optional[uuid.UUID]
-                ):  # type: (...) -> None
-        """Initialize the provenance profile."""
-        self.fsaccess = fsaccess
-        self.orcid = orcid
-        self.research_object = research_object
-        self.folder = self.research_object.folder
-        self.document = ProvDocument()
-        self.host_provenance = host_provenance
-        self.user_provenance = user_provenance
-        self.engine_uuid = research_object.engine_uuid
-        self.add_to_manifest = self.research_object.add_to_manifest
-        if self.orcid:
-            _logger.debug(u"[provenance] Creator ORCID: %s", self.orcid)
-        self.full_name = full_name
-        if self.full_name:
-            _logger.debug(u"[provenance] Creator Full name: %s",
-                          self.full_name)
-        if run_uuid is None:
-            run_uuid = uuid.uuid4()
-        self.workflow_run_uuid = run_uuid
-        self.workflow_run_uri = run_uuid.urn
-        self.generate_prov_doc()
-
-    def __str__(self):  # type: () -> str
-        """Represent this Provenvance profile as a string."""
-        return "ProvenanceProfile <%s> in <%s>" % (
-            self.workflow_run_uri, self.research_object)
-
-    def generate_prov_doc(self):
-        # type: () -> Tuple[str, ProvDocument]
-        """Add basic namespaces."""
-        def host_provenance(document):
-            # type: (ProvDocument) -> None
-            """Record host provenance."""
-            document.add_namespace(CWLPROV)
-            document.add_namespace(UUID)
-            document.add_namespace(FOAF)
-
-            hostname = getfqdn()
-            # won't have a foaf:accountServiceHomepage for unix hosts, but
-            # we can at least provide hostname
-            document.agent(
-                ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"],
-                               "prov:location": hostname,
-                               CWLPROV["hostname"]: hostname})
-
-        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
-        self.document.add_namespace(
-            'wfprov', 'http://purl.org/wf4ever/wfprov#')
-        # document.add_namespace('prov', 'http://www.w3.org/ns/prov#')
-        self.document.add_namespace(
-            'wfdesc', 'http://purl.org/wf4ever/wfdesc#')
-        # TODO: Make this ontology. For now only has cwlprov:image
-        self.document.add_namespace('cwlprov', 'https://w3id.org/cwl/prov#')
-        self.document.add_namespace('foaf', 'http://xmlns.com/foaf/0.1/')
-        self.document.add_namespace('schema', 'http://schema.org/')
-        self.document.add_namespace('orcid', 'https://orcid.org/')
-        self.document.add_namespace('id', 'urn:uuid:')
-        # NOTE: Internet draft expired 2004-03-04 (!)
-        #  https://tools.ietf.org/html/draft-thiemann-hash-urn-01
-        # TODO: Change to nih:sha-256; hashes
-        #  https://tools.ietf.org/html/rfc6920#section-7
-        self.document.add_namespace('data', 'urn:hash::sha1:')
-        # Also needed for docker images
-        self.document.add_namespace(SHA256, "nih:sha-256;")
-
-        # info only, won't really be used by prov as sub-resources use /
-        self.document.add_namespace(
-            'researchobject', self.research_object.base_uri)
-        # annotations
-        self.metadata_ns = self.document.add_namespace(
-            'metadata', self.research_object.base_uri + METADATA + "/")
-        # Pre-register provenance directory so we can refer to its files
-        self.provenance_ns = self.document.add_namespace(
-            'provenance', self.research_object.base_uri
-            + _posix_path(PROVENANCE) + "/")
-        ro_identifier_workflow = self.research_object.base_uri \
-            + "workflow/packed.cwl#"
-        self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow)
-        ro_identifier_input = self.research_object.base_uri \
-            + "workflow/primary-job.json#"
-        self.document.add_namespace("input", ro_identifier_input)
-
-        # More info about the account (e.g. username, fullname)
-        # may or may not have been previously logged by user_provenance()
-        # .. but we always know cwltool was launched (directly or indirectly)
-        # by a user account, as cwltool is a command line tool
-        account = self.document.agent(ACCOUNT_UUID)
-        if self.orcid or self.full_name:
-            person = {provM.PROV_TYPE: PROV["Person"],
-                      "prov:type": SCHEMA["Person"]}
-            if self.full_name:
-                person["prov:label"] = self.full_name
-                person["foaf:name"] = self.full_name
-                person["schema:name"] = self.full_name
-            else:
-                # TODO: Look up name from ORCID API?
-                pass
-            agent = self.document.agent(self.orcid or uuid.uuid4().urn,
-                                        person)
-            self.document.actedOnBehalfOf(account, agent)
-        else:
-            if self.host_provenance:
-                host_provenance(self.document)
-            if self.user_provenance:
-                self.research_object.user_provenance(self.document)
-        # The execution of cwltool
-        wfengine = self.document.agent(
-            self.engine_uuid,
-            {provM.PROV_TYPE: PROV["SoftwareAgent"],
-             "prov:type": WFPROV["WorkflowEngine"],
-             "prov:label": self.cwltool_version})
-        # FIXME: This datetime will be a bit too delayed; we should
-        # capture when cwltool.py first started?
-        self.document.wasStartedBy(
-            wfengine, None, account, datetime.datetime.now())
-        # define workflow run level activity
-        self.document.activity(
-            self.workflow_run_uri, datetime.datetime.now(), None,
-            {provM.PROV_TYPE: WFPROV["WorkflowRun"],
-             "prov:label": "Run of workflow/packed.cwl#main"})
-        # association between SoftwareAgent and WorkflowRun
-        main_workflow = "wf:main"
-        self.document.wasAssociatedWith(
-            self.workflow_run_uri, self.engine_uuid, main_workflow)
-        self.document.wasStartedBy(
-            self.workflow_run_uri, None, self.engine_uuid,
-            datetime.datetime.now())
-        return (self.workflow_run_uri, self.document)
-
-    def evaluate(self,
-                 process,           # type: Process
-                 job,               # type: Any
-                 job_order_object,  # type: Dict[Text, Text]
-                 research_obj       # type: ResearchObject
-                ):  # type: (...) -> None
-        """Evaluate the nature of job."""
-        if not hasattr(process, "steps"):
-            # record provenance of independent commandline tool executions
-            self.prospective_prov(job)
-            customised_job = copy_job_order(job, job_order_object)
-            self.used_artefacts(customised_job, self.workflow_run_uri)
-            research_obj.create_job(customised_job, job)
-        elif hasattr(job, "workflow"):
-            # record provenance of workflow executions
-            self.prospective_prov(job)
-            customised_job = copy_job_order(job, job_order_object)
-            self.used_artefacts(customised_job, self.workflow_run_uri)
-
-    def record_process_start(self, process, job, process_run_id=None):
-        # type: (Process, Any, Optional[str]) -> Optional[str]
-        if not hasattr(process, 'steps'):
-            process_run_id = self.workflow_run_uri
-        elif not hasattr(job, 'workflow'):
-            # commandline tool execution as part of workflow
-            name = str(job.name) if hasattr(job, 'name') else ''
-            process_name = urllib.parse.quote(name, safe=":/,#")
-            process_run_id = self.start_process(process_name, datetime.datetime.now())
-        return process_run_id
-
-    def start_process(self, process_name, when, process_run_id=None):
-        # type: (Text, datetime.datetime, Optional[str]) -> str
-        """Record the start of each Process."""
-        if process_run_id is None:
-            process_run_id = uuid.uuid4().urn
-        prov_label = "Run of workflow/packed.cwl#main/" + process_name
-        self.document.activity(
-            process_run_id, None, None,
-            {provM.PROV_TYPE: WFPROV["ProcessRun"],
-             provM.PROV_LABEL: prov_label})
-        self.document.wasAssociatedWith(
-            process_run_id, self.engine_uuid, str("wf:main/" + process_name))
-        self.document.wasStartedBy(
-            process_run_id, None, self.workflow_run_uri,
-            when, None, None)
-        return process_run_id
-
-    def record_process_end(self, process_name, process_run_id, outputs, when):
-        # type: (Text, str, Any, datetime.datetime) -> None
-        self.generate_output_prov(outputs, process_run_id, process_name)
-        self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
-
-    def declare_file(self, value):
-        # type: (MutableMapping[Text, Any]) -> Tuple[ProvEntity, ProvEntity, str]
-        if value["class"] != "File":
-            raise ValueError("Must have class:File: %s" % value)
-        # Need to determine file hash aka RO filename
-        entity = None  # type: Optional[ProvEntity]
-        checksum = None
-        if 'checksum' in value:
-            csum = value['checksum']
-            (method, checksum) = csum.split("$", 1)
-            if method == SHA1 and \
-                    self.research_object.has_data_file(checksum):
-                entity = self.document.entity("data:" + checksum)
-
-        if not entity and 'location' in value:
-            location = str(value['location'])
-            # If we made it here, we'll have to add it to the RO
-            with self.fsaccess.open(location, "rb") as fhandle:
-                relative_path = self.research_object.add_data_file(fhandle)
-                # FIXME: This naively relies on add_data_file setting hash as filename
-                checksum = PurePath(relative_path).name
-                entity = self.document.entity(
-                    "data:" + checksum, {provM.PROV_TYPE: WFPROV["Artifact"]})
-                if "checksum" not in value:
-                    value["checksum"] = "%s$%s" % (SHA1, checksum)
-
-        if not entity and 'contents' in value:
-            # Anonymous file, add content as string
-            entity, checksum = self.declare_string(value["contents"])
-
-        # By here one of them should have worked!
-        if not entity or not checksum:
-            raise ValueError("class:File but missing checksum/location/content: %r" % value)
-
-        # Track filename and extension, this is generally useful only for
-        # secondaryFiles. Note that multiple uses of a file might thus record
-        # different names for the same entity, so we'll
-        # make/track a specialized entity by UUID
-        file_id = value.setdefault("@id", uuid.uuid4().urn)
-        # A specialized entity that has just these names
-        file_entity = self.document.entity(
-            file_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
-                      (provM.PROV_TYPE, WF4EVER["File"])])  # type: ProvEntity
-
-        if "basename" in value:
-            file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
-        if "nameroot" in value:
-            file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
-        if "nameext" in value:
-            file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
-        self.document.specializationOf(file_entity, entity)
-
-        # Check for secondaries
-        for sec in value.get("secondaryFiles", ()):
-            # TODO: Record these in a specializationOf entity with UUID?
-            if sec['class'] == "File":
-                (sec_entity, _, _) = self.declare_file(sec)
-            elif sec['class'] == "Directory":
-                sec_entity = self.declare_directory(sec)
-            else:
-                raise ValueError("Got unexpected secondaryFiles value: {}".format(sec))
-            # We don't know how/when/where the secondary file was generated,
-            # but CWL convention is that it's a kind of summary/index derived
-            # from the original file. As it's generally in a different format,
-            # prov:Quotation is not appropriate.
-            self.document.derivation(
-                sec_entity, file_entity,
-                other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]})
-
-        return file_entity, entity, checksum
-
-    def declare_directory(self, value):  # type: (MutableMapping[Text, Any]) -> ProvEntity
-        """Register any nested files/directories."""
-        # FIXME: Calculate a hash-like identifier for directory
-        # so we get same value if it's the same filenames/hashes
-        # in a different location.
-        # For now, mint a new UUID to identify this directory, but
-        # attempt to keep it inside the value dictionary
-        dir_id = value.setdefault("@id", uuid.uuid4().urn)
-
-        # New annotation file to keep the ORE Folder listing
-        ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
-        dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])
-
-        coll = self.document.entity(
-            dir_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
-                     (provM.PROV_TYPE, PROV["Collection"]),
-                     (provM.PROV_TYPE, PROV["Dictionary"]),
-                     (provM.PROV_TYPE, RO["Folder"])])
-        # ORE description of ro:Folder, saved separately
-        coll_b = dir_bundle.entity(
-            dir_id, [(provM.PROV_TYPE, RO["Folder"]),
-                     (provM.PROV_TYPE, ORE["Aggregation"])])
-        self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)
-
-        # dir_manifest = dir_bundle.entity(
-        #     dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
-        #                             ORE["describes"]: coll_b.identifier})
-
-        coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
-        coll_b_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
-
-        # FIXME: .listing might not be populated yet - hopefully
-        # a later call to this method will sort that
-        is_empty = True
-
-        if "listing" not in value:
-            get_listing(self.fsaccess, value)
-        for entry in value.get("listing", []):
-            is_empty = False
-            # Declare child-artifacts
-            entity = self.declare_artefact(entry)
-            self.document.membership(coll, entity)
-            # Membership relation aka our ORE Proxy
-            m_id = uuid.uuid4().urn
-            m_entity = self.document.entity(m_id)
-            m_b = dir_bundle.entity(m_id)
-
-            # PROV-O style Dictionary
-            # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
-            # ..as prov.py does not currently allow PROV-N extensions
-            # like hadDictionaryMember(..)
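-            # (in PROV-N that would read, roughly:
-            #    hadDictionaryMember(coll, entity, "basename") -- a sketch)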
-            m_entity.add_asserted_type(PROV["KeyEntityPair"])
-
-            m_entity.add_attributes({
-                PROV["pairKey"]: entry["basename"],
-                PROV["pairEntity"]: entity,
-            })
-
-            # As well as being a
-            # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
-            m_b.add_asserted_type(RO["FolderEntry"])
-            m_b.add_asserted_type(ORE["Proxy"])
-            m_b.add_attributes({
-                RO["entryName"]: entry["basename"],
-                ORE["proxyIn"]: coll,
-                ORE["proxyFor"]: entity,
-
-            })
-            coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
-            coll_b_attribs.append((ORE["aggregates"], m_b))
-
-        coll.add_attributes(coll_attribs)
-        coll_b.add_attributes(coll_b_attribs)
-
-        # Also save the ORE Folder as annotation metadata
-        ore_doc = ProvDocument()
-        ore_doc.add_namespace(ORE)
-        ore_doc.add_namespace(RO)
-        ore_doc.add_namespace(UUID)
-        ore_doc.add_bundle(dir_bundle)
-        ore_doc = ore_doc.flattened()
-        ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
-        with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
-            ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
-        self.research_object.add_annotation(dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri)
-
-        if is_empty:
-            # Empty directory
-            coll.add_asserted_type(PROV["EmptyCollection"])
-            coll.add_asserted_type(PROV["EmptyDictionary"])
-        self.research_object.add_uri(coll.identifier.uri)
-        return coll
-
-    def declare_string(self, value):
-        # type: (Union[Text, str]) -> Tuple[ProvEntity,Text]
-        """Save as string in UTF-8."""
-        byte_s = BytesIO(str(value).encode(ENCODING))
-        data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN)
-        checksum = PurePosixPath(data_file).name
-        # FIXME: Don't naively assume add_data_file uses hash in filename!
-        data_id = "data:%s" % PurePosixPath(data_file).stem
-        entity = self.document.entity(
-            data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
-                      provM.PROV_VALUE: str(value)})  # type: ProvEntity
-        return entity, checksum
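-        # For example (a sketch, assuming add_data_file stores by hash):
-        # the empty string has sha1 da39a3ee5e6b4b0d3255bfef95601890afd80709
-        # and would be identified as
-        # "data:da39a3ee5e6b4b0d3255bfef95601890afd80709"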
-
-    def declare_artefact(self, value):
-        # type: (Any) -> ProvEntity
-        """Create data artefact entities for all file objects."""
-        if value is None:
-            # FIXME: If this can happen in CWL, we'll
-            # need a better way to represent this in PROV
-            return self.document.entity(
-                CWLPROV["None"], {provM.PROV_LABEL: "None"})
-
-        if isinstance(value, (bool, int, float)):
-            # Typically used in job documents for flags
-
-            # FIXME: Make consistent hash URIs for these
-            # that somehow include the type
-            # (so "1" != 1 != "1.0" != true)
-            entity = self.document.entity(
-                uuid.uuid4().urn, {provM.PROV_VALUE: value})
-            self.research_object.add_uri(entity.identifier.uri)
-            return entity
-
-        if isinstance(value, (Text, str)):
-            (entity, _) = self.declare_string(value)
-            return entity
-
-        if isinstance(value, bytes):
-            # If we got here then we must be in Python 3
-            byte_s = BytesIO(value)
-            data_file = self.research_object.add_data_file(byte_s)
-            # FIXME: Don't naively assume add_data_file uses hash in filename!
-            data_id = "data:%s" % PurePosixPath(data_file).stem
-            return self.document.entity(
-                data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
-                          provM.PROV_VALUE: str(value)})
-
-        if isinstance(value, MutableMapping):
-            if "@id" in value:
-                # Already processed this value, but it might not be in this PROV
-                entities = self.document.get_record(value["@id"])
-                if entities:
-                    return entities[0]
-                # else, unknown in PROV, re-add below as if it's fresh
-
-            # Base case - we found a File we need to update
-            if value.get("class") == "File":
-                (entity, _, _) = self.declare_file(value)
-                value["@id"] = entity.identifier.uri
-                return entity
-
-            if value.get("class") == "Directory":
-                entity = self.declare_directory(value)
-                value["@id"] = entity.identifier.uri
-                return entity
-            coll_id = value.setdefault("@id", uuid.uuid4().urn)
-            # some other kind of dictionary?
-            # TODO: also Save as JSON
-            coll = self.document.entity(
-                coll_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
-                          (provM.PROV_TYPE, PROV["Collection"]),
-                          (provM.PROV_TYPE, PROV["Dictionary"])])
-
-            if value.get("class"):
-                _logger.warning("Unknown data class %s.", value["class"])
-                # FIXME: The class might be "http://example.com/somethingelse"
-                coll.add_asserted_type(CWLPROV[value["class"]])
-
-            # Let's iterate and recurse
-            coll_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
-            for (key, val) in value.items():
-                v_ent = self.declare_artefact(val)
-                self.document.membership(coll, v_ent)
-                m_entity = self.document.entity(uuid.uuid4().urn)
-                # Note: only support PROV-O style dictionary
-                # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
-                # as prov.py does not easily allow PROV-N extensions
-                m_entity.add_asserted_type(PROV["KeyEntityPair"])
-                m_entity.add_attributes({
-                    PROV["pairKey"]: str(key),
-                    PROV["pairEntity"]: v_ent
-                })
-                coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
-            coll.add_attributes(coll_attribs)
-            self.research_object.add_uri(coll.identifier.uri)
-            return coll
-
-        # some other kind of Collection?
-        # TODO: also save as JSON
-        try:
-            members = []
-            for each_input_obj in iter(value):
-                # Recurse and register any nested objects
-                e = self.declare_artefact(each_input_obj)
-                members.append(e)
-
-            # If we reached this, then we were allowed to iterate
-            coll = self.document.entity(
-                uuid.uuid4().urn, [(provM.PROV_TYPE, WFPROV["Artifact"]),
-                                   (provM.PROV_TYPE, PROV["Collection"])])
-            if not members:
-                coll.add_asserted_type(PROV["EmptyCollection"])
-            else:
-                for member in members:
-                    # FIXME: This won't preserve order, for that
-                    # we would need to use PROV.Dictionary
-                    # with numeric keys
-                    self.document.membership(coll, member)
-            self.research_object.add_uri(coll.identifier.uri)
-            # FIXME: list value does not support adding "@id"
-            return coll
-        except TypeError:
-            _logger.warning("Unrecognized type %s of %r",
-                            type(value), value)
-            # Let's just fall back to Python repr()
-            entity = self.document.entity(
-                uuid.uuid4().urn, {provM.PROV_LABEL: repr(value)})
-            self.research_object.add_uri(entity.identifier.uri)
-            return entity
-
-    def used_artefacts(self,
-                       job_order,            # type: Union[Dict[Any, Any], List[Dict[Any, Any]]]
-                       process_run_id,       # type: str
-                       name=None             # type: Optional[str]
-                      ):  # type: (...) -> None
-        """Add used() for each data artefact."""
-        if isinstance(job_order, list):
-            for entry in job_order:
-                self.used_artefacts(entry, process_run_id, name)
-        else:
-            # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
-            base = "main"
-            if name is not None:
-                base += "/" + name
-            for key, value in job_order.items():
-                prov_role = self.wf_ns["%s/%s" % (base, key)]
-                try:
-                    entity = self.declare_artefact(value)
-                    self.document.used(
-                        process_run_id, entity, datetime.datetime.now(), None,
-                        {"prov:role": prov_role})
-                except OSError:
-                    pass
-
-    def generate_output_prov(self,
-                             final_output,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
-                             process_run_id,  # type: Optional[str]
-                             name             # type: Optional[Text]
-                            ):   # type: (...) -> None
-        """Call wasGeneratedBy() for each output,copy the files into the RO."""
-        if isinstance(final_output, list):
-            for entry in final_output:
-                self.generate_output_prov(entry, process_run_id, name)
-        else:
-            # Create the timestamp as early as possible
-            timestamp = datetime.datetime.now()
-
-            # For each output, find/register the corresponding
-            # entity (UUID) and document it as generated in
-            # a role corresponding to the output
-            for output, value in final_output.items():
-                entity = self.declare_artefact(value)
-                if name is not None:
-                    name = urllib.parse.quote(str(name), safe=":/,#")
-                    # FIXME: Probably not "main" in nested workflows
-                    role = self.wf_ns["main/%s/%s" % (name, output)]
-                else:
-                    role = self.wf_ns["main/%s" % output]
-
-                if not process_run_id:
-                    process_run_id = self.workflow_run_uri
-
-                self.document.wasGeneratedBy(
-                    entity, process_run_id, timestamp, None, {"prov:role": role})
-
-    def prospective_prov(self, job):
-        # type: (Any) -> None
-        """Create prospective prov recording as wfdesc prov:Plan."""
-        if not hasattr(job, "steps"):
-            # direct command line tool execution
-            self.document.entity(
-                "wf:main", {provM.PROV_TYPE: WFDESC["Process"],
-                            "prov:type": PROV["Plan"],
-                            "prov:label":"Prospective provenance"})
-            return
-
-        self.document.entity(
-            "wf:main", {provM.PROV_TYPE: WFDESC["Workflow"],
-                        "prov:type": PROV["Plan"],
-                        "prov:label":"Prospective provenance"})
-
-        for step in job.steps:
-            stepnametemp = "wf:main/" + str(step.name)[5:]
-            stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
-            step = self.document.entity(
-                stepname, {provM.PROV_TYPE: WFDESC["Process"],
-                           "prov:type": PROV["Plan"]})
-            self.document.entity(
-                "wf:main", {"wfdesc:hasSubProcess": step,
-                            "prov:label": "Prospective provenance"})
-        # TODO: Declare roles/parameters as well
-
-    def activity_has_provenance(self, activity, prov_ids):
-        # type: (str, List[Identifier]) -> None
-        """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files."""
-        # NOTE: The below will only work if the corresponding metadata/provenance arcp URI
-        # is a pre-registered namespace in the PROV Document
-        attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids]
-        self.document.activity(activity, other_attributes=attribs)
-        # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention
-        # as prov:mentionOf() is only for entities, not activities
-        uris = [i.uri for i in prov_ids]
-        self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri)
-
-    def finalize_prov_profile(self, name):
-        # type: (Optional[Text]) -> List[Identifier]
-        """Transfer the provenance related files to the RO."""
-        # NOTE: Relative posix path
-        if name is None:
-            # master workflow, fixed filenames
-            filename = "primary.cwlprov"
-        else:
-            # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json
-            wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_")
-            # Note that the above could cause overlaps for similarly named
-            # workflows, but that's OK as we'll also include the run uuid,
-            # which also covers the case of this step being run in
-            # multiple places or iterations
-            filename = "%s.%s.cwlprov" % (wf_name, self.workflow_run_uuid)
-
-        basename = str(PurePosixPath(PROVENANCE)/filename)
-
-        # TODO: Also support other profiles than CWLProv, e.g. ProvOne
-
-        # list of prov identifiers of provenance files
-        prov_ids = []
-
-        # https://www.w3.org/TR/prov-xml/
-        with self.research_object.write_bag_file(basename + ".xml") as provenance_file:
-            self.document.serialize(provenance_file, format="xml", indent=4)
-            prov_ids.append(self.provenance_ns[filename + ".xml"])
-
-        # https://www.w3.org/TR/prov-n/
-        with self.research_object.write_bag_file(basename + ".provn") as provenance_file:
-            self.document.serialize(provenance_file, format="provn", indent=2)
-            prov_ids.append(self.provenance_ns[filename + ".provn"])
-
-        # https://www.w3.org/Submission/prov-json/
-        with self.research_object.write_bag_file(basename + ".json") as provenance_file:
-            self.document.serialize(provenance_file, format="json", indent=2)
-            prov_ids.append(self.provenance_ns[filename + ".json"])
-
-        # "rdf" aka https://www.w3.org/TR/prov-o/
-        # which can be serialized to ttl/nt/jsonld (and more!)
-
-        # https://www.w3.org/TR/turtle/
-        with self.research_object.write_bag_file(basename + ".ttl") as provenance_file:
-            self.document.serialize(provenance_file, format="rdf", rdf_format="turtle")
-            prov_ids.append(self.provenance_ns[filename + ".ttl"])
-
-        # https://www.w3.org/TR/n-triples/
-        with self.research_object.write_bag_file(basename + ".nt") as provenance_file:
-            self.document.serialize(provenance_file, format="rdf", rdf_format="ntriples")
-            prov_ids.append(self.provenance_ns[filename + ".nt"])
-
-        # https://www.w3.org/TR/json-ld/
-        # TODO: Use a nice JSON-LD context
-        # see also https://eprints.soton.ac.uk/395985/
-        # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :(
-        with self.research_object.write_bag_file(basename + ".jsonld") as provenance_file:
-            self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld")
-            prov_ids.append(self.provenance_ns[filename + ".jsonld"])
-
-        _logger.debug(u"[provenance] added provenance: %s", prov_ids)
-        return prov_ids
-
-
-class ResearchObject():
-    """CWLProv Research Object."""
-
-    def __init__(self, fsaccess, temp_prefix_ro="tmp", orcid='', full_name=''):
-        # type: (StdFsAccess, str, Text, Text) -> None
-        """Initialize the ResearchObject."""
-        self.temp_prefix = temp_prefix_ro
-        self.orcid = '' if not orcid else _valid_orcid(orcid)
-        self.full_name = full_name
-        tmp_dir, tmp_prefix = os.path.split(temp_prefix_ro)
-        self.folder = os.path.abspath(tempfile.mkdtemp(prefix=tmp_prefix,
-            dir=tmp_dir))  # type: Text
-        self.closed = False
-        # map of filename "data/de/alsdklkas": 12398123 bytes
-        self.bagged_size = {}  # type: Dict[Text, int]
-        self.tagfiles = set()  # type: Set[Text]
-        self._file_provenance = {}  # type: Dict[Text, Dict[Text, Text]]
-        self._external_aggregates = []  # type: List[Dict[Text, Text]]
-        self.annotations = []  # type: List[Dict[Text, Any]]
-        self._content_types = {}  # type: Dict[Text,str]
-        self.fsaccess = fsaccess
-        # These should be replaced by generate_prov_doc when workflow/run IDs are known:
-        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
-        self.ro_uuid = uuid.uuid4()
-        self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
-        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
-        ##
-        self.relativised_input_object = {}  # type: Dict[Any, Any]
-
-        self._initialize()
-        _logger.debug(u"[provenance] Temporary research object: %s",
-                      self.folder)
-
-    def self_check(self):  # type: () -> None
-        """Raise ValueError if this RO is closed."""
-        if self.closed:
-            raise ValueError(
-                "This ResearchObject has already been closed and is not "
-                "available for futher manipulation.")
-
-    def __str__(self):  # type: () -> str
-        """Represent this RO as a string."""
-        return "ResearchObject <{}> in <{}>".format(self.ro_uuid, self.folder)
-
-    def _initialize(self):  # type: () -> None
-        for research_obj_folder in (METADATA, DATA, WORKFLOW, SNAPSHOT,
-                                    PROVENANCE, LOGS):
-            os.makedirs(os.path.join(self.folder, research_obj_folder))
-        self._initialize_bagit()
-
-    def _initialize_bagit(self):  # type: () -> None
-        """Write fixed bagit header."""
-        self.self_check()
-        bagit = os.path.join(self.folder, "bagit.txt")
-        # encoding: always UTF-8 (although ASCII would suffice here)
-        # newline: ensure LF also on Windows
-        with open(bagit, "w", encoding=ENCODING, newline='\n') as bag_it_file:
-            # TODO: \n or \r\n ?
-            bag_it_file.write(u"BagIt-Version: 0.97\n")
-            bag_it_file.write(u"Tag-File-Character-Encoding: %s\n" % ENCODING)
-
-    def open_log_file_for_activity(self, uuid_uri): # type: (Text) -> WritableBagFile
-        self.self_check()
-        # Ensure valid UUID for safe filenames
-        activity_uuid = uuid.UUID(uuid_uri)
-        if activity_uuid.urn == self.engine_uuid:
-            # It's the engine aka cwltool!
-            name = "engine"
-        else:
-            name = "activity"
-        p = os.path.join(LOGS, "{}.{}.txt".format(name, activity_uuid))
-        _logger.debug("[provenance] Opening log file for %s: %s" % (name, p))
-        self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri)
-        return self.write_bag_file(p)
-
-    def _finalize(self):  # type: () -> None
-        self._write_ro_manifest()
-        self._write_bag_info()
-
-    def user_provenance(self, document):  # type: (ProvDocument) -> None
-        """Add the user provenance."""
-        self.self_check()
-        (username, fullname) = _whoami()
-
-        if not self.full_name:
-            self.full_name = fullname
-
-        document.add_namespace(UUID)
-        document.add_namespace(ORCID)
-        document.add_namespace(FOAF)
-        account = document.agent(
-            ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"],
-                           "prov:label": username,
-                           FOAF["accountName"]: username})
-
-        user = document.agent(
-            self.orcid or USER_UUID,
-            {provM.PROV_TYPE: PROV["Person"],
-             "prov:label": self.full_name,
-             FOAF["name"]: self.full_name,
-             FOAF["account"]: account})
-        # cwltool may be started from the shell (directly by the user),
-        # by a shell script (indirectly by the user),
-        # or from a different program
-        #   (which again is launched by any of the above)
-        #
-        # We can't tell in which way, but ultimately we're still
-        # acting on behalf of that user (even if we might
-        # get their name wrong!)
-        document.actedOnBehalfOf(account, user)
-
-    def write_bag_file(self, path, encoding=ENCODING):
-        # type: (Text, Optional[str]) -> WritableBagFile
-        """Write the bag file into our research object."""
-        self.self_check()
-        # For some reason below throws BlockingIOError
-        #fp = BufferedWriter(WritableBagFile(self, path))
-        bag_file = WritableBagFile(self, path)
-        if encoding is not None:
-            # encoding: match Tag-File-Character-Encoding: UTF-8
-            # newline: ensure LF also on Windows
-            return cast(WritableBagFile,
-                        TextIOWrapper(cast(IO[bytes], bag_file), encoding=encoding,
-                                      newline="\n"))
-        return bag_file
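-    # A minimal usage sketch (assuming an initialized ResearchObject `ro`):
-    #   with ro.write_bag_file("metadata/example.txt") as fh:
-    #       fh.write(u"Hello cwlprov\n")
-    #   # on close, sha1/sha256/sha512 checksums are added to the manifest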
-
-    def add_tagfile(self, path, timestamp=None):
-        # type: (Text, Optional[datetime.datetime]) -> None
-        """Add tag files to our research object."""
-        self.self_check()
-        checksums = {}
-        # Read file to calculate its checksum
-        if os.path.isdir(path):
-            return
-            # FIXME: do the right thing for directories
-        with open(path, "rb") as tag_file:
-            # FIXME: Should have more efficient open_tagfile() that
-            # does all checksums in one go while writing through,
-            # adding checksums after closing.
-            # The below is probably OK for now, as metadata files
-            # are not too large.
-
-            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
-
-            tag_file.seek(0)
-            checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
-
-            tag_file.seek(0)
-            checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
-
-        rel_path = _posix_path(os.path.relpath(path, self.folder))
-        self.tagfiles.add(rel_path)
-        self.add_to_manifest(rel_path, checksums)
-        if timestamp is not None:
-            self._file_provenance[rel_path] = {"createdOn": timestamp.isoformat()}
-
-    def _ro_aggregates(self):
-        # type: () -> List[Dict[Text, Any]]
-        """Gather dictionary of files to be added to the manifest."""
-        def guess_mediatype(rel_path):
-            # type: (Text) -> Dict[Text, Any]
-            """Return the mediatypes."""
-            media_types = {
-                # Adapted from
-                # https://w3id.org/bundle/2014-11-05/#media-types
-
-                "txt": TEXT_PLAIN,
-                "ttl": 'text/turtle; charset="UTF-8"',
-                "rdf": 'application/rdf+xml',
-                "json": 'application/json',
-                "jsonld": 'application/ld+json',
-                "xml": 'application/xml',
-                ##
-                "cwl": 'text/x+yaml; charset="UTF-8"',
-                "provn": 'text/provenance-notation; charset="UTF-8"',
-                "nt": 'application/n-triples',
-                }  # type: Dict[Text, Text]
-            conforms_to = {
-                "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/',
-                "cwl": 'https://w3id.org/cwl/',
-                }  # type: Dict[Text, Text]
-
-            prov_conforms_to = {
-                "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/',
-                "rdf": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
-                "ttl": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
-                "nt": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
-                "jsonld": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
-                "xml": 'http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/',
-                "json": 'http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/',
-                }  # type: Dict[Text, Text]
-
-            extension = rel_path.rsplit(".", 1)[-1].lower()  # type: Optional[Text]
-            if extension == rel_path:
-                # No ".", no extension
-                extension = None
-
-            local_aggregate = {}  # type: Dict[Text, Any]
-            if extension in media_types:
-                local_aggregate["mediatype"] = media_types[extension]
-
-            if extension in conforms_to:
-                # TODO: Open CWL file to read its declared "cwlVersion", e.g.
-                # cwlVersion = "v1.0"
-                local_aggregate["conformsTo"] = conforms_to[extension]
-
-            if (rel_path.startswith(_posix_path(PROVENANCE))
-                    and extension in prov_conforms_to):
-                if ".cwlprov" in rel_path:
-                    # Our own!
-                    local_aggregate["conformsTo"] = [prov_conforms_to[extension], CWLPROV_VERSION]
-                else:
-                    # Some other PROV
-                    # TODO: Recognize ProvOne etc.
-                    local_aggregate["conformsTo"] = prov_conforms_to[extension]
-            return local_aggregate
-
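-        # For example (a sketch), "metadata/provenance/primary.cwlprov.ttl"
-        # would get mediatype 'text/turtle; charset="UTF-8"' and a
-        # conformsTo of [PROV-O, CWLPROV_VERSION]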
-        aggregates = [] # type: List[Dict[Text, Any]]
-        for path in self.bagged_size.keys():
-            aggregate_dict = {}  # type: Dict[Text, Any]
-
-            temp_path = PurePosixPath(path)
-            folder = temp_path.parent
-            filename = temp_path.name
-
-            # NOTE: Here we end up aggregating the abstract
-            # data items by their sha1 hash, so that it matches
-            # the entity() in the prov files.
-
-            # TODO: Change to nih:sha-256; hashes
-            #  https://tools.ietf.org/html/rfc6920#section-7
-            aggregate_dict["uri"] = 'urn:hash::sha1:' + filename
-            aggregate_dict["bundledAs"] = {
-                # The arcp URI is a suitable ORE proxy, local to this Research Object.
-                # (as long as we don't also aggregate it by relative path!)
-                "uri": self.base_uri + path,
-                # relate it to the data/ path
-                "folder": "/%s/" % folder,
-                "filename": filename,
-            }
-            if path in self._file_provenance:
-                # Made by workflow run, merge captured provenance
-                aggregate_dict["bundledAs"].update(self._file_provenance[path])
-            else:
-                # Probably made outside wf run, part of job object?
-                pass
-            if path in self._content_types:
-                aggregate_dict["mediatype"] = self._content_types[path]
-
-            aggregates.append(aggregate_dict)
-
-        for path in self.tagfiles:
-            if (not (path.startswith(METADATA) or path.startswith(WORKFLOW) or
-                     path.startswith(SNAPSHOT))):
-                # probably a bagit file
-                continue
-            if path == str(PurePosixPath(METADATA)/"manifest.json"):
-                # Should not really be there yet! But anyway, we won't
-                # aggregate it.
-                continue
-
-            rel_aggregates = {} # type: Dict[Text, Any]
-            # These are local paths like metadata/provenance - but
-            # we need to relativize them to our current directory,
-            # as we are saved in metadata/manifest.json
-            uri = str(Path(os.pardir)/path)
-
-            rel_aggregates["uri"] = uri
-            rel_aggregates.update(guess_mediatype(path))
-
-            if path in self._file_provenance:
-                # Propagate file provenance (e.g. timestamp)
-                rel_aggregates.update(self._file_provenance[path])
-            elif not path.startswith(SNAPSHOT):
-                # make new timestamp?
-                rel_aggregates.update(self._self_made())
-            aggregates.append(rel_aggregates)
-        aggregates.extend(self._external_aggregates)
-        return aggregates
-
-    def add_uri(self, uri, timestamp=None):
-        # type: (str, Optional[datetime.datetime]) -> Dict[Text, Any]
-        self.self_check()
-        aggr = self._self_made(timestamp=timestamp)
-        aggr["uri"] = uri
-        self._external_aggregates.append(aggr)
-        return aggr
-
-    def add_annotation(self, about, content, motivated_by="oa:describing"):
-        # type: (str, List[str], str) -> str
-        """Cheap URI relativize for current directory and /."""
-        self.self_check()
-        curr = self.base_uri + METADATA + "/"
-        content = [c.replace(curr, "").replace(self.base_uri, "../")
-                   for c in content]
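-        # e.g. (a sketch, where <base> is self.base_uri):
-        #   <base>metadata/provenance/primary.cwlprov.ttl -> provenance/primary.cwlprov.ttl
-        #   <base>workflow/packed.cwl -> ../workflow/packed.cwl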
-        uri = uuid.uuid4().urn
-        ann = {
-            u"uri": uri,
-            u"about": about,
-            u"content": content,
-            u"oa:motivatedBy": {"@id": motivated_by}
-        }
-        self.annotations.append(ann)
-        return uri
-
-    def _ro_annotations(self):
-        # type: () -> List[Dict[Text, Any]]
-        annotations = []  # type: List[Dict[Text, Any]]
-        annotations.append({
-            "uri": uuid.uuid4().urn,
-            "about": self.ro_uuid.urn,
-            "content": "/",
-            # https://www.w3.org/TR/annotation-vocab/#named-individuals
-            "oa:motivatedBy": {"@id": "oa:describing"}
-        })
-
-        # How was it run?
-        # FIXME: Only primary*
-        prov_files = [str(PurePosixPath(p).relative_to(METADATA)) for p in self.tagfiles
-                      if p.startswith(_posix_path(PROVENANCE))
-                      and "/primary." in p]
-        annotations.append({
-            "uri": uuid.uuid4().urn,
-            "about": self.ro_uuid.urn,
-            "content": prov_files,
-            # Adapted from https://www.w3.org/TR/prov-aq/
-            "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"}
-        })
-
-        # Where is the main workflow?
-        annotations.append({
-            "uri": uuid.uuid4().urn,
-            "about": str(PurePosixPath("..")/WORKFLOW/"packed.cwl"),
-            "oa:motivatedBy": {"@id": "oa:highlighting"}
-        })
-
-        annotations.append({
-            "uri": uuid.uuid4().urn,
-            "about": self.ro_uuid.urn,
-            "content": [str(PurePosixPath("..")/WORKFLOW/"packed.cwl"),
-                        str(PurePosixPath("..")/WORKFLOW/"primary-job.json")],
-            "oa:motivatedBy": {"@id": "oa:linking"}
-        })
-        # Add user-added annotations at end
-        annotations.extend(self.annotations)
-        return annotations
-
-    def _authored_by(self):
-        # type: () -> Dict[Text, Any]
-        authored_by = {}
-        if self.orcid:
-            authored_by["orcid"] = self.orcid
-        if self.full_name:
-            authored_by["name"] = self.full_name
-            if not self.orcid:
-                authored_by["uri"] = USER_UUID
-
-        if authored_by:
-            return {"authoredBy": authored_by}
-        return {}
-
-    def _write_ro_manifest(self):
-        # type: () -> None
-
-        # Does not have to be this order, but it's nice to be consistent
-        manifest = OrderedDict()  # type: Dict[Text, Any]
-        manifest["@context"] = [
-            {"@base": "%s%s/" % (self.base_uri, _posix_path(METADATA))},
-            "https://w3id.org/bundle/context"
-        ]
-        manifest["id"] = "/"
-        manifest["conformsTo"] = CWLPROV_VERSION
-        filename = "manifest.json"
-        manifest["manifest"] = filename
-        manifest.update(self._self_made())
-        manifest.update(self._authored_by())
-        manifest["aggregates"] = self._ro_aggregates()
-        manifest["annotations"] = self._ro_annotations()
-
-        json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False)
-        rel_path = str(PurePosixPath(METADATA)/filename)
-        json_manifest += "\n"
-        with self.write_bag_file(rel_path) as manifest_file:
-            manifest_file.write(json_manifest)
-
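-    # Hypothetical skeleton of the metadata/manifest.json written above
-    # (the base URI, UUID and date are invented):
-    #
-    #   {"@context": [
-    #        {"@base": "arcp://uuid,0e8d7b1e-55c3-4c0e-9f6a-1f4b8c2d3e5f/metadata/"},
-    #        "https://w3id.org/bundle/context"],
-    #    "id": "/",
-    #    "conformsTo": "https://w3id.org/cwl/prov/0.6.0",
-    #    "manifest": "manifest.json",
-    #    "createdOn": "2020-06-01T12:00:00",
-    #    "aggregates": [],
-    #    "annotations": []}
-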
-    def _write_bag_info(self):
-        # type: () -> None
-
-        with self.write_bag_file("bag-info.txt") as info_file:
-            info_file.write(u"Bag-Software-Agent: %s\n" % self.cwltool_version)
-            # FIXME: require sha-512 of payload to comply with profile?
-            # FIXME: Update profile
-            info_file.write(u"BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n")
-            info_file.write(u"Bagging-Date: %s\n" % datetime.date.today().isoformat())
-            info_file.write(u"External-Description: Research Object of CWL workflow run\n")
-            if self.full_name:
-                info_file.write(u"Contact-Name: %s\n" % self.full_name)
-
-            # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity)
-            # as identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good.
-            info_file.write(u"External-Identifier: %s\n" % self.base_uri)
-
-            # Calculate size of data/ (assuming no external fetch.txt files)
-            total_size = sum(self.bagged_size.values())
-            num_files = len(self.bagged_size)
-            info_file.write(u"Payload-Oxum: %d.%d\n" % (total_size, num_files))
-        _logger.debug(u"[provenance] Generated bagit metadata: %s",
-                      self.folder)
-
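-    # Hypothetical bag-info.txt as written above (values are invented;
-    # Payload-Oxum is "<total payload bytes>.<number of files>"):
-    #
-    #   Bag-Software-Agent: cwltool 1.0
-    #   BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile
-    #   Bagging-Date: 2020-06-01
-    #   External-Description: Research Object of CWL workflow run
-    #   External-Identifier: arcp://uuid,0e8d7b1e-55c3-4c0e-9f6a-1f4b8c2d3e5f/
-    #   Payload-Oxum: 1024.2
-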
-    def generate_snapshot(self, prov_dep):
-        # type: (MutableMapping[Text, Any]) -> None
-        """Copy all of the CWL files to the snapshot/ directory."""
-        self.self_check()
-        for key, value in prov_dep.items():
-            if key == "location" and value.split("/")[-1]:
-                filename = value.split("/")[-1]
-                path = os.path.join(self.folder, SNAPSHOT, filename)
-                if value.startswith("file://"):
-                    filepath = value[7:]
-                else:
-                    filepath = value
-
-                # FIXME: What if destination path already exists?
-                if os.path.exists(filepath):
-                    try:
-                        if os.path.isdir(filepath):
-                            shutil.copytree(filepath, path)
-                        else:
-                            shutil.copy(filepath, path)
-                        timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(filepath))
-                        self.add_tagfile(path, timestamp)
-                    except PermissionError:
-                        pass  # FIXME: avoids duplicate snapshotting; need better solution
-            elif key in ("secondaryFiles", "listing"):
-                for entry in value:
-                    if isinstance(entry, MutableMapping):
-                        self.generate_snapshot(entry)
-
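-    # Hypothetical input to generate_snapshot() (paths invented): a
-    # mapping such as
-    #
-    #   {"location": "file:///tmp/wf/main.cwl",
-    #    "secondaryFiles": [{"location": "file:///tmp/wf/tool.cwl"}]}
-    #
-    # would copy main.cwl and tool.cwl into snapshot/.
-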
-    def packed_workflow(self, packed):  # type: (Text) -> None
-        """Pack CWL description to generate re-runnable CWL object in RO."""
-        self.self_check()
-        rel_path = str(PurePosixPath(WORKFLOW)/"packed.cwl")
-        # Write as binary
-        with self.write_bag_file(rel_path, encoding=None) as write_pack:
-            # YAML is always UTF8, but json.dumps gives us str in py2
-            write_pack.write(packed.encode(ENCODING))
-        _logger.debug(u"[provenance] Added packed workflow: %s", rel_path)
-
-    def has_data_file(self, sha1hash):  # type: (str) -> bool
-        """Confirm the presence of the given file in the RO."""
-        folder = os.path.join(self.folder, DATA, sha1hash[0:2])
-        hash_path = os.path.join(folder, sha1hash)
-        return os.path.isfile(hash_path)
-
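-    # Data files are content-addressed by sha1: a payload file whose
-    # checksum is da39a3ee5e6b4b0d3255bfef95601890afd80709 (the sha1 of
-    # an empty file) lives at
-    # data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709.
-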
-    def add_data_file(self, from_fp, timestamp=None, content_type=None):
-        # type: (IO[Any], Optional[datetime.datetime], Optional[str]) -> Text
-        """Copy inputs to data/ folder."""
-        self.self_check()
-        tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
-        with tempfile.NamedTemporaryFile(
-                prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
-            checksum = checksum_copy(from_fp, tmp)
-
-        # Calculate hash-based file path
-        folder = os.path.join(self.folder, DATA, checksum[0:2])
-        path = os.path.join(folder, checksum)
-        # os.rename assumed safe, as our temp file should
-        # be in same file system as our temp folder
-        if not os.path.isdir(folder):
-            os.makedirs(folder)
-        os.rename(tmp.name, path)
-
-        # Relative posix path
-        # (to avoid \ on Windows)
-        rel_path = _posix_path(os.path.relpath(path, self.folder))
-
-        # Register in bagit checksum
-        if Hasher == hashlib.sha1:
-            self._add_to_bagit(rel_path, sha1=checksum)
-        else:
-            _logger.warning(
-                u"[provenance] Unknown hash method %s for bagit manifest",
-                Hasher)
-            # Inefficient; the BagIt support will need to checksum again
-            self._add_to_bagit(rel_path)
-        _logger.debug(u"[provenance] Added data file %s", path)
-        if timestamp is not None:
-            self._file_provenance[rel_path] = self._self_made(timestamp)
-        _logger.debug(u"[provenance] Relative path for data file %s", rel_path)
-
-        if content_type is not None:
-            self._content_types[rel_path] = content_type
-        return rel_path
-
-    def _self_made(self, timestamp=None):
-        # type: (Optional[datetime.datetime]) -> Dict[Text, Any]
-        if timestamp is None:
-            timestamp = datetime.datetime.now()
-        return {
-            "createdOn": timestamp.isoformat(),
-            "createdBy": {"uri": self.engine_uuid,
-                          "name": self.cwltool_version}
-        }
-
-    def add_to_manifest(self, rel_path, checksums):
-        # type: (Text, Dict[str,str]) -> None
-        """Add files to the research object manifest."""
-        self.self_check()
-        if PurePosixPath(rel_path).is_absolute():
-            raise ValueError("rel_path must be relative: %s" % rel_path)
-
-        if rel_path.startswith("data/"):
-            # payload file, go to manifest
-            manifest = "manifest"
-        else:
-            # metadata file, go to tag manifest
-            manifest = "tagmanifest"
-
-        # Add checksums to corresponding manifest files
-        for (method, hash_value) in checksums.items():
-            # One manifest file per checksum method, e.g. manifest-sha1.txt;
-            # duplicates are avoided by the bagged_size check in _add_to_bagit
-            manifestpath = os.path.join(
-                self.folder, "%s-%s.txt" % (manifest, method.lower()))
-            # encoding: match Tag-File-Character-Encoding: UTF-8
-            # newline: ensure LF also on Windows
-            with open(manifestpath, "a", encoding=ENCODING, newline='\n') \
-                    as checksum_file:
-                line = u"%s  %s\n" % (hash_value, rel_path)
-                _logger.debug(u"[provenance] Added to %s: %s", manifestpath, line)
-                checksum_file.write(line)
-
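-    # Hypothetical manifest-sha1.txt line as appended above; the hash
-    # shown is the well-known sha1 of an empty file:
-    #
-    #   da39a3ee5e6b4b0d3255bfef95601890afd80709  data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709
-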
-    def _add_to_bagit(self, rel_path, **checksums):
-        # type: (Text, Any) -> None
-        if PurePosixPath(rel_path).is_absolute():
-            raise ValueError("rel_path must be relative: %s" % rel_path)
-        local_path = os.path.join(self.folder, _local_path(rel_path))
-        if not os.path.exists(local_path):
-            raise IOError("File %s does not exist within RO: %s" % (rel_path, local_path))
-
-        if rel_path in self.bagged_size:
-            # Already added, assume checksum OK
-            return
-        self.bagged_size[rel_path] = os.path.getsize(local_path)
-
-        if SHA1 not in checksums:
-            # ensure we always have sha1
-            checksums = dict(checksums)
-            with open(local_path, "rb") as file_path:
-                # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
-                checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
-
-        self.add_to_manifest(rel_path, checksums)
-
-    def create_job(self,
-                   builder_job,     # type: Dict[Text, Any]
-                   wf_job=None,     # type: Optional[Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]]
-                   is_output=False  # type: bool
-                  ):  # type: (...) -> Dict[Text, Text]
-        """Generate the new job object with RO-specific relative paths."""
-        # TODO: customise the file
-        copied = copy.deepcopy(builder_job)
-        self._relativise_files(copied)
-
-        def jdefault(o):  # type: (Any) -> Dict[Any, Any]
-            return dict(o)
-        if is_output:
-            rel_path = PurePosixPath(WORKFLOW)/"primary-output.json"
-        else:
-            rel_path = PurePosixPath(WORKFLOW)/"primary-job.json"
-        j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault)
-        with self.write_bag_file(str(rel_path)) as file_path:
-            file_path.write(j + u"\n")
-        _logger.debug(u"[provenance] Generated customised job file: %s",
-                      rel_path)
-        # Build a dictionary keyed by workflow-level input IDs, where each
-        # value is:
-        # 1) for files, the relativised location (containing the hash), or
-        # 2) for other attributes, the actual value.
-        relativised_input_objecttemp = {}  # type: Dict[Text, Any]
-        for key, value in copied.items():
-            if isinstance(value, MutableMapping):
-                if value.get("class") in ("File", "Directory"):
-                    relativised_input_objecttemp[key] = value
-            else:
-                relativised_input_objecttemp[key] = value
-        self.relativised_input_object.update(
-            {k: v for k, v in relativised_input_objecttemp.items() if v})
-        return self.relativised_input_object
-
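-    # Hypothetical return value of create_job() (keys and hash invented):
-    #
-    #   {"message": "Hello",
-    #    "input_file": {
-    #        "class": "File",
-    #        "checksum": "sha1$da39a3ee5e6b4b0d3255bfef95601890afd80709",
-    #        "location": "../data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709"}}
-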
-    def _relativise_files(self, structure):
-        # type: (Dict[Any, Any]) -> None
-        """Save any file objects into the RO and update the local paths."""
-        # Base case - we found a File we need to update
-        _logger.debug(u"[provenance] Relativising: %s", structure)
-
-        if isinstance(structure, MutableMapping):
-            if structure.get("class") == "File":
-                relative_path = None
-                if "checksum" in structure:
-                    alg, checksum = structure["checksum"].split("$")
-                    if alg != SHA1:
-                        raise TypeError(
-                            "Only SHA1 CWL checksums are currently supported: "
-                            "{}".format(structure))
-                    if self.has_data_file(checksum):
-                        prefix = checksum[0:2]
-                        relative_path = PurePosixPath("data")/prefix/checksum
-
-                if relative_path is None and "location" in structure:
-                    # Register in RO; but why was this not picked
-                    # up by used_artefacts?
-                    _logger.info("[provenance] Adding to RO %s", structure["location"])
-                    with self.fsaccess.open(structure["location"], "rb") as fp:
-                        relative_path = self.add_data_file(fp)
-                        checksum = PurePosixPath(relative_path).name
-                        structure["checksum"] = "%s$%s" % (SHA1, checksum)
-                if relative_path is not None:
-                    # RO-relative path as new location
-                    structure["location"] = str(PurePosixPath("..")/relative_path)
-                else:
-                    _logger.warning("Could not determine RO path for file %s", structure)
-                if "path" in structure:
-                    del structure["path"]
-
-            if structure.get("class") == "Directory":
-                # TODO: Generate anonymous Directory with a "listing"
-                # pointing to the hashed files
-                del structure["location"]
-
-            for val in structure.values():
-                try:
-                    self._relativise_files(val)
-                except OSError:
-                    pass
-            return
-
-        if isinstance(structure, (str, Text)):
-            # Just a string value, no need to iterate further
-            return
-        try:
-            for obj in iter(structure):
-                # Recurse and rewrite any nested File objects
-                self._relativise_files(obj)
-        except TypeError:
-            pass
-
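-    # Hypothetical before/after for a single File handled above (the
-    # local path is invented; the hash is the sha1 of an empty file):
-    #
-    #   {"class": "File", "location": "file:///tmp/input.txt",
-    #    "checksum": "sha1$da39a3ee5e6b4b0d3255bfef95601890afd80709"}
-    #
-    # becomes
-    #
-    #   {"class": "File",
-    #    "location": "../data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709",
-    #    "checksum": "sha1$da39a3ee5e6b4b0d3255bfef95601890afd80709"}
-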
-    def close(self, save_to=None):
-        # type: (Optional[str]) -> None
-        """Close the Research Object, optionally saving to specified folder.
-
-        Closing will remove any temporary files used by this research object.
-        After calling this method, this ResearchObject instance can no longer
-        be used, except for no-op calls to .close().
-
-        The 'save_to' folder should not exist - if it does, it will be deleted.
-
-        It is safe to call this function multiple times without the
-        'save_to' argument, e.g. within a try..finally block to
-        ensure the temporary files of this Research Object are removed.
-        """
-        if save_to is None:
-            if not self.closed:
-                _logger.debug(u"[provenance] Deleting temporary %s", self.folder)
-                shutil.rmtree(self.folder, ignore_errors=True)
-        else:
-            save_to = os.path.abspath(save_to)
-            _logger.info(u"[provenance] Finalizing Research Object")
-            self._finalize()  # write manifest etc.
-            # TODO: Write as archive (.zip or .tar) based on extension?
-
-            if os.path.isdir(save_to):
-                _logger.info(u"[provenance] Deleting existing %s", save_to)
-                shutil.rmtree(save_to)
-            shutil.move(self.folder, save_to)
-            _logger.info(u"[provenance] Research Object saved to %s", save_to)
-            self.folder = save_to
-        self.closed = True
-
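-# Hypothetical usage of ResearchObject.close(); `research_obj` and the
-# target folder are invented:
-#
-#   try:
-#       research_obj.close(save_to="/tmp/my_ro")  # finalize and move
-#   finally:
-#       research_obj.close()  # no-op once closed; else removes temp files
-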
-def checksum_copy(src_file,            # type: IO[Any]
-                  dst_file=None,      # type: Optional[IO[Any]]
-                  hasher=Hasher,        # type: Callable[[], hashlib._Hash]
-                  buffersize=1024*1024  # type: int
-                 ): # type: (...) -> str
-    """Compute checksums while copying a file."""
-    # TODO: Use hashlib.new(Hasher_str) instead?
-    checksum = hasher()
-    contents = src_file.read(buffersize)
-    if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
-        # Try to swap the copy for a hard link to the source file;
-        # fall back to a plain copy if the filesystem refuses
-        dst_name = dst_file.name
-        temp_location = os.path.join(os.path.dirname(dst_name),
-                                     str(uuid.uuid4()))
-        try:
-            os.rename(dst_name, temp_location)
-            os.link(src_file.name, dst_name)
-            dst_file = None
-            os.unlink(temp_location)
-        except OSError:
-            pass
-        if dst_file is not None and os.path.exists(temp_location):
-            # Hard-linking failed; restore the original destination file
-            os.rename(temp_location, dst_name)
-    while contents != b"":
-        if dst_file is not None:
-            dst_file.write(contents)
-        checksum.update(contents)
-        contents = src_file.read(buffersize)
-    if dst_file is not None:
-        dst_file.flush()
-    return checksum.hexdigest().lower()
-
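-# Hypothetical usage of checksum_copy() (paths invented):
-#
-#   with open("/tmp/in.dat", "rb") as src, \
-#           open("/tmp/out.dat", "wb") as dst:
-#       digest = checksum_copy(src, dst)  # copy src to dst, return digest
-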
-def copy_job_order(job, job_order_object):
-    # type: (Any, Any) -> Any
-    """Create copy of job object for provenance."""
-    if not hasattr(job, "tool"):
-        # direct command line tool execution
-        return job_order_object
-    customised_job = {}  # new job object for RO
-    for index, inp in enumerate(job.tool["inputs"]):
-        with SourceLine(job.tool["inputs"], index, WorkflowException,
-                        _logger.isEnabledFor(logging.DEBUG)):
-            iid = shortname(inp["id"])
-            if iid in job_order_object:
-                # add the input element to the dictionary for provenance
-                customised_job[iid] = copy.deepcopy(job_order_object[iid])
-            elif "default" in inp:
-                # add the default value to the dictionary for provenance
-                customised_job[iid] = copy.deepcopy(inp["default"])
-    return customised_job
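-
-# Hypothetical sketch for copy_job_order(): for a tool with one input
-# "message" whose default is "Hello", a job order of {"message": "Hi"}
-# yields {"message": "Hi"}, while an empty job order yields
-# {"message": "Hello"}.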