mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
new db: Implement locking for file I/O and the cover() method
This commit is contained in:
parent
e6b8100fce
commit
64c3c60407
@ -8,7 +8,7 @@ __copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
# Imports {{{
|
||||
import os, shutil, uuid, json, glob
|
||||
import os, shutil, uuid, json, glob, time, tempfile
|
||||
from functools import partial
|
||||
|
||||
import apsw
|
||||
@ -37,6 +37,8 @@ Differences in semantics from pysqlite:
|
||||
|
||||
'''
|
||||
|
||||
SPOOL_SIZE = 30*1024*1024
|
||||
|
||||
class DynamicFilter(object): # {{{
|
||||
|
||||
'No longer used, present for legacy compatibility'
|
||||
@ -784,5 +786,34 @@ class DB(object):
|
||||
ans['mtime'] = utcfromtimestamp(stat.st_mtime)
|
||||
return ans
|
||||
|
||||
def cover(self, path, as_file=False, as_image=False,
|
||||
as_path=False):
|
||||
path = os.path.join(self.library_path, path, 'cover.jpg')
|
||||
ret = None
|
||||
if os.access(path, os.R_OK):
|
||||
try:
|
||||
f = lopen(path, 'rb')
|
||||
except (IOError, OSError):
|
||||
time.sleep(0.2)
|
||||
f = lopen(path, 'rb')
|
||||
with f:
|
||||
if as_path:
|
||||
pt = PersistentTemporaryFile('_dbcover.jpg')
|
||||
with pt:
|
||||
shutil.copyfileobj(f, pt)
|
||||
return pt.name
|
||||
if as_file:
|
||||
ret = tempfile.SpooledTemporaryFile(SPOOL_SIZE)
|
||||
shutil.copyfileobj(f, ret)
|
||||
ret.seek(0)
|
||||
else:
|
||||
ret = f.read()
|
||||
if as_image:
|
||||
from PyQt4.Qt import QImage
|
||||
i = QImage()
|
||||
i.loadFromData(ret)
|
||||
ret = i
|
||||
return ret
|
||||
|
||||
# }}}
|
||||
|
||||
|
@ -81,6 +81,87 @@ class Cache(object):
|
||||
if name and path:
|
||||
return self.backend.format_abspath(book_id, fmt, name, path)
|
||||
|
||||
def _get_metadata(self, book_id, get_user_categories=True):
|
||||
mi = Metadata(None)
|
||||
author_ids = self._field_ids_for('authors', book_id)
|
||||
aut_list = [self._author_data(i) for i in author_ids]
|
||||
aum = []
|
||||
aus = {}
|
||||
aul = {}
|
||||
for rec in aut_list:
|
||||
aut = rec['name']
|
||||
aum.append(aut)
|
||||
aus[aut] = rec['sort']
|
||||
aul[aut] = rec['link']
|
||||
mi.title = self._field_for('title', book_id,
|
||||
default_value=_('Unknown'))
|
||||
mi.authors = aum
|
||||
mi.author_sort = self._field_for('author_sort', book_id,
|
||||
default_value=_('Unknown'))
|
||||
mi.author_sort_map = aus
|
||||
mi.author_link_map = aul
|
||||
mi.comments = self._field_for('comments', book_id)
|
||||
mi.publisher = self._field_for('publisher', book_id)
|
||||
n = now()
|
||||
mi.timestamp = self._field_for('timestamp', book_id, default_value=n)
|
||||
mi.pubdate = self._field_for('pubdate', book_id, default_value=n)
|
||||
mi.uuid = self._field_for('uuid', book_id,
|
||||
default_value='dummy')
|
||||
mi.title_sort = self._field_for('sort', book_id,
|
||||
default_value=_('Unknown'))
|
||||
mi.book_size = self._field_for('size', book_id, default_value=0)
|
||||
mi.ondevice_col = self._field_for('ondevice', book_id, default_value='')
|
||||
mi.last_modified = self._field_for('last_modified', book_id,
|
||||
default_value=n)
|
||||
formats = self._field_for('formats', book_id)
|
||||
mi.format_metadata = {}
|
||||
if not formats:
|
||||
formats = None
|
||||
else:
|
||||
for f in formats:
|
||||
mi.format_metadata[f] = self._format_metadata(book_id, f)
|
||||
formats = ','.join(formats)
|
||||
mi.formats = formats
|
||||
mi.has_cover = _('Yes') if self._field_for('cover', book_id,
|
||||
default_value=False) else ''
|
||||
mi.tags = list(self._field_for('tags', book_id, default_value=()))
|
||||
mi.series = self._field_for('series', book_id)
|
||||
if mi.series:
|
||||
mi.series_index = self._field_for('series_index', book_id,
|
||||
default_value=1.0)
|
||||
mi.rating = self._field_for('rating', book_id)
|
||||
mi.set_identifiers(self._field_for('identifiers', book_id,
|
||||
default_value={}))
|
||||
mi.application_id = book_id
|
||||
mi.id = book_id
|
||||
composites = {}
|
||||
for key, meta in self.field_metadata.custom_iteritems():
|
||||
mi.set_user_metadata(key, meta)
|
||||
if meta['datatype'] == 'composite':
|
||||
composites.append(key)
|
||||
else:
|
||||
mi.set(key, val=self._field_for(meta['label'], book_id),
|
||||
extra=self._field_for(meta['label']+'_index', book_id))
|
||||
for c in composites:
|
||||
mi.set(key, val=self._composite_for(key, book_id, mi))
|
||||
|
||||
user_cat_vals = {}
|
||||
if get_user_categories:
|
||||
user_cats = self.prefs['user_categories']
|
||||
for ucat in user_cats:
|
||||
res = []
|
||||
for name,cat,ign in user_cats[ucat]:
|
||||
v = mi.get(cat, None)
|
||||
if isinstance(v, list):
|
||||
if name in v:
|
||||
res.append([name,cat])
|
||||
elif name == v:
|
||||
res.append([name,cat])
|
||||
user_cat_vals[ucat] = res
|
||||
mi.user_categories = user_cat_vals
|
||||
|
||||
return mi
|
||||
|
||||
# Cache Layer API {{{
|
||||
|
||||
@api
|
||||
@ -193,101 +274,30 @@ class Cache(object):
|
||||
self.format_metadata_cache[book_id][fmt] = ans
|
||||
return ans
|
||||
|
||||
@read_api
|
||||
def get_metadata(self, book_id, get_cover=False,
|
||||
get_user_categories=True, cover_as_data=False):
|
||||
@api
|
||||
def get_metadata(self, book_id,
|
||||
get_cover=False, get_user_categories=True, cover_as_data=False):
|
||||
'''
|
||||
Convenience method to return metadata as a :class:`Metadata` object.
|
||||
Note that the list of formats is not verified.
|
||||
Return metadata for the book identified by book_id as a :class:`Metadata` object.
|
||||
Note that the list of formats is not verified. If get_cover is True,
|
||||
the cover is returned, either a path to temp file as mi.cover or if
|
||||
cover_as_data is True then as mi.cover_data.
|
||||
'''
|
||||
mi = Metadata(None)
|
||||
|
||||
author_ids = self._field_ids_for('authors', book_id)
|
||||
aut_list = [self._author_data(i) for i in author_ids]
|
||||
aum = []
|
||||
aus = {}
|
||||
aul = {}
|
||||
for rec in aut_list:
|
||||
aut = rec['name']
|
||||
aum.append(aut)
|
||||
aus[aut] = rec['sort']
|
||||
aul[aut] = rec['link']
|
||||
mi.title = self._field_for('title', book_id,
|
||||
default_value=_('Unknown'))
|
||||
mi.authors = aum
|
||||
mi.author_sort = self._field_for('author_sort', book_id,
|
||||
default_value=_('Unknown'))
|
||||
mi.author_sort_map = aus
|
||||
mi.author_link_map = aul
|
||||
mi.comments = self._field_for('comments', book_id)
|
||||
mi.publisher = self._field_for('publisher', book_id)
|
||||
n = now()
|
||||
mi.timestamp = self._field_for('timestamp', book_id, default_value=n)
|
||||
mi.pubdate = self._field_for('pubdate', book_id, default_value=n)
|
||||
mi.uuid = self._field_for('uuid', book_id,
|
||||
default_value='dummy')
|
||||
mi.title_sort = self._field_for('sort', book_id,
|
||||
default_value=_('Unknown'))
|
||||
mi.book_size = self._field_for('size', book_id, default_value=0)
|
||||
mi.ondevice_col = self._field_for('ondevice', book_id, default_value='')
|
||||
mi.last_modified = self._field_for('last_modified', book_id,
|
||||
default_value=n)
|
||||
formats = self._field_for('formats', book_id)
|
||||
mi.format_metadata = {}
|
||||
if not formats:
|
||||
formats = None
|
||||
else:
|
||||
for f in formats:
|
||||
mi.format_metadata[f] = self._format_metadata(book_id, f)
|
||||
formats = ','.join(formats)
|
||||
mi.formats = formats
|
||||
mi.has_cover = _('Yes') if self._field_for('cover', book_id,
|
||||
default_value=False) else ''
|
||||
mi.tags = list(self._field_for('tags', book_id, default_value=()))
|
||||
mi.series = self._field_for('series', book_id)
|
||||
if mi.series:
|
||||
mi.series_index = self._field_for('series_index', book_id,
|
||||
default_value=1.0)
|
||||
mi.rating = self._field_for('rating', book_id)
|
||||
mi.set_identifiers(self._field_for('identifiers', book_id,
|
||||
default_value={}))
|
||||
mi.application_id = book_id
|
||||
mi.id = book_id
|
||||
composites = {}
|
||||
for key, meta in self.field_metadata.custom_iteritems():
|
||||
mi.set_user_metadata(key, meta)
|
||||
if meta['datatype'] == 'composite':
|
||||
composites.append(key)
|
||||
else:
|
||||
mi.set(key, val=self._field_for(meta['label'], book_id),
|
||||
extra=self._field_for(meta['label']+'_index', book_id))
|
||||
for c in composites:
|
||||
mi.set(key, val=self._composite_for(key, book_id, mi))
|
||||
|
||||
user_cat_vals = {}
|
||||
if get_user_categories:
|
||||
user_cats = self.prefs['user_categories']
|
||||
for ucat in user_cats:
|
||||
res = []
|
||||
for name,cat,ign in user_cats[ucat]:
|
||||
v = mi.get(cat, None)
|
||||
if isinstance(v, list):
|
||||
if name in v:
|
||||
res.append([name,cat])
|
||||
elif name == v:
|
||||
res.append([name,cat])
|
||||
user_cat_vals[ucat] = res
|
||||
mi.user_categories = user_cat_vals
|
||||
with self.read_lock:
|
||||
mi = self._get_metadata(book_id, get_user_categories=get_user_categories)
|
||||
|
||||
if get_cover:
|
||||
if cover_as_data:
|
||||
cdata = self.cover(id, index_is_id=True)
|
||||
cdata = self.cover(book_id)
|
||||
if cdata:
|
||||
mi.cover_data = ('jpeg', cdata)
|
||||
else:
|
||||
mi.cover = self.cover(id, index_is_id=True, as_path=True)
|
||||
mi.cover = self.cover(book_id, as_path=True)
|
||||
|
||||
return mi
|
||||
|
||||
|
||||
# }}}
|
||||
|
||||
# Testing {{{
|
||||
|
@ -7,7 +7,9 @@ __license__ = 'GPL v3'
|
||||
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
from threading import Lock, Condition, current_thread
|
||||
from threading import Lock, Condition, current_thread, RLock
|
||||
from functools import partial
|
||||
from collections import Counter
|
||||
|
||||
class LockingError(RuntimeError):
|
||||
pass
|
||||
@ -37,7 +39,7 @@ def create_locks():
|
||||
l = SHLock()
|
||||
return RWLockWrapper(l), RWLockWrapper(l, is_shared=False)
|
||||
|
||||
class SHLock(object):
|
||||
class SHLock(object): # {{{
|
||||
'''
|
||||
Shareable lock class. Used to implement the Multiple readers-single writer
|
||||
paradigm. As best as I can tell, neither writer nor reader starvation
|
||||
@ -79,6 +81,11 @@ class SHLock(object):
|
||||
return self._acquire_exclusive(blocking)
|
||||
assert not (self.is_shared and self.is_exclusive)
|
||||
|
||||
def owns_lock(self):
|
||||
me = current_thread()
|
||||
with self._lock:
|
||||
return self._exclusive_owner is me or me in self._shared_owners
|
||||
|
||||
def release(self):
|
||||
''' Release the lock. '''
|
||||
# This decrements the appropriate lock counters, and if the lock
|
||||
@ -189,6 +196,8 @@ class SHLock(object):
|
||||
def _return_waiter(self, waiter):
|
||||
self._free_waiters.append(waiter)
|
||||
|
||||
# }}}
|
||||
|
||||
class RWLockWrapper(object):
|
||||
|
||||
def __init__(self, shlock, is_shared=True):
|
||||
@ -200,16 +209,124 @@ class RWLockWrapper(object):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.release()
|
||||
|
||||
def release(self):
|
||||
self._shlock.release()
|
||||
|
||||
def owns_lock(self):
|
||||
return self._shlock.owns_lock()
|
||||
|
||||
class RecordLock(object):
|
||||
|
||||
'''
|
||||
Lock records identified by hashable ids. To use
|
||||
|
||||
rl = RecordLock()
|
||||
|
||||
with rl.lock(some_id):
|
||||
# do something
|
||||
|
||||
This will lock the record identified by some_id exclusively. The lock is
|
||||
recursive, which means that you can lock the same record multiple times in
|
||||
the same thread.
|
||||
|
||||
This class co-operates with the SHLock class. If you try to lock a record
|
||||
in a thread that already holds the SHLock, a LockingError is raised. This
|
||||
is to prevent the possibility of a cross-lock deadlock.
|
||||
|
||||
A cross-lock deadlock is still possible if you first lock a record and then
|
||||
acquire the SHLock, but the usage pattern for this lock makes this highly
|
||||
unlikely (this lock should be acquired immediately before any file I/O on
|
||||
files in the library and released immediately after).
|
||||
'''
|
||||
|
||||
class Wrap(object):
|
||||
|
||||
def __init__(self, release):
|
||||
self.release = release
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args, **kwargs):
|
||||
self.release()
|
||||
self.release = None
|
||||
|
||||
def __init__(self, sh_lock):
|
||||
self._lock = Lock()
|
||||
# This is for recycling lock objects.
|
||||
self._free_locks = [RLock()]
|
||||
self._records = {}
|
||||
self._counter = Counter()
|
||||
self.sh_lock = sh_lock
|
||||
|
||||
def lock(self, record_id):
|
||||
if self.sh_lock.owns_lock():
|
||||
raise LockingError('Current thread already holds a shared lock,'
|
||||
' you cannot also ask for record lock as this could cause a'
|
||||
' deadlock.')
|
||||
with self._lock:
|
||||
l = self._records.get(record_id, None)
|
||||
if l is None:
|
||||
l = self._take_lock()
|
||||
self._records[record_id] = l
|
||||
self._counter[record_id] += 1
|
||||
l.acquire()
|
||||
return RecordLock.Wrap(partial(self.release, record_id))
|
||||
|
||||
def release(self, record_id):
|
||||
with self._lock:
|
||||
l = self._records.pop(record_id, None)
|
||||
if l is None:
|
||||
raise LockingError('No lock acquired for record %r'%record_id)
|
||||
l.release()
|
||||
self._counter[record_id] -= 1
|
||||
if self._counter[record_id] > 0:
|
||||
self._records[record_id] = l
|
||||
else:
|
||||
self._return_lock(l)
|
||||
|
||||
def _take_lock(self):
|
||||
try:
|
||||
return self._free_locks.pop()
|
||||
except IndexError:
|
||||
return RLock()
|
||||
|
||||
def _return_lock(self, lock):
|
||||
self._free_locks.append(lock)
|
||||
|
||||
# Tests {{{
|
||||
if __name__ == '__main__':
|
||||
import time, random, unittest
|
||||
from threading import Thread
|
||||
|
||||
class TestSHLock(unittest.TestCase):
|
||||
"""Testcases for SHLock class."""
|
||||
class TestLock(unittest.TestCase):
|
||||
"""Testcases for Lock classes."""
|
||||
|
||||
def test_owns_locks(self):
|
||||
lock = SHLock()
|
||||
self.assertFalse(lock.owns_lock())
|
||||
lock.acquire(shared=True)
|
||||
self.assertTrue(lock.owns_lock())
|
||||
lock.release()
|
||||
self.assertFalse(lock.owns_lock())
|
||||
lock.acquire(shared=False)
|
||||
self.assertTrue(lock.owns_lock())
|
||||
lock.release()
|
||||
self.assertFalse(lock.owns_lock())
|
||||
|
||||
done = []
|
||||
def test():
|
||||
if not lock.owns_lock():
|
||||
done.append(True)
|
||||
lock.acquire()
|
||||
t = Thread(target=test)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
t.join(1)
|
||||
self.assertEqual(len(done), 1)
|
||||
lock.release()
|
||||
|
||||
def test_multithread_deadlock(self):
|
||||
lock = SHLock()
|
||||
@ -345,8 +462,38 @@ if __name__ == '__main__':
|
||||
self.assertFalse(lock.is_shared)
|
||||
self.assertFalse(lock.is_exclusive)
|
||||
|
||||
def test_record_lock(self):
|
||||
shlock = SHLock()
|
||||
lock = RecordLock(shlock)
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestSHLock)
|
||||
shlock.acquire()
|
||||
self.assertRaises(LockingError, lock.lock, 1)
|
||||
shlock.release()
|
||||
with lock.lock(1):
|
||||
with lock.lock(1):
|
||||
pass
|
||||
|
||||
def dolock():
|
||||
with lock.lock(1):
|
||||
time.sleep(0.1)
|
||||
|
||||
t = Thread(target=dolock)
|
||||
t.daemon = True
|
||||
with lock.lock(1):
|
||||
t.start()
|
||||
t.join(0.2)
|
||||
self.assertTrue(t.is_alive())
|
||||
t.join(0.11)
|
||||
self.assertFalse(t.is_alive())
|
||||
|
||||
t = Thread(target=dolock)
|
||||
t.daemon = True
|
||||
with lock.lock(2):
|
||||
t.start()
|
||||
t.join(0.11)
|
||||
self.assertFalse(t.is_alive())
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestLock)
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
|
||||
# }}}
|
||||
|
Loading…
x
Reference in New Issue
Block a user