From 62ace2707fabc8037e13d41ebeafac23d1da3ff5 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 22 May 2015 16:37:45 +0530 Subject: [PATCH] Use sendfile() for added performance on linux --- src/calibre/srv/http.py | 2 +- src/calibre/srv/loop.py | 3 ++ src/calibre/srv/opts.py | 6 ++++ src/calibre/srv/respond.py | 58 ++++++++++++++++++++------------- src/calibre/srv/sendfile.py | 61 +++++++++++++++++++++++++++++++++++ src/calibre/srv/tests/http.py | 11 ++++++- src/calibre/srv/utils.py | 14 ++++++-- 7 files changed, 127 insertions(+), 28 deletions(-) create mode 100644 src/calibre/srv/sendfile.py diff --git a/src/calibre/srv/http.py b/src/calibre/srv/http.py index 7f90ae55f8..c5214c52a8 100644 --- a/src/calibre/srv/http.py +++ b/src/calibre/srv/http.py @@ -557,7 +557,7 @@ class HTTPPair(object): try: self.status_code, output = finalize_output( output, self.inheaders, self.outheaders, self.status_code, - self.response_protocol is HTTP1, self.method, self.server_loop.opts.compress_min_size) + self.response_protocol is HTTP1, self.method, self.server_loop.opts) except IfNoneMatch as e: if self.method in ('GET', 'HEAD'): self.send_not_modified(e.etag) diff --git a/src/calibre/srv/loop.py b/src/calibre/srv/loop.py index d3971d4a69..2f946e117f 100644 --- a/src/calibre/srv/loop.py +++ b/src/calibre/srv/loop.py @@ -80,6 +80,9 @@ class SocketFile(object): # {{{ def fileno(self): return self._sock.fileno() + def gettimeout(self): + return self._sock.gettimeout() + def __enter__(self): return self diff --git a/src/calibre/srv/opts.py b/src/calibre/srv/opts.py index 4d4be3998d..c9463ab0d1 100644 --- a/src/calibre/srv/opts.py +++ b/src/calibre/srv/opts.py @@ -69,6 +69,12 @@ raw_options = ( 'no_delay turns on TCP_NODELAY which decreases latency at the cost of' ' worse overall performance when sending multiple small packets. It' ' prevents the TCP stack from aggregating multiple small TCP packets.', + + 'Use zero copy file transfers for increased performance', + 'use_sendfile', True, + 'This will use zero-copy in-kernel transfers when sending files over the network,' + ' increasing performance. However, it can cause corrupted file transfers on some' + ' broken filesystems. If you experience corrupted file transfers, turn it off.', ) assert len(raw_options) % 4 == 0 diff --git a/src/calibre/srv/respond.py b/src/calibre/srv/respond.py index d48ea94e9a..a2bbf355dc 100644 --- a/src/calibre/srv/respond.py +++ b/src/calibre/srv/respond.py @@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import, __license__ = 'GPL v3' __copyright__ = '2015, Kovid Goyal ' -import os, hashlib, shutil, httplib, zlib, struct, time, uuid +import os, hashlib, httplib, zlib, struct, time, uuid from io import DEFAULT_BUFFER_SIZE, BytesIO from collections import namedtuple from functools import partial @@ -15,6 +15,7 @@ from itertools import izip_longest from calibre import force_unicode, guess_type from calibre.srv.errors import IfNoneMatch, RangeNotSatisfiable +from calibre.srv.sendfile import file_metadata, copy_range, sendfile_to_socket Range = namedtuple('Range', 'start stop size') MULTIPART_SEPARATOR = uuid.uuid4().hex.decode('ascii') @@ -163,18 +164,22 @@ def parse_multipart_byterange(buf, content_type): ans.append(data) return ans -class FileSystemOutputFile(object): +class ReadableOutput(object): - def __init__(self, output, outheaders, size): + def __init__(self, output, outheaders): self.src_file = output - self.name = output.name - self.content_length = size - self.etag = '"%s"' % hashlib.sha1(type('')(os.fstat(output.fileno()).st_mtime) + force_unicode(output.name or '')).hexdigest() + self.src_file.seek(0, os.SEEK_END) + self.content_length = self.src_file.tell() + self.etag = None self.accept_ranges = True + self.use_sendfile = False def write(self, dest): - self.src_file.seek(0) - shutil.copyfileobj(self.src_file, dest) + if self.use_sendfile: + dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile() + sendfile_to_socket(self.src_file, 0, self.content_length, dest) + else: + copy_range(self.src_file, 0, self.content_length, dest) self.src_file = None def write_compressed(self, dest): @@ -196,12 +201,21 @@ class FileSystemOutputFile(object): self.src_file = None def copy_range(self, start, size, dest): - self.src_file.seek(start) - while size > 0: - data = self.src_file.read(min(size, DEFAULT_BUFFER_SIZE)) - dest.write(data) - size -= len(data) - del data + func = sendfile_to_socket if self.use_sendfile else copy_range + if self.use_sendfile: + dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile() + func(self.src_file, start, size, dest) + +class FileSystemOutputFile(ReadableOutput): + + def __init__(self, output, outheaders, stat_result, use_sendfile): + self.src_file = output + self.name = output.name + self.content_length = stat_result.st_size + self.etag = '"%s"' % hashlib.sha1(type('')(stat_result.st_mtime) + force_unicode(output.name or '')).hexdigest() + self.accept_ranges = True + self.use_sendfile = use_sendfile and sendfile_to_socket is not None + class DynamicOutput(object): @@ -262,16 +276,12 @@ def generate_static_output(cache, gso_lock, name, generator): def parse_if_none_match(val): return {x.strip() for x in val.split(',')} -def finalize_output(output, inheaders, outheaders, status_code, is_http1, method, compress_min_size): +def finalize_output(output, inheaders, outheaders, status_code, is_http1, method, opts): ct = outheaders.get('Content-Type', '') compressible = not ct or ct.startswith('text/') or ct.startswith('image/svg') or ct.startswith('application/json') - try: - fd = output.fileno() - fsize = os.fstat(fd).st_size - except Exception: - fd = fsize = None - if fsize is not None: - output = FileSystemOutputFile(output, outheaders, fsize) + stat_result = file_metadata(output) + if stat_result is not None: + output = FileSystemOutputFile(output, outheaders, stat_result, opts.use_sendfile) if 'Content-Type' not in outheaders: mt = guess_type(output.name)[0] if mt: @@ -280,12 +290,14 @@ def finalize_output(output, inheaders, outheaders, status_code, is_http1, method outheaders['Content-Type'] = mt elif isinstance(output, (bytes, type(''))): output = DynamicOutput(output, outheaders) + elif hasattr(output, 'read'): + output = ReadableOutput(output, outheaders) elif isinstance(output, StaticGeneratedOutput): pass else: output = GeneratedOutput(output, outheaders) compressible = (status_code == httplib.OK and compressible and - (compress_min_size > -1 and output.content_length >= compress_min_size) and + (opts.compress_min_size > -1 and output.content_length >= opts.compress_min_size) and acceptable_encoding(inheaders.get('Accept-Encoding', '')) and not is_http1) accept_ranges = (not compressible and output.accept_ranges is not None and status_code == httplib.OK and not is_http1) diff --git a/src/calibre/srv/sendfile.py b/src/calibre/srv/sendfile.py new file mode 100644 index 0000000000..ba86d6cadc --- /dev/null +++ b/src/calibre/srv/sendfile.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python2 +# vim:fileencoding=utf-8 +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2015, Kovid Goyal ' + +import os, ctypes, errno, socket +from io import DEFAULT_BUFFER_SIZE +from select import select + +from calibre.constants import iswindows, isosx + +def file_metadata(fileobj): + try: + fd = fileobj.fileno() + return os.fstat(fd) + except Exception: + pass + +def copy_range(src_file, start, size, dest): + src_file.seek(start) + while size > 0: + data = src_file.read(min(size, DEFAULT_BUFFER_SIZE)) + dest.write(data) + size -= len(data) + del data + + +if iswindows: + sendfile_to_socket = None +elif isosx: + sendfile_to_socket = None +else: + libc = ctypes.CDLL(None, use_errno=True) + sendfile = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int64), ctypes.c_size_t, use_errno=True)(('sendfile64', libc)) + del libc + + def sendfile_to_socket(fileobj, offset, size, socket_file): + off = ctypes.c_int64(offset) + timeout = socket_file.gettimeout() + if timeout == 0: + return copy_range(fileobj, off.value, size, socket_file) + total_sent = 0 + while size > 0: + r, w, x = select([], [socket_file], [], timeout) + if not w: + raise socket.timeout('timed out in sendfile() waiting for socket to become writeable') + sent = sendfile(socket_file.fileno(), fileobj.fileno(), ctypes.byref(off), size) + if sent < 0: + err = ctypes.get_errno() + if err in (errno.ENOSYS, errno.EINVAL): + return copy_range(fileobj, off.value, size, socket_file) + if err != errno.EAGAIN: + raise IOError((err, os.strerror(err))) + elif sent == 0: + break # EOF + else: + size -= sent + total_sent += sent diff --git a/src/calibre/srv/tests/http.py b/src/calibre/srv/tests/http.py index c96672e973..da0028adee 100644 --- a/src/calibre/srv/tests/http.py +++ b/src/calibre/srv/tests/http.py @@ -194,7 +194,8 @@ class TestHTTP(BaseTest): from calibre.srv.respond import parse_multipart_byterange def handler(conn): return conn.generate_static_output('test', lambda : ''.join(conn.path)) - with TestServer(handler, timeout=0.1, compress_min_size=0) as server, NamedTemporaryFile(suffix='test.epub') as f: + with TestServer(handler, timeout=0.1, compress_min_size=0) as server, \ + NamedTemporaryFile(suffix='test.epub') as f, open(P('localization/locales.zip'), 'rb') as lf: fdata = string.ascii_letters * 100 f.write(fdata), f.seek(0) @@ -258,4 +259,12 @@ class TestHTTP(BaseTest): buf = BytesIO(data) self.ae(parse_multipart_byterange(buf, r.getheader('Content-Type')), [(0, fdata[:26]), (26, fdata[26:51])]) + # Test sending of larger file + lf.seek(0) + data = lf.read() + server.change_handler(lambda conn: lf) + conn = server.connect() + conn.request('GET', '/test') + r = conn.getresponse() + self.ae(data, r.read()) # }}} diff --git a/src/calibre/srv/utils.py b/src/calibre/srv/utils.py index 59070521fe..4df8e0ac48 100644 --- a/src/calibre/srv/utils.py +++ b/src/calibre/srv/utils.py @@ -112,13 +112,21 @@ class Corked(object): ' Context manager to turn on TCP corking. Ensures maximum throughput for large logical packets. ' def __init__(self, sock): - self.sock = sock if hasattr(socket, 'TCP_CORK') else None + self.sock = sock def __enter__(self): - if self.sock is not None: + nodelay = self.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) + if nodelay == 1: + self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0) + self.set_nodelay = True + else: + self.set_nodelay = False + if hasattr(socket, 'TCP_CORK'): self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1) def __exit__(self, *args): - if self.sock is not None: + if self.set_nodelay: + self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + if hasattr(socket, 'TCP_CORK'): self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0) self.sock.send(b'') # Ensure that uncorking occurs