comparison: env/lib/python3.7/site-packages/cwltool/job.py @ 5:9b1c78e6ba9c (draft, default, tip)

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"

| field | value |
|---|---|
| author | shellac |
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | (none) |
Comparison of 4:79f47841a781 with 5:9b1c78e6ba9c. Only the left-hand revision (4:79f47841a781) carries content for this file in the comparison; its listing follows.
from __future__ import absolute_import

import datetime
import functools
import itertools
import logging
import threading
import os
import re
import shutil
import stat
import sys
import tempfile
import time
import uuid
from abc import ABCMeta, abstractmethod
from io import IOBase, open  # pylint: disable=redefined-builtin
from threading import Timer
from typing import (IO, Any, AnyStr, Callable, Dict, Iterable, List, Tuple,
                    MutableMapping, MutableSequence, Optional, Union, cast)

import psutil
import shellescape
from prov.model import PROV
from schema_salad.sourceline import SourceLine
from six import PY2, with_metaclass
from future.utils import raise_from
from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
                               Text)

from .builder import Builder, HasReqsHints  # pylint: disable=unused-import
from .context import RuntimeContext  # pylint: disable=unused-import
from .context import getdefault
from .errors import WorkflowException
from .expression import JSON
from .loghandler import _logger
from .pathmapper import (MapperEnt, PathMapper,  # pylint: disable=unused-import
                         ensure_writable, ensure_non_writable)
from .process import UnsupportedRequirement, stage_files
from .secrets import SecretStore  # pylint: disable=unused-import
from .utils import (DEFAULT_TMP_PREFIX, Directory, bytes2str_in_dicts,
                    copytree_with_merge, json_dump, json_dumps, onWindows,
                    processes_to_kill, subprocess)

if TYPE_CHECKING:
    from .provenance import ProvenanceProfile  # pylint: disable=unused-import

needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
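
# [Editor's illustration, not part of the original file] A quick sketch of
# what needs_shell_quoting_re flags: the empty string, whitespace, and shell
# metacharacters. _execute() uses it below to decide which arguments to quote
# when logging the command line.
#
#   >>> bool(needs_shell_quoting_re.search("input.fastq"))
#   False
#   >>> bool(needs_shell_quoting_re.search("two words"))    # whitespace
#   True
#   >>> bool(needs_shell_quoting_re.search("$HOME"))        # shell expansion
#   True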
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"

SHELL_COMMAND_TEMPLATE = u"""#!/bin/bash
python "run_job.py" "job.json"
"""

PYTHON_RUN_SCRIPT = u"""
import json
import os
import sys
if os.name == 'posix':
    try:
        import subprocess32 as subprocess  # type: ignore
    except Exception:
        import subprocess
else:
    import subprocess  # type: ignore

with open(sys.argv[1], "r") as f:
    popen_description = json.load(f)
    commands = popen_description["commands"]
    cwd = popen_description["cwd"]
    env = popen_description["env"]
    env["PATH"] = os.environ.get("PATH")
    stdin_path = popen_description["stdin_path"]
    stdout_path = popen_description["stdout_path"]
    stderr_path = popen_description["stderr_path"]
    if stdin_path is not None:
        stdin = open(stdin_path, "rb")
    else:
        stdin = subprocess.PIPE
    if stdout_path is not None:
        stdout = open(stdout_path, "wb")
    else:
        stdout = sys.stderr
    if stderr_path is not None:
        stderr = open(stderr_path, "wb")
    else:
        stderr = sys.stderr
    if os.name == 'nt':
        close_fds = False
        for key, value in env.items():
            env[key] = str(value)
    else:
        close_fds = True
    sp = subprocess.Popen(commands,
                          shell=False,
                          close_fds=close_fds,
                          stdin=stdin,
                          stdout=stdout,
                          stderr=stderr,
                          env=env,
                          cwd=cwd)
    if sp.stdin:
        sp.stdin.close()
    rcode = sp.wait()
    if stdin is not subprocess.PIPE:
        stdin.close()
    if stdout is not sys.stderr:
        stdout.close()
    if stderr is not sys.stderr:
        stderr.close()
    sys.exit(rcode)
"""

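# [Editor's illustration, not part of the original file] The wrapper above
# reads a JSON "popen description" whose keys mirror the job_description
# dict built in _job_popen() below. A representative job.json might look
# like this (paths are made up for the example):
#
#   {"commands": ["echo", "hello"],
#    "cwd": "/tmp/out",
#    "env": {"TMPDIR": "/tmp/tmp"},
#    "stdin_path": null,
#    "stdout_path": "/tmp/out/hello.txt",
#    "stderr_path": null}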

def deref_links(outputs):  # type: (Any) -> None
    if isinstance(outputs, MutableMapping):
        if outputs.get("class") == "File":
            st = os.lstat(outputs["path"])
            if stat.S_ISLNK(st.st_mode):
                outputs["basename"] = os.path.basename(outputs["path"])
                outputs["path"] = os.readlink(outputs["path"])
        else:
            for v in outputs.values():
                deref_links(v)
    if isinstance(outputs, MutableSequence):
        for output in outputs:
            deref_links(output)


def relink_initialworkdir(pathmapper,           # type: PathMapper
                          host_outdir,          # type: Text
                          container_outdir,     # type: Text
                          inplace_update=False  # type: bool
                         ):  # type: (...) -> None
    for _, vol in pathmapper.items():
        if not vol.staged:
            continue

        if (vol.type in ("File", "Directory") or (
                inplace_update and vol.type in
                ("WritableFile", "WritableDirectory"))):
            if not vol.target.startswith(container_outdir):
                # this is an input file written outside of the working
                # directory, and is therefore ineligible to be an output
                # file; thus, none of our business
                continue
            host_outdir_tgt = os.path.join(
                host_outdir, vol.target[len(container_outdir) + 1:])
            if os.path.islink(host_outdir_tgt) \
                    or os.path.isfile(host_outdir_tgt):
                os.remove(host_outdir_tgt)
            elif os.path.isdir(host_outdir_tgt) \
                    and not vol.resolved.startswith("_:"):
                shutil.rmtree(host_outdir_tgt)
            if onWindows():
                # If this becomes a big issue for someone then we could
                # refactor the code to process output from a running container
                # and avoid all the extra IO below
                if vol.type in ("File", "WritableFile"):
                    shutil.copy(vol.resolved, host_outdir_tgt)
                elif vol.type in ("Directory", "WritableDirectory"):
                    copytree_with_merge(vol.resolved, host_outdir_tgt)
            elif not vol.resolved.startswith("_:"):
                os.symlink(vol.resolved, host_outdir_tgt)

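# [Editor's illustration, not part of the original file] The path translation
# used above re-roots a container path under the host output directory by
# stripping the container_outdir prefix. With the (hypothetical) values
# container_outdir="/var/spool/cwl" and host_outdir="/home/user/out", the
# target "/var/spool/cwl/result.txt" maps to:
#
#   >>> os.path.join("/home/user/out",
#   ...              "/var/spool/cwl/result.txt"[len("/var/spool/cwl") + 1:])
#   '/home/user/out/result.txt'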

class JobBase(with_metaclass(ABCMeta, HasReqsHints)):
    def __init__(self,
                 builder,           # type: Builder
                 joborder,          # type: JSON
                 make_path_mapper,  # type: Callable[..., PathMapper]
                 requirements,      # type: List[Dict[Text, Text]]
                 hints,             # type: List[Dict[Text, Text]]
                 name,              # type: Text
                ):  # type: (...) -> None
        """Initialize the job object."""
        self.builder = builder
        self.joborder = joborder
        self.stdin = None  # type: Optional[Text]
        self.stderr = None  # type: Optional[Text]
        self.stdout = None  # type: Optional[Text]
        self.successCodes = []  # type: Iterable[int]
        self.temporaryFailCodes = []  # type: Iterable[int]
        self.permanentFailCodes = []  # type: Iterable[int]
        self.requirements = requirements
        self.hints = hints
        self.name = name
        self.command_line = []  # type: List[Text]
        self.pathmapper = PathMapper([], u"", u"")
        self.make_path_mapper = make_path_mapper
        self.generatemapper = None  # type: Optional[PathMapper]

        # set in CommandLineTool.job(i)
        self.collect_outputs = cast(Callable[[Text, int], MutableMapping[Text, Any]],
                                    None)  # type: Union[Callable[[Text, int], MutableMapping[Text, Any]], functools.partial[MutableMapping[Text, Any]]]
        self.output_callback = cast(Callable[[Any, Any], Any], None)
        self.outdir = u""
        self.tmpdir = u""

        self.environment = {}  # type: MutableMapping[Text, Text]
        self.generatefiles = {"class": "Directory", "listing": [], "basename": ""}  # type: Directory
        self.stagedir = None  # type: Optional[Text]
        self.inplace_update = False
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool

    def __repr__(self):  # type: () -> str
        """Represent this Job object."""
        return "CommandLineJob(%s)" % self.name

    @abstractmethod
    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        pass

    def _setup(self, runtimeContext):  # type: (RuntimeContext) -> None
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        for knownfile in self.pathmapper.files():
            p = self.pathmapper.mapper(knownfile)
            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
                raise WorkflowException(
                    u"Input file %s (at %s) not found or is not a regular "
                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0]))

        if 'listing' in self.generatefiles:
            runtimeContext = runtimeContext.copy()
            runtimeContext.outdir = self.outdir
            self.generatemapper = self.make_path_mapper(
                cast(List[Any], self.generatefiles["listing"]),
                self.builder.outdir, runtimeContext, False)
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    u"[job %s] initial work dir %s", self.name,
                    json_dumps({p: self.generatemapper.mapper(p)
                                for p in self.generatemapper.files()}, indent=4))

    def _execute(self,
                 runtime,                # type: List[Text]
                 env,                    # type: MutableMapping[Text, Text]
                 runtimeContext,         # type: RuntimeContext
                 monitor_function=None,  # type: Optional[Callable[[subprocess.Popen], None]]
                ):  # type: (...) -> None

        scr, _ = self.get_requirement("ShellCommandRequirement")

        shouldquote = needs_shell_quoting_re.search  # type: Callable[[Any], Any]
        if scr is not None:
            shouldquote = lambda x: False

        _logger.info(u"[job %s] %s$ %s%s%s%s",
                     self.name,
                     self.outdir,
                     " \\\n    ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in
                                       (runtime + self.command_line)]),
                     u' < %s' % self.stdin if self.stdin else '',
                     u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '',
                     u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '')
        if self.joborder is not None and runtimeContext.research_obj is not None:
            job_order = self.joborder
            if runtimeContext.process_run_id is not None \
                    and runtimeContext.prov_obj is not None and isinstance(job_order, (list, dict)):
                runtimeContext.prov_obj.used_artefacts(
                    job_order, runtimeContext.process_run_id, str(self.name))
            else:
                _logger.warning("research_obj set but one of process_run_id "
                                "or prov_obj is missing from runtimeContext: "
                                "{}".format(runtimeContext))
        outputs = {}  # type: MutableMapping[Text, Any]
        try:
            stdin_path = None
            if self.stdin is not None:
                rmap = self.pathmapper.reversemap(self.stdin)
                if rmap is None:
                    raise WorkflowException(
                        "{} missing from pathmapper".format(self.stdin))
                else:
                    stdin_path = rmap[1]

            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
                dnerr = os.path.dirname(abserr)
                if dnerr and not os.path.exists(dnerr):
                    os.makedirs(dnerr)
                stderr_path = abserr

            stdout_path = None
            if self.stdout is not None:
                absout = os.path.join(self.outdir, self.stdout)
                dnout = os.path.dirname(absout)
                if dnout and not os.path.exists(dnout):
                    os.makedirs(dnout)
                stdout_path = absout

            commands = [Text(x) for x in runtime + self.command_line]
            if runtimeContext.secret_store is not None:
                commands = runtimeContext.secret_store.retrieve(commands)
                env = runtimeContext.secret_store.retrieve(env)

            job_script_contents = None  # type: Optional[Text]
            builder = getattr(self, "builder", None)  # type: Builder
            if builder is not None:
                job_script_contents = builder.build_job_script(commands)
            rcode = _job_popen(
                commands,
                stdin_path=stdin_path,
                stdout_path=stdout_path,
                stderr_path=stderr_path,
                env=env,
                cwd=self.outdir,
                job_dir=tempfile.mkdtemp(prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)),
                job_script_contents=job_script_contents,
                timelimit=self.timelimit,
                name=self.name,
                monitor_function=monitor_function
            )

            if rcode in self.successCodes:
                processStatus = "success"
            elif rcode in self.temporaryFailCodes:
                processStatus = "temporaryFail"
            elif rcode in self.permanentFailCodes:
                processStatus = "permanentFail"
            elif rcode == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            if 'listing' in self.generatefiles:
                if self.generatemapper:
                    relink_initialworkdir(
                        self.generatemapper, self.outdir, self.builder.outdir,
                        inplace_update=self.inplace_update)
                else:
                    raise ValueError("'listing' in self.generatefiles but no "
                                     "generatemapper was set up.")

            outputs = self.collect_outputs(self.outdir, rcode)
            outputs = bytes2str_in_dicts(outputs)  # type: ignore
        except OSError as e:
            if e.errno == 2:
                if runtime:
                    _logger.error(u"'%s' not found: %s", runtime[0], Text(e))
                else:
                    _logger.error(u"'%s' not found: %s", self.command_line[0], Text(e))
            else:
                _logger.exception(u"Exception while running job")
            processStatus = "permanentFail"
        except WorkflowException as err:
            _logger.error(u"[job %s] Job error:\n%s", self.name, Text(err))
            processStatus = "permanentFail"
        except Exception as e:
            _logger.exception(u"Exception while running job")
            processStatus = "permanentFail"
        if runtimeContext.research_obj is not None \
                and self.prov_obj is not None \
                and runtimeContext.process_run_id is not None:
            # creating entities for the outputs produced by each step (in the provenance document)
            self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id,
                                             outputs, datetime.datetime.now())
        if processStatus != "success":
            _logger.warning(u"[job %s] completed %s", self.name, processStatus)
        else:
            _logger.info(u"[job %s] completed %s", self.name, processStatus)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"[job %s] %s", self.name,
                          json_dumps(outputs, indent=4))

        if self.generatemapper is not None and runtimeContext.secret_store is not None:
            # Delete any runtime-generated files containing secrets.
            for _, p in self.generatemapper.items():
                if p.type == "CreateFile":
                    if runtimeContext.secret_store.has_secret(p.resolved):
                        host_outdir = self.outdir
                        container_outdir = self.builder.outdir
                        host_outdir_tgt = p.target
                        if p.target.startswith(container_outdir + "/"):
                            host_outdir_tgt = os.path.join(
                                host_outdir, p.target[len(container_outdir) + 1:])
                        os.remove(host_outdir_tgt)

        if runtimeContext.workflow_eval_lock is None:
            raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")

        with runtimeContext.workflow_eval_lock:
            self.output_callback(outputs, processStatus)

        if self.stagedir is not None and os.path.exists(self.stagedir):
            _logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
            shutil.rmtree(self.stagedir, True)

        if runtimeContext.rm_tmpdir:
            _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir)
            shutil.rmtree(self.tmpdir, True)

    def process_monitor(self, sproc):  # type: (subprocess.Popen) -> None
        monitor = psutil.Process(sproc.pid)
        # Value must be a list rather than an integer to utilise
        # pass-by-reference in Python
        memory_usage = [None]

        def get_tree_mem_usage(memory_usage):  # type: (List[int]) -> None
            children = monitor.children()
            rss = monitor.memory_info().rss
            while len(children):
                rss += sum([process.memory_info().rss for process in children])
                children = list(itertools.chain(*[process.children() for process in children]))
            if memory_usage[0] is None or rss > memory_usage[0]:
                memory_usage[0] = rss

        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
        mem_tm.daemon = True
        mem_tm.start()
        sproc.wait()
        mem_tm.cancel()
        if memory_usage[0] is not None:
            _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
                         round(memory_usage[0] / (2 ** 20)))
        else:
            _logger.debug(u"Could not collect memory usage, job ended before monitoring began.")

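# [Editor's illustration, not part of the original file] The sampling in
# process_monitor() sums RSS over the whole process tree and keeps the peak.
# The same walk can be written with psutil's recursive children() call — a
# hypothetical standalone helper, equivalent to the loop above:
#
#   def tree_rss(proc):  # proc: psutil.Process
#       return proc.memory_info().rss + sum(
#           child.memory_info().rss
#           for child in proc.children(recursive=True))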

class CommandLineJob(JobBase):
    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None

        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        self._setup(runtimeContext)

        env = self.environment
        vars_to_preserve = runtimeContext.preserve_environment
        if runtimeContext.preserve_entire_environment is not False:
            vars_to_preserve = os.environ
        if vars_to_preserve:
            for key, value in os.environ.items():
                if key in vars_to_preserve and key not in env:
                    # On Windows, subprocess env can't handle unicode.
                    env[key] = str(value) if onWindows() else value
        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
            env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() \
                else os.environ["SYSTEMROOT"]

        stage_files(self.pathmapper, ignore_writable=True, symlink=True,
                    secret_store=runtimeContext.secret_store)
        if self.generatemapper is not None:
            stage_files(self.generatemapper, ignore_writable=self.inplace_update,
                        symlink=True, secret_store=runtimeContext.secret_store)
            relink_initialworkdir(
                self.generatemapper, self.outdir, self.builder.outdir,
                inplace_update=self.inplace_update)

        monitor_function = functools.partial(self.process_monitor)

        self._execute([], env, runtimeContext, monitor_function)


CONTROL_CODE_RE = r'\x1b\[[0-9;]*[a-zA-Z]'

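# [Editor's illustration, not part of the original file] CONTROL_CODE_RE
# strips ANSI terminal escape sequences so that `docker stats` samples parse
# as plain floats in docker_monitor() below:
#
#   >>> re.sub(CONTROL_CODE_RE, '', '\x1b[2J\x1b[H12.34%').replace('%', '')
#   '12.34'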

class ContainerCommandLineJob(with_metaclass(ABCMeta, JobBase)):
    """Commandline job using containers."""

    @abstractmethod
    def get_from_requirements(self,
                              r,                 # type: Dict[Text, Text]
                              pull_image,        # type: bool
                              force_pull=False,  # type: bool
                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
                             ):  # type: (...) -> Optional[Text]
        pass

    @abstractmethod
    def create_runtime(self,
                       env,             # type: MutableMapping[Text, Text]
                       runtime_context  # type: RuntimeContext
                      ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
        """Return the list of commands to run the selected container engine."""
        pass

    @staticmethod
    @abstractmethod
    def append_volume(runtime, source, target, writable=False):
        # type: (List[Text], Text, Text, bool) -> None
        """Add binding arguments to the runtime list."""
        pass

    @abstractmethod
    def add_file_or_directory_volume(self,
                                     runtime,         # type: List[Text]
                                     volume,          # type: MapperEnt
                                     host_outdir_tgt  # type: Optional[Text]
                                    ):  # type: (...) -> None
        """Append a file/directory volume mapping to the runtime option list."""
        pass

    @abstractmethod
    def add_writable_file_volume(self,
                                 runtime,          # type: List[Text]
                                 volume,           # type: MapperEnt
                                 host_outdir_tgt,  # type: Optional[Text]
                                 tmpdir_prefix     # type: Text
                                ):  # type: (...) -> None
        """Append a writable file mapping to the runtime option list."""
        pass

    @abstractmethod
    def add_writable_directory_volume(self,
                                      runtime,          # type: List[Text]
                                      volume,           # type: MapperEnt
                                      host_outdir_tgt,  # type: Optional[Text]
                                      tmpdir_prefix     # type: Text
                                     ):  # type: (...) -> None
        """Append a writable directory mapping to the runtime option list."""
        pass

    def create_file_and_add_volume(self,
                                   runtime,          # type: List[Text]
                                   volume,           # type: MapperEnt
                                   host_outdir_tgt,  # type: Optional[Text]
                                   secret_store,     # type: Optional[SecretStore]
                                   tmpdir_prefix     # type: Text
                                  ):  # type: (...) -> Text
        """Create the file and add a mapping."""
        if not host_outdir_tgt:
            tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
            new_file = os.path.join(
                tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
                os.path.basename(volume.target))
        writable = volume.type == "CreateWritableFile"
        if secret_store:
            contents = secret_store.retrieve(volume.resolved)
        else:
            contents = volume.resolved
        dirname = os.path.dirname(host_outdir_tgt or new_file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(host_outdir_tgt or new_file, "wb") as file_literal:
            file_literal.write(contents.encode("utf-8"))
        if not host_outdir_tgt:
            self.append_volume(runtime, new_file, volume.target,
                               writable=writable)
        if writable:
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
        return host_outdir_tgt or new_file

    def add_volumes(self,
                    pathmapper,          # type: PathMapper
                    runtime,             # type: List[Text]
                    tmpdir_prefix,       # type: Text
                    secret_store=None,   # type: Optional[SecretStore]
                    any_path_okay=False  # type: bool
                   ):  # type: (...) -> None
        """Append volume mappings to the runtime option list."""
        container_outdir = self.builder.outdir
        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[Text]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
                    self.outdir, vol.target[len(container_outdir) + 1:])
            if not host_outdir_tgt and not any_path_okay:
                raise WorkflowException(
                    "No mandatory DockerRequirement, yet path is outside "
                    "the designated output directory, also known as "
                    "$(runtime.outdir): {}".format(vol))
            if vol.type in ("File", "Directory"):
                self.add_file_or_directory_volume(
                    runtime, vol, host_outdir_tgt)
            elif vol.type == "WritableFile":
                self.add_writable_file_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
            elif vol.type == "WritableDirectory":
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix)
                pathmapper.update(
                    key, new_path, vol.target, vol.type, vol.staged)

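    # [Editor's illustration, not part of the original file] A concrete
    # append_volume() lives in the engine-specific subclasses; a minimal
    # Docker-flavoured version might look like the sketch below. The
    # "--volume=src:dst:mode" syntax is standard docker CLI, but this exact
    # implementation is an assumption, not taken from this file.
    #
    #   @staticmethod
    #   def append_volume(runtime, source, target, writable=False):
    #       # type: (List[Text], Text, Text, bool) -> None
    #       runtime.append(u"--volume={}:{}:{}".format(
    #           source, target, "rw" if writable else "ro"))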
    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
           ):  # type: (...) -> None
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        self.prov_obj = runtimeContext.prov_obj
        img_id = None
        env = cast(MutableMapping[Text, Text], os.environ)
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if docker_req is not None and user_space_docker_cmd:
            # For user-space docker implementations, a local image name or ID
            # takes precedence over a network pull
            if 'dockerImageId' in docker_req:
                img_id = str(docker_req["dockerImageId"])
            elif 'dockerPull' in docker_req:
                img_id = str(docker_req["dockerPull"])
                cmd = [user_space_docker_cmd, "pull", img_id]
                _logger.info(Text(cmd))
                try:
                    subprocess.check_call(cmd, stdout=sys.stderr)
                except OSError:
                    raise WorkflowException(SourceLine(docker_req).makeError(
                        "Either Docker container {} is not available with "
                        "user space docker implementation {} or {} is missing "
                        "or broken.".format(img_id, user_space_docker_cmd,
                                            user_space_docker_cmd)))
            else:
                raise WorkflowException(SourceLine(docker_req).makeError(
                    "Docker image must be specified as 'dockerImageId' or "
                    "'dockerPull' when using user space implementations of "
                    "Docker"))
        else:
            try:
                if docker_req is not None and runtimeContext.use_container:
                    img_id = str(
                        self.get_from_requirements(
                            docker_req, runtimeContext.pull_image,
                            getdefault(runtimeContext.force_docker_pull, False),
                            getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)))
                if img_id is None:
                    if self.builder.find_default_container:
                        default_container = self.builder.find_default_container()
                        if default_container:
                            img_id = str(default_container)

                if docker_req is not None and img_id is None and runtimeContext.use_container:
                    raise Exception("Docker image not available")

                if self.prov_obj is not None and img_id is not None \
                        and runtimeContext.process_run_id is not None:
                    container_agent = self.prov_obj.document.agent(
                        uuid.uuid4().urn,
                        {"prov:type": PROV["SoftwareAgent"],
                         "cwlprov:image": img_id,
                         "prov:label": "Container execution of image %s" % img_id})
                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
                    #     {"prov:label": "Container image %s" % img_id} )
                    # The image is the plan for this activity-agent association
                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
                    self.prov_obj.document.wasAssociatedWith(
                        runtimeContext.process_run_id, container_agent)
            except Exception as err:
                container = "Singularity" if runtimeContext.singularity else "Docker"
                _logger.debug(u"%s error", container, exc_info=True)
                if docker_is_req:
                    raise_from(UnsupportedRequirement(
                        "%s is required to run this tool: %s" % (container, Text(err))), err)
                else:
                    raise WorkflowException(
                        "{0} is not available for this tool, try "
                        "--no-container to disable {0}, or install "
                        "a user space Docker replacement like uDocker with "
                        "--user-space-docker-cmd: {1}".format(container, err))

        self._setup(runtimeContext)
        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
        runtime.append(Text(img_id))
        monitor_function = None
        if cidfile:
            monitor_function = functools.partial(
                self.docker_monitor, cidfile, runtimeContext.tmpdir_prefix,
                not bool(runtimeContext.cidfile_dir))
        elif runtimeContext.user_space_docker_cmd:
            monitor_function = functools.partial(self.process_monitor)
        self._execute(runtime, env, runtimeContext, monitor_function)

    def docker_monitor(self, cidfile, tmpdir_prefix, cleanup_cidfile, process):
        # type: (Text, Text, bool, subprocess.Popen) -> None
        """Record memory usage of the running Docker container."""
        # TODO: consider switching to `docker create` / `docker start`
        # instead of `docker run` as `docker create` outputs the container ID
        # to stdout, but the container is frozen, thus allowing us to start the
        # monitoring process without dealing with the cidfile or too-fast
        # container execution
        cid = None
        while cid is None:
            time.sleep(1)
            if process.returncode is not None:
                if cleanup_cidfile:
                    try:
                        os.remove(cidfile)
                    except OSError as exc:
                        _logger.warning("Ignored error cleaning up Docker cidfile: %s", exc)
                return
            try:
                with open(cidfile) as cidhandle:
                    cid = cidhandle.readline().strip()
            except (OSError, IOError):
                cid = None
        max_mem = psutil.virtual_memory().total
        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
        try:
            with open(stats_file.name, mode="w") as stats_file_handle:
                stats_proc = subprocess.Popen(
                    ['docker', 'stats', '--no-trunc', '--format', '{{.MemPerc}}',
                     cid], stdout=stats_file_handle, stderr=subprocess.DEVNULL)
                process.wait()
                stats_proc.kill()
        except OSError as exc:
            _logger.warning("Ignored error with docker stats: %s", exc)
            return
        max_mem_percent = 0
        with open(stats_file.name, mode="r") as stats:
            for line in stats:
                try:
                    mem_percent = float(re.sub(
                        CONTROL_CODE_RE, '', line).replace('%', ''))
                    if mem_percent > max_mem_percent:
                        max_mem_percent = mem_percent
                except ValueError:
                    break
        _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
                     int((max_mem_percent / 100 * max_mem) / (2 ** 20)))
        if cleanup_cidfile:
            os.remove(cidfile)


def _job_popen(commands,                  # type: List[Text]
               stdin_path,                # type: Optional[Text]
               stdout_path,               # type: Optional[Text]
               stderr_path,               # type: Optional[Text]
               env,                       # type: MutableMapping[AnyStr, AnyStr]
               cwd,                       # type: Text
               job_dir,                   # type: Text
               job_script_contents=None,  # type: Optional[Text]
               timelimit=None,            # type: Optional[int]
               name=None,                 # type: Optional[Text]
               monitor_function=None      # type: Optional[Callable[[subprocess.Popen], None]]
              ):  # type: (...) -> int

    if job_script_contents is None and not FORCE_SHELLED_POPEN:

        stdin = subprocess.PIPE  # type: Union[IO[Any], int]
        if stdin_path is not None:
            stdin = open(stdin_path, "rb")

        stdout = sys.stderr  # type: IO[Any]
        if stdout_path is not None:
            stdout = open(stdout_path, "wb")

        stderr = sys.stderr  # type: IO[Any]
        if stderr_path is not None:
            stderr = open(stderr_path, "wb")

        sproc = subprocess.Popen(commands,
                                 shell=False,
                                 close_fds=not onWindows(),
                                 stdin=stdin,
                                 stdout=stdout,
                                 stderr=stderr,
                                 env=env,
                                 cwd=cwd)
        processes_to_kill.append(sproc)

        if sproc.stdin is not None:
            sproc.stdin.close()

        tm = None
        if timelimit is not None and timelimit > 0:
            def terminate():  # type: () -> None
                try:
                    _logger.warning(
                        u"[job %s] exceeded time limit of %d seconds and will "
                        "be terminated", name, timelimit)
                    sproc.terminate()
                except OSError:
                    pass

            tm = Timer(timelimit, terminate)
            tm.daemon = True
            tm.start()
        if monitor_function:
            monitor_function(sproc)
        rcode = sproc.wait()

        if tm is not None:
            tm.cancel()

        if isinstance(stdin, IOBase):
            stdin.close()

        if stdout is not sys.stderr:
            stdout.close()

        if stderr is not sys.stderr:
            stderr.close()

        return rcode
    else:
        if job_script_contents is None:
            job_script_contents = SHELL_COMMAND_TEMPLATE

        env_copy = {}
        key = None  # type: Any
        for key in env:
            env_copy[key] = env[key]

        job_description = {
            u"commands": commands,
            u"cwd": cwd,
            u"env": env_copy,
            u"stdout_path": stdout_path,
            u"stderr_path": stderr_path,
            u"stdin_path": stdin_path}

        if PY2:
            with open(os.path.join(job_dir, "job.json"), mode="wb") as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
        else:
            with open(os.path.join(job_dir, "job.json"), mode="w",
                      encoding='utf-8') as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
        try:
            job_script = os.path.join(job_dir, "run_job.bash")
            with open(job_script, "wb") as _:
                _.write(job_script_contents.encode('utf-8'))
            job_run = os.path.join(job_dir, "run_job.py")
            with open(job_run, "wb") as _:
                _.write(PYTHON_RUN_SCRIPT.encode('utf-8'))
            sproc = subprocess.Popen(
                ["bash", job_script.encode("utf-8")],
                shell=False,
                cwd=job_dir,
                # The nested script will output the paths to the correct files if they need
                # to be captured. Else just write everything to stderr (same as above).
                stdout=sys.stderr,
                stderr=sys.stderr,
                stdin=subprocess.PIPE,
            )
            processes_to_kill.append(sproc)
            if sproc.stdin is not None:
                sproc.stdin.close()

            rcode = sproc.wait()

            return rcode
        finally:
            shutil.rmtree(job_dir)
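
A note on the two execution paths in _job_popen() above: the direct branch runs the command with subprocess.Popen, while the shelled branch serializes a job.json plus run_job.bash/run_job.py into job_dir and runs those instead. The shelled path is taken when a builder supplies job_script_contents or when CWLTOOL_FORCE_SHELL_POPEN=1 is set in the environment. A minimal sketch of forcing it from Python (the flag is read once, at module import time, into FORCE_SHELLED_POPEN):

    import os
    os.environ["CWLTOOL_FORCE_SHELL_POPEN"] = "1"  # must be set before cwltool.job is imported
    import cwltool.job  # cwltool.job.FORCE_SHELLED_POPEN is now True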
