comparison env/lib/python3.9/site-packages/boltons/socketutils.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 # -*- coding: utf-8 -*-
2 """At its heart, Python can be viewed as an extension of the C
3 programming language. Springing from the most popular systems
4 programming language has made Python itself a great language for
5 systems programming. One key to success in this domain is Python's
6 very serviceable :mod:`socket` module and its :class:`socket.socket`
7 type.
8
9 The ``socketutils`` module provides natural next steps to the ``socket``
10 builtin: straightforward, tested building blocks for higher-level
11 protocols.
12
13 The :class:`BufferedSocket` wraps an ordinary socket, providing a
14 layer of intuitive buffering for both sending and receiving. This
15 facilitates parsing messages from streams, i.e., all sockets with type
16 ``SOCK_STREAM``. The BufferedSocket enables receiving until the next
17 relevant token, up to a certain size, or until the connection is
18 closed. For all of these, it provides consistent APIs to size
19 limiting, as well as timeouts that are compatible with multiple
20 concurrency paradigms. Use it to parse the next one-off text or binary
21 socket protocol you encounter.
22
23 This module also provides the :class:`NetstringSocket`, a pure-Python
24 implementation of `the Netstring protocol`_, built on top of the
25 :class:`BufferedSocket`, serving as a ready-made, production-grade example.
26
27 Special thanks to `Kurt Rose`_ for his original authorship and all his
28 contributions on this module. Also thanks to `Daniel J. Bernstein`_, the
29 original author of `Netstring`_.
30
31 .. _the Netstring protocol: https://en.wikipedia.org/wiki/Netstring
32 .. _Kurt Rose: https://github.com/doublereedkurt
33 .. _Daniel J. Bernstein: https://cr.yp.to/
34 .. _Netstring: https://cr.yp.to/proto/netstrings.txt
35
36 """
37
38 import time
39 import socket
40
try:
    from threading import RLock
except Exception:
    # Deliberately broad: any failure to obtain a real lock falls back to
    # the no-op stand-in below rather than breaking import of this module.
    class RLock(object):
        """Dummy reentrant lock for builds without threads.

        Mirrors the parts of the ``threading.RLock`` API that callers are
        likely to touch: the context-manager protocol plus explicit
        ``acquire()``/``release()``. No locking is performed.
        """
        def acquire(self, blocking=True, timeout=-1):
            # Nothing can contend for the lock, so acquisition always works.
            return True

        def release(self):
            pass

        def __enter__(self):
            pass

        def __exit__(self, exctype, excinst, exctb):
            pass
51
52
# Sentinel used to tell "argument not passed" apart from an explicit None,
# since None is itself a meaningful timeout/maxsize value in this module.
try:
    try:
        # Explicit relative import: the only form that resolves the sibling
        # module when this file is imported as part of a package on Python 3.
        from .typeutils import make_sentinel
    except ImportError:
        # Fall back to the historical top-level lookup (e.g. when this file
        # is used standalone next to a ``typeutils`` module).
        from typeutils import make_sentinel
    _UNSET = make_sentinel(var_name='_UNSET')
except ImportError:
    # No typeutils available at all: a bare unique object works as well.
    _UNSET = object()
