diff --git a/src/calibre/srv/respond.py b/src/calibre/srv/respond.py index d2e4e23a52..ed50e560b1 100644 --- a/src/calibre/srv/respond.py +++ b/src/calibre/srv/respond.py @@ -12,6 +12,49 @@ from io import DEFAULT_BUFFER_SIZE, BytesIO from calibre import force_unicode from calibre.srv.errors import IfNoneMatch +def get_ranges(headervalue, content_length): + """Return a list of (start, num_of_bytes) indices from a Range header, or None. + If this function returns an empty list, it indicates no valid range was found. + """ + if not headervalue: + return None + + result = [] + try: + bytesunit, byteranges = headervalue.split("=", 1) + except Exception: + return None + if bytesunit.strip() != 'bytes': + return None + + for brange in byteranges.split(","): + start, stop = [x.strip() for x in brange.split("-", 1)] + if start: + if not stop: + stop = content_length - 1 + try: + start, stop = int(start), int(stop) + except Exception: + continue + if start >= content_length: + continue + if stop < start: + continue + result.append((start, stop - start + 1)) + elif stop: + # Negative subscript (last N bytes) + try: + stop = int(stop) + except Exception: + continue + if stop > content_length: + result.append((0, content_length)) + else: + result.append((content_length - stop, stop)) + + return result + + def acceptable_encoding(val, allowed=frozenset({'gzip'})): def enc(x): e, r = x.partition(';')[::2] diff --git a/src/calibre/srv/tests/http.py b/src/calibre/srv/tests/http.py index 5516c25703..eced708e95 100644 --- a/src/calibre/srv/tests/http.py +++ b/src/calibre/srv/tests/http.py @@ -63,7 +63,28 @@ class TestHTTP(BaseTest): test('Case insensitive', 'GZIp', 'gzip') test('Multiple', 'gzip, identity', 'gzip') test('Priority', '1;q=0.5, 2;q=0.75, 3;q=1.0', '3', {'1', '2', '3'}) - # }}} + # }}} + + def test_range_parsing(self): # {{{ + 'Test parsing of Range header' + from calibre.srv.respond import get_ranges + def test(val, *args): + pval = get_ranges(val, 100) + if len(args) == 1 and args[0] is None: + self.assertIsNone(pval, val) + else: + self.assertListEqual(pval, list(args), val) + test('crap', None) + test('crap=', None) + test('crap=1', None) + test('crap=1-2', None) + test('bytes=a-2') + test('bytes=0-99', (0, 100)) + test('bytes=0-0,-1', (0, 1), (99, 1)) + test('bytes=-5', (95, 5)) + test('bytes=95-', (95, 5)) + test('bytes=-200', (0, 100)) + # }}} def test_http_basic(self): # {{{ 'Test basic HTTP protocol conformance' @@ -194,4 +215,3 @@ class TestHTTP(BaseTest): self.ae(r.status, httplib.OK), self.ae(zlib.decompress(r.read(), 16+zlib.MAX_WBITS), b'an_etagged_path') # }}} -