diff env/lib/python3.9/site-packages/boto/glacier/concurrent.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/boto/glacier/concurrent.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,425 @@
+# Copyright (c) 2012 Amazon.com, Inc. or its affiliates.  All Rights Reserved
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+#
+import os
+import math
+import threading
+import hashlib
+import time
+import logging
+from boto.compat import Queue
+# ``Empty`` is raised by Queue.get(timeout=...) in TransferThread.run().
+from queue import Empty
+import binascii
+
+from boto.glacier.utils import DEFAULT_PART_SIZE, minimum_part_size, \
+                               chunk_hashes, tree_hash, bytes_to_hex
+from boto.glacier.exceptions import UploadArchiveError, \
+                                    DownloadArchiveError, \
+                                    TreeHashDoesNotMatchError
+
+
+_END_SENTINEL = object()
+log = logging.getLogger('boto.glacier.concurrent')
+
+
+class ConcurrentTransferer(object):
+    def __init__(self, part_size=DEFAULT_PART_SIZE, num_threads=10):
+        self._part_size = part_size
+        self._num_threads = num_threads
+        self._threads = []
+
+    def _calculate_required_part_size(self, total_size):
+        min_part_size_required = minimum_part_size(total_size)
+        if self._part_size >= min_part_size_required:
+            part_size = self._part_size
+        else:
+            part_size = min_part_size_required
+            log.debug("The part size specified (%s) is smaller than "
+                      "the minimum required part size.  Using a part "
+                      "size of: %s", self._part_size, part_size)
+        total_parts = int(math.ceil(total_size / float(part_size)))
+        return total_parts, part_size
+
+    def _shutdown_threads(self):
+        log.debug("Shutting down threads.")
+        for thread in self._threads:
+            thread.should_continue = False
+        for thread in self._threads:
+            thread.join()
+        log.debug("Threads have exited.")
+
+    def _add_work_items_to_queue(self, total_parts, worker_queue, part_size):
+        log.debug("Adding work items to queue.")
+        for i in range(total_parts):
+            worker_queue.put((i, part_size))
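+        # Enqueue one sentinel per worker thread so that every thread
+        # eventually sees a shutdown signal.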
+        for i in range(self._num_threads):
+            worker_queue.put(_END_SENTINEL)
+
+
+class ConcurrentUploader(ConcurrentTransferer):
+    """Concurrently upload an archive to glacier.
+
+    This class uses a thread pool to concurrently upload an archive
+    to glacier using the multipart upload API.
+
+    The threadpool is completely managed by this class and is
+    transparent to the users of this class.
+
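+    Example usage (a sketch; the vault name and file path here are
+    placeholders)::
+
+        from boto.glacier.layer1 import Layer1
+
+        api = Layer1()  # credentials come from the boto config/environment
+        uploader = ConcurrentUploader(api, 'my-vault', num_threads=4)
+        archive_id = uploader.upload('backup.tar.gz',
+                                     description='nightly backup')
+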
+    """
+    def __init__(self, api, vault_name, part_size=DEFAULT_PART_SIZE,
+                 num_threads=10):
+        """
+        :type api: :class:`boto.glacier.layer1.Layer1`
+        :param api: A layer1 glacier object.
+
+        :type vault_name: str
+        :param vault_name: The name of the vault.
+
+        :type part_size: int
+        :param part_size: The size, in bytes, of the parts to use when
+            uploading the archive.  The part size must be one megabyte
+            multiplied by a power of two (e.g. 1 MB, 2 MB, 4 MB, ...).
+
+        :type num_threads: int
+        :param num_threads: The number of threads to spawn for the thread pool.
+            The number of threads controls how many parts are uploaded
+            concurrently.
+
+        """
+        super(ConcurrentUploader, self).__init__(part_size, num_threads)
+        self._api = api
+        self._vault_name = vault_name
+
+    def upload(self, filename, description=None):
+        """Concurrently create an archive.
+
+        The part_size value specified when the class was constructed
+        will be used *unless* it is smaller than the minimum required
+        part size needed for the size of the given file.  In that case,
+        the part size used will be the minimum part size required
+        to properly upload the given file.
+
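+        For example, assuming the library's defaults of 4 MB parts and a
+        10,000-part-per-upload limit, a 50 GB archive cannot fit in
+        10,000 4 MB parts, so an 8 MB part size would be selected
+        automatically.
+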
+        :type filename: str
+        :param filename: The name of the file to upload.
+
+        :type description: str
+        :param description: The description of the archive.
+
+        :rtype: str
+        :return: The archive id of the newly created archive.
+
+        """
+        total_size = os.stat(filename).st_size
+        total_parts, part_size = self._calculate_required_part_size(total_size)
+        hash_chunks = [None] * total_parts
+        worker_queue = Queue()
+        result_queue = Queue()
+        response = self._api.initiate_multipart_upload(self._vault_name,
+                                                       part_size,
+                                                       description)
+        upload_id = response['UploadId']
+        # The basic idea is to add the chunks (the offsets, not the actual
+        # contents) to a work queue, start up a thread pool, let the threads
+        # crank through the items in the work queue, and then place their
+        # results in a result queue which we use to complete the multipart
+        # upload.
+        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
+        self._start_upload_threads(result_queue, upload_id,
+                                   worker_queue, filename)
+        try:
+            self._wait_for_upload_threads(hash_chunks, result_queue,
+                                          total_parts)
+        except UploadArchiveError as e:
+            log.debug("An error occurred while uploading an archive, "
+                      "aborting multipart upload.")
+            self._api.abort_multipart_upload(self._vault_name, upload_id)
+            raise e
+        log.debug("Completing upload.")
+        response = self._api.complete_multipart_upload(
+            self._vault_name, upload_id, bytes_to_hex(tree_hash(hash_chunks)),
+            total_size)
+        log.debug("Upload finished.")
+        return response['ArchiveId']
+
+    def _wait_for_upload_threads(self, hash_chunks, result_queue, total_parts):
+        for _ in range(total_parts):
+            result = result_queue.get()
+            if isinstance(result, Exception):
+                log.debug("An error was found in the result queue, terminating "
+                          "threads: %s", result)
+                self._shutdown_threads()
+                raise UploadArchiveError("An error occurred while uploading "
+                                         "an archive: %s" % result)
+            # Each unit of work returns the tree hash for the given part
+            # number, which we use at the end to compute the tree hash of
+            # the entire archive.
+            part_number, tree_sha256 = result
+            hash_chunks[part_number] = tree_sha256
+        self._shutdown_threads()
+
+    def _start_upload_threads(self, result_queue, upload_id, worker_queue,
+                              filename):
+        log.debug("Starting threads.")
+        for _ in range(self._num_threads):
+            thread = UploadWorkerThread(self._api, self._vault_name, filename,
+                                        upload_id, worker_queue, result_queue)
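+            # Brief pause between thread starts staggers connection setup
+            # rather than opening all connections at once.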
+            time.sleep(0.2)
+            thread.start()
+            self._threads.append(thread)
+
+
+class TransferThread(threading.Thread):
+    def __init__(self, worker_queue, result_queue):
+        super(TransferThread, self).__init__()
+        self._worker_queue = worker_queue
+        self._result_queue = result_queue
+        # This value can be set externally by other objects
+        # to indicate that the thread should be shut down.
+        self.should_continue = True
+
+    def run(self):
+        while self.should_continue:
+            try:
+                work = self._worker_queue.get(timeout=1)
+            except Empty:
+                continue
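+            # One sentinel per worker marks the end of the work queue;
+            # consuming it means this thread has no more parts to process.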
+            if work is _END_SENTINEL:
+                self._cleanup()
+                return
+            result = self._process_chunk(work)
+            self._result_queue.put(result)
+        self._cleanup()
+
+    def _process_chunk(self, work):
+        """Process a single unit of work (implemented by subclasses)."""
+        pass
+
+    def _cleanup(self):
+        """Hook for subclasses to release resources on shutdown."""
+        pass
+
+
+class UploadWorkerThread(TransferThread):
+    def __init__(self, api, vault_name, filename, upload_id,
+                 worker_queue, result_queue, num_retries=5,
+                 time_between_retries=5,
+                 retry_exceptions=Exception):
+        super(UploadWorkerThread, self).__init__(worker_queue, result_queue)
+        self._api = api
+        self._vault_name = vault_name
+        self._filename = filename
+        self._fileobj = open(filename, 'rb')
+        self._upload_id = upload_id
+        self._num_retries = num_retries
+        self._time_between_retries = time_between_retries
+        self._retry_exceptions = retry_exceptions
+
+    def _process_chunk(self, work):
+        result = None
+        for i in range(self._num_retries + 1):
+            try:
+                result = self._upload_chunk(work)
+                break
+            except self._retry_exceptions as e:
+                log.error("Exception caught uploading part number %s for "
+                          "vault %s, attempt: (%s / %s), filename: %s, "
+                          "exception: %s, msg: %s",
+                          work[0], self._vault_name, i + 1, self._num_retries + 1,
+                          self._filename, e.__class__, e)
+                time.sleep(self._time_between_retries)
+                result = e
+        return result
+
+    def _upload_chunk(self, work):
+        part_number, part_size = work
+        start_byte = part_number * part_size
+        self._fileobj.seek(start_byte)
+        contents = self._fileobj.read(part_size)
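+        # Glacier's upload_part call requires both a plain SHA256 of the
+        # part and a SHA256 tree hash computed over the part's 1 MB chunks.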
+        linear_hash = hashlib.sha256(contents).hexdigest()
+        tree_hash_bytes = tree_hash(chunk_hashes(contents))
+        byte_range = (start_byte, start_byte + len(contents) - 1)
+        log.debug("Uploading chunk %s of size %s", part_number, part_size)
+        response = self._api.upload_part(self._vault_name, self._upload_id,
+                                         linear_hash,
+                                         bytes_to_hex(tree_hash_bytes),
+                                         byte_range, contents)
+        # Reading the response allows the connection to be reused.
+        response.read()
+        return (part_number, tree_hash_bytes)
+
+    def _cleanup(self):
+        self._fileobj.close()
+
+
+class ConcurrentDownloader(ConcurrentTransferer):
+    """
+    Concurrently download an archive from glacier.
+
+    This class uses a thread pool to concurrently download an archive
+    from glacier.
+
+    The threadpool is completely managed by this class and is
+    transparent to the users of this class.
+
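+    Example usage (a sketch; assumes ``job`` is a completed
+    archive-retrieval :class:`boto.glacier.job.Job`)::
+
+        downloader = ConcurrentDownloader(job, num_threads=4)
+        downloader.download('restored.tar.gz')
+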
+    """
+    def __init__(self, job, part_size=DEFAULT_PART_SIZE,
+                 num_threads=10):
+        """
+        :param job: A layer2 Job object for an archive retrieval job.
+
+        :param part_size: The size, in bytes, of the parts to use when
+            downloading the archive.  The part size must be one megabyte
+            multiplied by a power of two.
+
+        :param num_threads: The number of threads to spawn for the thread
+            pool.
+
+        """
+        super(ConcurrentDownloader, self).__init__(part_size, num_threads)
+        self._job = job
+
+    def download(self, filename):
+        """
+        Concurrently download an archive.
+
+        :param filename: The filename to download the archive to
+        :type filename: str
+
+        """
+        total_size = self._job.archive_size
+        total_parts, part_size = self._calculate_required_part_size(total_size)
+        worker_queue = Queue()
+        result_queue = Queue()
+        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
+        self._start_download_threads(result_queue, worker_queue)
+        try:
+            self._wait_for_download_threads(filename, result_queue, total_parts)
+        except DownloadArchiveError as e:
+            log.debug("An error occurred while downloading an archive: %s", e)
+            raise e
+        log.debug("Download completed.")
+
+    def _wait_for_download_threads(self, filename, result_queue, total_parts):
+        """
+        Wait until the result_queue has received all of the downloaded
+        parts, which indicates that every part download has completed,
+        then write each part into ``filename`` at its correct offset.
+
+        :param filename: The name of the file to write the parts to.
+        :param result_queue: Queue of (part number, part size, tree hash,
+            data) tuples produced by the download threads.
+        :param total_parts: The total number of parts expected.
+        """
+        hash_chunks = [None] * total_parts
+        with open(filename, "wb") as f:
+            for _ in range(total_parts):
+                result = result_queue.get()
+                if isinstance(result, Exception):
+                    log.debug("An error was found in the result queue, "
+                              "terminating threads: %s", result)
+                    self._shutdown_threads()
+                    raise DownloadArchiveError(
+                        "An error occurred while downloading "
+                        "an archive: %s" % result)
+                part_number, part_size, actual_hash, data = result
+                hash_chunks[part_number] = actual_hash
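+                # Parts may complete out of order; seek to each part's
+                # offset before writing it.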
+                start_byte = part_number * part_size
+                f.seek(start_byte)
+                f.write(data)
+                f.flush()
+        final_hash = bytes_to_hex(tree_hash(hash_chunks))
+        log.debug("Verifying final tree hash of archive, expecting: %s, "
+                  "actual: %s", self._job.sha256_treehash, final_hash)
+        if self._job.sha256_treehash != final_hash:
+            self._shutdown_threads()
+            raise TreeHashDoesNotMatchError(
+                "Tree hash for entire archive does not match, "
+                "expected: %s, got: %s" % (self._job.sha256_treehash,
+                                           final_hash))
+        self._shutdown_threads()
+
+    def _start_download_threads(self, result_queue, worker_queue):
+        log.debug("Starting threads.")
+        for _ in range(self._num_threads):
+            thread = DownloadWorkerThread(self._job, worker_queue, result_queue)
+            time.sleep(0.2)
+            thread.start()
+            self._threads.append(thread)
+
+
+class DownloadWorkerThread(TransferThread):
+    def __init__(self, job,
+                 worker_queue, result_queue,
+                 num_retries=5,
+                 time_between_retries=5,
+                 retry_exceptions=Exception):
+        """
+        Individual download thread that downloads parts of the archive
+        from Glacier.  The parts to download are read from the worker
+        queue.
+
+        Each downloaded part is held in memory and placed on the result
+        queue for the main thread to write out at the proper offset.
+
+        :param job: Glacier job object
+        :param worker_queue: A queue of (part_number, part_size) tuples
+            describing the parts to download.
+        :param result_queue: A queue that receives a (part_number,
+            part_size, tree hash, data) tuple for each downloaded part.
+
+        """
+        super(DownloadWorkerThread, self).__init__(worker_queue, result_queue)
+        self._job = job
+        self._num_retries = num_retries
+        self._time_between_retries = time_between_retries
+        self._retry_exceptions = retry_exceptions
+
+    def _process_chunk(self, work):
+        """
+        Attempt to download a part of the archive from Glacier, retrying
+        on failure.  The result (or the final exception, if all attempts
+        fail) is returned to ``run``, which places it on the result queue.
+
+        :param work: A (part_number, part_size) tuple.
+        """
+        result = None
+        for i in range(self._num_retries + 1):
+            try:
+                result = self._download_chunk(work)
+                break
+            except self._retry_exceptions as e:
+                log.error("Exception caught downloading part number %s for "
+                          "job %s", work[0], self._job,)
+                time.sleep(self._time_between_retries)
+                result = e
+        return result
+
+    def _download_chunk(self, work):
+        """
+        Download a chunk of the archive from Glacier, verify its tree
+        hash, and return the part number, part size, tree hash (as raw
+        bytes), and the downloaded data.
+
+        :param work: A (part_number, part_size) tuple.
+        """
+        part_number, part_size = work
+        start_byte = part_number * part_size
+        byte_range = (start_byte, start_byte + part_size - 1)
+        log.debug("Downloading chunk %s of size %s", part_number, part_size)
+        response = self._job.get_output(byte_range)
+        data = response.read()
+        actual_hash = bytes_to_hex(tree_hash(chunk_hashes(data)))
+        if response['TreeHash'] != actual_hash:
+            raise TreeHashDoesNotMatchError(
+                "Tree hash for part number %s does not match, "
+                "expected: %s, got: %s" % (part_number, response['TreeHash'],
+                                           actual_hash))
+        return (part_number, part_size, binascii.unhexlify(actual_hash), data)