New metadata backup architecture

This commit is contained in:
Charles Haley 2011-01-26 13:26:10 +00:00
parent 6e1e2fbd75
commit da4dfd7a1f
2 changed files with 84 additions and 69 deletions

View File

@ -38,7 +38,6 @@ class MetadataBackup(Thread): # {{{
self.get_metadata_for_dump = FunctionDispatcher(db.get_metadata_for_dump) self.get_metadata_for_dump = FunctionDispatcher(db.get_metadata_for_dump)
self.clear_dirtied = FunctionDispatcher(db.clear_dirtied) self.clear_dirtied = FunctionDispatcher(db.clear_dirtied)
self.set_dirtied = FunctionDispatcher(db.dirtied) self.set_dirtied = FunctionDispatcher(db.dirtied)
self.in_limbo = None
def stop(self): def stop(self):
self.keep_running = False self.keep_running = False
@ -50,34 +49,33 @@ class MetadataBackup(Thread): # {{{
def run(self): def run(self):
while self.keep_running: while self.keep_running:
self.in_limbo = None
try: try:
time.sleep(0.5) # Limit to two per second time.sleep(2) # Limit to one book every two seconds
id_ = self.db.dirtied_queue.get(True, 1.45) (id_, sequence) = self.db.get_a_dirtied_book()
except Empty: if id_ is None:
continue continue
print 'writer thread', id_, sequence
except: except:
# Happens during interpreter shutdown # Happens during interpreter shutdown
break break
if not self.keep_running: if not self.keep_running:
break break
self.in_limbo = id_
try: try:
path, mi = self.get_metadata_for_dump(id_) path, mi, sequence = self.get_metadata_for_dump(id_)
except: except:
prints('Failed to get backup metadata for id:', id_, 'once') prints('Failed to get backup metadata for id:', id_, 'once')
traceback.print_exc() traceback.print_exc()
time.sleep(2) time.sleep(2)
try: try:
path, mi = self.get_metadata_for_dump(id_) path, mi, sequence = self.get_metadata_for_dump(id_)
except: except:
prints('Failed to get backup metadata for id:', id_, 'again, giving up') prints('Failed to get backup metadata for id:', id_, 'again, giving up')
traceback.print_exc() traceback.print_exc()
continue continue
# at this point the dirty indication is off
if mi is None: if mi is None:
self.clear_dirtied(id_, sequence)
continue continue
if not self.keep_running: if not self.keep_running:
break break
@ -89,7 +87,6 @@ class MetadataBackup(Thread): # {{{
try: try:
raw = metadata_to_opf(mi) raw = metadata_to_opf(mi)
except: except:
self.set_dirtied([id_])
prints('Failed to convert to opf for id:', id_) prints('Failed to convert to opf for id:', id_)
traceback.print_exc() traceback.print_exc()
continue continue
@ -106,24 +103,13 @@ class MetadataBackup(Thread): # {{{
try: try:
self.do_write(path, raw) self.do_write(path, raw)
except: except:
self.set_dirtied([id_])
prints('Failed to write backup metadata for id:', id_, prints('Failed to write backup metadata for id:', id_,
'again, giving up') 'again, giving up')
continue continue
self.in_limbo = None self.clear_dirtied(id_, sequence)
self.flush()
self.break_cycles() self.break_cycles()
# These rows appear only on the old (pre-commit) side of the diff.
# Purpose: if a book id is parked in self.in_limbo, hand it back to the
# database as dirty so that it is not lost.
def flush(self):
'Used during shutdown to ensure that a dirtied book is not missed'
if self.in_limbo is not None:
try:
# Re-mark the in-flight book as dirty through the database object.
self.db.dirtied([self.in_limbo])
except:
# Best effort: print the traceback and carry on.
traceback.print_exc()
# NOTE(review): indentation is lost in this view, so it is unclear whether
# this reset sits inside the `if` or at method level -- confirm.
self.in_limbo = None
def write(self, path, raw): def write(self, path, raw):
with lopen(path, 'wb') as f: with lopen(path, 'wb') as f:
f.write(raw) f.write(raw)

View File

