From 7a5176e1b497304b97dde1090143281c7f151f0f Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 11 Apr 2023 21:41:49 +0530 Subject: [PATCH] Code to restore deleted format files --- src/calibre/db/backend.py | 28 ++++++++++++++++++++++++++-- src/calibre/db/cache.py | 20 ++++++++++++++++++++ src/calibre/db/tests/add_remove.py | 16 ++++++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index 76cf7168b9..5514010db1 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -1746,7 +1746,7 @@ class DB: # rename rather than remove, so that if something goes # wrong in the rest of this function, at least the file is # not deleted - os.rename(old_path, dest) + os.replace(old_path, dest) except OSError as e: if getattr(e, 'errno', None) != errno.ENOENT: # Failing to rename the old format will at worst leave a @@ -1754,7 +1754,17 @@ class DB: import traceback traceback.print_exc() - if (not getattr(stream, 'name', False) or not samefile(dest, stream.name)): + if isinstance(stream, str) and stream: + try: + os.replace(stream, dest) + except OSError: + if iswindows: + time.sleep(1) + os.replace(stream, dest) + else: + raise + size = os.path.getsize(dest) + elif (not getattr(stream, 'name', False) or not samefile(dest, stream.name)): with open(dest, 'wb') as f: shutil.copyfileobj(stream, f) size = f.tell() @@ -1953,6 +1963,20 @@ class DB: dest = os.path.abspath(os.path.join(self.library_path, path)) copy_tree(bdir, dest, delete_source=True) + def path_for_trash_format(self, book_id, fmt): + bdir = os.path.join(self.trash_dir, 'f', str(book_id)) + if not os.path.isdir(bdir): + return '' + path = os.path.join(bdir, fmt.lower()) + if not os.path.exists(path): + path = '' + return path + + def remove_trash_formats_dir_if_empty(self, book_id): + bdir = os.path.join(self.trash_dir, 'f', str(book_id)) + if os.path.isdir(bdir) and len(os.listdir(bdir)) <= 1: # dont count metadata.json + self.rmtree(bdir) + def list_trash_entries(self): from calibre.ebooks.metadata.opf2 import OPF self.ensure_trash_dir() diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index 2160a92ede..9457cd2d0d 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -2673,6 +2673,26 @@ class Cache: e.cover_path = self.format_abspath(e.book_id, '__COVER_INTERNAL__') return books, formats + @write_api + def move_format_from_trash(self, book_id, fmt): + ''' Undelete a format from the trash directory ''' + if not self._has_id(book_id): + raise ValueError(f'A book with the id {book_id} does not exist') + fmt = fmt.upper() + try: + name = self.fields['formats'].format_fname(book_id, fmt) + except Exception: + name = None + fpath = self.backend.path_for_trash_format(book_id, fmt) + if not fpath: + raise ValueError(f'No format {fmt} found in book {book_id}') + size, fname = self._do_add_format(book_id, fmt, fpath, name) + self.format_metadata_cache.pop(book_id, None) + max_size = self.fields['formats'].table.update_fmt(book_id, fmt, fname, size, self.backend) + self.fields['size'].table.update_sizes({book_id: max_size}) + self.event_dispatcher(EventType.format_added, book_id, fmt) + self.backend.remove_trash_formats_dir_if_empty(book_id) + @write_api def move_book_from_trash(self, book_id): ''' Undelete a book from the trash directory ''' diff --git a/src/calibre/db/tests/add_remove.py b/src/calibre/db/tests/add_remove.py index 935ba2f8d2..33e069e5bf 100644 --- a/src/calibre/db/tests/add_remove.py +++ b/src/calibre/db/tests/add_remove.py @@ -219,6 +219,7 @@ class AddRemoveTest(BaseTest): 'Test removal of books' cl = self.cloned_library cl2 = self.cloned_library + cl3 = self.cloned_library cache = self.init_cache() af, ae = self.assertFalse, self.assertEqual authors = cache.fields['authors'].table @@ -307,6 +308,21 @@ class AddRemoveTest(BaseTest): self.assertEqual(annots_before, cache.all_annotations_for_book(1)) self.assertTrue(cache.cover(1)) self.assertTrue(os.path.exists(os.path.join(bookpath, 'xyz', 'abc'))) + + # test restoring of formats + cache = self.init_cache(cl3) + all_formats = cache.formats(1) + cache.remove_formats({1: all_formats}) + self.assertFalse(cache.formats(1)) + b, f = cache.list_trash_entries() + self.assertEqual(len(b), 0) + self.assertEqual(len(f), 1) + self.assertEqual(f[0].title, title) + self.assertTrue(f[0].cover_path) + for fmt in all_formats: + cache.move_format_from_trash(1, fmt) + self.assertEqual(all_formats, cache.formats(1)) + self.assertFalse(os.listdir(os.path.join(cache.backend.trash_dir, 'f'))) # }}} def test_original_fmt(self): # {{{