Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/provenance_profile.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import copy | |
2 import datetime | |
3 import logging | |
4 import urllib | |
5 import uuid | |
6 from io import BytesIO | |
7 from pathlib import PurePath, PurePosixPath | |
8 from socket import getfqdn | |
9 from typing import List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast | |
10 | |
11 from prov.identifier import Identifier | |
12 from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity | |
13 from schema_salad.sourceline import SourceLine | |
14 from typing_extensions import TYPE_CHECKING | |
15 | |
16 from .errors import WorkflowException | |
17 from .job import CommandLineJob, JobBase | |
18 from .loghandler import _logger | |
19 from .process import Process, shortname | |
20 from .provenance_constants import ( | |
21 ACCOUNT_UUID, | |
22 CWLPROV, | |
23 ENCODING, | |
24 FOAF, | |
25 METADATA, | |
26 ORE, | |
27 PROVENANCE, | |
28 RO, | |
29 SCHEMA, | |
30 SHA1, | |
31 SHA256, | |
32 TEXT_PLAIN, | |
33 UUID, | |
34 WF4EVER, | |
35 WFDESC, | |
36 WFPROV, | |
37 ) | |
38 from .stdfsaccess import StdFsAccess | |
39 from .utils import ( | |
40 CWLObjectType, | |
41 CWLOutputType, | |
42 JobsType, | |
43 get_listing, | |
44 posix_path, | |
45 versionstring, | |
46 ) | |
47 from .workflow_job import WorkflowJob | |
48 | |
49 if TYPE_CHECKING: | |
50 from .provenance import ResearchObject | |
51 | |
52 | |
def copy_job_order(
    job: Union[Process, JobsType], job_order_object: CWLObjectType
) -> CWLObjectType:
    """Create copy of job object for provenance.

    For a :class:`WorkflowJob`, build a new dictionary containing a deep
    copy of the supplied (or defaulted) value of every workflow-level
    input; inputs with neither a supplied value nor a default are
    omitted. For a direct command line tool execution the original
    ``job_order_object`` is returned unchanged.
    """
    if not isinstance(job, WorkflowJob):
        # direct command line tool execution
        return job_order_object
    customised_job = {}  # type: CWLObjectType
    # new job object for RO
    for index, inp in enumerate(job.tool["inputs"]):
        with SourceLine(
            job.tool["inputs"],
            index,
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            iid = shortname(inp["id"])
            if iid in job_order_object:
                # add the input element in dictionary for provenance
                customised_job[iid] = copy.deepcopy(job_order_object[iid])
            elif "default" in inp:
                # add the default elements in the dictionary for provenance
                customised_job[iid] = copy.deepcopy(inp["default"])
            # inputs that are absent and have no default are left out
    return customised_job
79 | |
80 | |
class ProvenanceProfile:
    """
    Provenance profile.

    Populated as the workflow runs.
    """

    def __init__(
        self,
        research_object: "ResearchObject",
        full_name: str,
        host_provenance: bool,
        user_provenance: bool,
        orcid: str,
        fsaccess: StdFsAccess,
        run_uuid: Optional[uuid.UUID] = None,
    ) -> None:
        """Initialize the provenance profile.

        :param research_object: research object (RO) this profile writes into
        :param full_name: human name of the creator (may be empty)
        :param host_provenance: whether to record host details in the PROV doc
        :param user_provenance: whether to record user/account details
        :param orcid: creator ORCID URI (may be empty)
        :param fsaccess: filesystem abstraction used to read input files
        :param run_uuid: reuse this workflow-run UUID instead of minting one
        """
        self.fsaccess = fsaccess
        self.orcid = orcid
        self.research_object = research_object
        self.folder = self.research_object.folder
        self.document = ProvDocument()
        self.host_provenance = host_provenance
        self.user_provenance = user_provenance
        self.engine_uuid = research_object.engine_uuid  # type: str
        self.add_to_manifest = self.research_object.add_to_manifest
        if self.orcid:
            _logger.debug("[provenance] Creator ORCID: %s", self.orcid)
        self.full_name = full_name
        if self.full_name:
            _logger.debug("[provenance] Creator Full name: %s", self.full_name)
        # Mint a fresh run UUID unless the caller supplied one
        self.workflow_run_uuid = run_uuid or uuid.uuid4()
        self.workflow_run_uri = self.workflow_run_uuid.urn  # type: str
        # Must run last: relies on every attribute assigned above
        self.generate_prov_doc()
116 | |
117 def __str__(self) -> str: | |
118 """Represent this Provenvance profile as a string.""" | |
119 return "ProvenanceProfile <{}> in <{}>".format( | |
120 self.workflow_run_uri, | |
121 self.research_object, | |
122 ) | |
123 | |
    def generate_prov_doc(self) -> Tuple[str, ProvDocument]:
        """Add basic namespaces.

        Registers the standard CWLProv namespaces on ``self.document``,
        records the engine/account agents and the workflow-run activity,
        and returns ``(workflow_run_uri, document)``.
        """

        def host_provenance(document: ProvDocument) -> None:
            """Record host provenance."""
            document.add_namespace(CWLPROV)
            document.add_namespace(UUID)
            document.add_namespace(FOAF)

            hostname = getfqdn()
            # won't have a foaf:accountServiceHomepage for unix hosts, but
            # we can at least provide hostname
            document.agent(
                ACCOUNT_UUID,
                {
                    PROV_TYPE: FOAF["OnlineAccount"],
                    "prov:location": hostname,
                    CWLPROV["hostname"]: hostname,
                },
            )

        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#")
        # document.add_namespace('prov', 'http://www.w3.org/ns/prov#')
        self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#")
        # TODO: Make this ontology. For now only has cwlprov:image
        self.document.add_namespace("cwlprov", "https://w3id.org/cwl/prov#")
        self.document.add_namespace("foaf", "http://xmlns.com/foaf/0.1/")
        self.document.add_namespace("schema", "http://schema.org/")
        self.document.add_namespace("orcid", "https://orcid.org/")
        self.document.add_namespace("id", "urn:uuid:")
        # NOTE: Internet draft expired 2004-03-04 (!)
        #  https://tools.ietf.org/html/draft-thiemann-hash-urn-01
        # TODO: Change to nih:sha-256; hashes
        #  https://tools.ietf.org/html/rfc6920#section-7
        self.document.add_namespace("data", "urn:hash::sha1:")
        # Also needed for docker images
        self.document.add_namespace(SHA256, "nih:sha-256;")

        # info only, won't really be used by prov as sub-resources use /
        self.document.add_namespace("researchobject", self.research_object.base_uri)
        # annotations
        self.metadata_ns = self.document.add_namespace(
            "metadata", self.research_object.base_uri + METADATA + "/"
        )
        # Pre-register provenance directory so we can refer to its files
        self.provenance_ns = self.document.add_namespace(
            "provenance", self.research_object.base_uri + posix_path(PROVENANCE) + "/"
        )
        ro_identifier_workflow = self.research_object.base_uri + "workflow/packed.cwl#"
        self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow)
        ro_identifier_input = (
            self.research_object.base_uri + "workflow/primary-job.json#"
        )
        self.document.add_namespace("input", ro_identifier_input)

        # More info about the account (e.g. username, fullname)
        # may or may not have been previously logged by user_provenance()
        # .. but we always know cwltool was launched (directly or indirectly)
        # by a user account, as cwltool is a command line tool
        account = self.document.agent(ACCOUNT_UUID)
        if self.orcid or self.full_name:
            person = {PROV_TYPE: PROV["Person"], "prov:type": SCHEMA["Person"]}
            if self.full_name:
                person["prov:label"] = self.full_name
                person["foaf:name"] = self.full_name
                person["schema:name"] = self.full_name
            else:
                # TODO: Look up name from ORCID API?
                pass
            # Fall back to an anonymous UUID agent if no ORCID was given
            agent = self.document.agent(self.orcid or uuid.uuid4().urn, person)
            self.document.actedOnBehalfOf(account, agent)
        else:
            if self.host_provenance:
                host_provenance(self.document)
            if self.user_provenance:
                self.research_object.user_provenance(self.document)
        # The execution of cwltool
        wfengine = self.document.agent(
            self.engine_uuid,
            {
                PROV_TYPE: PROV["SoftwareAgent"],
                "prov:type": WFPROV["WorkflowEngine"],
                "prov:label": self.cwltool_version,
            },
        )
        # FIXME: This datetime will be a bit too delayed, we should
        # capture when cwltool.py earliest started?
        self.document.wasStartedBy(wfengine, None, account, datetime.datetime.now())
        # define workflow run level activity
        self.document.activity(
            self.workflow_run_uri,
            datetime.datetime.now(),
            None,
            {
                PROV_TYPE: WFPROV["WorkflowRun"],
                "prov:label": "Run of workflow/packed.cwl#main",
            },
        )
        # association between SoftwareAgent and WorkflowRun
        main_workflow = "wf:main"
        self.document.wasAssociatedWith(
            self.workflow_run_uri, self.engine_uuid, main_workflow
        )
        self.document.wasStartedBy(
            self.workflow_run_uri, None, self.engine_uuid, datetime.datetime.now()
        )
        return (self.workflow_run_uri, self.document)
