Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/procgenerator.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 import copy | |
2 from typing import Dict, Optional, Tuple, cast | |
3 | |
4 from ruamel.yaml.comments import CommentedMap | |
5 from schema_salad.exceptions import ValidationException | |
6 from schema_salad.sourceline import indent | |
7 | |
8 from .context import LoadingContext, RuntimeContext | |
9 from .errors import WorkflowException | |
10 from .load_tool import load_tool | |
11 from .loghandler import _logger | |
12 from .process import Process, shortname | |
13 from .utils import CWLObjectType, JobsGeneratorType, OutputCallbackType | |
14 | |
15 | |
class ProcessGeneratorJob:
    """Result of ProcessGenerator.job().

    Runs the generator's embedded tool, waits for it to report its output,
    then asks the generator to build a new process from that output and
    runs that process as well.
    """

    def __init__(self, procgenerator: "ProcessGenerator") -> None:
        """Create a ProcessGenerator Job."""
        self.procgenerator = procgenerator
        # Both stay None until receive_output() is invoked.
        self.jobout = None  # type: Optional[CWLObjectType]
        self.processStatus = None  # type: Optional[str]

    def receive_output(
        self, jobout: Optional[CWLObjectType], processStatus: str
    ) -> None:
        """Store the output object and status handed to this callback."""
        self.jobout = jobout
        self.processStatus = processStatus

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Yield the embedded tool's jobs, then the jobs of the generated tool.

        The embedded tool is run with ``receive_output`` as its output
        callback. Once a status has been recorded, a non-"success" status is
        forwarded to ``output_callbacks`` (when one was supplied) and the
        generator stops. Otherwise the generated process is obtained from
        ``ProcessGenerator.result()`` and its jobs are yielded.

        Raises:
            WorkflowException: if no output object was recorded, or wrapping
                any other unexpected exception (the original exception is
                attached as ``__cause__``).
        """
        try:
            yield from self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext
            )

            # Keep yielding "nothing runnable yet" until receive_output()
            # has recorded a status.
            while self.processStatus is None:
                yield None

            # NOTE(review): a failed status with no callback falls through
            # to the code below rather than returning — confirm intended.
            if self.processStatus != "success" and output_callbacks:
                output_callbacks(self.jobout, self.processStatus)
                return

            if self.jobout is None:
                raise WorkflowException("jobout should not be None")

            created_tool, runinputs = self.procgenerator.result(
                job_order, self.jobout, runtimeContext
            )

            yield from created_tool.job(runinputs, output_callbacks, runtimeContext)

        except WorkflowException:
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            # Chain explicitly so the original error is preserved as the cause.
            raise WorkflowException(str(exc)) from exc
64 | |
65 | |
class ProcessGenerator(Process):
    """Process whose embedded tool's output names another process to run."""

    def __init__(
        self,
        toolpath_object: CommentedMap,
        loadingContext: LoadingContext,
    ) -> None:
        """Create a ProcessGenerator from the given dictionary and context.

        The ``run`` entry of ``toolpath_object`` is either constructed
        directly (when it is a ``CommentedMap``) or loaded via ``load_tool``.

        Raises:
            WorkflowException: if the embedded tool fails validation.
        """
        super().__init__(toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        try:
            if isinstance(toolpath_object["run"], CommentedMap):
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext
                )  # type: Process
            else:
                # NOTE(review): this resets metadata on the caller's
                # loadingContext object; no copy is taken here, unlike in
                # result(). Left as-is to preserve behavior.
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(toolpath_object["run"], loadingContext)
        except ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (toolpath_object["run"], indent(str(vexc)))
            ) from vexc

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Return the job generator of a fresh ProcessGeneratorJob."""
        return ProcessGeneratorJob(self).job(
            job_order, output_callbacks, runtimeContext
        )

    def result(
        self,
        job_order: CWLObjectType,
        jobout: CWLObjectType,
        runtimeContext: RuntimeContext,
    ) -> Tuple[Process, CWLObjectType]:
        """Load the generated process and work out the inputs to run it with.

        The process is loaded from ``jobout["runProcess"]["location"]`` using
        a copy of the stored loading context with its metadata cleared.

        The inputs are ``jobout["runInputs"]`` when that key is present.
        Otherwise they are a deep copy of ``job_order`` with the embedded
        tool's own inputs (matched by short name) and any ``"id"`` key
        removed.

        Returns:
            A ``(process, inputs)`` tuple.

        Raises:
            WorkflowException: if the generated tool fails validation.
        """
        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(
                cast(Dict[str, str], jobout["runProcess"])["location"], loadingContext
            )
        except ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (jobout["runProcess"], indent(str(vexc)))
            ) from vexc

        if "runInputs" in jobout:
            runinputs = cast(CWLObjectType, jobout["runInputs"])
        else:
            runinputs = copy.deepcopy(job_order)
            # Drop everything that was consumed by the embedded tool itself.
            for inp in self.embedded_tool.tool["inputs"]:
                runinputs.pop(shortname(inp["id"]), None)
            runinputs.pop("id", None)

        return embedded_tool, runinputs