comparison env/lib/python3.9/site-packages/galaxy/containers/docker.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """
2 Interface to Docker
3 """
4
5 import logging
6 import os
7 import shlex
8 from functools import partial
9 from itertools import cycle, repeat
10 from time import sleep
11 from typing import Any, Dict, Optional, Type
12
13 try:
14 import docker
15 except ImportError:
16 docker = None # type: ignore
17
18 try:
19 from requests.exceptions import ConnectionError, ReadTimeout
20 except ImportError:
21 ConnectionError = None # type: ignore
22 ReadTimeout = None # type: ignore
23
24 from galaxy.containers import Container, ContainerInterface
25 from galaxy.containers.docker_decorators import (
26 docker_columns,
27 docker_json
28 )
29 from galaxy.containers.docker_model import (
30 DockerContainer,
31 DockerVolume
32 )
33 from galaxy.exceptions import (
34 ContainerCLIError,
35 ContainerImageNotFound,
36 ContainerNotFound
37 )
38 from galaxy.util.json import safe_dumps_formatted
39
40 log = logging.getLogger(__name__)
41
42
class DockerInterface(ContainerInterface):
    """Base class for the Docker container interfaces defined in this module.

    Provides the configuration defaults shared by the concrete interfaces, the
    iterator from which a Docker host is selected, and helpers that are
    implemented in terms of the subclasses' ``run()`` and ``image_inspect()``.
    """

    container_class: Type[Container] = DockerContainer
    volume_class = DockerVolume
    conf_defaults: Dict[str, Optional[Any]] = {
        'host': None,
        'tls': False,
        'force_tlsverify': False,
        'auto_remove': True,
        'image': None,
        'cpus': None,
        'memory': None,
    }
    # These values are inserted into kwopts for run commands
    conf_run_kwopts = (
        'cpus',
        'memory',
    )

    def validate_config(self):
        """Validate the configuration and build the host iterator.

        A single host (or no host at all) is repeated forever; any other value
        is treated as an iterable of hosts and cycled through.
        """
        super().validate_config()
        self.__host_iter = None
        host = self._conf.host
        if host is None or isinstance(host, str):
            self.__host_iter = repeat(host)
        else:
            self.__host_iter = cycle(host)

    @property
    def _default_image(self):
        """The configured default image; asserts that one is configured."""
        assert self._conf.image is not None, "No default image for this docker interface"
        return self._conf.image

    def run_in_container(self, command, image=None, **kwopts):
        """Run ``command`` in a container, applying configured run options.

        Every option named in ``conf_run_kwopts`` that has a truthy configured
        value is written into ``kwopts`` (replacing any value passed by the
        caller) before delegating to ``run()``.
        """
        for opt in self.conf_run_kwopts:
            if self._conf[opt]:
                kwopts[opt] = self._conf[opt]
        self.set_kwopts_name(kwopts)
        return self.run(command, image=image, **kwopts)

    def image_repodigest(self, image):
        """Get the digest image string for an image.

        :type image: str
        :param image: image id or image name and optionally, tag

        :returns: digest string, having the format `<name>@<hash_alg>:<digest>`, e.g.:
                  `'bgruening/docker-jupyter-notebook@sha256:3ec0bc9abc9d511aa602ee4fff2534d80dd9b1564482de52cb5de36cce6debae'`
                  or, the original image name if the digest cannot be
                  determined (the image has not been pulled)
        """
        try:
            inspect = self.image_inspect(image)
        except ContainerImageNotFound:
            return image
        # The inspect result may be empty, or may carry no repo digests at all;
        # in both cases the digest cannot be determined, so fall back to the
        # image name as documented instead of raising IndexError/TypeError.
        digests = inspect.get('RepoDigests') if isinstance(inspect, dict) else None
        if not digests:
            return image
        return digests[0]

    @property
    def host(self):
        """The next host produced by the host iterator."""
        return next(self.__host_iter)

    @property
    def host_iter(self):
        """The host iterator itself (shared, not a copy)."""
        return self.__host_iter
