planemo/lib/python3.7/site-packages/cwltool/job.py @ 0:d30785e31577 (draft)

commit message: "planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author: guerler
date: Fri, 31 Jul 2020 00:18:57 -0400
from __future__ import absolute_import

import datetime
import functools
import itertools
import logging
import threading
import os
import re
import shutil
import stat
import sys
import tempfile
import time
import uuid
from abc import ABCMeta, abstractmethod
from io import IOBase, open  # pylint: disable=redefined-builtin
from threading import Timer
from typing import (IO, Any, AnyStr, Callable, Dict, Iterable, List, Tuple,
                    MutableMapping, MutableSequence, Optional, Union, cast)

import psutil
import shellescape
from prov.model import PROV
from schema_salad.sourceline import SourceLine
from six import PY2, with_metaclass
from future.utils import raise_from
from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
                               Text)

from .builder import Builder, HasReqsHints  # pylint: disable=unused-import
from .context import RuntimeContext  # pylint: disable=unused-import
from .context import getdefault
from .errors import WorkflowException
from .expression import JSON
from .loghandler import _logger
from .pathmapper import (MapperEnt, PathMapper,  # pylint: disable=unused-import
                         ensure_writable, ensure_non_writable)
from .process import UnsupportedRequirement, stage_files
from .secrets import SecretStore  # pylint: disable=unused-import
from .utils import (DEFAULT_TMP_PREFIX, Directory, bytes2str_in_dicts,
                    copytree_with_merge, json_dump, json_dumps, onWindows,
                    processes_to_kill, subprocess)

if TYPE_CHECKING:
    from .provenance import ProvenanceProfile  # pylint: disable=unused-import

needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
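# Illustration of the quoting check above (sample values are hypothetical):
# the regex matches the empty string or any single shell metacharacter, so
# needs_shell_quoting_re.search(u"foo bar") is truthy and the argument would
# be quoted for display, while needs_shell_quoting_re.search(u"foo") returns
# None and the argument is shown unquoted.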

FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"

SHELL_COMMAND_TEMPLATE = u"""#!/bin/bash
python "run_job.py" "job.json"
"""

PYTHON_RUN_SCRIPT = u"""
import json
import os
import sys
if os.name == 'posix':
    try:
        import subprocess32 as subprocess  # type: ignore
    except Exception:
        import subprocess
else:
    import subprocess  # type: ignore

with open(sys.argv[1], "r") as f:
    popen_description = json.load(f)
    commands = popen_description["commands"]
    cwd = popen_description["cwd"]
    env = popen_description["env"]
    env["PATH"] = os.environ.get("PATH")
    stdin_path = popen_description["stdin_path"]
    stdout_path = popen_description["stdout_path"]
    stderr_path = popen_description["stderr_path"]
    if stdin_path is not None:
        stdin = open(stdin_path, "rb")
    else:
        stdin = subprocess.PIPE
    if stdout_path is not None:
        stdout = open(stdout_path, "wb")
    else:
        stdout = sys.stderr
    if stderr_path is not None:
        stderr = open(stderr_path, "wb")
    else:
        stderr = sys.stderr
    if os.name == 'nt':
        close_fds = False
        for key, value in env.items():
            env[key] = str(value)
    else:
        close_fds = True
    sp = subprocess.Popen(commands,
                          shell=False,
                          close_fds=close_fds,
                          stdin=stdin,
                          stdout=stdout,
                          stderr=stderr,
                          env=env,
                          cwd=cwd)
    if sp.stdin:
        sp.stdin.close()
    rcode = sp.wait()
    if stdin is not subprocess.PIPE:
        stdin.close()
    if stdout is not sys.stderr:
        stdout.close()
    if stderr is not sys.stderr:
        stderr.close()
    sys.exit(rcode)
"""


def deref_links(outputs):  # type: (Any) -> None
    if isinstance(outputs, MutableMapping):
        if outputs.get("class") == "File":
            st = os.lstat(outputs["path"])
            if stat.S_ISLNK(st.st_mode):
                outputs["basename"] = os.path.basename(outputs["path"])
                outputs["path"] = os.readlink(outputs["path"])
        else:
            for v in outputs.values():
                deref_links(v)
    if isinstance(outputs, MutableSequence):
        for output in outputs:
            deref_links(output)
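# Worked example (paths are hypothetical): given
#   {"class": "File", "path": "/out/link.txt"}
# where /out/link.txt is a symlink to /data/real.txt, deref_links() rewrites
# the entry in place to
#   {"class": "File", "path": "/data/real.txt", "basename": "link.txt"},
# recursing through nested mappings and sequences.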


def relink_initialworkdir(pathmapper,           # type: PathMapper
                          host_outdir,          # type: Text
                          container_outdir,     # type: Text
                          inplace_update=False  # type: bool
                         ):  # type: (...) -> None
    for _, vol in pathmapper.items():
        if not vol.staged:
            continue

        if (vol.type in ("File", "Directory") or (
                inplace_update and vol.type in
                ("WritableFile", "WritableDirectory"))):
            if not vol.target.startswith(container_outdir):
                # this is an input file written outside of the working
                # directory, so therefore ineligible for being an output file.
                # Thus, none of our business
                continue
            host_outdir_tgt = os.path.join(
                host_outdir, vol.target[len(container_outdir) + 1:])
            if os.path.islink(host_outdir_tgt) \
                    or os.path.isfile(host_outdir_tgt):
                os.remove(host_outdir_tgt)
            elif os.path.isdir(host_outdir_tgt) \
                    and not vol.resolved.startswith("_:"):
                shutil.rmtree(host_outdir_tgt)
            if onWindows():
                # If this becomes a big issue for someone then we could
                # refactor the code to process output from a running container
                # and avoid all the extra IO below
                if vol.type in ("File", "WritableFile"):
                    shutil.copy(vol.resolved, host_outdir_tgt)
                elif vol.type in ("Directory", "WritableDirectory"):
                    copytree_with_merge(vol.resolved, host_outdir_tgt)
            elif not vol.resolved.startswith("_:"):
                os.symlink(vol.resolved, host_outdir_tgt)
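# Sketch of the translation performed above, with hypothetical directories:
# if container_outdir is "/var/lib/cwl/outdir" and host_outdir is
# "/home/user/run1/out", a staged entry whose target is
# "/var/lib/cwl/outdir/data/a.txt" is (re)created at
# "/home/user/run1/out/data/a.txt" as a symlink back to vol.resolved
# (or a copy on Windows).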


class JobBase(with_metaclass(ABCMeta, HasReqsHints)):
    def __init__(self,
                 builder,           # type: Builder
                 joborder,          # type: JSON
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,             # type: List[Dict[Text, Text]]
                 name,              # type: Text
                ):  # type: (...) -> None
        """Initialize the job object."""
        self.builder = builder
        self.joborder = joborder
        self.stdin = None  # type: Optional[Text]
        self.stderr = None  # type: Optional[Text]
        self.stdout = None  # type: Optional[Text]
        self.successCodes = []  # type: Iterable[int]
        self.temporaryFailCodes = []  # type: Iterable[int]
        self.permanentFailCodes = []  # type: Iterable[int]
        self.requirements = requirements
        self.hints = hints
        self.name = name
        self.command_line = []  # type: List[Text]
        self.pathmapper = PathMapper([], u"", u"")
        self.make_path_mapper = make_path_mapper
        self.generatemapper = None  # type: Optional[PathMapper]

        # set in CommandLineTool.job(i)
        self.collect_outputs = cast(Callable[[Text, int], MutableMapping[Text, Any]],
                                    None)  # type: Union[Callable[[Text, int], MutableMapping[Text, Any]], functools.partial[MutableMapping[Text, Any]]]
        self.output_callback = cast(Callable[[Any, Any], Any], None)
        self.outdir = u""
        self.tmpdir = u""

        self.environment = {}  # type: MutableMapping[Text, Text]
        self.generatefiles = {"class": "Directory", "listing": [], "basename": ""}  # type: Directory
        self.stagedir = None  # type: Optional[Text]
        self.inplace_update = False
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool

    def __repr__(self):  # type: () -> str
        """Represent this Job object."""
        return "CommandLineJob(%s)" % self.name

    @abstractmethod
    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        pass

    def _setup(self, runtimeContext):  # type: (RuntimeContext) -> None
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        for knownfile in self.pathmapper.files():
            p = self.pathmapper.mapper(knownfile)
            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
                raise WorkflowException(
                    u"Input file %s (at %s) not found or is not a regular "
                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0]))

        if 'listing' in self.generatefiles:
            runtimeContext = runtimeContext.copy()
            runtimeContext.outdir = self.outdir
            self.generatemapper = self.make_path_mapper(
                cast(List[Any], self.generatefiles["listing"]),
                self.builder.outdir, runtimeContext, False)
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    u"[job %s] initial work dir %s", self.name,
                    json_dumps({p: self.generatemapper.mapper(p)
                                for p in self.generatemapper.files()}, indent=4))

    def _execute(self,
                 runtime,                # type: List[Text]
                 env,                    # type: MutableMapping[Text, Text]
                 runtimeContext,         # type: RuntimeContext
                 monitor_function=None,  # type: Optional[Callable[[subprocess.Popen], None]]
                ):  # type: (...) -> None

        scr, _ = self.get_requirement("ShellCommandRequirement")

        shouldquote = needs_shell_quoting_re.search  # type: Callable[[Any], Any]
        if scr is not None:
            shouldquote = lambda x: False

        _logger.info(u"[job %s] %s$ %s%s%s%s",
                     self.name,
                     self.outdir,
                     " \\\n    ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in
                                       (runtime + self.command_line)]),
                     u' < %s' % self.stdin if self.stdin else '',
                     u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '',
                     u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '')
        if self.joborder is not None and runtimeContext.research_obj is not None:
            job_order = self.joborder
            if runtimeContext.process_run_id is not None \
                    and runtimeContext.prov_obj is not None and isinstance(job_order, (list, dict)):
                runtimeContext.prov_obj.used_artefacts(
                    job_order, runtimeContext.process_run_id, str(self.name))
            else:
                _logger.warning("research_obj set but one of process_run_id "
                                "or prov_obj is missing from runtimeContext: "
                                "{}".format(runtimeContext))
        outputs = {}  # type: MutableMapping[Text, Any]
        try:
            stdin_path = None
            if self.stdin is not None:
                rmap = self.pathmapper.reversemap(self.stdin)
                if rmap is None:
                    raise WorkflowException(
                        "{} missing from pathmapper".format(self.stdin))
                else:
                    stdin_path = rmap[1]

            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
                dnerr = os.path.dirname(abserr)
                if dnerr and not os.path.exists(dnerr):
                    os.makedirs(dnerr)
                stderr_path = abserr

            stdout_path = None
            if self.stdout is not None:
                absout = os.path.join(self.outdir, self.stdout)
                dnout = os.path.dirname(absout)
                if dnout and not os.path.exists(dnout):
                    os.makedirs(dnout)
                stdout_path = absout

            commands = [Text(x) for x in runtime + self.command_line]
            if runtimeContext.secret_store is not None:
                commands = runtimeContext.secret_store.retrieve(commands)
                env = runtimeContext.secret_store.retrieve(env)

            job_script_contents = None  # type: Optional[Text]
            builder = getattr(self, "builder", None)  # type: Optional[Builder]
            if builder is not None:
                job_script_contents = builder.build_job_script(commands)
            rcode = _job_popen(
                commands,
                stdin_path=stdin_path,
                stdout_path=stdout_path,
                stderr_path=stderr_path,
                env=env,
                cwd=self.outdir,
                job_dir=tempfile.mkdtemp(prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)),
                job_script_contents=job_script_contents,
                timelimit=self.timelimit,
                name=self.name,
                monitor_function=monitor_function
            )

            if rcode in self.successCodes:
                processStatus = "success"
            elif rcode in self.temporaryFailCodes:
                processStatus = "temporaryFail"
            elif rcode in self.permanentFailCodes:
                processStatus = "permanentFail"
            elif rcode == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            if 'listing' in self.generatefiles:
                if self.generatemapper:
                    relink_initialworkdir(
                        self.generatemapper, self.outdir, self.builder.outdir,
                        inplace_update=self.inplace_update)
                else:
                    raise ValueError("'listing' in self.generatefiles but no "
                                     "generatemapper was set up.")

            outputs = self.collect_outputs(self.outdir, rcode)
            outputs = bytes2str_in_dicts(outputs)  # type: ignore
        except OSError as e:
            if e.errno == 2:
                if runtime:
                    _logger.error(u"'%s' not found: %s", runtime[0], Text(e))
                else:
                    _logger.error(u"'%s' not found: %s", self.command_line[0], Text(e))
            else:
                _logger.exception(u"Exception while running job")
            processStatus = "permanentFail"
        except WorkflowException as err:
            _logger.error(u"[job %s] Job error:\n%s", self.name, Text(err))
            processStatus = "permanentFail"
        except Exception:
            _logger.exception(u"Exception while running job")
            processStatus = "permanentFail"
        if runtimeContext.research_obj is not None \
                and self.prov_obj is not None \
                and runtimeContext.process_run_id is not None:
            # creating entities for the outputs produced by each step (in the provenance document)
            self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id,
                                             outputs, datetime.datetime.now())
        if processStatus != "success":
            _logger.warning(u"[job %s] completed %s", self.name, processStatus)
        else:
            _logger.info(u"[job %s] completed %s", self.name, processStatus)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"[job %s] %s", self.name,
                          json_dumps(outputs, indent=4))

        if self.generatemapper is not None and runtimeContext.secret_store is not None:
            # Delete any runtime-generated files containing secrets.
            for _, p in self.generatemapper.items():
                if p.type == "CreateFile":
                    if runtimeContext.secret_store.has_secret(p.resolved):
                        host_outdir = self.outdir
                        container_outdir = self.builder.outdir
                        host_outdir_tgt = p.target
                        if p.target.startswith(container_outdir + "/"):
                            host_outdir_tgt = os.path.join(
                                host_outdir, p.target[len(container_outdir) + 1:])
                        os.remove(host_outdir_tgt)

        if runtimeContext.workflow_eval_lock is None:
            raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")

        with runtimeContext.workflow_eval_lock:
            self.output_callback(outputs, processStatus)

        if self.stagedir is not None and os.path.exists(self.stagedir):
            _logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
            shutil.rmtree(self.stagedir, True)

        if runtimeContext.rm_tmpdir:
            _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir)
            shutil.rmtree(self.tmpdir, True)

    def process_monitor(self, sproc):  # type: (subprocess.Popen) -> None
        monitor = psutil.Process(sproc.pid)
        memory_usage = [None]  # Value must be list rather than integer to utilise pass-by-reference in python

        def get_tree_mem_usage(memory_usage):  # type: (List[int]) -> None
            children = monitor.children()
            rss = monitor.memory_info().rss
            while len(children):
                rss += sum([process.memory_info().rss for process in children])
                children = list(itertools.chain(*[process.children() for process in children]))
            if memory_usage[0] is None or rss > memory_usage[0]:
                memory_usage[0] = rss

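        # threading.Timer fires its function once, `interval` seconds after
        # start(), so get_tree_mem_usage samples the process tree's RSS a
        # single time (one second in) rather than polling repeatedly.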
        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
        mem_tm.daemon = True
        mem_tm.start()
        sproc.wait()
        mem_tm.cancel()
        if memory_usage[0] is not None:
            _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
                         round(memory_usage[0] / (2 ** 20)))
        else:
            _logger.debug(u"Could not collect memory usage, job ended before monitoring began.")


class CommandLineJob(JobBase):
    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None

        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        self._setup(runtimeContext)

        env = self.environment
        vars_to_preserve = runtimeContext.preserve_environment
        if runtimeContext.preserve_entire_environment is not False:
            vars_to_preserve = os.environ
        if vars_to_preserve:
            for key, value in os.environ.items():
                if key in vars_to_preserve and key not in env:
                    # On Windows, subprocess env can't handle unicode.
                    env[key] = str(value) if onWindows() else value
        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
            env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() \
                else os.environ["SYSTEMROOT"]

        stage_files(self.pathmapper, ignore_writable=True, symlink=True,
                    secret_store=runtimeContext.secret_store)
        if self.generatemapper is not None:
            stage_files(self.generatemapper, ignore_writable=self.inplace_update,
                        symlink=True, secret_store=runtimeContext.secret_store)
            relink_initialworkdir(
                self.generatemapper, self.outdir, self.builder.outdir,
                inplace_update=self.inplace_update)

        monitor_function = functools.partial(self.process_monitor)

        self._execute([], env, runtimeContext, monitor_function)


CONTROL_CODE_RE = r'\x1b\[[0-9;]*[a-zA-Z]'
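# Used below to scrub ANSI terminal control sequences from `docker stats`
# output before parsing; for example (illustrative input),
# re.sub(CONTROL_CODE_RE, '', '\x1b[2J\x1b[H12.34%') yields '12.34%'.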


class ContainerCommandLineJob(with_metaclass(ABCMeta, JobBase)):
    """Commandline job using containers."""

    @abstractmethod
    def get_from_requirements(self,
                              r,                 # type: Dict[Text, Text]
                              pull_image,        # type: bool
                              force_pull=False,  # type: bool
                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
                             ):  # type: (...) -> Optional[Text]
        pass

    @abstractmethod
    def create_runtime(self,
                       env,             # type: MutableMapping[Text, Text]
                       runtime_context  # type: RuntimeContext
                      ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
        """Return the list of commands to run the selected container engine."""
        pass

    @staticmethod
    @abstractmethod
    def append_volume(runtime, source, target, writable=False):
        # type: (List[Text], Text, Text, bool) -> None
        """Add binding arguments to the runtime list."""
        pass

    @abstractmethod
    def add_file_or_directory_volume(self,
                                     runtime,         # type: List[Text]
                                     volume,          # type: MapperEnt
                                     host_outdir_tgt  # type: Optional[Text]
                                    ):  # type: (...) -> None
511 """Append volume a file/dir mapping to the runtime option list."""
        pass

    @abstractmethod
    def add_writable_file_volume(self,
                                 runtime,          # type: List[Text]
                                 volume,           # type: MapperEnt
                                 host_outdir_tgt,  # type: Optional[Text]
                                 tmpdir_prefix     # type: Text
                                ):  # type: (...) -> None
        """Append a writable file mapping to the runtime option list."""
        pass

    @abstractmethod
    def add_writable_directory_volume(self,
                                      runtime,          # type: List[Text]
                                      volume,           # type: MapperEnt
                                      host_outdir_tgt,  # type: Optional[Text]
                                      tmpdir_prefix     # type: Text
                                     ):  # type: (...) -> None
        """Append a writable directory mapping to the runtime option list."""
        pass

    def create_file_and_add_volume(self,
                                   runtime,          # type: List[Text]
                                   volume,           # type: MapperEnt
                                   host_outdir_tgt,  # type: Optional[Text]
                                   secret_store,     # type: Optional[SecretStore]
                                   tmpdir_prefix     # type: Text
                                  ):  # type: (...) -> Text
        """Create the file and add a mapping."""
        if not host_outdir_tgt:
            tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
            new_file = os.path.join(
                tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
                os.path.basename(volume.target))
        writable = volume.type == "CreateWritableFile"
        if secret_store:
            contents = secret_store.retrieve(volume.resolved)
        else:
            contents = volume.resolved
        dirname = os.path.dirname(host_outdir_tgt or new_file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(host_outdir_tgt or new_file, "wb") as file_literal:
            file_literal.write(contents.encode("utf-8"))
        if not host_outdir_tgt:
            self.append_volume(runtime, new_file, volume.target,
                               writable=writable)
        if writable:
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
        return host_outdir_tgt or new_file

    def add_volumes(self,
                    pathmapper,          # type: PathMapper
                    runtime,             # type: List[Text]
                    tmpdir_prefix,       # type: Text
                    secret_store=None,   # type: Optional[SecretStore]
                    any_path_okay=False  # type: bool
                   ):  # type: (...) -> None
        """Append volume mappings to the runtime option list."""
        container_outdir = self.builder.outdir
        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[Text]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
                    self.outdir, vol.target[len(container_outdir) + 1:])
            if not host_outdir_tgt and not any_path_okay:
                raise WorkflowException(
                    "No mandatory DockerRequirement, yet path is outside "
                    "the designated output directory, also known as "
                    "$(runtime.outdir): {}".format(vol))
            if vol.type in ("File", "Directory"):
                self.add_file_or_directory_volume(
                    runtime, vol, host_outdir_tgt)
            elif vol.type == "WritableFile":
                self.add_writable_file_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
            elif vol.type == "WritableDirectory":
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix)
                pathmapper.update(
                    key, new_path, vol.target, vol.type, vol.staged)

    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        self.prov_obj = runtimeContext.prov_obj
        img_id = None
        env = cast(MutableMapping[Text, Text], os.environ)
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if docker_req is not None and user_space_docker_cmd:
            # For user-space docker implementations, a local image name or ID
            # takes precedence over a network pull
            if 'dockerImageId' in docker_req:
                img_id = str(docker_req["dockerImageId"])
            elif 'dockerPull' in docker_req:
                img_id = str(docker_req["dockerPull"])
                cmd = [user_space_docker_cmd, "pull", img_id]
                _logger.info(Text(cmd))
                try:
                    subprocess.check_call(cmd, stdout=sys.stderr)
                except OSError:
                    raise WorkflowException(SourceLine(docker_req).makeError(
                        "Either Docker container {} is not available with "
                        "user space docker implementation {} or {} is missing "
                        "or broken.".format(img_id, user_space_docker_cmd,
                                            user_space_docker_cmd)))
            else:
                raise WorkflowException(SourceLine(docker_req).makeError(
                    "Docker image must be specified as 'dockerImageId' or "
                    "'dockerPull' when using user space implementations of "
                    "Docker"))
        else:
            try:
                if docker_req is not None and runtimeContext.use_container:
                    img_id = str(
                        self.get_from_requirements(
                            docker_req, runtimeContext.pull_image,
                            getdefault(runtimeContext.force_docker_pull, False),
                            getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)))
                if img_id is None:
                    if self.builder.find_default_container:
                        default_container = self.builder.find_default_container()
                        if default_container:
                            img_id = str(default_container)

                if docker_req is not None and img_id is None and runtimeContext.use_container:
                    raise Exception("Docker image not available")

                if self.prov_obj is not None and img_id is not None \
                        and runtimeContext.process_run_id is not None:
                    container_agent = self.prov_obj.document.agent(
                        uuid.uuid4().urn,
                        {"prov:type": PROV["SoftwareAgent"],
                         "cwlprov:image": img_id,
                         "prov:label": "Container execution of image %s" % img_id})
                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
                    #                              {"prov:label": "Container image %s" % img_id})
                    # The image is the plan for this activity-agent association
                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
                    self.prov_obj.document.wasAssociatedWith(
                        runtimeContext.process_run_id, container_agent)
            except Exception as err:
                container = "Singularity" if runtimeContext.singularity else "Docker"
                _logger.debug(u"%s error", container, exc_info=True)
                if docker_is_req:
                    raise_from(UnsupportedRequirement(
                        "%s is required to run this tool: %s" % (container, Text(err))), err)
                else:
                    raise WorkflowException(
                        "{0} is not available for this tool, try "
                        "--no-container to disable {0}, or install "
                        "a user space Docker replacement like uDocker with "
                        "--user-space-docker-cmd.: {1}".format(container, err))

        self._setup(runtimeContext)
        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
        runtime.append(Text(img_id))
        monitor_function = None
        if cidfile:
            monitor_function = functools.partial(
                self.docker_monitor, cidfile, runtimeContext.tmpdir_prefix,
                not bool(runtimeContext.cidfile_dir))
        elif runtimeContext.user_space_docker_cmd:
            monitor_function = functools.partial(self.process_monitor)
        self._execute(runtime, env, runtimeContext, monitor_function)

    def docker_monitor(self, cidfile, tmpdir_prefix, cleanup_cidfile, process):
        # type: (Text, Text, bool, subprocess.Popen) -> None
        """Record memory usage of the running Docker container."""
        # Todo: consider switching to `docker create` / `docker start`
        # instead of `docker run` as `docker create` outputs the container ID
        # to stdout, but the container is frozen, thus allowing us to start the
        # monitoring process without dealing with the cidfile or too-fast
        # container execution
        cid = None
        while cid is None:
            time.sleep(1)
            if process.returncode is not None:
                if cleanup_cidfile:
                    try:
                        os.remove(cidfile)
                    except OSError as exc:
                        _logger.warning("Ignored error cleaning up Docker cidfile: %s", exc)
                return
            try:
                with open(cidfile) as cidhandle:
                    cid = cidhandle.readline().strip()
            except (OSError, IOError):
                cid = None
        max_mem = psutil.virtual_memory().total
        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
        try:
            with open(stats_file.name, mode="w") as stats_file_handle:
                stats_proc = subprocess.Popen(
                    ['docker', 'stats', '--no-trunc', '--format', '{{.MemPerc}}',
                     cid], stdout=stats_file_handle, stderr=subprocess.DEVNULL)
                process.wait()
                stats_proc.kill()
        except OSError as exc:
            _logger.warning("Ignored error with docker stats: %s", exc)
            return
        max_mem_percent = 0
        with open(stats_file.name, mode="r") as stats:
            for line in stats:
                try:
                    mem_percent = float(re.sub(
                        CONTROL_CODE_RE, '', line).replace('%', ''))
                    if mem_percent > max_mem_percent:
                        max_mem_percent = mem_percent
                except ValueError:
                    break
        _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
                     int((max_mem_percent / 100 * max_mem) / (2 ** 20)))
        if cleanup_cidfile:
            os.remove(cidfile)


def _job_popen(commands,                  # type: List[Text]
               stdin_path,                # type: Optional[Text]
               stdout_path,               # type: Optional[Text]
               stderr_path,               # type: Optional[Text]
               env,                       # type: MutableMapping[AnyStr, AnyStr]
               cwd,                       # type: Text
               job_dir,                   # type: Text
               job_script_contents=None,  # type: Optional[Text]
               timelimit=None,            # type: Optional[int]
               name=None,                 # type: Optional[Text]
               monitor_function=None      # type: Optional[Callable[[subprocess.Popen], None]]
              ):  # type: (...) -> int

    if job_script_contents is None and not FORCE_SHELLED_POPEN:

        stdin = subprocess.PIPE  # type: Union[IO[Any], int]
        if stdin_path is not None:
            stdin = open(stdin_path, "rb")

        stdout = sys.stderr  # type: IO[Any]
        if stdout_path is not None:
            stdout = open(stdout_path, "wb")

        stderr = sys.stderr  # type: IO[Any]
        if stderr_path is not None:
            stderr = open(stderr_path, "wb")

        sproc = subprocess.Popen(commands,
                                 shell=False,
                                 close_fds=not onWindows(),
                                 stdin=stdin,
                                 stdout=stdout,
                                 stderr=stderr,
                                 env=env,
                                 cwd=cwd)
        processes_to_kill.append(sproc)

        if sproc.stdin is not None:
            sproc.stdin.close()

        tm = None
        if timelimit is not None and timelimit > 0:
            def terminate():  # type: () -> None
                try:
                    _logger.warning(
                        u"[job %s] exceeded time limit of %d seconds and will "
                        u"be terminated", name, timelimit)
                    sproc.terminate()
                except OSError:
                    pass

            tm = Timer(timelimit, terminate)
            tm.daemon = True
            tm.start()
        if monitor_function:
            monitor_function(sproc)
        rcode = sproc.wait()

        if tm is not None:
            tm.cancel()

        if isinstance(stdin, IOBase):
            stdin.close()

        if stdout is not sys.stderr:
            stdout.close()

        if stderr is not sys.stderr:
            stderr.close()

        return rcode
    else:
        if job_script_contents is None:
            job_script_contents = SHELL_COMMAND_TEMPLATE

        env_copy = {}
        key = None  # type: Any
        for key in env:
            env_copy[key] = env[key]

        job_description = {
            u"commands": commands,
            u"cwd": cwd,
            u"env": env_copy,
            u"stdout_path": stdout_path,
            u"stderr_path": stderr_path,
            u"stdin_path": stdin_path}

        if PY2:
            with open(os.path.join(job_dir, "job.json"), mode="wb") as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
        else:
            with open(os.path.join(job_dir, "job.json"), mode="w",
                      encoding='utf-8') as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
        try:
            job_script = os.path.join(job_dir, "run_job.bash")
            with open(job_script, "wb") as _:
                _.write(job_script_contents.encode('utf-8'))
            job_run = os.path.join(job_dir, "run_job.py")
            with open(job_run, "wb") as _:
                _.write(PYTHON_RUN_SCRIPT.encode('utf-8'))
            sproc = subprocess.Popen(
                ["bash", job_script.encode("utf-8")],
                shell=False,
                cwd=job_dir,
                # The nested script will output the paths to the correct files if they need
                # to be captured. Else just write everything to stderr (same as above).
                stdout=sys.stderr,
                stderr=sys.stderr,
                stdin=subprocess.PIPE,
            )
            processes_to_kill.append(sproc)
            if sproc.stdin is not None:
                sproc.stdin.close()

            rcode = sproc.wait()

            return rcode
        finally:
            shutil.rmtree(job_dir)
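# Minimal usage sketch (hypothetical values; assumes a POSIX environment with
# bash on PATH). _job_popen() either executes the command directly through
# subprocess.Popen or, when job_script_contents is supplied or
# CWLTOOL_FORCE_SHELL_POPEN=1 is set, indirectly via the generated
# run_job.bash / run_job.py / job.json triple written into job_dir:
#
#   rc = _job_popen(
#       [u"echo", u"hello"],
#       stdin_path=None,
#       stdout_path=u"/tmp/outdir/hello.txt",
#       stderr_path=None,
#       env={u"PATH": os.environ["PATH"]},
#       cwd=u"/tmp/outdir",
#       job_dir=tempfile.mkdtemp(prefix=DEFAULT_TMP_PREFIX),
#   )
#   assert rc == 0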