update_path() now preserves non format files/dirs in book directories

This commit is contained in:
Kovid Goyal 2023-04-16 18:07:05 +05:30
parent 8c8e6fda0b
commit eac7146286
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 152 additions and 76 deletions

View File

@ -37,7 +37,7 @@ from calibre.library.field_metadata import FieldMetadata
from calibre.ptempfile import PersistentTemporaryFile, TemporaryFile
from calibre.utils import pickle_binary_string, unpickle_binary_string
from calibre.utils.config import from_json, prefs, to_json, tweaks
from calibre.utils.copy_files import copy_files, copy_tree
from calibre.utils.copy_files import copy_files, copy_tree, rename_files
from calibre.utils.date import EPOCH, parse_date, utcfromtimestamp, utcnow
from calibre.utils.filenames import (
WindowsAtomicFolderMove, ascii_filename, atomic_rename, copyfile_using_links,
@ -423,6 +423,8 @@ def rmtree_with_retry(path, sleep_time=1):
try:
shutil.rmtree(path)
except OSError as e:
if not iswindows:
raise
if e.errno == errno.ENOENT and not os.path.exists(path):
return
time.sleep(sleep_time) # In case something has temporarily locked a file
@ -1469,7 +1471,7 @@ class DB:
if book_dir.name.endswith(q) and book_dir.is_dir():
return book_dir.path
def format_abspath(self, book_id, fmt, fname, book_path):
def format_abspath(self, book_id, fmt, fname, book_path, do_file_rename=True):
path = os.path.join(self.library_path, book_path)
fmt = ('.' + fmt.lower()) if fmt else ''
fmt_path = os.path.join(path, fname+fmt)
@ -1479,11 +1481,13 @@ class DB:
return
candidates = ()
with suppress(OSError):
candidates = os.listdir(path)
candidates = os.scandir(path)
q = fmt.lower()
for x in candidates:
if x.lower().endswith(q):
x = os.path.join(path, x)
if x.name.endswith(q) and x.is_file():
if not do_file_rename:
return x.path
x = x.path
with suppress(OSError):
atomic_rename(x, fmt_path)
return fmt_path
@ -1783,73 +1787,57 @@ class DB:
return size, fname
def update_path(self, book_id, title, author, path_field, formats_field):
path = self.construct_path_name(book_id, title, author)
current_path = path_field.for_book(book_id, default_value='')
path = self.construct_path_name(book_id, title, author)
formats = formats_field.for_book(book_id, default_value=())
try:
extlen = max(len(fmt) for fmt in formats) + 1
except ValueError:
extlen = 10
fname = self.construct_file_name(book_id, title, author, extlen)
# Check if the metadata used to construct paths has changed
def rename_format_files():
changed = False
for fmt in formats:
name = formats_field.format_fname(book_id, fmt)
if name and name != fname:
changed = True
break
if path == current_path and not changed:
return
spath = os.path.join(self.library_path, *current_path.split('/'))
tpath = os.path.join(self.library_path, *path.split('/'))
source_ok = current_path and os.path.exists(spath)
wam = WindowsAtomicFolderMove(spath) if iswindows and source_ok else None
format_map = {}
original_format_map = {}
try:
if not os.path.exists(tpath):
os.makedirs(tpath)
if source_ok: # Migrate existing files
dest = os.path.join(tpath, COVER_FILE_NAME)
self.copy_cover_to(current_path, dest,
windows_atomic_move=wam, use_hardlink=True)
if changed:
rename_map = {}
for fmt in formats:
dest = os.path.join(tpath, fname+'.'+fmt.lower())
format_map[fmt] = dest
ofmt_fname = formats_field.format_fname(book_id, fmt)
original_format_map[fmt] = os.path.join(spath, ofmt_fname+'.'+fmt.lower())
self.copy_format_to(book_id, fmt, ofmt_fname, current_path,
dest, windows_atomic_move=wam, use_hardlink=True)
# Update db to reflect new file locations
current_fname = formats_field.format_fname(book_id, fmt)
current_fmt_path = self.format_abspath(book_id, fmt, current_fname, current_path, do_file_rename=False)
if current_fmt_path:
new_fmt_path = os.path.abspath(os.path.join(os.path.dirname(current_fmt_path), fname + '.' + fmt.lower()))
if current_fmt_path != new_fmt_path:
rename_map[current_fmt_path] = new_fmt_path
if rename_map:
rename_files(rename_map)
return changed
def update_paths_in_db():
with self.conn:
for fmt in formats:
formats_field.table.set_fname(book_id, fmt, fname, self)
path_field.table.set_path(book_id, path, self)
# Delete not needed files and directories
if source_ok:
if os.path.exists(spath):
if samefile(spath, tpath):
# The format filenames may have changed while the folder
# name remains the same
for fmt, opath in iteritems(original_format_map):
npath = format_map.get(fmt, None)
if npath and os.path.abspath(npath.lower()) != os.path.abspath(opath.lower()) and samefile(opath, npath):
# opath and npath are different hard links to the same file
os.unlink(opath)
else:
if wam is not None:
wam.delete_originals()
self.rmtree(spath)
parent = os.path.dirname(spath)
if len(os.listdir(parent)) == 0:
self.rmtree(parent)
finally:
if wam is not None:
wam.close_handles()
if not current_path:
update_paths_in_db()
return
if path == current_path:
# Only format paths have possibly changed
if rename_format_files():
update_paths_in_db()
return
spath = os.path.join(self.library_path, *current_path.split('/'))
tpath = os.path.join(self.library_path, *path.split('/'))
if samefile(spath, tpath):
# format paths changed and case of path to book folder changed
rename_format_files()
update_paths_in_db()
curpath = self.library_path
c1, c2 = current_path.split('/'), path.split('/')
if not self.is_case_sensitive and len(c1) == len(c2):
@ -1864,11 +1852,37 @@ class DB:
for oldseg, newseg in zip(c1, c2):
if oldseg.lower() == newseg.lower() and oldseg != newseg:
try:
os.rename(os.path.join(curpath, oldseg),
os.path.join(curpath, newseg))
except:
os.replace(os.path.join(curpath, oldseg), os.path.join(curpath, newseg))
except OSError:
break # Fail silently since nothing catastrophic has happened
curpath = os.path.join(curpath, newseg)
return
with suppress(FileNotFoundError):
self.rmtree(tpath)
lfmts = tuple(fmt.lower() for fmt in formats)
existing_format_filenames = {}
for fmt in lfmts:
current_fname = formats_field.format_fname(book_id, fmt)
current_fmt_path = self.format_abspath(book_id, fmt, current_fname, current_path, do_file_rename=False)
if current_fmt_path:
existing_format_filenames[os.path.basename(current_fmt_path)] = fmt
def transform_format_filenames(src_path, dest_path):
src_dir, src_filename = os.path.split(os.path.abspath(src_path))
if src_dir != spath:
return dest_path
fmt = existing_format_filenames.get(src_filename)
if not fmt:
return dest_path
return os.path.join(os.path.dirname(dest_path), fname + '.' + fmt)
if os.path.exists(spath):
copy_tree(os.path.abspath(spath), tpath, delete_source=True, transform_destination_filename=transform_format_filenames)
else:
os.makedirs(tpath)
update_paths_in_db()
def write_backup(self, path, raw):
path = os.path.abspath(os.path.join(self.library_path, path, 'metadata.opf'))

View File

@ -67,6 +67,68 @@ class FilesystemTest(BaseTest):
part = fpath[-x:][0]
self.assertIn(part, os.listdir(base))
initial_side_data = {}
def init_cache():
nonlocal cache, initial_side_data
cache = self.init_cache(self.cloned_library)
bookdir = os.path.dirname(cache.format_abspath(1, '__COVER_INTERNAL__'))
with open(os.path.join(bookdir, 'a.side'), 'w') as f:
f.write('a.side')
os.mkdir(os.path.join(bookdir, 'subdir'))
with open(os.path.join(bookdir, 'subdir', 'a.fmt1'), 'w') as f:
f.write('a.fmt1')
initial_side_data = side_data()
def side_data(book_id=1):
bookdir = os.path.dirname(cache.format_abspath(book_id, '__COVER_INTERNAL__'))
return {
'a.side': open(os.path.join(bookdir, 'a.side')).read(),
'a.fmt1': open(os.path.join(bookdir, 'subdir', 'a.fmt1')).read(),
}
def check_that_filesystem_and_db_entries_match(book_id):
bookdir = os.path.dirname(cache.format_abspath(book_id, '__COVER_INTERNAL__'))
if iswindows:
from calibre_extensions import winutil
bookdir = winutil.get_long_path_name(bookdir)
bookdir_contents = set(os.listdir(bookdir))
expected_contents = {'cover.jpg', 'a.side', 'subdir'}
for fmt, fname in cache.fields['formats'].table.fname_map[book_id].items():
expected_contents.add(fname + '.' + fmt.lower())
ae(expected_contents, bookdir_contents)
fs_path = bookdir.split(os.sep)[-2:]
db_path = cache.field_for('path', book_id).split('/')
ae(db_path, fs_path)
ae(initial_side_data, side_data(book_id))
# test only formats being changed
init_cache()
fname = cache.fields['formats'].table.fname_map[1]['FMT1']
cache.fields['formats'].table.fname_map[1]['FMT1'] = 'some thing else'
cache.fields['formats'].table.fname_map[1]['FMT2'] = fname.upper()
cache.backend.update_path(1, cache.field_for('title', 1), cache.field_for('authors', 1)[0], cache.fields['path'], cache.fields['formats'])
check_that_filesystem_and_db_entries_match(1)
# test a case only change
init_cache()
title = cache.field_for('title', 1)
self.assertNotEqual(title, title.upper())
cache.set_field('title', {1: title.upper()})
check_that_filesystem_and_db_entries_match(1)
# test a title change
init_cache()
cache.set_field('title', {1: 'new changed title'})
check_that_filesystem_and_db_entries_match(1)
# test an author change
cache.set_field('authors', {1: ('new changed author',)})
check_that_filesystem_and_db_entries_match(1)
# test a double change
from calibre.ebooks.metadata.book.base import Metadata
cache.set_metadata(1, Metadata('t1', ('a1', 'a2')))
check_that_filesystem_and_db_entries_match(1)
@unittest.skipUnless(iswindows, 'Windows only')
def test_windows_atomic_move(self):
'Test book file open in another process when changing metadata'