# cwltool/tests/test_provenance.py
import json
import os
import pickle
import sys
import urllib
from pathlib import Path
from typing import Any, Generator

import arcp
import bagit
import pytest
from rdflib import Graph, Namespace, URIRef
from rdflib.namespace import DC, DCTERMS, RDF
from rdflib.term import Literal

from cwltool import provenance, provenance_constants
from cwltool.main import main
from cwltool.provenance import ResearchObject
from cwltool.stdfsaccess import StdFsAccess

from .util import get_data, needs_docker, working_directory

# RDF namespaces we'll query for later
ORE = Namespace("http://www.openarchives.org/ore/terms/")
PROV = Namespace("http://www.w3.org/ns/prov#")
RO = Namespace("http://purl.org/wf4ever/ro#")
WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
SCHEMA = Namespace("http://schema.org/")
CWLPROV = Namespace("https://w3id.org/cwl/prov#")
OA = Namespace("http://www.w3.org/ns/oa#")
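# Attribute access on an rdflib Namespace expands to the full term URI, e.g.
# PROV.wasAssociatedWith == URIRef("http://www.w3.org/ns/prov#wasAssociatedWith"),
# which is how the triple assertions below are written.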


def cwltool(tmp_path: Path, *args: Any) -> Path:
    prov_folder = tmp_path / "provenance"
    prov_folder.mkdir()
    new_args = ["--provenance", str(prov_folder)]
    new_args.extend(args)
    # Run within a temporary directory to not pollute the git checkout
    tmp_dir = tmp_path / "cwltool-run"
    tmp_dir.mkdir()
    with working_directory(tmp_dir):
        status = main(new_args)
        assert status == 0, "Failed: cwltool.main(%r)" % (args,)
    return prov_folder


@needs_docker
def test_hello_workflow(tmp_path: Path) -> None:
    check_provenance(
        cwltool(
            tmp_path,
            get_data("tests/wf/hello-workflow.cwl"),
            "--usermessage",
            "Hello workflow",
        )
    )


@needs_docker
def test_hello_single_tool(tmp_path: Path) -> None:
    check_provenance(
        cwltool(
            tmp_path,
            get_data("tests/wf/hello_single_tool.cwl"),
            "--message",
            "Hello tool",
        ),
        single_tool=True,
    )


@needs_docker
def test_revsort_workflow(tmp_path: Path) -> None:
    folder = cwltool(
        tmp_path,
        get_data("tests/wf/revsort.cwl"),
        get_data("tests/wf/revsort-job.json"),
    )
    check_output_object(folder)
    check_provenance(folder)


@needs_docker
def test_nested_workflow(tmp_path: Path) -> None:
    check_provenance(cwltool(tmp_path, get_data("tests/wf/nested.cwl")), nested=True)


@needs_docker
def test_secondary_files_implicit(tmp_path: Path) -> None:
    file1 = tmp_path / "foo1.txt"
    file1idx = tmp_path / "foo1.txt.idx"

    with open(str(file1), "w", encoding="ascii") as f:
        f.write("foo")
    with open(str(file1idx), "w", encoding="ascii") as f:
        f.write("bar")

    # secondary will be picked up by .idx
    folder = cwltool(tmp_path, get_data("tests/wf/sec-wf.cwl"), "--file1", str(file1))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)


@needs_docker
def test_secondary_files_explicit(tmp_path: Path) -> None:
    # Deliberately do NOT have a common basename or extension
    file1dir = tmp_path / "foo"
    file1dir.mkdir()
    file1 = file1dir / "foo"
    file1idxdir = tmp_path / "bar"
    file1idxdir.mkdir()
    file1idx = file1idxdir / "bar"

    with open(file1, "w", encoding="ascii") as f:
        f.write("foo")
    with open(file1idx, "w", encoding="ascii") as f:
        f.write("bar")

    # explicit secondaryFiles
    job = {
        "file1": {
            "class": "File",
            "path": str(file1),
            "basename": "foo1.txt",
            "secondaryFiles": [
                {
                    "class": "File",
                    "path": str(file1idx),
                    "basename": "foo1.txt.idx",
                }
            ],
        }
    }

    jobJson = tmp_path / "job.json"
    with open(jobJson, "wb") as fp:
        j = json.dumps(job, ensure_ascii=True)
        fp.write(j.encode("ascii"))

    folder = cwltool(tmp_path, get_data("tests/wf/sec-wf.cwl"), str(jobJson))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)


@needs_docker
def test_secondary_files_output(tmp_path: Path) -> None:
    # secondary will be picked up by .idx
    folder = cwltool(tmp_path, get_data("tests/wf/sec-wf-out.cwl"))
    check_provenance(folder, secondary_files=True)
    # Skipped: not the same secondary files as above
    # self.check_secondary_files()


@needs_docker
def test_directory_workflow(tmp_path: Path) -> None:
    dir2 = tmp_path / "dir2"
    dir2.mkdir()
    sha1 = {
        # Expected hashes of ASCII letters (no linefeed)
        # as returned from:
        # for x in a b c ; do echo -n $x | sha1sum ; done
        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
    }
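    # Equivalently in Python: hashlib.sha1(b"a").hexdigest()
    # == "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8", and so on for "b" and "c".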
    for x in "abc":
        # Make test files with predictable hashes
        with open(dir2 / x, "w", encoding="ascii") as f:
            f.write(x)

    folder = cwltool(tmp_path, get_data("tests/wf/directory.cwl"), "--dir", str(dir2))
    check_provenance(folder, directory=True)

    # Output should include ls stdout of filenames a b c on each line
    file_list = (
        folder
        / "data"
        / "3c"
        / "3ca69e8d6c234a469d16ac28a4a658c92267c423"
        # checksum as returned from:
        # echo -e "a\nb\nc" | sha1sum
        # 3ca69e8d6c234a469d16ac28a4a658c92267c423  -
    )
    assert file_list.is_file()

    # Input files should be captured by hash value,
    # even if they were inside a class: Directory
    for (l, l_hash) in sha1.items():
        prefix = l_hash[:2]  # first 2 letters
        p = folder / "data" / prefix / l_hash
        assert p.is_file(), f"Could not find {l} as {p}"


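# A minimal illustrative sketch (a hypothetical helper, not used by the tests):
# the research object stores files content-addressed under
# data/XX/HASH, where HASH is the full sha1 hex digest and XX its first two
# characters. This is the layout the assertions above and below rely on.
def _example_data_path(base_path: Path, content: bytes) -> Path:
    import hashlib  # local import keeps this example self-contained

    digest = hashlib.sha1(content).hexdigest()
    return base_path / "data" / digest[:2] / digest

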
def check_output_object(base_path: Path) -> None:
    output_obj = base_path / "workflow" / "primary-output.json"
    compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
    compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
    with open(output_obj) as fp:
        out_json = json.load(fp)
    f1 = out_json["sorted_output"]
    assert f1["checksum"] == compare_checksum
    assert f1["location"] == compare_location


def check_secondary_files(base_path: Path) -> None:
    foo_data = (
        base_path
        / "data"
        / "0b"
        / "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
        # checksum as returned from:
        # $ echo -n foo | sha1sum
        # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33  -
    )
    bar_data = base_path / "data" / "62" / "62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
    assert foo_data.is_file(), "Did not capture file.txt 'foo'"
    assert bar_data.is_file(), "Did not capture secondary file.txt.idx 'bar'"

    primary_job = base_path / "workflow" / "primary-job.json"
    with open(primary_job) as fp:
        job_json = json.load(fp)
    # TODO: Verify secondaryFile in primary-job.json
    f1 = job_json["file1"]
    assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
    assert f1["basename"] == "foo1.txt"

    secondaries = f1["secondaryFiles"]
    assert secondaries
    f1idx = secondaries[0]
    assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
    assert f1idx["basename"] == "foo1.txt.idx"


def check_provenance(
    base_path: Path,
    nested: bool = False,
    single_tool: bool = False,
    directory: bool = False,
    secondary_files: bool = False,
) -> None:
    check_folders(base_path)
    check_bagit(base_path)
    check_ro(base_path, nested=nested)
    check_prov(
        base_path,
        nested=nested,
        single_tool=single_tool,
        directory=directory,
        secondary_files=secondary_files,
    )


def check_folders(base_path: Path) -> None:
    required_folders = [
        "data",
        "snapshot",
        "workflow",
        "metadata",
        os.path.join("metadata", "provenance"),
    ]

    for folder in required_folders:
        assert (base_path / folder).is_dir()


def check_bagit(base_path: Path) -> None:
    # check bagit structure
    required_files = [
        "bagit.txt",
        "bag-info.txt",
        "manifest-sha1.txt",
        "tagmanifest-sha1.txt",
        "tagmanifest-sha256.txt",
    ]

    for basename in required_files:
        assert (base_path / basename).is_file()

    bag = bagit.Bag(str(base_path))
    assert bag.has_oxum()
    (only_manifest, only_fs) = bag.compare_manifests_with_fs()
    assert not list(only_manifest), "Some files only in manifest"
    assert not list(only_fs), "Some files only on file system"
    missing_tagfiles = bag.missing_optional_tagfiles()
    assert not list(missing_tagfiles), "Some files only in tagmanifest"
    bag.validate()
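    # (bag.validate() raises bagit.BagValidationError on any mismatch,
    # so no explicit assert is needed here)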
    # TODO: Check other bag-info attributes
    assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))


def find_arcp(base_path: Path) -> str:
    # First try to find External-Identifier
    bag = bagit.Bag(str(base_path))
    ext_id = bag.info.get("External-Identifier")
    if arcp.is_arcp_uri(ext_id):
        return str(ext_id)
    raise Exception("Can't find External-Identifier")


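# arcp URIs look like arcp://uuid,UUID/path-within-RO (the UUID being the
# workflow run ID), so the helper below can map such a URI back to a file
# inside the research object.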
def _arcp2file(base_path: Path, uri: str) -> Path:
    parsed = arcp.parse_arcp(uri)
    # arcp URIs: ensure they are local to our RO
    assert (
        parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid
    ), "arcp URI must be local to the research object"

    path = parsed.path[1:]  # Strip first /
    # Convert to local path, in case it uses \ on Windows
    return base_path / Path(path)


def check_ro(base_path: Path, nested: bool = False) -> None:
    manifest_file = base_path / "metadata" / "manifest.json"
    assert manifest_file.is_file(), f"Can't find {manifest_file}"
    arcp_root = find_arcp(base_path)
    base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
    g = Graph()

    # Avoid resolving the JSON-LD context https://w3id.org/bundle/context
    # so this test works offline
    context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
    with open(manifest_file, encoding="UTF-8") as fh:
        jsonld = fh.read()
        # replace with a file:/// URI
        jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
    g.parse(data=jsonld, format="json-ld", publicID=base)
    if os.environ.get("DEBUG"):
        print("Parsed manifest:\n\n")
        g.serialize(sys.stdout, format="ttl")
    _ro = None

    for _ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
        break
    assert _ro is not None, "Can't find RO with ore:isDescribedBy"

    profile = None
    for dc in g.objects(_ro, DCTERMS.conformsTo):
        profile = dc
        break
    assert profile is not None, "Can't find profile with dct:conformsTo"
    assert profile == URIRef(provenance_constants.CWLPROV_VERSION), (
        "Unexpected cwlprov version " + profile
    )

    paths = []
    externals = []
    for aggregate in g.objects(_ro, ORE.aggregates):
        if not arcp.is_arcp_uri(aggregate):
            externals.append(aggregate)
            # Won't check external URIs' existence here
            # TODO: Check they are not relative!
            continue
        lfile = _arcp2file(base_path, aggregate)
        paths.append(os.path.relpath(lfile, base_path))
        assert os.path.isfile(lfile), f"Can't find aggregated {lfile}"

    assert paths, "Didn't find any arcp aggregates"
    assert externals, "Didn't find any data URIs"

    for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
        f = "metadata/provenance/primary.cwlprov.%s" % ext
        assert f in paths, "provenance file missing " + f

    for f in [
        "workflow/primary-job.json",
        "workflow/packed.cwl",
        "workflow/primary-output.json",
    ]:
        assert f in paths, "workflow file missing " + f
    # Can't test snapshot/ files directly as their names vary

    # TODO: check urn:hash::sha1 thingies
    # TODO: Check OA annotations

    packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
    primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
    primary_prov_nt = urllib.parse.urljoin(
        arcp_root, "/metadata/provenance/primary.cwlprov.nt"
    )
    uuid = arcp.parse_arcp(arcp_root).uuid

    highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
    assert highlights, "Didn't find highlights"
    for h in highlights:
        assert (h, OA.hasTarget, URIRef(packed)) in g

    describes = set(g.subjects(OA.motivatedBy, OA.describing))
    for d in describes:
        assert (d, OA.hasBody, URIRef(arcp_root)) in g
        assert (d, OA.hasTarget, URIRef(uuid.urn)) in g

    linked = set(g.subjects(OA.motivatedBy, OA.linking))
    for link in linked:
        assert (link, OA.hasBody, URIRef(packed)) in g
        assert (link, OA.hasBody, URIRef(primary_job)) in g
        assert (link, OA.hasTarget, URIRef(uuid.urn)) in g

    has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
    for p in has_provenance:
        assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
        assert (p, OA.motivatedBy, PROV.has_provenance) in g
        # Check all prov elements are listed
        formats = set()
        for prov in g.objects(p, OA.hasBody):
            assert (
                prov,
                DCTERMS.conformsTo,
                URIRef(provenance_constants.CWLPROV_VERSION),
            ) in g
            # NOTE: DC.format is a Namespace method and does not resolve like other terms
            formats.update(set(g.objects(prov, DC["format"])))
        assert formats, "Could not find media types"
        expected = {
            Literal(f)
            for f in (
                "application/json",
                "application/ld+json",
                "application/n-triples",
                'text/provenance-notation; charset="UTF-8"',
                'text/turtle; charset="UTF-8"',
                "application/xml",
            )
        }
        assert formats == expected, "Did not match expected PROV media types"

    if nested:
        # Check for additional PROVs
        # Let's try to find the other wf run ID
        otherRuns = set()
        for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
            if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
                continue
            otherRuns.update(set(g.objects(p, OA.hasTarget)))
        assert otherRuns, "Could not find nested workflow run prov annotations"


def check_prov(
    base_path: Path,
    nested: bool = False,
    single_tool: bool = False,
    directory: bool = False,
    secondary_files: bool = False,
) -> None:
    prov_file = base_path / "metadata" / "provenance" / "primary.cwlprov.nt"
    assert prov_file.is_file(), f"Can't find {prov_file}"
    arcp_root = find_arcp(base_path)
    # Note: We don't need to include metadata/provenance in the base URI
    # as .nt files always use absolute URIs
    g = Graph()
    with open(prov_file, "rb") as f:
        g.parse(file=f, format="nt", publicID=arcp_root)
    if os.environ.get("DEBUG"):
        print("Parsed %s:\n\n" % prov_file)
        g.serialize(sys.stdout, format="ttl")
    runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))

    # main workflow run URI (as urn:uuid:) should correspond to arcp uuid part
    uuid = arcp.parse_arcp(arcp_root).uuid
    main_run = URIRef(uuid.urn)
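    # (uuid.urn renders the run ID as "urn:uuid:...")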
    assert main_run in runs, f"Can't find run {main_run} in {runs}"
    # TODO: we should not need to parse arcp, but follow
    # the has_provenance annotations in manifest.json instead

    # run should have been started by a wf engine

    engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
    assert engines, "Could not find WorkflowEngine"
    assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
    engine = engines.pop()

    assert (
        main_run,
        PROV.wasAssociatedWith,
        engine,
    ) in g, "Wf run not associated with wf engine"
    assert (
        engine,
        RDF.type,
        PROV.SoftwareAgent,
    ) in g, "Engine not declared as SoftwareAgent"

    if single_tool:
        activities = set(g.subjects(RDF.type, PROV.Activity))
        assert len(activities) == 1, "Too many activities: %s" % activities
        # single tool exec, there should be no other activities
        # than the tool run
        # (NOTE: the WorkflowEngine is also an activity, but not declared explicitly)
    else:
        # Check all process runs were started by the main workflow
        stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
        # Although semantically a WorkflowEngine is also a ProcessRun,
        # we don't declare that,
        # thus only the step activities should be in this set.
        assert main_run not in stepActivities
        assert stepActivities, "No steps executed in workflow"
        for step in stepActivities:
            # Let's check it was started by the main_run. Unfortunately, unlike PROV-N,
            # in PROV-O RDF we have to check through the n-ary qualifiedStart relation
            starts = set(g.objects(step, PROV.qualifiedStart))
            assert starts, "Could not find qualifiedStart of step %s" % step
            assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
            start = starts.pop()
            assert (
                start,
                PROV.hadActivity,
                main_run,
            ) in g, "Step activity not started by main activity"
        # Tip: Any nested workflow step executions should not be in this prov file,
        # but in a separate file
    if nested:
        # Find a cwlprov.nt file the nested workflow is described in
        prov_ids = set(g.objects(predicate=PROV.has_provenance))
        # FIXME: The above is a bit naive and does not check that the subject is
        # one of the steps -- OK for now as this is the only case of prov:has_provenance
        assert prov_ids, "Could not find prov:has_provenance from nested workflow"

        nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
        # TODO: Look up manifest conformsTo and content-type rather than assuming a magic filename
        assert nt_uris, "Could not find *.cwlprov.nt"
        # Load into a new graph
        g2 = Graph()
        nt_uri = nt_uris.pop()
        with open(_arcp2file(base_path, nt_uri), "rb") as f:
            g2.parse(file=f, format="nt", publicID=nt_uri)
        # TODO: Check in g2 that it's the same UUID activity inside
        # as in the outer step
    if directory:
        directories = set(g.subjects(RDF.type, RO.Folder))
        assert directories

        for d in directories:
            assert (d, RDF.type, PROV.Dictionary) in g
            assert (d, RDF.type, PROV.Collection) in g
            assert (d, RDF.type, PROV.Entity) in g

            files = set()
            for entry in g.objects(d, PROV.hadDictionaryMember):
                assert (entry, RDF.type, PROV.KeyEntityPair) in g
                # We don't check what that filename is here
                assert set(g.objects(entry, PROV.pairKey))

                # RO:Folder aspect
                assert set(g.objects(entry, RO.entryName))
                assert (d, ORE.aggregates, entry) in g
                assert (entry, RDF.type, RO.FolderEntry) in g
                assert (entry, RDF.type, ORE.Proxy) in g
                assert (entry, ORE.proxyIn, d) in g

                # Which file?
                entities = set(g.objects(entry, PROV.pairEntity))
                assert entities
                ef = entities.pop()
                files.add(ef)
                assert (entry, ORE.proxyFor, ef) in g
                assert (ef, RDF.type, PROV.Entity) in g

            if not files:
                assert (d, RDF.type, PROV.EmptyCollection) in g
                assert (d, RDF.type, PROV.EmptyDictionary) in g
    if secondary_files:
        derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
        assert derivations
        for der in derivations:
            sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
            prim = set(g.objects(der, PROV.entity)).pop()

            # UUID specializes a hash checksum
            assert set(g.objects(sec, PROV.specializationOf))
            # extensions etc.
            sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
            sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
            sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
            assert str(sec_basename) == f"{sec_nameroot}{sec_nameext}"
            # TODO: Check the hash data file exists in the RO

            # The primary entity should have the same properties, but with different values
            assert set(g.objects(prim, PROV.specializationOf))
            prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
            prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
            prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
            assert str(prim_basename) == f"{prim_nameroot}{prim_nameext}"


@pytest.fixture
def research_object() -> Generator[ResearchObject, None, None]:
    re_ob = ResearchObject(StdFsAccess(""))
    yield re_ob
    re_ob.close()


def test_absolute_path_fails(research_object: ResearchObject) -> None:
    with pytest.raises(ValueError):
        research_object.write_bag_file("/absolute/path/fails")


def test_climboutfails(research_object: ResearchObject) -> None:
    with pytest.raises(ValueError):
        research_object.write_bag_file("../../outside-ro")


def test_writable_string(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert fh.writable()
        fh.write("Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
    assert os.path.isfile(sha1)

    with open(sha1, encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()
        assert stripped_sha.endswith("file.txt")
        # stain@biggie:~/src/cwltool$ echo Hello | sha1sum
        # 1d229271928d3f9e2bb0375bd6ce5db6c6d348d9  -
        assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")
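        # (equivalently in Python: hashlib.sha1(b"Hello\n").hexdigest())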

    sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
    assert os.path.isfile(sha256)

    with open(sha256, encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()

    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha256sum
    # 66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18  -
    assert stripped_sha.startswith(
        "66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18"
    )

    sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
    assert os.path.isfile(sha512)


def test_writable_unicode_string(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert fh.writable()
        fh.write("Here is a snowman: \u2603 \n")


def test_writable_bytes(research_object: ResearchObject) -> None:
    string = "Here is a snowman: \u2603 \n".encode()
    with research_object.write_bag_file("file.txt", encoding=None) as fh:
        fh.write(string)  # type: ignore


def test_data(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("data/file.txt") as fh:
        assert fh.writable()
        fh.write("Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    # Because this is under data/ it should be added to the manifest
    # rather than the tagmanifest
    sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
    assert os.path.isfile(sha1)
    with open(sha1, encoding="UTF-8") as fh2:
        stripped_sha = fh2.readline().strip()
        assert stripped_sha.endswith("data/file.txt")


def test_not_seekable(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert not fh.seekable()
        with pytest.raises(OSError):
            fh.seek(0)


def test_not_readable(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert not fh.readable()
        with pytest.raises(OSError):
            fh.read()


def test_truncate_fails(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        fh.write("Hello there")
        fh.truncate()  # OK as we're always at end
        # Will fail because the checksum can't rewind
        with pytest.raises(OSError):
            fh.truncate(0)


mod_validness = [
    # Taken from "Some sample ORCID iDs" on
    # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    ("0000-0002-1825-0097", True),
    ("0000-0001-5109-3700", True),
    ("0000-0002-1694-233X", True),
    # dashes optional
    ("0000000218250097", True),
    ("0000000151093700", True),
    ("000000021694233X", True),
    # do not fail on missing digits
    ("0002-1694-233X", True),
    # Swap check-digits around to force error
    ("0000-0002-1825-009X", False),
    ("0000-0001-5109-3707", False),
    ("0000-0002-1694-2330", False),
]


@pytest.mark.parametrize("mod11,valid", mod_validness)
def test_check_mod_11_2(mod11: str, valid: bool) -> None:
    assert provenance._check_mod_11_2(mod11) == valid


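# A minimal illustrative sketch (hypothetical, not used by the tests) of the
# ISO 7064 MOD 11-2 check digit algorithm the cases above exercise; the real
# implementation under test is provenance._check_mod_11_2.
def _example_check_mod_11_2(identifier: str) -> bool:
    digits = identifier.replace("-", "")
    total = 0
    for digit in digits[:-1]:
        total = (total + int(digit)) * 2
    # A check value of 10 is written as "X"
    check = (12 - total % 11) % 11
    expected = "X" if check == 10 else str(check)
    return digits[-1].upper() == expected

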
orcid_uris = [
    # https://orcid.org/ (expected form)
    ("https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # orcid.org
    ("http://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # just the number
    ("0000-0002-1825-0097", "https://orcid.org/0000-0002-1825-0097"),
    # lower-case x is OK (and fixed)
    ("https://orcid.org/0000-0002-1694-233x", "https://orcid.org/0000-0002-1694-233X"),
    # upper-case ORCID.ORG is OK (and fixed)
    ("https://ORCID.ORG/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
]


@pytest.mark.parametrize("orcid,expected", orcid_uris)
def test_valid_orcid(orcid: str, expected: str) -> None:
    assert provenance._valid_orcid(orcid) == expected


invalid_orcids = [
    # missing digit fails (even if checksum is correct)
    "0002-1694-2332",
    # Wrong checkdigit fails
    "https://orcid.org/0000-0002-1694-2332",
    "0000-0002-1694-2332",
    # Missing dashes fails (although that's OK for checksum)
    "https://orcid.org/000000021694233X",
    "000000021694233X",
    # Wrong hostname fails
    "https://example.org/0000-0002-1694-233X",
    # Wrong protocol fails
    "ftp://orcid.org/0000-0002-1694-233X",
    # Trying to be clever fails (no URL parsing!)
    "https://orcid.org:443/0000-0002-1694-233X",
    "http://orcid.org:80/0000-0002-1694-233X",
    # Empty string is not really valid
    "",
]


@pytest.mark.parametrize("orcid", invalid_orcids)
def test_invalid_orcid(orcid: str) -> None:
    with pytest.raises(ValueError):
        provenance._valid_orcid(orcid)


def test_whoami() -> None:
    username, fullname = provenance._whoami()
    assert username and isinstance(username, str)
    assert fullname and isinstance(fullname, str)


def test_research_object() -> None:
    # TODO: Test ResearchObject methods
    pass


# Research objects may need to be pickled (for Toil)


def test_research_object_picklability(research_object: ResearchObject) -> None:
    assert pickle.dumps(research_object) is not None