mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Always load is_multiple fields in db order. Fix various tests to work on windows. Fix author_sort tests to not care about tweaks. Implement changing path of books when metadata changes. Add a test loader to load all tests
This commit is contained in:
parent
2cc790cd90
commit
4015d7526d
@ -23,9 +23,11 @@ from calibre.ebooks.metadata import title_sort, author_to_author_sort
|
|||||||
from calibre.utils.icu import sort_key
|
from calibre.utils.icu import sort_key
|
||||||
from calibre.utils.config import to_json, from_json, prefs, tweaks
|
from calibre.utils.config import to_json, from_json, prefs, tweaks
|
||||||
from calibre.utils.date import utcfromtimestamp, parse_date
|
from calibre.utils.date import utcfromtimestamp, parse_date
|
||||||
from calibre.utils.filenames import (is_case_sensitive, samefile, hardlink_file)
|
from calibre.utils.filenames import (is_case_sensitive, samefile, hardlink_file, ascii_filename,
|
||||||
|
WindowsAtomicFolderMove)
|
||||||
|
from calibre.utils.recycle_bin import delete_tree
|
||||||
from calibre.db.tables import (OneToOneTable, ManyToOneTable, ManyToManyTable,
|
from calibre.db.tables import (OneToOneTable, ManyToOneTable, ManyToManyTable,
|
||||||
SizeTable, FormatsTable, AuthorsTable, IdentifiersTable,
|
SizeTable, FormatsTable, AuthorsTable, IdentifiersTable, PathTable,
|
||||||
CompositeTable, LanguagesTable)
|
CompositeTable, LanguagesTable)
|
||||||
# }}}
|
# }}}
|
||||||
|
|
||||||
@ -672,7 +674,7 @@ class DB(object):
|
|||||||
if col == 'cover' else col)
|
if col == 'cover' else col)
|
||||||
if not metadata['column']:
|
if not metadata['column']:
|
||||||
metadata['column'] = col
|
metadata['column'] = col
|
||||||
tables[col] = OneToOneTable(col, metadata)
|
tables[col] = (PathTable if col == 'path' else OneToOneTable)(col, metadata)
|
||||||
|
|
||||||
for col in ('series', 'publisher', 'rating'):
|
for col in ('series', 'publisher', 'rating'):
|
||||||
tables[col] = ManyToOneTable(col, self.field_metadata[col].copy())
|
tables[col] = ManyToOneTable(col, self.field_metadata[col].copy())
|
||||||
@ -778,6 +780,44 @@ class DB(object):
|
|||||||
self.user_version = 1
|
self.user_version = 1
|
||||||
# }}}
|
# }}}
|
||||||
|
|
||||||
|
def normpath(self, path):
|
||||||
|
path = os.path.abspath(os.path.realpath(path))
|
||||||
|
if not self.is_case_sensitive:
|
||||||
|
path = os.path.normcase(path).lower()
|
||||||
|
return path
|
||||||
|
|
||||||
|
def rmtree(self, path, permanent=False):
|
||||||
|
if not self.normpath(self.library_path).startswith(self.normpath(path)):
|
||||||
|
delete_tree(path, permanent=permanent)
|
||||||
|
|
||||||
|
def construct_path_name(self, book_id, title, author):
|
||||||
|
'''
|
||||||
|
Construct the directory name for this book based on its metadata.
|
||||||
|
'''
|
||||||
|
author = ascii_filename(author
|
||||||
|
)[:self.PATH_LIMIT].decode('ascii', 'replace')
|
||||||
|
title = ascii_filename(title
|
||||||
|
)[:self.PATH_LIMIT].decode('ascii', 'replace')
|
||||||
|
while author[-1] in (' ', '.'):
|
||||||
|
author = author[:-1]
|
||||||
|
if not author:
|
||||||
|
author = ascii_filename(_('Unknown')).decode(
|
||||||
|
'ascii', 'replace')
|
||||||
|
return '%s/%s (%d)'%(author, title, book_id)
|
||||||
|
|
||||||
|
def construct_file_name(self, book_id, title, author):
|
||||||
|
'''
|
||||||
|
Construct the file name for this book based on its metadata.
|
||||||
|
'''
|
||||||
|
author = ascii_filename(author
|
||||||
|
)[:self.PATH_LIMIT].decode('ascii', 'replace')
|
||||||
|
title = ascii_filename(title
|
||||||
|
)[:self.PATH_LIMIT].decode('ascii', 'replace')
|
||||||
|
name = title + ' - ' + author
|
||||||
|
while name.endswith('.'):
|
||||||
|
name = name[:-1]
|
||||||
|
return name
|
||||||
|
|
||||||
# Database layer API {{{
|
# Database layer API {{{
|
||||||
|
|
||||||
def custom_table_names(self, num):
|
def custom_table_names(self, num):
|
||||||
@ -865,7 +905,7 @@ class DB(object):
|
|||||||
return self.format_abspath(book_id, fmt, fname, path) is not None
|
return self.format_abspath(book_id, fmt, fname, path) is not None
|
||||||
|
|
||||||
def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False):
|
def copy_cover_to(self, path, dest, windows_atomic_move=None, use_hardlink=False):
|
||||||
path = os.path.join(self.library_path, path, 'cover.jpg')
|
path = os.path.abspath(os.path.join(self.library_path, path, 'cover.jpg'))
|
||||||
if windows_atomic_move is not None:
|
if windows_atomic_move is not None:
|
||||||
if not isinstance(dest, basestring):
|
if not isinstance(dest, basestring):
|
||||||
raise Exception("Error, you must pass the dest as a path when"
|
raise Exception("Error, you must pass the dest as a path when"
|
||||||
@ -926,5 +966,90 @@ class DB(object):
|
|||||||
shutil.copyfileobj(f, d)
|
shutil.copyfileobj(f, d)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def windows_check_if_files_in_use(self, paths):
|
||||||
|
'''
|
||||||
|
Raises an EACCES IOError if any of the files in the folder of book_id
|
||||||
|
are opened in another program on windows.
|
||||||
|
'''
|
||||||
|
if iswindows:
|
||||||
|
for path in paths:
|
||||||
|
spath = os.path.join(self.library_path, *path.split('/'))
|
||||||
|
wam = None
|
||||||
|
if os.path.exists(spath):
|
||||||
|
try:
|
||||||
|
wam = WindowsAtomicFolderMove(spath)
|
||||||
|
finally:
|
||||||
|
if wam is not None:
|
||||||
|
wam.close_handles()
|
||||||
|
|
||||||
|
def update_path(self, book_id, title, author, path_field, formats_field):
|
||||||
|
path = self.construct_path_name(book_id, title, author)
|
||||||
|
current_path = path_field.for_book(book_id)
|
||||||
|
formats = formats_field.for_book(book_id, default_value=())
|
||||||
|
fname = self.construct_file_name(book_id, title, author)
|
||||||
|
# Check if the metadata used to construct paths has changed
|
||||||
|
changed = False
|
||||||
|
for fmt in formats:
|
||||||
|
name = formats_field.format_fname(book_id, fmt)
|
||||||
|
if name and name != fname:
|
||||||
|
changed = True
|
||||||
|
break
|
||||||
|
if path == current_path and not changed:
|
||||||
|
return
|
||||||
|
spath = os.path.join(self.library_path, *current_path.split('/'))
|
||||||
|
tpath = os.path.join(self.library_path, *path.split('/'))
|
||||||
|
|
||||||
|
source_ok = current_path and os.path.exists(spath)
|
||||||
|
wam = WindowsAtomicFolderMove(spath) if iswindows and source_ok else None
|
||||||
|
try:
|
||||||
|
if not os.path.exists(tpath):
|
||||||
|
os.makedirs(tpath)
|
||||||
|
|
||||||
|
if source_ok: # Migrate existing files
|
||||||
|
dest = os.path.join(tpath, 'cover.jpg')
|
||||||
|
self.copy_cover_to(current_path, dest,
|
||||||
|
windows_atomic_move=wam, use_hardlink=True)
|
||||||
|
for fmt in formats:
|
||||||
|
dest = os.path.join(tpath, fname+'.'+fmt.lower())
|
||||||
|
self.copy_format_to(book_id, fmt, formats_field.format_fname(book_id, fmt), current_path,
|
||||||
|
dest, windows_atomic_move=wam, use_hardlink=True)
|
||||||
|
# Update db to reflect new file locations
|
||||||
|
for fmt in formats:
|
||||||
|
formats_field.table.set_fname(book_id, fmt, fname, self)
|
||||||
|
path_field.table.set_path(book_id, path, self)
|
||||||
|
|
||||||
|
# Delete not needed directories
|
||||||
|
if source_ok:
|
||||||
|
if os.path.exists(spath) and not samefile(spath, tpath):
|
||||||
|
if wam is not None:
|
||||||
|
wam.delete_originals()
|
||||||
|
self.rmtree(spath, permanent=True)
|
||||||
|
parent = os.path.dirname(spath)
|
||||||
|
if len(os.listdir(parent)) == 0:
|
||||||
|
self.rmtree(parent, permanent=True)
|
||||||
|
finally:
|
||||||
|
if wam is not None:
|
||||||
|
wam.close_handles()
|
||||||
|
|
||||||
|
curpath = self.library_path
|
||||||
|
c1, c2 = current_path.split('/'), path.split('/')
|
||||||
|
if not self.is_case_sensitive and len(c1) == len(c2):
|
||||||
|
# On case-insensitive systems, title and author renames that only
|
||||||
|
# change case don't cause any changes to the directories in the file
|
||||||
|
# system. This can lead to having the directory names not match the
|
||||||
|
# title/author, which leads to trouble when libraries are copied to
|
||||||
|
# a case-sensitive system. The following code attempts to fix this
|
||||||
|
# by checking each segment. If they are different because of case,
|
||||||
|
# then rename the segment. Note that the code above correctly
|
||||||
|
# handles files in the directories, so no need to do them here.
|
||||||
|
for oldseg, newseg in zip(c1, c2):
|
||||||
|
if oldseg.lower() == newseg.lower() and oldseg != newseg:
|
||||||
|
try:
|
||||||
|
os.rename(os.path.join(curpath, oldseg),
|
||||||
|
os.path.join(curpath, newseg))
|
||||||
|
except:
|
||||||
|
break # Fail silently since nothing catastrophic has happened
|
||||||
|
curpath = os.path.join(curpath, newseg)
|
||||||
|
|
||||||
# }}}
|
# }}}
|
||||||
|
|
||||||
|
@ -12,6 +12,7 @@ from io import BytesIO
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from functools import wraps, partial
|
from functools import wraps, partial
|
||||||
|
|
||||||
|
from calibre.constants import iswindows
|
||||||
from calibre.db import SPOOL_SIZE
|
from calibre.db import SPOOL_SIZE
|
||||||
from calibre.db.categories import get_categories
|
from calibre.db.categories import get_categories
|
||||||
from calibre.db.locking import create_locks, RecordLock
|
from calibre.db.locking import create_locks, RecordLock
|
||||||
@ -621,9 +622,12 @@ class Cache(object):
|
|||||||
|
|
||||||
@write_api
|
@write_api
|
||||||
def set_field(self, name, book_id_to_val_map, allow_case_change=True):
|
def set_field(self, name, book_id_to_val_map, allow_case_change=True):
|
||||||
# TODO: Specialize title/authors to also update path
|
|
||||||
f = self.fields[name]
|
f = self.fields[name]
|
||||||
is_series = f.metadata['datatype'] == 'series'
|
is_series = f.metadata['datatype'] == 'series'
|
||||||
|
update_path = name in {'title', 'authors'}
|
||||||
|
if update_path and iswindows:
|
||||||
|
paths = (x for x in (self._field_for('path', book_id) for book_id in book_id_to_val_map) if x)
|
||||||
|
self.backend.windows_check_if_files_in_use(paths)
|
||||||
|
|
||||||
if is_series:
|
if is_series:
|
||||||
bimap, simap = {}, {}
|
bimap, simap = {}, {}
|
||||||
@ -650,8 +654,24 @@ class Cache(object):
|
|||||||
for name in self.composites:
|
for name in self.composites:
|
||||||
self.fields[name].pop_cache(dirtied)
|
self.fields[name].pop_cache(dirtied)
|
||||||
|
|
||||||
|
if dirtied and update_path:
|
||||||
|
self._update_path(dirtied, mark_as_dirtied=False)
|
||||||
|
|
||||||
|
# TODO: Mark these as dirtied so that the opf is regenerated
|
||||||
|
|
||||||
return dirtied
|
return dirtied
|
||||||
|
|
||||||
|
@write_api
|
||||||
|
def update_path(self, book_ids, mark_as_dirtied=True):
|
||||||
|
for book_id in book_ids:
|
||||||
|
title = self._field_for('title', book_id, default_value=_('Unknown'))
|
||||||
|
author = self._field_for('authors', book_id, default_value=(_('Unknown'),))[0]
|
||||||
|
self.backend.update_path(book_id, title, author, self.fields['path'], self.fields['formats'])
|
||||||
|
if mark_as_dirtied:
|
||||||
|
pass
|
||||||
|
# TODO: Mark these books as dirtied so that metadata.opf is
|
||||||
|
# re-created
|
||||||
|
|
||||||
# }}}
|
# }}}
|
||||||
|
|
||||||
class SortKey(object):
|
class SortKey(object):
|
||||||
|
@ -13,7 +13,6 @@ from dateutil.tz import tzoffset
|
|||||||
|
|
||||||
from calibre.constants import plugins
|
from calibre.constants import plugins
|
||||||
from calibre.utils.date import parse_date, local_tz, UNDEFINED_DATE
|
from calibre.utils.date import parse_date, local_tz, UNDEFINED_DATE
|
||||||
from calibre.utils.localization import lang_map
|
|
||||||
from calibre.ebooks.metadata import author_to_author_sort
|
from calibre.ebooks.metadata import author_to_author_sort
|
||||||
|
|
||||||
_c_speedup = plugins['speedup'][0]
|
_c_speedup = plugins['speedup'][0]
|
||||||
@ -83,6 +82,13 @@ class OneToOneTable(Table):
|
|||||||
self.metadata['column'], self.metadata['table'])):
|
self.metadata['column'], self.metadata['table'])):
|
||||||
self.book_col_map[row[0]] = self.unserialize(row[1])
|
self.book_col_map[row[0]] = self.unserialize(row[1])
|
||||||
|
|
||||||
|
class PathTable(OneToOneTable):
|
||||||
|
|
||||||
|
def set_path(self, book_id, path, db):
|
||||||
|
self.book_col_map[book_id] = path
|
||||||
|
db.conn.execute('UPDATE books SET path=? WHERE id=?',
|
||||||
|
(path, book_id))
|
||||||
|
|
||||||
class SizeTable(OneToOneTable):
|
class SizeTable(OneToOneTable):
|
||||||
|
|
||||||
def read(self, db):
|
def read(self, db):
|
||||||
@ -144,7 +150,7 @@ class ManyToManyTable(ManyToOneTable):
|
|||||||
'''
|
'''
|
||||||
|
|
||||||
table_type = MANY_MANY
|
table_type = MANY_MANY
|
||||||
selectq = 'SELECT book, {0} FROM {1}'
|
selectq = 'SELECT book, {0} FROM {1} ORDER BY id'
|
||||||
|
|
||||||
def read_maps(self, db):
|
def read_maps(self, db):
|
||||||
for row in db.conn.execute(
|
for row in db.conn.execute(
|
||||||
@ -161,8 +167,6 @@ class ManyToManyTable(ManyToOneTable):
|
|||||||
|
|
||||||
class AuthorsTable(ManyToManyTable):
|
class AuthorsTable(ManyToManyTable):
|
||||||
|
|
||||||
selectq = 'SELECT book, {0} FROM {1} ORDER BY id'
|
|
||||||
|
|
||||||
def read_id_maps(self, db):
|
def read_id_maps(self, db):
|
||||||
self.alink_map = {}
|
self.alink_map = {}
|
||||||
self.asort_map = {}
|
self.asort_map = {}
|
||||||
@ -196,6 +200,11 @@ class FormatsTable(ManyToManyTable):
|
|||||||
for key in tuple(self.book_col_map.iterkeys()):
|
for key in tuple(self.book_col_map.iterkeys()):
|
||||||
self.book_col_map[key] = tuple(sorted(self.book_col_map[key]))
|
self.book_col_map[key] = tuple(sorted(self.book_col_map[key]))
|
||||||
|
|
||||||
|
def set_fname(self, book_id, fmt, fname, db):
|
||||||
|
self.fname_map[book_id][fmt] = fname
|
||||||
|
db.conn.execute('UPDATE data SET name=? WHERE book=? AND format=?',
|
||||||
|
(fname, book_id, fmt))
|
||||||
|
|
||||||
class IdentifiersTable(ManyToManyTable):
|
class IdentifiersTable(ManyToManyTable):
|
||||||
|
|
||||||
def read_id_maps(self, db):
|
def read_id_maps(self, db):
|
||||||
@ -215,6 +224,3 @@ class LanguagesTable(ManyToManyTable):
|
|||||||
|
|
||||||
def read_id_maps(self, db):
|
def read_id_maps(self, db):
|
||||||
ManyToManyTable.read_id_maps(self, db)
|
ManyToManyTable.read_id_maps(self, db)
|
||||||
lm = lang_map()
|
|
||||||
self.lang_name_map = {x:lm.get(x, x) for x in self.id_map.itervalues()}
|
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ __license__ = 'GPL v3'
|
|||||||
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
|
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
|
||||||
__docformat__ = 'restructuredtext en'
|
__docformat__ = 'restructuredtext en'
|
||||||
|
|
||||||
import unittest, os, shutil, tempfile, atexit
|
import unittest, os, shutil, tempfile, atexit, gc
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from future_builtins import map
|
from future_builtins import map
|
||||||
@ -21,6 +21,7 @@ class BaseTest(unittest.TestCase):
|
|||||||
self.create_db(self.library_path)
|
self.create_db(self.library_path)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
gc.collect(), gc.collect()
|
||||||
shutil.rmtree(self.library_path)
|
shutil.rmtree(self.library_path)
|
||||||
|
|
||||||
def create_db(self, library_path):
|
def create_db(self, library_path):
|
||||||
@ -36,6 +37,7 @@ class BaseTest(unittest.TestCase):
|
|||||||
db.add_format(1, 'FMT1', BytesIO(b'book1fmt1'), index_is_id=True)
|
db.add_format(1, 'FMT1', BytesIO(b'book1fmt1'), index_is_id=True)
|
||||||
db.add_format(1, 'FMT2', BytesIO(b'book1fmt2'), index_is_id=True)
|
db.add_format(1, 'FMT2', BytesIO(b'book1fmt2'), index_is_id=True)
|
||||||
db.add_format(2, 'FMT1', BytesIO(b'book2fmt1'), index_is_id=True)
|
db.add_format(2, 'FMT1', BytesIO(b'book2fmt1'), index_is_id=True)
|
||||||
|
db.conn.close()
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
def init_cache(self, library_path):
|
def init_cache(self, library_path):
|
||||||
@ -65,6 +67,10 @@ class BaseTest(unittest.TestCase):
|
|||||||
shutil.copytree(library_path, dest)
|
shutil.copytree(library_path, dest)
|
||||||
return dest
|
return dest
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cloned_library(self):
|
||||||
|
return self.clone_library(self.library_path)
|
||||||
|
|
||||||
def compare_metadata(self, mi1, mi2):
|
def compare_metadata(self, mi1, mi2):
|
||||||
allfk1 = mi1.all_field_keys()
|
allfk1 = mi1.all_field_keys()
|
||||||
allfk2 = mi2.all_field_keys()
|
allfk2 = mi2.all_field_keys()
|
||||||
@ -79,6 +85,8 @@ class BaseTest(unittest.TestCase):
|
|||||||
attr1, attr2 = getattr(mi1, attr), getattr(mi2, attr)
|
attr1, attr2 = getattr(mi1, attr), getattr(mi2, attr)
|
||||||
if attr == 'formats':
|
if attr == 'formats':
|
||||||
attr1, attr2 = map(lambda x:tuple(x) if x else (), (attr1, attr2))
|
attr1, attr2 = map(lambda x:tuple(x) if x else (), (attr1, attr2))
|
||||||
|
if isinstance(attr1, (tuple, list)) and 'authors' not in attr and 'languages' not in attr:
|
||||||
|
attr1, attr2 = set(attr1), set(attr2)
|
||||||
self.assertEqual(attr1, attr2,
|
self.assertEqual(attr1, attr2,
|
||||||
'%s not the same: %r != %r'%(attr, attr1, attr2))
|
'%s not the same: %r != %r'%(attr, attr1, attr2))
|
||||||
if attr.startswith('#'):
|
if attr.startswith('#'):
|
||||||
|
87
src/calibre/db/tests/filesystem.py
Normal file
87
src/calibre/db/tests/filesystem.py
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
|
||||||
|
from __future__ import (unicode_literals, division, absolute_import,
|
||||||
|
print_function)
|
||||||
|
|
||||||
|
__license__ = 'GPL v3'
|
||||||
|
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||||
|
__docformat__ = 'restructuredtext en'
|
||||||
|
|
||||||
|
import unittest, os
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
from calibre.constants import iswindows
|
||||||
|
from calibre.db.tests.base import BaseTest
|
||||||
|
|
||||||
|
class FilesystemTest(BaseTest):
|
||||||
|
|
||||||
|
def get_filesystem_data(self, cache, book_id):
|
||||||
|
fmts = cache.field_for('formats', book_id)
|
||||||
|
ans = {}
|
||||||
|
for fmt in fmts:
|
||||||
|
buf = BytesIO()
|
||||||
|
if cache.copy_format_to(book_id, fmt, buf):
|
||||||
|
ans[fmt] = buf.getvalue()
|
||||||
|
buf = BytesIO()
|
||||||
|
if cache.copy_cover_to(book_id, buf):
|
||||||
|
ans['cover'] = buf.getvalue()
|
||||||
|
return ans
|
||||||
|
|
||||||
|
def test_metadata_move(self):
|
||||||
|
'Test the moving of files when title/author change'
|
||||||
|
cl = self.cloned_library
|
||||||
|
cache = self.init_cache(cl)
|
||||||
|
ae, af, sf = self.assertEqual, self.assertFalse, cache.set_field
|
||||||
|
|
||||||
|
# Test that changing metadata on a book with no formats/cover works
|
||||||
|
ae(sf('title', {3:'moved1'}), set([3]))
|
||||||
|
ae(sf('authors', {3:'moved1'}), set([3]))
|
||||||
|
ae(sf('title', {3:'Moved1'}), set([3]))
|
||||||
|
ae(sf('authors', {3:'Moved1'}), set([3]))
|
||||||
|
ae(cache.field_for('title', 3), 'Moved1')
|
||||||
|
ae(cache.field_for('authors', 3), ('Moved1',))
|
||||||
|
|
||||||
|
# Now try with a book that has covers and formats
|
||||||
|
orig_data = self.get_filesystem_data(cache, 1)
|
||||||
|
orig_fpath = cache.format_abspath(1, 'FMT1')
|
||||||
|
ae(sf('title', {1:'moved'}), set([1]))
|
||||||
|
ae(sf('authors', {1:'moved'}), set([1]))
|
||||||
|
ae(sf('title', {1:'Moved'}), set([1]))
|
||||||
|
ae(sf('authors', {1:'Moved'}), set([1]))
|
||||||
|
ae(cache.field_for('title', 1), 'Moved')
|
||||||
|
ae(cache.field_for('authors', 1), ('Moved',))
|
||||||
|
cache2 = self.init_cache(cl)
|
||||||
|
for c in (cache, cache2):
|
||||||
|
data = self.get_filesystem_data(c, 1)
|
||||||
|
ae(set(orig_data.iterkeys()), set(data.iterkeys()))
|
||||||
|
ae(orig_data, data, 'Filesystem data does not match')
|
||||||
|
ae(c.field_for('path', 1), 'Moved/Moved (1)')
|
||||||
|
ae(c.field_for('path', 3), 'Moved1/Moved1 (3)')
|
||||||
|
fpath = c.format_abspath(1, 'FMT1').replace(os.sep, '/').split('/')
|
||||||
|
ae(fpath[-3:], ['Moved', 'Moved (1)', 'Moved - Moved.fmt1'])
|
||||||
|
af(os.path.exists(os.path.dirname(orig_fpath)), 'Original book folder still exists')
|
||||||
|
|
||||||
|
@unittest.skipUnless(iswindows, 'Windows only')
|
||||||
|
def test_windows_atomic_move(self):
|
||||||
|
'Test book file open in another process when changing metadata'
|
||||||
|
cl = self.cloned_library
|
||||||
|
cache = self.init_cache(cl)
|
||||||
|
fpath = cache.format_abspath(1, 'FMT1')
|
||||||
|
f = open(fpath, 'rb')
|
||||||
|
with self.assertRaises(IOError):
|
||||||
|
cache.set_field('title', {1:'Moved'})
|
||||||
|
f.close()
|
||||||
|
self.assertNotEqual(cache.field_for('title', 1), 'Moved', 'Title was changed despite file lock')
|
||||||
|
|
||||||
|
|
||||||
|
def tests():
|
||||||
|
tl = unittest.TestLoader()
|
||||||
|
return tl.loadTestsFromTestCase(FilesystemTest)
|
||||||
|
|
||||||
|
def run():
|
||||||
|
unittest.TextTestRunner(verbosity=2).run(tests())
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
run()
|
||||||
|
|
||||||
|
|
17
src/calibre/db/tests/main.py
Normal file
17
src/calibre/db/tests/main.py
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
|
||||||
|
from __future__ import (unicode_literals, division, absolute_import,
|
||||||
|
print_function)
|
||||||
|
|
||||||
|
__license__ = 'GPL v3'
|
||||||
|
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||||
|
__docformat__ = 'restructuredtext en'
|
||||||
|
|
||||||
|
import unittest, os
|
||||||
|
|
||||||
|
def find_tests():
|
||||||
|
return unittest.defaultTestLoader.discover(os.path.dirname(os.path.abspath(__file__)), pattern='*.py')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.TextTestRunner(verbosity=2).run(find_tests())
|
||||||
|
|
@ -115,6 +115,8 @@ class ReadingTest(BaseTest):
|
|||||||
for book_id, test in tests.iteritems():
|
for book_id, test in tests.iteritems():
|
||||||
for field, expected_val in test.iteritems():
|
for field, expected_val in test.iteritems():
|
||||||
val = cache.field_for(field, book_id)
|
val = cache.field_for(field, book_id)
|
||||||
|
if isinstance(val, tuple) and 'authors' not in field and 'languages' not in field:
|
||||||
|
val, expected_val = set(val), set(expected_val)
|
||||||
self.assertEqual(expected_val, val,
|
self.assertEqual(expected_val, val,
|
||||||
'Book id: %d Field: %s failed: %r != %r'%(
|
'Book id: %d Field: %s failed: %r != %r'%(
|
||||||
book_id, field, expected_val, val))
|
book_id, field, expected_val, val))
|
||||||
@ -173,6 +175,7 @@ class ReadingTest(BaseTest):
|
|||||||
mi.format_metadata = dict(mi.format_metadata)
|
mi.format_metadata = dict(mi.format_metadata)
|
||||||
if mi.formats:
|
if mi.formats:
|
||||||
mi.formats = tuple(mi.formats)
|
mi.formats = tuple(mi.formats)
|
||||||
|
old.conn.close()
|
||||||
old = None
|
old = None
|
||||||
|
|
||||||
cache = self.init_cache(self.library_path)
|
cache = self.init_cache(self.library_path)
|
||||||
@ -189,6 +192,7 @@ class ReadingTest(BaseTest):
|
|||||||
from calibre.library.database2 import LibraryDatabase2
|
from calibre.library.database2 import LibraryDatabase2
|
||||||
old = LibraryDatabase2(self.library_path)
|
old = LibraryDatabase2(self.library_path)
|
||||||
covers = {i: old.cover(i, index_is_id=True) for i in old.all_ids()}
|
covers = {i: old.cover(i, index_is_id=True) for i in old.all_ids()}
|
||||||
|
old.conn.close()
|
||||||
old = None
|
old = None
|
||||||
cache = self.init_cache(self.library_path)
|
cache = self.init_cache(self.library_path)
|
||||||
for book_id, cdata in covers.iteritems():
|
for book_id, cdata in covers.iteritems():
|
||||||
@ -247,6 +251,7 @@ class ReadingTest(BaseTest):
|
|||||||
'#formats:fmt1', '#formats:fmt2', '#formats:fmt1 and #formats:fmt2',
|
'#formats:fmt1', '#formats:fmt2', '#formats:fmt1 and #formats:fmt2',
|
||||||
|
|
||||||
)}
|
)}
|
||||||
|
old.conn.close()
|
||||||
old = None
|
old = None
|
||||||
|
|
||||||
cache = self.init_cache(self.library_path)
|
cache = self.init_cache(self.library_path)
|
||||||
@ -263,6 +268,7 @@ class ReadingTest(BaseTest):
|
|||||||
from calibre.library.database2 import LibraryDatabase2
|
from calibre.library.database2 import LibraryDatabase2
|
||||||
old = LibraryDatabase2(self.library_path)
|
old = LibraryDatabase2(self.library_path)
|
||||||
old_categories = old.get_categories()
|
old_categories = old.get_categories()
|
||||||
|
old.conn.close()
|
||||||
cache = self.init_cache(self.library_path)
|
cache = self.init_cache(self.library_path)
|
||||||
new_categories = cache.get_categories()
|
new_categories = cache.get_categories()
|
||||||
self.assertEqual(set(old_categories), set(new_categories),
|
self.assertEqual(set(old_categories), set(new_categories),
|
||||||
@ -305,6 +311,7 @@ class ReadingTest(BaseTest):
|
|||||||
i, index_is_id=True) else set() for i in ids}
|
i, index_is_id=True) else set() for i in ids}
|
||||||
formats = {i:{f:old.format(i, f, index_is_id=True) for f in fmts} for
|
formats = {i:{f:old.format(i, f, index_is_id=True) for f in fmts} for
|
||||||
i, fmts in lf.iteritems()}
|
i, fmts in lf.iteritems()}
|
||||||
|
old.conn.close()
|
||||||
old = None
|
old = None
|
||||||
cache = self.init_cache(self.library_path)
|
cache = self.init_cache(self.library_path)
|
||||||
for book_id, fmts in lf.iteritems():
|
for book_id, fmts in lf.iteritems():
|
||||||
|
@ -11,15 +11,12 @@ import unittest
|
|||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
|
from calibre.ebooks.metadata import author_to_author_sort
|
||||||
from calibre.utils.date import UNDEFINED_DATE
|
from calibre.utils.date import UNDEFINED_DATE
|
||||||
from calibre.db.tests.base import BaseTest
|
from calibre.db.tests.base import BaseTest
|
||||||
|
|
||||||
class WritingTest(BaseTest):
|
class WritingTest(BaseTest):
|
||||||
|
|
||||||
@property
|
|
||||||
def cloned_library(self):
|
|
||||||
return self.clone_library(self.library_path)
|
|
||||||
|
|
||||||
def create_getter(self, name, getter=None):
|
def create_getter(self, name, getter=None):
|
||||||
if getter is None:
|
if getter is None:
|
||||||
if name.endswith('_index'):
|
if name.endswith('_index'):
|
||||||
@ -214,7 +211,7 @@ class WritingTest(BaseTest):
|
|||||||
{1, 2})
|
{1, 2})
|
||||||
for name in ('tags', '#tags'):
|
for name in ('tags', '#tags'):
|
||||||
f = cache.fields[name]
|
f = cache.fields[name]
|
||||||
af(sf(name, {1:('tag one', 'News')}, allow_case_change=False))
|
af(sf(name, {1:('News', 'tag one')}, allow_case_change=False))
|
||||||
ae(sf(name, {1:'tag one, News'}), {1, 2})
|
ae(sf(name, {1:'tag one, News'}), {1, 2})
|
||||||
ae(sf(name, {3:('tag two', 'sep,sep2')}), {2, 3})
|
ae(sf(name, {3:('tag two', 'sep,sep2')}), {2, 3})
|
||||||
ae(len(f.table.id_map), 4)
|
ae(len(f.table.id_map), 4)
|
||||||
@ -225,7 +222,7 @@ class WritingTest(BaseTest):
|
|||||||
ae(len(c.fields[name].table.id_map), 3)
|
ae(len(c.fields[name].table.id_map), 3)
|
||||||
ae(len(c.fields[name].table.id_map), 3)
|
ae(len(c.fields[name].table.id_map), 3)
|
||||||
ae(c.field_for(name, 1), ())
|
ae(c.field_for(name, 1), ())
|
||||||
ae(c.field_for(name, 2), ('tag one', 'tag two'))
|
ae(c.field_for(name, 2), ('tag two', 'tag one'))
|
||||||
del cache2
|
del cache2
|
||||||
|
|
||||||
# Authors
|
# Authors
|
||||||
@ -244,13 +241,10 @@ class WritingTest(BaseTest):
|
|||||||
ae(c.field_for(name, 3), ('Kovid Goyal', 'Divok Layog'))
|
ae(c.field_for(name, 3), ('Kovid Goyal', 'Divok Layog'))
|
||||||
ae(c.field_for(name, 2), ('An, Author',))
|
ae(c.field_for(name, 2), ('An, Author',))
|
||||||
ae(c.field_for(name, 1), ('Unknown',) if name=='authors' else ())
|
ae(c.field_for(name, 1), ('Unknown',) if name=='authors' else ())
|
||||||
ae(c.field_for('author_sort', 1), 'Unknown')
|
|
||||||
ae(c.field_for('author_sort', 2), 'An, Author')
|
|
||||||
ae(c.field_for('author_sort', 3), 'Goyal, Kovid & Layog, Divok')
|
|
||||||
if name == 'authors':
|
if name == 'authors':
|
||||||
ae(c.field_for('author_sort', 3), 'Goyal, Kovid & Layog, Divok')
|
ae(c.field_for('author_sort', 1), author_to_author_sort('Unknown'))
|
||||||
ae(c.field_for('author_sort', 2), 'An, Author')
|
ae(c.field_for('author_sort', 2), author_to_author_sort('An, Author'))
|
||||||
ae(c.field_for('author_sort', 1), 'Unknown')
|
ae(c.field_for('author_sort', 3), author_to_author_sort('Kovid Goyal') + ' & ' + author_to_author_sort('Divok Layog'))
|
||||||
del cache2
|
del cache2
|
||||||
ae(cache.set_field('authors', {1:'KoviD GoyaL'}), {1, 3})
|
ae(cache.set_field('authors', {1:'KoviD GoyaL'}), {1, 3})
|
||||||
ae(cache.field_for('author_sort', 1), 'GoyaL, KoviD')
|
ae(cache.field_for('author_sort', 1), 'GoyaL, KoviD')
|
||||||
@ -269,6 +263,13 @@ class WritingTest(BaseTest):
|
|||||||
ae(cache.field_for('languages', 3), ('eng',))
|
ae(cache.field_for('languages', 3), ('eng',))
|
||||||
ae(sf('languages', {3:None}), set([3]))
|
ae(sf('languages', {3:None}), set([3]))
|
||||||
ae(cache.field_for('languages', 3), ())
|
ae(cache.field_for('languages', 3), ())
|
||||||
|
ae(sf('languages', {1:'deu,fra,eng'}), set([1]), 'Changing order failed')
|
||||||
|
ae(sf('languages', {2:'deu,eng,eng'}), set([2]))
|
||||||
|
cache2 = self.init_cache(cl)
|
||||||
|
for c in (cache, cache2):
|
||||||
|
ae(cache.field_for('languages', 1), ('deu', 'fra', 'eng'))
|
||||||
|
ae(cache.field_for('languages', 2), ('deu', 'eng'))
|
||||||
|
del cache2
|
||||||
|
|
||||||
# Identifiers
|
# Identifiers
|
||||||
f = cache.fields['identifiers']
|
f = cache.fields['identifiers']
|
||||||
|
Loading…
x
Reference in New Issue
Block a user