JS Browser: Support for non AJAX forms

This commit is contained in:
Kovid Goyal 2011-09-20 16:54:31 -06:00
parent 0ec60fa315
commit 5331ac23bb
3 changed files with 339 additions and 17 deletions

View File

@ -18,9 +18,11 @@ from calibre import USER_AGENT, prints, get_proxies, get_proxy_info
from calibre.constants import ispy3, config_dir
from calibre.utils.logging import ThreadSafeLog
from calibre.gui2 import must_use_qt
from calibre.web.jsbrowser.forms import FormsMixin
class Timeout(Exception):
pass
class Timeout(Exception): pass
class LoadError(Exception): pass
class WebPage(QWebPage): # {{{
@ -28,6 +30,7 @@ class WebPage(QWebPage): # {{{
confirm_callback=None,
prompt_callback=None,
user_agent=USER_AGENT,
enable_developer_tools=False,
parent=None):
QWebPage.__init__(self, parent)
@ -38,7 +41,8 @@ class WebPage(QWebPage): # {{{
self.setForwardUnsupportedContent(True)
self.unsupportedContent.connect(self.on_unsupported_content)
settings = self.settings()
settings.setAttribute(QWebSettings.DeveloperExtrasEnabled, True)
if enable_developer_tools:
settings.setAttribute(QWebSettings.DeveloperExtrasEnabled, True)
QWebSettings.enablePersistentStorage(os.path.join(config_dir, 'caches',
'webkit-persistence'))
QWebSettings.setMaximumPagesInCache(0)
@ -211,7 +215,7 @@ class BrowserView(QDialog): # {{{
# }}}
class Browser(QObject):
class Browser(QObject, FormsMixin):
'''
Browser (WebKit with no GUI).
@ -240,11 +244,15 @@ class Browser(QObject):
# If True a disk cache is used
use_disk_cache=True,
# Enable Inspect element functionality
enable_developer_tools=False,
# Verbosity
verbosity = 0
):
must_use_qt()
QObject.__init__(self)
FormsMixin.__init__(self)
if log is None:
log = ThreadSafeLog()
@ -259,10 +267,25 @@ class Browser(QObject):
self.page = WebPage(log, confirm_callback=confirm_callback,
prompt_callback=prompt_callback, user_agent=user_agent,
enable_developer_tools=enable_developer_tools,
parent=self)
self.nam = NetworkAccessManager(log, use_disk_cache=use_disk_cache, parent=self)
self.page.setNetworkAccessManager(self.nam)
def _wait_for_load(self, timeout, url=None):
    '''
    Block, while pumping Qt events, until the page stops loading or the
    timeout expires.

    :param timeout: Maximum time to wait, in seconds
    :param url: Only used in the error message raised on timeout

    Returns the ``loaded_ok`` value reported by the load watcher. Raises
    Timeout if the page is still loading once the deadline has passed.
    '''
    event_loop = QEventLoop(self)
    deadline = time.time() + timeout
    watcher = LoadWatcher(self.page, parent=self)

    while watcher.is_loading:
        if time.time() >= deadline:
            break
        # Nothing was processed, so sleep briefly instead of spinning
        if not event_loop.processEvents():
            time.sleep(0.01)

    if watcher.is_loading:
        raise Timeout('Loading of %r took longer than %d seconds'%(
            url, timeout))

    return watcher.loaded_ok
def visit(self, url, timeout=30.0):
'''
Open the page specified in URL and wait for it to complete loading.
@ -273,22 +296,30 @@ class Browser(QObject):
Returns True if loading was successful, False otherwise.
'''
loop = QEventLoop(self)
start_time = time.time()
end_time = start_time + timeout
lw = LoadWatcher(self.page, parent=self)
self.current_form = None
self.page.mainFrame().load(QUrl(url))
return self._wait_for_load(timeout, url)
while lw.is_loading and end_time > time.time():
if not loop.processEvents():
time.sleep(0.01)
def click(self, qwe, wait_for_load=True, ajax_replies=0, timeout=30.0):
'''
Click the QWebElement pointed to by qwe.
if lw.is_loading:
raise Timeout('Loading of %r took longer than %d seconds'%(
url, timeout))
return lw.loaded_ok
:param wait_for_load: If you know that the click is going to cause a
new page to be loaded, set this to True to have
the method block until the new page is loaded
:param ajax_replies: Number of replies to wait for after clicking a link
that triggers some AJAX interaction
'''
js = '''
var e = document.createEvent('MouseEvents');
e.initEvent( 'click', true, true );
this.dispatchEvent(e);
'''
qwe.evaluateJavaScript(js)
if ajax_replies > 0:
raise NotImplementedError('AJAX clicking not implemented')
elif wait_for_load and not self._wait_for_load(timeout):
raise LoadError('Clicking resulted in a failed load')
def show_browser(self):
'''

