comparison env/lib/python3.9/site-packages/boto/glacier/concurrent.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21 #
22 import os
23 import math
24 import threading
25 import hashlib
26 import time
27 import logging
28 from boto.compat import Queue
29 import binascii
30
31 from boto.glacier.utils import DEFAULT_PART_SIZE, minimum_part_size, \
32 chunk_hashes, tree_hash, bytes_to_hex
33 from boto.glacier.exceptions import UploadArchiveError, \
34 DownloadArchiveError, \
35 TreeHashDoesNotMatchError
36
37
# Unique marker object. One copy per worker thread is appended to the work
# queue after the real work items; a worker that dequeues it cleans up and
# exits.
_END_SENTINEL = object()
# Logger shared by every class in this module.
log = logging.getLogger('boto.glacier.concurrent')
40
41
class ConcurrentTransferer(object):
    """Shared plumbing for the concurrent uploader and downloader.

    Remembers the requested part size and thread count, keeps track of
    the worker threads that were started, and offers helpers for sizing
    parts, filling the work queue and stopping the workers.
    """

    def __init__(self, part_size=DEFAULT_PART_SIZE, num_threads=10):
        """
        :type part_size: int
        :param part_size: Requested size, in bytes, of each part.

        :type num_threads: int
        :param num_threads: Number of worker threads to use.
        """
        self._threads = []
        self._num_threads = num_threads
        self._part_size = part_size

    def _calculate_required_part_size(self, total_size):
        """Work out how many parts a transfer needs and how big each is.

        The configured part size is kept unless ``minimum_part_size``
        reports a larger value for ``total_size``; in that case the
        larger value is used and the substitution is logged.

        :param total_size: Total number of bytes to transfer.
        :rtype: tuple
        :return: ``(total_parts, part_size)``
        """
        smallest_allowed = minimum_part_size(total_size)
        part_size = self._part_size
        if part_size < smallest_allowed:
            part_size = smallest_allowed
            log.debug("The part size specified (%s) is smaller than "
                      "the minimum required part size. Using a part "
                      "size of: %s", self._part_size, part_size)
        part_count = math.ceil(total_size / float(part_size))
        return int(part_count), part_size

    def _shutdown_threads(self):
        """Ask every tracked worker to stop, then wait for all of them."""
        log.debug("Shutting down threads.")
        # Flag all workers first so they wind down in parallel; only
        # then block on each one in turn.
        for worker in self._threads:
            worker.should_continue = False
        for worker in self._threads:
            worker.join()
        log.debug("Threads have exited.")

    def _add_work_items_to_queue(self, total_parts, worker_queue, part_size):
        """Queue one ``(part_number, part_size)`` item per part.

        One end sentinel per worker thread is appended afterwards so
        that every worker eventually sees one and exits.
        """
        log.debug("Adding work items to queue.")
        for part_number in range(total_parts):
            worker_queue.put((part_number, part_size))
        for _ in range(self._num_threads):
            worker_queue.put(_END_SENTINEL)
