Update Edelweiss metadata plugin

Getting book data using edelweiss identifiers works. However, I cannot
get the search to work: for some reason the server returns random
results, ignoring the passed-in query.
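
For reference, the identifier-based lookup that does work is a single
GET against the product-details fragment endpoint. A minimal standalone
sketch, using only the URL format and AJAX headers visible in this diff
(the helper name and the direct use of urllib2 are illustrative, not
part of the plugin):

    import urllib2

    def fetch_edelweiss_details(sku, timeout=30):
        # Same endpoint Worker.run() fetches below for a given sku
        url = ('https://www.edelweiss.plus/GetTreelineControl.aspx?'
               'controlName=/uc/product/two_Enhanced.ascx&'
               'sku={0}&idPrefix=content_1_{0}&mode=0'.format(sku))
        # Headers mirror the ones identify() sets on the browser below
        req = urllib2.Request(url, headers={
            'Referer': 'https://www.edelweiss.plus/',
            'X-Requested-With': 'XMLHttpRequest',
        })
        return urllib2.urlopen(req, timeout=timeout).read()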
Kovid Goyal 2018-04-05 23:30:07 +05:30
parent d72b06b29b
commit 19ca524e47


@@ -25,14 +25,8 @@ def clean_html(raw):
def parse_html(raw):
raw = clean_html(raw)
try:
from html5_parser import parse
except ImportError:
# Old versions of calibre
import html5lib
return html5lib.parse(raw, treebuilder='lxml', namespaceHTMLElements=False)
else:
return parse(raw)
from html5_parser import parse
return parse(raw)
def astext(node):
@@ -43,18 +37,21 @@ def astext(node):
class Worker(Thread): # {{{
def __init__(self, sku, url, relevance, result_queue, br, timeout, log, plugin):
def __init__(self, basic_data, relevance, result_queue, br, timeout, log, plugin):
Thread.__init__(self)
self.daemon = True
self.url, self.br, self.log, self.timeout = url, br, log, timeout
self.result_queue, self.plugin, self.sku = result_queue, plugin, sku
self.basic_data = basic_data
self.br, self.log, self.timeout = br, log, timeout
self.result_queue, self.plugin, self.sku = result_queue, plugin, self.basic_data['sku']
self.relevance = relevance
def run(self):
url = ('https://www.edelweiss.plus/GetTreelineControl.aspx?controlName=/uc/product/two_Enhanced.ascx&'
'sku={0}&idPrefix=content_1_{0}&mode=0'.format(self.sku))
try:
raw = self.br.open_novisit(self.url, timeout=self.timeout).read()
raw = self.br.open_novisit(url, timeout=self.timeout).read()
except:
self.log.exception('Failed to load details page: %r'%self.url)
self.log.exception('Failed to load comments page: %r'%url)
return
try:
@@ -63,91 +60,46 @@ class Worker(Thread): # {{{
self.plugin.clean_downloaded_metadata(mi)
self.result_queue.put(mi)
except:
self.log.exception('Failed to parse details page: %r'%self.url)
self.log.exception('Failed to parse details for sku: %s'%self.sku)
def parse(self, raw):
from calibre.ebooks.metadata.book.base import Metadata
from calibre.utils.date import parse_only_date, UNDEFINED_DATE
from css_selectors import Select
from calibre.utils.date import UNDEFINED_DATE
root = parse_html(raw)
selector = Select(root)
sku = next(selector('div.sku.attGroup'))
info = sku.getparent()
top = info.getparent().getparent()
banner = top.find('div')
spans = banner.findall('span')
title = ''
for i, span in enumerate(spans):
if i == 0 or '12pt' in span.get('style', ''):
title += astext(span)
else:
break
authors = [re.sub(r'\(.*\)', '', x).strip() for x in astext(spans[-1]).split(',')]
mi = Metadata(title.strip(), authors)
mi = Metadata(self.basic_data['title'], self.basic_data['authors'])
# Identifiers
isbns = [check_isbn(x.strip()) for x in astext(sku).split(',')]
for isbn in isbns:
if isbn:
self.plugin.cache_isbn_to_identifier(isbn, self.sku)
isbns = sorted(isbns, key=lambda x:len(x) if x else 0, reverse=True)
if isbns and isbns[0]:
mi.isbn = isbns[0]
if self.basic_data['isbns']:
mi.isbn = self.basic_data['isbns'][0]
mi.set_identifier('edelweiss', self.sku)
# Tags
bisac = tuple(selector('div.bisac.attGroup'))
if bisac:
bisac = astext(bisac[0])
mi.tags = [x.strip() for x in bisac.split(',')]
if self.basic_data['tags']:
mi.tags = self.basic_data['tags']
mi.tags = [t[1:].strip() if t.startswith('&') else t for t in mi.tags]
# Publisher
pub = tuple(selector('div.supplier.attGroup'))
if pub:
pub = astext(pub[0])
mi.publisher = pub
mi.publisher = self.basic_data['publisher']
# Pubdate
pub = tuple(selector('div.shipDate.attGroupItem'))
if pub:
pub = astext(pub[0])
parts = pub.partition(':')[0::2]
pub = parts[1] or parts[0]
try:
if ', Ship Date:' in pub:
pub = pub.partition(', Ship Date:')[0]
q = parse_only_date(pub, assume_utc=True)
if q.year != UNDEFINED_DATE:
mi.pubdate = q
except:
self.log.exception('Error parsing published date: %r'%pub)
if self.basic_data['pubdate'] and self.basic_data['pubdate'].year != UNDEFINED_DATE:
mi.pubdate = self.basic_data['pubdate']
# Rating
if self.basic_data['rating']:
mi.rating = self.basic_data['rating']
# Comments
comm = ''
general = tuple(selector('div#pd-general-overview-content'))
if general:
q = self.render_comments(general[0])
if q != '<p>No title summary available. </p>':
comm += q
general = tuple(selector('div#pd-general-contributor-content'))
if general:
comm += self.render_comments(general[0])
general = tuple(selector('div#pd-general-quotes-content'))
if general:
comm += self.render_comments(general[0])
if comm:
mi.comments = comm
# Cover
img = tuple(selector('img.title-image[src]'))
if img:
href = img[0].get('src').replace('jacket_covers/medium/',
'jacket_covers/flyout/')
self.plugin.cache_identifier_to_cover_url(self.sku, href)
comments = ''
for cid in ('summary', 'contributorbio', 'quotes_reviews'):
cid = 'desc_{}{}-content'.format(cid, self.sku)
div = root.xpath('//*[@id="{}"]'.format(cid))
if div:
comments += self.render_comments(div[0])
if comments:
mi.comments = comments
mi.has_cover = self.plugin.cached_identifier_to_cover_url(self.sku) is not None
return mi
def render_comments(self, desc):
@@ -171,17 +123,84 @@ class Worker(Thread): # {{{
# }}}
def get_basic_data(browser, log, *skus):
from calibre.utils.date import parse_only_date
from mechanize import Request
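    # The form mimics the site's list-view request: one '0' per sku in the
    # checkbox-style fields ('drc', 'selected'); the control returns an HTML
    # fragment with one row per sku.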
zeroes = ','.join('0' for sku in skus)
data = {
'skus': ','.join(skus),
'drc': zeroes,
'startPosition': '0',
'sequence': '1',
'selected': zeroes,
'itemID': '0',
'orderID': '0',
'mailingID': '',
'tContentWidth': '926',
'originalOrder': ','.join(str(i) for i in range(len(skus))),
'selectedOrderID': '0',
'selectedSortColumn': '0',
'listType': '1',
'resultType': '32',
'blockView': '1',
}
items_data_url = 'https://www.edelweiss.plus/GetTreelineControl.aspx?controlName=/uc/listviews/ListView_Title_Multi.ascx'
req = Request(items_data_url, data)
response = browser.open_novisit(req)
raw = response.read()
root = parse_html(raw)
for item in root.xpath('//div[@data-priority]'):
row = item.getparent().getparent()
sku = item.get('id').split('-')[-1]
isbns = [x.strip() for x in row.xpath('descendant::*[contains(@class, "pev_sku")]/text()')[0].split(',') if check_isbn(x.strip())]
isbns.sort(key=len, reverse=True)
try:
tags = [x.strip() for x in astext(row.xpath('descendant::*[contains(@class, "pev_categories")]')[0]).split('/')]
except IndexError:
tags = []
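        # The community rating is rendered as a coloured bar; the filled
        # width over the max width gives a 0-1 fraction.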
rating = 0
for bar in row.xpath('descendant::*[contains(@class, "bgdColorCommunity")]/@style'):
m = re.search('width: (\d+)px;.*max-width: (\d+)px', bar)
if m is not None:
rating = float(m.group(1)) / float(m.group(2))
break
try:
pubdate = parse_only_date(astext(row.xpath('descendant::*[contains(@class, "pev_shipDate")]')[0]
).split(':')[-1].split(u'\xa0')[-1].strip(), assume_utc=True)
except Exception:
log.exception('Error parsing published date')
pubdate = None
authors = []
for x in [x.strip() for x in row.xpath('descendant::*[contains(@class, "pev_contributor")]/@title')]:
authors.extend(a.strip() for a in x.split(','))
entry = {
'sku': sku,
'cover': row.xpath('descendant::img/@src')[0].split('?')[0],
'publisher': astext(row.xpath('descendant::*[contains(@class, "headerPublisher")]')[0]),
'title': astext(row.xpath('descendant::*[@id="title_{}"]'.format(sku))[0]),
'authors': authors,
'isbns': isbns,
'tags': tags,
'pubdate': pubdate,
'format': ' '.join(row.xpath('descendant::*[contains(@class, "pev_format")]/text()')).strip(),
'rating': rating,
}
if entry['cover'].startswith('/'):
entry['cover'] = None
yield entry
class Edelweiss(Source):
name = 'Edelweiss'
version = (1, 0, 0)
minimum_calibre_version = (2, 80, 0)
version = (2, 0, 0)
minimum_calibre_version = (3, 6, 0)
description = _('Downloads metadata and covers from Edelweiss - A catalog updated by book publishers')
capabilities = frozenset(['identify', 'cover'])
touched_fields = frozenset([
'title', 'authors', 'tags', 'pubdate', 'comments', 'publisher',
'identifier:isbn', 'identifier:edelweiss'])
'identifier:isbn', 'identifier:edelweiss', 'rating'])
supports_gzip_transfer_encoding = True
has_html_comments = True
@@ -189,11 +208,11 @@ class Edelweiss(Source):
def user_agent(self):
# Pass in an index to random_user_agent() to test with a particular
# user agent
return random_user_agent()
return random_user_agent(allow_ie=False)
def _get_book_url(self, sku):
if sku:
return 'https://edelweiss.abovethetreeline.com/ProductDetailPage.aspx?sku=%s'%sku
return 'https://www.edelweiss.plus/#sku={}&page=1'.format(sku)
def get_book_url(self, identifiers): # {{{
sku = identifiers.get('edelweiss', None)
@@ -213,10 +232,9 @@ class Edelweiss(Source):
def create_query(self, log, title=None, authors=None, identifiers={}):
from urllib import urlencode
BASE_URL = 'https://edelweiss.abovethetreeline.com/Browse.aspx?source=catalog&rg=4187&group=browse&pg=0&'
params = {
'browseType':'title', 'startIndex':0, 'savecook':1, 'sord':20, 'secSord':20, 'tertSord':20,
}
import time
BASE_URL = ('https://www.edelweiss.plus/GetTreelineControl.aspx?'
'controlName=/uc/listviews/controls/ListView_data.ascx&itemID=0&resultType=32&dashboardType=8&itemType=1&dataType=products&keywordSearch&')
keywords = []
isbn = check_isbn(identifiers.get('isbn', None))
if isbn is not None:
@@ -225,28 +243,33 @@ class Edelweiss(Source):
title_tokens = list(self.get_title_tokens(title))
if title_tokens:
keywords.extend(title_tokens)
# Searching with author names does not work on edelweiss
# author_tokens = self.get_author_tokens(authors,
# only_first_author=True)
# if author_tokens:
# keywords.extend(author_tokens)
author_tokens = self.get_author_tokens(authors, only_first_author=True)
if author_tokens:
keywords.extend(author_tokens)
if not keywords:
return None
params['bsk'] = (' '.join(keywords)).encode('utf-8')
params = {
'q': (' '.join(keywords)).encode('utf-8'),
'_': str(int(time.time()))
}
return BASE_URL+urlencode(params)
# }}}
def identify(self, log, result_queue, abort, title=None, authors=None, # {{{
identifiers={}, timeout=30):
from urlparse import parse_qs
import json
book_url = self._get_book_url(identifiers.get('edelweiss', None))
br = self.browser
if book_url:
entries = [(book_url, identifiers['edelweiss'])]
br.addheaders = [
('Referer', 'https://www.edelweiss.plus/'),
('X-Requested-With', 'XMLHttpRequest'),
('Cache-Control', 'no-cache'),
('Pragma', 'no-cache'),
]
if 'edelweiss' in identifiers:
items = [identifiers['edelweiss']]
else:
entries = []
query = self.create_query(log, title=title, authors=authors,
identifiers=identifiers)
if not query:
@@ -254,66 +277,41 @@ class Edelweiss(Source):
return
log('Using query URL:', query)
try:
raw = br.open_novisit(query, timeout=timeout).read()
raw = br.open(query, timeout=timeout).read().decode('utf-8')
except Exception as e:
log.exception('Failed to make identify query: %r'%query)
return as_unicode(e)
items = re.search('window[.]items\s*=\s*(.+?);', raw)
if items is None:
log.error('Failed to get list of matching items')
log.debug('Response text:')
log.debug(raw)
return
items = json.loads(items.group(1))
try:
root = parse_html(raw)
except Exception as e:
log.exception('Failed to parse identify results')
return as_unicode(e)
from css_selectors import Select
select = Select(root)
has_isbn = check_isbn(identifiers.get('isbn', None)) is not None
if not has_isbn:
author_tokens = set(x.lower() for x in self.get_author_tokens(authors, only_first_author=True))
for entry in select('div.listRow div.listRowMain'):
a = entry.xpath('descendant::a[contains(@href, "sku=") and contains(@href, "productDetailPage.aspx")]')
if not a:
continue
href = a[0].get('href')
prefix, qs = href.partition('?')[0::2]
sku = parse_qs(qs).get('sku', None)
if sku and sku[0]:
sku = sku[0]
div = tuple(select('div.sku.attGroup'))
if div:
text = astext(div[0])
isbns = [check_isbn(x.strip()) for x in text.split(',')]
for isbn in isbns:
if isbn:
self.cache_isbn_to_identifier(isbn, sku)
for img in entry.xpath('descendant::img[contains(@src, "/jacket_covers/thumbnail/")]'):
self.cache_identifier_to_cover_url(sku, img.get('src').replace('/thumbnail/', '/flyout/'))
div = tuple(select('div.format.attGroup'))
text = astext(div[0]).lower()
if 'audio' in text or 'mp3' in text: # Audio-book, ignore
continue
if not has_isbn:
# edelweiss returns matches based only on title, so we
# filter by author manually
div = tuple(select('div.contributor.attGroup'))
try:
entry_authors = set(self.get_author_tokens([x.strip() for x in astext(div[0]).lower().split(',')]))
except IndexError:
entry_authors = set()
if not entry_authors.issuperset(author_tokens):
continue
entries.append((self._get_book_url(sku), sku))
if (not entries and identifiers and title and authors and
if (not items and identifiers and title and authors and
not abort.is_set()):
return self.identify(log, result_queue, abort, title=title,
authors=authors, timeout=timeout)
if not entries:
if not items:
return
workers = [Worker(skul, url, i, result_queue, br.clone_browser(), timeout, log, self)
for i, (url, skul) in enumerate(entries[:5])]
workers = []
items = items[:5]
for i, item in enumerate(get_basic_data(self.browser, log, *items)):
sku = item['sku']
for isbn in item['isbns']:
self.cache_isbn_to_identifier(isbn, sku)
if item['cover']:
self.cache_identifier_to_cover_url(sku, item['cover'])
fmt = item['format'].lower()
if 'audio' in fmt or 'mp3' in fmt:
continue # Audio-book, ignore
workers.append(Worker(item, i, result_queue, br.clone_browser(), timeout, log, self))
if not workers:
return
for w in workers:
w.start()
@@ -390,14 +388,13 @@ if __name__ == '__main__':
# Multiple authors and two part title and no general description
({'identifiers':{'edelweiss':'0321180607'}},
[title_test(
"XQuery from the Experts: A Guide to the W3C XML Query Language"
"XQuery From the Experts: A Guide to the W3C XML Query Language"
, exact=True), authors_test([
'Howard Katz', 'Don Chamberlin', 'Denise Draper', 'Mary Fernandez',
'Michael Kay', 'Jonathan Robie', 'Michael Rys', 'Jerome Simeon',
'Jim Tivy', 'Philip Wadler']), pubdate_test(2003, 8, 22),
comments_test('Jérôme Siméon'), lambda mi: bool(mi.comments and 'No title summary' not in mi.comments)
]),
]
start, stop = 0, len(tests)