From 3602905e869c52b89c5c6c86dae5fd3b62e7e8f5 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Sun, 23 Nov 2025 15:14:23 +0100 Subject: [PATCH] Properly handle spans of image downloading --- api/src/auth.ts | 2 +- api/src/controllers/seed/images.ts | 62 ++++++++++++++++++------------ 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/api/src/auth.ts b/api/src/auth.ts index 2a38bfa4..d7f0d874 100644 --- a/api/src/auth.ts +++ b/api/src/auth.ts @@ -73,7 +73,7 @@ export const auth = new Elysia({ name: "auth" }) .macro({ permissions(perms: string[]) { return { - beforeHandle: ({ jwt, status }) => { + beforeHandle: function permissionCheck({ jwt, status }) { for (const perm of perms) { if (!jwt!.permissions.includes(perm)) { return status(403, { diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index f80b0d53..106047b8 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -1,4 +1,6 @@ import path from "node:path"; +import { getCurrentSpan, record, setAttributes } from "@elysiajs/opentelemetry"; +import { SpanStatusCode } from "@opentelemetry/api"; import { encode } from "blurhash"; import { and, eq, is, lt, type SQL, sql } from "drizzle-orm"; import { PgColumn, type PgTable } from "drizzle-orm/pg-core"; @@ -78,7 +80,34 @@ export const enqueueOptImage = async ( }; export const processImages = async () => { - async function processOne() { + return record("download images", async () => { + let running = false; + async function processAll() { + if (running) return; + running = true; + + let found = true; + while (found) { + found = await processOne(); + } + running = false; + } + + const client = (await db.$client.connect()) as PoolClient; + client.on("notification", (evt) => { + if (evt.channel !== "kyoo_image") return; + processAll(); + }); + await client.query("listen kyoo_image"); + + // start processing old tasks + await processAll(); + return () => client.release(true); + }); +}; + +async function processOne() { + return record("download", async () => { return await db.transaction(async (tx) => { const [item] = await tx .select() @@ -91,6 +120,7 @@ export const processImages = async () => { if (!item) return false; const img = item.message as ImageTask; + setAttributes({ "item.url": img.url }); try { const blurhash = await downloadImage(img.id, img.url); const ret: Image = { id: img.id, source: img.url, blurhash }; @@ -105,6 +135,11 @@ export const processImages = async () => { await tx.delete(mqueue).where(eq(mqueue.id, item.id)); } catch (err: any) { + const span = getCurrentSpan(); + if (span) { + span.recordException(err); + span.setStatus({ code: SpanStatusCode.ERROR }); + } console.error("Failed to download image", img.url, err.message); // don't use the transaction here, it can be aborted. await db @@ -114,31 +149,8 @@ export const processImages = async () => { } return true; }); - } - - let running = false; - async function processAll() { - if (running) return; - running = true; - - let found = true; - while (found) { - found = await processOne(); - } - running = false; - } - - const client = (await db.$client.connect()) as PoolClient; - client.on("notification", (evt) => { - if (evt.channel !== "kyoo_image") return; - processAll(); }); - await client.query("listen kyoo_image"); - - // start processing old tasks - await processAll(); - return () => client.release(true); -}; +} async function downloadImage(id: string, url: string): Promise { const low = await getFile(path.join(imageDir, `${id}.low.jpg`))