Parsing of range header

This commit is contained in:
Kovid Goyal 2015-05-21 17:57:40 +05:30
parent ae9021d279
commit c03ab24377
2 changed files with 65 additions and 2 deletions

View File

@ -12,6 +12,49 @@ from io import DEFAULT_BUFFER_SIZE, BytesIO
from calibre import force_unicode from calibre import force_unicode
from calibre.srv.errors import IfNoneMatch from calibre.srv.errors import IfNoneMatch
def get_ranges(headervalue, content_length):
"""Return a list of (start, num_of_bytes) indices from a Range header, or None.
If this function returns an empty list, it indicates no valid range was found.
"""
if not headervalue:
return None
result = []
try:
bytesunit, byteranges = headervalue.split("=", 1)
except Exception:
return None
if bytesunit.strip() != 'bytes':
return None
for brange in byteranges.split(","):
start, stop = [x.strip() for x in brange.split("-", 1)]
if start:
if not stop:
stop = content_length - 1
try:
start, stop = int(start), int(stop)
except Exception:
continue
if start >= content_length:
continue
if stop < start:
continue
result.append((start, stop - start + 1))
elif stop:
# Negative subscript (last N bytes)
try:
stop = int(stop)
except Exception:
continue
if stop > content_length:
result.append((0, content_length))
else:
result.append((content_length - stop, stop))
return result
def acceptable_encoding(val, allowed=frozenset({'gzip'})): def acceptable_encoding(val, allowed=frozenset({'gzip'})):
def enc(x): def enc(x):
e, r = x.partition(';')[::2] e, r = x.partition(';')[::2]

View File

@ -63,7 +63,28 @@ class TestHTTP(BaseTest):
test('Case insensitive', 'GZIp', 'gzip') test('Case insensitive', 'GZIp', 'gzip')
test('Multiple', 'gzip, identity', 'gzip') test('Multiple', 'gzip, identity', 'gzip')
test('Priority', '1;q=0.5, 2;q=0.75, 3;q=1.0', '3', {'1', '2', '3'}) test('Priority', '1;q=0.5, 2;q=0.75, 3;q=1.0', '3', {'1', '2', '3'})
# }}} # }}}
def test_range_parsing(self): # {{{
'Test parsing of Range header'
from calibre.srv.respond import get_ranges
def test(val, *args):
pval = get_ranges(val, 100)
if len(args) == 1 and args[0] is None:
self.assertIsNone(pval, val)
else:
self.assertListEqual(pval, list(args), val)
test('crap', None)
test('crap=', None)
test('crap=1', None)
test('crap=1-2', None)
test('bytes=a-2')
test('bytes=0-99', (0, 100))
test('bytes=0-0,-1', (0, 1), (99, 1))
test('bytes=-5', (95, 5))
test('bytes=95-', (95, 5))
test('bytes=-200', (0, 100))
# }}}
def test_http_basic(self): # {{{ def test_http_basic(self): # {{{
'Test basic HTTP protocol conformance' 'Test basic HTTP protocol conformance'
@ -194,4 +215,3 @@ class TestHTTP(BaseTest):
self.ae(r.status, httplib.OK), self.ae(zlib.decompress(r.read(), 16+zlib.MAX_WBITS), b'an_etagged_path') self.ae(r.status, httplib.OK), self.ae(zlib.decompress(r.read(), 16+zlib.MAX_WBITS), b'an_etagged_path')
# }}} # }}}