view env/lib/python3.9/site-packages/galaxy/containers/ @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
line wrap: on
line source

Interface to Docker

import logging
import os
import shlex
from functools import partial
from itertools import cycle, repeat
from time import sleep
from typing import Any, Dict, Optional, Type

    import docker
except ImportError:
    docker = None  # type: ignore

    from requests.exceptions import ConnectionError, ReadTimeout
except ImportError:
    ConnectionError = None  # type: ignore
    ReadTimeout = None  # type: ignore

from galaxy.containers import Container, ContainerInterface
from galaxy.containers.docker_decorators import (
from galaxy.containers.docker_model import (
from galaxy.exceptions import (
from galaxy.util.json import safe_dumps_formatted

log = logging.getLogger(__name__)

class DockerInterface(ContainerInterface):

    container_class: Type[Container] = DockerContainer
    volume_class = DockerVolume
    conf_defaults: Dict[str, Optional[Any]] = {
        'host': None,
        'tls': False,
        'force_tlsverify': False,
        'auto_remove': True,
        'image': None,
        'cpus': None,
        'memory': None,
    # These values are inserted into kwopts for run commands
    conf_run_kwopts = (

    def validate_config(self):
        self.__host_iter = None
        if is None or isinstance(, str):
            self.__host_iter = repeat(
            self.__host_iter = cycle(

    def _default_image(self):
        assert self._conf.image is not None, "No default image for this docker interface"
        return self._conf.image

    def run_in_container(self, command, image=None, **kwopts):
        for opt in self.conf_run_kwopts:
            if self._conf[opt]:
                kwopts[opt] = self._conf[opt]
        return, image=image, **kwopts)

    def image_repodigest(self, image):
        """Get the digest image string for an image.

        :type image: str
        :param image: image id or image name and optionally, tag

        :returns: digest string, having the format `<name>@<hash_alg>:<digest>`, e.g.:
                  or, the original image name if the digest cannot be
                  determined (the image has not been pulled)
            inspect = self.image_inspect(image)
            return inspect['RepoDigests'][0]
        except ContainerImageNotFound:
            return image

    def host(self):
        return next(self.__host_iter)

    def host_iter(self):
        return self.__host_iter

class DockerCLIInterface(DockerInterface):

    container_type = 'docker_cli'
    conf_defaults: Dict[str, Optional[Any]] = {
        'command_template': '{executable} {global_kwopts} {subcommand} {args}',
        'executable': 'docker',
    option_map = {
        # `run` options
        'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
        'volumes': {'flag': '--volume', 'type': 'docker_volumes'},
        'name': {'flag': '--name', 'type': 'string'},
        'detach': {'flag': '--detach', 'type': 'boolean'},
        'publish_all_ports': {'flag': '--publish-all', 'type': 'boolean'},
        'publish_port_random': {'flag': '--publish', 'type': 'string'},
        'auto_remove': {'flag': '--rm', 'type': 'boolean'},
        'cpus': {'flag': '--cpus', 'type': 'string'},
        'memory': {'flag': '--memory', 'type': 'string'},

    def validate_config(self):
        log.warning('The `docker_cli` interface is deprecated and will be removed in Galaxy 18.09, please use `docker`')
        global_kwopts = []
        if self._conf.force_tlsverify:
        self._docker_command = self._conf['command_template'].format(
            global_kwopts=' '.join(global_kwopts),

    def _filter_by_id_or_name(self, id, name):
        if id:
            return f'--filter id={id}'
        elif name:
            return f'--filter name={name}'
        return None

    def _stringify_kwopt_docker_volumes(self, flag, val):
        """The docker API will take a volumes argument in many formats, try to
        deal with that for the command line
        l = []
        if isinstance(val, list):
            # ['/host/vol']
            l = val
            for hostvol, guestopts in val.items():
                if isinstance(guestopts, str):
                    # {'/host/vol': '/container/vol'}
                    # {'/host/vol': {'bind': '/container/vol'}}
                    # {'/host/vol': {'bind': '/container/vol', 'mode': 'rw'}}
                    mode = guestopts.get('mode', '')
                        mode=':' + mode if mode else ''
        return self._stringify_kwopt_list(flag, l)

    def _run_docker(self, subcommand, args=None, verbose=False):
        command = self._docker_command.format(subcommand=subcommand, args=args or '')
        return self._run_command(command, verbose=verbose)

    # docker subcommands

    def ps(self, id=None, name=None):
        return self._run_docker(subcommand='ps', args=self._filter_by_id_or_name(id, name))

    def run(self, command, image=None, **kwopts):
        args = '{kwopts} {image} {command}'.format(
            image=image or self._default_image,
            command=command if command else ''
        container_id = self._run_docker(subcommand='run', args=args, verbose=True)
        return DockerContainer.from_id(self, container_id)

    def inspect(self, container_id):
            return self._run_docker(subcommand='inspect', args=container_id)[0]
        except (IndexError, ContainerCLIError) as exc:
            msg = "Invalid container id: %s" % container_id
            if exc.stdout == '[]' and exc.stderr == f'Error: no such object: {container_id}':
                return []
                raise ContainerNotFound(msg, container_id=container_id)

    def image_inspect(self, image):
            return self._run_docker(subcommand='image inspect', args=image)[0]
        except (IndexError, ContainerCLIError) as exc:
            msg = "%s not pulled, cannot get digest" % image
            if exc.stdout == '[]' and exc.stderr == f'Error: no such image: {image}':
                log.warning(msg, image)
                return []
                raise ContainerImageNotFound(msg, image=image)

class DockerAPIClient:
    """Wraps a ``docker.APIClient`` to catch exceptions.

    _exception_retry_time = 5
    _default_max_tries = 10
    _host_iter = None
    _client = None
    _client_args = ()
    _client_kwargs: Dict[str, Optional[Any]] = {}

    def _qualname(f):
        if isinstance(f, partial):
            f = f.func
            return getattr(f, '__qualname__', f.im_class.__name__ + '.' + f.__name__)
        except AttributeError:
            return f.__name__

    def _should_retry_request(response_code):
        return response_code >= 500 or response_code in (404, 408, 409, 429)

    def _nonfatal_error(response_code):
        return response_code in (404,)

    def _unwrapped_attr(attr):
        return getattr(DockerAPIClient._client, attr)

    def _init_client():
        kwargs = DockerAPIClient._client_kwargs.copy()
        if DockerAPIClient._host_iter is not None and 'base_url' not in kwargs:
            kwargs['base_url'] = next(DockerAPIClient._host_iter)
        DockerAPIClient._client = docker.APIClient(*DockerAPIClient._client_args, **kwargs)'Initialized Docker API client for server: %s', kwargs.get('base_url', 'localhost'))

    def _default_client_handler(fname, *args, **kwargs):
        success_test = kwargs.pop('success_test', None)
        max_tries = kwargs.pop('max_tries', DockerAPIClient._default_max_tries)
        for tries in range(1, max_tries + 1):
            retry_time = DockerAPIClient._exception_retry_time
            reinit = False
            exc = None
            # re-get the APIClient method every time as a different caller (such as the success test function) may have
            # already reinitialized the client, and we always want to use the current client
            f = DockerAPIClient._unwrapped_attr(fname)
            qualname = DockerAPIClient._qualname(f)
                r = f(*args, **kwargs)
                if tries > 1:
          '%s() succeeded on attempt %s', qualname, tries)
                return r
            except ConnectionError:
                reinit = True
            except docker.errors.APIError as exc:
                if not DockerAPIClient._should_retry_request(exc.response.status_code):
            except ReadTimeout:
                reinit = True
                retry_time = 0
                # this is inside the finally context so we can do a bare raise when we give up (so the real stack for
                # the exception is raised)
                if exc is not None:
                    log.warning("Caught exception on %s(): %s: %s",
                                DockerAPIClient._qualname(f), exc.__class__.__name__, exc)
                    if reinit:
                        log.warning("Reinitializing Docker API client due to connection-oriented failure")
                        f = DockerAPIClient._unwrapped_attr(fname)
                        qualname = DockerAPIClient._qualname(f)
                    r = None
                    if success_test is not None:
              "Testing if %s() succeeded despite the exception", qualname)
                        r = success_test()
                    if r:
                        log.warning("The request appears to have succeeded, will not retry. Response is: %s", str(r))
                        return r
                    elif tries >= max_tries:
                        log.error("Maximum number of attempts (%s) exceeded", max_tries)
                        if 'response' in exc and DockerAPIClient._nonfatal_error(exc.response.status_code):
                            return None
                        log.error("Retrying %s() in %s seconds (attempt: %s of %s)", qualname, retry_time, tries,

    def __init__(self, *args, **kwargs):
        # Only initialize the host iterator once
        host_iter = kwargs.pop('host_iter', None)
        DockerAPIClient._host_iter = DockerAPIClient._host_iter or host_iter
        DockerAPIClient._client_args = args
        DockerAPIClient._client_kwargs = kwargs

    def __getattr__(self, attr):
        """Allow the calling of methods on this class as if it were a docker.APIClient instance.
        cattr = DockerAPIClient._unwrapped_attr(attr)
        if callable(cattr):
            return partial(DockerAPIClient._default_client_handler, attr)
            return cattr

class DockerAPIInterface(DockerInterface):

    container_type = 'docker'

    # 'publish_port_random' and 'volumes' are special cases handled in _create_host_config()
    host_config_option_map = {
        'auto_remove': {},
        'publish_all_ports': {},
        'cpus': {'param': 'nano_cpus', 'map': lambda x: int(x * 1000000000)},
        'memory': {'param': 'mem_limit'},
        'binds': {},
        'port_bindings': {},

    def validate_config(self):
        assert docker is not None, "Docker module could not be imported, DockerAPIInterface unavailable"
        self.__client = None

    def _client(self):
        # TODO: add cert options to containers conf
        cert_path = os.environ.get('DOCKER_CERT_PATH') or None
        if not cert_path:
            cert_path = os.path.join(os.path.expanduser('~'), '.docker')
        if self._conf.force_tlsverify or self._conf.tls:
            tls_config = docker.tls.TLSConfig(
                client_cert=(os.path.join(cert_path, 'cert.pem'),
                             os.path.join(cert_path, 'key.pem')),
                ca_cert=os.path.join(cert_path, 'ca.pem'),
            tls_config = False
        if not self.__client:
            self.__client = DockerAPIClient(
        return self.__client

    def _first(f, *args, **kwargs):
            return f(*args, **kwargs)[0]
        except IndexError:
            return None

    def _filter_by_id_or_name(id, name):
        if id:
            return {'id': id}
        elif name:
            return {'name': name}
        return None

    def _kwopt_to_param_names(map_spec, key):
        """For a given containers lib method parameter name, return the matching docker-py parameter name(s).

        See :meth:`_create_docker_api_spec`.
        params = []
        if 'param' not in map_spec and 'params' not in map_spec:
        elif 'param' in map_spec:
        params.extend(map_spec.get('params', ()))
        return params

    def _kwopt_to_params(map_spec, key, value):
        """For a given containers lib method parameter name and value, return the matching docker-py parameters with
        values set (including transformation with an optional map function).

        See :meth:`_create_docker_api_spec`.
        params = {}
        if 'map' in map_spec:
            value = map_spec['map'](value)
        for param in DockerAPIInterface._kwopt_to_param_names(map_spec, key):
            params[param] = value
        return params

    def _create_docker_api_spec(self, option_map_name, spec_class, kwopts):
        """Create docker-py objects used as arguments to docker-py methods.

        This method modifies ``kwopts`` by removing options that match the spec.

        An option map is a class-level variable with name ``<map_name>_option_map`` and is a dict with format:

        .. code-block:: python

            sample_option_map = {
                'containers_lib_option_name': {
                    'param': docker_lib_positional_argument_int or 'docker_lib_keyword_argument_name',
                    'params': like 'param' but an iterable containing multiple docker lib params to set,
                    'default': default value,
                    'map': function with with to transform the value,
                    'required': True if this param is required, else False (default),
                '_spec_param': {
                    'spec_class': class of param value,

        All members of the mapping value are optional.

        For example, a spec map for (some of) the possible values of the :class:`docker.types.TaskTemplate`, which is
        used as the ``task_template`` argument to :meth:`docker.APIClient.create_service`, and the possible values of
        the :class`:docker.types.ContainerSpec`, which is used as the ``container_spec`` argument to the
        ``TaskTemplate``  would be:

        .. code-block:: python

            task_template_option_map = {
                # TaskTemplate's 'container_spec' param is a ContainerSpec
                '_container_spec': {
                    'spec_class': docker.types.ContainerSpec,
                    'required': True
            container_spec_option_map = {
                'image': {'param': 0},      # positional argument 0 to ContainerSpec()
                'command': {},              # 'command' keyword argument to ContainerSpec()
                'environment': {            # 'env' keyword argument to ContainerSpec(), 'environment' keyword argument
                    'param': 'env'          #   to ContainerInterface.run_in_container()

        Thus, calling ``DockerInterface.run_in_contaner('true', image='busybox', environment={'FOO': 'foo'}`` will
        essentially do this (for example, if using Docker Swarm mode):

        .. code-block:: python

            container_spec = docker.types.ContainerSpec('busybox', command='true', env={'FOO': 'foo'})
            task_template = docker.types.TaskTemplate(container_spec=container_spec)

        :param  option_map_name:    Name of option map class variable (``_option_map`` is automatically appended)
        :type   option_map_name:    str
        :param  spec_class:         docker-py specification class or callable returning an instance
        :type   spec_class:         :class:`docker.types.Resources`, :class:`docker.types.ContainerSpec`, etc. or
        :param  kwopts:             Keyword options passed to calling method (e.g.
        :type   kwopts:             dict
        :returns:                   Instantiated ``spec_class`` object
        :rtype:                     ``type(spec_class)``
        def _kwopt_to_arg(map_spec, key, value, param=None):
            # determines whether the given param is a positional or keyword argument in docker-py and adds it to the
            # list of arguments
            if isinstance(map_spec.get('param'), int):
                spec_opts.append((map_spec.get('param'), value))
            elif param is not None:
                spec_kwopts[param] = value
                spec_kwopts.update(DockerAPIInterface._kwopt_to_params(map_spec, key, value))
        # positional arguments
        spec_opts = []
        # keyword arguments
        spec_kwopts = {}
        # retrieve the option map for the docker-py object we're creating
        option_map = getattr(self, option_map_name + '_option_map')
        # set defaults
        for key in filter(lambda k: option_map[k].get('default'), option_map.keys()):
            map_spec = option_map[key]
            _kwopt_to_arg(map_spec, key, map_spec['default'])
        # don't allow kwopts that start with _, those are reserved for "child" object params
        for kwopt in filter(lambda k: not k.startswith('_') and k in option_map, list(kwopts.keys())):
            map_spec = option_map[kwopt]
            _v = kwopts.pop(kwopt)
            _kwopt_to_arg(map_spec, kwopt, _v)
        # find any child objects that need to be created and recurse to create them
        for _sub_k in filter(lambda k: k.startswith('_') and 'spec_class' in option_map[k], option_map.keys()):
            map_spec = option_map[_sub_k]
            param = _sub_k.lstrip('_')
            _sub_v = self._create_docker_api_spec(param, map_spec['spec_class'], kwopts)
            if _sub_v is not None or map_spec.get('required') or isinstance(map_spec.get('param'), int):
                _kwopt_to_arg(map_spec, None, _sub_v, param=param)
        # sort positional args and make into a flat tuple
        if spec_opts:
            spec_opts = sorted(spec_opts, key=lambda x: x[0])
            spec_opts = [i[1] for i in spec_opts]
        # create spec object
        if spec_opts or spec_kwopts:
            return spec_class(*spec_opts, **spec_kwopts)
            return None

    def _volumes_to_native(self, volumes):
        """Convert a list of volume definitions to the docker-py container creation method parameters.

        :param  volumes:    List of volumes to translate
        :type   volumes:    list of :class:`galaxy.containers.docker_model.DockerVolume`s
        paths = []
        binds = {}
        for v in volumes:
            path, bind = v.to_native()
        return (paths, binds)

    def _create_host_config(self, kwopts):
        """Build the host configuration parameter for docker-py container creation.

        This method modifies ``kwopts`` by removing host config options and potentially setting the ``ports`` and
        ``volumes`` keys.

        :param  kwopts: Keyword options passed to calling method (e.g. :method:``)
        :type   kwopts: dict
        :returns:       The return value of `docker.APIClient.create_host_config()`
        :rtype:         dict
        if 'publish_port_random' in kwopts:
            port = int(kwopts.pop('publish_port_random'))
            kwopts['port_bindings'] = {port: None}
            kwopts['ports'] = [port]
        if 'volumes' in kwopts:
            paths, binds = self._volumes_to_native(kwopts.pop('volumes'))
            kwopts['binds'] = binds
            kwopts['volumes'] = paths
        return self._create_docker_api_spec('host_config', self._client.create_host_config, kwopts)

    # docker subcommands

    def ps(self, id=None, name=None, running=True):
        return self._client.containers(all=not running, filters=self._filter_by_id_or_name(id, name))

    def run(self, command, image=None, **kwopts):
        image = image or self._default_image
        command = command or None
        log.debug("Creating docker container with image '%s' for command: %s", image, command)
        host_config = self._create_host_config(kwopts)
        log.debug("Docker container host configuration:\n%s", safe_dumps_formatted(host_config))
        log.debug("Docker container creation parameters:\n%s", safe_dumps_formatted(kwopts))
        success_test = partial(self._first,, name=kwopts['name'], running=False)
        # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
        container = self._client.create_container(
            command=command if command else None,
        container_id = container.get('Id')
        log.debug("Starting container: %s (%s)", kwopts['name'], str(container_id))
        # start can safely be run more than once
        return DockerContainer.from_id(self, container_id)

    def inspect(self, container_id):
            return self._client.inspect_container(container_id)
        except docker.errors.NotFound:
            raise ContainerNotFound("Invalid container id: %s" % container_id, container_id=container_id)

    def image_inspect(self, image):
            return self._client.inspect_image(image)
        except docker.errors.NotFound:
            raise ContainerImageNotFound("%s not pulled, cannot get digest" % image, image=image)