diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index c21a9cade5..7c25fbb345 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -139,14 +139,18 @@ class SyncStreamService { return _syncStreamRepository.updateAlbumUsersV1(data.cast(), debugLabel: 'backfill'); case SyncEntityType.albumUserDeleteV1: return _syncStreamRepository.deleteAlbumUsersV1(data.cast()); - case SyncEntityType.albumAssetV1: - return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album'); + case SyncEntityType.albumAssetCreateV1: + return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset create'); + case SyncEntityType.albumAssetUpdateV1: + return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset update'); case SyncEntityType.albumAssetBackfillV1: - return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album backfill'); - case SyncEntityType.albumAssetExifV1: - return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album'); + return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset backfill'); + case SyncEntityType.albumAssetExifCreateV1: + return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif create'); + case SyncEntityType.albumAssetExifUpdateV1: + return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif update'); case SyncEntityType.albumAssetExifBackfillV1: - return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album backfill'); + return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif backfill'); case SyncEntityType.albumToAssetV1: return _syncStreamRepository.updateAlbumToAssetsV1(data.cast()); case SyncEntityType.albumToAssetBackfillV1: diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index 78b2a9d957..48c38cf64b 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -149,9 +149,11 @@ const _kResponseMap = { SyncEntityType.albumUserV1: SyncAlbumUserV1.fromJson, SyncEntityType.albumUserBackfillV1: SyncAlbumUserV1.fromJson, SyncEntityType.albumUserDeleteV1: SyncAlbumUserDeleteV1.fromJson, - SyncEntityType.albumAssetV1: SyncAssetV1.fromJson, + SyncEntityType.albumAssetCreateV1: SyncAssetV1.fromJson, + SyncEntityType.albumAssetUpdateV1: SyncAssetV1.fromJson, SyncEntityType.albumAssetBackfillV1: SyncAssetV1.fromJson, - SyncEntityType.albumAssetExifV1: SyncAssetExifV1.fromJson, + SyncEntityType.albumAssetExifCreateV1: SyncAssetExifV1.fromJson, + SyncEntityType.albumAssetExifUpdateV1: SyncAssetExifV1.fromJson, SyncEntityType.albumAssetExifBackfillV1: SyncAssetExifV1.fromJson, SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson, SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson, diff --git a/mobile/openapi/lib/model/sync_entity_type.dart b/mobile/openapi/lib/model/sync_entity_type.dart index 5368126923..f259fdc9d9 100644 --- a/mobile/openapi/lib/model/sync_entity_type.dart +++ b/mobile/openapi/lib/model/sync_entity_type.dart @@ -44,9 +44,11 @@ class SyncEntityType { static const albumUserV1 = SyncEntityType._(r'AlbumUserV1'); static const albumUserBackfillV1 = SyncEntityType._(r'AlbumUserBackfillV1'); static const albumUserDeleteV1 = SyncEntityType._(r'AlbumUserDeleteV1'); - static const albumAssetV1 = SyncEntityType._(r'AlbumAssetV1'); + static const albumAssetCreateV1 = SyncEntityType._(r'AlbumAssetCreateV1'); + static const albumAssetUpdateV1 = SyncEntityType._(r'AlbumAssetUpdateV1'); static const albumAssetBackfillV1 = SyncEntityType._(r'AlbumAssetBackfillV1'); - static const albumAssetExifV1 = SyncEntityType._(r'AlbumAssetExifV1'); + static const albumAssetExifCreateV1 = SyncEntityType._(r'AlbumAssetExifCreateV1'); + static const albumAssetExifUpdateV1 = SyncEntityType._(r'AlbumAssetExifUpdateV1'); static const albumAssetExifBackfillV1 = SyncEntityType._(r'AlbumAssetExifBackfillV1'); static const albumToAssetV1 = SyncEntityType._(r'AlbumToAssetV1'); static const albumToAssetDeleteV1 = SyncEntityType._(r'AlbumToAssetDeleteV1'); @@ -89,9 +91,11 @@ class SyncEntityType { albumUserV1, albumUserBackfillV1, albumUserDeleteV1, - albumAssetV1, + albumAssetCreateV1, + albumAssetUpdateV1, albumAssetBackfillV1, - albumAssetExifV1, + albumAssetExifCreateV1, + albumAssetExifUpdateV1, albumAssetExifBackfillV1, albumToAssetV1, albumToAssetDeleteV1, @@ -169,9 +173,11 @@ class SyncEntityTypeTypeTransformer { case r'AlbumUserV1': return SyncEntityType.albumUserV1; case r'AlbumUserBackfillV1': return SyncEntityType.albumUserBackfillV1; case r'AlbumUserDeleteV1': return SyncEntityType.albumUserDeleteV1; - case r'AlbumAssetV1': return SyncEntityType.albumAssetV1; + case r'AlbumAssetCreateV1': return SyncEntityType.albumAssetCreateV1; + case r'AlbumAssetUpdateV1': return SyncEntityType.albumAssetUpdateV1; case r'AlbumAssetBackfillV1': return SyncEntityType.albumAssetBackfillV1; - case r'AlbumAssetExifV1': return SyncEntityType.albumAssetExifV1; + case r'AlbumAssetExifCreateV1': return SyncEntityType.albumAssetExifCreateV1; + case r'AlbumAssetExifUpdateV1': return SyncEntityType.albumAssetExifUpdateV1; case r'AlbumAssetExifBackfillV1': return SyncEntityType.albumAssetExifBackfillV1; case r'AlbumToAssetV1': return SyncEntityType.albumToAssetV1; case r'AlbumToAssetDeleteV1': return SyncEntityType.albumToAssetDeleteV1; diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 7d3feb24a3..74a1407096 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -14944,9 +14944,11 @@ "AlbumUserV1", "AlbumUserBackfillV1", "AlbumUserDeleteV1", - "AlbumAssetV1", + "AlbumAssetCreateV1", + "AlbumAssetUpdateV1", "AlbumAssetBackfillV1", - "AlbumAssetExifV1", + "AlbumAssetExifCreateV1", + "AlbumAssetExifUpdateV1", "AlbumAssetExifBackfillV1", "AlbumToAssetV1", "AlbumToAssetDeleteV1", diff --git a/open-api/typescript-sdk/src/fetch-client.ts b/open-api/typescript-sdk/src/fetch-client.ts index 8b2ed427b4..a1a04ea566 100644 --- a/open-api/typescript-sdk/src/fetch-client.ts +++ b/open-api/typescript-sdk/src/fetch-client.ts @@ -4770,9 +4770,11 @@ export enum SyncEntityType { AlbumUserV1 = "AlbumUserV1", AlbumUserBackfillV1 = "AlbumUserBackfillV1", AlbumUserDeleteV1 = "AlbumUserDeleteV1", - AlbumAssetV1 = "AlbumAssetV1", + AlbumAssetCreateV1 = "AlbumAssetCreateV1", + AlbumAssetUpdateV1 = "AlbumAssetUpdateV1", AlbumAssetBackfillV1 = "AlbumAssetBackfillV1", - AlbumAssetExifV1 = "AlbumAssetExifV1", + AlbumAssetExifCreateV1 = "AlbumAssetExifCreateV1", + AlbumAssetExifUpdateV1 = "AlbumAssetExifUpdateV1", AlbumAssetExifBackfillV1 = "AlbumAssetExifBackfillV1", AlbumToAssetV1 = "AlbumToAssetV1", AlbumToAssetDeleteV1 = "AlbumToAssetDeleteV1", diff --git a/server/package-lock.json b/server/package-lock.json index 6d418288a6..2fff7747f9 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -85,6 +85,7 @@ "thumbhash": "^0.1.1", "typeorm": "^0.3.17", "ua-parser-js": "^2.0.0", + "uuid": "^11.1.0", "validator": "^13.12.0" }, "devDependencies": { @@ -8504,6 +8505,19 @@ "uuid": "^9.0.0" } }, + "node_modules/bullmq/node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/busboy": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/busboy/-/busboy-1.6.0.tgz", @@ -11261,6 +11275,19 @@ "node": ">=14" } }, + "node_modules/gaxios/node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/gcp-metadata": { "version": "6.1.1", "resolved": "https://registry.npmjs.org/gcp-metadata/-/gcp-metadata-6.1.1.tgz", @@ -17767,19 +17794,6 @@ "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/typeorm/node_modules/uuid": { - "version": "11.1.0", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz", - "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", - "funding": [ - "https://github.com/sponsors/broofa", - "https://github.com/sponsors/ctavan" - ], - "license": "MIT", - "bin": { - "uuid": "dist/esm/bin/uuid" - } - }, "node_modules/typescript": { "version": "5.8.3", "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.8.3.tgz", @@ -18084,16 +18098,16 @@ "license": "MIT" }, "node_modules/uuid": { - "version": "9.0.1", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", - "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "version": "11.1.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz", + "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", "funding": [ "https://github.com/sponsors/broofa", "https://github.com/sponsors/ctavan" ], "license": "MIT", "bin": { - "uuid": "dist/bin/uuid" + "uuid": "dist/esm/bin/uuid" } }, "node_modules/validator": { diff --git a/server/package.json b/server/package.json index 60f224f950..100dd33d47 100644 --- a/server/package.json +++ b/server/package.json @@ -110,6 +110,7 @@ "thumbhash": "^0.1.1", "typeorm": "^0.3.17", "ua-parser-js": "^2.0.0", + "uuid": "^11.1.0", "validator": "^13.12.0" }, "devDependencies": { diff --git a/server/src/dtos/sync.dto.ts b/server/src/dtos/sync.dto.ts index 9c304c0d3c..9ac85755ab 100644 --- a/server/src/dtos/sync.dto.ts +++ b/server/src/dtos/sync.dto.ts @@ -339,9 +339,11 @@ export type SyncItem = { [SyncEntityType.AlbumUserV1]: SyncAlbumUserV1; [SyncEntityType.AlbumUserBackfillV1]: SyncAlbumUserV1; [SyncEntityType.AlbumUserDeleteV1]: SyncAlbumUserDeleteV1; - [SyncEntityType.AlbumAssetV1]: SyncAssetV1; + [SyncEntityType.AlbumAssetCreateV1]: SyncAssetV1; + [SyncEntityType.AlbumAssetUpdateV1]: SyncAssetV1; [SyncEntityType.AlbumAssetBackfillV1]: SyncAssetV1; - [SyncEntityType.AlbumAssetExifV1]: SyncAssetExifV1; + [SyncEntityType.AlbumAssetExifCreateV1]: SyncAssetExifV1; + [SyncEntityType.AlbumAssetExifUpdateV1]: SyncAssetExifV1; [SyncEntityType.AlbumAssetExifBackfillV1]: SyncAssetExifV1; [SyncEntityType.AlbumToAssetV1]: SyncAlbumToAssetV1; [SyncEntityType.AlbumToAssetBackfillV1]: SyncAlbumToAssetV1; diff --git a/server/src/enum.ts b/server/src/enum.ts index 8a6d361d35..8c7ee85a32 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -668,9 +668,11 @@ export enum SyncEntityType { AlbumUserBackfillV1 = 'AlbumUserBackfillV1', AlbumUserDeleteV1 = 'AlbumUserDeleteV1', - AlbumAssetV1 = 'AlbumAssetV1', + AlbumAssetCreateV1 = 'AlbumAssetCreateV1', + AlbumAssetUpdateV1 = 'AlbumAssetUpdateV1', AlbumAssetBackfillV1 = 'AlbumAssetBackfillV1', - AlbumAssetExifV1 = 'AlbumAssetExifV1', + AlbumAssetExifCreateV1 = 'AlbumAssetExifCreateV1', + AlbumAssetExifUpdateV1 = 'AlbumAssetExifUpdateV1', AlbumAssetExifBackfillV1 = 'AlbumAssetExifBackfillV1', AlbumToAssetV1 = 'AlbumToAssetV1', diff --git a/server/src/queries/sync.checkpoint.repository.sql b/server/src/queries/sync.checkpoint.repository.sql index 018054ff79..e99d90cd54 100644 --- a/server/src/queries/sync.checkpoint.repository.sql +++ b/server/src/queries/sync.checkpoint.repository.sql @@ -13,3 +13,7 @@ where delete from "session_sync_checkpoint" where "sessionId" = $1 + +-- SyncCheckpointRepository.getNow +select + immich_uuid_v7 (now() - interval '1 millisecond') as "nowId" diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index 28c6f32acc..0bd5b9f47d 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -9,7 +9,7 @@ from where "usersId" = $1 and "createId" >= $2 - and "createdAt" < now() - interval '1 millisecond' + and "createId" < $3 order by "createId" asc @@ -21,7 +21,7 @@ from "album_audit" where "userId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -41,10 +41,10 @@ from "album" left join "album_user" as "album_users" on "album"."id" = "album_users"."albumsId" where - "album"."updatedAt" < now() - interval '1 millisecond' + "album"."updateId" < $1 and ( - "album"."ownerId" = $1 - or "album_users"."usersId" = $2 + "album"."ownerId" = $2 + or "album_users"."usersId" = $3 ) order by "album"."updateId" asc @@ -67,20 +67,21 @@ select "asset"."livePhotoVideoId", "asset"."stackId", "asset"."libraryId", - "asset"."updateId" + "album_asset"."updateId" from - "asset" - inner join "album_asset" on "album_asset"."assetsId" = "asset"."id" + "album_asset" + inner join "asset" on "asset"."id" = "album_asset"."assetsId" where "album_asset"."albumsId" = $1 - and "asset"."updatedAt" < now() - interval '1 millisecond' - and "asset"."updateId" <= $2 - and "asset"."updateId" >= $3 + and "album_asset"."updateId" < $2 + and "album_asset"."updateId" <= $3 + and "album_asset"."updateId" >= $4 order by - "asset"."updateId" asc + "album_asset"."updateId" asc --- SyncRepository.albumAsset.getUpserts +-- SyncRepository.albumAsset.getCreates select + "album_asset"."updateId", "asset"."id", "asset"."ownerId", "asset"."originalFileName", @@ -96,21 +97,20 @@ select "asset"."duration", "asset"."livePhotoVideoId", "asset"."stackId", - "asset"."libraryId", - "asset"."updateId" + "asset"."libraryId" from - "asset" - inner join "album_asset" on "album_asset"."assetsId" = "asset"."id" + "album_asset" + inner join "asset" on "asset"."id" = "album_asset"."assetsId" inner join "album" on "album"."id" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" where - "asset"."updatedAt" < now() - interval '1 millisecond' + "album_asset"."updateId" < $1 and ( - "album"."ownerId" = $1 - or "album_user"."usersId" = $2 + "album"."ownerId" = $2 + or "album_user"."usersId" = $3 ) order by - "asset"."updateId" asc + "album_asset"."updateId" asc -- SyncRepository.albumAssetExif.getBackfill select @@ -139,20 +139,21 @@ select "asset_exif"."profileDescription", "asset_exif"."rating", "asset_exif"."fps", - "asset_exif"."updateId" + "album_asset"."updateId" from - "asset_exif" - inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId" + "album_asset" + inner join "asset_exif" on "asset_exif"."assetId" = "album_asset"."assetsId" where "album_asset"."albumsId" = $1 - and "asset_exif"."updatedAt" < now() - interval '1 millisecond' - and "asset_exif"."updateId" <= $2 - and "asset_exif"."updateId" >= $3 + and "album_asset"."updateId" < $2 + and "album_asset"."updateId" <= $3 + and "album_asset"."updateId" >= $4 order by - "asset_exif"."updateId" asc + "album_asset"."updateId" asc --- SyncRepository.albumAssetExif.getUpserts +-- SyncRepository.albumAssetExif.getCreates select + "album_asset"."updateId", "asset_exif"."assetId", "asset_exif"."description", "asset_exif"."exifImageWidth", @@ -177,21 +178,20 @@ select "asset_exif"."exposureTime", "asset_exif"."profileDescription", "asset_exif"."rating", - "asset_exif"."fps", - "asset_exif"."updateId" + "asset_exif"."fps" from - "asset_exif" - inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId" + "album_asset" + inner join "asset_exif" on "asset_exif"."assetId" = "album_asset"."assetsId" inner join "album" on "album"."id" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" where - "asset_exif"."updatedAt" < now() - interval '1 millisecond' + "album_asset"."updateId" < $1 and ( - "album"."ownerId" = $1 - or "album_user"."usersId" = $2 + "album"."ownerId" = $2 + or "album_user"."usersId" = $3 ) order by - "asset_exif"."updateId" asc + "album_asset"."updateId" asc -- SyncRepository.albumToAsset.getBackfill select @@ -202,9 +202,9 @@ from "album_asset" as "album_assets" where "album_assets"."albumsId" = $1 - and "album_assets"."updatedAt" < now() - interval '1 millisecond' - and "album_assets"."updateId" <= $2 - and "album_assets"."updateId" >= $3 + and "album_assets"."updateId" < $2 + and "album_assets"."updateId" <= $3 + and "album_assets"."updateId" >= $4 order by "album_assets"."updateId" asc @@ -233,7 +233,7 @@ where "album_user"."usersId" = $2 ) ) - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $3 order by "id" asc @@ -247,10 +247,10 @@ from inner join "album" on "album"."id" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" where - "album_asset"."updatedAt" < now() - interval '1 millisecond' + "album_asset"."updateId" < $1 and ( - "album"."ownerId" = $1 - or "album_user"."usersId" = $2 + "album"."ownerId" = $2 + or "album_user"."usersId" = $3 ) order by "album_asset"."updateId" asc @@ -265,9 +265,9 @@ from "album_user" where "albumsId" = $1 - and "updatedAt" < now() - interval '1 millisecond' - and "updateId" <= $2 - and "updateId" >= $3 + and "updateId" < $2 + and "updateId" <= $3 + and "updateId" >= $4 order by "updateId" asc @@ -296,7 +296,7 @@ where "album_user"."usersId" = $2 ) ) - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $3 order by "id" asc @@ -309,14 +309,14 @@ select from "album_user" where - "album_user"."updatedAt" < now() - interval '1 millisecond' + "album_user"."updateId" < $1 and "album_user"."albumsId" in ( select "id" from "album" where - "ownerId" = $1 + "ownerId" = $2 union ( select @@ -324,7 +324,7 @@ where from "album_user" as "albumUsers" where - "albumUsers"."usersId" = $2 + "albumUsers"."usersId" = $3 ) ) order by @@ -338,7 +338,7 @@ from "asset_audit" where "ownerId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -365,7 +365,7 @@ from "asset" where "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -408,7 +408,7 @@ where where "ownerId" = $1 ) - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -421,7 +421,7 @@ from left join "asset" on "asset"."id" = "asset_face_audit"."assetId" where "asset"."ownerId" = $1 - and "asset_face_audit"."deletedAt" < now() - interval '1 millisecond' + and "asset_face_audit"."id" < $2 order by "asset_face_audit"."id" asc @@ -442,34 +442,11 @@ from "asset_face" left join "asset" on "asset"."id" = "asset_face"."assetId" where - "asset_face"."updatedAt" < now() - interval '1 millisecond' - and "asset"."ownerId" = $1 + "asset_face"."updateId" < $1 + and "asset"."ownerId" = $2 order by "asset_face"."updateId" asc --- SyncRepository.authUser.getUpserts -select - "id", - "name", - "email", - "avatarColor", - "deletedAt", - "updateId", - "profileImagePath", - "profileChangedAt", - "isAdmin", - "pinCode", - "oauthId", - "storageLabel", - "quotaSizeInBytes", - "quotaUsageInBytes" -from - "user" -where - "updatedAt" < now() - interval '1 millisecond' -order by - "updateId" asc - -- SyncRepository.memory.getDeletes select "id", @@ -478,7 +455,7 @@ from "memory_audit" where "userId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -501,7 +478,7 @@ from "memory" where "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -521,7 +498,7 @@ where where "ownerId" = $1 ) - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -541,7 +518,7 @@ where where "ownerId" = $1 ) - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -553,8 +530,7 @@ from "partner" where "sharedWithId" = $1 - and "createId" >= $2 - and "createdAt" < now() - interval '1 millisecond' + and "createId" < $2 order by "partner"."createId" asc @@ -570,7 +546,7 @@ where "sharedById" = $1 or "sharedWithId" = $2 ) - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $3 order by "id" asc @@ -587,7 +563,7 @@ where "sharedById" = $1 or "sharedWithId" = $2 ) - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $3 order by "updateId" asc @@ -614,9 +590,9 @@ from "asset" where "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' - and "updateId" <= $2 - and "updateId" >= $3 + and "updateId" < $2 + and "updateId" <= $3 + and "updateId" >= $4 order by "updateId" asc @@ -635,7 +611,7 @@ where where "sharedWithId" = $1 ) - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -669,7 +645,7 @@ where where "sharedWithId" = $1 ) - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -706,9 +682,9 @@ from inner join "asset" on "asset"."id" = "asset_exif"."assetId" where "asset"."ownerId" = $1 - and "asset_exif"."updatedAt" < now() - interval '1 millisecond' - and "asset_exif"."updateId" <= $2 - and "asset_exif"."updateId" >= $3 + and "asset_exif"."updateId" < $2 + and "asset_exif"."updateId" <= $3 + and "asset_exif"."updateId" >= $4 order by "asset_exif"."updateId" asc @@ -758,7 +734,7 @@ where "sharedWithId" = $1 ) ) - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -777,7 +753,7 @@ where where "sharedWithId" = $1 ) - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -793,9 +769,9 @@ from "stack" where "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' - and "updateId" <= $2 - and "updateId" >= $3 + and "updateId" < $2 + and "updateId" <= $3 + and "updateId" >= $4 order by "updateId" asc @@ -818,7 +794,7 @@ where where "sharedWithId" = $1 ) - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -830,7 +806,7 @@ from "person_audit" where "ownerId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -851,7 +827,7 @@ from "person" where "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -863,7 +839,7 @@ from "stack_audit" where "userId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -879,7 +855,7 @@ from "stack" where "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc @@ -890,7 +866,7 @@ select from "user_audit" where - "deletedAt" < now() - interval '1 millisecond' + "id" < $1 order by "id" asc @@ -907,7 +883,7 @@ select from "user" where - "updatedAt" < now() - interval '1 millisecond' + "updateId" < $1 order by "updateId" asc @@ -920,7 +896,7 @@ from "user_metadata_audit" where "userId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + and "id" < $2 order by "id" asc @@ -934,6 +910,6 @@ from "user_metadata" where "userId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + and "updateId" < $2 order by "updateId" asc diff --git a/server/src/repositories/sync-checkpoint.repository.ts b/server/src/repositories/sync-checkpoint.repository.ts index 65fd018136..9db56e1bfe 100644 --- a/server/src/repositories/sync-checkpoint.repository.ts +++ b/server/src/repositories/sync-checkpoint.repository.ts @@ -1,5 +1,5 @@ import { Injectable } from '@nestjs/common'; -import { Insertable, Kysely } from 'kysely'; +import { Insertable, Kysely, sql } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; import { DummyValue, GenerateSql } from 'src/decorators'; import { SyncEntityType } from 'src/enum'; @@ -39,4 +39,13 @@ export class SyncCheckpointRepository { .$if(!!types, (qb) => qb.where('type', 'in', types!)) .execute(); } + + @GenerateSql() + getNow() { + return this.db + .selectNoFrom((eb) => [ + eb.fn('immich_uuid_v7', [sql.raw("now() - interval '1 millisecond'")]).as('nowId'), + ]) + .executeTakeFirstOrThrow(); + } } diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index d72ddcfc4d..44191ccabd 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -1,5 +1,5 @@ import { Injectable } from '@nestjs/common'; -import { Kysely, SelectQueryBuilder, sql } from 'kysely'; +import { Kysely, SelectQueryBuilder } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; import { columns } from 'src/database'; import { DummyValue, GenerateSql } from 'src/decorators'; @@ -33,6 +33,11 @@ type UpsertTables = | 'user_metadata' | 'asset_face'; +export type SyncQueryOptions = { + nowId: string; + userId: string; +}; + @Injectable() export class SyncRepository { album: AlbumSync; @@ -81,21 +86,21 @@ export class SyncRepository { class BaseSync { constructor(protected db: Kysely) {} - protected auditTableFilters(ack?: SyncAck) { + protected auditTableFilters(nowId: string, ack?: SyncAck) { return , D>(qb: SelectQueryBuilder) => { const builder = qb as SelectQueryBuilder; return builder - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('id', '<', nowId) .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .orderBy('id', 'asc') as SelectQueryBuilder; }; } - protected upsertTableFilters(ack?: SyncAck) { + protected upsertTableFilters(nowId: string, ack?: SyncAck) { return , D>(qb: SelectQueryBuilder) => { const builder = qb as SelectQueryBuilder; return builder - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<', nowId) .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) .orderBy('updateId', 'asc') as SelectQueryBuilder; }; @@ -103,34 +108,34 @@ class BaseSync { } class AlbumSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) - getCreatedAfter(userId: string, afterCreateId?: string) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID] }) + getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) { return this.db .selectFrom('album_user') .select(['albumsId as id', 'createId']) .where('usersId', '=', userId) .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) - .where('createdAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('createId', '<', nowId) .orderBy('createId', 'asc') .execute(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('album_audit') .select(['id', 'albumId']) .where('userId', '=', userId) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('album') .distinctOn(['album.id', 'album.updateId']) - .where('album.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('album.updateId', '<', nowId) .$if(!!ack, (qb) => qb.where('album.updateId', '>', ack!.updateId)) .orderBy('album.updateId', 'asc') .leftJoin('album_user as album_users', 'album.id', 'album_users.albumsId') @@ -152,29 +157,33 @@ class AlbumSync extends BaseSync { } class AlbumAssetSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { return this.db - .selectFrom('asset') - .innerJoin('album_asset', 'album_asset.assetsId', 'asset.id') + .selectFrom('album_asset') + .innerJoin('asset', 'asset.id', 'album_asset.assetsId') .select(columns.syncAsset) - .select('asset.updateId') + .select('album_asset.updateId') .where('album_asset.albumsId', '=', albumId) - .where('asset.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('asset.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('asset.updateId', '>=', afterUpdateId!)) - .orderBy('asset.updateId', 'asc') + .where('album_asset.updateId', '<', nowId) + .where('album_asset.updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!)) + .orderBy('album_asset.updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) { return this.db .selectFrom('asset') .innerJoin('album_asset', 'album_asset.assetsId', 'asset.id') .select(columns.syncAsset) .select('asset.updateId') - .where('asset.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('asset.updateId', '<', nowId) + .where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send updates for assets that the client already knows about .$if(!!ack, (qb) => qb.where('asset.updateId', '>', ack!.updateId)) .orderBy('asset.updateId', 'asc') .innerJoin('album', 'album.id', 'album_asset.albumsId') @@ -182,32 +191,52 @@ class AlbumAssetSync extends BaseSync { .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) .stream(); } + + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { + return this.db + .selectFrom('album_asset') + .select('album_asset.updateId') + .innerJoin('asset', 'asset.id', 'album_asset.assetsId') + .select(columns.syncAsset) + .innerJoin('album', 'album.id', 'album_asset.albumsId') + .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') + .where('album_asset.updateId', '<', nowId) + .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) + .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) + .orderBy('album_asset.updateId', 'asc') + .stream(); + } } class AlbumAssetExifSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { return this.db - .selectFrom('asset_exif') - .innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId') + .selectFrom('album_asset') + .innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId') .select(columns.syncAssetExif) - .select('asset_exif.updateId') + .select('album_asset.updateId') .where('album_asset.albumsId', '=', albumId) - .where('asset_exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('asset_exif.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!)) - .orderBy('asset_exif.updateId', 'asc') + .where('album_asset.updateId', '<', nowId) + .where('album_asset.updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!)) + .orderBy('album_asset.updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) { return this.db .selectFrom('asset_exif') .innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId') .select(columns.syncAssetExif) .select('asset_exif.updateId') - .where('asset_exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send exif updates for assets that the client already knows about + .where('asset_exif.updateId', '<', nowId) .$if(!!ack, (qb) => qb.where('asset_exif.updateId', '>', ack!.updateId)) .orderBy('asset_exif.updateId', 'asc') .innerJoin('album', 'album.id', 'album_asset.albumsId') @@ -215,24 +244,43 @@ class AlbumAssetExifSync extends BaseSync { .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) .stream(); } + + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { + return this.db + .selectFrom('album_asset') + .select('album_asset.updateId') + .innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId') + .select(columns.syncAssetExif) + .innerJoin('album', 'album.id', 'album_asset.albumsId') + .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') + .where('album_asset.updateId', '<', nowId) + .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) + .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) + .orderBy('album_asset.updateId', 'asc') + .stream(); + } } class AlbumToAssetSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { return this.db .selectFrom('album_asset as album_assets') .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) .where('album_assets.albumsId', '=', albumId) - .where('album_assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('album_assets.updateId', '<', nowId) .where('album_assets.updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!)) .orderBy('album_assets.updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('album_asset_audit') .select(['id', 'assetId', 'albumId']) @@ -254,16 +302,16 @@ class AlbumToAssetSync extends BaseSync { ), ), ) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('album_asset') .select(['album_asset.assetsId as assetId', 'album_asset.albumsId as albumId', 'album_asset.updateId']) - .where('album_asset.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('album_asset.updateId', '<', nowId) .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) .orderBy('album_asset.updateId', 'asc') .innerJoin('album', 'album.id', 'album_asset.albumsId') @@ -274,22 +322,25 @@ class AlbumToAssetSync extends BaseSync { } class AlbumUserSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { return this.db .selectFrom('album_user') .select(columns.syncAlbumUser) .select('album_user.updateId') .where('albumsId', '=', albumId) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<', nowId) .where('updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .orderBy('updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('album_user_audit') .select(['id', 'userId', 'albumId']) @@ -311,17 +362,17 @@ class AlbumUserSync extends BaseSync { ), ), ) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('album_user') .select(columns.syncAlbumUser) .select('album_user.updateId') - .where('album_user.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('album_user.updateId', '<', nowId) .$if(!!ack, (qb) => qb.where('album_user.updateId', '>', ack!.updateId)) .orderBy('album_user.updateId', 'asc') .where((eb) => @@ -347,53 +398,53 @@ class AlbumUserSync extends BaseSync { } class AssetSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset_audit') .select(['id', 'assetId']) .where('ownerId', '=', userId) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset') .select(columns.syncAsset) .select('asset.updateId') .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class AuthUserSync extends BaseSync { @GenerateSql({ params: [], stream: true }) - getUpserts(ack?: SyncAck) { + getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('user') .select(columns.syncUser) .select(['isAdmin', 'pinCode', 'oauthId', 'storageLabel', 'quotaSizeInBytes', 'quotaUsageInBytes']) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PersonSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('person_audit') .select(['id', 'personId']) .where('ownerId', '=', userId) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('person') .select([ @@ -410,27 +461,27 @@ class PersonSync extends BaseSync { 'faceAssetId', ]) .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class AssetFaceSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset_face_audit') .select(['asset_face_audit.id', 'assetFaceId']) .orderBy('asset_face_audit.id', 'asc') .leftJoin('asset', 'asset.id', 'asset_face_audit.assetId') .where('asset.ownerId', '=', userId) - .where('asset_face_audit.deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('asset_face_audit.id', '<', nowId) .$if(!!ack, (qb) => qb.where('asset_face_audit.id', '>', ack!.updateId)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset_face') .select([ @@ -446,7 +497,7 @@ class AssetFaceSync extends BaseSync { 'sourceType', 'asset_face.updateId', ]) - .where('asset_face.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('asset_face.updateId', '<', nowId) .$if(!!ack, (qb) => qb.where('asset_face.updateId', '>', ack!.updateId)) .orderBy('asset_face.updateId', 'asc') .leftJoin('asset', 'asset.id', 'asset_face.assetId') @@ -456,31 +507,31 @@ class AssetFaceSync extends BaseSync { } class AssetExifSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset_exif') .select(columns.syncAssetExif) .select('asset_exif.updateId') .where('assetId', 'in', (eb) => eb.selectFrom('asset').select('id').where('ownerId', '=', userId)) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class MemorySync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('memory_audit') .select(['id', 'memoryId']) .where('userId', '=', userId) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('memory') .select([ @@ -499,97 +550,105 @@ class MemorySync extends BaseSync { ]) .select('updateId') .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class MemoryToAssetSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('memory_asset_audit') .select(['id', 'memoryId', 'assetId']) .where('memoryId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId)) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('memory_asset') .select(['memoriesId as memoryId', 'assetsId as assetId']) .select('updateId') .where('memoriesId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId)) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PartnerSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) - getCreatedAfter(userId: string, afterCreateId?: string) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }] }) + getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) { return this.db .selectFrom('partner') .select(['sharedById', 'createId']) .where('sharedWithId', '=', userId) .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) - .where('createdAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('createId', '<', nowId) .orderBy('partner.createId', 'asc') .execute(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('partner_audit') .select(['id', 'sharedById', 'sharedWithId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('partner') .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PartnerAssetsSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill( + { nowId }: SyncQueryOptions, + partnerId: string, + afterUpdateId: string | undefined, + beforeUpdateId: string, + ) { return this.db .selectFrom('asset') .select(columns.syncAsset) .select('asset.updateId') .where('ownerId', '=', partnerId) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<', nowId) .where('updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .orderBy('updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset_audit') .select(['id', 'assetId']) .where('ownerId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset') .select(columns.syncAsset) @@ -597,29 +656,37 @@ class PartnerAssetsSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PartnerAssetExifsSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill( + { nowId }: SyncQueryOptions, + partnerId: string, + afterUpdateId: string | undefined, + beforeUpdateId: string, + ) { return this.db .selectFrom('asset_exif') .select(columns.syncAssetExif) .select('asset_exif.updateId') .innerJoin('asset', 'asset.id', 'asset_exif.assetId') .where('asset.ownerId', '=', partnerId) - .where('asset_exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('asset_exif.updateId', '<', nowId) .where('asset_exif.updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!)) .orderBy('asset_exif.updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('asset_exif') .select(columns.syncAssetExif) @@ -632,61 +699,69 @@ class PartnerAssetExifsSync extends BaseSync { eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), ), ) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class StackSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('stack_audit') .select(['id', 'stackId']) .where('userId', '=', userId) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('stack') .select(columns.syncStack) .select('updateId') .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PartnerStackSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('stack_audit') .select(['id', 'stackId']) .where('userId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId)) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + @GenerateSql({ + params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], + stream: true, + }) + getBackfill( + { nowId }: SyncQueryOptions, + partnerId: string, + afterUpdateId: string | undefined, + beforeUpdateId: string, + ) { return this.db .selectFrom('stack') .select(columns.syncStack) .select('updateId') .where('ownerId', '=', partnerId) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<', nowId) .where('updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .orderBy('updateId', 'asc') .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('stack') .select(columns.syncStack) @@ -694,41 +769,41 @@ class PartnerStackSync extends BaseSync { .where('ownerId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class UserSync extends BaseSync { - @GenerateSql({ params: [], stream: true }) - getDeletes(ack?: SyncAck) { - return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(ack)).stream(); + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId }: SyncQueryOptions, ack?: SyncAck) { + return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(nowId, ack)).stream(); } - @GenerateSql({ params: [], stream: true }) - getUpserts(ack?: SyncAck) { - return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(ack)).stream(); + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) { + return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(nowId, ack)).stream(); } } class UserMetadataSync extends BaseSync { - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getDeletes(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('user_metadata_audit') .select(['id', 'userId', 'key']) .where('userId', '=', userId) - .$call(this.auditTableFilters(ack)) + .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getUpserts(userId: string, ack?: SyncAck) { + @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) + getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { return this.db .selectFrom('user_metadata') .select(['userId', 'key', 'value', 'updateId']) .where('userId', '=', userId) - .$call(this.upsertTableFilters(ack)) + .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } diff --git a/server/src/schema/migrations/1754389095885-ResetAlbumAssetSync.ts b/server/src/schema/migrations/1754389095885-ResetAlbumAssetSync.ts new file mode 100644 index 0000000000..34bdfbd4ab --- /dev/null +++ b/server/src/schema/migrations/1754389095885-ResetAlbumAssetSync.ts @@ -0,0 +1,7 @@ +import { Kysely, sql } from 'kysely'; + +export async function up(db: Kysely): Promise { + await sql`DELETE FROM session_sync_checkpoint WHERE type IN ('AlbumAssetBackfillV1', 'AlbumAssetExifV1', 'AlbumAssetV1')`.execute(db); +} + +export async function down(): Promise {} diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index fee77f35ba..c7d67e7dd0 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -16,6 +16,7 @@ import { SyncStreamDto, } from 'src/dtos/sync.dto'; import { AssetVisibility, DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum'; +import { SyncQueryOptions } from 'src/repositories/sync.repository'; import { SessionSyncCheckpointTable } from 'src/schema/tables/sync-checkpoint.table'; import { BaseService } from 'src/services/base.service'; import { SyncAck } from 'src/types'; @@ -137,30 +138,34 @@ export class SyncService extends BaseService { return; } + const { nowId } = await this.syncCheckpointRepository.getNow(); + const options: SyncQueryOptions = { nowId, userId: auth.user.id }; + const checkpoints = await this.syncCheckpointRepository.getAll(session.id); const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)])); const handlers: Record Promise> = { - [SyncRequestType.AuthUsersV1]: () => this.syncAuthUsersV1(response, checkpointMap), - [SyncRequestType.UsersV1]: () => this.syncUsersV1(response, checkpointMap), - [SyncRequestType.PartnersV1]: () => this.syncPartnersV1(response, checkpointMap, auth), - [SyncRequestType.AssetsV1]: () => this.syncAssetsV1(response, checkpointMap, auth), - [SyncRequestType.AssetExifsV1]: () => this.syncAssetExifsV1(response, checkpointMap, auth), - [SyncRequestType.PartnerAssetsV1]: () => this.syncPartnerAssetsV1(response, checkpointMap, auth, session.id), + [SyncRequestType.AuthUsersV1]: () => this.syncAuthUsersV1(options, response, checkpointMap), + [SyncRequestType.UsersV1]: () => this.syncUsersV1(options, response, checkpointMap), + [SyncRequestType.PartnersV1]: () => this.syncPartnersV1(options, response, checkpointMap), + [SyncRequestType.AssetsV1]: () => this.syncAssetsV1(options, response, checkpointMap), + [SyncRequestType.AssetExifsV1]: () => this.syncAssetExifsV1(options, response, checkpointMap), + [SyncRequestType.PartnerAssetsV1]: () => this.syncPartnerAssetsV1(options, response, checkpointMap, session.id), [SyncRequestType.PartnerAssetExifsV1]: () => - this.syncPartnerAssetExifsV1(response, checkpointMap, auth, session.id), - [SyncRequestType.AlbumsV1]: () => this.syncAlbumsV1(response, checkpointMap, auth), - [SyncRequestType.AlbumUsersV1]: () => this.syncAlbumUsersV1(response, checkpointMap, auth, session.id), - [SyncRequestType.AlbumAssetsV1]: () => this.syncAlbumAssetsV1(response, checkpointMap, auth, session.id), - [SyncRequestType.AlbumToAssetsV1]: () => this.syncAlbumToAssetsV1(response, checkpointMap, auth, session.id), - [SyncRequestType.AlbumAssetExifsV1]: () => this.syncAlbumAssetExifsV1(response, checkpointMap, auth, session.id), - [SyncRequestType.MemoriesV1]: () => this.syncMemoriesV1(response, checkpointMap, auth), - [SyncRequestType.MemoryToAssetsV1]: () => this.syncMemoryAssetsV1(response, checkpointMap, auth), - [SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth), - [SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, session.id), - [SyncRequestType.PeopleV1]: () => this.syncPeopleV1(response, checkpointMap, auth), - [SyncRequestType.AssetFacesV1]: async () => this.syncAssetFacesV1(response, checkpointMap, auth), - [SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(response, checkpointMap, auth), + this.syncPartnerAssetExifsV1(options, response, checkpointMap, session.id), + [SyncRequestType.AlbumsV1]: () => this.syncAlbumsV1(options, response, checkpointMap), + [SyncRequestType.AlbumUsersV1]: () => this.syncAlbumUsersV1(options, response, checkpointMap, session.id), + [SyncRequestType.AlbumAssetsV1]: () => this.syncAlbumAssetsV1(options, response, checkpointMap, session.id), + [SyncRequestType.AlbumToAssetsV1]: () => this.syncAlbumToAssetsV1(options, response, checkpointMap, session.id), + [SyncRequestType.AlbumAssetExifsV1]: () => + this.syncAlbumAssetExifsV1(options, response, checkpointMap, session.id), + [SyncRequestType.MemoriesV1]: () => this.syncMemoriesV1(options, response, checkpointMap), + [SyncRequestType.MemoryToAssetsV1]: () => this.syncMemoryAssetsV1(options, response, checkpointMap), + [SyncRequestType.StacksV1]: () => this.syncStackV1(options, response, checkpointMap), + [SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(options, response, checkpointMap, session.id), + [SyncRequestType.PeopleV1]: () => this.syncPeopleV1(options, response, checkpointMap), + [SyncRequestType.AssetFacesV1]: async () => this.syncAssetFacesV1(options, response, checkpointMap), + [SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(options, response, checkpointMap), }; for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { @@ -171,71 +176,71 @@ export class SyncService extends BaseService { response.end(); } - private async syncAuthUsersV1(response: Writable, checkpointMap: CheckpointMap) { + private async syncAuthUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const upsertType = SyncEntityType.AuthUserV1; - const upserts = this.syncRepository.authUser.getUpserts(checkpointMap[upsertType]); + const upserts = this.syncRepository.authUser.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, profileImagePath, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } }); } } - private async syncUsersV1(response: Writable, checkpointMap: CheckpointMap) { + private async syncUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.UserDeleteV1; - const deletes = this.syncRepository.user.getDeletes(checkpointMap[deleteType]); + const deletes = this.syncRepository.user.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.UserV1; - const upserts = this.syncRepository.user.getUpserts(checkpointMap[upsertType]); + const upserts = this.syncRepository.user.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, profileImagePath, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } }); } } - private async syncPartnersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncPartnersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.PartnerDeleteV1; - const deletes = this.syncRepository.partner.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.partner.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.PartnerV1; - const upserts = this.syncRepository.partner.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partner.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.AssetDeleteV1; - const deletes = this.syncRepository.asset.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.asset.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AssetV1; - const upserts = this.syncRepository.asset.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.asset.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } } private async syncPartnerAssetsV1( + options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap, - auth: AuthDto, sessionId: string, ) { const deleteType = SyncEntityType.PartnerAssetDeleteV1; - const deletes = this.syncRepository.partnerAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.partnerAsset.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.PartnerAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.PartnerAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -248,7 +253,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.partnerAsset.getBackfill(partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerAsset.getBackfill(options, partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { @@ -268,29 +273,29 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.partnerAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerAsset.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } } - private async syncAssetExifsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncAssetExifsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const upsertType = SyncEntityType.AssetExifV1; - const upserts = this.syncRepository.assetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.assetExif.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } private async syncPartnerAssetExifsV1( + options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap, - auth: AuthDto, sessionId: string, ) { const backfillType = SyncEntityType.PartnerAssetExifBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.PartnerAssetExifV1; const upsertCheckpoint = checkpointMap[upsertType]; @@ -304,7 +309,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.partnerAssetExif.getBackfill(partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerAssetExif.getBackfill(options, partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [partner.createId, updateId], data }); @@ -320,36 +325,41 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.partnerAssetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerAssetExif.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncAlbumsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncAlbumsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.AlbumDeleteV1; - const deletes = this.syncRepository.album.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.album.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AlbumV1; - const upserts = this.syncRepository.album.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.album.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncAlbumUsersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { + private async syncAlbumUsersV1( + options: SyncQueryOptions, + response: Writable, + checkpointMap: CheckpointMap, + sessionId: string, + ) { const deleteType = SyncEntityType.AlbumUserDeleteV1; - const deletes = this.syncRepository.albumUser.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.albumUser.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.AlbumUserBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.AlbumUserV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -362,7 +372,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumUser.getBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumUser.getBackfill(options, album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -378,20 +388,27 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.albumUser.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumUser.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncAlbumAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { + private async syncAlbumAssetsV1( + options: SyncQueryOptions, + response: Writable, + checkpointMap: CheckpointMap, + sessionId: string, + ) { const backfillType = SyncEntityType.AlbumAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); - const upsertType = SyncEntityType.AlbumAssetV1; - const upsertCheckpoint = checkpointMap[upsertType]; - if (upsertCheckpoint) { - const endId = upsertCheckpoint.updateId; + const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); + const updateType = SyncEntityType.AlbumAssetUpdateV1; + const createType = SyncEntityType.AlbumAssetCreateV1; + const updateCheckpoint = checkpointMap[updateType]; + const createCheckpoint = checkpointMap[createType]; + if (createCheckpoint) { + const endId = createCheckpoint.updateId; for (const album of albums) { const createId = album.createId; @@ -400,7 +417,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumAsset.getBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumAsset.getBackfill(options, album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) }); @@ -416,25 +433,44 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.albumAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); - for await (const { updateId, ...data } of upserts) { - send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); + if (createCheckpoint) { + const updates = this.syncRepository.albumAsset.getUpdates(options, createCheckpoint, updateCheckpoint); + for await (const { updateId, ...data } of updates) { + send(response, { type: updateType, ids: [updateId], data: mapSyncAssetV1(data) }); + } + } + + const creates = this.syncRepository.albumAsset.getCreates(options, createCheckpoint); + let first = true; + for await (const { updateId, ...data } of creates) { + if (first) { + send(response, { + type: SyncEntityType.SyncAckV1, + data: {}, + ackType: SyncEntityType.AlbumAssetUpdateV1, + ids: [options.nowId], + }); + first = false; + } + send(response, { type: createType, ids: [updateId], data: mapSyncAssetV1(data) }); } } private async syncAlbumAssetExifsV1( + options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap, - auth: AuthDto, sessionId: string, ) { const backfillType = SyncEntityType.AlbumAssetExifBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); - const upsertType = SyncEntityType.AlbumAssetExifV1; - const upsertCheckpoint = checkpointMap[upsertType]; - if (upsertCheckpoint) { - const endId = upsertCheckpoint.updateId; + const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); + const updateType = SyncEntityType.AlbumAssetExifUpdateV1; + const createType = SyncEntityType.AlbumAssetExifCreateV1; + const upsertCheckpoint = checkpointMap[updateType]; + const createCheckpoint = checkpointMap[createType]; + if (createCheckpoint) { + const endId = createCheckpoint.updateId; for (const album of albums) { const createId = album.createId; @@ -443,7 +479,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumAssetExif.getBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumAssetExif.getBackfill(options, album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -459,27 +495,44 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.albumAssetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); - for await (const { updateId, ...data } of upserts) { - send(response, { type: upsertType, ids: [updateId], data }); + if (createCheckpoint) { + const updates = this.syncRepository.albumAssetExif.getUpdates(options, createCheckpoint, upsertCheckpoint); + for await (const { updateId, ...data } of updates) { + send(response, { type: updateType, ids: [updateId], data }); + } + } + + const creates = this.syncRepository.albumAssetExif.getCreates(options, createCheckpoint); + let first = true; + for await (const { updateId, ...data } of creates) { + if (first) { + send(response, { + type: SyncEntityType.SyncAckV1, + data: {}, + ackType: SyncEntityType.AlbumAssetExifUpdateV1, + ids: [options.nowId], + }); + first = false; + } + send(response, { type: createType, ids: [updateId], data }); } } private async syncAlbumToAssetsV1( + options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap, - auth: AuthDto, sessionId: string, ) { const deleteType = SyncEntityType.AlbumToAssetDeleteV1; - const deletes = this.syncRepository.albumToAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.albumToAsset.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.AlbumToAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.AlbumToAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -492,7 +545,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumToAsset.getBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumToAsset.getBackfill(options, album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -508,64 +561,69 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.albumToAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumToAsset.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncMemoriesV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncMemoriesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.MemoryDeleteV1; - const deletes = this.syncRepository.memory.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.memory.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.MemoryV1; - const upserts = this.syncRepository.memory.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.memory.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncMemoryAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncMemoryAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.MemoryToAssetDeleteV1; - const deletes = this.syncRepository.memoryToAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.memoryToAsset.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.MemoryToAssetV1; - const upserts = this.syncRepository.memoryToAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.memoryToAsset.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncStackV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncStackV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.StackDeleteV1; - const deletes = this.syncRepository.stack.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.stack.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.StackV1; - const upserts = this.syncRepository.stack.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.stack.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncPartnerStackV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { + private async syncPartnerStackV1( + options: SyncQueryOptions, + response: Writable, + checkpointMap: CheckpointMap, + sessionId: string, + ) { const deleteType = SyncEntityType.PartnerStackDeleteV1; - const deletes = this.syncRepository.partnerStack.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.partnerStack.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.PartnerStackBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.PartnerStackV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -578,7 +636,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.partnerStack.getBackfill(partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerStack.getBackfill(options, partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { @@ -598,50 +656,50 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.partnerStack.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerStack.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncPeopleV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncPeopleV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.PersonDeleteV1; - const deletes = this.syncRepository.people.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.people.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.PersonV1; - const upserts = this.syncRepository.people.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.people.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncAssetFacesV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncAssetFacesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.AssetFaceDeleteV1; - const deletes = this.syncRepository.assetFace.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.assetFace.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AssetFaceV1; - const upserts = this.syncRepository.assetFace.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.assetFace.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } } - private async syncUserMetadataV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + private async syncUserMetadataV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.UserMetadataDeleteV1; - const deletes = this.syncRepository.userMetadata.getDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.userMetadata.getDeletes(options, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.UserMetadataV1; - const upserts = this.syncRepository.userMetadata.getUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.userMetadata.getUpserts(options, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); diff --git a/server/test/medium.factory.ts b/server/test/medium.factory.ts index 1b669e83e4..8b0878a35a 100644 --- a/server/test/medium.factory.ts +++ b/server/test/medium.factory.ts @@ -5,7 +5,15 @@ import { createHash, randomBytes } from 'node:crypto'; import { Writable } from 'node:stream'; import { AssetFace } from 'src/database'; import { AuthDto, LoginResponseDto } from 'src/dtos/auth.dto'; -import { AlbumUserRole, AssetType, AssetVisibility, MemoryType, SourceType, SyncRequestType } from 'src/enum'; +import { + AlbumUserRole, + AssetType, + AssetVisibility, + MemoryType, + SourceType, + SyncEntityType, + SyncRequestType, +} from 'src/enum'; import { AccessRepository } from 'src/repositories/access.repository'; import { ActivityRepository } from 'src/repositories/activity.repository'; import { AlbumUserRepository } from 'src/repositories/album-user.repository'; @@ -251,11 +259,16 @@ export class SyncTestContext extends MediumTestContext { async syncAckAll(auth: AuthDto, response: Array<{ type: string; ack: string }>) { const acks: Record = {}; + const syncAcks: string[] = []; for (const { type, ack } of response) { + if (type === SyncEntityType.SyncAckV1) { + syncAcks.push(ack); + continue; + } acks[type] = ack; } - await this.sut.setAcks(auth, { acks: Object.values(acks) }); + await this.sut.setAcks(auth, { acks: [...Object.values(acks), ...syncAcks] }); } } @@ -468,8 +481,6 @@ const personInsert = (person: Partial> & { ownerId: stri name: person.name || 'Test Name', ownerId: person.ownerId || newUuid(), thumbnailPath: person.thumbnailPath || '/path/to/thumbnail.jpg', - updatedAt: person.updatedAt || newDate(), - updateId: person.updateId || newUuid(), }; return { ...defaults, diff --git a/server/test/medium/specs/sync/sync-album-asset-exif.spec.ts b/server/test/medium/specs/sync/sync-album-asset-exif.spec.ts index 808a4785ce..9e994604a5 100644 --- a/server/test/medium/specs/sync/sync-album-asset-exif.spec.ts +++ b/server/test/medium/specs/sync/sync-album-asset-exif.spec.ts @@ -1,5 +1,6 @@ import { Kysely } from 'kysely'; import { AlbumUserRole, SyncEntityType, SyncRequestType } from 'src/enum'; +import { AssetRepository } from 'src/repositories/asset.repository'; import { DB } from 'src/schema'; import { SyncTestContext } from 'test/medium.factory'; import { factory } from 'test/small.factory'; @@ -13,6 +14,18 @@ const setup = async (db?: Kysely) => { return { auth, user, session, ctx }; }; +const updateSyncAck = { + ack: expect.stringContaining(SyncEntityType.AlbumAssetExifUpdateV1), + data: {}, + type: SyncEntityType.SyncAckV1, +}; + +const backfillSyncAck = { + ack: expect.stringContaining(SyncEntityType.AlbumAssetExifBackfillV1), + data: {}, + type: SyncEntityType.SyncAckV1, +}; + beforeAll(async () => { defaultDatabase = await getKyselyDB(); }); @@ -28,8 +41,8 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => { await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor }); const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); - expect(response).toHaveLength(1); expect(response).toEqual([ + updateSyncAck, { ack: expect.any(String), data: { @@ -59,9 +72,10 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => { state: null, timeZone: null, }, - type: SyncEntityType.AlbumAssetExifV1, + type: SyncEntityType.AlbumAssetExifCreateV1, }, ]); + expect(response).toHaveLength(2); await ctx.syncAckAll(auth, response); await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toEqual([]); @@ -75,7 +89,7 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => { await ctx.newAlbumAsset({ albumId: album.id, assetId: asset.id }); await expect(ctx.syncStream(auth, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1); - await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toHaveLength(1); + await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toHaveLength(2); }); it('should not sync album asset exif for unrelated user', async () => { @@ -97,55 +111,46 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => { it('should backfill album assets exif when a user shares an album with you', async () => { const { auth, ctx } = await setup(); const { user: user2 } = await ctx.newUser(); - const { asset: asset1Owner } = await ctx.newAsset({ ownerId: auth.user.id }); - await ctx.newExif({ assetId: asset1Owner.id, make: 'asset1Owner' }); - await wait(2); + const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id }); + const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id }); const { asset: asset1User2 } = await ctx.newAsset({ ownerId: user2.id }); await ctx.newExif({ assetId: asset1User2.id, make: 'asset1User2' }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset1User2.id }); await wait(2); const { asset: asset2User2 } = await ctx.newAsset({ ownerId: user2.id }); await ctx.newExif({ assetId: asset2User2.id, make: 'asset2User2' }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset2User2.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album1.id, assetId: asset2User2.id }); await wait(2); const { asset: asset3User2 } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset3User2.id }); await ctx.newExif({ assetId: asset3User2.id, make: 'asset3User2' }); - const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id }); - await ctx.newAlbumAsset({ albumId: album1.id, assetId: asset2User2.id }); + await wait(2); await ctx.newAlbumUser({ albumId: album1.id, userId: auth.user.id, role: AlbumUserRole.Editor }); const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); - expect(response).toHaveLength(1); expect(response).toEqual([ + updateSyncAck, { ack: expect.any(String), data: expect.objectContaining({ assetId: asset2User2.id, }), - type: SyncEntityType.AlbumAssetExifV1, + type: SyncEntityType.AlbumAssetExifCreateV1, }, ]); + expect(response).toHaveLength(2); // ack initial album asset exif sync await ctx.syncAckAll(auth, response); // create a second album - const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id }); - await Promise.all( - [asset1User2.id, asset2User2.id, asset3User2.id, asset1Owner.id].map((assetId) => - ctx.newAlbumAsset({ albumId: album2.id, assetId }), - ), - ); await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor }); // should backfill the album user const newResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); expect(newResponse).toEqual([ - { - ack: expect.any(String), - data: expect.objectContaining({ - assetId: asset1Owner.id, - }), - type: SyncEntityType.AlbumAssetExifBackfillV1, - }, { ack: expect.any(String), data: expect.objectContaining({ @@ -160,21 +165,194 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => { }), type: SyncEntityType.AlbumAssetExifBackfillV1, }, - { - ack: expect.stringContaining(SyncEntityType.AlbumAssetExifBackfillV1), - data: {}, - type: SyncEntityType.SyncAckV1, - }, + backfillSyncAck, + updateSyncAck, { ack: expect.any(String), data: expect.objectContaining({ assetId: asset3User2.id, }), - type: SyncEntityType.AlbumAssetExifV1, + type: SyncEntityType.AlbumAssetExifCreateV1, }, ]); + expect(newResponse).toHaveLength(5); await ctx.syncAckAll(auth, newResponse); await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toEqual([]); }); + + it('should sync old asset exif when a user adds them to an album they share you', async () => { + const { auth, ctx } = await setup(); + const { user: user2 } = await ctx.newUser(); + const { asset: firstAsset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'firstAsset' }); + await ctx.newExif({ assetId: firstAsset.id, make: 'firstAsset' }); + const { asset: secondAsset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'secondAsset' }); + await ctx.newExif({ assetId: secondAsset.id, make: 'secondAsset' }); + const { asset: album1Asset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'album1Asset' }); + await ctx.newExif({ assetId: album1Asset.id, make: 'album1Asset' }); + const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id }); + const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: firstAsset.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album1.id, assetId: album1Asset.id }); + await ctx.newAlbumUser({ albumId: album1.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const firstAlbumResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(firstAlbumResponse).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: album1Asset.id, + }), + type: SyncEntityType.AlbumAssetExifCreateV1, + }, + ]); + expect(firstAlbumResponse).toHaveLength(2); + + await ctx.syncAckAll(auth, firstAlbumResponse); + + await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(response).toEqual([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: firstAsset.id, + }), + type: SyncEntityType.AlbumAssetExifBackfillV1, + }, + backfillSyncAck, + ]); + expect(response).toHaveLength(2); + + // ack initial album asset sync + await ctx.syncAckAll(auth, response); + + await ctx.newAlbumAsset({ albumId: album2.id, assetId: secondAsset.id }); + await wait(2); + + // should backfill the new asset even though it's older than the first asset + const newResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(newResponse).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: secondAsset.id, + }), + type: SyncEntityType.AlbumAssetExifCreateV1, + }, + ]); + expect(newResponse).toHaveLength(2); + + await ctx.syncAckAll(auth, newResponse); + await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toEqual([]); + }); + + it('should sync asset exif updates for an album shared with you', async () => { + const { auth, ctx } = await setup(); + const { user: user2 } = await ctx.newUser(); + const { asset } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newExif({ assetId: asset.id, make: 'asset' }); + const { album } = await ctx.newAlbum({ ownerId: user2.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album.id, assetId: asset.id }); + await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(response).toHaveLength(2); + expect(response).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: asset.id, + }), + type: SyncEntityType.AlbumAssetExifCreateV1, + }, + ]); + + await ctx.syncAckAll(auth, response); + + // update the asset + const assetRepository = ctx.get(AssetRepository); + await assetRepository.upsertExif({ + assetId: asset.id, + city: 'New City', + }); + + const updateResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(updateResponse).toHaveLength(1); + expect(updateResponse).toEqual([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: asset.id, + city: 'New City', + }), + type: SyncEntityType.AlbumAssetExifUpdateV1, + }, + ]); + }); + + it('should sync delayed asset exif creates for an album shared with you', async () => { + const { auth, ctx } = await setup(); + const { user: user2 } = await ctx.newUser(); + const { asset: assetWithExif } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newExif({ assetId: assetWithExif.id, make: 'assetWithExif' }); + const { asset: assetDelayedExif } = await ctx.newAsset({ ownerId: user2.id }); + const { album } = await ctx.newAlbum({ ownerId: user2.id }); + const { asset: newerAsset } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newExif({ assetId: newerAsset.id, make: 'newerAsset' }); + await ctx.newAlbumAsset({ albumId: album.id, assetId: assetWithExif.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album.id, assetId: assetDelayedExif.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album.id, assetId: newerAsset.id }); + await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(response).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetWithExif.id, + }), + type: SyncEntityType.AlbumAssetExifCreateV1, + }, + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: newerAsset.id, + }), + type: SyncEntityType.AlbumAssetExifCreateV1, + }, + ]); + expect(response).toHaveLength(3); + + await ctx.syncAckAll(auth, response); + + // update the asset + const assetRepository = ctx.get(AssetRepository); + await assetRepository.upsertExif({ + assetId: assetDelayedExif.id, + city: 'Delayed Exif', + }); + + const updateResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]); + expect(updateResponse).toEqual([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetDelayedExif.id, + city: 'Delayed Exif', + }), + type: SyncEntityType.AlbumAssetExifUpdateV1, + }, + ]); + expect(updateResponse).toHaveLength(1); + }); }); diff --git a/server/test/medium/specs/sync/sync-album-asset.spec.ts b/server/test/medium/specs/sync/sync-album-asset.spec.ts index 3002b99071..cbc60a2c5a 100644 --- a/server/test/medium/specs/sync/sync-album-asset.spec.ts +++ b/server/test/medium/specs/sync/sync-album-asset.spec.ts @@ -1,5 +1,6 @@ import { Kysely } from 'kysely'; import { AlbumUserRole, SyncEntityType, SyncRequestType } from 'src/enum'; +import { AssetRepository } from 'src/repositories/asset.repository'; import { DB } from 'src/schema'; import { SyncTestContext } from 'test/medium.factory'; import { factory } from 'test/small.factory'; @@ -13,6 +14,18 @@ const setup = async (db?: Kysely) => { return { auth, user, session, ctx }; }; +const updateSyncAck = { + ack: expect.stringContaining(SyncEntityType.AlbumAssetUpdateV1), + data: {}, + type: SyncEntityType.SyncAckV1, +}; + +const backfillSyncAck = { + ack: expect.stringContaining(SyncEntityType.AlbumAssetBackfillV1), + data: {}, + type: SyncEntityType.SyncAckV1, +}; + beforeAll(async () => { defaultDatabase = await getKyselyDB(); }); @@ -45,8 +58,9 @@ describe(SyncRequestType.AlbumAssetsV1, () => { await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor }); const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); - expect(response).toHaveLength(1); + expect(response).toHaveLength(2); expect(response).toEqual([ + updateSyncAck, { ack: expect.any(String), data: { @@ -67,7 +81,7 @@ describe(SyncRequestType.AlbumAssetsV1, () => { stackId: asset.stackId, libraryId: asset.libraryId, }, - type: SyncEntityType.AlbumAssetV1, + type: SyncEntityType.AlbumAssetCreateV1, }, ]); @@ -82,7 +96,7 @@ describe(SyncRequestType.AlbumAssetsV1, () => { await ctx.newAlbumAsset({ albumId: album.id, assetId: asset.id }); await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); - await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1])).resolves.toHaveLength(1); + await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1])).resolves.toHaveLength(2); }); it('should not sync album asset for unrelated user', async () => { @@ -103,52 +117,42 @@ describe(SyncRequestType.AlbumAssetsV1, () => { it('should backfill album assets when a user shares an album with you', async () => { const { auth, ctx } = await setup(); const { user: user2 } = await ctx.newUser(); - const { asset: asset1Owner } = await ctx.newAsset({ ownerId: auth.user.id }); - await wait(2); + const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id }); + const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id }); const { asset: asset1User2 } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset1User2.id }); await wait(2); const { asset: asset2User2 } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset2User2.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album1.id, assetId: asset2User2.id }); await wait(2); const { asset: asset3User2 } = await ctx.newAsset({ ownerId: user2.id }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset3User2.id }); await wait(2); - const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id }); - await ctx.newAlbumAsset({ albumId: album1.id, assetId: asset2User2.id }); await ctx.newAlbumUser({ albumId: album1.id, userId: auth.user.id, role: AlbumUserRole.Editor }); const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); - expect(response).toHaveLength(1); + expect(response).toHaveLength(2); expect(response).toEqual([ + updateSyncAck, { ack: expect.any(String), data: expect.objectContaining({ id: asset2User2.id, }), - type: SyncEntityType.AlbumAssetV1, + type: SyncEntityType.AlbumAssetCreateV1, }, ]); // ack initial album asset sync await ctx.syncAckAll(auth, response); - // create a second album - const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id }); - await Promise.all( - [asset1User2.id, asset2User2.id, asset3User2.id, asset1Owner.id].map((assetId) => - ctx.newAlbumAsset({ albumId: album2.id, assetId }), - ), - ); await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor }); // should backfill the album user const newResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); expect(newResponse).toEqual([ - { - ack: expect.any(String), - data: expect.objectContaining({ - id: asset1Owner.id, - }), - type: SyncEntityType.AlbumAssetBackfillV1, - }, { ack: expect.any(String), data: expect.objectContaining({ @@ -163,21 +167,129 @@ describe(SyncRequestType.AlbumAssetsV1, () => { }), type: SyncEntityType.AlbumAssetBackfillV1, }, - { - ack: expect.stringContaining(SyncEntityType.AlbumAssetBackfillV1), - data: {}, - type: SyncEntityType.SyncAckV1, - }, + backfillSyncAck, + updateSyncAck, { ack: expect.any(String), data: expect.objectContaining({ id: asset3User2.id, }), - type: SyncEntityType.AlbumAssetV1, + type: SyncEntityType.AlbumAssetCreateV1, }, ]); await ctx.syncAckAll(auth, newResponse); await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1])).resolves.toEqual([]); }); + + it('should sync old assets when a user adds them to an album they share you', async () => { + const { auth, ctx } = await setup(); + const { user: user2 } = await ctx.newUser(); + const { asset: firstAsset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'firstAsset' }); + const { asset: secondAsset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'secondAsset' }); + const { asset: album1Asset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'album1Asset' }); + const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id }); + const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id }); + await ctx.newAlbumAsset({ albumId: album2.id, assetId: firstAsset.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album1.id, assetId: album1Asset.id }); + await ctx.newAlbumUser({ albumId: album1.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const firstAlbumResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); + expect(firstAlbumResponse).toHaveLength(2); + expect(firstAlbumResponse).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + id: album1Asset.id, + }), + type: SyncEntityType.AlbumAssetCreateV1, + }, + ]); + + await ctx.syncAckAll(auth, firstAlbumResponse); + + await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); + // expect(response).toHaveLength(2); + expect(response).toEqual([ + { + ack: expect.any(String), + data: expect.objectContaining({ + id: firstAsset.id, + }), + type: SyncEntityType.AlbumAssetBackfillV1, + }, + backfillSyncAck, + ]); + + // ack initial album asset sync + await ctx.syncAckAll(auth, response); + + await ctx.newAlbumAsset({ albumId: album2.id, assetId: secondAsset.id }); + await wait(2); + + // should backfill the new asset even though it's older than the first asset + const newResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); + expect(newResponse).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + id: secondAsset.id, + }), + type: SyncEntityType.AlbumAssetCreateV1, + }, + ]); + + await ctx.syncAckAll(auth, newResponse); + await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1])).resolves.toEqual([]); + }); + + it('should sync asset updates for an album shared with you', async () => { + const { auth, ctx } = await setup(); + const { user: user2 } = await ctx.newUser(); + const { asset } = await ctx.newAsset({ ownerId: user2.id, isFavorite: false }); + const { album } = await ctx.newAlbum({ ownerId: user2.id }); + await wait(2); + await ctx.newAlbumAsset({ albumId: album.id, assetId: asset.id }); + await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor }); + + const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); + expect(response).toHaveLength(2); + expect(response).toEqual([ + updateSyncAck, + { + ack: expect.any(String), + data: expect.objectContaining({ + id: asset.id, + }), + type: SyncEntityType.AlbumAssetCreateV1, + }, + ]); + + await ctx.syncAckAll(auth, response); + + // update the asset + const assetRepository = ctx.get(AssetRepository); + await assetRepository.update({ + id: asset.id, + isFavorite: true, + }); + + const updateResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetsV1]); + expect(updateResponse).toHaveLength(1); + expect(updateResponse).toEqual([ + { + ack: expect.any(String), + data: expect.objectContaining({ + id: asset.id, + isFavorite: true, + }), + type: SyncEntityType.AlbumAssetUpdateV1, + }, + ]); + }); }); diff --git a/server/test/utils.ts b/server/test/utils.ts index 9f212578c0..95f9741d7c 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -446,6 +446,16 @@ export async function* makeStream(items: T[] = []): AsyncIterableIterator } } -export const wait = (ms: number) => { - return new Promise((resolve) => setTimeout(resolve, ms)); +export const wait = (ms: number): Promise => { + return new Promise((resolve) => { + const target = performance.now() + ms; + const checkDone = () => { + if (performance.now() >= target) { + resolve(); + } else { + setTimeout(checkDone, 1); // Check again after 1ms + } + }; + setTimeout(checkDone, ms); + }); };