fix: Fix file not found error with individual recipe export/download. (#3579)

This commit is contained in:
nephlm 2024-05-20 18:53:14 -04:00 committed by GitHub
parent c610ec1344
commit c70a5cb72c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 115 additions and 80 deletions

View File

@ -1,12 +1,13 @@
import shutil
import tempfile
from collections.abc import AsyncGenerator, Callable, Generator
from collections.abc import Callable, Generator
from contextlib import contextmanager
from pathlib import Path
from shutil import rmtree
from uuid import uuid4
import fastapi
import jwt
from fastapi import BackgroundTasks, Depends, HTTPException, Request, status
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jwt.exceptions import PyJWTError
from sqlalchemy.orm.session import Session
@ -205,24 +206,26 @@ def validate_recipe_token(token: str | None = None) -> str:
return slug
async def temporary_zip_path() -> AsyncGenerator[Path, None]:
@contextmanager
def get_temporary_zip_path(auto_unlink=True) -> Generator[Path, None, None]:
app_dirs.TEMP_DIR.mkdir(exist_ok=True, parents=True)
temp_path = app_dirs.TEMP_DIR.joinpath("my_zip_archive.zip")
try:
yield temp_path
finally:
temp_path.unlink(missing_ok=True)
if auto_unlink:
temp_path.unlink(missing_ok=True)
async def temporary_dir(background_tasks: BackgroundTasks) -> AsyncGenerator[Path, None]:
@contextmanager
def get_temporary_path(auto_unlink=True) -> Generator[Path, None, None]:
temp_path = app_dirs.TEMP_DIR.joinpath(uuid4().hex)
temp_path.mkdir(exist_ok=True, parents=True)
try:
yield temp_path
finally:
background_tasks.add_task(shutil.rmtree, temp_path)
if auto_unlink:
rmtree(temp_path)
def temporary_file(ext: str = "") -> Callable[[], Generator[tempfile._TemporaryFileWrapper, None, None]]:

View File

@ -1,10 +1,9 @@
import shutil
from pathlib import Path
from fastapi import Depends, File, Form
from fastapi import File, Form
from fastapi.datastructures import UploadFile
from mealie.core.dependencies import temporary_zip_path
from mealie.core.dependencies import get_temporary_zip_path
from mealie.routes._base import BaseUserController, controller
from mealie.routes._base.routers import UserAPIRouter
from mealie.schema.group.group_migration import SupportedMigrations
@ -32,38 +31,39 @@ class GroupMigrationController(BaseUserController):
add_migration_tag: bool = Form(False),
migration_type: SupportedMigrations = Form(...),
archive: UploadFile = File(...),
temp_path: Path = Depends(temporary_zip_path),
):
# Save archive to temp_path
with temp_path.open("wb") as buffer:
shutil.copyfileobj(archive.file, buffer)
with get_temporary_zip_path() as temp_path:
# Save archive to temp_path
with temp_path.open("wb") as buffer:
shutil.copyfileobj(archive.file, buffer)
args = {
"archive": temp_path,
"db": self.repos,
"session": self.session,
"user_id": self.user.id,
"group_id": self.group_id,
"add_migration_tag": add_migration_tag,
"translator": self.translator,
}
args = {
"archive": temp_path,
"db": self.repos,
"session": self.session,
"user_id": self.user.id,
"group_id": self.group_id,
"add_migration_tag": add_migration_tag,
"translator": self.translator,
}
table: dict[SupportedMigrations, type[BaseMigrator]] = {
SupportedMigrations.chowdown: ChowdownMigrator,
SupportedMigrations.copymethat: CopyMeThatMigrator,
SupportedMigrations.mealie_alpha: MealieAlphaMigrator,
SupportedMigrations.nextcloud: NextcloudMigrator,
SupportedMigrations.paprika: PaprikaMigrator,
SupportedMigrations.tandoor: TandoorMigrator,
SupportedMigrations.plantoeat: PlanToEatMigrator,
SupportedMigrations.myrecipebox: MyRecipeBoxMigrator,
}
table: dict[SupportedMigrations, type[BaseMigrator]] = {
SupportedMigrations.chowdown: ChowdownMigrator,
SupportedMigrations.copymethat: CopyMeThatMigrator,
SupportedMigrations.mealie_alpha: MealieAlphaMigrator,
SupportedMigrations.nextcloud: NextcloudMigrator,
SupportedMigrations.paprika: PaprikaMigrator,
SupportedMigrations.tandoor: TandoorMigrator,
SupportedMigrations.plantoeat: PlanToEatMigrator,
SupportedMigrations.myrecipebox: MyRecipeBoxMigrator,
}
constructor = table.get(migration_type, None)
constructor = table.get(migration_type, None)
if constructor is None:
raise ValueError(f"Unsupported migration type: {migration_type}")
if constructor is None:
raise ValueError(f"Unsupported migration type: {migration_type}")
migrator = constructor(**args)
migrator = constructor(**args)
return migrator.migrate(f"{migration_type.value.title()} Migration")
migration_result = migrator.migrate(f"{migration_type.value.title()} Migration")
return migration_result

