comparison planemo/lib/python3.7/site-packages/boto/s3/resumable_download_handler.py @ 0:d30785e31577 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:18:57 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:d30785e31577
1 # Copyright 2010 Google Inc.
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21 import errno
22 import httplib
23 import os
24 import re
25 import socket
26 import time
27 import boto
28 from boto import config, storage_uri_for_key
29 from boto.connection import AWSAuthConnection
30 from boto.exception import ResumableDownloadException
31 from boto.exception import ResumableTransferDisposition
32 from boto.s3.keyfile import KeyFile
33 from boto.gs.key import Key as GSKey
34
35 """
36 Resumable download handler.
37
38 Resumable downloads will retry failed downloads, resuming at the byte count
39 completed by the last download attempt. If too many retries happen with no
40 progress (per configurable num_retries param), the download will be aborted.
41
42 The caller can optionally specify a tracker_file_name param in the
43 ResumableDownloadHandler constructor. If you do this, that file will
44 save the state needed to allow retrying later, in a separate process
45 (e.g., in a later run of gsutil).
46
47 Note that resumable downloads work across providers (they depend only
48 on support for Range GETs), but this code is in the boto.s3 package
49 because it is the wrong abstraction level to go in the top-level boto
50 package.
51
52 TODO: At some point we should refactor the code to have a storage_service
53 package where all these provider-independent files go.
54 """
55
56
class ByteTranslatingCallbackHandler(object):
    """
    Proxy class that translates progress callbacks made by
    boto.s3.Key.get_file(), taking into account that we're resuming
    a download.

    Each progress report is forwarded to the wrapped callback with
    download_start_point added to the reported byte count.
    """
    def __init__(self, proxied_cb, download_start_point):
        """
        :type proxied_cb: function
        :param proxied_cb: callback accepting (bytes_transferred, total_size)
            that receives the translated progress reports. May be None, in
            which case call() does nothing.

        :type download_start_point: int
        :param download_start_point: byte offset added to every reported
            byte count before it is forwarded.
        """
        self.proxied_cb = proxied_cb
        self.download_start_point = download_start_point

    def call(self, total_bytes_uploaded, total_size):
        """
        Forwards a progress report to the proxied callback, shifting the
        byte count by download_start_point. total_size is passed through
        unchanged.
        """
        # A handler built around "no callback" must stay silent rather than
        # fail with "'NoneType' object is not callable".
        if self.proxied_cb is None:
            return
        self.proxied_cb(self.download_start_point + total_bytes_uploaded,
                        total_size)
70
71
def get_cur_file_size(fp, position_to_eof=False):
    """
    Returns the size of the file behind fp.

    If position_to_eof is True, fp is left positioned at EOF. Otherwise
    fp ends up at the position it had on entry.
    """
    if isinstance(fp, KeyFile) and not position_to_eof:
        # Avoid EOF seek for KeyFile case as it's very inefficient; the
        # size is read from the underlying key instead.
        return fp.getkey().size

    if position_to_eof:
        # Caller wants fp left at EOF, so there is nothing to restore.
        fp.seek(0, os.SEEK_END)
        return fp.tell()

    # Measure by seeking to EOF, then put fp back where it was.
    saved_offset = fp.tell()
    fp.seek(0, os.SEEK_END)
    size_in_bytes = fp.tell()
    fp.seek(saved_offset, os.SEEK_SET)
    return size_in_bytes
