Code to list entries in trash

This commit is contained in:
Kovid Goyal 2023-04-10 15:16:38 +05:30
parent 53ef74ec85
commit 5c5ac19935
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 53 additions and 10 deletions

View File

@ -16,7 +16,9 @@ import sys
import time import time
import uuid import uuid
from contextlib import closing, suppress from contextlib import closing, suppress
from dataclasses import dataclass
from functools import partial from functools import partial
from typing import Sequence
from calibre import as_unicode, force_unicode, isbytestring, prints from calibre import as_unicode, force_unicode, isbytestring, prints
from calibre.constants import ( from calibre.constants import (
@ -35,7 +37,7 @@ from calibre.library.field_metadata import FieldMetadata
from calibre.ptempfile import PersistentTemporaryFile, TemporaryFile from calibre.ptempfile import PersistentTemporaryFile, TemporaryFile
from calibre.utils import pickle_binary_string, unpickle_binary_string from calibre.utils import pickle_binary_string, unpickle_binary_string
from calibre.utils.config import from_json, prefs, to_json, tweaks from calibre.utils.config import from_json, prefs, to_json, tweaks
from calibre.utils.copy_files import copy_tree, copy_files from calibre.utils.copy_files import copy_files, copy_tree
from calibre.utils.date import EPOCH, parse_date, utcfromtimestamp, utcnow from calibre.utils.date import EPOCH, parse_date, utcfromtimestamp, utcnow
from calibre.utils.filenames import ( from calibre.utils.filenames import (
WindowsAtomicFolderMove, ascii_filename, atomic_rename, copyfile_using_links, WindowsAtomicFolderMove, ascii_filename, atomic_rename, copyfile_using_links,
@ -54,7 +56,7 @@ from polyglot.builtins import (
# }}} # }}}
COVER_FILE_NAME = 'cover.jpg'
TRASH_DIR_NAME = '.caltrash' TRASH_DIR_NAME = '.caltrash'
BOOK_ID_PATH_TEMPLATE = ' ({})' BOOK_ID_PATH_TEMPLATE = ' ({})'
CUSTOM_DATA_TYPES = frozenset(('rating', 'text', 'comments', 'datetime', CUSTOM_DATA_TYPES = frozenset(('rating', 'text', 'comments', 'datetime',
@ -62,6 +64,14 @@ CUSTOM_DATA_TYPES = frozenset(('rating', 'text', 'comments', 'datetime',
WINDOWS_RESERVED_NAMES = frozenset('CON PRN AUX NUL COM1 COM2 COM3 COM4 COM5 COM6 COM7 COM8 COM9 LPT1 LPT2 LPT3 LPT4 LPT5 LPT6 LPT7 LPT8 LPT9'.split()) WINDOWS_RESERVED_NAMES = frozenset('CON PRN AUX NUL COM1 COM2 COM3 COM4 COM5 COM6 COM7 COM8 COM9 LPT1 LPT2 LPT3 LPT4 LPT5 LPT6 LPT7 LPT8 LPT9'.split())
@dataclass
class TrashEntry:
book_id: int
book_dir: str
mtime: float
formats: Sequence[str] = ()
class DynamicFilter: # {{{ class DynamicFilter: # {{{
'No longer used, present for legacy compatibility' 'No longer used, present for legacy compatibility'
@ -1482,7 +1492,7 @@ class DB:
def cover_abspath(self, book_id, path): def cover_abspath(self, book_id, path):
path = os.path.join(self.library_path, path) path = os.path.join(self.library_path, path)
fmt_path = os.path.join(path, 'cover.jpg') fmt_path = os.path.join(path, COVER_FILE_NAME)
if os.path.exists(fmt_path): if os.path.exists(fmt_path):
return fmt_path return fmt_path
@ -1540,14 +1550,14 @@ class DB:
self.move_book_files_to_trash(book_id, paths) self.move_book_files_to_trash(book_id, paths)
def cover_last_modified(self, path): def cover_last_modified(self, path):
path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME))
try: try:
return utcfromtimestamp(os.stat(path).st_mtime) return utcfromtimestamp(os.stat(path).st_mtime)
except OSError: except OSError:
pass # Cover doesn't exist pass # Cover doesn't exist
def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False, report_file_size=None): def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False, report_file_size=None):
path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME))
if windows_atomic_move is not None: if windows_atomic_move is not None:
if not isinstance(dest, string_or_bytes): if not isinstance(dest, string_or_bytes):
raise Exception('Error, you must pass the dest as a path when' raise Exception('Error, you must pass the dest as a path when'
@ -1590,7 +1600,7 @@ class DB:
return False return False
def cover_or_cache(self, path, timestamp): def cover_or_cache(self, path, timestamp):
path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME))
try: try:
stat = os.stat(path) stat = os.stat(path)
except OSError: except OSError:
@ -1611,7 +1621,7 @@ class DB:
def progress_callback(book_id, old_sz, new_sz): def progress_callback(book_id, old_sz, new_sz):
return None return None
for book_id, path in path_map.items(): for book_id, path in path_map.items():
path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg')) path = os.path.abspath(os.path.join(self.library_path, path, COVER_FILE_NAME))
try: try:
sz = os.path.getsize(path) sz = os.path.getsize(path)
except OSError: except OSError:
@ -1625,7 +1635,7 @@ class DB:
path = os.path.abspath(os.path.join(self.library_path, path)) path = os.path.abspath(os.path.join(self.library_path, path))
if not os.path.exists(path): if not os.path.exists(path):
os.makedirs(path) os.makedirs(path)
path = os.path.join(path, 'cover.jpg') path = os.path.join(path, COVER_FILE_NAME)
if callable(getattr(data, 'save', None)): if callable(getattr(data, 'save', None)):
from calibre.gui2 import pixmap_to_data from calibre.gui2 import pixmap_to_data
data = pixmap_to_data(data) data = pixmap_to_data(data)
@ -1782,7 +1792,7 @@ class DB:
os.makedirs(tpath) os.makedirs(tpath)
if source_ok: # Migrate existing files if source_ok: # Migrate existing files
dest = os.path.join(tpath, 'cover.jpg') dest = os.path.join(tpath, COVER_FILE_NAME)
self.copy_cover_to(current_path, dest, self.copy_cover_to(current_path, dest,
windows_atomic_move=wam, use_hardlink=True) windows_atomic_move=wam, use_hardlink=True)
for fmt in formats: for fmt in formats:
@ -1913,6 +1923,34 @@ class DB:
fmap[path] = os.path.join(dest, ext) fmap[path] = os.path.join(dest, ext)
copy_files(fmap, delete_source=True) copy_files(fmap, delete_source=True)
def list_trash_entries(self):
self.ensure_trash_dir()
books, files = [], []
base = os.path.join(self.trash_dir, 'b')
for x in os.scandir(base):
if x.is_dir(follow_symlinks=False):
try:
book_id = int(x.name)
mtime = x.stat(follow_symlinks=False).st_mtime
except Exception:
continue
books.append(TrashEntry(book_id, x.path, mtime))
base = os.path.join(self.trash_dir, 'f')
for x in os.scandir(base):
if x.is_dir(follow_symlinks=False):
try:
book_id = int(x.name)
mtime = x.stat(follow_symlinks=False).st_mtime
except Exception:
continue
formats = set()
for f in os.scandir(x.path):
if f.is_file(follow_symlinks=False):
formats.add(f.name.upper())
if formats:
files.append(TrashEntry(book_id, x.path, mtime, tuple(formats)))
return books, files
def remove_books(self, path_map, permanent=False): def remove_books(self, path_map, permanent=False):
self.ensure_trash_dir() self.ensure_trash_dir()
self.executemany( self.executemany(

View File

@ -265,12 +265,17 @@ class AddRemoveTest(BaseTest):
# Check that files are removed # Check that files are removed
fmtpath = cache.format_abspath(1, 'FMT1') fmtpath = cache.format_abspath(1, 'FMT1')
bookpath = os.path.dirname(fmtpath) bookpath = os.path.dirname(fmtpath)
os.mkdir(os.path.join(bookpath, 'xyz'))
open(os.path.join(bookpath, 'xyz', 'abc'), 'w').close()
authorpath = os.path.dirname(bookpath) authorpath = os.path.dirname(bookpath)
item_id = {v:k for k, v in iteritems(cache.fields['#series'].table.id_map)}['My Series Two'] item_id = {v:k for k, v in iteritems(cache.fields['#series'].table.id_map)}['My Series Two']
cache.remove_books((1,)) cache.remove_books((1,))
for x in (fmtpath, bookpath, authorpath): for x in (fmtpath, bookpath, authorpath):
af(os.path.exists(x), 'The file %s exists, when it should not' % x) af(os.path.exists(x), 'The file %s exists, when it should not' % x)
b, f = cache.backend.list_trash_entries()
self.assertEqual(len(b), 1)
self.assertEqual(len(f), 0)
self.assertTrue(os.path.exists(os.path.join(b[0].book_dir, 'metadata.opf')))
# }}} # }}}
def test_original_fmt(self): # {{{ def test_original_fmt(self): # {{{