Enhance autocomplete by adding an environment variable check (WHOOGLE_AUTOCOMPLETE) so it can be enabled or disabled globally. Improve HTTP client error handling for closed connections and add client recreation logic. Refactor link extraction to skip links inside details elements in search results.

This commit is contained in:
Don-Swanson 2025-09-23 21:37:21 -05:00
parent 99c7c7b00d
commit ffdeeb5f44
No known key found for this signature in database
GPG Key ID: C6A6ACD574A005E5
5 changed files with 89 additions and 32 deletions
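
The autocomplete gate introduced here is read from the process environment; it defaults to enabled and only the literal value 0 disables it. The check, as it appears in both the Request.autocomplete method and the /autocomplete route below:

import os

# Defaults to enabled; setting WHOOGLE_AUTOCOMPLETE=0 in the service
# environment turns suggestion lookups off everywhere.
autocomplete_enabled = os.environ.get('WHOOGLE_AUTOCOMPLETE', '1') != '0'
print(autocomplete_enabled)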


@@ -265,26 +265,35 @@ class Request:
             list: The list of matches for possible search suggestions

         """
-        ac_query = dict(q=query)
-        if self.language:
-            ac_query['lr'] = self.language
-        if self.country:
-            ac_query['gl'] = self.country
-        if self.lang_interface:
-            ac_query['hl'] = self.lang_interface
-
-        response = self.send(base_url=AUTOCOMPLETE_URL,
-                             query=urlparse.urlencode(ac_query)).text
-
-        if not response:
+        # Check if autocomplete is disabled via environment variable
+        if os.environ.get('WHOOGLE_AUTOCOMPLETE', '1') == '0':
             return []

         try:
-            root = ET.fromstring(response)
-            return [_.attrib['data'] for _ in
-                    root.findall('.//suggestion/[@data]')]
-        except ET.ParseError:
-            # Malformed XML response
+            ac_query = dict(q=query)
+            if self.language:
+                ac_query['lr'] = self.language
+            if self.country:
+                ac_query['gl'] = self.country
+            if self.lang_interface:
+                ac_query['hl'] = self.lang_interface
+
+            response = self.send(base_url=AUTOCOMPLETE_URL,
+                                 query=urlparse.urlencode(ac_query)).text
+
+            if not response:
+                return []
+
+            try:
+                root = ET.fromstring(response)
+                return [_.attrib['data'] for _ in
+                        root.findall('.//suggestion/[@data]')]
+            except ET.ParseError:
+                # Malformed XML response
+                return []
+        except Exception as e:
+            # Log the error but don't crash - autocomplete is non-essential
+            print(f"Autocomplete error: {str(e)}")
             return []

     def send(self, base_url='', query='', attempt=0,
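
For reference, the same gate-then-degrade pattern in isolation; fetch is a hypothetical callable standing in for the suggestion lookup, so a disabled gate or any runtime failure yields an empty list rather than an error page:

import os

def suggestions_or_empty(fetch, query):
    # Honor the global gate first, then swallow any error, since
    # suggestions are non-essential (mirrors the wrapped method above).
    if os.environ.get('WHOOGLE_AUTOCOMPLETE', '1') == '0':
        return []
    try:
        return fetch(query)
    except Exception as exc:
        print(f"Autocomplete error: {exc}")
        return []

# A failing fetcher degrades to an empty suggestion list instead of raising
print(suggestions_or_empty(lambda q: 1 / 0, "test"))  # -> []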


@@ -283,9 +283,11 @@ def autocomplete():
     #
     # Note: If Tor is enabled, this returns nothing, as the request is
     # almost always rejected
+    # Also check if autocomplete is disabled globally
+    autocomplete_enabled = os.environ.get('WHOOGLE_AUTOCOMPLETE', '1') != '0'
     return jsonify([
         q,
-        g.user_request.autocomplete(q) if not g.user_config.tor else []
+        g.user_request.autocomplete(q) if (not g.user_config.tor and autocomplete_enabled) else []
     ])


 @app.route(f'/{Endpoint.search}', methods=['GET', 'POST'])


@@ -84,13 +84,22 @@ class HttpxClient:
         attempt = 0
         while attempt <= retries:
             try:
+                # Check if client is closed and recreate if needed
+                if self._client.is_closed:
+                    self._recreate_client()
+
                 response = self._client.get(url, headers=headers, cookies=cookies)
                 if use_cache and response.status_code == 200:
                     with self._cache_lock:
                         self._cache[key] = response
                 return response
-            except httpx.HTTPError as exc:
+            except (httpx.HTTPError, RuntimeError) as exc:
                 last_exc = exc
+                if "client has been closed" in str(exc).lower():
+                    # Recreate client and try again
+                    self._recreate_client()
+                    if attempt < retries:
+                        continue
                 if attempt == retries:
                     raise
                 time.sleep(backoff_seconds * (2 ** attempt))
@@ -101,6 +110,44 @@ class HttpxClient:
             raise last_exc
         raise httpx.HTTPError('Unknown HTTP error')

+    def _recreate_client(self) -> None:
+        """Recreate the HTTP client when it has been closed."""
+        try:
+            self._client.close()
+        except Exception:
+            pass  # Client might already be closed
+
+        # Recreate with same configuration
+        client_kwargs = dict(timeout=self._timeout_seconds,
+                             follow_redirects=True)
+
+        if self._proxies:
+            proxy_values = list(self._proxies.values())
+            single_proxy = proxy_values[0] if proxy_values and all(v == proxy_values[0] for v in proxy_values) else None
+            if single_proxy:
+                try:
+                    self._client = httpx.Client(proxy=single_proxy, **client_kwargs)
+                except TypeError:
+                    try:
+                        self._client = httpx.Client(proxies=self._proxies, **client_kwargs)
+                    except TypeError:
+                        mounts: Dict[str, httpx.Proxy] = {}
+                        for scheme_key, url in self._proxies.items():
+                            prefix = f"{scheme_key}://"
+                            mounts[prefix] = httpx.Proxy(url)
+                        self._client = httpx.Client(mounts=mounts, **client_kwargs)
+            else:
+                try:
+                    self._client = httpx.Client(proxies=self._proxies, **client_kwargs)
+                except TypeError:
+                    mounts: Dict[str, httpx.Proxy] = {}
+                    for scheme_key, url in self._proxies.items():
+                        prefix = f"{scheme_key}://"
+                        mounts[prefix] = httpx.Proxy(url)
+                    self._client = httpx.Client(mounts=mounts, **client_kwargs)
+        else:
+            self._client = httpx.Client(**client_kwargs)
+
     def close(self) -> None:
         self._client.close()
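
For context, a minimal sketch of the failure mode the new retry path guards against: once an httpx.Client is closed, further requests raise RuntimeError, which is why the except clause now also catches RuntimeError and the loop checks is_closed before each attempt. The timeout and URL below are illustrative placeholders only.

import httpx

client = httpx.Client(timeout=5.0, follow_redirects=True)  # illustrative config
client.close()

print(client.is_closed)  # True - the check now done before each request

try:
    client.get("https://example.com")  # placeholder URL
except RuntimeError as exc:
    # httpx refuses to send on a closed client; the commit matches this
    # message case-insensitively and rebuilds the client before retrying
    print(f"caught: {exc}")

# Recreating with the same configuration restores service, as
# _recreate_client does above
client = httpx.Client(timeout=5.0, follow_redirects=True)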


@@ -136,7 +136,7 @@ def has_ad_content(element: str) -> bool:
             or 'ⓘ' in element)


-def get_first_link(soup: BeautifulSoup) -> str:
+def get_first_link(soup) -> str:
     """Retrieves the first result link from the query response

     Args:
@@ -147,24 +147,18 @@ def get_first_link(soup: BeautifulSoup) -> str:
     """
     first_link = ''
-    orig_details = []
-
-    # Temporarily remove details so we don't grab those links
-    for details in soup.find_all('details'):
-        temp_details = soup.new_tag('removed_details')
-        orig_details.append(details.replace_with(temp_details))

-    # Replace hrefs with only the intended destination (no "utm" type tags)
+    # Find the first valid search result link, excluding details elements
     for a in soup.find_all('a', href=True):
+        # Skip links that are inside details elements (collapsible sections)
+        if a.find_parent('details'):
+            continue
+
         # Return the first search result URL
         if a['href'].startswith('http://') or a['href'].startswith('https://'):
            first_link = a['href']
            break

-    # Add the details back
-    for orig_detail, details in zip(orig_details, soup.find_all('removed_details')):
-        details.replace_with(orig_detail)
-
     return first_link
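
A small illustrative check of the new behaviour, assuming the refactored get_first_link above is imported from the patched module: links nested under details elements and non-absolute hrefs are passed over.

from bs4 import BeautifulSoup
# assumes: get_first_link imported from the results module patched above

html = (
    '<details><a href="https://collapsed.example.com">hidden</a></details>'
    '<a href="/relative">skipped, not absolute</a>'
    '<a href="https://result.example.com">first real result</a>'
)
soup = BeautifulSoup(html, 'html.parser')

# The link inside <details> and the relative href are skipped, so the
# first absolute link outside any details element wins
assert get_first_link(soup) == 'https://result.example.com'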


@@ -161,6 +161,11 @@ class Search:
         # Produce cleanable html soup from response
         get_body_safed = get_body.text.replace("&lt;","andlt;").replace("&gt;","andgt;")
         html_soup = bsoup(get_body_safed, 'html.parser')

+        # Ensure we extract only the content within <html> if it exists
+        # This prevents doctype declarations from appearing in the output
+        if html_soup.html:
+            html_soup = html_soup.html
+
         # Replace current soup if view_image is active
         # FIXME: Broken since the user agent changes as of 16 Jan 2025
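
A quick illustration of why narrowing to soup.html matters: serializing the full soup keeps the doctype declaration, while the html tag alone does not.

from bs4 import BeautifulSoup as bsoup

raw = "<!DOCTYPE html><html><body><p>hi</p></body></html>"
soup = bsoup(raw, 'html.parser')

print(str(soup))       # includes the <!DOCTYPE html> declaration
print(str(soup.html))  # only the <html>...</html> subtree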