comparison env/lib/python3.9/site-packages/boto/sqs/connection.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 import boto
23 from boto.connection import AWSQueryConnection
24 from boto.sqs.regioninfo import SQSRegionInfo
25 from boto.sqs.queue import Queue
26 from boto.sqs.message import Message
27 from boto.sqs.attributes import Attributes
28 from boto.sqs.batchresults import BatchResults
29 from boto.exception import SQSError, BotoServerError
30
31
class SQSConnection(AWSQueryConnection):
    """
    A Connection to the SQS Service.

    The class-level values below are evaluated once, when the class body
    runs. Three of them are looked up in the ``Boto`` section of
    ``boto.config`` with a literal passed as the final argument.
    """
    # Region name used by __init__ when the caller passes no region.
    # The last argument ('us-east-1') is presumably the fallback used when
    # the option is absent from the config -- confirm against boto.config.get.
    DefaultRegionName = boto.config.get('Boto', 'sqs_region_name', 'us-east-1')
    # Endpoint paired with DefaultRegionName when __init__ builds the
    # default SQSRegionInfo.
    DefaultRegionEndpoint = boto.config.get('Boto', 'sqs_region_endpoint',
                                            'queue.amazonaws.com')
    # SQS API version string; configurable through the 'sqs_version' option.
    APIVersion = boto.config.get('Boto', 'sqs_version', '2012-11-05')
    DefaultContentType = 'text/plain'
    # Exception class associated with this connection; get_queue() catches
    # SQSError to report a missing queue as None.
    ResponseError = SQSError
    # NOTE(review): presumably the service name used when signing requests;
    # confirm in the base class.
    AuthServiceName = 'sqs'
43
def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
             is_secure=True, port=None, proxy=None, proxy_port=None,
             proxy_user=None, proxy_pass=None, debug=0,
             https_connection_factory=None, region=None, path='/',
             security_token=None, validate_certs=True, profile_name=None):
    """
    Set up a connection to SQS.

    :type region: :class:`boto.sqs.regioninfo.SQSRegionInfo`
    :param region: The region to talk to. When this is falsy, a region
        is built from ``DefaultRegionName`` and ``DefaultRegionEndpoint``.

    Every other argument is handed unchanged to the base-class
    initializer; the host passed to it is the endpoint of the chosen
    region.
    """
    # Fall back to the class-level default region when none was given.
    self.region = region or SQSRegionInfo(self, self.DefaultRegionName,
                                          self.DefaultRegionEndpoint)
    endpoint_host = self.region.endpoint
    base_init = super(SQSConnection, self).__init__
    base_init(aws_access_key_id, aws_secret_access_key,
              is_secure, port,
              proxy, proxy_port,
              proxy_user, proxy_pass,
              endpoint_host, debug,
              https_connection_factory, path,
              security_token=security_token,
              validate_certs=validate_certs,
              profile_name=profile_name)
    # Set only after the base class has finished initializing.
    self.auth_region_name = self.region.name
64
65 def _required_auth_capability(self):
66 return ['hmac-v4']
67
def create_queue(self, queue_name, visibility_timeout=None):
    """
    Create an SQS Queue.

    :type queue_name: str or unicode
    :param queue_name: The name of the new queue.  Names are
        scoped to an account and need to be unique within that
        account.  Calling this method on an existing queue name
        will not return an error from SQS unless the value for
        visibility_timeout is different than the value of the
        existing queue of that name.  This is still an expensive
        operation, though, and not the preferred way to check for
        the existence of a queue.  See the
        :func:`boto.sqs.connection.SQSConnection.lookup` method.

    :type visibility_timeout: int
    :param visibility_timeout: The default visibility timeout for
        all messages written in the queue.  This can be overridden
        on a per-message basis.  ``None`` (the default) leaves the
        attribute out of the request; any other value, including
        ``0``, is converted with ``int()`` and sent.

    :rtype: :class:`boto.sqs.queue.Queue`
    :return: The newly created queue.

    """
    params = {'QueueName': queue_name}
    # Compare against None rather than relying on truthiness: 0 is a
    # legal visibility timeout and must not be silently dropped.
    if visibility_timeout is not None:
        params['Attribute.1.Name'] = 'VisibilityTimeout'
        params['Attribute.1.Value'] = int(visibility_timeout)
    return self.get_object('CreateQueue', params, Queue)