232 | |
233 def evaluate( | |
234 self, | |
235 process: Process, | |
236 job: JobsType, | |
237 job_order_object: CWLObjectType, | |
238 research_obj: "ResearchObject", | |
239 ) -> None: | |
240 """Evaluate the nature of job.""" | |
241 if not hasattr(process, "steps"): | |
242 # record provenance of independent commandline tool executions | |
243 self.prospective_prov(job) | |
244 customised_job = copy_job_order(job, job_order_object) | |
245 self.used_artefacts(customised_job, self.workflow_run_uri) | |
246 research_obj.create_job(customised_job) | |
247 elif hasattr(job, "workflow"): | |
248 # record provenance of workflow executions | |
249 self.prospective_prov(job) | |
250 customised_job = copy_job_order(job, job_order_object) | |
251 self.used_artefacts(customised_job, self.workflow_run_uri) | |
252 | |
253 def record_process_start( | |
254 self, process: Process, job: JobsType, process_run_id: Optional[str] = None | |
255 ) -> Optional[str]: | |
256 if not hasattr(process, "steps"): | |
257 process_run_id = self.workflow_run_uri | |
258 elif not hasattr(job, "workflow"): | |
259 # commandline tool execution as part of workflow | |
260 name = "" | |
261 if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): | |
262 name = job.name | |
263 process_name = urllib.parse.quote(name, safe=":/,#") | |
264 process_run_id = self.start_process(process_name, datetime.datetime.now()) | |
265 return process_run_id | |
266 | |
267 def start_process( | |
268 self, | |
269 process_name: str, | |
270 when: datetime.datetime, | |
271 process_run_id: Optional[str] = None, | |
272 ) -> str: | |
273 """Record the start of each Process.""" | |
274 if process_run_id is None: | |
275 process_run_id = uuid.uuid4().urn | |
276 prov_label = "Run of workflow/packed.cwl#main/" + process_name | |
277 self.document.activity( | |
278 process_run_id, | |
279 None, | |
280 None, | |
281 {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label}, | |
282 ) | |
283 self.document.wasAssociatedWith( | |
284 process_run_id, self.engine_uuid, str("wf:main/" + process_name) | |
285 ) | |
286 self.document.wasStartedBy( | |
287 process_run_id, None, self.workflow_run_uri, when, None, None | |
288 ) | |
289 return process_run_id | |
290 | |
    def record_process_end(
        self,
        process_name: str,
        process_run_id: str,
        outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
        when: datetime.datetime,
    ) -> None:
        """Record the end of a Process run.

        Registers generation provenance for *outputs* and adds a
        wasEndedBy relation from the process run to the workflow run.
        """
        self.generate_output_prov(outputs, process_run_id, process_name)
        self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