74
75
class ConcurrentUploader(ConcurrentTransferer):
    """Upload an archive to glacier using several threads at once.

    The archive is sent through the multipart upload API, with a pool
    of worker threads each uploading individual parts.

    The pool is created, driven and torn down entirely by this class;
    callers never interact with the threads directly.

    """
    def __init__(self, api, vault_name, part_size=DEFAULT_PART_SIZE,
                 num_threads=10):
        """
        :type api: :class:`boto.glacier.layer1.Layer1`
        :param api: A layer1 glacier object.

        :type vault_name: str
        :param vault_name: The name of the vault.

        :type part_size: int
        :param part_size: The size, in bytes, of the chunks to use when
            uploading the archive parts.  The part size must be a
            megabyte multiplied by a power of two.

        :type num_threads: int
        :param num_threads: The number of threads to spawn for the thread
            pool.  This controls how many parts are uploaded concurrently.

        """
        super(ConcurrentUploader, self).__init__(part_size, num_threads)
        self._vault_name = vault_name
        self._api = api

    def upload(self, filename, description=None):
        """Create an archive from a local file, uploading parts in parallel.

        The part size given to the constructor is used *unless* it is
        smaller than the minimum part size required for a file of this
        size, in which case that minimum is used instead.

        :type filename: str
        :param filename: Path of the file to upload.

        :type description: str
        :param description: The description of the archive.

        :rtype: str
        :return: The archive id of the newly created archive.

        :raises UploadArchiveError: if any part fails to upload; the
            multipart upload is aborted before the error is re-raised.

        """
        total_size = os.stat(filename).st_size
        total_parts, part_size = self._calculate_required_part_size(total_size)
        worker_queue = Queue()
        result_queue = Queue()
        hash_chunks = [None] * total_parts
        initiated = self._api.initiate_multipart_upload(self._vault_name,
                                                        part_size,
                                                        description)
        upload_id = initiated['UploadId']
        # Only part offsets (never file contents) go on the work queue.
        # The workers drain that queue and report each part's tree hash
        # on the result queue; those hashes are what is needed to
        # complete the multipart upload.
        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
        self._start_upload_threads(result_queue, upload_id,
                                   worker_queue, filename)
        try:
            self._wait_for_upload_threads(hash_chunks, result_queue,
                                          total_parts)
        except UploadArchiveError as err:
            log.debug("An error occurred while uploading an archive, "
                      "aborting multipart upload.")
            self._api.abort_multipart_upload(self._vault_name, upload_id)
            raise err
        log.debug("Completing upload.")
        archive_tree_hash = bytes_to_hex(tree_hash(hash_chunks))
        completed = self._api.complete_multipart_upload(
            self._vault_name, upload_id, archive_tree_hash, total_size)
        log.debug("Upload finished.")
        return completed['ArchiveId']

    def _wait_for_upload_threads(self, hash_chunks, result_queue, total_parts):
        """Collect one result per part and record each part's tree hash.

        ``hash_chunks`` is filled in place, indexed by part number.  The
        worker threads are shut down before returning or raising.

        :raises UploadArchiveError: if a worker reported an exception.
        """
        remaining = total_parts
        while remaining > 0:
            remaining -= 1
            outcome = result_queue.get()
            if isinstance(outcome, Exception):
                log.debug("An error was found in the result queue, terminating "
                          "threads: %s", outcome)
                self._shutdown_threads()
                raise UploadArchiveError("An error occurred while uploading "
                                         "an archive: %s" % outcome)
            # A successful unit of work yields the part number and the
            # tree hash of that part; the per-part hashes are combined
            # later into the tree hash of the whole archive.
            index, part_tree_hash = outcome
            hash_chunks[index] = part_tree_hash
        self._shutdown_threads()

    def _start_upload_threads(self, result_queue, upload_id, worker_queue,
                              filename):
        """Create, start and track ``num_threads`` upload workers."""
        log.debug("Starting threads.")
        for _ in range(self._num_threads):
            worker = UploadWorkerThread(self._api, self._vault_name, filename,
                                        upload_id, worker_queue, result_queue)
            # Brief pause before each start staggers the workers.
            time.sleep(0.2)
            worker.start()
            self._threads.append(worker)
185
186
class TransferThread(threading.Thread):
    """Worker thread that pulls items off a work queue and reports results.

    Subclasses override :meth:`_process_chunk` to do the real work for a
    single item and :meth:`_cleanup` to release any resources they hold.
    """

    def __init__(self, worker_queue, result_queue):
        """
        :param worker_queue: Queue of work items, terminated by the
            module's end sentinel.
        :param result_queue: Queue that receives the value returned by
            ``_process_chunk`` for every work item handled.
        """
        super(TransferThread, self).__init__()
        self._worker_queue = worker_queue
        self._result_queue = result_queue
        # This value can be set externally by other objects
        # to indicate that the thread should be shut down.
        self.should_continue = True

    def run(self):
        """Process work items until told to stop or the sentinel arrives.

        The queue is polled with a one second timeout so that a change
        to ``should_continue`` is noticed even while the queue is empty.
        ``_cleanup`` is called exactly once before the method returns.
        """
        # The timeout exception must be in scope for the ``except``
        # clause below; it is not imported at module level, so it is
        # imported here.  Assumes the work queue is a standard-library
        # queue (as boto.compat.Queue appears to be) -- TODO confirm.
        try:
            from queue import Empty
        except ImportError:  # Python 2
            from Queue import Empty

        while self.should_continue:
            try:
                work = self._worker_queue.get(timeout=1)
            except Empty:
                # Nothing available yet; re-check should_continue.
                continue
            if work is _END_SENTINEL:
                break
            result = self._process_chunk(work)
            self._result_queue.put(result)
        self._cleanup()

    def _process_chunk(self, work):
        """Handle one work item; the base implementation does nothing."""
        pass

    def _cleanup(self):
        """Release resources on exit; the base implementation does nothing."""
        pass
