comparison planemo/lib/python3.7/site-packages/requests_toolbelt/threaded/pool.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 """Module implementing the Pool for :mod:``requests_toolbelt.threaded``."""
2 import multiprocessing
3 import requests
4
5 from . import thread
6 from .._compat import queue
7
8
class Pool(object):
    """Pool that manages the threads containing sessions.

    Constructing a pool instantiates one ``thread.SessionThread`` per
    requested process. Every thread receives its own freshly built session
    together with the shared job, response and exception queues.

    :param job_queue:
        The queue you're expected to use to which you should add items.
    :type job_queue: queue.Queue
    :param initializer:
        Function used to initialize an instance of ``session``. It is called
        with the newly created session and its return value is passed on to
        ``auth_generator``. Defaults to a function returning its argument
        unchanged.
    :type initializer: collections.abc.Callable
    :param auth_generator:
        Function used to generate new auth credentials for the session. It
        is called with the value returned by ``initializer`` and its return
        value is the session handed to the thread. Defaults to a function
        returning its argument unchanged.
    :type auth_generator: collections.abc.Callable
    :param int num_processes:
        Number of threads to create. When ``None``, the CPU count reported
        by :func:`multiprocessing.cpu_count` is used, falling back to 1 if
        it cannot be determined.
    :param session:
        Callable invoked without arguments to create the session for each
        thread. Defaults to :class:`requests.Session`.
    :type session: collections.abc.Callable
    :raises ValueError: If ``num_processes`` is less than 1.
    """

    def __init__(self, job_queue, initializer=None, auth_generator=None,
                 num_processes=None, session=requests.Session):
        if num_processes is None:
            try:
                num_processes = multiprocessing.cpu_count() or 1
            except NotImplementedError:
                # cpu_count() is documented as possibly raising this when
                # the count cannot be determined; use a single thread.
                num_processes = 1

        if num_processes < 1:
            raise ValueError("Number of processes should at least be 1.")

        self._job_queue = job_queue
        self._response_queue = queue.Queue()
        self._exc_queue = queue.Queue()
        self._processes = num_processes
        self._initializer = initializer or _identity
        self._auth = auth_generator or _identity
        self._session = session
        # One session per thread: sessions are never shared between threads.
        self._pool = [
            thread.SessionThread(self._new_session(), self._job_queue,
                                 self._response_queue, self._exc_queue)
            for _ in range(self._processes)
        ]

    def _new_session(self):
        """Build a session and run it through the initializer and auth hooks.

        :returns: Whatever the ``auth_generator`` hook returns.
        """
        return self._auth(self._initializer(self._session()))

    @classmethod
    def from_exceptions(cls, exceptions, **kwargs):
        r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.

        Provided an iterable that provides :class:`~ThreadException` objects,
        this classmethod will generate a new pool to retry the requests that
        caused the exceptions.

        :param exceptions:
            Iterable that returns :class:`~ThreadException`
        :type exceptions: iterable
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        job_queue = queue.Queue()
        for exc in exceptions:
            # Re-queue the keyword arguments of the request that failed.
            job_queue.put(exc.request_kwargs)

        return cls(job_queue=job_queue, **kwargs)

    @classmethod
    def from_urls(cls, urls, request_kwargs=None, **kwargs):
        """Create a :class:`~Pool` from an iterable of URLs.

        Each URL becomes one job. A job is a copy of ``request_kwargs``
        (with ``method`` defaulting to ``'GET'``) plus a ``url`` entry.

        :param urls:
            Iterable that returns URLs with which we create a pool.
        :type urls: iterable
        :param dict request_kwargs:
            Dictionary of other keyword arguments to provide to the request
            method.
        :param kwargs:
            Keyword arguments passed to the :class:`~Pool` initializer.
        :returns: An initialized :class:`~Pool` object.
        :rtype: :class:`~Pool`
        """
        request_dict = {'method': 'GET'}
        request_dict.update(request_kwargs or {})
        job_queue = queue.Queue()
        for url in urls:
            # Copy so that jobs do not share one mutable dictionary.
            job = request_dict.copy()
            job.update({'url': url})
            job_queue.put(job)

        return cls(job_queue=job_queue, **kwargs)

    def exceptions(self):
        """Iterate over all the exceptions in the pool.

        Iteration stops as soon as the exception queue is found empty.

        :returns: Generator of :class:`~ThreadException`
        """
        while True:
            exc = self.get_exception()
            if exc is None:
                break
            yield exc

    def get_exception(self):
        """Get an exception from the pool.

        This does not block.

        :returns: The next wrapped exception, or ``None`` when the exception
            queue is currently empty.
        :rtype: :class:`~ThreadException`
        """
        try:
            (request, exc) = self._exc_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadException(request, exc)

    def get_response(self):
        """Get a response from the pool.

        This does not block.

        :returns: The next wrapped response, or ``None`` when the response
            queue is currently empty.
        :rtype: :class:`~ThreadResponse`
        """
        try:
            (request, response) = self._response_queue.get_nowait()
        except queue.Empty:
            return None
        else:
            return ThreadResponse(request, response)

    def responses(self):
        """Iterate over all the responses in the pool.

        Iteration stops as soon as the response queue is found empty.

        :returns: Generator of :class:`~ThreadResponse`
        """
        while True:
            resp = self.get_response()
            if resp is None:
                break
            yield resp

    def join_all(self):
        """Join all the threads to the master thread."""
        for session_thread in self._pool:
            session_thread.join()