106
107
class DockerCLIInterface(DockerInterface):
    """Docker interface that shells out to the ``docker`` command line client.

    Deprecated: ``validate_config()`` logs a warning recommending the
    ``docker`` (API) interface instead.
    """

    container_type = 'docker_cli'
    conf_defaults: Dict[str, Optional[Any]] = {
        'command_template': '{executable} {global_kwopts} {subcommand} {args}',
        'executable': 'docker',
    }
    option_map = {
        # `run` options
        'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
        'volumes': {'flag': '--volume', 'type': 'docker_volumes'},
        'name': {'flag': '--name', 'type': 'string'},
        'detach': {'flag': '--detach', 'type': 'boolean'},
        'publish_all_ports': {'flag': '--publish-all', 'type': 'boolean'},
        'publish_port_random': {'flag': '--publish', 'type': 'string'},
        'auto_remove': {'flag': '--rm', 'type': 'boolean'},
        'cpus': {'flag': '--cpus', 'type': 'string'},
        'memory': {'flag': '--memory', 'type': 'string'},
    }

    def validate_config(self):
        """Validate the configuration and pre-render the command template.

        The executable and global options are substituted now; the
        ``{subcommand}`` and ``{args}`` placeholders are kept so that
        ``_run_docker()`` can fill them in per call.
        """
        log.warning('The `docker_cli` interface is deprecated and will be removed in Galaxy 18.09, please use `docker`')
        super().validate_config()
        global_kwopts = []
        if self._conf.host:
            global_kwopts.append('--host')
            global_kwopts.append(shlex.quote(self._conf.host))
        if self._conf.force_tlsverify:
            global_kwopts.append('--tlsverify')
        self._docker_command = self._conf['command_template'].format(
            executable=self._conf['executable'],
            global_kwopts=' '.join(global_kwopts),
            subcommand='{subcommand}',
            args='{args}'
        )

    def _filter_by_id_or_name(self, id, name):
        """Return a ``--filter`` argument for ``id`` or ``name``, or ``None``.

        ``id`` takes precedence when both are given.
        """
        if id:
            return f'--filter id={id}'
        elif name:
            return f'--filter name={name}'
        return None

    def _stringify_kwopt_docker_volumes(self, flag, val):
        """The docker API will take a volumes argument in many formats, try to
        deal with that for the command line
        """
        if isinstance(val, list):
            # ['/host/vol']
            volumes = val
        else:
            volumes = []
            for hostvol, guestopts in val.items():
                if isinstance(guestopts, str):
                    # {'/host/vol': '/container/vol'}
                    volumes.append(f'{hostvol}:{guestopts}')
                else:
                    # {'/host/vol': {'bind': '/container/vol'}}
                    # {'/host/vol': {'bind': '/container/vol', 'mode': 'rw'}}
                    mode = guestopts.get('mode', '')
                    volumes.append('{vol}:{bind}{mode}'.format(
                        vol=hostvol,
                        bind=guestopts['bind'],
                        mode=':' + mode if mode else ''
                    ))
        return self._stringify_kwopt_list(flag, volumes)

    def _run_docker(self, subcommand, args=None, verbose=False):
        """Fill in the command template and run the resulting command."""
        command = self._docker_command.format(subcommand=subcommand, args=args or '')
        return self._run_command(command, verbose=verbose)

    #
    # docker subcommands
    #

    @docker_columns
    def ps(self, id=None, name=None):
        """Run ``docker ps``, optionally filtered by container id or name."""
        return self._run_docker(subcommand='ps', args=self._filter_by_id_or_name(id, name))

    def run(self, command, image=None, **kwopts):
        """Run ``docker run`` and return the resulting container object.

        Falls back to the configured default image when ``image`` is not given.
        """
        args = '{kwopts} {image} {command}'.format(
            kwopts=self._stringify_kwopts(kwopts),
            image=image or self._default_image,
            command=command if command else ''
        ).strip()
        container_id = self._run_docker(subcommand='run', args=args, verbose=True)
        return DockerContainer.from_id(self, container_id)

    @docker_json
    def inspect(self, container_id):
        """Run ``docker inspect`` for a container.

        :raises ContainerNotFound: if the command fails in any way other than
                                   the recognized "no such object" response
        """
        try:
            return self._run_docker(subcommand='inspect', args=container_id)[0]
        except (IndexError, ContainerCLIError) as exc:
            msg = "Invalid container id: %s" % container_id
            # IndexError carries no stdout/stderr, so read them defensively
            # rather than masking the failure with an AttributeError.
            stdout = getattr(exc, 'stdout', None)
            stderr = getattr(exc, 'stderr', None)
            if stdout == '[]' and stderr == f'Error: no such object: {container_id}':
                log.warning(msg)
                return []
            raise ContainerNotFound(msg, container_id=container_id) from exc

    @docker_json
    def image_inspect(self, image):
        """Run ``docker image inspect`` for an image.

        :raises ContainerImageNotFound: if the command fails in any way other
                                        than the recognized "no such image"
                                        response
        """
        try:
            return self._run_docker(subcommand='image inspect', args=image)[0]
        except (IndexError, ContainerCLIError) as exc:
            msg = "%s not pulled, cannot get digest" % image
            # IndexError carries no stdout/stderr, so read them defensively
            # rather than masking the failure with an AttributeError.
            stdout = getattr(exc, 'stdout', None)
            stderr = getattr(exc, 'stderr', None)
            if stdout == '[]' and stderr == f'Error: no such image: {image}':
                # msg is already interpolated; passing extra args here would
                # make the logging module fail to format the record.
                log.warning(msg)
                return []
            raise ContainerImageNotFound(msg, image=image) from exc
