Add test for shutdown with hung worker

This commit is contained in:
Kovid Goyal 2015-05-29 18:45:37 +05:30
parent a485b61fe1
commit 00df6549c0

View File

@ -6,7 +6,7 @@ from __future__ import (unicode_literals, division, absolute_import,
__license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
import httplib, ssl, os, socket
import httplib, ssl, os, socket, time
from unittest import skipIf
try:
@ -28,6 +28,18 @@ class LoopTest(BaseTest):
server.loop.stop()
server.join()
self.ae(0, sum(int(w.is_alive()) for w in server.loop.pool.workers))
# Test shutdown with hung worker
with TestServer(lambda data:time.sleep(1000), worker_count=3, shutdown_timeout=0.01, timeout=0.01) as server:
pool = server.loop.pool
self.ae(3, sum(int(w.is_alive()) for w in pool.workers))
conn = server.connect()
conn.request('GET', '/')
with self.assertRaises(socket.timeout):
conn.getresponse()
self.ae(pool.busy, 1)
server.loop.stop()
server.join()
self.ae(1, sum(int(w.is_alive()) for w in pool.workers))
def test_ring_buffer(self):
class FakeSocket(object):