From 5c5ac19935486d9f488c5f344ba33b27488f6d0e Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 10 Apr 2023 15:16:38 +0530 Subject: [PATCH] Code to list entries in trash --- src/calibre/db/backend.py | 56 +++++++++++++++++++++++++----- src/calibre/db/tests/add_remove.py | 7 +++- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/src/calibre/db/backend.py b/src/calibre/db/backend.py index 10d2695185..2e5a3f783f 100644 --- a/src/calibre/db/backend.py +++ b/src/calibre/db/backend.py @@ -16,7 +16,9 @@ import sys import time import uuid from contextlib import closing, suppress +from dataclasses import dataclass from functools import partial +from typing import Sequence from calibre import as_unicode, force_unicode, isbytestring, prints from calibre.constants import ( @@ -35,7 +37,7 @@ from calibre.library.field_metadata import FieldMetadata from calibre.ptempfile import PersistentTemporaryFile, TemporaryFile from calibre.utils import pickle_binary_string, unpickle_binary_string from calibre.utils.config import from_json, prefs, to_json, tweaks -from calibre.utils.copy_files import copy_tree, copy_files +from calibre.utils.copy_files import copy_files, copy_tree from calibre.utils.date import EPOCH, parse_date, utcfromtimestamp, utcnow from calibre.utils.filenames import ( WindowsAtomicFolderMove, ascii_filename, atomic_rename, copyfile_using_links, @@ -54,7 +56,7 @@ from polyglot.builtins import ( # }}} - +COVER_FILE_NAME = 'cover.jpg' TRASH_DIR_NAME = '.caltrash' BOOK_ID_PATH_TEMPLATE = ' ({})' CUSTOM_DATA_TYPES = frozenset(('rating', 'text', 'comments', 'datetime', @@ -62,6 +64,14 @@ CUSTOM_DATA_TYPES = frozenset(('rating', 'text', 'comments', 'datetime', WINDOWS_RESERVED_NAMES = frozenset('CON PRN AUX NUL COM1 COM2 COM3 COM4 COM5 COM6 COM7 COM8 COM9 LPT1 LPT2 LPT3 LPT4 LPT5 LPT6 LPT7 LPT8 LPT9'.split()) +@dataclass +class TrashEntry: + book_id: int + book_dir: str + mtime: float + formats: Sequence[str] = () + + class DynamicFilter: # {{{ 'No longer used, present for legacy compatibility' @@ -1482,7 +1492,7 @@ class DB: def cover_abspath(self, book_id, path): path = os.path.join(self.library_path, path) - fmt_path = os.path.join(path, 'cover.jpg') + fmt_path = os.path.join(path, COVER_FILE_NAME) if os.path.exists(fmt_path): return fmt_path @@ -1540,14 +1550,14 @@ class DB: self.move_book_files_to_trash(book_id, paths) def cover_last_modified(self, path): - path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) + path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME)) try: return utcfromtimestamp(os.stat(path).st_mtime) except OSError: pass # Cover doesn't exist def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False, report_file_size=None): - path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) + path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME)) if windows_atomic_move is not None: if not isinstance(dest, string_or_bytes): raise Exception('Error, you must pass the dest as a path when' @@ -1590,7 +1600,7 @@ class DB: return False def cover_or_cache(self, path, timestamp): - path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) + path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME)) try: stat = os.stat(path) except OSError: @@ -1611,7 +1621,7 @@ class DB: def progress_callback(book_id, old_sz, new_sz): return None for book_id, path in path_map.items(): - path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) + path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME)) try: sz = os.path.getsize(path) except OSError: @@ -1625,7 +1635,7 @@ class DB: path = os.path.abspath(os.path.join(self.library_path, path)) if not os.path.exists(path): os.makedirs(path) - path = os.path.join(path, 'cover.jpg') + path = os.path.join(path, COVER_FILE_NAME) if callable(getattr(data, 'save', None)): from calibre.gui2 import pixmap_to_data data = pixmap_to_data(data) @@ -1782,7 +1792,7 @@ class DB: os.makedirs(tpath) if source_ok: # Migrate existing files - dest = os.path.join(tpath, 'cover.jpg') + dest = os.path.join(tpath, COVER_FILE_NAME) self.copy_cover_to(current_path, dest, windows_atomic_move=wam, use_hardlink=True) for fmt in formats: @@ -1913,6 +1923,34 @@ class DB: fmap[path] = os.path.join(dest, ext) copy_files(fmap, delete_source=True) + def list_trash_entries(self): + self.ensure_trash_dir() + books, files = [], [] + base = os.path.join(self.trash_dir, 'b') + for x in os.scandir(base): + if x.is_dir(follow_symlinks=False): + try: + book_id = int(x.name) + mtime = x.stat(follow_symlinks=False).st_mtime + except Exception: + continue + books.append(TrashEntry(book_id, x.path, mtime)) + base = os.path.join(self.trash_dir, 'f') + for x in os.scandir(base): + if x.is_dir(follow_symlinks=False): + try: + book_id = int(x.name) + mtime = x.stat(follow_symlinks=False).st_mtime + except Exception: + continue + formats = set() + for f in os.scandir(x.path): + if f.is_file(follow_symlinks=False): + formats.add(f.name.upper()) + if formats: + files.append(TrashEntry(book_id, x.path, mtime, tuple(formats))) + return books, files + def remove_books(self, path_map, permanent=False): self.ensure_trash_dir() self.executemany( diff --git a/src/calibre/db/tests/add_remove.py b/src/calibre/db/tests/add_remove.py index 290962d250..49e25cbc5d 100644 --- a/src/calibre/db/tests/add_remove.py +++ b/src/calibre/db/tests/add_remove.py @@ -265,12 +265,17 @@ class AddRemoveTest(BaseTest): # Check that files are removed fmtpath = cache.format_abspath(1, 'FMT1') bookpath = os.path.dirname(fmtpath) + os.mkdir(os.path.join(bookpath, 'xyz')) + open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close() authorpath = os.path.dirname(bookpath) item_id = {v:k for k, v in iteritems(cache.fields['#series'].table.id_map)}['My Series Two'] cache.remove_books((1,)) for x in (fmtpath, bookpath, authorpath): af(os.path.exists(x), 'The file %s exists, when it should not' % x) - + b, f = cache.backend.list_trash_entries() + self.assertEqual(len(b), 1) + self.assertEqual(len(f), 0) + self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf'))) # }}} def test_original_fmt(self): # {{{