mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
py3: port websocket tests
This commit is contained in:
parent
440d174284
commit
51ae5ee86f
@ -16,7 +16,7 @@ from calibre.srv.web_socket import (
|
|||||||
from calibre.utils.monotonic import monotonic
|
from calibre.utils.monotonic import monotonic
|
||||||
from calibre.utils.socket_inheritance import set_socket_inherit
|
from calibre.utils.socket_inheritance import set_socket_inherit
|
||||||
from polyglot.builtins import range, unicode_type
|
from polyglot.builtins import range, unicode_type
|
||||||
from polyglot.binary import as_base64_bytes, as_base64_unicode
|
from polyglot.binary import as_base64_unicode
|
||||||
|
|
||||||
HANDSHAKE_STR = '''\
|
HANDSHAKE_STR = '''\
|
||||||
GET / HTTP/1.1\r
|
GET / HTTP/1.1\r
|
||||||
@ -35,7 +35,7 @@ class WSClient(object):
|
|||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.socket = socket.create_connection(('localhost', port), timeout)
|
self.socket = socket.create_connection(('localhost', port), timeout)
|
||||||
set_socket_inherit(self.socket, False)
|
set_socket_inherit(self.socket, False)
|
||||||
self.key = as_base64_bytes(os.urandom(8))
|
self.key = as_base64_unicode(os.urandom(8))
|
||||||
self.socket.sendall(HANDSHAKE_STR.format(self.key).encode('ascii'))
|
self.socket.sendall(HANDSHAKE_STR.format(self.key).encode('ascii'))
|
||||||
self.read_buf = deque()
|
self.read_buf = deque()
|
||||||
self.read_upgrade_response()
|
self.read_upgrade_response()
|
||||||
@ -64,7 +64,7 @@ class WSClient(object):
|
|||||||
if rl != b'HTTP/1.1 101 Switching Protocols\r\n':
|
if rl != b'HTTP/1.1 101 Switching Protocols\r\n':
|
||||||
raise ValueError('Server did not respond with correct switching protocols line')
|
raise ValueError('Server did not respond with correct switching protocols line')
|
||||||
headers = read_headers(partial(next, lines))
|
headers = read_headers(partial(next, lines))
|
||||||
key = as_base64_unicode(sha1(self.key + GUID_STR.encode('ascii')).digest())
|
key = as_base64_unicode(sha1((self.key + GUID_STR).encode('ascii')).digest())
|
||||||
if headers.get('Sec-WebSocket-Accept') != key:
|
if headers.get('Sec-WebSocket-Accept') != key:
|
||||||
raise ValueError('Server did not respond with correct key in Sec-WebSocket-Accept: {} != {}'.format(
|
raise ValueError('Server did not respond with correct key in Sec-WebSocket-Accept: {} != {}'.format(
|
||||||
key, headers.get('Sec-WebSocket-Accept')))
|
key, headers.get('Sec-WebSocket-Accept')))
|
||||||
@ -215,7 +215,7 @@ class WebSocketTest(BaseTest):
|
|||||||
for q in (b'', b'\xfe' * 125, b'\xfe' * 126, b'\xfe' * 127, b'\xfe' * 128, b'\xfe' * 65535, b'\xfe' * 65536):
|
for q in (b'', b'\xfe' * 125, b'\xfe' * 126, b'\xfe' * 127, b'\xfe' * 128, b'\xfe' * 65535, b'\xfe' * 65536):
|
||||||
simple_test([q], [q])
|
simple_test([q], [q])
|
||||||
|
|
||||||
for payload in ['', 'ping', b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', b"\xfe" * 125]:
|
for payload in [b'', b'ping', b'\x00\xff\xfe\xfd\xfc\xfb\x00\xff', b"\xfe" * 125]:
|
||||||
simple_test([(PING, payload)], [(PONG, payload)])
|
simple_test([(PING, payload)], [(PONG, payload)])
|
||||||
|
|
||||||
with server.silence_log:
|
with server.silence_log:
|
||||||
@ -260,7 +260,8 @@ class WebSocketTest(BaseTest):
|
|||||||
isf_test(frags, close_code=INCONSISTENT_DATA, send_close=False)
|
isf_test(frags, close_code=INCONSISTENT_DATA, send_close=False)
|
||||||
|
|
||||||
frags, q = [], b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80\x80\x65\x64\x69\x74\x65\x64'
|
frags, q = [], b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5\xed\xa0\x80\x80\x65\x64\x69\x74\x65\x64'
|
||||||
for i, b in enumerate(q):
|
for i in range(len(q)):
|
||||||
|
b = q[i:i+1]
|
||||||
frags.append({'opcode':(TEXT if i == 0 else CONTINUATION), 'fin':1 if i == len(q)-1 else 0, 'payload':b})
|
frags.append({'opcode':(TEXT if i == 0 else CONTINUATION), 'fin':1 if i == len(q)-1 else 0, 'payload':b})
|
||||||
isf_test(frags, close_code=INCONSISTENT_DATA, send_close=False, ignore_send_failures=True)
|
isf_test(frags, close_code=INCONSISTENT_DATA, send_close=False, ignore_send_failures=True)
|
||||||
|
|
||||||
@ -291,7 +292,8 @@ class WebSocketTest(BaseTest):
|
|||||||
|
|
||||||
for q in (b'\xc2\xb5', b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5', "Hello-µ@ßöäüàá-UTF-8!!".encode('utf-8')):
|
for q in (b'\xc2\xb5', b'\xce\xba\xe1\xbd\xb9\xcf\x83\xce\xbc\xce\xb5', "Hello-µ@ßöäüàá-UTF-8!!".encode('utf-8')):
|
||||||
frags = []
|
frags = []
|
||||||
for i, b in enumerate(q):
|
for i in range(len(q)):
|
||||||
|
b = q[i:i+1]
|
||||||
frags.append({'opcode':(TEXT if i == 0 else CONTINUATION), 'fin':1 if i == len(q)-1 else 0, 'payload':b})
|
frags.append({'opcode':(TEXT if i == 0 else CONTINUATION), 'fin':1 if i == len(q)-1 else 0, 'payload':b})
|
||||||
simple_test(frags, [q.decode('utf-8')])
|
simple_test(frags, [q.decode('utf-8')])
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user