From 9f209ae695a62621ab6610363d3f878d64c582ed Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Thu, 11 Jun 2015 13:54:55 +0530 Subject: [PATCH] More tests for digest auth --- src/calibre/srv/auth.py | 4 +- src/calibre/srv/tests/auth.py | 87 +++++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/src/calibre/srv/auth.py b/src/calibre/srv/auth.py index 3bba616b1c..cd93abb00c 100644 --- a/src/calibre/srv/auth.py +++ b/src/calibre/srv/auth.py @@ -71,7 +71,7 @@ class DigestAuth(object): # {{{ timestamp, server secret and realm. This allows the timestamp to be validated and stale nonce's to be rejected.''' if timestamp is None: - timestamp = binascii.hexlify(struct.pack(b'!f', float(monotonic()))) + timestamp = binascii.hexlify(struct.pack(b'!d', float(monotonic()))) h = sha1_hex(':'.join((timestamp, realm, secret))) nonce = ':'.join((timestamp, h)) return nonce @@ -83,7 +83,7 @@ class DigestAuth(object): # {{{ def is_nonce_stale(self, max_age_seconds=MAX_AGE_SECONDS): try: - timestamp = struct.unpack(b'!f', binascii.unhexlify(as_bytestring(self.nonce.partition(':')[0])))[0] + timestamp = struct.unpack(b'!d', binascii.unhexlify(as_bytestring(self.nonce.partition(':')[0])))[0] return timestamp + max_age_seconds < monotonic() except Exception: pass diff --git a/src/calibre/srv/tests/auth.py b/src/calibre/srv/tests/auth.py index 172efb2418..66c80fd2ef 100644 --- a/src/calibre/srv/tests/auth.py +++ b/src/calibre/srv/tests/auth.py @@ -6,7 +6,8 @@ from __future__ import (unicode_literals, division, absolute_import, __license__ = 'GPL v3' __copyright__ = '2015, Kovid Goyal ' -import httplib, base64, urllib2 +import httplib, base64, urllib2, subprocess, os +from distutils.spawn import find_executable from calibre.srv.tests.base import BaseTest, TestServer from calibre.srv.routes import endpoint, Router @@ -37,9 +38,27 @@ def urlopen(server, path='/closed', un='testuser', pw='testpw', method='digest') auth_handler.add_password(realm=REALM, uri=url, user=un, passwd=pw) return urllib2.build_opener(auth_handler).open(url) +def digest(un, pw, nonce=None, uri=None, method='GET', nc=1, qop='auth', realm=REALM, cnonce=None, algorithm='MD5', body=b'', modify=lambda x:None): + 'Create the payload for a digest based Authorization header' + from calibre.srv.auth import DigestAuth + templ = ('username="{un}", realm="{realm}", qop={qop}, method="{method}",' + ' nonce="{nonce}", uri="{uri}", nc={nc}, algorithm="{algorithm}", cnonce="{cnonce}", response="{response}"') + h = templ.format(un=un, realm=realm, qop=qop, uri=uri, method=method, nonce=nonce, nc=nc, cnonce=cnonce, algorithm=algorithm, response=None) + da = DigestAuth(h) + modify(da) + pw = getattr(da, 'pw', pw) + class Data(object): + def __init__(self): + self.method = method + def peek(): + return body + response = da.request_digest(pw, Data()) + return ('Digest ' + templ.format( + un=un, realm=realm, qop=qop, uri=uri, method=method, nonce=nonce, nc=nc, cnonce=cnonce, algorithm=algorithm, response=response)).encode('ascii') + class TestAuth(BaseTest): - def test_basic_auth(self): + def test_basic_auth(self): # {{{ 'Test HTTP Basic auth' r = router(prefer_basic_auth=True) with TestServer(r.dispatch) as server: @@ -74,8 +93,9 @@ class TestAuth(BaseTest): self.ae(1, len(warnings)) self.ae((httplib.UNAUTHORIZED, b''), request('testuser', 'y')) self.ae((httplib.UNAUTHORIZED, b''), request('asf', 'testpw')) + # }}} - def test_digest_auth(self): + def test_digest_auth(self): # {{{ 'Test HTTP Digest auth' from calibre.srv.http_request import normalize_header_name from calibre.srv.utils import parse_http_dict @@ -90,7 +110,64 @@ class TestAuth(BaseTest): return {normalize_header_name(k):v for k, v in r.getheaders()} conn = server.connect() test(conn, '/open', body=b'open') - auth = parse_http_dict(test(conn, '/closed', status=httplib.UNAUTHORIZED)['WWW-Authenticate']) - self.ae(auth[b'Digest realm'], bytes(REALM)), self.ae(auth[b'algorithm'], b'MD5'), self.ae(auth[b'qop'], b'auth') + auth = parse_http_dict(test(conn, '/closed', status=httplib.UNAUTHORIZED)['WWW-Authenticate'].partition(b' ')[2]) + nonce = auth['nonce'] + auth = parse_http_dict(test(conn, '/closed', status=httplib.UNAUTHORIZED)['WWW-Authenticate'].partition(b' ')[2]) + self.assertNotEqual(nonce, auth['nonce'], 'nonce was re-used') + self.ae(auth[b'realm'], bytes(REALM)), self.ae(auth[b'algorithm'], b'MD5'), self.ae(auth[b'qop'], b'auth') self.assertNotIn('stale', auth) + args = auth.copy() + args['un'], args['pw'], args['uri'] = 'testuser', 'testpw', '/closed' + + def ok_test(conn, dh, **args): + args['body'] = args.get('body', b'closed') + return test(conn, '/closed', headers={'Authorization':dh}, **args) + + ok_test(conn, digest(**args)) + # Check that server ignores repeated nc values + ok_test(conn, digest(**args)) + + # Check stale nonces + orig, r.auth_controller.max_age_seconds = r.auth_controller.max_age_seconds, -1 + auth = parse_http_dict(test(conn, '/closed', headers={ + 'Authorization':digest(**args)},status=httplib.UNAUTHORIZED)['WWW-Authenticate'].partition(b' ')[2]) + self.assertIn('stale', auth) + r.auth_controller.max_age_seconds = orig + ok_test(conn, digest(**args)) + + def fail_test(conn, modify, **kw): + kw['body'] = kw.get('body', b'') + kw['status'] = kw.get('status', httplib.UNAUTHORIZED) + args['modify'] = modify + return test(conn, '/closed', headers={'Authorization':digest(**args)}, **kw) + + # Check modified nonce fails + fail_test(conn, lambda da:setattr(da, 'nonce', 'xyz')) + fail_test(conn, lambda da:setattr(da, 'nonce', 'x' + da.nonce)) + + # Check mismatched uri fails + fail_test(conn, lambda da:setattr(da, 'uri', '/')) + fail_test(conn, lambda da:setattr(da, 'uri', '/closed2')) + fail_test(conn, lambda da:setattr(da, 'uri', '/closed/2')) + + # Check that incorrect user/password fails + fail_test(conn, lambda da:setattr(da, 'pw', '/')) + fail_test(conn, lambda da:setattr(da, 'username', '/')) + + # Check against python's stdlib self.ae(urlopen(server).read(), b'closed') + + # Check using curl + curl = find_executable('curl') + if curl: + def docurl(data, *args): + cmd = [curl] + list(args) + ['http://localhost:%d/closed' % server.address[1]] + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb')) + x = p.stdout.read() + p.wait() + self.ae(x, data) + docurl(b'') + docurl(b'', '--digest', '--user', 'xxxx:testpw') + docurl(b'', '--digest', '--user', 'testuser:xtestpw') + docurl(b'closed', '--digest', '--user', 'testuser:testpw') + # }}}