Export of calibre data: Ensure individual part files in the exported data are no larger than one gigabyte even if the library contains individual files larger than that size.

Note that this means that exports created by calibre from this version
on will not be importable by earlier versions. However, exports from
earlier versions should still be importable.
This commit is contained in:
Kovid Goyal 2024-04-21 20:19:56 +05:30
parent bcc8ea4d5e
commit 1df7047633
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 239 additions and 92 deletions

View File

@ -15,7 +15,6 @@ import traceback
import weakref
from collections import defaultdict
from collections.abc import MutableSet, Set
from contextlib import closing
from functools import partial, wraps
from io import DEFAULT_BUFFER_SIZE, BytesIO
from queue import Queue
@ -3162,10 +3161,10 @@ class Cache:
if mtime is not None:
mtime = timestampfromdt(mtime)
with exporter.start_file(key, mtime=mtime) as dest:
self._copy_format_to(book_id, fmt, dest, report_file_size=dest.ensure_space)
self._copy_format_to(book_id, fmt, dest)
cover_key = '{}:{}:{}'.format(key_prefix, book_id, '.cover')
with exporter.start_file(cover_key) as dest:
if not self.copy_cover_to(book_id, dest, report_file_size=dest.ensure_space):
if not self.copy_cover_to(book_id, dest):
dest.discard()
else:
fm['.cover'] = cover_key
@ -3442,6 +3441,7 @@ class Cache:
dest_value.extend(src_value)
self._set_field(field, {dest_id: dest_value})
def import_library(library_key, importer, library_path, progress=None, abort=None):
from calibre.db.backend import DB
metadata = importer.metadata[library_key]
@ -3455,25 +3455,22 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non
report_progress('metadata.db')
if abort is not None and abort.is_set():
return
with open(os.path.join(library_path, 'metadata.db'), 'wb') as f:
with closing(importer.start_file(metadata['metadata.db'], 'metadata.db for ' + library_path)) as src:
shutil.copyfileobj(src, f)
importer.save_file(metadata['metadata.db'], 'metadata.db for ' + library_path, os.path.join(library_path, 'metadata.db'))
if 'full-text-search.db' in metadata:
if progress is not None:
progress('full-text-search.db', 1, total)
if abort is not None and abort.is_set():
return
poff += 1
with open(os.path.join(library_path, 'full-text-search.db'), 'wb') as f:
with closing(importer.start_file(metadata['full-text-search.db'], 'full-text-search.db for ' + library_path)) as src:
shutil.copyfileobj(src, f)
importer.save_file(metadata['full-text-search.db'], 'full-text-search.db for ' + library_path,
os.path.join(library_path, 'full-text-search.db'))
if abort is not None and abort.is_set():
return
if 'notes.db' in metadata:
import zipfile
notes_dir = os.path.join(library_path, NOTES_DIR_NAME)
os.makedirs(notes_dir, exist_ok=True)
with closing(importer.start_file(metadata['notes.db'], 'notes.db for ' + library_path)) as stream:
with importer.start_file(metadata['notes.db'], 'notes.db for ' + library_path) as stream:
stream.check_hash = False
with zipfile.ZipFile(stream) as zf:
for zi in zf.infolist():
@ -3482,6 +3479,8 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non
os.utime(tpath, (date_time, date_time))
if abort is not None and abort.is_set():
return
if importer.corrupted_files:
raise ValueError('Corrupted files:\n' + '\n'.join(importer.corrupted_files))
cache = Cache(DB(library_path, load_user_formatter_functions=False))
cache.init()
@ -3494,20 +3493,22 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non
if progress is not None:
progress(title, i + poff, total)
cache._update_path((book_id,), mark_as_dirtied=False)
for fmt, fmtkey in iteritems(fmt_key_map):
for fmt, fmtkey in fmt_key_map.items():
if fmt == '.cover':
with closing(importer.start_file(fmtkey, _('Cover for %s') % title)) as stream:
with importer.start_file(fmtkey, _('Cover for %s') % title) as stream:
path = cache._field_for('path', book_id).replace('/', os.sep)
cache.backend.set_cover(book_id, path, stream, no_processing=True)
else:
with closing(importer.start_file(fmtkey, _('{0} format for {1}').format(fmt.upper(), title))) as stream:
with importer.start_file(fmtkey, _('{0} format for {1}').format(fmt.upper(), title)) as stream:
size, fname = cache._do_add_format(book_id, fmt, stream, mtime=stream.mtime)
cache.fields['formats'].table.update_fmt(book_id, fmt, fname, size, cache.backend)
for relpath, efkey in extra_files.get(book_id, {}).items():
with closing(importer.start_file(efkey, _('Extra file {0} for book {1}').format(relpath, title))) as stream:
with importer.start_file(efkey, _('Extra file {0} for book {1}').format(relpath, title)) as stream:
path = cache._field_for('path', book_id).replace('/', os.sep)
cache.backend.add_extra_file(relpath, stream, path)
cache.dump_metadata({book_id})
if importer.corrupted_files:
raise ValueError('Corrupted files:\n' + '\n'.join(importer.corrupted_files))
if progress is not None:
progress(_('Completed'), total, total)
return cache

