From fa26d0de3301f1d6f777a6d8469115578db2b956 Mon Sep 17 00:00:00 2001 From: Zack Pollard Date: Wed, 6 Aug 2025 13:34:12 +0100 Subject: [PATCH] refactor: new helper methods that work for all sync queries (#20690) refactor: new helper methods that work for all sync queries --- server/src/queries/sync.repository.sql | 490 ++++++++++------- server/src/repositories/sync.repository.ts | 580 ++++++++------------- server/src/services/sync.service.ts | 146 ++++-- 3 files changed, 639 insertions(+), 577 deletions(-) diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index 0bd5b9f47d..80021368a0 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -18,12 +18,13 @@ select "id", "albumId" from - "album_audit" + "album_audit" as "album_audit" where - "userId" = $1 - and "id" < $2 + "album_audit"."id" < $1 + and "album_audit"."id" > $2 + and "userId" = $3 order by - "id" asc + "album_audit"."id" asc -- SyncRepository.album.getUpserts select distinct @@ -38,13 +39,14 @@ select distinct "album"."order", "album"."updateId" from - "album" + "album" as "album" left join "album_user" as "album_users" on "album"."id" = "album_users"."albumsId" where "album"."updateId" < $1 + and "album"."updateId" > $2 and ( - "album"."ownerId" = $2 - or "album_users"."usersId" = $3 + "album"."ownerId" = $3 + or "album_users"."usersId" = $4 ) order by "album"."updateId" asc @@ -69,16 +71,51 @@ select "asset"."libraryId", "album_asset"."updateId" from - "album_asset" + "album_asset" as "album_asset" inner join "asset" on "asset"."id" = "album_asset"."assetsId" where - "album_asset"."albumsId" = $1 - and "album_asset"."updateId" < $2 - and "album_asset"."updateId" <= $3 - and "album_asset"."updateId" >= $4 + "album_asset"."updateId" < $1 + and "album_asset"."updateId" <= $2 + and "album_asset"."updateId" >= $3 + and "album_asset"."albumsId" = $4 order by "album_asset"."updateId" asc +-- SyncRepository.albumAsset.getUpdates +select + "asset"."id", + "asset"."ownerId", + "asset"."originalFileName", + "asset"."thumbhash", + "asset"."checksum", + "asset"."fileCreatedAt", + "asset"."fileModifiedAt", + "asset"."localDateTime", + "asset"."type", + "asset"."deletedAt", + "asset"."isFavorite", + "asset"."visibility", + "asset"."duration", + "asset"."livePhotoVideoId", + "asset"."stackId", + "asset"."libraryId", + "asset"."updateId" +from + "asset" as "asset" + inner join "album_asset" on "album_asset"."assetsId" = "asset"."id" + inner join "album" on "album"."id" = "album_asset"."albumsId" + left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" +where + "asset"."updateId" < $1 + and "asset"."updateId" > $2 + and "album_asset"."updateId" <= $3 + and ( + "album"."ownerId" = $4 + or "album_user"."usersId" = $5 + ) +order by + "asset"."updateId" asc + -- SyncRepository.albumAsset.getCreates select "album_asset"."updateId", @@ -99,15 +136,16 @@ select "asset"."stackId", "asset"."libraryId" from - "album_asset" + "album_asset" as "album_asset" inner join "asset" on "asset"."id" = "album_asset"."assetsId" inner join "album" on "album"."id" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" where "album_asset"."updateId" < $1 + and "album_asset"."updateId" > $2 and ( - "album"."ownerId" = $2 - or "album_user"."usersId" = $3 + "album"."ownerId" = $3 + or "album_user"."usersId" = $4 ) order by "album_asset"."updateId" asc @@ -141,16 +179,60 @@ select "asset_exif"."fps", "album_asset"."updateId" from - "album_asset" + "album_asset" as "album_asset" inner join "asset_exif" on "asset_exif"."assetId" = "album_asset"."assetsId" where - "album_asset"."albumsId" = $1 - and "album_asset"."updateId" < $2 - and "album_asset"."updateId" <= $3 - and "album_asset"."updateId" >= $4 + "album_asset"."updateId" < $1 + and "album_asset"."updateId" <= $2 + and "album_asset"."updateId" >= $3 + and "album_asset"."albumsId" = $4 order by "album_asset"."updateId" asc +-- SyncRepository.albumAssetExif.getUpdates +select + "asset_exif"."assetId", + "asset_exif"."description", + "asset_exif"."exifImageWidth", + "asset_exif"."exifImageHeight", + "asset_exif"."fileSizeInByte", + "asset_exif"."orientation", + "asset_exif"."dateTimeOriginal", + "asset_exif"."modifyDate", + "asset_exif"."timeZone", + "asset_exif"."latitude", + "asset_exif"."longitude", + "asset_exif"."projectionType", + "asset_exif"."city", + "asset_exif"."state", + "asset_exif"."country", + "asset_exif"."make", + "asset_exif"."model", + "asset_exif"."lensModel", + "asset_exif"."fNumber", + "asset_exif"."focalLength", + "asset_exif"."iso", + "asset_exif"."exposureTime", + "asset_exif"."profileDescription", + "asset_exif"."rating", + "asset_exif"."fps", + "asset_exif"."updateId" +from + "asset_exif" as "asset_exif" + inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId" + inner join "album" on "album"."id" = "album_asset"."albumsId" + left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" +where + "asset_exif"."updateId" < $1 + and "asset_exif"."updateId" > $2 + and "album_asset"."updateId" <= $3 + and ( + "album"."ownerId" = $4 + or "album_user"."usersId" = $5 + ) +order by + "asset_exif"."updateId" asc + -- SyncRepository.albumAssetExif.getCreates select "album_asset"."updateId", @@ -180,33 +262,34 @@ select "asset_exif"."rating", "asset_exif"."fps" from - "album_asset" + "album_asset" as "album_asset" inner join "asset_exif" on "asset_exif"."assetId" = "album_asset"."assetsId" inner join "album" on "album"."id" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" where "album_asset"."updateId" < $1 + and "album_asset"."updateId" > $2 and ( - "album"."ownerId" = $2 - or "album_user"."usersId" = $3 + "album"."ownerId" = $3 + or "album_user"."usersId" = $4 ) order by "album_asset"."updateId" asc -- SyncRepository.albumToAsset.getBackfill select - "album_assets"."assetsId" as "assetId", - "album_assets"."albumsId" as "albumId", - "album_assets"."updateId" + "album_asset"."assetsId" as "assetId", + "album_asset"."albumsId" as "albumId", + "album_asset"."updateId" from - "album_asset" as "album_assets" + "album_asset" as "album_asset" where - "album_assets"."albumsId" = $1 - and "album_assets"."updateId" < $2 - and "album_assets"."updateId" <= $3 - and "album_assets"."updateId" >= $4 + "album_asset"."updateId" < $1 + and "album_asset"."updateId" <= $2 + and "album_asset"."updateId" >= $3 + and "album_asset"."albumsId" = $4 order by - "album_assets"."updateId" asc + "album_asset"."updateId" asc -- SyncRepository.albumToAsset.getDeletes select @@ -214,15 +297,17 @@ select "assetId", "albumId" from - "album_asset_audit" + "album_asset_audit" as "album_asset_audit" where - "albumId" in ( + "album_asset_audit"."id" < $1 + and "album_asset_audit"."id" > $2 + and "albumId" in ( select "id" from "album" where - "ownerId" = $1 + "ownerId" = $3 union ( select @@ -230,12 +315,11 @@ where from "album_user" where - "album_user"."usersId" = $2 + "album_user"."usersId" = $4 ) ) - and "id" < $3 order by - "id" asc + "album_asset_audit"."id" asc -- SyncRepository.albumToAsset.getUpserts select @@ -243,14 +327,15 @@ select "album_asset"."albumsId" as "albumId", "album_asset"."updateId" from - "album_asset" + "album_asset" as "album_asset" inner join "album" on "album"."id" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" where "album_asset"."updateId" < $1 + and "album_asset"."updateId" > $2 and ( - "album"."ownerId" = $2 - or "album_user"."usersId" = $3 + "album"."ownerId" = $3 + or "album_user"."usersId" = $4 ) order by "album_asset"."updateId" asc @@ -262,14 +347,14 @@ select "album_user"."role", "album_user"."updateId" from - "album_user" + "album_user" as "album_user" where - "albumsId" = $1 - and "updateId" < $2 - and "updateId" <= $3 - and "updateId" >= $4 + "album_user"."updateId" < $1 + and "album_user"."updateId" <= $2 + and "album_user"."updateId" >= $3 + and "albumsId" = $4 order by - "updateId" asc + "album_user"."updateId" asc -- SyncRepository.albumUser.getDeletes select @@ -277,15 +362,17 @@ select "userId", "albumId" from - "album_user_audit" + "album_user_audit" as "album_user_audit" where - "albumId" in ( + "album_user_audit"."id" < $1 + and "album_user_audit"."id" > $2 + and "albumId" in ( select "id" from "album" where - "ownerId" = $1 + "ownerId" = $3 union ( select @@ -293,12 +380,11 @@ where from "album_user" where - "album_user"."usersId" = $2 + "album_user"."usersId" = $4 ) ) - and "id" < $3 order by - "id" asc + "album_user_audit"."id" asc -- SyncRepository.albumUser.getUpserts select @@ -307,16 +393,17 @@ select "album_user"."role", "album_user"."updateId" from - "album_user" + "album_user" as "album_user" where "album_user"."updateId" < $1 + and "album_user"."updateId" > $2 and "album_user"."albumsId" in ( select "id" from "album" where - "ownerId" = $2 + "ownerId" = $3 union ( select @@ -324,7 +411,7 @@ where from "album_user" as "albumUsers" where - "albumUsers"."usersId" = $3 + "albumUsers"."usersId" = $4 ) ) order by @@ -335,12 +422,13 @@ select "id", "assetId" from - "asset_audit" + "asset_audit" as "asset_audit" where - "ownerId" = $1 - and "id" < $2 + "asset_audit"."id" < $1 + and "asset_audit"."id" > $2 + and "ownerId" = $3 order by - "id" asc + "asset_audit"."id" asc -- SyncRepository.asset.getUpserts select @@ -362,12 +450,13 @@ select "asset"."libraryId", "asset"."updateId" from - "asset" + "asset" as "asset" where - "ownerId" = $1 - and "updateId" < $2 + "asset"."updateId" < $1 + and "asset"."updateId" > $2 + and "ownerId" = $3 order by - "updateId" asc + "asset"."updateId" asc -- SyncRepository.assetExif.getUpserts select @@ -398,30 +487,32 @@ select "asset_exif"."fps", "asset_exif"."updateId" from - "asset_exif" + "asset_exif" as "asset_exif" where - "assetId" in ( + "asset_exif"."updateId" < $1 + and "asset_exif"."updateId" > $2 + and "assetId" in ( select "id" from "asset" where - "ownerId" = $1 + "ownerId" = $3 ) - and "updateId" < $2 order by - "updateId" asc + "asset_exif"."updateId" asc -- SyncRepository.assetFace.getDeletes select "asset_face_audit"."id", "assetFaceId" from - "asset_face_audit" + "asset_face_audit" as "asset_face_audit" left join "asset" on "asset"."id" = "asset_face_audit"."assetId" where - "asset"."ownerId" = $1 - and "asset_face_audit"."id" < $2 + "asset_face_audit"."id" < $1 + and "asset_face_audit"."id" > $2 + and "asset"."ownerId" = $3 order by "asset_face_audit"."id" asc @@ -439,25 +530,51 @@ select "sourceType", "asset_face"."updateId" from - "asset_face" + "asset_face" as "asset_face" left join "asset" on "asset"."id" = "asset_face"."assetId" where "asset_face"."updateId" < $1 - and "asset"."ownerId" = $2 + and "asset_face"."updateId" > $2 + and "asset"."ownerId" = $3 order by "asset_face"."updateId" asc +-- SyncRepository.authUser.getUpserts +select + "id", + "name", + "email", + "avatarColor", + "deletedAt", + "updateId", + "profileImagePath", + "profileChangedAt", + "isAdmin", + "pinCode", + "oauthId", + "storageLabel", + "quotaSizeInBytes", + "quotaUsageInBytes" +from + "user" as "user" +where + "user"."updateId" < $1 + and "user"."updateId" > $2 +order by + "user"."updateId" asc + -- SyncRepository.memory.getDeletes select "id", "memoryId" from - "memory_audit" + "memory_audit" as "memory_audit" where - "userId" = $1 - and "id" < $2 + "memory_audit"."id" < $1 + and "memory_audit"."id" > $2 + and "userId" = $3 order by - "id" asc + "memory_audit"."id" asc -- SyncRepository.memory.getUpserts select @@ -475,12 +592,13 @@ select "hideAt", "updateId" from - "memory" + "memory" as "memory" where - "ownerId" = $1 - and "updateId" < $2 + "memory"."updateId" < $1 + and "memory"."updateId" > $2 + and "ownerId" = $3 order by - "updateId" asc + "memory"."updateId" asc -- SyncRepository.memoryToAsset.getDeletes select @@ -488,19 +606,20 @@ select "memoryId", "assetId" from - "memory_asset_audit" + "memory_asset_audit" as "memory_asset_audit" where - "memoryId" in ( + "memory_asset_audit"."id" < $1 + and "memory_asset_audit"."id" > $2 + and "memoryId" in ( select "id" from "memory" where - "ownerId" = $1 + "ownerId" = $3 ) - and "id" < $2 order by - "id" asc + "memory_asset_audit"."id" asc -- SyncRepository.memoryToAsset.getUpserts select @@ -508,19 +627,20 @@ select "assetsId" as "assetId", "updateId" from - "memory_asset" + "memory_asset" as "memory_asset" where - "memoriesId" in ( + "memory_asset"."updateId" < $1 + and "memory_asset"."updateId" > $2 + and "memoriesId" in ( select "id" from "memory" where - "ownerId" = $1 + "ownerId" = $3 ) - and "updateId" < $2 order by - "updateId" asc + "memory_asset"."updateId" asc -- SyncRepository.partner.getCreatedAfter select @@ -530,7 +650,8 @@ from "partner" where "sharedWithId" = $1 - and "createId" < $2 + and "createId" >= $2 + and "createId" < $3 order by "partner"."createId" asc @@ -540,15 +661,16 @@ select "sharedById", "sharedWithId" from - "partner_audit" + "partner_audit" as "partner_audit" where - ( - "sharedById" = $1 - or "sharedWithId" = $2 + "partner_audit"."id" < $1 + and "partner_audit"."id" > $2 + and ( + "sharedById" = $3 + or "sharedWithId" = $4 ) - and "id" < $3 order by - "id" asc + "partner_audit"."id" asc -- SyncRepository.partner.getUpserts select @@ -557,15 +679,16 @@ select "inTimeline", "updateId" from - "partner" + "partner" as "partner" where - ( - "sharedById" = $1 - or "sharedWithId" = $2 + "partner"."updateId" < $1 + and "partner"."updateId" > $2 + and ( + "sharedById" = $3 + or "sharedWithId" = $4 ) - and "updateId" < $3 order by - "updateId" asc + "partner"."updateId" asc -- SyncRepository.partnerAsset.getBackfill select @@ -587,33 +710,34 @@ select "asset"."libraryId", "asset"."updateId" from - "asset" + "asset" as "asset" where - "ownerId" = $1 - and "updateId" < $2 - and "updateId" <= $3 - and "updateId" >= $4 + "asset"."updateId" < $1 + and "asset"."updateId" <= $2 + and "asset"."updateId" >= $3 + and "ownerId" = $4 order by - "updateId" asc + "asset"."updateId" asc -- SyncRepository.partnerAsset.getDeletes select "id", "assetId" from - "asset_audit" + "asset_audit" as "asset_audit" where - "ownerId" in ( + "asset_audit"."id" < $1 + and "asset_audit"."id" > $2 + and "ownerId" in ( select "sharedById" from "partner" where - "sharedWithId" = $1 + "sharedWithId" = $3 ) - and "id" < $2 order by - "id" asc + "asset_audit"."id" asc -- SyncRepository.partnerAsset.getUpserts select @@ -635,19 +759,20 @@ select "asset"."libraryId", "asset"."updateId" from - "asset" + "asset" as "asset" where - "ownerId" in ( + "asset"."updateId" < $1 + and "asset"."updateId" > $2 + and "ownerId" in ( select "sharedById" from "partner" where - "sharedWithId" = $1 + "sharedWithId" = $3 ) - and "updateId" < $2 order by - "updateId" asc + "asset"."updateId" asc -- SyncRepository.partnerAssetExif.getBackfill select @@ -678,13 +803,13 @@ select "asset_exif"."fps", "asset_exif"."updateId" from - "asset_exif" + "asset_exif" as "asset_exif" inner join "asset" on "asset"."id" = "asset_exif"."assetId" where - "asset"."ownerId" = $1 - and "asset_exif"."updateId" < $2 - and "asset_exif"."updateId" <= $3 - and "asset_exif"."updateId" >= $4 + "asset_exif"."updateId" < $1 + and "asset_exif"."updateId" <= $2 + and "asset_exif"."updateId" >= $3 + and "asset"."ownerId" = $4 order by "asset_exif"."updateId" asc @@ -717,9 +842,11 @@ select "asset_exif"."fps", "asset_exif"."updateId" from - "asset_exif" + "asset_exif" as "asset_exif" where - "assetId" in ( + "asset_exif"."updateId" < $1 + and "asset_exif"."updateId" > $2 + and "assetId" in ( select "id" from @@ -731,31 +858,31 @@ where from "partner" where - "sharedWithId" = $1 + "sharedWithId" = $3 ) ) - and "updateId" < $2 order by - "updateId" asc + "asset_exif"."updateId" asc -- SyncRepository.partnerStack.getDeletes select "id", "stackId" from - "stack_audit" + "stack_audit" as "stack_audit" where - "userId" in ( + "stack_audit"."id" < $1 + and "stack_audit"."id" > $2 + and "userId" in ( select "sharedById" from "partner" where - "sharedWithId" = $1 + "sharedWithId" = $3 ) - and "id" < $2 order by - "id" asc + "stack_audit"."id" asc -- SyncRepository.partnerStack.getBackfill select @@ -766,14 +893,14 @@ select "stack"."ownerId", "updateId" from - "stack" + "stack" as "stack" where - "ownerId" = $1 - and "updateId" < $2 - and "updateId" <= $3 - and "updateId" >= $4 + "stack"."updateId" < $1 + and "stack"."updateId" <= $2 + and "stack"."updateId" >= $3 + and "ownerId" = $4 order by - "updateId" asc + "stack"."updateId" asc -- SyncRepository.partnerStack.getUpserts select @@ -784,31 +911,33 @@ select "stack"."ownerId", "updateId" from - "stack" + "stack" as "stack" where - "ownerId" in ( + "stack"."updateId" < $1 + and "stack"."updateId" > $2 + and "ownerId" in ( select "sharedById" from "partner" where - "sharedWithId" = $1 + "sharedWithId" = $3 ) - and "updateId" < $2 order by - "updateId" asc + "stack"."updateId" asc -- SyncRepository.people.getDeletes select "id", "personId" from - "person_audit" + "person_audit" as "person_audit" where - "ownerId" = $1 - and "id" < $2 + "person_audit"."id" < $1 + and "person_audit"."id" > $2 + and "ownerId" = $3 order by - "id" asc + "person_audit"."id" asc -- SyncRepository.people.getUpserts select @@ -824,24 +953,26 @@ select "updateId", "faceAssetId" from - "person" + "person" as "person" where - "ownerId" = $1 - and "updateId" < $2 + "person"."updateId" < $1 + and "person"."updateId" > $2 + and "ownerId" = $3 order by - "updateId" asc + "person"."updateId" asc -- SyncRepository.stack.getDeletes select "id", "stackId" from - "stack_audit" + "stack_audit" as "stack_audit" where - "userId" = $1 - and "id" < $2 + "stack_audit"."id" < $1 + and "stack_audit"."id" > $2 + and "userId" = $3 order by - "id" asc + "stack_audit"."id" asc -- SyncRepository.stack.getUpserts select @@ -852,23 +983,25 @@ select "stack"."ownerId", "updateId" from - "stack" + "stack" as "stack" where - "ownerId" = $1 - and "updateId" < $2 + "stack"."updateId" < $1 + and "stack"."updateId" > $2 + and "ownerId" = $3 order by - "updateId" asc + "stack"."updateId" asc -- SyncRepository.user.getDeletes select "id", "userId" from - "user_audit" + "user_audit" as "user_audit" where - "id" < $1 + "user_audit"."id" < $1 + and "user_audit"."id" > $2 order by - "id" asc + "user_audit"."id" asc -- SyncRepository.user.getUpserts select @@ -881,11 +1014,12 @@ select "profileImagePath", "profileChangedAt" from - "user" + "user" as "user" where - "updateId" < $1 + "user"."updateId" < $1 + and "user"."updateId" > $2 order by - "updateId" asc + "user"."updateId" asc -- SyncRepository.userMetadata.getDeletes select @@ -893,12 +1027,13 @@ select "userId", "key" from - "user_metadata_audit" + "user_metadata_audit" as "user_metadata_audit" where - "userId" = $1 - and "id" < $2 + "user_metadata_audit"."id" < $1 + and "user_metadata_audit"."id" > $2 + and "userId" = $3 order by - "id" asc + "user_metadata_audit"."id" asc -- SyncRepository.userMetadata.getUpserts select @@ -907,9 +1042,10 @@ select "value", "updateId" from - "user_metadata" + "user_metadata" as "user_metadata" where - "userId" = $1 - and "updateId" < $2 + "user_metadata"."updateId" < $1 + and "user_metadata"."updateId" > $2 + and "userId" = $3 order by - "updateId" asc + "user_metadata"."updateId" asc diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index 44191ccabd..13e933fd2f 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -1,41 +1,47 @@ import { Injectable } from '@nestjs/common'; -import { Kysely, SelectQueryBuilder } from 'kysely'; +import { Kysely } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; import { columns } from 'src/database'; import { DummyValue, GenerateSql } from 'src/decorators'; import { DB } from 'src/schema'; import { SyncAck } from 'src/types'; -type AuditTables = - | 'user_audit' - | 'partner_audit' - | 'asset_audit' - | 'album_audit' - | 'album_user_audit' - | 'album_asset_audit' - | 'memory_audit' - | 'memory_asset_audit' - | 'stack_audit' - | 'person_audit' - | 'user_metadata_audit' - | 'asset_face_audit'; -type UpsertTables = - | 'user' - | 'partner' - | 'asset' - | 'asset_exif' - | 'album' - | 'album_user' - | 'memory' - | 'memory_asset' - | 'stack' - | 'person' - | 'user_metadata' - | 'asset_face'; +export type SyncBackfillOptions = { + nowId: string; + afterUpdateId?: string; + beforeUpdateId: string; +}; + +const dummyBackfillOptions = { + nowId: DummyValue.UUID, + beforeUpdateId: DummyValue.UUID, + afterUpdateId: DummyValue.UUID, +}; + +export type SyncCreatedAfterOptions = { + nowId: string; + userId: string; + afterCreateId?: string; +}; + +const dummyCreateAfterOptions = { + nowId: DummyValue.UUID, + userId: DummyValue.UUID, + afterCreateId: DummyValue.UUID, +}; export type SyncQueryOptions = { nowId: string; userId: string; + ack?: SyncAck; +}; + +const dummyQueryOptions = { + nowId: DummyValue.UUID, + userId: DummyValue.UUID, + ack: { + updateId: DummyValue.UUID, + }, }; @Injectable() @@ -86,30 +92,44 @@ export class SyncRepository { class BaseSync { constructor(protected db: Kysely) {} - protected auditTableFilters(nowId: string, ack?: SyncAck) { - return , D>(qb: SelectQueryBuilder) => { - const builder = qb as SelectQueryBuilder; - return builder - .where('id', '<', nowId) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .orderBy('id', 'asc') as SelectQueryBuilder; - }; + protected backfillQuery(t: T, { nowId, beforeUpdateId, afterUpdateId }: SyncBackfillOptions) { + const { table, ref } = this.db.dynamic; + const updateIdRef = ref(`${t}.updateId`); + + return this.db + .selectFrom(table(t).as(t)) + .where(updateIdRef, '<', nowId) + .where(updateIdRef, '<=', beforeUpdateId) + .$if(!!afterUpdateId, (qb) => qb.where(updateIdRef, '>=', afterUpdateId!)) + .orderBy(updateIdRef, 'asc'); } - protected upsertTableFilters(nowId: string, ack?: SyncAck) { - return , D>(qb: SelectQueryBuilder) => { - const builder = qb as SelectQueryBuilder; - return builder - .where('updateId', '<', nowId) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .orderBy('updateId', 'asc') as SelectQueryBuilder; - }; + protected auditQuery(t: T, { nowId, ack }: SyncQueryOptions) { + const { table, ref } = this.db.dynamic; + const idRef = ref(`${t}.id`); + + return this.db + .selectFrom(table(t).as(t)) + .where(idRef, '<', nowId) + .$if(!!ack, (qb) => qb.where(idRef, '>', ack!.updateId)) + .orderBy(idRef, 'asc'); + } + + protected upsertQuery(t: T, { nowId, ack }: SyncQueryOptions) { + const { table, ref } = this.db.dynamic; + const updateIdRef = ref(`${t}.updateId`); + + return this.db + .selectFrom(table(t).as(t)) + .where(updateIdRef, '<', nowId) + .$if(!!ack, (qb) => qb.where(updateIdRef, '>', ack!.updateId)) + .orderBy(updateIdRef, 'asc'); } } class AlbumSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID] }) - getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) { + @GenerateSql({ params: [dummyCreateAfterOptions] }) + getCreatedAfter({ nowId, userId, afterCreateId }: SyncCreatedAfterOptions) { return this.db .selectFrom('album_user') .select(['albumsId as id', 'createId']) @@ -120,24 +140,19 @@ class AlbumSync extends BaseSync { .execute(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('album_audit', options) .select(['id', 'albumId']) - .where('userId', '=', userId) - .$call(this.auditTableFilters(nowId, ack)) + .where('userId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + const userId = options.userId; + return this.upsertQuery('album', options) .distinctOn(['album.id', 'album.updateId']) - .where('album.updateId', '<', nowId) - .$if(!!ack, (qb) => qb.where('album.updateId', '>', ack!.updateId)) - .orderBy('album.updateId', 'asc') .leftJoin('album_user as album_users', 'album.id', 'album_users.albumsId') .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) .select([ @@ -157,132 +172,96 @@ class AlbumSync extends BaseSync { } class AlbumAssetSync extends BaseSync { - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('album_asset') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, albumId: string) { + return this.backfillQuery('album_asset', options) .innerJoin('asset', 'asset.id', 'album_asset.assetsId') .select(columns.syncAsset) .select('album_asset.updateId') .where('album_asset.albumsId', '=', albumId) - .where('album_asset.updateId', '<', nowId) - .where('album_asset.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!)) - .orderBy('album_asset.updateId', 'asc') .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) { - return this.db - .selectFrom('asset') + @GenerateSql({ params: [dummyQueryOptions, { updateId: DummyValue.UUID }], stream: true }) + getUpdates(options: SyncQueryOptions, albumToAssetAck: SyncAck) { + const userId = options.userId; + return this.upsertQuery('asset', options) .innerJoin('album_asset', 'album_asset.assetsId', 'asset.id') .select(columns.syncAsset) .select('asset.updateId') - .where('asset.updateId', '<', nowId) .where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send updates for assets that the client already knows about - .$if(!!ack, (qb) => qb.where('asset.updateId', '>', ack!.updateId)) - .orderBy('asset.updateId', 'asc') .innerJoin('album', 'album.id', 'album_asset.albumsId') .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_asset') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getCreates(options: SyncQueryOptions) { + const userId = options.userId; + return this.upsertQuery('album_asset', options) .select('album_asset.updateId') .innerJoin('asset', 'asset.id', 'album_asset.assetsId') .select(columns.syncAsset) .innerJoin('album', 'album.id', 'album_asset.albumsId') .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') - .where('album_asset.updateId', '<', nowId) .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) - .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) - .orderBy('album_asset.updateId', 'asc') .stream(); } } class AlbumAssetExifSync extends BaseSync { - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('album_asset') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, albumId: string) { + return this.backfillQuery('album_asset', options) .innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId') .select(columns.syncAssetExif) .select('album_asset.updateId') .where('album_asset.albumsId', '=', albumId) - .where('album_asset.updateId', '<', nowId) - .where('album_asset.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!)) - .orderBy('album_asset.updateId', 'asc') .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) { - return this.db - .selectFrom('asset_exif') + @GenerateSql({ params: [dummyQueryOptions, { updateId: DummyValue.UUID }], stream: true }) + getUpdates(options: SyncQueryOptions, albumToAssetAck: SyncAck) { + const userId = options.userId; + return this.upsertQuery('asset_exif', options) .innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId') .select(columns.syncAssetExif) .select('asset_exif.updateId') .where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send exif updates for assets that the client already knows about - .where('asset_exif.updateId', '<', nowId) - .$if(!!ack, (qb) => qb.where('asset_exif.updateId', '>', ack!.updateId)) - .orderBy('asset_exif.updateId', 'asc') .innerJoin('album', 'album.id', 'album_asset.albumsId') .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_asset') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getCreates(options: SyncQueryOptions) { + const userId = options.userId; + return this.upsertQuery('album_asset', options) .select('album_asset.updateId') .innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId') .select(columns.syncAssetExif) .innerJoin('album', 'album.id', 'album_asset.albumsId') .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') - .where('album_asset.updateId', '<', nowId) .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) - .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) - .orderBy('album_asset.updateId', 'asc') .stream(); } } class AlbumToAssetSync extends BaseSync { - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('album_asset as album_assets') - .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) - .where('album_assets.albumsId', '=', albumId) - .where('album_assets.updateId', '<', nowId) - .where('album_assets.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!)) - .orderBy('album_assets.updateId', 'asc') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, albumId: string) { + return this.backfillQuery('album_asset', options) + .select(['album_asset.assetsId as assetId', 'album_asset.albumsId as albumId', 'album_asset.updateId']) + .where('album_asset.albumsId', '=', albumId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_asset_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + const userId = options.userId; + return this.auditQuery('album_asset_audit', options) .select(['id', 'assetId', 'albumId']) .where((eb) => eb( @@ -302,18 +281,14 @@ class AlbumToAssetSync extends BaseSync { ), ), ) - .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_asset') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + const userId = options.userId; + return this.upsertQuery('album_asset', options) .select(['album_asset.assetsId as assetId', 'album_asset.albumsId as albumId', 'album_asset.updateId']) - .where('album_asset.updateId', '<', nowId) - .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) - .orderBy('album_asset.updateId', 'asc') .innerJoin('album', 'album.id', 'album_asset.albumsId') .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) @@ -322,27 +297,19 @@ class AlbumToAssetSync extends BaseSync { } class AlbumUserSync extends BaseSync { - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('album_user') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, albumId: string) { + return this.backfillQuery('album_user', options) .select(columns.syncAlbumUser) .select('album_user.updateId') .where('albumsId', '=', albumId) - .where('updateId', '<', nowId) - .where('updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) - .orderBy('updateId', 'asc') .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_user_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + const userId = options.userId; + return this.auditQuery('album_user_audit', options) .select(['id', 'userId', 'albumId']) .where((eb) => eb( @@ -362,19 +329,15 @@ class AlbumUserSync extends BaseSync { ), ), ) - .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('album_user') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + const userId = options.userId; + return this.upsertQuery('album_user', options) .select(columns.syncAlbumUser) .select('album_user.updateId') - .where('album_user.updateId', '<', nowId) - .$if(!!ack, (qb) => qb.where('album_user.updateId', '>', ack!.updateId)) - .orderBy('album_user.updateId', 'asc') .where((eb) => eb( 'album_user.albumsId', @@ -398,55 +361,46 @@ class AlbumUserSync extends BaseSync { } class AssetSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('asset_audit', options) .select(['id', 'assetId']) - .where('ownerId', '=', userId) - .$call(this.auditTableFilters(nowId, ack)) + .where('ownerId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('asset', options) .select(columns.syncAsset) .select('asset.updateId') - .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(nowId, ack)) + .where('ownerId', '=', options.userId) .stream(); } } class AuthUserSync extends BaseSync { - @GenerateSql({ params: [], stream: true }) - getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('user') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('user', options) .select(columns.syncUser) .select(['isAdmin', 'pinCode', 'oauthId', 'storageLabel', 'quotaSizeInBytes', 'quotaUsageInBytes']) - .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PersonSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('person_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('person_audit', options) .select(['id', 'personId']) - .where('ownerId', '=', userId) - .$call(this.auditTableFilters(nowId, ack)) + .where('ownerId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('person') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('person', options) .select([ 'id', 'createdAt', @@ -460,30 +414,24 @@ class PersonSync extends BaseSync { 'updateId', 'faceAssetId', ]) - .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(nowId, ack)) + .where('ownerId', '=', options.userId) .stream(); } } class AssetFaceSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset_face_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('asset_face_audit', options) .select(['asset_face_audit.id', 'assetFaceId']) - .orderBy('asset_face_audit.id', 'asc') .leftJoin('asset', 'asset.id', 'asset_face_audit.assetId') - .where('asset.ownerId', '=', userId) - .where('asset_face_audit.id', '<', nowId) - .$if(!!ack, (qb) => qb.where('asset_face_audit.id', '>', ack!.updateId)) + .where('asset.ownerId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset_face') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('asset_face', options) .select([ 'asset_face.id', 'assetId', @@ -497,43 +445,35 @@ class AssetFaceSync extends BaseSync { 'sourceType', 'asset_face.updateId', ]) - .where('asset_face.updateId', '<', nowId) - .$if(!!ack, (qb) => qb.where('asset_face.updateId', '>', ack!.updateId)) - .orderBy('asset_face.updateId', 'asc') .leftJoin('asset', 'asset.id', 'asset_face.assetId') - .where('asset.ownerId', '=', userId) + .where('asset.ownerId', '=', options.userId) .stream(); } } class AssetExifSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset_exif') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('asset_exif', options) .select(columns.syncAssetExif) .select('asset_exif.updateId') - .where('assetId', 'in', (eb) => eb.selectFrom('asset').select('id').where('ownerId', '=', userId)) - .$call(this.upsertTableFilters(nowId, ack)) + .where('assetId', 'in', (eb) => eb.selectFrom('asset').select('id').where('ownerId', '=', options.userId)) .stream(); } } class MemorySync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('memory_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('memory_audit', options) .select(['id', 'memoryId']) - .where('userId', '=', userId) - .$call(this.auditTableFilters(nowId, ack)) + .where('userId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('memory') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('memory', options) .select([ 'id', 'createdAt', @@ -549,38 +489,33 @@ class MemorySync extends BaseSync { 'hideAt', ]) .select('updateId') - .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(nowId, ack)) + .where('ownerId', '=', options.userId) .stream(); } } class MemoryToAssetSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('memory_asset_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('memory_asset_audit', options) .select(['id', 'memoryId', 'assetId']) - .where('memoryId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId)) - .$call(this.auditTableFilters(nowId, ack)) + .where('memoryId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', options.userId)) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('memory_asset') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('memory_asset', options) .select(['memoriesId as memoryId', 'assetsId as assetId']) .select('updateId') - .where('memoriesId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId)) - .$call(this.upsertTableFilters(nowId, ack)) + .where('memoriesId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', options.userId)) .stream(); } } class PartnerSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }] }) - getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) { + @GenerateSql({ params: [dummyCreateAfterOptions] }) + getCreatedAfter({ nowId, userId, afterCreateId }: SyncCreatedAfterOptions) { return this.db .selectFrom('partner') .select(['sharedById', 'createId']) @@ -591,104 +526,71 @@ class PartnerSync extends BaseSync { .execute(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('partner_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + const userId = options.userId; + return this.auditQuery('partner_audit', options) .select(['id', 'sharedById', 'sharedWithId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('partner') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + const userId = options.userId; + return this.upsertQuery('partner', options) .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PartnerAssetsSync extends BaseSync { - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill( - { nowId }: SyncQueryOptions, - partnerId: string, - afterUpdateId: string | undefined, - beforeUpdateId: string, - ) { - return this.db - .selectFrom('asset') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, partnerId: string) { + return this.backfillQuery('asset', options) .select(columns.syncAsset) .select('asset.updateId') .where('ownerId', '=', partnerId) - .where('updateId', '<', nowId) - .where('updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) - .orderBy('updateId', 'asc') .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('asset_audit', options) .select(['id', 'assetId']) .where('ownerId', 'in', (eb) => - eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), + eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', options.userId), ) - .$call(this.auditTableFilters(nowId, ack)) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('asset', options) .select(columns.syncAsset) .select('asset.updateId') .where('ownerId', 'in', (eb) => - eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), + eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', options.userId), ) - .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class PartnerAssetExifsSync extends BaseSync { - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill( - { nowId }: SyncQueryOptions, - partnerId: string, - afterUpdateId: string | undefined, - beforeUpdateId: string, - ) { - return this.db - .selectFrom('asset_exif') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, partnerId: string) { + return this.backfillQuery('asset_exif', options) .select(columns.syncAssetExif) .select('asset_exif.updateId') .innerJoin('asset', 'asset.id', 'asset_exif.assetId') .where('asset.ownerId', '=', partnerId) - .where('asset_exif.updateId', '<', nowId) - .where('asset_exif.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!)) - .orderBy('asset_exif.updateId', 'asc') .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('asset_exif') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('asset_exif', options) .select(columns.syncAssetExif) .select('asset_exif.updateId') .where('assetId', 'in', (eb) => @@ -696,114 +598,90 @@ class PartnerAssetExifsSync extends BaseSync { .selectFrom('asset') .select('id') .where('ownerId', 'in', (eb) => - eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), + eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', options.userId), ), ) - .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class StackSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('stack_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('stack_audit', options) .select(['id', 'stackId']) - .where('userId', '=', userId) - .$call(this.auditTableFilters(nowId, ack)) + .where('userId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('stack') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('stack', options) .select(columns.syncStack) .select('updateId') - .where('ownerId', '=', userId) - .$call(this.upsertTableFilters(nowId, ack)) + .where('ownerId', '=', options.userId) .stream(); } } class PartnerStackSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('stack_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('stack_audit', options) .select(['id', 'stackId']) - .where('userId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId)) - .$call(this.auditTableFilters(nowId, ack)) + .where('userId', 'in', (eb) => + eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', options.userId), + ) .stream(); } - @GenerateSql({ - params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], - stream: true, - }) - getBackfill( - { nowId }: SyncQueryOptions, - partnerId: string, - afterUpdateId: string | undefined, - beforeUpdateId: string, - ) { - return this.db - .selectFrom('stack') + @GenerateSql({ params: [dummyBackfillOptions, DummyValue.UUID], stream: true }) + getBackfill(options: SyncBackfillOptions, partnerId: string) { + return this.backfillQuery('stack', options) .select(columns.syncStack) .select('updateId') .where('ownerId', '=', partnerId) - .where('updateId', '<', nowId) - .where('updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) - .orderBy('updateId', 'asc') .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('stack') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('stack', options) .select(columns.syncStack) .select('updateId') .where('ownerId', 'in', (eb) => - eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), + eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', options.userId), ) - .$call(this.upsertTableFilters(nowId, ack)) .stream(); } } class UserSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId }: SyncQueryOptions, ack?: SyncAck) { - return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(nowId, ack)).stream(); + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('user_audit', options).select(['id', 'userId']).stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) { - return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(nowId, ack)).stream(); + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('user', options).select(columns.syncUser).stream(); } } class UserMetadataSync extends BaseSync { - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('user_metadata_audit') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getDeletes(options: SyncQueryOptions) { + return this.auditQuery('user_metadata_audit', options) .select(['id', 'userId', 'key']) - .where('userId', '=', userId) - .$call(this.auditTableFilters(nowId, ack)) + .where('userId', '=', options.userId) .stream(); } - @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true }) - getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) { - return this.db - .selectFrom('user_metadata') + @GenerateSql({ params: [dummyQueryOptions], stream: true }) + getUpserts(options: SyncQueryOptions) { + return this.upsertQuery('user_metadata', options) .select(['userId', 'key', 'value', 'updateId']) - .where('userId', '=', userId) - .$call(this.upsertTableFilters(nowId, ack)) + .where('userId', '=', options.userId) .stream(); } } diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 0a4144a953..6b8512eacb 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -183,7 +183,7 @@ export class SyncService extends BaseService { private async syncAuthUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const upsertType = SyncEntityType.AuthUserV1; - const upserts = this.syncRepository.authUser.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.authUser.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, profileImagePath, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } }); } @@ -191,13 +191,13 @@ export class SyncService extends BaseService { private async syncUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.UserDeleteV1; - const deletes = this.syncRepository.user.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.user.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.UserV1; - const upserts = this.syncRepository.user.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.user.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, profileImagePath, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } }); } @@ -205,13 +205,13 @@ export class SyncService extends BaseService { private async syncPartnersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.PartnerDeleteV1; - const deletes = this.syncRepository.partner.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.partner.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.PartnerV1; - const upserts = this.syncRepository.partner.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.partner.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -219,13 +219,13 @@ export class SyncService extends BaseService { private async syncAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.AssetDeleteV1; - const deletes = this.syncRepository.asset.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.asset.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AssetV1; - const upserts = this.syncRepository.asset.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.asset.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } @@ -238,14 +238,17 @@ export class SyncService extends BaseService { sessionId: string, ) { const deleteType = SyncEntityType.PartnerAssetDeleteV1; - const deletes = this.syncRepository.partnerAsset.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.partnerAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.PartnerAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const upsertType = SyncEntityType.PartnerAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -258,7 +261,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.partnerAsset.getBackfill(options, partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerAsset.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + partner.sharedById, + ); for await (const { updateId, ...data } of backfill) { send(response, { @@ -278,7 +284,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.partnerAsset.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } @@ -286,7 +292,7 @@ export class SyncService extends BaseService { private async syncAssetExifsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const upsertType = SyncEntityType.AssetExifV1; - const upserts = this.syncRepository.assetExif.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.assetExif.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -300,7 +306,10 @@ export class SyncService extends BaseService { ) { const backfillType = SyncEntityType.PartnerAssetExifBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const upsertType = SyncEntityType.PartnerAssetExifV1; const upsertCheckpoint = checkpointMap[upsertType]; @@ -314,7 +323,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.partnerAssetExif.getBackfill(options, partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerAssetExif.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + partner.sharedById, + ); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [partner.createId, updateId], data }); @@ -330,7 +342,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.partnerAssetExif.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerAssetExif.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -338,13 +350,13 @@ export class SyncService extends BaseService { private async syncAlbumsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.AlbumDeleteV1; - const deletes = this.syncRepository.album.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.album.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AlbumV1; - const upserts = this.syncRepository.album.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.album.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -357,14 +369,17 @@ export class SyncService extends BaseService { sessionId: string, ) { const deleteType = SyncEntityType.AlbumUserDeleteV1; - const deletes = this.syncRepository.albumUser.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.albumUser.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.AlbumUserBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const upsertType = SyncEntityType.AlbumUserV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -377,7 +392,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumUser.getBackfill(options, album.id, startId, endId); + const backfill = this.syncRepository.albumUser.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + album.id, + ); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -393,7 +411,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.albumUser.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumUser.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -407,7 +425,10 @@ export class SyncService extends BaseService { ) { const backfillType = SyncEntityType.AlbumAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const updateType = SyncEntityType.AlbumAssetUpdateV1; const createType = SyncEntityType.AlbumAssetCreateV1; const updateCheckpoint = checkpointMap[updateType]; @@ -422,7 +443,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumAsset.getBackfill(options, album.id, startId, endId); + const backfill = this.syncRepository.albumAsset.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + album.id, + ); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) }); @@ -439,13 +463,16 @@ export class SyncService extends BaseService { } if (createCheckpoint) { - const updates = this.syncRepository.albumAsset.getUpdates(options, createCheckpoint, updateCheckpoint); + const updates = this.syncRepository.albumAsset.getUpdates( + { ...options, ack: updateCheckpoint }, + createCheckpoint, + ); for await (const { updateId, ...data } of updates) { send(response, { type: updateType, ids: [updateId], data: mapSyncAssetV1(data) }); } } - const creates = this.syncRepository.albumAsset.getCreates(options, createCheckpoint); + const creates = this.syncRepository.albumAsset.getCreates({ ...options, ack: createCheckpoint }); let first = true; for await (const { updateId, ...data } of creates) { if (first) { @@ -469,7 +496,10 @@ export class SyncService extends BaseService { ) { const backfillType = SyncEntityType.AlbumAssetExifBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const updateType = SyncEntityType.AlbumAssetExifUpdateV1; const createType = SyncEntityType.AlbumAssetExifCreateV1; const upsertCheckpoint = checkpointMap[updateType]; @@ -484,7 +514,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumAssetExif.getBackfill(options, album.id, startId, endId); + const backfill = this.syncRepository.albumAssetExif.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + album.id, + ); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -501,13 +534,16 @@ export class SyncService extends BaseService { } if (createCheckpoint) { - const updates = this.syncRepository.albumAssetExif.getUpdates(options, createCheckpoint, upsertCheckpoint); + const updates = this.syncRepository.albumAssetExif.getUpdates( + { ...options, ack: upsertCheckpoint }, + createCheckpoint, + ); for await (const { updateId, ...data } of updates) { send(response, { type: updateType, ids: [updateId], data }); } } - const creates = this.syncRepository.albumAssetExif.getCreates(options, createCheckpoint); + const creates = this.syncRepository.albumAssetExif.getCreates({ ...options, ack: createCheckpoint }); let first = true; for await (const { updateId, ...data } of creates) { if (first) { @@ -530,14 +566,17 @@ export class SyncService extends BaseService { sessionId: string, ) { const deleteType = SyncEntityType.AlbumToAssetDeleteV1; - const deletes = this.syncRepository.albumToAsset.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.albumToAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.AlbumToAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const upsertType = SyncEntityType.AlbumToAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -550,7 +589,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.albumToAsset.getBackfill(options, album.id, startId, endId); + const backfill = this.syncRepository.albumToAsset.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + album.id, + ); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -566,7 +608,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.albumToAsset.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumToAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -574,13 +616,13 @@ export class SyncService extends BaseService { private async syncMemoriesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.MemoryDeleteV1; - const deletes = this.syncRepository.memory.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.memory.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.MemoryV1; - const upserts = this.syncRepository.memory.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.memory.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -588,13 +630,13 @@ export class SyncService extends BaseService { private async syncMemoryAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.MemoryToAssetDeleteV1; - const deletes = this.syncRepository.memoryToAsset.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.memoryToAsset.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.MemoryToAssetV1; - const upserts = this.syncRepository.memoryToAsset.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.memoryToAsset.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -602,13 +644,13 @@ export class SyncService extends BaseService { private async syncStackV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.StackDeleteV1; - const deletes = this.syncRepository.stack.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.stack.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.StackV1; - const upserts = this.syncRepository.stack.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.stack.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -621,14 +663,17 @@ export class SyncService extends BaseService { sessionId: string, ) { const deleteType = SyncEntityType.PartnerStackDeleteV1; - const deletes = this.syncRepository.partnerStack.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.partnerStack.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.PartnerStackBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter({ + ...options, + afterCreateId: backfillCheckpoint?.updateId, + }); const upsertType = SyncEntityType.PartnerStackV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -641,7 +686,10 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.partnerStack.getBackfill(options, partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerStack.getBackfill( + { ...options, afterUpdateId: startId, beforeUpdateId: endId }, + partner.sharedById, + ); for await (const { updateId, ...data } of backfill) { send(response, { @@ -661,7 +709,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.partnerStack.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerStack.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -669,13 +717,13 @@ export class SyncService extends BaseService { private async syncPeopleV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.PersonDeleteV1; - const deletes = this.syncRepository.people.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.people.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.PersonV1; - const upserts = this.syncRepository.people.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.people.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -683,13 +731,13 @@ export class SyncService extends BaseService { private async syncAssetFacesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.AssetFaceDeleteV1; - const deletes = this.syncRepository.assetFace.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.assetFace.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AssetFaceV1; - const upserts = this.syncRepository.assetFace.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.assetFace.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -697,14 +745,14 @@ export class SyncService extends BaseService { private async syncUserMetadataV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.UserMetadataDeleteV1; - const deletes = this.syncRepository.userMetadata.getDeletes(options, checkpointMap[deleteType]); + const deletes = this.syncRepository.userMetadata.getDeletes({ ...options, ack: checkpointMap[deleteType] }); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.UserMetadataV1; - const upserts = this.syncRepository.userMetadata.getUpserts(options, checkpointMap[upsertType]); + const upserts = this.syncRepository.userMetadata.getUpserts({ ...options, ack: checkpointMap[upsertType] }); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data });