Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/docker.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/docker.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,456 @@ +"""Enables Docker software containers via the {u,}docker runtimes.""" + +import csv +import datetime +import os +import re +import shutil +import subprocess # nosec +import sys +import threading +from distutils import spawn +from io import StringIO # pylint: disable=redefined-builtin +from typing import Callable, Dict, List, MutableMapping, Optional, Set, Tuple, cast + +import requests + +from .builder import Builder +from .context import RuntimeContext +from .docker_id import docker_vm_id +from .errors import WorkflowException +from .job import ContainerCommandLineJob +from .loghandler import _logger +from .pathmapper import MapperEnt, PathMapper +from .utils import ( + CWLObjectType, + create_tmp_dir, + docker_windows_path_adjust, + ensure_writable, + onWindows, +) + +_IMAGES = set() # type: Set[str] +_IMAGES_LOCK = threading.Lock() +__docker_machine_mounts = None # type: Optional[List[str]] +__docker_machine_mounts_lock = threading.Lock() + + +def _get_docker_machine_mounts() -> List[str]: + global __docker_machine_mounts + if __docker_machine_mounts is None: + with __docker_machine_mounts_lock: + if "DOCKER_MACHINE_NAME" not in os.environ: + __docker_machine_mounts = [] + else: + __docker_machine_mounts = [ + "/" + line.split(None, 1)[0] + for line in subprocess.check_output( # nosec + [ + "docker-machine", + "ssh", + os.environ["DOCKER_MACHINE_NAME"], + "mount", + "-t", + "vboxsf", + ], + universal_newlines=True, + ).splitlines() + ] + return __docker_machine_mounts + + +def _check_docker_machine_path(path: Optional[str]) -> None: + if path is None: + return + if onWindows(): + path = path.lower() + mounts = _get_docker_machine_mounts() + + found = False + for mount in mounts: + if onWindows(): + mount = mount.lower() + if path.startswith(mount): + found = True + break + + if not found and mounts: + name = os.environ.get("DOCKER_MACHINE_NAME", "???") + raise WorkflowException( + "Input path {path} is not in the list of host paths mounted " + "into the Docker virtual machine named {name}. Already mounted " + "paths: {mounts}.\n" + "See https://docs.docker.com/toolbox/toolbox_install_windows/" + "#optional-add-shared-directories for instructions on how to " + "add this path to your VM.".format(path=path, name=name, mounts=mounts) + ) + + +class DockerCommandLineJob(ContainerCommandLineJob): + """Runs a CommandLineJob in a sofware container using the Docker engine.""" + + def __init__( + self, + builder: Builder, + joborder: CWLObjectType, + make_path_mapper: Callable[..., PathMapper], + requirements: List[CWLObjectType], + hints: List[CWLObjectType], + name: str, + ) -> None: + """Initialize a command line builder using the Docker software container engine.""" + super().__init__(builder, joborder, make_path_mapper, requirements, hints, name) + + @staticmethod + def get_image( + docker_requirement: Dict[str, str], + pull_image: bool, + force_pull: bool, + tmp_outdir_prefix: str, + ) -> bool: + """ + Retrieve the relevant Docker container image. + + Returns True upon success + """ + found = False + + if ( + "dockerImageId" not in docker_requirement + and "dockerPull" in docker_requirement + ): + docker_requirement["dockerImageId"] = docker_requirement["dockerPull"] + + with _IMAGES_LOCK: + if docker_requirement["dockerImageId"] in _IMAGES: + return True + + for line in ( + subprocess.check_output( # nosec + ["docker", "images", "--no-trunc", "--all"] + ) + .decode("utf-8") + .splitlines() + ): + try: + match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line) + split = docker_requirement["dockerImageId"].split(":") + if len(split) == 1: + split.append("latest") + elif len(split) == 2: + # if split[1] doesn't match valid tag names, it is a part of repository + if not re.match(r"[\w][\w.-]{0,127}", split[1]): + split[0] = split[0] + ":" + split[1] + split[1] = "latest" + elif len(split) == 3: + if re.match(r"[\w][\w.-]{0,127}", split[2]): + split[0] = split[0] + ":" + split[1] + split[1] = split[2] + del split[2] + + # check for repository:tag match or image id match + if match and ( + (split[0] == match.group(1) and split[1] == match.group(2)) + or docker_requirement["dockerImageId"] == match.group(3) + ): + found = True + break + except ValueError: + pass + + if (force_pull or not found) and pull_image: + cmd = [] # type: List[str] + if "dockerPull" in docker_requirement: + cmd = ["docker", "pull", str(docker_requirement["dockerPull"])] + _logger.info(str(cmd)) + subprocess.check_call(cmd, stdout=sys.stderr) # nosec + found = True + elif "dockerFile" in docker_requirement: + dockerfile_dir = create_tmp_dir(tmp_outdir_prefix) + with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as dfile: + dfile.write(docker_requirement["dockerFile"].encode("utf-8")) + cmd = [ + "docker", + "build", + "--tag=%s" % str(docker_requirement["dockerImageId"]), + dockerfile_dir, + ] + _logger.info(str(cmd)) + subprocess.check_call(cmd, stdout=sys.stderr) # nosec + found = True + elif "dockerLoad" in docker_requirement: + cmd = ["docker", "load"] + _logger.info(str(cmd)) + if os.path.exists(docker_requirement["dockerLoad"]): + _logger.info( + "Loading docker image from %s", + docker_requirement["dockerLoad"], + ) + with open(docker_requirement["dockerLoad"], "rb") as dload: + loadproc = subprocess.Popen( # nosec + cmd, stdin=dload, stdout=sys.stderr + ) + else: + loadproc = subprocess.Popen( # nosec + cmd, stdin=subprocess.PIPE, stdout=sys.stderr + ) + assert loadproc.stdin is not None # nosec + _logger.info( + "Sending GET request to %s", docker_requirement["dockerLoad"] + ) + req = requests.get(docker_requirement["dockerLoad"], stream=True) + size = 0 + for chunk in req.iter_content(1024 * 1024): + size += len(chunk) + _logger.info("\r%i bytes", size) + loadproc.stdin.write(chunk) + loadproc.stdin.close() + rcode = loadproc.wait() + if rcode != 0: + raise WorkflowException( + "Docker load returned non-zero exit status %i" % (rcode) + ) + found = True + elif "dockerImport" in docker_requirement: + cmd = [ + "docker", + "import", + str(docker_requirement["dockerImport"]), + str(docker_requirement["dockerImageId"]), + ] + _logger.info(str(cmd)) + subprocess.check_call(cmd, stdout=sys.stderr) # nosec + found = True + + if found: + with _IMAGES_LOCK: + _IMAGES.add(docker_requirement["dockerImageId"]) + + return found + + def get_from_requirements( + self, + r: CWLObjectType, + pull_image: bool, + force_pull: bool, + tmp_outdir_prefix: str, + ) -> Optional[str]: + if not spawn.find_executable("docker"): + raise WorkflowException("docker executable is not available") + + if self.get_image( + cast(Dict[str, str], r), pull_image, force_pull, tmp_outdir_prefix + ): + return cast(Optional[str], r["dockerImageId"]) + raise WorkflowException("Docker image %s not found" % r["dockerImageId"]) + + @staticmethod + def append_volume( + runtime: List[str], source: str, target: str, writable: bool = False + ) -> None: + """Add binding arguments to the runtime list.""" + options = [ + "type=bind", + "source=" + source, + "target=" + target, + ] + if not writable: + options.append("readonly") + output = StringIO() + csv.writer(output).writerow(options) + mount_arg = output.getvalue().strip() + runtime.append(f"--mount={mount_arg}") + # Unlike "--volume", "--mount" will fail if the volume doesn't already exist. + if not os.path.exists(source): + os.makedirs(source) + + def add_file_or_directory_volume( + self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str] + ) -> None: + """Append volume a file/dir mapping to the runtime option list.""" + if not volume.resolved.startswith("_:"): + _check_docker_machine_path(docker_windows_path_adjust(volume.resolved)) + self.append_volume(runtime, volume.resolved, volume.target) + + def add_writable_file_volume( + self, + runtime: List[str], + volume: MapperEnt, + host_outdir_tgt: Optional[str], + tmpdir_prefix: str, + ) -> None: + """Append a writable file mapping to the runtime option list.""" + if self.inplace_update: + self.append_volume(runtime, volume.resolved, volume.target, writable=True) + else: + if host_outdir_tgt: + # shortcut, just copy to the output directory + # which is already going to be mounted + if not os.path.exists(os.path.dirname(host_outdir_tgt)): + os.makedirs(os.path.dirname(host_outdir_tgt)) + shutil.copy(volume.resolved, host_outdir_tgt) + else: + tmpdir = create_tmp_dir(tmpdir_prefix) + file_copy = os.path.join(tmpdir, os.path.basename(volume.resolved)) + shutil.copy(volume.resolved, file_copy) + self.append_volume(runtime, file_copy, volume.target, writable=True) + ensure_writable(host_outdir_tgt or file_copy) + + def add_writable_directory_volume( + self, + runtime: List[str], + volume: MapperEnt, + host_outdir_tgt: Optional[str], + tmpdir_prefix: str, + ) -> None: + """Append a writable directory mapping to the runtime option list.""" + if volume.resolved.startswith("_:"): + # Synthetic directory that needs creating first + if not host_outdir_tgt: + new_dir = os.path.join( + create_tmp_dir(tmpdir_prefix), + os.path.basename(volume.target), + ) + self.append_volume(runtime, new_dir, volume.target, writable=True) + elif not os.path.exists(host_outdir_tgt): + os.makedirs(host_outdir_tgt) + else: + if self.inplace_update: + self.append_volume( + runtime, volume.resolved, volume.target, writable=True + ) + else: + if not host_outdir_tgt: + tmpdir = create_tmp_dir(tmpdir_prefix) + new_dir = os.path.join(tmpdir, os.path.basename(volume.resolved)) + shutil.copytree(volume.resolved, new_dir) + self.append_volume(runtime, new_dir, volume.target, writable=True) + else: + shutil.copytree(volume.resolved, host_outdir_tgt) + ensure_writable(host_outdir_tgt or new_dir) + + def create_runtime( + self, env: MutableMapping[str, str], runtimeContext: RuntimeContext + ) -> Tuple[List[str], Optional[str]]: + any_path_okay = self.builder.get_requirement("DockerRequirement")[1] or False + user_space_docker_cmd = runtimeContext.user_space_docker_cmd + if user_space_docker_cmd: + if "udocker" in user_space_docker_cmd and not runtimeContext.debug: + runtime = [user_space_docker_cmd, "--quiet", "run"] + # udocker 1.1.1 will output diagnostic messages to stdout + # without this + else: + runtime = [user_space_docker_cmd, "run"] + else: + runtime = ["docker", "run", "-i"] + self.append_volume( + runtime, os.path.realpath(self.outdir), self.builder.outdir, writable=True + ) + tmpdir = "/tmp" # nosec + self.append_volume( + runtime, os.path.realpath(self.tmpdir), tmpdir, writable=True + ) + self.add_volumes( + self.pathmapper, + runtime, + any_path_okay=True, + secret_store=runtimeContext.secret_store, + tmpdir_prefix=runtimeContext.tmpdir_prefix, + ) + if self.generatemapper is not None: + self.add_volumes( + self.generatemapper, + runtime, + any_path_okay=any_path_okay, + secret_store=runtimeContext.secret_store, + tmpdir_prefix=runtimeContext.tmpdir_prefix, + ) + + if user_space_docker_cmd: + runtime = [x.replace(":ro", "") for x in runtime] + runtime = [x.replace(":rw", "") for x in runtime] + + runtime.append( + "--workdir=%s" % (docker_windows_path_adjust(self.builder.outdir)) + ) + if not user_space_docker_cmd: + + if not runtimeContext.no_read_only: + runtime.append("--read-only=true") + + if self.networkaccess: + if runtimeContext.custom_net: + runtime.append(f"--net={runtimeContext.custom_net}") + else: + runtime.append("--net=none") + + if self.stdout is not None: + runtime.append("--log-driver=none") + + euid, egid = docker_vm_id() + if not onWindows(): + # MS Windows does not have getuid() or geteuid() functions + euid, egid = euid or os.geteuid(), egid or os.getgid() + + if runtimeContext.no_match_user is False and ( + euid is not None and egid is not None + ): + runtime.append("--user=%d:%d" % (euid, egid)) + + if runtimeContext.rm_container: + runtime.append("--rm") + + runtime.append("--env=TMPDIR=/tmp") + + # spec currently says "HOME must be set to the designated output + # directory." but spec might change to designated temp directory. + # runtime.append("--env=HOME=/tmp") + runtime.append("--env=HOME=%s" % self.builder.outdir) + + cidfile_path = None # type: Optional[str] + # add parameters to docker to write a container ID file + if runtimeContext.user_space_docker_cmd is None: + if runtimeContext.cidfile_dir: + cidfile_dir = runtimeContext.cidfile_dir + if not os.path.exists(str(cidfile_dir)): + _logger.error( + "--cidfile-dir %s error:\n%s", + cidfile_dir, + "directory doesn't exist, please create it first", + ) + exit(2) + if not os.path.isdir(cidfile_dir): + _logger.error( + "--cidfile-dir %s error:\n%s", + cidfile_dir, + cidfile_dir + " is not a directory, please check it first", + ) + exit(2) + else: + cidfile_dir = runtimeContext.create_tmpdir() + + cidfile_name = datetime.datetime.now().strftime("%Y%m%d%H%M%S-%f") + ".cid" + if runtimeContext.cidfile_prefix is not None: + cidfile_name = str(runtimeContext.cidfile_prefix + "-" + cidfile_name) + cidfile_path = os.path.join(cidfile_dir, cidfile_name) + runtime.append("--cidfile=%s" % cidfile_path) + for key, value in self.environment.items(): + runtime.append(f"--env={key}={value}") + + if runtimeContext.strict_memory_limit and not user_space_docker_cmd: + ram = self.builder.resources["ram"] + if not isinstance(ram, str): + runtime.append("--memory=%dm" % ram) + elif not user_space_docker_cmd: + res_req, _ = self.builder.get_requirement("ResourceRequirement") + if res_req and ("ramMin" in res_req or "ramMax" in res_req): + _logger.warning( + "[job %s] Skipping Docker software container '--memory' limit " + "despite presence of ResourceRequirement with ramMin " + "and/or ramMax setting. Consider running with " + "--strict-memory-limit for increased portability " + "assurance.", + self.name, + ) + + return runtime, cidfile_path