Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/provenance_profile.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 import copy | |
| 2 import datetime | |
| 3 import logging | |
| 4 import urllib | |
| 5 import uuid | |
| 6 from io import BytesIO | |
| 7 from pathlib import PurePath, PurePosixPath | |
| 8 from socket import getfqdn | |
| 9 from typing import List, MutableMapping, MutableSequence, Optional, Tuple, Union, cast | |
| 10 | |
| 11 from prov.identifier import Identifier | |
| 12 from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity | |
| 13 from schema_salad.sourceline import SourceLine | |
| 14 from typing_extensions import TYPE_CHECKING | |
| 15 | |
| 16 from .errors import WorkflowException | |
| 17 from .job import CommandLineJob, JobBase | |
| 18 from .loghandler import _logger | |
| 19 from .process import Process, shortname | |
| 20 from .provenance_constants import ( | |
| 21 ACCOUNT_UUID, | |
| 22 CWLPROV, | |
| 23 ENCODING, | |
| 24 FOAF, | |
| 25 METADATA, | |
| 26 ORE, | |
| 27 PROVENANCE, | |
| 28 RO, | |
| 29 SCHEMA, | |
| 30 SHA1, | |
| 31 SHA256, | |
| 32 TEXT_PLAIN, | |
| 33 UUID, | |
| 34 WF4EVER, | |
| 35 WFDESC, | |
| 36 WFPROV, | |
| 37 ) | |
| 38 from .stdfsaccess import StdFsAccess | |
| 39 from .utils import ( | |
| 40 CWLObjectType, | |
| 41 CWLOutputType, | |
| 42 JobsType, | |
| 43 get_listing, | |
| 44 posix_path, | |
| 45 versionstring, | |
| 46 ) | |
| 47 from .workflow_job import WorkflowJob | |
| 48 | |
| 49 if TYPE_CHECKING: | |
| 50 from .provenance import ResearchObject | |
| 51 | |
| 52 | |
def copy_job_order(
    job: Union[Process, JobsType], job_order_object: CWLObjectType
) -> CWLObjectType:
    """Create copy of job object for provenance."""
    if not isinstance(job, WorkflowJob):
        # direct command line tool execution
        return job_order_object
    # build a fresh job object for the Research Object
    customised_job: CWLObjectType = {}
    for index, inp in enumerate(job.tool["inputs"]):
        with SourceLine(
            job.tool["inputs"],
            index,
            WorkflowException,
            _logger.isEnabledFor(logging.DEBUG),
        ):
            iid = shortname(inp["id"])
            if iid in job_order_object:
                # record the supplied input value for provenance
                customised_job[iid] = copy.deepcopy(job_order_object[iid])
            elif "default" in inp:
                # record the default value for provenance
                customised_job[iid] = copy.deepcopy(inp["default"])
            # inputs that are absent and have no default are skipped
    return customised_job
| 79 | |
| 80 | |
class ProvenanceProfile:
    """
    Provenance profile.

    Populated as the workflow runs.
    """

    def __init__(
        self,
        research_object: "ResearchObject",
        full_name: str,
        host_provenance: bool,
        user_provenance: bool,
        orcid: str,
        fsaccess: StdFsAccess,
        run_uuid: Optional[uuid.UUID] = None,
    ) -> None:
        """Initialize the provenance profile."""
        self.fsaccess = fsaccess
        self.orcid = orcid
        self.full_name = full_name
        self.research_object = research_object
        self.folder = research_object.folder
        self.document = ProvDocument()
        self.host_provenance = host_provenance
        self.user_provenance = user_provenance
        self.engine_uuid: str = research_object.engine_uuid
        self.add_to_manifest = research_object.add_to_manifest
        if self.orcid:
            _logger.debug("[provenance] Creator ORCID: %s", self.orcid)
        if self.full_name:
            _logger.debug("[provenance] Creator Full name: %s", self.full_name)
        # mint a fresh run id unless the caller supplied one
        self.workflow_run_uuid = run_uuid or uuid.uuid4()
        self.workflow_run_uri: str = self.workflow_run_uuid.urn
        self.generate_prov_doc()

    def __str__(self) -> str:
        """Represent this ProvenanceProfile as a string."""
        return f"ProvenanceProfile <{self.workflow_run_uri}> in <{self.research_object}>"
