Mercurial > repos > shellac > sam_consensus_v3
view env/lib/python3.9/site-packages/cwltool/singularity.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line source
"""Support for executing Docker containers using the Singularity 2.x engine.""" import os import os.path import re import shutil import sys from distutils import spawn from subprocess import ( # nosec DEVNULL, PIPE, Popen, TimeoutExpired, check_call, check_output, ) from typing import Callable, Dict, List, MutableMapping, Optional, Tuple, cast from schema_salad.sourceline import SourceLine from .builder import Builder from .context import RuntimeContext from .errors import UnsupportedRequirement, WorkflowException from .job import ContainerCommandLineJob from .loghandler import _logger from .pathmapper import MapperEnt, PathMapper from .utils import ( CWLObjectType, create_tmp_dir, docker_windows_path_adjust, ensure_non_writable, ensure_writable, ) _USERNS = None # type: Optional[bool] _SINGULARITY_VERSION = "" def _singularity_supports_userns() -> bool: global _USERNS # pylint: disable=global-statement if _USERNS is None: try: hello_image = os.path.join(os.path.dirname(__file__), "hello.simg") result = Popen( # nosec ["singularity", "exec", "--userns", hello_image, "true"], stderr=PIPE, stdout=DEVNULL, universal_newlines=True, ).communicate(timeout=60)[1] _USERNS = ( "No valid /bin/sh" in result or "/bin/sh doesn't exist in container" in result or "executable file not found in" in result ) except TimeoutExpired: _USERNS = False return _USERNS def get_version() -> str: global _SINGULARITY_VERSION # pylint: disable=global-statement if not _SINGULARITY_VERSION: _SINGULARITY_VERSION = check_output( # nosec ["singularity", "--version"], universal_newlines=True ) if _SINGULARITY_VERSION.startswith("singularity version "): _SINGULARITY_VERSION = _SINGULARITY_VERSION[20:] return _SINGULARITY_VERSION def is_version_2_6() -> bool: return get_version().startswith("2.6") def is_version_3_or_newer() -> bool: return int(get_version()[0]) >= 3 def is_version_3_1_or_newer() -> bool: version = get_version().split(".") return int(version[0]) >= 4 or (int(version[0]) == 3 and int(version[1]) >= 1) def _normalize_image_id(string: str) -> str: return string.replace("/", "_") + ".img" def _normalize_sif_id(string: str) -> str: return string.replace("/", "_") + ".sif" class SingularityCommandLineJob(ContainerCommandLineJob): def __init__( self, builder: Builder, joborder: CWLObjectType, make_path_mapper: Callable[..., PathMapper], requirements: List[CWLObjectType], hints: List[CWLObjectType], name: str, ) -> None: """Builder for invoking the Singularty software container engine.""" super().__init__(builder, joborder, make_path_mapper, requirements, hints, name) @staticmethod def get_image( dockerRequirement: Dict[str, str], pull_image: bool, force_pull: bool = False, ) -> bool: """ Acquire the software container image in the specified dockerRequirement. Uses Singularity and returns the success as a bool. Updates the provided dockerRequirement with the specific dockerImageId to the full path of the local image, if found. Likewise the dockerRequirement['dockerPull'] is updated to a docker:// URI if needed. """ found = False candidates = [] cache_folder = None if "CWL_SINGULARITY_CACHE" in os.environ: cache_folder = os.environ["CWL_SINGULARITY_CACHE"] elif is_version_2_6() and "SINGULARITY_PULLFOLDER" in os.environ: cache_folder = os.environ["SINGULARITY_PULLFOLDER"] if ( "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement ): match = re.search( pattern=r"([a-z]*://)", string=dockerRequirement["dockerPull"] ) img_name = _normalize_image_id(dockerRequirement["dockerPull"]) candidates.append(img_name) if is_version_3_or_newer(): sif_name = _normalize_sif_id(dockerRequirement["dockerPull"]) candidates.append(sif_name) dockerRequirement["dockerImageId"] = sif_name else: dockerRequirement["dockerImageId"] = img_name if not match: dockerRequirement["dockerPull"] = ( "docker://" + dockerRequirement["dockerPull"] ) elif "dockerImageId" in dockerRequirement: if os.path.isfile(dockerRequirement["dockerImageId"]): found = True candidates.append(dockerRequirement["dockerImageId"]) candidates.append(_normalize_image_id(dockerRequirement["dockerImageId"])) if is_version_3_or_newer(): candidates.append(_normalize_sif_id(dockerRequirement["dockerPull"])) targets = [os.getcwd()] if "CWL_SINGULARITY_CACHE" in os.environ: targets.append(os.environ["CWL_SINGULARITY_CACHE"]) if is_version_2_6() and "SINGULARITY_PULLFOLDER" in os.environ: targets.append(os.environ["SINGULARITY_PULLFOLDER"]) for target in targets: for dirpath, _subdirs, files in os.walk(target): for entry in files: if entry in candidates: path = os.path.join(dirpath, entry) if os.path.isfile(path): _logger.info( "Using local copy of Singularity image found in %s", dirpath, ) dockerRequirement["dockerImageId"] = path found = True if (force_pull or not found) and pull_image: cmd = [] # type: List[str] if "dockerPull" in dockerRequirement: if cache_folder: env = os.environ.copy() if is_version_2_6(): env["SINGULARITY_PULLFOLDER"] = cache_folder cmd = [ "singularity", "pull", "--force", "--name", dockerRequirement["dockerImageId"], str(dockerRequirement["dockerPull"]), ] else: cmd = [ "singularity", "pull", "--force", "--name", "{}/{}".format( cache_folder, dockerRequirement["dockerImageId"] ), str(dockerRequirement["dockerPull"]), ] _logger.info(str(cmd)) check_call(cmd, env=env, stdout=sys.stderr) # nosec dockerRequirement["dockerImageId"] = "{}/{}".format( cache_folder, dockerRequirement["dockerImageId"] ) found = True else: cmd = [ "singularity", "pull", "--force", "--name", str(dockerRequirement["dockerImageId"]), str(dockerRequirement["dockerPull"]), ] _logger.info(str(cmd)) check_call(cmd, stdout=sys.stderr) # nosec found = True elif "dockerFile" in dockerRequirement: raise WorkflowException( SourceLine(dockerRequirement, "dockerFile").makeError( "dockerFile is not currently supported when using the " "Singularity runtime for Docker containers." ) ) elif "dockerLoad" in dockerRequirement: if is_version_3_1_or_newer(): if "dockerImageId" in dockerRequirement: name = "{}.sif".format(dockerRequirement["dockerImageId"]) else: name = "{}.sif".format(dockerRequirement["dockerLoad"]) cmd = [ "singularity", "build", name, "docker-archive://{}".format(dockerRequirement["dockerLoad"]), ] _logger.info(str(cmd)) check_call(cmd, stdout=sys.stderr) # nosec found = True dockerRequirement["dockerImageId"] = name raise WorkflowException( SourceLine(dockerRequirement, "dockerLoad").makeError( "dockerLoad is not currently supported when using the " "Singularity runtime (version less than 3.1) for Docker containers." ) ) elif "dockerImport" in dockerRequirement: raise WorkflowException( SourceLine(dockerRequirement, "dockerImport").makeError( "dockerImport is not currently supported when using the " "Singularity runtime for Docker containers." ) ) return found def get_from_requirements( self, r: CWLObjectType, pull_image: bool, force_pull: bool, tmp_outdir_prefix: str, ) -> Optional[str]: """ Return the filename of the Singularity image. (e.g. hello-world-latest.{img,sif}). """ if not bool(spawn.find_executable("singularity")): raise WorkflowException("singularity executable is not available") if not self.get_image(cast(Dict[str, str], r), pull_image, force_pull): raise WorkflowException( "Container image {} not found".format(r["dockerImageId"]) ) return os.path.abspath(cast(str, r["dockerImageId"])) @staticmethod def append_volume( runtime: List[str], source: str, target: str, writable: bool = False ) -> None: runtime.append("--bind") runtime.append( "{}:{}:{}".format( docker_windows_path_adjust(source), docker_windows_path_adjust(target), "rw" if writable else "ro", ) ) def add_file_or_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str] ) -> None: if host_outdir_tgt is not None: # workaround for lack of overlapping mounts in Singularity # revert to daa923d5b0be3819b6ed0e6440e7193e65141052 # once https://github.com/sylabs/singularity/issues/1607 # is fixed if volume.type == "File": shutil.copy(volume.resolved, host_outdir_tgt) else: shutil.copytree(volume.resolved, host_outdir_tgt) ensure_non_writable(host_outdir_tgt) elif not volume.resolved.startswith("_:"): self.append_volume(runtime, volume.resolved, volume.target) def add_writable_file_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str], tmpdir_prefix: str, ) -> None: if host_outdir_tgt is not None: # workaround for lack of overlapping mounts in Singularity # revert to daa923d5b0be3819b6ed0e6440e7193e65141052 # once https://github.com/sylabs/singularity/issues/1607 # is fixed if self.inplace_update: try: os.link(os.path.realpath(volume.resolved), host_outdir_tgt) except os.error: shutil.copy(volume.resolved, host_outdir_tgt) else: shutil.copy(volume.resolved, host_outdir_tgt) ensure_writable(host_outdir_tgt) elif self.inplace_update: self.append_volume(runtime, volume.resolved, volume.target, writable=True) ensure_writable(volume.resolved) else: file_copy = os.path.join( create_tmp_dir(tmpdir_prefix), os.path.basename(volume.resolved), ) shutil.copy(volume.resolved, file_copy) # volume.resolved = file_copy self.append_volume(runtime, file_copy, volume.target, writable=True) ensure_writable(file_copy) def add_writable_directory_volume( self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str], tmpdir_prefix: str, ) -> None: if volume.resolved.startswith("_:"): if host_outdir_tgt is not None: new_dir = host_outdir_tgt else: new_dir = os.path.join( create_tmp_dir(tmpdir_prefix), os.path.basename(volume.resolved), ) os.makedirs(new_dir) else: if host_outdir_tgt is not None: # workaround for lack of overlapping mounts in Singularity # revert to daa923d5b0be3819b6ed0e6440e7193e65141052 # once https://github.com/sylabs/singularity/issues/1607 # is fixed shutil.copytree(volume.resolved, host_outdir_tgt) ensure_writable(host_outdir_tgt) else: if not self.inplace_update: dir_copy = os.path.join( create_tmp_dir(tmpdir_prefix), os.path.basename(volume.resolved), ) shutil.copytree(volume.resolved, dir_copy) source = dir_copy # volume.resolved = dir_copy else: source = volume.resolved self.append_volume(runtime, source, volume.target, writable=True) ensure_writable(source) def create_runtime( self, env: MutableMapping[str, str], runtime_context: RuntimeContext ) -> Tuple[List[str], Optional[str]]: """Return the Singularity runtime list of commands and options.""" any_path_okay = self.builder.get_requirement("DockerRequirement")[1] or False runtime = [ "singularity", "--quiet", "exec", "--contain", "--ipc", ] if _singularity_supports_userns(): runtime.append("--userns") else: runtime.append("--pid") if is_version_3_1_or_newer(): runtime.append("--home") runtime.append( "{}:{}".format( docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir, ) ) else: runtime.append("--bind") runtime.append( "{}:{}:rw".format( docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir, ) ) runtime.append("--bind") tmpdir = "/tmp" # nosec runtime.append( "{}:{}:rw".format( docker_windows_path_adjust(os.path.realpath(self.tmpdir)), tmpdir ) ) self.add_volumes( self.pathmapper, runtime, any_path_okay=True, secret_store=runtime_context.secret_store, tmpdir_prefix=runtime_context.tmpdir_prefix, ) if self.generatemapper is not None: self.add_volumes( self.generatemapper, runtime, any_path_okay=any_path_okay, secret_store=runtime_context.secret_store, tmpdir_prefix=runtime_context.tmpdir_prefix, ) runtime.append("--pwd") runtime.append("%s" % (docker_windows_path_adjust(self.builder.outdir))) if runtime_context.custom_net: raise UnsupportedRequirement( "Singularity implementation does not support custom networking" ) elif runtime_context.disable_net: runtime.append("--net") env["SINGULARITYENV_TMPDIR"] = tmpdir env["SINGULARITYENV_HOME"] = self.builder.outdir for name, value in self.environment.items(): env[f"SINGULARITYENV_{name}"] = str(value) return (runtime, None)