env/lib/python3.7/site-packages/cwltool/provenance.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
1 """Stores Research Object including provenance."""
2 from __future__ import absolute_import
3
4 import copy
5 import datetime
6 import hashlib
7 import logging
8 import os
9 import re
10 import shutil
11 import tempfile
12 import uuid
13 from collections import OrderedDict
14 from getpass import getuser
15 from io import BytesIO, FileIO, TextIOWrapper, open
16 from socket import getfqdn
17 from typing import (IO, Any, Callable, Dict, List, Generator,
18 MutableMapping, Optional, Set, Tuple, Union, cast)
19 from types import ModuleType
20
21 import prov.model as provM
22 import six
23 from prov.identifier import Identifier, Namespace
24 from prov.model import (PROV, ProvActivity, # pylint: disable=unused-import
25 ProvDocument, ProvEntity)
26 from pathlib2 import Path, PurePosixPath, PurePath
27 from ruamel import yaml
28 from schema_salad.sourceline import SourceLine
29 from six.moves import urllib
30 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import
31 Text)
32 # move to a regular typing import when Python 3.3-3.6 is no longer supported
33
34 from .context import RuntimeContext # pylint: disable=unused-import
35 from .errors import WorkflowException
36 from .loghandler import _logger
37 from .pathmapper import get_listing
38 from .process import Process, shortname # pylint: disable=unused-import
39 from .stdfsaccess import StdFsAccess # pylint: disable=unused-import
40 from .utils import json_dumps, versionstring, onWindows
41
42
43 # imports needed for retrieving user data
44 if onWindows():
45 import ctypes # pylint: disable=unused-import
46 else:
47 try:
48 import pwd # pylint: disable=unused-import
49 except ImportError:
50 pass
51
52 if TYPE_CHECKING:
53 from .command_line_tool import CommandLineTool, ExpressionTool # pylint: disable=unused-import
54 from .workflow import Workflow # pylint: disable=unused-import
55
56 if six.PY2:
57 class PermissionError(OSError): # pylint: disable=redefined-builtin
58 """Needed for Python2."""
59
60 pass
61 __citation__ = "https://doi.org/10.5281/zenodo.1208477"
62
63 # NOTE: Semantic versioning of the CWLProv Research Object
64 # **and** the cwlprov files
65 #
66 # Rough guide (major.minor.patch):
67 # 1. Bump major number if removing/"breaking" resources or PROV statements
68 # 2. Bump minor number if adding resources or PROV statements
69 # 3. Bump patch number for non-breaking non-adding changes,
70 # e.g. fixing broken relative paths
71 CWLPROV_VERSION = "https://w3id.org/cwl/prov/0.6.0"
72
73 # Research Object folders
74 METADATA = "metadata"
75 DATA = "data"
76 WORKFLOW = "workflow"
77 SNAPSHOT = "snapshot"
78 # sub-folders
79 MAIN = os.path.join(WORKFLOW, "main")
80 PROVENANCE = os.path.join(METADATA, "provenance")
81 LOGS = os.path.join(METADATA, "logs")
82 WFDESC = Namespace("wfdesc", 'http://purl.org/wf4ever/wfdesc#')
83 WFPROV = Namespace("wfprov", 'http://purl.org/wf4ever/wfprov#')
84 WF4EVER = Namespace("wf4ever", 'http://purl.org/wf4ever/wf4ever#')
85 RO = Namespace("ro", 'http://purl.org/wf4ever/ro#')
86 ORE = Namespace("ore", 'http://www.openarchives.org/ore/terms/')
87 FOAF = Namespace("foaf", 'http://xmlns.com/foaf/0.1/')
88 SCHEMA = Namespace("schema", 'http://schema.org/')
89 CWLPROV = Namespace('cwlprov', 'https://w3id.org/cwl/prov#')
90 ORCID = Namespace("orcid", "https://orcid.org/")
91 UUID = Namespace("id", "urn:uuid:")
92
93 # BagIt and YAML always use UTF-8
94 ENCODING = "UTF-8"
95 TEXT_PLAIN = 'text/plain; charset="%s"' % ENCODING
96
97 # sha1, compatible with the File type's "checksum" field
98 # e.g. "checksum" = "sha1$47a013e660d408619d894b20806b1d5086aab03b"
99 # See ./cwltool/schemas/v1.0/Process.yml
100 Hasher = hashlib.sha1
101 SHA1 = "sha1"
102 SHA256 = "sha256"
103 SHA512 = "sha512"
104
105 # TODO: Better identifiers for user, at least
106 # these should be preserved in ~/.config/cwl for every execution
107 # on this host
108 USER_UUID = uuid.uuid4().urn
109 ACCOUNT_UUID = uuid.uuid4().urn
110
111
112 def _posix_path(local_path):
113 # type: (Text) -> Text
114 return str(PurePosixPath(Path(local_path)))
115
116
117 def _local_path(posix_path):
118 # type: (Text) -> Text
119 return str(Path(posix_path))
120
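# Informal sketch (not part of the module): on POSIX hosts these two helpers
# are near no-ops, while on Windows they translate path separators:
#     _posix_path("data\\de\\somefile")  # -> "data/de/somefile"  (on Windows)
#     _local_path("data/de/somefile")    # -> "data\\de\\somefile" (on Windows)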
121
122 def _whoami():
123 # type: () -> Tuple[Text,Text]
124 """Return the current operating system account as (username, fullname)."""
125 username = getuser()
126 try:
127 if onWindows():
128 get_user_name = ctypes.windll.secur32.GetUserNameExW # type: ignore
129 size = ctypes.pointer(ctypes.c_ulong(0))
130 get_user_name(3, None, size)
131
132 name_buffer = ctypes.create_unicode_buffer(size.contents.value)
133 get_user_name(3, name_buffer, size)
134 fullname = str(name_buffer.value)
135 else:
136 fullname = pwd.getpwuid(os.getuid())[4].split(',')[0]
137 except (KeyError, IndexError):
138 fullname = username
139
140 return (username, fullname)
141
142
143 class WritableBagFile(FileIO):
144 """Writes files in research object."""
145
146 def __init__(self, research_object, rel_path):
147 # type: (ResearchObject, Text) -> None
148 """Initialize an ROBagIt."""
149 self.research_object = research_object
150 if Path(rel_path).is_absolute():
151 raise ValueError("rel_path must be relative: %s" % rel_path)
152 self.rel_path = rel_path
153 self.hashes = {SHA1: hashlib.sha1(), # nosec
154 SHA256: hashlib.sha256(),
155 SHA512: hashlib.sha512()}
156 # Open file in Research Object folder
157 path = os.path.abspath(os.path.join(research_object.folder, _local_path(rel_path)))
158 if not path.startswith(os.path.abspath(research_object.folder)):
159 raise ValueError("Path is outside Research Object: %s" % path)
160 super(WritableBagFile, self).__init__(str(path), mode="w")
161
162
163 def write(self, b):
164 # type: (Union[bytes, Text]) -> int
165 if isinstance(b, bytes):
166 real_b = b
167 else:
168 real_b = b.encode('utf-8')
169 total = 0
170 length = len(real_b)
171 while total < length:
172 ret = super(WritableBagFile, self).write(real_b)
173 if ret:
174 total += ret
175 for _ in self.hashes.values():
176 _.update(real_b)
177 return total
178
179 def close(self): # type: () -> None
180 # FIXME: Convert below block to a ResearchObject method?
181 if self.rel_path.startswith("data/"):
182 self.research_object.bagged_size[self.rel_path] = self.tell()
183 else:
184 self.research_object.tagfiles.add(self.rel_path)
185
186 super(WritableBagFile, self).close()
187 # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" }
188 checksums = {}
189 for name in self.hashes:
190 checksums[name] = self.hashes[name].hexdigest().lower()
191 self.research_object.add_to_manifest(self.rel_path, checksums)
192
193 # To simplify our hash calculation we won't support
194 # seeking, reading or truncating, as we can't apply
195 # the corresponding seeks to the running hash.
196 # TODO: Support these? At the expense of invalidating
197 # the current hash, then having to recalculate at close()
198 def seekable(self): # type: () -> bool
199 return False
200
201 def readable(self): # type: () -> bool
202 return False
203
204 def truncate(self, size=None):
205 # type: (Optional[int]) -> int
206 # FIXME: This breaks contract IOBase,
207 # as it means we would have to recalculate the hash
208 if size is not None:
209 raise IOError("WritableBagFile can't truncate")
210 return self.tell()
211
212
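# Rough usage sketch for WritableBagFile, assuming `ro` is an open
# ResearchObject (ResearchObject.write_bag_file below is the normal entry point):
#     bag_file = WritableBagFile(ro, "metadata/example.txt")
#     bag_file.write(b"Hello world\n")
#     bag_file.close()  # registers sha1/sha256/sha512 checksums in the manifests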
213 def _check_mod_11_2(numeric_string):
214 # type: (Text) -> bool
215 """
216 Validate numeric_string for its MOD-11-2 checksum.
217
218 Any "-" in the numeric_string are ignored.
219
220 The last digit of numeric_string is assumed to be the checksum, 0-9 or X.
221
222 See ISO/IEC 7064:2003 and
223 https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
224 """
225 # Strip -
226 nums = numeric_string.replace("-", "")
227 total = 0
228 # skip last (check)digit
229 for num in nums[:-1]:
230 digit = int(num)
231 total = (total+digit)*2
232 remainder = total % 11
233 result = (12-remainder) % 11
234 if result == 10:
235 checkdigit = "X"
236 else:
237 checkdigit = str(result)
238 # Compare against last digit or X
239 return nums[-1].upper() == checkdigit
240
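# Informal examples, using the ORCID numbers from _valid_orcid's docstring:
#     _check_mod_11_2("0000-0002-1825-0097")  # -> True
#     _check_mod_11_2("0000-0002-1825-0098")  # -> False (wrong check digit)
#     _check_mod_11_2("0000-0002-1694-233X")  # -> True (X encodes the value 10)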
241
242 def _valid_orcid(orcid): # type: (Optional[Text]) -> Text
243 """
244 Ensure orcid is a valid ORCID identifier.
245
246 The string must be equivalent to one of these forms:
247
248 0000-0002-1825-0097
249 orcid.org/0000-0002-1825-0097
250 http://orcid.org/0000-0002-1825-0097
251 https://orcid.org/0000-0002-1825-0097
252
253 If the ORCID number or prefix is invalid, a ValueError is raised.
254
255 The returned ORCID string is always in the form of:
256 https://orcid.org/0000-0002-1825-0097
257 """
258 if orcid is None or not orcid:
259 raise ValueError(u'ORCID cannot be unspecified')
260 # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x
261 orcid = orcid.lower()
262 match = re.match(
263 # Note: concatenated r"" r"" below so we can add comments to pattern
264
265 # Optional hostname, with or without protocol
266 r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?"
267 # alternative pattern, but probably messier
268 # r"^((https?://)?orcid.org/)?"
269
270 # ORCID number is always 4x4 numerical digits,
271 # but last digit (modulus 11 checksum)
272 # can also be X (but we made it lowercase above).
273 # e.g. 0000-0002-1825-0097
274 # or 0000-0002-1694-233x
275 r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$",
276 orcid)
277
278 help_url = u"https://support.orcid.org/knowledgebase/articles/"\
279 "116780-structure-of-the-orcid-identifier"
280 if not match:
281 raise ValueError(u"Invalid ORCID: %s\n%s" % (orcid, help_url))
282
283 # Conservative in what we produce:
284 # a) Ensure any checksum digit is uppercase
285 orcid_num = match.group("orcid").upper()
286 # b) ..and correct
287 if not _check_mod_11_2(orcid_num):
288 raise ValueError(
289 u"Invalid ORCID checksum: %s\n%s" % (orcid_num, help_url))
290
291 # c) Re-add the official prefix https://orcid.org/
292 return u"https://orcid.org/%s" % orcid_num
293
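# Informal examples, straight from the docstring above:
#     _valid_orcid("0000-0002-1825-0097")
#     _valid_orcid("orcid.org/0000-0002-1825-0097")
#     _valid_orcid("https://orcid.org/0000-0002-1825-0097")
# each return "https://orcid.org/0000-0002-1825-0097", while an invalid
# number or prefix raises ValueError.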
294
295 class ProvenanceProfile():
296 """
297 Provenance profile.
298
299 Populated as the workflow runs.
300 """
301
302 def __init__(self,
303 research_object, # type: ResearchObject
304 full_name, # type: str
305 host_provenance, # type: bool
306 user_provenance, # type: bool
307 orcid, # type: str
308 fsaccess, # type: StdFsAccess
309 run_uuid=None # type: Optional[uuid.UUID]
310 ): # type: (...) -> None
311 """Initialize the provenance profile."""
312 self.fsaccess = fsaccess
313 self.orcid = orcid
314 self.research_object = research_object
315 self.folder = self.research_object.folder
316 self.document = ProvDocument()
317 self.host_provenance = host_provenance
318 self.user_provenance = user_provenance
319 self.engine_uuid = research_object.engine_uuid
320 self.add_to_manifest = self.research_object.add_to_manifest
321 if self.orcid:
322 _logger.debug(u"[provenance] Creator ORCID: %s", self.orcid)
323 self.full_name = full_name
324 if self.full_name:
325 _logger.debug(u"[provenance] Creator Full name: %s",
326 self.full_name)
327 if run_uuid is None:
328 run_uuid = uuid.uuid4()
329 self.workflow_run_uuid = run_uuid
330 self.workflow_run_uri = run_uuid.urn
331 self.generate_prov_doc()
332
333 def __str__(self): # type: () -> str
334 """Represent this Provenvance profile as a string."""
335 return "ProvenanceProfile <%s> in <%s>" % (
336 self.workflow_run_uri, self.research_object)
337
338 def generate_prov_doc(self):
339 # type: () -> Tuple[str, ProvDocument]
340 """Add basic namespaces."""
341 def host_provenance(document):
342 # type: (ProvDocument) -> None
343 """Record host provenance."""
344 document.add_namespace(CWLPROV)
345 document.add_namespace(UUID)
346 document.add_namespace(FOAF)
347
348 hostname = getfqdn()
349 # won't have a foaf:accountServiceHomepage for unix hosts, but
350 # we can at least provide hostname
351 document.agent(
352 ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"],
353 "prov:location": hostname,
354 CWLPROV["hostname"]: hostname})
355
356 self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
357 self.document.add_namespace(
358 'wfprov', 'http://purl.org/wf4ever/wfprov#')
359 # document.add_namespace('prov', 'http://www.w3.org/ns/prov#')
360 self.document.add_namespace(
361 'wfdesc', 'http://purl.org/wf4ever/wfdesc#')
362 # TODO: Make this ontology. For now only has cwlprov:image
363 self.document.add_namespace('cwlprov', 'https://w3id.org/cwl/prov#')
364 self.document.add_namespace('foaf', 'http://xmlns.com/foaf/0.1/')
365 self.document.add_namespace('schema', 'http://schema.org/')
366 self.document.add_namespace('orcid', 'https://orcid.org/')
367 self.document.add_namespace('id', 'urn:uuid:')
368 # NOTE: Internet draft expired 2004-03-04 (!)
369 # https://tools.ietf.org/html/draft-thiemann-hash-urn-01
370 # TODO: Change to nih:sha-256; hashes
371 # https://tools.ietf.org/html/rfc6920#section-7
372 self.document.add_namespace('data', 'urn:hash::sha1:')
373 # Also needed for docker images
374 self.document.add_namespace(SHA256, "nih:sha-256;")
375
376 # info only, won't really be used by prov as sub-resources use /
377 self.document.add_namespace(
378 'researchobject', self.research_object.base_uri)
379 # annotations
380 self.metadata_ns = self.document.add_namespace(
381 'metadata', self.research_object.base_uri + METADATA + "/")
382 # Pre-register provenance directory so we can refer to its files
383 self.provenance_ns = self.document.add_namespace(
384 'provenance', self.research_object.base_uri
385 + _posix_path(PROVENANCE) + "/")
386 ro_identifier_workflow = self.research_object.base_uri \
387 + "workflow/packed.cwl#"
388 self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow)
389 ro_identifier_input = self.research_object.base_uri \
390 + "workflow/primary-job.json#"
391 self.document.add_namespace("input", ro_identifier_input)
392
393 # More info about the account (e.g. username, fullname)
394 # may or may not have been previously logged by user_provenance()
395 # .. but we always know cwltool was launched (directly or indirectly)
396 # by a user account, as cwltool is a command line tool
397 account = self.document.agent(ACCOUNT_UUID)
398 if self.orcid or self.full_name:
399 person = {provM.PROV_TYPE: PROV["Person"],
400 "prov:type": SCHEMA["Person"]}
401 if self.full_name:
402 person["prov:label"] = self.full_name
403 person["foaf:name"] = self.full_name
404 person["schema:name"] = self.full_name
405 else:
406 # TODO: Look up name from ORCID API?
407 pass
408 agent = self.document.agent(self.orcid or uuid.uuid4().urn,
409 person)
410 self.document.actedOnBehalfOf(account, agent)
411 else:
412 if self.host_provenance:
413 host_provenance(self.document)
414 if self.user_provenance:
415 self.research_object.user_provenance(self.document)
416 # The execution of cwltool
417 wfengine = self.document.agent(
418 self.engine_uuid,
419 {provM.PROV_TYPE: PROV["SoftwareAgent"],
420 "prov:type": WFPROV["WorkflowEngine"],
421 "prov:label": self.cwltool_version})
422 # FIXME: This datetime will be a bit too delayed; we should
423 # capture when cwltool.py first started instead?
424 self.document.wasStartedBy(
425 wfengine, None, account, datetime.datetime.now())
426 # define workflow run level activity
427 self.document.activity(
428 self.workflow_run_uri, datetime.datetime.now(), None,
429 {provM.PROV_TYPE: WFPROV["WorkflowRun"],
430 "prov:label": "Run of workflow/packed.cwl#main"})
431 # association between SoftwareAgent and WorkflowRun
432 main_workflow = "wf:main"
433 self.document.wasAssociatedWith(
434 self.workflow_run_uri, self.engine_uuid, main_workflow)
435 self.document.wasStartedBy(
436 self.workflow_run_uri, None, self.engine_uuid,
437 datetime.datetime.now())
438 return (self.workflow_run_uri, self.document)
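# A rough PROV-N sketch of what generate_prov_doc() records (UUIDs are
# placeholders; the Person agent only appears when orcid/full_name is set):
#     agent(id:<account-uuid>)                     # the user account
#     agent(id:<engine-uuid>, [prov:type='wfprov:WorkflowEngine'])
#     wasStartedBy(id:<engine-uuid>, -, id:<account-uuid>, <now>)
#     activity(id:<run-uuid>, [prov:type='wfprov:WorkflowRun'])
#     wasAssociatedWith(id:<run-uuid>, id:<engine-uuid>, wf:main)
#     wasStartedBy(id:<run-uuid>, -, id:<engine-uuid>, <now>)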
439
440 def evaluate(self,
441 process, # type: Process
442 job, # type: Any
443 job_order_object, # type: Dict[Text, Text]
444 research_obj # type: ResearchObject
445 ): # type: (...) -> None
446 """Evaluate the nature of job."""
447 if not hasattr(process, "steps"):
448 # record provenance of independent commandline tool executions
449 self.prospective_prov(job)
450 customised_job = copy_job_order(job, job_order_object)
451 self.used_artefacts(customised_job, self.workflow_run_uri)
452 research_obj.create_job(customised_job, job)
453 elif hasattr(job, "workflow"):
454 # record provenance of workflow executions
455 self.prospective_prov(job)
456 customised_job = copy_job_order(job, job_order_object)
457 self.used_artefacts(customised_job, self.workflow_run_uri)
458
459 def record_process_start(self, process, job, process_run_id=None):
460 # type: (Process, Any, Optional[str]) -> Optional[str]
461 if not hasattr(process, 'steps'):
462 process_run_id = self.workflow_run_uri
463 elif not hasattr(job, 'workflow'):
464 # commandline tool execution as part of workflow
465 name = str(job.name) if hasattr(job, 'name') else ''
466 process_name = urllib.parse.quote(name, safe=":/,#")
467 process_run_id = self.start_process(process_name, datetime.datetime.now())
468 return process_run_id
469
470 def start_process(self, process_name, when, process_run_id=None):
471 # type: (Text, datetime.datetime, Optional[str]) -> str
472 """Record the start of each Process."""
473 if process_run_id is None:
474 process_run_id = uuid.uuid4().urn
475 prov_label = "Run of workflow/packed.cwl#main/" + process_name
476 self.document.activity(
477 process_run_id, None, None,
478 {provM.PROV_TYPE: WFPROV["ProcessRun"],
479 provM.PROV_LABEL: prov_label})
480 self.document.wasAssociatedWith(
481 process_run_id, self.engine_uuid, str("wf:main/" + process_name))
482 self.document.wasStartedBy(
483 process_run_id, None, self.workflow_run_uri,
484 when, None, None)
485 return process_run_id
486
487 def record_process_end(self, process_name, process_run_id, outputs, when):
488 # type: (Text, str, Any, datetime.datetime) -> None
489 self.generate_output_prov(outputs, process_run_id, process_name)
490 self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
491
492 def declare_file(self, value):
493 # type: (MutableMapping[Text, Any]) -> Tuple[ProvEntity, ProvEntity, str]
494 if value["class"] != "File":
495 raise ValueError("Must have class:File: %s" % value)
496 # Need to determine file hash aka RO filename
497 entity = None # type: Optional[ProvEntity]
498 checksum = None
499 if 'checksum' in value:
500 csum = value['checksum']
501 (method, checksum) = csum.split("$", 1)
502 if method == SHA1 and \
503 self.research_object.has_data_file(checksum):
504 entity = self.document.entity("data:" + checksum)
505
506 if not entity and 'location' in value:
507 location = str(value['location'])
508 # If we made it here, we'll have to add it to the RO
509 with self.fsaccess.open(location, "rb") as fhandle:
510 relative_path = self.research_object.add_data_file(fhandle)
511 # FIXME: This naively relies on add_data_file setting hash as filename
512 checksum = PurePath(relative_path).name
513 entity = self.document.entity(
514 "data:" + checksum, {provM.PROV_TYPE: WFPROV["Artifact"]})
515 if "checksum" not in value:
516 value["checksum"] = "%s$%s" % (SHA1, checksum)
517
518
519 if not entity and 'contents' in value:
520 # Anonymous file, add content as string
521 entity, checksum = self.declare_string(value["contents"])
522
523 # By here one of them should have worked!
524 if not entity or not checksum:
525 raise ValueError("class:File but missing checksum/location/content: %r" % value)
526
527
528 # Track filename and extension, this is generally useful only for
529 # secondaryFiles. Note that multiple uses of a file might thus record
530 # different names for the same entity, so we'll
531 # make/track a specialized entity by UUID
532 file_id = value.setdefault("@id", uuid.uuid4().urn)
533 # A specialized entity that has just these names
534 file_entity = self.document.entity(
535 file_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
536 (provM.PROV_TYPE, WF4EVER["File"])]) # type: ProvEntity
537
538 if "basename" in value:
539 file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
540 if "nameroot" in value:
541 file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
542 if "nameext" in value:
543 file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
544 self.document.specializationOf(file_entity, entity)
545
546 # Check for secondaries
547 for sec in value.get("secondaryFiles", ()):
548 # TODO: Record these in a specializationOf entity with UUID?
549 if sec['class'] == "File":
550 (sec_entity, _, _) = self.declare_file(sec)
551 elif sec['class'] == "Directory":
552 sec_entity = self.declare_directory(sec)
553 else:
554 raise ValueError("Got unexpected secondaryFiles value: {}".format(sec))
555 # We don't know how/when/where the secondary file was generated,
556 # but CWL convention is a kind of summary/index derived
557 # from the original file. As it's generally in a different format,
558 # prov:Quotation is not appropriate.
559 self.document.derivation(
560 sec_entity, file_entity,
561 other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]})
562
563 return file_entity, entity, checksum
564
565 def declare_directory(self, value): # type: (MutableMapping[Text, Any]) -> ProvEntity
566 """Register any nested files/directories."""
567 # FIXME: Calculate a hash-like identifier for directory
568 # so we get same value if it's the same filenames/hashes
569 # in a different location.
570 # For now, mint a new UUID to identify this directory, but
571 # attempt to keep it inside the value dictionary
572 dir_id = value.setdefault("@id", uuid.uuid4().urn)
573
574 # New annotation file to keep the ORE Folder listing
575 ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
576 dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])
577
578 coll = self.document.entity(
579 dir_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
580 (provM.PROV_TYPE, PROV["Collection"]),
581 (provM.PROV_TYPE, PROV["Dictionary"]),
582 (provM.PROV_TYPE, RO["Folder"])])
583 # ORE description of ro:Folder, saved separately
584 coll_b = dir_bundle.entity(
585 dir_id, [(provM.PROV_TYPE, RO["Folder"]),
586 (provM.PROV_TYPE, ORE["Aggregation"])])
587 self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)
588
589 # dir_manifest = dir_bundle.entity(
590 # dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
591 # ORE["describes"]: coll_b.identifier})
592
593 coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
594 coll_b_attribs = [] # type: List[Tuple[Identifier, ProvEntity]]
595
596 # FIXME: .listing might not be populated yet - hopefully
597 # a later call to this method will sort that
598 is_empty = True
599
600 if "listing" not in value:
601 get_listing(self.fsaccess, value)
602 for entry in value.get("listing", []):
603 is_empty = False
604 # Declare child-artifacts
605 entity = self.declare_artefact(entry)
606 self.document.membership(coll, entity)
607 # Membership relation aka our ORE Proxy
608 m_id = uuid.uuid4().urn
609 m_entity = self.document.entity(m_id)
610 m_b = dir_bundle.entity(m_id)
611
612 # PROV-O style Dictionary
613 # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
614 # ..as prov.py does not currently allow PROV-N extensions
615 # like hadDictionaryMember(..)
616 m_entity.add_asserted_type(PROV["KeyEntityPair"])
617
618 m_entity.add_attributes({
619 PROV["pairKey"]: entry["basename"],
620 PROV["pairEntity"]: entity,
621 })
622
623 # As well as a being a
624 # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
625 m_b.add_asserted_type(RO["FolderEntry"])
626 m_b.add_asserted_type(ORE["Proxy"])
627 m_b.add_attributes({
628 RO["entryName"]: entry["basename"],
629 ORE["proxyIn"]: coll,
630 ORE["proxyFor"]: entity,
631
632 })
633 coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
634 coll_b_attribs.append((ORE["aggregates"], m_b))
635
636 coll.add_attributes(coll_attribs)
637 coll_b.add_attributes(coll_b_attribs)
638
639 # Also Save ORE Folder as annotation metadata
640 ore_doc = ProvDocument()
641 ore_doc.add_namespace(ORE)
642 ore_doc.add_namespace(RO)
643 ore_doc.add_namespace(UUID)
644 ore_doc.add_bundle(dir_bundle)
645 ore_doc = ore_doc.flattened()
646 ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
647 with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
648 ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
649 self.research_object.add_annotation(dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri)
650
651 if is_empty:
652 # Empty directory
653 coll.add_asserted_type(PROV["EmptyCollection"])
654 coll.add_asserted_type(PROV["EmptyDictionary"])
655 self.research_object.add_uri(coll.identifier.uri)
656 return coll
657
658 def declare_string(self, value):
659 # type: (Union[Text, str]) -> Tuple[ProvEntity,Text]
660 """Save as string in UTF-8."""
661 byte_s = BytesIO(str(value).encode(ENCODING))
662 data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN)
663 checksum = PurePosixPath(data_file).name
664 # FIXME: Don't naively assume add_data_file uses hash in filename!
665 data_id = "data:%s" % PurePosixPath(data_file).stem
666 entity = self.document.entity(
667 data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
668 provM.PROV_VALUE: str(value)}) # type: ProvEntity
669 return entity, checksum
670
671 def declare_artefact(self, value):
672 # type: (Any) -> ProvEntity
673 """Create data artefact entities for all file objects."""
674 if value is None:
675 # FIXME: If this can happen in CWL, we'll
676 # need a better way to represent this in PROV
677 return self.document.entity(
678 CWLPROV["None"], {provM.PROV_LABEL: "None"})
679
680 if isinstance(value, (bool, int, float)):
681 # Typically used in job documents for flags
682
683 # FIXME: Make consistent hash URIs for these
684 # that somehow include the type
685 # (so "1" != 1 != "1.0" != true)
686 entity = self.document.entity(
687 uuid.uuid4().urn, {provM.PROV_VALUE: value})
688 self.research_object.add_uri(entity.identifier.uri)
689 return entity
690
691 if isinstance(value, (Text, str)):
692 (entity, _) = self.declare_string(value)
693 return entity
694
695 if isinstance(value, bytes):
696 # If we got here then we must be in Python 3
697 byte_s = BytesIO(value)
698 data_file = self.research_object.add_data_file(byte_s)
699 # FIXME: Don't naively assume add_data_file uses hash in filename!
700 data_id = "data:%s" % PurePosixPath(data_file).stem
701 return self.document.entity(
702 data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
703 provM.PROV_VALUE: str(value)})
704
705 if isinstance(value, MutableMapping):
706 if "@id" in value:
707 # Already processed this value, but it might not be in this PROV
708 entities = self.document.get_record(value["@id"])
709 if entities:
710 return entities[0]
711 # else, unknown in PROV, re-add below as if it's fresh
712
713 # Base case - we found a File we need to update
714 if value.get("class") == "File":
715 (entity, _, _) = self.declare_file(value)
716 value["@id"] = entity.identifier.uri
717 return entity
718
719 if value.get("class") == "Directory":
720 entity = self.declare_directory(value)
721 value["@id"] = entity.identifier.uri
722 return entity
723 coll_id = value.setdefault("@id", uuid.uuid4().urn)
724 # some other kind of dictionary?
725 # TODO: also Save as JSON
726 coll = self.document.entity(
727 coll_id, [(provM.PROV_TYPE, WFPROV["Artifact"]),
728 (provM.PROV_TYPE, PROV["Collection"]),
729 (provM.PROV_TYPE, PROV["Dictionary"])])
730
731 if value.get("class"):
732 _logger.warning("Unknown data class %s.", value["class"])
733 # FIXME: The class might be "http://example.com/somethingelse"
734 coll.add_asserted_type(CWLPROV[value["class"]])
735
736 # Let's iterate and recurse
737 coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]]
738 for (key, val) in value.items():
739 v_ent = self.declare_artefact(val)
740 self.document.membership(coll, v_ent)
741 m_entity = self.document.entity(uuid.uuid4().urn)
742 # Note: only support PROV-O style dictionary
743 # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
744 # as prov.py does not easily allow PROV-N extensions
745 m_entity.add_asserted_type(PROV["KeyEntityPair"])
746 m_entity.add_attributes({
747 PROV["pairKey"]: str(key),
748 PROV["pairEntity"]: v_ent
749 })
750 coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
751 coll.add_attributes(coll_attribs)
752 self.research_object.add_uri(coll.identifier.uri)
753 return coll
754
755 # some other kind of Collection?
756 # TODO: also save as JSON
757 try:
758 members = []
759 for each_input_obj in iter(value):
760 # Recurse and register any nested objects
761 e = self.declare_artefact(each_input_obj)
762 members.append(e)
763
764 # If we reached this, then we were allowed to iterate
765 coll = self.document.entity(
766 uuid.uuid4().urn, [(provM.PROV_TYPE, WFPROV["Artifact"]),
767 (provM.PROV_TYPE, PROV["Collection"])])
768 if not members:
769 coll.add_asserted_type(PROV["EmptyCollection"])
770 else:
771 for member in members:
772 # FIXME: This won't preserve order, for that
773 # we would need to use PROV.Dictionary
774 # with numeric keys
775 self.document.membership(coll, member)
776 self.research_object.add_uri(coll.identifier.uri)
777 # FIXME: list value does not support adding "@id"
778 return coll
779 except TypeError:
780 _logger.warning("Unrecognized type %s of %r",
781 type(value), value)
782 # Let's just fall back to Python repr()
783 entity = self.document.entity(
784 uuid.uuid4().urn, {provM.PROV_LABEL: repr(value)})
785 self.research_object.add_uri(entity.identifier.uri)
786 return entity
787
788 def used_artefacts(self,
789 job_order, # type: Union[Dict[Any, Any], List[Dict[Any, Any]]]
790 process_run_id, # type: str
791 name=None # type: Optional[str]
792 ): # type: (...) -> None
793 """Add used() for each data artefact."""
794 if isinstance(job_order, list):
795 for entry in job_order:
796 self.used_artefacts(entry, process_run_id, name)
797 else:
798 # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
799 base = "main"
800 if name is not None:
801 base += "/" + name
802 for key, value in job_order.items():
803 prov_role = self.wf_ns["%s/%s" % (base, key)]
804 try:
805 entity = self.declare_artefact(value)
806 self.document.used(
807 process_run_id, entity, datetime.datetime.now(), None,
808 {"prov:role": prov_role})
809 except OSError:
810 pass
811
812 def generate_output_prov(self,
813 final_output, # type: Union[Dict[Text, Any], List[Dict[Text, Any]]]
814 process_run_id, # type: Optional[str]
815 name # type: Optional[Text]
816 ): # type: (...) -> None
817 """Call wasGeneratedBy() for each output,copy the files into the RO."""
818 if isinstance(final_output, list):
819 for entry in final_output:
820 self.generate_output_prov(entry, process_run_id, name)
821 else:
822 # Create the timestamp as early as possible
823 timestamp = datetime.datetime.now()
824
825 # For each output, find/register the corresponding
826 # entity (UUID) and document it as generated in
827 # a role corresponding to the output
828 for output, value in final_output.items():
829 entity = self.declare_artefact(value)
830 if name is not None:
831 name = urllib.parse.quote(str(name), safe=":/,#")
832 # FIXME: Probably not "main" in nested workflows
833 role = self.wf_ns["main/%s/%s" % (name, output)]
834 else:
835 role = self.wf_ns["main/%s" % output]
836
837 if not process_run_id:
838 process_run_id = self.workflow_run_uri
839
840 self.document.wasGeneratedBy(
841 entity, process_run_id, timestamp, None, {"prov:role": role})
842
843 def prospective_prov(self, job):
844 # type: (Any) -> None
845 """Create prospective prov recording as wfdesc prov:Plan."""
846 if not hasattr(job, "steps"):
847 # direct command line tool execution
848 self.document.entity(
849 "wf:main", {provM.PROV_TYPE: WFDESC["Process"],
850 "prov:type": PROV["Plan"],
851 "prov:label":"Prospective provenance"})
852 return
853
854 self.document.entity(
855 "wf:main", {provM.PROV_TYPE: WFDESC["Workflow"],
856 "prov:type": PROV["Plan"],
857 "prov:label":"Prospective provenance"})
858
859 for step in job.steps:
860 stepnametemp = "wf:main/" + str(step.name)[5:]
861 stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
862 step = self.document.entity(
863 stepname, {provM.PROV_TYPE: WFDESC["Process"],
864 "prov:type": PROV["Plan"]})
865 self.document.entity(
866 "wf:main", {"wfdesc:hasSubProcess": step,
867 "prov:label": "Prospective provenance"})
868 # TODO: Declare roles/parameters as well
869
870 def activity_has_provenance(self, activity, prov_ids):
871 # type: (str, List[Identifier]) -> None
872 """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files."""
873 # NOTE: The below will only work if the corresponding metadata/provenance arcp URI
874 # is a pre-registered namespace in the PROV Document
875 attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids]
876 self.document.activity(activity, other_attributes=attribs)
877 # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention
878 # as prov:mentionOf() is only for entities, not activities
879 uris = [i.uri for i in prov_ids]
880 self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri)
881
882 def finalize_prov_profile(self, name):
883 # type: (Optional[Text]) -> List[Identifier]
884 """Transfer the provenance related files to the RO."""
885 # NOTE: Relative posix path
886 if name is None:
887 # master workflow, fixed filenames
888 filename = "primary.cwlprov"
889 else:
890 # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json
891 wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_")
892 # Note that the above could cause overlaps for similarly named
893 # workflows, but that's OK as we'll also include run uuid
894 # which also covers the case of this step being run in
895 # multiple places or iterations
896 filename = "%s.%s.cwlprov" % (wf_name, self.workflow_run_uuid)
897
898 basename = str(PurePosixPath(PROVENANCE)/filename)
899
900 # TODO: Also support other profiles than CWLProv, e.g. ProvOne
901
902 # list of prov identifiers of provenance files
903 prov_ids = []
904
905 # https://www.w3.org/TR/prov-xml/
906 with self.research_object.write_bag_file(basename + ".xml") as provenance_file:
907 self.document.serialize(provenance_file, format="xml", indent=4)
908 prov_ids.append(self.provenance_ns[filename + ".xml"])
909
910 # https://www.w3.org/TR/prov-n/
911 with self.research_object.write_bag_file(basename + ".provn") as provenance_file:
912 self.document.serialize(provenance_file, format="provn", indent=2)
913 prov_ids.append(self.provenance_ns[filename + ".provn"])
914
915 # https://www.w3.org/Submission/prov-json/
916 with self.research_object.write_bag_file(basename + ".json") as provenance_file:
917 self.document.serialize(provenance_file, format="json", indent=2)
918 prov_ids.append(self.provenance_ns[filename + ".json"])
919
920 # "rdf" aka https://www.w3.org/TR/prov-o/
921 # which can be serialized to ttl/nt/jsonld (and more!)
922
923 # https://www.w3.org/TR/turtle/
924 with self.research_object.write_bag_file(basename + ".ttl") as provenance_file:
925 self.document.serialize(provenance_file, format="rdf", rdf_format="turtle")
926 prov_ids.append(self.provenance_ns[filename + ".ttl"])
927
928 # https://www.w3.org/TR/n-triples/
929 with self.research_object.write_bag_file(basename + ".nt") as provenance_file:
930 self.document.serialize(provenance_file, format="rdf", rdf_format="ntriples")
931 prov_ids.append(self.provenance_ns[filename + ".nt"])
932
933 # https://www.w3.org/TR/json-ld/
934 # TODO: Use a nice JSON-LD context
935 # see also https://eprints.soton.ac.uk/395985/
936 # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :(
937 with self.research_object.write_bag_file(basename + ".jsonld") as provenance_file:
938 self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld")
939 prov_ids.append(self.provenance_ns[filename + ".jsonld"])
940
941 _logger.debug(u"[provenance] added provenance: %s", prov_ids)
942 return prov_ids
943
944
945 class ResearchObject():
946 """CWLProv Research Object."""
947
948 def __init__(self, fsaccess, temp_prefix_ro="tmp", orcid='', full_name=''):
949 # type: (StdFsAccess, str, Text, Text) -> None
950 """Initialize the ResearchObject."""
951 self.temp_prefix = temp_prefix_ro
952 self.orcid = '' if not orcid else _valid_orcid(orcid)
953 self.full_name = full_name
954 tmp_dir, tmp_prefix = os.path.split(temp_prefix_ro)
955 self.folder = os.path.abspath(tempfile.mkdtemp(prefix=tmp_prefix,
956 dir=tmp_dir)) # type: Text
957 self.closed = False
958 # map of filename "data/de/alsdklkas": 12398123 bytes
959 self.bagged_size = {} # type: Dict[Text, int]
960 self.tagfiles = set() # type: Set[Text]
961 self._file_provenance = {} # type: Dict[Text, Dict[Text, Text]]
962 self._external_aggregates = [] # type: List[Dict[Text, Text]]
963 self.annotations = [] # type: List[Dict[Text, Any]]
964 self._content_types = {} # type: Dict[Text,str]
965 self.fsaccess = fsaccess
966 # These should be replaced by generate_prov_doc when workflow/run IDs are known:
967 self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
968 self.ro_uuid = uuid.uuid4()
969 self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
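# Illustrative: the base URI ends up like "arcp://uuid,<ro_uuid>/", and all
# RO-relative paths hang off it, e.g. "arcp://uuid,<ro_uuid>/metadata/manifest.json".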
970 self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
971 ##
972 self.relativised_input_object = {} # type: Dict[Any, Any]
973
974 self._initialize()
975 _logger.debug(u"[provenance] Temporary research object: %s",
976 self.folder)
977
978 def self_check(self): # type: () -> None
979 """Raise ValueError if this RO is closed."""
980 if self.closed:
981 raise ValueError(
982 "This ResearchObject has already been closed and is not "
983 "available for futher manipulation.")
984
985 def __str__(self): # type: () -> str
986 """Represent this RO as a string."""
987 return "ResearchObject <{}> in <{}>".format(self.ro_uuid, self.folder)
988
989 def _initialize(self): # type: () -> None
990 for research_obj_folder in (METADATA, DATA, WORKFLOW, SNAPSHOT,
991 PROVENANCE, LOGS):
992 os.makedirs(os.path.join(self.folder, research_obj_folder))
993 self._initialize_bagit()
994
995 def _initialize_bagit(self): # type: () -> None
996 """Write fixed bagit header."""
997 self.self_check()
998 bagit = os.path.join(self.folder, "bagit.txt")
999 # encoding: always UTF-8 (although ASCII would suffice here)
1000 # newline: ensure LF also on Windows
1001 with open(bagit, "w", encoding=ENCODING, newline='\n') as bag_it_file:
1002 # TODO: \n or \r\n ?
1003 bag_it_file.write(u"BagIt-Version: 0.97\n")
1004 bag_it_file.write(u"Tag-File-Character-Encoding: %s\n" % ENCODING)
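# The resulting bagit.txt is exactly two lines:
#     BagIt-Version: 0.97
#     Tag-File-Character-Encoding: UTF-8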
1005
1006 def open_log_file_for_activity(self, uuid_uri): # type: (Text) -> WritableBagFile
1007 self.self_check()
1008 # Ensure valid UUID for safe filenames
1009 activity_uuid = uuid.UUID(uuid_uri)
1010 if activity_uuid.urn == self.engine_uuid:
1011 # It's the engine aka cwltool!
1012 name = "engine"
1013 else:
1014 name = "activity"
1015 p = os.path.join(LOGS, "{}.{}.txt".format(name, activity_uuid))
1016 _logger.debug("[provenance] Opening log file for %s: %s" % (name, p))
1017 self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri)
1018 return self.write_bag_file(p)
1019
1020 def _finalize(self): # type: () -> None
1021 self._write_ro_manifest()
1022 self._write_bag_info()
1023
1024 def user_provenance(self, document): # type: (ProvDocument) -> None
1025 """Add the user provenance."""
1026 self.self_check()
1027 (username, fullname) = _whoami()
1028
1029 if not self.full_name:
1030 self.full_name = fullname
1031
1032 document.add_namespace(UUID)
1033 document.add_namespace(ORCID)
1034 document.add_namespace(FOAF)
1035 account = document.agent(
1036 ACCOUNT_UUID, {provM.PROV_TYPE: FOAF["OnlineAccount"],
1037 "prov:label": username,
1038 FOAF["accountName"]: username})
1039
1040 user = document.agent(
1041 self.orcid or USER_UUID,
1042 {provM.PROV_TYPE: PROV["Person"],
1043 "prov:label": self.full_name,
1044 FOAF["name"]: self.full_name,
1045 FOAF["account"]: account})
1046 # cwltool may be started on the shell (directly by user),
1047 # by shell script (indirectly by user)
1048 # or from a different program
1049 # (which again is launched by any of the above)
1050 #
1051 # We can't tell in which way, but ultimately we're still
1052 # acting on behalf of that user (even if we might
1053 # get their name wrong!)
1054 document.actedOnBehalfOf(account, user)
1055
1056 def write_bag_file(self, path, encoding=ENCODING):
1057 # type: (Text, Optional[str]) -> WritableBagFile
1058 """Write the bag file into our research object."""
1059 self.self_check()
1060 # For some reason below throws BlockingIOError
1061 #fp = BufferedWriter(WritableBagFile(self, path))
1062 bag_file = WritableBagFile(self, path)
1063 if encoding is not None:
1064 # encoding: match Tag-File-Character-Encoding: UTF-8
1065 # newline: ensure LF also on Windows
1066 return cast(WritableBagFile,
1067 TextIOWrapper(cast(IO[bytes], bag_file), encoding=encoding,
1068 newline="\n"))
1069 return bag_file
1070
1071 def add_tagfile(self, path, timestamp=None):
1072 # type: (Text, Optional[datetime.datetime]) -> None
1073 """Add tag files to our research object."""
1074 self.self_check()
1075 checksums = {}
1076 # Read file to calculate its checksum
1077 if os.path.isdir(path):
1078 return
1079 # FIXME: do the right thing for directories
1080 with open(path, "rb") as tag_file:
1081 # FIXME: Should have more efficient open_tagfile() that
1082 # does all checksums in one go while writing through,
1083 # adding checksums after closing.
1084 # Below is probably OK for now, as metadata files
1085 # are not too large.
1086
1087 checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
1088
1089 tag_file.seek(0)
1090 checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)
1091
1092 tag_file.seek(0)
1093 checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)
1094
1095 rel_path = _posix_path(os.path.relpath(path, self.folder))
1096 self.tagfiles.add(rel_path)
1097 self.add_to_manifest(rel_path, checksums)
1098 if timestamp is not None:
1099 self._file_provenance[rel_path] = {"createdOn": timestamp.isoformat()}
1100
1101 def _ro_aggregates(self):
1102 # type: () -> List[Dict[Text, Any]]
1103 """Gather dictionary of files to be added to the manifest."""
1104 def guess_mediatype(rel_path):
1105 # type: (Text) -> Dict[Text, Any]
1106 """Return the mediatypes."""
1107 media_types = {
1108 # Adapted from
1109 # https://w3id.org/bundle/2014-11-05/#media-types
1110
1111 "txt": TEXT_PLAIN,
1112 "ttl": 'text/turtle; charset="UTF-8"',
1113 "rdf": 'application/rdf+xml',
1114 "json": 'application/json',
1115 "jsonld": 'application/ld+json',
1116 "xml": 'application/xml',
1117 ##
1118 "cwl": 'text/x+yaml; charset="UTF-8"',
1119 "provn": 'text/provenance-notation; charset="UTF-8"',
1120 "nt": 'application/n-triples',
1121 } # type: Dict[Text, Text]
1122 conforms_to = {
1123 "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/',
1124 "cwl": 'https://w3id.org/cwl/',
1125 } # type: Dict[Text, Text]
1126
1127 prov_conforms_to = {
1128 "provn": 'http://www.w3.org/TR/2013/REC-prov-n-20130430/',
1129 "rdf": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
1130 "ttl": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
1131 "nt": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
1132 "jsonld": 'http://www.w3.org/TR/2013/REC-prov-o-20130430/',
1133 "xml": 'http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/',
1134 "json": 'http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/',
1135 } # type: Dict[Text, Text]
1136
1137
1138 extension = rel_path.rsplit(".", 1)[-1].lower() # type: Optional[Text]
1139 if extension == rel_path:
1140 # No ".", no extension
1141 extension = None
1142
1143 local_aggregate = {} # type: Dict[Text, Any]
1144 if extension in media_types:
1145 local_aggregate["mediatype"] = media_types[extension]
1146
1147 if extension in conforms_to:
1148 # TODO: Open CWL file to read its declared "cwlVersion", e.g.
1149 # cwlVersion = "v1.0"
1150 local_aggregate["conformsTo"] = conforms_to[extension]
1151
1152 if (rel_path.startswith(_posix_path(PROVENANCE))
1153 and extension in prov_conforms_to):
1154 if ".cwlprov" in rel_path:
1155 # Our own!
1156 local_aggregate["conformsTo"] = [prov_conforms_to[extension], CWLPROV_VERSION]
1157 else:
1158 # Some other PROV
1159 # TODO: Recognize ProvOne etc.
1160 local_aggregate["conformsTo"] = prov_conforms_to[extension]
1161 return local_aggregate
1162
1163 aggregates = [] # type: List[Dict[Text, Any]]
1164 for path in self.bagged_size.keys():
1165 aggregate_dict = {} # type: Dict[Text, Any]
1166
1167 temp_path = PurePosixPath(path)
1168 folder = temp_path.parent
1169 filename = temp_path.name
1170
1171 # NOTE: Here we end up aggregating the abstract
1172 # data items by their sha1 hash, so that it matches
1173 # the entity() in the prov files.
1174
1175 # TODO: Change to nih:sha-256; hashes
1176 # https://tools.ietf.org/html/rfc6920#section-7
1177 aggregate_dict["uri"] = 'urn:hash::sha1:' + filename
1178 aggregate_dict["bundledAs"] = {
1179 # The arcp URI is a suitable ORE proxy; local to this Research Object.
1180 # (as long as we don't also aggregate it by relative path!)
1181 "uri": self.base_uri + path,
1182 # relate it to the data/ path
1183 "folder": "/%s/" % folder,
1184 "filename": filename,
1185 }
1186 if path in self._file_provenance:
1187 # Made by workflow run, merge captured provenance
1188 aggregate_dict["bundledAs"].update(self._file_provenance[path])
1189 else:
1190 # Probably made outside wf run, part of job object?
1191 pass
1192 if path in self._content_types:
1193 aggregate_dict["mediatype"] = self._content_types[path]
1194
1195 aggregates.append(aggregate_dict)
1196
1197 for path in self.tagfiles:
1198 if (not (path.startswith(METADATA) or path.startswith(WORKFLOW) or
1199 path.startswith(SNAPSHOT))):
1200 # probably a bagit file
1201 continue
1202 if path == PurePosixPath(METADATA)/"manifest.json":
1203 # Should not really be there yet! But anyway, we won't
1204 # aggregate it.
1205 continue
1206
1207 rel_aggregates = {} # type: Dict[Text, Any]
1208 # These are local paths like metadata/provenance - but
1209 # we need to relativize them to our current directory,
1210 # as we are saved in metadata/manifest.json
1211 uri = str(Path(os.pardir)/path)
1212
1213 rel_aggregates["uri"] = uri
1214 rel_aggregates.update(guess_mediatype(path))
1215
1216 if path in self._file_provenance:
1217 # Propagate file provenance (e.g. timestamp)
1218 rel_aggregates.update(self._file_provenance[path])
1219 elif not path.startswith(SNAPSHOT):
1220 # make new timestamp?
1221 rel_aggregates.update(self._self_made())
1222 aggregates.append(rel_aggregates)
1223 aggregates.extend(self._external_aggregates)
1224 return aggregates
1225
1226 def add_uri(self, uri, timestamp=None):
1227 # type: (str, Optional[datetime.datetime]) -> Dict[Text, Any]
1228 self.self_check()
1229 aggr = self._self_made(timestamp=timestamp)
1230 aggr["uri"] = uri
1231 self._external_aggregates.append(aggr)
1232 return aggr
1233
1234 def add_annotation(self, about, content, motivated_by="oa:describing"):
1235 # type: (str, List[str], str) -> str
1236 """Cheap URI relativize for current directory and /."""
1237 self.self_check()
1238 curr = self.base_uri + METADATA + "/"
1239 content = [c.replace(curr, "").replace(self.base_uri, "../")
1240 for c in content]
1241 uri = uuid.uuid4().urn
1242 ann = {
1243 u"uri": uri,
1244 u"about": about,
1245 u"content": content,
1246 u"oa:motivatedBy": {"@id": motivated_by}
1247 }
1248 self.annotations.append(ann)
1249 return uri
1250
1251 def _ro_annotations(self):
1252 # type: () -> List[Dict[Text, Any]]
1253 annotations = [] # type: List[Dict[Text, Any]]
1254 annotations.append({
1255 "uri": uuid.uuid4().urn,
1256 "about": self.ro_uuid.urn,
1257 "content": "/",
1258 # https://www.w3.org/TR/annotation-vocab/#named-individuals
1259 "oa:motivatedBy": {"@id": "oa:describing"}
1260 })
1261
1262 # How was it run?
1263 # FIXME: Only primary*
1264 prov_files = [str(PurePosixPath(p).relative_to(METADATA)) for p in self.tagfiles
1265 if p.startswith(_posix_path(PROVENANCE))
1266 and "/primary." in p]
1267 annotations.append({
1268 "uri": uuid.uuid4().urn,
1269 "about": self.ro_uuid.urn,
1270 "content": prov_files,
1271 # Modulation of https://www.w3.org/TR/prov-aq/
1272 "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"}
1273 })
1274
1275 # Where is the main workflow?
1276 annotations.append({
1277 "uri": uuid.uuid4().urn,
1278 "about": str(PurePosixPath("..")/WORKFLOW/"packed.cwl"),
1279 "oa:motivatedBy": {"@id": "oa:highlighting"}
1280 })
1281
1282 annotations.append({
1283 "uri": uuid.uuid4().urn,
1284 "about": self.ro_uuid.urn,
1285 "content": [str(PurePosixPath("..")/WORKFLOW/"packed.cwl"),
1286 str(PurePosixPath("..")/WORKFLOW/"primary-job.json")],
1287 "oa:motivatedBy": {"@id": "oa:linking"}
1288 })
1289 # Add user-added annotations at end
1290 annotations.extend(self.annotations)
1291 return annotations
1292
1293 def _authored_by(self):
1294 # type: () -> Dict[Text, Any]
1295 authored_by = {}
1296 if self.orcid:
1297 authored_by["orcid"] = self.orcid
1298 if self.full_name:
1299 authored_by["name"] = self.full_name
1300 if not self.orcid:
1301 authored_by["uri"] = USER_UUID
1302
1303 if authored_by:
1304 return {"authoredBy": authored_by}
1305 return {}
1306
1307
1308 def _write_ro_manifest(self):
1309 # type: () -> None
1310
1311 # Does not have to be this order, but it's nice to be consistent
1312 manifest = OrderedDict() # type: Dict[Text, Any]
1313 manifest["@context"] = [
1314 {"@base": "%s%s/" % (self.base_uri, _posix_path(METADATA))},
1315 "https://w3id.org/bundle/context"
1316 ]
1317 manifest["id"] = "/"
1318 manifest["conformsTo"] = CWLPROV_VERSION
1319 filename = "manifest.json"
1320 manifest["manifest"] = filename
1321 manifest.update(self._self_made())
1322 manifest.update(self._authored_by())
1323 manifest["aggregates"] = self._ro_aggregates()
1324 manifest["annotations"] = self._ro_annotations()
1325
1326 json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False)
1327 rel_path = str(PurePosixPath(METADATA)/filename)
1328 json_manifest += "\n"
1329 with self.write_bag_file(rel_path) as manifest_file:
1330 manifest_file.write(json_manifest)
1331
1332 def _write_bag_info(self):
1333 # type: () -> None
1334
1335 with self.write_bag_file("bag-info.txt") as info_file:
1336 info_file.write(u"Bag-Software-Agent: %s\n" % self.cwltool_version)
1337 # FIXME: require sha-512 of payload to comply with profile?
1338 # FIXME: Update profile
1339 info_file.write(u"BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n")
1340 info_file.write(u"Bagging-Date: %s\n" % datetime.date.today().isoformat())
1341 info_file.write(u"External-Description: Research Object of CWL workflow run\n")
1342 if self.full_name:
1343 info_file.write(u"Contact-Name: %s\n" % self.full_name)
1344
1345 # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity)
1346 # as identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good.
1347 info_file.write(u"External-Identifier: %s\n" % self.base_uri)
1348
1349 # Calculate size of data/ (assuming no external fetch.txt files)
1350 total_size = sum(self.bagged_size.values())
1351 num_files = len(self.bagged_size)
1352 info_file.write(u"Payload-Oxum: %d.%d\n" % (total_size, num_files))
1353 _logger.debug(u"[provenance] Generated bagit metadata: %s",
1354 self.folder)
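# Note: Payload-Oxum is "<total octets>.<file count>" over data/; e.g. a bag
# with two payload files totalling 43328 bytes (placeholder values) gets
#     Payload-Oxum: 43328.2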
1355
1356 def generate_snapshot(self, prov_dep):
1357 # type: (MutableMapping[Text, Any]) -> None
1358 """Copy all of the CWL files to the snapshot/ directory."""
1359 self.self_check()
1360 for key, value in prov_dep.items():
1361 if key == "location" and value.split("/")[-1]:
1362 filename = value.split("/")[-1]
1363 path = os.path.join(self.folder, SNAPSHOT, filename)
1364 filepath = ''
1365 if "file://" in value:
1366 filepath = value[7:]
1367 else:
1368 filepath = value
1369
1370 # FIXME: What if destination path already exists?
1371 if os.path.exists(filepath):
1372 try:
1373 if os.path.isdir(filepath):
1374 shutil.copytree(filepath, path)
1375 else:
1376 shutil.copy(filepath, path)
1377 timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(filepath))
1378 self.add_tagfile(path, timestamp)
1379 except PermissionError:
1380 pass # FIXME: avoids duplicate snapshotting; need better solution
1381 elif key in ("secondaryFiles", "listing"):
1382 for files in value:
1383 if isinstance(files, MutableMapping):
1384 self.generate_snapshot(files)
1385 else:
1386 pass
1387
1388 def packed_workflow(self, packed): # type: (Text) -> None
1389 """Pack CWL description to generate re-runnable CWL object in RO."""
1390 self.self_check()
1391 rel_path = str(PurePosixPath(WORKFLOW)/"packed.cwl")
1392 # Write as binary
1393 with self.write_bag_file(rel_path, encoding=None) as write_pack:
1394 # YAML is always UTF8, but json.dumps gives us str in py2
1395 write_pack.write(packed.encode(ENCODING))
1396 _logger.debug(u"[provenance] Added packed workflow: %s", rel_path)
1397
1398 def has_data_file(self, sha1hash): # type: (str) -> bool
1399 """Confirm the presence of the given file in the RO."""
1400 folder = os.path.join(self.folder, DATA, sha1hash[0:2])
1401 hash_path = os.path.join(folder, sha1hash)
1402 return os.path.isfile(hash_path)
1403
1404 def add_data_file(self, from_fp, timestamp=None, content_type=None):
1405 # type: (IO[Any], Optional[datetime.datetime], Optional[str]) -> Text
1406 """Copy inputs to data/ folder."""
1407 self.self_check()
1408 tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
1409 with tempfile.NamedTemporaryFile(
1410 prefix=tmp_prefix, dir=tmp_dir, delete=False) as tmp:
1411 checksum = checksum_copy(from_fp, tmp)
1412
1413 # Calculate hash-based file path
1414 folder = os.path.join(self.folder, DATA, checksum[0:2])
1415 path = os.path.join(folder, checksum)
1416 # os.rename assumed safe, as our temp file should
1417 # be in same file system as our temp folder
1418 if not os.path.isdir(folder):
1419 os.makedirs(folder)
1420 os.rename(tmp.name, path)
1421
1422 # Relative posix path
1423 # (to avoid \ on Windows)
1424 rel_path = _posix_path(os.path.relpath(path, self.folder))
1425
1426 # Register in bagit checksum
1427 if Hasher == hashlib.sha1:
1428 self._add_to_bagit(rel_path, sha1=checksum)
1429 else:
1430 _logger.warning(
1431 u"[provenance] Unknown hash method %s for bagit manifest",
1432 Hasher)
1433 # Inefficient; bagit support needs to checksum again
1434 self._add_to_bagit(rel_path)
1435 _logger.debug(u"[provenance] Added data file %s", path)
1436 if timestamp is not None:
1437 self._file_provenance[rel_path] = self._self_made(timestamp)
1438 _logger.debug(u"[provenance] Relative path for data file %s", rel_path)
1439
1440 if content_type is not None:
1441 self._content_types[rel_path] = content_type
1442 return rel_path
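# Data files are content-addressed under data/<first two hex digits>/<sha1>;
# e.g. the sha1 from the Process.yml example above ends up at
#     data/47/47a013e660d408619d894b20806b1d5086aab03b
# and that relative POSIX path is what add_data_file() returns.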
1443
1444 def _self_made(self, timestamp=None):
1445 # type: (Optional[datetime.datetime]) -> Dict[Text, Any]
1446 if timestamp is None:
1447 timestamp = datetime.datetime.now()
1448 return {
1449 "createdOn": timestamp.isoformat(),
1450 "createdBy": {"uri": self.engine_uuid,
1451 "name": self.cwltool_version}
1452 }
1453
1454 def add_to_manifest(self, rel_path, checksums):
1455 # type: (Text, Dict[str,str]) -> None
1456 """Add files to the research object manifest."""
1457 self.self_check()
1458 if PurePosixPath(rel_path).is_absolute():
1459 raise ValueError("rel_path must be relative: %s" % rel_path)
1460
1461 if os.path.commonprefix(["data/", rel_path]) == "data/":
1462 # payload file, go to manifest
1463 manifest = "manifest"
1464 else:
1465 # metadata file, go to tag manifest
1466 manifest = "tagmanifest"
1467
1468 # Add checksums to corresponding manifest files
1469 for (method, hash_value) in checksums.items():
1470 # File not in manifest because we bailed out on
1471 # existence in bagged_size above
1472 manifestpath = os.path.join(
1473 self.folder, "%s-%s.txt" % (manifest, method.lower()))
1474 # encoding: match Tag-File-Character-Encoding: UTF-8
1475 # newline: ensure LF also on Windows
1476 with open(manifestpath, "a", encoding=ENCODING, newline='\n') \
1477 as checksum_file:
1478 line = u"%s %s\n" % (hash_value, rel_path)
1479 _logger.debug(u"[provenance] Added to %s: %s", manifestpath, line)
1480 checksum_file.write(line)
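# Illustrative manifest-sha1.txt line for a payload file (hash reused from
# the example above):
#     47a013e660d408619d894b20806b1d5086aab03b data/47/47a013e660d408619d894b20806b1d5086aab03b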
1481
1482
1483 def _add_to_bagit(self, rel_path, **checksums):
1484 # type: (Text, Any) -> None
1485 if PurePosixPath(rel_path).is_absolute():
1486 raise ValueError("rel_path must be relative: %s" % rel_path)
1487 local_path = os.path.join(self.folder, _local_path(rel_path))
1488 if not os.path.exists(local_path):
1489 raise IOError("File %s does not exist within RO: %s" % (rel_path, local_path))
1490
1491 if rel_path in self.bagged_size:
1492 # Already added, assume checksum OK
1493 return
1494 self.bagged_size[rel_path] = os.path.getsize(local_path)
1495
1496 if SHA1 not in checksums:
1497 # ensure we always have sha1
1498 checksums = dict(checksums)
1499 with open(local_path, "rb") as file_path:
1500 # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
1501 checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
1502
1503 self.add_to_manifest(rel_path, checksums)
1504
1505 def create_job(self,
1506 builder_job, # type: Dict[Text, Any]
1507 wf_job=None, # type: Optional[Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]]
1508 is_output=False # type: bool
1509 ): # type: (...) -> Dict[Text, Text]
1510 """Generate the new job object with RO specific relative paths."""
1511 # TODO: customise the file
1512 copied = copy.deepcopy(builder_job)
1513 relativised_input_objecttemp = {} # type: Dict[Text, Any]
1514 self._relativise_files(copied)
1515 def jdefault(o): # type: (Any) -> Dict[Any, Any]
1516 return dict(o)
1517 if is_output:
1518 rel_path = PurePosixPath(WORKFLOW)/"primary-output.json"
1519 else:
1520 rel_path = PurePosixPath(WORKFLOW)/"primary-job.json"
1521 j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault)
1522 with self.write_bag_file(str(rel_path)) as file_path:
1523 file_path.write(j + u"\n")
1524 _logger.debug(u"[provenance] Generated customised job file: %s",
1525 rel_path)
1526 # Generate a dictionary mapping workflow-level input IDs to
1527 # 1) for files, the relativised location containing the hash
1528 # 2) for other attributes, the actual value
1529 # Note: mappings whose "class" is not File or Directory are dropped
1531 for key, value in copied.items():
1532 if isinstance(value, MutableMapping):
1533 if value.get("class") in ("File", "Directory"):
1534 relativised_input_objecttemp[key] = value
1535 else:
1536 relativised_input_objecttemp[key] = value
1537 self.relativised_input_object.update(
1538 {k: v for k, v in relativised_input_objecttemp.items() if v})
1539 return self.relativised_input_object
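# A sketch of the resulting primary-job.json content (input IDs, checksum
# and value invented for illustration):
#
#     {"reads": {"class": "File",
#                "checksum": "sha1$7f...",
#                "location": "../data/7f/7f..."},
#      "threads": 4}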
1540
1541 def _relativise_files(self, structure):
1542 # type: (Dict[Any, Any]) -> None
1543 """Save any file objects into the RO and update the local paths."""
1544 # Base case - we found a File we need to update
1545 _logger.debug(u"[provenance] Relativising: %s", structure)
1546
1547 if isinstance(structure, MutableMapping):
1548 if structure.get("class") == "File":
1549 relative_path = None
1550 if "checksum" in structure:
1551 alg, checksum = structure["checksum"].split("$")
1552 if alg != SHA1:
1553 raise TypeError(
1554 "Only SHA1 CWL checksums are currently supported: "
1555 "{}".format(structure))
1556 if self.has_data_file(checksum):
1557 prefix = checksum[0:2]
1558 relative_path = PurePosixPath("data")/prefix/checksum
1559
1560 if relative_path is None and "location" in structure:
1561 # Register in RO; but why was this not picked
1562 # up by used_artefacts?
1563 _logger.info("[provenance] Adding to RO %s", structure["location"])
1564 with self.fsaccess.open(structure["location"], "rb") as fp:
1565 relative_path = self.add_data_file(fp)
1566 checksum = PurePosixPath(relative_path).name
1567 structure["checksum"] = "%s$%s" % (SHA1, checksum)
1568 if relative_path is not None:
1569 # RO-relative path as new location
1570 structure["location"] = str(PurePosixPath("..")/relative_path)
1571 else:
1572 _logger.warning("Could not determine RO path for file %s", structure)
1573 if "path" in structure:
1574 del structure["path"]
1575
1576 if structure.get("class") == "Directory":
1577 # TODO: Generate an anonymous Directory with a "listing"
1578 # pointing to the hashed files
1579 del structure["location"]
1580
1581 for val in structure.values():
1582 try:
1583 self._relativise_files(val)
1584 except OSError:
1585 pass
1586 return
1587
1588 if isinstance(structure, (str, Text)):
1589 # Just a string value, no need to iterate further
1590 return
1591 try:
1592 for obj in iter(structure):
1593 # Recurse and rewrite any nested File objects
1594 self._relativise_files(obj)
1595 except TypeError:
1596 pass  # structure was not iterable; nothing further to relativise
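# Illustrative before/after for a single File entry handled above
# (checksum invented):
#
#     before: {"class": "File", "location": "file:///tmp/x.txt",
#              "checksum": "sha1$da39..."}
#     after:  {"class": "File", "location": "../data/da/da39...",
#              "checksum": "sha1$da39..."}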
1597
1598 def close(self, save_to=None):
1599 # type: (Optional[str]) -> None
1600 """Close the Research Object, optionally saving to specified folder.
1601
1602 Closing will remove any temporary files used by this research object.
1603 After calling this method, this ResearchObject instance can no longer
1604 be used, except for no-op calls to .close().
1605
1606 The 'save_to' folder should not exist - if it does, it will be deleted.
1607
1608 It is safe to call this function multiple times without the
1609 'save_to' argument, e.g. within a try..finally block to
1610 ensure the temporary files of this Research Object are removed.
1611 """
1612 if save_to is None:
1613 if not self.closed:
1614 _logger.debug(u"[provenance] Deleting temporary %s", self.folder)
1615 shutil.rmtree(self.folder, ignore_errors=True)
1616 else:
1617 save_to = os.path.abspath(save_to)
1618 _logger.info(u"[provenance] Finalizing Research Object")
1619 self._finalize() # write manifest etc.
1620 # TODO: Write as archive (.zip or .tar) based on extension?
1621
1622 if os.path.isdir(save_to):
1623 _logger.info(u"[provenance] Deleting existing %s", save_to)
1624 shutil.rmtree(save_to)
1625 shutil.move(self.folder, save_to)
1626 _logger.info(u"[provenance] Research Object saved to %s", save_to)
1627 self.folder = save_to
1628 self.closed = True
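# A usage sketch of the lifecycle described in the docstring (save_to
# path invented; assumes a ResearchObject instance `ro`):
#
#     try:
#         ...  # run the workflow, adding files and provenance to `ro`
#         ro.close(save_to="/tmp/my_run_ro")
#     finally:
#         ro.close()  # no-op if already saved, otherwise removes temp files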
1629
1630 def checksum_copy(src_file, # type: IO[Any]
1631 dst_file=None, # type: Optional[IO[Any]]
1632 hasher=Hasher, # type: Callable[[], hashlib._Hash]
1633 buffersize=1024*1024 # type: int
1634 ): # type: (...) -> str
1635 """Compute checksums while copying a file."""
1636 # TODO: Use hashlib.new(Hasher_str) instead?
1637 checksum = hasher()
1638 contents = src_file.read(buffersize)
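# Fast path for the block below: when both handles expose a real file
# name, try to replace the byte-by-byte copy with a hard link so the
# data is not duplicated; any OSError (e.g. a cross-device link) is
# swallowed and the plain buffered copy proceeds instead.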
1639 if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
1640 temp_location = os.path.join(os.path.dirname(dst_file.name),
1641 str(uuid.uuid4()))
1642 try:
1643 os.rename(dst_file.name, temp_location)
1644 os.link(src_file.name, dst_file.name)
1645 dst_file = None
1646 os.unlink(temp_location)
1647 except OSError:
1648 pass
1649 if os.path.exists(temp_location):
1650 os.rename(temp_location, dst_file.name) # type: ignore
1651 while contents != b"":
1652 if dst_file is not None:
1653 dst_file.write(contents)
1654 checksum.update(contents)
1655 contents = src_file.read(buffersize)
1656 if dst_file is not None:
1657 dst_file.flush()
1658 return checksum.hexdigest().lower()
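# A minimal sketch: checksum without copying, since dst_file defaults to
# None (file name invented):
#
#     with open("inputs/reads.fastq", "rb") as stream:
#         digest = checksum_copy(stream)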
1659
1660 def copy_job_order(job, job_order_object):
1661 # type: (Any, Any) -> Any
1662 """Create copy of job object for provenance."""
1663 if not hasattr(job, "tool"):
1664 # direct command line tool execution
1665 return job_order_object
1666 customised_job = {} # new job object for RO
1667 for index, inp in enumerate(job.tool["inputs"]):
1668 with SourceLine(job.tool["inputs"], index, WorkflowException,
1669 _logger.isEnabledFor(logging.DEBUG)):
1670 iid = shortname(inp["id"])
1671 if iid in job_order_object:
1672 # add the supplied input value to the dictionary for provenance
1673 customised_job[iid] = copy.deepcopy(job_order_object[iid])
1674 elif "default" in inp:
1675 # add the default value to the dictionary for provenance
1676 customised_job[iid] = copy.deepcopy(inp["default"])
1677 # inputs with neither a supplied value nor a default are omitted
1678 return customised_job
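# Illustrative behaviour (identifiers invented): for a tool with inputs
# "reads" and "threads", where the job order supplies only "reads" and
# "threads" carries a default, copy_job_order returns deep copies:
#
#     {"reads": <copy of supplied value>, "threads": <copy of default>}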