Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/requests_toolbelt/threaded/pool.py @ 1:56ad4e20f292 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author | guerler |
---|---|
date | Fri, 31 Jul 2020 00:32:28 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:d30785e31577 | 1:56ad4e20f292 |
---|---|
1 """Module implementing the Pool for :mod:``requests_toolbelt.threaded``.""" | |
2 import multiprocessing | |
3 import requests | |
4 | |
5 from . import thread | |
6 from .._compat import queue | |
7 | |
8 | |
9 class Pool(object): | |
10 """Pool that manages the threads containing sessions. | |
11 | |
12 :param queue: | |
13 The queue you're expected to use to which you should add items. | |
14 :type queue: queue.Queue | |
15 :param initializer: | |
16 Function used to initialize an instance of ``session``. | |
17 :type initializer: collections.Callable | |
18 :param auth_generator: | |
19 Function used to generate new auth credentials for the session. | |
20 :type auth_generator: collections.Callable | |
21 :param int num_process: | |
22 Number of threads to create. | |
23 :param session: | |
24 :type session: requests.Session | |
25 """ | |
26 | |
27 def __init__(self, job_queue, initializer=None, auth_generator=None, | |
28 num_processes=None, session=requests.Session): | |
29 if num_processes is None: | |
30 num_processes = multiprocessing.cpu_count() or 1 | |
31 | |
32 if num_processes < 1: | |
33 raise ValueError("Number of processes should at least be 1.") | |
34 | |
35 self._job_queue = job_queue | |
36 self._response_queue = queue.Queue() | |
37 self._exc_queue = queue.Queue() | |
38 self._processes = num_processes | |
39 self._initializer = initializer or _identity | |
40 self._auth = auth_generator or _identity | |
41 self._session = session | |
42 self._pool = [ | |
43 thread.SessionThread(self._new_session(), self._job_queue, | |
44 self._response_queue, self._exc_queue) | |
45 for _ in range(self._processes) | |
46 ] | |
47 | |
48 def _new_session(self): | |
49 return self._auth(self._initializer(self._session())) | |
50 | |
51 @classmethod | |
52 def from_exceptions(cls, exceptions, **kwargs): | |
53 r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s. | |
54 | |
55 Provided an iterable that provides :class:`~ThreadException` objects, | |
56 this classmethod will generate a new pool to retry the requests that | |
57 caused the exceptions. | |
58 | |
59 :param exceptions: | |
60 Iterable that returns :class:`~ThreadException` | |
61 :type exceptions: iterable | |
62 :param kwargs: | |
63 Keyword arguments passed to the :class:`~Pool` initializer. | |
64 :returns: An initialized :class:`~Pool` object. | |
65 :rtype: :class:`~Pool` | |
66 """ | |
67 job_queue = queue.Queue() | |
68 for exc in exceptions: | |
69 job_queue.put(exc.request_kwargs) | |
70 | |
71 return cls(job_queue=job_queue, **kwargs) | |
72 | |
73 @classmethod | |
74 def from_urls(cls, urls, request_kwargs=None, **kwargs): | |
75 """Create a :class:`~Pool` from an iterable of URLs. | |
76 | |
77 :param urls: | |
78 Iterable that returns URLs with which we create a pool. | |
79 :type urls: iterable | |
80 :param dict request_kwargs: | |
81 Dictionary of other keyword arguments to provide to the request | |
82 method. | |
83 :param kwargs: | |
84 Keyword arguments passed to the :class:`~Pool` initializer. | |
85 :returns: An initialized :class:`~Pool` object. | |
86 :rtype: :class:`~Pool` | |
87 """ | |
88 request_dict = {'method': 'GET'} | |
89 request_dict.update(request_kwargs or {}) | |
90 job_queue = queue.Queue() | |
91 for url in urls: | |
92 job = request_dict.copy() | |
93 job.update({'url': url}) | |
94 job_queue.put(job) | |
95 | |
96 return cls(job_queue=job_queue, **kwargs) | |
97 | |
98 def exceptions(self): | |
99 """Iterate over all the exceptions in the pool. | |
100 | |
101 :returns: Generator of :class:`~ThreadException` | |
102 """ | |
103 while True: | |
104 exc = self.get_exception() | |
105 if exc is None: | |
106 break | |
107 yield exc | |
108 | |
109 def get_exception(self): | |
110 """Get an exception from the pool. | |
111 | |
112 :rtype: :class:`~ThreadException` | |
113 """ | |
114 try: | |
115 (request, exc) = self._exc_queue.get_nowait() | |
116 except queue.Empty: | |
117 return None | |
118 else: | |
119 return ThreadException(request, exc) | |
120 | |
121 def get_response(self): | |
122 """Get a response from the pool. | |
123 | |
124 :rtype: :class:`~ThreadResponse` | |
125 """ | |
126 try: | |
127 (request, response) = self._response_queue.get_nowait() | |
128 except queue.Empty: | |
129 return None | |
130 else: | |
131 return ThreadResponse(request, response) | |
132 | |
133 def responses(self): | |
134 """Iterate over all the responses in the pool. | |
135 | |
136 :returns: Generator of :class:`~ThreadResponse` | |
137 """ | |
138 while True: | |
139 resp = self.get_response() | |
140 if resp is None: | |
141 break | |
142 yield resp | |
143 | |
144 def join_all(self): | |
145 """Join all the threads to the master thread.""" | |
146 for session_thread in self._pool: | |
147 session_thread.join() | |
148 | |
149 | |
150 class ThreadProxy(object): | |
151 proxied_attr = None | |
152 | |
153 def __getattr__(self, attr): | |
154 """Proxy attribute accesses to the proxied object.""" | |
155 get = object.__getattribute__ | |
156 if attr not in self.attrs: | |
157 response = get(self, self.proxied_attr) | |
158 return getattr(response, attr) | |
159 else: | |
160 return get(self, attr) | |
161 | |
162 | |
163 class ThreadResponse(ThreadProxy): | |
164 """A wrapper around a requests Response object. | |
165 | |
166 This will proxy most attribute access actions to the Response object. For | |
167 example, if you wanted the parsed JSON from the response, you might do: | |
168 | |
169 .. code-block:: python | |
170 | |
171 thread_response = pool.get_response() | |
172 json = thread_response.json() | |
173 | |
174 """ | |
175 proxied_attr = 'response' | |
176 attrs = frozenset(['request_kwargs', 'response']) | |
177 | |
178 def __init__(self, request_kwargs, response): | |
179 #: The original keyword arguments provided to the queue | |
180 self.request_kwargs = request_kwargs | |
181 #: The wrapped response | |
182 self.response = response | |
183 | |
184 | |
185 class ThreadException(ThreadProxy): | |
186 """A wrapper around an exception raised during a request. | |
187 | |
188 This will proxy most attribute access actions to the exception object. For | |
189 example, if you wanted the message from the exception, you might do: | |
190 | |
191 .. code-block:: python | |
192 | |
193 thread_exc = pool.get_exception() | |
194 msg = thread_exc.message | |
195 | |
196 """ | |
197 proxied_attr = 'exception' | |
198 attrs = frozenset(['request_kwargs', 'exception']) | |
199 | |
200 def __init__(self, request_kwargs, exception): | |
201 #: The original keyword arguments provided to the queue | |
202 self.request_kwargs = request_kwargs | |
203 #: The captured and wrapped exception | |
204 self.exception = exception | |
205 | |
206 | |
207 def _identity(session_obj): | |
208 return session_obj | |
209 | |
210 | |
211 __all__ = ['ThreadException', 'ThreadResponse', 'Pool'] |