View File

@ -246,6 +246,21 @@ class FilesystemTest(BaseTest):
def test_export_import(self):
from calibre.db.cache import import_library
from calibre.utils.exim import Exporter, Importer
with TemporaryDirectory('export_lib') as tdir:
for part_size in (8, 1, 1024):
exporter = Exporter(tdir, part_size=part_size + Exporter.tail_size())
files = {
'a': b'a' * 7, 'b': b'b' * 7, 'c': b'c' * 2, 'd': b'd' * 9, 'e': b'e' * 3,
}
for key, data in files.items():
exporter.add_file(BytesIO(data), key)
exporter.commit()
importer = Importer(tdir)
for key, expected in files.items():
with importer.start_file(key, key) as f:
actual = f.read()
self.assertEqual(expected, actual, key)
self.assertFalse(importer.corrupted_files)
cache = self.init_cache()
bookdir = os.path.dirname(cache.format_abspath(1, '__COVER_INTERNAL__'))
with open(os.path.join(bookdir, 'exf'), 'w') as f:
@ -255,13 +270,14 @@ class FilesystemTest(BaseTest):
f.write('recurse')
self.assertEqual({ef.relpath for ef in cache.list_extra_files(1, pattern='sub/**/*')}, {'sub/recurse'})
self.assertEqual({ef.relpath for ef in cache.list_extra_files(1)}, {'exf', 'sub/recurse'})
for part_size in (1 << 30, 100, 1):
for part_size in (512, 1027, None):
with TemporaryDirectory('export_lib') as tdir, TemporaryDirectory('import_lib') as idir:
exporter = Exporter(tdir, part_size=part_size)
exporter = Exporter(tdir, part_size=part_size if part_size is None else (part_size + Exporter.tail_size()))
cache.export_library('l', exporter)
exporter.commit()
importer = Importer(tdir)
ic = import_library('l', importer, idir)
self.assertFalse(importer.corrupted_files)
self.assertEqual(cache.all_book_ids(), ic.all_book_ids())
for book_id in cache.all_book_ids():
self.assertEqual(cache.cover(book_id), ic.cover(book_id), 'Covers not identical for book: %d' % book_id)
@ -290,6 +306,7 @@ class FilesystemTest(BaseTest):
exporter.commit()
importer = Importer(tdir)
ic = import_library('l', importer, idir)
self.assertFalse(importer.corrupted_files)
self.assertEqual(ic.fts_search('exim')[0]['id'], 1)
self.assertEqual(cache.notes_for('authors', 2), ic.notes_for('authors', 2))
a, b = cache.get_notes_resource(r1), ic.get_notes_resource(r1)

View File

