mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-08 02:34:06 -04:00
See if unbufferring and an explicit flush fix test on OS X
This commit is contained in:
parent
852be48cb7
commit
e32ff0ec67
@ -36,6 +36,7 @@ def run_worker(mod, func, **kw):
|
|||||||
exe = [sys.executable, os.path.join(sys.setup_dir, 'run-calibre-worker.py')]
|
exe = [sys.executable, os.path.join(sys.setup_dir, 'run-calibre-worker.py')]
|
||||||
env = kw.get('env', os.environ.copy())
|
env = kw.get('env', os.environ.copy())
|
||||||
env['CALIBRE_SIMPLE_WORKER'] = mod + ':' + func
|
env['CALIBRE_SIMPLE_WORKER'] = mod + ':' + func
|
||||||
|
env['PYTHONUNBUFFERED'] = '1'
|
||||||
if iswindows:
|
if iswindows:
|
||||||
import win32process
|
import win32process
|
||||||
kw['creationflags'] = win32process.CREATE_NO_WINDOW
|
kw['creationflags'] = win32process.CREATE_NO_WINDOW
|
||||||
@ -69,7 +70,9 @@ class IPCLockTest(unittest.TestCase):
|
|||||||
|
|
||||||
def test_exclusive_file_other_process(self):
|
def test_exclusive_file_other_process(self):
|
||||||
child = run_worker('calibre.utils.test_lock', 'other1', stdout=subprocess.PIPE)
|
child = run_worker('calibre.utils.test_lock', 'other1', stdout=subprocess.PIPE)
|
||||||
|
print(1111111111)
|
||||||
ready = child.stdout.readline()
|
ready = child.stdout.readline()
|
||||||
|
print(2222222222)
|
||||||
self.assertEqual(ready.strip(), b'ready')
|
self.assertEqual(ready.strip(), b'ready')
|
||||||
ef = FastFailEF('test')
|
ef = FastFailEF('test')
|
||||||
self.assertRaises(EnvironmentError, ef.__enter__)
|
self.assertRaises(EnvironmentError, ef.__enter__)
|
||||||
@ -84,10 +87,14 @@ def other1():
|
|||||||
e = ExclusiveFile('test')
|
e = ExclusiveFile('test')
|
||||||
with e:
|
with e:
|
||||||
print('ready')
|
print('ready')
|
||||||
sys.stdout.close()
|
sys.stdout.flush()
|
||||||
sys.stderr.close()
|
|
||||||
time.sleep(30)
|
time.sleep(30)
|
||||||
|
|
||||||
|
|
||||||
def find_tests():
|
def find_tests():
|
||||||
return unittest.defaultTestLoader.loadTestsFromTestCase(IPCLockTest)
|
return unittest.defaultTestLoader.loadTestsFromTestCase(IPCLockTest)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
suite = find_tests()
|
||||||
|
unittest.TextTestRunner(verbosity=4).run(suite)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user