Get simple scraper working via worker processes

This commit is contained in:
Kovid Goyal 2022-04-02 06:25:14 +05:30
parent 91268eb9c0
commit 44ccd8104b
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -9,11 +9,15 @@ import secrets
import sys import sys
import time import time
from functools import lru_cache from functools import lru_cache
from qt.core import QApplication, QEventLoop, QUrl from qt.core import QApplication, QEventLoop, QLoggingCategory, QUrl
from qt.webengine import QWebEnginePage, QWebEngineProfile, QWebEngineSettings from qt.webengine import QWebEnginePage, QWebEngineProfile, QWebEngineSettings
from threading import Lock
from calibre.constants import cache_dir from calibre.constants import cache_dir, iswindows
from calibre.gui2.webengine import create_script, insert_scripts from calibre.gui2.webengine import create_script, insert_scripts
from calibre.ptempfile import PersistentTemporaryFile
from calibre.utils.ipc.simple_worker import start_pipe_worker
from calibre.utils.filenames import retry_on_fail
def canonicalize_qurl(qurl): def canonicalize_qurl(qurl):
@ -123,13 +127,97 @@ class SimpleScraper(QWebEnginePage):
del self.current_fetch del self.current_fetch
def worker_main(source):
    '''Entry point for a scraper worker process.

    Reads newline-delimited commands of the form ``CMD:payload`` from stdin.
    ``EXIT:<code>`` terminates the process with that exit code. ``FETCH:<json
    string of an encoded URL>`` fetches the URL and replies on stdout with a
    single JSON line: ``{'ok': True, 'html_file': path}`` on success, or
    ``{'ok': False, 'err': msg, 'tb': traceback}`` on failure.
    '''
    # Suppress the informational log chatter from the WebEngine context
    QLoggingCategory.setFilterRules('''\
qt.webenginecontext.info=false
''')
    from calibre.gui2 import must_use_qt
    must_use_qt()
    scraper = SimpleScraper(source)
    for raw in sys.stdin.buffer:
        cmd, sep, payload = raw.strip().partition(b':')
        if not sep:
            # Malformed line with no colon separator, ignore it
            continue
        if cmd == b'EXIT':
            raise SystemExit(int(payload))
        if cmd == b'FETCH':
            try:
                qurl = QUrl.fromEncoded(json.loads(payload).encode('utf-8'))
                html = scraper.fetch(qurl)
            except Exception as err:
                import traceback
                result = {'ok': False, 'tb': traceback.format_exc(), 'err': str(err)}
            else:
                # The HTML can be large, so ship it via a temp file rather
                # than over the pipe
                with PersistentTemporaryFile(suffix='-scraper-result.html') as t:
                    t.write(html.encode('utf-8'))
                result = {'ok': True, 'html_file': t.name}
            print(json.dumps(result), flush=True)
class Overseer:
    '''Manages a pool of scraper worker processes, one per recipe source.

    Workers are started lazily on first use and shut down (best effort) when
    this object is garbage collected.
    '''

    def __init__(self):
        self.lock = Lock()
        self.workers = {}  # source name -> worker process handle

    def worker_for_source(self, source):
        '''Return the worker process for *source*, starting it if needed.'''
        with self.lock:
            ans = self.workers.get(source)
            if ans is None:
                w = start_pipe_worker(f'from calibre.scraper.simple import worker_main; worker_main({source!r})')
                ans = self.workers[source] = w
        return ans

    def fetch_url(self, source, url_or_qurl):
        '''Fetch *url_or_qurl* (str or QUrl) via the worker for *source*.

        Returns the fetched HTML as text. Raises ValueError if the worker
        reports a fetch failure.
        '''
        w = self.worker_for_source(source)
        if isinstance(url_or_qurl, str):
            url_or_qurl = QUrl(url_or_qurl)
        # Protocol: one FETCH command per line, URL sent in encoded form as a
        # JSON string so it survives the line-oriented transport
        w.stdin.write(b'FETCH:')
        w.stdin.write(json.dumps(bytes(url_or_qurl.toEncoded()).decode('utf-8')).encode('utf-8'))
        w.stdin.write(b'\n')
        w.stdin.flush()
        output = json.loads(w.stdout.readline())
        if not output['ok']:
            raise ValueError(output['err'])
        with open(output['html_file'], 'rb') as f:
            html = f.read().decode('utf-8')
        # On windows the delete can fail transiently if the worker still has
        # the file open, so retry
        retry_on_fail(os.remove, output['html_file'])
        return html

    @staticmethod
    def _safe_wait(w, timeout):
        # Popen.wait() raises subprocess.TimeoutExpired on timeout instead of
        # returning None, so swallow exceptions and report whether the worker
        # actually exited within the timeout.
        try:
            w.wait(timeout)
            return True
        except Exception:
            return False

    def __del__(self):
        # Best-effort shutdown: politely ask every worker to exit, then
        # escalate to terminate/kill for any that do not comply.
        with self.lock:
            for w in self.workers.values():
                try:
                    w.stdin.write(b'EXIT:0\n')
                    w.stdin.flush()
                except Exception:
                    pass  # worker already dead, its pipe is closed
            for w in self.workers.values():
                if not self._safe_wait(w, 0.2):
                    w.terminate()
                    if not iswindows:
                        # On POSIX terminate() is SIGTERM and can be ignored;
                        # follow up with SIGKILL. On windows terminate() is
                        # already unconditional.
                        if not self._safe_wait(w, 0.1):
                            w.kill()
            self.workers.clear()
def find_tests():
    '''Return a unittest suite exercising the worker-process based scraper.'''
    import re
    import unittest

    from lxml.html import fromstring, tostring

    class TestSimpleWebEngineScraper(unittest.TestCase):

        def test_dom_load(self):
            overseer = Overseer()

            def normalized(markup):
                # Round-trip through lxml and collapse whitespace so that
                # serialization differences do not cause spurious failures
                tree = fromstring(markup.encode('utf-8'))
                pretty = tostring(tree, pretty_print=True, encoding='unicode')
                return re.sub(r'\s+', ' ', pretty)

            for name in ('book', 'nav'):
                path = P(f'templates/new_{name}.html', allow_user_override=False)
                url = QUrl.fromLocalFile(path)
                html = overseer.fetch_url('test', url)
                self.assertEqual(normalized(html), normalized(open(path, 'rb').read().decode('utf-8')))
            self.assertRaises(ValueError, overseer.fetch_url, 'test', 'file:///does-not-exist.html')

    return unittest.defaultTestLoader.loadTestsFromTestCase(TestSimpleWebEngineScraper)