Get basic fetching working

This commit is contained in:
Kovid Goyal 2024-08-16 11:58:14 +05:30
parent 3c5e4ed751
commit 885961acc0
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
4 changed files with 153 additions and 74 deletions

View File

@ -18,19 +18,33 @@
console.log(msg);
}
function debug() {
    // Forward a printable debug message to the controlling process.
    const pieces = Array.prototype.slice.call(arguments);
    send_msg({type: 'print', text: pieces.join(' ')});
}
function notify_that_messages_are_available() {
    // Tell the host process how many queued messages are ready to collect.
    const count = messages.length;
    send_msg({type: 'messages_available', count: count});
}
function notify(type, msg) {
    // Tag msg with its type, queue it, and signal the host that it is available.
    messages.push(Object.assign(msg, {type: type}));
    notify_that_messages_are_available();
}
async function* stream_to_async_iterable(stream) {
    // Adapt a ReadableStream into an async iterator over its chunks,
    // releasing the reader's lock when iteration ends for any reason.
    const reader = stream.getReader();
    try {
        for (;;) {
            const result = await reader.read();
            if (result.done) break;
            yield result.value;
        }
    } finally {
        reader.releaseLock();
    }
}
async function download(req, data) {
try {
const controller = new AbortController();
live_requests[req.id] = controller;
var fetch_options = {
method: req.method.toUpperCase(),
signal: controller.signal,
@ -40,13 +54,17 @@
for (const pair of response.headers) {
headers.push(pair);
}
const body = await response.arrayBuffer();
notify('metadata_received', {
status_code: response.status, status_msg: response.statusText,
url: response.url, headers: headers, response_type: response.type,
})
for await (const chunk of stream_to_async_iterable(response.body)) {
notify('chunk_received', {chunk: chunk})
}
delete live_requests[req.id];
messages.push({type: 'finished', req: req, status_code: response.status, status_msg: response.statusText, url: response.url, headers: headers, type: response.type, body: body});
notify_that_messages_are_available();
notify('finished', {})
} catch (error) {
messages.push({type: 'finished', error: error.message, req: req, url: req.url});
notify_that_messages_are_available();
notify('error', {error: error.message});
}
}

View File

@ -23,37 +23,41 @@ class FakeResponse:
self.queue = Queue()
self.done = False
self.final_url = ''
self._reason = ''
self._status = None
self._headers = []
self.data = BytesIO()
self._data = BytesIO()
def _wait(self):
if self.done:
return
self.done = True
res = self.queue.get()
del self.queue
if res['action'] == 'input_error':
raise Exception(res['error'])
self.final_url = res['final_url']
self._status = res.get('http_code')
self._reason = res.get('http_status_message')
self._headers = res['headers']
if 'error' in res:
ex = URLError(res['error'])
ex.worth_retry = bool(res.get('worth_retry'))
raise ex
self.data = open(res['output'], 'rb')
with suppress(FileNotFoundError):
self._data = open(res['output'], 'rb')
def read(self, *a, **kw):
self._wait()
ans = self.data.read(*a, **kw)
ans = self._data.read(*a, **kw)
return ans
def seek(self, *a, **kw):
self._wait()
return self.data.seek(*a, **kw)
return self._data.seek(*a, **kw)
def tell(self, *a, **kw):
return self.data.tell(*a, **kw)
return self._data.tell(*a, **kw)
@property
def url(self) -> str:
@ -75,6 +79,11 @@ class FakeResponse:
ans[k] = v
return ans
@property
def reason(self) -> str:
self._wait()
return self._reason or ''
def getcode(self) -> int | None:
return self.status
@ -85,7 +94,7 @@ class FakeResponse:
return self.headers
def close(self):
self.data.close()
self._data.close()
class Browser:

View File

@ -71,6 +71,16 @@ class CookieJar(QNetworkCookieJar):
return ans + super().cookiesForUrl(url)
def too_slow_or_timed_out(timeout: float, last_activity_at: float, created_at: float, downloaded_bytes: int, now: float) -> bool:
    '''Return True if a download should be aborted: either no activity within
    the explicit ``timeout``, or (after the ``default_timeout`` grace period)
    an average transfer rate below 10 bytes/sec.'''
    # An explicit timeout wins: no activity for `timeout` seconds means dead.
    if timeout and now > last_activity_at + timeout:
        return True
    elapsed = now - created_at
    # After the grace period, require a minimal average rate to keep going.
    if elapsed > default_timeout and downloaded_bytes / elapsed < 10:
        return True
    return False
class DownloadRequest(QObject):
worth_retry: bool = False
@ -83,14 +93,17 @@ class DownloadRequest(QObject):
self.req_id: int = req_id
self.created_at = self.last_activity_at = monotonic()
self.timeout = timeout
self.bytes_received = 0
self.reply.downloadProgress.connect(self.on_download_progress, type=Qt.ConnectionType.QueuedConnection)
self.reply.uploadProgress.connect(self.on_upload_progress, type=Qt.ConnectionType.QueuedConnection)
# self.reply.readyRead.connect(self.on_data_available)
def on_download_progress(self, bytes_received: int, bytes_total: int) -> None:
    # Record byte count and activity time so the timeout logic can
    # compute an effective transfer rate for this request.
    self.bytes_received = bytes_received
    self.last_activity_at = monotonic()
def on_upload_progress(self, bytes_received: int, bytes_total: int) -> None:
    # NOTE(review): for uploadProgress Qt reports bytes *sent*; storing them in
    # self.bytes_received treats any transfer as progress for the rate check —
    # confirm this is intentional.
    self.bytes_received = bytes_received
    self.last_activity_at = monotonic()
def save_data(self) -> None:
@ -132,14 +145,7 @@ class DownloadRequest(QObject):
return result
def too_slow_or_timed_out(self, now: float) -> bool:
if self.timeout and self.last_activity_at + self.timeout < now:
return True
time_taken = now - self.created_at
if time_taken > default_timeout:
downloaded = self.webengine_download_request.receivedBytes()
rate = downloaded / time_taken
return rate < 10
return False
return too_slow_or_timed_out(self.timeout, self.last_activity_at, self.created_at, self.bytes_received, now)
class FetchBackend(QNetworkAccessManager):
@ -186,8 +192,8 @@ class FetchBackend(QNetworkAccessManager):
timed_out = tuple(dr for dr in self.live_requests if dr.too_slow_or_timed_out(now))
for dr in timed_out:
dr.reply.abort()
if self.live_requests:
self.timeout_timer.start()
if not self.live_requests:
self.timeout_timer.stop()
def current_user_agent(self) -> str:
return self.user_agent

View File

@ -7,35 +7,24 @@ import json
import os
import secrets
import sys
from collections import deque
from contextlib import suppress
from http import HTTPStatus
from time import monotonic
from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal
from qt.core import QApplication, QByteArray, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal
from qt.webengine import QWebEnginePage, QWebEngineScript
from calibre.scraper.qt_backend import Request
from calibre.scraper.qt_backend import Request, too_slow_or_timed_out
from calibre.scraper.qt_backend import worker as qt_worker
from calibre.scraper.simple_backend import create_base_profile
from calibre.utils.resources import get_path as P
from calibre.utils.webengine import create_script, insert_scripts
default_timeout: float = 60. # seconds
def qurl_to_string(url: QUrl | str) -> str:
    '''Return the percent-encoded textual form of *url*.'''
    encoded = QUrl(url).toEncoded()
    return bytes(encoded).decode()
def qurl_to_key(url: QUrl | str) -> str:
    '''Canonical lookup key for *url*: its encoded form without a trailing slash.'''
    key = qurl_to_string(url)
    return key.rstrip('/')
Headers = list[tuple[str, str]]
class DownloadRequest(QObject):
worth_retry: bool = False
aborted_on_timeout: bool = False
response_received = pyqtSignal(object)
def __init__(self, url: str, output_path: str, timeout: float, req_id: int, parent: 'FetchBackend'):
@ -45,23 +34,50 @@ class DownloadRequest(QObject):
self.req_id: int = req_id
self.created_at = self.last_activity_at = monotonic()
self.timeout = timeout
def handle_response(self, r: dict) -> None:
result = {
self.bytes_received = 0
self.result = {
'action': 'finished', 'id': self.req_id, 'url': self.url, 'output': self.output_path,
'final_url': r['url'], 'headers': r.get('headers', []), 'worth_retry': self.worth_retry,
'headers': [], 'final_url': self.url, 'worth_retry': False,
}
if 'error' in r:
result['error'] = r['error']
def metadata_received(self, r: dict) -> None:
    '''Record HTTP response metadata reported by the JS side into self.result.'''
    if r['response_type'] != 'basic':
        # Bug fix: report the actual response type. r["type"] is the message
        # type and is always 'metadata_received'; the response type is in
        # r["response_type"]. Also fixes the "restrcited" typo.
        print(f'WARNING: response type for {self.url} indicates headers are restricted: {r["response_type"]}')
    # These statuses indicate transient server conditions, so a retry may succeed.
    self.result['worth_retry'] = r['status_code'] in (
        HTTPStatus.TOO_MANY_REQUESTS, HTTPStatus.REQUEST_TIMEOUT, HTTPStatus.SERVICE_UNAVAILABLE, HTTPStatus.GATEWAY_TIMEOUT)
    self.result['final_url'] = r['url']
    self.result['headers'] = r['headers']
    self.result['http_code'] = r['status_code']
    self.result['http_status_message'] = r['status_msg']
def chunk_received(self, chunk: QByteArray) -> None:
    # Append one chunk of streamed body data to the output file and
    # track the running byte count for the transfer-rate timeout check.
    mv = memoryview(chunk)
    self.bytes_received += len(mv)
    # NOTE(review): one open/close per chunk; simple, but confirm chunk
    # sizes are large enough that this is not a bottleneck.
    with open(self.output_path, 'ab') as f:
        f.write(mv)
def as_result(self, r: dict | None = {}) -> dict:
if self.aborted_on_timeout:
self.result['error'] = 'Timed out'
self.result['worth_retry'] = True
else:
if r['type'] != 'basic':
print(f'WARNING: response type for {self.url} indicates headers are restrcited: {r["type"]}')
with open(self.output_path, 'wb') as f:
f.write(memoryview(r['data']))
if r:
self.result['error'] = r['error']
self.result['worth_retry'] = True # usually some kind of network error
return self.result
def too_slow_or_timed_out(self, now: float) -> bool:
    # Delegate to the shared module-level helper using this request's stats.
    return too_slow_or_timed_out(self.timeout, self.last_activity_at, self.created_at, self.bytes_received, now)
class Worker(QWebEnginePage):
working_on_request: DownloadRequest | None = None
messages_dispatch = pyqtSignal(object)
result_received = pyqtSignal(object)
def __init__(self, profile, parent):
    super().__init__(profile, parent)
    # Route message payloads fetched from the page to on_messages via a signal.
    self.messages_dispatch.connect(self.on_messages)
def javaScriptAlert(self, url, msg):
    # Suppress JS alert() dialogs; a headless fetch worker has no UI.
    pass
@ -77,10 +93,8 @@ class Worker(QWebEnginePage):
if level == QWebEnginePage.JavaScriptConsoleMessageLevel.InfoMessageLevel and message.startswith(self.token):
msg = json.loads(message.partition(' ')[2])
t = msg.get('type')
if t == 'print':
print(msg['text'])
elif t == 'messages_available':
self.runjs('window.get_messages()', self.on_messages)
if t == 'messages_available':
self.runjs('window.get_messages()', self.messages_dispatch.emit)
else:
print(f'{source_id}:{line_num}:{message}')
return
@ -91,7 +105,6 @@ class Worker(QWebEnginePage):
def start_download(self, output_dir: str, req: Request, data: str) -> DownloadRequest:
filename = os.path.basename(req['filename'])
# TODO: Implement POST requests with data
# TODO: Implement timeout
payload = json.dumps({'req': req, 'data': data})
content = f'''<!DOCTYPE html>
<html><head></head></body><div id="payload">{html.escape(payload)}</div></body></html>
@ -100,11 +113,32 @@ class Worker(QWebEnginePage):
self.working_on_request = DownloadRequest(req['url'], os.path.join(output_dir, filename), req['timeout'], req['id'], self.parent())
return self.working_on_request
def abort_on_timeout(self) -> None:
    '''Mark the current request as timed out and ask the JS side to abort it.'''
    if self.working_on_request is not None:
        self.working_on_request.aborted_on_timeout = True
        # Bug fix: the request id lives on the DownloadRequest, not on this
        # Worker — self.req_id would raise AttributeError.
        self.runjs(f'window.abort_download({self.working_on_request.req_id})')
def on_messages(self, messages: list[dict]) -> None:
if not messages:
return
if self.working_on_request is None:
print('Got messages without request:', messages)
return
self.working_on_request.last_activity_at = monotonic()
for m in messages:
if m['type'] == 'finished':
self.working_on_request.handle_response(m)
t = m['type']
if t == 'metadata_received':
self.working_on_request.metadata_received(m)
elif t == 'chunk_received':
self.working_on_request.chunk_received(m['chunk'])
elif t == 'finished':
result = self.working_on_request.as_result()
self.working_on_request = None
self.result_received.emit(result)
elif t == 'error':
result = self.working_on_request.as_result(m)
self.working_on_request = None
self.result_received.emit(result)
class FetchBackend(QObject):
@ -126,7 +160,7 @@ class FetchBackend(QObject):
self.profile = profile
super().__init__(parent)
self.workers: list[Worker] = []
self.pending_requests: list[tuple[Request, str]] = []
self.pending_requests: deque[tuple[Request, str]] = deque()
sys.excepthook = self.excepthook
self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection)
self.set_cookies.connect(self._set_cookies, type=Qt.ConnectionType.QueuedConnection)
@ -148,17 +182,16 @@ class FetchBackend(QObject):
QApplication.instance().exit(1)
def enforce_timeouts(self):
# TODO: Start timer on download and port this method
now = monotonic()
timed_out = tuple(dr for dr in self.live_requests if dr.too_slow_or_timed_out(now))
for dr in timed_out:
if dr.webengine_download_request is None:
dr.cancel_on_start = True
else:
dr.webengine_download_request.cancel()
self.live_requests.discard(dr)
if self.live_requests:
self.timeout_timer.start()
has_workers = False
for w in self.workers:
if w.working_on_request is not None:
if w.working_on_request.too_slow_or_timed_out(now):
w.abort_on_timeout()
else:
has_workers = True
if not has_workers:
self.timeout_timer.stop()
def download(self, req: Request) -> None:
qurl = QUrl(req['url'])
@ -177,19 +210,30 @@ class FetchBackend(QObject):
for w in self.workers:
if w.working_on_request is None:
w.start_download(self.output_dir, req, data)
self.timeout_timer.start()
return
if len(self.workers) < 5:
self.workers.append(self.create_worker)
self.workers[-1].start_download(self.output_dir, req, data)
self.timeout_timer.start()
return
# TODO: Drain pending requests on finish
self.pending_requests.append((req, data))
def create_worker(self) -> Worker:
    # Build a new page-based worker whose console-message token matches ours
    # and whose finished results are forwarded to result_received.
    ans = Worker(self.profile, self)
    ans.token = self.token + ' '
    ans.result_received.connect(self.result_received)
    return ans
def result_received(self, result: dict) -> None:
    # Emit the finished result to stdout and listeners, then reuse the
    # now-idle worker (the signal sender) for any queued pending request.
    self.send_response(result)
    self.download_finished.emit(result)
    if self.pending_requests:
        w = self.sender()
        req, data = self.pending_requests.popleft()
        w.start_download(self.output_dir, req, data)
        self.timeout_timer.start()
def send_response(self, r: dict[str, str]) -> None:
    '''Serialize *r* as JSON to the real stdout, ignoring broken-pipe style OSErrors.'''
    payload = json.dumps(r)
    with suppress(OSError):
        print(payload, flush=True, file=sys.__stdout__)
@ -233,8 +277,10 @@ def worker(tdir: str, user_agent: str, verify_ssl_certificates: bool) -> None:
def develop(url: str) -> None:
from calibre.scraper.qt import WebEngineBrowser
br = WebEngineBrowser()
raw = br.open(url).read()
print(len(raw))
res = br.open(url)
print(f'{res.code} {res.reason}')
print(res.headers)
print(len(res.read()))
if __name__ == '__main__':