From da4dfd7a1fcb1527590f5381a198270357276ca9 Mon Sep 17 00:00:00 2001 From: Charles Haley <> Date: Wed, 26 Jan 2011 13:26:10 +0000 Subject: [PATCH] New metadata backup architecture --- src/calibre/library/caches.py | 32 +++----- src/calibre/library/database2.py | 121 +++++++++++++++++++------------ 2 files changed, 84 insertions(+), 69 deletions(-) diff --git a/src/calibre/library/caches.py b/src/calibre/library/caches.py index 55045e7f98..2049feaa18 100644 --- a/src/calibre/library/caches.py +++ b/src/calibre/library/caches.py @@ -38,7 +38,6 @@ class MetadataBackup(Thread): # {{{ self.get_metadata_for_dump = FunctionDispatcher(db.get_metadata_for_dump) self.clear_dirtied = FunctionDispatcher(db.clear_dirtied) self.set_dirtied = FunctionDispatcher(db.dirtied) - self.in_limbo = None def stop(self): self.keep_running = False @@ -50,34 +49,33 @@ class MetadataBackup(Thread): # {{{ def run(self): while self.keep_running: - self.in_limbo = None try: - time.sleep(0.5) # Limit to two per second - id_ = self.db.dirtied_queue.get(True, 1.45) - except Empty: - continue + time.sleep(2) # Limit to two per second + (id_, sequence) = self.db.get_a_dirtied_book() + if id_ is None: + continue + print 'writer thread', id_, sequence except: # Happens during interpreter shutdown break if not self.keep_running: break - self.in_limbo = id_ try: - path, mi = self.get_metadata_for_dump(id_) + path, mi, sequence = self.get_metadata_for_dump(id_) except: prints('Failed to get backup metadata for id:', id_, 'once') traceback.print_exc() time.sleep(2) try: - path, mi = self.get_metadata_for_dump(id_) + path, mi, sequence = self.get_metadata_for_dump(id_) except: prints('Failed to get backup metadata for id:', id_, 'again, giving up') traceback.print_exc() continue - # at this point the dirty indication is off if mi is None: + self.clear_dirtied(id_, sequence) continue if not self.keep_running: break @@ -89,7 +87,6 @@ class MetadataBackup(Thread): # {{{ try: raw = metadata_to_opf(mi) except: - self.set_dirtied([id_]) prints('Failed to convert to opf for id:', id_) traceback.print_exc() continue @@ -106,24 +103,13 @@ class MetadataBackup(Thread): # {{{ try: self.do_write(path, raw) except: - self.set_dirtied([id_]) prints('Failed to write backup metadata for id:', id_, 'again, giving up') continue - self.in_limbo = None - self.flush() + self.clear_dirtied(id_, sequence) self.break_cycles() - def flush(self): - 'Used during shutdown to ensure that a dirtied book is not missed' - if self.in_limbo is not None: - try: - self.db.dirtied([self.in_limbo]) - except: - traceback.print_exc() - self.in_limbo = None - def write(self, path, raw): with lopen(path, 'wb') as f: f.write(raw) diff --git a/src/calibre/library/database2.py b/src/calibre/library/database2.py index dbdeabf3d4..7c8d5aee13 100644 --- a/src/calibre/library/database2.py +++ b/src/calibre/library/database2.py @@ -7,9 +7,9 @@ __docformat__ = 'restructuredtext en' The database used to store ebook metadata ''' import os, sys, shutil, cStringIO, glob, time, functools, traceback, re, json +import threading, random from itertools import repeat from math import ceil -from Queue import Queue from PyQt4.QtGui import QImage @@ -117,7 +117,6 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): def __init__(self, library_path, row_factory=False, default_prefs=None, read_only=False): self.field_metadata = FieldMetadata() - self.dirtied_queue = Queue() if not os.path.exists(library_path): os.makedirs(library_path) self.listeners = set([]) @@ -168,6 +167,10 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): return row[loc] def initialize_dynamic(self): + # Create the lock to be used to guard access to the metadata writer + # queues. This must be an RLock, not a Lock + self.dirtied_lock = threading.RLock() + self.field_metadata = FieldMetadata() #Ensure we start with a clean copy self.prefs = DBPrefs(self) defs = self.prefs.defaults @@ -376,9 +379,12 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): loc=self.FIELD_MAP['sort'])) d = self.conn.get('SELECT book FROM metadata_dirtied', all=True) - for x in d: - self.dirtied_queue.put(x[0]) - self.dirtied_cache = set([x[0] for x in d]) + with self.dirtied_lock: + self.dirtied_sequence = 0 + self.dirtied_cache = {} + for x in d: + self.dirtied_cache[x[0]] = self.dirtied_sequence + self.dirtied_sequence += 1 self.refresh_ondevice = functools.partial(self.data.refresh_ondevice, self) self.refresh() @@ -605,20 +611,26 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): def metadata_for_field(self, key): return self.field_metadata[key] - def clear_dirtied(self, book_ids): + def clear_dirtied(self, book_id, sequence): ''' Clear the dirtied indicator for the books. This is used when fetching metadata, creating an OPF, and writing a file are separated into steps. The last step is clearing the indicator ''' - for book_id in book_ids: - self.conn.execute('DELETE FROM metadata_dirtied WHERE book=?', - (book_id,)) - # if a later exception prevents the commit, then the dirtied - # table will still have the book. No big deal, because the OPF - # is there and correct. We will simply do it again on next - # start - self.dirtied_cache.discard(book_id) + with self.dirtied_lock: + dc_sequence = self.dirtied_cache.get(book_id, None) +# print 'clear_dirty: check book', book_id, dc_sequence + if dc_sequence is None or sequence is None or dc_sequence == sequence: +# print 'needs to be cleaned' + self.conn.execute('DELETE FROM metadata_dirtied WHERE book=?', + (book_id,)) + try: + del self.dirtied_cache[book_id] + except: + pass + elif dc_sequence is not None: +# print 'book needs to be done again' + pass self.conn.commit() def dump_metadata(self, book_ids=None, remove_from_dirtied=True, @@ -632,38 +644,57 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): for book_id in book_ids: if not self.data.has_id(book_id): continue - path, mi = self.get_metadata_for_dump(book_id, - remove_from_dirtied=remove_from_dirtied) + path, mi, sequence = self.get_metadata_for_dump(book_id) if path is None: continue try: raw = metadata_to_opf(mi) with lopen(path, 'wb') as f: f.write(raw) + if remove_from_dirtied: + self.clear_dirtied(book_id, sequence) except: - # Something went wrong. Put the book back on the dirty list - self.dirtied([book_id]) + pass if commit: self.conn.commit() def dirtied(self, book_ids, commit=True): - for book in frozenset(book_ids) - self.dirtied_cache: - try: - self.conn.execute( - 'INSERT INTO metadata_dirtied (book) VALUES (?)', - (book,)) - self.dirtied_queue.put(book) - except IntegrityError: - # Already in table - pass - # If the commit doesn't happen, then our cache will be wrong. This - # could lead to a problem because we won't put the book back into - # the dirtied table. We deal with this by writing the dirty cache - # back to the table on GUI exit. Not perfect, but probably OK - self.dirtied_cache.add(book) + for book in book_ids: + with self.dirtied_lock: +# print 'dirtied: check id', book + if book in self.dirtied_cache: + self.dirtied_cache[book] = self.dirtied_sequence + self.dirtied_sequence += 1 + continue +# print 'book not already dirty' + try: + self.conn.execute( + 'INSERT INTO metadata_dirtied (book) VALUES (?)', + (book,)) + except IntegrityError: + # Already in table + pass + self.dirtied_cache[book] = self.dirtied_sequence + self.dirtied_sequence += 1 + # If the commit doesn't happen, then the DB table will be wrong. This + # could lead to a problem because on restart, we won't put the book back + # into the dirtied_cache. We deal with this by writing the dirtied_cache + # back to the table on GUI exit. Not perfect, but probably OK if commit: self.conn.commit() + def get_a_dirtied_book(self): + with self.dirtied_lock: + l = len(self.dirtied_cache) + if l > 0: + # The random stuff is here to prevent a single book from + # blocking progress if its metadata cannot be written for some + # reason. + id_ = self.dirtied_cache.keys()[random.randint(0, l-1)] + sequence = self.dirtied_cache[id_] + return (id_, sequence) + return (None, None) + def dirty_queue_length(self): return len(self.dirtied_cache) @@ -676,12 +707,19 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): is no problem with setting a dirty indication for a book that isn't in fact dirty. Just wastes a few cycles. ''' - book_ids = list(self.dirtied_cache) - self.dirtied_cache = set() - self.dirtied(book_ids) + with self.dirtied_lock: + book_ids = list(self.dirtied_cache.keys()) + self.dirtied_cache = {} + self.dirtied(book_ids) - def get_metadata_for_dump(self, idx, remove_from_dirtied=True): + def get_metadata_for_dump(self, idx): path, mi = (None, None) + # get the current sequence number for this book to pass back to the + # backup thread. This will avoid double calls in the case where the + # thread has not done the work between the put and the get_metadata + with self.dirtied_lock: + sequence = self.dirtied_cache.get(idx, None) +# print 'get_md_for_dump', idx, sequence try: # While a book is being created, the path is empty. Don't bother to # try to write the opf, because it will go to the wrong folder. @@ -696,16 +734,7 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns): # This almost certainly means that the book has been deleted while # the backup operation sat in the queue. pass - - try: - # clear the dirtied indicator. The user must put it back if - # something goes wrong with writing the OPF - if remove_from_dirtied: - self.clear_dirtied([idx]) - except: - # No real problem. We will just do it again. - pass - return (path, mi) + return (path, mi, sequence) def get_metadata(self, idx, index_is_id=False, get_cover=False): '''