comparison env/lib/python3.9/site-packages/galaxy/containers/docker_swarm.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """
2 Docker Swarm mode interface
3 """
4
5 import logging
6 import os.path
7 import subprocess
8 from functools import partial
9 from typing import Any, Dict, Optional
10
11 try:
12 import docker.types
13 except ImportError:
14 from galaxy.util.bunch import Bunch
15 docker = Bunch(types=Bunch(
16 ContainerSpec=None,
17 RestartPolicy=None,
18 Resources=None,
19 Placement=None,
20 ))
21
22 from galaxy.containers.docker import (
23 DockerAPIInterface,
24 DockerCLIInterface,
25 DockerInterface
26 )
27 from galaxy.containers.docker_decorators import docker_columns, docker_json
28 from galaxy.containers.docker_model import (
29 CPUS_CONSTRAINT,
30 DockerNode,
31 DockerService,
32 DockerTask,
33 IMAGE_CONSTRAINT
34 )
35 from galaxy.exceptions import ContainerRunError
36 from galaxy.util import unicodify
37 from galaxy.util.json import safe_dumps_formatted
38
39 log = logging.getLogger(__name__)
40
41 SWARM_MANAGER_PATH = os.path.abspath(
42 os.path.join(
43 os.path.dirname(__file__),
44 os.path.pardir,
45 os.path.pardir,
46 os.path.pardir,
47 'scripts',
48 'docker_swarm_manager.py'))
49
50
51 class DockerSwarmInterface(DockerInterface):
52
53 container_class = DockerService
54 conf_defaults: Dict[str, Optional[Any]] = {
55 'ignore_volumes': False,
56 'node_prefix': None,
57 'service_create_image_constraint': False,
58 'service_create_cpus_constraint': False,
59 'resolve_image_digest': False,
60 'managed': True,
61 'manager_autostart': True,
62 }
63 publish_port_list_required = True
64 supports_volumes = False
65
66 def validate_config(self):
67 super().validate_config()
68 self._node_prefix = self._conf.node_prefix
69
70 def run_in_container(self, command, image=None, **kwopts):
71 """Run a service like a detached container
72 """
73 kwopts['replicas'] = 1
74 kwopts['restart_condition'] = 'none'
75 if kwopts.get('publish_all_ports', False):
76 # not supported for services
77 # TODO: inspect image (or query registry if possible) for port list
78 if kwopts.get('publish_port_random', False) or kwopts.get('ports', False):
79 # assume this covers for publish_all_ports
80 del kwopts['publish_all_ports']
81 else:
82 raise ContainerRunError(
83 "Publishing all ports is not supported in Docker swarm"
84 " mode, use `publish_port_random` or `ports`",
85 image=image,
86 command=command
87 )
88 if not kwopts.get('detach', True):
89 raise ContainerRunError(
90 "Running attached containers is not supported in Docker swarm mode",
91 image=image,
92 command=command
93 )
94 elif kwopts.get('detach', None):
95 del kwopts['detach']
96 if kwopts.get('volumes', None):
97 if self._conf.ignore_volumes:
98 log.warning(
99 "'volumes' kwopt is set and not supported in Docker swarm "
100 "mode, volumes will not be passed (set 'ignore_volumes: "
101 "False' in containers config to fail instead): %s" % kwopts['volumes']
102 )
103 else:
104 raise ContainerRunError(
105 "'volumes' kwopt is set and not supported in Docker swarm "
106 "mode (set 'ignore_volumes: True' in containers config to "
107 "warn instead): %s" % kwopts['volumes'],
108 image=image,
109 command=command
110 )
111 # ensure the volumes key is removed from kwopts
112 kwopts.pop('volumes', None)
113 service = self.service_create(command, image=image, **kwopts)
114 self._run_swarm_manager()
115 return service
116
117 #
118 # helpers
119 #
120
121 def _run_swarm_manager(self):
122 if self._conf.managed and self._conf.manager_autostart:
123 try:
124 # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi
125 subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file',
126 self.containers_config_file, '--swarm', self.key])
127 except subprocess.CalledProcessError as exc:
128 log.error('Failed to launch swarm manager: %s', unicodify(exc))
129
130 def _get_image(self, image):
131 """Get the image string, either from the argument, or from the
132 configured interface default if ``image`` is ``None``. Optionally
133 resolve the image to its digest if ``resolve_image_digest`` is set in
134 the interface configuration.
135
136 If the image has not been pulled, the repo digest cannot be determined
137 and the image name will be returned.
138
139 :type image: str or None
140 :param image: image id or name
141
142 :returns: image name or image repo digest
143 """
144 if not image:
145 image = self._conf.image
146 assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service"
147 if self._conf.resolve_image_digest:
148 image = self.image_repodigest(image)
149 return image
150
151 def _objects_by_attribute(self, generator, attribute_name):
152 rval = {}
153 for obj in generator:
154 attr = getattr(obj, attribute_name)
155 if attr not in rval:
156 rval[attr] = []
157 rval[attr].append(obj)
158 return rval
159
160 #
161 # docker object generators
162 #
163
164 def services(self, id=None, name=None):
165 for service_dict in self.service_ls(id=id, name=name):
166 service_id = service_dict['ID']
167 service = DockerService(self, service_id, inspect=service_dict)
168 if service.name.startswith(self._name_prefix):
169 yield service
170
171 def service(self, id=None, name=None):
172 try:
173 return next(self.services(id=id, name=name))
174 except StopIteration:
175 return None
176
177 def services_in_state(self, desired, current, tasks='any'):
178 for service in self.services():
179 if service.in_state(desired, current, tasks=tasks):
180 yield service
181
182 def service_tasks(self, service):
183 for task_dict in self.service_ps(service.id):
184 yield DockerTask.from_api(self, task_dict, service=service)
185
186 def nodes(self, id=None, name=None):
187 for node_dict in self.node_ls(id=id, name=name):
188 node_id = node_dict['ID']
189 node = DockerNode(self, node_id, inspect=node_dict)
190 if self._node_prefix and not node.name.startswith(self._node_prefix):
191 continue
192 yield node
193
194 def node(self, id=None, name=None):
195 try:
196 return next(self.nodes(id=id, name=name))
197 except StopIteration:
198 return None
199
200 def nodes_in_state(self, status, availability):
201 for node in self.nodes():
202 if node.in_state(status, availability):
203 yield node
204
205 def node_tasks(self, node):
206 for task_dict in self.node_ps(node.id):
207 yield DockerTask.from_api(self, task_dict, node=node)
208
209 #
210 # higher level queries
211 #
212
213 def services_waiting(self):
214 return self.services_in_state('Running', 'Pending')
215
216 def services_waiting_by_constraints(self):
217 return self._objects_by_attribute(self.services_waiting(), 'constraints')
218
219 def services_completed(self):
220 return self.services_in_state('Shutdown', 'Complete', tasks='all')
221
222 def services_terminal(self):
223 return [s for s in self.services() if s.terminal]
224
225 def nodes_active(self):
226 return self.nodes_in_state('Ready', 'Active')
227
228 def nodes_active_by_constraints(self):
229 return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints')
230
231 #
232 # operations
233 #
234
235 def services_clean(self):
236 cleaned_service_ids = []
237 completed_services = list(self.services_completed()) # returns a generator, should probably fix this
238 if completed_services:
239 cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services]))
240 terminal_services = list(self.services_terminal())
241 for service in terminal_services:
242 log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state)
243 if terminal_services:
244 cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services]))
245 return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services)
246
247
248 class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface):
249
250 container_type = 'docker_swarm_cli'
251 option_map = {
252 # `service create` options
253 'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'},
254 'replicas': {'flag': '--replicas', 'type': 'string'},
255 'restart_condition': {'flag': '--restart-condition', 'type': 'string'},
256 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'},
257 'name': {'flag': '--name', 'type': 'string'},
258 'publish_port_random': {'flag': '--publish', 'type': 'string'},
259 'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'},
260 'mem_limit': {'flag': '--limit-memory', 'type': 'string'},
261 'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'},
262 'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'},
263 # `service update` options
264 'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'},
265 'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'},
266 'availability': {'flag': '--availability', 'type': 'string'},
267 }
268
269 #
270 # docker object generators
271 #
272
273 def services(self, id=None, name=None):
274 for service_dict in self.service_ls(id=id, name=name):
275 service_id = service_dict['ID']
276 service_name = service_dict['NAME']
277 if not service_name.startswith(self._name_prefix):
278 continue
279 task_list = self.service_ps(service_id)
280 yield DockerService.from_cli(self, service_dict, task_list)
281
282 def service_tasks(self, service):
283 for task_dict in self.service_ps(service.id):
284 if task_dict['NAME'].strip().startswith(r'\_'):
285 continue # historical task
286 yield DockerTask.from_cli(self, task_dict, service=service)
287
288 def nodes(self, id=None, name=None):
289 for node_dict in self.node_ls(id=id, name=name):
290 node_id = node_dict['ID'].strip(' *')
291 node_name = node_dict['HOSTNAME']
292 if self._node_prefix and not node_name.startswith(self._node_prefix):
293 continue
294 task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id))
295 yield DockerNode.from_cli(self, node_dict, task_list)
296
297 #
298 # docker subcommands
299 #
300
301 def service_create(self, command, image=None, **kwopts):
302 if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts:
303 kwopts['constraint'] = []
304 image = self._get_image(image)
305 if self._conf.service_create_image_constraint:
306 kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image))
307 if self._conf.service_create_cpus_constraint:
308 cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
309 kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus))
310 if self._conf.cpus:
311 kwopts['cpu_limit'] = self._conf.cpus
312 kwopts['cpu_reservation'] = self._conf.cpus
313 if self._conf.memory:
314 kwopts['mem_limit'] = self._conf.memory
315 kwopts['mem_reservation'] = self._conf.memory
316 self.set_kwopts_name(kwopts)
317 args = '{kwopts} {image} {command}'.format(
318 kwopts=self._stringify_kwopts(kwopts),
319 image=image if image else '',
320 command=command if command else '',
321 ).strip()
322 service_id = self._run_docker(subcommand='service create', args=args, verbose=True)
323 return DockerService.from_id(self, service_id)
324
325 @docker_json
326 def service_inspect(self, service_id):
327 return self._run_docker(subcommand='service inspect', args=service_id)[0]
328
329 @docker_columns
330 def service_ls(self, id=None, name=None):
331 return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name))
332
333 @docker_columns
334 def service_ps(self, service_id):
335 return self._run_docker(subcommand='service ps', args=f'--no-trunc {service_id}')
336
337 def service_rm(self, service_ids):
338 service_ids = ' '.join(service_ids)
339 return self._run_docker(subcommand='service rm', args=service_ids).splitlines()
340
341 @docker_json
342 def node_inspect(self, node_id):
343 return self._run_docker(subcommand='node inspect', args=node_id)[0]
344
345 @docker_columns
346 def node_ls(self, id=None, name=None):
347 return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name))
348
349 @docker_columns
350 def node_ps(self, node_id):
351 return self._run_docker(subcommand='node ps', args=f'--no-trunc {node_id}')
352
353 def node_update(self, node_id, **kwopts):
354 return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format(
355 kwopts=self._stringify_kwopts(kwopts),
356 node_id=node_id
357 ))
358
359 @docker_json
360 def task_inspect(self, task_id):
361 return self._run_docker(subcommand="inspect", args=task_id)
362
363
364 class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface):
365
366 container_type = 'docker_swarm'
367 placement_option_map = {
368 'constraint': {'param': 'constraints'},
369 }
370 service_mode_option_map = {
371 'service_mode': {'param': 0, 'default': 'replicated'},
372 'replicas': {'default': 1},
373 }
374 endpoint_spec_option_map: Dict[str, Dict] = {
375 'ports': {},
376 }
377 resources_option_map = {
378 'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)},
379 'memory': {'params': ('mem_limit', 'mem_reservation')},
380 }
381 container_spec_option_map = {
382 'image': {'param': 0},
383 'command': {},
384 'environment': {'param': 'env'},
385 'labels': {},
386 }
387 restart_policy_option_map = {
388 'restart_condition': {'param': 'condition', 'default': 'none'},
389 'restart_delay': {'param': 'delay'},
390 'restart_max_attempts': {'param': 'max_attemps'},
391 }
392 task_template_option_map = {
393 '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True},
394 '_resources': {'spec_class': docker.types.Resources},
395 '_restart_policy': {'spec_class': docker.types.RestartPolicy},
396 '_placement': {'spec_class': docker.types.Placement},
397 }
398 node_spec_option_map = {
399 'availability': {'param': 'Availability'},
400 'name': {'param': 'Name'},
401 'role': {'param': 'Role'},
402 'labels': {'param': 'Labels'},
403 }
404
405 @staticmethod
406 def create_random_port_spec(port):
407 return {
408 'Protocol': 'tcp',
409 'PublishedPort': None,
410 'TargetPort': port,
411 }
412
413 #
414 # docker subcommands
415 #
416
417 def service_create(self, command, image=None, **kwopts):
418 # TODO: some of this should probably move to run_in_container when the CLI interface is removed
419 log.debug("Creating docker service with image '%s' for command: %s", image, command)
420 # insert run kwopts from config
421 for opt in self.conf_run_kwopts:
422 if self._conf[opt]:
423 kwopts[opt] = self._conf[opt]
424 # image is part of the container spec
425 kwopts['image'] = self._get_image(image)
426 # service constraints
427 kwopts['constraint'] = kwopts.get('constraint', [])
428 if self._conf.service_create_image_constraint:
429 kwopts['constraint'].append(IMAGE_CONSTRAINT + '==' + image)
430 if self._conf.service_create_cpus_constraint:
431 cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1'))
432 kwopts['constraint'].append(CPUS_CONSTRAINT + '==' + cpus)
433 # ports
434 if 'publish_port_random' in kwopts:
435 kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))]
436 # create specs
437 service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts)
438 endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts)
439 task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts)
440 self.set_kwopts_name(kwopts)
441 log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template))
442 log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec))
443 log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode))
444 log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts))
445 success_test = partial(self._first, self.service_ls, name=kwopts['name'])
446 # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class
447 service = self._client.create_service(
448 task_template,
449 mode=service_mode,
450 endpoint_spec=endpoint_spec,
451 success_test=success_test,
452 max_tries=5,
453 **kwopts)
454 service_id = service.get('ID')
455 log.debug('Created service: %s (%s)', kwopts['name'], service_id)
456 return DockerService.from_id(self, service_id)
457
458 def service_inspect(self, service_id):
459 return self._client.inspect_service(service_id)
460
461 def service_ls(self, id=None, name=None):
462 return self._client.services(filters=self._filter_by_id_or_name(id, name))
463
464 # roughly `docker service ps`
465 def service_ps(self, service_id):
466 return self.task_ls(filters={'service': service_id})
467
468 def service_rm(self, service_ids):
469 r = []
470 for service_id in service_ids:
471 self._client.remove_service(service_id)
472 r.append(service_id)
473 return r
474
475 def node_inspect(self, node_id):
476 return self._client.inspect_node(node_id)
477
478 def node_ls(self, id=None, name=None):
479 return self._client.nodes(filters=self._filter_by_id_or_name(id, name))
480
481 # roughly `docker node ps`
482 def node_ps(self, node_id):
483 return self.task_ls(filters={'node': node_id})
484
485 def node_update(self, node_id, **kwopts):
486 node = DockerNode.from_id(self, node_id)
487 spec = node.inspect['Spec']
488 if 'label_add' in kwopts:
489 kwopts['labels'] = spec.get('Labels', {})
490 kwopts['labels'].update(kwopts.pop('label_add'))
491 spec.update(self._create_docker_api_spec('node_spec', dict, kwopts))
492 return self._client.update_node(node.id, node.version, node_spec=spec)
493
494 def task_inspect(self, task_id):
495 return self._client.inspect_task(task_id)
496
497 def task_ls(self, filters=None):
498 return self._client.tasks(filters=filters)