Add unit test for listener IPC

This commit is contained in:
Kovid Goyal 2025-03-30 15:25:31 +05:30
parent 05f17d0212
commit 3a4f4dc5b6
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 56 additions and 12 deletions

View File

@ -11,7 +11,7 @@ from itertools import count
from qt.core import QAbstractSocket, QLocalServer, pyqtSignal
from calibre.utils.ipc import gui_socket_address
from calibre.utils.ipc import gui_socket_address, socket_address
def unix_socket(timeout=10):
@ -90,25 +90,67 @@ def send_message_in_process(msg, address=None, timeout=5):
ps.close()
def send_message_via_worker(msg, address=None, timeout=5, wait_till_sent=False):
def send_message_in_worker(address, timeout):
msg = sys.stdin.buffer.read()
send_message_in_process(msg, address, timeout)
def send_message_via_worker(msg, address=None, timeout=5, wait_till_sent=True):
# On Windows sending a message in a process that also is listening on the
# same named pipe in a different thread deadlocks, so we do the actual sending in
# a simple worker process
import json
import subprocess
from calibre.startup import get_debug_executable
cmd = get_debug_executable() + [
'-c', 'from calibre.gui2.listener import *; import sys, json;'
'send_message_in_process(sys.stdin.buffer.read(), address=json.loads(sys.argv[-2]), timeout=float(sys.argv[-1]))',
json.dumps(address), str(timeout)]
p = subprocess.Popen(cmd, stdin=subprocess.PIPE)
from calibre.utils.ipc.simple_worker import start_pipe_worker
if isinstance(msg, str):
msg = msg.encode('utf-8')
p = start_pipe_worker(f'from calibre.gui2.listener import *; send_message_in_worker({address!r}, {timeout!r})')
with closing(p.stdin):
p.stdin.write(msg)
if wait_till_sent:
return p.wait(timeout=timeout) == 0
return p.wait(timeout=timeout + 2) == 0
def listener_for_test(address):
from qt.core import QCoreApplication, QTimer
app = QCoreApplication([])
s = Listener(address=address, parent=app)
s.start_listening()
def got_message(msg):
if msg == b'quit':
app.quit()
return
sys.stdout.buffer.write(msg)
sys.stdout.buffer.write(os.linesep.encode())
sys.stdout.buffer.flush()
s.message_received.connect(got_message)
QTimer.singleShot(0, lambda: print('started', flush=True))
app.exec()
def find_tests():
import unittest
class TestIPC(unittest.TestCase):
def test_listener_ipc(self):
from calibre.utils.ipc.simple_worker import start_pipe_worker
address = socket_address('test')
server = start_pipe_worker(f'from calibre.gui2.listener import *; listener_for_test({address!r})')
try:
self.assertEqual(server.stdout.readline().rstrip(), b'started')
for msg in 'one', 'two', 'three':
send_message_in_process(msg, address=address)
self.assertEqual(server.stdout.readline().rstrip(), msg.encode())
self.assertTrue(send_message_via_worker('hello, world!', address=address))
self.assertEqual(server.stdout.readline().rstrip(), b'hello, world!')
msg = '123456789' * 8192 * 10
send_message_in_process(msg, address=address)
self.assertEqual(server.stdout.readline().rstrip(), msg.encode())
send_message_in_process('quit', address=address)
server.wait(2)
finally:
server.kill()
server.wait()
return unittest.defaultTestLoader.loadTestsFromTestCase(TestIPC)
def test():

View File

@ -310,6 +310,8 @@ def find_tests(which_tests=None, exclude_tests=None):
a(find_tests())
from calibre.utils.safe_atexit import find_tests
a(find_tests())
from calibre.gui2.listener import find_tests
a(find_tests())
if iswindows:
from calibre.utils.windows.wintest import find_tests
a(find_tests())