fix(server): queueing for duplicate detection (#18380)

* fix queueing

* update tests
This commit is contained in:
Mert 2025-05-20 09:44:39 -04:00 committed by GitHub
parent 0d773af6c3
commit a6a4dfcfd3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 18 additions and 70 deletions

View File

@@ -8,30 +8,14 @@ select
"duplicateId",
"stackId",
"visibility",
"smart_search"."embedding",
(
select
coalesce(json_agg(agg), '[]')
from
(
select
"asset_files"."id",
"asset_files"."path",
"asset_files"."type"
from
"asset_files"
where
"asset_files"."assetId" = "assets"."id"
and "asset_files"."type" = $1
) as agg
) as "files"
"smart_search"."embedding"
from
"assets"
left join "smart_search" on "assets"."id" = "smart_search"."assetId"
where
"assets"."id" = $2::uuid
"assets"."id" = $1::uuid
limit
$3
$2
-- AssetJobRepository.getForSidecarWriteJob
select
@@ -199,18 +183,11 @@ select
"assets"."id"
from
"assets"
inner join "smart_search" on "assets"."id" = "smart_search"."assetId"
inner join "asset_job_status" as "job_status" on "assetId" = "assets"."id"
where
"assets"."visibility" != $1
and "assets"."deletedAt" is null
and "job_status"."previewAt" is not null
and not exists (
select
from
"smart_search"
where
"assetId" = "assets"."id"
)
and "job_status"."duplicatesDetectedAt" is null
-- AssetJobRepository.streamForEncodeClip

View File

@@ -28,16 +28,7 @@ export class AssetJobRepository {
.selectFrom('assets')
.where('assets.id', '=', asUuid(id))
.leftJoin('smart_search', 'assets.id', 'smart_search.assetId')
.select((eb) => [
'id',
'type',
'ownerId',
'duplicateId',
'stackId',
'visibility',
'smart_search.embedding',
withFiles(eb, AssetFileType.PREVIEW),
])
.select(['id', 'type', 'ownerId', 'duplicateId', 'stackId', 'visibility', 'smart_search.embedding'])
.limit(1)
.executeTakeFirst();
}
@@ -146,10 +137,17 @@ export class AssetJobRepository {
@GenerateSql({ params: [], stream: true })
streamForSearchDuplicates(force?: boolean) {
return this.assetsWithPreviews()
.where((eb) => eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id'))))
.$if(!force, (qb) => qb.where('job_status.duplicatesDetectedAt', 'is', null))
return this.db
.selectFrom('assets')
.select(['assets.id'])
.where('assets.visibility', '!=', AssetVisibility.HIDDEN)
.where('assets.deletedAt', 'is', null)
.innerJoin('smart_search', 'assets.id', 'smart_search.assetId')
.$if(!force, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.duplicatesDetectedAt', 'is', null),
)
.stream();
}

View File

@@ -1,4 +1,4 @@
import { AssetFileType, AssetType, AssetVisibility, JobName, JobStatus } from 'src/enum';
import { AssetType, AssetVisibility, JobName, JobStatus } from 'src/enum';
import { DuplicateService } from 'src/services/duplicate.service';
import { SearchService } from 'src/services/search.service';
import { assetStub } from 'test/fixtures/asset.stub';
@@ -11,17 +11,6 @@ vitest.useFakeTimers();
const hasEmbedding = {
id: 'asset-1',
ownerId: 'user-id',
files: [
{
assetId: 'asset-1',
createdAt: new Date(),
id: 'file-1',
path: 'preview.jpg',
type: AssetFileType.PREVIEW,
updatedAt: new Date(),
updateId: 'update-1',
},
],
stackId: null,
type: AssetType.IMAGE,
duplicateId: null,
@@ -218,15 +207,6 @@ describe(SearchService.name, () => {
expect(mocks.logger.debug).toHaveBeenCalledWith(`Asset ${id} is not visible, skipping`);
});
it('should fail if asset is missing preview image', async () => {
mocks.assetJob.getForSearchDuplicatesJob.mockResolvedValue({ ...hasEmbedding, files: [] });
const result = await sut.handleSearchDuplicates({ id: assetStub.noResizePath.id });
expect(result).toBe(JobStatus.FAILED);
expect(mocks.logger.warn).toHaveBeenCalledWith(`Asset ${assetStub.noResizePath.id} is missing preview image`);
});
it('should fail if asset is missing embedding', async () => {
mocks.assetJob.getForSearchDuplicatesJob.mockResolvedValue({ ...hasEmbedding, embedding: null });

View File

@@ -4,11 +4,10 @@ import { OnJob } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { DuplicateResponseDto } from 'src/dtos/duplicate.dto';
import { AssetFileType, AssetVisibility, JobName, JobStatus, QueueName } from 'src/enum';
import { AssetVisibility, JobName, JobStatus, QueueName } from 'src/enum';
import { AssetDuplicateResult } from 'src/repositories/search.repository';
import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types';
import { getAssetFile } from 'src/utils/asset.util';
import { isDuplicateDetectionEnabled } from 'src/utils/misc';
@Injectable()
@@ -65,17 +64,11 @@ export class DuplicateService extends BaseService {
return JobStatus.SKIPPED;
}
if (asset.visibility == AssetVisibility.HIDDEN) {
if (asset.visibility === AssetVisibility.HIDDEN) {
this.logger.debug(`Asset ${id} is not visible, skipping`);
return JobStatus.SKIPPED;
}
const previewFile = getAssetFile(asset.files || [], AssetFileType.PREVIEW);
if (!previewFile) {
this.logger.warn(`Asset ${id} is missing preview image`);
return JobStatus.FAILED;
}
if (!asset.embedding) {
this.logger.debug(`Asset ${id} is missing embedding`);
return JobStatus.FAILED;