300 | |
    def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]:
        """Register a CWL File object as PROV entities.

        Returns ``(file_entity, entity, checksum)``: *entity* is the
        checksum-named data artifact, *file_entity* is a UUID-named
        specialization carrying basename/nameroot/nameext, and
        *checksum* is the hash taken from (or written back into)
        ``value["checksum"]``.

        Raises ValueError unless ``value["class"] == "File"`` and at
        least one of checksum/location/contents is usable.
        """
        if value["class"] != "File":
            raise ValueError("Must have class:File: %s" % value)
        # Need to determine file hash aka RO filename
        entity = None  # type: Optional[ProvEntity]
        checksum = None
        if "checksum" in value:
            csum = cast(str, value["checksum"])
            (method, checksum) = csum.split("$", 1)
            # Only reuse an existing entity when the file is already in the RO
            if method == SHA1 and self.research_object.has_data_file(checksum):
                entity = self.document.entity("data:" + checksum)

        if not entity and "location" in value:
            location = str(value["location"])
            # If we made it here, we'll have to add it to the RO
            with self.fsaccess.open(location, "rb") as fhandle:
                relative_path = self.research_object.add_data_file(fhandle)
                # FIXME: This naively relies on add_data_file setting hash as filename
                checksum = PurePath(relative_path).name
                entity = self.document.entity(
                    "data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]}
                )
                if "checksum" not in value:
                    value["checksum"] = f"{SHA1}${checksum}"

        if not entity and "contents" in value:
            # Anonymous file, add content as string
            entity, checksum = self.declare_string(cast(str, value["contents"]))

        # By here one of them should have worked!
        if not entity or not checksum:
            raise ValueError(
                "class:File but missing checksum/location/content: %r" % value
            )

        # Track filename and extension, this is generally useful only for
        # secondaryFiles. Note that multiple uses of a file might thus record
        # different names for the same entity, so we'll
        # make/track a specialized entity by UUID
        file_id = value.setdefault("@id", uuid.uuid4().urn)
        # A specialized entity that has just these names
        file_entity = self.document.entity(
            file_id,
            [(PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, WF4EVER["File"])],
        )  # type: ProvEntity

        if "basename" in value:
            file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
        if "nameroot" in value:
            file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
        if "nameext" in value:
            file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
        self.document.specializationOf(file_entity, entity)

        # Check for secondaries
        for sec in cast(
            MutableSequence[CWLObjectType], value.get("secondaryFiles", [])
        ):
            # TODO: Record these in a specializationOf entity with UUID?
            if sec["class"] == "File":
                (sec_entity, _, _) = self.declare_file(sec)
            elif sec["class"] == "Directory":
                sec_entity = self.declare_directory(sec)
            else:
                raise ValueError(f"Got unexpected secondaryFiles value: {sec}")
            # We don't know how/when/where the secondary file was generated,
            # but CWL convention is a kind of summary/index derived
            # from the original file. As its generally in a different format
            # then prov:Quotation is not appropriate.
            self.document.derivation(
                sec_entity,
                file_entity,
                other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]},
            )

        return file_entity, entity, checksum
