comparison env/lib/python3.9/site-packages/cwltool/provenance_profile.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import copy
2 import datetime
3 import logging
4 import urllib
5 import uuid
6 from io import BytesIO
7 from pathlib import PurePath, PurePosixPath
8 from socket import getfqdn
9 from typing import List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast
10
11 from prov.identifier import Identifier
12 from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity
13 from schema_salad.sourceline import SourceLine
14 from typing_extensions import TYPE_CHECKING
15
16 from .errors import WorkflowException
17 from .job import CommandLineJob, JobBase
18 from .loghandler import _logger
19 from .process import Process, shortname
20 from .provenance_constants import (
21 ACCOUNT_UUID,
22 CWLPROV,
23 ENCODING,
24 FOAF,
25 METADATA,
26 ORE,
27 PROVENANCE,
28 RO,
29 SCHEMA,
30 SHA1,
31 SHA256,
32 TEXT_PLAIN,
33 UUID,
34 WF4EVER,
35 WFDESC,
36 WFPROV,
37 )
38 from .stdfsaccess import StdFsAccess
39 from .utils import (
40 CWLObjectType,
41 CWLOutputType,
42 JobsType,
43 get_listing,
44 posix_path,
45 versionstring,
46 )
47 from .workflow_job import WorkflowJob
48
49 if TYPE_CHECKING:
50 from .provenance import ResearchObject
51
52
def copy_job_order(
    job: Union[Process, JobsType], job_order_object: CWLObjectType
) -> CWLObjectType:
    """
    Build the job order that is recorded in the provenance.

    For anything that is not a ``WorkflowJob`` (a directly executed
    command line tool) ``job_order_object`` is returned as-is, i.e. the
    very same object and not a copy.

    For a ``WorkflowJob`` a new dictionary is built, keyed by the short
    name of each declared workflow input. Each entry is a deep copy of
    the value supplied in ``job_order_object`` or, when no value was
    supplied, of the input's declared ``default``. Inputs with neither a
    supplied value nor a default are left out.
    """
    if not isinstance(job, WorkflowJob):
        # direct command line tool execution
        return job_order_object

    declared_inputs = job.tool["inputs"]
    recorded = {}  # type: CWLObjectType
    for position, parameter in enumerate(declared_inputs):
        # SourceLine attaches the position within the inputs section to
        # any exception raised while this parameter is handled
        with SourceLine(
            declared_inputs,
            position,
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            input_id = shortname(parameter["id"])
            if input_id in job_order_object:
                # explicit value from the job order
                chosen = job_order_object[input_id]
            elif "default" in parameter:
                # fall back to the default declared by the workflow
                chosen = parameter["default"]
            else:
                continue
            recorded[input_id] = copy.deepcopy(chosen)
    return recorded
79
80
class ProvenanceProfile:
    """
    Provenance profile of a single workflow run.

    Wraps the PROV document (``self.document``) that describes the run
    identified by ``self.workflow_run_uri``. The document is populated
    as the workflow runs and is written into the research object by
    ``finalize_prov_profile``.
    """
87
def __init__(
    self,
    research_object: "ResearchObject",
    full_name: str,
    host_provenance: bool,
    user_provenance: bool,
    orcid: str,
    fsaccess: StdFsAccess,
    run_uuid: Optional[uuid.UUID] = None,
) -> None:
    """
    Initialize the provenance profile.

    Stores the collaborators and options, creates an empty PROV
    document, picks the run identifier (``run_uuid`` when given,
    otherwise a freshly generated UUID) and finally calls
    ``generate_prov_doc`` to fill in the initial records.
    """
    # Where files are read from and where provenance is stored
    self.fsaccess = fsaccess
    self.research_object = research_object
    self.folder = research_object.folder
    self.engine_uuid = research_object.engine_uuid  # type: str
    self.add_to_manifest = research_object.add_to_manifest

    # What to record about the environment
    self.host_provenance = host_provenance
    self.user_provenance = user_provenance

    # Who started the run
    self.orcid = orcid
    if orcid:
        _logger.debug("[provenance] Creator ORCID: %s", orcid)
    self.full_name = full_name
    if full_name:
        _logger.debug("[provenance] Creator Full name: %s", full_name)

    # Identity of this run
    self.workflow_run_uuid = run_uuid if run_uuid else uuid.uuid4()
    self.workflow_run_uri = self.workflow_run_uuid.urn  # type: str

    self.document = ProvDocument()
    self.generate_prov_doc()
116
117 def __str__(self) -> str:
118 """Represent this Provenvance profile as a string."""
119 return "ProvenanceProfile <{}> in <{}>".format(
120 self.workflow_run_uri,
121 self.research_object,
122 )
123
def generate_prov_doc(self) -> Tuple[str, ProvDocument]:
    """
    Fill ``self.document`` with the initial records of this run.

    Called from ``__init__``. In order, this method:

    * registers the namespaces used by the profile,
    * declares the account agent and, when an ORCID or full name was
      given, a person agent the account acts on behalf of; otherwise
      host and user provenance are recorded if they were enabled,
    * declares the workflow engine agent and the workflow run activity
      and relates them to each other.

    As a side effect it sets ``self.cwltool_version``,
    ``self.metadata_ns``, ``self.provenance_ns`` and ``self.wf_ns``.

    Returns the tuple ``(self.workflow_run_uri, self.document)``.
    """

    def host_provenance(document: ProvDocument) -> None:
        """Record host provenance."""
        document.add_namespace(CWLPROV)
        document.add_namespace(UUID)
        document.add_namespace(FOAF)

        hostname = getfqdn()
        # won't have a foaf:accountServiceHomepage for unix hosts, but
        # we can at least provide hostname
        document.agent(
            ACCOUNT_UUID,
            {
                PROV_TYPE: FOAF["OnlineAccount"],
                "prov:location": hostname,
                CWLPROV["hostname"]: hostname,
            },
        )

    # Label for the engine agent: the last whitespace-separated token
    # of versionstring(), prefixed with "cwltool "
    self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
    self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#")
    # document.add_namespace('prov', 'http://www.w3.org/ns/prov#')
    self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#")
    # TODO: Make this ontology. For now only has cwlprov:image
    self.document.add_namespace("cwlprov", "https://w3id.org/cwl/prov#")
    self.document.add_namespace("foaf", "http://xmlns.com/foaf/0.1/")
    self.document.add_namespace("schema", "http://schema.org/")
    self.document.add_namespace("orcid", "https://orcid.org/")
    self.document.add_namespace("id", "urn:uuid:")
    # NOTE: Internet draft expired 2004-03-04 (!)
    #  https://tools.ietf.org/html/draft-thiemann-hash-urn-01
    # TODO: Change to nih:sha-256; hashes
    #  https://tools.ietf.org/html/rfc6920#section-7
    self.document.add_namespace("data", "urn:hash::sha1:")
    # Also needed for docker images
    self.document.add_namespace(SHA256, "nih:sha-256;")

    # info only, won't really be used by prov as sub-resources use /
    self.document.add_namespace("researchobject", self.research_object.base_uri)
    # annotations
    self.metadata_ns = self.document.add_namespace(
        "metadata", self.research_object.base_uri + METADATA + "/"
    )
    # Pre-register provenance directory so we can refer to its files
    self.provenance_ns = self.document.add_namespace(
        "provenance", self.research_object.base_uri + posix_path(PROVENANCE) + "/"
    )
    # The "wf" namespace is kept on self: it is used later to build
    # role identifiers such as wf:main/<step>/<parameter>
    ro_identifier_workflow = self.research_object.base_uri + "workflow/packed.cwl#"
    self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow)
    ro_identifier_input = (
        self.research_object.base_uri + "workflow/primary-job.json#"
    )
    self.document.add_namespace("input", ro_identifier_input)

    # More info about the account (e.g. username, fullname)
    # may or may not have been previously logged by user_provenance()
    # .. but we always know cwltool was launched (directly or indirectly)
    # by a user account, as cwltool is a command line tool
    account = self.document.agent(ACCOUNT_UUID)
    if self.orcid or self.full_name:
        person = {PROV_TYPE: PROV["Person"], "prov:type": SCHEMA["Person"]}
        if self.full_name:
            person["prov:label"] = self.full_name
            person["foaf:name"] = self.full_name
            person["schema:name"] = self.full_name
        else:
            # TODO: Look up name from ORCID API?
            pass
        # Without an ORCID the person is identified by a fresh UUID
        agent = self.document.agent(self.orcid or uuid.uuid4().urn, person)
        self.document.actedOnBehalfOf(account, agent)
    else:
        # No explicit creator given: fall back to whatever host/user
        # provenance has been enabled
        if self.host_provenance:
            host_provenance(self.document)
        if self.user_provenance:
            self.research_object.user_provenance(self.document)
    # The execution of cwltool
    wfengine = self.document.agent(
        self.engine_uuid,
        {
            PROV_TYPE: PROV["SoftwareAgent"],
            "prov:type": WFPROV["WorkflowEngine"],
            "prov:label": self.cwltool_version,
        },
    )
    # FIXME: This datetime will be a bit too delayed, we should
    # capture when cwltool.py earliest started?
    self.document.wasStartedBy(wfengine, None, account, datetime.datetime.now())
    # define workflow run level activity
    self.document.activity(
        self.workflow_run_uri,
        datetime.datetime.now(),
        None,
        {
            PROV_TYPE: WFPROV["WorkflowRun"],
            "prov:label": "Run of workflow/packed.cwl#main",
        },
    )
    # association between SoftwareAgent and WorkflowRun
    main_workflow = "wf:main"
    self.document.wasAssociatedWith(
        self.workflow_run_uri, self.engine_uuid, main_workflow
    )
    self.document.wasStartedBy(
        self.workflow_run_uri, None, self.engine_uuid, datetime.datetime.now()
    )
    return (self.workflow_run_uri, self.document)
