comparison env/lib/python3.9/site-packages/boto/gs/resumable_upload_handler.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
# Copyright 2010 Google Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import errno
import os
import random
import re
import socket
import time

# httplib and urlparse were renamed in Python 3; fall back accordingly so
# this module imports under either version.
try:
    import httplib
    import urlparse
except ImportError:
    import http.client as httplib
    import urllib.parse as urlparse

from hashlib import md5
from boto import config, UserAgent
from boto.connection import AWSAuthConnection
from boto.exception import InvalidUriError
from boto.exception import ResumableTransferDisposition
from boto.exception import ResumableUploadException
from boto.s3.keyfile import KeyFile

"""
Handler for Google Cloud Storage resumable uploads. See
http://code.google.com/apis/storage/docs/developer-guide.html#resumable
for details.

Resumable uploads will retry failed uploads, resuming at the byte
count completed by the last upload attempt. If too many retries happen with
no progress (per configurable num_retries param), the upload will be
aborted in the current process.

The caller can optionally specify a tracker_file_name param in the
ResumableUploadHandler constructor. If you do this, that file will
save the state needed to allow retrying later, in a separate process
(e.g., in a later run of gsutil).
"""


class ResumableUploadHandler(object):

    BUFFER_SIZE = 8192
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    # (start, end) response indicating server has nothing (upload protocol uses
    # inclusive numbering).
    SERVER_HAS_NOTHING = (0, -1)
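    # Because both endpoints are inclusive, "no bytes" is expressed with an
    # end of -1 (one less than the start), not (0, 0), which would mean the
    # server already holds byte 0.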

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each uploaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracker URI.
            If supplied and the current process fails the upload, it can be
            retried in a new process. If called with an existing file
            containing a valid tracker URI, we'll resume the upload from this
            URI; else we'll start a new resumable upload (and write the URI
            to this tracker file).

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            upload making no progress. (Count resets every time we get
            progress, so upload can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.server_has_bytes = 0  # Byte count at last server check.
        self.tracker_uri = None
        if tracker_file_name:
            self._load_tracker_uri_from_file()
        # Save upload_start_point in instance state so caller can find how
        # much was transferred by this ResumableUploadHandler (across retries).
        self.upload_start_point = None

    def _load_tracker_uri_from_file(self):
        f = None
        try:
            f = open(self.tracker_file_name, 'r')
            uri = f.readline().strip()
            self._set_tracker_uri(uri)
        except IOError as e:
            # Ignore non-existent file (happens first time an upload
            # is attempted on a file), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because self.tracker_uri is None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'upload from scratch.' %
                      (self.tracker_file_name, e.strerror))
        except InvalidUriError as e:
            # Warn user, but proceed (will restart because
            # self.tracker_uri is None).
            print('Invalid tracker URI (%s) found in URI tracker file '
                  '(%s). Restarting upload from scratch.' %
                  (uri, self.tracker_file_name))
        finally:
            if f:
                f.close()

    def _save_tracker_uri_to_file(self):
        """
        Saves URI to tracker file if one was passed to constructor.
        """
        if not self.tracker_file_name:
            return
        try:
            with os.fdopen(os.open(self.tracker_file_name,
                                   os.O_WRONLY | os.O_CREAT, 0o600), 'w') as f:
                f.write(self.tracker_uri)
        except IOError as e:
            raise ResumableUploadException(
                'Couldn\'t write URI tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured upload tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _set_tracker_uri(self, uri):
        """
        Called when we start a new resumable upload or get a new tracker
        URI for the upload. Saves URI and resets upload state.

        Raises InvalidUriError if URI is syntactically invalid.
        """
        parse_result = urlparse.urlparse(uri)
        if (parse_result.scheme.lower() not in ['http', 'https'] or
                not parse_result.netloc):
            raise InvalidUriError('Invalid tracker URI (%s)' % uri)
        self.tracker_uri = uri
        self.tracker_uri_host = parse_result.netloc
        self.tracker_uri_path = '%s?%s' % (
            parse_result.path, parse_result.query)
        self.server_has_bytes = 0

    def get_tracker_uri(self):
        """
        Returns upload tracker URI, or None if the upload has not yet started.
        """
        return self.tracker_uri

    def get_upload_id(self):
        """
        Returns the upload ID for the resumable upload, or None if the upload
        has not yet started.
        """
        # We extract the upload_id from the tracker uri. We could retrieve the
        # upload_id from the headers in the response but this only works for
        # the case where we get the tracker uri from the service. In the case
        # where we get the tracker from the tracking file we need to do this
        # logic anyway.
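        # For example (hypothetical value), a tracker URI ending in
        # '?upload_id=AEnB2Uo' yields the upload ID 'AEnB2Uo'.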
        delim = '?upload_id='
        if self.tracker_uri and delim in self.tracker_uri:
            return self.tracker_uri[self.tracker_uri.index(delim) + len(delim):]
        else:
            return None

    def _remove_tracker_file(self):
        if (self.tracker_file_name and
                os.path.exists(self.tracker_file_name)):
            os.unlink(self.tracker_file_name)

    def _build_content_range_header(self, range_spec='*', length_spec='*'):
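        # For example, ('0-99', 100) produces 'bytes 0-99/100', while a
        # range_spec of '*' (e.g. 'bytes */100') accompanies a request that
        # carries no body bytes, such as the server state query below.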
        return 'bytes %s/%s' % (range_spec, length_spec)

    def _query_server_state(self, conn, file_length):
        """
        Queries server to find out state of given upload.

        Note that this method really just makes special case use of the
        fact that the upload server always returns the current start/end
        state whenever a PUT doesn't complete.

        Returns HTTP response from sending request.

        Raises ResumableUploadException if problem querying server.
        """
        # Send an empty PUT so that server replies with this resumable
        # transfer's state.
        put_headers = {}
        put_headers['Content-Range'] = (
            self._build_content_range_header('*', file_length))
        put_headers['Content-Length'] = '0'
        return AWSAuthConnection.make_request(conn, 'PUT',
                                              path=self.tracker_uri_path,
                                              auth_path=self.tracker_uri_path,
                                              headers=put_headers,
                                              host=self.tracker_uri_host)

    def _query_server_pos(self, conn, file_length):
        """
        Queries server to find out what bytes it currently has.

        Returns (server_start, server_end), where the values are inclusive.
        For example, (0, 2) would mean that the server has bytes 0, 1, *and* 2.

        Raises ResumableUploadException if problem querying server.
        """
        resp = self._query_server_state(conn, file_length)
        if resp.status == 200:
            # To handle the boundary condition where the server has the
            # complete file, we return (server_start, file_length-1). That way
            # the calling code can always simply read up through server_end.
            # (If we didn't handle this boundary condition here, the caller
            # would have to check whether server_end == file_length and read
            # one fewer byte in that case.)
            return (0, file_length - 1)  # Completed upload.
        if resp.status != 308:
            # This means the server didn't have any state for the given
            # upload ID, which can happen (for example) if the caller saved
            # the tracker URI to a file and then tried to restart the transfer
            # after that upload ID has gone stale. In that case we need to
            # start a new transfer (and the caller will then save the new
            # tracker URI to the tracker file).
            raise ResumableUploadException(
                'Got non-308 response (%s) from server state query' %
                resp.status, ResumableTransferDisposition.START_OVER)
        got_valid_response = False
        range_spec = resp.getheader('range')
        if range_spec:
            # Parse 'bytes=<from>-<to>' range_spec.
            m = re.search(r'bytes=(\d+)-(\d+)', range_spec)
            if m:
                server_start = int(m.group(1))
                server_end = int(m.group(2))
                got_valid_response = True
        else:
            # No Range header, which means the server does not yet have
            # any bytes. Note that the Range header uses inclusive 'from'
            # and 'to' values. Since Range 0-0 would mean that the server
            # has byte 0, omitting the Range header is used to indicate that
            # the server doesn't have any bytes.
            return self.SERVER_HAS_NOTHING
        if not got_valid_response:
            raise ResumableUploadException(
                'Couldn\'t parse upload server state query response (%s)' %
                str(resp.getheaders()),
                ResumableTransferDisposition.START_OVER)
        if conn.debug >= 1:
            print('Server has: Range: %d - %d.' % (server_start, server_end))
        return (server_start, server_end)

    def _start_new_resumable_upload(self, key, headers=None):
        """
        Starts a new resumable upload.

        Raises ResumableUploadException if any errors occur.
        """
        conn = key.bucket.connection
        if conn.debug >= 1:
            print('Starting new resumable upload.')
        self.server_has_bytes = 0

        # Start a new resumable upload by sending a POST request with an
        # empty body and the "X-Goog-Resumable: start" header. Include any
        # caller-provided headers (e.g., Content-Type) EXCEPT Content-Length
        # (and raise an exception if they tried to pass one, since it's
        # a semantic error to specify it at this point, and if we were to
        # include one now it would cause the server to expect that many
        # bytes; the POST doesn't include the actual file bytes). We set
        # the Content-Length in the subsequent PUT, based on the uploaded
        # file size.
        post_headers = {}
        for k in headers or {}:
            if k.lower() == 'content-length':
                raise ResumableUploadException(
                    'Attempt to specify Content-Length header (disallowed)',
                    ResumableTransferDisposition.ABORT)
            post_headers[k] = headers[k]
        post_headers[conn.provider.resumable_upload_header] = 'start'

        resp = conn.make_request(
            'POST', key.bucket.name, key.name, post_headers)
        # Get tracker URI from response 'Location' header.
        body = resp.read()

        # Check for various status conditions.
        if resp.status in [500, 503]:
            # Retry status 500 and 503 errors after a delay.
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Will wait/retry' % resp.status,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        elif resp.status != 200 and resp.status != 201:
            raise ResumableUploadException(
                'Got status %d from attempt to start resumable upload. '
                'Aborting' % resp.status,
                ResumableTransferDisposition.ABORT)

        # Else we got 200 or 201 response code, indicating the resumable
        # upload was created.
        tracker_uri = resp.getheader('Location')
        if not tracker_uri:
            raise ResumableUploadException(
                'No resumable tracker URI found in resumable initiation '
                'POST response (%s)' % body,
                ResumableTransferDisposition.WAIT_BEFORE_RETRY)
        self._set_tracker_uri(tracker_uri)
        self._save_tracker_uri_to_file()

    def _upload_file_bytes(self, conn, http_conn, fp, file_length,
                           total_bytes_uploaded, cb, num_cb, headers):
        """
        Makes one attempt to upload file bytes, using an existing resumable
        upload connection.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        buf = fp.read(self.BUFFER_SIZE)
        if cb:
            # The cb_count represents the number of full buffers to send
            # between cb executions.
            if num_cb > 2:
                cb_count = file_length // self.BUFFER_SIZE // (num_cb - 2)
            elif num_cb < 0:
                cb_count = -1
            else:
                cb_count = 0
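            # For example, a 1 MiB file with num_cb=10 gives
            # cb_count = 1048576 // 8192 // 8 = 16 full buffers per callback.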
            i = 0
            cb(total_bytes_uploaded, file_length)

        # Build resumable upload headers for the transfer. Don't send a
        # Content-Range header if the file is 0 bytes long, because the
        # resumable upload protocol uses an *inclusive* end-range (so, sending
        # 'bytes 0-0/1' would actually mean you're sending a 1-byte file).
        if not headers:
            put_headers = {}
        else:
            put_headers = headers.copy()
        if file_length:
            if total_bytes_uploaded == file_length:
                range_header = self._build_content_range_header(
                    '*', file_length)
            else:
                range_header = self._build_content_range_header(
                    '%d-%d' % (total_bytes_uploaded, file_length - 1),
                    file_length)
            put_headers['Content-Range'] = range_header
        # Set Content-Length to the total bytes we'll send with this PUT.
        put_headers['Content-Length'] = str(file_length - total_bytes_uploaded)
        http_request = AWSAuthConnection.build_base_http_request(
            conn, 'PUT', path=self.tracker_uri_path, auth_path=None,
            headers=put_headers, host=self.tracker_uri_host)
        http_conn.putrequest('PUT', http_request.path)
        for k in put_headers:
            http_conn.putheader(k, put_headers[k])
        http_conn.endheaders()

        # Turn off debug on http connection so upload content isn't included
        # in debug stream.
        http_conn.set_debuglevel(0)
        while buf:
            http_conn.send(buf)
            for alg in self.digesters:
                self.digesters[alg].update(buf)
            total_bytes_uploaded += len(buf)
            if cb:
                i += 1
                if i == cb_count or cb_count == -1:
                    cb(total_bytes_uploaded, file_length)
                    i = 0
            buf = fp.read(self.BUFFER_SIZE)
        http_conn.set_debuglevel(conn.debug)
        if cb:
            cb(total_bytes_uploaded, file_length)
        if total_bytes_uploaded != file_length:
            # Abort (and delete the tracker file) so if the user retries
            # they'll start a new resumable upload rather than potentially
            # attempting to pick back up later where we left off.
            raise ResumableUploadException(
                'File changed during upload: EOF at %d bytes of %d byte file.' %
                (total_bytes_uploaded, file_length),
                ResumableTransferDisposition.ABORT)
        resp = http_conn.getresponse()
        # Restore http connection debug level.
        http_conn.set_debuglevel(conn.debug)

        if resp.status == 200:
            # Success.
            return (resp.getheader('etag'),
                    resp.getheader('x-goog-generation'),
                    resp.getheader('x-goog-metageneration'))
        # Retry timeout (408) and status 500 and 503 errors after a delay.
        elif resp.status in [408, 500, 503]:
            disposition = ResumableTransferDisposition.WAIT_BEFORE_RETRY
        else:
            # Catch all for any other error codes.
            disposition = ResumableTransferDisposition.ABORT
        raise ResumableUploadException('Got response code %d while attempting '
                                       'upload (%s)' %
                                       (resp.status, resp.reason), disposition)

    def _attempt_resumable_upload(self, key, fp, file_length, headers, cb,
                                  num_cb):
        """
        Attempts a resumable upload.

        Returns (etag, generation, metageneration) from server upon success.

        Raises ResumableUploadException if any problems occur.
        """
        (server_start, server_end) = self.SERVER_HAS_NOTHING
        conn = key.bucket.connection
        if self.tracker_uri:
            # Try to resume existing resumable upload.
            try:
                (server_start, server_end) = (
                    self._query_server_pos(conn, file_length))
                self.server_has_bytes = server_start

                if server_end >= 0:
                    # If the server already has some of the content (the end
                    # index is inclusive, so 0 means it has one byte), we need
                    # to update the digesters with the bytes that have already
                    # been uploaded to ensure we get a complete hash in the
                    # end.
                    print('Catching up hash digest(s) for resumed upload')
                    fp.seek(0)
                    # Read local file's bytes through position server has. For
                    # example, if server has (0, 3) we want to read 3-0+1=4
                    # bytes.
                    bytes_to_go = server_end + 1
                    while bytes_to_go:
                        chunk = fp.read(min(key.BufferSize, bytes_to_go))
                        if not chunk:
                            raise ResumableUploadException(
                                'Hit end of file during resumable upload hash '
                                'catchup. This should not happen under\n'
                                'normal circumstances, as it indicates the '
                                'server has more bytes of this transfer\nthan'
                                ' the current file size. Restarting upload.',
                                ResumableTransferDisposition.START_OVER)
                        for alg in self.digesters:
                            self.digesters[alg].update(chunk)
                        bytes_to_go -= len(chunk)

                if conn.debug >= 1:
                    print('Resuming transfer.')
            except ResumableUploadException as e:
                if conn.debug >= 1:
                    print('Unable to resume transfer (%s).' % e.message)
                self._start_new_resumable_upload(key, headers)
        else:
            self._start_new_resumable_upload(key, headers)

        # upload_start_point allows the code that instantiated the
        # ResumableUploadHandler to find out the point from which it started
        # uploading (e.g., so it can correctly compute throughput).
        if self.upload_start_point is None:
            self.upload_start_point = server_end

        total_bytes_uploaded = server_end + 1
        # Corner case: Don't attempt to seek if we've already uploaded the
        # entire file, because if the file is a stream (e.g., the KeyFile
        # wrapper around input key when copying between providers), attempting
        # to seek to the end of file would result in an InvalidRange error.
        if total_bytes_uploaded < file_length:
            fp.seek(total_bytes_uploaded)
        conn = key.bucket.connection

        # Get a new HTTP connection (vs conn.get_http_connection(), which
        # reuses pool connections) because httplib requires a new HTTP
        # connection per transaction. (Without this, calling
        # http_conn.getresponse() would get "ResponseNotReady".)
        http_conn = conn.new_http_connection(self.tracker_uri_host, conn.port,
                                             conn.is_secure)
        http_conn.set_debuglevel(conn.debug)

        # Make sure to close http_conn at end so if a local file read failure
        # occurs partway through, the server will terminate the current upload
        # and can report that progress on the next attempt.
        try:
            return self._upload_file_bytes(conn, http_conn, fp, file_length,
                                           total_bytes_uploaded, cb, num_cb,
                                           headers)
        except (ResumableUploadException, socket.error):
            resp = self._query_server_state(conn, file_length)
            if resp.status == 400:
                raise ResumableUploadException(
                    'Got 400 response from server state query after failed '
                    'resumable upload attempt. This can happen for various '
                    'reasons, including specifying an invalid request (e.g., '
                    'an invalid canned ACL) or if the file size changed '
                    'between upload attempts',
                    ResumableTransferDisposition.ABORT)
            else:
                raise
        finally:
            http_conn.close()

    def _check_final_md5(self, key, etag):
        """
        Checks that etag from server agrees with md5 computed before upload.
        This is important, since the upload could have spanned a number of
        hours and multiple processes (e.g., gsutil runs), and the user could
        change some of the file and not realize they have inconsistent data.
        """
        if key.bucket.connection.debug >= 1:
            print('Checking md5 against etag.')
        if key.md5 != etag.strip('"\''):
            # Call key.open_read() before attempting to delete the
            # (incorrect-content) key, so we perform that request on a
            # different HTTP connection. This is needed because httplib
            # will return a "Response not ready" error if you try to perform
            # a second transaction on the connection.
            key.open_read()
            key.close()
            key.delete()
            raise ResumableUploadException(
                'File changed during upload: md5 signature doesn\'t match '
                'etag (incorrect uploaded object deleted)',
                ResumableTransferDisposition.ABORT)

    def handle_resumable_upload_exception(self, e, debug):
        if (e.disposition == ResumableTransferDisposition.ABORT_CUR_PROCESS):
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting but retaining tracker file' % e.message)
            raise
        elif (e.disposition == ResumableTransferDisposition.ABORT):
            if debug >= 1:
                print('Caught non-retryable ResumableUploadException (%s); '
                      'aborting and removing tracker file' % e.message)
            self._remove_tracker_file()
            raise
        else:
            if debug >= 1:
                print('Caught ResumableUploadException (%s) - will retry' %
                      e.message)

    def track_progress_less_iterations(self, server_had_bytes_before_attempt,
                                       roll_back_md5=True, debug=0):
        # At this point we had a re-tryable failure; see if we made progress.
        if self.server_has_bytes > server_had_bytes_before_attempt:
            self.progress_less_iterations = 0  # If progress, reset counter.
        else:
            self.progress_less_iterations += 1
            if roll_back_md5:
                # Rollback any potential hash updates, as we did not
                # make any progress in this iteration.
                self.digesters = self.digesters_before_attempt

        if self.progress_less_iterations > self.num_retries:
            # Don't retry any longer in the current process.
            raise ResumableUploadException(
                'Too many resumable upload attempts failed without '
                'progress. You might try this upload again later',
                ResumableTransferDisposition.ABORT_CUR_PROCESS)

        # Use binary exponential backoff to desynchronize client requests.
        sleep_time_secs = random.random() * (2 ** self.progress_less_iterations)
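        # For example, after the third progress-less attempt in a row this
        # sleeps a uniformly random time in [0, 8) seconds.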
        if debug >= 1:
            print('Got retryable failure (%d progress-less in a row).\n'
                  'Sleeping %3.1f seconds before re-trying' %
                  (self.progress_less_iterations, sleep_time_secs))
        time.sleep(sleep_time_secs)

    def send_file(self, key, fp, headers, cb=None, num_cb=10, hash_algs=None):
        """
        Upload a file to a key into a bucket on GS, using GS resumable upload
        protocol.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object to which data is to be uploaded

        :type fp: file-like object
        :param fp: The file pointer to upload

        :type headers: dict
        :param headers: The headers to pass along with the PUT request

        :type cb: function
        :param cb: a callback function that will be called to report progress
            on the upload. The callback should accept two integer parameters,
            the first representing the number of bytes that have been
            successfully transmitted to GS, and the second representing the
            total number of bytes that need to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
            parameter, this parameter determines the granularity of the
            callback by defining the maximum number of times the callback
            will be called during the file transfer. Providing a negative
            integer will cause your callback to be called with each buffer
            read.

        :type hash_algs: dictionary
        :param hash_algs: (optional) Dictionary mapping hash algorithm
            descriptions to corresponding hash constructors, each producing a
            state-ful hashing object that implements update(), digest(), and
            copy() (e.g. hashlib.md5). Defaults to {'md5': md5}.

        Raises ResumableUploadException if a problem occurs during the
        transfer.
        """

        if not headers:
            headers = {}
        # If Content-Type header is present and set to None, remove it.
        # This is gsutil's way of asking boto to refrain from auto-generating
        # that header.
        CT = 'Content-Type'
        if CT in headers and headers[CT] is None:
            del headers[CT]

        headers['User-Agent'] = UserAgent

        # Determine file size different ways for case where fp is actually a
        # wrapper around a Key vs an actual file.
        if isinstance(fp, KeyFile):
            file_length = fp.getkey().size
        else:
            fp.seek(0, os.SEEK_END)
            file_length = fp.tell()
            fp.seek(0)
        debug = key.bucket.connection.debug

        # Compute the MD5 checksum on the fly.
        if hash_algs is None:
            hash_algs = {'md5': md5}
        self.digesters = dict(
            (alg, hash_algs[alg]()) for alg in hash_algs or {})

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 6.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 6)
        self.progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            server_had_bytes_before_attempt = self.server_has_bytes
            self.digesters_before_attempt = dict(
                (alg, self.digesters[alg].copy())
                for alg in self.digesters)
            try:
                # Save generation and metageneration in class state so caller
                # can find these values, for use in preconditions of future
                # operations on the uploaded object.
                (etag, self.generation, self.metageneration) = (
                    self._attempt_resumable_upload(key, fp, file_length,
                                                   headers, cb, num_cb))

                # Get the final digests for the uploaded content.
                for alg in self.digesters:
                    key.local_hashes[alg] = self.digesters[alg].digest()

                # Upload succeeded, so remove the tracker file (if we have
                # one).
                self._remove_tracker_file()
                self._check_final_md5(key, etag)
                key.generation = self.generation
                if debug >= 1:
                    print('Resumable upload complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close the connection before we resume
                    # the upload (which will cause a new connection to be
                    # opened the next time an HTTP request is sent).
                    key.bucket.connection.connection.close()
            except ResumableUploadException as e:
                self.handle_resumable_upload_exception(e, debug)

            self.track_progress_less_iterations(
                server_had_bytes_before_attempt, True, debug)
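

# Example of calling send_file directly with a progress callback and custom
# hash algorithms (a minimal sketch, not part of this module; 'key' is a
# boto.gs.key.Key obtained as in the example near the top of this file, and
# the file names are hypothetical):
#
#   import hashlib
#
#   def show_progress(bytes_sent, bytes_total):
#       print('%d / %d bytes' % (bytes_sent, bytes_total))
#
#   handler = ResumableUploadHandler(
#       tracker_file_name='/tmp/upload.tracker', num_retries=5)
#   with open('/path/to/local/file', 'rb') as fp:
#       handler.send_file(key, fp, headers={}, cb=show_progress, num_cb=20,
#                         hash_algs={'md5': hashlib.md5})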