58
59
# Default per-operation timeout, in seconds.
DEFAULT_TIMEOUT = 10
# Default cap on buffered bytes: 32 KiB.
DEFAULT_MAXSIZE = 1 << 15
# Stand-in for "no size limit" (1 PiB), used when maxsize is None.
_RECV_LARGE_MAXSIZE = 1 << 50
63
64
class BufferedSocket(object):
    """Mainly provides recv_until and recv_size. recv, send, sendall, and
    peek all function as similarly as possible to the built-in socket
    API.

    This type has been tested against both the built-in socket type as
    well as those from gevent and eventlet. It also features support
    for sockets with timeouts set to 0 (aka nonblocking), provided the
    caller is prepared to handle the EWOULDBLOCK exceptions.

    Args:
        sock (socket): The connected socket to be wrapped.
        timeout (float): The default timeout for sends and recvs, in
            seconds. Set to ``None`` for no timeout, and 0 for
            nonblocking. Defaults to *sock*'s own timeout if already set,
            and 10 seconds otherwise.
        maxsize (int): The default maximum number of bytes to be received
            into the buffer before it is considered full and raises an
            exception. Defaults to 32 kilobytes.
        recvsize (int): The number of bytes to recv for every
            lower-level :meth:`socket.recv` call. Defaults to *maxsize*.

    *timeout* and *maxsize* can both be overridden on individual socket
    operations.

    All ``recv`` methods return bytestrings (:class:`bytes`) and can
    raise :exc:`socket.error`. :exc:`Timeout`,
    :exc:`ConnectionClosed`, and :exc:`MessageTooLong` all inherit
    from :exc:`socket.error` and exist to provide better error
    messages.  Received bytes are always buffered, even if an exception
    is raised. Use :meth:`BufferedSocket.getrecvbuffer` to retrieve
    partial recvs.

    BufferedSocket does not replace the built-in socket by any
    means. While the overlapping parts of the API are kept parallel to
    the built-in :class:`socket.socket`, BufferedSocket does not
    inherit from socket, and most socket functionality is only
    available on the underlying socket. :meth:`socket.getpeername`,
    :meth:`socket.getsockname`, :meth:`socket.fileno`, and others are
    only available on the underlying socket that is wrapped. Use the
    ``BufferedSocket.sock`` attribute to access it. See the examples
    for more information on how to use BufferedSockets with built-in
    sockets.

    The BufferedSocket is threadsafe, but consider the semantics of
    your protocol before accessing a single socket from multiple
    threads. Similarly, once the BufferedSocket is constructed, avoid
    using the underlying socket directly. Only use it for operations
    unrelated to messages, e.g., :meth:`socket.getpeername`.

    """
    def __init__(self, sock, timeout=_UNSET,
                 maxsize=DEFAULT_MAXSIZE, recvsize=_UNSET):
        self.sock = sock
        self.rbuf = b''   # bytes received but not yet handed to the caller
        self.sbuf = []    # list of bytestrings waiting to be sent
        self.maxsize = int(maxsize)

        if timeout is _UNSET:
            # Inherit the wrapped socket's timeout, unless it has none.
            sock_timeout = self.sock.gettimeout()
            if sock_timeout is None:
                self.timeout = DEFAULT_TIMEOUT
            else:
                self.timeout = sock_timeout
        elif timeout is None:
            self.timeout = timeout
        else:
            self.timeout = float(timeout)

        if recvsize is _UNSET:
            self._recvsize = self.maxsize
        else:
            self._recvsize = int(recvsize)

        # Separate locks so a sender and a receiver do not block each other.
        self._send_lock = RLock()
        self._recv_lock = RLock()

    def settimeout(self, timeout):
        """Set the default *timeout* for future operations, in seconds.

        ``None`` means no timeout and ``0`` means nonblocking. Any other
        value is converted with :class:`float`, the same normalization
        the constructor applies.
        """
        self.timeout = None if timeout is None else float(timeout)

    def gettimeout(self):
        "Return the default timeout for operations, in seconds."
        return self.timeout

    def setblocking(self, blocking):
        """Switch between blocking (no timeout) and nonblocking (timeout
        of 0) defaults, mirroring :meth:`socket.setblocking`.
        """
        self.timeout = None if blocking else 0.0

    def setmaxsize(self, maxsize):
        """Set the default maximum buffer size *maxsize* for future
        operations, in bytes. Does not truncate the current buffer.

        ``None`` is stored as-is (the receive methods treat it as "no
        practical limit"); any other value is converted with
        :class:`int`, the same normalization the constructor applies.
        """
        self.maxsize = None if maxsize is None else int(maxsize)

    def getrecvbuffer(self):
        "Returns the receive buffer bytestring (rbuf)."
        with self._recv_lock:
            return self.rbuf

    def getsendbuffer(self):
        """Returns the pending contents of the send buffer, joined into a
        single bytestring. The internal buffer is left untouched.
        """
        with self._send_lock:
            return b''.join(self.sbuf)

    def recv(self, size, flags=0, timeout=_UNSET):
        """Returns **up to** *size* bytes, using the internal buffer before
        performing a single :meth:`socket.recv` operation.

        Args:
            size (int): The maximum number of bytes to receive.
            flags (int): Kept for API compatibility with sockets. Only
                the default, ``0``, is valid.
            timeout (float): The timeout for this operation. Can be
                ``0`` for nonblocking and ``None`` for no
                timeout. Defaults to the value set in the constructor
                of BufferedSocket.

        If the operation does not complete in *timeout* seconds, a
        :exc:`Timeout` is raised. Much like the built-in
        :class:`socket.socket`, if this method returns an empty string,
        then the socket is closed and recv buffer is empty. Further
        calls to recv will raise :exc:`socket.error`.

        """
        with self._recv_lock:
            if timeout is _UNSET:
                timeout = self.timeout
            if flags:
                raise ValueError("non-zero flags not supported: %r" % flags)
            if len(self.rbuf) >= size:
                # The buffer alone satisfies the request.
                data, self.rbuf = self.rbuf[:size], self.rbuf[size:]
                return data
            if self.rbuf:
                # Partial data is available; return it without blocking.
                ret, self.rbuf = self.rbuf, b''
                return ret
            self.sock.settimeout(timeout)
            try:
                data = self.sock.recv(self._recvsize)
            except socket.timeout:
                raise Timeout(timeout)  # check the rbuf attr for more
            if len(data) > size:
                # Keep the surplus for the next recv call.
                data, self.rbuf = data[:size], data[size:]
            return data

    def peek(self, size, timeout=_UNSET):
        """Returns *size* bytes from the socket and/or internal buffer. Bytes
        are retained in BufferedSocket's internal recv buffer. To only
        see bytes in the recv buffer, use :meth:`getrecvbuffer`.

        Args:
            size (int): The exact number of bytes to peek at
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.

        If the appropriate number of bytes cannot be fetched from the
        buffer and socket before *timeout* expires, then a
        :exc:`Timeout` will be raised. If the connection is closed, a
        :exc:`ConnectionClosed` will be raised.
        """
        with self._recv_lock:
            if len(self.rbuf) >= size:
                return self.rbuf[:size]
            data = self.recv_size(size, timeout=timeout)
            # Push the bytes back in front of whatever recv_size left over.
            self.rbuf = data + self.rbuf
        return data

    def recv_close(self, timeout=_UNSET, maxsize=_UNSET):
        """Receive until the connection is closed, up to *maxsize* bytes. If
        more than *maxsize* bytes are received, raises :exc:`MessageTooLong`.
        """
        # recv_close works by using recv_size to request maxsize data,
        # and ignoring ConnectionClose, returning and clearing the
        # internal buffer instead. It raises an exception if
        # ConnectionClosed isn't raised.
        with self._recv_lock:
            if maxsize is _UNSET:
                maxsize = self.maxsize
            if maxsize is None:
                maxsize = _RECV_LARGE_MAXSIZE
            try:
                recvd = self.recv_size(maxsize + 1, timeout)
            except ConnectionClosed:
                ret, self.rbuf = self.rbuf, b''
            else:
                # put extra received bytes (now in rbuf) after recvd
                self.rbuf = recvd + self.rbuf
                size_read = min(maxsize, len(self.rbuf))
                raise MessageTooLong(size_read)  # check receive buffer
        return ret

    def recv_until(self, delimiter, timeout=_UNSET, maxsize=_UNSET,
                   with_delimiter=False):
        """Receive until *delimiter* is found, *maxsize* bytes have been read,
        or *timeout* is exceeded.

        Args:
            delimiter (bytes): One or more bytes to be searched for
                in the socket stream.
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.
            maxsize (int): The maximum size for the internal buffer.
                Defaults to the value set in the constructor.
            with_delimiter (bool): Whether or not to include the
                delimiter in the output. ``False`` by default, but
                ``True`` is useful in cases where one is simply
                forwarding the messages.

        ``recv_until`` will raise the following exceptions:

          * :exc:`Timeout` if more than *timeout* seconds expire.
          * :exc:`ConnectionClosed` if the underlying socket is closed
            by the sending end.
          * :exc:`MessageTooLong` if the delimiter is not found in the
            first *maxsize* bytes.
          * :exc:`socket.error` if operating in nonblocking mode
            (*timeout* equal to 0), or if some unexpected socket error
            occurs, such as operating on a closed socket.

        """
        with self._recv_lock:
            if maxsize is _UNSET:
                maxsize = self.maxsize
            if maxsize is None:
                maxsize = _RECV_LARGE_MAXSIZE
            if timeout is _UNSET:
                timeout = self.timeout
            len_delimiter = len(delimiter)

            sock = self.sock
            recvd = bytearray(self.rbuf)
            start = time.time()
            find_offset_start = 0  # becomes a negative index below

            if not timeout:  # covers None (no timeout) and 0 (nonblocking)
                sock.settimeout(timeout)
            try:
                while 1:
                    offset = recvd.find(delimiter, find_offset_start, maxsize)
                    if offset != -1:  # str.find returns -1 when no match found
                        if with_delimiter:  # include delimiter in return
                            offset += len_delimiter
                            rbuf_offset = offset
                        else:
                            rbuf_offset = offset + len_delimiter
                        break
                    elif len(recvd) > maxsize:
                        raise MessageTooLong(maxsize, delimiter)  # see rbuf
                    if timeout:
                        # Shrink the socket timeout to the time remaining.
                        cur_timeout = timeout - (time.time() - start)
                        if cur_timeout <= 0.0:
                            raise socket.timeout()
                        sock.settimeout(cur_timeout)
                    nxt = sock.recv(self._recvsize)
                    if not nxt:
                        args = (len(recvd), delimiter)
                        msg = ('connection closed after reading %s bytes'
                               ' without finding symbol: %r' % args)
                        raise ConnectionClosed(msg)  # check the recv buffer
                    recvd.extend(nxt)
                    # Only rescan the new bytes, plus enough of the old tail
                    # to catch a delimiter straddling the chunk boundary.
                    find_offset_start = -len(nxt) - len_delimiter + 1
            except socket.timeout:
                self.rbuf = bytes(recvd)
                msg = ('read %s bytes without finding delimiter: %r'
                       % (len(recvd), delimiter))
                raise Timeout(timeout, msg)  # check the recv buffer
            except Exception:
                # received data is still buffered in the case of errors
                self.rbuf = bytes(recvd)
                raise
            val, self.rbuf = bytes(recvd[:offset]), bytes(recvd[rbuf_offset:])
        return val

    def recv_size(self, size, timeout=_UNSET):
        """Read off of the internal buffer, then off the socket, until
        *size* bytes have been read.

        Args:
            size (int): number of bytes to read before returning. A
                *size* of zero or less returns ``b''`` immediately,
                without touching the socket or the buffer.
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.

        If the appropriate number of bytes cannot be fetched from the
        buffer and socket before *timeout* expires, then a
        :exc:`Timeout` will be raised. If the connection is closed, a
        :exc:`ConnectionClosed` will be raised.
        """
        with self._recv_lock:
            if size <= 0:
                # Nothing was requested, so there is no reason to block on
                # the socket (or to fail because the peer has gone away).
                return b''
            if timeout is _UNSET:
                timeout = self.timeout
            chunks = []
            total_bytes = 0
            try:
                start = time.time()
                self.sock.settimeout(timeout)
                # Drain the internal buffer first, then go to the socket.
                nxt = self.rbuf or self.sock.recv(self._recvsize)
                while nxt:
                    total_bytes += len(nxt)
                    if total_bytes >= size:
                        break
                    chunks.append(nxt)
                    if timeout:
                        # Shrink the socket timeout to the time remaining.
                        cur_timeout = timeout - (time.time() - start)
                        if cur_timeout <= 0.0:
                            raise socket.timeout()
                        self.sock.settimeout(cur_timeout)
                    nxt = self.sock.recv(self._recvsize)
                else:
                    # An empty recv means the peer closed the connection.
                    msg = ('connection closed after reading %s of %s requested'
                           ' bytes' % (total_bytes, size))
                    raise ConnectionClosed(msg)  # check recv buffer
            except socket.timeout:
                self.rbuf = b''.join(chunks)
                msg = 'read %s of %s bytes' % (total_bytes, size)
                raise Timeout(timeout, msg)  # check recv buffer
            except Exception:
                # received data is still buffered in the case of errors
                self.rbuf = b''.join(chunks)
                raise
            # The last chunk may overshoot; keep the surplus buffered.
            extra_bytes = total_bytes - size
            if extra_bytes:
                last, self.rbuf = nxt[:-extra_bytes], nxt[-extra_bytes:]
            else:
                last, self.rbuf = nxt, b''
            chunks.append(last)
        return b''.join(chunks)

    def send(self, data, flags=0, timeout=_UNSET):
        """Send the contents of the internal send buffer, as well as *data*,
        to the receiving end of the connection. Returns the total
        number of bytes sent. If no exception is raised, all of *data* was
        sent and the internal send buffer is empty.

        Args:
            data (bytes): The bytes to send.
            flags (int): Kept for API compatibility with sockets. Only
                the default 0 is valid.
            timeout (float): The timeout for this operation. Can be 0 for
                nonblocking and None for no timeout. Defaults to the value
                set in the constructor of BufferedSocket.

        Will raise :exc:`Timeout` if the send operation fails to
        complete before *timeout*. In the event of an exception, use
        :meth:`BufferedSocket.getsendbuffer` to see which data was
        unsent.
        """
        with self._send_lock:
            if timeout is _UNSET:
                timeout = self.timeout
            if flags:
                raise ValueError("non-zero flags not supported")
            sbuf = self.sbuf
            sbuf.append(data)
            if len(sbuf) > 1:
                # Collapse everything pending into a single bytestring.
                sbuf[:] = [b''.join([s for s in sbuf if s])]
            self.sock.settimeout(timeout)
            start, total_sent = time.time(), 0
            try:
                while sbuf[0]:
                    sent = self.sock.send(sbuf[0])
                    total_sent += sent
                    sbuf[0] = sbuf[0][sent:]
                    # Only enforce the deadline while data is still pending:
                    # a send that just finished must not be reported as a
                    # timeout merely because the clock ran out meanwhile.
                    if timeout and sbuf[0]:
                        cur_timeout = timeout - (time.time() - start)
                        if cur_timeout <= 0.0:
                            raise socket.timeout()
                        self.sock.settimeout(cur_timeout)
            except socket.timeout:
                raise Timeout(timeout, '%s bytes unsent' % len(sbuf[0]))
        return total_sent

    def sendall(self, data, flags=0, timeout=_UNSET):
        """A passthrough to :meth:`~BufferedSocket.send`, retained for
        parallelism to the :class:`socket.socket` API.
        """
        return self.send(data, flags, timeout)

    def flush(self):
        "Send the contents of the internal send buffer."
        with self._send_lock:
            self.send(b'')
        return

    def buffer(self, data):
        "Buffer *data* bytes for the next send operation."
        with self._send_lock:
            self.sbuf.append(data)
        return

    # # #
    # # # Passing through some socket basics
    # # #

    def getsockname(self):
        """Convenience function to return the wrapped socket's own address.
        See :meth:`socket.getsockname` for more details.
        """
        return self.sock.getsockname()

    def getpeername(self):
        """Convenience function to return the remote address to which the
        wrapped socket is connected.  See :meth:`socket.getpeername`
        for more details.
        """
        return self.sock.getpeername()

    def getsockopt(self, level, optname, buflen=None):
        """Convenience function passing through to the wrapped socket's
        :meth:`socket.getsockopt`.
        """
        args = (level, optname)
        if buflen is not None:
            args += (buflen,)
        return self.sock.getsockopt(*args)

    def setsockopt(self, level, optname, value):
        """Convenience function passing through to the wrapped socket's
        :meth:`socket.setsockopt`.
        """
        return self.sock.setsockopt(level, optname, value)

    @property
    def type(self):
        """A passthrough to the wrapped socket's type. Valid usages should
        only ever see :data:`socket.SOCK_STREAM`.
        """
        return self.sock.type

    @property
    def family(self):
        """A passthrough to the wrapped socket's family. BufferedSocket
        supports all widely-used families, so this read-only attribute
        can be one of :data:`socket.AF_INET` for IP,
        :data:`socket.AF_INET6` for IPv6, and :data:`socket.AF_UNIX`
        for UDS.
        """
        return self.sock.family

    @property
    def proto(self):
        """A passthrough to the wrapped socket's protocol. The ``proto``
        attribute is very rarely used, so it's always 0, meaning "the
        default" protocol. Pretty much all the practical information
        is in :attr:`~BufferedSocket.type` and
        :attr:`~BufferedSocket.family`, so you can go back to never
        thinking about this.
        """
        return self.sock.proto

    # # #
    # # # Now for some more advanced interpretations of the builtin socket
    # # #

    def fileno(self):
        """Returns the file descriptor of the wrapped socket. -1 if it has
        been closed on this end.

        Note that this makes the BufferedSocket selectable, i.e.,
        usable for operating system event loops without any external
        libraries. Keep in mind that the operating system cannot know
        about data in BufferedSocket's internal buffer. Exercise
        discipline with calling ``recv*`` functions.
        """
        return self.sock.fileno()

    def close(self):
        """Closes the wrapped socket, and empties the internal buffers. The
        send buffer is not flushed automatically, so if you have been
        calling :meth:`~BufferedSocket.buffer`, be sure to call
        :meth:`~BufferedSocket.flush` before calling this
        method. After calling this method, future socket operations
        will raise :exc:`socket.error`.
        """
        with self._recv_lock:
            with self._send_lock:
                self.rbuf = b''
                # Not read anywhere in this class; kept so that the set of
                # attributes present after close() stays unchanged.
                self.rbuf_unconsumed = self.rbuf
                self.sbuf[:] = []
                self.sock.close()
        return

    def shutdown(self, how):
        """Convenience method which passes through to the wrapped socket's
        :meth:`~socket.shutdown`. Semantics vary by platform, so no
        special internal handling is done with the buffers. This
        method exists to facilitate the most common usage, wherein a
        full ``shutdown`` is followed by a
        :meth:`~BufferedSocket.close`. Developers requiring more
        support, please open `an issue`_.

        .. _an issue: https://github.com/mahmoud/boltons/issues
        """
        with self._recv_lock:
            with self._send_lock:
                self.sock.shutdown(how)
        return