232
def evaluate(
    self,
    process: Process,
    job: JobsType,
    job_order_object: CWLObjectType,
    research_obj: "ResearchObject",
) -> None:
    """
    Evaluate the nature of job and record its inputs.

    Two situations are recorded, both against the workflow run
    activity (``self.workflow_run_uri``):

    * ``process`` has no ``steps`` attribute (independent command line
      tool execution): prospective provenance and used artefacts are
      recorded and the job order is handed to
      ``research_obj.create_job``.
    * ``process`` has ``steps`` and ``job`` has a ``workflow``
      attribute (workflow execution): prospective provenance and used
      artefacts are recorded.

    In any other case nothing is recorded.
    """
    independent_tool = not hasattr(process, "steps")
    if not independent_tool and not hasattr(job, "workflow"):
        return

    self.prospective_prov(job)
    customised_job = copy_job_order(job, job_order_object)
    self.used_artefacts(customised_job, self.workflow_run_uri)
    if independent_tool:
        research_obj.create_job(customised_job)
252
253 def record_process_start(
254 self, process: Process, job: JobsType, process_run_id: Optional[str] = None
255 ) -> Optional[str]:
256 if not hasattr(process, "steps"):
257 process_run_id = self.workflow_run_uri
258 elif not hasattr(job, "workflow"):
259 # commandline tool execution as part of workflow
260 name = ""
261 if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)):
262 name = job.name
263 process_name = urllib.parse.quote(name, safe=":/,#")
264 process_run_id = self.start_process(process_name, datetime.datetime.now())
265 return process_run_id
266
def start_process(
    self,
    process_name: str,
    when: datetime.datetime,
    process_run_id: Optional[str] = None,
) -> str:
    """
    Record the start of each Process.

    Declares a ``wfprov:ProcessRun`` activity, associates it with the
    workflow engine and the plan ``wf:main/<process_name>``, and records
    that it was started by the workflow run at time ``when``.

    A new UUID URN is minted when ``process_run_id`` is ``None``. The
    identifier that was used is returned.
    """
    run_id = uuid.uuid4().urn if process_run_id is None else process_run_id

    label = "Run of workflow/packed.cwl#main/" + process_name
    attributes = {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: label}
    self.document.activity(run_id, None, None, attributes)

    plan = "wf:main/" + process_name
    self.document.wasAssociatedWith(run_id, self.engine_uuid, plan)

    self.document.wasStartedBy(
        run_id, None, self.workflow_run_uri, when, None, None
    )
    return run_id
