1
0
forked from Cutlery/immich
immich-quadlet/server/apps/microservices/src/processors/generate-checksum.processor.ts
Thanh Pham a467936e73
feat(server): de-duplication (#557)
* feat(server): remove un-used deviceAssetId cols.

* feat(server): return 409 if asset is duplicated

* feat(server): replace old unique constraint

* feat(server): strip deviceId in file path

* feat(server): skip duplicate asset

* chore(server): revert changes

* fix(server): asset test spec

* fix(server): checksum generation for uploaded assets

* fix(server): make sure generation queue runs after migration

* feat(server): remove temp file

* chore(server): remove dead code
2022-09-05 14:45:38 -05:00

66 lines
1.8 KiB
TypeScript

import { AssetEntity } from '@app/database/entities/asset.entity';
import { generateChecksumQueueName } from '@app/job';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { createHash } from 'node:crypto';
import fs from 'node:fs';
import { IsNull, Repository } from 'typeorm';
// TODO: temporary task that generates checksums for previously uploaded assets.
/**
 * Queue processor that back-fills the `checksum` column for assets whose
 * checksum is still null, by hashing the file at `originalPath` with SHA-1.
 */
@Processor(generateChecksumQueueName)
export class GenerateChecksumProcessor {
  constructor(
    @InjectRepository(AssetEntity)
    private assetRepository: Repository<AssetEntity>,
  ) {}

  /**
   * Walks every asset with a null checksum, one page at a time, and stores a
   * checksum for each asset whose file can be read.
   *
   * Assets that cannot be resolved (no path, missing file, read or update
   * failure) keep a null checksum and would be returned by the same query
   * again. They are therefore counted and skipped on the following pages so
   * the loop always makes progress and terminates.
   */
  @Process()
  async generateChecksum(): Promise<void> {
    const pageSize = 200;
    // Number of assets already attempted that still have a null checksum.
    let unresolved = 0;
    let hasNext = true;

    while (hasNext) {
      const assets = await this.assetRepository.find({
        where: {
          checksum: IsNull(),
        },
        // A stable order is required for `skip` to be meaningful: unresolved
        // rows from earlier pages sort before every row not yet visited.
        order: { id: 'ASC' },
        skip: unresolved,
        take: pageSize,
      });

      for (const asset of assets) {
        let stored = false;
        try {
          stored = await this.generateAssetChecksum(asset);
        } catch (err: unknown) {
          Logger.error(`Error generate checksum ${err}`);
        }
        if (!stored) {
          unresolved += 1;
        }
      }

      // A short (or empty) page means there is nothing left to fetch.
      hasNext = assets.length === pageSize;
    }
  }

  /**
   * Computes the SHA-1 digest of the asset's original file and persists it.
   *
   * @param asset Asset whose `originalPath` points at the file to hash.
   * @returns `true` when a checksum was written, `false` when the asset was
   *   skipped because it has no path or the file does not exist.
   * @throws When reading the file or updating the repository fails.
   */
  private async generateAssetChecksum(asset: AssetEntity): Promise<boolean> {
    if (!asset.originalPath) return false;
    if (!fs.existsSync(asset.originalPath)) return false;

    const sha1Hash = createHash('sha1');
    const fileReadStream = fs.createReadStream(asset.originalPath);

    // Async iteration rejects on read errors (a bare `pipe` would leave the
    // promise pending forever) and destroys the stream when the loop exits.
    for await (const chunk of fileReadStream) {
      sha1Hash.update(chunk);
    }

    const checksum = sha1Hash.digest();
    await this.assetRepository.update(asset.id, { checksum });
    return true;
  }
}