Implement getting of covers and add tests for it

This commit is contained in:
Kovid Goyal 2013-02-20 13:08:05 +05:30
parent 277f0ce19c
commit 449410d317
3 changed files with 89 additions and 32 deletions

View File

@ -16,15 +16,14 @@ import apsw
from calibre import isbytestring, force_unicode, prints
from calibre.constants import (iswindows, filesystem_encoding,
preferred_encoding)
from calibre.ptempfile import PersistentTemporaryFile, SpooledTemporaryFile
from calibre.db import SPOOL_SIZE
from calibre.ptempfile import PersistentTemporaryFile
from calibre.db.schema_upgrades import SchemaUpgrade
from calibre.library.field_metadata import FieldMetadata
from calibre.ebooks.metadata import title_sort, author_to_author_sort
from calibre.utils.icu import strcmp
from calibre.utils.config import to_json, from_json, prefs, tweaks
from calibre.utils.date import utcfromtimestamp, parse_date
from calibre.utils.filenames import is_case_sensitive
from calibre.utils.filenames import (is_case_sensitive, samefile, hardlink_file)
from calibre.db.tables import (OneToOneTable, ManyToOneTable, ManyToManyTable,
SizeTable, FormatsTable, AuthorsTable, IdentifiersTable,
CompositeTable, LanguagesTable)
@ -863,10 +862,16 @@ class DB(object):
def has_format(self, book_id, fmt):
return self.format_abspath(book_id, fmt) is not None
def cover(self, path, as_file=False, as_image=False,
as_path=False):
def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False):
path = os.path.join(self.library_path, path, 'cover.jpg')
ret = None
if windows_atomic_move is not None:
if not isinstance(dest, basestring):
raise Exception("Error, you must pass the dest as a path when"
" using windows_atomic_move")
if os.access(path, os.R_OK) and dest and not samefile(dest, path):
windows_atomic_move.copy_path_to(path, dest)
return True
else:
if os.access(path, os.R_OK):
try:
f = lopen(path, 'rb')
@ -874,23 +879,22 @@ class DB(object):
time.sleep(0.2)
f = lopen(path, 'rb')
with f:
if as_path:
pt = PersistentTemporaryFile('_dbcover.jpg')
with pt:
shutil.copyfileobj(f, pt)
return pt.name
if as_file:
ret = SpooledTemporaryFile(SPOOL_SIZE)
shutil.copyfileobj(f, ret)
ret.seek(0)
else:
ret = f.read()
if as_image:
from PyQt4.Qt import QImage
i = QImage()
i.loadFromData(ret)
ret = i
return ret
if hasattr(dest, 'write'):
shutil.copyfileobj(f, dest)
if hasattr(dest, 'flush'):
dest.flush()
return True
elif dest and not samefile(dest, path):
if use_hardlink:
try:
hardlink_file(path, dest)
return True
except:
pass
with lopen(dest, 'wb') as d:
shutil.copyfileobj(f, d)
return True
return False
# }}}

View File

@ -8,9 +8,11 @@ __copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, traceback
from io import BytesIO
from collections import defaultdict
from functools import wraps, partial
from calibre.db import SPOOL_SIZE
from calibre.db.categories import get_categories
from calibre.db.locking import create_locks, RecordLock
from calibre.db.fields import create_field
@ -18,6 +20,7 @@ from calibre.db.search import Search
from calibre.db.tables import VirtualTable
from calibre.db.lazy import FormatMetadata, FormatsList
from calibre.ebooks.metadata.book.base import Metadata
from calibre.ptempfile import PersistentTemporaryFile, SpooledTemporaryFile
from calibre.utils.date import now
from calibre.utils.icu import sort_key
@ -397,15 +400,43 @@ class Cache(object):
:param as_path: If True return the image as a path pointing to a
temporary file
'''
if as_file:
ret = SpooledTemporaryFile(SPOOL_SIZE)
if not self.copy_cover_to(book_id, ret): return
ret.seek(0)
elif as_path:
pt = PersistentTemporaryFile('_dbcover.jpg')
with pt:
if not self.copy_cover_to(book_id, pt): return
ret = pt.name
else:
buf = BytesIO()
if not self.copy_cover_to(book_id, buf): return
ret = buf.getvalue()
if as_image:
from PyQt4.Qt import QImage
i = QImage()
i.loadFromData(ret)
ret = i
return ret
@api
def copy_cover_to(self, book_id, dest):
'''
Copy the cover to the file like object ``dest``. Returns False
if no cover exists or dest is the same file as the current cover.
dest can also be a path in which case the cover is
copied to it iff the path is different from the current path (taking
case sensitivity into account).
'''
with self.read_lock:
try:
path = self._field_for('path', book_id).replace('/', os.sep)
except:
return None
return False
with self.record_lock.lock(book_id):
return self.backend.cover(path, as_file=as_file, as_image=as_image,
as_path=as_path)
return self.backend.copy_cover_to(path, dest)
@read_api
def multisort(self, fields, ids_to_sort=None):

View File

@ -176,6 +176,10 @@ class ReadingTest(BaseTest):
old_metadata = {i:old.get_metadata(
i, index_is_id=True, get_cover=True, cover_as_data=True) for i in
xrange(1, 4)}
for mi in old_metadata.itervalues():
mi.format_metadata = dict(mi.format_metadata)
if mi.formats:
mi.formats = tuple(mi.formats)
old = None
cache = self.init_cache(self.library_path)
@ -186,6 +190,24 @@ class ReadingTest(BaseTest):
for mi2, mi1 in zip(new_metadata.values(), old_metadata.values()):
self.compare_metadata(mi1, mi2)
def test_get_cover(self): # {{{
'Test cover() returns the same data for both backends'
from calibre.library.database2 import LibraryDatabase2
old = LibraryDatabase2(self.library_path)
covers = {i: old.cover(i, index_is_id=True) for i in (1, 2, 3)}
old = None
cache = self.init_cache(self.library_path)
for book_id, cdata in covers.iteritems():
self.assertEqual(cdata, cache.cover(book_id), 'Reading of cover failed')
f = cache.cover(book_id, as_file=True)
self.assertEqual(cdata, f.read() if f else f, 'Reading of cover as file failed')
if cdata:
with open(cache.cover(book_id, as_path=True), 'rb') as f:
self.assertEqual(cdata, f.read(), 'Reading of cover as path failed')
else:
self.assertEqual(cdata, cache.cover(book_id, as_path=True),
'Reading of null cover as path failed')
# }}}
def test_searching(self): # {{{