Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/galaxy/containers/docker_swarm.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 """ | |
| 2 Docker Swarm mode interface | |
| 3 """ | |
| 4 from __future__ import absolute_import | |
| 5 | |
| 6 import logging | |
| 7 import os.path | |
| 8 import subprocess | |
| 9 from functools import partial | |
| 10 | |
| 11 try: | |
| 12 import docker.types | |
| 13 except ImportError: | |
| 14 from galaxy.util.bunch import Bunch | |
| 15 docker = Bunch(types=Bunch( | |
| 16 ContainerSpec=None, | |
| 17 RestartPolicy=None, | |
| 18 Resources=None, | |
| 19 Placement=None, | |
| 20 )) | |
| 21 | |
| 22 from galaxy.containers.docker import ( | |
| 23 DockerAPIInterface, | |
| 24 DockerCLIInterface, | |
| 25 DockerInterface | |
| 26 ) | |
| 27 from galaxy.containers.docker_decorators import docker_columns, docker_json | |
| 28 from galaxy.containers.docker_model import ( | |
| 29 CPUS_CONSTRAINT, | |
| 30 DockerNode, | |
| 31 DockerService, | |
| 32 DockerTask, | |
| 33 IMAGE_CONSTRAINT | |
| 34 ) | |
| 35 from galaxy.exceptions import ContainerRunError | |
| 36 from galaxy.util import unicodify | |
| 37 from galaxy.util.json import safe_dumps_formatted | |
| 38 | |
| 39 log = logging.getLogger(__name__) | |
| 40 | |
| 41 SWARM_MANAGER_PATH = os.path.abspath( | |
| 42 os.path.join( | |
| 43 os.path.dirname(__file__), | |
| 44 os.path.pardir, | |
| 45 os.path.pardir, | |
| 46 os.path.pardir, | |
| 47 'scripts', | |
| 48 'docker_swarm_manager.py')) | |
| 49 | |
| 50 | |
| 51 class DockerSwarmInterface(DockerInterface): | |
| 52 | |
| 53 container_class = DockerService | |
| 54 conf_defaults = { | |
| 55 'ignore_volumes': False, | |
| 56 'node_prefix': None, | |
| 57 'service_create_image_constraint': False, | |
| 58 'service_create_cpus_constraint': False, | |
| 59 'resolve_image_digest': False, | |
| 60 'managed': True, | |
| 61 'manager_autostart': True, | |
| 62 } | |
| 63 publish_port_list_required = True | |
| 64 supports_volumes = False | |
| 65 | |
| 66 def validate_config(self): | |
| 67 super(DockerSwarmInterface, self).validate_config() | |
| 68 self._node_prefix = self._conf.node_prefix | |
| 69 | |
| 70 def run_in_container(self, command, image=None, **kwopts): | |
| 71 """Run a service like a detached container | |
| 72 """ | |
| 73 kwopts['replicas'] = 1 | |
| 74 kwopts['restart_condition'] = 'none' | |
| 75 if kwopts.get('publish_all_ports', False): | |
| 76 # not supported for services | |
| 77 # TODO: inspect image (or query registry if possible) for port list | |
| 78 if kwopts.get('publish_port_random', False) or kwopts.get('ports', False): | |
| 79 # assume this covers for publish_all_ports | |
| 80 del kwopts['publish_all_ports'] | |
| 81 else: | |
| 82 raise ContainerRunError( | |
| 83 "Publishing all ports is not supported in Docker swarm" | |
| 84 " mode, use `publish_port_random` or `ports`", | |
| 85 image=image, | |
| 86 command=command | |
| 87 ) | |
| 88 if not kwopts.get('detach', True): | |
| 89 raise ContainerRunError( | |
| 90 "Running attached containers is not supported in Docker swarm mode", | |
| 91 image=image, | |
| 92 command=command | |
| 93 ) | |
| 94 elif kwopts.get('detach', None): | |
| 95 del kwopts['detach'] | |
| 96 if kwopts.get('volumes', None): | |
| 97 if self._conf.ignore_volumes: | |
| 98 log.warning( | |
| 99 "'volumes' kwopt is set and not supported in Docker swarm " | |
| 100 "mode, volumes will not be passed (set 'ignore_volumes: " | |
| 101 "False' in containers config to fail instead): %s" % kwopts['volumes'] | |
| 102 ) | |
| 103 else: | |
| 104 raise ContainerRunError( | |
| 105 "'volumes' kwopt is set and not supported in Docker swarm " | |
| 106 "mode (set 'ignore_volumes: True' in containers config to " | |
| 107 "warn instead): %s" % kwopts['volumes'], | |
| 108 image=image, | |
| 109 command=command | |
| 110 ) | |
| 111 # ensure the volumes key is removed from kwopts | |
| 112 kwopts.pop('volumes', None) | |
| 113 service = self.service_create(command, image=image, **kwopts) | |
| 114 self._run_swarm_manager() | |
| 115 return service | |
| 116 | |
| 117 # | |
| 118 # helpers | |
| 119 # | |
| 120 | |
| 121 def _run_swarm_manager(self): | |
| 122 if self._conf.managed and self._conf.manager_autostart: | |
| 123 try: | |
| 124 # sys.exectuable would be preferable to using $PATH, but sys.executable is probably uwsgi | |
| 125 subprocess.check_call(['python', SWARM_MANAGER_PATH, '--containers-config-file', | |
| 126 self.containers_config_file, '--swarm', self.key]) | |
| 127 except subprocess.CalledProcessError as exc: | |
| 128 log.error('Failed to launch swarm manager: %s', unicodify(exc)) | |
| 129 | |
| 130 def _get_image(self, image): | |
| 131 """Get the image string, either from the argument, or from the | |
| 132 configured interface default if ``image`` is ``None``. Optionally | |
| 133 resolve the image to its digest if ``resolve_image_digest`` is set in | |
| 134 the interface configuration. | |
| 135 | |
| 136 If the image has not been pulled, the repo digest cannot be determined | |
| 137 and the image name will be returned. | |
| 138 | |
| 139 :type image: str or None | |
| 140 :param image: image id or name | |
| 141 | |
| 142 :returns: image name or image repo digest | |
| 143 """ | |
| 144 if not image: | |
| 145 image = self._conf.image | |
| 146 assert image is not None, "No image supplied as parameter and no image set as default in config, cannot create service" | |
| 147 if self._conf.resolve_image_digest: | |
| 148 image = self.image_repodigest(image) | |
| 149 return image | |
| 150 | |
| 151 def _objects_by_attribute(self, generator, attribute_name): | |
| 152 rval = {} | |
| 153 for obj in generator: | |
| 154 attr = getattr(obj, attribute_name) | |
| 155 if attr not in rval: | |
| 156 rval[attr] = [] | |
| 157 rval[attr].append(obj) | |
| 158 return rval | |
| 159 | |
| 160 # | |
| 161 # docker object generators | |
| 162 # | |
| 163 | |
| 164 def services(self, id=None, name=None): | |
| 165 for service_dict in self.service_ls(id=id, name=name): | |
| 166 service_id = service_dict['ID'] | |
| 167 service = DockerService(self, service_id, inspect=service_dict) | |
| 168 if service.name.startswith(self._name_prefix): | |
| 169 yield service | |
| 170 | |
| 171 def service(self, id=None, name=None): | |
| 172 try: | |
| 173 return self.services(id=id, name=name).next() | |
| 174 except StopIteration: | |
| 175 return None | |
| 176 | |
| 177 def services_in_state(self, desired, current, tasks='any'): | |
| 178 for service in self.services(): | |
| 179 if service.in_state(desired, current, tasks=tasks): | |
| 180 yield service | |
| 181 | |
| 182 def service_tasks(self, service): | |
| 183 for task_dict in self.service_ps(service.id): | |
| 184 yield DockerTask.from_api(self, task_dict, service=service) | |
| 185 | |
| 186 def nodes(self, id=None, name=None): | |
| 187 for node_dict in self.node_ls(id=id, name=name): | |
| 188 node_id = node_dict['ID'] | |
| 189 node = DockerNode(self, node_id, inspect=node_dict) | |
| 190 if self._node_prefix and not node.name.startswith(self._node_prefix): | |
| 191 continue | |
| 192 yield node | |
| 193 | |
| 194 def node(self, id=None, name=None): | |
| 195 try: | |
| 196 return self.nodes(id=id, name=name).next() | |
| 197 except StopIteration: | |
| 198 return None | |
| 199 | |
| 200 def nodes_in_state(self, status, availability): | |
| 201 for node in self.nodes(): | |
| 202 if node.in_state(status, availability): | |
| 203 yield node | |
| 204 | |
| 205 def node_tasks(self, node): | |
| 206 for task_dict in self.node_ps(node.id): | |
| 207 yield DockerTask.from_api(self, task_dict, node=node) | |
| 208 | |
| 209 # | |
| 210 # higher level queries | |
| 211 # | |
| 212 | |
| 213 def services_waiting(self): | |
| 214 return self.services_in_state('Running', 'Pending') | |
| 215 | |
| 216 def services_waiting_by_constraints(self): | |
| 217 return self._objects_by_attribute(self.services_waiting(), 'constraints') | |
| 218 | |
| 219 def services_completed(self): | |
| 220 return self.services_in_state('Shutdown', 'Complete', tasks='all') | |
| 221 | |
| 222 def services_terminal(self): | |
| 223 return [s for s in self.services() if s.terminal] | |
| 224 | |
| 225 def nodes_active(self): | |
| 226 return self.nodes_in_state('Ready', 'Active') | |
| 227 | |
| 228 def nodes_active_by_constraints(self): | |
| 229 return self._objects_by_attribute(self.nodes_active(), 'labels_as_constraints') | |
| 230 | |
| 231 # | |
| 232 # operations | |
| 233 # | |
| 234 | |
| 235 def services_clean(self): | |
| 236 cleaned_service_ids = [] | |
| 237 completed_services = list(self.services_completed()) # returns a generator, should probably fix this | |
| 238 if completed_services: | |
| 239 cleaned_service_ids.extend(self.service_rm([x.id for x in completed_services])) | |
| 240 terminal_services = list(self.services_terminal()) | |
| 241 for service in terminal_services: | |
| 242 log.warning('cleaned service in abnormal terminal state: %s (%s). state: %s', service.name, service.id, service.state) | |
| 243 if terminal_services: | |
| 244 cleaned_service_ids.extend(self.service_rm([x.id for x in terminal_services])) | |
| 245 return filter(lambda x: x.id in cleaned_service_ids, completed_services + terminal_services) | |
| 246 | |
| 247 | |
| 248 class DockerSwarmCLIInterface(DockerSwarmInterface, DockerCLIInterface): | |
| 249 | |
| 250 container_type = 'docker_swarm_cli' | |
| 251 option_map = { | |
| 252 # `service create` options | |
| 253 'constraint': {'flag': '--constraint', 'type': 'list_of_kovtrips'}, | |
| 254 'replicas': {'flag': '--replicas', 'type': 'string'}, | |
| 255 'restart_condition': {'flag': '--restart-condition', 'type': 'string'}, | |
| 256 'environment': {'flag': '--env', 'type': 'list_of_kvpairs'}, | |
| 257 'name': {'flag': '--name', 'type': 'string'}, | |
| 258 'publish_port_random': {'flag': '--publish', 'type': 'string'}, | |
| 259 'cpu_limit': {'flag': '--limit-cpu', 'type': 'string'}, | |
| 260 'mem_limit': {'flag': '--limit-memory', 'type': 'string'}, | |
| 261 'cpu_reservation': {'flag': '--reserve-cpu', 'type': 'string'}, | |
| 262 'mem_reservation': {'flag': '--reserve-memory', 'type': 'string'}, | |
| 263 # `service update` options | |
| 264 'label_add': {'flag': '--label-add', 'type': 'list_of_kvpairs'}, | |
| 265 'label_rm': {'flag': '--label-rm', 'type': 'list_of_kvpairs'}, | |
| 266 'availability': {'flag': '--availability', 'type': 'string'}, | |
| 267 } | |
| 268 | |
| 269 # | |
| 270 # docker object generators | |
| 271 # | |
| 272 | |
| 273 def services(self, id=None, name=None): | |
| 274 for service_dict in self.service_ls(id=id, name=name): | |
| 275 service_id = service_dict['ID'] | |
| 276 service_name = service_dict['NAME'] | |
| 277 if not service_name.startswith(self._name_prefix): | |
| 278 continue | |
| 279 task_list = self.service_ps(service_id) | |
| 280 yield DockerService.from_cli(self, service_dict, task_list) | |
| 281 | |
| 282 def service_tasks(self, service): | |
| 283 for task_dict in self.service_ps(service.id): | |
| 284 if task_dict['NAME'].strip().startswith(r'\_'): | |
| 285 continue # historical task | |
| 286 yield DockerTask.from_cli(self, task_dict, service=service) | |
| 287 | |
| 288 def nodes(self, id=None, name=None): | |
| 289 for node_dict in self.node_ls(id=id, name=name): | |
| 290 node_id = node_dict['ID'].strip(' *') | |
| 291 node_name = node_dict['HOSTNAME'] | |
| 292 if self._node_prefix and not node_name.startswith(self._node_prefix): | |
| 293 continue | |
| 294 task_list = filter(lambda x: x['NAME'].startswith(self._name_prefix), self.node_ps(node_id)) | |
| 295 yield DockerNode.from_cli(self, node_dict, task_list) | |
| 296 | |
| 297 # | |
| 298 # docker subcommands | |
| 299 # | |
| 300 | |
| 301 def service_create(self, command, image=None, **kwopts): | |
| 302 if ('service_create_image_constraint' in self._conf or 'service_create_cpus_constraint' in self._conf) and 'constraint' not in kwopts: | |
| 303 kwopts['constraint'] = [] | |
| 304 image = self._get_image(image) | |
| 305 if self._conf.service_create_image_constraint: | |
| 306 kwopts['constraint'].append((IMAGE_CONSTRAINT, '==', image)) | |
| 307 if self._conf.service_create_cpus_constraint: | |
| 308 cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) | |
| 309 kwopts['constraint'].append((CPUS_CONSTRAINT, '==', cpus)) | |
| 310 if self._conf.cpus: | |
| 311 kwopts['cpu_limit'] = self._conf.cpus | |
| 312 kwopts['cpu_reservation'] = self._conf.cpus | |
| 313 if self._conf.memory: | |
| 314 kwopts['mem_limit'] = self._conf.memory | |
| 315 kwopts['mem_reservation'] = self._conf.memory | |
| 316 self.set_kwopts_name(kwopts) | |
| 317 args = '{kwopts} {image} {command}'.format( | |
| 318 kwopts=self._stringify_kwopts(kwopts), | |
| 319 image=image if image else '', | |
| 320 command=command if command else '', | |
| 321 ).strip() | |
| 322 service_id = self._run_docker(subcommand='service create', args=args, verbose=True) | |
| 323 return DockerService.from_id(self, service_id) | |
| 324 | |
| 325 @docker_json | |
| 326 def service_inspect(self, service_id): | |
| 327 return self._run_docker(subcommand='service inspect', args=service_id)[0] | |
| 328 | |
| 329 @docker_columns | |
| 330 def service_ls(self, id=None, name=None): | |
| 331 return self._run_docker(subcommand='service ls', args=self._filter_by_id_or_name(id, name)) | |
| 332 | |
| 333 @docker_columns | |
| 334 def service_ps(self, service_id): | |
| 335 return self._run_docker(subcommand='service ps', args='--no-trunc {}'.format(service_id)) | |
| 336 | |
| 337 def service_rm(self, service_ids): | |
| 338 service_ids = ' '.join(service_ids) | |
| 339 return self._run_docker(subcommand='service rm', args=service_ids).splitlines() | |
| 340 | |
| 341 @docker_json | |
| 342 def node_inspect(self, node_id): | |
| 343 return self._run_docker(subcommand='node inspect', args=node_id)[0] | |
| 344 | |
| 345 @docker_columns | |
| 346 def node_ls(self, id=None, name=None): | |
| 347 return self._run_docker(subcommand='node ls', args=self._filter_by_id_or_name(id, name)) | |
| 348 | |
| 349 @docker_columns | |
| 350 def node_ps(self, node_id): | |
| 351 return self._run_docker(subcommand='node ps', args='--no-trunc {}'.format(node_id)) | |
| 352 | |
| 353 def node_update(self, node_id, **kwopts): | |
| 354 return self._run_docker(subcommand='node update', args='{kwopts} {node_id}'.format( | |
| 355 kwopts=self._stringify_kwopts(kwopts), | |
| 356 node_id=node_id | |
| 357 )) | |
| 358 | |
| 359 @docker_json | |
| 360 def task_inspect(self, task_id): | |
| 361 return self._run_docker(subcommand="inspect", args=task_id) | |
| 362 | |
| 363 | |
| 364 class DockerSwarmAPIInterface(DockerSwarmInterface, DockerAPIInterface): | |
| 365 | |
| 366 container_type = 'docker_swarm' | |
| 367 placement_option_map = { | |
| 368 'constraint': {'param': 'constraints'}, | |
| 369 } | |
| 370 service_mode_option_map = { | |
| 371 'service_mode': {'param': 0, 'default': 'replicated'}, | |
| 372 'replicas': {'default': 1}, | |
| 373 } | |
| 374 endpoint_spec_option_map = { | |
| 375 'ports': {}, | |
| 376 } | |
| 377 resources_option_map = { | |
| 378 'cpus': {'params': ('cpu_limit', 'cpu_reservation'), 'map': lambda x: int(x * 1000000000)}, | |
| 379 'memory': {'params': ('mem_limit', 'mem_reservation')}, | |
| 380 } | |
| 381 container_spec_option_map = { | |
| 382 'image': {'param': 0}, | |
| 383 'command': {}, | |
| 384 'environment': {'param': 'env'}, | |
| 385 'labels': {}, | |
| 386 } | |
| 387 restart_policy_option_map = { | |
| 388 'restart_condition': {'param': 'condition', 'default': 'none'}, | |
| 389 'restart_delay': {'param': 'delay'}, | |
| 390 'restart_max_attempts': {'param': 'max_attemps'}, | |
| 391 } | |
| 392 task_template_option_map = { | |
| 393 '_container_spec': {'spec_class': docker.types.ContainerSpec, 'required': True}, | |
| 394 '_resources': {'spec_class': docker.types.Resources}, | |
| 395 '_restart_policy': {'spec_class': docker.types.RestartPolicy}, | |
| 396 '_placement': {'spec_class': docker.types.Placement}, | |
| 397 } | |
| 398 node_spec_option_map = { | |
| 399 'availability': {'param': 'Availability'}, | |
| 400 'name': {'param': 'Name'}, | |
| 401 'role': {'param': 'Role'}, | |
| 402 'labels': {'param': 'Labels'}, | |
| 403 } | |
| 404 | |
| 405 @staticmethod | |
| 406 def create_random_port_spec(port): | |
| 407 return { | |
| 408 'Protocol': 'tcp', | |
| 409 'PublishedPort': None, | |
| 410 'TargetPort': port, | |
| 411 } | |
| 412 | |
| 413 # | |
| 414 # docker subcommands | |
| 415 # | |
| 416 | |
| 417 def service_create(self, command, image=None, **kwopts): | |
| 418 # TODO: some of this should probably move to run_in_container when the CLI interface is removed | |
| 419 log.debug("Creating docker service with image '%s' for command: %s", image, command) | |
| 420 # insert run kwopts from config | |
| 421 for opt in self.conf_run_kwopts: | |
| 422 if self._conf[opt]: | |
| 423 kwopts[opt] = self._conf[opt] | |
| 424 # image is part of the container spec | |
| 425 kwopts['image'] = self._get_image(image) | |
| 426 # service constraints | |
| 427 kwopts['constraint'] = kwopts.get('constraint', []) | |
| 428 if self._conf.service_create_image_constraint: | |
| 429 kwopts['constraint'].append((IMAGE_CONSTRAINT + '==' + image)) | |
| 430 if self._conf.service_create_cpus_constraint: | |
| 431 cpus = kwopts.get('reserve_cpus', kwopts.get('limit_cpus', '1')) | |
| 432 kwopts['constraint'].append((CPUS_CONSTRAINT + '==' + cpus)) | |
| 433 # ports | |
| 434 if 'publish_port_random' in kwopts: | |
| 435 kwopts['ports'] = [DockerSwarmAPIInterface.create_random_port_spec(kwopts.pop('publish_port_random'))] | |
| 436 # create specs | |
| 437 service_mode = self._create_docker_api_spec('service_mode', docker.types.ServiceMode, kwopts) | |
| 438 endpoint_spec = self._create_docker_api_spec('endpoint_spec', docker.types.EndpointSpec, kwopts) | |
| 439 task_template = self._create_docker_api_spec('task_template', docker.types.TaskTemplate, kwopts) | |
| 440 self.set_kwopts_name(kwopts) | |
| 441 log.debug("Docker service task template:\n%s", safe_dumps_formatted(task_template)) | |
| 442 log.debug("Docker service endpoint specification:\n%s", safe_dumps_formatted(endpoint_spec)) | |
| 443 log.debug("Docker service mode:\n%s", safe_dumps_formatted(service_mode)) | |
| 444 log.debug("Docker service creation parameters:\n%s", safe_dumps_formatted(kwopts)) | |
| 445 success_test = partial(self._first, self.service_ls, name=kwopts['name']) | |
| 446 # this can raise exceptions, if necessary we could wrap them in a more generic "creation failed" exception class | |
| 447 service = self._client.create_service( | |
| 448 task_template, | |
| 449 mode=service_mode, | |
| 450 endpoint_spec=endpoint_spec, | |
| 451 success_test=success_test, | |
| 452 max_tries=5, | |
| 453 **kwopts) | |
| 454 service_id = service.get('ID') | |
| 455 log.debug('Created service: %s (%s)', kwopts['name'], service_id) | |
| 456 return DockerService.from_id(self, service_id) | |
| 457 | |
| 458 def service_inspect(self, service_id): | |
| 459 return self._client.inspect_service(service_id) | |
| 460 | |
| 461 def service_ls(self, id=None, name=None): | |
| 462 return self._client.services(filters=self._filter_by_id_or_name(id, name)) | |
| 463 | |
| 464 # roughly `docker service ps` | |
| 465 def service_ps(self, service_id): | |
| 466 return self.task_ls(filters={'service': service_id}) | |
| 467 | |
| 468 def service_rm(self, service_ids): | |
| 469 r = [] | |
| 470 for service_id in service_ids: | |
| 471 self._client.remove_service(service_id) | |
| 472 r.append(service_id) | |
| 473 return r | |
| 474 | |
| 475 def node_inspect(self, node_id): | |
| 476 return self._client.inspect_node(node_id) | |
| 477 | |
| 478 def node_ls(self, id=None, name=None): | |
| 479 return self._client.nodes(filters=self._filter_by_id_or_name(id, name)) | |
| 480 | |
| 481 # roughly `docker node ps` | |
| 482 def node_ps(self, node_id): | |
| 483 return self.task_ls(filters={'node': node_id}) | |
| 484 | |
| 485 def node_update(self, node_id, **kwopts): | |
| 486 node = DockerNode.from_id(self, node_id) | |
| 487 spec = node.inspect['Spec'] | |
| 488 if 'label_add' in kwopts: | |
| 489 kwopts['labels'] = spec.get('Labels', {}) | |
| 490 kwopts['labels'].update(kwopts.pop('label_add')) | |
| 491 spec.update(self._create_docker_api_spec('node_spec', dict, kwopts)) | |
| 492 return self._client.update_node(node.id, node.version, node_spec=spec) | |
| 493 | |
| 494 def task_inspect(self, task_id): | |
| 495 return self._client.inspect_task(task_id) | |
| 496 | |
| 497 def task_ls(self, filters=None): | |
| 498 return self._client.tasks(filters=filters) |