@ -4,6 +4,7 @@
import errno
import hashlib
import io
import json
import os
import shutil
@ -13,6 +14,7 @@ import tempfile
import time
import uuid
from collections import Counter
from typing import NamedTuple
from calibre import prints
from calibre.constants import config_dir, filesystem_encoding, iswindows
@ -30,30 +32,28 @@ class FileDest:
def __init__(self, key, exporter, mtime=None):
self.exporter, self.key = exporter, key
self.hasher = hashlib.sha1()
self.start_pos = exporter.f.tell()
self.start_part_number, self.start_pos = exporter.current_pos()
self._discard = False
self.mtime = mtime
self.size = 0
def discard(self):
self._discard = True
def ensure_space(self, size):
if size > 0:
self.exporter.ensure_space(size)
self.start_pos = self.exporter.f.tell()
def write(self, data):
self.size += len(data)
written = self.exporter.write(data)
if len(data) != written:
raise RuntimeError(f'Exporter failed to write all data: {len(data)} != {written}')
self.hasher.update(data)
self.exporter.f.write(data)
def flush(self):
self.exporter.f.flush()
pass
def close(self):
if not self._discard:
size = self.exporter.f.tell() - self.start_pos
digest = str(self.hasher.hexdigest())
self.exporter.file_metadata[self.key] = (len(self.exporter.parts), self.start_pos, size, digest, self.mtime)
self.exporter.file_metadata[self.key] = (self.start_part_number, self.start_pos, self.size, digest, self.mtime)
del self.exporter, self.hasher
def __enter__(self):
@ -65,17 +65,23 @@ class FileDest:
class Exporter:
VERSION = 0
VERSION = 1
TAIL_FMT = b'!II?' # part_num, version, is_last
MDATA_SZ_FMT = b'!Q'
EXT = '.calibre-data'
def __init__(self, path_to_export_dir, part_size=(1 << 30)):
self.part_size = part_size
@classmethod
def tail_size(cls):
return struct.calcsize(cls.TAIL_FMT)
def __init__(self, path_to_export_dir, part_size=None):
# default part_size is 1 GB
self.part_size = (1 << 30) if part_size is None else part_size
self.base = os.path.abspath(path_to_export_dir)
self.parts = []
self.new_part()
self.commited_parts = []
self.current_part = None
self.file_metadata = {}
self.tail_sz = self.tail_size()
self.metadata = {'file_metadata': self.file_metadata}
def set_metadata(self, key, val):
@ -83,39 +89,60 @@ class Exporter:
raise KeyError('The metadata already contains the key: %s' % key)
self.metadata[key] = val
@property
def f(self):
return self.parts[-1]
def current_pos(self):
pos = 0
if self.current_part is not None:
pos = self.current_part.tell()
if pos >= self.part_size - self.tail_sz:
self.new_part()
pos = 0
return len(self.commited_parts) + 1, pos
def write(self, data: bytes) -> int:
written = 0
data = memoryview(data)
while len(data) > 0:
if self.current_part is None:
self.new_part()
max_size = self.part_size - self.tail_sz - self.current_part.tell()
if max_size <= 0:
self.new_part()
max_size = self.part_size - self.tail_sz
chunk = data[:max_size]
w = self.current_part.write(chunk)
data = data[w:]
written += w
return written
def new_part(self):
self.parts.append(open(os.path.join(
self.base, f'part-{len(self.parts) + 1:04d}{self.EXT}'), 'wb'))
self.commit_part()
self.current_part = open(os.path.join(
self.base, f'part-{len(self.commited_parts) + 1:04d}{self.EXT}'), 'wb')
def commit_part(self, is_last=False):
self.f.write(struct.pack(self.TAIL_FMT, len(self.parts), self.VERSION, is_last))
self.f.close()
self.parts[-1] = self.f.name
def ensure_space(self, size):
try:
if size + self.f.tell() < self.part_size:
return
except AttributeError:
raise RuntimeError('This exporter has already been committed, cannot add to it')
self.commit_part()
self.new_part()
if self.current_part is not None:
self.current_part.write(struct.pack(self.TAIL_FMT, len(self.commited_parts) + 1, self.VERSION, is_last))
self.current_part.close()
self.commited_parts.append(self.current_part.name)
self.current_part = None
def commit(self):
raw = json.dumps(self.metadata, ensure_ascii=False)
if not isinstance(raw, bytes):
raw = raw.encode('utf-8')
self.ensure_space(len(raw))
self.f.write(raw)
self.f.write(struct.pack(self.MDATA_SZ_FMT, len(raw)))
self.new_part()
orig, self.part_size = self.part_size, sys.maxsize
self.write(raw)
self.write(struct.pack(self.MDATA_SZ_FMT, len(raw)))
self.part_size = orig
self.commit_part(is_last=True)
def add_file(self, fileobj, key):
with self.start_file(key, os.fstat(fileobj.fileno()).st_mtime) as dest:
try:
mtime = os.fstat(fileobj.fileno()).st_mtime
except (io.UnsupportedOperation, OSError):
mtime = None
with self.start_file(key, mtime=mtime) as dest:
shutil.copyfileobj(fileobj, dest)
def start_file(self, key, mtime=None):
@ -198,47 +225,135 @@ def export(destdir, library_paths=None, dbmap=None, progress1=None, progress2=No
# Import {{{
class Chunk(NamedTuple):
part_num: int
pos_in_part: int
size: int
pos_in_file: int
class Pos:
def __init__(self, part, pos_in_part, size, importer):
self.size = size
self.pos_in_file = 0
self.chunks = chunks = []
self.open_part = importer.open_part
self.currently_open_part = None
self.currently_open_chunk_index = -1
pos = 0
while size > 0:
part_size = importer.size_of_part(part)
chunk_size = min(size, part_size - pos_in_part)
if chunk_size > 0:
chunks.append(Chunk(part, pos_in_part, chunk_size, pos))
size -= chunk_size
pos += chunk_size
part += 1
pos_in_part = 0
def close(self):
if self.currently_open_part is not None:
self.currently_open_part.close()
self.currently_open_part = None
self.currently_open_chunk_index = -1
def tell(self) -> int:
return self.pos_in_file
def seek(self, amt, whence=os.SEEK_SET) -> int:
if whence == os.SEEK_SET:
new_pos_in_file = amt
if whence == os.SEEK_END:
new_pos_in_file = self.size + amt
if whence == os.SEEK_CUR:
new_pos_in_file = self.pos_in_file + amt
self.pos_in_file = max(0, min(new_pos_in_file, self.size))
return self.pos_in_file
def read(self, size=None):
if size is None or size < 0:
size = self.size
size = min(size, self.size)
amt_left = max(0, self.size - self.pos_in_file)
amt_to_read = min(amt_left, size)
if amt_to_read <= 0:
return b''
start_chunk = max(0, self.currently_open_chunk_index)
num = len(self.chunks)
ans = []
chunk_idx = -1
for i in range(num):
chunk_idx = (start_chunk + i) % num
chunk = self.chunks[chunk_idx]
if chunk.pos_in_file <= self.pos_in_file < chunk.pos_in_file + chunk.size:
break
else:
raise ValueError(f'No chunk found containing {self.pos_in_file=}')
while amt_to_read > 0:
try:
chunk = self.chunks[chunk_idx]
except IndexError:
break
ans.append(self._read_chunk(chunk, amt_to_read, chunk_idx))
amt_to_read -= len(ans[-1])
chunk_idx += 1
return b''.join(ans)
def _read_chunk(self, chunk, size, chunk_idx):
if self.currently_open_chunk_index != chunk_idx or self.currently_open_part is None:
self.close()
self.currently_open_part = self.open_part(chunk.part_num)
self.currently_open_chunk_index = chunk_idx
offset_from_start_of_chunk = self.pos_in_file - chunk.pos_in_file
self.currently_open_part.seek(chunk.pos_in_part + offset_from_start_of_chunk, os.SEEK_SET)
size = min(size, chunk.size - offset_from_start_of_chunk)
ans = self.currently_open_part.read(size)
self.pos_in_file += len(ans)
return ans
class FileSource:
def __init__(self, f, size, digest, description, mtime, importer):
self.f, self.size, self.digest, self.description = f, size, digest, description
self.seekable = self.f.seekable
def __init__(self, start_partnum, start_pos, size, digest, description, mtime, importer):
self.size, self.digest, self.description = size, digest, description
self.mtime = mtime
self.start = f.tell()
self.end = self.start + size
self.start = start_pos
self.start_partnum = start_partnum
self.pos = Pos(start_partnum, start_pos, size, importer)
self.hasher = hashlib.sha1()
self.importer = importer
self.check_hash = True
def seekable(self):
return False
def seek(self, amt, whence=os.SEEK_SET):
if whence == os.SEEK_SET:
return self.f.seek(self.start + amt, os.SEEK_SET)
if whence == os.SEEK_END:
return self.f.seek(self.end + amt, os.SEEK_SET)
if whence == os.SEEK_CUR:
return self.f.seek(amt, whence)
return self.pos.seek(amt, whence)
def tell(self):
return self.f.tell() - self.start
return self.pos.tell()
def read(self, size=None):
if size is not None and size < 1:
return b''
left = self.end - self.f.tell()
amt = min(left, size or left)
if amt < 1:
return b''
ans = self.f.read(amt)
if self.check_hash:
ans = self.pos.read(size)
if self.check_hash and ans:
self.hasher.update(ans)
return ans
def close(self):
if self.check_hash and self.hasher.hexdigest() != self.digest:
self.importer.corrupted_files.append(self.description)
self.f.close()
self.hasher = self.f = None
self.hasher = None
self.pos.close()
self.pos = None
def __enter__(self):
return self
def __exit__(self, *a):
self.close()
class Importer:
@ -246,11 +361,14 @@ class Importer:
def __init__(self, path_to_export_dir):
self.corrupted_files = []
part_map = {}
tail_size = struct.calcsize(Exporter.TAIL_FMT)
self.tail_size = tail_size = struct.calcsize(Exporter.TAIL_FMT)
self.version = -1
for name in os.listdir(path_to_export_dir):
if name.lower().endswith(Exporter.EXT):
path = os.path.join(path_to_export_dir, name)
with open(path, 'rb') as f:
f.seek(0, os.SEEK_END)
size_of_part = f.tell()
f.seek(-tail_size, os.SEEK_END)
raw = f.read()
if len(raw) != tail_size:
@ -260,7 +378,11 @@ class Importer:
raise ValueError('The exported data in %s is not valid,'
' version (%d) is higher than maximum supported version.'
' You might need to upgrade calibre first.' % (name, version))
part_map[part_num] = path, is_last
part_map[part_num] = path, is_last, size_of_part
if self.version == -1:
self.version = version
if version != self.version:
raise ValueError(f'The exported data in {name} is not valid as it contains a mix of parts with versions: {self.version} and {version}')
nums = sorted(part_map)
if not nums:
raise ValueError('No exported data found in: %s' % path_to_export_dir)
@ -270,28 +392,36 @@ class Importer:
raise ValueError('The last part of this exported data set is missing')
if len(nums) != nums[-1]:
raise ValueError('There are some parts of the exported data set missing')
self.part_map = {num:path for num, (path, is_last) in iteritems(part_map)}
self.part_map, self.part_size_map = {}, {}
for part_num, (path, is_last, size_of_part) in part_map.items():
self.part_map[part_num] = path
self.part_size_map[part_num] = size_of_part
msf = struct.calcsize(Exporter.MDATA_SZ_FMT)
offset = tail_size + msf
with self.part(nums[-1]) as f:
with self.open_part(nums[-1]) as f:
f.seek(-offset, os.SEEK_END)
sz, = struct.unpack(Exporter.MDATA_SZ_FMT, f.read(msf))
f.seek(- sz - offset, os.SEEK_END)
self.metadata = json.loads(f.read(sz))
self.file_metadata = self.metadata['file_metadata']
def part(self, num):
def size_of_part(self, num):
return self.part_size_map[num] - self.tail_size
def open_part(self, num):
return open(self.part_map[num], 'rb')
def start_file(self, key, description):
partnum, pos, size, digest, mtime = self.file_metadata[key]
f = self.part(partnum)
f.seek(pos)
return FileSource(f, size, digest, description, mtime, self)
return FileSource(partnum, pos, size, digest, description, mtime, self)
def save_file(self, key, description, output_path):
with open(output_path, 'wb') as dest, self.start_file(key, description) as src:
shutil.copyfileobj(src, dest)
def export_config(self, base_dir, library_usage_stats):
for key, relpath in self.metadata['config_dir']:
f = self.start_file(key, relpath)
with self.start_file(key, relpath) as f:
path = os.path.join(base_dir, relpath.replace('/', os.sep))
try:
with open(path, 'wb') as dest:
@ -300,7 +430,6 @@ class Importer:
os.makedirs(os.path.dirname(path))
with open(path, 'wb') as dest:
shutil.copyfileobj(f, dest)
f.close()
gpath = os.path.join(base_dir, 'global.py')
try:
with open(gpath, 'rb') as f: