Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/galaxy/tool_util/deps/containers.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Container discovery for tool execution.

``ContainerFinder`` combines destination configuration with the
``ContainerRegistry`` (an ordered list of container resolver plugins) to pick
the container, if any, that a job should run in.
"""
import collections
import logging
import os

from galaxy.util import (
    asbool,
    plugin_config
)
from .container_classes import (
    CONTAINER_CLASSES,
    DOCKER_CONTAINER_TYPE,
    NULL_CONTAINER,
    SINGULARITY_CONTAINER_TYPE,
)
from .container_resolvers import ResolutionCache
from .container_resolvers.explicit import (
    ExplicitContainerResolver,
    ExplicitSingularityContainerResolver,
)
from .container_resolvers.mulled import (
    BuildMulledDockerContainerResolver,
    BuildMulledSingularityContainerResolver,
    CachedMulledDockerContainerResolver,
    CachedMulledSingularityContainerResolver,
    MulledDockerContainerResolver,
    MulledSingularityContainerResolver,
)
from .requirements import (
    ContainerDescription,
)

log = logging.getLogger(__name__)

DEFAULT_CONTAINER_TYPE = DOCKER_CONTAINER_TYPE
ALL_CONTAINER_TYPES = [DOCKER_CONTAINER_TYPE, SINGULARITY_CONTAINER_TYPE]
# Pairs a container description with the resolver that produced it.
ResolvedContainerDescription = collections.namedtuple('ResolvedContainerDescription', ['container_resolver', 'container_description'])


class ContainerFinder:
    """Select a container for a job from destination settings and resolvers."""

    def __init__(self, app_info, mulled_resolution_cache=None):
        """Store ``app_info`` and build the backing ``ContainerRegistry``."""
        self.app_info = app_info
        self.container_registry = ContainerRegistry(app_info, mulled_resolution_cache=mulled_resolution_cache)

    def _enabled_container_types(self, destination_info):
        """Return the known container types the destination has enabled."""
        return [t for t in ALL_CONTAINER_TYPES if self.__container_type_enabled(t, destination_info)]

    def find_best_container_description(self, enabled_container_types, tool_info, **kwds):
        """Regardless of destination properties - find best container for tool.

        Given container types and container.ToolInfo description of the tool."""
        return self.container_registry.find_best_container_description(enabled_container_types, tool_info, **kwds)

    def resolve(self, enabled_container_types, tool_info, **kwds):
        """Regardless of destination properties - find ResolvedContainerDescription for tool."""
        return self.container_registry.resolve(enabled_container_types, tool_info, **kwds)

    def find_container(self, tool_info, destination_info, job_info):
        """Return the container to use for a job, or ``NULL_CONTAINER``.

        Candidates are tried in this order and the first truthy container
        wins:

        1. ``destination_info["container_override"]`` descriptions.
        2. Per-type override ids (``<type>_container_id_override`` or the
           ``<type>_image_override`` parts).
        3. The best description found by the container resolvers.
        4. ``destination_info["container"]`` descriptions.
        5. Per-type default ids (``<type>_default_container_id``,
           ``<type>_image`` or the ``<type>_image_default`` parts).
        """
        enabled_container_types = self._enabled_container_types(destination_info)

        # Short-cut everything else and just skip checks if no container type is enabled.
        if not enabled_container_types:
            return NULL_CONTAINER

        def _build_container(container_description=None, container_id=None, container_type=None):
            # A description, when given, takes precedence over explicit id/type.
            if container_description:
                container_id = container_description.identifier
                container_type = container_description.type
            return self.__destination_container(
                container_id,
                container_type,
                tool_info,
                destination_info,
                job_info,
                container_description,
            )

        def _first_container_from_dicts(destination_container_dicts):
            # First usable container among dict-encoded descriptions, else None.
            for destination_container_dict in destination_container_dicts:
                container_description = ContainerDescription.from_dict(destination_container_dict)
                if container_description:
                    container = _build_container(container_description)
                    if container:
                        return container
            return None

        def _first_container_by_id(lookup_container_id):
            # First usable container over all container classes, using
            # ``lookup_container_id(container_type, destination_info)``.
            for container_type in CONTAINER_CLASSES:
                container_id = lookup_container_id(container_type, destination_info)
                if container_id:
                    container = _build_container(container_type=container_type, container_id=container_id)
                    if container:
                        return container
            return None

        if "container_override" in destination_info:
            container = _first_container_from_dicts(destination_info["container_override"])
            if container:
                return container

        # If destination forcing Galaxy to use a particular container do it,
        # this is likely kind of a corner case. For instance if deployers
        # do not trust the containers annotated in tools.
        container = _first_container_by_id(self.__overridden_container_id)
        if container:
            return container

        # Otherwise lets see if we can find container for the tool.
        container_description = self.find_best_container_description(enabled_container_types, tool_info)
        container = _build_container(container_description)
        if container:
            return container

        # If we still don't have a container, check to see if any container
        # types define a default container id and use that.
        if "container" in destination_info:
            container = _first_container_from_dicts(destination_info["container"])
            if container:
                return container

        container = _first_container_by_id(self.__default_container_id)
        if container:
            return container

        return NULL_CONTAINER

    def resolution_cache(self):
        """Return a fresh resolution cache from the registry."""
        return self.container_registry.get_resolution_cache()

    def __overridden_container_id(self, container_type, destination_info):
        """Return the destination's override container id for a type, or None."""
        if not self.__container_type_enabled(container_type, destination_info):
            return None
        override_id_key = f"{container_type}_container_id_override"
        if override_id_key in destination_info:
            return destination_info[override_id_key]
        if f"{container_type}_image_override" in destination_info:
            return self.__build_container_id_from_parts(container_type, destination_info, mode="override")
        return None

    def __build_container_id_from_parts(self, container_type, destination_info, mode):
        """Assemble ``[repo/][owner/]image[:tag]`` from destination keys.

        Keys are ``<container_type>_<part>_<mode>``; the image key is
        required (``KeyError`` otherwise), the other parts are optional.
        """
        repo = ""
        owner = ""
        repo_key = f"{container_type}_repo_{mode}"
        owner_key = f"{container_type}_owner_{mode}"
        if repo_key in destination_info:
            repo = destination_info[repo_key] + "/"
        if owner_key in destination_info:
            owner = destination_info[owner_key] + "/"
        cont_id = repo + owner + destination_info[f"{container_type}_image_{mode}"]
        tag_key = f"{container_type}_tag_{mode}"
        if tag_key in destination_info:
            cont_id += ":" + destination_info[tag_key]
        return cont_id

    def __default_container_id(self, container_type, destination_info):
        """Return the destination's default container id for a type, or None."""
        if not self.__container_type_enabled(container_type, destination_info):
            return None
        key = f"{container_type}_default_container_id"
        # Also allow docker_image...
        if key not in destination_info:
            key = f"{container_type}_image"
        if key in destination_info:
            return destination_info[key]
        if f"{container_type}_image_default" in destination_info:
            return self.__build_container_id_from_parts(container_type, destination_info, mode="default")
        return None

    def __destination_container(self, container_id, container_type, tool_info, destination_info, job_info, container_description=None):
        """Instantiate the container class for ``container_type``.

        Returns ``NULL_CONTAINER`` when the destination has not enabled
        that container type.
        """
        # TODO: ensure destination_info is dict-like
        if not self.__container_type_enabled(container_type, destination_info):
            return NULL_CONTAINER

        # TODO: Right now this assumes all containers available when a
        # container type is - there should be more thought put into this.
        # Checking which are available - settings policies for what can be
        # auto-fetched, etc....
        return CONTAINER_CLASSES[container_type](container_id, self.app_info, tool_info, destination_info, job_info, container_description)

    def __container_type_enabled(self, container_type, destination_info):
        """Return whether ``<container_type>_enabled`` is truthy (default False)."""
        return asbool(destination_info.get(f"{container_type}_enabled", False))


