diff --git a/resources/scraper.js b/resources/scraper.js index 71442d7969..d29ada510d 100644 --- a/resources/scraper.js +++ b/resources/scraper.js @@ -18,19 +18,33 @@ console.log(msg); } - function debug() { - var args = Array.prototype.slice.call(arguments); - var text = args.join(' '); - send_msg({type: 'print', text: text}); - } - function notify_that_messages_are_available() { send_msg({type: 'messages_available', count: messages.length}); } + function notify(type, msg) { + msg.type = type; + messages.push(msg); + notify_that_messages_are_available(); + } + + async function* stream_to_async_iterable(stream) { + const reader = stream.getReader() + try { + while (true) { + const {done, value} = await reader.read() + if (done) return + yield value + } + } finally { + reader.releaseLock() + } + } + async function download(req, data) { try { const controller = new AbortController(); + live_requests[req.id] = controller; var fetch_options = { method: req.method.toUpperCase(), signal: controller.signal, @@ -40,13 +54,17 @@ for (const pair of response.headers) { headers.push(pair); } - const body = await response.arrayBuffer(); + notify('metadata_received', { + status_code: response.status, status_msg: response.statusText, + url: response.url, headers: headers, response_type: response.type, + }) + for await (const chunk of stream_to_async_iterable(response.body)) { + notify('chunk_received', {chunk: chunk}) + } delete live_requests[req.id]; - messages.push({type: 'finished', req: req, status_code: response.status, status_msg: response.statusText, url: response.url, headers: headers, type: response.type, body: body}); - notify_that_messages_are_available(); + notify('finished', {}) } catch (error) { - messages.push({type: 'finished', error: error.message, req: req, url: req.url}); - notify_that_messages_are_available(); + notify('error', {error: error.message}); } } diff --git a/src/calibre/scraper/qt.py b/src/calibre/scraper/qt.py index 7c4dd0eb8b..5435ea7d03 100644 
--- a/src/calibre/scraper/qt.py +++ b/src/calibre/scraper/qt.py @@ -23,37 +23,41 @@ class FakeResponse: self.queue = Queue() self.done = False self.final_url = '' + self._reason = '' self._status = None self._headers = [] - self.data = BytesIO() + self._data = BytesIO() def _wait(self): if self.done: return self.done = True res = self.queue.get() + del self.queue if res['action'] == 'input_error': raise Exception(res['error']) self.final_url = res['final_url'] self._status = res.get('http_code') + self._reason = res.get('http_status_message') self._headers = res['headers'] if 'error' in res: ex = URLError(res['error']) ex.worth_retry = bool(res.get('worth_retry')) raise ex - self.data = open(res['output'], 'rb') + with suppress(FileNotFoundError): + self._data = open(res['output'], 'rb') def read(self, *a, **kw): self._wait() - ans = self.data.read(*a, **kw) + ans = self._data.read(*a, **kw) return ans def seek(self, *a, **kw): self._wait() - return self.data.seek(*a, **kw) + return self._data.seek(*a, **kw) def tell(self, *a, **kw): - return self.data.tell(*a, **kw) + return self._data.tell(*a, **kw) @property def url(self) -> str: @@ -75,6 +79,11 @@ class FakeResponse: ans[k] = v return ans + @property + def reason(self) -> str: + self._wait() + return self._reason or '' + def getcode(self) -> int | None: return self.status @@ -85,7 +94,7 @@ class FakeResponse: return self.headers def close(self): - self.data.close() + self._data.close() class Browser: diff --git a/src/calibre/scraper/qt_backend.py b/src/calibre/scraper/qt_backend.py index 3dcefbbef9..9e6c867894 100644 --- a/src/calibre/scraper/qt_backend.py +++ b/src/calibre/scraper/qt_backend.py @@ -71,6 +71,16 @@ class CookieJar(QNetworkCookieJar): return ans + super().cookiesForUrl(url) +def too_slow_or_timed_out(timeout: float, last_activity_at: float, created_at: float, downloaded_bytes: int, now: float) -> bool: + if timeout and last_activity_at + timeout < now: + return True + time_taken = now - 
created_at + if time_taken > default_timeout: + rate = downloaded_bytes / time_taken + return rate < 10 + return False + + class DownloadRequest(QObject): worth_retry: bool = False @@ -83,14 +93,17 @@ class DownloadRequest(QObject): self.req_id: int = req_id self.created_at = self.last_activity_at = monotonic() self.timeout = timeout + self.bytes_received = 0 self.reply.downloadProgress.connect(self.on_download_progress, type=Qt.ConnectionType.QueuedConnection) self.reply.uploadProgress.connect(self.on_upload_progress, type=Qt.ConnectionType.QueuedConnection) # self.reply.readyRead.connect(self.on_data_available) def on_download_progress(self, bytes_received: int, bytes_total: int) -> None: + self.bytes_received = bytes_received self.last_activity_at = monotonic() def on_upload_progress(self, bytes_received: int, bytes_total: int) -> None: + self.bytes_received = bytes_received self.last_activity_at = monotonic() def save_data(self) -> None: @@ -132,14 +145,7 @@ class DownloadRequest(QObject): return result def too_slow_or_timed_out(self, now: float) -> bool: - if self.timeout and self.last_activity_at + self.timeout < now: - return True - time_taken = now - self.created_at - if time_taken > default_timeout: - downloaded = self.webengine_download_request.receivedBytes() - rate = downloaded / time_taken - return rate < 10 - return False + return too_slow_or_timed_out(self.timeout, self.last_activity_at, self.created_at, self.bytes_received, now) class FetchBackend(QNetworkAccessManager): @@ -186,8 +192,8 @@ class FetchBackend(QNetworkAccessManager): timed_out = tuple(dr for dr in self.live_requests if dr.too_slow_or_timed_out(now)) for dr in timed_out: dr.reply.abort() - if self.live_requests: - self.timeout_timer.start() + if not self.live_requests: + self.timeout_timer.stop() def current_user_agent(self) -> str: return self.user_agent diff --git a/src/calibre/scraper/webengine_backend.py b/src/calibre/scraper/webengine_backend.py index 66acec4c7d..8b1cabf231 
100644 --- a/src/calibre/scraper/webengine_backend.py +++ b/src/calibre/scraper/webengine_backend.py @@ -7,35 +7,24 @@ import json import os import secrets import sys +from collections import deque from contextlib import suppress +from http import HTTPStatus from time import monotonic -from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal +from qt.core import QApplication, QByteArray, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal from qt.webengine import QWebEnginePage, QWebEngineScript -from calibre.scraper.qt_backend import Request +from calibre.scraper.qt_backend import Request, too_slow_or_timed_out from calibre.scraper.qt_backend import worker as qt_worker from calibre.scraper.simple_backend import create_base_profile from calibre.utils.resources import get_path as P from calibre.utils.webengine import create_script, insert_scripts -default_timeout: float = 60. # seconds - - -def qurl_to_string(url: QUrl | str) -> str: - return bytes(QUrl(url).toEncoded()).decode() - - -def qurl_to_key(url: QUrl | str) -> str: - return qurl_to_string(url).rstrip('/') - - -Headers = list[tuple[str, str]] - class DownloadRequest(QObject): - worth_retry: bool = False + aborted_on_timeout: bool = False response_received = pyqtSignal(object) def __init__(self, url: str, output_path: str, timeout: float, req_id: int, parent: 'FetchBackend'): @@ -45,23 +34,50 @@ class DownloadRequest(QObject): self.req_id: int = req_id self.created_at = self.last_activity_at = monotonic() self.timeout = timeout - - def handle_response(self, r: dict) -> None: - result = { + self.bytes_received = 0 + self.result = { 'action': 'finished', 'id': self.req_id, 'url': self.url, 'output': self.output_path, - 'final_url': r['url'], 'headers': r.get('headers', []), 'worth_retry': self.worth_retry, + 'headers': [], 'final_url': self.url, 'worth_retry': False, } - if 'error' in r: - result['error'] = r['error'] + + def metadata_received(self, r: dict) -> None: + if 
r['response_type'] != 'basic': + print(f'WARNING: response type for {self.url} indicates headers are restricted: {r["response_type"]}') + self.result['worth_retry'] = r['status_code'] in ( + HTTPStatus.TOO_MANY_REQUESTS, HTTPStatus.REQUEST_TIMEOUT, HTTPStatus.SERVICE_UNAVAILABLE, HTTPStatus.GATEWAY_TIMEOUT) + self.result['final_url'] = r['url'] + self.result['headers'] = r['headers'] + self.result['http_code'] = r['status_code'] + self.result['http_status_message'] = r['status_msg'] + + def chunk_received(self, chunk: QByteArray) -> None: + mv = memoryview(chunk) + self.bytes_received += len(mv) + with open(self.output_path, 'ab') as f: + f.write(mv) + + def as_result(self, r: dict | None = None) -> dict: + if self.aborted_on_timeout: + self.result['error'] = 'Timed out' + self.result['worth_retry'] = True + else: - if r['type'] != 'basic': - print(f'WARNING: response type for {self.url} indicates headers are restrcited: {r["type"]}') - with open(self.output_path, 'wb') as f: - f.write(memoryview(r['data'])) + if r: + self.result['error'] = r['error'] + self.result['worth_retry'] = True # usually some kind of network error + return self.result + + def too_slow_or_timed_out(self, now: float) -> bool: + return too_slow_or_timed_out(self.timeout, self.last_activity_at, self.created_at, self.bytes_received, now) class Worker(QWebEnginePage): working_on_request: DownloadRequest | None = None + messages_dispatch = pyqtSignal(object) + result_received = pyqtSignal(object) + + def __init__(self, profile, parent): + super().__init__(profile, parent) + self.messages_dispatch.connect(self.on_messages) def javaScriptAlert(self, url, msg): pass @@ -77,10 +93,8 @@ class Worker(QWebEnginePage): if level == QWebEnginePage.JavaScriptConsoleMessageLevel.InfoMessageLevel and message.startswith(self.token): msg = json.loads(message.partition(' ')[2]) t = msg.get('type') - if t == 'print': - print(msg['text']) - elif t == 'messages_available': - self.runjs('window.get_messages()', self.on_messages) 
+ if t == 'messages_available': + self.runjs('window.get_messages()', self.messages_dispatch.emit) else: print(f'{source_id}:{line_num}:{message}') return @@ -91,7 +105,6 @@ class Worker(QWebEnginePage): def start_download(self, output_dir: str, req: Request, data: str) -> DownloadRequest: filename = os.path.basename(req['filename']) # TODO: Implement POST requests with data - # TODO: Implement timeout payload = json.dumps({'req': req, 'data': data}) content = f'''
{html.escape(payload)}
@@ -100,11 +113,32 @@ class Worker(QWebEnginePage): self.working_on_request = DownloadRequest(req['url'], os.path.join(output_dir, filename), req['timeout'], req['id'], self.parent()) return self.working_on_request + def abort_on_timeout(self) -> None: + if self.working_on_request is not None: + self.working_on_request.aborted_on_timeout = True + self.runjs(f'window.abort_download({self.working_on_request.req_id})') + def on_messages(self, messages: list[dict]) -> None: + if not messages: + return + if self.working_on_request is None: + print('Got messages without request:', messages) + return + self.working_on_request.last_activity_at = monotonic() for m in messages: - if m['type'] == 'finished': - self.working_on_request.handle_response(m) + t = m['type'] + if t == 'metadata_received': + self.working_on_request.metadata_received(m) + elif t == 'chunk_received': + self.working_on_request.chunk_received(m['chunk']) + elif t == 'finished': + result = self.working_on_request.as_result() self.working_on_request = None + self.result_received.emit(result) + elif t == 'error': + result = self.working_on_request.as_result(m) + self.working_on_request = None + self.result_received.emit(result) class FetchBackend(QObject): @@ -126,7 +160,7 @@ class FetchBackend(QObject): self.profile = profile super().__init__(parent) self.workers: list[Worker] = [] - self.pending_requests: list[tuple[Request, str]] = [] + self.pending_requests: deque[tuple[Request, str]] = deque() sys.excepthook = self.excepthook self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection) self.set_cookies.connect(self._set_cookies, type=Qt.ConnectionType.QueuedConnection) @@ -148,17 +182,16 @@ class FetchBackend(QObject): QApplication.instance().exit(1) def enforce_timeouts(self): - # TODO: Start timer on download and port this method now = monotonic() - timed_out = tuple(dr for dr in self.live_requests if dr.too_slow_or_timed_out(now)) - for dr in timed_out: - if dr.webengine_download_request is 
None: - dr.cancel_on_start = True - else: - dr.webengine_download_request.cancel() - self.live_requests.discard(dr) - if self.live_requests: - self.timeout_timer.start() + has_workers = False + for w in self.workers: + if w.working_on_request is not None: + if w.working_on_request.too_slow_or_timed_out(now): + w.abort_on_timeout() + else: + has_workers = True + if not has_workers: + self.timeout_timer.stop() def download(self, req: Request) -> None: qurl = QUrl(req['url']) @@ -177,19 +210,30 @@ for w in self.workers: if w.working_on_request is None: w.start_download(self.output_dir, req, data) + self.timeout_timer.start() return if len(self.workers) < 5: self.workers.append(self.create_worker()) self.workers[-1].start_download(self.output_dir, req, data) + self.timeout_timer.start() return - # TODO: Drain pending requests on finish self.pending_requests.append((req, data)) def create_worker(self) -> Worker: ans = Worker(self.profile, self) ans.token = self.token + ' ' + ans.result_received.connect(self.result_received) return ans + def result_received(self, result: dict) -> None: + self.send_response(result) + self.download_finished.emit(result) + if self.pending_requests: + w = self.sender() + req, data = self.pending_requests.popleft() + w.start_download(self.output_dir, req, data) + self.timeout_timer.start() + def send_response(self, r: dict[str, str]) -> None: with suppress(OSError): print(json.dumps(r), flush=True, file=sys.__stdout__) @@ -233,8 +277,10 @@ def worker(tdir: str, user_agent: str, verify_ssl_certificates: bool) -> None: def develop(url: str) -> None: from calibre.scraper.qt import WebEngineBrowser br = WebEngineBrowser() - raw = br.open(url).read() - print(len(raw)) + res = br.open(url) + print(f'{res.code} {res.reason}') + print(res.headers) + print(len(res.read())) if __name__ == '__main__':