View File

@ -1,9 +1,9 @@
from functools import cached_property
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, HTTPException
from mealie.core.dependencies.dependencies import temporary_zip_path
from mealie.core.dependencies.dependencies import get_temporary_zip_path
from mealie.core.security import create_file_token
from mealie.routes._base import BaseUserController, controller
from mealie.schema.group.group_exports import GroupDataExport
@ -44,8 +44,9 @@ class RecipeBulkActionsController(BaseUserController):
self.service.delete_recipes(delete_recipes.recipes)
@router.post("/export", status_code=202)
def bulk_export_recipes(self, export_recipes: ExportRecipes, temp_path=Depends(temporary_zip_path)):
self.service.export_recipes(temp_path, export_recipes.recipes)
def bulk_export_recipes(self, export_recipes: ExportRecipes):
with get_temporary_zip_path() as temp_path:
self.service.export_recipes(temp_path, export_recipes.recipes)
@router.get("/export/download")
def get_exported_data_token(self, path: Path):

View File

@ -1,5 +1,5 @@
from functools import cached_property
from shutil import copyfileobj
from shutil import copyfileobj, rmtree
from uuid import UUID
from zipfile import ZipFile
@ -10,11 +10,11 @@ from fastapi.datastructures import UploadFile
from fastapi.responses import JSONResponse
from pydantic import UUID4, BaseModel, Field
from slugify import slugify
from starlette.background import BackgroundTask
from starlette.responses import FileResponse
from mealie.core import exceptions
from mealie.core.dependencies import temporary_zip_path
from mealie.core.dependencies.dependencies import temporary_dir, validate_recipe_token
from mealie.core.dependencies import get_temporary_path, get_temporary_zip_path, validate_recipe_token
from mealie.core.security import create_recipe_slug_token
from mealie.db.models.group.cookbook import CookBook
from mealie.pkgs import cache
@ -103,7 +103,7 @@ class RecipeExportController(BaseRecipeController):
return RecipeZipTokenResponse(token=create_recipe_slug_token(slug))
@router_exports.get("/{slug}/exports", response_class=FileResponse)
def get_recipe_as_format(self, slug: str, template_name: str, temp_dir=Depends(temporary_dir)):
def get_recipe_as_format(self, slug: str, template_name: str):
"""
## Parameters
`template_name`: The name of the template to use to use in the exports listed. Template type will automatically
@ -111,27 +111,31 @@ class RecipeExportController(BaseRecipeController):
names and formats in the /api/recipes/exports endpoint.
"""
recipe = self.mixins.get_one(slug)
file = self.service.render_template(recipe, temp_dir, template_name)
return FileResponse(file)
with get_temporary_path(auto_unlink=False) as temp_path:
recipe = self.mixins.get_one(slug)
file = self.service.render_template(recipe, temp_path, template_name)
return FileResponse(file, background=BackgroundTask(rmtree, temp_path))
@router_exports.get("/{slug}/exports/zip")
def get_recipe_as_zip(self, slug: str, token: str, temp_path=Depends(temporary_zip_path)):
"""Get a Recipe and It's Original Image as a Zip File"""
slug = validate_recipe_token(token)
def get_recipe_as_zip(self, slug: str, token: str):
"""Get a Recipe and Its Original Image as a Zip File"""
with get_temporary_zip_path(auto_unlink=False) as temp_path:
validated_slug = validate_recipe_token(token)
if slug != slug:
raise HTTPException(status_code=400, detail="Invalid Slug")
if validated_slug != slug:
raise HTTPException(status_code=400, detail="Invalid Slug")
recipe: Recipe = self.mixins.get_one(slug)
image_asset = recipe.image_dir.joinpath(RecipeImageTypes.original.value)
with ZipFile(temp_path, "w") as myzip:
myzip.writestr(f"{slug}.json", recipe.model_dump_json())
recipe: Recipe = self.mixins.get_one(validated_slug)
image_asset = recipe.image_dir.joinpath(RecipeImageTypes.original.value)
with ZipFile(temp_path, "w") as myzip:
myzip.writestr(f"{slug}.json", recipe.model_dump_json())
if image_asset.is_file():
myzip.write(image_asset, arcname=image_asset.name)
if image_asset.is_file():
myzip.write(image_asset, arcname=image_asset.name)
return FileResponse(temp_path, filename=f"{slug}.zip")
return FileResponse(
temp_path, filename=f"{recipe.slug}.zip", background=BackgroundTask(temp_path.unlink, missing_ok=True)
)
router = UserAPIRouter(prefix="/recipes", tags=["Recipe: CRUD"], route_class=MealieCrudRoute)
@ -219,13 +223,14 @@ class RecipeController(BaseRecipeController):
return "recipe_scrapers was unable to scrape this URL"
@router.post("/create-from-zip", status_code=201)
def create_recipe_from_zip(self, temp_path=Depends(temporary_zip_path), archive: UploadFile = File(...)):
def create_recipe_from_zip(self, archive: UploadFile = File(...)):
"""Create recipe from archive"""
recipe = self.service.create_from_zip(archive, temp_path)
self.publish_event(
event_type=EventTypes.recipe_created,
document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=recipe.slug),
)
with get_temporary_zip_path() as temp_path:
recipe = self.service.create_from_zip(archive, temp_path)
self.publish_event(
event_type=EventTypes.recipe_created,
document_data=EventRecipeData(operation=EventOperation.create, recipe_slug=recipe.slug),
)
return recipe.slug

