From da7a81b7525ad72bd92ab2ba434073a7b034e930 Mon Sep 17 00:00:00 2001 From: Thomas <9749173+uhthomas@users.noreply.github.com> Date: Wed, 30 Apr 2025 22:45:35 +0100 Subject: [PATCH] chore(server): split album update notifications into multiple jobs (#17879) We would like to move away from the concept of finding and removing pending jobs. The only place this is used is for album update notifications, and this is done so that users who initially uploaded assets to an album will also receive a notification if someone else then adds assets to the same album. This can also be achieved with a job for each recipient. Multiple jobs also has the advantage that it will scale better for albums with many users, it's possible to send notifications concurrently, retries are possible without sending duplicate notifications, and it's clear what recipient a job failed for. --- server/package-lock.json | 57 ++-------- server/package.json | 2 +- server/src/repositories/event.repository.ts | 2 +- server/src/repositories/job.repository.ts | 24 ++--- server/src/services/album.service.spec.ts | 2 +- server/src/services/album.service.ts | 4 +- .../src/services/notification.service.spec.ts | 22 ++-- server/src/services/notification.service.ts | 101 +++++++----------- server/src/types.ts | 2 +- .../repositories/config.repository.mock.ts | 1 + 10 files changed, 74 insertions(+), 143 deletions(-) diff --git a/server/package-lock.json b/server/package-lock.json index 4e451cc8e1..06152e9335 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -28,7 +28,7 @@ "archiver": "^7.0.0", "async-lock": "^1.4.0", "bcrypt": "^5.1.1", - "bullmq": "^4.8.0", + "bullmq": "^5.51.0", "chokidar": "^3.5.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", @@ -6886,63 +6886,20 @@ } }, "node_modules/bullmq": { - "version": "4.18.2", - "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-4.18.2.tgz", - "integrity": "sha512-Cx0O98IlGiFw7UBa+zwGz+nH0Pcl1wfTvMVBlsMna3s0219hXroVovh1xPRgomyUcbyciHiugGCkW0RRNZDHYQ==", + "version": "5.51.0", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.51.0.tgz", + "integrity": "sha512-YjX+CO2U4nmbCq2ZgNb/Hnu6Xk953j8EFmp0eehTuudavPyNstoZsbnyvvM6PX9rfD9clhcc5kRLyyWoFEM3Lg==", "license": "MIT", "dependencies": { - "cron-parser": "^4.6.0", - "glob": "^8.0.3", - "ioredis": "^5.3.2", - "lodash": "^4.17.21", - "msgpackr": "^1.6.2", + "cron-parser": "^4.9.0", + "ioredis": "^5.4.1", + "msgpackr": "^1.11.2", "node-abort-controller": "^3.1.1", "semver": "^7.5.4", "tslib": "^2.0.0", "uuid": "^9.0.0" } }, - "node_modules/bullmq/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", - "license": "MIT", - "dependencies": { - "balanced-match": "^1.0.0" - } - }, - "node_modules/bullmq/node_modules/glob": { - "version": "8.1.0", - "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", - "integrity": "sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==", - "deprecated": "Glob versions prior to v9 are no longer supported", - "license": "ISC", - "dependencies": { - "fs.realpath": "^1.0.0", - "inflight": "^1.0.4", - "inherits": "2", - "minimatch": "^5.0.1", - "once": "^1.3.0" - }, - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://github.com/sponsors/isaacs" - } - }, - "node_modules/bullmq/node_modules/minimatch": { - "version": "5.1.6", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", - "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", - "license": "ISC", - "dependencies": { - "brace-expansion": "^2.0.1" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/busboy": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", diff --git a/server/package.json b/server/package.json index 33d1450a53..b4a8841156 100644 --- a/server/package.json +++ b/server/package.json @@ -53,7 +53,7 @@ "archiver": "^7.0.0", "async-lock": "^1.4.0", "bcrypt": "^5.1.1", - "bullmq": "^4.8.0", + "bullmq": "^5.51.0", "chokidar": "^3.5.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", diff --git a/server/src/repositories/event.repository.ts b/server/src/repositories/event.repository.ts index 96d04fd23c..307b8b0ef4 100644 --- a/server/src/repositories/event.repository.ts +++ b/server/src/repositories/event.repository.ts @@ -48,7 +48,7 @@ type EventMap = { 'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }]; // album events - 'album.update': [{ id: string; recipientIds: string[] }]; + 'album.update': [{ id: string; recipientId: string }]; 'album.invite': [{ id: string; userId: string }]; // asset events diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 0912759d1c..32a4f75d67 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -9,7 +9,7 @@ import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueName } from 'src/ import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; -import { IEntityJob, JobCounts, JobItem, JobOf, QueueStatus } from 'src/types'; +import { JobCounts, JobItem, JobOf, QueueStatus } from 'src/types'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; type JobMapItem = { @@ -206,7 +206,10 @@ export class JobRepository { private getJobOptions(item: JobItem): JobsOptions | null { switch (item.name) { case JobName.NOTIFY_ALBUM_UPDATE: { - return { jobId: item.data.id, delay: item.data?.delay }; + return { + jobId: `${item.data.id}/${item.data.recipientId}`, + delay: item.data?.delay, + }; } case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { return { jobId: item.data.id }; @@ -227,19 +230,12 @@ export class JobRepository { return this.moduleRef.get(getQueueToken(queue), { strict: false }); } - public async removeJob(jobId: string, name: JobName): Promise { - const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId); - if (!existingJob) { - return; - } - try { + /** @deprecated */ + // todo: remove this when asset notifications no longer need it. + public async removeJob(name: JobName, jobID: string): Promise { + const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID); + if (existingJob) { await existingJob.remove(); - } catch (error: any) { - if (error.message?.includes('Missing key for job')) { - return; - } - throw error; } - return existingJob.data; } } diff --git a/server/src/services/album.service.spec.ts b/server/src/services/album.service.spec.ts index a0fbfc0817..9a3bb605f7 100644 --- a/server/src/services/album.service.spec.ts +++ b/server/src/services/album.service.spec.ts @@ -606,7 +606,7 @@ describe(AlbumService.name, () => { expect(mocks.album.addAssetIds).toHaveBeenCalledWith('album-123', ['asset-1', 'asset-2', 'asset-3']); expect(mocks.event.emit).toHaveBeenCalledWith('album.update', { id: 'album-123', - recipientIds: ['admin_id'], + recipientId: 'admin_id', }); }); diff --git a/server/src/services/album.service.ts b/server/src/services/album.service.ts index 1c612de8c0..d4e6ab7ffd 100644 --- a/server/src/services/album.service.ts +++ b/server/src/services/album.service.ts @@ -170,8 +170,8 @@ export class AlbumService extends BaseService { (userId) => userId !== auth.user.id, ); - if (allUsersExceptUs.length > 0) { - await this.eventRepository.emit('album.update', { id, recipientIds: allUsersExceptUs }); + for (const recipientId of allUsersExceptUs) { + await this.eventRepository.emit('album.update', { id, recipientId }); } } diff --git a/server/src/services/notification.service.spec.ts b/server/src/services/notification.service.spec.ts index 133eb9e7f6..b0f2a3ab62 100644 --- a/server/src/services/notification.service.spec.ts +++ b/server/src/services/notification.service.spec.ts @@ -154,10 +154,10 @@ describe(NotificationService.name, () => { describe('onAlbumUpdateEvent', () => { it('should queue notify album update event', async () => { - await sut.onAlbumUpdate({ id: 'album', recipientIds: ['42'] }); + await sut.onAlbumUpdate({ id: 'album', recipientId: '42' }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.NOTIFY_ALBUM_UPDATE, - data: { id: 'album', recipientIds: ['42'], delay: 300_000 }, + data: { id: 'album', recipientId: '42', delay: 300_000 }, }); }); }); @@ -414,14 +414,14 @@ describe(NotificationService.name, () => { describe('handleAlbumUpdate', () => { it('should skip if album could not be found', async () => { - await expect(sut.handleAlbumUpdate({ id: '', recipientIds: ['1'] })).resolves.toBe(JobStatus.SKIPPED); + await expect(sut.handleAlbumUpdate({ id: '', recipientId: '1' })).resolves.toBe(JobStatus.SKIPPED); expect(mocks.user.get).not.toHaveBeenCalled(); }); it('should skip if owner could not be found', async () => { mocks.album.getById.mockResolvedValue(albumStub.emptyWithValidThumbnail); - await expect(sut.handleAlbumUpdate({ id: '', recipientIds: ['1'] })).resolves.toBe(JobStatus.SKIPPED); + await expect(sut.handleAlbumUpdate({ id: '', recipientId: '1' })).resolves.toBe(JobStatus.SKIPPED); expect(mocks.systemMetadata.get).not.toHaveBeenCalled(); }); @@ -434,7 +434,7 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).not.toHaveBeenCalled(); }); @@ -456,7 +456,7 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).not.toHaveBeenCalled(); }); @@ -478,7 +478,7 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).not.toHaveBeenCalled(); }); @@ -492,21 +492,21 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).toHaveBeenCalled(); expect(mocks.job.queue).toHaveBeenCalled(); }); it('should add new recipients for new images if job is already queued', async () => { - mocks.job.removeJob.mockResolvedValue({ id: '1', recipientIds: ['2', '3', '4'] } as INotifyAlbumUpdateJob); - await sut.onAlbumUpdate({ id: '1', recipientIds: ['1', '2', '3'] } as INotifyAlbumUpdateJob); + await sut.onAlbumUpdate({ id: '1', recipientId: '2' } as INotifyAlbumUpdateJob); + expect(mocks.job.removeJob).toHaveBeenCalledWith(JobName.NOTIFY_ALBUM_UPDATE, '1/2'); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.NOTIFY_ALBUM_UPDATE, data: { id: '1', delay: 300_000, - recipientIds: ['1', '2', '3', '4'], + recipientId: '2', }, }); }); diff --git a/server/src/services/notification.service.ts b/server/src/services/notification.service.ts index 518feae7fd..e72f77ad4f 100644 --- a/server/src/services/notification.service.ts +++ b/server/src/services/notification.service.ts @@ -23,7 +23,7 @@ import { import { EmailTemplate } from 'src/repositories/email.repository'; import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; -import { EmailImageAttachment, IEntityJob, INotifyAlbumUpdateJob, JobItem, JobOf } from 'src/types'; +import { EmailImageAttachment, JobOf } from 'src/types'; import { getFilenameExtension } from 'src/utils/file'; import { getExternalDomain } from 'src/utils/misc'; import { isEqualObject } from 'src/utils/object'; @@ -198,30 +198,12 @@ export class NotificationService extends BaseService { } @OnEvent({ name: 'album.update' }) - async onAlbumUpdate({ id, recipientIds }: ArgOf<'album.update'>) { - // if recipientIds is empty, album likely only has one user part of it, don't queue notification if so - if (recipientIds.length === 0) { - return; - } - - const job: JobItem = { + async onAlbumUpdate({ id, recipientId }: ArgOf<'album.update'>) { + await this.jobRepository.removeJob(JobName.NOTIFY_ALBUM_UPDATE, `${id}/${recipientId}`); + await this.jobRepository.queue({ name: JobName.NOTIFY_ALBUM_UPDATE, - data: { id, recipientIds, delay: NotificationService.albumUpdateEmailDelayMs }, - }; - - const previousJobData = await this.jobRepository.removeJob(id, JobName.NOTIFY_ALBUM_UPDATE); - if (previousJobData && this.isAlbumUpdateJob(previousJobData)) { - for (const id of previousJobData.recipientIds) { - if (!recipientIds.includes(id)) { - recipientIds.push(id); - } - } - } - await this.jobRepository.queue(job); - } - - private isAlbumUpdateJob(job: IEntityJob): job is INotifyAlbumUpdateJob { - return 'recipientIds' in job; + data: { id, recipientId, delay: NotificationService.albumUpdateEmailDelayMs }, + }); } @OnEvent({ name: 'album.invite' }) @@ -412,7 +394,7 @@ export class NotificationService extends BaseService { } @OnJob({ name: JobName.NOTIFY_ALBUM_UPDATE, queue: QueueName.NOTIFICATION }) - async handleAlbumUpdate({ id, recipientIds }: JobOf) { + async handleAlbumUpdate({ id, recipientId }: JobOf) { const album = await this.albumRepository.getById(id, { withAssets: false }); if (!album) { @@ -424,49 +406,44 @@ export class NotificationService extends BaseService { return JobStatus.SKIPPED; } - const recipients = [...album.albumUsers.map((user) => user.user), owner].filter((user) => - recipientIds.includes(user.id), - ); const attachment = await this.getAlbumThumbnailAttachment(album); const { server, templates } = await this.getConfig({ withCache: false }); - for (const recipient of recipients) { - const user = await this.userRepository.get(recipient.id, { withDeleted: false }); - if (!user) { - continue; - } - - const { emailNotifications } = getPreferences(user.metadata); - - if (!emailNotifications.enabled || !emailNotifications.albumUpdate) { - continue; - } - - const { html, text } = await this.emailRepository.renderEmail({ - template: EmailTemplate.ALBUM_UPDATE, - data: { - baseUrl: getExternalDomain(server), - albumId: album.id, - albumName: album.albumName, - recipientName: recipient.name, - cid: attachment ? attachment.cid : undefined, - }, - customTemplate: templates.email.albumUpdateTemplate, - }); - - await this.jobRepository.queue({ - name: JobName.SEND_EMAIL, - data: { - to: recipient.email, - subject: `New media has been added to an album - ${album.albumName}`, - html, - text, - imageAttachments: attachment ? [attachment] : undefined, - }, - }); + const user = await this.userRepository.get(recipientId, { withDeleted: false }); + if (!user) { + return JobStatus.SKIPPED; } + const { emailNotifications } = getPreferences(user.metadata); + + if (!emailNotifications.enabled || !emailNotifications.albumUpdate) { + return JobStatus.SKIPPED; + } + + const { html, text } = await this.emailRepository.renderEmail({ + template: EmailTemplate.ALBUM_UPDATE, + data: { + baseUrl: getExternalDomain(server), + albumId: album.id, + albumName: album.albumName, + recipientName: user.name, + cid: attachment ? attachment.cid : undefined, + }, + customTemplate: templates.email.albumUpdateTemplate, + }); + + await this.jobRepository.queue({ + name: JobName.SEND_EMAIL, + data: { + to: user.email, + subject: `New media has been added to an album - ${album.albumName}`, + html, + text, + imageAttachments: attachment ? [attachment] : undefined, + }, + }); + return JobStatus.SUCCESS; } diff --git a/server/src/types.ts b/server/src/types.ts index ae2122c6ac..d18ef297ef 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -252,7 +252,7 @@ export interface INotifyAlbumInviteJob extends IEntityJob { } export interface INotifyAlbumUpdateJob extends IEntityJob, IDelayedJob { - recipientIds: string[]; + recipientId: string; } export interface JobCounts { diff --git a/server/test/repositories/config.repository.mock.ts b/server/test/repositories/config.repository.mock.ts index 4943a56a33..a2585e3b9e 100644 --- a/server/test/repositories/config.repository.mock.ts +++ b/server/test/repositories/config.repository.mock.ts @@ -10,6 +10,7 @@ const envData: EnvData = { buildMetadata: {}, bull: { config: { + connection: {}, prefix: 'immich_bull', }, queues: [{ name: 'queue-1' }],