More work on new fetch engine

This commit is contained in:
Kovid Goyal 2024-08-08 20:47:07 +05:30
parent 6f307021a0
commit 40d217d7e7
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -7,9 +7,10 @@ import os
import sys import sys
from contextlib import suppress from contextlib import suppress
from threading import Thread from threading import Thread
from time import monotonic
from typing import Union from typing import Union
from qt.core import QApplication, QNetworkCookie, QObject, Qt, QUrl, pyqtSignal from qt.core import QApplication, QNetworkCookie, QObject, Qt, QTimer, QUrl, pyqtSignal
from qt.webengine import QWebEngineDownloadRequest, QWebEnginePage, QWebEngineUrlRequestInfo, QWebEngineUrlRequestInterceptor from qt.webengine import QWebEngineDownloadRequest, QWebEnginePage, QWebEngineUrlRequestInfo, QWebEngineUrlRequestInterceptor
from calibre.scraper.simple_backend import create_base_profile from calibre.scraper.simple_backend import create_base_profile
@ -23,9 +24,7 @@ class RequestInterceptor(QWebEngineUrlRequestInterceptor):
key = qurl_to_key(req.requestUrl()) key = qurl_to_key(req.requestUrl())
if dr := fb.download_requests[key]: if dr := fb.download_requests[key]:
for name, x in dr.headers.items(): for name, x in dr.headers.items():
if isinstance(x, str): for val in ([x] if isinstance(x, str) else x):
x = [x]
for val in x:
req.setHttpHeader(name.encode(), val.encode()) req.setHttpHeader(name.encode(), val.encode())
@ -39,14 +38,39 @@ Headers = dict[str, Union[str, list[str]]]
class DownloadRequest:
    """Bookkeeping for a single URL being fetched.

    Tracks the requested URL and target file name, the extra HTTP headers to
    send, activity timestamps used for timeout enforcement, and the outcome
    (``finished``, ``error``, ``worth_retry``) once it is known.
    """

    # Throughput is only judged once a request has been alive for longer than
    # this many seconds; below that, only the inactivity timeout applies.
    min_seconds_before_rate_check: float = 60
    # Average download rate (bytes per second) below which a request that has
    # passed the threshold above is considered too slow.
    min_bytes_per_second: float = 10

    # Set when the request timed out before a web engine download object
    # existed; checked when the download is eventually started.
    cancel_on_start: bool = False
    # Human readable failure description; an empty string means no failure.
    error: str = ''
    finished: bool = False
    # Whether trying the same URL again might succeed.
    worth_retry: bool = False
    webengine_download_request: 'QWebEngineDownloadRequest | None' = None

    def __init__(self, url: str, filename: str, headers: 'Headers | None' = None, timeout: float = 60.):
        """Create a request for *url* that will be saved as *filename*.

        :param url: The URL to fetch.
        :param filename: File name to store the download under.
        :param headers: Extra HTTP headers; ``None`` means no extra headers.
        :param timeout: Seconds without activity after which the request is
            considered timed out. A falsy value disables the inactivity check.
        """
        self.url, self.filename = url, filename
        self.url_key = qurl_to_key(url)
        self.headers: Headers = headers or {}
        self.num_of_responses_needed = 1
        self.error_message = ''
        self.created_at = self.last_activity_at = monotonic()
        self.timeout = timeout

    def as_result(self) -> 'dict[str, str | bool]':
        """Return the ``finished`` message describing this request.

        The ``error`` and ``worth_retry`` keys are present only when an error
        was recorded. If no web engine download object was ever attached,
        ``output`` is an empty string and ``final_url`` is the requested URL,
        instead of failing with an ``AttributeError``.
        """
        wdr = self.webengine_download_request
        if wdr is None:
            output, final_url = '', self.url
        else:
            output = os.path.join(wdr.downloadDirectory(), wdr.downloadFileName())
            final_url = qurl_to_string(wdr.url())
        result = {'action': 'finished', 'url': self.url, 'output': output, 'final_url': final_url}
        if self.error:
            result['error'], result['worth_retry'] = self.error, self.worth_retry
        return result

    def too_slow_or_timed_out(self, now: float) -> bool:
        """Report whether this request should be abandoned at time *now*.

        :param now: Current time on the same clock as ``created_at`` and
            ``last_activity_at`` (``time.monotonic()``).
        :return: ``True`` when there has been no activity for longer than
            ``timeout`` seconds, or when the request has been alive longer
            than ``min_seconds_before_rate_check`` seconds and its average
            rate is below ``min_bytes_per_second``.
        """
        if self.timeout and self.last_activity_at + self.timeout < now:
            return True
        time_taken = now - self.created_at
        wdr = self.webengine_download_request
        # The rate can only be measured once the engine has started the download.
        if time_taken > self.min_seconds_before_rate_check and wdr is not None:
            return wdr.receivedBytes() / time_taken < self.min_bytes_per_second
        return False
