view env/lib/python3.9/site-packages/cwltool/procgenerator.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line source

import copy
from typing import Dict, Optional, Tuple, cast

from ruamel.yaml.comments import CommentedMap
from schema_salad.exceptions import ValidationException
from schema_salad.sourceline import indent

from .context import LoadingContext, RuntimeContext
from .errors import WorkflowException
from .load_tool import load_tool
from .loghandler import _logger
from .process import Process, shortname
from .utils import CWLObjectType, JobsGeneratorType, OutputCallbackType


class ProcessGeneratorJob:
    """Result of ProcessGenerator.job().

    Runs the generator's embedded tool, records its output through
    :meth:`receive_output`, then asks the generator for the process described
    by that output and runs it.
    """

    def __init__(self, procgenerator: "ProcessGenerator") -> None:
        """Create a ProcessGenerator Job."""
        self.procgenerator = procgenerator
        # Both stay None until the embedded tool reports back.
        self.jobout = None  # type: Optional[CWLObjectType]
        self.processStatus = None  # type: Optional[str]

    def receive_output(
        self, jobout: Optional[CWLObjectType], processStatus: str
    ) -> None:
        """Store the output object and status reported by the embedded tool.

        Passed as the output callback when the embedded tool is run.
        """
        self.jobout = jobout
        self.processStatus = processStatus

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Yield the embedded tool's jobs, then the generated process's jobs.

        :param job_order: input object handed to the embedded tool and to
            ``ProcessGenerator.result``.
        :param output_callbacks: callback given to the generated process; it
            is also invoked directly when the embedded tool does not succeed.
        :param runtimeContext: runtime context forwarded to every call.
        :raises WorkflowException: if the embedded tool reported no output
            object, or if any other exception is raised along the way (the
            original exception is attached as the cause).
        """
        try:
            yield from self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext
            )

            # Nothing more to schedule until receive_output() has been called.
            while self.processStatus is None:
                yield None

            # NOTE(review): with no callback supplied, a non-success status is
            # not handled here and execution continues below -- confirm that
            # this is intended.
            if self.processStatus != "success" and output_callbacks:
                output_callbacks(self.jobout, self.processStatus)
                return

            if self.jobout is None:
                raise WorkflowException("jobout should not be None")

            created_tool, runinputs = self.procgenerator.result(
                job_order, self.jobout, runtimeContext
            )

            yield from created_tool.job(runinputs, output_callbacks, runtimeContext)

        except WorkflowException:
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            # Chain explicitly so the original error is kept as __cause__.
            raise WorkflowException(str(exc)) from exc


class ProcessGenerator(Process):
    """Process that runs an embedded tool and then the process it names.

    The embedded tool comes from the ``run`` field. Its output object must
    carry a ``runProcess`` entry whose ``location`` is loaded as the process
    to run next (see :meth:`result`).
    """

    def __init__(
        self,
        toolpath_object: CommentedMap,
        loadingContext: LoadingContext,
    ) -> None:
        """Create a ProcessGenerator from the given dictionary and context.

        :raises WorkflowException: if the ``run`` tool fails validation.
        """
        super().__init__(toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        run = toolpath_object["run"]
        try:
            if isinstance(run, CommentedMap):
                # Inline tool definition: build it directly.
                self.embedded_tool = loadingContext.construct_tool_object(
                    run, loadingContext
                )  # type: Process
            else:
                # Reference to another document: load it with fresh metadata.
                # Work on a copy (as result() does) so the caller's context
                # object is not modified.
                run_context = loadingContext.copy()
                run_context.metadata = {}
                self.embedded_tool = load_tool(run, run_context)
        except ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (toolpath_object["run"], indent(str(vexc)))
            ) from vexc

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Return the job generator of a new ProcessGeneratorJob for this process."""
        return ProcessGeneratorJob(self).job(
            job_order, output_callbacks, runtimeContext
        )

    def result(
        self,
        job_order: CWLObjectType,
        jobout: CWLObjectType,
        runtimeContext: RuntimeContext,
    ) -> Tuple[Process, CWLObjectType]:
        """Load the generated process and work out the inputs to run it with.

        :param job_order: the input object this process was run with.
        :param jobout: output object of the embedded tool; its ``runProcess``
            entry supplies the ``location`` to load, and an optional
            ``runInputs`` entry supplies the inputs.
        :param runtimeContext: only consulted for its ``debug`` flag.
        :returns: the loaded process and the input object for it.
        :raises WorkflowException: if the generated tool fails validation.
        """
        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(
                cast(Dict[str, str], jobout["runProcess"])["location"], loadingContext
            )
        except ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (jobout["runProcess"], indent(str(vexc)))
            ) from vexc

        if "runInputs" in jobout:
            runinputs = cast(CWLObjectType, jobout["runInputs"])
        else:
            # No explicit inputs: reuse the original job order, minus the
            # inputs declared by the embedded (generator) tool and the "id".
            runinputs = copy.deepcopy(job_order)
            for inp in self.embedded_tool.tool["inputs"]:
                runinputs.pop(shortname(inp["id"]), None)
            runinputs.pop("id", None)

        return embedded_tool, runinputs