From 57650d92d13d5616c25090a1a8e9184fc1f419af Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Thu, 4 Mar 2021 14:48:58 +0530 Subject: [PATCH] Virtualize network access when running RS tests --- src/calibre/utils/rapydscript.py | 61 ++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 14 deletions(-) diff --git a/src/calibre/utils/rapydscript.py b/src/calibre/utils/rapydscript.py index 3b84cdcf6b..a7f0451724 100644 --- a/src/calibre/utils/rapydscript.py +++ b/src/calibre/utils/rapydscript.py @@ -331,35 +331,68 @@ def atomic_write(base, name, content): def run_rapydscript_tests(): - from qt.core import QApplication, QEventLoop - from qt.webengine import QWebEnginePage, QWebEngineScript + from urllib.parse import parse_qs + from qt.core import QApplication, QByteArray, QEventLoop, QUrl + from qt.webengine import ( + QWebEnginePage, QWebEngineProfile, QWebEngineScript, QWebEngineUrlRequestJob, + QWebEngineUrlScheme, QWebEngineUrlSchemeHandler + ) + from calibre.constants import FAKE_HOST, FAKE_PROTOCOL from calibre.gui2 import must_use_qt - from calibre.gui2.webengine import secure_webengine + from calibre.gui2.viewer.web_view import send_reply + from calibre.gui2.webengine import secure_webengine, insert_scripts, create_script must_use_qt() + scheme = QWebEngineUrlScheme(FAKE_PROTOCOL.encode('ascii')) + scheme.setSyntax(QWebEngineUrlScheme.Syntax.Host) + scheme.setFlags(QWebEngineUrlScheme.Flag.SecureScheme) + QWebEngineUrlScheme.registerScheme(scheme) + base = base_dir() rapydscript_dir = os.path.join(base, 'src', 'pyj') fname = os.path.join(rapydscript_dir, 'test.pyj') with lopen(fname, 'rb') as f: js = compile_fast(f.read(), fname) - def create_script(src, name): - s = QWebEngineScript() - s.setName(name) - s.setInjectionPoint(QWebEngineScript.InjectionPoint.DocumentReady) - s.setWorldId(QWebEngineScript.ScriptWorldId.ApplicationWorld) - s.setRunsOnSubFrames(False) - s.setSourceCode(src) - return s + class UrlSchemeHandler(QWebEngineUrlSchemeHandler): + + def __init__(self, parent=None): + QWebEngineUrlSchemeHandler.__init__(self, parent) + self.allowed_hosts = (FAKE_HOST,) + self.registered_data = {} + + def requestStarted(self, rq): + if bytes(rq.requestMethod()) != b'GET': + return self.fail_request(rq, QWebEngineUrlRequestJob.Error.RequestDenied) + url = rq.requestUrl() + host = url.host() + if host not in self.allowed_hosts: + return self.fail_request(rq) + q = parse_qs(url.query()) + if not q: + return self.fail_request(rq) + mt = q.get('mime-type', ('text/plain',))[0] + data = q.get('data', ('',))[0].encode('utf-8') + send_reply(rq, mt, data) + + def fail_request(self, rq, fail_code=None): + if fail_code is None: + fail_code = QWebEngineUrlRequestJob.Error.UrlNotFound + rq.fail(fail_code) + print("Blocking FAKE_PROTOCOL request: {}".format(rq.requestUrl().toString()), file=sys.stderr) class Tester(QWebEnginePage): def __init__(self): - QWebEnginePage.__init__(self) + profile = QWebEngineProfile(QApplication.instance()) + profile.setHttpUserAgent('calibre-tester') + insert_scripts(profile, create_script('test-rapydscript.js', js, on_subframes=False)) + url_handler = UrlSchemeHandler(profile) + profile.installUrlSchemeHandler(QByteArray(FAKE_PROTOCOL.encode('ascii')), url_handler) + QWebEnginePage.__init__(self, profile, None) self.titleChanged.connect(self.title_changed) secure_webengine(self) - self.scripts().insert(create_script(js, 'test-rapydscript.js')) - self.setHtml('

initialize') + self.setHtml('

initialize', QUrl(f'{FAKE_PROTOCOL}://{FAKE_HOST}/index.html')) self.working = True def title_changed(self, title):