class FetchBackend(QWebEnginePage): class FetchBackend(QWebEnginePage):
@ -66,26 +90,50 @@ class FetchBackend(QWebEnginePage):
self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection) self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection)
self.input_finished.connect(self.on_input_finished, type=Qt.ConnectionType.QueuedConnection) self.input_finished.connect(self.on_input_finished, type=Qt.ConnectionType.QueuedConnection)
self.download_requests: dict[str, DownloadRequest] = {} self.download_requests: dict[str, DownloadRequest] = {}
self.live_requests: set[DownloadRequest] = set()
self.pending_download_requests: dict[int, DownloadRequest] = {} self.pending_download_requests: dict[int, DownloadRequest] = {}
self.download_requests_by_id: dict[int, DownloadRequest] = {} self.download_requests_by_id: dict[int, DownloadRequest] = {}
self.dr_identifier_count = 0 self.dr_identifier_count = 0
self.timeout_timer = t = QTimer(self)
t.setInterval(50)
t.timeout.connect(self.enforce_timeouts)
def on_input_finished(self, error_msg: str) -> None: def on_input_finished(self, error_msg: str) -> None:
if error_msg: if error_msg:
self.send_response({'action': 'input_error', 'error': error_msg}) self.send_response({'action': 'input_error', 'error': error_msg})
QApplication.instance().exit(1) QApplication.instance().exit(1)
def enforce_timeouts(self):
    """Cancel every live request that has stalled or is downloading too slowly.

    Called on each tick of ``self.timeout_timer``. The timer is kept running
    only while requests remain live; once none are left it is stopped so it
    does not keep firing with nothing to check.
    """
    now = monotonic()
    # Snapshot the offenders first: live_requests is mutated in the loop below.
    timed_out = tuple(dr for dr in self.live_requests if dr.too_slow_or_timed_out(now))
    for dr in timed_out:
        if dr.webengine_download_request is None:
            # No download object to cancel yet, so flag the request to be
            # rejected when the download is eventually started.
            dr.cancel_on_start = True
        else:
            dr.webengine_download_request.cancel()
        self.live_requests.discard(dr)
    if self.live_requests:
        self.timeout_timer.start()
    else:
        self.timeout_timer.stop()
def download(self, url: str, filename: str, extra_headers: Headers | None = None, timeout: float = 60.) -> None:
    """Fetch *url*, sharing work with an earlier request for the same URL.

    If a request for the same URL key exists and has no recorded error, it
    is reused: a finished one has its result re-emitted and re-sent, an
    unfinished one has its count of needed responses incremented. Otherwise
    a new request is registered, added to the live set watched by the
    timeout timer, and handed to the web engine.

    :param url: The URL to fetch.
    :param filename: Desired output name; only its base name is used.
    :param extra_headers: Extra HTTP headers stored on the request.
    :param timeout: Inactivity timeout in seconds stored on the request.
    """
    filename = os.path.basename(filename)
    qurl = QUrl(url)
    key = qurl_to_key(qurl)
    existing = self.download_requests.get(key)

    if existing and not existing.error:
        if not existing.finished:
            existing.num_of_responses_needed += 1
            return
        result = existing.as_result()
        self.download_finished.emit(result)
        self.send_response(result)
        return

    request = DownloadRequest(url, filename, extra_headers, timeout)
    self.download_requests[key] = request
    self.dr_identifier_count += 1
    self.pending_download_requests[self.dr_identifier_count] = request
    self.live_requests.add(request)
    if not self.timeout_timer.isActive():
        self.timeout_timer.start()
    # The numeric identifier is passed as the suggested file name.
    super().download(qurl, str(self.dr_identifier_count))
