From 1df7047633e74b601b53eb905059b033beaeb8a7 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Sun, 21 Apr 2024 20:19:56 +0530 Subject: [PATCH] Export of calibre data: Ensure individual part files in the exported data are no larger than one gigabyte even if the library contains individual files larger than that size. Note that this means that exports created by calibre from this version on will not be importable by earlier versions. However, exports from earlier versions should still be importable. --- src/calibre/db/cache.py | 29 +-- src/calibre/db/tests/filesystem.py | 21 ++- src/calibre/utils/exim.py | 281 +++++++++++++++++++++-------- 3 files changed, 239 insertions(+), 92 deletions(-) diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py index 9b98cd79ad..aae4e85de0 100644 --- a/src/calibre/db/cache.py +++ b/src/calibre/db/cache.py @@ -15,7 +15,6 @@ import traceback import weakref from collections import defaultdict from collections.abc import MutableSet, Set -from contextlib import closing from functools import partial, wraps from io import DEFAULT_BUFFER_SIZE, BytesIO from queue import Queue @@ -3162,10 +3161,10 @@ class Cache: if mtime is not None: mtime = timestampfromdt(mtime) with exporter.start_file(key, mtime=mtime) as dest: - self._copy_format_to(book_id, fmt, dest, report_file_size=dest.ensure_space) + self._copy_format_to(book_id, fmt, dest) cover_key = '{}:{}:{}'.format(key_prefix, book_id, '.cover') with exporter.start_file(cover_key) as dest: - if not self.copy_cover_to(book_id, dest, report_file_size=dest.ensure_space): + if not self.copy_cover_to(book_id, dest): dest.discard() else: fm['.cover'] = cover_key @@ -3442,6 +3441,7 @@ class Cache: dest_value.extend(src_value) self._set_field(field, {dest_id: dest_value}) + def import_library(library_key, importer, library_path, progress=None, abort=None): from calibre.db.backend import DB metadata = importer.metadata[library_key] @@ -3455,25 +3455,22 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non report_progress('metadata.db') if abort is not None and abort.is_set(): return - with open(os.path.join(library_path, 'metadata.db'), 'wb') as f: - with closing(importer.start_file(metadata['metadata.db'], 'metadata.db for ' + library_path)) as src: - shutil.copyfileobj(src, f) + importer.save_file(metadata['metadata.db'], 'metadata.db for ' + library_path, os.path.join(library_path, 'metadata.db')) if 'full-text-search.db' in metadata: if progress is not None: progress('full-text-search.db', 1, total) if abort is not None and abort.is_set(): return poff += 1 - with open(os.path.join(library_path, 'full-text-search.db'), 'wb') as f: - with closing(importer.start_file(metadata['full-text-search.db'], 'full-text-search.db for ' + library_path)) as src: - shutil.copyfileobj(src, f) + importer.save_file(metadata['full-text-search.db'], 'full-text-search.db for ' + library_path, + os.path.join(library_path, 'full-text-search.db')) if abort is not None and abort.is_set(): return if 'notes.db' in metadata: import zipfile notes_dir = os.path.join(library_path, NOTES_DIR_NAME) os.makedirs(notes_dir, exist_ok=True) - with closing(importer.start_file(metadata['notes.db'], 'notes.db for ' + library_path)) as stream: + with importer.start_file(metadata['notes.db'], 'notes.db for ' + library_path) as stream: stream.check_hash = False with zipfile.ZipFile(stream) as zf: for zi in zf.infolist(): @@ -3482,6 +3479,8 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non os.utime(tpath, (date_time, date_time)) if abort is not None and abort.is_set(): return + if importer.corrupted_files: + raise ValueError('Corrupted files:\n' + '\n'.join(importer.corrupted_files)) cache = Cache(DB(library_path, load_user_formatter_functions=False)) cache.init() @@ -3494,20 +3493,22 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non if progress is not None: progress(title, i + poff, total) cache._update_path((book_id,), mark_as_dirtied=False) - for fmt, fmtkey in iteritems(fmt_key_map): + for fmt, fmtkey in fmt_key_map.items(): if fmt == '.cover': - with closing(importer.start_file(fmtkey, _('Cover for %s') % title)) as stream: + with importer.start_file(fmtkey, _('Cover for %s') % title) as stream: path = cache._field_for('path', book_id).replace('/', os.sep) cache.backend.set_cover(book_id, path, stream, no_processing=True) else: - with closing(importer.start_file(fmtkey, _('{0} format for {1}').format(fmt.upper(), title))) as stream: + with importer.start_file(fmtkey, _('{0} format for {1}').format(fmt.upper(), title)) as stream: size, fname = cache._do_add_format(book_id, fmt, stream, mtime=stream.mtime) cache.fields['formats'].table.update_fmt(book_id, fmt, fname, size, cache.backend) for relpath, efkey in extra_files.get(book_id, {}).items(): - with closing(importer.start_file(efkey, _('Extra file {0} for book {1}').format(relpath, title))) as stream: + with importer.start_file(efkey, _('Extra file {0} for book {1}').format(relpath, title)) as stream: path = cache._field_for('path', book_id).replace('/', os.sep) cache.backend.add_extra_file(relpath, stream, path) cache.dump_metadata({book_id}) + if importer.corrupted_files: + raise ValueError('Corrupted files:\n' + '\n'.join(importer.corrupted_files)) if progress is not None: progress(_('Completed'), total, total) return cache diff --git a/src/calibre/db/tests/filesystem.py b/src/calibre/db/tests/filesystem.py index 2cae7f0c35..3dc5bd5607 100644 --- a/src/calibre/db/tests/filesystem.py +++ b/src/calibre/db/tests/filesystem.py @@ -246,6 +246,21 @@ class FilesystemTest(BaseTest): def test_export_import(self): from calibre.db.cache import import_library from calibre.utils.exim import Exporter, Importer + with TemporaryDirectory('export_lib') as tdir: + for part_size in (8, 1, 1024): + exporter = Exporter(tdir, part_size=part_size + Exporter.tail_size()) + files = { + 'a': b'a' * 7, 'b': b'b' * 7, 'c': b'c' * 2, 'd': b'd' * 9, 'e': b'e' * 3, + } + for key, data in files.items(): + exporter.add_file(BytesIO(data), key) + exporter.commit() + importer = Importer(tdir) + for key, expected in files.items(): + with importer.start_file(key, key) as f: + actual = f.read() + self.assertEqual(expected, actual, key) + self.assertFalse(importer.corrupted_files) cache = self.init_cache() bookdir = os.path.dirname(cache.format_abspath(1, '__COVER_INTERNAL__')) with open(os.path.join(bookdir, 'exf'), 'w') as f: @@ -255,13 +270,14 @@ class FilesystemTest(BaseTest): f.write('recurse') self.assertEqual({ef.relpath for ef in cache.list_extra_files(1, pattern='sub/**/*')}, {'sub/recurse'}) self.assertEqual({ef.relpath for ef in cache.list_extra_files(1)}, {'exf', 'sub/recurse'}) - for part_size in (1 << 30, 100, 1): + for part_size in (512, 1027, None): with TemporaryDirectory('export_lib') as tdir, TemporaryDirectory('import_lib') as idir: - exporter = Exporter(tdir, part_size=part_size) + exporter = Exporter(tdir, part_size=part_size if part_size is None else (part_size + Exporter.tail_size())) cache.export_library('l', exporter) exporter.commit() importer = Importer(tdir) ic = import_library('l', importer, idir) + self.assertFalse(importer.corrupted_files) self.assertEqual(cache.all_book_ids(), ic.all_book_ids()) for book_id in cache.all_book_ids(): self.assertEqual(cache.cover(book_id), ic.cover(book_id), 'Covers not identical for book: %d' % book_id) @@ -290,6 +306,7 @@ class FilesystemTest(BaseTest): exporter.commit() importer = Importer(tdir) ic = import_library('l', importer, idir) + self.assertFalse(importer.corrupted_files) self.assertEqual(ic.fts_search('exim')[0]['id'], 1) self.assertEqual(cache.notes_for('authors', 2), ic.notes_for('authors', 2)) a, b = cache.get_notes_resource(r1), ic.get_notes_resource(r1) diff --git a/src/calibre/utils/exim.py b/src/calibre/utils/exim.py index b739de617d..4e12dde715 100644 --- a/src/calibre/utils/exim.py +++ b/src/calibre/utils/exim.py @@ -4,6 +4,7 @@ import errno import hashlib +import io import json import os import shutil @@ -13,6 +14,7 @@ import tempfile import time import uuid from collections import Counter +from typing import NamedTuple from calibre import prints from calibre.constants import config_dir, filesystem_encoding, iswindows @@ -30,30 +32,28 @@ class FileDest: def __init__(self, key, exporter, mtime=None): self.exporter, self.key = exporter, key self.hasher = hashlib.sha1() - self.start_pos = exporter.f.tell() + self.start_part_number, self.start_pos = exporter.current_pos() self._discard = False self.mtime = mtime + self.size = 0 def discard(self): self._discard = True - def ensure_space(self, size): - if size > 0: - self.exporter.ensure_space(size) - self.start_pos = self.exporter.f.tell() - def write(self, data): + self.size += len(data) + written = self.exporter.write(data) + if len(data) != written: + raise RuntimeError(f'Exporter failed to write all data: {len(data)} != {written}') self.hasher.update(data) - self.exporter.f.write(data) def flush(self): - self.exporter.f.flush() + pass def close(self): if not self._discard: - size = self.exporter.f.tell() - self.start_pos digest = str(self.hasher.hexdigest()) - self.exporter.file_metadata[self.key] = (len(self.exporter.parts), self.start_pos, size, digest, self.mtime) + self.exporter.file_metadata[self.key] = (self.start_part_number, self.start_pos, self.size, digest, self.mtime) del self.exporter, self.hasher def __enter__(self): @@ -65,17 +65,23 @@ class FileDest: class Exporter: - VERSION = 0 + VERSION = 1 TAIL_FMT = b'!II?' # part_num, version, is_last MDATA_SZ_FMT = b'!Q' EXT = '.calibre-data' - def __init__(self, path_to_export_dir, part_size=(1 << 30)): - self.part_size = part_size + @classmethod + def tail_size(cls): + return struct.calcsize(cls.TAIL_FMT) + + def __init__(self, path_to_export_dir, part_size=None): + # default part_size is 1 GB + self.part_size = (1 << 30) if part_size is None else part_size self.base = os.path.abspath(path_to_export_dir) - self.parts = [] - self.new_part() + self.commited_parts = [] + self.current_part = None self.file_metadata = {} + self.tail_sz = self.tail_size() self.metadata = {'file_metadata': self.file_metadata} def set_metadata(self, key, val): @@ -83,39 +89,60 @@ class Exporter: raise KeyError('The metadata already contains the key: %s' % key) self.metadata[key] = val - @property - def f(self): - return self.parts[-1] + def current_pos(self): + pos = 0 + if self.current_part is not None: + pos = self.current_part.tell() + if pos >= self.part_size - self.tail_sz: + self.new_part() + pos = 0 + return len(self.commited_parts) + 1, pos + + def write(self, data: bytes) -> int: + written = 0 + data = memoryview(data) + while len(data) > 0: + if self.current_part is None: + self.new_part() + max_size = self.part_size - self.tail_sz - self.current_part.tell() + if max_size <= 0: + self.new_part() + max_size = self.part_size - self.tail_sz + chunk = data[:max_size] + w = self.current_part.write(chunk) + data = data[w:] + written += w + return written def new_part(self): - self.parts.append(open(os.path.join( - self.base, f'part-{len(self.parts) + 1:04d}{self.EXT}'), 'wb')) + self.commit_part() + self.current_part = open(os.path.join( + self.base, f'part-{len(self.commited_parts) + 1:04d}{self.EXT}'), 'wb') def commit_part(self, is_last=False): - self.f.write(struct.pack(self.TAIL_FMT, len(self.parts), self.VERSION, is_last)) - self.f.close() - self.parts[-1] = self.f.name - - def ensure_space(self, size): - try: - if size + self.f.tell() < self.part_size: - return - except AttributeError: - raise RuntimeError('This exporter has already been committed, cannot add to it') - self.commit_part() - self.new_part() + if self.current_part is not None: + self.current_part.write(struct.pack(self.TAIL_FMT, len(self.commited_parts) + 1, self.VERSION, is_last)) + self.current_part.close() + self.commited_parts.append(self.current_part.name) + self.current_part = None def commit(self): raw = json.dumps(self.metadata, ensure_ascii=False) if not isinstance(raw, bytes): raw = raw.encode('utf-8') - self.ensure_space(len(raw)) - self.f.write(raw) - self.f.write(struct.pack(self.MDATA_SZ_FMT, len(raw))) + self.new_part() + orig, self.part_size = self.part_size, sys.maxsize + self.write(raw) + self.write(struct.pack(self.MDATA_SZ_FMT, len(raw))) + self.part_size = orig self.commit_part(is_last=True) def add_file(self, fileobj, key): - with self.start_file(key, os.fstat(fileobj.fileno()).st_mtime) as dest: + try: + mtime = os.fstat(fileobj.fileno()).st_mtime + except (io.UnsupportedOperation, OSError): + mtime = None + with self.start_file(key, mtime=mtime) as dest: shutil.copyfileobj(fileobj, dest) def start_file(self, key, mtime=None): @@ -198,47 +225,135 @@ def export(destdir, library_paths=None, dbmap=None, progress1=None, progress2=No # Import {{{ +class Chunk(NamedTuple): + part_num: int + pos_in_part: int + size: int + pos_in_file: int + + +class Pos: + + def __init__(self, part, pos_in_part, size, importer): + self.size = size + self.pos_in_file = 0 + self.chunks = chunks = [] + self.open_part = importer.open_part + self.currently_open_part = None + self.currently_open_chunk_index = -1 + + pos = 0 + while size > 0: + part_size = importer.size_of_part(part) + chunk_size = min(size, part_size - pos_in_part) + if chunk_size > 0: + chunks.append(Chunk(part, pos_in_part, chunk_size, pos)) + size -= chunk_size + pos += chunk_size + part += 1 + pos_in_part = 0 + + def close(self): + if self.currently_open_part is not None: + self.currently_open_part.close() + self.currently_open_part = None + self.currently_open_chunk_index = -1 + + def tell(self) -> int: + return self.pos_in_file + + def seek(self, amt, whence=os.SEEK_SET) -> int: + if whence == os.SEEK_SET: + new_pos_in_file = amt + if whence == os.SEEK_END: + new_pos_in_file = self.size + amt + if whence == os.SEEK_CUR: + new_pos_in_file = self.pos_in_file + amt + self.pos_in_file = max(0, min(new_pos_in_file, self.size)) + return self.pos_in_file + + def read(self, size=None): + if size is None or size < 0: + size = self.size + size = min(size, self.size) + amt_left = max(0, self.size - self.pos_in_file) + amt_to_read = min(amt_left, size) + if amt_to_read <= 0: + return b'' + start_chunk = max(0, self.currently_open_chunk_index) + num = len(self.chunks) + ans = [] + chunk_idx = -1 + for i in range(num): + chunk_idx = (start_chunk + i) % num + chunk = self.chunks[chunk_idx] + if chunk.pos_in_file <= self.pos_in_file < chunk.pos_in_file + chunk.size: + break + else: + raise ValueError(f'No chunk found containing {self.pos_in_file=}') + + while amt_to_read > 0: + try: + chunk = self.chunks[chunk_idx] + except IndexError: + break + ans.append(self._read_chunk(chunk, amt_to_read, chunk_idx)) + amt_to_read -= len(ans[-1]) + chunk_idx += 1 + return b''.join(ans) + + def _read_chunk(self, chunk, size, chunk_idx): + if self.currently_open_chunk_index != chunk_idx or self.currently_open_part is None: + self.close() + self.currently_open_part = self.open_part(chunk.part_num) + self.currently_open_chunk_index = chunk_idx + offset_from_start_of_chunk = self.pos_in_file - chunk.pos_in_file + self.currently_open_part.seek(chunk.pos_in_part + offset_from_start_of_chunk, os.SEEK_SET) + size = min(size, chunk.size - offset_from_start_of_chunk) + ans = self.currently_open_part.read(size) + self.pos_in_file += len(ans) + return ans + class FileSource: - def __init__(self, f, size, digest, description, mtime, importer): - self.f, self.size, self.digest, self.description = f, size, digest, description - self.seekable = self.f.seekable + def __init__(self, start_partnum, start_pos, size, digest, description, mtime, importer): + self.size, self.digest, self.description = size, digest, description self.mtime = mtime - self.start = f.tell() - self.end = self.start + size + self.start = start_pos + self.start_partnum = start_partnum + self.pos = Pos(start_partnum, start_pos, size, importer) self.hasher = hashlib.sha1() self.importer = importer self.check_hash = True + def seekable(self): + return False + def seek(self, amt, whence=os.SEEK_SET): - if whence == os.SEEK_SET: - return self.f.seek(self.start + amt, os.SEEK_SET) - if whence == os.SEEK_END: - return self.f.seek(self.end + amt, os.SEEK_SET) - if whence == os.SEEK_CUR: - return self.f.seek(amt, whence) + return self.pos.seek(amt, whence) def tell(self): - return self.f.tell() - self.start + return self.pos.tell() def read(self, size=None): - if size is not None and size < 1: - return b'' - left = self.end - self.f.tell() - amt = min(left, size or left) - if amt < 1: - return b'' - ans = self.f.read(amt) - if self.check_hash: + ans = self.pos.read(size) + if self.check_hash and ans: self.hasher.update(ans) return ans def close(self): if self.check_hash and self.hasher.hexdigest() != self.digest: self.importer.corrupted_files.append(self.description) - self.f.close() - self.hasher = self.f = None + self.hasher = None + self.pos.close() + self.pos = None + + def __enter__(self): + return self + + def __exit__(self, *a): + self.close() class Importer: @@ -246,11 +361,14 @@ class Importer: def __init__(self, path_to_export_dir): self.corrupted_files = [] part_map = {} - tail_size = struct.calcsize(Exporter.TAIL_FMT) + self.tail_size = tail_size = struct.calcsize(Exporter.TAIL_FMT) + self.version = -1 for name in os.listdir(path_to_export_dir): if name.lower().endswith(Exporter.EXT): path = os.path.join(path_to_export_dir, name) with open(path, 'rb') as f: + f.seek(0, os.SEEK_END) + size_of_part = f.tell() f.seek(-tail_size, os.SEEK_END) raw = f.read() if len(raw) != tail_size: @@ -260,7 +378,11 @@ class Importer: raise ValueError('The exported data in %s is not valid,' ' version (%d) is higher than maximum supported version.' ' You might need to upgrade calibre first.' % (name, version)) - part_map[part_num] = path, is_last + part_map[part_num] = path, is_last, size_of_part + if self.version == -1: + self.version = version + if version != self.version: + raise ValueError(f'The exported data in {name} is not valid as it contains a mix of parts with versions: {self.version} and {version}') nums = sorted(part_map) if not nums: raise ValueError('No exported data found in: %s' % path_to_export_dir) @@ -270,37 +392,44 @@ class Importer: raise ValueError('The last part of this exported data set is missing') if len(nums) != nums[-1]: raise ValueError('There are some parts of the exported data set missing') - self.part_map = {num:path for num, (path, is_last) in iteritems(part_map)} + self.part_map, self.part_size_map = {}, {} + for part_num, (path, is_last, size_of_part) in part_map.items(): + self.part_map[part_num] = path + self.part_size_map[part_num] = size_of_part msf = struct.calcsize(Exporter.MDATA_SZ_FMT) offset = tail_size + msf - with self.part(nums[-1]) as f: + with self.open_part(nums[-1]) as f: f.seek(-offset, os.SEEK_END) sz, = struct.unpack(Exporter.MDATA_SZ_FMT, f.read(msf)) f.seek(- sz - offset, os.SEEK_END) self.metadata = json.loads(f.read(sz)) self.file_metadata = self.metadata['file_metadata'] - def part(self, num): + def size_of_part(self, num): + return self.part_size_map[num] - self.tail_size + + def open_part(self, num): return open(self.part_map[num], 'rb') def start_file(self, key, description): partnum, pos, size, digest, mtime = self.file_metadata[key] - f = self.part(partnum) - f.seek(pos) - return FileSource(f, size, digest, description, mtime, self) + return FileSource(partnum, pos, size, digest, description, mtime, self) + + def save_file(self, key, description, output_path): + with open(output_path, 'wb') as dest, self.start_file(key, description) as src: + shutil.copyfileobj(src, dest) def export_config(self, base_dir, library_usage_stats): for key, relpath in self.metadata['config_dir']: - f = self.start_file(key, relpath) - path = os.path.join(base_dir, relpath.replace('/', os.sep)) - try: - with open(path, 'wb') as dest: - shutil.copyfileobj(f, dest) - except OSError: - os.makedirs(os.path.dirname(path)) - with open(path, 'wb') as dest: - shutil.copyfileobj(f, dest) - f.close() + with self.start_file(key, relpath) as f: + path = os.path.join(base_dir, relpath.replace('/', os.sep)) + try: + with open(path, 'wb') as dest: + shutil.copyfileobj(f, dest) + except OSError: + os.makedirs(os.path.dirname(path)) + with open(path, 'wb') as dest: + shutil.copyfileobj(f, dest) gpath = os.path.join(base_dir, 'global.py') try: with open(gpath, 'rb') as f: