Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/executors.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Single and multi-threaded executors.""" | |
2 import datetime | |
3 import functools | |
4 import logging | |
5 import math | |
6 import os | |
7 import threading | |
8 from abc import ABCMeta, abstractmethod | |
9 from threading import Lock | |
10 from typing import ( | |
11 Dict, | |
12 Iterable, | |
13 List, | |
14 MutableSequence, | |
15 Optional, | |
16 Set, | |
17 Tuple, | |
18 Union, | |
19 cast, | |
20 ) | |
21 | |
22 import psutil | |
23 from schema_salad.exceptions import ValidationException | |
24 from schema_salad.sourceline import SourceLine | |
25 | |
26 from .command_line_tool import CallbackJob, ExpressionJob | |
27 from .context import RuntimeContext, getdefault | |
28 from .errors import WorkflowException | |
29 from .job import JobBase | |
30 from .loghandler import _logger | |
31 from .mutation import MutationManager | |
32 from .process import Process, cleanIntermediate, relocateOutputs | |
33 from .provenance_profile import ProvenanceProfile | |
34 from .task_queue import TaskQueue | |
35 from .utils import CWLObjectType, JobsType | |
36 from .workflow import Workflow | |
37 from .workflow_job import WorkflowJob, WorkflowJobStep | |
38 | |
39 TMPDIR_LOCK = Lock() | |
40 | |
41 | |
class JobExecutor(metaclass=ABCMeta):
    """Abstract base job executor."""

    def __init__(self) -> None:
        """Initialize."""
        # Populated by output_callback(); index 0 holds the toplevel result.
        self.final_output = []  # type: MutableSequence[Optional[CWLObjectType]]
        self.final_status = []  # type: List[str]
        # Every output directory created during the run; consulted later for
        # output relocation and intermediate-directory cleanup in execute().
        self.output_dirs = set()  # type: Set[str]

    def __call__(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Make the executor callable; delegates to execute()."""
        return self.execute(process, job_order_object, runtime_context, logger)

    def output_callback(
        self, out: Optional[CWLObjectType], process_status: str
    ) -> None:
        """Collect the final status and outputs."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """
        Execute the process.

        :returns: a tuple of (final output object or None, final status string);
            ``(None, "permanentFail")`` when no output was produced.
        :raises WorkflowException: on missing basedir, abstract Operations,
            or cwl:requirements used with CWL v1.0.
        """
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        def check_for_abstract_op(tool: CWLObjectType) -> None:
            # An abstract Operation describes a step with no implementation,
            # so a workflow containing one can never actually run.
            if tool["class"] == "Operation":
                raise SourceLine(tool, "class", WorkflowException).makeError(
                    "Workflow has unrunnable abstract Operation"
                )

        process.visit(check_for_abstract_op)

        finaloutdir = None  # type: Optional[str]
        original_outdir = runtime_context.outdir
        if isinstance(original_outdir, str):
            finaloutdir = os.path.abspath(original_outdir)
        # Work on a copy so the caller's context object is not mutated.
        runtime_context = runtime_context.copy()
        outdir = runtime_context.create_outdir()
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        # Extra requirements may come from the input object (v1.1+) or
        # from the tool's stored defaults; both are rejected for v1.0.
        job_reqs = None  # type: Optional[List[CWLObjectType]]
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                List[CWLObjectType],
                job_order_object["https://w3id.org/cwl/cwl#requirements"],
            )
        elif (
            "cwl:defaults" in process.metadata
            and "https://w3id.org/cwl/cwl#requirements"
            in cast(CWLObjectType, process.metadata["cwl:defaults"])
        ):
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                Optional[List[CWLObjectType]],
                cast(CWLObjectType, process.metadata["cwl:defaults"])[
                    "https://w3id.org/cwl/cwl#requirements"
                ],
            )
        if job_reqs is not None:
            for req in job_reqs:
                process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        # Move/copy outputs from the scratch outdir to the requested one.
        if (
            self.final_output
            and self.final_output[0] is not None
            and finaloutdir is not None
        ):
            self.final_output[0] = relocateOutputs(
                self.final_output[0],
                finaloutdir,
                self.output_dirs,
                runtime_context.move_outputs,
                runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper,
            )

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                output_dirs = self.output_dirs  # type: Iterable[str]
            else:
                # Never delete directories that double as cache entries.
                output_dirs = filter(
                    lambda x: not x.startswith(runtime_context.cachedir),  # type: ignore
                    self.output_dirs,
                )
            cleanIntermediate(output_dirs)

        if self.final_output and self.final_status:

            if (
                runtime_context.research_obj is not None
                and isinstance(
                    process, (JobBase, Process, WorkflowJobStep, WorkflowJob)
                )
                and process.parent_wf
            ):
                # Record final outputs and close out the provenance
                # profile for the toplevel workflow run.
                process_run_id = None  # type: Optional[str]
                name = "primary"
                process.parent_wf.generate_output_prov(
                    self.final_output[0], process_run_id, name
                )
                process.parent_wf.document.wasEndedBy(
                    process.parent_wf.workflow_run_uri,
                    None,
                    process.parent_wf.engine_uuid,
                    datetime.datetime.now(),
                )
                process.parent_wf.finalize_prov_profile(name=None)
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")
198 | |
199 | |
class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process, one at a time."""
        process_run_id = None  # type: Optional[str]

        # define provenance profile for single commandline tool
        if (
            not isinstance(process, Workflow)
            and runtime_context.research_obj is not None
        ):
            process.provenance_object = ProvenanceProfile(
                runtime_context.research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=runtime_context.research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(""),
            )
            process.parent_wf = process.provenance_object
        # jobiter yields runnable jobs, or None when the workflow is
        # blocked waiting on upstream results.
        jobiter = process.job(job_order_object, self.output_callback, runtime_context)

        try:
            for job in jobiter:
                if job is not None:
                    if runtime_context.builder is not None and hasattr(job, "builder"):
                        job.builder = runtime_context.builder  # type: ignore
                    if job.outdir is not None:
                        self.output_dirs.add(job.outdir)
                    if runtime_context.research_obj is not None:
                        # For a bare tool the profile built above applies;
                        # for a workflow each job carries its own.
                        if not isinstance(process, Workflow):
                            prov_obj = process.provenance_object
                        else:
                            prov_obj = job.prov_obj
                        if prov_obj:
                            runtime_context.prov_obj = prov_obj
                            prov_obj.fsaccess = runtime_context.make_fs_access("")
                            prov_obj.evaluate(
                                process,
                                job,
                                job_order_object,
                                runtime_context.research_obj,
                            )
                            process_run_id = prov_obj.record_process_start(process, job)
                            # Copy so this job's run id does not leak into
                            # the context used by subsequent jobs.
                            runtime_context = runtime_context.copy()
                        runtime_context.process_run_id = process_run_id
                    job.run(runtime_context)
                else:
                    # Nothing runnable and nothing in flight (single
                    # threaded): the workflow is stuck.
                    logger.error("Workflow cannot make any more progress.")
                    break
        except (
            ValidationException,
            WorkflowException,
        ):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(str(err)) from err
