mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Use sendfile() for added performance on linux
This commit is contained in:
parent
439ddab5e5
commit
62ace2707f
@ -557,7 +557,7 @@ class HTTPPair(object):
|
|||||||
try:
|
try:
|
||||||
self.status_code, output = finalize_output(
|
self.status_code, output = finalize_output(
|
||||||
output, self.inheaders, self.outheaders, self.status_code,
|
output, self.inheaders, self.outheaders, self.status_code,
|
||||||
self.response_protocol is HTTP1, self.method, self.server_loop.opts.compress_min_size)
|
self.response_protocol is HTTP1, self.method, self.server_loop.opts)
|
||||||
except IfNoneMatch as e:
|
except IfNoneMatch as e:
|
||||||
if self.method in ('GET', 'HEAD'):
|
if self.method in ('GET', 'HEAD'):
|
||||||
self.send_not_modified(e.etag)
|
self.send_not_modified(e.etag)
|
||||||
|
@ -80,6 +80,9 @@ class SocketFile(object): # {{{
|
|||||||
def fileno(self):
|
def fileno(self):
|
||||||
return self._sock.fileno()
|
return self._sock.fileno()
|
||||||
|
|
||||||
|
def gettimeout(self):
|
||||||
|
return self._sock.gettimeout()
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
@ -69,6 +69,12 @@ raw_options = (
|
|||||||
'no_delay turns on TCP_NODELAY which decreases latency at the cost of'
|
'no_delay turns on TCP_NODELAY which decreases latency at the cost of'
|
||||||
' worse overall performance when sending multiple small packets. It'
|
' worse overall performance when sending multiple small packets. It'
|
||||||
' prevents the TCP stack from aggregating multiple small TCP packets.',
|
' prevents the TCP stack from aggregating multiple small TCP packets.',
|
||||||
|
|
||||||
|
'Use zero copy file transfers for increased performance',
|
||||||
|
'use_sendfile', True,
|
||||||
|
'This will use zero-copy in-kernel transfers when sending files over the network,'
|
||||||
|
' increasing performance. However, it can cause corrupted file transfers on some'
|
||||||
|
' broken filesystems. If you experience corrupted file transfers, turn it off.',
|
||||||
)
|
)
|
||||||
assert len(raw_options) % 4 == 0
|
assert len(raw_options) % 4 == 0
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import,
|
|||||||
__license__ = 'GPL v3'
|
__license__ = 'GPL v3'
|
||||||
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
|
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||||
|
|
||||||
import os, hashlib, shutil, httplib, zlib, struct, time, uuid
|
import os, hashlib, httplib, zlib, struct, time, uuid
|
||||||
from io import DEFAULT_BUFFER_SIZE, BytesIO
|
from io import DEFAULT_BUFFER_SIZE, BytesIO
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from functools import partial
|
from functools import partial
|
||||||
@ -15,6 +15,7 @@ from itertools import izip_longest
|
|||||||
|
|
||||||
from calibre import force_unicode, guess_type
|
from calibre import force_unicode, guess_type
|
||||||
from calibre.srv.errors import IfNoneMatch, RangeNotSatisfiable
|
from calibre.srv.errors import IfNoneMatch, RangeNotSatisfiable
|
||||||
|
from calibre.srv.sendfile import file_metadata, copy_range, sendfile_to_socket
|
||||||
|
|
||||||
Range = namedtuple('Range', 'start stop size')
|
Range = namedtuple('Range', 'start stop size')
|
||||||
MULTIPART_SEPARATOR = uuid.uuid4().hex.decode('ascii')
|
MULTIPART_SEPARATOR = uuid.uuid4().hex.decode('ascii')
|
||||||
@ -163,18 +164,22 @@ def parse_multipart_byterange(buf, content_type):
|
|||||||
ans.append(data)
|
ans.append(data)
|
||||||
return ans
|
return ans
|
||||||
|
|
||||||
class FileSystemOutputFile(object):
|
class ReadableOutput(object):
|
||||||
|
|
||||||
def __init__(self, output, outheaders, size):
|
def __init__(self, output, outheaders):
|
||||||
self.src_file = output
|
self.src_file = output
|
||||||
self.name = output.name
|
self.src_file.seek(0, os.SEEK_END)
|
||||||
self.content_length = size
|
self.content_length = self.src_file.tell()
|
||||||
self.etag = '"%s"' % hashlib.sha1(type('')(os.fstat(output.fileno()).st_mtime) + force_unicode(output.name or '')).hexdigest()
|
self.etag = None
|
||||||
self.accept_ranges = True
|
self.accept_ranges = True
|
||||||
|
self.use_sendfile = False
|
||||||
|
|
||||||
def write(self, dest):
|
def write(self, dest):
|
||||||
self.src_file.seek(0)
|
if self.use_sendfile:
|
||||||
shutil.copyfileobj(self.src_file, dest)
|
dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile()
|
||||||
|
sendfile_to_socket(self.src_file, 0, self.content_length, dest)
|
||||||
|
else:
|
||||||
|
copy_range(self.src_file, 0, self.content_length, dest)
|
||||||
self.src_file = None
|
self.src_file = None
|
||||||
|
|
||||||
def write_compressed(self, dest):
|
def write_compressed(self, dest):
|
||||||
@ -196,12 +201,21 @@ class FileSystemOutputFile(object):
|
|||||||
self.src_file = None
|
self.src_file = None
|
||||||
|
|
||||||
def copy_range(self, start, size, dest):
|
def copy_range(self, start, size, dest):
|
||||||
self.src_file.seek(start)
|
func = sendfile_to_socket if self.use_sendfile else copy_range
|
||||||
while size > 0:
|
if self.use_sendfile:
|
||||||
data = self.src_file.read(min(size, DEFAULT_BUFFER_SIZE))
|
dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile()
|
||||||
dest.write(data)
|
func(self.src_file, start, size, dest)
|
||||||
size -= len(data)
|
|
||||||
del data
|
class FileSystemOutputFile(ReadableOutput):
|
||||||
|
|
||||||
|
def __init__(self, output, outheaders, stat_result, use_sendfile):
|
||||||
|
self.src_file = output
|
||||||
|
self.name = output.name
|
||||||
|
self.content_length = stat_result.st_size
|
||||||
|
self.etag = '"%s"' % hashlib.sha1(type('')(stat_result.st_mtime) + force_unicode(output.name or '')).hexdigest()
|
||||||
|
self.accept_ranges = True
|
||||||
|
self.use_sendfile = use_sendfile and sendfile_to_socket is not None
|
||||||
|
|
||||||
|
|
||||||
class DynamicOutput(object):
|
class DynamicOutput(object):
|
||||||
|
|
||||||
@ -262,16 +276,12 @@ def generate_static_output(cache, gso_lock, name, generator):
|
|||||||
def parse_if_none_match(val):
|
def parse_if_none_match(val):
|
||||||
return {x.strip() for x in val.split(',')}
|
return {x.strip() for x in val.split(',')}
|
||||||
|
|
||||||
def finalize_output(output, inheaders, outheaders, status_code, is_http1, method, compress_min_size):
|
def finalize_output(output, inheaders, outheaders, status_code, is_http1, method, opts):
|
||||||
ct = outheaders.get('Content-Type', '')
|
ct = outheaders.get('Content-Type', '')
|
||||||
compressible = not ct or ct.startswith('text/') or ct.startswith('image/svg') or ct.startswith('application/json')
|
compressible = not ct or ct.startswith('text/') or ct.startswith('image/svg') or ct.startswith('application/json')
|
||||||
try:
|
stat_result = file_metadata(output)
|
||||||
fd = output.fileno()
|
if stat_result is not None:
|
||||||
fsize = os.fstat(fd).st_size
|
output = FileSystemOutputFile(output, outheaders, stat_result, opts.use_sendfile)
|
||||||
except Exception:
|
|
||||||
fd = fsize = None
|
|
||||||
if fsize is not None:
|
|
||||||
output = FileSystemOutputFile(output, outheaders, fsize)
|
|
||||||
if 'Content-Type' not in outheaders:
|
if 'Content-Type' not in outheaders:
|
||||||
mt = guess_type(output.name)[0]
|
mt = guess_type(output.name)[0]
|
||||||
if mt:
|
if mt:
|
||||||
@ -280,12 +290,14 @@ def finalize_output(output, inheaders, outheaders, status_code, is_http1, method
|
|||||||
outheaders['Content-Type'] = mt
|
outheaders['Content-Type'] = mt
|
||||||
elif isinstance(output, (bytes, type(''))):
|
elif isinstance(output, (bytes, type(''))):
|
||||||
output = DynamicOutput(output, outheaders)
|
output = DynamicOutput(output, outheaders)
|
||||||
|
elif hasattr(output, 'read'):
|
||||||
|
output = ReadableOutput(output, outheaders)
|
||||||
elif isinstance(output, StaticGeneratedOutput):
|
elif isinstance(output, StaticGeneratedOutput):
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
output = GeneratedOutput(output, outheaders)
|
output = GeneratedOutput(output, outheaders)
|
||||||
compressible = (status_code == httplib.OK and compressible and
|
compressible = (status_code == httplib.OK and compressible and
|
||||||
(compress_min_size > -1 and output.content_length >= compress_min_size) and
|
(opts.compress_min_size > -1 and output.content_length >= opts.compress_min_size) and
|
||||||
acceptable_encoding(inheaders.get('Accept-Encoding', '')) and not is_http1)
|
acceptable_encoding(inheaders.get('Accept-Encoding', '')) and not is_http1)
|
||||||
accept_ranges = (not compressible and output.accept_ranges is not None and status_code == httplib.OK and
|
accept_ranges = (not compressible and output.accept_ranges is not None and status_code == httplib.OK and
|
||||||
not is_http1)
|
not is_http1)
|
||||||
|
61
src/calibre/srv/sendfile.py
Normal file
61
src/calibre/srv/sendfile.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
#!/usr/bin/env python2
|
||||||
|
# vim:fileencoding=utf-8
|
||||||
|
from __future__ import (unicode_literals, division, absolute_import,
|
||||||
|
print_function)
|
||||||
|
|
||||||
|
__license__ = 'GPL v3'
|
||||||
|
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||||
|
|
||||||
|
import os, ctypes, errno, socket
|
||||||
|
from io import DEFAULT_BUFFER_SIZE
|
||||||
|
from select import select
|
||||||
|
|
||||||
|
from calibre.constants import iswindows, isosx
|
||||||
|
|
||||||
|
def file_metadata(fileobj):
|
||||||
|
try:
|
||||||
|
fd = fileobj.fileno()
|
||||||
|
return os.fstat(fd)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def copy_range(src_file, start, size, dest):
|
||||||
|
src_file.seek(start)
|
||||||
|
while size > 0:
|
||||||
|
data = src_file.read(min(size, DEFAULT_BUFFER_SIZE))
|
||||||
|
dest.write(data)
|
||||||
|
size -= len(data)
|
||||||
|
del data
|
||||||
|
|
||||||
|
|
||||||
|
if iswindows:
|
||||||
|
sendfile_to_socket = None
|
||||||
|
elif isosx:
|
||||||
|
sendfile_to_socket = None
|
||||||
|
else:
|
||||||
|
libc = ctypes.CDLL(None, use_errno=True)
|
||||||
|
sendfile = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int64), ctypes.c_size_t, use_errno=True)(('sendfile64', libc))
|
||||||
|
del libc
|
||||||
|
|
||||||
|
def sendfile_to_socket(fileobj, offset, size, socket_file):
|
||||||
|
off = ctypes.c_int64(offset)
|
||||||
|
timeout = socket_file.gettimeout()
|
||||||
|
if timeout == 0:
|
||||||
|
return copy_range(fileobj, off.value, size, socket_file)
|
||||||
|
total_sent = 0
|
||||||
|
while size > 0:
|
||||||
|
r, w, x = select([], [socket_file], [], timeout)
|
||||||
|
if not w:
|
||||||
|
raise socket.timeout('timed out in sendfile() waiting for socket to become writeable')
|
||||||
|
sent = sendfile(socket_file.fileno(), fileobj.fileno(), ctypes.byref(off), size)
|
||||||
|
if sent < 0:
|
||||||
|
err = ctypes.get_errno()
|
||||||
|
if err in (errno.ENOSYS, errno.EINVAL):
|
||||||
|
return copy_range(fileobj, off.value, size, socket_file)
|
||||||
|
if err != errno.EAGAIN:
|
||||||
|
raise IOError((err, os.strerror(err)))
|
||||||
|
elif sent == 0:
|
||||||
|
break # EOF
|
||||||
|
else:
|
||||||
|
size -= sent
|
||||||
|
total_sent += sent
|
@ -194,7 +194,8 @@ class TestHTTP(BaseTest):
|
|||||||
from calibre.srv.respond import parse_multipart_byterange
|
from calibre.srv.respond import parse_multipart_byterange
|
||||||
def handler(conn):
|
def handler(conn):
|
||||||
return conn.generate_static_output('test', lambda : ''.join(conn.path))
|
return conn.generate_static_output('test', lambda : ''.join(conn.path))
|
||||||
with TestServer(handler, timeout=0.1, compress_min_size=0) as server, NamedTemporaryFile(suffix='test.epub') as f:
|
with TestServer(handler, timeout=0.1, compress_min_size=0) as server, \
|
||||||
|
NamedTemporaryFile(suffix='test.epub') as f, open(P('localization/locales.zip'), 'rb') as lf:
|
||||||
fdata = string.ascii_letters * 100
|
fdata = string.ascii_letters * 100
|
||||||
f.write(fdata), f.seek(0)
|
f.write(fdata), f.seek(0)
|
||||||
|
|
||||||
@ -258,4 +259,12 @@ class TestHTTP(BaseTest):
|
|||||||
buf = BytesIO(data)
|
buf = BytesIO(data)
|
||||||
self.ae(parse_multipart_byterange(buf, r.getheader('Content-Type')), [(0, fdata[:26]), (26, fdata[26:51])])
|
self.ae(parse_multipart_byterange(buf, r.getheader('Content-Type')), [(0, fdata[:26]), (26, fdata[26:51])])
|
||||||
|
|
||||||
|
# Test sending of larger file
|
||||||
|
lf.seek(0)
|
||||||
|
data = lf.read()
|
||||||
|
server.change_handler(lambda conn: lf)
|
||||||
|
conn = server.connect()
|
||||||
|
conn.request('GET', '/test')
|
||||||
|
r = conn.getresponse()
|
||||||
|
self.ae(data, r.read())
|
||||||
# }}}
|
# }}}
|
||||||
|
@ -112,13 +112,21 @@ class Corked(object):
|
|||||||
' Context manager to turn on TCP corking. Ensures maximum throughput for large logical packets. '
|
' Context manager to turn on TCP corking. Ensures maximum throughput for large logical packets. '
|
||||||
|
|
||||||
def __init__(self, sock):
|
def __init__(self, sock):
|
||||||
self.sock = sock if hasattr(socket, 'TCP_CORK') else None
|
self.sock = sock
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
if self.sock is not None:
|
nodelay = self.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
|
||||||
|
if nodelay == 1:
|
||||||
|
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)
|
||||||
|
self.set_nodelay = True
|
||||||
|
else:
|
||||||
|
self.set_nodelay = False
|
||||||
|
if hasattr(socket, 'TCP_CORK'):
|
||||||
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
|
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
|
||||||
|
|
||||||
def __exit__(self, *args):
|
def __exit__(self, *args):
|
||||||
if self.sock is not None:
|
if self.set_nodelay:
|
||||||
|
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||||
|
if hasattr(socket, 'TCP_CORK'):
|
||||||
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)
|
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)
|
||||||
self.sock.send(b'') # Ensure that uncorking occurs
|
self.sock.send(b'') # Ensure that uncorking occurs
|
||||||
|
Loading…
x
Reference in New Issue
Block a user