diff env/lib/python3.7/site-packages/cwltool/job.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children | |
```diff
--- a/env/lib/python3.7/site-packages/cwltool/job.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,867 +0,0 @@
-from __future__ import absolute_import
-
-import datetime
-import functools
-import itertools
-import logging
-import threading
-import os
-import re
-import shutil
-import stat
-import sys
-import tempfile
-import time
-import uuid
-from abc import ABCMeta, abstractmethod
-from io import IOBase, open  # pylint: disable=redefined-builtin
-from threading import Timer
-from typing import (IO, Any, AnyStr, Callable, Dict, Iterable, List, Tuple,
-                    MutableMapping, MutableSequence, Optional, Union, cast)
-
-import psutil
-import shellescape
-from prov.model import PROV
-from schema_salad.sourceline import SourceLine
-from six import PY2, with_metaclass
-from future.utils import raise_from
-from typing_extensions import (TYPE_CHECKING,  # pylint: disable=unused-import
-                               Text)
-
-from .builder import Builder, HasReqsHints  # pylint: disable=unused-import
-from .context import RuntimeContext  # pylint: disable=unused-import
-from .context import getdefault
-from .errors import WorkflowException
-from .expression import JSON
-from .loghandler import _logger
-from .pathmapper import (MapperEnt, PathMapper,  # pylint: disable=unused-import
-                         ensure_writable, ensure_non_writable)
-from .process import UnsupportedRequirement, stage_files
-from .secrets import SecretStore  # pylint: disable=unused-import
-from .utils import (DEFAULT_TMP_PREFIX, Directory, bytes2str_in_dicts,
-                    copytree_with_merge, json_dump, json_dumps, onWindows,
-                    processes_to_kill, subprocess)
-
-if TYPE_CHECKING:
-    from .provenance import ProvenanceProfile  # pylint: disable=unused-import
-needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
-
-FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
-
-SHELL_COMMAND_TEMPLATE = u"""#!/bin/bash
-python "run_job.py" "job.json"
-"""
-
-PYTHON_RUN_SCRIPT = u"""
-import json
-import os
-import sys
-if os.name == 'posix':
-    try:
-        import subprocess32 as subprocess  # type: ignore
-    except Exception:
-        import subprocess
-else:
-    import subprocess  # type: ignore
-
-with open(sys.argv[1], "r") as f:
-    popen_description = json.load(f)
-    commands = popen_description["commands"]
-    cwd = popen_description["cwd"]
-    env = popen_description["env"]
-    env["PATH"] = os.environ.get("PATH")
-    stdin_path = popen_description["stdin_path"]
-    stdout_path = popen_description["stdout_path"]
-    stderr_path = popen_description["stderr_path"]
-    if stdin_path is not None:
-        stdin = open(stdin_path, "rb")
-    else:
-        stdin = subprocess.PIPE
-    if stdout_path is not None:
-        stdout = open(stdout_path, "wb")
-    else:
-        stdout = sys.stderr
-    if stderr_path is not None:
-        stderr = open(stderr_path, "wb")
-    else:
-        stderr = sys.stderr
-    if os.name == 'nt':
-        close_fds = False
-        for key, value in env.items():
-            env[key] = str(value)
-    else:
-        close_fds = True
-    sp = subprocess.Popen(commands,
-                          shell=False,
-                          close_fds=close_fds,
-                          stdin=stdin,
-                          stdout=stdout,
-                          stderr=stderr,
-                          env=env,
-                          cwd=cwd)
-    if sp.stdin:
-        sp.stdin.close()
-    rcode = sp.wait()
-    if stdin is not subprocess.PIPE:
-        stdin.close()
-    if stdout is not sys.stderr:
-        stdout.close()
-    if stderr is not sys.stderr:
-        stderr.close()
-    sys.exit(rcode)
-"""
-
-
-def deref_links(outputs):  # type: (Any) -> None
-    if isinstance(outputs, MutableMapping):
-        if outputs.get("class") == "File":
-            st = os.lstat(outputs["path"])
-            if stat.S_ISLNK(st.st_mode):
-                outputs["basename"] = os.path.basename(outputs["path"])
-                outputs["path"] = os.readlink(outputs["path"])
-        else:
-            for v in outputs.values():
-                deref_links(v)
-    if isinstance(outputs, MutableSequence):
-        for output in outputs:
-            deref_links(output)
-
-
-def relink_initialworkdir(pathmapper,  # type: PathMapper
-                          host_outdir,  # type: Text
-                          container_outdir,  # type: Text
-                          inplace_update=False  # type: bool
-                          ):  # type: (...) -> None
-    for _, vol in pathmapper.items():
-        if not vol.staged:
-            continue
-
-        if (vol.type in ("File", "Directory") or (
-                inplace_update and vol.type in
-                ("WritableFile", "WritableDirectory"))):
-            if not vol.target.startswith(container_outdir):
-                # this is an input file written outside of the working
-                # directory, so therefore ineligible for being an output file.
-                # Thus, none of our business
-                continue
-            host_outdir_tgt = os.path.join(
-                host_outdir, vol.target[len(container_outdir) + 1:])
-            if os.path.islink(host_outdir_tgt) \
-                    or os.path.isfile(host_outdir_tgt):
-                os.remove(host_outdir_tgt)
-            elif os.path.isdir(host_outdir_tgt) \
-                    and not vol.resolved.startswith("_:"):
-                shutil.rmtree(host_outdir_tgt)
-            if onWindows():
-                # If this becomes a big issue for someone then we could
-                # refactor the code to process output from a running container
-                # and avoid all the extra IO below
-                if vol.type in ("File", "WritableFile"):
-                    shutil.copy(vol.resolved, host_outdir_tgt)
-                elif vol.type in ("Directory", "WritableDirectory"):
-                    copytree_with_merge(vol.resolved, host_outdir_tgt)
-            elif not vol.resolved.startswith("_:"):
-                os.symlink(vol.resolved, host_outdir_tgt)
-
-
-class JobBase(with_metaclass(ABCMeta, HasReqsHints)):
-    def __init__(self,
-                 builder,  # type: Builder
-                 joborder,  # type: JSON
-                 make_path_mapper,  # type: Callable[..., PathMapper]
-                 requirements,  # type: List[Dict[Text, Text]]
-                 hints,  # type: List[Dict[Text, Text]]
-                 name,  # type: Text
-                 ):  # type: (...) -> None
-        """Initialize the job object."""
-        self.builder = builder
-        self.joborder = joborder
-        self.stdin = None  # type: Optional[Text]
-        self.stderr = None  # type: Optional[Text]
-        self.stdout = None  # type: Optional[Text]
-        self.successCodes = []  # type: Iterable[int]
-        self.temporaryFailCodes = []  # type: Iterable[int]
-        self.permanentFailCodes = []  # type: Iterable[int]
-        self.requirements = requirements
-        self.hints = hints
-        self.name = name
-        self.command_line = []  # type: List[Text]
-        self.pathmapper = PathMapper([], u"", u"")
-        self.make_path_mapper = make_path_mapper
-        self.generatemapper = None  # type: Optional[PathMapper]
-
-        # set in CommandLineTool.job(i)
-        self.collect_outputs = cast(Callable[[Text, int], MutableMapping[Text, Any]],
-                                    None)  # type: Union[Callable[[Text, int], MutableMapping[Text, Any]], functools.partial[MutableMapping[Text, Any]]]
-        self.output_callback = cast(Callable[[Any, Any], Any], None)
-        self.outdir = u""
-        self.tmpdir = u""
-
-        self.environment = {}  # type: MutableMapping[Text, Text]
-        self.generatefiles = {"class": "Directory", "listing": [], "basename": ""}  # type: Directory
-        self.stagedir = None  # type: Optional[Text]
-        self.inplace_update = False
-        self.prov_obj = None  # type: Optional[ProvenanceProfile]
-        self.parent_wf = None  # type: Optional[ProvenanceProfile]
-        self.timelimit = None  # type: Optional[int]
-        self.networkaccess = False  # type: bool
-
-    def __repr__(self):  # type: () -> str
-        """Represent this Job object."""
-        return "CommandLineJob(%s)" % self.name
-
-    @abstractmethod
-    def run(self,
-            runtimeContext,  # type: RuntimeContext
-            tmpdir_lock=None  # type: Optional[threading.Lock]
-            ):  # type: (...) -> None
-        pass
-
-    def _setup(self, runtimeContext):  # type: (RuntimeContext) -> None
-        if not os.path.exists(self.outdir):
-            os.makedirs(self.outdir)
-
-        for knownfile in self.pathmapper.files():
-            p = self.pathmapper.mapper(knownfile)
-            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
-                raise WorkflowException(
-                    u"Input file %s (at %s) not found or is not a regular "
-                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0]))
-
-        if 'listing' in self.generatefiles:
-            runtimeContext = runtimeContext.copy()
-            runtimeContext.outdir = self.outdir
-            self.generatemapper = self.make_path_mapper(
-                cast(List[Any], self.generatefiles["listing"]),
-                self.builder.outdir, runtimeContext, False)
-            if _logger.isEnabledFor(logging.DEBUG):
-                _logger.debug(
-                    u"[job %s] initial work dir %s", self.name,
-                    json_dumps({p: self.generatemapper.mapper(p)
-                                for p in self.generatemapper.files()}, indent=4))
-
-    def _execute(self,
-                 runtime,  # type: List[Text]
-                 env,  # type: MutableMapping[Text, Text]
-                 runtimeContext,  # type: RuntimeContext
-                 monitor_function=None,  # type: Optional[Callable[[subprocess.Popen], None]]
-                 ):  # type: (...) -> None
-
-        scr, _ = self.get_requirement("ShellCommandRequirement")
-
-        shouldquote = needs_shell_quoting_re.search  # type: Callable[[Any], Any]
-        if scr is not None:
-            shouldquote = lambda x: False
-
-        _logger.info(u"[job %s] %s$ %s%s%s%s",
-                     self.name,
-                     self.outdir,
-                     " \\\n    ".join([shellescape.quote(Text(arg)) if shouldquote(Text(arg)) else Text(arg) for arg in
-                                       (runtime + self.command_line)]),
-                     u' < %s' % self.stdin if self.stdin else '',
-                     u' > %s' % os.path.join(self.outdir, self.stdout) if self.stdout else '',
-                     u' 2> %s' % os.path.join(self.outdir, self.stderr) if self.stderr else '')
-        if self.joborder is not None and runtimeContext.research_obj is not None:
-            job_order = self.joborder
-            if runtimeContext.process_run_id is not None \
-                    and runtimeContext.prov_obj is not None and isinstance(job_order, (list, dict)):
-                runtimeContext.prov_obj.used_artefacts(
-                    job_order, runtimeContext.process_run_id, str(self.name))
-            else:
-                _logger.warning("research_obj set but one of process_run_id "
-                                "or prov_obj is missing from runtimeContext: "
-                                "{}".format(runtimeContext))
-        outputs = {}  # type: MutableMapping[Text, Any]
-        try:
-            stdin_path = None
-            if self.stdin is not None:
-                rmap = self.pathmapper.reversemap(self.stdin)
-                if rmap is None:
-                    raise WorkflowException(
-                        "{} missing from pathmapper".format(self.stdin))
-                else:
-                    stdin_path = rmap[1]
-
-            stderr_path = None
-            if self.stderr is not None:
-                abserr = os.path.join(self.outdir, self.stderr)
-                dnerr = os.path.dirname(abserr)
-                if dnerr and not os.path.exists(dnerr):
-                    os.makedirs(dnerr)
-                stderr_path = abserr
-
-            stdout_path = None
-            if self.stdout is not None:
-                absout = os.path.join(self.outdir, self.stdout)
-                dnout = os.path.dirname(absout)
-                if dnout and not os.path.exists(dnout):
-                    os.makedirs(dnout)
-                stdout_path = absout
-
-            commands = [Text(x) for x in runtime + self.command_line]
-            if runtimeContext.secret_store is not None:
-                commands = runtimeContext.secret_store.retrieve(commands)
-                env = runtimeContext.secret_store.retrieve(env)
-
-            job_script_contents = None  # type: Optional[Text]
-            builder = getattr(self, "builder", None)  # type: Builder
-            if builder is not None:
-                job_script_contents = builder.build_job_script(commands)
-            rcode = _job_popen(
-                commands,
-                stdin_path=stdin_path,
-                stdout_path=stdout_path,
-                stderr_path=stderr_path,
-                env=env,
-                cwd=self.outdir,
-                job_dir=tempfile.mkdtemp(prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)),
-                job_script_contents=job_script_contents,
-                timelimit=self.timelimit,
-                name=self.name,
-                monitor_function=monitor_function
-            )
-
-            if rcode in self.successCodes:
-                processStatus = "success"
-            elif rcode in self.temporaryFailCodes:
-                processStatus = "temporaryFail"
-            elif rcode in self.permanentFailCodes:
-                processStatus = "permanentFail"
-            elif rcode == 0:
-                processStatus = "success"
-            else:
-                processStatus = "permanentFail"
-
-            if 'listing' in self.generatefiles:
-                if self.generatemapper:
-                    relink_initialworkdir(
-                        self.generatemapper, self.outdir, self.builder.outdir,
-                        inplace_update=self.inplace_update)
-                else:
-                    raise ValueError("'listing' in self.generatefiles but no "
-                                     "generatemapper was setup.")
-
-            outputs = self.collect_outputs(self.outdir, rcode)
-            outputs = bytes2str_in_dicts(outputs)  # type: ignore
-        except OSError as e:
-            if e.errno == 2:
-                if runtime:
-                    _logger.error(u"'%s' not found: %s", runtime[0], Text(e))
-                else:
-                    _logger.error(u"'%s' not found: %s", self.command_line[0], Text(e))
-            else:
-                _logger.exception(u"Exception while running job")
-            processStatus = "permanentFail"
-        except WorkflowException as err:
-            _logger.error(u"[job %s] Job error:\n%s", self.name, Text(err))
-            processStatus = "permanentFail"
-        except Exception as e:
-            _logger.exception(u"Exception while running job")
-            processStatus = "permanentFail"
-        if runtimeContext.research_obj is not None \
-                and self.prov_obj is not None \
-                and runtimeContext.process_run_id is not None:
-            # creating entities for the outputs produced by each step (in the provenance document)
-            self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id,
-                                             outputs, datetime.datetime.now())
-        if processStatus != "success":
-            _logger.warning(u"[job %s] completed %s", self.name, processStatus)
-        else:
-            _logger.info(u"[job %s] completed %s", self.name, processStatus)
-
-        if _logger.isEnabledFor(logging.DEBUG):
-            _logger.debug(u"[job %s] %s", self.name,
-                          json_dumps(outputs, indent=4))
-
-        if self.generatemapper is not None and runtimeContext.secret_store is not None:
-            # Delete any runtime-generated files containing secrets.
-            for _, p in self.generatemapper.items():
-                if p.type == "CreateFile":
-                    if runtimeContext.secret_store.has_secret(p.resolved):
-                        host_outdir = self.outdir
-                        container_outdir = self.builder.outdir
-                        host_outdir_tgt = p.target
-                        if p.target.startswith(container_outdir + "/"):
-                            host_outdir_tgt = os.path.join(
-                                host_outdir, p.target[len(container_outdir) + 1:])
-                        os.remove(host_outdir_tgt)
-
-        if runtimeContext.workflow_eval_lock is None:
-            raise WorkflowException("runtimeContext.workflow_eval_lock must not be None")
-
-        with runtimeContext.workflow_eval_lock:
-            self.output_callback(outputs, processStatus)
-
-        if self.stagedir is not None and os.path.exists(self.stagedir):
-            _logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)
-            shutil.rmtree(self.stagedir, True)
-
-        if runtimeContext.rm_tmpdir:
-            _logger.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir)
-            shutil.rmtree(self.tmpdir, True)
-
-    def process_monitor(self, sproc):  # type: (subprocess.Popen) -> None
-        monitor = psutil.Process(sproc.pid)
-        memory_usage = [None]  # Value must be a list rather than an integer to utilise pass-by-reference in Python
-
-        def get_tree_mem_usage(memory_usage):  # type: (List[int]) -> None
-            children = monitor.children()
-            rss = monitor.memory_info().rss
-            while len(children):
-                rss += sum([process.memory_info().rss for process in children])
-                children = list(itertools.chain(*[process.children() for process in children]))
-            if memory_usage[0] is None or rss > memory_usage[0]:
-                memory_usage[0] = rss
-
-        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
-        mem_tm.daemon = True
-        mem_tm.start()
-        sproc.wait()
-        mem_tm.cancel()
-        if memory_usage[0] is not None:
-            _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
-                         round(memory_usage[0] / (2 ** 20)))
-        else:
-            _logger.debug(u"Could not collect memory usage, job ended before monitoring began.")
-
-
-class CommandLineJob(JobBase):
-    def run(self,
-            runtimeContext,  # type: RuntimeContext
-            tmpdir_lock=None  # type: Optional[threading.Lock]
-            ):  # type: (...) -> None
-
-        if tmpdir_lock:
-            with tmpdir_lock:
-                if not os.path.exists(self.tmpdir):
-                    os.makedirs(self.tmpdir)
-        else:
-            if not os.path.exists(self.tmpdir):
-                os.makedirs(self.tmpdir)
-
-        self._setup(runtimeContext)
-
-        env = self.environment
-        vars_to_preserve = runtimeContext.preserve_environment
-        if runtimeContext.preserve_entire_environment is not False:
-            vars_to_preserve = os.environ
-        if vars_to_preserve:
-            for key, value in os.environ.items():
-                if key in vars_to_preserve and key not in env:
-                    # On Windows, subprocess env can't handle unicode.
-                    env[key] = str(value) if onWindows() else value
-        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
-        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
-        if "PATH" not in env:
-            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
-        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
-            env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() \
-                else os.environ["SYSTEMROOT"]
-
-        stage_files(self.pathmapper, ignore_writable=True, symlink=True,
-                    secret_store=runtimeContext.secret_store)
-        if self.generatemapper is not None:
-            stage_files(self.generatemapper, ignore_writable=self.inplace_update,
-                        symlink=True, secret_store=runtimeContext.secret_store)
-            relink_initialworkdir(
-                self.generatemapper, self.outdir, self.builder.outdir,
-                inplace_update=self.inplace_update)
-
-        monitor_function = functools.partial(self.process_monitor)
-
-        self._execute([], env, runtimeContext, monitor_function)
-
-
-CONTROL_CODE_RE = r'\x1b\[[0-9;]*[a-zA-Z]'
-
-
-class ContainerCommandLineJob(with_metaclass(ABCMeta, JobBase)):
-    """Commandline job using containers."""
-
-    @abstractmethod
-    def get_from_requirements(self,
-                              r,  # type: Dict[Text, Text]
-                              pull_image,  # type: bool
-                              force_pull=False,  # type: bool
-                              tmp_outdir_prefix=DEFAULT_TMP_PREFIX  # type: Text
-                              ):  # type: (...) -> Optional[Text]
-        pass
-
-    @abstractmethod
-    def create_runtime(self,
-                       env,  # type: MutableMapping[Text, Text]
-                       runtime_context  # type: RuntimeContext
-                       ):  # type: (...) -> Tuple[List[Text], Optional[Text]]
-        """Return the list of commands to run the selected container engine."""
-        pass
-
-    @staticmethod
-    @abstractmethod
-    def append_volume(runtime, source, target, writable=False):
-        # type: (List[Text], Text, Text, bool) -> None
-        """Add binding arguments to the runtime list."""
-        pass
-
-    @abstractmethod
-    def add_file_or_directory_volume(self,
-                                     runtime,  # type: List[Text]
-                                     volume,  # type: MapperEnt
-                                     host_outdir_tgt  # type: Optional[Text]
-                                     ):  # type: (...) -> None
-        """Append a file/directory volume mapping to the runtime option list."""
-        pass
-
-    @abstractmethod
-    def add_writable_file_volume(self,
-                                 runtime,  # type: List[Text]
-                                 volume,  # type: MapperEnt
-                                 host_outdir_tgt,  # type: Optional[Text]
-                                 tmpdir_prefix  # type: Text
-                                 ):  # type: (...) -> None
-        """Append a writable file mapping to the runtime option list."""
-        pass
-
-    @abstractmethod
-    def add_writable_directory_volume(self,
-                                      runtime,  # type: List[Text]
-                                      volume,  # type: MapperEnt
-                                      host_outdir_tgt,  # type: Optional[Text]
-                                      tmpdir_prefix  # type: Text
-                                      ):  # type: (...) -> None
-        """Append a writable directory mapping to the runtime option list."""
-        pass
-
-    def create_file_and_add_volume(self,
-                                   runtime,  # type: List[Text]
-                                   volume,  # type: MapperEnt
-                                   host_outdir_tgt,  # type: Optional[Text]
-                                   secret_store,  # type: Optional[SecretStore]
-                                   tmpdir_prefix  # type: Text
-                                   ):  # type: (...) -> Text
-        """Create the file and add a mapping."""
-        if not host_outdir_tgt:
-            tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
-            new_file = os.path.join(
-                tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir),
-                os.path.basename(volume.target))
-        writable = True if volume.type == "CreateWritableFile" else False
-        if secret_store:
-            contents = secret_store.retrieve(volume.resolved)
-        else:
-            contents = volume.resolved
-        dirname = os.path.dirname(host_outdir_tgt or new_file)
-        if not os.path.exists(dirname):
-            os.makedirs(dirname)
-        with open(host_outdir_tgt or new_file, "wb") as file_literal:
-            file_literal.write(contents.encode("utf-8"))
-        if not host_outdir_tgt:
-            self.append_volume(runtime, new_file, volume.target,
-                               writable=writable)
-        if writable:
-            ensure_writable(host_outdir_tgt or new_file)
-        else:
-            ensure_non_writable(host_outdir_tgt or new_file)
-        return host_outdir_tgt or new_file
-
-    def add_volumes(self,
-                    pathmapper,  # type: PathMapper
-                    runtime,  # type: List[Text]
-                    tmpdir_prefix,  # type: Text
-                    secret_store=None,  # type: Optional[SecretStore]
-                    any_path_okay=False  # type: bool
-                    ):  # type: (...) -> None
-        """Append volume mappings to the runtime option list."""
-        container_outdir = self.builder.outdir
-        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
-            host_outdir_tgt = None  # type: Optional[Text]
-            if vol.target.startswith(container_outdir + "/"):
-                host_outdir_tgt = os.path.join(
-                    self.outdir, vol.target[len(container_outdir) + 1:])
-            if not host_outdir_tgt and not any_path_okay:
-                raise WorkflowException(
-                    "No mandatory DockerRequirement, yet path is outside "
-                    "the designated output directory, also known as "
-                    "$(runtime.outdir): {}".format(vol))
-            if vol.type in ("File", "Directory"):
-                self.add_file_or_directory_volume(
-                    runtime, vol, host_outdir_tgt)
-            elif vol.type == "WritableFile":
-                self.add_writable_file_volume(
-                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
-            elif vol.type == "WritableDirectory":
-                self.add_writable_directory_volume(
-                    runtime, vol, host_outdir_tgt, tmpdir_prefix)
-            elif vol.type in ["CreateFile", "CreateWritableFile"]:
-                new_path = self.create_file_and_add_volume(
-                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix)
-                pathmapper.update(
-                    key, new_path, vol.target, vol.type, vol.staged)
-
-    def run(self,
-            runtimeContext,  # type: RuntimeContext
-            tmpdir_lock=None  # type: Optional[threading.Lock]
-            ):  # type: (...) -> None
-        if tmpdir_lock:
-            with tmpdir_lock:
-                if not os.path.exists(self.tmpdir):
-                    os.makedirs(self.tmpdir)
-        else:
-            if not os.path.exists(self.tmpdir):
-                os.makedirs(self.tmpdir)
-
-        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
-        self.prov_obj = runtimeContext.prov_obj
-        img_id = None
-        env = cast(MutableMapping[Text, Text], os.environ)
-        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
-        if docker_req is not None and user_space_docker_cmd:
-            # For user-space docker implementations, a local image name or ID
-            # takes precedence over a network pull
-            if 'dockerImageId' in docker_req:
-                img_id = str(docker_req["dockerImageId"])
-            elif 'dockerPull' in docker_req:
-                img_id = str(docker_req["dockerPull"])
-                cmd = [user_space_docker_cmd, "pull", img_id]
-                _logger.info(Text(cmd))
-                try:
-                    subprocess.check_call(cmd, stdout=sys.stderr)
-                except OSError:
-                    raise WorkflowException(SourceLine(docker_req).makeError(
-                        "Either Docker container {} is not available with "
-                        "user space docker implementation {} or {} is missing "
-                        "or broken.".format(img_id, user_space_docker_cmd,
-                                            user_space_docker_cmd)))
-            else:
-                raise WorkflowException(SourceLine(docker_req).makeError(
-                    "Docker image must be specified as 'dockerImageId' or "
-                    "'dockerPull' when using user space implementations of "
-                    "Docker"))
-        else:
-            try:
-                if docker_req is not None and runtimeContext.use_container:
-                    img_id = str(
-                        self.get_from_requirements(
-                            docker_req, runtimeContext.pull_image,
-                            getdefault(runtimeContext.force_docker_pull, False),
-                            getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)))
-                if img_id is None:
-                    if self.builder.find_default_container:
-                        default_container = self.builder.find_default_container()
-                        if default_container:
-                            img_id = str(default_container)
-
-                if docker_req is not None and img_id is None and runtimeContext.use_container:
-                    raise Exception("Docker image not available")
-
-                if self.prov_obj is not None and img_id is not None \
-                        and runtimeContext.process_run_id is not None:
-                    container_agent = self.prov_obj.document.agent(
-                        uuid.uuid4().urn,
-                        {"prov:type": PROV["SoftwareAgent"],
-                         "cwlprov:image": img_id,
-                         "prov:label": "Container execution of image %s" % img_id})
-                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
-                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
-                    #                  {"prov:label": "Container image %s" % img_id})
-                    # The image is the plan for this activity-agent association
-                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
-                    self.prov_obj.document.wasAssociatedWith(
-                        runtimeContext.process_run_id, container_agent)
-            except Exception as err:
-                container = "Singularity" if runtimeContext.singularity else "Docker"
-                _logger.debug(u"%s error", container, exc_info=True)
-                if docker_is_req:
-                    raise_from(UnsupportedRequirement(
-                        "%s is required to run this tool: %s" % (container, Text(err))), err)
-                else:
-                    raise WorkflowException(
-                        "{0} is not available for this tool, try "
-                        "--no-container to disable {0}, or install "
-                        "a user space Docker replacement like uDocker with "
-                        "--user-space-docker-cmd.: {1}".format(container, err))
-
-        self._setup(runtimeContext)
-        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
-        runtime.append(Text(img_id))
-        monitor_function = None
-        if cidfile:
-            monitor_function = functools.partial(
-                self.docker_monitor, cidfile, runtimeContext.tmpdir_prefix,
-                not bool(runtimeContext.cidfile_dir))
-        elif runtimeContext.user_space_docker_cmd:
-            monitor_function = functools.partial(self.process_monitor)
-        self._execute(runtime, env, runtimeContext, monitor_function)
-
-    def docker_monitor(self, cidfile, tmpdir_prefix, cleanup_cidfile, process):
-        # type: (Text, Text, bool, subprocess.Popen) -> None
-        """Record memory usage of the running Docker container."""
-        # Todo: consider switching to `docker create` / `docker start`
-        # instead of `docker run` as `docker create` outputs the container ID
-        # to stdout, but the container is frozen, thus allowing us to start the
-        # monitoring process without dealing with the cidfile or too-fast
-        # container execution
-        cid = None
-        while cid is None:
-            time.sleep(1)
-            if process.returncode is not None:
-                if cleanup_cidfile:
-                    try:
-                        os.remove(cidfile)
-                    except OSError as exc:
-                        _logger.warn("Ignored error cleaning up Docker cidfile: %s", exc)
-                return
-            try:
-                with open(cidfile) as cidhandle:
-                    cid = cidhandle.readline().strip()
-            except (OSError, IOError):
-                cid = None
-        max_mem = psutil.virtual_memory().total
-        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
-        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
-        try:
-            with open(stats_file.name, mode="w") as stats_file_handle:
-                stats_proc = subprocess.Popen(
-                    ['docker', 'stats', '--no-trunc', '--format', '{{.MemPerc}}',
-                     cid], stdout=stats_file_handle, stderr=subprocess.DEVNULL)
-                process.wait()
-                stats_proc.kill()
-        except OSError as exc:
-            _logger.warn("Ignored error with docker stats: %s", exc)
-            return
-        max_mem_percent = 0
-        with open(stats_file.name, mode="r") as stats:
-            for line in stats:
-                try:
-                    mem_percent = float(re.sub(
-                        CONTROL_CODE_RE, '', line).replace('%', ''))
-                    if mem_percent > max_mem_percent:
-                        max_mem_percent = mem_percent
-                except ValueError:
-                    break
-        _logger.info(u"[job %s] Max memory used: %iMiB", self.name,
-                     int((max_mem_percent / 100 * max_mem) / (2 ** 20)))
-        if cleanup_cidfile:
-            os.remove(cidfile)
-
-
-def _job_popen(commands,  # type: List[Text]
-               stdin_path,  # type: Optional[Text]
-               stdout_path,  # type: Optional[Text]
-               stderr_path,  # type: Optional[Text]
-               env,  # type: MutableMapping[AnyStr, AnyStr]
-               cwd,  # type: Text
-               job_dir,  # type: Text
-               job_script_contents=None,  # type: Optional[Text]
-               timelimit=None,  # type: Optional[int]
-               name=None,  # type: Optional[Text]
-               monitor_function=None  # type: Optional[Callable[[subprocess.Popen], None]]
-               ):  # type: (...) -> int
-
-    if job_script_contents is None and not FORCE_SHELLED_POPEN:
-
-        stdin = subprocess.PIPE  # type: Union[IO[Any], int]
-        if stdin_path is not None:
-            stdin = open(stdin_path, "rb")
-
-        stdout = sys.stderr  # type: IO[Any]
-        if stdout_path is not None:
-            stdout = open(stdout_path, "wb")
-
-        stderr = sys.stderr  # type: IO[Any]
-        if stderr_path is not None:
-            stderr = open(stderr_path, "wb")
-
-        sproc = subprocess.Popen(commands,
-                                 shell=False,
-                                 close_fds=not onWindows(),
-                                 stdin=stdin,
-                                 stdout=stdout,
-                                 stderr=stderr,
-                                 env=env,
-                                 cwd=cwd)
-        processes_to_kill.append(sproc)
-
-        if sproc.stdin is not None:
-            sproc.stdin.close()
-
-        tm = None
-        if timelimit is not None and timelimit > 0:
-            def terminate():  # type: () -> None
-                try:
-                    _logger.warning(
-                        u"[job %s] exceeded time limit of %d seconds and will "
-                        "be terminated", name, timelimit)
-                    sproc.terminate()
-                except OSError:
-                    pass
-
-            tm = Timer(timelimit, terminate)
-            tm.daemon = True
-            tm.start()
-        if monitor_function:
-            monitor_function(sproc)
-        rcode = sproc.wait()
-
-        if tm is not None:
-            tm.cancel()
-
-        if isinstance(stdin, IOBase):
-            stdin.close()
-
-        if stdout is not sys.stderr:
-            stdout.close()
-
-        if stderr is not sys.stderr:
-            stderr.close()
-
-        return rcode
-    else:
-        if job_script_contents is None:
-            job_script_contents = SHELL_COMMAND_TEMPLATE
-
-        env_copy = {}
-        key = None  # type: Any
-        for key in env:
-            env_copy[key] = env[key]
-
-        job_description = {
-            u"commands": commands,
-            u"cwd": cwd,
-            u"env": env_copy,
-            u"stdout_path": stdout_path,
-            u"stderr_path": stderr_path,
-            u"stdin_path": stdin_path}
-
-        if PY2:
-            with open(os.path.join(job_dir, "job.json"), mode="wb") as job_file:
-                json_dump(job_description, job_file, ensure_ascii=False)
-        else:
-            with open(os.path.join(job_dir, "job.json"), mode="w",
-                      encoding='utf-8') as job_file:
-                json_dump(job_description, job_file, ensure_ascii=False)
-        try:
-            job_script = os.path.join(job_dir, "run_job.bash")
-            with open(job_script, "wb") as _:
-                _.write(job_script_contents.encode('utf-8'))
-            job_run = os.path.join(job_dir, "run_job.py")
-            with open(job_run, "wb") as _:
-                _.write(PYTHON_RUN_SCRIPT.encode('utf-8'))
-            sproc = subprocess.Popen(
-                ["bash", job_script.encode("utf-8")],
-                shell=False,
-                cwd=job_dir,
-                # The nested script will output the paths to the correct files if they need
-                # to be captured. Else just write everything to stderr (same as above).
-                stdout=sys.stderr,
-                stderr=sys.stderr,
-                stdin=subprocess.PIPE,
-            )
-            processes_to_kill.append(sproc)
-            if sproc.stdin is not None:
-                sproc.stdin.close()
-
-            rcode = sproc.wait()
-
-            return rcode
-        finally:
-            shutil.rmtree(job_dir)
```
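The deleted `_job_popen` enforces its `timelimit` by arming a `threading.Timer` that terminates the child if it is still running when the limit expires, and cancelling the timer once `wait()` returns. Below is a minimal standalone sketch of that technique, not part of the diff; the helper name `run_with_timelimit` is hypothetical and the example assumes a POSIX system with a `sleep` binary:

```python
# Sketch of the Timer-based timelimit used by _job_popen (assumptions:
# POSIX system, `sleep` available; run_with_timelimit is a made-up name).
import subprocess
import sys
from threading import Timer


def run_with_timelimit(commands, timelimit=None):
    """Run `commands`, sending SIGTERM if they outlive `timelimit` seconds."""
    sproc = subprocess.Popen(commands)

    tm = None
    if timelimit is not None and timelimit > 0:
        def terminate():
            try:
                print("exceeded time limit of %d seconds and will "
                      "be terminated" % timelimit, file=sys.stderr)
                sproc.terminate()
            except OSError:
                pass  # process may already be gone

        tm = Timer(timelimit, terminate)
        tm.daemon = True  # do not keep the interpreter alive for the timer
        tm.start()

    rcode = sproc.wait()
    if tm is not None:
        tm.cancel()  # job finished in time; disarm the timer
    return rcode


if __name__ == "__main__":
    # Expect -15 (terminated by SIGTERM) since the sleep exceeds the limit.
    print(run_with_timelimit(["sleep", "5"], timelimit=2))
```

The same arm/cancel pattern appears verbatim in the deleted file, with the addition that cwltool also registers the process in `processes_to_kill` so it can be reaped on interpreter exit.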
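Likewise, `process_monitor` estimates a job's peak memory by summing the RSS of the job process and all of its descendants via psutil. Note that `Timer(interval=1, ...)` fires its function once, one second after start, so the version above takes a single sample before `sproc.wait()` returns; a periodic sampler would need to re-arm the timer. A self-contained sketch of just the tree walk (`tree_rss` is a hypothetical name, not cwltool API):

```python
# Sketch of the psutil process-tree RSS walk used by process_monitor.
import itertools
import psutil


def tree_rss(pid):
    """Return the combined resident set size of `pid` and its descendants."""
    monitor = psutil.Process(pid)
    children = monitor.children()  # direct children only
    rss = monitor.memory_info().rss
    while len(children):
        # Add this generation's RSS, then descend one level.
        rss += sum(process.memory_info().rss for process in children)
        children = list(itertools.chain(
            *[process.children() for process in children]))
    return rss


if __name__ == "__main__":
    import os
    print("current tree RSS: %iMiB" % round(tree_rss(os.getpid()) / (2 ** 20)))
```

Like the original, this sketch can race against short-lived children (psutil raises `NoSuchProcess` if a child exits mid-walk); a hardened version would catch that exception per process.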