class NullContainerFinder:
    """Container finder that never finds a container."""

    def find_container(self, tool_info, destination_info, job_info):
        """Ignore all arguments and return an empty list."""
        # NOTE(review): this returns [] where ContainerFinder returns
        # NULL_CONTAINER; left unchanged in case callers depend on the
        # exact value - confirm before unifying.
        return []


class ContainerRegistry:
    """Loop through enabled ContainerResolver plugins and find first match."""

    def __init__(self, app_info, mulled_resolution_cache=None):
        """Discover resolver plugin classes and build the resolver list."""
        self.resolver_classes = self.__resolvers_dict()
        self.enable_mulled_containers = app_info.enable_mulled_containers
        self.app_info = app_info
        self.container_resolvers = self.__build_container_resolvers(app_info)
        self.mulled_resolution_cache = mulled_resolution_cache

    def __build_container_resolvers(self, app_info):
        """Load resolvers from the configured file, else use the defaults.

        The defaults are used when ``containers_resolvers_config_file`` is
        unset on ``app_info`` or the file it names does not exist.
        """
        conf_file = getattr(app_info, 'containers_resolvers_config_file', None)
        if not conf_file:
            return self.__default_containers_resolvers()
        if not os.path.exists(conf_file):
            log.debug("Unable to find config file '%s'", conf_file)
            return self.__default_containers_resolvers()
        plugin_source = plugin_config.plugin_source_from_path(conf_file)
        return self._parse_resolver_conf(plugin_source)

    def _parse_resolver_conf(self, plugin_source):
        """Instantiate resolver plugins from ``plugin_source``."""
        extra_kwds = {
            'app_info': self.app_info
        }
        return plugin_config.load_plugins(self.resolver_classes, plugin_source, extra_kwds)

    def __default_containers_resolvers(self):
        """Return the built-in resolver list used when no config file applies."""
        default_resolvers = [
            ExplicitContainerResolver(self.app_info),
            ExplicitSingularityContainerResolver(self.app_info),
        ]
        if self.enable_mulled_containers:
            default_resolvers.extend([
                CachedMulledDockerContainerResolver(self.app_info, namespace="biocontainers"),
                CachedMulledDockerContainerResolver(self.app_info, namespace="local"),
                CachedMulledSingularityContainerResolver(self.app_info, namespace="biocontainers"),
                CachedMulledSingularityContainerResolver(self.app_info, namespace="local"),
                MulledDockerContainerResolver(self.app_info, namespace="biocontainers"),
                MulledSingularityContainerResolver(self.app_info, namespace="biocontainers"),
            ])
            # BuildMulledDockerContainerResolver and BuildMulledSingularityContainerResolver both need the docker daemon to build images.
            # If docker is not available, we don't load them.
            # NOTE(review): the build resolvers are kept under the mulled
            # switch above - confirm this nesting against upstream.
            build_mulled_docker_container_resolver = BuildMulledDockerContainerResolver(self.app_info)
            if build_mulled_docker_container_resolver.cli_available:
                default_resolvers.extend([
                    build_mulled_docker_container_resolver,
                    BuildMulledSingularityContainerResolver(self.app_info),
                ])
        return default_resolvers

    def __resolvers_dict(self):
        """Map ``resolver_type`` values to resolver plugin classes."""
        # Imported here rather than at module level, as in the original code.
        import galaxy.tool_util.deps.container_resolvers
        return plugin_config.plugins_dict(galaxy.tool_util.deps.container_resolvers, 'resolver_type')

    def get_resolution_cache(self):
        """Return a new ``ResolutionCache``, attaching the mulled cache if set."""
        cache = ResolutionCache()
        if self.mulled_resolution_cache is not None:
            cache.mulled_resolution_cache = self.mulled_resolution_cache
        return cache

    def find_best_container_description(self, enabled_container_types, tool_info, **kwds):
        """Return best container description of supplied types matching tool info.

        Returns ``None`` when nothing resolves. Any exception raised during
        resolution is logged and also results in ``None``.
        """
        try:
            resolved_container_description = self.resolve(enabled_container_types, tool_info, **kwds)
        except Exception:
            log.exception("Could not get container description for tool '%s'", tool_info.tool_id)
            return None
        if resolved_container_description is None:
            return None
        return resolved_container_description.container_description

    def resolve(self, enabled_container_types, tool_info, index=None, resolver_type=None, install=True, resolution_cache=None, session=None):
        """Return the first ``ResolvedContainerDescription`` for the tool, or None.

        Resolvers are tried in order. A resolver is skipped when:

        * ``index`` is given and is not this resolver's position,
        * ``resolver_type`` is given and differs from the resolver's type,
        * the resolver declares a ``container_type`` that is not enabled,
        * ``install`` is false and the resolver builds on resolution.
        """
        resolution_cache = resolution_cache or self.get_resolution_cache()
        for i, container_resolver in enumerate(self.container_resolvers):
            if index is not None and i != index:
                continue

            if resolver_type is not None and resolver_type != container_resolver.resolver_type:
                continue

            if hasattr(container_resolver, "container_type") and container_resolver.container_type not in enabled_container_types:
                continue

            if not install and container_resolver.builds_on_resolution:
                continue

            container_description = container_resolver.resolve(enabled_container_types, tool_info, install=install, resolution_cache=resolution_cache, session=session)
            # Lazy %-style arguments: the message is only rendered if INFO is enabled.
            log.info("Checking with container resolver [%s] found description [%s]", container_resolver, container_description)
            if container_description:
                # Internal invariant: a resolver must not hand back a disabled type.
                assert container_description.type in enabled_container_types
                return ResolvedContainerDescription(container_resolver, container_description)

        return None