View File

@ -1,10 +1,9 @@
import shutil
from pathlib import Path
from fastapi import Depends, File, HTTPException, UploadFile, status
from fastapi import File, HTTPException, UploadFile, status
from pydantic import UUID4
from mealie.core.dependencies.dependencies import temporary_dir
from mealie.core.dependencies import get_temporary_path
from mealie.pkgs import cache, img
from mealie.routes._base import BaseUserController, controller
from mealie.routes._base.routers import UserAPIRouter
@ -21,19 +20,19 @@ class UserImageController(BaseUserController):
self,
id: UUID4,
profile: UploadFile = File(...),
temp_dir: Path = Depends(temporary_dir),
):
"""Updates a User Image"""
assert_user_change_allowed(id, self.user)
temp_img = temp_dir.joinpath(profile.filename)
with get_temporary_path() as temp_path:
assert_user_change_allowed(id, self.user)
temp_img = temp_path.joinpath(profile.filename)
with temp_img.open("wb") as buffer:
shutil.copyfileobj(profile.file, buffer)
with temp_img.open("wb") as buffer:
shutil.copyfileobj(profile.file, buffer)
image = img.PillowMinifier.to_webp(temp_img)
dest = PrivateUser.get_directory(id) / "profile.webp"
image = img.PillowMinifier.to_webp(temp_img)
dest = PrivateUser.get_directory(id) / "profile.webp"
shutil.copyfile(image, dest)
shutil.copyfile(image, dest)
self.repos.users.patch(id, {"cache_key": cache.new_key()})

View File

@ -1,3 +1,7 @@
from io import BytesIO
import json
import zipfile
from fastapi.testclient import TestClient
from tests.utils import api_routes
@ -36,6 +40,29 @@ def test_render_jinja_template(api_client: TestClient, unique_user: TestUser) ->
assert f"# {recipe_name}" in response.text
def test_get_recipe_as_zip(api_client: TestClient, unique_user: TestUser) -> None:
# Create Recipe
recipe_name = random_string()
response = api_client.post(api_routes.recipes, json={"name": recipe_name}, headers=unique_user.token)
assert response.status_code == 201
slug = response.json()
# Get zip token
response = api_client.post(api_routes.recipes_slug_exports(slug), headers=unique_user.token)
assert response.status_code == 200
token = response.json()["token"]
assert token
response = api_client.get(api_routes.recipes_slug_exports_zip(slug) + f"?token={token}", headers=unique_user.token)
assert response.status_code == 200
# Verify the zip
zip_file = BytesIO(response.content)
with zipfile.ZipFile(zip_file, "r") as zip_fp:
with zip_fp.open(f"{slug}.json") as json_fp:
assert json.loads(json_fp.read())["name"] == recipe_name
# TODO: Allow users to upload templates to their own directory
# def test_upload_template(api_client: TestClient, unique_user: TestUser) -> None:
# assert False