forked from Cutlery/immich
		
	refactor(server): video transcode processor (#2163)
* refactor(server): video transcode processor * refactor: rename shouldRotate to isVideoVertical, remove unnecessary await * refactor: rename getOptions to getFfmpegOptions to be clearer in that context * fix: optimal preset converting vertical videos already smaller than target resolution --------- Co-authored-by: Zack Pollard <zackpollard@ymail.com>
This commit is contained in:
		
							parent
							
								
									ec6a7ae97c
								
							
						
					
					
						commit
						48393c215b
					
				@ -10,9 +10,9 @@ import {
 | 
			
		||||
  SearchIndexProcessor,
 | 
			
		||||
  StorageTemplateMigrationProcessor,
 | 
			
		||||
  ThumbnailGeneratorProcessor,
 | 
			
		||||
  VideoTranscodeProcessor,
 | 
			
		||||
} from './processors';
 | 
			
		||||
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';
 | 
			
		||||
import { VideoTranscodeProcessor } from './processors/video-transcode.processor';
 | 
			
		||||
 | 
			
		||||
@Module({
 | 
			
		||||
  imports: [
 | 
			
		||||
 | 
			
		||||
@ -153,3 +153,18 @@ export class ThumbnailGeneratorProcessor {
 | 
			
		||||
    await this.mediaService.handleGenerateWepbThumbnail(job.data);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@Processor(QueueName.VIDEO_CONVERSION)
 | 
			
		||||
export class VideoTranscodeProcessor {
 | 
			
		||||
  constructor(private mediaService: MediaService) {}
 | 
			
		||||
 | 
			
		||||
  @Process({ name: JobName.QUEUE_VIDEO_CONVERSION, concurrency: 1 })
 | 
			
		||||
  async onQueueVideoConversion(job: Job<IBaseJob>): Promise<void> {
 | 
			
		||||
    await this.mediaService.handleQueueVideoConversion(job.data);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 })
 | 
			
		||||
  async onVideoConversion(job: Job<IAssetJob>) {
 | 
			
		||||
    await this.mediaService.handleVideoConversion(job.data);
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,184 +0,0 @@
 | 
			
		||||
import {
 | 
			
		||||
  IAssetJob,
 | 
			
		||||
  IAssetRepository,
 | 
			
		||||
  IBaseJob,
 | 
			
		||||
  IJobRepository,
 | 
			
		||||
  IStorageRepository,
 | 
			
		||||
  JobName,
 | 
			
		||||
  QueueName,
 | 
			
		||||
  StorageCore,
 | 
			
		||||
  StorageFolder,
 | 
			
		||||
  SystemConfigFFmpegDto,
 | 
			
		||||
  SystemConfigService,
 | 
			
		||||
  WithoutProperty,
 | 
			
		||||
} from '@app/domain';
 | 
			
		||||
import { AssetEntity, AssetType, TranscodePreset } from '@app/infra/entities';
 | 
			
		||||
import { Process, Processor } from '@nestjs/bull';
 | 
			
		||||
import { Inject, Logger } from '@nestjs/common';
 | 
			
		||||
import { Job } from 'bull';
 | 
			
		||||
import ffmpeg, { FfprobeData, FfprobeStream } from 'fluent-ffmpeg';
 | 
			
		||||
import { join } from 'path';
 | 
			
		||||
 | 
			
		||||
@Processor(QueueName.VIDEO_CONVERSION)
 | 
			
		||||
export class VideoTranscodeProcessor {
 | 
			
		||||
  readonly logger = new Logger(VideoTranscodeProcessor.name);
 | 
			
		||||
  private storageCore = new StorageCore();
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    @Inject(IAssetRepository) private assetRepository: IAssetRepository,
 | 
			
		||||
    @Inject(IJobRepository) private jobRepository: IJobRepository,
 | 
			
		||||
    private systemConfigService: SystemConfigService,
 | 
			
		||||
    @Inject(IStorageRepository) private storageRepository: IStorageRepository,
 | 
			
		||||
  ) {}
 | 
			
		||||
 | 
			
		||||
  @Process({ name: JobName.QUEUE_VIDEO_CONVERSION, concurrency: 1 })
 | 
			
		||||
  async handleQueueVideoConversion(job: Job<IBaseJob>): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
      const { force } = job.data;
 | 
			
		||||
      const assets = force
 | 
			
		||||
        ? await this.assetRepository.getAll({ type: AssetType.VIDEO })
 | 
			
		||||
        : await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO);
 | 
			
		||||
      for (const asset of assets) {
 | 
			
		||||
        await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } });
 | 
			
		||||
      }
 | 
			
		||||
    } catch (error: any) {
 | 
			
		||||
      this.logger.error('Failed to queue video conversions', error.stack);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @Process({ name: JobName.VIDEO_CONVERSION, concurrency: 2 })
 | 
			
		||||
  async handleVideoConversion(job: Job<IAssetJob>) {
 | 
			
		||||
    const { asset } = job.data;
 | 
			
		||||
 | 
			
		||||
    const encodedVideoPath = this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, asset.ownerId);
 | 
			
		||||
 | 
			
		||||
    this.storageRepository.mkdirSync(encodedVideoPath);
 | 
			
		||||
 | 
			
		||||
    const savedEncodedPath = join(encodedVideoPath, `${asset.id}.mp4`);
 | 
			
		||||
 | 
			
		||||
    await this.runVideoEncode(asset, savedEncodedPath);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async runFFProbePipeline(asset: AssetEntity): Promise<FfprobeData> {
 | 
			
		||||
    return new Promise((resolve, reject) => {
 | 
			
		||||
      ffmpeg.ffprobe(asset.originalPath, (err, data) => {
 | 
			
		||||
        if (err || !data) {
 | 
			
		||||
          this.logger.error(`Cannot probe video ${err}`, 'runFFProbePipeline');
 | 
			
		||||
          reject(err);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        resolve(data);
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async runVideoEncode(asset: AssetEntity, savedEncodedPath: string): Promise<void> {
 | 
			
		||||
    const config = await this.systemConfigService.getConfig();
 | 
			
		||||
    const videoStream = await this.getVideoStream(asset);
 | 
			
		||||
 | 
			
		||||
    const transcode = await this.needsTranscoding(videoStream, config.ffmpeg);
 | 
			
		||||
    if (transcode) {
 | 
			
		||||
      //TODO: If video or audio are already the correct format, don't re-encode, copy the stream
 | 
			
		||||
      return this.runFFMPEGPipeLine(asset, videoStream, savedEncodedPath);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async needsTranscoding(videoStream: FfprobeStream, ffmpegConfig: SystemConfigFFmpegDto): Promise<boolean> {
 | 
			
		||||
    switch (ffmpegConfig.transcode) {
 | 
			
		||||
      case TranscodePreset.ALL:
 | 
			
		||||
        return true;
 | 
			
		||||
 | 
			
		||||
      case TranscodePreset.REQUIRED:
 | 
			
		||||
        {
 | 
			
		||||
          if (videoStream.codec_name !== ffmpegConfig.targetVideoCodec) {
 | 
			
		||||
            return true;
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        break;
 | 
			
		||||
 | 
			
		||||
      case TranscodePreset.OPTIMAL: {
 | 
			
		||||
        if (videoStream.codec_name !== ffmpegConfig.targetVideoCodec) {
 | 
			
		||||
          return true;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        const config = await this.systemConfigService.getConfig();
 | 
			
		||||
 | 
			
		||||
        const videoHeightThreshold = Number.parseInt(config.ffmpeg.targetResolution);
 | 
			
		||||
        return !videoStream.height || videoStream.height > videoHeightThreshold;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    return false;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async getVideoStream(asset: AssetEntity): Promise<ffmpeg.FfprobeStream> {
 | 
			
		||||
    const videoInfo = await this.runFFProbePipeline(asset);
 | 
			
		||||
 | 
			
		||||
    const videoStreams = videoInfo.streams.filter((stream) => {
 | 
			
		||||
      return stream.codec_type === 'video';
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    const longestVideoStream = videoStreams.sort((stream1, stream2) => {
 | 
			
		||||
      const stream1Frames = Number.parseInt(stream1.nb_frames ?? '0');
 | 
			
		||||
      const stream2Frames = Number.parseInt(stream2.nb_frames ?? '0');
 | 
			
		||||
      return stream2Frames - stream1Frames;
 | 
			
		||||
    })[0];
 | 
			
		||||
 | 
			
		||||
    return longestVideoStream;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async runFFMPEGPipeLine(asset: AssetEntity, videoStream: FfprobeStream, savedEncodedPath: string): Promise<void> {
 | 
			
		||||
    const config = await this.systemConfigService.getConfig();
 | 
			
		||||
 | 
			
		||||
    const ffmpegOptions = [
 | 
			
		||||
      `-crf ${config.ffmpeg.crf}`,
 | 
			
		||||
      `-preset ${config.ffmpeg.preset}`,
 | 
			
		||||
      `-vcodec ${config.ffmpeg.targetVideoCodec}`,
 | 
			
		||||
      `-acodec ${config.ffmpeg.targetAudioCodec}`,
 | 
			
		||||
      // Makes a second pass moving the moov atom to the beginning of
 | 
			
		||||
      // the file for improved playback speed.
 | 
			
		||||
      `-movflags faststart`,
 | 
			
		||||
    ];
 | 
			
		||||
 | 
			
		||||
    if (!videoStream.height || !videoStream.width) {
 | 
			
		||||
      this.logger.error('Height or width undefined for video stream');
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const streamHeight = videoStream.height;
 | 
			
		||||
    const streamWidth = videoStream.width;
 | 
			
		||||
 | 
			
		||||
    const targetResolution = Number.parseInt(config.ffmpeg.targetResolution);
 | 
			
		||||
 | 
			
		||||
    let scaling = `-2:${targetResolution}`;
 | 
			
		||||
    const shouldScale = Math.min(streamHeight, streamWidth) > targetResolution;
 | 
			
		||||
 | 
			
		||||
    const videoIsRotated = Math.abs(Number.parseInt(`${videoStream.rotation ?? 0}`)) === 90;
 | 
			
		||||
 | 
			
		||||
    if (streamHeight > streamWidth || videoIsRotated) {
 | 
			
		||||
      scaling = `${targetResolution}:-2`;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (shouldScale) {
 | 
			
		||||
      ffmpegOptions.push(`-vf scale=${scaling}`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return new Promise((resolve, reject) => {
 | 
			
		||||
      ffmpeg(asset.originalPath)
 | 
			
		||||
        .outputOptions(ffmpegOptions)
 | 
			
		||||
        .output(savedEncodedPath)
 | 
			
		||||
        .on('start', () => {
 | 
			
		||||
          this.logger.log('Start Converting Video');
 | 
			
		||||
        })
 | 
			
		||||
        .on('error', (error) => {
 | 
			
		||||
          this.logger.error(`Cannot Convert Video ${error}`);
 | 
			
		||||
          reject();
 | 
			
		||||
        })
 | 
			
		||||
        .on('end', async () => {
 | 
			
		||||
          this.logger.log(`Converting Success ${asset.id}`);
 | 
			
		||||
          await this.assetRepository.save({ id: asset.id, encodedVideoPath: savedEncodedPath });
 | 
			
		||||
          resolve();
 | 
			
		||||
        })
 | 
			
		||||
        .run();
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@ -5,8 +5,26 @@ export interface ResizeOptions {
 | 
			
		||||
  format: 'webp' | 'jpeg';
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IMediaRepository {
 | 
			
		||||
  resize(input: string, output: string, options: ResizeOptions): Promise<void>;
 | 
			
		||||
  extractVideoThumbnail(input: string, output: string, size: number): Promise<void>;
 | 
			
		||||
  extractThumbnailFromExif(input: string, output: string): Promise<void>;
 | 
			
		||||
export interface VideoStreamInfo {
 | 
			
		||||
  height: number;
 | 
			
		||||
  width: number;
 | 
			
		||||
  rotation: number;
 | 
			
		||||
  codecName?: string;
 | 
			
		||||
  codecType?: string;
 | 
			
		||||
  frameCount: number;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface VideoInfo {
 | 
			
		||||
  streams: VideoStreamInfo[];
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IMediaRepository {
 | 
			
		||||
  // image
 | 
			
		||||
  extractThumbnailFromExif(input: string, output: string): Promise<void>;
 | 
			
		||||
  resize(input: string, output: string, options: ResizeOptions): Promise<void>;
 | 
			
		||||
 | 
			
		||||
  // video
 | 
			
		||||
  extractVideoThumbnail(input: string, output: string, size: number): Promise<void>;
 | 
			
		||||
  probe(input: string): Promise<VideoInfo>;
 | 
			
		||||
  transcode(input: string, output: string, options: any): Promise<void>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,3 +1,4 @@
 | 
			
		||||
import { AssetType, SystemConfigKey } from '@app/infra/entities';
 | 
			
		||||
import _ from 'lodash';
 | 
			
		||||
import {
 | 
			
		||||
  assetEntityStub,
 | 
			
		||||
@ -6,17 +7,21 @@ import {
 | 
			
		||||
  newJobRepositoryMock,
 | 
			
		||||
  newMediaRepositoryMock,
 | 
			
		||||
  newStorageRepositoryMock,
 | 
			
		||||
  newSystemConfigRepositoryMock,
 | 
			
		||||
  probeStub,
 | 
			
		||||
} from '../../test';
 | 
			
		||||
import { IAssetRepository, WithoutProperty } from '../asset';
 | 
			
		||||
import { ICommunicationRepository } from '../communication';
 | 
			
		||||
import { IJobRepository, JobName } from '../job';
 | 
			
		||||
import { IStorageRepository } from '../storage';
 | 
			
		||||
import { ISystemConfigRepository } from '../system-config';
 | 
			
		||||
import { IMediaRepository } from './media.repository';
 | 
			
		||||
import { MediaService } from './media.service';
 | 
			
		||||
 | 
			
		||||
describe(MediaService.name, () => {
 | 
			
		||||
  let sut: MediaService;
 | 
			
		||||
  let assetMock: jest.Mocked<IAssetRepository>;
 | 
			
		||||
  let configMock: jest.Mocked<ISystemConfigRepository>;
 | 
			
		||||
  let communicationMock: jest.Mocked<ICommunicationRepository>;
 | 
			
		||||
  let jobMock: jest.Mocked<IJobRepository>;
 | 
			
		||||
  let mediaMock: jest.Mocked<IMediaRepository>;
 | 
			
		||||
@ -24,11 +29,12 @@ describe(MediaService.name, () => {
 | 
			
		||||
 | 
			
		||||
  beforeEach(async () => {
 | 
			
		||||
    assetMock = newAssetRepositoryMock();
 | 
			
		||||
    configMock = newSystemConfigRepositoryMock();
 | 
			
		||||
    communicationMock = newCommunicationRepositoryMock();
 | 
			
		||||
    jobMock = newJobRepositoryMock();
 | 
			
		||||
    mediaMock = newMediaRepositoryMock();
 | 
			
		||||
    storageMock = newStorageRepositoryMock();
 | 
			
		||||
    sut = new MediaService(assetMock, communicationMock, jobMock, mediaMock, storageMock);
 | 
			
		||||
    sut = new MediaService(assetMock, communicationMock, jobMock, mediaMock, storageMock, configMock);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should be defined', () => {
 | 
			
		||||
@ -169,4 +175,106 @@ describe(MediaService.name, () => {
 | 
			
		||||
      expect(mediaMock.resize).toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('handleQueueVideoConversion', () => {
 | 
			
		||||
    it('should queue all video assets', async () => {
 | 
			
		||||
      assetMock.getAll.mockResolvedValue([assetEntityStub.video]);
 | 
			
		||||
 | 
			
		||||
      await sut.handleQueueVideoConversion({ force: true });
 | 
			
		||||
 | 
			
		||||
      expect(assetMock.getAll).toHaveBeenCalledWith({ type: AssetType.VIDEO });
 | 
			
		||||
      expect(assetMock.getWithout).not.toHaveBeenCalled();
 | 
			
		||||
      expect(jobMock.queue).toHaveBeenCalledWith({
 | 
			
		||||
        name: JobName.VIDEO_CONVERSION,
 | 
			
		||||
        data: { asset: assetEntityStub.video },
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should queue all video assets without encoded videos', async () => {
 | 
			
		||||
      assetMock.getWithout.mockResolvedValue([assetEntityStub.video]);
 | 
			
		||||
 | 
			
		||||
      await sut.handleQueueVideoConversion({});
 | 
			
		||||
 | 
			
		||||
      expect(assetMock.getAll).not.toHaveBeenCalled();
 | 
			
		||||
      expect(assetMock.getWithout).toHaveBeenCalledWith(WithoutProperty.ENCODED_VIDEO);
 | 
			
		||||
      expect(jobMock.queue).toHaveBeenCalledWith({
 | 
			
		||||
        name: JobName.VIDEO_CONVERSION,
 | 
			
		||||
        data: { asset: assetEntityStub.video },
 | 
			
		||||
      });
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should log an error', async () => {
 | 
			
		||||
      assetMock.getAll.mockRejectedValue(new Error('database unavailable'));
 | 
			
		||||
 | 
			
		||||
      await sut.handleQueueVideoConversion({ force: true });
 | 
			
		||||
 | 
			
		||||
      expect(assetMock.getAll).toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('handleVideoConversion', () => {
 | 
			
		||||
    it('should log an error', async () => {
 | 
			
		||||
      mediaMock.transcode.mockRejectedValue(new Error('unable to transcode'));
 | 
			
		||||
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
 | 
			
		||||
      expect(storageMock.mkdirSync).toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should transcode the longest stream', async () => {
 | 
			
		||||
      mediaMock.probe.mockResolvedValue(probeStub.multiple);
 | 
			
		||||
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
 | 
			
		||||
      expect(mediaMock.probe).toHaveBeenCalledWith('/original/path.ext');
 | 
			
		||||
      expect(configMock.load).toHaveBeenCalled();
 | 
			
		||||
      expect(storageMock.mkdirSync).toHaveBeenCalled();
 | 
			
		||||
      expect(mediaMock.transcode).toHaveBeenCalledWith(
 | 
			
		||||
        '/original/path.ext',
 | 
			
		||||
        'upload/encoded-video/user-id/asset-id.mp4',
 | 
			
		||||
        ['-crf 23', '-preset ultrafast', '-vcodec h264', '-acodec aac', '-movflags faststart'],
 | 
			
		||||
      );
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should skip a video without any streams', async () => {
 | 
			
		||||
      mediaMock.probe.mockResolvedValue(probeStub.empty);
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
      expect(mediaMock.transcode).not.toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should skip a video without any height', async () => {
 | 
			
		||||
      mediaMock.probe.mockResolvedValue(probeStub.noHeight);
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
      expect(mediaMock.transcode).not.toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should transcode when set to all', async () => {
 | 
			
		||||
      mediaMock.probe.mockResolvedValue(probeStub.multiple);
 | 
			
		||||
      configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'all' }]);
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
      expect(mediaMock.transcode).toHaveBeenCalledWith(
 | 
			
		||||
        '/original/path.ext',
 | 
			
		||||
        'upload/encoded-video/user-id/asset-id.mp4',
 | 
			
		||||
        ['-crf 23', '-preset ultrafast', '-vcodec h264', '-acodec aac', '-movflags faststart'],
 | 
			
		||||
      );
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should transcode when optimal and too big', async () => {
 | 
			
		||||
      mediaMock.probe.mockResolvedValue(probeStub.tooBig);
 | 
			
		||||
      configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'optimal' }]);
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
      expect(mediaMock.transcode).toHaveBeenCalledWith(
 | 
			
		||||
        '/original/path.ext',
 | 
			
		||||
        'upload/encoded-video/user-id/asset-id.mp4',
 | 
			
		||||
        ['-crf 23', '-preset ultrafast', '-vcodec h264', '-acodec aac', '-movflags faststart', '-vf scale=-2:720'],
 | 
			
		||||
      );
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should not transcode an invalid transcode value', async () => {
 | 
			
		||||
      mediaMock.probe.mockResolvedValue(probeStub.tooBig);
 | 
			
		||||
      configMock.load.mockResolvedValue([{ key: SystemConfigKey.FFMPEG_TRANSCODE, value: 'invalid' }]);
 | 
			
		||||
      await sut.handleVideoConversion({ asset: assetEntityStub.video });
 | 
			
		||||
      expect(mediaMock.transcode).not.toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
});
 | 
			
		||||
 | 
			
		||||
@ -1,16 +1,19 @@
 | 
			
		||||
import { AssetType } from '@app/infra/entities';
 | 
			
		||||
import { AssetType, TranscodePreset } from '@app/infra/entities';
 | 
			
		||||
import { Inject, Injectable, Logger } from '@nestjs/common';
 | 
			
		||||
import { join } from 'path';
 | 
			
		||||
import { IAssetRepository, mapAsset, WithoutProperty } from '../asset';
 | 
			
		||||
import { CommunicationEvent, ICommunicationRepository } from '../communication';
 | 
			
		||||
import { IAssetJob, IBaseJob, IJobRepository, JobName } from '../job';
 | 
			
		||||
import { IStorageRepository, StorageCore, StorageFolder } from '../storage';
 | 
			
		||||
import { IMediaRepository } from './media.repository';
 | 
			
		||||
import { ISystemConfigRepository, SystemConfigFFmpegDto } from '../system-config';
 | 
			
		||||
import { SystemConfigCore } from '../system-config/system-config.core';
 | 
			
		||||
import { IMediaRepository, VideoStreamInfo } from './media.repository';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class MediaService {
 | 
			
		||||
  private logger = new Logger(MediaService.name);
 | 
			
		||||
  private storageCore = new StorageCore();
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    @Inject(IAssetRepository) private assetRepository: IAssetRepository,
 | 
			
		||||
@ -18,7 +21,10 @@ export class MediaService {
 | 
			
		||||
    @Inject(IJobRepository) private jobRepository: IJobRepository,
 | 
			
		||||
    @Inject(IMediaRepository) private mediaRepository: IMediaRepository,
 | 
			
		||||
    @Inject(IStorageRepository) private storageRepository: IStorageRepository,
 | 
			
		||||
  ) {}
 | 
			
		||||
    @Inject(ISystemConfigRepository) systemConfig: ISystemConfigRepository,
 | 
			
		||||
  ) {
 | 
			
		||||
    this.configCore = new SystemConfigCore(systemConfig);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async handleQueueGenerateThumbnails(job: IBaseJob): Promise<void> {
 | 
			
		||||
    try {
 | 
			
		||||
@ -93,7 +99,114 @@ export class MediaService {
 | 
			
		||||
      await this.mediaRepository.resize(asset.resizePath, webpPath, { size: 250, format: 'webp' });
 | 
			
		||||
      await this.assetRepository.save({ id: asset.id, webpPath: webpPath });
 | 
			
		||||
    } catch (error: any) {
 | 
			
		||||
      this.logger.error('Failed to generate webp thumbnail for asset: ' + asset.id, error.stack);
 | 
			
		||||
      this.logger.error(`Failed to generate webp thumbnail for asset: ${asset.id}`, error.stack);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async handleQueueVideoConversion(job: IBaseJob) {
 | 
			
		||||
    const { force } = job;
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      const assets = force
 | 
			
		||||
        ? await this.assetRepository.getAll({ type: AssetType.VIDEO })
 | 
			
		||||
        : await this.assetRepository.getWithout(WithoutProperty.ENCODED_VIDEO);
 | 
			
		||||
      for (const asset of assets) {
 | 
			
		||||
        await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { asset } });
 | 
			
		||||
      }
 | 
			
		||||
    } catch (error: any) {
 | 
			
		||||
      this.logger.error('Failed to queue video conversions', error.stack);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async handleVideoConversion(job: IAssetJob) {
 | 
			
		||||
    const { asset } = job;
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      const input = asset.originalPath;
 | 
			
		||||
      const outputFolder = this.storageCore.getFolderLocation(StorageFolder.ENCODED_VIDEO, asset.ownerId);
 | 
			
		||||
      const output = join(outputFolder, `${asset.id}.mp4`);
 | 
			
		||||
      this.storageRepository.mkdirSync(outputFolder);
 | 
			
		||||
 | 
			
		||||
      const { streams } = await this.mediaRepository.probe(input);
 | 
			
		||||
      const stream = await this.getLongestStream(streams);
 | 
			
		||||
      if (!stream) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const { ffmpeg: config } = await this.configCore.getConfig();
 | 
			
		||||
 | 
			
		||||
      const required = this.isTranscodeRequired(stream, config);
 | 
			
		||||
      if (!required) {
 | 
			
		||||
        return;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const options = this.getFfmpegOptions(stream, config);
 | 
			
		||||
      await this.mediaRepository.transcode(input, output, options);
 | 
			
		||||
 | 
			
		||||
      this.logger.log(`Converting Success ${asset.id}`);
 | 
			
		||||
 | 
			
		||||
      await this.assetRepository.save({ id: asset.id, encodedVideoPath: output });
 | 
			
		||||
    } catch (error: any) {
 | 
			
		||||
      this.logger.error(`Failed to handle video conversion for asset: ${asset.id}`, error.stack);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getLongestStream(streams: VideoStreamInfo[]): VideoStreamInfo | null {
 | 
			
		||||
    return streams
 | 
			
		||||
      .filter((stream) => stream.codecType === 'video')
 | 
			
		||||
      .sort((stream1, stream2) => stream2.frameCount - stream1.frameCount)[0];
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private isTranscodeRequired(stream: VideoStreamInfo, ffmpegConfig: SystemConfigFFmpegDto): boolean {
 | 
			
		||||
    if (!stream.height || !stream.width) {
 | 
			
		||||
      this.logger.error('Skipping transcode, height or width undefined for video stream');
 | 
			
		||||
      return false;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    const isTargetVideoCodec = stream.codecName === ffmpegConfig.targetVideoCodec;
 | 
			
		||||
 | 
			
		||||
    const targetResolution = Number.parseInt(ffmpegConfig.targetResolution);
 | 
			
		||||
    const isLargerThanTargetResolution = Math.min(stream.height, stream.width) > targetResolution;
 | 
			
		||||
 | 
			
		||||
    switch (ffmpegConfig.transcode) {
 | 
			
		||||
      case TranscodePreset.ALL:
 | 
			
		||||
        return true;
 | 
			
		||||
 | 
			
		||||
      case TranscodePreset.REQUIRED:
 | 
			
		||||
        return !isTargetVideoCodec;
 | 
			
		||||
 | 
			
		||||
      case TranscodePreset.OPTIMAL:
 | 
			
		||||
        return !isTargetVideoCodec || isLargerThanTargetResolution;
 | 
			
		||||
 | 
			
		||||
      default:
 | 
			
		||||
        return false;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private getFfmpegOptions(stream: VideoStreamInfo, ffmpeg: SystemConfigFFmpegDto) {
 | 
			
		||||
    // TODO: If video or audio are already the correct format, don't re-encode, copy the stream
 | 
			
		||||
 | 
			
		||||
    const options = [
 | 
			
		||||
      `-crf ${ffmpeg.crf}`,
 | 
			
		||||
      `-preset ${ffmpeg.preset}`,
 | 
			
		||||
      `-vcodec ${ffmpeg.targetVideoCodec}`,
 | 
			
		||||
      `-acodec ${ffmpeg.targetAudioCodec}`,
 | 
			
		||||
      // Makes a second pass moving the moov atom to the beginning of
 | 
			
		||||
      // the file for improved playback speed.
 | 
			
		||||
      `-movflags faststart`,
 | 
			
		||||
    ];
 | 
			
		||||
 | 
			
		||||
    const videoIsRotated = Math.abs(stream.rotation) === 90;
 | 
			
		||||
    const targetResolution = Number.parseInt(ffmpeg.targetResolution);
 | 
			
		||||
 | 
			
		||||
    const isVideoVertical = stream.height > stream.width || videoIsRotated;
 | 
			
		||||
    const scaling = isVideoVertical ? `${targetResolution}:-2` : `-2:${targetResolution}`;
 | 
			
		||||
 | 
			
		||||
    const shouldScale = Math.min(stream.height, stream.width) > targetResolution;
 | 
			
		||||
    if (shouldScale) {
 | 
			
		||||
      options.push(`-vf scale=${scaling}`);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return options;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ import {
 | 
			
		||||
  mapUser,
 | 
			
		||||
  SearchResult,
 | 
			
		||||
  SharedLinkResponseDto,
 | 
			
		||||
  VideoInfo,
 | 
			
		||||
} from '../src';
 | 
			
		||||
 | 
			
		||||
const today = new Date();
 | 
			
		||||
@ -704,3 +705,51 @@ export const searchStub = {
 | 
			
		||||
    facets: [],
 | 
			
		||||
  }),
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
export const probeStub = {
 | 
			
		||||
  empty: { streams: [] },
 | 
			
		||||
  multiple: Object.freeze<VideoInfo>({
 | 
			
		||||
    streams: [
 | 
			
		||||
      {
 | 
			
		||||
        height: 1080,
 | 
			
		||||
        width: 400,
 | 
			
		||||
        codecName: 'h265',
 | 
			
		||||
        codecType: 'video',
 | 
			
		||||
        frameCount: 100,
 | 
			
		||||
        rotation: 0,
 | 
			
		||||
      },
 | 
			
		||||
      {
 | 
			
		||||
        height: 1080,
 | 
			
		||||
        width: 400,
 | 
			
		||||
        codecName: 'h7000',
 | 
			
		||||
        codecType: 'video',
 | 
			
		||||
        frameCount: 99,
 | 
			
		||||
        rotation: 0,
 | 
			
		||||
      },
 | 
			
		||||
    ],
 | 
			
		||||
  }),
 | 
			
		||||
  noHeight: Object.freeze<VideoInfo>({
 | 
			
		||||
    streams: [
 | 
			
		||||
      {
 | 
			
		||||
        height: 0,
 | 
			
		||||
        width: 400,
 | 
			
		||||
        codecName: 'h265',
 | 
			
		||||
        codecType: 'video',
 | 
			
		||||
        frameCount: 100,
 | 
			
		||||
        rotation: 0,
 | 
			
		||||
      },
 | 
			
		||||
    ],
 | 
			
		||||
  }),
 | 
			
		||||
  tooBig: Object.freeze<VideoInfo>({
 | 
			
		||||
    streams: [
 | 
			
		||||
      {
 | 
			
		||||
        height: 10000,
 | 
			
		||||
        width: 10000,
 | 
			
		||||
        codecName: 'h264',
 | 
			
		||||
        codecType: 'video',
 | 
			
		||||
        frameCount: 100,
 | 
			
		||||
        rotation: 0,
 | 
			
		||||
      },
 | 
			
		||||
    ],
 | 
			
		||||
  }),
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@ -5,5 +5,7 @@ export const newMediaRepositoryMock = (): jest.Mocked<IMediaRepository> => {
 | 
			
		||||
    extractThumbnailFromExif: jest.fn(),
 | 
			
		||||
    extractVideoThumbnail: jest.fn(),
 | 
			
		||||
    resize: jest.fn(),
 | 
			
		||||
    probe: jest.fn(),
 | 
			
		||||
    transcode: jest.fn(),
 | 
			
		||||
  };
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,10 @@
 | 
			
		||||
import { IMediaRepository, ResizeOptions } from '@app/domain';
 | 
			
		||||
import { IMediaRepository, ResizeOptions, VideoInfo } from '@app/domain';
 | 
			
		||||
import { exiftool } from 'exiftool-vendored';
 | 
			
		||||
import ffmpeg from 'fluent-ffmpeg';
 | 
			
		||||
import ffmpeg, { FfprobeData } from 'fluent-ffmpeg';
 | 
			
		||||
import sharp from 'sharp';
 | 
			
		||||
import { promisify } from 'util';
 | 
			
		||||
 | 
			
		||||
const probe = promisify<string, FfprobeData>(ffmpeg.ffprobe);
 | 
			
		||||
 | 
			
		||||
export class MediaRepository implements IMediaRepository {
 | 
			
		||||
  extractThumbnailFromExif(input: string, output: string): Promise<void> {
 | 
			
		||||
@ -42,4 +45,31 @@ export class MediaRepository implements IMediaRepository {
 | 
			
		||||
        .run();
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async probe(input: string): Promise<VideoInfo> {
 | 
			
		||||
    const results = await probe(input);
 | 
			
		||||
 | 
			
		||||
    return {
 | 
			
		||||
      streams: results.streams.map((stream) => ({
 | 
			
		||||
        height: stream.height || 0,
 | 
			
		||||
        width: stream.width || 0,
 | 
			
		||||
        codecName: stream.codec_name,
 | 
			
		||||
        codecType: stream.codec_type,
 | 
			
		||||
        frameCount: Number.parseInt(stream.nb_frames ?? '0'),
 | 
			
		||||
        rotation: Number.parseInt(`${stream.rotation ?? 0}`),
 | 
			
		||||
      })),
 | 
			
		||||
    };
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  transcode(input: string, output: string, options: string[]): Promise<void> {
 | 
			
		||||
    return new Promise((resolve, reject) => {
 | 
			
		||||
      ffmpeg(input)
 | 
			
		||||
        //
 | 
			
		||||
        .outputOptions(options)
 | 
			
		||||
        .output(output)
 | 
			
		||||
        .on('error', reject)
 | 
			
		||||
        .on('end', resolve)
 | 
			
		||||
        .run();
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user