Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/boto/s3/resumable_download_handler.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:18:57 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:d30785e31577 |
---|---|
1 # Copyright 2010 Google Inc. | |
2 # | |
3 # Permission is hereby granted, free of charge, to any person obtaining a | |
4 # copy of this software and associated documentation files (the | |
5 # "Software"), to deal in the Software without restriction, including | |
6 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
7 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
8 # persons to whom the Software is furnished to do so, subject to the fol- | |
9 # lowing conditions: | |
10 # | |
11 # The above copyright notice and this permission notice shall be included | |
12 # in all copies or substantial portions of the Software. | |
13 # | |
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
20 # IN THE SOFTWARE. | |
21 import errno | |
22 import httplib | |
23 import os | |
24 import re | |
25 import socket | |
26 import time | |
27 import boto | |
28 from boto import config, storage_uri_for_key | |
29 from boto.connection import AWSAuthConnection | |
30 from boto.exception import ResumableDownloadException | |
31 from boto.exception import ResumableTransferDisposition | |
32 from boto.s3.keyfile import KeyFile | |
33 from boto.gs.key import Key as GSKey | |
34 | |
35 """ | |
36 Resumable download handler. | |
37 | |
38 Resumable downloads will retry failed downloads, resuming at the byte count | |
39 completed by the last download attempt. If too many retries happen with no | |
40 progress (per configurable num_retries param), the download will be aborted. | |
41 | |
42 The caller can optionally specify a tracker_file_name param in the | |
43 ResumableDownloadHandler constructor. If you do this, that file will | |
44 save the state needed to allow retrying later, in a separate process | |
45 (e.g., in a later run of gsutil). | |
46 | |
47 Note that resumable downloads work across providers (they depend only | |
48 on support for Range GETs), but this code is in the boto.s3 package | |
49 because it is at the wrong abstraction level to go in the top-level boto | |
50 package. | |
51 | |
52 TODO: At some point we should refactor the code to have a storage_service | |
53 package where all these provider-independent files go. | |
54 """ | |
55 | |
56 | |
class ByteTranslatingCallbackHandler(object):
    """
    Progress-callback adapter used when a download is being resumed.

    Wraps a caller-supplied progress callback so that the byte count
    reported for the current attempt is shifted by download_start_point
    before being passed on.
    """
    def __init__(self, proxied_cb, download_start_point):
        # Both values are read again on every call().
        self.download_start_point = download_start_point
        self.proxied_cb = proxied_cb

    def call(self, total_bytes_uploaded, total_size):
        """
        Forward one progress report to the wrapped callback, adding
        download_start_point to the byte count; total_size is passed
        through unchanged.
        """
        absolute_bytes = self.download_start_point + total_bytes_uploaded
        self.proxied_cb(absolute_bytes, total_size)
70 | |
71 | |
def get_cur_file_size(fp, position_to_eof=False):
    """
    Return the size of the file behind fp.

    When position_to_eof is true, fp is left positioned at end-of-file.
    Otherwise the position fp had on entry is restored before returning.
    """
    if position_to_eof:
        # Caller wants to end up at EOF: seek there and report the offset.
        fp.seek(0, os.SEEK_END)
        return fp.tell()

    if isinstance(fp, KeyFile):
        # Avoid the EOF seek for KeyFile, as it's very inefficient; use the
        # size recorded on the underlying key instead.
        return fp.getkey().size

    # Measure by seeking to EOF, then put the position back where it was.
    saved_pos = fp.tell()
    fp.seek(0, os.SEEK_END)
    size = fp.tell()
    fp.seek(saved_pos, os.SEEK_SET)
    return size
