diff env/lib/python3.9/site-packages/cwltool/executors.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author   | shellac
date     | Mon, 22 Mar 2021 18:12:50 +0000
parents  |
children |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/executors.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,499 @@

"""Single and multi-threaded executors."""
import datetime
import functools
import logging
import math
import os
import threading
from abc import ABCMeta, abstractmethod
from threading import Lock
from typing import (
    Dict,
    Iterable,
    List,
    MutableSequence,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

import psutil
from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import SourceLine

from .command_line_tool import CallbackJob, ExpressionJob
from .context import RuntimeContext, getdefault
from .errors import WorkflowException
from .job import JobBase
from .loghandler import _logger
from .mutation import MutationManager
from .process import Process, cleanIntermediate, relocateOutputs
from .provenance_profile import ProvenanceProfile
from .task_queue import TaskQueue
from .utils import CWLObjectType, JobsType
from .workflow import Workflow
from .workflow_job import WorkflowJob, WorkflowJobStep

TMPDIR_LOCK = Lock()


class JobExecutor(metaclass=ABCMeta):
    """Abstract base job executor."""

    def __init__(self) -> None:
        """Initialize."""
        self.final_output = []  # type: MutableSequence[Optional[CWLObjectType]]
        self.final_status = []  # type: List[str]
        self.output_dirs = set()  # type: Set[str]

    def __call__(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        return self.execute(process, job_order_object, runtime_context, logger)

    def output_callback(
        self, out: Optional[CWLObjectType], process_status: str
    ) -> None:
        """Collect the final status and outputs."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:
        """Execute the process."""
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        def check_for_abstract_op(tool: CWLObjectType) -> None:
            if tool["class"] == "Operation":
                raise SourceLine(tool, "class", WorkflowException).makeError(
                    "Workflow has unrunnable abstract Operation"
                )

        process.visit(check_for_abstract_op)

        finaloutdir = None  # type: Optional[str]
        original_outdir = runtime_context.outdir
        if isinstance(original_outdir, str):
            finaloutdir = os.path.abspath(original_outdir)
        runtime_context = runtime_context.copy()
        outdir = runtime_context.create_outdir()
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        job_reqs = None  # type: Optional[List[CWLObjectType]]
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                List[CWLObjectType],
                job_order_object["https://w3id.org/cwl/cwl#requirements"],
            )
        elif (
            "cwl:defaults" in process.metadata
            and "https://w3id.org/cwl/cwl#requirements"
            in cast(CWLObjectType, process.metadata["cwl:defaults"])
        ):
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                Optional[List[CWLObjectType]],
                cast(CWLObjectType, process.metadata["cwl:defaults"])[
                    "https://w3id.org/cwl/cwl#requirements"
                ],
            )
        if job_reqs is not None:
            for req in job_reqs:
                process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        if (
            self.final_output
            and self.final_output[0] is not None
            and finaloutdir is not None
        ):
            self.final_output[0] = relocateOutputs(
                self.final_output[0],
                finaloutdir,
                self.output_dirs,
                runtime_context.move_outputs,
                runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper,
            )

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                output_dirs = self.output_dirs  # type: Iterable[str]
            else:
                output_dirs = filter(
                    lambda x: not x.startswith(runtime_context.cachedir),  # type: ignore
                    self.output_dirs,
                )
            cleanIntermediate(output_dirs)

        if self.final_output and self.final_status:
            if (
                runtime_context.research_obj is not None
                and isinstance(
                    process, (JobBase, Process, WorkflowJobStep, WorkflowJob)
                )
                and process.parent_wf
            ):
                process_run_id = None  # type: Optional[str]
                name = "primary"
                process.parent_wf.generate_output_prov(
                    self.final_output[0], process_run_id, name
                )
                process.parent_wf.document.wasEndedBy(
                    process.parent_wf.workflow_run_uri,
                    None,
                    process.parent_wf.engine_uuid,
                    datetime.datetime.now(),
                )
                process.parent_wf.finalize_prov_profile(name=None)
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")


class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        process_run_id = None  # type: Optional[str]

        # define provenance profile for single commandline tool
        if (
            not isinstance(process, Workflow)
            and runtime_context.research_obj is not None
        ):
            process.provenance_object = ProvenanceProfile(
                runtime_context.research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=runtime_context.research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(""),
            )
            process.parent_wf = process.provenance_object
        jobiter = process.job(job_order_object, self.output_callback, runtime_context)

        try:
            for job in jobiter:
                if job is not None:
                    if runtime_context.builder is not None and hasattr(job, "builder"):
                        job.builder = runtime_context.builder  # type: ignore
                    if job.outdir is not None:
                        self.output_dirs.add(job.outdir)
                    if runtime_context.research_obj is not None:
                        if not isinstance(process, Workflow):
                            prov_obj = process.provenance_object
                        else:
                            prov_obj = job.prov_obj
                        if prov_obj:
                            runtime_context.prov_obj = prov_obj
                            prov_obj.fsaccess = runtime_context.make_fs_access("")
                            prov_obj.evaluate(
                                process,
                                job,
                                job_order_object,
                                runtime_context.research_obj,
                            )
                            process_run_id = prov_obj.record_process_start(process, job)
                            runtime_context = runtime_context.copy()
                        runtime_context.process_run_id = process_run_id
                    job.run(runtime_context)
                else:
                    logger.error("Workflow cannot make any more progress.")
                    break
        except (
            ValidationException,
            WorkflowException,
        ):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(str(err)) from err


class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting; will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self) -> None:
        """Initialize."""
        super().__init__()
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[JobsType]
        self.pending_jobs_lock = threading.Lock()

        self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
        self.max_cores = float(psutil.cpu_count())
        self.allocated_ram = float(0)
        self.allocated_cores = float(0)

    def select_resources(
        self, request, runtime_context
    ):  # pylint: disable=unused-argument
        # type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
        """Naïve check for available cpu cores and memory."""
        result = {}  # type: Dict[str, Union[int, float, str]]
        maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
        for rsc in ("cores", "ram"):
            rsc_min = request[rsc + "Min"]
            if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available"
                    % (rsc_min, rsc, maxrsc[rsc])
                )
            rsc_max = request[rsc + "Max"]
            if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
                result[rsc] = math.ceil(rsc_max)
            else:
                result[rsc] = maxrsc[rsc]

        result["tmpdirSize"] = (
            math.ceil(request["tmpdirMin"])
            if not isinstance(request["tmpdirMin"], str)
            else request["tmpdirMin"]
        )
        result["outdirSize"] = (
            math.ceil(request["outdirMin"])
            if not isinstance(request["outdirMin"], str)
            else request["outdirMin"]
        )

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob, ExpressionJob], RuntimeContext, threading.Lock) -> None
        """Job running thread."""
        try:
            _logger.debug(
                "job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(
                    job, runtime_context, TMPDIR_LOCK
                )
            )
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(WorkflowException(str(err)))
        finally:
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    if isinstance(job, JobBase):
                        ram = job.builder.resources["ram"]
                        if not isinstance(ram, str):
                            self.allocated_ram -= ram
                        cores = job.builder.resources["cores"]
                        if not isinstance(cores, str):
                            self.allocated_cores -= cores
                    runtime_context.workflow_eval_lock.notifyAll()

    def run_job(
        self,
        job: Optional[JobsType],
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute a single Job in a separate thread."""
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while (n + 1) <= len(self.pending_jobs):
                # Simple greedy resource allocation strategy. Go
                # through pending jobs in the order they were
                # generated and add them to the queue only if there
                # are resources available.
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    cores = job.builder.resources["cores"]
                    if (not isinstance(ram, str) and ram > self.max_ram) or (
                        not isinstance(cores, str) and cores > self.max_cores
                    ):
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            "than available on this host (already allocated ram is %d, "
                            "allocated cores is %d, max ram %d, max cores %d)",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        self.pending_jobs.remove(job)
                        return

                    if (
                        not isinstance(ram, str)
                        and self.allocated_ram + ram > self.max_ram
                    ) or (
                        not isinstance(cores, str)
                        and self.allocated_cores + cores > self.max_cores
                    ):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            "available (already allocated ram is %d, allocated cores is %d, "
                            "max ram %d, max cores %d)",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        n += 1
                        continue

                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    if not isinstance(ram, str):
                        self.allocated_ram += ram
                    cores = job.builder.resources["cores"]
                    if not isinstance(cores, str):
                        self.allocated_cores += cores
                self.taskqueue.add(
                    functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                    runtime_context.workflow_eval_lock,
                )
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish."""
        if runtime_context.workflow_eval_lock is not None:
            runtime_context.workflow_eval_lock.wait(timeout=3)
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        self.taskqueue = TaskQueue(
            threading.Lock(), psutil.cpu_count()
        )  # type: TaskQueue
        try:
            jobiter = process.job(
                job_order_object, self.output_callback, runtime_context
            )

            if runtime_context.workflow_eval_lock is None:
                raise WorkflowException(
                    "runtimeContext.workflow_eval_lock must not be None"
                )

            runtime_context.workflow_eval_lock.acquire()
            for job in jobiter:
                if job is not None:
                    if isinstance(job, JobBase):
                        job.builder = runtime_context.builder or job.builder
                        if job.outdir is not None:
                            self.output_dirs.add(job.outdir)

                self.run_job(job, runtime_context)

                if job is None:
                    if self.taskqueue.in_flight > 0:
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            self.run_job(None, runtime_context)
            while self.taskqueue.in_flight > 0:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)

            runtime_context.workflow_eval_lock.release()
        finally:
            self.taskqueue.drain()
            self.taskqueue.join()


class NoopJobExecutor(JobExecutor):
    """Do-nothing executor, for testing purposes only."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        pass

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Optional[CWLObjectType], str]:
        return {}, "success"
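
For orientation, a minimal sketch of driving one of these executors directly, since this file only defines them. It assumes cwltool is installed; the file names, input keys, and logger name are illustrative placeholders, while the call signature and the basedir requirement come from JobExecutor.__call__ and JobExecutor.execute above.

import logging
import os

from cwltool.context import LoadingContext, RuntimeContext
from cwltool.executors import MultithreadedJobExecutor
from cwltool.load_tool import load_tool

# Load a CWL document into a Process object (path is a placeholder).
process = load_tool("workflow.cwl", LoadingContext())

# Input object keyed by the workflow's input ids (illustrative values).
job_order = {"reads": {"class": "File", "location": "reads.fastq"}}

# execute() raises WorkflowException unless basedir is set.
runtime_context = RuntimeContext()
runtime_context.basedir = os.getcwd()

# Executors are callable: they return the final output object and a status string.
executor = MultithreadedJobExecutor()
final_output, final_status = executor(
    process, job_order, runtime_context, logger=logging.getLogger("demo")
)
print(final_status)  # "success" or "permanentFail"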