Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/procgenerator.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 import copy | |
| 2 from typing import Dict, Optional, Tuple, cast | |
| 3 | |
| 4 from ruamel.yaml.comments import CommentedMap | |
| 5 from schema_salad.exceptions import ValidationException | |
| 6 from schema_salad.sourceline import indent | |
| 7 | |
| 8 from .context import LoadingContext, RuntimeContext | |
| 9 from .errors import WorkflowException | |
| 10 from .load_tool import load_tool | |
| 11 from .loghandler import _logger | |
| 12 from .process import Process, shortname | |
| 13 from .utils import CWLObjectType, JobsGeneratorType, OutputCallbackType | |
| 14 | |
| 15 | |
class ProcessGeneratorJob:
    """Result of ProcessGenerator.job()."""

    def __init__(self, procgenerator: "ProcessGenerator") -> None:
        """Create a ProcessGenerator Job."""
        self.procgenerator = procgenerator
        # Both stay None until receive_output() is invoked.
        self.jobout = None  # type: Optional[CWLObjectType]
        self.processStatus = None  # type: Optional[str]

    def receive_output(
        self, jobout: Optional[CWLObjectType], processStatus: str
    ) -> None:
        """Store the output object and status passed to this callback."""
        self.jobout = jobout
        self.processStatus = processStatus

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """
        Run the embedded tool, then the process described by its output.

        The embedded tool's jobs are yielded first, with
        :meth:`receive_output` as their output callback.  Once a status has
        been received, ``procgenerator.result()`` is asked for the process
        to run and its inputs, and that process's jobs are yielded with the
        caller's ``output_callbacks``.

        :raises WorkflowException: if the embedded tool produced no output
            object, or if any other exception occurs (the original
            exception is attached as ``__cause__``).
        """
        try:
            yield from self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext
            )

            # Yield None until receive_output() has recorded a status.
            while self.processStatus is None:
                yield None

            # On a non-"success" status, hand the result to the caller's
            # callback (when one was supplied) and stop here.
            if self.processStatus != "success" and output_callbacks:
                output_callbacks(self.jobout, self.processStatus)
                return

            if self.jobout is None:
                raise WorkflowException("jobout should not be None")

            created_tool, runinputs = self.procgenerator.result(
                job_order, self.jobout, runtimeContext
            )

            yield from created_tool.job(runinputs, output_callbacks, runtimeContext)

        except WorkflowException:
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            # Chain explicitly so the original error is kept as __cause__.
            raise WorkflowException(str(exc)) from exc
| 64 | |
| 65 | |
class ProcessGenerator(Process):
    """Process whose embedded tool's output names the process to run next."""

    def __init__(
        self,
        toolpath_object: CommentedMap,
        loadingContext: LoadingContext,
    ) -> None:
        """Create a ProcessGenerator from the given dictionary and context."""
        super().__init__(toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        try:
            if isinstance(toolpath_object["run"], CommentedMap):
                # "run" holds an inline tool definition: build it directly.
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext
                )  # type: Process
            else:
                # Otherwise "run" is passed to load_tool() to be loaded.
                # NOTE(review): this resets metadata on the caller's context
                # object, whereas result() resets it on a copy -- confirm
                # that the in-place mutation is intended.
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(toolpath_object["run"], loadingContext)
        except ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (toolpath_object["run"], indent(str(vexc)))
            ) from vexc

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Return the job generator of a new ProcessGeneratorJob for this process."""
        return ProcessGeneratorJob(self).job(
            job_order, output_callbacks, runtimeContext
        )

    def result(
        self,
        job_order: CWLObjectType,
        jobout: CWLObjectType,
        runtimeContext: RuntimeContext,
    ) -> Tuple[Process, CWLObjectType]:
        """
        Load the generated process and work out the inputs to run it with.

        The process is loaded from ``jobout["runProcess"]["location"]`` using
        a copy of the stored loading context.  Its inputs are
        ``jobout["runInputs"]`` when present; otherwise a deep copy of
        ``job_order`` with the embedded tool's own inputs and the ``"id"``
        key removed.

        :returns: the loaded process and its input object.
        :raises WorkflowException: if the generated process fails validation.
        """
        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(
                cast(Dict[str, str], jobout["runProcess"])["location"], loadingContext
            )
        except ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (jobout["runProcess"], indent(str(vexc)))
            ) from vexc

        if "runInputs" in jobout:
            runinputs = cast(CWLObjectType, jobout["runInputs"])
        else:
            # Work on a deep copy so the caller's job_order is left intact.
            runinputs = copy.deepcopy(job_order)
            for tool_input in self.embedded_tool.tool["inputs"]:
                runinputs.pop(shortname(tool_input["id"]), None)
            runinputs.pop("id", None)

        return embedded_tool, runinputs
