mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Implement changing user-agent and setting cookies
This commit is contained in:
parent
1c5c36b1d7
commit
339ec662d4
@ -93,6 +93,19 @@ class Browser:
|
|||||||
|
|
||||||
open_novisit = open
|
open_novisit = open
|
||||||
|
|
||||||
|
def set_simple_cookie(self, name: str, value: str, domain: str | None = None, path: str | None = '/'):
|
||||||
|
'''
|
||||||
|
Set a simple cookie using a name and value. If domain is specified, the cookie is only sent with requests
|
||||||
|
to matching domains, otherwise it is sent with all requests. The leading dot in domain is optional.
|
||||||
|
Similarly, by default all paths match, to restrict to certain path use the path parameter.
|
||||||
|
'''
|
||||||
|
c = {'name': name, 'value': value, 'domain': domain, 'path': path}
|
||||||
|
self._send_command({'action': 'set_cookies', 'cookies':[c]})
|
||||||
|
|
||||||
|
def set_user_agent(self, val: str = '') -> None:
|
||||||
|
self.user_agent = val
|
||||||
|
self._send_command({'action': 'set_user_agent', 'user_agent': val})
|
||||||
|
|
||||||
def _send_command(self, cmd):
|
def _send_command(self, cmd):
|
||||||
self.worker.stdin.write(json.dumps(cmd).encode())
|
self.worker.stdin.write(json.dumps(cmd).encode())
|
||||||
self.worker.stdin.write(b'\n')
|
self.worker.stdin.write(b'\n')
|
||||||
|
@ -84,6 +84,8 @@ class FetchBackend(QWebEnginePage):
|
|||||||
|
|
||||||
request_download = pyqtSignal(str, str, object, float, int)
|
request_download = pyqtSignal(str, str, object, float, int)
|
||||||
input_finished = pyqtSignal(str)
|
input_finished = pyqtSignal(str)
|
||||||
|
set_cookies = pyqtSignal(object)
|
||||||
|
set_user_agent_signal = pyqtSignal(str)
|
||||||
download_finished = pyqtSignal(object)
|
download_finished = pyqtSignal(object)
|
||||||
|
|
||||||
def __init__(self, output_dir: str = '', cache_name: str = '', parent: QObject = None, user_agent: str = '') -> None:
|
def __init__(self, output_dir: str = '', cache_name: str = '', parent: QObject = None, user_agent: str = '') -> None:
|
||||||
@ -98,11 +100,14 @@ class FetchBackend(QWebEnginePage):
|
|||||||
self.interceptor = RequestInterceptor(self)
|
self.interceptor = RequestInterceptor(self)
|
||||||
profile.setUrlRequestInterceptor(self.interceptor)
|
profile.setUrlRequestInterceptor(self.interceptor)
|
||||||
self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection)
|
self.request_download.connect(self.download, type=Qt.ConnectionType.QueuedConnection)
|
||||||
|
self.set_cookies.connect(self._set_cookies, type=Qt.ConnectionType.QueuedConnection)
|
||||||
|
self.set_user_agent_signal.connect(self.set_user_agent, type=Qt.ConnectionType.QueuedConnection)
|
||||||
self.input_finished.connect(self.on_input_finished, type=Qt.ConnectionType.QueuedConnection)
|
self.input_finished.connect(self.on_input_finished, type=Qt.ConnectionType.QueuedConnection)
|
||||||
self.live_requests: set[DownloadRequest] = set()
|
self.live_requests: set[DownloadRequest] = set()
|
||||||
self.pending_download_requests: dict[int, DownloadRequest] = {}
|
self.pending_download_requests: dict[int, DownloadRequest] = {}
|
||||||
self.download_requests_by_id: dict[int, DownloadRequest] = {}
|
self.download_requests_by_id: dict[int, DownloadRequest] = {}
|
||||||
self.dr_identifier_count = 0
|
self.dr_identifier_count = 0
|
||||||
|
self.all_request_cookies: list[QNetworkCookie] = []
|
||||||
self.timeout_timer = t = QTimer(self)
|
self.timeout_timer = t = QTimer(self)
|
||||||
t.setInterval(50)
|
t.setInterval(50)
|
||||||
t.timeout.connect(self.enforce_timeouts)
|
t.timeout.connect(self.enforce_timeouts)
|
||||||
@ -138,6 +143,11 @@ class FetchBackend(QWebEnginePage):
|
|||||||
self.live_requests.add(dr)
|
self.live_requests.add(dr)
|
||||||
if not self.timeout_timer.isActive():
|
if not self.timeout_timer.isActive():
|
||||||
self.timeout_timer.start()
|
self.timeout_timer.start()
|
||||||
|
cs = self.profile().cookieStore()
|
||||||
|
for c in self.all_request_cookies:
|
||||||
|
c = QNetworkCookie(c)
|
||||||
|
c.normalize(qurl)
|
||||||
|
cs.setCookie(c)
|
||||||
super().download(qurl, str(self.dr_identifier_count))
|
super().download(qurl, str(self.dr_identifier_count))
|
||||||
|
|
||||||
def _download_requested(self, wdr: QWebEngineDownloadRequest) -> None:
|
def _download_requested(self, wdr: QWebEngineDownloadRequest) -> None:
|
||||||
@ -213,12 +223,31 @@ class FetchBackend(QWebEnginePage):
|
|||||||
def set_user_agent(self, new_val: str) -> None:
|
def set_user_agent(self, new_val: str) -> None:
|
||||||
self.profile().setHttpUserAgent(new_val)
|
self.profile().setHttpUserAgent(new_val)
|
||||||
|
|
||||||
def set_simple_cookie(self, name, value, domain, path='/'):
|
def _set_cookie_from_header(self, cookie_string: str) -> None:
|
||||||
cs = self.profile().cookieStore()
|
cs = self.profile().cookieStore()
|
||||||
cookie_string = f'{name}={value}; Domain={domain}; Path={path}'
|
for c in QNetworkCookie.parseCookies(cookie_string.encode()):
|
||||||
for c in QNetworkCookie.parseCookies(cookie_string):
|
|
||||||
cs.setCookie(c)
|
cs.setCookie(c)
|
||||||
|
|
||||||
|
def _set_cookies(self, cookies: list[dict[str, str]]) -> None:
|
||||||
|
for c in cookies:
|
||||||
|
if 'header' in c:
|
||||||
|
self._set_cookie_from_header(c['header'])
|
||||||
|
else:
|
||||||
|
self.set_simple_cookie(c['name'], c['value'], c.get('domain'), c.get('path'))
|
||||||
|
|
||||||
|
def set_simple_cookie(self, name: str, value: str, domain: str | None = None, path: str | None = '/'):
|
||||||
|
c = QNetworkCookie()
|
||||||
|
c.setName(name.encode())
|
||||||
|
c.setValue(value.encode())
|
||||||
|
if domain is not None:
|
||||||
|
c.setDomain(domain)
|
||||||
|
if path is not None:
|
||||||
|
c.setPath(path)
|
||||||
|
if c.domain():
|
||||||
|
self.profile().cookieStore().setCookie(c)
|
||||||
|
else:
|
||||||
|
self.all_request_cookies.append(c)
|
||||||
|
|
||||||
|
|
||||||
def read_commands(backend: FetchBackend, tdir: str) -> None:
|
def read_commands(backend: FetchBackend, tdir: str) -> None:
|
||||||
file_counter = 0
|
file_counter = 0
|
||||||
@ -233,6 +262,10 @@ def read_commands(backend: FetchBackend, tdir: str) -> None:
|
|||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = default_timeout
|
timeout = default_timeout
|
||||||
backend.request_download.emit(cmd['url'], os.path.join(tdir, str(file_counter)), cmd.get('headers'), timeout, cmd.get('id', 0))
|
backend.request_download.emit(cmd['url'], os.path.join(tdir, str(file_counter)), cmd.get('headers'), timeout, cmd.get('id', 0))
|
||||||
|
elif ac == 'set_cookies':
|
||||||
|
backend.set_cookies.emit(cmd['cookies'])
|
||||||
|
elif ac == 'set_user_agent':
|
||||||
|
backend.set_user_agent_signal.emit(cmd['user_agent'])
|
||||||
elif ac == 'quit':
|
elif ac == 'quit':
|
||||||
break
|
break
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
@ -154,6 +154,11 @@ class TestFetchBackend(unittest.TestCase):
|
|||||||
r = get(headers={'th': '2', 'tc': '1'})
|
r = get(headers={'th': '2', 'tc': '1'})
|
||||||
self.ae(r['headers']['Th'], ['2'])
|
self.ae(r['headers']['Th'], ['2'])
|
||||||
self.ae(r['headers']['Tc'], ['1'])
|
self.ae(r['headers']['Tc'], ['1'])
|
||||||
|
br.set_simple_cookie('cook', 'ie')
|
||||||
|
br.set_user_agent('man in black')
|
||||||
|
r = get()
|
||||||
|
self.ae(r['headers']['User-Agent'], ['man in black'])
|
||||||
|
self.ae(r['headers']['Cookie'], ['sc=1; cook=ie'])
|
||||||
finally:
|
finally:
|
||||||
br.shutdown()
|
br.shutdown()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user