From 6bea78733e5ef8c00f3f77cb376093bac178ec78 Mon Sep 17 00:00:00 2001
From: Kovid Goyal
Date: Tue, 9 Apr 2013 11:11:49 +0530
Subject: [PATCH] Implement metadata to OPF backup

---
 src/calibre/db/__init__.py      |   1 +
 src/calibre/db/backend.py       |  10 ++
 src/calibre/db/backup.py        | 116 ++++++++++++++++
 src/calibre/db/cache.py         | 238 +++++++++++++++++++++++++-------
 src/calibre/db/tests/main.py    |   6 +-
 src/calibre/db/tests/writing.py |  63 ++++++++
 src/calibre/utils/monotonic.py  | 104 ++++++++++++++
 7 files changed, 482 insertions(+), 56 deletions(-)
 create mode 100644 src/calibre/db/backup.py
 create mode 100644 src/calibre/utils/monotonic.py

diff --git a/src/calibre/db/__init__.py b/src/calibre/db/__init__.py
index 5d12bdc686..b0916ebf73 100644
--- a/src/calibre/db/__init__.py
+++ b/src/calibre/db/__init__.py
@@ -68,4 +68,5 @@ Various things that require other things before they can be migrated:
     libraries/switching/on calibre startup.
  3. From refresh in the legacy interface: Remember to flush the composite
     column template cache.
+ 4. Replace the metadatabackup thread with the new implementation when using the new backend.
 '''

diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py
index 9259fc628b..c2beb25e2e 100644
--- a/src/calibre/db/backend.py
+++ b/src/calibre/db/backend.py
@@ -1067,5 +1067,15 @@ class DB(object):
                     break  # Fail silently since nothing catastrophic has happened
                 curpath = os.path.join(curpath, newseg)

+    def write_backup(self, path, raw):
+        path = os.path.abspath(os.path.join(self.library_path, path, 'metadata.opf'))
+        with lopen(path, 'wb') as f:
+            f.write(raw)
+
+    def read_backup(self, path):
+        path = os.path.abspath(os.path.join(self.library_path, path, 'metadata.opf'))
+        with lopen(path, 'rb') as f:
+            return f.read()
+
 # }}}
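
The two backend helpers above are deliberately simple: they only resolve a book's folder against the library root and read or write the metadata.opf that sits next to the book's files; locking and path lookup stay in the cache layer. A minimal sketch of the path resolution, assuming calibre's usual one-folder-per-book layout (the author/title path below is purely illustrative):

    import os

    library_path = '/home/user/Calibre Library'         # DB.library_path
    book_path = 'Douglas Adams/Mostly Harmless (123)'   # the book's 'path' field
    opf = os.path.abspath(os.path.join(library_path, book_path, 'metadata.opf'))
    # write_backup() opens this path 'wb' and writes the raw OPF bytes;
    # read_backup() opens it 'rb' and returns them.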
diff --git a/src/calibre/db/backup.py b/src/calibre/db/backup.py
new file mode 100644
index 0000000000..6410a347c6
--- /dev/null
+++ b/src/calibre/db/backup.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+# vim:fileencoding=UTF-8
+from __future__ import (unicode_literals, division, absolute_import,
+                        print_function)
+
+__license__ = 'GPL v3'
+__copyright__ = '2013, Kovid Goyal '
+__docformat__ = 'restructuredtext en'
+
+import weakref, traceback
+from threading import Thread, Event
+
+from calibre import prints
+from calibre.ebooks.metadata.opf2 import metadata_to_opf
+
+class Abort(Exception):
+    pass
+
+class MetadataBackup(Thread):
+    '''
+    Continuously back up changed metadata into OPF files in the book
+    directory. This class runs in its own thread.
+    '''
+
+    def __init__(self, db, interval=2, scheduling_interval=0.1):
+        Thread.__init__(self)
+        self.daemon = True
+        self._db = weakref.ref(db)
+        self.stop_running = Event()
+        self.interval = interval
+        self.scheduling_interval = scheduling_interval
+
+    @property
+    def db(self):
+        ans = self._db()
+        if ans is None:
+            raise Abort()
+        return ans
+
+    def stop(self):
+        self.stop_running.set()
+
+    def wait(self, interval):
+        if self.stop_running.wait(interval):
+            raise Abort()
+
+    def run(self):
+        while not self.stop_running.is_set():
+            try:
+                self.wait(self.interval)
+                self.do_one()
+            except Abort:
+                break
+
+    def do_one(self):
+        try:
+            book_id = self.db.get_a_dirtied_book()
+            if book_id is None:
+                return
+        except Abort:
+            raise
+        except:
+            # Happens during interpreter shutdown
+            return
+
+        self.wait(0)
+
+        try:
+            mi, sequence = self.db.get_metadata_for_dump(book_id)
+        except:
+            prints('Failed to get backup metadata for id:', book_id, 'once')
+            traceback.print_exc()
+            self.wait(self.interval)
+            try:
+                mi, sequence = self.db.get_metadata_for_dump(book_id)
+            except:
+                prints('Failed to get backup metadata for id:', book_id, 'again, giving up')
+                traceback.print_exc()
+                return
+
+        if mi is None:
+            self.db.clear_dirtied(book_id, sequence)
+            return
+
+        # Give the GUI thread a chance to do something. Python threads don't
+        # have priorities, so this thread would naturally keep the processor
+        # until some scheduling event happens; the wait provides that event.
+        self.wait(self.scheduling_interval)
+
+        try:
+            raw = metadata_to_opf(mi)
+        except:
+            prints('Failed to convert to opf for id:', book_id)
+            traceback.print_exc()
+            return
+
+        self.wait(self.scheduling_interval)
+
+        try:
+            self.db.write_backup(book_id, raw)
+        except:
+            prints('Failed to write backup metadata for id:', book_id, 'once')
+            self.wait(self.interval)
+            try:
+                self.db.write_backup(book_id, raw)
+            except:
+                prints('Failed to write backup metadata for id:', book_id, 'again, giving up')
+                return
+
+        self.db.clear_dirtied(book_id, sequence)
+
+    def break_cycles(self):
+        # Legacy compatibility
+        pass
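
The thread above talks to the database only through the new Cache methods added below (get_a_dirtied_book(), get_metadata_for_dump(), write_backup() and clear_dirtied()). A minimal usage sketch, mirroring what test_backup() in the new tests does; cache stands for an already initialized Cache instance:

    from calibre.db.backup import MetadataBackup

    mb = MetadataBackup(cache, interval=2, scheduling_interval=0.1)
    mb.start()   # wakes every `interval` seconds and backs up one dirty book
    # ... normal library use; set_field() etc. mark books as dirty ...
    mb.stop()    # sets the stop Event; run() exits at its next wait()
    mb.join()

Because the thread holds only a weakref to the database, it also aborts on its own once the Cache is garbage collected.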
diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py
index 0f648e96dd..630757497b 100644
--- a/src/calibre/db/cache.py
+++ b/src/calibre/db/cache.py
@@ -7,7 +7,7 @@ __license__ = 'GPL v3'
 __copyright__ = '2011, Kovid Goyal '
 __docformat__ = 'restructuredtext en'
 
-import os, traceback
+import os, traceback, random
 from io import BytesIO
 from collections import defaultdict
 from functools import wraps, partial
@@ -15,7 +15,7 @@ from functools import wraps, partial
 from calibre.constants import iswindows
 from calibre.db import SPOOL_SIZE
 from calibre.db.categories import get_categories
-from calibre.db.locking import create_locks, RecordLock
+from calibre.db.locking import create_locks
 from calibre.db.errors import NoSuchFormat
 from calibre.db.fields import create_field
 from calibre.db.search import Search
@@ -23,9 +23,10 @@ from calibre.db.tables import VirtualTable
 from calibre.db.write import get_series_values
 from calibre.db.lazy import FormatMetadata, FormatsList
 from calibre.ebooks.metadata.book.base import Metadata
+from calibre.ebooks.metadata.opf2 import metadata_to_opf
 from calibre.ptempfile import (base_dir, PersistentTemporaryFile,
                                SpooledTemporaryFile)
-from calibre.utils.date import now
+from calibre.utils.date import now as nowf
 from calibre.utils.icu import sort_key
 
 def api(f):
@@ -57,9 +58,10 @@ class Cache(object):
         self.fields = {}
         self.composites = set()
         self.read_lock, self.write_lock = create_locks()
-        self.record_lock = RecordLock(self.read_lock)
         self.format_metadata_cache = defaultdict(dict)
         self.formatter_template_cache = {}
+        self.dirtied_cache = {}
+        self.dirtied_sequence = 0
         self._search_api = Search(self.field_metadata.get_search_terms())
 
         # Implement locking for all simple read/write API methods
@@ -78,17 +80,18 @@ class Cache(object):
 
         self.initialize_dynamic()
 
+    @write_api
     def initialize_dynamic(self):
         # Reconstruct the user categories, putting them into field_metadata
         # Assumption is that someone else will fix them if they change.
         self.field_metadata.remove_dynamic_categories()
-        for user_cat in sorted(self.pref('user_categories', {}).iterkeys(), key=sort_key):
+        for user_cat in sorted(self._pref('user_categories', {}).iterkeys(), key=sort_key):
             cat_name = '@' + user_cat  # add the '@' to avoid name collision
             self.field_metadata.add_user_category(label=cat_name, name=user_cat)
 
         # add grouped search term user categories
-        muc = frozenset(self.pref('grouped_search_make_user_categories', []))
-        for cat in sorted(self.pref('grouped_search_terms', {}).iterkeys(), key=sort_key):
+        muc = frozenset(self._pref('grouped_search_make_user_categories', []))
+        for cat in sorted(self._pref('grouped_search_terms', {}).iterkeys(), key=sort_key):
             if cat in muc:
                 # There is a chance that these can be duplicates of an existing
                 # user category. Print the exception and continue.
@@ -102,10 +105,15 @@ class Cache(object):
         #     self.field_metadata.add_search_category(label='search', name=_('Searches'))
 
         self.field_metadata.add_grouped_search_terms(
-                self.pref('grouped_search_terms', {}))
+                self._pref('grouped_search_terms', {}))
 
         self._search_api.change_locations(self.field_metadata.get_search_terms())
 
+        self.dirtied_cache = {x:i for i, (x,) in enumerate(
+            self.backend.conn.execute('SELECT book FROM metadata_dirtied'))}
+        if self.dirtied_cache:
+            self.dirtied_sequence = max(self.dirtied_cache.itervalues())+1
+
     @property
     def field_metadata(self):
         return self.backend.field_metadata
@@ -131,7 +139,7 @@ class Cache(object):
         mi.author_link_map = aul
         mi.comments = self._field_for('comments', book_id)
         mi.publisher = self._field_for('publisher', book_id)
-        n = now()
+        n = nowf()
         mi.timestamp = self._field_for('timestamp', book_id, default_value=n)
         mi.pubdate = self._field_for('pubdate', book_id, default_value=n)
         mi.uuid = self._field_for('uuid', book_id,
@@ -413,7 +421,7 @@ class Cache(object):
                     ret = i
         return ret
 
-    @api
+    @read_api
     def copy_cover_to(self, book_id, dest, use_hardlink=False):
         '''
         Copy the cover to the file like object ``dest``. Returns False
@@ -422,17 +430,15 @@ class Cache(object):
         copied to it iff the path is different from the current path (taking
         case sensitivity into account).
         '''
-        with self.read_lock:
-            try:
-                path = self._field_for('path', book_id).replace('/', os.sep)
-            except:
-                return False
+        try:
+            path = self._field_for('path', book_id).replace('/', os.sep)
+        except AttributeError:
+            return False
 
-        with self.record_lock.lock(book_id):
-            return self.backend.copy_cover_to(path, dest,
+        return self.backend.copy_cover_to(path, dest,
                                           use_hardlink=use_hardlink)
 
-    @api
+    @read_api
     def copy_format_to(self, book_id, fmt, dest, use_hardlink=False):
         '''
         Copy the format ``fmt`` to the file like object ``dest``. If the
@@ -441,15 +447,13 @@ class Cache(object):
         the path is different from the current path (taking case sensitivity
         into account).
         '''
-        with self.read_lock:
-            try:
-                name = self.fields['formats'].format_fname(book_id, fmt)
-                path = self._field_for('path', book_id).replace('/', os.sep)
-            except:
-                raise NoSuchFormat('Record %d has no %s file'%(book_id, fmt))
+        try:
+            name = self.fields['formats'].format_fname(book_id, fmt)
+            path = self._field_for('path', book_id).replace('/', os.sep)
+        except (KeyError, AttributeError):
+            raise NoSuchFormat('Record %d has no %s file'%(book_id, fmt))
 
-        with self.record_lock.lock(book_id):
-            return self.backend.copy_format_to(book_id, fmt, name, path, dest,
+        return self.backend.copy_format_to(book_id, fmt, name, path, dest,
                                            use_hardlink=use_hardlink)
 
     @read_api
@@ -520,16 +524,16 @@ class Cache(object):
         this means that repeated calls yield the same
         temp file (which is re-created each time)
         '''
-        with self.read_lock:
-            ext = ('.'+fmt.lower()) if fmt else ''
-            try:
-                fname = self.fields['formats'].format_fname(book_id, fmt)
-            except:
-                return None
-            fname += ext
-
+        ext = ('.'+fmt.lower()) if fmt else ''
         if as_path:
             if preserve_filename:
+                with self.read_lock:
+                    try:
+                        fname = self.fields['formats'].format_fname(book_id, fmt)
+                    except:
+                        return None
+                    fname += ext
+
                 bd = base_dir()
                 d = os.path.join(bd, 'format_abspath')
                 try:
@@ -537,36 +541,40 @@ class Cache(object):
                 except:
                     pass
                 ret = os.path.join(d, fname)
-                with self.record_lock.lock(book_id):
-                    try:
-                        self.copy_format_to(book_id, fmt, ret)
-                    except NoSuchFormat:
-                        return None
+                try:
+                    self.copy_format_to(book_id, fmt, ret)
+                except NoSuchFormat:
+                    return None
             else:
-                with PersistentTemporaryFile(ext) as pt, self.record_lock.lock(book_id):
+                with PersistentTemporaryFile(ext) as pt:
                     try:
                         self.copy_format_to(book_id, fmt, pt)
                     except NoSuchFormat:
                         return None
                     ret = pt.name
         elif as_file:
-            ret = SpooledTemporaryFile(SPOOL_SIZE)
-            with self.record_lock.lock(book_id):
+            with self.read_lock:
                 try:
-                    self.copy_format_to(book_id, fmt, ret)
-                except NoSuchFormat:
+                    fname = self.fields['formats'].format_fname(book_id, fmt)
+                except:
                     return None
+                fname += ext
+
+            ret = SpooledTemporaryFile(SPOOL_SIZE)
+            try:
+                self.copy_format_to(book_id, fmt, ret)
+            except NoSuchFormat:
+                return None
             ret.seek(0)
             # Various bits of code try to use the name as the default
             # title when reading metadata, so set it
             ret.name = fname
         else:
            buf = BytesIO()
-            with self.record_lock.lock(book_id):
-                try:
-                    self.copy_format_to(book_id, fmt, buf)
-                except NoSuchFormat:
-                    return None
+            try:
+                self.copy_format_to(book_id, fmt, buf)
+            except NoSuchFormat:
+                return None
 
             ret = buf.getvalue()
 
@@ -620,6 +628,30 @@ class Cache(object):
         return get_categories(self, sort=sort, book_ids=book_ids,
                               icon_map=icon_map)
 
+    @write_api
+    def update_last_modified(self, book_ids, now=None):
+        if now is None:
+            now = nowf()
+        if book_ids:
+            f = self.fields['last_modified']
+            f.writer.set_books({book_id:now for book_id in book_ids}, self.backend)
+
+    @write_api
+    def mark_as_dirty(self, book_ids):
+        self._update_last_modified(book_ids)
+        already_dirtied = set(self.dirtied_cache).intersection(book_ids)
+        new_dirtied = book_ids - already_dirtied
+        already_dirtied = {book_id:self.dirtied_sequence+i for i, book_id in enumerate(already_dirtied)}
+        if already_dirtied:
+            self.dirtied_sequence = max(already_dirtied.itervalues()) + 1
+        self.dirtied_cache.update(already_dirtied)
+        if new_dirtied:
+            self.backend.conn.executemany('INSERT OR IGNORE INTO metadata_dirtied (book) VALUES (?)',
+                                          ((x,) for x in new_dirtied))
+            new_dirtied = {book_id:self.dirtied_sequence+i for i, book_id in enumerate(new_dirtied)}
+            self.dirtied_sequence = max(new_dirtied.itervalues()) + 1
+            self.dirtied_cache.update(new_dirtied)
+
     @write_api
     def set_field(self, name, book_id_to_val_map, allow_case_change=True):
         f = self.fields[name]
@@ -657,7 +689,7 @@ class Cache(object):
         if dirtied and update_path:
             self._update_path(dirtied, mark_as_dirtied=False)
 
-        # TODO: Mark these as dirtied so that the opf is regenerated
+        self._mark_as_dirty(dirtied)
 
         return dirtied
 
@@ -668,9 +700,111 @@ class Cache(object):
             author = self._field_for('authors', book_id, default_value=(_('Unknown'),))[0]
             self.backend.update_path(book_id, title, author, self.fields['path'], self.fields['formats'])
             if mark_as_dirtied:
+                self._mark_as_dirty(book_ids)
+
+    @read_api
+    def get_a_dirtied_book(self):
+        if self.dirtied_cache:
+            return random.choice(tuple(self.dirtied_cache.iterkeys()))
+        return None
+
+    @read_api
+    def get_metadata_for_dump(self, book_id):
+        mi = None
+        # get the current sequence number for this book to pass back to the
+        # backup thread. This will avoid double calls in the case where the
+        # thread has not done the work between the put and the get_metadata
+        sequence = self.dirtied_cache.get(book_id, None)
+        if sequence is not None:
+            try:
+                # While a book is being created, the path is empty. Don't bother to
+                # try to write the opf, because it will go to the wrong folder.
+                if self._field_for('path', book_id):
+                    mi = self._get_metadata(book_id)
+                    # Always set cover to cover.jpg. Even if cover doesn't exist,
+                    # no harm done. This way no need to call dirtied when
+                    # cover is set/removed
+                    mi.cover = 'cover.jpg'
+            except:
+                # This almost certainly means that the book has been deleted while
+                # the backup operation sat in the queue.
                 pass
-            # TODO: Mark these books as dirtied so that metadata.opf is
-            # re-created
+        return mi, sequence
+
+    @write_api
+    def clear_dirtied(self, book_id, sequence):
+        '''
+        Clear the dirtied indicator for the book. This is used when fetching
+        metadata, creating an OPF, and writing a file are separated into steps.
+        The last step is clearing the indicator.
+        '''
+        dc_sequence = self.dirtied_cache.get(book_id, None)
+        if dc_sequence is None or sequence is None or dc_sequence == sequence:
+            self.backend.conn.execute('DELETE FROM metadata_dirtied WHERE book=?',
+                    (book_id,))
+            self.dirtied_cache.pop(book_id, None)
+
+    @write_api
+    def write_backup(self, book_id, raw):
+        try:
+            path = self._field_for('path', book_id).replace('/', os.sep)
+        except:
+            return
+
+        self.backend.write_backup(path, raw)
+
+    @read_api
+    def dirty_queue_length(self):
+        return len(self.dirtied_cache)
+
+    @read_api
+    def read_backup(self, book_id):
+        ''' Return the OPF metadata backup for the book as a bytestring or
+        None if no such backup exists. '''
+        try:
+            path = self._field_for('path', book_id).replace('/', os.sep)
+        except:
+            return
+
+        try:
+            return self.backend.read_backup(path)
+        except EnvironmentError:
+            return None
+
+    @write_api
+    def dump_metadata(self, book_ids=None, remove_from_dirtied=True,
+            callback=None):
+        '''
+        Write metadata for each record to an individual OPF file. If callback
+        is not None, it is called once at the start with the number of book_ids
+        being processed, and once for every book_id with arguments (book_id,
+        mi, ok).
+        '''
+        if book_ids is None:
+            book_ids = set(self.dirtied_cache)
+
+        if callback is not None:
+            callback(len(book_ids), True, False)
+
+        for book_id in book_ids:
+            if self._field_for('path', book_id) is None:
+                if callback is not None:
+                    callback(book_id, None, False)
+                continue
+            mi, sequence = self._get_metadata_for_dump(book_id)
+            if mi is None:
+                if callback is not None:
+                    callback(book_id, mi, False)
+                continue
+            try:
+                raw = metadata_to_opf(mi)
+                self._write_backup(book_id, raw)
+                if remove_from_dirtied:
+                    self._clear_dirtied(book_id, sequence)
+            except:
+                pass
+            if callback is not None:
+                callback(book_id, mi, True)
 
     # }}}
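
The sequence numbers are what make the backup safe against concurrent edits: get_metadata_for_dump() hands the backup thread the sequence that was current when the metadata snapshot was taken, and clear_dirtied() refuses to clear the flag if the book has been re-dirtied since. A self-contained toy model of that handshake (simplified: the real clear_dirtied() also treats a None sequence as an unconditional clear):

    # Toy model of the dirtied-sequence handshake in mark_as_dirty()/clear_dirtied()
    dirtied_cache = {}   # book_id -> sequence, as in Cache.dirtied_cache
    dirtied_sequence = 0

    def mark_dirty(book_id):
        global dirtied_sequence
        dirtied_cache[book_id] = dirtied_sequence
        dirtied_sequence += 1

    def clear_dirtied(book_id, seq):
        # Clear only if nobody re-dirtied the book after `seq` was handed out
        if dirtied_cache.get(book_id) == seq:
            del dirtied_cache[book_id]

    mark_dirty(1)
    seq = dirtied_cache[1]   # backup thread snapshots the sequence
    mark_dirty(1)            # user edits the book again before the OPF is written
    clear_dirtied(1, seq)    # ignored: book 1 stays queued, gets backed up afresh
    assert 1 in dirtied_cache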
diff --git a/src/calibre/db/tests/main.py b/src/calibre/db/tests/main.py
index c4bb058b7e..7268db3e99 100644
--- a/src/calibre/db/tests/main.py
+++ b/src/calibre/db/tests/main.py
@@ -16,8 +16,6 @@ if __name__ == '__main__':
     parser = argparse.ArgumentParser()
     parser.add_argument('name', nargs='?', default=None,
                         help='The name of the test to run, e.g. writing.WritingTest.many_many_basic')
     args = parser.parse_args()
-    if args.name:
-        unittest.TextTestRunner(verbosity=4).run(unittest.defaultTestLoader.loadTestsFromName(args.name))
-    else:
-        unittest.TextTestRunner(verbosity=4).run(find_tests())
+    tests = unittest.defaultTestLoader.loadTestsFromName(args.name) if args.name else find_tests()
+    unittest.TextTestRunner(verbosity=4).run(tests)

diff --git a/src/calibre/db/tests/writing.py b/src/calibre/db/tests/writing.py
index 127bcd3609..c54d21f055 100644
--- a/src/calibre/db/tests/writing.py
+++ b/src/calibre/db/tests/writing.py
@@ -9,6 +9,7 @@ __docformat__ = 'restructuredtext en'
 
 from collections import namedtuple
 from functools import partial
+from io import BytesIO
 
 from calibre.ebooks.metadata import author_to_author_sort
 from calibre.utils.date import UNDEFINED_DATE
@@ -292,3 +293,65 @@ class WritingTest(BaseTest):
 
     # }}}
 
+    def test_dirtied(self):  # {{{
+        'Test the setting of the dirtied flag and the last_modified column'
+        cl = self.cloned_library
+        cache = self.init_cache(cl)
+        ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
+        # First empty dirtied
+        cache.dump_metadata()
+        af(cache.dirtied_cache)
+        af(self.init_cache(cl).dirtied_cache)
+
+        prev = cache.field_for('last_modified', 3)
+        import calibre.db.cache as c
+        from datetime import timedelta
+        utime = prev+timedelta(days=1)
+        onowf = c.nowf
+        c.nowf = lambda : utime
+        try:
+            ae(sf('title', {3:'xxx'}), set([3]))
+            self.assertTrue(3 in cache.dirtied_cache)
+            ae(cache.field_for('last_modified', 3), utime)
+            cache.dump_metadata()
+            raw = cache.read_backup(3)
+            from calibre.ebooks.metadata.opf2 import OPF
+            opf = OPF(BytesIO(raw))
+            ae(opf.title, 'xxx')
+        finally:
+            c.nowf = onowf
+    # }}}
+
+    def test_backup(self):  # {{{
+        'Test the automatic backup of changed metadata'
+        cl = self.cloned_library
+        cache = self.init_cache(cl)
+        ae, af, sf, ff = self.assertEqual, self.assertFalse, cache.set_field, cache.field_for
+        # First empty dirtied
+        cache.dump_metadata()
+        af(cache.dirtied_cache)
+        from calibre.db.backup import MetadataBackup
+        interval = 0.01
+        mb = MetadataBackup(cache, interval=interval, scheduling_interval=0)
+        mb.start()
+        try:
+            ae(sf('title', {1:'title1', 2:'title2', 3:'title3'}), {1,2,3})
+            ae(sf('authors', {1:'author1 & author2', 2:'author1 & author2', 3:'author1 & author2'}), {1,2,3})
+            count = 6
+            while cache.dirty_queue_length() and count > 0:
+                mb.join(interval)
+                count -= 1
+            af(cache.dirty_queue_length())
+        finally:
+            mb.stop()
+            mb.join(interval)
+        af(mb.is_alive())
+        from calibre.ebooks.metadata.opf2 import OPF
+        for book_id in (1, 2, 3):
+            raw = cache.read_backup(book_id)
+            opf = OPF(BytesIO(raw))
+            ae(opf.title, 'title%d'%book_id)
+            ae(opf.authors, ['author1', 'author2'])
+    # }}}
+
diff --git a/src/calibre/utils/monotonic.py b/src/calibre/utils/monotonic.py
new file mode 100644
index 0000000000..2bda006929
--- /dev/null
+++ b/src/calibre/utils/monotonic.py
@@ -0,0 +1,104 @@
+# vim:fileencoding=utf-8
+
+from __future__ import division, absolute_import
+
+try:
+    try:
+        # >=python-3.3, Unix
+        from time import clock_gettime
+        try:
+            # >={kernel}-sources-2.6.28
+            from time import CLOCK_MONOTONIC_RAW as CLOCK_ID
+        except ImportError:
+            from time import CLOCK_MONOTONIC as CLOCK_ID  # NOQA
+
+        monotonic = lambda: clock_gettime(CLOCK_ID)
+
+    except ImportError:
+        # >=python-3.3
+        from time import monotonic  # NOQA
+
+except ImportError:
+    import ctypes
+    import sys
+
+    try:
+        if sys.platform == 'win32':
+            # Windows only
+            GetTickCount64 = ctypes.windll.kernel32.GetTickCount64
+            GetTickCount64.restype = ctypes.c_ulonglong
+
+            def monotonic():  # NOQA
+                return GetTickCount64() / 1000
+
+        elif sys.platform == 'darwin':
+            # Mac OS X
+            from ctypes.util import find_library
+
+            libc_name = find_library('c')
+            if not libc_name:
+                raise OSError
+
+            libc = ctypes.CDLL(libc_name, use_errno=True)
+
+            mach_absolute_time = libc.mach_absolute_time
+            mach_absolute_time.argtypes = ()
+            mach_absolute_time.restype = ctypes.c_uint64
+
+            class mach_timebase_info_data_t(ctypes.Structure):
+                _fields_ = (
+                    ('numer', ctypes.c_uint32),
+                    ('denom', ctypes.c_uint32),
+                )
+            mach_timebase_info_data_p = ctypes.POINTER(mach_timebase_info_data_t)
+
+            _mach_timebase_info = libc.mach_timebase_info
+            _mach_timebase_info.argtypes = (mach_timebase_info_data_p,)
+            _mach_timebase_info.restype = ctypes.c_int
+
+            def mach_timebase_info():
+                timebase = mach_timebase_info_data_t()
+                _mach_timebase_info(ctypes.byref(timebase))
+                return (timebase.numer, timebase.denom)
+
+            timebase = mach_timebase_info()
+            factor = timebase[0] / timebase[1] * 1e-9
+
+            def monotonic():  # NOQA
+                return mach_absolute_time() * factor
+        else:
+            # linux only (no librt on OS X)
+            import os
+
+            # See
+            CLOCK_MONOTONIC = 1
+            CLOCK_MONOTONIC_RAW = 4
+
+            class timespec(ctypes.Structure):
+                _fields_ = (
+                    ('tv_sec', ctypes.c_long),
+                    ('tv_nsec', ctypes.c_long)
+                )
+            tspec = timespec()
+
+            librt = ctypes.CDLL('librt.so.1', use_errno=True)
+            clock_gettime = librt.clock_gettime
+            clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)]
+
+            if clock_gettime(CLOCK_MONOTONIC_RAW, ctypes.pointer(tspec)) == 0:
+                # >={kernel}-sources-2.6.28
+                clock_id = CLOCK_MONOTONIC_RAW
+            elif clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(tspec)) == 0:
+                clock_id = CLOCK_MONOTONIC
+            else:
+                raise OSError
+
+            def monotonic():  # NOQA
+                if clock_gettime(clock_id, ctypes.pointer(tspec)) != 0:
+                    errno_ = ctypes.get_errno()
+                    raise OSError(errno_, os.strerror(errno_))
+                return tspec.tv_sec + tspec.tv_nsec / 1e9
+
+    except:
+        from time import time as monotonic  # NOQA
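
Nothing in this patch imports monotonic yet; the shim is groundwork for timing polling loops like the backup thread's. The point of it: unlike time.time(), the returned value never jumps when the wall clock is adjusted, so intervals computed from it are always non-negative. A usage sketch (runnable inside a calibre checkout):

    import time
    from calibre.utils.monotonic import monotonic

    start = monotonic()
    time.sleep(0.1)
    elapsed = monotonic() - start  # ~0.1; stays correct even if the system
                                   # clock is reset between the two calls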