def _download_requested(self, wdr: QWebEngineDownloadRequest) -> None: def _download_requested(self, wdr: QWebEngineDownloadRequest) -> None:
@ -93,10 +141,13 @@ class FetchBackend(QWebEnginePage):
idc = int(wdr.suggestedFileName()) idc = int(wdr.suggestedFileName())
dr: DownloadRequest = self.pending_download_requests.pop(idc) dr: DownloadRequest = self.pending_download_requests.pop(idc)
except Exception: except Exception:
import traceback
traceback.print_exc()
return return
try: try:
if dr.cancel_on_start:
dr.error = 'Timed out trying to open URL'
dr.finished = True
return
dr.last_activity_at = monotonic()
if dr.filename: if dr.filename:
wdr.setDownloadFileName(dr.filename) wdr.setDownloadFileName(dr.filename)
dr.webengine_download_request = wdr dr.webengine_download_request = wdr
@ -111,7 +162,8 @@ class FetchBackend(QWebEnginePage):
def _bytes_received(self) -> None: def _bytes_received(self) -> None:
wdr: QWebEngineDownloadRequest = self.sender() wdr: QWebEngineDownloadRequest = self.sender()
wdr if dr := self.download_requests_by_id.get(wdr.id()):
dr.last_activity_at = monotonic()
def _download_finished(self) -> None: def _download_finished(self) -> None:
wdr: QWebEngineDownloadRequest = self.sender() wdr: QWebEngineDownloadRequest = self.sender()
@ -120,21 +172,30 @@ class FetchBackend(QWebEnginePage):
def report_finish(self, wdr: QWebEngineDownloadRequest, dr: DownloadRequest) -> None: def report_finish(self, wdr: QWebEngineDownloadRequest, dr: DownloadRequest) -> None:
s = wdr.state() s = wdr.state()
output = os.path.join(wdr.downloadDirectory(), wdr.downloadFileName())
result: dict[str, str] = {} result: dict[str, str] = {}
dr.last_activity_at = monotonic()
dr.finished = True
self.live_requests.discard(dr)
if s == QWebEngineDownloadRequest.DownloadState.DownloadRequested: if s == QWebEngineDownloadRequest.DownloadState.DownloadRequested:
# Open of URL failed dr.error = 'Open of URL failed'
result = {'action': 'finished', 'error':'Open of URL failed', 'url': dr.url, 'output': output} result = dr.as_result()
dr.finished = True
elif s == QWebEngineDownloadRequest.DownloadState.DownloadCancelled: elif s == QWebEngineDownloadRequest.DownloadState.DownloadCancelled:
result = {'action': 'finished', 'error':'Timed out waiting for download', 'url': dr.url, 'output': output} dr.error = 'Timed out waiting for download'
dr.finished = True dr.worth_retry = True
result = dr.as_result()
elif s == QWebEngineDownloadRequest.DownloadState.DownloadInterrupted: elif s == QWebEngineDownloadRequest.DownloadState.DownloadInterrupted:
result = {'action': 'finished', 'error':wdr.interruptReasonString(), 'url': dr.url, 'output': output} dr.error = wdr.interruptReasonString()
dr.finished = True dr.worth_retry = wdr.interruptReason() in (
QWebEngineDownloadRequest.DownloadInterruptReason.NetworkTimeout,
QWebEngineDownloadRequest.DownloadInterruptReason.NetworkFailed,
QWebEngineDownloadRequest.DownloadInterruptReason.NetworkDisconnected,
QWebEngineDownloadRequest.DownloadInterruptReason.NetworkServerDown,
QWebEngineDownloadRequest.DownloadInterruptReason.ServerUnreachable,
)
result = dr.as_result()
elif s == QWebEngineDownloadRequest.DownloadState.DownloadCompleted: elif s == QWebEngineDownloadRequest.DownloadState.DownloadCompleted:
result = {'action': 'finished', 'url': dr.url, 'output': output, 'final_url': qurl_to_string(wdr.url())} result = dr.as_result()
dr.finished = True
if result: if result:
self.download_finished.emit(result) self.download_finished.emit(result)