mirror of
https://github.com/immich-app/immich.git
synced 2026-05-22 23:52:32 -04:00
chore!: remove old timeline sync endpoints (#27804)
This commit is contained in:
@@ -2,17 +2,8 @@ import { Body, Controller, Delete, Get, Header, HttpCode, HttpStatus, Post, Res
|
||||
import { ApiTags } from '@nestjs/swagger';
|
||||
import { Response } from 'express';
|
||||
import { Endpoint, HistoryBuilder } from 'src/decorators';
|
||||
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import {
|
||||
AssetDeltaSyncDto,
|
||||
AssetDeltaSyncResponseDto,
|
||||
AssetFullSyncDto,
|
||||
SyncAckDeleteDto,
|
||||
SyncAckDto,
|
||||
SyncAckSetDto,
|
||||
SyncStreamDto,
|
||||
} from 'src/dtos/sync.dto';
|
||||
import { SyncAckDeleteDto, SyncAckDto, SyncAckSetDto, SyncStreamDto } from 'src/dtos/sync.dto';
|
||||
import { ApiTag, Permission } from 'src/enum';
|
||||
import { Auth, Authenticated } from 'src/middleware/auth.guard';
|
||||
import { GlobalExceptionFilter } from 'src/middleware/global-exception.filter';
|
||||
@@ -26,30 +17,6 @@ export class SyncController {
|
||||
private errorService: GlobalExceptionFilter,
|
||||
) {}
|
||||
|
||||
@Post('full-sync')
|
||||
@Authenticated()
|
||||
@HttpCode(HttpStatus.OK)
|
||||
@Endpoint({
|
||||
summary: 'Get full sync for user',
|
||||
description: 'Retrieve all assets for a full synchronization for the authenticated user.',
|
||||
history: new HistoryBuilder().added('v1').deprecated('v2'),
|
||||
})
|
||||
getFullSyncForUser(@Auth() auth: AuthDto, @Body() dto: AssetFullSyncDto): Promise<AssetResponseDto[]> {
|
||||
return this.service.getFullSync(auth, dto);
|
||||
}
|
||||
|
||||
@Post('delta-sync')
|
||||
@Authenticated()
|
||||
@HttpCode(HttpStatus.OK)
|
||||
@Endpoint({
|
||||
summary: 'Get delta sync for user',
|
||||
description: 'Retrieve changed assets since the last sync for the authenticated user.',
|
||||
history: new HistoryBuilder().added('v1').deprecated('v2'),
|
||||
})
|
||||
getDeltaSync(@Auth() auth: AuthDto, @Body() dto: AssetDeltaSyncDto): Promise<AssetDeltaSyncResponseDto> {
|
||||
return this.service.getDeltaSync(auth, dto);
|
||||
}
|
||||
|
||||
@Post('stream')
|
||||
@Authenticated({ permission: Permission.SyncStream })
|
||||
@Header('Content-Type', 'application/jsonlines+json')
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
/* eslint-disable @typescript-eslint/no-unsafe-function-type */
|
||||
import { createZodDto } from 'nestjs-zod';
|
||||
import { AssetResponseSchema } from 'src/dtos/asset-response.dto';
|
||||
import { AssetEditActionSchema } from 'src/dtos/editing.dto';
|
||||
import {
|
||||
AlbumUserRoleSchema,
|
||||
@@ -17,36 +16,6 @@ import {
|
||||
import { isoDatetimeToDate } from 'src/validation';
|
||||
import z from 'zod';
|
||||
|
||||
const AssetFullSyncSchema = z
|
||||
.object({
|
||||
lastId: z.uuidv4().optional().describe('Last asset ID (pagination)'),
|
||||
updatedUntil: isoDatetimeToDate.describe('Sync assets updated until this date'),
|
||||
limit: z.int().min(1).describe('Maximum number of assets to return'),
|
||||
userId: z.uuidv4().optional().describe('Filter by user ID'),
|
||||
})
|
||||
.meta({ id: 'AssetFullSyncDto' });
|
||||
|
||||
const AssetDeltaSyncSchema = z
|
||||
.object({
|
||||
updatedAfter: isoDatetimeToDate.describe('Sync assets updated after this date'),
|
||||
userIds: z.array(z.uuidv4()).describe('User IDs to sync'),
|
||||
})
|
||||
.meta({ id: 'AssetDeltaSyncDto' });
|
||||
|
||||
export class AssetFullSyncDto extends createZodDto(AssetFullSyncSchema) {}
|
||||
export class AssetDeltaSyncDto extends createZodDto(AssetDeltaSyncSchema) {}
|
||||
|
||||
const AssetDeltaSyncResponseSchema = z
|
||||
.object({
|
||||
needsFullSync: z.boolean().describe('Whether full sync is needed'),
|
||||
upserted: z.array(AssetResponseSchema),
|
||||
deleted: z.array(z.string()).describe('Deleted asset IDs'),
|
||||
})
|
||||
.describe('Asset delta sync response')
|
||||
.meta({ id: 'AssetDeltaSyncResponseDto' });
|
||||
|
||||
export class AssetDeltaSyncResponseDto extends createZodDto(AssetDeltaSyncResponseSchema) {}
|
||||
|
||||
export const extraSyncModels: Function[] = [];
|
||||
|
||||
const ExtraModel = (): ClassDecorator => {
|
||||
|
||||
@@ -88,21 +88,6 @@ export enum AssetOrder {
|
||||
|
||||
export const AssetOrderSchema = z.enum(AssetOrder).describe('Asset sort order').meta({ id: 'AssetOrder' });
|
||||
|
||||
export enum DatabaseAction {
|
||||
Create = 'CREATE',
|
||||
Update = 'UPDATE',
|
||||
Delete = 'DELETE',
|
||||
}
|
||||
|
||||
export const DatabaseActionSchema = z.enum(DatabaseAction).describe('Database action').meta({ id: 'DatabaseAction' });
|
||||
|
||||
export enum EntityType {
|
||||
Asset = 'ASSET',
|
||||
Album = 'ALBUM',
|
||||
}
|
||||
|
||||
export const EntityTypeSchema = z.enum(EntityType).describe('Entity type').meta({ id: 'EntityType' });
|
||||
|
||||
export enum MemoryType {
|
||||
/** pictures taken on this day X years ago */
|
||||
OnThisDay = 'on_this_day',
|
||||
@@ -761,7 +746,6 @@ export enum JobName {
|
||||
AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll',
|
||||
AssetGenerateThumbnails = 'AssetGenerateThumbnails',
|
||||
|
||||
AuditLogCleanup = 'AuditLogCleanup',
|
||||
AuditTableCleanup = 'AuditTableCleanup',
|
||||
|
||||
DatabaseBackup = 'DatabaseBackup',
|
||||
|
||||
@@ -497,63 +497,6 @@ where
|
||||
limit
|
||||
$5
|
||||
|
||||
-- AssetRepository.getAllForUserFullSync
|
||||
select
|
||||
"asset".*,
|
||||
to_json("asset_exif") as "exifInfo",
|
||||
to_json("stacked_assets") as "stack"
|
||||
from
|
||||
"asset"
|
||||
left join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
|
||||
left join "stack" on "stack"."id" = "asset"."stackId"
|
||||
left join lateral (
|
||||
select
|
||||
"stack".*,
|
||||
count("stacked") as "assetCount"
|
||||
from
|
||||
"asset" as "stacked"
|
||||
where
|
||||
"stacked"."stackId" = "stack"."id"
|
||||
group by
|
||||
"stack"."id"
|
||||
) as "stacked_assets" on "stack"."id" is not null
|
||||
where
|
||||
"asset"."ownerId" = $1::uuid
|
||||
and "asset"."visibility" != $2
|
||||
and "asset"."updatedAt" <= $3
|
||||
and "asset"."id" > $4
|
||||
order by
|
||||
"asset"."id"
|
||||
limit
|
||||
$5
|
||||
|
||||
-- AssetRepository.getChangedDeltaSync
|
||||
select
|
||||
"asset".*,
|
||||
to_json("asset_exif") as "exifInfo",
|
||||
to_json("stacked_assets") as "stack"
|
||||
from
|
||||
"asset"
|
||||
left join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
|
||||
left join "stack" on "stack"."id" = "asset"."stackId"
|
||||
left join lateral (
|
||||
select
|
||||
"stack".*,
|
||||
count("stacked") as "assetCount"
|
||||
from
|
||||
"asset" as "stacked"
|
||||
where
|
||||
"stacked"."stackId" = "stack"."id"
|
||||
group by
|
||||
"stack"."id"
|
||||
) as "stacked_assets" on "stack"."id" is not null
|
||||
where
|
||||
"asset"."ownerId" = any ($1::uuid[])
|
||||
and "asset"."visibility" != $2
|
||||
and "asset"."updatedAt" > $3
|
||||
limit
|
||||
$4
|
||||
|
||||
-- AssetRepository.detectOfflineExternalAssets
|
||||
update "asset"
|
||||
set
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
-- NOTE: This file is auto generated by ./sql-generator
|
||||
|
||||
-- AuditRepository.getAfter
|
||||
select distinct
|
||||
on ("audit"."entityId", "audit"."entityType") "audit"."entityId"
|
||||
from
|
||||
"audit"
|
||||
where
|
||||
"audit"."createdAt" > $1
|
||||
and "audit"."action" = $2
|
||||
and "audit"."entityType" = $3
|
||||
and "audit"."ownerId" in ($4)
|
||||
order by
|
||||
"audit"."entityId" desc,
|
||||
"audit"."entityType" desc,
|
||||
"audit"."createdAt" desc
|
||||
@@ -106,19 +106,6 @@ interface AssetExploreFieldOptions {
|
||||
minAssetsPerField: number;
|
||||
}
|
||||
|
||||
interface AssetFullSyncOptions {
|
||||
ownerId: string;
|
||||
lastId?: string;
|
||||
updatedUntil: Date;
|
||||
limit: number;
|
||||
}
|
||||
|
||||
interface AssetDeltaSyncOptions {
|
||||
userIds: string[];
|
||||
updatedAfter: Date;
|
||||
limit: number;
|
||||
}
|
||||
|
||||
interface AssetGetByChecksumOptions {
|
||||
ownerId: string;
|
||||
checksum: Buffer;
|
||||
@@ -905,70 +892,6 @@ export class AssetRepository {
|
||||
return { fieldName: 'exifInfo.city', items };
|
||||
}
|
||||
|
||||
@GenerateSql({
|
||||
params: [
|
||||
{
|
||||
ownerId: DummyValue.UUID,
|
||||
lastId: DummyValue.UUID,
|
||||
updatedUntil: DummyValue.DATE,
|
||||
limit: 10,
|
||||
},
|
||||
],
|
||||
})
|
||||
getAllForUserFullSync(options: AssetFullSyncOptions) {
|
||||
const { ownerId, lastId, updatedUntil, limit } = options;
|
||||
return this.db
|
||||
.selectFrom('asset')
|
||||
.selectAll('asset')
|
||||
.$call(withExif)
|
||||
.leftJoin('stack', 'stack.id', 'asset.stackId')
|
||||
.leftJoinLateral(
|
||||
(eb) =>
|
||||
eb
|
||||
.selectFrom('asset as stacked')
|
||||
.selectAll('stack')
|
||||
.select((eb) => eb.fn.count(eb.table('stacked')).as('assetCount'))
|
||||
.whereRef('stacked.stackId', '=', 'stack.id')
|
||||
.groupBy('stack.id')
|
||||
.as('stacked_assets'),
|
||||
(join) => join.on('stack.id', 'is not', null),
|
||||
)
|
||||
.select((eb) => eb.fn.toJson(eb.table('stacked_assets')).$castTo<Stack | null>().as('stack'))
|
||||
.where('asset.ownerId', '=', asUuid(ownerId))
|
||||
.where('asset.visibility', '!=', AssetVisibility.Hidden)
|
||||
.where('asset.updatedAt', '<=', updatedUntil)
|
||||
.$if(!!lastId, (qb) => qb.where('asset.id', '>', lastId!))
|
||||
.orderBy('asset.id')
|
||||
.limit(limit)
|
||||
.execute();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [{ userIds: [DummyValue.UUID], updatedAfter: DummyValue.DATE, limit: 100 }] })
|
||||
async getChangedDeltaSync(options: AssetDeltaSyncOptions) {
|
||||
return this.db
|
||||
.selectFrom('asset')
|
||||
.selectAll('asset')
|
||||
.$call(withExif)
|
||||
.leftJoin('stack', 'stack.id', 'asset.stackId')
|
||||
.leftJoinLateral(
|
||||
(eb) =>
|
||||
eb
|
||||
.selectFrom('asset as stacked')
|
||||
.selectAll('stack')
|
||||
.select((eb) => eb.fn.count(eb.table('stacked')).as('assetCount'))
|
||||
.whereRef('stacked.stackId', '=', 'stack.id')
|
||||
.groupBy('stack.id')
|
||||
.as('stacked_assets'),
|
||||
(join) => join.on('stack.id', 'is not', null),
|
||||
)
|
||||
.select((eb) => eb.fn.toJson(eb.table('stacked_assets').$castTo<Stack | null>()).as('stack'))
|
||||
.where('asset.ownerId', '=', anyUuid(options.userIds))
|
||||
.where('asset.visibility', '!=', AssetVisibility.Hidden)
|
||||
.where('asset.updatedAt', '>', options.updatedAfter)
|
||||
.limit(options.limit)
|
||||
.execute();
|
||||
}
|
||||
|
||||
async upsertFile(
|
||||
file: Pick<
|
||||
Insertable<AssetFileTable>,
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Kysely } from 'kysely';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import { DummyValue, GenerateSql } from 'src/decorators';
|
||||
import { DatabaseAction, EntityType } from 'src/enum';
|
||||
import { DB } from 'src/schema';
|
||||
|
||||
export interface AuditSearch {
|
||||
action?: DatabaseAction;
|
||||
entityType?: EntityType;
|
||||
userIds: string[];
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class AuditRepository {
|
||||
constructor(@InjectKysely() private db: Kysely<DB>) {}
|
||||
|
||||
@GenerateSql({
|
||||
params: [
|
||||
DummyValue.DATE,
|
||||
{ action: DatabaseAction.Create, entityType: EntityType.Asset, userIds: [DummyValue.UUID] },
|
||||
],
|
||||
})
|
||||
async getAfter(since: Date, options: AuditSearch): Promise<string[]> {
|
||||
const records = await this.db
|
||||
.selectFrom('audit')
|
||||
.where('audit.createdAt', '>', since)
|
||||
.$if(!!options.action, (qb) => qb.where('audit.action', '=', options.action!))
|
||||
.$if(!!options.entityType, (qb) => qb.where('audit.entityType', '=', options.entityType!))
|
||||
.where('audit.ownerId', 'in', options.userIds)
|
||||
.distinctOn(['audit.entityId', 'audit.entityType'])
|
||||
.orderBy('audit.entityId', 'desc')
|
||||
.orderBy('audit.entityType', 'desc')
|
||||
.orderBy('audit.createdAt', 'desc')
|
||||
.select('audit.entityId')
|
||||
.execute();
|
||||
|
||||
return records.map(({ entityId }) => entityId);
|
||||
}
|
||||
|
||||
async removeBefore(before: Date): Promise<void> {
|
||||
await this.db.deleteFrom('audit').where('createdAt', '<', before).execute();
|
||||
}
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import { AppRepository } from 'src/repositories/app.repository';
|
||||
import { AssetEditRepository } from 'src/repositories/asset-edit.repository';
|
||||
import { AssetJobRepository } from 'src/repositories/asset-job.repository';
|
||||
import { AssetRepository } from 'src/repositories/asset.repository';
|
||||
import { AuditRepository } from 'src/repositories/audit.repository';
|
||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||
import { CronRepository } from 'src/repositories/cron.repository';
|
||||
import { CryptoRepository } from 'src/repositories/crypto.repository';
|
||||
@@ -56,7 +55,6 @@ export const repositories = [
|
||||
ActivityRepository,
|
||||
AlbumRepository,
|
||||
AlbumUserRepository,
|
||||
AuditRepository,
|
||||
ApiKeyRepository,
|
||||
AppRepository,
|
||||
AssetRepository,
|
||||
|
||||
@@ -40,7 +40,6 @@ import { AssetMetadataAuditTable } from 'src/schema/tables/asset-metadata-audit.
|
||||
import { AssetMetadataTable } from 'src/schema/tables/asset-metadata.table';
|
||||
import { AssetOcrTable } from 'src/schema/tables/asset-ocr.table';
|
||||
import { AssetTable } from 'src/schema/tables/asset.table';
|
||||
import { AuditTable } from 'src/schema/tables/audit.table';
|
||||
import { FaceSearchTable } from 'src/schema/tables/face-search.table';
|
||||
import { GeodataPlacesTable } from 'src/schema/tables/geodata-places.table';
|
||||
import { LibraryTable } from 'src/schema/tables/library.table';
|
||||
@@ -98,7 +97,6 @@ export class ImmichDatabase {
|
||||
AssetOcrTable,
|
||||
AssetTable,
|
||||
AssetFileTable,
|
||||
AuditTable,
|
||||
AssetExifTable,
|
||||
FaceSearchTable,
|
||||
GeodataPlacesTable,
|
||||
@@ -197,8 +195,6 @@ export interface DB {
|
||||
asset_ocr: AssetOcrTable;
|
||||
ocr_search: OcrSearchTable;
|
||||
|
||||
audit: AuditTable;
|
||||
|
||||
face_search: FaceSearchTable;
|
||||
|
||||
geodata_places: GeodataPlacesTable;
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await sql`DROP TABLE "audit";`.execute(db);
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await sql`CREATE TABLE "audit" (
|
||||
"id" serial NOT NULL,
|
||||
"entityType" character varying NOT NULL,
|
||||
"entityId" uuid NOT NULL,
|
||||
"action" character varying NOT NULL,
|
||||
"ownerId" uuid NOT NULL,
|
||||
"createdAt" timestamp with time zone NOT NULL DEFAULT now(),
|
||||
CONSTRAINT "audit_pkey" PRIMARY KEY ("id")
|
||||
);`.execute(db);
|
||||
await sql`CREATE INDEX "audit_ownerId_createdAt_idx" ON "audit" ("ownerId", "createdAt");`.execute(db);
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
import { Column, CreateDateColumn, Generated, Index, PrimaryColumn, Table, Timestamp } from '@immich/sql-tools';
|
||||
import { DatabaseAction, EntityType } from 'src/enum';
|
||||
|
||||
@Table('audit')
|
||||
@Index({ columns: ['ownerId', 'createdAt'] })
|
||||
export class AuditTable {
|
||||
@PrimaryColumn({ type: 'serial', synchronize: false })
|
||||
id!: Generated<number>;
|
||||
|
||||
@Column()
|
||||
entityType!: EntityType;
|
||||
|
||||
@Column({ type: 'uuid' })
|
||||
entityId!: string;
|
||||
|
||||
@Column()
|
||||
action!: DatabaseAction;
|
||||
|
||||
@Column({ type: 'uuid' })
|
||||
ownerId!: string;
|
||||
|
||||
@CreateDateColumn()
|
||||
createdAt!: Generated<Timestamp>;
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
import { JobStatus } from 'src/enum';
|
||||
import { AuditService } from 'src/services/audit.service';
|
||||
import { newTestService, ServiceMocks } from 'test/utils';
|
||||
|
||||
describe(AuditService.name, () => {
|
||||
let sut: AuditService;
|
||||
let mocks: ServiceMocks;
|
||||
|
||||
beforeEach(() => {
|
||||
({ sut, mocks } = newTestService(AuditService));
|
||||
});
|
||||
|
||||
it('should work', () => {
|
||||
expect(sut).toBeDefined();
|
||||
});
|
||||
|
||||
describe('handleCleanup', () => {
|
||||
it('should delete old audit entries', async () => {
|
||||
mocks.audit.removeBefore.mockResolvedValue();
|
||||
|
||||
await expect(sut.handleCleanup()).resolves.toBe(JobStatus.Success);
|
||||
|
||||
expect(mocks.audit.removeBefore).toHaveBeenCalledWith(expect.any(Date));
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,15 +0,0 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { DateTime } from 'luxon';
|
||||
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
|
||||
import { OnJob } from 'src/decorators';
|
||||
import { JobName, JobStatus, QueueName } from 'src/enum';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
|
||||
@Injectable()
|
||||
export class AuditService extends BaseService {
|
||||
@OnJob({ name: JobName.AuditLogCleanup, queue: QueueName.BackgroundTask })
|
||||
async handleCleanup(): Promise<JobStatus> {
|
||||
await this.auditRepository.removeBefore(DateTime.now().minus(AUDIT_LOG_MAX_DURATION).toJSDate());
|
||||
return JobStatus.Success;
|
||||
}
|
||||
}
|
||||
@@ -14,7 +14,6 @@ import { AppRepository } from 'src/repositories/app.repository';
|
||||
import { AssetEditRepository } from 'src/repositories/asset-edit.repository';
|
||||
import { AssetJobRepository } from 'src/repositories/asset-job.repository';
|
||||
import { AssetRepository } from 'src/repositories/asset.repository';
|
||||
import { AuditRepository } from 'src/repositories/audit.repository';
|
||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||
import { CronRepository } from 'src/repositories/cron.repository';
|
||||
import { CryptoRepository } from 'src/repositories/crypto.repository';
|
||||
@@ -72,7 +71,6 @@ export const BASE_SERVICE_DEPENDENCIES = [
|
||||
AssetRepository,
|
||||
AssetEditRepository,
|
||||
AssetJobRepository,
|
||||
AuditRepository,
|
||||
ConfigRepository,
|
||||
CronRepository,
|
||||
CryptoRepository,
|
||||
@@ -131,7 +129,6 @@ export class BaseService {
|
||||
protected assetRepository: AssetRepository,
|
||||
protected assetEditRepository: AssetEditRepository,
|
||||
protected assetJobRepository: AssetJobRepository,
|
||||
protected auditRepository: AuditRepository,
|
||||
protected configRepository: ConfigRepository,
|
||||
protected cronRepository: CronRepository,
|
||||
protected cryptoRepository: CryptoRepository,
|
||||
|
||||
@@ -4,7 +4,6 @@ import { ApiKeyService } from 'src/services/api-key.service';
|
||||
import { ApiService } from 'src/services/api.service';
|
||||
import { AssetMediaService } from 'src/services/asset-media.service';
|
||||
import { AssetService } from 'src/services/asset.service';
|
||||
import { AuditService } from 'src/services/audit.service';
|
||||
import { AuthAdminService } from 'src/services/auth-admin.service';
|
||||
import { AuthService } from 'src/services/auth.service';
|
||||
import { CliService } from 'src/services/cli.service';
|
||||
@@ -54,7 +53,6 @@ export const services = [
|
||||
ApiService,
|
||||
AssetMediaService,
|
||||
AssetService,
|
||||
AuditService,
|
||||
AuthService,
|
||||
AuthAdminService,
|
||||
CliService,
|
||||
|
||||
@@ -42,7 +42,6 @@ describe(QueueService.name, () => {
|
||||
{ name: JobName.MemoryCleanup },
|
||||
{ name: JobName.SessionCleanup },
|
||||
{ name: JobName.AuditTableCleanup },
|
||||
{ name: JobName.AuditLogCleanup },
|
||||
{ name: JobName.MemoryGenerate },
|
||||
{ name: JobName.UserSyncUsage },
|
||||
{ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } },
|
||||
|
||||
@@ -270,7 +270,6 @@ export class QueueService extends BaseService {
|
||||
{ name: JobName.MemoryCleanup },
|
||||
{ name: JobName.SessionCleanup },
|
||||
{ name: JobName.AuditTableCleanup },
|
||||
{ name: JobName.AuditLogCleanup },
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,98 +0,0 @@
|
||||
import { mapAsset } from 'src/dtos/asset-response.dto';
|
||||
import { SyncService } from 'src/services/sync.service';
|
||||
import { AssetFactory } from 'test/factories/asset.factory';
|
||||
import { PartnerFactory } from 'test/factories/partner.factory';
|
||||
import { authStub } from 'test/fixtures/auth.stub';
|
||||
import { getForAsset, getForPartner } from 'test/mappers';
|
||||
import { factory } from 'test/small.factory';
|
||||
import { newTestService, ServiceMocks } from 'test/utils';
|
||||
|
||||
const untilDate = new Date(2024);
|
||||
const mapAssetOpts = { auth: authStub.user1, stripMetadata: false, withStack: true };
|
||||
|
||||
describe(SyncService.name, () => {
|
||||
let sut: SyncService;
|
||||
let mocks: ServiceMocks;
|
||||
|
||||
beforeEach(() => {
|
||||
({ sut, mocks } = newTestService(SyncService));
|
||||
});
|
||||
|
||||
it('should exist', () => {
|
||||
expect(sut).toBeDefined();
|
||||
});
|
||||
|
||||
describe('getAllAssetsForUserFullSync', () => {
|
||||
it('should return a list of all assets owned by the user', async () => {
|
||||
const [asset1, asset2] = [
|
||||
AssetFactory.from({ libraryId: 'library-id', isExternal: true }).owner(authStub.user1.user).build(),
|
||||
AssetFactory.from().owner(authStub.user1.user).build(),
|
||||
];
|
||||
mocks.asset.getAllForUserFullSync.mockResolvedValue([getForAsset(asset1), getForAsset(asset2)]);
|
||||
await expect(sut.getFullSync(authStub.user1, { limit: 2, updatedUntil: untilDate })).resolves.toEqual([
|
||||
mapAsset(getForAsset(asset1), mapAssetOpts),
|
||||
mapAsset(getForAsset(asset2), mapAssetOpts),
|
||||
]);
|
||||
expect(mocks.asset.getAllForUserFullSync).toHaveBeenCalledWith({
|
||||
ownerId: authStub.user1.user.id,
|
||||
updatedUntil: untilDate,
|
||||
limit: 2,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('getChangesForDeltaSync', () => {
|
||||
it('should return a response requiring a full sync when partners are out of sync', async () => {
|
||||
const partner = PartnerFactory.create();
|
||||
const auth = factory.auth({ user: { id: partner.sharedWithId } });
|
||||
|
||||
mocks.partner.getAll.mockResolvedValue([getForPartner(partner)]);
|
||||
|
||||
await expect(
|
||||
sut.getDeltaSync(authStub.user1, { updatedAfter: new Date(), userIds: [auth.user.id] }),
|
||||
).resolves.toEqual({ needsFullSync: true, upserted: [], deleted: [] });
|
||||
|
||||
expect(mocks.asset.getChangedDeltaSync).toHaveBeenCalledTimes(0);
|
||||
expect(mocks.audit.getAfter).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('should return a response requiring a full sync when last sync was too long ago', async () => {
|
||||
mocks.partner.getAll.mockResolvedValue([]);
|
||||
await expect(
|
||||
sut.getDeltaSync(authStub.user1, { updatedAfter: new Date(2000), userIds: [authStub.user1.user.id] }),
|
||||
).resolves.toEqual({ needsFullSync: true, upserted: [], deleted: [] });
|
||||
expect(mocks.asset.getChangedDeltaSync).toHaveBeenCalledTimes(0);
|
||||
expect(mocks.audit.getAfter).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('should return a response requiring a full sync when there are too many changes', async () => {
|
||||
const asset = AssetFactory.create();
|
||||
mocks.partner.getAll.mockResolvedValue([]);
|
||||
mocks.asset.getChangedDeltaSync.mockResolvedValue(
|
||||
Array.from<ReturnType<typeof getForAsset>>({ length: 10_000 }).fill(getForAsset(asset)),
|
||||
);
|
||||
await expect(
|
||||
sut.getDeltaSync(authStub.user1, { updatedAfter: new Date(), userIds: [authStub.user1.user.id] }),
|
||||
).resolves.toEqual({ needsFullSync: true, upserted: [], deleted: [] });
|
||||
expect(mocks.asset.getChangedDeltaSync).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.audit.getAfter).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
it('should return a response with changes and deletions', async () => {
|
||||
const asset = AssetFactory.create({ ownerId: authStub.user1.user.id });
|
||||
const deletedAsset = AssetFactory.create({ libraryId: 'library-id', isExternal: true });
|
||||
mocks.partner.getAll.mockResolvedValue([]);
|
||||
mocks.asset.getChangedDeltaSync.mockResolvedValue([getForAsset(asset)]);
|
||||
mocks.audit.getAfter.mockResolvedValue([deletedAsset.id]);
|
||||
await expect(
|
||||
sut.getDeltaSync(authStub.user1, { updatedAfter: new Date(), userIds: [authStub.user1.user.id] }),
|
||||
).resolves.toEqual({
|
||||
needsFullSync: false,
|
||||
upserted: [mapAsset(getForAsset(asset), mapAssetOpts)],
|
||||
deleted: [deletedAsset.id],
|
||||
});
|
||||
expect(mocks.asset.getChangedDeltaSync).toHaveBeenCalledTimes(1);
|
||||
expect(mocks.audit.getAfter).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -2,14 +2,9 @@ import { BadRequestException, ForbiddenException, Injectable } from '@nestjs/com
|
||||
import { Insertable } from 'kysely';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { Writable } from 'node:stream';
|
||||
import { AUDIT_LOG_MAX_DURATION } from 'src/constants';
|
||||
import { OnJob } from 'src/decorators';
|
||||
import { AssetResponseDto, mapAsset } from 'src/dtos/asset-response.dto';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import {
|
||||
AssetDeltaSyncDto,
|
||||
AssetDeltaSyncResponseDto,
|
||||
AssetFullSyncDto,
|
||||
SyncAckDeleteDto,
|
||||
SyncAckSetDto,
|
||||
syncAssetFaceV2ToV1,
|
||||
@@ -17,23 +12,12 @@ import {
|
||||
SyncItem,
|
||||
SyncStreamDto,
|
||||
} from 'src/dtos/sync.dto';
|
||||
import {
|
||||
AssetVisibility,
|
||||
DatabaseAction,
|
||||
EntityType,
|
||||
JobName,
|
||||
Permission,
|
||||
QueueName,
|
||||
SyncEntityType,
|
||||
SyncRequestType,
|
||||
} from 'src/enum';
|
||||
import { JobName, QueueName, SyncEntityType, SyncRequestType } from 'src/enum';
|
||||
import { SyncQueryOptions } from 'src/repositories/sync.repository';
|
||||
import { SessionSyncCheckpointTable } from 'src/schema/tables/sync-checkpoint.table';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
import { SyncAck } from 'src/types';
|
||||
import { getMyPartnerIds } from 'src/utils/asset.util';
|
||||
import { hexOrBufferToBase64 } from 'src/utils/bytes';
|
||||
import { setIsEqual } from 'src/utils/set';
|
||||
import { fromAck, serialize, SerializeOptions, toAck } from 'src/utils/sync';
|
||||
|
||||
type CheckpointMap = Partial<Record<SyncEntityType, SyncAck>>;
|
||||
@@ -66,7 +50,6 @@ const sendEntityBackfillCompleteAck = (response: Writable, ackType: SyncEntityTy
|
||||
send(response, { type: SyncEntityType.SyncAckV1, data: {}, ackType, ids: [id, COMPLETE_ID] });
|
||||
};
|
||||
|
||||
const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] };
|
||||
export const SYNC_TYPES_ORDER = [
|
||||
SyncRequestType.AuthUsersV1,
|
||||
SyncRequestType.UsersV1,
|
||||
@@ -887,68 +870,4 @@ export class SyncService extends BaseService {
|
||||
},
|
||||
]);
|
||||
}
|
||||
|
||||
async getFullSync(auth: AuthDto, dto: AssetFullSyncDto): Promise<AssetResponseDto[]> {
|
||||
// mobile implementation is faster if this is a single id
|
||||
const userId = dto.userId || auth.user.id;
|
||||
await this.requireAccess({ auth, permission: Permission.TimelineRead, ids: [userId] });
|
||||
const assets = await this.assetRepository.getAllForUserFullSync({
|
||||
ownerId: userId,
|
||||
updatedUntil: dto.updatedUntil,
|
||||
lastId: dto.lastId,
|
||||
limit: dto.limit,
|
||||
});
|
||||
return assets.map((a) => mapAsset(a, { auth, stripMetadata: false, withStack: true }));
|
||||
}
|
||||
|
||||
async getDeltaSync(auth: AuthDto, dto: AssetDeltaSyncDto): Promise<AssetDeltaSyncResponseDto> {
|
||||
// app has not synced in the last 100 days
|
||||
const duration = DateTime.now().diff(DateTime.fromJSDate(dto.updatedAfter));
|
||||
if (duration > AUDIT_LOG_MAX_DURATION) {
|
||||
return FULL_SYNC;
|
||||
}
|
||||
|
||||
// app does not have the correct partners synced
|
||||
const partnerIds = await getMyPartnerIds({ userId: auth.user.id, repository: this.partnerRepository });
|
||||
const userIds = [auth.user.id, ...partnerIds];
|
||||
if (!setIsEqual(new Set(userIds), new Set(dto.userIds))) {
|
||||
return FULL_SYNC;
|
||||
}
|
||||
|
||||
await this.requireAccess({ auth, permission: Permission.TimelineRead, ids: dto.userIds });
|
||||
|
||||
const limit = 10_000;
|
||||
const upserted = await this.assetRepository.getChangedDeltaSync({ limit, updatedAfter: dto.updatedAfter, userIds });
|
||||
|
||||
// too many changes, need to do a full sync
|
||||
if (upserted.length === limit) {
|
||||
return FULL_SYNC;
|
||||
}
|
||||
|
||||
const deleted = await this.auditRepository.getAfter(dto.updatedAfter, {
|
||||
userIds,
|
||||
entityType: EntityType.Asset,
|
||||
action: DatabaseAction.Delete,
|
||||
});
|
||||
|
||||
const result = {
|
||||
needsFullSync: false,
|
||||
upserted: upserted
|
||||
// do not return archived assets for partner users
|
||||
.filter(
|
||||
(a) =>
|
||||
a.ownerId === auth.user.id || (a.ownerId !== auth.user.id && a.visibility === AssetVisibility.Timeline),
|
||||
)
|
||||
.map((a) =>
|
||||
mapAsset(a, {
|
||||
auth,
|
||||
stripMetadata: false,
|
||||
// ignore stacks for non partner users
|
||||
withStack: a.ownerId === auth.user.id,
|
||||
}),
|
||||
),
|
||||
deleted,
|
||||
};
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,7 +351,6 @@ export type JobItem =
|
||||
| { name: JobName.FileDelete; data: IDeleteFilesJob }
|
||||
|
||||
// Cleanup
|
||||
| { name: JobName.AuditLogCleanup; data?: IBaseJob }
|
||||
| { name: JobName.SessionCleanup; data?: IBaseJob }
|
||||
|
||||
// Tags
|
||||
|
||||
Reference in New Issue
Block a user