diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 55b9edec..18e5faa2 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -25,58 +25,85 @@ type ImageTask = { // requests are not blocked by image downloading or blurhash calculation export const enqueueImage = async ( tx: typeof db, - url: string, + img: Omit, ): Promise => { const hasher = new Bun.CryptoHasher("sha256"); - hasher.update(url); + hasher.update(img.url); const id = hasher.digest().toString("hex"); - await tx.insert(mqueue).values({ kind: "image", message: { id, url } }); + await tx.insert(mqueue).values({ kind: "image", message: { id, ...img } }); + await tx.execute(sql`notify image`); return { id, - source: url, + source: img.url, blurhash: "", }; }; export const enqueueOptImage = async ( tx: typeof db, - url: string | null, + img: Omit, ): Promise => { - if (!url) return null; - return await enqueueImage(tx, url); + if (!img.url) return null; + return await enqueueImage(tx, img); }; export const processImages = async () => { - await db.transaction(async (tx) => { - const [item] = await tx - .select() - .from(mqueue) - .for("update", { skipLocked: true }) - .where(eq(mqueue.kind, "image")) - .orderBy(mqueue.createdAt) - .limit(1); + async function processOne() { + return await db.transaction(async (tx) => { + const [item] = await tx + .select() + .from(mqueue) + .for("update", { skipLocked: true }) + .where(eq(mqueue.kind, "image")) + .orderBy(mqueue.createdAt) + .limit(1); - const img = item.message as ImageTask; - const blurhash = await downloadImage(img.id, img.url); + if (!item) return false; - const table = schema[img.table as keyof typeof schema] as any; + const img = item.message as ImageTask; + const blurhash = await downloadImage(img.id, img.url); - await tx - .update(table) - .set({ - [img.column]: { id: img.id, source: img.url, blurhash } satisfies Image, - }) - .where(eq(sql`${table[img.column]}->'id'`, img.id)); + const table = schema[img.table as keyof typeof schema] as any; - await tx.delete(mqueue).where(eq(mqueue.id, item.id)); - }); + await tx + .update(table) + .set({ + [img.column]: { + id: img.id, + source: img.url, + blurhash, + } satisfies Image, + }) + .where(eq(sql`${table[img.column]}->'id'`, img.id)); + + await tx.delete(mqueue).where(eq(mqueue.id, item.id)); + return true; + }); + } + + let running = false; + async function processAll() { + if (running) return; + running = true; + + let found = true; + while (found) { + found = await processOne(); + } + running = false; + } const client = (await db.$client.connect()) as PoolClient; client.on("notification", (evt) => { if (evt.channel !== "image") return; + processAll(); }); + await client.query("listen image"); + + // start processing old tasks + await processAll(); }; async function downloadImage(id: string, url: string): Promise {