Implement create_book_entry(), with tests

This commit is contained in:
Kovid Goyal 2013-07-05 20:48:31 +05:30
parent e7bf1c7b7d
commit 9a8d31ee96
8 changed files with 223 additions and 20 deletions

View File

@ -9,14 +9,15 @@ __docformat__ = 'restructuredtext en'
SPOOL_SIZE = 30*1024*1024
def _get_next_series_num_for_list(series_indices):
def _get_next_series_num_for_list(series_indices, unwrap=True):
from calibre.utils.config_base import tweaks
from math import ceil, floor
if not series_indices:
if isinstance(tweaks['series_index_auto_increment'], (int, float)):
return float(tweaks['series_index_auto_increment'])
return 1.0
series_indices = [x[0] for x in series_indices]
if unwrap:
series_indices = [x[0] for x in series_indices]
if tweaks['series_index_auto_increment'] == 'next':
return floor(series_indices[-1]) + 1
if tweaks['series_index_auto_increment'] == 'first_free':

View File

@ -1088,7 +1088,7 @@ class DB(object):
def update_path(self, book_id, title, author, path_field, formats_field):
path = self.construct_path_name(book_id, title, author)
current_path = path_field.for_book(book_id)
current_path = path_field.for_book(book_id, default_value='')
formats = formats_field.for_book(book_id, default_value=())
fname = self.construct_file_name(book_id, title, author)
# Check if the metadata used to construct paths has changed

View File

