Reduce number of mallocs in the WebSocket server

This commit is contained in:
Kovid Goyal 2015-10-28 08:25:43 +05:30
parent ee1593b77c
commit 689bec8c46
3 changed files with 46 additions and 13 deletions

View File

@ -22,7 +22,7 @@ from calibre.srv.http_request import HTTPRequest, read_headers
from calibre.srv.sendfile import file_metadata, sendfile_to_socket_async, CannotSendfile, SendfileInterrupted from calibre.srv.sendfile import file_metadata, sendfile_to_socket_async, CannotSendfile, SendfileInterrupted
from calibre.srv.utils import ( from calibre.srv.utils import (
MultiDict, http_date, HTTP1, HTTP11, socket_errors_socket_closed, MultiDict, http_date, HTTP1, HTTP11, socket_errors_socket_closed,
sort_q_values, get_translator_for_lang, Cookie) sort_q_values, get_translator_for_lang, Cookie, ReadOnlyFileBuffer)
from calibre.utils.monotonic import monotonic from calibre.utils.monotonic import monotonic
Range = namedtuple('Range', 'start stop size') Range = namedtuple('Range', 'start stop size')
@ -324,7 +324,7 @@ class HTTPConnection(HTTPRequest):
limit = end - pos limit = end - pos
if limit == 0: if limit == 0:
return True return True
if self.use_sendfile and not isinstance(buf, BytesIO): if self.use_sendfile and not isinstance(buf, (BytesIO, ReadOnlyFileBuffer)):
try: try:
sent = sendfile_to_socket_async(buf, pos, limit, self.socket) sent = sendfile_to_socket_async(buf, pos, limit, self.socket)
except CannotSendfile: except CannotSendfile:

View File

@ -432,3 +432,37 @@ class Accumulator(object): # {{{
self.total_length = 0 self.total_length = 0
return ans return ans
# }}} # }}}
class ReadOnlyFileBuffer(object):
''' A zero copy implementation of a file like object. Uses memoryviews for efficiency. '''
def __init__(self, raw):
self.sz, self.mv = len(raw), (raw if isinstance(raw, memoryview) else memoryview(raw))
self.pos = 0
def tell(self):
return self.pos
def read(self, n=None):
if n is None:
ans = self.mv[self.pos:]
self.pos = self.sz
return ans
ans = self.mv[self.pos:self.pos+n]
self.pos = min(self.pos + n, self.sz)
return ans
def seek(self, pos, whence=os.SEEK_SET):
if whence == os.SEEK_SET:
self.pos = pos
elif whence == os.SEEK_END:
self.pos = self.sz + pos
else:
self.pos += pos
self.pos = max(0, min(self.pos, self.sz))
return self.pos
def getvalue(self):
return self.mv

View File

