env/lib/python3.9/site-packages/cwltool/job.py @ 0:4f3585e2f14b (draft, default, tip)
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date:   Mon, 22 Mar 2021 18:12:50 +0000

import datetime
import functools
import itertools
import logging
import os
import re
import shutil
import subprocess  # nosec
import sys
import tempfile
import threading
import time
import uuid
from abc import ABCMeta, abstractmethod
from io import IOBase
from threading import Timer
from typing import (
    IO,
    Callable,
    Iterable,
    List,
    Match,
    MutableMapping,
    MutableSequence,
    Optional,
    TextIO,
    Tuple,
    Union,
    cast,
)

import psutil
import shellescape
from prov.model import PROV
from schema_salad.sourceline import SourceLine
from schema_salad.utils import json_dump, json_dumps
from typing_extensions import TYPE_CHECKING

from .builder import Builder, HasReqsHints
from .context import RuntimeContext
from .errors import UnsupportedRequirement, WorkflowException
from .loghandler import _logger
from .pathmapper import MapperEnt, PathMapper
from .process import stage_files
from .secrets import SecretStore
from .utils import (
    CWLObjectType,
    CWLOutputType,
    DirectoryType,
    OutputCallbackType,
    bytes2str_in_dicts,
    copytree_with_merge,
    create_tmp_dir,
    ensure_non_writable,
    ensure_writable,
    onWindows,
    processes_to_kill,
)

if TYPE_CHECKING:
    from .provenance_profile import ProvenanceProfile  # pylint: disable=unused-import

needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
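
# A minimal illustration (not part of the original module): the regex treats
# empty strings and anything containing whitespace or shell metacharacters as
# needing quotes.
#   needs_shell_quoting_re.search("input.txt")  -> None  (safe as-is)
#   needs_shell_quoting_re.search("a b")        -> match (quote it)
#   needs_shell_quoting_re.search("$(pwd)")     -> match (quote it)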

FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"

SHELL_COMMAND_TEMPLATE = """#!/bin/bash
python "run_job.py" "job.json"
"""

PYTHON_RUN_SCRIPT = """
import json
import os
import sys
if os.name == 'posix':
    try:
        import subprocess32 as subprocess  # type: ignore
    except Exception:
        import subprocess
else:
    import subprocess  # type: ignore

with open(sys.argv[1], "r") as f:
    popen_description = json.load(f)
    commands = popen_description["commands"]
    cwd = popen_description["cwd"]
    env = popen_description["env"]
    env["PATH"] = os.environ.get("PATH")
    stdin_path = popen_description["stdin_path"]
    stdout_path = popen_description["stdout_path"]
    stderr_path = popen_description["stderr_path"]
    if stdin_path is not None:
        stdin = open(stdin_path, "rb")
    else:
        stdin = subprocess.PIPE
    if stdout_path is not None:
        stdout = open(stdout_path, "wb")
    else:
        stdout = sys.stderr
    if stderr_path is not None:
        stderr = open(stderr_path, "wb")
    else:
        stderr = sys.stderr
    if os.name == 'nt':
        close_fds = False
        for key, value in env.items():
            env[key] = str(value)
    else:
        close_fds = True
    sp = subprocess.Popen(commands,
                          shell=False,
                          close_fds=close_fds,
                          stdin=stdin,
                          stdout=stdout,
                          stderr=stderr,
                          env=env,
                          cwd=cwd)
    if sp.stdin:
        sp.stdin.close()
    rcode = sp.wait()
    if stdin is not subprocess.PIPE:
        stdin.close()
    if stdout is not sys.stderr:
        stdout.close()
    if stderr is not sys.stderr:
        stderr.close()
sys.exit(rcode)
"""

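# A sketch (an assumption, for illustration only) of the "job.json" document
# the helper script above consumes; _job_popen writes the real one below.
#
#   {
#     "commands": ["echo", "hello"],
#     "cwd": "/tmp/outdir",
#     "env": {"HOME": "/tmp/outdir", "TMPDIR": "/tmp/tmpdir"},
#     "stdout_path": "/tmp/outdir/out.txt",
#     "stderr_path": null,
#     "stdin_path": null
#   }
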

def relink_initialworkdir(
    pathmapper: PathMapper,
    host_outdir: str,
    container_outdir: str,
    inplace_update: bool = False,
) -> None:
    for _, vol in pathmapper.items():
        if not vol.staged:
            continue

        if vol.type in ("File", "Directory") or (
            inplace_update and vol.type in ("WritableFile", "WritableDirectory")
        ):
            if not vol.target.startswith(container_outdir):
                # This is an input file written outside of the working
                # directory, and therefore ineligible to be an output file.
                # Thus, none of our business.
                continue
            host_outdir_tgt = os.path.join(
                host_outdir, vol.target[len(container_outdir) + 1 :]
            )
            if os.path.islink(host_outdir_tgt) or os.path.isfile(host_outdir_tgt):
                try:
                    os.remove(host_outdir_tgt)
                except PermissionError:
                    pass
            elif os.path.isdir(host_outdir_tgt) and not vol.resolved.startswith("_:"):
                shutil.rmtree(host_outdir_tgt)
            if onWindows():
                # If this becomes a big issue for someone then we could
                # refactor the code to process output from a running container
                # and avoid all the extra IO below.
                if vol.type in ("File", "WritableFile"):
                    shutil.copy(vol.resolved, host_outdir_tgt)
                elif vol.type in ("Directory", "WritableDirectory"):
                    copytree_with_merge(vol.resolved, host_outdir_tgt)
            elif not vol.resolved.startswith("_:"):
                try:
                    os.symlink(vol.resolved, host_outdir_tgt)
                except FileExistsError:
                    pass


def neverquote(string: str, pos: int = 0, endpos: int = 0) -> Optional[Match[str]]:
    return None


CollectOutputsType = Union[Callable[[str, int], CWLObjectType], functools.partial]


class JobBase(HasReqsHints, metaclass=ABCMeta):
    def __init__(
        self,
        builder: Builder,
        joborder: CWLObjectType,
        make_path_mapper: Callable[..., PathMapper],
        requirements: List[CWLObjectType],
        hints: List[CWLObjectType],
        name: str,
    ) -> None:
        """Initialize the job object."""
        super().__init__()
        self.builder = builder
        self.joborder = joborder
        self.stdin = None  # type: Optional[str]
        self.stderr = None  # type: Optional[str]
        self.stdout = None  # type: Optional[str]
        self.successCodes = []  # type: Iterable[int]
        self.temporaryFailCodes = []  # type: Iterable[int]
        self.permanentFailCodes = []  # type: Iterable[int]
        self.requirements = requirements
        self.hints = hints
        self.name = name
        self.command_line = []  # type: List[str]
        self.pathmapper = PathMapper([], "", "")
        self.make_path_mapper = make_path_mapper
        self.generatemapper = None  # type: Optional[PathMapper]

        # set in CommandLineTool.job(i)
        self.collect_outputs = cast(CollectOutputsType, None)
        self.output_callback = None  # type: Optional[OutputCallbackType]
        self.outdir = ""
        self.tmpdir = ""

        self.environment = {}  # type: MutableMapping[str, str]
        self.generatefiles = {
            "class": "Directory",
            "listing": [],
            "basename": "",
        }  # type: DirectoryType
        self.stagedir = None  # type: Optional[str]
        self.inplace_update = False
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.timelimit = None  # type: Optional[int]
        self.networkaccess = False  # type: bool
        self.mpi_procs = None  # type: Optional[int]

    def __repr__(self):  # type: () -> str
        """Represent this Job object."""
        return "CommandLineJob(%s)" % self.name

    @abstractmethod
    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        pass

    def _setup(self, runtimeContext: RuntimeContext) -> None:
        if not os.path.exists(self.outdir):
            os.makedirs(self.outdir)

        for knownfile in self.pathmapper.files():
            p = self.pathmapper.mapper(knownfile)
            if p.type == "File" and not os.path.isfile(p[0]) and p.staged:
                raise WorkflowException(
                    "Input file %s (at %s) not found or is not a regular "
                    "file." % (knownfile, self.pathmapper.mapper(knownfile)[0])
                )

        if "listing" in self.generatefiles:
            runtimeContext = runtimeContext.copy()
            runtimeContext.outdir = self.outdir
            self.generatemapper = self.make_path_mapper(
                self.generatefiles["listing"],
                self.builder.outdir,
                runtimeContext,
                False,
            )
            if _logger.isEnabledFor(logging.DEBUG):
                _logger.debug(
                    "[job %s] initial work dir %s",
                    self.name,
                    json_dumps(
                        {
                            p: self.generatemapper.mapper(p)
                            for p in self.generatemapper.files()
                        },
                        indent=4,
                    ),
                )

    def _execute(
        self,
        runtime: List[str],
        env: MutableMapping[str, str],
        runtimeContext: RuntimeContext,
        monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
    ) -> None:

        scr = self.get_requirement("ShellCommandRequirement")[0]

        shouldquote = needs_shell_quoting_re.search
        if scr is not None:
            shouldquote = neverquote

        # If mpi_procs (is not None and > 0) then prepend the
        # appropriate MPI job launch command and flags before the
        # execution.
        if self.mpi_procs:
            menv = runtimeContext.mpi_config
            mpi_runtime = [
                menv.runner,
                menv.nproc_flag,
                str(self.mpi_procs),
            ] + menv.extra_flags
            runtime = mpi_runtime + runtime
            menv.pass_through_env_vars(env)
            menv.set_env_vars(env)

        _logger.info(
            "[job %s] %s$ %s%s%s%s",
            self.name,
            self.outdir,
            " \\\n    ".join(
                [
                    shellescape.quote(str(arg)) if shouldquote(str(arg)) else str(arg)
                    for arg in (runtime + self.command_line)
                ]
            ),
            " < %s" % self.stdin if self.stdin else "",
            " > %s" % os.path.join(self.outdir, self.stdout) if self.stdout else "",
            " 2> %s" % os.path.join(self.outdir, self.stderr) if self.stderr else "",
        )
        if self.joborder is not None and runtimeContext.research_obj is not None:
            job_order = self.joborder
            if (
                runtimeContext.process_run_id is not None
                and runtimeContext.prov_obj is not None
                and isinstance(job_order, (list, dict))
            ):
                runtimeContext.prov_obj.used_artefacts(
                    job_order, runtimeContext.process_run_id, str(self.name)
                )
            else:
                _logger.warning(
                    "research_obj set but one of process_run_id "
                    "or prov_obj is missing from runtimeContext: "
                    "{}".format(runtimeContext)
                )
        outputs = {}  # type: CWLObjectType
        try:
            stdin_path = None
            if self.stdin is not None:
                rmap = self.pathmapper.reversemap(self.stdin)
                if rmap is None:
                    raise WorkflowException(f"{self.stdin} missing from pathmapper")
                else:
                    stdin_path = rmap[1]

            stderr_path = None
            if self.stderr is not None:
                abserr = os.path.join(self.outdir, self.stderr)
                dnerr = os.path.dirname(abserr)
                if dnerr and not os.path.exists(dnerr):
                    os.makedirs(dnerr)
                stderr_path = abserr

            stdout_path = None
            if self.stdout is not None:
                absout = os.path.join(self.outdir, self.stdout)
                dnout = os.path.dirname(absout)
                if dnout and not os.path.exists(dnout):
                    os.makedirs(dnout)
                stdout_path = absout

            commands = [str(x) for x in runtime + self.command_line]
            if runtimeContext.secret_store is not None:
                commands = cast(
                    List[str],
                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, commands)),
                )
                env = cast(
                    MutableMapping[str, str],
                    runtimeContext.secret_store.retrieve(cast(CWLOutputType, env)),
                )

            job_script_contents = None  # type: Optional[str]
            builder = getattr(self, "builder", None)  # type: Optional[Builder]
            if builder is not None:
                job_script_contents = builder.build_job_script(commands)
            rcode = _job_popen(
                commands,
                stdin_path=stdin_path,
                stdout_path=stdout_path,
                stderr_path=stderr_path,
                env=env,
                cwd=self.outdir,
                make_job_dir=lambda: runtimeContext.create_outdir(),
                job_script_contents=job_script_contents,
                timelimit=self.timelimit,
                name=self.name,
                monitor_function=monitor_function,
                default_stdout=runtimeContext.default_stdout,
                default_stderr=runtimeContext.default_stderr,
            )

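            # Exit-code precedence, as implemented below: explicit
            # successCodes first, then temporaryFailCodes, then
            # permanentFailCodes, then the POSIX default of
            # 0 == success / nonzero == permanentFail.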
            if rcode in self.successCodes:
                processStatus = "success"
            elif rcode in self.temporaryFailCodes:
                processStatus = "temporaryFail"
            elif rcode in self.permanentFailCodes:
                processStatus = "permanentFail"
            elif rcode == 0:
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            if "listing" in self.generatefiles:
                if self.generatemapper:
                    relink_initialworkdir(
                        self.generatemapper,
                        self.outdir,
                        self.builder.outdir,
                        inplace_update=self.inplace_update,
                    )
                else:
                    raise ValueError(
                        "'listing' in self.generatefiles but no "
                        "generatemapper was set up."
                    )

            outputs = self.collect_outputs(self.outdir, rcode)
            outputs = bytes2str_in_dicts(outputs)  # type: ignore
        except OSError as e:
            if e.errno == 2:
                if runtime:
                    _logger.error("'%s' not found: %s", runtime[0], str(e))
                else:
                    _logger.error("'%s' not found: %s", self.command_line[0], str(e))
            else:
                _logger.exception("Exception while running job")
            processStatus = "permanentFail"
        except WorkflowException as err:
            _logger.error("[job %s] Job error:\n%s", self.name, str(err))
            processStatus = "permanentFail"
        except Exception:
            _logger.exception("Exception while running job")
            processStatus = "permanentFail"
        if (
            runtimeContext.research_obj is not None
            and self.prov_obj is not None
            and runtimeContext.process_run_id is not None
        ):
            # Create entities for the outputs produced by each step (in the
            # provenance document).
            self.prov_obj.record_process_end(
                str(self.name),
                runtimeContext.process_run_id,
                outputs,
                datetime.datetime.now(),
            )
        if processStatus != "success":
            _logger.warning("[job %s] completed %s", self.name, processStatus)
        else:
            _logger.info("[job %s] completed %s", self.name, processStatus)

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(
                "[job %s] outputs %s", self.name, json_dumps(outputs, indent=4)
            )

        if self.generatemapper is not None and runtimeContext.secret_store is not None:
            # Delete any runtime-generated files containing secrets.
            for _, p in self.generatemapper.items():
                if p.type == "CreateFile":
                    if runtimeContext.secret_store.has_secret(p.resolved):
                        host_outdir = self.outdir
                        container_outdir = self.builder.outdir
                        host_outdir_tgt = p.target
                        if p.target.startswith(container_outdir + "/"):
                            host_outdir_tgt = os.path.join(
                                host_outdir, p.target[len(container_outdir) + 1 :]
                            )
                        os.remove(host_outdir_tgt)

        if runtimeContext.workflow_eval_lock is None:
            raise WorkflowException(
                "runtimeContext.workflow_eval_lock must not be None"
            )

        if self.output_callback:
            with runtimeContext.workflow_eval_lock:
                self.output_callback(outputs, processStatus)

        if self.stagedir is not None and os.path.exists(self.stagedir):
            _logger.debug(
                "[job %s] Removing input staging directory %s",
                self.name,
                self.stagedir,
            )
            shutil.rmtree(self.stagedir, True)

        if runtimeContext.rm_tmpdir:
            _logger.debug(
                "[job %s] Removing temporary directory %s", self.name, self.tmpdir
            )
            shutil.rmtree(self.tmpdir, True)

    def process_monitor(self, sproc):  # type: (subprocess.Popen[str]) -> None
        monitor = psutil.Process(sproc.pid)
        # Value must be a list rather than an integer to utilise
        # pass-by-reference in Python.
        memory_usage = [None]  # type: MutableSequence[Optional[int]]

        def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
            children = monitor.children()
            rss = monitor.memory_info().rss
            while len(children):
                rss += sum([process.memory_info().rss for process in children])
                children = list(
                    itertools.chain(*[process.children() for process in children])
                )
            if memory_usage[0] is None or rss > memory_usage[0]:
                memory_usage[0] = rss

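        # Note: threading.Timer fires its callback exactly once, so this
        # samples the process tree a single time, one second after start.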
        mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
        mem_tm.daemon = True
        mem_tm.start()
        sproc.wait()
        mem_tm.cancel()
        if memory_usage[0] is not None:
            _logger.info(
                "[job %s] Max memory used: %iMiB",
                self.name,
                round(memory_usage[0] / (2 ** 20)),
            )
        else:
            _logger.debug(
                "Could not collect memory usage, job ended before monitoring began."
            )


class CommandLineJob(JobBase):
    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:

        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        self._setup(runtimeContext)

        env = self.environment
        vars_to_preserve = runtimeContext.preserve_environment
        if runtimeContext.preserve_entire_environment is not False:
            vars_to_preserve = os.environ
        if vars_to_preserve:
            for key, value in os.environ.items():
                if key in vars_to_preserve and key not in env:
                    # On Windows, subprocess env can't handle unicode.
                    env[key] = str(value) if onWindows() else value
        env["HOME"] = str(self.outdir) if onWindows() else self.outdir
        env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir
        if "PATH" not in env:
            env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"]
        if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ:
            env["SYSTEMROOT"] = (
                str(os.environ["SYSTEMROOT"])
                if onWindows()
                else os.environ["SYSTEMROOT"]
            )

        stage_files(
            self.pathmapper,
            ignore_writable=True,
            symlink=True,
            secret_store=runtimeContext.secret_store,
        )
        if self.generatemapper is not None:
            stage_files(
                self.generatemapper,
                ignore_writable=self.inplace_update,
                symlink=True,
                secret_store=runtimeContext.secret_store,
            )
            relink_initialworkdir(
                self.generatemapper,
                self.outdir,
                self.builder.outdir,
                inplace_update=self.inplace_update,
            )

        monitor_function = functools.partial(self.process_monitor)

        self._execute([], env, runtimeContext, monitor_function)


CONTROL_CODE_RE = r"\x1b\[[0-9;]*[a-zA-Z]"
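# Matches ANSI terminal control sequences (e.g. "\x1b[2J\x1b[H"), which
# `docker stats` interleaves with its formatted output; docker_monitor below
# strips them before parsing the memory percentage.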


class ContainerCommandLineJob(JobBase, metaclass=ABCMeta):
    """Commandline job using containers."""

    @abstractmethod
    def get_from_requirements(
        self,
        r: CWLObjectType,
        pull_image: bool,
        force_pull: bool,
        tmp_outdir_prefix: str,
    ) -> Optional[str]:
        pass

    @abstractmethod
    def create_runtime(
        self,
        env: MutableMapping[str, str],
        runtime_context: RuntimeContext,
    ) -> Tuple[List[str], Optional[str]]:
        """Return the list of commands to run the selected container engine."""

    @staticmethod
    @abstractmethod
    def append_volume(
        runtime: List[str], source: str, target: str, writable: bool = False
    ) -> None:
        """Add binding arguments to the runtime list."""

    @abstractmethod
    def add_file_or_directory_volume(
        self, runtime: List[str], volume: MapperEnt, host_outdir_tgt: Optional[str]
    ) -> None:
        """Append a file/directory volume mapping to the runtime option list."""

    @abstractmethod
    def add_writable_file_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable file mapping to the runtime option list."""

    @abstractmethod
    def add_writable_directory_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        tmpdir_prefix: str,
    ) -> None:
        """Append a writable directory mapping to the runtime option list."""

    def create_file_and_add_volume(
        self,
        runtime: List[str],
        volume: MapperEnt,
        host_outdir_tgt: Optional[str],
        secret_store: Optional[SecretStore],
        tmpdir_prefix: str,
    ) -> str:
        """Create the file and add a mapping."""
        if not host_outdir_tgt:
            new_file = os.path.join(
                create_tmp_dir(tmpdir_prefix),
                os.path.basename(volume.target),
            )
        writable = volume.type == "CreateWritableFile"
        contents = volume.resolved
        if secret_store:
            contents = cast(str, secret_store.retrieve(volume.resolved))
        dirname = os.path.dirname(host_outdir_tgt or new_file)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        with open(host_outdir_tgt or new_file, "wb") as file_literal:
            file_literal.write(contents.encode("utf-8"))
        if not host_outdir_tgt:
            self.append_volume(runtime, new_file, volume.target, writable=writable)
        if writable:
            ensure_writable(host_outdir_tgt or new_file)
        else:
            ensure_non_writable(host_outdir_tgt or new_file)
        return host_outdir_tgt or new_file

    def add_volumes(
        self,
        pathmapper: PathMapper,
        runtime: List[str],
        tmpdir_prefix: str,
        secret_store: Optional[SecretStore] = None,
        any_path_okay: bool = False,
    ) -> None:
        """Append volume mappings to the runtime option list."""
        container_outdir = self.builder.outdir
        for key, vol in (itm for itm in pathmapper.items() if itm[1].staged):
            host_outdir_tgt = None  # type: Optional[str]
            if vol.target.startswith(container_outdir + "/"):
                host_outdir_tgt = os.path.join(
                    self.outdir, vol.target[len(container_outdir) + 1 :]
                )
            if not host_outdir_tgt and not any_path_okay:
                raise WorkflowException(
                    "No mandatory DockerRequirement, yet path is outside "
                    "the designated output directory, also known as "
                    "$(runtime.outdir): {}".format(vol)
                )
            if vol.type in ("File", "Directory"):
                self.add_file_or_directory_volume(runtime, vol, host_outdir_tgt)
            elif vol.type == "WritableFile":
                self.add_writable_file_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix
                )
            elif vol.type == "WritableDirectory":
                self.add_writable_directory_volume(
                    runtime, vol, host_outdir_tgt, tmpdir_prefix
                )
            elif vol.type in ["CreateFile", "CreateWritableFile"]:
                new_path = self.create_file_and_add_volume(
                    runtime, vol, host_outdir_tgt, secret_store, tmpdir_prefix
                )
                pathmapper.update(key, new_path, vol.target, vol.type, vol.staged)

    def run(
        self,
        runtimeContext: RuntimeContext,
        tmpdir_lock: Optional[threading.Lock] = None,
    ) -> None:
        if tmpdir_lock:
            with tmpdir_lock:
                if not os.path.exists(self.tmpdir):
                    os.makedirs(self.tmpdir)
        else:
            if not os.path.exists(self.tmpdir):
                os.makedirs(self.tmpdir)

        (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
        self.prov_obj = runtimeContext.prov_obj
        img_id = None
        env = cast(MutableMapping[str, str], os.environ)
        user_space_docker_cmd = runtimeContext.user_space_docker_cmd
        if docker_req is not None and user_space_docker_cmd:
            # For user-space docker implementations, a local image name or ID
            # takes precedence over a network pull.
            if "dockerImageId" in docker_req:
                img_id = str(docker_req["dockerImageId"])
            elif "dockerPull" in docker_req:
                img_id = str(docker_req["dockerPull"])
                cmd = [user_space_docker_cmd, "pull", img_id]
                _logger.info(str(cmd))
                try:
                    subprocess.check_call(cmd, stdout=sys.stderr)  # nosec
                except OSError:
                    raise WorkflowException(
                        SourceLine(docker_req).makeError(
                            "Either Docker container {} is not available with "
                            "user space docker implementation {} or {} is missing "
                            "or broken.".format(
                                img_id, user_space_docker_cmd, user_space_docker_cmd
                            )
                        )
                    )
            else:
                raise WorkflowException(
                    SourceLine(docker_req).makeError(
                        "Docker image must be specified as 'dockerImageId' or "
                        "'dockerPull' when using user space implementations of "
                        "Docker"
                    )
                )
        else:
            try:
                if docker_req is not None and runtimeContext.use_container:
                    img_id = str(
                        self.get_from_requirements(
                            docker_req,
                            runtimeContext.pull_image,
                            runtimeContext.force_docker_pull,
                            runtimeContext.tmp_outdir_prefix,
                        )
                    )
                if img_id is None:
                    if self.builder.find_default_container:
                        default_container = self.builder.find_default_container()
                        if default_container:
                            img_id = str(default_container)

                if (
                    docker_req is not None
                    and img_id is None
                    and runtimeContext.use_container
                ):
                    raise Exception("Docker image not available")

                if (
                    self.prov_obj is not None
                    and img_id is not None
                    and runtimeContext.process_run_id is not None
                ):
                    container_agent = self.prov_obj.document.agent(
                        uuid.uuid4().urn,
                        {
                            "prov:type": PROV["SoftwareAgent"],
                            "cwlprov:image": img_id,
                            "prov:label": "Container execution of image %s" % img_id,
                        },
                    )
                    # FIXME: img_id is not a sha256 id, it might just be "debian:8"
                    # img_entity = document.entity("nih:sha-256;%s" % img_id,
                    #     {"prov:label": "Container image %s" % img_id})
                    # The image is the plan for this activity-agent association
                    # document.wasAssociatedWith(process_run_ID, container_agent, img_entity)
                    self.prov_obj.document.wasAssociatedWith(
                        runtimeContext.process_run_id, container_agent
                    )
            except Exception as err:
                container = "Singularity" if runtimeContext.singularity else "Docker"
                _logger.debug("%s error", container, exc_info=True)
                if docker_is_req:
                    raise UnsupportedRequirement(
                        "{} is required to run this tool: {}".format(
                            container, str(err)
                        )
                    ) from err
                else:
                    raise WorkflowException(
                        "{0} is not available for this tool, try "
                        "--no-container to disable {0}, or install "
                        "a user space Docker replacement like uDocker with "
                        "--user-space-docker-cmd.: {1}".format(container, err)
                    )

        self._setup(runtimeContext)
        (runtime, cidfile) = self.create_runtime(env, runtimeContext)
        runtime.append(str(img_id))
        monitor_function = None
        if cidfile:
            monitor_function = functools.partial(
                self.docker_monitor,
                cidfile,
                runtimeContext.tmpdir_prefix,
                not bool(runtimeContext.cidfile_dir),
            )
        elif runtimeContext.user_space_docker_cmd:
            monitor_function = functools.partial(self.process_monitor)
        self._execute(runtime, env, runtimeContext, monitor_function)

    def docker_monitor(
        self,
        cidfile: str,
        tmpdir_prefix: str,
        cleanup_cidfile: bool,
        process,  # type: subprocess.Popen[str]
    ) -> None:
        """Record memory usage of the running Docker container."""
        # TODO: consider switching to `docker create` / `docker start`
        # instead of `docker run`, as `docker create` outputs the container ID
        # to stdout while the container is frozen, which would let us start
        # the monitoring process without dealing with the cidfile or too-fast
        # container execution.
        cid = None  # type: Optional[str]
        while cid is None:
            time.sleep(1)
            if process.returncode is not None:
                if cleanup_cidfile:
                    try:
                        os.remove(cidfile)
                    except OSError as exc:
                        _logger.warning(
                            "Ignored error cleaning up Docker cidfile: %s", exc
                        )
                return
            try:
                with open(cidfile) as cidhandle:
                    cid = cidhandle.readline().strip()
            except OSError:
                cid = None
        max_mem = psutil.virtual_memory().total
        tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
        stats_file = tempfile.NamedTemporaryFile(prefix=tmp_prefix, dir=tmp_dir)
        stats_file_name = stats_file.name
        try:
            with open(stats_file_name, mode="w") as stats_file_handle:
                stats_proc = subprocess.Popen(  # nosec
                    ["docker", "stats", "--no-trunc", "--format", "{{.MemPerc}}", cid],
                    stdout=stats_file_handle,
                    stderr=subprocess.DEVNULL,
                )
                process.wait()
                stats_proc.kill()
        except OSError as exc:
            _logger.warning("Ignored error with docker stats: %s", exc)
            return
        max_mem_percent = 0  # type: float
        mem_percent = 0  # type: float
        with open(stats_file_name, mode="r") as stats:
            while True:
                line = stats.readline()
                if not line:
                    break
                try:
                    mem_percent = float(
                        re.sub(CONTROL_CODE_RE, "", line).replace("%", "")
                    )
                    if mem_percent > max_mem_percent:
                        max_mem_percent = mem_percent
                except ValueError:
                    break
        _logger.info(
            "[job %s] Max memory used: %iMiB",
            self.name,
            int((max_mem_percent / 100 * max_mem) / (2 ** 20)),
        )
        if cleanup_cidfile:
            os.remove(cidfile)


def _job_popen(
    commands: List[str],
    stdin_path: Optional[str],
    stdout_path: Optional[str],
    stderr_path: Optional[str],
    env: MutableMapping[str, str],
    cwd: str,
    make_job_dir: Callable[[], str],
    job_script_contents: Optional[str] = None,
    timelimit: Optional[int] = None,
    name: Optional[str] = None,
    monitor_function=None,  # type: Optional[Callable[[subprocess.Popen[str]], None]]
    default_stdout=None,  # type: Optional[Union[IO[bytes], TextIO]]
    default_stderr=None,  # type: Optional[Union[IO[bytes], TextIO]]
) -> int:

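    # Two execution paths: a plain subprocess.Popen when no job script is
    # requested (the default), or a generated shell script plus the
    # PYTHON_RUN_SCRIPT helper when one is (or CWLTOOL_FORCE_SHELL_POPEN=1).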
    if job_script_contents is None and not FORCE_SHELLED_POPEN:

        stdin = subprocess.PIPE  # type: Union[IO[bytes], int]
        if stdin_path is not None:
            stdin = open(stdin_path, "rb")

        stdout = (
            default_stdout if default_stdout is not None else sys.stderr
        )  # type: Union[IO[bytes], TextIO]
        if stdout_path is not None:
            stdout = open(stdout_path, "wb")

        stderr = (
            default_stderr if default_stderr is not None else sys.stderr
        )  # type: Union[IO[bytes], TextIO]
        if stderr_path is not None:
            stderr = open(stderr_path, "wb")

        sproc = subprocess.Popen(
            commands,
            shell=False,  # nosec
            close_fds=not onWindows(),
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            env=env,
            cwd=cwd,
            universal_newlines=True,
        )
        processes_to_kill.append(sproc)

        if sproc.stdin is not None:
            sproc.stdin.close()

        tm = None
        if timelimit is not None and timelimit > 0:

            def terminate():  # type: () -> None
                try:
                    _logger.warning(
                        "[job %s] exceeded time limit of %d seconds and will be terminated",
                        name,
                        timelimit,
                    )
                    sproc.terminate()
                except OSError:
                    pass

            tm = Timer(timelimit, terminate)
            tm.daemon = True
            tm.start()
        if monitor_function:
            monitor_function(sproc)
        rcode = sproc.wait()

        if tm is not None:
            tm.cancel()

        if isinstance(stdin, IOBase) and hasattr(stdin, "close"):
            stdin.close()

        if stdout is not sys.stderr and hasattr(stdout, "close"):
            stdout.close()

        if stderr is not sys.stderr and hasattr(stderr, "close"):
            stderr.close()

        return rcode
    else:
        if job_script_contents is None:
            job_script_contents = SHELL_COMMAND_TEMPLATE

        env_copy = {}
        key = None  # type: Optional[str]
        for key in env:
            env_copy[key] = env[key]

        job_description = {
            "commands": commands,
            "cwd": cwd,
            "env": env_copy,
            "stdout_path": stdout_path,
            "stderr_path": stderr_path,
            "stdin_path": stdin_path,
        }

        job_dir = make_job_dir()
        try:
            with open(
                os.path.join(job_dir, "job.json"), mode="w", encoding="utf-8"
            ) as job_file:
                json_dump(job_description, job_file, ensure_ascii=False)
            job_script = os.path.join(job_dir, "run_job.bash")
            with open(job_script, "wb") as _:
                _.write(job_script_contents.encode("utf-8"))
            job_run = os.path.join(job_dir, "run_job.py")
            with open(job_run, "wb") as _:
                _.write(PYTHON_RUN_SCRIPT.encode("utf-8"))
            sproc = subprocess.Popen(  # nosec
                ["bash", job_script.encode("utf-8")],
                shell=False,  # nosec
                cwd=job_dir,
                # The nested script will output the paths to the correct files
                # if they need to be captured. Else just write everything to
                # stderr (same as above).
                stdout=sys.stderr,
                stderr=sys.stderr,
                stdin=subprocess.PIPE,
                universal_newlines=True,
            )
            processes_to_kill.append(sproc)
            if sproc.stdin is not None:
                sproc.stdin.close()

            rcode = sproc.wait()

            return rcode
        finally:
            shutil.rmtree(job_dir)
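

# ---------------------------------------------------------------------------
# Illustrative sketch only (not part of the original module), showing how
# _job_popen might be driven directly; real callers go through
# JobBase._execute with a configured RuntimeContext.
#
#   rcode = _job_popen(
#       ["echo", "hello"],
#       stdin_path=None,
#       stdout_path="/tmp/out.txt",
#       stderr_path=None,
#       env=dict(os.environ),
#       cwd="/tmp",
#       make_job_dir=lambda: tempfile.mkdtemp(),
#   )
# ---------------------------------------------------------------------------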