Use an images table to prevent dup downloads

This commit is contained in:
Zoe Roux
2025-12-21 18:12:44 +01:00
parent 11060df0d0
commit 1a1ccc9d4c
8 changed files with 2156 additions and 70 deletions
+85 -55
View File
@@ -2,13 +2,13 @@ import path from "node:path";
import { getCurrentSpan, setAttributes } from "@elysiajs/opentelemetry";
import { SpanStatusCode } from "@opentelemetry/api";
import { encode } from "blurhash";
import { and, eq, is, lt, type SQL, sql } from "drizzle-orm";
import { and, eq, is, lt, ne, type SQL, sql } from "drizzle-orm";
import { PgColumn, type PgTable } from "drizzle-orm/pg-core";
import { version } from "package.json";
import type { PoolClient } from "pg";
import sharp from "sharp";
import { db, type Transaction } from "~/db";
import { mqueue } from "~/db/schema/mqueue";
import { images } from "~/db/schema";
import { unnestValues } from "~/db/utils";
import type { Image } from "~/models/utils";
import { record } from "~/otel";
@@ -20,8 +20,7 @@ export const defaultBlurhash = "000000";
export type ImageTask = {
id: string;
url: string;
table: string;
column: string;
targets: { table: string; column: string }[];
};
// this will only push a task to the image downloader service and not download it instantly.
@@ -52,24 +51,27 @@ export const enqueueOptImage = (
),
).sql;
const message: ImageTask =
"table" in img
? {
id,
url: img.url,
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.table}`).sql,
column: cleanupColumn(img.column),
}
: {
id,
url: img.url,
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.column.table}`).sql,
column: sql.identifier(img.column.name).value,
};
const req: ImageTask = {
id,
url: img.url,
targets: [
"table" in img
? {
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.table}`).sql,
column: cleanupColumn(img.column),
}
: {
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.column.table}`).sql,
column: sql.identifier(img.column.name).value,
},
],
};
imgQueue.push(message);
const existing = imgQueue.find((x) => x.id === id);
if (existing) existing.targets.push(...req.targets);
else imgQueue.push(req);
return {
id,
@@ -80,14 +82,33 @@ export const enqueueOptImage = (
export const flushImageQueue = record(
"enqueueImages",
async (tx: Transaction, imgQueue: ImageTask[], priority: number) => {
if (!imgQueue.length) return;
await tx.insert(mqueue).select(
unnestValues(
imgQueue.map((x) => ({ kind: "image", message: x, priority })),
mqueue,
),
);
async (tx: Transaction, tasks: ImageTask[], priority: number) => {
if (!tasks.length) return;
await tx
.insert(images)
.select(
unnestValues(
tasks.map((x) => ({
id: x.id,
url: x.url,
targets: x.targets,
priority,
})),
images,
),
)
.onConflictDoUpdate({
target: [images.id],
set: {
status: sql`
case
when ${images.status} = 'pending' then 'pending'::img_status
else 'link'::img_status
end
`,
targets: sql`${images.targets} || excluded.targets`,
},
});
await tx.execute(sql`notify kyoo_image`);
},
);
@@ -113,14 +134,7 @@ export const processImages = record(
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "kyoo_image") return;
try {
processAll();
} catch (e) {
console.error(
"Failed to processs images. aborting images downloading",
e,
);
}
processAll();
});
await client.query("listen kyoo_image");
@@ -136,31 +150,43 @@ export const processImages = record(
const processOne = record("download", async () => {
return await db.transaction(async (tx) => {
const [item] = await tx
const [img] = await tx
.select()
.from(mqueue)
.from(images)
.for("update", { skipLocked: true })
.where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5)))
.orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt)
.where(and(ne(images.status, "ready"), lt(images.attempt, 5)))
.orderBy(images.priority, images.attempt, images.createdAt)
.limit(1);
if (!item) return false;
if (!img) return false;
const img = item.message as ImageTask;
setAttributes({ "item.url": img.url });
try {
const blurhash = await downloadImage(img.id, img.url);
const blurhash =
img.status === "pending"
? await downloadImage(img.id, img.url)
: img.blurhash!;
const ret: Image = { id: img.id, source: img.url, blurhash };
const table = sql.raw(img.table);
const column = sql.raw(img.column);
for (const target of img.targets) {
const table = sql.raw(target.table);
const column = sql.raw(target.column);
await tx.execute(sql`
update ${table} set ${column} = ${ret}
where ${column}->'id' = to_jsonb(${img.id}::text)
`);
await tx.execute(sql`
update ${table} set ${column} = ${ret}
where ${column}->'id' = to_jsonb(${img.id}::text)
`);
}
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
await tx
.update(images)
.set({
blurhash,
status: "ready",
targets: [],
downloadedAt: sql`now()`,
})
.where(eq(images.pk, img.pk));
} catch (err: any) {
const span = getCurrentSpan();
if (span) {
@@ -168,10 +194,14 @@ const processOne = record("download", async () => {
span.setStatus({ code: SpanStatusCode.ERROR });
}
console.error("Failed to download image", img.url, err);
await tx
.update(mqueue)
.set({ attempt: sql`${mqueue.attempt}+1` })
.where(eq(mqueue.id, item.id));
try {
await tx
.update(images)
.set({ attempt: sql`${images.attempt}+1` })
.where(eq(images.pk, img.pk));
} catch (e) {
console.error("Failed to mark download as failed", e);
}
}
return true;
});