Rework history to prevent duplicates in the last day

This commit is contained in:
Zoe Roux 2025-12-14 21:43:02 +01:00
parent 86f4ce2bd8
commit fd29c6f682
No known key found for this signature in database
4 changed files with 264 additions and 199 deletions

View File

@ -1,11 +1,28 @@
import { and, count, eq, exists, gt, isNotNull, ne, sql } from "drizzle-orm";
import {
and,
count,
eq,
exists,
gt,
isNotNull,
lte,
ne,
sql,
} from "drizzle-orm";
import { alias } from "drizzle-orm/pg-core";
import Elysia, { t } from "elysia";
import { auth, getUserInfo } from "~/auth";
import { db } from "~/db";
import { entries, history, profiles, shows, videos } from "~/db/schema";
import { db, type Transaction } from "~/db";
import {
entries,
entryVideoJoin,
history,
profiles,
shows,
videos,
} from "~/db/schema";
import { watchlist } from "~/db/schema/watchlist";
import { coalesce, values } from "~/db/utils";
import { coalesce } from "~/db/utils";
import { Entry } from "~/models/entry";
import { KError } from "~/models/error";
import { SeedHistory } from "~/models/history";
@ -13,12 +30,12 @@ import {
AcceptLanguage,
createPage,
Filter,
isUuid,
Page,
processLanguages,
} from "~/models/utils";
import { desc } from "~/models/utils/descriptions";
import type { WatchlistStatus } from "~/models/watchlist";
import { traverse } from "~/utils";
import {
entryFilters,
entryProgressQ,
@ -27,6 +44,220 @@ import {
} from "../entries";
import { getOrCreateProfile } from "./profile";
export async function updateHistory(
dbTx: Transaction,
userPk: number,
progress: SeedHistory[],
) {
return dbTx.transaction(async (tx) => {
const existing = (
await tx
.select({ videoId: videos.id })
.from(history)
.for("update")
.leftJoin(videos, eq(videos.pk, history.videoPk))
.where(
and(
eq(history.profilePk, userPk),
lte(history.playedDate, sql`interval '1 day'`),
),
)
).map((x) => x.videoId);
const toUpdate = traverse(
progress.filter((x) => existing.includes(x.videoId)),
);
const newEntries = traverse(
progress.filter((x) => !existing.includes(x.videoId)),
);
const updated = await tx
.update(history)
.set({
time: sql`hist.ts`,
percent: sql`hist.percent`,
playedDate: coalesce(sql`hist.played_date`, sql`now()`)
})
.from(sql`unnest(
${toUpdate.videoId}::uuid[],
${toUpdate.time}::integer[],
${toUpdate.percent}::integer[],
${toUpdate.playedDate}::timestamp[]
) as hist(video_id, ts, percent, played_date)`)
.innerJoin(videos, eq(videos.id, sql`hist.video_id`))
.where(and(eq(history.profilePk, userPk), eq(history.videoPk, videos.pk)))
.returning({
entryPk: history.entryPk,
percent: history.percent,
playedDate: history.playedDate,
});
const ret = await tx
.insert(history)
.select(
db
.select({
profilePk: sql`${userPk}`.as("profilePk"),
entryPk: entries.pk,
videoPk: videos.pk,
percent: sql`hist.percent`.as("percent"),
time: sql`hist.ts`.as("time"),
playedDate: coalesce(sql`hist.played_date`, sql`now()`).as(
"playedDate",
),
})
.from(sql`unnest(
${newEntries.videoId}::uuid[],
${newEntries.time}::integer[],
${newEntries.percent}::integer[],
${newEntries.playedDate}::timestamptz[]
) as hist(video_id, ts, percent, played_date)`)
.innerJoin(videos, eq(videos.id, sql`hist.videoId`))
.leftJoin(entryVideoJoin, eq(entryVideoJoin.videoPk, videos.pk))
.leftJoin(entries, eq(entries.pk, entryVideoJoin.entryPk)),
)
.returning({
entryPk: history.entryPk,
percent: history.percent,
playedDate: history.playedDate,
});
// only return new and entries whose status has changed.
// we don't need to update the watchlist every 10s when watching a video.
return [...ret, ...updated.filter((x) => x.percent >= 95)];
});
}
export async function updateWatchlist(
tx: Transaction,
userPk: number,
histArr: Awaited<ReturnType<typeof updateHistory>>,
) {
const nextEntry = alias(entries, "next_entry");
const nextEntryQ = tx
.select({
pk: nextEntry.pk,
})
.from(nextEntry)
.where(
and(
eq(nextEntry.showPk, entries.showPk),
ne(nextEntry.kind, "extra"),
gt(nextEntry.order, entries.order),
),
)
.orderBy(nextEntry.order)
.limit(1)
.as("nextEntryQ");
const seenCountQ = tx
.select({ c: count() })
.from(entries)
.where(
and(
eq(entries.showPk, sql`excluded.show_pk`),
exists(
db
.select()
.from(history)
.where(
and(
eq(history.profilePk, userPk),
eq(history.entryPk, entries.pk),
),
),
),
),
);
const showKindQ = tx
.select({ k: shows.kind })
.from(shows)
.where(eq(shows.pk, sql`excluded.show_pk`));
const hist = traverse(histArr);
await tx
.insert(watchlist)
.select(
db
.selectDistinctOn([entries.showPk], {
profilePk: sql`${userPk}`.as("profilePk"),
showPk: entries.showPk,
status: sql<WatchlistStatus>`
case
when
hist.percent >= 95
and ${nextEntryQ.pk} is null
then 'completed'::watchlist_status
else 'watching'::watchlist_status
end
`.as("status"),
seenCount: sql`
case
when ${entries.kind} = 'movie' then hist.percent
when hist.percent >= 95 then 1
else 0
end
`.as("seen_count"),
nextEntry: sql`
case
when hist.percent >= 95 then ${nextEntryQ.pk}
else ${entries.pk}
end
`.as("next_entry"),
score: sql`null`.as("score"),
startedAt: sql`hist.played_date`.as("startedAt"),
lastPlayedAt: sql`hist.played_date`.as("lastPlayedAt"),
completedAt: sql`
case
when ${nextEntryQ.pk} is null then hist.played_date
else null
end
`.as("completedAt"),
// see https://github.com/drizzle-team/drizzle-orm/issues/3608
updatedAt: sql`now()`.as("updatedAt"),
})
.from(sql`unnest(
${hist.entryPk}::integer[],
${hist.percent}::integer[],
${hist.playedDate}::timestamptz[]
) as hist(entry_pk, percent, played_date)`)
.leftJoin(entries, eq(entries.pk, sql`hist.entry_pk`))
.leftJoinLateral(nextEntryQ, sql`true`),
)
.onConflictDoUpdate({
target: [watchlist.profilePk, watchlist.showPk],
set: {
status: sql`
case
when excluded.status = 'completed' then excluded.status
when
${watchlist.status} != 'completed'
and ${watchlist.status} != 'rewatching'
then excluded.status
else ${watchlist.status}
end
`,
seenCount: sql`
case
when ${showKindQ} = 'movie' then excluded.seen_count
else ${seenCountQ}
end`,
nextEntry: sql`
case
when ${watchlist.status} = 'completed' then null
else excluded.next_entry
end
`,
lastPlayedAt: sql`excluded.last_played_at`,
completedAt: coalesce(
watchlist.completedAt,
sql`excluded.completed_at`,
),
},
});
}
const historyProgressQ: typeof entryProgressQ = db
.select({
percent: history.percent,
@ -170,162 +401,11 @@ export const historyH = new Elysia({ tags: ["profiles"] })
async ({ body, jwt: { sub }, status }) => {
const profilePk = await getOrCreateProfile(sub);
const hist = values(
body.map((x) => ({ ...x, entryUseId: isUuid(x.entry) })),
{
percent: "integer",
time: "integer",
playedDate: "timestamptz",
videoId: "uuid",
},
).as("hist");
const valEqEntries = sql`
case
when hist.entryUseId::boolean then ${entries.id} = hist.entry::uuid
else ${entries.slug} = hist.entry
end
`;
const rows = await db
.insert(history)
.select(
db
.select({
profilePk: sql`${profilePk}`.as("profilePk"),
entryPk: entries.pk,
videoPk: videos.pk,
percent: sql`hist.percent`.as("percent"),
time: sql`hist.time`.as("time"),
playedDate: sql`hist.playedDate`.as("playedDate"),
})
.from(hist)
.innerJoin(entries, valEqEntries)
.leftJoin(videos, eq(videos.id, sql`hist.videoId`)),
)
.returning({ pk: history.pk });
// automatically update watchlist with this new info
const nextEntry = alias(entries, "next_entry");
const nextEntryQ = db
.select({
pk: nextEntry.pk,
})
.from(nextEntry)
.where(
and(
eq(nextEntry.showPk, entries.showPk),
ne(nextEntry.kind, "extra"),
gt(nextEntry.order, entries.order),
),
)
.orderBy(nextEntry.order)
.limit(1)
.as("nextEntryQ");
const seenCountQ = db
.select({ c: count() })
.from(entries)
.where(
and(
eq(entries.showPk, sql`excluded.show_pk`),
exists(
db
.select()
.from(history)
.where(
and(
eq(history.profilePk, profilePk),
eq(history.entryPk, entries.pk),
),
),
),
),
);
const showKindQ = db
.select({ k: shows.kind })
.from(shows)
.where(eq(shows.pk, sql`excluded.show_pk`));
await db
.insert(watchlist)
.select(
db
.select({
profilePk: sql`${profilePk}`.as("profilePk"),
showPk: entries.showPk,
status: sql<WatchlistStatus>`
case
when
hist.percent >= 95
and ${nextEntryQ.pk} is null
then 'completed'::watchlist_status
else 'watching'::watchlist_status
end
`.as("status"),
seenCount: sql`
case
when ${entries.kind} = 'movie' then hist.percent
when hist.percent >= 95 then 1
else 0
end
`.as("seen_count"),
nextEntry: sql`
case
when hist.percent >= 95 then ${nextEntryQ.pk}
else ${entries.pk}
end
`.as("next_entry"),
score: sql`null`.as("score"),
startedAt: sql`hist.playedDate`.as("startedAt"),
lastPlayedAt: sql`hist.playedDate`.as("lastPlayedAt"),
completedAt: sql`
case
when ${nextEntryQ.pk} is null then hist.playedDate
else null
end
`.as("completedAt"),
// see https://github.com/drizzle-team/drizzle-orm/issues/3608
updatedAt: sql`now()`.as("updatedAt"),
})
.from(hist)
.leftJoin(entries, valEqEntries)
.leftJoinLateral(nextEntryQ, sql`true`),
)
.onConflictDoUpdate({
target: [watchlist.profilePk, watchlist.showPk],
set: {
status: sql`
case
when excluded.status = 'completed' then excluded.status
when
${watchlist.status} != 'completed'
and ${watchlist.status} != 'rewatching'
then excluded.status
else ${watchlist.status}
end
`,
seenCount: sql`
case
when ${showKindQ} = 'movie' then excluded.seen_count
else ${seenCountQ}
end`,
nextEntry: sql`
case
when ${watchlist.status} = 'completed' then null
else excluded.next_entry
end
`,
lastPlayedAt: sql`excluded.last_played_at`,
completedAt: coalesce(
watchlist.completedAt,
sql`excluded.completed_at`,
),
},
});
return status(201, { status: 201, inserted: rows.length });
return db.transaction(async (tx) => {
const hist = await updateHistory(tx, profilePk, body);
await updateWatchlist(tx, profilePk, hist);
return status(201, { status: 201, inserted: hist.length });
});
},
{
detail: { description: "Bulk add entries/movies to your watch history." },

View File

@ -92,36 +92,6 @@ export function sqlarr(array: unknown[]): string {
.join(", ")}}`;
}
// See https://github.com/drizzle-team/drizzle-orm/issues/4044
export function values<K extends string>(
items: Record<K, unknown>[],
typeInfo: Partial<Record<K, string>> = {},
) {
if (items[0] === undefined)
throw new Error("Invalid values, expecting at least one items");
const [firstProp, ...props] = Object.keys(items[0]) as K[];
const values = items
.map((x, i) => {
let ret = sql`(${x[firstProp]}`;
if (i === 0 && typeInfo[firstProp])
ret = sql`${ret}::${sql.raw(typeInfo[firstProp])}`;
for (const val of props) {
ret = sql`${ret}, ${x[val]}`;
if (i === 0 && typeInfo[val])
ret = sql`${ret}::${sql.raw(typeInfo[val])}`;
}
return sql`${ret})`;
})
.reduce((acc, x) => sql`${acc}, ${x}`);
const valueNames = [firstProp, ...props].join(", ");
return {
as: (name: string) => {
return sql`(values ${values}) as ${sql.raw(name)}(${sql.raw(valueNames)})`;
},
};
}
/* goal:
* unnestValues([{a: 1, b: 2}, {a: 3, b: 4}], tbl)
*

View File

@ -27,12 +27,10 @@ export const Progress = t.Object({
});
export type Progress = typeof Progress.static;
export const SeedHistory = t.Intersect([
t.Object({
entry: t.String({
description: "Id or slug of the entry/movie you watched",
}),
}),
Progress,
]);
export const SeedHistory = t.Object({
percent: Progress.properties.percent,
time: Progress.properties.time,
playedDate: t.Optional(Progress.properties.playedDate),
videoId: Progress.properties.videoId.anyOf[0],
});
export type SeedHistory = typeof SeedHistory.static;

View File

@ -38,3 +38,20 @@ export function uniqBy<T>(a: T[], key: (val: T) => string): T[] {
return true;
});
}
export function traverse<T extends Record<string, any>>(
arr: T[],
): { [K in keyof T]: T[K][] } {
const result = {} as { [K in keyof T]: T[K][] };
arr.forEach((obj) => {
for (const key in obj) {
if (!result[key]) {
result[key] = [];
}
result[key].push(obj[key]);
}
});
return result;
}