diff env/lib/python3.7/site-packages/cwltool/provenance.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children | |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/provenance.py Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,1679 @@
+"""Stores Research Object including provenance."""
+from __future__ import absolute_import
+
+import copy
+import datetime
+import hashlib
+import logging
+import os
+import re
+import shutil
+import tempfile
+import uuid
+from collections import OrderedDict
+from getpass import getuser
+from io import BytesIO, FileIO, TextIOWrapper, open
+from socket import getfqdn
+from typing import (IO, Any, Callable, Dict, List, Generator,
+                    MutableMapping, Optional, Set, Tuple, Union, cast)
+from types import ModuleType
+
+import prov.model as provM
+import six
+from prov.identifier import Identifier, Namespace
+from prov.model import (PROV, ProvActivity,  # pylint: disable=unused-import
+                        ProvDocument, ProvEntity)
+from pathlib2 import Path, PurePosixPath, PurePath
+from ruamel import yaml
+from schema_salad.sourceline import SourceLine
+from six.moves import urllib
+from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
+                               Text)
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from .context import RuntimeContext  # pylint: disable=unused-import
+from .errors import WorkflowException
+from .loghandler import _logger
+from .pathmapper import get_listing
+from .process import Process, shortname  # pylint: disable=unused-import
+from .stdfsaccess import StdFsAccess  # pylint: disable=unused-import
+from .utils import json_dumps, versionstring, onWindows
+
+
+# imports needed for retrieving user data
+if onWindows():
+    import ctypes  # pylint: disable=unused-import
+else:
+    try:
+        import pwd  # pylint: disable=unused-import
+    except ImportError:
+        pass
+
+if TYPE_CHECKING:
+    from .command_line_tool import CommandLineTool, ExpressionTool  # pylint: disable=unused-import
+    from .workflow import Workflow  # pylint: disable=unused-import
+
+if six.PY2:
+    class PermissionError(OSError):  # pylint: disable=redefined-builtin
+        """Needed for Python2."""
+
+        pass
+__citation__ = "https://doi.org/10.5281/zenodo.1208477"
+
+# NOTE: Semantic versioning of the CWLProv Research Object
+# **and** the cwlprov files
+#
+# Rough guide (major.minor.patch):
+# 1. Bump major number if removing/"breaking" resources or PROV statements
+# 2. Bump minor number if adding resources or PROV statements
+# 3. Bump patch number for non-breaking non-adding changes,
+#    e.g.
fixing broken relative paths +CWLPROV_VERSION = "https://w3id.org/cwl/prov/0.6.0" + +# Research Object folders +METADATA = "metadata" +DATA = "data" +WORKFLOW = "workflow" +SNAPSHOT = "snapshot" +# sub-folders +MAIN = os.path.join(WORKFLOW, "main") +PROVENANCE = os.path.join(METADATA, "provenance") +LOGS = os.path.join(METADATA, "logs") +WFDESC = Namespace("wfdesc", 'http://purl.org/wf4ever/wfdesc#') +WFPROV = Namespace("wfprov", 'http://purl.org/wf4ever/wfprov#') +WF4EVER = Namespace("wf4ever", 'http://purl.org/wf4ever/wf4ever#') +RO = Namespace("ro", 'http://purl.org/wf4ever/ro#') +ORE = Namespace("ore", 'http://www.openarchives.org/ore/terms/') +FOAF = Namespace("foaf", 'http://xmlns.com/foaf/0.1/') +SCHEMA = Namespace("schema", 'http://schema.org/') +CWLPROV = Namespace('cwlprov', 'https://w3id.org/cwl/prov#') +ORCID = Namespace("orcid", "https://orcid.org/") +UUID = Namespace("id", "urn:uuid:") + +# BagIt and YAML always use UTF-8 +ENCODING = "UTF-8" +TEXT_PLAIN = 'text/plain; charset="%s"' % ENCODING + +# sha1, compatible with the File type's "checksum" field +# e.g. "checksum" = "sha1$47a013e660d408619d894b20806b1d5086aab03b" +# See ./cwltool/schemas/v1.0/Process.yml +Hasher = hashlib.sha1 +SHA1 = "sha1" +SHA256 = "sha256" +SHA512 = "sha512" + +# TODO: Better identifiers for user, at least +# these should be preserved in ~/.config/cwl for every execution +# on this host +USER_UUID = uuid.uuid4().urn +ACCOUNT_UUID = uuid.uuid4().urn + + +def _posix_path(local_path): + # type: (Text) -> Text + return str(PurePosixPath(Path(local_path))) + + +def _local_path(posix_path): + # type: (Text) -> Text + return str(Path(posix_path)) + + +def _whoami(): + # type: () -> Tuple[Text,Text] + """Return the current operating system account as (username, fullname).""" + username = getuser() + try: + if onWindows(): + get_user_name = ctypes.windll.secur32.GetUserNameExW # type: ignore + size = ctypes.pointer(ctypes.c_ulong(0)) + get_user_name(3, None, size) + + name_buffer = ctypes.create_unicode_buffer(size.contents.value) + get_user_name(3, name_buffer, size) + fullname = str(name_buffer.value) + else: + fullname = pwd.getpwuid(os.getuid())[4].split(',')[0] + except (KeyError, IndexError): + fullname = username + + return (username, fullname) + + +class WritableBagFile(FileIO): + """Writes files in research object.""" + + def __init__(self, research_object, rel_path): + # type: (ResearchObject, Text) -> None + """Initialize an ROBagIt.""" + self.research_object = research_object + if Path(rel_path).is_absolute(): + raise ValueError("rel_path must be relative: %s" % rel_path) + self.rel_path = rel_path + self.hashes = {SHA1: hashlib.sha1(), # nosec + SHA256: hashlib.sha256(), + SHA512: hashlib.sha512()} + # Open file in Research Object folder + path = os.path.abspath(os.path.join(research_object.folder, _local_path(rel_path))) + if not path.startswith(os.path.abspath(research_object.folder)): + raise ValueError("Path is outside Research Object: %s" % path) + super(WritableBagFile, self).__init__(str(path), mode="w") + + + def write(self, b): + # type: (Union[bytes, Text]) -> int + if isinstance(b, bytes): + real_b = b + else: + real_b = b.encode('utf-8') + total = 0 + length = len(real_b) + while total < length: + ret = super(WritableBagFile, self).write(real_b) + if ret: + total += ret + for _ in self.hashes.values(): + _.update(real_b) + return total + + def close(self): # type: () -> None + # FIXME: Convert below block to a ResearchObject method? 
+ if self.rel_path.startswith("data/"): + self.research_object.bagged_size[self.rel_path] = self.tell() + else: + self.research_object.tagfiles.add(self.rel_path) + + super(WritableBagFile, self).close() + # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" } + checksums = {} + for name in self.hashes: + checksums[name] = self.hashes[name].hexdigest().lower() + self.research_object.add_to_manifest(self.rel_path, checksums) + + # To simplify our hash calculation we won't support + # seeking, reading or truncating, as we can't do + # similar seeks in the current hash. + # TODO: Support these? At the expense of invalidating + # the current hash, then having to recalculate at close() + def seekable(self): # type: () -> bool + return False + + def readable(self): # type: () -> bool + return False + + def truncate(self, size=None): + # type: (Optional[int]) -> int + # FIXME: This breaks contract IOBase, + # as it means we would have to recalculate the hash + if size is not None: + raise IOError("WritableBagFile can't truncate") + return self.tell() + + +def _check_mod_11_2(numeric_string): + # type: (Text) -> bool + """ + Validate numeric_string for its MOD-11-2 checksum. + + Any "-" in the numeric_string are ignored. + + The last digit of numeric_string is assumed to be the checksum, 0-9 or X. + + See ISO/IEC 7064:2003 and + https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier + """ + # Strip - + nums = numeric_string.replace("-", "") + total = 0 + # skip last (check)digit + for num in nums[:-1]: + digit = int(num) + total = (total+digit)*2 + remainder = total % 11 + result = (12-remainder) % 11 + if result == 10: + checkdigit = "X" + else: + checkdigit = str(result) + # Compare against last digit or X + return nums[-1].upper() == checkdigit + + +def _valid_orcid(orcid): # type: (Optional[Text]) -> Text + """ + Ensure orcid is a valid ORCID identifier. + + The string must be equivalent to one of these forms: + + 0000-0002-1825-0097 + orcid.org/0000-0002-1825-0097 + http://orcid.org/0000-0002-1825-0097 + https://orcid.org/0000-0002-1825-0097 + + If the ORCID number or prefix is invalid, a ValueError is raised. + + The returned ORCID string is always in the form of: + https://orcid.org/0000-0002-1825-0097 + """ + if orcid is None or not orcid: + raise ValueError(u'ORCID cannot be unspecified') + # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x + orcid = orcid.lower() + match = re.match( + # Note: concatinated r"" r"" below so we can add comments to pattern + + # Optional hostname, with or without protocol + r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?" + # alternative pattern, but probably messier + # r"^((https?://)?orcid.org/)?" + + # ORCID number is always 4x4 numerical digits, + # but last digit (modulus 11 checksum) + # can also be X (but we made it lowercase above). + # e.g. 
0000-0002-1825-0097 + # or 0000-0002-1694-233x + r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$", + orcid) + + help_url = u"https://support.orcid.org/knowledgebase/articles/"\ + "116780-structure-of-the-orcid-identifier" + if not match: + raise ValueError(u"Invalid ORCID: %s\n%s" % (orcid, help_url)) + + # Conservative in what we produce: + # a) Ensure any checksum digit is uppercase + orcid_num = match.group("orcid").upper() + # b) ..and correct + if not _check_mod_11_2(orcid_num): + raise ValueError( + u"Invalid ORCID checksum: %s\n%s" % (orcid_num, help_url)) + + # c) Re-add the official prefix https://orcid.org/ + return u"https://orcid.org/%s" % orcid_num + + +class ProvenanceProfile(): + """ + Provenance profile. + + Populated as the workflow runs. + """ + + def __init__(self, + research_object, # type: ResearchObject + full_name, # type: str + host_provenance, # type: bool + user_provenance, # type: bool + orcid, # type: str + fsaccess, # type: StdFsAccess + run_uuid=None # type: Optional[uuid.UUID] + ): # type: (...) -> None + """Initialize the provenance profile.""" + self.fsaccess = fsaccess + self.orcid = orcid + self.research_object = research_object + self.folder = self.research_object.folder + self.document = ProvDocument() + self.host_provenance = host_provenance + self.user_provenance = user_provenance + self.engine_uuid = research_object.engine_uuid + self.add_to_manifest = self.research_object.add_to_manifest + if self.orcid: + _logger.debug(u"[provenance] Creator ORCID: %s", self.orcid) + self.full_name = full_name + if self.full_name: + _logger.debug(u"[provenance] Creator Full name: %s", + self.full_name) + if run_uuid is None: + run_uuid = uuid.uuid4() + self.workflow_run_uuid = run_uuid + self.workflow_run_uri = run_uuid.urn + self.generate_prov_doc() + + def __str__(self): # type: () -> str + """Represent this Provenvance profile as a string.""" + return "ProvenanceProfile <%s> in <%s>" % ( + self.workflow_run_uri, self.research_object) + + def generate_prov_doc(self): + # type: () -> Tuple[str, ProvDocument] + """Add basic namespaces.""" + def host_provenance(document): + # type: (ProvDocument) -> None + """Record host provenance.""" + document.add_namespace(CWLPROV) + document.add_namespace(UUID) + document.add_namespace(FOAF) + + hostname = getfqdn() + # won't have a foaf:accountServiceHomepage for unix hosts, but + # we can at least provide hostname + document.agent( + ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"], + "prov:location": hostname, + CWLPROV["hostname"]: hostname}) + + self.cwltool_version = "cwltool %s" % versionstring().split()[-1] + self.document.add_namespace( + 'wfprov', 'http://purl.org/wf4ever/wfprov#') + # document.add_namespace('prov', 'http://www.w3.org/ns/prov#') + self.document.add_namespace( + 'wfdesc', 'http://purl.org/wf4ever/wfdesc#') + # TODO: Make this ontology. For now only has cwlprov:image + self.document.add_namespace('cwlprov', 'https://w3id.org/cwl/prov#') + self.document.add_namespace('foaf', 'http://xmlns.com/foaf/0.1/') + self.document.add_namespace('schema', 'http://schema.org/') + self.document.add_namespace('orcid', 'https://orcid.org/') + self.document.add_namespace('id', 'urn:uuid:') + # NOTE: Internet draft expired 2004-03-04 (!) 
+ # https://tools.ietf.org/html/draft-thiemann-hash-urn-01 + # TODO: Change to nih:sha-256; hashes + # https://tools.ietf.org/html/rfc6920#section-7 + self.document.add_namespace('data', 'urn:hash::sha1:') + # Also needed for docker images + self.document.add_namespace(SHA256, "nih:sha-256;") + + # info only, won't really be used by prov as sub-resources use / + self.document.add_namespace( + 'researchobject', self.research_object.base_uri) + # annotations + self.metadata_ns = self.document.add_namespace( + 'metadata', self.research_object.base_uri + METADATA + "/") + # Pre-register provenance directory so we can refer to its files + self.provenance_ns = self.document.add_namespace( + 'provenance', self.research_object.base_uri + + _posix_path(PROVENANCE) + "/") + ro_identifier_workflow = self.research_object.base_uri \ + + "workflow/packed.cwl#" + self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow) + ro_identifier_input = self.research_object.base_uri \ + + "workflow/primary-job.json#" + self.document.add_namespace("input", ro_identifier_input) + + # More info about the account (e.g. username, fullname) + # may or may not have been previously logged by user_provenance() + # .. but we always know cwltool was launched (directly or indirectly) + # by a user account, as cwltool is a command line tool + account = self.document.agent(ACCOUNT_UUID) + if self.orcid or self.full_name: + person = {provM.PROV_TYPE: PROV["Person"], + "prov:type": SCHEMA["Person"]} + if self.full_name: + person["prov:label"] = self.full_name + person["foaf:name"] = self.full_name + person["schema:name"] = self.full_name + else: + # TODO: Look up name from ORCID API? + pass + agent = self.document.agent(self.orcid or uuid.uuid4().urn, + person) + self.document.actedOnBehalfOf(account, agent) + else: + if self.host_provenance: + host_provenance(self.document) + if self.user_provenance: + self.research_object.user_provenance(self.document) + # The execution of cwltool + wfengine = self.document.agent( + self.engine_uuid, + {provM.PROV_TYPE: PROV["SoftwareAgent"], + "prov:type": WFPROV["WorkflowEngine"], + "prov:label": self.cwltool_version}) + # FIXME: This datetime will be a bit too delayed, we should + # capture when cwltool.py earliest started? + self.document.wasStartedBy( + wfengine, None, account, datetime.datetime.now()) + # define workflow run level activity + self.document.activity( + self.workflow_run_uri, datetime.datetime.now(), None, + {provM.PROV_TYPE: WFPROV["WorkflowRun"], + "prov:label": "Run of workflow/packed.cwl#main"}) + # association between SoftwareAgent and WorkflowRun + main_workflow = "wf:main" + self.document.wasAssociatedWith( + self.workflow_run_uri, self.engine_uuid, main_workflow) + self.document.wasStartedBy( + self.workflow_run_uri, None, self.engine_uuid, + datetime.datetime.now()) + return (self.workflow_run_uri, self.document) + + def evaluate(self, + process, # type: Process + job, # type: Any + job_order_object, # type: Dict[Text, Text] + research_obj # type: ResearchObject + ): # type: (...) 
-> None + """Evaluate the nature of job.""" + if not hasattr(process, "steps"): + # record provenance of independent commandline tool executions + self.prospective_prov(job) + customised_job = copy_job_order(job, job_order_object) + self.used_artefacts(customised_job, self.workflow_run_uri) + research_obj.create_job(customised_job, job) + elif hasattr(job, "workflow"): + # record provenance of workflow executions + self.prospective_prov(job) + customised_job = copy_job_order(job, job_order_object) + self.used_artefacts(customised_job, self.workflow_run_uri) + + def record_process_start(self, process, job, process_run_id=None): + # type: (Process, Any, Optional[str]) -> Optional[str] + if not hasattr(process, 'steps'): + process_run_id = self.workflow_run_uri + elif not hasattr(job, 'workflow'): + # commandline tool execution as part of workflow + name = str(job.name) if hasattr(job, 'name') else '' + process_name = urllib.parse.quote(name, safe=":/,#") + process_run_id = self.start_process(process_name, datetime.datetime.now()) + return process_run_id + + def start_process(self, process_name, when, process_run_id=None): + # type: (Text, datetime.datetime, Optional[str]) -> str + """Record the start of each Process.""" + if process_run_id is None: + process_run_id = uuid.uuid4().urn + prov_label = "Run of workflow/packed.cwl#main/" + process_name + self.document.activity( + process_run_id, None, None, + {provM.PROV_TYPE: WFPROV["ProcessRun"], + provM.PROV_LABEL: prov_label}) + self.document.wasAssociatedWith( + process_run_id, self.engine_uuid, str("wf:main/" + process_name)) + self.document.wasStartedBy( + process_run_id, None, self.workflow_run_uri, + when, None, None) + return process_run_id + + def record_process_end(self, process_name, process_run_id, outputs, when): + # type: (Text, str, Any, datetime.datetime) -> None + self.generate_output_prov(outputs, process_run_id, process_name) + self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) + + def declare_file(self, value): + # type: (MutableMapping[Text, Any]) -> Tuple[ProvEntity, ProvEntity, str] + if value["class"] != "File": + raise ValueError("Must have class:File: %s" % value) + # Need to determine file hash aka RO filename + entity = None # type: Optional[ProvEntity] + checksum = None + if 'checksum' in value: + csum = value['checksum'] + (method, checksum) = csum.split("$", 1) + if method == SHA1 and \ + self.research_object.has_data_file(checksum): + entity = self.document.entity("data:" + checksum) + + if not entity and 'location' in value: + location = str(value['location']) + # If we made it here, we'll have to add it to the RO + with self.fsaccess.open(location, "rb") as fhandle: + relative_path = self.research_object.add_data_file(fhandle) + # FIXME: This naively relies on add_data_file setting hash as filename + checksum = PurePath(relative_path).name + entity = self.document.entity( + "data:" + checksum, {provM.PROV_TYPE: WFPROV["Artifact"]}) + if "checksum" not in value: + value["checksum"] = "%s$%s" % (SHA1, checksum) + + + if not entity and 'contents' in value: + # Anonymous file, add content as string + entity, checksum = self.declare_string(value["contents"]) + + # By here one of them should have worked! + if not entity or not checksum: + raise ValueError("class:File but missing checksum/location/content: %r" % value) + + + # Track filename and extension, this is generally useful only for + # secondaryFiles. 
Note that multiple uses of a file might thus record + # different names for the same entity, so we'll + # make/track a specialized entity by UUID + file_id = value.setdefault("@id", uuid.uuid4().urn) + # A specialized entity that has just these names + file_entity = self.document.entity( + file_id, [(provM.PROV_TYPE, WFPROV["Artifact"]), + (provM.PROV_TYPE, WF4EVER["File"])]) # type: ProvEntity + + if "basename" in value: + file_entity.add_attributes({CWLPROV["basename"]: value["basename"]}) + if "nameroot" in value: + file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]}) + if "nameext" in value: + file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]}) + self.document.specializationOf(file_entity, entity) + + # Check for secondaries + for sec in value.get("secondaryFiles", ()): + # TODO: Record these in a specializationOf entity with UUID? + if sec['class'] == "File": + (sec_entity, _, _) = self.declare_file(sec) + elif sec['class'] == "Directory": + sec_entity = self.declare_directory(sec) + else: + raise ValueError("Got unexpected secondaryFiles value: {}".format(sec)) + # We don't know how/when/where the secondary file was generated, + # but CWL convention is a kind of summary/index derived + # from the original file. As its generally in a different format + # then prov:Quotation is not appropriate. + self.document.derivation( + sec_entity, file_entity, + other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]}) + + return file_entity, entity, checksum + + def declare_directory(self, value): # type: (MutableMapping[Text, Any]) -> ProvEntity + """Register any nested files/directories.""" + # FIXME: Calculate a hash-like identifier for directory + # so we get same value if it's the same filenames/hashes + # in a different location. 
+ # For now, mint a new UUID to identify this directory, but + # attempt to keep it inside the value dictionary + dir_id = value.setdefault("@id", uuid.uuid4().urn) + + # New annotation file to keep the ORE Folder listing + ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl" + dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn]) + + coll = self.document.entity( + dir_id, [(provM.PROV_TYPE, WFPROV["Artifact"]), + (provM.PROV_TYPE, PROV["Collection"]), + (provM.PROV_TYPE, PROV["Dictionary"]), + (provM.PROV_TYPE, RO["Folder"])]) + # ORE description of ro:Folder, saved separately + coll_b = dir_bundle.entity( + dir_id, [(provM.PROV_TYPE, RO["Folder"]), + (provM.PROV_TYPE, ORE["Aggregation"])]) + self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier) + + # dir_manifest = dir_bundle.entity( + # dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"], + # ORE["describes"]: coll_b.identifier}) + + coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)] + coll_b_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] + + # FIXME: .listing might not be populated yet - hopefully + # a later call to this method will sort that + is_empty = True + + if "listing" not in value: + get_listing(self.fsaccess, value) + for entry in value.get("listing", []): + is_empty = False + # Declare child-artifacts + entity = self.declare_artefact(entry) + self.document.membership(coll, entity) + # Membership relation aka our ORE Proxy + m_id = uuid.uuid4().urn + m_entity = self.document.entity(m_id) + m_b = dir_bundle.entity(m_id) + + # PROV-O style Dictionary + # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition + # ..as prov.py do not currently allow PROV-N extensions + # like hadDictionaryMember(..) + m_entity.add_asserted_type(PROV["KeyEntityPair"]) + + m_entity.add_attributes({ + PROV["pairKey"]: entry["basename"], + PROV["pairEntity"]: entity, + }) + + # As well as a being a + # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry + m_b.add_asserted_type(RO["FolderEntry"]) + m_b.add_asserted_type(ORE["Proxy"]) + m_b.add_attributes({ + RO["entryName"]: entry["basename"], + ORE["proxyIn"]: coll, + ORE["proxyFor"]: entity, + + }) + coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) + coll_b_attribs.append((ORE["aggregates"], m_b)) + + coll.add_attributes(coll_attribs) + coll_b.add_attributes(coll_b_attribs) + + # Also Save ORE Folder as annotation metadata + ore_doc = ProvDocument() + ore_doc.add_namespace(ORE) + ore_doc.add_namespace(RO) + ore_doc.add_namespace(UUID) + ore_doc.add_bundle(dir_bundle) + ore_doc = ore_doc.flattened() + ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn)) + with self.research_object.write_bag_file(ore_doc_path) as provenance_file: + ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle") + self.research_object.add_annotation(dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri) + + if is_empty: + # Empty directory + coll.add_asserted_type(PROV["EmptyCollection"]) + coll.add_asserted_type(PROV["EmptyDictionary"]) + self.research_object.add_uri(coll.identifier.uri) + return coll + + def declare_string(self, value): + # type: (Union[Text, str]) -> Tuple[ProvEntity,Text] + """Save as string in UTF-8.""" + byte_s = BytesIO(str(value).encode(ENCODING)) + data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN) + checksum = PurePosixPath(data_file).name + # FIXME: Don't naively assume add_data_file uses hash in filename! 
+ data_id = "data:%s" % PurePosixPath(data_file).stem + entity = self.document.entity( + data_id, {provM.PROV_TYPE: WFPROV["Artifact"], + provM.PROV_VALUE: str(value)}) # type: ProvEntity + return entity, checksum + + def declare_artefact(self, value): + # type: (Any) -> ProvEntity + """Create data artefact entities for all file objects.""" + if value is None: + # FIXME: If this can happen in CWL, we'll + # need a better way to represent this in PROV + return self.document.entity( + CWLPROV["None"], {provM.PROV_LABEL: "None"}) + + if isinstance(value, (bool, int, float)): + # Typically used in job documents for flags + + # FIXME: Make consistent hash URIs for these + # that somehow include the type + # (so "1" != 1 != "1.0" != true) + entity = self.document.entity( + uuid.uuid4().urn, {provM.PROV_VALUE: value}) + self.research_object.add_uri(entity.identifier.uri) + return entity + + if isinstance(value, (Text, str)): + (entity, _) = self.declare_string(value) + return entity + + if isinstance(value, bytes): + # If we got here then we must be in Python 3 + byte_s = BytesIO(value) + data_file = self.research_object.add_data_file(byte_s) + # FIXME: Don't naively assume add_data_file uses hash in filename! + data_id = "data:%s" % PurePosixPath(data_file).stem + return self.document.entity( + data_id, {provM.PROV_TYPE: WFPROV["Artifact"], + provM.PROV_VALUE: str(value)}) + + if isinstance(value, MutableMapping): + if "@id" in value: + # Already processed this value, but it might not be in this PROV + entities = self.document.get_record(value["@id"]) + if entities: + return entities[0] + # else, unknown in PROV, re-add below as if it's fresh + + # Base case - we found a File we need to update + if value.get("class") == "File": + (entity, _, _) = self.declare_file(value) + value["@id"] = entity.identifier.uri + return entity + + if value.get("class") == "Directory": + entity = self.declare_directory(value) + value["@id"] = entity.identifier.uri + return entity + coll_id = value.setdefault("@id", uuid.uuid4().urn) + # some other kind of dictionary? + # TODO: also Save as JSON + coll = self.document.entity( + coll_id, [(provM.PROV_TYPE, WFPROV["Artifact"]), + (provM.PROV_TYPE, PROV["Collection"]), + (provM.PROV_TYPE, PROV["Dictionary"])]) + + if value.get("class"): + _logger.warning("Unknown data class %s.", value["class"]) + # FIXME: The class might be "http://example.com/somethingelse" + coll.add_asserted_type(CWLPROV[value["class"]]) + + # Let's iterate and recurse + coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] + for (key, val) in value.items(): + v_ent = self.declare_artefact(val) + self.document.membership(coll, v_ent) + m_entity = self.document.entity(uuid.uuid4().urn) + # Note: only support PROV-O style dictionary + # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition + # as prov.py do not easily allow PROV-N extensions + m_entity.add_asserted_type(PROV["KeyEntityPair"]) + m_entity.add_attributes({ + PROV["pairKey"]: str(key), + PROV["pairEntity"]: v_ent + }) + coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) + coll.add_attributes(coll_attribs) + self.research_object.add_uri(coll.identifier.uri) + return coll + + # some other kind of Collection? 
+ # TODO: also save as JSON + try: + members = [] + for each_input_obj in iter(value): + # Recurse and register any nested objects + e = self.declare_artefact(each_input_obj) + members.append(e) + + # If we reached this, then we were allowed to iterate + coll = self.document.entity( + uuid.uuid4().urn, [(provM.PROV_TYPE, WFPROV["Artifact"]), + (provM.PROV_TYPE, PROV["Collection"])]) + if not members: + coll.add_asserted_type(PROV["EmptyCollection"]) + else: + for member in members: + # FIXME: This won't preserve order, for that + # we would need to use PROV.Dictionary + # with numeric keys + self.document.membership(coll, member) + self.research_object.add_uri(coll.identifier.uri) + # FIXME: list value does not support adding "@id" + return coll + except TypeError: + _logger.warning("Unrecognized type %s of %r", + type(value), value) + # Let's just fall back to Python repr() + entity = self.document.entity( + uuid.uuid4().urn, {provM.PROV_LABEL: repr(value)}) + self.research_object.add_uri(entity.identifier.uri) + return entity + + def used_artefacts(self, + job_order, # type: Union[Dict[Any, Any], List[Dict[Any, Any]]] + process_run_id, # type: str + name=None # type: Optional[str] + ): # type: (...) -> None + """Add used() for each data artefact.""" + if isinstance(job_order, list): + for entry in job_order: + self.used_artefacts(entry, process_run_id, name) + else: + # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows + base = "main" + if name is not None: + base += "/" + name + for key, value in job_order.items(): + prov_role = self.wf_ns["%s/%s" % (base, key)] + try: + entity = self.declare_artefact(value) + self.document.used( + process_run_id, entity, datetime.datetime.now(), None, + {"prov:role": prov_role}) + except OSError: + pass + + def generate_output_prov(self, + final_output, # type: Union[Dict[Text, Any], List[Dict[Text, Any]]] + process_run_id, # type: Optional[str] + name # type: Optional[Text] + ): # type: (...) 
-> None + """Call wasGeneratedBy() for each output,copy the files into the RO.""" + if isinstance(final_output, list): + for entry in final_output: + self.generate_output_prov(entry, process_run_id, name) + else: + # Timestamp should be created at the earliest + timestamp = datetime.datetime.now() + + # For each output, find/register the corresponding + # entity (UUID) and document it as generated in + # a role corresponding to the output + for output, value in final_output.items(): + entity = self.declare_artefact(value) + if name is not None: + name = urllib.parse.quote(str(name), safe=":/,#") + # FIXME: Probably not "main" in nested workflows + role = self.wf_ns["main/%s/%s" % (name, output)] + else: + role = self.wf_ns["main/%s" % output] + + if not process_run_id: + process_run_id = self.workflow_run_uri + + self.document.wasGeneratedBy( + entity, process_run_id, timestamp, None, {"prov:role": role}) + + def prospective_prov(self, job): + # type: (Any) -> None + """Create prospective prov recording as wfdesc prov:Plan.""" + if not hasattr(job, "steps"): + # direct command line tool execution + self.document.entity( + "wf:main", {provM.PROV_TYPE: WFDESC["Process"], + "prov:type": PROV["Plan"], + "prov:label":"Prospective provenance"}) + return + + self.document.entity( + "wf:main", {provM.PROV_TYPE: WFDESC["Workflow"], + "prov:type": PROV["Plan"], + "prov:label":"Prospective provenance"}) + + for step in job.steps: + stepnametemp = "wf:main/" + str(step.name)[5:] + stepname = urllib.parse.quote(stepnametemp, safe=":/,#") + step = self.document.entity( + stepname, {provM.PROV_TYPE: WFDESC["Process"], + "prov:type": PROV["Plan"]}) + self.document.entity( + "wf:main", {"wfdesc:hasSubProcess": step, + "prov:label": "Prospective provenance"}) + # TODO: Declare roles/parameters as well + + def activity_has_provenance(self, activity, prov_ids): + # type: (str, List[Identifier]) -> None + """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files.""" + # NOTE: The below will only work if the corresponding metadata/provenance arcp URI + # is a pre-registered namespace in the PROV Document + attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids] + self.document.activity(activity, other_attributes=attribs) + # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention + # as prov:mentionOf() is only for entities, not activities + uris = [i.uri for i in prov_ids] + self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri) + + def finalize_prov_profile(self, name): + # type: (Optional[Text]) -> List[Identifier] + """Transfer the provenance related files to the RO.""" + # NOTE: Relative posix path + if name is None: + # master workflow, fixed filenames + filename = "primary.cwlprov" + else: + # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json + wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_") + # Note that the above could cause overlaps for similarly named + # workflows, but that's OK as we'll also include run uuid + # which also covers thhe case of this step being run in + # multiple places or iterations + filename = "%s.%s.cwlprov" % (wf_name, self.workflow_run_uuid) + + basename = str(PurePosixPath(PROVENANCE)/filename) + + # TODO: Also support other profiles than CWLProv, e.g. 
ProvOne + + # list of prov identifiers of provenance files + prov_ids = [] + + # https://www.w3.org/TR/prov-xml/ + with self.research_object.write_bag_file(basename + ".xml") as provenance_file: + self.document.serialize(provenance_file, format="xml", indent=4) + prov_ids.append(self.provenance_ns[filename + ".xml"]) + + # https://www.w3.org/TR/prov-n/ + with self.research_object.write_bag_file(basename + ".provn") as provenance_file: + self.document.serialize(provenance_file, format="provn", indent=2) + prov_ids.append(self.provenance_ns[filename + ".provn"]) + + # https://www.w3.org/Submission/prov-json/ + with self.research_object.write_bag_file(basename + ".json") as provenance_file: + self.document.serialize(provenance_file, format="json", indent=2) + prov_ids.append(self.provenance_ns[filename + ".json"]) + + # "rdf" aka https://www.w3.org/TR/prov-o/ + # which can be serialized to ttl/nt/jsonld (and more!) + + # https://www.w3.org/TR/turtle/ + with self.research_object.write_bag_file(basename + ".ttl") as provenance_file: + self.document.serialize(provenance_file, format="rdf", rdf_format="turtle") + prov_ids.append(self.provenance_ns[filename + ".ttl"]) + + # https://www.w3.org/TR/n-triples/ + with self.research_object.write_bag_file(basename + ".nt") as provenance_file: + self.document.serialize(provenance_file, format="rdf", rdf_format="ntriples") + prov_ids.append(self.provenance_ns[filename + ".nt"]) + + # https://www.w3.org/TR/json-ld/ + # TODO: Use a nice JSON-LD context + # see also https://eprints.soton.ac.uk/395985/ + # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :( + with self.research_object.write_bag_file(basename + ".jsonld") as provenance_file: + self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld") + prov_ids.append(self.provenance_ns[filename + ".jsonld"]) + + _logger.debug(u"[provenance] added provenance: %s", prov_ids) + return prov_ids + + +class ResearchObject(): + """CWLProv Research Object.""" + + def __init__(self, fsaccess, temp_prefix_ro="tmp", orcid='', full_name=''): + # type: (StdFsAccess, str, Text, Text) -> None + """Initialize the ResearchObject.""" + self.temp_prefix = temp_prefix_ro + self.orcid = '' if not orcid else _valid_orcid(orcid) + self.full_name = full_name + tmp_dir, tmp_prefix = os.path.split(temp_prefix_ro) + self.folder = os.path.abspath(tempfile.mkdtemp(prefix=tmp_prefix, + dir=tmp_dir)) # type: Text + self.closed = False + # map of filename "data/de/alsdklkas": 12398123 bytes + self.bagged_size = {} # type: Dict[Text, int] + self.tagfiles = set() # type: Set[Text] + self._file_provenance = {} # type: Dict[Text, Dict[Text, Text]] + self._external_aggregates = [] # type: List[Dict[Text, Text]] + self.annotations = [] # type: List[Dict[Text, Any]] + self._content_types = {} # type: Dict[Text,str] + self.fsaccess = fsaccess + # These should be replaced by generate_prov_doc when workflow/run IDs are known: + self.engine_uuid = "urn:uuid:%s" % uuid.uuid4() + self.ro_uuid = uuid.uuid4() + self.base_uri = "arcp://uuid,%s/" % self.ro_uuid + self.cwltool_version = "cwltool %s" % versionstring().split()[-1] + ## + self.relativised_input_object = {} # type: Dict[Any, Any] + + self._initialize() + _logger.debug(u"[provenance] Temporary research object: %s", + self.folder) + + def self_check(self): # type: () -> None + """Raise ValueError if this RO is closed.""" + if self.closed: + raise ValueError( + "This ResearchObject has already been closed and is not " + "available for futher manipulation.") + + def 
__str__(self): # type: () -> str + """Represent this RO as a string.""" + return "ResearchObject <{}> in <{}>".format(self.ro_uuid, self.folder) + + def _initialize(self): # type: () -> None + for research_obj_folder in (METADATA, DATA, WORKFLOW, SNAPSHOT, + PROVENANCE, LOGS): + os.makedirs(os.path.join(self.folder, research_obj_folder)) + self._initialize_bagit() + + def _initialize_bagit(self): # type: () -> None + """Write fixed bagit header.""" + self.self_check() + bagit = os.path.join(self.folder, "bagit.txt") + # encoding: always UTF-8 (although ASCII would suffice here) + # newline: ensure LF also on Windows + with open(bagit, "w", encoding=ENCODING, newline='\n') as bag_it_file: + # TODO: \n or \r\n ? + bag_it_file.write(u"BagIt-Version: 0.97\n") + bag_it_file.write(u"Tag-File-Character-Encoding: %s\n" % ENCODING) + + def open_log_file_for_activity(self, uuid_uri): # type: (Text) -> WritableBagFile + self.self_check() + # Ensure valid UUID for safe filenames + activity_uuid = uuid.UUID(uuid_uri) + if activity_uuid.urn == self.engine_uuid: + # It's the engine aka cwltool! + name = "engine" + else: + name = "activity" + p = os.path.join(LOGS, "{}.{}.txt".format(name, activity_uuid)) + _logger.debug("[provenance] Opening log file for %s: %s" % (name, p)) + self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri) + return self.write_bag_file(p) + + def _finalize(self): # type: () -> None + self._write_ro_manifest() + self._write_bag_info() + + def user_provenance(self, document): # type: (ProvDocument) -> None + """Add the user provenance.""" + self.self_check() + (username, fullname) = _whoami() + + if not self.full_name: + self.full_name = fullname + + document.add_namespace(UUID) + document.add_namespace(ORCID) + document.add_namespace(FOAF) + account = document.agent( + ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"], + "prov:label": username, + FOAF["accountName"]: username}) + + user = document.agent( + self.orcid or USER_UUID, + {provM.PROV_TYPE: PROV["Person"], + "prov:label": self.full_name, + FOAF["name"]: self.full_name, + FOAF["account"]: account}) + # cwltool may be started on the shell (directly by user), + # by shell script (indirectly by user) + # or from a different program + # (which again is launched by any of the above) + # + # We can't tell in which way, but ultimately we're still + # acting in behalf of that user (even if we might + # get their name wrong!) + document.actedOnBehalfOf(account, user) + + def write_bag_file(self, path, encoding=ENCODING): + # type: (Text, Optional[str]) -> WritableBagFile + """Write the bag file into our research object.""" + self.self_check() + # For some reason below throws BlockingIOError + #fp = BufferedWriter(WritableBagFile(self, path)) + bag_file = WritableBagFile(self, path) + if encoding is not None: + # encoding: match Tag-File-Character-Encoding: UTF-8 + # newline: ensure LF also on Windows + return cast(WritableBagFile, + TextIOWrapper(cast(IO[bytes], bag_file), encoding=encoding, + newline="\n")) + return bag_file + + def add_tagfile(self, path, timestamp=None): + # type: (Text, Optional[datetime.datetime]) -> None + """Add tag files to our research object.""" + self.self_check() + checksums = {} + # Read file to calculate its checksum + if os.path.isdir(path): + return + # FIXME: do the right thing for directories + with open(path, "rb") as tag_file: + # FIXME: Should have more efficient open_tagfile() that + # does all checksums in one go while writing through, + # adding checksums after closing. 
+ # Below probably OK for now as metadata files + # are not too large..? + + checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1) + + tag_file.seek(0) + checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256) + + tag_file.seek(0) + checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512) + + rel_path = _posix_path(os.path.relpath(path, self.folder)) + self.tagfiles.add(rel_path) + self.add_to_manifest(rel_path, checksums) + if timestamp is not None: + self._file_provenance[rel_path] = {"createdOn": timestamp.isoformat()} + + def _ro_aggregates(self): + # type: () -> List[Dict[Text, Any]] + """Gather dictionary of files to be added to the manifest.""" + def guess_mediatype(rel_path): + # type: (Text) -> Dict[Text, Any] + """Return the mediatypes.""" + media_types = { + # Adapted from + # https://w3id.org/bundle/2014-11-05/#media-types + + "txt": TEXT_PLAIN, + "ttl": 'text/turtle; charset="UTF-8"', + "rdf": 'application/rdf+xml', + "json": 'application/json', + "jsonld": 'application/ld+json', + "xml": 'application/xml', + ## + "cwl": 'text/x+yaml; charset="UTF-8"', + "provn": 'text/provenance-notation; charset="UTF-8"', + "nt": 'application/n-triples', + } # type: Dict[Text, Text] + conforms_to = { + "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/', + "cwl": 'https://w3id.org/cwl/', + } # type: Dict[Text, Text] + + prov_conforms_to = { + "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/', + "rdf": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/', + "ttl": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/', + "nt": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/', + "jsonld": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/', + "xml": 'http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/', + "json": 'http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/', + } # type: Dict[Text, Text] + + + extension = rel_path.rsplit(".", 1)[-1].lower() # type: Optional[Text] + if extension == rel_path: + # No ".", no extension + extension = None + + local_aggregate = {} # type: Dict[Text, Any] + if extension in media_types: + local_aggregate["mediatype"] = media_types[extension] + + if extension in conforms_to: + # TODO: Open CWL file to read its declared "cwlVersion", e.g. + # cwlVersion = "v1.0" + local_aggregate["conformsTo"] = conforms_to[extension] + + if (rel_path.startswith(_posix_path(PROVENANCE)) + and extension in prov_conforms_to): + if ".cwlprov" in rel_path: + # Our own! + local_aggregate["conformsTo"] = [prov_conforms_to[extension], CWLPROV_VERSION] + else: + # Some other PROV + # TODO: Recognize ProvOne etc. + local_aggregate["conformsTo"] = prov_conforms_to[extension] + return local_aggregate + + aggregates = [] # type: List[Dict[Text, Any]] + for path in self.bagged_size.keys(): + aggregate_dict = {} # type: Dict[Text, Any] + + temp_path = PurePosixPath(path) + folder = temp_path.parent + filename = temp_path.name + + # NOTE: Here we end up aggregating the abstract + # data items by their sha1 hash, so that it matches + # the entity() in the prov files. + + # TODO: Change to nih:sha-256; hashes + # https://tools.ietf.org/html/rfc6920#section-7 + aggregate_dict["uri"] = 'urn:hash::sha1:' + filename + aggregate_dict["bundledAs"] = { + # The arcp URI is suitable ORE proxy; local to this Research Object. + # (as long as we don't also aggregate it by relative path!) 
+ "uri": self.base_uri + path, + # relate it to the data/ path + "folder": "/%s/" % folder, + "filename": filename, + } + if path in self._file_provenance: + # Made by workflow run, merge captured provenance + aggregate_dict["bundledAs"].update(self._file_provenance[path]) + else: + # Probably made outside wf run, part of job object? + pass + if path in self._content_types: + aggregate_dict["mediatype"] = self._content_types[path] + + aggregates.append(aggregate_dict) + + for path in self.tagfiles: + if (not (path.startswith(METADATA) or path.startswith(WORKFLOW) or + path.startswith(SNAPSHOT))): + # probably a bagit file + continue + if path == PurePosixPath(METADATA)/"manifest.json": + # Should not really be there yet! But anyway, we won't + # aggregate it. + continue + + rel_aggregates = {} # type: Dict[Text, Any] + # These are local paths like metadata/provenance - but + # we need to relativize them for our current directory for + # as we are saved in metadata/manifest.json + uri = str(Path(os.pardir)/path) + + rel_aggregates["uri"] = uri + rel_aggregates.update(guess_mediatype(path)) + + if path in self._file_provenance: + # Propagate file provenance (e.g. timestamp) + rel_aggregates.update(self._file_provenance[path]) + elif not path.startswith(SNAPSHOT): + # make new timestamp? + rel_aggregates.update(self._self_made()) + aggregates.append(rel_aggregates) + aggregates.extend(self._external_aggregates) + return aggregates + + def add_uri(self, uri, timestamp=None): + # type: (str, Optional[datetime.datetime]) -> Dict[Text, Any] + self.self_check() + aggr = self._self_made(timestamp=timestamp) + aggr["uri"] = uri + self._external_aggregates.append(aggr) + return aggr + + def add_annotation(self, about, content, motivated_by="oa:describing"): + # type: (str, List[str], str) -> str + """Cheap URI relativize for current directory and /.""" + self.self_check() + curr = self.base_uri + METADATA + "/" + content = [c.replace(curr, "").replace(self.base_uri, "../") + for c in content] + uri = uuid.uuid4().urn + ann = { + u"uri": uri, + u"about": about, + u"content": content, + u"oa:motivatedBy": {"@id": motivated_by} + } + self.annotations.append(ann) + return uri + + def _ro_annotations(self): + # type: () -> List[Dict[Text, Any]] + annotations = [] # type: List[Dict[Text, Any]] + annotations.append({ + "uri": uuid.uuid4().urn, + "about": self.ro_uuid.urn, + "content": "/", + # https://www.w3.org/TR/annotation-vocab/#named-individuals + "oa:motivatedBy": {"@id": "oa:describing"} + }) + + # How was it run? + # FIXME: Only primary* + prov_files = [str(PurePosixPath(p).relative_to(METADATA)) for p in self.tagfiles + if p.startswith(_posix_path(PROVENANCE)) + and "/primary." in p] + annotations.append({ + "uri": uuid.uuid4().urn, + "about": self.ro_uuid.urn, + "content": prov_files, + # Modulation of https://www.w3.org/TR/prov-aq/ + "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"} + }) + + # Where is the main workflow? 
+ annotations.append({ + "uri": uuid.uuid4().urn, + "about": str(PurePosixPath("..")/WORKFLOW/"packed.cwl"), + "oa:motivatedBy": {"@id": "oa:highlighting"} + }) + + annotations.append({ + "uri": uuid.uuid4().urn, + "about": self.ro_uuid.urn, + "content": [str(PurePosixPath("..")/WORKFLOW/"packed.cwl"), + str(PurePosixPath("..")/WORKFLOW/"primary-job.json")], + "oa:motivatedBy": {"@id": "oa:linking"} + }) + # Add user-added annotations at end + annotations.extend(self.annotations) + return annotations + + def _authored_by(self): + # type: () -> Dict[Text, Any] + authored_by = {} + if self.orcid: + authored_by["orcid"] = self.orcid + if self.full_name: + authored_by["name"] = self.full_name + if not self.orcid: + authored_by["uri"] = USER_UUID + + if authored_by: + return {"authoredBy": authored_by} + return {} + + + def _write_ro_manifest(self): + # type: () -> None + + # Does not have to be this order, but it's nice to be consistent + manifest = OrderedDict() # type: Dict[Text, Any] + manifest["@context"] = [ + {"@base": "%s%s/" % (self.base_uri, _posix_path(METADATA))}, + "https://w3id.org/bundle/context" + ] + manifest["id"] = "/" + manifest["conformsTo"] = CWLPROV_VERSION + filename = "manifest.json" + manifest["manifest"] = filename + manifest.update(self._self_made()) + manifest.update(self._authored_by()) + manifest["aggregates"] = self._ro_aggregates() + manifest["annotations"] = self._ro_annotations() + + json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False) + rel_path = str(PurePosixPath(METADATA)/filename) + json_manifest += "\n" + with self.write_bag_file(rel_path) as manifest_file: + manifest_file.write(json_manifest) + + def _write_bag_info(self): + # type: () -> None + + with self.write_bag_file("bag-info.txt") as info_file: + info_file.write(u"Bag-Software-Agent: %s\n" % self.cwltool_version) + # FIXME: require sha-512 of payload to comply with profile? + # FIXME: Update profile + info_file.write(u"BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n") + info_file.write(u"Bagging-Date: %s\n" % datetime.date.today().isoformat()) + info_file.write(u"External-Description: Research Object of CWL workflow run\n") + if self.full_name: + info_file.write(u"Contact-Name: %s\n" % self.full_name) + + # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity) + # as identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good. + info_file.write(u"External-Identifier: %s\n" % self.base_uri) + + # Calculate size of data/ (assuming no external fetch.txt files) + total_size = sum(self.bagged_size.values()) + num_files = len(self.bagged_size) + info_file.write(u"Payload-Oxum: %d.%d\n" % (total_size, num_files)) + _logger.debug(u"[provenance] Generated bagit metadata: %s", + self.folder) + + def generate_snapshot(self, prov_dep): + # type: (MutableMapping[Text, Any]) -> None + """Copy all of the CWL files to the snapshot/ directory.""" + self.self_check() + for key, value in prov_dep.items(): + if key == "location" and value.split("/")[-1]: + filename = value.split("/")[-1] + path = os.path.join(self.folder, SNAPSHOT, filename) + filepath = '' + if "file://" in value: + filepath = value[7:] + else: + filepath = value + + # FIXME: What if destination path already exists? 
+ if os.path.exists(filepath): + try: + if os.path.isdir(filepath): + shutil.copytree(filepath, path) + else: + shutil.copy(filepath, path) + timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(filepath)) + self.add_tagfile(path, timestamp) + except PermissionError: + pass # FIXME: avoids duplicate snapshotting; need better solution + elif key in ("secondaryFiles", "listing"): + for files in value: + if isinstance(files, MutableMapping): + self.generate_snapshot(files) + else: + pass + + def packed_workflow(self, packed): # type: (Text) -> None + """Pack CWL description to generate re-runnable CWL object in RO.""" + self.self_check() + rel_path = str(PurePosixPath(WORKFLOW)/"packed.cwl") + # Write as binary + with self.write_bag_file(rel_path, encoding=None) as write_pack: + # YAML is always UTF8, but json.dumps gives us str in py2 + write_pack.write(packed.encode(ENCODING)) + _logger.debug(u"[provenance] Added packed workflow: %s", rel_path) + + def has_data_file(self, sha1hash): # type: (str) -> bool + """Confirm the presence of the given file in the RO.""" + folder = os.path.join(self.folder, DATA, sha1hash[0:2]) + hash_path = os.path.join(folder, sha1hash) + return os.path.isfile(hash_path) + + def add_data_file(self, from_fp, timestamp=None, content_type=None): + # type: (IO[Any], Optional[datetime.datetime], Optional[str]) -> Text + """Copy inputs to data/ folder.""" + self.self_check() + tmp_dir, tmp_prefix = os.path.split(self.temp_prefix) + with tempfile.NamedTemporaryFile( + prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp: + checksum = checksum_copy(from_fp, tmp) + + # Calculate hash-based file path + folder = os.path.join(self.folder, DATA, checksum[0:2]) + path = os.path.join(folder, checksum) + # os.rename assumed safe, as our temp file should + # be in same file system as our temp folder + if not os.path.isdir(folder): + os.makedirs(folder) + os.rename(tmp.name, path) + + # Relative posix path + # (to avoid \ on Windows) + rel_path = _posix_path(os.path.relpath(path, self.folder)) + + # Register in bagit checksum + if Hasher == hashlib.sha1: + self._add_to_bagit(rel_path, sha1=checksum) + else: + _logger.warning( + u"[provenance] Unknown hash method %s for bagit manifest", + Hasher) + # Inefficient, bagit support need to checksum again + self._add_to_bagit(rel_path) + _logger.debug(u"[provenance] Added data file %s", path) + if timestamp is not None: + self._file_provenance[rel_path] = self._self_made(timestamp) + _logger.debug(u"[provenance] Relative path for data file %s", rel_path) + + if content_type is not None: + self._content_types[rel_path] = content_type + return rel_path + + def _self_made(self, timestamp=None): + # type: (Optional[datetime.datetime]) -> Dict[Text, Any] + if timestamp is None: + timestamp = datetime.datetime.now() + return { + "createdOn": timestamp.isoformat(), + "createdBy": {"uri": self.engine_uuid, + "name": self.cwltool_version} + } + + def add_to_manifest(self, rel_path, checksums): + # type: (Text, Dict[str,str]) -> None + """Add files to the research object manifest.""" + self.self_check() + if PurePosixPath(rel_path).is_absolute(): + raise ValueError("rel_path must be relative: %s" % rel_path) + + if os.path.commonprefix(["data/", rel_path]) == "data/": + # payload file, go to manifest + manifest = "manifest" + else: + # metadata file, go to tag manifest + manifest = "tagmanifest" + + # Add checksums to corresponding manifest files + for (method, hash_value) in checksums.items(): + # File not in manifest because we bailed out 
on + # existence in bagged_size above + manifestpath = os.path.join( + self.folder, "%s-%s.txt" % (manifest, method.lower())) + # encoding: match Tag-File-Character-Encoding: UTF-8 + # newline: ensure LF also on Windows + with open(manifestpath, "a", encoding=ENCODING, newline='\n') \ + as checksum_file: + line = u"%s %s\n" % (hash_value, rel_path) + _logger.debug(u"[provenance] Added to %s: %s", manifestpath, line) + checksum_file.write(line) + + + def _add_to_bagit(self, rel_path, **checksums): + # type: (Text, Any) -> None + if PurePosixPath(rel_path).is_absolute(): + raise ValueError("rel_path must be relative: %s" % rel_path) + local_path = os.path.join(self.folder, _local_path(rel_path)) + if not os.path.exists(local_path): + raise IOError("File %s does not exist within RO: %s" % (rel_path, local_path)) + + if rel_path in self.bagged_size: + # Already added, assume checksum OK + return + self.bagged_size[rel_path] = os.path.getsize(local_path) + + if SHA1 not in checksums: + # ensure we always have sha1 + checksums = dict(checksums) + with open(local_path, "rb") as file_path: + # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? + checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) + + self.add_to_manifest(rel_path, checksums) + + def create_job(self, + builder_job, # type: Dict[Text, Any] + wf_job=None, # type: Optional[Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]] + is_output=False # type: bool + ): # type: (...) -> Dict[Text, Text] + #TODO customise the file + """Generate the new job object with RO specific relative paths.""" + copied = copy.deepcopy(builder_job) + relativised_input_objecttemp = {} # type: Dict[Text, Any] + self._relativise_files(copied) + def jdefault(o): # type: (Any) -> Dict[Any, Any] + return dict(o) + if is_output: + rel_path = PurePosixPath(WORKFLOW)/"primary-output.json" + else: + rel_path = PurePosixPath(WORKFLOW)/"primary-job.json" + j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault) + with self.write_bag_file(str(rel_path)) as file_path: + file_path.write(j + u"\n") + _logger.debug(u"[provenance] Generated customised job file: %s", + rel_path) + # Generate dictionary with keys as workflow level input IDs and values + # as + # 1) for files the relativised location containing hash + # 2) for other attributes, the actual value. 
+ relativised_input_objecttemp = {} + for key, value in copied.items(): + if isinstance(value, MutableMapping): + if value.get("class") in ("File", "Directory"): + relativised_input_objecttemp[key] = value + else: + relativised_input_objecttemp[key] = value + self.relativised_input_object.update( + {k: v for k, v in relativised_input_objecttemp.items() if v}) + return self.relativised_input_object + + def _relativise_files(self, structure): + # type: (Dict[Any, Any]) -> None + """Save any file objects into the RO and update the local paths.""" + # Base case - we found a File we need to update + _logger.debug(u"[provenance] Relativising: %s", structure) + + if isinstance(structure, MutableMapping): + if structure.get("class") == "File": + relative_path = None + if "checksum" in structure: + alg, checksum = structure["checksum"].split("$") + if alg != SHA1: + raise TypeError( + "Only SHA1 CWL checksums are currently supported: " + "{}".format(structure)) + if self.has_data_file(checksum): + prefix = checksum[0:2] + relative_path = PurePosixPath("data")/prefix/checksum + + if not relative_path is not None and "location" in structure: + # Register in RO; but why was this not picked + # up by used_artefacts? + _logger.info("[provenance] Adding to RO %s", structure["location"]) + with self.fsaccess.open(structure["location"], "rb") as fp: + relative_path = self.add_data_file(fp) + checksum = PurePosixPath(relative_path).name + structure["checksum"] = "%s$%s" % (SHA1, checksum) + if relative_path is not None: + # RO-relative path as new location + structure["location"] = str(PurePosixPath("..")/relative_path) + else: + _logger.warning("Could not determine RO path for file %s", structure) + if "path" in structure: + del structure["path"] + + if structure.get("class") == "Directory": + # TODO: Generate anonymoys Directory with a "listing" + # pointing to the hashed files + del structure["location"] + + for val in structure.values(): + try: + self._relativise_files(val) + except OSError: + pass + return + + if isinstance(structure, (str, Text)): + # Just a string value, no need to iterate further + return + try: + for obj in iter(structure): + # Recurse and rewrite any nested File objects + self._relativise_files(obj) + except TypeError: + pass + + def close(self, save_to=None): + # type: (Optional[str]) -> None + """Close the Research Object, optionally saving to specified folder. + + Closing will remove any temporary files used by this research object. + After calling this method, this ResearchObject instance can no longer + be used, except for no-op calls to .close(). + + The 'saveTo' folder should not exist - if it does, it will be deleted. + + It is safe to call this function multiple times without the + 'saveTo' argument, e.g. within a try..finally block to + ensure the temporary files of this Research Object are removed. + """ + if save_to is None: + if not self.closed: + _logger.debug(u"[provenance] Deleting temporary %s", self.folder) + shutil.rmtree(self.folder, ignore_errors=True) + else: + save_to = os.path.abspath(save_to) + _logger.info(u"[provenance] Finalizing Research Object") + self._finalize() # write manifest etc. + # TODO: Write as archive (.zip or .tar) based on extension? 
+ + if os.path.isdir(save_to): + _logger.info(u"[provenance] Deleting existing %s", save_to) + shutil.rmtree(save_to) + shutil.move(self.folder, save_to) + _logger.info(u"[provenance] Research Object saved to %s", save_to) + self.folder = save_to + self.closed = True + +def checksum_copy(src_file, # type: IO[Any] + dst_file=None, # type: Optional[IO[Any]] + hasher=Hasher, # type: Callable[[], hashlib._Hash] + buffersize=1024*1024 # type: int + ): # type: (...) -> str + """Compute checksums while copying a file.""" + # TODO: Use hashlib.new(Hasher_str) instead? + checksum = hasher() + contents = src_file.read(buffersize) + if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): + temp_location = os.path.join(os.path.dirname(dst_file.name), + str(uuid.uuid4())) + try: + os.rename(dst_file.name, temp_location) + os.link(src_file.name, dst_file.name) + dst_file = None + os.unlink(temp_location) + except OSError: + pass + if os.path.exists(temp_location): + os.rename(temp_location, dst_file.name) # type: ignore + while contents != b"": + if dst_file is not None: + dst_file.write(contents) + checksum.update(contents) + contents = src_file.read(buffersize) + if dst_file is not None: + dst_file.flush() + return checksum.hexdigest().lower() + +def copy_job_order(job, job_order_object): + # type: (Any, Any) -> Any + """Create copy of job object for provenance.""" + if not hasattr(job, "tool"): + # direct command line tool execution + return job_order_object + customised_job = {} # new job object for RO + for each, i in enumerate(job.tool["inputs"]): + with SourceLine(job.tool["inputs"], each, WorkflowException, + _logger.isEnabledFor(logging.DEBUG)): + iid = shortname(i["id"]) + if iid in job_order_object: + customised_job[iid] = copy.deepcopy(job_order_object[iid]) + # add the input element in dictionary for provenance + elif "default" in i: + customised_job[iid] = copy.deepcopy(i["default"]) + # add the default elements in the dictionary for provenance + else: + pass + return customised_job
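The ORCID handling above (`_check_mod_11_2` and `_valid_orcid`) normalises identifiers before they are recorded as the creator of the Research Object. A minimal sketch of the expected behaviour, assuming this bundled module is importable as `cwltool.provenance`:

```python
# Sketch only: exercises the ORCID helpers defined in provenance.py above.
from cwltool.provenance import _check_mod_11_2, _valid_orcid

# 0000-0002-1825-0097 is the example identifier from the docstring above;
# its final digit is a valid ISO/IEC 7064 MOD-11-2 check digit.
assert _check_mod_11_2("0000-0002-1825-0097")

# Any of the accepted prefixes normalise to the canonical https:// form.
print(_valid_orcid("orcid.org/0000-0002-1825-0097"))
# https://orcid.org/0000-0002-1825-0097

try:
    _valid_orcid("0000-0002-1825-0098")  # wrong check digit
except ValueError as err:
    print(err)  # "Invalid ORCID checksum: ..."
```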
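`checksum_copy()` near the end of the module is the building block for the content-addressed `data/` payload: it hashes a stream while (optionally) copying it, either by writing through or by hard-linking when both sides are named files. A small usage sketch, with hypothetical file names:

```python
# Sketch only: file names are hypothetical and must exist locally.
import hashlib
from cwltool.provenance import checksum_copy

with open("input.dat", "rb") as src, open("copy.dat", "wb") as dst:
    sha1 = checksum_copy(src, dst)  # default hasher is hashlib.sha1
print(sha1)  # lower-case hex digest of input.dat

with open("input.dat", "rb") as src:
    sha256 = checksum_copy(src, hasher=hashlib.sha256)  # hash only, no copy
print(sha256)
```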
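Putting the pieces together: a `ResearchObject` is created around a `StdFsAccess`, payload files are added under their SHA-1 hash, and `close()` either discards the temporary folder or finalises the BagIt manifests into a target directory. The sketch below only exercises that plumbing (no workflow run); the person's name, input file and cleanup path are hypothetical:

```python
# Sketch only: assumes cwltool (with this provenance module) is installed.
from cwltool.provenance import ResearchObject
from cwltool.stdfsaccess import StdFsAccess

ro = ResearchObject(StdFsAccess(""),
                    orcid="0000-0002-1825-0097",  # normalised via _valid_orcid()
                    full_name="Jane Doe")         # hypothetical creator name
try:
    with open("input.dat", "rb") as fp:           # hypothetical payload file
        rel_path = ro.add_data_file(fp)           # copied to data/<xx>/<sha1>
    print(rel_path)
finally:
    # close() without save_to just removes the temporary folder;
    # close(save_to="my_ro") would write the manifests and move the RO there.
    ro.close()
```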
