forked from Cutlery/immich
queueing
This commit is contained in:
parent
e8d5d7a214
commit
ba2ff0efee
@ -15,4 +15,7 @@ export class AssetJobStatusEntity {
|
||||
|
||||
@Column({ type: 'timestamptz', nullable: true })
|
||||
metadataExtractedAt!: Date | null;
|
||||
|
||||
@Column({ type: 'timestamptz', nullable: true })
|
||||
duplicatesDetectedAt!: Date | null;
|
||||
}
|
||||
|
@ -39,7 +39,8 @@ export enum SystemConfigKey {
|
||||
JOB_METADATA_EXTRACTION_CONCURRENCY = 'job.metadataExtraction.concurrency',
|
||||
JOB_VIDEO_CONVERSION_CONCURRENCY = 'job.videoConversion.concurrency',
|
||||
JOB_FACE_DETECTION_CONCURRENCY = 'job.faceDetection.concurrency',
|
||||
JOB_CLIP_ENCODING_CONCURRENCY = 'job.smartSearch.concurrency',
|
||||
JOB_SMART_SEARCH_CONCURRENCY = 'job.smartSearch.concurrency',
|
||||
JOB_DUPLICATE_DETECTION_CONCURRENCY = 'job.duplicateDetection.concurrency',
|
||||
JOB_BACKGROUND_TASK_CONCURRENCY = 'job.backgroundTask.concurrency',
|
||||
JOB_STORAGE_TEMPLATE_MIGRATION_CONCURRENCY = 'job.storageTemplateMigration.concurrency',
|
||||
JOB_SEARCH_CONCURRENCY = 'job.search.concurrency',
|
||||
|
@ -40,6 +40,7 @@ export enum WithoutProperty {
|
||||
ENCODED_VIDEO = 'encoded-video',
|
||||
EXIF = 'exif',
|
||||
SMART_SEARCH = 'smart-search',
|
||||
DUPLICATE = 'duplicate',
|
||||
OBJECT_TAGS = 'object-tags',
|
||||
FACES = 'faces',
|
||||
PERSON = 'person',
|
||||
|
@ -84,6 +84,9 @@ export enum JobName {
|
||||
// smart search
|
||||
QUEUE_SMART_SEARCH = 'queue-smart-search',
|
||||
SMART_SEARCH = 'smart-search',
|
||||
|
||||
// duplicate detection
|
||||
QUEUE_DUPLICATE_DETECTION = 'queue-duplicate-detection',
|
||||
DUPLICATE_DETECTION = 'duplicate-detection',
|
||||
|
||||
// XMP sidecars
|
||||
@ -200,6 +203,9 @@ export type JobItem =
|
||||
// Smart Search
|
||||
| { name: JobName.QUEUE_SMART_SEARCH; data: IBaseJob }
|
||||
| { name: JobName.SMART_SEARCH; data: IEntityJob }
|
||||
|
||||
// Duplicate Detection
|
||||
| { name: JobName.QUEUE_DUPLICATE_DETECTION; data: IBaseJob }
|
||||
| { name: JobName.DUPLICATE_DETECTION; data: IEntityJob }
|
||||
|
||||
// Filesystem
|
||||
|
@ -347,6 +347,18 @@ export class AssetRepository implements IAssetRepository {
|
||||
break;
|
||||
}
|
||||
|
||||
case WithoutProperty.DUPLICATE: {
|
||||
where = {
|
||||
resizePath: Not(IsNull()),
|
||||
isVisible: true,
|
||||
smartSearch: true,
|
||||
jobStatus: {
|
||||
duplicatesDetectedAt: IsNull(),
|
||||
},
|
||||
};
|
||||
break;
|
||||
}
|
||||
|
||||
case WithoutProperty.OBJECT_TAGS: {
|
||||
relations = {
|
||||
smartInfo: true,
|
||||
|
@ -63,6 +63,9 @@ export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
|
||||
// smart search
|
||||
[JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH,
|
||||
[JobName.SMART_SEARCH]: QueueName.SMART_SEARCH,
|
||||
|
||||
// duplicate detection
|
||||
[JobName.QUEUE_DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
|
||||
[JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
|
||||
|
||||
// XMP sidecars
|
||||
|
@ -115,6 +115,11 @@ export class JobService {
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_SMART_SEARCH, data: { force } });
|
||||
}
|
||||
|
||||
case QueueName.DUPLICATE_DETECTION: {
|
||||
await this.configCore.requireFeature(FeatureFlag.SMART_SEARCH);
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_DUPLICATE_DETECTION, data: { force } });
|
||||
}
|
||||
|
||||
case QueueName.METADATA_EXTRACTION: {
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force } });
|
||||
}
|
||||
|
@ -49,6 +49,7 @@ export class MicroservicesService {
|
||||
[JobName.USER_SYNC_USAGE]: () => this.userService.handleUserSyncUsage(),
|
||||
[JobName.QUEUE_SMART_SEARCH]: (data) => this.smartInfoService.handleQueueEncodeClip(data),
|
||||
[JobName.SMART_SEARCH]: (data) => this.smartInfoService.handleEncodeClip(data),
|
||||
[JobName.QUEUE_DUPLICATE_DETECTION]: (data) => this.searchService.handleQueueSearchDuplicates(data),
|
||||
[JobName.DUPLICATE_DETECTION]: (data) => this.searchService.handleSearchDuplicates(data),
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION]: () => this.storageTemplateService.handleMigration(),
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: (data) => this.storageTemplateService.handleMigrationSingle(data),
|
||||
|
@ -18,18 +18,28 @@ import {
|
||||
import { AssetOrder } from 'src/entities/album.entity';
|
||||
import { AssetEntity } from 'src/entities/asset.entity';
|
||||
import { IAssetDuplicateRepository } from 'src/interfaces/asset-duplicate.interface';
|
||||
import { IAssetRepository } from 'src/interfaces/asset.interface';
|
||||
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
|
||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
|
||||
import { IEntityJob, JobStatus } from 'src/interfaces/job.interface';
|
||||
import {
|
||||
IBaseJob,
|
||||
IEntityJob,
|
||||
IJobRepository,
|
||||
JOBS_ASSET_PAGINATION_SIZE,
|
||||
JobName,
|
||||
JobStatus,
|
||||
} from 'src/interfaces/job.interface';
|
||||
import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface';
|
||||
import { IMetadataRepository } from 'src/interfaces/metadata.interface';
|
||||
import { IPartnerRepository } from 'src/interfaces/partner.interface';
|
||||
import { IPersonRepository } from 'src/interfaces/person.interface';
|
||||
import { ISearchRepository, SearchExploreItem, SearchStrategy } from 'src/interfaces/search.interface';
|
||||
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
|
||||
import { ImmichLogger } from 'src/utils/logger';
|
||||
import { usePagination } from 'src/utils/pagination';
|
||||
|
||||
@Injectable()
|
||||
export class SearchService {
|
||||
private logger = new ImmichLogger(SearchService.name);
|
||||
private configCore: SystemConfigCore;
|
||||
|
||||
constructor(
|
||||
@ -42,6 +52,7 @@ export class SearchService {
|
||||
@Inject(IMetadataRepository) private metadataRepository: IMetadataRepository,
|
||||
@Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository,
|
||||
@Inject(IAssetDuplicateRepository) private assetDuplicateRepository: IAssetDuplicateRepository,
|
||||
@Inject(IJobRepository) private jobRepository: IJobRepository,
|
||||
) {
|
||||
this.configCore = SystemConfigCore.create(configRepository);
|
||||
}
|
||||
@ -144,19 +155,41 @@ export class SearchService {
|
||||
}
|
||||
}
|
||||
|
||||
async handleQueueSearchDuplicates({ force }: IBaseJob): Promise<JobStatus> {
|
||||
const { machineLearning } = await this.configCore.getConfig();
|
||||
if (!machineLearning.enabled || !machineLearning.clip.enabled) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
|
||||
return force
|
||||
? this.assetRepository.getAll(pagination)
|
||||
: this.assetRepository.getWithout(pagination, WithoutProperty.DUPLICATE);
|
||||
});
|
||||
|
||||
for await (const assets of assetPagination) {
|
||||
await this.jobRepository.queueAll(
|
||||
assets.map((asset) => ({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } })),
|
||||
);
|
||||
}
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
async handleSearchDuplicates({ id }: IEntityJob): Promise<JobStatus> {
|
||||
const { machineLearning } = await this.configCore.getConfig();
|
||||
if (!machineLearning.enabled || !machineLearning.clip.enabled) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
const asset = await this.assetRepository.getById(id, { smartSearch: { embedding: true } });
|
||||
if (!asset?.resizePath || !asset.smartSearch?.embedding) {
|
||||
return JobStatus.FAILED;
|
||||
const asset = await this.assetRepository.getById(id, { smartSearch: true });
|
||||
|
||||
if (!asset?.isVisible || asset.duplicateId) {
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
if (asset.duplicateId) {
|
||||
return JobStatus.SKIPPED;
|
||||
if (!asset?.resizePath || !asset.smartSearch?.embedding) {
|
||||
return JobStatus.FAILED;
|
||||
}
|
||||
|
||||
const duplicateAssets = await this.searchRepository.searchDuplicates({
|
||||
@ -165,17 +198,22 @@ export class SearchService {
|
||||
maxDistance: machineLearning.clip.duplicateThreshold,
|
||||
});
|
||||
|
||||
if (duplicateAssets.length === 0) {
|
||||
return JobStatus.SUCCESS;
|
||||
if (duplicateAssets.length > 0) {
|
||||
this.logger.debug(`Found ${duplicateAssets.length} duplicates for asset ${asset.id}`);
|
||||
|
||||
let duplicateId = duplicateAssets.find((duplicate) => duplicate.duplicateId)?.duplicateId;
|
||||
duplicateId ??= this.cryptoRepository.randomUUID();
|
||||
|
||||
const duplicateAssetIds = duplicateAssets.map((duplicate) => duplicate.assetId);
|
||||
duplicateAssetIds.push(asset.id);
|
||||
|
||||
await this.assetDuplicateRepository.create(duplicateId, duplicateAssetIds);
|
||||
}
|
||||
|
||||
let duplicateId = duplicateAssets.find((duplicate) => duplicate.duplicateId)?.duplicateId;
|
||||
duplicateId ??= this.cryptoRepository.randomUUID();
|
||||
|
||||
const duplicateAssetIds = duplicateAssets.map((duplicate) => duplicate.assetId);
|
||||
duplicateAssetIds.push(asset.id);
|
||||
|
||||
await this.assetDuplicateRepository.create(duplicateId, duplicateAssetIds);
|
||||
await this.assetRepository.upsertJobStatus({
|
||||
assetId: asset.id,
|
||||
facesRecognizedAt: new Date(),
|
||||
});
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user