Store: Simplify threads. Tie all threads to progress indicator.

This commit is contained in:
John Schember 2011-04-22 20:04:50 -04:00
parent 8c51a78c7f
commit 57b3531051
3 changed files with 65 additions and 56 deletions

View File

@ -6,7 +6,6 @@ __license__ = 'GPL 3'
__copyright__ = '2011, John Schember <john@nachtimwald.com>'
__docformat__ = 'restructuredtext en'
import time
import traceback
from contextlib import closing
from threading import Thread
@ -17,7 +16,9 @@ from calibre.utils.magick.draw import thumbnail
class GenericDownloadThreadPool(object):
'''
add_task must be implemented in a subclass.
add_task must be implemented in a subclass and must
GenericDownloadThreadPool.add_task must be called
at the end of the function.
'''
def __init__(self, thread_type, thread_count):
@ -29,10 +30,16 @@ class GenericDownloadThreadPool(object):
self.threads = []
def add_task(self):
raise NotImplementedError()
def start_threads(self):
for i in range(self.thread_count):
'''
This must be implemented in a sub class and this function
must be called at the end of the add_task function in
the sub class.
The implementation of this function (in this base class)
starts any threads necessary to fill the pool if it is
not already full.
'''
for i in xrange(self.thread_count - self.running_threads_count()):
t = self.thread_type(self.tasks, self.results)
self.threads.append(t)
t.start()
@ -60,10 +67,14 @@ class GenericDownloadThreadPool(object):
return not self.results.empty()
def threads_running(self):
return self.running_threads_count() > 0
def running_threads_count(self):
count = 0
for t in self.threads:
if t.is_alive():
return True
return False
count += 1
return count
class SearchThreadPool(GenericDownloadThreadPool):
@ -73,17 +84,16 @@ class SearchThreadPool(GenericDownloadThreadPool):
using start_threads(). Reset by calling abort().
Example:
sp = SearchThreadPool(SearchThread, 3)
add tasks using add_task(...)
sp.start_threads()
all threads have finished.
sp.abort()
add tasks using add_task(...)
sp.start_threads()
sp = SearchThreadPool(3)
sp.add_task(...)
'''
def __init__(self, thread_count):
GenericDownloadThreadPool.__init__(self, SearchThread, thread_count)
def add_task(self, query, store_name, store_plugin, timeout):
self.tasks.put((query, store_name, store_plugin, timeout))
GenericDownloadThreadPool.add_task(self)
class SearchThread(Thread):
@ -113,12 +123,13 @@ class SearchThread(Thread):
class CoverThreadPool(GenericDownloadThreadPool):
'''
Once started all threads run until abort is called.
'''
def __init__(self, thread_count):
GenericDownloadThreadPool.__init__(self, CoverThread, thread_count)
def add_task(self, search_result, update_callback, timeout=5):
self.tasks.put((search_result, update_callback, timeout))
GenericDownloadThreadPool.add_task(self)
class CoverThread(Thread):
@ -136,30 +147,27 @@ class CoverThread(Thread):
self._run = False
def run(self):
while self._run:
while self._run and not self.tasks.empty():
try:
time.sleep(.1)
while not self.tasks.empty():
if not self._run:
break
result, callback, timeout = self.tasks.get()
if result and result.cover_url:
with closing(self.br.open(result.cover_url, timeout=timeout)) as f:
result.cover_data = f.read()
result.cover_data = thumbnail(result.cover_data, 64, 64)[2]
callback()
self.tasks.task_done()
result, callback, timeout = self.tasks.get()
if result and result.cover_url:
with closing(self.br.open(result.cover_url, timeout=timeout)) as f:
result.cover_data = f.read()
result.cover_data = thumbnail(result.cover_data, 64, 64)[2]
callback()
self.tasks.task_done()
except:
continue
class DetailsThreadPool(GenericDownloadThreadPool):
'''
Once started all threads run until abort is called.
'''
def __init__(self, thread_count):
GenericDownloadThreadPool.__init__(self, DetailsThread, thread_count)
def add_task(self, search_result, store_plugin, update_callback, timeout=10):
self.tasks.put((search_result, store_plugin, update_callback, timeout))
GenericDownloadThreadPool.add_task(self)
class DetailsThread(Thread):
@ -175,16 +183,12 @@ class DetailsThread(Thread):
self._run = False
def run(self):
while self._run:
while self._run and not self.tasks.empty():
try:
time.sleep(.1)
while not self.tasks.empty():
if not self._run:
break
result, store_plugin, callback, timeout = self.tasks.get()
if result:
store_plugin.get_details(result, timeout)
callback(result)
self.tasks.task_done()
result, store_plugin, callback, timeout = self.tasks.get()
if result:
store_plugin.get_details(result, timeout)
callback(result)
self.tasks.task_done()
except:
continue

View File

@ -14,7 +14,7 @@ from PyQt4.Qt import (Qt, QAbstractItemModel, QVariant, QPixmap, QModelIndex, QS
from calibre.gui2 import NONE
from calibre.gui2.store.search_result import SearchResult
from calibre.gui2.store.search.download_thread import DetailsThreadPool, \
DetailsThread, CoverThreadPool, CoverThread
CoverThreadPool
from calibre.library.caches import _match, CONTAINS_MATCH, EQUALS_MATCH, \
REGEXP_MATCH
from calibre.utils.icu import sort_key
@ -51,10 +51,8 @@ class Matches(QAbstractItemModel):
self.matches = []
self.query = ''
self.search_filter = SearchFilter()
self.cover_pool = CoverThreadPool(CoverThread, 2)
self.cover_pool.start_threads()
self.details_pool = DetailsThreadPool(DetailsThread, 4)
self.details_pool.start_threads()
self.cover_pool = CoverThreadPool(2)
self.details_pool = DetailsThreadPool(4)
self.sort_col = 2
self.sort_order = Qt.AscendingOrder
@ -70,9 +68,7 @@ class Matches(QAbstractItemModel):
self.search_filter.clear_search_results()
self.query = ''
self.cover_pool.abort()
self.cover_pool.start_threads()
self.details_pool.abort()
self.details_pool.start_threads()
self.reset()
def add_result(self, result, store_plugin):

View File

@ -13,7 +13,7 @@ from PyQt4.Qt import (Qt, QDialog, QTimer, QCheckBox, QVBoxLayout)
from calibre.gui2 import JSONConfig, info_dialog
from calibre.gui2.progress_indicator import ProgressIndicator
from calibre.gui2.store.search.download_thread import SearchThreadPool, SearchThread
from calibre.gui2.store.search.download_thread import SearchThreadPool
from calibre.gui2.store.search.search_ui import Ui_Dialog
HANG_TIME = 75000 # milliseconds seconds
@ -31,9 +31,10 @@ class SearchDialog(QDialog, Ui_Dialog):
# We keep a cache of store plugins and reference them by name.
self.store_plugins = istores
self.search_pool = SearchThreadPool(SearchThread, SEARCH_THREAD_TOTAL)
self.search_pool = SearchThreadPool(SEARCH_THREAD_TOTAL)
# Check for results and hung threads.
self.checker = QTimer()
self.progress_checker = QTimer()
self.hang_check = 0
# Add check boxes for each store so the user
@ -54,12 +55,15 @@ class SearchDialog(QDialog, Ui_Dialog):
self.search.clicked.connect(self.do_search)
self.checker.timeout.connect(self.get_results)
self.progress_checker.timeout.connect(self.check_progress)
self.results_view.activated.connect(self.open_store)
self.select_all_stores.clicked.connect(self.stores_select_all)
self.select_invert_stores.clicked.connect(self.stores_select_invert)
self.select_none_stores.clicked.connect(self.stores_select_none)
self.finished.connect(self.dialog_closed)
self.progress_checker.start(100)
self.restore_state()
def resize_columns(self):
@ -105,10 +109,9 @@ class SearchDialog(QDialog, Ui_Dialog):
for n in store_names:
if getattr(self, 'store_check_' + n).isChecked():
self.search_pool.add_task(query, n, self.store_plugins[n], TIMEOUT)
if self.search_pool.has_tasks():
if self.search_pool.has_tasks() or self.search_pool.threads_running():
self.hang_check = 0
self.checker.start(100)
self.search_pool.start_threads()
self.pi.startAnimation()
def clean_query(self, query):
@ -181,12 +184,12 @@ class SearchDialog(QDialog, Ui_Dialog):
if self.hang_check >= HANG_TIME:
self.search_pool.abort()
self.checker.stop()
self.pi.stopAnimation()
#self.check_progress()
else:
# Stop the checker if not threads are running.
if not self.search_pool.threads_running() and not self.search_pool.has_tasks():
self.checker.stop()
self.pi.stopAnimation()
#self.check_progress()
while self.search_pool.has_results():
res, store_plugin = self.search_pool.get_result()
@ -202,6 +205,12 @@ class SearchDialog(QDialog, Ui_Dialog):
result = self.results_view.model().get_result(index)
self.store_plugins[result.store_name].open(self, result.detail_item)
def check_progress(self):
if not self.search_pool.threads_running() and not self.results_view.model().cover_pool.threads_running() and not self.results_view.model().details_pool.threads_running():
self.pi.stopAnimation()
else:
self.pi.startAnimation()
def get_store_checks(self):
'''
Returns a list of QCheckBox's for each store.