From 8fc0a9f3c4fed6523f2f0c823f603d6757bb1bc5 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 26 Oct 2015 18:30:18 +0530 Subject: [PATCH] Make the EchoHandler more efficient by implementing an interface to send individual fragments instead of only complete messages. --- src/calibre/srv/tests/web_sockets.py | 110 ++++++++++----------------- src/calibre/srv/web_socket.py | 33 +++++--- 2 files changed, 61 insertions(+), 82 deletions(-) diff --git a/src/calibre/srv/tests/web_sockets.py b/src/calibre/srv/tests/web_sockets.py index 703fa65d1a..c06d19d520 100644 --- a/src/calibre/srv/tests/web_sockets.py +++ b/src/calibre/srv/tests/web_sockets.py @@ -13,7 +13,7 @@ from hashlib import sha1 from calibre.srv.tests.base import BaseTest, TestServer from calibre.srv.web_socket import ( GUID_STR, BINARY, TEXT, MessageWriter, create_frame, CLOSE, NORMAL_CLOSE, - PING, PONG, PROTOCOL_ERROR, CONTINUATION, INCONSISTENT_DATA) + PING, PONG, PROTOCOL_ERROR, CONTINUATION, INCONSISTENT_DATA, CONTROL_CODES) from calibre.utils.monotonic import monotonic from calibre.utils.socket_inheritance import set_socket_inherit @@ -80,13 +80,16 @@ class WSClient(object): while len(ans) < size: d = self.recv(size - len(ans)) if not d: - raise ValueError('Connection to server closed, no data received') + return None ans += d return ans def read_frame(self): - b1, b2 = bytearray(self.read_size(2)) - fin = b1 & 0b10000000 + x = self.read_size(2) + if x is None: + return None + b1, b2 = bytearray(x) + fin = bool(b1 & 0b10000000) opcode = b1 & 0b1111 masked = b2 & 0b10000000 if masked: @@ -98,24 +101,26 @@ class WSClient(object): payload_length = struct.unpack(b'!Q', self.read_size(8))[0] return Frame(fin, opcode, self.read_size(payload_length)) - def read_message(self): - frames = [] + def read_messages(self): + messages, control_frames = [], [] + msg_buf, opcode = [], None while True: frame = self.read_frame() - frames.append(frame) - if frame.fin: + if frame is None or frame.payload is None: break - ans, opcode = [], None - for frame in frames: - if frame is frames[0]: - opcode = frame.opcode - if frame.fin == 0 and frame.opcode not in (BINARY, TEXT): - raise ValueError('Server sent a start frame with fin=0 and bad opcode') - ans.append(frame.payload) - ans = b''.join(ans) - if opcode == TEXT: - ans = ans.decode('utf-8') - return opcode, ans + if frame.opcode in CONTROL_CODES: + control_frames.append((frame.opcode, frame.payload)) + else: + if opcode is None: + opcode = frame.opcode + msg_buf.append(frame.payload) + if frame.fin: + data = b''.join(msg_buf) + if opcode == TEXT: + data = data.decode('utf-8', 'replace') + messages.append((opcode, data)) + msg_buf, opcode = [], None + return messages, control_frames def write_message(self, msg, chunk_size=None): if isinstance(msg, tuple): @@ -140,47 +145,9 @@ class WSClient(object): self.write_frame(1, CLOSE, struct.pack(b'!H', code) + reason) -class TestHandler(object): - - def __init__(self): - self.connections = {} - self.connection_state = {} - - def conn(self, cid): - ans = self.connections.get(cid) - if ans is not None: - ans = ans() - return ans - - def handle_websocket_upgrade(self, connection_id, connection_ref, inheaders): - self.connections[connection_id] = connection_ref - - def handle_websocket_data(self, data, message_starting, message_finished, connection_id): - pass - - def handle_websocket_close(self, connection_id): - self.connections.pop(connection_id, None) - -class EchoHandler(TestHandler): - - def __init__(self): - TestHandler.__init__(self) - self.msg_buf = [] - - def handle_websocket_data(self, data, message_starting, message_finished, connection_id): - if message_starting: - self.msg_buf = [] - self.msg_buf.append(data) - if message_finished: - j = '' if isinstance(self.msg_buf[0], type('')) else b'' - msg = j.join(self.msg_buf) - self.msg_buf = [] - self.conn(connection_id).send_websocket_message(msg, wakeup=False) - - class WSTestServer(TestServer): - def __init__(self, handler=TestHandler): + def __init__(self, handler): TestServer.__init__(self, None) from calibre.srv.http_response import create_http_handler self.loop.handler = create_http_handler(websocket_handler=handler()) @@ -201,8 +168,8 @@ class WebSocketTest(BaseTest): client.write_frame(**msg) else: client.write_message(msg) - ordered = not isinstance(expected, (set, frozenset)) - pexpected, replies = set(), set() + + expected_messages, expected_controls = [], [] for ex in expected: if isinstance(ex, type('')): ex = TEXT, ex @@ -210,20 +177,22 @@ class WebSocketTest(BaseTest): ex = BINARY, ex elif isinstance(ex, int): ex = ex, b'' - if ordered: - self.ae(ex, client.read_message()) + if ex[0] in CONTROL_CODES: + expected_controls.append(ex) else: - pexpected.add(ex), replies.add(client.read_message()) - if not ordered: - self.ae(pexpected, replies) + expected_messages.append(ex) if send_close: client.write_close(close_code, close_reason) - opcode, data = client.read_message() - self.ae(opcode, CLOSE) - self.ae(close_code, struct.unpack_from(b'!H', data, 0)[0]) + messages, control_frames = client.read_messages() + self.ae(expected_messages, messages) + self.assertGreaterEqual(len(control_frames), 1) + self.ae(expected_controls, control_frames[:-1]) + self.ae(control_frames[-1][0], CLOSE) + self.ae(close_code, struct.unpack_from(b'!H', control_frames[-1][1], 0)[0]) def test_websocket_basic(self): 'Test basic interaction with the websocket server' + from calibre.srv.web_socket import EchoHandler with WSTestServer(EchoHandler) as server: simple_test = partial(self.simple_test, server) @@ -284,7 +253,7 @@ class WebSocketTest(BaseTest): simple_test([ {'opcode':TEXT, 'payload':fragments[0], 'fin':0}, (PING, b'pong'), {'opcode':CONTINUATION, 'payload':fragments[1]} - ], {(PONG, b'pong'), ''.join(fragments)}) + ], [(PONG, b'pong'), ''.join(fragments)]) fragments = '12345' simple_test([ @@ -293,7 +262,7 @@ class WebSocketTest(BaseTest): {'opcode':CONTINUATION, 'payload':fragments[2], 'fin':0}, {'opcode':CONTINUATION, 'payload':fragments[3], 'fin':0}, (PING, b'2'), {'opcode':CONTINUATION, 'payload':fragments[4]} - ], {(PONG, b'1'), (PONG, b'2'), fragments}) + ], [(PONG, b'1'), (PONG, b'2'), fragments]) simple_test([ {'opcode':TEXT, 'fin':0}, {'opcode':CONTINUATION, 'fin':0}, {'opcode':CONTINUATION},], ['']) @@ -320,6 +289,7 @@ class WebSocketTest(BaseTest): simple_test([(CLOSE, struct.pack(b'!H', code))], send_close=False, close_code=PROTOCOL_ERROR) def test_websocket_perf(self): + from calibre.srv.web_socket import EchoHandler with WSTestServer(EchoHandler) as server: simple_test = partial(self.simple_test, server) for sz in (64, 256, 1024, 4096, 8192, 16384): diff --git a/src/calibre/srv/web_socket.py b/src/calibre/srv/web_socket.py index 8255f9d7e3..66979cf9e0 100644 --- a/src/calibre/srv/web_socket.py +++ b/src/calibre/srv/web_socket.py @@ -226,6 +226,7 @@ conn_id = 0 class WebSocketConnection(HTTPConnection): + # Internal API {{{ in_websocket_mode = False websocket_handler = None @@ -431,14 +432,31 @@ class WebSocketConnection(HTTPConnection): Connection.close(self) else: HTTPConnection.close(self) + # }}} def send_websocket_message(self, buf, wakeup=True): + ''' Send a complete message. This class will take care of splitting it + into appropriate frames automatically. `buf` must be a file like object. ''' self.sendq.put(MessageWriter(buf)) self.wait_for = RDWR if wakeup: self.wakeup() + def send_websocket_frame(self, data, is_first=True, is_last=True): + ''' Useful for streaming handlers that want to break up messages into + frames themselves. Note that these frames will be interleaved with + control frames, so they should not be too large. ''' + opcode = (TEXT if isinstance(data, type('')) else BINARY) if is_first else CONTINUATION + fin = 1 if is_last else 0 + frame = create_frame(fin, opcode, data) + with self.cf_lock: + self.control_frames.append(BytesIO(frame)) + def handle_websocket_data(self, data, message_starting, message_finished): + ''' Called when some data is received from the remote client. In general the + data may not constitute a complete "message", use the message_starting + and message_finished flags to re-assemble it into a complete message in + the handler. ''' self.websocket_handler.handle_websocket_data(data, message_starting, message_finished, self.websocket_connection_id) class DummyHandler(object): @@ -458,10 +476,9 @@ class DummyHandler(object): # Run this file with calibre-debug and use wstest to run the Autobahn test # suite -class EchoClientHandler(object): +class EchoHandler(object): def __init__(self, *args, **kwargs): - self.msg_buf = [] self.ws_connections = {} def conn(self, cid): @@ -474,21 +491,13 @@ class EchoClientHandler(object): self.ws_connections[connection_id] = connection_ref def handle_websocket_data(self, data, message_starting, message_finished, connection_id): - if message_starting: - self.msg_buf = [] - self.msg_buf.append(data) - if message_finished: - j = '' if isinstance(self.msg_buf[0], type('')) else b'' - msg = j.join(self.msg_buf) - self.msg_buf = [] - # print('Received message from client:', reprlib.repr(msg)) - self.conn(connection_id).send_websocket_message(msg) + self.conn(connection_id).send_websocket_frame(data, message_starting, message_finished) def handle_websocket_close(self, connection_id): self.ws_connections.pop(connection_id, None) if __name__ == '__main__': - s = ServerLoop(create_http_handler(websocket_handler=EchoClientHandler())) + s = ServerLoop(create_http_handler(websocket_handler=EchoHandler())) with HandleInterrupt(s.wakeup): s.serve_forever() # }}}