diff env/lib/python3.9/site-packages/cwltool/executors.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/cwltool/executors.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,499 @@
+"""Single and multi-threaded executors."""
+import datetime
+import functools
+import logging
+import math
+import os
+import threading
+from abc import ABCMeta, abstractmethod
+from threading import Lock
+from typing import (
+    Dict,
+    Iterable,
+    List,
+    MutableSequence,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+    cast,
+)
+
+import psutil
+from schema_salad.exceptions import ValidationException
+from schema_salad.sourceline import SourceLine
+
+from .command_line_tool import CallbackJob, ExpressionJob
+from .context import RuntimeContext, getdefault
+from .errors import WorkflowException
+from .job import JobBase
+from .loghandler import _logger
+from .mutation import MutationManager
+from .process import Process, cleanIntermediate, relocateOutputs
+from .provenance_profile import ProvenanceProfile
+from .task_queue import TaskQueue
+from .utils import CWLObjectType, JobsType
+from .workflow import Workflow
+from .workflow_job import WorkflowJob, WorkflowJobStep
+
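+# Module-level lock shared by the MultithreadedJobExecutor runner threads and
+# passed through to ``job.run()``; as the name suggests, it guards temporary
+# directory setup across threads.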
+TMPDIR_LOCK = Lock()
+
+
+class JobExecutor(metaclass=ABCMeta):
+    """Abstract base job executor."""
+
+    def __init__(self) -> None:
+        """Initialize."""
+        self.final_output = []  # type: MutableSequence[Optional[CWLObjectType]]
+        self.final_status = []  # type: List[str]
+        self.output_dirs = set()  # type: Set[str]
+
+    def __call__(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        runtime_context: RuntimeContext,
+        logger: logging.Logger = _logger,
+    ) -> Tuple[Optional[CWLObjectType], str]:
+
+        return self.execute(process, job_order_object, runtime_context, logger)
+
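+    # A JobExecutor instance is callable. An illustrative invocation (the
+    # process, job_order and runtime_context objects are assumed to come from
+    # cwltool's loading machinery) looks roughly like:
+    #
+    #     executor = SingleJobExecutor()
+    #     output, status = executor(process, job_order, runtime_context)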
+    def output_callback(
+        self, out: Optional[CWLObjectType], process_status: str
+    ) -> None:
+        """Collect the final status and outputs."""
+        self.final_status.append(process_status)
+        self.final_output.append(out)
+
+    @abstractmethod
+    def run_jobs(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        logger: logging.Logger,
+        runtime_context: RuntimeContext,
+    ) -> None:
+        """Execute the jobs for the given Process."""
+
+    def execute(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        runtime_context: RuntimeContext,
+        logger: logging.Logger = _logger,
+    ) -> Tuple[Optional[CWLObjectType], str]:
+        """Execute the process."""
+        if not runtime_context.basedir:
+            raise WorkflowException("Must provide 'basedir' in runtimeContext")
+
+        def check_for_abstract_op(tool: CWLObjectType) -> None:
+            if tool["class"] == "Operation":
+                raise SourceLine(tool, "class", WorkflowException).makeError(
+                    "Workflow has unrunnable abstract Operation"
+                )
+
+        process.visit(check_for_abstract_op)
+
+        finaloutdir = None  # type: Optional[str]
+        original_outdir = runtime_context.outdir
+        if isinstance(original_outdir, str):
+            finaloutdir = os.path.abspath(original_outdir)
+        runtime_context = runtime_context.copy()
+        outdir = runtime_context.create_outdir()
+        self.output_dirs.add(outdir)
+        runtime_context.outdir = outdir
+        runtime_context.mutation_manager = MutationManager()
+        runtime_context.toplevel = True
+        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())
+
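+        # Requirements supplied in the input object (or in ``cwl:defaults``)
+        # are hoisted onto the process's requirements list; this is rejected
+        # for CWL v1.0 documents, which do not support `cwl:requirements`.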
+        job_reqs = None  # type: Optional[List[CWLObjectType]]
+        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
+            if (
+                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
+                == "v1.0"
+            ):
+                raise WorkflowException(
+                    "`cwl:requirements` in the input object is not part of CWL "
+                    "v1.0. Use `cwltool:overrides` instead, or set the "
+                    "cwlVersion to v1.1"
+                )
+            job_reqs = cast(
+                List[CWLObjectType],
+                job_order_object["https://w3id.org/cwl/cwl#requirements"],
+            )
+        elif (
+            "cwl:defaults" in process.metadata
+            and "https://w3id.org/cwl/cwl#requirements"
+            in cast(CWLObjectType, process.metadata["cwl:defaults"])
+        ):
+            if (
+                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
+                == "v1.0"
+            ):
+                raise WorkflowException(
+                    "`cwl:requirements` in the input object is not part of CWL "
+                    "v1.0. Use `cwltool:overrides` instead, or set the "
+                    "cwlVersion to v1.1"
+                )
+            job_reqs = cast(
+                Optional[List[CWLObjectType]],
+                cast(CWLObjectType, process.metadata["cwl:defaults"])[
+                    "https://w3id.org/cwl/cwl#requirements"
+                ],
+            )
+        if job_reqs is not None:
+            for req in job_reqs:
+                process.requirements.append(req)
+
+        self.run_jobs(process, job_order_object, logger, runtime_context)
+
+        if (
+            self.final_output
+            and self.final_output[0] is not None
+            and finaloutdir is not None
+        ):
+            self.final_output[0] = relocateOutputs(
+                self.final_output[0],
+                finaloutdir,
+                self.output_dirs,
+                runtime_context.move_outputs,
+                runtime_context.make_fs_access(""),
+                getdefault(runtime_context.compute_checksum, True),
+                path_mapper=runtime_context.path_mapper,
+            )
+
+        if runtime_context.rm_tmpdir:
+            if runtime_context.cachedir is None:
+                output_dirs = self.output_dirs  # type: Iterable[str]
+            else:
+                output_dirs = filter(
+                    lambda x: not x.startswith(runtime_context.cachedir),  # type: ignore
+                    self.output_dirs,
+                )
+            cleanIntermediate(output_dirs)
+
+        if self.final_output and self.final_status:
+
+            if (
+                runtime_context.research_obj is not None
+                and isinstance(
+                    process, (JobBase, Process, WorkflowJobStep, WorkflowJob)
+                )
+                and process.parent_wf
+            ):
+                process_run_id = None  # type: Optional[str]
+                name = "primary"
+                process.parent_wf.generate_output_prov(
+                    self.final_output[0], process_run_id, name
+                )
+                process.parent_wf.document.wasEndedBy(
+                    process.parent_wf.workflow_run_uri,
+                    None,
+                    process.parent_wf.engine_uuid,
+                    datetime.datetime.now(),
+                )
+                process.parent_wf.finalize_prov_profile(name=None)
+            return (self.final_output[0], self.final_status[0])
+        return (None, "permanentFail")
+
+
+class SingleJobExecutor(JobExecutor):
+    """Default single-threaded CWL reference executor."""
+
+    def run_jobs(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        logger: logging.Logger,
+        runtime_context: RuntimeContext,
+    ) -> None:
+
+        process_run_id = None  # type: Optional[str]
+
+        # define provenance profile for single commandline tool
+        if (
+            not isinstance(process, Workflow)
+            and runtime_context.research_obj is not None
+        ):
+            process.provenance_object = ProvenanceProfile(
+                runtime_context.research_obj,
+                full_name=runtime_context.cwl_full_name,
+                host_provenance=False,
+                user_provenance=False,
+                orcid=runtime_context.orcid,
+                # single tool execution, so RO UUID = wf UUID = tool UUID
+                run_uuid=runtime_context.research_obj.ro_uuid,
+                fsaccess=runtime_context.make_fs_access(""),
+            )
+            process.parent_wf = process.provenance_object
+        jobiter = process.job(job_order_object, self.output_callback, runtime_context)
+
+        try:
+            for job in jobiter:
+                if job is not None:
+                    if runtime_context.builder is not None and hasattr(job, "builder"):
+                        job.builder = runtime_context.builder  # type: ignore
+                    if job.outdir is not None:
+                        self.output_dirs.add(job.outdir)
+                    if runtime_context.research_obj is not None:
+                        if not isinstance(process, Workflow):
+                            prov_obj = process.provenance_object
+                        else:
+                            prov_obj = job.prov_obj
+                        if prov_obj:
+                            runtime_context.prov_obj = prov_obj
+                            prov_obj.fsaccess = runtime_context.make_fs_access("")
+                            prov_obj.evaluate(
+                                process,
+                                job,
+                                job_order_object,
+                                runtime_context.research_obj,
+                            )
+                            process_run_id = prov_obj.record_process_start(process, job)
+                            runtime_context = runtime_context.copy()
+                        runtime_context.process_run_id = process_run_id
+                    job.run(runtime_context)
+                else:
+                    logger.error("Workflow cannot make any more progress.")
+                    break
+        except (
+            ValidationException,
+            WorkflowException,
+        ):  # pylint: disable=try-except-raise
+            raise
+        except Exception as err:
+            logger.exception("Got workflow error")
+            raise WorkflowException(str(err)) from err
+
+
+class MultithreadedJobExecutor(JobExecutor):
+    """
+    Experimental multi-threaded CWL executor.
+
+    Does simple resource accounting: a job is not started unless enough
+    cores and RAM are available, but no attempt is made to optimize usage.
+    """
+
+    def __init__(self) -> None:
+        """Initialize."""
+        super().__init__()
+        self.exceptions = []  # type: List[WorkflowException]
+        self.pending_jobs = []  # type: List[JobsType]
+        self.pending_jobs_lock = threading.Lock()
+
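+        # Resource caps: max_ram is the host's available memory in MiB and
+        # max_cores its CPU count; allocated_ram/allocated_cores track what
+        # currently running jobs have reserved.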
+        self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
+        self.max_cores = float(psutil.cpu_count())
+        self.allocated_ram = float(0)
+        self.allocated_cores = float(0)
+
+    def select_resources(
+        self, request, runtime_context
+    ):  # pylint: disable=unused-argument
+        # type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
+        """Naïve check for available cpu cores and memory."""
+        result = {}  # type: Dict[str, Union[int, float, str]]
+        maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
+        for rsc in ("cores", "ram"):
+            rsc_min = request[rsc + "Min"]
+            if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
+                raise WorkflowException(
+                    "Requested at least %d %s but only %d available"
+                    % (rsc_min, rsc, maxrsc[rsc])
+                )
+            rsc_max = request[rsc + "Max"]
+            if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
+                result[rsc] = math.ceil(rsc_max)
+            else:
+                result[rsc] = maxrsc[rsc]
+
+        result["tmpdirSize"] = (
+            math.ceil(request["tmpdirMin"])
+            if not isinstance(request["tmpdirMin"], str)
+            else request["tmpdirMin"]
+        )
+        result["outdirSize"] = (
+            math.ceil(request["outdirMin"])
+            if not isinstance(request["outdirMin"], str)
+            else request["outdirMin"]
+        )
+
+        return result
+
+    def _runner(self, job, runtime_context, TMPDIR_LOCK):
+        # type: (Union[JobBase, WorkflowJob, CallbackJob, ExpressionJob], RuntimeContext, threading.Lock) -> None
+        """Job running thread."""
+        try:
+            _logger.debug(
+                "job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(
+                    job, runtime_context, TMPDIR_LOCK
+                )
+            )
+            job.run(runtime_context, TMPDIR_LOCK)
+        except WorkflowException as err:
+            _logger.exception(f"Got workflow error: {err}")
+            self.exceptions.append(err)
+        except Exception as err:  # pylint: disable=broad-except
+            _logger.exception(f"Got workflow error: {err}")
+            self.exceptions.append(WorkflowException(str(err)))
+        finally:
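+            # Whatever happened, release this job's resource reservation and
+            # wake any thread waiting on the workflow evaluation lock.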
+            if runtime_context.workflow_eval_lock:
+                with runtime_context.workflow_eval_lock:
+                    if isinstance(job, JobBase):
+                        ram = job.builder.resources["ram"]
+                        if not isinstance(ram, str):
+                            self.allocated_ram -= ram
+                        cores = job.builder.resources["cores"]
+                        if not isinstance(cores, str):
+                            self.allocated_cores -= cores
+                    runtime_context.workflow_eval_lock.notify_all()
+
+    def run_job(
+        self,
+        job: Optional[JobsType],
+        runtime_context: RuntimeContext,
+    ) -> None:
+        """Execute a single Job in a seperate thread."""
+        if job is not None:
+            with self.pending_jobs_lock:
+                self.pending_jobs.append(job)
+
+        with self.pending_jobs_lock:
+            n = 0
+            while (n + 1) <= len(self.pending_jobs):
+                # Simple greedy resource allocation strategy.  Go
+                # through pending jobs in the order they were
+                # generated and add them to the queue only if there
+                # are resources available.
+                job = self.pending_jobs[n]
+                if isinstance(job, JobBase):
+                    ram = job.builder.resources["ram"]
+                    cores = job.builder.resources["cores"]
+                    if (not isinstance(ram, str) and ram > self.max_ram) or (
+                        not isinstance(cores, str) and cores > self.max_cores
+                    ):
+                        _logger.error(
+                            'Job "%s" cannot be run, requests more resources (%s) '
+                            "than available on this host (max ram %d, max cores %d)",
+                            job.name,
+                            job.builder.resources,
+                            self.max_ram,
+                            self.max_cores,
+                        )
+                        self.pending_jobs.remove(job)
+                        return
+
+                    if (
+                        not isinstance(ram, str)
+                        and self.allocated_ram + ram > self.max_ram
+                    ) or (
+                        not isinstance(cores, str)
+                        and self.allocated_cores + cores > self.max_cores
+                    ):
+                        _logger.debug(
+                            'Job "%s" cannot run yet, resources (%s) are not '
+                            "available (already allocated ram is %d, allocated cores is %d, "
+                            "max ram %d, max cores %d)",
+                            job.name,
+                            job.builder.resources,
+                            self.allocated_ram,
+                            self.allocated_cores,
+                            self.max_ram,
+                            self.max_cores,
+                        )
+                        n += 1
+                        continue
+
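+                # The job fits: reserve its resources, hand it to the task
+                # queue, and drop it from the pending list.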
+                if isinstance(job, JobBase):
+                    ram = job.builder.resources["ram"]
+                    if not isinstance(ram, str):
+                        self.allocated_ram += ram
+                    cores = job.builder.resources["cores"]
+                    if not isinstance(cores, str):
+                        self.allocated_cores += cores
+                self.taskqueue.add(
+                    functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
+                    runtime_context.workflow_eval_lock,
+                )
+                self.pending_jobs.remove(job)
+
+    def wait_for_next_completion(self, runtime_context):
+        # type: (RuntimeContext) -> None
+        """Wait for jobs to finish."""
+        if runtime_context.workflow_eval_lock is not None:
+            runtime_context.workflow_eval_lock.wait(timeout=3)
+        if self.exceptions:
+            raise self.exceptions[0]
+
+    def run_jobs(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        logger: logging.Logger,
+        runtime_context: RuntimeContext,
+    ) -> None:
+
+        self.taskqueue = TaskQueue(
+            threading.Lock(), psutil.cpu_count()
+        )  # type: TaskQueue
+        try:
+
+            jobiter = process.job(
+                job_order_object, self.output_callback, runtime_context
+            )
+
+            if runtime_context.workflow_eval_lock is None:
+                raise WorkflowException(
+                    "runtimeContext.workflow_eval_lock must not be None"
+                )
+
+            runtime_context.workflow_eval_lock.acquire()
+            for job in jobiter:
+                if job is not None:
+                    if isinstance(job, JobBase):
+                        job.builder = runtime_context.builder or job.builder
+                        if job.outdir is not None:
+                            self.output_dirs.add(job.outdir)
+
+                self.run_job(job, runtime_context)
+
+                if job is None:
+                    if self.taskqueue.in_flight > 0:
+                        self.wait_for_next_completion(runtime_context)
+                    else:
+                        logger.error("Workflow cannot make any more progress.")
+                        break
+
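+            # Job generation is finished (or the workflow stalled); keep
+            # flushing the pending list and waiting for completions until
+            # nothing is left in flight.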
+            self.run_job(None, runtime_context)
+            while self.taskqueue.in_flight > 0:
+                self.wait_for_next_completion(runtime_context)
+                self.run_job(None, runtime_context)
+
+            runtime_context.workflow_eval_lock.release()
+        finally:
+            self.taskqueue.drain()
+            self.taskqueue.join()
+
+
+class NoopJobExecutor(JobExecutor):
+    """Do nothing executor, for testing purposes only."""
+
+    def run_jobs(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        logger: logging.Logger,
+        runtime_context: RuntimeContext,
+    ) -> None:
+        pass
+
+    def execute(
+        self,
+        process: Process,
+        job_order_object: CWLObjectType,
+        runtime_context: RuntimeContext,
+        logger: Optional[logging.Logger] = None,
+    ) -> Tuple[Optional[CWLObjectType], str]:
+        return {}, "success"