377 | |
    def declare_directory(self, value: CWLObjectType) -> ProvEntity:
        """Register any nested files/directories.

        Declares *value* as a PROV Collection/Dictionary/ro:Folder,
        recursively registers every listing entry as a member, and
        writes a separate ORE Folder description into the RO's
        annotation metadata. Returns the collection entity.
        """
        # FIXME: Calculate a hash-like identifier for directory
        # so we get same value if it's the same filenames/hashes
        # in a different location.
        # For now, mint a new UUID to identify this directory, but
        # attempt to keep it inside the value dictionary
        dir_id = cast(str, value.setdefault("@id", uuid.uuid4().urn))

        # New annotation file to keep the ORE Folder listing
        ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl"
        dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn])

        coll = self.document.entity(
            dir_id,
            [
                (PROV_TYPE, WFPROV["Artifact"]),
                (PROV_TYPE, PROV["Collection"]),
                (PROV_TYPE, PROV["Dictionary"]),
                (PROV_TYPE, RO["Folder"]),
            ],
        )
        # ORE description of ro:Folder, saved separately
        coll_b = dir_bundle.entity(
            dir_id,
            [(PROV_TYPE, RO["Folder"]), (PROV_TYPE, ORE["Aggregation"])],
        )
        self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier)

        # dir_manifest = dir_bundle.entity(
        #     dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"],
        #                             ORE["describes"]: coll_b.identifier})

        coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)]
        coll_b_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]

        # FIXME: .listing might not be populated yet - hopefully
        # a later call to this method will sort that
        is_empty = True

        if "listing" not in value:
            get_listing(self.fsaccess, value)
        for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])):
            is_empty = False
            # Declare child-artifacts
            entity = self.declare_artefact(entry)
            self.document.membership(coll, entity)
            # Membership relation aka our ORE Proxy
            m_id = uuid.uuid4().urn
            m_entity = self.document.entity(m_id)
            m_b = dir_bundle.entity(m_id)

            # PROV-O style Dictionary
            # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
            # ..as prov.py do not currently allow PROV-N extensions
            # like hadDictionaryMember(..)
            m_entity.add_asserted_type(PROV["KeyEntityPair"])

            m_entity.add_attributes(
                {
                    PROV["pairKey"]: entry["basename"],
                    PROV["pairEntity"]: entity,
                }
            )

            # As well as a being a
            # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry
            m_b.add_asserted_type(RO["FolderEntry"])
            m_b.add_asserted_type(ORE["Proxy"])
            m_b.add_attributes(
                {
                    RO["entryName"]: entry["basename"],
                    ORE["proxyIn"]: coll,
                    ORE["proxyFor"]: entity,
                }
            )
            coll_attribs.append((PROV["hadDictionaryMember"], m_entity))
            coll_b_attribs.append((ORE["aggregates"], m_b))

        coll.add_attributes(coll_attribs)
        coll_b.add_attributes(coll_b_attribs)

        # Also Save ORE Folder as annotation metadata
        ore_doc = ProvDocument()
        ore_doc.add_namespace(ORE)
        ore_doc.add_namespace(RO)
        ore_doc.add_namespace(UUID)
        ore_doc.add_bundle(dir_bundle)
        ore_doc = ore_doc.flattened()
        ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn))
        with self.research_object.write_bag_file(ore_doc_path) as provenance_file:
            ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle")
        self.research_object.add_annotation(
            dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri
        )

        if is_empty:
            # Empty directory
            coll.add_asserted_type(PROV["EmptyCollection"])
            coll.add_asserted_type(PROV["EmptyDictionary"])
        self.research_object.add_uri(coll.identifier.uri)
        return coll
