view env/lib/python3.9/site-packages/cwltool/executors.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

"""Single and multi-threaded executors."""
import datetime
import functools
import logging
import math
import os
import threading
from abc import ABCMeta, abstractmethod
from threading import Lock
from typing import (
    Dict,
    Iterable,
    List,
    MutableSequence,
    Optional,
    Set,
    Tuple,
    Union,
    cast,
)

import psutil
from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import SourceLine

from .command_line_tool import CallbackJob, ExpressionJob
from .context import RuntimeContext, getdefault
from .errors import WorkflowException
from .job import JobBase
from .loghandler import _logger
from .mutation import MutationManager
from .process import Process, cleanIntermediate, relocateOutputs
from .provenance_profile import ProvenanceProfile
from .task_queue import TaskQueue
from .utils import CWLObjectType, JobsType
from .workflow import Workflow
from .workflow_job import WorkflowJob, WorkflowJobStep

TMPDIR_LOCK = Lock()


class JobExecutor(metaclass=ABCMeta):
    """Abstract base job executor."""

    def __init__(self) -> None:
        """Initialize."""
        self.final_output = []  # type: MutableSequence[Optional[CWLObjectType]]
        self.final_status = []  # type: List[str]
        self.output_dirs = set()  # type: Set[str]

    def __call__(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Optional[CWLObjectType], str]:

        return self.execute(process, job_order_object, runtime_context, logger)

    def output_callback(
        self, out: Optional[CWLObjectType], process_status: str
    ) -> None:
        """Collect the final status and outputs."""
        self.final_status.append(process_status)
        self.final_output.append(out)

    @abstractmethod
    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute the jobs for the given Process."""

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: logging.Logger = _logger,
    ) -> Tuple[Union[Optional[CWLObjectType]], str]:
        """Execute the process."""
        if not runtime_context.basedir:
            raise WorkflowException("Must provide 'basedir' in runtimeContext")

        def check_for_abstract_op(tool: CWLObjectType) -> None:
            if tool["class"] == "Operation":
                raise SourceLine(tool, "class", WorkflowException).makeError(
                    "Workflow has unrunnable abstract Operation"
                )

        process.visit(check_for_abstract_op)

        finaloutdir = None  # Type: Optional[str]
        original_outdir = runtime_context.outdir
        if isinstance(original_outdir, str):
            finaloutdir = os.path.abspath(original_outdir)
        runtime_context = runtime_context.copy()
        outdir = runtime_context.create_outdir()
        self.output_dirs.add(outdir)
        runtime_context.outdir = outdir
        runtime_context.mutation_manager = MutationManager()
        runtime_context.toplevel = True
        runtime_context.workflow_eval_lock = threading.Condition(threading.RLock())

        job_reqs = None  # type: Optional[List[CWLObjectType]]
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                List[CWLObjectType],
                job_order_object["https://w3id.org/cwl/cwl#requirements"],
            )
        elif (
            "cwl:defaults" in process.metadata
            and "https://w3id.org/cwl/cwl#requirements"
            in cast(CWLObjectType, process.metadata["cwl:defaults"])
        ):
            if (
                process.metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                == "v1.0"
            ):
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1"
                )
            job_reqs = cast(
                Optional[List[CWLObjectType]],
                cast(CWLObjectType, process.metadata["cwl:defaults"])[
                    "https://w3id.org/cwl/cwl#requirements"
                ],
            )
        if job_reqs is not None:
            for req in job_reqs:
                process.requirements.append(req)

        self.run_jobs(process, job_order_object, logger, runtime_context)

        if (
            self.final_output
            and self.final_output[0] is not None
            and finaloutdir is not None
        ):
            self.final_output[0] = relocateOutputs(
                self.final_output[0],
                finaloutdir,
                self.output_dirs,
                runtime_context.move_outputs,
                runtime_context.make_fs_access(""),
                getdefault(runtime_context.compute_checksum, True),
                path_mapper=runtime_context.path_mapper,
            )

        if runtime_context.rm_tmpdir:
            if runtime_context.cachedir is None:
                output_dirs = self.output_dirs  # type: Iterable[str]
            else:
                output_dirs = filter(
                    lambda x: not x.startswith(runtime_context.cachedir),  # type: ignore
                    self.output_dirs,
                )
            cleanIntermediate(output_dirs)

        if self.final_output and self.final_status:

            if (
                runtime_context.research_obj is not None
                and isinstance(
                    process, (JobBase, Process, WorkflowJobStep, WorkflowJob)
                )
                and process.parent_wf
            ):
                process_run_id = None  # type: Optional[str]
                name = "primary"
                process.parent_wf.generate_output_prov(
                    self.final_output[0], process_run_id, name
                )
                process.parent_wf.document.wasEndedBy(
                    process.parent_wf.workflow_run_uri,
                    None,
                    process.parent_wf.engine_uuid,
                    datetime.datetime.now(),
                )
                process.parent_wf.finalize_prov_profile(name=None)
            return (self.final_output[0], self.final_status[0])
        return (None, "permanentFail")


class SingleJobExecutor(JobExecutor):
    """Default single-threaded CWL reference executor."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:

        process_run_id = None  # type: Optional[str]

        # define provenance profile for single commandline tool
        if (
            not isinstance(process, Workflow)
            and runtime_context.research_obj is not None
        ):
            process.provenance_object = ProvenanceProfile(
                runtime_context.research_obj,
                full_name=runtime_context.cwl_full_name,
                host_provenance=False,
                user_provenance=False,
                orcid=runtime_context.orcid,
                # single tool execution, so RO UUID = wf UUID = tool UUID
                run_uuid=runtime_context.research_obj.ro_uuid,
                fsaccess=runtime_context.make_fs_access(""),
            )
            process.parent_wf = process.provenance_object
        jobiter = process.job(job_order_object, self.output_callback, runtime_context)

        try:
            for job in jobiter:
                if job is not None:
                    if runtime_context.builder is not None and hasattr(job, "builder"):
                        job.builder = runtime_context.builder  # type: ignore
                    if job.outdir is not None:
                        self.output_dirs.add(job.outdir)
                    if runtime_context.research_obj is not None:
                        if not isinstance(process, Workflow):
                            prov_obj = process.provenance_object
                        else:
                            prov_obj = job.prov_obj
                        if prov_obj:
                            runtime_context.prov_obj = prov_obj
                            prov_obj.fsaccess = runtime_context.make_fs_access("")
                            prov_obj.evaluate(
                                process,
                                job,
                                job_order_object,
                                runtime_context.research_obj,
                            )
                            process_run_id = prov_obj.record_process_start(process, job)
                            runtime_context = runtime_context.copy()
                        runtime_context.process_run_id = process_run_id
                    job.run(runtime_context)
                else:
                    logger.error("Workflow cannot make any more progress.")
                    break
        except (
            ValidationException,
            WorkflowException,
        ):  # pylint: disable=try-except-raise
            raise
        except Exception as err:
            logger.exception("Got workflow error")
            raise WorkflowException(str(err)) from err


