Move webengine-specific code into its own module

Since webengine currently needs to be imported before QApplication is
constructed, this allows the import to be controlled easily.
This commit is contained in:
Kovid Goyal 2022-04-02 08:42:28 +05:30
parent fbb0956665
commit b7c92f3498
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 164 additions and 130 deletions

View File

@ -5,139 +5,15 @@
import json import json
import os import os
import secrets
import sys import sys
import time import weakref
from functools import lru_cache from qt.core import QLoggingCategory, QUrl
from qt.core import QApplication, QEventLoop, QLoggingCategory, QUrl from threading import Lock, Thread
from qt.webengine import QWebEnginePage, QWebEngineProfile, QWebEngineSettings
from threading import Lock
from calibre.constants import cache_dir, iswindows from calibre.constants import iswindows
from calibre.gui2.webengine import create_script, insert_scripts
from calibre.ptempfile import PersistentTemporaryFile from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.ipc.simple_worker import start_pipe_worker
from calibre.utils.filenames import retry_on_fail from calibre.utils.filenames import retry_on_fail
from calibre.utils.ipc.simple_worker import start_pipe_worker
def canonicalize_qurl(qurl):
qurl = qurl.adjusted(QUrl.UrlFormattingOption.StripTrailingSlash | QUrl.UrlFormattingOption.NormalizePathSegments)
if qurl.path() == '/':
qurl = qurl.adjusted(QUrl.UrlFormattingOption.RemovePath)
return qurl
@lru_cache(maxsize=None)
def create_profile(cache_name='simple', allow_js=False):
from calibre.utils.random_ua import random_common_chrome_user_agent
ans = QWebEngineProfile(cache_name, QApplication.instance())
ans.setHttpUserAgent(random_common_chrome_user_agent())
ans.setHttpCacheMaximumSize(0) # managed by webengine
ans.setCachePath(os.path.join(cache_dir(), 'scraper', cache_name))
s = ans.settings()
a = s.setAttribute
a(QWebEngineSettings.WebAttribute.PluginsEnabled, False)
a(QWebEngineSettings.WebAttribute.JavascriptEnabled, allow_js)
s.setUnknownUrlSchemePolicy(QWebEngineSettings.UnknownUrlSchemePolicy.DisallowUnknownUrlSchemes)
a(QWebEngineSettings.WebAttribute.JavascriptCanOpenWindows, False)
a(QWebEngineSettings.WebAttribute.JavascriptCanAccessClipboard, False)
# ensure javascript cannot read from local files
a(QWebEngineSettings.WebAttribute.LocalContentCanAccessFileUrls, False)
a(QWebEngineSettings.WebAttribute.AllowWindowActivationFromJavaScript, False)
js = P('scraper.js', allow_user_override=False, data=True).decode('utf-8')
ans.token = secrets.token_hex()
js = js.replace('TOKEN', ans.token)
insert_scripts(ans, create_script('scraper.js', js))
return ans
class SimpleScraper(QWebEnginePage):
def __init__(self, source, parent=None):
profile = create_profile(source)
self.token = profile.token
self.is_being_tested = source == 'test'
super().__init__(profile, parent)
self.setAudioMuted(True)
self.loadStarted.connect(self.load_started)
self.loadFinished.connect(self.load_finished)
self.loadProgress.connect(self.load_progress)
def print(self, *a):
print(*a, file=sys.stderr)
def load_started(self):
if self.is_being_tested:
self.print(f'load_started: {self.is_current_url=} {self.requestedUrl()=}')
if self.is_current_url:
self.current_fetch['load_started'] = True
def load_finished(self, ok):
if self.is_being_tested:
self.print(f'load_finished: {ok=} {self.is_current_url=}')
if self.is_current_url:
self.current_fetch['load_finished'] = True
self.current_fetch['load_was_ok'] = ok
if not ok and self.is_current_url:
self.current_fetch['working'] = False
def load_progress(self, progress):
if self.is_being_tested:
self.print(f'load_progress: {progress=} {self.is_current_url=}')
if self.is_current_url:
self.current_fetch['end_time'] = time.monotonic() + self.current_fetch['timeout']
def javaScriptAlert(self, url, msg):
pass
def javaScriptConfirm(self, url, msg):
return True
def javaScriptPrompt(self, url, msg, defval):
return True, defval
@property
def is_current_url(self):
if not hasattr(self, 'current_fetch'):
return False
return canonicalize_qurl(self.requestedUrl()) == self.current_fetch['fetching_url']
def javaScriptConsoleMessage(self, level, message, line_num, source_id):
parts = message.split(maxsplit=1)
if len(parts) == 2 and parts[0] == self.token:
msg = json.loads(parts[1])
t = msg.get('type')
if t == 'print':
print(msg['text'], file=sys.stderr)
elif t == 'domready':
if self.is_being_tested:
self.print(f'domready: {self.is_current_url=}')
if self.is_current_url:
self.current_fetch['working'] = False
if not msg.get('failed'):
self.current_fetch['html'] = msg['html']
def fetch(self, url_or_qurl, timeout=60):
fetching_url = QUrl(url_or_qurl)
self.current_fetch = {
'timeout': timeout, 'end_time': time.monotonic() + timeout,
'fetching_url': canonicalize_qurl(fetching_url), 'working': True,
'load_started': False
}
self.load(fetching_url)
try:
app = QApplication.instance()
while self.current_fetch['working'] and time.monotonic() < self.current_fetch['end_time']:
app.processEvents(QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
ans = self.current_fetch.get('html')
if ans is None:
eurl = fetching_url.toString()
if self.current_fetch['working']:
raise TimeoutError(f'Timed out loading HTML from: {eurl}')
raise ValueError(f'Failed to load HTML from: {eurl}')
return ans
finally:
del self.current_fetch
def worker_main(source): def worker_main(source):
@ -145,6 +21,8 @@ def worker_main(source):
qt.webenginecontext.info=false qt.webenginecontext.info=false
''') ''')
from calibre.gui2 import must_use_qt from calibre.gui2 import must_use_qt
from .simple_backend import SimpleScraper
must_use_qt() must_use_qt()
s = SimpleScraper(source) s = SimpleScraper(source)
for line in sys.stdin.buffer: for line in sys.stdin.buffer:
@ -170,11 +48,15 @@ qt.webenginecontext.info=false
print(json.dumps(result), flush=True) print(json.dumps(result), flush=True)
overseers = []
class Overseer: class Overseer:
def __init__(self): def __init__(self):
self.lock = Lock() self.lock = Lock()
self.workers = {} self.workers = {}
overseers.append(weakref.ref(self))
def worker_for_source(self, source): def worker_for_source(self, source):
with self.lock: with self.lock:
@ -212,12 +94,29 @@ class Overseer:
if w.wait(0.1) is None: if w.wait(0.1) is None:
w.kill() w.kill()
self.workers.clear() self.workers.clear()
close = __del__
def cleanup_overseers():
    """Begin closing every Overseer that is still alive.

    Each live Overseer referenced from the module-level ``overseers`` list has
    its ``close()`` method started on its own thread, after which the list is
    emptied.

    Returns a zero-argument callable that blocks until all of those closing
    threads have finished.
    """
    closers = []
    for ref in overseers:
        overseer = ref()
        if overseer is None:
            # The weak reference is dead, nothing left to close.
            continue
        closer = Thread(target=overseer.close, name='CloseOverSeer')
        closer.start()
        closers.append(closer)
    del overseers[:]

    def join_all():
        for closer in closers:
            closer.join()

    return join_all
