env/lib/python3.9/site-packages/cwltool/job.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date:   Mon, 22 Mar 2021 18:12:50 +0000

import datetime
import functools
import itertools
import logging
import os
import re
import shutil
import subprocess  # nosec
import sys
import tempfile
import threading
import time
import uuid
from abc import ABCMeta, abstractmethod
from io import IOBase
from threading import Timer
from typing import (
    IO,
    Callable,
    Iterable,
    List,
    Match,
    MutableMapping,
    MutableSequence,
    Optional,
    TextIO,
    Tuple,
    Union,
    cast,
)

import psutil
import shellescape
from prov.model import PROV
from schema_salad.sourceline import SourceLine
from schema_salad.utils import json_dump, json_dumps
from typing_extensions import TYPE_CHECKING

from .builder import Builder, HasReqsHints
from .context import RuntimeContext
from .errors import UnsupportedRequirement, WorkflowException
from .loghandler import _logger
from .pathmapper import MapperEnt, PathMapper
from .process import stage_files
from .secrets import SecretStore
from .utils import (
    CWLObjectType,
    CWLOutputType,
    DirectoryType,
    OutputCallbackType,
    bytes2str_in_dicts,
    copytree_with_merge,
    create_tmp_dir,
    ensure_non_writable,
    ensure_writable,
    onWindows,
    processes_to_kill,
)

if TYPE_CHECKING:
    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import

needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
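
# Quoting sketch (illustrative, not part of the original module): any
# argument that is empty or contains whitespace or a shell metacharacter
# matches the regex above and gets quoted before being echoed in the log:
#
#     bool(needs_shell_quoting_re.search("plain-arg.txt"))  # False
#     bool(needs_shell_quoting_re.search("has space"))      # True
#     bool(needs_shell_quoting_re.search("$HOME"))          # True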

FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"

SHELL_COMMAND_TEMPLATE = """#!/bin/bash
python "run_job.py" "job.json"
"""
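
# When CWLTOOL_FORCE_SHELL_POPEN=1 (or a builder supplies its own job
# script), _job_popen below takes the shelled path: it writes job.json,
# run_job.py, and run_job.bash into a fresh job directory and executes the
# bash script, which in turn runs run_job.py on job.json.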

PYTHON_RUN_SCRIPT = """
import json
import os
import sys

if os.name == 'posix':
    try:
        import subprocess32 as subprocess  # type: ignore
    except Exception:
        import subprocess
else:
    import subprocess  # type: ignore

with open(sys.argv[1], "r") as f:
    popen_description = json.load(f)
    commands = popen_description["commands"]
    cwd = popen_description["cwd"]
    env = popen_description["env"]
    env["PATH"] = os.environ.get("PATH")
    stdin_path = popen_description["stdin_path"]
    stdout_path = popen_description["stdout_path"]
    stderr_path = popen_description["stderr_path"]
    if stdin_path is not None:
        stdin = open(stdin_path, "rb")
    else:
        stdin = subprocess.PIPE
    if stdout_path is not None:
        stdout = open(stdout_path, "wb")
    else:
        stdout = sys.stderr
    if stderr_path is not None:
        stderr = open(stderr_path, "wb")
    else:
        stderr = sys.stderr
    if os.name == 'nt':
        close_fds = False
        for key, value in env.items():
            env[key] = str(value)
    else:
        close_fds = True
    sp = subprocess.Popen(commands,
                          shell=False,
                          close_fds=close_fds,
                          stdin=stdin,
                          stdout=stdout,
                          stderr=stderr,
                          env=env,
                          cwd=cwd)
    if sp.stdin:
        sp.stdin.close()
    rcode = sp.wait()
    if stdin is not subprocess.PIPE:
        stdin.close()
    if stdout is not sys.stderr:
        stdout.close()
    if stderr is not sys.stderr:
        stderr.close()
    sys.exit(rcode)
"""


def relink_initialworkdir(
    pathmapper: PathMapper,
    host_outdir: str,
    container_outdir: str,
    inplace_update: bool = False,
) -> None:
    for _, vol in pathmapper.items():
        if not vol.staged:
            continue

        if vol.type in ("File", "Directory") or (
            inplace_update and vol.type in ("WritableFile", "WritableDirectory")
        ):
            if not vol.target.startswith(container_outdir):
                # This is an input file written outside of the working
                # directory, and therefore ineligible for being an output
                # file; thus, none of our business.
                continue
            host_outdir_tgt = os.path.join(
                host_outdir, vol.target[len(container_outdir) + 1 :]
            )
            if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
                try:
                    os.remove(host_outdir_tgt)
                except PermissionError:
                    pass
            elif os.path.isdir(host_outdir_tgt) and not vol.resolved.startswith("_:"):
                shutil.rmtree(host_outdir_tgt)
            if onWindows():
                # If this becomes a big issue for someone then we could
                # refactor the code to process output from a running container
                # and avoid all the extra IO below
                if vol.type in ("File", "WritableFile"):
                    shutil.copy(vol.resolved, host_outdir_tgt)
                elif vol.type in ("Directory", "WritableDirectory"):
                    copytree_with_merge(vol.resolved, host_outdir_tgt)
            elif not vol.resolved.startswith("_:"):
                try:
                    os.symlink(vol.resolved, host_outdir_tgt)
                except FileExistsError:
                    pass

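# Worked example for relink_initialworkdir (hypothetical paths): with
# container_outdir="/var/spool/cwl" and host_outdir="/home/user/out", a
# staged entry targeting "/var/spool/cwl/result.txt" is replaced by a
# symlink at "/home/user/out/result.txt" pointing at vol.resolved; targets
# outside container_outdir are skipped.
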
def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str]]:
    return None


CollectOutputsType = Union[Callable[[str, int], CWLObjectType], functools.partial]


class JobBase(HasReqsHints, metaclass=ABCMeta):
    def __init__(
        self,
        builder: Builder,
        joborder: CWLObjectType,
        make_path_mapper: Callable[..., PathMapper],
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        name: str,
    ) -> None:
        """Initialize the job object."""
        super().__init__()
        self.builder = builder
        self.joborder = joborder
        self.stdin = None  # type: Optional[str]
        self.stderr = None  # type: Optional[str]
        self.stdout = None  # type: Optional[str]
        self.successCodes = []  # type: Iterable[int]
        self.temporaryFailCodes = []  # type: Iterable[int]
        self.permanentFailCodes = []  # type: Iterable[int]
        self.requirements = requirements
        self.hints = hints
        self.name = name
        self.command_line = []  # type: List[str]
        self.pathmapper = PathMapper([], "", "")
        self.make_path_mapper = make_path_mapper
        self.generatemapper = None  # type: Optional[PathMapper]

        # set in CommandLineTool.job(i)
        self.collect_outputs = cast(CollectOutputsType, None)
        self.output_callback = None  # type: Optional[OutputCallbackType]
        self.outdir = ""
        self.tmpdir = ""

        self.environment = {}  # type: MutableMapping[str, str]
        self.generatefiles = {
            "class": "Directory",
            "listing": [],
            "basename": "",
        }  # type: DirectoryType
        self.stagedir = None  # type: Optional[str]
        self.inplace_update = False
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool
        self.mpi_procs = None  # type: Optional[int]

    def __repr__(self):  # type: () -> str
        """Represent this Job object."""
        return "CommandLineJob(%s)" % self.name

    @abstractmethod
    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        pass

    def _setup(self, runtimeContext: RuntimeContext) -> None:
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        for knownfile in self.pathmapper.files():
            p = self.pathmapper.mapper(knownfile)
            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
                raise WorkflowException(
                    "Input file %s (at %s) not found or is not a regular "
                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
                )

        if "listing" in self.generatefiles:
            runtimeContext = runtimeContext.copy()
            runtimeContext.outdir = self.outdir
            self.generatemapper = self.make_path_mapper(
                self.generatefiles["listing"],
                self.builder.outdir,
                runtimeContext,
                False,
            )
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    "[job %s] initial work dir %s",
                    self.name,
                    json_dumps(
                        {
                            p: self.generatemapper.mapper(p)
                            for p in self.generatemapper.files()
                        },
                        indent=4,
                    ),
                )

    def _execute(
        self,
        runtime: List[str],
        env: MutableMapping[str, str],
        runtimeContext: RuntimeContext,
        monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
    ) -> None:

        scr = self.get_requirement("ShellCommandRequirement")[0]

        shouldquote = needs_shell_quoting_re.search
        if scr is not None:
            shouldquote = neverquote

        # If mpi_procs (is not None and > 0) then prepend the
        # appropriate MPI job launch command and flags before the
        # execution.
        if self.mpi_procs:
            menv = runtimeContext.mpi_config
            mpi_runtime = [
                menv.runner,
                menv.nproc_flag,
                str(self.mpi_procs),
            ] + menv.extra_flags
            runtime = mpi_runtime + runtime
            menv.pass_through_env_vars(env)
            menv.set_env_vars(env)

        _logger.info(
            "[job %s] %s$ %s%s%s%s",
            self.name,
            self.outdir,
            " \\\n    ".join(
                [
                    shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg)
                    for arg in (runtime + self.command_line)
                ]
            ),
            " < %s" % self.stdin if self.stdin else "",
            " > %s" % os.path.join(self.outdir, self.stdout) if self.stdout else "",
            " 2> %s" % os.path.join(self.outdir, self.stderr) if self.stderr else "",
        )
        if self.joborder is not None and runtimeContext.research_obj is not None:
            job_order = self.joborder
            if (
                runtimeContext.process_run_id is not None
                and runtimeContext.prov_obj is not None
                and isinstance(job_order, (list, dict))
            ):
                runtimeContext.prov_obj.used_artefacts(
                    job_order, runtimeContext.process_run_id, str(self.name)
                )
            else:
                _logger.warning(
                    "research_obj set but one of process_run_id "
                    "or prov_obj is missing from runtimeContext: "
                    "{}".format(runtimeContext)
                )
        outputs = {}  # type: CWLObjectType
        try:
            stdin_path = None
            if self.stdin is not None:
                rmap = self.pathmapper.reversemap(self.stdin)
                if rmap is None:
                    raise WorkflowException(f"{self.stdin} missing from pathmapper")
                else:
                    stdin_path = rmap[1]

            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
                dnerr = os.path.dirname(abserr)
                if dnerr and not os.path.exists(dnerr):
                    os.makedirs(dnerr)
                stderr_path = abserr

            stdout_path = None
            if self.stdout is not None:
                absout = os.path.join(self.outdir, self.stdout)
                dnout = os.path.dirname(absout)
                if dnout and not os.path.exists(dnout):
                    os.makedirs(dnout)
                stdout_path = absout

            commands = [str(x) for x in runtime + self.command_line]
            if runtimeContext.secret_store is not None:
                commands = cast(
                    List[str],
                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, commands)),
                )
                env = cast(
                    MutableMapping[str, str],
                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, env)),
                )

            job_script_contents = None  # type: Optional[str]
            builder = getattr(self, "builder", None)  # type: Builder
            if builder is not None:
                job_script_contents = builder.build_job_script(commands)
            rcode = _job_popen(
                commands,
                stdin_path=stdin_path,
                stdout_path=stdout_path,
                stderr_path=stderr_path,
                env=env,
                cwd=self.outdir,
                make_job_dir=lambda: runtimeContext.create_outdir(),
                job_script_contents=job_script_contents,
                timelimit=self.timelimit,
                name=self.name,
                monitor_function=monitor_function,
                default_stdout=runtimeContext.default_stdout,
                default_stderr=runtimeContext.default_stderr,
            )

            if rcode in self.successCodes:
                processStatus = "success"
            elif rcode in self.temporaryFailCodes:
                processStatus = "temporaryFail"
            elif rcode in self.permanentFailCodes:
                processStatus = "permanentFail"
            elif rcode == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"
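            # Classification sketch (illustrative values): with
            # successCodes=[0, 2], temporaryFailCodes=[137], and
            # permanentFailCodes=[1], an rcode of 2 maps to "success",
            # 137 to "temporaryFail", 1 to "permanentFail", and any other
            # nonzero code to "permanentFail" as well.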

            if "listing" in self.generatefiles:
                if self.generatemapper:
                    relink_initialworkdir(
                        self.generatemapper,
                        self.outdir,
                        self.builder.outdir,
                        inplace_update=self.inplace_update,
                    )
                else:
                    raise ValueError(
                        "'listing' in self.generatefiles but no "
                        "generatemapper was setup."
                    )

            outputs = self.collect_outputs(self.outdir, rcode)
            outputs = bytes2str_in_dicts(outputs)  # type: ignore
        except OSError as e:
            if e.errno == 2:
                if runtime:
                    _logger.error("'%s' not found: %s", runtime[0], str(e))
                else:
                    _logger.error("'%s' not found: %s", self.command_line[0], str(e))
            else:
                _logger.exception("Exception while running job")
            processStatus = "permanentFail"
        except WorkflowException as err:
            _logger.error("[job %s] Job error:\n%s", self.name, str(err))
            processStatus = "permanentFail"
        except Exception:
            _logger.exception("Exception while running job")
            processStatus = "permanentFail"
        if (
            runtimeContext.research_obj is not None
            and self.prov_obj is not None
            and runtimeContext.process_run_id is not None
        ):
            # Create entities for the outputs produced by each step
            # (in the provenance document).
            self.prov_obj.record_process_end(
                str(self.name),
                runtimeContext.process_run_id,
                outputs,
                datetime.datetime.now(),
            )
        if processStatus != "success":
            _logger.warning("[job %s] completed %s", self.name, processStatus)
        else:
            _logger.info("[job %s] completed %s", self.name, processStatus)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)
            )

        if self.generatemapper is not None and runtimeContext.secret_store is not None:
            # Delete any runtime-generated files containing secrets.
            for _, p in self.generatemapper.items():
                if p.type == "CreateFile":
                    if runtimeContext.secret_store.has_secret(p.resolved):
                        host_outdir = self.outdir
                        container_outdir = self.builder.outdir
                        host_outdir_tgt = p.target
                        if p.target.startswith(container_outdir + "/"):
                            host_outdir_tgt = os.path.join(
                                host_outdir, p.target[len(container_outdir) + 1 :]
                            )
                        os.remove(host_outdir_tgt)

        if runtimeContext.workflow_eval_lock is None:
            raise WorkflowException(
                "runtimeContext.workflow_eval_lock must not be None"
            )

        if self.output_callback:
            with runtimeContext.workflow_eval_lock:
                self.output_callback(outputs, processStatus)

        if self.stagedir is not None and os.path.exists(self.stagedir):
            _logger.debug(
                "[job %s] Removing input staging directory %s",
                self.name,
                self.stagedir,
            )
            shutil.rmtree(self.stagedir, True)

        if runtimeContext.rm_tmpdir:
            _logger.debug(
                "[job %s] Removing temporary directory %s", self.name, self.tmpdir
            )
            shutil.rmtree(self.tmpdir, True)

    def process_monitor(self, sproc):  # type: (subprocess.Popen[str]) -> None
        monitor = psutil.Process(sproc.pid)
        # Value must be a list rather than an integer to utilise
        # pass-by-reference in Python.
        memory_usage = [None]  # type: MutableSequence[Optional[int]]

        def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
            # Sum the resident set size of the monitored process and all of
            # its (recursive) children.
            children = monitor.children()
            rss = monitor.memory_info().rss
            while len(children):
                rss += sum([process.memory_info().rss for process in children])
                children = list(
                    itertools.chain(*[process.children() for process in children])
                )
            if memory_usage[0] is None or rss > memory_usage[0]:
                memory_usage[0] = rss

        # Note: threading.Timer fires only once, so this samples the process
        # tree a single time, one second after monitoring starts.
        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
        mem_tm.daemon = True
        mem_tm.start()
        sproc.wait()
        mem_tm.cancel()
        if memory_usage[0] is not None:
            _logger.info(
                "[job %s] Max memory used: %iMiB",
                self.name,
                round(memory_usage[0] / (2 ** 20)),
            )
        else:
            _logger.debug(
                "Could not collect memory usage, job ended before monitoring began."
            )


class CommandLineJob(JobBase):
    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:

        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        self._setup(runtimeContext)

        env = self.environment
        vars_to_preserve = runtimeContext.preserve_environment
        if runtimeContext.preserve_entire_environment is not False:
            vars_to_preserve = os.environ
        if vars_to_preserve:
            for key, value in os.environ.items():
                if key in vars_to_preserve and key not in env:
                    # On Windows, subprocess env can't handle unicode.
                    env[key] = str(value) if onWindows() else value
        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
            env["SYSTEMROOT"] = (
                str(os.environ["SYSTEMROOT"])
                if onWindows()
                else os.environ["SYSTEMROOT"]
            )

        stage_files(
            self.pathmapper,
            ignore_writable=True,
            symlink=True,
            secret_store=runtimeContext.secret_store,
        )
        if self.generatemapper is not None:
            stage_files(
                self.generatemapper,
                ignore_writable=self.inplace_update,
                symlink=True,
                secret_store=runtimeContext.secret_store,
            )
            relink_initialworkdir(
                self.generatemapper,
                self.outdir,
                self.builder.outdir,
                inplace_update=self.inplace_update,
            )

        monitor_function = functools.partial(self.process_monitor)

        self._execute([], env, runtimeContext, monitor_function)


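# Environment sketch for CommandLineJob.run (illustrative): with
# runtimeContext.preserve_environment = ["USER"], the job environment picks
# up USER from os.environ (unless the tool already set it), HOME and TMPDIR
# point at the job's outdir and tmpdir, and PATH (plus SYSTEMROOT on
# Windows) is filled in from os.environ when missing.
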
CONTROL_CODE_RE = r"\x1b\[[0-9;]*[a-zA-Z]"
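
# CONTROL_CODE_RE matches ANSI terminal control sequences, which
# `docker stats` interleaves with its output; e.g. (illustrative)
# re.sub(CONTROL_CODE_RE, "", "\x1b[2J\x1b[H12.34%") == "12.34%".

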
class ContainerCommandLineJob(JobBase, metaclass=ABCMeta):
    """Commandline job using containers."""

    @abstractmethod
    def get_from_requirements(
        self,
        r: CWLObjectType,
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> Optional[str]:
        pass

    @abstractmethod
    def create_runtime(
        self,
        env: MutableMapping[str, str],
        runtime_context: RuntimeContext,
    ) -> Tuple[List[str], Optional[str]]:
        """Return the list of commands to run the selected container engine."""

    @staticmethod
    @abstractmethod
    def append_volume(
        runtime: List[str], source: str, target: str, writable: bool = False
    ) -> None:
        """Add binding arguments to the runtime list."""

    @abstractmethod
    def add_file_or_directory_volume(
        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
    ) -> None:
        """Append a file/directory mapping to the runtime option list."""

    @abstractmethod
    def add_writable_file_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable file mapping to the runtime option list."""

    @abstractmethod
    def add_writable_directory_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable directory mapping to the runtime option list."""

    def create_file_and_add_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        secret_store: Optional[SecretStore],
        tmpdir_prefix: str,
    ) -> str:
        """Create the file and add a mapping."""
        if not host_outdir_tgt:
            new_file = os.path.join(
                create_tmp_dir(tmpdir_prefix),
                os.path.basename(volume.target),
            )
        writable = volume.type == "CreateWritableFile"
        contents = volume.resolved
        if secret_store:
            contents = cast(str, secret_store.retrieve(volume.resolved))
        dirname = os.path.dirname(host_outdir_tgt or new_file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(host_outdir_tgt or new_file, "wb") as file_literal:
            file_literal.write(contents.encode("utf-8"))
        if not host_outdir_tgt:
            self.append_volume(runtime, new_file, volume.target, writable=writable)
        if writable:
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
        return host_outdir_tgt or new_file

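    # Usage note: for "CreateFile"/"CreateWritableFile" entries the literal
    # text lives in volume.resolved; when a secret store is in use, secret
    # placeholders are substituted here, just before the file is written out
    # and mounted.
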
    def add_volumes(
        self,
        pathmapper: PathMapper,
        runtime: List[str],
        tmpdir_prefix: str,
        secret_store: Optional[SecretStore] = None,
        any_path_okay: bool = False,
    ) -> None:
        """Append volume mappings to the runtime option list."""
        container_outdir = self.builder.outdir
        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[str]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
                    self.outdir, vol.target[len(container_outdir) + 1 :]
                )
            if not host_outdir_tgt and not any_path_okay:
                raise WorkflowException(
                    "No mandatory DockerRequirement, yet path is outside "
                    "the designated output directory, also known as "
                    "$(runtime.outdir): {}".format(vol)
                )
            if vol.type in ("File", "Directory"):
                self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
            elif vol.type == "WritableFile":
                self.add_writable_file_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix
                )
            elif vol.type == "WritableDirectory":
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix
                )
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix
                )
                pathmapper.update(key, new_path, vol.target, vol.type, vol.staged)

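    # Note (illustrative): the abstract add_*_volume hooks above let each
    # container engine translate these mappings into its own command-line
    # options; a Docker-based subclass might emit something like
    # "--mount=type=bind,source=/host/in.txt,target=/var/spool/cwl/in.txt",
    # while a Singularity subclass would use "--bind" options instead.
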
    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        self.prov_obj = runtimeContext.prov_obj
        img_id = None
        env = cast(MutableMapping[str, str], os.environ)
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if docker_req is not None and user_space_docker_cmd:
            # For user-space docker implementations, a local image name or ID
            # takes precedence over a network pull
            if "dockerImageId" in docker_req:
                img_id = str(docker_req["dockerImageId"])
            elif "dockerPull" in docker_req:
                img_id = str(docker_req["dockerPull"])
                cmd = [user_space_docker_cmd, "pull", img_id]
                _logger.info(str(cmd))
                try:
                    subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                except OSError:
                    raise WorkflowException(
                        SourceLine(docker_req).makeError(
                            "Either Docker container {} is not available with "
                            "user space docker implementation {} or {} is missing "
                            "or broken.".format(
                                img_id, user_space_docker_cmd, user_space_docker_cmd
                            )
                        )
                    )
            else:
                raise WorkflowException(
                    SourceLine(docker_req).makeError(
                        "Docker image must be specified as 'dockerImageId' or "
                        "'dockerPull' when using user space implementations of "
                        "Docker"
                    )
                )
        else:
            try:
                if docker_req is not None and runtimeContext.use_container:
                    img_id = str(
                        self.get_from_requirements(
                            docker_req,
                            runtimeContext.pull_image,
                            runtimeContext.force_docker_pull,
                            runtimeContext.tmp_outdir_prefix,
                        )
                    )
                if img_id is None:
                    if self.builder.find_default_container:
                        default_container = self.builder.find_default_container()
                        if default_container:
                            img_id = str(default_container)

                if (
                    docker_req is not None
                    and img_id is None
                    and runtimeContext.use_container
                ):
                    raise Exception("Docker image not available")

                if (
                    self.prov_obj is not None
                    and img_id is not None
                    and runtimeContext.process_run_id is not None
                ):
                    container_agent = self.prov_obj.document.agent(
                        uuid.uuid4().urn,
                        {
                            "prov:type": PROV["SoftwareAgent"],
                            "cwlprov:image": img_id,
                            "prov:label": "Container execution of image %s" % img_id,
                        },
                    )
                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
                    #     {"prov:label": "Container image %s" % img_id})
                    # The image is the plan for this activity-agent association
                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
                    self.prov_obj.document.wasAssociatedWith(
                        runtimeContext.process_run_id, container_agent
                    )
            except Exception as err:
                container = "Singularity" if runtimeContext.singularity else "Docker"
                _logger.debug("%s error", container, exc_info=True)
                if docker_is_req:
                    raise UnsupportedRequirement(
                        "{} is required to run this tool: {}".format(
                            container, str(err)
                        )
                    ) from err
                else:
                    raise WorkflowException(
                        "{0} is not available for this tool, try "
                        "--no-container to disable {0}, or install "
                        "a user space Docker replacement like uDocker with "
                        "--user-space-docker-cmd: {1}".format(container, err)
                    )

        self._setup(runtimeContext)
        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
        runtime.append(str(img_id))
        monitor_function = None
        if cidfile:
            monitor_function = functools.partial(
                self.docker_monitor,
                cidfile,
                runtimeContext.tmpdir_prefix,
                not bool(runtimeContext.cidfile_dir),
            )
        elif runtimeContext.user_space_docker_cmd:
            monitor_function = functools.partial(self.process_monitor)
        self._execute(runtime, env, runtimeContext, monitor_function)

    def docker_monitor(
        self,
        cidfile: str,
        tmpdir_prefix: str,
        cleanup_cidfile: bool,
        process,  # type: subprocess.Popen[str]
    ) -> None:
        """Record memory usage of the running Docker container."""
        # TODO: consider switching to `docker create` / `docker start`
        # instead of `docker run` as `docker create` outputs the container ID
        # to stdout, but the container is frozen, thus allowing us to start the
        # monitoring process without dealing with the cidfile or too-fast
        # container execution
        cid = None  # type: Optional[str]
        while cid is None:
            time.sleep(1)
            if process.returncode is not None:
                if cleanup_cidfile:
                    try:
                        os.remove(cidfile)
                    except OSError as exc:
                        _logger.warning(
                            "Ignored error cleaning up Docker cidfile: %s", exc
                        )
                return
            try:
                with open(cidfile) as cidhandle:
                    cid = cidhandle.readline().strip()
            except OSError:
                cid = None
        max_mem = psutil.virtual_memory().total
        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
        stats_file_name = stats_file.name
        try:
            with open(stats_file_name, mode="w") as stats_file_handle:
                stats_proc = subprocess.Popen(  # nosec
                    ["docker", "stats", "--no-trunc", "--format", "{{.MemPerc}}", cid],
                    stdout=stats_file_handle,
                    stderr=subprocess.DEVNULL,
                )
                process.wait()
                stats_proc.kill()
        except OSError as exc:
            _logger.warning("Ignored error with docker stats: %s", exc)
            return
        max_mem_percent = 0  # type: float
        mem_percent = 0  # type: float
        with open(stats_file_name, mode="r") as stats:
            while True:
                line = stats.readline()
                if not line:
                    break
                try:
                    mem_percent = float(
                        re.sub(CONTROL_CODE_RE, "", line).replace("%", "")
                    )
                    if mem_percent > max_mem_percent:
                        max_mem_percent = mem_percent
                except ValueError:
                    break
        _logger.info(
            "[job %s] Max memory used: %iMiB",
            self.name,
            int((max_mem_percent / 100 * max_mem) / (2 ** 20)),
        )
        if cleanup_cidfile:
            os.remove(cidfile)

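# Worked example for the docker_monitor arithmetic above (hypothetical
# numbers): on a host with 16 GiB of memory, a peak MemPerc reading of
# 12.5% reports int((12.5 / 100 * 16 * 2**30) / 2**20) == 2048 MiB.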

def _job_popen(
    commands: List[str],
    stdin_path: Optional[str],
    stdout_path: Optional[str],
    stderr_path: Optional[str],
    env: MutableMapping[str, str],
    cwd: str,
    make_job_dir: Callable[[], str],
    job_script_contents: Optional[str] = None,
    timelimit: Optional[int] = None,
    name: Optional[str] = None,
    monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
    default_stdout=None,  # type: Optional[Union[IO[bytes], TextIO]]
    default_stderr=None,  # type: Optional[Union[IO[bytes], TextIO]]
) -> int:

    if job_script_contents is None and not FORCE_SHELLED_POPEN:

        stdin = subprocess.PIPE  # type: Union[IO[bytes], int]
        if stdin_path is not None:
            stdin = open(stdin_path, "rb")

        stdout = (
            default_stdout if default_stdout is not None else sys.stderr
        )  # type: Union[IO[bytes], TextIO]
        if stdout_path is not None:
            stdout = open(stdout_path, "wb")

        stderr = (
            default_stderr if default_stderr is not None else sys.stderr
        )  # type: Union[IO[bytes], TextIO]
        if stderr_path is not None:
            stderr = open(stderr_path, "wb")

        sproc = subprocess.Popen(
            commands,
            shell=False,  # nosec
            close_fds=not onWindows(),
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            env=env,
            cwd=cwd,
            universal_newlines=True,
        )
        processes_to_kill.append(sproc)

        if sproc.stdin is not None:
            sproc.stdin.close()

        tm = None
        if timelimit is not None and timelimit > 0:

            def terminate():  # type: () -> None
                try:
                    _logger.warning(
                        "[job %s] exceeded time limit of %d seconds and will be terminated",
                        name,
                        timelimit,
                    )
                    sproc.terminate()
                except OSError:
                    pass

            tm = Timer(timelimit, terminate)
            tm.daemon = True
            tm.start()
        if monitor_function:
            monitor_function(sproc)
        rcode = sproc.wait()

        if tm is not None:
            tm.cancel()

        if isinstance(stdin, IOBase) and hasattr(stdin, "close"):
            stdin.close()

        if stdout is not sys.stderr and hasattr(stdout, "close"):
            stdout.close()

        if stderr is not sys.stderr and hasattr(stderr, "close"):
            stderr.close()

        return rcode
    else:
        if job_script_contents is None:
            job_script_contents = SHELL_COMMAND_TEMPLATE

        env_copy = {}
        key = None  # type: Optional[str]
        for key in env:
            env_copy[key] = env[key]

        job_description = {
            "commands": commands,
            "cwd": cwd,
            "env": env_copy,
            "stdout_path": stdout_path,
            "stderr_path": stderr_path,
            "stdin_path": stdin_path,
        }

        job_dir = make_job_dir()
        try:
            with open(
                os.path.join(job_dir, "job.json"), mode="w", encoding="utf-8"
            ) as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
            job_script = os.path.join(job_dir, "run_job.bash")
            with open(job_script, "wb") as _:
                _.write(job_script_contents.encode("utf-8"))
            job_run = os.path.join(job_dir, "run_job.py")
            with open(job_run, "wb") as _:
                _.write(PYTHON_RUN_SCRIPT.encode("utf-8"))
            sproc = subprocess.Popen(  # nosec
                ["bash", job_script.encode("utf-8")],
                shell=False,  # nosec
                cwd=job_dir,
                # The nested script will output the paths to the correct files
                # if they need to be captured. Else just write everything to
                # stderr (same as above).
                stdout=sys.stderr,
                stderr=sys.stderr,
                stdin=subprocess.PIPE,
                universal_newlines=True,
            )
            processes_to_kill.append(sproc)
            if sproc.stdin is not None:
                sproc.stdin.close()

            rcode = sproc.wait()

            return rcode
        finally:
            shutil.rmtree(job_dir)
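
# Minimal usage sketch for _job_popen (hypothetical values, not part of the
# original module):
#
#     rc = _job_popen(
#         ["echo", "hello"],
#         stdin_path=None,
#         stdout_path="/tmp/out/hello.txt",
#         stderr_path=None,
#         env={"PATH": os.environ["PATH"]},
#         cwd="/tmp/out",
#         make_job_dir=lambda: tempfile.mkdtemp(),
#     )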