97
def delete_queue(self, queue, force_deletion=False):
    """
    Delete an SQS Queue.

    :type queue: A Queue object
    :param queue: The SQS queue to be deleted; its ``id`` is used as
        the request path.

    :type force_deletion: Boolean
    :param force_deletion: Deprecated and ignored; SQS's API no longer
        uses it.

    :rtype: bool
    :return: True if the command succeeded, False otherwise
    """
    queue_path = queue.id
    # DeleteQueue takes no request parameters.
    return self.get_status('DeleteQueue', None, queue_path)
113
def purge_queue(self, queue):
    """
    Purge all messages in an SQS Queue.

    :type queue: A Queue object
    :param queue: The SQS queue to be purged; its ``id`` is used as
        the request path.

    :rtype: bool
    :return: True if the command succeeded, False otherwise
    """
    queue_path = queue.id
    # PurgeQueue takes no request parameters.
    return self.get_status('PurgeQueue', None, queue_path)
125
def get_queue_attributes(self, queue, attribute='All'):
    """
    Fetch one attribute, or every attribute, of a Queue.

    :type queue: A Queue object
    :param queue: The SQS queue to get attributes for

    :type attribute: str
    :param attribute: The specific attribute requested.  Defaults to
        ``'All'``, which returns all attributes.  Valid attributes
        are:

        * All
        * ApproximateNumberOfMessages
        * ApproximateNumberOfMessagesNotVisible
        * VisibilityTimeout
        * CreatedTimestamp
        * LastModifiedTimestamp
        * Policy
        * MaximumMessageSize
        * MessageRetentionPeriod
        * QueueArn
        * ApproximateNumberOfMessagesDelayed
        * DelaySeconds
        * ReceiveMessageWaitTimeSeconds
        * RedrivePolicy

    :rtype: :class:`boto.sqs.attributes.Attributes`
    :return: An Attributes object containing request value(s).
    """
    request = dict(AttributeName=attribute)
    return self.get_object('GetQueueAttributes', request, Attributes,
                           queue.id)