@ -7,9 +7,9 @@ __docformat__ = 'restructuredtext en'
The database used to store ebook metadata The database used to store ebook metadata
''' '''
import os, sys, shutil, cStringIO, glob, time, functools, traceback, re, json import os, sys, shutil, cStringIO, glob, time, functools, traceback, re, json
import threading, random
from itertools import repeat from itertools import repeat
from math import ceil from math import ceil
from Queue import Queue
from PyQt4.QtGui import QImage from PyQt4.QtGui import QImage
@ -117,7 +117,6 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
def __init__(self, library_path, row_factory=False, default_prefs=None, def __init__(self, library_path, row_factory=False, default_prefs=None,
read_only=False): read_only=False):
self.field_metadata = FieldMetadata() self.field_metadata = FieldMetadata()
self.dirtied_queue = Queue()
if not os.path.exists(library_path): if not os.path.exists(library_path):
os.makedirs(library_path) os.makedirs(library_path)
self.listeners = set([]) self.listeners = set([])
@ -168,6 +167,10 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
return row[loc] return row[loc]
def initialize_dynamic(self): def initialize_dynamic(self):
# Create the lock to be used to guard access to the metadata writer
# queues. This must be an RLock, not a Lock
self.dirtied_lock = threading.RLock()
self.field_metadata = FieldMetadata() #Ensure we start with a clean copy self.field_metadata = FieldMetadata() #Ensure we start with a clean copy
self.prefs = DBPrefs(self) self.prefs = DBPrefs(self)
defs = self.prefs.defaults defs = self.prefs.defaults
@ -376,9 +379,12 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
loc=self.FIELD_MAP['sort'])) loc=self.FIELD_MAP['sort']))
d = self.conn.get('SELECT book FROM metadata_dirtied', all=True) d = self.conn.get('SELECT book FROM metadata_dirtied', all=True)
for x in d: with self.dirtied_lock:
self.dirtied_queue.put(x[0]) self.dirtied_sequence = 0
self.dirtied_cache = set([x[0] for x in d]) self.dirtied_cache = {}
for x in d:
self.dirtied_cache[x[0]] = self.dirtied_sequence
self.dirtied_sequence += 1
self.refresh_ondevice = functools.partial(self.data.refresh_ondevice, self) self.refresh_ondevice = functools.partial(self.data.refresh_ondevice, self)
self.refresh() self.refresh()
@ -605,20 +611,26 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
# Return the entry stored under `key` in self.field_metadata. A missing key
# is not handled here; whatever the lookup raises propagates to the caller.
def metadata_for_field(self, key): def metadata_for_field(self, key):
return self.field_metadata[key] return self.field_metadata[key]
def clear_dirtied(self, book_id, sequence):
    '''
    Clear the dirtied indicator for a single book. This is used when
    fetching metadata, creating an OPF, and writing a file are separated
    into steps. The last step is clearing the indicator.

    :param book_id: id of the book whose backup has just been written.
    :param sequence: the sequence number the caller saw for the book when
        it started the backup, or None to clear unconditionally.

    The indicator is removed only when the cached sequence number for the
    book is missing, when ``sequence`` is None, or when the two match. If
    they differ, the book was dirtied again while the backup was being
    written, so it stays in the cache and the table and will be backed up
    again.
    '''
    with self.dirtied_lock:
        cached_sequence = self.dirtied_cache.get(book_id)
        if (cached_sequence is None or sequence is None
                or cached_sequence == sequence):
            self.conn.execute('DELETE FROM metadata_dirtied WHERE book=?',
                    (book_id,))
            # pop() with a default never raises, so no try/except is
            # needed when the book is not in the cache.
            self.dirtied_cache.pop(book_id, None)
    # Committed even when nothing was deleted, as before.
    self.conn.commit()
