mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Add API to get formats
This commit is contained in:
parent
d6b3eb4969
commit
dcec9fe605
@ -859,8 +859,8 @@ class DB(object):
|
||||
ans['mtime'] = utcfromtimestamp(stat.st_mtime)
|
||||
return ans
|
||||
|
||||
def has_format(self, book_id, fmt):
|
||||
return self.format_abspath(book_id, fmt) is not None
|
||||
def has_format(self, book_id, fmt, fname, path):
|
||||
return self.format_abspath(book_id, fmt, fname, path) is not None
|
||||
|
||||
def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False):
|
||||
path = os.path.join(self.library_path, path, 'cover.jpg')
|
||||
@ -896,5 +896,33 @@ class DB(object):
|
||||
return True
|
||||
return False
|
||||
|
||||
def copy_format_to(self, book_id, fmt, fname, path, dest,
|
||||
windows_atomic_move=None, use_hardlink=False):
|
||||
path = self.format_abspath(book_id, fmt, fname, path)
|
||||
if path is None:
|
||||
return False
|
||||
if windows_atomic_move is not None:
|
||||
if not isinstance(dest, basestring):
|
||||
raise Exception("Error, you must pass the dest as a path when"
|
||||
" using windows_atomic_move")
|
||||
if dest and not samefile(dest, path):
|
||||
windows_atomic_move.copy_path_to(path, dest)
|
||||
else:
|
||||
if hasattr(dest, 'write'):
|
||||
with lopen(path, 'rb') as f:
|
||||
shutil.copyfileobj(f, dest)
|
||||
if hasattr(dest, 'flush'):
|
||||
dest.flush()
|
||||
elif dest and not samefile(dest, path):
|
||||
if use_hardlink:
|
||||
try:
|
||||
hardlink_file(path, dest)
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
with lopen(path, 'rb') as f, lopen(dest, 'wb') as d:
|
||||
shutil.copyfileobj(f, d)
|
||||
return True
|
||||
|
||||
# }}}
|
||||
|
||||
|
@ -15,12 +15,14 @@ from functools import wraps, partial
|
||||
from calibre.db import SPOOL_SIZE
|
||||
from calibre.db.categories import get_categories
|
||||
from calibre.db.locking import create_locks, RecordLock
|
||||
from calibre.db.errors import NoSuchFormat
|
||||
from calibre.db.fields import create_field
|
||||
from calibre.db.search import Search
|
||||
from calibre.db.tables import VirtualTable
|
||||
from calibre.db.lazy import FormatMetadata, FormatsList
|
||||
from calibre.ebooks.metadata.book.base import Metadata
|
||||
from calibre.ptempfile import PersistentTemporaryFile, SpooledTemporaryFile
|
||||
from calibre.ptempfile import (base_dir, PersistentTemporaryFile,
|
||||
SpooledTemporaryFile)
|
||||
from calibre.utils.date import now
|
||||
from calibre.utils.icu import sort_key
|
||||
|
||||
@ -106,27 +108,6 @@ class Cache(object):
|
||||
def field_metadata(self):
|
||||
return self.backend.field_metadata
|
||||
|
||||
def _format_abspath(self, book_id, fmt):
|
||||
'''
|
||||
Return absolute path to the ebook file of format `format`
|
||||
|
||||
WARNING: This method will return a dummy path for a network backend DB,
|
||||
so do not rely on it, use format(..., as_path=True) instead.
|
||||
|
||||
Currently used only in calibredb list, the viewer and the catalogs (via
|
||||
get_data_as_dict()).
|
||||
|
||||
Apart from the viewer, I don't believe any of the others do any file
|
||||
I/O with the results of this call.
|
||||
'''
|
||||
try:
|
||||
name = self.fields['formats'].format_fname(book_id, fmt)
|
||||
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||
except:
|
||||
return None
|
||||
if name and path:
|
||||
return self.backend.format_abspath(book_id, fmt, name, path)
|
||||
|
||||
def _get_metadata(self, book_id, get_user_categories=True): # {{{
|
||||
mi = Metadata(None, template_cache=self.formatter_template_cache)
|
||||
author_ids = self._field_ids_for('authors', book_id)
|
||||
@ -421,7 +402,7 @@ class Cache(object):
|
||||
return ret
|
||||
|
||||
@api
|
||||
def copy_cover_to(self, book_id, dest):
|
||||
def copy_cover_to(self, book_id, dest, use_hardlink=False):
|
||||
'''
|
||||
Copy the cover to the file like object ``dest``. Returns False
|
||||
if no cover exists or dest is the same file as the current cover.
|
||||
@ -436,7 +417,148 @@ class Cache(object):
|
||||
return False
|
||||
|
||||
with self.record_lock.lock(book_id):
|
||||
return self.backend.copy_cover_to(path, dest)
|
||||
return self.backend.copy_cover_to(path, dest,
|
||||
use_hardlink=use_hardlink)
|
||||
|
||||
@api
|
||||
def copy_format_to(self, book_id, fmt, dest, use_hardlink=False):
|
||||
'''
|
||||
Copy the format ``fmt`` to the file like object ``dest``. If the
|
||||
specified format does not exist, raises :class:`NoSuchFormat` error.
|
||||
dest can also be a path, in which case the format is copied to it, iff
|
||||
the path is different from the current path (taking case sensitivity
|
||||
into account).
|
||||
'''
|
||||
with self.read_lock:
|
||||
try:
|
||||
name = self.fields['formats'].format_fname(book_id, fmt)
|
||||
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||
except:
|
||||
raise NoSuchFormat('Record %d has no %s file'%(book_id, fmt))
|
||||
|
||||
with self.record_lock.lock(book_id):
|
||||
return self.backend.copy_format_to(book_id, fmt, name, path, dest,
|
||||
use_hardlink=use_hardlink)
|
||||
|
||||
@read_api
|
||||
def format_abspath(self, book_id, fmt):
|
||||
'''
|
||||
Return absolute path to the ebook file of format `format`
|
||||
|
||||
Currently used only in calibredb list, the viewer and the catalogs (via
|
||||
get_data_as_dict()).
|
||||
|
||||
Apart from the viewer, I don't believe any of the others do any file
|
||||
I/O with the results of this call.
|
||||
'''
|
||||
try:
|
||||
name = self.fields['formats'].format_fname(book_id, fmt)
|
||||
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||
except:
|
||||
return None
|
||||
if name and path:
|
||||
return self.backend.format_abspath(book_id, fmt, name, path)
|
||||
|
||||
@read_api
|
||||
def has_format(self, book_id, fmt):
|
||||
'Return True iff the format exists on disk'
|
||||
try:
|
||||
name = self.fields['formats'].format_fname(book_id, fmt)
|
||||
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||
except:
|
||||
return False
|
||||
return self.backend.has_format(book_id, fmt, name, path)
|
||||
|
||||
@read_api
|
||||
def formats(self, book_id, verify_formats=True):
|
||||
'''
|
||||
Return tuple of all formats for the specified book. If verify_formats
|
||||
is True, verifies that the files exist on disk.
|
||||
'''
|
||||
ans = self.field_for('formats', book_id)
|
||||
if verify_formats and ans:
|
||||
try:
|
||||
path = self._field_for('path', book_id).replace('/', os.sep)
|
||||
except:
|
||||
return ()
|
||||
def verify(fmt):
|
||||
try:
|
||||
name = self.fields['formats'].format_fname(book_id, fmt)
|
||||
except:
|
||||
return False
|
||||
return self.backend.has_format(book_id, fmt, name, path)
|
||||
|
||||
ans = tuple(x for x in ans if verify(x))
|
||||
return ans
|
||||
|
||||
@api
|
||||
def format(self, book_id, fmt, as_file=False, as_path=False, preserve_filename=False):
|
||||
'''
|
||||
Return the ebook format as a bytestring or `None` if the format doesn't exist,
|
||||
or we don't have permission to write to the ebook file.
|
||||
|
||||
:param as_file: If True the ebook format is returned as a file object. Note
|
||||
that the file object is a SpooledTemporaryFile, so if what you want to
|
||||
do is copy the format to another file, use :method:`copy_format_to`
|
||||
instead for performance.
|
||||
:param as_path: Copies the format file to a temp file and returns the
|
||||
path to the temp file
|
||||
:param preserve_filename: If True and returning a path the filename is
|
||||
the same as that used in the library. Note that using
|
||||
this means that repeated calls yield the same
|
||||
temp file (which is re-created each time)
|
||||
'''
|
||||
with self.read_lock:
|
||||
ext = ('.'+fmt.lower()) if fmt else ''
|
||||
try:
|
||||
fname = self.fields['formats'].format_fname(book_id, fmt)
|
||||
except:
|
||||
return None
|
||||
fname += ext
|
||||
|
||||
if as_path:
|
||||
if preserve_filename:
|
||||
bd = base_dir()
|
||||
d = os.path.join(bd, 'format_abspath')
|
||||
try:
|
||||
os.makedirs(d)
|
||||
except:
|
||||
pass
|
||||
ret = os.path.join(d, fname)
|
||||
with self.record_lock.lock(book_id):
|
||||
try:
|
||||
self.copy_format_to(book_id, fmt, ret)
|
||||
except NoSuchFormat:
|
||||
return None
|
||||
else:
|
||||
with PersistentTemporaryFile(ext) as pt, self.record_lock.lock(book_id):
|
||||
try:
|
||||
self.copy_format_to(book_id, fmt, pt)
|
||||
except NoSuchFormat:
|
||||
return None
|
||||
ret = pt.name
|
||||
elif as_file:
|
||||
ret = SpooledTemporaryFile(SPOOL_SIZE)
|
||||
with self.record_lock.lock(book_id):
|
||||
try:
|
||||
self.copy_format_to(book_id, fmt, ret)
|
||||
except NoSuchFormat:
|
||||
return None
|
||||
ret.seek(0)
|
||||
# Various bits of code try to use the name as the default
|
||||
# title when reading metadata, so set it
|
||||
ret.name = fname
|
||||
else:
|
||||
buf = BytesIO()
|
||||
with self.record_lock.lock(book_id):
|
||||
try:
|
||||
self.copy_format_to(book_id, fmt, buf)
|
||||
except NoSuchFormat:
|
||||
return None
|
||||
|
||||
ret = buf.getvalue()
|
||||
|
||||
return ret
|
||||
|
||||
@read_api
|
||||
def multisort(self, fields, ids_to_sort=None):
|
||||
|
@ -195,7 +195,7 @@ class ReadingTest(BaseTest):
|
||||
'Test cover() returns the same data for both backends'
|
||||
from calibre.library.database2 import LibraryDatabase2
|
||||
old = LibraryDatabase2(self.library_path)
|
||||
covers = {i: old.cover(i, index_is_id=True) for i in (1, 2, 3)}
|
||||
covers = {i: old.cover(i, index_is_id=True) for i in old.all_ids()}
|
||||
old = None
|
||||
cache = self.init_cache(self.library_path)
|
||||
for book_id, cdata in covers.iteritems():
|
||||
@ -303,6 +303,38 @@ class ReadingTest(BaseTest):
|
||||
|
||||
# }}}
|
||||
|
||||
def test_get_formats(self): # {{{
|
||||
'Test reading ebook formats using the format() method'
|
||||
from calibre.library.database2 import LibraryDatabase2
|
||||
old = LibraryDatabase2(self.library_path)
|
||||
ids = old.all_ids()
|
||||
lf = {i:set(old.formats(i, index_is_id=True).split(',')) if old.formats(
|
||||
i, index_is_id=True) else set() for i in ids}
|
||||
formats = {i:{f:old.format(i, f, index_is_id=True) for f in fmts} for
|
||||
i, fmts in lf.iteritems()}
|
||||
old = None
|
||||
cache = self.init_cache(self.library_path)
|
||||
for book_id, fmts in lf.iteritems():
|
||||
self.assertEqual(fmts, set(cache.formats(book_id)),
|
||||
'Set of formats is not the same')
|
||||
for fmt in fmts:
|
||||
old = formats[book_id][fmt]
|
||||
self.assertEqual(old, cache.format(book_id, fmt),
|
||||
'Old and new format disagree')
|
||||
f = cache.format(book_id, fmt, as_file=True)
|
||||
self.assertEqual(old, f.read(),
|
||||
'Failed to read format as file')
|
||||
with open(cache.format(book_id, fmt, as_path=True,
|
||||
preserve_filename=True), 'rb') as f:
|
||||
self.assertEqual(old, f.read(),
|
||||
'Failed to read format as path')
|
||||
with open(cache.format(book_id, fmt, as_path=True), 'rb') as f:
|
||||
self.assertEqual(old, f.read(),
|
||||
'Failed to read format as path')
|
||||
|
||||
|
||||
# }}}
|
||||
|
||||
def tests():
|
||||
return unittest.TestLoader().loadTestsFromTestCase(ReadingTest)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user