chore: drop-apscheduler (#1152)

* rewrite interval timer

* drop apscheduler

* fix type annotations
This commit is contained in:
Hayden 2022-04-10 18:13:38 -08:00 committed by GitHub
parent 4f55020a58
commit 045798e959
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 117 additions and 220 deletions

View File

@ -50,19 +50,18 @@ app.add_middleware(GZipMiddleware, minimum_size=1000)
register_debug_handler(app)
def start_scheduler():
SchedulerService.start()
async def start_scheduler():
SchedulerRegistry.register_daily(
tasks.purge_group_registration,
tasks.purge_password_reset_tokens,
tasks.purge_group_data_exports,
)
SchedulerRegistry.register_hourly()
SchedulerRegistry.register_minutely(tasks.update_group_webhooks)
SchedulerRegistry.register_minutely(lambda: logger.info("Scheduler tick"))
logger.info(SchedulerService.scheduler.print_jobs())
SchedulerRegistry.print_jobs()
await SchedulerService.start()
def api_routers():
@ -75,8 +74,8 @@ api_routers()
@app.on_event("startup")
def system_startup():
start_scheduler()
async def system_startup():
await start_scheduler()
logger.info("-----SYSTEM STARTUP----- \n")
logger.info("------APP SETTINGS------")

View File

@ -0,0 +1,82 @@
# Code Adapted/Copied From fastapi_utils
# https://github.com/dmontagu/fastapi-utils/blob/master/fastapi_utils/tasks.py
import asyncio
import logging
from asyncio import ensure_future
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Optional, Union
from starlette.concurrency import run_in_threadpool
NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT]
def repeat_every(
    *,
    minutes: float,
    wait_first: bool = False,
    logger: Optional[logging.Logger] = None,
    raise_exceptions: bool = False,
    max_repetitions: Optional[int] = None,
) -> NoArgsNoReturnDecorator:
    """
    Return a decorator that periodically re-executes the decorated function after its first call.

    The decorated function should accept no arguments and return nothing. If necessary, this can
    be accomplished by using `functools.partial` or otherwise wrapping the target function prior
    to decoration.

    Parameters
    ----------
    minutes: float
        The number of minutes to wait between repeated calls
    wait_first: bool (default False)
        If True, the function will wait for a single period before the first call
    logger: Optional[logging.Logger] (default None)
        The logger to use to log any exceptions raised by calls to the decorated function.
        If not provided, exceptions will not be logged by this function (though they may be handled by the event loop).
    raise_exceptions: bool (default False)
        If True, errors raised by the decorated function will be raised to the event loop's exception handler.
        Note that if an error is raised, the repeated execution will stop.
        Otherwise, exceptions are just logged and the execution continues to repeat.
        See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.set_exception_handler for more info.
    max_repetitions: Optional[int] (default None)
        The maximum number of times to call the repeated function. If `None`, the function is repeated forever.
    """
    # Hoisted loop-invariant: the sleep interval never changes between iterations.
    period_seconds = minutes * 60

    def decorator(func: Union[NoArgsNoReturnAsyncFuncT, NoArgsNoReturnFuncT]) -> NoArgsNoReturnAsyncFuncT:
        """
        Converts the decorated function into a repeated, periodically-called version of itself.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)

        @wraps(func)
        async def wrapped() -> None:
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                if wait_first:
                    await asyncio.sleep(period_seconds)
                while max_repetitions is None or repetitions < max_repetitions:
                    try:
                        if is_coroutine:
                            await func()  # type: ignore
                        else:
                            # Sync callables run in a worker thread so they don't block the event loop.
                            await run_in_threadpool(func)
                        repetitions += 1
                    except Exception as exc:
                        # NOTE: a failing call does not count as a repetition, so a persistently
                        # failing func retries forever unless raise_exceptions is set.
                        if logger is not None:
                            formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
                            logger.error(formatted_exception)
                        if raise_exceptions:
                            raise exc
                    await asyncio.sleep(period_seconds)

            # Fire-and-forget: schedule the background loop and return immediately.
            ensure_future(loop())

        return wrapped

    return decorator

View File

@ -1,21 +1,10 @@
from collections.abc import Callable
from dataclasses import dataclass
from dataclasses import dataclass, field
from pydantic import BaseModel
@dataclass
class Cron:
hours: int
minutes: int
@classmethod
def parse(cls, time_str: str) -> "Cron":
time = time_str.split(":")
return Cron(hours=int(time[0]), minutes=int(time[1]))
@dataclass
@dataclass(slots=True)
class ScheduledFunc(BaseModel):
id: tuple[str, int]
name: str
@ -25,4 +14,4 @@ class ScheduledFunc(BaseModel):
max_instances: int = 1
replace_existing: bool = True
args: list = []
args: list = field(default_factory=list)

View File

@ -46,3 +46,14 @@ class SchedulerRegistry:
def remove_minutely(callback: Callable):
    """Unregister *callback* from the minutely schedule.

    Raises ValueError (from list.remove) if the callback was never registered.
    """
    logger.info(f"Removing minutely callback: {callback.__name__}")
    SchedulerRegistry._minutely.remove(callback)
@staticmethod
def print_jobs():
    """Log the name of every registered callback, grouped by interval."""
    registered = (
        ("Daily", SchedulerRegistry._daily),
        ("Hourly", SchedulerRegistry._hourly),
        ("Minutely", SchedulerRegistry._minutely),
    )
    for interval, callbacks in registered:
        for job in callbacks:
            logger.info(f"{interval} job: {job.__name__}")

View File

@ -1,73 +1,25 @@
from pathlib import Path
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from mealie.core import root_logger
from mealie.services.scheduler.runner import repeat_every
from .scheduled_func import ScheduledFunc
from .scheduler_registry import SchedulerRegistry
logger = root_logger.get_logger()
CWD = Path(__file__).parent
SCHEDULER_DB = CWD / ".scheduler.db"
SCHEDULER_DATABASE = f"sqlite:///{SCHEDULER_DB}"
MINUTES_DAY = 1440
MINUTES_15 = 15
MINUTES_5 = 5
MINUTES_HOUR = 60
class SchedulerService:
"""
SchedulerService is a wrapper class around the APScheduler library. It is responsible for interacting with the scheduler
and scheduling events. This includes the interval events that are registered in the SchedulerRegistry as well as cron events
that are used for sending webhooks. In most cases, unless the schedule is dynamic, events should be registered with the
SchedulerRegistry. See app.py for examples.
"""
_scheduler: BackgroundScheduler
@staticmethod
def start():
# Preclean
SCHEDULER_DB.unlink(missing_ok=True)
# Register Interval Jobs and Start Scheduler
SchedulerService._scheduler = BackgroundScheduler(jobstores={"default": SQLAlchemyJobStore(SCHEDULER_DATABASE)})
SchedulerService._scheduler.add_job(run_daily, "interval", minutes=MINUTES_DAY, id="Daily Interval Jobs")
SchedulerService._scheduler.add_job(run_hourly, "interval", minutes=MINUTES_HOUR, id="Hourly Interval Jobs")
SchedulerService._scheduler.add_job(run_minutely, "interval", minutes=MINUTES_15, id="Regular Interval Jobs")
SchedulerService._scheduler.start()
@classmethod
@property
def scheduler(cls) -> BackgroundScheduler:
return SchedulerService._scheduler
@staticmethod
def add_cron_job(job_func: ScheduledFunc):
SchedulerService.scheduler.add_job( # type: ignore
job_func.callback,
trigger="cron",
name=job_func.id,
hour=job_func.hour,
minute=job_func.minutes,
max_instances=job_func.max_instances, # type: ignore
replace_existing=job_func.replace_existing,
args=job_func.args,
)
@staticmethod
def update_cron_job(job_func: ScheduledFunc):
SchedulerService.scheduler.reschedule_job( # type: ignore
job_func.id,
trigger="cron",
hour=job_func.hour,
minute=job_func.minutes,
)
async def start():
await run_minutely()
await run_daily()
await run_hourly()
def _scheduled_task_wrapper(callable):
@ -77,18 +29,21 @@ def _scheduled_task_wrapper(callable):
logger.error(f"Error in scheduled task func='{callable.__name__}': exception='{e}'")
@repeat_every(minutes=MINUTES_DAY, wait_first=True, logger=logger)
def run_daily():
logger.info("Running daily callbacks")
for func in SchedulerRegistry._daily:
_scheduled_task_wrapper(func)
@repeat_every(minutes=MINUTES_HOUR, wait_first=True, logger=logger)
def run_hourly():
logger.info("Running hourly callbacks")
for func in SchedulerRegistry._hourly:
_scheduled_task_wrapper(func)
@repeat_every(minutes=MINUTES_5, wait_first=True, logger=logger)
def run_minutely():
logger.info("Running minutely callbacks")
for func in SchedulerRegistry._minutely:

View File

@ -1,7 +1,6 @@
from .purge_group_exports import *
from .purge_password_reset import *
from .purge_registration import *
from .webhooks import *
"""
Tasks Package

