Use an images table to prevent duplicate downloads (#1235)

This commit is contained in:
Zoe Roux 2025-12-21 18:30:14 +01:00 committed by GitHub
commit bec3cfb865
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 2163 additions and 72 deletions

View File

@ -0,0 +1,16 @@
-- Adds the "kyoo"."images" table, which tracks every image to download and
-- the rows that reference it, so the same image id is only stored once.
--
-- Lifecycle states of an image row, as used by the image worker:
--   'pending' : the image still has to be downloaded.
--   'link'    : the image was already processed and has been re-enqueued with
--               new targets; the stored blurhash is reused instead of downloading again.
--   'ready'   : processed; the worker does not select these rows.
CREATE TYPE "kyoo"."img_status" AS ENUM('pending', 'link', 'ready');--> statement-breakpoint
CREATE TABLE "kyoo"."images" (
-- Surrogate primary key backed by an identity sequence (int4 range).
"pk" integer PRIMARY KEY GENERATED ALWAYS AS IDENTITY (sequence name "kyoo"."images_pk_seq" INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1),
-- Image identifier; unique (see constraint below) so enqueueing can upsert on it.
"id" varchar(256) NOT NULL,
-- Source url the image is downloaded from.
"url" text NOT NULL,
-- Nullable: only filled in once the image has been processed.
"blurhash" varchar(256),
-- JSON array of { table, column } entries naming the columns that reference this image.
"targets" jsonb NOT NULL,
-- priority, attempt and created_at drive the order in which the worker picks rows;
-- the worker also ignores rows whose attempt count reached 5.
"priority" integer DEFAULT 0 NOT NULL,
"attempt" integer DEFAULT 0 NOT NULL,
"status" "kyoo"."img_status" DEFAULT 'pending' NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
-- Nullable: set when the row is marked 'ready'.
"downloaded_at" timestamp with time zone,
CONSTRAINT "images_id_unique" UNIQUE("id")
);
--> statement-breakpoint
-- Matches the worker's ORDER BY (priority, attempt, created_at).
CREATE INDEX "imgqueue_sort" ON "kyoo"."images" USING btree ("priority","attempt","created_at");

File diff suppressed because it is too large Load Diff

View File

@ -176,6 +176,13 @@
"when": 1763932730557,
"tag": "0024_fix-season-count",
"breakpoints": true
},
{
"idx": 25,
"version": "7",
"when": 1766335080472,
"tag": "0025_images",
"breakpoints": true
}
]
}

View File

