comparison env/lib/python3.9/site-packages/galaxy/containers/docker_model.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """
2 Model objects for docker objects
3 """
4
5 import logging
6 import shlex
7
8 try:
9 import docker
10 except ImportError:
11 from galaxy.util.bunch import Bunch
12 docker = Bunch(errors=Bunch(NotFound=None))
13
14 from galaxy.containers import (
15 Container,
16 ContainerPort,
17 ContainerVolume
18 )
19 from galaxy.util import (
20 pretty_print_time_interval,
21 unicodify,
22 )
23
24
# Names of the node labels Galaxy uses, and the matching `node.labels.<label>` names used in service constraints.
CPUS_LABEL = '_galaxy_cpus'
IMAGE_LABEL = '_galaxy_image'
CPUS_CONSTRAINT = f'node.labels.{CPUS_LABEL}'
IMAGE_CONSTRAINT = f'node.labels.{IMAGE_LABEL}'

# Module-level logger shared by every model class in this file.
log = logging.getLogger(__name__)
31
32
class DockerAttributeContainer:
    """A collection of attribute objects that can be looked up by their ``name`` attribute.

    The members are stored in whatever collection is passed to the constructor (an empty ``set`` when omitted).
    Equality and hashing are both based on the de-duplicated set of members.
    """

    def __init__(self, members=None):
        """
        :param members: iterable of member objects; defaults to a new empty ``set``.
        """
        if members is None:
            members = set()
        self._members = members

    def __eq__(self, other):
        # Duck-typed: anything exposing ``members`` can be compared; everything else is "not comparable" rather
        # than an AttributeError.
        try:
            other_members = other.members
        except AttributeError:
            return NotImplemented
        return self.members == other_members

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        # Hash the de-duplicated members so that containers which compare equal (``__eq__`` compares frozensets)
        # always hash equal, even if one was built from a list containing duplicates.
        return hash(tuple(sorted(repr(x) for x in self.members)))

    def __str__(self):
        return ', '.join(str(x) for x in self._members) or 'None'

    def __iter__(self):
        return iter(self._members)

    def __getitem__(self, name):
        """Return the first member whose ``name`` attribute equals ``name``.

        :raises KeyError: if no member has that name.
        """
        for member in self._members:
            if member.name == name:
                return member
        raise KeyError(name)

    def __contains__(self, item):
        return item in self._members

    @property
    def members(self):
        """An immutable, de-duplicated snapshot of the members."""
        return frozenset(self._members)

    def hash(self):
        """Return the container's hash as a hexadecimal string without the ``0x`` prefix."""
        # NOTE(review): for a negative hash ``hex()`` yields ``-0x...``, so the slice leaves a leading ``x``; left
        # unchanged here so that existing values are not altered.
        return hex(self.__hash__())[2:]

    def get(self, name, default=None):
        """Return the member named ``name``, or ``default`` if there is none."""
        try:
            return self[name]
        except KeyError:
            return default
77
78
class DockerVolume(ContainerVolume):
    """A container volume expressed in Docker's ``--volume`` terms."""

    @classmethod
    def from_str(cls, as_str):
        """Build a volume from a string in the form accepted by `docker run --volume`.

        Accepted shapes are ``<path>``, ``<host_path>:<path>`` and ``<host_path>:<path>:<mode>``. For legacy reasons
        ``<host_path>:<mode>`` is accepted too (it is not valid Docker volume syntax); in that case the host path is
        also used as the container path.

        :raises ValueError: if ``as_str`` is empty.
        """
        if not as_str:
            raise ValueError("Failed to parse Docker volume from %s" % as_str)
        fields = as_str.split(":", 2)
        host_path = fields[0]
        if len(fields) == 3:
            return cls(host_path=host_path, path=fields[1], mode=fields[2])
        if len(fields) == 2:
            second = fields[1]
            # presumably ``valid_modes`` is supplied by ContainerVolume -- it is not defined in this class
            if second in DockerVolume.valid_modes:
                # legacy /host_path:mode form
                return cls(host_path=host_path, path=host_path, mode=second)
            return cls(host_path=host_path, path=second)
        # a bare path: auto-generated volume
        return cls(host_path=host_path, path=host_path)

    def __str__(self):
        """Render the volume as a quoted command line argument, omitting any component that is ``None``."""
        pieces = [piece for piece in (self.host_path, self.path, self.mode) if piece is not None]
        joined = ":".join(pieces)
        if "$" in joined:
            # e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw -- double quote so the shell still expands it
            return '"%s"' % joined
        return shlex.quote(joined)

    def to_native(self):
        """Return ``(path, {host_path: {'bind': path, 'mode': mode}})``, using ``path`` when there is no host path."""
        source = self.host_path or self.path
        binding = {'bind': self.path, 'mode': self.mode}
        return (self.path, {source: binding})
118
119
class DockerContainer(Container):
    """A single Docker container, described by the output of the interface's ``inspect()`` call."""

    def __init__(self, interface, id, name=None, inspect=None):
        """
        :param interface: object used to talk to Docker; must provide ``inspect(id)`` and a ``host`` attribute.
        :param id: container id.
        :param name: optional container name.
        :param inspect: optional pre-fetched inspect data; fetched on first use when omitted.
        """
        super().__init__(interface, id, name=name)
        self._inspect = inspect

    @classmethod
    def from_id(cls, interface, id):
        """Inspect the container ``id`` and build an instance named after the inspect data's ``Name``."""
        inspect = interface.inspect(id)
        return cls(interface, id, name=inspect['Name'], inspect=inspect)

    @property
    def ports(self):
        """Return a list of :class:`ContainerPort` for every host binding, or ``None`` if the data is missing.

        The relevant portion of the inspect data looks like::

            {
                "NetworkSettings" : {
                    "Ports" : {
                        "3306/tcp" : [
                            {
                                "HostIp" : "127.0.0.1",
                                "HostPort" : "3306"
                            }
                        ]
        """
        try:
            port_mappings = self.inspect['NetworkSettings']['Ports']
        except KeyError:
            # The format string must have exactly one placeholder per argument, otherwise the logging module
            # reports a formatting error instead of emitting this message.
            log.warning("Failed to get ports for container %s from `docker inspect` output at "
                        "['NetworkSettings']['Ports']", self.id, exc_info=True)
            return None
        rval = []
        for port_name, bindings in (port_mappings or {}).items():
            name_parts = port_name.split('/')
            # Docker reports ``null`` rather than a list for a port that is exposed but not published
            for binding in bindings or ():
                rval.append(ContainerPort(
                    int(name_parts[0]),
                    name_parts[1],
                    self.address,
                    int(binding['HostPort']),
                ))
        return rval

    @property
    def address(self):
        """Host part of the interface's ``tcp://`` host setting, or ``'localhost'`` for anything else."""
        host = self._interface.host
        if host and host.startswith('tcp://'):
            return host.replace('tcp://', '').split(':', 1)[0]
        return 'localhost'

    def is_ready(self):
        """Return the ``State.Running`` value from the inspect data."""
        return self.inspect['State']['Running']

    def __eq__(self, other):
        # Compare by id with anything that has one; other objects are "not comparable" rather than an error.
        try:
            other_id = other.id
        except AttributeError:
            return NotImplemented
        return self._id == other_id

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        return hash(self._id)

    @property
    def inspect(self):
        """Inspect data for this container, fetched from the interface on first access and then cached."""
        if not self._inspect:
            self._inspect = self._interface.inspect(self._id)
        return self._inspect
183
184
class DockerService(Container):
    """A Docker swarm service together with the tasks that belong to it."""

    def __init__(self, interface, id, name=None, image=None, inspect=None):
        """
        :param interface: object used to talk to Docker (``service_inspect``, ``service_tasks``, ...).
        :param id: service id.
        :param name: optional service name; taken from ``inspect`` when omitted and ``inspect`` is given.
        :param image: optional image name; taken from ``inspect`` when omitted and ``inspect`` is given.
        :param inspect: optional pre-fetched service inspect data.
        """
        super().__init__(interface, id, name=name)
        self._image = image
        self._inspect = inspect
        self._env = {}
        self._tasks = []
        if inspect:
            self._name = name or inspect['Spec']['Name']
            self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']

    @classmethod
    def from_cli(cls, interface, s, task_list):
        """Build a service from a CLI listing row ``s`` and the rows of its task listing."""
        service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
        for task_dict in task_list:
            if task_dict['NAME'].strip().startswith(r'\_'):
                continue  # historical task
            service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
        return service

    @classmethod
    def from_id(cls, interface, id):
        """Inspect the service ``id`` and build an instance populated with its tasks."""
        inspect = interface.service_inspect(id)
        service = cls(interface, id, inspect=inspect)
        for task in interface.service_tasks(service):
            service.task_add(task)
        return service

    @property
    def ports(self):
        """Return a list of :class:`ContainerPort` for the published ports, or ``None`` if the data is missing.

        The relevant portion of the inspect data looks like::

            {
                "Endpoint": {
                    "Ports": [
                        {
                            "Protocol": "tcp",
                            "TargetPort": 8888,
                            "PublishedPort": 30000,
                            "PublishMode": "ingress"
                        }
                    ]
        """
        try:
            port_mappings = self.inspect['Endpoint']['Ports']
        except (IndexError, KeyError):
            # The format string must have exactly one placeholder per argument, otherwise the logging module
            # reports a formatting error instead of emitting this message.
            log.warning("Failed to get ports for container %s from `docker service inspect` output at "
                        "['Endpoint']['Ports']", self.id, exc_info=True)
            return None
        rval = []
        for binding in port_mappings:
            rval.append(ContainerPort(
                binding['TargetPort'],
                binding['Protocol'],
                self.address,  # use the routing mesh
                binding['PublishedPort']
            ))
        return rval

    @property
    def address(self):
        """Host part of the interface's ``tcp://`` host setting, or ``'localhost'`` for anything else."""
        host = self._interface.host
        if host and host.startswith('tcp://'):
            return host.replace('tcp://', '').split(':', 1)[0]
        return 'localhost'

    def is_ready(self):
        """True if at least one task is both desired and currently ``Running``."""
        return self.in_state('Running', 'Running')

    def __eq__(self, other):
        # Compare by id with anything that has one; other objects are "not comparable" rather than an error.
        try:
            other_id = other.id
        except AttributeError:
            return NotImplemented
        return self._id == other_id

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        return hash(self._id)

    def task_add(self, task):
        """Append ``task`` to this service's task list."""
        self._tasks.append(task)

    @property
    def inspect(self):
        """Service inspect data, fetched from the interface on first access and then cached."""
        if not self._inspect:
            self._inspect = self._interface.service_inspect(self._id)
        return self._inspect

    @property
    def state(self):
        """If one of this service's tasks desired state is running, return that task state, otherwise, return the state
        of a non-running task.

        This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
        our purposes for now.
        """
        state = None
        for task in self.tasks:
            state = task.state
            if task.desired_state == 'running':
                break
        return state

    @property
    def env(self):
        """The service's container environment as a dict, parsed from the inspect data on first access.

        Entries without an ``=`` are stored with a value of ``None``. If the inspect data has no environment the
        result is an empty dict.
        """
        if not self._env:
            try:
                for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
                    try:
                        # a ``NAME=value`` string splits into a (key, value) pair that update() accepts
                        self._env.update([env_str.split('=', 1)])
                    except ValueError:
                        # no '=' in the string, so there is no pair to unpack
                        self._env[env_str] = None
            except KeyError as exc:
                log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
        return self._env

    @property
    def terminal(self):
        """True unless some task's desired state is running. Same caveats as :meth:`state`."""
        for task in self.tasks:
            if task.desired_state == 'running':
                return False
        return True

    @property
    def node(self):
        """The node of the first task that has one, else ``None``. Same caveats as :meth:`state`."""
        for task in self.tasks:
            if task.node is not None:
                return task.node
        return None

    @property
    def image(self):
        """The service's image, read from the inspect data if it was not supplied at construction."""
        if self._image is None:
            self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
        return self._image

    @property
    def cpus(self):
        """The CPU limit converted from ``NanoCPUs`` (an ``int`` when whole), or ``0`` if no limit is present."""
        try:
            cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
            if cpus == int(cpus):
                cpus = int(cpus)
            return cpus
        except KeyError:
            return 0

    @property
    def constraints(self):
        """The service's placement constraints as :class:`DockerServiceConstraints` (empty when there are none)."""
        # Tolerate a missing/null ``Placement`` or ``Constraints`` entry instead of raising.
        placement = self.inspect['Spec']['TaskTemplate'].get('Placement') or {}
        constraints = placement.get('Constraints') or []
        return DockerServiceConstraints.from_constraint_string_list(constraints)

    @property
    def tasks(self):
        """A list of *all* tasks, including terminal ones.
        """
        if not self._tasks:
            self._tasks = []
            for task in self._interface.service_tasks(self):
                self.task_add(task)
        return self._tasks

    @property
    def task_count(self):
        """A count of *all* tasks, including terminal ones.
        """
        return len(self.tasks)

    def in_state(self, desired, current, tasks='any'):
        """Indicate if this service's tasks match the given desired/current state.

        :param tasks: ``'any'`` -- true if at least one task matches; ``'all'`` -- false as soon as one task does
                      not match. Any value other than ``'any'`` yields true once the loop finishes without deciding.
        """
        for task in self.tasks:
            if task.in_state(desired, current):
                if tasks == 'any':
                    # at least 1 task in desired state
                    return True
            elif tasks == 'all':
                # at least 1 task not in desired state
                return False
        return tasks != 'any'

    def constraint_add(self, name, op, value):
        """Ask the interface to add the constraint ``<name><op><value>`` to this service."""
        self._interface.service_constraint_add(self.id, name, op, value)

    def set_cpus(self):
        """Add a constraint pinning the CPU label to this service's :attr:`cpus`."""
        self.constraint_add(CPUS_LABEL, '==', self.cpus)

    def set_image(self):
        """Add a constraint pinning the image label to this service's :attr:`image`."""
        self.constraint_add(IMAGE_LABEL, '==', self.image)
375
376
class DockerServiceConstraint:
    """A single service placement constraint of the form ``<name><op><value>``, e.g. ``node.labels.foo==bar``."""

    def __init__(self, name=None, op=None, value=None):
        self._name = name
        self._op = op
        self._value = value

    def __eq__(self, other):
        # Only constraints can equal constraints; anything else is "not comparable" rather than an AttributeError.
        if not isinstance(other, DockerServiceConstraint):
            return NotImplemented
        return (self._name, self._op, self._value) == (other._name, other._op, other._value)

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        return hash((self._name, self._op, self._value))

    def __repr__(self):
        return f'{self.__class__.__name__}({self._name}{self._op}{self._value})'

    def __str__(self):
        return f'{self._name}{self._op}{self._value}'

    @staticmethod
    def split_constraint_string(constraint_str):
        """Split ``constraint_str`` at its first ``==`` or ``!=`` operator.

        :returns: ``[name, op, value]`` with surrounding whitespace stripped from each element.
        :raises ValueError: if the string contains neither operator.
        """
        constraint = (constraint_str, '', '')
        for op in '==', '!=':
            candidate = constraint_str.partition(op)
            # the operator that leaves the shortest left-hand side is the one that occurs first in the string
            if len(candidate[0]) < len(constraint[0]):
                constraint = candidate
        if constraint[0] == constraint_str:
            raise ValueError('Unable to parse constraint string: %s' % constraint_str)
        return [x.strip() for x in constraint]

    @classmethod
    def from_str(cls, constraint_str):
        """Parse a constraint string such as ``node.labels.foo==bar`` into an instance."""
        name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
        return cls(name=name, op=op, value=value)

    @property
    def name(self):
        return self._name

    @property
    def op(self):
        return self._op

    @property
    def value(self):
        return self._value

    @property
    def label(self):
        """The :class:`DockerNodeLabel` corresponding to this constraint (name without ``node.labels.``)."""
        return DockerNodeLabel(
            # strip a single occurrence only, so a label whose own name contains 'node.labels.' is not mangled
            name=self.name.replace('node.labels.', '', 1),
            value=self.value
        )
435
436
class DockerServiceConstraints(DockerAttributeContainer):
    """A container of :class:`DockerServiceConstraint` members."""

    member_class = DockerServiceConstraint

    @classmethod
    def from_constraint_string_list(cls, inspect):
        """Build a container by parsing every constraint string in ``inspect`` with ``member_class.from_str``."""
        parsed = [cls.member_class.from_str(constraint_str) for constraint_str in inspect]
        return cls(members=parsed)

    @property
    def labels(self):
        """The node labels corresponding to the members, as a :class:`DockerNodeLabels` container."""
        member_labels = [constraint.label for constraint in self.members]
        return DockerNodeLabels(members=member_labels)
451
452
class DockerNode:
    """A Docker swarm node together with the tasks assigned to it."""

    def __init__(self, interface, id=None, name=None, status=None,
                 availability=None, manager=False, inspect=None):
        """
        :param interface: object used to talk to Docker (``node_inspect``, ``node_tasks``, ``node_update``, ...).
        :param id: node id.
        :param name: node hostname.
        :param status: node status (e.g. ``Ready``).
        :param availability: node availability (e.g. ``Active``).
        :param manager: whether the node is a swarm manager.
        :param inspect: optional pre-fetched node inspect data; used to fill in any value not passed explicitly.
        """
        self._interface = interface
        self._id = id
        self._name = name
        self._status = status
        self._availability = availability
        self._manager = manager
        self._inspect = inspect
        if inspect:
            self._name = name or inspect['Description']['Hostname']
            self._status = status or inspect['Status']['State']
            # like the other attributes, an explicitly passed value takes precedence over the inspect data
            self._availability = availability or inspect['Spec']['Availability']
            self._manager = manager or inspect['Spec']['Role'] == 'manager'
        self._tasks = []

    @classmethod
    def from_cli(cls, interface, n, task_list):
        """Build a node from a CLI listing row ``n`` and the rows of its task listing."""
        node = cls(interface, id=n['ID'], name=n['HOSTNAME'], status=n['STATUS'],
                   availability=n['AVAILABILITY'], manager=bool(n['MANAGER STATUS']))
        for task_dict in task_list:
            node.task_add(DockerTask.from_cli(interface, task_dict, node=node))
        return node

    @classmethod
    def from_id(cls, interface, id):
        """Inspect the node ``id`` and build an instance populated with its tasks."""
        inspect = interface.node_inspect(id)
        node = cls(interface, id, inspect=inspect)
        for task in interface.node_tasks(node):
            node.task_add(task)
        return node

    def task_add(self, task):
        """Append ``task`` to this node's task list."""
        self._tasks.append(task)

    @property
    def id(self):
        return self._id

    @property
    def name(self):
        return self._name

    @property
    def version(self):
        """The node's current ``Version.Index``, always read fresh from the interface."""
        # this changes on update so don't cache
        return self._interface.node_inspect(self._id or self._name)['Version']['Index']

    @property
    def inspect(self):
        """Node inspect data, fetched from the interface on first access and then cached."""
        if not self._inspect:
            self._inspect = self._interface.node_inspect(self._id or self._name)
        return self._inspect

    @property
    def state(self):
        """Lower-cased ``<status>-<availability>`` string."""
        return (f'{self._status}-{self._availability}').lower()

    @property
    def cpus(self):
        """The node's CPU count, converted from the inspect data's ``NanoCPUs``."""
        return self.inspect['Description']['Resources']['NanoCPUs'] / 1000000000

    @property
    def labels(self):
        """The node's labels as a :class:`DockerNodeLabels` container (empty if the node has none)."""
        labels = self.inspect['Spec'].get('Labels', {}) or {}
        return DockerNodeLabels.from_label_dictionary(labels)

    def label_add(self, label, value):
        """Ask the interface to set ``label`` to ``value`` on this node."""
        self._interface.node_update(self.id, label_add={label: value})

    @property
    def labels_as_constraints(self):
        """The node's labels expressed as :class:`DockerServiceConstraints`."""
        constraints_strings = [x.constraint_string for x in self.labels]
        return DockerServiceConstraints.from_constraint_string_list(constraints_strings)

    def set_labels_for_constraints(self, constraints):
        """Add to this node every label implied by ``constraints`` that it does not already carry."""
        for label in self._constraints_to_label_args(constraints):
            if label not in self.labels:
                log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
                self.label_add(label.name, label.value)

    def _constraints_to_label_args(self, constraints):
        """Lazily yield a :class:`DockerNodeLabel` for each ``node.labels.<name>==<value>`` constraint."""
        return (
            DockerNodeLabel(name=x.name.replace('node.labels.', '', 1), value=x.value)
            for x in constraints
            if x.name.startswith('node.labels.') and x.op == '=='
        )

    @property
    def tasks(self):
        """A list of *all* tasks, including terminal ones.
        """
        if not self._tasks:
            self._tasks = []
            for task in self._interface.node_tasks(self):
                self.task_add(task)
        return self._tasks

    @property
    def non_terminal_tasks(self):
        """The tasks that are not terminal and still have a service."""
        # ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
        # container is running, but the task still shows up in the node's task list)
        return [task for task in self.tasks if not task.terminal and task.service is not None]

    @property
    def task_count(self):
        """A count of *all* tasks, including terminal ones.
        """
        return len(self.tasks)

    def in_state(self, status, availability):
        """Case-insensitively compare the node's status and availability with the given values."""
        return self._status.lower() == status.lower() and self._availability.lower() == availability.lower()

    def is_ok(self):
        """True if the node is ``Ready`` and ``Active``."""
        return self.in_state('Ready', 'Active')

    def is_managed(self):
        """True if the node is not a swarm manager."""
        return not self._manager

    def destroyable(self):
        """True if the node is not a manager, is ok, and has no tasks at all."""
        return not self._manager and self.is_ok() and self.task_count == 0

    def drain(self):
        """Ask the interface to set this node's availability to ``drain``."""
        self._interface.node_update(self.id, availability='drain')
