diff --git a/server/package-lock.json b/server/package-lock.json index 4e451cc8e1..06152e9335 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -28,7 +28,7 @@ "archiver": "^7.0.0", "async-lock": "^1.4.0", "bcrypt": "^5.1.1", - "bullmq": "^4.8.0", + "bullmq": "^5.51.0", "chokidar": "^3.5.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", @@ -6886,63 +6886,20 @@ } }, "node_modules/bullmq": { - "version": "4.18.2", - "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-4.18.2.tgz", - "integrity": "sha512-Cx0O98IlGiFw7UBa+zwGz+nH0Pcl1wfTvMVBlsMna3s0219hXroVovh1xPRgomyUcbyciHiugGCkW0RRNZDHYQ==", + "version": "5.51.0", + "resolved": "https://registry.npmjs.org/bullmq/-/bullmq-5.51.0.tgz", + "integrity": "sha512-YjX+CO2U4nmbCq2ZgNb/Hnu6Xk953j8EFmp0eehTuudavPyNstoZsbnyvvM6PX9rfD9clhcc5kRLyyWoFEM3Lg==", "license": "MIT", "dependencies": { - "cron-parser": "^4.6.0", - "glob": "^8.0.3", - "ioredis": "^5.3.2", - "lodash": "^4.17.21", - "msgpackr": "^1.6.2", + "cron-parser": "^4.9.0", + "ioredis": "^5.4.1", + "msgpackr": "^1.11.2", "node-abort-controller": "^3.1.1", "semver": "^7.5.4", "tslib": "^2.0.0", "uuid": "^9.0.0" } }, - "node_modules/bullmq/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", - "license": "MIT", - "dependencies": { - "balanced-match": "^1.0.0" - } - }, - "node_modules/bullmq/node_modules/glob": { - "version": "8.1.0", - "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", - "integrity": "sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==", - "deprecated": "Glob versions prior to v9 are no longer supported", - "license": "ISC", - "dependencies": { - "fs.realpath": "^1.0.0", - "inflight": "^1.0.4", - "inherits": "2", - "minimatch": "^5.0.1", - "once": "^1.3.0" - }, - "engines": { - "node": ">=12" - }, - "funding": { - "url": "https://github.com/sponsors/isaacs" - } - }, - "node_modules/bullmq/node_modules/minimatch": { - "version": "5.1.6", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.1.6.tgz", - "integrity": "sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==", - "license": "ISC", - "dependencies": { - "brace-expansion": "^2.0.1" - }, - "engines": { - "node": ">=10" - } - }, "node_modules/busboy": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", diff --git a/server/package.json b/server/package.json index 33d1450a53..b4a8841156 100644 --- a/server/package.json +++ b/server/package.json @@ -53,7 +53,7 @@ "archiver": "^7.0.0", "async-lock": "^1.4.0", "bcrypt": "^5.1.1", - "bullmq": "^4.8.0", + "bullmq": "^5.51.0", "chokidar": "^3.5.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", diff --git a/server/src/repositories/event.repository.ts b/server/src/repositories/event.repository.ts index 96d04fd23c..307b8b0ef4 100644 --- a/server/src/repositories/event.repository.ts +++ b/server/src/repositories/event.repository.ts @@ -48,7 +48,7 @@ type EventMap = { 'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }]; // album events - 'album.update': [{ id: string; recipientIds: string[] }]; + 'album.update': [{ id: string; recipientId: string }]; 'album.invite': [{ id: string; userId: string }]; // asset events diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 0912759d1c..32a4f75d67 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -9,7 +9,7 @@ import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueName } from 'src/ import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; -import { IEntityJob, JobCounts, JobItem, JobOf, QueueStatus } from 'src/types'; +import { JobCounts, JobItem, JobOf, QueueStatus } from 'src/types'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; type JobMapItem = { @@ -206,7 +206,10 @@ export class JobRepository { private getJobOptions(item: JobItem): JobsOptions | null { switch (item.name) { case JobName.NOTIFY_ALBUM_UPDATE: { - return { jobId: item.data.id, delay: item.data?.delay }; + return { + jobId: `${item.data.id}/${item.data.recipientId}`, + delay: item.data?.delay, + }; } case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { return { jobId: item.data.id }; @@ -227,19 +230,12 @@ export class JobRepository { return this.moduleRef.get(getQueueToken(queue), { strict: false }); } - public async removeJob(jobId: string, name: JobName): Promise { - const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId); - if (!existingJob) { - return; - } - try { + /** @deprecated */ + // todo: remove this when asset notifications no longer need it. + public async removeJob(name: JobName, jobID: string): Promise { + const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID); + if (existingJob) { await existingJob.remove(); - } catch (error: any) { - if (error.message?.includes('Missing key for job')) { - return; - } - throw error; } - return existingJob.data; } } diff --git a/server/src/services/album.service.spec.ts b/server/src/services/album.service.spec.ts index a0fbfc0817..9a3bb605f7 100644 --- a/server/src/services/album.service.spec.ts +++ b/server/src/services/album.service.spec.ts @@ -606,7 +606,7 @@ describe(AlbumService.name, () => { expect(mocks.album.addAssetIds).toHaveBeenCalledWith('album-123', ['asset-1', 'asset-2', 'asset-3']); expect(mocks.event.emit).toHaveBeenCalledWith('album.update', { id: 'album-123', - recipientIds: ['admin_id'], + recipientId: 'admin_id', }); }); diff --git a/server/src/services/album.service.ts b/server/src/services/album.service.ts index 1c612de8c0..d4e6ab7ffd 100644 --- a/server/src/services/album.service.ts +++ b/server/src/services/album.service.ts @@ -170,8 +170,8 @@ export class AlbumService extends BaseService { (userId) => userId !== auth.user.id, ); - if (allUsersExceptUs.length > 0) { - await this.eventRepository.emit('album.update', { id, recipientIds: allUsersExceptUs }); + for (const recipientId of allUsersExceptUs) { + await this.eventRepository.emit('album.update', { id, recipientId }); } } diff --git a/server/src/services/notification.service.spec.ts b/server/src/services/notification.service.spec.ts index 133eb9e7f6..b0f2a3ab62 100644 --- a/server/src/services/notification.service.spec.ts +++ b/server/src/services/notification.service.spec.ts @@ -154,10 +154,10 @@ describe(NotificationService.name, () => { describe('onAlbumUpdateEvent', () => { it('should queue notify album update event', async () => { - await sut.onAlbumUpdate({ id: 'album', recipientIds: ['42'] }); + await sut.onAlbumUpdate({ id: 'album', recipientId: '42' }); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.NOTIFY_ALBUM_UPDATE, - data: { id: 'album', recipientIds: ['42'], delay: 300_000 }, + data: { id: 'album', recipientId: '42', delay: 300_000 }, }); }); }); @@ -414,14 +414,14 @@ describe(NotificationService.name, () => { describe('handleAlbumUpdate', () => { it('should skip if album could not be found', async () => { - await expect(sut.handleAlbumUpdate({ id: '', recipientIds: ['1'] })).resolves.toBe(JobStatus.SKIPPED); + await expect(sut.handleAlbumUpdate({ id: '', recipientId: '1' })).resolves.toBe(JobStatus.SKIPPED); expect(mocks.user.get).not.toHaveBeenCalled(); }); it('should skip if owner could not be found', async () => { mocks.album.getById.mockResolvedValue(albumStub.emptyWithValidThumbnail); - await expect(sut.handleAlbumUpdate({ id: '', recipientIds: ['1'] })).resolves.toBe(JobStatus.SKIPPED); + await expect(sut.handleAlbumUpdate({ id: '', recipientId: '1' })).resolves.toBe(JobStatus.SKIPPED); expect(mocks.systemMetadata.get).not.toHaveBeenCalled(); }); @@ -434,7 +434,7 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).not.toHaveBeenCalled(); }); @@ -456,7 +456,7 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).not.toHaveBeenCalled(); }); @@ -478,7 +478,7 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).not.toHaveBeenCalled(); }); @@ -492,21 +492,21 @@ describe(NotificationService.name, () => { mocks.email.renderEmail.mockResolvedValue({ html: '', text: '' }); mocks.assetJob.getAlbumThumbnailFiles.mockResolvedValue([]); - await sut.handleAlbumUpdate({ id: '', recipientIds: [userStub.user1.id] }); + await sut.handleAlbumUpdate({ id: '', recipientId: userStub.user1.id }); expect(mocks.user.get).toHaveBeenCalledWith(userStub.user1.id, { withDeleted: false }); expect(mocks.email.renderEmail).toHaveBeenCalled(); expect(mocks.job.queue).toHaveBeenCalled(); }); it('should add new recipients for new images if job is already queued', async () => { - mocks.job.removeJob.mockResolvedValue({ id: '1', recipientIds: ['2', '3', '4'] } as INotifyAlbumUpdateJob); - await sut.onAlbumUpdate({ id: '1', recipientIds: ['1', '2', '3'] } as INotifyAlbumUpdateJob); + await sut.onAlbumUpdate({ id: '1', recipientId: '2' } as INotifyAlbumUpdateJob); + expect(mocks.job.removeJob).toHaveBeenCalledWith(JobName.NOTIFY_ALBUM_UPDATE, '1/2'); expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.NOTIFY_ALBUM_UPDATE, data: { id: '1', delay: 300_000, - recipientIds: ['1', '2', '3', '4'], + recipientId: '2', }, }); }); diff --git a/server/src/services/notification.service.ts b/server/src/services/notification.service.ts index 518feae7fd..e72f77ad4f 100644 --- a/server/src/services/notification.service.ts +++ b/server/src/services/notification.service.ts @@ -23,7 +23,7 @@ import { import { EmailTemplate } from 'src/repositories/email.repository'; import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; -import { EmailImageAttachment, IEntityJob, INotifyAlbumUpdateJob, JobItem, JobOf } from 'src/types'; +import { EmailImageAttachment, JobOf } from 'src/types'; import { getFilenameExtension } from 'src/utils/file'; import { getExternalDomain } from 'src/utils/misc'; import { isEqualObject } from 'src/utils/object'; @@ -198,30 +198,12 @@ export class NotificationService extends BaseService { } @OnEvent({ name: 'album.update' }) - async onAlbumUpdate({ id, recipientIds }: ArgOf<'album.update'>) { - // if recipientIds is empty, album likely only has one user part of it, don't queue notification if so - if (recipientIds.length === 0) { - return; - } - - const job: JobItem = { + async onAlbumUpdate({ id, recipientId }: ArgOf<'album.update'>) { + await this.jobRepository.removeJob(JobName.NOTIFY_ALBUM_UPDATE, `${id}/${recipientId}`); + await this.jobRepository.queue({ name: JobName.NOTIFY_ALBUM_UPDATE, - data: { id, recipientIds, delay: NotificationService.albumUpdateEmailDelayMs }, - }; - - const previousJobData = await this.jobRepository.removeJob(id, JobName.NOTIFY_ALBUM_UPDATE); - if (previousJobData && this.isAlbumUpdateJob(previousJobData)) { - for (const id of previousJobData.recipientIds) { - if (!recipientIds.includes(id)) { - recipientIds.push(id); - } - } - } - await this.jobRepository.queue(job); - } - - private isAlbumUpdateJob(job: IEntityJob): job is INotifyAlbumUpdateJob { - return 'recipientIds' in job; + data: { id, recipientId, delay: NotificationService.albumUpdateEmailDelayMs }, + }); } @OnEvent({ name: 'album.invite' }) @@ -412,7 +394,7 @@ export class NotificationService extends BaseService { } @OnJob({ name: JobName.NOTIFY_ALBUM_UPDATE, queue: QueueName.NOTIFICATION }) - async handleAlbumUpdate({ id, recipientIds }: JobOf) { + async handleAlbumUpdate({ id, recipientId }: JobOf) { const album = await this.albumRepository.getById(id, { withAssets: false }); if (!album) { @@ -424,49 +406,44 @@ export class NotificationService extends BaseService { return JobStatus.SKIPPED; } - const recipients = [...album.albumUsers.map((user) => user.user), owner].filter((user) => - recipientIds.includes(user.id), - ); const attachment = await this.getAlbumThumbnailAttachment(album); const { server, templates } = await this.getConfig({ withCache: false }); - for (const recipient of recipients) { - const user = await this.userRepository.get(recipient.id, { withDeleted: false }); - if (!user) { - continue; - } - - const { emailNotifications } = getPreferences(user.metadata); - - if (!emailNotifications.enabled || !emailNotifications.albumUpdate) { - continue; - } - - const { html, text } = await this.emailRepository.renderEmail({ - template: EmailTemplate.ALBUM_UPDATE, - data: { - baseUrl: getExternalDomain(server), - albumId: album.id, - albumName: album.albumName, - recipientName: recipient.name, - cid: attachment ? attachment.cid : undefined, - }, - customTemplate: templates.email.albumUpdateTemplate, - }); - - await this.jobRepository.queue({ - name: JobName.SEND_EMAIL, - data: { - to: recipient.email, - subject: `New media has been added to an album - ${album.albumName}`, - html, - text, - imageAttachments: attachment ? [attachment] : undefined, - }, - }); + const user = await this.userRepository.get(recipientId, { withDeleted: false }); + if (!user) { + return JobStatus.SKIPPED; } + const { emailNotifications } = getPreferences(user.metadata); + + if (!emailNotifications.enabled || !emailNotifications.albumUpdate) { + return JobStatus.SKIPPED; + } + + const { html, text } = await this.emailRepository.renderEmail({ + template: EmailTemplate.ALBUM_UPDATE, + data: { + baseUrl: getExternalDomain(server), + albumId: album.id, + albumName: album.albumName, + recipientName: user.name, + cid: attachment ? attachment.cid : undefined, + }, + customTemplate: templates.email.albumUpdateTemplate, + }); + + await this.jobRepository.queue({ + name: JobName.SEND_EMAIL, + data: { + to: user.email, + subject: `New media has been added to an album - ${album.albumName}`, + html, + text, + imageAttachments: attachment ? [attachment] : undefined, + }, + }); + return JobStatus.SUCCESS; } diff --git a/server/src/types.ts b/server/src/types.ts index ae2122c6ac..d18ef297ef 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -252,7 +252,7 @@ export interface INotifyAlbumInviteJob extends IEntityJob { } export interface INotifyAlbumUpdateJob extends IEntityJob, IDelayedJob { - recipientIds: string[]; + recipientId: string; } export interface JobCounts { diff --git a/server/test/repositories/config.repository.mock.ts b/server/test/repositories/config.repository.mock.ts index 4943a56a33..a2585e3b9e 100644 --- a/server/test/repositories/config.repository.mock.ts +++ b/server/test/repositories/config.repository.mock.ts @@ -10,6 +10,7 @@ const envData: EnvData = { buildMetadata: {}, bull: { config: { + connection: {}, prefix: 'immich_bull', }, queues: [{ name: 'queue-1' }],