Allow serialized, multi-threaded access to the database. May have introduced regressions.

This commit is contained in:
Kovid Goyal 2008-10-27 12:31:41 -07:00
parent 754c0c63c1
commit 0f6cbc5109
9 changed files with 422 additions and 159 deletions

View File

@ -9,7 +9,7 @@ from urllib import urlopen, quote
from calibre import setup_cli_handlers
from calibre.utils.config import OptionParser
from calibre.ebooks.metadata import MetaInformation
from calibre.ebooks.metadata import MetaInformation, authors_to_sort_string
from calibre.ebooks.BeautifulSoup import BeautifulStoneSoup
BASE_URL = 'http://isbndb.com/api/books.xml?access_key=%(key)s&page_number=1&results=subjects,authors,texts&'
@ -64,7 +64,8 @@ class ISBNDBMetadata(MetaInformation):
try:
self.author_sort = book.find('authors').find('person').string
except:
pass
if self.authors:
self.author_sort = authors_to_sort_string(self.authors)
self.publisher = book.find('publishertext').string
summ = book.find('summary')

View File

@ -411,13 +411,12 @@ def get_metadata(stream):
if mr.book_header.exth is None:
mi = MetaInformation(mr.name, ['Unknown'])
else:
tdir = tempfile.mkdtemp('mobi-meta', __appname__)
tdir = tempfile.mkdtemp('_mobi_meta', __appname__)
atexit.register(shutil.rmtree, tdir)
mr.extract_images([], tdir)
mi = mr.create_opf('dummy.html')
if mi.cover:
cover = os.path.join(tdir, mi.cover)
print cover
if os.access(cover, os.R_OK):
mi.cover_data = ('JPEG', open(os.path.join(tdir, mi.cover), 'rb').read())
return mi

View File

@ -1178,6 +1178,7 @@ in which you want to store your books files. Any existing books will be automati
try:
self.olddb = LibraryDatabase(self.database_path)
except:
traceback.print_exc()
self.olddb = None

View File

