From 666248454b6c22b89fc2022accc8d001d00da656 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 6 Aug 2013 07:44:19 +0530 Subject: [PATCH] Move lock testing into the main test suite --- src/calibre/db/locking.py | 170 ------------------------------- src/calibre/db/tests/locking.py | 174 ++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+), 170 deletions(-) create mode 100644 src/calibre/db/tests/locking.py diff --git a/src/calibre/db/locking.py b/src/calibre/db/locking.py index ac871a5869..9245ecae84 100644 --- a/src/calibre/db/locking.py +++ b/src/calibre/db/locking.py @@ -242,174 +242,4 @@ class DebugRWLockWrapper(RWLockWrapper): print ('release done: thread id:', current_thread(), 'is_shared:', self._shlock.is_shared, 'is_exclusive:', self._shlock.is_exclusive, file=sys.stderr) print ('_' * 120, file=sys.stderr) -# Tests {{{ -if __name__ == '__main__': - import time, random, unittest - from threading import Thread - - class TestLock(unittest.TestCase): - """Testcases for Lock classes.""" - - def test_owns_locks(self): - lock = SHLock() - self.assertFalse(lock.owns_lock()) - lock.acquire(shared=True) - self.assertTrue(lock.owns_lock()) - lock.release() - self.assertFalse(lock.owns_lock()) - lock.acquire(shared=False) - self.assertTrue(lock.owns_lock()) - lock.release() - self.assertFalse(lock.owns_lock()) - - done = [] - def test(): - if not lock.owns_lock(): - done.append(True) - lock.acquire() - t = Thread(target=test) - t.daemon = True - t.start() - t.join(1) - self.assertEqual(len(done), 1) - lock.release() - - def test_multithread_deadlock(self): - lock = SHLock() - def two_shared(): - r = RWLockWrapper(lock) - with r: - time.sleep(0.2) - with r: - pass - def one_exclusive(): - time.sleep(0.1) - w = RWLockWrapper(lock, is_shared=False) - with w: - pass - threads = [Thread(target=two_shared), Thread(target=one_exclusive)] - for t in threads: - t.daemon = True - t.start() - for t in threads: - t.join(5) - live = [t for t in threads if t.is_alive()] - self.assertListEqual(live, [], 'ShLock hung') - - def test_upgrade(self): - lock = SHLock() - lock.acquire(shared=True) - self.assertRaises(LockingError, lock.acquire, shared=False) - lock.release() - - def test_downgrade(self): - lock = SHLock() - lock.acquire(shared=False) - self.assertRaises(LockingError, lock.acquire, shared=True) - lock.release() - - def test_recursive(self): - lock = SHLock() - lock.acquire(shared=True) - lock.acquire(shared=True) - self.assertEqual(lock.is_shared, 2) - lock.release() - lock.release() - self.assertFalse(lock.is_shared) - lock.acquire(shared=False) - lock.acquire(shared=False) - self.assertEqual(lock.is_exclusive, 2) - lock.release() - lock.release() - self.assertFalse(lock.is_exclusive) - - def test_release(self): - lock = SHLock() - self.assertRaises(LockingError, lock.release) - - def get_lock(shared): - lock.acquire(shared=shared) - time.sleep(1) - lock.release() - - threads = [Thread(target=get_lock, args=(x,)) for x in (True, - False)] - for t in threads: - t.daemon = True - t.start() - self.assertRaises(LockingError, lock.release) - t.join(2) - self.assertFalse(t.is_alive()) - self.assertFalse(lock.is_shared) - self.assertFalse(lock.is_exclusive) - - def test_acquire(self): - lock = SHLock() - - def get_lock(shared): - lock.acquire(shared=shared) - time.sleep(1) - lock.release() - - shared = Thread(target=get_lock, args=(True,)) - shared.daemon = True - shared.start() - time.sleep(0.1) - self.assertTrue(lock.acquire(shared=True, blocking=False)) - lock.release() - self.assertFalse(lock.acquire(shared=False, blocking=False)) - lock.acquire(shared=False) - self.assertFalse(shared.is_alive()) - lock.release() - self.assertTrue(lock.acquire(shared=False, blocking=False)) - lock.release() - - exclusive = Thread(target=get_lock, args=(False,)) - exclusive.daemon = True - exclusive.start() - time.sleep(0.1) - self.assertFalse(lock.acquire(shared=False, blocking=False)) - self.assertFalse(lock.acquire(shared=True, blocking=False)) - lock.acquire(shared=True) - self.assertFalse(exclusive.is_alive()) - lock.release() - lock.acquire(shared=False) - lock.release() - lock.acquire(shared=True) - lock.release() - self.assertFalse(lock.is_shared) - self.assertFalse(lock.is_exclusive) - - def test_contention(self): - lock = SHLock() - done = [] - def lots_of_acquires(): - for _ in xrange(1000): - shared = random.choice([True,False]) - lock.acquire(shared=shared) - lock.acquire(shared=shared) - time.sleep(random.random() * 0.0001) - lock.release() - time.sleep(random.random() * 0.0001) - lock.acquire(shared=shared) - time.sleep(random.random() * 0.0001) - lock.release() - lock.release() - done.append(True) - threads = [Thread(target=lots_of_acquires) for _ in xrange(10)] - for t in threads: - t.daemon = True - t.start() - for t in threads: - t.join(20) - live = [t for t in threads if t.is_alive()] - self.assertListEqual(live, [], 'ShLock hung') - self.assertEqual(len(done), len(threads), 'SHLock locking failed') - self.assertFalse(lock.is_shared) - self.assertFalse(lock.is_exclusive) - - suite = unittest.TestLoader().loadTestsFromTestCase(TestLock) - unittest.TextTestRunner(verbosity=2).run(suite) - -# }}} diff --git a/src/calibre/db/tests/locking.py b/src/calibre/db/tests/locking.py new file mode 100644 index 0000000000..a9dcb43ae4 --- /dev/null +++ b/src/calibre/db/tests/locking.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python +# vim:fileencoding=utf-8 +from __future__ import (unicode_literals, division, absolute_import, + print_function) + +__license__ = 'GPL v3' +__copyright__ = '2013, Kovid Goyal ' + +import time, random +from threading import Thread +from calibre.db.tests.base import BaseTest +from calibre.db.locking import SHLock, RWLockWrapper, LockingError + +class TestLock(BaseTest): + """Tests for db locking """ + + def test_owns_locks(self): + lock = SHLock() + self.assertFalse(lock.owns_lock()) + lock.acquire(shared=True) + self.assertTrue(lock.owns_lock()) + lock.release() + self.assertFalse(lock.owns_lock()) + lock.acquire(shared=False) + self.assertTrue(lock.owns_lock()) + lock.release() + self.assertFalse(lock.owns_lock()) + + done = [] + def test(): + if not lock.owns_lock(): + done.append(True) + lock.acquire() + t = Thread(target=test) + t.daemon = True + t.start() + t.join(1) + self.assertEqual(len(done), 1) + lock.release() + + def test_multithread_deadlock(self): + lock = SHLock() + def two_shared(): + r = RWLockWrapper(lock) + with r: + time.sleep(0.2) + with r: + pass + def one_exclusive(): + time.sleep(0.1) + w = RWLockWrapper(lock, is_shared=False) + with w: + pass + threads = [Thread(target=two_shared), Thread(target=one_exclusive)] + for t in threads: + t.daemon = True + t.start() + for t in threads: + t.join(5) + live = [t for t in threads if t.is_alive()] + self.assertListEqual(live, [], 'ShLock hung') + + def test_upgrade(self): + lock = SHLock() + lock.acquire(shared=True) + self.assertRaises(LockingError, lock.acquire, shared=False) + lock.release() + + def test_downgrade(self): + lock = SHLock() + lock.acquire(shared=False) + self.assertRaises(LockingError, lock.acquire, shared=True) + lock.release() + + def test_recursive(self): + lock = SHLock() + lock.acquire(shared=True) + lock.acquire(shared=True) + self.assertEqual(lock.is_shared, 2) + lock.release() + lock.release() + self.assertFalse(lock.is_shared) + lock.acquire(shared=False) + lock.acquire(shared=False) + self.assertEqual(lock.is_exclusive, 2) + lock.release() + lock.release() + self.assertFalse(lock.is_exclusive) + + def test_release(self): + lock = SHLock() + self.assertRaises(LockingError, lock.release) + + def get_lock(shared): + lock.acquire(shared=shared) + time.sleep(1) + lock.release() + + threads = [Thread(target=get_lock, args=(x,)) for x in (True, + False)] + for t in threads: + t.daemon = True + t.start() + self.assertRaises(LockingError, lock.release) + t.join(2) + self.assertFalse(t.is_alive()) + self.assertFalse(lock.is_shared) + self.assertFalse(lock.is_exclusive) + + def test_acquire(self): + lock = SHLock() + + def get_lock(shared): + lock.acquire(shared=shared) + time.sleep(1) + lock.release() + + shared = Thread(target=get_lock, args=(True,)) + shared.daemon = True + shared.start() + time.sleep(0.1) + self.assertTrue(lock.acquire(shared=True, blocking=False)) + lock.release() + self.assertFalse(lock.acquire(shared=False, blocking=False)) + lock.acquire(shared=False) + self.assertFalse(shared.is_alive()) + lock.release() + self.assertTrue(lock.acquire(shared=False, blocking=False)) + lock.release() + + exclusive = Thread(target=get_lock, args=(False,)) + exclusive.daemon = True + exclusive.start() + time.sleep(0.1) + self.assertFalse(lock.acquire(shared=False, blocking=False)) + self.assertFalse(lock.acquire(shared=True, blocking=False)) + lock.acquire(shared=True) + self.assertFalse(exclusive.is_alive()) + lock.release() + lock.acquire(shared=False) + lock.release() + lock.acquire(shared=True) + lock.release() + self.assertFalse(lock.is_shared) + self.assertFalse(lock.is_exclusive) + + def test_contention(self): + lock = SHLock() + done = [] + def lots_of_acquires(): + for _ in xrange(1000): + shared = random.choice([True,False]) + lock.acquire(shared=shared) + lock.acquire(shared=shared) + time.sleep(random.random() * 0.0001) + lock.release() + time.sleep(random.random() * 0.0001) + lock.acquire(shared=shared) + time.sleep(random.random() * 0.0001) + lock.release() + lock.release() + done.append(True) + threads = [Thread(target=lots_of_acquires) for _ in xrange(10)] + for t in threads: + t.daemon = True + t.start() + for t in threads: + t.join(20) + live = [t for t in threads if t.is_alive()] + self.assertListEqual(live, [], 'ShLock hung') + self.assertEqual(len(done), len(threads), 'SHLock locking failed') + self.assertFalse(lock.is_shared) + self.assertFalse(lock.is_exclusive) +