290
291 def record_process_end(
292 self,
293 process_name: str,
294 process_run_id: str,
295 outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
296 when: datetime.datetime,
297 ) -> None:
298 self.generate_output_prov(outputs, process_run_id, process_name)
299 self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
300
301 def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
302 if value["class"] != "File":
303 raise ValueError("Must have class:File: %s" % value)
304 # Need to determine file hash aka RO filename
305 entity = None # type: Optional[ProvEntity]
306 checksum = None
307 if "checksum" in value:
308 csum = cast(str, value["checksum"])
309 (method, checksum) = csum.split("$", 1)
310 if method == SHA1 and self.research_object.has_data_file(checksum):
311 entity = self.document.entity("data:" + checksum)
312
313 if not entity and "location" in value:
314 location = str(value["location"])
315 # If we made it here, we'll have to add it to the RO
316 with self.fsaccess.open(location, "rb") as fhandle:
317 relative_path = self.research_object.add_data_file(fhandle)
318 # FIXME: This naively relies on add_data_file setting hash as filename
319 checksum = PurePath(relative_path).name
320 entity = self.document.entity(
321 "data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]}
322 )
323 if "checksum" not in value:
324 value["checksum"] = f"{SHA1}${checksum}"
325
326 if not entity and "contents" in value:
327 # Anonymous file, add content as string
328 entity, checksum = self.declare_string(cast(str, value["contents"]))
329
330 # By here one of them should have worked!
331 if not entity or not checksum:
332 raise ValueError(
333 "class:File but missing checksum/location/content: %r" % value
334 )
335
336 # Track filename and extension, this is generally useful only for
337 # secondaryFiles. Note that multiple uses of a file might thus record
338 # different names for the same entity, so we'll
339 # make/track a specialized entity by UUID
340 file_id = value.setdefault("@id", uuid.uuid4().urn)
341 # A specialized entity that has just these names
342 file_entity = self.document.entity(
343 file_id,
344 [(PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, WF4EVER["File"])],
345 ) # type: ProvEntity
346
347 if "basename" in value:
348 file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
349 if "nameroot" in value:
350 file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
351 if "nameext" in value:
352 file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
353 self.document.specializationOf(file_entity, entity)
354
355 # Check for secondaries
356 for sec in cast(
357 MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
358 ):
359 # TODO: Record these in a specializationOf entity with UUID?
360 if sec["class"] == "File":
361 (sec_entity, _, _) = self.declare_file(sec)
362 elif sec["class"] == "Directory":
363 sec_entity = self.declare_directory(sec)
364 else:
365 raise ValueError(f"Got unexpected secondaryFiles value: {sec}")
366 # We don't know how/when/where the secondary file was generated,
367 # but CWL convention is a kind of summary/index derived
368 # from the original file. As its generally in a different format
369 # then prov:Quotation is not appropriate.
370 self.document.derivation(
371 sec_entity,
372 file_entity,
373 other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]},
374 )
375
376 return file_entity, entity, checksum
377
def declare_directory(self, value: CWLObjectType) -> ProvEntity:
    """
    Register a directory and any nested files/directories.

    The directory is declared twice: as a collection entity in the main
    document and as an ORE aggregation inside a separate bundle. The
    bundle is serialized as Turtle into the metadata folder of the
    research object and attached to the directory as an annotation.

    ``value`` is modified in place: an ``@id`` is stored if it has none
    and ``get_listing`` is called when it has no ``listing`` key.

    Returns the collection entity from the main document.
    """
    # FIXME: Calculate a hash-like identifier for directory
    # so we get same value if it's the same filenames/hashes
    # in a different location.
    # For now, mint a new UUID to identify this directory, but
    # attempt to keep it inside the value dictionary
    dir_id = cast(str, value.setdefault("@id", uuid.uuid4().urn))

    # New annotation file to keep the ORE Folder listing
    ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
    dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])

    coll = self.document.entity(
        dir_id,
        [
            (PROV_TYPE, WFPROV["Artifact"]),
            (PROV_TYPE, PROV["Collection"]),
            (PROV_TYPE, PROV["Dictionary"]),
            (PROV_TYPE, RO["Folder"]),
        ],
    )
    # ORE description of ro:Folder, saved separately
    coll_b = dir_bundle.entity(
        dir_id,
        [(PROV_TYPE, RO["Folder"]), (PROV_TYPE, ORE["Aggregation"])],
    )
    self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)

    # dir_manifest = dir_bundle.entity(
    #     dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
    #                             ORE["describes"]: coll_b.identifier})

    # Attributes are collected while iterating over the listing and
    # added to the two directory entities in one go afterwards
    coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
    coll_b_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]

    # FIXME: .listing might not be populated yet - hopefully
    # a later call to this method will sort that
    is_empty = True

    if "listing" not in value:
        get_listing(self.fsaccess, value)
    for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
        is_empty = False
        # Declare child-artifacts
        entity = self.declare_artefact(entry)
        self.document.membership(coll, entity)
        # Membership relation aka our ORE Proxy
        # (the same identifier is used in the main document and in
        # the bundle)
        m_id = uuid.uuid4().urn
        m_entity = self.document.entity(m_id)
        m_b = dir_bundle.entity(m_id)

        # PROV-O style Dictionary
        # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
        # ..as prov.py do not currently allow PROV-N extensions
        # like hadDictionaryMember(..)
        m_entity.add_asserted_type(PROV["KeyEntityPair"])

        m_entity.add_attributes(
            {
                PROV["pairKey"]: entry["basename"],
                PROV["pairEntity"]: entity,
            }
        )

        # As well as a being a
        # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
        m_b.add_asserted_type(RO["FolderEntry"])
        m_b.add_asserted_type(ORE["Proxy"])
        m_b.add_attributes(
            {
                RO["entryName"]: entry["basename"],
                ORE["proxyIn"]: coll,
                ORE["proxyFor"]: entity,
            }
        )
        coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
        coll_b_attribs.append((ORE["aggregates"], m_b))

    coll.add_attributes(coll_attribs)
    coll_b.add_attributes(coll_b_attribs)

    # Also Save ORE Folder as annotation metadata
    ore_doc = ProvDocument()
    ore_doc.add_namespace(ORE)
    ore_doc.add_namespace(RO)
    ore_doc.add_namespace(UUID)
    ore_doc.add_bundle(dir_bundle)
    ore_doc = ore_doc.flattened()
    ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
    with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
        ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
    self.research_object.add_annotation(
        dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri
    )

    if is_empty:
        # Empty directory
        coll.add_asserted_type(PROV["EmptyCollection"])
        coll.add_asserted_type(PROV["EmptyDictionary"])
    self.research_object.add_uri(coll.identifier.uri)
    return coll
