diff planemo/lib/python3.7/site-packages/boto/s3/resumable_download_handler.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/planemo/lib/python3.7/site-packages/boto/s3/resumable_download_handler.py	Fri Jul 31 00:18:57 2020 -0400
@@ -0,0 +1,352 @@
+# Copyright 2010 Google Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+import errno
+import httplib
+import os
+import re
+import socket
+import time
+import boto
+from boto import config, storage_uri_for_key
+from boto.connection import AWSAuthConnection
+from boto.exception import ResumableDownloadException
+from boto.exception import ResumableTransferDisposition
+from boto.s3.keyfile import KeyFile
+from boto.gs.key import Key as GSKey
+
+"""
+Resumable download handler.
+
+Resumable downloads will retry failed downloads, resuming at the byte count
+completed by the last download attempt. If too many retries happen with no
+progress (per configurable num_retries param), the download will be aborted.
+
+The caller can optionally specify a tracker_file_name param in the
+ResumableDownloadHandler constructor. If you do this, that file will
+save the state needed to allow retrying later, in a separate process
+(e.g., in a later run of gsutil).
+
+Note that resumable downloads work across providers (they depend only
+on support Range GETs), but this code is in the boto.s3 package
+because it is the wrong abstraction level to go in the top-level boto
+package.
+
+TODO: At some point we should refactor the code to have a storage_service
+package where all these provider-independent files go.
+"""
+
+
+class ByteTranslatingCallbackHandler(object):
+    """
+    Proxy class that translates progress callbacks made by
+    boto.s3.Key.get_file(), taking into account that we're resuming
+    a download.
+    """
+    def __init__(self, proxied_cb, download_start_point):
+        self.proxied_cb = proxied_cb
+        self.download_start_point = download_start_point
+
+    def call(self, total_bytes_uploaded, total_size):
+        self.proxied_cb(self.download_start_point + total_bytes_uploaded,
+                        total_size)
+
+
+def get_cur_file_size(fp, position_to_eof=False):
+    """
+    Returns size of file, optionally leaving fp positioned at EOF.
+    """
+    if isinstance(fp, KeyFile) and not position_to_eof:
+        # Avoid EOF seek for KeyFile case as it's very inefficient.
+        return fp.getkey().size
+    if not position_to_eof:
+        cur_pos = fp.tell()
+    fp.seek(0, os.SEEK_END)
+    cur_file_size = fp.tell()
+    if not position_to_eof:
+        fp.seek(cur_pos, os.SEEK_SET)
+    return cur_file_size
+
+
+class ResumableDownloadHandler(object):
+    """
+    Handler for resumable downloads.
+    """
+
+    MIN_ETAG_LEN = 5
+
+    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
+                            socket.gaierror)
+
+    def __init__(self, tracker_file_name=None, num_retries=None):
+        """
+        Constructor. Instantiate once for each downloaded file.
+
+        :type tracker_file_name: string
+        :param tracker_file_name: optional file name to save tracking info
+            about this download. If supplied and the current process fails
+            the download, it can be retried in a new process. If called
+            with an existing file containing an unexpired timestamp,
+            we'll resume the transfer for this file; else we'll start a
+            new resumable download.
+
+        :type num_retries: int
+        :param num_retries: the number of times we'll re-try a resumable
+            download making no progress. (Count resets every time we get
+            progress, so download can span many more than this number of
+            retries.)
+        """
+        self.tracker_file_name = tracker_file_name
+        self.num_retries = num_retries
+        self.etag_value_for_current_download = None
+        if tracker_file_name:
+            self._load_tracker_file_etag()
+        # Save download_start_point in instance state so caller can
+        # find how much was transferred by this ResumableDownloadHandler
+        # (across retries).
+        self.download_start_point = None
+
+    def _load_tracker_file_etag(self):
+        f = None
+        try:
+            f = open(self.tracker_file_name, 'r')
+            self.etag_value_for_current_download = f.readline().rstrip('\n')
+            # We used to match an MD5-based regex to ensure that the etag was
+            # read correctly. Since ETags need not be MD5s, we now do a simple
+            # length sanity check instead.
+            if len(self.etag_value_for_current_download) < self.MIN_ETAG_LEN:
+                print('Couldn\'t read etag in tracker file (%s). Restarting '
+                      'download from scratch.' % self.tracker_file_name)
+        except IOError as e:
+            # Ignore non-existent file (happens first time a download
+            # is attempted on an object), but warn user for other errors.
+            if e.errno != errno.ENOENT:
+                # Will restart because
+                # self.etag_value_for_current_download is None.
+                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
+                      'download from scratch.' %
+                      (self.tracker_file_name, e.strerror))
+        finally:
+            if f:
+                f.close()
+
+    def _save_tracker_info(self, key):
+        self.etag_value_for_current_download = key.etag.strip('"\'')
+        if not self.tracker_file_name:
+            return
+        f = None
+        try:
+            f = open(self.tracker_file_name, 'w')
+            f.write('%s\n' % self.etag_value_for_current_download)
+        except IOError as e:
+            raise ResumableDownloadException(
+                'Couldn\'t write tracker file (%s): %s.\nThis can happen'
+                'if you\'re using an incorrectly configured download tool\n'
+                '(e.g., gsutil configured to save tracker files to an '
+                'unwritable directory)' %
+                (self.tracker_file_name, e.strerror),
+                ResumableTransferDisposition.ABORT)
+        finally:
+            if f:
+                f.close()
+
+    def _remove_tracker_file(self):
+        if (self.tracker_file_name and
+            os.path.exists(self.tracker_file_name)):
+                os.unlink(self.tracker_file_name)
+
+    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
+                                    torrent, version_id, hash_algs):
+        """
+        Attempts a resumable download.
+
+        Raises ResumableDownloadException if any problems occur.
+        """
+        cur_file_size = get_cur_file_size(fp, position_to_eof=True)
+
+        if (cur_file_size and 
+            self.etag_value_for_current_download and
+            self.etag_value_for_current_download == key.etag.strip('"\'')):
+            # Try to resume existing transfer.
+            if cur_file_size > key.size:
+              raise ResumableDownloadException(
+                  '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
+                  'if you re-try this download it will start from scratch' %
+                  (fp.name, cur_file_size, str(storage_uri_for_key(key)),
+                   key.size), ResumableTransferDisposition.ABORT)
+            elif cur_file_size == key.size:
+                if key.bucket.connection.debug >= 1:
+                    print('Download complete.')
+                return
+            if key.bucket.connection.debug >= 1:
+                print('Resuming download.')
+            headers = headers.copy()
+            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
+            cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
+            self.download_start_point = cur_file_size
+        else:
+            if key.bucket.connection.debug >= 1:
+                print('Starting new resumable download.')
+            self._save_tracker_info(key)
+            self.download_start_point = 0
+            # Truncate the file, in case a new resumable download is being
+            # started atop an existing file.
+            fp.truncate(0)
+
+        # Disable AWSAuthConnection-level retry behavior, since that would
+        # cause downloads to restart from scratch.
+        if isinstance(key, GSKey):
+          key.get_file(fp, headers, cb, num_cb, torrent, version_id,
+                       override_num_retries=0, hash_algs=hash_algs)
+        else:
+          key.get_file(fp, headers, cb, num_cb, torrent, version_id,
+                       override_num_retries=0)
+        fp.flush()
+
+    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
+                 version_id=None, hash_algs=None):
+        """
+        Retrieves a file from a Key
+        :type key: :class:`boto.s3.key.Key` or subclass
+        :param key: The Key object from which upload is to be downloaded
+        
+        :type fp: file
+        :param fp: File pointer into which data should be downloaded
+        
+        :type headers: string
+        :param: headers to send when retrieving the files
+        
+        :type cb: function
+        :param cb: (optional) a callback function that will be called to report
+             progress on the download.  The callback should accept two integer
+             parameters, the first representing the number of bytes that have
+             been successfully transmitted from the storage service and
+             the second representing the total number of bytes that need
+             to be transmitted.
+        
+        :type num_cb: int
+        :param num_cb: (optional) If a callback is specified with the cb
+             parameter this parameter determines the granularity of the callback
+             by defining the maximum number of times the callback will be
+             called during the file transfer.
+             
+        :type torrent: bool
+        :param torrent: Flag for whether to get a torrent for the file
+
+        :type version_id: string
+        :param version_id: The version ID (optional)
+
+        :type hash_algs: dictionary
+        :param hash_algs: (optional) Dictionary of hash algorithms and
+            corresponding hashing class that implements update() and digest().
+            Defaults to {'md5': hashlib/md5.md5}.
+
+        Raises ResumableDownloadException if a problem occurs during
+            the transfer.
+        """
+
+        debug = key.bucket.connection.debug
+        if not headers:
+            headers = {}
+
+        # Use num-retries from constructor if one was provided; else check
+        # for a value specified in the boto config file; else default to 6.
+        if self.num_retries is None:
+            self.num_retries = config.getint('Boto', 'num_retries', 6)
+        progress_less_iterations = 0
+
+        while True:  # Retry as long as we're making progress.
+            had_file_bytes_before_attempt = get_cur_file_size(fp)
+            try:
+                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
+                                                 torrent, version_id, hash_algs)
+                # Download succceded, so remove the tracker file (if have one).
+                self._remove_tracker_file()
+                # Previously, check_final_md5() was called here to validate 
+                # downloaded file's checksum, however, to be consistent with
+                # non-resumable downloads, this call was removed. Checksum
+                # validation of file contents should be done by the caller.
+                if debug >= 1:
+                    print('Resumable download complete.')
+                return
+            except self.RETRYABLE_EXCEPTIONS as e:
+                if debug >= 1:
+                    print('Caught exception (%s)' % e.__repr__())
+                if isinstance(e, IOError) and e.errno == errno.EPIPE:
+                    # Broken pipe error causes httplib to immediately
+                    # close the socket (http://bugs.python.org/issue5542),
+                    # so we need to close and reopen the key before resuming
+                    # the download.
+                    if isinstance(key, GSKey):
+                      key.get_file(fp, headers, cb, num_cb, torrent, version_id,
+                                   override_num_retries=0, hash_algs=hash_algs)
+                    else:
+                      key.get_file(fp, headers, cb, num_cb, torrent, version_id,
+                                   override_num_retries=0)
+            except ResumableDownloadException as e:
+                if (e.disposition ==
+                    ResumableTransferDisposition.ABORT_CUR_PROCESS):
+                    if debug >= 1:
+                        print('Caught non-retryable ResumableDownloadException '
+                              '(%s)' % e.message)
+                    raise
+                elif (e.disposition ==
+                    ResumableTransferDisposition.ABORT):
+                    if debug >= 1:
+                        print('Caught non-retryable ResumableDownloadException '
+                              '(%s); aborting and removing tracker file' %
+                              e.message)
+                    self._remove_tracker_file()
+                    raise
+                else:
+                    if debug >= 1:
+                        print('Caught ResumableDownloadException (%s) - will '
+                              'retry' % e.message)
+
+            # At this point we had a re-tryable failure; see if made progress.
+            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
+                progress_less_iterations = 0
+            else:
+                progress_less_iterations += 1
+
+            if progress_less_iterations > self.num_retries:
+                # Don't retry any longer in the current process.
+                raise ResumableDownloadException(
+                    'Too many resumable download attempts failed without '
+                    'progress. You might try this download again later',
+                    ResumableTransferDisposition.ABORT_CUR_PROCESS)
+
+            # Close the key, in case a previous download died partway
+            # through and left data in the underlying key HTTP buffer.
+            # Do this within a try/except block in case the connection is
+            # closed (since key.close() attempts to do a final read, in which
+            # case this read attempt would get an IncompleteRead exception,
+            # which we can safely ignore.
+            try:
+                key.close()
+            except httplib.IncompleteRead:
+                pass
+
+            sleep_time_secs = 2**progress_less_iterations
+            if debug >= 1:
+                print('Got retryable failure (%d progress-less in a row).\n'
+                      'Sleeping %d seconds before re-trying' %
+                      (progress_less_iterations, sleep_time_secs))
+            time.sleep(sleep_time_secs)