Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/tests/test_mpi.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Tests of the experimental MPI extension.""" | |
2 import json | |
3 import os.path | |
4 import sys | |
5 from io import StringIO | |
6 from pathlib import Path | |
7 from typing import Any, Generator, List, MutableMapping, Optional, Tuple | |
8 | |
9 import pkg_resources | |
10 import pytest | |
11 from ruamel import yaml | |
12 from ruamel.yaml.comments import CommentedMap, CommentedSeq | |
13 from schema_salad.avro.schema import Names | |
14 | |
15 import cwltool.load_tool | |
16 import cwltool.singularity | |
17 import cwltool.udocker | |
18 from cwltool.command_line_tool import CommandLineTool | |
19 from cwltool.context import LoadingContext, RuntimeContext | |
20 from cwltool.main import main | |
21 from cwltool.mpi import MpiConfig, MPIRequirementName | |
22 | |
23 from .util import get_data, windows_needs_docker, working_directory | |
24 | |
25 | |
def test_mpi_conf_defaults() -> None:
    """Check the field values of an ``MpiConfig`` built with no arguments."""
    mpi = MpiConfig()
    assert mpi.runner == "mpirun"
    assert mpi.nproc_flag == "-n"
    assert mpi.default_nproc == 1
    assert mpi.extra_flags == []
    assert mpi.env_pass == []
    assert mpi.env_pass_regex == []
    assert mpi.env_set == {}
35 | |
36 | |
def test_mpi_conf_unknownkeys() -> None:
    """Check that ``MpiConfig`` raises ``TypeError`` for an unknown keyword."""
    with pytest.raises(TypeError):
        MpiConfig(runner="mpiexec", foo="bar")  # type: ignore
40 | |
41 | |
@pytest.fixture(scope="class")
def fake_mpi_conf(tmp_path_factory: Any) -> Generator[str, None, None]:
    """
    Make a super simple mpirun-alike for applications that don't actually use MPI.

    It just runs the command multiple times (in serial).

    Then create a platform MPI config YAML file that should make it work
    for the testing examples.

    Yields the path of that YAML file as a string. After the requesting
    class is done, the YAML file, the fake runner script and their temporary
    directory are removed again.
    """
    # The script is rendered with the interpreter running the tests as its
    # shebang, so it executes in the same Python environment.
    mpirun_text = """#!{interpreter}
import argparse
import sys
import subprocess
from io import StringIO
from typing import List

def make_parser():
    p = argparse.ArgumentParser()
    p.add_argument("--num", type=int, help="number of times to run the application")
    p.add_argument(
        "--no-fail", help="add this flag to actually work", action="store_true"
    )
    p.add_argument(
        "progargs", nargs=argparse.REMAINDER, help="The program and its arguments"
    )
    return p

class Runner:
    def __init__(self):
        if sys.stdin.isatty():
            self.indata = None
        else:
            self.indata = sys.stdin.read().encode(sys.stdin.encoding)

    def run_once(self, args: List[str]):
        subprocess.run(
            args, input=self.indata, stdout=sys.stdout, stderr=sys.stderr
        ).check_returncode()

    def run_many(self, n: int, args: List[str]):
        for i in range(n):
            self.run_once(args)

if __name__ == "__main__":
    args = make_parser().parse_args()
    assert args.no_fail == True, "Didn't set the --no-fail flag"
    r = Runner()
    r.run_many(args.num, args.progargs)