| 123 | |
| 124 def generate_prov_doc(self) -> Tuple[str, ProvDocument]: | |
| 125 """Add basic namespaces.""" | |
| 126 | |
| 127 def host_provenance(document: ProvDocument) -> None: | |
| 128 """Record host provenance.""" | |
| 129 document.add_namespace(CWLPROV) | |
| 130 document.add_namespace(UUID) | |
| 131 document.add_namespace(FOAF) | |
| 132 | |
| 133 hostname = getfqdn() | |
| 134 # won't have a foaf:accountServiceHomepage for unix hosts, but | |
| 135 # we can at least provide hostname | |
| 136 document.agent( | |
| 137 ACCOUNT_UUID, | |
| 138 { | |
| 139 PROV_TYPE: FOAF["OnlineAccount"], | |
| 140 "prov:location": hostname, | |
| 141 CWLPROV["hostname"]: hostname, | |
| 142 }, | |
| 143 ) | |
| 144 | |
| 145 self.cwltool_version = "cwltool %s" % versionstring().split()[-1] | |
| 146 self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#") | |
| 147 # document.add_namespace('prov', 'http://www.w3.org/ns/prov#') | |
| 148 self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#") | |
| 149 # TODO: Make this ontology. For now only has cwlprov:image | |
| 150 self.document.add_namespace("cwlprov", "https://w3id.org/cwl/prov#") | |
| 151 self.document.add_namespace("foaf", "http://xmlns.com/foaf/0.1/") | |
| 152 self.document.add_namespace("schema", "http://schema.org/") | |
| 153 self.document.add_namespace("orcid", "https://orcid.org/") | |
| 154 self.document.add_namespace("id", "urn:uuid:") | |
| 155 # NOTE: Internet draft expired 2004-03-04 (!) | |
| 156 # https://tools.ietf.org/html/draft-thiemann-hash-urn-01 | |
| 157 # TODO: Change to nih:sha-256; hashes | |
| 158 # https://tools.ietf.org/html/rfc6920#section-7 | |
| 159 self.document.add_namespace("data", "urn:hash::sha1:") | |
| 160 # Also needed for docker images | |
| 161 self.document.add_namespace(SHA256, "nih:sha-256;") | |
| 162 | |
| 163 # info only, won't really be used by prov as sub-resources use / | |
| 164 self.document.add_namespace("researchobject", self.research_object.base_uri) | |
| 165 # annotations | |
| 166 self.metadata_ns = self.document.add_namespace( | |
| 167 "metadata", self.research_object.base_uri + METADATA + "/" | |
| 168 ) | |
| 169 # Pre-register provenance directory so we can refer to its files | |
| 170 self.provenance_ns = self.document.add_namespace( | |
| 171 "provenance", self.research_object.base_uri + posix_path(PROVENANCE) + "/" | |
| 172 ) | |
| 173 ro_identifier_workflow = self.research_object.base_uri + "workflow/packed.cwl#" | |
| 174 self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow) | |
| 175 ro_identifier_input = ( | |
| 176 self.research_object.base_uri + "workflow/primary-job.json#" | |
| 177 ) | |
| 178 self.document.add_namespace("input", ro_identifier_input) | |
| 179 | |
| 180 # More info about the account (e.g. username, fullname) | |
| 181 # may or may not have been previously logged by user_provenance() | |
| 182 # .. but we always know cwltool was launched (directly or indirectly) | |
| 183 # by a user account, as cwltool is a command line tool | |
| 184 account = self.document.agent(ACCOUNT_UUID) | |
| 185 if self.orcid or self.full_name: | |
| 186 person = {PROV_TYPE: PROV["Person"], "prov:type": SCHEMA["Person"]} | |
| 187 if self.full_name: | |
| 188 person["prov:label"] = self.full_name | |
| 189 person["foaf:name"] = self.full_name | |
| 190 person["schema:name"] = self.full_name | |
| 191 else: | |
| 192 # TODO: Look up name from ORCID API? | |
| 193 pass | |
| 194 agent = self.document.agent(self.orcid or uuid.uuid4().urn, person) | |
| 195 self.document.actedOnBehalfOf(account, agent) | |
| 196 else: | |
| 197 if self.host_provenance: | |
| 198 host_provenance(self.document) | |
| 199 if self.user_provenance: | |
| 200 self.research_object.user_provenance(self.document) | |
| 201 # The execution of cwltool | |
| 202 wfengine = self.document.agent( | |
| 203 self.engine_uuid, | |
| 204 { | |
| 205 PROV_TYPE: PROV["SoftwareAgent"], | |
| 206 "prov:type": WFPROV["WorkflowEngine"], | |
| 207 "prov:label": self.cwltool_version, | |
| 208 }, | |
| 209 ) | |
| 210 # FIXME: This datetime will be a bit too delayed, we should | |
| 211 # capture when cwltool.py earliest started? | |
| 212 self.document.wasStartedBy(wfengine, None, account, datetime.datetime.now()) | |
| 213 # define workflow run level activity | |
| 214 self.document.activity( | |
| 215 self.workflow_run_uri, | |
| 216 datetime.datetime.now(), | |
| 217 None, | |
| 218 { | |
| 219 PROV_TYPE: WFPROV["WorkflowRun"], | |
| 220 "prov:label": "Run of workflow/packed.cwl#main", | |
| 221 }, | |
| 222 ) | |
| 223 # association between SoftwareAgent and WorkflowRun | |
| 224 main_workflow = "wf:main" | |
| 225 self.document.wasAssociatedWith( | |
| 226 self.workflow_run_uri, self.engine_uuid, main_workflow | |
| 227 ) | |
| 228 self.document.wasStartedBy( | |
| 229 self.workflow_run_uri, None, self.engine_uuid, datetime.datetime.now() | |
| 230 ) | |
| 231 return (self.workflow_run_uri, self.document) | |
| 232 | |
| 233 def evaluate( | |
| 234 self, | |
| 235 process: Process, | |
| 236 job: JobsType, | |
| 237 job_order_object: CWLObjectType, | |
| 238 research_obj: "ResearchObject", | |
| 239 ) -> None: | |
| 240 """Evaluate the nature of job.""" | |
| 241 if not hasattr(process, "steps"): | |
| 242 # record provenance of independent commandline tool executions | |
| 243 self.prospective_prov(job) | |
| 244 customised_job = copy_job_order(job, job_order_object) | |
| 245 self.used_artefacts(customised_job, self.workflow_run_uri) | |
| 246 research_obj.create_job(customised_job) | |
| 247 elif hasattr(job, "workflow"): | |
| 248 # record provenance of workflow executions | |
| 249 self.prospective_prov(job) | |
| 250 customised_job = copy_job_order(job, job_order_object) | |
| 251 self.used_artefacts(customised_job, self.workflow_run_uri) | |
| 252 | |
| 253 def record_process_start( | |
| 254 self, process: Process, job: JobsType, process_run_id: Optional[str] = None | |
| 255 ) -> Optional[str]: | |
| 256 if not hasattr(process, "steps"): | |
| 257 process_run_id = self.workflow_run_uri | |
| 258 elif not hasattr(job, "workflow"): | |
| 259 # commandline tool execution as part of workflow | |
| 260 name = "" | |
| 261 if isinstance(job, (CommandLineJob, JobBase, WorkflowJob)): | |
| 262 name = job.name | |
| 263 process_name = urllib.parse.quote(name, safe=":/,#") | |
| 264 process_run_id = self.start_process(process_name, datetime.datetime.now()) | |
| 265 return process_run_id | |
| 266 | |
| 267 def start_process( | |
| 268 self, | |
| 269 process_name: str, | |
| 270 when: datetime.datetime, | |
| 271 process_run_id: Optional[str] = None, | |
| 272 ) -> str: | |
| 273 """Record the start of each Process.""" | |
| 274 if process_run_id is None: | |
| 275 process_run_id = uuid.uuid4().urn | |
| 276 prov_label = "Run of workflow/packed.cwl#main/" + process_name | |
| 277 self.document.activity( | |
| 278 process_run_id, | |
| 279 None, | |
| 280 None, | |
| 281 {PROV_TYPE: WFPROV["ProcessRun"], PROV_LABEL: prov_label}, | |
| 282 ) | |
| 283 self.document.wasAssociatedWith( | |
| 284 process_run_id, self.engine_uuid, str("wf:main/" + process_name) | |
| 285 ) | |
| 286 self.document.wasStartedBy( | |
| 287 process_run_id, None, self.workflow_run_uri, when, None, None | |
| 288 ) | |
| 289 return process_run_id | |
| 290 | |
| 291 def record_process_end( | |
| 292 self, | |
| 293 process_name: str, | |
| 294 process_run_id: str, | |
| 295 outputs: Union[CWLObjectType, MutableSequence[CWLObjectType], None], | |
| 296 when: datetime.datetime, | |
| 297 ) -> None: | |
| 298 self.generate_output_prov(outputs, process_run_id, process_name) | |
| 299 self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when) | |
| 300 | |
| 301 def declare_file(self, value: CWLObjectType) -> Tuple[ProvEntity, ProvEntity, str]: | |
| 302 if value["class"] != "File": | |
| 303 raise ValueError("Must have class:File: %s" % value) | |
| 304 # Need to determine file hash aka RO filename | |
| 305 entity = None # type: Optional[ProvEntity] | |
| 306 checksum = None | |
| 307 if "checksum" in value: | |
| 308 csum = cast(str, value["checksum"]) | |
| 309 (method, checksum) = csum.split("$", 1) | |
| 310 if method == SHA1 and self.research_object.has_data_file(checksum): | |
| 311 entity = self.document.entity("data:" + checksum) | |
| 312 | |
| 313 if not entity and "location" in value: | |
| 314 location = str(value["location"]) | |
| 315 # If we made it here, we'll have to add it to the RO | |
| 316 with self.fsaccess.open(location, "rb") as fhandle: | |
| 317 relative_path = self.research_object.add_data_file(fhandle) | |
| 318 # FIXME: This naively relies on add_data_file setting hash as filename | |
| 319 checksum = PurePath(relative_path).name | |
| 320 entity = self.document.entity( | |
| 321 "data:" + checksum, {PROV_TYPE: WFPROV["Artifact"]} | |
| 322 ) | |
| 323 if "checksum" not in value: | |
| 324 value["checksum"] = f"{SHA1}${checksum}" | |
| 325 | |
| 326 if not entity and "contents" in value: | |
| 327 # Anonymous file, add content as string | |
| 328 entity, checksum = self.declare_string(cast(str, value["contents"])) | |
| 329 | |
| 330 # By here one of them should have worked! | |
| 331 if not entity or not checksum: | |
| 332 raise ValueError( | |
| 333 "class:File but missing checksum/location/content: %r" % value | |
| 334 ) | |
| 335 | |
| 336 # Track filename and extension, this is generally useful only for | |
| 337 # secondaryFiles. Note that multiple uses of a file might thus record | |
| 338 # different names for the same entity, so we'll | |
| 339 # make/track a specialized entity by UUID | |
| 340 file_id = value.setdefault("@id", uuid.uuid4().urn) | |
| 341 # A specialized entity that has just these names | |
| 342 file_entity = self.document.entity( | |
| 343 file_id, | |
| 344 [(PROV_TYPE, WFPROV["Artifact"]), (PROV_TYPE, WF4EVER["File"])], | |
| 345 ) # type: ProvEntity | |
| 346 | |
| 347 if "basename" in value: | |
| 348 file_entity.add_attributes({CWLPROV["basename"]: value["basename"]}) | |
| 349 if "nameroot" in value: | |
| 350 file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]}) | |
| 351 if "nameext" in value: | |
| 352 file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]}) | |
| 353 self.document.specializationOf(file_entity, entity) | |
| 354 | |
| 355 # Check for secondaries | |
| 356 for sec in cast( | |
| 357 MutableSequence[CWLObjectType], value.get("secondaryFiles", []) | |
| 358 ): | |
| 359 # TODO: Record these in a specializationOf entity with UUID? | |
| 360 if sec["class"] == "File": | |
| 361 (sec_entity, _, _) = self.declare_file(sec) | |
| 362 elif sec["class"] == "Directory": | |
| 363 sec_entity = self.declare_directory(sec) | |
| 364 else: | |
| 365 raise ValueError(f"Got unexpected secondaryFiles value: {sec}") | |
| 366 # We don't know how/when/where the secondary file was generated, | |
| 367 # but CWL convention is a kind of summary/index derived | |
| 368 # from the original file. As its generally in a different format | |
| 369 # then prov:Quotation is not appropriate. | |
| 370 self.document.derivation( | |
| 371 sec_entity, | |
| 372 file_entity, | |
| 373 other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]}, | |
| 374 ) | |
| 375 | |
| 376 return file_entity, entity, checksum | |
| 377 | |
| 378 def declare_directory(self, value: CWLObjectType) -> ProvEntity: | |
| 379 """Register any nested files/directories.""" | |
| 380 # FIXME: Calculate a hash-like identifier for directory | |
| 381 # so we get same value if it's the same filenames/hashes | |
| 382 # in a different location. | |
| 383 # For now, mint a new UUID to identify this directory, but | |
| 384 # attempt to keep it inside the value dictionary | |
| 385 dir_id = cast(str, value.setdefault("@id", uuid.uuid4().urn)) | |
| 386 | |
| 387 # New annotation file to keep the ORE Folder listing | |
| 388 ore_doc_fn = dir_id.replace("urn:uuid:", "directory-") + ".ttl" | |
| 389 dir_bundle = self.document.bundle(self.metadata_ns[ore_doc_fn]) | |
| 390 | |
| 391 coll = self.document.entity( | |
| 392 dir_id, | |
| 393 [ | |
| 394 (PROV_TYPE, WFPROV["Artifact"]), | |
| 395 (PROV_TYPE, PROV["Collection"]), | |
| 396 (PROV_TYPE, PROV["Dictionary"]), | |
| 397 (PROV_TYPE, RO["Folder"]), | |
| 398 ], | |
| 399 ) | |
| 400 # ORE description of ro:Folder, saved separately | |
| 401 coll_b = dir_bundle.entity( | |
| 402 dir_id, | |
| 403 [(PROV_TYPE, RO["Folder"]), (PROV_TYPE, ORE["Aggregation"])], | |
| 404 ) | |
| 405 self.document.mentionOf(dir_id + "#ore", dir_id, dir_bundle.identifier) | |
| 406 | |
| 407 # dir_manifest = dir_bundle.entity( | |
| 408 # dir_bundle.identifier, {PROV["type"]: ORE["ResourceMap"], | |
| 409 # ORE["describes"]: coll_b.identifier}) | |
| 410 | |
| 411 coll_attribs = [(ORE["isDescribedBy"], dir_bundle.identifier)] | |
| 412 coll_b_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] | |
| 413 | |
| 414 # FIXME: .listing might not be populated yet - hopefully | |
| 415 # a later call to this method will sort that | |
| 416 is_empty = True | |
| 417 | |
| 418 if "listing" not in value: | |
| 419 get_listing(self.fsaccess, value) | |
| 420 for entry in cast(MutableSequence[CWLObjectType], value.get("listing", [])): | |
| 421 is_empty = False | |
| 422 # Declare child-artifacts | |
| 423 entity = self.declare_artefact(entry) | |
| 424 self.document.membership(coll, entity) | |
| 425 # Membership relation aka our ORE Proxy | |
| 426 m_id = uuid.uuid4().urn | |
| 427 m_entity = self.document.entity(m_id) | |
| 428 m_b = dir_bundle.entity(m_id) | |
| 429 | |
| 430 # PROV-O style Dictionary | |
| 431 # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition | |
| 432 # ..as prov.py do not currently allow PROV-N extensions | |
| 433 # like hadDictionaryMember(..) | |
| 434 m_entity.add_asserted_type(PROV["KeyEntityPair"]) | |
| 435 | |
| 436 m_entity.add_attributes( | |
| 437 { | |
| 438 PROV["pairKey"]: entry["basename"], | |
| 439 PROV["pairEntity"]: entity, | |
| 440 } | |
| 441 ) | |
| 442 | |
| 443 # As well as a being a | |
| 444 # http://wf4ever.github.io/ro/2016-01-28/ro/#FolderEntry | |
| 445 m_b.add_asserted_type(RO["FolderEntry"]) | |
| 446 m_b.add_asserted_type(ORE["Proxy"]) | |
| 447 m_b.add_attributes( | |
| 448 { | |
| 449 RO["entryName"]: entry["basename"], | |
| 450 ORE["proxyIn"]: coll, | |
| 451 ORE["proxyFor"]: entity, | |
| 452 } | |
| 453 ) | |
| 454 coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) | |
| 455 coll_b_attribs.append((ORE["aggregates"], m_b)) | |
| 456 | |
| 457 coll.add_attributes(coll_attribs) | |
| 458 coll_b.add_attributes(coll_b_attribs) | |
| 459 | |
| 460 # Also Save ORE Folder as annotation metadata | |
| 461 ore_doc = ProvDocument() | |
| 462 ore_doc.add_namespace(ORE) | |
| 463 ore_doc.add_namespace(RO) | |
| 464 ore_doc.add_namespace(UUID) | |
| 465 ore_doc.add_bundle(dir_bundle) | |
| 466 ore_doc = ore_doc.flattened() | |
| 467 ore_doc_path = str(PurePosixPath(METADATA, ore_doc_fn)) | |
| 468 with self.research_object.write_bag_file(ore_doc_path) as provenance_file: | |
| 469 ore_doc.serialize(provenance_file, format="rdf", rdf_format="turtle") | |
| 470 self.research_object.add_annotation( | |
| 471 dir_id, [ore_doc_fn], ORE["isDescribedBy"].uri | |
| 472 ) | |
| 473 | |
| 474 if is_empty: | |
| 475 # Empty directory | |
| 476 coll.add_asserted_type(PROV["EmptyCollection"]) | |
| 477 coll.add_asserted_type(PROV["EmptyDictionary"]) | |
| 478 self.research_object.add_uri(coll.identifier.uri) | |
| 479 return coll | |
| 480 | |
| 481 def declare_string(self, value: str) -> Tuple[ProvEntity, str]: | |
| 482 """Save as string in UTF-8.""" | |
| 483 byte_s = BytesIO(str(value).encode(ENCODING)) | |
| 484 data_file = self.research_object.add_data_file(byte_s, content_type=TEXT_PLAIN) | |
| 485 checksum = PurePosixPath(data_file).name | |
| 486 # FIXME: Don't naively assume add_data_file uses hash in filename! | |
| 487 data_id = "data:%s" % PurePosixPath(data_file).stem | |
| 488 entity = self.document.entity( | |
| 489 data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)} | |
| 490 ) # type: ProvEntity | |
| 491 return entity, checksum | |
| 492 | |
| 493 def declare_artefact(self, value: Optional[CWLOutputType]) -> ProvEntity: | |
| 494 """Create data artefact entities for all file objects.""" | |
| 495 if value is None: | |
| 496 # FIXME: If this can happen in CWL, we'll | |
| 497 # need a better way to represent this in PROV | |
| 498 return self.document.entity(CWLPROV["None"], {PROV_LABEL: "None"}) | |
| 499 | |
| 500 if isinstance(value, (bool, int, float)): | |
| 501 # Typically used in job documents for flags | |
| 502 | |
| 503 # FIXME: Make consistent hash URIs for these | |
| 504 # that somehow include the type | |
| 505 # (so "1" != 1 != "1.0" != true) | |
| 506 entity = self.document.entity(uuid.uuid4().urn, {PROV_VALUE: value}) | |
| 507 self.research_object.add_uri(entity.identifier.uri) | |
| 508 return entity | |
| 509 | |
| 510 if isinstance(value, (str, str)): | |
| 511 (entity, _) = self.declare_string(value) | |
| 512 return entity | |
| 513 | |
| 514 if isinstance(value, bytes): | |
| 515 # If we got here then we must be in Python 3 | |
| 516 byte_s = BytesIO(value) | |
| 517 data_file = self.research_object.add_data_file(byte_s) | |
| 518 # FIXME: Don't naively assume add_data_file uses hash in filename! | |
| 519 data_id = "data:%s" % PurePosixPath(data_file).stem | |
| 520 return self.document.entity( | |
| 521 data_id, | |
| 522 {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}, | |
| 523 ) | |
| 524 | |
| 525 if isinstance(value, MutableMapping): | |
| 526 if "@id" in value: | |
| 527 # Already processed this value, but it might not be in this PROV | |
| 528 entities = self.document.get_record(value["@id"]) | |
| 529 if entities: | |
| 530 return entities[0] | |
| 531 # else, unknown in PROV, re-add below as if it's fresh | |
| 532 | |
| 533 # Base case - we found a File we need to update | |
| 534 if value.get("class") == "File": | |
| 535 (entity, _, _) = self.declare_file(value) | |
| 536 value["@id"] = entity.identifier.uri | |
| 537 return entity | |
| 538 | |
| 539 if value.get("class") == "Directory": | |
| 540 entity = self.declare_directory(value) | |
| 541 value["@id"] = entity.identifier.uri | |
| 542 return entity | |
| 543 coll_id = value.setdefault("@id", uuid.uuid4().urn) | |
| 544 # some other kind of dictionary? | |
| 545 # TODO: also Save as JSON | |
| 546 coll = self.document.entity( | |
| 547 coll_id, | |
| 548 [ | |
| 549 (PROV_TYPE, WFPROV["Artifact"]), | |
| 550 (PROV_TYPE, PROV["Collection"]), | |
| 551 (PROV_TYPE, PROV["Dictionary"]), | |
| 552 ], | |
| 553 ) | |
| 554 | |
| 555 if value.get("class"): | |
| 556 _logger.warning("Unknown data class %s.", value["class"]) | |
| 557 # FIXME: The class might be "http://example.com/somethingelse" | |
| 558 coll.add_asserted_type(CWLPROV[value["class"]]) | |
| 559 | |
| 560 # Let's iterate and recurse | |
| 561 coll_attribs = [] # type: List[Tuple[Identifier, ProvEntity]] | |
| 562 for (key, val) in value.items(): | |
| 563 v_ent = self.declare_artefact(val) | |
| 564 self.document.membership(coll, v_ent) | |
| 565 m_entity = self.document.entity(uuid.uuid4().urn) | |
| 566 # Note: only support PROV-O style dictionary | |
| 567 # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition | |
| 568 # as prov.py do not easily allow PROV-N extensions | |
| 569 m_entity.add_asserted_type(PROV["KeyEntityPair"]) | |
| 570 m_entity.add_attributes( | |
| 571 {PROV["pairKey"]: str(key), PROV["pairEntity"]: v_ent} | |
| 572 ) | |
| 573 coll_attribs.append((PROV["hadDictionaryMember"], m_entity)) | |
| 574 coll.add_attributes(coll_attribs) | |
| 575 self.research_object.add_uri(coll.identifier.uri) | |
| 576 return coll | |
| 577 | |
| 578 # some other kind of Collection? | |
| 579 # TODO: also save as JSON | |
| 580 try: | |
| 581 members = [] | |
| 582 for each_input_obj in iter(value): | |
| 583 # Recurse and register any nested objects | |
| 584 e = self.declare_artefact(each_input_obj) | |
| 585 members.append(e) | |
| 586 | |
| 587 # If we reached this, then we were allowed to iterate | |
| 588 coll = self.document.entity( | |
| 589 uuid.uuid4().urn, | |
| 590 [ | |
| 591 (PROV_TYPE, WFPROV["Artifact"]), | |
| 592 (PROV_TYPE, PROV["Collection"]), | |
| 593 ], | |
| 594 ) | |
| 595 if not members: | |
| 596 coll.add_asserted_type(PROV["EmptyCollection"]) | |
| 597 else: | |
| 598 for member in members: | |
| 599 # FIXME: This won't preserve order, for that | |
| 600 # we would need to use PROV.Dictionary | |
| 601 # with numeric keys | |
| 602 self.document.membership(coll, member) | |
| 603 self.research_object.add_uri(coll.identifier.uri) | |
| 604 # FIXME: list value does not support adding "@id" | |
| 605 return coll | |
| 606 except TypeError: | |
| 607 _logger.warning("Unrecognized type %s of %r", type(value), value) | |
| 608 # Let's just fall back to Python repr() | |
| 609 entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)}) | |
| 610 self.research_object.add_uri(entity.identifier.uri) | |
| 611 return entity | |
| 612 | |
| 613 def used_artefacts( | |
| 614 self, | |
| 615 job_order: Union[CWLObjectType, List[CWLObjectType]], | |
| 616 process_run_id: str, | |
| 617 name: Optional[str] = None, | |
| 618 ) -> None: | |
| 619 """Add used() for each data artefact.""" | |
| 620 if isinstance(job_order, list): | |
| 621 for entry in job_order: | |
| 622 self.used_artefacts(entry, process_run_id, name) | |
| 623 else: | |
| 624 # FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows | |
| 625 base = "main" | |
| 626 if name is not None: | |
| 627 base += "/" + name | |
| 628 for key, value in job_order.items(): | |
| 629 prov_role = self.wf_ns[f"{base}/{key}"] | |
| 630 try: | |
| 631 entity = self.declare_artefact(value) | |
| 632 self.document.used( | |
| 633 process_run_id, | |
| 634 entity, | |
| 635 datetime.datetime.now(), | |
| 636 None, | |
| 637 {"prov:role": prov_role}, | |
| 638 ) | |
| 639 except OSError: | |
| 640 pass | |
| 641 | |
| 642 def generate_output_prov( | |
| 643 self, | |
| 644 final_output: Union[CWLObjectType, MutableSequence[CWLObjectType], None], | |
| 645 process_run_id: Optional[str], | |
| 646 name: Optional[str], | |
| 647 ) -> None: | |
| 648 """Call wasGeneratedBy() for each output,copy the files into the RO.""" | |
| 649 if isinstance(final_output, MutableSequence): | |
| 650 for entry in final_output: | |
| 651 self.generate_output_prov(entry, process_run_id, name) | |
| 652 elif final_output is not None: | |
| 653 # Timestamp should be created at the earliest | |
| 654 timestamp = datetime.datetime.now() | |
| 655 | |
| 656 # For each output, find/register the corresponding | |
| 657 # entity (UUID) and document it as generated in | |
| 658 # a role corresponding to the output | |
| 659 for output, value in final_output.items(): | |
| 660 entity = self.declare_artefact(value) | |
| 661 if name is not None: | |
| 662 name = urllib.parse.quote(str(name), safe=":/,#") | |
| 663 # FIXME: Probably not "main" in nested workflows | |
| 664 role = self.wf_ns[f"main/{name}/{output}"] | |
| 665 else: | |
| 666 role = self.wf_ns["main/%s" % output] | |
| 667 | |
| 668 if not process_run_id: | |
| 669 process_run_id = self.workflow_run_uri | |
| 670 | |
| 671 self.document.wasGeneratedBy( | |
| 672 entity, process_run_id, timestamp, None, {"prov:role": role} | |
| 673 ) | |
| 674 | |
| 675 def prospective_prov(self, job: JobsType) -> None: | |
| 676 """Create prospective prov recording as wfdesc prov:Plan.""" | |
| 677 if not isinstance(job, WorkflowJob): | |
| 678 # direct command line tool execution | |
| 679 self.document.entity( | |
| 680 "wf:main", | |
| 681 { | |
| 682 PROV_TYPE: WFDESC["Process"], | |
| 683 "prov:type": PROV["Plan"], | |
| 684 "prov:label": "Prospective provenance", | |
| 685 }, | |
| 686 ) | |
| 687 return | |
| 688 | |
| 689 self.document.entity( | |
| 690 "wf:main", | |
| 691 { | |
| 692 PROV_TYPE: WFDESC["Workflow"], | |
| 693 "prov:type": PROV["Plan"], | |
| 694 "prov:label": "Prospective provenance", | |
| 695 }, | |
| 696 ) | |
| 697 | |
| 698 for step in job.steps: | |
| 699 stepnametemp = "wf:main/" + str(step.name)[5:] | |
| 700 stepname = urllib.parse.quote(stepnametemp, safe=":/,#") | |
| 701 provstep = self.document.entity( | |
| 702 stepname, | |
| 703 {PROV_TYPE: WFDESC["Process"], "prov:type": PROV["Plan"]}, | |
| 704 ) | |
| 705 self.document.entity( | |
| 706 "wf:main", | |
| 707 { | |
| 708 "wfdesc:hasSubProcess": provstep, | |
| 709 "prov:label": "Prospective provenance", | |
| 710 }, | |
| 711 ) | |
| 712 # TODO: Declare roles/parameters as well | |
| 713 | |
| 714 def activity_has_provenance(self, activity, prov_ids): | |
| 715 # type: (str, List[Identifier]) -> None | |
| 716 """Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files.""" | |
| 717 # NOTE: The below will only work if the corresponding metadata/provenance arcp URI | |
| 718 # is a pre-registered namespace in the PROV Document | |
| 719 attribs = [(PROV["has_provenance"], prov_id) for prov_id in prov_ids] | |
| 720 self.document.activity(activity, other_attributes=attribs) | |
| 721 # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention | |
| 722 # as prov:mentionOf() is only for entities, not activities | |
| 723 uris = [i.uri for i in prov_ids] | |
| 724 self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri) | |
| 725 | |
| 726 def finalize_prov_profile(self, name): | |
| 727 # type: (Optional[str]) -> List[Identifier] | |
| 728 """Transfer the provenance related files to the RO.""" | |
| 729 # NOTE: Relative posix path | |
| 730 if name is None: | |
| 731 # main workflow, fixed filenames | |
| 732 filename = "primary.cwlprov" | |
| 733 else: | |
| 734 # ASCII-friendly filename, avoiding % as we don't want %2520 in manifest.json | |
| 735 wf_name = urllib.parse.quote(str(name), safe="").replace("%", "_") | |
| 736 # Note that the above could cause overlaps for similarly named | |
| 737 # workflows, but that's OK as we'll also include run uuid | |
| 738 # which also covers thhe case of this step being run in | |
| 739 # multiple places or iterations | |
| 740 filename = f"{wf_name}.{self.workflow_run_uuid}.cwlprov" | |
| 741 | |
| 742 basename = str(PurePosixPath(PROVENANCE) / filename) | |
| 743 | |
| 744 # TODO: Also support other profiles than CWLProv, e.g. ProvOne | |
| 745 | |
| 746 # list of prov identifiers of provenance files | |
| 747 prov_ids = [] | |
| 748 | |
| 749 # https://www.w3.org/TR/prov-xml/ | |
| 750 with self.research_object.write_bag_file(basename + ".xml") as provenance_file: | |
| 751 self.document.serialize(provenance_file, format="xml", indent=4) | |
| 752 prov_ids.append(self.provenance_ns[filename + ".xml"]) | |
| 753 | |
| 754 # https://www.w3.org/TR/prov-n/ | |
| 755 with self.research_object.write_bag_file( | |
| 756 basename + ".provn" | |
| 757 ) as provenance_file: | |
| 758 self.document.serialize(provenance_file, format="provn", indent=2) | |
| 759 prov_ids.append(self.provenance_ns[filename + ".provn"]) | |
| 760 | |
| 761 # https://www.w3.org/Submission/prov-json/ | |
| 762 with self.research_object.write_bag_file(basename + ".json") as provenance_file: | |
| 763 self.document.serialize(provenance_file, format="json", indent=2) | |
| 764 prov_ids.append(self.provenance_ns[filename + ".json"]) | |
| 765 | |
| 766 # "rdf" aka https://www.w3.org/TR/prov-o/ | |
| 767 # which can be serialized to ttl/nt/jsonld (and more!) | |
| 768 | |
| 769 # https://www.w3.org/TR/turtle/ | |
| 770 with self.research_object.write_bag_file(basename + ".ttl") as provenance_file: | |
| 771 self.document.serialize(provenance_file, format="rdf", rdf_format="turtle") | |
| 772 prov_ids.append(self.provenance_ns[filename + ".ttl"]) | |
| 773 | |
| 774 # https://www.w3.org/TR/n-triples/ | |
| 775 with self.research_object.write_bag_file(basename + ".nt") as provenance_file: | |
| 776 self.document.serialize( | |
| 777 provenance_file, format="rdf", rdf_format="ntriples" | |
| 778 ) | |
| 779 prov_ids.append(self.provenance_ns[filename + ".nt"]) | |
| 780 | |
| 781 # https://www.w3.org/TR/json-ld/ | |
| 782 # TODO: Use a nice JSON-LD context | |
| 783 # see also https://eprints.soton.ac.uk/395985/ | |
| 784 # 404 Not Found on https://provenance.ecs.soton.ac.uk/prov.jsonld :( | |
| 785 with self.research_object.write_bag_file( | |
| 786 basename + ".jsonld" | |
| 787 ) as provenance_file: | |
| 788 self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld") | |
| 789 prov_ids.append(self.provenance_ns[filename + ".jsonld"]) | |
| 790 | |
| 791 _logger.debug("[provenance] added provenance: %s", prov_ids) | |
| 792 return prov_ids |