561
562 # end BufferedSocket
563
564
class Error(socket.error):
    """Root of the ``socketutils`` exception hierarchy.

    Every other exception defined in this module derives from this
    class, which itself derives from :exc:`socket.error`. Code using
    :class:`BufferedSocket` and the other ``socketutils`` types will
    usually want to catch either one of the more specific subclasses
    or plain :exc:`socket.error`.
    """
574
575
class ConnectionClosed(Error):
    """Signals that the sending end closed the connection unexpectedly
    while a receive was in progress.

    :class:`BufferedSocket` raises it from
    :meth:`~BufferedSocket.peek`, :meth:`~BufferedSocket.recv_until`,
    and :meth:`~BufferedSocket.recv_size`. It is never raised from
    :meth:`~BufferedSocket.recv` or :meth:`~BufferedSocket.recv_close`.
    """
585
586
class MessageTooLong(Error):
    """Signals that more than *maxsize* bytes were read without reaching
    the expected end of the message.

    Raised from :meth:`BufferedSocket.recv_until` when the delimiter
    was not encountered, and from :meth:`BufferedSocket.recv_close`
    when the connection was not closed, within *maxsize* bytes.
    """
    def __init__(self, bytes_read=None, delimiter=None):
        # Assemble the message from its optional fragments, in order.
        fragments = ['message exceeded maximum size']
        if bytes_read is not None:
            fragments.append('. %s bytes read' % (bytes_read,))
        if delimiter is not None:
            fragments.append('. Delimiter not found: %r' % (delimiter,))
        super(MessageTooLong, self).__init__(''.join(fragments))
