Simplify windows exclusive file implementation by using msvcrt.locking as a fcntl substitute

This commit is contained in:
Kovid Goyal 2017-05-03 15:13:28 +05:30
parent ebe67702fd
commit 0fdf23de8b
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 179 additions and 194 deletions

View File

@ -66,6 +66,8 @@ def find_tests(which_tests=None):
a(find_tests())
from calibre.utils.shared_file import find_tests
a(find_tests())
from calibre.utils.test_lock import find_tests
a(find_tests())
if ok('dbcli'):
from calibre.db.cli.tests import find_tests
a(find_tests())

View File

@ -10,7 +10,7 @@ from functools import partial
from collections import defaultdict
from copy import deepcopy
from calibre.utils.lock import LockError, ExclusiveFile
from calibre.utils.lock import ExclusiveFile
from calibre.constants import config_dir, CONFIG_DIR_MODE
plugin_dir = os.path.join(config_dir, 'plugins')
@ -278,30 +278,23 @@ class Config(ConfigInterface):
def parse(self):
src = ''
if os.path.exists(self.config_file_path):
try:
with ExclusiveFile(self.config_file_path) as f:
try:
src = f.read().decode('utf-8')
except ValueError:
print "Failed to parse", self.config_file_path
traceback.print_exc()
except LockError:
raise IOError('Could not lock config file: %s'%self.config_file_path)
return self.option_set.parse_string(src)
def as_string(self):
if not os.path.exists(self.config_file_path):
return ''
try:
with ExclusiveFile(self.config_file_path) as f:
return f.read().decode('utf-8')
except LockError:
raise IOError('Could not lock config file: %s'%self.config_file_path)
def set(self, name, val):
if not self.option_set.has_option(name):
raise ValueError('The option %s is not defined.'%name)
try:
if not os.path.exists(config_dir):
make_config_dir()
with ExclusiveFile(self.config_file_path) as f:
@ -315,8 +308,6 @@ class Config(ConfigInterface):
if isinstance(src, unicode):
src = src.encode('utf-8')
f.write(src)
except LockError:
raise IOError('Could not lock config file: %s'%self.config_file_path)
class StringConfig(ConfigInterface):

View File

