Implement remove_formats()

This commit is contained in:
Kovid Goyal 2013-07-05 12:44:19 +05:30
parent df2c497a13
commit 65e31eee12
4 changed files with 101 additions and 4 deletions

View File

@ -26,7 +26,7 @@ from calibre.utils.date import utcfromtimestamp, parse_date
from calibre.utils.filenames import (is_case_sensitive, samefile, hardlink_file, ascii_filename,
WindowsAtomicFolderMove)
from calibre.utils.magick.draw import save_cover_data_to
from calibre.utils.recycle_bin import delete_tree
from calibre.utils.recycle_bin import delete_tree, delete_file
from calibre.db.tables import (OneToOneTable, ManyToOneTable, ManyToManyTable,
SizeTable, FormatsTable, AuthorsTable, IdentifiersTable, PathTable,
CompositeTable, LanguagesTable, UUIDTable)
@ -940,6 +940,15 @@ class DB(object):
def has_format(self, book_id, fmt, fname, path):
return self.format_abspath(book_id, fmt, fname, path) is not None
def remove_format(self, book_id, fmt, fname, path):
path = self.format_abspath(book_id, fmt, fname, path)
if path is not None:
try:
delete_file(path)
except:
import traceback
traceback.print_exc()
def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False):
path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg'))
if windows_atomic_move is not None:

View File

@ -984,7 +984,7 @@ class Cache(object):
del stream
max_size = self.fields['formats'].table.update_fmt(book_id, fmt, fname, size, self.backend)
self.fields['size'].table.update_size(book_id, max_size)
self.fields['size'].table.update_sizes({book_id: max_size})
self._update_last_modified((book_id,))
if run_hooks:
@ -994,6 +994,33 @@ class Cache(object):
return True
@write_api
def remove_formats(self, formats_map, db_only=False):
table = self.fields['formats'].table
formats_map = {book_id:frozenset((f or '').upper() for f in fmts) for book_id, fmts in formats_map.iteritems()}
size_map = table.remove_formats(formats_map, self.backend)
self.fields['size'].table.update_sizes(size_map)
for book_id, fmts in formats_map.iteritems():
for fmt in fmts:
self.format_metadata_cache[book_id].pop(fmt, None)
if not db_only:
for book_id, fmts in formats_map.iteritems():
try:
path = self._field_for('path', book_id).replace('/', os.sep)
except:
continue
for fmt in fmts:
try:
name = self.fields['formats'].format_fname(book_id, fmt)
except:
continue
if name and path:
self.backend.remove_format(book_id, fmt, name, path)
self._update_last_modified(tuple(formats_map.iterkeys()))
# }}}
class SortKey(object): # {{{

View File

@ -99,8 +99,8 @@ class SizeTable(OneToOneTable):
'WHERE data.book=books.id) FROM books'):
self.book_col_map[row[0]] = self.unserialize(row[1])
def update_size(self, book_id, size):
self.book_col_map[book_id] = size
def update_sizes(self, size_map):
self.book_col_map.update(size_map)
class UUIDTable(OneToOneTable):
@ -220,6 +220,26 @@ class FormatsTable(ManyToManyTable):
db.conn.execute('UPDATE data SET name=? WHERE book=? AND format=?',
(fname, book_id, fmt))
def remove_formats(self, formats_map, db):
for book_id, fmts in formats_map.iteritems():
self.book_col_map[book_id] = [fmt for fmt in self.book_col_map.get(book_id, []) if fmt not in fmts]
for m in (self.fname_map, self.size_map):
m[book_id] = {k:v for k, v in m[book_id].iteritems() if k not in fmts}
for fmt in fmts:
try:
self.col_book_map[fmt].discard(book_id)
except KeyError:
pass
db.conn.executemany('DELETE FROM data WHERE book=? AND format=?',
[(book_id, fmt) for book_id, fmts in formats_map.iteritems() for fmt in fmts])
def zero_max(book_id):
try:
return max(self.size_map[book_id].itervalues())
except ValueError:
return 0
return {book_id:zero_max(book_id) for book_id in formats_map}
def update_fmt(self, book_id, fmt, fname, size, db):
fmts = list(self.book_col_map.get(book_id, []))
try:
@ -259,3 +279,4 @@ class LanguagesTable(ManyToManyTable):
def read_id_maps(self, db):
ManyToManyTable.read_id_maps(self, db)

View File

@ -7,6 +7,7 @@ __license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os
from io import BytesIO
from tempfile import NamedTemporaryFile
@ -98,4 +99,43 @@ class AddRemoveTest(BaseTest):
# }}}
def test_remove_formats(self): # {{{
'Test removal of formats from book records'
af, ae, at = self.assertFalse, self.assertEqual, self.assertTrue
cache = self.init_cache()
# Test removal of non-existing format does nothing
formats = {bid:tuple(cache.formats(bid)) for bid in (1, 2, 3)}
cache.remove_formats({1:{'NF'}, 2:{'NF'}, 3:{'NF'}})
nformats = {bid:tuple(cache.formats(bid)) for bid in (1, 2, 3)}
ae(formats, nformats)
# Test full removal of format
af(cache.format(1, 'FMT1') is None)
at(cache.has_format(1, 'FMT1'))
cache.remove_formats({1:{'FMT1'}})
at(cache.format(1, 'FMT1') is None)
af(bool(cache.format_metadata(1, 'FMT1')))
af(bool(cache.format_metadata(1, 'FMT1', allow_cache=False)))
af('FMT1' in cache.formats(1))
af(cache.has_format(1, 'FMT1'))
# Test db only removal
at(cache.has_format(1, 'FMT2'))
ap = cache.format_abspath(1, 'FMT2')
if ap and os.path.exists(ap):
cache.remove_formats({1:{'FMT2'}})
af(bool(cache.format_metadata(1, 'FMT2')))
af(cache.has_format(1, 'FMT2'))
at(os.path.exists(ap))
# Test that the old interface agrees
db = self.init_old()
at(db.format(1, 'FMT1', index_is_id=True) is None)
db.close()
del db
# }}}