From 93f855abe989d8f0bd053ad0ef70336928cfaf47 Mon Sep 17 00:00:00 2001 From: John Schember Date: Mon, 28 Feb 2011 19:45:05 -0500 Subject: [PATCH] User generic thread pool base class. Use 2 thread thread pool for downloading covers. --- src/calibre/gui2/store/search.py | 121 +++++++++++++++++++------------ 1 file changed, 73 insertions(+), 48 deletions(-) diff --git a/src/calibre/gui2/store/search.py b/src/calibre/gui2/store/search.py index 0d4bcb8252..272dd6d5f1 100644 --- a/src/calibre/gui2/store/search.py +++ b/src/calibre/gui2/store/search.py @@ -21,12 +21,12 @@ from calibre.gui2.store.search_ui import Ui_Dialog from calibre.utils.icu import sort_key from calibre.utils.magick.draw import thumbnail -class SearchDialog(QDialog, Ui_Dialog): +HANG_TIME = 75000 # milliseconds seconds +TIMEOUT = 75 # seconds +SEARCH_THREAD_TOTAL = 4 +COVER_DOWNLOAD_THREAD_TOTAL = 2 - HANG_TIME = 75000 # milliseconds seconds - TIMEOUT = 75 # seconds - SEARCH_THREAD_TOTAL = 4 - COVER_DOWNLOAD_THREAD_TOTAL = 2 +class SearchDialog(QDialog, Ui_Dialog): def __init__(self, gui, *args): QDialog.__init__(self, *args) @@ -35,7 +35,7 @@ class SearchDialog(QDialog, Ui_Dialog): self.gui = gui self.store_plugins = {} - self.search_thread_pool = SearchThreadPool(self.SEARCH_THREAD_TOTAL) + self.search_pool = SearchThreadPool(SearchThread, SEARCH_THREAD_TOTAL) self.checker = QTimer() self.hang_check = 0 @@ -78,7 +78,7 @@ class SearchDialog(QDialog, Ui_Dialog): def do_search(self, checked=False): # Stop all running threads. self.checker.stop() - self.search_thread_pool.abort() + self.search_pool.abort() # Clear the visible results. self.results_view.model().clear_results() @@ -89,26 +89,26 @@ class SearchDialog(QDialog, Ui_Dialog): for n in self.store_plugins: if getattr(self, 'store_check_' + n).isChecked(): - self.search_thread_pool.add_task(query, n, self.store_plugins[n], self.TIMEOUT) - if self.search_thread_pool.has_tasks(): + self.search_pool.add_task(query, n, self.store_plugins[n], TIMEOUT) + if self.search_pool.has_tasks(): self.hang_check = 0 self.checker.start(100) - self.search_thread_pool.start_threads() + self.search_pool.start_threads() def get_results(self): # We only want the search plugins to run # a maximum set amount of time before giving up. self.hang_check += 1 - if self.hang_check >= self.HANG_TIME: - self.search_thread_pool.abort() + if self.hang_check >= HANG_TIME: + self.search_pool.abort() self.checker.stop() else: # Stop the checker if not threads are running. - if not self.search_thread_pool.threads_running(): + if not self.search_pool.threads_running(): self.checker.stop() - while self.search_thread_pool.has_results(): - res = self.search_thread_pool.get_result_no_wait() + while self.search_pool.has_results(): + res = self.search_pool.get_result_no_wait() if res: self.results_view.model().add_result(res) @@ -137,17 +137,22 @@ class SearchDialog(QDialog, Ui_Dialog): check.setChecked(False) -class SearchThreadPool(object): +class GenericDownloadThreadPool(object): + ''' + add_task must be implemented in a subclass. + ''' - def __init__(self, thread_count): + def __init__(self, thread_type, thread_count): + self.thread_type = thread_type + self.thread_count = thread_count + self.tasks = Queue() self.results = Queue() self.threads = [] - self.thread_count = thread_count def start_threads(self): for i in range(self.thread_count): - t = SearchThread(self.tasks, self.results) + t = self.thread_type(self.tasks, self.results) self.threads.append(t) t.start() @@ -157,9 +162,6 @@ class SearchThreadPool(object): self.threads = [] for t in self.threads: t.abort() - - def add_task(self, query, store_name, store_plugin, timeout): - self.tasks.put((query, store_name, store_plugin, timeout)) def has_tasks(self): return not self.tasks.empty() @@ -181,7 +183,27 @@ class SearchThreadPool(object): if t.is_alive(): return True return False + + +class SearchThreadPool(GenericDownloadThreadPool): + ''' + Threads will run until there is no work or + abort is called. Create and start new threads + using start_threads(). Reset by calling abort(). + Example: + sp = SearchThreadPool(SearchThread, 3) + add tasks using add_task(...) + sp.start_threads() + all threads have finished. + sp.abort() + add tasks using add_task(...) + sp.start_threads() + ''' + + def add_task(self, query, store_name, store_plugin, timeout): + self.tasks.put((query, store_name, store_plugin, timeout)) + class SearchThread(Thread): @@ -209,38 +231,43 @@ class SearchThread(Thread): pass -class CoverDownloadThread(Thread): +class CoverThreadPool(GenericDownloadThreadPool): + ''' + Once started all threads run until abort is called. + ''' - def __init__(self, items, update_callback, timeout=5): + def add_task(self, search_result, update_callback, timeout=5): + self.tasks.put((search_result, update_callback, timeout)) + + +class CoverThread(Thread): + + def __init__(self, tasks, results): Thread.__init__(self) self.daemon = True - self.items = items - self.update_callback = update_callback - self.timeout = timeout - self.br = browser() - + self.tasks = tasks + self.results = results self._run = True + self.br = browser() + def abort(self): self._run = False - - def is_running(self): - return self._run def run(self): while self._run: try: time.sleep(.1) - while not self.items.empty(): + while not self.tasks.empty(): if not self._run: break - item = self.items.get_nowait() - if item and item.cover_url: - with closing(self.br.open(item.cover_url, timeout=self.timeout)) as f: - item.cover_data = f.read() - item.cover_data = thumbnail(item.cover_data, 64, 64)[2] - self.items.task_done() - self.update_callback(item) + result, callback, timeout = self.tasks.get() + if result and result.cover_url: + with closing(self.br.open(result.cover_url, timeout=timeout)) as f: + result.cover_data = f.read() + result.cover_data = thumbnail(result.cover_data, 64, 64)[2] + callback() + self.tasks.task_done() except: continue @@ -252,20 +279,20 @@ class Matches(QAbstractItemModel): def __init__(self): QAbstractItemModel.__init__(self) self.matches = [] - self.cover_download_queue = Queue() - self.cover_download_thread = CoverDownloadThread(self.cover_download_queue, self.update_result) - self.cover_download_thread.start() + self.cover_pool = CoverThreadPool(CoverThread, 2) + self.cover_pool.start_threads() def clear_results(self): self.matches = [] - self.cover_download_queue.queue.clear() + self.cover_pool.abort() + self.cover_pool.start_threads() self.reset() def add_result(self, result): self.layoutAboutToBeChanged.emit() self.matches.append(result) - self.cover_download_queue.put(result) + self.cover_pool.add_task(result, self.update_result) self.layoutChanged.emit() def get_result(self, index): @@ -275,9 +302,7 @@ class Matches(QAbstractItemModel): else: return None - def update_result(self, result): - if not result in self.matches: - return + def update_result(self): self.layoutAboutToBeChanged.emit() self.layoutChanged.emit()