Virtualize network access when running RS tests

This commit is contained in:
Kovid Goyal 2021-03-04 14:48:58 +05:30
parent c851eaf67a
commit 57650d92d1
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -331,35 +331,68 @@ def atomic_write(base, name, content):
def run_rapydscript_tests(): def run_rapydscript_tests():
from qt.core import QApplication, QEventLoop from urllib.parse import parse_qs
from qt.webengine import QWebEnginePage, QWebEngineScript from qt.core import QApplication, QByteArray, QEventLoop, QUrl
from qt.webengine import (
QWebEnginePage, QWebEngineProfile, QWebEngineScript, QWebEngineUrlRequestJob,
QWebEngineUrlScheme, QWebEngineUrlSchemeHandler
)
from calibre.constants import FAKE_HOST, FAKE_PROTOCOL
from calibre.gui2 import must_use_qt from calibre.gui2 import must_use_qt
from calibre.gui2.webengine import secure_webengine from calibre.gui2.viewer.web_view import send_reply
from calibre.gui2.webengine import secure_webengine, insert_scripts, create_script
must_use_qt() must_use_qt()
scheme = QWebEngineUrlScheme(FAKE_PROTOCOL.encode('ascii'))
scheme.setSyntax(QWebEngineUrlScheme.Syntax.Host)
scheme.setFlags(QWebEngineUrlScheme.Flag.SecureScheme)
QWebEngineUrlScheme.registerScheme(scheme)
base = base_dir() base = base_dir()
rapydscript_dir = os.path.join(base, 'src', 'pyj') rapydscript_dir = os.path.join(base, 'src', 'pyj')
fname = os.path.join(rapydscript_dir, 'test.pyj') fname = os.path.join(rapydscript_dir, 'test.pyj')
with lopen(fname, 'rb') as f: with lopen(fname, 'rb') as f:
js = compile_fast(f.read(), fname) js = compile_fast(f.read(), fname)
def create_script(src, name): class UrlSchemeHandler(QWebEngineUrlSchemeHandler):
s = QWebEngineScript()
s.setName(name) def __init__(self, parent=None):
s.setInjectionPoint(QWebEngineScript.InjectionPoint.DocumentReady) QWebEngineUrlSchemeHandler.__init__(self, parent)
s.setWorldId(QWebEngineScript.ScriptWorldId.ApplicationWorld) self.allowed_hosts = (FAKE_HOST,)
s.setRunsOnSubFrames(False) self.registered_data = {}
s.setSourceCode(src)
return s def requestStarted(self, rq):
if bytes(rq.requestMethod()) != b'GET':
return self.fail_request(rq, QWebEngineUrlRequestJob.Error.RequestDenied)
url = rq.requestUrl()
host = url.host()
if host not in self.allowed_hosts:
return self.fail_request(rq)
q = parse_qs(url.query())
if not q:
return self.fail_request(rq)
mt = q.get('mime-type', ('text/plain',))[0]
data = q.get('data', ('',))[0].encode('utf-8')
send_reply(rq, mt, data)
def fail_request(self, rq, fail_code=None):
if fail_code is None:
fail_code = QWebEngineUrlRequestJob.Error.UrlNotFound
rq.fail(fail_code)
print("Blocking FAKE_PROTOCOL request: {}".format(rq.requestUrl().toString()), file=sys.stderr)
class Tester(QWebEnginePage): class Tester(QWebEnginePage):
def __init__(self): def __init__(self):
QWebEnginePage.__init__(self) profile = QWebEngineProfile(QApplication.instance())
profile.setHttpUserAgent('calibre-tester')
insert_scripts(profile, create_script('test-rapydscript.js', js, on_subframes=False))
url_handler = UrlSchemeHandler(profile)
profile.installUrlSchemeHandler(QByteArray(FAKE_PROTOCOL.encode('ascii')), url_handler)
QWebEnginePage.__init__(self, profile, None)
self.titleChanged.connect(self.title_changed) self.titleChanged.connect(self.title_changed)
secure_webengine(self) secure_webengine(self)
self.scripts().insert(create_script(js, 'test-rapydscript.js')) self.setHtml('<p>initialize', QUrl(f'{FAKE_PROTOCOL}://{FAKE_HOST}/index.html'))
self.setHtml('<p>initialize')
self.working = True self.working = True
def title_changed(self, title): def title_changed(self, title):