newdb: Fix replacing formats in a book creating orphans

When replacing formats in a book with a very long title+authors on
windows, newdb could leave behind the old format file, because the
filename shortening algorithm has changed. Handle that case.
This commit is contained in:
Kovid Goyal 2013-08-27 14:23:42 +05:30
parent 121469daa5
commit 41bcffea92
3 changed files with 36 additions and 3 deletions

View File

@ -1337,7 +1337,7 @@ class DB(object):
if wam is not None: if wam is not None:
wam.close_handles() wam.close_handles()
def add_format(self, book_id, fmt, stream, title, author, path): def add_format(self, book_id, fmt, stream, title, author, path, current_name):
fmt = ('.' + fmt.lower()) if fmt else '' fmt = ('.' + fmt.lower()) if fmt else ''
fname = self.construct_file_name(book_id, title, author, len(fmt)) fname = self.construct_file_name(book_id, title, author, len(fmt))
path = os.path.join(self.library_path, path) path = os.path.join(self.library_path, path)
@ -1345,6 +1345,22 @@ class DB(object):
if not os.path.exists(path): if not os.path.exists(path):
os.makedirs(path) os.makedirs(path)
size = 0 size = 0
if current_name is not None:
old_path = os.path.join(path, current_name + fmt)
if old_path != dest:
# Ensure that the old format file is not orphaned, this can
# happen if the algorithm in construct_file_name is changed.
try:
# rename rather than remove, so that if something goes
# wrong in the rest of this function, at least the file is
# not deleted
os.rename(old_path, dest)
except EnvironmentError as e:
if getattr(e, 'errno', None) != errno.ENOENT:
# Failing to rename the old format will at worst leave a
# harmless orphan, so log and ignore the error
import traceback
traceback.print_exc()
if (not getattr(stream, 'name', False) or not samefile(dest, stream.name)): if (not getattr(stream, 'name', False) or not samefile(dest, stream.name)):
with lopen(dest, 'wb') as f: with lopen(dest, 'wb') as f:

View File

@ -1201,7 +1201,8 @@ class Cache(object):
title = self._field_for('title', book_id, default_value=_('Unknown')) title = self._field_for('title', book_id, default_value=_('Unknown'))
author = self._field_for('authors', book_id, default_value=(_('Unknown'),))[0] author = self._field_for('authors', book_id, default_value=(_('Unknown'),))[0]
stream = stream_or_path if hasattr(stream_or_path, 'read') else lopen(stream_or_path, 'rb') stream = stream_or_path if hasattr(stream_or_path, 'read') else lopen(stream_or_path, 'rb')
size, fname = self.backend.add_format(book_id, fmt, stream, title, author, path)
size, fname = self.backend.add_format(book_id, fmt, stream, title, author, path, name)
del stream del stream
max_size = self.fields['formats'].table.update_fmt(book_id, fmt, fname, size, self.backend) max_size = self.fields['formats'].table.update_fmt(book_id, fmt, fname, size, self.backend)

View File

@ -7,7 +7,7 @@ __license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>' __copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import os import os, glob
from io import BytesIO from io import BytesIO
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from datetime import timedelta from datetime import timedelta
@ -285,3 +285,19 @@ class AddRemoveTest(BaseTest):
af(db.has_format(1, 'ORIGINAL_FMT1')) af(db.has_format(1, 'ORIGINAL_FMT1'))
ae(set(fmts), set(db.formats(1, verify_formats=False))) ae(set(fmts), set(db.formats(1, verify_formats=False)))
# }}} # }}}
def test_format_orphan(self): # {{{
' Test that adding formats does not create orphans if the file name algorithm changes '
cache = self.init_cache()
path = cache.format_abspath(1, 'FMT1')
base, name = os.path.split(path)
prefix = 'mushroomxx'
os.rename(path, os.path.join(base, prefix + name))
cache.fields['formats'].table.fname_map[1]['FMT1'] = prefix + os.path.splitext(name)[0]
old = glob.glob(os.path.join(base, '*.fmt1'))
cache.add_format(1, 'FMT1', BytesIO(b'xxxx'), run_hooks=False)
new = glob.glob(os.path.join(base, '*.fmt1'))
self.assertNotEqual(old, new)
self.assertEqual(len(old), len(new))
self.assertNotIn(prefix, cache.fields['formats'].format_fname(1, 'FMT1'))
# }}}