diff env/lib/python3.9/site-packages/boto/gs/resumable_upload_handler.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/boto/gs/resumable_upload_handler.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,679 @@
+# Copyright 2010 Google Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish, dis-
+# tribute, sublicense, and/or sell copies of the Software, and to permit
+# persons to whom the Software is furnished to do so, subject to the fol-
+# lowing conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
+# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+# IN THE SOFTWARE.
+import errno
+import httplib
+import os
+import random
+import re
+import socket
+import time
+import urlparse
+from hashlib import md5
+from boto import config, UserAgent
+from boto.connection import AWSAuthConnection
+from boto.exception import InvalidUriError
+from boto.exception import ResumableTransferDisposition
+from boto.exception import ResumableUploadException
+from boto.s3.keyfile import KeyFile
+
+"""
+Handler for Google Cloud Storage resumable uploads. See
+http://code.google.com/apis/storage/docs/developer-guide.html#resumable
+for details.
+
+Resumable uploads will retry failed uploads, resuming at the byte
+count completed by the last upload attempt. If too many retries happen with
+no progress (per configurable num_retries param), the upload will be
+aborted in the current process.
+
+The caller can optionally specify a tracker_file_name param in the
+ResumableUploadHandler constructor. If you do this, that file will
+save the state needed to allow retrying later, in a separate process
+(e.g., in a later run of gsutil).
+"""
+
+
+class ResumableUploadHandler(object):
+
+    BUFFER_SIZE = 8192
+    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
+                            socket.gaierror)
+
+    # (start, end) response indicating server has nothing (upload protocol uses
+    # inclusive numbering).
+    SERVER_HAS_NOTHING = (0, -1)
+
+    def __init__(self, tracker_file_name=None, num_retries=None):
+        """
+        Constructor. Instantiate once for each uploaded file.
+
+        :type tracker_file_name: string
+        :param tracker_file_name: optional file name to save tracker URI.
+            If supplied and the current process fails the upload, it can be
+            retried in a new process. If called with an existing file containing
+            a valid tracker URI, we'll resume the upload from this URI; else
+            we'll start a new resumable upload (and write the URI to this
+            tracker file).
+
+        :type num_retries: int
+        :param num_retries: the number of times we'll re-try a resumable upload
+            making no progress. (Count resets every time we get progress, so
+            upload can span many more than this number of retries.)
+        """
+        self.tracker_file_name = tracker_file_name
+        self.num_retries = num_retries
+        self.server_has_bytes = 0  # Byte count at last server check.
+        self.tracker_uri = None
+        if tracker_file_name:
+            self._load_tracker_uri_from_file()
+        # Save upload_start_point in instance state so caller can find how
+        # much was transferred by this ResumableUploadHandler (across retries).
+        self.upload_start_point = None
+
+    def _load_tracker_uri_from_file(self):
+        f = None
+        try:
+            f = open(self.tracker_file_name, 'r')
+            uri = f.readline().strip()
+            self._set_tracker_uri(uri)
+        except IOError as e:
+            # Ignore non-existent file (happens first time an upload
+            # is attempted on a file), but warn user for other errors.
+            if e.errno != errno.ENOENT:
+                # Will restart because self.tracker_uri is None.
+                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
+                      'upload from scratch.' %
+                      (self.tracker_file_name, e.strerror))
+        except InvalidUriError as e:
+            # Warn user, but proceed (will restart because
+            # self.tracker_uri is None).
+            print('Invalid tracker URI (%s) found in URI tracker file '
+                  '(%s). Restarting upload from scratch.' %
+                  (uri, self.tracker_file_name))
+        finally:
+            if f:
+                f.close()
+
+    def _save_tracker_uri_to_file(self):
+        """
+        Saves URI to tracker file if one was passed to constructor.
+        """
+        if not self.tracker_file_name:
+            return
+        f = None
+        try:
+            with os.fdopen(os.open(self.tracker_file_name,
+                                   os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
+              f.write(self.tracker_uri)
+        except IOError as e:
+            raise ResumableUploadException(
+                'Couldn\'t write URI tracker file (%s): %s.\nThis can happen'
+                'if you\'re using an incorrectly configured upload tool\n'
+                '(e.g., gsutil configured to save tracker files to an '
+                'unwritable directory)' %
+                (self.tracker_file_name, e.strerror),
+                ResumableTransferDisposition.ABORT)
+
+    def _set_tracker_uri(self, uri):
+        """
+        Called when we start a new resumable upload or get a new tracker
+        URI for the upload. Saves URI and resets upload state.
+
+        Raises InvalidUriError if URI is syntactically invalid.
+        """
+        parse_result = urlparse.urlparse(uri)
+        if (parse_result.scheme.lower() not in ['http', 'https'] or
+            not parse_result.netloc):
+            raise InvalidUriError('Invalid tracker URI (%s)' % uri)
+        self.tracker_uri = uri
+        self.tracker_uri_host = parse_result.netloc
+        self.tracker_uri_path = '%s?%s' % (
+            parse_result.path, parse_result.query)
+        self.server_has_bytes = 0
+
+    def get_tracker_uri(self):
+        """
+        Returns upload tracker URI, or None if the upload has not yet started.
+        """
+        return self.tracker_uri
+
+    def get_upload_id(self):
+        """
+        Returns the upload ID for the resumable upload, or None if the upload
+        has not yet started.
+        """
+        # We extract the upload_id from the tracker uri. We could retrieve the
+        # upload_id from the headers in the response but this only works for
+        # the case where we get the tracker uri from the service. In the case
+        # where we get the tracker from the tracking file we need to do this
+        # logic anyway.
+        delim = '?upload_id='
+        if self.tracker_uri and delim in self.tracker_uri:
+          return self.tracker_uri[self.tracker_uri.index(delim) + len(delim):]
+        else:
+          return None
+
+    def _remove_tracker_file(self):
+        if (self.tracker_file_name and
+            os.path.exists(self.tracker_file_name)):
+                os.unlink(self.tracker_file_name)
+
+    def _build_content_range_header(self, range_spec='*', length_spec='*'):
+        return 'bytes %s/%s' % (range_spec, length_spec)
+
+    def _query_server_state(self, conn, file_length):
+        """
+        Queries server to find out state of given upload.
+
+        Note that this method really just makes special case use of the
+        fact that the upload server always returns the current start/end
+        state whenever a PUT doesn't complete.
+
+        Returns HTTP response from sending request.
+
+        Raises ResumableUploadException if problem querying server.
+        """
+        # Send an empty PUT so that server replies with this resumable
+        # transfer's state.
+        put_headers = {}
+        put_headers['Content-Range'] = (
+            self._build_content_range_header('*', file_length))
+        put_headers['Content-Length'] = '0'
+        return AWSAuthConnection.make_request(conn, 'PUT',
+                                              path=self.tracker_uri_path,
+                                              auth_path=self.tracker_uri_path,
+                                              headers=put_headers,
+                                              host=self.tracker_uri_host)
+
+    def _query_server_pos(self, conn, file_length):
+        """
+        Queries server to find out what bytes it currently has.
+
+        Returns (server_start, server_end), where the values are inclusive.
+        For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.
+
+        Raises ResumableUploadException if problem querying server.
+        """
+        resp = self._query_server_state(conn, file_length)
+        if resp.status == 200:
+            # To handle the boundary condition where the server has the complete
+            # file, we return (server_start, file_length-1). That way the
+            # calling code can always simply read up through server_end. (If we
+            # didn't handle this boundary condition here, the caller would have
+            # to check whether server_end == file_length and read one fewer byte
+            # in that case.)
+            return (0, file_length - 1)  # Completed upload.
+        if resp.status != 308:
+            # This means the server didn't have any state for the given
+            # upload ID, which can happen (for example) if the caller saved
+            # the tracker URI to a file and then tried to restart the transfer
+            # after that upload ID has gone stale. In that case we need to
+            # start a new transfer (and the caller will then save the new
+            # tracker URI to the tracker file).
+            raise ResumableUploadException(
+                'Got non-308 response (%s) from server state query' %
+                resp.status, ResumableTransferDisposition.START_OVER)
+        got_valid_response = False
+        range_spec = resp.getheader('range')
+        if range_spec:
+            # Parse 'bytes=<from>-<to>' range_spec.
+            m = re.search('bytes=(\d+)-(\d+)', range_spec)
+            if m:
+                server_start = long(m.group(1))
+                server_end = long(m.group(2))
+                got_valid_response = True
+        else:
+            # No Range header, which means the server does not yet have
+            # any bytes. Note that the Range header uses inclusive 'from'
+            # and 'to' values. Since Range 0-0 would mean that the server
+            # has byte 0, omitting the Range header is used to indicate that
+            # the server doesn't have any bytes.
+            return self.SERVER_HAS_NOTHING
+        if not got_valid_response:
+            raise ResumableUploadException(
+                'Couldn\'t parse upload server state query response (%s)' %
+                str(resp.getheaders()), ResumableTransferDisposition.START_OVER)
+        if conn.debug >= 1:
+            print('Server has: Range: %d - %d.' % (server_start, server_end))
+        return (server_start, server_end)
+
+    def _start_new_resumable_upload(self, key, headers=None):
+        """
+        Starts a new resumable upload.
+
+        Raises ResumableUploadException if any errors occur.
+        """
+        conn = key.bucket.connection
+        if conn.debug >= 1:
+            print('Starting new resumable upload.')
+        self.server_has_bytes = 0
+
+        # Start a new resumable upload by sending a POST request with an
+        # empty body and the "X-Goog-Resumable: start" header. Include any
+        # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
+        # (and raise an exception if they tried to pass one, since it's
+        # a semantic error to specify it at this point, and if we were to
+        # include one now it would cause the server to expect that many
+        # bytes; the POST doesn't include the actual file bytes  We set
+        # the Content-Length in the subsequent PUT, based on the uploaded
+        # file size.
+        post_headers = {}
+        for k in headers:
+            if k.lower() == 'content-length':
+                raise ResumableUploadException(
+                    'Attempt to specify Content-Length header (disallowed)',
+                    ResumableTransferDisposition.ABORT)
+            post_headers[k] = headers[k]
+        post_headers[conn.provider.resumable_upload_header] = 'start'
+
+        resp = conn.make_request(
+            'POST', key.bucket.name, key.name, post_headers)
+        # Get tracker URI from response 'Location' header.
+        body = resp.read()
+
+        # Check for various status conditions.
+        if resp.status in [500, 503]:
+            # Retry status 500 and 503 errors after a delay.
+            raise ResumableUploadException(
+                'Got status %d from attempt to start resumable upload. '
+                'Will wait/retry' % resp.status,
+                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
+        elif resp.status != 200 and resp.status != 201:
+            raise ResumableUploadException(
+                'Got status %d from attempt to start resumable upload. '
+                'Aborting' % resp.status,
+                ResumableTransferDisposition.ABORT)
+
+        # Else we got 200 or 201 response code, indicating the resumable
+        # upload was created.
+        tracker_uri = resp.getheader('Location')
+        if not tracker_uri:
+            raise ResumableUploadException(
+                'No resumable tracker URI found in resumable initiation '
+                'POST response (%s)' % body,
+                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
+        self._set_tracker_uri(tracker_uri)
+        self._save_tracker_uri_to_file()
+
+    def _upload_file_bytes(self, conn, http_conn, fp, file_length,
+                           total_bytes_uploaded, cb, num_cb, headers):
+        """
+        Makes one attempt to upload file bytes, using an existing resumable
+        upload connection.
+
+        Returns (etag, generation, metageneration) from server upon success.
+
+        Raises ResumableUploadException if any problems occur.
+        """
+        buf = fp.read(self.BUFFER_SIZE)
+        if cb:
+            # The cb_count represents the number of full buffers to send between
+            # cb executions.
+            if num_cb > 2:
+                cb_count = file_length / self.BUFFER_SIZE / (num_cb-2)
+            elif num_cb < 0:
+                cb_count = -1
+            else:
+                cb_count = 0
+            i = 0
+            cb(total_bytes_uploaded, file_length)
+
+        # Build resumable upload headers for the transfer. Don't send a
+        # Content-Range header if the file is 0 bytes long, because the
+        # resumable upload protocol uses an *inclusive* end-range (so, sending
+        # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
+        if not headers:
+          put_headers = {}
+        else:
+          put_headers = headers.copy()
+        if file_length:
+            if total_bytes_uploaded == file_length:
+                range_header = self._build_content_range_header(
+                    '*', file_length)
+            else:
+                range_header = self._build_content_range_header(
+                    '%d-%d' % (total_bytes_uploaded, file_length - 1),
+                    file_length)
+            put_headers['Content-Range'] = range_header
+        # Set Content-Length to the total bytes we'll send with this PUT.
+        put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
+        http_request = AWSAuthConnection.build_base_http_request(
+            conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
+            headers=put_headers, host=self.tracker_uri_host)
+        http_conn.putrequest('PUT', http_request.path)
+        for k in put_headers:
+            http_conn.putheader(k, put_headers[k])
+        http_conn.endheaders()
+
+        # Turn off debug on http connection so upload content isn't included
+        # in debug stream.
+        http_conn.set_debuglevel(0)
+        while buf:
+            http_conn.send(buf)
+            for alg in self.digesters:
+                self.digesters[alg].update(buf)
+            total_bytes_uploaded += len(buf)
+            if cb:
+                i += 1
+                if i == cb_count or cb_count == -1:
+                    cb(total_bytes_uploaded, file_length)
+                    i = 0
+            buf = fp.read(self.BUFFER_SIZE)
+        http_conn.set_debuglevel(conn.debug)
+        if cb:
+            cb(total_bytes_uploaded, file_length)
+        if total_bytes_uploaded != file_length:
+            # Abort (and delete the tracker file) so if the user retries
+            # they'll start a new resumable upload rather than potentially
+            # attempting to pick back up later where we left off.
+            raise ResumableUploadException(
+                'File changed during upload: EOF at %d bytes of %d byte file.' %
+                (total_bytes_uploaded, file_length),
+                ResumableTransferDisposition.ABORT)
+        resp = http_conn.getresponse()
+        # Restore http connection debug level.
+        http_conn.set_debuglevel(conn.debug)
+
+        if resp.status == 200:
+            # Success.
+            return (resp.getheader('etag'),
+                    resp.getheader('x-goog-generation'),
+                    resp.getheader('x-goog-metageneration'))
+        # Retry timeout (408) and status 500 and 503 errors after a delay.
+        elif resp.status in [408, 500, 503]:
+            disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
+        else:
+            # Catch all for any other error codes.
+            disposition = ResumableTransferDisposition.ABORT
+        raise ResumableUploadException('Got response code %d while attempting '
+                                       'upload (%s)' %
+                                       (resp.status, resp.reason), disposition)
+
+    def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
+                                  num_cb):
+        """
+        Attempts a resumable upload.
+
+        Returns (etag, generation, metageneration) from server upon success.
+
+        Raises ResumableUploadException if any problems occur.
+        """
+        (server_start, server_end) = self.SERVER_HAS_NOTHING
+        conn = key.bucket.connection
+        if self.tracker_uri:
+            # Try to resume existing resumable upload.
+            try:
+                (server_start, server_end) = (
+                    self._query_server_pos(conn, file_length))
+                self.server_has_bytes = server_start
+
+                if server_end:
+                  # If the server already has some of the content, we need to
+                  # update the digesters with the bytes that have already been
+                  # uploaded to ensure we get a complete hash in the end.
+                  print('Catching up hash digest(s) for resumed upload')
+                  fp.seek(0)
+                  # Read local file's bytes through position server has. For
+                  # example, if server has (0, 3) we want to read 3-0+1=4 bytes.
+                  bytes_to_go = server_end + 1
+                  while bytes_to_go:
+                      chunk = fp.read(min(key.BufferSize, bytes_to_go))
+                      if not chunk:
+                          raise ResumableUploadException(
+                              'Hit end of file during resumable upload hash '
+                              'catchup. This should not happen under\n'
+                              'normal circumstances, as it indicates the '
+                              'server has more bytes of this transfer\nthan'
+                              ' the current file size. Restarting upload.',
+                              ResumableTransferDisposition.START_OVER)
+                      for alg in self.digesters:
+                          self.digesters[alg].update(chunk)
+                      bytes_to_go -= len(chunk)
+
+                if conn.debug >= 1:
+                    print('Resuming transfer.')
+            except ResumableUploadException as e:
+                if conn.debug >= 1:
+                    print('Unable to resume transfer (%s).' % e.message)
+                self._start_new_resumable_upload(key, headers)
+        else:
+            self._start_new_resumable_upload(key, headers)
+
+        # upload_start_point allows the code that instantiated the
+        # ResumableUploadHandler to find out the point from which it started
+        # uploading (e.g., so it can correctly compute throughput).
+        if self.upload_start_point is None:
+            self.upload_start_point = server_end
+
+        total_bytes_uploaded = server_end + 1
+        # Corner case: Don't attempt to seek if we've already uploaded the
+        # entire file, because if the file is a stream (e.g., the KeyFile
+        # wrapper around input key when copying between providers), attempting
+        # to seek to the end of file would result in an InvalidRange error.
+        if file_length < total_bytes_uploaded:
+          fp.seek(total_bytes_uploaded)
+        conn = key.bucket.connection
+
+        # Get a new HTTP connection (vs conn.get_http_connection(), which reuses
+        # pool connections) because httplib requires a new HTTP connection per
+        # transaction. (Without this, calling http_conn.getresponse() would get
+        # "ResponseNotReady".)
+        http_conn = conn.new_http_connection(self.tracker_uri_host, conn.port,
+                                             conn.is_secure)
+        http_conn.set_debuglevel(conn.debug)
+
+        # Make sure to close http_conn at end so if a local file read
+        # failure occurs partway through server will terminate current upload
+        # and can report that progress on next attempt.
+        try:
+            return self._upload_file_bytes(conn, http_conn, fp, file_length,
+                                           total_bytes_uploaded, cb, num_cb,
+                                           headers)
+        except (ResumableUploadException, socket.error):
+            resp = self._query_server_state(conn, file_length)
+            if resp.status == 400:
+                raise ResumableUploadException('Got 400 response from server '
+                    'state query after failed resumable upload attempt. This '
+                    'can happen for various reasons, including specifying an '
+                    'invalid request (e.g., an invalid canned ACL) or if the '
+                    'file size changed between upload attempts',
+                    ResumableTransferDisposition.ABORT)
+            else:
+                raise
+        finally:
+            http_conn.close()
+
+    def _check_final_md5(self, key, etag):
+        """
+        Checks that etag from server agrees with md5 computed before upload.
+        This is important, since the upload could have spanned a number of
+        hours and multiple processes (e.g., gsutil runs), and the user could
+        change some of the file and not realize they have inconsistent data.
+        """
+        if key.bucket.connection.debug >= 1:
+            print('Checking md5 against etag.')
+        if key.md5 != etag.strip('"\''):
+            # Call key.open_read() before attempting to delete the
+            # (incorrect-content) key, so we perform that request on a
+            # different HTTP connection. This is neededb because httplib
+            # will return a "Response not ready" error if you try to perform
+            # a second transaction on the connection.
+            key.open_read()
+            key.close()
+            key.delete()
+            raise ResumableUploadException(
+                'File changed during upload: md5 signature doesn\'t match etag '
+                '(incorrect uploaded object deleted)',
+                ResumableTransferDisposition.ABORT)
+
+    def handle_resumable_upload_exception(self, e, debug):
+        if (e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS):
+            if debug >= 1:
+                print('Caught non-retryable ResumableUploadException (%s); '
+                      'aborting but retaining tracker file' % e.message)
+            raise
+        elif (e.disposition == ResumableTransferDisposition.ABORT):
+            if debug >= 1:
+                print('Caught non-retryable ResumableUploadException (%s); '
+                      'aborting and removing tracker file' % e.message)
+            self._remove_tracker_file()
+            raise
+        else:
+            if debug >= 1:
+                print('Caught ResumableUploadException (%s) - will retry' %
+                      e.message)
+
+    def track_progress_less_iterations(self, server_had_bytes_before_attempt,
+                                       roll_back_md5=True, debug=0):
+        # At this point we had a re-tryable failure; see if made progress.
+        if self.server_has_bytes > server_had_bytes_before_attempt:
+            self.progress_less_iterations = 0   # If progress, reset counter.
+        else:
+            self.progress_less_iterations += 1
+            if roll_back_md5:
+                # Rollback any potential hash updates, as we did not
+                # make any progress in this iteration.
+                self.digesters = self.digesters_before_attempt
+
+        if self.progress_less_iterations > self.num_retries:
+            # Don't retry any longer in the current process.
+            raise ResumableUploadException(
+                    'Too many resumable upload attempts failed without '
+                    'progress. You might try this upload again later',
+                    ResumableTransferDisposition.ABORT_CUR_PROCESS)
+
+        # Use binary exponential backoff to desynchronize client requests.
+        sleep_time_secs = random.random() * (2**self.progress_less_iterations)
+        if debug >= 1:
+            print('Got retryable failure (%d progress-less in a row).\n'
+                   'Sleeping %3.1f seconds before re-trying' %
+                   (self.progress_less_iterations, sleep_time_secs))
+        time.sleep(sleep_time_secs)
+
+    def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
+        """
+        Upload a file to a key into a bucket on GS, using GS resumable upload
+        protocol.
+
+        :type key: :class:`boto.s3.key.Key` or subclass
+        :param key: The Key object to which data is to be uploaded
+
+        :type fp: file-like object
+        :param fp: The file pointer to upload
+
+        :type headers: dict
+        :param headers: The headers to pass along with the PUT request
+
+        :type cb: function
+        :param cb: a callback function that will be called to report progress on
+            the upload.  The callback should accept two integer parameters, the
+            first representing the number of bytes that have been successfully
+            transmitted to GS, and the second representing the total number of
+            bytes that need to be transmitted.
+
+        :type num_cb: int
+        :param num_cb: (optional) If a callback is specified with the cb
+            parameter, this parameter determines the granularity of the callback
+            by defining the maximum number of times the callback will be called
+            during the file transfer. Providing a negative integer will cause
+            your callback to be called with each buffer read.
+
+        :type hash_algs: dictionary
+        :param hash_algs: (optional) Dictionary mapping hash algorithm
+            descriptions to corresponding state-ful hashing objects that
+            implement update(), digest(), and copy() (e.g. hashlib.md5()).
+            Defaults to {'md5': md5()}.
+
+        Raises ResumableUploadException if a problem occurs during the transfer.
+        """
+
+        if not headers:
+            headers = {}
+        # If Content-Type header is present and set to None, remove it.
+        # This is gsutil's way of asking boto to refrain from auto-generating
+        # that header.
+        CT = 'Content-Type'
+        if CT in headers and headers[CT] is None:
+            del headers[CT]
+
+        headers['User-Agent'] = UserAgent
+
+        # Determine file size different ways for case where fp is actually a
+        # wrapper around a Key vs an actual file.
+        if isinstance(fp, KeyFile):
+            file_length = fp.getkey().size
+        else:
+            fp.seek(0, os.SEEK_END)
+            file_length = fp.tell()
+            fp.seek(0)
+        debug = key.bucket.connection.debug
+
+        # Compute the MD5 checksum on the fly.
+        if hash_algs is None:
+            hash_algs = {'md5': md5}
+        self.digesters = dict(
+            (alg, hash_algs[alg]()) for alg in hash_algs or {})
+
+        # Use num-retries from constructor if one was provided; else check
+        # for a value specified in the boto config file; else default to 5.
+        if self.num_retries is None:
+            self.num_retries = config.getint('Boto', 'num_retries', 6)
+        self.progress_less_iterations = 0
+
+        while True:  # Retry as long as we're making progress.
+            server_had_bytes_before_attempt = self.server_has_bytes
+            self.digesters_before_attempt = dict(
+                (alg, self.digesters[alg].copy())
+                for alg in self.digesters)
+            try:
+                # Save generation and metageneration in class state so caller
+                # can find these values, for use in preconditions of future
+                # operations on the uploaded object.
+                (etag, self.generation, self.metageneration) = (
+                    self._attempt_resumable_upload(key, fp, file_length,
+                                                   headers, cb, num_cb))
+
+                # Get the final digests for the uploaded content.
+                for alg in self.digesters:
+                    key.local_hashes[alg] = self.digesters[alg].digest()
+
+                # Upload succceded, so remove the tracker file (if have one).
+                self._remove_tracker_file()
+                self._check_final_md5(key, etag)
+                key.generation = self.generation
+                if debug >= 1:
+                    print('Resumable upload complete.')
+                return
+            except self.RETRYABLE_EXCEPTIONS as e:
+                if debug >= 1:
+                    print('Caught exception (%s)' % e.__repr__())
+                if isinstance(e, IOError) and e.errno == errno.EPIPE:
+                    # Broken pipe error causes httplib to immediately
+                    # close the socket (http://bugs.python.org/issue5542),
+                    # so we need to close the connection before we resume
+                    # the upload (which will cause a new connection to be
+                    # opened the next time an HTTP request is sent).
+                    key.bucket.connection.connection.close()
+            except ResumableUploadException as e:
+                self.handle_resumable_upload_exception(e, debug)
+
+            self.track_progress_less_iterations(server_had_bytes_before_attempt,
+                                                True, debug)