diff env/lib/python3.9/site-packages/cwltool/docker.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/docker.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,456 @@
+"""Enables Docker software containers via the {u,}docker runtimes."""
+
+import csv
+import datetime
+import os
+import re
+import shutil
+import subprocess  # nosec
+import sys
+import threading
+from distutils import spawn
+from io import StringIO  # pylint: disable=redefined-builtin
+from typing import Callable, Dict, List, MutableMapping, Optional, Set, Tuple, cast
+
+import requests
+
+from .builder import Builder
+from .context import RuntimeContext
+from .docker_id import docker_vm_id
+from .errors import WorkflowException
+from .job import ContainerCommandLineJob
+from .loghandler import _logger
+from .pathmapper import MapperEnt, PathMapper
+from .utils import (
+    CWLObjectType,
+    create_tmp_dir,
+    docker_windows_path_adjust,
+    ensure_writable,
+    onWindows,
+)
+
+_IMAGES = set()  # type: Set[str]
+_IMAGES_LOCK = threading.Lock()
+__docker_machine_mounts = None  # type: Optional[List[str]]
+__docker_machine_mounts_lock = threading.Lock()
+
+
+def _get_docker_machine_mounts() -> List[str]:
+    global __docker_machine_mounts
+    if __docker_machine_mounts is None:
+        with __docker_machine_mounts_lock:
+            if "DOCKER_MACHINE_NAME" not in os.environ:
+                __docker_machine_mounts = []
+            else:
+                __docker_machine_mounts = [
+                    "/" + line.split(None, 1)[0]
+                    for line in subprocess.check_output(  # nosec
+                        [
+                            "docker-machine",
+                            "ssh",
+                            os.environ["DOCKER_MACHINE_NAME"],
+                            "mount",
+                            "-t",
+                            "vboxsf",
+                        ],
+                        universal_newlines=True,
+                    ).splitlines()
+                ]
+    return __docker_machine_mounts
+
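+# Editor's note (illustrative, not part of the upstream file): on a typical
+# Docker Toolbox / docker-machine setup, _get_docker_machine_mounts() might
+# return something like ["/c/Users"], i.e. the VirtualBox shared folders that
+# are visible inside the Docker virtual machine.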
+
+def _check_docker_machine_path(path: Optional[str]) -> None:
+    if path is None:
+        return
+    if onWindows():
+        path = path.lower()
+    mounts = _get_docker_machine_mounts()
+
+    found = False
+    for mount in mounts:
+        if onWindows():
+            mount = mount.lower()
+        if path.startswith(mount):
+            found = True
+            break
+
+    if not found and mounts:
+        name = os.environ.get("DOCKER_MACHINE_NAME", "???")
+        raise WorkflowException(
+            "Input path {path} is not in the list of host paths mounted "
+            "into the Docker virtual machine named {name}. Already mounted "
+            "paths: {mounts}.\n"
+            "See https://docs.docker.com/toolbox/toolbox_install_windows/"
+            "#optional-add-shared-directories for instructions on how to "
+            "add this path to your VM.".format(path=path, name=name, mounts=mounts)
+        )
+
+
+class DockerCommandLineJob(ContainerCommandLineJob):
+    """Runs a CommandLineJob in a sofware container using the Docker engine."""
+
+    def __init__(
+        self,
+        builder: Builder,
+        joborder: CWLObjectType,
+        make_path_mapper: Callable[..., PathMapper],
+        requirements: List[CWLObjectType],
+        hints: List[CWLObjectType],
+        name: str,
+    ) -> None:
+        """Initialize a command line builder using the Docker software container engine."""
+        super().__init__(builder, joborder, make_path_mapper, requirements, hints, name)
+
+    @staticmethod
+    def get_image(
+        docker_requirement: Dict[str, str],
+        pull_image: bool,
+        force_pull: bool,
+        tmp_outdir_prefix: str,
+    ) -> bool:
+        """
+        Retrieve the relevant Docker container image.
+
+        Returns True upon success.
+        """
+        found = False
+
+        if (
+            "dockerImageId" not in docker_requirement
+            and "dockerPull" in docker_requirement
+        ):
+            docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]
+
+        with _IMAGES_LOCK:
+            if docker_requirement["dockerImageId"] in _IMAGES:
+                return True
+
+        for line in (
+            subprocess.check_output(  # nosec
+                ["docker", "images", "--no-trunc", "--all"]
+            )
+            .decode("utf-8")
+            .splitlines()
+        ):
+            try:
+                match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
+                split = docker_requirement["dockerImageId"].split(":")
+                if len(split) == 1:
+                    split.append("latest")
+                elif len(split) == 2:
+                    # if split[1] doesn't match a valid tag name, it is part of the repository
+                    if not re.match(r"[\w][\w.-]{0,127}", split[1]):
+                        split[0] = split[0] + ":" + split[1]
+                        split[1] = "latest"
+                elif len(split) == 3:
+                    if re.match(r"[\w][\w.-]{0,127}", split[2]):
+                        split[0] = split[0] + ":" + split[1]
+                        split[1] = split[2]
+                        del split[2]
+
+                # check for repository:tag match or image id match
+                if match and (
+                    (split[0] == match.group(1) and split[1] == match.group(2))
+                    or docker_requirement["dockerImageId"] == match.group(3)
+                ):
+                    found = True
+                    break
+            except ValueError:
+                pass
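+        # Illustrative trace (editor's note, not in the upstream source) of the
+        # repository/tag normalisation above:
+        #   "ubuntu"                           -> repo "ubuntu", tag "latest"
+        #   "quay.io/biocontainers/bwa:0.7.17" -> repo "quay.io/biocontainers/bwa", tag "0.7.17"
+        #   "localhost:5000/ubuntu:20.04"      -> repo "localhost:5000/ubuntu", tag "20.04"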
+
+        if (force_pull or not found) and pull_image:
+            cmd = []  # type: List[str]
+            if "dockerPull" in docker_requirement:
+                cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
+                _logger.info(str(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
+                found = True
+            elif "dockerFile" in docker_requirement:
+                dockerfile_dir = create_tmp_dir(tmp_outdir_prefix)
+                with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as dfile:
+                    dfile.write(docker_requirement["dockerFile"].encode("utf-8"))
+                cmd = [
+                    "docker",
+                    "build",
+                    "--tag=%s" % str(docker_requirement["dockerImageId"]),
+                    dockerfile_dir,
+                ]
+                _logger.info(str(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
+                found = True
+            elif "dockerLoad" in docker_requirement:
+                cmd = ["docker", "load"]
+                _logger.info(str(cmd))
+                if os.path.exists(docker_requirement["dockerLoad"]):
+                    _logger.info(
+                        "Loading docker image from %s",
+                        docker_requirement["dockerLoad"],
+                    )
+                    with open(docker_requirement["dockerLoad"], "rb") as dload:
+                        loadproc = subprocess.Popen(  # nosec
+                            cmd, stdin=dload, stdout=sys.stderr
+                        )
+                else:
+                    loadproc = subprocess.Popen(  # nosec
+                        cmd, stdin=subprocess.PIPE, stdout=sys.stderr
+                    )
+                    assert loadproc.stdin is not None  # nosec
+                    _logger.info(
+                        "Sending GET request to %s", docker_requirement["dockerLoad"]
+                    )
+                    req = requests.get(docker_requirement["dockerLoad"], stream=True)
+                    size = 0
+                    for chunk in req.iter_content(1024 * 1024):
+                        size += len(chunk)
+                        _logger.info("\r%i bytes", size)
+                        loadproc.stdin.write(chunk)
+                    loadproc.stdin.close()
+                rcode = loadproc.wait()
+                if rcode != 0:
+                    raise WorkflowException(
+                        "Docker load returned non-zero exit status %i" % (rcode)
+                    )
+                found = True
+            elif "dockerImport" in docker_requirement:
+                cmd = [
+                    "docker",
+                    "import",
+                    str(docker_requirement["dockerImport"]),
+                    str(docker_requirement["dockerImageId"]),
+                ]
+                _logger.info(str(cmd))
+                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
+                found = True
+
+        if found:
+            with _IMAGES_LOCK:
+                _IMAGES.add(docker_requirement["dockerImageId"])
+
+        return found
+
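+    # Usage sketch (editor's illustration, not part of cwltool itself): since
+    # get_image() is a staticmethod it can be called directly, e.g.
+    #   DockerCommandLineJob.get_image(
+    #       {"class": "DockerRequirement", "dockerPull": "ubuntu:20.04"},
+    #       pull_image=True, force_pull=False, tmp_outdir_prefix="/tmp/",
+    #   )
+    # which returns True once the image is available locally.
+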
+    def get_from_requirements(
+        self,
+        r: CWLObjectType,
+        pull_image: bool,
+        force_pull: bool,
+        tmp_outdir_prefix: str,
+    ) -> Optional[str]:
+        if not spawn.find_executable("docker"):
+            raise WorkflowException("docker executable is not available")
+
+        if self.get_image(
+            cast(Dict[str, str], r), pull_image, force_pull, tmp_outdir_prefix
+        ):
+            return cast(Optional[str], r["dockerImageId"])
+        raise WorkflowException("Docker image %s not found" % r["dockerImageId"])
+
+    @staticmethod
+    def append_volume(
+        runtime: List[str], source: str, target: str, writable: bool = False
+    ) -> None:
+        """Add binding arguments to the runtime list."""
+        options = [
+            "type=bind",
+            "source=" + source,
+            "target=" + target,
+        ]
+        if not writable:
+            options.append("readonly")
+        output = StringIO()
+        csv.writer(output).writerow(options)
+        mount_arg = output.getvalue().strip()
+        runtime.append(f"--mount={mount_arg}")
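+        # Example (editor's illustration): for source="/host/data",
+        # target="/var/lib/cwl/data" and writable=False this appends
+        #   --mount=type=bind,source=/host/data,target=/var/lib/cwl/data,readonly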
+        # Unlike "--volume", "--mount" will fail if the volume doesn't already exist.
+        if not os.path.exists(source):
+            os.makedirs(source)
+
+    def add_file_or_directory_volume(
+        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
+    ) -> None:
+        """Append volume a file/dir mapping to the runtime option list."""
+        if not volume.resolved.startswith("_:"):
+            _check_docker_machine_path(docker_windows_path_adjust(volume.resolved))
+            self.append_volume(runtime, volume.resolved, volume.target)
+
+    def add_writable_file_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        tmpdir_prefix: str,
+    ) -> None:
+        """Append a writable file mapping to the runtime option list."""
+        if self.inplace_update:
+            self.append_volume(runtime, volume.resolved, volume.target, writable=True)
+        else:
+            if host_outdir_tgt:
+                # shortcut, just copy to the output directory
+                # which is already going to be mounted
+                if not os.path.exists(os.path.dirname(host_outdir_tgt)):
+                    os.makedirs(os.path.dirname(host_outdir_tgt))
+                shutil.copy(volume.resolved, host_outdir_tgt)
+            else:
+                tmpdir = create_tmp_dir(tmpdir_prefix)
+                file_copy = os.path.join(tmpdir, os.path.basename(volume.resolved))
+                shutil.copy(volume.resolved, file_copy)
+                self.append_volume(runtime, file_copy, volume.target, writable=True)
+            ensure_writable(host_outdir_tgt or file_copy)
+
+    def add_writable_directory_volume(
+        self,
+        runtime: List[str],
+        volume: MapperEnt,
+        host_outdir_tgt: Optional[str],
+        tmpdir_prefix: str,
+    ) -> None:
+        """Append a writable directory mapping to the runtime option list."""
+        if volume.resolved.startswith("_:"):
+            # Synthetic directory that needs creating first
+            if not host_outdir_tgt:
+                new_dir = os.path.join(
+                    create_tmp_dir(tmpdir_prefix),
+                    os.path.basename(volume.target),
+                )
+                self.append_volume(runtime, new_dir, volume.target, writable=True)
+            elif not os.path.exists(host_outdir_tgt):
+                os.makedirs(host_outdir_tgt)
+        else:
+            if self.inplace_update:
+                self.append_volume(
+                    runtime, volume.resolved, volume.target, writable=True
+                )
+            else:
+                if not host_outdir_tgt:
+                    tmpdir = create_tmp_dir(tmpdir_prefix)
+                    new_dir = os.path.join(tmpdir, os.path.basename(volume.resolved))
+                    shutil.copytree(volume.resolved, new_dir)
+                    self.append_volume(runtime, new_dir, volume.target, writable=True)
+                else:
+                    shutil.copytree(volume.resolved, host_outdir_tgt)
+                ensure_writable(host_outdir_tgt or new_dir)
+
+    def create_runtime(
+        self, env: MutableMapping[str, str], runtimeContext: RuntimeContext
+    ) -> Tuple[List[str], Optional[str]]:
+        any_path_okay = self.builder.get_requirement("DockerRequirement")[1] or False
+        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
+        if user_space_docker_cmd:
+            if "udocker" in user_space_docker_cmd and not runtimeContext.debug:
+                runtime = [user_space_docker_cmd, "--quiet", "run"]
+                # udocker 1.1.1 will output diagnostic messages to stdout
+                # without this
+            else:
+                runtime = [user_space_docker_cmd, "run"]
+        else:
+            runtime = ["docker", "run", "-i"]
+        self.append_volume(
+            runtime, os.path.realpath(self.outdir), self.builder.outdir, writable=True
+        )
+        tmpdir = "/tmp"  # nosec
+        self.append_volume(
+            runtime, os.path.realpath(self.tmpdir), tmpdir, writable=True
+        )
+        self.add_volumes(
+            self.pathmapper,
+            runtime,
+            any_path_okay=True,
+            secret_store=runtimeContext.secret_store,
+            tmpdir_prefix=runtimeContext.tmpdir_prefix,
+        )
+        if self.generatemapper is not None:
+            self.add_volumes(
+                self.generatemapper,
+                runtime,
+                any_path_okay=any_path_okay,
+                secret_store=runtimeContext.secret_store,
+                tmpdir_prefix=runtimeContext.tmpdir_prefix,
+            )
+
+        if user_space_docker_cmd:
+            runtime = [x.replace(":ro", "") for x in runtime]
+            runtime = [x.replace(":rw", "") for x in runtime]
+
+        runtime.append(
+            "--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir))
+        )
+        if not user_space_docker_cmd:
+
+            if not runtimeContext.no_read_only:
+                runtime.append("--read-only=true")
+
+            if self.networkaccess:
+                if runtimeContext.custom_net:
+                    runtime.append(f"--net={runtimeContext.custom_net}")
+            else:
+                runtime.append("--net=none")
+
+            if self.stdout is not None:
+                runtime.append("--log-driver=none")
+
+            euid, egid = docker_vm_id()
+            if not onWindows():
+                # MS Windows does not have getuid() or geteuid() functions
+                euid, egid = euid or os.geteuid(), egid or os.getgid()
+
+            if runtimeContext.no_match_user is False and (
+                euid is not None and egid is not None
+            ):
+                runtime.append("--user=%d:%d" % (euid, egid))
+
+        if runtimeContext.rm_container:
+            runtime.append("--rm")
+
+        runtime.append("--env=TMPDIR=/tmp")
+
+        # spec currently says "HOME must be set to the designated output
+        # directory." but spec might change to designated temp directory.
+        # runtime.append("--env=HOME=/tmp")
+        runtime.append("--env=HOME=%s" % self.builder.outdir)
+
+        cidfile_path = None  # type: Optional[str]
+        # add parameters to docker to write a container ID file
+        if runtimeContext.user_space_docker_cmd is None:
+            if runtimeContext.cidfile_dir:
+                cidfile_dir = runtimeContext.cidfile_dir
+                if not os.path.exists(str(cidfile_dir)):
+                    _logger.error(
+                        "--cidfile-dir %s error:\n%s",
+                        cidfile_dir,
+                        "directory doesn't exist, please create it first",
+                    )
+                    exit(2)
+                if not os.path.isdir(cidfile_dir):
+                    _logger.error(
+                        "--cidfile-dir %s error:\n%s",
+                        cidfile_dir,
+                        cidfile_dir + " is not a directory, please check it first",
+                    )
+                    exit(2)
+            else:
+                cidfile_dir = runtimeContext.create_tmpdir()
+
+            cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
+            if runtimeContext.cidfile_prefix is not None:
+                cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
+            cidfile_path = os.path.join(cidfile_dir, cidfile_name)
+            runtime.append("--cidfile=%s" % cidfile_path)
+        for key, value in self.environment.items():
+            runtime.append(f"--env={key}={value}")
+
+        if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
+            ram = self.builder.resources["ram"]
+            if not isinstance(ram, str):
+                runtime.append("--memory=%dm" % ram)
+        elif not user_space_docker_cmd:
+            res_req, _ = self.builder.get_requirement("ResourceRequirement")
+            if res_req and ("ramMin" in res_req or "ramMax" in res_req):
+                _logger.warning(
+                    "[job %s] Skipping Docker software container '--memory' limit "
+                    "despite presence of ResourceRequirement with ramMin "
+                    "and/or ramMax setting. Consider running with "
+                    "--strict-memory-limit for increased portability "
+                    "assurance.",
+                    self.name,
+                )
+
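+        # Editor's note (illustrative only; the exact flags depend on the
+        # RuntimeContext and the tool's requirements): a plain Docker invocation
+        # assembled above typically begins with something like
+        #   docker run -i --mount=type=bind,source=...,target=...
+        #   --workdir=<container outdir> --read-only=true --net=none
+        #   --user=<uid>:<gid> --rm --env=TMPDIR=/tmp --env=HOME=<container outdir>
+        #   --cidfile=<path>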
+        return runtime, cidfile_path