From 4f077f1934baafda01f0b76e73100927ac656763 Mon Sep 17 00:00:00 2001
From: Kovid Goyal
Date: Fri, 9 Aug 2024 08:59:34 +0530
Subject: [PATCH] Write a fake browser object to help integrate the new
 webengine based fetcher into the recipe system
---
 src/calibre/scraper/fetch.py         | 171 +++++++++++++++++++++++++++
 src/calibre/scraper/fetch_backend.py |  93 ++++++++-------
 src/calibre/web/fetch/simple.py      |   2 +-
 3 files changed, 225 insertions(+), 41 deletions(-)
 create mode 100644 src/calibre/scraper/fetch.py
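
The new Browser object mimics just enough of the mechanize browser API for
the recipe system: open()/open_novisit() return a FakeResponse immediately,
while the actual download happens in a QtWebEngine worker process. A minimal
usage sketch, with an illustrative URL and user agent:

    from calibre.scraper.fetch import Browser

    br = Browser(user_agent='calibre-test')
    res = br.open('https://example.com', timeout=30)  # does not block
    html = res.read()    # blocks until the worker has finished the download
    print(res.geturl())  # final URL, after any redirects
    br.shutdown()        # stop the worker and remove the temp directory
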
diff --git a/src/calibre/scraper/fetch.py b/src/calibre/scraper/fetch.py
new file mode 100644
index 0000000000..e00d82f351
--- /dev/null
+++ b/src/calibre/scraper/fetch.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python
+# License: GPLv3 Copyright: 2024, Kovid Goyal
+
+import json
+import time
+from contextlib import suppress
+from io import BytesIO
+from queue import Queue
+from threading import Lock, Thread
+from urllib.error import URLError
+
+from calibre.ptempfile import PersistentTemporaryDirectory
+
+
+class FakeResponse:
+
+    def __init__(self):
+        self.queue = Queue()
+        self.done = False
+        self.final_url = ''
+        self.data = BytesIO()
+
+    def _wait(self):
+        if self.done:
+            return
+        self.done = True
+        res = self.queue.get()
+        if res['action'] == 'input_error':
+            raise Exception(res['error'])
+        self.final_url = res['final_url']
+        if 'error' in res:
+            ex = URLError(res['error'])
+            ex.worth_retry = bool(res.get('worth_retry'))
+            raise ex
+        self.data = open(res['output'], 'rb')
+
+    def read(self, *a, **kw):
+        self._wait()
+        return self.data.read(*a, **kw)
+
+    def seek(self, *a, **kw):
+        self._wait()
+        return self.data.seek(*a, **kw)
+
+    def tell(self, *a, **kw):
+        return self.data.tell(*a, **kw)
+
+    def geturl(self):
+        self._wait()
+        return self.final_url
+
+    def close(self):
+        self.data.close()
+
+
+class Browser:
+
+    def __init__(self, user_agent: str = '', headers: tuple[tuple[str, str], ...] = ()):
+        self.tdir = ''
+        self.worker = self.dispatcher = None
+        self.dispatch_map = {}
+        self.id_counter = 0
+        self.addheaders: list[tuple[str, str]] = list(headers)
+        self.user_agent = user_agent
+        self.lock = Lock()
+        self.shutting_down = False
+
+    def open(self, url_or_request, data=None, timeout=None):
+        if data is not None:
+            raise TypeError('The scraper fetch browser does not support sending data with requests')
+        headers = []
+        if hasattr(url_or_request, 'get_method'):
+            r = url_or_request
+            if r.get_method() != 'GET':
+                raise TypeError('The scraper fetch browser only supports GET requests')
+            if r.data is not None:
+                raise TypeError('The scraper fetch browser does not support sending data with requests')
+            headers = r.header_items()
+            url = r.full_url
+        else:
+            url = url_or_request
+        self._ensure_state()
+
+        with self.lock:
+            self.id_counter += 1
+            res = FakeResponse()
+            self.dispatch_map[self.id_counter] = res.queue
+            cmd = {'action': 'download', 'id': self.id_counter, 'url': url, 'timeout': timeout, 'headers': self.addheaders + headers}
+            self._send_command(cmd)
+        return res
+
+    open_novisit = open
+
+    def _send_command(self, cmd):
+        self.worker.stdin.write(json.dumps(cmd).encode())
+        self.worker.stdin.write(b'\n')
+        self.worker.stdin.flush()
+
+    def _ensure_state(self):
+        with self.lock:
+            if not self.tdir:
+                self.tdir = PersistentTemporaryDirectory()
+                self.worker = run_worker(self.tdir, self.user_agent)
+                self.dispatcher = Thread(target=self._dispatch, daemon=True)
+                self.dispatcher.start()
+
+    def _dispatch(self):
+        try:
+            for line in self.worker.stdout:
+                cmd = json.loads(line)
+                if cmd.get('action') == 'finished':
+                    with self.lock:
+                        q = self.dispatch_map.pop(cmd['id'])
+                    q.put(cmd)
+                else:
+                    raise Exception(f'Unexpected response from backend fetch worker process: {cmd}')
+        except Exception:
+            if not self.shutting_down:
+                import traceback
+                traceback.print_exc()
+
+    def shutdown(self):
+        self.shutting_down = True
+        import shutil
+        if self.worker:
+            with suppress(OSError):
+                self.worker.stdin.close()
+            with suppress(OSError):
+                self.worker.stdout.close()
+            give_up_at = time.monotonic() + 1.5
+            while time.monotonic() < give_up_at and self.worker.poll() is None:
+                time.sleep(0.01)
+            if self.worker.poll() is None:
+                self.worker.kill()
+        if self.tdir:
+            with suppress(OSError):
+                shutil.rmtree(self.tdir)
+            self.tdir = ''
+        if self.dispatcher:
+            self.dispatcher.join()
+            self.dispatcher = None
+
+    def __del__(self):
+        self.shutdown()
+
+
+def run_worker(tdir: str, user_agent: str):
+    from calibre.utils.ipc.simple_worker import start_pipe_worker
+    return start_pipe_worker(f'from calibre.scraper.fetch import worker; worker({tdir!r}, {user_agent!r})')
+
+
+def worker(*args):
+    from calibre.gui2 import must_use_qt
+    must_use_qt()
+    from .fetch_backend import worker
+    worker(*args)
+
+
+def develop():
+    import sys
+    br = Browser()
+    try:
+        for url in sys.argv[1:]:
+            res = br.open(url)
+            print(url, len(res.read()))
+    finally:
+        del br
+
+
+if __name__ == '__main__':
+    develop()
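
Browser and the worker speak a line-oriented JSON protocol: one command per
line on the worker's stdin, one reply per line on its stdout, matched up by
'id'. Roughly, with illustrative field values:

    request = {'action': 'download', 'id': 1, 'url': 'https://example.com',
               'timeout': None, 'headers': []}
    reply = {'action': 'finished', 'id': 1, 'url': 'https://example.com',
             'final_url': 'https://example.com/', 'output': '/path/to/tdir/1'}
    # On failure the reply also carries 'error' and 'worth_retry' keys,
    # which FakeResponse._wait() converts into a URLError.
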
diff --git a/src/calibre/scraper/fetch_backend.py b/src/calibre/scraper/fetch_backend.py
index bfb4824bed..0e99e0a547 100644
--- a/src/calibre/scraper/fetch_backend.py
+++ b/src/calibre/scraper/fetch_backend.py
@@ -8,13 +8,18 @@
 import sys
 from contextlib import suppress
 from threading import Thread
 from time import monotonic
-from typing import Union
 
-from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal
+from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal, sip
 from qt.webengine import QWebEngineDownloadRequest, QWebEnginePage, QWebEngineUrlRequestInfo, QWebEngineUrlRequestInterceptor
 
 from calibre.scraper.simple_backend import create_base_profile
 
+default_timeout: float = 60.  # seconds
+
+
+def debug(*a, **kw):
+    print(*a, **kw, file=sys.stderr)
+
 
 class RequestInterceptor(QWebEngineUrlRequestInterceptor):
@@ -23,9 +28,8 @@ class RequestInterceptor(QWebEngineUrlRequestInterceptor):
         if fb:
             key = qurl_to_key(req.requestUrl())
             if dr := fb.download_requests[key]:
-                for name, x in dr.headers.items():
-                    for val in ([x] if isinstance(x, str) else x):
-                        req.setHttpHeader(name.encode(), val.encode())
+                for (name, val) in dr.headers:
+                    req.setHttpHeader(name.encode(), val.encode())
 
 
 def qurl_to_string(url: QUrl | str) -> str:
@@ -33,7 +37,7 @@ def qurl_to_string(url: QUrl | str) -> str:
 
 
 qurl_to_key = qurl_to_string
-Headers = dict[str, Union[str, list[str]]]
+Headers = list[tuple[str, str]]
 
 
 class DownloadRequest:
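
Headers now travel as an ordered list of (name, value) pairs rather than a
dict, presumably so they serialize cleanly over the JSON pipe and match the
mechanize-style addheaders list kept by Browser. For example, with
illustrative values:

    headers: Headers = [('Accept', 'text/html'), ('Cookie', 'a=1'), ('Cookie', 'b=2')]
    # RequestInterceptor applies each pair in order:
    #     for (name, val) in dr.headers:
    #         req.setHttpHeader(name.encode(), val.encode())
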
@@ -44,17 +48,17 @@ class DownloadRequest:
     worth_retry: bool = False
     webengine_download_request: QWebEngineDownloadRequest | None = None
 
-    def __init__(self, url: str, filename: str, headers: Headers | None = None, timeout: float = 60.):
+    def __init__(self, url: str, filename: str, headers: Headers | None = None, timeout: float = default_timeout):
         self.url, self.filename = url, filename
         self.url_key = qurl_to_key(url)
-        self.headers: Headers = headers or {}
-        self.num_of_responses_needed = 1
+        self.headers: Headers = headers or []
+        self.responses_needed: list[int] = []
         self.error_message = ''
         self.created_at = self.last_activity_at = monotonic()
         self.timeout = timeout
 
-    def as_result(self) -> dict[str, str]:
-        result = {'action': 'finished', 'url': self.url, 'output': os.path.join(
+    def as_result(self, req_id: int) -> dict[str, str]:
+        result = {'action': 'finished', 'id': req_id, 'url': self.url, 'output': os.path.join(
             self.webengine_download_request.downloadDirectory(), self.webengine_download_request.downloadFileName()),
             'final_url': qurl_to_string(self.webengine_download_request.url())
         }
@@ -66,7 +70,7 @@
         if self.timeout and self.last_activity_at + self.timeout < now:
             return True
         time_taken = now - self.created_at
-        if time_taken > 60 and self.webengine_download_request is not None:
+        if time_taken > default_timeout and self.webengine_download_request is not None:
            downloaded = self.webengine_download_request.receivedBytes()
             rate = downloaded / time_taken
             return rate < 10
@@ -75,18 +79,20 @@
 
 class FetchBackend(QWebEnginePage):
 
-    request_download = pyqtSignal(str, str)
+    request_download = pyqtSignal(str, str, object, float, int)
     input_finished = pyqtSignal(str)
     download_finished = pyqtSignal(object)
 
-    def __init__(self, output_dir: str = '', cache_name: str = '', parent: QObject = None) -> None:
-        self.profile = create_base_profile(cache_name)
-        self.profile.downloadRequested.connect(self._download_requested)
+    def __init__(self, output_dir: str = '', cache_name: str = '', parent: QObject = None, user_agent: str = '') -> None:
+        profile = create_base_profile(cache_name)
+        if user_agent:
+            profile.setHttpUserAgent(user_agent)
+        profile.downloadRequested.connect(self._download_requested)
         self.output_dir = output_dir or os.getcwd()
-        self.profile.setDownloadPath(self.output_dir)
-        super().__init__(self.profile, parent)
+        profile.setDownloadPath(self.output_dir)
+        super().__init__(profile, parent)
         self.interceptor = RequestInterceptor(self)
-        self.profile.setUrlRequestInterceptor(self.interceptor)
+        profile.setUrlRequestInterceptor(self.interceptor)
         self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection)
         self.input_finished.connect(self.on_input_finished, type=Qt.ConnectionType.QueuedConnection)
         self.download_requests: dict[str, DownloadRequest] = {}
@@ -115,23 +121,24 @@
         if self.live_requests:
             self.timeout_timer.start()
 
-    def download(self, url: str, filename: str, extra_headers: Headers | None = None, timeout: float = 60.) -> None:
+    def download(self, url: str, filename: str, extra_headers: Headers | None = None, timeout: float = default_timeout, req_id: int = 0) -> None:
         filename = os.path.basename(filename)
         qurl = QUrl(url)
         key = qurl_to_key(qurl)
         dr = self.download_requests.get(key)
         if dr and not dr.error:
             if dr.finished:
-                result = dr.as_result()
+                result = dr.as_result(req_id)
                 self.download_finished.emit(result)
                 self.send_response(result)
             else:
-                dr.num_of_responses_needed += 1
+                dr.responses_needed.append(req_id)
         else:
             self.download_requests[key] = dr = DownloadRequest(url, filename, extra_headers, timeout)
             self.dr_identifier_count += 1
             self.pending_download_requests[self.dr_identifier_count] = dr
             self.live_requests.add(dr)
+            dr.responses_needed.append(req_id)
             if not self.timeout_timer.isActive():
                 self.timeout_timer.start()
             super().download(qurl, str(self.dr_identifier_count))
@@ -172,18 +179,18 @@
 
     def report_finish(self, wdr: QWebEngineDownloadRequest, dr: DownloadRequest) -> None:
         s = wdr.state()
-        result: dict[str, str] = {}
         dr.last_activity_at = monotonic()
         dr.finished = True
         self.live_requests.discard(dr)
+        has_result = False
 
         if s == QWebEngineDownloadRequest.DownloadState.DownloadRequested:
             dr.error = 'Open of URL failed'
-            result = dr.as_result()
+            has_result = True
         elif s == QWebEngineDownloadRequest.DownloadState.DownloadCancelled:
             dr.error = 'Timed out waiting for download'
             dr.worth_retry = True
-            result = dr.as_result()
+            has_result = True
         elif s == QWebEngineDownloadRequest.DownloadState.DownloadInterrupted:
             dr.error = wdr.interruptReasonString()
             dr.worth_retry = wdr.interruptReason() in (
@@ -193,25 +200,26 @@
                 QWebEngineDownloadRequest.DownloadInterruptReason.NetworkServerDown,
                 QWebEngineDownloadRequest.DownloadInterruptReason.ServerUnreachable,
             )
-            result = dr.as_result()
+            has_result = True
         elif s == QWebEngineDownloadRequest.DownloadState.DownloadCompleted:
-            result = dr.as_result()
+            has_result = True
 
-        if result:
-            self.download_finished.emit(result)
-            while dr.num_of_responses_needed:
-                dr.num_of_responses_needed -= 1
+        if has_result:
+            for req_id in dr.responses_needed:
+                result = dr.as_result(req_id)
+                self.download_finished.emit(result)
                 self.send_response(result)
+            del dr.responses_needed[:]
 
     def send_response(self, r: dict[str, str]) -> None:
         with suppress(OSError):
             print(json.dumps(r), flush=True)
 
     def set_user_agent(self, new_val: str) -> None:
-        self.profile.setHttpUserAgent(new_val)
+        self.profile().setHttpUserAgent(new_val)
 
     def set_simple_cookie(self, name, value, domain, path='/'):
-        cs = self.profile.cookieStore()
+        cs = self.profile().cookieStore()
         cookie_string = f'{name}={value}; Domain={domain}; Path={path}'
         for c in QNetworkCookie.parseCookies(cookie_string):
             cs.setCookie(c)
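
The req_id plumbing lets several pending requests for the same URL share one
download: download() appends each caller's id to responses_needed, and
report_finish() then answers every waiting caller with its own copy of the
result. A rough trace, with illustrative ids and filenames:

    backend.download('https://example.com', '1', req_id=3)  # starts the fetch
    backend.download('https://example.com', '2', req_id=7)  # joins the first
    # On completion, replies {'id': 3, ...} and {'id': 7, ...} are both sent.
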
@@ -226,7 +234,10 @@ def read_commands(backend: FetchBackend, tdir: str) -> None:
             ac = cmd['action']
             if ac == 'download':
                 file_counter += 1
-                backend.request_download.emit(cmd['url'], os.path.join(tdir, str(file_counter)))
+                timeout = cmd.get('timeout')
+                if timeout is None:
+                    timeout = default_timeout
+                backend.request_download.emit(cmd['url'], os.path.join(tdir, str(file_counter)), cmd.get('headers'), timeout, cmd.get('id', 0))
             elif ac == 'quit':
                 break
     except Exception as err:
@@ -236,13 +247,15 @@
     backend.input_finished.emit(error_msg)
 
 
-def worker(tdir):
+def worker(tdir: str, user_agent: str) -> None:
     app = QApplication.instance()
-    backend = FetchBackend(output_dir=tdir, parent=app)
-    read_thread = Thread(target=read_commands, args=(backend, tdir), daemon=True)
-    read_thread.start()
-    app.exec()
-    del backend
+    backend = FetchBackend(output_dir=tdir, parent=app, user_agent=user_agent)
+    try:
+        read_thread = Thread(target=read_commands, args=(backend, tdir), daemon=True)
+        read_thread.start()
+        app.exec()
+    finally:
+        sip.delete(backend)
     del app
 
 
diff --git a/src/calibre/web/fetch/simple.py b/src/calibre/web/fetch/simple.py
index 6a3172efaa..55a80395b3 100644
--- a/src/calibre/web/fetch/simple.py
+++ b/src/calibre/web/fetch/simple.py
@@ -290,7 +290,7 @@ class RecursiveFetcher:
         except URLError as err:
             if hasattr(err, 'code') and err.code in responses:
                 raise FetchError(responses[err.code])
-            is_temp = False
+            is_temp = getattr(err, 'worth_retry', False)
             reason = getattr(err, 'reason', None)
             if isinstance(reason, socket.gaierror):
                 # see man gai_strerror() for details
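
With worth_retry surfacing in RecursiveFetcher as is_temp, the error path
treats such failures as temporary. Outside the recipe system, a caller could
apply the same flag itself; a minimal retry sketch, with an illustrative
helper name and attempt count:

    from urllib.error import URLError

    def fetch_with_retry(br, url, attempts=2):
        # Retry only when the backend marked the failure as retryable.
        for i in range(attempts):
            try:
                return br.open(url).read()
            except URLError as err:
                if i == attempts - 1 or not getattr(err, 'worth_retry', False):
                    raise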