Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/cwltool/job.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 import datetime | |
4 import functools | |
5 import itertools | |
6 import logging | |
7 import threading | |
8 import os | |
9 import re | |
10 import shutil | |
11 import stat | |
12 import sys | |
13 import tempfile | |
14 import time | |
15 import uuid | |
16 from abc import ABCMeta, abstractmethod | |
17 from io import IOBase, open # pylint: disable=redefined-builtin | |
18 from threading import Timer | |
19 from typing import (IO, Any, AnyStr, Callable, Dict, Iterable, List, Tuple, | |
20 MutableMapping, MutableSequence, Optional, Union, cast) | |
21 | |
22 import psutil | |
23 import shellescape | |
24 from prov.model import PROV | |
25 from schema_salad.sourceline import SourceLine | |
26 from six import PY2, with_metaclass | |
27 from future.utils import raise_from | |
28 from typing_extensions import (TYPE_CHECKING, # pylint: disable=unused-import | |
29 Text) | |
30 | |
31 from .builder import Builder, HasReqsHints # pylint: disable=unused-import | |
32 from .context import RuntimeContext # pylint: disable=unused-import | |
33 from .context import getdefault | |
34 from .errors import WorkflowException | |
35 from .expression import JSON | |
36 from .loghandler import _logger | |
37 from .pathmapper import (MapperEnt, PathMapper, # pylint: disable=unused-import | |
38 ensure_writable, ensure_non_writable) | |
39 from .process import UnsupportedRequirement, stage_files | |
40 from .secrets import SecretStore # pylint: disable=unused-import | |
41 from .utils import (DEFAULT_TMP_PREFIX, Directory, bytes2str_in_dicts, | |
42 copytree_with_merge, json_dump, json_dumps, onWindows, | |
43 processes_to_kill, subprocess) | |
44 | |
45 if TYPE_CHECKING: | |
46 from .provenance import ProvenanceProfile # pylint: disable=unused-import | |
# Characters that force shell-quoting of an argument when the command line
# is logged (empty string, whitespace, or shell metacharacters).
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")

# When CWLTOOL_FORCE_SHELL_POPEN=1, _job_popen always uses the shelled
# execution path, even when no job script was supplied.
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"

# Default job script for the shelled execution path: just runs the generated
# Python runner (PYTHON_RUN_SCRIPT) against the serialized job description.
SHELL_COMMAND_TEMPLATE = u"""#!/bin/bash
python "run_job.py" "job.json"
"""

# Stand-alone runner executed by SHELL_COMMAND_TEMPLATE: it reads a JSON job
# description (commands, cwd, env, stdio paths) from argv[1] and executes it
# with subprocess, exiting with the child's return code.
PYTHON_RUN_SCRIPT = u"""
import json
import os
import sys
if os.name == 'posix':
    try:
        import subprocess32 as subprocess  # type: ignore
    except Exception:
        import subprocess
else:
    import subprocess  # type: ignore

with open(sys.argv[1], "r") as f:
    popen_description = json.load(f)
    commands = popen_description["commands"]
    cwd = popen_description["cwd"]
    env = popen_description["env"]
    env["PATH"] = os.environ.get("PATH")
    stdin_path = popen_description["stdin_path"]
    stdout_path = popen_description["stdout_path"]
    stderr_path = popen_description["stderr_path"]
    if stdin_path is not None:
        stdin = open(stdin_path, "rb")
    else:
        stdin = subprocess.PIPE
    if stdout_path is not None:
        stdout = open(stdout_path, "wb")
    else:
        stdout = sys.stderr
    if stderr_path is not None:
        stderr = open(stderr_path, "wb")
    else:
        stderr = sys.stderr
    if os.name == 'nt':
        close_fds = False
        for key, value in env.items():
            env[key] = str(value)
    else:
        close_fds = True
    sp = subprocess.Popen(commands,
                          shell=False,
                          close_fds=close_fds,
                          stdin=stdin,
                          stdout=stdout,
                          stderr=stderr,
                          env=env,
                          cwd=cwd)
    if sp.stdin:
        sp.stdin.close()
    rcode = sp.wait()
    if stdin is not subprocess.PIPE:
        stdin.close()
    if stdout is not sys.stderr:
        stdout.close()
    if stderr is not sys.stderr:
        stderr.close()
    sys.exit(rcode)
"""

