diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 26d42af9..dc5f67b3 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -92,34 +92,47 @@ export const flushImageQueue = record( }, ); -export const processImages = record("processImages", async () => { - let running = false; - async function processAll() { - if (running) return; - running = true; +export const processImages = record( + "processImages", + async (waitToFinish = false) => { + let running = false; + async function processAll() { + if (running) return; + running = true; - let found = true; - while (found) { - // run 10 downloads at the same time, - // if one of them couldn't find an item the queue is empty. - found = !( - await Promise.all([new Array(10)].map(() => processOne())) - ).includes(false); + let found = true; + while (found) { + // run 10 downloads at the same time, + const founds = await Promise.all([...new Array(10)].map(processOne)); + // continue as long as there's one found (if it failed we wanna retry) + found = founds.includes(true); + } + running = false; } - running = false; - } - const client = (await db.$client.connect()) as PoolClient; - client.on("notification", (evt) => { - if (evt.channel !== "kyoo_image") return; - processAll(); - }); - await client.query("listen kyoo_image"); + const client = (await db.$client.connect()) as PoolClient; + client.on("notification", (evt) => { + if (evt.channel !== "kyoo_image") return; + try { + processAll(); + } catch (e) { + console.error( + "Failed to processs images. aborting images downloading", + e, + ); + } + }); + await client.query("listen kyoo_image"); - // start processing old tasks - await processAll(); - return () => client.release(true); -}); + if (waitToFinish) { + // start processing old tasks + await processAll(); + } else { + processAll(); + } + return () => client.release(true); + }, +); const processOne = record("download", async () => { return await db.transaction(async (tx) => { @@ -144,7 +157,7 @@ const processOne = record("download", async () => { await tx.execute(sql` update ${table} set ${column} = ${ret} - where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)} + where ${column}->'id' = to_jsonb(${img.id}::text) `); await tx.delete(mqueue).where(eq(mqueue.id, item.id)); @@ -154,7 +167,7 @@ const processOne = record("download", async () => { span.recordException(err); span.setStatus({ code: SpanStatusCode.ERROR }); } - console.error("Failed to download image", img.url, err.message); + console.error("Failed to download image", img.url, err); await tx .update(mqueue) .set({ attempt: sql`${mqueue.attempt}+1` }) diff --git a/api/src/index.ts b/api/src/index.ts index aaa8ff11..89f53da2 100644 --- a/api/src/index.ts +++ b/api/src/index.ts @@ -2,12 +2,12 @@ import { swagger } from "@elysiajs/swagger"; import Elysia from "elysia"; import { handlers } from "./base"; import { processImages } from "./controllers/seed/images"; -import { migrate } from "./db"; +import { db, migrate } from "./db"; import { comment } from "./utils"; await migrate(); -processImages(); +const disposeImages = await processImages(); const app = new Elysia() .use( @@ -87,4 +87,14 @@ const app = new Elysia() .use(handlers) .listen(3567); +process.on("SIGTERM", () => { + app.stop().then(async () => { + console.log("Api stopping"); + disposeImages(); + await db.$client.end(); + console.log("Api stopped"); + process.exit(0); + }); +}); + console.log(`Api running at ${app.server?.hostname}:${app.server?.port}`); diff --git a/api/tests/misc/images.test.ts b/api/tests/misc/images.test.ts index e3d73c68..6623a0a7 100644 --- a/api/tests/misc/images.test.ts +++ b/api/tests/misc/images.test.ts @@ -19,7 +19,7 @@ describe("images", () => { it("Create a serie download images", async () => { await db.delete(mqueue); await createSerie(madeInAbyss); - const release = await processImages(); + const release = await processImages(true); // remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing) release(); @@ -48,7 +48,7 @@ describe("images", () => { }); expectStatus(ret, body).toBe(201); - const release = await processImages(); + const release = await processImages(true); // remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing) release();