diff env/lib/python3.7/site-packages/cwltool/tests/test_provenance.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | |
--- a/env/lib/python3.7/site-packages/cwltool/tests/test_provenance.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,683 +0,0 @@
-import json
-import ntpath
-import os
-import posixpath
-import shutil
-import sys
-import tempfile
-from io import open
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
-
-from six.moves import urllib
-
-import arcp
-import pytest
-from rdflib import Graph, Literal, Namespace, URIRef
-from rdflib.namespace import DC, DCTERMS, RDF
-
-import bagit
-# Module to be tested
-from cwltool import load_tool, provenance
-from cwltool.main import main
-from cwltool.resolver import Path
-from cwltool.context import RuntimeContext
-from cwltool.stdfsaccess import StdFsAccess
-
-from .util import get_data, needs_docker, temp_dir, working_directory
-
-# RDF namespaces we'll query for later
-ORE = Namespace("http://www.openarchives.org/ore/terms/")
-PROV = Namespace("http://www.w3.org/ns/prov#")
-RO = Namespace("http://purl.org/wf4ever/ro#")
-WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
-WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
-SCHEMA = Namespace("http://schema.org/")
-CWLPROV = Namespace("https://w3id.org/cwl/prov#")
-OA = Namespace("http://www.w3.org/ns/oa#")
-
-
-@pytest.fixture
-def folder(tmpdir):
-    directory = str(tmpdir)
-    if os.environ.get("DEBUG"):
-        print("%s folder: %s" % (__loader__.fullname, folder))
-    yield directory
-
-    if not os.environ.get("DEBUG"):
-        shutil.rmtree(directory)
-
-
-def cwltool(folder, *args):
-    new_args = ['--provenance', folder]
-    new_args.extend(args)
-    # Run within a temporary directory to not pollute git checkout
-    with temp_dir("cwltool-run") as tmp_dir:
-        with working_directory(tmp_dir):
-            status = main(new_args)
-            assert status == 0, "Failed: cwltool.main(%r)" % (args)
-
-@needs_docker
-def test_hello_workflow(folder):
-    cwltool(folder, get_data('tests/wf/hello-workflow.cwl'), "--usermessage", "Hello workflow")
-    check_provenance(folder)
-
-@needs_docker
-def test_hello_single_tool(folder):
-    cwltool(folder, get_data('tests/wf/hello_single_tool.cwl'), "--message", "Hello tool")
-    check_provenance(folder, single_tool=True)
-
-@needs_docker
-def test_revsort_workflow(folder):
-    cwltool(folder, get_data('tests/wf/revsort.cwl'), get_data('tests/wf/revsort-job.json'))
-    check_output_object(folder)
-    check_provenance(folder)
-
-@needs_docker
-def test_nested_workflow(folder):
-    cwltool(folder, get_data('tests/wf/nested.cwl'))
-    check_provenance(folder, nested=True)
-
-@needs_docker
-def test_secondary_files_implicit(folder, tmpdir):
-    file1 = tmpdir.join("foo1.txt")
-    file1idx = tmpdir.join("foo1.txt.idx")
-
-    with open(str(file1), "w", encoding="ascii") as f:
-        f.write(u"foo")
-    with open(str(file1idx), "w", encoding="ascii") as f:
-        f.write(u"bar")
-
-    # secondary will be picked up by .idx
-    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), "--file1", str(file1))
-    check_provenance(folder, secondary_files=True)
-    check_secondary_files(folder)
-
-@needs_docker
-def test_secondary_files_explicit(folder, tmpdir):
-    orig_tempdir = tempfile.tempdir
-    tempfile.tempdir = str(tmpdir)
-    # Deliberately do NOT have common basename or extension
-    file1 = tempfile.mktemp("foo")
-    file1idx = tempfile.mktemp("bar")
-
-    with open(file1, "w", encoding="ascii") as f:
-        f.write(u"foo")
-    with open(file1idx, "w", encoding="ascii") as f:
-        f.write(u"bar")
-
-    # explicit secondaryFiles
-    job = {"file1":
-           {"class": "File",
-            "path": file1,
-            "basename": "foo1.txt",
-            "secondaryFiles": [
-                {
-                    "class": "File",
-                    "path": file1idx,
-                    "basename": "foo1.txt.idx",
-                }
-            ]
-           }
-          }
-    jobJson = tempfile.mktemp("job.json")
-    with open(jobJson, "wb") as fp:
-        j = json.dumps(job, ensure_ascii=True)
-        fp.write(j.encode("ascii"))
-
-    cwltool(folder, get_data('tests/wf/sec-wf.cwl'), jobJson)
-    check_provenance(folder, secondary_files=True)
-    check_secondary_files(folder)
-    tempfile.tempdir = orig_tempdir
-
-@needs_docker
-def test_secondary_files_output(folder):
-    # secondary will be picked up by .idx
-    cwltool(folder, get_data('tests/wf/sec-wf-out.cwl'))
-    check_provenance(folder, secondary_files=True)
-    # Skipped, not the same secondary files as above
-    #self.check_secondary_files()
-
-@needs_docker
-def test_directory_workflow(folder, tmpdir):
-    dir2 = tmpdir.join("dir2")
-    os.makedirs(str(dir2))
-    sha1 = {
-        # Expected hashes of ASCII letters (no linefeed)
-        # as returned from:
-        # for x in a b c ; do echo -n $x | sha1sum ; done
-        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
-        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
-        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
-    }
-    for x in u"abc":
-        # Make test files with predictable hashes
-        with open(str(dir2.join(x)), "w", encoding="ascii") as f:
-            f.write(x)
-
-    cwltool(folder, get_data('tests/wf/directory.cwl'), "--dir", str(dir2))
-    check_provenance(folder, directory=True)
-
-    # Output should include ls stdout of filenames a b c on each line
-    file_list = os.path.join(
-        folder, "data",
-        # checksum as returned from:
-        # echo -e "a\nb\nc" | sha1sum
-        # 3ca69e8d6c234a469d16ac28a4a658c92267c423 -
-        "3c",
-        "3ca69e8d6c234a469d16ac28a4a658c92267c423")
-    assert os.path.isfile(file_list)
-
-    # Input files should be captured by hash value,
-    # even if they were inside a class: Directory
-    for (l, l_hash) in sha1.items():
-        prefix = l_hash[:2]  # first 2 letters
-        p = os.path.join(folder, "data", prefix, l_hash)
-        assert os.path.isfile(p), "Could not find %s as %s" % (l, p)
-
-def check_output_object(base_path):
-    output_obj = os.path.join(base_path, "workflow", "primary-output.json")
-    compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
-    compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
-    with open(output_obj) as fp:
-        out_json = json.load(fp)
-    f1 = out_json["sorted_output"]
-    assert f1["checksum"] == compare_checksum
-    assert f1["location"] == compare_location
-
-
-def check_secondary_files(base_path):
-    foo_data = os.path.join(
-        base_path, "data",
-        # checksum as returned from:
-        # $ echo -n foo | sha1sum
-        # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 -
-        "0b",
-        "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
-    bar_data = os.path.join(
-        base_path, "data", "62", "62cdb7020ff920e5aa642c3d4066950dd1f01f4d")
-    assert os.path.isfile(foo_data), "Did not capture file.txt 'foo'"
-    assert os.path.isfile(bar_data), "Did not capture secondary file.txt.idx 'bar"
-
-    primary_job = os.path.join(base_path, "workflow", "primary-job.json")
-    with open(primary_job) as fp:
-        job_json = json.load(fp)
-    # TODO: Verify secondaryFile in primary-job.json
-    f1 = job_json["file1"]
-    assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
-    assert f1["basename"] == "foo1.txt"
-
-    secondaries = f1["secondaryFiles"]
-    assert secondaries
-    f1idx = secondaries[0]
-    assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
-    assert f1idx["basename"], "foo1.txt.idx"
-
-def check_provenance(base_path, nested=False, single_tool=False, directory=False,
-                     secondary_files=False):
-    check_folders(base_path)
-    check_bagit(base_path)
-    check_ro(base_path, nested=nested)
-    check_prov(base_path, nested=nested, single_tool=single_tool, directory=directory,
-               secondary_files=secondary_files)
-
-def check_folders(base_path):
-    required_folders = [
-        "data", "snapshot", "workflow", "metadata", os.path.join("metadata", "provenance")]
-
-    for folder in required_folders:
-        assert os.path.isdir(os.path.join(base_path, folder))
-
-def check_bagit(base_path):
-    # check bagit structure
-    required_files = [
-        "bagit.txt", "bag-info.txt", "manifest-sha1.txt",
-        "tagmanifest-sha1.txt", "tagmanifest-sha256.txt"]
-
-    for basename in required_files:
-        file_path = os.path.join(base_path, basename)
-        assert os.path.isfile(file_path)
-
-    bag = bagit.Bag(base_path)
-    assert bag.has_oxum()
-    (only_manifest, only_fs) = bag.compare_manifests_with_fs()
-    assert not list(only_manifest), "Some files only in manifest"
-    assert not list(only_fs), "Some files only on file system"
-    missing_tagfiles = bag.missing_optional_tagfiles()
-    assert not list(missing_tagfiles), "Some files only in tagmanifest"
-    bag.validate()
-    # TODO: Check other bag-info attributes
-    assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))
-
-def find_arcp(base_path):
-    # First try to find External-Identifier
-    bag = bagit.Bag(base_path)
-    ext_id = bag.info.get("External-Identifier")
-    if arcp.is_arcp_uri(ext_id):
-        return ext_id
-    raise Exception("Can't find External-Identifier")
-
-def _arcp2file(base_path, uri):
-    parsed = arcp.parse_arcp(uri)
-    # arcp URIs, ensure they are local to our RO
-    assert parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid,\
-        'arcp URI must be local to the research object'
-
-    path = parsed.path[1:]  # Strip first /
-    # Convert to local path, in case it uses \ on Windows
-    lpath = str(Path(path))
-    return os.path.join(base_path, lpath)
-
-def check_ro(base_path, nested=False):
-    manifest_file = os.path.join(base_path, "metadata", "manifest.json")
-    assert os.path.isfile(manifest_file), "Can't find " + manifest_file
-    arcp_root = find_arcp(base_path)
-    base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
-    g = Graph()
-
-    # Avoid resolving JSON-LD context https://w3id.org/bundle/context
-    # so this test works offline
-    context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
-    with open(manifest_file, "r", encoding="UTF-8") as f:
-        jsonld = f.read()
-        # replace with file:/// URI
-        jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
-    g.parse(data=jsonld, format="json-ld", publicID=base)
-    if os.environ.get("DEBUG"):
-        print("Parsed manifest:\n\n")
-        g.serialize(sys.stdout, format="ttl")
-    ro = None
-
-    for ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
-        break
-    assert ro is not None, "Can't find RO with ore:isDescribedBy"
-
-    profile = None
-    for dc in g.objects(ro, DCTERMS.conformsTo):
-        profile = dc
-        break
-    assert profile is not None, "Can't find profile with dct:conformsTo"
-    assert profile == URIRef(provenance.CWLPROV_VERSION),\
-        "Unexpected cwlprov version " + profile
-
-    paths = []
-    externals = []
-    for aggregate in g.objects(ro, ORE.aggregates):
-        if not arcp.is_arcp_uri(aggregate):
-            externals.append(aggregate)
-            # Won't check external URIs existence here
-            # TODO: Check they are not relative!
-            continue
-        lfile = _arcp2file(base_path, aggregate)
-        paths.append(os.path.relpath(lfile, base_path))
-        assert os.path.isfile(lfile), "Can't find aggregated " + lfile
-
-    assert paths, "Didn't find any arcp aggregates"
-    assert externals, "Didn't find any data URIs"
-
-    for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
-        f = "metadata/provenance/primary.cwlprov.%s" % ext
-        assert f in paths, "provenance file missing " + f
-
-    for f in ["workflow/primary-job.json", "workflow/packed.cwl", "workflow/primary-output.json"]:
-        assert f in paths, "workflow file missing " + f
-    # Can't test snapshot/ files directly as their name varies
-
-    # TODO: check urn:hash::sha1 thingies
-    # TODO: Check OA annotations
-
-    packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
-    primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
-    primary_prov_nt = urllib.parse.urljoin(arcp_root, "/metadata/provenance/primary.cwlprov.nt")
-    uuid = arcp.parse_arcp(arcp_root).uuid
-
-    highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
-    assert highlights, "Didn't find highlights"
-    for h in highlights:
-        assert (h, OA.hasTarget, URIRef(packed)) in g
-
-    describes = set(g.subjects(OA.motivatedBy, OA.describing))
-    for d in describes:
-        assert (d, OA.hasBody, URIRef(arcp_root)) in g
-        assert (d, OA.hasTarget, URIRef(uuid.urn)) in g
-
-    linked = set(g.subjects(OA.motivatedBy, OA.linking))
-    for l in linked:
-        assert (l, OA.hasBody, URIRef(packed)) in g
-        assert (l, OA.hasBody, URIRef(primary_job)) in g
-        assert (l, OA.hasTarget, URIRef(uuid.urn)) in g
-
-    has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
-    for p in has_provenance:
-        assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
-        assert (p, OA.motivatedBy, PROV.has_provenance) in g
-        # Check all prov elements are listed
-        formats = set()
-        for prov in g.objects(p, OA.hasBody):
-            assert (prov, DCTERMS.conformsTo, URIRef(provenance.CWLPROV_VERSION)) in g
-            # NOTE: DC.format is a Namespace method and does not resolve like other terms
-            formats.update(set(g.objects(prov, DC["format"])))
-        assert formats, "Could not find media types"
-        expected = set(Literal(f) for f in (
-            "application/json",
-            "application/ld+json",
-            "application/n-triples",
-            'text/provenance-notation; charset="UTF-8"',
-            'text/turtle; charset="UTF-8"',
-            "application/xml"
-        ))
-        assert formats == expected, "Did not match expected PROV media types"
-
-    if nested:
-        # Check for additional PROVs
-        # Let's try to find the other wf run ID
-        otherRuns = set()
-        for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
-            if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
-                continue
-            otherRuns.update(set(g.objects(p, OA.hasTarget)))
-        assert otherRuns, "Could not find nested workflow run prov annotations"
-
-def check_prov(base_path, nested=False, single_tool=False, directory=False,
-               secondary_files=False):
-    prov_file = os.path.join(base_path, "metadata", "provenance", "primary.cwlprov.nt")
-    assert os.path.isfile(prov_file), "Can't find " + prov_file
-    arcp_root = find_arcp(base_path)
-    # Note: We don't need to include metadata/provnance in base URI
-    # as .nt always use absolute URIs
-    g = Graph()
-    with open(prov_file, "rb") as f:
-        g.parse(file=f, format="nt", publicID=arcp_root)
-    if os.environ.get("DEBUG"):
-        print("Parsed %s:\n\n" % prov_file)
-        g.serialize(sys.stdout, format="ttl")
-    runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))
-
-    # master workflow run URI (as urn:uuid:) should correspond to arcp uuid part
-    uuid = arcp.parse_arcp(arcp_root).uuid
-    master_run = URIRef(uuid.urn)
-    assert master_run in runs, "Can't find run %s in %s" % (master_run, runs)
-    # TODO: we should not need to parse arcp, but follow
-    # the has_provenance annotations in manifest.json instead
-
-    # run should have been started by a wf engine
-
-    engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
-    assert engines, "Could not find WorkflowEngine"
-    assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
-    engine = engines.pop()
-
-    assert (master_run, PROV.wasAssociatedWith, engine) in g, "Wf run not associated with wf engine"
-    assert (engine, RDF.type, PROV.SoftwareAgent) in g, "Engine not declared as SoftwareAgent"
-
-    if single_tool:
-        activities = set(g.subjects(RDF.type, PROV.Activity))
-        assert len(activities) == 1, "Too many activities: %s" % activities
-        # single tool exec, there should be no other activities
-        # than the tool run
-        # (NOTE: the WorkflowEngine is also activity, but not declared explicitly)
-    else:
-        # Check all process runs were started by the master worklow
-        stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
-        # Although semantically a WorkflowEngine is also a ProcessRun,
-        # we don't declare that,
-        # thus only the step activities should be in this set.
-        assert master_run not in stepActivities
-        assert stepActivities, "No steps executed in workflow"
-        for step in stepActivities:
-            # Let's check it was started by the master_run. Unfortunately, unlike PROV-N
-            # in PROV-O RDF we have to check through the n-ary qualifiedStart relation
-            starts = set(g.objects(step, PROV.qualifiedStart))
-            assert starts, "Could not find qualifiedStart of step %s" % step
-            assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
-            start = starts.pop()
-            assert (start, PROV.hadActivity, master_run) in g,\
-                "Step activity not started by master activity"
-            # Tip: Any nested workflow step executions should not be in this prov file,
-            # but in separate file
-    if nested:
-        # Find some cwlprov.nt the nested workflow is described in
-        prov_ids = set(g.objects(predicate=PROV.has_provenance))
-        # FIXME: The above is a bit naive and does not check the subject is
-        # one of the steps -- OK for now as this is the only case of prov:has_provenance
-        assert prov_ids, "Could not find prov:has_provenance from nested workflow"
-
-        nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
-        # TODO: Look up manifest conformsTo and content-type rather than assuming magic filename
-        assert nt_uris, "Could not find *.cwlprov.nt"
-        # Load into new graph
-        g2 = Graph()
-        nt_uri = nt_uris.pop()
-        with open(_arcp2file(base_path, nt_uri), "rb") as f:
-            g2.parse(file=f, format="nt", publicID=nt_uri)
-        # TODO: Check g2 statements that it's the same UUID activity inside
-        # as in the outer step
-    if directory:
-        directories = set(g.subjects(RDF.type, RO.Folder))
-        assert directories
-
-        for d in directories:
-            assert (d, RDF.type, PROV.Dictionary) in g
-            assert (d, RDF.type, PROV.Collection) in g
-            assert(d, RDF.type, PROV.Entity) in g
-
-            files = set()
-            for entry in g.objects(d, PROV.hadDictionaryMember):
-                assert (entry, RDF.type, PROV.KeyEntityPair) in g
-                # We don't check what that filename is here
-                assert set(g.objects(entry, PROV.pairKey))
-
-                # RO:Folder aspect
-                assert set(g.objects(entry, RO.entryName))
-                assert (d, ORE.aggregates, entry) in g
-                assert (entry, RDF.type, RO.FolderEntry) in g
-                assert (entry, RDF.type, ORE.Proxy) in g
-                assert (entry, ORE.proxyIn, d) in g
-                assert (entry, ORE.proxyIn, d) in g
-
-                # Which file?
-                entities = set(g.objects(entry, PROV.pairEntity))
-                assert entities
-                f = entities.pop()
-                files.add(f)
-                assert (entry, ORE.proxyFor, f) in g
-                assert (f, RDF.type, PROV.Entity) in g
-
-            if not files:
-                assert (d, RDF.type, PROV.EmptyCollection) in g
-                assert (d, RDF.type, PROV.EmptyDictionary) in g
-    if secondary_files:
-        derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
-        assert derivations
-        for der in derivations:
-            sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
-            prim = set(g.objects(der, PROV.entity)).pop()
-
-            # UUID specializes a hash checksum
-            assert set(g.objects(sec, PROV.specializationOf))
-            # extensions etc.
-            sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
-            sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
-            sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
-            assert str(sec_basename) == "%s%s" % (sec_nameroot, sec_nameext)
-            # TODO: Check hash data file exist in RO
-
-            # The primary entity should have the same, but different values
-            assert set(g.objects(prim, PROV.specializationOf))
-            prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
-            prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
-            prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
-            assert str(prim_basename) == "%s%s" % (prim_nameroot, prim_nameext)
-
-
-@pytest.fixture
-def research_object():
-    re_ob = provenance.ResearchObject(StdFsAccess(''))
-    yield re_ob
-    re_ob.close()
-
-def test_absolute_path_fails(research_object):
-    with pytest.raises(ValueError):
-        research_object.write_bag_file("/absolute/path/fails")
-
-def test_climboutfails(research_object):
-    with pytest.raises(ValueError):
-        research_object.write_bag_file("../../outside-ro")
-
-def test_writable_string(research_object):
-    with research_object.write_bag_file("file.txt") as file:
-        assert file.writable()
-        file.write(u"Hello\n")
-        # TODO: Check Windows does not modify \n to \r\n here
-
-    sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
-    assert os.path.isfile(sha1)
-
-    with open(sha1, "r", encoding="UTF-8") as sha_file:
-        stripped_sha = sha_file.readline().strip()
-        assert stripped_sha.endswith("file.txt")
-        #stain@biggie:~/src/cwltool$ echo Hello | sha1sum
-        #1d229271928d3f9e2bb0375bd6ce5db6c6d348d9 -
-        assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")
-
-    sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
-    assert os.path.isfile(sha256)
-
-    with open(sha256, "r", encoding="UTF-8") as sha_file:
-        stripped_sha = sha_file.readline().strip()
-
-        assert stripped_sha.endswith("file.txt")
-        #stain@biggie:~/src/cwltool$ echo Hello | sha256sum
-        #66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18 -
-        assert stripped_sha.startswith("66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18")
-
-    sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
-    assert os.path.isfile(sha512)
-
-def test_writable_unicode_string(research_object):
-    with research_object.write_bag_file("file.txt") as file:
-        assert file.writable()
-        file.write(u"Here is a snowman: \u2603 \n")
-
-def test_writable_bytes(research_object):
-    string = u"Here is a snowman: \u2603 \n".encode("UTF-8")
-    with research_object.write_bag_file("file.txt", encoding=None) as file:
-        file.write(string)
-
-def test_data(research_object):
-    with research_object.write_bag_file("data/file.txt") as file:
-        assert file.writable()
-        file.write(u"Hello\n")
-        # TODO: Check Windows does not modify \n to \r\n here
-
-    # Because this is under data/ it should add to manifest
-    # rather than tagmanifest
-    sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
-    assert os.path.isfile(sha1)
-    with open(sha1, "r", encoding="UTF-8") as file:
-        stripped_sha = file.readline().strip()
-        assert stripped_sha.endswith("data/file.txt")
-
-def test_not_seekable(research_object):
-    with research_object.write_bag_file("file.txt") as file:
-        assert not file.seekable()
-        with pytest.raises(IOError):
-            file.seek(0)
-
-def test_not_readable(research_object):
-    with research_object.write_bag_file("file.txt") as file:
-        assert not file.readable()
-        with pytest.raises(IOError):
-            file.read()
-
-def test_truncate_fails(research_object):
-    with research_object.write_bag_file("file.txt") as file:
-        file.write(u"Hello there")
-        file.truncate()  # OK as we're always at end
-        # Will fail because the checksum can't rewind
-        with pytest.raises(IOError):
-            file.truncate(0)
-
-
-mod_validness = [
-    # Taken from "Some sample ORCID iDs" on
-    # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
-    ("0000-0002-1825-0097", True),
-    ("0000-0001-5109-3700", True),
-    ("0000-0002-1694-233X", True),
-    # dashes optional
-    ("0000000218250097", True),
-    ("0000000151093700", True),
-    ("000000021694233X", True),
-    # do not fail on missing digits
-    ("0002-1694-233X", True),
-    # Swap check-digits around to force error
-    ("0000-0002-1825-009X", False),
-    ("0000-0001-5109-3707", False),
-    ("0000-0002-1694-2330", False)
-]
-
-@pytest.mark.parametrize('mod11,valid', mod_validness)
-def test_check_mod_11_2(mod11, valid):
-    assert provenance._check_mod_11_2(mod11) == valid
-
-
-orcid_uris = [
-    # https://orcid.org/ (Expected form)
-    ("https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
-    # orcid.org
-    ("http://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
-    # just the number
-    ("0000-0002-1825-0097", "https://orcid.org/0000-0002-1825-0097"),
-    # lower-case X is OK (and fixed)
-    ("https://orcid.org/0000-0002-1694-233x", "https://orcid.org/0000-0002-1694-233X"),
-    # upper-case ORCID.ORG is OK.. (and fixed)
-    ("https://ORCID.ORG/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
-    # Unicode string (Python 2)
-    (u"https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X")
-]
-
-@pytest.mark.parametrize('orcid,expected', orcid_uris)
-def test_valid_orcid(orcid, expected):
-    assert provenance._valid_orcid(orcid) == expected
-
-
-invalid_orcids = [
-    # missing digit fails (even if checksum is correct)
-    "0002-1694-2332",
-    # Wrong checkdigit fails
-    "https://orcid.org/0000-0002-1694-2332",
-    "0000-0002-1694-2332",
-    # Missing dashes fails (although that's OK for checksum)
-    "https://orcid.org/000000021694233X",
-    "000000021694233X",
-    # Wrong hostname fails
-    "https://example.org/0000-0002-1694-233X",
-    # Wrong protocol fails
-    "ftp://orcid.org/0000-0002-1694-233X",
-    # Trying to be clever fails (no URL parsing!)
-    "https://orcid.org:443/0000-0002-1694-233X",
-    "http://orcid.org:80/0000-0002-1694-233X",
-    # Empty string is not really valid
-    ""
-]
-
-@pytest.mark.parametrize('orcid', invalid_orcids)
-def test_invalid_orcid(orcid):
-    with pytest.raises(ValueError):
-        provenance._valid_orcid(orcid)
-
-def test_whoami():
-    username, fullname = provenance._whoami()
-    assert username and isinstance(username, str)
-    assert fullname and isinstance(fullname, str)
-
-def test_research_object():
-    # TODO: Test ResearchObject methods
-    pass
-
-# Reasearch object may need to be pickled (for Toil)
-def test_research_object_picklability(research_object):
-    assert pickle.dumps(research_object) is not None
