Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/cwltool/executors.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler | 
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:d30785e31577 | 
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 """ Single and multi-threaded executors.""" | |
| 3 import datetime | |
| 4 import os | |
| 5 import tempfile | |
| 6 import threading | |
| 7 import logging | |
| 8 from threading import Lock | |
| 9 from abc import ABCMeta, abstractmethod | |
| 10 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union | |
| 11 | |
| 12 import psutil | |
| 13 from six import string_types, with_metaclass | |
| 14 from typing_extensions import Text # pylint: disable=unused-import | |
| 15 from future.utils import raise_from | |
| 16 from schema_salad.validate import ValidationException | |
| 17 | |
| 18 from .builder import Builder # pylint: disable=unused-import | |
| 19 from .context import (RuntimeContext, # pylint: disable=unused-import | |
| 20 getdefault) | |
| 21 from .errors import WorkflowException | |
| 22 from .job import JobBase # pylint: disable=unused-import | |
| 23 from .loghandler import _logger | |
| 24 from .mutation import MutationManager | |
| 25 from .process import Process # pylint: disable=unused-import | |
| 26 from .process import cleanIntermediate, relocateOutputs | |
| 27 from .provenance import ProvenanceProfile | |
| 28 from .utils import DEFAULT_TMP_PREFIX | |
| 29 from .workflow import Workflow, WorkflowJob, WorkflowJobStep | |
| 30 from .command_line_tool import CallbackJob | |
| 31 | |
| 32 TMPDIR_LOCK = Lock() | |
| 33 | |
| 34 | |
| 35 class JobExecutor(with_metaclass(ABCMeta, object)): | |
| 36 """Abstract base job executor.""" | |
| 37 | |
| 38 def __init__(self): | |
| 39 # type: (...) -> None | |
| 40 """Initialize.""" | |
| 41 self.final_output = [] # type: List[Union[Dict[Text, Any], List[Dict[Text, Any]]]] | |
| 42 self.final_status = [] # type: List[Text] | |
| 43 self.output_dirs = set() # type: Set[Text] | |
| 44 | |
| 45 def __call__(self, *args, **kwargs): # type: (*Any, **Any) -> Any | |
| 46 return self.execute(*args, **kwargs) | |
| 47 | |
| 48 def output_callback(self, out, process_status): # type: (Dict[Text, Any], Text) -> None | |
| 49 """Collect the final status and outputs.""" | |
| 50 self.final_status.append(process_status) | |
| 51 self.final_output.append(out) | |
| 52 | |
| 53 @abstractmethod | |
| 54 def run_jobs(self, | |
| 55 process, # type: Process | |
| 56 job_order_object, # type: Dict[Text, Any] | |
| 57 logger, # type: logging.Logger | |
| 58 runtime_context # type: RuntimeContext | |
| 59 ): # type: (...) -> None | |
| 60 """Execute the jobs for the given Process.""" | |
| 61 | |
| 62 def execute(self, | |
| 63 process, # type: Process | |
| 64 job_order_object, # type: Dict[Text, Any] | |
| 65 runtime_context, # type: RuntimeContext | |
| 66 logger=_logger, # type: logging.Logger | |
| 67 ): # type: (...) -> Tuple[Optional[Union[Dict[Text, Any], List[Dict[Text, Any]]]], Text] | |
| 68 """Execute the process.""" | |
| 69 if not runtime_context.basedir: | |
| 70 raise WorkflowException("Must provide 'basedir' in runtimeContext") | |
| 71 | |
| 72 finaloutdir = None # Type: Optional[Text] | |
| 73 original_outdir = runtime_context.outdir | |
| 74 if isinstance(original_outdir, string_types): | |
| 75 finaloutdir = os.path.abspath(original_outdir) | |
| 76 runtime_context = runtime_context.copy() | |
| 77 outdir = tempfile.mkdtemp( | |
| 78 prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX)) | |
| 79 self.output_dirs.add(outdir) | |
| 80 runtime_context.outdir = outdir | |
| 81 runtime_context.mutation_manager = MutationManager() | |
| 82 runtime_context.toplevel = True | |
| 83 runtime_context.workflow_eval_lock = threading.Condition(threading.RLock()) | |
| 84 | |
| 85 job_reqs = None | |
| 86 if "https://w3id.org/cwl/cwl#requirements" in job_order_object: | |
| 87 if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0': | |
| 88 raise WorkflowException( | |
| 89 "`cwl:requirements` in the input object is not part of CWL " | |
| 90 "v1.0. You can adjust to use `cwltool:overrides` instead; or you " | |
| 91 "can set the cwlVersion to v1.1") | |
| 92 job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"] | |
| 93 elif ("cwl:defaults" in process.metadata | |
| 94 and "https://w3id.org/cwl/cwl#requirements" | |
| 95 in process.metadata["cwl:defaults"]): | |
| 96 if process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0': | |
| 97 raise WorkflowException( | |
| 98 "`cwl:requirements` in the input object is not part of CWL " | |
| 99 "v1.0. You can adjust to use `cwltool:overrides` instead; or you " | |
| 100 "can set the cwlVersion to v1.1") | |
| 101 job_reqs = process.metadata["cwl:defaults"]["https://w3id.org/cwl/cwl#requirements"] | |
| 102 if job_reqs is not None: | |
| 103 for req in job_reqs: | |
| 104 process.requirements.append(req) | |
| 105 | |
| 106 self.run_jobs(process, job_order_object, logger, runtime_context) | |
| 107 | |
| 108 if self.final_output and self.final_output[0] is not None and finaloutdir is not None: | |
| 109 self.final_output[0] = relocateOutputs( | |
| 110 self.final_output[0], finaloutdir, self.output_dirs, | |
| 111 runtime_context.move_outputs, runtime_context.make_fs_access(""), | |
| 112 getdefault(runtime_context.compute_checksum, True), | |
| 113 path_mapper=runtime_context.path_mapper) | |
| 114 | |
| 115 if runtime_context.rm_tmpdir: | |
| 116 if runtime_context.cachedir is None: | |
| 117 output_dirs = self.output_dirs # type: Iterable[Any] | |
| 118 else: | |
| 119 output_dirs = filter(lambda x: not x.startswith( | |
| 120 runtime_context.cachedir), self.output_dirs) | |
| 121 cleanIntermediate(output_dirs) | |
| 122 | |
| 123 if self.final_output and self.final_status: | |
| 124 | |
| 125 if runtime_context.research_obj is not None and \ | |
| 126 isinstance(process, (JobBase, Process, WorkflowJobStep, | |
| 127 WorkflowJob)) and process.parent_wf: | |
| 128 process_run_id = None | |
| 129 name = "primary" | |
| 130 process.parent_wf.generate_output_prov(self.final_output[0], | |
| 131 process_run_id, name) | |
| 132 process.parent_wf.document.wasEndedBy( | |
| 133 process.parent_wf.workflow_run_uri, None, process.parent_wf.engine_uuid, | |
| 134 datetime.datetime.now()) | |
| 135 process.parent_wf.finalize_prov_profile(name=None) | |
| 136 return (self.final_output[0], self.final_status[0]) | |
| 137 return (None, "permanentFail") | |
| 138 | |
| 139 | |
| 140 class SingleJobExecutor(JobExecutor): | |
| 141 """Default single-threaded CWL reference executor.""" | |
| 142 | |
| 143 def run_jobs(self, | |
| 144 process, # type: Process | |
| 145 job_order_object, # type: Dict[Text, Any] | |
| 146 logger, # type: logging.Logger | |
| 147 runtime_context # type: RuntimeContext | |
| 148 ): # type: (...) -> None | |
| 149 | |
| 150 process_run_id = None # type: Optional[str] | |
| 151 | |
| 152 # define provenance profile for single commandline tool | |
| 153 if not isinstance(process, Workflow) \ | |
| 154 and runtime_context.research_obj is not None: | |
| 155 process.provenance_object = ProvenanceProfile( | |
| 156 runtime_context.research_obj, | |
| 157 full_name=runtime_context.cwl_full_name, | |
| 158 host_provenance=False, | |
| 159 user_provenance=False, | |
| 160 orcid=runtime_context.orcid, | |
| 161 # single tool execution, so RO UUID = wf UUID = tool UUID | |
| 162 run_uuid=runtime_context.research_obj.ro_uuid, | |
| 163 fsaccess=runtime_context.make_fs_access('')) | |
| 164 process.parent_wf = process.provenance_object | |
| 165 jobiter = process.job(job_order_object, self.output_callback, | |
| 166 runtime_context) | |
| 167 | |
| 168 try: | |
| 169 for job in jobiter: | |
| 170 if job is not None: | |
| 171 if runtime_context.builder is not None: | |
| 172 job.builder = runtime_context.builder | |
| 173 if job.outdir is not None: | |
| 174 self.output_dirs.add(job.outdir) | |
| 175 if runtime_context.research_obj is not None: | |
| 176 if not isinstance(process, Workflow): | |
| 177 prov_obj = process.provenance_object | |
| 178 else: | |
| 179 prov_obj = job.prov_obj | |
| 180 if prov_obj: | |
| 181 runtime_context.prov_obj = prov_obj | |
| 182 prov_obj.fsaccess = runtime_context.make_fs_access('') | |
| 183 prov_obj.evaluate( | |
| 184 process, job, job_order_object, | |
| 185 runtime_context.research_obj) | |
| 186 process_run_id =\ | |
| 187 prov_obj.record_process_start(process, job) | |
| 188 runtime_context = runtime_context.copy() | |
| 189 runtime_context.process_run_id = process_run_id | |
| 190 job.run(runtime_context) | |
| 191 else: | |
| 192 logger.error("Workflow cannot make any more progress.") | |
| 193 break | |
| 194 except (ValidationException, WorkflowException): # pylint: disable=try-except-raise | |
| 195 raise | |
| 196 except Exception as err: | |
| 197 logger.exception("Got workflow error") | |
| 198 raise_from(WorkflowException(Text(err)), err) | |
| 199 | |
| 200 | |
| 201 class MultithreadedJobExecutor(JobExecutor): | |
| 202 """ | |
| 203 Experimental multi-threaded CWL executor. | |
| 204 | |
| 205 Does simple resource accounting, will not start a job unless it | |
| 206 has cores / ram available, but does not make any attempt to | |
| 207 optimize usage. | |
| 208 """ | |
| 209 | |
| 210 def __init__(self): # type: () -> None | |
| 211 """Initialize.""" | |
| 212 super(MultithreadedJobExecutor, self).__init__() | |
| 213 self.threads = set() # type: Set[threading.Thread] | |
| 214 self.exceptions = [] # type: List[WorkflowException] | |
| 215 self.pending_jobs = [] # type: List[Union[JobBase, WorkflowJob]] | |
| 216 self.pending_jobs_lock = threading.Lock() | |
| 217 | |
| 218 self.max_ram = int(psutil.virtual_memory().available / 2**20) | |
| 219 self.max_cores = psutil.cpu_count() | |
| 220 self.allocated_ram = 0 | |
| 221 self.allocated_cores = 0 | |
| 222 | |
| 223 def select_resources(self, request, runtime_context): # pylint: disable=unused-argument | |
| 224 # type: (Dict[str, int], RuntimeContext) -> Dict[str, int] | |
| 225 """Naïve check for available cpu cores and memory.""" | |
| 226 result = {} # type: Dict[str, int] | |
| 227 maxrsc = { | |
| 228 "cores": self.max_cores, | |
| 229 "ram": self.max_ram | |
| 230 } | |
| 231 for rsc in ("cores", "ram"): | |
| 232 if request[rsc+"Min"] > maxrsc[rsc]: | |
| 233 raise WorkflowException( | |
| 234 "Requested at least %d %s but only %d available" % | |
| 235 (request[rsc+"Min"], rsc, maxrsc[rsc])) | |
| 236 if request[rsc+"Max"] < maxrsc[rsc]: | |
| 237 result[rsc] = request[rsc+"Max"] | |
| 238 else: | |
| 239 result[rsc] = maxrsc[rsc] | |
| 240 | |
| 241 return result | |
| 242 | |
| 243 def _runner(self, job, runtime_context, TMPDIR_LOCK): | |
| 244 # type: (Union[JobBase, WorkflowJob, CallbackJob], RuntimeContext, threading.Lock) -> None | |
| 245 """Job running thread.""" | |
| 246 try: | |
| 247 _logger.debug("job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(job, runtime_context, TMPDIR_LOCK)) | |
| 248 job.run(runtime_context, TMPDIR_LOCK) | |
| 249 except WorkflowException as err: | |
| 250 _logger.exception("Got workflow error") | |
| 251 self.exceptions.append(err) | |
| 252 except Exception as err: # pylint: disable=broad-except | |
| 253 _logger.exception("Got workflow error") | |
| 254 self.exceptions.append(WorkflowException(Text(err))) | |
| 255 finally: | |
| 256 if runtime_context.workflow_eval_lock: | |
| 257 with runtime_context.workflow_eval_lock: | |
| 258 self.threads.remove(threading.current_thread()) | |
| 259 if isinstance(job, JobBase): | |
| 260 self.allocated_ram -= job.builder.resources["ram"] | |
| 261 self.allocated_cores -= job.builder.resources["cores"] | |
| 262 runtime_context.workflow_eval_lock.notifyAll() | |
| 263 | |
| 264 def run_job(self, | |
| 265 job, # type: Union[JobBase, WorkflowJob, None] | |
| 266 runtime_context # type: RuntimeContext | |
| 267 ): # type: (...) -> None | |
| 268 """Execute a single Job in a seperate thread.""" | |
| 269 if job is not None: | |
| 270 with self.pending_jobs_lock: | |
| 271 self.pending_jobs.append(job) | |
| 272 | |
| 273 with self.pending_jobs_lock: | |
| 274 n = 0 | |
| 275 while (n+1) <= len(self.pending_jobs): | |
| 276 job = self.pending_jobs[n] | |
| 277 if isinstance(job, JobBase): | |
| 278 if ((job.builder.resources["ram"]) | |
| 279 > self.max_ram | |
| 280 or (job.builder.resources["cores"]) | |
| 281 > self.max_cores): | |
| 282 _logger.error( | |
| 283 'Job "%s" cannot be run, requests more resources (%s) ' | |
| 284 'than available on this host (max ram %d, max cores %d', | |
| 285 job.name, job.builder.resources, | |
| 286 self.allocated_ram, | |
| 287 self.allocated_cores, | |
| 288 self.max_ram, | |
| 289 self.max_cores) | |
| 290 self.pending_jobs.remove(job) | |
| 291 return | |
| 292 | |
| 293 if ((self.allocated_ram + job.builder.resources["ram"]) | |
| 294 > self.max_ram | |
| 295 or (self.allocated_cores + job.builder.resources["cores"]) | |
| 296 > self.max_cores): | |
| 297 _logger.debug( | |
| 298 'Job "%s" cannot run yet, resources (%s) are not ' | |
| 299 'available (already allocated ram is %d, allocated cores is %d, ' | |
| 300 'max ram %d, max cores %d', | |
| 301 job.name, job.builder.resources, | |
| 302 self.allocated_ram, | |
| 303 self.allocated_cores, | |
| 304 self.max_ram, | |
| 305 self.max_cores) | |
| 306 n += 1 | |
| 307 continue | |
| 308 | |
| 309 thread = threading.Thread(target=self._runner, args=(job, runtime_context, TMPDIR_LOCK)) | |
| 310 thread.daemon = True | |
| 311 self.threads.add(thread) | |
| 312 if isinstance(job, JobBase): | |
| 313 self.allocated_ram += job.builder.resources["ram"] | |
| 314 self.allocated_cores += job.builder.resources["cores"] | |
| 315 thread.start() | |
| 316 self.pending_jobs.remove(job) | |
| 317 | |
| 318 def wait_for_next_completion(self, runtime_context): | |
| 319 # type: (RuntimeContext) -> None | |
| 320 """Wait for jobs to finish.""" | |
| 321 if runtime_context.workflow_eval_lock is not None: | |
| 322 runtime_context.workflow_eval_lock.wait() | |
| 323 if self.exceptions: | |
| 324 raise self.exceptions[0] | |
| 325 | |
| 326 def run_jobs(self, | |
| 327 process, # type: Process | |
| 328 job_order_object, # type: Dict[Text, Any] | |
| 329 logger, # type: logging.Logger | |
| 330 runtime_context # type: RuntimeContext | |
| 331 ): # type: (...) -> None | |
| 332 | |
| 333 jobiter = process.job(job_order_object, self.output_callback, | |
| 334 runtime_context) | |
| 335 | |
| 336 if runtime_context.workflow_eval_lock is None: | |
| 337 raise WorkflowException( | |
| 338 "runtimeContext.workflow_eval_lock must not be None") | |
| 339 | |
| 340 runtime_context.workflow_eval_lock.acquire() | |
| 341 for job in jobiter: | |
| 342 if job is not None: | |
| 343 if isinstance(job, JobBase): | |
| 344 job.builder = runtime_context.builder or job.builder | |
| 345 if job.outdir is not None: | |
| 346 self.output_dirs.add(job.outdir) | |
| 347 | |
| 348 self.run_job(job, runtime_context) | |
| 349 | |
| 350 if job is None: | |
| 351 if self.threads: | |
| 352 self.wait_for_next_completion(runtime_context) | |
| 353 else: | |
| 354 logger.error("Workflow cannot make any more progress.") | |
| 355 break | |
| 356 | |
| 357 self.run_job(None, runtime_context) | |
| 358 while self.threads: | |
| 359 self.wait_for_next_completion(runtime_context) | |
| 360 self.run_job(None, runtime_context) | |
| 361 | |
| 362 runtime_context.workflow_eval_lock.release() | 
