view env/lib/python3.9/site-packages/cwltool/docker.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Enables Docker software containers via the {u,}docker runtimes."""

import csv
import datetime
import os
import re
import shutil
import subprocess  # nosec
import sys
import threading
from distutils import spawn
from io import StringIO  # pylint: disable=redefined-builtin
from typing import Callable, Dict, List, MutableMapping, Optional, Set, Tuple, cast

import requests

from .builder import Builder
from .context import RuntimeContext
from .docker_id import docker_vm_id
from .errors import WorkflowException
from .job import ContainerCommandLineJob
from .loghandler import _logger
from .pathmapper import MapperEnt, PathMapper
from .utils import (
    CWLObjectType,
    create_tmp_dir,
    docker_windows_path_adjust,
    ensure_writable,
    onWindows,
)

# Image identifiers that DockerCommandLineJob.get_image has already confirmed
# to be available; consulted first so repeated requests skip the
# "docker images" scan.
_IMAGES = set()  # type: Set[str]
# Held while _IMAGES is read or extended.
_IMAGES_LOCK = threading.Lock()
# Cached return value of _get_docker_machine_mounts(); None until that
# function has computed it once.
__docker_machine_mounts = None  # type: Optional[List[str]]
# Held while __docker_machine_mounts is being populated.
__docker_machine_mounts_lock = threading.Lock()


def _get_docker_machine_mounts() -> List[str]:
    """
    Return the cached list of docker-machine shared-folder prefixes.

    When the ``DOCKER_MACHINE_NAME`` environment variable is not set the
    result is an empty list.  Otherwise ``docker-machine ssh <name> mount -t
    vboxsf`` is run once and each non-blank output line contributes ``"/"``
    followed by its first whitespace-delimited token.

    The result is stored in the module-level cache, so the external command
    runs at most once per process.

    Raises whatever ``subprocess.check_output`` raises if the
    ``docker-machine`` invocation fails.
    """
    global __docker_machine_mounts
    mounts = __docker_machine_mounts
    if mounts is None:
        with __docker_machine_mounts_lock:
            # Re-read under the lock: another thread may have filled the
            # cache while this one was waiting, and the docker-machine call
            # must not be repeated in that case.
            mounts = __docker_machine_mounts
            if mounts is None:
                machine_name = os.environ.get("DOCKER_MACHINE_NAME")
                if machine_name is None:
                    mounts = []
                else:
                    listing = subprocess.check_output(  # nosec
                        [
                            "docker-machine",
                            "ssh",
                            machine_name,
                            "mount",
                            "-t",
                            "vboxsf",
                        ],
                        universal_newlines=True,
                    )
                    # Blank lines carry no token and would otherwise raise
                    # IndexError on the [0] lookup.
                    mounts = [
                        "/" + line.split(None, 1)[0]
                        for line in listing.splitlines()
                        if line.strip()
                    ]
                __docker_machine_mounts = mounts
    return mounts


def _check_docker_machine_path(path: Optional[str]) -> None:
    """
    Verify that a host path lies under one of the docker-machine mounts.

    Nothing is checked when *path* is None or when the mount list returned
    by ``_get_docker_machine_mounts`` is empty.  On Windows both the path and
    the mounts are lower-cased before the prefix comparison.

    Raises WorkflowException if mounts exist and none of them is a prefix of
    *path*.
    """
    if path is None:
        return

    windows = onWindows()
    if windows:
        path = path.lower()

    mounts = _get_docker_machine_mounts()
    if not mounts:
        # No mount information available: nothing to validate against.
        return

    if any(
        path.startswith(entry.lower() if windows else entry) for entry in mounts
    ):
        return

    name = os.environ.get("DOCKER_MACHINE_NAME", "???")
    raise WorkflowException(
        "Input path {path} is not in the list of host paths mounted "
        "into the Docker virtual machine named {name}. Already mounted "
        "paths: {mounts}.\n"
        "See https://docs.docker.com/toolbox/toolbox_install_windows/"
        "#optional-add-shared-directories for instructions on how to "
        "add this path to your VM.".format(path=path, name=name, mounts=mounts)
    )


class DockerCommandLineJob(ContainerCommandLineJob):
    """Runs a CommandLineJob in a software container using the Docker engine."""

    def __init__(
        self,
        builder: Builder,
        joborder: CWLObjectType,
        make_path_mapper: Callable[..., PathMapper],
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        name: str,
    ) -> None:
        """
        Initialize a command line builder using the Docker software container engine.

        Every argument is forwarded unchanged, in the same order, to the
        parent class initializer; no additional state is set here.
        """
        super().__init__(builder, joborder, make_path_mapper, requirements, hints, name)

    @staticmethod
    def get_image(
        docker_requirement: Dict[str, str],
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> bool:
        """
        Retrieve the relevant Docker container image.

        The image named by ``dockerImageId`` (defaulting to ``dockerPull``,
        in which case ``docker_requirement`` is updated in place) is looked up
        first in the in-process ``_IMAGES`` cache and then in the output of
        ``docker images``.  If it is missing, or *force_pull* is set, and
        *pull_image* is true, the image is obtained with the first applicable
        of ``dockerPull``, ``dockerFile``, ``dockerLoad`` or ``dockerImport``.

        :param docker_requirement: the DockerRequirement fields
        :param pull_image: whether fetching/building the image is allowed
        :param force_pull: fetch even if the image is already present
        :param tmp_outdir_prefix: prefix for the temporary directory that
            holds the generated ``Dockerfile`` when building from
            ``dockerFile``
        :raises KeyError: if neither ``dockerImageId`` nor ``dockerPull`` is
            present
        :raises subprocess.CalledProcessError: if a ``docker`` command run via
            ``check_output``/``check_call`` fails
        :raises WorkflowException: if ``docker load`` exits non-zero

        Returns True upon success
        """
        found = False

        if (
            "dockerImageId" not in docker_requirement
            and "dockerPull" in docker_requirement
        ):
            docker_requirement["dockerImageId"] = docker_requirement["dockerPull"]

        image_id = docker_requirement["dockerImageId"]

        with _IMAGES_LOCK:
            if image_id in _IMAGES:
                return True

        # Split the reference into [repository, tag] once; it does not depend
        # on the individual "docker images" lines examined below.
        valid_tag = re.compile(r"[\w][\w.-]{0,127}")
        split = image_id.split(":")
        if len(split) == 1:
            split.append("latest")
        elif len(split) == 2:
            # If split[1] is not entirely a valid tag name it is part of the
            # repository (e.g. "registry:5000/image").  fullmatch is required:
            # a prefix match would accept "5000/image" because of its leading
            # "5000".
            if not valid_tag.fullmatch(split[1]):
                split[0] = split[0] + ":" + split[1]
                split[1] = "latest"
        elif len(split) == 3:
            # "registry:port/image:tag" -> rejoin the first two pieces.
            if valid_tag.fullmatch(split[2]):
                split[0] = split[0] + ":" + split[1]
                split[1] = split[2]
                del split[2]

        listing = subprocess.check_output(  # nosec
            ["docker", "images", "--no-trunc", "--all"]
        ).decode("utf-8")
        for line in listing.splitlines():
            # First three space-separated columns of a listing line.
            match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
            # check for repository:tag match or image id match
            if match and (
                (split[0] == match.group(1) and split[1] == match.group(2))
                or image_id == match.group(3)
            ):
                found = True
                break

        if (force_pull or not found) and pull_image:
            cmd = []  # type: List[str]
            if "dockerPull" in docker_requirement:
                cmd = ["docker", "pull", str(docker_requirement["dockerPull"])]
                _logger.info(str(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                found = True
            elif "dockerFile" in docker_requirement:
                # Write the inline Dockerfile text to a fresh directory and
                # build from there, tagging the result with the image id.
                dockerfile_dir = create_tmp_dir(tmp_outdir_prefix)
                with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as dfile:
                    dfile.write(docker_requirement["dockerFile"].encode("utf-8"))
                cmd = [
                    "docker",
                    "build",
                    "--tag=%s" % str(docker_requirement["dockerImageId"]),
                    dockerfile_dir,
                ]
                _logger.info(str(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                found = True
            elif "dockerLoad" in docker_requirement:
                cmd = ["docker", "load"]
                _logger.info(str(cmd))
                if os.path.exists(docker_requirement["dockerLoad"]):
                    # Local archive: hand the open file to "docker load" as
                    # its standard input.
                    _logger.info(
                        "Loading docker image from %s",
                        docker_requirement["dockerLoad"],
                    )
                    with open(docker_requirement["dockerLoad"], "rb") as dload:
                        loadproc = subprocess.Popen(  # nosec
                            cmd, stdin=dload, stdout=sys.stderr
                        )
                else:
                    # Not an existing local path: treat the value as a URL
                    # and stream the download into "docker load".
                    loadproc = subprocess.Popen(  # nosec
                        cmd, stdin=subprocess.PIPE, stdout=sys.stderr
                    )
                    assert loadproc.stdin is not None  # nosec
                    _logger.info(
                        "Sending GET request to %s", docker_requirement["dockerLoad"]
                    )
                    req = requests.get(docker_requirement["dockerLoad"], stream=True)
                    size = 0
                    for chunk in req.iter_content(1024 * 1024):
                        size += len(chunk)
                        _logger.info("\r%i bytes", size)
                        loadproc.stdin.write(chunk)
                    loadproc.stdin.close()
                rcode = loadproc.wait()
                if rcode != 0:
                    raise WorkflowException(
                        "Docker load returned non-zero exit status %i" % (rcode)
                    )
                found = True
            elif "dockerImport" in docker_requirement:
                cmd = [
                    "docker",
                    "import",
                    str(docker_requirement["dockerImport"]),
                    str(docker_requirement["dockerImageId"]),
                ]
                _logger.info(str(cmd))
                subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                found = True

        if found:
            # Remember the image so later calls return before the listing.
            with _IMAGES_LOCK:
                _IMAGES.add(docker_requirement["dockerImageId"])

        return found

    def get_from_requirements(
        self,
        r: CWLObjectType,
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> Optional[str]:
        """
        Return the image id for requirement *r*, obtaining the image if needed.

        :param r: the DockerRequirement mapping, passed on to ``get_image``
        :param pull_image: whether ``get_image`` may fetch or build the image
        :param force_pull: fetch even if the image is already present
        :param tmp_outdir_prefix: temporary-directory prefix forwarded to
            ``get_image``
        :raises WorkflowException: if no ``docker`` executable is found on
            the search path, or if ``get_image`` reports the image as
            unavailable
        """
        # shutil.which only reports files that are actually executable and
        # replaces the deprecated distutils.spawn.find_executable lookup.
        if not shutil.which("docker"):
            raise WorkflowException("docker executable is not available")

        if self.get_image(
            cast(Dict[str, str], r), pull_image, force_pull, tmp_outdir_prefix
        ):
            return cast(Optional[str], r["dockerImageId"])
        raise WorkflowException("Docker image %s not found" % r["dockerImageId"])

    @staticmethod
    def append_volume(
        runtime: List[str], source: str, target: str, writable: bool = False
    ) -> None:
        """Add binding arguments to the runtime list."""
        options = [
            "type=bind",
            "source=" + source,
            "target=" + target,
        ]
        if not writable:
            options.append("readonly")
        output = StringIO()
        csv.writer(output).writerow(options)
        mount_arg = output.getvalue().strip()
        runtime.append(f"--mount={mount_arg}")
        # Unlike "--volume", "--mount" will fail if the volume doesn't already exist.
        if not os.path.exists(source):
            os.makedirs(source)

    def add_file_or_directory_volume(
        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
    ) -> None:
        """Append volume a file/dir mapping to the runtime option list."""
        if not volume.resolved.startswith("_:"):
            _check_docker_machine_path(docker_windows_path_adjust(volume.resolved))
            self.append_volume(runtime, volume.resolved, volume.target)

    def add_writable_file_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable file mapping to the runtime option list."""
        if self.inplace_update:
            self.append_volume(runtime, volume.resolved, volume.target, writable=True)
        else:
            if host_outdir_tgt:
                # shortcut, just copy to the output directory
                # which is already going to be mounted
                if not os.path.exists(os.path.dirname(host_outdir_tgt)):
                    os.makedirs(os.path.dirname(host_outdir_tgt))
                shutil.copy(volume.resolved, host_outdir_tgt)
            else:
                tmpdir = create_tmp_dir(tmpdir_prefix)
                file_copy = os.path.join(tmpdir, os.path.basename(volume.resolved))
                shutil.copy(volume.resolved, file_copy)
                self.append_volume(runtime, file_copy, volume.target, writable=True)
            ensure_writable(host_outdir_tgt or file_copy)

    def add_writable_directory_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable directory mapping to the runtime option list."""
        if volume.resolved.startswith("_:"):
            # Synthetic directory that needs creating first
            if not host_outdir_tgt:
                new_dir = os.path.join(
                    create_tmp_dir(tmpdir_prefix),
                    os.path.basename(volume.target),
                )
                self.append_volume(runtime, new_dir, volume.target, writable=True)
            elif not os.path.exists(host_outdir_tgt):
                os.makedirs(host_outdir_tgt)
        else:
            if self.inplace_update:
                self.append_volume(
                    runtime, volume.resolved, volume.target, writable=True
                )
            else:
                if not host_outdir_tgt:
                    tmpdir = create_tmp_dir(tmpdir_prefix)
                    new_dir = os.path.join(tmpdir, os.path.basename(volume.resolved))
                    shutil.copytree(volume.resolved, new_dir)
                    self.append_volume(runtime, new_dir, volume.target, writable=True)
                else:
                    shutil.copytree(volume.resolved, host_outdir_tgt)
                ensure_writable(host_outdir_tgt or new_dir)

    def create_runtime(
        self, env: MutableMapping[str, str], runtimeContext: RuntimeContext
    ) -> Tuple[List[str], Optional[str]]:
        """
        Build the container engine command line prefix for this job.

        :param env: not read by this method; the variables passed to the
            container come from ``self.environment``
        :param runtimeContext: supplies the engine choice
            (``user_space_docker_cmd``), temporary-directory prefix, network,
            user-matching, cidfile and memory-limit settings
        :returns: a tuple of the argument list (engine executable, ``run``
            and its options) and the path of the container ID file, or None
            when no ``--cidfile`` option was added
        :raises SystemExit: with status 2 if ``runtimeContext.cidfile_dir`` is
            set but does not exist or is not a directory
        """
        any_path_okay = self.builder.get_requirement("DockerRequirement")[1] or False
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if user_space_docker_cmd:
            if "udocker" in user_space_docker_cmd and not runtimeContext.debug:
                runtime = [user_space_docker_cmd, "--quiet", "run"]
                # udocker 1.1.1 will output diagnostic messages to stdout
                # without this
            else:
                runtime = [user_space_docker_cmd, "run"]
        else:
            runtime = ["docker", "run", "-i"]
        # The job's output and temporary directories are always mounted
        # writable.
        self.append_volume(
            runtime, os.path.realpath(self.outdir), self.builder.outdir, writable=True
        )
        tmpdir = "/tmp"  # nosec
        self.append_volume(
            runtime, os.path.realpath(self.tmpdir), tmpdir, writable=True
        )
        self.add_volumes(
            self.pathmapper,
            runtime,
            any_path_okay=True,
            secret_store=runtimeContext.secret_store,
            tmpdir_prefix=runtimeContext.tmpdir_prefix,
        )
        if self.generatemapper is not None:
            self.add_volumes(
                self.generatemapper,
                runtime,
                any_path_okay=any_path_okay,
                secret_store=runtimeContext.secret_store,
                tmpdir_prefix=runtimeContext.tmpdir_prefix,
            )

        if user_space_docker_cmd:
            # NOTE(review): this strips every ":ro"/":rw" substring from every
            # argument, including inside paths; presumably it targets
            # "--volume"-style suffixes -- confirm it is still needed now that
            # append_volume emits "--mount" options.
            runtime = [x.replace(":ro", "") for x in runtime]
            runtime = [x.replace(":rw", "") for x in runtime]

        runtime.append(
            "--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir))
        )
        if not user_space_docker_cmd:
            # Options below are only passed to the real "docker" executable.

            if not runtimeContext.no_read_only:
                runtime.append("--read-only=true")

            if self.networkaccess:
                if runtimeContext.custom_net:
                    runtime.append(f"--net={runtimeContext.custom_net}")
            else:
                runtime.append("--net=none")

            if self.stdout is not None:
                runtime.append("--log-driver=none")

            euid, egid = docker_vm_id()
            if not onWindows():
                # MS Windows does not have getuid() or geteuid() functions
                euid, egid = euid or os.geteuid(), egid or os.getgid()

            if runtimeContext.no_match_user is False and (
                euid is not None and egid is not None
            ):
                runtime.append("--user=%d:%d" % (euid, egid))

        if runtimeContext.rm_container:
            runtime.append("--rm")

        runtime.append("--env=TMPDIR=/tmp")

        # spec currently says "HOME must be set to the designated output
        # directory." but spec might change to designated temp directory.
        # runtime.append("--env=HOME=/tmp")
        runtime.append("--env=HOME=%s" % self.builder.outdir)

        cidfile_path = None  # type: Optional[str]
        # add parameters to docker to write a container ID file
        if runtimeContext.user_space_docker_cmd is None:
            if runtimeContext.cidfile_dir:
                cidfile_dir = runtimeContext.cidfile_dir
                if not os.path.exists(str(cidfile_dir)):
                    _logger.error(
                        "--cidfile-dir %s error:\n%s",
                        cidfile_dir,
                        "directory doesn't exist, please create it first",
                    )
                    # sys.exit rather than the builtin exit(): the latter is
                    # injected by the site module and is not always defined.
                    sys.exit(2)
                if not os.path.isdir(cidfile_dir):
                    _logger.error(
                        "--cidfile-dir %s error:\n%s",
                        cidfile_dir,
                        cidfile_dir + " is not a directory, please check it first",
                    )
                    sys.exit(2)
            else:
                cidfile_dir = runtimeContext.create_tmpdir()

            # Timestamp with microseconds, optionally prefixed, names the file.
            cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid"
            if runtimeContext.cidfile_prefix is not None:
                cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name)
            cidfile_path = os.path.join(cidfile_dir, cidfile_name)
            runtime.append("--cidfile=%s" % cidfile_path)
        for key, value in self.environment.items():
            runtime.append(f"--env={key}={value}")

        if runtimeContext.strict_memory_limit and not user_space_docker_cmd:
            ram = self.builder.resources["ram"]
            # A string-valued "ram" is not turned into a --memory limit.
            if not isinstance(ram, str):
                runtime.append("--memory=%dm" % ram)
        elif not user_space_docker_cmd:
            res_req, _ = self.builder.get_requirement("ResourceRequirement")
            if res_req and ("ramMin" in res_req or "ramMax" in res_req):
                _logger.warning(
                    "[job %s] Skipping Docker software container '--memory' limit "
                    "despite presence of ResourceRequirement with ramMin "
                    "and/or ramMax setting. Consider running with "
                    "--strict-memory-limit for increased portability "
                    "assurance.",
                    self.name,
                )

        return runtime, cidfile_path