import inspect import json import os import random import shutil import tempfile from collections.abc import Generator from pathlib import Path from uuid import uuid4 from zipfile import ZipFile import pytest from bs4 import BeautifulSoup from fastapi.testclient import TestClient from httpx import Response from pytest import MonkeyPatch from recipe_scrapers._abstract import AbstractScraper from recipe_scrapers._schemaorg import SchemaOrg from recipe_scrapers.plugins import SchemaOrgFillPlugin from slugify import slugify from mealie.pkgs.safehttp.transport import AsyncSafeTransport from mealie.schema.cookbook.cookbook import SaveCookBook from mealie.schema.recipe.recipe import Recipe, RecipeCategory, RecipeSummary, RecipeTag from mealie.schema.recipe.recipe_category import CategorySave, TagSave from mealie.schema.recipe.recipe_notes import RecipeNote from mealie.schema.recipe.recipe_tool import RecipeToolSave from mealie.services.recipe.recipe_data_service import RecipeDataService from mealie.services.scraper.recipe_scraper import DEFAULT_SCRAPER_STRATEGIES from tests import utils from tests.utils import api_routes from tests.utils.factories import random_int, random_string from tests.utils.fixture_schemas import TestUser from tests.utils.recipe_data import get_recipe_test_cases recipe_test_data = get_recipe_test_cases() @pytest.fixture(scope="module") def tempdir() -> Generator[str, None, None]: with tempfile.TemporaryDirectory() as td: yield td def zip_recipe(tempdir: str, recipe: RecipeSummary) -> dict: data_file = tempfile.NamedTemporaryFile(mode="w+", dir=tempdir, suffix=".json", delete=False) json.dump(json.loads(recipe.model_dump_json()), data_file) data_file.flush() zip_file = shutil.make_archive(os.path.join(tempdir, "zipfile"), "zip") with ZipFile(zip_file, "w") as zf: zf.write(data_file.name) return {"archive": Path(zip_file).read_bytes()} def get_init(html_path: Path): """ Override the init method of the abstract scraper to return a bootstrapped init function that serves the html from the given path instead of calling the url. """ def init_override( self, url, proxies: str | None = None, timeout: float | tuple | None = None, wild_mode: bool | None = False, **_, ): page_data = html_path.read_bytes() url = "https://test.example.com/" self.wild_mode = wild_mode self.soup = BeautifulSoup(page_data, "html.parser") self.url = url self.schema = SchemaOrg(page_data) # attach the SchemaOrgFill plugin if not hasattr(self.__class__, "plugins_initialized"): for name, _ in inspect.getmembers(self, inspect.ismethod): # type: ignore current_method = getattr(self.__class__, name) current_method = SchemaOrgFillPlugin.run(current_method) setattr(self.__class__, name, current_method) self.__class__.plugins_initialized = True return init_override def open_graph_override(html: str): async def get_html(self, url: str) -> str: return html return get_html def test_create_by_url( api_client: TestClient, unique_user: TestUser, monkeypatch: MonkeyPatch, ): for recipe_data in recipe_test_data: # Override init function for AbstractScraper to use the test html instead of calling the url monkeypatch.setattr( AbstractScraper, "__init__", get_init(recipe_data.html_file), ) # Override the get_html method of the RecipeScraperOpenGraph to return the test html for scraper_cls in DEFAULT_SCRAPER_STRATEGIES: monkeypatch.setattr( scraper_cls, "get_html", open_graph_override(recipe_data.html_file.read_text()), ) # Skip AsyncSafeTransport requests async def return_empty_response(*args, **kwargs): return Response(200, content=b"") monkeypatch.setattr( AsyncSafeTransport, "handle_async_request", return_empty_response, ) # Skip image downloader monkeypatch.setattr( RecipeDataService, "scrape_image", lambda *_: "TEST_IMAGE", ) api_client.delete(api_routes.recipes_slug(recipe_data.expected_slug), headers=unique_user.token) response = api_client.post( api_routes.recipes_create_url, json={"url": recipe_data.url, "include_tags": recipe_data.include_tags}, headers=unique_user.token, ) assert response.status_code == 201 assert json.loads(response.text) == recipe_data.expected_slug recipe = api_client.get(api_routes.recipes_slug(recipe_data.expected_slug), headers=unique_user.token) assert recipe.status_code == 200 recipe_dict: dict = json.loads(recipe.text) assert recipe_dict["slug"] == recipe_data.expected_slug assert len(recipe_dict["recipeInstructions"]) == recipe_data.num_steps assert len(recipe_dict["recipeIngredient"]) == recipe_data.num_ingredients if not recipe_data.include_tags: return expected_tags = recipe_data.expected_tags or set() assert len(recipe_dict["tags"]) == len(expected_tags) for tag in recipe_dict["tags"]: assert tag["name"] in expected_tags @pytest.mark.parametrize("use_json", [True, False]) def test_create_by_html_or_json( api_client: TestClient, unique_user: TestUser, monkeypatch: MonkeyPatch, use_json: bool, ): # Skip image downloader monkeypatch.setattr( RecipeDataService, "scrape_image", lambda *_: "TEST_IMAGE", ) recipe_data = recipe_test_data[0] api_client.delete(api_routes.recipes_slug(recipe_data.expected_slug), headers=unique_user.token) data = recipe_data.html_file.read_text() if use_json: soup = BeautifulSoup(data, "lxml") ld_json_data = soup.find("script", type="application/ld+json") if ld_json_data: data = json.dumps(json.loads(ld_json_data.string)) else: data = "{}" response = api_client.post( api_routes.recipes_create_html_or_json, json={"data": data, "include_tags": recipe_data.include_tags}, headers=unique_user.token, ) assert response.status_code == 201 assert json.loads(response.text) == recipe_data.expected_slug recipe = api_client.get(api_routes.recipes_slug(recipe_data.expected_slug), headers=unique_user.token) assert recipe.status_code == 200 recipe_dict: dict = json.loads(recipe.text) assert recipe_dict["slug"] == recipe_data.expected_slug assert len(recipe_dict["recipeInstructions"]) == recipe_data.num_steps assert len(recipe_dict["recipeIngredient"]) == recipe_data.num_ingredients if not recipe_data.include_tags: return expected_tags = recipe_data.expected_tags or set() assert len(recipe_dict["tags"]) == len(expected_tags) for tag in recipe_dict["tags"]: assert tag["name"] in expected_tags def test_create_recipe_from_zip(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe def test_create_recipe_from_zip_invalid_group(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=uuid4(), name=recipe_name, slug=recipe_name, ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe # the group should always be set to the current user's group assert str(fetched_recipe.group_id) == str(unique_user.group_id) def test_create_recipe_from_zip_invalid_user(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=uuid4(), group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe # invalid users should default to the current user assert str(fetched_recipe.user_id) == str(unique_user.user_id) def test_create_recipe_from_zip_existing_category(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos categories = database.categories.create_many( [{"name": random_string(), "group_id": unique_user.group_id} for _ in range(random_int(5, 10))] ) category = random.choice(categories) recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, recipe_category=[category], ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe assert fetched_recipe.recipe_category assert len(fetched_recipe.recipe_category) == 1 assert str(fetched_recipe.recipe_category[0].id) == str(category.id) def test_create_recipe_from_zip_existing_tag(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos tags = database.tags.create_many( [{"name": random_string(), "group_id": unique_user.group_id} for _ in range(random_int(5, 10))] ) tag = random.choice(tags) recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, tags=[tag], ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe assert fetched_recipe.tags assert len(fetched_recipe.tags) == 1 assert str(fetched_recipe.tags[0].id) == str(tag.id) def test_create_recipe_from_zip_existing_category_wrong_ids( api_client: TestClient, unique_user: TestUser, tempdir: str ): database = unique_user.repos categories = database.categories.create_many( [{"name": random_string(), "group_id": unique_user.group_id} for _ in range(random_int(5, 10))] ) category = random.choice(categories) invalid_category = RecipeCategory(id=uuid4(), name=category.name, slug=category.slug) recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, recipe_category=[invalid_category], ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe assert fetched_recipe.recipe_category assert len(fetched_recipe.recipe_category) == 1 assert str(fetched_recipe.recipe_category[0].id) == str(category.id) def test_create_recipe_from_zip_existing_tag_wrong_ids(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos tags = database.tags.create_many( [{"name": random_string(), "group_id": unique_user.group_id} for _ in range(random_int(5, 10))] ) tag = random.choice(tags) invalid_tag = RecipeTag(id=uuid4(), name=tag.name, slug=tag.slug) recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, tags=[invalid_tag], ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe assert fetched_recipe.tags assert len(fetched_recipe.tags) == 1 assert str(fetched_recipe.tags[0].id) == str(tag.id) def test_create_recipe_from_zip_invalid_category(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos invalid_name = random_string() invalid_category = RecipeCategory(id=uuid4(), name=invalid_name, slug=invalid_name) recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, recipe_category=[invalid_category], ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe assert fetched_recipe.recipe_category assert len(fetched_recipe.recipe_category) == 1 # a new category should be created assert fetched_recipe.recipe_category[0].name == invalid_name assert fetched_recipe.recipe_category[0].slug == invalid_name def test_create_recipe_from_zip_invalid_tag(api_client: TestClient, unique_user: TestUser, tempdir: str): database = unique_user.repos invalid_name = random_string() invalid_tag = RecipeTag(id=uuid4(), name=invalid_name, slug=invalid_name) recipe_name = random_string() recipe = RecipeSummary( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=recipe_name, slug=recipe_name, tags=[invalid_tag], ) r = api_client.post(api_routes.recipes_create_zip, files=zip_recipe(tempdir, recipe), headers=unique_user.token) assert r.status_code == 201 fetched_recipe = database.recipes.get_by_slug(unique_user.group_id, recipe.slug) assert fetched_recipe assert fetched_recipe.tags assert len(fetched_recipe.tags) == 1 # a new tag should be created assert fetched_recipe.tags[0].name == invalid_name assert fetched_recipe.tags[0].slug == invalid_name def test_read_update( api_client: TestClient, unique_user: TestUser, recipe_categories: list[RecipeCategory], ): recipe_data = recipe_test_data[0] recipe_url = api_routes.recipes_slug(recipe_data.expected_slug) response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) test_notes = [ {"title": "My Test Title1", "text": "My Test Text1"}, {"title": "My Test Title2", "text": "My Test Text2"}, ] recipe["notes"] = test_notes recipe["recipeCategory"] = [x.model_dump() for x in recipe_categories] response = api_client.put(recipe_url, json=utils.jsonify(recipe), headers=unique_user.token) assert response.status_code == 200 assert json.loads(response.text).get("slug") == recipe_data.expected_slug response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) assert recipe["notes"] == test_notes assert len(recipe["recipeCategory"]) == len(recipe_categories) test_name = [x.name for x in recipe_categories] for cats in zip(recipe["recipeCategory"], recipe_categories, strict=False): assert cats[0]["name"] in test_name def test_duplicate(api_client: TestClient, unique_user: TestUser): recipe_data = recipe_test_data[0] # Initial get of the original recipe original_recipe_url = api_routes.recipes_slug(recipe_data.expected_slug) response = api_client.get(original_recipe_url, headers=unique_user.token) assert response.status_code == 200 initial_recipe = json.loads(response.text) # Duplicate the recipe recipe_duplicate_url = api_routes.recipes_slug_duplicate(recipe_data.expected_slug) response = api_client.post( recipe_duplicate_url, headers=unique_user.token, json={ "name": "Test Duplicate", }, ) assert response.status_code == 201 duplicate_recipe = json.loads(response.text) assert duplicate_recipe["id"] != initial_recipe["id"] assert duplicate_recipe["slug"].startswith("test-duplicate") assert duplicate_recipe["name"].startswith("Test Duplicate") # Image should be copied (if it exists) assert ( duplicate_recipe["image"] is None and initial_recipe["image"] is None or duplicate_recipe["image"] != initial_recipe["image"] ) # Number of steps should be the same, but the text may have changed (link replacements) assert len(duplicate_recipe["recipeInstructions"]) == len(initial_recipe["recipeInstructions"]) # Ingredients should have the same texts, but different ids assert duplicate_recipe["recipeIngredient"] != initial_recipe["recipeIngredient"] assert [i["note"] for i in duplicate_recipe["recipeIngredient"]] == [ i["note"] for i in initial_recipe["recipeIngredient"] ] previous_categories = initial_recipe["recipeCategory"] assert duplicate_recipe["recipeCategory"] == previous_categories # Edit the duplicated recipe to make sure it doesn't affect the original dup_notes = duplicate_recipe["notes"] or [] dup_notes.append({"title": "Test", "text": "Test"}) duplicate_recipe["notes"] = dup_notes duplicate_recipe["recipeIngredient"][0]["note"] = "Different Ingredient" new_recipe_url = api_routes.recipes_slug(duplicate_recipe.get("slug")) response = api_client.put(new_recipe_url, json=duplicate_recipe, headers=unique_user.token) assert response.status_code == 200 edited_recipe = json.loads(response.text) # reload original response = api_client.get(original_recipe_url, headers=unique_user.token) assert response.status_code == 200 original_recipe = json.loads(response.text) assert edited_recipe["notes"] == dup_notes assert original_recipe.get("notes") != edited_recipe.get("notes") assert original_recipe.get("recipeCategory") == previous_categories # Make sure ingredient edits don't affect the original original_ingredients = original_recipe.get("recipeIngredient") edited_ingredients = edited_recipe.get("recipeIngredient") assert len(original_ingredients) == len(edited_ingredients) assert original_ingredients[0]["note"] != edited_ingredients[0]["note"] assert edited_ingredients[0]["note"] == "Different Ingredient" assert original_ingredients[0]["referenceId"] != edited_ingredients[1]["referenceId"] for i in range(1, len(original_ingredients)): assert original_ingredients[i]["referenceId"] != edited_ingredients[i]["referenceId"] def copy_info(ing): return {k: v for k, v in ing.items() if k != "referenceId"} assert copy_info(original_ingredients[i]) == copy_info(edited_ingredients[i]) # This needs to happen after test_duplicate, # otherwise that one will run into problems with comparing the instruction/ingredient lists def test_update_with_empty_relationship( api_client: TestClient, unique_user: TestUser, ): recipe_data = recipe_test_data[0] recipe_url = api_routes.recipes_slug(recipe_data.expected_slug) response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) recipe["recipeInstructions"] = [] recipe["recipeIngredient"] = [] response = api_client.put(recipe_url, json=utils.jsonify(recipe), headers=unique_user.token) assert response.status_code == 200 assert json.loads(response.text).get("slug") == recipe_data.expected_slug response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) assert recipe["recipeInstructions"] == [] assert recipe["recipeIngredient"] == [] def test_rename(api_client: TestClient, unique_user: TestUser): recipe_data = recipe_test_data[0] recipe_url = api_routes.recipes_slug(recipe_data.expected_slug) response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) new_name = recipe.get("name") + "-rename" new_slug = slugify(new_name) recipe["name"] = new_name response = api_client.put(recipe_url, json=recipe, headers=unique_user.token) assert response.status_code == 200 assert json.loads(response.text).get("slug") == new_slug recipe_data.expected_slug = new_slug def test_remove_notes(api_client: TestClient, unique_user: TestUser): # create recipe recipe_create_url = api_routes.recipes recipe_create_data = {"name": random_string()} response = api_client.post(recipe_create_url, headers=unique_user.token, json=recipe_create_data) assert response.status_code == 201 recipe_slug: str = response.json() # get recipe and add a note recipe_url = api_routes.recipes_slug(recipe_slug) response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) recipe["notes"] = [RecipeNote(title=random_string(), text=random_string()).model_dump()] response = api_client.put(recipe_url, json=recipe, headers=unique_user.token) assert response.status_code == 200 # get recipe and remove the note response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) assert len(recipe.get("notes", [])) == 1 recipe["notes"] = [] response = api_client.put(recipe_url, json=recipe, headers=unique_user.token) assert response.status_code == 200 # verify the note is removed response = api_client.get(recipe_url, headers=unique_user.token) assert response.status_code == 200 recipe = json.loads(response.text) assert len(recipe.get("notes", [])) == 0 def test_delete(api_client: TestClient, unique_user: TestUser): recipe_data = recipe_test_data[0] response = api_client.delete(api_routes.recipes_slug(recipe_data.expected_slug), headers=unique_user.token) assert response.status_code == 200 def test_recipe_crud_404(api_client: TestClient, unique_user: TestUser): response = api_client.put(api_routes.recipes_slug("test"), json={"test": "stest"}, headers=unique_user.token) assert response.status_code == 404 response = api_client.get(api_routes.recipes_slug("test"), headers=unique_user.token) assert response.status_code == 404 response = api_client.delete(api_routes.recipes_slug("test"), headers=unique_user.token) assert response.status_code == 404 response = api_client.patch(api_routes.recipes_slug("test"), json={"test": "stest"}, headers=unique_user.token) assert response.status_code == 404 def test_create_recipe_same_name(api_client: TestClient, unique_user: TestUser): slug = random_string(10) response = api_client.post(api_routes.recipes, json={"name": slug}, headers=unique_user.token) assert response.status_code == 201 assert json.loads(response.text) == slug response = api_client.post(api_routes.recipes, json={"name": slug}, headers=unique_user.token) assert response.status_code == 201 assert json.loads(response.text) == f"{slug}-1" def test_create_recipe_too_many_time(api_client: TestClient, unique_user: TestUser): slug = random_string(10) for _ in range(10): response = api_client.post(api_routes.recipes, json={"name": slug}, headers=unique_user.token) assert response.status_code == 201 response = api_client.post(api_routes.recipes, json={"name": slug}, headers=unique_user.token) assert response.status_code == 400 def test_delete_recipe_same_name(api_client: TestClient, unique_user: utils.TestUser, g2_user: utils.TestUser): slug = random_string(10) # Create recipe for both users for user in (unique_user, g2_user): response = api_client.post(api_routes.recipes, json={"name": slug}, headers=user.token) assert response.status_code == 201 assert json.loads(response.text) == slug # Delete recipe for user 1 response = api_client.delete(api_routes.recipes_slug(slug), headers=unique_user.token) assert response.status_code == 200 # Ensure recipe for user 2 still exists response = api_client.get(api_routes.recipes_slug(slug), headers=g2_user.token) assert response.status_code == 200 # Make sure recipe for user 1 doesn't exist response = api_client.get(api_routes.recipes_slug(slug), headers=unique_user.token) response = api_client.get(api_routes.recipes_slug(slug), headers=unique_user.token) assert response.status_code == 404 def test_get_recipe_by_slug_or_id(api_client: TestClient, unique_user: utils.TestUser): slugs = [random_string(10) for _ in range(3)] # Create recipes for slug in slugs: response = api_client.post(api_routes.recipes, json={"name": slug}, headers=unique_user.token) assert response.status_code == 201 assert json.loads(response.text) == slug # Get recipes by slug recipe_ids = [] for slug in slugs: response = api_client.get(api_routes.recipes_slug(slug), headers=unique_user.token) assert response.status_code == 200 recipe_data = response.json() assert recipe_data["slug"] == slug recipe_ids.append(recipe_data["id"]) # Get recipes by id for recipe_id, slug in zip(recipe_ids, slugs, strict=True): response = api_client.get(api_routes.recipes_slug(recipe_id), headers=unique_user.token) assert response.status_code == 200 recipe_data = response.json() assert recipe_data["slug"] == slug assert recipe_data["id"] == recipe_id @pytest.mark.parametrize("organizer_type", ["tags", "categories", "tools"]) def test_get_recipes_organizer_filter(api_client: TestClient, unique_user: utils.TestUser, organizer_type: str): database = unique_user.repos # create recipes with different organizers tags = database.tags.create_many([TagSave(name=random_string(), group_id=unique_user.group_id) for _ in range(3)]) categories = database.categories.create_many( [CategorySave(name=random_string(), group_id=unique_user.group_id) for _ in range(3)] ) tools = database.tools.create_many( [RecipeToolSave(name=random_string(), group_id=unique_user.group_id) for _ in range(3)] ) new_recipes_data: list[dict] = [] for i in range(40): name = random_string() new_recipes_data.append( Recipe( id=uuid4(), user_id=unique_user.user_id, group_id=unique_user.group_id, name=name, slug=name, tags=[random.choice(tags)] if i % 2 else [], recipe_category=[random.choice(categories)] if i % 2 else [], tools=[random.choice(tools)] if i % 2 else [], ) ) recipes = database.recipes.create_many(new_recipes_data) # type: ignore # get recipes by organizer if organizer_type == "tags": organizer = random.choice(tags) expected_recipe_ids = { str(recipe.id) for recipe in recipes if organizer.id in [tag.id for tag in recipe.tags or []] } elif organizer_type == "categories": organizer = random.choice(categories) expected_recipe_ids = { str(recipe.id) for recipe in recipes if organizer.id in [category.id for category in recipe.recipe_category or []] } elif organizer_type == "tools": organizer = random.choice(tools) expected_recipe_ids = { str(recipe.id) for recipe in recipes if organizer.id in [tool.id for tool in recipe.tools or []] } else: raise ValueError(f"Unknown organizer type: {organizer_type}") query_params = {organizer_type: str(organizer.id)} response = api_client.get(api_routes.recipes, params=query_params, headers=unique_user.token) assert response.status_code == 200 response_json = response.json() assert len(response_json["items"]) == len(expected_recipe_ids) fetched_recipes_ids = [recipe["id"] for recipe in response_json["items"]] assert set(fetched_recipes_ids) == expected_recipe_ids def test_get_random_order(api_client: TestClient, unique_user: utils.TestUser): # Create more recipes for stable random ordering slugs = [random_string(10) for _ in range(7)] for slug in slugs: response = api_client.post(api_routes.recipes, json={"name": slug}, headers=unique_user.token) assert response.status_code == 201 assert json.loads(response.text) == slug goodparams: dict[str, int | str] = {"page": 1, "perPage": -1, "orderBy": "random", "paginationSeed": "abcdefg"} response = api_client.get(api_routes.recipes, params=goodparams, headers=unique_user.token) assert response.status_code == 200 seed1_params: dict[str, int | str] = {"page": 1, "perPage": -1, "orderBy": "random", "paginationSeed": "abcdefg"} seed2_params: dict[str, int | str] = {"page": 1, "perPage": -1, "orderBy": "random", "paginationSeed": "gfedcba"} data1 = api_client.get(api_routes.recipes, params=seed1_params, headers=unique_user.token).json() data2 = api_client.get(api_routes.recipes, params=seed2_params, headers=unique_user.token).json() data1_new = api_client.get(api_routes.recipes, params=seed1_params, headers=unique_user.token).json() assert data1["items"][0]["slug"] != data2["items"][0]["slug"] # new seed -> new order assert data1["items"][0]["slug"] == data1_new["items"][0]["slug"] # same seed -> same order badparams: dict[str, int | str] = {"page": 1, "perPage": -1, "orderBy": "random"} response = api_client.get(api_routes.recipes, params=badparams, headers=unique_user.token) assert response.status_code == 422 def test_get_cookbook_recipes(api_client: TestClient, unique_user: utils.TestUser): tag = unique_user.repos.tags.create(TagSave(name=random_string(), group_id=unique_user.group_id)) cookbook_recipes = unique_user.repos.recipes.create_many( [ Recipe( user_id=unique_user.user_id, group_id=unique_user.group_id, name=random_string(), tags=[tag], ) for _ in range(3) ] ) other_recipes = unique_user.repos.recipes.create_many( [ Recipe( user_id=unique_user.user_id, group_id=unique_user.group_id, name=random_string(), ) for _ in range(3) ] ) cookbook = unique_user.repos.cookbooks.create( SaveCookBook( name=random_string(), group_id=unique_user.group_id, household_id=unique_user.household_id, tags=[tag], ) ) response = api_client.get(api_routes.recipes, params={"cookbook": cookbook.slug}, headers=unique_user.token) assert response.status_code == 200 recipes = [Recipe.model_validate(data) for data in response.json()["items"]] fetched_recipe_ids = {recipe.id for recipe in recipes} for recipe in cookbook_recipes: assert recipe.id in fetched_recipe_ids for recipe in other_recipes: assert recipe.id not in fetched_recipe_ids