env/lib/python3.9/site-packages/boto/kinesis/layer1.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"

| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
# Copyright (c) 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import base64
import boto

from boto.connection import AWSQueryConnection
from boto.regioninfo import RegionInfo
from boto.exception import JSONResponseError
from boto.kinesis import exceptions
from boto.compat import json
from boto.compat import six


class KinesisConnection(AWSQueryConnection):
    """
    Amazon Kinesis Service API Reference
    Amazon Kinesis is a managed service that scales elastically for
    real-time processing of streaming big data.
    """
    APIVersion = "2013-12-02"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "kinesis.us-east-1.amazonaws.com"
    ServiceName = "Kinesis"
    TargetPrefix = "Kinesis_20131202"
    ResponseError = JSONResponseError

    _faults = {
        "ProvisionedThroughputExceededException": exceptions.ProvisionedThroughputExceededException,
        "LimitExceededException": exceptions.LimitExceededException,
        "ExpiredIteratorException": exceptions.ExpiredIteratorException,
        "ResourceInUseException": exceptions.ResourceInUseException,
        "ResourceNotFoundException": exceptions.ResourceNotFoundException,
        "InvalidArgumentException": exceptions.InvalidArgumentException,
        "SubscriptionRequiredException": exceptions.SubscriptionRequiredException
    }

    def __init__(self, **kwargs):
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        if 'host' not in kwargs:
            kwargs['host'] = region.endpoint
        super(KinesisConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        return ['hmac-v4']
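For orientation before the individual operations, a minimal connection sketch. It assumes credentials come from boto's usual configuration chain (environment variables, `~/.boto`, or an instance role); `boto.kinesis.connect_to_region` is the region-aware factory provided elsewhere in this package.

```python
import boto.kinesis

# Open a connection to Kinesis in a specific region; the class above
# falls back to us-east-1 when no region is given.
conn = boto.kinesis.connect_to_region('us-east-1')
```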
    def add_tags_to_stream(self, stream_name, tags):
        """
        Adds or updates tags for the specified Amazon Kinesis stream.
        Each stream can have up to 10 tags.

        If tags have already been assigned to the stream,
        `AddTagsToStream` overwrites any existing tags that correspond
        to the specified tag keys.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tags: map
        :param tags: The set of key-value pairs to use to create the tags.

        """
        params = {'StreamName': stream_name, 'Tags': tags, }
        return self.make_request(action='AddTagsToStream',
                                 body=json.dumps(params))
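A brief usage sketch; the stream name and tag values are hypothetical.

```python
# Tag a stream; existing tags with the same keys are overwritten, and a
# stream can carry at most 10 tags.
conn.add_tags_to_stream('my-stream', {'environment': 'test'})
```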
    def create_stream(self, stream_name, shard_count):
        """
        Creates an Amazon Kinesis stream. A stream captures and
        transports data records that are continuously emitted from
        different data sources or producers. Scale-out within an
        Amazon Kinesis stream is explicitly supported by means of
        shards, which are uniquely identified groups of data records
        in an Amazon Kinesis stream.

        You specify and control the number of shards that a stream is
        composed of. Each open shard can support up to 5 read
        transactions per second, up to a maximum total of 2 MB of data
        read per second. Each shard can support up to 1000 records
        written per second, up to a maximum total of 1 MB data written
        per second. You can add shards to a stream if the amount of
        data input increases and you can remove shards if the amount
        of data input decreases.

        The stream name identifies the stream. The name is scoped to
        the AWS account used by the application. It is also scoped by
        region. That is, two streams in two different accounts can
        have the same name, and two streams in the same account, but
        in two different regions, can have the same name.

        `CreateStream` is an asynchronous operation. Upon receiving a
        `CreateStream` request, Amazon Kinesis immediately returns and
        sets the stream status to `CREATING`. After the stream is
        created, Amazon Kinesis sets the stream status to `ACTIVE`.
        You should perform read and write operations only on an
        `ACTIVE` stream.

        You receive a `LimitExceededException` when making a
        `CreateStream` request if you try to do one of the following:

        + Have more than five streams in the `CREATING` state at any
          point in time.
        + Create more shards than are authorized for your account.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        You can use `DescribeStream` to check the stream status, which
        is returned in `StreamStatus`.

        `CreateStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: A name to identify the stream. The stream name
            is scoped to the AWS account used by the application that
            creates the stream. It is also scoped by region. That is, two
            streams in two different AWS accounts can have the same name,
            and two streams in the same AWS account, but in two different
            regions, can have the same name.

        :type shard_count: integer
        :param shard_count: The number of shards that the stream will use.
            The throughput of the stream is a function of the number of
            shards; more shards are required for greater provisioned
            throughput.
            **Note:** The default limit for an AWS account is 10 shards per
            stream. If you need to create a stream with more than 10 shards,
            `contact AWS Support`_ to increase the limit on your account.

        """
        params = {
            'StreamName': stream_name,
            'ShardCount': shard_count,
        }
        return self.make_request(action='CreateStream',
                                 body=json.dumps(params))
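A minimal sketch of creating a stream; the name and shard count are illustrative, and the call returns while the stream is still `CREATING`.

```python
# Request a two-shard stream, then poll DescribeStream (see below) until
# the status flips from CREATING to ACTIVE before reading or writing.
conn.create_stream('my-stream', 2)
```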
    def delete_stream(self, stream_name):
        """
        Deletes a stream and all its shards and data. You must shut
        down any applications that are operating on the stream before
        you delete the stream. If an application attempts to operate
        on a deleted stream, it will receive the exception
        `ResourceNotFoundException`.

        If the stream is in the `ACTIVE` state, you can delete it.
        After a `DeleteStream` request, the specified stream is in the
        `DELETING` state until Amazon Kinesis completes the deletion.

        **Note:** Amazon Kinesis might continue to accept data read
        and write operations, such as PutRecord, PutRecords, and
        GetRecords, on a stream in the `DELETING` state until the
        stream deletion is complete.

        When you delete a stream, any shards in that stream are also
        deleted, and any tags are dissociated from the stream.

        You can use the DescribeStream operation to check the state of
        the stream, which is returned in `StreamStatus`.

        `DeleteStream` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to delete.

        """
        params = {'StreamName': stream_name, }
        return self.make_request(action='DeleteStream',
                                 body=json.dumps(params))

    def describe_stream(self, stream_name, limit=None,
                        exclusive_start_shard_id=None):
        """
        Describes the specified stream.

        The information about the stream includes its current status,
        its Amazon Resource Name (ARN), and an array of shard objects.
        For each shard object, there is information about the hash key
        and sequence number ranges that the shard spans, and the IDs
        of any earlier shards that played a role in creating the
        shard. A sequence number is the identifier associated with
        every record ingested in the Amazon Kinesis stream. The
        sequence number is assigned when a record is put into the
        stream.

        You can limit the number of returned shards using the `Limit`
        parameter. The number of shards in a stream may be too large
        to return from a single call to `DescribeStream`. You can
        detect this by using the `HasMoreShards` flag in the returned
        output. `HasMoreShards` is set to `True` when there is more
        data available.

        `DescribeStream` is a paginated operation. If there are more
        shards available, you can request them using the shard ID of
        the last shard returned. Specify this ID in the
        `ExclusiveStartShardId` parameter in a subsequent request to
        `DescribeStream`.

        `DescribeStream` has a limit of 10 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream to describe.

        :type limit: integer
        :param limit: The maximum number of shards to return.

        :type exclusive_start_shard_id: string
        :param exclusive_start_shard_id: The shard ID of the shard to start
            with.

        """
        params = {'StreamName': stream_name, }
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_shard_id is not None:
            params['ExclusiveStartShardId'] = exclusive_start_shard_id
        return self.make_request(action='DescribeStream',
                                 body=json.dumps(params))
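A sketch of the polling and pagination pattern the docstring describes, reusing the `conn` from the earlier sketch and assuming the standard `DescribeStream` response shape (a `StreamDescription` dict carrying `StreamStatus`, `Shards`, and `HasMoreShards`).

```python
import time

# Poll until the stream leaves the CREATING state.
while True:
    description = conn.describe_stream('my-stream')['StreamDescription']
    if description['StreamStatus'] == 'ACTIVE':
        break
    time.sleep(1)

# Page through the shard list via HasMoreShards/ExclusiveStartShardId.
shards = list(description['Shards'])
while description['HasMoreShards']:
    description = conn.describe_stream(
        'my-stream',
        exclusive_start_shard_id=shards[-1]['ShardId'])['StreamDescription']
    shards.extend(description['Shards'])
```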
    def get_records(self, shard_iterator, limit=None, b64_decode=True):
        """
        Gets data records from a shard.

        Specify a shard iterator using the `ShardIterator` parameter.
        The shard iterator specifies the position in the shard from
        which you want to start reading data records sequentially. If
        there are no records available in the portion of the shard
        that the iterator points to, `GetRecords` returns an empty
        list. Note that it might take multiple calls to get to a
        portion of the shard that contains records.

        You can scale by provisioning multiple shards. Your
        application should have one thread per shard, each reading
        continuously from its stream. To read from a stream
        continually, call `GetRecords` in a loop. Use GetShardIterator
        to get the shard iterator to specify in the first `GetRecords`
        call. `GetRecords` returns a new shard iterator in
        `NextShardIterator`. Specify the shard iterator returned in
        `NextShardIterator` in subsequent calls to `GetRecords`. Note
        that if the shard has been closed, the shard iterator can't
        return more data and `GetRecords` returns `null` in
        `NextShardIterator`. You can terminate the loop when the shard
        is closed, or when the shard iterator reaches the record with
        the sequence number or other attribute that marks it as the
        last record to process.

        Each data record can be up to 50 KB in size, and each shard
        can read up to 2 MB per second. You can ensure that your calls
        don't exceed the maximum supported size or throughput by using
        the `Limit` parameter to specify the maximum number of records
        that `GetRecords` can return. Consider your average record
        size when determining this limit. For example, if your average
        record size is 40 KB, you can limit the data returned to about
        1 MB per call by specifying 25 as the limit.

        The size of the data returned by `GetRecords` will vary
        depending on the utilization of the shard. The maximum size of
        data that `GetRecords` can return is 10 MB. If a call returns
        10 MB of data, subsequent calls made within the next 5 seconds
        throw `ProvisionedThroughputExceededException`. If there is
        insufficient provisioned throughput on the shard, subsequent
        calls made within the next 1 second throw
        `ProvisionedThroughputExceededException`. Note that
        `GetRecords` won't return any data when it throws an
        exception. For this reason, we recommend that you wait one
        second between calls to `GetRecords`; however, it's possible
        that the application will get exceptions for longer than 1
        second.

        To detect whether the application is falling behind in
        processing, add a timestamp to your records and note how long
        it takes to process them. You can also monitor how much data
        is in a stream using the CloudWatch metrics for write
        operations (`PutRecord` and `PutRecords`). For more
        information, see `Monitoring Amazon Kinesis with Amazon
        CloudWatch`_ in the Amazon Kinesis Developer Guide.

        :type shard_iterator: string
        :param shard_iterator: The position in the shard from which you want
            to start sequentially reading data records. A shard iterator
            specifies this position using the sequence number of a data
            record in the shard.

        :type limit: integer
        :param limit: The maximum number of records to return. Specify a
            value of up to 10,000. If you specify a value that is greater
            than 10,000, `GetRecords` throws `InvalidArgumentException`.

        :type b64_decode: boolean
        :param b64_decode: Decode the Base64-encoded ``Data`` field of
            records.

        """
        params = {'ShardIterator': shard_iterator, }
        if limit is not None:
            params['Limit'] = limit

        response = self.make_request(action='GetRecords',
                                     body=json.dumps(params))

        # Base64 decode the data
        if b64_decode:
            for record in response.get('Records', []):
                record['Data'] = base64.b64decode(
                    record['Data'].encode('utf-8')).decode('utf-8')

        return response
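A sketch of the recommended read loop, following `NextShardIterator` and pausing about a second between calls as the docstring suggests; `shard_id` and the `process` handler are hypothetical stand-ins.

```python
import time

# Seed the loop with an iterator (see get_shard_iterator below), then
# follow NextShardIterator until the shard is closed (iterator is None).
iterator = conn.get_shard_iterator('my-stream', shard_id,
                                   'TRIM_HORIZON')['ShardIterator']
while iterator is not None:
    result = conn.get_records(iterator, limit=100)
    for record in result['Records']:
        process(record['Data'])  # hypothetical per-record handler
    iterator = result['NextShardIterator']
    time.sleep(1)  # the docstring recommends a one-second pause
```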
    def get_shard_iterator(self, stream_name, shard_id, shard_iterator_type,
                           starting_sequence_number=None):
        """
        Gets a shard iterator. A shard iterator expires five minutes
        after it is returned to the requester.

        A shard iterator specifies the position in the shard from
        which to start reading data records sequentially. A shard
        iterator specifies this position using the sequence number of
        a data record in a shard. A sequence number is the identifier
        associated with every record ingested in the Amazon Kinesis
        stream. The sequence number is assigned when a record is put
        into the stream.

        You must specify the shard iterator type. For example, you can
        set the `ShardIteratorType` parameter to read exactly from the
        position denoted by a specific sequence number by using the
        `AT_SEQUENCE_NUMBER` shard iterator type, or right after the
        sequence number by using the `AFTER_SEQUENCE_NUMBER` shard
        iterator type, using sequence numbers returned by earlier
        calls to PutRecord, PutRecords, GetRecords, or DescribeStream.
        You can specify the shard iterator type `TRIM_HORIZON` in the
        request to cause `ShardIterator` to point to the last
        untrimmed record in the shard in the system, which is the
        oldest data record in the shard. Or you can point to just
        after the most recent record in the shard, by using the shard
        iterator type `LATEST`, so that you always read the most
        recent data in the shard.

        When you repeatedly read from an Amazon Kinesis stream, use a
        GetShardIterator request to get the first shard iterator to
        use in your first `GetRecords` request and then use the shard
        iterator returned by the `GetRecords` request in
        `NextShardIterator` for subsequent reads. A new shard iterator
        is returned by every `GetRecords` request in
        `NextShardIterator`, which you use in the `ShardIterator`
        parameter of the next `GetRecords` request.

        If a `GetShardIterator` request is made too often, you receive
        a `ProvisionedThroughputExceededException`. For more
        information about throughput limits, see GetRecords.

        If the shard is closed, the iterator can't return more data,
        and `GetShardIterator` returns `null` for its `ShardIterator`.
        A shard can be closed using SplitShard or MergeShards.

        `GetShardIterator` has a limit of 5 transactions per second
        per account per open shard.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type shard_id: string
        :param shard_id: The shard ID of the shard to get the iterator for.

        :type shard_iterator_type: string
        :param shard_iterator_type:
            Determines how the shard iterator is used to start reading data
            records from the shard.

            The following are the valid shard iterator types:

            + AT_SEQUENCE_NUMBER - Start reading exactly from the position
              denoted by a specific sequence number.
            + AFTER_SEQUENCE_NUMBER - Start reading right after the position
              denoted by a specific sequence number.
            + TRIM_HORIZON - Start reading at the last untrimmed record in
              the shard in the system, which is the oldest data record in
              the shard.
            + LATEST - Start reading just after the most recent record in
              the shard, so that you always read the most recent data in
              the shard.

        :type starting_sequence_number: string
        :param starting_sequence_number: The sequence number of the data
            record in the shard from which to start reading.

        :returns: A dictionary with a `ShardIterator` key whose value is the
            shard iterator to pass to `get_records`.
        """
        params = {
            'StreamName': stream_name,
            'ShardId': shard_id,
            'ShardIteratorType': shard_iterator_type,
        }
        if starting_sequence_number is not None:
            params['StartingSequenceNumber'] = starting_sequence_number
        return self.make_request(action='GetShardIterator',
                                 body=json.dumps(params))
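A sketch of resuming from a checkpoint; `saved_sequence_number` is a hypothetical value an application persisted from an earlier read.

```python
# Start reading just after the last record already processed.
response = conn.get_shard_iterator(
    'my-stream', shard_id, 'AFTER_SEQUENCE_NUMBER',
    starting_sequence_number=saved_sequence_number)
iterator = response['ShardIterator']
```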
    def list_streams(self, limit=None, exclusive_start_stream_name=None):
        """
        Lists your streams.

        The number of streams may be too large to return from a single
        call to `ListStreams`. You can limit the number of returned
        streams using the `Limit` parameter. If you do not specify a
        value for the `Limit` parameter, Amazon Kinesis uses the
        default limit, which is currently 10.

        You can detect if there are more streams available to list by
        using the `HasMoreStreams` flag from the returned output. If
        there are more streams available, you can request more streams
        by using the name of the last stream returned by the
        `ListStreams` request in the `ExclusiveStartStreamName`
        parameter in a subsequent request to `ListStreams`. The group
        of stream names returned by the subsequent request is then
        added to the list. You can continue this process until all the
        stream names have been collected in the list.

        `ListStreams` has a limit of 5 transactions per second per
        account.

        :type limit: integer
        :param limit: The maximum number of streams to list.

        :type exclusive_start_stream_name: string
        :param exclusive_start_stream_name: The name of the stream to start
            the list with.

        """
        params = {}
        if limit is not None:
            params['Limit'] = limit
        if exclusive_start_stream_name is not None:
            params['ExclusiveStartStreamName'] = exclusive_start_stream_name
        return self.make_request(action='ListStreams',
                                 body=json.dumps(params))
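A sketch of the pagination loop the docstring describes, assuming the response carries a `StreamNames` list alongside `HasMoreStreams`.

```python
# Collect all stream names, following HasMoreStreams with
# ExclusiveStartStreamName.
names = []
result = conn.list_streams()
names.extend(result['StreamNames'])
while result['HasMoreStreams']:
    result = conn.list_streams(exclusive_start_stream_name=names[-1])
    names.extend(result['StreamNames'])
```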
    def list_tags_for_stream(self, stream_name, exclusive_start_tag_key=None,
                             limit=None):
        """
        Lists the tags for the specified Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type exclusive_start_tag_key: string
        :param exclusive_start_tag_key: The key to use as the starting point
            for the list of tags. If this parameter is set,
            `ListTagsForStream` gets all tags that occur after
            `ExclusiveStartTagKey`.

        :type limit: integer
        :param limit: The number of tags to return. If this number is less
            than the total number of tags associated with the stream,
            `HasMoreTags` is set to `True`. To list additional tags, set
            `ExclusiveStartTagKey` to the last key in the response.

        """
        params = {'StreamName': stream_name, }
        if exclusive_start_tag_key is not None:
            params['ExclusiveStartTagKey'] = exclusive_start_tag_key
        if limit is not None:
            params['Limit'] = limit
        return self.make_request(action='ListTagsForStream',
                                 body=json.dumps(params))

    def merge_shards(self, stream_name, shard_to_merge,
                     adjacent_shard_to_merge):
        """
        Merges two adjacent shards in a stream and combines them into
        a single shard to reduce the stream's capacity to ingest and
        transport data. Two shards are considered adjacent if the
        union of the hash key ranges for the two shards forms a
        contiguous set with no gaps. For example, if you have two
        shards, one with a hash key range of 276...381 and the other
        with a hash key range of 382...454, then you could merge these
        two shards into a single shard that would have a hash key
        range of 276...454. After the merge, the single child shard
        receives data for all hash key values covered by the two
        parent shards.

        `MergeShards` is called when there is a need to reduce the
        overall capacity of a stream because of excess capacity that
        is not being used. You must specify the shard to be merged and
        the adjacent shard for a stream. For more information about
        merging shards, see `Merge Two Shards`_ in the Amazon Kinesis
        Developer Guide.

        If the stream is in the `ACTIVE` state, you can call
        `MergeShards`. If a stream is in the `CREATING`, `UPDATING`,
        or `DELETING` state, `MergeShards` returns a
        `ResourceInUseException`. If the specified stream does not
        exist, `MergeShards` returns a `ResourceNotFoundException`.

        You can use DescribeStream to check the state of the stream,
        which is returned in `StreamStatus`.

        `MergeShards` is an asynchronous operation. Upon receiving a
        `MergeShards` request, Amazon Kinesis immediately returns a
        response and sets the `StreamStatus` to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the `StreamStatus`
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You use DescribeStream to determine the shard IDs that are
        specified in the `MergeShards` request.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, `MergeShards` or SplitShard, you
        will receive a `LimitExceededException`.

        `MergeShards` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the merge.

        :type shard_to_merge: string
        :param shard_to_merge: The shard ID of the shard to combine with the
            adjacent shard for the merge.

        :type adjacent_shard_to_merge: string
        :param adjacent_shard_to_merge: The shard ID of the adjacent shard
            for the merge.

        """
        params = {
            'StreamName': stream_name,
            'ShardToMerge': shard_to_merge,
            'AdjacentShardToMerge': adjacent_shard_to_merge,
        }
        return self.make_request(action='MergeShards',
                                 body=json.dumps(params))
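A hedged sketch of a merge; the shard IDs are hypothetical and would normally come from `describe_stream`'s shard list after confirming that the two hash key ranges are contiguous.

```python
# Combine two adjacent shards into one, halving capacity in that range.
conn.merge_shards('my-stream', 'shardId-000000000001',
                  'shardId-000000000002')
```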
    def put_record(self, stream_name, data, partition_key,
                   explicit_hash_key=None,
                   sequence_number_for_ordering=None,
                   exclusive_minimum_sequence_number=None,
                   b64_encode=True):
        """
        This operation puts a data record into an Amazon Kinesis
        stream from a producer. This operation must be called to send
        data from the producer into the Amazon Kinesis stream for
        real-time ingestion and subsequent processing. The `PutRecord`
        operation requires the name of the stream that captures,
        stores, and transports the data; a partition key; and the data
        blob itself. The data blob could be a segment from a log file,
        geographic/location data, website clickstream data, or any
        other data type.

        The partition key is used to distribute data across shards.
        Amazon Kinesis segregates the data records that belong to a
        data stream into multiple shards, using the partition key
        associated with each data record to determine which shard a
        given data record belongs to.

        Partition keys are Unicode strings, with a maximum length
        limit of 256 bytes. An MD5 hash function is used to map
        partition keys to 128-bit integer values and to map associated
        data records to shards using the hash key ranges of the
        shards. You can override hashing the partition key to
        determine the shard by explicitly specifying a hash value
        using the `ExplicitHashKey` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        `PutRecord` returns the shard ID of where the data record was
        placed and the sequence number that was assigned to the data
        record.

        Sequence numbers generally increase over time. To guarantee
        strictly increasing ordering, use the
        `SequenceNumberForOrdering` parameter. For more information,
        see the `Amazon Kinesis Developer Guide`_.

        If a `PutRecord` request cannot be processed because of
        insufficient provisioned throughput on the shard involved in
        the request, `PutRecord` throws
        `ProvisionedThroughputExceededException`.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type stream_name: string
        :param stream_name: The name of the stream to put the data record
            into.

        :type data: blob
        :param data: The data blob to put into the record, which is
            Base64-encoded when the blob is serialized. The maximum size of
            the data blob (the payload after Base64-decoding) is 50
            kilobytes (KB). Set `b64_encode` to ``False`` to disable
            automatic Base64 encoding.

        :type partition_key: string
        :param partition_key: Determines which shard in the stream the data
            record is assigned to. Partition keys are Unicode strings with a
            maximum length limit of 256 bytes. Amazon Kinesis uses the
            partition key as input to a hash function that maps the
            partition key and associated data to a specific shard.
            Specifically, an MD5 hash function is used to map partition keys
            to 128-bit integer values and to map associated data records to
            shards. As a result of this hashing mechanism, all data records
            with the same partition key will map to the same shard within
            the stream.

        :type explicit_hash_key: string
        :param explicit_hash_key: The hash value used to explicitly
            determine the shard the data record is assigned to by overriding
            the partition key hash.

        :type sequence_number_for_ordering: string
        :param sequence_number_for_ordering: Guarantees strictly increasing
            sequence numbers, for puts from the same client and to the same
            partition key. Usage: set the `SequenceNumberForOrdering` of
            record n to the sequence number of record n-1 (as returned in
            the PutRecordResult when putting record n-1). If this parameter
            is not set, records will be coarsely ordered based on arrival
            time.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode `data`. Can be set to
            ``False`` if `data` is already encoded to prevent double
            encoding.

        """
        params = {
            'StreamName': stream_name,
            'Data': data,
            'PartitionKey': partition_key,
        }
        if explicit_hash_key is not None:
            params['ExplicitHashKey'] = explicit_hash_key
        if sequence_number_for_ordering is not None:
            params['SequenceNumberForOrdering'] = sequence_number_for_ordering
        if b64_encode:
            if not isinstance(params['Data'], six.binary_type):
                params['Data'] = params['Data'].encode('utf-8')
            params['Data'] = base64.b64encode(params['Data']).decode('utf-8')
        return self.make_request(action='PutRecord',
                                 body=json.dumps(params))
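A sketch of chaining `SequenceNumberForOrdering` for strictly increasing sequence numbers, assuming the response carries the documented `SequenceNumber` value; the stream, payloads, and partition key are illustrative.

```python
# Put record n, then thread its sequence number into record n+1.
first = conn.put_record('my-stream', 'event-1', 'device-42')
conn.put_record('my-stream', 'event-2', 'device-42',
                sequence_number_for_ordering=first['SequenceNumber'])
```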
    def put_records(self, records, stream_name, b64_encode=True):
        """
        Puts (writes) multiple data records from a producer into an
        Amazon Kinesis stream in a single call (also referred to as a
        `PutRecords` request). Use this operation to send data from a
        data producer into the Amazon Kinesis stream for real-time
        ingestion and processing. Each shard can support up to 1000
        records written per second, up to a maximum total of 1 MB data
        written per second.

        You must specify the name of the stream that captures, stores,
        and transports the data; and an array of request `Records`,
        with each record in the array requiring a partition key and
        data blob.

        The data blob can be any type of data; for example, a segment
        from a log file, geographic/location data, website clickstream
        data, and so on.

        The partition key is used by Amazon Kinesis as input to a hash
        function that maps the partition key and associated data to a
        specific shard. An MD5 hash function is used to map partition
        keys to 128-bit integer values and to map associated data
        records to shards. As a result of this hashing mechanism, all
        data records with the same partition key map to the same shard
        within the stream. For more information, see `Partition Key`_
        in the Amazon Kinesis Developer Guide.

        Each record in the `Records` array may include an optional
        parameter, `ExplicitHashKey`, which overrides the partition
        key to shard mapping. This parameter allows a data producer to
        determine explicitly the shard where the record is stored. For
        more information, see `Adding Multiple Records with
        PutRecords`_ in the Amazon Kinesis Developer Guide.

        The `PutRecords` response includes an array of response
        `Records`. Each record in the response array directly
        correlates with a record in the request array using natural
        ordering, from the top to the bottom of the request and
        response. The response `Records` array always includes the
        same number of records as the request array.

        The response `Records` array includes both successfully and
        unsuccessfully processed records. Amazon Kinesis attempts to
        process all records in each `PutRecords` request. A single
        record failure does not stop the processing of subsequent
        records.

        A successfully-processed record includes `ShardId` and
        `SequenceNumber` values. The `ShardId` parameter identifies
        the shard in the stream where the record is stored. The
        `SequenceNumber` parameter is an identifier assigned to the
        put record, unique to all records in the stream.

        An unsuccessfully-processed record includes `ErrorCode` and
        `ErrorMessage` values. `ErrorCode` reflects the type of error
        and can be one of the following values:
        `ProvisionedThroughputExceededException` or `InternalFailure`.
        `ErrorMessage` provides more detailed information about the
        `ProvisionedThroughputExceededException` exception including
        the account ID, stream name, and shard ID of the record that
        was throttled.

        Data records are accessible for only 24 hours from the time
        that they are added to an Amazon Kinesis stream.

        :type records: list
        :param records: The records associated with the request. Each entry
            is a dict containing a `Data` blob and a `PartitionKey`, plus an
            optional `ExplicitHashKey`.

        :type stream_name: string
        :param stream_name: The stream name associated with the request.

        :type b64_encode: boolean
        :param b64_encode: Whether to Base64 encode the `Data` field of each
            record. Can be set to ``False`` if the data is already encoded
            to prevent double encoding.

        """
        params = {'Records': records, 'StreamName': stream_name, }
        if b64_encode:
            for i in range(len(params['Records'])):
                data = params['Records'][i]['Data']
                if not isinstance(data, six.binary_type):
                    data = data.encode('utf-8')
                params['Records'][i]['Data'] = base64.b64encode(
                    data).decode('utf-8')
        return self.make_request(action='PutRecords',
                                 body=json.dumps(params))
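A sketch of a batch put with one retry pass over failed entries. Note a quirk visible in the implementation above: `Data` is Base64-encoded in place on the caller's dicts, so the retry passes `b64_encode=False` to avoid double encoding. Stream and payload names are illustrative.

```python
records = [{'Data': 'event-%d' % i, 'PartitionKey': 'device-42'}
           for i in range(10)]
result = conn.put_records(records, 'my-stream')

# Response entries line up one-to-one with the request; failed entries
# carry an ErrorCode instead of ShardId/SequenceNumber.
failed = [req for req, res in zip(records, result['Records'])
          if 'ErrorCode' in res]
if failed:
    # Data was already encoded in place by the first call.
    conn.put_records(failed, 'my-stream', b64_encode=False)
```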
    def remove_tags_from_stream(self, stream_name, tag_keys):
        """
        Deletes tags from the specified Amazon Kinesis stream.

        If you specify a tag that does not exist, it is ignored.

        :type stream_name: string
        :param stream_name: The name of the stream.

        :type tag_keys: list
        :param tag_keys: A list of tag keys. Each corresponding tag is
            removed from the stream.

        """
        params = {'StreamName': stream_name, 'TagKeys': tag_keys, }
        return self.make_request(action='RemoveTagsFromStream',
                                 body=json.dumps(params))

    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
        """
        Splits a shard into two new shards in the stream, to increase
        the stream's capacity to ingest and transport data.
        `SplitShard` is called when there is a need to increase the
        overall capacity of a stream because of an expected increase
        in the volume of data records being ingested.

        You can also use `SplitShard` when a shard appears to be
        approaching its maximum utilization, for example, when the set
        of producers sending data into the specific shard are suddenly
        sending more than previously anticipated. You can also call
        `SplitShard` to increase stream capacity, so that more Amazon
        Kinesis applications can simultaneously read data from the
        stream for real-time processing.

        You must specify the shard to be split and the new hash key,
        which is the position in the shard where the shard gets split
        in two. In many cases, the new hash key might simply be the
        average of the beginning and ending hash key, but it can be
        any hash key value in the range being mapped into the shard.
        For more information about splitting shards, see `Split a
        Shard`_ in the Amazon Kinesis Developer Guide.

        You can use DescribeStream to determine the shard ID and hash
        key values for the `ShardToSplit` and `NewStartingHashKey`
        parameters that are specified in the `SplitShard` request.

        `SplitShard` is an asynchronous operation. Upon receiving a
        `SplitShard` request, Amazon Kinesis immediately returns a
        response and sets the stream status to `UPDATING`. After the
        operation is completed, Amazon Kinesis sets the stream status
        to `ACTIVE`. Read and write operations continue to work while
        the stream is in the `UPDATING` state.

        You can use `DescribeStream` to check the status of the
        stream, which is returned in `StreamStatus`. If the stream is
        in the `ACTIVE` state, you can call `SplitShard`. If a stream
        is in the `CREATING`, `UPDATING`, or `DELETING` state,
        `DescribeStream` returns a `ResourceInUseException`.

        If the specified stream does not exist, `DescribeStream`
        returns a `ResourceNotFoundException`. If you try to create
        more shards than are authorized for your account, you receive
        a `LimitExceededException`.

        The default limit for an AWS account is 10 shards per stream.
        If you need to create a stream with more than 10 shards,
        `contact AWS Support`_ to increase the limit on your account.

        If you try to operate on too many streams in parallel using
        CreateStream, DeleteStream, MergeShards or SplitShard, you
        receive a `LimitExceededException`.

        `SplitShard` has a limit of 5 transactions per second per
        account.

        :type stream_name: string
        :param stream_name: The name of the stream for the shard split.

        :type shard_to_split: string
        :param shard_to_split: The shard ID of the shard to split.

        :type new_starting_hash_key: string
        :param new_starting_hash_key: A hash key value for the starting hash
            key of one of the child shards created by the split. The hash
            key range for a given shard constitutes a set of ordered
            contiguous positive integers. The value for `NewStartingHashKey`
            must be in the range of hash keys being mapped into the shard.
            The `NewStartingHashKey` hash key value and all higher hash key
            values in the hash key range are distributed to one of the child
            shards. All the lower hash key values in the range are
            distributed to the other child shard.

        """
        params = {
            'StreamName': stream_name,
            'ShardToSplit': shard_to_split,
            'NewStartingHashKey': new_starting_hash_key,
        }
        return self.make_request(action='SplitShard',
                                 body=json.dumps(params))
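A sketch of splitting at the midpoint of a shard's hash key range, assuming the `shard` dict comes from `describe_stream`'s shard list and carries the standard `HashKeyRange` with `StartingHashKey`/`EndingHashKey` strings.

```python
# Split a hot shard at the midpoint of its hash key range.
start = int(shard['HashKeyRange']['StartingHashKey'])
end = int(shard['HashKeyRange']['EndingHashKey'])
conn.split_shard('my-stream', shard['ShardId'], str((start + end) // 2))
```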
    def make_request(self, action, body):
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response.getheaders())
        boto.log.debug(response_body)
        if response.status == 200:
            # Some operations return an empty body; the caller then
            # gets None.
            if response_body:
                return json.loads(response_body)
        else:
            # Map the service fault named in '__type' to a specific
            # exception class, falling back to the generic ResponseError.
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)
