comparison env/lib/python3.9/site-packages/boto/sqs/queue.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 """
23 Represents an SQS Queue
24 """
25 from boto.compat import urllib
26 from boto.sqs.message import Message
27
28
29 class Queue(object):
30
31 def __init__(self, connection=None, url=None, message_class=Message):
32 self.connection = connection
33 self.url = url
34 self.message_class = message_class
35 self.visibility_timeout = None
36
37 def __repr__(self):
38 return 'Queue(%s)' % self.url
39
40 def _id(self):
41 if self.url:
42 val = urllib.parse.urlparse(self.url)[2]
43 else:
44 val = self.url
45 return val
46 id = property(_id)
47
48 def _name(self):
49 if self.url:
50 val = urllib.parse.urlparse(self.url)[2].split('/')[2]
51 else:
52 val = self.url
53 return val
54 name = property(_name)
55
56 def _arn(self):
57 parts = self.id.split('/')
58 if self.connection.region.name == 'cn-north-1':
59 partition = 'aws-cn'
60 else:
61 partition = 'aws'
62 return 'arn:%s:sqs:%s:%s:%s' % (
63 partition, self.connection.region.name, parts[1], parts[2])
64 arn = property(_arn)
65
66 def startElement(self, name, attrs, connection):
67 return None
68
69 def endElement(self, name, value, connection):
70 if name == 'QueueUrl':
71 self.url = value
72 elif name == 'VisibilityTimeout':
73 self.visibility_timeout = int(value)
74 else:
75 setattr(self, name, value)
76
77 def set_message_class(self, message_class):
78 """
79 Set the message class that should be used when instantiating
80 messages read from the queue. By default, the class
81 :class:`boto.sqs.message.Message` is used but this can be overriden
82 with any class that behaves like a message.
83
84 :type message_class: Message-like class
85 :param message_class: The new Message class
86 """
87 self.message_class = message_class
88
89 def get_attributes(self, attributes='All'):
90 """
91 Retrieves attributes about this queue object and returns
92 them in an Attribute instance (subclass of a Dictionary).
93
94 :type attributes: string
95 :param attributes: String containing one of:
96 ApproximateNumberOfMessages,
97 ApproximateNumberOfMessagesNotVisible,
98 VisibilityTimeout,
99 CreatedTimestamp,
100 LastModifiedTimestamp,
101 Policy
102 ReceiveMessageWaitTimeSeconds
103 :rtype: Attribute object
104 :return: An Attribute object which is a mapping type holding the
105 requested name/value pairs
106 """
107 return self.connection.get_queue_attributes(self, attributes)
108
109 def set_attribute(self, attribute, value):
110 """
111 Set a new value for an attribute of the Queue.
112
113 :type attribute: String
114 :param attribute: The name of the attribute you want to set.
115
116 :param value: The new value for the attribute must be:
117
118
119 * For `DelaySeconds` the value must be an integer number of
120 seconds from 0 to 900 (15 minutes).
121 >>> queue.set_attribute('DelaySeconds', 900)
122
123 * For `MaximumMessageSize` the value must be an integer number of
124 bytes from 1024 (1 KiB) to 262144 (256 KiB).
125 >>> queue.set_attribute('MaximumMessageSize', 262144)
126
127 * For `MessageRetentionPeriod` the value must be an integer number of
128 seconds from 60 (1 minute) to 1209600 (14 days).
129 >>> queue.set_attribute('MessageRetentionPeriod', 1209600)
130
131 * For `Policy` the value must be an string that contains JSON formatted
132 parameters and values.
133 >>> queue.set_attribute('Policy', json.dumps({
134 ... 'Version': '2008-10-17',
135 ... 'Id': '/123456789012/testQueue/SQSDefaultPolicy',
136 ... 'Statement': [
137 ... {
138 ... 'Sid': 'Queue1ReceiveMessage',
139 ... 'Effect': 'Allow',
140 ... 'Principal': {
141 ... 'AWS': '*'
142 ... },
143 ... 'Action': 'SQS:ReceiveMessage',
144 ... 'Resource': 'arn:aws:aws:sqs:us-east-1:123456789012:testQueue'
145 ... }
146 ... ]
147 ... }))
148
149 * For `ReceiveMessageWaitTimeSeconds` the value must be an integer number of
150 seconds from 0 to 20.
151 >>> queue.set_attribute('ReceiveMessageWaitTimeSeconds', 20)
152
153 * For `VisibilityTimeout` the value must be an integer number of
154 seconds from 0 to 43200 (12 hours).
155 >>> queue.set_attribute('VisibilityTimeout', 43200)
156
157 * For `RedrivePolicy` the value must be an string that contains JSON formatted
158 parameters and values. You can set maxReceiveCount to a value between 1 and 1000.
159 The deadLetterTargetArn value is the Amazon Resource Name (ARN) of the queue that
160 will receive the dead letter messages.
161 >>> queue.set_attribute('RedrivePolicy', json.dumps({
162 ... 'maxReceiveCount': 5,
163 ... 'deadLetterTargetArn': "arn:aws:aws:sqs:us-east-1:123456789012:testDeadLetterQueue"
164 ... }))
165
166 :rtype: bool
167 :return: True if successful, otherwise False.
168 """
169 return self.connection.set_queue_attribute(self, attribute, value)
170
171 def get_timeout(self):
172 """
173 Get the visibility timeout for the queue.
174
175 :rtype: int
176 :return: The number of seconds as an integer.
177 """
178 a = self.get_attributes('VisibilityTimeout')
179 return int(a['VisibilityTimeout'])
180
181 def set_timeout(self, visibility_timeout):
182 """
183 Set the visibility timeout for the queue.
184
185 :type visibility_timeout: int
186 :param visibility_timeout: The desired timeout in seconds
187 """
188 retval = self.set_attribute('VisibilityTimeout', visibility_timeout)
189 if retval:
190 self.visibility_timeout = visibility_timeout
191 return retval
192
193 def add_permission(self, label, aws_account_id, action_name):
194 """
195 Add a permission to a queue.
196
197 :type label: str or unicode
198 :param label: A unique identification of the permission you are setting.
199 Maximum of 80 characters ``[0-9a-zA-Z_-]``
200 Example, AliceSendMessage
201
202 :type aws_account_id: str or unicode
203 :param principal_id: The AWS account number of the principal who
204 will be given permission. The principal must have an AWS account,
205 but does not need to be signed up for Amazon SQS. For information
206 about locating the AWS account identification.
207
208 :type action_name: str or unicode
209 :param action_name: The action. Valid choices are:
210 SendMessage|ReceiveMessage|DeleteMessage|
211 ChangeMessageVisibility|GetQueueAttributes|*
212
213 :rtype: bool
214 :return: True if successful, False otherwise.
215
216 """
217 return self.connection.add_permission(self, label, aws_account_id,
218 action_name)
219
220 def remove_permission(self, label):
221 """
222 Remove a permission from a queue.
223
224 :type label: str or unicode
225 :param label: The unique label associated with the permission
226 being removed.
227
228 :rtype: bool
229 :return: True if successful, False otherwise.
230 """
231 return self.connection.remove_permission(self, label)
232
233 def read(self, visibility_timeout=None, wait_time_seconds=None,
234 message_attributes=None):
235 """
236 Read a single message from the queue.
237
238 :type visibility_timeout: int
239 :param visibility_timeout: The timeout for this message in seconds
240
241 :type wait_time_seconds: int
242 :param wait_time_seconds: The duration (in seconds) for which the call
243 will wait for a message to arrive in the queue before returning.
244 If a message is available, the call will return sooner than
245 wait_time_seconds.
246
247 :type message_attributes: list
248 :param message_attributes: The name(s) of additional message
249 attributes to return. The default is to return no additional
250 message attributes. Use ``['All']`` or ``['.*']`` to return all.
251
252 :rtype: :class:`boto.sqs.message.Message`
253 :return: A single message or None if queue is empty
254 """
255 rs = self.get_messages(1, visibility_timeout,
256 wait_time_seconds=wait_time_seconds,
257 message_attributes=message_attributes)
258 if len(rs) == 1:
259 return rs[0]
260 else:
261 return None
262
263 def write(self, message, delay_seconds=None):
264 """
265 Add a single message to the queue.
266
267 :type message: Message
268 :param message: The message to be written to the queue
269
270 :rtype: :class:`boto.sqs.message.Message`
271 :return: The :class:`boto.sqs.message.Message` object that was written.
272 """
273 new_msg = self.connection.send_message(self,
274 message.get_body_encoded(), delay_seconds=delay_seconds,
275 message_attributes=message.message_attributes)
276 message.id = new_msg.id
277 message.md5 = new_msg.md5
278 return message
279
280 def write_batch(self, messages):
281 """
282 Delivers up to 10 messages in a single request.
283
284 :type messages: List of lists.
285 :param messages: A list of lists or tuples. Each inner
286 tuple represents a single message to be written
287 and consists of and ID (string) that must be unique
288 within the list of messages, the message body itself
289 which can be a maximum of 64K in length, an
290 integer which represents the delay time (in seconds)
291 for the message (0-900) before the message will
292 be delivered to the queue, and an optional dict of
293 message attributes like those passed to ``send_message``
294 in the connection class.
295 """
296 return self.connection.send_message_batch(self, messages)
297
298 def new_message(self, body='', **kwargs):
299 """
300 Create new message of appropriate class.
301
302 :type body: message body
303 :param body: The body of the newly created message (optional).
304
305 :rtype: :class:`boto.sqs.message.Message`
306 :return: A new Message object
307 """
308 m = self.message_class(self, body, **kwargs)
309 m.queue = self
310 return m
311
312 # get a variable number of messages, returns a list of messages
313 def get_messages(self, num_messages=1, visibility_timeout=None,
314 attributes=None, wait_time_seconds=None,
315 message_attributes=None):
316 """
317 Get a variable number of messages.
318
319 :type num_messages: int
320 :param num_messages: The maximum number of messages to read from
321 the queue.
322
323 :type visibility_timeout: int
324 :param visibility_timeout: The VisibilityTimeout for the messages read.
325
326 :type attributes: str
327 :param attributes: The name of additional attribute to return
328 with response or All if you want all attributes. The
329 default is to return no additional attributes. Valid
330 values: All SenderId SentTimestamp ApproximateReceiveCount
331 ApproximateFirstReceiveTimestamp
332
333 :type wait_time_seconds: int
334 :param wait_time_seconds: The duration (in seconds) for which the call
335 will wait for a message to arrive in the queue before returning.
336 If a message is available, the call will return sooner than
337 wait_time_seconds.
338
339 :type message_attributes: list
340 :param message_attributes: The name(s) of additional message
341 attributes to return. The default is to return no additional
342 message attributes. Use ``['All']`` or ``['.*']`` to return all.
343
344 :rtype: list
345 :return: A list of :class:`boto.sqs.message.Message` objects.
346 """
347 return self.connection.receive_message(
348 self, number_messages=num_messages,
349 visibility_timeout=visibility_timeout, attributes=attributes,
350 wait_time_seconds=wait_time_seconds,
351 message_attributes=message_attributes)
352
353 def delete_message(self, message):
354 """
355 Delete a message from the queue.
356
357 :type message: :class:`boto.sqs.message.Message`
358 :param message: The :class:`boto.sqs.message.Message` object to delete.
359
360 :rtype: bool
361 :return: True if successful, False otherwise
362 """
363 return self.connection.delete_message(self, message)
364
365 def delete_message_batch(self, messages):
366 """
367 Deletes a list of messages in a single request.
368
369 :type messages: List of :class:`boto.sqs.message.Message` objects.
370 :param messages: A list of message objects.
371 """
372 return self.connection.delete_message_batch(self, messages)
373
374 def change_message_visibility_batch(self, messages):
375 """
376 A batch version of change_message_visibility that can act
377 on up to 10 messages at a time.
378
379 :type messages: List of tuples.
380 :param messages: A list of tuples where each tuple consists
381 of a :class:`boto.sqs.message.Message` object and an integer
382 that represents the new visibility timeout for that message.
383 """
384 return self.connection.change_message_visibility_batch(self, messages)
385
386 def delete(self):
387 """
388 Delete the queue.
389 """
390 return self.connection.delete_queue(self)
391
392 def purge(self):
393 """
394 Purge all messages in the queue.
395 """
396 return self.connection.purge_queue(self)
397
398 def clear(self, page_size=10, vtimeout=10):
399 """Deprecated utility function to remove all messages from a queue"""
400 return self.purge()
401
402 def count(self, page_size=10, vtimeout=10):
403 """
404 Utility function to count the number of messages in a queue.
405 Note: This function now calls GetQueueAttributes to obtain
406 an 'approximate' count of the number of messages in a queue.
407 """
408 a = self.get_attributes('ApproximateNumberOfMessages')
409 return int(a['ApproximateNumberOfMessages'])
410
411 def count_slow(self, page_size=10, vtimeout=10):
412 """
413 Deprecated. This is the old 'count' method that actually counts
414 the messages by reading them all. This gives an accurate count but
415 is very slow for queues with non-trivial number of messasges.
416 Instead, use get_attributes('ApproximateNumberOfMessages') to take
417 advantage of the new SQS capability. This is retained only for
418 the unit tests.
419 """
420 n = 0
421 l = self.get_messages(page_size, vtimeout)
422 while l:
423 for m in l:
424 n += 1
425 l = self.get_messages(page_size, vtimeout)
426 return n
427
428 def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
429 """Utility function to dump the messages in a queue to a file
430 NOTE: Page size must be < 10 else SQS errors"""
431 fp = open(file_name, 'wb')
432 n = 0
433 l = self.get_messages(page_size, vtimeout)
434 while l:
435 for m in l:
436 fp.write(m.get_body())
437 if sep:
438 fp.write(sep)
439 n += 1
440 l = self.get_messages(page_size, vtimeout)
441 fp.close()
442 return n
443
444 def save_to_file(self, fp, sep='\n'):
445 """
446 Read all messages from the queue and persist them to file-like object.
447 Messages are written to the file and the 'sep' string is written
448 in between messages. Messages are deleted from the queue after
449 being written to the file.
450 Returns the number of messages saved.
451 """
452 n = 0
453 m = self.read()
454 while m:
455 n += 1
456 fp.write(m.get_body())
457 if sep:
458 fp.write(sep)
459 self.delete_message(m)
460 m = self.read()
461 return n
462
463 def save_to_filename(self, file_name, sep='\n'):
464 """
465 Read all messages from the queue and persist them to local file.
466 Messages are written to the file and the 'sep' string is written
467 in between messages. Messages are deleted from the queue after
468 being written to the file.
469 Returns the number of messages saved.
470 """
471 fp = open(file_name, 'wb')
472 n = self.save_to_file(fp, sep)
473 fp.close()
474 return n
475
476 # for backwards compatibility
477 save = save_to_filename
478
479 def save_to_s3(self, bucket):
480 """
481 Read all messages from the queue and persist them to S3.
482 Messages are stored in the S3 bucket using a naming scheme of::
483
484 <queue_id>/<message_id>
485
486 Messages are deleted from the queue after being saved to S3.
487 Returns the number of messages saved.
488 """
489 n = 0
490 m = self.read()
491 while m:
492 n += 1
493 key = bucket.new_key('%s/%s' % (self.id, m.id))
494 key.set_contents_from_string(m.get_body())
495 self.delete_message(m)
496 m = self.read()
497 return n
498
499 def load_from_s3(self, bucket, prefix=None):
500 """
501 Load messages previously saved to S3.
502 """
503 n = 0
504 if prefix:
505 prefix = '%s/' % prefix
506 else:
507 prefix = '%s/' % self.id[1:]
508 rs = bucket.list(prefix=prefix)
509 for key in rs:
510 n += 1
511 m = self.new_message(key.get_contents_as_string())
512 self.write(m)
513 return n
514
515 def load_from_file(self, fp, sep='\n'):
516 """Utility function to load messages from a file-like object to a queue"""
517 n = 0
518 body = ''
519 l = fp.readline()
520 while l:
521 if l == sep:
522 m = Message(self, body)
523 self.write(m)
524 n += 1
525 print('writing message %d' % n)
526 body = ''
527 else:
528 body = body + l
529 l = fp.readline()
530 return n
531
532 def load_from_filename(self, file_name, sep='\n'):
533 """Utility function to load messages from a local filename to a queue"""
534 fp = open(file_name, 'rb')
535 n = self.load_from_file(fp, sep)
536 fp.close()
537 return n
538
539 # for backward compatibility
540 load = load_from_filename
541