env/lib/python3.9/site-packages/boto/kinesis/layer1.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import base64
import boto

from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.kinesis import exceptions
from boto.compat import json
from boto.compat import six


class KinesisConnection(AWSQueryConnection):
    """
    Amazon Kinesis Service API Reference

    Amazon Kinesis is a managed service that scales elastically for
    real-time processing of streaming big data.
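
    Example (a minimal sketch; assumes AWS credentials are available to
    boto via the environment or its config files)::

        from boto.kinesis.layer1 import KinesisConnection

        conn = KinesisConnection()
        print(conn.list_streams())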
39 """
40 APIVersion = "2013-12-02"
41 DefaultRegionName = "us-east-1"
42 DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
43 ServiceName = "Kinesis"
44 TargetPrefix = "Kinesis_20131202"
45 ResponseError = JSONResponseError
46
47 _faults = {
48 "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
49 "LimitExceededException": exceptions.LimitExceededException,
50 "ExpiredIteratorException": exceptions.ExpiredIteratorException,
51 "ResourceInUseException": exceptions.ResourceInUseException,
52 "ResourceNotFoundException": exceptions.ResourceNotFoundException,
53 "InvalidArgumentException": exceptions.InvalidArgumentException,
54 "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
55 }
56
57
    def __init__(self, **kwargs):
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        if 'host' not in kwargs:
            kwargs['host'] = region.endpoint
        super(KinesisConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        return ['hmac-v4']

    def add_tags_to_stream(self, stream_name, tags):
        """
        Adds or updates tags for the specified Amazon Kinesis stream.
        Each stream can have up to 10 tags.

        If tags have already been assigned to the stream,
        `AddTagsToStream` overwrites any existing tags that correspond
        to the specified tag keys.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tags: map
        :param tags: The set of key-value pairs to use to create the tags.

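        Example (a minimal sketch; the stream name and tag values are
        hypothetical)::

            conn = KinesisConnection()
            conn.add_tags_to_stream('my-stream',
                                    {'environment': 'test', 'team': 'data'})
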
86 """
87 params = {'StreamName': stream_name, 'Tags': tags, }
88 return self.make_request(action='AddTagsToStream',
89 body=json.dumps(params))
90
    def create_stream(self, stream_name, shard_count):
        """
        Creates an Amazon Kinesis stream. A stream captures and
        transports data records that are continuously emitted from
        different data sources or producers. Scale-out within an
        Amazon Kinesis stream is explicitly supported by means of
        shards, which are uniquely identified groups of data records
        in an Amazon Kinesis stream.

        You specify and control the number of shards that a stream is
        composed of. Each open shard can support up to 5 read
        transactions per second, up to a maximum total of 2 MB of data
        read per second. Each shard can support up to 1000 records
        written per second, up to a maximum total of 1 MB data written
        per second. You can add shards to a stream if the amount of
        data input increases, and you can remove shards if the amount
        of data input decreases.

        The stream name identifies the stream. The name is scoped to
        the AWS account used by the application. It is also scoped by
        region. That is, two streams in two different accounts can
        have the same name, and two streams in the same account, but
        in two different regions, can have the same name.

        `CreateStream` is an asynchronous operation. Upon receiving a
        `CreateStream` request, Amazon Kinesis immediately returns and
        sets the stream status to `CREATING`. After the stream is
        created, Amazon Kinesis sets the stream status to `ACTIVE`.
        You should perform read and write operations only on an
        `ACTIVE` stream.

        You receive a `LimitExceededException` when making a
        `CreateStream` request if you try to do one of the following:

        + Have more than five streams in the `CREATING` state at any
          point in time.
        + Create more shards than are authorized for your account.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        You can use `DescribeStream` to check the stream status, which
        is returned in `StreamStatus`.

        `CreateStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: A name to identify the stream. The stream name is
            scoped to the AWS account used by the application that creates the
            stream. It is also scoped by region. That is, two streams in two
            different AWS accounts can have the same name, and two streams in
            the same AWS account, but in two different regions, can have the
            same name.

        :type shard_count: integer
        :param shard_count: The number of shards that the stream will use. The
            throughput of the stream is a function of the number of shards;
            more shards are required for greater provisioned throughput.
            **Note:** The default limit for an AWS account is 10 shards per
            stream. If you need to create a stream with more than 10 shards,
            `contact AWS Support`_ to increase the limit on your account.

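        Example (a minimal sketch; `'my-stream'` is a hypothetical stream
        name)::

            conn = KinesisConnection()
            conn.create_stream('my-stream', shard_count=2)
            # The stream starts in CREATING; poll DescribeStream until
            # StreamStatus is ACTIVE before reading or writing.
            desc = conn.describe_stream('my-stream')['StreamDescription']
            print(desc['StreamStatus'])
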
157 """
158 params = {
159 'StreamName': stream_name,
160 'ShardCount': shard_count,
161 }
162 return self.make_request(action='CreateStream',
163 body=json.dumps(params))
164
    def delete_stream(self, stream_name):
        """
        Deletes a stream and all its shards and data. You must shut
        down any applications that are operating on the stream before
        you delete the stream. If an application attempts to operate
        on a deleted stream, it will receive the exception
        `ResourceNotFoundException`.

        If the stream is in the `ACTIVE` state, you can delete it.
        After a `DeleteStream` request, the specified stream is in the
        `DELETING` state until Amazon Kinesis completes the deletion.

        **Note:** Amazon Kinesis might continue to accept data read
        and write operations, such as PutRecord, PutRecords, and
        GetRecords, on a stream in the `DELETING` state until the
        stream deletion is complete.

        When you delete a stream, any shards in that stream are also
        deleted, and any tags are dissociated from the stream.

        You can use the DescribeStream operation to check the state of
        the stream, which is returned in `StreamStatus`.

        `DeleteStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to delete.

        """
        params = {'StreamName': stream_name, }
        return self.make_request(action='DeleteStream',
                                 body=json.dumps(params))

    def describe_stream(self, stream_name, limit=None,
                        exclusive_start_shard_id=None):
        """
        Describes the specified stream.

        The information about the stream includes its current status,
        its Amazon Resource Name (ARN), and an array of shard objects.
        For each shard object, there is information about the hash key
        and sequence number ranges that the shard spans, and the IDs
        of any earlier shards that played a role in creating the
        shard. A sequence number is the identifier associated with
        every record ingested in the Amazon Kinesis stream. The
        sequence number is assigned when a record is put into the
        stream.

        You can limit the number of returned shards using the `Limit`
        parameter. The number of shards in a stream may be too large
        to return from a single call to `DescribeStream`. You can
        detect this by using the `HasMoreShards` flag in the returned
        output. `HasMoreShards` is set to `True` when there is more
        data available.

        `DescribeStream` is a paginated operation. If there are more
        shards available, you can request them using the shard ID of
        the last shard returned. Specify this ID in the
        `ExclusiveStartShardId` parameter in a subsequent request to
        `DescribeStream`.

        `DescribeStream` has a limit of 10 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to describe.

        :type limit: integer
        :param limit: The maximum number of shards to return.

        :type exclusive_start_shard_id: string
        :param exclusive_start_shard_id: The shard ID of the shard to start
            with.

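        Example (a minimal sketch of the pagination described above;
        `'my-stream'` is a hypothetical stream name)::

            conn = KinesisConnection()
            shards, last_shard_id = [], None
            while True:
                desc = conn.describe_stream(
                    'my-stream',
                    exclusive_start_shard_id=last_shard_id
                )['StreamDescription']
                shards.extend(desc['Shards'])
                if not desc['HasMoreShards']:
                    break
                last_shard_id = shards[-1]['ShardId']
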
240 """
241 params = {'StreamName': stream_name, }
242 if limit is not None:
243 params['Limit'] = limit
244 if exclusive_start_shard_id is not None:
245 params['ExclusiveStartShardId'] = exclusive_start_shard_id
246 return self.make_request(action='DescribeStream',
247 body=json.dumps(params))
248
    def get_records(self, shard_iterator, limit=None, b64_decode=True):
        """
        Gets data records from a shard.

        Specify a shard iterator using the `ShardIterator` parameter.
        The shard iterator specifies the position in the shard from
        which you want to start reading data records sequentially. If
        there are no records available in the portion of the shard
        that the iterator points to, `GetRecords` returns an empty
        list. Note that it might take multiple calls to get to a
        portion of the shard that contains records.

        You can scale by provisioning multiple shards. Your
        application should have one thread per shard, each reading
        continuously from its stream. To read from a stream
        continually, call `GetRecords` in a loop. Use GetShardIterator
        to get the shard iterator to specify in the first `GetRecords`
        call. `GetRecords` returns a new shard iterator in
        `NextShardIterator`. Specify the shard iterator returned in
        `NextShardIterator` in subsequent calls to `GetRecords`. Note
        that if the shard has been closed, the shard iterator can't
        return more data and `GetRecords` returns `null` in
        `NextShardIterator`. You can terminate the loop when the shard
        is closed, or when the shard iterator reaches the record with
        the sequence number or other attribute that marks it as the
        last record to process.

        Each data record can be up to 50 KB in size, and each shard
        can read up to 2 MB per second. You can ensure that your calls
        don't exceed the maximum supported size or throughput by using
        the `Limit` parameter to specify the maximum number of records
        that `GetRecords` can return. Consider your average record
        size when determining this limit. For example, if your average
        record size is 40 KB, you can limit the data returned to about
        1 MB per call by specifying 25 as the limit.

        The size of the data returned by `GetRecords` will vary
        depending on the utilization of the shard. The maximum size of
        data that `GetRecords` can return is 10 MB. If a call returns
        10 MB of data, subsequent calls made within the next 5 seconds
        throw `ProvisionedThroughputExceededException`. If there is
        insufficient provisioned throughput on the shard, subsequent
        calls made within the next 1 second throw
        `ProvisionedThroughputExceededException`. Note that
        `GetRecords` won't return any data when it throws an
        exception. For this reason, we recommend that you wait one
        second between calls to `GetRecords`; however, it's possible
        that the application will get exceptions for longer than 1
        second.

        To detect whether the application is falling behind in
        processing, add a timestamp to your records and note how long
        it takes to process them. You can also monitor how much data
        is in a stream using the CloudWatch metrics for write
        operations (`PutRecord` and `PutRecords`). For more
        information, see `Monitoring Amazon Kinesis with Amazon
        CloudWatch`_ in the Amazon Kinesis Developer Guide.

        :type shard_iterator: string
        :param shard_iterator: The position in the shard from which you want
            to start sequentially reading data records. A shard iterator
            specifies this position using the sequence number of a data
            record in the shard.

        :type limit: integer
        :param limit: The maximum number of records to return. Specify a value
            of up to 10,000. If you specify a value that is greater than
            10,000, `GetRecords` throws `InvalidArgumentException`.

        :type b64_decode: boolean
        :param b64_decode: Decode the Base64-encoded ``Data`` field of
            records.

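        Example (a minimal sketch of the read loop described above; the
        stream name and shard ID are hypothetical, and `process` is a
        stand-in for application code)::

            import time

            conn = KinesisConnection()
            iterator = conn.get_shard_iterator(
                'my-stream', 'shardId-000000000000',
                'TRIM_HORIZON')['ShardIterator']
            while iterator:
                result = conn.get_records(iterator, limit=25)
                for record in result['Records']:
                    process(record['Data'])  # decoded when b64_decode=True
                iterator = result['NextShardIterator']
                time.sleep(1)  # stay under the per-shard read limits
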
321 """
322 params = {'ShardIterator': shard_iterator, }
323 if limit is not None:
324 params['Limit'] = limit
325
326 response = self.make_request(action='GetRecords',
327 body=json.dumps(params))
328
329 # Base64 decode the data
330 if b64_decode:
331 for record in response.get('Records', []):
332 record['Data'] = base64.b64decode(
333 record['Data'].encode('utf-8')).decode('utf-8')
334
335 return response
336
    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
                           starting_sequence_number=None):
        """
        Gets a shard iterator. A shard iterator expires five minutes
        after it is returned to the requester.

        A shard iterator specifies the position in the shard from
        which to start reading data records sequentially. A shard
        iterator specifies this position using the sequence number of
        a data record in a shard. A sequence number is the identifier
        associated with every record ingested in the Amazon Kinesis
        stream. The sequence number is assigned when a record is put
        into the stream.

        You must specify the shard iterator type. For example, you can
        set the `ShardIteratorType` parameter to read exactly from the
        position denoted by a specific sequence number by using the
        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
        iterator type, using sequence numbers returned by earlier
        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
        You can specify the shard iterator type `TRIM_HORIZON` in the
        request to cause `ShardIterator` to point to the last
        untrimmed record in the shard in the system, which is the
        oldest data record in the shard. Or you can point to just
        after the most recent record in the shard, by using the shard
        iterator type `LATEST`, so that you always read the most
        recent data in the shard.

        When you repeatedly read from an Amazon Kinesis stream, use a
        GetShardIterator request to get the first shard iterator to
        use in your first `GetRecords` request, and then use the shard
        iterator returned by the `GetRecords` request in
        `NextShardIterator` for subsequent reads. A new shard iterator
        is returned by every `GetRecords` request in
        `NextShardIterator`, which you use in the `ShardIterator`
        parameter of the next `GetRecords` request.

        If a `GetShardIterator` request is made too often, you receive
        a `ProvisionedThroughputExceededException`. For more
        information about throughput limits, see GetRecords.

        If the shard is closed, the iterator can't return more data,
        and `GetShardIterator` returns `null` for its `ShardIterator`.
        A shard can be closed using SplitShard or MergeShards.

        `GetShardIterator` has a limit of 5 transactions per second
        per account per open shard.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type shard_id: string
        :param shard_id: The shard ID of the shard to get the iterator for.

        :type shard_iterator_type: string
        :param shard_iterator_type:
            Determines how the shard iterator is used to start reading data
            records from the shard.

            The following are the valid shard iterator types:

            + AT_SEQUENCE_NUMBER - Start reading exactly from the position
              denoted by a specific sequence number.
            + AFTER_SEQUENCE_NUMBER - Start reading right after the position
              denoted by a specific sequence number.
            + TRIM_HORIZON - Start reading at the last untrimmed record in
              the shard in the system, which is the oldest data record in
              the shard.
            + LATEST - Start reading just after the most recent record in
              the shard, so that you always read the most recent data in
              the shard.

        :type starting_sequence_number: string
        :param starting_sequence_number: The sequence number of the data
            record in the shard from which to start reading.

        :returns: A dictionary containing a `ShardIterator` key whose value
            is the shard-iterator object.
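
        Example (a minimal sketch; the stream name and shard ID are
        hypothetical)::

            conn = KinesisConnection()
            # Read from the oldest available record in the shard.
            result = conn.get_shard_iterator(
                'my-stream', 'shardId-000000000000', 'TRIM_HORIZON')
            iterator = result['ShardIterator']
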
416 """
417
418 params = {
419 'StreamName': stream_name,
420 'ShardId': shard_id,
421 'ShardIteratorType': shard_iterator_type,
422 }
423 if starting_sequence_number is not None:
424 params['StartingSequenceNumber'] = starting_sequence_number
425 return self.make_request(action='GetShardIterator',
426 body=json.dumps(params))
427
    def list_streams(self, limit=None, exclusive_start_stream_name=None):
        """
        Lists your streams.

        The number of streams may be too large to return from a single
        call to `ListStreams`. You can limit the number of returned
        streams using the `Limit` parameter. If you do not specify a
        value for the `Limit` parameter, Amazon Kinesis uses the
        default limit, which is currently 10.

        You can detect if there are more streams available to list by
        using the `HasMoreStreams` flag from the returned output. If
        there are more streams available, you can request more streams
        by using the name of the last stream returned by the
        `ListStreams` request in the `ExclusiveStartStreamName`
        parameter in a subsequent request to `ListStreams`. The group
        of stream names returned by the subsequent request is then
        added to the list. You can continue this process until all the
        stream names have been collected in the list.

        `ListStreams` has a limit of 5 transactions per second per
        account.

        :type limit: integer
        :param limit: The maximum number of streams to list.

        :type exclusive_start_stream_name: string
        :param exclusive_start_stream_name: The name of the stream to start
            the list with.

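        Example (a minimal sketch of the pagination described above)::

            conn = KinesisConnection()
            names, last_name = [], None
            while True:
                result = conn.list_streams(
                    exclusive_start_stream_name=last_name)
                names.extend(result['StreamNames'])
                if not result['HasMoreStreams']:
                    break
                last_name = names[-1]
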
458 """
459 params = {}
460 if limit is not None:
461 params['Limit'] = limit
462 if exclusive_start_stream_name is not None:
463 params['ExclusiveStartStreamName'] = exclusive_start_stream_name
464 return self.make_request(action='ListStreams',
465 body=json.dumps(params))
466
    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
                             limit=None):
        """
        Lists the tags for the specified Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type exclusive_start_tag_key: string
        :param exclusive_start_tag_key: The key to use as the starting point
            for the list of tags. If this parameter is set,
            `ListTagsForStream` gets all tags that occur after
            `ExclusiveStartTagKey`.

        :type limit: integer
        :param limit: The number of tags to return. If this number is less
            than the total number of tags associated with the stream,
            `HasMoreTags` is set to `True`. To list additional tags, set
            `ExclusiveStartTagKey` to the last key in the response.

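        Example (a minimal sketch; `'my-stream'` is a hypothetical stream
        name, and the tags are assumed to fit in a single response)::

            conn = KinesisConnection()
            result = conn.list_tags_for_stream('my-stream')
            tags = dict((t['Key'], t['Value']) for t in result['Tags'])
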
486 """
487 params = {'StreamName': stream_name, }
488 if exclusive_start_tag_key is not None:
489 params['ExclusiveStartTagKey'] = exclusive_start_tag_key
490 if limit is not None:
491 params['Limit'] = limit
492 return self.make_request(action='ListTagsForStream',
493 body=json.dumps(params))
494
    def merge_shards(self, stream_name, shard_to_merge,
                     adjacent_shard_to_merge):
        """
        Merges two adjacent shards in a stream and combines them into
        a single shard to reduce the stream's capacity to ingest and
        transport data. Two shards are considered adjacent if the
        union of the hash key ranges for the two shards forms a
        contiguous set with no gaps. For example, if you have two
        shards, one with a hash key range of 276...381 and the other
        with a hash key range of 382...454, then you could merge these
        two shards into a single shard that would have a hash key
        range of 276...454. After the merge, the single child shard
        receives data for all hash key values covered by the two
        parent shards.

        `MergeShards` is called when there is a need to reduce the
        overall capacity of a stream because of excess capacity that
        is not being used. You must specify the shard to be merged and
        the adjacent shard for a stream. For more information about
        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
        Developer Guide.

        If the stream is in the `ACTIVE` state, you can call
        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
        or `DELETING` state, `MergeShards` returns a
        `ResourceInUseException`. If the specified stream does not
        exist, `MergeShards` returns a `ResourceNotFoundException`.

        You can use DescribeStream to check the state of the stream,
        which is returned in `StreamStatus`.

        `MergeShards` is an asynchronous operation. Upon receiving a
        `MergeShards` request, Amazon Kinesis immediately returns a
        response and sets the `StreamStatus` to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the `StreamStatus`
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You use DescribeStream to determine the shard IDs that are
        specified in the `MergeShards` request.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
        will receive a `LimitExceededException`.

        `MergeShards` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the merge.

        :type shard_to_merge: string
        :param shard_to_merge: The shard ID of the shard to combine with the
            adjacent shard for the merge.

        :type adjacent_shard_to_merge: string
        :param adjacent_shard_to_merge: The shard ID of the adjacent shard
            for the merge.

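        Example (a minimal sketch; the stream name and shard IDs are
        hypothetical, and the two shards are assumed to be adjacent)::

            conn = KinesisConnection()
            conn.merge_shards('my-stream',
                              'shardId-000000000000',
                              'shardId-000000000001')
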
554 """
555 params = {
556 'StreamName': stream_name,
557 'ShardToMerge': shard_to_merge,
558 'AdjacentShardToMerge': adjacent_shard_to_merge,
559 }
560 return self.make_request(action='MergeShards',
561 body=json.dumps(params))
562
    def put_record(self, stream_name, data, partition_key,
                   explicit_hash_key=None,
                   sequence_number_for_ordering=None,
                   exclusive_minimum_sequence_number=None,
                   b64_encode=True):
        """
        This operation puts a data record into an Amazon Kinesis
        stream from a producer. This operation must be called to send
        data from the producer into the Amazon Kinesis stream for
        real-time ingestion and subsequent processing. The `PutRecord`
        operation requires the name of the stream that captures,
        stores, and transports the data; a partition key; and the data
        blob itself. The data blob could be a segment from a log file,
        geographic/location data, website clickstream data, or any
        other data type.

        The partition key is used to distribute data across shards.
        Amazon Kinesis segregates the data records that belong to a
        data stream into multiple shards, using the partition key
        associated with each data record to determine which shard a
        given data record belongs to.

        Partition keys are Unicode strings, with a maximum length
        limit of 256 bytes. An MD5 hash function is used to map
        partition keys to 128-bit integer values and to map associated
        data records to shards using the hash key ranges of the
        shards. You can override hashing the partition key to
        determine the shard by explicitly specifying a hash value
        using the `ExplicitHashKey` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        `PutRecord` returns the shard ID of where the data record was
        placed and the sequence number that was assigned to the data
        record.

        Sequence numbers generally increase over time. To guarantee
        strictly increasing ordering, use the
        `SequenceNumberForOrdering` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        If a `PutRecord` request cannot be processed because of
        insufficient provisioned throughput on the shard involved in
        the request, `PutRecord` throws
        `ProvisionedThroughputExceededException`.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream to put the data record
            into.

        :type data: blob
        :param data: The data blob to put into the record, which is
            Base64-encoded when the blob is serialized. The maximum size of
            the data blob (the payload after Base64-decoding) is 50
            kilobytes (KB). Set `b64_encode` to ``False`` to disable
            automatic Base64 encoding.

        :type partition_key: string
        :param partition_key: Determines which shard in the stream the data
            record is assigned to. Partition keys are Unicode strings with a
            maximum length limit of 256 bytes. Amazon Kinesis uses the
            partition key as input to a hash function that maps the partition
            key and associated data to a specific shard. Specifically, an MD5
            hash function is used to map partition keys to 128-bit integer
            values and to map associated data records to shards. As a result
            of this hashing mechanism, all data records with the same
            partition key will map to the same shard within the stream.

        :type explicit_hash_key: string
        :param explicit_hash_key: The hash value used to explicitly determine
            the shard the data record is assigned to by overriding the
            partition key hash.

        :type sequence_number_for_ordering: string
        :param sequence_number_for_ordering: Guarantees strictly increasing
            sequence numbers, for puts from the same client and to the same
            partition key. Usage: set the `SequenceNumberForOrdering` of
            record n to the sequence number of record n-1 (as returned in
            the PutRecordResult when putting record n-1). If this parameter
            is not set, records will be coarsely ordered based on arrival
            time.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded to prevent double
            encoding.

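        Example (a minimal sketch; the stream name, payload, and partition
        key are hypothetical)::

            conn = KinesisConnection()
            result = conn.put_record('my-stream', 'hello world',
                                     partition_key='user-42')
            print(result['ShardId'], result['SequenceNumber'])
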
649 """
650 params = {
651 'StreamName': stream_name,
652 'Data': data,
653 'PartitionKey': partition_key,
654 }
655 if explicit_hash_key is not None:
656 params['ExplicitHashKey'] = explicit_hash_key
657 if sequence_number_for_ordering is not None:
658 params['SequenceNumberForOrdering'] = sequence_number_for_ordering
659 if b64_encode:
660 if not isinstance(params['Data'], six.binary_type):
661 params['Data'] = params['Data'].encode('utf-8')
662 params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
663 return self.make_request(action='PutRecord',
664 body=json.dumps(params))
665
    def put_records(self, records, stream_name, b64_encode=True):
        """
        Puts (writes) multiple data records from a producer into an
        Amazon Kinesis stream in a single call (also referred to as a
        `PutRecords` request). Use this operation to send data from a
        data producer into the Amazon Kinesis stream for real-time
        ingestion and processing. Each shard can support up to 1000
        records written per second, up to a maximum total of 1 MB data
        written per second.

        You must specify the name of the stream that captures, stores,
        and transports the data; and an array of request `Records`,
        with each record in the array requiring a partition key and
        data blob.

        The data blob can be any type of data; for example, a segment
        from a log file, geographic/location data, website clickstream
        data, and so on.

        The partition key is used by Amazon Kinesis as input to a hash
        function that maps the partition key and associated data to a
        specific shard. An MD5 hash function is used to map partition
        keys to 128-bit integer values and to map associated data
        records to shards. As a result of this hashing mechanism, all
        data records with the same partition key map to the same shard
        within the stream. For more information, see `Partition Key`_
        in the Amazon Kinesis Developer Guide.

        Each record in the `Records` array may include an optional
        parameter, `ExplicitHashKey`, which overrides the partition
        key to shard mapping. This parameter allows a data producer to
        determine explicitly the shard where the record is stored. For
        more information, see `Adding Multiple Records with
        PutRecords`_ in the Amazon Kinesis Developer Guide.

        The `PutRecords` response includes an array of response
        `Records`. Each record in the response array directly
        correlates with a record in the request array using natural
        ordering, from the top to the bottom of the request and
        response. The response `Records` array always includes the
        same number of records as the request array.

        The response `Records` array includes both successfully and
        unsuccessfully processed records. Amazon Kinesis attempts to
        process all records in each `PutRecords` request. A single
        record failure does not stop the processing of subsequent
        records.

        A successfully processed record includes `ShardId` and
        `SequenceNumber` values. The `ShardId` parameter identifies
        the shard in the stream where the record is stored. The
        `SequenceNumber` parameter is an identifier assigned to the
        put record, unique to all records in the stream.

        An unsuccessfully processed record includes `ErrorCode` and
        `ErrorMessage` values. `ErrorCode` reflects the type of error
        and can be one of the following values:
        `ProvisionedThroughputExceededException` or `InternalFailure`.
        `ErrorMessage` provides more detailed information about the
        `ProvisionedThroughputExceededException` exception, including
        the account ID, stream name, and shard ID of the record that
        was throttled.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type records: list
        :param records: The records associated with the request.

        :type stream_name: string
        :param stream_name: The stream name associated with the request.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode the ``Data`` field of
            each record. Can be set to ``False`` if the data is already
            encoded to prevent double encoding.

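        Example (a minimal sketch; the stream name, payloads, and partition
        keys are hypothetical)::

            conn = KinesisConnection()
            records = [
                {'Data': 'first', 'PartitionKey': 'k1'},
                {'Data': 'second', 'PartitionKey': 'k2'},
            ]
            result = conn.put_records(records, 'my-stream')
            # Failed entries carry an ErrorCode; retry them as needed.
            failed = [r for r in result['Records'] if 'ErrorCode' in r]
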
742 """
743 params = {'Records': records, 'StreamName': stream_name, }
744 if b64_encode:
745 for i in range(len(params['Records'])):
746 data = params['Records'][i]['Data']
747 if not isinstance(data, six.binary_type):
748 data = data.encode('utf-8')
749 params['Records'][i]['Data'] = base64.b64encode(
750 data).decode('utf-8')
751 return self.make_request(action='PutRecords',
752 body=json.dumps(params))
753
    def remove_tags_from_stream(self, stream_name, tag_keys):
        """
        Deletes tags from the specified Amazon Kinesis stream.

        If you specify a tag that does not exist, it is ignored.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tag_keys: list
        :param tag_keys: A list of tag keys. Each corresponding tag is
            removed from the stream.

        """
        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
        return self.make_request(action='RemoveTagsFromStream',
                                 body=json.dumps(params))

    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
        """
        Splits a shard into two new shards in the stream, to increase
        the stream's capacity to ingest and transport data.
        `SplitShard` is called when there is a need to increase the
        overall capacity of a stream because of an expected increase
        in the volume of data records being ingested.

        You can also use `SplitShard` when a shard appears to be
        approaching its maximum utilization, for example, when the set
        of producers sending data into the specific shard are suddenly
        sending more than previously anticipated. You can also call
        `SplitShard` to increase stream capacity, so that more Amazon
        Kinesis applications can simultaneously read data from the
        stream for real-time processing.

        You must specify the shard to be split and the new hash key,
        which is the position in the shard where the shard gets split
        in two. In many cases, the new hash key might simply be the
        average of the beginning and ending hash key, but it can be
        any hash key value in the range being mapped into the shard.
        For more information about splitting shards, see `Split a
        Shard`_ in the Amazon Kinesis Developer Guide.

        You can use DescribeStream to determine the shard ID and hash
        key values for the `ShardToSplit` and `NewStartingHashKey`
        parameters that are specified in the `SplitShard` request.

        `SplitShard` is an asynchronous operation. Upon receiving a
        `SplitShard` request, Amazon Kinesis immediately returns a
        response and sets the stream status to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the stream status
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You can use `DescribeStream` to check the status of the
        stream, which is returned in `StreamStatus`. If the stream is
        in the `ACTIVE` state, you can call `SplitShard`. If a stream
        is in the `CREATING`, `UPDATING`, or `DELETING` state,
        `DescribeStream` returns a `ResourceInUseException`.

        If the specified stream does not exist, `DescribeStream`
        returns a `ResourceNotFoundException`. If you try to create
        more shards than are authorized for your account, you receive
        a `LimitExceededException`.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, MergeShards or SplitShard, you
        receive a `LimitExceededException`.

        `SplitShard` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the shard split.

        :type shard_to_split: string
        :param shard_to_split: The shard ID of the shard to split.

        :type new_starting_hash_key: string
        :param new_starting_hash_key: A hash key value for the starting hash
            key of one of the child shards created by the split. The hash key
            range for a given shard constitutes a set of ordered contiguous
            positive integers. The value for `NewStartingHashKey` must be in
            the range of hash keys being mapped into the shard. The
            `NewStartingHashKey` hash key value and all higher hash key
            values in the hash key range are distributed to one of the child
            shards. All the lower hash key values in the range are
            distributed to the other child shard.

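        Example (a minimal sketch that splits the first shard at the
        midpoint of its hash key range; `'my-stream'` is a hypothetical
        stream name)::

            conn = KinesisConnection()
            desc = conn.describe_stream('my-stream')['StreamDescription']
            shard = desc['Shards'][0]
            key_range = shard['HashKeyRange']
            midpoint = (int(key_range['StartingHashKey']) +
                        int(key_range['EndingHashKey'])) // 2
            conn.split_shard('my-stream', shard['ShardId'], str(midpoint))
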
846 """
847 params = {
848 'StreamName': stream_name,
849 'ShardToSplit': shard_to_split,
850 'NewStartingHashKey': new_starting_hash_key,
851 }
852 return self.make_request(action='SplitShard',
853 body=json.dumps(params))
854
    def make_request(self, action, body):
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response.getheaders())
        boto.log.debug(response_body)
        if response.status == 200:
            # Successful calls return the parsed JSON body, or None when
            # the service responds with an empty body.
            if response_body:
                return json.loads(response_body)
        else:
            # Map the fault named in '__type' onto a Kinesis exception
            # class, falling back to the generic JSONResponseError.
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)