refactor: stream queue migration (#17997)

This commit is contained in:
Jason Rasmussen 2025-04-30 12:23:13 -04:00 committed by GitHub
parent 732b06eec8
commit 526c02297c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 25 additions and 98 deletions

View File

@ -495,3 +495,11 @@ where
and "job_status"."facesRecognizedAt" is null
order by
"assets"."createdAt" desc
-- AssetJobRepository.streamForMigrationJob
select
"id"
from
"assets"
where
"assets"."deletedAt" is null

View File

@ -232,20 +232,6 @@ where
limit
$3
-- AssetRepository.getWithout (sidecar)
select
"assets".*
from
"assets"
where
"deletedAt" is null
order by
"createdAt"
limit
$1
offset
$2
-- AssetRepository.getTimeBuckets
with
"assets" as (

View File

@ -343,4 +343,9 @@ export class AssetJobRepository {
.orderBy('assets.createdAt', 'desc')
.stream();
}
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForMigrationJob() {
return this.db.selectFrom('assets').select(['id']).where('assets.deletedAt', 'is', null).stream();
}
}

View File

@ -7,13 +7,12 @@ import { AssetFiles, AssetJobStatus, Assets, DB, Exif } from 'src/db';
import { Chunked, ChunkedArray, DummyValue, GenerateSql } from 'src/decorators';
import { MapAsset } from 'src/dtos/asset-response.dto';
import { AssetFileType, AssetOrder, AssetStatus, AssetType } from 'src/enum';
import { AssetSearchOptions, SearchExploreItem, SearchExploreItemSet } from 'src/repositories/search.repository';
import { SearchExploreItem, SearchExploreItemSet } from 'src/repositories/search.repository';
import {
anyUuid,
asUuid,
hasPeople,
removeUndefinedKeys,
searchAssetBuilder,
truncatedDate,
unnest,
withExif,
@ -27,7 +26,6 @@ import {
withTags,
} from 'src/utils/database';
import { globToSqlPattern } from 'src/utils/misc';
import { PaginationOptions, paginationHelper } from 'src/utils/pagination';
export type AssetStats = Record<AssetType, number>;
@ -45,11 +43,6 @@ export interface LivePhotoSearchOptions {
type: AssetType;
}
export enum WithoutProperty {
THUMBNAIL = 'thumbnail',
ENCODED_VIDEO = 'encoded-video',
}
export enum WithProperty {
SIDECAR = 'sidecar',
}
@ -331,10 +324,6 @@ export class AssetRepository {
return assets.map((asset) => asset.deviceAssetId);
}
getByUserId(pagination: PaginationOptions, userId: string, options: Omit<AssetSearchOptions, 'userIds'> = {}) {
return this.getAll(pagination, { ...options, userIds: [userId] });
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.STRING] })
getByLibraryIdAndOriginalPath(libraryId: string, originalPath: string) {
return this.db
@ -346,16 +335,6 @@ export class AssetRepository {
.executeTakeFirst();
}
async getAll(pagination: PaginationOptions, { orderDirection, ...options }: AssetSearchOptions = {}) {
const builder = searchAssetBuilder(this.db, options)
.select(withFiles)
.orderBy('assets.createdAt', orderDirection ?? 'asc')
.limit(pagination.take + 1)
.offset(pagination.skip ?? 0);
const items = await builder.execute();
return paginationHelper(items, pagination.take);
}
/**
* Get assets by device's Id on the database
* @param ownerId
@ -525,43 +504,6 @@ export class AssetRepository {
.executeTakeFirst();
}
@GenerateSql(
...Object.values(WithProperty).map((property) => ({
name: property,
params: [DummyValue.PAGINATION, property],
})),
)
async getWithout(pagination: PaginationOptions, property: WithoutProperty) {
const items = await this.db
.selectFrom('assets')
.selectAll('assets')
.$if(property === WithoutProperty.ENCODED_VIDEO, (qb) =>
qb
.where('assets.type', '=', AssetType.VIDEO)
.where((eb) => eb.or([eb('assets.encodedVideoPath', 'is', null), eb('assets.encodedVideoPath', '=', '')])),
)
.$if(property === WithoutProperty.THUMBNAIL, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('assets.isVisible', '=', true)
.where((eb) =>
eb.or([
eb('job_status.previewAt', 'is', null),
eb('job_status.thumbnailAt', 'is', null),
eb('assets.thumbhash', 'is', null),
]),
),
)
.where('deletedAt', 'is', null)
.limit(pagination.take + 1)
.offset(pagination.skip ?? 0)
.orderBy('createdAt')
.execute();
return paginationHelper(items, pagination.take);
}
getStatistics(ownerId: string, { isArchived, isFavorite, isTrashed }: AssetStatsOptions): Promise<AssetStats> {
return this.db
.selectFrom('assets')

View File

@ -273,7 +273,6 @@ describe(LibraryService.name, () => {
mocks.library.get.mockResolvedValue(library);
mocks.storage.walk.mockImplementation(async function* generator() {});
mocks.asset.getAll.mockResolvedValue({ items: [assetStub.external], hasNextPage: false });
mocks.asset.getLibraryAssetCount.mockResolvedValue(1);
mocks.asset.detectOfflineExternalAssets.mockResolvedValue({ numUpdatedRows: BigInt(1) });
@ -292,7 +291,6 @@ describe(LibraryService.name, () => {
mocks.library.get.mockResolvedValue(library);
mocks.storage.walk.mockImplementation(async function* generator() {});
mocks.asset.getAll.mockResolvedValue({ items: [assetStub.external], hasNextPage: false });
mocks.asset.getLibraryAssetCount.mockResolvedValue(0);
mocks.asset.detectOfflineExternalAssets.mockResolvedValue({ numUpdatedRows: BigInt(1) });

View File

@ -38,10 +38,6 @@ describe(MediaService.name, () => {
describe('handleQueueGenerateThumbnails', () => {
it('should queue all assets', async () => {
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.image]));
mocks.asset.getAll.mockResolvedValue({
items: [assetStub.image],
hasNextPage: false,
});
mocks.person.getAll.mockReturnValue(makeStream([personStub.newThumbnail]));
mocks.person.getFacesByIds.mockResolvedValue([faceStub.face1]);
@ -67,10 +63,6 @@ describe(MediaService.name, () => {
it('should queue trashed assets when force is true', async () => {
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.archived]));
mocks.asset.getAll.mockResolvedValue({
items: [assetStub.trashed],
hasNextPage: false,
});
mocks.person.getAll.mockReturnValue(makeStream());
await sut.handleQueueGenerateThumbnails({ force: true });
@ -171,7 +163,7 @@ describe(MediaService.name, () => {
describe('handleQueueMigration', () => {
it('should remove empty directories and queue jobs', async () => {
mocks.asset.getAll.mockResolvedValue({ hasNextPage: false, items: [assetStub.image] });
mocks.assetJob.streamForMigrationJob.mockReturnValue(makeStream([assetStub.image]));
mocks.job.getJobCounts.mockResolvedValue({ active: 1, waiting: 0 } as JobCounts);
mocks.person.getAll.mockReturnValue(makeStream([personStub.withName]));

View File

@ -36,7 +36,6 @@ import {
import { getAssetFiles } from 'src/utils/asset.util';
import { BaseConfig, ThumbnailConfig } from 'src/utils/media';
import { mimeTypes } from 'src/utils/mime-types';
import { usePagination } from 'src/utils/pagination';
@Injectable()
export class MediaService extends BaseService {
@ -96,23 +95,24 @@ export class MediaService extends BaseService {
@OnJob({ name: JobName.QUEUE_MIGRATION, queue: QueueName.MIGRATION })
async handleQueueMigration(): Promise<JobStatus> {
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getAll(pagination),
);
const { active, waiting } = await this.jobRepository.getJobCounts(QueueName.MIGRATION);
if (active === 1 && waiting === 0) {
await this.storageCore.removeEmptyDirs(StorageFolder.THUMBNAILS);
await this.storageCore.removeEmptyDirs(StorageFolder.ENCODED_VIDEO);
}
for await (const assets of assetPagination) {
await this.jobRepository.queueAll(
assets.map((asset) => ({ name: JobName.MIGRATE_ASSET, data: { id: asset.id } })),
);
let jobs: JobItem[] = [];
const assets = this.assetJobRepository.streamForMigrationJob();
for await (const asset of assets) {
jobs.push({ name: JobName.MIGRATE_ASSET, data: { id: asset.id } });
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(jobs);
jobs = [];
}
}
let jobs: { name: JobName.MIGRATE_PERSON; data: { id: string } }[] = [];
await this.jobRepository.queueAll(jobs);
jobs = [];
for await (const person of this.personRepository.getAll()) {
jobs.push({ name: JobName.MIGRATE_PERSON, data: { id: person.id } });

View File

@ -151,7 +151,6 @@ describe(SmartInfoService.name, () => {
await sut.handleQueueEncodeClip({});
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
expect(mocks.search.setDimensionSize).not.toHaveBeenCalled();
});

View File

@ -13,14 +13,11 @@ export const newAssetRepositoryMock = (): Mocked<RepositoryInterface<AssetReposi
getByIds: vitest.fn().mockResolvedValue([]),
getByIdsWithAllRelationsButStacks: vitest.fn().mockResolvedValue([]),
getByDeviceIds: vitest.fn(),
getByUserId: vitest.fn(),
getById: vitest.fn(),
getWithout: vitest.fn(),
getByChecksum: vitest.fn(),
getByChecksums: vitest.fn(),
getUploadAssetIdByChecksum: vitest.fn(),
getRandom: vitest.fn(),
getAll: vitest.fn().mockResolvedValue({ items: [], hasNextPage: false }),
getAllByDeviceId: vitest.fn(),
getLivePhotoCount: vitest.fn(),
getLibraryAssetCount: vitest.fn(),