mirror of
https://github.com/immich-app/immich.git
synced 2025-05-24 01:12:58 -04:00
refactor: job on-done (#18004)
This commit is contained in:
parent
84b51e3cbb
commit
becdc3dcf5
@ -19,7 +19,7 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.d
|
|||||||
import { ImmichWorker, MetadataKey, QueueName } from 'src/enum';
|
import { ImmichWorker, MetadataKey, QueueName } from 'src/enum';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||||
import { JobItem } from 'src/types';
|
import { JobItem, JobSource } from 'src/types';
|
||||||
import { handlePromiseError } from 'src/utils/misc';
|
import { handlePromiseError } from 'src/utils/misc';
|
||||||
|
|
||||||
type EmitHandlers = Partial<{ [T in EmitEvent]: Array<EventItem<T>> }>;
|
type EmitHandlers = Partial<{ [T in EmitEvent]: Array<EventItem<T>> }>;
|
||||||
@ -58,6 +58,7 @@ type EventMap = {
|
|||||||
'asset.show': [{ assetId: string; userId: string }];
|
'asset.show': [{ assetId: string; userId: string }];
|
||||||
'asset.trash': [{ assetId: string; userId: string }];
|
'asset.trash': [{ assetId: string; userId: string }];
|
||||||
'asset.delete': [{ assetId: string; userId: string }];
|
'asset.delete': [{ assetId: string; userId: string }];
|
||||||
|
'asset.metadataExtracted': [{ assetId: string; userId: string; source?: JobSource }];
|
||||||
|
|
||||||
// asset bulk events
|
// asset bulk events
|
||||||
'assets.trash': [{ assetIds: string[]; userId: string }];
|
'assets.trash': [{ assetIds: string[]; userId: string }];
|
||||||
|
@ -239,10 +239,6 @@ describe(JobService.name, () => {
|
|||||||
item: { name: JobName.SIDECAR_DISCOVERY, data: { id: 'asset-1' } },
|
item: { name: JobName.SIDECAR_DISCOVERY, data: { id: 'asset-1' } },
|
||||||
jobs: [JobName.METADATA_EXTRACTION],
|
jobs: [JobName.METADATA_EXTRACTION],
|
||||||
},
|
},
|
||||||
{
|
|
||||||
item: { name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } },
|
|
||||||
jobs: [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE],
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1', source: 'upload' } },
|
item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1', source: 'upload' } },
|
||||||
jobs: [JobName.GENERATE_THUMBNAILS],
|
jobs: [JobName.GENERATE_THUMBNAILS],
|
||||||
|
@ -264,17 +264,6 @@ export class JobService extends BaseService {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case JobName.METADATA_EXTRACTION: {
|
|
||||||
if (item.data.source === 'sidecar-write') {
|
|
||||||
const [asset] = await this.assetRepository.getByIdsWithAllRelationsButStacks([item.data.id]);
|
|
||||||
if (asset) {
|
|
||||||
this.eventRepository.clientSend('on_asset_update', asset.ownerId, mapAsset(asset));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: item.data });
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
|
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
|
||||||
if (item.data.source === 'upload' || item.data.source === 'copy') {
|
if (item.data.source === 'upload' || item.data.source === 'copy') {
|
||||||
await this.jobRepository.queue({ name: JobName.GENERATE_THUMBNAILS, data: item.data });
|
await this.jobRepository.queue({ name: JobName.GENERATE_THUMBNAILS, data: item.data });
|
||||||
|
@ -143,7 +143,8 @@ describe(MetadataService.name, () => {
|
|||||||
|
|
||||||
it('should handle an asset that could not be found', async () => {
|
it('should handle an asset that could not be found', async () => {
|
||||||
mocks.assetJob.getForMetadataExtraction.mockResolvedValue(void 0);
|
mocks.assetJob.getForMetadataExtraction.mockResolvedValue(void 0);
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.image.id })).resolves.toBe(JobStatus.FAILED);
|
|
||||||
|
await sut.handleMetadataExtraction({ id: assetStub.image.id });
|
||||||
|
|
||||||
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.image.id);
|
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.image.id);
|
||||||
expect(mocks.asset.upsertExif).not.toHaveBeenCalled();
|
expect(mocks.asset.upsertExif).not.toHaveBeenCalled();
|
||||||
@ -526,7 +527,7 @@ describe(MetadataService.name, () => {
|
|||||||
ContainerDirectory: [{ Foo: 100 }],
|
ContainerDirectory: [{ Foo: 100 }],
|
||||||
});
|
});
|
||||||
|
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.image.id })).resolves.toBe(JobStatus.SUCCESS);
|
await sut.handleMetadataExtraction({ id: assetStub.image.id });
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should extract the correct video orientation', async () => {
|
it('should extract the correct video orientation', async () => {
|
||||||
@ -1201,7 +1202,7 @@ describe(MetadataService.name, () => {
|
|||||||
it('should handle livePhotoCID not set', async () => {
|
it('should handle livePhotoCID not set', async () => {
|
||||||
mocks.assetJob.getForMetadataExtraction.mockResolvedValue(assetStub.image);
|
mocks.assetJob.getForMetadataExtraction.mockResolvedValue(assetStub.image);
|
||||||
|
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.image.id })).resolves.toBe(JobStatus.SUCCESS);
|
await sut.handleMetadataExtraction({ id: assetStub.image.id });
|
||||||
|
|
||||||
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.image.id);
|
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.image.id);
|
||||||
expect(mocks.asset.findLivePhotoMatch).not.toHaveBeenCalled();
|
expect(mocks.asset.findLivePhotoMatch).not.toHaveBeenCalled();
|
||||||
@ -1214,9 +1215,7 @@ describe(MetadataService.name, () => {
|
|||||||
mocks.assetJob.getForMetadataExtraction.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
mocks.assetJob.getForMetadataExtraction.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
||||||
mockReadTags({ ContentIdentifier: 'CID' });
|
mockReadTags({ ContentIdentifier: 'CID' });
|
||||||
|
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.livePhotoMotionAsset.id })).resolves.toBe(
|
await sut.handleMetadataExtraction({ id: assetStub.livePhotoMotionAsset.id });
|
||||||
JobStatus.SUCCESS,
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.livePhotoMotionAsset.id);
|
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.livePhotoMotionAsset.id);
|
||||||
expect(mocks.asset.findLivePhotoMatch).toHaveBeenCalledWith({
|
expect(mocks.asset.findLivePhotoMatch).toHaveBeenCalledWith({
|
||||||
@ -1235,9 +1234,7 @@ describe(MetadataService.name, () => {
|
|||||||
mocks.asset.findLivePhotoMatch.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
mocks.asset.findLivePhotoMatch.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
||||||
mockReadTags({ ContentIdentifier: 'CID' });
|
mockReadTags({ ContentIdentifier: 'CID' });
|
||||||
|
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.livePhotoStillAsset.id })).resolves.toBe(
|
await sut.handleMetadataExtraction({ id: assetStub.livePhotoStillAsset.id });
|
||||||
JobStatus.SUCCESS,
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.livePhotoStillAsset.id);
|
expect(mocks.assetJob.getForMetadataExtraction).toHaveBeenCalledWith(assetStub.livePhotoStillAsset.id);
|
||||||
expect(mocks.asset.findLivePhotoMatch).toHaveBeenCalledWith({
|
expect(mocks.asset.findLivePhotoMatch).toHaveBeenCalledWith({
|
||||||
@ -1261,9 +1258,7 @@ describe(MetadataService.name, () => {
|
|||||||
mocks.asset.findLivePhotoMatch.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
mocks.asset.findLivePhotoMatch.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
||||||
mockReadTags({ ContentIdentifier: 'CID' });
|
mockReadTags({ ContentIdentifier: 'CID' });
|
||||||
|
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.livePhotoStillAsset.id })).resolves.toBe(
|
await sut.handleMetadataExtraction({ id: assetStub.livePhotoStillAsset.id });
|
||||||
JobStatus.SUCCESS,
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(mocks.event.emit).toHaveBeenCalledWith('asset.hide', {
|
expect(mocks.event.emit).toHaveBeenCalledWith('asset.hide', {
|
||||||
userId: assetStub.livePhotoMotionAsset.ownerId,
|
userId: assetStub.livePhotoMotionAsset.ownerId,
|
||||||
@ -1279,10 +1274,12 @@ describe(MetadataService.name, () => {
|
|||||||
mocks.asset.findLivePhotoMatch.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
mocks.asset.findLivePhotoMatch.mockResolvedValue(assetStub.livePhotoMotionAsset);
|
||||||
mockReadTags({ ContentIdentifier: 'CID' });
|
mockReadTags({ ContentIdentifier: 'CID' });
|
||||||
|
|
||||||
await expect(sut.handleMetadataExtraction({ id: assetStub.livePhotoStillAsset.id })).resolves.toBe(
|
await sut.handleMetadataExtraction({ id: assetStub.livePhotoStillAsset.id });
|
||||||
JobStatus.SUCCESS,
|
|
||||||
);
|
|
||||||
|
|
||||||
|
expect(mocks.event.emit).toHaveBeenCalledWith('asset.metadataExtracted', {
|
||||||
|
assetId: assetStub.livePhotoStillAsset.id,
|
||||||
|
userId: assetStub.livePhotoStillAsset.ownerId,
|
||||||
|
});
|
||||||
expect(mocks.asset.findLivePhotoMatch).toHaveBeenCalledWith({
|
expect(mocks.asset.findLivePhotoMatch).toHaveBeenCalledWith({
|
||||||
ownerId: 'user-id',
|
ownerId: 'user-id',
|
||||||
otherAssetId: 'live-photo-still-asset',
|
otherAssetId: 'live-photo-still-asset',
|
||||||
|
@ -182,14 +182,14 @@ export class MetadataService extends BaseService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@OnJob({ name: JobName.METADATA_EXTRACTION, queue: QueueName.METADATA_EXTRACTION })
|
@OnJob({ name: JobName.METADATA_EXTRACTION, queue: QueueName.METADATA_EXTRACTION })
|
||||||
async handleMetadataExtraction(data: JobOf<JobName.METADATA_EXTRACTION>): Promise<JobStatus> {
|
async handleMetadataExtraction(data: JobOf<JobName.METADATA_EXTRACTION>) {
|
||||||
const [{ metadata, reverseGeocoding }, asset] = await Promise.all([
|
const [{ metadata, reverseGeocoding }, asset] = await Promise.all([
|
||||||
this.getConfig({ withCache: true }),
|
this.getConfig({ withCache: true }),
|
||||||
this.assetJobRepository.getForMetadataExtraction(data.id),
|
this.assetJobRepository.getForMetadataExtraction(data.id),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (!asset) {
|
if (!asset) {
|
||||||
return JobStatus.FAILED;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const [exifTags, stats] = await Promise.all([
|
const [exifTags, stats] = await Promise.all([
|
||||||
@ -283,7 +283,11 @@ export class MetadataService extends BaseService {
|
|||||||
|
|
||||||
await this.assetRepository.upsertJobStatus({ assetId: asset.id, metadataExtractedAt: new Date() });
|
await this.assetRepository.upsertJobStatus({ assetId: asset.id, metadataExtractedAt: new Date() });
|
||||||
|
|
||||||
return JobStatus.SUCCESS;
|
await this.eventRepository.emit('asset.metadataExtracted', {
|
||||||
|
assetId: asset.id,
|
||||||
|
userId: asset.ownerId,
|
||||||
|
source: data.source,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnJob({ name: JobName.QUEUE_SIDECAR, queue: QueueName.SIDECAR })
|
@OnJob({ name: JobName.QUEUE_SIDECAR, queue: QueueName.SIDECAR })
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||||
import { OnEvent, OnJob } from 'src/decorators';
|
import { OnEvent, OnJob } from 'src/decorators';
|
||||||
|
import { mapAsset } from 'src/dtos/asset-response.dto';
|
||||||
import { AuthDto } from 'src/dtos/auth.dto';
|
import { AuthDto } from 'src/dtos/auth.dto';
|
||||||
import {
|
import {
|
||||||
mapNotification,
|
mapNotification,
|
||||||
@ -152,6 +153,18 @@ export class NotificationService extends BaseService {
|
|||||||
this.eventRepository.clientSend('on_asset_trash', userId, assetIds);
|
this.eventRepository.clientSend('on_asset_trash', userId, assetIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@OnEvent({ name: 'asset.metadataExtracted' })
|
||||||
|
async onAssetMetadataExtracted({ assetId, userId, source }: ArgOf<'asset.metadataExtracted'>) {
|
||||||
|
if (source !== 'sidecar-write') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const [asset] = await this.assetRepository.getByIdsWithAllRelationsButStacks([assetId]);
|
||||||
|
if (asset) {
|
||||||
|
this.eventRepository.clientSend('on_asset_update', userId, mapAsset(asset));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@OnEvent({ name: 'assets.restore' })
|
@OnEvent({ name: 'assets.restore' })
|
||||||
onAssetsRestore({ assetIds, userId }: ArgOf<'assets.restore'>) {
|
onAssetsRestore({ assetIds, userId }: ArgOf<'assets.restore'>) {
|
||||||
this.eventRepository.clientSend('on_asset_restore', userId, assetIds);
|
this.eventRepository.clientSend('on_asset_restore', userId, assetIds);
|
||||||
|
@ -116,6 +116,11 @@ export class StorageTemplateService extends BaseService {
|
|||||||
return { ...storageTokens, presetOptions: storagePresets };
|
return { ...storageTokens, presetOptions: storagePresets };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@OnEvent({ name: 'asset.metadataExtracted' })
|
||||||
|
async onAssetMetadataExtracted({ source, assetId }: ArgOf<'asset.metadataExtracted'>) {
|
||||||
|
await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { source, id: assetId } });
|
||||||
|
}
|
||||||
|
|
||||||
@OnJob({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, queue: QueueName.STORAGE_TEMPLATE_MIGRATION })
|
@OnJob({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, queue: QueueName.STORAGE_TEMPLATE_MIGRATION })
|
||||||
async handleMigrationSingle({ id }: JobOf<JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE>): Promise<JobStatus> {
|
async handleMigrationSingle({ id }: JobOf<JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE>): Promise<JobStatus> {
|
||||||
const config = await this.getConfig({ withCache: true });
|
const config = await this.getConfig({ withCache: true });
|
||||||
|
@ -177,9 +177,10 @@ export interface IDelayedJob extends IBaseJob {
|
|||||||
delay?: number;
|
delay?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type JobSource = 'upload' | 'sidecar-write' | 'copy';
|
||||||
export interface IEntityJob extends IBaseJob {
|
export interface IEntityJob extends IBaseJob {
|
||||||
id: string;
|
id: string;
|
||||||
source?: 'upload' | 'sidecar-write' | 'copy';
|
source?: JobSource;
|
||||||
notify?: boolean;
|
notify?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user