Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/galaxy/containers/docker_swarm.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/galaxy/containers/docker_swarm.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,498 +0,0 @@ -""" -Docker Swarm mode interface -""" -from __future__ import absolute_import - -import logging -import os.path -import subprocess -from functools import partial - -try: - import docker.types -except ImportError: - from galaxy.util.bunch import Bunch - docker = Bunch(types=Bunch( - ContainerSpec=None, - RestartPolicy=None, - Resources=None, - Placement=None, - )) - -from galaxy.containers.docker import ( - DockerAPIInterface, - DockerCLIInterface, - DockerInterface -) -from galaxy.containers.docker_decorators import docker_columns, docker_json -from galaxy.containers.docker_model import ( - CPUS_CONSTRAINT, - DockerNode, - DockerService, - DockerTask, - IMAGE_CONSTRAINT -) -from galaxy.exceptions import ContainerRunError -from galaxy.util import unicodify -from galaxy.util.json import safe_dumps_formatted - -log = logging.getLogger(__name__) - -SWARM_MANAGER_PATH = os.path.abspath( - os.path.join( - os.path.dirname(__file__), - os.path.pardir, - os.path.pardir, - os.path.pardir, - 'scripts', - 'docker_swarm_manager.py')) - - -class DockerSwarmInterface(DockerInterface): - - container_class = DockerService - conf_defaults = { - 'ignore_volumes': False, - 'node_prefix': None, - 'service_create_image_constraint': False, - 'service_create_cpus_constraint': False, - 'resolve_image_digest': False, - 'managed': True, - 'manager_autostart': True, - } - publish_port_list_required = True - supports_volumes = False - - def validate_config(self): - super(DockerSwarmInterface, self).validate_config() - self._node_prefix = self._conf.node_prefix - - def run_in_container(self, command, image=None, **kwopts): - """Run a service like a detached container - """ - kwopts['replicas'] = 1 - kwopts['restart_condition'] = 'none' - if kwopts.get('publish_all_ports', False): - # not supported for services - # TODO: inspect image (or query registry if possible) for port list - if kwopts.get('publish_port_random', False) or kwopts.get('ports', False): - # assume this covers for publish_all_ports - del kwopts['publish_all_ports'] - else: - raise ContainerRunError( - "Publishing all ports is not supported in Docker swarm" - " mode, use `publish_port_random` or `ports`", - image=image, - command=command - ) - if not kwopts.get('detach', True): - raise ContainerRunError( - "Running attached containers is not supported in Docker swarm mode", - image=image, - command=command - ) - elif kwopts.get('detach', None): - del kwopts['detach'] - if kwopts.get('volumes', None): - if self._conf.ignore_volumes: - log.warning( - "'volumes' kwopt is set and not supported in Docker swarm " - "mode, volumes will not be passed (set 'ignore_volumes: " - "False' in containers config to fail instead): %s" % kwopts['volumes'] - ) - else: - raise ContainerRunError( - "'volumes' kwopt is set and not supported in Docker swarm " - "mode (set 'ignore_volumes: True' in containers config to " - "warn instead): %s" % kwopts['volumes'], - image=image, - command=command - ) - # ensure the volumes key is removed from kwopts - kwopts.pop('volumes', None) - service = self.service_create(command, image=image, **kwopts) - self._run_swarm_manager() - return service - - # - # helpers - # - - def _run_swarm_manager(self): - if self._conf.managed and self._conf.manager_autostart: - try: - # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi - subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file', - self.containers_config_file, '--swarm', self.key]) - except subprocess.CalledProcessError as exc: - log.error('Failed to launch swarm manager: %s', unicodify(exc)) - - def _get_image(self, image): - """Get the image string, either from the argument, or from the - configured interface default if ``image`` is ``None``. Optionally - resolve the image to its digest if ``resolve_image_digest`` is set in - the interface configuration. - - If the image has not been pulled, the repo digest cannot be determined - and the image name will be returned. - - :type image: str or None - :param image: image id or name - - :returns: image name or image repo digest - """ - if not image: - image = self._conf.image - assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service" - if self._conf.resolve_image_digest: - image = self.image_repodigest(image) - return image - - def _objects_by_attribute(self, generator, attribute_name): - rval = {} - for obj in generator: - attr = getattr(obj, attribute_name) - if attr not in rval: - rval[attr] = [] - rval[attr].append(obj) - return rval - - # - # docker object generators - # - - def services(self, id=None, name=None): - for service_dict in self.service_ls(id=id, name=name): - service_id = service_dict['ID'] - service = DockerService(self, service_id, inspect=service_dict) - if service.name.startswith(self._name_prefix): - yield service - - def service(self, id=None, name=None): - try: - return self.services(id=id, name=name).next() - except StopIteration: - return None - - def services_in_state(self, desired, current, tasks='any'): - for service in self.services(): - if service.in_state(desired, current, tasks=tasks): - yield service - - def service_tasks(self, service): - for task_dict in self.service_ps(service.id): - yield DockerTask.from_api(self, task_dict, service=service) - - def nodes(self, id=None, name=None): - for node_dict in self.node_ls(id=id, name=name): - node_id = node_dict['ID'] - node = DockerNode(self, node_id, inspect=node_dict) - if self._node_prefix and not node.name.startswith(self._node_prefix): - continue - yield node - - def node(self, id=None, name=None): - try: - return self.nodes(id=id, name=name).next() - except StopIteration: - return None - - def nodes_in_state(self, status, availability): - for node in self.nodes(): - if node.in_state(status, availability): - yield node - - def node_tasks(self, node): - for task_dict in self.node_ps(node.id): - yield DockerTask.from_api(self, task_dict, node=node) - - # - # higher level queries - # - - def services_waiting(self): - return self.services_in_state('Running', 'Pending') - - def services_waiting_by_constraints(self): - return self._objects_by_attribute(self.services_waiting(), 'constraints') - - def services_completed(self): - return self.services_in_state('Shutdown', 'Complete', tasks='all') - - def services_terminal(self): - return [s for s in self.services() if s.terminal] - - def nodes_active(self): - return self.nodes_in_state('Ready', 'Active') - - def nodes_active_by_constraints(self): - return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints') - - # - # operations - # - - def services_clean(self): - cleaned_service_ids = [] - completed_services = list(self.services_completed()) # returns a generator, should probably fix this - if completed_services: - cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services])) - terminal_services = list(self.services_terminal()) - for service in terminal_services: - log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state) - if terminal_services: - cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services])) - return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services) - - -class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface): - - container_type = 'docker_swarm_cli' - option_map = { - # `service create` options - 'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'}, - 'replicas': {'flag': '--replicas', 'type': 'string'}, - 'restart_condition': {'flag': '--restart-condition', 'type': 'string'}, - 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'}, - 'name': {'flag': '--name', 'type': 'string'}, - 'publish_port_random': {'flag': '--publish', 'type': 'string'}, - 'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'}, - 'mem_limit': {'flag': '--limit-memory', 'type': 'string'}, - 'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'}, - 'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'}, - # `service update` options - 'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'}, - 'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'}, - 'availability': {'flag': '--availability', 'type': 'string'}, - } - - # - # docker object generators - # - - def services(self, id=None, name=None): - for service_dict in self.service_ls(id=id, name=name): - service_id = service_dict['ID'] - service_name = service_dict['NAME'] - if not service_name.startswith(self._name_prefix): - continue - task_list = self.service_ps(service_id) - yield DockerService.from_cli(self, service_dict, task_list) - - def service_tasks(self, service): - for task_dict in self.service_ps(service.id): - if task_dict['NAME'].strip().startswith(r'\_'): - continue # historical task - yield DockerTask.from_cli(self, task_dict, service=service) - - def nodes(self, id=None, name=None): - for node_dict in self.node_ls(id=id, name=name): - node_id = node_dict['ID'].strip(' *') - node_name = node_dict['HOSTNAME'] - if self._node_prefix and not node_name.startswith(self._node_prefix): - continue - task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id)) - yield DockerNode.from_cli(self, node_dict, task_list) - - # - # docker subcommands - # - - def service_create(self, command, image=None, **kwopts): - if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts: - kwopts['constraint'] = [] - image = self._get_image(image) - if self._conf.service_create_image_constraint: - kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image)) - if self._conf.service_create_cpus_constraint: - cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) - kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus)) - if self._conf.cpus: - kwopts['cpu_limit'] = self._conf.cpus - kwopts['cpu_reservation'] = self._conf.cpus - if self._conf.memory: - kwopts['mem_limit'] = self._conf.memory - kwopts['mem_reservation'] = self._conf.memory - self.set_kwopts_name(kwopts) - args = '{kwopts} {image} {command}'.format( - kwopts=self._stringify_kwopts(kwopts), - image=image if image else '', - command=command if command else '', - ).strip() - service_id = self._run_docker(subcommand='service create', args=args, verbose=True) - return DockerService.from_id(self, service_id) - - @docker_json - def service_inspect(self, service_id): - return self._run_docker(subcommand='service inspect', args=service_id)[0] - - @docker_columns - def service_ls(self, id=None, name=None): - return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name)) - - @docker_columns - def service_ps(self, service_id): - return self._run_docker(subcommand='service ps', args='--no-trunc {}'.format(service_id)) - - def service_rm(self, service_ids): - service_ids = ' '.join(service_ids) - return self._run_docker(subcommand='service rm', args=service_ids).splitlines() - - @docker_json - def node_inspect(self, node_id): - return self._run_docker(subcommand='node inspect', args=node_id)[0] - - @docker_columns - def node_ls(self, id=None, name=None): - return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name)) - - @docker_columns - def node_ps(self, node_id): - return self._run_docker(subcommand='node ps', args='--no-trunc {}'.format(node_id)) - - def node_update(self, node_id, **kwopts): - return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format( - kwopts=self._stringify_kwopts(kwopts), - node_id=node_id - )) - - @docker_json - def task_inspect(self, task_id): - return self._run_docker(subcommand="inspect", args=task_id) - - -class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface): - - container_type = 'docker_swarm' - placement_option_map = { - 'constraint': {'param': 'constraints'}, - } - service_mode_option_map = { - 'service_mode': {'param': 0, 'default': 'replicated'}, - 'replicas': {'default': 1}, - } - endpoint_spec_option_map = { - 'ports': {}, - } - resources_option_map = { - 'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)}, - 'memory': {'params': ('mem_limit', 'mem_reservation')}, - } - container_spec_option_map = { - 'image': {'param': 0}, - 'command': {}, - 'environment': {'param': 'env'}, - 'labels': {}, - } - restart_policy_option_map = { - 'restart_condition': {'param': 'condition', 'default': 'none'}, - 'restart_delay': {'param': 'delay'}, - 'restart_max_attempts': {'param': 'max_attemps'}, - } - task_template_option_map = { - '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True}, - '_resources': {'spec_class': docker.types.Resources}, - '_restart_policy': {'spec_class': docker.types.RestartPolicy}, - '_placement': {'spec_class': docker.types.Placement}, - } - node_spec_option_map = { - 'availability': {'param': 'Availability'}, - 'name': {'param': 'Name'}, - 'role': {'param': 'Role'}, - 'labels': {'param': 'Labels'}, - } - - @staticmethod - def create_random_port_spec(port): - return { - 'Protocol': 'tcp', - 'PublishedPort': None, - 'TargetPort': port, - } - - # - # docker subcommands - # - - def service_create(self, command, image=None, **kwopts): - # TODO: some of this should probably move to run_in_container when the CLI interface is removed - log.debug("Creating docker service with image '%s' for command: %s", image, command) - # insert run kwopts from config - for opt in self.conf_run_kwopts: - if self._conf[opt]: - kwopts[opt] = self._conf[opt] - # image is part of the container spec - kwopts['image'] = self._get_image(image) - # service constraints - kwopts['constraint'] = kwopts.get('constraint', []) - if self._conf.service_create_image_constraint: - kwopts['constraint'].append((IMAGE_CONSTRAINT + '==' + image)) - if self._conf.service_create_cpus_constraint: - cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) - kwopts['constraint'].append((CPUS_CONSTRAINT + '==' + cpus)) - # ports - if 'publish_port_random' in kwopts: - kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))] - # create specs - service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts) - endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts) - task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts) - self.set_kwopts_name(kwopts) - log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template)) - log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec)) - log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode)) - log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts)) - success_test = partial(self._first, self.service_ls, name=kwopts['name']) - # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class - service = self._client.create_service( - task_template, - mode=service_mode, - endpoint_spec=endpoint_spec, - success_test=success_test, - max_tries=5, - **kwopts) - service_id = service.get('ID') - log.debug('Created service: %s (%s)', kwopts['name'], service_id) - return DockerService.from_id(self, service_id) - - def service_inspect(self, service_id): - return self._client.inspect_service(service_id) - - def service_ls(self, id=None, name=None): - return self._client.services(filters=self._filter_by_id_or_name(id, name)) - - # roughly `docker service ps` - def service_ps(self, service_id): - return self.task_ls(filters={'service': service_id}) - - def service_rm(self, service_ids): - r = [] - for service_id in service_ids: - self._client.remove_service(service_id) - r.append(service_id) - return r - - def node_inspect(self, node_id): - return self._client.inspect_node(node_id) - - def node_ls(self, id=None, name=None): - return self._client.nodes(filters=self._filter_by_id_or_name(id, name)) - - # roughly `docker node ps` - def node_ps(self, node_id): - return self.task_ls(filters={'node': node_id}) - - def node_update(self, node_id, **kwopts): - node = DockerNode.from_id(self, node_id) - spec = node.inspect['Spec'] - if 'label_add' in kwopts: - kwopts['labels'] = spec.get('Labels', {}) - kwopts['labels'].update(kwopts.pop('label_add')) - spec.update(self._create_docker_api_spec('node_spec', dict, kwopts)) - return self._client.update_node(node.id, node.version, node_spec=spec) - - def task_inspect(self, task_id): - return self._client.inspect_task(task_id) - - def task_ls(self, filters=None): - return self._client.tasks(filters=filters)