def dump_metadata(self, book_ids=None, remove_from_dirtied=True, def dump_metadata(self, book_ids=None, remove_from_dirtied=True,
@ -632,38 +644,57 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
for book_id in book_ids: for book_id in book_ids:
if not self.data.has_id(book_id): if not self.data.has_id(book_id):
continue continue
path, mi = self.get_metadata_for_dump(book_id, path, mi, sequence = self.get_metadata_for_dump(book_id)
remove_from_dirtied=remove_from_dirtied)
if path is None: if path is None:
continue continue
try: try:
raw = metadata_to_opf(mi) raw = metadata_to_opf(mi)
with lopen(path, 'wb') as f: with lopen(path, 'wb') as f:
f.write(raw) f.write(raw)
if remove_from_dirtied:
self.clear_dirtied(book_id, sequence)
except: except:
# Something went wrong. Put the book back on the dirty list pass
self.dirtied([book_id])
if commit: if commit:
self.conn.commit() self.conn.commit()
def dirtied(self, book_ids, commit=True):
    '''
    Mark the given books as needing their metadata backup rewritten.

    :param book_ids: iterable of book ids to mark dirty.
    :param commit: when true, commit the database connection afterwards.

    A book that is not yet in ``self.dirtied_cache`` is also inserted
    into the ``metadata_dirtied`` table. Every book, new or already
    dirty, is stamped with a fresh sequence number. ``clear_dirtied``
    compares that number to tell whether the book was dirtied again
    after a backup of it had started.
    '''
    for book in book_ids:
        with self.dirtied_lock:
            if book not in self.dirtied_cache:
                try:
                    self.conn.execute(
                        'INSERT INTO metadata_dirtied (book) VALUES (?)',
                        (book,))
                except IntegrityError:
                    # Already in table
                    pass
            self.dirtied_cache[book] = self.dirtied_sequence
            self.dirtied_sequence += 1
    # If the commit doesn't happen, then the DB table will be wrong. This
    # could lead to a problem because on restart, we won't put the book back
    # into the dirtied_cache. We deal with this by writing the dirtied_cache
    # back to the table on GUI exit. Not perfect, but probably OK
    if commit:
        self.conn.commit()
def get_a_dirtied_book(self):
    '''
    Pick one dirtied book at random.

    :return: ``(book_id, sequence)`` for a randomly chosen entry of
        ``self.dirtied_cache``, or ``(None, None)`` when the cache is
        empty.
    '''
    with self.dirtied_lock:
        if not self.dirtied_cache:
            return (None, None)
        # The choice is random to prevent a single book from blocking
        # progress if its metadata cannot be written for some reason.
        # list() gives an indexable snapshot of the keys whether keys()
        # returns a list or a view.
        id_ = random.choice(list(self.dirtied_cache))
        return (id_, self.dirtied_cache[id_])
def dirty_queue_length(self):
    '''Return the number of entries currently in ``self.dirtied_cache``.'''
    # Read under dirtied_lock, like the other dirtied_cache accesses in
    # this class. NOTE(review): this presumably also stops a caller from
    # seeing the cache while another thread is rebuilding it -- confirm.
    with self.dirtied_lock:
        return len(self.dirtied_cache)
@ -676,12 +707,19 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
is no problem with setting a dirty indication for a book that isn't in is no problem with setting a dirty indication for a book that isn't in
fact dirty. Just wastes a few cycles. fact dirty. Just wastes a few cycles.
''' '''
book_ids = list(self.dirtied_cache) with self.dirtied_lock:
self.dirtied_cache = set() book_ids = list(self.dirtied_cache.keys())
self.dirtied(book_ids) self.dirtied_cache = {}
self.dirtied(book_ids)
def get_metadata_for_dump(self, idx, remove_from_dirtied=True): def get_metadata_for_dump(self, idx):
path, mi = (None, None) path, mi = (None, None)
# get the current sequence number for this book to pass back to the
# backup thread. This will avoid double calls in the case where the
# thread has not done the work between the put and the get_metadata
with self.dirtied_lock:
sequence = self.dirtied_cache.get(idx, None)
# print 'get_md_for_dump', idx, sequence
try: try:
# While a book is being created, the path is empty. Don't bother to # While a book is being created, the path is empty. Don't bother to
# try to write the opf, because it will go to the wrong folder. # try to write the opf, because it will go to the wrong folder.
@ -696,16 +734,7 @@ class LibraryDatabase2(LibraryDatabase, SchemaUpgrade, CustomColumns):
# This almost certainly means that the book has been deleted while # This almost certainly means that the book has been deleted while
# the backup operation sat in the queue. # the backup operation sat in the queue.
pass pass
return (path, mi, sequence)
try:
# clear the dirtied indicator. The user must put it back if
# something goes wrong with writing the OPF
if remove_from_dirtied:
self.clear_dirtied([idx])
except:
# No real problem. We will just do it again.
pass
return (path, mi)
def get_metadata(self, idx, index_is_id=False, get_cover=False): def get_metadata(self, idx, index_is_id=False, get_cover=False):
''' '''