import json
import os
import pickle
import sys
import urllib
from pathlib import Path
from typing import Any, Generator

import arcp
import bagit
import pytest
from rdflib import Graph, Namespace, URIRef
from rdflib.namespace import DC, DCTERMS, RDF
from rdflib.term import Literal

from cwltool import provenance, provenance_constants
from cwltool.main import main
from cwltool.provenance import ResearchObject
from cwltool.stdfsaccess import StdFsAccess

from .util import get_data, needs_docker, working_directory

# RDF namespaces we'll query for later
ORE = Namespace("http://www.openarchives.org/ore/terms/")
PROV = Namespace("http://www.w3.org/ns/prov#")
RO = Namespace("http://purl.org/wf4ever/ro#")
WFDESC = Namespace("http://purl.org/wf4ever/wfdesc#")
WFPROV = Namespace("http://purl.org/wf4ever/wfprov#")
SCHEMA = Namespace("http://schema.org/")
CWLPROV = Namespace("https://w3id.org/cwl/prov#")
OA = Namespace("http://www.w3.org/ns/oa#")


def cwltool(tmp_path: Path, *args: Any) -> Path:
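    """Run cwltool with --provenance pointing at a fresh folder under tmp_path.

    Returns the provenance folder so callers can inspect the research object.
    """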
    prov_folder = tmp_path / "provenance"
    prov_folder.mkdir()
    new_args = ["--provenance", str(prov_folder)]
    new_args.extend(args)
    # Run within a temporary directory to not pollute git checkout
    tmp_dir = tmp_path / "cwltool-run"
    tmp_dir.mkdir()
    with working_directory(tmp_dir):
        status = main(new_args)
        assert status == 0, "Failed: cwltool.main(%r)" % (args,)
    return prov_folder


@needs_docker
def test_hello_workflow(tmp_path: Path) -> None:
    check_provenance(
        cwltool(
            tmp_path,
            get_data("tests/wf/hello-workflow.cwl"),
            "--usermessage",
            "Hello workflow",
        )
    )


@needs_docker
def test_hello_single_tool(tmp_path: Path) -> None:
    check_provenance(
        cwltool(
            tmp_path,
            get_data("tests/wf/hello_single_tool.cwl"),
            "--message",
            "Hello tool",
        ),
        single_tool=True,
    )


@needs_docker
def test_revsort_workflow(tmp_path: Path) -> None:
    folder = cwltool(
        tmp_path,
        get_data("tests/wf/revsort.cwl"),
        get_data("tests/wf/revsort-job.json"),
    )
    check_output_object(folder)
    check_provenance(folder)


@needs_docker
def test_nested_workflow(tmp_path: Path) -> None:
    check_provenance(cwltool(tmp_path, get_data("tests/wf/nested.cwl")), nested=True)


@needs_docker
def test_secondary_files_implicit(tmp_path: Path) -> None:
    file1 = tmp_path / "foo1.txt"
    file1idx = tmp_path / "foo1.txt.idx"

    with open(str(file1), "w", encoding="ascii") as f:
        f.write("foo")
    with open(str(file1idx), "w", encoding="ascii") as f:
        f.write("bar")

    # secondary will be picked up by .idx
    folder = cwltool(tmp_path, get_data("tests/wf/sec-wf.cwl"), "--file1", str(file1))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)


@needs_docker
def test_secondary_files_explicit(tmp_path: Path) -> None:
    # Deliberately do NOT have common basename or extension
    file1dir = tmp_path / "foo"
    file1dir.mkdir()
    file1 = file1dir / "foo"
    file1idxdir = tmp_path / "bar"
    file1idxdir.mkdir()
    file1idx = file1idxdir / "bar"

    with open(file1, "w", encoding="ascii") as f:
        f.write("foo")
    with open(file1idx, "w", encoding="ascii") as f:
        f.write("bar")

    # explicit secondaryFiles
    job = {
        "file1": {
            "class": "File",
            "path": str(file1),
            "basename": "foo1.txt",
            "secondaryFiles": [
                {
                    "class": "File",
                    "path": str(file1idx),
                    "basename": "foo1.txt.idx",
                }
            ],
        }
    }

    jobJson = tmp_path / "job.json"
    with open(jobJson, "wb") as fp:
        j = json.dumps(job, ensure_ascii=True)
        fp.write(j.encode("ascii"))

    folder = cwltool(tmp_path, get_data("tests/wf/sec-wf.cwl"), str(jobJson))
    check_provenance(folder, secondary_files=True)
    check_secondary_files(folder)


@needs_docker
def test_secondary_files_output(tmp_path: Path) -> None:
    # secondary will be picked up by .idx
    folder = cwltool(tmp_path, get_data("tests/wf/sec-wf-out.cwl"))
    check_provenance(folder, secondary_files=True)
    # Skipped: not the same secondary files as above
    # check_secondary_files(folder)


@needs_docker
def test_directory_workflow(tmp_path: Path) -> None:
    dir2 = tmp_path / "dir2"
    dir2.mkdir()
    sha1 = {
        # Expected hashes of ASCII letters (no linefeed)
        # as returned from:
        # for x in a b c ; do echo -n $x | sha1sum ; done
        "a": "86f7e437faa5a7fce15d1ddcb9eaeaea377667b8",
        "b": "e9d71f5ee7c92d6dc9e92ffdad17b8bd49418f98",
        "c": "84a516841ba77a5b4648de2cd0dfcb30ea46dbb4",
    }
    for x in "abc":
        # Make test files with predictable hashes
        with open(dir2 / x, "w", encoding="ascii") as f:
            f.write(x)

    folder = cwltool(tmp_path, get_data("tests/wf/directory.cwl"), "--dir", str(dir2))
    check_provenance(folder, directory=True)

    # Output should include ls stdout of filenames a b c on each line
    file_list = (
        folder
        / "data"
        / "3c"
        / "3ca69e8d6c234a469d16ac28a4a658c92267c423"
        # checksum as returned from:
        # echo -e "a\nb\nc" | sha1sum
        # 3ca69e8d6c234a469d16ac28a4a658c92267c423  -
    )
    assert file_list.is_file()

    # Input files should be captured by hash value,
    # even if they were inside a class: Directory
    for (l, l_hash) in sha1.items():
        prefix = l_hash[:2]  # first 2 letters
        p = folder / "data" / prefix / l_hash
        assert p.is_file(), f"Could not find {l} as {p}"


def check_output_object(base_path: Path) -> None:
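    """Check that primary-output.json records the expected checksum and location."""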
    output_obj = base_path / "workflow" / "primary-output.json"
    compare_checksum = "sha1$b9214658cc453331b62c2282b772a5c063dbd284"
    compare_location = "../data/b9/b9214658cc453331b62c2282b772a5c063dbd284"
    with open(output_obj) as fp:
        out_json = json.load(fp)
    f1 = out_json["sorted_output"]
    assert f1["checksum"] == compare_checksum
    assert f1["location"] == compare_location


def check_secondary_files(base_path: Path) -> None:
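    """Check that the secondary file data and job description were captured."""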
    foo_data = (
        base_path
        / "data"
        / "0b"
        / "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
        # checksum as returned from:
        # $ echo -n foo | sha1sum
        # 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33  -
    )
    bar_data = base_path / "data" / "62" / "62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
    assert foo_data.is_file(), "Did not capture file.txt 'foo'"
    assert bar_data.is_file(), "Did not capture secondary file.txt.idx 'bar'"

    primary_job = base_path / "workflow" / "primary-job.json"
    with open(primary_job) as fp:
        job_json = json.load(fp)
    # TODO: Verify secondaryFile in primary-job.json
    f1 = job_json["file1"]
    assert f1["location"] == "../data/0b/0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33"
    assert f1["basename"] == "foo1.txt"

    secondaries = f1["secondaryFiles"]
    assert secondaries
    f1idx = secondaries[0]
    assert f1idx["location"] == "../data/62/62cdb7020ff920e5aa642c3d4066950dd1f01f4d"
    assert f1idx["basename"] == "foo1.txt.idx"


def check_provenance(
    base_path: Path,
    nested: bool = False,
    single_tool: bool = False,
    directory: bool = False,
    secondary_files: bool = False,
) -> None:
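    """Run all structural checks on the research object produced by cwltool()."""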
    check_folders(base_path)
    check_bagit(base_path)
    check_ro(base_path, nested=nested)
    check_prov(
        base_path,
        nested=nested,
        single_tool=single_tool,
        directory=directory,
        secondary_files=secondary_files,
    )


def check_folders(base_path: Path) -> None:
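    """Check that the expected research object folders exist."""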
    required_folders = [
        "data",
        "snapshot",
        "workflow",
        "metadata",
        os.path.join("metadata", "provenance"),
    ]

    for folder in required_folders:
        assert (base_path / folder).is_dir()


def check_bagit(base_path: Path) -> None:
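    """Check the BagIt structure and validate the bag."""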
    # check bagit structure
    required_files = [
        "bagit.txt",
        "bag-info.txt",
        "manifest-sha1.txt",
        "tagmanifest-sha1.txt",
        "tagmanifest-sha256.txt",
    ]

    for basename in required_files:
        assert (base_path / basename).is_file()

    bag = bagit.Bag(str(base_path))
    assert bag.has_oxum()
    (only_manifest, only_fs) = bag.compare_manifests_with_fs()
    assert not list(only_manifest), "Some files only in manifest"
    assert not list(only_fs), "Some files only on file system"
    missing_tagfiles = bag.missing_optional_tagfiles()
    assert not list(missing_tagfiles), "Some files only in tagmanifest"
    bag.validate()
    # TODO: Check other bag-info attributes
    assert arcp.is_arcp_uri(bag.info.get("External-Identifier"))


def find_arcp(base_path: Path) -> str:
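    """Return the arcp URI recorded as the bag's External-Identifier."""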
    # First try to find External-Identifier
    bag = bagit.Bag(str(base_path))
    ext_id = bag.info.get("External-Identifier")
    if arcp.is_arcp_uri(ext_id):
        return str(ext_id)
    raise Exception("Can't find External-Identifier")


def _arcp2file(base_path: Path, uri: str) -> Path:
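    """Resolve an arcp URI within this research object to a local path."""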
    parsed = arcp.parse_arcp(uri)
    # arcp URIs, ensure they are local to our RO
    assert (
        parsed.uuid == arcp.parse_arcp(find_arcp(base_path)).uuid
    ), "arcp URI must be local to the research object"

    path = parsed.path[1:]  # Strip first /
    # Convert to local path, in case it uses \ on Windows
    return base_path / Path(path)


def check_ro(base_path: Path, nested: bool = False) -> None:
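    """Check the research object manifest and its aggregates and annotations."""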
    manifest_file = base_path / "metadata" / "manifest.json"
    assert manifest_file.is_file(), f"Can't find {manifest_file}"
    arcp_root = find_arcp(base_path)
    base = urllib.parse.urljoin(arcp_root, "metadata/manifest.json")
    g = Graph()

    # Avoid resolving JSON-LD context https://w3id.org/bundle/context
    # so this test works offline
    context = Path(get_data("tests/bundle-context.jsonld")).as_uri()
    with open(manifest_file, encoding="UTF-8") as fh:
        jsonld = fh.read()
        # replace with file:/// URI
        jsonld = jsonld.replace("https://w3id.org/bundle/context", context)
    g.parse(data=jsonld, format="json-ld", publicID=base)
    if os.environ.get("DEBUG"):
        print("Parsed manifest:\n\n")
        g.serialize(sys.stdout, format="ttl")
    _ro = None

    for _ro in g.subjects(ORE.isDescribedBy, URIRef(base)):
        break
    assert _ro is not None, "Can't find RO with ore:isDescribedBy"

    profile = None
    for dc in g.objects(_ro, DCTERMS.conformsTo):
        profile = dc
        break
    assert profile is not None, "Can't find profile with dct:conformsTo"
    assert profile == URIRef(provenance_constants.CWLPROV_VERSION), (
        "Unexpected cwlprov version " + profile
    )

    paths = []
    externals = []
    for aggregate in g.objects(_ro, ORE.aggregates):
        if not arcp.is_arcp_uri(aggregate):
            externals.append(aggregate)
            # Won't check external URIs existence here
            # TODO: Check they are not relative!
            continue
        lfile = _arcp2file(base_path, aggregate)
        paths.append(os.path.relpath(lfile, base_path))
        assert os.path.isfile(lfile), f"Can't find aggregated {lfile}"

    assert paths, "Didn't find any arcp aggregates"
    assert externals, "Didn't find any data URIs"

    for ext in ["provn", "xml", "json", "jsonld", "nt", "ttl"]:
        f = "metadata/provenance/primary.cwlprov.%s" % ext
        assert f in paths, "provenance file missing " + f

    for f in [
        "workflow/primary-job.json",
        "workflow/packed.cwl",
        "workflow/primary-output.json",
    ]:
        assert f in paths, "workflow file missing " + f
    # Can't test snapshot/ files directly as their names vary

    # TODO: check urn:hash::sha1 thingies
    # TODO: Check OA annotations

    packed = urllib.parse.urljoin(arcp_root, "/workflow/packed.cwl")
    primary_job = urllib.parse.urljoin(arcp_root, "/workflow/primary-job.json")
    primary_prov_nt = urllib.parse.urljoin(
        arcp_root, "/metadata/provenance/primary.cwlprov.nt"
    )
    uuid = arcp.parse_arcp(arcp_root).uuid

    highlights = set(g.subjects(OA.motivatedBy, OA.highlighting))
    assert highlights, "Didn't find highlights"
    for h in highlights:
        assert (h, OA.hasTarget, URIRef(packed)) in g

    describes = set(g.subjects(OA.motivatedBy, OA.describing))
    for d in describes:
        assert (d, OA.hasBody, URIRef(arcp_root)) in g
        assert (d, OA.hasTarget, URIRef(uuid.urn)) in g

    linked = set(g.subjects(OA.motivatedBy, OA.linking))
    for link in linked:
        assert (link, OA.hasBody, URIRef(packed)) in g
        assert (link, OA.hasBody, URIRef(primary_job)) in g
        assert (link, OA.hasTarget, URIRef(uuid.urn)) in g

    has_provenance = set(g.subjects(OA.hasBody, URIRef(primary_prov_nt)))
    for p in has_provenance:
        assert (p, OA.hasTarget, URIRef(uuid.urn)) in g
        assert (p, OA.motivatedBy, PROV.has_provenance) in g
        # Check all prov elements are listed
        formats = set()
        for prov in g.objects(p, OA.hasBody):
            assert (
                prov,
                DCTERMS.conformsTo,
                URIRef(provenance_constants.CWLPROV_VERSION),
            ) in g
            # NOTE: DC["format"] is used because plain DC.format can resolve to
            # str.format (Namespace subclasses str) rather than the dc:format term
            formats.update(set(g.objects(prov, DC["format"])))
        assert formats, "Could not find media types"
        expected = {
            Literal(f)
            for f in (
                "application/json",
                "application/ld+json",
                "application/n-triples",
                'text/provenance-notation; charset="UTF-8"',
                'text/turtle; charset="UTF-8"',
                "application/xml",
            )
        }
        assert formats == expected, "Did not match expected PROV media types"

    if nested:
        # Check for additional PROVs
        # Let's try to find the other wf run ID
        otherRuns = set()
        for p in g.subjects(OA.motivatedBy, PROV.has_provenance):
            if (p, OA.hasTarget, URIRef(uuid.urn)) in g:
                continue
            otherRuns.update(set(g.objects(p, OA.hasTarget)))
        assert otherRuns, "Could not find nested workflow run prov annotations"


def check_prov(
    base_path: Path,
    nested: bool = False,
    single_tool: bool = False,
    directory: bool = False,
    secondary_files: bool = False,
) -> None:
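    """Check the primary CWLProv trace in metadata/provenance/primary.cwlprov.nt."""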
    prov_file = base_path / "metadata" / "provenance" / "primary.cwlprov.nt"
    assert prov_file.is_file(), f"Can't find {prov_file}"
    arcp_root = find_arcp(base_path)
    # Note: We don't need to include metadata/provenance in the base URI
    # as .nt files always use absolute URIs
    g = Graph()
    with open(prov_file, "rb") as f:
        g.parse(file=f, format="nt", publicID=arcp_root)
    if os.environ.get("DEBUG"):
        print("Parsed %s:\n\n" % prov_file)
        g.serialize(sys.stdout, format="ttl")
    runs = set(g.subjects(RDF.type, WFPROV.WorkflowRun))

    # main workflow run URI (as urn:uuid:) should correspond to arcp uuid part
    uuid = arcp.parse_arcp(arcp_root).uuid
    main_run = URIRef(uuid.urn)
    assert main_run in runs, f"Can't find run {main_run} in {runs}"
    # TODO: we should not need to parse arcp, but follow
    # the has_provenance annotations in manifest.json instead

    # run should have been started by a wf engine

    engines = set(g.subjects(RDF.type, WFPROV.WorkflowEngine))
    assert engines, "Could not find WorkflowEngine"
    assert len(engines) == 1, "Found too many WorkflowEngines: %s" % engines
    engine = engines.pop()

    assert (
        main_run,
        PROV.wasAssociatedWith,
        engine,
    ) in g, "Wf run not associated with wf engine"
    assert (
        engine,
        RDF.type,
        PROV.SoftwareAgent,
    ) in g, "Engine not declared as SoftwareAgent"

    if single_tool:
        activities = set(g.subjects(RDF.type, PROV.Activity))
        assert len(activities) == 1, "Too many activities: %s" % activities
        # single tool exec, there should be no other activities
        # than the tool run
        # (NOTE: the WorkflowEngine is also activity, but not declared explicitly)
    else:
        # Check all process runs were started by the main workflow
        stepActivities = set(g.subjects(RDF.type, WFPROV.ProcessRun))
        # Although semantically a WorkflowEngine is also a ProcessRun,
        # we don't declare that,
        # thus only the step activities should be in this set.
        assert main_run not in stepActivities
        assert stepActivities, "No steps executed in workflow"
        for step in stepActivities:
            # Let's check it was started by the main_run. Unlike PROV-N, in
            # PROV-O RDF we have to go through the n-ary qualifiedStart relation
            starts = set(g.objects(step, PROV.qualifiedStart))
            assert starts, "Could not find qualifiedStart of step %s" % step
            assert len(starts) == 1, "Too many qualifiedStart for step %s" % step
            start = starts.pop()
            assert (
                start,
                PROV.hadActivity,
                main_run,
            ) in g, "Step activity not started by main activity"
            # Tip: Any nested workflow step executions should not be in this
            # prov file, but in a separate file
    if nested:
        # Find the cwlprov.nt file(s) in which the nested workflow is described
        prov_ids = set(g.objects(predicate=PROV.has_provenance))
        # FIXME: The above is a bit naive and does not check the subject is
        # one of the steps -- OK for now as this is the only case of prov:has_provenance
        assert prov_ids, "Could not find prov:has_provenance from nested workflow"

        nt_uris = [uri for uri in prov_ids if uri.endswith("cwlprov.nt")]
        # TODO: Look up manifest conformsTo and content-type rather than assuming magic filename
        assert nt_uris, "Could not find *.cwlprov.nt"
        # Load into new graph
        g2 = Graph()
        nt_uri = nt_uris.pop()
        with open(_arcp2file(base_path, nt_uri), "rb") as f:
            g2.parse(file=f, format="nt", publicID=nt_uri)
        # TODO: Check g2 statements that it's the same UUID activity inside
        # as in the outer step
    if directory:
        directories = set(g.subjects(RDF.type, RO.Folder))
        assert directories

        for d in directories:
            assert (d, RDF.type, PROV.Dictionary) in g
            assert (d, RDF.type, PROV.Collection) in g
            assert (d, RDF.type, PROV.Entity) in g

            files = set()
            for entry in g.objects(d, PROV.hadDictionaryMember):
                assert (entry, RDF.type, PROV.KeyEntityPair) in g
                # We don't check what that filename is here
                assert set(g.objects(entry, PROV.pairKey))

                # RO:Folder aspect
                assert set(g.objects(entry, RO.entryName))
                assert (d, ORE.aggregates, entry) in g
                assert (entry, RDF.type, RO.FolderEntry) in g
                assert (entry, RDF.type, ORE.Proxy) in g
                assert (entry, ORE.proxyIn, d) in g

                # Which file?
                entities = set(g.objects(entry, PROV.pairEntity))
                assert entities
                ef = entities.pop()
                files.add(ef)
                assert (entry, ORE.proxyFor, ef) in g
                assert (ef, RDF.type, PROV.Entity) in g

            if not files:
                assert (d, RDF.type, PROV.EmptyCollection) in g
                assert (d, RDF.type, PROV.EmptyDictionary) in g
    if secondary_files:
        derivations = set(g.subjects(RDF.type, CWLPROV.SecondaryFile))
        assert derivations
        for der in derivations:
            sec = set(g.subjects(PROV.qualifiedDerivation, der)).pop()
            prim = set(g.objects(der, PROV.entity)).pop()

            # UUID specializes a hash checksum
            assert set(g.objects(sec, PROV.specializationOf))
            # extensions etc.
            sec_basename = set(g.objects(sec, CWLPROV.basename)).pop()
            sec_nameroot = set(g.objects(sec, CWLPROV.nameroot)).pop()
            sec_nameext = set(g.objects(sec, CWLPROV.nameext)).pop()
            assert str(sec_basename) == f"{sec_nameroot}{sec_nameext}"
            # TODO: Check hash data file exist in RO

            # The primary entity should have the same properties, but with different values
            assert set(g.objects(prim, PROV.specializationOf))
            prim_basename = set(g.objects(prim, CWLPROV.basename)).pop()
            prim_nameroot = set(g.objects(prim, CWLPROV.nameroot)).pop()
            prim_nameext = set(g.objects(prim, CWLPROV.nameext)).pop()
            assert str(prim_basename) == f"{prim_nameroot}{prim_nameext}"


@pytest.fixture
def research_object() -> Generator[ResearchObject, None, None]:
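    """Yield a ResearchObject for the test, closing it afterwards."""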
    re_ob = ResearchObject(StdFsAccess(""))
    yield re_ob
    re_ob.close()


def test_absolute_path_fails(research_object: ResearchObject) -> None:
    with pytest.raises(ValueError):
        research_object.write_bag_file("/absolute/path/fails")


def test_climboutfails(research_object: ResearchObject) -> None:
    with pytest.raises(ValueError):
        research_object.write_bag_file("../../outside-ro")


def test_writable_string(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert fh.writable()
        fh.write("Hello\n")
        # TODO: Check Windows does not modify \n to \r\n here

    sha1 = os.path.join(research_object.folder, "tagmanifest-sha1.txt")
    assert os.path.isfile(sha1)

    with open(sha1, encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()
    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha1sum
    # 1d229271928d3f9e2bb0375bd6ce5db6c6d348d9  -
    assert stripped_sha.startswith("1d229271928d3f9e2bb0375bd6ce5db6c6d348d9")

    sha256 = os.path.join(research_object.folder, "tagmanifest-sha256.txt")
    assert os.path.isfile(sha256)

    with open(sha256, encoding="UTF-8") as sha_file:
        stripped_sha = sha_file.readline().strip()

    assert stripped_sha.endswith("file.txt")
    # stain@biggie:~/src/cwltool$ echo Hello | sha256sum
    # 66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18  -
    assert stripped_sha.startswith(
        "66a045b452102c59d840ec097d59d9467e13a3f34f6494e539ffd32c1bb35f18"
    )

    sha512 = os.path.join(research_object.folder, "tagmanifest-sha512.txt")
    assert os.path.isfile(sha512)


def test_writable_unicode_string(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert fh.writable()
        fh.write("Here is a snowman: \u2603 \n")


def test_writable_bytes(research_object: ResearchObject) -> None:
    string = "Here is a snowman: \u2603 \n".encode()
    with research_object.write_bag_file("file.txt", encoding=None) as fh:
        fh.write(string)  # type: ignore


def test_data(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("data/file.txt") as fh:
        assert fh.writable()
        fh.write("Hello\n")
    # TODO: Check Windows does not modify \n to \r\n here

    # Because this is under data/ it should add to manifest
    # rather than tagmanifest
    sha1 = os.path.join(research_object.folder, "manifest-sha1.txt")
    assert os.path.isfile(sha1)
    with open(sha1, encoding="UTF-8") as fh2:
        stripped_sha = fh2.readline().strip()
        assert stripped_sha.endswith("data/file.txt")


def test_not_seekable(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert not fh.seekable()
        with pytest.raises(OSError):
            fh.seek(0)


def test_not_readable(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        assert not fh.readable()
        with pytest.raises(OSError):
            fh.read()


def test_truncate_fails(research_object: ResearchObject) -> None:
    with research_object.write_bag_file("file.txt") as fh:
        fh.write("Hello there")
        fh.truncate()  # OK as we're always at end
        # Will fail because the checksum can't rewind
        with pytest.raises(OSError):
            fh.truncate(0)


mod_validness = [
    # Taken from "Some sample ORCID iDs" on
    # https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    ("0000-0002-1825-0097", True),
    ("0000-0001-5109-3700", True),
    ("0000-0002-1694-233X", True),
    # dashes optional
    ("0000000218250097", True),
    ("0000000151093700", True),
    ("000000021694233X", True),
    # do not fail on missing digits
    ("0002-1694-233X", True),
    # Swap check-digits around to force error
    ("0000-0002-1825-009X", False),
    ("0000-0001-5109-3707", False),
    ("0000-0002-1694-2330", False),
]
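
# For reference: ORCID check digits use ISO 7064 MOD 11-2. A minimal sketch of
# that algorithm (not cwltool's implementation, shown only to explain the
# expectations above):
#
#     def mod_11_2_check_digit(base_digits: str) -> str:
#         total = 0
#         for digit in base_digits:
#             total = (total + int(digit)) * 2
#         result = (12 - total % 11) % 11
#         return "X" if result == 10 else str(result)
#
# e.g. mod_11_2_check_digit("000000021825009") == "7", matching 0000-0002-1825-0097.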


@pytest.mark.parametrize("mod11,valid", mod_validness)
def test_check_mod_11_2(mod11: str, valid: bool) -> None:
    assert provenance._check_mod_11_2(mod11) == valid


orcid_uris = [
    # https://orcid.org/ (Expected form)
    ("https://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # orcid.org
    ("http://orcid.org/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
    # just the number
    ("0000-0002-1825-0097", "https://orcid.org/0000-0002-1825-0097"),
    # lower-case X is OK (and fixed)
    ("https://orcid.org/0000-0002-1694-233x", "https://orcid.org/0000-0002-1694-233X"),
    # upper-case ORCID.ORG is OK (and fixed)
    ("https://ORCID.ORG/0000-0002-1694-233X", "https://orcid.org/0000-0002-1694-233X"),
]


@pytest.mark.parametrize("orcid,expected", orcid_uris)
def test_valid_orcid(orcid: str, expected: str) -> None:
    assert provenance._valid_orcid(orcid) == expected


invalid_orcids = [
    # missing digit fails (even if checksum is correct)
    "0002-1694-2332",
    # Wrong checkdigit fails
    "https://orcid.org/0000-0002-1694-2332",
    "0000-0002-1694-2332",
    # Missing dashes fails (although that's OK for checksum)
    "https://orcid.org/000000021694233X",
    "000000021694233X",
    # Wrong hostname fails
    "https://example.org/0000-0002-1694-233X",
    # Wrong protocol fails
    "ftp://orcid.org/0000-0002-1694-233X",
    # Trying to be clever fails (no URL parsing!)
    "https://orcid.org:443/0000-0002-1694-233X",
    "http://orcid.org:80/0000-0002-1694-233X",
    # Empty string is not really valid
    "",
]


@pytest.mark.parametrize("orcid", invalid_orcids)
def test_invalid_orcid(orcid: str) -> None:
    with pytest.raises(ValueError):
        provenance._valid_orcid(orcid)


def test_whoami() -> None:
    username, fullname = provenance._whoami()
    assert username and isinstance(username, str)
    assert fullname and isinstance(fullname, str)


def test_research_object() -> None:
    # TODO: Test ResearchObject methods
    pass


# Research objects may need to be pickled (for Toil)


def test_research_object_picklability(research_object: ResearchObject) -> None:
    assert pickle.dumps(research_object) is not None