From 23c039b42d1b437b622d38da0864b6071f586457 Mon Sep 17 00:00:00 2001 From: Michael Genson <71845777+michael-genson@users.noreply.github.com> Date: Sat, 27 Aug 2022 13:52:45 -0500 Subject: [PATCH] refactor: event bus refactor (#1574) * refactored event dispatching added EventDocumentType and EventOperation to Event added event listeners to bulk recipe changes overhauled shopping list item events to be more useful modified shopping list item repo to return more information * added internal documentation for event types * renamed message_types.py to event_types.py * added unique event id and fixed instantiation * generalized event listeners and publishers moved apprise publisher to new apprise event listener fixed duplicate message bug with apprise publisher * added JWT field for user-specified integration id * removed obselete test notification route * tuned up existing notification tests * added dependency to get integration_id from jwt * added base crud controller to facilitate events * simplified event publishing * temporarily fixed test notification --- mealie/core/dependencies/dependencies.py | 16 ++ mealie/routes/_base/base_controllers.py | 22 +- mealie/routes/groups/controller_cookbooks.py | 70 +++--- .../groups/controller_group_notifications.py | 22 +- .../groups/controller_shopping_lists.py | 221 ++++++++++-------- .../organizers/controller_categories.py | 61 +++-- mealie/routes/organizers/controller_tags.py | 71 +++--- mealie/routes/recipe/recipe_crud_routes.py | 109 ++++----- mealie/routes/users/api_tokens.py | 11 +- mealie/schema/user/user.py | 2 + .../event_bus_service/event_bus_listeners.py | 80 +++++++ .../event_bus_service/event_bus_service.py | 82 ++----- .../services/event_bus_service/event_types.py | 148 ++++++++++++ .../event_bus_service/message_types.py | 47 ---- .../services/event_bus_service/publisher.py | 17 +- .../services/group_services/shopping_lists.py | 107 +++++++-- .../test_group_notifications.py | 37 ++- 17 files changed, 720 insertions(+), 403 deletions(-) create mode 100644 mealie/services/event_bus_service/event_bus_listeners.py create mode 100644 mealie/services/event_bus_service/event_types.py delete mode 100644 mealie/services/event_bus_service/message_types.py diff --git a/mealie/core/dependencies/dependencies.py b/mealie/core/dependencies/dependencies.py index ad9caac55c5e..2c277c1ce82e 100644 --- a/mealie/core/dependencies/dependencies.py +++ b/mealie/core/dependencies/dependencies.py @@ -14,6 +14,7 @@ from mealie.core.config import get_app_dirs, get_app_settings from mealie.db.db_setup import generate_session from mealie.repos.all_repositories import get_repositories from mealie.schema.user import PrivateUser, TokenData +from mealie.schema.user.user import DEFAULT_INTEGRATION_ID oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token") oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False) @@ -83,6 +84,21 @@ async def get_current_user(token: str = Depends(oauth2_scheme), session=Depends( return user +async def get_integration_id(token: str = Depends(oauth2_scheme)) -> str: + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + decoded_token = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) + return decoded_token.get("integration_id", DEFAULT_INTEGRATION_ID) + + except JWTError as e: + raise credentials_exception from e + + async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser: if not current_user.admin: raise HTTPException(status.HTTP_403_FORBIDDEN) diff --git a/mealie/routes/_base/base_controllers.py b/mealie/routes/_base/base_controllers.py index 71a8c1b4e8d0..6dd63c0cc2e5 100644 --- a/mealie/routes/_base/base_controllers.py +++ b/mealie/routes/_base/base_controllers.py @@ -6,7 +6,7 @@ from pydantic import UUID4 from sqlalchemy.orm import Session from mealie.core.config import get_app_dirs, get_app_settings -from mealie.core.dependencies.dependencies import get_admin_user, get_current_user +from mealie.core.dependencies.dependencies import get_admin_user, get_current_user, get_integration_id from mealie.core.exceptions import mealie_registered_exceptions from mealie.core.root_logger import get_logger from mealie.core.settings.directories import AppDirectories @@ -17,6 +17,8 @@ from mealie.lang.providers import Translator from mealie.repos.all_repositories import AllRepositories from mealie.routes._base.checks import OperationChecks from mealie.schema.user.user import GroupInDB, PrivateUser +from mealie.services.event_bus_service.event_bus_service import EventBusService +from mealie.services.event_bus_service.event_types import EventDocumentDataBase, EventTypes class _BaseController(ABC): @@ -78,6 +80,7 @@ class BaseUserController(_BaseController): """ user: PrivateUser = Depends(get_current_user) + integration_id: str = Depends(get_integration_id) translator: Translator = Depends(local_provider) # Manual Cache @@ -112,3 +115,20 @@ class BaseAdminController(BaseUserController): """ user: PrivateUser = Depends(get_admin_user) + + +class BaseCrudController(BaseUserController): + """ + Base class for all CRUD controllers to facilitate common CRUD functions. + """ + + event_bus: EventBusService = Depends(EventBusService) + + def publish_event(self, event_type: EventTypes, document_data: EventDocumentDataBase, message: str = "") -> None: + self.event_bus.dispatch( + integration_id=self.integration_id, + group_id=self.group_id, + event_type=event_type, + document_data=document_data, + message=message, + ) diff --git a/mealie/routes/groups/controller_cookbooks.py b/mealie/routes/groups/controller_cookbooks.py index bc3730b2a439..d2027cd57d64 100644 --- a/mealie/routes/groups/controller_cookbooks.py +++ b/mealie/routes/groups/controller_cookbooks.py @@ -4,24 +4,25 @@ from fastapi import APIRouter, Depends, HTTPException from pydantic import UUID4 from mealie.core.exceptions import mealie_registered_exceptions -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.routes._base.routers import MealieCrudRoute from mealie.schema import mapper from mealie.schema.cookbook import CreateCookBook, ReadCookBook, RecipeCookBook, SaveCookBook, UpdateCookBook from mealie.schema.cookbook.cookbook import CookBookPagination from mealie.schema.response.pagination import PaginationQuery -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import ( + EventCookbookBulkData, + EventCookbookData, + EventOperation, + EventTypes, +) router = APIRouter(prefix="/groups/cookbooks", tags=["Groups: Cookbooks"], route_class=MealieCrudRoute) @controller(router) -class GroupCookbookController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class GroupCookbookController(BaseCrudController): @cached_property def repo(self): return self.repos.cookbooks.by_group(self.group_id) @@ -53,16 +54,16 @@ class GroupCookbookController(BaseUserController): @router.post("", response_model=ReadCookBook, status_code=201) def create_one(self, data: CreateCookBook): data = mapper.cast(data, SaveCookBook, group_id=self.group_id) - val = self.mixins.create_one(data) + cookbook = self.mixins.create_one(data) - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.cookbook_created, - msg=self.t("notifications.generic-created", name=val.name), - event_source=EventSource(event_type="create", item_type="cookbook", item_id=val.id, slug=val.slug), + if cookbook: + self.publish_event( + event_type=EventTypes.cookbook_created, + document_data=EventCookbookData(operation=EventOperation.create, cookbook_id=cookbook.id), + message=self.t("notifications.generic-created", name=cookbook.name), ) - return val + + return cookbook @router.put("", response_model=list[ReadCookBook]) def update_many(self, data: list[UpdateCookBook]): @@ -72,6 +73,14 @@ class GroupCookbookController(BaseUserController): cb = self.mixins.update_one(cookbook, cookbook.id) updated.append(cb) + if updated: + self.publish_event( + event_type=EventTypes.cookbook_updated, + document_data=EventCookbookBulkData( + operation=EventOperation.update, cookbook_ids=[cb.id for cb in updated] + ), + ) + return updated @router.get("/{item_id}", response_model=RecipeCookBook) @@ -96,25 +105,24 @@ class GroupCookbookController(BaseUserController): @router.put("/{item_id}", response_model=ReadCookBook) def update_one(self, item_id: str, data: CreateCookBook): - val = self.mixins.update_one(data, item_id) # type: ignore - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.cookbook_updated, - msg=self.t("notifications.generic-updated", name=val.name), - event_source=EventSource(event_type="update", item_type="cookbook", item_id=val.id, slug=val.slug), + cookbook = self.mixins.update_one(data, item_id) # type: ignore + if cookbook: + self.publish_event( + event_type=EventTypes.cookbook_updated, + document_data=EventCookbookData(operation=EventOperation.update, cookbook_id=cookbook.id), + message=self.t("notifications.generic-updated", name=cookbook.name), ) - return val + return cookbook @router.delete("/{item_id}", response_model=ReadCookBook) def delete_one(self, item_id: str): - val = self.mixins.delete_one(item_id) - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.cookbook_deleted, - msg=self.t("notifications.generic-deleted", name=val.name), - event_source=EventSource(event_type="delete", item_type="cookbook", item_id=val.id, slug=val.slug), + cookbook = self.mixins.delete_one(item_id) + if cookbook: + self.publish_event( + event_type=EventTypes.cookbook_deleted, + document_data=EventCookbookData(operation=EventOperation.delete, cookbook_id=cookbook.id), + message=self.t("notifications.generic-deleted", name=cookbook.name), ) - return val + + return cookbook diff --git a/mealie/routes/groups/controller_group_notifications.py b/mealie/routes/groups/controller_group_notifications.py index 3e133b9f0508..89196b93f90d 100644 --- a/mealie/routes/groups/controller_group_notifications.py +++ b/mealie/routes/groups/controller_group_notifications.py @@ -17,7 +17,16 @@ from mealie.schema.group.group_events import ( ) from mealie.schema.mapper import cast from mealie.schema.response.pagination import PaginationQuery +from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener from mealie.services.event_bus_service.event_bus_service import EventBusService +from mealie.services.event_bus_service.event_types import ( + Event, + EventBusMessage, + EventDocumentDataBase, + EventDocumentType, + EventOperation, + EventTypes, +) router = APIRouter( prefix="/groups/events/notifications", tags=["Group: Event Notifications"], route_class=MealieCrudRoute @@ -78,7 +87,18 @@ class GroupEventsNotifierController(BaseUserController): # ======================================================================= # Test Event Notifications + # TODO: properly re-implement this with new event listeners @router.post("/{item_id}/test", status_code=204) def test_notification(self, item_id: UUID4): item: GroupEventNotifierPrivate = self.repo.get_one(item_id, override_schema=GroupEventNotifierPrivate) - self.event_bus.test_publisher(item.apprise_url) + + event_type = EventTypes.test_message + test_event = Event( + message=EventBusMessage.from_type(event_type, "test message"), + event_type=event_type, + integration_id="test_event", + document_data=EventDocumentDataBase(document_type=EventDocumentType.generic, operation=EventOperation.info), + ) + + test_listener = AppriseEventListener(self.event_bus.session, self.group_id) + test_listener.publish_to_subscribers(test_event, [item.apprise_url]) diff --git a/mealie/routes/groups/controller_shopping_lists.py b/mealie/routes/groups/controller_shopping_lists.py index 3961133c0d84..567fb06990ee 100644 --- a/mealie/routes/groups/controller_shopping_lists.py +++ b/mealie/routes/groups/controller_shopping_lists.py @@ -3,7 +3,7 @@ from functools import cached_property from fastapi import APIRouter, Depends, Query from pydantic import UUID4 -from mealie.routes._base.base_controllers import BaseUserController +from mealie.routes._base.base_controllers import BaseCrudController from mealie.routes._base.controller import controller from mealie.routes._base.mixins import HttpRepo from mealie.schema.group.group_shopping_list import ( @@ -20,18 +20,20 @@ from mealie.schema.group.group_shopping_list import ( from mealie.schema.mapper import cast from mealie.schema.response.pagination import PaginationQuery from mealie.schema.response.responses import SuccessResponse -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import ( + EventOperation, + EventShoppingListData, + EventShoppingListItemBulkData, + EventShoppingListItemData, + EventTypes, +) from mealie.services.group_services.shopping_lists import ShoppingListService item_router = APIRouter(prefix="/groups/shopping/items", tags=["Group: Shopping List Items"]) @controller(item_router) -class ShoppingListItemController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class ShoppingListItemController(BaseCrudController): @cached_property def service(self): return ShoppingListService(self.repos) @@ -79,19 +81,17 @@ class ShoppingListItemController(BaseUserController): shopping_list_item = self.mixins.create_one(data) if shopping_list_item: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemData( + operation=EventOperation.create, + shopping_list_id=shopping_list_item.shopping_list_id, + shopping_list_item_id=shopping_list_item.id, + ), + message=self.t( "notifications.generic-created", name=f"An item on shopping list {shopping_list_item.shopping_list_id}", ), - event_source=EventSource( - event_type="create", - item_type="shopping-list-item", - item_id=shopping_list_item.id, - shopping_list_id=shopping_list_item.shopping_list_id, - ), ) return shopping_list_item @@ -105,19 +105,17 @@ class ShoppingListItemController(BaseUserController): shopping_list_item = self.mixins.update_one(data, item_id) if shopping_list_item: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemData( + operation=EventOperation.update, + shopping_list_id=shopping_list_item.shopping_list_id, + shopping_list_item_id=shopping_list_item.id, + ), + message=self.t( "notifications.generic-updated", name=f"An item on shopping list {shopping_list_item.shopping_list_id}", ), - event_source=EventSource( - event_type="update", - item_type="shopping-list-item", - item_id=shopping_list_item.id, - shopping_list_id=shopping_list_item.shopping_list_id, - ), ) return shopping_list_item @@ -127,19 +125,17 @@ class ShoppingListItemController(BaseUserController): shopping_list_item = self.mixins.delete_one(item_id) # type: ignore if shopping_list_item: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemData( + operation=EventOperation.delete, + shopping_list_id=shopping_list_item.shopping_list_id, + shopping_list_item_id=shopping_list_item.id, + ), + message=self.t( "notifications.generic-deleted", name=f"An item on shopping list {shopping_list_item.shopping_list_id}", ), - event_source=EventSource( - event_type="delete", - item_type="shopping-list-item", - item_id=shopping_list_item.id, - shopping_list_id=shopping_list_item.shopping_list_id, - ), ) return shopping_list_item @@ -149,9 +145,7 @@ router = APIRouter(prefix="/groups/shopping/lists", tags=["Group: Shopping Lists @controller(router) -class ShoppingListController(BaseUserController): - event_bus: EventBusService = Depends(EventBusService) - +class ShoppingListController(BaseCrudController): @cached_property def service(self): return ShoppingListService(self.repos) @@ -180,21 +174,16 @@ class ShoppingListController(BaseUserController): @router.post("", response_model=ShoppingListOut, status_code=201) def create_one(self, data: ShoppingListCreate): save_data = cast(data, ShoppingListSave, group_id=self.user.group_id) - val = self.mixins.create_one(save_data) + shopping_list = self.mixins.create_one(save_data) - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_created, - msg=self.t("notifications.generic-created", name=val.name), - event_source=EventSource( - event_type="create", - item_type="shopping-list", - item_id=val.id, - ), + if shopping_list: + self.publish_event( + event_type=EventTypes.shopping_list_created, + document_data=EventShoppingListData(operation=EventOperation.create, shopping_list_id=shopping_list.id), + message=self.t("notifications.generic-created", name=shopping_list.name), ) - return val + return shopping_list @router.get("/{item_id}", response_model=ShoppingListOut) def get_one(self, item_id: UUID4): @@ -202,54 +191,72 @@ class ShoppingListController(BaseUserController): @router.put("/{item_id}", response_model=ShoppingListOut) def update_one(self, item_id: UUID4, data: ShoppingListUpdate): - data = self.mixins.update_one(data, item_id) # type: ignore - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t("notifications.generic-updated", name=data.name), - event_source=EventSource( - event_type="update", - item_type="shopping-list", - item_id=data.id, - ), + shopping_list = self.mixins.update_one(data, item_id) # type: ignore + + if shopping_list: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListData(operation=EventOperation.update, shopping_list_id=shopping_list.id), + message=self.t("notifications.generic-updated", name=shopping_list.name), ) - return data + + return shopping_list @router.delete("/{item_id}", response_model=ShoppingListOut) def delete_one(self, item_id: UUID4): - data = self.mixins.delete_one(item_id) # type: ignore - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource( - event_type="delete", - item_type="shopping-list", - item_id=data.id, - ), + shopping_list = self.mixins.delete_one(item_id) # type: ignore + if shopping_list: + self.publish_event( + event_type=EventTypes.shopping_list_deleted, + document_data=EventShoppingListData(operation=EventOperation.delete, shopping_list_id=shopping_list.id), + message=self.t("notifications.generic-deleted", name=shopping_list.name), ) - return data + + return shopping_list # ======================================================================= # Other Operations @router.post("/{item_id}/recipe/{recipe_id}", response_model=ShoppingListOut) def add_recipe_ingredients_to_list(self, item_id: UUID4, recipe_id: UUID4): - shopping_list = self.service.add_recipe_ingredients_to_list(item_id, recipe_id) - if shopping_list: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( - "notifications.generic-updated", - name=shopping_list.name, + ( + shopping_list, + new_shopping_list_items, + updated_shopping_list_items, + deleted_shopping_list_items, + ) = self.service.add_recipe_ingredients_to_list(item_id, recipe_id) + + if new_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.create, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[shopping_list_item.id for shopping_list_item in new_shopping_list_items], ), - event_source=EventSource( - event_type="bulk-updated-items", - item_type="shopping-list", - item_id=shopping_list.id, + ) + + if updated_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.update, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in updated_shopping_list_items + ], + ), + ) + + if deleted_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.delete, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in deleted_shopping_list_items + ], ), ) @@ -257,19 +264,33 @@ class ShoppingListController(BaseUserController): @router.delete("/{item_id}/recipe/{recipe_id}", response_model=ShoppingListOut) def remove_recipe_ingredients_from_list(self, item_id: UUID4, recipe_id: UUID4): - shopping_list = self.service.remove_recipe_ingredients_from_list(item_id, recipe_id) - if shopping_list: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( - "notifications.generic-updated", - name=shopping_list.name, + ( + shopping_list, + updated_shopping_list_items, + deleted_shopping_list_items, + ) = self.service.remove_recipe_ingredients_from_list(item_id, recipe_id) + + if updated_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.update, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in updated_shopping_list_items + ], ), - event_source=EventSource( - event_type="bulk-updated-items", - item_type="shopping-list", - item_id=shopping_list.id, + ) + + if deleted_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.delete, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in deleted_shopping_list_items + ], ), ) diff --git a/mealie/routes/organizers/controller_categories.py b/mealie/routes/organizers/controller_categories.py index 7db9b9f8c465..3e584225b940 100644 --- a/mealie/routes/organizers/controller_categories.py +++ b/mealie/routes/organizers/controller_categories.py @@ -3,7 +3,7 @@ from functools import cached_property from fastapi import APIRouter, Depends from pydantic import UUID4, BaseModel -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.schema import mapper from mealie.schema.recipe import CategoryIn, RecipeCategoryResponse @@ -11,8 +11,7 @@ from mealie.schema.recipe.recipe import RecipeCategory, RecipeCategoryPagination from mealie.schema.recipe.recipe_category import CategoryBase, CategorySave from mealie.schema.response.pagination import PaginationQuery from mealie.services import urls -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import EventCategoryData, EventOperation, EventTypes router = APIRouter(prefix="/categories", tags=["Organizer: Categories"]) @@ -27,10 +26,7 @@ class CategorySummary(BaseModel): @controller(router) -class RecipeCategoryController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class RecipeCategoryController(BaseCrudController): # ========================================================================= # CRUD Operations @cached_property @@ -56,19 +52,19 @@ class RecipeCategoryController(BaseUserController): def create_one(self, category: CategoryIn): """Creates a Category in the database""" save_data = mapper.cast(category, CategorySave, group_id=self.group_id) - data = self.mixins.create_one(save_data) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.category_created, - msg=self.t( + new_category = self.mixins.create_one(save_data) + if new_category: + self.publish_event( + event_type=EventTypes.category_created, + document_data=EventCategoryData(operation=EventOperation.create, category_id=new_category.id), + message=self.t( "notifications.generic-created-with-url", - name=data.name, - url=urls.category_url(data.slug, self.settings.BASE_URL), + name=new_category.name, + url=urls.category_url(new_category.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="create", item_type="category", item_id=data.id, slug=data.slug), ) - return data + + return new_category @router.get("/{item_id}", response_model=CategorySummary) def get_one(self, item_id: UUID4): @@ -81,20 +77,20 @@ class RecipeCategoryController(BaseUserController): def update_one(self, item_id: UUID4, update_data: CategoryIn): """Updates an existing Tag in the database""" save_data = mapper.cast(update_data, CategorySave, group_id=self.group_id) - data = self.mixins.update_one(save_data, item_id) + category = self.mixins.update_one(save_data, item_id) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.category_updated, - msg=self.t( + if category: + self.publish_event( + event_type=EventTypes.category_updated, + document_data=EventCategoryData(operation=EventOperation.update, category_id=category.id), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.category_url(data.slug, self.settings.BASE_URL), + name=category.name, + url=urls.category_url(category.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="category", item_id=data.id, slug=data.slug), ) - return data + + return category @router.delete("/{item_id}") def delete_one(self, item_id: UUID4): @@ -103,12 +99,11 @@ class RecipeCategoryController(BaseUserController): category does not impact a recipe. The category will be removed from any recipes that contain it """ - if data := self.mixins.delete_one(item_id): - self.event_bus.dispatch( - self.user.group_id, - EventTypes.category_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource(event_type="delete", item_type="category", item_id=data.id, slug=data.slug), + if category := self.mixins.delete_one(item_id): + self.publish_event( + event_type=EventTypes.category_deleted, + document_data=EventCategoryData(operation=EventOperation.delete, category_id=category.id), + message=self.t("notifications.generic-deleted", name=category.name), ) # ========================================================================= diff --git a/mealie/routes/organizers/controller_tags.py b/mealie/routes/organizers/controller_tags.py index fc1daec59aa9..5f993890dd2f 100644 --- a/mealie/routes/organizers/controller_tags.py +++ b/mealie/routes/organizers/controller_tags.py @@ -3,7 +3,7 @@ from functools import cached_property from fastapi import APIRouter, Depends, HTTPException, status from pydantic import UUID4 -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.schema import mapper from mealie.schema.recipe import RecipeTagResponse, TagIn @@ -11,17 +11,13 @@ from mealie.schema.recipe.recipe import RecipeTag, RecipeTagPagination from mealie.schema.recipe.recipe_category import TagSave from mealie.schema.response.pagination import PaginationQuery from mealie.services import urls -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import EventOperation, EventTagData, EventTypes router = APIRouter(prefix="/tags", tags=["Organizer: Tags"]) @controller(router) -class TagController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class TagController(BaseCrudController): @cached_property def repo(self): return self.repos.tags.by_group(self.group_id) @@ -55,55 +51,58 @@ class TagController(BaseUserController): def create_one(self, tag: TagIn): """Creates a Tag in the database""" save_data = mapper.cast(tag, TagSave, group_id=self.group_id) - data = self.repo.create(save_data) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.tag_created, - msg=self.t( + new_tag = self.repo.create(save_data) + + if new_tag: + self.publish_event( + event_type=EventTypes.tag_created, + document_data=EventTagData(operation=EventOperation.create, tag_id=new_tag.id), + message=self.t( "notifications.generic-created-with-url", - name=data.name, - url=urls.tag_url(data.slug, self.settings.BASE_URL), + name=new_tag.name, + url=urls.tag_url(new_tag.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="create", item_type="tag", item_id=data.id, slug=data.slug), ) - return data + + return new_tag @router.put("/{item_id}", response_model=RecipeTagResponse) def update_one(self, item_id: UUID4, new_tag: TagIn): """Updates an existing Tag in the database""" save_data = mapper.cast(new_tag, TagSave, group_id=self.group_id) - data = self.repo.update(item_id, save_data) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.tag_updated, - msg=self.t( + tag = self.repo.update(item_id, save_data) + + if tag: + self.publish_event( + event_type=EventTypes.tag_updated, + document_data=EventTagData(operation=EventOperation.update, tag_id=tag.id), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.tag_url(data.slug, self.settings.BASE_URL), + name=tag.name, + url=urls.tag_url(tag.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="tag", item_id=data.id, slug=data.slug), ) - return data + + return tag @router.delete("/{item_id}") def delete_recipe_tag(self, item_id: UUID4): - """Removes a recipe tag from the database. Deleting a + """ + Removes a recipe tag from the database. Deleting a tag does not impact a recipe. The tag will be removed - from any recipes that contain it""" + from any recipes that contain it + """ try: - data = self.repo.delete(item_id) + tag = self.repo.delete(item_id) except Exception as e: raise HTTPException(status.HTTP_400_BAD_REQUEST) from e - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.tag_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource(event_type="delete", item_type="tag", item_id=data.id, slug=data.slug), + if tag: + self.publish_event( + event_type=EventTypes.tag_deleted, + document_data=EventTagData(operation=EventOperation.delete, tag_id=tag.id), + message=self.t("notifications.generic-deleted", name=tag.name), ) @router.get("/slug/{tag_slug}", response_model=RecipeTagResponse) diff --git a/mealie/routes/recipe/recipe_crud_routes.py b/mealie/routes/recipe/recipe_crud_routes.py index 7a2f64778de2..8742466452f4 100644 --- a/mealie/routes/recipe/recipe_crud_routes.py +++ b/mealie/routes/recipe/recipe_crud_routes.py @@ -18,7 +18,7 @@ from mealie.core.dependencies.dependencies import temporary_dir, validate_recipe from mealie.core.security import create_recipe_slug_token from mealie.pkgs import cache from mealie.repos.repository_recipes import RepositoryRecipes -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.routes._base.routers import MealieCrudRoute, UserAPIRouter from mealie.schema.recipe import Recipe, RecipeImageTypes, ScrapeRecipe @@ -34,8 +34,12 @@ from mealie.schema.recipe.recipe_scraper import ScrapeRecipeTest from mealie.schema.recipe.request_helpers import RecipeZipTokenResponse, UpdateImageResponse from mealie.schema.response.responses import ErrorResponse from mealie.services import urls -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import ( + EventOperation, + EventRecipeBulkReportData, + EventRecipeData, + EventTypes, +) from mealie.services.recipe.recipe_data_service import InvalidDomainError, NotAnImageError, RecipeDataService from mealie.services.recipe.recipe_service import RecipeService from mealie.services.recipe.template_service import TemplateService @@ -45,7 +49,7 @@ from mealie.services.scraper.scraper import create_from_url from mealie.services.scraper.scraper_strategies import ForceTimeoutException, RecipeScraperPackage -class BaseRecipeController(BaseUserController): +class BaseRecipeController(BaseCrudController): @cached_property def repo(self) -> RepositoryRecipes: return self.repos.recipes.by_group(self.group_id) @@ -119,8 +123,6 @@ router = UserAPIRouter(prefix="/recipes", tags=["Recipe: CRUD"], route_class=Mea @controller(router) class RecipeController(BaseRecipeController): - event_bus: EventBusService = Depends(EventBusService) - def handle_exceptions(self, ex: Exception) -> None: match type(ex): case exceptions.PermissionDenied: @@ -161,17 +163,14 @@ class RecipeController(BaseRecipeController): new_recipe = self.service.create_one(recipe) if new_recipe: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_created, - msg=self.t( + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=new_recipe.slug), + message=self.t( "notifications.generic-created-with-url", name=new_recipe.name, url=urls.recipe_url(new_recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource( - event_type="create", item_type="recipe", item_id=new_recipe.id, slug=new_recipe.slug - ), ) return new_recipe.slug @@ -183,6 +182,11 @@ class RecipeController(BaseRecipeController): report_id = bulk_scraper.get_report_id() bg_tasks.add_task(bulk_scraper.scrape, bulk) + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeBulkReportData(operation=EventOperation.create, report_id=report_id), + ) + return {"reportId": report_id} @router.post("/test-scrape-url") @@ -202,6 +206,11 @@ class RecipeController(BaseRecipeController): def create_recipe_from_zip(self, temp_path=Depends(temporary_zip_path), archive: UploadFile = File(...)): """Create recipe from archive""" recipe = self.service.create_from_zip(archive, temp_path) + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=recipe.slug), + ) + return recipe.slug # ================================================================================================================== @@ -215,7 +224,7 @@ class RecipeController(BaseRecipeController): tags: Optional[list[UUID4 | str]] = Query(None), tools: Optional[list[UUID4 | str]] = Query(None), ): - response = self.repo.page_all( + pagination_response = self.repo.page_all( pagination=q, load_food=q.load_food, categories=categories, @@ -223,10 +232,10 @@ class RecipeController(BaseRecipeController): tools=tools, ) - response.set_pagination_guides(router.url_path_for("get_all"), q.dict()) + pagination_response.set_pagination_guides(router.url_path_for("get_all"), q.dict()) new_items = [] - for item in response.items: + for item in pagination_response.items: # Pydantic/FastAPI can't seem to serialize the ingredient field on thier own. new_item = item.__dict__ @@ -235,8 +244,8 @@ class RecipeController(BaseRecipeController): new_items.append(new_item) - response.items = [RecipeSummary.construct(**x) for x in new_items] - json_compatible_response = jsonable_encoder(response) + pagination_response.items = [RecipeSummary.construct(**x) for x in new_items] + json_compatible_response = jsonable_encoder(pagination_response) # Response is returned directly, to avoid validation and improve performance return JSONResponse(content=json_compatible_response) @@ -256,17 +265,14 @@ class RecipeController(BaseRecipeController): return None if new_recipe: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_created, - msg=self.t( + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=new_recipe.slug), + message=self.t( "notifications.generic-created-with-url", name=new_recipe.name, url=urls.recipe_url(new_recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource( - event_type="create", item_type="recipe", item_id=new_recipe.id, slug=new_recipe.slug - ), ) return new_recipe.slug @@ -275,63 +281,60 @@ class RecipeController(BaseRecipeController): def update_one(self, slug: str, data: Recipe): """Updates a recipe by existing slug and data.""" try: - data = self.service.update_one(slug, data) + recipe = self.service.update_one(slug, data) except Exception as e: self.handle_exceptions(e) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_updated, - msg=self.t( + if recipe: + self.publish_event( + event_type=EventTypes.recipe_updated, + document_data=EventRecipeData(operation=EventOperation.update, recipe_slug=recipe.slug), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.recipe_url(data.slug, self.settings.BASE_URL), + name=recipe.name, + url=urls.recipe_url(recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="recipe", item_id=data.id, slug=data.slug), ) - return data + return recipe @router.patch("/{slug}") def patch_one(self, slug: str, data: Recipe): """Updates a recipe by existing slug and data.""" try: - data = self.service.patch_one(slug, data) + recipe = self.service.patch_one(slug, data) except Exception as e: self.handle_exceptions(e) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_updated, - msg=self.t( + if recipe: + self.publish_event( + event_type=EventTypes.recipe_updated, + document_data=EventRecipeData(operation=EventOperation.update, recipe_slug=recipe.slug), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.recipe_url(data.slug, self.settings.BASE_URL), + name=recipe.name, + url=urls.recipe_url(recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="recipe", item_id=data.id, slug=data.slug), ) - return data + return recipe @router.delete("/{slug}") def delete_one(self, slug: str): """Deletes a recipe by slug""" try: - data = self.service.delete_one(slug) + recipe = self.service.delete_one(slug) except Exception as e: self.handle_exceptions(e) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource(event_type="delete", item_type="recipe", item_id=data.id, slug=data.slug), + if recipe: + self.publish_event( + event_type=EventTypes.recipe_deleted, + document_data=EventRecipeData(operation=EventOperation.delete, recipe_slug=recipe.slug), + message=self.t("notifications.generic-deleted", name=recipe.name), ) - return data + return recipe # ================================================================================================================== # Image and Assets diff --git a/mealie/routes/users/api_tokens.py b/mealie/routes/users/api_tokens.py index 90a7015cf194..8b8f31766052 100644 --- a/mealie/routes/users/api_tokens.py +++ b/mealie/routes/users/api_tokens.py @@ -15,17 +15,22 @@ class UserApiTokensController(BaseUserController): @router.post("/api-tokens", status_code=status.HTTP_201_CREATED, response_model=LongLiveTokenOut) def create_api_token( self, - token_name: LongLiveTokenIn, + token_params: LongLiveTokenIn, ): """Create api_token in the Database""" - token_data = {"long_token": True, "id": str(self.user.id)} + token_data = { + "long_token": True, + "id": str(self.user.id), + "name": token_params.name, + "integration_id": token_params.integration_id, + } five_years = timedelta(1825) token = create_access_token(token_data, five_years) token_model = CreateToken( - name=token_name.name, + name=token_params.name, token=token, user_id=self.user.id, ) diff --git a/mealie/schema/user/user.py b/mealie/schema/user/user.py index 2d32257592cc..b3faffee2c75 100644 --- a/mealie/schema/user/user.py +++ b/mealie/schema/user/user.py @@ -16,11 +16,13 @@ from mealie.schema.response.pagination import PaginationBase from ..recipe import CategoryBase +DEFAULT_INTEGRATION_ID = "generic" settings = get_app_settings() class LongLiveTokenIn(MealieModel): name: str + integration_id: str = DEFAULT_INTEGRATION_ID class LongLiveTokenOut(MealieModel): diff --git a/mealie/services/event_bus_service/event_bus_listeners.py b/mealie/services/event_bus_service/event_bus_listeners.py new file mode 100644 index 000000000000..5cf0bed987c0 --- /dev/null +++ b/mealie/services/event_bus_service/event_bus_listeners.py @@ -0,0 +1,80 @@ +import json +from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit + +from fastapi.encoders import jsonable_encoder +from pydantic import UUID4 +from sqlalchemy.orm.session import Session + +from mealie.repos.repository_factory import AllRepositories +from mealie.schema.group.group_events import GroupEventNotifierPrivate + +from .event_types import Event +from .publisher import ApprisePublisher, PublisherLike + + +class EventListenerBase: + def __init__(self, session: Session, group_id: UUID4, publisher: PublisherLike) -> None: + self.session = session + self.group_id = group_id + self.publisher = publisher + + def get_subscribers(self, event: Event) -> list: + """Get a list of all subscribers to this event""" + ... + + def publish_to_subscribers(self, event: Event, subscribers: list) -> None: + """Publishes the event to all subscribers""" + ... + + +class AppriseEventListener(EventListenerBase): + def __init__(self, session: Session, group_id: UUID4) -> None: + super().__init__(session, group_id, ApprisePublisher()) + + def get_subscribers(self, event: Event) -> list[str]: + repos = AllRepositories(self.session) + + notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.by_group( # type: ignore + self.group_id + ).multi_query({"enabled": True}, override_schema=GroupEventNotifierPrivate) + + urls = [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event.event_type.name)] + urls = AppriseEventListener.update_urls_with_event_data(urls, event) + + return urls + + def publish_to_subscribers(self, event: Event, subscribers: list[str]) -> None: + self.publisher.publish(event, subscribers) + + @staticmethod + def update_urls_with_event_data(urls: list[str], event: Event): + params = { + "event_type": event.event_type.name, + "integration_id": event.integration_id, + "document_data": json.dumps(jsonable_encoder(event.document_data)), + "event_id": str(event.event_id), + "timestamp": event.timestamp.isoformat(), + } + + return [ + # We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":". + AppriseEventListener.merge_query_parameters(url, {f":{k}": v for k, v in params.items()}) + # only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints + if AppriseEventListener.is_custom_url(url) else url + for url in urls + ] + + @staticmethod + def merge_query_parameters(url: str, params: dict): + scheme, netloc, path, query_string, fragment = urlsplit(url) + + # merge query params + query_params = parse_qs(query_string) + query_params.update(params) + new_query_string = urlencode(query_params, doseq=True) + + return urlunsplit((scheme, netloc, path, new_query_string, fragment)) + + @staticmethod + def is_custom_url(url: str): + return url.split(":", 1)[0].lower() in ["form", "forms", "json", "jsons", "xml", "xmls"] diff --git a/mealie/services/event_bus_service/event_bus_service.py b/mealie/services/event_bus_service/event_bus_service.py index 8015c73210d5..83211e864b86 100644 --- a/mealie/services/event_bus_service/event_bus_service.py +++ b/mealie/services/event_bus_service/event_bus_service.py @@ -1,14 +1,16 @@ -from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit +from typing import Optional from fastapi import BackgroundTasks, Depends from pydantic import UUID4 +from mealie.core.config import get_app_settings from mealie.db.db_setup import generate_session -from mealie.repos.repository_factory import AllRepositories -from mealie.schema.group.group_events import GroupEventNotifierPrivate +from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener, EventListenerBase -from .message_types import EventBusMessage, EventTypes -from .publisher import ApprisePublisher, PublisherLike +from .event_types import Event, EventBusMessage, EventDocumentDataBase, EventTypes + +settings = get_app_settings() +ALGORITHM = "HS256" class EventSource: @@ -35,66 +37,32 @@ class EventSource: class EventBusService: def __init__(self, bg: BackgroundTasks, session=Depends(generate_session)) -> None: self.bg = bg - self._publisher = ApprisePublisher self.session = session self.group_id: UUID4 | None = None - @property - def publisher(self) -> PublisherLike: - return self._publisher() - - def get_urls(self, event_type: EventTypes) -> list[str]: - repos = AllRepositories(self.session) - - notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.by_group( # type: ignore - self.group_id - ).multi_query({"enabled": True}, override_schema=GroupEventNotifierPrivate) - - return [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event_type.name)] + self.listeners: list[EventListenerBase] = [AppriseEventListener(self.session, self.group_id)] def dispatch( - self, group_id: UUID4, event_type: EventTypes, msg: str = "", event_source: EventSource = None + self, + integration_id: str, + group_id: UUID4, + event_type: EventTypes, + document_data: Optional[EventDocumentDataBase], + message: str = "", ) -> None: self.group_id = group_id - def _dispatch(event_source: EventSource = None): - if urls := self.get_urls(event_type): - if event_source: - urls = EventBusService.update_urls_with_event_source(urls, event_source) - - self.publisher.publish(EventBusMessage.from_type(event_type, body=msg), urls) - - if dispatch_task := _dispatch(event_source=event_source): - self.bg.add_task(dispatch_task) - - def test_publisher(self, url: str) -> None: - self.bg.add_task( - self.publisher.publish, - event=EventBusMessage.from_type(EventTypes.test_message, body="This is a test event."), - notification_urls=[url], + event = Event( + message=EventBusMessage.from_type(event_type, body=message), + event_type=event_type, + integration_id=integration_id, + document_data=document_data, ) - @staticmethod - def update_urls_with_event_source(urls: list[str], event_source: EventSource): - return [ - # We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":". - EventBusService.merge_query_parameters(url, {f":{k}": v for k, v in event_source.dict().items()}) - # only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints - if EventBusService.is_custom_url(url) else url - for url in urls - ] + self.bg.add_task(self.publish_event, event=event) - @staticmethod - def merge_query_parameters(url: str, params: dict): - scheme, netloc, path, query_string, fragment = urlsplit(url) - - # merge query params - query_params = parse_qs(query_string) - query_params.update(params) - new_query_string = urlencode(query_params, doseq=True) - - return urlunsplit((scheme, netloc, path, new_query_string, fragment)) - - @staticmethod - def is_custom_url(url: str): - return url.split(":", 1)[0].lower() in ["form", "forms", "json", "jsons", "xml", "xmls"] + def publish_event(self, event: Event) -> None: + """Publishes the event to all listeners""" + for listener in self.listeners: + if subscribers := listener.get_subscribers(event): + listener.publish_to_subscribers(event, subscribers) diff --git a/mealie/services/event_bus_service/event_types.py b/mealie/services/event_bus_service/event_types.py new file mode 100644 index 000000000000..e770424614e2 --- /dev/null +++ b/mealie/services/event_bus_service/event_types.py @@ -0,0 +1,148 @@ +import uuid +from datetime import datetime +from enum import Enum, auto + +from pydantic import UUID4 + +from ...schema._mealie.mealie_model import MealieModel + + +class EventTypes(Enum): + """ + The event type defines whether or not a subscriber should receive an event. + + Each event type is represented by a field on the subscriber repository, therefore any changes + made here must also be reflected in the database (and likely requires a database migration). + + If you'd like more granular control over the metadata of the event, e.g. events for sub-records + (like shopping list items), modify the event document type instead (which is not tied to a database entry). + """ + + test_message = auto() + + recipe_created = auto() + recipe_updated = auto() + recipe_deleted = auto() + + user_signup = auto() + + data_migrations = auto() + data_export = auto() + data_import = auto() + + mealplan_entry_created = auto() + + shopping_list_created = auto() + shopping_list_updated = auto() + shopping_list_deleted = auto() + + cookbook_created = auto() + cookbook_updated = auto() + cookbook_deleted = auto() + + tag_created = auto() + tag_updated = auto() + tag_deleted = auto() + + category_created = auto() + category_updated = auto() + category_deleted = auto() + + +class EventDocumentType(Enum): + generic = "generic" + + category = "category" + cookbook = "cookbook" + shopping_list = "shopping_list" + shopping_list_item = "shopping_list_item" + recipe = "recipe" + recipe_bulk_report = "recipe_bulk_report" + tag = "tag" + + +class EventOperation(Enum): + info = "info" + + create = "create" + update = "update" + delete = "delete" + + +class EventDocumentDataBase(MealieModel): + document_type: EventDocumentType + operation: EventOperation + ... + + +class EventCategoryData(EventDocumentDataBase): + document_type = EventDocumentType.category + category_id: UUID4 + + +class EventCookbookData(EventDocumentDataBase): + document_type = EventDocumentType.cookbook + cookbook_id: UUID4 + + +class EventCookbookBulkData(EventDocumentDataBase): + document_type = EventDocumentType.cookbook + cookbook_ids: list[UUID4] + + +class EventShoppingListData(EventDocumentDataBase): + document_type = EventDocumentType.shopping_list + shopping_list_id: UUID4 + + +class EventShoppingListItemData(EventDocumentDataBase): + document_type = EventDocumentType.shopping_list_item + shopping_list_id: UUID4 + shopping_list_item_id: UUID4 + + +class EventShoppingListItemBulkData(EventDocumentDataBase): + document_type = EventDocumentType.shopping_list_item + shopping_list_id: UUID4 + shopping_list_item_ids: list[UUID4] + + +class EventRecipeData(EventDocumentDataBase): + document_type = EventDocumentType.recipe + recipe_slug: str + + +class EventRecipeBulkReportData(EventDocumentDataBase): + document_type = EventDocumentType.recipe_bulk_report + report_id: UUID4 + + +class EventTagData(EventDocumentDataBase): + document_type = EventDocumentType.tag + tag_id: UUID4 + + +class EventBusMessage(MealieModel): + title: str + body: str = "" + + @classmethod + def from_type(cls, event_type: EventTypes, body: str = "") -> "EventBusMessage": + title = event_type.name.replace("_", " ").title() + return cls(title=title, body=body) + + +class Event(MealieModel): + message: EventBusMessage + event_type: EventTypes + integration_id: str + document_data: EventDocumentDataBase + + # set at instantiation + event_id: UUID4 | None + timestamp: datetime | None + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.event_id = uuid.uuid4() + self.timestamp = datetime.now() diff --git a/mealie/services/event_bus_service/message_types.py b/mealie/services/event_bus_service/message_types.py deleted file mode 100644 index ec012f3b06bf..000000000000 --- a/mealie/services/event_bus_service/message_types.py +++ /dev/null @@ -1,47 +0,0 @@ -from enum import Enum, auto - - -class EventTypes(Enum): - test_message = auto() - - recipe_created = auto() - recipe_updated = auto() - recipe_deleted = auto() - - user_signup = auto() - - data_migrations = auto() - data_export = auto() - data_import = auto() - - mealplan_entry_created = auto() - - shopping_list_created = auto() - shopping_list_updated = auto() - shopping_list_deleted = auto() - - cookbook_created = auto() - cookbook_updated = auto() - cookbook_deleted = auto() - - tag_created = auto() - tag_updated = auto() - tag_deleted = auto() - - category_created = auto() - category_updated = auto() - category_deleted = auto() - - -class EventBusMessage: - title: str - body: str = "" - - def __init__(self, title, body) -> None: - self.title = title - self.body = body - - @classmethod - def from_type(cls, event_type: EventTypes, body: str = "") -> "EventBusMessage": - title = event_type.name.replace("_", " ").title() - return cls(title=title, body=body) diff --git a/mealie/services/event_bus_service/publisher.py b/mealie/services/event_bus_service/publisher.py index dec7806b3fbe..73bb6a9d5314 100644 --- a/mealie/services/event_bus_service/publisher.py +++ b/mealie/services/event_bus_service/publisher.py @@ -2,11 +2,11 @@ from typing import Protocol import apprise -from mealie.services.event_bus_service.event_bus_service import EventBusMessage +from mealie.services.event_bus_service.event_types import Event class PublisherLike(Protocol): - def publish(self, event: EventBusMessage, notification_urls: list[str]): + def publish(self, event: Event, notification_urls: list[str]): ... @@ -19,11 +19,18 @@ class ApprisePublisher: self.apprise = apprise.Apprise(asset=asset) self.hard_fail = hard_fail - def publish(self, event: EventBusMessage, notification_urls: list[str]): + def publish(self, event: Event, notification_urls: list[str]): + """Publishses a list of notification URLs""" + + tags = [] for dest in notification_urls: - status = self.apprise.add(dest) + # we tag the url so it only sends each notification once + tag = str(event.event_id) + tags.append(tag) + + status = self.apprise.add(dest, tag=tag) if not status and self.hard_fail: raise Exception("Apprise URL Add Failed") - self.apprise.notify(title=event.title, body=event.body) + self.apprise.notify(title=event.message.title, body=event.message.body, tag=tags) diff --git a/mealie/services/group_services/shopping_lists.py b/mealie/services/group_services/shopping_lists.py index 2812d39ba783..f06b0fe5085d 100644 --- a/mealie/services/group_services/shopping_lists.py +++ b/mealie/services/group_services/shopping_lists.py @@ -70,7 +70,7 @@ class ShoppingListService: # Set References new_refs = [] for ref in inner_item.recipe_references: - ref.shopping_list_item_id = base_item.id + ref.shopping_list_item_id = base_item.id # type: ignore new_refs.append(ref) base_item.recipe_references.extend(new_refs) @@ -80,46 +80,64 @@ class ShoppingListService: return consolidated_list - def consolidate_and_save(self, data: list[ShoppingListItemUpdate]): + def consolidate_and_save( + self, data: list[ShoppingListItemUpdate] + ) -> tuple[list[ShoppingListItemOut], list[ShoppingListItemOut]]: + """ + returns: + - updated_shopping_list_items + - deleted_shopping_list_items + """ # TODO: Convert to update many with single call all_updates = [] + all_deletes = [] keep_ids = [] - for item in self.consolidate_list_items(data): + for item in self.consolidate_list_items(data): # type: ignore updated_data = self.list_items.update(item.id, item) all_updates.append(updated_data) keep_ids.append(updated_data.id) - for item in data: + for item in data: # type: ignore if item.id not in keep_ids: self.list_items.delete(item.id) + all_deletes.append(item) - return all_updates + return all_updates, all_deletes # ======================================================================= # Methods - def add_recipe_ingredients_to_list(self, list_id: UUID4, recipe_id: UUID4) -> ShoppingListOut: + def add_recipe_ingredients_to_list( + self, list_id: UUID4, recipe_id: UUID4 + ) -> tuple[ShoppingListOut, list[ShoppingListItemOut], list[ShoppingListItemOut], list[ShoppingListItemOut]]: + """ + returns: + - updated_shopping_list + - new_shopping_list_items + - updated_shopping_list_items + - deleted_shopping_list_items + """ recipe = self.repos.recipes.get_one(recipe_id, "id") to_create = [] for ingredient in recipe.recipe_ingredient: food_id = None try: - food_id = ingredient.food.id + food_id = ingredient.food.id # type: ignore except AttributeError: pass label_id = None try: - label_id = ingredient.food.label.id + label_id = ingredient.food.label.id # type: ignore except AttributeError: pass unit_id = None try: - unit_id = ingredient.unit.id + unit_id = ingredient.unit.id # type: ignore except AttributeError: pass @@ -142,28 +160,67 @@ class ShoppingListService: ) ) - for item in to_create: - self.repos.group_shopping_list_item.create(item) + new_shopping_list_items = [self.repos.group_shopping_list_item.create(item) for item in to_create] - updated_list = self.shopping_lists.get_one(list_id) - updated_list.list_items = self.consolidate_and_save(updated_list.list_items) + updated_shopping_list = self.shopping_lists.get_one(list_id) + updated_shopping_list_items, deleted_shopping_list_items = self.consolidate_and_save(updated_shopping_list.list_items) # type: ignore + updated_shopping_list.list_items = updated_shopping_list_items not_found = True - for refs in updated_list.recipe_references: + for refs in updated_shopping_list.recipe_references: if refs.recipe_id == recipe_id: refs.recipe_quantity += 1 not_found = False if not_found: - updated_list.recipe_references.append(ShoppingListItemRecipeRef(recipe_id=recipe_id, recipe_quantity=1)) + updated_shopping_list.recipe_references.append( + ShoppingListItemRecipeRef(recipe_id=recipe_id, recipe_quantity=1) # type: ignore + ) - updated_list = self.shopping_lists.update(updated_list.id, updated_list) + updated_shopping_list = self.shopping_lists.update(updated_shopping_list.id, updated_shopping_list) - return updated_list + """ + There can be overlap between the list item collections, so we de-duplicate the lists. - def remove_recipe_ingredients_from_list(self, list_id: UUID4, recipe_id: UUID4) -> ShoppingListOut: + First new items are created, then existing items are updated, and finally some items are deleted, + so we can de-duplicate using this logic + """ + new_items_map = {list_item.id: list_item for list_item in new_shopping_list_items} + updated_items_map = {list_item.id: list_item for list_item in updated_shopping_list_items} + deleted_items_map = {list_item.id: list_item for list_item in deleted_shopping_list_items} + + # if the item was created and then updated, replace the create with the update and remove the update + for id in list(updated_items_map.keys()): + if id in new_items_map: + new_items_map[id] = updated_items_map[id] + del updated_items_map[id] + + # if the item was updated and then deleted, remove the update + updated_shopping_list_items = [ + list_item for id, list_item in updated_items_map.items() if id not in deleted_items_map + ] + + # if the item was created and then deleted, remove it from both lists + new_shopping_list_items = [list_item for id, list_item in new_items_map.items() if id not in deleted_items_map] + deleted_shopping_list_items = [ + list_item for id, list_item in deleted_items_map.items() if id not in new_items_map + ] + + return updated_shopping_list, new_shopping_list_items, updated_shopping_list_items, deleted_shopping_list_items + + def remove_recipe_ingredients_from_list( + self, list_id: UUID4, recipe_id: UUID4 + ) -> tuple[ShoppingListOut, list[ShoppingListItemOut], list[ShoppingListItemOut]]: + """ + returns: + - updated_shopping_list + - updated_shopping_list_items + - deleted_shopping_list_items + """ shopping_list = self.shopping_lists.get_one(list_id) + updated_shopping_list_items = [] + deleted_shopping_list_items = [] for item in shopping_list.list_items: found = False @@ -171,7 +228,7 @@ class ShoppingListService: remove_qty = 0.0 if ref.recipe_id == recipe_id: - self.list_item_refs.delete(ref.id) + self.list_item_refs.delete(ref.id) # type: ignore item.recipe_references.remove(ref) found = True remove_qty = ref.recipe_quantity @@ -183,20 +240,22 @@ class ShoppingListService: if item.quantity <= 0: self.list_items.delete(item.id) + deleted_shopping_list_items.append(item) else: self.list_items.update(item.id, item) + updated_shopping_list_items.append(item) - # Decrament the list recipe reference count - for ref in shopping_list.recipe_references: + # Decrement the list recipe reference count + for ref in shopping_list.recipe_references: # type: ignore if ref.recipe_id == recipe_id: ref.recipe_quantity -= 1 if ref.recipe_quantity <= 0: - self.list_refs.delete(ref.id) + self.list_refs.delete(ref.id) # type: ignore else: - self.list_refs.update(ref.id, ref) + self.list_refs.update(ref.id, ref) # type: ignore break # Save Changes - return self.shopping_lists.get_one(shopping_list.id) + return self.shopping_lists.get_one(shopping_list.id), updated_shopping_list_items, deleted_shopping_list_items diff --git a/tests/integration_tests/user_group_tests/test_group_notifications.py b/tests/integration_tests/user_group_tests/test_group_notifications.py index 1b0f9726f4b9..bdd97e9a400b 100644 --- a/tests/integration_tests/user_group_tests/test_group_notifications.py +++ b/tests/integration_tests/user_group_tests/test_group_notifications.py @@ -1,7 +1,15 @@ from fastapi.testclient import TestClient from mealie.schema.group.group_events import GroupEventNotifierCreate, GroupEventNotifierOptions -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource +from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener +from mealie.services.event_bus_service.event_bus_service import Event +from mealie.services.event_bus_service.event_types import ( + EventBusMessage, + EventDocumentDataBase, + EventDocumentType, + EventOperation, + EventTypes, +) from tests.utils.assertion_helpers import assert_ignore_keys from tests.utils.factories import random_bool, random_email, random_int, random_string from tests.utils.fixture_schemas import TestUser @@ -47,8 +55,13 @@ def notifier_generator(): ).dict(by_alias=True) -def event_source_generator(): - return EventSource(event_type=random_string, item_type=random_string(), item_id=random_int()) +def event_generator(): + return Event( + message=EventBusMessage(title=random_string(), body=random_string()), + event_type=EventTypes.test_message, + integration_id=random_string(), + document_data=EventDocumentDataBase(document_type=EventDocumentType.generic, operation=EventOperation.info), + ) def test_create_notification(api_client: TestClient, unique_user: TestUser): @@ -61,7 +74,7 @@ def test_create_notification(api_client: TestClient, unique_user: TestUser): assert payload_as_dict["name"] == payload["name"] assert payload_as_dict["enabled"] is True - # Ensure Apprise URL Staysa Private + # Ensure Apprise URL Stays Private assert "apprise_url" not in payload_as_dict # Cleanup @@ -79,7 +92,7 @@ def test_ensure_apprise_url_is_secret(api_client: TestClient, unique_user: TestU assert "apprise_url" not in payload_as_dict -def test_update_notification(api_client: TestClient, unique_user: TestUser): +def test_update_apprise_notification(api_client: TestClient, unique_user: TestUser): payload = notifier_generator() response = api_client.post(Routes.base, json=payload, headers=unique_user.token) assert response.status_code == 201 @@ -110,7 +123,7 @@ def test_update_notification(api_client: TestClient, unique_user: TestUser): response = api_client.delete(Routes.item(update_payload["id"]), headers=unique_user.token) -def test_delete_notification(api_client: TestClient, unique_user: TestUser): +def test_delete_apprise_notification(api_client: TestClient, unique_user: TestUser): payload = notifier_generator() response = api_client.post(Routes.base, json=payload, headers=unique_user.token) assert response.status_code == 201 @@ -124,8 +137,8 @@ def test_delete_notification(api_client: TestClient, unique_user: TestUser): assert response.status_code == 404 -def test_event_bus_functions(): - test_event_source = event_source_generator() +def test_apprise_event_bus_listener_functions(): + test_event = event_generator() test_standard_urls = [ "a" + random_string(), @@ -143,15 +156,15 @@ def test_event_bus_functions(): ] # Validate all standard urls are not considered custom - responses = [EventBusService.is_custom_url(url) for url in test_standard_urls] + responses = [AppriseEventListener.is_custom_url(url) for url in test_standard_urls] assert not any(responses) # Validate all custom urls are actually considered custom - responses = [EventBusService.is_custom_url(url) for url in test_custom_urls] + responses = [AppriseEventListener.is_custom_url(url) for url in test_custom_urls] assert all(responses) - updated_standard_urls = EventBusService.update_urls_with_event_source(test_standard_urls, test_event_source) - updated_custom_urls = EventBusService.update_urls_with_event_source(test_custom_urls, test_event_source) + updated_standard_urls = AppriseEventListener.update_urls_with_event_data(test_standard_urls, test_event) + updated_custom_urls = AppriseEventListener.update_urls_with_event_data(test_custom_urls, test_event) # Validate that no URLs are lost when updating them assert len(updated_standard_urls) == len(test_standard_urls)