@ -51,7 +51,7 @@ XML_TEMPLATE = '''\
<cover py:if="record['cover']">${record['cover']}</cover>
<formats py:if="record['formats']">
<py:for each="path in record['formats']">
<format>$path</format>
<format>${path}</format>
</py:for>
</formats>
</record>

View File

@ -30,11 +30,21 @@ class Concatenate(object):
if self.sep:
return self.ans[:-len(self.sep)]
return self.ans
class Connection(sqlite.Connection):
def get(self, *args, **kw):
ans = self.execute(*args)
if not kw.get('all', True):
ans = ans.fetchone()
if not ans:
ans = [None]
return ans[0]
return ans.fetchall()
def _connect(path):
if isinstance(path, unicode):
path = path.encode('utf-8')
conn = sqlite.connect(path, detect_types=sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES)
conn = sqlite.connect(path, factory=Connection, detect_types=sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES)
conn.row_factory = lambda cursor, row : list(row)
conn.create_aggregate('concat', 1, Concatenate)
title_pat = re.compile('^(A|The|An)\s+', re.IGNORECASE)
@ -812,11 +822,11 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def user_version():
doc = 'The user version of this database'
def fget(self):
return self.conn.execute('pragma user_version;').next()[0]
return self.conn.get('pragma user_version;', all=False)
return property(doc=doc, fget=fget)
def is_empty(self):
return not self.conn.execute('SELECT id FROM books LIMIT 1').fetchone()
return not self.conn.get('SELECT id FROM books LIMIT 1', all=False)
def refresh(self, sort_field, ascending):
'''
@ -846,14 +856,14 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
else:
sort += ',title '+order
self.cache = self.conn.execute('SELECT * from meta ORDER BY '+sort).fetchall()
self.cache = self.conn.get('SELECT * from meta ORDER BY '+sort)
self.data = self.cache
self.conn.commit()
def refresh_ids(self, ids):
indices = map(self.index, ids)
for id, idx in zip(ids, indices):
row = self.conn.execute('SELECT * from meta WHERE id=?', (id,)).fetchone()
row = self.conn.get('SELECT * from meta WHERE id=?', (id,), all=False)
self.data[idx] = row
return indices
@ -905,7 +915,7 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
if not index_is_id:
return self.data[index][1]
try:
return self.conn.execute('SELECT title FROM meta WHERE id=?',(index,)).fetchone()[0]
return self.conn.get('SELECT title FROM meta WHERE id=?',(index,), all=False)
except:
return _('Unknown')
@ -917,73 +927,69 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
if not index_is_id:
return self.data[index][2]
try:
return self.conn.execute('SELECT authors FROM meta WHERE id=?',(index,)).fetchone()[0]
return self.conn.get('SELECT authors FROM meta WHERE id=?',(index,), all=False)
except:
pass
def isbn(self, idx, index_is_id=False):
id = idx if index_is_id else self.id(idx)
return self.conn.execute('SELECT isbn FROM books WHERE id=?',(id,)).fetchone()[0]
return self.conn.get('SELECT isbn FROM books WHERE id=?',(id,), all=False)
def author_sort(self, index, index_is_id=False):
id = index if index_is_id else self.id(index)
return self.conn.execute('SELECT author_sort FROM books WHERE id=?', (id,)).fetchone()[0]
return self.conn.get('SELECT author_sort FROM books WHERE id=?', (id,), all=False)
def publisher(self, index, index_is_id=False):
if index_is_id:
return self.conn.execute('SELECT publisher FROM meta WHERE id=?', (index,)).fetchone()[0]
return self.conn.get('SELECT publisher FROM meta WHERE id=?', (index,), all=False)
return self.data[index][3]
def rating(self, index, index_is_id=False):
if index_is_id:
return self.conn.execute('SELECT rating FROM meta WHERE id=?', (index,)).fetchone()[0]
return self.conn.get('SELECT rating FROM meta WHERE id=?', (index,), all=False)
return self.data[index][4]
def timestamp(self, index, index_is_id=False):
if index_is_id:
return self.conn.execute('SELECT timestamp FROM meta WHERE id=?', (index,)).fetchone()[0]
return self.conn.get('SELECT timestamp FROM meta WHERE id=?', (index,), all=False)
return self.data[index][5]
def max_size(self, index, index_is_id=False):
if index_is_id:
return self.conn.execute('SELECT size FROM meta WHERE id=?', (index,)).fetchone()[0]
return self.conn.get('SELECT size FROM meta WHERE id=?', (index,), all=False)
return self.data[index][6]
def cover(self, index, index_is_id=False):
'''Cover as a data string or None'''
id = index if index_is_id else self.id(index)
data = self.conn.execute('SELECT data FROM covers WHERE book=?', (id,)).fetchone()
if not data or not data[0]:
data = self.conn.get('SELECT data FROM covers WHERE book=?', (id,), all=False)
if not data:
return None
return(decompress(data[0]))
return(decompress(data))
def tags(self, index, index_is_id=False):
'''tags as a comma separated list or None'''
id = index if index_is_id else self.id(index)
matches = self.conn.execute('SELECT concat(name) FROM tags WHERE tags.id IN (SELECT tag from books_tags_link WHERE book=?)', (id,)).fetchall()
matches = self.conn.get('SELECT concat(name) FROM tags WHERE tags.id IN (SELECT tag from books_tags_link WHERE book=?)', (id,))
if not matches or not matches[0][0]:
return None
matches = [t.lower().strip() for t in matches[0][0].split(',')]
return ','.join(matches)
def series_id(self, index, index_is_id=False):
id = index if index_is_id else self.id(index)
ans= self.conn.execute('SELECT series from books_series_link WHERE book=?', (id,)).fetchone()
if ans:
return ans[0]
id = index if index_is_id else self.id(index)
return self.conn.get('SELECT series from books_series_link WHERE book=?', (id,), all=False)
def series(self, index, index_is_id=False):
id = self.series_id(index, index_is_id)
ans = self.conn.execute('SELECT name from series WHERE id=?', (id,)).fetchone()
if ans:
return ans[0]
return self.conn.get('SELECT name from series WHERE id=?', (id,), all=False)
def series_index(self, index, index_is_id=False):
ans = None
if not index_is_id:
ans = self.data[index][10]
else:
ans = self.conn.execute('SELECT series_index FROM books WHERE id=?', (index,)).fetchone()[0]
ans = self.conn.get('SELECT series_index FROM books WHERE id=?', (index,), all=False)
try:
return int(ans)
except:
@ -994,8 +1000,8 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
Return an ordered list of all books in the series.
The list contains book ids.
'''
ans = self.conn.execute('SELECT book from books_series_link WHERE series=?',
(series_id,)).fetchall()
ans = self.conn.get('SELECT book from books_series_link WHERE series=?',
(series_id,))
if not ans:
return []
ans = [id[0] for id in ans]
@ -1014,41 +1020,35 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def comments(self, index, index_is_id=False):
'''Comments as string or None'''
id = index if index_is_id else self.id(index)
matches = self.conn.execute('SELECT text FROM comments WHERE book=?', (id,)).fetchall()
if not matches:
return None
return matches[0][0]
return self.conn.get('SELECT text FROM comments WHERE book=?', (id,), all=False)
def formats(self, index, index_is_id=False):
''' Return available formats as a comma separated list '''
id = index if index_is_id else self.id(index)
matches = self.conn.execute('SELECT concat(format) FROM data WHERE data.book=?', (id,)).fetchall()
if not matches:
return None
return matches[0][0]
return self.conn.get('SELECT concat(format) FROM data WHERE data.book=?', (id,), all=False)
def sizeof_format(self, index, format, index_is_id=False):
''' Return size of C{format} for book C{index} in bytes'''
id = index if index_is_id else self.id(index)
format = format.upper()
return self.conn.execute('SELECT uncompressed_size FROM data WHERE data.book=? AND data.format=?', (id, format)).fetchone()[0]
return self.conn.get('SELECT uncompressed_size FROM data WHERE data.book=? AND data.format=?', (id, format), all=False)
def format(self, index, format, index_is_id=False):
id = index if index_is_id else self.id(index)
return decompress(self.conn.execute('SELECT data FROM data WHERE book=? AND format=?', (id, format)).fetchone()[0])
return decompress(self.conn.get('SELECT data FROM data WHERE book=? AND format=?', (id, format), all=False))
def all_series(self):
return [ (i[0], i[1]) for i in \
self.conn.execute('SELECT id, name FROM series').fetchall()]
self.conn.get('SELECT id, name FROM series')]
def all_tags(self):
return [i[0].strip() for i in self.conn.execute('SELECT name FROM tags').fetchall() if i[0].strip()]
return [i[0].strip() for i in self.conn.get('SELECT name FROM tags') if i[0].strip()]
def conversion_options(self, id, format):
data = self.conn.execute('SELECT data FROM conversion_options WHERE book=? AND format=?', (id, format.upper())).fetchone()
data = self.conn.get('SELECT data FROM conversion_options WHERE book=? AND format=?', (id, format.upper()), all=False)
if data:
return cPickle.loads(str(data[0]))
return cPickle.loads(str(data))
return None
@ -1108,9 +1108,9 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def set_conversion_options(self, id, format, options):
data = sqlite.Binary(cPickle.dumps(options, -1))
oid = self.conn.execute('SELECT id FROM conversion_options WHERE book=? AND format=?', (id, format.upper())).fetchone()
oid = self.conn.get('SELECT id FROM conversion_options WHERE book=? AND format=?', (id, format.upper()), all=False)
if oid:
self.conn.execute('UPDATE conversion_options SET data=? WHERE id=?', (data, oid[0]))
self.conn.execute('UPDATE conversion_options SET data=? WHERE id=?', (data, oid))
else:
self.conn.execute('INSERT INTO conversion_options(book,format,data) VALUES (?,?,?)', (id,format.upper(),data))
self.conn.commit()
@ -1125,9 +1125,9 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
if not a:
continue
a = a.strip()
author = self.conn.execute('SELECT id from authors WHERE name=?', (a,)).fetchone()
author = self.conn.get('SELECT id from authors WHERE name=?', (a,), all=False)
if author:
aid = author[0]
aid = author
# Handle change of case
self.conn.execute('UPDATE authors SET name=? WHERE id=?', (a, aid))
else:
@ -1155,9 +1155,9 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def set_publisher(self, id, publisher):
self.conn.execute('DELETE FROM books_publishers_link WHERE book=?',(id,))
if publisher:
pub = self.conn.execute('SELECT id from publishers WHERE name=?', (publisher,)).fetchone()
pub = self.conn.get('SELECT id from publishers WHERE name=?', (publisher,), all=False)
if pub:
aid = pub[0]
aid = pub
else:
aid = self.conn.execute('INSERT INTO publishers(name) VALUES (?)', (publisher,)).lastrowid
self.conn.execute('INSERT INTO books_publishers_link(book, publisher) VALUES (?,?)', (id, aid))
@ -1169,15 +1169,14 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
self.conn.commit()
def is_tag_used(self, tag):
id = self.conn.execute('SELECT id FROM tags WHERE name=?', (tag,)).fetchone()
id = self.conn.get('SELECT id FROM tags WHERE name=?', (tag,), all=False)
if not id:
return False
return bool(self.conn.execute('SELECT tag FROM books_tags_link WHERE tag=?',(id[0],)).fetchone())
return bool(self.conn.get('SELECT tag FROM books_tags_link WHERE tag=?',(id,), all=False))
def delete_tag(self, tag):
id = self.conn.execute('SELECT id FROM tags WHERE name=?', (tag,)).fetchone()
id = self.conn.get('SELECT id FROM tags WHERE name=?', (tag,), all=False)
if id:
id = id[0]
self.conn.execute('DELETE FROM books_tags_link WHERE tag=?', (id,))
self.conn.execute('DELETE FROM tags WHERE id=?', (id,))
self.conn.commit()
@ -1188,9 +1187,9 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def unapply_tags(self, book_id, tags):
for tag in tags:
id = self.conn.execute('SELECT id FROM tags WHERE name=?', (tag,)).fetchone()
id = self.conn.get('SELECT id FROM tags WHERE name=?', (tag,), all=False)
if id:
self.conn.execute('DELETE FROM books_tags_link WHERE tag=? AND book=?', (id[0], book_id))
self.conn.execute('DELETE FROM books_tags_link WHERE tag=? AND book=?', (id, book_id))
self.conn.commit()
def set_tags(self, id, tags, append=False):
@ -1204,14 +1203,14 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
tag = tag.lower().strip()
if not tag:
continue
t = self.conn.execute('SELECT id FROM tags WHERE name=?', (tag,)).fetchone()
t = self.conn.get('SELECT id FROM tags WHERE name=?', (tag,), all=False)
if t:
tid = t[0]
tid = t
else:
tid = self.conn.execute('INSERT INTO tags(name) VALUES(?)', (tag,)).lastrowid
if not self.conn.execute('SELECT book FROM books_tags_link WHERE book=? AND tag=?',
(id, tid)).fetchone():
if not self.conn.get('SELECT book FROM books_tags_link WHERE book=? AND tag=?',
(id, tid), all=False):
self.conn.execute('INSERT INTO books_tags_link(book, tag) VALUES (?,?)',
(id, tid))
self.conn.commit()
@ -1220,9 +1219,9 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def set_series(self, id, series):
self.conn.execute('DELETE FROM books_series_link WHERE book=?',(id,))
if series:
s = self.conn.execute('SELECT id from series WHERE name=?', (series,)).fetchone()
s = self.conn.get('SELECT id from series WHERE name=?', (series,), all=False)
if s:
aid = s[0]
aid = s
else:
aid = self.conn.execute('INSERT INTO series(name) VALUES (?)', (series,)).lastrowid
self.conn.execute('INSERT INTO books_series_link(book, series) VALUES (?,?)', (id, aid))
@ -1232,8 +1231,8 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
self.data[row][9] = series
def remove_unused_series(self):
for id, in self.conn.execute('SELECT id FROM series').fetchall():
if not self.conn.execute('SELECT id from books_series_link WHERE series=?', (id,)).fetchone():
for id, in self.conn.get('SELECT id FROM series'):
if not self.conn.get('SELECT id from books_series_link WHERE series=?', (id,)):
self.conn.execute('DELETE FROM series WHERE id=?', (id,))
self.conn.commit()
@ -1248,8 +1247,8 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def set_rating(self, id, rating):
rating = int(rating)
self.conn.execute('DELETE FROM books_ratings_link WHERE book=?',(id,))
rat = self.conn.execute('SELECT id FROM ratings WHERE rating=?', (rating,)).fetchone()
rat = rat[0] if rat else self.conn.execute('INSERT INTO ratings(rating) VALUES (?)', (rating,)).lastrowid
rat = self.conn.get('SELECT id FROM ratings WHERE rating=?', (rating,), all=False)
rat = rat if rat else self.conn.execute('INSERT INTO ratings(rating) VALUES (?)', (rating,)).lastrowid
self.conn.execute('INSERT INTO books_ratings_link(book, rating) VALUES (?,?)', (id, rat))
self.conn.commit()
@ -1336,7 +1335,7 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
return i
def get_feeds(self):
feeds = self.conn.execute('SELECT title, script FROM feeds').fetchall()
feeds = self.conn.get('SELECT title, script FROM feeds')
for title, script in feeds:
yield title, script
@ -1386,7 +1385,7 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
self.conn.commit()
def all_ids(self):
return [i[0] for i in self.conn.execute('SELECT id FROM books').fetchall()]
return [i[0] for i in self.conn.get('SELECT id FROM books')]
def export_to_dir(self, dir, indices, byauthor=False, single_dir=False,
index_is_id=False):
@ -1395,8 +1394,8 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
by_author = {}
for index in indices:
id = index if index_is_id else self.id(index)
au = self.conn.execute('SELECT author_sort FROM books WHERE id=?',
(id,)).fetchone()[0]
au = self.conn.get('SELECT author_sort FROM books WHERE id=?',
(id,), all=False)
if not au:
au = self.authors(index, index_is_id=index_is_id)
if not au:
@ -1540,10 +1539,10 @@ ALTER TABLE books ADD COLUMN isbn TEXT DEFAULT "" COLLATE NOCASE;
def has_book(self, mi):
return bool(self.conn.execute('SELECT id FROM books where title=?', (mi.title,)).fetchone())
return bool(self.conn.get('SELECT id FROM books where title=?', (mi.title,), all=False))
def has_id(self, id):
return self.conn.execute('SELECT id FROM books where id=?', (id,)).fetchone() is not None
return self.conn.get('SELECT id FROM books where id=?', (id,), all=False) is not None
def recursive_import(self, root, single_book_per_directory=True):
root = os.path.abspath(root)

View File

@ -8,7 +8,6 @@ The database used to store ebook metadata
'''
import os, re, sys, shutil, cStringIO, glob, collections, textwrap, \
operator, itertools, functools, traceback
import sqlite3 as sqlite
from itertools import repeat
from PyQt4.QtCore import QCoreApplication, QThread, QReadWriteLock
@ -16,9 +15,11 @@ from PyQt4.QtGui import QApplication, QPixmap, QImage
__app = None
from calibre.library.database import LibraryDatabase
from calibre.library.sqlite import connect, IntegrityError
from calibre.ebooks.metadata import string_to_authors, authors_to_string
from calibre.constants import preferred_encoding, iswindows, isosx
copyfile = os.link if hasattr(os, 'link') else shutil.copyfile
filesystem_encoding = sys.getfilesystemencoding()
if filesystem_encoding is None: filesystem_encoding = 'utf-8'
@ -157,23 +158,6 @@ class CoverCache(QThread):
self.load_queue.appendleft(id)
self.load_queue_lock.unlock()
class Concatenate(object):
'''String concatenation aggregator for sqlite'''
def __init__(self, sep=','):
self.sep = sep
self.ans = ''
def step(self, value):
if value is not None:
self.ans += value + self.sep
def finalize(self):
if not self.ans:
return None
if self.sep:
return self.ans[:-len(self.sep)]
return self.ans
class ResultCache(object):
'''
@ -226,7 +210,7 @@ class ResultCache(object):
def refresh_ids(self, conn, ids):
for id in ids:
self._data[id] = conn.execute('SELECT * from meta WHERE id=?', (id,)).fetchone()
self._data[id] = conn.get('SELECT * from meta WHERE id=?', (id,))[0]
return map(self.row, ids)
def books_added(self, ids, conn):
@ -234,7 +218,7 @@ class ResultCache(object):
return
self._data.extend(repeat(None, max(ids)-len(self._data)+2))
for id in ids:
self._data[id] = conn.execute('SELECT * from meta WHERE id=?', (id,)).fetchone()
self._data[id] = conn.get('SELECT * from meta WHERE id=?', (id,))[0]
self._map[0:0] = ids
self._map_filtered[0:0] = ids
@ -246,7 +230,7 @@ class ResultCache(object):
# Fast mapping from sorted, filtered row numbers to ids
# At the moment it is the same as self._map
self._map_filtered = list(self._map)
temp = db.conn.execute('SELECT * FROM meta').fetchall()
temp = db.conn.get('SELECT * FROM meta')
# Fast mapping from ids to data.
# Can be None for ids that dont exist (i.e. have been deleted)
self._data = list(itertools.repeat(None, temp[-1][0]+2)) if temp else []
@ -285,53 +269,53 @@ class ResultCache(object):
self._map_filtered.remove(id)
def sort_on_title(self, order, db):
return db.conn.execute('SELECT id FROM books ORDER BY sort ' + order).fetchall()
return db.conn.get('SELECT id FROM books ORDER BY sort ' + order)
def sort_on_author_sort(self, order, db):
return db.conn.execute('SELECT id FROM books ORDER BY author_sort,sort ' + order).fetchall()
return db.conn.get('SELECT id FROM books ORDER BY author_sort,sort ' + order)
def sort_on_timestamp(self, order, db):
return db.conn.execute('SELECT id FROM books ORDER BY id ' + order).fetchall()
return db.conn.get('SELECT id FROM books ORDER BY id ' + order)
def sort_on_publisher(self, order, db):
no_publisher = db.conn.execute('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_publishers_link) ORDER BY books.sort').fetchall()
no_publisher = db.conn.get('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_publishers_link) ORDER BY books.sort')
ans = []
for r in db.conn.execute('SELECT id FROM publishers ORDER BY name '+order).fetchall():
for r in db.conn.get('SELECT id FROM publishers ORDER BY name '+order):
publishers_id = r[0]
ans += db.conn.execute('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_publishers_link WHERE publisher=?) ORDER BY books.sort '+order, (publishers_id,)).fetchall()
ans += db.conn.get('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_publishers_link WHERE publisher=?) ORDER BY books.sort '+order, (publishers_id,))
ans = (no_publisher + ans) if order == 'ASC' else (ans + no_publisher)
return ans
def sort_on_size(self, order, db):
return db.conn.execute('SELECT id FROM meta ORDER BY size ' + order).fetchall()
return db.conn.get('SELECT id FROM meta ORDER BY size ' + order)
def sort_on_rating(self, order, db):
no_rating = db.conn.execute('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_ratings_link) ORDER BY books.sort').fetchall()
no_rating = db.conn.get('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_ratings_link) ORDER BY books.sort')
ans = []
for r in db.conn.execute('SELECT id FROM ratings ORDER BY rating '+order).fetchall():
for r in db.conn.get('SELECT id FROM ratings ORDER BY rating '+order):
ratings_id = r[0]
ans += db.conn.execute('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_ratings_link WHERE rating=?) ORDER BY books.sort', (ratings_id,)).fetchall()
ans += db.conn.get('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_ratings_link WHERE rating=?) ORDER BY books.sort', (ratings_id,))
ans = (no_rating + ans) if order == 'ASC' else (ans + no_rating)
return ans
def sort_on_series(self, order, db):
no_series = db.conn.execute('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_series_link) ORDER BY books.sort').fetchall()
no_series = db.conn.get('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_series_link) ORDER BY books.sort')
ans = []
for r in db.conn.execute('SELECT id FROM series ORDER BY name '+order).fetchall():
for r in db.conn.get('SELECT id FROM series ORDER BY name '+order):
series_id = r[0]
ans += db.conn.execute('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_series_link WHERE series=?) ORDER BY books.series_index,books.id '+order, (series_id,)).fetchall()
ans += db.conn.get('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_series_link WHERE series=?) ORDER BY books.series_index,books.id '+order, (series_id,))
ans = (no_series + ans) if order == 'ASC' else (ans + no_series)
return ans
def sort_on_tags(self, order, db):
no_tags = db.conn.execute('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_tags_link) ORDER BY books.sort').fetchall()
no_tags = db.conn.get('SELECT id FROM books WHERE books.id NOT IN (SELECT book FROM books_tags_link) ORDER BY books.sort')
ans = []
for r in db.conn.execute('SELECT id FROM tags ORDER BY name '+order).fetchall():
for r in db.conn.get('SELECT id FROM tags ORDER BY name '+order):
tag_id = r[0]
ans += db.conn.execute('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_tags_link WHERE tag=?) ORDER BY books.sort '+order, (tag_id,)).fetchall()
ans += db.conn.get('SELECT id FROM books WHERE books.id IN (SELECT book FROM books_tags_link WHERE tag=?) ORDER BY books.sort '+order, (tag_id,))
ans = (no_tags + ans) if order == 'ASC' else (ans + no_tags)
return ans
@ -353,36 +337,25 @@ class LibraryDatabase2(LibraryDatabase):
@apply
def user_version():
doc = 'The user version of this database'
def fget(self):
return self.conn.execute('pragma user_version;').next()[0]
return self.conn.get('pragma user_version;', all=False)
def fset(self, val):
self.conn.execute('pragma user_version=%d'%int(val))
self.conn.commit()
return property(doc=doc, fget=fget, fset=fset)
def connect(self):
if 'win32' in sys.platform and len(self.library_path) + 4*self.PATH_LIMIT + 10 > 259:
raise ValueError('Path to library too long. Must be less than %d characters.'%(259-4*self.PATH_LIMIT-10))
exists = os.path.exists(self.dbpath)
self.conn = sqlite.connect(self.dbpath,
detect_types=sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES)
self.conn = connect(self.dbpath, self.row_factory)
if exists and self.user_version == 0:
self.conn.close()
os.remove(self.dbpath)
self.conn = sqlite.connect(self.dbpath,
detect_types=sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES)
self.conn.row_factory = sqlite.Row if self.row_factory else lambda cursor, row : list(row)
self.conn.create_aggregate('concat', 1, Concatenate)
title_pat = re.compile('^(A|The|An)\s+', re.IGNORECASE)
def title_sort(title):
match = title_pat.search(title)
if match:
prep = match.group(1)
title = title.replace(prep, '') + ', ' + prep
return title.strip()
self.conn.create_function('title_sort', 1, title_sort)
self.conn = connect(self.dbpath, self.row_factory)
if self.user_version == 0:
self.initialize_database()
@ -453,7 +426,7 @@ class LibraryDatabase2(LibraryDatabase):
def path(self, index, index_is_id=False):
'Return the relative path to the directory containing this books files as a unicode string.'
id = index if index_is_id else self.id(index)
path = self.conn.execute('SELECT path FROM books WHERE id=?', (id,)).fetchone()[0].replace('/', os.sep)
path = self.conn.get('SELECT path FROM books WHERE id=?', (id,), all=False).replace('/', os.sep)
return path
def abspath(self, index, index_is_id=False):
@ -503,7 +476,7 @@ class LibraryDatabase2(LibraryDatabase):
fname = self.construct_file_name(id)
changed = False
for format in formats:
name = self.conn.execute('SELECT name FROM data WHERE book=? AND format=?', (id, format)).fetchone()[0]
name = self.conn.get('SELECT name FROM data WHERE book=? AND format=?', (id, format), all=False)
if name and name != fname:
changed = True
break
@ -594,7 +567,7 @@ class LibraryDatabase2(LibraryDatabase):
p.save(path)
def all_formats(self):
formats = self.conn.execute('SELECT format from data').fetchall()
formats = self.conn.get('SELECT format from data')
if not formats:
return set([])
return set([f[0] for f in formats])
@ -604,8 +577,8 @@ class LibraryDatabase2(LibraryDatabase):
id = index if index_is_id else self.id(index)
path = os.path.join(self.library_path, self.path(id, index_is_id=True))
try:
formats = self.conn.execute('SELECT format FROM data WHERE book=?', (id,)).fetchall()
name = self.conn.execute('SELECT name FROM data WHERE book=?', (id,)).fetchone()[0]
formats = self.conn.get('SELECT format FROM data WHERE book=?', (id,))
name = self.conn.get('SELECT name FROM data WHERE book=?', (id,), all=False)
formats = map(lambda x:x[0], formats)
except:
return None
@ -621,7 +594,7 @@ class LibraryDatabase2(LibraryDatabase):
'Return absolute path to the ebook file of format `format`'
id = index if index_is_id else self.id(index)
path = os.path.join(self.library_path, self.path(id, index_is_id=True))
name = self.conn.execute('SELECT name FROM data WHERE book=? AND format=?', (id, format)).fetchone()[0]
name = self.conn.get('SELECT name FROM data WHERE book=? AND format=?', (id, format), all=False)
if name:
format = ('.' + format.lower()) if format else ''
path = os.path.join(path, name+format)
@ -645,7 +618,7 @@ class LibraryDatabase2(LibraryDatabase):
id = index if index_is_id else self.id(index)
if path is None:
path = os.path.join(self.library_path, self.path(id, index_is_id=True))
name = self.conn.execute('SELECT name FROM data WHERE book=? AND format=?', (id, format)).fetchone()
name = self.conn.get('SELECT name FROM data WHERE book=? AND format=?', (id, format), all=False)
if name:
self.conn.execute('DELETE FROM data WHERE book=? AND format=?', (id, format))
name = self.construct_file_name(id)
@ -682,7 +655,7 @@ class LibraryDatabase2(LibraryDatabase):
def remove_format(self, index, format, index_is_id=False):
id = index if index_is_id else self.id(index)
path = os.path.join(self.library_path, self.path(id, index_is_id=True))
name = self.conn.execute('SELECT name FROM data WHERE book=? AND format=?', (id, format)).fetchone()
name = self.conn.get('SELECT name FROM data WHERE book=? AND format=?', (id, format), all=False)
name = name[0] if name else False
if name:
ext = ('.' + format.lower()) if format else ''
@ -709,7 +682,7 @@ class LibraryDatabase2(LibraryDatabase):
def get_categories(self, sort_on_count=False):
categories = {}
def get(name, category, field='name'):
ans = self.conn.execute('SELECT DISTINCT %s FROM %s'%(field, name)).fetchall()
ans = self.conn.get('SELECT DISTINCT %s FROM %s'%(field, name))
ans = [x[0].strip() for x in ans]
try:
ans.remove('')
@ -718,16 +691,14 @@ class LibraryDatabase2(LibraryDatabase):
tags = categories[category]
if name != 'data':
for tag in tags:
id = self.conn.execute('SELECT id FROM %s WHERE %s=?'%(name, field), (tag,)).fetchone()
if id:
id = id[0]
id = self.conn.get('SELECT id FROM %s WHERE %s=?'%(name, field), (tag,), all=False)
tag.id = id
for tag in tags:
if tag.id is not None:
tag.count = self.conn.execute('SELECT COUNT(id) FROM books_%s_link WHERE %s=?'%(name, category), (tag.id,)).fetchone()[0]
tag.count = self.conn.get('SELECT COUNT(id) FROM books_%s_link WHERE %s=?'%(name, category), (tag.id,), all=False)
else:
for tag in tags:
tag.count = self.conn.execute('SELECT COUNT(format) FROM data WHERE format=?', (tag,)).fetchone()[0]
tag.count = self.conn.get('SELECT COUNT(format) FROM data WHERE format=?', (tag,), all=False)
tags.sort(reverse=sort_on_count, cmp=(lambda x,y:cmp(x.count,y.count)) if sort_on_count else cmp)
for x in (('authors', 'author'), ('tags', 'tag'), ('publishers', 'publisher'),
('series', 'series')):
@ -785,6 +756,8 @@ class LibraryDatabase2(LibraryDatabase):
self.set_tags(id, mi.tags, notify=False)
if mi.comments:
self.set_comment(id, mi.comments)
if mi.isbn and mi.isbn.strip():
self.set_isbn(id, mi.isbn)
self.set_path(id, True)
self.notify('metadata', [id])
@ -800,16 +773,16 @@ class LibraryDatabase2(LibraryDatabase):
a = a.strip().replace(',', '|')
if not isinstance(a, unicode):
a = a.decode(preferred_encoding, 'replace')
author = self.conn.execute('SELECT id from authors WHERE name=?', (a,)).fetchone()
author = self.conn.get('SELECT id from authors WHERE name=?', (a,), all=False)
if author:
aid = author[0]
aid = author
# Handle change of case
self.conn.execute('UPDATE authors SET name=? WHERE id=?', (a, aid))
else:
aid = self.conn.execute('INSERT INTO authors(name) VALUES (?)', (a,)).lastrowid
try:
self.conn.execute('INSERT INTO books_authors_link(book, author) VALUES (?,?)', (id, aid))
except sqlite.IntegrityError: # Sometimes books specify the same author twice in their metadata
except IntegrityError: # Sometimes books specify the same author twice in their metadata
pass
self.set_path(id, True)
self.notify('metadata', [id])
@ -829,9 +802,9 @@ class LibraryDatabase2(LibraryDatabase):
if publisher:
if not isinstance(publisher, unicode):
publisher = publisher.decode(preferred_encoding, 'replace')
pub = self.conn.execute('SELECT id from publishers WHERE name=?', (publisher,)).fetchone()
pub = self.conn.get('SELECT id from publishers WHERE name=?', (publisher,), all=False)
if pub:
aid = pub[0]
aid = pub
else:
aid = self.conn.execute('INSERT INTO publishers(name) VALUES (?)', (publisher,)).lastrowid
self.conn.execute('INSERT INTO books_publishers_link(book, publisher) VALUES (?,?)', (id, aid))
@ -852,14 +825,14 @@ class LibraryDatabase2(LibraryDatabase):
continue
if not isinstance(tag, unicode):
tag = tag.decode(preferred_encoding, 'replace')
t = self.conn.execute('SELECT id FROM tags WHERE name=?', (tag,)).fetchone()
t = self.conn.get('SELECT id FROM tags WHERE name=?', (tag,), all=False)
if t:
tid = t[0]
tid = t
else:
tid = self.conn.execute('INSERT INTO tags(name) VALUES(?)', (tag,)).lastrowid
if not self.conn.execute('SELECT book FROM books_tags_link WHERE book=? AND tag=?',
(id, tid)).fetchone():
if not self.conn.get('SELECT book FROM books_tags_link WHERE book=? AND tag=?',
(id, tid), all=False):
self.conn.execute('INSERT INTO books_tags_link(book, tag) VALUES (?,?)',
(id, tid))
self.conn.commit()
@ -872,9 +845,9 @@ class LibraryDatabase2(LibraryDatabase):
if series:
if not isinstance(series, unicode):
series = series.decode(preferred_encoding, 'replace')
s = self.conn.execute('SELECT id from series WHERE name=?', (series,)).fetchone()
s = self.conn.get('SELECT id from series WHERE name=?', (series,), all=False)
if s:
aid = s[0]
aid = s
else:
aid = self.conn.execute('INSERT INTO series(name) VALUES (?)', (series,)).lastrowid
self.conn.execute('INSERT INTO books_series_link(book, series) VALUES (?,?)', (id, aid))
@ -904,7 +877,7 @@ class LibraryDatabase2(LibraryDatabase):
def add_books(self, paths, formats, metadata, uris=[], add_duplicates=True):
'''
Add a book to the database. The result cache is not updated.
@param paths: List of paths to book files of file-like objects
@param paths: List of paths to book files or file-like objects
'''
formats, metadata, uris = iter(formats), iter(metadata), iter(uris)
duplicates = []
@ -965,7 +938,7 @@ class LibraryDatabase2(LibraryDatabase):
def move_library_to(self, newloc, progress=None):
header = _(u'<p>Copying books to %s<br><center>')%newloc
books = self.conn.execute('SELECT id, path, title FROM books').fetchall()
books = self.conn.get('SELECT id, path, title FROM books')
if progress is not None:
progress.setValue(0)
progress.setLabelText(header)
@ -1047,6 +1020,7 @@ class LibraryDatabase2(LibraryDatabase):
x['formats'].append(path%fmt.lower())
x['fmt_'+fmt.lower()] = path%fmt.lower()
x['available_formats'] = [i.upper() for i in formats.split(',')]
return data
def migrate_old(self, db, progress):
@ -1056,7 +1030,7 @@ class LibraryDatabase2(LibraryDatabase):
QCoreApplication.processEvents()
db.conn.row_factory = lambda cursor, row : tuple(row)
db.conn.text_factory = lambda x : unicode(x, 'utf-8', 'replace')
books = db.conn.execute('SELECT id, title, sort, timestamp, uri, series_index, author_sort, isbn FROM books ORDER BY id ASC').fetchall()
books = db.conn.get('SELECT id, title, sort, timestamp, uri, series_index, author_sort, isbn FROM books ORDER BY id ASC')
progress.setAutoReset(False)
progress.setRange(0, len(books))
@ -1072,7 +1046,7 @@ books_ratings_link
books_series_link feeds
'''.split()
for table in tables:
rows = db.conn.execute('SELECT * FROM %s ORDER BY id ASC'%table).fetchall()
rows = db.conn.get('SELECT * FROM %s ORDER BY id ASC'%table)
for row in rows:
self.conn.execute('INSERT INTO %s VALUES(%s)'%(table, ','.join(repeat('?', len(row)))), row)