600
601
class Timeout(socket.timeout, Error):
    """Indicates that a socket operation did not finish within the
    allotted time. Derives from both :exc:`socket.timeout` and
    :exc:`Error`, so handlers for either will catch it. Raised from any
    of :class:`BufferedSocket`'s ``recv`` methods.
    """
    def __init__(self, timeout, extra=""):
        # *timeout* is in seconds; the message reports milliseconds.
        fragments = ['socket operation timed out']
        if timeout is not None:
            fragments.append(' after %sms.' % (timeout * 1000))
        if extra:
            fragments.append(' ' + extra)
        super(Timeout, self).__init__(''.join(fragments))
615
616
class NetstringSocket(object):
    """
    Reads and writes using the netstring protocol.

    A netstring frames a payload as ``<size>:<payload>,`` where
    ``<size>`` is the payload length in bytes, written in decimal.

    Args:
        sock (socket): The connected socket to wrap. It is wrapped in a
            :class:`BufferedSocket`, available as the ``bsock`` attribute.
        timeout (float): Default timeout, in seconds, applied to every
            socket operation of a read or write.
        maxsize (int): Default maximum payload size, in bytes.

    More info: https://en.wikipedia.org/wiki/Netstring
    Even more info: http://cr.yp.to/proto/netstrings.txt
    """
    def __init__(self, sock, timeout=DEFAULT_TIMEOUT, maxsize=DEFAULT_MAXSIZE):
        self.bsock = BufferedSocket(sock)
        self.timeout = timeout
        self.maxsize = maxsize
        self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize)

    def fileno(self):
        "Return the file descriptor of the wrapped socket."
        return self.bsock.fileno()

    def settimeout(self, timeout):
        "Set the default *timeout*, in seconds, for future reads and writes."
        self.timeout = timeout

    def setmaxsize(self, maxsize):
        "Set the default maximum payload size, in bytes."
        self.maxsize = maxsize
        self._msgsize_maxsize = self._calc_msgsize_maxsize(maxsize)

    def _calc_msgsize_maxsize(self, maxsize):
        # Upper bound on the length of the size prefix: the number of
        # decimal digits in maxsize, plus one for the ':' separator.
        return len(str(maxsize)) + 1  # len(str()) == log10

    def read_ns(self, timeout=_UNSET, maxsize=_UNSET):
        """Read one netstring from the socket and return its payload.

        Args:
            timeout (float): Timeout, in seconds, applied to each of the
                underlying reads (size prefix, payload, trailing comma).
                Defaults to the socket's configured timeout.
            maxsize (int): Largest payload size to accept. Defaults to
                the socket's configured maxsize.

        Raises:
            NetstringInvalidSize: the size prefix is not an integer, or
                is negative.
            NetstringMessageTooLong: the declared size exceeds *maxsize*.
            NetstringProtocolError: the payload is not followed by ``,``.
        """
        if timeout is _UNSET:
            timeout = self.timeout

        if maxsize is _UNSET:
            maxsize = self.maxsize
            msgsize_maxsize = self._msgsize_maxsize
        else:
            msgsize_maxsize = self._calc_msgsize_maxsize(maxsize)

        size_prefix = self.bsock.recv_until(b':',
                                            timeout=timeout,
                                            maxsize=msgsize_maxsize)
        try:
            size = int(size_prefix)
        except ValueError:
            raise NetstringInvalidSize('netstring message size must be valid'
                                       ' integer, not %r' % size_prefix)
        if size < 0:
            # int() happily parses a leading '-', but a length cannot be
            # negative; reject it before asking the socket for bytes.
            raise NetstringInvalidSize('netstring message size must be'
                                       ' non-negative, not %r' % size_prefix)

        if size > maxsize:
            raise NetstringMessageTooLong(size, maxsize)
        # Pass the timeout along explicitly; otherwise these reads would use
        # the inner BufferedSocket's own default instead of the caller's.
        payload = self.bsock.recv_size(size, timeout=timeout)
        if self.bsock.recv(1, timeout=timeout) != b',':
            raise NetstringProtocolError("expected trailing ',' after message")

        return payload

    def write_ns(self, payload, timeout=_UNSET):
        """Frame *payload* as a netstring and send it.

        Args:
            payload (bytes): The message body to send.
            timeout (float): Timeout, in seconds, for the send. Defaults
                to the socket's configured timeout.

        Raises:
            NetstringMessageTooLong: *payload* is longer than the
                socket's configured maxsize.
        """
        if timeout is _UNSET:
            timeout = self.timeout
        size = len(payload)
        if size > self.maxsize:
            raise NetstringMessageTooLong(size, self.maxsize)
        data = str(size).encode('ascii') + b':' + payload + b','
        self.bsock.send(data, timeout=timeout)
