diff --git a/src/calibre/db/cache.py b/src/calibre/db/cache.py
index 9b98cd79ad..aae4e85de0 100644
--- a/src/calibre/db/cache.py
+++ b/src/calibre/db/cache.py
@@ -15,7 +15,6 @@ import traceback
 import weakref
 from collections import defaultdict
 from collections.abc import MutableSet, Set
-from contextlib import closing
 from functools import partial, wraps
 from io import DEFAULT_BUFFER_SIZE, BytesIO
 from queue import Queue
@@ -3162,10 +3161,10 @@ class Cache:
                 if mtime is not None:
                     mtime = timestampfromdt(mtime)
                 with exporter.start_file(key, mtime=mtime) as dest:
-                    self._copy_format_to(book_id, fmt, dest, report_file_size=dest.ensure_space)
+                    self._copy_format_to(book_id, fmt, dest)
             cover_key = '{}:{}:{}'.format(key_prefix, book_id, '.cover')
             with exporter.start_file(cover_key) as dest:
-                if not self.copy_cover_to(book_id, dest, report_file_size=dest.ensure_space):
+                if not self.copy_cover_to(book_id, dest):
                     dest.discard()
                 else:
                     fm['.cover'] = cover_key
@@ -3442,6 +3441,7 @@ class Cache:
                     dest_value.extend(src_value)
                 self._set_field(field, {dest_id: dest_value})
 
+
 def import_library(library_key, importer, library_path, progress=None, abort=None):
     from calibre.db.backend import DB
     metadata = importer.metadata[library_key]
@@ -3455,25 +3455,22 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non
     report_progress('metadata.db')
     if abort is not None and abort.is_set():
         return
-    with open(os.path.join(library_path, 'metadata.db'), 'wb') as f:
-        with closing(importer.start_file(metadata['metadata.db'], 'metadata.db for ' + library_path)) as src:
-            shutil.copyfileobj(src, f)
+    importer.save_file(metadata['metadata.db'], 'metadata.db for ' + library_path, os.path.join(library_path, 'metadata.db'))
     if 'full-text-search.db' in metadata:
         if progress is not None:
             progress('full-text-search.db', 1, total)
         if abort is not None and abort.is_set():
             return
         poff += 1
-        with open(os.path.join(library_path, 'full-text-search.db'), 'wb') as f:
-            with closing(importer.start_file(metadata['full-text-search.db'], 'full-text-search.db for ' + library_path)) as src:
-                shutil.copyfileobj(src, f)
+        importer.save_file(metadata['full-text-search.db'], 'full-text-search.db for ' + library_path,
+                           os.path.join(library_path, 'full-text-search.db'))
     if abort is not None and abort.is_set():
         return
     if 'notes.db' in metadata:
         import zipfile
         notes_dir = os.path.join(library_path, NOTES_DIR_NAME)
         os.makedirs(notes_dir, exist_ok=True)
-        with closing(importer.start_file(metadata['notes.db'], 'notes.db for ' + library_path)) as stream:
+        with importer.start_file(metadata['notes.db'], 'notes.db for ' + library_path) as stream:
             stream.check_hash = False
             with zipfile.ZipFile(stream) as zf:
                 for zi in zf.infolist():
@@ -3482,6 +3479,8 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non
                     os.utime(tpath, (date_time, date_time))
     if abort is not None and abort.is_set():
         return
+    if importer.corrupted_files:
+        raise ValueError('Corrupted files:\n' + '\n'.join(importer.corrupted_files))
     cache = Cache(DB(library_path, load_user_formatter_functions=False))
     cache.init()
@@ -3494,20 +3493,22 @@ def import_library(library_key, importer, library_path, progress=None, abort=Non
         if progress is not None:
             progress(title, i + poff, total)
         cache._update_path((book_id,), mark_as_dirtied=False)
-        for fmt, fmtkey in iteritems(fmt_key_map):
+        for fmt, fmtkey in fmt_key_map.items():
             if fmt == '.cover':
-                with closing(importer.start_file(fmtkey, _('Cover for %s') % title)) as stream:
+                with importer.start_file(fmtkey, _('Cover for %s') % title) as stream:
                     path = cache._field_for('path', book_id).replace('/', os.sep)
                     cache.backend.set_cover(book_id, path, stream, no_processing=True)
             else:
-                with closing(importer.start_file(fmtkey, _('{0} format for {1}').format(fmt.upper(), title))) as stream:
+                with importer.start_file(fmtkey, _('{0} format for {1}').format(fmt.upper(), title)) as stream:
                     size, fname = cache._do_add_format(book_id, fmt, stream, mtime=stream.mtime)
                     cache.fields['formats'].table.update_fmt(book_id, fmt, fname, size, cache.backend)
         for relpath, efkey in extra_files.get(book_id, {}).items():
-            with closing(importer.start_file(efkey, _('Extra file {0} for book {1}').format(relpath, title))) as stream:
+            with importer.start_file(efkey, _('Extra file {0} for book {1}').format(relpath, title)) as stream:
                 path = cache._field_for('path', book_id).replace('/', os.sep)
                 cache.backend.add_extra_file(relpath, stream, path)
         cache.dump_metadata({book_id})
+    if importer.corrupted_files:
+        raise ValueError('Corrupted files:\n' + '\n'.join(importer.corrupted_files))
     if progress is not None:
         progress(_('Completed'), total, total)
     return cache
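
Note on the import path above: the nested open()/closing()/copyfileobj blocks are replaced by the new Importer.save_file() helper, and import_library() now raises as soon as any SHA-1 digest recorded at export time fails to verify. A minimal sketch of driving it end to end; the paths and the 'lkey' library key are illustrative placeholders, not values from this patch:

    from calibre.db.cache import import_library
    from calibre.utils.exim import Importer

    importer = Importer('/tmp/exported-data')  # validates parts, loads metadata
    cache = import_library('lkey', importer, '/tmp/new-library',
                           progress=lambda name, i, total: print(f'{i}/{total}: {name}'))
    # import_library() raises ValueError listing importer.corrupted_files if any
    # digest did not match, so reaching this point means the data verified.
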
diff --git a/src/calibre/db/tests/filesystem.py b/src/calibre/db/tests/filesystem.py
index 2cae7f0c35..3dc5bd5607 100644
--- a/src/calibre/db/tests/filesystem.py
+++ b/src/calibre/db/tests/filesystem.py
@@ -246,6 +246,21 @@ class FilesystemTest(BaseTest):
     def test_export_import(self):
         from calibre.db.cache import import_library
         from calibre.utils.exim import Exporter, Importer
+        with TemporaryDirectory('export_lib') as tdir:
+            for part_size in (8, 1, 1024):
+                exporter = Exporter(tdir, part_size=part_size + Exporter.tail_size())
+                files = {
+                    'a': b'a' * 7, 'b': b'b' * 7, 'c': b'c' * 2, 'd': b'd' * 9, 'e': b'e' * 3,
+                }
+                for key, data in files.items():
+                    exporter.add_file(BytesIO(data), key)
+                exporter.commit()
+                importer = Importer(tdir)
+                for key, expected in files.items():
+                    with importer.start_file(key, key) as f:
+                        actual = f.read()
+                        self.assertEqual(expected, actual, key)
+                self.assertFalse(importer.corrupted_files)
         cache = self.init_cache()
         bookdir = os.path.dirname(cache.format_abspath(1, '__COVER_INTERNAL__'))
         with open(os.path.join(bookdir, 'exf'), 'w') as f:
@@ -255,13 +270,14 @@ class FilesystemTest(BaseTest):
             f.write('recurse')
         self.assertEqual({ef.relpath for ef in cache.list_extra_files(1, pattern='sub/**/*')}, {'sub/recurse'})
         self.assertEqual({ef.relpath for ef in cache.list_extra_files(1)}, {'exf', 'sub/recurse'})
-        for part_size in (1 << 30, 100, 1):
+        for part_size in (512, 1027, None):
             with TemporaryDirectory('export_lib') as tdir, TemporaryDirectory('import_lib') as idir:
-                exporter = Exporter(tdir, part_size=part_size)
+                exporter = Exporter(tdir, part_size=part_size if part_size is None else (part_size + Exporter.tail_size()))
                 cache.export_library('l', exporter)
                 exporter.commit()
                 importer = Importer(tdir)
                 ic = import_library('l', importer, idir)
+                self.assertFalse(importer.corrupted_files)
                 self.assertEqual(cache.all_book_ids(), ic.all_book_ids())
                 for book_id in cache.all_book_ids():
                     self.assertEqual(cache.cover(book_id), ic.cover(book_id), 'Covers not identical for book: %d' % book_id)
@@ -290,6 +306,7 @@ class FilesystemTest(BaseTest):
             exporter.commit()
             importer = Importer(tdir)
             ic = import_library('l', importer, idir)
+            self.assertFalse(importer.corrupted_files)
             self.assertEqual(ic.fts_search('exim')[0]['id'], 1)
             self.assertEqual(cache.notes_for('authors', 2), ic.notes_for('authors', 2))
             a, b = cache.get_notes_resource(r1), ic.get_notes_resource(r1)
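
The first new test block pins part splitting down at byte granularity. The same round trip can be reproduced outside the test suite; this is a small sketch assuming only a scratch directory (the path is illustrative):

    import os
    from io import BytesIO
    from calibre.utils.exim import Exporter, Importer

    tdir = '/tmp/exim-demo'  # illustrative scratch directory
    os.makedirs(tdir, exist_ok=True)
    # part_size includes the fixed-size tail, so payload capacity is 4 bytes/part
    exporter = Exporter(tdir, part_size=4 + Exporter.tail_size())
    exporter.add_file(BytesIO(b'0123456789'), 'key')  # spans parts 1-3
    exporter.commit()                                 # metadata goes in a final part

    importer = Importer(tdir)
    with importer.start_file('key', 'demo file') as f:
        assert f.read() == b'0123456789'
    assert not importer.corrupted_files

This is also why both loops now add Exporter.tail_size() to part_size: under the new scheme the tail counts against part_size, so a test that wants N payload bytes per part must ask for N plus the tail.
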
diff --git a/src/calibre/utils/exim.py b/src/calibre/utils/exim.py
index b739de617d..4e12dde715 100644
--- a/src/calibre/utils/exim.py
+++ b/src/calibre/utils/exim.py
@@ -4,6 +4,7 @@
 import errno
 import hashlib
+import io
 import json
 import os
 import shutil
@@ -13,6 +14,7 @@
 import tempfile
 import time
 import uuid
 from collections import Counter
+from typing import NamedTuple
 
 from calibre import prints
 from calibre.constants import config_dir, filesystem_encoding, iswindows
@@ -30,30 +32,28 @@ class FileDest:
 
     def __init__(self, key, exporter, mtime=None):
         self.exporter, self.key = exporter, key
         self.hasher = hashlib.sha1()
-        self.start_pos = exporter.f.tell()
+        self.start_part_number, self.start_pos = exporter.current_pos()
         self._discard = False
         self.mtime = mtime
+        self.size = 0
 
     def discard(self):
         self._discard = True
 
-    def ensure_space(self, size):
-        if size > 0:
-            self.exporter.ensure_space(size)
-            self.start_pos = self.exporter.f.tell()
-
     def write(self, data):
+        self.size += len(data)
+        written = self.exporter.write(data)
+        if len(data) != written:
+            raise RuntimeError(f'Exporter failed to write all data: {len(data)} != {written}')
         self.hasher.update(data)
-        self.exporter.f.write(data)
 
     def flush(self):
-        self.exporter.f.flush()
+        pass
 
     def close(self):
         if not self._discard:
-            size = self.exporter.f.tell() - self.start_pos
             digest = str(self.hasher.hexdigest())
-            self.exporter.file_metadata[self.key] = (len(self.exporter.parts), self.start_pos, size, digest, self.mtime)
+            self.exporter.file_metadata[self.key] = (self.start_part_number, self.start_pos, self.size, digest, self.mtime)
         del self.exporter, self.hasher
 
     def __enter__(self):
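
FileDest can no longer derive a file's size from file offsets, since a single logical file may now span several parts; it counts bytes itself and records the starting (part, offset) pair obtained from Exporter.current_pos(). The record written by close() has this shape; the values below are illustrative, not taken from the patch:

    import hashlib
    # (start_part_number, start_pos, size, digest, mtime): a 7-byte file whose
    # first byte landed at offset 3 of part 2, with no recorded mtime.
    record = (2, 3, 7, hashlib.sha1(b'abcdefg').hexdigest(), None)
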
@@ -65,17 +65,23 @@ class FileDest:
 
 class Exporter:
 
-    VERSION = 0
+    VERSION = 1
     TAIL_FMT = b'!II?'  # part_num, version, is_last
     MDATA_SZ_FMT = b'!Q'
     EXT = '.calibre-data'
 
-    def __init__(self, path_to_export_dir, part_size=(1 << 30)):
-        self.part_size = part_size
+    @classmethod
+    def tail_size(cls):
+        return struct.calcsize(cls.TAIL_FMT)
+
+    def __init__(self, path_to_export_dir, part_size=None):
+        # default part_size is 1 GB
+        self.part_size = (1 << 30) if part_size is None else part_size
         self.base = os.path.abspath(path_to_export_dir)
-        self.parts = []
-        self.new_part()
+        self.commited_parts = []
+        self.current_part = None
         self.file_metadata = {}
+        self.tail_sz = self.tail_size()
         self.metadata = {'file_metadata': self.file_metadata}
 
     def set_metadata(self, key, val):
@@ -83,39 +89,60 @@ class Exporter:
         if key in self.metadata:
             raise KeyError('The metadata already contains the key: %s' % key)
         self.metadata[key] = val
 
-    @property
-    def f(self):
-        return self.parts[-1]
+    def current_pos(self):
+        pos = 0
+        if self.current_part is not None:
+            pos = self.current_part.tell()
+            if pos >= self.part_size - self.tail_sz:
+                self.new_part()
+                pos = 0
+        return len(self.commited_parts) + 1, pos
+
+    def write(self, data: bytes) -> int:
+        written = 0
+        data = memoryview(data)
+        while len(data) > 0:
+            if self.current_part is None:
+                self.new_part()
+            max_size = self.part_size - self.tail_sz - self.current_part.tell()
+            if max_size <= 0:
+                self.new_part()
+                max_size = self.part_size - self.tail_sz
+            chunk = data[:max_size]
+            w = self.current_part.write(chunk)
+            data = data[w:]
+            written += w
+        return written
 
     def new_part(self):
-        self.parts.append(open(os.path.join(
-            self.base, f'part-{len(self.parts) + 1:04d}{self.EXT}'), 'wb'))
+        self.commit_part()
+        self.current_part = open(os.path.join(
+            self.base, f'part-{len(self.commited_parts) + 1:04d}{self.EXT}'), 'wb')
 
     def commit_part(self, is_last=False):
-        self.f.write(struct.pack(self.TAIL_FMT, len(self.parts), self.VERSION, is_last))
-        self.f.close()
-        self.parts[-1] = self.f.name
-
-    def ensure_space(self, size):
-        try:
-            if size + self.f.tell() < self.part_size:
-                return
-        except AttributeError:
-            raise RuntimeError('This exporter has already been committed, cannot add to it')
-        self.commit_part()
-        self.new_part()
+        if self.current_part is not None:
+            self.current_part.write(struct.pack(self.TAIL_FMT, len(self.commited_parts) + 1, self.VERSION, is_last))
+            self.current_part.close()
+            self.commited_parts.append(self.current_part.name)
+            self.current_part = None
 
     def commit(self):
         raw = json.dumps(self.metadata, ensure_ascii=False)
         if not isinstance(raw, bytes):
             raw = raw.encode('utf-8')
-        self.ensure_space(len(raw))
-        self.f.write(raw)
-        self.f.write(struct.pack(self.MDATA_SZ_FMT, len(raw)))
+        self.new_part()
+        orig, self.part_size = self.part_size, sys.maxsize
+        self.write(raw)
+        self.write(struct.pack(self.MDATA_SZ_FMT, len(raw)))
+        self.part_size = orig
        self.commit_part(is_last=True)
 
     def add_file(self, fileobj, key):
-        with self.start_file(key, os.fstat(fileobj.fileno()).st_mtime) as dest:
+        try:
+            mtime = os.fstat(fileobj.fileno()).st_mtime
+        except (io.UnsupportedOperation, OSError):
+            mtime = None
+        with self.start_file(key, mtime=mtime) as dest:
             shutil.copyfileobj(fileobj, dest)
 
     def start_file(self, key, mtime=None):
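
Exporter.write() is the core of the new scheme: it fills the current part up to part_size minus the tail, then rolls over to a fresh part, so callers never deal with part boundaries and ensure_space() disappears. One point worth making explicit, since it drives the arithmetic in both write() and the tests: TAIL_FMT is nine bytes ('!II?' packs two unsigned 32-bit integers and a bool with no padding), and that overhead is counted inside part_size:

    import struct
    from calibre.utils.exim import Exporter

    assert Exporter.tail_size() == struct.calcsize(Exporter.TAIL_FMT) == 9
    # With part_size=Exporter.tail_size() + 8, each part holds 8 payload bytes,
    # so a single 20-byte write() spills across three parts (8 + 8 + 4).
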
@@ -198,47 +225,135 @@ def export(destdir, library_paths=None, dbmap=None, progress1=None, progress2=No
 # Import {{{
 
+class Chunk(NamedTuple):
+    part_num: int
+    pos_in_part: int
+    size: int
+    pos_in_file: int
+
+
+class Pos:
+
+    def __init__(self, part, pos_in_part, size, importer):
+        self.size = size
+        self.pos_in_file = 0
+        self.chunks = chunks = []
+        self.open_part = importer.open_part
+        self.currently_open_part = None
+        self.currently_open_chunk_index = -1
+
+        pos = 0
+        while size > 0:
+            part_size = importer.size_of_part(part)
+            chunk_size = min(size, part_size - pos_in_part)
+            if chunk_size > 0:
+                chunks.append(Chunk(part, pos_in_part, chunk_size, pos))
+                size -= chunk_size
+                pos += chunk_size
+            part += 1
+            pos_in_part = 0
+
+    def close(self):
+        if self.currently_open_part is not None:
+            self.currently_open_part.close()
+            self.currently_open_part = None
+        self.currently_open_chunk_index = -1
+
+    def tell(self) -> int:
+        return self.pos_in_file
+
+    def seek(self, amt, whence=os.SEEK_SET) -> int:
+        if whence == os.SEEK_SET:
+            new_pos_in_file = amt
+        if whence == os.SEEK_END:
+            new_pos_in_file = self.size + amt
+        if whence == os.SEEK_CUR:
+            new_pos_in_file = self.pos_in_file + amt
+        self.pos_in_file = max(0, min(new_pos_in_file, self.size))
+        return self.pos_in_file
+
+    def read(self, size=None):
+        if size is None or size < 0:
+            size = self.size
+        size = min(size, self.size)
+        amt_left = max(0, self.size - self.pos_in_file)
+        amt_to_read = min(amt_left, size)
+        if amt_to_read <= 0:
+            return b''
+        start_chunk = max(0, self.currently_open_chunk_index)
+        num = len(self.chunks)
+        ans = []
+        chunk_idx = -1
+        for i in range(num):
+            chunk_idx = (start_chunk + i) % num
+            chunk = self.chunks[chunk_idx]
+            if chunk.pos_in_file <= self.pos_in_file < chunk.pos_in_file + chunk.size:
+                break
+        else:
+            raise ValueError(f'No chunk found containing {self.pos_in_file=}')
+
+        while amt_to_read > 0:
+            try:
+                chunk = self.chunks[chunk_idx]
+            except IndexError:
+                break
+            ans.append(self._read_chunk(chunk, amt_to_read, chunk_idx))
+            amt_to_read -= len(ans[-1])
+            chunk_idx += 1
+        return b''.join(ans)
+
+    def _read_chunk(self, chunk, size, chunk_idx):
+        if self.currently_open_chunk_index != chunk_idx or self.currently_open_part is None:
+            self.close()
+            self.currently_open_part = self.open_part(chunk.part_num)
+            self.currently_open_chunk_index = chunk_idx
+        offset_from_start_of_chunk = self.pos_in_file - chunk.pos_in_file
+        self.currently_open_part.seek(chunk.pos_in_part + offset_from_start_of_chunk, os.SEEK_SET)
+        size = min(size, chunk.size - offset_from_start_of_chunk)
+        ans = self.currently_open_part.read(size)
+        self.pos_in_file += len(ans)
+        return ans
+
+
 class FileSource:
 
-    def __init__(self, f, size, digest, description, mtime, importer):
-        self.f, self.size, self.digest, self.description = f, size, digest, description
-        self.seekable = self.f.seekable
+    def __init__(self, start_partnum, start_pos, size, digest, description, mtime, importer):
+        self.size, self.digest, self.description = size, digest, description
         self.mtime = mtime
-        self.start = f.tell()
-        self.end = self.start + size
+        self.start = start_pos
+        self.start_partnum = start_partnum
+        self.pos = Pos(start_partnum, start_pos, size, importer)
         self.hasher = hashlib.sha1()
         self.importer = importer
         self.check_hash = True
 
+    def seekable(self):
+        return False
+
     def seek(self, amt, whence=os.SEEK_SET):
-        if whence == os.SEEK_SET:
-            return self.f.seek(self.start + amt, os.SEEK_SET)
-        if whence == os.SEEK_END:
-            return self.f.seek(self.end + amt, os.SEEK_SET)
-        if whence == os.SEEK_CUR:
-            return self.f.seek(amt, whence)
+        return self.pos.seek(amt, whence)
 
     def tell(self):
-        return self.f.tell() - self.start
+        return self.pos.tell()
 
     def read(self, size=None):
-        if size is not None and size < 1:
-            return b''
-        left = self.end - self.f.tell()
-        amt = min(left, size or left)
-        if amt < 1:
-            return b''
-        ans = self.f.read(amt)
-        if self.check_hash:
+        ans = self.pos.read(size)
+        if self.check_hash and ans:
             self.hasher.update(ans)
         return ans
 
     def close(self):
         if self.check_hash and self.hasher.hexdigest() != self.digest:
             self.importer.corrupted_files.append(self.description)
-        self.f.close()
-        self.hasher = self.f = None
+        self.hasher = None
+        self.pos.close()
+        self.pos = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *a):
+        self.close()
 
 
 class Importer:
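
Pos precomputes, once per file, the list of chunks the file occupies across parts, and read() then serves any offset while keeping at most one part file open. The mapping is easiest to see in isolation; this standalone restatement of the constructor loop (chunks_for is a hypothetical helper, not part of the patch) shows a 7-byte file starting at offset 6 of part 1 when every part holds 8 payload bytes:

    def chunks_for(part, pos_in_part, size, size_of_part):
        # Tuples are (part_num, pos_in_part, size, pos_in_file), as in Chunk.
        out, pos = [], 0
        while size > 0:
            chunk_size = min(size, size_of_part(part) - pos_in_part)
            if chunk_size > 0:
                out.append((part, pos_in_part, chunk_size, pos))
                size -= chunk_size
                pos += chunk_size
            part += 1
            pos_in_part = 0
        return out

    assert chunks_for(1, 6, 7, lambda n: 8) == [(1, 6, 2, 0), (2, 0, 5, 2)]
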
@@ -246,11 +361,14 @@ class Importer:
 
     def __init__(self, path_to_export_dir):
         self.corrupted_files = []
         part_map = {}
-        tail_size = struct.calcsize(Exporter.TAIL_FMT)
+        self.tail_size = tail_size = struct.calcsize(Exporter.TAIL_FMT)
+        self.version = -1
         for name in os.listdir(path_to_export_dir):
             if name.lower().endswith(Exporter.EXT):
                 path = os.path.join(path_to_export_dir, name)
                 with open(path, 'rb') as f:
+                    f.seek(0, os.SEEK_END)
+                    size_of_part = f.tell()
                     f.seek(-tail_size, os.SEEK_END)
                     raw = f.read()
                     if len(raw) != tail_size:
@@ -260,7 +378,11 @@ class Importer:
                         raise ValueError('The exported data in %s is not valid,'
                             ' version (%d) is higher than maximum supported version.'
                             ' You might need to upgrade calibre first.' % (name, version))
-                    part_map[part_num] = path, is_last
+                    part_map[part_num] = path, is_last, size_of_part
+                    if self.version == -1:
+                        self.version = version
+                    if version != self.version:
+                        raise ValueError(f'The exported data in {name} is not valid as it contains a mix of parts with versions: {self.version} and {version}')
         nums = sorted(part_map)
         if not nums:
             raise ValueError('No exported data found in: %s' % path_to_export_dir)
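
Importer now records each part's on-disk size while scanning, because Pos needs size_of_part() to lay out chunks, and it refuses to mix parts written with different format versions. Every part, not just the last, carries the same nine-byte tail; decoding one by hand looks like this (the path is an illustrative placeholder reusing the demo directory from the test sketch above):

    import os, struct
    from calibre.utils.exim import Exporter

    with open('/tmp/exim-demo/part-0001.calibre-data', 'rb') as f:
        f.seek(-Exporter.tail_size(), os.SEEK_END)
        part_num, version, is_last = struct.unpack(Exporter.TAIL_FMT, f.read())
    # part_num == 1, version == 1; is_last is True only for the final part
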
@@ -270,37 +392,44 @@ class Importer:
             raise ValueError('The last part of this exported data set is missing')
         if len(nums) != nums[-1]:
             raise ValueError('There are some parts of the exported data set missing')
-        self.part_map = {num:path for num, (path, is_last) in iteritems(part_map)}
+        self.part_map, self.part_size_map = {}, {}
+        for part_num, (path, is_last, size_of_part) in part_map.items():
+            self.part_map[part_num] = path
+            self.part_size_map[part_num] = size_of_part
         msf = struct.calcsize(Exporter.MDATA_SZ_FMT)
         offset = tail_size + msf
-        with self.part(nums[-1]) as f:
+        with self.open_part(nums[-1]) as f:
             f.seek(-offset, os.SEEK_END)
             sz, = struct.unpack(Exporter.MDATA_SZ_FMT, f.read(msf))
             f.seek(- sz - offset, os.SEEK_END)
             self.metadata = json.loads(f.read(sz))
             self.file_metadata = self.metadata['file_metadata']
 
-    def part(self, num):
+    def size_of_part(self, num):
+        return self.part_size_map[num] - self.tail_size
+
+    def open_part(self, num):
         return open(self.part_map[num], 'rb')
 
     def start_file(self, key, description):
         partnum, pos, size, digest, mtime = self.file_metadata[key]
-        f = self.part(partnum)
-        f.seek(pos)
-        return FileSource(f, size, digest, description, mtime, self)
+        return FileSource(partnum, pos, size, digest, description, mtime, self)
+
+    def save_file(self, key, description, output_path):
+        with open(output_path, 'wb') as dest, self.start_file(key, description) as src:
+            shutil.copyfileobj(src, dest)
 
     def export_config(self, base_dir, library_usage_stats):
         for key, relpath in self.metadata['config_dir']:
-            f = self.start_file(key, relpath)
-            path = os.path.join(base_dir, relpath.replace('/', os.sep))
-            try:
-                with open(path, 'wb') as dest:
-                    shutil.copyfileobj(f, dest)
-            except OSError:
-                os.makedirs(os.path.dirname(path))
-                with open(path, 'wb') as dest:
-                    shutil.copyfileobj(f, dest)
-            f.close()
+            with self.start_file(key, relpath) as f:
+                path = os.path.join(base_dir, relpath.replace('/', os.sep))
+                try:
+                    with open(path, 'wb') as dest:
+                        shutil.copyfileobj(f, dest)
+                except OSError:
+                    os.makedirs(os.path.dirname(path))
+                    with open(path, 'wb') as dest:
+                        shutil.copyfileobj(f, dest)
         gpath = os.path.join(base_dir, 'global.py')
         try:
             with open(gpath, 'rb') as f:
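
For reference, the framing that commit() and Importer.__init__() agree on: the final part holds the JSON metadata, then its length packed with MDATA_SZ_FMT ('!Q', an 8-byte big-endian integer), then the usual tail. A sketch of reading it back, assuming path_to_last_part points at the part whose tail has is_last set:

    import json, os, struct
    from calibre.utils.exim import Exporter

    def read_metadata(path_to_last_part):
        msf = struct.calcsize(Exporter.MDATA_SZ_FMT)
        offset = Exporter.tail_size() + msf
        with open(path_to_last_part, 'rb') as f:
            f.seek(-offset, os.SEEK_END)
            sz, = struct.unpack(Exporter.MDATA_SZ_FMT, f.read(msf))
            f.seek(-sz - offset, os.SEEK_END)
            return json.loads(f.read(sz))
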