diff env/lib/python3.9/site-packages/galaxy/containers/docker_model.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/galaxy/containers/docker_model.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,779 @@
+"""
+Model objects for docker objects
+"""
+
+import logging
+import shlex
+
+try:
+    import docker
+except ImportError:
+    from galaxy.util.bunch import Bunch
+    docker = Bunch(errors=Bunch(NotFound=None))
+
+from galaxy.containers import (
+    Container,
+    ContainerPort,
+    ContainerVolume
+)
+from galaxy.util import (
+    pretty_print_time_interval,
+    unicodify,
+)
+
+
+CPUS_LABEL = '_galaxy_cpus'
+IMAGE_LABEL = '_galaxy_image'
+CPUS_CONSTRAINT = 'node.labels.' + CPUS_LABEL
+IMAGE_CONSTRAINT = 'node.labels.' + IMAGE_LABEL
+
+log = logging.getLogger(__name__)
+
+
+class DockerAttributeContainer:
+
+    def __init__(self, members=None):
+        if members is None:
+            members = set()
+        self._members = members
+
+    def __eq__(self, other):
+        return self.members == other.members
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(tuple(sorted([repr(x) for x in self._members])))
+
+    def __str__(self):
+        return ', '.join(str(x) for x in self._members) or 'None'
+
+    def __iter__(self):
+        return iter(self._members)
+
+    def __getitem__(self, name):
+        for member in self._members:
+            if member.name == name:
+                return member
+        else:
+            raise KeyError(name)
+
+    def __contains__(self, item):
+        return item in self._members
+
+    @property
+    def members(self):
+        return frozenset(self._members)
+
+    def hash(self):
+        return hex(self.__hash__())[2:]
+
+    def get(self, name, default):
+        try:
+            return self[name]
+        except KeyError:
+            return default
+
+
+class DockerVolume(ContainerVolume):
+    @classmethod
+    def from_str(cls, as_str):
+        """Construct an instance from a string as would be passed to `docker run --volume`.
+
+        A string in the format ``<host_path>:<mode>`` is supported for legacy purposes even though it is not valid
+        Docker volume syntax.
+        """
+        if not as_str:
+            raise ValueError("Failed to parse Docker volume from %s" % as_str)
+        parts = as_str.split(":", 2)
+        kwds = dict(host_path=parts[0])
+        if len(parts) == 1:
+            # auto-generated volume
+            kwds["path"] = kwds["host_path"]
+        elif len(parts) == 2:
+            # /host_path:mode is not (or is no longer?) valid Docker volume syntax
+            if parts[1] in DockerVolume.valid_modes:
+                kwds["mode"] = parts[1]
+                kwds["path"] = kwds["host_path"]
+            else:
+                kwds["path"] = parts[1]
+        elif len(parts) == 3:
+            kwds["path"] = parts[1]
+            kwds["mode"] = parts[2]
+        return cls(**kwds)
+
+    def __str__(self):
+        volume_str = ":".join(filter(lambda x: x is not None, (self.host_path, self.path, self.mode)))
+        if "$" not in volume_str:
+            volume_for_cmd_line = shlex.quote(volume_str)
+        else:
+            # e.g. $_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw so don't single quote.
+            volume_for_cmd_line = '"%s"' % volume_str
+        return volume_for_cmd_line
+
+    def to_native(self):
+        host_path = self.host_path or self.path
+        return (self.path, {host_path: {'bind': self.path, 'mode': self.mode}})
+
+
+class DockerContainer(Container):
+
+    def __init__(self, interface, id, name=None, inspect=None):
+        super().__init__(interface, id, name=name)
+        self._inspect = inspect
+
+    @classmethod
+    def from_id(cls, interface, id):
+        inspect = interface.inspect(id)
+        return cls(interface, id, name=inspect['Name'], inspect=inspect)
+
+    @property
+    def ports(self):
+        # {
+        #     "NetworkSettings" : {
+        #         "Ports" : {
+        #             "3306/tcp" : [
+        #                 {
+        #                     "HostIp" : "127.0.0.1",
+        #                     "HostPort" : "3306"
+        #                 }
+        #             ]
+        rval = []
+        try:
+            port_mappings = self.inspect['NetworkSettings']['Ports']
+        except KeyError:
+            log.warning("Failed to get ports for container %s from `docker inspect` output at "
+                        "['NetworkSettings']['Ports']: %s: %s", self.id, exc_info=True)
+            return None
+        for port_name in port_mappings:
+            for binding in port_mappings[port_name]:
+                rval.append(ContainerPort(
+                    int(port_name.split('/')[0]),
+                    port_name.split('/')[1],
+                    self.address,
+                    int(binding['HostPort']),
+                ))
+        return rval
+
+    @property
+    def address(self):
+        if self._interface.host and self._interface.host.startswith('tcp://'):
+            return self._interface.host.replace('tcp://', '').split(':', 1)[0]
+        else:
+            return 'localhost'
+
+    def is_ready(self):
+        return self.inspect['State']['Running']
+
+    def __eq__(self, other):
+        return self._id == other.id
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(self._id)
+
+    @property
+    def inspect(self):
+        if not self._inspect:
+            self._inspect = self._interface.inspect(self._id)
+        return self._inspect
+
+
+class DockerService(Container):
+
+    def __init__(self, interface, id, name=None, image=None, inspect=None):
+        super().__init__(interface, id, name=name)
+        self._image = image
+        self._inspect = inspect
+        self._env = {}
+        self._tasks = []
+        if inspect:
+            self._name = name or inspect['Spec']['Name']
+            self._image = image or inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
+
+    @classmethod
+    def from_cli(cls, interface, s, task_list):
+        service = cls(interface, s['ID'], name=s['NAME'], image=s['IMAGE'])
+        for task_dict in task_list:
+            if task_dict['NAME'].strip().startswith(r'\_'):
+                continue    # historical task
+            service.task_add(DockerTask.from_cli(interface, task_dict, service=service))
+        return service
+
+    @classmethod
+    def from_id(cls, interface, id):
+        inspect = interface.service_inspect(id)
+        service = cls(interface, id, inspect=inspect)
+        for task in interface.service_tasks(service):
+            service.task_add(task)
+        return service
+
+    @property
+    def ports(self):
+        # {
+        #     "Endpoint": {
+        #         "Ports": [
+        #             {
+        #                 "Protocol": "tcp",
+        #                 "TargetPort": 8888,
+        #                 "PublishedPort": 30000,
+        #                 "PublishMode": "ingress"
+        #             }
+        #         ]
+        rval = []
+        try:
+            port_mappings = self.inspect['Endpoint']['Ports']
+        except (IndexError, KeyError):
+            log.warning("Failed to get ports for container %s from `docker service inspect` output at "
+                        "['Endpoint']['Ports']: %s: %s", self.id, exc_info=True)
+            return None
+        for binding in port_mappings:
+            rval.append(ContainerPort(
+                binding['TargetPort'],
+                binding['Protocol'],
+                self.address,               # use the routing mesh
+                binding['PublishedPort']
+            ))
+        return rval
+
+    @property
+    def address(self):
+        if self._interface.host and self._interface.host.startswith('tcp://'):
+            return self._interface.host.replace('tcp://', '').split(':', 1)[0]
+        else:
+            return 'localhost'
+
+    def is_ready(self):
+        return self.in_state('Running', 'Running')
+
+    def __eq__(self, other):
+        return self._id == other.id
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash(self._id)
+
+    def task_add(self, task):
+        self._tasks.append(task)
+
+    @property
+    def inspect(self):
+        if not self._inspect:
+            self._inspect = self._interface.service_inspect(self._id)
+        return self._inspect
+
+    @property
+    def state(self):
+        """If one of this service's tasks desired state is running, return that task state, otherwise, return the state
+        of a non-running task.
+
+        This is imperfect because it doesn't attempt to provide useful information for replicas > 1 tasks, but it suits
+        our purposes for now.
+        """
+        state = None
+        for task in self.tasks:
+            state = task.state
+            if task.desired_state == 'running':
+                break
+        return state
+
+    @property
+    def env(self):
+        if not self._env:
+            try:
+                for env_str in self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Env']:
+                    try:
+                        self._env.update([env_str.split('=', 1)])
+                    except ValueError:
+                        self._env[env_str] = None
+            except KeyError as exc:
+                log.debug('Cannot retrieve container environment: KeyError: %s', unicodify(exc))
+        return self._env
+
+    @property
+    def terminal(self):
+        """Same caveats as :meth:`state`.
+        """
+        for task in self.tasks:
+            if task.desired_state == 'running':
+                return False
+        return True
+
+    @property
+    def node(self):
+        """Same caveats as :meth:`state`.
+        """
+        for task in self.tasks:
+            if task.node is not None:
+                return task.node
+        return None
+
+    @property
+    def image(self):
+        if self._image is None:
+            self._image = self.inspect['Spec']['TaskTemplate']['ContainerSpec']['Image']
+        return self._image
+
+    @property
+    def cpus(self):
+        try:
+            cpus = self.inspect['Spec']['TaskTemplate']['Resources']['Limits']['NanoCPUs'] / 1000000000.0
+            if cpus == int(cpus):
+                cpus = int(cpus)
+            return cpus
+        except KeyError:
+            return 0
+
+    @property
+    def constraints(self):
+        constraints = self.inspect['Spec']['TaskTemplate']['Placement'].get('Constraints', [])
+        return DockerServiceConstraints.from_constraint_string_list(constraints)
+
+    @property
+    def tasks(self):
+        """A list of *all* tasks, including terminal ones.
+        """
+        if not self._tasks:
+            self._tasks = []
+            for task in self._interface.service_tasks(self):
+                self.task_add(task)
+        return self._tasks
+
+    @property
+    def task_count(self):
+        """A count of *all* tasks, including terminal ones.
+        """
+        return len(self.tasks)
+
+    def in_state(self, desired, current, tasks='any'):
+        """Indicate if one of this service's tasks matches the desired state.
+        """
+        for task in self.tasks:
+            if task.in_state(desired, current):
+                if tasks == 'any':
+                    # at least 1 task in desired state
+                    return True
+            elif tasks == 'all':
+                # at least 1 task not in desired state
+                return False
+        else:
+            return False if tasks == 'any' else True
+
+    def constraint_add(self, name, op, value):
+        self._interface.service_constraint_add(self.id, name, op, value)
+
+    def set_cpus(self):
+        self.constraint_add(CPUS_LABEL, '==', self.cpus)
+
+    def set_image(self):
+        self.constraint_add(IMAGE_LABEL, '==', self.image)
+
+
+class DockerServiceConstraint:
+
+    def __init__(self, name=None, op=None, value=None):
+        self._name = name
+        self._op = op
+        self._value = value
+
+    def __eq__(self, other):
+        return self._name == other._name and \
+            self._op == other._op and \
+            self._value == other._value
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash((self._name, self._op, self._value))
+
+    def __repr__(self):
+        return f'{self.__class__.__name__}({self._name}{self._op}{self._value})'
+
+    def __str__(self):
+        return f'{self._name}{self._op}{self._value}'
+
+    @staticmethod
+    def split_constraint_string(constraint_str):
+        constraint = (constraint_str, '', '')
+        for op in '==', '!=':
+            t = constraint_str.partition(op)
+            if len(t[0]) < len(constraint[0]):
+                constraint = t
+        if constraint[0] == constraint_str:
+            raise Exception('Unable to parse constraint string: %s' % constraint_str)
+        return [x.strip() for x in constraint]
+
+    @classmethod
+    def from_str(cls, constraint_str):
+        name, op, value = DockerServiceConstraint.split_constraint_string(constraint_str)
+        return cls(name=name, op=op, value=value)
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def op(self):
+        return self._op
+
+    @property
+    def value(self):
+        return self._value
+
+    @property
+    def label(self):
+        return DockerNodeLabel(
+            name=self.name.replace('node.labels.', ''),
+            value=self.value
+        )
+
+
+class DockerServiceConstraints(DockerAttributeContainer):
+
+    member_class = DockerServiceConstraint
+
+    @classmethod
+    def from_constraint_string_list(cls, inspect):
+        members = []
+        for member_str in inspect:
+            members.append(cls.member_class.from_str(member_str))
+        return cls(members=members)
+
+    @property
+    def labels(self):
+        return DockerNodeLabels(members=[x.label for x in self.members])
+
+
+class DockerNode:
+
+    def __init__(self, interface, id=None, name=None, status=None,
+                 availability=None, manager=False, inspect=None):
+        self._interface = interface
+        self._id = id
+        self._name = name
+        self._status = status
+        self._availability = availability
+        self._manager = manager
+        self._inspect = inspect
+        if inspect:
+            self._name = name or inspect['Description']['Hostname']
+            self._status = status or inspect['Status']['State']
+            self._availability = inspect['Spec']['Availability']
+            self._manager = manager or inspect['Spec']['Role'] == 'manager'
+        self._tasks = []
+
+    @classmethod
+    def from_cli(cls, interface, n, task_list):
+        node = cls(interface, id=n['ID'], name=n['HOSTNAME'], status=n['STATUS'],
+                   availability=n['AVAILABILITY'], manager=True if n['MANAGER STATUS'] else False)
+        for task_dict in task_list:
+            node.task_add(DockerTask.from_cli(interface, task_dict, node=node))
+        return node
+
+    @classmethod
+    def from_id(cls, interface, id):
+        inspect = interface.node_inspect(id)
+        node = cls(interface, id, inspect=inspect)
+        for task in interface.node_tasks(node):
+            node.task_add(task)
+        return node
+
+    def task_add(self, task):
+        self._tasks.append(task)
+
+    @property
+    def id(self):
+        return self._id
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def version(self):
+        # this changes on update so don't cache
+        return self._interface.node_inspect(self._id or self._name)['Version']['Index']
+
+    @property
+    def inspect(self):
+        if not self._inspect:
+            self._inspect = self._interface.node_inspect(self._id or self._name)
+        return self._inspect
+
+    @property
+    def state(self):
+        return (f'{self._status}-{self._availability}').lower()
+
+    @property
+    def cpus(self):
+        return self.inspect['Description']['Resources']['NanoCPUs'] / 1000000000
+
+    @property
+    def labels(self):
+        labels = self.inspect['Spec'].get('Labels', {}) or {}
+        return DockerNodeLabels.from_label_dictionary(labels)
+
+    def label_add(self, label, value):
+        self._interface.node_update(self.id, label_add={label: value})
+
+    @property
+    def labels_as_constraints(self):
+        constraints_strings = [x.constraint_string for x in self.labels]
+        return DockerServiceConstraints.from_constraint_string_list(constraints_strings)
+
+    def set_labels_for_constraints(self, constraints):
+        for label in self._constraints_to_label_args(constraints):
+            if label not in self.labels:
+                log.info("setting node '%s' label '%s' to '%s'", self.name, label.name, label.value)
+                self.label_add(label.name, label.value)
+
+    def _constraints_to_label_args(self, constraints):
+        constraints = filter(lambda x: x.name.startswith('node.labels.') and x.op == '==', constraints)
+        labels = map(lambda x: DockerNodeLabel(name=x.name.replace('node.labels.', '', 1), value=x.value), constraints)
+        return labels
+
+    @property
+    def tasks(self):
+        """A list of *all* tasks, including terminal ones.
+        """
+        if not self._tasks:
+            self._tasks = []
+            for task in self._interface.node_tasks(self):
+                self.task_add(task)
+        return self._tasks
+
+    @property
+    def non_terminal_tasks(self):
+        r = []
+        for task in self.tasks:
+            # ensure the task has a service - it is possible for "phantom" tasks to exist (service is removed, no
+            # container is running, but the task still shows up in the node's task list)
+            if not task.terminal and task.service is not None:
+                r.append(task)
+        return r
+
+    @property
+    def task_count(self):
+        """A count of *all* tasks, including terminal ones.
+        """
+        return len(self.tasks)
+
+    def in_state(self, status, availability):
+        return self._status.lower() == status.lower() and self._availability.lower() == availability.lower()
+
+    def is_ok(self):
+        return self.in_state('Ready', 'Active')
+
+    def is_managed(self):
+        return not self._manager
+
+    def destroyable(self):
+        return not self._manager and self.is_ok() and self.task_count == 0
+
+    def drain(self):
+        self._interface.node_update(self.id, availability='drain')
+
+
+class DockerNodeLabel:
+
+    def __init__(self, name=None, value=None):
+        self._name = name
+        self._value = value
+
+    def __eq__(self, other):
+        return self._name == other._name and \
+            self._value == other._value
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def __hash__(self):
+        return hash((self._name, self._value))
+
+    def __repr__(self):
+        return f'{self.__class__.__name__}({self._name}: {self._value})'
+
+    def __str__(self):
+        return f'{self._name}: {self._value}'
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def value(self):
+        return self._value
+
+    @property
+    def constraint_string(self):
+        return f'node.labels.{self.name}=={self.value}'
+
+    @property
+    def constraint(self):
+        return DockerServiceConstraint(
+            name=f'node.labels.{self.name}',
+            op='==',
+            value=self.value
+        )
+
+
+class DockerNodeLabels(DockerAttributeContainer):
+
+    member_class = DockerNodeLabel
+
+    @classmethod
+    def from_label_dictionary(cls, inspect):
+        members = []
+        for k, v in inspect.items():
+            members.append(cls.member_class(name=k, value=v))
+        return cls(members=members)
+
+    @property
+    def constraints(self):
+        return DockerServiceConstraints(members=[x.constraint for x in self.members])
+
+
+class DockerTask:
+
+    # these are the possible *current* state terminal states
+    terminal_states = (
+        'shutdown',  # this is normally only a desired state but I've seen a task with it as current as well
+        'complete',
+        'failed',
+        'rejected',
+        'orphaned',
+    )
+
+    def __init__(self, interface, id=None, name=None, image=None, desired_state=None,
+                 state=None, error=None, ports=None, service=None, node=None):
+        self._interface = interface
+        self._id = id
+        self._name = name
+        self._image = image
+        self._desired_state = desired_state
+        self._state = state
+        self._error = error
+        self._ports = ports
+        self._service = service
+        self._node = node
+        self._inspect = None
+
+    @classmethod
+    def from_cli(cls, interface, t, service=None, node=None):
+        state = t['CURRENT STATE'].split()[0]
+        return cls(interface, id=t['ID'], name=t['NAME'], image=t['IMAGE'],
+                   desired_state=t['DESIRED STATE'], state=state, error=t['ERROR'],
+                   ports=t['PORTS'], service=service, node=node)
+
+    @classmethod
+    def from_api(cls, interface, t, service=None, node=None):
+        service = service or interface.service(id=t.get('ServiceID'))
+        node = node or interface.node(id=t.get('NodeID'))
+        if service:
+            name = service.name + '.' + str(t['Slot'])
+        else:
+            name = t['ID']
+        image = t['Spec']['ContainerSpec']['Image'].split('@', 1)[0],  # remove pin
+        return cls(interface, id=t['ID'], name=name, image=image, desired_state=t['DesiredState'],
+                   state=t['Status']['State'], ports=t['Status']['PortStatus'], error=t['Status']['Message'],
+                   service=service, node=node)
+
+    @property
+    def id(self):
+        return self._id
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def inspect(self):
+        if not self._inspect:
+            try:
+                self._inspect = self._interface.task_inspect(self._id)
+            except docker.errors.NotFound:
+                # This shouldn't be possible, appears to be some kind of Swarm bug (the node claims to have a task that
+                # does not actually exist anymore, nor does its service exist).
+                log.error('Task could not be inspected because Docker claims it does not exist: %s (%s)',
+                          self.name, self.id)
+                return None
+        return self._inspect
+
+    @property
+    def slot(self):
+        try:
+            return self.inspect['Slot']
+        except TypeError:
+            return None
+
+    @property
+    def node(self):
+        if not self._node:
+            try:
+                self._node = self._interface.node(id=self.inspect['NodeID'])
+            except TypeError:
+                return None
+        return self._node
+
+    @property
+    def service(self):
+        if not self._service:
+            try:
+                self._service = self._interface.service(id=self.inspect['ServiceID'])
+            except TypeError:
+                return None
+        return self._service
+
+    @property
+    def cpus(self):
+        try:
+            cpus = self.inspect['Spec']['Resources']['Reservations']['NanoCPUs'] / 1000000000.0
+            if cpus == int(cpus):
+                cpus = int(cpus)
+            return cpus
+        except TypeError:
+            return None
+        except KeyError:
+            return 0
+
+    @property
+    def state(self):
+        return (f'{self._desired_state}-{self._state}').lower()
+
+    @property
+    def current_state(self):
+        try:
+            return self._state.lower()
+        except TypeError:
+            log.warning("Current state of %s (%s) is not a string: %s", self.name, self.id, str(self._state))
+            return None
+
+    @property
+    def current_state_time(self):
+        # Docker API returns a stamp w/ higher second precision than Python takes
+        try:
+            stamp = self.inspect['Status']['Timestamp']
+        except TypeError:
+            return None
+        return pretty_print_time_interval(time=stamp[:stamp.index('.') + 7], precise=True, utc=stamp[-1] == 'Z')
+
+    @property
+    def desired_state(self):
+        try:
+            return self._desired_state.lower()
+        except TypeError:
+            log.warning("Desired state of %s (%s) is not a string: %s", self.name, self.id, str(self._desired_state))
+            return None
+
+    @property
+    def terminal(self):
+        return self.desired_state == 'shutdown' and self.current_state in self.terminal_states
+
+    def in_state(self, desired, current):
+        return self.desired_state == desired.lower() and self.current_state == current.lower()