mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-08 02:34:06 -04:00
Implement undelete of book from trash
This commit is contained in:
parent
5c5ac19935
commit
aeae26d053
@ -67,6 +67,8 @@ WINDOWS_RESERVED_NAMES = frozenset('CON PRN AUX NUL COM1 COM2 COM3 COM4 COM5 COM
|
|||||||
@dataclass
|
@dataclass
|
||||||
class TrashEntry:
|
class TrashEntry:
|
||||||
book_id: int
|
book_id: int
|
||||||
|
title: str
|
||||||
|
author: str
|
||||||
book_dir: str
|
book_dir: str
|
||||||
mtime: float
|
mtime: float
|
||||||
formats: Sequence[str] = ()
|
formats: Sequence[str] = ()
|
||||||
@ -509,6 +511,8 @@ class DB:
|
|||||||
self.initialize_tables()
|
self.initialize_tables()
|
||||||
self.set_user_template_functions(compile_user_template_functions(
|
self.set_user_template_functions(compile_user_template_functions(
|
||||||
self.prefs.get('user_template_functions', [])))
|
self.prefs.get('user_template_functions', [])))
|
||||||
|
if self.prefs['last_expired_trash_at'] > 0:
|
||||||
|
self.ensure_trash_dir()
|
||||||
if load_user_formatter_functions:
|
if load_user_formatter_functions:
|
||||||
set_global_state(self)
|
set_global_state(self)
|
||||||
|
|
||||||
@ -1539,15 +1543,16 @@ class DB:
|
|||||||
atomic_rename(src_path, dest_path)
|
atomic_rename(src_path, dest_path)
|
||||||
return os.path.getsize(dest_path)
|
return os.path.getsize(dest_path)
|
||||||
|
|
||||||
def remove_formats(self, remove_map):
|
def remove_formats(self, remove_map, metadata_map):
|
||||||
self.ensure_trash_dir()
|
self.ensure_trash_dir()
|
||||||
paths = set()
|
|
||||||
for book_id, removals in iteritems(remove_map):
|
for book_id, removals in iteritems(remove_map):
|
||||||
|
paths = set()
|
||||||
for fmt, fname, path in removals:
|
for fmt, fname, path in removals:
|
||||||
path = self.format_abspath(book_id, fmt, fname, path)
|
path = self.format_abspath(book_id, fmt, fname, path)
|
||||||
if path:
|
if path:
|
||||||
paths.add(path)
|
paths.add(path)
|
||||||
self.move_book_files_to_trash(book_id, paths)
|
if paths:
|
||||||
|
self.move_book_files_to_trash(book_id, paths, metadata_map[book_id])
|
||||||
|
|
||||||
def cover_last_modified(self, path):
|
def cover_last_modified(self, path):
|
||||||
path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME))
|
path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME))
|
||||||
@ -1895,14 +1900,13 @@ class DB:
|
|||||||
removals = []
|
removals = []
|
||||||
for base in ('b', 'f'):
|
for base in ('b', 'f'):
|
||||||
base = os.path.join(self.trash_dir, base)
|
base = os.path.join(self.trash_dir, base)
|
||||||
for entries in os.scandir(base):
|
for x in os.scandir(base):
|
||||||
for x in entries:
|
|
||||||
try:
|
try:
|
||||||
st = x.stat(follow_symlinks=False)
|
st = x.stat(follow_symlinks=False)
|
||||||
mtime = st.st_mtime
|
mtime = st.st_mtime
|
||||||
except OSError:
|
except OSError:
|
||||||
mtime = 0
|
mtime = 0
|
||||||
if mtime + expire_age_in_seconds < now:
|
if mtime + expire_age_in_seconds <= now:
|
||||||
removals.append(x.path)
|
removals.append(x.path)
|
||||||
for x in removals:
|
for x in removals:
|
||||||
rmtree_with_retry(x)
|
rmtree_with_retry(x)
|
||||||
@ -1913,7 +1917,7 @@ class DB:
|
|||||||
rmtree_with_retry(dest)
|
rmtree_with_retry(dest)
|
||||||
copy_tree(book_dir_abspath, dest, delete_source=True)
|
copy_tree(book_dir_abspath, dest, delete_source=True)
|
||||||
|
|
||||||
def move_book_files_to_trash(self, book_id, format_abspaths):
|
def move_book_files_to_trash(self, book_id, format_abspaths, metadata):
|
||||||
dest = os.path.join(self.trash_dir, 'f', str(book_id))
|
dest = os.path.join(self.trash_dir, 'f', str(book_id))
|
||||||
if not os.path.exists(dest):
|
if not os.path.exists(dest):
|
||||||
os.makedirs(dest)
|
os.makedirs(dest)
|
||||||
@ -1921,12 +1925,41 @@ class DB:
|
|||||||
for path in format_abspaths:
|
for path in format_abspaths:
|
||||||
ext = path.rpartition('.')[-1].lower()
|
ext = path.rpartition('.')[-1].lower()
|
||||||
fmap[path] = os.path.join(dest, ext)
|
fmap[path] = os.path.join(dest, ext)
|
||||||
|
with open(os.path.join(dest, 'metadata.json'), 'wb') as f:
|
||||||
|
f.write(json.dumps(metadata).encode('utf-8'))
|
||||||
copy_files(fmap, delete_source=True)
|
copy_files(fmap, delete_source=True)
|
||||||
|
|
||||||
|
def get_metadata_for_trash_book(self, book_id, read_annotations=True):
|
||||||
|
from .restore import read_opf
|
||||||
|
bdir = os.path.join(self.trash_dir, 'b', str(book_id))
|
||||||
|
if not os.path.isdir(bdir):
|
||||||
|
raise ValueError(f'The book {book_id} not present in the trash folder')
|
||||||
|
mi, _, annotations = read_opf(bdir, read_annotations=read_annotations)
|
||||||
|
formats = []
|
||||||
|
for x in os.scandir(bdir):
|
||||||
|
if x.is_file() and x.name not in (COVER_FILE_NAME, 'metadata.opf') and '.' in x.name:
|
||||||
|
try:
|
||||||
|
size = x.stat(follow_symlinks=False).st_size
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
fname, ext = os.path.splitext(x.name)
|
||||||
|
formats.append((ext[1:].upper(), size, fname))
|
||||||
|
return mi, annotations, formats
|
||||||
|
|
||||||
|
def move_book_from_trash(self, book_id, path):
|
||||||
|
bdir = os.path.join(self.trash_dir, 'b', str(book_id))
|
||||||
|
if not os.path.isdir(bdir):
|
||||||
|
raise ValueError(f'The book {book_id} not present in the trash folder')
|
||||||
|
dest = os.path.abspath(os.path.join(self.library_path, path))
|
||||||
|
copy_tree(bdir, dest, delete_source=True)
|
||||||
|
|
||||||
def list_trash_entries(self):
|
def list_trash_entries(self):
|
||||||
|
from calibre.ebooks.metadata.opf2 import OPF
|
||||||
self.ensure_trash_dir()
|
self.ensure_trash_dir()
|
||||||
books, files = [], []
|
books, files = [], []
|
||||||
base = os.path.join(self.trash_dir, 'b')
|
base = os.path.join(self.trash_dir, 'b')
|
||||||
|
unknown = _('Unknown')
|
||||||
|
au = (unknown,)
|
||||||
for x in os.scandir(base):
|
for x in os.scandir(base):
|
||||||
if x.is_dir(follow_symlinks=False):
|
if x.is_dir(follow_symlinks=False):
|
||||||
try:
|
try:
|
||||||
@ -1934,8 +1967,10 @@ class DB:
|
|||||||
mtime = x.stat(follow_symlinks=False).st_mtime
|
mtime = x.stat(follow_symlinks=False).st_mtime
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
books.append(TrashEntry(book_id, x.path, mtime))
|
opf = OPF(os.path.join(x.path, 'metadata.opf'), basedir=x.path)
|
||||||
|
books.append(TrashEntry(book_id, opf.title or unknown, (opf.authors or au)[0], x.path, mtime))
|
||||||
base = os.path.join(self.trash_dir, 'f')
|
base = os.path.join(self.trash_dir, 'f')
|
||||||
|
um = {'title': unknown, 'authors': au}
|
||||||
for x in os.scandir(base):
|
for x in os.scandir(base):
|
||||||
if x.is_dir(follow_symlinks=False):
|
if x.is_dir(follow_symlinks=False):
|
||||||
try:
|
try:
|
||||||
@ -1944,11 +1979,16 @@ class DB:
|
|||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
formats = set()
|
formats = set()
|
||||||
|
metadata = um
|
||||||
for f in os.scandir(x.path):
|
for f in os.scandir(x.path):
|
||||||
if f.is_file(follow_symlinks=False):
|
if f.is_file(follow_symlinks=False):
|
||||||
|
if f.name == 'metadata.json':
|
||||||
|
with open(f.path, 'rb') as mf:
|
||||||
|
metadata = json.loads(mf.read())
|
||||||
|
else:
|
||||||
formats.add(f.name.upper())
|
formats.add(f.name.upper())
|
||||||
if formats:
|
if formats:
|
||||||
files.append(TrashEntry(book_id, x.path, mtime, tuple(formats)))
|
files.append(TrashEntry(book_id, metadata.get('title') or unknown, (metadata.get('authors') or au)[0], x.path, mtime, tuple(formats)))
|
||||||
return books, files
|
return books, files
|
||||||
|
|
||||||
def remove_books(self, path_map, permanent=False):
|
def remove_books(self, path_map, permanent=False):
|
||||||
@ -2302,11 +2342,6 @@ class DB:
|
|||||||
self.conn # Connect to the moved metadata.db
|
self.conn # Connect to the moved metadata.db
|
||||||
progress(_('Completed'), total, total)
|
progress(_('Completed'), total, total)
|
||||||
|
|
||||||
def restore_book(self, book_id, path, formats):
|
|
||||||
self.execute('UPDATE books SET path=? WHERE id=?', (path.replace(os.sep, '/'), book_id))
|
|
||||||
vals = [(book_id, fmt, size, name) for fmt, size, name in formats]
|
|
||||||
self.executemany('INSERT INTO data (book,format,uncompressed_size,name) VALUES (?,?,?,?)', vals)
|
|
||||||
|
|
||||||
def backup_database(self, path):
|
def backup_database(self, path):
|
||||||
with closing(apsw.Connection(path)) as dest_db:
|
with closing(apsw.Connection(path)) as dest_db:
|
||||||
with dest_db.backup('main', self.conn, 'main') as b:
|
with dest_db.backup('main', self.conn, 'main') as b:
|
||||||
|
@ -1858,6 +1858,7 @@ class Cache:
|
|||||||
|
|
||||||
if not db_only:
|
if not db_only:
|
||||||
removes = defaultdict(set)
|
removes = defaultdict(set)
|
||||||
|
metadata_map = {}
|
||||||
for book_id, fmts in iteritems(formats_map):
|
for book_id, fmts in iteritems(formats_map):
|
||||||
try:
|
try:
|
||||||
path = self._field_for('path', book_id).replace('/', os.sep)
|
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||||
@ -1870,8 +1871,10 @@ class Cache:
|
|||||||
continue
|
continue
|
||||||
if name and path:
|
if name and path:
|
||||||
removes[book_id].add((fmt, name, path))
|
removes[book_id].add((fmt, name, path))
|
||||||
|
if removes[book_id]:
|
||||||
|
metadata_map[book_id] = {'title': self._field_for('title', book_id), 'authors': self._field_for('authors', book_id)}
|
||||||
if removes:
|
if removes:
|
||||||
self.backend.remove_formats(removes)
|
self.backend.remove_formats(removes, metadata_map)
|
||||||
|
|
||||||
size_map = table.remove_formats(formats_map, self.backend)
|
size_map = table.remove_formats(formats_map, self.backend)
|
||||||
self.fields['size'].table.update_sizes(size_map)
|
self.fields['size'].table.update_sizes(size_map)
|
||||||
@ -2660,17 +2663,46 @@ class Cache:
|
|||||||
def is_closed(self):
|
def is_closed(self):
|
||||||
return self.backend.is_closed
|
return self.backend.is_closed
|
||||||
|
|
||||||
|
@read_api
|
||||||
|
def list_trash_entries(self):
|
||||||
|
return self.backend.list_trash_entries()
|
||||||
|
|
||||||
|
@write_api
|
||||||
|
def move_book_from_trash(self, book_id):
|
||||||
|
''' Undelete a book from the trash directory '''
|
||||||
|
if self._has_id(book_id):
|
||||||
|
raise ValueError(f'A book with the id {book_id} already exists')
|
||||||
|
mi, annotations, formats = self.backend.get_metadata_for_trash_book(book_id)
|
||||||
|
mi.cover = None
|
||||||
|
self._create_book_entry(mi, add_duplicates=True,
|
||||||
|
force_id=book_id, apply_import_tags=False, preserve_uuid=True)
|
||||||
|
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||||
|
self.backend.move_book_from_trash(book_id, path)
|
||||||
|
self.format_metadata_cache.pop(book_id, None)
|
||||||
|
f = self.fields['formats'].table
|
||||||
|
max_size = 0
|
||||||
|
for (fmt, size, fname) in formats:
|
||||||
|
max_size = max(max_size, f.update_fmt(book_id, fmt, fname, size, self.backend))
|
||||||
|
self.fields['size'].table.update_sizes({book_id: max_size})
|
||||||
|
cover = self.backend.cover_abspath(book_id, path)
|
||||||
|
if cover and os.path.exists(cover):
|
||||||
|
self._set_field('cover', {book_id:1})
|
||||||
|
if annotations:
|
||||||
|
self._restore_annotations(book_id, annotations)
|
||||||
|
|
||||||
@write_api
|
@write_api
|
||||||
def restore_book(self, book_id, mi, last_modified, path, formats, annotations=()):
|
def restore_book(self, book_id, mi, last_modified, path, formats, annotations=()):
|
||||||
''' Restore the book entry in the database for a book that already exists on the filesystem '''
|
''' Restore the book entry in the database for a book that already exists on the filesystem '''
|
||||||
cover = mi.cover
|
cover, mi.cover = mi.cover, None
|
||||||
mi.cover = None
|
|
||||||
self._create_book_entry(mi, add_duplicates=True,
|
self._create_book_entry(mi, add_duplicates=True,
|
||||||
force_id=book_id, apply_import_tags=False, preserve_uuid=True)
|
force_id=book_id, apply_import_tags=False, preserve_uuid=True)
|
||||||
self._update_last_modified((book_id,), last_modified)
|
self._update_last_modified((book_id,), last_modified)
|
||||||
if cover and os.path.exists(cover):
|
if cover and os.path.exists(cover):
|
||||||
self._set_field('cover', {book_id:1})
|
self._set_field('cover', {book_id:1})
|
||||||
self.backend.restore_book(book_id, path, formats)
|
f = self.fields['formats'].table
|
||||||
|
for (fmt, size, fname) in formats:
|
||||||
|
f.update_fmt(book_id, fmt, fname, size, self.backend)
|
||||||
|
self.fields['path'].table.set_path(book_id, path, self.backend)
|
||||||
if annotations:
|
if annotations:
|
||||||
self._restore_annotations(book_id, annotations)
|
self._restore_annotations(book_id, annotations)
|
||||||
|
|
||||||
|
@ -28,6 +28,26 @@ NON_EBOOK_EXTENSIONS = frozenset((
|
|||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
|
def read_opf(dirpath, read_annotations=True):
|
||||||
|
opf = os.path.join(dirpath, 'metadata.opf')
|
||||||
|
parsed_opf = OPF(opf, basedir=dirpath)
|
||||||
|
mi = parsed_opf.to_book_metadata()
|
||||||
|
annotations = tuple(parsed_opf.read_annotations()) if read_annotations else ()
|
||||||
|
timestamp = os.path.getmtime(opf)
|
||||||
|
return mi, timestamp, annotations
|
||||||
|
|
||||||
|
|
||||||
|
def is_ebook_file(filename):
|
||||||
|
ext = os.path.splitext(filename)[1]
|
||||||
|
if not ext:
|
||||||
|
return False
|
||||||
|
ext = ext[1:].lower()
|
||||||
|
bad_ext_pat = re.compile(r'[^a-z0-9_]+')
|
||||||
|
if ext in NON_EBOOK_EXTENSIONS or bad_ext_pat.search(ext) is not None:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class Restorer(Cache):
|
class Restorer(Cache):
|
||||||
|
|
||||||
def __init__(self, library_path, default_prefs=None, restore_all_prefs=False, progress_callback=lambda x, y:True):
|
def __init__(self, library_path, default_prefs=None, restore_all_prefs=False, progress_callback=lambda x, y:True):
|
||||||
@ -51,7 +71,6 @@ class Restore(Thread):
|
|||||||
self.src_library_path = os.path.abspath(library_path)
|
self.src_library_path = os.path.abspath(library_path)
|
||||||
self.progress_callback = progress_callback
|
self.progress_callback = progress_callback
|
||||||
self.db_id_regexp = re.compile(r'^.* \((\d+)\)$')
|
self.db_id_regexp = re.compile(r'^.* \((\d+)\)$')
|
||||||
self.bad_ext_pat = re.compile(r'[^a-z0-9_]+')
|
|
||||||
if not callable(self.progress_callback):
|
if not callable(self.progress_callback):
|
||||||
self.progress_callback = lambda x, y: x
|
self.progress_callback = lambda x, y: x
|
||||||
self.dirs = []
|
self.dirs = []
|
||||||
@ -178,29 +197,15 @@ class Restore(Thread):
|
|||||||
self.failed_dirs.append((dirpath, traceback.format_exc()))
|
self.failed_dirs.append((dirpath, traceback.format_exc()))
|
||||||
self.progress_callback(_('Processed') + ' ' + dirpath, i+1)
|
self.progress_callback(_('Processed') + ' ' + dirpath, i+1)
|
||||||
|
|
||||||
def is_ebook_file(self, filename):
|
|
||||||
ext = os.path.splitext(filename)[1]
|
|
||||||
if not ext:
|
|
||||||
return False
|
|
||||||
ext = ext[1:].lower()
|
|
||||||
if ext in NON_EBOOK_EXTENSIONS or \
|
|
||||||
self.bad_ext_pat.search(ext) is not None:
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def process_dir(self, dirpath, filenames, book_id):
|
def process_dir(self, dirpath, filenames, book_id):
|
||||||
book_id = int(book_id)
|
book_id = int(book_id)
|
||||||
formats = list(filter(self.is_ebook_file, filenames))
|
formats = list(filter(is_ebook_file, filenames))
|
||||||
fmts = [os.path.splitext(x)[1][1:].upper() for x in formats]
|
fmts = [os.path.splitext(x)[1][1:].upper() for x in formats]
|
||||||
sizes = [os.path.getsize(os.path.join(dirpath, x)) for x in formats]
|
sizes = [os.path.getsize(os.path.join(dirpath, x)) for x in formats]
|
||||||
names = [os.path.splitext(x)[0] for x in formats]
|
names = [os.path.splitext(x)[0] for x in formats]
|
||||||
opf = os.path.join(dirpath, 'metadata.opf')
|
|
||||||
parsed_opf = OPF(opf, basedir=dirpath)
|
mi, timestamp, annotations = read_opf(dirpath)
|
||||||
mi = parsed_opf.to_book_metadata()
|
path = os.path.relpath(dirpath, self.src_library_path).replace(os.sep, '/')
|
||||||
annotations = tuple(parsed_opf.read_annotations())
|
|
||||||
timestamp = os.path.getmtime(opf)
|
|
||||||
path = os.path.relpath(dirpath, self.src_library_path).replace(os.sep,
|
|
||||||
'/')
|
|
||||||
|
|
||||||
if int(mi.application_id) == book_id:
|
if int(mi.application_id) == book_id:
|
||||||
self.books.append({
|
self.books.append({
|
||||||
|
@ -5,14 +5,17 @@ __license__ = 'GPL v3'
|
|||||||
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
|
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||||
__docformat__ = 'restructuredtext en'
|
__docformat__ = 'restructuredtext en'
|
||||||
|
|
||||||
import os, glob
|
import glob
|
||||||
|
import os
|
||||||
|
from datetime import timedelta
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from tempfile import NamedTemporaryFile
|
from tempfile import NamedTemporaryFile
|
||||||
from datetime import timedelta
|
|
||||||
|
|
||||||
from calibre.db.tests.base import BaseTest, IMG
|
from calibre.db.tests.base import IMG, BaseTest
|
||||||
from calibre.ptempfile import PersistentTemporaryFile
|
from calibre.ptempfile import PersistentTemporaryFile
|
||||||
from calibre.utils.date import now, UNDEFINED_DATE
|
from calibre.utils.date import UNDEFINED_DATE, now, utcnow
|
||||||
|
from calibre.utils.img import image_from_path
|
||||||
|
from calibre.utils.resources import get_image_path
|
||||||
from polyglot.builtins import iteritems, itervalues
|
from polyglot.builtins import iteritems, itervalues
|
||||||
|
|
||||||
|
|
||||||
@ -215,6 +218,7 @@ class AddRemoveTest(BaseTest):
|
|||||||
def test_remove_books(self): # {{{
|
def test_remove_books(self): # {{{
|
||||||
'Test removal of books'
|
'Test removal of books'
|
||||||
cl = self.cloned_library
|
cl = self.cloned_library
|
||||||
|
cl2 = self.cloned_library
|
||||||
cache = self.init_cache()
|
cache = self.init_cache()
|
||||||
af, ae = self.assertFalse, self.assertEqual
|
af, ae = self.assertFalse, self.assertEqual
|
||||||
authors = cache.fields['authors'].table
|
authors = cache.fields['authors'].table
|
||||||
@ -261,10 +265,11 @@ class AddRemoveTest(BaseTest):
|
|||||||
self.assertFalse(table.col_book_map)
|
self.assertFalse(table.col_book_map)
|
||||||
|
|
||||||
# Test the delete service
|
# Test the delete service
|
||||||
|
# test basic delete book and cache expiry
|
||||||
cache = self.init_cache(cl)
|
cache = self.init_cache(cl)
|
||||||
# Check that files are removed
|
|
||||||
fmtpath = cache.format_abspath(1, 'FMT1')
|
fmtpath = cache.format_abspath(1, 'FMT1')
|
||||||
bookpath = os.path.dirname(fmtpath)
|
bookpath = os.path.dirname(fmtpath)
|
||||||
|
title = cache.field_for('title', 1)
|
||||||
os.mkdir(os.path.join(bookpath, 'xyz'))
|
os.mkdir(os.path.join(bookpath, 'xyz'))
|
||||||
open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close()
|
open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close()
|
||||||
authorpath = os.path.dirname(bookpath)
|
authorpath = os.path.dirname(bookpath)
|
||||||
@ -272,10 +277,36 @@ class AddRemoveTest(BaseTest):
|
|||||||
cache.remove_books((1,))
|
cache.remove_books((1,))
|
||||||
for x in (fmtpath, bookpath, authorpath):
|
for x in (fmtpath, bookpath, authorpath):
|
||||||
af(os.path.exists(x), 'The file %s exists, when it should not' % x)
|
af(os.path.exists(x), 'The file %s exists, when it should not' % x)
|
||||||
b, f = cache.backend.list_trash_entries()
|
b, f = cache.list_trash_entries()
|
||||||
self.assertEqual(len(b), 1)
|
self.assertEqual(len(b), 1)
|
||||||
self.assertEqual(len(f), 0)
|
self.assertEqual(len(f), 0)
|
||||||
|
self.assertEqual(b[0].title, title)
|
||||||
self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf')))
|
self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf')))
|
||||||
|
cache.backend.expire_old_trash(1000)
|
||||||
|
self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf')))
|
||||||
|
cache.backend.expire_old_trash(0)
|
||||||
|
self.assertFalse(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf')))
|
||||||
|
|
||||||
|
# test restoring of books
|
||||||
|
cache = self.init_cache(cl2)
|
||||||
|
cache.set_cover({1: image_from_path(get_image_path('lt.png', allow_user_override=False))})
|
||||||
|
fmtpath = cache.format_abspath(1, 'FMT1')
|
||||||
|
bookpath = os.path.dirname(fmtpath)
|
||||||
|
cache.set_annotations_for_book(1, 'FMT1', [({'title': 'else', 'type': 'bookmark', 'timestamp': utcnow().isoformat()}, 1)])
|
||||||
|
annots_before = cache.all_annotations_for_book(1)
|
||||||
|
fm_before = cache.format_metadata(1, 'FMT1', allow_cache=False), cache.format_metadata(1, 'FMT2', allow_cache=False)
|
||||||
|
os.mkdir(os.path.join(bookpath, 'xyz'))
|
||||||
|
open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close()
|
||||||
|
cache.remove_books((1,))
|
||||||
|
cache.move_book_from_trash(1)
|
||||||
|
b, f = cache.list_trash_entries()
|
||||||
|
self.assertEqual(len(b), 0)
|
||||||
|
self.assertEqual(len(f), 0)
|
||||||
|
self.assertEqual(fmtpath, cache.format_abspath(1, 'FMT1'))
|
||||||
|
self.assertEqual(fm_before, (cache.format_metadata(1, 'FMT1', allow_cache=False), cache.format_metadata(1, 'FMT2', allow_cache=False)))
|
||||||
|
self.assertEqual(annots_before, cache.all_annotations_for_book(1))
|
||||||
|
self.assertTrue(cache.cover(1))
|
||||||
|
self.assertTrue(os.path.exists(os.path.join(bookpath, 'xyz', 'abc')))
|
||||||
# }}}
|
# }}}
|
||||||
|
|
||||||
def test_original_fmt(self): # {{{
|
def test_original_fmt(self): # {{{
|
||||||
@ -315,7 +346,7 @@ class AddRemoveTest(BaseTest):
|
|||||||
def test_copy_to_library(self): # {{{
|
def test_copy_to_library(self): # {{{
|
||||||
from calibre.db.copy_to_library import copy_one_book
|
from calibre.db.copy_to_library import copy_one_book
|
||||||
from calibre.ebooks.metadata import authors_to_string
|
from calibre.ebooks.metadata import authors_to_string
|
||||||
from calibre.utils.date import utcnow, EPOCH
|
from calibre.utils.date import EPOCH, utcnow
|
||||||
src_db = self.init_cache()
|
src_db = self.init_cache()
|
||||||
dest_db = self.init_cache(self.cloned_library)
|
dest_db = self.init_cache(self.cloned_library)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user