view env/lib/python3.9/site-packages/planemo/engine/galaxy.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Module contianing the :class:`GalaxyEngine` implementation of :class:`Engine`."""
from __future__ import absolute_import

import abc
import contextlib

from galaxy.tool_util.verify import interactor
from six import add_metaclass

from planemo.galaxy.activity import execute
from planemo.galaxy.config import external_galaxy_config
from planemo.galaxy.serve import serve_daemon
from planemo.runnable import RunnableType
from .interface import BaseEngine


@add_metaclass(abc.ABCMeta)
class GalaxyEngine(BaseEngine):
    """An :class:`Engine` implementation backed by a managed Galaxy.

    More information on Galaxy can be found at http://galaxyproject.org/.
    """

    handled_runnable_types = [
        RunnableType.cwl_tool,
        RunnableType.cwl_workflow,
        RunnableType.galaxy_workflow,
        RunnableType.galaxy_tool,
        RunnableType.galaxy_datamanager,
    ]

    def _run(self, runnable, job_path):
        """Run CWL job in Galaxy."""
        self._ctx.vlog("Serving artifact [%s] with Galaxy." % (runnable,))
        with self.ensure_runnables_served([runnable]) as config:
            self._ctx.vlog("Running job path [%s]" % job_path)
            if self._ctx.verbose:
                self._ctx.log("Running Galaxy with API configuration [%s]" % config.user_api_config)
            run_response = execute(self._ctx, config, runnable, job_path, **self._kwds)

        return run_response

    @abc.abstractmethod
    def ensure_runnables_served(self, runnables):
        """Use a context manager and describe Galaxy instance with runnables being served."""

    def _run_test_case(self, test_case):
        if hasattr(test_case, "job_path"):
            # Simple file-based job path.
            return super(GalaxyEngine, self)._run_test_case(test_case)
        else:
            with self.ensure_runnables_served([test_case.runnable]) as config:
                galaxy_interactor_kwds = {
                    "galaxy_url": config.galaxy_url,
                    "master_api_key": config.master_api_key,
                    "api_key": config.user_api_key,
                    "keep_outputs_dir": "",  # TODO: this...
                }
                tool_id = test_case.tool_id
                test_index = test_case.test_index
                tool_version = test_case.tool_version
                galaxy_interactor = interactor.GalaxyInteractorApi(**galaxy_interactor_kwds)

                test_results = []

                def _register_job_data(job_data):
                    test_results.append({
                        'id': tool_id + "-" + str(test_index),
                        'has_data': True,
                        'data': job_data,
                    })

                verbose = self._ctx.verbose
                try:
                    if verbose:
                        # TODO: this is pretty hacky, it'd be better to send a stream
                        # and capture the output information somehow.
                        interactor.VERBOSE_GALAXY_ERRORS = True

                    interactor.verify_tool(
                        tool_id,
                        galaxy_interactor,
                        test_index=test_index,
                        tool_version=tool_version,
                        register_job_data=_register_job_data,
                        quiet=not verbose,
                    )
                except Exception:
                    pass

                return test_results[0]


class LocalManagedGalaxyEngine(GalaxyEngine):
    """An :class:`Engine` implementation backed by a managed Galaxy.

    More information on Galaxy can be found at http://galaxyproject.org/.
    """

    @contextlib.contextmanager
    def ensure_runnables_served(self, runnables):
        # TODO: define an interface for this - not everything in config would make sense for a
        # pre-existing Galaxy interface.
        with serve_daemon(self._ctx, runnables, **self._serve_kwds()) as config:
            yield config

    def _serve_kwds(self):
        return self._kwds.copy()


class DockerizedManagedGalaxyEngine(LocalManagedGalaxyEngine):
    """An :class:`Engine` implementation backed by Galaxy running in Docker.

    More information on Galaxy can be found at http://galaxyproject.org/.
    """

    def _serve_kwds(self):
        serve_kwds = self._kwds.copy()
        serve_kwds["dockerize"] = True
        return serve_kwds


class ExternalGalaxyEngine(GalaxyEngine):
    """An :class:`Engine` implementation backed by an external Galaxy instance.
    """

    @contextlib.contextmanager
    def ensure_runnables_served(self, runnables):
        # TODO: ensure tools are available
        with external_galaxy_config(self._ctx, runnables, **self._kwds) as config:
            config.install_workflows()
            yield config


__all__ = (
    "DockerizedManagedGalaxyEngine",
    "ExternalGalaxyEngine",
    "LocalManagedGalaxyEngine",
)