diff env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/requests_toolbelt/multipart/decoder.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,156 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-
-requests_toolbelt.multipart.decoder
-===================================
-
-This holds all the implementation details of the MultipartDecoder
-
-"""
-
-import sys
-import email.parser
-from .encoder import encode_with
-from requests.structures import CaseInsensitiveDict
-
-
-def _split_on_find(content, bound):
-    point = content.find(bound)
-    return content[:point], content[point + len(bound):]
-
-
-class ImproperBodyPartContentException(Exception):
-    pass
-
-
-class NonMultipartContentTypeException(Exception):
-    pass
-
-
-def _header_parser(string, encoding):
-    major = sys.version_info[0]
-    if major == 3:
-        string = string.decode(encoding)
-    headers = email.parser.HeaderParser().parsestr(string).items()
-    return (
-        (encode_with(k, encoding), encode_with(v, encoding))
-        for k, v in headers
-    )
-
-
-class BodyPart(object):
-    """
-
-    The ``BodyPart`` object is a ``Response``-like interface to an individual
-    subpart of a multipart response. It is expected that these will
-    generally be created by objects of the ``MultipartDecoder`` class.
-
-    Like ``Response``, there is a ``CaseInsensitiveDict`` object named headers,
-    ``content`` to access bytes, ``text`` to access unicode, and ``encoding``
-    to access the unicode codec.
-
-    """
-
-    def __init__(self, content, encoding):
-        self.encoding = encoding
-        headers = {}
-        # Split into header section (if any) and the content
-        if b'\r\n\r\n' in content:
-            first, self.content = _split_on_find(content, b'\r\n\r\n')
-            if first != b'':
-                headers = _header_parser(first.lstrip(), encoding)
-        else:
-            raise ImproperBodyPartContentException(
-                'content does not contain CR-LF-CR-LF'
-            )
-        self.headers = CaseInsensitiveDict(headers)
-
-    @property
-    def text(self):
-        """Content of the ``BodyPart`` in unicode."""
-        return self.content.decode(self.encoding)
-
-
-class MultipartDecoder(object):
-    """
-
-    The ``MultipartDecoder`` object parses the multipart payload of
-    a bytestring into a tuple of ``Response``-like ``BodyPart`` objects.
-
-    The basic usage is::
-
-        import requests
-        from requests_toolbelt import MultipartDecoder
-
-        response = request.get(url)
-        decoder = MultipartDecoder.from_response(response)
-        for part in decoder.parts:
-            print(part.headers['content-type'])
-
-    If the multipart content is not from a response, basic usage is::
-
-        from requests_toolbelt import MultipartDecoder
-
-        decoder = MultipartDecoder(content, content_type)
-        for part in decoder.parts:
-            print(part.headers['content-type'])
-
-    For both these usages, there is an optional ``encoding`` parameter. This is
-    a string, which is the name of the unicode codec to use (default is
-    ``'utf-8'``).
-
-    """
-    def __init__(self, content, content_type, encoding='utf-8'):
-        #: Original Content-Type header
-        self.content_type = content_type
-        #: Response body encoding
-        self.encoding = encoding
-        #: Parsed parts of the multipart response body
-        self.parts = tuple()
-        self._find_boundary()
-        self._parse_body(content)
-
-    def _find_boundary(self):
-        ct_info = tuple(x.strip() for x in self.content_type.split(';'))
-        mimetype = ct_info[0]
-        if mimetype.split('/')[0].lower() != 'multipart':
-            raise NonMultipartContentTypeException(
-                "Unexpected mimetype in content-type: '{0}'".format(mimetype)
-            )
-        for item in ct_info[1:]:
-            attr, value = _split_on_find(
-                item,
-                '='
-            )
-            if attr.lower() == 'boundary':
-                self.boundary = encode_with(value.strip('"'), self.encoding)
-
-    @staticmethod
-    def _fix_first_part(part, boundary_marker):
-        bm_len = len(boundary_marker)
-        if boundary_marker == part[:bm_len]:
-            return part[bm_len:]
-        else:
-            return part
-
-    def _parse_body(self, content):
-        boundary = b''.join((b'--', self.boundary))
-
-        def body_part(part):
-            fixed = MultipartDecoder._fix_first_part(part, boundary)
-            return BodyPart(fixed, self.encoding)
-
-        def test_part(part):
-            return (part != b'' and
-                    part != b'\r\n' and
-                    part[:4] != b'--\r\n' and
-                    part != b'--')
-
-        parts = content.split(b''.join((b'\r\n', boundary)))
-        self.parts = tuple(body_part(x) for x in parts if test_part(x))
-
-    @classmethod
-    def from_response(cls, response, encoding='utf-8'):
-        content = response.content
-        content_type = response.headers.get('content-type', None)
-        return cls(content, content_type, encoding)