Add a test to ensure restoring a db from folders preserves extra files

Also clean up the restore folder scanning logic a bit
This commit is contained in:
Kovid Goyal 2023-04-17 17:15:58 +05:30
parent ce4238e8a1
commit c049052dc9
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 53 additions and 15 deletions

View File

@ -8,6 +8,7 @@ __docformat__ = 'restructuredtext en'
import os
import re
import shutil
import sys
import time
import traceback
from contextlib import suppress
@ -16,7 +17,7 @@ from threading import Thread
from calibre import force_unicode, isbytestring
from calibre.constants import filesystem_encoding
from calibre.db.backend import DB, TRASH_DIR_NAME, DBPrefs
from calibre.db.backend import DB, METADATA_FILE_NAME, TRASH_DIR_NAME, DBPrefs
from calibre.db.cache import Cache
from calibre.ebooks.metadata.opf2 import OPF
from calibre.ptempfile import TemporaryDirectory
@ -29,7 +30,7 @@ NON_EBOOK_EXTENSIONS = frozenset((
def read_opf(dirpath, read_annotations=True):
opf = os.path.join(dirpath, 'metadata.opf')
opf = os.path.join(dirpath, METADATA_FILE_NAME)
parsed_opf = OPF(opf, basedir=dirpath)
mi = parsed_opf.to_book_metadata()
annotations = tuple(parsed_opf.read_annotations()) if read_annotations else ()
@ -74,7 +75,6 @@ class Restore(Thread):
if not callable(self.progress_callback):
self.progress_callback = lambda x, y: x
self.dirs = []
self.ignored_dirs = []
self.failed_dirs = []
self.books = []
self.conflicting_custom_cols = {}
@ -183,26 +183,37 @@ class Restore(Thread):
dirnames.remove(TRASH_DIR_NAME)
leaf = os.path.basename(dirpath)
m = self.db_id_regexp.search(leaf)
if m is None or 'metadata.opf' not in filenames:
self.ignored_dirs.append(dirpath)
if m is None or METADATA_FILE_NAME not in filenames:
continue
self.dirs.append((dirpath, filenames, m.group(1)))
self.dirs.append((dirpath, list(dirnames), filenames, m.group(1)))
del dirnames[:]
self.progress_callback(None, len(self.dirs))
for i, x in enumerate(self.dirs):
dirpath, filenames, book_id = x
for i, (dirpath, dirnames, filenames, book_id) in enumerate(self.dirs):
try:
self.process_dir(dirpath, filenames, book_id)
except:
self.process_dir(dirpath, dirnames, filenames, book_id)
except Exception:
self.failed_dirs.append((dirpath, traceback.format_exc()))
self.progress_callback(_('Processed') + ' ' + dirpath, i+1)
def process_dir(self, dirpath, filenames, book_id):
def process_dir(self, dirpath, dirnames, filenames, book_id):
book_id = int(book_id)
formats = list(filter(is_ebook_file, filenames))
fmts = [os.path.splitext(x)[1][1:].upper() for x in formats]
sizes = [os.path.getsize(os.path.join(dirpath, x)) for x in formats]
names = [os.path.splitext(x)[0] for x in formats]
def safe_mtime(path):
with suppress(OSError):
return os.path.getmtime(path)
return sys.maxsize
filenames.sort(key=lambda f: safe_mtime(os.path.join(dirpath, filenames)))
fmt_map = {}
fmts, formats, sizes, names = [], [], [], []
for x in filenames:
if is_ebook_file(x):
fmt = os.path.splitext(x)[1][1:].upper()
if fmt and fmt_map.setdefault(fmt, x) is x:
formats.append(x)
sizes.append(os.path.getsize(os.path.join(dirpath, x)))
names.append(os.path.splitext(x)[0])
fmts.append(fmt)
mi, timestamp, annotations = read_opf(dirpath)
path = os.path.relpath(dirpath, self.src_library_path).replace(os.sep, '/')

View File

@ -5,6 +5,7 @@ __license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os
from collections import namedtuple
from functools import partial
from io import BytesIO
@ -368,6 +369,22 @@ class WritingTest(BaseTest):
af(mb.is_alive())
from calibre.ebooks.metadata.opf2 import OPF
book_ids = (1,2,3)
def read_all_formats():
fbefore = {}
for book_id in book_ids:
ff = fbefore[book_id] = {}
for fmt in cache.formats(book_id):
ff[fmt] = cache.format(book_id, fmt)
return fbefore
def read_all_extra_files(book_id=1):
ans = {}
bp = cache.field_for('path', book_id)
for (relpath, fobj, mtime) in cache.backend.iter_extra_files(book_id, bp, cache.fields['formats']):
ans[relpath] = fobj.read()
return ans
for book_id in book_ids:
raw = cache.read_backup(book_id)
opf = OPF(BytesIO(raw))
@ -376,6 +393,14 @@ class WritingTest(BaseTest):
tested_fields = 'title authors tags'.split()
before = {f:cache.all_field_for(f, book_ids) for f in tested_fields}
lbefore = tuple(cache.get_all_link_maps_for_book(i) for i in book_ids)
fbefore = read_all_formats()
bookdir = os.path.dirname(cache.format_abspath(1, '__COVER_INTERNAL__'))
with open(os.path.join(bookdir, 'exf'), 'w') as f:
f.write('exf')
os.mkdir(os.path.join(bookdir, 'sub'))
with open(os.path.join(bookdir, 'sub', 'recurse'), 'w') as f:
f.write('recurse')
ebefore = read_all_extra_files()
cache.close()
from calibre.db.restore import Restore
restorer = Restore(cl)
@ -385,6 +410,8 @@ class WritingTest(BaseTest):
cache = self.init_cache(cl)
ae(before, {f:cache.all_field_for(f, book_ids) for f in tested_fields})
ae(lbefore, tuple(cache.get_all_link_maps_for_book(i) for i in book_ids))
ae(fbefore, read_all_formats())
ae(ebefore, read_all_extra_files())
# }}}
def test_set_cover(self): # {{{