diff env/lib/python3.9/site-packages/requests_toolbelt/auth/http_proxy_digest.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.9/site-packages/requests_toolbelt/auth/http_proxy_digest.py	Mon Mar 22 18:12:50 2021 +0000
@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+"""The module containing HTTPProxyDigestAuth."""
+import re
+
+from requests import cookies, utils
+
+from . import _digest_auth_compat as auth
+
+
+class HTTPProxyDigestAuth(auth.HTTPDigestAuth):
+    """HTTP digest authentication between proxy
+
+    :param stale_rejects: The number of rejects indicate that:
+        the client may wish to simply retry the request
+        with a new encrypted response, without reprompting the user for a
+        new username and password. i.e., retry build_digest_header
+    :type stale_rejects: int
+    """
+    _pat = re.compile(r'digest ', flags=re.IGNORECASE)
+
+    def __init__(self, *args, **kwargs):
+        super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs)
+        self.stale_rejects = 0
+
+        self.init_per_thread_state()
+
+    @property
+    def stale_rejects(self):
+        thread_local = getattr(self, '_thread_local', None)
+        if thread_local is None:
+            return self._stale_rejects
+        return thread_local.stale_rejects
+
+    @stale_rejects.setter
+    def stale_rejects(self, value):
+        thread_local = getattr(self, '_thread_local', None)
+        if thread_local is None:
+            self._stale_rejects = value
+        else:
+            thread_local.stale_rejects = value
+
+    def init_per_thread_state(self):
+        try:
+            super(HTTPProxyDigestAuth, self).init_per_thread_state()
+        except AttributeError:
+            # If we're not on requests 2.8.0+ this method does not exist
+            pass
+
+    def handle_407(self, r, **kwargs):
+        """Handle HTTP 407 only once, otherwise give up
+
+        :param r: current response
+        :returns: responses, along with the new response
+        """
+        if r.status_code == 407 and self.stale_rejects < 2:
+            s_auth = r.headers.get("proxy-authenticate")
+            if s_auth is None:
+                raise IOError(
+                    "proxy server violated RFC 7235:"
+                    "407 response MUST contain header proxy-authenticate")
+            elif not self._pat.match(s_auth):
+                return r
+
+            self.chal = utils.parse_dict_header(
+                self._pat.sub('', s_auth, count=1))
+
+            # if we present the user/passwd and still get rejected
+            # http://tools.ietf.org/html/rfc2617#section-3.2.1
+            if ('Proxy-Authorization' in r.request.headers and
+                    'stale' in self.chal):
+                if self.chal['stale'].lower() == 'true':  # try again
+                    self.stale_rejects += 1
+                # wrong user/passwd
+                elif self.chal['stale'].lower() == 'false':
+                    raise IOError("User or password is invalid")
+
+            # Consume content and release the original connection
+            # to allow our new request to reuse the same one.
+            r.content
+            r.close()
+            prep = r.request.copy()
+            cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
+            prep.prepare_cookies(prep._cookies)
+
+            prep.headers['Proxy-Authorization'] = self.build_digest_header(
+                prep.method, prep.url)
+            _r = r.connection.send(prep, **kwargs)
+            _r.history.append(r)
+            _r.request = prep
+
+            return _r
+        else:  # give up authenticate
+            return r
+
+    def __call__(self, r):
+        self.init_per_thread_state()
+        # if we have nonce, then just use it, otherwise server will tell us
+        if self.last_nonce:
+            r.headers['Proxy-Authorization'] = self.build_digest_header(
+                r.method, r.url
+            )
+        r.register_hook('response', self.handle_407)
+        return r