676
677
class NetstringProtocolError(Error):
    """Common ancestor of every Netstring-related exception raised by
    ``socketutils``.
    """
681
682
class NetstringInvalidSize(NetstringProtocolError):
    """Raised when the size prefix of a netstring, i.e. the part before
    the ``:``, is not a valid integer.

    A well-formed message looks like this::

      5:hello,

    The ``5`` is the size prefix. If whatever occupies that position
    cannot be parsed as a Python integer (:class:`int`), this exception
    is raised.
    """
    def __init__(self, msg):
        # Exactly one argument, the message, is accepted and forwarded.
        super(NetstringInvalidSize, self).__init__(msg)
697
698
class NetstringMessageTooLong(NetstringProtocolError):
    """Raised when a netstring's size prefix is a valid integer that is
    larger than the *maxsize* configured on the
    :class:`NetstringSocket`.

    Recovery is not advised once this has been raised; the recommended
    response is simply to close the connection.
    """
    def __init__(self, size, maxsize):
        template = ('netstring message length exceeds configured maxsize:'
                    ' %s > %s')
        super(NetstringMessageTooLong, self).__init__(
            template % (size, maxsize))
711
712
713 """
714 attrs worth adding/passing through:
715
716
717 properties: type, proto
718
719 For its main functionality, BufferedSocket can wrap any object that
720 has the following methods:
721
722 - gettimeout()
723 - settimeout()
724 - recv(size)
725 - send(data)
726
727 The following methods are passed through:
728
729 ...
730
731 """
732
733 # TODO: buffered socket check socket.type == SOCK_STREAM?
734 # TODO: make recv_until support taking a regex
735 # TODO: including the delimiter in the recv_until return is not
736 # necessary, as ConnectionClosed differentiates empty messages
737 # from socket closes.