@ -12,9 +12,10 @@ from io import BytesIO
from collections import defaultdict
from functools import wraps, partial
from calibre.constants import iswindows
from calibre import isbytestring
from calibre.constants import iswindows, preferred_encoding
from calibre.customize.ui import run_plugins_on_import, run_plugins_on_postimport
from calibre.db import SPOOL_SIZE
from calibre.db import SPOOL_SIZE, _get_next_series_num_for_list
from calibre.db.categories import get_categories
from calibre.db.locking import create_locks
from calibre.db.errors import NoSuchFormat
@ -24,12 +25,13 @@ from calibre.db.tables import VirtualTable
from calibre.db.write import get_series_values
from calibre.db.lazy import FormatMetadata, FormatsList
from calibre.ebooks import check_ebook_format
from calibre.ebooks.metadata import string_to_authors
from calibre.ebooks.metadata import string_to_authors, author_to_author_sort
from calibre.ebooks.metadata.book.base import Metadata
from calibre.ebooks.metadata.opf2 import metadata_to_opf
from calibre.ptempfile import (base_dir, PersistentTemporaryFile,
SpooledTemporaryFile)
from calibre.utils.date import now as nowf
from calibre.utils.config import prefs
from calibre.utils.date import now as nowf, utcnow, UNDEFINED_DATE
from calibre.utils.icu import sort_key
def api(f):
@ -65,6 +67,16 @@ def run_import_plugins(path_or_stream, fmt):
path = path_or_stream
return run_plugins_on_import(path, fmt)
def _add_newbook_tag(mi):
tags = prefs['new_book_tags']
if tags:
for tag in [t.strip() for t in tags]:
if tag:
if not mi.tags:
mi.tags = [tag]
elif tag not in mi.tags:
mi.tags.append(tag)
class Cache(object):
@ -1021,6 +1033,95 @@ class Cache(object):
self._update_last_modified(tuple(formats_map.iterkeys()))
@read_api
def get_next_series_num_for(self, series):
books = ()
sf = self.fields['series']
if series:
q = icu_lower(series)
for val, book_ids in sf.iter_searchable_values(self._get_metadata, frozenset(self.all_book_ids())):
if q == icu_lower(val):
books = book_ids
break
series_indices = sorted(self._field_for('series_index', book_id) for book_id in books)
return _get_next_series_num_for_list(tuple(series_indices), unwrap=False)
@read_api
def author_sort_from_authors(self, authors):
'''Given a list of authors, return the author_sort string for the authors,
preferring the author sort associated with the author over the computed
string. '''
table = self.fields['authors'].table
result = []
rmap = {icu_lower(v):k for k, v in table.id_map.iteritems()}
for aut in authors:
aid = rmap.get(icu_lower(aut), None)
result.append(author_to_author_sort(aut) if aid is None else table.asort_map[aid])
return ' & '.join(result)
@read_api
def has_book(self, mi):
title = mi.title
if title:
if isbytestring(title):
title = title.decode(preferred_encoding, 'replace')
q = icu_lower(title)
for title in self.fields['title'].table.book_col_map.itervalues():
if q == icu_lower(title):
return True
return False
@write_api
def create_book_entry(self, mi, cover=None, add_duplicates=True, force_id=None, apply_import_tags=True, preserve_uuid=False):
if mi.tags:
mi.tags = list(mi.tags)
if apply_import_tags:
_add_newbook_tag(mi)
if not add_duplicates and self._has_book(mi):
return
series_index = (self._get_next_series_num_for(mi.series) if mi.series_index is None else mi.series_index)
if not mi.authors:
mi.authors = (_('Unknown'),)
aus = mi.author_sort if mi.author_sort else self._author_sort_from_authors(mi.authors)
mi.title = mi.title or _('Unknown')
if isbytestring(aus):
aus = aus.decode(preferred_encoding, 'replace')
if isbytestring(mi.title):
mi.title = mi.title.decode(preferred_encoding, 'replace')
conn = self.backend.conn
if force_id is None:
conn.execute('INSERT INTO books(title, series_index, author_sort) VALUES (?, ?, ?)',
(mi.title, series_index, aus))
else:
conn.execute('INSERT INTO books(id, title, series_index, author_sort) VALUES (?, ?, ?, ?)',
(force_id, mi.title, series_index, aus))
book_id = conn.last_insert_rowid()
mi.timestamp = utcnow() if mi.timestamp is None else mi.timestamp
mi.pubdate = UNDEFINED_DATE if mi.pubdate is None else mi.pubdate
if cover is not None:
mi.cover, mi.cover_data = None, (None, cover)
self._set_metadata(book_id, mi, ignore_errors=True)
if preserve_uuid and mi.uuid:
self._set_field('uuid', {book_id:mi.uuid})
# Update the caches for fields from the books table
self.fields['size'].table.book_col_map[book_id] = 0
row = next(conn.execute('SELECT sort, series_index, author_sort, uuid, has_cover FROM books WHERE id=?', (book_id,)))
for field, val in zip(('sort', 'series_index', 'author_sort', 'uuid', 'cover'), row):
if field == 'cover':
val = bool(val)
elif field == 'uuid':
self.fields[field].table.uuid_to_id_map[val] = book_id
self.fields[field].table.book_col_map[book_id] = val
return book_id
@write_api
def add_books(self, books, add_duplicates=True):
duplicates, ids = [], []
for mi, format_map in books:
pass
# }}}
class SortKey(object): # {{{
@ -1038,3 +1139,4 @@ class SortKey(object): # {{{
# }}}

View File

@ -110,7 +110,7 @@ class UUIDTable(OneToOneTable):
def update_uuid_cache(self, book_id_val_map):
for book_id, uuid in book_id_val_map.iteritems():
self.uuid_to_id_map.pop(self.book_col_map[book_id], None) # discard old uuid
self.uuid_to_id_map.pop(self.book_col_map.get(book_id, None), None) # discard old uuid
self.uuid_to_id_map[uuid] = book_id
class CompositeTable(OneToOneTable):
@ -192,6 +192,11 @@ class AuthorsTable(ManyToManyTable):
author_to_author_sort(row[1]))
self.alink_map[row[0]] = row[3]
def set_sort_names(self, aus_map, db):
self.asort_map.update(aus_map)
db.conn.executemany('UPDATE authors SET sort=? WHERE id=?',
[(v, k) for k, v in aus_map.iteritems()])
class FormatsTable(ManyToManyTable):
def read_id_maps(self, db):

View File

