Write a fake browser object to help integrate the new webengine based fetcher into the recipe system

Kovid Goyal 2024-08-09 08:59:34 +05:30
parent 40d217d7e7
commit 4f077f1934
3 changed files with 225 additions and 41 deletions
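
The new Browser class exposes a small mechanize-like surface (open(), open_novisit, addheaders) so the recipe system can drive the webengine fetcher through its existing code paths. A minimal usage sketch, assuming the module is calibre.scraper.fetch as the run_worker() command string below implies; the response object is lazy and only blocks on the background worker when it is actually read:

from calibre.scraper.fetch import Browser  # assumed path, per run_worker() below

br = Browser(user_agent='Mozilla/5.0', headers=(('Accept-Language', 'en'),))
try:
    res = br.open('https://example.com', timeout=30)
    html = res.read()       # blocks until the worker process reports the download
    print(res.geturl(), len(html))
finally:
    br.shutdown()           # stops the worker process and removes the temporary directory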

View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2024, Kovid Goyal <kovid at kovidgoyal.net>

import json
import time
from contextlib import suppress
from io import BytesIO
from queue import Queue
from threading import Lock, Thread
from urllib.error import URLError

from calibre.ptempfile import PersistentTemporaryDirectory


class FakeResponse:

    def __init__(self):
        self.queue = Queue()
        self.done = False
        self.final_url = ''
        self.data = BytesIO()

    def _wait(self):
        if self.done:
            return
        self.done = True
        res = self.queue.get()
        if res['action'] == 'input_error':
            raise Exception(res['error'])
        self.final_url = res['final_url']
        if 'error' in res:
            ex = URLError(res['error'])
            ex.worth_retry = bool(res.get('worth_retry'))
            raise ex
        self.data = open(res['output'], 'rb')

    def read(self, *a, **kw):
        self._wait()
        return self.data.read(*a, **kw)

    def seek(self, *a, **kw):
        self._wait()
        return self.data.seek(*a, **kw)

    def tell(self, *a, **kw):
        return self.data.tell(*a, **kw)

    def geturl(self):
        self._wait()
        return self.final_url

    def close(self):
        self.data.close()


class Browser:

    def __init__(self, user_agent: str = '', headers: tuple[tuple[str, str], ...] = ()):
        self.tdir = ''
        self.worker = self.dispatcher = None
        self.dispatch_map = {}
        self.id_counter = 0
        self.addheaders: list[tuple[str, str]] = list(headers)
        self.user_agent = user_agent
        self.lock = Lock()
        self.shutting_down = False

    def open(self, url_or_request, data=None, timeout=None):
        if data is not None:
            raise TypeError('The scraper fetch browser does not support sending data with requests')
        headers = []
        if hasattr(url_or_request, 'get_method'):
            r = url_or_request
            if r.get_method() != 'GET':
                raise TypeError('The scraper fetch browser only supports GET requests')
            if r.data is not None:
                raise TypeError('The scraper fetch browser does not support sending data with requests')
            headers = r.header_items()
            url = r.full_url
        else:
            url = url_or_request
        self._ensure_state()
        with self.lock:
            self.id_counter += 1
            res = FakeResponse()
            self.dispatch_map[self.id_counter] = res.queue
            cmd = {'action': 'download', 'id': self.id_counter, 'url': url, 'timeout': timeout, 'headers': self.addheaders + headers}
            self._send_command(cmd)
        return res
    open_novisit = open

    def _send_command(self, cmd):
        self.worker.stdin.write(json.dumps(cmd).encode())
        self.worker.stdin.write(b'\n')
        self.worker.stdin.flush()

    def _ensure_state(self):
        with self.lock:
            if not self.tdir:
                self.tdir = PersistentTemporaryDirectory()
                self.worker = run_worker(self.tdir, self.user_agent)
                self.dispatcher = Thread(target=self._dispatch, daemon=True)
                self.dispatcher.start()

    def _dispatch(self):
        try:
            for line in self.worker.stdout:
                cmd = json.loads(line)
                if cmd.get('action') == 'finished':
                    with self.lock:
                        q = self.dispatch_map.pop(cmd['id'])
                    q.put(cmd)
                else:
                    raise Exception(f'Unexpected response from backend fetch worker process: {cmd}')
        except Exception:
            if not self.shutting_down:
                import traceback
                traceback.print_exc()

    def shutdown(self):
        self.shutting_down = True
        import shutil
        if self.worker:
            with suppress(OSError):
                self.worker.stdin.close()
            with suppress(OSError):
                self.worker.stdout.close()
            give_up_at = time.monotonic() + 1.5
            while time.monotonic() < give_up_at and self.worker.poll() is None:
                time.sleep(0.01)
            if self.worker.poll() is None:
                self.worker.kill()
        if self.tdir:
            with suppress(OSError):
                shutil.rmtree(self.tdir)
            self.tdir = ''
        if self.dispatcher:
            self.dispatcher.join()
            self.dispatcher = None

    def __del__(self):
        self.shutdown()


def run_worker(tdir: str, user_agent: str):
    from calibre.utils.ipc.simple_worker import start_pipe_worker
    return start_pipe_worker(f'from calibre.scraper.fetch import worker; worker({tdir!r}, {user_agent!r})')


def worker(*args):
    from calibre.gui2 import must_use_qt
    must_use_qt()
    from .fetch_backend import worker
    worker(*args)


def develop():
    import sys
    br = Browser()
    try:
        for url in sys.argv[1:]:
            res = br.open(url)
            print(url, len(res.read()))
    finally:
        del br


if __name__ == '__main__':
    develop()
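
For reference, the protocol between Browser and its worker process is one JSON object per line: commands on the worker's stdin, replies on its stdout. A sketch of the shapes implied by Browser.open() above and the backend's as_result() below (field values here are illustrative only):

import json

# what Browser._send_command() writes to the worker's stdin
request = {'action': 'download', 'id': 1, 'url': 'https://example.com', 'timeout': None, 'headers': []}

# the kind of reply _dispatch() routes back to the matching FakeResponse queue; on failure the
# reply instead carries 'error' and 'worth_retry' keys, and 'input_error' reports a bad command
response = {'action': 'finished', 'id': 1, 'url': 'https://example.com',
            'final_url': 'https://example.com/', 'output': '/path/to/tdir/1'}

print(json.dumps(request))
print(json.dumps(response))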

View File

@@ -8,13 +8,18 @@ import sys
 from contextlib import suppress
 from threading import Thread
 from time import monotonic
-from typing import Union
 
-from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal
+from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal, sip
 from qt.webengine import QWebEngineDownloadRequest, QWebEnginePage, QWebEngineUrlRequestInfo, QWebEngineUrlRequestInterceptor
 
 from calibre.scraper.simple_backend import create_base_profile
 
+default_timeout: float = 60.  # seconds
+
+
+def debug(*a, **kw):
+    print(*a, **kw, file=sys.stderr)
+
+
 class RequestInterceptor(QWebEngineUrlRequestInterceptor):
@@ -23,9 +28,8 @@ class RequestInterceptor(QWebEngineUrlRequestInterceptor):
         if fb:
             key = qurl_to_key(req.requestUrl())
             if dr := fb.download_requests[key]:
-                for name, x in dr.headers.items():
-                    for val in ([x] if isinstance(x, str) else x):
-                        req.setHttpHeader(name.encode(), val.encode())
+                for (name, val) in dr.headers:
+                    req.setHttpHeader(name.encode(), val.encode())
 
 
 def qurl_to_string(url: QUrl | str) -> str:
@@ -33,7 +37,7 @@ def qurl_to_string(url: QUrl | str) -> str:
 
 qurl_to_key = qurl_to_string
-Headers = dict[str, Union[str, list[str]]]
+Headers = list[tuple[str, str]]
 
@@ -44,17 +48,17 @@ class DownloadRequest:
     worth_retry: bool = False
     webengine_download_request: QWebEngineDownloadRequest | None = None
 
-    def __init__(self, url: str, filename: str, headers: Headers | None = None, timeout: float = 60.):
+    def __init__(self, url: str, filename: str, headers: Headers | None = None, timeout: float = default_timeout):
         self.url, self.filename = url, filename
         self.url_key = qurl_to_key(url)
-        self.headers: Headers = headers or {}
-        self.num_of_responses_needed = 1
+        self.headers: Headers = headers or []
+        self.responses_needed: list[int] = []
         self.error_message = ''
         self.created_at = self.last_activity_at = monotonic()
         self.timeout = timeout
 
-    def as_result(self) -> dict[str, str]:
-        result = {'action': 'finished', 'url': self.url, 'output': os.path.join(
+    def as_result(self, req_id: int) -> dict[str, str]:
+        result = {'action': 'finished', 'id': req_id, 'url': self.url, 'output': os.path.join(
             self.webengine_download_request.downloadDirectory(), self.webengine_download_request.downloadFileName()),
             'final_url': qurl_to_string(self.webengine_download_request.url())
         }
@@ -66,7 +70,7 @@ class DownloadRequest:
         if self.timeout and self.last_activity_at + self.timeout < now:
             return True
         time_taken = now - self.created_at
-        if time_taken > 60 and self.webengine_download_request is not None:
+        if time_taken > default_timeout and self.webengine_download_request is not None:
             downloaded = self.webengine_download_request.receivedBytes()
             rate = downloaded / time_taken
             return rate < 10
@@ -75,18 +79,20 @@
 
 class FetchBackend(QWebEnginePage):
 
-    request_download = pyqtSignal(str, str)
+    request_download = pyqtSignal(str, str, object, float, int)
     input_finished = pyqtSignal(str)
     download_finished = pyqtSignal(object)
 
-    def __init__(self, output_dir: str = '', cache_name: str = '', parent: QObject = None) -> None:
-        self.profile = create_base_profile(cache_name)
-        self.profile.downloadRequested.connect(self._download_requested)
+    def __init__(self, output_dir: str = '', cache_name: str = '', parent: QObject = None, user_agent: str = '') -> None:
+        profile = create_base_profile(cache_name)
+        if user_agent:
+            profile.setHttpUserAgent(user_agent)
+        profile.downloadRequested.connect(self._download_requested)
         self.output_dir = output_dir or os.getcwd()
-        self.profile.setDownloadPath(self.output_dir)
-        super().__init__(self.profile, parent)
+        profile.setDownloadPath(self.output_dir)
+        super().__init__(profile, parent)
         self.interceptor = RequestInterceptor(self)
-        self.profile.setUrlRequestInterceptor(self.interceptor)
+        profile.setUrlRequestInterceptor(self.interceptor)
         self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection)
         self.input_finished.connect(self.on_input_finished, type=Qt.ConnectionType.QueuedConnection)
         self.download_requests: dict[str, DownloadRequest] = {}
@@ -115,23 +121,24 @@ class FetchBackend(QWebEnginePage):
         if self.live_requests:
             self.timeout_timer.start()
 
-    def download(self, url: str, filename: str, extra_headers: Headers | None = None, timeout: float = 60.) -> None:
+    def download(self, url: str, filename: str, extra_headers: Headers | None = None, timeout: float = default_timeout, req_id: int = 0) -> None:
         filename = os.path.basename(filename)
         qurl = QUrl(url)
         key = qurl_to_key(qurl)
        dr = self.download_requests.get(key)
         if dr and not dr.error:
             if dr.finished:
-                result = dr.as_result()
+                result = dr.as_result(req_id)
                 self.download_finished.emit(result)
                 self.send_response(result)
             else:
-                dr.num_of_responses_needed += 1
+                dr.responses_needed.append(req_id)
         else:
             self.download_requests[key] = dr = DownloadRequest(url, filename, extra_headers, timeout)
             self.dr_identifier_count += 1
             self.pending_download_requests[self.dr_identifier_count] = dr
             self.live_requests.add(dr)
+            dr.responses_needed.append(req_id)
             if not self.timeout_timer.isActive():
                 self.timeout_timer.start()
             super().download(qurl, str(self.dr_identifier_count))
@@ -172,18 +179,18 @@ class FetchBackend(QWebEnginePage):
 
     def report_finish(self, wdr: QWebEngineDownloadRequest, dr: DownloadRequest) -> None:
         s = wdr.state()
-        result: dict[str, str] = {}
         dr.last_activity_at = monotonic()
         dr.finished = True
         self.live_requests.discard(dr)
+        has_result = False
 
         if s == QWebEngineDownloadRequest.DownloadState.DownloadRequested:
             dr.error = 'Open of URL failed'
-            result = dr.as_result()
+            has_result = True
         elif s == QWebEngineDownloadRequest.DownloadState.DownloadCancelled:
             dr.error = 'Timed out waiting for download'
             dr.worth_retry = True
-            result = dr.as_result()
+            has_result = True
         elif s == QWebEngineDownloadRequest.DownloadState.DownloadInterrupted:
             dr.error = wdr.interruptReasonString()
             dr.worth_retry = wdr.interruptReason() in (
@@ -193,25 +200,26 @@ class FetchBackend(QWebEnginePage):
                 QWebEngineDownloadRequest.DownloadInterruptReason.NetworkServerDown,
                 QWebEngineDownloadRequest.DownloadInterruptReason.ServerUnreachable,
             )
-            result = dr.as_result()
+            has_result = True
         elif s == QWebEngineDownloadRequest.DownloadState.DownloadCompleted:
-            result = dr.as_result()
+            has_result = True
 
-        if result:
-            self.download_finished.emit(result)
-            while dr.num_of_responses_needed:
-                dr.num_of_responses_needed -= 1
+        if has_result:
+            for req_id in dr.responses_needed:
+                result = dr.as_result(req_id)
+                self.download_finished.emit(result)
                 self.send_response(result)
+            del dr.responses_needed[:]
 
     def send_response(self, r: dict[str, str]) -> None:
         with suppress(OSError):
             print(json.dumps(r), flush=True)
 
     def set_user_agent(self, new_val: str) -> None:
-        self.profile.setHttpUserAgent(new_val)
+        self.profile().setHttpUserAgent(new_val)
 
     def set_simple_cookie(self, name, value, domain, path='/'):
-        cs = self.profile.cookieStore()
+        cs = self.profile().cookieStore()
         cookie_string = f'{name}={value}; Domain={domain}; Path={path}'
         for c in QNetworkCookie.parseCookies(cookie_string):
             cs.setCookie(c)
@@ -226,7 +234,10 @@ def read_commands(backend: FetchBackend, tdir: str) -> None:
             ac = cmd['action']
             if ac == 'download':
                 file_counter += 1
-                backend.request_download.emit(cmd['url'], os.path.join(tdir, str(file_counter)))
+                timeout = cmd.get('timeout')
+                if timeout is None:
+                    timeout = default_timeout
+                backend.request_download.emit(cmd['url'], os.path.join(tdir, str(file_counter)), cmd.get('headers'), timeout, cmd.get('id', 0))
             elif ac == 'quit':
                 break
     except Exception as err:
@@ -236,13 +247,15 @@ def read_commands(backend: FetchBackend, tdir: str) -> None:
     backend.input_finished.emit(error_msg)
 
 
-def worker(tdir):
+def worker(tdir: str, user_agent: str) -> None:
     app = QApplication.instance()
-    backend = FetchBackend(output_dir=tdir, parent=app)
-    read_thread = Thread(target=read_commands, args=(backend, tdir), daemon=True)
-    read_thread.start()
-    app.exec()
-    del backend
+    backend = FetchBackend(output_dir=tdir, parent=app, user_agent=user_agent)
+    try:
+        read_thread = Thread(target=read_commands, args=(backend, tdir), daemon=True)
+        read_thread.start()
+        app.exec()
+    finally:
+        sip.delete(backend)
     del app
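
A rough sketch (not part of the commit) of driving FetchBackend directly inside a Qt event loop for testing, assuming a calibre environment; it mirrors what worker() does but connects a slot to download_finished instead of reading the worker's stdout. The fetch_backend module path is assumed from the relative import in fetch.py above:

import tempfile

from calibre.gui2 import must_use_qt

must_use_qt()  # ensure a QApplication exists before touching the backend, as worker() does
from qt.core import QApplication
from calibre.scraper.fetch_backend import FetchBackend  # assumed path, per 'from .fetch_backend import worker'

app = QApplication.instance()
backend = FetchBackend(output_dir=tempfile.mkdtemp(), parent=app, user_agent='test-agent')

def on_finished(result: dict) -> None:
    print(result)  # {'action': 'finished', 'id': 1, ...} or an error result
    app.quit()

backend.download_finished.connect(on_finished)
# same queued signal the stdin reader thread uses: url, filename, headers, timeout, request id
backend.request_download.emit('https://example.com', 'index.html', None, 60.0, 1)
app.exec()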

View File

@@ -290,7 +290,7 @@ class RecursiveFetcher:
         except URLError as err:
             if hasattr(err, 'code') and err.code in responses:
                 raise FetchError(responses[err.code])
-            is_temp = False
+            is_temp = getattr(err, 'worth_retry', False)
             reason = getattr(err, 'reason', None)
             if isinstance(reason, socket.gaierror):
                 # see man gai_strerror() for details
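
The last hunk closes the loop: the backend marks recoverable network failures with worth_retry in its JSON reply, FakeResponse._wait() copies that flag onto the URLError it raises, and RecursiveFetcher now picks it up when deciding whether a failed fetch was temporary. A toy illustration of the attribute hand-off:

from urllib.error import URLError

ex = URLError('Timed out waiting for download')
ex.worth_retry = True                          # as set in FakeResponse._wait()
is_temp = getattr(ex, 'worth_retry', False)    # as now read in RecursiveFetcher
print(is_temp)  # True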