From aeae26d05339df179cd9a498cf3315d449f54230 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 11 Apr 2023 15:26:42 +0530 Subject: [PATCH] Implement undelete of book from trash --- src/calibre/db/backend.py | 77 ++++++++++++++++++++++-------- src/calibre/db/cache.py | 40 ++++++++++++++-- src/calibre/db/restore.py | 43 +++++++++-------- src/calibre/db/tests/add_remove.py | 45 ++++++++++++++--- 4 files changed, 154 insertions(+), 51 deletions(-) diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index 2e5a3f783f..da9f079675 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -67,6 +67,8 @@ WINDOWS_RESERVED_NAMES = frozenset('CON PRN AUX NUL COM1 COM2 COM3 COM4 COM5 COM @dataclass class TrashEntry: book_id: int + title: str + author: str book_dir: str mtime: float formats: Sequence[str] = () @@ -509,6 +511,8 @@ class DB: self.initialize_tables() self.set_user_template_functions(compile_user_template_functions( self.prefs.get('user_template_functions', []))) + if self.prefs['last_expired_trash_at'] > 0: + self.ensure_trash_dir() if load_user_formatter_functions: set_global_state(self) @@ -1539,15 +1543,16 @@ class DB: atomic_rename(src_path, dest_path) return os.path.getsize(dest_path) - def remove_formats(self, remove_map): + def remove_formats(self, remove_map, metadata_map): self.ensure_trash_dir() - paths = set() for book_id, removals in iteritems(remove_map): + paths = set() for fmt, fname, path in removals: path = self.format_abspath(book_id, fmt, fname, path) if path: paths.add(path) - self.move_book_files_to_trash(book_id, paths) + if paths: + self.move_book_files_to_trash(book_id, paths, metadata_map[book_id]) def cover_last_modified(self, path): path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME)) @@ -1895,15 +1900,14 @@ class DB: removals = [] for base in ('b', 'f'): base = os.path.join(self.trash_dir, base) - for entries in os.scandir(base): - for x in entries: - try: - st = x.stat(follow_symlinks=False) - mtime = st.st_mtime - except OSError: - mtime = 0 - if mtime + expire_age_in_seconds < now: - removals.append(x.path) + for x in os.scandir(base): + try: + st = x.stat(follow_symlinks=False) + mtime = st.st_mtime + except OSError: + mtime = 0 + if mtime + expire_age_in_seconds <= now: + removals.append(x.path) for x in removals: rmtree_with_retry(x) @@ -1913,7 +1917,7 @@ class DB: rmtree_with_retry(dest) copy_tree(book_dir_abspath, dest, delete_source=True) - def move_book_files_to_trash(self, book_id, format_abspaths): + def move_book_files_to_trash(self, book_id, format_abspaths, metadata): dest = os.path.join(self.trash_dir, 'f', str(book_id)) if not os.path.exists(dest): os.makedirs(dest) @@ -1921,12 +1925,41 @@ class DB: for path in format_abspaths: ext = path.rpartition('.')[-1].lower() fmap[path] = os.path.join(dest, ext) + with open(os.path.join(dest, 'metadata.json'), 'wb') as f: + f.write(json.dumps(metadata).encode('utf-8')) copy_files(fmap, delete_source=True) + def get_metadata_for_trash_book(self, book_id, read_annotations=True): + from .restore import read_opf + bdir = os.path.join(self.trash_dir, 'b', str(book_id)) + if not os.path.isdir(bdir): + raise ValueError(f'The book {book_id} not present in the trash folder') + mi, _, annotations = read_opf(bdir, read_annotations=read_annotations) + formats = [] + for x in os.scandir(bdir): + if x.is_file() and x.name not in (COVER_FILE_NAME, 'metadata.opf') and '.' in x.name: + try: + size = x.stat(follow_symlinks=False).st_size + except OSError: + continue + fname, ext = os.path.splitext(x.name) + formats.append((ext[1:].upper(), size, fname)) + return mi, annotations, formats + + def move_book_from_trash(self, book_id, path): + bdir = os.path.join(self.trash_dir, 'b', str(book_id)) + if not os.path.isdir(bdir): + raise ValueError(f'The book {book_id} not present in the trash folder') + dest = os.path.abspath(os.path.join(self.library_path, path)) + copy_tree(bdir, dest, delete_source=True) + def list_trash_entries(self): + from calibre.ebooks.metadata.opf2 import OPF self.ensure_trash_dir() books, files = [], [] base = os.path.join(self.trash_dir, 'b') + unknown = _('Unknown') + au = (unknown,) for x in os.scandir(base): if x.is_dir(follow_symlinks=False): try: @@ -1934,8 +1967,10 @@ class DB: mtime = x.stat(follow_symlinks=False).st_mtime except Exception: continue - books.append(TrashEntry(book_id, x.path, mtime)) + opf = OPF(os.path.join(x.path, 'metadata.opf'), basedir=x.path) + books.append(TrashEntry(book_id, opf.title or unknown, (opf.authors or au)[0], x.path, mtime)) base = os.path.join(self.trash_dir, 'f') + um = {'title': unknown, 'authors': au} for x in os.scandir(base): if x.is_dir(follow_symlinks=False): try: @@ -1944,11 +1979,16 @@ class DB: except Exception: continue formats = set() + metadata = um for f in os.scandir(x.path): if f.is_file(follow_symlinks=False): - formats.add(f.name.upper()) + if f.name == 'metadata.json': + with open(f.path, 'rb') as mf: + metadata = json.loads(mf.read()) + else: + formats.add(f.name.upper()) if formats: - files.append(TrashEntry(book_id, x.path, mtime, tuple(formats))) + files.append(TrashEntry(book_id, metadata.get('title') or unknown, (metadata.get('authors') or au)[0], x.path, mtime, tuple(formats))) return books, files def remove_books(self, path_map, permanent=False): @@ -2302,11 +2342,6 @@ class DB: self.conn # Connect to the moved metadata.db progress(_('Completed'), total, total) - def restore_book(self, book_id, path, formats): - self.execute('UPDATE books SET path=? WHERE id=?', (path.replace(os.sep, '/'), book_id)) - vals = [(book_id, fmt, size, name) for fmt, size, name in formats] - self.executemany('INSERT INTO data (book,format,uncompressed_size,name) VALUES (?,?,?,?)', vals) - def backup_database(self, path): with closing(apsw.Connection(path)) as dest_db: with dest_db.backup('main', self.conn, 'main') as b: diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index bb88c83143..3727840c92 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -1858,6 +1858,7 @@ class Cache: if not db_only: removes = defaultdict(set) + metadata_map = {} for book_id, fmts in iteritems(formats_map): try: path = self._field_for('path', book_id).replace('/', os.sep) @@ -1870,8 +1871,10 @@ class Cache: continue if name and path: removes[book_id].add((fmt, name, path)) + if removes[book_id]: + metadata_map[book_id] = {'title': self._field_for('title', book_id), 'authors': self._field_for('authors', book_id)} if removes: - self.backend.remove_formats(removes) + self.backend.remove_formats(removes, metadata_map) size_map = table.remove_formats(formats_map, self.backend) self.fields['size'].table.update_sizes(size_map) @@ -2660,17 +2663,46 @@ class Cache: def is_closed(self): return self.backend.is_closed + @read_api + def list_trash_entries(self): + return self.backend.list_trash_entries() + + @write_api + def move_book_from_trash(self, book_id): + ''' Undelete a book from the trash directory ''' + if self._has_id(book_id): + raise ValueError(f'A book with the id {book_id} already exists') + mi, annotations, formats = self.backend.get_metadata_for_trash_book(book_id) + mi.cover = None + self._create_book_entry(mi, add_duplicates=True, + force_id=book_id, apply_import_tags=False, preserve_uuid=True) + path = self._field_for('path', book_id).replace('/', os.sep) + self.backend.move_book_from_trash(book_id, path) + self.format_metadata_cache.pop(book_id, None) + f = self.fields['formats'].table + max_size = 0 + for (fmt, size, fname) in formats: + max_size = max(max_size, f.update_fmt(book_id, fmt, fname, size, self.backend)) + self.fields['size'].table.update_sizes({book_id: max_size}) + cover = self.backend.cover_abspath(book_id, path) + if cover and os.path.exists(cover): + self._set_field('cover', {book_id:1}) + if annotations: + self._restore_annotations(book_id, annotations) + @write_api def restore_book(self, book_id, mi, last_modified, path, formats, annotations=()): ''' Restore the book entry in the database for a book that already exists on the filesystem ''' - cover = mi.cover - mi.cover = None + cover, mi.cover = mi.cover, None self._create_book_entry(mi, add_duplicates=True, force_id=book_id, apply_import_tags=False, preserve_uuid=True) self._update_last_modified((book_id,), last_modified) if cover and os.path.exists(cover): self._set_field('cover', {book_id:1}) - self.backend.restore_book(book_id, path, formats) + f = self.fields['formats'].table + for (fmt, size, fname) in formats: + f.update_fmt(book_id, fmt, fname, size, self.backend) + self.fields['path'].table.set_path(book_id, path, self.backend) if annotations: self._restore_annotations(book_id, annotations) diff --git a/src/calibre/db/restore.py b/src/calibre/db/restore.py index e6269e2cdc..3bc736de3c 100644 --- a/src/calibre/db/restore.py +++ b/src/calibre/db/restore.py @@ -28,6 +28,26 @@ NON_EBOOK_EXTENSIONS = frozenset(( )) +def read_opf(dirpath, read_annotations=True): + opf = os.path.join(dirpath, 'metadata.opf') + parsed_opf = OPF(opf, basedir=dirpath) + mi = parsed_opf.to_book_metadata() + annotations = tuple(parsed_opf.read_annotations()) if read_annotations else () + timestamp = os.path.getmtime(opf) + return mi, timestamp, annotations + + +def is_ebook_file(filename): + ext = os.path.splitext(filename)[1] + if not ext: + return False + ext = ext[1:].lower() + bad_ext_pat = re.compile(r'[^a-z0-9_]+') + if ext in NON_EBOOK_EXTENSIONS or bad_ext_pat.search(ext) is not None: + return False + return True + + class Restorer(Cache): def __init__(self, library_path, default_prefs=None, restore_all_prefs=False, progress_callback=lambda x, y:True): @@ -51,7 +71,6 @@ class Restore(Thread): self.src_library_path = os.path.abspath(library_path) self.progress_callback = progress_callback self.db_id_regexp = re.compile(r'^.* \((\d+)\)$') - self.bad_ext_pat = re.compile(r'[^a-z0-9_]+') if not callable(self.progress_callback): self.progress_callback = lambda x, y: x self.dirs = [] @@ -178,29 +197,15 @@ class Restore(Thread): self.failed_dirs.append((dirpath, traceback.format_exc())) self.progress_callback(_('Processed') + ' ' + dirpath, i+1) - def is_ebook_file(self, filename): - ext = os.path.splitext(filename)[1] - if not ext: - return False - ext = ext[1:].lower() - if ext in NON_EBOOK_EXTENSIONS or \ - self.bad_ext_pat.search(ext) is not None: - return False - return True - def process_dir(self, dirpath, filenames, book_id): book_id = int(book_id) - formats = list(filter(self.is_ebook_file, filenames)) + formats = list(filter(is_ebook_file, filenames)) fmts = [os.path.splitext(x)[1][1:].upper() for x in formats] sizes = [os.path.getsize(os.path.join(dirpath, x)) for x in formats] names = [os.path.splitext(x)[0] for x in formats] - opf = os.path.join(dirpath, 'metadata.opf') - parsed_opf = OPF(opf, basedir=dirpath) - mi = parsed_opf.to_book_metadata() - annotations = tuple(parsed_opf.read_annotations()) - timestamp = os.path.getmtime(opf) - path = os.path.relpath(dirpath, self.src_library_path).replace(os.sep, - '/') + + mi, timestamp, annotations = read_opf(dirpath) + path = os.path.relpath(dirpath, self.src_library_path).replace(os.sep, '/') if int(mi.application_id) == book_id: self.books.append({ diff --git a/src/calibre/db/tests/add_remove.py b/src/calibre/db/tests/add_remove.py index 49e25cbc5d..f0dba2f2b1 100644 --- a/src/calibre/db/tests/add_remove.py +++ b/src/calibre/db/tests/add_remove.py @@ -5,14 +5,17 @@ __license__ = 'GPL v3' __copyright__ = '2013, Kovid Goyal ' __docformat__ = 'restructuredtext en' -import os, glob +import glob +import os +from datetime import timedelta from io import BytesIO from tempfile import NamedTemporaryFile -from datetime import timedelta -from calibre.db.tests.base import BaseTest, IMG +from calibre.db.tests.base import IMG, BaseTest from calibre.ptempfile import PersistentTemporaryFile -from calibre.utils.date import now, UNDEFINED_DATE +from calibre.utils.date import UNDEFINED_DATE, now, utcnow +from calibre.utils.img import image_from_path +from calibre.utils.resources import get_image_path from polyglot.builtins import iteritems, itervalues @@ -215,6 +218,7 @@ class AddRemoveTest(BaseTest): def test_remove_books(self): # {{{ 'Test removal of books' cl = self.cloned_library + cl2 = self.cloned_library cache = self.init_cache() af, ae = self.assertFalse, self.assertEqual authors = cache.fields['authors'].table @@ -261,10 +265,11 @@ class AddRemoveTest(BaseTest): self.assertFalse(table.col_book_map) # Test the delete service + # test basic delete book and cache expiry cache = self.init_cache(cl) - # Check that files are removed fmtpath = cache.format_abspath(1, 'FMT1') bookpath = os.path.dirname(fmtpath) + title = cache.field_for('title', 1) os.mkdir(os.path.join(bookpath, 'xyz')) open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close() authorpath = os.path.dirname(bookpath) @@ -272,10 +277,36 @@ class AddRemoveTest(BaseTest): cache.remove_books((1,)) for x in (fmtpath, bookpath, authorpath): af(os.path.exists(x), 'The file %s exists, when it should not' % x) - b, f = cache.backend.list_trash_entries() + b, f = cache.list_trash_entries() self.assertEqual(len(b), 1) self.assertEqual(len(f), 0) + self.assertEqual(b[0].title, title) self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf'))) + cache.backend.expire_old_trash(1000) + self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf'))) + cache.backend.expire_old_trash(0) + self.assertFalse(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf'))) + + # test restoring of books + cache = self.init_cache(cl2) + cache.set_cover({1: image_from_path(get_image_path('lt.png', allow_user_override=False))}) + fmtpath = cache.format_abspath(1, 'FMT1') + bookpath = os.path.dirname(fmtpath) + cache.set_annotations_for_book(1, 'FMT1', [({'title': 'else', 'type': 'bookmark', 'timestamp': utcnow().isoformat()}, 1)]) + annots_before = cache.all_annotations_for_book(1) + fm_before = cache.format_metadata(1, 'FMT1', allow_cache=False), cache.format_metadata(1, 'FMT2', allow_cache=False) + os.mkdir(os.path.join(bookpath, 'xyz')) + open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close() + cache.remove_books((1,)) + cache.move_book_from_trash(1) + b, f = cache.list_trash_entries() + self.assertEqual(len(b), 0) + self.assertEqual(len(f), 0) + self.assertEqual(fmtpath, cache.format_abspath(1, 'FMT1')) + self.assertEqual(fm_before, (cache.format_metadata(1, 'FMT1', allow_cache=False), cache.format_metadata(1, 'FMT2', allow_cache=False))) + self.assertEqual(annots_before, cache.all_annotations_for_book(1)) + self.assertTrue(cache.cover(1)) + self.assertTrue(os.path.exists(os.path.join(bookpath, 'xyz', 'abc'))) # }}} def test_original_fmt(self): # {{{ @@ -315,7 +346,7 @@ class AddRemoveTest(BaseTest): def test_copy_to_library(self): # {{{ from calibre.db.copy_to_library import copy_one_book from calibre.ebooks.metadata import authors_to_string - from calibre.utils.date import utcnow, EPOCH + from calibre.utils.date import EPOCH, utcnow src_db = self.init_cache() dest_db = self.init_cache(self.cloned_library)