From 045798e959ae32630640f9e869cc480ff1ab0b41 Mon Sep 17 00:00:00 2001 From: Hayden <64056131+hay-kot@users.noreply.github.com> Date: Sun, 10 Apr 2022 18:13:38 -0800 Subject: [PATCH] chore: drop-apscheduler (#1152) * rewrite interval timer * drop apscheduler * fix type annotations --- mealie/app.py | 15 ++-- mealie/services/scheduler/runner.py | 82 +++++++++++++++++++ mealie/services/scheduler/scheduled_func.py | 17 +--- .../services/scheduler/scheduler_registry.py | 11 +++ .../services/scheduler/scheduler_service.py | 63 ++------------ mealie/services/scheduler/tasks/__init__.py | 1 - mealie/services/scheduler/tasks/webhooks.py | 58 ------------- poetry.lock | 79 +----------------- pyproject.toml | 1 - tests/fixtures/fixture_admin.py | 3 +- tests/fixtures/fixture_users.py | 7 +- 11 files changed, 117 insertions(+), 220 deletions(-) create mode 100644 mealie/services/scheduler/runner.py delete mode 100644 mealie/services/scheduler/tasks/webhooks.py diff --git a/mealie/app.py b/mealie/app.py index b10d3a6505bb..cbc6ca19c3ca 100644 --- a/mealie/app.py +++ b/mealie/app.py @@ -50,19 +50,18 @@ app.add_middleware(GZipMiddleware, minimum_size=1000) register_debug_handler(app) -def start_scheduler(): - SchedulerService.start() - +async def start_scheduler(): SchedulerRegistry.register_daily( tasks.purge_group_registration, tasks.purge_password_reset_tokens, tasks.purge_group_data_exports, ) - SchedulerRegistry.register_hourly() - SchedulerRegistry.register_minutely(tasks.update_group_webhooks) + SchedulerRegistry.register_minutely(lambda: logger.info("Scheduler tick")) - logger.info(SchedulerService.scheduler.print_jobs()) + SchedulerRegistry.print_jobs() + + await SchedulerService.start() def api_routers(): @@ -75,8 +74,8 @@ api_routers() @app.on_event("startup") -def system_startup(): - start_scheduler() +async def system_startup(): + await start_scheduler() logger.info("-----SYSTEM STARTUP----- \n") logger.info("------APP SETTINGS------") diff --git a/mealie/services/scheduler/runner.py b/mealie/services/scheduler/runner.py new file mode 100644 index 000000000000..990c2f7d615f --- /dev/null +++ b/mealie/services/scheduler/runner.py @@ -0,0 +1,82 @@ +# Code Adapted/Copied From fastapi_utils +# https://github.com/dmontagu/fastapi-utils/blob/master/fastapi_utils/tasks.py + +import asyncio +import logging +from asyncio import ensure_future +from functools import wraps +from traceback import format_exception +from typing import Any, Callable, Coroutine, Optional, Union + +from starlette.concurrency import run_in_threadpool + +NoArgsNoReturnFuncT = Callable[[], None] +NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]] +NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT] + + +def repeat_every( + *, + minutes: float, + wait_first: bool = False, + logger: Optional[logging.Logger] = None, + raise_exceptions: bool = False, + max_repetitions: Optional[int] = None, +) -> NoArgsNoReturnDecorator: + """ + This function returns a decorator that modifies a function so it is periodically re-executed after its first call. + The function it decorates should accept no arguments and return nothing. If necessary, this can be accomplished + by using `functools.partial` or otherwise wrapping the target function prior to decoration. + + Parameters + ---------- + seconds: float + The number of seconds to wait between repeated calls + wait_first: bool (default False) + If True, the function will wait for a single period before the first call + logger: Optional[logging.Logger] (default None) + The logger to use to log any exceptions raised by calls to the decorated function. + If not provided, exceptions will not be logged by this function (though they may be handled by the event loop). + raise_exceptions: bool (default False) + If True, errors raised by the decorated function will be raised to the event loop's exception handler. + Note that if an error is raised, the repeated execution will stop. + Otherwise, exceptions are just logged and the execution continues to repeat. + See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_exception_handler for more info. + max_repetitions: Optional[int] (default None) + The maximum number of times to call the repeated function. If `None`, the function is repeated forever. + """ + + def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT: + """ + Converts the decorated function into a repeated, periodically-called version of itself. + """ + is_coroutine = asyncio.iscoroutinefunction(func) + + @wraps(func) + async def wrapped() -> None: + repetitions = 0 + + async def loop() -> None: + nonlocal repetitions + if wait_first: + await asyncio.sleep(minutes * 60) + while max_repetitions is None or repetitions < max_repetitions: + try: + if is_coroutine: + await func() # type: ignore + else: + await run_in_threadpool(func) + repetitions += 1 + except Exception as exc: + if logger is not None: + formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__)) + logger.error(formatted_exception) + if raise_exceptions: + raise exc + await asyncio.sleep(minutes * 60) + + ensure_future(loop()) + + return wrapped + + return decorator diff --git a/mealie/services/scheduler/scheduled_func.py b/mealie/services/scheduler/scheduled_func.py index d7159bab6363..4614e2470ec3 100644 --- a/mealie/services/scheduler/scheduled_func.py +++ b/mealie/services/scheduler/scheduled_func.py @@ -1,21 +1,10 @@ from collections.abc import Callable -from dataclasses import dataclass +from dataclasses import dataclass, field from pydantic import BaseModel -@dataclass -class Cron: - hours: int - minutes: int - - @classmethod - def parse(cls, time_str: str) -> "Cron": - time = time_str.split(":") - return Cron(hours=int(time[0]), minutes=int(time[1])) - - -@dataclass +@dataclass(slots=True) class ScheduledFunc(BaseModel): id: tuple[str, int] name: str @@ -25,4 +14,4 @@ class ScheduledFunc(BaseModel): max_instances: int = 1 replace_existing: bool = True - args: list = [] + args: list = field(default_factory=list) diff --git a/mealie/services/scheduler/scheduler_registry.py b/mealie/services/scheduler/scheduler_registry.py index f75e6012e73d..ae8cbb65080d 100644 --- a/mealie/services/scheduler/scheduler_registry.py +++ b/mealie/services/scheduler/scheduler_registry.py @@ -46,3 +46,14 @@ class SchedulerRegistry: def remove_minutely(callback: Callable): logger.info(f"Removing minutely callback: {callback.__name__}") SchedulerRegistry._minutely.remove(callback) + + @staticmethod + def print_jobs(): + for job in SchedulerRegistry._daily: + logger.info(f"Daily job: {job.__name__}") + + for job in SchedulerRegistry._hourly: + logger.info(f"Hourly job: {job.__name__}") + + for job in SchedulerRegistry._minutely: + logger.info(f"Minutely job: {job.__name__}") diff --git a/mealie/services/scheduler/scheduler_service.py b/mealie/services/scheduler/scheduler_service.py index 75c92710323c..bf9f1ff49e44 100644 --- a/mealie/services/scheduler/scheduler_service.py +++ b/mealie/services/scheduler/scheduler_service.py @@ -1,73 +1,25 @@ from pathlib import Path -from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore -from apscheduler.schedulers.background import BackgroundScheduler - from mealie.core import root_logger +from mealie.services.scheduler.runner import repeat_every -from .scheduled_func import ScheduledFunc from .scheduler_registry import SchedulerRegistry logger = root_logger.get_logger() CWD = Path(__file__).parent -SCHEDULER_DB = CWD / ".scheduler.db" -SCHEDULER_DATABASE = f"sqlite:///{SCHEDULER_DB}" - MINUTES_DAY = 1440 -MINUTES_15 = 15 +MINUTES_5 = 5 MINUTES_HOUR = 60 class SchedulerService: - """ - SchedulerService is a wrapper class around the APScheduler library. It is resonpseible for interacting with the scheduler - and scheduling events. This includes the interval events that are registered in the SchedulerRegistry as well as cron events - that are used for sending webhooks. In most cases, unless the the schedule is dynamic, events should be registered with the - SchedulerRegistry. See app.py for examples. - """ - - _scheduler: BackgroundScheduler - @staticmethod - def start(): - # Preclean - SCHEDULER_DB.unlink(missing_ok=True) - - # Register Interval Jobs and Start Scheduler - SchedulerService._scheduler = BackgroundScheduler(jobstores={"default": SQLAlchemyJobStore(SCHEDULER_DATABASE)}) - SchedulerService._scheduler.add_job(run_daily, "interval", minutes=MINUTES_DAY, id="Daily Interval Jobs") - SchedulerService._scheduler.add_job(run_hourly, "interval", minutes=MINUTES_HOUR, id="Hourly Interval Jobs") - SchedulerService._scheduler.add_job(run_minutely, "interval", minutes=MINUTES_15, id="Regular Interval Jobs") - SchedulerService._scheduler.start() - - @classmethod - @property - def scheduler(cls) -> BackgroundScheduler: - return SchedulerService._scheduler - - @staticmethod - def add_cron_job(job_func: ScheduledFunc): - SchedulerService.scheduler.add_job( # type: ignore - job_func.callback, - trigger="cron", - name=job_func.id, - hour=job_func.hour, - minute=job_func.minutes, - max_instances=job_func.max_instances, # type: ignore - replace_existing=job_func.replace_existing, - args=job_func.args, - ) - - @staticmethod - def update_cron_job(job_func: ScheduledFunc): - SchedulerService.scheduler.reschedule_job( # type: ignore - job_func.id, - trigger="cron", - hour=job_func.hour, - minute=job_func.minutes, - ) + async def start(): + await run_minutely() + await run_daily() + await run_hourly() def _scheduled_task_wrapper(callable): @@ -77,18 +29,21 @@ def _scheduled_task_wrapper(callable): logger.error(f"Error in scheduled task func='{callable.__name__}': exception='{e}'") +@repeat_every(minutes=MINUTES_DAY, wait_first=True, logger=logger) def run_daily(): logger.info("Running daily callbacks") for func in SchedulerRegistry._daily: _scheduled_task_wrapper(func) +@repeat_every(minutes=MINUTES_HOUR, wait_first=True, logger=logger) def run_hourly(): logger.info("Running hourly callbacks") for func in SchedulerRegistry._hourly: _scheduled_task_wrapper(func) +@repeat_every(minutes=MINUTES_5, wait_first=True, logger=logger) def run_minutely(): logger.info("Running minutely callbacks") for func in SchedulerRegistry._minutely: diff --git a/mealie/services/scheduler/tasks/__init__.py b/mealie/services/scheduler/tasks/__init__.py index d4446d33bb1e..07193f327e7e 100644 --- a/mealie/services/scheduler/tasks/__init__.py +++ b/mealie/services/scheduler/tasks/__init__.py @@ -1,7 +1,6 @@ from .purge_group_exports import * from .purge_password_reset import * from .purge_registration import * -from .webhooks import * """ Tasks Package diff --git a/mealie/services/scheduler/tasks/webhooks.py b/mealie/services/scheduler/tasks/webhooks.py deleted file mode 100644 index d9aff970fccd..000000000000 --- a/mealie/services/scheduler/tasks/webhooks.py +++ /dev/null @@ -1,58 +0,0 @@ -import json - -import requests -from sqlalchemy.orm.session import Session - -from mealie.core import root_logger -from mealie.db.db_setup import create_session -from mealie.repos.all_repositories import get_repositories -from mealie.schema.group.webhook import ReadWebhook - -from ..scheduled_func import Cron, ScheduledFunc -from ..scheduler_service import SchedulerService - -logger = root_logger.get_logger() - - -def post_webhooks(webhook_id: int, session: Session = None): - session = session or create_session() - db = get_repositories(session) - webhook: ReadWebhook = db.webhooks.get_one(webhook_id) - - if not webhook.enabled: - logger.info(f"Skipping webhook {webhook_id}. reasons: is disabled") - return - - todays_recipe = db.meals.get_today(webhook.group_id) - - if not todays_recipe: - return - - payload = json.loads([x.json(by_alias=True) for x in todays_recipe]) # type: ignore - response = requests.post(webhook.url, json=payload) - - if response.status_code != 200: - logger.error(f"Error posting webhook to {webhook.url} ({response.status_code})") - - session.close() - - -def update_group_webhooks(): - session = create_session() - db = get_repositories(session) - - webhooks: list[ReadWebhook] = db.webhooks.get_all() - - for webhook in webhooks: - cron = Cron.parse(webhook.time) - - job_func = ScheduledFunc( - id=webhook.id, - name=f"Group {webhook.group_id} webhook", - callback=post_webhooks, - hour=cron.hours, - minute=cron.minutes, - args=(webhook.id), - ) - - SchedulerService.add_cron_job(job_func) diff --git a/poetry.lock b/poetry.lock index b1de0668991e..6c3d220d682a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -70,32 +70,6 @@ requests = "*" requests-oauthlib = "*" six = "*" -[[package]] -name = "apscheduler" -version = "3.8.1" -description = "In-process task scheduler with Cron-like capabilities" -category = "main" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4" - -[package.dependencies] -pytz = "*" -six = ">=1.4.0" -tzlocal = ">=2.0,<3.0.0 || >=4.0.0" - -[package.extras] -asyncio = ["trollius"] -doc = ["sphinx", "sphinx-rtd-theme"] -gevent = ["gevent"] -mongodb = ["pymongo (>=3.0)"] -redis = ["redis (>=3.0)"] -rethinkdb = ["rethinkdb (>=2.4.0)"] -sqlalchemy = ["sqlalchemy (>=0.8)"] -testing = ["pytest (<6)", "pytest-cov", "pytest-tornado5", "mock", "pytest-asyncio (<0.6)", "pytest-asyncio"] -tornado = ["tornado (>=4.3)"] -twisted = ["twisted"] -zookeeper = ["kazoo"] - [[package]] name = "astroid" version = "2.9.3" @@ -1150,25 +1124,6 @@ text-unidecode = ">=1.3" [package.extras] unidecode = ["Unidecode (>=1.1.1)"] -[[package]] -name = "pytz" -version = "2021.3" -description = "World timezone definitions, modern and historical" -category = "main" -optional = false -python-versions = "*" - -[[package]] -name = "pytz-deprecation-shim" -version = "0.1.0.post0" -description = "Shims to make deprecation of pytz easier" -category = "main" -optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" - -[package.dependencies] -tzdata = {version = "*", markers = "python_version >= \"3.6\""} - [[package]] name = "pyyaml" version = "5.4.1" @@ -1434,22 +1389,6 @@ category = "main" optional = false python-versions = ">=2" -[[package]] -name = "tzlocal" -version = "4.1" -description = "tzinfo object for the local timezone" -category = "main" -optional = false -python-versions = ">=3.6" - -[package.dependencies] -pytz-deprecation-shim = "*" -tzdata = {version = "*", markers = "platform_system == \"Windows\""} - -[package.extras] -devenv = ["black", "pyroma", "pytest-cov", "zest.releaser"] -test = ["pytest-mock (>=3.3)", "pytest (>=4.3)"] - [[package]] name = "urllib3" version = "1.26.8" @@ -1588,7 +1527,7 @@ pgsql = ["psycopg2-binary"] [metadata] lock-version = "1.1" python-versions = "^3.10" -content-hash = "576653c13d9b6220769f2a166cb65ac7bc9cd5d5298e03b45002cd68eef66ede" +content-hash = "0de62ed339982506e2402f40eb604fc152ed4689c1e07fe835d46312e0dfa5f2" [metadata.files] aiofiles = [ @@ -1615,10 +1554,6 @@ apprise = [ {file = "apprise-0.9.6-py2.py3-none-any.whl", hash = "sha256:c8fe142102e3d6410c5b04b4a10c4c99b267f8e946126c0105954ba84aca39ac"}, {file = "apprise-0.9.6.tar.gz", hash = "sha256:15ed06208197c9d28fd83cd69e727b48280879a4c611afd6b5aea5d0def397f8"}, ] -apscheduler = [ - {file = "APScheduler-3.8.1-py2.py3-none-any.whl", hash = "sha256:c22cb14b411a31435eb2c530dfbbec948ac63015b517087c7978adb61b574865"}, - {file = "APScheduler-3.8.1.tar.gz", hash = "sha256:5cf344ebcfbdaa48ae178c029c055cec7bc7a4a47c21e315e4d1f08bd35f2355"}, -] astroid = [ {file = "astroid-2.9.3-py3-none-any.whl", hash = "sha256:506daabe5edffb7e696ad82483ad0228245a9742ed7d2d8c9cdb31537decf9f6"}, {file = "astroid-2.9.3.tar.gz", hash = "sha256:1efdf4e867d4d8ba4a9f6cf9ce07cd182c4c41de77f23814feb27ca93ca9d877"}, @@ -2460,14 +2395,6 @@ python-multipart = [ python-slugify = [ {file = "python-slugify-4.0.1.tar.gz", hash = "sha256:69a517766e00c1268e5bbfc0d010a0a8508de0b18d30ad5a1ff357f8ae724270"}, ] -pytz = [ - {file = "pytz-2021.3-py2.py3-none-any.whl", hash = "sha256:3672058bc3453457b622aab7a1c3bfd5ab0bdae451512f6cf25f64ed37f5b87c"}, - {file = "pytz-2021.3.tar.gz", hash = "sha256:acad2d8b20a1af07d4e4c9d2e9285c5ed9104354062f275f3fcd88dcef4f1326"}, -] -pytz-deprecation-shim = [ - {file = "pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl", hash = "sha256:8314c9692a636c8eb3bda879b9f119e350e93223ae83e70e80c31675a0fdc1a6"}, - {file = "pytz_deprecation_shim-0.1.0.post0.tar.gz", hash = "sha256:af097bae1b616dde5c5744441e2ddc69e74dfdcb0c263129610d85b87445a59d"}, -] pyyaml = [ {file = "PyYAML-5.4.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:3b2b1824fe7112845700f815ff6a489360226a5609b96ec2190a45e62a9fc922"}, {file = "PyYAML-5.4.1-cp27-cp27m-win32.whl", hash = "sha256:129def1b7c1bf22faffd67b8f3724645203b79d8f4cc81f674654d9902cb4393"}, @@ -2622,10 +2549,6 @@ tzdata = [ {file = "tzdata-2021.5-py2.py3-none-any.whl", hash = "sha256:3eee491e22ebfe1e5cfcc97a4137cd70f092ce59144d81f8924a844de05ba8f5"}, {file = "tzdata-2021.5.tar.gz", hash = "sha256:68dbe41afd01b867894bbdfd54fa03f468cfa4f0086bfb4adcd8de8f24f3ee21"}, ] -tzlocal = [ - {file = "tzlocal-4.1-py3-none-any.whl", hash = "sha256:28ba8d9fcb6c9a782d6e0078b4f6627af1ea26aeaa32b4eab5324abc7df4149f"}, - {file = "tzlocal-4.1.tar.gz", hash = "sha256:0f28015ac68a5c067210400a9197fc5d36ba9bc3f8eaf1da3cbd59acdfed9e09"}, -] urllib3 = [ {file = "urllib3-1.26.8-py2.py3-none-any.whl", hash = "sha256:000ca7f471a233c2251c6c7023ee85305721bfdf18621ebff4fd17a8653427ed"}, {file = "urllib3-1.26.8.tar.gz", hash = "sha256:0e7c33d9a63e7ddfcb86780aac87befc2fbddf46c58dbb487e0855f7ceec283c"}, diff --git a/pyproject.toml b/pyproject.toml index 652a72b0f796..ef6f9750378a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,7 +15,6 @@ aniso8601 = "7.0.0" appdirs = "1.4.4" fastapi = "^0.75.1" uvicorn = {extras = ["standard"], version = "^0.13.0"} -APScheduler = "^3.8.1" SQLAlchemy = "^1.4.29" alembic = "^1.7.5" Jinja2 = "^2.11.2" diff --git a/tests/fixtures/fixture_admin.py b/tests/fixtures/fixture_admin.py index 640262dee21d..60e0994fe167 100644 --- a/tests/fixtures/fixture_admin.py +++ b/tests/fixtures/fixture_admin.py @@ -1,4 +1,3 @@ -import requests from pytest import fixture from starlette.testclient import TestClient @@ -7,7 +6,7 @@ from tests import utils @fixture(scope="session") -def admin_token(api_client: requests, api_routes: utils.AppRoutes): +def admin_token(api_client: TestClient, api_routes: utils.AppRoutes): settings = get_app_settings() form_data = {"username": "changeme@email.com", "password": settings.DEFAULT_PASSWORD} diff --git a/tests/fixtures/fixture_users.py b/tests/fixtures/fixture_users.py index 4f1ae3216083..87e206c5cbb3 100644 --- a/tests/fixtures/fixture_users.py +++ b/tests/fixtures/fixture_users.py @@ -1,6 +1,5 @@ import json -import requests from pytest import fixture from starlette.testclient import TestClient @@ -8,7 +7,7 @@ from tests import utils from tests.utils.factories import random_string -def build_unique_user(group: str, api_client: requests) -> utils.TestUser: +def build_unique_user(group: str, api_client: TestClient) -> utils.TestUser: api_routes = utils.AppRoutes() group = group or random_string(12) @@ -103,7 +102,7 @@ def unique_user(api_client: TestClient, api_routes: utils.AppRoutes): @fixture(scope="module") -def user_tuple(admin_token, api_client: requests, api_routes: utils.AppRoutes) -> tuple[utils.TestUser]: +def user_tuple(admin_token, api_client: TestClient, api_routes: utils.AppRoutes) -> tuple[utils.TestUser]: group_name = utils.random_string() # Create the user create_data_1 = { @@ -157,7 +156,7 @@ def user_tuple(admin_token, api_client: requests, api_routes: utils.AppRoutes) - @fixture(scope="session") -def user_token(admin_token, api_client: requests, api_routes: utils.AppRoutes): +def user_token(admin_token, api_client: TestClient, api_routes: utils.AppRoutes): # Create the user create_data = { "fullName": utils.random_string(),