148
149
class ThreadProxy(object):
    """Base class forwarding unknown attribute lookups to a wrapped object.

    Subclasses set ``proxied_attr`` to the name of the instance attribute
    that holds the wrapped object, and ``attrs`` to the names of the
    attributes that belong to the wrapper itself.
    """

    #: Name of the instance attribute holding the wrapped object.
    proxied_attr = None
    #: Names of the attributes owned by the wrapper itself. Defined on the
    #: base class so that a lookup never has to fall back to ``__getattr__``.
    attrs = frozenset()

    def __getattr__(self, attr):
        """Proxy attribute accesses to the proxied object.

        Python only calls this after normal attribute lookup has failed.

        :param str attr: Name of the attribute being looked up.
        :returns: The attribute of the same name on the wrapped object.
        :raises AttributeError: If ``attr`` is one of the wrapper's own
            attributes (and therefore unset), if no ``proxied_attr`` is
            configured, or if the wrapped object lacks the attribute.
        """
        get = object.__getattribute__
        # Read the configuration with object.__getattribute__ so that a
        # missing name can never re-enter this method and recurse.
        proxied_attr = get(self, 'proxied_attr')
        if attr in get(self, 'attrs') or proxied_attr is None:
            # Nothing to delegate to: the default lookup raises the usual
            # AttributeError for the missing name.
            return get(self, attr)
        return getattr(get(self, proxied_attr), attr)
161
162
class ThreadResponse(ThreadProxy):
    """A requests Response paired with the job that produced it.

    Attribute lookups that are not satisfied by the wrapper itself are
    forwarded to the wrapped Response object. For example, to obtain the
    parsed JSON from the response:

    .. code-block:: python

        thread_response = pool.get_response()
        json = thread_response.json()

    """
    attrs = frozenset(('request_kwargs', 'response'))
    proxied_attr = 'response'

    def __init__(self, request_kwargs, response):
        #: The wrapped response
        self.response = response
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
183
184
class ThreadException(ThreadProxy):
    """A wrapper around an exception raised during a request.

    This will proxy most attribute access actions to the exception object.
    The captured exception itself is available as ``exception``. For
    example, if you wanted the message from the exception, you might do:

    .. code-block:: python

        thread_exc = pool.get_exception()
        msg = str(thread_exc.exception)

    """
    proxied_attr = 'exception'
    attrs = frozenset(['request_kwargs', 'exception'])

    def __init__(self, request_kwargs, exception):
        #: The original keyword arguments provided to the queue
        self.request_kwargs = request_kwargs
        #: The captured and wrapped exception
        self.exception = exception
205
206
207 def _identity(session_obj):
208 return session_obj
209
210
# Public names exported by a star-import of this module.
__all__ = [
    'ThreadException',
    'ThreadResponse',
    'Pool',
]