159
def set_queue_attribute(self, queue, attribute, value):
    """
    Set a new value for an attribute of a Queue.

    :type queue: A Queue object
    :param queue: The SQS queue whose attribute is being set

    :type attribute: String
    :param attribute: The name of the attribute you want to set.

    :param value: The new value for the attribute.  What is accepted
        depends on the attribute:

        * ``DelaySeconds``: an integer number of seconds from 0 to
          900 (15 minutes).
          >>> connection.set_queue_attribute(queue, 'DelaySeconds', 900)

        * ``MaximumMessageSize``: an integer number of bytes from
          1024 (1 KiB) to 262144 (256 KiB).
          >>> connection.set_queue_attribute(queue, 'MaximumMessageSize', 262144)

        * ``MessageRetentionPeriod``: an integer number of seconds
          from 60 (1 minute) to 1209600 (14 days).
          >>> connection.set_queue_attribute(queue, 'MessageRetentionPeriod', 1209600)

        * ``Policy``: a string containing JSON formatted parameters
          and values.
          >>> connection.set_queue_attribute(queue, 'Policy', json.dumps({
          ...     'Version': '2008-10-17',
          ...     'Id': '/123456789012/testQueue/SQSDefaultPolicy',
          ...     'Statement': [
          ...        {
          ...            'Sid': 'Queue1ReceiveMessage',
          ...            'Effect': 'Allow',
          ...            'Principal': {
          ...                'AWS': '*'
          ...            },
          ...            'Action': 'SQS:ReceiveMessage',
          ...            'Resource': 'arn:aws:sqs:us-east-1:123456789012:testQueue'
          ...        }
          ...    ]
          ... }))

        * ``ReceiveMessageWaitTimeSeconds``: an integer number of
          seconds from 0 to 20.
          >>> connection.set_queue_attribute(queue, 'ReceiveMessageWaitTimeSeconds', 20)

        * ``VisibilityTimeout``: an integer number of seconds from 0
          to 43200 (12 hours).
          >>> connection.set_queue_attribute(queue, 'VisibilityTimeout', 43200)

        * ``RedrivePolicy``: a string containing JSON formatted
          parameters and values.  ``maxReceiveCount`` may be set to a
          value between 1 and 1000.  ``deadLetterTargetArn`` is the
          Amazon Resource Name (ARN) of the queue that will receive
          the dead letter messages.
          >>> connection.set_queue_attribute(queue, 'RedrivePolicy', json.dumps({
          ...    'maxReceiveCount': 5,
          ...    'deadLetterTargetArn': "arn:aws:sqs:us-east-1:123456789012:testDeadLetterQueue"
          ... }))

    :rtype: bool
    :return: The result of the ``SetQueueAttributes`` status call.
    """
    params = {}
    params['Attribute.Name'] = attribute
    params['Attribute.Value'] = value
    return self.get_status('SetQueueAttributes', params, queue.id)
222
def receive_message(self, queue, number_messages=1,
                    visibility_timeout=None, attributes=None,
                    wait_time_seconds=None, message_attributes=None):
    """
    Read messages from an SQS Queue.

    :type queue: A Queue object
    :param queue: The Queue from which messages are read.

    :type number_messages: int
    :param number_messages: The maximum number of messages to read
        (default=1)

    :type visibility_timeout: int
    :param visibility_timeout: The number of seconds the message should
        remain invisible to other queue readers
        (default=None which uses the Queues default)

    :type attributes: str
    :param attributes: The name of additional attribute to return
        with response or All if you want all attributes.  The
        default is to return no additional attributes.  Valid
        values:

        * All
        * SenderId
        * SentTimestamp
        * ApproximateReceiveCount
        * ApproximateFirstReceiveTimestamp

    :type wait_time_seconds: int
    :param wait_time_seconds: The duration (in seconds) for which the call
        will wait for a message to arrive in the queue before returning.
        If a message is available, the call will return sooner than
        wait_time_seconds.

    :type message_attributes: list
    :param message_attributes: The name(s) of additional message
        attributes to return. The default is to return no additional
        message attributes. Use ``['All']`` or ``['.*']`` to return all.

    :rtype: list
    :return: A list of :class:`boto.sqs.message.Message` objects.

    """
    query = {'MaxNumberOfMessages': number_messages}

    # Each optional argument is added only when the caller supplied it;
    # an explicit 0 still counts as supplied.
    if visibility_timeout is not None:
        query['VisibilityTimeout'] = visibility_timeout
    if attributes is not None:
        self.build_list_params(query, attributes, 'AttributeName')
    if wait_time_seconds is not None:
        query['WaitTimeSeconds'] = wait_time_seconds
    if message_attributes is not None:
        self.build_list_params(query, message_attributes,
                               'MessageAttributeName')

    # Results are built with the queue's own message class.
    markers = [('Message', queue.message_class)]
    return self.get_list('ReceiveMessage', query, markers, queue.id, queue)
280
def delete_message(self, queue, message):
    """
    Delete a message from a queue.

    :type queue: A :class:`boto.sqs.queue.Queue` object
    :param queue: The Queue from which messages are read.

    :type message: A :class:`boto.sqs.message.Message` object
    :param message: The Message to be deleted; its ``receipt_handle``
        identifies it in the request.

    :rtype: bool
    :return: True if successful, False otherwise.
    """
    handle = message.receipt_handle
    return self.get_status('DeleteMessage', {'ReceiptHandle': handle},
                           queue.id)