View File

@ -0,0 +1,43 @@
#!/usr/bin/env python
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
HTTP server for remote access to the calibre database.
'''
import sys
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
from calibre.constants import __version__
class Server(HTTPServer):
pass
class DBHandler(BaseHTTPRequestHandler):
server_version = 'calibre/'+__version__
def set_db(self, db):
self.db = db
def server(db, port=80):
server = Server(('', port), DBHandler)
def main(args=sys.argv):
from calibre.utils.config import prefs
from calibre.library.database2 import LibraryDatabase2
db = LibraryDatabase2(prefs['library_path'])
try:
print 'Starting server...'
s = server()
s.server_forever()
except KeyboardInterrupt:
print 'Server interrupted'
s.socket.close()
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,156 @@
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Wrapper for multi-threaded access to a single sqlite database connection. Serializes
all calls.
'''
import sqlite3 as sqlite, traceback, re, time
from sqlite3 import IntegrityError
from threading import Thread
from Queue import Queue
class Concatenate(object):
'''String concatenation aggregator for sqlite'''
def __init__(self, sep=','):
self.sep = sep
self.ans = ''
def step(self, value):
if value is not None:
self.ans += value + self.sep
def finalize(self):
if not self.ans:
return None
if self.sep:
return self.ans[:-len(self.sep)]
return self.ans
class Connection(sqlite.Connection):
def get(self, *args, **kw):
ans = self.execute(*args)
if not kw.get('all', True):
ans = ans.fetchone()
if not ans:
ans = [None]
return ans[0]
return ans.fetchall()
class DBThread(Thread):
CLOSE = '-------close---------'
def __init__(self, path, row_factory):
Thread.__init__(self)
self.setDaemon(True)
self.path = path
self.unhandled_error = (None, '')
self.row_factory = row_factory
self.requests = Queue(1)
self.results = Queue(1)
self.conn = None
def connect(self):
self.conn = sqlite.connect(self.path, factory=Connection,
detect_types=sqlite.PARSE_DECLTYPES|sqlite.PARSE_COLNAMES)
self.conn.row_factory = sqlite.Row if self.row_factory else lambda cursor, row : list(row)
self.conn.create_aggregate('concat', 1, Concatenate)
title_pat = re.compile('^(A|The|An)\s+', re.IGNORECASE)
def title_sort(title):
match = title_pat.search(title)
if match:
prep = match.group(1)
title = title.replace(prep, '') + ', ' + prep
return title.strip()
self.conn.create_function('title_sort', 1, title_sort)
def run(self):
try:
self.connect()
while True:
func, args, kwargs = self.requests.get()
if func == self.CLOSE:
self.conn.close()
break
func = getattr(self.conn, func)
try:
ok, res = True, func(*args, **kwargs)
except Exception, err:
ok, res = False, (err, traceback.format_exc())
self.results.put((ok, res))
except Exception, err:
self.unhandled_error = (err, traceback.format_exc())
class DatabaseException(Exception):
def __init__(self, err, tb):
tb = '\n\t'.join(('\tRemote'+tb).splitlines())
msg = unicode(err) +'\n' + tb
Exception.__init__(self, msg)
self.orig_err = err
self.orig_tb = tb
def proxy(fn):
''' Decorator to call methods on the database connection in the proxy thread '''
def run(self, *args, **kwargs):
if self.proxy.unhandled_error[0] is not None:
raise DatabaseException(*self.proxy.unhandled_error)
self.proxy.requests.put((fn.__name__, args, kwargs))
ok, res = self.proxy.results.get()
if not ok:
if isinstance(res[0], IntegrityError):
raise IntegrityError(unicode(res[0]))
raise DatabaseException(*res)
return res
return run
class ConnectionProxy(object):
def __init__(self, proxy):
self.proxy = proxy
def close(self):
if self.proxy.unhandled_error is None:
self.proxy.requests.put((self.proxy.CLOSE, [], {}))
@proxy
def get(self, query, all=True): pass
@proxy
def commit(self): pass
@proxy
def execute(self): pass
@proxy
def executemany(self): pass
@proxy
def executescript(self): pass
@proxy
def create_aggregate(self): pass
@proxy
def create_function(self): pass
@proxy
def cursor(self): pass
def connect(dbpath, row_factory=None):
conn = ConnectionProxy(DBThread(dbpath, row_factory))
conn.proxy.start()
while conn.proxy.unhandled_error[0] is None and conn.proxy.conn is None:
time.sleep(0.01)
if conn.proxy.unhandled_error[0] is not None:
raise DatabaseException(*conn.proxy.unhandled_error)
return conn

