mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Code to restore deleted format files
This commit is contained in:
parent
fff9686192
commit
7a5176e1b4
@ -1746,7 +1746,7 @@ class DB:
|
||||
# rename rather than remove, so that if something goes
|
||||
# wrong in the rest of this function, at least the file is
|
||||
# not deleted
|
||||
os.rename(old_path, dest)
|
||||
os.replace(old_path, dest)
|
||||
except OSError as e:
|
||||
if getattr(e, 'errno', None) != errno.ENOENT:
|
||||
# Failing to rename the old format will at worst leave a
|
||||
@ -1754,7 +1754,17 @@ class DB:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
|
||||
if (not getattr(stream, 'name', False) or not samefile(dest, stream.name)):
|
||||
if isinstance(stream, str) and stream:
|
||||
try:
|
||||
os.replace(stream, dest)
|
||||
except OSError:
|
||||
if iswindows:
|
||||
time.sleep(1)
|
||||
os.replace(stream, dest)
|
||||
else:
|
||||
raise
|
||||
size = os.path.getsize(dest)
|
||||
elif (not getattr(stream, 'name', False) or not samefile(dest, stream.name)):
|
||||
with open(dest, 'wb') as f:
|
||||
shutil.copyfileobj(stream, f)
|
||||
size = f.tell()
|
||||
@ -1953,6 +1963,20 @@ class DB:
|
||||
dest = os.path.abspath(os.path.join(self.library_path, path))
|
||||
copy_tree(bdir, dest, delete_source=True)
|
||||
|
||||
def path_for_trash_format(self, book_id, fmt):
|
||||
bdir = os.path.join(self.trash_dir, 'f', str(book_id))
|
||||
if not os.path.isdir(bdir):
|
||||
return ''
|
||||
path = os.path.join(bdir, fmt.lower())
|
||||
if not os.path.exists(path):
|
||||
path = ''
|
||||
return path
|
||||
|
||||
def remove_trash_formats_dir_if_empty(self, book_id):
|
||||
bdir = os.path.join(self.trash_dir, 'f', str(book_id))
|
||||
if os.path.isdir(bdir) and len(os.listdir(bdir)) <= 1: # dont count metadata.json
|
||||
self.rmtree(bdir)
|
||||
|
||||
def list_trash_entries(self):
|
||||
from calibre.ebooks.metadata.opf2 import OPF
|
||||
self.ensure_trash_dir()
|
||||
|
@ -2673,6 +2673,26 @@ class Cache:
|
||||
e.cover_path = self.format_abspath(e.book_id, '__COVER_INTERNAL__')
|
||||
return books, formats
|
||||
|
||||
@write_api
|
||||
def move_format_from_trash(self, book_id, fmt):
|
||||
''' Undelete a format from the trash directory '''
|
||||
if not self._has_id(book_id):
|
||||
raise ValueError(f'A book with the id {book_id} does not exist')
|
||||
fmt = fmt.upper()
|
||||
try:
|
||||
name = self.fields['formats'].format_fname(book_id, fmt)
|
||||
except Exception:
|
||||
name = None
|
||||
fpath = self.backend.path_for_trash_format(book_id, fmt)
|
||||
if not fpath:
|
||||
raise ValueError(f'No format {fmt} found in book {book_id}')
|
||||
size, fname = self._do_add_format(book_id, fmt, fpath, name)
|
||||
self.format_metadata_cache.pop(book_id, None)
|
||||
max_size = self.fields['formats'].table.update_fmt(book_id, fmt, fname, size, self.backend)
|
||||
self.fields['size'].table.update_sizes({book_id: max_size})
|
||||
self.event_dispatcher(EventType.format_added, book_id, fmt)
|
||||
self.backend.remove_trash_formats_dir_if_empty(book_id)
|
||||
|
||||
@write_api
|
||||
def move_book_from_trash(self, book_id):
|
||||
''' Undelete a book from the trash directory '''
|
||||
|
@ -219,6 +219,7 @@ class AddRemoveTest(BaseTest):
|
||||
'Test removal of books'
|
||||
cl = self.cloned_library
|
||||
cl2 = self.cloned_library
|
||||
cl3 = self.cloned_library
|
||||
cache = self.init_cache()
|
||||
af, ae = self.assertFalse, self.assertEqual
|
||||
authors = cache.fields['authors'].table
|
||||
@ -307,6 +308,21 @@ class AddRemoveTest(BaseTest):
|
||||
self.assertEqual(annots_before, cache.all_annotations_for_book(1))
|
||||
self.assertTrue(cache.cover(1))
|
||||
self.assertTrue(os.path.exists(os.path.join(bookpath, 'xyz', 'abc')))
|
||||
|
||||
# test restoring of formats
|
||||
cache = self.init_cache(cl3)
|
||||
all_formats = cache.formats(1)
|
||||
cache.remove_formats({1: all_formats})
|
||||
self.assertFalse(cache.formats(1))
|
||||
b, f = cache.list_trash_entries()
|
||||
self.assertEqual(len(b), 0)
|
||||
self.assertEqual(len(f), 1)
|
||||
self.assertEqual(f[0].title, title)
|
||||
self.assertTrue(f[0].cover_path)
|
||||
for fmt in all_formats:
|
||||
cache.move_format_from_trash(1, fmt)
|
||||
self.assertEqual(all_formats, cache.formats(1))
|
||||
self.assertFalse(os.listdir(os.path.join(cache.backend.trash_dir, 'f')))
|
||||
# }}}
|
||||
|
||||
def test_original_fmt(self): # {{{
|
||||
|
Loading…
x
Reference in New Issue
Block a user