shellac / guppy_basecaller: env/lib/python3.7/site-packages/cwltool/provenance.py @ 0:26e78fe6e8c4 (draft)
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children | |
| 1 """Stores Research Object including provenance.""" | |
| 2 from __future__ import absolute_import | |
| 3 | |
| 4 import copy | |
| 5 import datetime | |
| 6 import hashlib | |
| 7 import logging | |
| 8 import os | |
| 9 import re | |
| 10 import shutil | |
| 11 import tempfile | |
| 12 import uuid | |
| 13 from collections import OrderedDict | |
| 14 from getpass import getuser | |
| 15 from io import BytesIO, FileIO, TextIOWrapper, open | |
| 16 from socket import getfqdn | |
| 17 from typing import (IO, Any, Callable, Dict, List, Generator, | |
| 18 MutableMapping, Optional, Set, Tuple, Union, cast) | |
| 19 from types import ModuleType | |
| 20 | |
| 21 import prov.model as provM | |
| 22 import six | |
| 23 from prov.identifier import Identifier, Namespace | |
| 24 from prov.model import (PROV, ProvActivity, # pylint: disable=unused-import | |
| 25 ProvDocument, ProvEntity) | |
| 26 from pathlib2 import Path, PurePosixPath, PurePath | |
| 27 from ruamel import yaml | |
| 28 from schema_salad.sourceline import SourceLine | |
| 29 from six.moves import urllib | |
| 30 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import | |
| 31 Text) | |
| 32 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
| 33 | |
| 34 from .context import RuntimeContext # pylint: disable=unused-import | |
| 35 from .errors import WorkflowException | |
| 36 from .loghandler import _logger | |
| 37 from .pathmapper import get_listing | |
| 38 from .process import Process, shortname # pylint: disable=unused-import | |
| 39 from .stdfsaccess import StdFsAccess # pylint: disable=unused-import | |
| 40 from .utils import json_dumps, versionstring, onWindows | |
| 41 | |
| 42 | |
| 43 # imports needed for retrieving user data | |
| 44 if onWindows(): | |
| 45 import ctypes # pylint: disable=unused-import | |
| 46 else: | |
| 47 try: | |
| 48 import pwd # pylint: disable=unused-import | |
| 49 except ImportError: | |
| 50 pass | |
| 51 | |
| 52 if TYPE_CHECKING: | |
| 53 from .command_line_tool import CommandLineTool, ExpressionTool # pylint: disable=unused-import | |
| 54 from .workflow import Workflow # pylint: disable=unused-import | |
| 55 | |
| 56 if six.PY2: | |
| 57 class PermissionError(OSError): # pylint: disable=redefined-builtin | |
| 58 """Needed for Python2.""" | |
| 59 | |
| 60 pass | |
| 61 __citation__ = "https://doi.org/10.5281/zenodo.1208477" | |
| 62 | |
| 63 # NOTE: Semantic versioning of the CWLProv Research Object | |
| 64 # **and** the cwlprov files | |
| 65 # | |
| 66 # Rough guide (major.minor.patch): | |
| 67 # 1. Bump major number if removing/"breaking" resources or PROV statements | |
| 68 # 2. Bump minor number if adding resources or PROV statements | |
| 69 # 3. Bump patch number for non-breaking non-adding changes, | |
| 70 # e.g. fixing broken relative paths | |
| 71 CWLPROV_VERSION = "https://w3id.org/cwl/prov/0.6.0" | |
| 72 | |
| 73 # Research Object folders | |
| 74 METADATA = "metadata" | |
| 75 DATA = "data" | |
| 76 WORKFLOW = "workflow" | |
| 77 SNAPSHOT = "snapshot" | |
| 78 # sub-folders | |
| 79 MAIN = os.path.join(WORKFLOW, "main") | |
| 80 PROVENANCE = os.path.join(METADATA, "provenance") | |
| 81 LOGS = os.path.join(METADATA, "logs") | |
| 82 WFDESC = Namespace("wfdesc", 'http://purl.org/wf4ever/wfdesc#') | |
| 83 WFPROV = Namespace("wfprov", 'http://purl.org/wf4ever/wfprov#') | |
| 84 WF4EVER = Namespace("wf4ever", 'http://purl.org/wf4ever/wf4ever#') | |
| 85 RO = Namespace("ro", 'http://purl.org/wf4ever/ro#') | |
| 86 ORE = Namespace("ore", 'http://www.openarchives.org/ore/terms/') | |
| 87 FOAF = Namespace("foaf", 'http://xmlns.com/foaf/0.1/') | |
| 88 SCHEMA = Namespace("schema", 'http://schema.org/') | |
| 89 CWLPROV = Namespace('cwlprov', 'https://w3id.org/cwl/prov#') | |
| 90 ORCID = Namespace("orcid", "https://orcid.org/") | |
| 91 UUID = Namespace("id", "urn:uuid:") | |
| 92 | |
| 93 # BagIt and YAML always use UTF-8 | |
| 94 ENCODING = "UTF-8" | |
| 95 TEXT_PLAIN = 'text/plain; charset="%s"' % ENCODING | |
| 96 | |
| 97 # sha1, compatible with the File type's "checksum" field | |
| 98 # e.g. "checksum" = "sha1$47a013e660d408619d894b20806b1d5086aab03b" | |
| 99 # See ./cwltool/schemas/v1.0/Process.yml | |
| 100 Hasher = hashlib.sha1 | |
| 101 SHA1 = "sha1" | |
| 102 SHA256 = "sha256" | |
| 103 SHA512 = "sha512" | |
| 104 | |
| 105 # TODO: Better identifiers for user, at least | |
| 106 # these should be preserved in ~/.config/cwl for every execution | |
| 107 # on this host | |
| 108 USER_UUID = uuid.uuid4().urn | |
| 109 ACCOUNT_UUID = uuid.uuid4().urn | |
| 110 | |
| 111 | |
| 112 def _posix_path(local_path): | |
| 113 # type: (Text) -> Text | |
| 114 return str(PurePosixPath(Path(local_path))) | |
| 115 | |
| 116 | |
| 117 def _local_path(posix_path): | |
| 118 # type: (Text) -> Text | |
| 119 return str(Path(posix_path)) | |
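
# Illustrative example (behaviour is host-dependent): on Windows,
# _posix_path("data\\de\\abc") -> "data/de/abc", while _local_path()
# maps the POSIX form back to the platform's own separators.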


def _whoami():
    # type: () -> Tuple[Text,Text]
    """Return the current operating system account as (username, fullname)."""
    username = getuser()
    try:
        if onWindows():
            get_user_name = ctypes.windll.secur32.GetUserNameExW  # type: ignore
            size = ctypes.pointer(ctypes.c_ulong(0))
            get_user_name(3, None, size)

            name_buffer = ctypes.create_unicode_buffer(size.contents.value)
            get_user_name(3, name_buffer, size)
            fullname = str(name_buffer.value)
        else:
            fullname = pwd.getpwuid(os.getuid())[4].split(',')[0]
    except (KeyError, IndexError):
        fullname = username

    return (username, fullname)


class WritableBagFile(FileIO):
    """Writes files in research object."""

    def __init__(self, research_object, rel_path):
        # type: (ResearchObject, Text) -> None
        """Initialize an ROBagIt."""
        self.research_object = research_object
        if Path(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        self.rel_path = rel_path
        self.hashes = {SHA1: hashlib.sha1(),  # nosec
                       SHA256: hashlib.sha256(),
                       SHA512: hashlib.sha512()}
        # Open file in Research Object folder
        path = os.path.abspath(os.path.join(research_object.folder, _local_path(rel_path)))
        if not path.startswith(os.path.abspath(research_object.folder)):
            raise ValueError("Path is outside Research Object: %s" % path)
        super(WritableBagFile, self).__init__(str(path), mode="w")

    def write(self, b):
        # type: (Union[bytes, Text]) -> int
        if isinstance(b, bytes):
            real_b = b
        else:
            real_b = b.encode('utf-8')
        total = 0
        length = len(real_b)
        while total < length:
            # Write any not-yet-flushed bytes; FileIO.write may be partial
            ret = super(WritableBagFile, self).write(real_b[total:])
            if ret:
                total += ret
        # Update every checksum once with the full buffer
        for checksum_hash in self.hashes.values():
            checksum_hash.update(real_b)
        return total

    def close(self):  # type: () -> None
        # FIXME: Convert below block to a ResearchObject method?
        if self.rel_path.startswith("data/"):
            self.research_object.bagged_size[self.rel_path] = self.tell()
        else:
            self.research_object.tagfiles.add(self.rel_path)

        super(WritableBagFile, self).close()
        # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" }
        checksums = {}
        for name in self.hashes:
            checksums[name] = self.hashes[name].hexdigest().lower()
        self.research_object.add_to_manifest(self.rel_path, checksums)

    # To simplify our hash calculation we won't support
    # seeking, reading or truncating, as we can't do
    # similar seeks in the current hash.
    # TODO: Support these? At the expense of invalidating
    # the current hash, then having to recalculate at close()
    def seekable(self):  # type: () -> bool
        return False

    def readable(self):  # type: () -> bool
        return False

    def truncate(self, size=None):
        # type: (Optional[int]) -> int
        # FIXME: This breaks the IOBase contract,
        # as it means we would have to recalculate the hash
        if size is not None:
            raise IOError("WritableBagFile can't truncate")
        return self.tell()


def _check_mod_11_2(numeric_string):
    # type: (Text) -> bool
    """
    Validate numeric_string for its MOD-11-2 checksum.

    Any "-" characters in the numeric_string are ignored.

    The last digit of numeric_string is assumed to be the checksum, 0-9 or X.

    See ISO/IEC 7064:2003 and
    https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    """
    # Strip -
    nums = numeric_string.replace("-", "")
    total = 0
    # skip the last (check) digit
    for num in nums[:-1]:
        digit = int(num)
        total = (total + digit) * 2
    remainder = total % 11
    result = (12 - remainder) % 11
    if result == 10:
        checkdigit = "X"
    else:
        checkdigit = str(result)
    # Compare against the last digit or X
    return nums[-1].upper() == checkdigit

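# Worked example (illustrative): for the well-known sample ORCID number
# 0000-0002-1825-0097, the running total over 000000021825009 ends at 1314,
# 1314 % 11 == 5, and (12 - 5) % 11 == 7, matching the final digit:
#
#     _check_mod_11_2("0000-0002-1825-0097")  # -> True
#     _check_mod_11_2("0000-0002-1825-0098")  # -> False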

def _valid_orcid(orcid):  # type: (Optional[Text]) -> Text
    """
    Ensure orcid is a valid ORCID identifier.

    The string must be equivalent to one of these forms:

    0000-0002-1825-0097
    orcid.org/0000-0002-1825-0097
    http://orcid.org/0000-0002-1825-0097
    https://orcid.org/0000-0002-1825-0097

    If the ORCID number or prefix is invalid, a ValueError is raised.

    The returned ORCID string is always in the form of:
    https://orcid.org/0000-0002-1825-0097
    """
    if orcid is None or not orcid:
        raise ValueError(u'ORCID cannot be unspecified')
    # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x
    orcid = orcid.lower()
    match = re.match(
        # Note: concatenated r"" r"" below so we can add comments to pattern

        # Optional hostname, with or without protocol
        r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?"
        # alternative pattern, but probably messier
        # r"^((https?://)?orcid.org/)?"

        # ORCID number is always 4x4 numerical digits,
        # but the last digit (modulus 11 checksum)
        # can also be X (but we made it lowercase above).
        # e.g. 0000-0002-1825-0097
        # or 0000-0002-1694-233x
        r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$",
        orcid)

    help_url = u"https://support.orcid.org/knowledgebase/articles/" \
               "116780-structure-of-the-orcid-identifier"
    if not match:
        raise ValueError(u"Invalid ORCID: %s\n%s" % (orcid, help_url))

    # Conservative in what we produce:
    # a) Ensure any checksum digit is uppercase
    orcid_num = match.group("orcid").upper()
    # b) ..and correct
    if not _check_mod_11_2(orcid_num):
        raise ValueError(
            u"Invalid ORCID checksum: %s\n%s" % (orcid_num, help_url))

    # c) Re-add the official prefix https://orcid.org/
    return u"https://orcid.org/%s" % orcid_num

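# Illustrative normalization: any accepted form is canonicalized, e.g.
#
#     _valid_orcid("orcid.org/0000-0002-1825-0097")
#     # -> "https://orcid.org/0000-0002-1825-0097"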

class ProvenanceProfile():
    """
    Provenance profile.

    Populated as the workflow runs.
    """

    def __init__(self,
                 research_object,  # type: ResearchObject
                 full_name,        # type: str
                 host_provenance,  # type: bool
                 user_provenance,  # type: bool
                 orcid,            # type: str
                 fsaccess,         # type: StdFsAccess
                 run_uuid=None     # type: Optional[uuid.UUID]
                ):  # type: (...) -> None
        """Initialize the provenance profile."""
        self.fsaccess = fsaccess
        self.orcid = orcid
        self.research_object = research_object
        self.folder = self.research_object.folder
        self.document = ProvDocument()
        self.host_provenance = host_provenance
        self.user_provenance = user_provenance
        self.engine_uuid = research_object.engine_uuid
        self.add_to_manifest = self.research_object.add_to_manifest
        if self.orcid:
            _logger.debug(u"[provenance] Creator ORCID: %s", self.orcid)
        self.full_name = full_name
        if self.full_name:
            _logger.debug(u"[provenance] Creator Full name: %s",
                          self.full_name)
        if run_uuid is None:
            run_uuid = uuid.uuid4()
        self.workflow_run_uuid = run_uuid
        self.workflow_run_uri = run_uuid.urn
        self.generate_prov_doc()

    def __str__(self):  # type: () -> str
        """Represent this Provenance profile as a string."""
        return "ProvenanceProfile <%s> in <%s>" % (
            self.workflow_run_uri, self.research_object)

    def generate_prov_doc(self):
        # type: () -> Tuple[str, ProvDocument]
        """Add basic namespaces."""
        def host_provenance(document):
            # type: (ProvDocument) -> None
            """Record host provenance."""
            document.add_namespace(CWLPROV)
            document.add_namespace(UUID)
            document.add_namespace(FOAF)

            hostname = getfqdn()
            # won't have a foaf:accountServiceHomepage for unix hosts, but
            # we can at least provide hostname
            document.agent(
                ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"],
                               "prov:location": hostname,
                               CWLPROV["hostname"]: hostname})

        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        self.document.add_namespace(
            'wfprov', 'http://purl.org/wf4ever/wfprov#')
        # document.add_namespace('prov', 'http://www.w3.org/ns/prov#')
        self.document.add_namespace(
            'wfdesc', 'http://purl.org/wf4ever/wfdesc#')
        # TODO: Make this ontology. For now only has cwlprov:image
        self.document.add_namespace('cwlprov', 'https://w3id.org/cwl/prov#')
        self.document.add_namespace('foaf', 'http://xmlns.com/foaf/0.1/')
        self.document.add_namespace('schema', 'http://schema.org/')
        self.document.add_namespace('orcid', 'https://orcid.org/')
        self.document.add_namespace('id', 'urn:uuid:')
        # NOTE: Internet draft expired 2004-03-04 (!)
        # https://tools.ietf.org/html/draft-thiemann-hash-urn-01
        # TODO: Change to nih:sha-256; hashes
        # https://tools.ietf.org/html/rfc6920#section-7
        self.document.add_namespace('data', 'urn:hash::sha1:')
        # Also needed for docker images
        self.document.add_namespace(SHA256, "nih:sha-256;")

        # info only, won't really be used by prov as sub-resources use /
        self.document.add_namespace(
            'researchobject', self.research_object.base_uri)
        # annotations
        self.metadata_ns = self.document.add_namespace(
            'metadata', self.research_object.base_uri + METADATA + "/")
        # Pre-register provenance directory so we can refer to its files
        self.provenance_ns = self.document.add_namespace(
            'provenance', self.research_object.base_uri
            + _posix_path(PROVENANCE) + "/")
        ro_identifier_workflow = self.research_object.base_uri \
            + "workflow/packed.cwl#"
        self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow)
        ro_identifier_input = self.research_object.base_uri \
            + "workflow/primary-job.json#"
        self.document.add_namespace("input", ro_identifier_input)

        # More info about the account (e.g. username, fullname)
        # may or may not have been previously logged by user_provenance()
        # .. but we always know cwltool was launched (directly or indirectly)
        # by a user account, as cwltool is a command line tool
        account = self.document.agent(ACCOUNT_UUID)
        if self.orcid or self.full_name:
            person = {provM.PROV_TYPE: PROV["Person"],
                      "prov:type": SCHEMA["Person"]}
            if self.full_name:
                person["prov:label"] = self.full_name
                person["foaf:name"] = self.full_name
                person["schema:name"] = self.full_name
            else:
                # TODO: Look up name from ORCID API?
                pass
            agent = self.document.agent(self.orcid or uuid.uuid4().urn,
                                        person)
            self.document.actedOnBehalfOf(account, agent)
        else:
            if self.host_provenance:
                host_provenance(self.document)
            if self.user_provenance:
                self.research_object.user_provenance(self.document)
        # The execution of cwltool
        wfengine = self.document.agent(
            self.engine_uuid,
            {provM.PROV_TYPE: PROV["SoftwareAgent"],
             "prov:type": WFPROV["WorkflowEngine"],
             "prov:label": self.cwltool_version})
        # FIXME: This datetime will be a bit too delayed, we should
        # capture when cwltool.py earliest started?
        self.document.wasStartedBy(
            wfengine, None, account, datetime.datetime.now())
        # define workflow run level activity
        self.document.activity(
            self.workflow_run_uri, datetime.datetime.now(), None,
            {provM.PROV_TYPE: WFPROV["WorkflowRun"],
             "prov:label": "Run of workflow/packed.cwl#main"})
        # association between SoftwareAgent and WorkflowRun
        main_workflow = "wf:main"
        self.document.wasAssociatedWith(
            self.workflow_run_uri, self.engine_uuid, main_workflow)
        self.document.wasStartedBy(
            self.workflow_run_uri, None, self.engine_uuid,
            datetime.datetime.now())
        return (self.workflow_run_uri, self.document)

    def evaluate(self,
                 process,           # type: Process
                 job,               # type: Any
                 job_order_object,  # type: Dict[Text, Text]
                 research_obj       # type: ResearchObject
                ):  # type: (...) -> None
        """Evaluate the nature of the job."""
        if not hasattr(process, "steps"):
            # record provenance of independent commandline tool executions
            self.prospective_prov(job)
            customised_job = copy_job_order(job, job_order_object)
            self.used_artefacts(customised_job, self.workflow_run_uri)
            research_obj.create_job(customised_job, job)
        elif hasattr(job, "workflow"):
            # record provenance of workflow executions
            self.prospective_prov(job)
            customised_job = copy_job_order(job, job_order_object)
            self.used_artefacts(customised_job, self.workflow_run_uri)

    def record_process_start(self, process, job, process_run_id=None):
        # type: (Process, Any, Optional[str]) -> Optional[str]
        if not hasattr(process, 'steps'):
            process_run_id = self.workflow_run_uri
        elif not hasattr(job, 'workflow'):
            # commandline tool execution as part of a workflow
            name = str(job.name) if hasattr(job, 'name') else ''
            process_name = urllib.parse.quote(name, safe=":/,#")
            process_run_id = self.start_process(process_name, datetime.datetime.now())
        return process_run_id

    def start_process(self, process_name, when, process_run_id=None):
        # type: (Text, datetime.datetime, Optional[str]) -> str
        """Record the start of each Process."""
        if process_run_id is None:
            process_run_id = uuid.uuid4().urn
        prov_label = "Run of workflow/packed.cwl#main/" + process_name
        self.document.activity(
            process_run_id, None, None,
            {provM.PROV_TYPE: WFPROV["ProcessRun"],
             provM.PROV_LABEL: prov_label})
        self.document.wasAssociatedWith(
            process_run_id, self.engine_uuid, str("wf:main/" + process_name))
        self.document.wasStartedBy(
            process_run_id, None, self.workflow_run_uri,
            when, None, None)
        return process_run_id

    def record_process_end(self, process_name, process_run_id, outputs, when):
        # type: (Text, str, Any, datetime.datetime) -> None
        self.generate_output_prov(outputs, process_run_id, process_name)
        self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)

    def declare_file(self, value):
        # type: (MutableMapping[Text, Any]) -> Tuple[ProvEntity, ProvEntity, str]
        if value["class"] != "File":
            raise ValueError("Must have class:File: %s" % value)
        # Need to determine file hash aka RO filename
        entity = None  # type: Optional[ProvEntity]
        checksum = None
        if 'checksum' in value:
            csum = value['checksum']
            (method, checksum) = csum.split("$", 1)
            if method == SHA1 and \
                    self.research_object.has_data_file(checksum):
                entity = self.document.entity("data:" + checksum)

        if not entity and 'location' in value:
            location = str(value['location'])
            # If we made it here, we'll have to add it to the RO
            with self.fsaccess.open(location, "rb") as fhandle:
                relative_path = self.research_object.add_data_file(fhandle)
                # FIXME: This naively relies on add_data_file setting hash as filename
                checksum = PurePath(relative_path).name
                entity = self.document.entity(
                    "data:" + checksum, {provM.PROV_TYPE: WFPROV["Artifact"]})
                if "checksum" not in value:
                    value["checksum"] = "%s$%s" % (SHA1, checksum)

        if not entity and 'contents' in value:
            # Anonymous file, add content as string
            entity, checksum = self.declare_string(value["contents"])

        # By here one of them should have worked!
        if not entity or not checksum:
            raise ValueError("class:File but missing checksum/location/content: %r" % value)

        # Track filename and extension, this is generally useful only for
        # secondaryFiles. Note that multiple uses of a file might thus record
        # different names for the same entity, so we'll
        # make/track a specialized entity by UUID
        file_id = value.setdefault("@id", uuid.uuid4().urn)
        # A specialized entity that has just these names
        file_entity = self.document.entity(
            file_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
                      (provM.PROV_TYPE, WF4EVER["File"])])  # type: ProvEntity

        if "basename" in value:
            file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
        if "nameroot" in value:
            file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
        if "nameext" in value:
            file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
        self.document.specializationOf(file_entity, entity)

        # Check for secondaries
        for sec in value.get("secondaryFiles", ()):
            # TODO: Record these in a specializationOf entity with UUID?
            if sec['class'] == "File":
                (sec_entity, _, _) = self.declare_file(sec)
            elif sec['class'] == "Directory":
                sec_entity = self.declare_directory(sec)
            else:
                raise ValueError("Got unexpected secondaryFiles value: {}".format(sec))
            # We don't know how/when/where the secondary file was generated,
            # but the CWL convention is that it is a kind of summary/index
            # derived from the original file. As it's generally in a different
            # format, prov:Quotation is not appropriate.
            self.document.derivation(
                sec_entity, file_entity,
                other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]})

        return file_entity, entity, checksum

    def declare_directory(self, value):  # type: (MutableMapping[Text, Any]) -> ProvEntity
        """Register any nested files/directories."""
        # FIXME: Calculate a hash-like identifier for the directory
        # so we get the same value if it's the same filenames/hashes
        # in a different location.
        # For now, mint a new UUID to identify this directory, but
        # attempt to keep it inside the value dictionary
        dir_id = value.setdefault("@id", uuid.uuid4().urn)

        # New annotation file to keep the ORE Folder listing
        ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
        dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])

        coll = self.document.entity(
            dir_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
                     (provM.PROV_TYPE, PROV["Collection"]),
                     (provM.PROV_TYPE, PROV["Dictionary"]),
                     (provM.PROV_TYPE, RO["Folder"])])
        # ORE description of ro:Folder, saved separately
        coll_b = dir_bundle.entity(
            dir_id, [(provM.PROV_TYPE, RO["Folder"]),
                     (provM.PROV_TYPE, ORE["Aggregation"])])
        self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)

        # dir_manifest = dir_bundle.entity(
        #     dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
        #                             ORE["describes"]: coll_b.identifier})

        coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
        coll_b_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]

        # FIXME: .listing might not be populated yet - hopefully
        # a later call to this method will sort that
        is_empty = True

        if "listing" not in value:
            get_listing(self.fsaccess, value)
        for entry in value.get("listing", []):
            is_empty = False
            # Declare child-artifacts
            entity = self.declare_artefact(entry)
            self.document.membership(coll, entity)
            # Membership relation aka our ORE Proxy
            m_id = uuid.uuid4().urn
            m_entity = self.document.entity(m_id)
            m_b = dir_bundle.entity(m_id)

            # PROV-O style Dictionary
            # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
            # ..as prov.py does not currently allow PROV-N extensions
            # like hadDictionaryMember(..)
            m_entity.add_asserted_type(PROV["KeyEntityPair"])

            m_entity.add_attributes({
                PROV["pairKey"]: entry["basename"],
                PROV["pairEntity"]: entity,
            })

            # As well as being a
            # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
            m_b.add_asserted_type(RO["FolderEntry"])
            m_b.add_asserted_type(ORE["Proxy"])
            m_b.add_attributes({
                RO["entryName"]: entry["basename"],
                ORE["proxyIn"]: coll,
                ORE["proxyFor"]: entity,
            })
            coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
            coll_b_attribs.append((ORE["aggregates"], m_b))

        coll.add_attributes(coll_attribs)
        coll_b.add_attributes(coll_b_attribs)

        # Also save the ORE Folder as annotation metadata
        ore_doc = ProvDocument()
        ore_doc.add_namespace(ORE)
        ore_doc.add_namespace(RO)
        ore_doc.add_namespace(UUID)
        ore_doc.add_bundle(dir_bundle)
        ore_doc = ore_doc.flattened()
        ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
        with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
            ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
        self.research_object.add_annotation(dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri)

        if is_empty:
            # Empty directory
            coll.add_asserted_type(PROV["EmptyCollection"])
            coll.add_asserted_type(PROV["EmptyDictionary"])
        self.research_object.add_uri(coll.identifier.uri)
        return coll

    def declare_string(self, value):
        # type: (Union[Text, str]) -> Tuple[ProvEntity,Text]
        """Save as string in UTF-8."""
        byte_s = BytesIO(str(value).encode(ENCODING))
        data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN)
        checksum = PurePosixPath(data_file).name
        # FIXME: Don't naively assume add_data_file uses hash in filename!
        data_id = "data:%s" % PurePosixPath(data_file).stem
        entity = self.document.entity(
            data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
                      provM.PROV_VALUE: str(value)})  # type: ProvEntity
        return entity, checksum
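
    # Illustrative example (assuming add_data_file() names payload files by
    # their sha1, as the FIXME above notes): declare_string("hello") would
    # return checksum "aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d" plus a
    # matching "data:..." entity carrying the literal value.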

    def declare_artefact(self, value):
        # type: (Any) -> ProvEntity
        """Create data artefact entities for all file objects."""
        if value is None:
            # FIXME: If this can happen in CWL, we'll
            # need a better way to represent this in PROV
            return self.document.entity(
                CWLPROV["None"], {provM.PROV_LABEL: "None"})

        if isinstance(value, (bool, int, float)):
            # Typically used in job documents for flags

            # FIXME: Make consistent hash URIs for these
            # that somehow include the type
            # (so "1" != 1 != "1.0" != true)
            entity = self.document.entity(
                uuid.uuid4().urn, {provM.PROV_VALUE: value})
            self.research_object.add_uri(entity.identifier.uri)
            return entity

        if isinstance(value, (Text, str)):
            (entity, _) = self.declare_string(value)
            return entity

        if isinstance(value, bytes):
            # If we got here then we must be in Python 3
            byte_s = BytesIO(value)
            data_file = self.research_object.add_data_file(byte_s)
            # FIXME: Don't naively assume add_data_file uses hash in filename!
            data_id = "data:%s" % PurePosixPath(data_file).stem
            return self.document.entity(
                data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
                          provM.PROV_VALUE: str(value)})

        if isinstance(value, MutableMapping):
            if "@id" in value:
                # Already processed this value, but it might not be in this PROV
                entities = self.document.get_record(value["@id"])
                if entities:
                    return entities[0]
                # else, unknown in PROV, re-add below as if it's fresh

            # Base case - we found a File we need to update
            if value.get("class") == "File":
                (entity, _, _) = self.declare_file(value)
                value["@id"] = entity.identifier.uri
                return entity

            if value.get("class") == "Directory":
                entity = self.declare_directory(value)
                value["@id"] = entity.identifier.uri
                return entity
            coll_id = value.setdefault("@id", uuid.uuid4().urn)
            # some other kind of dictionary?
            # TODO: also save as JSON
            coll = self.document.entity(
                coll_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
                          (provM.PROV_TYPE, PROV["Collection"]),
                          (provM.PROV_TYPE, PROV["Dictionary"])])

            if value.get("class"):
                _logger.warning("Unknown data class %s.", value["class"])
                # FIXME: The class might be "http://example.com/somethingelse"
                coll.add_asserted_type(CWLPROV[value["class"]])

            # Let's iterate and recurse
            coll_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
            for (key, val) in value.items():
                v_ent = self.declare_artefact(val)
                self.document.membership(coll, v_ent)
                m_entity = self.document.entity(uuid.uuid4().urn)
                # Note: only support PROV-O style dictionary
                # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
                # as prov.py does not easily allow PROV-N extensions
                m_entity.add_asserted_type(PROV["KeyEntityPair"])
                m_entity.add_attributes({
                    PROV["pairKey"]: str(key),
                    PROV["pairEntity"]: v_ent
                })
                coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
            coll.add_attributes(coll_attribs)
            self.research_object.add_uri(coll.identifier.uri)
            return coll

        # some other kind of Collection?
        # TODO: also save as JSON
        try:
            members = []
            for each_input_obj in iter(value):
                # Recurse and register any nested objects
                e = self.declare_artefact(each_input_obj)
                members.append(e)

            # If we reached this, then we were allowed to iterate
            coll = self.document.entity(
                uuid.uuid4().urn, [(provM.PROV_TYPE, WFPROV["Artifact"]),
                                   (provM.PROV_TYPE, PROV["Collection"])])
            if not members:
                coll.add_asserted_type(PROV["EmptyCollection"])
            else:
                for member in members:
                    # FIXME: This won't preserve order, for that
                    # we would need to use PROV.Dictionary
                    # with numeric keys
                    self.document.membership(coll, member)
            self.research_object.add_uri(coll.identifier.uri)
            # FIXME: list value does not support adding "@id"
            return coll
        except TypeError:
            _logger.warning("Unrecognized type %s of %r",
                            type(value), value)
            # Let's just fall back to Python repr()
            entity = self.document.entity(
                uuid.uuid4().urn, {provM.PROV_LABEL: repr(value)})
            self.research_object.add_uri(entity.identifier.uri)
            return entity

    def used_artefacts(self,
                       job_order,       # type: Union[Dict[Any, Any], List[Dict[Any, Any]]]
                       process_run_id,  # type: str
                       name=None        # type: Optional[str]
                      ):  # type: (...) -> None
        """Add used() for each data artefact."""
        if isinstance(job_order, list):
            for entry in job_order:
                self.used_artefacts(entry, process_run_id, name)
        else:
            # FIXME: Use the workflow name in packed.cwl, "main" is wrong for nested workflows
            base = "main"
            if name is not None:
                base += "/" + name
            for key, value in job_order.items():
                prov_role = self.wf_ns["%s/%s" % (base, key)]
                try:
                    entity = self.declare_artefact(value)
                    self.document.used(
                        process_run_id, entity, datetime.datetime.now(), None,
                        {"prov:role": prov_role})
                except OSError:
                    pass

    def generate_output_prov(self,
                             final_output,    # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
                             process_run_id,  # type: Optional[str]
                             name             # type: Optional[Text]
                            ):  # type: (...) -> None
        """Call wasGeneratedBy() for each output and copy the files into the RO."""
        if isinstance(final_output, list):
            for entry in final_output:
                self.generate_output_prov(entry, process_run_id, name)
        else:
            # Timestamp should be created at the earliest
            timestamp = datetime.datetime.now()

            # For each output, find/register the corresponding
            # entity (UUID) and document it as generated in
            # a role corresponding to the output
            for output, value in final_output.items():
                entity = self.declare_artefact(value)
                if name is not None:
                    name = urllib.parse.quote(str(name), safe=":/,#")
                    # FIXME: Probably not "main" in nested workflows
                    role = self.wf_ns["main/%s/%s" % (name, output)]
                else:
                    role = self.wf_ns["main/%s" % output]

                if not process_run_id:
                    process_run_id = self.workflow_run_uri

                self.document.wasGeneratedBy(
                    entity, process_run_id, timestamp, None, {"prov:role": role})

    def prospective_prov(self, job):
        # type: (Any) -> None
        """Create prospective prov recording as wfdesc prov:Plan."""
        if not hasattr(job, "steps"):
            # direct command line tool execution
            self.document.entity(
                "wf:main", {provM.PROV_TYPE: WFDESC["Process"],
                            "prov:type": PROV["Plan"],
                            "prov:label": "Prospective provenance"})
            return

        self.document.entity(
            "wf:main", {provM.PROV_TYPE: WFDESC["Workflow"],
                        "prov:type": PROV["Plan"],
                        "prov:label": "Prospective provenance"})

        for step in job.steps:
            stepnametemp = "wf:main/" + str(step.name)[5:]
            stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
            step = self.document.entity(
                stepname, {provM.PROV_TYPE: WFDESC["Process"],
                           "prov:type": PROV["Plan"]})
            self.document.entity(
                "wf:main", {"wfdesc:hasSubProcess": step,
                            "prov:label": "Prospective provenance"})
        # TODO: Declare roles/parameters as well

    def activity_has_provenance(self, activity, prov_ids):
        # type: (str, List[Identifier]) -> None
        """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files."""
        # NOTE: The below will only work if the corresponding metadata/provenance arcp URI
        # is a pre-registered namespace in the PROV Document
        attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids]
        self.document.activity(activity, other_attributes=attribs)
        # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention
        # as prov:mentionOf() is only for entities, not activities
        uris = [i.uri for i in prov_ids]
        self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri)

    def finalize_prov_profile(self, name):
        # type: (Optional[Text]) -> List[Identifier]
        """Transfer the provenance related files to the RO."""
        # NOTE: Relative posix path
        if name is None:
            # master workflow, fixed filenames
            filename = "primary.cwlprov"
        else:
            # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json
            wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_")
            # Note that the above could cause overlaps for similarly named
            # workflows, but that's OK as we'll also include the run uuid,
            # which also covers the case of this step being run in
            # multiple places or iterations
            filename = "%s.%s.cwlprov" % (wf_name, self.workflow_run_uuid)

        basename = str(PurePosixPath(PROVENANCE) / filename)

        # TODO: Also support other profiles than CWLProv, e.g. ProvOne

        # list of prov identifiers of provenance files
        prov_ids = []

        # https://www.w3.org/TR/prov-xml/
        with self.research_object.write_bag_file(basename + ".xml") as provenance_file:
            self.document.serialize(provenance_file, format="xml", indent=4)
            prov_ids.append(self.provenance_ns[filename + ".xml"])

        # https://www.w3.org/TR/prov-n/
        with self.research_object.write_bag_file(basename + ".provn") as provenance_file:
            self.document.serialize(provenance_file, format="provn", indent=2)
            prov_ids.append(self.provenance_ns[filename + ".provn"])

        # https://www.w3.org/Submission/prov-json/
        with self.research_object.write_bag_file(basename + ".json") as provenance_file:
            self.document.serialize(provenance_file, format="json", indent=2)
            prov_ids.append(self.provenance_ns[filename + ".json"])

        # "rdf" aka https://www.w3.org/TR/prov-o/
        # which can be serialized to ttl/nt/jsonld (and more!)

        # https://www.w3.org/TR/turtle/
        with self.research_object.write_bag_file(basename + ".ttl") as provenance_file:
            self.document.serialize(provenance_file, format="rdf", rdf_format="turtle")
            prov_ids.append(self.provenance_ns[filename + ".ttl"])

        # https://www.w3.org/TR/n-triples/
        with self.research_object.write_bag_file(basename + ".nt") as provenance_file:
            self.document.serialize(provenance_file, format="rdf", rdf_format="ntriples")
            prov_ids.append(self.provenance_ns[filename + ".nt"])

        # https://www.w3.org/TR/json-ld/
        # TODO: Use a nice JSON-LD context
        # see also https://eprints.soton.ac.uk/395985/
        # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :(
        with self.research_object.write_bag_file(basename + ".jsonld") as provenance_file:
            self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld")
            prov_ids.append(self.provenance_ns[filename + ".jsonld"])

        _logger.debug(u"[provenance] added provenance: %s", prov_ids)
        return prov_ids


class ResearchObject():
    """CWLProv Research Object."""

    def __init__(self, fsaccess, temp_prefix_ro="tmp", orcid='', full_name=''):
        # type: (StdFsAccess, str, Text, Text) -> None
        """Initialize the ResearchObject."""
        self.temp_prefix = temp_prefix_ro
        self.orcid = '' if not orcid else _valid_orcid(orcid)
        self.full_name = full_name
        tmp_dir, tmp_prefix = os.path.split(temp_prefix_ro)
        self.folder = os.path.abspath(tempfile.mkdtemp(prefix=tmp_prefix,
                                                       dir=tmp_dir))  # type: Text
        self.closed = False
        # map of filename "data/de/alsdklkas": 12398123 bytes
        self.bagged_size = {}  # type: Dict[Text, int]
        self.tagfiles = set()  # type: Set[Text]
        self._file_provenance = {}  # type: Dict[Text, Dict[Text, Text]]
        self._external_aggregates = []  # type: List[Dict[Text, Text]]
        self.annotations = []  # type: List[Dict[Text, Any]]
        self._content_types = {}  # type: Dict[Text,str]
        self.fsaccess = fsaccess
        # These should be replaced by generate_prov_doc when workflow/run IDs are known:
        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
        self.ro_uuid = uuid.uuid4()
        self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
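        # e.g. "arcp://uuid,32e3b2f8-0000-0000-0000-000000000000/" (illustrative
        # UUID): an arcp URI gives every file in this RO a stable absolute URI
        # that is independent of where the bag is later unpacked.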
        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        ##
        self.relativised_input_object = {}  # type: Dict[Any, Any]

        self._initialize()
        _logger.debug(u"[provenance] Temporary research object: %s",
                      self.folder)

    def self_check(self):  # type: () -> None
        """Raise ValueError if this RO is closed."""
        if self.closed:
            raise ValueError(
                "This ResearchObject has already been closed and is not "
                "available for further manipulation.")

    def __str__(self):  # type: () -> str
        """Represent this RO as a string."""
        return "ResearchObject <{}> in <{}>".format(self.ro_uuid, self.folder)

    def _initialize(self):  # type: () -> None
        for research_obj_folder in (METADATA, DATA, WORKFLOW, SNAPSHOT,
                                    PROVENANCE, LOGS):
            os.makedirs(os.path.join(self.folder, research_obj_folder))
        self._initialize_bagit()

    def _initialize_bagit(self):  # type: () -> None
        """Write the fixed BagIt header."""
        self.self_check()
        bagit = os.path.join(self.folder, "bagit.txt")
        # encoding: always UTF-8 (although ASCII would suffice here)
        # newline: ensure LF also on Windows
        with open(bagit, "w", encoding=ENCODING, newline='\n') as bag_it_file:
            # TODO: \n or \r\n ?
            bag_it_file.write(u"BagIt-Version: 0.97\n")
            bag_it_file.write(u"Tag-File-Character-Encoding: %s\n" % ENCODING)

    def open_log_file_for_activity(self, uuid_uri):  # type: (Text) -> WritableBagFile
        self.self_check()
        # Ensure valid UUID for safe filenames
        activity_uuid = uuid.UUID(uuid_uri)
        if activity_uuid.urn == self.engine_uuid:
            # It's the engine aka cwltool!
            name = "engine"
        else:
            name = "activity"
        p = os.path.join(LOGS, "{}.{}.txt".format(name, activity_uuid))
        _logger.debug("[provenance] Opening log file for %s: %s", name, p)
        self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri)
        return self.write_bag_file(p)

    def _finalize(self):  # type: () -> None
        self._write_ro_manifest()
        self._write_bag_info()

    def user_provenance(self, document):  # type: (ProvDocument) -> None
        """Add the user provenance."""
        self.self_check()
        (username, fullname) = _whoami()

        if not self.full_name:
            self.full_name = fullname

        document.add_namespace(UUID)
        document.add_namespace(ORCID)
        document.add_namespace(FOAF)
        account = document.agent(
            ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"],
                           "prov:label": username,
                           FOAF["accountName"]: username})

        user = document.agent(
            self.orcid or USER_UUID,
            {provM.PROV_TYPE: PROV["Person"],
             "prov:label": self.full_name,
             FOAF["name"]: self.full_name,
             FOAF["account"]: account})
        # cwltool may be started on the shell (directly by the user),
        # by a shell script (indirectly by the user)
        # or from a different program
        # (which again is launched by any of the above)
        #
        # We can't tell in which way, but ultimately we're still
        # acting on behalf of that user (even if we might
        # get their name wrong!)
        document.actedOnBehalfOf(account, user)

    def write_bag_file(self, path, encoding=ENCODING):
        # type: (Text, Optional[str]) -> WritableBagFile
        """Write the bag file into our research object."""
        self.self_check()
        # For some reason the below throws BlockingIOError
        # fp = BufferedWriter(WritableBagFile(self, path))
        bag_file = WritableBagFile(self, path)
        if encoding is not None:
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            return cast(WritableBagFile,
                        TextIOWrapper(cast(IO[bytes], bag_file), encoding=encoding,
                                      newline="\n"))
        return bag_file
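
    # Illustrative usage: stream a tag file into the bag; WritableBagFile
    # computes sha1/sha256/sha512 on the fly and registers them in the
    # manifest when the file is closed:
    #
    #     with research_object.write_bag_file("metadata/example.txt") as fh:
    #         fh.write(u"hello\n")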

    def add_tagfile(self, path, timestamp=None):
        # type: (Text, Optional[datetime.datetime]) -> None
        """Add tag files to our research object."""
        self.self_check()
        checksums = {}
        # Read file to calculate its checksum
        if os.path.isdir(path):
            return
            # FIXME: do the right thing for directories
        with open(path, "rb") as tag_file:
            # FIXME: Should have a more efficient open_tagfile() that
            # does all checksums in one go while writing through,
            # adding checksums after closing.
            # Below is probably OK for now as metadata files
            # are not too large..?

            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

            tag_file.seek(0)
            checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

            tag_file.seek(0)
            checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

        rel_path = _posix_path(os.path.relpath(path, self.folder))
        self.tagfiles.add(rel_path)
        self.add_to_manifest(rel_path, checksums)
        if timestamp is not None:
            self._file_provenance[rel_path] = {"createdOn": timestamp.isoformat()}

    def _ro_aggregates(self):
        # type: () -> List[Dict[Text, Any]]
        """Gather dictionary of files to be added to the manifest."""
        def guess_mediatype(rel_path):
            # type: (Text) -> Dict[Text, Any]
            """Return the mediatypes."""
            media_types = {
                # Adapted from
                # https://w3id.org/bundle/2014-11-05/#media-types

                "txt": TEXT_PLAIN,
                "ttl": 'text/turtle; charset="UTF-8"',
                "rdf": 'application/rdf+xml',
                "json": 'application/json',
                "jsonld": 'application/ld+json',
                "xml": 'application/xml',
                ##
                "cwl": 'text/x+yaml; charset="UTF-8"',
                "provn": 'text/provenance-notation; charset="UTF-8"',
                "nt": 'application/n-triples',
            }  # type: Dict[Text, Text]
            conforms_to = {
                "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/',
                "cwl": 'https://w3id.org/cwl/',
            }  # type: Dict[Text, Text]

            prov_conforms_to = {
                "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/',
                "rdf": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
                "ttl": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
                "nt": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
                "jsonld": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
                "xml": 'http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/',
                "json": 'http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/',
            }  # type: Dict[Text, Text]

            extension = rel_path.rsplit(".", 1)[-1].lower()  # type: Optional[Text]
            if extension == rel_path:
                # No ".", no extension
                extension = None

            local_aggregate = {}  # type: Dict[Text, Any]
            if extension in media_types:
                local_aggregate["mediatype"] = media_types[extension]

            if extension in conforms_to:
                # TODO: Open CWL file to read its declared "cwlVersion", e.g.
                # cwlVersion = "v1.0"
                local_aggregate["conformsTo"] = conforms_to[extension]

            if (rel_path.startswith(_posix_path(PROVENANCE))
                    and extension in prov_conforms_to):
                if ".cwlprov" in rel_path:
                    # Our own!
                    local_aggregate["conformsTo"] = [prov_conforms_to[extension], CWLPROV_VERSION]
                else:
                    # Some other PROV
                    # TODO: Recognize ProvOne etc.
                    local_aggregate["conformsTo"] = prov_conforms_to[extension]
            return local_aggregate
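        # Illustrative call: guess_mediatype("metadata/provenance/primary.cwlprov.ttl")
        # -> {"mediatype": 'text/turtle; charset="UTF-8"',
        #     "conformsTo": ['http://www.w3.org/TR/2013/REC-prov-o-20130430/',
        #                    CWLPROV_VERSION]}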

        aggregates = []  # type: List[Dict[Text, Any]]
        for path in self.bagged_size.keys():
            aggregate_dict = {}  # type: Dict[Text, Any]

            temp_path = PurePosixPath(path)
            folder = temp_path.parent
            filename = temp_path.name

            # NOTE: Here we end up aggregating the abstract
            # data items by their sha1 hash, so that it matches
            # the entity() in the prov files.

            # TODO: Change to nih:sha-256; hashes
            # https://tools.ietf.org/html/rfc6920#section-7
            aggregate_dict["uri"] = 'urn:hash::sha1:' + filename
            aggregate_dict["bundledAs"] = {
                # The arcp URI is a suitable ORE proxy; local to this Research Object.
                # (as long as we don't also aggregate it by relative path!)
                "uri": self.base_uri + path,
                # relate it to the data/ path
                "folder": "/%s/" % folder,
                "filename": filename,
            }
            if path in self._file_provenance:
                # Made by workflow run, merge captured provenance
                aggregate_dict["bundledAs"].update(self._file_provenance[path])
            else:
                # Probably made outside wf run, part of job object?
                pass
            if path in self._content_types:
                aggregate_dict["mediatype"] = self._content_types[path]

            aggregates.append(aggregate_dict)

        for path in self.tagfiles:
            if (not (path.startswith(METADATA) or path.startswith(WORKFLOW) or
                     path.startswith(SNAPSHOT))):
                # probably a bagit file
                continue
            if path == str(PurePosixPath(METADATA) / "manifest.json"):
                # Should not really be there yet! But anyway, we won't
                # aggregate it.
                continue

            rel_aggregates = {}  # type: Dict[Text, Any]
            # These are local paths like metadata/provenance - but
            # we need to relativize them to our current directory,
            # as we are saved in metadata/manifest.json
            uri = str(Path(os.pardir) / path)

            rel_aggregates["uri"] = uri
            rel_aggregates.update(guess_mediatype(path))

            if path in self._file_provenance:
                # Propagate file provenance (e.g. timestamp)
                rel_aggregates.update(self._file_provenance[path])
            elif not path.startswith(SNAPSHOT):
                # make a new timestamp?
                rel_aggregates.update(self._self_made())
            aggregates.append(rel_aggregates)
        aggregates.extend(self._external_aggregates)
        return aggregates

    def add_uri(self, uri, timestamp=None):
        # type: (str, Optional[datetime.datetime]) -> Dict[Text, Any]
        self.self_check()
        aggr = self._self_made(timestamp=timestamp)
        aggr["uri"] = uri
        self._external_aggregates.append(aggr)
        return aggr

    def add_annotation(self, about, content, motivated_by="oa:describing"):
        # type: (str, List[str], str) -> str
        """Cheap URI relativization against the current directory and root."""
        self.self_check()
        curr = self.base_uri + METADATA + "/"
        content = [c.replace(curr, "").replace(self.base_uri, "../")
                   for c in content]
        uri = uuid.uuid4().urn
        ann = {
            u"uri": uri,
            u"about": about,
            u"content": content,
            u"oa:motivatedBy": {"@id": motivated_by}
        }
        self.annotations.append(ann)
        return uri

    def _ro_annotations(self):
        # type: () -> List[Dict[Text, Any]]
        annotations = []  # type: List[Dict[Text, Any]]
        annotations.append({
            "uri": uuid.uuid4().urn,
            "about": self.ro_uuid.urn,
            "content": "/",
            # https://www.w3.org/TR/annotation-vocab/#named-individuals
            "oa:motivatedBy": {"@id": "oa:describing"}
        })

        # How was it run?
        # FIXME: Only primary*
        prov_files = [str(PurePosixPath(p).relative_to(METADATA)) for p in self.tagfiles
                      if p.startswith(_posix_path(PROVENANCE))
                      and "/primary." in p]
        annotations.append({
            "uri": uuid.uuid4().urn,
            "about": self.ro_uuid.urn,
            "content": prov_files,
            # Modulation of https://www.w3.org/TR/prov-aq/
            "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"}
        })

        # Where is the main workflow?
        annotations.append({
            "uri": uuid.uuid4().urn,
            "about": str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
            "oa:motivatedBy": {"@id": "oa:highlighting"}
        })

        annotations.append({
            "uri": uuid.uuid4().urn,
            "about": self.ro_uuid.urn,
            "content": [str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                        str(PurePosixPath("..") / WORKFLOW / "primary-job.json")],
            "oa:motivatedBy": {"@id": "oa:linking"}
        })
        # Add user-added annotations at the end
        annotations.extend(self.annotations)
        return annotations
| 1292 | |
| 1293 def _authored_by(self): | |
| 1294 # type: () -> Dict[Text, Any] | |
| 1295 authored_by = {} | |
| 1296 if self.orcid: | |
| 1297 authored_by["orcid"] = self.orcid | |
| 1298 if self.full_name: | |
| 1299 authored_by["name"] = self.full_name | |
| 1300 if not self.orcid: | |
| 1301 authored_by["uri"] = USER_UUID | |
| 1302 | |
| 1303 if authored_by: | |
| 1304 return {"authoredBy": authored_by} | |
| 1305 return {} | |
| 1306 | |
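| # For illustration (values are from the public ORCID example record, | |
| # not from this module): with an ORCID and full name configured this | |
| # returns | |
| # | |
| #     {"authoredBy": {"orcid": "https://orcid.org/0000-0002-1825-0097", | |
| #                     "name": "Josiah Carberry"}} | |
| # | |
| # while without an ORCID the local USER_UUID is used as the uri. | |
| | |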
| 1307 | |
| 1308 def _write_ro_manifest(self): | |
| 1309 # type: () -> None | |
| 1310 | |
| 1311 # Does not have to be this order, but it's nice to be consistent | |
| 1312 manifest = OrderedDict() # type: Dict[Text, Any] | |
| 1313 manifest["@context"] = [ | |
| 1314 {"@base": "%s%s/" % (self.base_uri, _posix_path(METADATA))}, | |
| 1315 "https://w3id.org/bundle/context" | |
| 1316 ] | |
| 1317 manifest["id"] = "/" | |
| 1318 manifest["conformsTo"] = CWLPROV_VERSION | |
| 1319 filename = "manifest.json" | |
| 1320 manifest["manifest"] = filename | |
| 1321 manifest.update(self._self_made()) | |
| 1322 manifest.update(self._authored_by()) | |
| 1323 manifest["aggregates"] = self._ro_aggregates() | |
| 1324 manifest["annotations"] = self._ro_annotations() | |
| 1325 | |
| 1326 json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False) | |
| 1327 rel_path = str(PurePosixPath(METADATA)/filename) | |
| 1328 json_manifest += "\n" | |
| 1329 with self.write_bag_file(rel_path) as manifest_file: | |
| 1330 manifest_file.write(json_manifest) | |
| 1331 | |
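| # The start of the resulting metadata/manifest.json, sketched with a | |
| # shortened base URI placeholder (illustrative only): | |
| # | |
| #     { | |
| #         "@context": [ | |
| #             {"@base": "<base-uri>metadata/"}, | |
| #             "https://w3id.org/bundle/context" | |
| #         ], | |
| #         "id": "/", | |
| #         "conformsTo": "https://w3id.org/cwl/prov/0.6.0", | |
| #         "manifest": "manifest.json" | |
| #     } | |
| | |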
| 1332 def _write_bag_info(self): | |
| 1333 # type: () -> None | |
| 1334 | |
| 1335 with self.write_bag_file("bag-info.txt") as info_file: | |
| 1336 info_file.write(u"Bag-Software-Agent: %s\n" % self.cwltool_version) | |
| 1337 # FIXME: require sha-512 of payload to comply with profile? | |
| 1338 # FIXME: Update profile | |
| 1339 info_file.write(u"BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n") | |
| 1340 info_file.write(u"Bagging-Date: %s\n" % datetime.date.today().isoformat()) | |
| 1341 info_file.write(u"External-Description: Research Object of CWL workflow run\n") | |
| 1342 if self.full_name: | |
| 1343 info_file.write(u"Contact-Name: %s\n" % self.full_name) | |
| 1344 | |
| 1345 # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity) | |
| 1346 # as identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good. | |
| 1347 info_file.write(u"External-Identifier: %s\n" % self.base_uri) | |
| 1348 | |
| 1349 # Calculate size of data/ (assuming no external fetch.txt files) | |
| 1350 total_size = sum(self.bagged_size.values()) | |
| 1351 num_files = len(self.bagged_size) | |
| 1352 info_file.write(u"Payload-Oxum: %d.%d\n" % (total_size, num_files)) | |
| 1353 _logger.debug(u"[provenance] Generated bagit metadata: %s", | |
| 1354 self.folder) | |
| 1355 | |
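| # Payload-Oxum is the BagIt "octetstream sum": total payload bytes, | |
| # a dot, then the payload file count. For example, three data files | |
| # of 100, 200 and 300 bytes yield "Payload-Oxum: 600.3". | |
| | |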
| 1356 def generate_snapshot(self, prov_dep): | |
| 1357 # type: (MutableMapping[Text, Any]) -> None | |
| 1358 """Copy all of the CWL files to the snapshot/ directory.""" | |
| 1359 self.self_check() | |
| 1360 for key, value in prov_dep.items(): | |
| 1361 if key == "location" and value.split("/")[-1]: | |
| 1362 filename = value.split("/")[-1] | |
| 1363 path = os.path.join(self.folder, SNAPSHOT, filename) | |
| 1364 if value.startswith("file://"): | |
| 1365 filepath = value[7:] | |
| 1366 else: | |
| 1367 filepath = value | |
| 1369 | |
| 1370 # FIXME: What if destination path already exists? | |
| 1371 if os.path.exists(filepath): | |
| 1372 try: | |
| 1373 if os.path.isdir(filepath): | |
| 1374 shutil.copytree(filepath, path) | |
| 1375 else: | |
| 1376 shutil.copy(filepath, path) | |
| 1377 timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(filepath)) | |
| 1378 self.add_tagfile(path, timestamp) | |
| 1379 except PermissionError: | |
| 1380 pass # FIXME: avoids duplicate snapshotting; need better solution | |
| 1381 elif key in ("secondaryFiles", "listing"): | |
| 1382 # recurse into nested File/Directory structures | |
| 1383 for entry in value: | |
| 1384 if isinstance(entry, MutableMapping): | |
| 1385 self.generate_snapshot(entry) | |
| 1387 | |
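| # Usage sketch (hypothetical paths): snapshotting a tool description | |
| # together with a nested secondary file copies both into snapshot/. | |
| # | |
| #     ro.generate_snapshot({ | |
| #         "location": "file:///tmp/wf/echo.cwl", | |
| #         "secondaryFiles": [{"location": "file:///tmp/wf/env.yml"}], | |
| #     }) | |
| | |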
| 1388 def packed_workflow(self, packed): # type: (Text) -> None | |
| 1389 """Pack CWL description to generate re-runnable CWL object in RO.""" | |
| 1390 self.self_check() | |
| 1391 rel_path = str(PurePosixPath(WORKFLOW)/"packed.cwl") | |
| 1392 # Write as binary | |
| 1393 with self.write_bag_file(rel_path, encoding=None) as write_pack: | |
| 1394 # YAML is always UTF8, but json.dumps gives us str in py2 | |
| 1395 write_pack.write(packed.encode(ENCODING)) | |
| 1396 _logger.debug(u"[provenance] Added packed workflow: %s", rel_path) | |
| 1397 | |
| 1398 def has_data_file(self, sha1hash): # type: (str) -> bool | |
| 1399 """Confirm the presence of the given file in the RO.""" | |
| 1400 folder = os.path.join(self.folder, DATA, sha1hash[0:2]) | |
| 1401 hash_path = os.path.join(folder, sha1hash) | |
| 1402 return os.path.isfile(hash_path) | |
| 1403 | |
| 1404 def add_data_file(self, from_fp, timestamp=None, content_type=None): | |
| 1405 # type: (IO[Any], Optional[datetime.datetime], Optional[str]) -> Text | |
| 1406 """Copy inputs to data/ folder.""" | |
| 1407 self.self_check() | |
| 1408 tmp_dir, tmp_prefix = os.path.split(self.temp_prefix) | |
| 1409 with tempfile.NamedTemporaryFile( | |
| 1410 prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp: | |
| 1411 checksum = checksum_copy(from_fp, tmp) | |
| 1412 | |
| 1413 # Calculate hash-based file path | |
| 1414 folder = os.path.join(self.folder, DATA, checksum[0:2]) | |
| 1415 path = os.path.join(folder, checksum) | |
| 1416 # os.rename is assumed safe, as our temp file should | |
| 1417 # be on the same file system as our temp folder | |
| 1418 if not os.path.isdir(folder): | |
| 1419 os.makedirs(folder) | |
| 1420 os.rename(tmp.name, path) | |
| 1421 | |
| 1422 # Relative posix path | |
| 1423 # (to avoid \ on Windows) | |
| 1424 rel_path = _posix_path(os.path.relpath(path, self.folder)) | |
| 1425 | |
| 1426 # Register in bagit checksum | |
| 1427 if Hasher == hashlib.sha1: | |
| 1428 self._add_to_bagit(rel_path, sha1=checksum) | |
| 1429 else: | |
| 1430 _logger.warning( | |
| 1431 u"[provenance] Unknown hash method %s for bagit manifest", | |
| 1432 Hasher) | |
| 1433 # Inefficient: the bagit support needs to checksum the file again | |
| 1434 self._add_to_bagit(rel_path) | |
| 1435 _logger.debug(u"[provenance] Added data file %s", path) | |
| 1436 if timestamp is not None: | |
| 1437 self._file_provenance[rel_path] = self._self_made(timestamp) | |
| 1438 _logger.debug(u"[provenance] Relative path for data file %s", rel_path) | |
| 1439 | |
| 1440 if content_type is not None: | |
| 1441 self._content_types[rel_path] = content_type | |
| 1442 return rel_path | |
| 1443 | |
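| # Usage sketch: payload files are stored content-addressed under | |
| # data/<first two hex digits>/<sha1>. An empty stream, for example, | |
| # hashes to the well-known empty-string SHA1: | |
| # | |
| #     rel_path = ro.add_data_file(BytesIO(b"")) | |
| #     # rel_path == "data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709" | |
| | |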
| 1444 def _self_made(self, timestamp=None): | |
| 1445 # type: (Optional[datetime.datetime]) -> Dict[Text, Any] | |
| 1446 if timestamp is None: | |
| 1447 timestamp = datetime.datetime.now() | |
| 1448 return { | |
| 1449 "createdOn": timestamp.isoformat(), | |
| 1450 "createdBy": {"uri": self.engine_uuid, | |
| 1451 "name": self.cwltool_version} | |
| 1452 } | |
| 1453 | |
| 1454 def add_to_manifest(self, rel_path, checksums): | |
| 1455 # type: (Text, Dict[str,str]) -> None | |
| 1456 """Add files to the research object manifest.""" | |
| 1457 self.self_check() | |
| 1458 if PurePosixPath(rel_path).is_absolute(): | |
| 1459 raise ValueError("rel_path must be relative: %s" % rel_path) | |
| 1460 | |
| 1461 if os.path.commonprefix(["data/", rel_path]) == "data/": | |
| 1462 # payload file, go to manifest | |
| 1463 manifest = "manifest" | |
| 1464 else: | |
| 1465 # metadata file, go to tag manifest | |
| 1466 manifest = "tagmanifest" | |
| 1467 | |
| 1468 # Add checksums to corresponding manifest files | |
| 1469 for (method, hash_value) in checksums.items(): | |
| 1470 # The file cannot already be listed here: _add_to_bagit | |
| 1471 # returns early if rel_path is already in bagged_size | |
| 1472 manifestpath = os.path.join( | |
| 1473 self.folder, "%s-%s.txt" % (manifest, method.lower())) | |
| 1474 # encoding: match Tag-File-Character-Encoding: UTF-8 | |
| 1475 # newline: ensure LF also on Windows | |
| 1476 with open(manifestpath, "a", encoding=ENCODING, newline='\n') \ | |
| 1477 as checksum_file: | |
| 1478 line = u"%s %s\n" % (hash_value, rel_path) | |
| 1479 _logger.debug(u"[provenance] Added to %s: %s", manifestpath, line) | |
| 1480 checksum_file.write(line) | |
| 1481 | |
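| # Each manifest line follows the BagIt "checksum path" convention; a | |
| # hypothetical manifest-sha1.txt entry for the empty file above: | |
| # | |
| #     da39a3ee5e6b4b0d3255bfef95601890afd80709 data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709 | |
| | |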
| 1482 | |
| 1483 def _add_to_bagit(self, rel_path, **checksums): | |
| 1484 # type: (Text, Any) -> None | |
| 1485 if PurePosixPath(rel_path).is_absolute(): | |
| 1486 raise ValueError("rel_path must be relative: %s" % rel_path) | |
| 1487 local_path = os.path.join(self.folder, _local_path(rel_path)) | |
| 1488 if not os.path.exists(local_path): | |
| 1489 raise IOError("File %s does not exist within RO: %s" % (rel_path, local_path)) | |
| 1490 | |
| 1491 if rel_path in self.bagged_size: | |
| 1492 # Already added, assume checksum OK | |
| 1493 return | |
| 1494 self.bagged_size[rel_path] = os.path.getsize(local_path) | |
| 1495 | |
| 1496 if SHA1 not in checksums: | |
| 1497 # ensure we always have sha1 | |
| 1498 checksums = dict(checksums) | |
| 1499 with open(local_path, "rb") as file_path: | |
| 1500 # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile? | |
| 1501 checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1) | |
| 1502 | |
| 1503 self.add_to_manifest(rel_path, checksums) | |
| 1504 | |
| 1505 def create_job(self, | |
| 1506 builder_job, # type: Dict[Text, Any] | |
| 1507 wf_job=None, # type: Optional[Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]] | |
| 1508 is_output=False # type: bool | |
| 1509 ): # type: (...) -> Dict[Text, Text] | |
| 1510 # TODO: customise the file | |
| 1511 """Generate the new job object with RO specific relative paths.""" | |
| 1512 copied = copy.deepcopy(builder_job) | |
| 1513 relativised_input_objecttemp = {} # type: Dict[Text, Any] | |
| 1514 self._relativise_files(copied) | |
| 1515 def jdefault(o): # type: (Any) -> Dict[Any, Any] | |
| 1516 return dict(o) | |
| 1517 if is_output: | |
| 1518 rel_path = PurePosixPath(WORKFLOW)/"primary-output.json" | |
| 1519 else: | |
| 1520 rel_path = PurePosixPath(WORKFLOW)/"primary-job.json" | |
| 1521 j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault) | |
| 1522 with self.write_bag_file(str(rel_path)) as file_path: | |
| 1523 file_path.write(j + u"\n") | |
| 1524 _logger.debug(u"[provenance] Generated customised job file: %s", | |
| 1525 rel_path) | |
| 1526 # Generate a dictionary keyed by workflow-level input IDs, where | |
| 1527 # the value is 1) for files, the relativised hash-based location | |
| 1528 # and 2) for other attributes, the actual value. | |
| 1531 for key, value in copied.items(): | |
| 1532 if isinstance(value, MutableMapping): | |
| 1533 if value.get("class") in ("File", "Directory"): | |
| 1534 relativised_input_objecttemp[key] = value | |
| 1535 else: | |
| 1536 relativised_input_objecttemp[key] = value | |
| 1537 self.relativised_input_object.update( | |
| 1538 {k: v for k, v in relativised_input_objecttemp.items() if v}) | |
| 1539 return self.relativised_input_object | |
| 1540 | |
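| # Usage sketch (hypothetical job order): File entries come back with | |
| # RO-relative, hash-based locations, while plain values pass through. | |
| # | |
| #     job = ro.create_job({ | |
| #         "reads": {"class": "File", "location": "file:///tmp/r.fq"}, | |
| #         "threads": 4, | |
| #     }) | |
| #     # job["reads"]["location"] == "../data/<xx>/<sha1-of-r.fq>" | |
| | |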
| 1541 def _relativise_files(self, structure): | |
| 1542 # type: (Dict[Any, Any]) -> None | |
| 1543 """Save any file objects into the RO and update the local paths.""" | |
| 1544 # Base case - we found a File we need to update | |
| 1545 _logger.debug(u"[provenance] Relativising: %s", structure) | |
| 1546 | |
| 1547 if isinstance(structure, MutableMapping): | |
| 1548 if structure.get("class") == "File": | |
| 1549 relative_path = None | |
| 1550 if "checksum" in structure: | |
| 1551 alg, checksum = structure["checksum"].split("$") | |
| 1552 if alg != SHA1: | |
| 1553 raise TypeError( | |
| 1554 "Only SHA1 CWL checksums are currently supported: " | |
| 1555 "{}".format(structure)) | |
| 1556 if self.has_data_file(checksum): | |
| 1557 prefix = checksum[0:2] | |
| 1558 relative_path = PurePosixPath("data")/prefix/checksum | |
| 1559 | |
| 1560 if relative_path is None and "location" in structure: | |
| 1561 # Register in RO; but why was this not picked | |
| 1562 # up by used_artefacts? | |
| 1563 _logger.info("[provenance] Adding to RO %s", structure["location"]) | |
| 1564 with self.fsaccess.open(structure["location"], "rb") as fp: | |
| 1565 relative_path = self.add_data_file(fp) | |
| 1566 checksum = PurePosixPath(relative_path).name | |
| 1567 structure["checksum"] = "%s$%s" % (SHA1, checksum) | |
| 1568 if relative_path is not None: | |
| 1569 # RO-relative path as new location | |
| 1570 structure["location"] = str(PurePosixPath("..")/relative_path) | |
| 1571 else: | |
| 1572 _logger.warning("Could not determine RO path for file %s", structure) | |
| 1573 if "path" in structure: | |
| 1574 del structure["path"] | |
| 1575 | |
| 1576 if structure.get("class") == "Directory": | |
| 1577 # TODO: Generate anonymous Directory with a "listing" | |
| 1578 # pointing to the hashed files | |
| 1579 del structure["location"] | |
| 1580 | |
| 1581 for val in structure.values(): | |
| 1582 try: | |
| 1583 self._relativise_files(val) | |
| 1584 except OSError: | |
| 1585 pass | |
| 1586 return | |
| 1587 | |
| 1588 if isinstance(structure, (str, Text)): | |
| 1589 # Just a string value, no need to iterate further | |
| 1590 return | |
| 1591 try: | |
| 1592 for obj in iter(structure): | |
| 1593 # Recurse and rewrite any nested File objects | |
| 1594 self._relativise_files(obj) | |
| 1595 except TypeError: | |
| 1596 pass | |
| 1597 | |
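| # For example (hypothetical input), a File whose payload is already | |
| # bagged, such as | |
| # | |
| #     {"class": "File", "location": "file:///tmp/x.txt", | |
| #      "checksum": "sha1$da39a3ee5e6b4b0d3255bfef95601890afd80709"} | |
| # | |
| # is rewritten in place so that "location" becomes | |
| # "../data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709". | |
| | |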
| 1598 def close(self, save_to=None): | |
| 1599 # type: (Optional[str]) -> None | |
| 1600 """Close the Research Object, optionally saving to specified folder. | |
| 1601 | |
| 1602 Closing will remove any temporary files used by this research object. | |
| 1603 After calling this method, this ResearchObject instance can no longer | |
| 1604 be used, except for no-op calls to .close(). | |
| 1605 | |
| 1606 The 'save_to' folder should not exist - if it does, it will be deleted. | |
| 1607 | |
| 1608 It is safe to call this function multiple times without the | |
| 1609 'save_to' argument, e.g. within a try..finally block to | |
| 1610 ensure the temporary files of this Research Object are removed. | |
| 1611 """ | |
| 1612 if save_to is None: | |
| 1613 if not self.closed: | |
| 1614 _logger.debug(u"[provenance] Deleting temporary %s", self.folder) | |
| 1615 shutil.rmtree(self.folder, ignore_errors=True) | |
| 1616 else: | |
| 1617 save_to = os.path.abspath(save_to) | |
| 1618 _logger.info(u"[provenance] Finalizing Research Object") | |
| 1619 self._finalize() # write manifest etc. | |
| 1620 # TODO: Write as archive (.zip or .tar) based on extension? | |
| 1621 | |
| 1622 if os.path.isdir(save_to): | |
| 1623 _logger.info(u"[provenance] Deleting existing %s", save_to) | |
| 1624 shutil.rmtree(save_to) | |
| 1625 shutil.move(self.folder, save_to) | |
| 1626 _logger.info(u"[provenance] Research Object saved to %s", save_to) | |
| 1627 self.folder = save_to | |
| 1628 self.closed = True | |
| 1629 | |
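| # Typical usage (sketch): persist the RO only on success, and always | |
| # clean up temporary files, relying on the documented no-op behaviour | |
| # of repeated close() calls. | |
| # | |
| #     try: | |
| #         run_workflow(ro)  # hypothetical helper that populates ro | |
| #         ro.close(save_to="/tmp/my_run_ro") | |
| #     finally: | |
| #         ro.close()  # deletes temp files unless already closed | |
| | |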
| 1630 def checksum_copy(src_file, # type: IO[Any] | |
| 1631 dst_file=None, # type: Optional[IO[Any]] | |
| 1632 hasher=Hasher, # type: Callable[[], hashlib._Hash] | |
| 1633 buffersize=1024*1024 # type: int | |
| 1634 ): # type: (...) -> str | |
| 1635 """Compute checksums while copying a file.""" | |
| 1636 # TODO: Use hashlib.new(Hasher_str) instead? | |
| 1637 checksum = hasher() | |
| 1638 contents = src_file.read(buffersize) | |
| 1639 if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"): | |
| 1640 # Try to avoid a byte-by-byte copy by hard-linking the source | |
| 1641 # over the destination; move the destination aside first so it | |
| 1642 # can be restored if linking fails | |
| 1643 temp_location = os.path.join(os.path.dirname(dst_file.name), | |
| 1644 str(uuid.uuid4())) | |
| 1645 try: | |
| 1646 os.rename(dst_file.name, temp_location) | |
| 1647 os.link(src_file.name, dst_file.name) | |
| 1648 dst_file = None | |
| 1649 os.unlink(temp_location) | |
| 1650 except OSError: | |
| 1651 # hard-linking failed (e.g. cross-device); fall back to copying | |
| 1652 pass | |
| 1653 if os.path.exists(temp_location): | |
| 1654 # restore the destination file so the copy loop below can use it | |
| 1655 os.rename(temp_location, dst_file.name) # type: ignore | |
| 1651 while contents != b"": | |
| 1652 if dst_file is not None: | |
| 1653 dst_file.write(contents) | |
| 1654 checksum.update(contents) | |
| 1655 contents = src_file.read(buffersize) | |
| 1656 if dst_file is not None: | |
| 1657 dst_file.flush() | |
| 1658 return checksum.hexdigest().lower() | |
| 1659 | |
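| # Usage sketch: copy and hash in a single pass; with dst_file=None it | |
| # simply checksums the stream. | |
| # | |
| #     with open("in.dat", "rb") as src, open("out.dat", "wb") as dst: | |
| #         sha1_hex = checksum_copy(src, dst, hasher=hashlib.sha1) | |
| | |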
| 1660 def copy_job_order(job, job_order_object): | |
| 1661 # type: (Any, Any) -> Any | |
| 1662 """Create copy of job object for provenance.""" | |
| 1663 if not hasattr(job, "tool"): | |
| 1664 # direct command line tool execution | |
| 1665 return job_order_object | |
| 1666 customised_job = {} # new job object for RO | |
| 1667 for index, inp in enumerate(job.tool["inputs"]): | |
| 1668 with SourceLine(job.tool["inputs"], index, WorkflowException, | |
| 1669 _logger.isEnabledFor(logging.DEBUG)): | |
| 1670 iid = shortname(inp["id"]) | |
| 1671 if iid in job_order_object: | |
| 1672 # add the supplied input element for provenance | |
| 1673 customised_job[iid] = copy.deepcopy(job_order_object[iid]) | |
| 1674 elif "default" in inp: | |
| 1675 # fall back to the input's declared default for provenance | |
| 1676 customised_job[iid] = copy.deepcopy(inp["default"]) | |
| 1677 # inputs that are absent and have no default are omitted | |
| 1679 return customised_job |
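| # Sketch (hypothetical tool): supplied inputs are deep-copied, and | |
| # missing ones fall back to the tool's declared default, so | |
| # | |
| #     # job.tool["inputs"] == [{"id": "file:///wf.cwl#threads", | |
| #     #                         "default": 1}] | |
| #     copy_job_order(job, {})  # -> {"threads": 1} | |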