86 | |
87 | |
class ResumableDownloadHandler(object):
    """
    Handler for resumable downloads.

    An instance drives one download: get_file() repeatedly attempts the
    transfer, resuming from the bytes already present in the destination
    file, until it succeeds, hits a non-retryable error, or makes no
    progress for more than num_retries consecutive attempts. The etag of
    the object being downloaded can optionally be persisted in a tracker
    file so that a later process can resume the same download.
    """

    # Shortest etag read from a tracker file that is accepted as valid.
    MIN_ETAG_LEN = 5

    # Exceptions that cause get_file() to retry instead of propagating.
    RETRYABLE_EXCEPTIONS = (httplib.HTTPException, IOError, socket.error,
                            socket.gaierror)

    def __init__(self, tracker_file_name=None, num_retries=None):
        """
        Constructor. Instantiate once for each downloaded file.

        :type tracker_file_name: string
        :param tracker_file_name: optional file name to save tracking info
            about this download. If supplied and the current process fails
            the download, it can be retried in a new process. If called
            with an existing file containing a usable etag, we'll try to
            resume the transfer for this file; else we'll start a
            new resumable download.

        :type num_retries: int
        :param num_retries: the number of times we'll re-try a resumable
            download making no progress. (Count resets every time we get
            progress, so download can span many more than this number of
            retries.)
        """
        self.tracker_file_name = tracker_file_name
        self.num_retries = num_retries
        self.etag_value_for_current_download = None
        if tracker_file_name:
            self._load_tracker_file_etag()
        # Save download_start_point in instance state so caller can
        # find how much was transferred by this ResumableDownloadHandler
        # (across retries).
        self.download_start_point = None

    def _load_tracker_file_etag(self):
        """
        Load the saved etag from the tracker file, if there is a usable one.

        Sets self.etag_value_for_current_download to the first line of the
        tracker file. A missing file is silently ignored; any other read
        error, or an etag shorter than MIN_ETAG_LEN, prints a warning and
        leaves the etag as None so that the download restarts from scratch.
        """
        try:
            with open(self.tracker_file_name, 'r') as f:
                etag = f.readline().rstrip('\n')
        except IOError as e:
            # Ignore non-existent file (happens first time a download
            # is attempted on an object), but warn user for other errors.
            if e.errno != errno.ENOENT:
                # Will restart because
                # self.etag_value_for_current_download is None.
                print('Couldn\'t read URI tracker file (%s): %s. Restarting '
                      'download from scratch.' %
                      (self.tracker_file_name, e.strerror))
            return
        # We used to match an MD5-based regex to ensure that the etag was
        # read correctly. Since ETags need not be MD5s, we now do a simple
        # length sanity check instead.
        if len(etag) < self.MIN_ETAG_LEN:
            print('Couldn\'t read etag in tracker file (%s). Restarting '
                  'download from scratch.' % self.tracker_file_name)
            # Discard the unusable value so it can never be matched against
            # a key's etag and the download really does restart.
            self.etag_value_for_current_download = None
            return
        self.etag_value_for_current_download = etag

    def _save_tracker_info(self, key):
        """
        Remember key's etag (stripped of surrounding quotes) on the instance
        and, when a tracker file name was given, write it to that file.

        Raises ResumableDownloadException (disposition ABORT) if the tracker
        file cannot be written.
        """
        self.etag_value_for_current_download = key.etag.strip('"\'')
        if not self.tracker_file_name:
            return
        try:
            with open(self.tracker_file_name, 'w') as f:
                f.write('%s\n' % self.etag_value_for_current_download)
        except IOError as e:
            raise ResumableDownloadException(
                'Couldn\'t write tracker file (%s): %s.\nThis can happen '
                'if you\'re using an incorrectly configured download tool\n'
                '(e.g., gsutil configured to save tracker files to an '
                'unwritable directory)' %
                (self.tracker_file_name, e.strerror),
                ResumableTransferDisposition.ABORT)

    def _remove_tracker_file(self):
        """
        Delete the tracker file if one is configured and it exists.
        """
        if not self.tracker_file_name:
            return
        try:
            os.unlink(self.tracker_file_name)
        except OSError as e:
            # A tracker file that is already gone is fine; anything else
            # (e.g. a permission problem) is still reported to the caller.
            if e.errno != errno.ENOENT:
                raise

    def _download_from_key(self, key, fp, headers, cb, num_cb, torrent,
                           version_id, hash_algs):
        """
        Issue a single key.get_file() call with connection-level retries
        disabled. hash_algs is only passed along for GSKey instances.
        """
        if isinstance(key, GSKey):
            key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                         override_num_retries=0, hash_algs=hash_algs)
        else:
            key.get_file(fp, headers, cb, num_cb, torrent, version_id,
                         override_num_retries=0)

    def _attempt_resumable_download(self, key, fp, headers, cb, num_cb,
                                    torrent, version_id, hash_algs):
        """
        Attempts a resumable download.

        If fp already holds data and the saved etag matches key's etag, the
        download is resumed with a Range request starting at the current
        file size; otherwise the tracker info is saved, fp is emptied and a
        new download is started.

        Raises ResumableDownloadException if any problems occur.
        """
        cur_file_size = get_cur_file_size(fp, position_to_eof=True)

        if (cur_file_size and
            self.etag_value_for_current_download and
            self.etag_value_for_current_download == key.etag.strip('"\'')):
            # Try to resume existing transfer.
            if cur_file_size > key.size:
                raise ResumableDownloadException(
                    '%s is larger (%d) than %s (%d).\nDeleting tracker file, so '
                    'if you re-try this download it will start from scratch' %
                    (fp.name, cur_file_size, str(storage_uri_for_key(key)),
                     key.size), ResumableTransferDisposition.ABORT)
            elif cur_file_size == key.size:
                if key.bucket.connection.debug >= 1:
                    print('Download complete.')
                return
            if key.bucket.connection.debug >= 1:
                print('Resuming download.')
            # Copy before adding Range so the caller's dict is not modified.
            headers = headers.copy()
            headers['Range'] = 'bytes=%d-%d' % (cur_file_size, key.size - 1)
            cb = ByteTranslatingCallbackHandler(cb, cur_file_size).call
            self.download_start_point = cur_file_size
        else:
            if key.bucket.connection.debug >= 1:
                print('Starting new resumable download.')
            self._save_tracker_info(key)
            self.download_start_point = 0
            # Truncate the file, in case a new resumable download is being
            # started atop an existing file.
            fp.truncate(0)
            # truncate() does not move the file position, which the size
            # check above left at the old EOF. Rewind so that the new data
            # starts at offset 0 even when fp is not in append mode.
            fp.seek(0)

        # Disable AWSAuthConnection-level retry behavior, since that would
        # cause downloads to restart from scratch.
        self._download_from_key(key, fp, headers, cb, num_cb, torrent,
                                version_id, hash_algs)
        fp.flush()

    def get_file(self, key, fp, headers, cb=None, num_cb=10, torrent=False,
                 version_id=None, hash_algs=None):
        """
        Retrieves a file from a Key, retrying for as long as progress is
        being made.

        :type key: :class:`boto.s3.key.Key` or subclass
        :param key: The Key object from which the data is to be downloaded

        :type fp: file
        :param fp: File pointer into which data should be downloaded

        :type headers: dict
        :param headers: headers to send when retrieving the files; a false
            value is treated as an empty dict

        :type cb: function
        :param cb: (optional) a callback function that will be called to report
             progress on the download.  The callback should accept two integer
             parameters, the first representing the number of bytes that have
             been successfully transmitted from the storage service and
             the second representing the total number of bytes that need
             to be transmitted.

        :type num_cb: int
        :param num_cb: (optional) If a callback is specified with the cb
             parameter this parameter determines the granularity of the callback
             by defining the maximum number of times the callback will be
             called during the file transfer.

        :type torrent: bool
        :param torrent: Flag for whether to get a torrent for the file

        :type version_id: string
        :param version_id: The version ID (optional)

        :type hash_algs: dictionary
        :param hash_algs: (optional) Dictionary of hash algorithms and
            corresponding hashing class that implements update() and digest().
            Defaults to {'md5': hashlib/md5.md5}.

        Raises ResumableDownloadException if a problem occurs during
            the transfer.
        """

        debug = key.bucket.connection.debug
        if not headers:
            headers = {}

        # Use num-retries from constructor if one was provided; else check
        # for a value specified in the boto config file; else default to 6.
        if self.num_retries is None:
            self.num_retries = config.getint('Boto', 'num_retries', 6)
        progress_less_iterations = 0

        while True:  # Retry as long as we're making progress.
            had_file_bytes_before_attempt = get_cur_file_size(fp)
            try:
                self._attempt_resumable_download(key, fp, headers, cb, num_cb,
                                                 torrent, version_id, hash_algs)
                # Download succeeded, so remove the tracker file (if have one).
                self._remove_tracker_file()
                # Previously, check_final_md5() was called here to validate
                # downloaded file's checksum, however, to be consistent with
                # non-resumable downloads, this call was removed. Checksum
                # validation of file contents should be done by the caller.
                if debug >= 1:
                    print('Resumable download complete.')
                return
            except self.RETRYABLE_EXCEPTIONS as e:
                if debug >= 1:
                    print('Caught exception (%s)' % e.__repr__())
                if isinstance(e, IOError) and e.errno == errno.EPIPE:
                    # Broken pipe error causes httplib to immediately
                    # close the socket (http://bugs.python.org/issue5542),
                    # so we need to close and reopen the key before resuming
                    # the download.
                    # NOTE(review): this re-issues the download with the
                    # caller's headers (no Range) and, being inside an except
                    # clause, any exception it raises propagates to the
                    # caller instead of being retried - confirm intent.
                    self._download_from_key(key, fp, headers, cb, num_cb,
                                            torrent, version_id, hash_algs)
            except ResumableDownloadException as e:
                if (e.disposition ==
                    ResumableTransferDisposition.ABORT_CUR_PROCESS):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s)' % e.message)
                    raise
                elif (e.disposition ==
                    ResumableTransferDisposition.ABORT):
                    if debug >= 1:
                        print('Caught non-retryable ResumableDownloadException '
                              '(%s); aborting and removing tracker file' %
                              e.message)
                    self._remove_tracker_file()
                    raise
                else:
                    if debug >= 1:
                        print('Caught ResumableDownloadException (%s) - will '
                              'retry' % e.message)

            # At this point we had a re-tryable failure; see if made progress.
            if get_cur_file_size(fp) > had_file_bytes_before_attempt:
                progress_less_iterations = 0
            else:
                progress_less_iterations += 1

            if progress_less_iterations > self.num_retries:
                # Don't retry any longer in the current process.
                raise ResumableDownloadException(
                    'Too many resumable download attempts failed without '
                    'progress. You might try this download again later',
                    ResumableTransferDisposition.ABORT_CUR_PROCESS)

            # Close the key, in case a previous download died partway
            # through and left data in the underlying key HTTP buffer.
            # Do this within a try/except block in case the connection is
            # closed (since key.close() attempts to do a final read, in which
            # case this read attempt would get an IncompleteRead exception,
            # which we can safely ignore).
            try:
                key.close()
            except httplib.IncompleteRead:
                pass

            # Back off exponentially in the number of progress-less attempts.
            sleep_time_secs = 2**progress_less_iterations
            if debug >= 1:
                print('Got retryable failure (%d progress-less in a row).\n'
                      'Sleeping %d seconds before re-trying' %
                      (progress_less_iterations, sleep_time_secs))
            time.sleep(sleep_time_secs)