480
def declare_string(self, value: str) -> Tuple[ProvEntity, str]:
    """
    Save as string in UTF-8.

    The text is added to the research object as a plain text data file
    and declared as an artifact entity that also carries the text as
    its PROV value.

    Returns ``(entity, checksum)`` where ``checksum`` is the file name
    of the stored data file.
    """
    text = str(value)
    stored_path = PurePosixPath(
        self.research_object.add_data_file(
            BytesIO(text.encode(ENCODING)), content_type=TEXT_PLAIN
        )
    )
    # FIXME: Don't naively assume add_data_file uses hash in filename!
    entity = self.document.entity(
        "data:%s" % stored_path.stem,
        {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: text},
    )  # type: ProvEntity
    return entity, stored_path.name
492
493 def declare_artefact(self, value: Optional[CWLOutputType]) -> ProvEntity:
494 """Create data artefact entities for all file objects."""
495 if value is None:
496 # FIXME: If this can happen in CWL, we'll
497 # need a better way to represent this in PROV
498 return self.document.entity(CWLPROV["None"], {PROV_LABEL: "None"})
499
500 if isinstance(value, (bool, int, float)):
501 # Typically used in job documents for flags
502
503 # FIXME: Make consistent hash URIs for these
504 # that somehow include the type
505 # (so "1" != 1 != "1.0" != true)
506 entity = self.document.entity(uuid.uuid4().urn, {PROV_VALUE: value})
507 self.research_object.add_uri(entity.identifier.uri)
508 return entity
509
510 if isinstance(value, (str, str)):
511 (entity, _) = self.declare_string(value)
512 return entity
513
514 if isinstance(value, bytes):
515 # If we got here then we must be in Python 3
516 byte_s = BytesIO(value)
517 data_file = self.research_object.add_data_file(byte_s)
518 # FIXME: Don't naively assume add_data_file uses hash in filename!
519 data_id = "data:%s" % PurePosixPath(data_file).stem
520 return self.document.entity(
521 data_id,
522 {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)},
523 )
524
525 if isinstance(value, MutableMapping):
526 if "@id" in value:
527 # Already processed this value, but it might not be in this PROV
528 entities = self.document.get_record(value["@id"])
529 if entities:
530 return entities[0]
531 # else, unknown in PROV, re-add below as if it's fresh
532
533 # Base case - we found a File we need to update
534 if value.get("class") == "File":
535 (entity, _, _) = self.declare_file(value)
536 value["@id"] = entity.identifier.uri
537 return entity
538
539 if value.get("class") == "Directory":
540 entity = self.declare_directory(value)
541 value["@id"] = entity.identifier.uri
542 return entity
543 coll_id = value.setdefault("@id", uuid.uuid4().urn)
544 # some other kind of dictionary?
545 # TODO: also Save as JSON
546 coll = self.document.entity(
547 coll_id,
548 [
549 (PROV_TYPE, WFPROV["Artifact"]),
550 (PROV_TYPE, PROV["Collection"]),
551 (PROV_TYPE, PROV["Dictionary"]),
552 ],
553 )
554
555 if value.get("class"):
556 _logger.warning("Unknown data class %s.", value["class"])
557 # FIXME: The class might be "http://example.com/somethingelse"
558 coll.add_asserted_type(CWLPROV[value["class"]])
559
560 # Let's iterate and recurse
561 coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]]
562 for (key, val) in value.items():
563 v_ent = self.declare_artefact(val)
564 self.document.membership(coll, v_ent)
565 m_entity = self.document.entity(uuid.uuid4().urn)
566 # Note: only support PROV-O style dictionary
567 # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
568 # as prov.py do not easily allow PROV-N extensions
569 m_entity.add_asserted_type(PROV["KeyEntityPair"])
570 m_entity.add_attributes(
571 {PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent}
572 )
573 coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
574 coll.add_attributes(coll_attribs)
575 self.research_object.add_uri(coll.identifier.uri)
576 return coll
577
578 # some other kind of Collection?
579 # TODO: also save as JSON
580 try:
581 members = []
582 for each_input_obj in iter(value):
583 # Recurse and register any nested objects
584 e = self.declare_artefact(each_input_obj)
585 members.append(e)
586
587 # If we reached this, then we were allowed to iterate
588 coll = self.document.entity(
589 uuid.uuid4().urn,
590 [
591 (PROV_TYPE, WFPROV["Artifact"]),
592 (PROV_TYPE, PROV["Collection"]),
593 ],
594 )
595 if not members:
596 coll.add_asserted_type(PROV["EmptyCollection"])
597 else:
598 for member in members:
599 # FIXME: This won't preserve order, for that
600 # we would need to use PROV.Dictionary
601 # with numeric keys
602 self.document.membership(coll, member)
603 self.research_object.add_uri(coll.identifier.uri)
604 # FIXME: list value does not support adding "@id"
605 return coll
606 except TypeError:
607 _logger.warning("Unrecognized type %s of %r", type(value), value)
608 # Let's just fall back to Python repr()
609 entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)})
610 self.research_object.add_uri(entity.identifier.uri)
611 return entity
612
613 def used_artefacts(
614 self,
615 job_order: Union[CWLObjectType, List[CWLObjectType]],
616 process_run_id: str,
617 name: Optional[str] = None,
618 ) -> None:
619 """Add used() for each data artefact."""
620 if isinstance(job_order, list):
621 for entry in job_order:
622 self.used_artefacts(entry, process_run_id, name)
623 else:
624 # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
625 base = "main"
626 if name is not None:
627 base += "/" + name
628 for key, value in job_order.items():
629 prov_role = self.wf_ns[f"{base}/{key}"]
630 try:
631 entity = self.declare_artefact(value)
632 self.document.used(
633 process_run_id,
634 entity,
635 datetime.datetime.now(),
636 None,
637 {"prov:role": prov_role},
638 )
639 except OSError:
640 pass
641
642 def generate_output_prov(
643 self,
644 final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
645 process_run_id: Optional[str],
646 name: Optional[str],
647 ) -> None:
648 """Call wasGeneratedBy() for each output,copy the files into the RO."""
649 if isinstance(final_output, MutableSequence):
650 for entry in final_output:
651 self.generate_output_prov(entry, process_run_id, name)
652 elif final_output is not None:
653 # Timestamp should be created at the earliest
654 timestamp = datetime.datetime.now()
655
656 # For each output, find/register the corresponding
657 # entity (UUID) and document it as generated in
658 # a role corresponding to the output
659 for output, value in final_output.items():
660 entity = self.declare_artefact(value)
661 if name is not None:
662 name = urllib.parse.quote(str(name), safe=":/,#")
663 # FIXME: Probably not "main" in nested workflows
664 role = self.wf_ns[f"main/{name}/{output}"]
665 else:
666 role = self.wf_ns["main/%s" % output]
667
668 if not process_run_id:
669 process_run_id = self.workflow_run_uri
670
671 self.document.wasGeneratedBy(
672 entity, process_run_id, timestamp, None, {"prov:role": role}
673 )
674
def prospective_prov(self, job: JobsType) -> None:
    """
    Create prospective prov recording as wfdesc prov:Plan.

    ``wf:main`` is declared as a plan: a ``wfdesc:Workflow`` when
    ``job`` is a ``WorkflowJob`` and a ``wfdesc:Process`` otherwise
    (direct command line tool execution). For a workflow, every step is
    additionally declared as a plan of its own and attached to
    ``wf:main`` as a sub process.
    """
    is_workflow = isinstance(job, WorkflowJob)
    main_type = WFDESC["Workflow"] if is_workflow else WFDESC["Process"]
    self.document.entity(
        "wf:main",
        {
            PROV_TYPE: main_type,
            "prov:type": PROV["Plan"],
            "prov:label": "Prospective provenance",
        },
    )
    if not is_workflow:
        # direct command line tool execution
        return

    for step in job.steps:
        # The first five characters of the step name are dropped
        # (presumably a "step " prefix - confirm against WorkflowJobStep)
        short_step_name = str(step.name)[5:]
        step_id = urllib.parse.quote("wf:main/" + short_step_name, safe=":/,#")
        step_plan = self.document.entity(
            step_id,
            {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]},
        )
        self.document.entity(
            "wf:main",
            {
                "wfdesc:hasSubProcess": step_plan,
                "prov:label": "Prospective provenance",
            },
        )
        # TODO: Declare roles/parameters as well
