mirror of
https://github.com/searxng/searxng.git
synced 2025-10-30 10:12:31 -04:00
[fix] JSON format: serialization of the result-types
The ``JSONEncoder`` (``format="json"``) must perform a conversion to the
built-in types for the ``msgspec.Struct``::
if isinstance(o, msgspec.Struct):
return msgspec.to_builtins(o)
The result types are already of type ``msgspec.Struct``, so they can be
converted into built-in types.
The field types (in the result type) that were not yet of type ``msgspec.Struct``
have been converted to::
searx.weather.GeoLocation@dataclass -> msgspec.Struct
searx.weather.DateTime -> msgspec.Struct
searx.weather.Temperature -> msgspec.Struct
searx.weather.PressureUnits -> msgspec.Struct
searx.weather.WindSpeed -> msgspec.Struct
searx.weather.RelativeHumidity -> msgspec.Struct
searx.weather.Compass -> msgspec.Struct
BTW: Wherever it seemed sensible, the typing was also modernized in the modified
files.
Closes: https://github.com/searxng/searxng/issues/5250
Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
parent
41e0f2abf0
commit
e16b6cb148
@ -76,12 +76,12 @@ def _weather_data(location: weather.GeoLocation, data: dict[str, t.Any]):
|
||||
|
||||
return EngineResults.types.WeatherAnswer.Item(
|
||||
location=location,
|
||||
temperature=weather.Temperature(unit="°C", value=data['temperature']),
|
||||
temperature=weather.Temperature(val=data['temperature'], unit="°C"),
|
||||
condition=WEATHERKIT_TO_CONDITION[data["conditionCode"]],
|
||||
feels_like=weather.Temperature(unit="°C", value=data['temperatureApparent']),
|
||||
feels_like=weather.Temperature(val=data['temperatureApparent'], unit="°C"),
|
||||
wind_from=weather.Compass(data["windDirection"]),
|
||||
wind_speed=weather.WindSpeed(data["windSpeed"], unit="mi/h"),
|
||||
pressure=weather.Pressure(data["pressure"], unit="hPa"),
|
||||
wind_speed=weather.WindSpeed(val=data["windSpeed"], unit="mi/h"),
|
||||
pressure=weather.Pressure(val=data["pressure"], unit="hPa"),
|
||||
humidity=weather.RelativeHumidity(data["humidity"] * 100),
|
||||
cloud_cover=data["cloudCover"] * 100,
|
||||
)
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""Open Meteo (weather)"""
|
||||
|
||||
import typing as t
|
||||
|
||||
from urllib.parse import urlencode
|
||||
from datetime import datetime
|
||||
|
||||
@ -106,16 +108,16 @@ WMO_TO_CONDITION: dict[int, weather.WeatherConditionType] = {
|
||||
}
|
||||
|
||||
|
||||
def _weather_data(location: weather.GeoLocation, data: dict):
|
||||
def _weather_data(location: weather.GeoLocation, data: dict[str, t.Any]):
|
||||
|
||||
return WeatherAnswer.Item(
|
||||
location=location,
|
||||
temperature=weather.Temperature(unit="°C", value=data["temperature_2m"]),
|
||||
temperature=weather.Temperature(val=data["temperature_2m"], unit="°C"),
|
||||
condition=WMO_TO_CONDITION[data["weather_code"]],
|
||||
feels_like=weather.Temperature(unit="°C", value=data["apparent_temperature"]),
|
||||
feels_like=weather.Temperature(val=data["apparent_temperature"], unit="°C"),
|
||||
wind_from=weather.Compass(data["wind_direction_10m"]),
|
||||
wind_speed=weather.WindSpeed(data["wind_speed_10m"], unit="km/h"),
|
||||
pressure=weather.Pressure(data["pressure_msl"], unit="hPa"),
|
||||
wind_speed=weather.WindSpeed(val=data["wind_speed_10m"], unit="km/h"),
|
||||
pressure=weather.Pressure(val=data["pressure_msl"], unit="hPa"),
|
||||
humidity=weather.RelativeHumidity(data["relative_humidity_2m"]),
|
||||
cloud_cover=data["cloud_cover"],
|
||||
)
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
"""wttr.in (weather forecast service)"""
|
||||
|
||||
import typing as t
|
||||
|
||||
from urllib.parse import quote
|
||||
from datetime import datetime
|
||||
|
||||
@ -80,19 +82,19 @@ def request(query, params):
|
||||
return params
|
||||
|
||||
|
||||
def _weather_data(location: weather.GeoLocation, data: dict):
|
||||
def _weather_data(location: weather.GeoLocation, data: dict[str, t.Any]):
|
||||
# the naming between different data objects is inconsitent, thus temp_C and
|
||||
# tempC are possible
|
||||
tempC: float = data.get("temp_C") or data.get("tempC") # type: ignore
|
||||
|
||||
return WeatherAnswer.Item(
|
||||
location=location,
|
||||
temperature=weather.Temperature(unit="°C", value=tempC),
|
||||
temperature=weather.Temperature(val=tempC, unit="°C"),
|
||||
condition=WWO_TO_CONDITION[data["weatherCode"]],
|
||||
feels_like=weather.Temperature(unit="°C", value=data["FeelsLikeC"]),
|
||||
feels_like=weather.Temperature(val=data["FeelsLikeC"], unit="°C"),
|
||||
wind_from=weather.Compass(int(data["winddirDegree"])),
|
||||
wind_speed=weather.WindSpeed(data["windspeedKmph"], unit="km/h"),
|
||||
pressure=weather.Pressure(data["pressure"], unit="hPa"),
|
||||
wind_speed=weather.WindSpeed(val=data["windspeedKmph"], unit="km/h"),
|
||||
pressure=weather.Pressure(val=data["pressure"], unit="hPa"),
|
||||
humidity=weather.RelativeHumidity(data["humidity"]),
|
||||
cloud_cover=data["cloudcover"],
|
||||
)
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=too-few-public-methods,missing-module-docstring
|
||||
|
||||
|
||||
__all__ = ["PluginInfo", "Plugin", "PluginCfg", "PluginStorage"]
|
||||
|
||||
import abc
|
||||
@ -9,16 +8,17 @@ import importlib
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
import typing
|
||||
from collections.abc import Sequence
|
||||
|
||||
import typing as t
|
||||
from collections.abc import Generator
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from searx.extended_types import SXNG_Request
|
||||
from searx.result_types import Result
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
if t.TYPE_CHECKING:
|
||||
from searx.search import SearchWithPlugins
|
||||
from searx.result_types import Result, EngineResults, LegacyResult # pyright: ignore[reportPrivateLocalImportUsage]
|
||||
import flask
|
||||
|
||||
log: logging.Logger = logging.getLogger("searx.plugins")
|
||||
@ -42,7 +42,7 @@ class PluginInfo:
|
||||
description: str
|
||||
"""Short description of the *answerer*."""
|
||||
|
||||
preference_section: typing.Literal["general", "ui", "privacy", "query"] | None = "general"
|
||||
preference_section: t.Literal["general", "ui", "privacy", "query"] | None = "general"
|
||||
"""Section (tab/group) in the preferences where this plugin is shown to the
|
||||
user.
|
||||
|
||||
@ -71,7 +71,7 @@ class Plugin(abc.ABC):
|
||||
id: str = ""
|
||||
"""The ID (suffix) in the HTML form."""
|
||||
|
||||
active: typing.ClassVar[bool]
|
||||
active: t.ClassVar[bool]
|
||||
"""Plugin is enabled/disabled by default (:py:obj:`PluginCfg.active`)."""
|
||||
|
||||
keywords: list[str] = []
|
||||
@ -109,7 +109,7 @@ class Plugin(abc.ABC):
|
||||
raise ValueError(f"plugin ID {self.id} contains invalid character (use lowercase ASCII)")
|
||||
|
||||
if not getattr(self, "log", None):
|
||||
pkg_name = inspect.getmodule(self.__class__).__package__ # type: ignore
|
||||
pkg_name = inspect.getmodule(self.__class__).__package__ # pyright: ignore[reportOptionalMemberAccess]
|
||||
self.log = logging.getLogger(f"{pkg_name}.{self.id}")
|
||||
|
||||
def __hash__(self) -> int:
|
||||
@ -120,7 +120,7 @@ class Plugin(abc.ABC):
|
||||
|
||||
return id(self)
|
||||
|
||||
def __eq__(self, other: typing.Any):
|
||||
def __eq__(self, other: t.Any):
|
||||
"""py:obj:`Plugin` objects are equal if the hash values of the two
|
||||
objects are equal."""
|
||||
|
||||
@ -146,7 +146,7 @@ class Plugin(abc.ABC):
|
||||
"""
|
||||
return True
|
||||
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: "Result") -> bool:
|
||||
"""Runs for each result of each engine and returns a boolean:
|
||||
|
||||
- ``True`` to keep the result
|
||||
@ -166,7 +166,9 @@ class Plugin(abc.ABC):
|
||||
"""
|
||||
return True
|
||||
|
||||
def post_search(self, request: SXNG_Request, search: "SearchWithPlugins") -> None | Sequence[Result]:
|
||||
def post_search(
|
||||
self, request: SXNG_Request, search: "SearchWithPlugins"
|
||||
) -> "None | list[Result | LegacyResult] | EngineResults":
|
||||
"""Runs AFTER the search request. Can return a list of
|
||||
:py:obj:`Result <searx.result_types._base.Result>` objects to be added to the
|
||||
final result list."""
|
||||
@ -196,7 +198,7 @@ class PluginStorage:
|
||||
def __init__(self):
|
||||
self.plugin_list = set()
|
||||
|
||||
def __iter__(self):
|
||||
def __iter__(self) -> Generator[Plugin]:
|
||||
yield from self.plugin_list
|
||||
|
||||
def __len__(self):
|
||||
@ -207,7 +209,7 @@ class PluginStorage:
|
||||
|
||||
return [p.info for p in self.plugin_list]
|
||||
|
||||
def load_settings(self, cfg: dict[str, dict[str, typing.Any]]):
|
||||
def load_settings(self, cfg: dict[str, dict[str, t.Any]]):
|
||||
"""Load plugins configured in SearXNG's settings :ref:`settings
|
||||
plugins`."""
|
||||
|
||||
@ -262,7 +264,7 @@ class PluginStorage:
|
||||
break
|
||||
return ret
|
||||
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: Result) -> bool:
|
||||
def on_result(self, request: SXNG_Request, search: "SearchWithPlugins", result: "Result") -> bool:
|
||||
|
||||
ret = True
|
||||
for plugin in [p for p in self.plugin_list if p.id in search.user_plugins]:
|
||||
|
||||
@ -1,12 +1,11 @@
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
# pylint: disable=missing-module-docstring
|
||||
|
||||
from __future__ import annotations
|
||||
import typing as t
|
||||
|
||||
import datetime
|
||||
|
||||
from flask_babel import gettext # type: ignore
|
||||
from flask_babel import gettext
|
||||
from searx.result_types import EngineResults
|
||||
from searx.weather import DateTime, GeoLocation
|
||||
|
||||
@ -53,13 +52,13 @@ class SXNGPlugin(Plugin):
|
||||
search_term = " ".join(query_parts).strip()
|
||||
|
||||
if not search_term:
|
||||
date_time = DateTime(time=datetime.datetime.now())
|
||||
date_time = DateTime(datetime.datetime.now())
|
||||
results.add(results.types.Answer(answer=date_time.l10n()))
|
||||
return results
|
||||
|
||||
geo = GeoLocation.by_query(search_term=search_term)
|
||||
if geo:
|
||||
date_time = DateTime(time=datetime.datetime.now(tz=geo.zoneinfo))
|
||||
date_time = DateTime(datetime.datetime.now(tz=geo.zoneinfo))
|
||||
tz_name = geo.timezone.replace('_', ' ')
|
||||
results.add(
|
||||
results.types.Answer(
|
||||
|
||||
195
searx/weather.py
195
searx/weather.py
@ -14,11 +14,10 @@ __all__ = [
|
||||
"GeoLocation",
|
||||
]
|
||||
|
||||
import typing
|
||||
import typing as t
|
||||
|
||||
import base64
|
||||
import datetime
|
||||
import dataclasses
|
||||
import zoneinfo
|
||||
|
||||
from urllib.parse import quote_plus
|
||||
@ -27,7 +26,8 @@ import babel
|
||||
import babel.numbers
|
||||
import babel.dates
|
||||
import babel.languages
|
||||
import flask_babel
|
||||
import flask_babel # pyright: ignore[reportMissingTypeStubs]
|
||||
import msgspec
|
||||
|
||||
from searx import network
|
||||
from searx.cache import ExpireCache, ExpireCacheCfg
|
||||
@ -120,8 +120,7 @@ def symbol_url(condition: "WeatherConditionType") -> str | None:
|
||||
return data_url
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class GeoLocation:
|
||||
class GeoLocation(msgspec.Struct, kw_only=True):
|
||||
"""Minimal implementation of Geocoding."""
|
||||
|
||||
# The type definition was based on the properties from the geocoding API of
|
||||
@ -176,6 +175,8 @@ class GeoLocation:
|
||||
|
||||
ctx = "weather_geolocation_by_query"
|
||||
cache = get_WEATHER_DATA_CACHE()
|
||||
# {'name': 'Berlin', 'latitude': 52.52437, 'longitude': 13.41053,
|
||||
# 'elevation': 74.0, 'country_code': 'DE', 'timezone': 'Europe/Berlin'}
|
||||
geo_props = cache.get(search_term, ctx=ctx)
|
||||
|
||||
if not geo_props:
|
||||
@ -194,15 +195,14 @@ class GeoLocation:
|
||||
if not results:
|
||||
raise ValueError(f"unknown geo location: '{search_term}'")
|
||||
location = results[0]
|
||||
return {field.name: location[field.name] for field in dataclasses.fields(cls)}
|
||||
return {field_name: location[field_name] for field_name in cls.__struct_fields__}
|
||||
|
||||
|
||||
DateTimeFormats = typing.Literal["full", "long", "medium", "short"]
|
||||
DateTimeLocaleTypes = typing.Literal["UI"]
|
||||
DateTimeFormats = t.Literal["full", "long", "medium", "short"]
|
||||
DateTimeLocaleTypes = t.Literal["UI"]
|
||||
|
||||
|
||||
@typing.final
|
||||
class DateTime:
|
||||
class DateTime(msgspec.Struct):
|
||||
"""Class to represent date & time. Essentially, it is a wrapper that
|
||||
conveniently combines :py:obj:`datetime.datetime` and
|
||||
:py:obj:`babel.dates.format_datetime`. A conversion of time zones is not
|
||||
@ -216,8 +216,7 @@ class DateTime:
|
||||
as the value for the ``locale``.
|
||||
"""
|
||||
|
||||
def __init__(self, time: datetime.datetime):
|
||||
self.datetime = time
|
||||
datetime: datetime.datetime
|
||||
|
||||
def __str__(self):
|
||||
return self.l10n()
|
||||
@ -252,36 +251,35 @@ class DateTime:
|
||||
return babel.dates.format_date(self.datetime, format=fmt, locale=locale)
|
||||
|
||||
|
||||
@typing.final
|
||||
class Temperature:
|
||||
TemperatureUnits: t.TypeAlias = t.Literal["°C", "°F", "K"]
|
||||
|
||||
|
||||
class Temperature(msgspec.Struct, kw_only=True):
|
||||
"""Class for converting temperature units and for string representation of
|
||||
measured values."""
|
||||
|
||||
si_name = "Q11579"
|
||||
val: float
|
||||
unit: TemperatureUnits
|
||||
|
||||
Units = typing.Literal["°C", "°F", "K"]
|
||||
"""Supported temperature units."""
|
||||
si_name: t.ClassVar[str] = "Q11579"
|
||||
units: t.ClassVar[list[str]] = list(t.get_args(TemperatureUnits))
|
||||
|
||||
units = list(typing.get_args(Units))
|
||||
|
||||
def __init__(self, value: float, unit: Units):
|
||||
if unit not in self.units:
|
||||
raise ValueError(f"invalid unit: {unit}")
|
||||
self.si: float = convert_to_si( # pylint: disable=invalid-name
|
||||
si_name=self.si_name,
|
||||
symbol=unit,
|
||||
value=value,
|
||||
)
|
||||
def __post_init__(self):
|
||||
if self.unit not in self.units:
|
||||
raise ValueError(f"invalid unit: {self.unit}")
|
||||
|
||||
def __str__(self):
|
||||
return self.l10n()
|
||||
|
||||
def value(self, unit: Units) -> float:
|
||||
return convert_from_si(si_name=self.si_name, symbol=unit, value=self.si)
|
||||
def value(self, unit: TemperatureUnits) -> float:
|
||||
if unit == self.unit:
|
||||
return self.val
|
||||
si_val = convert_to_si(si_name=self.si_name, symbol=self.unit, value=self.val)
|
||||
return convert_from_si(si_name=self.si_name, symbol=unit, value=si_val)
|
||||
|
||||
def l10n(
|
||||
self,
|
||||
unit: Units | None = None,
|
||||
unit: TemperatureUnits | None = None,
|
||||
locale: babel.Locale | GeoLocation | None = None,
|
||||
template: str = "{value} {unit}",
|
||||
num_pattern: str = "#,##0",
|
||||
@ -320,33 +318,35 @@ class Temperature:
|
||||
return template.format(value=val_str, unit=unit)
|
||||
|
||||
|
||||
@typing.final
|
||||
class Pressure:
|
||||
PressureUnits: t.TypeAlias = t.Literal["Pa", "hPa", "cm Hg", "bar"]
|
||||
|
||||
|
||||
class Pressure(msgspec.Struct, kw_only=True):
|
||||
"""Class for converting pressure units and for string representation of
|
||||
measured values."""
|
||||
|
||||
si_name = "Q44395"
|
||||
val: float
|
||||
unit: PressureUnits
|
||||
|
||||
Units = typing.Literal["Pa", "hPa", "cm Hg", "bar"]
|
||||
"""Supported units."""
|
||||
si_name: t.ClassVar[str] = "Q44395"
|
||||
units: t.ClassVar[list[str]] = list(t.get_args(PressureUnits))
|
||||
|
||||
units = list(typing.get_args(Units))
|
||||
|
||||
def __init__(self, value: float, unit: Units):
|
||||
if unit not in self.units:
|
||||
raise ValueError(f"invalid unit: {unit}")
|
||||
# pylint: disable=invalid-name
|
||||
self.si: float = convert_to_si(si_name=self.si_name, symbol=unit, value=value)
|
||||
def __post_init__(self):
|
||||
if self.unit not in self.units:
|
||||
raise ValueError(f"invalid unit: {self.unit}")
|
||||
|
||||
def __str__(self):
|
||||
return self.l10n()
|
||||
|
||||
def value(self, unit: Units) -> float:
|
||||
return convert_from_si(si_name=self.si_name, symbol=unit, value=self.si)
|
||||
def value(self, unit: PressureUnits) -> float:
|
||||
if unit == self.unit:
|
||||
return self.val
|
||||
si_val = convert_to_si(si_name=self.si_name, symbol=self.unit, value=self.val)
|
||||
return convert_from_si(si_name=self.si_name, symbol=unit, value=si_val)
|
||||
|
||||
def l10n(
|
||||
self,
|
||||
unit: Units | None = None,
|
||||
unit: PressureUnits | None = None,
|
||||
locale: babel.Locale | GeoLocation | None = None,
|
||||
template: str = "{value} {unit}",
|
||||
num_pattern: str = "#,##0",
|
||||
@ -363,8 +363,10 @@ class Pressure:
|
||||
return template.format(value=val_str, unit=unit)
|
||||
|
||||
|
||||
@typing.final
|
||||
class WindSpeed:
|
||||
WindSpeedUnits: t.TypeAlias = t.Literal["m/s", "km/h", "kn", "mph", "mi/h", "Bft"]
|
||||
|
||||
|
||||
class WindSpeed(msgspec.Struct, kw_only=True):
|
||||
"""Class for converting speed or velocity units and for string
|
||||
representation of measured values.
|
||||
|
||||
@ -375,28 +377,28 @@ class WindSpeed:
|
||||
(55.6 m/s)
|
||||
"""
|
||||
|
||||
si_name = "Q182429"
|
||||
val: float
|
||||
unit: WindSpeedUnits
|
||||
|
||||
Units = typing.Literal["m/s", "km/h", "kn", "mph", "mi/h", "Bft"]
|
||||
"""Supported units."""
|
||||
si_name: t.ClassVar[str] = "Q182429"
|
||||
units: t.ClassVar[list[str]] = list(t.get_args(WindSpeedUnits))
|
||||
|
||||
units = list(typing.get_args(Units))
|
||||
|
||||
def __init__(self, value: float, unit: Units):
|
||||
if unit not in self.units:
|
||||
raise ValueError(f"invalid unit: {unit}")
|
||||
# pylint: disable=invalid-name
|
||||
self.si: float = convert_to_si(si_name=self.si_name, symbol=unit, value=value)
|
||||
def __post_init__(self):
|
||||
if self.unit not in self.units:
|
||||
raise ValueError(f"invalid unit: {self.unit}")
|
||||
|
||||
def __str__(self):
|
||||
return self.l10n()
|
||||
|
||||
def value(self, unit: Units) -> float:
|
||||
return convert_from_si(si_name=self.si_name, symbol=unit, value=self.si)
|
||||
def value(self, unit: WindSpeedUnits) -> float:
|
||||
if unit == self.unit:
|
||||
return self.val
|
||||
si_val = convert_to_si(si_name=self.si_name, symbol=self.unit, value=self.val)
|
||||
return convert_from_si(si_name=self.si_name, symbol=unit, value=si_val)
|
||||
|
||||
def l10n(
|
||||
self,
|
||||
unit: Units | None = None,
|
||||
unit: WindSpeedUnits | None = None,
|
||||
locale: babel.Locale | GeoLocation | None = None,
|
||||
template: str = "{value} {unit}",
|
||||
num_pattern: str = "#,##0",
|
||||
@ -413,23 +415,23 @@ class WindSpeed:
|
||||
return template.format(value=val_str, unit=unit)
|
||||
|
||||
|
||||
@typing.final
|
||||
class RelativeHumidity:
|
||||
RelativeHumidityUnits: t.TypeAlias = t.Literal["%"]
|
||||
|
||||
|
||||
class RelativeHumidity(msgspec.Struct):
|
||||
"""Amount of relative humidity in the air. The unit is ``%``"""
|
||||
|
||||
Units = typing.Literal["%"]
|
||||
"""Supported unit."""
|
||||
val: float
|
||||
|
||||
units = list(typing.get_args(Units))
|
||||
|
||||
def __init__(self, humidity: float):
|
||||
self.humidity = humidity
|
||||
# there exists only one unit (%) --> set "%" as the final value (constant)
|
||||
unit: t.ClassVar["t.Final[RelativeHumidityUnits]"] = "%"
|
||||
units: t.ClassVar[list[str]] = list(t.get_args(RelativeHumidityUnits))
|
||||
|
||||
def __str__(self):
|
||||
return self.l10n()
|
||||
|
||||
def value(self) -> float:
|
||||
return self.humidity
|
||||
return self.val
|
||||
|
||||
def l10n(
|
||||
self,
|
||||
@ -447,45 +449,50 @@ class RelativeHumidity:
|
||||
return template.format(value=val_str, unit=unit)
|
||||
|
||||
|
||||
@typing.final
|
||||
class Compass:
|
||||
CompassPoint: t.TypeAlias = t.Literal[
|
||||
"N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"
|
||||
]
|
||||
"""Compass point type definition"""
|
||||
|
||||
CompassUnits: t.TypeAlias = t.Literal["°", "Point"]
|
||||
|
||||
|
||||
class Compass(msgspec.Struct):
|
||||
"""Class for converting compass points and azimuth values (360°)"""
|
||||
|
||||
Units = typing.Literal["°", "Point"]
|
||||
val: "float | int | CompassPoint"
|
||||
unit: CompassUnits = "°"
|
||||
|
||||
Point = typing.Literal[
|
||||
"N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"
|
||||
]
|
||||
"""Compass point type definition"""
|
||||
|
||||
TURN = 360.0
|
||||
TURN: t.ClassVar[float] = 360.0
|
||||
"""Full turn (360°)"""
|
||||
|
||||
POINTS = list(typing.get_args(Point))
|
||||
POINTS: t.ClassVar[list[CompassPoint]] = list(t.get_args(CompassPoint))
|
||||
"""Compass points."""
|
||||
|
||||
RANGE = TURN / len(POINTS)
|
||||
RANGE: t.ClassVar[float] = TURN / len(POINTS)
|
||||
"""Angle sector of a compass point"""
|
||||
|
||||
def __init__(self, azimuth: float | int | Point):
|
||||
if isinstance(azimuth, str):
|
||||
if azimuth not in self.POINTS:
|
||||
raise ValueError(f"Invalid compass point: {azimuth}")
|
||||
azimuth = self.POINTS.index(azimuth) * self.RANGE
|
||||
self.azimuth = azimuth % self.TURN
|
||||
def __post_init__(self):
|
||||
if isinstance(self.val, str):
|
||||
if self.val not in self.POINTS:
|
||||
raise ValueError(f"Invalid compass point: {self.val}")
|
||||
self.val = self.POINTS.index(self.val) * self.RANGE
|
||||
|
||||
self.val = self.val % self.TURN
|
||||
self.unit = "°"
|
||||
|
||||
def __str__(self):
|
||||
return self.l10n()
|
||||
|
||||
def value(self, unit: Units):
|
||||
if unit == "Point":
|
||||
return self.point(self.azimuth)
|
||||
def value(self, unit: CompassUnits):
|
||||
if unit == "Point" and isinstance(self.val, float):
|
||||
return self.point(self.val)
|
||||
if unit == "°":
|
||||
return self.azimuth
|
||||
return self.val
|
||||
raise ValueError(f"unknown unit: {unit}")
|
||||
|
||||
@classmethod
|
||||
def point(cls, azimuth: float | int) -> Point:
|
||||
def point(cls, azimuth: float | int) -> CompassPoint:
|
||||
"""Returns the compass point to an azimuth value."""
|
||||
azimuth = azimuth % cls.TURN
|
||||
# The angle sector of a compass point starts 1/2 sector range before
|
||||
@ -496,7 +503,7 @@ class Compass:
|
||||
|
||||
def l10n(
|
||||
self,
|
||||
unit: Units = "Point",
|
||||
unit: CompassUnits = "Point",
|
||||
locale: babel.Locale | GeoLocation | None = None,
|
||||
template: str = "{value}{unit}",
|
||||
num_pattern: str = "#,##0",
|
||||
@ -514,7 +521,7 @@ class Compass:
|
||||
return template.format(value=val_str, unit=unit)
|
||||
|
||||
|
||||
WeatherConditionType = typing.Literal[
|
||||
WeatherConditionType = t.Literal[
|
||||
# The capitalized string goes into to i18n/l10n (en: "Clear sky" -> de: "wolkenloser Himmel")
|
||||
"clear sky",
|
||||
"partly cloudy",
|
||||
@ -632,7 +639,7 @@ YR_WEATHER_SYMBOL_MAP = {
|
||||
if __name__ == "__main__":
|
||||
|
||||
# test: fetch all symbols of the type catalog ..
|
||||
for c in typing.get_args(WeatherConditionType):
|
||||
for c in t.get_args(WeatherConditionType):
|
||||
symbol_url(condition=c)
|
||||
|
||||
_cache = get_WEATHER_DATA_CACHE()
|
||||
|
||||
@ -16,6 +16,7 @@ from typing import Iterable, List, Tuple, TYPE_CHECKING
|
||||
from io import StringIO
|
||||
from codecs import getincrementalencoder
|
||||
|
||||
import msgspec
|
||||
from flask_babel import gettext, format_date # type: ignore
|
||||
|
||||
from searx import logger, get_setting
|
||||
@ -147,6 +148,8 @@ def write_csv_response(csv: CSVWriter, rc: "ResultContainer") -> None: # pylint
|
||||
|
||||
class JSONEncoder(json.JSONEncoder): # pylint: disable=missing-class-docstring
|
||||
def default(self, o):
|
||||
if isinstance(o, msgspec.Struct):
|
||||
return msgspec.to_builtins(o)
|
||||
if isinstance(o, datetime):
|
||||
return o.isoformat()
|
||||
if isinstance(o, timedelta):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user