@ -2,13 +2,13 @@ import path from "node:path";
import { getCurrentSpan, setAttributes } from "@elysiajs/opentelemetry";
import { SpanStatusCode } from "@opentelemetry/api";
import { encode } from "blurhash";
import { and, eq, is, lt, type SQL, sql } from "drizzle-orm";
import { and, eq, is, lt, ne, type SQL, sql } from "drizzle-orm";
import { PgColumn, type PgTable } from "drizzle-orm/pg-core";
import { version } from "package.json";
import type { PoolClient } from "pg";
import sharp from "sharp";
import { db, type Transaction } from "~/db";
import { mqueue } from "~/db/schema/mqueue";
import { images } from "~/db/schema";
import { unnestValues } from "~/db/utils";
import type { Image } from "~/models/utils";
import { record } from "~/otel";
@ -20,8 +20,7 @@ export const defaultBlurhash = "000000";
export type ImageTask = {
id: string;
url: string;
table: string;
column: string;
targets: { table: string; column: string }[];
};
// this will only push a task to the image downloader service and not download it instantly.
@ -52,24 +51,27 @@ export const enqueueOptImage = (
),
).sql;
const message: ImageTask =
"table" in img
? {
id,
url: img.url,
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.table}`).sql,
column: cleanupColumn(img.column),
}
: {
id,
url: img.url,
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.column.table}`).sql,
column: sql.identifier(img.column.name).value,
};
const req: ImageTask = {
id,
url: img.url,
targets: [
"table" in img
? {
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.table}`).sql,
column: cleanupColumn(img.column),
}
: {
// @ts-expect-error dialect is private
table: db.dialect.sqlToQuery(sql`${img.column.table}`).sql,
column: sql.identifier(img.column.name).value,
},
],
};
imgQueue.push(message);
const existing = imgQueue.find((x) => x.id === id);
if (existing) existing.targets.push(...req.targets);
else imgQueue.push(req);
return {
id,
@ -80,14 +82,33 @@ export const enqueueOptImage = (
export const flushImageQueue = record(
"enqueueImages",
async (tx: Transaction, imgQueue: ImageTask[], priority: number) => {
if (!imgQueue.length) return;
await tx.insert(mqueue).select(
unnestValues(
imgQueue.map((x) => ({ kind: "image", message: x, priority })),
mqueue,
),
);
async (tx: Transaction, tasks: ImageTask[], priority: number) => {
if (!tasks.length) return;
await tx
.insert(images)
.select(
unnestValues(
tasks.map((x) => ({
id: x.id,
url: x.url,
targets: x.targets,
priority,
})),
images,
),
)
.onConflictDoUpdate({
target: [images.id],
set: {
status: sql`
case
when ${images.status} = 'pending' then 'pending'::img_status
else 'link'::img_status
end
`,
targets: sql`${images.targets} || excluded.targets`,
},
});
await tx.execute(sql`notify kyoo_image`);
},
);
@ -113,14 +134,7 @@ export const processImages = record(
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "kyoo_image") return;
try {
processAll();
} catch (e) {
console.error(
"Failed to processs images. aborting images downloading",
e,
);
}
processAll();
});
await client.query("listen kyoo_image");
@ -136,31 +150,43 @@ export const processImages = record(
const processOne = record("download", async () => {
return await db.transaction(async (tx) => {
const [item] = await tx
const [img] = await tx
.select()
.from(mqueue)
.from(images)
.for("update", { skipLocked: true })
.where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5)))
.orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt)
.where(and(ne(images.status, "ready"), lt(images.attempt, 5)))
.orderBy(images.priority, images.attempt, images.createdAt)
.limit(1);
if (!item) return false;
if (!img) return false;
const img = item.message as ImageTask;
setAttributes({ "item.url": img.url });
try {
const blurhash = await downloadImage(img.id, img.url);
const blurhash =
img.status === "pending"
? await downloadImage(img.id, img.url)
: img.blurhash!;
const ret: Image = { id: img.id, source: img.url, blurhash };
const table = sql.raw(img.table);
const column = sql.raw(img.column);
for (const target of img.targets) {
const table = sql.raw(target.table);
const column = sql.raw(target.column);
await tx.execute(sql`
update ${table} set ${column} = ${ret}
where ${column}->'id' = to_jsonb(${img.id}::text)
`);
await tx.execute(sql`
update ${table} set ${column} = ${ret}
where ${column}->'id' = to_jsonb(${img.id}::text)
`);
}
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
await tx
.update(images)
.set({
blurhash,
status: "ready",
targets: [],
downloadedAt: sql`now()`,
})
.where(eq(images.pk, img.pk));
} catch (err: any) {
const span = getCurrentSpan();
if (span) {
@ -168,10 +194,14 @@ const processOne = record("download", async () => {
span.setStatus({ code: SpanStatusCode.ERROR });
}
console.error("Failed to download image", img.url, err);
await tx
.update(mqueue)
.set({ attempt: sql`${mqueue.attempt}+1` })
.where(eq(mqueue.id, item.id));
try {
await tx
.update(images)
.set({ attempt: sql`${images.attempt}+1` })
.where(eq(images.pk, img.pk));
} catch (e) {
console.error("Failed to mark download as failed", e);
}
}
return true;
});

View File

@ -0,0 +1,28 @@
import { sql } from "drizzle-orm";
import { index, integer, jsonb, text, varchar } from "drizzle-orm/pg-core";
import { schema, timestamp } from "./utils";
/**
 * Postgres enum `img_status` describing where an image row is in its lifecycle.
 *
 * - `pending`: the image still has to be downloaded.
 * - `link`: the image was already processed and only needs to be applied to
 *   newly registered targets (the stored blurhash is reused).
 * - `ready`: processed; the image worker skips these rows.
 *
 * The values must stay identical to the ones declared in the SQL migration.
 */
export const imgStatus = schema.enum("img_status", [
"pending",
"link",
"ready",
]);
/**
 * Table tracking every image to download, keyed by a unique image `id`, along
 * with the list of columns (`targets`) that reference it.
 *
 * Column order and defaults mirror the generated SQL migration; change them
 * only together with a new migration.
 */
export const images = schema.table(
"images",
{
// Surrogate primary key (identity column).
pk: integer().primaryKey().generatedAlwaysAsIdentity(),
// Image identifier; unique so enqueueing can upsert on it.
id: varchar({ length: 256 }).notNull().unique(),
// Source url the image is downloaded from.
url: text().notNull(),
// Nullable: only filled in once the image has been processed.
blurhash: varchar({ length: 256 }),
// Columns referencing this image, stored as a jsonb array of { table, column }.
targets: jsonb().$type<{ table: string; column: string }[]>().notNull(),
// priority, attempt and createdAt define the processing order (see index below).
priority: integer().notNull().default(0),
// Number of failed processing attempts so far.
attempt: integer().notNull().default(0),
status: imgStatus().notNull().default("pending"),
createdAt: timestamp({ withTimezone: true, mode: "iso" })
.notNull()
.default(sql`now()`),
// Nullable: set when the row is marked as ready.
downloadedAt: timestamp({ withTimezone: true, mode: "iso" }),
},
// Index matching the worker's ORDER BY (priority, attempt, created_at).
(t) => [index("imgqueue_sort").on(t.priority, t.attempt, t.createdAt)],
);

View File

@ -1,5 +1,6 @@
export * from "./entries";
export * from "./history";
export * from "./images";
export * from "./mqueue";
export * from "./profiles";
export * from "./seasons";

View File

@ -79,15 +79,15 @@ export function sqlarr(array: unknown[]): string {
return str.replaceAll("\\", "\\\\").replaceAll('"', '\\"');
}
// we treat arrays as objects to have them as jsonb arrays instead of pg arrays.
// nested arrays don't work well with unnest anyway.
return `{${array
.map((item) =>
item === "null" || item === null || item === undefined
? "null"
: Array.isArray(item)
? sqlarr(item)
: typeof item === "object"
? `"${escapeStr(JSON.stringify(item))}"`
: `"${escapeStr(item.toString())}"`,
: typeof item === "object"
? `"${escapeStr(JSON.stringify(item))}"`
: `"${escapeStr(item.toString())}"`,
)
.join(", ")}}`;
}

View File

@ -1,10 +1,10 @@
import { beforeAll, describe, expect, it } from "bun:test";
import { and, eq, sql } from "drizzle-orm";
import { eq } from "drizzle-orm";
import { createMovie, createSerie } from "tests/helpers";
import { expectStatus } from "tests/utils";
import { defaultBlurhash, processImages } from "~/controllers/seed/images";
import { db } from "~/db";
import { mqueue, shows, staff, studios, videos } from "~/db/schema";
import { images, shows, staff, studios, videos } from "~/db/schema";
import { dune, madeInAbyss } from "~/models/examples";
describe("images", () => {
@ -13,11 +13,10 @@ describe("images", () => {
await db.delete(studios);
await db.delete(staff);
await db.delete(videos);
await db.delete(mqueue);
await db.delete(images);
});
it("Create a serie download images", async () => {
await db.delete(mqueue);
await createSerie(madeInAbyss);
const release = await processImages(true);
// remove notifications to prevent other images from being downloaded (do not curl 20000 images for nothing)
@ -32,7 +31,6 @@ describe("images", () => {
});
it("Download 404 image", async () => {
await db.delete(mqueue);
const url404 = "https://mockhttp.org/status/404";
const [ret, body] = await createMovie({
...dune,
@ -52,11 +50,8 @@ describe("images", () => {
// remove notifications to prevent other images from being downloaded (do not curl 20000 images for nothing)
release();
const failed = await db.query.mqueue.findFirst({
where: and(
eq(mqueue.kind, "image"),
eq(sql`${mqueue.message}->>'url'`, url404),
),
const failed = await db.query.images.findFirst({
where: eq(images.url, url404),
});
expect(failed!.attempt).toBe(5);
});

View File

@ -21,8 +21,13 @@ export const useWebsockets = ({
});
useEffect(() => {
console.log(ret.readyState);
}, [ret.readyState]);
console.log(
"websocket connected to:",
`${apiUrl}/api/ws`,
"status:",
ret.readyState,
);
}, [apiUrl, ret.readyState]);
return ret;
};