713
def activity_has_provenance(self, activity, prov_ids):
    # type: (str, List[Identifier]) -> None
    """
    Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files.

    The activity is declared with one ``prov:has_provenance`` attribute
    per identifier in ``prov_ids``, and the same links are added to the
    research object as an annotation on the activity.
    """
    has_provenance = PROV["has_provenance"]

    # NOTE: The below will only work if the corresponding metadata/provenance arcp URI
    # is a pre-registered namespace in the PROV Document
    links = [(has_provenance, prov_id) for prov_id in prov_ids]
    self.document.activity(activity, other_attributes=links)

    # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention
    # as prov:mentionOf() is only for entities, not activities
    targets = [prov_id.uri for prov_id in prov_ids]
    self.research_object.add_annotation(activity, targets, has_provenance.uri)
725
def finalize_prov_profile(self, name):
    # type: (Optional[str]) -> List[Identifier]
    """
    Transfer the provenance related files to the RO.

    The PROV document is serialized into the provenance folder of the
    research object in six formats: PROV-XML, PROV-N, PROV-JSON, Turtle,
    N-Triples and JSON-LD.

    ``name`` is ``None`` for the main workflow, which gets the fixed
    file name ``primary.cwlprov``. Otherwise the file name is built from
    the quoted ``name`` and the run UUID.

    Returns the identifiers of the written files, in the order above.
    """
    # NOTE: Relative posix path
    if name is None:
        # main workflow, fixed filenames
        filename = "primary.cwlprov"
    else:
        # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json
        wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_")
        # Note that the above could cause overlaps for similarly named
        # workflows, but that's OK as we'll also include run uuid
        # which also covers the case of this step being run in
        # multiple places or iterations
        filename = f"{wf_name}.{self.workflow_run_uuid}.cwlprov"

    basename = str(PurePosixPath(PROVENANCE) / filename)

    # TODO: Also support other profiles than CWLProv, e.g. ProvOne

    # File extension and serialize() options of every format, in the
    # order in which the files are written
    serializations = (
        # https://www.w3.org/TR/prov-xml/
        (".xml", {"format": "xml", "indent": 4}),
        # https://www.w3.org/TR/prov-n/
        (".provn", {"format": "provn", "indent": 2}),
        # https://www.w3.org/Submission/prov-json/
        (".json", {"format": "json", "indent": 2}),
        # "rdf" aka https://www.w3.org/TR/prov-o/
        # which can be serialized to ttl/nt/jsonld (and more!)
        # https://www.w3.org/TR/turtle/
        (".ttl", {"format": "rdf", "rdf_format": "turtle"}),
        # https://www.w3.org/TR/n-triples/
        (".nt", {"format": "rdf", "rdf_format": "ntriples"}),
        # https://www.w3.org/TR/json-ld/
        # TODO: Use a nice JSON-LD context
        # see also https://eprints.soton.ac.uk/395985/
        # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :(
        (".jsonld", {"format": "rdf", "rdf_format": "json-ld"}),
    )

    # list of prov identifiers of provenance files
    prov_ids = []
    for extension, options in serializations:
        with self.research_object.write_bag_file(
            basename + extension
        ) as provenance_file:
            self.document.serialize(provenance_file, **options)
            prov_ids.append(self.provenance_ns[filename + extension])

    _logger.debug("[provenance] added provenance: %s", prov_ids)
    return prov_ids