User generic thread pool base class. Use 2 thread thread pool for downloading covers.

This commit is contained in:
John Schember 2011-02-28 19:45:05 -05:00
parent 98a0211463
commit 93f855abe9

View File

@ -21,12 +21,12 @@ from calibre.gui2.store.search_ui import Ui_Dialog
from calibre.utils.icu import sort_key from calibre.utils.icu import sort_key
from calibre.utils.magick.draw import thumbnail from calibre.utils.magick.draw import thumbnail
class SearchDialog(QDialog, Ui_Dialog): HANG_TIME = 75000 # milliseconds seconds
TIMEOUT = 75 # seconds
SEARCH_THREAD_TOTAL = 4
COVER_DOWNLOAD_THREAD_TOTAL = 2
HANG_TIME = 75000 # milliseconds seconds class SearchDialog(QDialog, Ui_Dialog):
TIMEOUT = 75 # seconds
SEARCH_THREAD_TOTAL = 4
COVER_DOWNLOAD_THREAD_TOTAL = 2
def __init__(self, gui, *args): def __init__(self, gui, *args):
QDialog.__init__(self, *args) QDialog.__init__(self, *args)
@ -35,7 +35,7 @@ class SearchDialog(QDialog, Ui_Dialog):
self.gui = gui self.gui = gui
self.store_plugins = {} self.store_plugins = {}
self.search_thread_pool = SearchThreadPool(self.SEARCH_THREAD_TOTAL) self.search_pool = SearchThreadPool(SearchThread, SEARCH_THREAD_TOTAL)
self.checker = QTimer() self.checker = QTimer()
self.hang_check = 0 self.hang_check = 0
@ -78,7 +78,7 @@ class SearchDialog(QDialog, Ui_Dialog):
def do_search(self, checked=False): def do_search(self, checked=False):
# Stop all running threads. # Stop all running threads.
self.checker.stop() self.checker.stop()
self.search_thread_pool.abort() self.search_pool.abort()
# Clear the visible results. # Clear the visible results.
self.results_view.model().clear_results() self.results_view.model().clear_results()
@ -89,26 +89,26 @@ class SearchDialog(QDialog, Ui_Dialog):
for n in self.store_plugins: for n in self.store_plugins:
if getattr(self, 'store_check_' + n).isChecked(): if getattr(self, 'store_check_' + n).isChecked():
self.search_thread_pool.add_task(query, n, self.store_plugins[n], self.TIMEOUT) self.search_pool.add_task(query, n, self.store_plugins[n], TIMEOUT)
if self.search_thread_pool.has_tasks(): if self.search_pool.has_tasks():
self.hang_check = 0 self.hang_check = 0
self.checker.start(100) self.checker.start(100)
self.search_thread_pool.start_threads() self.search_pool.start_threads()
def get_results(self): def get_results(self):
# We only want the search plugins to run # We only want the search plugins to run
# a maximum set amount of time before giving up. # a maximum set amount of time before giving up.
self.hang_check += 1 self.hang_check += 1
if self.hang_check >= self.HANG_TIME: if self.hang_check >= HANG_TIME:
self.search_thread_pool.abort() self.search_pool.abort()
self.checker.stop() self.checker.stop()
else: else:
# Stop the checker if not threads are running. # Stop the checker if not threads are running.
if not self.search_thread_pool.threads_running(): if not self.search_pool.threads_running():
self.checker.stop() self.checker.stop()
while self.search_thread_pool.has_results(): while self.search_pool.has_results():
res = self.search_thread_pool.get_result_no_wait() res = self.search_pool.get_result_no_wait()
if res: if res:
self.results_view.model().add_result(res) self.results_view.model().add_result(res)
@ -137,17 +137,22 @@ class SearchDialog(QDialog, Ui_Dialog):
check.setChecked(False) check.setChecked(False)
class SearchThreadPool(object): class GenericDownloadThreadPool(object):
'''
add_task must be implemented in a subclass.
'''
def __init__(self, thread_count): def __init__(self, thread_type, thread_count):
self.thread_type = thread_type
self.thread_count = thread_count
self.tasks = Queue() self.tasks = Queue()
self.results = Queue() self.results = Queue()
self.threads = [] self.threads = []
self.thread_count = thread_count
def start_threads(self): def start_threads(self):
for i in range(self.thread_count): for i in range(self.thread_count):
t = SearchThread(self.tasks, self.results) t = self.thread_type(self.tasks, self.results)
self.threads.append(t) self.threads.append(t)
t.start() t.start()
@ -157,9 +162,6 @@ class SearchThreadPool(object):
self.threads = [] self.threads = []
for t in self.threads: for t in self.threads:
t.abort() t.abort()
def add_task(self, query, store_name, store_plugin, timeout):
self.tasks.put((query, store_name, store_plugin, timeout))
def has_tasks(self): def has_tasks(self):
return not self.tasks.empty() return not self.tasks.empty()
@ -181,7 +183,27 @@ class SearchThreadPool(object):
if t.is_alive(): if t.is_alive():
return True return True
return False return False
class SearchThreadPool(GenericDownloadThreadPool):
'''
Threads will run until there is no work or
abort is called. Create and start new threads
using start_threads(). Reset by calling abort().
Example:
sp = SearchThreadPool(SearchThread, 3)
add tasks using add_task(...)
sp.start_threads()
all threads have finished.
sp.abort()
add tasks using add_task(...)
sp.start_threads()
'''
def add_task(self, query, store_name, store_plugin, timeout):
self.tasks.put((query, store_name, store_plugin, timeout))
class SearchThread(Thread): class SearchThread(Thread):
@ -209,38 +231,43 @@ class SearchThread(Thread):
pass pass
class CoverDownloadThread(Thread): class CoverThreadPool(GenericDownloadThreadPool):
'''
Once started all threads run until abort is called.
'''
def __init__(self, items, update_callback, timeout=5): def add_task(self, search_result, update_callback, timeout=5):
self.tasks.put((search_result, update_callback, timeout))
class CoverThread(Thread):
def __init__(self, tasks, results):
Thread.__init__(self) Thread.__init__(self)
self.daemon = True self.daemon = True
self.items = items self.tasks = tasks
self.update_callback = update_callback self.results = results
self.timeout = timeout
self.br = browser()
self._run = True self._run = True
self.br = browser()
def abort(self): def abort(self):
self._run = False self._run = False
def is_running(self):
return self._run
def run(self): def run(self):
while self._run: while self._run:
try: try:
time.sleep(.1) time.sleep(.1)
while not self.items.empty(): while not self.tasks.empty():
if not self._run: if not self._run:
break break
item = self.items.get_nowait() result, callback, timeout = self.tasks.get()
if item and item.cover_url: if result and result.cover_url:
with closing(self.br.open(item.cover_url, timeout=self.timeout)) as f: with closing(self.br.open(result.cover_url, timeout=timeout)) as f:
item.cover_data = f.read() result.cover_data = f.read()
item.cover_data = thumbnail(item.cover_data, 64, 64)[2] result.cover_data = thumbnail(result.cover_data, 64, 64)[2]
self.items.task_done() callback()
self.update_callback(item) self.tasks.task_done()
except: except:
continue continue
@ -252,20 +279,20 @@ class Matches(QAbstractItemModel):
def __init__(self): def __init__(self):
QAbstractItemModel.__init__(self) QAbstractItemModel.__init__(self)
self.matches = [] self.matches = []
self.cover_download_queue = Queue() self.cover_pool = CoverThreadPool(CoverThread, 2)
self.cover_download_thread = CoverDownloadThread(self.cover_download_queue, self.update_result) self.cover_pool.start_threads()
self.cover_download_thread.start()
def clear_results(self): def clear_results(self):
self.matches = [] self.matches = []
self.cover_download_queue.queue.clear() self.cover_pool.abort()
self.cover_pool.start_threads()
self.reset() self.reset()
def add_result(self, result): def add_result(self, result):
self.layoutAboutToBeChanged.emit() self.layoutAboutToBeChanged.emit()
self.matches.append(result) self.matches.append(result)
self.cover_download_queue.put(result) self.cover_pool.add_task(result, self.update_result)
self.layoutChanged.emit() self.layoutChanged.emit()
def get_result(self, index): def get_result(self, index):
@ -275,9 +302,7 @@ class Matches(QAbstractItemModel):
else: else:
return None return None
def update_result(self, result): def update_result(self):
if not result in self.matches:
return
self.layoutAboutToBeChanged.emit() self.layoutAboutToBeChanged.emit()
self.layoutChanged.emit() self.layoutChanged.emit()