Remove the jsbrowser module, as it was built on the obsolete Qt WebKit

There are still a handful of recipes to be ported, but they will require
someone to provide login credentials. Also the Woblink store plugin
needs to be ported.
This commit is contained in:
Kovid Goyal 2016-04-25 14:52:27 +05:30
parent d46ff32eb2
commit 974740539d
12 changed files with 5 additions and 2044 deletions

View File

@ -419,10 +419,6 @@ def browser(honor_time=True, max_time=2, mobile_browser=False, user_agent=None,
return opener return opener
def jsbrowser(*args, **kwargs):
    '''Return a new JavaScript-capable (Qt WebKit based) browser instance.

    All arguments are forwarded unchanged to the Browser constructor.
    '''
    from calibre.web.jsbrowser.browser import Browser
    br = Browser(*args, **kwargs)
    return br
def fit_image(width, height, pwidth, pheight): def fit_image(width, height, pwidth, pheight):
''' '''
Fit image in box of width pwidth and height pheight. Fit image in box of width pwidth and height pheight.

View File

@ -3,8 +3,7 @@ Remove all *content_server_* and server_listen_on tweaks
Rewrite server integration with nginx/apache section Rewrite server integration with nginx/apache section
Remove dependency on cherrypy from download and contribs pages and remove Remove dependency on cherrypy from download and contribs pages and remove
cherrypy private copy (you will have to re-write jsbrowser.test to not use cherrypy private copy
cherrypy)
Remove the bundled routes package Remove the bundled routes package

View File

@ -60,10 +60,6 @@ class Browser(B):
B.set_cookiejar(self, *args, **kwargs) B.set_cookiejar(self, *args, **kwargs)
self._clone_actions['set_cookiejar'] = ('set_cookiejar', args, kwargs) self._clone_actions['set_cookiejar'] = ('set_cookiejar', args, kwargs)
def copy_cookies_from_jsbrowser(self, jsbrowser):
    # Import every cookie held by the given JavaScript browser into this
    # browser's cookie jar, so a normal (non-JS) download session can reuse
    # the authenticated state.
    set_cookie = self.cookiejar.set_cookie
    for cookie in jsbrowser.cookies:
        set_cookie(cookie)
def set_cookie(self, name, value, domain, path='/'): def set_cookie(self, name, value, domain, path='/'):
self.cookiejar.set_cookie(Cookie( self.cookiejar.set_cookie(Cookie(
None, name, value, None, name, value,

View File

@ -1,411 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
import os, re
from io import BytesIO
from functools import partial
from calibre import force_unicode, walk
from calibre.constants import __appname__
from calibre.web.feeds import feeds_from_index
from calibre.web.feeds.news import BasicNewsRecipe
from calibre.web.fetch.javascript import fetch_page, AbortFetch, links_from_selectors
from calibre.ebooks.chardet import xml_to_unicode, strip_encoding_declarations
from calibre.utils.cleantext import clean_xml_chars
def image_data_to_url(data, base='cover'):
    '''
    Wrap raw image (or PDF) bytes in a named, file-like BytesIO object.

    The extension is detected from the data itself; PDF payloads get
    '.pdf' and anything unrecognized falls back to '.jpg'.

    :param data: raw bytes of the image/PDF
    :param base: base name (without extension) for the pseudo-file
    :return: a BytesIO whose ``name`` attribute is ``base + '.' + ext``
    '''
    from calibre.utils.imghdr import what
    ans = BytesIO(data)
    ext = what(None, data)
    if not ext:
        if data.startswith(b'%PDF-'):
            ext = 'pdf'
        else:
            ext = 'jpg'
    # Bug fix: the name was previously hard-coded to 'cover.<ext>', silently
    # ignoring the *base* parameter. Default callers (base='cover') see no
    # behavior change.
    ans.name = base + '.' + ext
    return ans
class JavascriptRecipe(BasicNewsRecipe):

    '''
    This recipe class is used to download content from javascript heavy
    sites. It uses a full WebKit browser to do the downloading, therefore it
    can support sites that use javascript to dynamically fetch content.

    Most of the parameters from :class:`BasicNewsRecipe` still apply, apart
    from those noted specifically below. The biggest difference is that you use
    CSS selectors to specify tags to keep and remove as well as links to
    follow, instead of the BeautifulSoup selectors used in
    :class:`BasicNewsRecipe`. Indeed, BeautifulSoup has been completely removed
    and replaced by lxml; wherever you previously expected BeautifulSoup to
    represent parsed HTML, you will now get lxml trees. See
    http://lxml.de/tutorial.html for a tutorial on using lxml.

    The various article pre-processing callbacks such as ``preprocess_html()``
    and ``skip_ad_pages()`` have all been replaced by just two callbacks,
    :meth:`preprocess_stage1` and :meth:`preprocess_stage2`. These methods are
    passed the browser instance, and can thus do anything they like.

    An important method that you will often have to implement is
    :meth:`load_complete` to tell the download system when a page has finished
    loading and is ready to be scraped.

    You can use the builtin recipe for time.com as an example of the usage of
    this class.
    '''

    #: Minimum calibre version needed to use this recipe
    requires_version = (0, 9, 35)

    #: List of tags to be removed. Specified tags are removed from downloaded HTML.
    #: A tag is specified using CSS selectors.
    #: A common example::
    #:
    #:   remove_tags = ['div.advert', 'div.tools']
    #:
    #: This will remove all `<div class="advert">` and `<div class="tools">` tags and all
    #: their children from the downloaded :term:`HTML`.
    remove_tags = ()

    #: Remove all tags that occur after the specified tag.
    #: A tag is specified using CSS selectors.
    #: For example::
    #:
    #:   remove_tags_after = '#content'
    #:
    #: will remove all tags after the first element with `id="content"`.
    remove_tags_after = None

    #: Remove all tags that occur before the specified tag.
    #: A tag is specified using CSS selectors.
    #: For example::
    #:
    #:   remove_tags_before = '#content'
    #:
    #: will remove all tags before the first element with `id="content"`.
    remove_tags_before = None

    #: Keep only the specified tags and their children.
    #: Uses the CSS selector syntax.
    #: If this list is not empty, then the `<body>` tag will be emptied and re-filled with
    #: the tags that match the entries in this list. For example::
    #:
    #:   keep_only_tags = ['#content', '#heading']
    #:
    #: will keep only tags that have an `id` attribute of `"content"` or `"heading"`.
    keep_only_tags = ()

    #: A list of selectors that match <a href> elements that you want followed.
    #: For this to work you must also set recursions to at least 1.
    #: You can get more control by re-implementing :meth:`select_links` in your sub-class.
    links_from_selectors = ()

    def select_links(self, browser, url, recursion_level):
        '''
        Override this method in your recipe to implement arbitrary link following logic. It must return a
        list of URLs, each of which will be downloaded in turn.
        '''
        return links_from_selectors(self.links_from_selectors, self.recursions, browser, url, recursion_level)

    def get_jsbrowser(self, *args, **kwargs):
        '''
        Override this method in your recipe if you want to use a non-standard Browser object.
        '''
        from calibre.web.jsbrowser.browser import Browser
        return Browser(default_timeout=kwargs.get('default_timeout', self.timeout))

    def do_login(self, browser, username, password):
        '''
        This method is used to login to a website that uses a paywall. Implement it in
        your recipe if the site uses a paywall. An example implementation::

            def do_login(self, browser, username, password):
                browser.visit('http://some-page-that-has-a-login')
                form = browser.select_form(nr=0)  # Select the first form on the page
                form['username'] = username
                form['password'] = password
                browser.submit(timeout=120)  # Submit the form and wait at most two minutes for loading to complete

        Note that you can also select forms with CSS2 selectors, like this::

            browser.select_form('form#login_form')
            browser.select_form('form[name="someform"]')

        '''
        pass

    def get_publication_data(self, browser):
        '''
        Download the cover, the masthead image and the list of sections/articles.
        Should return a dictionary with keys 'index', 'cover' and 'masthead'.
        'cover' and 'masthead' are optional, if not present, they will be auto-generated.
        The index must be in the same format as described in :meth:`parse_index`.
        The cover and masthead must be the downloaded image data as a bytestring.
        '''
        raise NotImplementedError('You must implement this method in your recipe')

    def load_complete(self, browser, url, recursion_level):
        '''
        This method is called after every page on the website is loaded. To be
        precise, it is called when the DOM is ready. If further checks need to
        be made, they should be made here. For example, if you want to check
        that some element in the DOM is present, you would use::

            def load_complete(self, browser, url, rl):
                browser.wait_for_element('#article-footer')
                return True

        where article-footer is the id of the element you want to wait for.
        '''
        return True

    def abort_article(self, msg=None):
        '''
        Call this method in any article processing callback to abort the download of the article.
        For example::

            def postprocess_html(self, article, root, url, recursion_level):
                if '/video/' in url:
                    self.abort_article()
                return root

        This will cause this article to be ignored.
        '''
        raise AbortFetch(msg or 'Article fetch aborted')

    def preprocess_stage1(self, article, browser, url, recursion_level):
        '''
        This method is a callback called for every downloaded page, before any cleanup is done.
        '''
        pass

    def preprocess_stage2(self, article, browser, url, recursion_level):
        '''
        This method is a callback called for every downloaded page, after the cleanup is done.
        '''
        pass

    def postprocess_html(self, article, root, url, recursion_level):
        '''
        This method is called with the downloaded html for every page as an lxml
        tree. It is called after all cleanup and related processing is completed.
        You can use it to perform any extra cleanup, or to abort the article
        download (see :meth:`abort_article`).

        :param article: The Article object, which represents the article being currently downloaded
        :param root: The parsed downloaded HTML, as an lxml tree, see http://lxml.de/tutorial.html
                     for help with using lxml to manipulate HTML.
        :param url: The URL from which this HTML was downloaded
        :param recursion_level: This is zero for the first page in an article and > 0 for subsequent pages.
        '''
        return root

    def index_to_soup(self, url_or_raw, raw=False):
        '''
        Convenience method that takes an URL to the index page and returns
        a parsed lxml tree representation of it. See http://lxml.de/tutorial.html

        `url_or_raw`: Either a URL or the downloaded index page as a string
        '''
        if re.match(r'\w+://', url_or_raw):
            # A URL: load it in the JS browser and scrape the live DOM
            self.jsbrowser.start_load(url_or_raw)
            html = self.jsbrowser.html
        else:
            html = url_or_raw
        if isinstance(html, bytes):
            html = xml_to_unicode(html)[0]
        html = strip_encoding_declarations(html)
        if raw:
            return html
        import html5lib
        root = html5lib.parse(clean_xml_chars(html), treebuilder='lxml', namespaceHTMLElements=False).getroot()
        return root

    # ***************************** Internal API *****************************

    def _preprocess_browser(self, article, browser, url, stage, recursion_level):
        # Dispatch to preprocess_stage1 or preprocess_stage2 based on *stage*
        func = getattr(self, 'preprocess_stage%d' % stage)
        return func(article, browser, url, recursion_level)

    def _postprocess_html(self, article, feed_num, art_num, feed_len, root, url, recursion_level):
        # Per-page HTML post-processing: strip styles/scripts, inject recipe
        # CSS and the navigation bar, then hand off to the recipe's own
        # postprocess_html().
        from lxml.html.builder import STYLE
        if self.no_stylesheets:
            for link in root.xpath('//link[@href]'):
                # NOTE(review): this condition is always true (the
                # `or 'text/css'` fallback makes it truthy for every link);
                # it looks like an `== 'text/css'` comparison may have been
                # lost in this copy -- confirm against upstream.
                if (link.get('type', '') or 'text/css'):
                    link.getparent().remove(link)
            for style in root.xpath('//style'):
                style.getparent().remove(style)

        # Add recipe specific styling
        head = root.xpath('//head|//body')
        head = head[0] if head else next(root.iterdescendants())
        head.append(STYLE(self.template_css + '\n\n' + (self.extra_css or '') + '\n'))

        # Add the top navbar
        if recursion_level == 0:
            body = root.xpath('//body')
            if body:
                templ = self.navbar.generate(
                    False, feed_num, art_num, feed_len, not self.has_single_feed, url,
                    __appname__, center=self.center_navbar,
                    extra_css=self.extra_css)
                body[0].insert(0, templ.root.xpath('//div')[0])

        # Remove javascript
        remove_attrs = set(self.remove_attributes)
        if self.remove_javascript:
            remove_attrs.add('onload')
            for script in root.xpath('//*[name()="script" or name()="noscript"]'):
                script.getparent().remove(script)

        # Remove specified attributes
        for attr in remove_attrs:
            for tag in root.xpath('//*[@%s]' % attr):
                tag.attrib.pop(attr, None)

        # Remove tags that cause problems on ebook devices
        nuke = ['base', 'iframe', 'canvas', 'embed', 'command', 'datalist', 'video', 'audio', 'form']
        for tag in root.xpath('|'.join('//%s' % tag for tag in nuke)):
            tag.getparent().remove(tag)

        root = self.postprocess_html(article, root, url, recursion_level)

        if root is not None:
            # Nuke HTML5 tags (downconvert to <div> for device compatibility)
            tags = ['article', 'aside', 'header', 'footer', 'nav', 'figcaption', 'figure', 'section']
            for tag in root.xpath('|'.join('//%s' % tag for tag in tags)):
                tag.tag = 'div'
            self.populate_article_metadata(article, root, recursion_level == 0)
        return root

    def download(self):
        # Top-level entry point: log in if needed, fetch publication data
        # (cover, masthead, index), then download all articles.
        browser = self.jsbrowser = self.get_jsbrowser()
        with browser:
            try:
                if self.needs_subscription and self.username and self.password:
                    self.do_login(browser, self.username, self.password)
                data = self.get_publication_data(browser)

                # Process cover, if any
                cdata = data.get('cover', None)
                if cdata:
                    self.cover_url = image_data_to_url(cdata)
                self.download_cover()

                # Process masthead, if any
                mdata = data.get('masthead', None)
                if mdata:
                    self.masthead_url = image_data_to_url(mdata)
                self.resolve_masthead()

                # Process the list of sections/articles
                return self.build_index(data, browser)
            finally:
                self.cleanup()

    def build_index(self, data, browser):
        # Turn the recipe-provided index into Feed objects, download every
        # article (with per-feed/per-article directories), and write the
        # index/feed HTML and OPF. Returns the path to the top-level index.
        sections = data.get('index', None)
        if not sections:
            raise ValueError('No articles found, aborting')
        feeds = feeds_from_index(sections, oldest_article=self.oldest_article,
                                 max_articles_per_feed=self.max_articles_per_feed,
                                 log=self.log)
        if not feeds:
            raise ValueError('No articles found, aborting')
        if self.ignore_duplicate_articles is not None:
            feeds = self.remove_duplicate_articles(feeds)
        if self.test:
            # Test mode: limit the number of feeds downloaded
            feeds = feeds[:self.test[0]]
        self.has_single_feed = len(feeds) == 1
        index = os.path.join(self.output_dir, 'index.html')
        html = self.feeds2index(feeds)
        with open(index, 'wb') as fi:
            fi.write(html)

        if self.reverse_article_order:
            for feed in feeds:
                if hasattr(feed, 'reverse'):
                    feed.reverse()

        self.report_progress(0, _('Got feeds from index page'))
        # sha1(content) -> file path cache shared across all articles so
        # identical resources are downloaded only once
        resource_cache = {}

        total = 0
        for feed in feeds:
            total += min(self.max_articles_per_feed, len(feed))
        num = 0

        for f, feed in enumerate(feeds):
            feed_dir = os.path.join(self.output_dir, 'feed_%d'%f)
            if not os.path.isdir(feed_dir):
                os.makedirs(feed_dir)

            for a, article in enumerate(feed):
                if a >= self.max_articles_per_feed:
                    break
                num += 1
                art_dir = os.path.join(feed_dir, 'article_%d'%a)
                if not os.path.isdir(art_dir):
                    os.makedirs(art_dir)
                try:
                    url = self.print_version(article.url)
                except NotImplementedError:
                    url = article.url
                except:
                    self.log.exception('Failed to find print version for: '+article.url)
                    url = None
                if not url:
                    continue

                self.log.debug('Downloading article:', article.title, 'from', url)
                try:
                    pages = fetch_page(
                        url,
                        load_complete=self.load_complete,
                        links=self.select_links,
                        remove=self.remove_tags,
                        keep_only=self.keep_only_tags,
                        preprocess_browser=partial(self._preprocess_browser, article),
                        postprocess_html=partial(self._postprocess_html, article, f, a, len(feed)),
                        remove_before=self.remove_tags_before,
                        remove_after=self.remove_tags_after,
                        remove_javascript=self.remove_javascript,
                        delay=self.delay,
                        resource_cache=resource_cache, output_dir=art_dir, browser=browser)
                except AbortFetch:
                    self.log.exception('Fetching of article: %r aborted' % article.title)
                    continue
                except Exception:
                    self.log.exception('Fetching of article: %r failed' % article.title)
                    continue
                self.log.debug('Downloaded article:', article.title, 'from', article.url)
                article.orig_url = article.url
                article.url = 'article_%d/index.html'%a
                article.downloaded = True
                article.sub_pages = pages[1:]
                self.report_progress(float(num)/total,
                    _(u'Article downloaded: %s')%force_unicode(article.title))

        for f, feed in enumerate(feeds):
            html = self.feed2index(f, feeds)
            feed_dir = os.path.join(self.output_dir, 'feed_%d'%f)
            with open(os.path.join(feed_dir, 'index.html'), 'wb') as fi:
                fi.write(html)
        if self.no_stylesheets:
            for f in walk(self.output_dir):
                if f.endswith('.css'):
                    os.remove(f)
        self.create_opf(feeds)
        self.report_progress(1, _('Download finished'))
        return index

View File

@ -338,12 +338,6 @@ class BasicNewsRecipe(Recipe):
#: ignore_duplicate_articles = {'title', 'url'} #: ignore_duplicate_articles = {'title', 'url'}
ignore_duplicate_articles = None ignore_duplicate_articles = None
#: If you set this True, then calibre will use javascript to login to the
#: website. This is needed for some websites that require the use of
#: javascript to login. If you set this to True you must implement the
#: :meth:`javascript_login` method, to do the actual logging in.
use_javascript_to_login = False
# The following parameters control how the recipe attempts to minimize # The following parameters control how the recipe attempts to minimize
# jpeg image sizes # jpeg image sizes
@ -483,48 +477,10 @@ class BasicNewsRecipe(Recipe):
return br return br
''' '''
if self.use_javascript_to_login:
if getattr(self, 'browser', None) is not None:
return self.clone_browser(self.browser)
from calibre.web.jsbrowser.browser import Browser
br = Browser(headless=not self.test)
with br:
self.javascript_login(br, self.username, self.password)
kwargs['user_agent'] = br.user_agent
ans = browser(*args, **kwargs)
ans.copy_cookies_from_jsbrowser(br)
return ans
else:
br = browser(*args, **kwargs) br = browser(*args, **kwargs)
br.addheaders += [('Accept', '*/*')] br.addheaders += [('Accept', '*/*')]
return br return br
def javascript_login(self, browser, username, password):
    '''
    This method is used to login to a website that uses javascript for its
    login form. After the login is complete, the cookies returned from the
    website are copied to a normal (non-javascript) browser and the
    download proceeds using those cookies.

    An example implementation::

        def javascript_login(self, browser, username, password):
            browser.visit('http://some-page-that-has-a-login')
            form = browser.select_form(nr=0)  # Select the first form on the page
            form['username'] = username
            form['password'] = password
            browser.submit(timeout=120)  # Submit the form and wait at most two minutes for loading to complete

    Note that you can also select forms with CSS2 selectors, like this::

        browser.select_form('form#login_form')
        browser.select_form('form[name="someform"]')

    '''
    raise NotImplementedError('You must implement the javascript_login()'
            ' method if you set use_javascript_to_login'
            ' to True')
def clone_browser(self, br): def clone_browser(self, br):
''' '''
Clone the browser br. Cloned browsers are used for multi-threaded Clone the browser br. Cloned browsers are used for multi-threaded

View File

@ -7,12 +7,11 @@ Builtin recipes.
import re, time, io import re, time, io
from calibre.web.feeds.news import (BasicNewsRecipe, CustomIndexRecipe, from calibre.web.feeds.news import (BasicNewsRecipe, CustomIndexRecipe,
AutomaticNewsRecipe, CalibrePeriodical) AutomaticNewsRecipe, CalibrePeriodical)
from calibre.web.feeds.jsnews import JavascriptRecipe
from calibre.ebooks.BeautifulSoup import BeautifulSoup from calibre.ebooks.BeautifulSoup import BeautifulSoup
from calibre.utils.config import JSONConfig from calibre.utils.config import JSONConfig
basic_recipes = (BasicNewsRecipe, AutomaticNewsRecipe, CustomIndexRecipe, basic_recipes = (BasicNewsRecipe, AutomaticNewsRecipe, CustomIndexRecipe,
CalibrePeriodical, JavascriptRecipe) CalibrePeriodical)
custom_recipes = JSONConfig('custom_recipes/index.json') custom_recipes = JSONConfig('custom_recipes/index.json')

View File

@ -1,267 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
import time, os, hashlib
from operator import attrgetter
from collections import defaultdict
from functools import partial
from calibre import jsbrowser
from calibre.ebooks.chardet import strip_encoding_declarations
from calibre.utils.imghdr import what
# remove_comments() {{{
# JavaScript source, evaluated inside the page, that strips every HTML
# comment node from the live DOM via a depth-first walk from `document`.
remove_comments = '''
function remove_comments(node) {
var nodes = node.childNodes, i=0, t;
while((t = nodes.item(i++))) {
switch(t.nodeType){
case Node.ELEMENT_NODE:
remove_comments(t);
break;
case Node.COMMENT_NODE:
node.removeChild(t);
i--;
}
}
}
remove_comments(document)
''' # }}}
class AbortFetch(ValueError):
    # Raised (e.g. via JavascriptRecipe.abort_article) to abandon the fetch
    # of a single article/page without failing the whole download.
    pass
def children(elem):
    '''Yield the direct child nodes of *elem*, in document order.'''
    child = elem.firstChild()
    while True:
        if child.isNull():
            return
        yield child
        child = child.nextSibling()
def apply_keep_only(browser, keep_only):
    # Keep only elements matching the CSS selectors in *keep_only*: matched
    # elements are re-parented into <body> and the other children of <body>
    # are removed.
    mf = browser.page.mainFrame()
    body = mf.findFirstElement('body')
    if body.isNull():
        browser.log.error('Document has no body, cannot apply keep_only')
        return
    keep = []
    for selector in keep_only:
        keep.extend(x for x in mf.findAllElements(selector))
    if not keep:
        browser.log.error('Failed to find any elements matching the keep_only selectors: %r' % list(keep_only))
        return
    for elem in keep:
        # Move every matched element to the end of <body>
        body.appendInside(elem)
    for elem in tuple(children(body)):
        preserve = False
        for x in keep:
            if x == elem:
                preserve = True
                break
        if preserve:
            # NOTE(review): `break` here stops scanning the remaining
            # children as soon as the first preserved element is found, so
            # siblings after it are never removed -- this looks like it
            # should be `continue`; confirm against upstream before relying
            # on this behavior (indentation was lost in this copy).
            break
        elem.removeFromDocument()
def apply_remove(browser, remove):
    '''Remove from the live DOM every element matching any CSS selector in *remove*.'''
    frame = browser.page.mainFrame()
    for selector in remove:
        for el in frame.findAllElements(selector):
            if el.isNull():
                continue
            el.removeFromDocument()
def remove_beyond(browser, selector, before=True):
    # Remove everything before (or after, if before=False) the first element
    # matching *selector*: at each level, siblings on the chosen side are
    # removed, then the walk moves up the ancestor chain until <body>, so
    # content outside the element's subtree is removed too.
    mf = browser.page.mainFrame()
    elem = mf.findFirstElement(selector)
    if elem.isNull():
        browser.log('Failed to find any element matching the selector: %s' % selector)
        return
    # Bound method getter: walks left for before=True, right otherwise
    next_sibling = attrgetter('previousSibling' if before else 'nextSibling')
    while not elem.isNull() and unicode(elem.tagName()) != 'body':
        remove = []
        after = next_sibling(elem)()
        while not after.isNull():
            # Collect first, remove afterwards: removing while walking would
            # invalidate the sibling chain
            remove.append(after)
            after = next_sibling(after)()
        for x in remove:
            x.removeFromDocument()
        elem = elem.parent()
def is_tag(elem, name):
    '''Case-insensitively check whether *elem* is a tag named *name*.'''
    tag = unicode(elem.tagName()).lower()
    return tag == name.lower()
def download_resources(browser, resource_cache, output_dir):
    # Download all images and stylesheets referenced by the current page into
    # *output_dir*, rewriting the DOM to point at the local copies.
    # *resource_cache* maps sha1(content) -> saved path, so byte-identical
    # resources are written only once and re-linked relatively elsewhere.
    img_counter = style_counter = 0
    resources = defaultdict(list)  # absolute URL -> [elements referencing it]
    for img in browser.css_select('img[src]', all=True):
        # Using javascript ensures that absolute URLs are returned, direct
        # attribute access does not do that
        src = unicode(img.evaluateJavaScript('this.src') or '').strip()
        if src:
            resources[src].append(img)
    for link in browser.css_select('link[href]', all=True):
        lt = unicode(link.attribute('type')).strip() or 'text/css'
        rel = unicode(link.attribute('rel')).strip() or 'stylesheet'
        if lt == 'text/css' and rel == 'stylesheet':
            href = unicode(link.evaluateJavaScript('this.href') or '').strip()
            if href:
                resources[href].append(link)
            else:
                link.removeFromDocument()
        else:
            # Not a stylesheet link (e.g. favicon/alternate) -- drop it
            link.removeFromDocument()
    loaded_resources = browser.wait_for_resources(resources)
    for url, raw in loaded_resources.iteritems():
        h = hashlib.sha1(raw).digest()
        if h in resource_cache:
            # Already saved for an earlier page/article: link relatively
            href = os.path.relpath(resource_cache[h], output_dir).replace(os.sep, '/')
        else:
            elem = resources[url][0]
            if is_tag(elem, 'link'):
                style_counter += 1
                href = 'style_%d.css' % style_counter
            else:
                img_counter += 1
                ext = what(None, raw) or 'jpg'
                if ext == 'jpeg':
                    ext = 'jpg'  # Apparently Moon+ cannot handle .jpeg
                href = 'img_%d.%s' % (img_counter, ext)
            dest = os.path.join(output_dir, href)
            resource_cache[h] = dest
            with open(dest, 'wb') as f:
                f.write(raw)
        for elem in resources[url]:
            # Point the DOM at the local copy
            elem.setAttribute('href' if is_tag(elem, 'link') else 'src', href)
    # Strip references to anything that failed to download
    failed = set(resources) - set(loaded_resources)
    for url in failed:
        browser.log.warn('Failed to download resource:', url)
        for elem in resources[url]:
            elem.removeFromDocument()
def save_html(browser, output_dir, postprocess_html, url, recursion_level):
    # Serialize the browser's current DOM to output_dir/index.html (UTF-8),
    # after running the recipe's postprocess_html() callback on the parsed
    # tree. Returns the path of the written file.
    import html5lib
    from calibre.utils.cleantext import clean_xml_chars
    html = strip_encoding_declarations(browser.html)
    if isinstance(html, unicode):
        html = clean_xml_chars(html)
    root = html5lib.parse(html, treebuilder='lxml', namespaceHTMLElements=False).getroot()
    root = postprocess_html(root, url, recursion_level)
    if root is None:
        # user wants this page to be aborted
        raise AbortFetch('%s was aborted during postprocess' % url)
    with open(os.path.join(output_dir, 'index.html'), 'wb') as f:
        from lxml.html import tostring
        f.write(tostring(root, include_meta_content_type=True, encoding='utf-8', pretty_print=True))
    return f.name
def links_from_selectors(selectors, recursions, browser, url, recursion_level):
    '''
    Collect the absolute hrefs of all anchors matching *selectors*, but only
    while the recursion budget (*recursions*) has not been exhausted at the
    current *recursion_level*. Returns a (possibly empty) list of URLs.
    '''
    if recursions <= recursion_level:
        return []
    found = []
    for selector in selectors:
        for anchor in browser.css_select(selector, all=True):
            # javascript access yields absolute URLs (attribute access would not)
            href = unicode(anchor.evaluateJavaScript('this.href') or '').strip()
            if href:
                found.append(href)
    return found
def clean_dom(
    browser, url, recursion_level, preprocess_browser, remove_javascript,
    keep_only, remove_after, remove_before, remove):
    # Run the full DOM cleanup pipeline, invoking the recipe's preprocess
    # callback before cleanup (stage 1) and after it (stage 2).

    # Remove comments as otherwise we can end up with nested comments, which
    # cause problems later
    browser.page.mainFrame().evaluateJavaScript(remove_comments)

    preprocess_browser(browser, url, 1, recursion_level)
    if remove_javascript:
        for elem in browser.css_select('script', all=True):
            elem.removeFromDocument()
    if keep_only:
        apply_keep_only(browser, keep_only)
    if remove_after:
        remove_beyond(browser, remove_after, before=False)
    if remove_before:
        remove_beyond(browser, remove_before, before=True)
    if remove:
        apply_remove(browser, remove)
    preprocess_browser(browser, url, 2, recursion_level)
def fetch_page(
    url=None,
    load_complete=lambda browser, url, recursion_level: True,
    links=lambda browser, url, recursion_level: (),
    keep_only=(),
    remove_after=None,
    remove_before=None,
    remove=(),
    remove_javascript=True,
    delay=0,
    preprocess_browser=lambda browser, url, stage, recursion_level:None,
    postprocess_html=lambda root, url, recursion_level: root,
    resource_cache=None,
    output_dir=None,
    browser=None,
    recursion_level=0
):
    '''
    Load *url* in a javascript-capable browser, clean the DOM, download its
    resources and save it as HTML, then recursively fetch linked pages
    (as selected by the *links* callback). Returns a tuple of saved file
    paths, the first being the page itself.

    :param load_complete: callback polled until it returns True (DOM ready)
    :param resource_cache: sha1 -> path map shared across pages so identical
        resources are saved only once; a fresh dict is created per top-level
        call when not supplied
    :raises Timeout: if the page does not finish loading in time
    :raises AbortFetch: from callbacks, to abandon this page
    '''
    # Bug fix: resource_cache previously defaulted to a shared mutable dict
    # ({} in the signature), so cached file paths from one top-level call's
    # output directory leaked into later, unrelated calls.
    if resource_cache is None:
        resource_cache = {}
    output_dir = output_dir or os.getcwdu()
    if browser is None:
        browser = jsbrowser()
    if delay:
        time.sleep(delay)

    # Load the DOM
    if url is not None:
        start_time = time.time()
        browser.start_load(url)
        while not load_complete(browser, url, recursion_level):
            browser.run_for_a_time(0.1)
            if time.time() - start_time > browser.default_timeout:
                from calibre.web.jsbrowser.browser import Timeout
                raise Timeout('Timed out while waiting for %s to load' % url)

    children = links(browser, url, recursion_level)

    # Cleanup the DOM
    clean_dom(
        browser, url, recursion_level, preprocess_browser,
        remove_javascript, keep_only, remove_after, remove_before, remove)

    # Download resources
    download_resources(browser, resource_cache, output_dir)

    # Get HTML from the DOM
    pages = [save_html(browser, output_dir, postprocess_html, url, recursion_level)]

    # Fetch the linked pages
    for i, curl in enumerate(children):
        odir = os.path.join(output_dir, 'link%d' % (i + 1))
        if not os.path.exists(odir):
            os.mkdir(odir)
        try:
            pages.extend(fetch_page(
                curl, load_complete=load_complete, links=links, keep_only=keep_only,
                remove_after=remove_after, remove_before=remove_before, remove=remove,
                preprocess_browser=preprocess_browser, postprocess_html=postprocess_html,
                resource_cache=resource_cache, output_dir=odir, browser=browser, delay=delay,
                recursion_level=recursion_level+1))
        except AbortFetch:
            # A single aborted sub-page does not fail the parent page
            continue
    return tuple(pages)
if __name__ == '__main__':
    # Ad hoc smoke test: fetch a time.com article, following its pagination
    # links one level deep.
    browser = jsbrowser()
    fetch_page('http://www.time.com/time/magazine/article/0,9171,2145057,00.html', browser=browser,
               links=partial(links_from_selectors, ('.wp-paginate a.page[href]',), 1),
               keep_only=('article.post',), remove=('.entry-sharing', '.entry-footer', '.wp-paginate', '.post-rail'))

View File

@ -1,10 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'

View File

@ -1,740 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, pprint, time, uuid, re
from cookielib import Cookie
from threading import current_thread
from PyQt5.QtWebKit import QWebSettings, QWebElement
from PyQt5.QtWebKitWidgets import QWebPage, QWebView
from PyQt5.Qt import (
QObject, QNetworkAccessManager, QNetworkDiskCache, QNetworkProxy,
QNetworkProxyFactory, QEventLoop, QUrl, pyqtSignal, QDialog, QVBoxLayout,
QSize, QNetworkCookieJar, Qt, pyqtSlot, QPixmap)
from calibre import USER_AGENT, prints, get_proxies, get_proxy_info, prepare_string_for_xml
from calibre.constants import ispy3, cache_dir
from calibre.ptempfile import PersistentTemporaryDirectory
from calibre.utils.logging import ThreadSafeLog
from calibre.gui2 import must_use_qt, app_is_headless
from calibre.web.jsbrowser.forms import FormsMixin, default_timeout
class Timeout(Exception):
    # Raised when a page load or resource wait exceeds the browser timeout.
    pass


class LoadError(Exception):
    # Presumably raised when a page fails to load -- raise sites are not
    # visible in this chunk; confirm before documenting further.
    pass


class ElementNotFound(ValueError):
    # Presumably raised when a CSS selector matches no element -- confirm.
    pass


class NotAFile(ValueError):
    # Presumably raised when a download target is not a plain file -- confirm.
    pass
class WebPage(QWebPage):  # {{{
    # QWebPage subclass intended for headless use: forwards javascript
    # dialogs to optional callbacks, spoofs the user agent, exposes a
    # DOM-ready flag via a python<->js bridge, and can grab rendered
    # images of elements.

    def __init__(self, log,
            confirm_callback=None,
            prompt_callback=None,
            user_agent=USER_AGENT,
            enable_developer_tools=False,
            parent=None):
        QWebPage.__init__(self, parent)
        self.log = log
        self.user_agent = user_agent if user_agent else USER_AGENT
        self.confirm_callback = confirm_callback
        self.prompt_callback = prompt_callback
        # Unsupported content (e.g. downloads) is forwarded to
        # on_unsupported_content(), which aborts it
        self.setForwardUnsupportedContent(True)
        self.unsupportedContent.connect(self.on_unsupported_content)
        settings = self.settings()
        if enable_developer_tools:
            settings.setAttribute(QWebSettings.DeveloperExtrasEnabled, True)
        QWebSettings.enablePersistentStorage(os.path.join(cache_dir(),
                'webkit-persistence'))
        QWebSettings.setMaximumPagesInCache(0)
        # Unique name under which this object is published to page javascript
        self.bridge_name = 'b' + uuid.uuid4().get_hex()
        self.mainFrame().javaScriptWindowObjectCleared.connect(
            self.add_window_objects)
        self.dom_loaded = False

    def add_window_objects(self):
        # (Re)install the python<->js bridge on every new page and hook
        # DOMContentLoaded so dom_loaded flips to True when the DOM is ready
        self.dom_loaded = False
        mf = self.mainFrame()
        mf.addToJavaScriptWindowObject(self.bridge_name, self)
        mf.evaluateJavaScript('document.addEventListener( "DOMContentLoaded", %s.content_loaded, false )' % self.bridge_name)

    def load_url(self, url):
        self.dom_loaded = False
        url = QUrl(url)
        self.mainFrame().load(url)
        self.ready_state  # Without this, DOMContentLoaded does not fire for file:// URLs

    @pyqtSlot()
    def content_loaded(self):
        # Slot called from page javascript when DOMContentLoaded fires
        self.dom_loaded = True

    def userAgentForUrl(self, url):
        # Same (possibly spoofed) user agent for every URL
        return self.user_agent

    def javaScriptAlert(self, frame, msg):
        if self.view() is not None:
            return QWebPage.javaScriptAlert(self, frame, msg)
        # Headless: just log the alert
        prints('JSBrowser alert():', unicode(msg))

    def javaScriptConfirm(self, frame, msg):
        if self.view() is not None:
            return QWebPage.javaScriptConfirm(self, frame, msg)
        if self.confirm_callback is not None:
            return self.confirm_callback(unicode(msg))
        # Headless with no callback: answer OK to every confirm()
        return True

    def javaScriptConsoleMessage(self, msg, lineno, source_id):
        prints('JSBrowser msg():%s:%s:'%(unicode(source_id), lineno), unicode(msg))

    def javaScriptPrompt(self, frame, msg, default_value, *args):
        if self.view() is not None:
            return QWebPage.javaScriptPrompt(self, frame, msg, default_value,
                    *args)
        if self.prompt_callback is None:
            # The return protocol differs between py2 and py3 bindings
            return (False, default_value) if ispy3 else False
        value = self.prompt_callback(unicode(msg), unicode(default_value))
        ok = value is not None
        if ispy3:
            return ok, value
        if ok:
            # py2: the answer is written into the result out-parameter
            result = args[0]
            result.clear()
            result.append(value)
        return ok

    @pyqtSlot(result=bool)
    def shouldInterruptJavaScript(self):
        if self.view() is not None:
            return QWebPage.shouldInterruptJavaScript(self)
        # Headless: always interrupt runaway scripts
        return True

    def on_unsupported_content(self, reply):
        reply.abort()
        self.log.warn('Unsupported content, ignoring: %s'%reply.url())

    @property
    def ready_state(self):
        # document.readyState as a unicode string ('' if unavailable)
        return unicode(self.mainFrame().evaluateJavaScript('document.readyState') or '')

    @pyqtSlot(QPixmap)
    def transfer_image(self, img):
        # Bridge slot: page javascript hands a rendered image back to python
        self.saved_img = img

    def get_image(self, qwe_or_selector):
        # Return a QPixmap of the given element (or of the first match of a
        # CSS selector). Raises ValueError if the selector matches nothing.
        qwe = qwe_or_selector
        if not isinstance(qwe, QWebElement):
            qwe = self.mainFrame().findFirstElement(qwe)
            if qwe.isNull():
                raise ValueError('Failed to find element with selector: %r'
                        % qwe_or_selector)
        self.saved_img = QPixmap()
        qwe.evaluateJavaScript('%s.transfer_image(this)' % self.bridge_name)
        try:
            return self.saved_img
        finally:
            del self.saved_img

    def supportsExtension(self, extension):
        # Only the error-page extension is implemented (see extension())
        if extension == QWebPage.ErrorPageExtension:
            return True
        return False

    def extension(self, extension, option, output):
        # Log more data about a failed page load
        if extension != QWebPage.ErrorPageExtension:
            return False
        domain = {QWebPage.QtNetwork:'Network', QWebPage.Http:'HTTP', QWebPage.WebKit:'WebKit'}.get(option.domain, 'Unknown')
        err = 'Error loading: %s: [%s %d: %s]' % (option.url.toString(), domain, option.error, unicode(option.errorString))
        self.log.error(err)
        return False  # If we return True then loadFinished() will also return True, which we dont want
# }}}
class ProxyFactory(QNetworkProxyFactory):  # {{{

    '''
    Qt proxy factory that mirrors the environment proxy settings (as
    returned by calibre's get_proxies()) on a per-scheme basis.
    '''

    def __init__(self, log):
        QNetworkProxyFactory.__init__(self)
        proxies = get_proxies()
        # Map of lowercased scheme -> configured QNetworkProxy
        self.proxies = {}
        for scheme, proxy_string in proxies.iteritems():
            scheme = scheme.lower()
            info = get_proxy_info(scheme, proxy_string)
            if info is None:
                continue
            hn, port = info['hostname'], info['port']
            if not hn or not port:
                # Ignore incomplete proxy specifications
                continue
            log.debug('JSBrowser using proxy:', pprint.pformat(info))
            # Anything that is not SOCKS5 is treated as an HTTP proxy.
            pt = {'socks5':QNetworkProxy.Socks5Proxy}.get(scheme,
                    QNetworkProxy.HttpProxy)
            proxy = QNetworkProxy(pt, hn, port)
            un, pw = info['username'], info['password']
            if un:
                proxy.setUser(un)
            if pw:
                proxy.setPassword(pw)
            self.proxies[scheme] = proxy
        self.default_proxy = QNetworkProxy(QNetworkProxy.DefaultProxy)

    def queryProxy(self, query):
        # Called by Qt for every request; fall back to the default proxy
        # for schemes with no configured proxy.
        scheme = unicode(query.protocolTag()).lower()
        return [self.proxies.get(scheme, self.default_proxy)]
# }}}
class NetworkAccessManager(QNetworkAccessManager):  # {{{

    '''
    NAM that logs every request/reply, supports an optional on-disk
    cache, installs a ProxyFactory and exposes the Qt cookie jar as
    python Cookie objects (see py_cookies()).
    '''

    OPERATION_NAMES = {getattr(QNetworkAccessManager, '%sOperation'%x) :
            x.upper() for x in ('Head', 'Get', 'Put', 'Post', 'Delete',
                'Custom')
    }
    report_reply_signal = pyqtSignal(object)

    def __init__(self, log, disk_cache_size=50, parent=None):
        QNetworkAccessManager.__init__(self, parent)
        self.reply_count = 0
        self.log = log
        if disk_cache_size > 0:
            # The cache lives in a temp dir since it cannot be shared
            # between instances; the dir is deleted on program exit.
            self.cache = QNetworkDiskCache(self)
            self.cache.setCacheDirectory(PersistentTemporaryDirectory(prefix='disk_cache_'))
            self.cache.setMaximumCacheSize(int(disk_cache_size * 1024 * 1024))
            self.setCache(self.cache)
        self.sslErrors.connect(self.on_ssl_errors)
        self.pf = ProxyFactory(log)
        self.setProxyFactory(self.pf)
        self.finished.connect(self.on_finished)
        self.cookie_jar = QNetworkCookieJar()
        self.setCookieJar(self.cookie_jar)
        self.main_thread = current_thread()
        # Queued connection so that replies finishing in a Qt thread are
        # reported from the main thread, see on_finished() below.
        self.report_reply_signal.connect(self.report_reply, type=Qt.QueuedConnection)

    def on_ssl_errors(self, reply, errors):
        # All SSL errors are ignored (no certificate validation).
        reply.ignoreSslErrors()

    def createRequest(self, operation, request, data):
        # FIX: ``QUrl.None`` cannot be written literally — None is a
        # keyword under python 3, making the module unimportable there.
        # getattr() fetches the same enum value with identical behavior.
        url = unicode(request.url().toString(getattr(QUrl, 'None')))
        operation_name = self.OPERATION_NAMES[operation]
        debug = []
        debug.append(('Request: %s %s' % (operation_name, url)))
        for h in request.rawHeaderList():
            try:
                d = ' %s: %s' % (h, request.rawHeader(h))
            except:
                d = ' %r: %r' % (h, request.rawHeader(h))
            debug.append(d)
        if data is not None:
            # Peek (do not consume) the first KB of the request body for
            # the debug log.
            raw = data.peek(1024)
            try:
                raw = raw.decode('utf-8')
            except:
                raw = repr(raw)
            debug.append(' Request data: %s'%raw)
        self.log.debug('\n'.join(debug))
        return QNetworkAccessManager.createRequest(self, operation, request,
                data)

    def on_finished(self, reply):
        if current_thread() is not self.main_thread:
            # This method was called in a thread created by Qt. The python
            # interpreter may not be in a safe state, so dont do anything
            # more. This signal is queued which means the reply wont be
            # reported unless someone spins the event loop. So far, I have only
            # seen this happen when doing Ctrl+C in the console.
            self.report_reply_signal.emit(reply)
        else:
            self.report_reply(reply)

    def report_reply(self, reply):
        # See createRequest() for why getattr() is used here.
        reply_url = unicode(reply.url().toString(getattr(QUrl, 'None')))
        self.reply_count += 1
        err = reply.error()
        if err:
            # Deliberate cancellations are only debug-level noise.
            l = self.log.debug if err == reply.OperationCanceledError else self.log.warn
            l("Reply error: %s - %d (%s)" % (reply_url, err, unicode(reply.errorString())))
        else:
            debug = []
            debug.append("Reply successful: %s" % reply_url)
            for h in reply.rawHeaderList():
                try:
                    d = ' %s: %s' % (h, reply.rawHeader(h))
                except:
                    d = ' %r: %r' % (h, reply.rawHeader(h))
                debug.append(d)
            self.log.debug('\n'.join(debug))

    def py_cookies(self):
        # Convert every Qt cookie in the jar into a python cookielib
        # Cookie (expired cookies included).
        for c in self.cookie_jar.allCookies():
            name, value = map(bytes, (c.name(), c.value()))
            domain = bytes(c.domain())
            initial_dot = domain_specified = domain.startswith(b'.')
            secure = bool(c.isSecure())
            path = unicode(c.path()).strip().encode('utf-8')
            expires = c.expirationDate()
            is_session_cookie = False
            if expires.isValid():
                expires = expires.toTime_t()
            else:
                # No valid expiry date == session cookie
                expires = None
                is_session_cookie = True
            path_specified = True
            if not path:
                path = b'/'
                path_specified = False
            c = Cookie(0,  # version
                    name, value,
                    None,  # port
                    False,  # port specified
                    domain, domain_specified, initial_dot, path,
                    path_specified,
                    secure, expires, is_session_cookie,
                    None,  # Comment
                    None,  # Comment URL
                    {}  # rest
            )
            yield c
# }}}
class LoadWatcher(QObject):  # {{{

    '''
    One-shot helper that tracks a single page load: it connects itself
    to the page's loadFinished signal and records the outcome.
    '''

    def __init__(self, page, parent=None):
        QObject.__init__(self, parent)
        self.page = page
        self.loaded_ok = None
        self.is_loading = True
        page.loadFinished.connect(self)

    def __call__(self, ok):
        # Record the result, then detach so we fire exactly once and do
        # not keep the page alive.
        self.is_loading = False
        self.loaded_ok = ok
        self.page.loadFinished.disconnect(self)
        self.page = None
# }}}
class BrowserView(QDialog):  # {{{

    '''
    Simple dialog that shows a WebPage in a QWebView; used by
    Browser.show_browser() for debugging.
    '''

    def __init__(self, page, parent=None):
        QDialog.__init__(self, parent)
        self.l = l = QVBoxLayout(self)
        self.setLayout(l)
        self.webview = QWebView(self)
        l.addWidget(self.webview)
        self.resize(QSize(1024, 768))
        self.webview.setPage(page)
# }}}
class Browser(QObject, FormsMixin):

    '''
    Browser (WebKit with no GUI).

    This browser is NOT thread safe. Use it in a single thread only! If you
    need to run downloads in parallel threads, use multiple browsers (after
    copying the cookies).
    '''

    def __init__(self,
            # Logging. If None, uses a default log, which does not output
            # debugging info
            log=None,
            # Receives a string and returns True/False. By default, returns
            # True for all strings
            confirm_callback=None,

            # Prompt callback. Receives a msg string and a default value
            # string. Should return the user input value or None if the user
            # canceled the prompt. By default returns None.
            prompt_callback=None,

            # User agent to be used
            user_agent=USER_AGENT,

            # The size (in MB) of the on disk cache. Note that because the disk
            # cache cannot be shared between different instances, we currently
            # use a temporary dir for the cache, which is deleted on
            # program exit. Set to zero to disable cache.
            disk_cache_size=50,

            # Enable Inspect element functionality
            enable_developer_tools=False,

            # Verbosity
            verbosity=0,

            # The default timeout (in seconds)
            default_timeout=30,

            # If True, do not connect to the X server on linux
            headless=True
        ):
        must_use_qt(headless=headless)
        QObject.__init__(self)
        FormsMixin.__init__(self)

        if log is None:
            log = ThreadSafeLog()
        if verbosity:
            log.filter_level = log.DEBUG
        self.log = log
        self.default_timeout = default_timeout

        self.page = WebPage(log, confirm_callback=confirm_callback,
                prompt_callback=prompt_callback, user_agent=user_agent,
                enable_developer_tools=enable_developer_tools,
                parent=self)
        self.nam = NetworkAccessManager(log, disk_cache_size=disk_cache_size, parent=self)
        self.page.setNetworkAccessManager(self.nam)

    @property
    def user_agent(self):
        return self.page.user_agent

    def _wait_for_load(self, timeout, url=None):
        # Spin the event loop until loadFinished() fires or the timeout
        # elapses. Note: the module level default_timeout sentinel is
        # meant here, not the __init__ parameter.
        timeout = self.default_timeout if timeout is default_timeout else timeout
        loop = QEventLoop(self)
        start_time = time.time()
        end_time = start_time + timeout
        lw = LoadWatcher(self.page, parent=self)
        while lw.is_loading and end_time > time.time():
            if not loop.processEvents():
                time.sleep(0.01)
        if lw.is_loading:
            raise Timeout('Loading of %r took longer than %d seconds'%(
                url, timeout))

        return lw.loaded_ok

    def _wait_for_replies(self, reply_count, timeout):
        # Block until the NAM has seen at least reply_count replies.
        final_time = time.time() + (self.default_timeout if timeout is default_timeout else timeout)
        loop = QEventLoop(self)
        while (time.time() < final_time and self.nam.reply_count <
                reply_count):
            loop.processEvents()
            time.sleep(0.1)
        if self.nam.reply_count < reply_count:
            raise Timeout('Waiting for replies took longer than %d seconds' %
                    timeout)

    def run_for_a_time(self, timeout):
        # Spin the event loop for (at least) timeout seconds.
        final_time = time.time() + timeout
        loop = QEventLoop(self)
        while (time.time() < final_time):
            if not loop.processEvents():
                time.sleep(0.1)

    def wait_for_element(self, selector, timeout=default_timeout):
        # Poll the DOM until an element matching selector appears.
        timeout = self.default_timeout if timeout is default_timeout else timeout
        start_time = time.time()
        while self.css_select(selector) is None:
            self.run_for_a_time(0.1)
            if time.time() - start_time > timeout:
                raise Timeout('DOM failed to load in %.1g seconds' % timeout)
        return self.css_select(selector)

    def visit(self, url, timeout=default_timeout):
        '''
        Open the page specified in URL and wait for it to complete loading.
        Note that when this method returns, there may still be javascript
        that needs to execute (this method returns when the loadFinished()
        signal is called on QWebPage). This method will raise a Timeout
        exception if loading takes more than timeout seconds.

        Returns True if loading was successful, False otherwise.
        '''
        self.current_form = None
        self.page.load_url(url)
        return self._wait_for_load(timeout, url)

    def back(self, wait_for_load=True, timeout=default_timeout):
        '''
        Like clicking the back button in the browser. Waits for loading to complete.
        This method will raise a Timeout exception if loading takes more than timeout seconds.

        Returns True if loading was successful, False otherwise.
        '''
        self.page.triggerAction(self.page.Back)
        if wait_for_load:
            return self._wait_for_load(timeout)

    def stop(self):
        'Stop loading of current page'
        self.page.triggerAction(self.page.Stop)

    def stop_scheduled_refresh(self):
        'Stop any scheduled page refresh/reloads'
        self.page.triggerAction(self.page.StopScheduledPageRefresh)

    def reload(self, bypass_cache=False):
        action = self.page.ReloadAndBypassCache if bypass_cache else self.page.Reload
        self.page.triggerAction(action)

    @property
    def dom_ready(self):
        return self.page.dom_loaded

    def wait_till_dom_ready(self, timeout=default_timeout, url=None):
        timeout = self.default_timeout if timeout is default_timeout else timeout
        start_time = time.time()
        while not self.dom_ready:
            if time.time() - start_time > timeout:
                raise Timeout('Loading of %r took longer than %d seconds'%(
                    url, timeout))
            self.run_for_a_time(0.1)

    def start_load(self, url, timeout=default_timeout, selector=None):
        '''
        Start the loading of the page at url and return once the DOM is ready,
        sub-resources such as scripts/stylesheets/images/etc. may not have all
        loaded.
        '''
        self.current_form = None
        self.page.load_url(url)
        if selector is not None:
            self.wait_for_element(selector, timeout=timeout)
        else:
            self.wait_till_dom_ready(timeout=timeout, url=url)

    def click(self, qwe_or_selector, wait_for_load=True, ajax_replies=0, timeout=default_timeout):
        '''
        Click the :class:`QWebElement` pointed to by qwe_or_selector.

        :param wait_for_load: If you know that the click is going to cause a
                              new page to be loaded, set this to True to have
                              the method block until the new page is loaded
        :param ajax_replies: Number of replies to wait for after clicking a link
                             that triggers some AJAX interaction
        '''
        initial_count = self.nam.reply_count
        qwe = qwe_or_selector
        if not isinstance(qwe, QWebElement):
            qwe = self.css_select(qwe)
            if qwe is None:
                raise ElementNotFound('Failed to find element with selector: %r'
                        % qwe_or_selector)
        # Synthesize a mouse click on the element via javascript
        js = '''
            var e = document.createEvent('MouseEvents');
            e.initEvent( 'click', true, true );
            this.dispatchEvent(e);
        '''
        qwe.evaluateJavaScript(js)
        if ajax_replies > 0:
            reply_count = initial_count + ajax_replies
            self._wait_for_replies(reply_count, timeout)
        elif wait_for_load and not self._wait_for_load(timeout):
            raise LoadError('Clicking resulted in a failed load')

    def click_text_link(self, text_or_regex, selector='a[href]',
            wait_for_load=True, ajax_replies=0, timeout=default_timeout):
        target = None
        for qwe in self.page.mainFrame().findAllElements(selector):
            src = unicode(qwe.toPlainText())
            # FIX: previously a non-matching compiled regex fell through
            # to the string comparison below and crashed with
            # AttributeError on text_or_regex.lower(). Treat the regex
            # and plain-string cases as mutually exclusive.
            if hasattr(text_or_regex, 'match'):
                if text_or_regex.search(src):
                    target = qwe
                    break
            elif src.lower() == text_or_regex.lower():
                target = qwe
                break
        if target is None:
            raise ElementNotFound('No element matching %r with text %s found'%(
                selector, text_or_regex))
        return self.click(target, wait_for_load=wait_for_load,
                ajax_replies=ajax_replies, timeout=timeout)

    def css_select(self, selector, all=False):
        # Return the first element matching selector (or all of them as a
        # tuple when all=True); None/empty tuple when nothing matches.
        if all:
            return tuple(self.page.mainFrame().findAllElements(selector).toList())
        ans = self.page.mainFrame().findFirstElement(selector)
        if ans.isNull():
            ans = None
        return ans

    def get_image(self, qwe_or_selector):
        '''
        Return the image identified by qwe_or_selector as a QPixmap. If no such
        image exists, the returned pixmap will be null.
        '''
        return self.page.get_image(qwe_or_selector)

    def get_cached(self, url):
        # Return the raw bytes for url from the disk cache, or None.
        iod = self.nam.cache.data(QUrl(url))
        if iod is not None:
            try:
                return bytes(bytearray(iod.readAll()))
            finally:
                # Ensure the IODevice is closed right away, so that the
                # underlying file can be deleted if the space is needed,
                # otherwise on windows the file stays locked
                iod.close()
                del iod

    def wait_for_resources(self, urls, timeout=default_timeout):
        # Wait until all urls are present in the cache (or the page load
        # completes / timeout elapses). Returns {url: raw bytes}.
        timeout = self.default_timeout if timeout is default_timeout else timeout
        start_time = time.time()
        ans = {}
        urls = set(urls)

        def get_resources():
            for url in tuple(urls):
                raw = self.get_cached(url)
                if raw is not None:
                    ans[url] = raw
                    urls.discard(url)

        while urls and time.time() - start_time < timeout and not self.load_completed:
            get_resources()
            if urls:
                self.run_for_a_time(0.1)

        if urls:
            # One last look in the cache after the loop exits
            get_resources()
        return ans

    @property
    def load_completed(self):
        return self.page.ready_state in {'complete', 'completed'}

    def get_resource(self, url, rtype='img', use_cache=True, timeout=default_timeout):
        '''
        Download a resource (image/stylesheet/script). The resource is
        downloaded by visiting an simple HTML page that contains only that
        resource. The resource is then returned from the cache (therefore, to
        use this method you must not disable the cache). If use_cache is True
        then the cache is queried before loading the resource. This can result
        in a stale object if the resource has changed on the server, however,
        it is a big performance boost in the common case, by avoiding a
        roundtrip to the server. The resource is returned as a bytestring or None
        if it could not be loaded.
        '''
        if not hasattr(self.nam, 'cache'):
            raise RuntimeError('Cannot get resources when the cache is disabled')
        if use_cache:
            ans = self.get_cached(url)
            if ans is not None:
                return ans
        try:
            tag = {
                'img': '<img src="%s">',
                'link': '<link href="%s"></link>',
                'script': '<script src="%s"></script>',
            }[rtype] % prepare_string_for_xml(url, attribute=True)
        except KeyError:
            raise ValueError('Unknown resource type: %s' % rtype)

        self.page.mainFrame().setHtml(
            '''<!DOCTYPE html><html><body><div>{0}</div></body></html>'''.format(tag))
        self._wait_for_load(timeout)
        ans = self.get_cached(url)
        if ans is not None:
            return ans

    def download_file(self, url_or_selector_or_qwe, timeout=60):
        '''
        Download unsupported content: i.e. files the browser cannot handle
        itself or files marked for saving as files by the website. Useful if
        you want to download something like an epub file after authentication.

        You can pass in either the url to the file to be downloaded, or a
        selector that points to an element to be clicked on the current page
        which will cause the file to be downloaded.
        '''
        ans = [False, None, []]
        loop = QEventLoop(self)
        start_time = time.time()
        end_time = start_time + timeout
        # Take over unsupported-content handling for the duration of the
        # download; restored in the finally block below.
        self.page.unsupportedContent.disconnect(self.page.on_unsupported_content)
        try:
            def download(reply):
                if ans[0]:
                    reply.abort()  # We only handle the first unsupported download
                    return
                ans[0] = True
                while not reply.isFinished() and end_time > time.time():
                    if not loop.processEvents():
                        time.sleep(0.01)
                    raw = bytes(bytearray(reply.readAll()))
                    if raw:
                        ans[-1].append(raw)
                if not reply.isFinished():
                    ans[1] = Timeout('Loading of %r took longer than %d seconds'%(url_or_selector_or_qwe, timeout))
                ans[-1].append(bytes(bytearray(reply.readAll())))
            self.page.unsupportedContent.connect(download)
            if hasattr(url_or_selector_or_qwe, 'rstrip') and re.match('[a-z]+://', url_or_selector_or_qwe) is not None:
                # We have a URL
                self.page.mainFrame().load(QUrl(url_or_selector_or_qwe))
            else:
                self.click(url_or_selector_or_qwe, wait_for_load=False)
            lw = LoadWatcher(self.page)
            while not ans[0] and lw.is_loading and end_time > time.time():
                if not loop.processEvents():
                    time.sleep(0.01)
            if not ans[0]:
                raise NotAFile('%r does not point to a downloadable file. You can only'
                        ' use this method to download files that the browser cannot handle'
                        ' natively. Or files that are marked with the '
                        ' content-disposition: attachment header' % url_or_selector_or_qwe)
            if ans[1] is not None:
                raise ans[1]
            return b''.join(ans[-1])
        finally:
            self.page.unsupportedContent.disconnect()
            self.page.unsupportedContent.connect(self.page.on_unsupported_content)

    def show_browser(self):
        '''
        Show the currently loaded web page in a window. Useful for debugging.
        '''
        if app_is_headless():
            raise RuntimeError('Cannot show browser when running in a headless Qt application')
        view = BrowserView(self.page)
        view.exec_()

    @property
    def cookies(self):
        '''
        Return all the cookies set currently as :class:`Cookie` objects.
        Returns expired cookies as well.
        '''
        return list(self.nam.py_cookies())

    @property
    def html(self):
        return unicode(self.page.mainFrame().toHtml())

    def blank(self):
        try:
            self.visit('about:blank', timeout=0.01)
        except Timeout:
            pass

    def close(self):
        self.stop()
        self.blank()
        self.stop()
        # Swap in a throwaway cache so the temp cache dir can be deleted
        self.nam.setCache(QNetworkDiskCache())
        self.nam.cache = None
        self.nam = self.page = None

    def __enter__(self):
        # FIX: previously returned None, so ``with Browser() as br`` bound
        # br to None. Returning self is backward compatible.
        return self

    def __exit__(self, *args):
        self.close()

View File

@ -1,261 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
from future_builtins import map
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from itertools import chain
from calibre import as_unicode
default_timeout = object()
# Forms {{{
class Control(object):

    '''
    Wrapper around a single <input> (or <button>) QWebElement, exposing
    its value as a python-typed property.
    '''

    def __init__(self, qwe):
        self.qwe = qwe
        # Prefer the name attribute, falling back to id
        self.name = unicode(qwe.attribute('name')) or unicode(qwe.attribute('id'))
        self.type = unicode(qwe.attribute('type'))

    def __repr__(self):
        return unicode(self.qwe.toOuterXml())

    @dynamic_property
    def value(self):
        # checkbox/radio -> bool, number/range -> int, everything else
        # (including unknown types) -> unicode text.
        def fget(self):
            if self.type in ('checkbox', 'radio'):
                return unicode(self.qwe.attribute('checked')) == 'checked'
            if self.type in ('text', 'password', 'hidden', 'email', 'search'):
                return unicode(self.qwe.attribute('value'))
            if self.type in ('number', 'range'):
                return int(unicode(self.qwe.attribute('value')))
            # Unknown type just treat as text
            return unicode(self.qwe.attribute('value'))

        def fset(self, val):
            if self.type in ('checkbox', 'radio'):
                if val:
                    self.qwe.setAttribute('checked', 'checked')
                else:
                    self.qwe.removeAttribute('checked')
            elif self.type in ('text', 'password', 'hidden', 'email', 'search'):
                self.qwe.setAttribute('value', as_unicode(val))
            elif self.type in ('number', 'range'):
                self.qwe.setAttribute('value', '%d'%int(val))
            else:  # Unknown type treat as text
                self.qwe.setAttribute('value', as_unicode(val))

        return property(fget=fget, fset=fset)
class RadioControl(object):

    '''
    A group of radio buttons sharing one name. The value property is the
    value attribute of the currently checked button (or None).
    '''

    ATTR = 'checked'

    def __init__(self, name, controls):
        self.name = name
        self.type = 'radio'
        # Map of value attribute -> QWebElement for each button
        self.values = {unicode(c.attribute('value')):c for c in controls}

    def __repr__(self):
        return '%s(%s)'%(self.__class__.__name__, ', '.join(self.values))

    @dynamic_property
    def value(self):
        def fget(self):
            for val, x in self.values.iteritems():
                if unicode(x.attribute(self.ATTR)) == self.ATTR:
                    return val

        def fset(self, val):
            # Check the button matching val (if any) and uncheck the rest
            control = None
            for value, x in self.values.iteritems():
                if val == value:
                    control = x
                    break
            if control is not None:
                for x in self.values.itervalues():
                    x.removeAttribute(self.ATTR)
                control.setAttribute(self.ATTR, self.ATTR)

        return property(fget=fget, fset=fset)
class SelectControl(RadioControl):

    # A <select> element behaves like a radio group, except the marker
    # attribute is 'selected' and the choices are its <option> children.
    ATTR = 'selected'

    def __init__(self, qwe):
        self.qwe = qwe
        self.name = unicode(qwe.attribute('name'))
        self.type = 'select'
        self.values = {unicode(c.attribute('value')):c for c in
                qwe.findAll('option')}
class Form(object):

    '''
    Provides dictionary like access to all the controls in a form.
    For example::
        form['username'] = 'some name'
        form['password'] = 'password'

    See also the :attr:`controls` property and the :meth:`submit_control` method.
    '''

    def __init__(self, qwe):
        self.qwe = qwe
        self.attributes = {unicode(x):unicode(qwe.attribute(x)) for x in
                qwe.attributeNames()}
        self.input_controls = list(map(Control, qwe.findAll('input')))
        # Radio buttons are grouped by name into RadioControl objects and
        # removed from the flat input control list.
        rc = [y for y in self.input_controls if y.type == 'radio']
        self.input_controls = [ic for ic in self.input_controls if ic.type != 'radio']
        rc_names = {x.name for x in rc}
        self.radio_controls = {name:RadioControl(name, [z.qwe for z in rc if z.name == name]) for name in rc_names}
        selects = list(map(SelectControl, qwe.findAll('select')))
        self.select_controls = {x.name:x for x in selects}
        self.button_controls = list(map(Control, qwe.findAll('button')))

    @property
    def controls(self):
        # Yield the names of all named controls in this form
        for x in self.input_controls:
            if x.name:
                yield x.name
        for x in (self.radio_controls, self.select_controls):
            for n in x.iterkeys():
                if n:
                    yield n

    def control_object(self, name):
        # Return the Control/RadioControl/SelectControl with this name
        for x in self.input_controls:
            if name == x.name:
                return x
        for x in (self.radio_controls, self.select_controls):
            try:
                return x[name]
            except KeyError:
                continue
        raise KeyError('No control with the name %s in this form'%name)

    def __getitem__(self, key):
        for x in self.input_controls:
            if key == x.name:
                return x.value
        for x in (self.radio_controls, self.select_controls):
            try:
                return x[key].value
            except KeyError:
                continue
        raise KeyError('No control with the name %s in this form'%key)

    def __setitem__(self, key, val):
        control = None
        for x in self.input_controls:
            if key == x.name:
                control = x
                break
        if control is None:
            for x in (self.radio_controls, self.select_controls):
                control = x.get(key, None)
                if control is not None:
                    break
        if control is None:
            raise KeyError('No control with the name %s in this form'%key)
        control.value = val

    def __repr__(self):
        attrs = ['%s=%s'%(k, v) for k, v in self.attributes.iteritems()]
        return '<form %s>'%(' '.join(attrs))

    def submit_control(self, submit_control_selector=None):
        # Find the control to click to submit this form: an explicit
        # selector, else the first submit control, else the first image
        # input. May return a QWebElement (selector case) or a Control;
        # callers handle both via getattr(sc, 'qwe', sc). Returns None
        # implicitly when nothing suitable is found.
        if submit_control_selector is not None:
            sc = self.qwe.findFirst(submit_control_selector)
            if not sc.isNull():
                return sc
        for c in chain(self.input_controls, self.button_controls):
            if c.type == 'submit':
                return c
        for c in self.input_controls:
            if c.type == 'image':
                return c
# }}}
class FormsMixin(object):

    '''
    Form selection/submission API mixed into Browser. Requires the host
    class to provide self.page and self.click().
    '''

    def __init__(self):
        # The form currently selected via select_form(), if any
        self.current_form = None

    def find_form(self, css2_selector=None, nr=None, qwebelement=None):
        # Locate a form by CSS selector, index or explicit QWebElement.
        # Returns None implicitly if nothing matches.
        mf = self.page.mainFrame()
        if qwebelement is not None:
            return Form(qwebelement)
        if css2_selector is not None:
            candidate = mf.findFirstElement(css2_selector)
            if not candidate.isNull():
                return Form(candidate)
        if nr is not None and int(nr) > -1:
            nr = int(nr)
            forms = mf.findAllElements('form')
            if nr < forms.count():
                return Form(forms.at(nr))

    def all_forms(self):
        '''
        Return all forms present in the current page.
        '''
        mf = self.page.mainFrame()
        return list(map(Form, mf.findAllElements('form').toList()))

    def select_form(self, css2_selector=None, nr=None, qwebelement=None):
        '''
        Select a form for further processing. Specify the form either with
        css2_selector or nr. Raises ValueError if no matching form is found.

        :param css2_selector: A CSS2 selector, for example:
                              'form[action="/accounts/login"]' or 'form[id="loginForm"]'

        :param nr: An integer >= 0. Selects the nr'th form in the current page.

        :param qwebelement: A QWebElement, useful is CSS selectors are insufficient
        '''
        self.current_form = self.find_form(css2_selector=css2_selector, nr=nr, qwebelement=qwebelement)
        if self.current_form is None:
            raise ValueError('No such form found')
        return self.current_form

    def submit(self, submit_control_selector=None, wait_for_load=True,
            ajax_replies=0, timeout=default_timeout):
        '''
        Submit the currently selected form. Tries to autodetect the submit
        control. You can override auto-detection by specifying a CSS2 selector
        as submit_control_selector. For the rest of the parameters, see the
        documentation of the click() method.
        '''
        if self.current_form is None:
            raise ValueError('No form selected, use select_form() first')
        sc = self.current_form.submit_control(submit_control_selector)
        if sc is None:
            raise ValueError('No submit control found in the current form')
        self.current_form = None
        # sc may be a Control (use its qwe) or a bare QWebElement
        self.click(getattr(sc, 'qwe', sc), wait_for_load=wait_for_load,
                ajax_replies=ajax_replies, timeout=timeout)

    def ajax_submit(self, submit_control_selector=None,
            num_of_replies=1, timeout=default_timeout):
        '''
        Submit the current form. This method is meant for those forms that
        use AJAX rather than a plain submit. It will block until the specified
        number of responses are returned from the server after the submit
        button is clicked.
        '''
        self.submit(submit_control_selector=submit_control_selector,
                wait_for_load=False, ajax_replies=num_of_replies,
                timeout=timeout)

View File

@ -1,56 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from calibre import USER_AGENT
from calibre.web.jsbrowser.browser import Browser
def do_login(login_url, calibre_browser, form_selector, controls={},
        num_of_replies=0, timeout=60.0, verbosity=0, pause_time=5,
        post_visit_callback=None, post_submit_callback=None,
        submit_control_selector=None):
    '''
    Log in to a website with the JS browser and copy the resulting session
    cookies into calibre_browser (a mechanize browser). Returns the HTML of
    the page reached after submitting the login form.

    NOTE: ``controls`` uses a mutable default argument; it is only read,
    never mutated, so this is safe — do not mutate it.
    '''
    # Re-use the user agent the mechanize browser was configured with
    ua = USER_AGENT
    for key, val in calibre_browser.addheaders:
        if key.lower() == 'user-agent':
            ua = val
            break
    br = Browser(user_agent=ua, verbosity=verbosity)
    if not br.visit(login_url, timeout=timeout):
        raise ValueError('Failed to load the login URL: %r'%login_url)
    if callable(post_visit_callback):
        post_visit_callback(br)

    f = br.select_form(form_selector)
    for key, val in controls.iteritems():
        f[key] = val
    # br.show_browser()
    if num_of_replies > 0:
        br.ajax_submit(num_of_replies=num_of_replies, timeout=timeout,
                submit_control_selector=submit_control_selector)
    else:
        br.submit(timeout=timeout,
                submit_control_selector=submit_control_selector)
    # Give any javascript some time to run
    br.run_for_a_time(pause_time)
    if callable(post_submit_callback):
        post_submit_callback(br)
    # FIX: removed a stray unconditional br.show_browser() debugging call
    # here (a commented-out copy exists above); it popped up a GUI window
    # on every automated login and broke headless use.
    cj = calibre_browser.cookiejar
    for cookie in br.cookies:
        cj.set_cookie(cookie)
    html = br.html
    br.close()
    return html

View File

@ -1,240 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import unittest, pprint, threading, time
import cherrypy
from calibre import browser
from calibre.web.jsbrowser.browser import Browser
from calibre.library.server.utils import (cookie_max_age_to_expires,
cookie_time_fmt)
class Server(object):

    '''
    cherrypy application serving the fixtures for the jsbrowser tests:
    a page with plain/image/AJAX forms, a jquery copy, and cookie
    set/echo endpoints.
    '''

    def __init__(self):
        # Last form submission received by controls_test()
        self.form_data = {}

    @cherrypy.expose
    def index(self):
        return '''
    <html>
    <head><title>JS Browser test</title></head>
    <script type="text/javascript" src="jquery"></script>
    <script type="text/javascript">
    $(document).ready(function() {
        $('#ajax_test').submit(function() {
            var val = $('#ajax_test input[name="text"]').val();
            $.ajax({
                dataType: "html",
                url: "/controls_test",
                data: {"text":val},
                success: function(data) {
                    $('#ajax_test input[name="text"]').val(data);
                }
            });
            return false;
        });
    });
    </script>
    <body>
        <form id="controls_test" method="post" action="controls_test">
            <h3>Test controls</h3>
            <div><label>Simple Text:</label><input type="text" name="text"/></div>
            <div><label>Password:</label><input type="password" name="password"/></div>
            <div><label>Checked Checkbox:</label><input type="checkbox" checked="checked" name="checked_checkbox"/></div>
            <div><label>UnChecked Checkbox:</label><input type="checkbox" name="unchecked_checkbox"/></div>
            <div><input type="radio" name="sex" value="male" checked="checked" /> Male</div>
            <div><input type="radio" name="sex" value="female" /> Female</div>
            <div><label>Color:</label><select name="color"><option value="red" selected="selected" /><option value="green" /></select></div>
            <div><input type="submit" value="Submit" /></div>
        </form>
        <form id="image_test" method="post" action="controls_test">
            <h3>Test Image submit</h3>
            <div><label>Simple Text:</label><input type="text" name="text" value="Image Test" /></div>
            <input type="image" src="button_image" alt="Submit" />
        </form>
        <form id="ajax_test" method="post" action="controls_test">
            <h3>Test AJAX submit</h3>
            <div><label>Simple Text:</label><input type="text" name="text" value="AJAX Test" /></div>
            <input type="submit" />
        </form>
    </body>
    </html>
    '''

    @cherrypy.expose
    def controls_test(self, **kwargs):
        # Record the submitted fields so tests can inspect them
        self.form_data = kwargs.copy()
        # pprint.pprint(kwargs)
        return pprint.pformat(kwargs)

    @cherrypy.expose
    def button_image(self):
        cherrypy.response.headers['Content-Type'] = 'image/png'
        return I('next.png', data=True)

    @cherrypy.expose
    def jquery(self):
        cherrypy.response.headers['Content-Type'] = 'text/javascript'
        return P('content_server/jquery.js', data=True)

    @cherrypy.expose
    def cookies(self):
        # Set three cookies with different expiry styles and echo what
        # was sent, recording it in self.sent_cookies for the tests.
        try:
            cookie = cherrypy.response.cookie
            cookie[b'cookiea'] = 'The%20first%20cookie'
            cookie[b'cookiea']['path'] = '/'
            cookie[b'cookiea']['max-age'] = 60  # seconds
            cookie[b'cookieb'] = 'The_second_cookie'
            cookie[b'cookieb']['path'] = '/'
            cookie[b'cookieb']['expires'] = cookie_max_age_to_expires(60)  # seconds
            cookie[b'cookiec'] = 'The_third_cookie'
            cookie[b'cookiec']['path'] = '/'
            self.sent_cookies = {n:(c.value, dict(c)) for n, c in
                    dict(cookie).iteritems()}
            return pprint.pformat(self.sent_cookies)
        except:
            import traceback
            traceback.print_exc()

    @cherrypy.expose
    def receive_cookies(self):
        # Record and echo the cookies the client sent with this request
        self.received_cookies = {n:(c.value, dict(c)) for n, c in
                dict(cherrypy.request.cookie).iteritems()}
        return pprint.pformat(self.received_cookies)
class Test(unittest.TestCase):

    '''
    End-to-end tests driving the JS browser against the in-process
    cherrypy Server fixture.
    '''

    @classmethod
    def run_server(cls):
        # Runs in a daemon thread; block() keeps the engine alive.
        cherrypy.engine.start()
        try:
            cherrypy.engine.block()
        except:
            pass

    @classmethod
    def setUpClass(cls):
        cls.port = 17983
        cls.server = Server()
        cherrypy.config.update({
            'log.screen'             : False,
            'checker.on'             : False,
            'engine.autoreload.on'   : False,
            'request.show_tracebacks': True,
            'server.socket_host'     : b'127.0.0.1',
            'server.socket_port'     : cls.port,
            'server.socket_timeout'  : 10,  # seconds
            'server.thread_pool'     : 5,  # number of threads setting to 1 causes major slowdown
            'server.shutdown_timeout': 0.1,  # minutes
        })
        cherrypy.tree.mount(cls.server, '/', config={'/':{}})
        cls.server_thread = threading.Thread(target=cls.run_server)
        cls.server_thread.daemon = True
        cls.server_thread.start()
        cls.browser = Browser(verbosity=0)

    @classmethod
    def tearDownClass(cls):
        cherrypy.engine.exit()
        cls.browser = None

    def test_control_types(self):
        'Test setting data in the various control types'
        self.assertEqual(self.browser.visit('http://127.0.0.1:%d'%self.port),
                True)
        # value to set -> value the server should receive
        values = {
            'checked_checkbox' : (False, None),
            'unchecked_checkbox': (True, 'on'),
            'text': ('some text', 'some text'),
            'password': ('some password', 'some password'),
            'sex': ('female', 'female'),
            'color': ('green', 'green'),
        }
        f = self.browser.select_form('#controls_test')
        for k, vals in values.iteritems():
            f[k] = vals[0]
        self.browser.submit()
        dat = self.server.form_data
        for k, vals in values.iteritems():
            self.assertEqual(vals[1], dat.get(k, None),
                    'Field %s: %r != %r'%(k, vals[1], dat.get(k, None)))

    def test_image_submit(self):
        'Test submitting a form with a image as the submit control'
        self.assertEqual(self.browser.visit('http://127.0.0.1:%d'%self.port),
                True)
        self.browser.select_form('#image_test')
        self.browser.submit()
        self.assertEqual(self.server.form_data['text'], 'Image Test')

    def test_ajax_submit(self):
        'Test AJAX based form submission'
        self.assertEqual(self.browser.visit('http://127.0.0.1:%d'%self.port),
                True)
        f = self.browser.select_form('#ajax_test')
        f['text'] = 'Changed'
        self.browser.ajax_submit()
        self.assertEqual(self.server.form_data['text'], 'Changed')

    def test_cookies(self):
        'Test migration of cookies to python objects'
        self.assertEqual(self.browser.visit('http://127.0.0.1:%d/cookies'%self.port),
                True)
        sent_cookies = self.server.sent_cookies
        cookies = self.browser.cookies
        cmap = {c.name:c for c in cookies}
        for name, vals in sent_cookies.iteritems():
            c = cmap[name]
            value, fields = vals
            self.assertEqual(value, c.value)
            for field in ('secure', 'path'):
                cval = getattr(c, field)
                if cval is False:
                    cval = b''
                self.assertEqual(fields[field], cval,
                    'Field %s in %s: %r != %r'%(field, name, fields[field], cval))
            cexp = cookie_time_fmt(time.gmtime(c.expires))
            fexp = fields['expires']
            if fexp:
                self.assertEqual(fexp, cexp)

    def test_cookie_copy(self):
        'Test copying of cookies from jsbrowser to mechanize'
        self.assertEqual(self.browser.visit('http://127.0.0.1:%d/cookies'%self.port),
                True)
        sent_cookies = self.server.sent_cookies.copy()
        self.browser.visit('http://127.0.0.1:%d/receive_cookies'%self.port)
        orig_rc = self.server.received_cookies.copy()
        br = browser(user_agent=self.browser.user_agent)
        br.copy_cookies_from_jsbrowser(self.browser)
        br.open('http://127.0.0.1:%d/receive_cookies'%self.port)
        for name, vals in sent_cookies.iteritems():
            val = vals[0]
            try:
                rval = self.server.received_cookies[name][0]
            except:
                self.fail('The cookie: %s was not received by the server')
            self.assertEqual(val, rval,
                'The received value for the cookie: %s, %s != %s'%(
                    name, rval, val))
        # The mechanize request must not have changed what the server sees
        self.assertEqual(orig_rc, self.server.received_cookies)
def tests():
    # Collect every test from the Test case into a suite.
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(Test)
def run():
    # Execute the suite with verbose, per-test output.
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(tests())
# Allow running this test module directly as a script
if __name__ == '__main__':
    run()