View File

@ -1,58 +0,0 @@
import json
import requests
from sqlalchemy.orm.session import Session
from mealie.core import root_logger
from mealie.db.db_setup import create_session
from mealie.repos.all_repositories import get_repositories
from mealie.schema.group.webhook import ReadWebhook
from ..scheduled_func import Cron, ScheduledFunc
from ..scheduler_service import SchedulerService
logger = root_logger.get_logger()
def post_webhooks(webhook_id: int, session: Session = None):
    """Post today's meal-plan recipes to the webhook identified by *webhook_id*.

    Skips silently when the webhook is disabled or there is no recipe scheduled
    for today. A session created here is always closed; a caller-provided
    session is left open for the caller to manage.
    """
    owns_session = session is None
    session = session or create_session()
    try:
        db = get_repositories(session)
        webhook: ReadWebhook = db.webhooks.get_one(webhook_id)

        if not webhook.enabled:
            logger.info(f"Skipping webhook {webhook_id}. reasons: is disabled")
            return

        todays_recipe = db.meals.get_today(webhook.group_id)

        if not todays_recipe:
            return

        # Bug fix: json.loads() requires str/bytes; the original passed a list and
        # raised TypeError before the POST. Decode each serialized recipe instead.
        payload = [json.loads(x.json(by_alias=True)) for x in todays_recipe]
        response = requests.post(webhook.url, json=payload)

        if response.status_code != 200:
            logger.error(f"Error posting webhook to {webhook.url} ({response.status_code})")
    finally:
        # Close in finally so the early-return paths no longer leak the session.
        if owns_session:
            session.close()
