Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/procgenerator.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/cwltool/procgenerator.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,123 +0,0 @@ -import copy -from .process import Process, shortname -from .load_tool import load_tool -from schema_salad import validate -from schema_salad.sourceline import indent -from .errors import WorkflowException -from .context import RuntimeContext, LoadingContext -from typing import (Any, Callable, Dict, Generator, Iterable, List, - Mapping, MutableMapping, MutableSequence, - Optional, Tuple, Union, cast) -from .loghandler import _logger -from typing_extensions import Text # pylint: disable=unused-import - -class ProcessGeneratorJob(object): - def __init__(self, procgenerator): - # type: (ProcessGenerator) -> None - self.procgenerator = procgenerator - self.jobout = None # type: Optional[Dict[Text, Any]] - self.processStatus = None # type: Optional[Text] - - def receive_output(self, jobout, processStatus): - # type: (Dict[Text, Any], Text) -> None - self.jobout = jobout - self.processStatus = processStatus - - def job(self, - job_order, # type: Mapping[Text, Any] - output_callbacks, # type: Callable[[Any, Any], Any] - runtimeContext # type: RuntimeContext - ): # type: (...) -> Generator[Any, None, None] - # FIXME: Declare base type for what Generator yields - - try: - for tool in self.procgenerator.embedded_tool.job( - job_order, - self.receive_output, - runtimeContext): - yield tool - - while self.processStatus is None: - yield None - - if self.processStatus != "success": - output_callbacks(self.jobout, self.processStatus) - return - - if self.jobout is None: - raise WorkflowException("jobout should not be None") - - created_tool, runinputs = self.procgenerator.result(job_order, self.jobout, runtimeContext) - - for tool in created_tool.job( - runinputs, - output_callbacks, - runtimeContext): - yield tool - - except WorkflowException: - raise - except Exception as exc: - _logger.exception("Unexpected exception") - raise WorkflowException(Text(exc)) - - -class ProcessGenerator(Process): - def __init__(self, - toolpath_object, # type: MutableMapping[Text, Any] - loadingContext # type: LoadingContext - ): # type: (...) -> None - super(ProcessGenerator, self).__init__( - toolpath_object, loadingContext) - self.loadingContext = loadingContext # type: LoadingContext - try: - if isinstance(toolpath_object["run"], MutableMapping): - self.embedded_tool = loadingContext.construct_tool_object( - toolpath_object["run"], loadingContext) # type: Process - else: - loadingContext.metadata = {} - self.embedded_tool = load_tool( - toolpath_object["run"], loadingContext) - except validate.ValidationException as vexc: - if loadingContext.debug: - _logger.exception("Validation exception") - raise WorkflowException( - u"Tool definition %s failed validation:\n%s" % - (toolpath_object["run"], indent(str(vexc)))) - - def job(self, - job_order, # type: Mapping[Text, Text] - output_callbacks, # type: Callable[[Any, Any], Any] - runtimeContext # type: RuntimeContext - ): # type: (...) -> Generator[Any, None, None] - # FIXME: Declare base type for what Generator yields - return ProcessGeneratorJob(self).job(job_order, output_callbacks, runtimeContext) - - def result(self, - job_order, # type: Mapping[Text, Any] - jobout, # type: Mapping[Text, Any] - runtimeContext # type: RuntimeContext - ): # type: (...) -> Tuple[Process, MutableMapping[Text, Any]] - try: - loadingContext = self.loadingContext.copy() - loadingContext.metadata = {} - embedded_tool = load_tool( - jobout["runProcess"]["location"], loadingContext) - except validate.ValidationException as vexc: - if runtimeContext.debug: - _logger.exception("Validation exception") - raise WorkflowException( - u"Tool definition %s failed validation:\n%s" % - (jobout["runProcess"], indent(str(vexc)))) - - if "runInputs" in jobout: - runinputs = cast(MutableMapping[Text, Any], jobout["runInputs"]) - else: - runinputs = cast(MutableMapping[Text, Any], copy.deepcopy(job_order)) - for i in self.embedded_tool.tool["inputs"]: - if shortname(i["id"]) in runinputs: - del runinputs[shortname(i["id"])] - if "id" in runinputs: - del runinputs["id"] - - return embedded_tool, runinputs