@ -10,9 +10,11 @@ __docformat__ = 'restructuredtext en'
import os
from io import BytesIO
from tempfile import NamedTemporaryFile
from datetime import timedelta
from calibre.db.tests.base import BaseTest
from calibre.db.tests.base import BaseTest, IMG
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.date import now, UNDEFINED_DATE
def import_test(replacement_data, replacement_fmt=None):
def func(path, fmt):
@ -138,4 +140,51 @@ class AddRemoveTest(BaseTest):
del db
# }}}
def test_create_book_entry(self): # {{{
'Test the creation of new book entries'
from calibre.ebooks.metadata.book.base import Metadata
cache = self.init_cache()
mi = Metadata('Created One', authors=('Creator One', 'Creator Two'))
book_id = cache.create_book_entry(mi)
self.assertIsNot(book_id, None)
def do_test(cache, book_id):
for field in ('path', 'uuid', 'author_sort', 'timestamp', 'pubdate', 'title', 'authors', 'series_index', 'sort'):
self.assertTrue(cache.field_for(field, book_id))
for field in ('size', 'cover'):
self.assertFalse(cache.field_for(field, book_id))
self.assertEqual(book_id, cache.fields['uuid'].table.uuid_to_id_map[cache.field_for('uuid', book_id)])
self.assertLess(now() - cache.field_for('timestamp', book_id), timedelta(seconds=30))
self.assertEqual(('Created One', ('Creator One', 'Creator Two')), (cache.field_for('title', book_id), cache.field_for('authors', book_id)))
self.assertEqual(cache.field_for('series_index', book_id), 1.0)
self.assertEqual(cache.field_for('pubdate', book_id), UNDEFINED_DATE)
do_test(cache, book_id)
# Test that the db contains correct data
cache = self.init_cache()
do_test(cache, book_id)
self.assertIs(None, cache.create_book_entry(mi, add_duplicates=False), 'Duplicate added incorrectly')
book_id = cache.create_book_entry(mi, cover=IMG)
self.assertIsNot(book_id, None)
self.assertEqual(IMG, cache.cover(book_id))
import calibre.db.cache as c
orig = c.prefs
c.prefs = {'new_book_tags':('newbook', 'newbook2')}
try:
book_id = cache.create_book_entry(mi)
self.assertEqual(('newbook', 'newbook2'), cache.field_for('tags', book_id))
mi.tags = ('one', 'two')
book_id = cache.create_book_entry(mi)
self.assertEqual(('one', 'two') + ('newbook', 'newbook2'), cache.field_for('tags', book_id))
mi.tags = ()
finally:
c.prefs = orig
mi.uuid = 'a preserved uuid'
book_id = cache.create_book_entry(mi, preserve_uuid=True)
self.assertEqual(mi.uuid, cache.field_for('uuid', book_id))
# }}}

View File

@ -14,6 +14,8 @@ from future_builtins import map
rmtree = partial(shutil.rmtree, ignore_errors=True)
IMG = b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00`\x00`\x00\x00\xff\xe1\x00\x16Exif\x00\x00II*\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xdb\x00C\x00\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\xff\xdb\x00C\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x03\x01"\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x15\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xc4\x00\x14\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00?\x00\xbf\x80\x01\xff\xd9' # noqa {{{ }}}
class BaseTest(unittest.TestCase):
longMessage = True

View File

