comparison env/lib/python3.9/site-packages/cwltool/docker.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Enables Docker software containers via the {u,}docker runtimes."""
2
3 import csv
4 import datetime
5 import os
6 import re
7 import shutil
8 import subprocess # nosec
9 import sys
10 import threading
11 from distutils import spawn
12 from io import StringIO # pylint: disable=redefined-builtin
13 from typing import Callable, Dict, List, MutableMapping, Optional, Set, Tuple, cast
14
15 import requests
16
17 from .builder import Builder
18 from .context import RuntimeContext
19 from .docker_id import docker_vm_id
20 from .errors import WorkflowException
21 from .job import ContainerCommandLineJob
22 from .loghandler import _logger
23 from .pathmapper import MapperEnt, PathMapper
24 from .utils import (
25 CWLObjectType,
26 create_tmp_dir,
27 docker_windows_path_adjust,
28 ensure_writable,
29 onWindows,
30 )
31
# Image IDs already confirmed to be available locally during this process;
# every access is guarded by _IMAGES_LOCK.
_IMAGES: Set[str] = set()
_IMAGES_LOCK = threading.Lock()

# Cached list of host paths mounted into the docker-machine VM.  None means
# "not looked up yet"; writes are guarded by __docker_machine_mounts_lock.
__docker_machine_mounts: Optional[List[str]] = None
__docker_machine_mounts_lock = threading.Lock()
36
37
def _get_docker_machine_mounts() -> List[str]:
    """
    Return the host paths mounted into the active docker-machine VM.

    The list is computed once and cached in the module-level
    ``__docker_machine_mounts``.  When ``DOCKER_MACHINE_NAME`` is not set in
    the environment the list is empty.  Otherwise ``mount -t vboxsf`` is run
    inside the VM through ``docker-machine ssh`` and the first
    whitespace-separated field of every output line, prefixed with "/", is
    recorded.

    Raises ``subprocess.CalledProcessError`` if ``docker-machine`` exits with
    a non-zero status.
    """
    global __docker_machine_mounts
    if __docker_machine_mounts is None:
        with __docker_machine_mounts_lock:
            # Check again now that the lock is held: another thread may have
            # filled the cache while this one was waiting, and the
            # docker-machine round trip should happen only once.
            if __docker_machine_mounts is None:
                machine_name = os.environ.get("DOCKER_MACHINE_NAME")
                if machine_name is None:
                    __docker_machine_mounts = []
                else:
                    listing = subprocess.check_output(  # nosec
                        [
                            "docker-machine",
                            "ssh",
                            machine_name,
                            "mount",
                            "-t",
                            "vboxsf",
                        ],
                        universal_newlines=True,
                    )
                    __docker_machine_mounts = [
                        "/" + line.split(None, 1)[0]
                        for line in listing.splitlines()
                        # A blank line has no first field to extract.
                        if line.strip()
                    ]
    return __docker_machine_mounts
60
61
def _check_docker_machine_path(path: Optional[str]) -> None:
    """
    Verify that a host path is reachable from inside the docker-machine VM.

    Does nothing when ``path`` is None or when no docker-machine mounts are
    known.  Otherwise ``path`` must be one of the mounted host paths or lie
    below one of them; the comparison ignores case when ``onWindows()`` is
    true.

    Raises ``WorkflowException`` when the path is not covered by any mount.
    """
    if path is None:
        return
    ignore_case = onWindows()
    if ignore_case:
        path = path.lower()
    mounts = _get_docker_machine_mounts()

    for mount in mounts:
        prefix = mount.lower() if ignore_case else mount
        prefix = prefix.rstrip("/")
        # Match on a path-component boundary so that a sibling such as
        # "/UsersBackup" is not mistaken for something under "/Users".
        if path == prefix or path.startswith(prefix + "/"):
            return

    if mounts:
        name = os.environ.get("DOCKER_MACHINE_NAME", "???")
        raise WorkflowException(
            "Input path {path} is not in the list of host paths mounted "
            "into the Docker virtual machine named {name}. Already mounted "
            "paths: {mounts}.\n"
            "See https://docs.docker.com/toolbox/toolbox_install_windows/"
            "#optional-add-shared-directories for instructions on how to "
            "add this path to your VM.".format(path=path, name=name, mounts=mounts)
        )
87
88
class DockerCommandLineJob(ContainerCommandLineJob):
    """Runs a CommandLineJob in a software container using the Docker engine."""

    def __init__(
        self,
        builder: Builder,
        joborder: CWLObjectType,
        make_path_mapper: Callable[..., PathMapper],
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        name: str,
    ) -> None:
        """Initialize a command line builder using the Docker software container engine."""
        super().__init__(builder, joborder, make_path_mapper, requirements, hints, name)

    @staticmethod
    def _split_image_reference(image_id: str) -> Tuple[str, str]:
        """
        Split an image reference into the (repository, tag) pair to look for.

        A reference without ":" gets the tag "latest".  A colon-separated
        suffix only counts as a tag when the *whole* suffix is a valid tag
        name; otherwise it is treated as part of the repository (for example
        the port in "localhost:5000/image").
        """
        parts = image_id.split(":")

        def is_tag(candidate: str) -> bool:
            # fullmatch, not match: "5000/image" starts with valid tag
            # characters but is not a tag as a whole.
            return re.fullmatch(r"[\w][\w.-]{0,127}", candidate) is not None

        if len(parts) == 1:
            return parts[0], "latest"
        if len(parts) == 2:
            if is_tag(parts[1]):
                return parts[0], parts[1]
            return parts[0] + ":" + parts[1], "latest"
        if len(parts) == 3 and is_tag(parts[2]):
            return parts[0] + ":" + parts[1], parts[2]
        # Anything else is compared on its first two components as-is.
        return parts[0], parts[1]

    @staticmethod
    def get_image(
        docker_requirement: Dict[str, str],
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> bool:
        """
        Retrieve the relevant Docker container image.

        When ``dockerImageId`` is missing it is filled in from ``dockerPull``
        (this mutates ``docker_requirement``).  The image is first looked up
        in the process-wide cache, then in the output of ``docker images``.
        If it is missing (or ``force_pull`` is set) and ``pull_image`` is
        true, it is obtained via the first of ``dockerPull``, ``dockerFile``,
        ``dockerLoad`` or ``dockerImport`` present in the requirement.

        Returns True upon success.

        Raises ``WorkflowException`` if ``docker load`` exits with a non-zero
        status, and ``subprocess.CalledProcessError`` if another docker
        command fails.
        """
        found = False

        if (
            "dockerImageId" not in docker_requirement
            and "dockerPull" in docker_requirement
        ):
            docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]

        image_id = docker_requirement["dockerImageId"]

        with _IMAGES_LOCK:
            if image_id in _IMAGES:
                return True

        # The reference does not change per output line, so parse it once.
        repository, tag = DockerCommandLineJob._split_image_reference(image_id)
        listing = subprocess.check_output(  # nosec
            ["docker", "images", "--no-trunc", "--all"]
        ).decode("utf-8")
        for line in listing.splitlines():
            match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
            # check for repository:tag match or image id match
            if match and (
                (repository == match.group(1) and tag == match.group(2))
                or image_id == match.group(3)
            ):
                found = True
                break

        if (force_pull or not found) and pull_image:
            cmd: List[str] = []
            if "dockerPull" in docker_requirement:
                cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
                _logger.info(str(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                found = True
            elif "dockerFile" in docker_requirement:
                dockerfile_dir = create_tmp_dir(tmp_outdir_prefix)
                with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as dfile:
                    dfile.write(docker_requirement["dockerFile"].encode("utf-8"))
                cmd = [
                    "docker",
                    "build",
                    "--tag=%s" % str(docker_requirement["dockerImageId"]),
                    dockerfile_dir,
                ]
                _logger.info(str(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                found = True
            elif "dockerLoad" in docker_requirement:
                cmd = ["docker", "load"]
                _logger.info(str(cmd))
                if os.path.exists(docker_requirement["dockerLoad"]):
                    # Local archive: hand the file to "docker load" as stdin.
                    _logger.info(
                        "Loading docker image from %s",
                        docker_requirement["dockerLoad"],
                    )
                    with open(docker_requirement["dockerLoad"], "rb") as dload:
                        loadproc = subprocess.Popen(  # nosec
                            cmd, stdin=dload, stdout=sys.stderr
                        )
                else:
                    # Not a local path: treat it as a URL and stream the
                    # download into "docker load".
                    loadproc = subprocess.Popen(  # nosec
                        cmd, stdin=subprocess.PIPE, stdout=sys.stderr
                    )
                    assert loadproc.stdin is not None  # nosec
                    _logger.info(
                        "Sending GET request to %s", docker_requirement["dockerLoad"]
                    )
                    try:
                        req = requests.get(docker_requirement["dockerLoad"], stream=True)
                        size = 0
                        for chunk in req.iter_content(1024 * 1024):
                            size += len(chunk)
                            _logger.info("\r%i bytes", size)
                            loadproc.stdin.write(chunk)
                    finally:
                        # Close the pipe even when the download fails so that
                        # "docker load" sees end-of-input instead of hanging.
                        loadproc.stdin.close()
                rcode = loadproc.wait()
                if rcode != 0:
                    raise WorkflowException(
                        "Docker load returned non-zero exit status %i" % (rcode)
                    )
                found = True
            elif "dockerImport" in docker_requirement:
                cmd = [
                    "docker",
                    "import",
                    str(docker_requirement["dockerImport"]),
                    str(docker_requirement["dockerImageId"]),
                ]
                _logger.info(str(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                found = True

        if found:
            with _IMAGES_LOCK:
                _IMAGES.add(docker_requirement["dockerImageId"])

        return found

    def get_from_requirements(
        self,
        r: CWLObjectType,
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> Optional[str]:
        """
        Return the image ID for requirement ``r``, fetching the image if needed.

        Raises ``WorkflowException`` when no ``docker`` executable is found on
        the PATH or when ``get_image`` reports that the image is unavailable.
        """
        if not shutil.which("docker"):
            raise WorkflowException("docker executable is not available")

        if self.get_image(
            cast(Dict[str, str], r), pull_image, force_pull, tmp_outdir_prefix
        ):
            return cast(Optional[str], r["dockerImageId"])
        raise WorkflowException("Docker image %s not found" % r["dockerImageId"])

    @staticmethod
    def append_volume(
        runtime: List[str], source: str, target: str, writable: bool = False
    ) -> None:
        """
        Add binding arguments to the runtime list.

        Appends one ``--mount=type=bind,...`` argument; the option list is
        written with the ``csv`` module so values get CSV quoting when they
        need it.  The mount is marked ``readonly`` unless ``writable`` is
        true.  ``source`` is created as a directory if it does not exist.
        """
        options = [
            "type=bind",
            "source=" + source,
            "target=" + target,
        ]
        if not writable:
            options.append("readonly")
        output = StringIO()
        csv.writer(output).writerow(options)
        mount_arg = output.getvalue().strip()
        runtime.append(f"--mount={mount_arg}")
        # Unlike "--volume", "--mount" will fail if the volume doesn't already exist.
        if not os.path.exists(source):
            # exist_ok: another job may create the same directory concurrently.
            os.makedirs(source, exist_ok=True)

    def add_file_or_directory_volume(
        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
    ) -> None:
        """
        Append a read-only file/dir mapping to the runtime option list.

        Entries whose resolved location starts with "_:" are skipped.
        """
        if not volume.resolved.startswith("_:"):
            _check_docker_machine_path(docker_windows_path_adjust(volume.resolved))
            self.append_volume(runtime, volume.resolved, volume.target)

    def add_writable_file_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """
        Append a writable file mapping to the runtime option list.

        With in-place update enabled the original file is mounted writable.
        Otherwise a copy is made, either straight into ``host_outdir_tgt`` or
        into a fresh temporary directory that is then mounted, and the copy
        is made writable.
        """
        if self.inplace_update:
            self.append_volume(runtime, volume.resolved, volume.target, writable=True)
        else:
            if host_outdir_tgt:
                # shortcut, just copy to the output directory
                # which is already going to be mounted
                parent_dir = os.path.dirname(host_outdir_tgt)
                if not os.path.exists(parent_dir):
                    os.makedirs(parent_dir, exist_ok=True)
                shutil.copy(volume.resolved, host_outdir_tgt)
            else:
                tmpdir = create_tmp_dir(tmpdir_prefix)
                file_copy = os.path.join(tmpdir, os.path.basename(volume.resolved))
                shutil.copy(volume.resolved, file_copy)
                self.append_volume(runtime, file_copy, volume.target, writable=True)
            ensure_writable(host_outdir_tgt or file_copy)

    def add_writable_directory_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """
        Append a writable directory mapping to the runtime option list.

        A resolved location starting with "_:" denotes a directory that has
        to be created first.  Real directories are mounted directly when
        in-place update is enabled, and copied (then made writable) otherwise.
        """
        if volume.resolved.startswith("_:"):
            # Synthetic directory that needs creating first
            if not host_outdir_tgt:
                new_dir = os.path.join(
                    create_tmp_dir(tmpdir_prefix),
                    os.path.basename(volume.target),
                )
                self.append_volume(runtime, new_dir, volume.target, writable=True)
            elif not os.path.exists(host_outdir_tgt):
                os.makedirs(host_outdir_tgt, exist_ok=True)
        else:
            if self.inplace_update:
                self.append_volume(
                    runtime, volume.resolved, volume.target, writable=True
                )
            else:
                if not host_outdir_tgt:
                    tmpdir = create_tmp_dir(tmpdir_prefix)
                    new_dir = os.path.join(tmpdir, os.path.basename(volume.resolved))
                    shutil.copytree(volume.resolved, new_dir)
                    self.append_volume(runtime, new_dir, volume.target, writable=True)
                else:
                    shutil.copytree(volume.resolved, host_outdir_tgt)
                ensure_writable(host_outdir_tgt or new_dir)

    def create_runtime(
        self, env: MutableMapping[str, str], runtimeContext: RuntimeContext
    ) -> Tuple[List[str], Optional[str]]:
        """
        Build the container engine command line prefix for this job.

        Returns a tuple of the argument list (engine executable, "run" and
        all options) and the path of the container ID file, which is None
        unless ``runtimeContext.user_space_docker_cmd`` is None.

        Exits the process with status 2 when ``runtimeContext.cidfile_dir``
        is set but does not exist or is not a directory.
        """
        any_path_okay = self.builder.get_requirement("DockerRequirement")[1] or False
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if user_space_docker_cmd:
            if "udocker" in user_space_docker_cmd and not runtimeContext.debug:
                # udocker 1.1.1 will output diagnostic messages to stdout
                # without this
                runtime = [user_space_docker_cmd, "--quiet", "run"]
            else:
                runtime = [user_space_docker_cmd, "run"]
        else:
            runtime = ["docker", "run", "-i"]

        # The output and temporary directories are always mounted writable.
        self.append_volume(
            runtime, os.path.realpath(self.outdir), self.builder.outdir, writable=True
        )
        tmpdir = "/tmp"  # nosec
        self.append_volume(
            runtime, os.path.realpath(self.tmpdir), tmpdir, writable=True
        )
        self.add_volumes(
            self.pathmapper,
            runtime,
            any_path_okay=True,
            secret_store=runtimeContext.secret_store,
            tmpdir_prefix=runtimeContext.tmpdir_prefix,
        )
        if self.generatemapper is not None:
            self.add_volumes(
                self.generatemapper,
                runtime,
                any_path_okay=any_path_okay,
                secret_store=runtimeContext.secret_store,
                tmpdir_prefix=runtimeContext.tmpdir_prefix,
            )

        if user_space_docker_cmd:
            runtime = [x.replace(":ro", "") for x in runtime]
            runtime = [x.replace(":rw", "") for x in runtime]

        runtime.append(
            "--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir))
        )
        if not user_space_docker_cmd:
            # The options below are only passed to the real Docker engine.
            if not runtimeContext.no_read_only:
                runtime.append("--read-only=true")

            if self.networkaccess:
                if runtimeContext.custom_net:
                    runtime.append(f"--net={runtimeContext.custom_net}")
            else:
                runtime.append("--net=none")

            if self.stdout is not None:
                runtime.append("--log-driver=none")

            euid, egid = docker_vm_id()
            if not onWindows():
                # MS Windows does not have getuid() or geteuid() functions
                euid, egid = euid or os.geteuid(), egid or os.getgid()

            if runtimeContext.no_match_user is False and (
                euid is not None and egid is not None
            ):
                runtime.append("--user=%d:%d" % (euid, egid))

        if runtimeContext.rm_container:
            runtime.append("--rm")

        runtime.append("--env=TMPDIR=/tmp")

        # spec currently says "HOME must be set to the designated output
        # directory." but spec might change to designated temp directory.
        # runtime.append("--env=HOME=/tmp")
        runtime.append("--env=HOME=%s" % self.builder.outdir)

        cidfile_path: Optional[str] = None
        # add parameters to docker to write a container ID file
        if runtimeContext.user_space_docker_cmd is None:
            if runtimeContext.cidfile_dir:
                cidfile_dir = runtimeContext.cidfile_dir
                if not os.path.exists(str(cidfile_dir)):
                    _logger.error(
                        "--cidfile-dir %s error:\n%s",
                        cidfile_dir,
                        "directory doesn't exist, please create it first",
                    )
                    # sys.exit rather than the builtin exit(), which only
                    # exists when the "site" module has been loaded.
                    sys.exit(2)
                if not os.path.isdir(cidfile_dir):
                    _logger.error(
                        "--cidfile-dir %s error:\n%s",
                        cidfile_dir,
                        cidfile_dir + " is not a directory, please check it first",
                    )
                    sys.exit(2)
            else:
                cidfile_dir = runtimeContext.create_tmpdir()

            cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
            if runtimeContext.cidfile_prefix is not None:
                cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
            cidfile_path = os.path.join(cidfile_dir, cidfile_name)
            runtime.append("--cidfile=%s" % cidfile_path)
        for key, value in self.environment.items():
            runtime.append(f"--env={key}={value}")

        if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
            ram = self.builder.resources["ram"]
            if not isinstance(ram, str):
                runtime.append("--memory=%dm" % ram)
        elif not user_space_docker_cmd:
            res_req, _ = self.builder.get_requirement("ResourceRequirement")
            if res_req and ("ramMin" in res_req or "ramMax" in res_req):
                _logger.warning(
                    "[job %s] Skipping Docker software container '--memory' limit "
                    "despite presence of ResourceRequirement with ramMin "
                    "and/or ramMax setting. Consider running with "
                    "--strict-memory-limit for increased portability "
                    "assurance.",
                    self.name,
                )

        return runtime, cidfile_path