refactor: stream for sidecar (#17995)

* refactor: stream for sidecar

* chore: make sql

---------

Co-authored-by: Jason Rasmussen <jason@rasm.me>
This commit is contained in:
Alex 2025-04-30 10:53:51 -05:00 committed by GitHub
parent 436cff72b5
commit 732b06eec8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 46 additions and 38 deletions

View File

@ -470,6 +470,18 @@ from
where where
"assets"."deletedAt" <= $1 "assets"."deletedAt" <= $1
-- AssetJobRepository.streamForSidecar
select
"assets"."id"
from
"assets"
where
(
"assets"."sidecarPath" = $1
or "assets"."sidecarPath" is null
)
and "assets"."isVisible" = $2
-- AssetJobRepository.streamForDetectFacesJob -- AssetJobRepository.streamForDetectFacesJob
select select
"assets"."id" "assets"."id"

View File

@ -238,18 +238,13 @@ select
from from
"assets" "assets"
where where
( "deletedAt" is null
"assets"."sidecarPath" = $1
or "assets"."sidecarPath" is null
)
and "assets"."isVisible" = $2
and "deletedAt" is null
order by order by
"createdAt" "createdAt"
limit limit
$3 $1
offset offset
$4 $2
-- AssetRepository.getTimeBuckets -- AssetRepository.getTimeBuckets
with with

View File

@ -323,6 +323,18 @@ export class AssetJobRepository {
.stream(); .stream();
} }
@GenerateSql({ params: [], stream: true })
streamForSidecar(force?: boolean) {
return this.db
.selectFrom('assets')
.select(['assets.id'])
.$if(!force, (qb) =>
qb.where((eb) => eb.or([eb('assets.sidecarPath', '=', ''), eb('assets.sidecarPath', 'is', null)])),
)
.where('assets.isVisible', '=', true)
.stream();
}
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [], stream: true })
streamForDetectFacesJob(force?: boolean) { streamForDetectFacesJob(force?: boolean) {
return this.assetsWithPreviews() return this.assetsWithPreviews()

View File

@ -48,7 +48,6 @@ export interface LivePhotoSearchOptions {
export enum WithoutProperty { export enum WithoutProperty {
THUMBNAIL = 'thumbnail', THUMBNAIL = 'thumbnail',
ENCODED_VIDEO = 'encoded-video', ENCODED_VIDEO = 'encoded-video',
SIDECAR = 'sidecar',
} }
export enum WithProperty { export enum WithProperty {
@ -542,11 +541,6 @@ export class AssetRepository {
.where((eb) => eb.or([eb('assets.encodedVideoPath', 'is', null), eb('assets.encodedVideoPath', '=', '')])), .where((eb) => eb.or([eb('assets.encodedVideoPath', 'is', null), eb('assets.encodedVideoPath', '=', '')])),
) )
.$if(property === WithoutProperty.SIDECAR, (qb) =>
qb
.where((eb) => eb.or([eb('assets.sidecarPath', '=', ''), eb('assets.sidecarPath', 'is', null)]))
.where('assets.isVisible', '=', true),
)
.$if(property === WithoutProperty.THUMBNAIL, (qb) => .$if(property === WithoutProperty.THUMBNAIL, (qb) =>
qb qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id') .innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')

View File

@ -5,7 +5,6 @@ import { constants } from 'node:fs/promises';
import { defaults } from 'src/config'; import { defaults } from 'src/config';
import { MapAsset } from 'src/dtos/asset-response.dto'; import { MapAsset } from 'src/dtos/asset-response.dto';
import { AssetType, ExifOrientation, ImmichWorker, JobName, JobStatus, SourceType } from 'src/enum'; import { AssetType, ExifOrientation, ImmichWorker, JobName, JobStatus, SourceType } from 'src/enum';
import { WithoutProperty } from 'src/repositories/asset.repository';
import { ImmichTags } from 'src/repositories/metadata.repository'; import { ImmichTags } from 'src/repositories/metadata.repository';
import { MetadataService } from 'src/services/metadata.service'; import { MetadataService } from 'src/services/metadata.service';
import { assetStub } from 'test/fixtures/asset.stub'; import { assetStub } from 'test/fixtures/asset.stub';
@ -1346,12 +1345,11 @@ describe(MetadataService.name, () => {
describe('handleQueueSidecar', () => { describe('handleQueueSidecar', () => {
it('should queue assets with sidecar files', async () => { it('should queue assets with sidecar files', async () => {
mocks.asset.getAll.mockResolvedValue({ items: [assetStub.sidecar], hasNextPage: false }); mocks.assetJob.streamForSidecar.mockReturnValue(makeStream([assetStub.image]));
await sut.handleQueueSidecar({ force: true }); await sut.handleQueueSidecar({ force: true });
expect(mocks.assetJob.streamForSidecar).toHaveBeenCalledWith(true);
expect(mocks.asset.getAll).toHaveBeenCalledWith({ take: 1000, skip: 0 });
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
expect(mocks.job.queueAll).toHaveBeenCalledWith([ expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ {
name: JobName.SIDECAR_SYNC, name: JobName.SIDECAR_SYNC,
@ -1361,12 +1359,11 @@ describe(MetadataService.name, () => {
}); });
it('should queue assets without sidecar files', async () => { it('should queue assets without sidecar files', async () => {
mocks.asset.getWithout.mockResolvedValue({ items: [assetStub.image], hasNextPage: false }); mocks.assetJob.streamForSidecar.mockReturnValue(makeStream([assetStub.image]));
await sut.handleQueueSidecar({ force: false }); await sut.handleQueueSidecar({ force: false });
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ take: 1000, skip: 0 }, WithoutProperty.SIDECAR); expect(mocks.assetJob.streamForSidecar).toHaveBeenCalledWith(false);
expect(mocks.asset.getAll).not.toHaveBeenCalled();
expect(mocks.job.queueAll).toHaveBeenCalledWith([ expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ {
name: JobName.SIDECAR_DISCOVERY, name: JobName.SIDECAR_DISCOVERY,

View File

@ -22,14 +22,12 @@ import {
QueueName, QueueName,
SourceType, SourceType,
} from 'src/enum'; } from 'src/enum';
import { WithoutProperty } from 'src/repositories/asset.repository';
import { ArgOf } from 'src/repositories/event.repository'; import { ArgOf } from 'src/repositories/event.repository';
import { ReverseGeocodeResult } from 'src/repositories/map.repository'; import { ReverseGeocodeResult } from 'src/repositories/map.repository';
import { ImmichTags } from 'src/repositories/metadata.repository'; import { ImmichTags } from 'src/repositories/metadata.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { JobOf } from 'src/types'; import { JobItem, JobOf } from 'src/types';
import { isFaceImportEnabled } from 'src/utils/misc'; import { isFaceImportEnabled } from 'src/utils/misc';
import { usePagination } from 'src/utils/pagination';
import { upsertTags } from 'src/utils/tag'; import { upsertTags } from 'src/utils/tag';
/** look for a date from these tags (in order) */ /** look for a date from these tags (in order) */
@ -289,22 +287,22 @@ export class MetadataService extends BaseService {
} }
@OnJob({ name: JobName.QUEUE_SIDECAR, queue: QueueName.SIDECAR }) @OnJob({ name: JobName.QUEUE_SIDECAR, queue: QueueName.SIDECAR })
async handleQueueSidecar(job: JobOf<JobName.QUEUE_SIDECAR>): Promise<JobStatus> { async handleQueueSidecar({ force }: JobOf<JobName.QUEUE_SIDECAR>): Promise<JobStatus> {
const { force } = job; let jobs: JobItem[] = [];
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => { const queueAll = async () => {
return force await this.jobRepository.queueAll(jobs);
? this.assetRepository.getAll(pagination) jobs = [];
: this.assetRepository.getWithout(pagination, WithoutProperty.SIDECAR); };
});
for await (const assets of assetPagination) { const assets = this.assetJobRepository.streamForSidecar(force);
await this.jobRepository.queueAll( for await (const asset of assets) {
assets.map((asset) => ({ jobs.push({ name: force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY, data: { id: asset.id } });
name: force ? JobName.SIDECAR_SYNC : JobName.SIDECAR_DISCOVERY, if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
data: { id: asset.id }, await queueAll();
})),
);
} }
}
await queueAll();
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }