Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/job.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 import datetime | |
| 2 import functools | |
| 3 import itertools | |
| 4 import logging | |
| 5 import os | |
| 6 import re | |
| 7 import shutil | |
| 8 import subprocess # nosec | |
| 9 import sys | |
| 10 import tempfile | |
| 11 import threading | |
| 12 import time | |
| 13 import uuid | |
| 14 from abc import ABCMeta, abstractmethod | |
| 15 from io import IOBase | |
| 16 from threading import Timer | |
| 17 from typing import ( | |
| 18 IO, | |
| 19 Callable, | |
| 20 Iterable, | |
| 21 List, | |
| 22 Match, | |
| 23 MutableMapping, | |
| 24 MutableSequence, | |
| 25 Optional, | |
| 26 TextIO, | |
| 27 Tuple, | |
| 28 Union, | |
| 29 cast, | |
| 30 ) | |
| 31 | |
| 32 import psutil | |
| 33 import shellescape | |
| 34 from prov.model import PROV | |
| 35 from schema_salad.sourceline import SourceLine | |
| 36 from schema_salad.utils import json_dump, json_dumps | |
| 37 from typing_extensions import TYPE_CHECKING | |
| 38 | |
| 39 from .builder import Builder, HasReqsHints | |
| 40 from .context import RuntimeContext | |
| 41 from .errors import UnsupportedRequirement, WorkflowException | |
| 42 from .loghandler import _logger | |
| 43 from .pathmapper import MapperEnt, PathMapper | |
| 44 from .process import stage_files | |
| 45 from .secrets import SecretStore | |
| 46 from .utils import ( | |
| 47 CWLObjectType, | |
| 48 CWLOutputType, | |
| 49 DirectoryType, | |
| 50 OutputCallbackType, | |
| 51 bytes2str_in_dicts, | |
| 52 copytree_with_merge, | |
| 53 create_tmp_dir, | |
| 54 ensure_non_writable, | |
| 55 ensure_writable, | |
| 56 onWindows, | |
| 57 processes_to_kill, | |
| 58 ) | |
| 59 | |
| 60 if TYPE_CHECKING: | |
| 61 from .provenance_profile import ProvenanceProfile # pylint: disable=unused-import | |
| 62 needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""") | |
| 63 | |
| 64 FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1" | |
| 65 | |
| 66 SHELL_COMMAND_TEMPLATE = """#!/bin/bash | |
| 67 python "run_job.py" "job.json" | |
| 68 """ | |
| 69 | |
| 70 PYTHON_RUN_SCRIPT = """ | |
| 71 import json | |
| 72 import os | |
| 73 import sys | |
| 74 if os.name == 'posix': | |
| 75 try: | |
| 76 import subprocess32 as subprocess # type: ignore | |
| 77 except Exception: | |
| 78 import subprocess | |
| 79 else: | |
| 80 import subprocess # type: ignore | |
| 81 | |
| 82 with open(sys.argv[1], "r") as f: | |
| 83 popen_description = json.load(f) | |
| 84 commands = popen_description["commands"] | |
| 85 cwd = popen_description["cwd"] | |
| 86 env = popen_description["env"] | |
| 87 env["PATH"] = os.environ.get("PATH") | |
| 88 stdin_path = popen_description["stdin_path"] | |
| 89 stdout_path = popen_description["stdout_path"] | |
| 90 stderr_path = popen_description["stderr_path"] | |
| 91 if stdin_path is not None: | |
| 92 stdin = open(stdin_path, "rb") | |
| 93 else: | |
| 94 stdin = subprocess.PIPE | |
| 95 if stdout_path is not None: | |
| 96 stdout = open(stdout_path, "wb") | |
| 97 else: | |
| 98 stdout = sys.stderr | |
| 99 if stderr_path is not None: | |
| 100 stderr = open(stderr_path, "wb") | |
| 101 else: | |
| 102 stderr = sys.stderr | |
| 103 if os.name == 'nt': | |
| 104 close_fds = False | |
| 105 for key, value in env.items(): | |
| 106 env[key] = str(value) | |
| 107 else: | |
| 108 close_fds = True | |
| 109 sp = subprocess.Popen(commands, | |
| 110 shell=False, | |
| 111 close_fds=close_fds, | |
| 112 stdin=stdin, | |
| 113 stdout=stdout, | |
| 114 stderr=stderr, | |
| 115 env=env, | |
| 116 cwd=cwd) | |
| 117 if sp.stdin: | |
| 118 sp.stdin.close() | |
| 119 rcode = sp.wait() | |
| 120 if stdin is not subprocess.PIPE: | |
| 121 stdin.close() | |
| 122 if stdout is not sys.stderr: | |
| 123 stdout.close() | |
| 124 if stderr is not sys.stderr: | |
| 125 stderr.close() | |
| 126 sys.exit(rcode) | |
| 127 """ | |
| 128 | |
| 129 | |
def relink_initialworkdir(
    pathmapper: PathMapper,
    host_outdir: str,
    container_outdir: str,
    inplace_update: bool = False,
) -> None:
    """Replace staged InitialWorkDir copies with links back to the originals.

    For each staged File/Directory whose target lies inside the container
    output directory, remove the staged copy under ``host_outdir`` and symlink
    (or, on Windows, copy) the resolved source path into its place.  Writable
    entries are only relinked when ``inplace_update`` is enabled.
    """
    for _, vol in pathmapper.items():
        if not vol.staged:
            continue

        if vol.type in ("File", "Directory") or (
            inplace_update and vol.type in ("WritableFile", "WritableDirectory")
        ):
            if not vol.target.startswith(container_outdir):
                # this is an input file written outside of the working
                # directory, so therefore ineligible for being an output file.
                # Thus, none of our business
                continue
            # Map the container-side target path back into the host outdir.
            host_outdir_tgt = os.path.join(
                host_outdir, vol.target[len(container_outdir) + 1 :]
            )
            # Clear whatever is currently staged at the target location.
            if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
                try:
                    os.remove(host_outdir_tgt)
                except PermissionError:
                    pass
            elif os.path.isdir(host_outdir_tgt) and not vol.resolved.startswith("_:"):
                shutil.rmtree(host_outdir_tgt)
            if onWindows():
                # If this becomes a big issue for someone then we could
                # refactor the code to process output from a running container
                # and avoid all the extra IO below
                if vol.type in ("File", "WritableFile"):
                    shutil.copy(vol.resolved, host_outdir_tgt)
                elif vol.type in ("Directory", "WritableDirectory"):
                    copytree_with_merge(vol.resolved, host_outdir_tgt)
            elif not vol.resolved.startswith("_:"):
                # A "_:" prefix marks a literal with no backing path — skip.
                try:
                    os.symlink(vol.resolved, host_outdir_tgt)
                except FileExistsError:
                    pass
| 171 | |
| 172 | |
def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str]]:
    """Drop-in replacement for ``re.Pattern.search`` that never matches.

    Used to disable shell quoting when a ShellCommandRequirement is present:
    returning ``None`` signals "no quoting needed" for every argument.
    """
    no_match: Optional[Match[str]] = None
    return no_match
| 175 | |
| 176 | |
| 177 CollectOutputsType = Union[Callable[[str, int], CWLObjectType], functools.partial] | |
| 178 | |
| 179 | |
class JobBase(HasReqsHints, metaclass=ABCMeta):
    """Base class for a runnable command-line job (local or containerized)."""

    def __init__(
        self,
        builder: Builder,
        joborder: CWLObjectType,
        make_path_mapper: Callable[..., PathMapper],
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        name: str,
    ) -> None:
        """Initialize the job object."""
        super().__init__()
        self.builder = builder
        self.joborder = joborder
        # Standard-stream redirection paths (set later by the tool object).
        self.stdin = None  # type: Optional[str]
        self.stderr = None  # type: Optional[str]
        self.stdout = None  # type: Optional[str]
        # Exit codes mapped to CWL process statuses in _execute().
        self.successCodes = []  # type: Iterable[int]
        self.temporaryFailCodes = []  # type: Iterable[int]
        self.permanentFailCodes = []  # type: Iterable[int]
        self.requirements = requirements
        self.hints = hints
        self.name = name
        self.command_line = []  # type: List[str]
        self.pathmapper = PathMapper([], "", "")
        self.make_path_mapper = make_path_mapper
        # PathMapper for InitialWorkDirRequirement entries; built in _setup().
        self.generatemapper = None  # type: Optional[PathMapper]

        # set in CommandLineTool.job(i)
        self.collect_outputs = cast(CollectOutputsType, None)
        self.output_callback = None  # type: Optional[OutputCallbackType]
        self.outdir = ""
        self.tmpdir = ""

        self.environment = {}  # type: MutableMapping[str, str]
        # InitialWorkDir listing; "listing" key presence gates mapper setup.
        self.generatefiles = {
            "class": "Directory",
            "listing": [],
            "basename": "",
        }  # type: DirectoryType
        self.stagedir = None  # type: Optional[str]
        self.inplace_update = False
        # Provenance tracking objects (only used when a research object is
        # being generated).
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool
        # Number of MPI processes; truthy value triggers MPI launcher prefix.
        self.mpi_procs = None  # type: Optional[int]

    def __repr__(self):  # type: () -> str
        """Represent this Job object."""
        return "CommandLineJob(%s)" % self.name

    @abstractmethod
    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        """Execute this job; implemented by subclasses."""
        pass

    def _setup(self, runtimeContext: RuntimeContext) -> None:
        """Create the output dir, verify staged inputs, and build the
        InitialWorkDir path mapper.

        :raises WorkflowException: if a staged input File is missing or not a
            regular file.
        """
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        for knownfile in self.pathmapper.files():
            p = self.pathmapper.mapper(knownfile)
            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
                raise WorkflowException(
                    "Input file %s (at %s) not found or is not a regular "
                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
                )

        if "listing" in self.generatefiles:
            # Work on a copy so the caller's context keeps its own outdir.
            runtimeContext = runtimeContext.copy()
            runtimeContext.outdir = self.outdir
            self.generatemapper = self.make_path_mapper(
                self.generatefiles["listing"],
                self.builder.outdir,
                runtimeContext,
                False,
            )
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    "[job %s] initial work dir %s",
                    self.name,
                    json_dumps(
                        {
                            p: self.generatemapper.mapper(p)
                            for p in self.generatemapper.files()
                        },
                        indent=4,
                    ),
                )

    def _execute(
        self,
        runtime: List[str],
        env: MutableMapping[str, str],
        runtimeContext: RuntimeContext,
        monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
    ) -> None:
        """Run the command line, classify the exit code, collect outputs,
        record provenance, and clean up staging/tmp directories.

        ``runtime`` is the (possibly empty) container/MPI launcher prefix that
        is prepended to ``self.command_line``.
        """

        scr = self.get_requirement("ShellCommandRequirement")[0]

        # With ShellCommandRequirement, arguments are passed to the shell
        # verbatim, so never quote them in the log line.
        shouldquote = needs_shell_quoting_re.search
        if scr is not None:
            shouldquote = neverquote

        # If mpi_procs (is not None and > 0) then prepend the
        # appropriate MPI job launch command and flags before the
        # execution.
        if self.mpi_procs:
            menv = runtimeContext.mpi_config
            mpi_runtime = [
                menv.runner,
                menv.nproc_flag,
                str(self.mpi_procs),
            ] + menv.extra_flags
            runtime = mpi_runtime + runtime
            menv.pass_through_env_vars(env)
            menv.set_env_vars(env)

        _logger.info(
            "[job %s] %s$ %s%s%s%s",
            self.name,
            self.outdir,
            " \\\n    ".join(
                [
                    shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg)
                    for arg in (runtime + self.command_line)
                ]
            ),
            " < %s" % self.stdin if self.stdin else "",
            " > %s" % os.path.join(self.outdir, self.stdout) if self.stdout else "",
            " 2> %s" % os.path.join(self.outdir, self.stderr) if self.stderr else "",
        )
        if self.joborder is not None and runtimeContext.research_obj is not None:
            job_order = self.joborder
            if (
                runtimeContext.process_run_id is not None
                and runtimeContext.prov_obj is not None
                and isinstance(job_order, (list, dict))
            ):
                # Record the input artifacts in the provenance document.
                runtimeContext.prov_obj.used_artefacts(
                    job_order, runtimeContext.process_run_id, str(self.name)
                )
            else:
                _logger.warning(
                    "research_obj set but one of process_run_id "
                    "or prov_obj is missing from runtimeContext: "
                    "{}".format(runtimeContext)
                )
        outputs = {}  # type: CWLObjectType
        try:
            # Resolve host-side paths for stdin/stdout/stderr redirection,
            # creating parent directories for the output streams as needed.
            stdin_path = None
            if self.stdin is not None:
                rmap = self.pathmapper.reversemap(self.stdin)
                if rmap is None:
                    raise WorkflowException(f"{self.stdin} missing from pathmapper")
                else:
                    stdin_path = rmap[1]

            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
                dnerr = os.path.dirname(abserr)
                if dnerr and not os.path.exists(dnerr):
                    os.makedirs(dnerr)
                stderr_path = abserr

            stdout_path = None
            if self.stdout is not None:
                absout = os.path.join(self.outdir, self.stdout)
                dnout = os.path.dirname(absout)
                if dnout and not os.path.exists(dnout):
                    os.makedirs(dnout)
                stdout_path = absout

            commands = [str(x) for x in runtime + self.command_line]
            if runtimeContext.secret_store is not None:
                # Substitute secret placeholders with their real values just
                # before execution.
                commands = cast(
                    List[str],
                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, commands)),
                )
                env = cast(
                    MutableMapping[str, str],
                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, env)),
                )

            job_script_contents = None  # type: Optional[str]
            builder = getattr(self, "builder", None)  # type: Optional[Builder]
            if builder is not None:
                job_script_contents = builder.build_job_script(commands)
            rcode = _job_popen(
                commands,
                stdin_path=stdin_path,
                stdout_path=stdout_path,
                stderr_path=stderr_path,
                env=env,
                cwd=self.outdir,
                make_job_dir=lambda: runtimeContext.create_outdir(),
                job_script_contents=job_script_contents,
                timelimit=self.timelimit,
                name=self.name,
                monitor_function=monitor_function,
                default_stdout=runtimeContext.default_stdout,
                default_stderr=runtimeContext.default_stderr,
            )

            # Map the exit code onto a CWL process status; explicit code
            # lists take precedence over the default 0 == success rule.
            if rcode in self.successCodes:
                processStatus = "success"
            elif rcode in self.temporaryFailCodes:
                processStatus = "temporaryFail"
            elif rcode in self.permanentFailCodes:
                processStatus = "permanentFail"
            elif rcode == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            if "listing" in self.generatefiles:
                if self.generatemapper:
                    relink_initialworkdir(
                        self.generatemapper,
                        self.outdir,
                        self.builder.outdir,
                        inplace_update=self.inplace_update,
                    )
                else:
                    raise ValueError(
                        "'listing' in self.generatefiles but no "
                        "generatemapper was setup."
                    )

            outputs = self.collect_outputs(self.outdir, rcode)
            outputs = bytes2str_in_dicts(outputs)  # type: ignore
        except OSError as e:
            # errno 2 (ENOENT): the executable itself was not found.
            if e.errno == 2:
                if runtime:
                    _logger.error("'%s' not found: %s", runtime[0], str(e))
                else:
                    _logger.error("'%s' not found: %s", self.command_line[0], str(e))
            else:
                _logger.exception("Exception while running job")
            processStatus = "permanentFail"
        except WorkflowException as err:
            _logger.error("[job %s] Job error:\n%s", self.name, str(err))
            processStatus = "permanentFail"
        except Exception:
            _logger.exception("Exception while running job")
            processStatus = "permanentFail"
        if (
            runtimeContext.research_obj is not None
            and self.prov_obj is not None
            and runtimeContext.process_run_id is not None
        ):
            # creating entities for the outputs produced by each step (in the provenance document)
            self.prov_obj.record_process_end(
                str(self.name),
                runtimeContext.process_run_id,
                outputs,
                datetime.datetime.now(),
            )
        if processStatus != "success":
            _logger.warning("[job %s] completed %s", self.name, processStatus)
        else:
            _logger.info("[job %s] completed %s", self.name, processStatus)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)
            )

        if self.generatemapper is not None and runtimeContext.secret_store is not None:
            # Delete any runtime-generated files containing secrets.
            for _, p in self.generatemapper.items():
                if p.type == "CreateFile":
                    if runtimeContext.secret_store.has_secret(p.resolved):
                        host_outdir = self.outdir
                        container_outdir = self.builder.outdir
                        host_outdir_tgt = p.target
                        if p.target.startswith(container_outdir + "/"):
                            host_outdir_tgt = os.path.join(
                                host_outdir, p.target[len(container_outdir) + 1 :]
                            )
                        os.remove(host_outdir_tgt)

        if runtimeContext.workflow_eval_lock is None:
            raise WorkflowException(
                "runtimeContext.workflow_eval_lock must not be None"
            )

        if self.output_callback:
            with runtimeContext.workflow_eval_lock:
                self.output_callback(outputs, processStatus)

        if self.stagedir is not None and os.path.exists(self.stagedir):
            _logger.debug(
                "[job %s] Removing input staging directory %s",
                self.name,
                self.stagedir,
            )
            shutil.rmtree(self.stagedir, True)

        if runtimeContext.rm_tmpdir:
            _logger.debug(
                "[job %s] Removing temporary directory %s", self.name, self.tmpdir
            )
            shutil.rmtree(self.tmpdir, True)

    def process_monitor(self, sproc):  # type: (subprocess.Popen[str]) -> None
        """Wait for ``sproc`` to finish while recording its peak tree-wide
        RSS memory usage, then log the maximum observed."""
        monitor = psutil.Process(sproc.pid)
        # Value must be list rather than integer to utilise pass-by-reference in python
        memory_usage = [None]  # type: MutableSequence[Optional[int]]

        def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
            # Sum the RSS of the process and all transitive children.
            children = monitor.children()
            rss = monitor.memory_info().rss
            while len(children):
                rss += sum([process.memory_info().rss for process in children])
                children = list(
                    itertools.chain(*[process.children() for process in children])
                )
            if memory_usage[0] is None or rss > memory_usage[0]:
                memory_usage[0] = rss

        # NOTE(review): threading.Timer fires its function only once, ~1s in,
        # so memory is sampled a single time rather than periodically —
        # confirm this is the intended sampling behavior.
        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
        mem_tm.daemon = True
        mem_tm.start()
        sproc.wait()
        mem_tm.cancel()
        if memory_usage[0] is not None:
            _logger.info(
                "[job %s] Max memory used: %iMiB",
                self.name,
                round(memory_usage[0] / (2 ** 20)),
            )
        else:
            _logger.debug(
                "Could not collect memory usage, job ended before monitoring began."
            )
| 521 | |
| 522 | |
class CommandLineJob(JobBase):
    """Job executed directly on the host, without a container."""

    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        """Prepare the environment, stage files, and execute the command."""

        # Create the tmp directory, serialized by the optional lock so two
        # jobs do not race on makedirs.
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        self._setup(runtimeContext)

        # NOTE: ``env`` aliases ``self.environment``; the updates below
        # mutate that mapping in place.
        env = self.environment
        vars_to_preserve = runtimeContext.preserve_environment
        # NOTE(review): ``is not False`` also matches None, so an unset value
        # preserves the entire environment — confirm this is intended.
        if runtimeContext.preserve_entire_environment is not False:
            vars_to_preserve = os.environ
        if vars_to_preserve:
            for key, value in os.environ.items():
                if key in vars_to_preserve and key not in env:
                    # On Windows, subprocess env can't handle unicode.
                    env[key] = str(value) if onWindows() else value
        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
            env["SYSTEMROOT"] = (
                str(os.environ["SYSTEMROOT"])
                if onWindows()
                else os.environ["SYSTEMROOT"]
            )

        # Symlink input files into place, then materialize InitialWorkDir
        # entries and relink them into the host output directory.
        stage_files(
            self.pathmapper,
            ignore_writable=True,
            symlink=True,
            secret_store=runtimeContext.secret_store,
        )
        if self.generatemapper is not None:
            stage_files(
                self.generatemapper,
                ignore_writable=self.inplace_update,
                symlink=True,
                secret_store=runtimeContext.secret_store,
            )
            relink_initialworkdir(
                self.generatemapper,
                self.outdir,
                self.builder.outdir,
                inplace_update=self.inplace_update,
            )

        monitor_function = functools.partial(self.process_monitor)

        self._execute([], env, runtimeContext, monitor_function)
| 583 | |
| 584 | |
| 585 CONTROL_CODE_RE = r"\x1b\[[0-9;]*[a-zA-Z]" | |
| 586 | |
| 587 | |
class ContainerCommandLineJob(JobBase, metaclass=ABCMeta):
    """Commandline job using containers."""

    @abstractmethod
    def get_from_requirements(
        self,
        r: CWLObjectType,
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> Optional[str]:
        """Return the image identifier for requirement ``r``, pulling it if needed."""
        pass

    @abstractmethod
    def create_runtime(
        self,
        env: MutableMapping[str, str],
        runtime_context: RuntimeContext,
    ) -> Tuple[List[str], Optional[str]]:
        """Return the list of commands to run the selected container engine."""

    @staticmethod
    @abstractmethod
    def append_volume(
        runtime: List[str], source: str, target: str, writable: bool = False
    ) -> None:
        """Add binding arguments to the runtime list."""

    @abstractmethod
    def add_file_or_directory_volume(
        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
    ) -> None:
        """Append a file/directory volume mapping to the runtime option list."""

    @abstractmethod
    def add_writable_file_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable file mapping to the runtime option list."""

    @abstractmethod
    def add_writable_directory_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable directory mapping to the runtime option list."""

    def create_file_and_add_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        secret_store: Optional[SecretStore],
        tmpdir_prefix: str,
    ) -> str:
        """Create a file literal on the host and add a mapping for it.

        Returns the host path of the created file.
        """
        if not host_outdir_tgt:
            # Target lies outside the output dir: stage the literal in a
            # fresh temporary directory instead.
            new_file = os.path.join(
                create_tmp_dir(tmpdir_prefix),
                os.path.basename(volume.target),
            )
        writable = volume.type == "CreateWritableFile"
        contents = volume.resolved
        if secret_store:
            # Substitute any secret placeholders with their real values.
            contents = cast(str, secret_store.retrieve(volume.resolved))
        dirname = os.path.dirname(host_outdir_tgt or new_file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(host_outdir_tgt or new_file, "wb") as file_literal:
            file_literal.write(contents.encode("utf-8"))
        if not host_outdir_tgt:
            self.append_volume(runtime, new_file, volume.target, writable=writable)
        if writable:
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
        return host_outdir_tgt or new_file

    def add_volumes(
        self,
        pathmapper: PathMapper,
        runtime: List[str],
        tmpdir_prefix: str,
        secret_store: Optional[SecretStore] = None,
        any_path_okay: bool = False,
    ) -> None:
        """Append volume mappings to the runtime option list.

        :raises WorkflowException: if a staged path falls outside the
            designated output directory and ``any_path_okay`` is False.
        """
        container_outdir = self.builder.outdir
        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[str]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
                    self.outdir, vol.target[len(container_outdir) + 1 :]
                )
            if not host_outdir_tgt and not any_path_okay:
                raise WorkflowException(
                    "No mandatory DockerRequirement, yet path is outside "
                    "the designated output directory, also known as "
                    "$(runtime.outdir): {}".format(vol)
                )
            # Dispatch on the entry type to the matching volume helper.
            if vol.type in ("File", "Directory"):
                self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
            elif vol.type == "WritableFile":
                self.add_writable_file_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix
                )
            elif vol.type == "WritableDirectory":
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix
                )
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix
                )
                # Record where the literal was actually written.
                pathmapper.update(key, new_path, vol.target, vol.type, vol.staged)

    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        """Resolve the container image, build the engine command line, and execute.

        :raises UnsupportedRequirement: if Docker/Singularity is required but
            unavailable.
        :raises WorkflowException: for image-resolution failures when a
            container is merely hinted.
        """
        # Create the tmp directory, serialized by the optional lock.
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        self.prov_obj = runtimeContext.prov_obj
        img_id = None
        env = cast(MutableMapping[str, str], os.environ)
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if docker_req is not None and user_space_docker_cmd:
            # For user-space docker implementations, a local image name or ID
            # takes precedence over a network pull
            if "dockerImageId" in docker_req:
                img_id = str(docker_req["dockerImageId"])
            elif "dockerPull" in docker_req:
                img_id = str(docker_req["dockerPull"])
                cmd = [user_space_docker_cmd, "pull", img_id]
                _logger.info(str(cmd))
                try:
                    subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                except OSError:
                    raise WorkflowException(
                        SourceLine(docker_req).makeError(
                            "Either Docker container {} is not available with "
                            "user space docker implementation {} or {} is missing "
                            "or broken.".format(
                                img_id, user_space_docker_cmd, user_space_docker_cmd
                            )
                        )
                    )
            else:
                raise WorkflowException(
                    SourceLine(docker_req).makeError(
                        "Docker image must be specified as 'dockerImageId' or "
                        "'dockerPull' when using user space implementations of "
                        "Docker"
                    )
                )
        else:
            try:
                if docker_req is not None and runtimeContext.use_container:
                    img_id = str(
                        self.get_from_requirements(
                            docker_req,
                            runtimeContext.pull_image,
                            runtimeContext.force_docker_pull,
                            runtimeContext.tmp_outdir_prefix,
                        )
                    )
                if img_id is None:
                    # Fall back to the tool's default container, if any.
                    if self.builder.find_default_container:
                        default_container = self.builder.find_default_container()
                        if default_container:
                            img_id = str(default_container)

                if (
                    docker_req is not None
                    and img_id is None
                    and runtimeContext.use_container
                ):
                    raise Exception("Docker image not available")

                if (
                    self.prov_obj is not None
                    and img_id is not None
                    and runtimeContext.process_run_id is not None
                ):
                    # Record the container as a software agent in the
                    # provenance document.
                    container_agent = self.prov_obj.document.agent(
                        uuid.uuid4().urn,
                        {
                            "prov:type": PROV["SoftwareAgent"],
                            "cwlprov:image": img_id,
                            "prov:label": "Container execution of image %s" % img_id,
                        },
                    )
                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
                    #                  {"prov:label": "Container image %s" % img_id} )
                    # The image is the plan for this activity-agent association
                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
                    self.prov_obj.document.wasAssociatedWith(
                        runtimeContext.process_run_id, container_agent
                    )
            except Exception as err:
                container = "Singularity" if runtimeContext.singularity else "Docker"
                _logger.debug("%s error", container, exc_info=True)
                if docker_is_req:
                    raise UnsupportedRequirement(
                        "{} is required to run this tool: {}".format(
                            container, str(err)
                        )
                    ) from err
                else:
                    raise WorkflowException(
                        "{0} is not available for this tool, try "
                        "--no-container to disable {0}, or install "
                        "a user space Docker replacement like uDocker with "
                        "--user-space-docker-cmd.: {1}".format(container, err)
                    )

        self._setup(runtimeContext)
        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
        runtime.append(str(img_id))
        monitor_function = None
        if cidfile:
            monitor_function = functools.partial(
                self.docker_monitor,
                cidfile,
                runtimeContext.tmpdir_prefix,
                not bool(runtimeContext.cidfile_dir),
            )
        elif runtimeContext.user_space_docker_cmd:
            monitor_function = functools.partial(self.process_monitor)
        self._execute(runtime, env, runtimeContext, monitor_function)

    def docker_monitor(
        self,
        cidfile: str,
        tmpdir_prefix: str,
        cleanup_cidfile: bool,
        process,  # type: subprocess.Popen[str]
    ) -> None:
        """Record memory usage of the running Docker container."""
        # Todo: consider switching to `docker create` / `docker start`
        # instead of `docker run` as `docker create` outputs the container ID
        # to stdout, but the container is frozen, thus allowing us to start the
        # monitoring process without dealing with the cidfile or too-fast
        # container execution
        cid = None  # type: Optional[str]
        while cid is None:
            # Poll until the container ID file appears or the process exits.
            time.sleep(1)
            if process.returncode is not None:
                if cleanup_cidfile:
                    try:
                        os.remove(cidfile)
                    except OSError as exc:
                        # Logger.warn is deprecated; use warning().
                        _logger.warning(
                            "Ignored error cleaning up Docker cidfile: %s", exc
                        )
                return
            try:
                with open(cidfile) as cidhandle:
                    cid = cidhandle.readline().strip()
            except OSError:
                cid = None
        max_mem = psutil.virtual_memory().total
        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
        stats_file_name = stats_file.name
        try:
            with open(stats_file_name, mode="w") as stats_file_handle:
                # Stream per-second memory percentages from `docker stats`.
                stats_proc = subprocess.Popen(  # nosec
                    ["docker", "stats", "--no-trunc", "--format", "{{.MemPerc}}", cid],
                    stdout=stats_file_handle,
                    stderr=subprocess.DEVNULL,
                )
                process.wait()
                stats_proc.kill()
        except OSError as exc:
            _logger.warning("Ignored error with docker stats: %s", exc)
            return
        max_mem_percent = 0  # type: float
        mem_percent = 0  # type: float
        with open(stats_file_name, mode="r") as stats:
            while True:
                line = stats.readline()
                if not line:
                    break
                try:
                    # Strip ANSI control codes and the trailing "%" sign.
                    mem_percent = float(
                        re.sub(CONTROL_CODE_RE, "", line).replace("%", "")
                    )
                    if mem_percent > max_mem_percent:
                        max_mem_percent = mem_percent
                except ValueError:
                    break
        _logger.info(
            "[job %s] Max memory used: %iMiB",
            self.name,
            int((max_mem_percent / 100 * max_mem) / (2 ** 20)),
        )
        if cleanup_cidfile:
            os.remove(cidfile)
| 903 | |
| 904 | |
def _job_popen(
    commands: List[str],
    stdin_path: Optional[str],
    stdout_path: Optional[str],
    stderr_path: Optional[str],
    env: MutableMapping[str, str],
    cwd: str,
    make_job_dir: Callable[[], str],
    job_script_contents: Optional[str] = None,
    timelimit: Optional[int] = None,
    name: Optional[str] = None,
    monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
    default_stdout=None,  # type: Optional[Union[IO[bytes], TextIO]]
    default_stderr=None,  # type: Optional[Union[IO[bytes], TextIO]]
) -> int:
    """Run a command line and return its exit code.

    Two strategies:

    * direct: spawn ``commands`` with :class:`subprocess.Popen`, redirecting
      stdio to the given paths (or ``default_stdout``/``default_stderr``,
      falling back to ``sys.stderr``);
    * shelled: serialize the job to ``job.json`` in a fresh scratch directory
      and run it via a generated ``run_job.bash`` script.  Used when
      ``job_script_contents`` is provided or ``FORCE_SHELLED_POPEN`` is set.

    :param commands: argv of the child process.
    :param stdin_path: file fed to stdin, or None to close stdin immediately.
    :param stdout_path: file capturing stdout, or None for the default stream.
    :param stderr_path: file capturing stderr, or None for the default stream.
    :param env: environment for the child process.
    :param cwd: working directory for the child process.
    :param make_job_dir: factory for the scratch directory (shelled path only).
    :param job_script_contents: shell script template; forces the shelled path.
    :param timelimit: if > 0, terminate the child after this many seconds.
    :param name: job name, used only in log messages.
    :param monitor_function: called with the Popen object once it is running.
    :return: the child's exit code.
    """
    if job_script_contents is None and not FORCE_SHELLED_POPEN:

        stdin = subprocess.PIPE  # type: Union[IO[bytes], int]
        if stdin_path is not None:
            stdin = open(stdin_path, "rb")

        stdout = (
            default_stdout if default_stdout is not None else sys.stderr
        )  # type: Union[IO[bytes], TextIO]
        if stdout_path is not None:
            stdout = open(stdout_path, "wb")

        stderr = (
            default_stderr if default_stderr is not None else sys.stderr
        )  # type: Union[IO[bytes], TextIO]
        if stderr_path is not None:
            stderr = open(stderr_path, "wb")

        sproc = subprocess.Popen(
            commands,
            shell=False,  # nosec
            close_fds=not onWindows(),
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            env=env,
            cwd=cwd,
            universal_newlines=True,
        )
        processes_to_kill.append(sproc)

        if sproc.stdin is not None:
            # Nothing to feed; close so the child sees EOF right away.
            sproc.stdin.close()

        tm = None
        if timelimit is not None and timelimit > 0:

            def terminate():  # type: () -> None
                """Log and terminate the child when the time limit expires."""
                try:
                    _logger.warning(
                        "[job %s] exceeded time limit of %d seconds and will be terminated",
                        name,
                        timelimit,
                    )
                    sproc.terminate()
                except OSError:
                    pass

            tm = Timer(timelimit, terminate)
            tm.daemon = True
            tm.start()
        if monitor_function:
            monitor_function(sproc)
        rcode = sproc.wait()

        if tm is not None:
            tm.cancel()

        # Close only the file handles we opened above: subprocess.PIPE is an
        # int (hence the IOBase check, which already guarantees close()), and
        # sys.stderr must stay open.
        if isinstance(stdin, IOBase):
            stdin.close()

        if stdout is not sys.stderr and hasattr(stdout, "close"):
            stdout.close()

        if stderr is not sys.stderr and hasattr(stderr, "close"):
            stderr.close()

        return rcode
    else:
        if job_script_contents is None:
            job_script_contents = SHELL_COMMAND_TEMPLATE

        # Plain copy; the previous manual loop also leaked its loop
        # variable into the function scope.
        env_copy = dict(env)

        job_description = {
            "commands": commands,
            "cwd": cwd,
            "env": env_copy,
            "stdout_path": stdout_path,
            "stderr_path": stderr_path,
            "stdin_path": stdin_path,
        }

        job_dir = make_job_dir()
        try:
            with open(
                os.path.join(job_dir, "job.json"), mode="w", encoding="utf-8"
            ) as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
            job_script = os.path.join(job_dir, "run_job.bash")
            with open(job_script, "wb") as script_handle:
                script_handle.write(job_script_contents.encode("utf-8"))
            job_run = os.path.join(job_dir, "run_job.py")
            with open(job_run, "wb") as run_handle:
                run_handle.write(PYTHON_RUN_SCRIPT.encode("utf-8"))
            sproc = subprocess.Popen(  # nosec
                ["bash", job_script.encode("utf-8")],
                shell=False,  # nosec
                cwd=job_dir,
                # The nested script will output the paths to the correct files
                # if they need to be captured. Else just write everything to
                # stderr (same as above).
                stdout=sys.stderr,
                stderr=sys.stderr,
                stdin=subprocess.PIPE,
                universal_newlines=True,
            )
            processes_to_kill.append(sproc)
            if sproc.stdin is not None:
                sproc.stdin.close()

            rcode = sproc.wait()

            return rcode
        finally:
            # Always remove the scratch directory, even if setup or the run failed.
            shutil.rmtree(job_dir)
