diff env/lib/python3.9/site-packages/cwltool/job.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/job.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,1038 @@
+import datetime
+import functools
+import itertools
+import logging
+import os
+import re
+import shutil
+import subprocess  # nosec
+import sys
+import tempfile
+import threading
+import time
+import uuid
+from abc import ABCMeta, abstractmethod
+from io import IOBase
+from threading import Timer
+from typing import (
+    IO,
+    Callable,
+    Iterable,
+    List,
+    Match,
+    MutableMapping,
+    MutableSequence,
+    Optional,
+    TextIO,
+    Tuple,
+    Union,
+    cast,
+)
+
+import psutil
+import shellescape
+from prov.model import PROV
+from schema_salad.sourceline import SourceLine
+from schema_salad.utils import json_dump, json_dumps
+from typing_extensions import TYPE_CHECKING
+
+from .builder import Builder, HasReqsHints
+from .context import RuntimeContext
+from .errors import UnsupportedRequirement, WorkflowException
+from .loghandler import _logger
+from .pathmapper import MapperEnt, PathMapper
+from .process import stage_files
+from .secrets import SecretStore
+from .utils import (
+    CWLObjectType,
+    CWLOutputType,
+    DirectoryType,
+    OutputCallbackType,
+    bytes2str_in_dicts,
+    copytree_with_merge,
+    create_tmp_dir,
+    ensure_non_writable,
+    ensure_writable,
+    onWindows,
+    processes_to_kill,
+)
+
+if TYPE_CHECKING:
+    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import
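+
+# Matches arguments that must be shell-quoted when echoing the command line:
+# empty strings, or strings containing whitespace or shell metacharacters.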
+needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
+
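+# When CWLTOOL_FORCE_SHELL_POPEN=1, always run jobs through the generated
+# wrapper scripts in _job_popen() instead of invoking Popen directly.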
+FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
+
+SHELL_COMMAND_TEMPLATE = """#!/bin/bash
+python "run_job.py" "job.json"
+"""
+
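+# Script written next to job.json when jobs are run via a wrapper script;
+# it re-creates the Popen call described by that JSON file.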
+PYTHON_RUN_SCRIPT = """
+import json
+import os
+import sys
+if os.name == 'posix':
+    try:
+        import subprocess32 as subprocess  # type: ignore
+    except Exception:
+        import subprocess
+else:
+    import subprocess  # type: ignore
+
+with open(sys.argv[1], "r") as f:
+    popen_description = json.load(f)
+    commands = popen_description["commands"]
+    cwd = popen_description["cwd"]
+    env = popen_description["env"]
+    env["PATH"] = os.environ.get("PATH")
+    stdin_path = popen_description["stdin_path"]
+    stdout_path = popen_description["stdout_path"]
+    stderr_path = popen_description["stderr_path"]
+    if stdin_path is not None:
+        stdin = open(stdin_path, "rb")
+    else:
+        stdin = subprocess.PIPE
+    if stdout_path is not None:
+        stdout = open(stdout_path, "wb")
+    else:
+        stdout = sys.stderr
+    if stderr_path is not None:
+        stderr = open(stderr_path, "wb")
+    else:
+        stderr = sys.stderr
+    if os.name == 'nt':
+        close_fds = False
+        for key, value in env.items():
+            env[key] = str(value)
+    else:
+        close_fds = True
+    sp = subprocess.Popen(commands,
+                          shell=False,
+                          close_fds=close_fds,
+                          stdin=stdin,
+                          stdout=stdout,
+                          stderr=stderr,
+                          env=env,
+                          cwd=cwd)
+    if sp.stdin:
+        sp.stdin.close()
+    rcode = sp.wait()
+    if stdin is not subprocess.PIPE:
+        stdin.close()
+    if stdout is not sys.stderr:
+        stdout.close()
+    if stderr is not sys.stderr:
+        stderr.close()
+    sys.exit(rcode)
+"""
+
+
+def relink_initialworkdir(
+    pathmapper: PathMapper,
+    host_outdir: str,
+    container_outdir: str,
+    inplace_update: bool = False,
+) -> None:
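+    """Link or copy staged InitialWorkDir entries back into the host output directory."""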
+    for _, vol in pathmapper.items():
+        if not vol.staged:
+            continue
+
+        if vol.type in ("File", "Directory") or (
+            inplace_update and vol.type in ("WritableFile", "WritableDirectory")
+        ):
+            if not vol.target.startswith(container_outdir):
+                # This is an input file written outside of the working
+                # directory, and therefore ineligible to be an output file;
+                # it is none of our business.
+                continue
+            host_outdir_tgt = os.path.join(
+                host_outdir, vol.target[len(container_outdir) + 1 :]
+            )
+            if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
+                try:
+                    os.remove(host_outdir_tgt)
+                except PermissionError:
+                    pass
+            elif os.path.isdir(host_outdir_tgt) and not vol.resolved.startswith("_:"):
+                shutil.rmtree(host_outdir_tgt)
+            if onWindows():
+                # If this becomes a big issue for someone then we could
+                # refactor the code to process output from a running container
+                # and avoid all the extra IO below
+                if vol.type in ("File", "WritableFile"):
+                    shutil.copy(vol.resolved, host_outdir_tgt)
+                elif vol.type in ("Directory", "WritableDirectory"):
+                    copytree_with_merge(vol.resolved, host_outdir_tgt)
+            elif not vol.resolved.startswith("_:"):
+                try:
+                    os.symlink(vol.resolved, host_outdir_tgt)
+                except FileExistsError:
+                    pass
+
+
+def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str]]:
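+    """Never match: stands in for needs_shell_quoting_re.search when ShellCommandRequirement is in effect."""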
+    return None
+
+
+CollectOutputsType = Union[Callable[[str, int], CWLObjectType], functools.partial]
+
+
+class JobBase(HasReqsHints, metaclass=ABCMeta):
+    def __init__(
+        self,
+        builder: Builder,
+        joborder: CWLObjectType,
+        make_path_mapper: Callable[..., PathMapper],
+        requirements: List[CWLObjectType],
+        hints: List[CWLObjectType],
+        name: str,
+    ) -> None:
+        """Initialize the job object."""
+        super().__init__()
+        self.builder = builder
+        self.joborder = joborder
+        self.stdin = None  # type: Optional[str]
+        self.stderr = None  # type: Optional[str]
+        self.stdout = None  # type: Optional[str]
+        self.successCodes = []  # type: Iterable[int]
+        self.temporaryFailCodes = []  # type: Iterable[int]
+        self.permanentFailCodes = []  # type: Iterable[int]
+        self.requirements = requirements
+        self.hints = hints
+        self.name = name
+        self.command_line = []  # type: List[str]
+        self.pathmapper = PathMapper([], "", "")
+        self.make_path_mapper = make_path_mapper
+        self.generatemapper = None  # type: Optional[PathMapper]
+
+        # set in CommandLineTool.job(i)
+        self.collect_outputs = cast(CollectOutputsType, None)
+        self.output_callback = None  # type: Optional[OutputCallbackType]
+        self.outdir = ""
+        self.tmpdir = ""
+
+        self.environment = {}  # type: MutableMapping[str, str]
+        self.generatefiles = {
+            "class": "Directory",
+            "listing": [],
+            "basename": "",
+        }  # type: DirectoryType
+        self.stagedir = None  # type: Optional[str]
+        self.inplace_update = False
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.parent_wf = None  # type: Optional[ProvenanceProfile]
+        self.timelimit = None  # type: Optional[int]
+        self.networkaccess = False  # type: bool
+        self.mpi_procs = None  # type: Optional[int]
+
+    def __repr__(self):  # type: () -> str
+        """Represent this Job object."""
+        return "CommandLineJob(%s)" % self.name
+
+    @abstractmethod
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+        pass
+
+    def _setup(self, runtimeContext: RuntimeContext) -> None:
+        if not os.path.exists(self.outdir):
+            os.makedirs(self.outdir)
+
+        for knownfile in self.pathmapper.files():
+            p = self.pathmapper.mapper(knownfile)
+            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
+                raise WorkflowException(
+                    "Input file %s (at %s) not found or is not a regular "
+                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
+                )
+
+        if "listing" in self.generatefiles:
+            runtimeContext = runtimeContext.copy()
+            runtimeContext.outdir = self.outdir
+            self.generatemapper = self.make_path_mapper(
+                self.generatefiles["listing"],
+                self.builder.outdir,
+                runtimeContext,
+                False,
+            )
+            if _logger.isEnabledFor(logging.DEBUG):
+                _logger.debug(
+                    "[job %s] initial work dir %s",
+                    self.name,
+                    json_dumps(
+                        {
+                            p: self.generatemapper.mapper(p)
+                            for p in self.generatemapper.files()
+                        },
+                        indent=4,
+                    ),
+                )
+
+    def _execute(
+        self,
+        runtime: List[str],
+        env: MutableMapping[str, str],
+        runtimeContext: RuntimeContext,
+        monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
+    ) -> None:
+
+        scr = self.get_requirement("ShellCommandRequirement")[0]
+
+        shouldquote = needs_shell_quoting_re.search
+        if scr is not None:
+            shouldquote = neverquote
+
+        # If mpi_procs is set (not None and > 0), prepend the appropriate MPI
+        # launcher command and flags to the runtime.
+        if self.mpi_procs:
+            menv = runtimeContext.mpi_config
+            mpi_runtime = [
+                menv.runner,
+                menv.nproc_flag,
+                str(self.mpi_procs),
+            ] + menv.extra_flags
+            runtime = mpi_runtime + runtime
+            menv.pass_through_env_vars(env)
+            menv.set_env_vars(env)
+
+        _logger.info(
+            "[job %s] %s$ %s%s%s%s",
+            self.name,
+            self.outdir,
+            " \\\n    ".join(
+                [
+                    shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg)
+                    for arg in (runtime + self.command_line)
+                ]
+            ),
+            " < %s" % self.stdin if self.stdin else "",
+            " > %s" % os.path.join(self.outdir, self.stdout) if self.stdout else "",
+            " 2> %s" % os.path.join(self.outdir, self.stderr) if self.stderr else "",
+        )
+        if self.joborder is not None and runtimeContext.research_obj is not None:
+            job_order = self.joborder
+            if (
+                runtimeContext.process_run_id is not None
+                and runtimeContext.prov_obj is not None
+                and isinstance(job_order, (list, dict))
+            ):
+                runtimeContext.prov_obj.used_artefacts(
+                    job_order, runtimeContext.process_run_id, str(self.name)
+                )
+            else:
+                _logger.warning(
+                    "research_obj set but one of process_run_id "
+                    "or prov_obj is missing from runtimeContext: "
+                    "{}".format(runtimeContext)
+                )
+        outputs = {}  # type: CWLObjectType
+        try:
+            stdin_path = None
+            if self.stdin is not None:
+                rmap = self.pathmapper.reversemap(self.stdin)
+                if rmap is None:
+                    raise WorkflowException(f"{self.stdin} missing from pathmapper")
+                else:
+                    stdin_path = rmap[1]
+
+            stderr_path = None
+            if self.stderr is not None:
+                abserr = os.path.join(self.outdir, self.stderr)
+                dnerr = os.path.dirname(abserr)
+                if dnerr and not os.path.exists(dnerr):
+                    os.makedirs(dnerr)
+                stderr_path = abserr
+
+            stdout_path = None
+            if self.stdout is not None:
+                absout = os.path.join(self.outdir, self.stdout)
+                dnout = os.path.dirname(absout)
+                if dnout and not os.path.exists(dnout):
+                    os.makedirs(dnout)
+                stdout_path = absout
+
+            commands = [str(x) for x in runtime + self.command_line]
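+            # Resolve any secret placeholders in the command line and the
+            # environment only now, just before the job actually runs.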
+            if runtimeContext.secret_store is not None:
+                commands = cast(
+                    List[str],
+                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, commands)),
+                )
+                env = cast(
+                    MutableMapping[str, str],
+                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, env)),
+                )
+
+            job_script_contents = None  # type: Optional[str]
+            builder = getattr(self, "builder", None)  # type: Optional[Builder]
+            if builder is not None:
+                job_script_contents = builder.build_job_script(commands)
+            rcode = _job_popen(
+                commands,
+                stdin_path=stdin_path,
+                stdout_path=stdout_path,
+                stderr_path=stderr_path,
+                env=env,
+                cwd=self.outdir,
+                make_job_dir=lambda: runtimeContext.create_outdir(),
+                job_script_contents=job_script_contents,
+                timelimit=self.timelimit,
+                name=self.name,
+                monitor_function=monitor_function,
+                default_stdout=runtimeContext.default_stdout,
+                default_stderr=runtimeContext.default_stderr,
+            )
+
+            if rcode in self.successCodes:
+                processStatus = "success"
+            elif rcode in self.temporaryFailCodes:
+                processStatus = "temporaryFail"
+            elif rcode in self.permanentFailCodes:
+                processStatus = "permanentFail"
+            elif rcode == 0:
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            if "listing" in self.generatefiles:
+                if self.generatemapper:
+                    relink_initialworkdir(
+                        self.generatemapper,
+                        self.outdir,
+                        self.builder.outdir,
+                        inplace_update=self.inplace_update,
+                    )
+                else:
+                    raise ValueError(
+                        "'listing' in self.generatefiles but no "
+                        "generatemapper was setup."
+                    )
+
+            outputs = self.collect_outputs(self.outdir, rcode)
+            outputs = bytes2str_in_dicts(outputs)  # type: ignore
+        except OSError as e:
+            if e.errno == 2:
+                if runtime:
+                    _logger.error("'%s' not found: %s", runtime[0], str(e))
+                else:
+                    _logger.error("'%s' not found: %s", self.command_line[0], str(e))
+            else:
+                _logger.exception("Exception while running job")
+            processStatus = "permanentFail"
+        except WorkflowException as err:
+            _logger.error("[job %s] Job error:\n%s", self.name, str(err))
+            processStatus = "permanentFail"
+        except Exception:
+            _logger.exception("Exception while running job")
+            processStatus = "permanentFail"
+        if (
+            runtimeContext.research_obj is not None
+            and self.prov_obj is not None
+            and runtimeContext.process_run_id is not None
+        ):
+            # creating entities for the outputs produced by each step (in the provenance document)
+            self.prov_obj.record_process_end(
+                str(self.name),
+                runtimeContext.process_run_id,
+                outputs,
+                datetime.datetime.now(),
+            )
+        if processStatus != "success":
+            _logger.warning("[job %s] completed %s", self.name, processStatus)
+        else:
+            _logger.info("[job %s] completed %s", self.name, processStatus)
+
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(
+                "[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)
+            )
+
+        if self.generatemapper is not None and runtimeContext.secret_store is not None:
+            # Delete any runtime-generated files containing secrets.
+            for _, p in self.generatemapper.items():
+                if p.type == "CreateFile":
+                    if runtimeContext.secret_store.has_secret(p.resolved):
+                        host_outdir = self.outdir
+                        container_outdir = self.builder.outdir
+                        host_outdir_tgt = p.target
+                        if p.target.startswith(container_outdir + "/"):
+                            host_outdir_tgt = os.path.join(
+                                host_outdir, p.target[len(container_outdir) + 1 :]
+                            )
+                        os.remove(host_outdir_tgt)
+
+        if runtimeContext.workflow_eval_lock is None:
+            raise WorkflowException(
+                "runtimeContext.workflow_eval_lock must not be None"
+            )
+
+        if self.output_callback:
+            with runtimeContext.workflow_eval_lock:
+                self.output_callback(outputs, processStatus)
+
+        if self.stagedir is not None and os.path.exists(self.stagedir):
+            _logger.debug(
+                "[job %s] Removing input staging directory %s",
+                self.name,
+                self.stagedir,
+            )
+            shutil.rmtree(self.stagedir, True)
+
+        if runtimeContext.rm_tmpdir:
+            _logger.debug(
+                "[job %s] Removing temporary directory %s", self.name, self.tmpdir
+            )
+            shutil.rmtree(self.tmpdir, True)
+
+    def process_monitor(self, sproc):  # type: (subprocess.Popen[str]) -> None
+        monitor = psutil.Process(sproc.pid)
+        # Use a one-element list rather than a plain int so the timer callback
+        # can update the value in place (ints are immutable in Python).
+        memory_usage = [None]  # type: MutableSequence[Optional[int]]
+
+        def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
+            children = monitor.children()
+            rss = monitor.memory_info().rss
+            while len(children):
+                rss += sum([process.memory_info().rss for process in children])
+                children = list(
+                    itertools.chain(*[process.children() for process in children])
+                )
+            if memory_usage[0] is None or rss > memory_usage[0]:
+                memory_usage[0] = rss
+
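+        # threading.Timer fires only once: a single sample of the process
+        # tree's memory is taken one second in; if the job finishes sooner,
+        # the fallback message below is logged instead.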
+        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
+        mem_tm.daemon = True
+        mem_tm.start()
+        sproc.wait()
+        mem_tm.cancel()
+        if memory_usage[0] is not None:
+            _logger.info(
+                "[job %s] Max memory used: %iMiB",
+                self.name,
+                round(memory_usage[0] / (2 ** 20)),
+            )
+        else:
+            _logger.debug(
+                "Could not collect memory usage, job ended before monitoring began."
+            )
+
+
+class CommandLineJob(JobBase):
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+
+        if tmpdir_lock:
+            with tmpdir_lock:
+                if not os.path.exists(self.tmpdir):
+                    os.makedirs(self.tmpdir)
+        else:
+            if not os.path.exists(self.tmpdir):
+                os.makedirs(self.tmpdir)
+
+        self._setup(runtimeContext)
+
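+        # Build the job environment: optionally carry over variables from the
+        # calling environment, then pin HOME and TMPDIR to the job's own
+        # output and temporary directories.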
+        env = self.environment
+        vars_to_preserve = runtimeContext.preserve_environment
+        if runtimeContext.preserve_entire_environment is not False:
+            vars_to_preserve = os.environ
+        if vars_to_preserve:
+            for key, value in os.environ.items():
+                if key in vars_to_preserve and key not in env:
+                    # On Windows, subprocess env can't handle unicode.
+                    env[key] = str(value) if onWindows() else value
+        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
+        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
+        if "PATH" not in env:
+            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
+        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
+            env["SYSTEMROOT"] = (
+                str(os.environ["SYSTEMROOT"])
+                if onWindows()
+                else os.environ["SYSTEMROOT"]
+            )
+
+        stage_files(
+            self.pathmapper,
+            ignore_writable=True,
+            symlink=True,
+            secret_store=runtimeContext.secret_store,
+        )
+        if self.generatemapper is not None:
+            stage_files(
+                self.generatemapper,
+                ignore_writable=self.inplace_update,
+                symlink=True,
+                secret_store=runtimeContext.secret_store,
+            )
+            relink_initialworkdir(
+                self.generatemapper,
+                self.outdir,
+                self.builder.outdir,
+                inplace_update=self.inplace_update,
+            )
+
+        monitor_function = functools.partial(self.process_monitor)
+
+        self._execute([], env, runtimeContext, monitor_function)
+
+
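+# ANSI escape sequences, stripped from `docker stats` output below.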
+CONTROL_CODE_RE = r"\x1b\[[0-9;]*[a-zA-Z]"
+
+
+class ContainerCommandLineJob(JobBase, metaclass=ABCMeta):
+    """Commandline job using containers."""
+
+    @abstractmethod
+    def get_from_requirements(
+        self,
+        r: CWLObjectType,
+        pull_image: bool,
+        force_pull: bool,
+        tmp_outdir_prefix: str,
+    ) -> Optional[str]:
+        pass
+
+    @abstractmethod
+    def create_runtime(
+        self,
+        env: MutableMapping[str, str],
+        runtime_context: RuntimeContext,
+    ) -> Tuple[List[str], Optional[str]]:
+        """Return the list of commands to run the selected container engine."""
+
+    @staticmethod
+    @abstractmethod
+    def append_volume(
+        runtime: List[str], source: str, target: str, writable: bool = False
+    ) -> None:
+        """Add binding arguments to the runtime list."""
+
+    @abstractmethod
+    def add_file_or_directory_volume(
+        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
+    ) -> None:
+        """Append volume a file/dir mapping to the runtime option list."""
+
+    @abstractmethod
+    def add_writable_file_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        tmpdir_prefix: str,
+    ) -> None:
+        """Append a writable file mapping to the runtime option list."""
+
+    @abstractmethod
+    def add_writable_directory_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        tmpdir_prefix: str,
+    ) -> None:
+        """Append a writable directory mapping to the runtime option list."""
+
+    def create_file_and_add_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        secret_store: Optional[SecretStore],
+        tmpdir_prefix: str,
+    ) -> str:
+        """Create the file and add a mapping."""
+        if not host_outdir_tgt:
+            new_file = os.path.join(
+                create_tmp_dir(tmpdir_prefix),
+                os.path.basename(volume.target),
+            )
+        writable = volume.type == "CreateWritableFile"
+        contents = volume.resolved
+        if secret_store:
+            contents = cast(str, secret_store.retrieve(volume.resolved))
+        dirname = os.path.dirname(host_outdir_tgt or new_file)
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
+        with open(host_outdir_tgt or new_file, "wb") as file_literal:
+            file_literal.write(contents.encode("utf-8"))
+        if not host_outdir_tgt:
+            self.append_volume(runtime, new_file, volume.target, writable=writable)
+        if writable:
+            ensure_writable(host_outdir_tgt or new_file)
+        else:
+            ensure_non_writable(host_outdir_tgt or new_file)
+        return host_outdir_tgt or new_file
+
+    def add_volumes(
+        self,
+        pathmapper: PathMapper,
+        runtime: List[str],
+        tmpdir_prefix: str,
+        secret_store: Optional[SecretStore] = None,
+        any_path_okay: bool = False,
+    ) -> None:
+        """Append volume mappings to the runtime option list."""
+        container_outdir = self.builder.outdir
+        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
+            host_outdir_tgt = None  # type: Optional[str]
+            if vol.target.startswith(container_outdir + "/"):
+                host_outdir_tgt = os.path.join(
+                    self.outdir, vol.target[len(container_outdir) + 1 :]
+                )
+            if not host_outdir_tgt and not any_path_okay:
+                raise WorkflowException(
+                    "No mandatory DockerRequirement, yet path is outside "
+                    "the designated output directory, also know as "
+                    "$(runtime.outdir): {}".format(vol)
+                )
+            if vol.type in ("File", "Directory"):
+                self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
+            elif vol.type == "WritableFile":
+                self.add_writable_file_volume(
+                    runtime, vol, host_outdir_tgt, tmpdir_prefix
+                )
+            elif vol.type == "WritableDirectory":
+                self.add_writable_directory_volume(
+                    runtime, vol, host_outdir_tgt, tmpdir_prefix
+                )
+            elif vol.type in ["CreateFile", "CreateWritableFile"]:
+                new_path = self.create_file_and_add_volume(
+                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix
+                )
+                pathmapper.update(key, new_path, vol.target, vol.type, vol.staged)
+
+    def run(
+        self,
+        runtimeContext: RuntimeContext,
+        tmpdir_lock: Optional[threading.Lock] = None,
+    ) -> None:
+        if tmpdir_lock:
+            with tmpdir_lock:
+                if not os.path.exists(self.tmpdir):
+                    os.makedirs(self.tmpdir)
+        else:
+            if not os.path.exists(self.tmpdir):
+                os.makedirs(self.tmpdir)
+
+        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+        self.prov_obj = runtimeContext.prov_obj
+        img_id = None
+        env = cast(MutableMapping[str, str], os.environ)
+        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
+        if docker_req is not None and user_space_docker_cmd:
+            # For user-space docker implementations, a local image name or ID
+            # takes precedence over a network pull
+            if "dockerImageId" in docker_req:
+                img_id = str(docker_req["dockerImageId"])
+            elif "dockerPull" in docker_req:
+                img_id = str(docker_req["dockerPull"])
+                cmd = [user_space_docker_cmd, "pull", img_id]
+                _logger.info(str(cmd))
+                try:
+                    subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
+                except OSError:
+                    raise WorkflowException(
+                        SourceLine(docker_req).makeError(
+                            "Either Docker container {} is not available with "
+                            "user space docker implementation {} or {} is missing "
+                            "or broken.".format(
+                                img_id, user_space_docker_cmd, user_space_docker_cmd
+                            )
+                        )
+                    )
+            else:
+                raise WorkflowException(
+                    SourceLine(docker_req).makeError(
+                        "Docker image must be specified as 'dockerImageId' or "
+                        "'dockerPull' when using user space implementations of "
+                        "Docker"
+                    )
+                )
+        else:
+            try:
+                if docker_req is not None and runtimeContext.use_container:
+                    img_id = str(
+                        self.get_from_requirements(
+                            docker_req,
+                            runtimeContext.pull_image,
+                            runtimeContext.force_docker_pull,
+                            runtimeContext.tmp_outdir_prefix,
+                        )
+                    )
+                if img_id is None:
+                    if self.builder.find_default_container:
+                        default_container = self.builder.find_default_container()
+                        if default_container:
+                            img_id = str(default_container)
+
+                if (
+                    docker_req is not None
+                    and img_id is None
+                    and runtimeContext.use_container
+                ):
+                    raise Exception("Docker image not available")
+
+                if (
+                    self.prov_obj is not None
+                    and img_id is not None
+                    and runtimeContext.process_run_id is not None
+                ):
+                    container_agent = self.prov_obj.document.agent(
+                        uuid.uuid4().urn,
+                        {
+                            "prov:type": PROV["SoftwareAgent"],
+                            "cwlprov:image": img_id,
+                            "prov:label": "Container execution of image %s" % img_id,
+                        },
+                    )
+                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
+                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
+                    #                  {"prov:label": "Container image %s" % img_id} )
+                    # The image is the plan for this activity-agent association
+                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
+                    self.prov_obj.document.wasAssociatedWith(
+                        runtimeContext.process_run_id, container_agent
+                    )
+            except Exception as err:
+                container = "Singularity" if runtimeContext.singularity else "Docker"
+                _logger.debug("%s error", container, exc_info=True)
+                if docker_is_req:
+                    raise UnsupportedRequirement(
+                        "{} is required to run this tool: {}".format(
+                            container, str(err)
+                        )
+                    ) from err
+                else:
+                    raise WorkflowException(
+                        "{0} is not available for this tool, try "
+                        "--no-container to disable {0}, or install "
+                        "a user space Docker replacement like uDocker with "
+                        "--user-space-docker-cmd.: {1}".format(container, err)
+                    )
+
+        self._setup(runtimeContext)
+        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
+        runtime.append(str(img_id))
+        monitor_function = None
+        if cidfile:
+            monitor_function = functools.partial(
+                self.docker_monitor,
+                cidfile,
+                runtimeContext.tmpdir_prefix,
+                not bool(runtimeContext.cidfile_dir),
+            )
+        elif runtimeContext.user_space_docker_cmd:
+            monitor_function = functools.partial(self.process_monitor)
+        self._execute(runtime, env, runtimeContext, monitor_function)
+
+    def docker_monitor(
+        self,
+        cidfile: str,
+        tmpdir_prefix: str,
+        cleanup_cidfile: bool,
+        process,  # type: subprocess.Popen[str]
+    ) -> None:
+        """Record memory usage of the running Docker container."""
+        # Todo: consider switching to `docker create` / `docker start`
+        # instead of `docker run` as `docker create` outputs the container ID
+        # to stdout, but the container is frozen, thus allowing us to start the
+        # monitoring process without dealing with the cidfile or too-fast
+        # container execution
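+        # Poll for the container ID: `docker run --cidfile` writes the ID once
+        # the container starts, so the file may not exist on early passes.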
+        cid = None  # type: Optional[str]
+        while cid is None:
+            time.sleep(1)
+            if process.returncode is not None:
+                if cleanup_cidfile:
+                    try:
+                        os.remove(cidfile)
+                    except OSError as exc:
+                        _logger.warning(
+                            "Ignored error cleaning up Docker cidfile: %s", exc
+                        )
+                    return
+            try:
+                with open(cidfile) as cidhandle:
+                    cid = cidhandle.readline().strip()
+            except OSError:
+                cid = None
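+        # `docker stats` reports memory as a percentage of total host memory;
+        # record the host total so the peak percentage can be converted to MiB.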
+        max_mem = psutil.virtual_memory().total
+        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
+        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
+        stats_file_name = stats_file.name
+        try:
+            with open(stats_file_name, mode="w") as stats_file_handle:
+                stats_proc = subprocess.Popen(  # nosec
+                    ["docker", "stats", "--no-trunc", "--format", "{{.MemPerc}}", cid],
+                    stdout=stats_file_handle,
+                    stderr=subprocess.DEVNULL,
+                )
+                process.wait()
+                stats_proc.kill()
+        except OSError as exc:
+            _logger.warn("Ignored error with docker stats: %s", exc)
+            return
+        max_mem_percent = 0  # type: float
+        mem_percent = 0  # type: float
+        with open(stats_file_name, mode="r") as stats:
+            while True:
+                line = stats.readline()
+                if not line:
+                    break
+                try:
+                    mem_percent = float(
+                        re.sub(CONTROL_CODE_RE, "", line).replace("%", "")
+                    )
+                    if mem_percent > max_mem_percent:
+                        max_mem_percent = mem_percent
+                except ValueError:
+                    break
+        _logger.info(
+            "[job %s] Max memory used: %iMiB",
+            self.name,
+            int((max_mem_percent / 100 * max_mem) / (2 ** 20)),
+        )
+        if cleanup_cidfile:
+            os.remove(cidfile)
+
+
+def _job_popen(
+    commands: List[str],
+    stdin_path: Optional[str],
+    stdout_path: Optional[str],
+    stderr_path: Optional[str],
+    env: MutableMapping[str, str],
+    cwd: str,
+    make_job_dir: Callable[[], str],
+    job_script_contents: Optional[str] = None,
+    timelimit: Optional[int] = None,
+    name: Optional[str] = None,
+    monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
+    default_stdout=None,  # type: Optional[Union[IO[bytes], TextIO]]
+    default_stderr=None,  # type: Optional[Union[IO[bytes], TextIO]]
+) -> int:
+
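+    # Two execution paths: a direct Popen call (the default), or, when a job
+    # script is supplied or CWLTOOL_FORCE_SHELL_POPEN=1, a generated bash
+    # wrapper that runs run_job.py against a JSON description of the job.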
+    if job_script_contents is None and not FORCE_SHELLED_POPEN:
+
+        stdin = subprocess.PIPE  # type: Union[IO[bytes], int]
+        if stdin_path is not None:
+            stdin = open(stdin_path, "rb")
+
+        stdout = (
+            default_stdout if default_stdout is not None else sys.stderr
+        )  # type: Union[IO[bytes], TextIO]
+        if stdout_path is not None:
+            stdout = open(stdout_path, "wb")
+
+        stderr = (
+            default_stderr if default_stderr is not None else sys.stderr
+        )  # type: Union[IO[bytes], TextIO]
+        if stderr_path is not None:
+            stderr = open(stderr_path, "wb")
+
+        sproc = subprocess.Popen(
+            commands,
+            shell=False,  # nosec
+            close_fds=not onWindows(),
+            stdin=stdin,
+            stdout=stdout,
+            stderr=stderr,
+            env=env,
+            cwd=cwd,
+            universal_newlines=True,
+        )
+        processes_to_kill.append(sproc)
+
+        if sproc.stdin is not None:
+            sproc.stdin.close()
+
+        tm = None
+        if timelimit is not None and timelimit > 0:
+
+            def terminate():  # type: () -> None
+                try:
+                    _logger.warning(
+                        "[job %s] exceeded time limit of %d seconds and will be terminated",
+                        name,
+                        timelimit,
+                    )
+                    sproc.terminate()
+                except OSError:
+                    pass
+
+            tm = Timer(timelimit, terminate)
+            tm.daemon = True
+            tm.start()
+        if monitor_function:
+            monitor_function(sproc)
+        rcode = sproc.wait()
+
+        if tm is not None:
+            tm.cancel()
+
+        if isinstance(stdin, IOBase) and hasattr(stdin, "close"):
+            stdin.close()
+
+        if stdout is not sys.stderr and hasattr(stdout, "close"):
+            stdout.close()
+
+        if stderr is not sys.stderr and hasattr(stderr, "close"):
+            stderr.close()
+
+        return rcode
+    else:
+        if job_script_contents is None:
+            job_script_contents = SHELL_COMMAND_TEMPLATE
+
+        env_copy = dict(env)
+
+        job_description = {
+            "commands": commands,
+            "cwd": cwd,
+            "env": env_copy,
+            "stdout_path": stdout_path,
+            "stderr_path": stderr_path,
+            "stdin_path": stdin_path,
+        }
+
+        job_dir = make_job_dir()
+        try:
+            with open(
+                os.path.join(job_dir, "job.json"), mode="w", encoding="utf-8"
+            ) as job_file:
+                json_dump(job_description, job_file, ensure_ascii=False)
+            job_script = os.path.join(job_dir, "run_job.bash")
+            with open(job_script, "wb") as script_file:
+                script_file.write(job_script_contents.encode("utf-8"))
+            job_run = os.path.join(job_dir, "run_job.py")
+            with open(job_run, "wb") as run_file:
+                run_file.write(PYTHON_RUN_SCRIPT.encode("utf-8"))
+            sproc = subprocess.Popen(  # nosec
+                ["bash", job_script.encode("utf-8")],
+                shell=False,  # nosec
+                cwd=job_dir,
+                # The nested script will output the paths to the correct files if they need
+                # to be captured. Else just write everything to stderr (same as above).
+                stdout=sys.stderr,
+                stderr=sys.stderr,
+                stdin=subprocess.PIPE,
+                universal_newlines=True,
+            )
+            processes_to_kill.append(sproc)
+            if sproc.stdin is not None:
+                sproc.stdin.close()
+
+            rcode = sproc.wait()
+
+            return rcode
+        finally:
+            shutil.rmtree(job_dir)