From 749f63e4a00e406fd7ece2ba7e629a3f434929d1 Mon Sep 17 00:00:00 2001 From: Zack Pollard Date: Tue, 17 Jun 2025 14:56:54 +0100 Subject: [PATCH] fix: partner asset and exif sync backfill (#19224) * fix: partner asset sync backfill * fix: add partner asset exif backfill * ci: output content of files that have changed --- .github/workflows/test.yml | 1 + .../openapi/lib/model/sync_entity_type.dart | 9 ++ open-api/immich-openapi-specs.json | 5 +- open-api/typescript-sdk/src/fetch-client.ts | 5 +- server/src/database.ts | 1 + server/src/db.d.ts | 1 + server/src/decorators.ts | 3 + server/src/dtos/sync.dto.ts | 3 + server/src/enum.ts | 4 + server/src/queries/sync.repository.sql | 78 +++++++++ server/src/repositories/sync.repository.ts | 39 +++++ .../1750107668827-PartnerCreateId.ts | 10 ++ server/src/schema/tables/partner.table.ts | 5 +- server/src/services/sync.service.ts | 138 ++++++++++++++-- server/src/types.ts | 1 + server/src/utils/sync.ts | 15 +- server/test/medium.factory.ts | 4 +- .../sync/sync-partner-asset-exif.spec.ts | 152 +++++++++++++++++- .../specs/sync/sync-partner-asset.spec.ts | 149 ++++++++++++++++- server/test/small.factory.ts | 17 +- server/test/utils.ts | 4 + 21 files changed, 607 insertions(+), 37 deletions(-) create mode 100644 server/src/schema/migrations/1750107668827-PartnerCreateId.ts diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 186eb07761..c9fd2600bf 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -722,6 +722,7 @@ jobs: run: | echo "ERROR: Generated SQL files not up to date!" echo "Changed files: ${CHANGED_FILES}" + git diff exit 1 # mobile-integration-tests: diff --git a/mobile/openapi/lib/model/sync_entity_type.dart b/mobile/openapi/lib/model/sync_entity_type.dart index 600371545a..f1366a2bfc 100644 --- a/mobile/openapi/lib/model/sync_entity_type.dart +++ b/mobile/openapi/lib/model/sync_entity_type.dart @@ -31,12 +31,15 @@ class SyncEntityType { static const assetDeleteV1 = SyncEntityType._(r'AssetDeleteV1'); static const assetExifV1 = SyncEntityType._(r'AssetExifV1'); static const partnerAssetV1 = SyncEntityType._(r'PartnerAssetV1'); + static const partnerAssetBackfillV1 = SyncEntityType._(r'PartnerAssetBackfillV1'); static const partnerAssetDeleteV1 = SyncEntityType._(r'PartnerAssetDeleteV1'); static const partnerAssetExifV1 = SyncEntityType._(r'PartnerAssetExifV1'); + static const partnerAssetExifBackfillV1 = SyncEntityType._(r'PartnerAssetExifBackfillV1'); static const albumV1 = SyncEntityType._(r'AlbumV1'); static const albumDeleteV1 = SyncEntityType._(r'AlbumDeleteV1'); static const albumUserV1 = SyncEntityType._(r'AlbumUserV1'); static const albumUserDeleteV1 = SyncEntityType._(r'AlbumUserDeleteV1'); + static const syncAckV1 = SyncEntityType._(r'SyncAckV1'); /// List of all possible values in this [enum][SyncEntityType]. static const values = [ @@ -48,12 +51,15 @@ class SyncEntityType { assetDeleteV1, assetExifV1, partnerAssetV1, + partnerAssetBackfillV1, partnerAssetDeleteV1, partnerAssetExifV1, + partnerAssetExifBackfillV1, albumV1, albumDeleteV1, albumUserV1, albumUserDeleteV1, + syncAckV1, ]; static SyncEntityType? fromJson(dynamic value) => SyncEntityTypeTypeTransformer().decode(value); @@ -100,12 +106,15 @@ class SyncEntityTypeTypeTransformer { case r'AssetDeleteV1': return SyncEntityType.assetDeleteV1; case r'AssetExifV1': return SyncEntityType.assetExifV1; case r'PartnerAssetV1': return SyncEntityType.partnerAssetV1; + case r'PartnerAssetBackfillV1': return SyncEntityType.partnerAssetBackfillV1; case r'PartnerAssetDeleteV1': return SyncEntityType.partnerAssetDeleteV1; case r'PartnerAssetExifV1': return SyncEntityType.partnerAssetExifV1; + case r'PartnerAssetExifBackfillV1': return SyncEntityType.partnerAssetExifBackfillV1; case r'AlbumV1': return SyncEntityType.albumV1; case r'AlbumDeleteV1': return SyncEntityType.albumDeleteV1; case r'AlbumUserV1': return SyncEntityType.albumUserV1; case r'AlbumUserDeleteV1': return SyncEntityType.albumUserDeleteV1; + case r'SyncAckV1': return SyncEntityType.syncAckV1; default: if (!allowNull) { throw ArgumentError('Unknown enum value to decode: $data'); diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 3ab1a0ce74..6e1f029862 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -13699,12 +13699,15 @@ "AssetDeleteV1", "AssetExifV1", "PartnerAssetV1", + "PartnerAssetBackfillV1", "PartnerAssetDeleteV1", "PartnerAssetExifV1", + "PartnerAssetExifBackfillV1", "AlbumV1", "AlbumDeleteV1", "AlbumUserV1", - "AlbumUserDeleteV1" + "AlbumUserDeleteV1", + "SyncAckV1" ], "type": "string" }, diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index 9a642a2d4b..6a288e75db 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -4052,12 +4052,15 @@ export enum SyncEntityType { AssetDeleteV1 = "AssetDeleteV1", AssetExifV1 = "AssetExifV1", PartnerAssetV1 = "PartnerAssetV1", + PartnerAssetBackfillV1 = "PartnerAssetBackfillV1", PartnerAssetDeleteV1 = "PartnerAssetDeleteV1", PartnerAssetExifV1 = "PartnerAssetExifV1", + PartnerAssetExifBackfillV1 = "PartnerAssetExifBackfillV1", AlbumV1 = "AlbumV1", AlbumDeleteV1 = "AlbumDeleteV1", AlbumUserV1 = "AlbumUserV1", - AlbumUserDeleteV1 = "AlbumUserDeleteV1" + AlbumUserDeleteV1 = "AlbumUserDeleteV1", + SyncAckV1 = "SyncAckV1" } export enum SyncRequestType { UsersV1 = "UsersV1", diff --git a/server/src/database.ts b/server/src/database.ts index 87509bd72a..79c550dd52 100644 --- a/server/src/database.ts +++ b/server/src/database.ts @@ -209,6 +209,7 @@ export type Partner = { sharedWithId: string; sharedWith: User; createdAt: Date; + createId: string; updatedAt: Date; updateId: string; inTimeline: boolean; diff --git a/server/src/db.d.ts b/server/src/db.d.ts index af1dd964fd..5aa8a8c4dc 100644 --- a/server/src/db.d.ts +++ b/server/src/db.d.ts @@ -332,6 +332,7 @@ export interface PartnersAudit { export interface Partners { createdAt: Generated; + createId: Generated; inTimeline: Generated; sharedById: string; sharedWithId: string; diff --git a/server/src/decorators.ts b/server/src/decorators.ts index 6b34ffcafe..766e7c70b9 100644 --- a/server/src/decorators.ts +++ b/server/src/decorators.ts @@ -14,6 +14,9 @@ const GeneratedUuidV7Column = (options: Omit = {}) => GeneratedUuidV7Column(options); +export const CreateIdColumn = (options: Omit = {}) => + GeneratedUuidV7Column(options); + export const PrimaryGeneratedUuidV7Column = () => GeneratedUuidV7Column({ primary: true }); export const UpdatedAtTrigger = (name: string) => diff --git a/server/src/dtos/sync.dto.ts b/server/src/dtos/sync.dto.ts index 050635308e..dbd58cde53 100644 --- a/server/src/dtos/sync.dto.ts +++ b/server/src/dtos/sync.dto.ts @@ -154,12 +154,15 @@ export type SyncItem = { [SyncEntityType.AssetDeleteV1]: SyncAssetDeleteV1; [SyncEntityType.AssetExifV1]: SyncAssetExifV1; [SyncEntityType.PartnerAssetV1]: SyncAssetV1; + [SyncEntityType.PartnerAssetBackfillV1]: SyncAssetV1; [SyncEntityType.PartnerAssetDeleteV1]: SyncAssetDeleteV1; [SyncEntityType.PartnerAssetExifV1]: SyncAssetExifV1; + [SyncEntityType.PartnerAssetExifBackfillV1]: SyncAssetExifV1; [SyncEntityType.AlbumV1]: SyncAlbumV1; [SyncEntityType.AlbumDeleteV1]: SyncAlbumDeleteV1; [SyncEntityType.AlbumUserV1]: SyncAlbumUserV1; [SyncEntityType.AlbumUserDeleteV1]: SyncAlbumUserDeleteV1; + [SyncEntityType.SyncAckV1]: object; }; const responseDtos = [ diff --git a/server/src/enum.ts b/server/src/enum.ts index e7e40eb122..4353e43ad1 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -595,13 +595,17 @@ export enum SyncEntityType { AssetExifV1 = 'AssetExifV1', PartnerAssetV1 = 'PartnerAssetV1', + PartnerAssetBackfillV1 = 'PartnerAssetBackfillV1', PartnerAssetDeleteV1 = 'PartnerAssetDeleteV1', PartnerAssetExifV1 = 'PartnerAssetExifV1', + PartnerAssetExifBackfillV1 = 'PartnerAssetExifBackfillV1', AlbumV1 = 'AlbumV1', AlbumDeleteV1 = 'AlbumDeleteV1', AlbumUserV1 = 'AlbumUserV1', AlbumUserDeleteV1 = 'AlbumUserDeleteV1', + + SyncAckV1 = 'SyncAckV1', } export enum NotificationLevel { diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index 659365c563..8e52754467 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -96,6 +96,45 @@ where order by "updateId" asc +-- SyncRepository.getPartnerBackfill +select + "sharedById", + "createId" +from + "partners" +where + "sharedWithId" = $1 + and "createId" >= $2 + and "createdAt" < now() - interval '1 millisecond' +order by + "partners"."createId" asc + +-- SyncRepository.getPartnerAssetsBackfill +select + "id", + "ownerId", + "originalFileName", + "thumbhash", + "checksum", + "fileCreatedAt", + "fileModifiedAt", + "localDateTime", + "type", + "deletedAt", + "isFavorite", + "visibility", + "updateId", + "duration" +from + "assets" +where + "ownerId" = $1 + and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 + and "updateId" >= $3 +order by + "updateId" asc + -- SyncRepository.getPartnerAssetsUpserts select "id", @@ -201,6 +240,45 @@ where order by "updateId" asc +-- SyncRepository.getPartnerAssetExifsBackfill +select + "exif"."assetId", + "exif"."description", + "exif"."exifImageWidth", + "exif"."exifImageHeight", + "exif"."fileSizeInByte", + "exif"."orientation", + "exif"."dateTimeOriginal", + "exif"."modifyDate", + "exif"."timeZone", + "exif"."latitude", + "exif"."longitude", + "exif"."projectionType", + "exif"."city", + "exif"."state", + "exif"."country", + "exif"."make", + "exif"."model", + "exif"."lensModel", + "exif"."fNumber", + "exif"."focalLength", + "exif"."iso", + "exif"."exposureTime", + "exif"."profileDescription", + "exif"."rating", + "exif"."fps", + "exif"."updateId" +from + "exif" + inner join "assets" on "assets"."id" = "exif"."assetId" +where + "assets"."ownerId" = $1 + and "exif"."updatedAt" < now() - interval '1 millisecond' + and "exif"."updateId" < $2 + and "exif"."updateId" >= $3 +order by + "exif"."updateId" asc + -- SyncRepository.getPartnerAssetExifsUpserts select "exif"."assetId", diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index 43fd732747..0f2d382fe0 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -92,6 +92,31 @@ export class SyncRepository { .stream(); } + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) + getPartnerBackfill(userId: string, afterCreateId?: string) { + return this.db + .selectFrom('partners') + .select(['sharedById', 'createId']) + .where('sharedWithId', '=', userId) + .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) + .where('createdAt', '<', sql.raw("now() - interval '1 millisecond'")) + .orderBy('partners.createId', 'asc') + .execute(); + } + + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getPartnerAssetsBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('assets') + .select(columns.syncAsset) + .where('ownerId', '=', partnerId) + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) + .orderBy('updateId', 'asc') + .stream(); + } + @GenerateSql({ params: [DummyValue.UUID], stream: true }) getPartnerAssetsUpserts(userId: string, ack?: SyncAck) { return this.db @@ -136,6 +161,20 @@ export class SyncRepository { .stream(); } + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getPartnerAssetExifsBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('exif') + .select(columns.syncAssetExif) + .innerJoin('assets', 'assets.id', 'exif.assetId') + .where('assets.ownerId', '=', partnerId) + .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('exif.updateId', '<', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!)) + .orderBy('exif.updateId', 'asc') + .stream(); + } + @GenerateSql({ params: [DummyValue.UUID], stream: true }) getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) { return this.db diff --git a/server/src/schema/migrations/1750107668827-PartnerCreateId.ts b/server/src/schema/migrations/1750107668827-PartnerCreateId.ts new file mode 100644 index 0000000000..56d78bf25a --- /dev/null +++ b/server/src/schema/migrations/1750107668827-PartnerCreateId.ts @@ -0,0 +1,10 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql`ALTER TABLE "partners" ADD "createId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db); + await sql`UPDATE "partners" SET "createId" = immich_uuid_v7("createdAt")`.execute(db); +} + +export async function down(db: Kysely): Promise { + await sql`ALTER TABLE "partners" DROP COLUMN "createId";`.execute(db); +} diff --git a/server/src/schema/tables/partner.table.ts b/server/src/schema/tables/partner.table.ts index 0da60cfc0c..6b83c6ba4c 100644 --- a/server/src/schema/tables/partner.table.ts +++ b/server/src/schema/tables/partner.table.ts @@ -1,4 +1,4 @@ -import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators'; +import { CreateIdColumn, UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators'; import { partners_delete_audit } from 'src/schema/functions'; import { UserTable } from 'src/schema/tables/user.table'; import { AfterDeleteTrigger, Column, CreateDateColumn, ForeignKeyColumn, Table, UpdateDateColumn } from 'src/sql-tools'; @@ -27,6 +27,9 @@ export class PartnerTable { @CreateDateColumn() createdAt!: Date; + @CreateIdColumn() + createId!: string; + @UpdateDateColumn() updatedAt!: Date; diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index d6cbc17a29..4705fc8e1f 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -20,7 +20,7 @@ import { SyncAck } from 'src/types'; import { getMyPartnerIds } from 'src/utils/asset.util'; import { hexOrBufferToBase64 } from 'src/utils/bytes'; import { setIsEqual } from 'src/utils/set'; -import { fromAck, serialize } from 'src/utils/sync'; +import { fromAck, serialize, toAck } from 'src/utils/sync'; const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] }; export const SYNC_TYPES_ORDER = [ @@ -98,12 +98,12 @@ export class SyncService extends BaseService { case SyncRequestType.UsersV1: { const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]); for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.UserDeleteV1, updateId: id, data })); + response.write(serialize({ type: SyncEntityType.UserDeleteV1, ids: [id], data })); } const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]); for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.UserV1, updateId, data })); + response.write(serialize({ type: SyncEntityType.UserV1, ids: [updateId], data })); } break; @@ -115,12 +115,12 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.PartnerDeleteV1], ); for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, updateId: id, data })); + response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, ids: [id], data })); } const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[SyncEntityType.PartnerV1]); for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.PartnerV1, updateId, data })); + response.write(serialize({ type: SyncEntityType.PartnerV1, ids: [updateId], data })); } break; @@ -132,7 +132,7 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.AssetDeleteV1], ); for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.AssetDeleteV1, updateId: id, data })); + response.write(serialize({ type: SyncEntityType.AssetDeleteV1, ids: [id], data })); } const upserts = this.syncRepository.getAssetUpserts(auth.user.id, checkpointMap[SyncEntityType.AssetV1]); @@ -140,7 +140,7 @@ export class SyncService extends BaseService { response.write( serialize({ type: SyncEntityType.AssetV1, - updateId, + ids: [updateId], data: { ...data, checksum: hexOrBufferToBase64(checksum), @@ -159,7 +159,60 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.PartnerAssetDeleteV1], ); for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, updateId: id, data })); + response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, ids: [id], data })); + } + + const checkpoint = checkpointMap[SyncEntityType.PartnerAssetBackfillV1]; + const partnerAssetCheckpoint = checkpointMap[SyncEntityType.PartnerAssetV1]; + + const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, checkpoint?.updateId); + + if (partnerAssetCheckpoint) { + for (const partner of partners) { + if (partner.createId === checkpoint?.updateId && checkpoint.extraId === 'complete') { + continue; + } + const partnerCheckpoint = checkpoint?.updateId === partner.createId ? checkpoint?.extraId : undefined; + const backfill = this.syncRepository.getPartnerAssetsBackfill( + partner.sharedById, + partnerCheckpoint, + partnerAssetCheckpoint.updateId, + ); + + for await (const { updateId, checksum, thumbhash, ...data } of backfill) { + response.write( + serialize({ + type: SyncEntityType.PartnerAssetBackfillV1, + ids: [updateId], + data: { + ...data, + checksum: hexOrBufferToBase64(checksum), + thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null, + }, + }), + ); + } + response.write( + serialize({ + type: SyncEntityType.SyncAckV1, + data: {}, + ackType: SyncEntityType.PartnerAssetBackfillV1, + ids: [partner.sharedById, 'complete'], + }), + ); + } + } else if (partners.length > 0) { + await this.syncRepository.upsertCheckpoints([ + { + type: SyncEntityType.PartnerAssetBackfillV1, + sessionId, + ack: toAck({ + type: SyncEntityType.PartnerAssetBackfillV1, + updateId: partners.at(-1)!.createId, + extraId: 'complete', + }), + }, + ]); } const upserts = this.syncRepository.getPartnerAssetsUpserts( @@ -170,7 +223,7 @@ export class SyncService extends BaseService { response.write( serialize({ type: SyncEntityType.PartnerAssetV1, - updateId, + ids: [updateId], data: { ...data, checksum: hexOrBufferToBase64(checksum), @@ -189,19 +242,74 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.AssetExifV1], ); for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.AssetExifV1, updateId, data })); + response.write(serialize({ type: SyncEntityType.AssetExifV1, ids: [updateId], data })); } break; } case SyncRequestType.PartnerAssetExifsV1: { + const checkpoint = checkpointMap[SyncEntityType.PartnerAssetExifBackfillV1]; + const partnerAssetCheckpoint = checkpointMap[SyncEntityType.PartnerAssetExifV1]; + + const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, checkpoint?.updateId); + + if (partnerAssetCheckpoint) { + for (const partner of partners) { + if (partner.createId === checkpoint?.updateId && checkpoint.extraId === 'complete') { + continue; + } + const partnerCheckpoint = checkpoint?.updateId === partner.createId ? checkpoint?.extraId : undefined; + const backfill = this.syncRepository.getPartnerAssetExifsBackfill( + partner.sharedById, + partnerCheckpoint, + partnerAssetCheckpoint.updateId, + ); + + for await (const { updateId, ...data } of backfill) { + response.write( + serialize({ + type: SyncEntityType.PartnerAssetExifBackfillV1, + ids: [updateId], + data, + }), + ); + } + response.write( + serialize({ + type: SyncEntityType.SyncAckV1, + data: {}, + ackType: SyncEntityType.PartnerAssetExifBackfillV1, + ids: [partner.sharedById, 'complete'], + }), + ); + } + } else if (partners.length > 0) { + await this.syncRepository.upsertCheckpoints([ + { + type: SyncEntityType.PartnerAssetExifBackfillV1, + sessionId, + ack: toAck({ + type: SyncEntityType.PartnerAssetExifBackfillV1, + updateId: partners.at(-1)!.createId, + extraId: 'complete', + }), + }, + ]); + } + const upserts = this.syncRepository.getPartnerAssetExifsUpserts( auth.user.id, checkpointMap[SyncEntityType.PartnerAssetExifV1], ); for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.PartnerAssetExifV1, updateId, data })); + response.write( + serialize({ + type: SyncEntityType.PartnerAssetExifV1, + ids: [updateId], + data, + }), + ); } break; @@ -213,12 +321,12 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.AlbumDeleteV1], ); for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.AlbumDeleteV1, updateId: id, data })); + response.write(serialize({ type: SyncEntityType.AlbumDeleteV1, ids: [id], data })); } const upserts = this.syncRepository.getAlbumUpserts(auth.user.id, checkpointMap[SyncEntityType.AlbumV1]); for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.AlbumV1, updateId, data })); + response.write(serialize({ type: SyncEntityType.AlbumV1, ids: [updateId], data })); } break; @@ -230,7 +338,7 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.AlbumUserDeleteV1], ); for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.AlbumUserDeleteV1, updateId: id, data })); + response.write(serialize({ type: SyncEntityType.AlbumUserDeleteV1, ids: [id], data })); } const upserts = this.syncRepository.getAlbumUserUpserts( @@ -238,7 +346,7 @@ export class SyncService extends BaseService { checkpointMap[SyncEntityType.AlbumUserV1], ); for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.AlbumUserV1, updateId, data })); + response.write(serialize({ type: SyncEntityType.AlbumUserV1, ids: [updateId], data })); } break; diff --git a/server/src/types.ts b/server/src/types.ts index 3ef22f96ff..6776604078 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -421,6 +421,7 @@ export interface IBulkAsset { export type SyncAck = { type: SyncEntityType; updateId: string; + extraId?: string; }; export type StorageAsset = { diff --git a/server/src/utils/sync.ts b/server/src/utils/sync.ts index cfb6660bdc..893c94dfcd 100644 --- a/server/src/utils/sync.ts +++ b/server/src/utils/sync.ts @@ -9,20 +9,23 @@ type Impossible = { type Exact = U & Impossible>; export const fromAck = (ack: string): SyncAck => { - const [type, updateId] = ack.split('|'); - return { type: type as SyncEntityType, updateId }; + const [type, updateId, extraId] = ack.split('|'); + return { type: type as SyncEntityType, updateId, extraId }; }; -export const toAck = ({ type, updateId }: SyncAck) => [type, updateId].join('|'); +export const toAck = ({ type, updateId, extraId }: SyncAck) => + [type, updateId, extraId].filter((v) => v !== undefined).join('|'); export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n'; export const serialize = ({ type, - updateId, data, + ids, + ackType, }: { type: T; - updateId: string; data: Exact; -}) => mapJsonLine({ type, data, ack: toAck({ type, updateId }) }); + ids: [string] | [string, string]; + ackType?: SyncEntityType; +}) => mapJsonLine({ type, data, ack: toAck({ type: ackType ?? type, updateId: ids[0], extraId: ids[1] }) }); diff --git a/server/test/medium.factory.ts b/server/test/medium.factory.ts index 9a4065aa9d..988f1b406f 100644 --- a/server/test/medium.factory.ts +++ b/server/test/medium.factory.ts @@ -33,7 +33,7 @@ import { BaseService } from 'src/services/base.service'; import { SyncService } from 'src/services/sync.service'; import { RepositoryInterface } from 'src/types'; import { factory, newDate, newEmbedding, newUuid } from 'test/small.factory'; -import { automock, ServiceOverrides } from 'test/utils'; +import { automock, ServiceOverrides, wait } from 'test/utils'; import { Mocked } from 'vitest'; const sha256 = (value: string) => createHash('sha256').update(value).digest('base64'); @@ -120,7 +120,7 @@ export const newSyncTest = (options: SyncTestOptions) => { const testSync = async (auth: AuthDto, types: SyncRequestType[]) => { const stream = mediumFactory.syncStream(); // Wait for 2ms to ensure all updates are available and account for setTimeout inaccuracy - await new Promise((resolve) => setTimeout(resolve, 2)); + await wait(2); await sut.stream(auth, stream, { types }); return stream.getResponse(); diff --git a/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts b/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts index 8d9e6d6ac5..edf6eee564 100644 --- a/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts +++ b/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts @@ -3,7 +3,7 @@ import { DB } from 'src/db'; import { SyncEntityType, SyncRequestType } from 'src/enum'; import { mediumFactory, newSyncAuthUser, newSyncTest } from 'test/medium.factory'; import { factory } from 'test/small.factory'; -import { getKyselyDB } from 'test/utils'; +import { getKyselyDB, wait } from 'test/utils'; let defaultDatabase: Kysely; @@ -126,4 +126,154 @@ describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { await expect(testSync(authUser3, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1); await expect(testSync(auth, [SyncRequestType.PartnerAssetExifsV1])).resolves.toHaveLength(0); }); + + it('should backfill partner asset exif when a partner shared their library with you', async () => { + const { auth, sut, getRepository, testSync } = await setup(); + + const userRepo = getRepository('user'); + const user2 = mediumFactory.userInsert(); + const user3 = mediumFactory.userInsert(); + await userRepo.create(user2); + await userRepo.create(user3); + + const assetRepo = getRepository('asset'); + const assetUser3 = mediumFactory.assetInsert({ ownerId: user3.id }); + const assetUser2 = mediumFactory.assetInsert({ ownerId: user2.id }); + await assetRepo.create(assetUser3); + await assetRepo.upsertExif({ assetId: assetUser3.id, make: 'Canon' }); + await wait(2); + await assetRepo.create(assetUser2); + await assetRepo.upsertExif({ assetId: assetUser2.id, make: 'Canon' }); + + const partnerRepo = getRepository('partner'); + await partnerRepo.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + + const response = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + expect(response).toHaveLength(1); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetUser2.id, + }), + type: SyncEntityType.PartnerAssetExifV1, + }, + ]), + ); + + const acks = response.map(({ ack }) => ack); + await sut.setAcks(auth, { acks }); + + await partnerRepo.create({ sharedById: user3.id, sharedWithId: auth.user.id }); + const backfillResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + expect(backfillResponse).toHaveLength(2); + expect(backfillResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetUser3.id, + }), + type: SyncEntityType.PartnerAssetExifBackfillV1, + }, + { + ack: expect.any(String), + data: {}, + type: SyncEntityType.SyncAckV1, + }, + ]), + ); + + const backfillAck = backfillResponse[1].ack; + await sut.setAcks(auth, { acks: [backfillAck] }); + + const finalResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + const finalAcks = finalResponse.map(({ ack }) => ack); + expect(finalAcks).toEqual([]); + }); + + it('should only backfill partner assets created prior to the current partner asset checkpoint', async () => { + const { auth, sut, getRepository, testSync } = await setup(); + + const userRepo = getRepository('user'); + const user2 = mediumFactory.userInsert(); + const user3 = mediumFactory.userInsert(); + await userRepo.create(user2); + await userRepo.create(user3); + + const assetRepo = getRepository('asset'); + const assetUser3 = mediumFactory.assetInsert({ ownerId: user3.id }); + const assetUser2 = mediumFactory.assetInsert({ ownerId: user2.id }); + const asset2User3 = mediumFactory.assetInsert({ ownerId: user3.id }); + await assetRepo.create(assetUser3); + await assetRepo.upsertExif({ assetId: assetUser3.id, make: 'Canon' }); + await wait(2); + await assetRepo.create(assetUser2); + await assetRepo.upsertExif({ assetId: assetUser2.id, make: 'Canon' }); + await wait(2); + await assetRepo.create(asset2User3); + await assetRepo.upsertExif({ assetId: asset2User3.id, make: 'Canon' }); + + const partnerRepo = getRepository('partner'); + await partnerRepo.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + + const response = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + expect(response).toHaveLength(1); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetUser2.id, + }), + type: SyncEntityType.PartnerAssetExifV1, + }, + ]), + ); + + const acks = response.map(({ ack }) => ack); + await sut.setAcks(auth, { acks }); + + await partnerRepo.create({ sharedById: user3.id, sharedWithId: auth.user.id }); + const backfillResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + expect(backfillResponse).toHaveLength(3); + expect(backfillResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetUser3.id, + }), + type: SyncEntityType.PartnerAssetExifBackfillV1, + }, + { + ack: expect.stringContaining(SyncEntityType.PartnerAssetExifBackfillV1), + data: {}, + type: SyncEntityType.SyncAckV1, + }, + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: asset2User3.id, + }), + type: SyncEntityType.PartnerAssetExifV1, + }, + ]), + ); + + const backfillAck = backfillResponse[1].ack; + const partnerAssetAck = backfillResponse[2].ack; + await sut.setAcks(auth, { acks: [backfillAck, partnerAssetAck] }); + + const finalResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + const finalAcks = finalResponse.map(({ ack }) => ack); + expect(finalAcks).toEqual([]); + }); }); diff --git a/server/test/medium/specs/sync/sync-partner-asset.spec.ts b/server/test/medium/specs/sync/sync-partner-asset.spec.ts index 8125193ba5..fe3d4edbcc 100644 --- a/server/test/medium/specs/sync/sync-partner-asset.spec.ts +++ b/server/test/medium/specs/sync/sync-partner-asset.spec.ts @@ -3,7 +3,7 @@ import { DB } from 'src/db'; import { SyncEntityType, SyncRequestType } from 'src/enum'; import { mediumFactory, newSyncAuthUser, newSyncTest } from 'test/medium.factory'; import { factory } from 'test/small.factory'; -import { getKyselyDB } from 'test/utils'; +import { getKyselyDB, wait } from 'test/utils'; let defaultDatabase: Kysely; @@ -19,7 +19,7 @@ beforeAll(async () => { defaultDatabase = await getKyselyDB(); }); -describe.concurrent(SyncRequestType.PartnerAssetsV1, () => { +describe(SyncRequestType.PartnerAssetsV1, () => { it('should detect and sync the first partner asset', async () => { const { auth, sut, getRepository, testSync } = await setup(); @@ -210,4 +210,149 @@ describe.concurrent(SyncRequestType.PartnerAssetsV1, () => { await expect(testSync(auth2, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); }); + + it('should backfill partner assets when a partner shared their library with you', async () => { + const { auth, sut, getRepository, testSync } = await setup(); + + const userRepo = getRepository('user'); + const user2 = mediumFactory.userInsert(); + const user3 = mediumFactory.userInsert(); + await userRepo.create(user2); + await userRepo.create(user3); + + const assetRepo = getRepository('asset'); + const assetUser3 = mediumFactory.assetInsert({ ownerId: user3.id }); + const assetUser2 = mediumFactory.assetInsert({ ownerId: user2.id }); + await assetRepo.create(assetUser3); + await wait(2); + await assetRepo.create(assetUser2); + + const partnerRepo = getRepository('partner'); + await partnerRepo.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + + const response = await testSync(auth, [SyncRequestType.PartnerAssetsV1]); + + expect(response).toHaveLength(1); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + id: assetUser2.id, + }), + type: SyncEntityType.PartnerAssetV1, + }, + ]), + ); + + const acks = response.map(({ ack }) => ack); + await sut.setAcks(auth, { acks }); + + await partnerRepo.create({ sharedById: user3.id, sharedWithId: auth.user.id }); + const backfillResponse = await testSync(auth, [SyncRequestType.PartnerAssetsV1]); + + expect(backfillResponse).toHaveLength(2); + expect(backfillResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + id: assetUser3.id, + }), + type: SyncEntityType.PartnerAssetBackfillV1, + }, + { + ack: expect.stringContaining(SyncEntityType.PartnerAssetBackfillV1), + data: {}, + type: SyncEntityType.SyncAckV1, + }, + ]), + ); + + const backfillAck = backfillResponse[1].ack; + await sut.setAcks(auth, { acks: [backfillAck] }); + + const finalResponse = await testSync(auth, [SyncRequestType.PartnerAssetsV1]); + + const finalAcks = finalResponse.map(({ ack }) => ack); + expect(finalAcks).toEqual([]); + }); + + it('should only backfill partner assets created prior to the current partner asset checkpoint', async () => { + const { auth, sut, getRepository, testSync } = await setup(); + + const userRepo = getRepository('user'); + const user2 = mediumFactory.userInsert(); + const user3 = mediumFactory.userInsert(); + await userRepo.create(user2); + await userRepo.create(user3); + + const assetRepo = getRepository('asset'); + const assetUser3 = mediumFactory.assetInsert({ ownerId: user3.id }); + const assetUser2 = mediumFactory.assetInsert({ ownerId: user2.id }); + const asset2User3 = mediumFactory.assetInsert({ ownerId: user3.id }); + await assetRepo.create(assetUser3); + await wait(2); + await assetRepo.create(assetUser2); + await wait(2); + await assetRepo.create(asset2User3); + + const partnerRepo = getRepository('partner'); + await partnerRepo.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + + const response = await testSync(auth, [SyncRequestType.PartnerAssetsV1]); + + expect(response).toHaveLength(1); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + id: assetUser2.id, + }), + type: SyncEntityType.PartnerAssetV1, + }, + ]), + ); + + const acks = response.map(({ ack }) => ack); + await sut.setAcks(auth, { acks }); + + await partnerRepo.create({ sharedById: user3.id, sharedWithId: auth.user.id }); + const backfillResponse = await testSync(auth, [SyncRequestType.PartnerAssetsV1]); + + expect(backfillResponse).toHaveLength(3); + expect(backfillResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + id: assetUser3.id, + }), + type: SyncEntityType.PartnerAssetBackfillV1, + }, + { + ack: expect.stringContaining(SyncEntityType.PartnerAssetBackfillV1), + data: {}, + type: SyncEntityType.SyncAckV1, + }, + { + ack: expect.any(String), + data: expect.objectContaining({ + id: asset2User3.id, + }), + type: SyncEntityType.PartnerAssetV1, + }, + ]), + ); + + const backfillAck = backfillResponse[1].ack; + const partnerAssetAck = backfillResponse[2].ack; + await sut.setAcks(auth, { acks: [backfillAck, partnerAssetAck] }); + + const finalResponse = await testSync(auth, [SyncRequestType.PartnerAssetsV1]); + + const finalAcks = finalResponse.map(({ ack }) => ack); + expect(finalAcks).toEqual([]); + }); }); diff --git a/server/test/small.factory.ts b/server/test/small.factory.ts index b70f02bcf5..79d6d511a3 100644 --- a/server/test/small.factory.ts +++ b/server/test/small.factory.ts @@ -24,7 +24,7 @@ export const newUuids = () => .fill(0) .map(() => newUuid()); export const newDate = () => new Date(); -export const newUpdateId = () => 'uuid-v7'; +export const newUuidV7 = () => 'uuid-v7'; export const newSha1 = () => Buffer.from('this is a fake hash'); export const newEmbedding = () => { const embedding = Array.from({ length: 512 }) @@ -110,9 +110,10 @@ const partnerFactory = (partner: Partial = {}) => { sharedBy, sharedWithId: sharedWith.id, sharedWith, + createId: newUuidV7(), createdAt: newDate(), updatedAt: newDate(), - updateId: newUpdateId(), + updateId: newUuidV7(), inTimeline: true, ...partner, }; @@ -122,7 +123,7 @@ const sessionFactory = (session: Partial = {}) => ({ id: newUuid(), createdAt: newDate(), updatedAt: newDate(), - updateId: newUpdateId(), + updateId: newUuidV7(), deviceOS: 'android', deviceType: 'mobile', token: 'abc123', @@ -201,7 +202,7 @@ const assetFactory = (asset: Partial = {}) => ({ createdAt: newDate(), updatedAt: newDate(), deletedAt: null, - updateId: newUpdateId(), + updateId: newUuidV7(), status: AssetStatus.ACTIVE, checksum: newSha1(), deviceAssetId: '', @@ -240,7 +241,7 @@ const activityFactory = (activity: Partial = {}) => { albumId: newUuid(), createdAt: newDate(), updatedAt: newDate(), - updateId: newUpdateId(), + updateId: newUuidV7(), ...activity, }; }; @@ -250,7 +251,7 @@ const apiKeyFactory = (apiKey: Partial = {}) => ({ userId: newUuid(), createdAt: newDate(), updatedAt: newDate(), - updateId: newUpdateId(), + updateId: newUuidV7(), name: 'Api Key', permissions: [Permission.ALL], ...apiKey, @@ -260,7 +261,7 @@ const libraryFactory = (library: Partial = {}) => ({ id: newUuid(), createdAt: newDate(), updatedAt: newDate(), - updateId: newUpdateId(), + updateId: newUuidV7(), deletedAt: null, refreshedAt: null, name: 'Library', @@ -275,7 +276,7 @@ const memoryFactory = (memory: Partial = {}) => ({ id: newUuid(), createdAt: newDate(), updatedAt: newDate(), - updateId: newUpdateId(), + updateId: newUuidV7(), deletedAt: null, ownerId: newUuid(), type: MemoryType.ON_THIS_DAY, diff --git a/server/test/utils.ts b/server/test/utils.ts index 5738ae88dc..0b3cd186c7 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -438,3 +438,7 @@ export async function* makeStream(items: T[] = []): AsyncIterableIterator yield item; } } + +export const wait = (ms: number) => { + return new Promise((resolve) => setTimeout(resolve, ms)); +};