219
220
class DockerAPIClient:
    """Wraps a ``docker.APIClient`` to catch exceptions.

    The wrapped client, its constructor arguments and the host iterator are
    stored on the class, so they are shared by every instance. Callable
    attributes of the wrapped client are returned wrapped in a retry handler
    (see :meth:`_default_client_handler`).
    """

    # seconds to wait between attempts (read timeouts retry immediately)
    _exception_retry_time = 5
    _default_max_tries = 10
    _host_iter = None
    _client = None
    _client_args = ()
    _client_kwargs: Dict[str, Optional[Any]] = {}

    @staticmethod
    def _qualname(f):
        """Return the qualified name of ``f`` (unwrapping a ``partial``) for log messages.

        Falls back to ``f.__name__`` when no ``__qualname__`` is available.
        """
        if isinstance(f, partial):
            f = f.func
        # Look the attribute up without an eagerly evaluated default: the old
        # default expression used ``im_class``, which does not exist on
        # Python 3, so ``__qualname__`` was never actually returned.
        qualname = getattr(f, '__qualname__', None)
        if qualname:
            return qualname
        return f.__name__

    @staticmethod
    def _should_retry_request(response_code):
        """Return True if a request that failed with this HTTP status should be retried."""
        return response_code >= 500 or response_code in (404, 408, 409, 429)

    @staticmethod
    def _nonfatal_error(response_code):
        """Return True if this HTTP status yields ``None`` rather than an exception once retries run out."""
        return response_code in (404,)

    @staticmethod
    def _unwrapped_attr(attr):
        """Return attribute ``attr`` of the current underlying client."""
        return getattr(DockerAPIClient._client, attr)

    @staticmethod
    def _init_client():
        """(Re)create the shared underlying ``docker.APIClient``.

        Unless ``base_url`` was given explicitly, the next host from the host
        iterator (if there is one) is used as the server URL.
        """
        kwargs = DockerAPIClient._client_kwargs.copy()
        if DockerAPIClient._host_iter is not None and 'base_url' not in kwargs:
            kwargs['base_url'] = next(DockerAPIClient._host_iter)
        DockerAPIClient._client = docker.APIClient(*DockerAPIClient._client_args, **kwargs)
        log.info('Initialized Docker API client for server: %s', kwargs.get('base_url', 'localhost'))

    @staticmethod
    def _default_client_handler(fname, *args, **kwargs):
        """Call method ``fname`` of the underlying client, retrying on failure.

        Two keyword arguments are consumed here and not passed on:

        :param success_test: optional callable invoked after a failure; a
                             truthy return value means the request actually
                             succeeded and is returned as the result
        :param max_tries: maximum number of attempts (default
                          ``_default_max_tries``)

        Connection errors and read timeouts reinitialize the client before the
        next attempt. API errors whose status is not retryable are re-raised
        immediately.

        :returns: the result of the call, the truthy result of
                  ``success_test``, or ``None`` if attempts are exhausted on a
                  non-fatal error
        :raises: the last caught exception if attempts are exhausted on any
                 other error
        """
        success_test = kwargs.pop('success_test', None)
        max_tries = kwargs.pop('max_tries', DockerAPIClient._default_max_tries)
        for tries in range(1, max_tries + 1):
            retry_time = DockerAPIClient._exception_retry_time
            # re-get the APIClient method every time as a different caller (such as the success test function) may have
            # already reinitialized the client, and we always want to use the current client
            f = DockerAPIClient._unwrapped_attr(fname)
            qualname = DockerAPIClient._qualname(f)
            try:
                r = f(*args, **kwargs)
            except (ConnectionError, docker.errors.APIError, ReadTimeout) as exc:
                # All failure handling happens inside the except block: the
                # exception name is unbound as soon as the block ends, and a
                # bare raise needs the exception to still be active so that
                # the real stack is raised when we give up.
                if isinstance(exc, docker.errors.APIError):
                    if not DockerAPIClient._should_retry_request(exc.response.status_code):
                        raise
                    reinit = False
                else:
                    reinit = True
                    if isinstance(exc, ReadTimeout):
                        retry_time = 0
                log.warning("Caught exception on %s(): %s: %s", qualname, exc.__class__.__name__, exc)
                if reinit:
                    log.warning("Reinitializing Docker API client due to connection-oriented failure")
                    DockerAPIClient._init_client()
                    f = DockerAPIClient._unwrapped_attr(fname)
                    qualname = DockerAPIClient._qualname(f)
                r = None
                if success_test is not None:
                    log.info("Testing if %s() succeeded despite the exception", qualname)
                    r = success_test()
                if r:
                    log.warning("The request appears to have succeeded, will not retry. Response is: %s", str(r))
                    return r
                if tries >= max_tries:
                    log.error("Maximum number of attempts (%s) exceeded", max_tries)
                    # not every caught exception carries a response
                    response = getattr(exc, 'response', None)
                    if response is not None and DockerAPIClient._nonfatal_error(response.status_code):
                        return None
                    raise
                log.error("Retrying %s() in %s seconds (attempt: %s of %s)", qualname, retry_time, tries,
                          max_tries)
                sleep(retry_time)
            else:
                if tries > 1:
                    log.info('%s() succeeded on attempt %s', qualname, tries)
                return r

    def __init__(self, *args, **kwargs):
        """Store the client arguments on the class and create the shared client.

        :param host_iter: optional iterator of server URLs; only the first one
                          ever supplied is kept
        """
        # Only initialize the host iterator once
        host_iter = kwargs.pop('host_iter', None)
        DockerAPIClient._host_iter = DockerAPIClient._host_iter or host_iter
        DockerAPIClient._client_args = args
        DockerAPIClient._client_kwargs = kwargs
        DockerAPIClient._init_client()

    def __getattr__(self, attr):
        """Allow the calling of methods on this class as if it were a docker.APIClient instance.
        """
        cattr = DockerAPIClient._unwrapped_attr(attr)
        if callable(cattr):
            return partial(DockerAPIClient._default_client_handler, attr)
        else:
            return cattr
