comparison env/lib/python3.9/site-packages/boto/dynamodb2/table.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
1 import boto
2 from boto.dynamodb2 import exceptions
3 from boto.dynamodb2.fields import (HashKey, RangeKey,
4 AllIndex, KeysOnlyIndex, IncludeIndex,
5 GlobalAllIndex, GlobalKeysOnlyIndex,
6 GlobalIncludeIndex)
7 from boto.dynamodb2.items import Item
8 from boto.dynamodb2.layer1 import DynamoDBConnection
9 from boto.dynamodb2.results import ResultSet, BatchGetResultSet
10 from boto.dynamodb2.types import (NonBooleanDynamizer, Dynamizer, FILTER_OPERATORS,
11 QUERY_OPERATORS, STRING)
12 from boto.exception import JSONResponseError
13
14
15 class Table(object):
16 """
17 Interacts & models the behavior of a DynamoDB table.
18
19 The ``Table`` object represents a set (or rough categorization) of
20 records within DynamoDB. The important part is that all records within the
21 table, while largely-schema-free, share the same schema & are essentially
22 namespaced for use in your application. For example, you might have a
23 ``users`` table or a ``forums`` table.
24 """
25 max_batch_get = 100
26
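# Maps a DescribeTable projection type to the index class used when
# introspecting existing (local & global) indexes.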
27 _PROJECTION_TYPE_TO_INDEX = dict(
28 global_indexes=dict(
29 ALL=GlobalAllIndex,
30 KEYS_ONLY=GlobalKeysOnlyIndex,
31 INCLUDE=GlobalIncludeIndex,
32 ), local_indexes=dict(
33 ALL=AllIndex,
34 KEYS_ONLY=KeysOnlyIndex,
35 INCLUDE=IncludeIndex,
36 )
37 )
38
39 def __init__(self, table_name, schema=None, throughput=None, indexes=None,
40 global_indexes=None, connection=None):
41 """
42 Sets up a new in-memory ``Table``.
43
44 This is useful if the table already exists within DynamoDB & you simply
45 want to use it for additional interactions. The only required parameter
46 is the ``table_name``. However, under the hood, the object will call
47 ``describe_table`` to determine the schema/indexes/throughput. You
48 can avoid this extra call by passing in ``schema`` & ``indexes``.
49
50 **IMPORTANT** - If you're creating a new ``Table`` for the first time,
51 you should use the ``Table.create`` method instead, as it will
52 persist the table structure to DynamoDB.
53
54 Requires a ``table_name`` parameter, which should be a simple string
55 of the name of the table.
56
57 Optionally accepts a ``schema`` parameter, which should be a list of
58 ``BaseSchemaField`` subclasses representing the desired schema.
59
60 Optionally accepts a ``throughput`` parameter, which should be a
61 dictionary. If provided, it should specify a ``read`` & ``write`` key,
62 both of which should have an integer value associated with them.
63
64 Optionally accepts an ``indexes`` parameter, which should be a list of
65 ``BaseIndexField`` subclasses representing the desired indexes.
66
67 Optionally accepts a ``global_indexes`` parameter, which should be a
68 list of ``GlobalBaseIndexField`` subclasses representing the desired
69 indexes.
70
71 Optionally accepts a ``connection`` parameter, which should be a
72 ``DynamoDBConnection`` instance (or subclass). This is primarily useful
73 for specifying alternate connection parameters.
74
75 Example::
76
77 # The simple, it-already-exists case.
78 >>> conn = Table('users')
79
80 # The full, minimum-extra-calls case.
81 >>> from boto import dynamodb2
82 >>> users = Table('users', schema=[
83 ... HashKey('username'),
84 ... RangeKey('date_joined', data_type=NUMBER)
85 ... ], throughput={
86 ... 'read':20,
87 ... 'write': 10,
88 ... }, indexes=[
89 ... KeysOnlyIndex('MostRecentlyJoined', parts=[
90 ... HashKey('username'),
91 ... RangeKey('date_joined')
92 ... ]),
93 ... ], global_indexes=[
94 ... GlobalAllIndex('UsersByZipcode', parts=[
95 ... HashKey('zipcode'),
96 ... RangeKey('username'),
97 ... ],
98 ... throughput={
99 ... 'read':10,
100 ... 'write':10,
101 ... }),
102 ... ], connection=dynamodb2.connect_to_region('us-west-2',
103 ... aws_access_key_id='key',
104 ... aws_secret_access_key='key',
105 ... ))
106
107 """
108 self.table_name = table_name
109 self.connection = connection
110 self.throughput = {
111 'read': 5,
112 'write': 5,
113 }
114 self.schema = schema
115 self.indexes = indexes
116 self.global_indexes = global_indexes
117
118 if self.connection is None:
119 self.connection = DynamoDBConnection()
120
121 if throughput is not None:
122 self.throughput = throughput
123
124 self._dynamizer = NonBooleanDynamizer()
125
126 def use_boolean(self):
"""
Tells this ``Table`` to encode & decode Python booleans as native
DynamoDB ``BOOL`` values (via ``Dynamizer``), instead of the legacy
numeric representation used by ``NonBooleanDynamizer``.
"""
127 self._dynamizer = Dynamizer()
128
129 @classmethod
130 def create(cls, table_name, schema, throughput=None, indexes=None,
131 global_indexes=None, connection=None):
132 """
133 Creates a new table in DynamoDB & returns an in-memory ``Table`` object.
134
135 This will set up a brand new table within DynamoDB. The ``table_name``
136 must be unique for your AWS account. The ``schema`` is also required
137 to define the key structure of the table.
138
139 **IMPORTANT** - You should consider the usage pattern of your table
140 up-front, as the schema can **NOT** be modified once the table is
141 created, requiring the creation of a new table & migrating the data
142 should you wish to revise it.
143
144 **IMPORTANT** - If the table already exists in DynamoDB, additional
145 calls to this method will result in an error. If you just need
146 a ``Table`` object to interact with the existing table, you should
147 just initialize a new ``Table`` object, which requires only the
148 ``table_name``.
149
150 Requires a ``table_name`` parameter, which should be a simple string
151 of the name of the table.
152
153 Requires a ``schema`` parameter, which should be a list of
154 ``BaseSchemaField`` subclasses representing the desired schema.
155
156 Optionally accepts a ``throughput`` parameter, which should be a
157 dictionary. If provided, it should specify a ``read`` & ``write`` key,
158 both of which should have an integer value associated with them.
159
160 Optionally accepts an ``indexes`` parameter, which should be a list of
161 ``BaseIndexField`` subclasses representing the desired indexes.
162
163 Optionally accepts a ``global_indexes`` parameter, which should be a
164 list of ``GlobalBaseIndexField`` subclasses representing the desired
165 indexes.
166
167 Optionally accepts a ``connection`` parameter, which should be a
168 ``DynamoDBConnection`` instance (or subclass). This is primarily useful
169 for specifying alternate connection parameters.
170
171 Example::
172
173 >>> users = Table.create('users', schema=[
174 ... HashKey('username'),
175 ... RangeKey('date_joined', data_type=NUMBER)
176 ... ], throughput={
177 ... 'read':20,
178 ... 'write': 10,
179 ... }, indexes=[
180 ... KeysOnlyIndex('MostRecentlyJoined', parts=[
181 ... HashKey('username'),
182 ... RangeKey('date_joined'),
183 ... ]),
... ], global_indexes=[
184 ... GlobalAllIndex('UsersByZipcode', parts=[
185 ... HashKey('zipcode'),
186 ... RangeKey('username'),
187 ... ],
188 ... throughput={
189 ... 'read':10,
190 ... 'write':10,
191 ... }),
192 ... ])
193
194 """
195 table = cls(table_name=table_name, connection=connection)
196 table.schema = schema
197
198 if throughput is not None:
199 table.throughput = throughput
200
201 if indexes is not None:
202 table.indexes = indexes
203
204 if global_indexes is not None:
205 table.global_indexes = global_indexes
206
207 # Prep the schema.
208 raw_schema = []
209 attr_defs = []
210 seen_attrs = set()
211
212 for field in table.schema:
213 raw_schema.append(field.schema())
214 # Build the attributes off what we know.
215 seen_attrs.add(field.name)
216 attr_defs.append(field.definition())
217
218 raw_throughput = {
219 'ReadCapacityUnits': int(table.throughput['read']),
220 'WriteCapacityUnits': int(table.throughput['write']),
221 }
222 kwargs = {}
223
224 kwarg_map = {
225 'indexes': 'local_secondary_indexes',
226 'global_indexes': 'global_secondary_indexes',
227 }
228 for index_attr in ('indexes', 'global_indexes'):
229 table_indexes = getattr(table, index_attr)
230 if table_indexes:
231 raw_indexes = []
232 for index_field in table_indexes:
233 raw_indexes.append(index_field.schema())
234 # Make sure all attributes specified in the indexes are
235 # added to the definition
236 for field in index_field.parts:
237 if field.name not in seen_attrs:
238 seen_attrs.add(field.name)
239 attr_defs.append(field.definition())
240
241 kwargs[kwarg_map[index_attr]] = raw_indexes
242
243 table.connection.create_table(
244 table_name=table.table_name,
245 attribute_definitions=attr_defs,
246 key_schema=raw_schema,
247 provisioned_throughput=raw_throughput,
248 **kwargs
249 )
250 return table
251
252 def _introspect_schema(self, raw_schema, raw_attributes=None):
253 """
254 Given a raw schema structure back from a DynamoDB response, parse
255 out & build the high-level Python objects that represent them.
256 """
257 schema = []
258 sane_attributes = {}
259
260 if raw_attributes:
261 for field in raw_attributes:
262 sane_attributes[field['AttributeName']] = field['AttributeType']
263
264 for field in raw_schema:
265 data_type = sane_attributes.get(field['AttributeName'], STRING)
266
267 if field['KeyType'] == 'HASH':
268 schema.append(
269 HashKey(field['AttributeName'], data_type=data_type)
270 )
271 elif field['KeyType'] == 'RANGE':
272 schema.append(
273 RangeKey(field['AttributeName'], data_type=data_type)
274 )
275 else:
276 raise exceptions.UnknownSchemaFieldError(
277 "%s was seen, but is unknown. Please report this at "
278 "https://github.com/boto/boto/issues." % field['KeyType']
279 )
280
281 return schema
282
283 def _introspect_all_indexes(self, raw_indexes, map_indexes_projection):
284 """
285 Given a raw index/global index structure back from a DynamoDB response,
286 parse out & build the high-level Python objects that represent them.
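
For example, an index whose ``Projection`` has a ``ProjectionType`` of
``KEYS_ONLY`` is rebuilt as the ``KEYS_ONLY`` entry from
``map_indexes_projection`` (``KeysOnlyIndex`` locally,
``GlobalKeysOnlyIndex`` globally), named after its ``IndexName`` & with
``parts`` introspected from its ``KeySchema``.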
287 """
288 indexes = []
289
290 for field in raw_indexes:
291 index_klass = map_indexes_projection.get('ALL')
292 kwargs = {
293 'parts': []
294 }
295
296 if field['Projection']['ProjectionType'] == 'ALL':
297 index_klass = map_indexes_projection.get('ALL')
298 elif field['Projection']['ProjectionType'] == 'KEYS_ONLY':
299 index_klass = map_indexes_projection.get('KEYS_ONLY')
300 elif field['Projection']['ProjectionType'] == 'INCLUDE':
301 index_klass = map_indexes_projection.get('INCLUDE')
302 kwargs['includes'] = field['Projection']['NonKeyAttributes']
303 else:
304 raise exceptions.UnknownIndexFieldError(
305 "%s was seen, but is unknown. Please report this at "
306 "https://github.com/boto/boto/issues." % \
307 field['Projection']['ProjectionType']
308 )
309
310 name = field['IndexName']
311 kwargs['parts'] = self._introspect_schema(field['KeySchema'], None)
312 indexes.append(index_klass(name, **kwargs))
313
314 return indexes
315
316 def _introspect_indexes(self, raw_indexes):
317 """
318 Given a raw index structure back from a DynamoDB response, parse
319 out & build the high-level Python objects that represent them.
320 """
321 return self._introspect_all_indexes(
322 raw_indexes, self._PROJECTION_TYPE_TO_INDEX.get('local_indexes'))
323
324 def _introspect_global_indexes(self, raw_global_indexes):
325 """
326 Given a raw global index structure back from a DynamoDB response, parse
327 out & build the high-level Python objects that represent them.
328 """
329 return self._introspect_all_indexes(
330 raw_global_indexes,
331 self._PROJECTION_TYPE_TO_INDEX.get('global_indexes'))
332
333 def describe(self):
334 """
335 Describes the current structure of the table in DynamoDB.
336
337 This information will be used to update the ``schema``, ``indexes``,
338 ``global_indexes`` and ``throughput`` information on the ``Table``. Some
339 calls, such as those involving creating keys or querying, will require
340 this information to be populated.
341
342 It also returns the full raw data structure from DynamoDB, in the
343 event you'd like to parse out additional information (such as the
344 ``ItemCount`` or usage information).
345
346 Example::
347
348 >>> users.describe()
349 {
350 # Lots of keys here...
351 }
352 >>> len(users.schema)
353 2
354
355 """
356 result = self.connection.describe_table(self.table_name)
357
358 # Blindly update throughput, since what's on DynamoDB's end is likely
359 # more correct.
360 raw_throughput = result['Table']['ProvisionedThroughput']
361 self.throughput['read'] = int(raw_throughput['ReadCapacityUnits'])
362 self.throughput['write'] = int(raw_throughput['WriteCapacityUnits'])
363
364 if not self.schema:
365 # Since we have the data, build the schema.
366 raw_schema = result['Table'].get('KeySchema', [])
367 raw_attributes = result['Table'].get('AttributeDefinitions', [])
368 self.schema = self._introspect_schema(raw_schema, raw_attributes)
369
370 if not self.indexes:
371 # Build the index information as well.
372 raw_indexes = result['Table'].get('LocalSecondaryIndexes', [])
373 self.indexes = self._introspect_indexes(raw_indexes)
374
375 # Build the global index information as well.
376 raw_global_indexes = result['Table'].get('GlobalSecondaryIndexes', [])
377 self.global_indexes = self._introspect_global_indexes(raw_global_indexes)
378
379 # This is leaky.
380 return result
381
382 def update(self, throughput=None, global_indexes=None):
383 """
384 Updates table attributes and global indexes in DynamoDB.
385
386 Optionally accepts a ``throughput`` parameter, which should be a
387 dictionary. If provided, it should specify a ``read`` & ``write`` key,
388 both of which should have an integer value associated with them.
389
390 Optionally accepts a ``global_indexes`` parameter, which should be a
391 dictionary. If provided, it should map each global index name to
392 a dict containing a ``read`` & ``write`` key, both of which
393 should have an integer value associated with them. If you are writing
394 new code, please use ``Table.update_global_secondary_index``.
395
396 Returns ``True`` on success.
397
398 Example::
399
400 # For a read-heavier application...
401 >>> users.update(throughput={
402 ... 'read': 20,
403 ... 'write': 10,
404 ... })
405 True
406
407 # To also update the global index(es) throughput.
408 >>> users.update(throughput={
409 ... 'read': 20,
410 ... 'write': 10,
411 ... },
412 ... global_indexes={
413 ... 'TheIndexNameHere': {
414 ... 'read': 15,
415 ... 'write': 5,
416 ... }
417 ... })
418 True
419 """
420
421 data = None
422
423 if throughput:
424 self.throughput = throughput
425 data = {
426 'ReadCapacityUnits': int(self.throughput['read']),
427 'WriteCapacityUnits': int(self.throughput['write']),
428 }
429
430 gsi_data = None
431
432 if global_indexes:
433 gsi_data = []
434
435 for gsi_name, gsi_throughput in global_indexes.items():
436 gsi_data.append({
437 "Update": {
438 "IndexName": gsi_name,
439 "ProvisionedThroughput": {
440 "ReadCapacityUnits": int(gsi_throughput['read']),
441 "WriteCapacityUnits": int(gsi_throughput['write']),
442 },
443 },
444 })
445
446 if throughput or global_indexes:
447 self.connection.update_table(
448 self.table_name,
449 provisioned_throughput=data,
450 global_secondary_index_updates=gsi_data,
451 )
452
453 return True
454 else:
455 msg = 'You need to provide either the throughput or the ' \
456 'global_indexes to the update method'
457 boto.log.error(msg)
458
459 return False
460
461 def create_global_secondary_index(self, global_index):
462 """
463 Creates a global index in DynamoDB after the table has been created.
464
465 Requires a ``global_index`` parameter, which should be a
466 ``GlobalBaseIndexField`` subclass representing the desired index.
467
468 To update ``global_indexes`` information on the ``Table``, you'll need
469 to call ``Table.describe``.
470
471 Returns ``True`` on success.
472
473 Example::
474
475 # To create a global index
476 >>> users.create_global_secondary_index(
477 ... global_index=GlobalAllIndex(
478 ... 'TheIndexNameHere', parts=[
479 ... HashKey('requiredHashkey', data_type=STRING),
480 ... RangeKey('optionalRangeKey', data_type=STRING)
481 ... ],
482 ... throughput={
483 ... 'read': 2,
484 ... 'write': 1,
485 ... })
486 ... )
487 True
488
489 """
490
491 if global_index:
492 gsi_data = []
493 gsi_data_attr_def = []
494
495 gsi_data.append({
496 "Create": global_index.schema()
497 })
498
499 for attr_def in global_index.parts:
500 gsi_data_attr_def.append(attr_def.definition())
501
502 self.connection.update_table(
503 self.table_name,
504 global_secondary_index_updates=gsi_data,
505 attribute_definitions=gsi_data_attr_def
506 )
507
508 return True
509 else:
510 msg = 'You need to provide the global_index to the ' \
511 'create_global_secondary_index method'
512 boto.log.error(msg)
513
514 return False
515
516 def delete_global_secondary_index(self, global_index_name):
517 """
518 Deletes a global index in DynamoDB after the table has been created.
519
520 Requires a ``global_index_name`` parameter, which should be a simple
521 string of the name of the global secondary index.
522
523 To update ``global_indexes`` information on the ``Table``, you'll need
524 to call ``Table.describe``.
525
526 Returns ``True`` on success.
527
528 Example::
529
530 # To delete a global index
531 >>> users.delete_global_secondary_index('TheIndexNameHere')
532 True
533
534 """
535
536 if global_index_name:
537 gsi_data = [
538 {
539 "Delete": {
540 "IndexName": global_index_name
541 }
542 }
543 ]
544
545 self.connection.update_table(
546 self.table_name,
547 global_secondary_index_updates=gsi_data,
548 )
549
550 return True
551 else:
552 msg = 'You need to provide the global index name to the ' \
553 'delete_global_secondary_index method'
554 boto.log.error(msg)
555
556 return False
557
558 def update_global_secondary_index(self, global_indexes):
559 """
560 Updates one or more global indexes in DynamoDB after the table has been created.
561
562 Requires a ``global_indexes`` parameter, which should be a
563 dictionary. If provided, it should map each global index name to
564 a dict containing a ``read`` & ``write`` key, both of which
565 should have an integer value associated with them.
566
567 To update ``global_indexes`` information on the ``Table``, you'll need
568 to call ``Table.describe``.
569
570 Returns ``True`` on success.
571
572 Example::
573
574 # To update a global index
575 >>> users.update_global_secondary_index(global_indexes={
576 ... 'TheIndexNameHere': {
577 ... 'read': 15,
578 ... 'write': 5,
579 ... }
580 ... })
581 True
582
583 """
584
585 if global_indexes:
586 gsi_data = []
587
588 for gsi_name, gsi_throughput in global_indexes.items():
589 gsi_data.append({
590 "Update": {
591 "IndexName": gsi_name,
592 "ProvisionedThroughput": {
593 "ReadCapacityUnits": int(gsi_throughput['read']),
594 "WriteCapacityUnits": int(gsi_throughput['write']),
595 },
596 },
597 })
598
599 self.connection.update_table(
600 self.table_name,
601 global_secondary_index_updates=gsi_data,
602 )
603 return True
604 else:
605 msg = 'You need to provide the global indexes to the ' \
606 'update_global_secondary_index method'
607 boto.log.error(msg)
608
609 return False
610
611 def delete(self):
612 """
613 Deletes a table in DynamoDB.
614
615 **IMPORTANT** - Be careful when using this method, there is no undo.
616
617 Returns ``True`` on success.
618
619 Example::
620
621 >>> users.delete()
622 True
623
624 """
625 self.connection.delete_table(self.table_name)
626 return True
627
628 def _encode_keys(self, keys):
629 """
630 Given a flat Python dictionary of keys/values, converts it into the
631 nested dictionary DynamoDB expects.
632
633 Converts::
634
635 {
636 'username': 'john',
637 'tags': [1, 2, 5],
638 }
639
640 ...to...::
641
642 {
643 'username': {'S': 'john'},
644 'tags': {'NS': ['1', '2', '5']},
645 }
646
647 """
648 raw_key = {}
649
650 for key, value in keys.items():
651 raw_key[key] = self._dynamizer.encode(value)
652
653 return raw_key
654
655 def get_item(self, consistent=False, attributes=None, **kwargs):
656 """
657 Fetches an item (record) from a table in DynamoDB.
658
659 To specify the key of the item you'd like to get, you can specify the
660 key attributes as kwargs.
661
662 Optionally accepts a ``consistent`` parameter, which should be a
663 boolean. If you provide ``True``, it will perform
664 a consistent (but more expensive) read from DynamoDB.
665 (Default: ``False``)
666
667 Optionally accepts an ``attributes`` parameter, which should be a
668 list of fieldnames to fetch. (Default: ``None``, which means all fields
669 should be fetched)
670
671 Returns an ``Item`` instance containing all the data for that record.
672
673 Raises an ``ItemNotFound`` exception if the item is not found.
674
675 Example::
676
677 # A simple hash key.
678 >>> john = users.get_item(username='johndoe')
679 >>> john['first_name']
680 'John'
681
682 # A complex hash+range key.
683 >>> john = users.get_item(username='johndoe', last_name='Doe')
684 >>> john['first_name']
685 'John'
686
687 # A consistent read (assuming the data might have just changed).
688 >>> john = users.get_item(username='johndoe', consistent=True)
689 >>> john['first_name']
690 'Johann'
691
692 # With a key that is an invalid variable name in Python.
693 # Also, assumes a different schema than previous examples.
694 >>> john = users.get_item(**{
695 ... 'date-joined': 127549192,
696 ... })
697 >>> john['first_name']
698 'John'
699
700 """
701 raw_key = self._encode_keys(kwargs)
702 item_data = self.connection.get_item(
703 self.table_name,
704 raw_key,
705 attributes_to_get=attributes,
706 consistent_read=consistent
707 )
708 if 'Item' not in item_data:
709 raise exceptions.ItemNotFound("Item %s couldn't be found." % kwargs)
710 item = Item(self)
711 item.load(item_data)
712 return item
713
714 def has_item(self, **kwargs):
715 """
716 Return whether an item (record) exists within a table in DynamoDB.
717
718 To specify the key of the item you'd like to get, you can specify the
719 key attributes as kwargs.
720
721 Optionally accepts a ``consistent`` parameter, which should be a
722 boolean. If you provide ``True``, it will perform
723 a consistent (but more expensive) read from DynamoDB.
724 (Default: ``False``)
725
726 Optionally accepts an ``attributes`` parameter, which should be a
727 list of fieldnames to fetch. (Default: ``None``, which means all fields
728 should be fetched)
729
730 Returns ``True`` if an ``Item`` is present, ``False`` if not.
731
732 Example::
733
734 # Simple, just hash-key schema.
735 >>> users.has_item(username='johndoe')
736 True
737
738 # Complex schema, item not present.
739 >>> users.has_item(
740 ... username='johndoe',
741 ... date_joined='2014-01-07'
742 ... )
743 False
744
745 """
746 try:
747 self.get_item(**kwargs)
748 except (JSONResponseError, exceptions.ItemNotFound):
749 return False
750
751 return True
752
753 def lookup(self, *args, **kwargs):
754 """
755 Look up an entry in DynamoDB. This is mostly backwards compatible
756 with boto.dynamodb. Unlike get_item, it takes hash_key and range_key first,
757 although you may still specify keyword arguments instead.
758
759 Also unlike the get_item command, if the returned item has no keys
760 (i.e., it does not exist in DynamoDB), a None result is returned, instead
761 of an empty key object.
762
763 Example::
764 >>> user = users.lookup(username)
765 >>> user = users.lookup(username, consistent=True)
766 >>> app = apps.lookup('my_customer_id', 'my_app_id')
767
768 """
769 if not self.schema:
770 self.describe()
771 for x, arg in enumerate(args):
772 kwargs[self.schema[x].name] = arg
773 ret = self.get_item(**kwargs)
774 if not ret.keys():
775 return None
776 return ret
777
778 def new_item(self, *args):
779 """
780 Returns a new, blank ``Item``.
781
782 This is mostly for consistency with boto.dynamodb
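
Example (an illustrative sketch; assumes ``users`` was built with a
schema of a single ``HashKey('username')``, so positional args fill
the schema fields in order)::

>>> item = users.new_item('johndoe')
>>> item['first_name'] = 'John'
>>> item.save()
True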
783 """
784 if not self.schema:
785 self.describe()
786 data = {}
787 for x, arg in enumerate(args):
788 data[self.schema[x].name] = arg
789 return Item(self, data=data)
790
791 def put_item(self, data, overwrite=False):
792 """
793 Saves an entire item to DynamoDB.
794
795 By default, if any part of the ``Item``'s original data doesn't match
796 what's currently in DynamoDB, this request will fail. This prevents
797 other processes from updating the data in between when you read the
798 item & when your request to update the item's data is processed, which
799 would typically result in some data loss.
800
801 Requires a ``data`` parameter, which should be a dictionary of the data
802 you'd like to store in DynamoDB.
803
804 Optionally accepts an ``overwrite`` parameter, which should be a
805 boolean. If you provide ``True``, this will tell DynamoDB to blindly
806 overwrite whatever data is present, if any.
807
808 Returns ``True`` on success.
809
810 Example::
811
812 >>> users.put_item(data={
813 ... 'username': 'jane',
814 ... 'first_name': 'Jane',
815 ... 'last_name': 'Doe',
816 ... 'date_joined': 126478915,
817 ... })
818 True
819
820 """
821 item = Item(self, data=data)
822 return item.save(overwrite=overwrite)
823
824 def _put_item(self, item_data, expects=None):
825 """
826 The internal variant of ``put_item`` (full data). This is used by the
827 ``Item`` objects, since that operation is represented at the
828 table-level by the API, but conceptually maps better to telling an
829 individual ``Item`` to save itself.
830 """
831 kwargs = {}
832
833 if expects is not None:
834 kwargs['expected'] = expects
835
836 self.connection.put_item(self.table_name, item_data, **kwargs)
837 return True
838
839 def _update_item(self, key, item_data, expects=None):
840 """
841 The internal variant of ``put_item`` (partial data). This is used by the
842 ``Item`` objects, since that operation is represented at the
843 table-level by the API, but conceptually maps better to telling an
844 individual ``Item`` to save itself.
845 """
846 raw_key = self._encode_keys(key)
847 kwargs = {}
848
849 if expects is not None:
850 kwargs['expected'] = expects
851
852 self.connection.update_item(self.table_name, raw_key, item_data, **kwargs)
853 return True
854
855 def delete_item(self, expected=None, conditional_operator=None, **kwargs):
856 """
857 Deletes a single item. You can perform a conditional delete operation
858 that deletes the item if it exists, or if it has an expected attribute
859 value.
860
861 Conditional deletes are useful for only deleting items if specific
862 conditions are met. If those conditions are met, DynamoDB performs
863 the delete. Otherwise, the item is not deleted.
864
865 To specify the expected attribute values of the item, you can pass a
866 dictionary of conditions to ``expected``. Each condition should follow
867 the pattern ``<attributename>__<comparison_operator>=<value_to_expect>``.
868
869 **IMPORTANT** - Be careful when using this method, there is no undo.
870
871 To specify the key of the item you'd like to get, you can specify the
872 key attributes as kwargs.
873
874 Optionally accepts an ``expected`` parameter which is a dictionary of
875 expected attribute value conditions.
876
877 Optionally accepts a ``conditional_operator`` which applies to the
878 expected attribute value conditions:
879
880 + `AND` - If all of the conditions evaluate to true (default)
881 + `OR` - True if at least one condition evaluates to true
882
883 Returns ``True`` on success, ``False`` on failed conditional delete.
884
885 Example::
886
887 # A simple hash key.
888 >>> users.delete_item(username='johndoe')
889 True
890
891 # A complex hash+range key.
892 >>> users.delete_item(username='jane', last_name='Doe')
893 True
894
895 # With a key that is an invalid variable name in Python.
896 # Also, assumes a different schema than previous examples.
897 >>> users.delete_item(**{
898 ... 'date-joined': 127549192,
899 ... })
900 True
901
902 # Conditional delete
903 >>> users.delete_item(username='johndoe',
904 ... expected={'balance__eq': 0})
905 True
906 """
907 expected = self._build_filters(expected, using=FILTER_OPERATORS)
908 raw_key = self._encode_keys(kwargs)
909
910 try:
911 self.connection.delete_item(self.table_name, raw_key,
912 expected=expected,
913 conditional_operator=conditional_operator)
914 except exceptions.ConditionalCheckFailedException:
915 return False
916
917 return True
918
919 def get_key_fields(self):
920 """
921 Returns the fields necessary to make a key for a table.
922
923 If the ``Table`` does not already have a populated ``schema``,
924 this will request it via a ``Table.describe`` call.
925
926 Returns a list of fieldnames (strings).
927
928 Example::
929
930 # A simple hash key.
931 >>> users.get_key_fields()
932 ['username']
933
934 # A complex hash+range key.
935 >>> users.get_key_fields()
936 ['username', 'last_name']
937
938 """
939 if not self.schema:
940 # We don't know the structure of the table. Get a description to
941 # populate the schema.
942 self.describe()
943
944 return [field.name for field in self.schema]
945
946 def batch_write(self):
947 """
948 Allows the batching of writes to DynamoDB.
949
950 Since each write/delete call to DynamoDB has a cost associated with it,
951 when loading lots of data, it makes sense to batch them, creating as
952 few calls as possible.
953
954 This returns a context manager that will transparently handle creating
955 these batches. The object you get back lightly-resembles a ``Table``
956 object, sharing just the ``put_item`` & ``delete_item`` methods
957 (which are all that DynamoDB can batch in terms of writing data).
958
959 DynamoDB's maximum batch size is 25 items per request. If you attempt
960 to put/delete more than that, the context manager will batch as many
961 as it can up to that number, then flush them to DynamoDB & continue
962 batching as more calls come in.
963
964 Example::
965
966 # Assuming a table with one record...
967 >>> with users.batch_write() as batch:
968 ... batch.put_item(data={
969 ... 'username': 'johndoe',
970 ... 'first_name': 'John',
971 ... 'last_name': 'Doe',
972 ... 'owner': 1,
973 ... })
974 ... # Nothing across the wire yet.
975 ... batch.delete_item(username='bob')
976 ... # Still no requests sent.
977 ... batch.put_item(data={
978 ... 'username': 'jane',
979 ... 'first_name': 'Jane',
980 ... 'last_name': 'Doe',
981 ... 'date_joined': 127436192,
982 ... })
983 ... # Nothing yet, but once we leave the context, the
984 ... # put/deletes will be sent.
985
986 """
987 # PHENOMENAL COSMIC DOCS!!! itty-bitty code.
988 return BatchTable(self)
989
990 def _build_filters(self, filter_kwargs, using=QUERY_OPERATORS):
991 """
992 An internal method for taking query/scan-style ``**kwargs`` & turning
993 them into the raw structure DynamoDB expects for filtering.
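
A sketch of the resulting structure (illustrative, assuming the
default string encoding)::

>>> table._build_filters({'username__eq': 'john'})
{'username': {'AttributeValueList': [{'S': 'john'}], 'ComparisonOperator': 'EQ'}}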
994 """
995 if filter_kwargs is None:
996 return
997
998 filters = {}
999
1000 for field_and_op, value in filter_kwargs.items():
1001 field_bits = field_and_op.split('__')
1002 fieldname = '__'.join(field_bits[:-1])
1003
1004 try:
1005 op = using[field_bits[-1]]
1006 except KeyError:
1007 raise exceptions.UnknownFilterTypeError(
1008 "Operator '%s' from '%s' is not recognized." % (
1009 field_bits[-1],
1010 field_and_op
1011 )
1012 )
1013
1014 lookup = {
1015 'AttributeValueList': [],
1016 'ComparisonOperator': op,
1017 }
1018
1019 # Special-case the ``NULL/NOT_NULL`` case.
1020 if field_bits[-1] == 'null':
1021 del lookup['AttributeValueList']
1022
1023 if value is False:
1024 lookup['ComparisonOperator'] = 'NOT_NULL'
1025 else:
1026 lookup['ComparisonOperator'] = 'NULL'
1027 # Special-case the ``BETWEEN`` case.
1028 elif field_bits[-1] == 'between':
1029 if len(value) == 2 and isinstance(value, (list, tuple)):
1030 lookup['AttributeValueList'].append(
1031 self._dynamizer.encode(value[0])
1032 )
1033 lookup['AttributeValueList'].append(
1034 self._dynamizer.encode(value[1])
1035 )
1036 # Special-case the ``IN`` case
1037 elif field_bits[-1] == 'in':
1038 for val in value:
1039 lookup['AttributeValueList'].append(self._dynamizer.encode(val))
1040 else:
1041 # Fix up the value for encoding, because it was built to only work
1042 # with ``set``s.
1043 if isinstance(value, (list, tuple)):
1044 value = set(value)
1045 lookup['AttributeValueList'].append(
1046 self._dynamizer.encode(value)
1047 )
1048
1049 # Finally, insert it into the filters.
1050 filters[fieldname] = lookup
1051
1052 return filters
1053
1054 def query(self, limit=None, index=None, reverse=False, consistent=False,
1055 attributes=None, max_page_size=None, **filter_kwargs):
1056 """
1057 **WARNING:** This method is provided **strictly** for
1058 backward-compatibility. It returns results in an incorrect order.
1059
1060 If you are writing new code, please use ``Table.query_2``.
1061 """
# Flip the flag so ``query_2`` reproduces this method's old (incorrect) ordering.
1062 reverse = not reverse
1063 return self.query_2(limit=limit, index=index, reverse=reverse,
1064 consistent=consistent, attributes=attributes,
1065 max_page_size=max_page_size, **filter_kwargs)
1066
1067 def query_2(self, limit=None, index=None, reverse=False,
1068 consistent=False, attributes=None, max_page_size=None,
1069 query_filter=None, conditional_operator=None,
1070 **filter_kwargs):
1071 """
1072 Queries for a set of matching items in a DynamoDB table.
1073
1074 Queries can be performed against a hash key, a hash+range key or
1075 against any data stored in your local secondary indexes. Query filters
1076 can be used to filter on arbitrary fields.
1077
1078 **Note** - You can not query against arbitrary fields within the data
1079 stored in DynamoDB unless you specify ``query_filter`` values.
1080
1081 To specify the filters of the items you'd like to get, you can specify
1082 the filters as kwargs. Each filter kwarg should follow the pattern
1083 ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
1084 are specified in the same way.
1085
1086 Optionally accepts a ``limit`` parameter, which should be an integer
1087 count of the total number of items to return. (Default: ``None`` -
1088 all results)
1089
1090 Optionally accepts an ``index`` parameter, which should be a string of
1091 the name of the local secondary index you want to query against.
1092 (Default: ``None``)
1093
1094 Optionally accepts a ``reverse`` parameter, which will present the
1095 results in reverse order. (Default: ``False`` - normal order)
1096
1097 Optionally accepts a ``consistent`` parameter, which should be a
1098 boolean. If you provide ``True``, it will force a consistent read of
1099 the data (more expensive). (Default: ``False`` - use eventually
1100 consistent reads)
1101
1102 Optionally accepts an ``attributes`` parameter, which should be a
1103 tuple. If you provide any attributes, only these will be fetched
1104 from DynamoDB. This uses the ``AttributesToGet`` API parameter
1105 & sets ``Select`` to ``SPECIFIC_ATTRIBUTES``.
1106
1107 Optionally accepts a ``max_page_size`` parameter, which should be an
1108 integer count of the maximum number of items to retrieve
1109 **per-request**. This is useful in making faster requests & preventing
1110 the scan from drowning out other queries. (Default: ``None`` -
1111 fetch as many as DynamoDB will return)
1112
1113 Optionally accepts a ``query_filter`` which is a dictionary of filter
1114 conditions against any arbitrary field in the returned data.
1115
1116 Optionally accepts a ``conditional_operator`` which applies to the
1117 query filter conditions:
1118
1119 + `AND` - True if all filter conditions evaluate to true (default)
1120 + `OR` - True if at least one filter condition evaluates to true
1121
1122 Returns a ``ResultSet`` containing ``Item``s, which transparently
1123 handles the pagination of results you get back.
1124
1125 Example::
1126
1127 # Look for last names equal to "Doe".
1128 >>> results = users.query(last_name__eq='Doe')
1129 >>> for res in results:
1130 ... print res['first_name']
1131 'John'
1132 'Jane'
1133
1134 # Look for last names beginning with "D", in reverse order, limit 3.
1135 >>> results = users.query(
1136 ... last_name__beginswith='D',
1137 ... reverse=True,
1138 ... limit=3
1139 ... )
1140 >>> for res in results:
1141 ... print res['first_name']
1142 'Alice'
1143 'Jane'
1144 'John'
1145
1146 # Use an LSI & a consistent read.
1147 >>> results = users.query(
1148 ... date_joined__gte=1236451000,
1149 ... owner__eq=1,
1150 ... index='DateJoinedIndex',
1151 ... consistent=True
1152 ... )
1153 >>> for res in results:
1154 ... print res['first_name']
1155 'Alice'
1156 'Bob'
1157 'John'
1158 'Fred'
1159
1160 # Filter by non-indexed field(s)
1161 >>> results = users.query(
1162 ... last_name__eq='Doe',
1163 ... reverse=True,
1164 ... query_filter={
1165 ... 'first_name__beginswith': 'A'
1166 ... }
1167 ... )
1168 >>> for res in results:
1169 ... print res['first_name'] + ' ' + res['last_name']
1170 'Alice Doe'
1171
1172 """
1173 if self.schema:
1174 if len(self.schema) == 1:
1175 if len(filter_kwargs) <= 1:
1176 if not self.global_indexes:
1177 # If the schema only has one field, there's <= 1 filter
1178 # param & no Global Secondary Indexes, this is user
1179 # error. Bail early.
1180 raise exceptions.QueryError(
1181 "You must specify more than one key to filter on."
1182 )
1183
1184 if attributes is not None:
1185 select = 'SPECIFIC_ATTRIBUTES'
1186 else:
1187 select = None
1188
1189 results = ResultSet(
1190 max_page_size=max_page_size
1191 )
1192 kwargs = filter_kwargs.copy()
1193 kwargs.update({
1194 'limit': limit,
1195 'index': index,
1196 'reverse': reverse,
1197 'consistent': consistent,
1198 'select': select,
1199 'attributes_to_get': attributes,
1200 'query_filter': query_filter,
1201 'conditional_operator': conditional_operator,
1202 })
1203 results.to_call(self._query, **kwargs)
1204 return results
1205
1206 def query_count(self, index=None, consistent=False, conditional_operator=None,
1207 query_filter=None, scan_index_forward=True, limit=None,
1208 exclusive_start_key=None, **filter_kwargs):
1209 """
1210 Queries the exact count of matching items in a DynamoDB table.
1211
1212 Queries can be performed against a hash key, a hash+range key or
1213 against any data stored in your local secondary indexes. Query filters
1214 can be used to filter on arbitrary fields.
1215
1216 To specify the filters of the items you'd like to get, you can specify
1217 the filters as kwargs. Each filter kwarg should follow the pattern
1218 ``<fieldname>__<filter_operation>=<value_to_look_for>``. Query filters
1219 are specified in the same way.
1220
1221 Optionally accepts an ``index`` parameter, which should be a string of
1222 the name of the local secondary index you want to query against.
1223 (Default: ``None``)
1224
1225 Optionally accepts a ``consistent`` parameter, which should be a
1226 boolean. If you provide ``True``, it will force a consistent read of
1227 the data (more expensive). (Default: ``False`` - use eventually
1228 consistent reads)
1229
1230 Optionally accepts a ``query_filter`` which is a dictionary of filter
1231 conditions against any arbitrary field in the returned data.
1232
1233 Optionally accepts a ``conditional_operator`` which applies to the
1234 query filter conditions:
1235
1236 + `AND` - True if all filter conditions evaluate to true (default)
1237 + `OR` - True if at least one filter condition evaluates to true
1238
1239 Optionally accepts an ``exclusive_start_key``, which is used to get
1240 the remaining items when a query cannot return the complete count.
1241
1242 Returns an integer which represents the exact count of matching
1243 items.
1244
1245 :type scan_index_forward: boolean
1246 :param scan_index_forward: Specifies ascending (true) or descending
1247 (false) traversal of the index. DynamoDB returns results reflecting
1248 the requested order determined by the range key. If the data type
1249 is Number, the results are returned in numeric order. For String,
1250 the results are returned in order of ASCII character code values.
1251 For Binary, DynamoDB treats each byte of the binary data as
1252 unsigned when it compares binary values.
1253
1254 If ScanIndexForward is not specified, the results are returned in
1255 ascending order.
1256
1257 :type limit: integer
1258 :param limit: The maximum number of items to evaluate (not necessarily
1259 the number of matching items).
1260
1261 Example::
1262
1263 # Look for last names equal to "Doe".
1264 >>> users.query_count(last_name__eq='Doe')
1265 5
1266
1267 # Use an LSI & a consistent read.
1268 >>> users.query_count(
1269 ... date_joined__gte=1236451000,
1270 ... owner__eq=1,
1271 ... index='DateJoinedIndex',
1272 ... consistent=True
1273 ... )
1274 2
1275
1276 """
1277 key_conditions = self._build_filters(
1278 filter_kwargs,
1279 using=QUERY_OPERATORS
1280 )
1281
1282 built_query_filter = self._build_filters(
1283 query_filter,
1284 using=FILTER_OPERATORS
1285 )
1286
1287 count_buffer = 0
1288 last_evaluated_key = exclusive_start_key
1289
1290 while True:
1291 raw_results = self.connection.query(
1292 self.table_name,
1293 index_name=index,
1294 consistent_read=consistent,
1295 select='COUNT',
1296 key_conditions=key_conditions,
1297 query_filter=built_query_filter,
1298 conditional_operator=conditional_operator,
1299 limit=limit,
1300 scan_index_forward=scan_index_forward,
1301 exclusive_start_key=last_evaluated_key
1302 )
1303
1304 count_buffer += int(raw_results.get('Count', 0))
1305 last_evaluated_key = raw_results.get('LastEvaluatedKey')
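# Stop when DynamoDB reports no further pages, or when nothing has matched at all.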
1306 if not last_evaluated_key or count_buffer < 1:
1307 break
1308
1309 return count_buffer
1310
1311 def _query(self, limit=None, index=None, reverse=False, consistent=False,
1312 exclusive_start_key=None, select=None, attributes_to_get=None,
1313 query_filter=None, conditional_operator=None, **filter_kwargs):
1314 """
1315 The internal method that performs the actual queries. Used extensively
1316 by ``ResultSet`` to perform each (paginated) request.
1317 """
1318 kwargs = {
1319 'limit': limit,
1320 'index_name': index,
1321 'consistent_read': consistent,
1322 'select': select,
1323 'attributes_to_get': attributes_to_get,
1324 'conditional_operator': conditional_operator,
1325 }
1326
1327 if reverse:
1328 kwargs['scan_index_forward'] = False
1329
1330 if exclusive_start_key:
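# Re-encode the previous page's last key so DynamoDB resumes from it.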
1331 kwargs['exclusive_start_key'] = {}
1332
1333 for key, value in exclusive_start_key.items():
1334 kwargs['exclusive_start_key'][key] = \
1335 self._dynamizer.encode(value)
1336
1337 # Convert the filters into something we can actually use.
1338 kwargs['key_conditions'] = self._build_filters(
1339 filter_kwargs,
1340 using=QUERY_OPERATORS
1341 )
1342
1343 kwargs['query_filter'] = self._build_filters(
1344 query_filter,
1345 using=FILTER_OPERATORS
1346 )
1347
1348 raw_results = self.connection.query(
1349 self.table_name,
1350 **kwargs
1351 )
1352 results = []
1353 last_key = None
1354
1355 for raw_item in raw_results.get('Items', []):
1356 item = Item(self)
1357 item.load({
1358 'Item': raw_item,
1359 })
1360 results.append(item)
1361
1362 if raw_results.get('LastEvaluatedKey', None):
1363 last_key = {}
1364
1365 for key, value in raw_results['LastEvaluatedKey'].items():
1366 last_key[key] = self._dynamizer.decode(value)
1367
1368 return {
1369 'results': results,
1370 'last_key': last_key,
1371 }
1372
1373 def scan(self, limit=None, segment=None, total_segments=None,
1374 max_page_size=None, attributes=None, conditional_operator=None,
1375 **filter_kwargs):
1376 """
1377 Scans across all items within a DynamoDB table.
1378
1379 Scans can be performed against a hash key or a hash+range key. You can
1380 additionally filter the results after the table has been read but
1381 before the response is returned by using query filters.
1382
1383 To specify the filters of the items you'd like to get, you can specify
1384 the filters as kwargs. Each filter kwarg should follow the pattern
1385 ``<fieldname>__<filter_operation>=<value_to_look_for>``.
1386
1387 Optionally accepts a ``limit`` parameter, which should be an integer
1388 count of the total number of items to return. (Default: ``None`` -
1389 all results)
1390
1391 Optionally accepts a ``segment`` parameter, which should be an integer
1392 of the segment to retrieve. Please see the documentation about
1393 Parallel Scans (Default: ``None`` - no segments)
1394
1395 Optionally accepts a ``total_segments`` parameter, which should be an
1396 integer count of number of segments to divide the table into.
1397 Please see the documentation about Parallel Scans (Default: ``None`` -
1398 no segments)
1399
1400 Optionally accepts a ``max_page_size`` parameter, which should be an
1401 integer count of the maximum number of items to retrieve
1402 **per-request**. This is useful in making faster requests & preventing
1403 the scan from drowning out other queries. (Default: ``None`` -
1404 fetch as many as DynamoDB will return)
1405
1406 Optionally accepts an ``attributes`` parameter, which should be a
1407 tuple. If you provide any attributes, only these will be fetched
1408 from DynamoDB. This uses the ``AttributesToGet`` API parameter
1409 & sets ``Select`` to ``SPECIFIC_ATTRIBUTES``.
1410
1411 Returns a ``ResultSet``, which transparently handles the pagination of
1412 results you get back.
1413
1414 Example::
1415
1416 # All results.
1417 >>> everything = users.scan()
1418
1419 # Look for last names beginning with "D".
1420 >>> results = users.scan(last_name__beginswith='D')
1421 >>> for res in results:
1422 ... print res['first_name']
1423 'Alice'
1424 'John'
1425 'Jane'
1426
1427 # Use an ``IN`` filter & limit.
1428 >>> results = users.scan(
1429 ... age__in=[25, 26, 27, 28, 29],
1430 ... limit=1
1431 ... )
1432 >>> for res in results:
1433 ... print res['first_name']
1434 'Alice'
1435
1436 """
1437 results = ResultSet(
1438 max_page_size=max_page_size
1439 )
1440 kwargs = filter_kwargs.copy()
1441 kwargs.update({
1442 'limit': limit,
1443 'segment': segment,
1444 'total_segments': total_segments,
1445 'attributes': attributes,
1446 'conditional_operator': conditional_operator,
1447 })
1448 results.to_call(self._scan, **kwargs)
1449 return results
1450
1451 def _scan(self, limit=None, exclusive_start_key=None, segment=None,
1452 total_segments=None, attributes=None, conditional_operator=None,
1453 **filter_kwargs):
1454 """
1455 The internal method that performs the actual scan. Used extensively
1456 by ``ResultSet`` to perform each (paginated) request.
1457 """
1458 kwargs = {
1459 'limit': limit,
1460 'segment': segment,
1461 'total_segments': total_segments,
1462 'attributes_to_get': attributes,
1463 'conditional_operator': conditional_operator,
1464 }
1465
1466 if exclusive_start_key:
1467 kwargs['exclusive_start_key'] = {}
1468
1469 for key, value in exclusive_start_key.items():
1470 kwargs['exclusive_start_key'][key] = \
1471 self._dynamizer.encode(value)
1472
1473 # Convert the filters into something we can actually use.
1474 kwargs['scan_filter'] = self._build_filters(
1475 filter_kwargs,
1476 using=FILTER_OPERATORS
1477 )
1478
1479 raw_results = self.connection.scan(
1480 self.table_name,
1481 **kwargs
1482 )
1483 results = []
1484 last_key = None
1485
1486 for raw_item in raw_results.get('Items', []):
1487 item = Item(self)
1488 item.load({
1489 'Item': raw_item,
1490 })
1491 results.append(item)
1492
1493 if raw_results.get('LastEvaluatedKey', None):
1494 last_key = {}
1495
1496 for key, value in raw_results['LastEvaluatedKey'].items():
1497 last_key[key] = self._dynamizer.decode(value)
1498
1499 return {
1500 'results': results,
1501 'last_key': last_key,
1502 }
1503
1504 def batch_get(self, keys, consistent=False, attributes=None):
1505 """
1506 Fetches many specific items in batch from a table.
1507
1508 Requires a ``keys`` parameter, which should be a list of dictionaries.
1509 Each dictionary should consist of the key values to specify.
1510
1511 Optionally accepts a ``consistent`` parameter, which should be a
1512 boolean. If you provide ``True``, a strongly consistent read will be
1513 used. (Default: False)
1514
1515 Optionally accepts an ``attributes`` parameter, which should be a
1516 tuple. If you provide any attributes, only these will be fetched
1517 from DynamoDB.
1518
1519 Returns a ``ResultSet``, which transparently handles the pagination of
1520 results you get back.
1521
1522 Example::
1523
1524 >>> results = users.batch_get(keys=[
1525 ... {
1526 ... 'username': 'johndoe',
1527 ... },
1528 ... {
1529 ... 'username': 'jane',
1530 ... },
1531 ... {
1532 ... 'username': 'fred',
1533 ... },
1534 ... ])
1535 >>> for res in results:
1536 ... print res['first_name']
1537 'John'
1538 'Jane'
1539 'Fred'
1540
1541 """
1542 # We pass the keys to the constructor instead, so it can maintain its
1543 # own internal state as to what keys have been processed.
1544 results = BatchGetResultSet(keys=keys, max_batch_get=self.max_batch_get)
1545 results.to_call(self._batch_get, consistent=consistent, attributes=attributes)
1546 return results
1547
1548 def _batch_get(self, keys, consistent=False, attributes=None):
1549 """
1550 The internal method that performs the actual batch get. Used extensively
1551 by ``BatchGetResultSet`` to perform each (paginated) request.
1552 """
1553 items = {
1554 self.table_name: {
1555 'Keys': [],
1556 },
1557 }
1558
1559 if consistent:
1560 items[self.table_name]['ConsistentRead'] = True
1561
1562 if attributes is not None:
1563 items[self.table_name]['AttributesToGet'] = attributes
1564
1565 for key_data in keys:
1566 raw_key = {}
1567
1568 for key, value in key_data.items():
1569 raw_key[key] = self._dynamizer.encode(value)
1570
1571 items[self.table_name]['Keys'].append(raw_key)
1572
1573 raw_results = self.connection.batch_get_item(request_items=items)
1574 results = []
1575 unprocessed_keys = []
1576
1577 for raw_item in raw_results['Responses'].get(self.table_name, []):
1578 item = Item(self)
1579 item.load({
1580 'Item': raw_item,
1581 })
1582 results.append(item)
1583
1584 raw_unprocessed = raw_results.get('UnprocessedKeys', {}).get(self.table_name, {})
1585
1586 for raw_key in raw_unprocessed.get('Keys', []):
1587 py_key = {}
1588
1589 for key, value in raw_key.items():
1590 py_key[key] = self._dynamizer.decode(value)
1591
1592 unprocessed_keys.append(py_key)
1593
1594 return {
1595 'results': results,
1596 # NEVER return a ``last_key``. Just in-case any part of
1597 # ``ResultSet`` peeks through, since much of the
1598 # original underlying implementation is based on this key.
1599 'last_key': None,
1600 'unprocessed_keys': unprocessed_keys,
1601 }
1602
1603 def count(self):
1604 """
1605 Returns a (very) eventually consistent count of the number of items
1606 in a table.
1607
1608 Lag time is about 6 hours, so don't expect a high degree of accuracy.
1609
1610 Example::
1611
1612 >>> users.count()
1613 6
1614
1615 """
1616 info = self.describe()
1617 return info['Table'].get('ItemCount', 0)
1618
1619
1620 class BatchTable(object):
1621 """
1622 Used by ``Table`` as the context manager for batch writes.
1623
1624 You likely don't want to try to use this object directly.
1625 """
1626 def __init__(self, table):
1627 self.table = table
1628 self._to_put = []
1629 self._to_delete = []
1630 self._unprocessed = []
1631
1632 def __enter__(self):
1633 return self
1634
1635 def __exit__(self, type, value, traceback):
1636 if self._to_put or self._to_delete:
1637 # Flush anything that's left.
1638 self.flush()
1639
1640 if self._unprocessed:
1641 # Finally, handle anything that wasn't processed.
1642 self.resend_unprocessed()
1643
1644 def put_item(self, data, overwrite=False):
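# Note: 'overwrite' is accepted for parity with Table.put_item; a batched
# put always overwrites any existing item.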
1645 self._to_put.append(data)
1646
1647 if self.should_flush():
1648 self.flush()
1649
1650 def delete_item(self, **kwargs):
1651 self._to_delete.append(kwargs)
1652
1653 if self.should_flush():
1654 self.flush()
1655
1656 def should_flush(self):
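# DynamoDB's BatchWriteItem call accepts at most 25 put/delete requests.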
1657 if len(self._to_put) + len(self._to_delete) == 25:
1658 return True
1659
1660 return False
1661
1662 def flush(self):
1663 batch_data = {
1664 self.table.table_name: [
1665 # We'll insert data here shortly.
1666 ],
1667 }
1668
1669 for put in self._to_put:
1670 item = Item(self.table, data=put)
1671 batch_data[self.table.table_name].append({
1672 'PutRequest': {
1673 'Item': item.prepare_full(),
1674 }
1675 })
1676
1677 for delete in self._to_delete:
1678 batch_data[self.table.table_name].append({
1679 'DeleteRequest': {
1680 'Key': self.table._encode_keys(delete),
1681 }
1682 })
1683
1684 resp = self.table.connection.batch_write_item(batch_data)
1685 self.handle_unprocessed(resp)
1686
1687 self._to_put = []
1688 self._to_delete = []
1689 return True
1690
1691 def handle_unprocessed(self, resp):
1692 if len(resp.get('UnprocessedItems', [])):
1693 table_name = self.table.table_name
1694 unprocessed = resp['UnprocessedItems'].get(table_name, [])
1695
1696 # Some items have not been processed. Stow them for now &
1697 # re-attempt processing on ``__exit__``.
1698 msg = "%s items were unprocessed. Storing for later."
1699 boto.log.info(msg % len(unprocessed))
1700 self._unprocessed.extend(unprocessed)
1701
1702 def resend_unprocessed(self):
1703 # If there are unprocessed records (for instance, the user was over
1704 # their throughput limitations), iterate over them & send until they're
1705 # all there.
1706 boto.log.info(
1707 "Re-sending %s unprocessed items." % len(self._unprocessed)
1708 )
1709
1710 while len(self._unprocessed):
1711 # Again, do 25 at a time.
1712 to_resend = self._unprocessed[:25]
1713 # Remove them from the list.
1714 self._unprocessed = self._unprocessed[25:]
1715 batch_data = {
1716 self.table.table_name: to_resend
1717 }
1718 boto.log.info("Sending %s items" % len(to_resend))
1719 resp = self.table.connection.batch_write_item(batch_data)
1720 self.handle_unprocessed(resp)
1721 boto.log.info(
1722 "%s unprocessed items left" % len(self._unprocessed)
1723 )