sendfile() for OS X. Also make sending of files a little more robust against simultaneous file modification

This commit is contained in:
Kovid Goyal 2015-05-22 17:23:27 +05:30
parent aa478698d2
commit 4036e4af7d
3 changed files with 101 additions and 52 deletions

View File

@ -177,9 +177,13 @@ class ReadableOutput(object):
def write(self, dest): def write(self, dest):
if self.use_sendfile: if self.use_sendfile:
dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile() dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile()
sendfile_to_socket(self.src_file, 0, self.content_length, dest) sent = sendfile_to_socket(self.src_file, 0, self.content_length, dest)
else: else:
copy_range(self.src_file, 0, self.content_length, dest) sent = copy_range(self.src_file, 0, self.content_length, dest)
if sent != self.content_length:
raise IOError(
'Failed to send complete file (%r) (%s != %s bytes), perhaps the file was modified during send?' % (
getattr(self.src_file, 'name', '<file>'), sent, self.content_length))
self.src_file = None self.src_file = None
def write_compressed(self, dest): def write_compressed(self, dest):
@ -201,10 +205,14 @@ class ReadableOutput(object):
self.src_file = None self.src_file = None
def copy_range(self, start, size, dest): def copy_range(self, start, size, dest):
func = sendfile_to_socket if self.use_sendfile else copy_range
if self.use_sendfile: if self.use_sendfile:
dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile() dest.flush() # Ensure everything in the SocketFile buffer is sent before calling sendfile()
func(self.src_file, start, size, dest) sent = sendfile_to_socket(self.src_file, start, size, dest)
else:
sent = copy_range(self.src_file, start, size, dest)
if sent != size:
raise IOError('Failed to send byte range from file (%r) (%s != %s bytes), perhaps the file was modified during send?' % (
getattr(self.src_file, 'name', '<file>'), sent, size))
class FileSystemOutputFile(ReadableOutput): class FileSystemOutputFile(ReadableOutput):

View File

@ -20,21 +20,57 @@ def file_metadata(fileobj):
pass pass
def copy_range(src_file, start, size, dest): def copy_range(src_file, start, size, dest):
total_sent = 0
src_file.seek(start) src_file.seek(start)
while size > 0: while size > 0:
data = src_file.read(min(size, DEFAULT_BUFFER_SIZE)) data = src_file.read(min(size, DEFAULT_BUFFER_SIZE))
if len(data) == 0:
break # EOF
dest.write(data) dest.write(data)
size -= len(data) size -= len(data)
total_sent += len(data)
del data del data
return total_sent
if iswindows: if iswindows:
sendfile_to_socket = None sendfile_to_socket = None
elif isosx: elif isosx:
sendfile_to_socket = None libc = ctypes.CDLL(None, use_errno=True)
sendfile = ctypes.CFUNCTYPE(
ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_int64, ctypes.POINTER(ctypes.c_int64), ctypes.c_void_p, ctypes.c_int, use_errno=True)(
('sendfile', libc))
del libc
def sendfile_to_socket(fileobj, offset, size, socket_file):
timeout = socket_file.gettimeout()
if timeout == 0:
return copy_range(fileobj, offset, size, socket_file)
num_bytes = ctypes.c_int64(size)
total_sent = 0
while size > 0:
num_bytes.value = size
r, w, x = select([], [socket_file], [], timeout)
if not w:
raise socket.timeout('timed out in sendfile() waiting for socket to become writeable')
ret = sendfile(fileobj.fileno(), socket_file.fileno(), offset, ctypes.byref(num_bytes), None, 0)
if ret != 0:
err = ctypes.get_errno()
if err in (errno.EBADF, errno.ENOTSUP, errno.ENOTSOCK, errno.EOPNOTSUPP):
return copy_range(fileobj, offset, size, socket_file)
if err not in (errno.EINTR, errno.EAGAIN):
raise IOError((err, os.strerror(err)))
if num_bytes.value == 0:
break # EOF
total_sent += num_bytes.value
size -= num_bytes.value
offset += num_bytes.value
return total_sent
else: else:
libc = ctypes.CDLL(None, use_errno=True) libc = ctypes.CDLL(None, use_errno=True)
sendfile = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int64), ctypes.c_size_t, use_errno=True)(('sendfile64', libc)) sendfile = ctypes.CFUNCTYPE(
ctypes.c_ssize_t, ctypes.c_int, ctypes.c_int, ctypes.POINTER(ctypes.c_int64), ctypes.c_size_t, use_errno=True)(('sendfile64', libc))
del libc del libc
def sendfile_to_socket(fileobj, offset, size, socket_file): def sendfile_to_socket(fileobj, offset, size, socket_file):
@ -52,10 +88,11 @@ else:
err = ctypes.get_errno() err = ctypes.get_errno()
if err in (errno.ENOSYS, errno.EINVAL): if err in (errno.ENOSYS, errno.EINVAL):
return copy_range(fileobj, off.value, size, socket_file) return copy_range(fileobj, off.value, size, socket_file)
if err != errno.EAGAIN: if err not in (errno.EINTR, errno.EAGAIN):
raise IOError((err, os.strerror(err))) raise IOError((err, os.strerror(err)))
elif sent == 0: elif sent == 0:
break # EOF break # EOF
else: else:
size -= sent size -= sent
total_sent += sent total_sent += sent
return total_sent

View File

