# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import boto
from boto.compat import json
from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.datapipeline import exceptions


class DataPipelineConnection(AWSQueryConnection):
    """
    This is the AWS Data Pipeline API Reference. This guide provides
    descriptions and samples of the AWS Data Pipeline API.

    AWS Data Pipeline is a web service that configures and manages a
    data-driven workflow called a pipeline. AWS Data Pipeline handles
    the details of scheduling and ensuring that data dependencies are
    met so your application can focus on processing the data.

    The AWS Data Pipeline API implements two main sets of
    functionality. The first set of actions configures the pipeline in
    the web service. You call these actions to create a pipeline and
    define data sources, schedules, dependencies, and the transforms
    to be performed on the data.

    The second set of actions is used by a task runner application
    that calls the AWS Data Pipeline API to receive the next task
    ready for processing. The logic for performing the task, such as
    querying the data, running data analysis, or converting the data
    from one format to another, is contained within the task runner.
    The task runner performs the task assigned to it by the web
    service, reporting progress to the web service as it does so. When
    the task is done, the task runner reports the final success or
    failure of the task to the web service.

    AWS Data Pipeline provides an open-source implementation of a task
    runner called AWS Data Pipeline Task Runner. AWS Data Pipeline
    Task Runner provides logic for common data management scenarios,
    such as performing database queries and running data analysis
    using Amazon Elastic MapReduce (Amazon EMR). You can use AWS Data
    Pipeline Task Runner as your task runner, or you can write your
    own task runner to provide custom data management.

    The AWS Data Pipeline API uses the Signature Version 4 protocol
    for signing requests. For more information about how to sign a
    request with this protocol, see `Signature Version 4 Signing
    Process`_. In the code examples in this reference, the Signature
    Version 4 request parameters are represented as AuthParams.
    """
    APIVersion = "2012-10-29"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "datapipeline.us-east-1.amazonaws.com"
    ServiceName = "DataPipeline"
    TargetPrefix = "DataPipeline"
    ResponseError = JSONResponseError

    _faults = {
        "PipelineDeletedException": exceptions.PipelineDeletedException,
        "InvalidRequestException": exceptions.InvalidRequestException,
        "TaskNotFoundException": exceptions.TaskNotFoundException,
        "PipelineNotFoundException": exceptions.PipelineNotFoundException,
        "InternalServiceError": exceptions.InternalServiceError,
    }

    def __init__(self, **kwargs):
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        kwargs['host'] = region.endpoint
        super(DataPipelineConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        return ['hmac-v4']
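
    # Connection sketch (illustrative, not part of the API surface; assumes
    # boto credentials are already configured via the environment or a boto
    # config file):
    #
    #   from boto.datapipeline.layer1 import DataPipelineConnection
    #
    #   conn = DataPipelineConnection()  # defaults to us-east-1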

    def activate_pipeline(self, pipeline_id):
        """
        Validates a pipeline and initiates processing. If the pipeline
        does not pass validation, activation fails.

        Call this action to start processing pipeline tasks of a
        pipeline you've created using the CreatePipeline and
        PutPipelineDefinition actions. A pipeline cannot be modified
        after it has been successfully activated.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to activate.

        """
        params = {'pipelineId': pipeline_id, }
        return self.make_request(action='ActivatePipeline',
                                 body=json.dumps(params))
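
    # Illustrative call; the pipeline id shown is a made-up placeholder in
    # the format the service assigns:
    #
    #   conn.activate_pipeline('df-00627471SOVYZEXAMPLE')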

    def create_pipeline(self, name, unique_id, description=None):
        """
        Creates a new empty pipeline. When this action succeeds, you
        can then use the PutPipelineDefinition action to populate the
        pipeline.

        :type name: string
        :param name: The name of the new pipeline. You can use the same name
            for multiple pipelines associated with your AWS account, because
            AWS Data Pipeline assigns each new pipeline a unique pipeline
            identifier.

        :type unique_id: string
        :param unique_id: A unique identifier that you specify. This identifier
            is not the same as the pipeline identifier assigned by AWS Data
            Pipeline. You are responsible for defining the format and ensuring
            the uniqueness of this identifier. You use this parameter to ensure
            idempotency during repeated calls to CreatePipeline. For example,
            if the first call to CreatePipeline does not return a clear
            success, you can pass in the same unique identifier and pipeline
            name combination on a subsequent call to CreatePipeline.
            CreatePipeline ensures that if a pipeline already exists with the
            same name and unique identifier, a new pipeline will not be
            created. Instead, you'll receive the pipeline identifier from the
            previous attempt. The uniqueness of the name and unique identifier
            combination is scoped to the AWS account or IAM user credentials.

        :type description: string
        :param description: The description of the new pipeline.

        """
        params = {'name': name, 'uniqueId': unique_id, }
        if description is not None:
            params['description'] = description
        return self.make_request(action='CreatePipeline',
                                 body=json.dumps(params))
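
    # Idempotent-create sketch: retrying with the same name/unique_id pair
    # returns the identifier of the pipeline created on the first attempt
    # (the 'pipelineId' response key follows the AWS-documented response
    # shape; treat it as an assumption):
    #
    #   response = conn.create_pipeline('my-pipeline', 'my-token-001')
    #   pipeline_id = response['pipelineId']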

    def delete_pipeline(self, pipeline_id):
        """
        Permanently deletes a pipeline, its pipeline definition and
        its run history. You cannot query or restore a deleted
        pipeline. AWS Data Pipeline will attempt to cancel instances
        associated with the pipeline that are currently being
        processed by task runners. Deleting a pipeline cannot be
        undone.

        To temporarily pause a pipeline instead of deleting it, call
        SetStatus with the status set to Pause on individual
        components. Components that are paused by SetStatus can be
        resumed.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to be deleted.

        """
        params = {'pipelineId': pipeline_id, }
        return self.make_request(action='DeletePipeline',
                                 body=json.dumps(params))
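
    # Sketch of the pause alternative mentioned above (the object id is a
    # placeholder; 'PAUSE' is one of the component statuses documented on
    # set_status below) versus the irreversible delete:
    #
    #   conn.set_status(['MyActivity'], 'PAUSE', pipeline_id)   # reversible
    #   conn.delete_pipeline(pipeline_id)                       # permanent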

    def describe_objects(self, object_ids, pipeline_id, marker=None,
                         evaluate_expressions=None):
        """
        Returns the object definitions for a set of objects associated
        with the pipeline. Object definitions are composed of a set of
        fields that define the properties of the object.

        :type pipeline_id: string
        :param pipeline_id: Identifier of the pipeline that contains the object
            definitions.

        :type object_ids: list
        :param object_ids: Identifiers of the pipeline objects that contain the
            definitions to be described. You can pass as many as 25 identifiers
            in a single call to DescribeObjects.

        :type evaluate_expressions: boolean
        :param evaluate_expressions: Indicates whether any expressions in the
            object should be evaluated when the object descriptions are
            returned.

        :type marker: string
        :param marker: The starting point for the results to be returned. The
            first time you call DescribeObjects, this value should be empty. As
            long as the action returns `HasMoreResults` as `True`, you can call
            DescribeObjects again and pass the marker value from the response
            to retrieve the next set of results.

        """
        params = {
            'pipelineId': pipeline_id,
            'objectIds': object_ids,
        }
        if evaluate_expressions is not None:
            params['evaluateExpressions'] = evaluate_expressions
        if marker is not None:
            params['marker'] = marker
        return self.make_request(action='DescribeObjects',
                                 body=json.dumps(params))
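
    # Marker-based pagination sketch following the docstring's contract.
    # The 'pipelineObjects'/'hasMoreResults'/'marker' response keys are an
    # assumption about the JSON-1.1 response casing; verify against the
    # service documentation:
    #
    #   marker = None
    #   while True:
    #       page = conn.describe_objects(['Default'], pipeline_id,
    #                                    marker=marker)
    #       for obj in page.get('pipelineObjects', []):
    #           print(obj['id'], obj['name'])
    #       if not page.get('hasMoreResults'):
    #           break
    #       marker = page['marker']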

    def describe_pipelines(self, pipeline_ids):
        """
        Retrieve metadata about one or more pipelines. The information
        retrieved includes the name of the pipeline, the pipeline
        identifier, its current state, and the user account that owns
        the pipeline. Using account credentials, you can retrieve
        metadata about pipelines that you or your IAM users have
        created. If you are using an IAM user account, you can
        retrieve metadata about only those pipelines you have read
        permission for.

        To retrieve the full pipeline definition instead of metadata
        about the pipeline, call the GetPipelineDefinition action.

        :type pipeline_ids: list
        :param pipeline_ids: Identifiers of the pipelines to describe. You can
            pass as many as 25 identifiers in a single call to
            DescribePipelines. You can obtain pipeline identifiers by calling
            ListPipelines.

        """
        params = {'pipelineIds': pipeline_ids, }
        return self.make_request(action='DescribePipelines',
                                 body=json.dumps(params))

    def evaluate_expression(self, pipeline_id, expression, object_id):
        """
        Evaluates a string in the context of a specified object. A
        task runner can use this action to evaluate SQL queries stored
        in Amazon S3.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline.

        :type object_id: string
        :param object_id: The identifier of the object.

        :type expression: string
        :param expression: The expression to evaluate.

        """
        params = {
            'pipelineId': pipeline_id,
            'objectId': object_id,
            'expression': expression,
        }
        return self.make_request(action='EvaluateExpression',
                                 body=json.dumps(params))

    def get_pipeline_definition(self, pipeline_id, version=None):
        """
        Returns the definition of the specified pipeline. You can call
        GetPipelineDefinition to retrieve the pipeline definition you
        provided using PutPipelineDefinition.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline.

        :type version: string
        :param version: The version of the pipeline definition to retrieve.
            This parameter accepts the values `latest` (default) and `active`,
            where `latest` indicates the last definition saved to the pipeline
            and `active` indicates the last definition of the pipeline that was
            activated.

        """
        params = {'pipelineId': pipeline_id, }
        if version is not None:
            params['version'] = version
        return self.make_request(action='GetPipelineDefinition',
                                 body=json.dumps(params))
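
    # Sketch contrasting the two documented versions (the 'pipelineObjects'
    # response key is an assumption about the response shape):
    #
    #   saved = conn.get_pipeline_definition(pipeline_id)           # latest
    #   live = conn.get_pipeline_definition(pipeline_id, 'active')  # activated
    #   print(len(saved['pipelineObjects']), len(live['pipelineObjects']))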

    def list_pipelines(self, marker=None):
        """
        Returns a list of pipeline identifiers for all active
        pipelines. Identifiers are returned only for pipelines you
        have permission to access.

        :type marker: string
        :param marker: The starting point for the results to be returned. The
            first time you call ListPipelines, this value should be empty. As
            long as the action returns `HasMoreResults` as `True`, you can call
            ListPipelines again and pass the marker value from the response to
            retrieve the next set of results.

        """
        params = {}
        if marker is not None:
            params['marker'] = marker
        return self.make_request(action='ListPipelines',
                                 body=json.dumps(params))
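
    # Pagination sketch mirroring describe_objects above; 'pipelineIdList'
    # and the 'hasMoreResults'/'marker' casing are the same assumptions
    # about the JSON response shape:
    #
    #   marker = None
    #   ids = []
    #   while True:
    #       page = conn.list_pipelines(marker=marker)
    #       ids.extend(p['id'] for p in page.get('pipelineIdList', []))
    #       if not page.get('hasMoreResults'):
    #           break
    #       marker = page['marker']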

    def poll_for_task(self, worker_group, hostname=None,
                      instance_identity=None):
        """
        Task runners call this action to receive a task to perform
        from AWS Data Pipeline. The task runner specifies which tasks
        it can perform by setting a value for the workerGroup
        parameter of the PollForTask call. The task returned by
        PollForTask may come from any of the pipelines that match the
        workerGroup value passed in by the task runner and that were
        launched using the IAM user credentials specified by the task
        runner.

        If tasks are ready in the work queue, PollForTask returns a
        response immediately. If no tasks are available in the queue,
        PollForTask uses long-polling and holds on to a poll
        connection for up to 90 seconds, during which time the first
        newly scheduled task is handed to the task runner. To
        accommodate this, set the socket timeout in your task runner to
        90 seconds. The task runner should not call PollForTask again
        on the same `workerGroup` until it receives a response, and
        this may take up to 90 seconds.

        :type worker_group: string
        :param worker_group: Indicates the type of task the task runner is
            configured to accept and process. The worker group is set as a
            field on objects in the pipeline when they are created. You can
            only specify a single value for `workerGroup` in the call to
            PollForTask. There are no wildcard values permitted in
            `workerGroup`; the string must be an exact, case-sensitive match.

        :type hostname: string
        :param hostname: The public DNS name of the calling task runner.

        :type instance_identity: dict
        :param instance_identity: Identity information for the Amazon EC2
            instance that is hosting the task runner. You can get this value by
            calling the URI, `http://169.254.169.254/latest/meta-data/instance-
            id`, from the EC2 instance. For more information, go to `Instance
            Metadata`_ in the Amazon Elastic Compute Cloud User Guide. Passing
            in this value proves that your task runner is running on an EC2
            instance, and ensures the proper AWS Data Pipeline service charges
            are applied to your pipeline.

        """
        params = {'workerGroup': worker_group, }
        if hostname is not None:
            params['hostname'] = hostname
        if instance_identity is not None:
            params['instanceIdentity'] = instance_identity
        return self.make_request(action='PollForTask',
                                 body=json.dumps(params))
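
    # Skeleton of a task runner's poll loop, per the docstrings in this
    # class. The 'taskObject'/'taskId' response keys are assumptions about
    # the JSON shape, and do_work is a hypothetical helper:
    #
    #   while True:
    #       response = conn.poll_for_task('my-worker-group')
    #       task = response.get('taskObject')
    #       if task is None:
    #           continue  # long poll timed out with no work available
    #       task_id = task['taskId']
    #       conn.report_task_progress(task_id)  # acknowledge within 2 minutes
    #       try:
    #           do_work(task)
    #           conn.set_task_status(task_id, 'FINISHED')
    #       except Exception as e:
    #           conn.set_task_status(task_id, 'FAILED', error_message=str(e))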

    def put_pipeline_definition(self, pipeline_objects, pipeline_id):
        """
        Adds tasks, schedules, and preconditions that control the
        behavior of the pipeline. You can use PutPipelineDefinition to
        populate a new pipeline or to update an existing pipeline that
        has not yet been activated.

        PutPipelineDefinition also validates the configuration as it
        adds it to the pipeline. Changes to the pipeline are saved
        unless one of the following three validation errors exists in
        the pipeline.

        #. An object is missing a name or identifier field.
        #. A string or reference field is empty.
        #. The number of objects in the pipeline exceeds the maximum
           allowed objects.

        Pipeline object definitions are passed to the
        PutPipelineDefinition action and returned by the
        GetPipelineDefinition action.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to be configured.

        :type pipeline_objects: list
        :param pipeline_objects: The objects that define the pipeline. These
            will overwrite the existing pipeline definition.

        """
        params = {
            'pipelineId': pipeline_id,
            'pipelineObjects': pipeline_objects,
        }
        return self.make_request(action='PutPipelineDefinition',
                                 body=json.dumps(params))
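
    # Minimal definition sketch. Each pipeline object is a dict of
    # 'id'/'name' plus a 'fields' list of key/value pairs, which this method
    # forwards verbatim as JSON; the particular field names shown ('type',
    # 'period', 'startDateTime') follow the AWS-documented Schedule object
    # but are illustrative assumptions here:
    #
    #   objects = [{
    #       'id': 'DefaultSchedule',
    #       'name': 'RunOnce',
    #       'fields': [
    #           {'key': 'type', 'stringValue': 'Schedule'},
    #           {'key': 'period', 'stringValue': '1 day'},
    #           {'key': 'startDateTime', 'stringValue': '2013-01-01T00:00:00'},
    #       ],
    #   }]
    #   conn.validate_pipeline_definition(objects, pipeline_id)
    #   conn.put_pipeline_definition(objects, pipeline_id)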

    def query_objects(self, pipeline_id, sphere, marker=None, query=None,
                      limit=None):
        """
        Queries a pipeline for the names of objects that match a
        specified set of conditions.

        The objects returned by QueryObjects are paginated and then
        filtered by the value you set for query. This means the action
        may return an empty result set with a value set for marker. If
        `HasMoreResults` is set to `True`, you should continue to call
        QueryObjects, passing in the returned value for marker, until
        `HasMoreResults` returns `False`.

        :type pipeline_id: string
        :param pipeline_id: Identifier of the pipeline to be queried for object
            names.

        :type query: dict
        :param query: Query that defines the objects to be returned. The Query
            object can contain a maximum of ten selectors. The conditions in
            the query are limited to top-level String fields in the object.
            These filters can be applied to components, instances, and
            attempts.

        :type sphere: string
        :param sphere: Specifies whether the query applies to components or
            instances. Allowable values: `COMPONENT`, `INSTANCE`, `ATTEMPT`.

        :type marker: string
        :param marker: The starting point for the results to be returned. The
            first time you call QueryObjects, this value should be empty. As
            long as the action returns `HasMoreResults` as `True`, you can call
            QueryObjects again and pass the marker value from the response to
            retrieve the next set of results.

        :type limit: integer
        :param limit: Specifies the maximum number of object names that
            QueryObjects will return in a single call. The default value is
            100.

        """
        params = {'pipelineId': pipeline_id, 'sphere': sphere, }
        if query is not None:
            params['query'] = query
        if marker is not None:
            params['marker'] = marker
        if limit is not None:
            params['limit'] = limit
        return self.make_request(action='QueryObjects',
                                 body=json.dumps(params))
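
    # Selector sketch: find failed instances. The nested dict mirrors the
    # AWS Query/Selector JSON shape ('fieldName', 'operator' with
    # 'type'/'values'); treat both it and the 'ids' response key as
    # assumptions to check against the service documentation:
    #
    #   query = {'selectors': [{
    #       'fieldName': '@status',
    #       'operator': {'type': 'EQ', 'values': ['FAILED']},
    #   }]}
    #   page = conn.query_objects(pipeline_id, 'INSTANCE', query=query)
    #   failed_ids = page.get('ids', [])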

    def report_task_progress(self, task_id):
        """
        Updates the AWS Data Pipeline service on the progress of the
        calling task runner. When the task runner is assigned a task,
        it should call ReportTaskProgress to acknowledge that it has
        the task within 2 minutes. If the web service does not receive
        this acknowledgement within the 2 minute window, it will
        assign the task in a subsequent PollForTask call. After this
        initial acknowledgement, the task runner only needs to report
        progress every 15 minutes to maintain its ownership of the
        task. You can change this reporting time from 15 minutes by
        specifying a `reportProgressTimeout` field in your pipeline.
        If a task runner does not report its status after 5 minutes,
        AWS Data Pipeline will assume that the task runner is unable
        to process the task and will reassign the task in a subsequent
        response to PollForTask. Task runners should call
        ReportTaskProgress every 60 seconds.

        :type task_id: string
        :param task_id: Identifier of the task assigned to the task runner.
            This value is provided in the TaskObject that the service returns
            with the response for the PollForTask action.

        """
        params = {'taskId': task_id, }
        return self.make_request(action='ReportTaskProgress',
                                 body=json.dumps(params))

    def report_task_runner_heartbeat(self, taskrunner_id, worker_group=None,
                                     hostname=None):
        """
        Task runners call ReportTaskRunnerHeartbeat every 15 minutes
        to indicate that they are operational. In the case of AWS Data
        Pipeline Task Runner launched on a resource managed by AWS
        Data Pipeline, the web service can use this call to detect
        when the task runner application has failed and launch a new
        instance.

        :type taskrunner_id: string
        :param taskrunner_id: The identifier of the task runner. This value
            should be unique across your AWS account. In the case of AWS Data
            Pipeline Task Runner launched on a resource managed by AWS Data
            Pipeline, the web service provides a unique identifier when it
            launches the application. If you have written a custom task runner,
            you should assign a unique identifier for the task runner.

        :type worker_group: string
        :param worker_group: Indicates the type of task the task runner is
            configured to accept and process. The worker group is set as a
            field on objects in the pipeline when they are created. You can
            only specify a single value for `workerGroup` in the call to
            ReportTaskRunnerHeartbeat. There are no wildcard values permitted
            in `workerGroup`; the string must be an exact, case-sensitive
            match.

        :type hostname: string
        :param hostname: The public DNS name of the calling task runner.

        """
        params = {'taskrunnerId': taskrunner_id, }
        if worker_group is not None:
            params['workerGroup'] = worker_group
        if hostname is not None:
            params['hostname'] = hostname
        return self.make_request(action='ReportTaskRunnerHeartbeat',
                                 body=json.dumps(params))

    def set_status(self, object_ids, status, pipeline_id):
        """
        Requests that the status of an array of physical or logical
        pipeline objects be updated in the pipeline. This update may
        not occur immediately, but is eventually consistent. The
        status that can be set depends on the type of object.

        :type pipeline_id: string
        :param pipeline_id: Identifies the pipeline that contains the objects.

        :type object_ids: list
        :param object_ids: Identifies an array of objects. The corresponding
            objects can be either physical or components, but not a mix of both
            types.

        :type status: string
        :param status: Specifies the status to be set on all the objects in
            `objectIds`. For components, this can be either `PAUSE` or
            `RESUME`. For instances, this can be either `CANCEL`, `RERUN`, or
            `MARK_FINISHED`.

        """
        params = {
            'pipelineId': pipeline_id,
            'objectIds': object_ids,
            'status': status,
        }
        return self.make_request(action='SetStatus',
                                 body=json.dumps(params))
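
    # Sketch of rerunning the failed instances found via the query_objects
    # example above (statuses per the docstring; ids are placeholders):
    #
    #   conn.set_status(failed_ids, 'RERUN', pipeline_id)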

    def set_task_status(self, task_id, task_status, error_id=None,
                        error_message=None, error_stack_trace=None):
        """
        Notifies AWS Data Pipeline that a task is completed and
        provides information about the final status. The task runner
        calls this action regardless of whether the task was
        successful. The task runner does not need to call SetTaskStatus
        for tasks that are canceled by the web service during a call
        to ReportTaskProgress.

        :type task_id: string
        :param task_id: Identifies the task assigned to the task runner. This
            value is set in the TaskObject that is returned by the PollForTask
            action.

        :type task_status: string
        :param task_status: If `FINISHED`, the task successfully completed. If
            `FAILED`, the task ended unsuccessfully. The `FALSE` value is used
            by preconditions.

        :type error_id: string
        :param error_id: If an error occurred during the task, this value
            specifies an id value that represents the error. This value is set
            on the physical attempt object. It is used to display error
            information to the user. It should not start with the string
            "Service_", which is reserved by the system.

        :type error_message: string
        :param error_message: If an error occurred during the task, this value
            specifies a text description of the error. This value is set on the
            physical attempt object. It is used to display error information to
            the user. The web service does not parse this value.

        :type error_stack_trace: string
        :param error_stack_trace: If an error occurred during the task, this
            value specifies the stack trace associated with the error. This
            value is set on the physical attempt object. It is used to display
            error information to the user. The web service does not parse this
            value.

        """
        params = {'taskId': task_id, 'taskStatus': task_status, }
        if error_id is not None:
            params['errorId'] = error_id
        if error_message is not None:
            params['errorMessage'] = error_message
        if error_stack_trace is not None:
            params['errorStackTrace'] = error_stack_trace
        return self.make_request(action='SetTaskStatus',
                                 body=json.dumps(params))
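
    # Failure-reporting sketch (the error id and message are illustrative
    # values chosen by the task runner, not service-defined constants):
    #
    #   import traceback
    #   conn.set_task_status(task_id, 'FAILED',
    #                        error_id='MyRunner_CopyFailed',
    #                        error_message='source file missing',
    #                        error_stack_trace=traceback.format_exc())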

    def validate_pipeline_definition(self, pipeline_objects, pipeline_id):
        """
        Tests the pipeline definition with a set of validation checks
        to ensure that it is well formed and can run without error.

        :type pipeline_id: string
        :param pipeline_id: Identifies the pipeline whose definition is to be
            validated.

        :type pipeline_objects: list
        :param pipeline_objects: A list of objects that define the pipeline
            changes to validate against the pipeline.

        """
        params = {
            'pipelineId': pipeline_id,
            'pipelineObjects': pipeline_objects,
        }
        return self.make_request(action='ValidatePipelineDefinition',
                                 body=json.dumps(params))

    def make_request(self, action, body):
        # All Data Pipeline actions share one JSON-over-POST transport: the
        # action is selected via the X-Amz-Target header and the parameters
        # travel as a JSON document in the request body.
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response_body)
        if response.status == 200:
            if response_body:
                return json.loads(response_body)
        else:
            # Map the service's '__type' fault name onto a specific exception
            # class, falling back to the generic JSONResponseError.
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)