296
def delete_message_batch(self, queue, messages):
    """
    Deletes a list of messages from a queue in a single request.

    :type queue: A :class:`boto.sqs.queue.Queue` object.
    :param queue: The Queue from which the messages are deleted.

    :type messages: List of :class:`boto.sqs.message.Message` objects.
    :param messages: A list of message objects.  Each contributes its
        ``id`` and ``receipt_handle`` to a numbered request entry.

    :rtype: :class:`boto.sqs.batchresults.BatchResults`
    :return: The result object built from the batch response.
    """
    # One template covers both per-entry keys; entries are numbered from 1.
    entry_key = 'DeleteMessageBatchRequestEntry.%i.%s'
    params = {}
    for position, msg in enumerate(messages, 1):
        params[entry_key % (position, 'Id')] = msg.id
        params[entry_key % (position, 'ReceiptHandle')] = msg.receipt_handle
    return self.get_object('DeleteMessageBatch', params, BatchResults,
                           queue.id, verb='POST')
316
def delete_message_from_handle(self, queue, receipt_handle):
    """
    Delete a message from a queue, given a receipt handle.

    :type queue: A :class:`boto.sqs.queue.Queue` object
    :param queue: The Queue from which messages are read.

    :type receipt_handle: str
    :param receipt_handle: The receipt handle for the message

    :rtype: bool
    :return: True if successful, False otherwise.
    """
    request = dict(ReceiptHandle=receipt_handle)
    return self.get_status('DeleteMessage', request, queue.id)
332
def send_message(self, queue, message_content, delay_seconds=None,
                 message_attributes=None):
    """
    Send a new message to the queue.

    :type queue: A :class:`boto.sqs.queue.Queue` object.
    :param queue: The Queue to which the messages will be written.

    :type message_content: string
    :param message_content: The body of the message

    :type delay_seconds: int
    :param delay_seconds: Number of seconds (0 - 900) to delay this
        message from being processed.  ``None`` (the default) leaves
        the parameter out of the request; any other value, including
        ``0``, is converted with ``int()`` and sent.

    :type message_attributes: dict
    :param message_attributes: Message attributes to set. Should be
        of the form:

        {
            "name1": {
                "data_type": "Number",
                "string_value": "1"
            },
            "name2": {
                "data_type": "String",
                "string_value": "Bob"
            }
        }

        Recognized keys in each inner dict are ``data_type``,
        ``string_value``, ``binary_value``, ``string_list_value`` and
        ``binary_list_value``; keys that are absent are skipped.

    :rtype: :class:`boto.sqs.message.Message`
    :return: The message object built from the response.
    """
    params = {'MessageBody': message_content}
    # Compare against None rather than relying on truthiness: an explicit
    # delay of 0 is a legal value and must reach the service.
    if delay_seconds is not None:
        params['DelaySeconds'] = int(delay_seconds)

    if message_attributes is not None:
        # (key in the caller's dict, field name in the request)
        value_fields = (
            ('data_type', 'DataType'),
            ('string_value', 'StringValue'),
            ('binary_value', 'BinaryValue'),
            ('string_list_value', 'StringListValue'),
            ('binary_list_value', 'BinaryListValue'),
        )
        # Attributes are numbered from 1 in sorted-name order so the
        # request is the same on every call for the same input.
        for i, name in enumerate(sorted(message_attributes), start=1):
            attribute = message_attributes[name]
            prefix = 'MessageAttribute.%s' % i
            params['%s.Name' % prefix] = name
            for key, request_field in value_fields:
                if key in attribute:
                    params['%s.Value.%s' % (prefix, request_field)] = \
                        attribute[key]

    return self.get_object('SendMessage', params, Message,
                           queue.id, verb='POST')
