Make testing for calibre:// matching current db more robust

This commit is contained in:
Kovid Goyal 2020-12-09 10:02:19 +05:30
parent b3cd2a15a0
commit ca99d9bd1e
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 37 additions and 5 deletions

View File

@ -170,6 +170,10 @@ class Cache(object):
def library_id(self): def library_id(self):
return self.backend.library_id return self.backend.library_id
@property
def dbpath(self):
return self.backend.dbpath
@property @property
def safe_read_lock(self): def safe_read_lock(self):
''' A safe read lock is a lock that does nothing if the thread already ''' A safe read lock is a lock that does nothing if the thread already

View File

@ -58,7 +58,7 @@ from calibre.gui2.tag_browser.ui import TagBrowserMixin
from calibre.gui2.update import UpdateMixin from calibre.gui2.update import UpdateMixin
from calibre.gui2.widgets import ProgressIndicator from calibre.gui2.widgets import ProgressIndicator
from calibre.library import current_library_name from calibre.library import current_library_name
from calibre.srv.library_broker import GuiLibraryBroker from calibre.srv.library_broker import GuiLibraryBroker, db_matches
from calibre.utils.config import dynamic, prefs from calibre.utils.config import dynamic, prefs
from calibre.utils.ipc.pool import Pool from calibre.utils.ipc.pool import Pool
from polyglot.builtins import string_or_bytes, unicode_type from polyglot.builtins import string_or_bytes, unicode_type
@ -661,7 +661,7 @@ class Main(MainWindow, MainWindowMixin, DeviceMixin, EmailMixin, # {{{
if action == 'switch-library': if action == 'switch-library':
library_id = decode_library_id(posixpath.basename(path)) library_id = decode_library_id(posixpath.basename(path))
library_path = self.library_broker.path_for_library_id(library_id) library_path = self.library_broker.path_for_library_id(library_id)
if library_path is not None and library_id != getattr(self.current_db.new_api, 'server_library_id', None): if not db_matches(self.current_db, library_id, library_path):
self.library_moved(library_path) self.library_moved(library_path)
elif action == 'show-book': elif action == 'show-book':
parts = tuple(filter(None, path.split('/'))) parts = tuple(filter(None, path.split('/')))
@ -741,7 +741,7 @@ class Main(MainWindow, MainWindowMixin, DeviceMixin, EmailMixin, # {{{
self.perform_url_action(library_id, library_path, doit) self.perform_url_action(library_id, library_path, doit)
def perform_url_action(self, library_id, library_path, func): def perform_url_action(self, library_id, library_path, func):
if library_id != getattr(self.current_db.new_api, 'server_library_id', None): if not db_matches(self.current_db, library_id, library_path):
self.library_moved(library_path) self.library_moved(library_path)
QTimer.singleShot(0, func) QTimer.singleShot(0, func)
else: else:

View File

@ -74,6 +74,14 @@ def correct_case_of_last_path_component(original_path):
return os.path.join(prefix, basename) return os.path.join(prefix, basename)
def db_matches(db, library_id, library_path):
db = db.new_api
if getattr(db, 'server_library_id', object()) == library_id:
return True
dbpath = db.dbpath
return samefile(dbpath, os.path.join(library_path, os.path.basename(dbpath)))
class LibraryBroker(object): class LibraryBroker(object):
def __init__(self, libraries): def __init__(self, libraries):
@ -150,7 +158,16 @@ class LibraryBroker(object):
def path_for_library_id(self, library_id): def path_for_library_id(self, library_id):
with self: with self:
return self.original_path_map.get(self.lmap.get(library_id)) lpath = self.lmap.get(library_id)
if lpath is None:
q = library_id.lower()
for k, v in self.lmap.items():
if k.lower() == q:
lpath = v
break
else:
return
return self.original_path_map.get(lpath)
def __enter__(self): def __enter__(self):
self.lock.acquire() self.lock.acquire()

View File

@ -14,7 +14,7 @@ from tempfile import TemporaryDirectory
class TestRouter(BaseTest): class TestRouter(BaseTest):
def test_library_id_construction(self): def test_library_id_construction(self):
from calibre.srv.library_broker import library_id_from_path, correct_case_of_last_path_component from calibre.srv.library_broker import library_id_from_path, correct_case_of_last_path_component, db_matches
self.ae(library_id_from_path('as'), 'as') self.ae(library_id_from_path('as'), 'as')
self.ae(library_id_from_path('as/'), 'as') self.ae(library_id_from_path('as/'), 'as')
self.ae(library_id_from_path('as////'), 'as') self.ae(library_id_from_path('as////'), 'as')
@ -22,11 +22,22 @@ class TestRouter(BaseTest):
if os.sep == '\\': if os.sep == '\\':
self.ae(library_id_from_path('as/' + os.sep), 'as') self.ae(library_id_from_path('as/' + os.sep), 'as')
self.ae(library_id_from_path('X:' + os.sep), 'X') self.ae(library_id_from_path('X:' + os.sep), 'X')
class MockDB:
def __init__(self, base):
self.new_api = self
self.server_library_id = 'lid'
self.dbpath = os.path.join(base, 'metadata.db')
with TemporaryDirectory() as tdir: with TemporaryDirectory() as tdir:
path = os.path.join(tdir, 'Test') path = os.path.join(tdir, 'Test')
os.mkdir(path) os.mkdir(path)
self.ae(correct_case_of_last_path_component(os.path.join(tdir, 'test')), path) self.ae(correct_case_of_last_path_component(os.path.join(tdir, 'test')), path)
self.ae(correct_case_of_last_path_component(os.path.join(tdir, 'Test')), path) self.ae(correct_case_of_last_path_component(os.path.join(tdir, 'Test')), path)
db = MockDB(tdir)
self.assertTrue(db_matches(db, db.server_library_id, None))
self.assertTrue(db_matches(db, db.server_library_id.upper(), tdir))
def test_route_construction(self): def test_route_construction(self):
' Test route construction ' ' Test route construction '