diff env/lib/python3.9/site-packages/planemo/engine/galaxy.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/planemo/engine/galaxy.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,139 @@
+"""Module contianing the :class:`GalaxyEngine` implementation of :class:`Engine`."""
+from __future__ import absolute_import
+
+import abc
+import contextlib
+
+from galaxy.tool_util.verify import interactor
+from six import add_metaclass
+
+from planemo.galaxy.activity import execute
+from planemo.galaxy.config import external_galaxy_config
+from planemo.galaxy.serve import serve_daemon
+from planemo.runnable import RunnableType
+from .interface import BaseEngine
+
+
+@add_metaclass(abc.ABCMeta)
+class GalaxyEngine(BaseEngine):
+    """An :class:`Engine` implementation backed by a managed Galaxy.
+
+    More information on Galaxy can be found at http://galaxyproject.org/.
+    """
+
+    handled_runnable_types = [
+        RunnableType.cwl_tool,
+        RunnableType.cwl_workflow,
+        RunnableType.galaxy_workflow,
+        RunnableType.galaxy_tool,
+        RunnableType.galaxy_datamanager,
+    ]
+
+    def _run(self, runnable, job_path):
+        """Run CWL job in Galaxy."""
+        self._ctx.vlog("Serving artifact [%s] with Galaxy." % (runnable,))
+        with self.ensure_runnables_served([runnable]) as config:
+            self._ctx.vlog("Running job path [%s]" % job_path)
+            if self._ctx.verbose:
+                self._ctx.log("Running Galaxy with API configuration [%s]" % config.user_api_config)
+            run_response = execute(self._ctx, config, runnable, job_path, **self._kwds)
+
+        return run_response
+
+    @abc.abstractmethod
+    def ensure_runnables_served(self, runnables):
+        """Use a context manager and describe Galaxy instance with runnables being served."""
+
+    def _run_test_case(self, test_case):
+        if hasattr(test_case, "job_path"):
+            # Simple file-based job path.
+            return super(GalaxyEngine, self)._run_test_case(test_case)
+        else:
+            with self.ensure_runnables_served([test_case.runnable]) as config:
+                galaxy_interactor_kwds = {
+                    "galaxy_url": config.galaxy_url,
+                    "master_api_key": config.master_api_key,
+                    "api_key": config.user_api_key,
+                    "keep_outputs_dir": "",  # TODO: this...
+                }
+                tool_id = test_case.tool_id
+                test_index = test_case.test_index
+                tool_version = test_case.tool_version
+                galaxy_interactor = interactor.GalaxyInteractorApi(**galaxy_interactor_kwds)
+
+                test_results = []
+
+                def _register_job_data(job_data):
+                    test_results.append({
+                        'id': tool_id + "-" + str(test_index),
+                        'has_data': True,
+                        'data': job_data,
+                    })
+
+                verbose = self._ctx.verbose
+                try:
+                    if verbose:
+                        # TODO: this is pretty hacky, it'd be better to send a stream
+                        # and capture the output information somehow.
+                        interactor.VERBOSE_GALAXY_ERRORS = True
+
+                    interactor.verify_tool(
+                        tool_id,
+                        galaxy_interactor,
+                        test_index=test_index,
+                        tool_version=tool_version,
+                        register_job_data=_register_job_data,
+                        quiet=not verbose,
+                    )
+                except Exception:
+                    pass
+
+                return test_results[0]
+
+
+class LocalManagedGalaxyEngine(GalaxyEngine):
+    """An :class:`Engine` implementation backed by a managed Galaxy.
+
+    More information on Galaxy can be found at http://galaxyproject.org/.
+    """
+
+    @contextlib.contextmanager
+    def ensure_runnables_served(self, runnables):
+        # TODO: define an interface for this - not everything in config would make sense for a
+        # pre-existing Galaxy interface.
+        with serve_daemon(self._ctx, runnables, **self._serve_kwds()) as config:
+            yield config
+
+    def _serve_kwds(self):
+        return self._kwds.copy()
+
+
+class DockerizedManagedGalaxyEngine(LocalManagedGalaxyEngine):
+    """An :class:`Engine` implementation backed by Galaxy running in Docker.
+
+    More information on Galaxy can be found at http://galaxyproject.org/.
+    """
+
+    def _serve_kwds(self):
+        serve_kwds = self._kwds.copy()
+        serve_kwds["dockerize"] = True
+        return serve_kwds
+
+
+class ExternalGalaxyEngine(GalaxyEngine):
+    """An :class:`Engine` implementation backed by an external Galaxy instance.
+    """
+
+    @contextlib.contextmanager
+    def ensure_runnables_served(self, runnables):
+        # TODO: ensure tools are available
+        with external_galaxy_config(self._ctx, runnables, **self._kwds) as config:
+            config.install_workflows()
+            yield config
+
+
+__all__ = (
+    "DockerizedManagedGalaxyEngine",
+    "ExternalGalaxyEngine",
+    "LocalManagedGalaxyEngine",
+)