forked from Cutlery/immich
duplicate detection job, entity, config
This commit is contained in:
parent
861b72ef04
commit
e8d5d7a214
@ -20,6 +20,8 @@ import { ApiService } from 'src/services/api.service';
|
||||
import { MicroservicesService } from 'src/services/microservices.service';
|
||||
import { otelConfig } from 'src/utils/instrumentation';
|
||||
import { ImmichLogger } from 'src/utils/logger';
|
||||
import { IAssetDuplicateRepository } from 'src/interfaces/asset-duplicate.interface';
|
||||
import { AssetDuplicateRepository } from 'src/repositories/asset-duplicate.repository';
|
||||
|
||||
const providers = [ImmichLogger];
|
||||
const common = [...services, ...providers, ...repositories];
|
||||
|
@ -55,6 +55,7 @@ export const defaults = Object.freeze<SystemConfig>({
|
||||
[QueueName.METADATA_EXTRACTION]: { concurrency: 5 },
|
||||
[QueueName.FACE_DETECTION]: { concurrency: 2 },
|
||||
[QueueName.SEARCH]: { concurrency: 5 },
|
||||
[QueueName.DUPLICATE_DETECTION]: { concurrency: 5 },
|
||||
[QueueName.SIDECAR]: { concurrency: 5 },
|
||||
[QueueName.LIBRARY]: { concurrency: 5 },
|
||||
[QueueName.MIGRATION]: { concurrency: 5 },
|
||||
@ -71,6 +72,7 @@ export const defaults = Object.freeze<SystemConfig>({
|
||||
clip: {
|
||||
enabled: true,
|
||||
modelName: 'ViT-B-32__openai',
|
||||
duplicateThreshold: 0.01,
|
||||
},
|
||||
facialRecognition: {
|
||||
enabled: true,
|
||||
|
@ -73,6 +73,9 @@ export class AllJobStatusResponseDto implements Record<QueueName, JobStatusDto>
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.SEARCH]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.DUPLICATE_DETECTION]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.FACE_DETECTION]!: JobStatusDto;
|
||||
|
||||
|
@ -23,6 +23,13 @@ export class CLIPConfig extends ModelConfig {
|
||||
@Optional()
|
||||
@ApiProperty({ enumName: 'CLIPMode', enum: CLIPMode })
|
||||
mode?: CLIPMode;
|
||||
|
||||
@IsNumber()
|
||||
@Min(0.01)
|
||||
@Max(0.1)
|
||||
@Type(() => Number)
|
||||
@ApiProperty({ type: 'number', format: 'float' })
|
||||
duplicateThreshold!: number;
|
||||
}
|
||||
|
||||
export class RecognitionConfig extends ModelConfig {
|
||||
|
@ -184,6 +184,12 @@ class SystemConfigJobDto implements Record<ConcurrentQueueName, JobSettingsDto>
|
||||
@Type(() => JobSettingsDto)
|
||||
[QueueName.SEARCH]!: JobSettingsDto;
|
||||
|
||||
@ApiProperty({ type: JobSettingsDto })
|
||||
@ValidateNested()
|
||||
@IsObject()
|
||||
@Type(() => JobSettingsDto)
|
||||
[QueueName.DUPLICATE_DETECTION]!: JobSettingsDto;
|
||||
|
||||
@ApiProperty({ type: JobSettingsDto })
|
||||
@ValidateNested()
|
||||
@IsObject()
|
||||
|
16
server/src/entities/asset-duplicate.entity.ts
Normal file
16
server/src/entities/asset-duplicate.entity.ts
Normal file
@ -0,0 +1,16 @@
|
||||
import { AssetEntity } from 'src/entities/asset.entity';
|
||||
import { Entity, Index, JoinColumn, OneToMany, PrimaryColumn } from 'typeorm';
|
||||
|
||||
@Entity('asset_duplicates')
|
||||
@Index('asset_duplicates_assetId_uindex', ['assetId'], { unique: true })
|
||||
export class AssetDuplicateEntity {
|
||||
@OneToMany(() => AssetEntity, (asset) => asset.duplicates)
|
||||
@JoinColumn({ name: 'assetId', referencedColumnName: 'id' })
|
||||
assets!: AssetEntity;
|
||||
|
||||
@PrimaryColumn()
|
||||
id!: string;
|
||||
|
||||
@PrimaryColumn()
|
||||
assetId!: string;
|
||||
}
|
@ -24,6 +24,7 @@ import {
|
||||
PrimaryGeneratedColumn,
|
||||
UpdateDateColumn,
|
||||
} from 'typeorm';
|
||||
import { AssetDuplicateEntity } from './asset-duplicate.entity';
|
||||
|
||||
export const ASSET_CHECKSUM_CONSTRAINT = 'UQ_assets_owner_library_checksum';
|
||||
|
||||
@ -168,6 +169,13 @@ export class AssetEntity {
|
||||
|
||||
@OneToOne(() => AssetJobStatusEntity, (jobStatus) => jobStatus.asset, { nullable: true })
|
||||
jobStatus?: AssetJobStatusEntity;
|
||||
|
||||
@Column({ nullable: true })
|
||||
duplicateId?: string | null;
|
||||
|
||||
@ManyToOne(() => AssetDuplicateEntity, { nullable: true, onDelete: 'SET NULL', onUpdate: 'CASCADE' })
|
||||
@JoinColumn({ name: 'duplicateId' })
|
||||
duplicates?: AssetDuplicateEntity | null;
|
||||
}
|
||||
|
||||
export enum AssetType {
|
||||
|
@ -20,12 +20,14 @@ import { SystemMetadataEntity } from 'src/entities/system-metadata.entity';
|
||||
import { TagEntity } from 'src/entities/tag.entity';
|
||||
import { UserTokenEntity } from 'src/entities/user-token.entity';
|
||||
import { UserEntity } from 'src/entities/user.entity';
|
||||
import { AssetDuplicateEntity } from 'src/entities/asset-duplicate.entity';
|
||||
|
||||
export const entities = [
|
||||
ActivityEntity,
|
||||
AlbumEntity,
|
||||
APIKeyEntity,
|
||||
AssetEntity,
|
||||
AssetDuplicateEntity,
|
||||
AssetStackEntity,
|
||||
AssetFaceEntity,
|
||||
AssetJobStatusEntity,
|
||||
|
@ -208,6 +208,7 @@ export interface SystemConfig {
|
||||
clip: {
|
||||
enabled: boolean;
|
||||
modelName: string;
|
||||
duplicateThreshold: number;
|
||||
};
|
||||
facialRecognition: {
|
||||
enabled: boolean;
|
||||
|
9
server/src/interfaces/asset-duplicate.interface.ts
Normal file
9
server/src/interfaces/asset-duplicate.interface.ts
Normal file
@ -0,0 +1,9 @@
|
||||
import { AssetDuplicateEntity } from 'src/entities/asset-duplicate.entity';
|
||||
|
||||
export const IAssetDuplicateRepository = 'IAssetDuplicateRepository';
|
||||
|
||||
export interface IAssetDuplicateRepository {
|
||||
create(duplicateId: string, assetIds: string[]): Promise<void>;
|
||||
delete(id: string): Promise<void>;
|
||||
getById(id: string): Promise<AssetDuplicateEntity | null>;
|
||||
}
|
@ -5,6 +5,7 @@ export enum QueueName {
|
||||
FACE_DETECTION = 'faceDetection',
|
||||
FACIAL_RECOGNITION = 'facialRecognition',
|
||||
SMART_SEARCH = 'smartSearch',
|
||||
DUPLICATE_DETECTION = 'duplicateDetection',
|
||||
BACKGROUND_TASK = 'backgroundTask',
|
||||
STORAGE_TEMPLATE_MIGRATION = 'storageTemplateMigration',
|
||||
MIGRATION = 'migration',
|
||||
@ -83,6 +84,7 @@ export enum JobName {
|
||||
// smart search
|
||||
QUEUE_SMART_SEARCH = 'queue-smart-search',
|
||||
SMART_SEARCH = 'smart-search',
|
||||
DUPLICATE_DETECTION = 'duplicate-detection',
|
||||
|
||||
// XMP sidecars
|
||||
QUEUE_SIDECAR = 'queue-sidecar',
|
||||
@ -198,6 +200,7 @@ export type JobItem =
|
||||
// Smart Search
|
||||
| { name: JobName.QUEUE_SMART_SEARCH; data: IBaseJob }
|
||||
| { name: JobName.SMART_SEARCH; data: IEntityJob }
|
||||
| { name: JobName.DUPLICATE_DETECTION; data: IEntityJob }
|
||||
|
||||
// Filesystem
|
||||
| { name: JobName.DELETE_FILES; data: IDeleteFilesJob }
|
||||
|
@ -133,6 +133,7 @@ export interface SearchExifOptions {
|
||||
export interface SearchEmbeddingOptions {
|
||||
embedding: Embedding;
|
||||
userIds: string[];
|
||||
maxDistance?: number;
|
||||
}
|
||||
|
||||
export interface SearchPeopleOptions {
|
||||
@ -174,7 +175,6 @@ export type SmartSearchOptions = SearchDateOptions &
|
||||
export interface FaceEmbeddingSearch extends SearchEmbeddingOptions {
|
||||
hasPerson?: boolean;
|
||||
numResults: number;
|
||||
maxDistance?: number;
|
||||
}
|
||||
|
||||
export interface FaceSearchResult {
|
||||
@ -182,10 +182,17 @@ export interface FaceSearchResult {
|
||||
face: AssetFaceEntity;
|
||||
}
|
||||
|
||||
export interface AssetDuplicateResult {
|
||||
assetId: string;
|
||||
duplicateId: string;
|
||||
distance: number;
|
||||
}
|
||||
|
||||
export interface ISearchRepository {
|
||||
init(modelName: string): Promise<void>;
|
||||
searchMetadata(pagination: SearchPaginationOptions, options: AssetSearchOptions): Paginated<AssetEntity>;
|
||||
searchSmart(pagination: SearchPaginationOptions, options: SmartSearchOptions): Paginated<AssetEntity>;
|
||||
searchDuplicates(options: SearchEmbeddingOptions): Promise<AssetDuplicateResult[]>;
|
||||
searchFaces(search: FaceEmbeddingSearch): Promise<FaceSearchResult[]>;
|
||||
upsert(assetId: string, embedding: number[]): Promise<void>;
|
||||
searchPlaces(placeName: string): Promise<GeodataPlacesEntity[]>;
|
||||
|
39
server/src/repositories/asset-duplicate.repository.ts
Normal file
39
server/src/repositories/asset-duplicate.repository.ts
Normal file
@ -0,0 +1,39 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { AssetDuplicateEntity } from 'src/entities/asset-duplicate.entity';
|
||||
import { AssetEntity } from 'src/entities/asset.entity';
|
||||
import { IAssetDuplicateRepository } from 'src/interfaces/asset-duplicate.interface';
|
||||
import { Instrumentation } from 'src/utils/instrumentation';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
@Instrumentation()
|
||||
@Injectable()
|
||||
export class AssetDuplicateRepository implements IAssetDuplicateRepository {
|
||||
constructor(@InjectRepository(AssetDuplicateEntity) private repository: Repository<AssetDuplicateEntity>) {}
|
||||
|
||||
async create(duplicateId: string, assetIds: string[]) {
|
||||
await this.repository.manager.transaction(async (manager) => {
|
||||
await manager.upsert(
|
||||
AssetDuplicateEntity,
|
||||
assetIds.map((assetId) => ({ duplicateId, assetId })),
|
||||
['assetId'],
|
||||
);
|
||||
await manager.update(AssetEntity, assetIds, { duplicateId });
|
||||
});
|
||||
}
|
||||
|
||||
async delete(id: string): Promise<void> {
|
||||
await this.repository.delete(id);
|
||||
}
|
||||
|
||||
async getById(id: string): Promise<AssetDuplicateEntity | null> {
|
||||
return this.repository.findOne({
|
||||
where: {
|
||||
id,
|
||||
},
|
||||
relations: {
|
||||
assets: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
@ -63,6 +63,7 @@ export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
|
||||
// smart search
|
||||
[JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH,
|
||||
[JobName.SMART_SEARCH]: QueueName.SMART_SEARCH,
|
||||
[JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
|
||||
|
||||
// XMP sidecars
|
||||
[JobName.QUEUE_SIDECAR]: QueueName.SIDECAR,
|
||||
|
@ -9,10 +9,12 @@ import { SmartInfoEntity } from 'src/entities/smart-info.entity';
|
||||
import { SmartSearchEntity } from 'src/entities/smart-search.entity';
|
||||
import { DatabaseExtension } from 'src/interfaces/database.interface';
|
||||
import {
|
||||
AssetDuplicateResult,
|
||||
AssetSearchOptions,
|
||||
FaceEmbeddingSearch,
|
||||
FaceSearchResult,
|
||||
ISearchRepository,
|
||||
SearchEmbeddingOptions,
|
||||
SearchPaginationOptions,
|
||||
SmartSearchOptions,
|
||||
} from 'src/interfaces/search.interface';
|
||||
@ -144,6 +146,36 @@ export class SearchRepository implements ISearchRepository {
|
||||
return results;
|
||||
}
|
||||
|
||||
@GenerateSql({
|
||||
params: [
|
||||
{
|
||||
embedding: Array.from({ length: 512 }, Math.random),
|
||||
maxDistance: 0.6,
|
||||
userIds: [DummyValue.UUID],
|
||||
},
|
||||
],
|
||||
})
|
||||
searchDuplicates({ embedding, maxDistance, userIds }: SearchEmbeddingOptions): Promise<AssetDuplicateResult[]> {
|
||||
const cte = this.assetRepository.createQueryBuilder('asset');
|
||||
cte
|
||||
.select('asset.id', 'assetId')
|
||||
.addSelect('asset.duplicateId')
|
||||
.addSelect('search.embedding <=> :embedding', 'distance')
|
||||
.innerJoin('asset.smartSearch', 'search')
|
||||
.where('asset.ownerId IN (:...userIds )')
|
||||
.orderBy('asset.embedding <=> :embedding')
|
||||
.limit(64)
|
||||
.setParameters({ embedding: asVector(embedding), userIds });
|
||||
|
||||
const builder = this.assetRepository
|
||||
.createQueryBuilder('asset')
|
||||
.addCommonTableExpression(cte, 'cte')
|
||||
.select('cte.*')
|
||||
.where('cte.distance <= :maxDistance', { maxDistance });
|
||||
|
||||
return builder.getMany() as any as Promise<AssetDuplicateResult[]>;
|
||||
}
|
||||
|
||||
@GenerateSql({
|
||||
params: [
|
||||
{
|
||||
|
@ -296,6 +296,13 @@ export class JobService {
|
||||
break;
|
||||
}
|
||||
|
||||
case JobName.SMART_SEARCH: {
|
||||
if (item.data.source === 'upload') {
|
||||
await this.jobRepository.queue({ name: JobName.DUPLICATE_DETECTION, data: item.data });
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case JobName.USER_DELETION: {
|
||||
this.eventRepository.clientBroadcast(ClientEvent.USER_DELETE, item.data.id);
|
||||
break;
|
||||
|
@ -14,6 +14,7 @@ import { StorageService } from 'src/services/storage.service';
|
||||
import { SystemConfigService } from 'src/services/system-config.service';
|
||||
import { UserService } from 'src/services/user.service';
|
||||
import { otelSDK } from 'src/utils/instrumentation';
|
||||
import { SearchService } from './search.service';
|
||||
|
||||
@Injectable()
|
||||
export class MicroservicesService {
|
||||
@ -31,6 +32,7 @@ export class MicroservicesService {
|
||||
private storageService: StorageService,
|
||||
private userService: UserService,
|
||||
private databaseService: DatabaseService,
|
||||
private searchService: SearchService,
|
||||
) {}
|
||||
|
||||
async init() {
|
||||
@ -47,6 +49,7 @@ export class MicroservicesService {
|
||||
[JobName.USER_SYNC_USAGE]: () => this.userService.handleUserSyncUsage(),
|
||||
[JobName.QUEUE_SMART_SEARCH]: (data) => this.smartInfoService.handleQueueEncodeClip(data),
|
||||
[JobName.SMART_SEARCH]: (data) => this.smartInfoService.handleEncodeClip(data),
|
||||
[JobName.DUPLICATE_DETECTION]: (data) => this.searchService.handleSearchDuplicates(data),
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(),
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data),
|
||||
[JobName.QUEUE_MIGRATION]: () => this.mediaService.handleQueueMigration(),
|
||||
|
@ -17,7 +17,10 @@ import {
|
||||
} from 'src/dtos/search.dto';
|
||||
import { AssetOrder } from 'src/entities/album.entity';
|
||||
import { AssetEntity } from 'src/entities/asset.entity';
|
||||
import { IAssetDuplicateRepository } from 'src/interfaces/asset-duplicate.interface';
|
||||
import { IAssetRepository } from 'src/interfaces/asset.interface';
|
||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
|
||||
import { IEntityJob, JobStatus } from 'src/interfaces/job.interface';
|
||||
import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface';
|
||||
import { IMetadataRepository } from 'src/interfaces/metadata.interface';
|
||||
import { IPartnerRepository } from 'src/interfaces/partner.interface';
|
||||
@ -37,6 +40,8 @@ export class SearchService {
|
||||
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
|
||||
@Inject(IPartnerRepository) private partnerRepository: IPartnerRepository,
|
||||
@Inject(IMetadataRepository) private metadataRepository: IMetadataRepository,
|
||||
@Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository,
|
||||
@Inject(IAssetDuplicateRepository) private assetDuplicateRepository: IAssetDuplicateRepository,
|
||||
) {
|
||||
this.configCore = SystemConfigCore.create(configRepository);
|
||||
}
|
||||
@ -139,6 +144,42 @@ export class SearchService {
|
||||
}
|
||||
}
|
||||
|
||||
async handleSearchDuplicates({ id }: IEntityJob): Promise<JobStatus> {
|
||||
const { machineLearning } = await this.configCore.getConfig();
|
||||
if (!machineLearning.enabled || !machineLearning.clip.enabled) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
const asset = await this.assetRepository.getById(id, { smartSearch: { embedding: true } });
|
||||
if (!asset?.resizePath || !asset.smartSearch?.embedding) {
|
||||
return JobStatus.FAILED;
|
||||
}
|
||||
|
||||
if (asset.duplicateId) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
const duplicateAssets = await this.searchRepository.searchDuplicates({
|
||||
userIds: [asset.ownerId],
|
||||
embedding: asset.smartSearch.embedding,
|
||||
maxDistance: machineLearning.clip.duplicateThreshold,
|
||||
});
|
||||
|
||||
if (duplicateAssets.length === 0) {
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
let duplicateId = duplicateAssets.find((duplicate) => duplicate.duplicateId)?.duplicateId;
|
||||
duplicateId ??= this.cryptoRepository.randomUUID();
|
||||
|
||||
const duplicateAssetIds = duplicateAssets.map((duplicate) => duplicate.assetId);
|
||||
duplicateAssetIds.push(asset.id);
|
||||
|
||||
await this.assetDuplicateRepository.create(duplicateId, duplicateAssetIds);
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
// TODO: remove after implementing new search filters
|
||||
/** @deprecated */
|
||||
async search(auth: AuthDto, dto: SearchDto): Promise<SearchResponseDto> {
|
||||
|
@ -74,6 +74,7 @@ const updatedConfig = Object.freeze<SystemConfig>({
|
||||
clip: {
|
||||
enabled: true,
|
||||
modelName: 'ViT-B-32__openai',
|
||||
duplicateThreshold: 0.01,
|
||||
},
|
||||
facialRecognition: {
|
||||
enabled: true,
|
||||
|
Loading…
x
Reference in New Issue
Block a user