""".format(
        interpreter=sys.executable
    )
    mpitmp = tmp_path_factory.mktemp("fake_mpi")
    mpirun_file = mpitmp / "fake_mpirun"
    mpirun_file.write_text(mpirun_text)
    mpirun_file.chmod(0o755)  # the runner must be executable

    # "--no-fail" is passed as an extra flag because the fake runner asserts
    # that it was given; this checks that extra_flags really reach the runner.
    plat_conf = {
        "runner": str(mpirun_file),
        "nproc_flag": "--num",
        "extra_flags": ["--no-fail"],
        "env_set": {"TEST_MPI_FOO": "bar"},
        "env_pass": ["USER"],
    }
    plat_conf_file = mpitmp / "plat_mpi.yml"
    plat_conf_file.write_text(yaml.main.round_trip_dump(plat_conf))

    yield str(plat_conf_file)

    # Teardown: remove both files first so that rmdir() finds the directory empty.
    plat_conf_file.unlink()
    mpirun_file.unlink()
    mpitmp.rmdir()
114 | |
115 | |
def make_processes_input(np: int, tmp_path: Path) -> Path:
    """
    Write a CWL input file that sets ``processes`` to ``np``.

    The file is created as ``input.yml`` inside ``tmp_path`` and its path is
    returned.
    """
    input_file = tmp_path / "input.yml"
    input_file.write_text("processes: %d\n" % np)
    return input_file
121 | |
122 | |
def cwltool_args(fake_mpi_conf: str) -> List[str]:
    """Return the cwltool options shared by the MPI tests.

    ``fake_mpi_conf`` is passed as the value of ``--mpi-config-file``.
    """
    return ["--enable-ext", "--enable-dev", "--mpi-config-file", fake_mpi_conf]
125 | |
126 | |
class TestMpiRun:
    """Run cwltool in-process against the fake MPI runner from ``fake_mpi_conf``."""

    @staticmethod
    def _run_cwltool(fake_mpi_conf: str, *job_args: str) -> Any:
        """
        Call cwltool's ``main`` and return its parsed JSON output.

        ``job_args`` (the CWL document and, optionally, an input file) are
        appended to the options from ``cwltool_args``. The run must return 0;
        if it does not, the captured stderr is used as the assertion message.
        """
        stdout = StringIO()
        stderr = StringIO()
        rc = main(
            argsl=cwltool_args(fake_mpi_conf) + list(job_args),
            stdout=stdout,
            stderr=stderr,
        )
        assert rc == 0, stderr.getvalue()
        return json.loads(stdout.getvalue())

    def test_fake_mpi_config(self, fake_mpi_conf: str) -> None:
        """Check that the generated config file loads with the expected values."""
        conf_obj = MpiConfig.load(fake_mpi_conf)
        runner = conf_obj.runner
        # The fixture writes the runner script next to the config file.
        assert os.path.dirname(runner) == os.path.dirname(fake_mpi_conf)
        assert os.path.basename(runner) == "fake_mpirun"
        assert conf_obj.nproc_flag == "--num"
        assert conf_obj.default_nproc == 1
        assert conf_obj.extra_flags == ["--no-fail"]

    @windows_needs_docker
    def test_simple_mpi_tool(self, fake_mpi_conf: str, tmp_path: Path) -> None:
        """Run ``mpi_simple.cwl`` and expect two PIDs in its ``pids`` output."""
        with working_directory(tmp_path):
            output = self._run_cwltool(
                fake_mpi_conf, get_data("tests/wf/mpi_simple.cwl")
            )
            pid_path = output["pids"]["path"]
            with open(pid_path) as pidfile:
                pids = [int(line) for line in pidfile]
            assert len(pids) == 2

    @windows_needs_docker
    def test_simple_mpi_nproc_expr(self, fake_mpi_conf: str, tmp_path: Path) -> None:
        """Run ``mpi_expr.cwl`` with ``processes: 4`` and expect four PIDs."""
        np = 4
        input_file = make_processes_input(np, tmp_path)
        with working_directory(tmp_path):
            output = self._run_cwltool(
                fake_mpi_conf, get_data("tests/wf/mpi_expr.cwl"), str(input_file)
            )
            pid_path = output["pids"]["path"]
            with open(pid_path) as pidfile:
                pids = [int(line) for line in pidfile]
            assert len(pids) == np

    @windows_needs_docker
    def test_mpi_workflow(self, fake_mpi_conf: str, tmp_path: Path) -> None:
        """Run ``mpi_simple_wf.cwl`` with ``processes: 3``; ``line_count`` must be 3."""
        np = 3
        input_file = make_processes_input(np, tmp_path)
        with working_directory(tmp_path):
            output = self._run_cwltool(
                fake_mpi_conf, get_data("tests/wf/mpi_simple_wf.cwl"), str(input_file)
            )
            lc_path = output["line_count"]["path"]
            with open(lc_path) as lc_file:
                lc = int(lc_file.read())
            assert lc == np

    @windows_needs_docker
    def test_environment(
        self, fake_mpi_conf: str, tmp_path: Path, monkeypatch: Any
    ) -> None:
        """Run ``mpi_env.cwl`` and check the passed-through and preset variables."""
        # USER is listed under env_pass in the fake config; pin its value here.
        monkeypatch.setenv("USER", "tester")
        with working_directory(tmp_path):
            output = self._run_cwltool(fake_mpi_conf, get_data("tests/wf/mpi_env.cwl"))
            env_path = output["environment"]["path"]
            with open(env_path) as envfile:
                # One NAME=value pair per line; split on the first "=" only so
                # that values may themselves contain "=".
                e = dict(line.strip().split("=", 1) for line in envfile)
            assert e["USER"] == "tester"
            assert e["TEST_MPI_FOO"] == "bar"
222 | |
223 | |
def test_env_passing(monkeypatch: Any) -> None:
    """
    Check which variables ``MpiConfig.pass_through_env_vars`` copies.

    For each case ``os.environ`` is replaced by a plain dict, a starting job
    environment is handed to ``pass_through_env_vars`` and the resulting
    mapping is compared with the expected one.
    """
    config = MpiConfig(
        env_pass=["A", "B", "LONG_NAME"],
        env_pass_regex=["TOOLNAME", "MPI_.*_CONF"],
    )

    preset = {"PATH": "one:two:three", "HOME": "/tmp/dir", "TMPDIR": "/tmp/dir"}

    # Each case: (fake os.environ, initial job env, expected job env afterwards)
    cases = [
        # Nothing in the environment, nothing passed.
        ({}, {}, {}),
        # A listed name is passed.
        ({"A": "a"}, {}, {"A": "a"}),
        # An unlisted name ("C") is ignored.
        ({"A": "a", "C": "c"}, {}, {"A": "a"}),
        # Entries already present in the job environment are kept.
        (
            {"A": "a", "B": "b", "C": "c"},
            dict(preset),
            dict(preset, A="a", B="b"),
        ),
        # A regex with no metacharacters passes the identical name.
        ({"TOOLNAME": "foobar"}, {}, {"TOOLNAME": "foobar"}),
        # Cos we are matching not searching
        ({"_TOOLNAME_": "foobar"}, {}, {}),
        # A wildcard regex passes every name it matches.
        (
            {"MPI_A_CONF": "A", "MPI_B_CONF": "B"},
            {},
            {"MPI_A_CONF": "A", "MPI_B_CONF": "B"},
        ),
    ]

    for environ, initial, expected in cases:
        env = dict(initial)  # type: MutableMapping[str, str]
        with monkeypatch.context() as m:
            m.setattr(os, "environ", environ)
            config.pass_through_env_vars(env)
        # Compare only after os.environ has been restored.
        assert env == expected, "os.environ was {!r}".format(environ)
282 | |
283 | |
# Reading the schema is super slow - cache for the session
@pytest.fixture(scope="session")
def schema_ext11() -> Generator[Names, None, None]:
    """
    Register the cwltool v1.1 extension schema and yield its ``Names``.

    The text of ``extensions-v1.1.yml`` (a resource of the ``cwltool``
    package) is registered with ``use_custom_schema``; the second element of
    ``get_schema("v1.1")`` is then yielded.
    """
    # Only the read needs the stream, so close it before the (session-long) yield.
    with pkg_resources.resource_stream("cwltool", "extensions-v1.1.yml") as res:
        ext11 = res.read().decode("utf-8")
    cwltool.process.use_custom_schema("v1.1", "http://commonwl.org/cwltool", ext11)
    schema = cwltool.process.get_schema("v1.1")[1]
    assert isinstance(schema, Names)
    yield schema
293 | |
294 | |
# Building blocks shared by the job-runner selection tests below.
# An MPI requirement asking for a single process.
mpiReq = CommentedMap({"class": MPIRequirementName, "processes": 1})
# A DockerRequirement with no further fields.
containerReq = CommentedMap({"class": "DockerRequirement"})
# A minimal v1.1 tool description with no inputs and no outputs.
basetool = CommentedMap(
    {"cwlVersion": "v1.1", "inputs": CommentedSeq(), "outputs": CommentedSeq()}
)
300 | |
301 | |
def mk_tool(
    schema: Names,
    opts: List[str],
    reqs: Optional[List[CommentedMap]] = None,
    hints: Optional[List[CommentedMap]] = None,
) -> Tuple[LoadingContext, RuntimeContext, CommentedMap]:
    """
    Build a tool description plus the contexts needed to instantiate it.

    :param schema: Avro names stored on the loading context as ``avsc_names``.
    :param opts: cwltool command line options to parse.
    :param reqs: if given, stored as the tool's ``requirements``.
    :param hints: if given, stored as the tool's ``hints``.
    :return: ``(loading context, runtime context, tool description)``.
    """
    # NOTE(review): copy() is presumably shallow, so the (empty) inputs and
    # outputs sequences would be shared with ``basetool`` - confirm.
    tool = basetool.copy()

    if reqs is not None:
        tool["requirements"] = CommentedSeq(reqs)
    if hints is not None:
        tool["hints"] = CommentedSeq(hints)

    args = cwltool.argparser.arg_parser().parse_args(opts)
    # Extensions are always switched on, whatever ``opts`` contains.
    args.enable_ext = True
    rc = RuntimeContext(vars(args))
    lc = cwltool.main.setup_loadingContext(None, rc, args)
    lc.avsc_names = schema
    return lc, rc, tool
321 | |
322 | |
def test_singularity(schema_ext11: Names) -> None:
    """With ``--singularity``, MPI plus Docker requirements select the Singularity job."""
    lc, rc, tool = mk_tool(schema_ext11, ["--singularity"], reqs=[mpiReq, containerReq])
    clt = CommandLineTool(tool, lc)
    jr = clt.make_job_runner(rc)
    assert jr is cwltool.singularity.SingularityCommandLineJob
328 | |
329 | |
def test_udocker(schema_ext11: Names) -> None:
    """With ``--udocker``, MPI plus Docker requirements select the udocker job."""
    lc, rc, tool = mk_tool(schema_ext11, ["--udocker"], reqs=[mpiReq, containerReq])
    clt = CommandLineTool(tool, lc)
    jr = clt.make_job_runner(rc)
    assert jr is cwltool.udocker.UDockerCommandLineJob
335 | |
336 | |
def test_docker_hint(schema_ext11: Names) -> None:
    """A required MPI plus a hinted Docker selects the plain command line job."""
    # Docker hint, MPI required
    lc, rc, tool = mk_tool(schema_ext11, [], hints=[containerReq], reqs=[mpiReq])
    clt = CommandLineTool(tool, lc)
    jr = clt.make_job_runner(rc)
    assert jr is cwltool.job.CommandLineJob
343 | |
344 | |
def test_docker_required(schema_ext11: Names) -> None:
    """A required Docker plus a hinted MPI selects the Docker command line job."""
    # Docker required, MPI hinted
    lc, rc, tool = mk_tool(schema_ext11, [], reqs=[containerReq], hints=[mpiReq])
    clt = CommandLineTool(tool, lc)
    jr = clt.make_job_runner(rc)
    assert jr is cwltool.docker.DockerCommandLineJob
351 | |
352 | |
def test_docker_mpi_both_required(schema_ext11: Names) -> None:
    """Requiring both MPI and Docker must raise ``UnsupportedRequirement``."""
    # Both required - error
    with pytest.raises(cwltool.errors.UnsupportedRequirement):
        lc, rc, tool = mk_tool(schema_ext11, [], reqs=[mpiReq, containerReq])
        clt = CommandLineTool(tool, lc)
        clt.make_job_runner(rc)
359 | |
360 | |
def test_docker_mpi_both_hinted(schema_ext11: Names) -> None:
    """Hinting both MPI and Docker must raise ``UnsupportedRequirement``."""
    # Both hinted - error
    with pytest.raises(cwltool.errors.UnsupportedRequirement):
        lc, rc, tool = mk_tool(schema_ext11, [], hints=[mpiReq, containerReq])
        clt = CommandLineTool(tool, lc)
        clt.make_job_runner(rc)