comparison env/lib/python3.9/site-packages/boto/swf/layer2.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Object-oriented interface to SWF wrapping boto.swf.layer1.Layer1"""
2
3 import time
4 from functools import wraps
5 from boto.swf.layer1 import Layer1
6 from boto.swf.layer1_decisions import Layer1Decisions
7
# Module-wide fallback credentials. SWFBase.__init__ copies any truthy
# value found here onto new instances before applying keyword overrides.
DEFAULT_CREDENTIALS = dict(
    aws_access_key_id=None,
    aws_secret_access_key=None,
)
12
def set_default_credentials(aws_access_key_id, aws_secret_access_key):
    """Store the given key pair in the module-level DEFAULT_CREDENTIALS.

    The dictionary is modified in place, so the new values are seen by
    every object constructed afterwards.
    """
    DEFAULT_CREDENTIALS['aws_access_key_id'] = aws_access_key_id
    DEFAULT_CREDENTIALS['aws_secret_access_key'] = aws_secret_access_key
19
class SWFBase(object):

    """Common base of the layer-2 SWF objects.

    Holds the identifying attributes and credentials as class-level
    defaults and owns the ``Layer1`` connection stored in ``self._swf``.
    """

    name = None
    domain = None
    aws_access_key_id = None
    aws_secret_access_key = None
    region = None

    def __init__(self, **kwargs):
        """Apply default credentials, then keyword overrides, then connect.

        Every keyword argument is set as an attribute of the same name on
        the instance, whatever its name is.
        """
        # Module-level defaults are only applied when they are truthy.
        for credkey in ('aws_access_key_id', 'aws_secret_access_key'):
            default_value = DEFAULT_CREDENTIALS.get(credkey)
            if default_value:
                setattr(self, credkey, DEFAULT_CREDENTIALS[credkey])

        # Keyword arguments win over both class and module defaults.
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

        self._swf = Layer1(
            self.aws_access_key_id,
            self.aws_secret_access_key,
            region=self.region,
        )

    def __repr__(self):
        """Return ``<ClassName 'name[-version]' at 0x...>``."""
        parts = [str(self.name)]
        # Only versioned objects carry a ``version`` attribute.
        if hasattr(self, 'version'):
            parts.append(str(self.version))
        label = '-'.join(parts)
        return '<%s %r at 0x%x>' % (self.__class__.__name__, label, id(self))
46
class Domain(SWFBase):

    """Simple Workflow Domain.

    Object wrapper around the domain-scoped ``Layer1`` calls. The listing
    methods turn each entry of the service response into a layer-2 object
    created with this domain's credentials, name and region.
    """

    # Passed as the description argument when the domain is registered.
    description = None
    # Passed through str() as the retention argument of register_domain.
    retention = 30

    @wraps(Layer1.describe_domain)
    def describe(self):
        """DescribeDomain."""
        domain_name = self.name
        return self._swf.describe_domain(domain_name)

    @wraps(Layer1.deprecate_domain)
    def deprecate(self):
        """DeprecateDomain"""
        # The Layer1 response is discarded; this method returns None.
        domain_name = self.name
        self._swf.deprecate_domain(domain_name)

    @wraps(Layer1.register_domain)
    def register(self):
        """RegisterDomain."""
        # The Layer1 response is discarded; this method returns None.
        retention_period = str(self.retention)
        self._swf.register_domain(self.name, retention_period,
                                  self.description)

    @wraps(Layer1.list_activity_types)
    def activities(self, status='REGISTERED', **kwargs):
        """ListActivityTypes.

        Returns a list with one ActivityType per ``typeInfos`` entry.
        """
        response = self._swf.list_activity_types(self.name, status, **kwargs)
        found = []
        for type_info in response['typeInfos']:
            # Flatten the nested identifier into the entry itself so the
            # whole dict can be used as constructor keyword arguments.
            type_info.update(type_info.pop('activityType'))
            type_info.update(
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                domain=self.name,
                region=self.region
            )
            found.append(ActivityType(**type_info))
        return found

    @wraps(Layer1.list_workflow_types)
    def workflows(self, status='REGISTERED', **kwargs):
        """ListWorkflowTypes.

        Returns a list with one WorkflowType per ``typeInfos`` entry.
        """
        response = self._swf.list_workflow_types(self.name, status, **kwargs)
        found = []
        for type_info in response['typeInfos']:
            # Same flattening as for activity types, keyed on the
            # nested workflow identifier.
            type_info.update(type_info.pop('workflowType'))
            type_info.update(
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                domain=self.name,
                region=self.region
            )
            found.append(WorkflowType(**type_info))
        return found

    def executions(self, closed=False, **kwargs):
        """List open or closed workflow executions of this domain.

        When ``closed`` is false and no ``oldest_date`` is supplied, the
        listing starts 24 hours before the current time.

        For a full list of available parameters refer to
        :py:func:`boto.swf.layer1.Layer1.list_closed_workflow_executions` and
        :py:func:`boto.swf.layer1.Layer1.list_open_workflow_executions`
        """
        if not closed:
            if 'oldest_date' not in kwargs:
                # Last 24 hours.
                one_day = 3600 * 24
                kwargs['oldest_date'] = time.time() - one_day
            listing = self._swf.list_open_workflow_executions(self.name,
                                                              **kwargs)
        else:
            listing = self._swf.list_closed_workflow_executions(self.name,
                                                                **kwargs)

        found = []
        for info in listing['executionInfos']:
            # Both nested dicts are merged into the entry, in this order.
            for nested_key in ('execution', 'workflowType'):
                info.update(info.pop(nested_key))
            info.update(
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                domain=self.name,
                region=self.region
            )
            found.append(WorkflowExecution(**info))
        return found

    @wraps(Layer1.count_pending_activity_tasks)
    def count_pending_activity_tasks(self, task_list):
        """CountPendingActivityTasks."""
        domain_name = self.name
        return self._swf.count_pending_activity_tasks(domain_name, task_list)

    @wraps(Layer1.count_pending_decision_tasks)
    def count_pending_decision_tasks(self, task_list):
        """CountPendingDecisionTasks."""
        domain_name = self.name
        return self._swf.count_pending_decision_tasks(domain_name, task_list)
148
149
class Actor(SWFBase):

    """Shared base class of ActivityWorker and Decider."""

    domain = None
    # Task list used by poll() when the caller does not pass one.
    task_list = None
    # Token of the most recently polled task; used as the default token
    # by the respond/heartbeat methods of the subclasses.
    last_tasktoken = None

    def run(self):
        """To be overloaded by subclasses."""
        raise NotImplementedError
159
class ActivityWorker(Actor):

    """Base class for SimpleWorkflow activity workers.

    Every responding method falls back to the token remembered by the
    last ``poll()`` when ``task_token`` is None.
    """

    @wraps(Layer1.respond_activity_task_canceled)
    def cancel(self, task_token=None, details=None):
        """RespondActivityTaskCanceled."""
        token = self.last_tasktoken if task_token is None else task_token
        return self._swf.respond_activity_task_canceled(token, details)

    @wraps(Layer1.respond_activity_task_completed)
    def complete(self, task_token=None, result=None):
        """RespondActivityTaskCompleted."""
        token = self.last_tasktoken if task_token is None else task_token
        return self._swf.respond_activity_task_completed(token, result)

    @wraps(Layer1.respond_activity_task_failed)
    def fail(self, task_token=None, details=None, reason=None):
        """RespondActivityTaskFailed."""
        token = self.last_tasktoken if task_token is None else task_token
        return self._swf.respond_activity_task_failed(token, details, reason)

    @wraps(Layer1.record_activity_task_heartbeat)
    def heartbeat(self, task_token=None, details=None):
        """RecordActivityTaskHeartbeat."""
        token = self.last_tasktoken if task_token is None else task_token
        return self._swf.record_activity_task_heartbeat(token, details)

    @wraps(Layer1.poll_for_activity_task)
    def poll(self, **kwargs):
        """PollForActivityTask.

        A ``task_list`` keyword overrides the instance's task list for
        this call only. The returned task's ``taskToken`` (None when
        absent) is remembered in ``last_tasktoken``.
        """
        # Remove the override from kwargs so it is only passed positionally.
        chosen_list = kwargs.pop('task_list', self.task_list)
        polled = self._swf.poll_for_activity_task(self.domain, chosen_list,
                                                  **kwargs)
        self.last_tasktoken = polled.get('taskToken')
        return polled
204
class Decider(Actor):

    """Base class for SimpleWorkflow deciders."""

    @wraps(Layer1.respond_decision_task_completed)
    def complete(self, task_token=None, decisions=None, **kwargs):
        """RespondDecisionTaskCompleted.

        ``decisions`` may be a ``Layer1Decisions`` instance, in which case
        its ``_data`` attribute is what gets sent. ``task_token`` defaults
        to the token remembered by the last ``poll()``.
        """
        token = self.last_tasktoken if task_token is None else task_token
        payload = decisions
        if isinstance(payload, Layer1Decisions):
            # Extract decision list from a Layer1Decisions instance.
            payload = payload._data
        return self._swf.respond_decision_task_completed(token, payload,
                                                         **kwargs)

    @wraps(Layer1.poll_for_decision_task)
    def poll(self, **kwargs):
        """PollForDecisionTask.

        A ``task_list`` keyword overrides the instance's task list for
        this call only. The returned task's ``taskToken`` (None when
        absent) is remembered in ``last_tasktoken``.
        """
        # Remove the override from kwargs so it is only passed positionally.
        chosen_list = kwargs.pop('task_list', self.task_list)
        polled = self._swf.poll_for_decision_task(self.domain, chosen_list,
                                                  **kwargs)
        self.last_tasktoken = polled.get('taskToken')
        return polled
231
class WorkflowType(SWFBase):

    """A versioned workflow type.

    Identified by ``domain``, ``name`` and ``version``. ``task_list`` and
    ``child_policy`` are the defaults used by :meth:`start` when the
    caller does not pass them explicitly.
    """

    version = None
    task_list = None
    child_policy = 'TERMINATE'

    @wraps(Layer1.describe_workflow_type)
    def describe(self):
        """DescribeWorkflowType."""
        return self._swf.describe_workflow_type(self.domain, self.name,
                                                self.version)

    @wraps(Layer1.register_workflow_type)
    def register(self, **kwargs):
        """RegisterWorkflowType.

        Keyword arguments are forwarded to Layer1 and override the
        built-in defaults below. The Layer1 response is not returned.
        """
        args = {
            'default_execution_start_to_close_timeout': '3600',
            'default_task_start_to_close_timeout': '300',
            'default_child_policy': 'TERMINATE',
        }
        args.update(kwargs)
        self._swf.register_workflow_type(self.domain, self.name, self.version,
                                         **args)

    @wraps(Layer1.deprecate_workflow_type)
    def deprecate(self):
        """DeprecateWorkflowType."""
        self._swf.deprecate_workflow_type(self.domain, self.name, self.version)

    @wraps(Layer1.start_workflow_execution)
    def start(self, **kwargs):
        """StartWorkflowExecution.

        A ``workflow_id`` keyword selects the workflow id; otherwise one is
        generated from the name, version and current time. Remaining
        keyword arguments are forwarded to Layer1, with ``task_list`` and
        ``child_policy`` defaulting to this object's attributes.

        Returns a WorkflowExecution for the started run, created with the
        same credentials and region as this workflow type.
        """
        if 'workflow_id' in kwargs:
            workflow_id = kwargs.pop('workflow_id')
        else:
            workflow_id = '%s-%s-%i' % (self.name, self.version, time.time())

        # Fill in instance-level defaults without overriding the caller.
        for def_attr in ('task_list', 'child_policy'):
            kwargs.setdefault(def_attr, getattr(self, def_attr))

        run_id = self._swf.start_workflow_execution(
            self.domain, workflow_id, self.name, self.version,
            **kwargs)['runId']

        # Pass the region along as the listing methods on Domain do;
        # without it the execution object would connect to the default
        # region rather than the one the workflow was started in.
        return WorkflowExecution(
            name=self.name, version=self.version,
            runId=run_id, domain=self.domain, workflowId=workflow_id,
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            region=self.region)
279
class WorkflowExecution(SWFBase):

    """An instance of a workflow.

    Identified by ``domain``, ``workflowId`` and ``runId``.
    """

    workflowId = None
    runId = None

    @wraps(Layer1.signal_workflow_execution)
    def signal(self, signame, **kwargs):
        """SignalWorkflowExecution."""
        # The Layer1 response is discarded; this method returns None.
        swf = self._swf
        swf.signal_workflow_execution(self.domain, signame, self.workflowId,
                                      **kwargs)

    @wraps(Layer1.terminate_workflow_execution)
    def terminate(self, **kwargs):
        """TerminateWorkflowExecution."""
        swf = self._swf
        return swf.terminate_workflow_execution(self.domain, self.workflowId,
                                                **kwargs)

    @wraps(Layer1.get_workflow_execution_history)
    def history(self, **kwargs):
        """GetWorkflowExecutionHistory.

        Returns only the ``events`` entry of the Layer1 response.
        """
        response = self._swf.get_workflow_execution_history(
            self.domain, self.runId, self.workflowId, **kwargs)
        return response['events']

    @wraps(Layer1.describe_workflow_execution)
    def describe(self):
        """DescribeWorkflowExecution."""
        swf = self._swf
        return swf.describe_workflow_execution(self.domain, self.runId,
                                               self.workflowId)

    @wraps(Layer1.request_cancel_workflow_execution)
    def request_cancel(self):
        """RequestCancelWorkflowExecution."""
        swf = self._swf
        return swf.request_cancel_workflow_execution(self.domain,
                                                     self.workflowId,
                                                     self.runId)
316
317
class ActivityType(SWFBase):

    """A versioned activity type.

    Identified by ``domain``, ``name`` and ``version``.
    """

    version = None

    @wraps(Layer1.deprecate_activity_type)
    def deprecate(self):
        """DeprecateActivityType."""
        swf = self._swf
        return swf.deprecate_activity_type(self.domain, self.name,
                                           self.version)

    @wraps(Layer1.describe_activity_type)
    def describe(self):
        """DescribeActivityType."""
        swf = self._swf
        return swf.describe_activity_type(self.domain, self.name,
                                          self.version)

    @wraps(Layer1.register_activity_type)
    def register(self, **kwargs):
        """RegisterActivityType.

        Keyword arguments are forwarded to Layer1 and override the
        built-in timeout defaults. The Layer1 response is not returned.
        """
        timeout_defaults = {
            'default_task_heartbeat_timeout': '600',
            'default_task_schedule_to_close_timeout': '3900',
            'default_task_schedule_to_start_timeout': '300',
            'default_task_start_to_close_timeout': '3600',
        }
        # Caller-supplied values take precedence over the defaults.
        merged = dict(timeout_defaults, **kwargs)
        self._swf.register_activity_type(self.domain, self.name, self.version,
                                         **merged)