View File

@ -0,0 +1,160 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
from future_builtins import map
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from calibre import as_unicode
class Control(object):
    '''
    Wrapper around a single ``<input>`` element of a form.

    :param qwe: The element to wrap (presumably a QWebElement). It must
                provide ``attribute()``, ``hasAttribute()``,
                ``setAttribute()``, ``removeAttribute()`` and
                ``toOuterXml()``.
    '''

    def __init__(self, qwe):
        self.qwe = qwe
        # The name and type attributes are read once, when the wrapper is
        # created
        self.name = unicode(qwe.attribute('name'))
        self.type = unicode(qwe.attribute('type'))

    def __repr__(self):
        return unicode(self.qwe.toOuterXml())

    @property
    def value(self):
        '''
        The current value of this control.

        For checkbox and radio controls this is a boolean (True when the
        control is checked). For text and password controls it is the
        contents of the ``value`` attribute. For all other control types it
        is None and assigning to it does nothing.
        '''
        if self.type in ('checkbox', 'radio'):
            # ``checked`` is a boolean attribute: its presence is what
            # matters, not its text (<input checked> and
            # <input checked=""> are both checked)
            return bool(self.qwe.hasAttribute('checked'))
        if self.type in ('text', 'password'):
            return unicode(self.qwe.attribute('value'))
        return None

    @value.setter
    def value(self, val):
        if self.type in ('checkbox', 'radio'):
            if val:
                self.qwe.setAttribute('checked', 'checked')
            else:
                self.qwe.removeAttribute('checked')
        elif self.type in ('text', 'password'):
            self.qwe.setAttribute('value', as_unicode(val))
class RadioControl(object):
    '''
    A group of radio buttons that share the same name.

    :param name: The shared ``name`` of the radio buttons
    :param controls: The radio button elements (presumably QWebElements).
                     Each must provide ``attribute()``, ``hasAttribute()``,
                     ``setAttribute()`` and ``removeAttribute()``.
    '''

    def __init__(self, name, controls):
        self.name = name
        self.type = 'radio'
        # Maps the value attribute of every button to the button itself
        self.values = {unicode(c.attribute('value')):c for c in controls}

    def __repr__(self):
        return 'RadioControl(%s)'%(', '.join(self.values))

    @property
    def value(self):
        '''
        The value of the currently checked radio button, or None if no
        button is checked. Assigning a value checks the matching button
        and unchecks all the others; assigning a value that matches no
        button does nothing.
        '''
        for val, x in self.values.items():
            # ``checked`` is a boolean attribute: its presence is what
            # matters, not its text
            if x.hasAttribute('checked'):
                return val
        return None

    @value.setter
    def value(self, val):
        control = next(
            (x for value, x in self.values.items() if val == value), None)
        if control is not None:
            # Only one button of the group may be checked at a time
            for x in self.values.values():
                x.removeAttribute('checked')
            control.setAttribute('checked', 'checked')
