Mercurial > repos > shellac > sam_consensus_v3
diff env/lib/python3.9/site-packages/cwltool/procgenerator.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.9/site-packages/cwltool/procgenerator.py Mon Mar 22 18:12:50 2021 +0000 @@ -0,0 +1,131 @@ +import copy +from typing import Dict, Optional, Tuple, cast + +from ruamel.yaml.comments import CommentedMap +from schema_salad.exceptions import ValidationException +from schema_salad.sourceline import indent + +from .context import LoadingContext, RuntimeContext +from .errors import WorkflowException +from .load_tool import load_tool +from .loghandler import _logger +from .process import Process, shortname +from .utils import CWLObjectType, JobsGeneratorType, OutputCallbackType + + +class ProcessGeneratorJob: + """Result of ProcessGenerator.job().""" + + def __init__(self, procgenerator: "ProcessGenerator") -> None: + """Create a ProccessGenerator Job.""" + self.procgenerator = procgenerator + self.jobout = None # type: Optional[CWLObjectType] + self.processStatus = None # type: Optional[str] + + def receive_output( + self, jobout: Optional[CWLObjectType], processStatus: str + ) -> None: + self.jobout = jobout + self.processStatus = processStatus + + def job( + self, + job_order: CWLObjectType, + output_callbacks: Optional[OutputCallbackType], + runtimeContext: RuntimeContext, + ) -> JobsGeneratorType: + + try: + yield from self.procgenerator.embedded_tool.job( + job_order, self.receive_output, runtimeContext + ) + + while self.processStatus is None: + yield None + + if self.processStatus != "success" and output_callbacks: + output_callbacks(self.jobout, self.processStatus) + return + + if self.jobout is None: + raise WorkflowException("jobout should not be None") + + created_tool, runinputs = self.procgenerator.result( + job_order, self.jobout, runtimeContext + ) + + yield from created_tool.job(runinputs, output_callbacks, runtimeContext) + + except WorkflowException: + raise + except Exception as exc: + _logger.exception("Unexpected exception") + raise WorkflowException(str(exc)) + + +class ProcessGenerator(Process): + def __init__( + self, + toolpath_object: CommentedMap, + loadingContext: LoadingContext, + ) -> None: + """Create a ProcessGenerator from the given dictionary and context.""" + super().__init__(toolpath_object, loadingContext) + self.loadingContext = loadingContext # type: LoadingContext + try: + if isinstance(toolpath_object["run"], CommentedMap): + self.embedded_tool = loadingContext.construct_tool_object( + toolpath_object["run"], loadingContext + ) # type: Process + else: + loadingContext.metadata = {} + self.embedded_tool = load_tool(toolpath_object["run"], loadingContext) + except ValidationException as vexc: + if loadingContext.debug: + _logger.exception("Validation exception") + raise WorkflowException( + "Tool definition %s failed validation:\n%s" + % (toolpath_object["run"], indent(str(vexc))) + ) + + def job( + self, + job_order: CWLObjectType, + output_callbacks: Optional[OutputCallbackType], + runtimeContext: RuntimeContext, + ) -> JobsGeneratorType: + return ProcessGeneratorJob(self).job( + job_order, output_callbacks, runtimeContext + ) + + def result( + self, + job_order: CWLObjectType, + jobout: CWLObjectType, + runtimeContext: RuntimeContext, + ) -> Tuple[Process, CWLObjectType]: + try: + loadingContext = self.loadingContext.copy() + loadingContext.metadata = {} + embedded_tool = load_tool( + cast(Dict[str, str], jobout["runProcess"])["location"], loadingContext + ) + except ValidationException as vexc: + if runtimeContext.debug: + _logger.exception("Validation exception") + raise WorkflowException( + "Tool definition %s failed validation:\n%s" + % (jobout["runProcess"], indent(str(vexc))) + ) + + if "runInputs" in jobout: + runinputs = cast(CWLObjectType, jobout["runInputs"]) + else: + runinputs = copy.deepcopy(job_order) + for i in self.embedded_tool.tool["inputs"]: + if shortname(i["id"]) in runinputs: + del runinputs[shortname(i["id"])] + if "id" in runinputs: + del runinputs["id"] + + return embedded_tool, runinputs