Create task runner to download images

This commit is contained in:
Zoe Roux 2025-03-16 14:51:58 +01:00
parent 67511a3aa8
commit 71b8cbca4a
No known key found for this signature in database

View File

@ -25,58 +25,85 @@ type ImageTask = {
// requests are not blocked by image downloading or blurhash calculation
export const enqueueImage = async (
tx: typeof db,
url: string,
img: Omit<ImageTask, "id">,
): Promise<Image> => {
const hasher = new Bun.CryptoHasher("sha256");
hasher.update(url);
hasher.update(img.url);
const id = hasher.digest().toString("hex");
await tx.insert(mqueue).values({ kind: "image", message: { id, url } });
await tx.insert(mqueue).values({ kind: "image", message: { id, ...img } });
await tx.execute(sql`notify image`);
return {
id,
source: url,
source: img.url,
blurhash: "",
};
};
export const enqueueOptImage = async (
tx: typeof db,
url: string | null,
img: Omit<ImageTask, "id">,
): Promise<Image | null> => {
if (!url) return null;
return await enqueueImage(tx, url);
if (!img.url) return null;
return await enqueueImage(tx, img);
};
export const processImages = async () => {
await db.transaction(async (tx) => {
const [item] = await tx
.select()
.from(mqueue)
.for("update", { skipLocked: true })
.where(eq(mqueue.kind, "image"))
.orderBy(mqueue.createdAt)
.limit(1);
async function processOne() {
return await db.transaction(async (tx) => {
const [item] = await tx
.select()
.from(mqueue)
.for("update", { skipLocked: true })
.where(eq(mqueue.kind, "image"))
.orderBy(mqueue.createdAt)
.limit(1);
const img = item.message as ImageTask;
const blurhash = await downloadImage(img.id, img.url);
if (!item) return false;
const table = schema[img.table as keyof typeof schema] as any;
const img = item.message as ImageTask;
const blurhash = await downloadImage(img.id, img.url);
await tx
.update(table)
.set({
[img.column]: { id: img.id, source: img.url, blurhash } satisfies Image,
})
.where(eq(sql`${table[img.column]}->'id'`, img.id));
const table = schema[img.table as keyof typeof schema] as any;
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
});
await tx
.update(table)
.set({
[img.column]: {
id: img.id,
source: img.url,
blurhash,
} satisfies Image,
})
.where(eq(sql`${table[img.column]}->'id'`, img.id));
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
return true;
});
}
let running = false;
async function processAll() {
if (running) return;
running = true;
let found = true;
while (found) {
found = await processOne();
}
running = false;
}
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "image") return;
processAll();
});
await client.query("listen image");
// start processing old tasks
await processAll();
};
async function downloadImage(id: string, url: string): Promise<string> {