@ -216,55 +216,59 @@ class TestHTTP(BaseTest):
r = conn.getresponse() r = conn.getresponse()
self.ae(r.status, httplib.OK), self.ae(zlib.decompress(r.read(), 16+zlib.MAX_WBITS), b'an_etagged_path') self.ae(r.status, httplib.OK), self.ae(zlib.decompress(r.read(), 16+zlib.MAX_WBITS), b'an_etagged_path')
# Test getting a filesystem file for i in '12':
server.change_handler(lambda conn: f) # Test getting a filesystem file
conn = server.connect() server.change_handler(lambda conn: f)
conn.request('GET', '/test') conn = server.connect()
r = conn.getresponse() conn.request('GET', '/test')
etag = type('')(r.getheader('ETag')) r = conn.getresponse()
self.assertTrue(etag) etag = type('')(r.getheader('ETag'))
self.ae(r.getheader('Content-Type'), guess_type(f.name)[0]) self.assertTrue(etag)
self.ae(type('')(r.getheader('Accept-Ranges')), 'bytes') self.ae(r.getheader('Content-Type'), guess_type(f.name)[0])
self.ae(int(r.getheader('Content-Length')), len(fdata)) self.ae(type('')(r.getheader('Accept-Ranges')), 'bytes')
self.ae(r.status, httplib.OK), self.ae(r.read(), fdata) self.ae(int(r.getheader('Content-Length')), len(fdata))
self.ae(r.status, httplib.OK), self.ae(r.read(), fdata)
conn.request('GET', '/test', headers={'Range':'bytes=0-25'}) conn.request('GET', '/test', headers={'Range':'bytes=0-25'})
r = conn.getresponse() r = conn.getresponse()
self.ae(type('')(r.getheader('Accept-Ranges')), 'bytes') self.ae(type('')(r.getheader('Accept-Ranges')), 'bytes')
self.ae(type('')(r.getheader('Content-Range')), 'bytes 0-25/%d' % len(fdata)) self.ae(type('')(r.getheader('Content-Range')), 'bytes 0-25/%d' % len(fdata))
self.ae(int(r.getheader('Content-Length')), 26) self.ae(int(r.getheader('Content-Length')), 26)
self.ae(r.status, httplib.PARTIAL_CONTENT), self.ae(r.read(), fdata[0:26]) self.ae(r.status, httplib.PARTIAL_CONTENT), self.ae(r.read(), fdata[0:26])
conn.request('GET', '/test', headers={'Range':'bytes=100000-'}) conn.request('GET', '/test', headers={'Range':'bytes=100000-'})
r = conn.getresponse() r = conn.getresponse()
self.ae(type('')(r.getheader('Content-Range')), 'bytes */%d' % len(fdata)) self.ae(type('')(r.getheader('Content-Range')), 'bytes */%d' % len(fdata))
self.ae(r.status, httplib.REQUESTED_RANGE_NOT_SATISFIABLE) self.ae(r.status, httplib.REQUESTED_RANGE_NOT_SATISFIABLE)
conn.request('GET', '/test', headers={'Range':'bytes=25-50', 'If-Range':etag}) conn.request('GET', '/test', headers={'Range':'bytes=25-50', 'If-Range':etag})
r = conn.getresponse() r = conn.getresponse()
self.ae(int(r.getheader('Content-Length')), 26) self.ae(int(r.getheader('Content-Length')), 26)
self.ae(r.status, httplib.PARTIAL_CONTENT), self.ae(r.read(), fdata[25:51]) self.ae(r.status, httplib.PARTIAL_CONTENT), self.ae(r.read(), fdata[25:51])
conn.request('GET', '/test', headers={'Range':'bytes=25-50', 'If-Range':'"nomatch"'}) conn.request('GET', '/test', headers={'Range':'bytes=25-50', 'If-Range':'"nomatch"'})
r = conn.getresponse() r = conn.getresponse()
self.assertFalse(r.getheader('Content-Range')) self.assertFalse(r.getheader('Content-Range'))
self.ae(int(r.getheader('Content-Length')), len(fdata)) self.ae(int(r.getheader('Content-Length')), len(fdata))
self.ae(r.status, httplib.OK), self.ae(r.read(), fdata) self.ae(r.status, httplib.OK), self.ae(r.read(), fdata)
conn.request('GET', '/test', headers={'Range':'bytes=0-25,26-50'}) conn.request('GET', '/test', headers={'Range':'bytes=0-25,26-50'})
r = conn.getresponse() r = conn.getresponse()
clen = int(r.getheader('Content-Length')) clen = int(r.getheader('Content-Length'))
data = r.read() data = r.read()
self.ae(clen, len(data)) self.ae(clen, len(data))
buf = BytesIO(data) buf = BytesIO(data)
self.ae(parse_multipart_byterange(buf, r.getheader('Content-Type')), [(0, fdata[:26]), (26, fdata[26:51])]) self.ae(parse_multipart_byterange(buf, r.getheader('Content-Type')), [(0, fdata[:26]), (26, fdata[26:51])])
# Test sending of larger file # Test sending of larger file
lf.seek(0) lf.seek(0)
data = lf.read() data = lf.read()
server.change_handler(lambda conn: lf) server.change_handler(lambda conn: lf)
conn = server.connect() conn = server.connect()
conn.request('GET', '/test') conn.request('GET', '/test')
r = conn.getresponse() r = conn.getresponse()
self.ae(data, r.read()) self.ae(data, r.read())
server.loop.opts.use_sendfile ^= True
conn = server.connect()
# }}} # }}}