581
582
class DockerNodeLabel:
    """A single ``name: value`` label on a swarm node."""

    def __init__(self, name=None, value=None):
        self._name = name
        self._value = value

    def __eq__(self, other):
        # Only labels can equal labels; anything else is "not comparable" rather than an AttributeError.
        if not isinstance(other, DockerNodeLabel):
            return NotImplemented
        return (self._name, self._value) == (other._name, other._value)

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        return hash((self._name, self._value))

    def __repr__(self):
        return f'{self.__class__.__name__}({self._name}: {self._value})'

    def __str__(self):
        return f'{self._name}: {self._value}'

    @property
    def name(self):
        return self._name

    @property
    def value(self):
        return self._value

    @property
    def constraint_string(self):
        """The label written as a service constraint string: ``node.labels.<name>==<value>``."""
        return f'node.labels.{self.name}=={self.value}'

    @property
    def constraint(self):
        """The label as a :class:`DockerServiceConstraint` requiring equality with this label's value."""
        return DockerServiceConstraint(
            name=f'node.labels.{self.name}',
            op='==',
            value=self.value
        )
624
625
class DockerNodeLabels(DockerAttributeContainer):
    """A container of :class:`DockerNodeLabel` members."""

    member_class = DockerNodeLabel

    @classmethod
    def from_label_dictionary(cls, inspect):
        """Build a container holding one ``member_class`` instance per ``name: value`` item of ``inspect``."""
        built = [cls.member_class(name=label_name, value=label_value) for label_name, label_value in inspect.items()]
        return cls(members=built)

    @property
    def constraints(self):
        """The service constraints corresponding to the members, as :class:`DockerServiceConstraints`."""
        member_constraints = [label.constraint for label in self.members]
        return DockerServiceConstraints(members=member_constraints)