def update_group_webhooks():
    """Register a cron job for every webhook currently stored in the database."""
    session = create_session()
    try:
        db = get_repositories(session)
        webhooks: list[ReadWebhook] = db.webhooks.get_all()

        for webhook in webhooks:
            cron = Cron.parse(webhook.time)
            job_func = ScheduledFunc(
                id=webhook.id,
                name=f"Group {webhook.group_id} webhook",
                callback=post_webhooks,
                hour=cron.hours,
                minute=cron.minutes,
                # Bug fix: (webhook.id) is just webhook.id — parentheses without a
                # comma don't create a tuple. ScheduledFunc.args expects a list.
                args=[webhook.id],
            )

            SchedulerService.add_cron_job(job_func)
    finally:
        # The original never closed the session it created.
        session.close()

79
poetry.lock generated
View File

@ -70,32 +70,6 @@ requests = "*"
requests-oauthlib = "*"
six = "*"
[[package]]
name = "apscheduler"
version = "3.8.1"
description = "In-process task scheduler with Cron-like capabilities"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4"
[package.dependencies]
pytz = "*"
six = ">=1.4.0"
tzlocal = ">=2.0,<3.0.0 || >=4.0.0"
[package.extras]
asyncio = ["trollius"]
doc = ["sphinx", "sphinx-rtd-theme"]
gevent = ["gevent"]
mongodb = ["pymongo (>=3.0)"]
redis = ["redis (>=3.0)"]
rethinkdb = ["rethinkdb (>=2.4.0)"]
sqlalchemy = ["sqlalchemy (>=0.8)"]
testing = ["pytest (<6)", "pytest-cov", "pytest-tornado5", "mock", "pytest-asyncio (<0.6)", "pytest-asyncio"]
tornado = ["tornado (>=4.3)"]
twisted = ["twisted"]
zookeeper = ["kazoo"]
[[package]]
name = "astroid"
version = "2.9.3"
@ -1150,25 +1124,6 @@ text-unidecode = ">=1.3"
[package.extras]
unidecode = ["Unidecode (>=1.1.1)"]
[[package]]
name = "pytz"
version = "2021.3"
description = "World timezone definitions, modern and historical"
category = "main"
optional = false
python-versions = "*"
[[package]]
name = "pytz-deprecation-shim"
version = "0.1.0.post0"
description = "Shims to make deprecation of pytz easier"
category = "main"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
[package.dependencies]
tzdata = {version = "*", markers = "python_version >= \"3.6\""}
[[package]]
name = "pyyaml"
version = "5.4.1"
@ -1434,22 +1389,6 @@ category = "main"
optional = false
python-versions = ">=2"
[[package]]
name = "tzlocal"
version = "4.1"
description = "tzinfo object for the local timezone"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
pytz-deprecation-shim = "*"
tzdata = {version = "*", markers = "platform_system == \"Windows\""}
[package.extras]
devenv = ["black", "pyroma", "pytest-cov", "zest.releaser"]
test = ["pytest-mock (>=3.3)", "pytest (>=4.3)"]
[[package]]
name = "urllib3"
version = "1.26.8"
@ -1588,7 +1527,7 @@ pgsql = ["psycopg2-binary"]
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "576653c13d9b6220769f2a166cb65ac7bc9cd5d5298e03b45002cd68eef66ede"
content-hash = "0de62ed339982506e2402f40eb604fc152ed4689c1e07fe835d46312e0dfa5f2"
[metadata.files]
aiofiles = [
@ -1615,10 +1554,6 @@ apprise = [
{file = "apprise-0.9.6-py2.py3-none-any.whl", hash = "sha256:c8fe142102e3d6410c5b04b4a10c4c99b267f8e946126c0105954ba84aca39ac"},
{file = "apprise-0.9.6.tar.gz", hash = "sha256:15ed06208197c9d28fd83cd69e727b48280879a4c611afd6b5aea5d0def397f8"},
]
apscheduler = [
{file = "APScheduler-3.8.1-py2.py3-none-any.whl", hash = "sha256:c22cb14b411a31435eb2c530dfbbec948ac63015b517087c7978adb61b574865"},
{file = "APScheduler-3.8.1.tar.gz", hash = "sha256:5cf344ebcfbdaa48ae178c029c055cec7bc7a4a47c21e315e4d1f08bd35f2355"},
]
astroid = [
{file = "astroid-2.9.3-py3-none-any.whl", hash = "sha256:506daabe5edffb7e696ad82483ad0228245a9742ed7d2d8c9cdb31537decf9f6"},
{file = "astroid-2.9.3.tar.gz", hash = "sha256:1efdf4e867d4d8ba4a9f6cf9ce07cd182c4c41de77f23814feb27ca93ca9d877"},
@ -2460,14 +2395,6 @@ python-multipart = [
python-slugify = [
{file = "python-slugify-4.0.1.tar.gz", hash = "sha256:69a517766e00c1268e5bbfc0d010a0a8508de0b18d30ad5a1ff357f8ae724270"},
]
pytz = [
{file = "pytz-2021.3-py2.py3-none-any.whl", hash = "sha256:3672058bc3453457b622aab7a1c3bfd5ab0bdae451512f6cf25f64ed37f5b87c"},
{file = "pytz-2021.3.tar.gz", hash = "sha256:acad2d8b20a1af07d4e4c9d2e9285c5ed9104354062f275f3fcd88dcef4f1326"},
]
pytz-deprecation-shim = [
{file = "pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl", hash = "sha256:8314c9692a636c8eb3bda879b9f119e350e93223ae83e70e80c31675a0fdc1a6"},
{file = "pytz_deprecation_shim-0.1.0.post0.tar.gz", hash = "sha256:af097bae1b616dde5c5744441e2ddc69e74dfdcb0c263129610d85b87445a59d"},
]
pyyaml = [
{file = "PyYAML-5.4.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:3b2b1824fe7112845700f815ff6a489360226a5609b96ec2190a45e62a9fc922"},
{file = "PyYAML-5.4.1-cp27-cp27m-win32.whl", hash = "sha256:129def1b7c1bf22faffd67b8f3724645203b79d8f4cc81f674654d9902cb4393"},
@ -2622,10 +2549,6 @@ tzdata = [
{file = "tzdata-2021.5-py2.py3-none-any.whl", hash = "sha256:3eee491e22ebfe1e5cfcc97a4137cd70f092ce59144d81f8924a844de05ba8f5"},
{file = "tzdata-2021.5.tar.gz", hash = "sha256:68dbe41afd01b867894bbdfd54fa03f468cfa4f0086bfb4adcd8de8f24f3ee21"},
]
tzlocal = [
{file = "tzlocal-4.1-py3-none-any.whl", hash = "sha256:28ba8d9fcb6c9a782d6e0078b4f6627af1ea26aeaa32b4eab5324abc7df4149f"},
{file = "tzlocal-4.1.tar.gz", hash = "sha256:0f28015ac68a5c067210400a9197fc5d36ba9bc3f8eaf1da3cbd59acdfed9e09"},
]
urllib3 = [
{file = "urllib3-1.26.8-py2.py3-none-any.whl", hash = "sha256:000ca7f471a233c2251c6c7023ee85305721bfdf18621ebff4fd17a8653427ed"},
{file = "urllib3-1.26.8.tar.gz", hash = "sha256:0e7c33d9a63e7ddfcb86780aac87befc2fbddf46c58dbb487e0855f7ceec283c"},

View File

@ -15,7 +15,6 @@ aniso8601 = "7.0.0"
appdirs = "1.4.4"
fastapi = "^0.75.1"
uvicorn = {extras = ["standard"], version = "^0.13.0"}
APScheduler = "^3.8.1"
SQLAlchemy = "^1.4.29"
alembic = "^1.7.5"
Jinja2 = "^2.11.2"

View File

@ -1,4 +1,3 @@
import requests
from pytest import fixture
from starlette.testclient import TestClient
@ -7,7 +6,7 @@ from tests import utils
@fixture(scope="session")
def admin_token(api_client: requests, api_routes: utils.AppRoutes):
def admin_token(api_client: TestClient, api_routes: utils.AppRoutes):
settings = get_app_settings()
form_data = {"username": "changeme@email.com", "password": settings.DEFAULT_PASSWORD}

View File

@ -1,6 +1,5 @@
import json
import requests
from pytest import fixture
from starlette.testclient import TestClient
@ -8,7 +7,7 @@ from tests import utils
from tests.utils.factories import random_string
def build_unique_user(group: str, api_client: requests) -> utils.TestUser:
def build_unique_user(group: str, api_client: TestClient) -> utils.TestUser:
api_routes = utils.AppRoutes()
group = group or random_string(12)
@ -103,7 +102,7 @@ def unique_user(api_client: TestClient, api_routes: utils.AppRoutes):
@fixture(scope="module")
def user_tuple(admin_token, api_client: requests, api_routes: utils.AppRoutes) -> tuple[utils.TestUser]:
def user_tuple(admin_token, api_client: TestClient, api_routes: utils.AppRoutes) -> tuple[utils.TestUser]:
group_name = utils.random_string()
# Create the user
create_data_1 = {
@ -157,7 +156,7 @@ def user_tuple(admin_token, api_client: requests, api_routes: utils.AppRoutes) -
@fixture(scope="session")
def user_token(admin_token, api_client: requests, api_routes: utils.AppRoutes):
def user_token(admin_token, api_client: TestClient, api_routes: utils.AppRoutes):
# Create the user
create_data = {
"fullName": utils.random_string(),