86
87
class ResumableDownloadHandler(object):
    """
    Handler for resumable downloads.

    A download is resumed when the destination file already holds data and
    the etag recorded for the in-progress download matches the key's etag;
    otherwise the destination is truncated and the download starts over.
    Retryable failures are retried with exponential back-off until
    num_retries consecutive attempts make no progress.
    """

    # Shortest string accepted as an etag when read from a tracker file.
    MIN_ETAG_LEN = 5

    # Exceptions that trigger a retry instead of aborting the download.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each downloaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracking info
            about this download. If supplied and the current process fails
            the download, it can be retried in a new process. If called
            with an existing file containing a usable etag, we'll resume
            the transfer for this file; else we'll start a new resumable
            download.

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            download making no progress. (Count resets every time we get
            progress, so download can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.etag_value_for_current_download = None
        if tracker_file_name:
            self._load_tracker_file_etag()
        # Save download_start_point in instance state so caller can
        # find how much was transferred by this ResumableDownloadHandler
        # (across retries).
        self.download_start_point = None

    def _load_tracker_file_etag(self):
        """
        Reads the etag stored on the first line of the tracker file into
        self.etag_value_for_current_download.

        The attribute is left as None when the file does not exist, cannot
        be read, or holds a value shorter than MIN_ETAG_LEN, so that the
        next attempt starts the download from scratch.
        """
        self.etag_value_for_current_download = None
        try:
            with open(self.tracker_file_name, 'r') as f:
                etag = f.readline().rstrip('\n')
        except IOError as e:
            # Ignore non-existent file (happens first time a download
            # is attempted on an object), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because
                # self.etag_value_for_current_download is None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'download from scratch.' %
                      (self.tracker_file_name, e.strerror))
            return
        # We used to match an MD5-based regex to ensure that the etag was
        # read correctly. Since ETags need not be MD5s, we now do a simple
        # length sanity check instead.
        if len(etag) < self.MIN_ETAG_LEN:
            print('Couldn\'t read etag in tracker file (%s). Restarting '
                  'download from scratch.' % self.tracker_file_name)
            # Discard the unusable value so it can never match a key's etag.
            return
        self.etag_value_for_current_download = etag

    def _save_tracker_info(self, key):
        """
        Records key's etag (surrounding quotes stripped) as the etag of the
        current download and, if a tracker file name was given, writes it
        to that file.

        Raises ResumableDownloadException (disposition ABORT) if the
        tracker file cannot be written.
        """
        self.etag_value_for_current_download = key.etag.strip('"\'')
        if not self.tracker_file_name:
            return
        try:
            with open(self.tracker_file_name, 'w') as f:
                f.write('%s\n' % self.etag_value_for_current_download)
        except IOError as e:
            raise ResumableDownloadException(
                'Couldn\'t write tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured download tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _remove_tracker_file(self):
        """
        Deletes the tracker file if one was configured and it exists.
        """
        if (self.tracker_file_name and
                os.path.exists(self.tracker_file_name)):
            os.unlink(self.tracker_file_name)

    def _download_without_retries(self, key, fp, headers, cb, num_cb,
                                  torrent, version_id, hash_algs):
        """
        Calls key.get_file() with connection-level retries disabled.

        hash_algs is only forwarded for GSKey instances; other key types
        are called without it.
        """
        if isinstance(key, GSKey):
            key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                         override_num_retries=0, hash_algs=hash_algs)
        else:
            key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                         override_num_retries=0)

    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
                                    torrent, version_id, hash_algs):
        """
        Attempts a resumable download.

        Resumes with a Range request when fp already holds data and the
        saved etag matches key's etag; otherwise saves fresh tracker info,
        truncates fp and downloads from the beginning.

        Raises ResumableDownloadException if any problems occur.
        """
        cur_file_size = get_cur_file_size(fp, position_to_eof=True)

        if (cur_file_size and
                self.etag_value_for_current_download and
                self.etag_value_for_current_download ==
                key.etag.strip('"\'')):
            # Try to resume existing transfer.
            if cur_file_size > key.size:
                raise ResumableDownloadException(
                    '%s is larger (%d) than %s (%d).\nDeleting tracker file, '
                    'so if you re-try this download it will start from '
                    'scratch' %
                    (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                     key.size), ResumableTransferDisposition.ABORT)
            elif cur_file_size == key.size:
                if key.bucket.connection.debug >= 1:
                    print('Download complete.')
                return
            if key.bucket.connection.debug >= 1:
                print('Resuming download.')
            # Copy so the caller's headers are not mutated by the Range
            # header added for this attempt.
            headers = headers.copy()
            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
            # Only wrap a real callback: wrapping None would turn "no
            # callback" into a callable that fails when invoked.
            if cb is not None:
                cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
            self.download_start_point = cur_file_size
        else:
            if key.bucket.connection.debug >= 1:
                print('Starting new resumable download.')
            self._save_tracker_info(key)
            self.download_start_point = 0
            # Truncate the file, in case a new resumable download is being
            # started atop an existing file.
            fp.truncate(0)

        # Disable AWSAuthConnection-level retry behavior, since that would
        # cause downloads to restart from scratch.
        self._download_without_retries(key, fp, headers, cb, num_cb, torrent,
                                       version_id, hash_algs)
        fp.flush()

    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
                 version_id=None, hash_algs=None):
        """
        Retrieves a file from a Key
        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object from which the data is to be downloaded

        :type fp: file
        :param fp: File pointer into which data should be downloaded

        :type headers: dict
        :param headers: headers to send when retrieving the files; a false
            value is treated as an empty dict

        :type cb: function
        :param cb: (optional) a callback function that will be called to report
             progress on the download.  The callback should accept two integer
             parameters, the first representing the number of bytes that have
             been successfully transmitted from the storage service and
             the second representing the total number of bytes that need
             to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
             parameter this parameter determines the granularity of the callback
             by defining the maximum number of times the callback will be
             called during the file transfer.

        :type torrent: bool
        :param torrent: Flag for whether to get a torrent for the file

        :type version_id: string
        :param version_id: The version ID (optional)

        :type hash_algs: dictionary
        :param hash_algs: (optional) Dictionary of hash algorithms and
            corresponding hashing class that implements update() and digest().
            Defaults to {'md5': hashlib/md5.md5}.

        Raises ResumableDownloadException if a problem occurs during
            the transfer.
        """

        debug = key.bucket.connection.debug
        if not headers:
            headers = {}

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 6.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 6)
        progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            had_file_bytes_before_attempt = get_cur_file_size(fp)
            try:
                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
                                                 torrent, version_id,
                                                 hash_algs)
                # Download succeeded, so remove the tracker file (if have
                # one).
                self._remove_tracker_file()
                # Previously, check_final_md5() was called here to validate
                # downloaded file's checksum, however, to be consistent with
                # non-resumable downloads, this call was removed. Checksum
                # validation of file contents should be done by the caller.
                if debug >= 1:
                    print('Resumable download complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close and reopen the key before resuming
                    # the download.
                    # NOTE(review): this call uses the caller's original
                    # headers (no Range header is added here) and runs
                    # inside the exception handler, so anything it raises
                    # propagates to the caller instead of being retried.
                    # Confirm this is the intended way to reopen the key.
                    self._download_without_retries(key, fp, headers, cb,
                                                   num_cb, torrent,
                                                   version_id, hash_algs)
            except ResumableDownloadException as e:
                if (e.disposition ==
                        ResumableTransferDisposition.ABORT_CUR_PROCESS):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s)' % e.message)
                    raise
                elif (e.disposition ==
                        ResumableTransferDisposition.ABORT):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s); aborting and removing tracker file' %
                              e.message)
                    self._remove_tracker_file()
                    raise
                else:
                    if debug >= 1:
                        print('Caught ResumableDownloadException (%s) - will '
                              'retry' % e.message)

            # At this point we had a re-tryable failure; see if made progress.
            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
                progress_less_iterations = 0
            else:
                progress_less_iterations += 1

            if progress_less_iterations > self.num_retries:
                # Don't retry any longer in the current process.
                raise ResumableDownloadException(
                    'Too many resumable download attempts failed without '
                    'progress. You might try this download again later',
                    ResumableTransferDisposition.ABORT_CUR_PROCESS)

            # Close the key, in case a previous download died partway
            # through and left data in the underlying key HTTP buffer.
            # Do this within a try/except block in case the connection is
            # closed (since key.close() attempts to do a final read, in which
            # case this read attempt would get an IncompleteRead exception,
            # which we can safely ignore).
            try:
                key.close()
            except httplib.IncompleteRead:
                pass

            # Exponential back-off keyed on consecutive progress-less tries.
            sleep_time_secs = 2**progress_less_iterations
            if debug >= 1:
                print('Got retryable failure (%d progress-less in a row).\n'
                      'Sleeping %d seconds before re-trying' %
                      (progress_less_iterations, sleep_time_secs))
            time.sleep(sleep_time_secs)