640
641
class DockerTask:
    """A single task of a Docker swarm service."""

    # these are the possible *current* state terminal states
    terminal_states = (
        'shutdown',  # this is normally only a desired state but I've seen a task with it as current as well
        'complete',
        'failed',
        'rejected',
        'orphaned',
    )

    def __init__(self, interface, id=None, name=None, image=None, desired_state=None,
                 state=None, error=None, ports=None, service=None, node=None):
        """
        :param interface: object used to talk to Docker (``task_inspect``, ``service``, ``node``).
        :param id: task id.
        :param name: task name.
        :param image: image name.
        :param desired_state: the state the task is supposed to reach.
        :param state: the task's current state.
        :param error: error message reported for the task, if any.
        :param ports: port information reported for the task.
        :param service: owning service object; looked up from the inspect data on demand when omitted.
        :param node: node object the task is on; looked up from the inspect data on demand when omitted.
        """
        self._interface = interface
        self._id = id
        self._name = name
        self._image = image
        self._desired_state = desired_state
        self._state = state
        self._error = error
        self._ports = ports
        self._service = service
        self._node = node
        self._inspect = None

    @classmethod
    def from_cli(cls, interface, t, service=None, node=None):
        """Build a task from a CLI task listing row ``t`` (a dict keyed by column heading)."""
        # only the first word of the CURRENT STATE column is kept as the state
        state = t['CURRENT STATE'].split()[0]
        return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
                   desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
                   ports=t['PORTS'], service=service, node=node)

    @classmethod
    def from_api(cls, interface, t, service=None, node=None):
        """Build a task from an API task dict ``t``.

        The service and node are looked up through ``interface`` when not supplied. The task is named
        ``<service name>.<slot>`` when a service is available and by its id otherwise.
        """
        service = service or interface.service(id=t.get('ServiceID'))
        node = node or interface.node(id=t.get('NodeID'))
        if service:
            name = service.name + '.' + str(t['Slot'])
        else:
            name = t['ID']
        # remove pin (must be a plain string: a trailing comma here would turn it into a 1-tuple)
        image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0]
        return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
                   state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
                   service=service, node=node)

    @property
    def id(self):
        return self._id

    @property
    def name(self):
        return self._name

    @property
    def inspect(self):
        """Task inspect data, fetched on first access and cached; ``None`` if Docker says the task does not exist."""
        if not self._inspect:
            try:
                self._inspect = self._interface.task_inspect(self._id)
            except Exception as exc:
                # ``docker.errors.NotFound`` is None when the docker package could not be imported; naming None in
                # an ``except`` clause would raise TypeError and hide the real error, so test for it explicitly.
                not_found = docker.errors.NotFound
                if not_found is None or not isinstance(exc, not_found):
                    raise
                # This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task
                # that does not actually exist anymore, nor does its service exist).
                log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
                          self.name, self.id)
                return None
        return self._inspect

    @property
    def slot(self):
        """The task's ``Slot`` from the inspect data, or ``None`` if the task could not be inspected."""
        try:
            return self.inspect['Slot']
        except TypeError:
            # self.inspect is None
            return None

    @property
    def node(self):
        """The node the task is on, looked up on demand; ``None`` if the task could not be inspected."""
        if not self._node:
            try:
                self._node = self._interface.node(id=self.inspect['NodeID'])
            except TypeError:
                return None
        return self._node

    @property
    def service(self):
        """The service the task belongs to, looked up on demand; ``None`` if the task could not be inspected."""
        if not self._service:
            try:
                self._service = self._interface.service(id=self.inspect['ServiceID'])
            except TypeError:
                return None
        return self._service

    @property
    def cpus(self):
        """The CPU reservation converted from ``NanoCPUs`` (an ``int`` when whole).

        Returns ``0`` if the inspect data has no reservation and ``None`` if the task could not be inspected.
        """
        try:
            cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
            if cpus == int(cpus):
                cpus = int(cpus)
            return cpus
        except TypeError:
            return None
        except KeyError:
            return 0

    @property
    def state(self):
        """Lower-cased ``<desired state>-<current state>`` string."""
        return (f'{self._desired_state}-{self._state}').lower()

    @property
    def current_state(self):
        """The lower-cased current state, or ``None`` (with a warning) if it is not a string."""
        try:
            return self._state.lower()
        except (AttributeError, TypeError):
            # a non-string such as None has no .lower(), which surfaces as AttributeError
            log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
            return None

    @property
    def current_state_time(self):
        """How long ago the current state was entered, or ``None`` if the task could not be inspected."""
        # Docker API returns a stamp w/ higher second precision than Python takes
        try:
            stamp = self.inspect['Status']['Timestamp']
        except TypeError:
            return None
        # keep at most six fractional digits; a trailing 'Z' marks the stamp as UTC
        return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')

    @property
    def desired_state(self):
        """The lower-cased desired state, or ``None`` (with a warning) if it is not a string."""
        try:
            return self._desired_state.lower()
        except (AttributeError, TypeError):
            # a non-string such as None has no .lower(), which surfaces as AttributeError
            log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
            return None

    @property
    def terminal(self):
        """True if the task is meant to be shut down and its current state is one of :attr:`terminal_states`."""
        return self.desired_state == 'shutdown' and self.current_state in self.terminal_states

    def in_state(self, desired, current):
        """Case-insensitively compare the task's desired and current state with the given values."""
        return self.desired_state == desired.lower() and self.current_state == current.lower()