391
def send_message_batch(self, queue, messages):
    """
    Delivers up to 10 messages to a queue in a single request.

    :type queue: A :class:`boto.sqs.queue.Queue` object.
    :param queue: The Queue to which the messages will be written.

    :type messages: List of lists.
    :param messages: A list of lists or tuples.  Each inner
        tuple represents a single message to be written
        and consists of an ID (string) that must be unique
        within the list of messages, the message body itself
        which can be a maximum of 64K in length, an
        integer which represents the delay time (in seconds)
        for the message (0-900) before the message will
        be delivered to the queue, and an optional dict of
        message attributes like those passed to ``send_message``
        above.  A fourth element that is ``None`` or an empty dict
        is treated the same as leaving it out.

    :rtype: :class:`boto.sqs.batchresults.BatchResults`
    :return: The result object built from the batch response.
    """
    # (key in the caller's dict, field name in the request)
    value_fields = (
        ('data_type', 'DataType'),
        ('string_value', 'StringValue'),
        ('binary_value', 'BinaryValue'),
        ('string_list_value', 'StringListValue'),
        ('binary_list_value', 'BinaryListValue'),
    )
    params = {}
    for i, msg in enumerate(messages, start=1):
        base = 'SendMessageBatchRequestEntry.%i' % i
        params['%s.Id' % base] = msg[0]
        params['%s.MessageBody' % base] = msg[1]
        params['%s.DelaySeconds' % base] = msg[2]

        # The attribute dict is optional.  Tolerate None or an empty dict
        # in the fourth slot instead of failing on a missing .keys().
        attributes = msg[3] if len(msg) > 3 else None
        if not attributes:
            continue

        attr_base = base + '.MessageAttribute'
        # Attributes are numbered from 1 in sorted-name order.
        for j, name in enumerate(sorted(attributes), start=1):
            attribute = attributes[name]
            prefix = '%s.%i' % (attr_base, j)
            params['%s.Name' % prefix] = name
            for key, request_field in value_fields:
                if key in attribute:
                    params['%s.Value.%s' % (prefix, request_field)] = \
                        attribute[key]

    return self.get_object('SendMessageBatch', params, BatchResults,
                           queue.id, verb='POST')
445
def change_message_visibility(self, queue, receipt_handle,
                              visibility_timeout):
    """
    Extends the read lock timeout for the specified message from
    the specified queue to the specified value.

    :type queue: A :class:`boto.sqs.queue.Queue` object
    :param queue: The Queue from which messages are read.

    :type receipt_handle: str
    :param receipt_handle: The receipt handle associated with the message
        whose visibility timeout will be changed.

    :type visibility_timeout: int
    :param visibility_timeout: The new value of the message's visibility
        timeout in seconds.

    :rtype: bool
    :return: The result of the ``ChangeMessageVisibility`` status call.
    """
    params = {}
    params['ReceiptHandle'] = receipt_handle
    params['VisibilityTimeout'] = visibility_timeout
    return self.get_status('ChangeMessageVisibility', params, queue.id)
466
def change_message_visibility_batch(self, queue, messages):
    """
    A batch version of change_message_visibility that can act
    on up to 10 messages at a time.

    :type queue: A :class:`boto.sqs.queue.Queue` object.
    :param queue: The Queue that holds the messages.

    :type messages: List of tuples.
    :param messages: A list of tuples where each tuple consists
        of a :class:`boto.sqs.message.Message` object and an integer
        that represents the new visibility timeout for that message.

    :rtype: :class:`boto.sqs.batchresults.BatchResults`
    :return: The result object built from the batch response.
    """
    # One template covers all three per-entry keys; entries are numbered
    # from 1.
    entry_key = 'ChangeMessageVisibilityBatchRequestEntry.%i.%s'
    params = {}
    for position, pair in enumerate(messages, 1):
        message = pair[0]
        params[entry_key % (position, 'Id')] = message.id
        params[entry_key % (position, 'ReceiptHandle')] = \
            message.receipt_handle
        params[entry_key % (position, 'VisibilityTimeout')] = pair[1]
    return self.get_object('ChangeMessageVisibilityBatch',
                           params, BatchResults,
                           queue.id, verb='POST')