214
215
class UploadWorkerThread(TransferThread):
    """Worker that uploads individual parts of one file to a vault."""

    def __init__(self, api, vault_name, filename, upload_id,
                 worker_queue, result_queue, num_retries=5,
                 time_between_retries=5,
                 retry_exceptions=Exception):
        """
        :param api: Object whose ``upload_part`` method sends a part.
        :param vault_name: Name of the destination vault.
        :param filename: Path of the file being uploaded; it is opened
            for binary reading here and closed in ``_cleanup``.
        :param upload_id: Id of the multipart upload the parts belong to.
        :param worker_queue: Queue of ``(part_number, part_size)`` items.
        :param result_queue: Queue that receives each part's outcome.
        :param num_retries: Extra attempts allowed after the first one.
        :param time_between_retries: Seconds to sleep after a failure.
        :param retry_exceptions: Exception class (or tuple of classes)
            that triggers a retry.
        """
        super(UploadWorkerThread, self).__init__(worker_queue, result_queue)
        self._api = api
        self._vault_name = vault_name
        self._upload_id = upload_id
        self._filename = filename
        self._fileobj = open(filename, 'rb')
        self._num_retries = num_retries
        self._time_between_retries = time_between_retries
        self._retry_exceptions = retry_exceptions

    def _process_chunk(self, work):
        """Upload one part, retrying on failure.

        :return: The ``(part_number, tree_hash_bytes)`` tuple from a
            successful upload, or the last exception caught if every
            attempt failed.
        """
        last_failure = None
        total_attempts = self._num_retries + 1
        for attempt in range(1, total_attempts + 1):
            try:
                return self._upload_chunk(work)
            except self._retry_exceptions as exc:
                log.error("Exception caught uploading part number %s for "
                          "vault %s, attempt: (%s / %s), filename: %s, "
                          "exception: %s, msg: %s",
                          work[0], self._vault_name, attempt, total_attempts,
                          self._filename, exc.__class__, exc)
                time.sleep(self._time_between_retries)
                # Keep a reference: the ``as`` name does not outlive
                # the except block on Python 3.
                last_failure = exc
        return last_failure

    def _upload_chunk(self, work):
        """Read one part from the file and send it with ``upload_part``.

        :param work: ``(part_number, part_size)`` tuple.
        :return: ``(part_number, tree_hash_bytes)`` for the part.
        """
        part_number, part_size = work
        offset = part_number * part_size
        self._fileobj.seek(offset)
        payload = self._fileobj.read(part_size)
        sha256_hex = hashlib.sha256(payload).hexdigest()
        tree_hash_bytes = tree_hash(chunk_hashes(payload))
        # The range end is inclusive and based on the bytes actually
        # read, which may be fewer than part_size.
        last_byte = offset + len(payload) - 1
        log.debug("Uploading chunk %s of size %s", part_number, part_size)
        response = self._api.upload_part(self._vault_name, self._upload_id,
                                         sha256_hex,
                                         bytes_to_hex(tree_hash_bytes),
                                         (offset, last_byte), payload)
        # Reading the response allows the connection to be reused.
        response.read()
        return (part_number, tree_hash_bytes)

    def _cleanup(self):
        """Close the file handle opened in the constructor."""
        self._fileobj.close()
266
267
class ConcurrentDownloader(ConcurrentTransferer):
    """
    Concurrently download an archive from glacier.

    This class uses a thread pool to concurrently download an archive
    from glacier.

    The threadpool is completely managed by this class and is
    transparent to the users of this class.

    """
    def __init__(self, job, part_size=DEFAULT_PART_SIZE,
                 num_threads=10):
        """
        :param job: A layer2 job object for archive retrieval object.

        :param part_size: The size, in bytes, of the chunks to use when
            downloading the archive parts.  The part size must be a
            megabyte multiplied by a power of two.

        :param num_threads: The number of threads to spawn for the thread
            pool.

        """
        super(ConcurrentDownloader, self).__init__(part_size, num_threads)
        self._job = job

    def download(self, filename):
        """
        Concurrently download an archive.

        :param filename: The filename to download the archive to
        :type filename: str

        :raises DownloadArchiveError: if a part could not be downloaded.
        :raises TreeHashDoesNotMatchError: if the tree hash computed over
            the downloaded parts differs from the job's recorded hash.

        """
        total_size = self._job.archive_size
        total_parts, part_size = self._calculate_required_part_size(total_size)
        worker_queue = Queue()
        result_queue = Queue()
        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
        self._start_download_threads(result_queue, worker_queue)
        try:
            self._wait_for_download_threads(filename, result_queue, total_parts)
        except DownloadArchiveError as e:
            log.debug("An error occurred while downloading an archive: %s", e)
            raise
        log.debug("Download completed.")

    def _wait_for_download_threads(self, filename, result_queue, total_parts):
        """
        Wait for one result per part, write each part into ``filename``
        at its offset, then verify the tree hash of the whole archive.

        The worker threads are always shut down before this method
        returns or raises, including on unexpected errors such as a
        failed file write.

        :param filename: Path of the file the archive is written to.
        :param result_queue: Queue the workers put their results on.
        :param total_parts: Number of results to wait for.

        :raises DownloadArchiveError: if a worker reported an exception.
        :raises TreeHashDoesNotMatchError: if the final tree hash does
            not equal ``self._job.sha256_treehash``.
        """
        hash_chunks = [None] * total_parts
        try:
            with open(filename, "wb") as f:
                for _ in range(total_parts):
                    result = result_queue.get()
                    if isinstance(result, Exception):
                        log.debug("An error was found in the result queue, "
                                  "terminating threads: %s", result)
                        raise DownloadArchiveError(
                            "An error occurred while downloading "
                            "an archive: %s" % result)
                    part_number, part_size, actual_hash, data = result
                    hash_chunks[part_number] = actual_hash
                    # Parts can arrive in any order, so each one is
                    # written at its own offset.
                    f.seek(part_number * part_size)
                    f.write(data)
                    f.flush()
            final_hash = bytes_to_hex(tree_hash(hash_chunks))
            log.debug("Verifying final tree hash of archive, expecting: %s, "
                      "actual: %s", self._job.sha256_treehash, final_hash)
            if self._job.sha256_treehash != final_hash:
                raise TreeHashDoesNotMatchError(
                    "Tree hash for entire archive does not match, "
                    "expected: %s, got: %s" % (self._job.sha256_treehash,
                                               final_hash))
        finally:
            # Single exit point: stop the workers on success and on
            # every failure path.
            self._shutdown_threads()

    def _start_download_threads(self, result_queue, worker_queue):
        """Create, start and track ``num_threads`` download workers."""
        log.debug("Starting threads.")
        for _ in range(self._num_threads):
            thread = DownloadWorkerThread(self._job, worker_queue, result_queue)
            # Brief pause before each start staggers the workers.
            time.sleep(0.2)
            thread.start()
            self._threads.append(thread)
