mirror of
https://github.com/immich-app/immich.git
synced 2025-05-24 01:12:58 -04:00
We've been keen to try this for a while as it means we can remove redis as a dependency, which makes Immich easier to set up and run. This replaces bullmq with a bespoke postgres queue. Jobs in the queue are processed either immediately via triggers and notifications, or eventually if a notification is missed.
337 lines
11 KiB
TypeScript
import { BadRequestException, Injectable } from '@nestjs/common';
|
|
import { ClassConstructor } from 'class-transformer';
|
|
import { snakeCase } from 'lodash';
|
|
import { OnEvent } from 'src/decorators';
|
|
import { mapAsset } from 'src/dtos/asset-response.dto';
|
|
import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto';
|
|
import {
|
|
AssetType,
|
|
BootstrapEventPriority,
|
|
ImmichWorker,
|
|
JobCommand,
|
|
JobName,
|
|
JobStatus,
|
|
ManualJobName,
|
|
QueueName,
|
|
} from 'src/enum';
|
|
import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
|
|
import { BaseService } from 'src/services/base.service';
|
|
import { ConcurrentQueueName, JobItem } from 'src/types';
|
|
|
|
const asJobItem = (dto: JobCreateDto): JobItem => {
|
|
switch (dto.name) {
|
|
case ManualJobName.TAG_CLEANUP: {
|
|
return { name: JobName.TAG_CLEANUP };
|
|
}
|
|
|
|
case ManualJobName.PERSON_CLEANUP: {
|
|
return { name: JobName.PERSON_CLEANUP };
|
|
}
|
|
|
|
case ManualJobName.USER_CLEANUP: {
|
|
return { name: JobName.USER_DELETE_CHECK };
|
|
}
|
|
|
|
case ManualJobName.MEMORY_CLEANUP: {
|
|
return { name: JobName.MEMORIES_CLEANUP };
|
|
}
|
|
|
|
case ManualJobName.MEMORY_CREATE: {
|
|
return { name: JobName.MEMORIES_CREATE };
|
|
}
|
|
|
|
case ManualJobName.BACKUP_DATABASE: {
|
|
return { name: JobName.BACKUP_DATABASE };
|
|
}
|
|
|
|
default: {
|
|
throw new BadRequestException('Invalid job name');
|
|
}
|
|
}
|
|
};
|
|
|
|
@Injectable()
|
|
export class JobService extends BaseService {
|
|
private services: ClassConstructor<unknown>[] = [];
|
|
|
|
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
|
|
async onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
|
|
this.logger.debug(`Updating queue concurrency settings`);
|
|
for (const queueName of Object.values(QueueName)) {
|
|
let concurrency = 1;
|
|
if (this.isConcurrentQueue(queueName)) {
|
|
concurrency = config.job[queueName].concurrency;
|
|
}
|
|
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
|
|
await this.jobRepository.start(queueName, concurrency);
|
|
}
|
|
}
|
|
|
|
@OnEvent({ name: 'config.update', server: true, workers: [ImmichWorker.MICROSERVICES] })
|
|
async onConfigUpdate({ newConfig: config }: ArgOf<'config.update'>) {
|
|
await this.onConfigInit({ newConfig: config });
|
|
}
|
|
|
|
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.JobService })
|
|
async onBootstrap() {
|
|
await this.jobRepository.setup(this.services);
|
|
}
|
|
|
|
setServices(services: ClassConstructor<unknown>[]) {
|
|
this.services = services;
|
|
}
|
|
|
|
async create(dto: JobCreateDto): Promise<void> {
|
|
await this.jobRepository.queue(asJobItem(dto));
|
|
}
|
|
|
|
async handleCommand(queueName: QueueName, dto: JobCommandDto): Promise<JobStatusDto> {
|
|
this.logger.debug(`Handling command: queue=${queueName},command=${dto.command},force=${dto.force}`);
|
|
|
|
switch (dto.command) {
|
|
case JobCommand.START: {
|
|
await this.start(queueName, dto);
|
|
break;
|
|
}
|
|
case JobCommand.PAUSE: {
|
|
this.eventRepository.serverSend('queue.pause', queueName);
|
|
break;
|
|
}
|
|
case JobCommand.RESUME: {
|
|
this.eventRepository.serverSend('queue.resume', queueName);
|
|
break;
|
|
}
|
|
case JobCommand.CLEAR: {
|
|
await this.jobRepository.clear(queueName);
|
|
break;
|
|
}
|
|
case JobCommand.CLEAR_FAILED: {
|
|
await this.jobRepository.clearFailed(queueName);
|
|
break;
|
|
}
|
|
}
|
|
|
|
return this.getJobStatus(queueName);
|
|
}
|
|
|
|
async getJobStatus(queueName: QueueName): Promise<JobStatusDto> {
|
|
const [jobCounts, queueStatus] = await Promise.all([
|
|
this.jobRepository.getJobCounts(queueName),
|
|
this.jobRepository.getQueueStatus(queueName),
|
|
]);
|
|
|
|
return { jobCounts, queueStatus };
|
|
}
|
|
|
|
async getAllJobsStatus(): Promise<AllJobStatusResponseDto> {
|
|
const response = new AllJobStatusResponseDto();
|
|
for (const queueName of Object.values(QueueName)) {
|
|
response[queueName] = await this.getJobStatus(queueName);
|
|
}
|
|
return response;
|
|
}
|
|
|
|
private async start(name: QueueName, { force }: JobCommandDto): Promise<void> {
|
|
const { active } = await this.jobRepository.getJobCounts(name);
|
|
if (active > 0) {
|
|
throw new BadRequestException(`Jobs are already running`);
|
|
}
|
|
|
|
this.telemetryRepository.jobs.addToCounter(`immich.queues.${snakeCase(name)}.started`, 1);
|
|
|
|
switch (name) {
|
|
case QueueName.VIDEO_CONVERSION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_VIDEO_CONVERSION, data: { force } });
|
|
}
|
|
|
|
case QueueName.STORAGE_TEMPLATE_MIGRATION: {
|
|
return this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION });
|
|
}
|
|
|
|
case QueueName.MIGRATION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_MIGRATION });
|
|
}
|
|
|
|
case QueueName.SMART_SEARCH: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_SMART_SEARCH, data: { force } });
|
|
}
|
|
|
|
case QueueName.DUPLICATE_DETECTION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_DUPLICATE_DETECTION, data: { force } });
|
|
}
|
|
|
|
case QueueName.METADATA_EXTRACTION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force } });
|
|
}
|
|
|
|
case QueueName.SIDECAR: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_SIDECAR, data: { force } });
|
|
}
|
|
|
|
case QueueName.THUMBNAIL_GENERATION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force } });
|
|
}
|
|
|
|
case QueueName.FACE_DETECTION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_FACE_DETECTION, data: { force } });
|
|
}
|
|
|
|
case QueueName.FACIAL_RECOGNITION: {
|
|
return this.jobRepository.queue({ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force } });
|
|
}
|
|
|
|
case QueueName.LIBRARY: {
|
|
return this.jobRepository.queue({ name: JobName.LIBRARY_QUEUE_SCAN_ALL, data: { force } });
|
|
}
|
|
|
|
case QueueName.BACKUP_DATABASE: {
|
|
return this.jobRepository.queue({ name: JobName.BACKUP_DATABASE, data: { force } });
|
|
}
|
|
|
|
default: {
|
|
throw new BadRequestException(`Invalid job name: ${name}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
@OnEvent({ name: 'queue.pause', server: true, workers: [ImmichWorker.MICROSERVICES] })
|
|
async pause(...[queueName]: ArgsOf<'queue.pause'>): Promise<void> {
|
|
await this.jobRepository.pause(queueName);
|
|
}
|
|
|
|
@OnEvent({ name: 'queue.resume', server: true, workers: [ImmichWorker.MICROSERVICES] })
|
|
async resume(...[queueName]: ArgsOf<'queue.resume'>): Promise<void> {
|
|
await this.jobRepository.resume(queueName);
|
|
}
|
|
|
|
@OnEvent({ name: 'job.start' })
|
|
async onJobStart(...[queueName, job]: ArgsOf<'job.start'>) {
|
|
const queueMetric = `immich.queues.${snakeCase(queueName)}.active`;
|
|
this.telemetryRepository.jobs.addToGauge(queueMetric, 1);
|
|
try {
|
|
const status = await this.jobRepository.run(job);
|
|
const jobMetric = `immich.jobs.${job.name.replaceAll('-', '_')}.${status}`;
|
|
this.telemetryRepository.jobs.addToCounter(jobMetric, 1);
|
|
if (status === JobStatus.SUCCESS || status == JobStatus.SKIPPED) {
|
|
await this.onDone(job);
|
|
}
|
|
} catch (error: Error | any) {
|
|
await this.eventRepository.emit('job.failed', { job, error });
|
|
} finally {
|
|
this.telemetryRepository.jobs.addToGauge(queueMetric, -1);
|
|
}
|
|
}
|
|
|
|
private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName {
|
|
return ![
|
|
QueueName.FACIAL_RECOGNITION,
|
|
QueueName.STORAGE_TEMPLATE_MIGRATION,
|
|
QueueName.DUPLICATE_DETECTION,
|
|
QueueName.BACKUP_DATABASE,
|
|
].includes(name);
|
|
}
|
|
|
|
async handleNightlyJobs() {
|
|
await this.jobRepository.queueAll([
|
|
{ name: JobName.ASSET_DELETION_CHECK },
|
|
{ name: JobName.USER_DELETE_CHECK },
|
|
{ name: JobName.PERSON_CLEANUP },
|
|
{ name: JobName.MEMORIES_CLEANUP },
|
|
{ name: JobName.MEMORIES_CREATE },
|
|
{ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } },
|
|
{ name: JobName.CLEAN_OLD_AUDIT_LOGS },
|
|
{ name: JobName.USER_SYNC_USAGE },
|
|
{ name: JobName.QUEUE_FACIAL_RECOGNITION, data: { force: false, nightly: true } },
|
|
{ name: JobName.CLEAN_OLD_SESSION_TOKENS },
|
|
]);
|
|
}
|
|
|
|
/**
|
|
* Queue follow up jobs
|
|
*/
|
|
private async onDone(item: JobItem) {
|
|
switch (item.name) {
|
|
case JobName.SIDECAR_SYNC:
|
|
case JobName.SIDECAR_DISCOVERY: {
|
|
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: item.data });
|
|
break;
|
|
}
|
|
|
|
case JobName.SIDECAR_WRITE: {
|
|
await this.jobRepository.queue({
|
|
name: JobName.METADATA_EXTRACTION,
|
|
data: { id: item.data.id, source: 'sidecar-write' },
|
|
});
|
|
break;
|
|
}
|
|
|
|
case JobName.METADATA_EXTRACTION: {
|
|
if (item.data.source === 'sidecar-write') {
|
|
const [asset] = await this.assetRepository.getByIdsWithAllRelationsButStacks([item.data.id]);
|
|
if (asset) {
|
|
this.eventRepository.clientSend('on_asset_update', asset.ownerId, mapAsset(asset));
|
|
}
|
|
}
|
|
await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: item.data });
|
|
break;
|
|
}
|
|
|
|
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
|
|
if (item.data.source === 'upload' || item.data.source === 'copy') {
|
|
await this.jobRepository.queue({ name: JobName.GENERATE_THUMBNAILS, data: item.data });
|
|
}
|
|
break;
|
|
}
|
|
|
|
case JobName.GENERATE_PERSON_THUMBNAIL: {
|
|
const { id } = item.data;
|
|
const person = await this.personRepository.getById(id);
|
|
if (person) {
|
|
this.eventRepository.clientSend('on_person_thumbnail', person.ownerId, person.id);
|
|
}
|
|
break;
|
|
}
|
|
|
|
case JobName.GENERATE_THUMBNAILS: {
|
|
if (!item.data.notify && item.data.source !== 'upload') {
|
|
break;
|
|
}
|
|
|
|
const [asset] = await this.assetRepository.getByIdsWithAllRelationsButStacks([item.data.id]);
|
|
if (!asset) {
|
|
this.logger.warn(`Could not find asset ${item.data.id} after generating thumbnails`);
|
|
break;
|
|
}
|
|
|
|
const jobs: JobItem[] = [
|
|
{ name: JobName.SMART_SEARCH, data: item.data },
|
|
{ name: JobName.FACE_DETECTION, data: item.data },
|
|
];
|
|
|
|
if (asset.type === AssetType.VIDEO) {
|
|
jobs.push({ name: JobName.VIDEO_CONVERSION, data: item.data });
|
|
}
|
|
|
|
await this.jobRepository.queueAll(jobs);
|
|
if (asset.isVisible) {
|
|
this.eventRepository.clientSend('on_upload_success', asset.ownerId, mapAsset(asset));
|
|
}
|
|
|
|
break;
|
|
}
|
|
|
|
case JobName.SMART_SEARCH: {
|
|
if (item.data.source === 'upload') {
|
|
await this.jobRepository.queue({ name: JobName.DUPLICATE_DETECTION, data: item.data });
|
|
}
|
|
break;
|
|
}
|
|
|
|
case JobName.USER_DELETION: {
|
|
this.eventRepository.clientBroadcast('on_user_delete', item.data.id);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|