113 | |
114 | |
def deref_links(outputs):  # type: (Any) -> None
    """Recursively replace symlinked File paths with their targets, in place.

    Walks an output object tree; any File entry whose ``path`` is a symbolic
    link gets its ``basename`` recorded and its ``path`` replaced by the
    link's target (as returned by ``os.readlink``).
    """
    if isinstance(outputs, MutableMapping):
        if outputs.get("class") != "File":
            # Not a File record: descend into every value.
            for value in outputs.values():
                deref_links(value)
        else:
            path = outputs["path"]
            if stat.S_ISLNK(os.lstat(path).st_mode):
                outputs["basename"] = os.path.basename(path)
                outputs["path"] = os.readlink(path)
    if isinstance(outputs, MutableSequence):
        for entry in outputs:
            deref_links(entry)
128 | |
129 | |
def relink_initialworkdir(pathmapper,           # type: PathMapper
                          host_outdir,          # type: Text
                          container_outdir,     # type: Text
                          inplace_update=False  # type: bool
                         ):  # type: (...) -> None
    """Re-link staged InitialWorkDir entries into the host output directory.

    For each staged File/Directory (and, with *inplace_update*, each
    writable variant) whose target lies inside *container_outdir*, remove
    whatever currently occupies the corresponding host path and replace it
    with a symlink to (or, on Windows, a copy of) the resolved source.
    """
    for _, vol in pathmapper.items():
        if not vol.staged:
            continue
        relinkable = vol.type in ("File", "Directory") or (
            inplace_update and vol.type in
            ("WritableFile", "WritableDirectory"))
        if not relinkable:
            continue
        if not vol.target.startswith(container_outdir):
            # An input staged outside the working directory can never be an
            # output file, therefore ineligible — none of our business.
            continue
        host_outdir_tgt = os.path.join(
            host_outdir, vol.target[len(container_outdir) + 1:])
        # Clear whatever already sits at the host target path.
        if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
            os.remove(host_outdir_tgt)
        elif os.path.isdir(host_outdir_tgt) \
                and not vol.resolved.startswith("_:"):
            shutil.rmtree(host_outdir_tgt)
        if onWindows():
            # Windows can't symlink reliably, so copy instead.  If this
            # becomes a big issue for someone then we could refactor to
            # process output from a running container and avoid the extra IO.
            if vol.type in ("File", "WritableFile"):
                shutil.copy(vol.resolved, host_outdir_tgt)
            elif vol.type in ("Directory", "WritableDirectory"):
                copytree_with_merge(vol.resolved, host_outdir_tgt)
        elif not vol.resolved.startswith("_:"):
            os.symlink(vol.resolved, host_outdir_tgt)
165 | |
166 | |
class JobBase(with_metaclass(ABCMeta, HasReqsHints)):
    """A single executable unit of a CWL CommandLineTool.

    Holds the command line, stdio redirections, environment, and path
    mappings for one tool invocation; subclasses implement ``run`` for a
    specific execution backend (local process or container).
    """

    def __init__(self,
                 builder,           # type: Builder
                 joborder,          # type: JSON
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,             # type: List[Dict[Text, Text]]
                 name,              # type: Text
                ):  # type: (...) -> None
        """Initialize the job object."""
        self.builder = builder
        self.joborder = joborder
        self.stdin = None  # type: Optional[Text]
        self.stderr = None  # type: Optional[Text]
        self.stdout = None  # type: Optional[Text]
        self.successCodes = []  # type: Iterable[int]
        self.temporaryFailCodes = []  # type: Iterable[int]
        self.permanentFailCodes = []  # type: Iterable[int]
        self.requirements = requirements
        self.hints = hints
        self.name = name
        self.command_line = []  # type: List[Text]
        self.pathmapper = PathMapper([], u"", u"")
        self.make_path_mapper = make_path_mapper
        self.generatemapper = None  # type: Optional[PathMapper]

        # set in CommandLineTool.job(i)
        self.collect_outputs = cast(Callable[[Text, int], MutableMapping[Text, Any]],
                                    None)  # type: Union[Callable[[Text, int], MutableMapping[Text, Any]], functools.partial[MutableMapping[Text, Any]]]
        self.output_callback = cast(Callable[[Any, Any], Any], None)
        self.outdir = u""
        self.tmpdir = u""

        self.environment = {}  # type: MutableMapping[Text, Text]
        self.generatefiles = {"class": "Directory", "listing": [], "basename": ""}  # type: Directory
        self.stagedir = None  # type: Optional[Text]
        self.inplace_update = False
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool

    def __repr__(self):  # type: () -> str
        """Represent this Job object."""
        return "CommandLineJob(%s)" % self.name

    @abstractmethod
    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        """Execute this job; implemented by each execution backend."""
        pass

    def _setup(self, runtimeContext):  # type: (RuntimeContext) -> None
        """Create the output directory, validate staged inputs, and build
        the InitialWorkDir path mapper (``self.generatemapper``)."""
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        # Every staged File input must exist and be a regular file.
        for knownfile in self.pathmapper.files():
            p = self.pathmapper.mapper(knownfile)
            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
                raise WorkflowException(
                    u"Input file %s (at %s) not found or is not a regular "
                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0]))

        if 'listing' in self.generatefiles:
            runtimeContext = runtimeContext.copy()
            runtimeContext.outdir = self.outdir
            self.generatemapper = self.make_path_mapper(
                cast(List[Any], self.generatefiles["listing"]),
                self.builder.outdir, runtimeContext, False)
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    u"[job %s] initial work dir %s", self.name,
                    json_dumps({p: self.generatemapper.mapper(p)
                                for p in self.generatemapper.files()}, indent=4))

    def _execute(self,
                 runtime,                # type: List[Text]
                 env,                    # type: MutableMapping[Text, Text]
                 runtimeContext,         # type: RuntimeContext
                 monitor_function=None,  # type: Optional[Callable[[subprocess.Popen], None]]
                ):  # type: (...) -> None
        """Run the command line, collect outputs, and report status.

        Resolves stdio redirections, executes via ``_job_popen``, maps the
        return code onto a process status, records provenance, and finally
        invokes ``self.output_callback`` under the workflow evaluation lock.
        """
        scr, _ = self.get_requirement("ShellCommandRequirement")

        # With ShellCommandRequirement the tool controls its own quoting.
        shouldquote = needs_shell_quoting_re.search  # type: Callable[[Any], Any]
        if scr is not None:
            shouldquote = lambda x: False

        _logger.info(u"[job %s] %s$ %s%s%s%s",
                     self.name,
                     self.outdir,
                     " \\\n    ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in
                                       (runtime + self.command_line)]),
                     u' < %s' % self.stdin if self.stdin else '',
                     u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '',
                     u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '')
        if self.joborder is not None and runtimeContext.research_obj is not None:
            job_order = self.joborder
            if runtimeContext.process_run_id is not None \
                    and runtimeContext.prov_obj is not None and isinstance(job_order, (list, dict)):
                runtimeContext.prov_obj.used_artefacts(
                    job_order, runtimeContext.process_run_id, str(self.name))
            else:
                _logger.warning("research_obj set but one of process_run_id "
                                "or prov_obj is missing from runtimeContext: "
                                "{}".format(runtimeContext))
        outputs = {}  # type: MutableMapping[Text, Any]
        try:
            stdin_path = None
            if self.stdin is not None:
                rmap = self.pathmapper.reversemap(self.stdin)
                if rmap is None:
                    raise WorkflowException(
                        "{} missing from pathmapper".format(self.stdin))
                else:
                    stdin_path = rmap[1]

            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
                dnerr = os.path.dirname(abserr)
                if dnerr and not os.path.exists(dnerr):
                    os.makedirs(dnerr)
                stderr_path = abserr

            stdout_path = None
            if self.stdout is not None:
                absout = os.path.join(self.outdir, self.stdout)
                dnout = os.path.dirname(absout)
                if dnout and not os.path.exists(dnout):
                    os.makedirs(dnout)
                stdout_path = absout

            commands = [Text(x) for x in runtime + self.command_line]
            if runtimeContext.secret_store is not None:
                # Re-inject secret values that were masked during planning.
                commands = runtimeContext.secret_store.retrieve(commands)
                env = runtimeContext.secret_store.retrieve(env)

            job_script_contents = None  # type: Optional[Text]
            builder = getattr(self, "builder", None)  # type: Builder
            if builder is not None:
                job_script_contents = builder.build_job_script(commands)
            rcode = _job_popen(
                commands,
                stdin_path=stdin_path,
                stdout_path=stdout_path,
                stderr_path=stderr_path,
                env=env,
                cwd=self.outdir,
                job_dir=tempfile.mkdtemp(prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)),
                job_script_contents=job_script_contents,
                timelimit=self.timelimit,
                name=self.name,
                monitor_function=monitor_function
            )

            # Map the return code onto a CWL process status; the tool's
            # declared code lists take precedence over the 0/non-0 default.
            if rcode in self.successCodes:
                processStatus = "success"
            elif rcode in self.temporaryFailCodes:
                processStatus = "temporaryFail"
            elif rcode in self.permanentFailCodes:
                processStatus = "permanentFail"
            elif rcode == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            if 'listing' in self.generatefiles:
                if self.generatemapper:
                    relink_initialworkdir(
                        self.generatemapper, self.outdir, self.builder.outdir,
                        inplace_update=self.inplace_update)
                else:
                    raise ValueError("'listing' in self.generatefiles but no "
                                     "generatemapper was setup.")

            outputs = self.collect_outputs(self.outdir, rcode)
            outputs = bytes2str_in_dicts(outputs)  # type: ignore
        except OSError as e:
            # errno 2 (ENOENT): the executable itself could not be found.
            if e.errno == 2:
                if runtime:
                    _logger.error(u"'%s' not found: %s", runtime[0], Text(e))
                else:
                    _logger.error(u"'%s' not found: %s", self.command_line[0], Text(e))
            else:
                _logger.exception(u"Exception while running job")
            processStatus = "permanentFail"
        except WorkflowException as err:
            _logger.error(u"[job %s] Job error:\n%s", self.name, Text(err))
            processStatus = "permanentFail"
        except Exception:
            _logger.exception(u"Exception while running job")
            processStatus = "permanentFail"
        if runtimeContext.research_obj is not None \
                and self.prov_obj is not None \
                and runtimeContext.process_run_id is not None:
            # creating entities for the outputs produced by each step (in the provenance document)
            self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id,
                                             outputs, datetime.datetime.now())
        if processStatus != "success":
            _logger.warning(u"[job %s] completed %s", self.name, processStatus)
        else:
            _logger.info(u"[job %s] completed %s", self.name, processStatus)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"[job %s] %s", self.name,
                          json_dumps(outputs, indent=4))

        if self.generatemapper is not None and runtimeContext.secret_store is not None:
            # Delete any runtime-generated files containing secrets.
            for _, p in self.generatemapper.items():
                if p.type == "CreateFile":
                    if runtimeContext.secret_store.has_secret(p.resolved):
                        host_outdir = self.outdir
                        container_outdir = self.builder.outdir
                        host_outdir_tgt = p.target
                        if p.target.startswith(container_outdir + "/"):
                            host_outdir_tgt = os.path.join(
                                host_outdir, p.target[len(container_outdir) + 1:])
                        os.remove(host_outdir_tgt)

        if runtimeContext.workflow_eval_lock is None:
            raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")

        with runtimeContext.workflow_eval_lock:
            self.output_callback(outputs, processStatus)

        if self.stagedir is not None and os.path.exists(self.stagedir):
            _logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
            shutil.rmtree(self.stagedir, True)

        if runtimeContext.rm_tmpdir:
            _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir)
            shutil.rmtree(self.tmpdir, True)

    def process_monitor(self, sproc):  # type: (subprocess.Popen) -> None
        """Poll the process tree once per second and log peak RSS at exit."""
        monitor = psutil.Process(sproc.pid)
        # Value must be list rather than integer to utilise
        # pass-by-reference in python
        memory_usage = [None]

        def get_tree_mem_usage(memory_usage):  # type: (List[int]) -> None
            # Sum RSS over the whole descendant tree, breadth-first.
            children = monitor.children()
            rss = monitor.memory_info().rss
            while len(children):
                rss += sum([process.memory_info().rss for process in children])
                children = list(itertools.chain(*[process.children() for process in children]))
            if memory_usage[0] is None or rss > memory_usage[0]:
                memory_usage[0] = rss

        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
        mem_tm.daemon = True
        mem_tm.start()
        sproc.wait()
        mem_tm.cancel()
        if memory_usage[0] is not None:
            _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
                         round(memory_usage[0] / (2 ** 20)))
        else:
            _logger.debug(u"Could not collect memory usage, job ended before monitoring began.")
427 | |
class CommandLineJob(JobBase):
    """Run a command line tool directly on the host, without a container."""

    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        """Prepare directories and environment, stage files, then execute."""
        # Create the temporary directory, under the lock when one is given.
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        elif not os.path.exists(self.tmpdir):
            os.makedirs(self.tmpdir)

        self._setup(runtimeContext)

        env = self.environment
        win = onWindows()

        preserve = runtimeContext.preserve_environment
        if runtimeContext.preserve_entire_environment is not False:
            preserve = os.environ
        if preserve:
            for var, val in os.environ.items():
                if var in preserve and var not in env:
                    # On Windows, subprocess env can't handle unicode.
                    env[var] = str(val) if win else val

        env["HOME"] = str(self.outdir) if win else self.outdir
        env["TMPDIR"] = str(self.tmpdir) if win else self.tmpdir
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if win else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
            env["SYSTEMROOT"] = (str(os.environ["SYSTEMROOT"]) if win
                                 else os.environ["SYSTEMROOT"])

        # Stage inputs (symlinked) and any InitialWorkDir content.
        stage_files(self.pathmapper, ignore_writable=True, symlink=True,
                    secret_store=runtimeContext.secret_store)
        if self.generatemapper is not None:
            stage_files(self.generatemapper, ignore_writable=self.inplace_update,
                        symlink=True, secret_store=runtimeContext.secret_store)
            relink_initialworkdir(
                self.generatemapper, self.outdir, self.builder.outdir,
                inplace_update=self.inplace_update)

        self._execute([], env, runtimeContext,
                      functools.partial(self.process_monitor))
473 | |
474 | |
# Matches ANSI terminal escape sequences (e.g. colour codes emitted by
# `docker stats`) so they can be stripped before parsing numeric output.
CONTROL_CODE_RE = r'\x1b\[[0-9;]*[a-zA-Z]'

477 | |
class ContainerCommandLineJob(with_metaclass(ABCMeta, JobBase)):
    """Commandline job using containers."""

    @abstractmethod
    def get_from_requirements(self,
                              r,                 # type: Dict[Text, Text]
                              pull_image,        # type: bool
                              force_pull=False,  # type: bool
                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
                             ):  # type: (...) -> Optional[Text]
        """Fetch or locate the container image described by requirement *r*."""
        pass

    @abstractmethod
    def create_runtime(self,
                       env,             # type: MutableMapping[Text, Text]
                       runtime_context  # type: RuntimeContext
                      ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
        """Return the list of commands to run the selected container engine."""
        pass

    @staticmethod
    @abstractmethod
    def append_volume(runtime, source, target, writable=False):
        # type: (List[Text], Text, Text, bool) -> None
        """Add binding arguments to the runtime list."""
        pass

    @abstractmethod
    def add_file_or_directory_volume(self,
                                     runtime,         # type: List[Text]
                                     volume,          # type: MapperEnt
                                     host_outdir_tgt  # type: Optional[Text]
                                    ):  # type: (...) -> None
        """Append volume a file/dir mapping to the runtime option list."""
        pass

    @abstractmethod
    def add_writable_file_volume(self,
                                 runtime,          # type: List[Text]
                                 volume,           # type: MapperEnt
                                 host_outdir_tgt,  # type: Optional[Text]
                                 tmpdir_prefix     # type: Text
                                ):  # type: (...) -> None
        """Append a writable file mapping to the runtime option list."""
        pass

    @abstractmethod
    def add_writable_directory_volume(self,
                                      runtime,          # type: List[Text]
                                      volume,           # type: MapperEnt
                                      host_outdir_tgt,  # type: Optional[Text]
                                      tmpdir_prefix     # type: Text
                                     ):  # type: (...) -> None
        """Append a writable directory mapping to the runtime option list."""
        pass

    def create_file_and_add_volume(self,
                                   runtime,          # type: List[Text]
                                   volume,           # type: MapperEnt
                                   host_outdir_tgt,  # type: Optional[Text]
                                   secret_store,     # type: Optional[SecretStore]
                                   tmpdir_prefix     # type: Text
                                  ):  # type: (...) -> Text
        """Create the file and add a mapping.

        Returns the host path of the created file (either inside the output
        directory or in a fresh temporary directory for out-of-outdir
        targets).
        """
        if not host_outdir_tgt:
            # Target is outside the output directory: materialize the file
            # in a temp dir and bind-mount it explicitly.
            tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
            new_file = os.path.join(
                tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
                os.path.basename(volume.target))
        writable = volume.type == "CreateWritableFile"
        if secret_store:
            contents = secret_store.retrieve(volume.resolved)
        else:
            contents = volume.resolved
        dirname = os.path.dirname(host_outdir_tgt or new_file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(host_outdir_tgt or new_file, "wb") as file_literal:
            file_literal.write(contents.encode("utf-8"))
        if not host_outdir_tgt:
            self.append_volume(runtime, new_file, volume.target,
                               writable=writable)
        if writable:
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
        return host_outdir_tgt or new_file

    def add_volumes(self,
                    pathmapper,          # type: PathMapper
                    runtime,             # type: List[Text]
                    tmpdir_prefix,       # type: Text
                    secret_store=None,   # type: Optional[SecretStore]
                    any_path_okay=False  # type: bool
                   ):  # type: (...) -> None
        """Append volume mappings to the runtime option list."""
        container_outdir = self.builder.outdir
        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[Text]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
                    self.outdir, vol.target[len(container_outdir) + 1:])
            if not host_outdir_tgt and not any_path_okay:
                raise WorkflowException(
                    "No mandatory DockerRequirement, yet path is outside "
                    "the designated output directory, also known as "
                    "$(runtime.outdir): {}".format(vol))
            if vol.type in ("File", "Directory"):
                self.add_file_or_directory_volume(
                    runtime, vol, host_outdir_tgt)
            elif vol.type == "WritableFile":
                self.add_writable_file_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
            elif vol.type == "WritableDirectory":
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix)
                pathmapper.update(
                    key, new_path, vol.target, vol.type, vol.staged)

    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        """Resolve the container image, then set up and execute the job."""
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        self.prov_obj = runtimeContext.prov_obj
        img_id = None
        env = cast(MutableMapping[Text, Text], os.environ)
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if docker_req is not None and user_space_docker_cmd:
            # For user-space docker implementations, a local image name or ID
            # takes precedence over a network pull
            if 'dockerImageId' in docker_req:
                img_id = str(docker_req["dockerImageId"])
            elif 'dockerPull' in docker_req:
                img_id = str(docker_req["dockerPull"])
                cmd = [user_space_docker_cmd, "pull", img_id]
                _logger.info(Text(cmd))
                try:
                    subprocess.check_call(cmd, stdout=sys.stderr)
                except OSError:
                    raise WorkflowException(SourceLine(docker_req).makeError(
                        "Either Docker container {} is not available with "
                        "user space docker implementation {} or {} is missing "
                        "or broken.".format(img_id, user_space_docker_cmd,
                                            user_space_docker_cmd)))
            else:
                raise WorkflowException(SourceLine(docker_req).makeError(
                    "Docker image must be specified as 'dockerImageId' or "
                    "'dockerPull' when using user space implementations of "
                    "Docker"))
        else:
            try:
                if docker_req is not None and runtimeContext.use_container:
                    img_id = str(
                        self.get_from_requirements(
                            docker_req, runtimeContext.pull_image,
                            getdefault(runtimeContext.force_docker_pull, False),
                            getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)))
                if img_id is None:
                    if self.builder.find_default_container:
                        default_container = self.builder.find_default_container()
                        if default_container:
                            img_id = str(default_container)

                if docker_req is not None and img_id is None and runtimeContext.use_container:
                    raise Exception("Docker image not available")

                if self.prov_obj is not None and img_id is not None \
                        and runtimeContext.process_run_id is not None:
                    container_agent = self.prov_obj.document.agent(
                        uuid.uuid4().urn,
                        {"prov:type": PROV["SoftwareAgent"],
                         "cwlprov:image": img_id,
                         "prov:label": "Container execution of image %s" % img_id})
                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
                    #                  {"prov:label": "Container image %s" % img_id} )
                    # The image is the plan for this activity-agent association
                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
                    self.prov_obj.document.wasAssociatedWith(
                        runtimeContext.process_run_id, container_agent)
            except Exception as err:
                container = "Singularity" if runtimeContext.singularity else "Docker"
                _logger.debug(u"%s error", container, exc_info=True)
                if docker_is_req:
                    raise_from(UnsupportedRequirement(
                        "%s is required to run this tool: %s" % (container, Text(err))), err)
                else:
                    raise WorkflowException(
                        "{0} is not available for this tool, try "
                        "--no-container to disable {0}, or install "
                        "a user space Docker replacement like uDocker with "
                        "--user-space-docker-cmd.: {1}".format(container, err))

        self._setup(runtimeContext)
        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
        runtime.append(Text(img_id))
        monitor_function = None
        if cidfile:
            monitor_function = functools.partial(
                self.docker_monitor, cidfile, runtimeContext.tmpdir_prefix,
                not bool(runtimeContext.cidfile_dir))
        elif runtimeContext.user_space_docker_cmd:
            monitor_function = functools.partial(self.process_monitor)
        self._execute(runtime, env, runtimeContext, monitor_function)

    def docker_monitor(self, cidfile, tmpdir_prefix, cleanup_cidfile, process):
        # type: (Text, Text, bool, subprocess.Popen) -> None
        """Record memory usage of the running Docker container."""
        # Todo: consider switching to `docker create` / `docker start`
        # instead of `docker run` as `docker create` outputs the container ID
        # to stdout, but the container is frozen, thus allowing us to start the
        # monitoring process without dealing with the cidfile or too-fast
        # container execution
        cid = None
        while cid is None:
            time.sleep(1)
            if process.returncode is not None:
                if cleanup_cidfile:
                    try:
                        os.remove(cidfile)
                    except OSError as exc:
                        _logger.warning("Ignored error cleaning up Docker cidfile: %s", exc)
                return
            try:
                with open(cidfile) as cidhandle:
                    cid = cidhandle.readline().strip()
            except (OSError, IOError):
                cid = None
            if not cid:
                # The cidfile can exist but still be empty while Docker is
                # starting up; keep polling until it holds a container id.
                cid = None
        max_mem = psutil.virtual_memory().total
        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
        try:
            with open(stats_file.name, mode="w") as stats_file_handle:
                stats_proc = subprocess.Popen(
                    ['docker', 'stats', '--no-trunc', '--format', '{{.MemPerc}}',
                     cid], stdout=stats_file_handle, stderr=subprocess.DEVNULL)
                process.wait()
                stats_proc.kill()
        except OSError as exc:
            _logger.warning("Ignored error with docker stats: %s", exc)
            return
        max_mem_percent = 0
        with open(stats_file.name, mode="r") as stats:
            for line in stats:
                try:
                    mem_percent = float(re.sub(
                        CONTROL_CODE_RE, '', line).replace('%', ''))
                    if mem_percent > max_mem_percent:
                        max_mem_percent = mem_percent
                except ValueError:
                    break
        _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
                     int((max_mem_percent / 100 * max_mem) / (2 ** 20)))
        if cleanup_cidfile:
            os.remove(cidfile)
