From 689bec8c4685e17e6c3822c210cb3ca05713c4f4 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 28 Oct 2015 08:25:43 +0530 Subject: [PATCH] Reduce number of mallocs in the WebSocket server --- src/calibre/srv/http_response.py | 4 ++-- src/calibre/srv/utils.py | 34 ++++++++++++++++++++++++++++++++ src/calibre/srv/web_socket.py | 21 ++++++++++---------- 3 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/calibre/srv/http_response.py b/src/calibre/srv/http_response.py index 4bc0f4cec0..0202b9b879 100644 --- a/src/calibre/srv/http_response.py +++ b/src/calibre/srv/http_response.py @@ -22,7 +22,7 @@ from calibre.srv.http_request import HTTPRequest, read_headers from calibre.srv.sendfile import file_metadata, sendfile_to_socket_async, CannotSendfile, SendfileInterrupted from calibre.srv.utils import ( MultiDict, http_date, HTTP1, HTTP11, socket_errors_socket_closed, - sort_q_values, get_translator_for_lang, Cookie) + sort_q_values, get_translator_for_lang, Cookie, ReadOnlyFileBuffer) from calibre.utils.monotonic import monotonic Range = namedtuple('Range', 'start stop size') @@ -324,7 +324,7 @@ class HTTPConnection(HTTPRequest): limit = end - pos if limit == 0: return True - if self.use_sendfile and not isinstance(buf, BytesIO): + if self.use_sendfile and not isinstance(buf, (BytesIO, ReadOnlyFileBuffer)): try: sent = sendfile_to_socket_async(buf, pos, limit, self.socket) except CannotSendfile: diff --git a/src/calibre/srv/utils.py b/src/calibre/srv/utils.py index 2ac9a500ec..1c4b891e11 100644 --- a/src/calibre/srv/utils.py +++ b/src/calibre/srv/utils.py @@ -432,3 +432,37 @@ class Accumulator(object): # {{{ self.total_length = 0 return ans # }}} + +class ReadOnlyFileBuffer(object): + + ''' A zero copy implementation of a file like object. Uses memoryviews for efficiency. ''' + + def __init__(self, raw): + self.sz, self.mv = len(raw), (raw if isinstance(raw, memoryview) else memoryview(raw)) + self.pos = 0 + + def tell(self): + return self.pos + + def read(self, n=None): + if n is None: + ans = self.mv[self.pos:] + self.pos = self.sz + return ans + ans = self.mv[self.pos:self.pos+n] + self.pos = min(self.pos + n, self.sz) + return ans + + def seek(self, pos, whence=os.SEEK_SET): + if whence == os.SEEK_SET: + self.pos = pos + elif whence == os.SEEK_END: + self.pos = self.sz + pos + else: + self.pos += pos + self.pos = max(0, min(self.pos, self.sz)) + return self.pos + + def getvalue(self): + return self.mv + diff --git a/src/calibre/srv/web_socket.py b/src/calibre/srv/web_socket.py index eaf57b1c6f..eb78c01f8f 100644 --- a/src/calibre/srv/web_socket.py +++ b/src/calibre/srv/web_socket.py @@ -10,7 +10,6 @@ from base64 import standard_b64encode from collections import deque from functools import partial from hashlib import sha1 -from io import BytesIO from Queue import Queue, Empty from threading import Lock @@ -18,7 +17,7 @@ from calibre import as_unicode from calibre.constants import plugins from calibre.srv.loop import ServerLoop, HandleInterrupt, WRITE, READ, RDWR, Connection from calibre.srv.http_response import HTTPConnection, create_http_handler -from calibre.srv.utils import DESIRED_SEND_BUFFER_SIZE +from calibre.srv.utils import DESIRED_SEND_BUFFER_SIZE, ReadOnlyFileBuffer speedup, err = plugins['speedup'] if not speedup: raise RuntimeError('Failed to load speedup module with error: ' + err) @@ -196,9 +195,9 @@ class MessageWriter(object): def __init__(self, buf, mask=None, chunk_size=None): self.buf, self.data_type, self.mask = buf, BINARY, mask if isinstance(buf, type('')): - self.buf, self.data_type = BytesIO(buf.encode('utf-8')), TEXT + self.buf, self.data_type = ReadOnlyFileBuffer(buf.encode('utf-8')), TEXT elif isinstance(buf, bytes): - self.buf = BytesIO(buf) + self.buf = ReadOnlyFileBuffer(buf) buf = self.buf self.chunk_size = chunk_size or SEND_CHUNK_SIZE try: @@ -219,7 +218,7 @@ class MessageWriter(object): fin = 0 if has_more and raw else 1 opcode = 0 if self.first_frame_created else self.data_type self.first_frame_created, self.exhausted = True, bool(fin) - return BytesIO(create_frame(fin, opcode, raw, self.mask)) + return ReadOnlyFileBuffer(create_frame(fin, opcode, raw, self.mask)) # }}} conn_id = 0 @@ -277,7 +276,7 @@ class WebSocketConnection(HTTPConnection): response = HANDSHAKE_STR % standard_b64encode(sha1(key + GUID_STR).digest()) self.optimize_for_sending_packet() self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.set_state(WRITE, self.upgrade_connection_to_ws, BytesIO(response.encode('ascii')), inheaders) + self.set_state(WRITE, self.upgrade_connection_to_ws, ReadOnlyFileBuffer(response.encode('ascii')), inheaders) def upgrade_connection_to_ws(self, buf, inheaders, event): if self.write(buf): @@ -394,7 +393,7 @@ class WebSocketConnection(HTTPConnection): else: close_code = NORMAL_CLOSE data = struct.pack(b'!H', close_code) - f = BytesIO(create_frame(1, rcode, data)) + f = ReadOnlyFileBuffer(create_frame(1, rcode, data)) f.is_close_frame = opcode == CLOSE with self.cf_lock: self.control_frames.append(f) @@ -411,9 +410,9 @@ class WebSocketConnection(HTTPConnection): self.stop_reading = True reason = reason[:123] if code is None and not reason: - f = BytesIO(create_frame(1, CLOSE, b'')) + f = ReadOnlyFileBuffer(create_frame(1, CLOSE, b'')) else: - f = BytesIO(create_frame(1, CLOSE, struct.pack(b'!H', code) + reason)) + f = ReadOnlyFileBuffer(create_frame(1, CLOSE, struct.pack(b'!H', code) + reason)) f.is_close_frame = True with self.cf_lock: self.control_frames.append(f) @@ -475,7 +474,7 @@ class WebSocketConnection(HTTPConnection): fin = 1 if is_last else 0 frame = create_frame(fin, opcode, data) with self.cf_lock: - self.control_frames.append(BytesIO(frame)) + self.control_frames.append(ReadOnlyFileBuffer(frame)) def send_websocket_ping(self, data=b''): ''' Send a PING to the remote client, it should reply with a PONG which @@ -484,7 +483,7 @@ class WebSocketConnection(HTTPConnection): data = data.encode('utf-8') frame = create_frame(True, PING, data) with self.cf_lock: - self.control_frames.append(BytesIO(frame)) + self.control_frames.append(ReadOnlyFileBuffer(frame)) def handle_websocket_data(self, data, message_starting, message_finished): ''' Called when some data is received from the remote client. In general the