class Form(object):
    '''
    Wrapper around a ``<form>`` element that gives access to its input
    controls by name, for example ``form['password'].value = 'secret'``.

    :param qwe: The form element (presumably a QWebElement). It must
                provide ``attribute()``, ``attributeNames()``,
                ``findAll()`` and ``findFirst()``.
    '''

    def __init__(self, qwe):
        self.qwe = qwe
        self.attributes = {unicode(x):unicode(qwe.attribute(x)) for x in
                qwe.attributeNames()}
        controls = list(map(Control, qwe.findAll('input')))
        # Radio buttons are not exposed individually, they are grouped by
        # name into RadioControl objects
        self.input_controls = [x for x in controls if x.type != 'radio']
        groups = {}
        for x in controls:
            if x.type == 'radio':
                groups.setdefault(x.name, []).append(x.qwe)
        self.radio_controls = {name:RadioControl(name, elements) for
                name, elements in groups.items()}

    def __getitem__(self, key):
        '''
        Return the control named key. Ordinary input controls are searched
        first, then radio button groups. Raises KeyError if there is no
        control with that name.
        '''
        for x in self.input_controls:
            if key == x.name:
                return x
        try:
            # Index rather than .get() so that a missing name raises
            # instead of silently returning None
            return self.radio_controls[key]
        except KeyError:
            raise KeyError('No control with the name %s in this form'%key)

    def __repr__(self):
        attrs = ['%s=%s'%(k, v) for k, v in self.attributes.items()]
        return '<form %s>'%(' '.join(attrs))

    def submit_control(self, submit_control_selector=None):
        '''
        Find the control that should be clicked to submit this form.

        :param submit_control_selector: Optional selector, matched against
            the descendants of the form. If it matches nothing, the first
            input of type ``submit`` is used and, failing that, the first
            input of type ``image``.

        Returns a Control, or None if no submit control could be found.
        '''
        if submit_control_selector is not None:
            sc = self.qwe.findFirst(submit_control_selector)
            if not sc.isNull():
                # Wrap the element so that every code path returns the
                # same type (an object with a .qwe attribute)
                return Control(sc)
        for wanted in ('submit', 'image'):
            for c in self.input_controls:
                if c.type == wanted:
                    return c
        return None
class FormsMixin(object):
    '''
    Adds form handling (finding, selecting and submitting forms) to a
    browser class. The class this is mixed into must provide a ``page``
    attribute with a ``mainFrame()`` method and a ``click()`` method that
    accepts ``ajax_replies`` and ``timeout`` keyword arguments.
    '''

    def __init__(self):
        # The form chosen by select_form(), or None
        self.current_form = None

    def find_form(self, css2_selector=None, nr=None):
        '''
        Find a form in the current page, without selecting it.

        :param css2_selector: A CSS2 selector. The first matching element
                              is used.
        :param nr: An integer >= 0, the index of the form in the page. Only
                   used if css2_selector is not specified or matches
                   nothing.

        Returns a Form, or None if no matching form is found.
        '''
        mf = self.page.mainFrame()
        if css2_selector is not None:
            candidate = mf.findFirstElement(css2_selector)
            if not candidate.isNull():
                return Form(candidate)
        if nr is not None and int(nr) > -1:
            nr = int(nr)
            forms = mf.findAllElements('form')
            if nr < forms.count():
                return Form(forms.at(nr))
        return None

    def all_forms(self):
        '''
        Return all forms present in the current page.
        '''
        mf = self.page.mainFrame()
        return list(map(Form, mf.findAllElements('form').toList()))

    def select_form(self, css2_selector=None, nr=None):
        '''
        Select a form for further processing. Specify the form either with
        css2_selector or nr. Raises ValueError if no matching form is found.

        :param css2_selector: A CSS2 selector, for example:
                'form[action="/accounts/login"]' or 'form[id="loginForm"]'

        :param nr: An integer >= 0. Selects the nr'th form in the current page.
        '''
        self.current_form = self.find_form(css2_selector=css2_selector, nr=nr)
        if self.current_form is None:
            raise ValueError('No such form found')
        return self.current_form

    def submit(self, submit_control_selector=None, ajax_replies=0, timeout=30.0):
        '''
        Submit the currently selected form by clicking its submit control.
        The form is deselected before the click is performed.

        :param submit_control_selector: Passed to the submit_control()
                method of the selected form to choose the control to click
        :param ajax_replies: Passed through to click()
        :param timeout: Passed through to click()

        Raises ValueError if no form is selected or if the form has no
        submit control.
        '''
        if self.current_form is None:
            raise ValueError('No form selected, use select_form() first')
        sc = self.current_form.submit_control(submit_control_selector)
        if sc is None:
            raise ValueError('No submit control found in the current form')
        self.current_form = None
        # submit_control() may return either a wrapper with a .qwe
        # attribute or a bare element, click() needs the element itself
        self.click(getattr(sc, 'qwe', sc), ajax_replies=ajax_replies,
                timeout=timeout)

View File

