mirror of
https://github.com/immich-app/immich.git
synced 2025-05-24 01:12:58 -04:00
refactor: stream assets for thumbnail job (#17623)
This commit is contained in:
parent
b710ad36f3
commit
5bceefce75
@ -59,6 +59,37 @@ where
|
||||
limit
|
||||
$2
|
||||
|
||||
-- AssetJobRepository.streamForThumbnailJob
|
||||
select
|
||||
"assets"."id",
|
||||
"assets"."thumbhash",
|
||||
(
|
||||
select
|
||||
coalesce(json_agg(agg), '[]')
|
||||
from
|
||||
(
|
||||
select
|
||||
"asset_files"."id",
|
||||
"asset_files"."path",
|
||||
"asset_files"."type"
|
||||
from
|
||||
"asset_files"
|
||||
where
|
||||
"asset_files"."assetId" = "assets"."id"
|
||||
) as agg
|
||||
) as "files"
|
||||
from
|
||||
"assets"
|
||||
inner join "asset_job_status" on "asset_job_status"."assetId" = "assets"."id"
|
||||
where
|
||||
"assets"."deletedAt" is null
|
||||
and "assets"."isVisible" = $1
|
||||
and (
|
||||
"asset_job_status"."previewAt" is null
|
||||
or "asset_job_status"."thumbnailAt" is null
|
||||
or "assets"."thumbhash" is null
|
||||
)
|
||||
|
||||
-- AssetJobRepository.getForStorageTemplateJob
|
||||
select
|
||||
"assets"."id",
|
||||
|
@ -54,6 +54,29 @@ export class AssetJobRepository {
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [false], stream: true })
|
||||
streamForThumbnailJob(force: boolean) {
|
||||
return this.db
|
||||
.selectFrom('assets')
|
||||
.select(['assets.id', 'assets.thumbhash'])
|
||||
.select(withFiles)
|
||||
.where('assets.deletedAt', 'is', null)
|
||||
.where('assets.isVisible', '=', true)
|
||||
.$if(!force, (qb) =>
|
||||
qb
|
||||
// If there aren't any entries, metadata extraction hasn't run yet which is required for thumbnails
|
||||
.innerJoin('asset_job_status', 'asset_job_status.assetId', 'assets.id')
|
||||
.where((eb) =>
|
||||
eb.or([
|
||||
eb('asset_job_status.previewAt', 'is', null),
|
||||
eb('asset_job_status.thumbnailAt', 'is', null),
|
||||
eb('assets.thumbhash', 'is', null),
|
||||
]),
|
||||
),
|
||||
)
|
||||
.stream();
|
||||
}
|
||||
|
||||
private storageTemplateAssetQuery() {
|
||||
return this.db
|
||||
.selectFrom('assets')
|
||||
|
@ -39,6 +39,7 @@ describe(MediaService.name, () => {
|
||||
|
||||
describe('handleQueueGenerateThumbnails', () => {
|
||||
it('should queue all assets', async () => {
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.image]));
|
||||
mocks.asset.getAll.mockResolvedValue({
|
||||
items: [assetStub.image],
|
||||
hasNextPage: false,
|
||||
@ -49,8 +50,7 @@ describe(MediaService.name, () => {
|
||||
|
||||
await sut.handleQueueGenerateThumbnails({ force: true });
|
||||
|
||||
expect(mocks.asset.getAll).toHaveBeenCalled();
|
||||
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(true);
|
||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||
{
|
||||
name: JobName.GENERATE_THUMBNAILS,
|
||||
@ -68,6 +68,7 @@ describe(MediaService.name, () => {
|
||||
});
|
||||
|
||||
it('should queue trashed assets when force is true', async () => {
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.archived]));
|
||||
mocks.asset.getAll.mockResolvedValue({
|
||||
items: [assetStub.trashed],
|
||||
hasNextPage: false,
|
||||
@ -76,11 +77,7 @@ describe(MediaService.name, () => {
|
||||
|
||||
await sut.handleQueueGenerateThumbnails({ force: true });
|
||||
|
||||
expect(mocks.asset.getAll).toHaveBeenCalledWith(
|
||||
{ skip: 0, take: 1000 },
|
||||
expect.objectContaining({ withDeleted: true }),
|
||||
);
|
||||
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(true);
|
||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||
{
|
||||
name: JobName.GENERATE_THUMBNAILS,
|
||||
@ -90,19 +87,12 @@ describe(MediaService.name, () => {
|
||||
});
|
||||
|
||||
it('should queue archived assets when force is true', async () => {
|
||||
mocks.asset.getAll.mockResolvedValue({
|
||||
items: [assetStub.archived],
|
||||
hasNextPage: false,
|
||||
});
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.archived]));
|
||||
mocks.person.getAll.mockReturnValue(makeStream());
|
||||
|
||||
await sut.handleQueueGenerateThumbnails({ force: true });
|
||||
|
||||
expect(mocks.asset.getAll).toHaveBeenCalledWith(
|
||||
{ skip: 0, take: 1000 },
|
||||
expect.objectContaining({ withArchived: true }),
|
||||
);
|
||||
expect(mocks.asset.getWithout).not.toHaveBeenCalled();
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(true);
|
||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||
{
|
||||
name: JobName.GENERATE_THUMBNAILS,
|
||||
@ -112,18 +102,13 @@ describe(MediaService.name, () => {
|
||||
});
|
||||
|
||||
it('should queue all people with missing thumbnail path', async () => {
|
||||
mocks.asset.getWithout.mockResolvedValue({
|
||||
items: [assetStub.image],
|
||||
hasNextPage: false,
|
||||
});
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.image]));
|
||||
mocks.person.getAll.mockReturnValue(makeStream([personStub.noThumbnail, personStub.noThumbnail]));
|
||||
mocks.person.getRandomFace.mockResolvedValueOnce(faceStub.face1);
|
||||
|
||||
await sut.handleQueueGenerateThumbnails({ force: false });
|
||||
|
||||
expect(mocks.asset.getAll).not.toHaveBeenCalled();
|
||||
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL);
|
||||
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false);
|
||||
expect(mocks.person.getAll).toHaveBeenCalledWith({ thumbnailPath: '' });
|
||||
expect(mocks.person.getRandomFace).toHaveBeenCalled();
|
||||
expect(mocks.person.update).toHaveBeenCalledTimes(1);
|
||||
@ -138,15 +123,11 @@ describe(MediaService.name, () => {
|
||||
});
|
||||
|
||||
it('should queue all assets with missing resize path', async () => {
|
||||
mocks.asset.getWithout.mockResolvedValue({
|
||||
items: [assetStub.noResizePath],
|
||||
hasNextPage: false,
|
||||
});
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.noResizePath]));
|
||||
mocks.person.getAll.mockReturnValue(makeStream());
|
||||
await sut.handleQueueGenerateThumbnails({ force: false });
|
||||
|
||||
expect(mocks.asset.getAll).not.toHaveBeenCalled();
|
||||
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL);
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false);
|
||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||
{
|
||||
name: JobName.GENERATE_THUMBNAILS,
|
||||
@ -158,15 +139,11 @@ describe(MediaService.name, () => {
|
||||
});
|
||||
|
||||
it('should queue all assets with missing webp path', async () => {
|
||||
mocks.asset.getWithout.mockResolvedValue({
|
||||
items: [assetStub.noWebpPath],
|
||||
hasNextPage: false,
|
||||
});
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.noWebpPath]));
|
||||
mocks.person.getAll.mockReturnValue(makeStream());
|
||||
await sut.handleQueueGenerateThumbnails({ force: false });
|
||||
|
||||
expect(mocks.asset.getAll).not.toHaveBeenCalled();
|
||||
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL);
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false);
|
||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||
{
|
||||
name: JobName.GENERATE_THUMBNAILS,
|
||||
@ -178,15 +155,11 @@ describe(MediaService.name, () => {
|
||||
});
|
||||
|
||||
it('should queue all assets with missing thumbhash', async () => {
|
||||
mocks.asset.getWithout.mockResolvedValue({
|
||||
items: [assetStub.noThumbhash],
|
||||
hasNextPage: false,
|
||||
});
|
||||
mocks.assetJob.streamForThumbnailJob.mockReturnValue(makeStream([assetStub.noThumbhash]));
|
||||
mocks.person.getAll.mockReturnValue(makeStream());
|
||||
await sut.handleQueueGenerateThumbnails({ force: false });
|
||||
|
||||
expect(mocks.asset.getAll).not.toHaveBeenCalled();
|
||||
expect(mocks.asset.getWithout).toHaveBeenCalledWith({ skip: 0, take: 1000 }, WithoutProperty.THUMBNAIL);
|
||||
expect(mocks.assetJob.streamForThumbnailJob).toHaveBeenCalledWith(false);
|
||||
expect(mocks.job.queueAll).toHaveBeenCalledWith([
|
||||
{
|
||||
name: JobName.GENERATE_THUMBNAILS,
|
||||
|
@ -51,30 +51,16 @@ export class MediaService extends BaseService {
|
||||
|
||||
@OnJob({ name: JobName.QUEUE_GENERATE_THUMBNAILS, queue: QueueName.THUMBNAIL_GENERATION })
|
||||
async handleQueueGenerateThumbnails({ force }: JobOf<JobName.QUEUE_GENERATE_THUMBNAILS>): Promise<JobStatus> {
|
||||
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) => {
|
||||
return force
|
||||
? this.assetRepository.getAll(pagination, {
|
||||
isVisible: true,
|
||||
withDeleted: true,
|
||||
withArchived: true,
|
||||
})
|
||||
: this.assetRepository.getWithout(pagination, WithoutProperty.THUMBNAIL);
|
||||
});
|
||||
|
||||
for await (const assets of assetPagination) {
|
||||
const jobs: JobItem[] = [];
|
||||
|
||||
for (const asset of assets) {
|
||||
const thumbJobs: JobItem[] = [];
|
||||
for await (const asset of this.assetJobRepository.streamForThumbnailJob(!!force)) {
|
||||
const { previewFile, thumbnailFile } = getAssetFiles(asset.files);
|
||||
|
||||
if (!previewFile || !thumbnailFile || !asset.thumbhash || force) {
|
||||
jobs.push({ name: JobName.GENERATE_THUMBNAILS, data: { id: asset.id } });
|
||||
thumbJobs.push({ name: JobName.GENERATE_THUMBNAILS, data: { id: asset.id } });
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
await this.jobRepository.queueAll(jobs);
|
||||
}
|
||||
await this.jobRepository.queueAll(thumbJobs);
|
||||
|
||||
const jobs: JobItem[] = [];
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user