@ -5,202 +5,107 @@ __docformat__ = 'restructuredtext en'
Secure access to locked files from multiple processes.
'''
from calibre.constants import iswindows, __appname__, islinux, ishaiku, win32api, win32event, winerror, fcntl
import time, atexit, os, stat, errno
import atexit
import errno
import os
import stat
import time
class LockError(Exception):
pass
class WindowsExclFile(object):
def __init__(self, path, timeout=20):
self.name = path
import win32file as w
import pywintypes
while timeout > 0:
timeout -= 1
try:
self._handle = w.CreateFile(
path,
w.GENERIC_READ | w.GENERIC_WRITE, # Open for reading and writing
0, # Open exclusive
None, # No security attributes, ensures handle is not inherited by children
w.OPEN_ALWAYS, # If file does not exist, create it
w.FILE_ATTRIBUTE_NORMAL, # Normal attributes
None, # No template file
from calibre.constants import (
__appname__, fcntl, filesystem_encoding, ishaiku, islinux, iswindows, win32api,
win32event, winerror
)
break
except pywintypes.error as err:
if getattr(err, 'args', [-1])[0] in (0x20, 0x21):
time.sleep(1)
continue
from calibre.utils.monotonic import monotonic
if iswindows:
excl_file_mode = stat.S_IREAD | stat.S_IWRITE
import msvcrt
else:
raise
if not hasattr(self, '_handle'):
raise LockError('Failed to open exclusive file: %s' % path)
def seek(self, amt, frm=0):
import win32file as w
if frm not in (0, 1, 2):
raise ValueError('Invalid from for seek: %s' % frm)
frm = {0: w.FILE_BEGIN, 1: w.FILE_CURRENT, 2: w.FILE_END}[frm]
if frm is w.FILE_END:
amt = 0 - amt
w.SetFilePointer(self._handle, amt, frm)
def tell(self):
import win32file as w
return w.SetFilePointer(self._handle, 0, w.FILE_CURRENT)
def flush(self):
import win32file as w
w.FlushFileBuffers(self._handle)
def close(self):
if self._handle is not None:
import win32file as w
self.flush()
w.CloseHandle(self._handle)
self._handle = None
def read(self, bytes=-1):
import win32file as w
sz = w.GetFileSize(self._handle)
max = sz - self.tell()
if bytes < 0:
bytes = max
bytes = min(max, bytes)
if bytes < 1:
return ''
hr, ans = w.ReadFile(self._handle, bytes, None)
if hr != 0:
raise IOError('Error reading file: %s' % hr)
return ans
def readlines(self, sizehint=-1):
return self.read().splitlines()
def write(self, bytes):
if isinstance(bytes, unicode):
bytes = bytes.encode('utf-8')
import win32file as w
w.WriteFile(self._handle, bytes, None)
def truncate(self, size=None):
import win32file as w
pos = self.tell()
if size is None:
size = pos
t = min(size, pos)
self.seek(t)
w.SetEndOfFile(self._handle)
self.seek(pos)
def isatty(self):
return False
@property
def closed(self):
return self._handle is None
excl_file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
def unix_open(path):
# We cannot use open(a+b) directly because Fedora apparently ships with a
# broken libc that causes seek(0) followed by truncate() to not work for
# files with O_APPEND set. We also use O_CLOEXEC when it is available,
# to ensure there are no races.
flags = os.O_RDWR | os.O_CREAT
from calibre.constants import plugins
speedup = plugins['speedup'][0]
has_cloexec = False
if hasattr(speedup, 'O_CLOEXEC'):
try:
fd = os.open(
path, flags | speedup.O_CLOEXEC, stat.S_IRUSR | stat.S_IWUSR |
stat.S_IRGRP | stat.S_IROTH
)
fd = os.open(path, flags | speedup.O_CLOEXEC, excl_file_mode)
has_cloexec = True
except EnvironmentError as err:
if getattr(err, 'errno', None) == errno.EINVAL:
# Kernel does not support O_CLOEXEC
fd = os.open(
path, flags, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP |
stat.S_IROTH
)
else:
# Kernel may not support O_CLOEXEC
if err.errno != errno.EINVAL:
raise
else:
fd = os.open(
path, flags, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH
)
if not has_cloexec:
fd = os.open(path, flags, excl_file_mode)
fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
return os.fdopen(fd, 'r+b')
def windows_open(path):
flags = os.O_RDWR | os.O_CREAT | os.O_NOINHERIT | os.O_BINARY
fd = os.open(path, flags, excl_file_mode)
return os.fdopen(fd, 'r+bN')
class TimeoutError(Exception):
pass
def retry_for_a_time(timeout, sleep_time, func, *args):
limit = monotonic() + timeout
last_error = None
while monotonic() <= limit:
try:
return func(*args)
except EnvironmentError as err:
last_error = err.args
if monotonic() > limit:
break
time.sleep(sleep_time)
raise TimeoutError(*last_error)
class ExclusiveFile(object):
def __init__(self, path, timeout=15):
def __init__(self, path, timeout=15, sleep_time=0.2):
if iswindows:
if isinstance(path, bytes):
path = path.decode(filesystem_encoding)
self.path = path
self.timeout = timeout
self.sleep_time = sleep_time
def __enter__(self):
self.file = WindowsExclFile(self.path, self.timeout
) if iswindows else unix_open(self.path)
self.file.seek(0)
timeout = self.timeout
if not iswindows:
while self.timeout < 0 or timeout >= 0:
try:
fcntl.flock(self.file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except IOError:
time.sleep(1)
timeout -= 1
if timeout < 0 and self.timeout >= 0:
self.file.close()
raise LockError('Failed to lock')
if iswindows:
f = windows_open(self.path)
retry_for_a_time(
self.timeout, self.sleep_time, msvcrt.locking,
f.fileno(), msvcrt.LK_NBLCK, 1
)
else:
f = unix_open(self.path)
retry_for_a_time(
self.timeout, self.sleep_time, fcntl.flock,
f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB
)
self.file = f
except TimeoutError as err:
raise OSError(*(list(err.args)[:2] + [self.path]))
return self.file
def __exit__(self, type, value, traceback):
if iswindows:
try:
msvcrt.locking(self.file.fileno(), msvcrt.LK_UNLCK, 1)
except EnvironmentError:
pass
self.file.close()
def test_exclusive_file(path=None):
if path is None:
import tempfile
f = os.path.join(tempfile.gettempdir(), 'test-exclusive-file')
with ExclusiveFile(f):
# Try same process lock
try:
with ExclusiveFile(f, timeout=1):
raise LockError(
"ExclusiveFile failed to prevent multiple uses in the same process!"
)
except LockError:
pass
# Try different process lock
from calibre.utils.ipc.simple_worker import fork_job
err = fork_job('calibre.utils.lock', 'test_exclusive_file',
(f, ))['result']
if err is not None:
raise LockError('ExclusiveFile failed with error: %s' % err)
else:
try:
with ExclusiveFile(path, timeout=1):
raise Exception(
'ExclusiveFile failed to prevent multiple uses in different processes!'
)
except LockError:
pass
except Exception as err:
return str(err)
def _clean_lock_file(file):
try:
file.close()

View File

@ -0,0 +1,87 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2017, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import absolute_import, division, print_function, unicode_literals
import fcntl
import os
import shutil
import subprocess
import tempfile
import unittest
from threading import Thread
from calibre.constants import iswindows
from calibre.debug import run_calibre_debug
from calibre.utils.lock import ExclusiveFile, unix_open
def FastFailEF(name):
return ExclusiveFile(name, sleep_time=0.01, timeout=0.05)
class Other(Thread):
daemon = True
locked = None
def run(self):
try:
with FastFailEF('test'):
self.locked = True
except EnvironmentError:
self.locked = False
class IPCLockTest(unittest.TestCase):
def setUp(self):
self.cwd = os.getcwd()
self.tdir = tempfile.mkdtemp()
os.chdir(self.tdir)
def tearDown(self):
os.chdir(self.cwd)
shutil.rmtree(self.tdir)
def test_exclusive_file_same_process(self):
with ExclusiveFile('test'):
ef = FastFailEF('test')
self.assertRaises(EnvironmentError, ef.__enter__)
t = Other()
t.start(), t.join()
self.assertIs(t.locked, False)
if not iswindows:
with unix_open('test') as f:
self.assertEqual(
1, fcntl.fcntl(f.fileno(), fcntl.F_GETFD) & fcntl.FD_CLOEXEC
)
def test_exclusive_file_other_process(self):
child = run_calibre_debug(
'-c',
'from calibre.utils.test_lock import other1; other1()',
stdout=subprocess.PIPE
)
ready = child.stdout.readline()
self.assertEqual(ready.strip(), b'ready')
ef = FastFailEF('test')
self.assertRaises(EnvironmentError, ef.__enter__)
child.kill()
self.assertIsNotNone(child.wait())
with ExclusiveFile('test'):
pass
def other1():
import sys, time
e = ExclusiveFile('test')
with e:
print('ready')
sys.stdout.close()
sys.stderr.close()
time.sleep(30)
def find_tests():
return unittest.defaultTestLoader.loadTestsFromTestCase(IPCLockTest)