480 | |
481 def declare_string(self, value: str) -> Tuple[ProvEntity, str]: | |
482 """Save as string in UTF-8.""" | |
483 byte_s = BytesIO(str(value).encode(ENCODING)) | |
484 data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN) | |
485 checksum = PurePosixPath(data_file).name | |
486 # FIXME: Don't naively assume add_data_file uses hash in filename! | |
487 data_id = "data:%s" % PurePosixPath(data_file).stem | |
488 entity = self.document.entity( | |
489 data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)} | |
490 ) # type: ProvEntity | |
491 return entity, checksum | |
492 | |
493 def declare_artefact(self, value: Optional[CWLOutputType]) -> ProvEntity: | |
494 """Create data artefact entities for all file objects.""" | |
495 if value is None: | |
496 # FIXME: If this can happen in CWL, we'll | |
497 # need a better way to represent this in PROV | |
498 return self.document.entity(CWLPROV["None"], {PROV_LABEL: "None"}) | |
499 | |
500 if isinstance(value, (bool, int, float)): | |
501 # Typically used in job documents for flags | |
502 | |
503 # FIXME: Make consistent hash URIs for these | |
504 # that somehow include the type | |
505 # (so "1" != 1 != "1.0" != true) | |
506 entity = self.document.entity(uuid.uuid4().urn, {PROV_VALUE: value}) | |
507 self.research_object.add_uri(entity.identifier.uri) | |
508 return entity | |
509 | |
510 if isinstance(value, (str, str)): | |
511 (entity, _) = self.declare_string(value) | |
512 return entity | |
513 | |
514 if isinstance(value, bytes): | |
515 # If we got here then we must be in Python 3 | |
516 byte_s = BytesIO(value) | |
517 data_file = self.research_object.add_data_file(byte_s) | |
518 # FIXME: Don't naively assume add_data_file uses hash in filename! | |
519 data_id = "data:%s" % PurePosixPath(data_file).stem | |
520 return self.document.entity( | |
521 data_id, | |
522 {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}, | |
523 ) | |
524 | |
525 if isinstance(value, MutableMapping): | |
526 if "@id" in value: | |
527 # Already processed this value, but it might not be in this PROV | |
528 entities = self.document.get_record(value["@id"]) | |
529 if entities: | |
530 return entities[0] | |
531 # else, unknown in PROV, re-add below as if it's fresh | |
532 | |
533 # Base case - we found a File we need to update | |
534 if value.get("class") == "File": | |
535 (entity, _, _) = self.declare_file(value) | |
536 value["@id"] = entity.identifier.uri | |
537 return entity | |
538 | |
539 if value.get("class") == "Directory": | |
540 entity = self.declare_directory(value) | |
541 value["@id"] = entity.identifier.uri | |
542 return entity | |
543 coll_id = value.setdefault("@id", uuid.uuid4().urn) | |
544 # some other kind of dictionary? | |
545 # TODO: also Save as JSON | |
546 coll = self.document.entity( | |
547 coll_id, | |
548 [ | |
549 (PROV_TYPE, WFPROV["Artifact"]), | |
550 (PROV_TYPE, PROV["Collection"]), | |
551 (PROV_TYPE, PROV["Dictionary"]), | |
552 ], | |
553 ) | |
554 | |
555 if value.get("class"): | |
556 _logger.warning("Unknown data class %s.", value["class"]) | |
557 # FIXME: The class might be "http://example.com/somethingelse" | |
558 coll.add_asserted_type(CWLPROV[value["class"]]) | |
559 | |
560 # Let's iterate and recurse | |
561 coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] | |
562 for (key, val) in value.items(): | |
563 v_ent = self.declare_artefact(val) | |
564 self.document.membership(coll, v_ent) | |
565 m_entity = self.document.entity(uuid.uuid4().urn) | |
566 # Note: only support PROV-O style dictionary | |
567 # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition | |
568 # as prov.py do not easily allow PROV-N extensions | |
569 m_entity.add_asserted_type(PROV["KeyEntityPair"]) | |
570 m_entity.add_attributes( | |
571 {PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent} | |
572 ) | |
573 coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) | |
574 coll.add_attributes(coll_attribs) | |
575 self.research_object.add_uri(coll.identifier.uri) | |
576 return coll | |
577 | |
578 # some other kind of Collection? | |
579 # TODO: also save as JSON | |
580 try: | |
581 members = [] | |
582 for each_input_obj in iter(value): | |
583 # Recurse and register any nested objects | |
584 e = self.declare_artefact(each_input_obj) | |
585 members.append(e) | |
586 | |
587 # If we reached this, then we were allowed to iterate | |
588 coll = self.document.entity( | |
589 uuid.uuid4().urn, | |
590 [ | |
591 (PROV_TYPE, WFPROV["Artifact"]), | |
592 (PROV_TYPE, PROV["Collection"]), | |
593 ], | |
594 ) | |
595 if not members: | |
596 coll.add_asserted_type(PROV["EmptyCollection"]) | |
597 else: | |
598 for member in members: | |
599 # FIXME: This won't preserve order, for that | |
600 # we would need to use PROV.Dictionary | |
601 # with numeric keys | |
602 self.document.membership(coll, member) | |
603 self.research_object.add_uri(coll.identifier.uri) | |
604 # FIXME: list value does not support adding "@id" | |
605 return coll | |
606 except TypeError: | |
607 _logger.warning("Unrecognized type %s of %r", type(value), value) | |
608 # Let's just fall back to Python repr() | |
609 entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)}) | |
610 self.research_object.add_uri(entity.identifier.uri) | |
611 return entity | |
612 | |
613 def used_artefacts( | |
614 self, | |
615 job_order: Union[CWLObjectType, List[CWLObjectType]], | |
616 process_run_id: str, | |
617 name: Optional[str] = None, | |
618 ) -> None: | |
619 """Add used() for each data artefact.""" | |
620 if isinstance(job_order, list): | |
621 for entry in job_order: | |
622 self.used_artefacts(entry, process_run_id, name) | |
623 else: | |
624 # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows | |
625 base = "main" | |
626 if name is not None: | |
627 base += "/" + name | |
628 for key, value in job_order.items(): | |
629 prov_role = self.wf_ns[f"{base}/{key}"] | |
630 try: | |
631 entity = self.declare_artefact(value) | |
632 self.document.used( | |
633 process_run_id, | |
634 entity, | |
635 datetime.datetime.now(), | |
636 None, | |
637 {"prov:role": prov_role}, | |
638 ) | |
639 except OSError: | |
640 pass | |
641 | |
    def generate_output_prov(
        self,
        final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None],
        process_run_id: Optional[str],
        name: Optional[str],
    ) -> None:
        """Call wasGeneratedBy() for each output,copy the files into the RO."""
        if isinstance(final_output, MutableSequence):
            for entry in final_output:
                self.generate_output_prov(entry, process_run_id, name)
        elif final_output is not None:
            # Timestamp should be created at the earliest
            timestamp = datetime.datetime.now()

            # For each output, find/register the corresponding
            # entity (UUID) and document it as generated in
            # a role corresponding to the output
            for output, value in final_output.items():
                entity = self.declare_artefact(value)
                if name is not None:
                    # NOTE(review): name is re-quoted on every iteration;
                    # repeated quoting could double-escape "%" — confirm intended
                    name = urllib.parse.quote(str(name), safe=":/,#")
                    # FIXME: Probably not "main" in nested workflows
                    role = self.wf_ns[f"main/{name}/{output}"]
                else:
                    role = self.wf_ns["main/%s" % output]

                # Fall back to the workflow-run activity when no
                # process run id was supplied
                if not process_run_id:
                    process_run_id = self.workflow_run_uri

                self.document.wasGeneratedBy(
                    entity, process_run_id, timestamp, None, {"prov:role": role}
                )