def find_tests(): def find_tests():
import re
import unittest import unittest
from lxml.html import fromstring, tostring from lxml.html import fromstring, tostring
import re
skip = '' skip = ''
is_sanitized = 'libasan' in os.environ.get('LD_PRELOAD', '') is_sanitized = 'libasan' in os.environ.get('LD_PRELOAD', '')
if is_sanitized: if is_sanitized:

View File

@ -0,0 +1,135 @@
#!/usr/bin/env python
# vim:fileencoding=utf-8
# License: GPL v3 Copyright: 2022, Kovid Goyal <kovid at kovidgoyal.net>
import json
import os
import secrets
import sys
import time
from functools import lru_cache
from qt.core import QApplication, QEventLoop, QUrl
from qt.webengine import QWebEnginePage, QWebEngineProfile, QWebEngineSettings
from calibre.constants import cache_dir
from calibre.gui2.webengine import create_script, insert_scripts
def canonicalize_qurl(qurl):
    """Return a normalized copy of *qurl* suitable for equality comparison.

    The trailing slash is stripped and path segments are normalized; if the
    remaining path is just ``/`` the path is removed altogether.
    """
    opts = QUrl.UrlFormattingOption
    normalized = qurl.adjusted(opts.StripTrailingSlash | opts.NormalizePathSegments)
    if normalized.path() != '/':
        return normalized
    return normalized.adjusted(opts.RemovePath)