745 | |
746 | |
def _job_popen(commands,                  # type: List[Text]
               stdin_path,                # type: Optional[Text]
               stdout_path,               # type: Optional[Text]
               stderr_path,               # type: Optional[Text]
               env,                       # type: MutableMapping[AnyStr, AnyStr]
               cwd,                       # type: Text
               job_dir,                   # type: Text
               job_script_contents=None,  # type: Optional[Text]
               timelimit=None,            # type: Optional[int]
               name=None,                 # type: Optional[Text]
               monitor_function=None      # type: Optional[Callable[[subprocess.Popen], None]]
              ):  # type: (...) -> int
    """Execute *commands* and return the process exit code.

    Two strategies exist:

    * direct (default): spawn the process with ``subprocess.Popen``;
    * shelled: serialize the job description and a small Python runner into
      *job_dir* and run it through ``bash`` — used when a job script is
      supplied or ``CWLTOOL_FORCE_SHELL_POPEN`` is set.
    """
    if job_script_contents is None and not FORCE_SHELLED_POPEN:

        stdin = subprocess.PIPE  # type: Union[IO[Any], int]
        if stdin_path is not None:
            stdin = open(stdin_path, "rb")

        # Without explicit redirection, tool output goes to our stderr.
        stdout = sys.stderr  # type: IO[Any]
        if stdout_path is not None:
            stdout = open(stdout_path, "wb")

        stderr = sys.stderr  # type: IO[Any]
        if stderr_path is not None:
            stderr = open(stderr_path, "wb")

        sproc = subprocess.Popen(commands,
                                 shell=False,
                                 close_fds=not onWindows(),
                                 stdin=stdin,
                                 stdout=stdout,
                                 stderr=stderr,
                                 env=env,
                                 cwd=cwd)
        processes_to_kill.append(sproc)

        if sproc.stdin is not None:
            sproc.stdin.close()

        tm = None
        if timelimit is not None and timelimit > 0:
            def terminate():  # type: () -> None
                try:
                    # NOTE: the original message lacked the space between
                    # "will" and "be" due to implicit string concatenation.
                    _logger.warning(
                        u"[job %s] exceeded time limit of %d seconds and will "
                        "be terminated", name, timelimit)
                    sproc.terminate()
                except OSError:
                    pass

            tm = Timer(timelimit, terminate)
            tm.daemon = True
            tm.start()
        if monitor_function:
            monitor_function(sproc)
        rcode = sproc.wait()

        if tm is not None:
            tm.cancel()

        if isinstance(stdin, IOBase):
            stdin.close()

        if stdout is not sys.stderr:
            stdout.close()

        if stderr is not sys.stderr:
            stderr.close()

        return rcode
    else:
        if job_script_contents is None:
            job_script_contents = SHELL_COMMAND_TEMPLATE

        # Copy the environment into a plain, JSON-serializable dict.
        env_copy = {}
        key = None  # type: Any
        for key in env:
            env_copy[key] = env[key]

        job_description = {
            u"commands": commands,
            u"cwd": cwd,
            u"env": env_copy,
            u"stdout_path": stdout_path,
            u"stderr_path": stderr_path,
            u"stdin_path": stdin_path}

        if PY2:
            with open(os.path.join(job_dir, "job.json"), mode="wb") as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
        else:
            with open(os.path.join(job_dir, "job.json"), mode="w",
                      encoding='utf-8') as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
        try:
            job_script = os.path.join(job_dir, "run_job.bash")
            with open(job_script, "wb") as _:
                _.write(job_script_contents.encode('utf-8'))
            job_run = os.path.join(job_dir, "run_job.py")
            with open(job_run, "wb") as _:
                _.write(PYTHON_RUN_SCRIPT.encode('utf-8'))
            sproc = subprocess.Popen(
                ["bash", job_script.encode("utf-8")],
                shell=False,
                cwd=job_dir,
                # The nested script will output the paths to the correct files if they need
                # to be captured. Else just write everything to stderr (same as above).
                stdout=sys.stderr,
                stderr=sys.stderr,
                stdin=subprocess.PIPE,
            )
            processes_to_kill.append(sproc)
            if sproc.stdin is not None:
                sproc.stdin.close()

            rcode = sproc.wait()

            return rcode
        finally:
            # The job directory holds the serialized env (possibly including
            # secrets), so always remove it.
            shutil.rmtree(job_dir)