674 | |
675 def prospective_prov(self, job: JobsType) -> None: | |
676 """Create prospective prov recording as wfdesc prov:Plan.""" | |
677 if not isinstance(job, WorkflowJob): | |
678 # direct command line tool execution | |
679 self.document.entity( | |
680 "wf:main", | |
681 { | |
682 PROV_TYPE: WFDESC["Process"], | |
683 "prov:type": PROV["Plan"], | |
684 "prov:label": "Prospective provenance", | |
685 }, | |
686 ) | |
687 return | |
688 | |
689 self.document.entity( | |
690 "wf:main", | |
691 { | |
692 PROV_TYPE: WFDESC["Workflow"], | |
693 "prov:type": PROV["Plan"], | |
694 "prov:label": "Prospective provenance", | |
695 }, | |
696 ) | |
697 | |
698 for step in job.steps: | |
699 stepnametemp = "wf:main/" + str(step.name)[5:] | |
700 stepname = urllib.parse.quote(stepnametemp, safe=":/,#") | |
701 provstep = self.document.entity( | |
702 stepname, | |
703 {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, | |
704 ) | |
705 self.document.entity( | |
706 "wf:main", | |
707 { | |
708 "wfdesc:hasSubProcess": provstep, | |
709 "prov:label": "Prospective provenance", | |
710 }, | |
711 ) | |
712 # TODO: Declare roles/parameters as well | |
713 | |
714 def activity_has_provenance(self, activity, prov_ids): | |
715 # type: (str, List[Identifier]) -> None | |
716 """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files.""" | |
717 # NOTE: The below will only work if the corresponding metadata/provenance arcp URI | |
718 # is a pre-registered namespace in the PROV Document | |
719 attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids] | |
720 self.document.activity(activity, other_attributes=attribs) | |
721 # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention | |
722 # as prov:mentionOf() is only for entities, not activities | |
723 uris = [i.uri for i in prov_ids] | |
724 self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri) | |
725 | |
726 def finalize_prov_profile(self, name): | |
727 # type: (Optional[str]) -> List[Identifier] | |
728 """Transfer the provenance related files to the RO.""" | |
729 # NOTE: Relative posix path | |
730 if name is None: | |
731 # main workflow, fixed filenames | |
732 filename = "primary.cwlprov" | |
733 else: | |
734 # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json | |
735 wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_") | |
736 # Note that the above could cause overlaps for similarly named | |
737 # workflows, but that's OK as we'll also include run uuid | |
738 # which also covers thhe case of this step being run in | |
739 # multiple places or iterations | |
740 filename = f"{wf_name}.{self.workflow_run_uuid}.cwlprov" | |
741 | |
742 basename = str(PurePosixPath(PROVENANCE) / filename) | |
743 | |
744 # TODO: Also support other profiles than CWLProv, e.g. ProvOne | |
745 | |
746 # list of prov identifiers of provenance files | |
747 prov_ids = [] | |
748 | |
749 # https://www.w3.org/TR/prov-xml/ | |
750 with self.research_object.write_bag_file(basename + ".xml") as provenance_file: | |
751 self.document.serialize(provenance_file, format="xml", indent=4) | |
752 prov_ids.append(self.provenance_ns[filename + ".xml"]) | |
753 | |
754 # https://www.w3.org/TR/prov-n/ | |
755 with self.research_object.write_bag_file( | |
756 basename + ".provn" | |
757 ) as provenance_file: | |
758 self.document.serialize(provenance_file, format="provn", indent=2) | |
759 prov_ids.append(self.provenance_ns[filename + ".provn"]) | |
760 | |
761 # https://www.w3.org/Submission/prov-json/ | |
762 with self.research_object.write_bag_file(basename + ".json") as provenance_file: | |
763 self.document.serialize(provenance_file, format="json", indent=2) | |
764 prov_ids.append(self.provenance_ns[filename + ".json"]) | |
765 | |
766 # "rdf" aka https://www.w3.org/TR/prov-o/ | |
767 # which can be serialized to ttl/nt/jsonld (and more!) | |
768 | |
769 # https://www.w3.org/TR/turtle/ | |
770 with self.research_object.write_bag_file(basename + ".ttl") as provenance_file: | |
771 self.document.serialize(provenance_file, format="rdf", rdf_format="turtle") | |
772 prov_ids.append(self.provenance_ns[filename + ".ttl"]) | |
773 | |
774 # https://www.w3.org/TR/n-triples/ | |
775 with self.research_object.write_bag_file(basename + ".nt") as provenance_file: | |
776 self.document.serialize( | |
777 provenance_file, format="rdf", rdf_format="ntriples" | |
778 ) | |
779 prov_ids.append(self.provenance_ns[filename + ".nt"]) | |
780 | |
781 # https://www.w3.org/TR/json-ld/ | |
782 # TODO: Use a nice JSON-LD context | |
783 # see also https://eprints.soton.ac.uk/395985/ | |
784 # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :( | |
785 with self.research_object.write_bag_file( | |
786 basename + ".jsonld" | |
787 ) as provenance_file: | |
788 self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld") | |
789 prov_ids.append(self.provenance_ns[filename + ".jsonld"]) | |
790 | |
791 _logger.debug("[provenance] added provenance: %s", prov_ids) | |
792 return prov_ids |