331
332
class DockerAPIInterface(DockerInterface):
    """Docker interface that talks to the Docker API through docker-py."""

    container_type = 'docker'

    # 'publish_port_random' and 'volumes' are special cases handled in _create_host_config()
    host_config_option_map = {
        'auto_remove': {},
        'publish_all_ports': {},
        # Convert through float() first: multiplying a string value (e.g. '2'
        # read from a config file) would repeat the string instead of scaling it.
        'cpus': {'param': 'nano_cpus', 'map': lambda x: int(float(x) * 1000000000)},
        'memory': {'param': 'mem_limit'},
        'binds': {},
        'port_bindings': {},
    }

    def validate_config(self):
        """Validate the configuration; asserts that the docker module is importable."""
        assert docker is not None, "Docker module could not be imported, DockerAPIInterface unavailable"
        super().validate_config()
        self.__client = None

    @property
    def _client(self):
        """The :class:`DockerAPIClient` for this interface, created on first access.

        When ``tls`` or ``force_tlsverify`` is configured, client certificates
        are read from ``$DOCKER_CERT_PATH`` or, if that is unset or empty, from
        ``~/.docker``.
        """
        if not self.__client:
            # The TLS configuration is only needed to construct the client, so
            # it is not rebuilt on every access.
            # TODO: add cert options to containers conf
            cert_path = os.environ.get('DOCKER_CERT_PATH') or None
            if not cert_path:
                cert_path = os.path.join(os.path.expanduser('~'), '.docker')
            if self._conf.force_tlsverify or self._conf.tls:
                tls_config = docker.tls.TLSConfig(
                    client_cert=(os.path.join(cert_path, 'cert.pem'),
                                 os.path.join(cert_path, 'key.pem')),
                    ca_cert=os.path.join(cert_path, 'ca.pem'),
                    verify=self._conf.force_tlsverify,
                )
            else:
                tls_config = False
            self.__client = DockerAPIClient(
                host_iter=self.host_iter,
                tls=tls_config,
            )
        return self.__client

    @staticmethod
    def _first(f, *args, **kwargs):
        """Return the first item of ``f(*args, **kwargs)``, or ``None`` if the result is empty."""
        try:
            return f(*args, **kwargs)[0]
        except IndexError:
            return None

    @staticmethod
    def _filter_by_id_or_name(id, name):
        """Return a filters dict for ``id`` or ``name``, or ``None``.

        ``id`` takes precedence when both are given.
        """
        if id:
            return {'id': id}
        elif name:
            return {'name': name}
        return None

    @staticmethod
    def _kwopt_to_param_names(map_spec, key):
        """For a given containers lib method parameter name, return the matching docker-py parameter name(s).

        See :meth:`_create_docker_api_spec`.
        """
        params = []
        if 'param' not in map_spec and 'params' not in map_spec:
            # no explicit mapping: the docker-py name is the same as ours
            params.append(key)
        elif 'param' in map_spec:
            params.append(map_spec['param'])
        params.extend(map_spec.get('params', ()))
        return params

    @staticmethod
    def _kwopt_to_params(map_spec, key, value):
        """For a given containers lib method parameter name and value, return the matching docker-py parameters with
        values set (including transformation with an optional map function).

        See :meth:`_create_docker_api_spec`.
        """
        if 'map' in map_spec:
            value = map_spec['map'](value)
        return {param: value for param in DockerAPIInterface._kwopt_to_param_names(map_spec, key)}

    def _create_docker_api_spec(self, option_map_name, spec_class, kwopts):
        """Create docker-py objects used as arguments to docker-py methods.

        This method modifies ``kwopts`` by removing options that match the spec.

        An option map is a class-level variable with name ``<map_name>_option_map`` and is a dict with format:

        .. code-block:: python

            sample_option_map = {
                'containers_lib_option_name': {
                    'param': docker_lib_positional_argument_int or 'docker_lib_keyword_argument_name',
                    'params': like 'param' but an iterable containing multiple docker lib params to set,
                    'default': default value,
                    'map': function with which to transform the value,
                    'required': True if this param is required, else False (default),
                },
                '_spec_param': {
                    'spec_class': class of param value,
                }
            }

        All members of the mapping value are optional.

        For example, a spec map for (some of) the possible values of the :class:`docker.types.TaskTemplate`, which is
        used as the ``task_template`` argument to :meth:`docker.APIClient.create_service`, and the possible values of
        the :class:`docker.types.ContainerSpec`, which is used as the ``container_spec`` argument to the
        ``TaskTemplate`` would be:

        .. code-block:: python

            task_template_option_map = {
                # TaskTemplate's 'container_spec' param is a ContainerSpec
                '_container_spec': {
                    'spec_class': docker.types.ContainerSpec,
                    'required': True
                }
            }
            container_spec_option_map = {
                'image': {'param': 0},      # positional argument 0 to ContainerSpec()
                'command': {},              # 'command' keyword argument to ContainerSpec()
                'environment': {            # 'env' keyword argument to ContainerSpec(), 'environment' keyword argument
                    'param': 'env'          #   to ContainerInterface.run_in_container()
                },
            }

        Thus, calling ``DockerInterface.run_in_container('true', image='busybox', environment={'FOO': 'foo'})`` will
        essentially do this (for example, if using Docker Swarm mode):

        .. code-block:: python

            container_spec = docker.types.ContainerSpec('busybox', command='true', env={'FOO': 'foo'})
            task_template = docker.types.TaskTemplate(container_spec=container_spec)
            docker.APIClient().create_service(task_template)

        :param  option_map_name:    Name of option map class variable (``_option_map`` is automatically appended)
        :type   option_map_name:    str
        :param  spec_class:         docker-py specification class or callable returning an instance
        :type   spec_class:         :class:`docker.types.Resources`, :class:`docker.types.ContainerSpec`, etc. or
                                    callable
        :param  kwopts:             Keyword options passed to calling method (e.g.
                                    :meth:`DockerInterface.run_in_container`)
        :type   kwopts:             dict
        :returns:                   Instantiated ``spec_class`` object, or ``None`` if no arguments were collected
        :rtype:                     ``type(spec_class)``
        """
        # positional arguments, collected as (position, value) pairs
        spec_opts = []
        # keyword arguments
        spec_kwopts = {}

        def _kwopt_to_arg(map_spec, key, value, param=None):
            # determines whether the given param is a positional or keyword argument in docker-py and adds it to the
            # list of arguments
            if isinstance(map_spec.get('param'), int):
                spec_opts.append((map_spec.get('param'), value))
            elif param is not None:
                spec_kwopts[param] = value
            else:
                spec_kwopts.update(DockerAPIInterface._kwopt_to_params(map_spec, key, value))

        # retrieve the option map for the docker-py object we're creating
        option_map = getattr(self, option_map_name + '_option_map')
        # set defaults (note: only truthy defaults are applied)
        for key, map_spec in option_map.items():
            if map_spec.get('default'):
                _kwopt_to_arg(map_spec, key, map_spec['default'])
        # don't allow kwopts that start with _, those are reserved for "child" object params
        # (iterate over a snapshot of the keys since matching options are popped from kwopts)
        for kwopt in list(kwopts.keys()):
            if kwopt.startswith('_') or kwopt not in option_map:
                continue
            _kwopt_to_arg(option_map[kwopt], kwopt, kwopts.pop(kwopt))
        # find any child objects that need to be created and recurse to create them
        for _sub_k, map_spec in option_map.items():
            if not (_sub_k.startswith('_') and 'spec_class' in map_spec):
                continue
            param = _sub_k.lstrip('_')
            _sub_v = self._create_docker_api_spec(param, map_spec['spec_class'], kwopts)
            if _sub_v is not None or map_spec.get('required') or isinstance(map_spec.get('param'), int):
                _kwopt_to_arg(map_spec, None, _sub_v, param=param)
        # sort positional args by position and flatten to a list of values
        positional = [value for _, value in sorted(spec_opts, key=lambda x: x[0])]
        # create spec object
        if positional or spec_kwopts:
            return spec_class(*positional, **spec_kwopts)
        return None

    def _volumes_to_native(self, volumes):
        """Convert a list of volume definitions to the docker-py container creation method parameters.

        :param  volumes:    List of volumes to translate
        :type   volumes:    list of :class:`galaxy.containers.docker_model.DockerVolume`s
        :returns:           ``(paths, binds)``: the list of paths and the merged binds dict
        :rtype:             tuple
        """
        paths = []
        binds = {}
        for v in volumes:
            path, bind = v.to_native()
            paths.append(path)
            binds.update(bind)
        return (paths, binds)

    def _create_host_config(self, kwopts):
        """Build the host configuration parameter for docker-py container creation.

        This method modifies ``kwopts`` by removing host config options and potentially setting the ``ports`` and
        ``volumes`` keys.

        :param  kwopts: Keyword options passed to calling method (e.g. :method:`DockerInterface.run()`)
        :type   kwopts: dict
        :returns:       The return value of `docker.APIClient.create_host_config()`
        :rtype:         dict
        """
        if 'publish_port_random' in kwopts:
            port = int(kwopts.pop('publish_port_random'))
            kwopts['port_bindings'] = {port: None}
            kwopts['ports'] = [port]
        if 'volumes' in kwopts:
            paths, binds = self._volumes_to_native(kwopts.pop('volumes'))
            kwopts['binds'] = binds
            kwopts['volumes'] = paths
        return self._create_docker_api_spec('host_config', self._client.create_host_config, kwopts)

    #
    # docker subcommands
    #

    def ps(self, id=None, name=None, running=True):
        """List containers, optionally filtered by id or name.

        :param running: if False, ``all=True`` is passed to the client
        """
        return self._client.containers(all=not running, filters=self._filter_by_id_or_name(id, name))

    def run(self, command, image=None, **kwopts):
        """Create and start a container, returning the resulting container object.

        Falls back to the configured default image when ``image`` is not given.
        Host configuration options are extracted from ``kwopts``; the remaining
        options are passed to ``create_container()``.
        """
        image = image or self._default_image
        command = command or None
        log.debug("Creating docker container with image '%s' for command: %s", image, command)
        host_config = self._create_host_config(kwopts)
        log.debug("Docker container host configuration:\n%s", safe_dumps_formatted(host_config))
        log.debug("Docker container creation parameters:\n%s", safe_dumps_formatted(kwopts))
        # Creation can only be verified after a failure by looking the
        # container up by name; without a name an unfiltered listing would
        # match unrelated containers, so no success test is used in that case.
        name = kwopts.get('name')
        success_test = partial(self._first, self.ps, name=name, running=False) if name else None
        # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
        container = self._client.create_container(
            image,
            command=command,
            host_config=host_config,
            success_test=success_test,
            max_tries=5,
            **kwopts
        )
        container_id = container.get('Id')
        log.debug("Starting container: %s (%s)", name, str(container_id))
        # start can safely be run more than once
        self._client.start(container=container_id)
        return DockerContainer.from_id(self, container_id)

    def inspect(self, container_id):
        """Inspect a container.

        :raises ContainerNotFound: if the client raises ``docker.errors.NotFound``
        """
        try:
            return self._client.inspect_container(container_id)
        except docker.errors.NotFound as exc:
            raise ContainerNotFound("Invalid container id: %s" % container_id, container_id=container_id) from exc

    def image_inspect(self, image):
        """Inspect an image.

        :raises ContainerImageNotFound: if the client raises ``docker.errors.NotFound``
        """
        try:
            return self._client.inspect_image(image)
        except docker.errors.NotFound as exc:
            raise ContainerImageNotFound("%s not pulled, cannot get digest" % image, image=image) from exc