From 99921673d62dd26305a47fed9f35c332aee3a1aa Mon Sep 17 00:00:00 2001 From: Sengian Date: Tue, 14 Dec 2010 00:34:25 +0100 Subject: [PATCH] Threading optimisation (last I hope), now faster than light at least pratchett's for amazon --- src/calibre/ebooks/metadata/amazon.py | 74 +++++++++------------ src/calibre/ebooks/metadata/fictionwise.py | 36 ++-------- src/calibre/ebooks/metadata/google_books.py | 40 +++-------- src/calibre/ebooks/metadata/nicebooks.py | 36 ++-------- 4 files changed, 53 insertions(+), 133 deletions(-) diff --git a/src/calibre/ebooks/metadata/amazon.py b/src/calibre/ebooks/metadata/amazon.py index aec4fb313a..6eb106c862 100644 --- a/src/calibre/ebooks/metadata/amazon.py +++ b/src/calibre/ebooks/metadata/amazon.py @@ -121,20 +121,6 @@ def report(verbose): class AmazonError(Exception): pass -class ThreadwithResults(Thread): - def __init__(self, func, *args, **kargs): - self.func = func - self.args = args - self.kargs = kargs - self.result = None - Thread.__init__(self) - - def get_result(self): - return self.result - - def run(self): - self.result = self.func(*self.args, **self.kargs) - class Query(object): @@ -269,14 +255,11 @@ class Query(object): for i in x.xpath("//a/span[@class='srTitle']")]) return results[:self.max_results], self.baseurl -class ResultList(object): +class ResultList(list): def __init__(self, baseurl, lang = 'all'): self.baseurl = baseurl self.lang = lang - self.thread = [] - self.res = [] - self.nbtag = 0 self.repub = re.compile(u'\((.*)\)') self.rerat = re.compile(u'([0-9.]+)') self.reattr = re.compile(r'<([a-zA-Z0-9]+)\s[^>]+>') @@ -484,63 +467,65 @@ class ResultList(object): entry = None finally: qbr.put(browser, True) - qsync.put(nb, True) - return entry + qsync.put((nb, entry), True) def producer(self, sync, urls, br, verbose=False): for i in xrange(len(urls)): - thread = ThreadwithResults(self.fetchdatathread, br, sync, - i, urls[i], verbose) + thread = Thread(target=self.fetchdatathread, + args=(br, sync, i, urls[i], verbose)) thread.start() - self.thread.append(thread) def consumer(self, sync, syncbis, br, total_entries, verbose=False): i=0 + self.extend([None]*total_entries) while i < total_entries: - nb = int(sync.get(True)) - self.thread[nb].join() - entry = self.thread[nb].get_result() + rq = sync.get(True) + nb = int(rq[0]) + entry = rq[1] i+=1 if entry is not None: mi = self.fill_MI(entry, verbose) if mi is not None: mi.tags, atag = self.get_tags(entry, verbose) - self.res[nb] = mi + self[nb] = mi if atag: - threadbis = ThreadwithResults(self.fetchdatathread, - br, syncbis, nb, mi.tags, verbose) - self.thread[nb] = threadbis - self.nbtag +=1 - threadbis.start() + thread = Thread(target=self.fetchdatathread, + args=(br, syncbis, nb, mi.tags, verbose)) + thread.start() + else: + syncbis.put((nb, None), True) + + def final(self, sync, total_entries, verbose): + i=0 + while i < total_entries: + rq = sync.get(True) + nb = int(rq[0]) + tags = rq[1] + i+=1 + if tags is not None: + self[nb].tags = self.get_tags(tags, verbose)[0] def populate(self, entries, ibr, verbose=False, brcall=3): br = Queue(brcall) cbr = Queue(brcall-1) syncp = Queue(1) - syncc = Queue(len(entries)) + syncc = Queue(1) for i in xrange(brcall-1): br.put(browser(), True) cbr.put(browser(), True) br.put(ibr, True) - self.res = [None]*len(entries) - prod_thread = Thread(target=self.producer, args=(syncp, entries, br, verbose)) cons_thread = Thread(target=self.consumer, args=(syncp, syncc, cbr, len(entries), verbose)) + fin_thread = Thread(target=self.final, args=(syncc, len(entries), verbose)) prod_thread.start() cons_thread.start() + fin_thread.start() prod_thread.join() cons_thread.join() - - #finish processing - for i in xrange(self.nbtag): - nb = int(syncc.get(True)) - tags = self.thread[nb].get_result() - if tags is not None: - self.res[nb].tags = self.get_tags(tags, verbose)[0] - return self.res + fin_thread.join() def search(title=None, author=None, publisher=None, isbn=None, @@ -554,7 +539,8 @@ def search(title=None, author=None, publisher=None, isbn=None, #List of entry ans = ResultList(baseurl, lang) - return [x for x in ans.populate(entries, br, verbose) if x is not None] + ans.populate(entries, br, verbose) + return [x for x in ans if x is not None] def get_social_metadata(title, authors, publisher, isbn, verbose=False, max_results=1, lang='all'): diff --git a/src/calibre/ebooks/metadata/fictionwise.py b/src/calibre/ebooks/metadata/fictionwise.py index a50bb2ce04..48dac131cc 100644 --- a/src/calibre/ebooks/metadata/fictionwise.py +++ b/src/calibre/ebooks/metadata/fictionwise.py @@ -41,20 +41,6 @@ class Fictionwise(MetadataSource): class FictionwiseError(Exception): pass -class ThreadwithResults(Thread): - def __init__(self, func, *args, **kargs): - self.func = func - self.args = args - self.kargs = kargs - self.result = None - Thread.__init__(self) - - def get_result(self): - return self.result - - def run(self): - self.result = self.func(*self.args, **self.kargs) - def report(verbose): if verbose: import traceback @@ -155,7 +141,6 @@ class ResultList(list): def __init__(self, islink): self.islink = islink - self.thread = [] self.retitle = re.compile(r'\[[^\[\]]+\]') self.rechkauth = re.compile(r'.*book\s*by', re.I) self.redesc = re.compile(r'book\s*description\s*:\s*(]+>)*(?P.*)]*>.{,15}publisher\s*:', re.I) @@ -361,27 +346,21 @@ class ResultList(list): entry = None finally: qbr.put(browser, True) - qsync.put(nb, True) - return entry + qsync.put((nb, entry), True) def producer(self, sync, urls, br, verbose=False): for i in xrange(len(urls)): - thread = ThreadwithResults(self.fetchdatathread, br, sync, - i, self.BASE_URL+urls[i], verbose) + thread = Thread(target=self.fetchdatathread, + args=(br, sync, i, self.BASE_URL+urls[i], verbose)) thread.start() - self.thread.append(thread) def consumer(self, sync, total_entries, verbose=False): - res=[None]*total_entries + self.extend([None]*total_entries) i=0 while i < total_entries: - nb = int(sync.get(True)) - self.thread[nb].join() - entry = self.thread[nb].get_result() + rq = sync.get(True) + self[int(rq[0])] = self.fill_MI(rq[1], verbose) i+=1 - if entry is not None: - res[nb] = self.fill_MI(entry, verbose) - return res def populate(self, entries, br, verbose=False, brcall=3): if not self.islink: @@ -396,12 +375,11 @@ class ResultList(list): pbr.put(br, True) prod_thread = Thread(target=self.producer, args=(sync, entries, pbr, verbose)) - cons_thread = ThreadwithResults(self.consumer, sync, len(entries), verbose) + cons_thread = Thread(target=self.consumer, args=(sync, len(entries), verbose)) prod_thread.start() cons_thread.start() prod_thread.join() cons_thread.join() - self.extend(cons_thread.get_result()) def search(title=None, author=None, publisher=None, isbn=None, diff --git a/src/calibre/ebooks/metadata/google_books.py b/src/calibre/ebooks/metadata/google_books.py index 765bb4a255..fd18f080a0 100644 --- a/src/calibre/ebooks/metadata/google_books.py +++ b/src/calibre/ebooks/metadata/google_books.py @@ -61,20 +61,6 @@ class GoogleBooks(MetadataSource): class GoogleBooksError(Exception): pass -class ThreadwithResults(Thread): - def __init__(self, func, *args, **kargs): - self.func = func - self.args = args - self.kargs = kargs - self.result = None - Thread.__init__(self) - - def get_result(self): - return self.result - - def run(self): - self.result = self.func(*self.args, **self.kargs) - def report(verbose): if verbose: import traceback @@ -173,8 +159,6 @@ class Query(object): return entries class ResultList(list): - def __init__(self): - self.thread = [] def get_description(self, entry, verbose): try: @@ -206,8 +190,7 @@ class ResultList(list): return val def get_identifiers(self, entry, mi): - isbns = [str(x.text).strip() for x in identifier(entry)] - isbns = [t[5:] for t in isbns \ + isbns = [t[5:] for t in [str(x.text).strip() for x in identifier(entry)] \ if t[:5].upper() == 'ISBN:' and check_isbn(t[5:])] # for x in identifier(entry): # t = str(x.text).strip() @@ -309,8 +292,7 @@ class ResultList(list): entry = None finally: qbr.put(browser, True) - qsync.put(nb, True) - return entry + qsync.put((nb, entry), True) def producer(self, sync, entries, br, verbose=False): for i in xrange(len(entries)): @@ -319,21 +301,18 @@ class ResultList(list): except: id_url = None report(verbose) - thread = ThreadwithResults(self.fetchdatathread, br, sync, - i, id_url, verbose) + thread = Thread(target=self.fetchdatathread, + args=(br, sync, i, id_url, verbose)) thread.start() - self.thread.append(thread) def consumer(self, entries, sync, total_entries, verbose=False): - res=[None]*total_entries #remove? + self.extend([None]*total_entries) i=0 while i < total_entries: - nb = int(sync.get(True)) - self.thread[nb].join() - data = self.thread[nb].get_result() - res[nb] = self.fill_MI(entries[nb], data, verbose) + rq = sync.get(True) + nb = int(rq[0]) + self[nb] = self.fill_MI(entries[nb], rq[1], verbose) i+=1 - return res def populate(self, entries, br, verbose=False, brcall=3): pbr = Queue(brcall) @@ -343,12 +322,11 @@ class ResultList(list): pbr.put(br, True) prod_thread = Thread(target=self.producer, args=(sync, entries, pbr, verbose)) - cons_thread = ThreadwithResults(self.consumer, entries, sync, len(entries), verbose) + cons_thread = Thread(target=self.consumer, args=(entries, sync, len(entries), verbose)) prod_thread.start() cons_thread.start() prod_thread.join() cons_thread.join() - self.extend(cons_thread.get_result()) def search(title=None, author=None, publisher=None, isbn=None, diff --git a/src/calibre/ebooks/metadata/nicebooks.py b/src/calibre/ebooks/metadata/nicebooks.py index cacb511563..1ff5f7fc6b 100644 --- a/src/calibre/ebooks/metadata/nicebooks.py +++ b/src/calibre/ebooks/metadata/nicebooks.py @@ -82,20 +82,6 @@ class NiceBooksError(Exception): class ISBNNotFound(NiceBooksError): pass -class ThreadwithResults(Thread): - def __init__(self, func, *args, **kargs): - self.func = func - self.args = args - self.kargs = kargs - self.result = None - Thread.__init__(self) - - def get_result(self): - return self.result - - def run(self): - self.result = self.func(*self.args, **self.kargs) - def report(verbose): if verbose: import traceback @@ -191,7 +177,6 @@ class ResultList(list): def __init__(self, islink): self.islink = islink - self.thread = [] self.repub = re.compile(u'\s*.diteur\s*', re.I) self.reauteur = re.compile(u'\s*auteur.*', re.I) self.reautclean = re.compile(u'\s*\(.*\)\s*') @@ -302,27 +287,21 @@ class ResultList(list): entry = None finally: qbr.put(browser, True) - qsync.put(nb, True) - return entry + qsync.put((nb, entry), True) def producer(self, sync, urls, br, verbose=False): for i in xrange(len(urls)): - thread = ThreadwithResults(self.fetchdatathread, br, sync, - i, self.BASE_URL+urls[i], verbose) + thread = Thread(target=self.fetchdatathread, + args=(br, sync, i, self.BASE_URL+urls[i], verbose)) thread.start() - self.thread.append(thread) def consumer(self, sync, total_entries, verbose=False): - res=[None]*total_entries + self.extend([None]*total_entries) i=0 while i < total_entries: - nb = int(sync.get(True)) - self.thread[nb].join() - entry = self.thread[nb].get_result() + rq = sync.get(True) + self[int(rq[0])] = self.fill_MI(rq[1], verbose) i+=1 - if entry is not None: - res[nb] = self.fill_MI(entry, verbose) - return res def populate(self, entries, br, verbose=False, brcall=3): if not self.islink: @@ -337,12 +316,11 @@ class ResultList(list): pbr.put(br, True) prod_thread = Thread(target=self.producer, args=(sync, entries, pbr, verbose)) - cons_thread = ThreadwithResults(self.consumer, sync, len(entries), verbose) + cons_thread = Thread(target=self.consumer, args=(sync, len(entries), verbose)) prod_thread.start() cons_thread.start() prod_thread.join() cons_thread.join() - self.extend(cons_thread.get_result()) class Covers(object):