359
360
class DownloadWorkerThread(TransferThread):
    def __init__(self, job,
                 worker_queue, result_queue,
                 num_retries=5,
                 time_between_retries=5,
                 retry_exceptions=Exception):
        """
        Individual download thread that downloads parts of an archive
        from Glacier.  The parts to download are taken from the work
        queue and each part's data is handed back on the result queue.

        :param job: Glacier job object
        :param worker_queue: A queue of tuples which include the
            part_number and part_size
        :param result_queue: A queue that receives, for every part,
            either a ``(part_number, part_size, tree_hash_bytes, data)``
            tuple or the exception that made the download fail.
        :param num_retries: Extra attempts allowed after the first one.
        :param time_between_retries: Seconds to sleep after a failure.
        :param retry_exceptions: Exception class (or tuple of classes)
            that triggers a retry.

        """
        super(DownloadWorkerThread, self).__init__(worker_queue, result_queue)
        self._job = job
        self._num_retries = num_retries
        self._time_between_retries = time_between_retries
        self._retry_exceptions = retry_exceptions

    def _process_chunk(self, work):
        """
        Attempt to download a part of the archive from Glacier, retrying
        on failure.

        One initial attempt plus ``num_retries`` retries are made, the
        same as the upload worker, so ``num_retries=0`` still performs a
        single attempt.

        :param work: ``(part_number, part_size)`` tuple.
        :return: The tuple produced by ``_download_chunk`` on success,
            or the last exception caught if every attempt failed.
        """
        result = None
        total_attempts = self._num_retries + 1
        for attempt in range(1, total_attempts + 1):
            try:
                result = self._download_chunk(work)
                break
            except self._retry_exceptions as e:
                log.error("Exception caught downloading part number %s for "
                          "job %s, attempt: (%s / %s), exception: %s, "
                          "msg: %s", work[0], self._job, attempt,
                          total_attempts, e.__class__, e)
                time.sleep(self._time_between_retries)
                result = e
        return result

    def _download_chunk(self, work):
        """
        Download one chunk of the archive from Glacier and verify its
        tree hash against the one reported in the response.

        :param work: ``(part_number, part_size)`` tuple.
        :return: ``(part_number, part_size, tree_hash_bytes, data)``
        :raises TreeHashDoesNotMatchError: if the computed tree hash of
            the data differs from the response's ``TreeHash``.
        """
        part_number, part_size = work
        start_byte = part_number * part_size
        # Inclusive byte range for this part.
        byte_range = (start_byte, start_byte + part_size - 1)
        log.debug("Downloading chunk %s of size %s", part_number, part_size)
        response = self._job.get_output(byte_range)
        data = response.read()
        actual_hash = bytes_to_hex(tree_hash(chunk_hashes(data)))
        expected_hash = response['TreeHash']
        if expected_hash != actual_hash:
            raise TreeHashDoesNotMatchError(
                "Tree hash for part number %s does not match, "
                "expected: %s, got: %s" % (part_number, expected_hash,
                                           actual_hash))
        return (part_number, part_size, binascii.unhexlify(actual_hash), data)