From 57b3531051d291e179861ff54e089559230246d6 Mon Sep 17 00:00:00 2001 From: John Schember Date: Fri, 22 Apr 2011 20:04:50 -0400 Subject: [PATCH] Store: Simplify threads. Tie all threads to progress indicator. --- .../gui2/store/search/download_thread.py | 90 ++++++++++--------- src/calibre/gui2/store/search/models.py | 10 +-- src/calibre/gui2/store/search/search.py | 21 +++-- 3 files changed, 65 insertions(+), 56 deletions(-) diff --git a/src/calibre/gui2/store/search/download_thread.py b/src/calibre/gui2/store/search/download_thread.py index a6f92011f6..4dd3c4a59b 100644 --- a/src/calibre/gui2/store/search/download_thread.py +++ b/src/calibre/gui2/store/search/download_thread.py @@ -6,7 +6,6 @@ __license__ = 'GPL 3' __copyright__ = '2011, John Schember ' __docformat__ = 'restructuredtext en' -import time import traceback from contextlib import closing from threading import Thread @@ -17,7 +16,9 @@ from calibre.utils.magick.draw import thumbnail class GenericDownloadThreadPool(object): ''' - add_task must be implemented in a subclass. + add_task must be implemented in a subclass and must + GenericDownloadThreadPool.add_task must be called + at the end of the function. ''' def __init__(self, thread_type, thread_count): @@ -29,10 +30,16 @@ class GenericDownloadThreadPool(object): self.threads = [] def add_task(self): - raise NotImplementedError() - - def start_threads(self): - for i in range(self.thread_count): + ''' + This must be implemented in a sub class and this function + must be called at the end of the add_task function in + the sub class. + + The implementation of this function (in this base class) + starts any threads necessary to fill the pool if it is + not already full. + ''' + for i in xrange(self.thread_count - self.running_threads_count()): t = self.thread_type(self.tasks, self.results) self.threads.append(t) t.start() @@ -60,10 +67,14 @@ class GenericDownloadThreadPool(object): return not self.results.empty() def threads_running(self): + return self.running_threads_count() > 0 + + def running_threads_count(self): + count = 0 for t in self.threads: if t.is_alive(): - return True - return False + count += 1 + return count class SearchThreadPool(GenericDownloadThreadPool): @@ -73,17 +84,16 @@ class SearchThreadPool(GenericDownloadThreadPool): using start_threads(). Reset by calling abort(). Example: - sp = SearchThreadPool(SearchThread, 3) - add tasks using add_task(...) - sp.start_threads() - all threads have finished. - sp.abort() - add tasks using add_task(...) - sp.start_threads() + sp = SearchThreadPool(3) + sp.add_task(...) ''' + + def __init__(self, thread_count): + GenericDownloadThreadPool.__init__(self, SearchThread, thread_count) def add_task(self, query, store_name, store_plugin, timeout): self.tasks.put((query, store_name, store_plugin, timeout)) + GenericDownloadThreadPool.add_task(self) class SearchThread(Thread): @@ -113,12 +123,13 @@ class SearchThread(Thread): class CoverThreadPool(GenericDownloadThreadPool): - ''' - Once started all threads run until abort is called. - ''' + + def __init__(self, thread_count): + GenericDownloadThreadPool.__init__(self, CoverThread, thread_count) def add_task(self, search_result, update_callback, timeout=5): self.tasks.put((search_result, update_callback, timeout)) + GenericDownloadThreadPool.add_task(self) class CoverThread(Thread): @@ -136,30 +147,27 @@ class CoverThread(Thread): self._run = False def run(self): - while self._run: + while self._run and not self.tasks.empty(): try: - time.sleep(.1) - while not self.tasks.empty(): - if not self._run: - break - result, callback, timeout = self.tasks.get() - if result and result.cover_url: - with closing(self.br.open(result.cover_url, timeout=timeout)) as f: - result.cover_data = f.read() - result.cover_data = thumbnail(result.cover_data, 64, 64)[2] - callback() - self.tasks.task_done() + result, callback, timeout = self.tasks.get() + if result and result.cover_url: + with closing(self.br.open(result.cover_url, timeout=timeout)) as f: + result.cover_data = f.read() + result.cover_data = thumbnail(result.cover_data, 64, 64)[2] + callback() + self.tasks.task_done() except: continue class DetailsThreadPool(GenericDownloadThreadPool): - ''' - Once started all threads run until abort is called. - ''' + + def __init__(self, thread_count): + GenericDownloadThreadPool.__init__(self, DetailsThread, thread_count) def add_task(self, search_result, store_plugin, update_callback, timeout=10): self.tasks.put((search_result, store_plugin, update_callback, timeout)) + GenericDownloadThreadPool.add_task(self) class DetailsThread(Thread): @@ -175,16 +183,12 @@ class DetailsThread(Thread): self._run = False def run(self): - while self._run: + while self._run and not self.tasks.empty(): try: - time.sleep(.1) - while not self.tasks.empty(): - if not self._run: - break - result, store_plugin, callback, timeout = self.tasks.get() - if result: - store_plugin.get_details(result, timeout) - callback(result) - self.tasks.task_done() + result, store_plugin, callback, timeout = self.tasks.get() + if result: + store_plugin.get_details(result, timeout) + callback(result) + self.tasks.task_done() except: continue diff --git a/src/calibre/gui2/store/search/models.py b/src/calibre/gui2/store/search/models.py index 73b7bcc90a..adc90e3b14 100644 --- a/src/calibre/gui2/store/search/models.py +++ b/src/calibre/gui2/store/search/models.py @@ -14,7 +14,7 @@ from PyQt4.Qt import (Qt, QAbstractItemModel, QVariant, QPixmap, QModelIndex, QS from calibre.gui2 import NONE from calibre.gui2.store.search_result import SearchResult from calibre.gui2.store.search.download_thread import DetailsThreadPool, \ - DetailsThread, CoverThreadPool, CoverThread + CoverThreadPool from calibre.library.caches import _match, CONTAINS_MATCH, EQUALS_MATCH, \ REGEXP_MATCH from calibre.utils.icu import sort_key @@ -51,10 +51,8 @@ class Matches(QAbstractItemModel): self.matches = [] self.query = '' self.search_filter = SearchFilter() - self.cover_pool = CoverThreadPool(CoverThread, 2) - self.cover_pool.start_threads() - self.details_pool = DetailsThreadPool(DetailsThread, 4) - self.details_pool.start_threads() + self.cover_pool = CoverThreadPool(2) + self.details_pool = DetailsThreadPool(4) self.sort_col = 2 self.sort_order = Qt.AscendingOrder @@ -70,9 +68,7 @@ class Matches(QAbstractItemModel): self.search_filter.clear_search_results() self.query = '' self.cover_pool.abort() - self.cover_pool.start_threads() self.details_pool.abort() - self.details_pool.start_threads() self.reset() def add_result(self, result, store_plugin): diff --git a/src/calibre/gui2/store/search/search.py b/src/calibre/gui2/store/search/search.py index 5c4b1cee00..8c3cf8452e 100644 --- a/src/calibre/gui2/store/search/search.py +++ b/src/calibre/gui2/store/search/search.py @@ -13,7 +13,7 @@ from PyQt4.Qt import (Qt, QDialog, QTimer, QCheckBox, QVBoxLayout) from calibre.gui2 import JSONConfig, info_dialog from calibre.gui2.progress_indicator import ProgressIndicator -from calibre.gui2.store.search.download_thread import SearchThreadPool, SearchThread +from calibre.gui2.store.search.download_thread import SearchThreadPool from calibre.gui2.store.search.search_ui import Ui_Dialog HANG_TIME = 75000 # milliseconds seconds @@ -31,9 +31,10 @@ class SearchDialog(QDialog, Ui_Dialog): # We keep a cache of store plugins and reference them by name. self.store_plugins = istores - self.search_pool = SearchThreadPool(SearchThread, SEARCH_THREAD_TOTAL) + self.search_pool = SearchThreadPool(SEARCH_THREAD_TOTAL) # Check for results and hung threads. self.checker = QTimer() + self.progress_checker = QTimer() self.hang_check = 0 # Add check boxes for each store so the user @@ -54,12 +55,15 @@ class SearchDialog(QDialog, Ui_Dialog): self.search.clicked.connect(self.do_search) self.checker.timeout.connect(self.get_results) + self.progress_checker.timeout.connect(self.check_progress) self.results_view.activated.connect(self.open_store) self.select_all_stores.clicked.connect(self.stores_select_all) self.select_invert_stores.clicked.connect(self.stores_select_invert) self.select_none_stores.clicked.connect(self.stores_select_none) self.finished.connect(self.dialog_closed) + self.progress_checker.start(100) + self.restore_state() def resize_columns(self): @@ -105,10 +109,9 @@ class SearchDialog(QDialog, Ui_Dialog): for n in store_names: if getattr(self, 'store_check_' + n).isChecked(): self.search_pool.add_task(query, n, self.store_plugins[n], TIMEOUT) - if self.search_pool.has_tasks(): + if self.search_pool.has_tasks() or self.search_pool.threads_running(): self.hang_check = 0 self.checker.start(100) - self.search_pool.start_threads() self.pi.startAnimation() def clean_query(self, query): @@ -181,12 +184,12 @@ class SearchDialog(QDialog, Ui_Dialog): if self.hang_check >= HANG_TIME: self.search_pool.abort() self.checker.stop() - self.pi.stopAnimation() + #self.check_progress() else: # Stop the checker if not threads are running. if not self.search_pool.threads_running() and not self.search_pool.has_tasks(): self.checker.stop() - self.pi.stopAnimation() + #self.check_progress() while self.search_pool.has_results(): res, store_plugin = self.search_pool.get_result() @@ -202,6 +205,12 @@ class SearchDialog(QDialog, Ui_Dialog): result = self.results_view.model().get_result(index) self.store_plugins[result.store_name].open(self, result.detail_item) + def check_progress(self): + if not self.search_pool.threads_running() and not self.results_view.model().cover_pool.threads_running() and not self.results_view.model().details_pool.threads_running(): + self.pi.stopAnimation() + else: + self.pi.startAnimation() + def get_store_checks(self): ''' Returns a list of QCheckBox's for each store.