Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,366 @@ +"""Tests of the experimental MPI extension.""" +import json +import os.path +import sys +from io import StringIO +from pathlib import Path +from typing import Any, Generator, List, MutableMapping, Optional, Tuple + +import pkg_resources +import pytest +from ruamel import yaml +from ruamel.yaml.comments import CommentedMap, CommentedSeq +from schema_salad.avro.schema import Names + +import cwltool.load_tool +import cwltool.singularity +import cwltool.udocker +from cwltool.command_line_tool import CommandLineTool +from cwltool.context import LoadingContext, RuntimeContext +from cwltool.main import main +from cwltool.mpi import MpiConfig, MPIRequirementName + +from .util import get_data, windows_needs_docker, working_directory + + +def test_mpi_conf_defaults() -> None: + mpi = MpiConfig() + assert mpi.runner == "mpirun" + assert mpi.nproc_flag == "-n" + assert mpi.default_nproc == 1 + assert mpi.extra_flags == [] + assert mpi.env_pass == [] + assert mpi.env_pass_regex == [] + assert mpi.env_set == {} + + +def test_mpi_conf_unknownkeys() -> None: + with pytest.raises(TypeError): + MpiConfig(runner="mpiexec", foo="bar") # type: ignore + + +@pytest.fixture(scope="class") +def fake_mpi_conf(tmp_path_factory: Any) -> Generator[str, None, None]: + """ + Make a super simple mpirun-alike for applications that don't actually use MPI. + + It just runs the command multiple times (in serial). + + Then create a plaform MPI config YAML file that should make it work + for the testing examples. + """ + mpirun_text = """#!{interpreter} +import argparse +import sys +import subprocess +from io import StringIO +from typing import List + +def make_parser(): + p = argparse.ArgumentParser() + p.add_argument("--num", type=int, help="number of times to run the application") + p.add_argument( + "--no-fail", help="add this flag to actually work", action="store_true" + ) + p.add_argument( + "progargs", nargs=argparse.REMAINDER, help="The program and its arguments" + ) + return p + +class Runner: + def __init__(self): + if sys.stdin.isatty(): + self.indata = None + else: + self.indata = sys.stdin.read().encode(sys.stdin.encoding) + + def run_once(self, args: List[str]): + subprocess.run( + args, input=self.indata, stdout=sys.stdout, stderr=sys.stderr + ).check_returncode() + + def run_many(self, n: int, args: List[str]): + for i in range(n): + self.run_once(args) + +if __name__ == "__main__": + args = make_parser().parse_args() + assert args.no_fail == True, "Didn't set the --no-fail flag" + r = Runner() + r.run_many(args.num, args.progargs) +""".format( + interpreter=sys.executable + ) + mpitmp = tmp_path_factory.mktemp("fake_mpi") + mpirun_file = mpitmp / "fake_mpirun" + mpirun_file.write_text(mpirun_text) + mpirun_file.chmod(0o755) + + plat_conf = { + "runner": str(mpirun_file), + "nproc_flag": "--num", + "extra_flags": ["--no-fail"], + "env_set": {"TEST_MPI_FOO": "bar"}, + "env_pass": ["USER"], + } + plat_conf_file = mpitmp / "plat_mpi.yml" + plat_conf_file.write_text(yaml.main.round_trip_dump(plat_conf)) + + yield str(plat_conf_file) + + plat_conf_file.unlink() + mpirun_file.unlink() + mpitmp.rmdir() + + +def make_processes_input(np: int, tmp_path: Path) -> Path: + input_file = tmp_path / "input.yml" + with input_file.open("w") as f: + f.write("processes: %d\n" % np) + return input_file + + +def cwltool_args(fake_mpi_conf: str) -> List[str]: + return ["--enable-ext", "--enable-dev", "--mpi-config-file", fake_mpi_conf] + + +class TestMpiRun: + def test_fake_mpi_config(self, fake_mpi_conf: str) -> None: + conf_obj = MpiConfig.load(fake_mpi_conf) + runner = conf_obj.runner + assert os.path.dirname(runner) == os.path.dirname(fake_mpi_conf) + assert os.path.basename(runner) == "fake_mpirun" + assert conf_obj.nproc_flag == "--num" + assert conf_obj.default_nproc == 1 + assert conf_obj.extra_flags == ["--no-fail"] + + @windows_needs_docker + def test_simple_mpi_tool(self, fake_mpi_conf: str, tmp_path: Path) -> None: + stdout = StringIO() + stderr = StringIO() + with working_directory(tmp_path): + rc = main( + argsl=cwltool_args(fake_mpi_conf) + + [get_data("tests/wf/mpi_simple.cwl")], + stdout=stdout, + stderr=stderr, + ) + assert rc == 0 + + output = json.loads(stdout.getvalue()) + pid_path = output["pids"]["path"] + with open(pid_path) as pidfile: + pids = [int(line) for line in pidfile] + assert len(pids) == 2 + + @windows_needs_docker + def test_simple_mpi_nproc_expr(self, fake_mpi_conf: str, tmp_path: Path) -> None: + np = 4 + input_file = make_processes_input(np, tmp_path) + stdout = StringIO() + stderr = StringIO() + with working_directory(tmp_path): + rc = main( + argsl=cwltool_args(fake_mpi_conf) + + [get_data("tests/wf/mpi_expr.cwl"), str(input_file)], + stdout=stdout, + stderr=stderr, + ) + assert rc == 0 + + output = json.loads(stdout.getvalue()) + pid_path = output["pids"]["path"] + with open(pid_path) as pidfile: + pids = [int(line) for line in pidfile] + assert len(pids) == np + + @windows_needs_docker + def test_mpi_workflow(self, fake_mpi_conf: str, tmp_path: Path) -> None: + np = 3 + input_file = make_processes_input(np, tmp_path) + stdout = StringIO() + stderr = StringIO() + with working_directory(tmp_path): + rc = main( + argsl=cwltool_args(fake_mpi_conf) + + [get_data("tests/wf/mpi_simple_wf.cwl"), str(input_file)], + stdout=stdout, + stderr=stderr, + ) + assert rc == 0 + + output = json.loads(stdout.getvalue()) + lc_path = output["line_count"]["path"] + with open(lc_path) as lc_file: + lc = int(lc_file.read()) + assert lc == np + + @windows_needs_docker + def test_environment( + self, fake_mpi_conf: str, tmp_path: Path, monkeypatch: Any + ) -> None: + stdout = StringIO() + stderr = StringIO() + monkeypatch.setenv("USER", "tester") + with working_directory(tmp_path): + rc = main( + argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_env.cwl")], + stdout=stdout, + stderr=stderr, + ) + assert rc == 0 + + output = json.loads(stdout.getvalue()) + env_path = output["environment"]["path"] + with open(env_path) as envfile: + e = {} + for line in envfile: + k, v = line.strip().split("=", 1) + e[k] = v + assert e["USER"] == "tester" + assert e["TEST_MPI_FOO"] == "bar" + + +def test_env_passing(monkeypatch: Any) -> None: + config = MpiConfig( + env_pass=["A", "B", "LONG_NAME"], + env_pass_regex=["TOOLNAME", "MPI_.*_CONF"], + ) + + env = {} # type: MutableMapping[str, str] + + with monkeypatch.context() as m: + m.setattr(os, "environ", {}) + env = {} + config.pass_through_env_vars(env) + assert env == {} + + with monkeypatch.context() as m: + m.setattr(os, "environ", {"A": "a"}) + env = {} + config.pass_through_env_vars(env) + assert env == {"A": "a"} + + with monkeypatch.context() as m: + m.setattr(os, "environ", {"A": "a", "C": "c"}) + env = {} + config.pass_through_env_vars(env) + assert env == {"A": "a"} + + with monkeypatch.context() as m: + m.setattr(os, "environ", {"A": "a", "B": "b", "C": "c"}) + env = {"PATH": "one:two:three", "HOME": "/tmp/dir", "TMPDIR": "/tmp/dir"} + config.pass_through_env_vars(env) + assert env == { + "PATH": "one:two:three", + "HOME": "/tmp/dir", + "TMPDIR": "/tmp/dir", + "A": "a", + "B": "b", + } + + with monkeypatch.context() as m: + m.setattr(os, "environ", {"TOOLNAME": "foobar"}) + env = {} + config.pass_through_env_vars(env) + assert env == {"TOOLNAME": "foobar"} + + with monkeypatch.context() as m: + m.setattr(os, "environ", {"_TOOLNAME_": "foobar"}) + env = {} + config.pass_through_env_vars(env) + # Cos we are matching not searching + assert env == {} + + with monkeypatch.context() as m: + m.setattr(os, "environ", {"MPI_A_CONF": "A", "MPI_B_CONF": "B"}) + + env = {} + config.pass_through_env_vars(env) + # Cos we are matching not searching + assert env == {"MPI_A_CONF": "A", "MPI_B_CONF": "B"} + + +# Reading the schema is super slow - cache for the session +@pytest.fixture(scope="session") +def schema_ext11() -> Generator[Names, None, None]: + with pkg_resources.resource_stream("cwltool", "extensions-v1.1.yml") as res: + ext11 = res.read().decode("utf-8") + cwltool.process.use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11) + schema = cwltool.process.get_schema("v1.1")[1] + assert isinstance(schema, Names) + yield schema + + +mpiReq = CommentedMap({"class": MPIRequirementName, "processes": 1}) +containerReq = CommentedMap({"class": "DockerRequirement"}) +basetool = CommentedMap( + {"cwlVersion": "v1.1", "inputs": CommentedSeq(), "outputs": CommentedSeq()} +) + + +def mk_tool( + schema: Names, + opts: List[str], + reqs: Optional[List[CommentedMap]] = None, + hints: Optional[List[CommentedMap]] = None, +) -> Tuple[LoadingContext, RuntimeContext, CommentedMap]: + tool = basetool.copy() + + if reqs is not None: + tool["requirements"] = CommentedSeq(reqs) + if hints is not None: + tool["hints"] = CommentedSeq(hints) + + args = cwltool.argparser.arg_parser().parse_args(opts) + args.enable_ext = True + rc = RuntimeContext(vars(args)) + lc = cwltool.main.setup_loadingContext(None, rc, args) + lc.avsc_names = schema + return lc, rc, tool + + +def test_singularity(schema_ext11: Names) -> None: + lc, rc, tool = mk_tool(schema_ext11, ["--singularity"], reqs=[mpiReq, containerReq]) + clt = CommandLineTool(tool, lc) + jr = clt.make_job_runner(rc) + assert jr is cwltool.singularity.SingularityCommandLineJob + + +def test_udocker(schema_ext11: Names) -> None: + lc, rc, tool = mk_tool(schema_ext11, ["--udocker"], reqs=[mpiReq, containerReq]) + clt = CommandLineTool(tool, lc) + jr = clt.make_job_runner(rc) + assert jr is cwltool.udocker.UDockerCommandLineJob + + +def test_docker_hint(schema_ext11: Names) -> None: + # Docker hint, MPI required + lc, rc, tool = mk_tool(schema_ext11, [], hints=[containerReq], reqs=[mpiReq]) + clt = CommandLineTool(tool, lc) + jr = clt.make_job_runner(rc) + assert jr is cwltool.job.CommandLineJob + + +def test_docker_required(schema_ext11: Names) -> None: + # Docker required, MPI hinted + lc, rc, tool = mk_tool(schema_ext11, [], reqs=[containerReq], hints=[mpiReq]) + clt = CommandLineTool(tool, lc) + jr = clt.make_job_runner(rc) + assert jr is cwltool.docker.DockerCommandLineJob + + +def test_docker_mpi_both_required(schema_ext11: Names) -> None: + # Both required - error + with pytest.raises(cwltool.errors.UnsupportedRequirement): + lc, rc, tool = mk_tool(schema_ext11, [], reqs=[mpiReq, containerReq]) + clt = CommandLineTool(tool, lc) + jr = clt.make_job_runner(rc) + + +def test_docker_mpi_both_hinted(schema_ext11: Names) -> None: + # Both hinted - error + with pytest.raises(cwltool.errors.UnsupportedRequirement): + lc, rc, tool = mk_tool(schema_ext11, [], hints=[mpiReq, containerReq]) + clt = CommandLineTool(tool, lc) + jr = clt.make_job_runner(rc)