diff env/lib/python3.9/site-packages/galaxy/containers/__init__.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/galaxy/containers/__init__.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,383 @@
+"""
+Interfaces to containerization software
+"""
+
+import errno
+import inspect
+import logging
+import shlex
+import subprocess
+import sys
+import uuid
+from abc import (
+    ABCMeta,
+    abstractmethod,
+    abstractproperty
+)
+from typing import Any, Dict, NamedTuple, Optional, Type
+
+import yaml
+
+from galaxy.exceptions import ContainerCLIError
+from galaxy.util.submodules import import_submodules
+
+
+DEFAULT_CONTAINER_TYPE = 'docker'
+DEFAULT_CONF = {'_default_': {'type': DEFAULT_CONTAINER_TYPE}}
+
+log = logging.getLogger(__name__)
+
+
+class ContainerPort(NamedTuple):
+    """Named tuple representing ports published by a container, with attributes"""
+    port: int  # Port number (inside the container)
+    protocol: str  # Port protocol, either ``tcp`` or ``udp``
+    hostaddr: str  # Address or hostname where the published port can be accessed
+    hostport: int  # Published port number on which the container can be accessed
+
+
+class ContainerVolume(metaclass=ABCMeta):
+
+    valid_modes = frozenset({"ro", "rw"})
+
+    def __init__(self, path, host_path=None, mode=None):
+        self.path = path
+        self.host_path = host_path
+        self.mode = mode
+        if mode and not self.mode_is_valid:
+            raise ValueError("Invalid container volume mode: %s" % mode)
+
+    @abstractmethod
+    def from_str(cls, as_str):
+        """Classmethod to convert from this container type's string representation.
+
+        :param  as_str: string representation of volume
+        :type   as_str: str
+        """
+
+    @abstractmethod
+    def __str__(self):
+        """Return this container type's string representation of the volume.
+        """
+
+    @abstractmethod
+    def to_native(self):
+        """Return this container type's native representation of the volume.
+        """
+
+    @property
+    def mode_is_valid(self):
+        return self.mode in self.valid_modes
+
+
+class Container(metaclass=ABCMeta):
+
+    def __init__(self, interface, id, name=None, **kwargs):
+        """
+
+        :param      interface:  Container interface for the given container type
+        :type       interface:  :class:`ContainerInterface` subclass instance
+        :param      id:         Container identifier
+        :type       id:         str
+        :param      name:       Container name
+        :type       name:       str
+
+        """
+        self._interface = interface
+        self._id = id
+        self._name = name
+
+    @property
+    def id(self):
+        """The container's id"""
+        return self._id
+
+    @property
+    def name(self):
+        """The container's name"""
+        return self._name
+
+    @abstractmethod
+    def from_id(cls, interface, id):
+        """Classmethod to create an instance of Container from the container system's id for the given container type.
+
+        :param  interface:  Container insterface for the given id
+        :type   interface:  :class:`ContainerInterface` subclass instance
+        :param  id:         Container identifier
+        :type   id:         str
+        :returns:           Container object
+        :rtype:             :class:`Container` subclass instance
+        """
+
+    @abstractproperty
+    def ports(self):
+        """Attribute for accessing details of ports published by the container.
+
+        :returns:   Port details
+        :rtype:     list of :class:`ContainerPort` s
+        """
+
+    @abstractproperty
+    def address(self):
+        """Attribute for accessing the address or hostname where published ports can be accessed.
+
+        :returns:   Hostname or IP address
+        :rtype:     str
+        """
+
+    @abstractmethod
+    def is_ready(self):
+        """Indicate whether or not the container is "ready" (up, available, running).
+
+        :returns:   True if ready, else False
+        :rtpe:      bool
+        """
+
+    def map_port(self, port):
+        """Map a given container port to a host address/port.
+
+        For legacy reasons, if port is ``None``, the first port (if any) will be returned
+
+        :param  port:   Container port to map
+        :type   port:   int
+        :returns:       Mapping to host address/port for given container port
+        :rtype:         :class:`ContainerPort` instance
+        """
+        mapping = None
+        ports = self.ports or []
+        for mapping in ports:
+            if port == mapping.port:
+                return mapping
+            if port is None:
+                log.warning("Container %s (%s): Don't know how to map ports to containers with multiple exposed ports "
+                            "when a specific port is not requested. Arbitrarily choosing first: %s",
+                            self.name, self.id, mapping)
+                return mapping
+        else:
+            if port is None:
+                log.warning("Container %s (%s): No exposed ports found!", self.name, self.id)
+            else:
+                log.warning("Container %s (%s): No mapping found for port: %s", self.name, self.id, port)
+        return None
+
+
+class ContainerInterface(metaclass=ABCMeta):
+
+    container_type: Optional[str] = None
+    container_class: Optional[Type[Container]] = None
+    volume_class = Optional[Type[ContainerVolume]]
+    conf_defaults: Dict[str, Optional[Any]] = {
+        'name_prefix': 'galaxy_',
+    }
+    option_map: Dict[str, Dict] = {}
+    publish_port_list_required = False
+    supports_volumes = True
+
+    def __init__(self, conf, key, containers_config_file):
+        self._key = key
+        self._containers_config_file = containers_config_file
+        mro = reversed(self.__class__.__mro__)
+        next(mro)
+        self._conf = ContainerInterfaceConfig()
+        for c in mro:
+            self._conf.update(c.conf_defaults)
+        self._conf.update(conf)
+        self.validate_config()
+
+    def _normalize_command(self, command):
+        if isinstance(command, str):
+            command = shlex.split(command)
+        return command
+
+    def _guess_kwopt_type(self, val):
+        opttype = 'string'
+        if isinstance(val, bool):
+            opttype = 'boolean'
+        elif isinstance(val, list):
+            opttype = 'list'
+            try:
+                if isinstance(val[0], tuple) and len(val[0]) == 3:
+                    opttype = 'list_of_kovtrips'
+            except IndexError:
+                pass
+        elif isinstance(val, dict):
+            opttype = 'list_of_kvpairs'
+        return opttype
+
+    def _guess_kwopt_flag(self, opt):
+        return '--%s' % opt.replace('_', '-')
+
+    def _stringify_kwopts(self, kwopts):
+        opts = []
+        for opt, val in kwopts.items():
+            try:
+                optdef = self.option_map[opt]
+            except KeyError:
+                optdef = {
+                    'flag': self._guess_kwopt_flag(opt),
+                    'type': self._guess_kwopt_type(val),
+                }
+                log.warning("option '%s' not in %s.option_map, guessing flag '%s' type '%s'",
+                            opt, self.__class__.__name__, optdef['flag'], optdef['type'])
+            opts.append(getattr(self, '_stringify_kwopt_' + optdef['type'])(optdef['flag'], val))
+        return ' '.join(opts)
+
+    def _stringify_kwopt_boolean(self, flag, val):
+        """
+        """
+        return '{flag}={value}'.format(flag=flag, value=str(val).lower())
+
+    def _stringify_kwopt_string(self, flag, val):
+        """
+        """
+        return '{flag} {value}'.format(flag=flag, value=shlex.quote(str(val)))
+
+    def _stringify_kwopt_list(self, flag, val):
+        """
+        """
+        if isinstance(val, str):
+            return self._stringify_kwopt_string(flag, val)
+        return ' '.join('{flag} {value}'.format(flag=flag, value=shlex.quote(str(v))) for v in val)
+
+    def _stringify_kwopt_list_of_kvpairs(self, flag, val):
+        """
+        """
+        l = []
+        if isinstance(val, list):
+            # ['foo=bar', 'baz=quux']
+            l = val
+        else:
+            # {'foo': 'bar', 'baz': 'quux'}
+            for k, v in dict(val).items():
+                l.append(f'{k}={v}')
+        return self._stringify_kwopt_list(flag, l)
+
+    def _stringify_kwopt_list_of_kovtrips(self, flag, val):
+        """
+        """
+        if isinstance(val, str):
+            return self._stringify_kwopt_string(flag, val)
+        l = []
+        for k, o, v in val:
+            l.append(f'{k}{o}{v}')
+        return self._stringify_kwopt_list(flag, l)
+
+    def _run_command(self, command, verbose=False):
+        if verbose:
+            log.debug('running command: [%s]', command)
+        command_list = self._normalize_command(command)
+        p = subprocess.Popen(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
+        stdout, stderr = p.communicate()
+        if p.returncode == 0:
+            return stdout.strip()
+        else:
+            msg = f"Command '{command}' returned non-zero exit status {p.returncode}"
+            log.error(msg + ': ' + stderr.strip())
+            raise ContainerCLIError(
+                msg,
+                stdout=stdout.strip(),
+                stderr=stderr.strip(),
+                returncode=p.returncode,
+                command=command,
+                subprocess_command=command_list)
+
+    @property
+    def key(self):
+        return self._key
+
+    @property
+    def containers_config_file(self):
+        return self._containers_config_file
+
+    def get_container(self, container_id):
+        return self.container_class.from_id(self, container_id)
+
+    def set_kwopts_name(self, kwopts):
+        if self._name_prefix is not None:
+            name = '{prefix}{name}'.format(
+                prefix=self._name_prefix,
+                name=kwopts.get('name', uuid.uuid4().hex)
+            )
+            kwopts['name'] = name
+
+    def validate_config(self):
+        """
+        """
+        self._name_prefix = self._conf.name_prefix
+
+    @abstractmethod
+    def run_in_container(self, command, image=None, **kwopts):
+        """
+        """
+
+
+class ContainerInterfaceConfig(dict):
+
+    def __setattr__(self, name, value):
+        self[name] = value
+
+    def __getattr__(self, name):
+        try:
+            return self[name]
+        except KeyError:
+            raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
+
+    def get(self, name, default=None):
+        try:
+            return self[name]
+        except KeyError:
+            return default
+
+
+def build_container_interfaces(containers_config_file, containers_conf=None):
+    """Build :class:`ContainerInterface`s. Pass ``containers_conf`` to avoid rereading the config file.
+
+    :param  containers_config_file: Filename of containers_conf.yml
+    :type   containers_config_file: str
+    :param  containers_conf:        Optional containers conf (as read from containers_conf.yml), will be used in place
+                                    of containers_config_file
+    :type   containers_conf:        dict
+    :returns:                       Instantiated container interfaces with keys corresponding to ``containers`` keys
+    :rtype:                         dict of :class:`ContainerInterface` subclass instances
+    """
+    if not containers_conf:
+        containers_conf = parse_containers_config(containers_config_file)
+    interface_classes = _get_interface_modules()
+    interfaces = {}
+    for k, conf in containers_conf.items():
+        container_type = conf.get('type', DEFAULT_CONTAINER_TYPE)
+        assert container_type in interface_classes, "unknown container interface type: %s" % container_type
+        interfaces[k] = interface_classes[container_type](conf, k, containers_config_file)
+    return interfaces
+
+
+def parse_containers_config(containers_config_file):
+    """Parse a ``containers_conf.yml`` and return the contents of its ``containers`` dictionary.
+
+    :param  containers_config_file: Filename of containers_conf.yml
+    :type   containers_config_file: str
+    :returns:                       Contents of the dictionary under the ``containers`` key
+    :rtype:                         dict
+    """
+    conf = DEFAULT_CONF.copy()
+    try:
+        with open(containers_config_file) as fh:
+            c = yaml.safe_load(fh)
+            conf.update(c.get('containers', {}))
+    except OSError as exc:
+        if exc.errno == errno.ENOENT:
+            log.debug("config file '%s' does not exist, running with default config", containers_config_file)
+        else:
+            raise
+    return conf
+
+
+def _get_interface_modules():
+    interfaces = []
+    modules = import_submodules(sys.modules[__name__])
+    for module in modules:
+        module_names = [getattr(module, _) for _ in dir(module)]
+        classes = [_ for _ in module_names if inspect.isclass(_) and
+            not _ == ContainerInterface and issubclass(_, ContainerInterface)]
+        interfaces.extend(classes)
+    return {x.container_type: x for x in interfaces}