Proper api shutdown & image downlaoding multi run (#1213)

This commit is contained in:
Zoe Roux 2025-12-09 09:30:19 +01:00 committed by GitHub
parent f9af1a9947
commit 00466108a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 53 additions and 30 deletions

View File

@ -92,34 +92,47 @@ export const flushImageQueue = record(
},
);
export const processImages = record("processImages", async () => {
let running = false;
async function processAll() {
if (running) return;
running = true;
export const processImages = record(
"processImages",
async (waitToFinish = false) => {
let running = false;
async function processAll() {
if (running) return;
running = true;
let found = true;
while (found) {
// run 10 downloads at the same time,
// if one of them couldn't find an item the queue is empty.
found = !(
await Promise.all([new Array(10)].map(() => processOne()))
).includes(false);
let found = true;
while (found) {
// run 10 downloads at the same time,
const founds = await Promise.all([...new Array(10)].map(processOne));
// continue as long as there's one found (if it failed we wanna retry)
found = founds.includes(true);
}
running = false;
}
running = false;
}
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "kyoo_image") return;
processAll();
});
await client.query("listen kyoo_image");
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "kyoo_image") return;
try {
processAll();
} catch (e) {
console.error(
"Failed to processs images. aborting images downloading",
e,
);
}
});
await client.query("listen kyoo_image");
// start processing old tasks
await processAll();
return () => client.release(true);
});
if (waitToFinish) {
// start processing old tasks
await processAll();
} else {
processAll();
}
return () => client.release(true);
},
);
const processOne = record("download", async () => {
return await db.transaction(async (tx) => {
@ -144,7 +157,7 @@ const processOne = record("download", async () => {
await tx.execute(sql`
update ${table} set ${column} = ${ret}
where ${column}->'id' = ${sql.raw(`'"${img.id}"'::jsonb`)}
where ${column}->'id' = to_jsonb(${img.id}::text)
`);
await tx.delete(mqueue).where(eq(mqueue.id, item.id));
@ -154,7 +167,7 @@ const processOne = record("download", async () => {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
console.error("Failed to download image", img.url, err.message);
console.error("Failed to download image", img.url, err);
await tx
.update(mqueue)
.set({ attempt: sql`${mqueue.attempt}+1` })

View File

@ -2,12 +2,12 @@ import { swagger } from "@elysiajs/swagger";
import Elysia from "elysia";
import { handlers } from "./base";
import { processImages } from "./controllers/seed/images";
import { migrate } from "./db";
import { db, migrate } from "./db";
import { comment } from "./utils";
await migrate();
processImages();
const disposeImages = await processImages();
const app = new Elysia()
.use(
@ -87,4 +87,14 @@ const app = new Elysia()
.use(handlers)
.listen(3567);
process.on("SIGTERM", () => {
app.stop().then(async () => {
console.log("Api stopping");
disposeImages();
await db.$client.end();
console.log("Api stopped");
process.exit(0);
});
});
console.log(`Api running at ${app.server?.hostname}:${app.server?.port}`);

View File

@ -19,7 +19,7 @@ describe("images", () => {
it("Create a serie download images", async () => {
await db.delete(mqueue);
await createSerie(madeInAbyss);
const release = await processImages();
const release = await processImages(true);
// remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing)
release();
@ -48,7 +48,7 @@ describe("images", () => {
});
expectStatus(ret, body).toBe(201);
const release = await processImages();
const release = await processImages(true);
// remove notifications to prevent other images to be downloaded (do not curl 20000 images for nothing)
release();