review feedback

This commit is contained in:
mertalev
2026-05-07 18:33:23 -04:00
parent 4ec06aaa38
commit 1b8a5f3307
3 changed files with 9 additions and 3 deletions
+4 -2
View File
@@ -24,8 +24,8 @@ type AssetWithStreamInfo = { videoStream: VideoStreamInfo & { timeBase: number }
@Injectable()
export class HlsService extends BaseService {
private pendingSegments = new PendingEvents<'HlsSegmentResult'>(15_000);
private pendingSessions = new PendingEvents<'HlsSessionResult'>(5000);
private pendingSegments = new PendingEvents<'HlsSegmentResult'>({ timeoutMs: 15_000 });
private pendingSessions = new PendingEvents<'HlsSessionResult'>({ timeoutMs: 5000 });
private sessions = new Map<string, { lastRequestedSegment: number | null }>();
@OnEvent({ name: 'HlsSessionResult', server: true, workers: [ImmichWorker.Api] })
@@ -60,6 +60,8 @@ export class HlsService extends BaseService {
throw new NotFoundException('Asset is not yet ready for streaming');
}
// Sharing the sessionId allows only one microservices worker to successfully insert to the session table.
// The microservices worker that creates a session owns the transcoding lifecycle for it.
const sessionId = this.cryptoRepository.randomUUID();
this.websocketRepository.serverSend('HlsSessionRequest', { sessionId, assetId, ownerId: auth.user.id });
await this.pendingSessions.wait(sessionId);
@@ -81,6 +81,7 @@ export class TranscodingService extends BaseService {
this.cleanupInterval ??= setInterval(() => void this.removeInactiveSessions(), HLS_CLEANUP_INTERVAL_MS);
this.websocketRepository.serverSend('HlsSessionResult', { sessionId });
} catch (error) {
// If insertion failed due to a PK constraint, another worker has already created a session for this ID.
if (!isVideoStreamSessionPkConstraint(error)) {
this.logger.error(`Failed to create HLS session ${sessionId}: ${error}`);
this.websocketRepository.serverSend('HlsSessionResult', { sessionId, error: 'Failed to create HLS session' });
+4 -1
View File
@@ -2,8 +2,11 @@ import { ArgOf, EmitEvent } from 'src/repositories/event.repository';
export class PendingEvents<T extends { [T in EmitEvent]: ArgOf<T> extends { error?: string } ? T : never }[EmitEvent]> {
private pending = new Map<string, { completers: PromiseWithResolvers<ArgOf<T>>[]; timeout: NodeJS.Timeout }>();
private timeoutMs: number;
constructor(private timeoutMs: number) {}
constructor({ timeoutMs }: { timeoutMs: number }) {
this.timeoutMs = timeoutMs;
}
wait(key: string): Promise<ArgOf<T>> {
const completer = Promise.withResolvers<ArgOf<T>>();