comparison env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Tests of the experimental MPI extension."""
2 import json
3 import os.path
4 import sys
5 from io import StringIO
6 from pathlib import Path
7 from typing import Any, Generator, List, MutableMapping, Optional, Tuple
8
9 import pkg_resources
10 import pytest
11 from ruamel import yaml
12 from ruamel.yaml.comments import CommentedMap, CommentedSeq
13 from schema_salad.avro.schema import Names
14
15 import cwltool.load_tool
16 import cwltool.singularity
17 import cwltool.udocker
18 from cwltool.command_line_tool import CommandLineTool
19 from cwltool.context import LoadingContext, RuntimeContext
20 from cwltool.main import main
21 from cwltool.mpi import MpiConfig, MPIRequirementName
22
23 from .util import get_data, windows_needs_docker, working_directory
24
25
26 def test_mpi_conf_defaults() -> None:
27 mpi = MpiConfig()
28 assert mpi.runner == "mpirun"
29 assert mpi.nproc_flag == "-n"
30 assert mpi.default_nproc == 1
31 assert mpi.extra_flags == []
32 assert mpi.env_pass == []
33 assert mpi.env_pass_regex == []
34 assert mpi.env_set == {}
35
36
37 def test_mpi_conf_unknownkeys() -> None:
38 with pytest.raises(TypeError):
39 MpiConfig(runner="mpiexec", foo="bar") # type: ignore
40
41
42 @pytest.fixture(scope="class")
43 def fake_mpi_conf(tmp_path_factory: Any) -> Generator[str, None, None]:
44 """
45 Make a super simple mpirun-alike for applications that don't actually use MPI.
46
47 It just runs the command multiple times (in serial).
48
49 Then create a plaform MPI config YAML file that should make it work
50 for the testing examples.
51 """
52 mpirun_text = """#!{interpreter}
53 import argparse
54 import sys
55 import subprocess
56 from io import StringIO
57 from typing import List
58
59 def make_parser():
60 p = argparse.ArgumentParser()
61 p.add_argument("--num", type=int, help="number of times to run the application")
62 p.add_argument(
63 "--no-fail", help="add this flag to actually work", action="store_true"
64 )
65 p.add_argument(
66 "progargs", nargs=argparse.REMAINDER, help="The program and its arguments"
67 )
68 return p
69
70 class Runner:
71 def __init__(self):
72 if sys.stdin.isatty():
73 self.indata = None
74 else:
75 self.indata = sys.stdin.read().encode(sys.stdin.encoding)
76
77 def run_once(self, args: List[str]):
78 subprocess.run(
79 args, input=self.indata, stdout=sys.stdout, stderr=sys.stderr
80 ).check_returncode()
81
82 def run_many(self, n: int, args: List[str]):
83 for i in range(n):
84 self.run_once(args)
85
86 if __name__ == "__main__":
87 args = make_parser().parse_args()
88 assert args.no_fail == True, "Didn't set the --no-fail flag"
89 r = Runner()
90 r.run_many(args.num, args.progargs)
91 """.format(
92 interpreter=sys.executable
93 )
94 mpitmp = tmp_path_factory.mktemp("fake_mpi")
95 mpirun_file = mpitmp / "fake_mpirun"
96 mpirun_file.write_text(mpirun_text)
97 mpirun_file.chmod(0o755)
98
99 plat_conf = {
100 "runner": str(mpirun_file),
101 "nproc_flag": "--num",
102 "extra_flags": ["--no-fail"],
103 "env_set": {"TEST_MPI_FOO": "bar"},
104 "env_pass": ["USER"],
105 }
106 plat_conf_file = mpitmp / "plat_mpi.yml"
107 plat_conf_file.write_text(yaml.main.round_trip_dump(plat_conf))
108
109 yield str(plat_conf_file)
110
111 plat_conf_file.unlink()
112 mpirun_file.unlink()
113 mpitmp.rmdir()
114
115
116 def make_processes_input(np: int, tmp_path: Path) -> Path:
117 input_file = tmp_path / "input.yml"
118 with input_file.open("w") as f:
119 f.write("processes: %d\n" % np)
120 return input_file
121
122
123 def cwltool_args(fake_mpi_conf: str) -> List[str]:
124 return ["--enable-ext", "--enable-dev", "--mpi-config-file", fake_mpi_conf]
125
126
127 class TestMpiRun:
128 def test_fake_mpi_config(self, fake_mpi_conf: str) -> None:
129 conf_obj = MpiConfig.load(fake_mpi_conf)
130 runner = conf_obj.runner
131 assert os.path.dirname(runner) == os.path.dirname(fake_mpi_conf)
132 assert os.path.basename(runner) == "fake_mpirun"
133 assert conf_obj.nproc_flag == "--num"
134 assert conf_obj.default_nproc == 1
135 assert conf_obj.extra_flags == ["--no-fail"]
136
137 @windows_needs_docker
138 def test_simple_mpi_tool(self, fake_mpi_conf: str, tmp_path: Path) -> None:
139 stdout = StringIO()
140 stderr = StringIO()
141 with working_directory(tmp_path):
142 rc = main(
143 argsl=cwltool_args(fake_mpi_conf)
144 + [get_data("tests/wf/mpi_simple.cwl")],
145 stdout=stdout,
146 stderr=stderr,
147 )
148 assert rc == 0
149
150 output = json.loads(stdout.getvalue())
151 pid_path = output["pids"]["path"]
152 with open(pid_path) as pidfile:
153 pids = [int(line) for line in pidfile]
154 assert len(pids) == 2
155
156 @windows_needs_docker
157 def test_simple_mpi_nproc_expr(self, fake_mpi_conf: str, tmp_path: Path) -> None:
158 np = 4
159 input_file = make_processes_input(np, tmp_path)
160 stdout = StringIO()
161 stderr = StringIO()
162 with working_directory(tmp_path):
163 rc = main(
164 argsl=cwltool_args(fake_mpi_conf)
165 + [get_data("tests/wf/mpi_expr.cwl"), str(input_file)],
166 stdout=stdout,
167 stderr=stderr,
168 )
169 assert rc == 0
170
171 output = json.loads(stdout.getvalue())
172 pid_path = output["pids"]["path"]
173 with open(pid_path) as pidfile:
174 pids = [int(line) for line in pidfile]
175 assert len(pids) == np
176
177 @windows_needs_docker
178 def test_mpi_workflow(self, fake_mpi_conf: str, tmp_path: Path) -> None:
179 np = 3
180 input_file = make_processes_input(np, tmp_path)
181 stdout = StringIO()
182 stderr = StringIO()
183 with working_directory(tmp_path):
184 rc = main(
185 argsl=cwltool_args(fake_mpi_conf)
186 + [get_data("tests/wf/mpi_simple_wf.cwl"), str(input_file)],
187 stdout=stdout,
188 stderr=stderr,
189 )
190 assert rc == 0
191
192 output = json.loads(stdout.getvalue())
193 lc_path = output["line_count"]["path"]
194 with open(lc_path) as lc_file:
195 lc = int(lc_file.read())
196 assert lc == np
197
198 @windows_needs_docker
199 def test_environment(
200 self, fake_mpi_conf: str, tmp_path: Path, monkeypatch: Any
201 ) -> None:
202 stdout = StringIO()
203 stderr = StringIO()
204 monkeypatch.setenv("USER", "tester")
205 with working_directory(tmp_path):
206 rc = main(
207 argsl=cwltool_args(fake_mpi_conf) + [get_data("tests/wf/mpi_env.cwl")],
208 stdout=stdout,
209 stderr=stderr,
210 )
211 assert rc == 0
212
213 output = json.loads(stdout.getvalue())
214 env_path = output["environment"]["path"]
215 with open(env_path) as envfile:
216 e = {}
217 for line in envfile:
218 k, v = line.strip().split("=", 1)
219 e[k] = v
220 assert e["USER"] == "tester"
221 assert e["TEST_MPI_FOO"] == "bar"
222
223
224 def test_env_passing(monkeypatch: Any) -> None:
225 config = MpiConfig(
226 env_pass=["A", "B", "LONG_NAME"],
227 env_pass_regex=["TOOLNAME", "MPI_.*_CONF"],
228 )
229
230 env = {} # type: MutableMapping[str, str]
231
232 with monkeypatch.context() as m:
233 m.setattr(os, "environ", {})
234 env = {}
235 config.pass_through_env_vars(env)
236 assert env == {}
237
238 with monkeypatch.context() as m:
239 m.setattr(os, "environ", {"A": "a"})
240 env = {}
241 config.pass_through_env_vars(env)
242 assert env == {"A": "a"}
243
244 with monkeypatch.context() as m:
245 m.setattr(os, "environ", {"A": "a", "C": "c"})
246 env = {}
247 config.pass_through_env_vars(env)
248 assert env == {"A": "a"}
249
250 with monkeypatch.context() as m:
251 m.setattr(os, "environ", {"A": "a", "B": "b", "C": "c"})
252 env = {"PATH": "one:two:three", "HOME": "/tmp/dir", "TMPDIR": "/tmp/dir"}
253 config.pass_through_env_vars(env)
254 assert env == {
255 "PATH": "one:two:three",
256 "HOME": "/tmp/dir",
257 "TMPDIR": "/tmp/dir",
258 "A": "a",
259 "B": "b",
260 }
261
262 with monkeypatch.context() as m:
263 m.setattr(os, "environ", {"TOOLNAME": "foobar"})
264 env = {}
265 config.pass_through_env_vars(env)
266 assert env == {"TOOLNAME": "foobar"}
267
268 with monkeypatch.context() as m:
269 m.setattr(os, "environ", {"_TOOLNAME_": "foobar"})
270 env = {}
271 config.pass_through_env_vars(env)
272 # Cos we are matching not searching
273 assert env == {}
274
275 with monkeypatch.context() as m:
276 m.setattr(os, "environ", {"MPI_A_CONF": "A", "MPI_B_CONF": "B"})
277
278 env = {}
279 config.pass_through_env_vars(env)
280 # Cos we are matching not searching
281 assert env == {"MPI_A_CONF": "A", "MPI_B_CONF": "B"}
282
283
284 # Reading the schema is super slow - cache for the session
285 @pytest.fixture(scope="session")
286 def schema_ext11() -> Generator[Names, None, None]:
287 with pkg_resources.resource_stream("cwltool", "extensions-v1.1.yml") as res:
288 ext11 = res.read().decode("utf-8")
289 cwltool.process.use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11)
290 schema = cwltool.process.get_schema("v1.1")[1]
291 assert isinstance(schema, Names)
292 yield schema
293
294
295 mpiReq = CommentedMap({"class": MPIRequirementName, "processes": 1})
296 containerReq = CommentedMap({"class": "DockerRequirement"})
297 basetool = CommentedMap(
298 {"cwlVersion": "v1.1", "inputs": CommentedSeq(), "outputs": CommentedSeq()}
299 )
300
301
302 def mk_tool(
303 schema: Names,
304 opts: List[str],
305 reqs: Optional[List[CommentedMap]] = None,
306 hints: Optional[List[CommentedMap]] = None,
307 ) -> Tuple[LoadingContext, RuntimeContext, CommentedMap]:
308 tool = basetool.copy()
309
310 if reqs is not None:
311 tool["requirements"] = CommentedSeq(reqs)
312 if hints is not None:
313 tool["hints"] = CommentedSeq(hints)
314
315 args = cwltool.argparser.arg_parser().parse_args(opts)
316 args.enable_ext = True
317 rc = RuntimeContext(vars(args))
318 lc = cwltool.main.setup_loadingContext(None, rc, args)
319 lc.avsc_names = schema
320 return lc, rc, tool
321
322
323 def test_singularity(schema_ext11: Names) -> None:
324 lc, rc, tool = mk_tool(schema_ext11, ["--singularity"], reqs=[mpiReq, containerReq])
325 clt = CommandLineTool(tool, lc)
326 jr = clt.make_job_runner(rc)
327 assert jr is cwltool.singularity.SingularityCommandLineJob
328
329
330 def test_udocker(schema_ext11: Names) -> None:
331 lc, rc, tool = mk_tool(schema_ext11, ["--udocker"], reqs=[mpiReq, containerReq])
332 clt = CommandLineTool(tool, lc)
333 jr = clt.make_job_runner(rc)
334 assert jr is cwltool.udocker.UDockerCommandLineJob
335
336
337 def test_docker_hint(schema_ext11: Names) -> None:
338 # Docker hint, MPI required
339 lc, rc, tool = mk_tool(schema_ext11, [], hints=[containerReq], reqs=[mpiReq])
340 clt = CommandLineTool(tool, lc)
341 jr = clt.make_job_runner(rc)
342 assert jr is cwltool.job.CommandLineJob
343
344
345 def test_docker_required(schema_ext11: Names) -> None:
346 # Docker required, MPI hinted
347 lc, rc, tool = mk_tool(schema_ext11, [], reqs=[containerReq], hints=[mpiReq])
348 clt = CommandLineTool(tool, lc)
349 jr = clt.make_job_runner(rc)
350 assert jr is cwltool.docker.DockerCommandLineJob
351
352
353 def test_docker_mpi_both_required(schema_ext11: Names) -> None:
354 # Both required - error
355 with pytest.raises(cwltool.errors.UnsupportedRequirement):
356 lc, rc, tool = mk_tool(schema_ext11, [], reqs=[mpiReq, containerReq])
357 clt = CommandLineTool(tool, lc)
358 jr = clt.make_job_runner(rc)
359
360
361 def test_docker_mpi_both_hinted(schema_ext11: Names) -> None:
362 # Both hinted - error
363 with pytest.raises(cwltool.errors.UnsupportedRequirement):
364 lc, rc, tool = mk_tool(schema_ext11, [], hints=[mpiReq, containerReq])
365 clt = CommandLineTool(tool, lc)
366 jr = clt.make_job_runner(rc)