@lru_cache(maxsize=None)
def create_profile(cache_name='simple', allow_js=False):
    """Create the restricted QWebEngineProfile used by the scraper.

    Results are memoized by ``lru_cache``, so repeated calls with the same
    arguments return the same profile object.

    The profile is parented to the QApplication instance, uses the user agent
    returned by ``random_common_chrome_user_agent()`` and caches under
    ``<cache_dir>/scraper/<cache_name>``. The ``scraper.js`` resource is
    installed into it with every ``TOKEN`` occurrence replaced by a freshly
    generated secret, which is also stored on the returned profile as its
    ``token`` attribute.
    """
    from calibre.utils.random_ua import random_common_chrome_user_agent

    profile = QWebEngineProfile(cache_name, QApplication.instance())
    profile.setHttpUserAgent(random_common_chrome_user_agent())
    profile.setHttpCacheMaximumSize(0)  # managed by webengine
    profile.setCachePath(os.path.join(cache_dir(), 'scraper', cache_name))

    settings = profile.settings()
    attr = QWebEngineSettings.WebAttribute
    settings.setAttribute(attr.PluginsEnabled, False)
    settings.setAttribute(attr.JavascriptEnabled, allow_js)
    settings.setUnknownUrlSchemePolicy(QWebEngineSettings.UnknownUrlSchemePolicy.DisallowUnknownUrlSchemes)
    for forbidden in (
        attr.JavascriptCanOpenWindows,
        attr.JavascriptCanAccessClipboard,
        # ensure javascript cannot read from local files
        attr.LocalContentCanAccessFileUrls,
        attr.AllowWindowActivationFromJavaScript,
    ):
        settings.setAttribute(forbidden, False)

    script_source = P('scraper.js', allow_user_override=False, data=True).decode('utf-8')
    profile.token = secrets.token_hex()
    script_source = script_source.replace('TOKEN', profile.token)
    insert_scripts(profile, create_script('scraper.js', script_source))
    return profile
