mirror of
https://github.com/zoriya/Kyoo.git
synced 2026-05-22 15:12:28 -04:00
Batch images task insertion and add priority
This commit is contained in:
@@ -15,7 +15,7 @@ import { getFile } from "~/utils";
|
||||
export const imageDir = process.env.IMAGES_PATH ?? "/images";
|
||||
export const defaultBlurhash = "000000";
|
||||
|
||||
type ImageTask = {
|
||||
export type ImageTask = {
|
||||
id: string;
|
||||
url: string;
|
||||
table: string;
|
||||
@@ -25,12 +25,12 @@ type ImageTask = {
|
||||
// this will only push a task to the image downloader service and not download it instantly.
|
||||
// this is both done to prevent too many requests to be sent at once and to make sure POST
|
||||
// requests are not blocked by image downloading or blurhash calculation
|
||||
export const enqueueOptImage = async (
|
||||
tx: Transaction,
|
||||
export const enqueueOptImage = (
|
||||
imgQueue: ImageTask[],
|
||||
img:
|
||||
| { url: string | null; column: PgColumn }
|
||||
| { url: string | null; table: PgTable; column: SQL },
|
||||
): Promise<Image | null> => {
|
||||
): Image | null => {
|
||||
if (!img.url) return null;
|
||||
|
||||
const hasher = new Bun.CryptoHasher("sha256");
|
||||
@@ -66,11 +66,8 @@ export const enqueueOptImage = async (
|
||||
table: db.dialect.sqlToQuery(sql`${img.column.table}`).sql,
|
||||
column: sql.identifier(img.column.name).value,
|
||||
};
|
||||
await tx.insert(mqueue).values({
|
||||
kind: "image",
|
||||
message,
|
||||
});
|
||||
await tx.execute(sql`notify kyoo_image`);
|
||||
|
||||
imgQueue.push(message);
|
||||
|
||||
return {
|
||||
id,
|
||||
@@ -79,6 +76,20 @@ export const enqueueOptImage = async (
|
||||
};
|
||||
};
|
||||
|
||||
export const flushImageQueue = async (
|
||||
tx: Transaction,
|
||||
imgQueue: ImageTask[],
|
||||
priority: number,
|
||||
) => {
|
||||
if (!imgQueue.length) return;
|
||||
record("enqueue images", async () => {
|
||||
await tx
|
||||
.insert(mqueue)
|
||||
.values(imgQueue.map((x) => ({ kind: "image", message: x, priority })));
|
||||
await tx.execute(sql`notify kyoo_image`);
|
||||
});
|
||||
};
|
||||
|
||||
export const processImages = async () => {
|
||||
return record("download images", async () => {
|
||||
let running = false;
|
||||
@@ -114,7 +125,7 @@ async function processOne() {
|
||||
.from(mqueue)
|
||||
.for("update", { skipLocked: true })
|
||||
.where(and(eq(mqueue.kind, "image"), lt(mqueue.attempt, 5)))
|
||||
.orderBy(mqueue.attempt, mqueue.createdAt)
|
||||
.orderBy(mqueue.priority, mqueue.attempt, mqueue.createdAt)
|
||||
.limit(1);
|
||||
|
||||
if (!item) return false;
|
||||
|
||||
@@ -5,7 +5,7 @@ import { conflictUpdateAllExcept } from "~/db/utils";
|
||||
import type { SeedCollection } from "~/models/collections";
|
||||
import type { SeedMovie } from "~/models/movie";
|
||||
import type { SeedSerie } from "~/models/serie";
|
||||
import { enqueueOptImage } from "../images";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
|
||||
type ShowTrans = typeof showTranslations.$inferInsert;
|
||||
|
||||
@@ -19,6 +19,7 @@ export const insertCollection = async (
|
||||
const { translations, ...col } = collection;
|
||||
|
||||
return await db.transaction(async (tx) => {
|
||||
const imgQueue: ImageTask[] = [];
|
||||
const [ret] = await tx
|
||||
.insert(shows)
|
||||
.values({
|
||||
@@ -48,29 +49,30 @@ export const insertCollection = async (
|
||||
})
|
||||
.returning({ pk: shows.pk, id: shows.id, slug: shows.slug });
|
||||
|
||||
const trans: ShowTrans[] = await Promise.all(
|
||||
Object.entries(translations).map(async ([lang, tr]) => ({
|
||||
const trans: ShowTrans[] = Object.entries(translations).map(
|
||||
([lang, tr]) => ({
|
||||
pk: ret.pk,
|
||||
language: lang,
|
||||
...tr,
|
||||
poster: await enqueueOptImage(tx, {
|
||||
poster: enqueueOptImage(imgQueue, {
|
||||
url: tr.poster,
|
||||
column: showTranslations.poster,
|
||||
}),
|
||||
thumbnail: await enqueueOptImage(tx, {
|
||||
thumbnail: enqueueOptImage(imgQueue, {
|
||||
url: tr.thumbnail,
|
||||
column: showTranslations.thumbnail,
|
||||
}),
|
||||
logo: await enqueueOptImage(tx, {
|
||||
logo: enqueueOptImage(imgQueue, {
|
||||
url: tr.logo,
|
||||
column: showTranslations.logo,
|
||||
}),
|
||||
banner: await enqueueOptImage(tx, {
|
||||
banner: enqueueOptImage(imgQueue, {
|
||||
url: tr.banner,
|
||||
column: showTranslations.banner,
|
||||
}),
|
||||
})),
|
||||
}),
|
||||
);
|
||||
await flushImageQueue(tx, imgQueue, 100);
|
||||
await tx
|
||||
.insert(showTranslations)
|
||||
.values(trans)
|
||||
|
||||
@@ -8,7 +8,7 @@ import {
|
||||
} from "~/db/schema";
|
||||
import { conflictUpdateAllExcept, values } from "~/db/utils";
|
||||
import type { SeedEntry as SEntry, SeedExtra as SExtra } from "~/models/entry";
|
||||
import { enqueueOptImage } from "../images";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
import { guessNextRefresh } from "../refresh";
|
||||
import { updateAvailableCount, updateAvailableSince } from "./shows";
|
||||
|
||||
@@ -50,30 +50,29 @@ export const insertEntries = async (
|
||||
if (!items.length) return [];
|
||||
|
||||
const retEntries = await db.transaction(async (tx) => {
|
||||
const vals: EntryI[] = await Promise.all(
|
||||
items.map(async (seed) => {
|
||||
const { translations, videos, video, ...entry } = seed;
|
||||
return {
|
||||
...entry,
|
||||
showPk: show.pk,
|
||||
slug: generateSlug(show.slug, seed),
|
||||
thumbnail: await enqueueOptImage(tx, {
|
||||
url: seed.thumbnail,
|
||||
column: entries.thumbnail,
|
||||
}),
|
||||
nextRefresh:
|
||||
entry.kind !== "extra"
|
||||
? guessNextRefresh(entry.airDate ?? new Date())
|
||||
: guessNextRefresh(new Date()),
|
||||
episodeNumber:
|
||||
entry.kind === "episode"
|
||||
? entry.episodeNumber
|
||||
: entry.kind === "special"
|
||||
? entry.number
|
||||
: undefined,
|
||||
};
|
||||
}),
|
||||
);
|
||||
const imgQueue: ImageTask[] = [];
|
||||
const vals: EntryI[] = items.map((seed) => {
|
||||
const { translations, videos, video, ...entry } = seed;
|
||||
return {
|
||||
...entry,
|
||||
showPk: show.pk,
|
||||
slug: generateSlug(show.slug, seed),
|
||||
thumbnail: enqueueOptImage(imgQueue, {
|
||||
url: seed.thumbnail,
|
||||
column: entries.thumbnail,
|
||||
}),
|
||||
nextRefresh:
|
||||
entry.kind !== "extra"
|
||||
? guessNextRefresh(entry.airDate ?? new Date())
|
||||
: guessNextRefresh(new Date()),
|
||||
episodeNumber:
|
||||
entry.kind === "episode"
|
||||
? entry.episodeNumber
|
||||
: entry.kind === "special"
|
||||
? entry.number
|
||||
: undefined,
|
||||
};
|
||||
});
|
||||
const ret = await tx
|
||||
.insert(entries)
|
||||
.values(vals)
|
||||
@@ -89,41 +88,36 @@ export const insertEntries = async (
|
||||
})
|
||||
.returning({ pk: entries.pk, id: entries.id, slug: entries.slug });
|
||||
|
||||
const trans: EntryTransI[] = (
|
||||
await Promise.all(
|
||||
items.map(async (seed, i) => {
|
||||
if (seed.kind === "extra") {
|
||||
return [
|
||||
{
|
||||
pk: ret[i].pk,
|
||||
// yeah we hardcode the language to extra because if we want to support
|
||||
// translations one day it won't be awkward
|
||||
language: "extra",
|
||||
name: seed.name,
|
||||
description: null,
|
||||
poster: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
const trans: EntryTransI[] = items.flatMap((seed, i) => {
|
||||
if (seed.kind === "extra") {
|
||||
return [
|
||||
{
|
||||
pk: ret[i].pk,
|
||||
// yeah we hardcode the language to extra because if we want to support
|
||||
// translations one day it won't be awkward
|
||||
language: "extra",
|
||||
name: seed.name,
|
||||
description: null,
|
||||
poster: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
return await Promise.all(
|
||||
Object.entries(seed.translations).map(async ([lang, tr]) => ({
|
||||
// assumes ret is ordered like items.
|
||||
pk: ret[i].pk,
|
||||
language: lang,
|
||||
...tr,
|
||||
poster:
|
||||
seed.kind === "movie"
|
||||
? await enqueueOptImage(tx, {
|
||||
url: (tr as any).poster,
|
||||
column: entryTranslations.poster,
|
||||
})
|
||||
: undefined,
|
||||
})),
|
||||
);
|
||||
}),
|
||||
)
|
||||
).flat();
|
||||
return Object.entries(seed.translations).map(([lang, tr]) => ({
|
||||
// assumes ret is ordered like items.
|
||||
pk: ret[i].pk,
|
||||
language: lang,
|
||||
...tr,
|
||||
poster:
|
||||
seed.kind === "movie"
|
||||
? enqueueOptImage(imgQueue, {
|
||||
url: (tr as any).poster,
|
||||
column: entryTranslations.poster,
|
||||
})
|
||||
: undefined,
|
||||
}));
|
||||
});
|
||||
await flushImageQueue(tx, imgQueue, 0);
|
||||
await tx
|
||||
.insert(entryTranslations)
|
||||
.values(trans)
|
||||
|
||||
@@ -2,7 +2,7 @@ import { db } from "~/db";
|
||||
import { seasons, seasonTranslations } from "~/db/schema";
|
||||
import { conflictUpdateAllExcept } from "~/db/utils";
|
||||
import type { SeedSeason } from "~/models/season";
|
||||
import { enqueueOptImage } from "../images";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
import { guessNextRefresh } from "../refresh";
|
||||
|
||||
type SeasonI = typeof seasons.$inferInsert;
|
||||
@@ -15,6 +15,7 @@ export const insertSeasons = async (
|
||||
if (!items.length) return [];
|
||||
|
||||
return db.transaction(async (tx) => {
|
||||
const imgQueue: ImageTask[] = [];
|
||||
const vals: SeasonI[] = items.map((x) => {
|
||||
const { translations, ...season } = x;
|
||||
return {
|
||||
@@ -42,33 +43,27 @@ export const insertSeasons = async (
|
||||
})
|
||||
.returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug });
|
||||
|
||||
const trans: SeasonTransI[] = (
|
||||
await Promise.all(
|
||||
items.map(
|
||||
async (seed, i) =>
|
||||
await Promise.all(
|
||||
Object.entries(seed.translations).map(async ([lang, tr]) => ({
|
||||
// assumes ret is ordered like items.
|
||||
pk: ret[i].pk,
|
||||
language: lang,
|
||||
...tr,
|
||||
poster: await enqueueOptImage(tx, {
|
||||
url: tr.poster,
|
||||
column: seasonTranslations.poster,
|
||||
}),
|
||||
thumbnail: await enqueueOptImage(tx, {
|
||||
url: tr.thumbnail,
|
||||
column: seasonTranslations.thumbnail,
|
||||
}),
|
||||
banner: await enqueueOptImage(tx, {
|
||||
url: tr.banner,
|
||||
column: seasonTranslations.banner,
|
||||
}),
|
||||
})),
|
||||
),
|
||||
),
|
||||
)
|
||||
).flat();
|
||||
const trans: SeasonTransI[] = items.flatMap((seed, i) =>
|
||||
Object.entries(seed.translations).map(([lang, tr]) => ({
|
||||
// assumes ret is ordered like items.
|
||||
pk: ret[i].pk,
|
||||
language: lang,
|
||||
...tr,
|
||||
poster: enqueueOptImage(imgQueue, {
|
||||
url: tr.poster,
|
||||
column: seasonTranslations.poster,
|
||||
}),
|
||||
thumbnail: enqueueOptImage(imgQueue, {
|
||||
url: tr.thumbnail,
|
||||
column: seasonTranslations.thumbnail,
|
||||
}),
|
||||
banner: enqueueOptImage(imgQueue, {
|
||||
url: tr.banner,
|
||||
column: seasonTranslations.banner,
|
||||
}),
|
||||
})),
|
||||
);
|
||||
await flushImageQueue(tx, imgQueue, -10);
|
||||
await tx
|
||||
.insert(seasonTranslations)
|
||||
.values(trans)
|
||||
|
||||
@@ -22,7 +22,7 @@ import type { SeedMovie } from "~/models/movie";
|
||||
import type { SeedSerie } from "~/models/serie";
|
||||
import type { Original } from "~/models/utils";
|
||||
import { getYear } from "~/utils";
|
||||
import { enqueueOptImage } from "../images";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
|
||||
type Show = typeof shows.$inferInsert;
|
||||
type ShowTrans = typeof showTranslations.$inferInsert;
|
||||
@@ -41,24 +41,25 @@ export const insertShow = async (
|
||||
| SeedCollection["translations"],
|
||||
) => {
|
||||
return await db.transaction(async (tx) => {
|
||||
const imgQueue: ImageTask[] = [];
|
||||
const orig = {
|
||||
...original,
|
||||
poster: await enqueueOptImage(tx, {
|
||||
poster: enqueueOptImage(imgQueue, {
|
||||
url: original.poster,
|
||||
table: shows,
|
||||
column: sql`${shows.original}['poster']`,
|
||||
}),
|
||||
thumbnail: await enqueueOptImage(tx, {
|
||||
thumbnail: enqueueOptImage(imgQueue, {
|
||||
url: original.thumbnail,
|
||||
table: shows,
|
||||
column: sql`${shows.original}['thumbnail']`,
|
||||
}),
|
||||
banner: await enqueueOptImage(tx, {
|
||||
banner: enqueueOptImage(imgQueue, {
|
||||
url: original.banner,
|
||||
table: shows,
|
||||
column: sql`${shows.original}['banner']`,
|
||||
}),
|
||||
logo: await enqueueOptImage(tx, {
|
||||
logo: enqueueOptImage(imgQueue, {
|
||||
url: original.logo,
|
||||
table: shows,
|
||||
column: sql`${shows.original}['logo']`,
|
||||
@@ -67,30 +68,31 @@ export const insertShow = async (
|
||||
const ret = await insertBaseShow(tx, { ...show, original: orig });
|
||||
if ("status" in ret) return ret;
|
||||
|
||||
const trans: ShowTrans[] = await Promise.all(
|
||||
Object.entries(translations).map(async ([lang, tr]) => ({
|
||||
const trans: ShowTrans[] = Object.entries(translations).map(
|
||||
([lang, tr]) => ({
|
||||
pk: ret.pk,
|
||||
language: lang,
|
||||
...tr,
|
||||
latinName: tr.latinName ?? null,
|
||||
poster: await enqueueOptImage(tx, {
|
||||
poster: enqueueOptImage(imgQueue, {
|
||||
url: tr.poster,
|
||||
column: showTranslations.poster,
|
||||
}),
|
||||
thumbnail: await enqueueOptImage(tx, {
|
||||
thumbnail: enqueueOptImage(imgQueue, {
|
||||
url: tr.thumbnail,
|
||||
column: showTranslations.thumbnail,
|
||||
}),
|
||||
logo: await enqueueOptImage(tx, {
|
||||
logo: enqueueOptImage(imgQueue, {
|
||||
url: tr.logo,
|
||||
column: showTranslations.logo,
|
||||
}),
|
||||
banner: await enqueueOptImage(tx, {
|
||||
banner: enqueueOptImage(imgQueue, {
|
||||
url: tr.banner,
|
||||
column: showTranslations.banner,
|
||||
}),
|
||||
})),
|
||||
}),
|
||||
);
|
||||
await flushImageQueue(tx, imgQueue, 200);
|
||||
await tx
|
||||
.insert(showTranslations)
|
||||
.values(trans)
|
||||
|
||||
@@ -3,7 +3,7 @@ import { db } from "~/db";
|
||||
import { roles, staff } from "~/db/schema";
|
||||
import { conflictUpdateAllExcept } from "~/db/utils";
|
||||
import type { SeedStaff } from "~/models/staff";
|
||||
import { enqueueOptImage } from "../images";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
|
||||
export const insertStaff = async (
|
||||
seed: SeedStaff[] | undefined,
|
||||
@@ -12,15 +12,14 @@ export const insertStaff = async (
|
||||
if (!seed?.length) return [];
|
||||
|
||||
return await db.transaction(async (tx) => {
|
||||
const people = await Promise.all(
|
||||
seed.map(async (x) => ({
|
||||
...x.staff,
|
||||
image: await enqueueOptImage(tx, {
|
||||
url: x.staff.image,
|
||||
column: staff.image,
|
||||
}),
|
||||
})),
|
||||
);
|
||||
const imgQueue: ImageTask[] = [];
|
||||
const people = seed.map((x) => ({
|
||||
...x.staff,
|
||||
image: enqueueOptImage(imgQueue, {
|
||||
url: x.staff.image,
|
||||
column: staff.image,
|
||||
}),
|
||||
}));
|
||||
const ret = await tx
|
||||
.insert(staff)
|
||||
.values(people)
|
||||
@@ -30,22 +29,22 @@ export const insertStaff = async (
|
||||
})
|
||||
.returning({ pk: staff.pk, id: staff.id, slug: staff.slug });
|
||||
|
||||
const rval = await Promise.all(
|
||||
seed.map(async (x, i) => ({
|
||||
showPk,
|
||||
staffPk: ret[i].pk,
|
||||
kind: x.kind,
|
||||
order: i,
|
||||
character: {
|
||||
...x.character,
|
||||
image: await enqueueOptImage(tx, {
|
||||
url: x.character.image,
|
||||
table: roles,
|
||||
column: sql`${roles.character}['image']`,
|
||||
}),
|
||||
},
|
||||
})),
|
||||
);
|
||||
const rval = seed.map((x, i) => ({
|
||||
showPk,
|
||||
staffPk: ret[i].pk,
|
||||
kind: x.kind,
|
||||
order: i,
|
||||
character: {
|
||||
...x.character,
|
||||
image: enqueueOptImage(imgQueue, {
|
||||
url: x.character.image,
|
||||
table: roles,
|
||||
column: sql`${roles.character}['image']`,
|
||||
}),
|
||||
},
|
||||
}));
|
||||
|
||||
await flushImageQueue(tx, imgQueue, -200);
|
||||
|
||||
// always replace all roles. this is because:
|
||||
// - we want `order` to stay in sync (& without duplicates)
|
||||
|
||||
@@ -2,7 +2,7 @@ import { db } from "~/db";
|
||||
import { showStudioJoin, studios, studioTranslations } from "~/db/schema";
|
||||
import { conflictUpdateAllExcept } from "~/db/utils";
|
||||
import type { SeedStudio } from "~/models/studio";
|
||||
import { enqueueOptImage } from "../images";
|
||||
import { enqueueOptImage, flushImageQueue, ImageTask } from "../images";
|
||||
|
||||
type StudioI = typeof studios.$inferInsert;
|
||||
type StudioTransI = typeof studioTranslations.$inferInsert;
|
||||
@@ -33,24 +33,19 @@ export const insertStudios = async (
|
||||
})
|
||||
.returning({ pk: studios.pk, id: studios.id, slug: studios.slug });
|
||||
|
||||
const trans: StudioTransI[] = (
|
||||
await Promise.all(
|
||||
seed.map(
|
||||
async (x, i) =>
|
||||
await Promise.all(
|
||||
Object.entries(x.translations).map(async ([lang, tr]) => ({
|
||||
pk: ret[i].pk,
|
||||
language: lang,
|
||||
name: tr.name,
|
||||
logo: await enqueueOptImage(tx, {
|
||||
url: tr.logo,
|
||||
column: studioTranslations.logo,
|
||||
}),
|
||||
})),
|
||||
),
|
||||
),
|
||||
)
|
||||
).flat();
|
||||
const imgQueue: ImageTask[] = [];
|
||||
const trans: StudioTransI[] = seed.flatMap((x, i) =>
|
||||
Object.entries(x.translations).map(([lang, tr]) => ({
|
||||
pk: ret[i].pk,
|
||||
language: lang,
|
||||
name: tr.name,
|
||||
logo: enqueueOptImage(imgQueue, {
|
||||
url: tr.logo,
|
||||
column: studioTranslations.logo,
|
||||
}),
|
||||
})),
|
||||
);
|
||||
await flushImageQueue(tx, imgQueue, -100);
|
||||
await tx
|
||||
.insert(studioTranslations)
|
||||
.values(trans)
|
||||
|
||||
Reference in New Issue
Block a user