Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Tests of the experimental MPI extension.""" import json import os.path import sys from io import StringIO from pathlib import Path from typing import Any, Generator, List, MutableMapping, Optional, Tuple import pkg_resources import pytest from ruamel import yaml from ruamel.yaml.comments import CommentedMap, CommentedSeq from schema_salad.avro.schema import Names import cwltool.load_tool import cwltool.singularity import cwltool.udocker from cwltool.command_line_tool import CommandLineTool from cwltool.context import LoadingContext, RuntimeContext from cwltool.main import main from cwltool.mpi import MpiConfig, MPIRequirementName from .util import get_data, windows_needs_docker, working_directory def test_mpi_conf_defaults() -> None: mpi = MpiConfig() assert mpi.runner == "mpirun" assert mpi.nproc_flag == "-n" assert mpi.default_nproc == 1 assert mpi.extra_flags == [] assert mpi.env_pass == [] assert mpi.env_pass_regex == [] assert mpi.env_set == {} def test_mpi_conf_unknownkeys() -> None: with pytest.raises(TypeError): MpiConfig(runner="mpiexec", foo="bar") # type: ignore @pytest.fixture(scope="class") def fake_mpi_conf(tmp_path_factory: Any) -> Generator[str, None, None]: """ Make a super simple mpirun-alike for applications that don't actually use MPI. It just runs the command multiple times (in serial). Then create a plaform MPI config YAML file that should make it work for the testing examples. """ mpirun_text = """#!{interpreter} import argparse import sys import subprocess from io import StringIO from typing import List def make_parser(): p = argparse.ArgumentParser() p.add_argument("--num", type=int, help="number of times to run the application") p.add_argument( "--no-fail", help="add this flag to actually work", action="store_true" ) p.add_argument( "progargs", nargs=argparse.REMAINDER, help="The program and its arguments" ) return p class Runner: def __init__(self): if sys.stdin.isatty(): self.indata = None else: self.indata = sys.stdin.read().encode(sys.stdin.encoding) def run_once(self, args: List[str]): subprocess.run( args, input=self.indata, stdout=sys.stdout, stderr=sys.stderr ).check_returncode() def run_many(self, n: int, args: List[str]): for i in range(n): self.run_once(args) if __name__ == "__main__": args = make_parser().parse_args() assert args.no_fail == True, "Didn't set the --no-fail flag" r = Runner() r.run_many(args.num, args.progargs) """.format( interpreter=sys.executable ) mpitmp = tmp_path_factory.mktemp("fake_mpi") mpirun_file = mpitmp / "fake_mpirun" mpirun_file.write_text(mpirun_text) mpirun_file.chmod(0o755) plat_conf = { "runner": str(mpirun_file), "nproc_flag": "--num", "extra_flags": ["--no-fail"], "env_set": {"TEST_MPI_FOO": "bar"}, "env_pass": ["USER"], } plat_conf_file = mpitmp / "plat_mpi.yml" plat_conf_file.write_text(yaml.main.round_trip_dump(plat_conf)) yield str(plat_conf_file) plat_conf_file.unlink() mpirun_file.unlink() mpitmp.rmdir() def make_processes_input(np: int, tmp_path: Path) -> Path: input_file = tmp_path / "input.yml" with input_file.open("w") as f: f.write("processes: %d\n" % np) return input_file def cwltool_args(fake_mpi_conf: str) -> List[str]: return ["--enable-ext", "--enable-dev", "--mpi-config-file", fake_mpi_conf] class TestMpiRun: def test_fake_mpi_config(self, fake_mpi_conf: str) -> None: conf_obj = MpiConfig.load(fake_mpi_conf) runner = conf_obj.runner assert os.path.dirname(runner) == os.path.dirname(fake_mpi_conf) assert os.path.basename(runner) == "fake_mpirun" assert conf_obj.nproc_flag == "--num" assert conf_obj.default_nproc == 1 assert conf_obj.extra_flags == ["--no-fail"] @windows_needs_docker def test_simple_mpi_tool(self, fake_mpi_conf: str, tmp_path: Path) -> None: stdout = StringIO() stderr = StringIO() with working_directory(tmp_path): rc = main( argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_simple.cwl")], stdout=stdout, stderr=stderr, ) assert rc == 0 output = json.loads(stdout.getvalue()) pid_path = output["pids"]["path"] with open(pid_path) as pidfile: pids = [int(line) for line in pidfile] assert len(pids) == 2 @windows_needs_docker def test_simple_mpi_nproc_expr(self, fake_mpi_conf: str, tmp_path: Path) -> None: np = 4 input_file = make_processes_input(np, tmp_path) stdout = StringIO() stderr = StringIO() with working_directory(tmp_path): rc = main( argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_expr.cwl"), str(input_file)], stdout=stdout, stderr=stderr, ) assert rc == 0 output = json.loads(stdout.getvalue()) pid_path = output["pids"]["path"] with open(pid_path) as pidfile: pids = [int(line) for line in pidfile] assert len(pids) == np @windows_needs_docker def test_mpi_workflow(self, fake_mpi_conf: str, tmp_path: Path) -> None: np = 3 input_file = make_processes_input(np, tmp_path) stdout = StringIO() stderr = StringIO() with working_directory(tmp_path): rc = main( argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_simple_wf.cwl"), str(input_file)], stdout=stdout, stderr=stderr, ) assert rc == 0 output = json.loads(stdout.getvalue()) lc_path = output["line_count"]["path"] with open(lc_path) as lc_file: lc = int(lc_file.read()) assert lc == np @windows_needs_docker def test_environment( self, fake_mpi_conf: str, tmp_path: Path, monkeypatch: Any ) -> None: stdout = StringIO() stderr = StringIO() monkeypatch.setenv("USER", "tester") with working_directory(tmp_path): rc = main( argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_env.cwl")], stdout=stdout, stderr=stderr, ) assert rc == 0 output = json.loads(stdout.getvalue()) env_path = output["environment"]["path"] with open(env_path) as envfile: e = {} for line in envfile: k, v = line.strip().split("=", 1) e[k] = v assert e["USER"] == "tester" assert e["TEST_MPI_FOO"] == "bar" def test_env_passing(monkeypatch: Any) -> None: config = MpiConfig( env_pass=["A", "B", "LONG_NAME"], env_pass_regex=["TOOLNAME", "MPI_.*_CONF"], ) env = {} # type: MutableMapping[str, str] with monkeypatch.context() as m: m.setattr(os, "environ", {}) env = {} config.pass_through_env_vars(env) assert env == {} with monkeypatch.context() as m: m.setattr(os, "environ", {"A": "a"}) env = {} config.pass_through_env_vars(env) assert env == {"A": "a"} with monkeypatch.context() as m: m.setattr(os, "environ", {"A": "a", "C": "c"}) env = {} config.pass_through_env_vars(env) assert env == {"A": "a"} with monkeypatch.context() as m: m.setattr(os, "environ", {"A": "a", "B": "b", "C": "c"}) env = {"PATH": "one:two:three", "HOME": "/tmp/dir", "TMPDIR": "/tmp/dir"} config.pass_through_env_vars(env) assert env == { "PATH": "one:two:three", "HOME": "/tmp/dir", "TMPDIR": "/tmp/dir", "A": "a", "B": "b", } with monkeypatch.context() as m: m.setattr(os, "environ", {"TOOLNAME": "foobar"}) env = {} config.pass_through_env_vars(env) assert env == {"TOOLNAME": "foobar"} with monkeypatch.context() as m: m.setattr(os, "environ", {"_TOOLNAME_": "foobar"}) env = {} config.pass_through_env_vars(env) # Cos we are matching not searching assert env == {} with monkeypatch.context() as m: m.setattr(os, "environ", {"MPI_A_CONF": "A", "MPI_B_CONF": "B"}) env = {} config.pass_through_env_vars(env) # Cos we are matching not searching assert env == {"MPI_A_CONF": "A", "MPI_B_CONF": "B"} # Reading the schema is super slow - cache for the session @pytest.fixture(scope="session") def schema_ext11() -> Generator[Names, None, None]: with pkg_resources.resource_stream("cwltool", "extensions-v1.1.yml") as res: ext11 = res.read().decode("utf-8") cwltool.process.use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11) schema = cwltool.process.get_schema("v1.1")[1] assert isinstance(schema, Names) yield schema mpiReq = CommentedMap({"class": MPIRequirementName, "processes": 1}) containerReq = CommentedMap({"class": "DockerRequirement"}) basetool = CommentedMap( {"cwlVersion": "v1.1", "inputs": CommentedSeq(), "outputs": CommentedSeq()} ) def mk_tool( schema: Names, opts: List[str], reqs: Optional[List[CommentedMap]] = None, hints: Optional[List[CommentedMap]] = None, ) -> Tuple[LoadingContext, RuntimeContext, CommentedMap]: tool = basetool.copy() if reqs is not None: tool["requirements"] = CommentedSeq(reqs) if hints is not None: tool["hints"] = CommentedSeq(hints) args = cwltool.argparser.arg_parser().parse_args(opts) args.enable_ext = True rc = RuntimeContext(vars(args)) lc = cwltool.main.setup_loadingContext(None, rc, args) lc.avsc_names = schema return lc, rc, tool def test_singularity(schema_ext11: Names) -> None: lc, rc, tool = mk_tool(schema_ext11, ["--singularity"], reqs=[mpiReq, containerReq]) clt = CommandLineTool(tool, lc) jr = clt.make_job_runner(rc) assert jr is cwltool.singularity.SingularityCommandLineJob def test_udocker(schema_ext11: Names) -> None: lc, rc, tool = mk_tool(schema_ext11, ["--udocker"], reqs=[mpiReq, containerReq]) clt = CommandLineTool(tool, lc) jr = clt.make_job_runner(rc) assert jr is cwltool.udocker.UDockerCommandLineJob def test_docker_hint(schema_ext11: Names) -> None: # Docker hint, MPI required lc, rc, tool = mk_tool(schema_ext11, [], hints=[containerReq], reqs=[mpiReq]) clt = CommandLineTool(tool, lc) jr = clt.make_job_runner(rc) assert jr is cwltool.job.CommandLineJob def test_docker_required(schema_ext11: Names) -> None: # Docker required, MPI hinted lc, rc, tool = mk_tool(schema_ext11, [], reqs=[containerReq], hints=[mpiReq]) clt = CommandLineTool(tool, lc) jr = clt.make_job_runner(rc) assert jr is cwltool.docker.DockerCommandLineJob def test_docker_mpi_both_required(schema_ext11: Names) -> None: # Both required - error with pytest.raises(cwltool.errors.UnsupportedRequirement): lc, rc, tool = mk_tool(schema_ext11, [], reqs=[mpiReq, containerReq]) clt = CommandLineTool(tool, lc) jr = clt.make_job_runner(rc) def test_docker_mpi_both_hinted(schema_ext11: Names) -> None: # Both hinted - error with pytest.raises(cwltool.errors.UnsupportedRequirement): lc, rc, tool = mk_tool(schema_ext11, [], hints=[mpiReq, containerReq]) clt = CommandLineTool(tool, lc) jr = clt.make_job_runner(rc)