From 5bceefce753a58ee13e802af2f55b61d414d891f Mon Sep 17 00:00:00 2001 From: Daniel Dietzler <36593685+danieldietzler@users.noreply.github.com> Date: Tue, 15 Apr 2025 19:53:28 +0200 Subject: [PATCH] refactor: stream assets for thumbnail job (#17623) --- server/src/queries/asset.job.repository.sql | 31 +++++++++++ .../src/repositories/asset-job.repository.ts | 23 ++++++++ server/src/services/media.service.spec.ts | 55 +++++-------------- server/src/services/media.service.ts | 28 +++------- 4 files changed, 75 insertions(+), 62 deletions(-) diff --git a/server/src/queries/asset.job.repository.sql b/server/src/queries/asset.job.repository.sql index 0b469b40fc..8fd6d1daf5 100644 --- a/server/src/queries/asset.job.repository.sql +++ b/server/src/queries/asset.job.repository.sql @@ -59,6 +59,37 @@ where limit $2 +-- AssetJobRepository.streamForThumbnailJob +select + "assets"."id", + "assets"."thumbhash", + ( + select + coalesce(json_agg(agg), '[]') + from + ( + select + "asset_files"."id", + "asset_files"."path", + "asset_files"."type" + from + "asset_files" + where + "asset_files"."assetId" = "assets"."id" + ) as agg + ) as "files" +from + "assets" + inner join "asset_job_status" on "asset_job_status"."assetId" = "assets"."id" +where + "assets"."deletedAt" is null + and "assets"."isVisible" = $1 + and ( + "asset_job_status"."previewAt" is null + or "asset_job_status"."thumbnailAt" is null + or "assets"."thumbhash" is null + ) + -- AssetJobRepository.getForStorageTemplateJob select "assets"."id", diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 1d04a468e4..b0ae001242 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -54,6 +54,29 @@ export class AssetJobRepository { .executeTakeFirst(); } + @GenerateSql({ params: [false], stream: true }) + streamForThumbnailJob(force: boolean) { + return this.db + .selectFrom('assets') + .select(['assets.id', 'assets.thumbhash']) + .select(withFiles) + .where('assets.deletedAt', 'is', null) + .where('assets.isVisible', '=', true) + .$if(!force, (qb) => + qb + // If there aren't any entries, metadata extraction hasn't run yet which is required for thumbnails + .innerJoin('asset_job_status', 'asset_job_status.assetId', 'assets.id') + .where((eb) => + eb.or([ + eb('asset_job_status.previewAt', 'is', null), + eb('asset_job_status.thumbnailAt', 'is', null), + eb('assets.thumbhash', 'is', null), + ]), + ), + ) + .stream(); + } + private storageTemplateAssetQuery() { return this.db .selectFrom('assets') diff --git a/server/src/services/media.service.spec.ts b/server/src/services/media.service.spec.ts index c55a277fae..b6a13bc83b 100644 --- a/server/src/services/media.service.spec.ts +++ b/server/src/services/media.service.spec.ts @@ -39,6 +39,7 @@ describe(MediaService.name, () => { describe('handleQueueGenerateThumbnails', () => { it('should queue all assets', async () => { + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.image])); mocks.asset.getAll.mockResolvedValue({ items: [assetStub.image], hasNextPage: false, @@ -49,8 +50,7 @@ describe(MediaService.name, () => { await sut.handleQueueGenerateThumbnails({ force: true }); - expect(mocks.asset.getAll).toHaveBeenCalled(); - expect(mocks.asset.getWithout).not.toHaveBeenCalled(); + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(true); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.GENERATE_THUMBNAILS, @@ -68,6 +68,7 @@ describe(MediaService.name, () => { }); it('should queue trashed assets when force is true', async () => { + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.archived])); mocks.asset.getAll.mockResolvedValue({ items: [assetStub.trashed], hasNextPage: false, @@ -76,11 +77,7 @@ describe(MediaService.name, () => { await sut.handleQueueGenerateThumbnails({ force: true }); - expect(mocks.asset.getAll).toHaveBeenCalledWith( - { skip: 0, take: 1000 }, - expect.objectContaining({ withDeleted: true }), - ); - expect(mocks.asset.getWithout).not.toHaveBeenCalled(); + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(true); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.GENERATE_THUMBNAILS, @@ -90,19 +87,12 @@ describe(MediaService.name, () => { }); it('should queue archived assets when force is true', async () => { - mocks.asset.getAll.mockResolvedValue({ - items: [assetStub.archived], - hasNextPage: false, - }); + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.archived])); mocks.person.getAll.mockReturnValue(makeStream()); await sut.handleQueueGenerateThumbnails({ force: true }); - expect(mocks.asset.getAll).toHaveBeenCalledWith( - { skip: 0, take: 1000 }, - expect.objectContaining({ withArchived: true }), - ); - expect(mocks.asset.getWithout).not.toHaveBeenCalled(); + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(true); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.GENERATE_THUMBNAILS, @@ -112,18 +102,13 @@ describe(MediaService.name, () => { }); it('should queue all people with missing thumbnail path', async () => { - mocks.asset.getWithout.mockResolvedValue({ - items: [assetStub.image], - hasNextPage: false, - }); + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.image])); mocks.person.getAll.mockReturnValue(makeStream([personStub.noThumbnail, personStub.noThumbnail])); mocks.person.getRandomFace.mockResolvedValueOnce(faceStub.face1); await sut.handleQueueGenerateThumbnails({ force: false }); - expect(mocks.asset.getAll).not.toHaveBeenCalled(); - expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); - + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false); expect(mocks.person.getAll).toHaveBeenCalledWith({ thumbnailPath: '' }); expect(mocks.person.getRandomFace).toHaveBeenCalled(); expect(mocks.person.update).toHaveBeenCalledTimes(1); @@ -138,15 +123,11 @@ describe(MediaService.name, () => { }); it('should queue all assets with missing resize path', async () => { - mocks.asset.getWithout.mockResolvedValue({ - items: [assetStub.noResizePath], - hasNextPage: false, - }); + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.noResizePath])); mocks.person.getAll.mockReturnValue(makeStream()); await sut.handleQueueGenerateThumbnails({ force: false }); - expect(mocks.asset.getAll).not.toHaveBeenCalled(); - expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.GENERATE_THUMBNAILS, @@ -158,15 +139,11 @@ describe(MediaService.name, () => { }); it('should queue all assets with missing webp path', async () => { - mocks.asset.getWithout.mockResolvedValue({ - items: [assetStub.noWebpPath], - hasNextPage: false, - }); + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.noWebpPath])); mocks.person.getAll.mockReturnValue(makeStream()); await sut.handleQueueGenerateThumbnails({ force: false }); - expect(mocks.asset.getAll).not.toHaveBeenCalled(); - expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.GENERATE_THUMBNAILS, @@ -178,15 +155,11 @@ describe(MediaService.name, () => { }); it('should queue all assets with missing thumbhash', async () => { - mocks.asset.getWithout.mockResolvedValue({ - items: [assetStub.noThumbhash], - hasNextPage: false, - }); + mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.noThumbhash])); mocks.person.getAll.mockReturnValue(makeStream()); await sut.handleQueueGenerateThumbnails({ force: false }); - expect(mocks.asset.getAll).not.toHaveBeenCalled(); - expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL); + expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false); expect(mocks.job.queueAll).toHaveBeenCalledWith([ { name: JobName.GENERATE_THUMBNAILS, diff --git a/server/src/services/media.service.ts b/server/src/services/media.service.ts index 5318cdc97f..5fdee7c346 100644 --- a/server/src/services/media.service.ts +++ b/server/src/services/media.service.ts @@ -51,30 +51,16 @@ export class MediaService extends BaseService { @OnJob({ name: JobName.QUEUE_GENERATE_THUMBNAILS, queue: QueueName.THUMBNAIL_GENERATION }) async handleQueueGenerateThumbnails({ force }: JobOf): Promise { - const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { - return force - ? this.assetRepository.getAll(pagination, { - isVisible: true, - withDeleted: true, - withArchived: true, - }) - : this.assetRepository.getWithout(pagination, WithoutProperty.THUMBNAIL); - }); + const thumbJobs: JobItem[] = []; + for await (const asset of this.assetJobRepository.streamForThumbnailJob(!!force)) { + const { previewFile, thumbnailFile } = getAssetFiles(asset.files); - for await (const assets of assetPagination) { - const jobs: JobItem[] = []; - - for (const asset of assets) { - const { previewFile, thumbnailFile } = getAssetFiles(asset.files); - - if (!previewFile || !thumbnailFile || !asset.thumbhash || force) { - jobs.push({ name: JobName.GENERATE_THUMBNAILS, data: { id: asset.id } }); - continue; - } + if (!previewFile || !thumbnailFile || !asset.thumbhash || force) { + thumbJobs.push({ name: JobName.GENERATE_THUMBNAILS, data: { id: asset.id } }); + continue; } - - await this.jobRepository.queueAll(jobs); } + await this.jobRepository.queueAll(thumbJobs); const jobs: JobItem[] = [];