class SimpleScraper(QWebEnginePage):
    """Web page that loads a URL and hands back the HTML reported for it.

    The page runs on the profile returned by ``create_profile(source)``.
    Console messages prefixed with that profile's token are parsed as JSON
    and drive the state of the fetch currently in progress, which is kept in
    ``self.current_fetch`` for the duration of ``fetch()``.
    """

    def __init__(self, source, parent=None):
        profile = create_profile(source)
        self.token = profile.token
        # Extra diagnostics are written to stderr for the 'test' source.
        self.is_being_tested = source == 'test'
        super().__init__(profile, parent)
        self.setAudioMuted(True)
        for signal, handler in (
            (self.loadStarted, self.load_started),
            (self.loadFinished, self.load_finished),
            (self.loadProgress, self.load_progress),
        ):
            signal.connect(handler)

    def print(self, *args):
        """Write *args* to stderr using the builtin print."""
        print(*args, file=sys.stderr)

    def load_started(self):
        """Record that loading of the URL being fetched has begun."""
        if self.is_being_tested:
            self.print(f'load_started: {self.is_current_url=} {self.requestedUrl()=}')
        if not self.is_current_url:
            return
        self.current_fetch['load_started'] = True

    def load_finished(self, ok):
        """Record the load result; a failed load ends the current fetch."""
        if self.is_being_tested:
            self.print(f'load_finished: {ok=} {self.is_current_url=}')
        if not self.is_current_url:
            return
        self.current_fetch['load_finished'] = True
        self.current_fetch['load_was_ok'] = ok
        if not ok and self.is_current_url:
            self.current_fetch['working'] = False

    def load_progress(self, progress):
        """Push the fetch deadline forward whenever the load makes progress."""
        if self.is_being_tested:
            self.print(f'load_progress: {progress=} {self.is_current_url=}')
        if not self.is_current_url:
            return
        self.current_fetch['end_time'] = time.monotonic() + self.current_fetch['timeout']

    def javaScriptAlert(self, url, msg):
        """Ignore JavaScript alerts."""

    def javaScriptConfirm(self, url, msg):
        """Accept every JavaScript confirm request."""
        return True

    def javaScriptPrompt(self, url, msg, defval):
        """Accept every JavaScript prompt, answering with its default value."""
        return True, defval

    @property
    def is_current_url(self):
        """True when a fetch is active and the requested URL is its target."""
        return (
            hasattr(self, 'current_fetch')
            and canonicalize_qurl(self.requestedUrl()) == self.current_fetch['fetching_url']
        )

    def javaScriptConsoleMessage(self, level, message, line_num, source_id):
        """Handle token-prefixed JSON console messages; ignore all others."""
        pieces = message.split(maxsplit=1)
        if len(pieces) != 2 or pieces[0] != self.token:
            return
        msg = json.loads(pieces[1])
        kind = msg.get('type')
        if kind == 'print':
            print(msg['text'], file=sys.stderr)
            return
        if kind != 'domready':
            return
        if self.is_being_tested:
            self.print(f'domready: {self.is_current_url=}')
        if not self.is_current_url:
            return
        # The page has reported in: stop waiting, and keep the HTML unless
        # the message is flagged as failed.
        self.current_fetch['working'] = False
        if not msg.get('failed'):
            self.current_fetch['html'] = msg['html']

    def fetch(self, url_or_qurl, timeout=60):
        """Load *url_or_qurl* and return the HTML reported for it.

        Application events are processed (user input excluded) until the
        fetch stops working or its deadline passes. The deadline starts at
        *timeout* seconds from now and is extended by ``load_progress``.

        Raises TimeoutError if no HTML arrived and the fetch was still
        working when the loop ended, and ValueError if the fetch stopped
        without producing HTML.
        """
        fetching_url = QUrl(url_or_qurl)
        self.current_fetch = dict(
            timeout=timeout,
            end_time=time.monotonic() + timeout,
            fetching_url=canonicalize_qurl(fetching_url),
            working=True,
            load_started=False,
        )
        self.load(fetching_url)
        try:
            app = QApplication.instance()
            while self.current_fetch['working'] and time.monotonic() < self.current_fetch['end_time']:
                app.processEvents(QEventLoop.ProcessEventsFlag.ExcludeUserInputEvents)
            html = self.current_fetch.get('html')
            if html is not None:
                return html
            eurl = fetching_url.toString()
            if self.current_fetch['working']:
                raise TimeoutError(f'Timed out loading HTML from: {eurl}')
            raise ValueError(f'Failed to load HTML from: {eurl}')
        finally:
            # Always drop the per-fetch state so later signals are ignored.
            del self.current_fetch