@ -0,0 +1,131 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import (unicode_literals, division, absolute_import,
print_function)
__license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import unittest, pprint, threading
import cherrypy
from calibre.web.jsbrowser.browser import Browser
class Server(object):
    '''
    Minimal CherryPy application used by the tests. It serves a page with
    two forms and remembers the data of the most recent form submission
    in ``form_data``.
    '''

    def __init__(self):
        # Fields received by the last call to controls_test()
        self.form_data = {}

    @cherrypy.expose
    def index(self):
        'The page containing the test forms'
        return '''
<html>
<head><title>JS Browser test</title></head>
<body>
<form id="controls_test" method="post" action="controls_test">
<h3>Test controls</h3>
<div><label>Simple Text:</label><input type="text" name="text"/></div>
<div><label>Password:</label><input type="password" name="password"/></div>
<div><label>Checked Checkbox:</label><input type="checkbox" checked="checked" name="checked_checkbox"/></div>
<div><label>UnChecked Checkbox:</label><input type="checkbox" name="unchecked_checkbox"/></div>
<div><input type="radio" name="sex" value="male" checked="checked" /> Male</div>
<div><input type="radio" name="sex" value="female" /> Female</div>
<div><input type="submit" value="Submit" /></div>
</form>
<form id="image_test" method="post" action="controls_test">
<h3>Test Image submit</h3>
<div><label>Simple Text:</label><input type="text" name="text" value="Image Test" /></div>
<input type="image" src="button_image" alt="Submit" />
</form>
</body>
</html>
'''

    @cherrypy.expose
    def controls_test(self, **kwargs):
        'Record the submitted fields and echo them back, pretty printed'
        submitted = dict(kwargs)
        self.form_data = submitted
        return pprint.pformat(kwargs)

    @cherrypy.expose
    def button_image(self):
        'Serve the image used by the image submit control'
        headers = cherrypy.response.headers
        headers['Content-Type'] = 'image/png'
        return I('next.png', data=True)
class Test(unittest.TestCase):
    '''
    Tests for form handling in the JS browser, run against a local
    CherryPy server that is started once for the whole test case.
    '''

    @classmethod
    def run_server(cls):
        'Thread target: start the CherryPy engine and block until it exits'
        engine = cherrypy.engine
        engine.start()
        try:
            engine.block()
        except:
            # Deliberately ignore any error raised while blocking
            pass

    @classmethod
    def setUpClass(cls):
        cls.port = 17983
        cls.server = Server()
        settings = {
            'log.screen'             : False,
            'checker.on'             : False,
            'engine.autoreload_on'   : False,
            'request.show_tracebacks': True,
            'server.socket_host'     : b'127.0.0.1',
            'server.socket_port'     : cls.port,
            'server.socket_timeout'  : 10, #seconds
            'server.thread_pool'     : 1, # number of threads
            'server.shutdown_timeout': 0.1, # minutes
        }
        cherrypy.config.update(settings)
        cherrypy.tree.mount(cls.server, '/', config={'/':{}})

        # Run the server in a daemon thread
        worker = threading.Thread(target=cls.run_server)
        worker.daemon = True
        cls.server_thread = worker
        worker.start()

        cls.browser = Browser(verbosity=1)

    @classmethod
    def tearDownClass(cls):
        cherrypy.engine.exit()
        cls.browser = None

    def test_control_types(self):
        'Test setting data in the various control types'
        url = 'http://127.0.0.1:%d'%self.port
        self.assertEqual(self.browser.visit(url),
                True)
        # name -> (value to assign to the control, value the server should
        # receive)
        values = {
                'checked_checkbox'  : (False, None),
                'unchecked_checkbox': (True, 'on'),
                'text': ('some text', 'some text'),
                'password': ('some password', 'some password'),
                'sex': ('female', 'female'),
        }
        form = self.browser.select_form('#controls_test')
        for name, pair in values.iteritems():
            form[name].value = pair[0]
        self.browser.submit()

        received = self.server.form_data
        for name, pair in values.iteritems():
            expected = pair[1]
            actual = received.get(name, None)
            self.assertEqual(expected, actual,
                    'Field %s: %r != %r'%(name, expected, received.get(name, None)))

    def test_image_submit(self):
        'Test submitting a form with a image as the submit control'
        url = 'http://127.0.0.1:%d'%self.port
        self.assertEqual(self.browser.visit(url),
                True)
        self.browser.select_form('#image_test')
        self.browser.submit()
        received = self.server.form_data
        self.assertEqual(received['text'], 'Image Test')
def tests():
    'Return a test suite containing all the tests defined in Test'
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(Test)
    return suite
def run():
    'Run the test suite with a verbose text runner'
    suite = tests()
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(suite)
# Run the test suite when this module is executed as a script
if __name__ == '__main__':
    run()