view env/lib/python3.9/site-packages/galaxy/containers/ @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
line wrap: on
line source

Docker Swarm mode interface

import logging
import os.path
import subprocess
from functools import partial
from typing import Any, Dict, Optional

    import docker.types
except ImportError:
    from galaxy.util.bunch import Bunch
    docker = Bunch(types=Bunch(

from galaxy.containers.docker import (
from galaxy.containers.docker_decorators import docker_columns, docker_json
from galaxy.containers.docker_model import (
from galaxy.exceptions import ContainerRunError
from galaxy.util import unicodify
from galaxy.util.json import safe_dumps_formatted

log = logging.getLogger(__name__)

SWARM_MANAGER_PATH = os.path.abspath(

class DockerSwarmInterface(DockerInterface):

    container_class = DockerService
    conf_defaults: Dict[str, Optional[Any]] = {
        'ignore_volumes': False,
        'node_prefix': None,
        'service_create_image_constraint': False,
        'service_create_cpus_constraint': False,
        'resolve_image_digest': False,
        'managed': True,
        'manager_autostart': True,
    publish_port_list_required = True
    supports_volumes = False

    def validate_config(self):
        self._node_prefix = self._conf.node_prefix

    def run_in_container(self, command, image=None, **kwopts):
        """Run a service like a detached container
        kwopts['replicas'] = 1
        kwopts['restart_condition'] = 'none'
        if kwopts.get('publish_all_ports', False):
            # not supported for services
            # TODO: inspect image (or query registry if possible) for port list
            if kwopts.get('publish_port_random', False) or kwopts.get('ports', False):
                # assume this covers for publish_all_ports
                del kwopts['publish_all_ports']
                raise ContainerRunError(
                    "Publishing all ports is not supported in Docker swarm"
                    " mode, use `publish_port_random` or `ports`",
        if not kwopts.get('detach', True):
            raise ContainerRunError(
                "Running attached containers is not supported in Docker swarm mode",
        elif kwopts.get('detach', None):
            del kwopts['detach']
        if kwopts.get('volumes', None):
            if self._conf.ignore_volumes:
                    "'volumes' kwopt is set and not supported in Docker swarm "
                    "mode, volumes will not be passed (set 'ignore_volumes: "
                    "False' in containers config to fail instead): %s" % kwopts['volumes']
                raise ContainerRunError(
                    "'volumes' kwopt is set and not supported in Docker swarm "
                    "mode (set 'ignore_volumes: True' in containers config to "
                    "warn instead): %s" % kwopts['volumes'],
        # ensure the volumes key is removed from kwopts
        kwopts.pop('volumes', None)
        service = self.service_create(command, image=image, **kwopts)
        return service

    # helpers

    def _run_swarm_manager(self):
        if self._conf.managed and self._conf.manager_autostart:
                # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi
                subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file',
                                      self.containers_config_file, '--swarm', self.key])
            except subprocess.CalledProcessError as exc:
                log.error('Failed to launch swarm manager: %s', unicodify(exc))

    def _get_image(self, image):
        """Get the image string, either from the argument, or from the
        configured interface default if ``image`` is ``None``. Optionally
        resolve the image to its digest if ``resolve_image_digest`` is set in
        the interface configuration.

        If the image has not been pulled, the repo digest cannot be determined
        and the image name will be returned.

        :type   image:  str or None
        :param  image:  image id or name

        :returns:       image name or image repo digest
        if not image:
            image = self._conf.image
        assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service"
        if self._conf.resolve_image_digest:
            image = self.image_repodigest(image)
        return image

    def _objects_by_attribute(self, generator, attribute_name):
        rval = {}
        for obj in generator:
            attr = getattr(obj, attribute_name)
            if attr not in rval:
                rval[attr] = []
        return rval

    # docker object generators

    def services(self, id=None, name=None):
        for service_dict in self.service_ls(id=id, name=name):
            service_id = service_dict['ID']
            service = DockerService(self, service_id, inspect=service_dict)
                yield service

    def service(self, id=None, name=None):
            return next(, name=name))
        except StopIteration:
            return None

    def services_in_state(self, desired, current, tasks='any'):
        for service in
            if service.in_state(desired, current, tasks=tasks):
                yield service

    def service_tasks(self, service):
        for task_dict in self.service_ps(
            yield DockerTask.from_api(self, task_dict, service=service)

    def nodes(self, id=None, name=None):
        for node_dict in self.node_ls(id=id, name=name):
            node_id = node_dict['ID']
            node = DockerNode(self, node_id, inspect=node_dict)
            if self._node_prefix and not
            yield node

    def node(self, id=None, name=None):
            return next(self.nodes(id=id, name=name))
        except StopIteration:
            return None

    def nodes_in_state(self, status, availability):
        for node in self.nodes():
            if node.in_state(status, availability):
                yield node

    def node_tasks(self, node):
        for task_dict in self.node_ps(
            yield DockerTask.from_api(self, task_dict, node=node)

    # higher level queries

    def services_waiting(self):
        return self.services_in_state('Running', 'Pending')

    def services_waiting_by_constraints(self):
        return self._objects_by_attribute(self.services_waiting(), 'constraints')

    def services_completed(self):
        return self.services_in_state('Shutdown', 'Complete', tasks='all')

    def services_terminal(self):
        return [s for s in if s.terminal]

    def nodes_active(self):
        return self.nodes_in_state('Ready', 'Active')

    def nodes_active_by_constraints(self):
        return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')

    # operations

    def services_clean(self):
        cleaned_service_ids = []
        completed_services = list(self.services_completed())  # returns a generator, should probably fix this
        if completed_services:
            cleaned_service_ids.extend(self.service_rm([ for x in completed_services]))
        terminal_services = list(self.services_terminal())
        for service in terminal_services:
            log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s',,, service.state)
        if terminal_services:
            cleaned_service_ids.extend(self.service_rm([ for x in terminal_services]))
        return filter(lambda x: in cleaned_service_ids, completed_services + terminal_services)

class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface):

    container_type = 'docker_swarm_cli'
    option_map = {
        # `service create` options
        'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'},
        'replicas': {'flag': '--replicas', 'type': 'string'},
        'restart_condition': {'flag': '--restart-condition', 'type': 'string'},
        'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
        'name': {'flag': '--name', 'type': 'string'},
        'publish_port_random': {'flag': '--publish', 'type': 'string'},
        'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'},
        'mem_limit': {'flag': '--limit-memory', 'type': 'string'},
        'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'},
        'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'},
        # `service update` options
        'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'},
        'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'},
        'availability': {'flag': '--availability', 'type': 'string'},

    # docker object generators

    def services(self, id=None, name=None):
        for service_dict in self.service_ls(id=id, name=name):
            service_id = service_dict['ID']
            service_name = service_dict['NAME']
            if not service_name.startswith(self._name_prefix):
            task_list = self.service_ps(service_id)
            yield DockerService.from_cli(self, service_dict, task_list)

    def service_tasks(self, service):
        for task_dict in self.service_ps(
            if task_dict['NAME'].strip().startswith(r'\_'):
                continue    # historical task
            yield DockerTask.from_cli(self, task_dict, service=service)

    def nodes(self, id=None, name=None):
        for node_dict in self.node_ls(id=id, name=name):
            node_id = node_dict['ID'].strip(' *')
            node_name = node_dict['HOSTNAME']
            if self._node_prefix and not node_name.startswith(self._node_prefix):
            task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id))
            yield DockerNode.from_cli(self, node_dict, task_list)

    # docker subcommands

    def service_create(self, command, image=None, **kwopts):
        if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts:
            kwopts['constraint'] = []
        image = self._get_image(image)
        if self._conf.service_create_image_constraint:
            kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image))
        if self._conf.service_create_cpus_constraint:
            cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
            kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus))
        if self._conf.cpus:
            kwopts['cpu_limit'] = self._conf.cpus
            kwopts['cpu_reservation'] = self._conf.cpus
        if self._conf.memory:
            kwopts['mem_limit'] = self._conf.memory
            kwopts['mem_reservation'] = self._conf.memory
        args = '{kwopts} {image} {command}'.format(
            image=image if image else '',
            command=command if command else '',
        service_id = self._run_docker(subcommand='service create', args=args, verbose=True)
        return DockerService.from_id(self, service_id)

    def service_inspect(self, service_id):
        return self._run_docker(subcommand='service inspect', args=service_id)[0]

    def service_ls(self, id=None, name=None):
        return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))

    def service_ps(self, service_id):
        return self._run_docker(subcommand='service ps', args=f'--no-trunc {service_id}')

    def service_rm(self, service_ids):
        service_ids = ' '.join(service_ids)
        return self._run_docker(subcommand='service rm', args=service_ids).splitlines()

    def node_inspect(self, node_id):
        return self._run_docker(subcommand='node inspect', args=node_id)[0]

    def node_ls(self, id=None, name=None):
        return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))

    def node_ps(self, node_id):
        return self._run_docker(subcommand='node ps', args=f'--no-trunc {node_id}')

    def node_update(self, node_id, **kwopts):
        return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format(

    def task_inspect(self, task_id):
        return self._run_docker(subcommand="inspect", args=task_id)

class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface):

    container_type = 'docker_swarm'
    placement_option_map = {
        'constraint': {'param': 'constraints'},
    service_mode_option_map = {
        'service_mode': {'param': 0, 'default': 'replicated'},
        'replicas': {'default': 1},
    endpoint_spec_option_map: Dict[str, Dict] = {
        'ports': {},
    resources_option_map = {
        'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)},
        'memory': {'params': ('mem_limit', 'mem_reservation')},
    container_spec_option_map = {
        'image': {'param': 0},
        'command': {},
        'environment': {'param': 'env'},
        'labels': {},
    restart_policy_option_map = {
        'restart_condition': {'param': 'condition', 'default': 'none'},
        'restart_delay': {'param': 'delay'},
        'restart_max_attempts': {'param': 'max_attemps'},
    task_template_option_map = {
        '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True},
        '_resources': {'spec_class': docker.types.Resources},
        '_restart_policy': {'spec_class': docker.types.RestartPolicy},
        '_placement': {'spec_class': docker.types.Placement},
    node_spec_option_map = {
        'availability': {'param': 'Availability'},
        'name': {'param': 'Name'},
        'role': {'param': 'Role'},
        'labels': {'param': 'Labels'},

    def create_random_port_spec(port):
        return {
            'Protocol': 'tcp',
            'PublishedPort': None,
            'TargetPort': port,

    # docker subcommands

    def service_create(self, command, image=None, **kwopts):
        # TODO: some of this should probably move to run_in_container when the CLI interface is removed
        log.debug("Creating docker service with image '%s' for command: %s", image, command)
        # insert run kwopts from config
        for opt in self.conf_run_kwopts:
            if self._conf[opt]:
                kwopts[opt] = self._conf[opt]
        # image is part of the container spec
        kwopts['image'] = self._get_image(image)
        # service constraints
        kwopts['constraint'] = kwopts.get('constraint', [])
        if self._conf.service_create_image_constraint:
            kwopts['constraint'].append(IMAGE_CONSTRAINT + '==' + image)
        if self._conf.service_create_cpus_constraint:
            cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
            kwopts['constraint'].append(CPUS_CONSTRAINT + '==' + cpus)
        # ports
        if 'publish_port_random' in kwopts:
            kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))]
        # create specs
        service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts)
        endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts)
        task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts)
        log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template))
        log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec))
        log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode))
        log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts))
        success_test = partial(self._first, self.service_ls, name=kwopts['name'])
        # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
        service = self._client.create_service(
        service_id = service.get('ID')
        log.debug('Created service: %s (%s)', kwopts['name'], service_id)
        return DockerService.from_id(self, service_id)

    def service_inspect(self, service_id):
        return self._client.inspect_service(service_id)

    def service_ls(self, id=None, name=None):
        return, name))

    # roughly `docker service ps`
    def service_ps(self, service_id):
        return self.task_ls(filters={'service': service_id})

    def service_rm(self, service_ids):
        r = []
        for service_id in service_ids:
        return r

    def node_inspect(self, node_id):
        return self._client.inspect_node(node_id)

    def node_ls(self, id=None, name=None):
        return self._client.nodes(filters=self._filter_by_id_or_name(id, name))

    # roughly `docker node ps`
    def node_ps(self, node_id):
        return self.task_ls(filters={'node': node_id})

    def node_update(self, node_id, **kwopts):
        node = DockerNode.from_id(self, node_id)
        spec = node.inspect['Spec']
        if 'label_add' in kwopts:
            kwopts['labels'] = spec.get('Labels', {})
        spec.update(self._create_docker_api_spec('node_spec', dict, kwopts))
        return self._client.update_node(, node.version, node_spec=spec)

    def task_inspect(self, task_id):
        return self._client.inspect_task(task_id)

    def task_ls(self, filters=None):
        return self._client.tasks(filters=filters)