Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 # -*- coding: utf-8 -*- | |
| 2 """ | |
| 3 | |
| 4 requests_toolbelt.multipart.decoder | |
| 5 =================================== | |
| 6 | |
| 7 This holds all the implementation details of the MultipartDecoder | |
| 8 | |
| 9 """ | |
| 10 | |
| 11 import sys | |
| 12 import email.parser | |
| 13 from .encoder import encode_with | |
| 14 from requests.structures import CaseInsensitiveDict | |
| 15 | |
| 16 | |
| 17 def _split_on_find(content, bound): | |
| 18 point = content.find(bound) | |
| 19 return content[:point], content[point + len(bound):] | |
| 20 | |
| 21 | |
| 22 class ImproperBodyPartContentException(Exception): | |
| 23 pass | |
| 24 | |
| 25 | |
| 26 class NonMultipartContentTypeException(Exception): | |
| 27 pass | |
| 28 | |
| 29 | |
| 30 def _header_parser(string, encoding): | |
| 31 major = sys.version_info[0] | |
| 32 if major == 3: | |
| 33 string = string.decode(encoding) | |
| 34 headers = email.parser.HeaderParser().parsestr(string).items() | |
| 35 return ( | |
| 36 (encode_with(k, encoding), encode_with(v, encoding)) | |
| 37 for k, v in headers | |
| 38 ) | |
| 39 | |
| 40 | |
| 41 class BodyPart(object): | |
| 42 """ | |
| 43 | |
| 44 The ``BodyPart`` object is a ``Response``-like interface to an individual | |
| 45 subpart of a multipart response. It is expected that these will | |
| 46 generally be created by objects of the ``MultipartDecoder`` class. | |
| 47 | |
| 48 Like ``Response``, there is a ``CaseInsensitiveDict`` object named headers, | |
| 49 ``content`` to access bytes, ``text`` to access unicode, and ``encoding`` | |
| 50 to access the unicode codec. | |
| 51 | |
| 52 """ | |
| 53 | |
| 54 def __init__(self, content, encoding): | |
| 55 self.encoding = encoding | |
| 56 headers = {} | |
| 57 # Split into header section (if any) and the content | |
| 58 if b'\r\n\r\n' in content: | |
| 59 first, self.content = _split_on_find(content, b'\r\n\r\n') | |
| 60 if first != b'': | |
| 61 headers = _header_parser(first.lstrip(), encoding) | |
| 62 else: | |
| 63 raise ImproperBodyPartContentException( | |
| 64 'content does not contain CR-LF-CR-LF' | |
| 65 ) | |
| 66 self.headers = CaseInsensitiveDict(headers) | |
| 67 | |
| 68 @property | |
| 69 def text(self): | |
| 70 """Content of the ``BodyPart`` in unicode.""" | |
| 71 return self.content.decode(self.encoding) | |
| 72 | |
| 73 | |
| 74 class MultipartDecoder(object): | |
| 75 """ | |
| 76 | |
| 77 The ``MultipartDecoder`` object parses the multipart payload of | |
| 78 a bytestring into a tuple of ``Response``-like ``BodyPart`` objects. | |
| 79 | |
| 80 The basic usage is:: | |
| 81 | |
| 82 import requests | |
| 83 from requests_toolbelt import MultipartDecoder | |
| 84 | |
| 85 response = request.get(url) | |
| 86 decoder = MultipartDecoder.from_response(response) | |
| 87 for part in decoder.parts: | |
| 88 print(part.headers['content-type']) | |
| 89 | |
| 90 If the multipart content is not from a response, basic usage is:: | |
| 91 | |
| 92 from requests_toolbelt import MultipartDecoder | |
| 93 | |
| 94 decoder = MultipartDecoder(content, content_type) | |
| 95 for part in decoder.parts: | |
| 96 print(part.headers['content-type']) | |
| 97 | |
| 98 For both these usages, there is an optional ``encoding`` parameter. This is | |
| 99 a string, which is the name of the unicode codec to use (default is | |
| 100 ``'utf-8'``). | |
| 101 | |
| 102 """ | |
| 103 def __init__(self, content, content_type, encoding='utf-8'): | |
| 104 #: Original Content-Type header | |
| 105 self.content_type = content_type | |
| 106 #: Response body encoding | |
| 107 self.encoding = encoding | |
| 108 #: Parsed parts of the multipart response body | |
| 109 self.parts = tuple() | |
| 110 self._find_boundary() | |
| 111 self._parse_body(content) | |
| 112 | |
| 113 def _find_boundary(self): | |
| 114 ct_info = tuple(x.strip() for x in self.content_type.split(';')) | |
| 115 mimetype = ct_info[0] | |
| 116 if mimetype.split('/')[0].lower() != 'multipart': | |
| 117 raise NonMultipartContentTypeException( | |
| 118 "Unexpected mimetype in content-type: '{0}'".format(mimetype) | |
| 119 ) | |
| 120 for item in ct_info[1:]: | |
| 121 attr, value = _split_on_find( | |
| 122 item, | |
| 123 '=' | |
| 124 ) | |
| 125 if attr.lower() == 'boundary': | |
| 126 self.boundary = encode_with(value.strip('"'), self.encoding) | |
| 127 | |
| 128 @staticmethod | |
| 129 def _fix_first_part(part, boundary_marker): | |
| 130 bm_len = len(boundary_marker) | |
| 131 if boundary_marker == part[:bm_len]: | |
| 132 return part[bm_len:] | |
| 133 else: | |
| 134 return part | |
| 135 | |
| 136 def _parse_body(self, content): | |
| 137 boundary = b''.join((b'--', self.boundary)) | |
| 138 | |
| 139 def body_part(part): | |
| 140 fixed = MultipartDecoder._fix_first_part(part, boundary) | |
| 141 return BodyPart(fixed, self.encoding) | |
| 142 | |
| 143 def test_part(part): | |
| 144 return (part != b'' and | |
| 145 part != b'\r\n' and | |
| 146 part[:4] != b'--\r\n' and | |
| 147 part != b'--') | |
| 148 | |
| 149 parts = content.split(b''.join((b'\r\n', boundary))) | |
| 150 self.parts = tuple(body_part(x) for x in parts if test_part(x)) | |
| 151 | |
| 152 @classmethod | |
| 153 def from_response(cls, response, encoding='utf-8'): | |
| 154 content = response.content | |
| 155 content_type = response.headers.get('content-type', None) | |
| 156 return cls(content, content_type, encoding) |
