comparison env/lib/python3.9/site-packages/cwltool/procgenerator.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import copy
2 from typing import Dict, Optional, Tuple, cast
3
4 from ruamel.yaml.comments import CommentedMap
5 from schema_salad.exceptions import ValidationException
6 from schema_salad.sourceline import indent
7
8 from .context import LoadingContext, RuntimeContext
9 from .errors import WorkflowException
10 from .load_tool import load_tool
11 from .loghandler import _logger
12 from .process import Process, shortname
13 from .utils import CWLObjectType, JobsGeneratorType, OutputCallbackType
14
15
class ProcessGeneratorJob:
    """Result of ProcessGenerator.job().

    Runs the generator's embedded tool, then passes that tool's output to
    ``ProcessGenerator.result()`` and runs the process it returns.
    """

    def __init__(self, procgenerator: "ProcessGenerator") -> None:
        """Create a ProcessGenerator Job."""
        self.procgenerator = procgenerator
        # Both remain None until receive_output() has been called.
        self.jobout = None  # type: Optional[CWLObjectType]
        self.processStatus = None  # type: Optional[str]

    def receive_output(
        self, jobout: Optional[CWLObjectType], processStatus: str
    ) -> None:
        """Record the output object and status handed to this callback."""
        self.jobout = jobout
        self.processStatus = processStatus

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Run the embedded tool, then the process generated from its output.

        Yields everything the embedded tool's ``job()`` yields, then ``None``
        while no status has been received, then everything yielded by the
        ``job()`` of the process returned by ``ProcessGenerator.result()``.

        If the embedded tool reports a status other than ``"success"``, the
        generated process is never built or run; ``output_callbacks`` (when
        provided) is called with the recorded output and status instead.

        Raises WorkflowException if a successful run recorded no output, or
        to wrap any other unexpected exception.
        """
        try:
            yield from self.procgenerator.embedded_tool.job(
                job_order, self.receive_output, runtimeContext
            )

            # receive_output() has not been called yet: keep yielding until
            # a status is recorded.
            while self.processStatus is None:
                yield None

            if self.processStatus != "success":
                # Stop here even when there is no callback to notify; the
                # output of a failed run must not be used to generate and
                # run another process.
                if output_callbacks:
                    output_callbacks(self.jobout, self.processStatus)
                return

            if self.jobout is None:
                raise WorkflowException("jobout should not be None")

            created_tool, runinputs = self.procgenerator.result(
                job_order, self.jobout, runtimeContext
            )

            yield from created_tool.job(runinputs, output_callbacks, runtimeContext)

        except WorkflowException:
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            # Chain explicitly so the original error is kept as __cause__.
            raise WorkflowException(str(exc)) from exc
64
65
class ProcessGenerator(Process):
    """Process whose embedded tool's output names another process to run.

    The tool given by the ``run`` field is loaded at construction time.
    ``result()`` later loads the tool found at the ``location`` of the
    ``runProcess`` entry in that tool's output object.
    """

    def __init__(
        self,
        toolpath_object: CommentedMap,
        loadingContext: LoadingContext,
    ) -> None:
        """Create a ProcessGenerator from the given dictionary and context.

        Raises WorkflowException if the tool referenced by ``run`` fails
        validation.
        """
        super().__init__(toolpath_object, loadingContext)
        self.loadingContext = loadingContext  # type: LoadingContext
        run = toolpath_object["run"]
        try:
            if isinstance(run, CommentedMap):
                # ``run`` is already a mapping: construct the tool from it.
                self.embedded_tool = loadingContext.construct_tool_object(
                    run, loadingContext
                )  # type: Process
            else:
                # NOTE(review): this resets ``metadata`` on the caller's
                # context object, whereas result() resets it on a copy --
                # confirm the in-place mutation is intended.
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(run, loadingContext)
        except ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (run, indent(str(vexc)))
            ) from vexc

    def job(
        self,
        job_order: CWLObjectType,
        output_callbacks: Optional[OutputCallbackType],
        runtimeContext: RuntimeContext,
    ) -> JobsGeneratorType:
        """Return the job generator of a new ProcessGeneratorJob for this process."""
        return ProcessGeneratorJob(self).job(
            job_order, output_callbacks, runtimeContext
        )

    def result(
        self,
        job_order: CWLObjectType,
        jobout: CWLObjectType,
        runtimeContext: RuntimeContext,
    ) -> Tuple[Process, CWLObjectType]:
        """Load the generated process and determine the inputs to run it with.

        The tool is loaded from ``jobout["runProcess"]["location"]`` using a
        copy of the stored loading context with its metadata reset.

        The inputs are ``jobout["runInputs"]`` when that key is present.
        Otherwise they are a deep copy of ``job_order`` with the embedded
        tool's own inputs and the ``id`` key removed.

        Returns the loaded tool and its input object.

        Raises WorkflowException if ``jobout`` has no usable ``runProcess``
        location or if the loaded tool fails validation.
        """
        try:
            tool_location = cast(Dict[str, str], jobout["runProcess"])["location"]
        except (KeyError, TypeError) as err:
            # Report a malformed generator output clearly instead of letting
            # a bare KeyError/TypeError escape.
            raise WorkflowException(
                "Process generator output must contain a 'runProcess' object "
                "with a 'location' field"
            ) from err

        try:
            loadingContext = self.loadingContext.copy()
            loadingContext.metadata = {}
            embedded_tool = load_tool(tool_location, loadingContext)
        except ValidationException as vexc:
            if runtimeContext.debug:
                _logger.exception("Validation exception")
            raise WorkflowException(
                "Tool definition %s failed validation:\n%s"
                % (jobout["runProcess"], indent(str(vexc)))
            ) from vexc

        if "runInputs" in jobout:
            runinputs = cast(CWLObjectType, jobout["runInputs"])
        else:
            # Work on a copy so the caller's job order is left untouched.
            runinputs = copy.deepcopy(job_order)
            for tool_input in self.embedded_tool.tool["inputs"]:
                runinputs.pop(shortname(tool_input["id"]), None)
            runinputs.pop("id", None)

        return embedded_tool, runinputs