Source code for support.buffered_socket

import gevent


_UNSET = object()


[docs]class BufferedSocket(object): def __init__(self, sock, timeout=10, maxbytes=32 * 1024): self.sock = sock self.sock.settimeout(None) self.rbuf = "" self.sbuf = [] self.timeout = timeout self.maxbytes = maxbytes
[docs] def settimeout(self, timeout): self.timeout = timeout
[docs] def setmaxbytes(self, maxbytes): self.maxbytes = maxbytes
[docs] def recv(self, size, flags=0, timeout=_UNSET): if timeout is _UNSET: timeout = self.timeout if flags: raise ValueError("flags not supported") if len(self.rbuf) >= size: data, self.rbuf = self.rbuf[:size], self.rbuf[size:] return data size -= len(self.rbuf) with gevent.Timeout(timeout, Timeout(timeout)): data = self.rbuf + self.sock.recv(size) # don't empty buffer till after network communication is complete, # to avoid data loss on transient / retry-able errors (e.g. read # timeout) self.rbuf = "" return data
[docs] def peek(self, n, timeout=None): 'peek n bytes from the socket, but keep them in the buffer' if len(self.rbuf) >= n: return self.rbuf[:n] data = self.recv_all(n, timeout=timeout) self.rbuf = data + self.rbuf return data
[docs] def recv_until(self, marker, timeout=_UNSET, maxbytes=_UNSET): 'read off of socket until the marker is found' if maxbytes is _UNSET: maxbytes = self.maxbytes if timeout is _UNSET: timeout = self.timeout chunks = [] recvd = 0 nxt = '' try: with gevent.Timeout(timeout, False): nxt = self.rbuf or self.sock.recv(32 * 1024) while nxt and marker not in nxt: chunks.append(nxt) recvd += len(nxt) if maxbytes is not None and recvd > maxbytes: raise NotFound(marker, recvd) nxt = self.sock.recv(32 * 1024) if not nxt: raise ConnectionClosed( 'connection closed after reading {0} bytes without' ' finding symbol {1}'.format(recvd, marker)) if marker not in nxt: raise Timeout( timeout, 'read {0} bytes without finding symbol {1}'.format( recvd, marker)) except: # in case of error, retain data read so far in buffer self.rbuf = ''.join(chunks) raise val, _, self.rbuf = nxt.partition(marker) return ''.join(chunks) + val
[docs] def recv_all(self, size, timeout=_UNSET): 'read off of socket until size bytes have been read' if timeout is _UNSET: timeout = self.timeout chunks = [] total_bytes = 0 try: with gevent.Timeout(timeout, False): nxt = self.rbuf or self.sock.recv(size) while nxt: total_bytes += len(nxt) if total_bytes >= size: break chunks.append(nxt) nxt = self.sock.recv(size - total_bytes) else: raise ConnectionClosed( 'connection was closed after reading' ' {0} of {1} bytes'.format(total_bytes, size)) if total_bytes < size: raise Timeout( timeout, 'read {0} of {1} bytes'.format(total_bytes, size)) except: # in case of error, retain data read so far in buffer self.rbuf = ''.join(chunks) raise extra_bytes = total_bytes - size if extra_bytes: last, self.rbuf = nxt[:-extra_bytes], nxt[-extra_bytes:] else: last, self.rbuf = nxt, "" chunks.append(last) return ''.join(chunks)
[docs] def send(self, data, flags=0, timeout=_UNSET): if timeout is _UNSET: timeout = self.timeout if flags: raise ValueError("flags not supported") self.sbuf = [''.join(self.sbuf) + data] with gevent.Timeout(timeout, False): while self.sbuf[0]: sent = self.sock.send(data) self.sbuf[0] = self.sbuf[0][sent:] if self.sbuf[0]: raise Timeout( timeout, "{0} bytes unsent".format(len(self.sbuf[0])))
sendall = send
[docs] def flush(self): self.send('')
[docs] def buffer(self, data): self.sbuf.append(data)
[docs] def close(self): self.sock.close()
[docs] def shutdown(self, how): self.sock.shutdown(how)
[docs]class Error(Exception): pass
[docs]class ConnectionClosed(Error): pass
[docs]class Timeout(Error): def __init__(self, timeout, extra=""): if timeout is None: super(Timeout, self).__init__('timed out ' + extra) else: super(Timeout, self).__init__( 'timed out after {0}ms '.format(timeout * 1e3) + extra)
[docs]class NotFound(Error): def __init__(self, symbol, bytes_read): super(NotFound, self).__init__( 'read {1} bytes without finding symbol {0}'.format( symbol, bytes_read))