env/lib/python3.9/site-packages/galaxy/tool_util/deps/container_classes.py @ 0:4f3585e2f14b (draft, default, tip)
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date: Mon, 22 Mar 2021 18:12:50 +0000
import os
import string
from abc import (
    ABCMeta,
    abstractmethod
)
from logging import getLogger
from uuid import uuid4


from galaxy.containers.docker_model import DockerVolume
from galaxy.util import (
    asbool,
    in_directory
)
from . import (
    docker_util,
    singularity_util
)
from .requirements import (
    DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES,
    DEFAULT_CONTAINER_SHELL,
)

log = getLogger(__name__)

DOCKER_CONTAINER_TYPE = "docker"
SINGULARITY_CONTAINER_TYPE = "singularity"

LOAD_CACHED_IMAGE_COMMAND_TEMPLATE = r'''
python << EOF
from __future__ import print_function

import json
import re
import subprocess
import tarfile

t = tarfile.TarFile("${cached_image_file}")
meta_str = t.extractfile('repositories').read()
meta = json.loads(meta_str)
tag, tag_value = next(iter(meta.items()))
rev, rev_value = next(iter(tag_value.items()))
cmd = "${images_cmd}"
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
stdo, stde = proc.communicate()
found = False
for line in stdo.split("\n"):
    tmp = re.split(r'\s+', line)
    if tmp[0] == tag and tmp[1] == rev and tmp[2] == rev_value:
        found = True
if not found:
    print("Loading image")
    cmd = "cat ${cached_image_file} | ${load_cmd}"
    subprocess.check_call(cmd, shell=True)
EOF
'''
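# The template above is filled in via string.Template.safe_substitute (see
# DockerContainer.__cache_from_file_command below): the embedded script reads the
# 'repositories' metadata from the cached image tarball, scans the output of the
# configured images command for a matching repository/tag/image id, and pipes the
# tarball into the configured load command only when no match is found.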
SOURCE_CONDA_ACTIVATE = """
# Check if container was created by installing conda packages,
# and if so, source scripts to populate environment variables
# that would be set by activating the conda environment.
if [ -d /usr/local/etc/conda/activate.d ]; then
  export CONDA_PREFIX=/usr/local
  for f in /usr/local/etc/conda/activate.d/*.sh; do
    case "$f" in
      "/usr/local/etc/conda/activate.d/activate-"*) :;;
      *) . "$f" ;;
    esac;
  done
fi
"""


class Container(metaclass=ABCMeta):

    def __init__(self, container_id, app_info, tool_info, destination_info, job_info, container_description, container_name=None):
        self.container_id = container_id
        self.app_info = app_info
        self.tool_info = tool_info
        self.destination_info = destination_info
        self.job_info = job_info
        self.container_description = container_description
        self.container_name = container_name or uuid4().hex
        self.container_info = {}

    def prop(self, name, default):
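        """Look up a container-type-prefixed property from the job destination.

        For example, ``self.prop("sudo", default)`` reads the ``docker_sudo``
        destination parameter on a DockerContainer and ``singularity_sudo`` on a
        SingularityContainer, falling back to ``default`` when the key is absent.
        """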
        destination_name = f"{self.container_type}_{name}"
        return self.destination_info.get(destination_name, default)

    @property
    def resolve_dependencies(self):
        return DEFAULT_CONTAINER_RESOLVE_DEPENDENCIES if not self.container_description else self.container_description.resolve_dependencies

    @property
    def shell(self):
        return DEFAULT_CONTAINER_SHELL if not self.container_description else self.container_description.shell

    @property
    def source_environment(self):
        if self.container_description and not self.container_description.explicit:
            return SOURCE_CONDA_ACTIVATE
        return ""

    @abstractmethod
    def containerize_command(self, command):
        """
        Use destination-supplied container configuration parameters,
        container_id, and command to build a new command that runs the
        input command inside the container.
        """


def preprocess_volumes(volumes_raw_str, container_type):
    """Process a Galaxy volume specification string into either a Docker or Singularity specification.

    Galaxy allows the mount type "default_ro", which translates to ro for Docker and
    to ro for Singularity iff no subdirectories are rw (Singularity does not allow ro
    parent directories with rw subdirectories).

    >>> preprocess_volumes("/a/b", DOCKER_CONTAINER_TYPE)
    ['/a/b:rw']
    >>> preprocess_volumes("/a/b:ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE)
    ['/a/b:ro', '/a/b/c:rw']
    >>> preprocess_volumes("/a/b:/a:ro,/a/b/c:/a/b:rw", DOCKER_CONTAINER_TYPE)
    ['/a/b:/a:ro', '/a/b/c:/a/b:rw']
    >>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", DOCKER_CONTAINER_TYPE)
    ['/a/b:ro', '/a/b/c:rw']
    >>> preprocess_volumes("/a/b:default_ro,/a/b/c:ro", SINGULARITY_CONTAINER_TYPE)
    ['/a/b:ro', '/a/b/c:ro']
    >>> preprocess_volumes("/a/b:default_ro,/a/b/c:rw", SINGULARITY_CONTAINER_TYPE)
    ['/a/b', '/a/b/c']
    """

    volumes_raw_strs = [v.strip() for v in volumes_raw_str.split(",")]
    volumes = []
    rw_paths = []

    for volume_raw_str in volumes_raw_strs:
        volume_parts = volume_raw_str.split(":")
        if len(volume_parts) > 3:
            raise Exception("Unparsable volumes string in configuration [%s]" % volumes_raw_str)
        if len(volume_parts) == 3:
            volume_parts = ["{}:{}".format(volume_parts[0], volume_parts[1]), volume_parts[2]]
        if len(volume_parts) == 2 and volume_parts[1] not in ("rw", "ro", "default_ro"):
            volume_parts = ["{}:{}".format(volume_parts[0], volume_parts[1]), "rw"]
        if len(volume_parts) == 1:
            volume_parts.append("rw")
        volumes.append(volume_parts)
        if volume_parts[1] == "rw":
            rw_paths.append(volume_parts[0])

    for volume in volumes:
        path = volume[0]
        how = volume[1]

        if how == "default_ro":
            how = "ro"
            if container_type == SINGULARITY_CONTAINER_TYPE:
                for rw_path in rw_paths:
                    if in_directory(rw_path, path):
                        how = "rw"

        volume[1] = how

        # For a while Singularity did not allow specifying the bind type rw
        # (which is the default), so we omit this default.
        # See https://github.com/hpcng/singularity/pull/5487
        if container_type == SINGULARITY_CONTAINER_TYPE and volume[1] == 'rw':
            del volume[1]

    return [":".join(v) for v in volumes]


class HasDockerLikeVolumes:
    """Mixin to share functionality related to Docker volume handling.

    Singularity seems to have a fairly compatible syntax for volume handling.
    """

    def _expand_volume_str(self, value):
        if not value:
            return value

        template = string.Template(value)
        variables = dict()

        def add_var(name, value):
            if value:
                if not value.startswith("$"):
                    value = os.path.abspath(value)
                variables[name] = value

        add_var("working_directory", self.job_info.working_directory)
        add_var("tmp_directory", self.job_info.tmp_directory)
        add_var("job_directory", self.job_info.job_directory)
        add_var("tool_directory", self.job_info.tool_directory)
        add_var("home_directory", self.job_info.home_directory)
        add_var("galaxy_root", self.app_info.galaxy_root_dir)
        add_var("default_file_path", self.app_info.default_file_path)
        add_var("library_import_dir", self.app_info.library_import_dir)
        add_var('tool_data_path', self.app_info.tool_data_path)
        add_var('shed_tool_data_path', self.app_info.shed_tool_data_path)
        if self.job_info.job_directory and self.job_info.job_directory_type == "pulsar":
            # We have a Pulsar job directory, so everything needed (excluding index
            # files) should be available in job_directory...
            defaults = "$job_directory:default_ro"
            if self.job_info.tool_directory:
                defaults += ",$tool_directory:default_ro"
            defaults += ",$job_directory/outputs:rw,$working_directory:rw"
        else:
            defaults = "$galaxy_root:default_ro"
            if self.job_info.tool_directory:
                defaults += ",$tool_directory:default_ro"
            if self.job_info.job_directory:
                defaults += ",$job_directory:default_ro,$job_directory/outputs:rw"
                if self.tool_info.profile <= 19.09:
                    defaults += ",$job_directory/configs:rw"
            if self.job_info.tmp_directory is not None:
                defaults += ",$tmp_directory:rw"
            if self.job_info.home_directory is not None:
                defaults += ",$home_directory:rw"
            if self.app_info.outputs_to_working_directory:
                # Should only need read access to default_file_path (which is of course
                # an estimate given object stores anyway).
                defaults += ",$working_directory:rw,$default_file_path:default_ro"
            else:
                defaults += ",$working_directory:rw,$default_file_path:rw"

        if self.app_info.library_import_dir:
            defaults += ",$library_import_dir:default_ro"
        if self.app_info.tool_data_path:
            defaults += ",$tool_data_path:default_ro"
        if self.app_info.shed_tool_data_path:
            defaults += ",$shed_tool_data_path:default_ro"

        # Define $defaults so that it can easily be extended with external library and
        # index data without the deployer worrying about the above details.
        variables["defaults"] = string.Template(defaults).safe_substitute(variables)
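        # Illustrative (hypothetical) destination setting that extends these defaults
        # with an extra read-only reference-data mount:
        #   docker_volumes="$defaults,/cvmfs/data.galaxyproject.org:default_ro"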

        volumes_str = template.safe_substitute(variables)

        # Not all tools have a tool_directory - strip this out if supplied by
        # job_conf.
        tool_directory_index = volumes_str.find("$tool_directory")
        if tool_directory_index > 0:
            end_index = volumes_str.find(",", tool_directory_index)
            if end_index < 0:
                end_index = len(volumes_str)
            volumes_str = volumes_str[0:tool_directory_index] + volumes_str[end_index:len(volumes_str)]
        return volumes_str


class DockerContainer(Container, HasDockerLikeVolumes):

    container_type = DOCKER_CONTAINER_TYPE

    @property
    def docker_host_props(self):
        docker_host_props = dict(
            docker_cmd=self.prop("cmd", docker_util.DEFAULT_DOCKER_COMMAND),
            sudo=asbool(self.prop("sudo", docker_util.DEFAULT_SUDO)),
            sudo_cmd=self.prop("sudo_cmd", docker_util.DEFAULT_SUDO_COMMAND),
            host=self.prop("host", docker_util.DEFAULT_HOST),
        )
        return docker_host_props

    @property
    def connection_configuration(self):
        return self.docker_host_props

    def build_pull_command(self):
        return docker_util.build_pull_command(self.container_id, **self.docker_host_props)

    def containerize_command(self, command):
        env_directives = []
        for pass_through_var in self.tool_info.env_pass_through:
            env_directives.append(f'"{pass_through_var}=${pass_through_var}"')

        # Allow destinations to explicitly set environment variables just for the
        # Docker container. A better approach would be to set them on the destination
        # and then pass through only what the tool needs. (See TODO in ToolInfo.)
        for key, value in self.destination_info.items():
            if key.startswith("docker_env_"):
                env = key[len("docker_env_"):]
                env_directives.append(f'"{env}={value}"')

        working_directory = self.job_info.working_directory
        if not working_directory:
            raise Exception("Cannot containerize command [%s] without defined working directory." % working_directory)

        volumes_raw = self._expand_volume_str(self.destination_info.get("docker_volumes", "$defaults"))
        preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
        # TODO: Remove redundant volumes...
        volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]
        # If a tool definitely has a temp directory available, map it to /tmp in the
        # container for compatibility with CWL. This is part of that spec and should make
        # it easier to share containers between CWL and Galaxy.
        if self.job_info.tmp_directory is not None:
            volumes.append(DockerVolume.from_str("%s:/tmp:rw" % self.job_info.tmp_directory))
        else:
            volumes.append(DockerVolume.from_str("$_GALAXY_JOB_TMP_DIR:$_GALAXY_JOB_TMP_DIR:rw"))
        volumes_from = self.destination_info.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM)
        docker_host_props = self.docker_host_props

        cached_image_file = self.__get_cached_image_file()
        if not cached_image_file:
            # TODO: Add option to cache it once here and create cached_image_file.
            cache_command = docker_util.build_docker_cache_command(self.container_id, **docker_host_props)
        else:
            cache_command = self.__cache_from_file_command(cached_image_file, docker_host_props)
        run_command = docker_util.build_docker_run_command(
            command,
            self.container_id,
            volumes=volumes,
            volumes_from=volumes_from,
            env_directives=env_directives,
            working_directory=working_directory,
            net=self.prop("net", None),  # By default, docker instance has networking disabled
            auto_rm=asbool(self.prop("auto_rm", docker_util.DEFAULT_AUTO_REMOVE)),
            set_user=self.prop("set_user", docker_util.DEFAULT_SET_USER),
            run_extra_arguments=self.prop("run_extra_arguments", docker_util.DEFAULT_RUN_EXTRA_ARGUMENTS),
            guest_ports=self.tool_info.guest_ports,
            container_name=self.container_name,
            **docker_host_props
        )
        kill_command = docker_util.build_docker_simple_command("kill", container_name=self.container_name, **docker_host_props)
        # Suppress standard error below in the kill command because it can cause jobs that
        # otherwise would work to fail. Likely, in these cases the container has already
        # stopped normally and so cannot be stopped again. A less hacky approach might be
        # to check whether the container is running before trying to kill it.
        # https://stackoverflow.com/questions/34228864/stop-and-delete-docker-container-if-its-running
        # Standard error is:
        # Error response from daemon: Cannot kill container: 2b0b961527574ebc873256b481bbe72e: No such container: 2b0b961527574ebc873256b481bbe72e
        return """
_on_exit() {{
  {} &> /dev/null
}}
trap _on_exit 0
{}\n{}""".format(kill_command, cache_command, run_command)

    def __cache_from_file_command(self, cached_image_file, docker_host_props):
        images_cmd = docker_util.build_docker_images_command(truncate=False, **docker_host_props)
        load_cmd = docker_util.build_docker_load_command(**docker_host_props)

        return string.Template(LOAD_CACHED_IMAGE_COMMAND_TEMPLATE).safe_substitute(
            cached_image_file=cached_image_file,
            images_cmd=images_cmd,
            load_cmd=load_cmd
        )

    def __get_cached_image_file(self):
        container_id = self.container_id
        cache_directory = os.path.abspath(self.__get_destination_overridable_property("container_image_cache_path"))
        cache_path = docker_cache_path(cache_directory, container_id)
        return cache_path if os.path.exists(cache_path) else None

    def __get_destination_overridable_property(self, name):
        prop_name = "docker_%s" % name
        if prop_name in self.destination_info:
            return self.destination_info[prop_name]
        else:
            return getattr(self.app_info, name)


def docker_cache_path(cache_directory, container_id):
    file_container_id = container_id.replace("/", "_slash_")
    cache_file_name = "docker_%s.tar" % file_container_id
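    # e.g. (illustrative) container_id "quay.io/biocontainers/samtools:1.9" maps to
    # "<cache_directory>/docker_quay.io_slash_biocontainers_slash_samtools:1.9.tar"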
    return os.path.join(cache_directory, cache_file_name)


class SingularityContainer(Container, HasDockerLikeVolumes):

    container_type = SINGULARITY_CONTAINER_TYPE

    def get_singularity_target_kwds(self):
        return dict(
            singularity_cmd=self.prop("cmd", singularity_util.DEFAULT_SINGULARITY_COMMAND),
            sudo=asbool(self.prop("sudo", singularity_util.DEFAULT_SUDO)),
            sudo_cmd=self.prop("sudo_cmd", singularity_util.DEFAULT_SUDO_COMMAND),
        )

    @property
    def connection_configuration(self):
        return self.get_singularity_target_kwds()

    def build_mulled_singularity_pull_command(self, cache_directory, namespace="biocontainers"):
        return singularity_util.pull_mulled_singularity_command(
            docker_image_identifier=self.container_id,
            cache_directory=cache_directory,
            namespace=namespace,
            **self.get_singularity_target_kwds()
        )

    def containerize_command(self, command):

        env = []
        for pass_through_var in self.tool_info.env_pass_through:
            env.append((pass_through_var, "$%s" % pass_through_var))

        # Allow destinations to explicitly set environment variables just for the
        # Singularity container. A better approach would be to set them on the destination
        # and then pass through only what the tool needs. (See TODO in ToolInfo.)
        for key, value in self.destination_info.items():
            if key.startswith("singularity_env_"):
                real_key = key[len("singularity_env_"):]
                env.append((real_key, value))

        working_directory = self.job_info.working_directory
        if not working_directory:
            raise Exception("Cannot containerize command [%s] without defined working directory." % working_directory)

        volumes_raw = self._expand_volume_str(self.destination_info.get("singularity_volumes", "$defaults"))
        preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
        volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]

        run_command = singularity_util.build_singularity_run_command(
            command,
            self.container_id,
            volumes=volumes,
            env=env,
            working_directory=working_directory,
            run_extra_arguments=self.prop("run_extra_arguments", singularity_util.DEFAULT_RUN_EXTRA_ARGUMENTS),
            guest_ports=self.tool_info.guest_ports,
            container_name=self.container_name,
            **self.get_singularity_target_kwds()
        )
        return run_command


CONTAINER_CLASSES = dict(
    docker=DockerContainer,
    singularity=SingularityContainer,
)
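# Minimal usage sketch (illustrative; assumes the dependency-resolution code elsewhere
# in Galaxy supplies app_info/tool_info/destination_info/job_info and a container
# description whose `type` is "docker" or "singularity"):
#
#   container_class = CONTAINER_CLASSES[container_description.type]
#   container = container_class(container_id, app_info, tool_info, destination_info,
#                               job_info, container_description)
#   shell_command = container.containerize_command(raw_tool_command)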


class NullContainer:

    def __init__(self):
        pass

    def __bool__(self):
        return False
    __nonzero__ = __bool__


NULL_CONTAINER = NullContainer()