mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-31 14:33:54 -04:00
More tests for digest auth
This commit is contained in:
parent
1fc2082c4a
commit
9f209ae695
@ -71,7 +71,7 @@ class DigestAuth(object): # {{{
|
||||
timestamp, server secret and realm. This allows the timestamp to be
|
||||
validated and stale nonce's to be rejected.'''
|
||||
if timestamp is None:
|
||||
timestamp = binascii.hexlify(struct.pack(b'!f', float(monotonic())))
|
||||
timestamp = binascii.hexlify(struct.pack(b'!d', float(monotonic())))
|
||||
h = sha1_hex(':'.join((timestamp, realm, secret)))
|
||||
nonce = ':'.join((timestamp, h))
|
||||
return nonce
|
||||
@ -83,7 +83,7 @@ class DigestAuth(object): # {{{
|
||||
|
||||
def is_nonce_stale(self, max_age_seconds=MAX_AGE_SECONDS):
|
||||
try:
|
||||
timestamp = struct.unpack(b'!f', binascii.unhexlify(as_bytestring(self.nonce.partition(':')[0])))[0]
|
||||
timestamp = struct.unpack(b'!d', binascii.unhexlify(as_bytestring(self.nonce.partition(':')[0])))[0]
|
||||
return timestamp + max_age_seconds < monotonic()
|
||||
except Exception:
|
||||
pass
|
||||
|
@ -6,7 +6,8 @@ from __future__ import (unicode_literals, division, absolute_import,
|
||||
__license__ = 'GPL v3'
|
||||
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||
|
||||
import httplib, base64, urllib2
|
||||
import httplib, base64, urllib2, subprocess, os
|
||||
from distutils.spawn import find_executable
|
||||
|
||||
from calibre.srv.tests.base import BaseTest, TestServer
|
||||
from calibre.srv.routes import endpoint, Router
|
||||
@ -37,9 +38,27 @@ def urlopen(server, path='/closed', un='testuser', pw='testpw', method='digest')
|
||||
auth_handler.add_password(realm=REALM, uri=url, user=un, passwd=pw)
|
||||
return urllib2.build_opener(auth_handler).open(url)
|
||||
|
||||
def digest(un, pw, nonce=None, uri=None, method='GET', nc=1, qop='auth', realm=REALM, cnonce=None, algorithm='MD5', body=b'', modify=lambda x:None):
|
||||
'Create the payload for a digest based Authorization header'
|
||||
from calibre.srv.auth import DigestAuth
|
||||
templ = ('username="{un}", realm="{realm}", qop={qop}, method="{method}",'
|
||||
' nonce="{nonce}", uri="{uri}", nc={nc}, algorithm="{algorithm}", cnonce="{cnonce}", response="{response}"')
|
||||
h = templ.format(un=un, realm=realm, qop=qop, uri=uri, method=method, nonce=nonce, nc=nc, cnonce=cnonce, algorithm=algorithm, response=None)
|
||||
da = DigestAuth(h)
|
||||
modify(da)
|
||||
pw = getattr(da, 'pw', pw)
|
||||
class Data(object):
|
||||
def __init__(self):
|
||||
self.method = method
|
||||
def peek():
|
||||
return body
|
||||
response = da.request_digest(pw, Data())
|
||||
return ('Digest ' + templ.format(
|
||||
un=un, realm=realm, qop=qop, uri=uri, method=method, nonce=nonce, nc=nc, cnonce=cnonce, algorithm=algorithm, response=response)).encode('ascii')
|
||||
|
||||
class TestAuth(BaseTest):
|
||||
|
||||
def test_basic_auth(self):
|
||||
def test_basic_auth(self): # {{{
|
||||
'Test HTTP Basic auth'
|
||||
r = router(prefer_basic_auth=True)
|
||||
with TestServer(r.dispatch) as server:
|
||||
@ -74,8 +93,9 @@ class TestAuth(BaseTest):
|
||||
self.ae(1, len(warnings))
|
||||
self.ae((httplib.UNAUTHORIZED, b''), request('testuser', 'y'))
|
||||
self.ae((httplib.UNAUTHORIZED, b''), request('asf', 'testpw'))
|
||||
# }}}
|
||||
|
||||
def test_digest_auth(self):
|
||||
def test_digest_auth(self): # {{{
|
||||
'Test HTTP Digest auth'
|
||||
from calibre.srv.http_request import normalize_header_name
|
||||
from calibre.srv.utils import parse_http_dict
|
||||
@ -90,7 +110,64 @@ class TestAuth(BaseTest):
|
||||
return {normalize_header_name(k):v for k, v in r.getheaders()}
|
||||
conn = server.connect()
|
||||
test(conn, '/open', body=b'open')
|
||||
auth = parse_http_dict(test(conn, '/closed', status=httplib.UNAUTHORIZED)['WWW-Authenticate'])
|
||||
self.ae(auth[b'Digest realm'], bytes(REALM)), self.ae(auth[b'algorithm'], b'MD5'), self.ae(auth[b'qop'], b'auth')
|
||||
auth = parse_http_dict(test(conn, '/closed', status=httplib.UNAUTHORIZED)['WWW-Authenticate'].partition(b' ')[2])
|
||||
nonce = auth['nonce']
|
||||
auth = parse_http_dict(test(conn, '/closed', status=httplib.UNAUTHORIZED)['WWW-Authenticate'].partition(b' ')[2])
|
||||
self.assertNotEqual(nonce, auth['nonce'], 'nonce was re-used')
|
||||
self.ae(auth[b'realm'], bytes(REALM)), self.ae(auth[b'algorithm'], b'MD5'), self.ae(auth[b'qop'], b'auth')
|
||||
self.assertNotIn('stale', auth)
|
||||
args = auth.copy()
|
||||
args['un'], args['pw'], args['uri'] = 'testuser', 'testpw', '/closed'
|
||||
|
||||
def ok_test(conn, dh, **args):
|
||||
args['body'] = args.get('body', b'closed')
|
||||
return test(conn, '/closed', headers={'Authorization':dh}, **args)
|
||||
|
||||
ok_test(conn, digest(**args))
|
||||
# Check that server ignores repeated nc values
|
||||
ok_test(conn, digest(**args))
|
||||
|
||||
# Check stale nonces
|
||||
orig, r.auth_controller.max_age_seconds = r.auth_controller.max_age_seconds, -1
|
||||
auth = parse_http_dict(test(conn, '/closed', headers={
|
||||
'Authorization':digest(**args)},status=httplib.UNAUTHORIZED)['WWW-Authenticate'].partition(b' ')[2])
|
||||
self.assertIn('stale', auth)
|
||||
r.auth_controller.max_age_seconds = orig
|
||||
ok_test(conn, digest(**args))
|
||||
|
||||
def fail_test(conn, modify, **kw):
|
||||
kw['body'] = kw.get('body', b'')
|
||||
kw['status'] = kw.get('status', httplib.UNAUTHORIZED)
|
||||
args['modify'] = modify
|
||||
return test(conn, '/closed', headers={'Authorization':digest(**args)}, **kw)
|
||||
|
||||
# Check modified nonce fails
|
||||
fail_test(conn, lambda da:setattr(da, 'nonce', 'xyz'))
|
||||
fail_test(conn, lambda da:setattr(da, 'nonce', 'x' + da.nonce))
|
||||
|
||||
# Check mismatched uri fails
|
||||
fail_test(conn, lambda da:setattr(da, 'uri', '/'))
|
||||
fail_test(conn, lambda da:setattr(da, 'uri', '/closed2'))
|
||||
fail_test(conn, lambda da:setattr(da, 'uri', '/closed/2'))
|
||||
|
||||
# Check that incorrect user/password fails
|
||||
fail_test(conn, lambda da:setattr(da, 'pw', '/'))
|
||||
fail_test(conn, lambda da:setattr(da, 'username', '/'))
|
||||
|
||||
# Check against python's stdlib
|
||||
self.ae(urlopen(server).read(), b'closed')
|
||||
|
||||
# Check using curl
|
||||
curl = find_executable('curl')
|
||||
if curl:
|
||||
def docurl(data, *args):
|
||||
cmd = [curl] + list(args) + ['http://localhost:%d/closed' % server.address[1]]
|
||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=open(os.devnull, 'wb'))
|
||||
x = p.stdout.read()
|
||||
p.wait()
|
||||
self.ae(x, data)
|
||||
docurl(b'')
|
||||
docurl(b'', '--digest', '--user', 'xxxx:testpw')
|
||||
docurl(b'', '--digest', '--user', 'testuser:xtestpw')
|
||||
docurl(b'closed', '--digest', '--user', 'testuser:testpw')
|
||||
# }}}
|
||||
|
Loading…
x
Reference in New Issue
Block a user