@ -10,7 +10,6 @@ from base64 import standard_b64encode
from collections import deque from collections import deque
from functools import partial from functools import partial
from hashlib import sha1 from hashlib import sha1
from io import BytesIO
from Queue import Queue, Empty from Queue import Queue, Empty
from threading import Lock from threading import Lock
@ -18,7 +17,7 @@ from calibre import as_unicode
from calibre.constants import plugins from calibre.constants import plugins
from calibre.srv.loop import ServerLoop, HandleInterrupt, WRITE, READ, RDWR, Connection from calibre.srv.loop import ServerLoop, HandleInterrupt, WRITE, READ, RDWR, Connection
from calibre.srv.http_response import HTTPConnection, create_http_handler from calibre.srv.http_response import HTTPConnection, create_http_handler
from calibre.srv.utils import DESIRED_SEND_BUFFER_SIZE from calibre.srv.utils import DESIRED_SEND_BUFFER_SIZE, ReadOnlyFileBuffer
speedup, err = plugins['speedup'] speedup, err = plugins['speedup']
if not speedup: if not speedup:
raise RuntimeError('Failed to load speedup module with error: ' + err) raise RuntimeError('Failed to load speedup module with error: ' + err)
@ -196,9 +195,9 @@ class MessageWriter(object):
def __init__(self, buf, mask=None, chunk_size=None): def __init__(self, buf, mask=None, chunk_size=None):
self.buf, self.data_type, self.mask = buf, BINARY, mask self.buf, self.data_type, self.mask = buf, BINARY, mask
if isinstance(buf, type('')): if isinstance(buf, type('')):
self.buf, self.data_type = BytesIO(buf.encode('utf-8')), TEXT self.buf, self.data_type = ReadOnlyFileBuffer(buf.encode('utf-8')), TEXT
elif isinstance(buf, bytes): elif isinstance(buf, bytes):
self.buf = BytesIO(buf) self.buf = ReadOnlyFileBuffer(buf)
buf = self.buf buf = self.buf
self.chunk_size = chunk_size or SEND_CHUNK_SIZE self.chunk_size = chunk_size or SEND_CHUNK_SIZE
try: try:
@ -219,7 +218,7 @@ class MessageWriter(object):
fin = 0 if has_more and raw else 1 fin = 0 if has_more and raw else 1
opcode = 0 if self.first_frame_created else self.data_type opcode = 0 if self.first_frame_created else self.data_type
self.first_frame_created, self.exhausted = True, bool(fin) self.first_frame_created, self.exhausted = True, bool(fin)
return BytesIO(create_frame(fin, opcode, raw, self.mask)) return ReadOnlyFileBuffer(create_frame(fin, opcode, raw, self.mask))
# }}} # }}}
conn_id = 0 conn_id = 0
@ -277,7 +276,7 @@ class WebSocketConnection(HTTPConnection):
response = HANDSHAKE_STR % standard_b64encode(sha1(key + GUID_STR).digest()) response = HANDSHAKE_STR % standard_b64encode(sha1(key + GUID_STR).digest())
self.optimize_for_sending_packet() self.optimize_for_sending_packet()
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.set_state(WRITE, self.upgrade_connection_to_ws, BytesIO(response.encode('ascii')), inheaders) self.set_state(WRITE, self.upgrade_connection_to_ws, ReadOnlyFileBuffer(response.encode('ascii')), inheaders)
def upgrade_connection_to_ws(self, buf, inheaders, event): def upgrade_connection_to_ws(self, buf, inheaders, event):
if self.write(buf): if self.write(buf):
@ -394,7 +393,7 @@ class WebSocketConnection(HTTPConnection):
else: else:
close_code = NORMAL_CLOSE close_code = NORMAL_CLOSE
data = struct.pack(b'!H', close_code) data = struct.pack(b'!H', close_code)
f = BytesIO(create_frame(1, rcode, data)) f = ReadOnlyFileBuffer(create_frame(1, rcode, data))
f.is_close_frame = opcode == CLOSE f.is_close_frame = opcode == CLOSE
with self.cf_lock: with self.cf_lock:
self.control_frames.append(f) self.control_frames.append(f)
@ -411,9 +410,9 @@ class WebSocketConnection(HTTPConnection):
self.stop_reading = True self.stop_reading = True
reason = reason[:123] reason = reason[:123]
if code is None and not reason: if code is None and not reason:
f = BytesIO(create_frame(1, CLOSE, b'')) f = ReadOnlyFileBuffer(create_frame(1, CLOSE, b''))
else: else:
f = BytesIO(create_frame(1, CLOSE, struct.pack(b'!H', code) + reason)) f = ReadOnlyFileBuffer(create_frame(1, CLOSE, struct.pack(b'!H', code) + reason))
f.is_close_frame = True f.is_close_frame = True
with self.cf_lock: with self.cf_lock:
self.control_frames.append(f) self.control_frames.append(f)
@ -475,7 +474,7 @@ class WebSocketConnection(HTTPConnection):
fin = 1 if is_last else 0 fin = 1 if is_last else 0
frame = create_frame(fin, opcode, data) frame = create_frame(fin, opcode, data)
with self.cf_lock: with self.cf_lock:
self.control_frames.append(BytesIO(frame)) self.control_frames.append(ReadOnlyFileBuffer(frame))
def send_websocket_ping(self, data=b''): def send_websocket_ping(self, data=b''):
''' Send a PING to the remote client, it should reply with a PONG which ''' Send a PING to the remote client, it should reply with a PONG which
@ -484,7 +483,7 @@ class WebSocketConnection(HTTPConnection):
data = data.encode('utf-8') data = data.encode('utf-8')
frame = create_frame(True, PING, data) frame = create_frame(True, PING, data)
with self.cf_lock: with self.cf_lock:
self.control_frames.append(BytesIO(frame)) self.control_frames.append(ReadOnlyFileBuffer(frame))
def handle_websocket_data(self, data, message_starting, message_finished): def handle_websocket_data(self, data, message_starting, message_finished):
''' Called when some data is received from the remote client. In general the ''' Called when some data is received from the remote client. In general the