@ -15,7 +15,7 @@ from calibre.db.tests.base import BaseTest
class ReadingTest(BaseTest):
def test_read(self): # {{{
def test_read(self): # {{{
'Test the reading of data from the database'
cache = self.init_cache(self.library_path)
tests = {
@ -123,7 +123,7 @@ class ReadingTest(BaseTest):
book_id, field, expected_val, val))
# }}}
def test_sorting(self): # {{{
def test_sorting(self): # {{{
'Test sorting'
cache = self.init_cache(self.library_path)
for field, order in {
@ -165,7 +165,7 @@ class ReadingTest(BaseTest):
('title', True)]), 'Subsort failed')
# }}}
def test_get_metadata(self): # {{{
def test_get_metadata(self): # {{{
'Test get_metadata() returns the same data for both backends'
from calibre.library.database2 import LibraryDatabase2
old = LibraryDatabase2(self.library_path)
@ -188,7 +188,7 @@ class ReadingTest(BaseTest):
self.compare_metadata(mi1, mi2)
# }}}
def test_get_cover(self): # {{{
def test_get_cover(self): # {{{
'Test cover() returns the same data for both backends'
from calibre.library.database2 import LibraryDatabase2
old = LibraryDatabase2(self.library_path)
@ -212,7 +212,7 @@ class ReadingTest(BaseTest):
# }}}
def test_searching(self): # {{{
def test_searching(self): # {{{
'Test searching returns the same data for both backends'
from calibre.library.database2 import LibraryDatabase2
old = LibraryDatabase2(self.library_path)
@ -267,7 +267,7 @@ class ReadingTest(BaseTest):
# }}}
def test_get_categories(self): # {{{
def test_get_categories(self): # {{{
'Check that get_categories() returns the same data for both backends'
from calibre.library.database2 import LibraryDatabase2
old = LibraryDatabase2(self.library_path)
@ -286,9 +286,9 @@ class ReadingTest(BaseTest):
oval, nval = getattr(old, attr), getattr(new, attr)
if (
(category in {'rating', '#rating'} and attr in {'id_set', 'sort'}) or
(category == 'series' and attr == 'sort') or # Sorting is wrong in old
(category == 'series' and attr == 'sort') or # Sorting is wrong in old
(category == 'identifiers' and attr == 'id_set') or
(category == '@Good Series') or # Sorting is wrong in old
(category == '@Good Series') or # Sorting is wrong in old
(category == 'news' and attr in {'count', 'id_set'}) or
(category == 'formats' and attr == 'id_set')
):
@ -306,7 +306,7 @@ class ReadingTest(BaseTest):
# }}}
def test_get_formats(self): # {{{
def test_get_formats(self): # {{{
'Test reading ebook formats using the format() method'
from calibre.library.database2 import LibraryDatabase2
from calibre.db.cache import NoSuchFormat
@ -343,3 +343,47 @@ class ReadingTest(BaseTest):
# }}}
def test_author_sort_for_authors(self): # {{{
'Test getting the author sort for authors from the db'
cache = self.init_cache()
table = cache.fields['authors'].table
table.set_sort_names({next(table.id_map.iterkeys()): 'Fake Sort'}, cache.backend)
authors = tuple(table.id_map.itervalues())
nval = cache.author_sort_from_authors(authors)
self.assertIn('Fake Sort', nval)
db = self.init_old()
self.assertEqual(db.author_sort_from_authors(authors), nval)
db.close()
del db
# }}}
def test_get_next_series_num(self): # {{{
'Test getting the next series number for a series'
cache = self.init_cache()
cache.set_field('series', {3:'test series'})
cache.set_field('series_index', {3:13})
table = cache.fields['series'].table
series = tuple(table.id_map.itervalues())
nvals = {s:cache.get_next_series_num_for(s) for s in series}
db = self.init_old()
self.assertEqual({s:db.get_next_series_num_for(s) for s in series}, nvals)
db.close()
# }}}
def test_has_book(self): # {{{
'Test detecting duplicates'
from calibre.ebooks.metadata.book.base import Metadata
cache = self.init_cache()
db = self.init_old()
for title in cache.fields['title'].table.book_col_map.itervalues():
for x in (db, cache):
self.assertTrue(x.has_book(Metadata(title)))
self.assertTrue(x.has_book(Metadata(title.upper())))
self.assertFalse(x.has_book(Metadata(title + 'XXX')))
self.assertFalse(x.has_book(Metadata(title[:1])))
db.close()
# }}}

View File

@ -13,7 +13,7 @@ from io import BytesIO
from calibre.ebooks.metadata import author_to_author_sort
from calibre.utils.date import UNDEFINED_DATE
from calibre.db.tests.base import BaseTest
from calibre.db.tests.base import BaseTest, IMG
class WritingTest(BaseTest):
@ -364,8 +364,8 @@ class WritingTest(BaseTest):
ae(cache.field_for('cover', 1), 1)
ae(cache.set_cover({1:None}), set([1]))
ae(cache.field_for('cover', 1), 0)
img = IMG
img = b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00`\x00`\x00\x00\xff\xe1\x00\x16Exif\x00\x00II*\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xdb\x00C\x00\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\xff\xdb\x00C\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x03\x01"\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x15\x00\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\n\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xc4\x00\x14\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00?\x00\xbf\x80\x01\xff\xd9' # noqa {{{ }}}
# Test setting a cover
ae(cache.set_cover({bid:img for bid in (1, 2, 3)}), {1, 2, 3})
old = self.init_old()