267 | |
268 | |
class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self) -> None:
        """Initialize."""
        super().__init__()
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[JobsType]
        self.pending_jobs_lock = threading.Lock()

        # Host capacity used for admission control in run_job().
        self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
        # psutil.cpu_count() may return None when the count cannot be
        # determined; fall back to a single core rather than crash.
        self.max_cores = float(psutil.cpu_count() or 1)
        self.allocated_ram = float(0)
        self.allocated_cores = float(0)

    def select_resources(
        self, request, runtime_context
    ):  # pylint: disable=unused-argument
        # type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
        """Naïve check for available cpu cores and memory."""
        result = {}  # type: Dict[str, Union[int, float, str]]
        maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
        for rsc in ("cores", "ram"):
            # String values are unevaluated expressions; only numeric
            # requests can be checked against host capacity here.
            rsc_min = request[rsc + "Min"]
            if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available"
                    % (rsc_min, rsc, maxrsc[rsc])
                )
            rsc_max = request[rsc + "Max"]
            if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
                result[rsc] = math.ceil(rsc_max)
            else:
                result[rsc] = maxrsc[rsc]

        result["tmpdirSize"] = (
            math.ceil(request["tmpdirMin"])
            if not isinstance(request["tmpdirMin"], str)
            else request["tmpdirMin"]
        )
        result["outdirSize"] = (
            math.ceil(request["outdirMin"])
            if not isinstance(request["outdirMin"], str)
            else request["outdirMin"]
        )

        return result

    def _runner(self, job, runtime_context, tmpdir_lock):
        # type: (Union[JobBase, WorkflowJob, CallbackJob, ExpressionJob], RuntimeContext, threading.Lock) -> None
        """Job running thread."""
        # NOTE: parameter renamed from TMPDIR_LOCK so it no longer shadows
        # the module-level constant of the same name.
        try:
            _logger.debug(
                "job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(
                    job, runtime_context, tmpdir_lock
                )
            )
            job.run(runtime_context, tmpdir_lock)
        except WorkflowException as err:
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(WorkflowException(str(err)))
        finally:
            # Return this job's resources to the pool and wake run_jobs()
            # so it can schedule more pending work.
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    if isinstance(job, JobBase):
                        ram = job.builder.resources["ram"]
                        if not isinstance(ram, str):
                            self.allocated_ram -= ram
                        cores = job.builder.resources["cores"]
                        if not isinstance(cores, str):
                            self.allocated_cores -= cores
                    # notify_all(): notifyAll() is a deprecated alias
                    # (removal planned after Python 3.10).
                    runtime_context.workflow_eval_lock.notify_all()

    def run_job(
        self,
        job: Optional[JobsType],
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute a single Job in a separate thread."""
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while (n + 1) <= len(self.pending_jobs):
                # Simple greedy resource allocation strategy. Go
                # through pending jobs in the order they were
                # generated and add them to the queue only if there
                # are resources available.
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    cores = job.builder.resources["cores"]
                    if (not isinstance(ram, str) and ram > self.max_ram) or (
                        not isinstance(cores, str) and cores > self.max_cores
                    ):
                        # FIX: the format string previously had only four
                        # placeholders for six arguments, so logging raised
                        # a formatting error instead of emitting the record.
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            "than available on this host (already allocated ram is "
                            "%d, allocated cores is %d, max ram %d, max cores %d)",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        self.pending_jobs.remove(job)
                        return

                    if (
                        not isinstance(ram, str)
                        and self.allocated_ram + ram > self.max_ram
                    ) or (
                        not isinstance(cores, str)
                        and self.allocated_cores + cores > self.max_cores
                    ):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            "available (already allocated ram is %d, allocated cores is %d, "
                            "max ram %d, max cores %d",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        n += 1
                        continue

                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    if not isinstance(ram, str):
                        self.allocated_ram += ram
                    cores = job.builder.resources["cores"]
                    if not isinstance(cores, str):
                        self.allocated_cores += cores
                self.taskqueue.add(
                    functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                    runtime_context.workflow_eval_lock,
                )
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish.

        :raises WorkflowException: re-raises the first error recorded by a
            worker thread, if any.
        """
        if runtime_context.workflow_eval_lock is not None:
            # Timeout so we periodically re-check for recorded exceptions
            # even if no worker signals completion.
            runtime_context.workflow_eval_lock.wait(timeout=3)
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process, using worker threads."""
        # Guard against psutil.cpu_count() returning None (see __init__).
        self.taskqueue = TaskQueue(
            threading.Lock(), psutil.cpu_count() or 1
        )  # type: TaskQueue
        try:

            jobiter = process.job(
                job_order_object, self.output_callback, runtime_context
            )

            if runtime_context.workflow_eval_lock is None:
                raise WorkflowException(
                    "runtimeContext.workflow_eval_lock must not be None"
                )

            runtime_context.workflow_eval_lock.acquire()
            for job in jobiter:
                if job is not None:
                    if isinstance(job, JobBase):
                        job.builder = runtime_context.builder or job.builder
                        if job.outdir is not None:
                            self.output_dirs.add(job.outdir)

                    self.run_job(job, runtime_context)

                if job is None:
                    # Nothing runnable right now; wait for an in-flight job
                    # to finish, unless there are none (then we are stuck).
                    if self.taskqueue.in_flight > 0:
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            # Drain remaining pending jobs as resources free up.
            self.run_job(None, runtime_context)
            while self.taskqueue.in_flight > 0:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)

            runtime_context.workflow_eval_lock.release()
        finally:
            self.taskqueue.drain()
            self.taskqueue.join()
478 | |
479 | |
class NoopJobExecutor(JobExecutor):
    """Do nothing executor, for testing purposes only."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Skip the jobs for the given Process entirely."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Report success with an empty output object, running nothing."""
        empty_output = {}  # type: CWLObjectType
        return empty_output, "success"