492
def get_all_queues(self, prefix=''):
    """
    Retrieves all queues.

    :keyword str prefix: Optionally, only return queues that start with
        this value.  A falsy prefix sends no filter at all.
    :rtype: list
    :returns: A list of :py:class:`boto.sqs.queue.Queue` instances.
    """
    params = {'QueueNamePrefix': prefix} if prefix else {}
    markers = [('QueueUrl', Queue)]
    return self.get_list('ListQueues', params, markers)
506
def get_queue(self, queue_name, owner_acct_id=None):
    """
    Retrieves the queue with the given name, or ``None`` if no match
    was found.

    :param str queue_name: The name of the queue to retrieve.
    :param str owner_acct_id: Optionally, the AWS account ID of the
        account that created the queue.
    :rtype: :py:class:`boto.sqs.queue.Queue` or ``None``
    :returns: The requested queue, or ``None`` if the lookup raised
        :class:`SQSError`.
    """
    params = {'QueueName': queue_name}
    if owner_acct_id:
        params['QueueOwnerAWSAccountId'] = owner_acct_id
    try:
        found = self.get_object('GetQueueUrl', params, Queue)
    except SQSError:
        # Any SQSError from the lookup is reported as "no such queue".
        found = None
    return found

# Alias: ``lookup`` is another name for ``get_queue``.
lookup = get_queue
526
def get_dead_letter_source_queues(self, queue):
    """
    Retrieves the dead letter source queues for a given queue.

    :type queue: A :class:`boto.sqs.queue.Queue` object.
    :param queue: The queue for which to get DL source queues; its
        ``url`` is sent as the ``QueueUrl`` parameter.
    :rtype: list
    :returns: A list of :py:class:`boto.sqs.queue.Queue` instances.
    """
    request = dict(QueueUrl=queue.url)
    markers = [('QueueUrl', Queue)]
    return self.get_list('ListDeadLetterSourceQueues', request, markers)
539
540 #
541 # Permissions methods
542 #
543
def add_permission(self, queue, label, aws_account_id, action_name):
    """
    Add a permission to a queue.

    :type queue: :class:`boto.sqs.queue.Queue`
    :param queue: The queue object

    :type label: str or unicode
    :param label: A unique identification of the permission you are setting.
        Maximum of 80 characters ``[0-9a-zA-Z_-]``
        Example, AliceSendMessage

    :type aws_account_id: str or unicode
    :param aws_account_id: The AWS account number of the principal
        who will be given permission.  The principal must have an
        AWS account, but does not need to be signed up for Amazon
        SQS.

    :type action_name: str or unicode
    :param action_name: The action.  Valid choices are:

        * ``*``
        * SendMessage
        * ReceiveMessage
        * DeleteMessage
        * ChangeMessageVisibility
        * GetQueueAttributes

    :rtype: bool
    :return: True if successful, False otherwise.

    """
    params = {}
    params['Label'] = label
    params['AWSAccountId'] = aws_account_id
    params['ActionName'] = action_name
    return self.get_status('AddPermission', params, queue.id)
580
def remove_permission(self, queue, label):
    """
    Remove a permission from a queue.

    :type queue: :class:`boto.sqs.queue.Queue`
    :param queue: The queue object

    :type label: str or unicode
    :param label: The unique label associated with the permission
        being removed.

    :rtype: bool
    :return: True if successful, False otherwise.
    """
    request = dict(Label=label)
    return self.get_status('RemovePermission', request, queue.id)