diff --git a/mealie/core/dependencies/dependencies.py b/mealie/core/dependencies/dependencies.py index ad9caac55c5e..2c277c1ce82e 100644 --- a/mealie/core/dependencies/dependencies.py +++ b/mealie/core/dependencies/dependencies.py @@ -14,6 +14,7 @@ from mealie.core.config import get_app_dirs, get_app_settings from mealie.db.db_setup import generate_session from mealie.repos.all_repositories import get_repositories from mealie.schema.user import PrivateUser, TokenData +from mealie.schema.user.user import DEFAULT_INTEGRATION_ID oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token") oauth2_scheme_soft_fail = OAuth2PasswordBearer(tokenUrl="/api/auth/token", auto_error=False) @@ -83,6 +84,21 @@ async def get_current_user(token: str = Depends(oauth2_scheme), session=Depends( return user +async def get_integration_id(token: str = Depends(oauth2_scheme)) -> str: + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + decoded_token = jwt.decode(token, settings.SECRET, algorithms=[ALGORITHM]) + return decoded_token.get("integration_id", DEFAULT_INTEGRATION_ID) + + except JWTError as e: + raise credentials_exception from e + + async def get_admin_user(current_user: PrivateUser = Depends(get_current_user)) -> PrivateUser: if not current_user.admin: raise HTTPException(status.HTTP_403_FORBIDDEN) diff --git a/mealie/routes/_base/base_controllers.py b/mealie/routes/_base/base_controllers.py index 71a8c1b4e8d0..6dd63c0cc2e5 100644 --- a/mealie/routes/_base/base_controllers.py +++ b/mealie/routes/_base/base_controllers.py @@ -6,7 +6,7 @@ from pydantic import UUID4 from sqlalchemy.orm import Session from mealie.core.config import get_app_dirs, get_app_settings -from mealie.core.dependencies.dependencies import get_admin_user, get_current_user +from mealie.core.dependencies.dependencies import get_admin_user, get_current_user, get_integration_id from mealie.core.exceptions import mealie_registered_exceptions from mealie.core.root_logger import get_logger from mealie.core.settings.directories import AppDirectories @@ -17,6 +17,8 @@ from mealie.lang.providers import Translator from mealie.repos.all_repositories import AllRepositories from mealie.routes._base.checks import OperationChecks from mealie.schema.user.user import GroupInDB, PrivateUser +from mealie.services.event_bus_service.event_bus_service import EventBusService +from mealie.services.event_bus_service.event_types import EventDocumentDataBase, EventTypes class _BaseController(ABC): @@ -78,6 +80,7 @@ class BaseUserController(_BaseController): """ user: PrivateUser = Depends(get_current_user) + integration_id: str = Depends(get_integration_id) translator: Translator = Depends(local_provider) # Manual Cache @@ -112,3 +115,20 @@ class BaseAdminController(BaseUserController): """ user: PrivateUser = Depends(get_admin_user) + + +class BaseCrudController(BaseUserController): + """ + Base class for all CRUD controllers to facilitate common CRUD functions. + """ + + event_bus: EventBusService = Depends(EventBusService) + + def publish_event(self, event_type: EventTypes, document_data: EventDocumentDataBase, message: str = "") -> None: + self.event_bus.dispatch( + integration_id=self.integration_id, + group_id=self.group_id, + event_type=event_type, + document_data=document_data, + message=message, + ) diff --git a/mealie/routes/groups/controller_cookbooks.py b/mealie/routes/groups/controller_cookbooks.py index bc3730b2a439..d2027cd57d64 100644 --- a/mealie/routes/groups/controller_cookbooks.py +++ b/mealie/routes/groups/controller_cookbooks.py @@ -4,24 +4,25 @@ from fastapi import APIRouter, Depends, HTTPException from pydantic import UUID4 from mealie.core.exceptions import mealie_registered_exceptions -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.routes._base.routers import MealieCrudRoute from mealie.schema import mapper from mealie.schema.cookbook import CreateCookBook, ReadCookBook, RecipeCookBook, SaveCookBook, UpdateCookBook from mealie.schema.cookbook.cookbook import CookBookPagination from mealie.schema.response.pagination import PaginationQuery -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import ( + EventCookbookBulkData, + EventCookbookData, + EventOperation, + EventTypes, +) router = APIRouter(prefix="/groups/cookbooks", tags=["Groups: Cookbooks"], route_class=MealieCrudRoute) @controller(router) -class GroupCookbookController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class GroupCookbookController(BaseCrudController): @cached_property def repo(self): return self.repos.cookbooks.by_group(self.group_id) @@ -53,16 +54,16 @@ class GroupCookbookController(BaseUserController): @router.post("", response_model=ReadCookBook, status_code=201) def create_one(self, data: CreateCookBook): data = mapper.cast(data, SaveCookBook, group_id=self.group_id) - val = self.mixins.create_one(data) + cookbook = self.mixins.create_one(data) - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.cookbook_created, - msg=self.t("notifications.generic-created", name=val.name), - event_source=EventSource(event_type="create", item_type="cookbook", item_id=val.id, slug=val.slug), + if cookbook: + self.publish_event( + event_type=EventTypes.cookbook_created, + document_data=EventCookbookData(operation=EventOperation.create, cookbook_id=cookbook.id), + message=self.t("notifications.generic-created", name=cookbook.name), ) - return val + + return cookbook @router.put("", response_model=list[ReadCookBook]) def update_many(self, data: list[UpdateCookBook]): @@ -72,6 +73,14 @@ class GroupCookbookController(BaseUserController): cb = self.mixins.update_one(cookbook, cookbook.id) updated.append(cb) + if updated: + self.publish_event( + event_type=EventTypes.cookbook_updated, + document_data=EventCookbookBulkData( + operation=EventOperation.update, cookbook_ids=[cb.id for cb in updated] + ), + ) + return updated @router.get("/{item_id}", response_model=RecipeCookBook) @@ -96,25 +105,24 @@ class GroupCookbookController(BaseUserController): @router.put("/{item_id}", response_model=ReadCookBook) def update_one(self, item_id: str, data: CreateCookBook): - val = self.mixins.update_one(data, item_id) # type: ignore - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.cookbook_updated, - msg=self.t("notifications.generic-updated", name=val.name), - event_source=EventSource(event_type="update", item_type="cookbook", item_id=val.id, slug=val.slug), + cookbook = self.mixins.update_one(data, item_id) # type: ignore + if cookbook: + self.publish_event( + event_type=EventTypes.cookbook_updated, + document_data=EventCookbookData(operation=EventOperation.update, cookbook_id=cookbook.id), + message=self.t("notifications.generic-updated", name=cookbook.name), ) - return val + return cookbook @router.delete("/{item_id}", response_model=ReadCookBook) def delete_one(self, item_id: str): - val = self.mixins.delete_one(item_id) - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.cookbook_deleted, - msg=self.t("notifications.generic-deleted", name=val.name), - event_source=EventSource(event_type="delete", item_type="cookbook", item_id=val.id, slug=val.slug), + cookbook = self.mixins.delete_one(item_id) + if cookbook: + self.publish_event( + event_type=EventTypes.cookbook_deleted, + document_data=EventCookbookData(operation=EventOperation.delete, cookbook_id=cookbook.id), + message=self.t("notifications.generic-deleted", name=cookbook.name), ) - return val + + return cookbook diff --git a/mealie/routes/groups/controller_group_notifications.py b/mealie/routes/groups/controller_group_notifications.py index 3e133b9f0508..89196b93f90d 100644 --- a/mealie/routes/groups/controller_group_notifications.py +++ b/mealie/routes/groups/controller_group_notifications.py @@ -17,7 +17,16 @@ from mealie.schema.group.group_events import ( ) from mealie.schema.mapper import cast from mealie.schema.response.pagination import PaginationQuery +from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener from mealie.services.event_bus_service.event_bus_service import EventBusService +from mealie.services.event_bus_service.event_types import ( + Event, + EventBusMessage, + EventDocumentDataBase, + EventDocumentType, + EventOperation, + EventTypes, +) router = APIRouter( prefix="/groups/events/notifications", tags=["Group: Event Notifications"], route_class=MealieCrudRoute @@ -78,7 +87,18 @@ class GroupEventsNotifierController(BaseUserController): # ======================================================================= # Test Event Notifications + # TODO: properly re-implement this with new event listeners @router.post("/{item_id}/test", status_code=204) def test_notification(self, item_id: UUID4): item: GroupEventNotifierPrivate = self.repo.get_one(item_id, override_schema=GroupEventNotifierPrivate) - self.event_bus.test_publisher(item.apprise_url) + + event_type = EventTypes.test_message + test_event = Event( + message=EventBusMessage.from_type(event_type, "test message"), + event_type=event_type, + integration_id="test_event", + document_data=EventDocumentDataBase(document_type=EventDocumentType.generic, operation=EventOperation.info), + ) + + test_listener = AppriseEventListener(self.event_bus.session, self.group_id) + test_listener.publish_to_subscribers(test_event, [item.apprise_url]) diff --git a/mealie/routes/groups/controller_shopping_lists.py b/mealie/routes/groups/controller_shopping_lists.py index 3961133c0d84..567fb06990ee 100644 --- a/mealie/routes/groups/controller_shopping_lists.py +++ b/mealie/routes/groups/controller_shopping_lists.py @@ -3,7 +3,7 @@ from functools import cached_property from fastapi import APIRouter, Depends, Query from pydantic import UUID4 -from mealie.routes._base.base_controllers import BaseUserController +from mealie.routes._base.base_controllers import BaseCrudController from mealie.routes._base.controller import controller from mealie.routes._base.mixins import HttpRepo from mealie.schema.group.group_shopping_list import ( @@ -20,18 +20,20 @@ from mealie.schema.group.group_shopping_list import ( from mealie.schema.mapper import cast from mealie.schema.response.pagination import PaginationQuery from mealie.schema.response.responses import SuccessResponse -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import ( + EventOperation, + EventShoppingListData, + EventShoppingListItemBulkData, + EventShoppingListItemData, + EventTypes, +) from mealie.services.group_services.shopping_lists import ShoppingListService item_router = APIRouter(prefix="/groups/shopping/items", tags=["Group: Shopping List Items"]) @controller(item_router) -class ShoppingListItemController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class ShoppingListItemController(BaseCrudController): @cached_property def service(self): return ShoppingListService(self.repos) @@ -79,19 +81,17 @@ class ShoppingListItemController(BaseUserController): shopping_list_item = self.mixins.create_one(data) if shopping_list_item: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemData( + operation=EventOperation.create, + shopping_list_id=shopping_list_item.shopping_list_id, + shopping_list_item_id=shopping_list_item.id, + ), + message=self.t( "notifications.generic-created", name=f"An item on shopping list {shopping_list_item.shopping_list_id}", ), - event_source=EventSource( - event_type="create", - item_type="shopping-list-item", - item_id=shopping_list_item.id, - shopping_list_id=shopping_list_item.shopping_list_id, - ), ) return shopping_list_item @@ -105,19 +105,17 @@ class ShoppingListItemController(BaseUserController): shopping_list_item = self.mixins.update_one(data, item_id) if shopping_list_item: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemData( + operation=EventOperation.update, + shopping_list_id=shopping_list_item.shopping_list_id, + shopping_list_item_id=shopping_list_item.id, + ), + message=self.t( "notifications.generic-updated", name=f"An item on shopping list {shopping_list_item.shopping_list_id}", ), - event_source=EventSource( - event_type="update", - item_type="shopping-list-item", - item_id=shopping_list_item.id, - shopping_list_id=shopping_list_item.shopping_list_id, - ), ) return shopping_list_item @@ -127,19 +125,17 @@ class ShoppingListItemController(BaseUserController): shopping_list_item = self.mixins.delete_one(item_id) # type: ignore if shopping_list_item: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemData( + operation=EventOperation.delete, + shopping_list_id=shopping_list_item.shopping_list_id, + shopping_list_item_id=shopping_list_item.id, + ), + message=self.t( "notifications.generic-deleted", name=f"An item on shopping list {shopping_list_item.shopping_list_id}", ), - event_source=EventSource( - event_type="delete", - item_type="shopping-list-item", - item_id=shopping_list_item.id, - shopping_list_id=shopping_list_item.shopping_list_id, - ), ) return shopping_list_item @@ -149,9 +145,7 @@ router = APIRouter(prefix="/groups/shopping/lists", tags=["Group: Shopping Lists @controller(router) -class ShoppingListController(BaseUserController): - event_bus: EventBusService = Depends(EventBusService) - +class ShoppingListController(BaseCrudController): @cached_property def service(self): return ShoppingListService(self.repos) @@ -180,21 +174,16 @@ class ShoppingListController(BaseUserController): @router.post("", response_model=ShoppingListOut, status_code=201) def create_one(self, data: ShoppingListCreate): save_data = cast(data, ShoppingListSave, group_id=self.user.group_id) - val = self.mixins.create_one(save_data) + shopping_list = self.mixins.create_one(save_data) - if val: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_created, - msg=self.t("notifications.generic-created", name=val.name), - event_source=EventSource( - event_type="create", - item_type="shopping-list", - item_id=val.id, - ), + if shopping_list: + self.publish_event( + event_type=EventTypes.shopping_list_created, + document_data=EventShoppingListData(operation=EventOperation.create, shopping_list_id=shopping_list.id), + message=self.t("notifications.generic-created", name=shopping_list.name), ) - return val + return shopping_list @router.get("/{item_id}", response_model=ShoppingListOut) def get_one(self, item_id: UUID4): @@ -202,54 +191,72 @@ class ShoppingListController(BaseUserController): @router.put("/{item_id}", response_model=ShoppingListOut) def update_one(self, item_id: UUID4, data: ShoppingListUpdate): - data = self.mixins.update_one(data, item_id) # type: ignore - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t("notifications.generic-updated", name=data.name), - event_source=EventSource( - event_type="update", - item_type="shopping-list", - item_id=data.id, - ), + shopping_list = self.mixins.update_one(data, item_id) # type: ignore + + if shopping_list: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListData(operation=EventOperation.update, shopping_list_id=shopping_list.id), + message=self.t("notifications.generic-updated", name=shopping_list.name), ) - return data + + return shopping_list @router.delete("/{item_id}", response_model=ShoppingListOut) def delete_one(self, item_id: UUID4): - data = self.mixins.delete_one(item_id) # type: ignore - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource( - event_type="delete", - item_type="shopping-list", - item_id=data.id, - ), + shopping_list = self.mixins.delete_one(item_id) # type: ignore + if shopping_list: + self.publish_event( + event_type=EventTypes.shopping_list_deleted, + document_data=EventShoppingListData(operation=EventOperation.delete, shopping_list_id=shopping_list.id), + message=self.t("notifications.generic-deleted", name=shopping_list.name), ) - return data + + return shopping_list # ======================================================================= # Other Operations @router.post("/{item_id}/recipe/{recipe_id}", response_model=ShoppingListOut) def add_recipe_ingredients_to_list(self, item_id: UUID4, recipe_id: UUID4): - shopping_list = self.service.add_recipe_ingredients_to_list(item_id, recipe_id) - if shopping_list: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( - "notifications.generic-updated", - name=shopping_list.name, + ( + shopping_list, + new_shopping_list_items, + updated_shopping_list_items, + deleted_shopping_list_items, + ) = self.service.add_recipe_ingredients_to_list(item_id, recipe_id) + + if new_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.create, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[shopping_list_item.id for shopping_list_item in new_shopping_list_items], ), - event_source=EventSource( - event_type="bulk-updated-items", - item_type="shopping-list", - item_id=shopping_list.id, + ) + + if updated_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.update, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in updated_shopping_list_items + ], + ), + ) + + if deleted_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.delete, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in deleted_shopping_list_items + ], ), ) @@ -257,19 +264,33 @@ class ShoppingListController(BaseUserController): @router.delete("/{item_id}/recipe/{recipe_id}", response_model=ShoppingListOut) def remove_recipe_ingredients_from_list(self, item_id: UUID4, recipe_id: UUID4): - shopping_list = self.service.remove_recipe_ingredients_from_list(item_id, recipe_id) - if shopping_list: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.shopping_list_updated, - msg=self.t( - "notifications.generic-updated", - name=shopping_list.name, + ( + shopping_list, + updated_shopping_list_items, + deleted_shopping_list_items, + ) = self.service.remove_recipe_ingredients_from_list(item_id, recipe_id) + + if updated_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.update, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in updated_shopping_list_items + ], ), - event_source=EventSource( - event_type="bulk-updated-items", - item_type="shopping-list", - item_id=shopping_list.id, + ) + + if deleted_shopping_list_items: + self.publish_event( + event_type=EventTypes.shopping_list_updated, + document_data=EventShoppingListItemBulkData( + operation=EventOperation.delete, + shopping_list_id=shopping_list.id, + shopping_list_item_ids=[ + shopping_list_item.id for shopping_list_item in deleted_shopping_list_items + ], ), ) diff --git a/mealie/routes/organizers/controller_categories.py b/mealie/routes/organizers/controller_categories.py index 7db9b9f8c465..3e584225b940 100644 --- a/mealie/routes/organizers/controller_categories.py +++ b/mealie/routes/organizers/controller_categories.py @@ -3,7 +3,7 @@ from functools import cached_property from fastapi import APIRouter, Depends from pydantic import UUID4, BaseModel -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.schema import mapper from mealie.schema.recipe import CategoryIn, RecipeCategoryResponse @@ -11,8 +11,7 @@ from mealie.schema.recipe.recipe import RecipeCategory, RecipeCategoryPagination from mealie.schema.recipe.recipe_category import CategoryBase, CategorySave from mealie.schema.response.pagination import PaginationQuery from mealie.services import urls -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import EventCategoryData, EventOperation, EventTypes router = APIRouter(prefix="/categories", tags=["Organizer: Categories"]) @@ -27,10 +26,7 @@ class CategorySummary(BaseModel): @controller(router) -class RecipeCategoryController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class RecipeCategoryController(BaseCrudController): # ========================================================================= # CRUD Operations @cached_property @@ -56,19 +52,19 @@ class RecipeCategoryController(BaseUserController): def create_one(self, category: CategoryIn): """Creates a Category in the database""" save_data = mapper.cast(category, CategorySave, group_id=self.group_id) - data = self.mixins.create_one(save_data) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.category_created, - msg=self.t( + new_category = self.mixins.create_one(save_data) + if new_category: + self.publish_event( + event_type=EventTypes.category_created, + document_data=EventCategoryData(operation=EventOperation.create, category_id=new_category.id), + message=self.t( "notifications.generic-created-with-url", - name=data.name, - url=urls.category_url(data.slug, self.settings.BASE_URL), + name=new_category.name, + url=urls.category_url(new_category.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="create", item_type="category", item_id=data.id, slug=data.slug), ) - return data + + return new_category @router.get("/{item_id}", response_model=CategorySummary) def get_one(self, item_id: UUID4): @@ -81,20 +77,20 @@ class RecipeCategoryController(BaseUserController): def update_one(self, item_id: UUID4, update_data: CategoryIn): """Updates an existing Tag in the database""" save_data = mapper.cast(update_data, CategorySave, group_id=self.group_id) - data = self.mixins.update_one(save_data, item_id) + category = self.mixins.update_one(save_data, item_id) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.category_updated, - msg=self.t( + if category: + self.publish_event( + event_type=EventTypes.category_updated, + document_data=EventCategoryData(operation=EventOperation.update, category_id=category.id), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.category_url(data.slug, self.settings.BASE_URL), + name=category.name, + url=urls.category_url(category.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="category", item_id=data.id, slug=data.slug), ) - return data + + return category @router.delete("/{item_id}") def delete_one(self, item_id: UUID4): @@ -103,12 +99,11 @@ class RecipeCategoryController(BaseUserController): category does not impact a recipe. The category will be removed from any recipes that contain it """ - if data := self.mixins.delete_one(item_id): - self.event_bus.dispatch( - self.user.group_id, - EventTypes.category_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource(event_type="delete", item_type="category", item_id=data.id, slug=data.slug), + if category := self.mixins.delete_one(item_id): + self.publish_event( + event_type=EventTypes.category_deleted, + document_data=EventCategoryData(operation=EventOperation.delete, category_id=category.id), + message=self.t("notifications.generic-deleted", name=category.name), ) # ========================================================================= diff --git a/mealie/routes/organizers/controller_tags.py b/mealie/routes/organizers/controller_tags.py index fc1daec59aa9..5f993890dd2f 100644 --- a/mealie/routes/organizers/controller_tags.py +++ b/mealie/routes/organizers/controller_tags.py @@ -3,7 +3,7 @@ from functools import cached_property from fastapi import APIRouter, Depends, HTTPException, status from pydantic import UUID4 -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.schema import mapper from mealie.schema.recipe import RecipeTagResponse, TagIn @@ -11,17 +11,13 @@ from mealie.schema.recipe.recipe import RecipeTag, RecipeTagPagination from mealie.schema.recipe.recipe_category import TagSave from mealie.schema.response.pagination import PaginationQuery from mealie.services import urls -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import EventOperation, EventTagData, EventTypes router = APIRouter(prefix="/tags", tags=["Organizer: Tags"]) @controller(router) -class TagController(BaseUserController): - - event_bus: EventBusService = Depends(EventBusService) - +class TagController(BaseCrudController): @cached_property def repo(self): return self.repos.tags.by_group(self.group_id) @@ -55,55 +51,58 @@ class TagController(BaseUserController): def create_one(self, tag: TagIn): """Creates a Tag in the database""" save_data = mapper.cast(tag, TagSave, group_id=self.group_id) - data = self.repo.create(save_data) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.tag_created, - msg=self.t( + new_tag = self.repo.create(save_data) + + if new_tag: + self.publish_event( + event_type=EventTypes.tag_created, + document_data=EventTagData(operation=EventOperation.create, tag_id=new_tag.id), + message=self.t( "notifications.generic-created-with-url", - name=data.name, - url=urls.tag_url(data.slug, self.settings.BASE_URL), + name=new_tag.name, + url=urls.tag_url(new_tag.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="create", item_type="tag", item_id=data.id, slug=data.slug), ) - return data + + return new_tag @router.put("/{item_id}", response_model=RecipeTagResponse) def update_one(self, item_id: UUID4, new_tag: TagIn): """Updates an existing Tag in the database""" save_data = mapper.cast(new_tag, TagSave, group_id=self.group_id) - data = self.repo.update(item_id, save_data) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.tag_updated, - msg=self.t( + tag = self.repo.update(item_id, save_data) + + if tag: + self.publish_event( + event_type=EventTypes.tag_updated, + document_data=EventTagData(operation=EventOperation.update, tag_id=tag.id), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.tag_url(data.slug, self.settings.BASE_URL), + name=tag.name, + url=urls.tag_url(tag.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="tag", item_id=data.id, slug=data.slug), ) - return data + + return tag @router.delete("/{item_id}") def delete_recipe_tag(self, item_id: UUID4): - """Removes a recipe tag from the database. Deleting a + """ + Removes a recipe tag from the database. Deleting a tag does not impact a recipe. The tag will be removed - from any recipes that contain it""" + from any recipes that contain it + """ try: - data = self.repo.delete(item_id) + tag = self.repo.delete(item_id) except Exception as e: raise HTTPException(status.HTTP_400_BAD_REQUEST) from e - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.tag_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource(event_type="delete", item_type="tag", item_id=data.id, slug=data.slug), + if tag: + self.publish_event( + event_type=EventTypes.tag_deleted, + document_data=EventTagData(operation=EventOperation.delete, tag_id=tag.id), + message=self.t("notifications.generic-deleted", name=tag.name), ) @router.get("/slug/{tag_slug}", response_model=RecipeTagResponse) diff --git a/mealie/routes/recipe/recipe_crud_routes.py b/mealie/routes/recipe/recipe_crud_routes.py index 7a2f64778de2..8742466452f4 100644 --- a/mealie/routes/recipe/recipe_crud_routes.py +++ b/mealie/routes/recipe/recipe_crud_routes.py @@ -18,7 +18,7 @@ from mealie.core.dependencies.dependencies import temporary_dir, validate_recipe from mealie.core.security import create_recipe_slug_token from mealie.pkgs import cache from mealie.repos.repository_recipes import RepositoryRecipes -from mealie.routes._base import BaseUserController, controller +from mealie.routes._base import BaseCrudController, controller from mealie.routes._base.mixins import HttpRepo from mealie.routes._base.routers import MealieCrudRoute, UserAPIRouter from mealie.schema.recipe import Recipe, RecipeImageTypes, ScrapeRecipe @@ -34,8 +34,12 @@ from mealie.schema.recipe.recipe_scraper import ScrapeRecipeTest from mealie.schema.recipe.request_helpers import RecipeZipTokenResponse, UpdateImageResponse from mealie.schema.response.responses import ErrorResponse from mealie.services import urls -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource -from mealie.services.event_bus_service.message_types import EventTypes +from mealie.services.event_bus_service.event_types import ( + EventOperation, + EventRecipeBulkReportData, + EventRecipeData, + EventTypes, +) from mealie.services.recipe.recipe_data_service import InvalidDomainError, NotAnImageError, RecipeDataService from mealie.services.recipe.recipe_service import RecipeService from mealie.services.recipe.template_service import TemplateService @@ -45,7 +49,7 @@ from mealie.services.scraper.scraper import create_from_url from mealie.services.scraper.scraper_strategies import ForceTimeoutException, RecipeScraperPackage -class BaseRecipeController(BaseUserController): +class BaseRecipeController(BaseCrudController): @cached_property def repo(self) -> RepositoryRecipes: return self.repos.recipes.by_group(self.group_id) @@ -119,8 +123,6 @@ router = UserAPIRouter(prefix="/recipes", tags=["Recipe: CRUD"], route_class=Mea @controller(router) class RecipeController(BaseRecipeController): - event_bus: EventBusService = Depends(EventBusService) - def handle_exceptions(self, ex: Exception) -> None: match type(ex): case exceptions.PermissionDenied: @@ -161,17 +163,14 @@ class RecipeController(BaseRecipeController): new_recipe = self.service.create_one(recipe) if new_recipe: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_created, - msg=self.t( + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=new_recipe.slug), + message=self.t( "notifications.generic-created-with-url", name=new_recipe.name, url=urls.recipe_url(new_recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource( - event_type="create", item_type="recipe", item_id=new_recipe.id, slug=new_recipe.slug - ), ) return new_recipe.slug @@ -183,6 +182,11 @@ class RecipeController(BaseRecipeController): report_id = bulk_scraper.get_report_id() bg_tasks.add_task(bulk_scraper.scrape, bulk) + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeBulkReportData(operation=EventOperation.create, report_id=report_id), + ) + return {"reportId": report_id} @router.post("/test-scrape-url") @@ -202,6 +206,11 @@ class RecipeController(BaseRecipeController): def create_recipe_from_zip(self, temp_path=Depends(temporary_zip_path), archive: UploadFile = File(...)): """Create recipe from archive""" recipe = self.service.create_from_zip(archive, temp_path) + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=recipe.slug), + ) + return recipe.slug # ================================================================================================================== @@ -215,7 +224,7 @@ class RecipeController(BaseRecipeController): tags: Optional[list[UUID4 | str]] = Query(None), tools: Optional[list[UUID4 | str]] = Query(None), ): - response = self.repo.page_all( + pagination_response = self.repo.page_all( pagination=q, load_food=q.load_food, categories=categories, @@ -223,10 +232,10 @@ class RecipeController(BaseRecipeController): tools=tools, ) - response.set_pagination_guides(router.url_path_for("get_all"), q.dict()) + pagination_response.set_pagination_guides(router.url_path_for("get_all"), q.dict()) new_items = [] - for item in response.items: + for item in pagination_response.items: # Pydantic/FastAPI can't seem to serialize the ingredient field on thier own. new_item = item.__dict__ @@ -235,8 +244,8 @@ class RecipeController(BaseRecipeController): new_items.append(new_item) - response.items = [RecipeSummary.construct(**x) for x in new_items] - json_compatible_response = jsonable_encoder(response) + pagination_response.items = [RecipeSummary.construct(**x) for x in new_items] + json_compatible_response = jsonable_encoder(pagination_response) # Response is returned directly, to avoid validation and improve performance return JSONResponse(content=json_compatible_response) @@ -256,17 +265,14 @@ class RecipeController(BaseRecipeController): return None if new_recipe: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_created, - msg=self.t( + self.publish_event( + event_type=EventTypes.recipe_created, + document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=new_recipe.slug), + message=self.t( "notifications.generic-created-with-url", name=new_recipe.name, url=urls.recipe_url(new_recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource( - event_type="create", item_type="recipe", item_id=new_recipe.id, slug=new_recipe.slug - ), ) return new_recipe.slug @@ -275,63 +281,60 @@ class RecipeController(BaseRecipeController): def update_one(self, slug: str, data: Recipe): """Updates a recipe by existing slug and data.""" try: - data = self.service.update_one(slug, data) + recipe = self.service.update_one(slug, data) except Exception as e: self.handle_exceptions(e) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_updated, - msg=self.t( + if recipe: + self.publish_event( + event_type=EventTypes.recipe_updated, + document_data=EventRecipeData(operation=EventOperation.update, recipe_slug=recipe.slug), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.recipe_url(data.slug, self.settings.BASE_URL), + name=recipe.name, + url=urls.recipe_url(recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="recipe", item_id=data.id, slug=data.slug), ) - return data + return recipe @router.patch("/{slug}") def patch_one(self, slug: str, data: Recipe): """Updates a recipe by existing slug and data.""" try: - data = self.service.patch_one(slug, data) + recipe = self.service.patch_one(slug, data) except Exception as e: self.handle_exceptions(e) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_updated, - msg=self.t( + if recipe: + self.publish_event( + event_type=EventTypes.recipe_updated, + document_data=EventRecipeData(operation=EventOperation.update, recipe_slug=recipe.slug), + message=self.t( "notifications.generic-updated-with-url", - name=data.name, - url=urls.recipe_url(data.slug, self.settings.BASE_URL), + name=recipe.name, + url=urls.recipe_url(recipe.slug, self.settings.BASE_URL), ), - event_source=EventSource(event_type="update", item_type="recipe", item_id=data.id, slug=data.slug), ) - return data + return recipe @router.delete("/{slug}") def delete_one(self, slug: str): """Deletes a recipe by slug""" try: - data = self.service.delete_one(slug) + recipe = self.service.delete_one(slug) except Exception as e: self.handle_exceptions(e) - if data: - self.event_bus.dispatch( - self.user.group_id, - EventTypes.recipe_deleted, - msg=self.t("notifications.generic-deleted", name=data.name), - event_source=EventSource(event_type="delete", item_type="recipe", item_id=data.id, slug=data.slug), + if recipe: + self.publish_event( + event_type=EventTypes.recipe_deleted, + document_data=EventRecipeData(operation=EventOperation.delete, recipe_slug=recipe.slug), + message=self.t("notifications.generic-deleted", name=recipe.name), ) - return data + return recipe # ================================================================================================================== # Image and Assets diff --git a/mealie/routes/users/api_tokens.py b/mealie/routes/users/api_tokens.py index 90a7015cf194..8b8f31766052 100644 --- a/mealie/routes/users/api_tokens.py +++ b/mealie/routes/users/api_tokens.py @@ -15,17 +15,22 @@ class UserApiTokensController(BaseUserController): @router.post("/api-tokens", status_code=status.HTTP_201_CREATED, response_model=LongLiveTokenOut) def create_api_token( self, - token_name: LongLiveTokenIn, + token_params: LongLiveTokenIn, ): """Create api_token in the Database""" - token_data = {"long_token": True, "id": str(self.user.id)} + token_data = { + "long_token": True, + "id": str(self.user.id), + "name": token_params.name, + "integration_id": token_params.integration_id, + } five_years = timedelta(1825) token = create_access_token(token_data, five_years) token_model = CreateToken( - name=token_name.name, + name=token_params.name, token=token, user_id=self.user.id, ) diff --git a/mealie/schema/user/user.py b/mealie/schema/user/user.py index 2d32257592cc..b3faffee2c75 100644 --- a/mealie/schema/user/user.py +++ b/mealie/schema/user/user.py @@ -16,11 +16,13 @@ from mealie.schema.response.pagination import PaginationBase from ..recipe import CategoryBase +DEFAULT_INTEGRATION_ID = "generic" settings = get_app_settings() class LongLiveTokenIn(MealieModel): name: str + integration_id: str = DEFAULT_INTEGRATION_ID class LongLiveTokenOut(MealieModel): diff --git a/mealie/services/event_bus_service/event_bus_listeners.py b/mealie/services/event_bus_service/event_bus_listeners.py new file mode 100644 index 000000000000..5cf0bed987c0 --- /dev/null +++ b/mealie/services/event_bus_service/event_bus_listeners.py @@ -0,0 +1,80 @@ +import json +from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit + +from fastapi.encoders import jsonable_encoder +from pydantic import UUID4 +from sqlalchemy.orm.session import Session + +from mealie.repos.repository_factory import AllRepositories +from mealie.schema.group.group_events import GroupEventNotifierPrivate + +from .event_types import Event +from .publisher import ApprisePublisher, PublisherLike + + +class EventListenerBase: + def __init__(self, session: Session, group_id: UUID4, publisher: PublisherLike) -> None: + self.session = session + self.group_id = group_id + self.publisher = publisher + + def get_subscribers(self, event: Event) -> list: + """Get a list of all subscribers to this event""" + ... + + def publish_to_subscribers(self, event: Event, subscribers: list) -> None: + """Publishes the event to all subscribers""" + ... + + +class AppriseEventListener(EventListenerBase): + def __init__(self, session: Session, group_id: UUID4) -> None: + super().__init__(session, group_id, ApprisePublisher()) + + def get_subscribers(self, event: Event) -> list[str]: + repos = AllRepositories(self.session) + + notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.by_group( # type: ignore + self.group_id + ).multi_query({"enabled": True}, override_schema=GroupEventNotifierPrivate) + + urls = [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event.event_type.name)] + urls = AppriseEventListener.update_urls_with_event_data(urls, event) + + return urls + + def publish_to_subscribers(self, event: Event, subscribers: list[str]) -> None: + self.publisher.publish(event, subscribers) + + @staticmethod + def update_urls_with_event_data(urls: list[str], event: Event): + params = { + "event_type": event.event_type.name, + "integration_id": event.integration_id, + "document_data": json.dumps(jsonable_encoder(event.document_data)), + "event_id": str(event.event_id), + "timestamp": event.timestamp.isoformat(), + } + + return [ + # We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":". + AppriseEventListener.merge_query_parameters(url, {f":{k}": v for k, v in params.items()}) + # only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints + if AppriseEventListener.is_custom_url(url) else url + for url in urls + ] + + @staticmethod + def merge_query_parameters(url: str, params: dict): + scheme, netloc, path, query_string, fragment = urlsplit(url) + + # merge query params + query_params = parse_qs(query_string) + query_params.update(params) + new_query_string = urlencode(query_params, doseq=True) + + return urlunsplit((scheme, netloc, path, new_query_string, fragment)) + + @staticmethod + def is_custom_url(url: str): + return url.split(":", 1)[0].lower() in ["form", "forms", "json", "jsons", "xml", "xmls"] diff --git a/mealie/services/event_bus_service/event_bus_service.py b/mealie/services/event_bus_service/event_bus_service.py index 8015c73210d5..83211e864b86 100644 --- a/mealie/services/event_bus_service/event_bus_service.py +++ b/mealie/services/event_bus_service/event_bus_service.py @@ -1,14 +1,16 @@ -from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit +from typing import Optional from fastapi import BackgroundTasks, Depends from pydantic import UUID4 +from mealie.core.config import get_app_settings from mealie.db.db_setup import generate_session -from mealie.repos.repository_factory import AllRepositories -from mealie.schema.group.group_events import GroupEventNotifierPrivate +from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener, EventListenerBase -from .message_types import EventBusMessage, EventTypes -from .publisher import ApprisePublisher, PublisherLike +from .event_types import Event, EventBusMessage, EventDocumentDataBase, EventTypes + +settings = get_app_settings() +ALGORITHM = "HS256" class EventSource: @@ -35,66 +37,32 @@ class EventSource: class EventBusService: def __init__(self, bg: BackgroundTasks, session=Depends(generate_session)) -> None: self.bg = bg - self._publisher = ApprisePublisher self.session = session self.group_id: UUID4 | None = None - @property - def publisher(self) -> PublisherLike: - return self._publisher() - - def get_urls(self, event_type: EventTypes) -> list[str]: - repos = AllRepositories(self.session) - - notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.by_group( # type: ignore - self.group_id - ).multi_query({"enabled": True}, override_schema=GroupEventNotifierPrivate) - - return [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event_type.name)] + self.listeners: list[EventListenerBase] = [AppriseEventListener(self.session, self.group_id)] def dispatch( - self, group_id: UUID4, event_type: EventTypes, msg: str = "", event_source: EventSource = None + self, + integration_id: str, + group_id: UUID4, + event_type: EventTypes, + document_data: Optional[EventDocumentDataBase], + message: str = "", ) -> None: self.group_id = group_id - def _dispatch(event_source: EventSource = None): - if urls := self.get_urls(event_type): - if event_source: - urls = EventBusService.update_urls_with_event_source(urls, event_source) - - self.publisher.publish(EventBusMessage.from_type(event_type, body=msg), urls) - - if dispatch_task := _dispatch(event_source=event_source): - self.bg.add_task(dispatch_task) - - def test_publisher(self, url: str) -> None: - self.bg.add_task( - self.publisher.publish, - event=EventBusMessage.from_type(EventTypes.test_message, body="This is a test event."), - notification_urls=[url], + event = Event( + message=EventBusMessage.from_type(event_type, body=message), + event_type=event_type, + integration_id=integration_id, + document_data=document_data, ) - @staticmethod - def update_urls_with_event_source(urls: list[str], event_source: EventSource): - return [ - # We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":". - EventBusService.merge_query_parameters(url, {f":{k}": v for k, v in event_source.dict().items()}) - # only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints - if EventBusService.is_custom_url(url) else url - for url in urls - ] + self.bg.add_task(self.publish_event, event=event) - @staticmethod - def merge_query_parameters(url: str, params: dict): - scheme, netloc, path, query_string, fragment = urlsplit(url) - - # merge query params - query_params = parse_qs(query_string) - query_params.update(params) - new_query_string = urlencode(query_params, doseq=True) - - return urlunsplit((scheme, netloc, path, new_query_string, fragment)) - - @staticmethod - def is_custom_url(url: str): - return url.split(":", 1)[0].lower() in ["form", "forms", "json", "jsons", "xml", "xmls"] + def publish_event(self, event: Event) -> None: + """Publishes the event to all listeners""" + for listener in self.listeners: + if subscribers := listener.get_subscribers(event): + listener.publish_to_subscribers(event, subscribers) diff --git a/mealie/services/event_bus_service/event_types.py b/mealie/services/event_bus_service/event_types.py new file mode 100644 index 000000000000..e770424614e2 --- /dev/null +++ b/mealie/services/event_bus_service/event_types.py @@ -0,0 +1,148 @@ +import uuid +from datetime import datetime +from enum import Enum, auto + +from pydantic import UUID4 + +from ...schema._mealie.mealie_model import MealieModel + + +class EventTypes(Enum): + """ + The event type defines whether or not a subscriber should receive an event. + + Each event type is represented by a field on the subscriber repository, therefore any changes + made here must also be reflected in the database (and likely requires a database migration). + + If you'd like more granular control over the metadata of the event, e.g. events for sub-records + (like shopping list items), modify the event document type instead (which is not tied to a database entry). + """ + + test_message = auto() + + recipe_created = auto() + recipe_updated = auto() + recipe_deleted = auto() + + user_signup = auto() + + data_migrations = auto() + data_export = auto() + data_import = auto() + + mealplan_entry_created = auto() + + shopping_list_created = auto() + shopping_list_updated = auto() + shopping_list_deleted = auto() + + cookbook_created = auto() + cookbook_updated = auto() + cookbook_deleted = auto() + + tag_created = auto() + tag_updated = auto() + tag_deleted = auto() + + category_created = auto() + category_updated = auto() + category_deleted = auto() + + +class EventDocumentType(Enum): + generic = "generic" + + category = "category" + cookbook = "cookbook" + shopping_list = "shopping_list" + shopping_list_item = "shopping_list_item" + recipe = "recipe" + recipe_bulk_report = "recipe_bulk_report" + tag = "tag" + + +class EventOperation(Enum): + info = "info" + + create = "create" + update = "update" + delete = "delete" + + +class EventDocumentDataBase(MealieModel): + document_type: EventDocumentType + operation: EventOperation + ... + + +class EventCategoryData(EventDocumentDataBase): + document_type = EventDocumentType.category + category_id: UUID4 + + +class EventCookbookData(EventDocumentDataBase): + document_type = EventDocumentType.cookbook + cookbook_id: UUID4 + + +class EventCookbookBulkData(EventDocumentDataBase): + document_type = EventDocumentType.cookbook + cookbook_ids: list[UUID4] + + +class EventShoppingListData(EventDocumentDataBase): + document_type = EventDocumentType.shopping_list + shopping_list_id: UUID4 + + +class EventShoppingListItemData(EventDocumentDataBase): + document_type = EventDocumentType.shopping_list_item + shopping_list_id: UUID4 + shopping_list_item_id: UUID4 + + +class EventShoppingListItemBulkData(EventDocumentDataBase): + document_type = EventDocumentType.shopping_list_item + shopping_list_id: UUID4 + shopping_list_item_ids: list[UUID4] + + +class EventRecipeData(EventDocumentDataBase): + document_type = EventDocumentType.recipe + recipe_slug: str + + +class EventRecipeBulkReportData(EventDocumentDataBase): + document_type = EventDocumentType.recipe_bulk_report + report_id: UUID4 + + +class EventTagData(EventDocumentDataBase): + document_type = EventDocumentType.tag + tag_id: UUID4 + + +class EventBusMessage(MealieModel): + title: str + body: str = "" + + @classmethod + def from_type(cls, event_type: EventTypes, body: str = "") -> "EventBusMessage": + title = event_type.name.replace("_", " ").title() + return cls(title=title, body=body) + + +class Event(MealieModel): + message: EventBusMessage + event_type: EventTypes + integration_id: str + document_data: EventDocumentDataBase + + # set at instantiation + event_id: UUID4 | None + timestamp: datetime | None + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.event_id = uuid.uuid4() + self.timestamp = datetime.now() diff --git a/mealie/services/event_bus_service/message_types.py b/mealie/services/event_bus_service/message_types.py deleted file mode 100644 index ec012f3b06bf..000000000000 --- a/mealie/services/event_bus_service/message_types.py +++ /dev/null @@ -1,47 +0,0 @@ -from enum import Enum, auto - - -class EventTypes(Enum): - test_message = auto() - - recipe_created = auto() - recipe_updated = auto() - recipe_deleted = auto() - - user_signup = auto() - - data_migrations = auto() - data_export = auto() - data_import = auto() - - mealplan_entry_created = auto() - - shopping_list_created = auto() - shopping_list_updated = auto() - shopping_list_deleted = auto() - - cookbook_created = auto() - cookbook_updated = auto() - cookbook_deleted = auto() - - tag_created = auto() - tag_updated = auto() - tag_deleted = auto() - - category_created = auto() - category_updated = auto() - category_deleted = auto() - - -class EventBusMessage: - title: str - body: str = "" - - def __init__(self, title, body) -> None: - self.title = title - self.body = body - - @classmethod - def from_type(cls, event_type: EventTypes, body: str = "") -> "EventBusMessage": - title = event_type.name.replace("_", " ").title() - return cls(title=title, body=body) diff --git a/mealie/services/event_bus_service/publisher.py b/mealie/services/event_bus_service/publisher.py index dec7806b3fbe..73bb6a9d5314 100644 --- a/mealie/services/event_bus_service/publisher.py +++ b/mealie/services/event_bus_service/publisher.py @@ -2,11 +2,11 @@ from typing import Protocol import apprise -from mealie.services.event_bus_service.event_bus_service import EventBusMessage +from mealie.services.event_bus_service.event_types import Event class PublisherLike(Protocol): - def publish(self, event: EventBusMessage, notification_urls: list[str]): + def publish(self, event: Event, notification_urls: list[str]): ... @@ -19,11 +19,18 @@ class ApprisePublisher: self.apprise = apprise.Apprise(asset=asset) self.hard_fail = hard_fail - def publish(self, event: EventBusMessage, notification_urls: list[str]): + def publish(self, event: Event, notification_urls: list[str]): + """Publishses a list of notification URLs""" + + tags = [] for dest in notification_urls: - status = self.apprise.add(dest) + # we tag the url so it only sends each notification once + tag = str(event.event_id) + tags.append(tag) + + status = self.apprise.add(dest, tag=tag) if not status and self.hard_fail: raise Exception("Apprise URL Add Failed") - self.apprise.notify(title=event.title, body=event.body) + self.apprise.notify(title=event.message.title, body=event.message.body, tag=tags) diff --git a/mealie/services/group_services/shopping_lists.py b/mealie/services/group_services/shopping_lists.py index 2812d39ba783..f06b0fe5085d 100644 --- a/mealie/services/group_services/shopping_lists.py +++ b/mealie/services/group_services/shopping_lists.py @@ -70,7 +70,7 @@ class ShoppingListService: # Set References new_refs = [] for ref in inner_item.recipe_references: - ref.shopping_list_item_id = base_item.id + ref.shopping_list_item_id = base_item.id # type: ignore new_refs.append(ref) base_item.recipe_references.extend(new_refs) @@ -80,46 +80,64 @@ class ShoppingListService: return consolidated_list - def consolidate_and_save(self, data: list[ShoppingListItemUpdate]): + def consolidate_and_save( + self, data: list[ShoppingListItemUpdate] + ) -> tuple[list[ShoppingListItemOut], list[ShoppingListItemOut]]: + """ + returns: + - updated_shopping_list_items + - deleted_shopping_list_items + """ # TODO: Convert to update many with single call all_updates = [] + all_deletes = [] keep_ids = [] - for item in self.consolidate_list_items(data): + for item in self.consolidate_list_items(data): # type: ignore updated_data = self.list_items.update(item.id, item) all_updates.append(updated_data) keep_ids.append(updated_data.id) - for item in data: + for item in data: # type: ignore if item.id not in keep_ids: self.list_items.delete(item.id) + all_deletes.append(item) - return all_updates + return all_updates, all_deletes # ======================================================================= # Methods - def add_recipe_ingredients_to_list(self, list_id: UUID4, recipe_id: UUID4) -> ShoppingListOut: + def add_recipe_ingredients_to_list( + self, list_id: UUID4, recipe_id: UUID4 + ) -> tuple[ShoppingListOut, list[ShoppingListItemOut], list[ShoppingListItemOut], list[ShoppingListItemOut]]: + """ + returns: + - updated_shopping_list + - new_shopping_list_items + - updated_shopping_list_items + - deleted_shopping_list_items + """ recipe = self.repos.recipes.get_one(recipe_id, "id") to_create = [] for ingredient in recipe.recipe_ingredient: food_id = None try: - food_id = ingredient.food.id + food_id = ingredient.food.id # type: ignore except AttributeError: pass label_id = None try: - label_id = ingredient.food.label.id + label_id = ingredient.food.label.id # type: ignore except AttributeError: pass unit_id = None try: - unit_id = ingredient.unit.id + unit_id = ingredient.unit.id # type: ignore except AttributeError: pass @@ -142,28 +160,67 @@ class ShoppingListService: ) ) - for item in to_create: - self.repos.group_shopping_list_item.create(item) + new_shopping_list_items = [self.repos.group_shopping_list_item.create(item) for item in to_create] - updated_list = self.shopping_lists.get_one(list_id) - updated_list.list_items = self.consolidate_and_save(updated_list.list_items) + updated_shopping_list = self.shopping_lists.get_one(list_id) + updated_shopping_list_items, deleted_shopping_list_items = self.consolidate_and_save(updated_shopping_list.list_items) # type: ignore + updated_shopping_list.list_items = updated_shopping_list_items not_found = True - for refs in updated_list.recipe_references: + for refs in updated_shopping_list.recipe_references: if refs.recipe_id == recipe_id: refs.recipe_quantity += 1 not_found = False if not_found: - updated_list.recipe_references.append(ShoppingListItemRecipeRef(recipe_id=recipe_id, recipe_quantity=1)) + updated_shopping_list.recipe_references.append( + ShoppingListItemRecipeRef(recipe_id=recipe_id, recipe_quantity=1) # type: ignore + ) - updated_list = self.shopping_lists.update(updated_list.id, updated_list) + updated_shopping_list = self.shopping_lists.update(updated_shopping_list.id, updated_shopping_list) - return updated_list + """ + There can be overlap between the list item collections, so we de-duplicate the lists. - def remove_recipe_ingredients_from_list(self, list_id: UUID4, recipe_id: UUID4) -> ShoppingListOut: + First new items are created, then existing items are updated, and finally some items are deleted, + so we can de-duplicate using this logic + """ + new_items_map = {list_item.id: list_item for list_item in new_shopping_list_items} + updated_items_map = {list_item.id: list_item for list_item in updated_shopping_list_items} + deleted_items_map = {list_item.id: list_item for list_item in deleted_shopping_list_items} + + # if the item was created and then updated, replace the create with the update and remove the update + for id in list(updated_items_map.keys()): + if id in new_items_map: + new_items_map[id] = updated_items_map[id] + del updated_items_map[id] + + # if the item was updated and then deleted, remove the update + updated_shopping_list_items = [ + list_item for id, list_item in updated_items_map.items() if id not in deleted_items_map + ] + + # if the item was created and then deleted, remove it from both lists + new_shopping_list_items = [list_item for id, list_item in new_items_map.items() if id not in deleted_items_map] + deleted_shopping_list_items = [ + list_item for id, list_item in deleted_items_map.items() if id not in new_items_map + ] + + return updated_shopping_list, new_shopping_list_items, updated_shopping_list_items, deleted_shopping_list_items + + def remove_recipe_ingredients_from_list( + self, list_id: UUID4, recipe_id: UUID4 + ) -> tuple[ShoppingListOut, list[ShoppingListItemOut], list[ShoppingListItemOut]]: + """ + returns: + - updated_shopping_list + - updated_shopping_list_items + - deleted_shopping_list_items + """ shopping_list = self.shopping_lists.get_one(list_id) + updated_shopping_list_items = [] + deleted_shopping_list_items = [] for item in shopping_list.list_items: found = False @@ -171,7 +228,7 @@ class ShoppingListService: remove_qty = 0.0 if ref.recipe_id == recipe_id: - self.list_item_refs.delete(ref.id) + self.list_item_refs.delete(ref.id) # type: ignore item.recipe_references.remove(ref) found = True remove_qty = ref.recipe_quantity @@ -183,20 +240,22 @@ class ShoppingListService: if item.quantity <= 0: self.list_items.delete(item.id) + deleted_shopping_list_items.append(item) else: self.list_items.update(item.id, item) + updated_shopping_list_items.append(item) - # Decrament the list recipe reference count - for ref in shopping_list.recipe_references: + # Decrement the list recipe reference count + for ref in shopping_list.recipe_references: # type: ignore if ref.recipe_id == recipe_id: ref.recipe_quantity -= 1 if ref.recipe_quantity <= 0: - self.list_refs.delete(ref.id) + self.list_refs.delete(ref.id) # type: ignore else: - self.list_refs.update(ref.id, ref) + self.list_refs.update(ref.id, ref) # type: ignore break # Save Changes - return self.shopping_lists.get_one(shopping_list.id) + return self.shopping_lists.get_one(shopping_list.id), updated_shopping_list_items, deleted_shopping_list_items diff --git a/tests/integration_tests/user_group_tests/test_group_notifications.py b/tests/integration_tests/user_group_tests/test_group_notifications.py index 1b0f9726f4b9..bdd97e9a400b 100644 --- a/tests/integration_tests/user_group_tests/test_group_notifications.py +++ b/tests/integration_tests/user_group_tests/test_group_notifications.py @@ -1,7 +1,15 @@ from fastapi.testclient import TestClient from mealie.schema.group.group_events import GroupEventNotifierCreate, GroupEventNotifierOptions -from mealie.services.event_bus_service.event_bus_service import EventBusService, EventSource +from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener +from mealie.services.event_bus_service.event_bus_service import Event +from mealie.services.event_bus_service.event_types import ( + EventBusMessage, + EventDocumentDataBase, + EventDocumentType, + EventOperation, + EventTypes, +) from tests.utils.assertion_helpers import assert_ignore_keys from tests.utils.factories import random_bool, random_email, random_int, random_string from tests.utils.fixture_schemas import TestUser @@ -47,8 +55,13 @@ def notifier_generator(): ).dict(by_alias=True) -def event_source_generator(): - return EventSource(event_type=random_string, item_type=random_string(), item_id=random_int()) +def event_generator(): + return Event( + message=EventBusMessage(title=random_string(), body=random_string()), + event_type=EventTypes.test_message, + integration_id=random_string(), + document_data=EventDocumentDataBase(document_type=EventDocumentType.generic, operation=EventOperation.info), + ) def test_create_notification(api_client: TestClient, unique_user: TestUser): @@ -61,7 +74,7 @@ def test_create_notification(api_client: TestClient, unique_user: TestUser): assert payload_as_dict["name"] == payload["name"] assert payload_as_dict["enabled"] is True - # Ensure Apprise URL Staysa Private + # Ensure Apprise URL Stays Private assert "apprise_url" not in payload_as_dict # Cleanup @@ -79,7 +92,7 @@ def test_ensure_apprise_url_is_secret(api_client: TestClient, unique_user: TestU assert "apprise_url" not in payload_as_dict -def test_update_notification(api_client: TestClient, unique_user: TestUser): +def test_update_apprise_notification(api_client: TestClient, unique_user: TestUser): payload = notifier_generator() response = api_client.post(Routes.base, json=payload, headers=unique_user.token) assert response.status_code == 201 @@ -110,7 +123,7 @@ def test_update_notification(api_client: TestClient, unique_user: TestUser): response = api_client.delete(Routes.item(update_payload["id"]), headers=unique_user.token) -def test_delete_notification(api_client: TestClient, unique_user: TestUser): +def test_delete_apprise_notification(api_client: TestClient, unique_user: TestUser): payload = notifier_generator() response = api_client.post(Routes.base, json=payload, headers=unique_user.token) assert response.status_code == 201 @@ -124,8 +137,8 @@ def test_delete_notification(api_client: TestClient, unique_user: TestUser): assert response.status_code == 404 -def test_event_bus_functions(): - test_event_source = event_source_generator() +def test_apprise_event_bus_listener_functions(): + test_event = event_generator() test_standard_urls = [ "a" + random_string(), @@ -143,15 +156,15 @@ def test_event_bus_functions(): ] # Validate all standard urls are not considered custom - responses = [EventBusService.is_custom_url(url) for url in test_standard_urls] + responses = [AppriseEventListener.is_custom_url(url) for url in test_standard_urls] assert not any(responses) # Validate all custom urls are actually considered custom - responses = [EventBusService.is_custom_url(url) for url in test_custom_urls] + responses = [AppriseEventListener.is_custom_url(url) for url in test_custom_urls] assert all(responses) - updated_standard_urls = EventBusService.update_urls_with_event_source(test_standard_urls, test_event_source) - updated_custom_urls = EventBusService.update_urls_with_event_source(test_custom_urls, test_event_source) + updated_standard_urls = AppriseEventListener.update_urls_with_event_data(test_standard_urls, test_event) + updated_custom_urls = AppriseEventListener.update_urls_with_event_data(test_custom_urls, test_event) # Validate that no URLs are lost when updating them assert len(updated_standard_urls) == len(test_standard_urls)