Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/requests_toolbelt/threaded/pool.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
| author | shellac |
|---|---|
| date | Mon, 01 Jun 2020 08:59:25 -0400 |
| parents | 79f47841a781 |
| children |
comparison
equal
deleted
inserted
replaced
| 4:79f47841a781 | 5:9b1c78e6ba9c |
|---|---|
| 1 """Module implementing the Pool for :mod:``requests_toolbelt.threaded``.""" | |
| 2 import multiprocessing | |
| 3 import requests | |
| 4 | |
| 5 from . import thread | |
| 6 from .._compat import queue | |
| 7 | |
| 8 | |
class Pool(object):
    """Pool that manages the threads containing sessions.

    :param job_queue:
        The queue you're expected to use to which you should add items.
    :type job_queue: queue.Queue
    :param initializer:
        Function used to initialize an instance of ``session``. It is called
        with the freshly created session and its return value is what gets
        handed to ``auth_generator``.
    :type initializer: collections.Callable
    :param auth_generator:
        Function used to generate new auth credentials for the session. It is
        called with the value returned by ``initializer`` and its return
        value is the session given to the thread.
    :type auth_generator: collections.Callable
    :param int num_processes:
        Number of threads to create. When omitted, the value reported by
        :func:`multiprocessing.cpu_count` is used, falling back to ``1`` if
        the CPU count cannot be determined.
    :param session:
        Callable invoked with no arguments to build one session per thread.
        Defaults to the :class:`requests.Session` class itself.
    :type session: requests.Session
    :raises ValueError: If ``num_processes`` is less than 1.
    """

    def __init__(self, job_queue, initializer=None, auth_generator=None,
                 num_processes=None, session=requests.Session):
        if num_processes is None:
            try:
                num_processes = multiprocessing.cpu_count() or 1
            except NotImplementedError:
                # cpu_count() signals "unknown" by raising rather than by
                # returning 0, so the ``or 1`` above never covers that case.
                num_processes = 1

        if num_processes < 1:
            raise ValueError("Number of processes should at least be 1.")

        self._job_queue = job_queue
        self._response_queue = queue.Queue()
        self._exc_queue = queue.Queue()
        self._processes = num_processes
        self._initializer = initializer or _identity
        self._auth = auth_generator or _identity
        self._session = session
        # One independently built session per thread; all threads share the
        # same job, response and exception queues.
        self._pool = [
            thread.SessionThread(self._new_session(), self._job_queue,
                                 self._response_queue, self._exc_queue)
            for _ in range(self._processes)
        ]

    def _new_session(self):
        """Build a session and pass it through initializer, then auth."""
        return self._auth(self._initializer(self._session()))

    @classmethod
    def from_exceptions(cls, exceptions, **kwargs):
        r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.

        Provided an iterable that provides :class:`~ThreadException` objects,
        this classmethod will generate a new pool to retry the requests that
        caused the exceptions.

        :param exceptions:
            Iterable that returns :class:`~ThreadException`
        :type exceptions: iterable
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        job_queue = queue.Queue()
        for exc in exceptions:
            # Re-queue the exact keyword arguments of the failed request.
            job_queue.put(exc.request_kwargs)

        return cls(job_queue=job_queue, **kwargs)

    @classmethod
    def from_urls(cls, urls, request_kwargs=None, **kwargs):
        """Create a :class:`~Pool` from an iterable of URLs.

        :param urls:
            Iterable that returns URLs with which we create a pool.
        :type urls: iterable
        :param dict request_kwargs:
            Dictionary of other keyword arguments to provide to the request
            method. ``method`` defaults to ``'GET'`` unless overridden here.
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        request_dict = {'method': 'GET'}
        request_dict.update(request_kwargs or {})
        job_queue = queue.Queue()
        for url in urls:
            # Each job gets its own dict so jobs never share mutable state.
            job = request_dict.copy()
            job['url'] = url
            job_queue.put(job)

        return cls(job_queue=job_queue, **kwargs)

    def exceptions(self):
        """Iterate over all the exceptions in the pool.

        Iteration stops as soon as the exception queue is found empty.

        :returns: Generator of :class:`~ThreadException`
        """
        while True:
            exc = self.get_exception()
            if exc is None:
                break
            yield exc

    def get_exception(self):
        """Get an exception from the pool.

        Does not block: returns ``None`` when no exception is queued.

        :rtype: :class:`~ThreadException`
        """
        try:
            (request, exc) = self._exc_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadException(request, exc)

    def get_response(self):
        """Get a response from the pool.

        Does not block: returns ``None`` when no response is queued.

        :rtype: :class:`~ThreadResponse`
        """
        try:
            (request, response) = self._response_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadResponse(request, response)

    def responses(self):
        """Iterate over all the responses in the pool.

        Iteration stops as soon as the response queue is found empty.

        :returns: Generator of :class:`~ThreadResponse`
        """
        while True:
            resp = self.get_response()
            if resp is None:
                break
            yield resp

    def join_all(self):
        """Join all the threads to the master thread."""
        for session_thread in self._pool:
            session_thread.join()
| 148 | |
| 149 | |
class ThreadProxy(object):
    """Base class that forwards unknown attribute lookups to a wrapped object.

    Subclasses set ``proxied_attr`` to the name of the instance attribute
    holding the wrapped object, and ``attrs`` to the set of attribute names
    that belong to the wrapper itself and must never be forwarded.
    """

    #: Name of the instance attribute that holds the wrapped object.
    proxied_attr = None
    #: Names owned by the wrapper. The empty default keeps ``self.attrs``
    #: resolvable on the base class, so ``__getattr__`` cannot re-enter
    #: itself while looking it up.
    attrs = frozenset()

    def __getattr__(self, attr):
        """Proxy attribute accesses to the proxied object.

        Only invoked by Python after normal attribute lookup has failed.

        :param str attr: Name of the attribute being looked up.
        :returns: The attribute as found on the proxied object.
        :raises AttributeError: If the name is reserved for the wrapper,
            no proxied attribute is configured, or the proxied object does
            not provide it.
        """
        get = object.__getattribute__
        if attr in self.attrs or self.proxied_attr is None:
            # Normal lookup already failed, so this raises AttributeError
            # with the standard message instead of forwarding.
            return get(self, attr)
        proxied = get(self, self.proxied_attr)
        return getattr(proxied, attr)
| 161 | |
| 162 | |
class ThreadResponse(ThreadProxy):
    """Wrapper pairing a requests Response with its originating kwargs.

    Attribute accesses that the wrapper itself does not satisfy are proxied
    to the wrapped Response object. For example, to obtain the parsed JSON
    from the response:

    .. code-block:: python

        thread_response = pool.get_response()
        json = thread_response.json()

    """
    attrs = frozenset(('request_kwargs', 'response'))
    proxied_attr = 'response'

    def __init__(self, request_kwargs, response):
        #: The wrapped response
        self.response = response
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
| 183 | |
| 184 | |
class ThreadException(ThreadProxy):
    """Wrapper pairing an exception raised during a request with its kwargs.

    Attribute accesses that the wrapper itself does not satisfy are proxied
    to the wrapped exception object. For example, to obtain the arguments
    the exception was raised with:

    .. code-block:: python

        thread_exc = pool.get_exception()
        args = thread_exc.args

    """
    attrs = frozenset(('request_kwargs', 'exception'))
    proxied_attr = 'exception'

    def __init__(self, request_kwargs, exception):
        #: The captured and wrapped exception
        self.exception = exception
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
| 205 | |
| 206 | |
def _identity(session_obj):
    """Return ``session_obj`` unchanged.

    Used as the no-op stand-in when no initializer or auth generator is
    supplied to the pool.
    """
    unchanged = session_obj
    return unchanged
| 209 | |
| 210 | |
| 211 __all__ = ['ThreadException', 'ThreadResponse', 'Pool'] |
