comparison env/lib/python3.9/site-packages/planemo/engine/galaxy.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Module contianing the :class:`GalaxyEngine` implementation of :class:`Engine`."""
2 from __future__ import absolute_import
3
4 import abc
5 import contextlib
6
7 from galaxy.tool_util.verify import interactor
8 from six import add_metaclass
9
10 from planemo.galaxy.activity import execute
11 from planemo.galaxy.config import external_galaxy_config
12 from planemo.galaxy.serve import serve_daemon
13 from planemo.runnable import RunnableType
14 from .interface import BaseEngine
15
16
17 @add_metaclass(abc.ABCMeta)
18 class GalaxyEngine(BaseEngine):
19 """An :class:`Engine` implementation backed by a managed Galaxy.
20
21 More information on Galaxy can be found at http://galaxyproject.org/.
22 """
23
24 handled_runnable_types = [
25 RunnableType.cwl_tool,
26 RunnableType.cwl_workflow,
27 RunnableType.galaxy_workflow,
28 RunnableType.galaxy_tool,
29 RunnableType.galaxy_datamanager,
30 ]
31
32 def _run(self, runnable, job_path):
33 """Run CWL job in Galaxy."""
34 self._ctx.vlog("Serving artifact [%s] with Galaxy." % (runnable,))
35 with self.ensure_runnables_served([runnable]) as config:
36 self._ctx.vlog("Running job path [%s]" % job_path)
37 if self._ctx.verbose:
38 self._ctx.log("Running Galaxy with API configuration [%s]" % config.user_api_config)
39 run_response = execute(self._ctx, config, runnable, job_path, **self._kwds)
40
41 return run_response
42
43 @abc.abstractmethod
44 def ensure_runnables_served(self, runnables):
45 """Use a context manager and describe Galaxy instance with runnables being served."""
46
47 def _run_test_case(self, test_case):
48 if hasattr(test_case, "job_path"):
49 # Simple file-based job path.
50 return super(GalaxyEngine, self)._run_test_case(test_case)
51 else:
52 with self.ensure_runnables_served([test_case.runnable]) as config:
53 galaxy_interactor_kwds = {
54 "galaxy_url": config.galaxy_url,
55 "master_api_key": config.master_api_key,
56 "api_key": config.user_api_key,
57 "keep_outputs_dir": "", # TODO: this...
58 }
59 tool_id = test_case.tool_id
60 test_index = test_case.test_index
61 tool_version = test_case.tool_version
62 galaxy_interactor = interactor.GalaxyInteractorApi(**galaxy_interactor_kwds)
63
64 test_results = []
65
66 def _register_job_data(job_data):
67 test_results.append({
68 'id': tool_id + "-" + str(test_index),
69 'has_data': True,
70 'data': job_data,
71 })
72
73 verbose = self._ctx.verbose
74 try:
75 if verbose:
76 # TODO: this is pretty hacky, it'd be better to send a stream
77 # and capture the output information somehow.
78 interactor.VERBOSE_GALAXY_ERRORS = True
79
80 interactor.verify_tool(
81 tool_id,
82 galaxy_interactor,
83 test_index=test_index,
84 tool_version=tool_version,
85 register_job_data=_register_job_data,
86 quiet=not verbose,
87 )
88 except Exception:
89 pass
90
91 return test_results[0]
92
93
94 class LocalManagedGalaxyEngine(GalaxyEngine):
95 """An :class:`Engine` implementation backed by a managed Galaxy.
96
97 More information on Galaxy can be found at http://galaxyproject.org/.
98 """
99
100 @contextlib.contextmanager
101 def ensure_runnables_served(self, runnables):
102 # TODO: define an interface for this - not everything in config would make sense for a
103 # pre-existing Galaxy interface.
104 with serve_daemon(self._ctx, runnables, **self._serve_kwds()) as config:
105 yield config
106
107 def _serve_kwds(self):
108 return self._kwds.copy()
109
110
111 class DockerizedManagedGalaxyEngine(LocalManagedGalaxyEngine):
112 """An :class:`Engine` implementation backed by Galaxy running in Docker.
113
114 More information on Galaxy can be found at http://galaxyproject.org/.
115 """
116
117 def _serve_kwds(self):
118 serve_kwds = self._kwds.copy()
119 serve_kwds["dockerize"] = True
120 return serve_kwds
121
122
123 class ExternalGalaxyEngine(GalaxyEngine):
124 """An :class:`Engine` implementation backed by an external Galaxy instance.
125 """
126
127 @contextlib.contextmanager
128 def ensure_runnables_served(self, runnables):
129 # TODO: ensure tools are available
130 with external_galaxy_config(self._ctx, runnables, **self._kwds) as config:
131 config.install_workflows()
132 yield config
133
134
135 __all__ = (
136 "DockerizedManagedGalaxyEngine",
137 "ExternalGalaxyEngine",
138 "LocalManagedGalaxyEngine",
139 )