View File

@ -0,0 +1,90 @@
#!/usr/bin/env python
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
'''
Unit tests for database layer.
'''
import sys, unittest, os
from itertools import repeat
from calibre.ptempfile import PersistentTemporaryDirectory
from calibre.library.database2 import LibraryDatabase2
from calibre.ebooks.metadata import MetaInformation
class DBTest(unittest.TestCase):
img = '\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00d\x00d\x00\x00\xff\xdb\x00C\x00\x05\x03\x04\x04\x04\x03\x05\x04\x04\x04\x05\x05\x05\x06\x07\x0c\x08\x07\x07\x07\x07\x0f\x0b\x0b\t\x0c\x11\x0f\x12\x12\x11\x0f\x11\x11\x13\x16\x1c\x17\x13\x14\x1a\x15\x11\x11\x18!\x18\x1a\x1d\x1d\x1f\x1f\x1f\x13\x17"$"\x1e$\x1c\x1e\x1f\x1e\xff\xdb\x00C\x01\x05\x05\x05\x07\x06\x07\x0e\x08\x08\x0e\x1e\x14\x11\x14\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\x1e\xff\xc0\x00\x11\x08\x00\x01\x00\x01\x03\x01\x11\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x14\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\xff\xc4\x00\x14\x10\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xc4\x00\x14\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\xff\xc4\x00\x14\x11\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xda\x00\x0c\x03\x01\x00\x02\x11\x03\x11\x00?\x00p\xf9+\xff\xd9'
def setUp(self):
self.tdir = PersistentTemporaryDirectory('_calibre_dbtest')
self.db = LibraryDatabase2(self.tdir)
f = open(os.path.join(self.tdir, 'test.txt'), 'w+b')
f.write('test')
paths = list(repeat(f, 3))
formats = list(repeat('txt', 3))
m1 = MetaInformation('Test Ebook 1', ['Test Author 1'])
m1.tags = ['tag1', 'tag2']
m1.publisher = 'Test Publisher 1'
m1.rating = 2
m1.series = 'Test Series 1'
m1.series_index = 3
m1.author_sort = 'as1'
m1.isbn = 'isbn1'
m1.cover_data = ('jpg', self.img)
m2 = MetaInformation('Test Ebook 2', ['Test Author 2'])
m2.tags = ['tag3', 'tag4']
m2.publisher = 'Test Publisher 2'
m2.rating = 3
m2.series = 'Test Series 2'
m2.series_index = 1
m2.author_sort = 'as1'
m2.isbn = 'isbn1'
self.db.add_books(paths, formats, [m1, m2, m2], add_duplicates=True)
self.m1, self.m2 = m1, m2
def testAdding(self):
m1, m2 = self.db.get_metadata(1, True), self.db.get_metadata(2, True)
for p in ('title', 'authors', 'publisher', 'rating', 'series',
'series_index', 'author_sort', 'isbn', 'tags'):
def ga(mi, p):
val = getattr(mi, p)
if isinstance(val, list):
val = set(val)
return val
self.assertEqual(ga(self.m1, p), ga(m1, p))
self.assertEqual(ga(self.m2, p), ga(m2, p))
self.assertEqual(self.db.format(1, 'txt', index_is_id=True), 'test')
self.assertNotEqual(self.db.cover(1, index_is_id=True), None)
self.assertEqual(self.db.cover(2, index_is_id=True), None)
def testMetadata(self):
self.db.refresh('timestamp', True)
for x in ('title', 'author_sort', 'series', 'publisher', 'isbn', 'series_index', 'rating'):
val = 3 if x in ['rating', 'series_index'] else 'dummy'
getattr(self.db, 'set_'+x)(3, val)
self.db.refresh_ids([3])
self.assertEqual(getattr(self.db, x)(2), val)
self.db.set_authors(3, ['new auth'])
self.assertEqual(self.db.format(3, 'txt', index_is_id=True), 'test')
def suite():
return unittest.TestLoader().loadTestsFromTestCase(DBTest)
def test():
unittest.TextTestRunner(verbosity=2).run(suite())
def main(args=sys.argv):
test()
return 0
if __name__ == '__main__':
sys.exit(main())