diff env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,366 @@
+"""Tests of the experimental MPI extension."""
+import json
+import os.path
+import sys
+from io import StringIO
+from pathlib import Path
+from typing import Any, Generator, List, MutableMapping, Optional, Tuple
+
+import pkg_resources
+import pytest
+from ruamel import yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
+from schema_salad.avro.schema import Names
+
+import cwltool.load_tool
+import cwltool.singularity
+import cwltool.udocker
+from cwltool.command_line_tool import CommandLineTool
+from cwltool.context import LoadingContext, RuntimeContext
+from cwltool.main import main
+from cwltool.mpi import MpiConfig, MPIRequirementName
+
+from .util import get_data, windows_needs_docker, working_directory
+
+
+def test_mpi_conf_defaults() -> None:
+    mpi = MpiConfig()
+    assert mpi.runner == "mpirun"
+    assert mpi.nproc_flag == "-n"
+    assert mpi.default_nproc == 1
+    assert mpi.extra_flags == []
+    assert mpi.env_pass == []
+    assert mpi.env_pass_regex == []
+    assert mpi.env_set == {}
+
+
+def test_mpi_conf_unknownkeys() -> None:
+    with pytest.raises(TypeError):
+        MpiConfig(runner="mpiexec", foo="bar")  # type: ignore
+
+
+@pytest.fixture(scope="class")
+def fake_mpi_conf(tmp_path_factory: Any) -> Generator[str, None, None]:
+    """
+    Make a super simple mpirun-alike for applications that don't actually use MPI.
+
+    It just runs the command multiple times (in serial).
+
+    Then create a plaform MPI config YAML file that should make it work
+    for the testing examples.
+    """
+    mpirun_text = """#!{interpreter}
+import argparse
+import sys
+import subprocess
+from io import StringIO
+from typing import List
+
+def make_parser():
+    p = argparse.ArgumentParser()
+    p.add_argument("--num", type=int, help="number of times to run the application")
+    p.add_argument(
+        "--no-fail", help="add this flag to actually work", action="store_true"
+    )
+    p.add_argument(
+        "progargs", nargs=argparse.REMAINDER, help="The program and its arguments"
+    )
+    return p
+
+class Runner:
+    def __init__(self):
+        if sys.stdin.isatty():
+            self.indata = None
+        else:
+            self.indata = sys.stdin.read().encode(sys.stdin.encoding)
+
+    def run_once(self, args: List[str]):
+        subprocess.run(
+            args, input=self.indata, stdout=sys.stdout, stderr=sys.stderr
+        ).check_returncode()
+
+    def run_many(self, n: int, args: List[str]):
+        for i in range(n):
+            self.run_once(args)
+
+if __name__ == "__main__":
+    args = make_parser().parse_args()
+    assert args.no_fail == True, "Didn't set the --no-fail flag"
+    r = Runner()
+    r.run_many(args.num, args.progargs)
+""".format(
+        interpreter=sys.executable
+    )
+    mpitmp = tmp_path_factory.mktemp("fake_mpi")
+    mpirun_file = mpitmp / "fake_mpirun"
+    mpirun_file.write_text(mpirun_text)
+    mpirun_file.chmod(0o755)
+
+    plat_conf = {
+        "runner": str(mpirun_file),
+        "nproc_flag": "--num",
+        "extra_flags": ["--no-fail"],
+        "env_set": {"TEST_MPI_FOO": "bar"},
+        "env_pass": ["USER"],
+    }
+    plat_conf_file = mpitmp / "plat_mpi.yml"
+    plat_conf_file.write_text(yaml.main.round_trip_dump(plat_conf))
+
+    yield str(plat_conf_file)
+
+    plat_conf_file.unlink()
+    mpirun_file.unlink()
+    mpitmp.rmdir()
+
+
+def make_processes_input(np: int, tmp_path: Path) -> Path:
+    input_file = tmp_path / "input.yml"
+    with input_file.open("w") as f:
+        f.write("processes: %d\n" % np)
+    return input_file
+
+
+def cwltool_args(fake_mpi_conf: str) -> List[str]:
+    return ["--enable-ext", "--enable-dev", "--mpi-config-file", fake_mpi_conf]
+
+
+class TestMpiRun:
+    def test_fake_mpi_config(self, fake_mpi_conf: str) -> None:
+        conf_obj = MpiConfig.load(fake_mpi_conf)
+        runner = conf_obj.runner
+        assert os.path.dirname(runner) == os.path.dirname(fake_mpi_conf)
+        assert os.path.basename(runner) == "fake_mpirun"
+        assert conf_obj.nproc_flag == "--num"
+        assert conf_obj.default_nproc == 1
+        assert conf_obj.extra_flags == ["--no-fail"]
+
+    @windows_needs_docker
+    def test_simple_mpi_tool(self, fake_mpi_conf: str, tmp_path: Path) -> None:
+        stdout = StringIO()
+        stderr = StringIO()
+        with working_directory(tmp_path):
+            rc = main(
+                argsl=cwltool_args(fake_mpi_conf)
+                + [get_data("tests/wf/mpi_simple.cwl")],
+                stdout=stdout,
+                stderr=stderr,
+            )
+            assert rc == 0
+
+            output = json.loads(stdout.getvalue())
+            pid_path = output["pids"]["path"]
+            with open(pid_path) as pidfile:
+                pids = [int(line) for line in pidfile]
+            assert len(pids) == 2
+
+    @windows_needs_docker
+    def test_simple_mpi_nproc_expr(self, fake_mpi_conf: str, tmp_path: Path) -> None:
+        np = 4
+        input_file = make_processes_input(np, tmp_path)
+        stdout = StringIO()
+        stderr = StringIO()
+        with working_directory(tmp_path):
+            rc = main(
+                argsl=cwltool_args(fake_mpi_conf)
+                + [get_data("tests/wf/mpi_expr.cwl"), str(input_file)],
+                stdout=stdout,
+                stderr=stderr,
+            )
+            assert rc == 0
+
+            output = json.loads(stdout.getvalue())
+            pid_path = output["pids"]["path"]
+            with open(pid_path) as pidfile:
+                pids = [int(line) for line in pidfile]
+            assert len(pids) == np
+
+    @windows_needs_docker
+    def test_mpi_workflow(self, fake_mpi_conf: str, tmp_path: Path) -> None:
+        np = 3
+        input_file = make_processes_input(np, tmp_path)
+        stdout = StringIO()
+        stderr = StringIO()
+        with working_directory(tmp_path):
+            rc = main(
+                argsl=cwltool_args(fake_mpi_conf)
+                + [get_data("tests/wf/mpi_simple_wf.cwl"), str(input_file)],
+                stdout=stdout,
+                stderr=stderr,
+            )
+            assert rc == 0
+
+            output = json.loads(stdout.getvalue())
+            lc_path = output["line_count"]["path"]
+            with open(lc_path) as lc_file:
+                lc = int(lc_file.read())
+                assert lc == np
+
+    @windows_needs_docker
+    def test_environment(
+        self, fake_mpi_conf: str, tmp_path: Path, monkeypatch: Any
+    ) -> None:
+        stdout = StringIO()
+        stderr = StringIO()
+        monkeypatch.setenv("USER", "tester")
+        with working_directory(tmp_path):
+            rc = main(
+                argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_env.cwl")],
+                stdout=stdout,
+                stderr=stderr,
+            )
+            assert rc == 0
+
+            output = json.loads(stdout.getvalue())
+            env_path = output["environment"]["path"]
+            with open(env_path) as envfile:
+                e = {}
+                for line in envfile:
+                    k, v = line.strip().split("=", 1)
+                    e[k] = v
+            assert e["USER"] == "tester"
+            assert e["TEST_MPI_FOO"] == "bar"
+
+
+def test_env_passing(monkeypatch: Any) -> None:
+    config = MpiConfig(
+        env_pass=["A", "B", "LONG_NAME"],
+        env_pass_regex=["TOOLNAME", "MPI_.*_CONF"],
+    )
+
+    env = {}  # type: MutableMapping[str, str]
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {})
+        env = {}
+        config.pass_through_env_vars(env)
+        assert env == {}
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {"A": "a"})
+        env = {}
+        config.pass_through_env_vars(env)
+        assert env == {"A": "a"}
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {"A": "a", "C": "c"})
+        env = {}
+        config.pass_through_env_vars(env)
+        assert env == {"A": "a"}
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {"A": "a", "B": "b", "C": "c"})
+        env = {"PATH": "one:two:three", "HOME": "/tmp/dir", "TMPDIR": "/tmp/dir"}
+        config.pass_through_env_vars(env)
+        assert env == {
+            "PATH": "one:two:three",
+            "HOME": "/tmp/dir",
+            "TMPDIR": "/tmp/dir",
+            "A": "a",
+            "B": "b",
+        }
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {"TOOLNAME": "foobar"})
+        env = {}
+        config.pass_through_env_vars(env)
+        assert env == {"TOOLNAME": "foobar"}
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {"_TOOLNAME_": "foobar"})
+        env = {}
+        config.pass_through_env_vars(env)
+        # Cos we are matching not searching
+        assert env == {}
+
+    with monkeypatch.context() as m:
+        m.setattr(os, "environ", {"MPI_A_CONF": "A", "MPI_B_CONF": "B"})
+
+        env = {}
+        config.pass_through_env_vars(env)
+        # Cos we are matching not searching
+        assert env == {"MPI_A_CONF": "A", "MPI_B_CONF": "B"}
+
+
+# Reading the schema is super slow - cache for the session
+@pytest.fixture(scope="session")
+def schema_ext11() -> Generator[Names, None, None]:
+    with pkg_resources.resource_stream("cwltool", "extensions-v1.1.yml") as res:
+        ext11 = res.read().decode("utf-8")
+        cwltool.process.use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11)
+        schema = cwltool.process.get_schema("v1.1")[1]
+        assert isinstance(schema, Names)
+        yield schema
+
+
+mpiReq = CommentedMap({"class": MPIRequirementName, "processes": 1})
+containerReq = CommentedMap({"class": "DockerRequirement"})
+basetool = CommentedMap(
+    {"cwlVersion": "v1.1", "inputs": CommentedSeq(), "outputs": CommentedSeq()}
+)
+
+
+def mk_tool(
+    schema: Names,
+    opts: List[str],
+    reqs: Optional[List[CommentedMap]] = None,
+    hints: Optional[List[CommentedMap]] = None,
+) -> Tuple[LoadingContext, RuntimeContext, CommentedMap]:
+    tool = basetool.copy()
+
+    if reqs is not None:
+        tool["requirements"] = CommentedSeq(reqs)
+    if hints is not None:
+        tool["hints"] = CommentedSeq(hints)
+
+    args = cwltool.argparser.arg_parser().parse_args(opts)
+    args.enable_ext = True
+    rc = RuntimeContext(vars(args))
+    lc = cwltool.main.setup_loadingContext(None, rc, args)
+    lc.avsc_names = schema
+    return lc, rc, tool
+
+
+def test_singularity(schema_ext11: Names) -> None:
+    lc, rc, tool = mk_tool(schema_ext11, ["--singularity"], reqs=[mpiReq, containerReq])
+    clt = CommandLineTool(tool, lc)
+    jr = clt.make_job_runner(rc)
+    assert jr is cwltool.singularity.SingularityCommandLineJob
+
+
+def test_udocker(schema_ext11: Names) -> None:
+    lc, rc, tool = mk_tool(schema_ext11, ["--udocker"], reqs=[mpiReq, containerReq])
+    clt = CommandLineTool(tool, lc)
+    jr = clt.make_job_runner(rc)
+    assert jr is cwltool.udocker.UDockerCommandLineJob
+
+
+def test_docker_hint(schema_ext11: Names) -> None:
+    # Docker hint, MPI required
+    lc, rc, tool = mk_tool(schema_ext11, [], hints=[containerReq], reqs=[mpiReq])
+    clt = CommandLineTool(tool, lc)
+    jr = clt.make_job_runner(rc)
+    assert jr is cwltool.job.CommandLineJob
+
+
+def test_docker_required(schema_ext11: Names) -> None:
+    # Docker required, MPI hinted
+    lc, rc, tool = mk_tool(schema_ext11, [], reqs=[containerReq], hints=[mpiReq])
+    clt = CommandLineTool(tool, lc)
+    jr = clt.make_job_runner(rc)
+    assert jr is cwltool.docker.DockerCommandLineJob
+
+
+def test_docker_mpi_both_required(schema_ext11: Names) -> None:
+    # Both required - error
+    with pytest.raises(cwltool.errors.UnsupportedRequirement):
+        lc, rc, tool = mk_tool(schema_ext11, [], reqs=[mpiReq, containerReq])
+        clt = CommandLineTool(tool, lc)
+        jr = clt.make_job_runner(rc)
+
+
+def test_docker_mpi_both_hinted(schema_ext11: Names) -> None:
+    # Both hinted - error
+    with pytest.raises(cwltool.errors.UnsupportedRequirement):
+        lc, rc, tool = mk_tool(schema_ext11, [], hints=[mpiReq, containerReq])
+        clt = CommandLineTool(tool, lc)
+        jr = clt.make_job_runner(rc)