class MultithreadedJobExecutor(JobExecutor):
    """
    Experimental multi-threaded CWL executor.

    Does simple resource accounting, will not start a job unless it
    has cores / ram available, but does not make any attempt to
    optimize usage.
    """

    def __init__(self) -> None:
        """Initialize."""
        super().__init__()
        self.exceptions = []  # type: List[WorkflowException]
        self.pending_jobs = []  # type: List[JobsType]
        self.pending_jobs_lock = threading.Lock()

        self.max_ram = int(psutil.virtual_memory().available / 2 ** 20)
        self.max_cores = float(psutil.cpu_count())
        self.allocated_ram = float(0)
        self.allocated_cores = float(0)

    def select_resources(
        self, request, runtime_context
    ):  # pylint: disable=unused-argument
        # type: (Dict[str, Union[int, float, str]], RuntimeContext) -> Dict[str, Union[int, float, str]]
        """Naïve check for available cpu cores and memory."""
        result = {}  # type: Dict[str, Union[int, float, str]]
        maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
        for rsc in ("cores", "ram"):
            rsc_min = request[rsc + "Min"]
            if not isinstance(rsc_min, str) and rsc_min > maxrsc[rsc]:
                raise WorkflowException(
                    "Requested at least %d %s but only %d available"
                    % (rsc_min, rsc, maxrsc[rsc])
                )
            rsc_max = request[rsc + "Max"]
            if not isinstance(rsc_max, str) and rsc_max < maxrsc[rsc]:
                result[rsc] = math.ceil(rsc_max)
            else:
                result[rsc] = maxrsc[rsc]

        result["tmpdirSize"] = (
            math.ceil(request["tmpdirMin"])
            if not isinstance(request["tmpdirMin"], str)
            else request["tmpdirMin"]
        )
        result["outdirSize"] = (
            math.ceil(request["outdirMin"])
            if not isinstance(request["outdirMin"], str)
            else request["outdirMin"]
        )

        return result

    def _runner(self, job, runtime_context, TMPDIR_LOCK):
        # type: (Union[JobBase, WorkflowJob, CallbackJob, ExpressionJob], RuntimeContext, threading.Lock) -> None
        """Job running thread."""
        try:
            _logger.debug(
                "job: {}, runtime_context: {}, TMPDIR_LOCK: {}".format(
                    job, runtime_context, TMPDIR_LOCK
                )
            )
            job.run(runtime_context, TMPDIR_LOCK)
        except WorkflowException as err:
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(err)
        except Exception as err:  # pylint: disable=broad-except
            _logger.exception(f"Got workflow error: {err}")
            self.exceptions.append(WorkflowException(str(err)))
        finally:
            if runtime_context.workflow_eval_lock:
                with runtime_context.workflow_eval_lock:
                    if isinstance(job, JobBase):
                        ram = job.builder.resources["ram"]
                        if not isinstance(ram, str):
                            self.allocated_ram -= ram
                        cores = job.builder.resources["cores"]
                        if not isinstance(cores, str):
                            self.allocated_cores -= cores
                    runtime_context.workflow_eval_lock.notifyAll()

    def run_job(
        self,
        job: Optional[JobsType],
        runtime_context: RuntimeContext,
    ) -> None:
        """Execute a single Job in a seperate thread."""
        if job is not None:
            with self.pending_jobs_lock:
                self.pending_jobs.append(job)

        with self.pending_jobs_lock:
            n = 0
            while (n + 1) <= len(self.pending_jobs):
                # Simple greedy resource allocation strategy.  Go
                # through pending jobs in the order they were
                # generated and add them to the queue only if there
                # are resources available.
                job = self.pending_jobs[n]
                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    cores = job.builder.resources["cores"]
                    if (not isinstance(ram, str) and ram > self.max_ram) or (
                        not isinstance(cores, str) and cores > self.max_cores
                    ):
                        _logger.error(
                            'Job "%s" cannot be run, requests more resources (%s) '
                            "than available on this host (max ram %d, max cores %d",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        self.pending_jobs.remove(job)
                        return

                    if (
                        not isinstance(ram, str)
                        and self.allocated_ram + ram > self.max_ram
                    ) or (
                        not isinstance(cores, str)
                        and self.allocated_cores + cores > self.max_cores
                    ):
                        _logger.debug(
                            'Job "%s" cannot run yet, resources (%s) are not '
                            "available (already allocated ram is %d, allocated cores is %d, "
                            "max ram %d, max cores %d",
                            job.name,
                            job.builder.resources,
                            self.allocated_ram,
                            self.allocated_cores,
                            self.max_ram,
                            self.max_cores,
                        )
                        n += 1
                        continue

                if isinstance(job, JobBase):
                    ram = job.builder.resources["ram"]
                    if not isinstance(ram, str):
                        self.allocated_ram += ram
                    cores = job.builder.resources["cores"]
                    if not isinstance(cores, str):
                        self.allocated_cores += cores
                self.taskqueue.add(
                    functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                    runtime_context.workflow_eval_lock,
                )
                self.pending_jobs.remove(job)

    def wait_for_next_completion(self, runtime_context):
        # type: (RuntimeContext) -> None
        """Wait for jobs to finish."""
        if runtime_context.workflow_eval_lock is not None:
            runtime_context.workflow_eval_lock.wait(timeout=3)
        if self.exceptions:
            raise self.exceptions[0]

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:

        self.taskqueue = TaskQueue(
            threading.Lock(), psutil.cpu_count()
        )  # type: TaskQueue
        try:

            jobiter = process.job(
                job_order_object, self.output_callback, runtime_context
            )

            if runtime_context.workflow_eval_lock is None:
                raise WorkflowException(
                    "runtimeContext.workflow_eval_lock must not be None"
                )

            runtime_context.workflow_eval_lock.acquire()
            for job in jobiter:
                if job is not None:
                    if isinstance(job, JobBase):
                        job.builder = runtime_context.builder or job.builder
                        if job.outdir is not None:
                            self.output_dirs.add(job.outdir)

                self.run_job(job, runtime_context)

                if job is None:
                    if self.taskqueue.in_flight > 0:
                        self.wait_for_next_completion(runtime_context)
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            self.run_job(None, runtime_context)
            while self.taskqueue.in_flight > 0:
                self.wait_for_next_completion(runtime_context)
                self.run_job(None, runtime_context)

            runtime_context.workflow_eval_lock.release()
        finally:
            self.taskqueue.drain()
            self.taskqueue.join()


class NoopJobExecutor(JobExecutor):
    """Do nothing executor, for testing purposes only."""

    def run_jobs(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        logger: logging.Logger,
        runtime_context: RuntimeContext,
    ) -> None:
        pass

    def execute(
        self,
        process: Process,
        job_order_object: CWLObjectType,
        runtime_context: RuntimeContext,
        logger: Optional[logging.Logger] = None,
    ) -> Tuple[Optional[CWLObjectType], str]:
        return {}, "success"