diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index 3c05e12b6f..6274c88dfe 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -431,17 +431,6 @@ export class AssetRepository { return paginationHelper(items as any as AssetEntity[], pagination.take); } - async getAllInLibrary(pagination: PaginationOptions, libraryId: string): Paginated { - const builder = this.db - .selectFrom('assets') - .select('id') - .where('libraryId', '=', asUuid(libraryId)) - .limit(pagination.take + 1) - .offset(pagination.skip ?? 0); - const items = await builder.execute(); - return paginationHelper(items as any as AssetEntity[], pagination.take); - } - /** * Get assets by device's Id on the database * @param ownerId diff --git a/server/src/repositories/library.repository.ts b/server/src/repositories/library.repository.ts index 211802cf21..bd2cb0c1df 100644 --- a/server/src/repositories/library.repository.ts +++ b/server/src/repositories/library.repository.ts @@ -144,4 +144,8 @@ export class LibraryRepository { total: Number(stats.photos) + Number(stats.videos), }; } + + streamAssetIds(libraryId: string) { + return this.db.selectFrom('assets').select(['id']).where('libraryId', '=', libraryId).stream(); + } } diff --git a/server/src/services/library.service.spec.ts b/server/src/services/library.service.spec.ts index 666497e2e1..5c89b09561 100644 --- a/server/src/services/library.service.spec.ts +++ b/server/src/services/library.service.spec.ts @@ -13,7 +13,7 @@ import { libraryStub } from 'test/fixtures/library.stub'; import { systemConfigStub } from 'test/fixtures/system-config.stub'; import { userStub } from 'test/fixtures/user.stub'; import { makeMockWatcher } from 'test/repositories/storage.repository.mock'; -import { newTestService, ServiceMocks } from 'test/utils'; +import { makeStream, newTestService, ServiceMocks } from 'test/utils'; import { vitest } from 'vitest'; async function* mockWalk() { @@ -287,10 +287,10 @@ describe(LibraryService.name, () => { it('should queue asset sync', async () => { mocks.library.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1); mocks.storage.walk.mockImplementation(async function* generator() {}); - mocks.asset.getAll.mockResolvedValue({ items: [assetStub.external], hasNextPage: false }); + mocks.library.streamAssetIds.mockReturnValue(makeStream([assetStub.external])); mocks.asset.getLibraryAssetCount.mockResolvedValue(1); mocks.asset.detectOfflineExternalAssets.mockResolvedValue({ numUpdatedRows: BigInt(0) }); - mocks.asset.getAllInLibrary.mockResolvedValue({ items: [assetStub.external], hasNextPage: false }); + mocks.library.streamAssetIds.mockReturnValue(makeStream([assetStub.external])); const response = await sut.handleQueueSyncAssets({ id: libraryStub.externalLibraryWithImportPaths1.id }); @@ -1039,7 +1039,7 @@ describe(LibraryService.name, () => { describe('handleDeleteLibrary', () => { it('should delete an empty library', async () => { mocks.library.get.mockResolvedValue(libraryStub.externalLibrary1); - mocks.asset.getAll.mockResolvedValue({ items: [], hasNextPage: false }); + mocks.library.streamAssetIds.mockReturnValue(makeStream([])); await expect(sut.handleDeleteLibrary({ id: libraryStub.externalLibrary1.id })).resolves.toBe(JobStatus.SUCCESS); expect(mocks.library.delete).toHaveBeenCalled(); @@ -1047,7 +1047,7 @@ describe(LibraryService.name, () => { it('should delete all assets in a library', async () => { mocks.library.get.mockResolvedValue(libraryStub.externalLibrary1); - mocks.asset.getAll.mockResolvedValue({ items: [assetStub.image1], hasNextPage: false }); + mocks.library.streamAssetIds.mockReturnValue(makeStream([assetStub.image1])); mocks.asset.getById.mockResolvedValue(assetStub.image1); diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index 55d9ae08d4..787d09ca15 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -24,7 +24,6 @@ import { BaseService } from 'src/services/base.service'; import { JobOf } from 'src/types'; import { mimeTypes } from 'src/utils/mime-types'; import { handlePromiseError } from 'src/utils/misc'; -import { usePagination } from 'src/utils/pagination'; @Injectable() export class LibraryService extends BaseService { @@ -341,34 +340,37 @@ export class LibraryService extends BaseService { await this.assetRepository.updateByLibraryId(libraryId, { deletedAt: new Date() }); - const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) => - this.assetRepository.getAll(pagination, { libraryId, withDeleted: true }), - ); - let assetsFound = false; + let chunk: string[] = []; + + const queueChunk = async () => { + if (chunk.length > 0) { + assetsFound = true; + this.logger.debug(`Queueing deletion of ${chunk.length} asset(s) in library ${libraryId}`); + await this.jobRepository.queueAll( + chunk.map((id) => ({ name: JobName.ASSET_DELETION, data: { id, deleteOnDisk: false } })), + ); + chunk = []; + } + }; this.logger.debug(`Will delete all assets in library ${libraryId}`); - for await (const assets of assetPagination) { - if (assets.length > 0) { - assetsFound = true; - } + const assets = this.libraryRepository.streamAssetIds(libraryId); + for await (const asset of assets) { + chunk.push(asset.id); - this.logger.debug(`Queueing deletion of ${assets.length} asset(s) in library ${libraryId}`); - await this.jobRepository.queueAll( - assets.map((asset) => ({ - name: JobName.ASSET_DELETION, - data: { - id: asset.id, - deleteOnDisk: false, - }, - })), - ); + if (chunk.length >= 10_000) { + await queueChunk(); + } } + await queueChunk(); + if (!assetsFound) { this.logger.log(`Deleting library ${libraryId}`); await this.libraryRepository.delete(libraryId); } + return JobStatus.SUCCESS; } @@ -676,7 +678,6 @@ export class LibraryService extends BaseService { } const assetCount = await this.assetRepository.getLibraryAssetCount(job.id); - if (!assetCount) { this.logger.log(`Library ${library.id} is empty, no need to check assets`); return JobStatus.SUCCESS; @@ -702,42 +703,47 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - this.logger.log(`Scanning library ${library.id} for assets missing from disk...`); + let chunk: string[] = []; + let count = 0; - const existingAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) => - this.assetRepository.getAllInLibrary(pagination, job.id), - ); + const queueChunk = async () => { + if (chunk.length > 0) { + count += chunk.length; - let currentAssetCount = 0; - for await (const assets of existingAssets) { - if (assets.length === 0) { - throw new BadRequestException(`Failed to get assets for library ${job.id}`); + await this.jobRepository.queue({ + name: JobName.LIBRARY_SYNC_ASSETS, + data: { + libraryId: library.id, + importPaths: library.importPaths, + exclusionPatterns: library.exclusionPatterns, + assetIds: chunk.map((id) => id), + progressCounter: count, + totalAssets: assetCount, + }, + }); + chunk = []; + + const completePercentage = ((100 * count) / assetCount).toFixed(1); + + this.logger.log( + `Queued check of ${count} of ${assetCount} (${completePercentage} %) existing asset(s) so far in library ${library.id}`, + ); } + }; - currentAssetCount += assets.length; + this.logger.log(`Scanning library ${library.id} for assets missing from disk...`); + const existingAssets = this.libraryRepository.streamAssetIds(library.id); - await this.jobRepository.queue({ - name: JobName.LIBRARY_SYNC_ASSETS, - data: { - libraryId: library.id, - importPaths: library.importPaths, - exclusionPatterns: library.exclusionPatterns, - assetIds: assets.map(({ id }) => id), - progressCounter: currentAssetCount, - totalAssets: assetCount, - }, - }); - - const completePercentage = ((100 * currentAssetCount) / assetCount).toFixed(1); - - this.logger.log( - `Queued check of ${currentAssetCount} of ${assetCount} (${completePercentage} %) existing asset(s) so far in library ${library.id}`, - ); + for await (const asset of existingAssets) { + chunk.push(asset.id); + if (chunk.length === 10_000) { + await queueChunk(); + } } - if (currentAssetCount) { - this.logger.log(`Finished queuing ${currentAssetCount} asset check(s) for library ${library.id}`); - } + await queueChunk(); + + this.logger.log(`Finished queuing ${count} asset check(s) for library ${library.id}`); return JobStatus.SUCCESS; } diff --git a/server/test/repositories/asset.repository.mock.ts b/server/test/repositories/asset.repository.mock.ts index d89327c61c..bc6a172e05 100644 --- a/server/test/repositories/asset.repository.mock.ts +++ b/server/test/repositories/asset.repository.mock.ts @@ -24,7 +24,6 @@ export const newAssetRepositoryMock = (): Mocked