From ba2ff0efee3f3bf895503aff0587c75581ece3c3 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Sat, 23 Mar 2024 17:56:49 -0400 Subject: [PATCH] queueing --- .../src/entities/asset-job-status.entity.ts | 3 + server/src/entities/system-config.entity.ts | 3 +- server/src/interfaces/asset.interface.ts | 1 + server/src/interfaces/job.interface.ts | 6 ++ server/src/repositories/asset.repository.ts | 12 ++++ server/src/repositories/job.repository.ts | 3 + server/src/services/job.service.ts | 5 ++ server/src/services/microservices.service.ts | 1 + server/src/services/search.service.ts | 70 ++++++++++++++----- 9 files changed, 87 insertions(+), 17 deletions(-) diff --git a/server/src/entities/asset-job-status.entity.ts b/server/src/entities/asset-job-status.entity.ts index b50075203..44c0a0469 100644 --- a/server/src/entities/asset-job-status.entity.ts +++ b/server/src/entities/asset-job-status.entity.ts @@ -15,4 +15,7 @@ export class AssetJobStatusEntity { @Column({ type: 'timestamptz', nullable: true }) metadataExtractedAt!: Date | null; + + @Column({ type: 'timestamptz', nullable: true }) + duplicatesDetectedAt!: Date | null; } diff --git a/server/src/entities/system-config.entity.ts b/server/src/entities/system-config.entity.ts index 190452e31..ec59b4bc0 100644 --- a/server/src/entities/system-config.entity.ts +++ b/server/src/entities/system-config.entity.ts @@ -39,7 +39,8 @@ export enum SystemConfigKey { JOB_METADATA_EXTRACTION_CONCURRENCY = 'job.metadataExtraction.concurrency', JOB_VIDEO_CONVERSION_CONCURRENCY = 'job.videoConversion.concurrency', JOB_FACE_DETECTION_CONCURRENCY = 'job.faceDetection.concurrency', - JOB_CLIP_ENCODING_CONCURRENCY = 'job.smartSearch.concurrency', + JOB_SMART_SEARCH_CONCURRENCY = 'job.smartSearch.concurrency', + JOB_DUPLICATE_DETECTION_CONCURRENCY = 'job.duplicateDetection.concurrency', JOB_BACKGROUND_TASK_CONCURRENCY = 'job.backgroundTask.concurrency', JOB_STORAGE_TEMPLATE_MIGRATION_CONCURRENCY = 'job.storageTemplateMigration.concurrency', JOB_SEARCH_CONCURRENCY = 'job.search.concurrency', diff --git a/server/src/interfaces/asset.interface.ts b/server/src/interfaces/asset.interface.ts index 008931566..dd00e40cb 100644 --- a/server/src/interfaces/asset.interface.ts +++ b/server/src/interfaces/asset.interface.ts @@ -40,6 +40,7 @@ export enum WithoutProperty { ENCODED_VIDEO = 'encoded-video', EXIF = 'exif', SMART_SEARCH = 'smart-search', + DUPLICATE = 'duplicate', OBJECT_TAGS = 'object-tags', FACES = 'faces', PERSON = 'person', diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index 25c25e87d..19455f51a 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -84,6 +84,9 @@ export enum JobName { // smart search QUEUE_SMART_SEARCH = 'queue-smart-search', SMART_SEARCH = 'smart-search', + + // duplicate detection + QUEUE_DUPLICATE_DETECTION = 'queue-duplicate-detection', DUPLICATE_DETECTION = 'duplicate-detection', // XMP sidecars @@ -200,6 +203,9 @@ export type JobItem = // Smart Search | { name: JobName.QUEUE_SMART_SEARCH; data: IBaseJob } | { name: JobName.SMART_SEARCH; data: IEntityJob } + + // Duplicate Detection + | { name: JobName.QUEUE_DUPLICATE_DETECTION; data: IBaseJob } | { name: JobName.DUPLICATE_DETECTION; data: IEntityJob } // Filesystem diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index 907cddb89..bfcb525f2 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -347,6 +347,18 @@ export class AssetRepository implements IAssetRepository { break; } + case WithoutProperty.DUPLICATE: { + where = { + resizePath: Not(IsNull()), + isVisible: true, + smartSearch: true, + jobStatus: { + duplicatesDetectedAt: IsNull(), + }, + }; + break; + } + case WithoutProperty.OBJECT_TAGS: { relations = { smartInfo: true, diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index af41dbd04..28a4cc699 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -63,6 +63,9 @@ export const JOBS_TO_QUEUE: Record = { // smart search [JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH, [JobName.SMART_SEARCH]: QueueName.SMART_SEARCH, + + // duplicate detection + [JobName.QUEUE_DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION, [JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION, // XMP sidecars diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index 3c3803751..244a30a14 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -115,6 +115,11 @@ export class JobService { return this.jobRepository.queue({ name: JobName.QUEUE_SMART_SEARCH, data: { force } }); } + case QueueName.DUPLICATE_DETECTION: { + await this.configCore.requireFeature(FeatureFlag.SMART_SEARCH); + return this.jobRepository.queue({ name: JobName.QUEUE_DUPLICATE_DETECTION, data: { force } }); + } + case QueueName.METADATA_EXTRACTION: { return this.jobRepository.queue({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force } }); } diff --git a/server/src/services/microservices.service.ts b/server/src/services/microservices.service.ts index 3e4ee8e4f..41bacd0f6 100644 --- a/server/src/services/microservices.service.ts +++ b/server/src/services/microservices.service.ts @@ -49,6 +49,7 @@ export class MicroservicesService { [JobName.USER_SYNC_USAGE]: () => this.userService.handleUserSyncUsage(), [JobName.QUEUE_SMART_SEARCH]: (data) => this.smartInfoService.handleQueueEncodeClip(data), [JobName.SMART_SEARCH]: (data) => this.smartInfoService.handleEncodeClip(data), + [JobName.QUEUE_DUPLICATE_DETECTION]: (data) => this.searchService.handleQueueSearchDuplicates(data), [JobName.DUPLICATE_DETECTION]: (data) => this.searchService.handleSearchDuplicates(data), [JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(), [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data), diff --git a/server/src/services/search.service.ts b/server/src/services/search.service.ts index 7c1a89caa..e3f5d016c 100644 --- a/server/src/services/search.service.ts +++ b/server/src/services/search.service.ts @@ -18,18 +18,28 @@ import { import { AssetOrder } from 'src/entities/album.entity'; import { AssetEntity } from 'src/entities/asset.entity'; import { IAssetDuplicateRepository } from 'src/interfaces/asset-duplicate.interface'; -import { IAssetRepository } from 'src/interfaces/asset.interface'; +import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface'; import { ICryptoRepository } from 'src/interfaces/crypto.interface'; -import { IEntityJob, JobStatus } from 'src/interfaces/job.interface'; +import { + IBaseJob, + IEntityJob, + IJobRepository, + JOBS_ASSET_PAGINATION_SIZE, + JobName, + JobStatus, +} from 'src/interfaces/job.interface'; import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface'; import { IMetadataRepository } from 'src/interfaces/metadata.interface'; import { IPartnerRepository } from 'src/interfaces/partner.interface'; import { IPersonRepository } from 'src/interfaces/person.interface'; import { ISearchRepository, SearchExploreItem, SearchStrategy } from 'src/interfaces/search.interface'; import { ISystemConfigRepository } from 'src/interfaces/system-config.interface'; +import { ImmichLogger } from 'src/utils/logger'; +import { usePagination } from 'src/utils/pagination'; @Injectable() export class SearchService { + private logger = new ImmichLogger(SearchService.name); private configCore: SystemConfigCore; constructor( @@ -42,6 +52,7 @@ export class SearchService { @Inject(IMetadataRepository) private metadataRepository: IMetadataRepository, @Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository, @Inject(IAssetDuplicateRepository) private assetDuplicateRepository: IAssetDuplicateRepository, + @Inject(IJobRepository) private jobRepository: IJobRepository, ) { this.configCore = SystemConfigCore.create(configRepository); } @@ -144,19 +155,41 @@ export class SearchService { } } + async handleQueueSearchDuplicates({ force }: IBaseJob): Promise { + const { machineLearning } = await this.configCore.getConfig(); + if (!machineLearning.enabled || !machineLearning.clip.enabled) { + return JobStatus.SKIPPED; + } + + const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { + return force + ? this.assetRepository.getAll(pagination) + : this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE); + }); + + for await (const assets of assetPagination) { + await this.jobRepository.queueAll( + assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })), + ); + } + + return JobStatus.SUCCESS; + } + async handleSearchDuplicates({ id }: IEntityJob): Promise { const { machineLearning } = await this.configCore.getConfig(); if (!machineLearning.enabled || !machineLearning.clip.enabled) { return JobStatus.SKIPPED; } - const asset = await this.assetRepository.getById(id, { smartSearch: { embedding: true } }); - if (!asset?.resizePath || !asset.smartSearch?.embedding) { - return JobStatus.FAILED; + const asset = await this.assetRepository.getById(id, { smartSearch: true }); + + if (!asset?.isVisible || asset.duplicateId) { + return JobStatus.SKIPPED; } - if (asset.duplicateId) { - return JobStatus.SKIPPED; + if (!asset?.resizePath || !asset.smartSearch?.embedding) { + return JobStatus.FAILED; } const duplicateAssets = await this.searchRepository.searchDuplicates({ @@ -165,17 +198,22 @@ export class SearchService { maxDistance: machineLearning.clip.duplicateThreshold, }); - if (duplicateAssets.length === 0) { - return JobStatus.SUCCESS; + if (duplicateAssets.length > 0) { + this.logger.debug(`Found ${duplicateAssets.length} duplicates for asset ${asset.id}`); + + let duplicateId = duplicateAssets.find((duplicate) => duplicate.duplicateId)?.duplicateId; + duplicateId ??= this.cryptoRepository.randomUUID(); + + const duplicateAssetIds = duplicateAssets.map((duplicate) => duplicate.assetId); + duplicateAssetIds.push(asset.id); + + await this.assetDuplicateRepository.create(duplicateId, duplicateAssetIds); } - let duplicateId = duplicateAssets.find((duplicate) => duplicate.duplicateId)?.duplicateId; - duplicateId ??= this.cryptoRepository.randomUUID(); - - const duplicateAssetIds = duplicateAssets.map((duplicate) => duplicate.assetId); - duplicateAssetIds.push(asset.id); - - await this.assetDuplicateRepository.create(duplicateId, duplicateAssetIds); + await this.assetRepository.upsertJobStatus({ + assetId: asset.id, + facesRecognizedAt: new Date(), + }); return JobStatus.SUCCESS; }