From df76735f4a9b089e0451423c8dce4bd98a0466c7 Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Fri, 27 Jun 2025 13:47:06 -0400 Subject: [PATCH] refactor: sync repository (#19581) --- server/src/bin/sync-sql.ts | 3 +- .../queries/sync.checkpoint.repository.sql | 15 + server/src/queries/sync.repository.sql | 1028 ++++++++--------- server/src/repositories/index.ts | 2 + .../sync-checkpoint.repository.ts | 41 + server/src/repositories/sync.repository.ts | 664 ++++++----- server/src/services/base.service.ts | 3 + server/src/services/sync.service.ts | 78 +- server/test/medium.factory.ts | 9 +- server/test/utils.ts | 4 + 10 files changed, 968 insertions(+), 879 deletions(-) create mode 100644 server/src/queries/sync.checkpoint.repository.sql create mode 100644 server/src/repositories/sync-checkpoint.repository.ts diff --git a/server/src/bin/sync-sql.ts b/server/src/bin/sync-sql.ts index a114830e09..6d3cb42fae 100644 --- a/server/src/bin/sync-sql.ts +++ b/server/src/bin/sync-sql.ts @@ -15,6 +15,7 @@ import { repositories } from 'src/repositories'; import { AccessRepository } from 'src/repositories/access.repository'; import { ConfigRepository } from 'src/repositories/config.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; +import { SyncRepository } from 'src/repositories/sync.repository'; import { AuthService } from 'src/services/auth.service'; import { getKyselyConfig } from 'src/utils/database'; @@ -111,7 +112,7 @@ class SqlGenerator { data.push(...(await this.runTargets(instance, `${Repository.name}`))); // nested repositories - if (Repository.name === AccessRepository.name) { + if (Repository.name === AccessRepository.name || Repository.name === SyncRepository.name) { for (const key of Object.keys(instance)) { const subInstance = (instance as any)[key]; data.push(...(await this.runTargets(subInstance, `${Repository.name}.${key}`))); diff --git a/server/src/queries/sync.checkpoint.repository.sql b/server/src/queries/sync.checkpoint.repository.sql new file mode 100644 index 0000000000..40edf0f405 --- /dev/null +++ b/server/src/queries/sync.checkpoint.repository.sql @@ -0,0 +1,15 @@ +-- NOTE: This file is auto generated by ./sql-generator + +-- SyncCheckpointRepository.getAll +select + "type", + "ack" +from + "session_sync_checkpoints" +where + "sessionId" = $1 + +-- SyncCheckpointRepository.deleteAll +delete from "session_sync_checkpoints" +where + "sessionId" = $1 diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index 0abe985317..bbd327ff69 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -1,141 +1,342 @@ -- NOTE: This file is auto generated by ./sql-generator --- SyncRepository.getCheckpoints +-- SyncRepository.album.getCreatedAfter select - "type", - "ack" + "albumsId" as "id", + "createId" from - "session_sync_checkpoints" + "albums_shared_users_users" where - "sessionId" = $1 - --- SyncRepository.deleteCheckpoints -delete from "session_sync_checkpoints" -where - "sessionId" = $1 - --- SyncRepository.getUserUpserts -select - "id", - "name", - "email", - "deletedAt", - "updateId" -from - "users" -where - "updatedAt" < now() - interval '1 millisecond' + "usersId" = $1 + and "createId" >= $2 + and "createdAt" < now() - interval '1 millisecond' order by - "updateId" asc + "createId" asc --- SyncRepository.getUserDeletes +-- SyncRepository.album.getDeletes select "id", - "userId" + "albumId" from - "users_audit" + "albums_audit" where - "deletedAt" < now() - interval '1 millisecond' + "userId" = $1 + and "deletedAt" < now() - interval '1 millisecond' order by "id" asc --- SyncRepository.getPartnerUpserts -select - "sharedById", - "sharedWithId", - "inTimeline", - "updateId" +-- SyncRepository.album.getUpserts +select distinct + on ("albums"."id", "albums"."updateId") "albums"."id", + "albums"."ownerId", + "albums"."albumName" as "name", + "albums"."description", + "albums"."createdAt", + "albums"."updatedAt", + "albums"."albumThumbnailAssetId" as "thumbnailAssetId", + "albums"."isActivityEnabled", + "albums"."order", + "albums"."updateId" from - "partners" + "albums" + left join "albums_shared_users_users" as "album_users" on "albums"."id" = "album_users"."albumsId" where - ( - "sharedById" = $1 - or "sharedWithId" = $2 + "albums"."updatedAt" < now() - interval '1 millisecond' + and ( + "albums"."ownerId" = $1 + or "album_users"."usersId" = $2 ) - and "updatedAt" < now() - interval '1 millisecond' order by - "updateId" asc + "albums"."updateId" asc --- SyncRepository.getPartnerDeletes +-- SyncRepository.albumAsset.getBackfill +select + "assets"."id", + "assets"."ownerId", + "assets"."originalFileName", + "assets"."thumbhash", + "assets"."checksum", + "assets"."fileCreatedAt", + "assets"."fileModifiedAt", + "assets"."localDateTime", + "assets"."type", + "assets"."deletedAt", + "assets"."isFavorite", + "assets"."visibility", + "assets"."duration", + "assets"."updateId" +from + "assets" + inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "assets"."id" +where + "album_assets"."albumsId" = $1 + and "assets"."updatedAt" < now() - interval '1 millisecond' + and "assets"."updateId" <= $2 + and "assets"."updateId" >= $3 +order by + "assets"."updateId" asc + +-- SyncRepository.albumAsset.getUpserts +select + "assets"."id", + "assets"."ownerId", + "assets"."originalFileName", + "assets"."thumbhash", + "assets"."checksum", + "assets"."fileCreatedAt", + "assets"."fileModifiedAt", + "assets"."localDateTime", + "assets"."type", + "assets"."deletedAt", + "assets"."isFavorite", + "assets"."visibility", + "assets"."duration", + "assets"."updateId" +from + "assets" + inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "assets"."id" + inner join "albums" on "albums"."id" = "album_assets"."albumsId" + left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId" +where + "assets"."updatedAt" < now() - interval '1 millisecond' + and ( + "albums"."ownerId" = $1 + or "album_users"."usersId" = $2 + ) +order by + "assets"."updateId" asc + +-- SyncRepository.albumAssetExif.getBackfill +select + "exif"."assetId", + "exif"."description", + "exif"."exifImageWidth", + "exif"."exifImageHeight", + "exif"."fileSizeInByte", + "exif"."orientation", + "exif"."dateTimeOriginal", + "exif"."modifyDate", + "exif"."timeZone", + "exif"."latitude", + "exif"."longitude", + "exif"."projectionType", + "exif"."city", + "exif"."state", + "exif"."country", + "exif"."make", + "exif"."model", + "exif"."lensModel", + "exif"."fNumber", + "exif"."focalLength", + "exif"."iso", + "exif"."exposureTime", + "exif"."profileDescription", + "exif"."rating", + "exif"."fps", + "exif"."updateId" +from + "exif" + inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "exif"."assetId" +where + "album_assets"."albumsId" = $1 + and "exif"."updatedAt" < now() - interval '1 millisecond' + and "exif"."updateId" <= $2 + and "exif"."updateId" >= $3 +order by + "exif"."updateId" asc + +-- SyncRepository.albumAssetExif.getUpserts +select + "exif"."assetId", + "exif"."description", + "exif"."exifImageWidth", + "exif"."exifImageHeight", + "exif"."fileSizeInByte", + "exif"."orientation", + "exif"."dateTimeOriginal", + "exif"."modifyDate", + "exif"."timeZone", + "exif"."latitude", + "exif"."longitude", + "exif"."projectionType", + "exif"."city", + "exif"."state", + "exif"."country", + "exif"."make", + "exif"."model", + "exif"."lensModel", + "exif"."fNumber", + "exif"."focalLength", + "exif"."iso", + "exif"."exposureTime", + "exif"."profileDescription", + "exif"."rating", + "exif"."fps", + "exif"."updateId" +from + "exif" + inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "exif"."assetId" + inner join "albums" on "albums"."id" = "album_assets"."albumsId" + left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId" +where + "exif"."updatedAt" < now() - interval '1 millisecond' + and ( + "albums"."ownerId" = $1 + or "album_users"."usersId" = $2 + ) +order by + "exif"."updateId" asc + +-- SyncRepository.albumToAsset.getBackfill +select + "album_assets"."assetsId" as "assetId", + "album_assets"."albumsId" as "albumId", + "album_assets"."updateId" +from + "albums_assets_assets" as "album_assets" +where + "album_assets"."albumsId" = $1 + and "album_assets"."updatedAt" < now() - interval '1 millisecond' + and "album_assets"."updateId" <= $2 + and "album_assets"."updateId" >= $3 +order by + "album_assets"."updateId" asc + +-- SyncRepository.albumToAsset.getDeletes select "id", - "sharedById", - "sharedWithId" + "assetId", + "albumId" from - "partners_audit" + "album_assets_audit" where - ( - "sharedById" = $1 - or "sharedWithId" = $2 + "albumId" in ( + select + "id" + from + "albums" + where + "ownerId" = $1 + union + ( + select + "albumUsers"."albumsId" as "id" + from + "albums_shared_users_users" as "albumUsers" + where + "albumUsers"."usersId" = $2 + ) ) and "deletedAt" < now() - interval '1 millisecond' order by "id" asc --- SyncRepository.getAssetUpserts +-- SyncRepository.albumToAsset.getUpserts select - "assets"."id", - "assets"."ownerId", - "assets"."originalFileName", - "assets"."thumbhash", - "assets"."checksum", - "assets"."fileCreatedAt", - "assets"."fileModifiedAt", - "assets"."localDateTime", - "assets"."type", - "assets"."deletedAt", - "assets"."isFavorite", - "assets"."visibility", - "assets"."duration", - "assets"."updateId" + "album_assets"."assetsId" as "assetId", + "album_assets"."albumsId" as "albumId", + "album_assets"."updateId" from - "assets" + "albums_assets_assets" as "album_assets" + inner join "albums" on "albums"."id" = "album_assets"."albumsId" + left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId" where - "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + "album_assets"."updatedAt" < now() - interval '1 millisecond' + and ( + "albums"."ownerId" = $1 + or "album_users"."usersId" = $2 + ) order by - "updateId" asc + "album_assets"."updateId" asc --- SyncRepository.getPartnerBackfill +-- SyncRepository.albumUser.getBackfill select - "sharedById", - "createId" + "album_users"."albumsId" as "albumId", + "album_users"."usersId" as "userId", + "album_users"."role", + "album_users"."updateId" from - "partners" + "albums_shared_users_users" as "album_users" where - "sharedWithId" = $1 - and "createId" >= $2 - and "createdAt" < now() - interval '1 millisecond' -order by - "partners"."createId" asc - --- SyncRepository.getPartnerAssetsBackfill -select - "assets"."id", - "assets"."ownerId", - "assets"."originalFileName", - "assets"."thumbhash", - "assets"."checksum", - "assets"."fileCreatedAt", - "assets"."fileModifiedAt", - "assets"."localDateTime", - "assets"."type", - "assets"."deletedAt", - "assets"."isFavorite", - "assets"."visibility", - "assets"."duration", - "assets"."updateId" -from - "assets" -where - "ownerId" = $1 + "albumsId" = $1 and "updatedAt" < now() - interval '1 millisecond' and "updateId" <= $2 and "updateId" >= $3 order by "updateId" asc --- SyncRepository.getPartnerAssetsUpserts +-- SyncRepository.albumUser.getDeletes +select + "id", + "userId", + "albumId" +from + "album_users_audit" +where + "albumId" in ( + select + "id" + from + "albums" + where + "ownerId" = $1 + union + ( + select + "albumUsers"."albumsId" as "id" + from + "albums_shared_users_users" as "albumUsers" + where + "albumUsers"."usersId" = $2 + ) + ) + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.albumUser.getUpserts +select + "album_users"."albumsId" as "albumId", + "album_users"."usersId" as "userId", + "album_users"."role", + "album_users"."updateId" +from + "albums_shared_users_users" as "album_users" +where + "album_users"."updatedAt" < now() - interval '1 millisecond' + and "album_users"."albumsId" in ( + select + "id" + from + "albums" + where + "ownerId" = $1 + union + ( + select + "albumUsers"."albumsId" as "id" + from + "albums_shared_users_users" as "albumUsers" + where + "albumUsers"."usersId" = $2 + ) + ) +order by + "album_users"."updateId" asc + +-- SyncRepository.asset.getDeletes +select + "id", + "assetId" +from + "assets_audit" +where + "ownerId" = $1 + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.asset.getUpserts select "assets"."id", "assets"."ownerId", @@ -154,50 +355,12 @@ select from "assets" where - "ownerId" in ( - select - "sharedById" - from - "partners" - where - "sharedWithId" = $1 - ) + "ownerId" = $1 and "updatedAt" < now() - interval '1 millisecond' order by "updateId" asc --- SyncRepository.getAssetDeletes -select - "id", - "assetId" -from - "assets_audit" -where - "ownerId" = $1 - and "deletedAt" < now() - interval '1 millisecond' -order by - "id" asc - --- SyncRepository.getPartnerAssetDeletes -select - "id", - "assetId" -from - "assets_audit" -where - "ownerId" in ( - select - "sharedById" - from - "partners" - where - "sharedWithId" = $1 - ) - and "deletedAt" < now() - interval '1 millisecond' -order by - "id" asc - --- SyncRepository.getAssetExifsUpserts +-- SyncRepository.assetExif.getUpserts select "exif"."assetId", "exif"."description", @@ -240,7 +403,204 @@ where order by "updateId" asc --- SyncRepository.getPartnerAssetExifsBackfill +-- SyncRepository.memory.getDeletes +select + "id", + "memoryId" +from + "memories_audit" +where + "userId" = $1 + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.memory.getUpserts +select + "id", + "createdAt", + "updatedAt", + "deletedAt", + "ownerId", + "type", + "data", + "isSaved", + "memoryAt", + "seenAt", + "showAt", + "hideAt", + "updateId" +from + "memories" +where + "ownerId" = $1 + and "updatedAt" < now() - interval '1 millisecond' +order by + "updateId" asc + +-- SyncRepository.memoryToAsset.getDeletes +select + "id", + "memoryId", + "assetId" +from + "memory_assets_audit" +where + "memoryId" in ( + select + "id" + from + "memories" + where + "ownerId" = $1 + ) + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.memoryToAsset.getUpserts +select + "memoriesId" as "memoryId", + "assetsId" as "assetId", + "updateId" +from + "memories_assets_assets" +where + "memoriesId" in ( + select + "id" + from + "memories" + where + "ownerId" = $1 + ) + and "updatedAt" < now() - interval '1 millisecond' +order by + "updateId" asc + +-- SyncRepository.partner.getCreatedAfter +select + "sharedById", + "createId" +from + "partners" +where + "sharedWithId" = $1 + and "createId" >= $2 + and "createdAt" < now() - interval '1 millisecond' +order by + "partners"."createId" asc + +-- SyncRepository.partner.getDeletes +select + "id", + "sharedById", + "sharedWithId" +from + "partners_audit" +where + ( + "sharedById" = $1 + or "sharedWithId" = $2 + ) + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.partner.getUpserts +select + "sharedById", + "sharedWithId", + "inTimeline", + "updateId" +from + "partners" +where + ( + "sharedById" = $1 + or "sharedWithId" = $2 + ) + and "updatedAt" < now() - interval '1 millisecond' +order by + "updateId" asc + +-- SyncRepository.partnerAsset.getBackfill +select + "assets"."id", + "assets"."ownerId", + "assets"."originalFileName", + "assets"."thumbhash", + "assets"."checksum", + "assets"."fileCreatedAt", + "assets"."fileModifiedAt", + "assets"."localDateTime", + "assets"."type", + "assets"."deletedAt", + "assets"."isFavorite", + "assets"."visibility", + "assets"."duration", + "assets"."updateId" +from + "assets" +where + "ownerId" = $1 + and "updatedAt" < now() - interval '1 millisecond' + and "updateId" <= $2 + and "updateId" >= $3 +order by + "updateId" asc + +-- SyncRepository.partnerAsset.getDeletes +select + "id", + "assetId" +from + "assets_audit" +where + "ownerId" in ( + select + "sharedById" + from + "partners" + where + "sharedWithId" = $1 + ) + and "deletedAt" < now() - interval '1 millisecond' +order by + "id" asc + +-- SyncRepository.partnerAsset.getUpserts +select + "assets"."id", + "assets"."ownerId", + "assets"."originalFileName", + "assets"."thumbhash", + "assets"."checksum", + "assets"."fileCreatedAt", + "assets"."fileModifiedAt", + "assets"."localDateTime", + "assets"."type", + "assets"."deletedAt", + "assets"."isFavorite", + "assets"."visibility", + "assets"."duration", + "assets"."updateId" +from + "assets" +where + "ownerId" in ( + select + "sharedById" + from + "partners" + where + "sharedWithId" = $1 + ) + and "updatedAt" < now() - interval '1 millisecond' +order by + "updateId" asc + +-- SyncRepository.partnerAssetExif.getBackfill select "exif"."assetId", "exif"."description", @@ -279,7 +639,7 @@ where order by "exif"."updateId" asc --- SyncRepository.getPartnerAssetExifsUpserts +-- SyncRepository.partnerAssetExif.getUpserts select "exif"."assetId", "exif"."description", @@ -329,401 +689,27 @@ where order by "updateId" asc --- SyncRepository.getAlbumDeletes +-- SyncRepository.user.getDeletes select "id", - "albumId" + "userId" from - "albums_audit" + "users_audit" where - "userId" = $1 - and "deletedAt" < now() - interval '1 millisecond' + "deletedAt" < now() - interval '1 millisecond' order by "id" asc --- SyncRepository.getAlbumUpserts -select distinct - on ("albums"."id", "albums"."updateId") "albums"."id", - "albums"."ownerId", - "albums"."albumName" as "name", - "albums"."description", - "albums"."createdAt", - "albums"."updatedAt", - "albums"."albumThumbnailAssetId" as "thumbnailAssetId", - "albums"."isActivityEnabled", - "albums"."order", - "albums"."updateId" -from - "albums" - left join "albums_shared_users_users" as "album_users" on "albums"."id" = "album_users"."albumsId" -where - "albums"."updatedAt" < now() - interval '1 millisecond' - and ( - "albums"."ownerId" = $1 - or "album_users"."usersId" = $2 - ) -order by - "albums"."updateId" asc - --- SyncRepository.getAlbumToAssetDeletes +-- SyncRepository.user.getUpserts select "id", - "assetId", - "albumId" -from - "album_assets_audit" -where - "albumId" in ( - select - "id" - from - "albums" - where - "ownerId" = $1 - union - ( - select - "albumUsers"."albumsId" as "id" - from - "albums_shared_users_users" as "albumUsers" - where - "albumUsers"."usersId" = $2 - ) - ) - and "deletedAt" < now() - interval '1 millisecond' -order by - "id" asc - --- SyncRepository.getAlbumUserDeletes -select - "id", - "userId", - "albumId" -from - "album_users_audit" -where - "albumId" in ( - select - "id" - from - "albums" - where - "ownerId" = $1 - union - ( - select - "albumUsers"."albumsId" as "id" - from - "albums_shared_users_users" as "albumUsers" - where - "albumUsers"."usersId" = $2 - ) - ) - and "deletedAt" < now() - interval '1 millisecond' -order by - "id" asc - --- SyncRepository.getAlbumBackfill -select - "albumsId" as "id", - "createId" -from - "albums_shared_users_users" -where - "usersId" = $1 - and "createId" >= $2 - and "createdAt" < now() - interval '1 millisecond' -order by - "createId" asc - --- SyncRepository.getAlbumUsersBackfill -select - "album_users"."albumsId" as "albumId", - "album_users"."usersId" as "userId", - "album_users"."role", - "album_users"."updateId" -from - "albums_shared_users_users" as "album_users" -where - "albumsId" = $1 - and "updatedAt" < now() - interval '1 millisecond' - and "updateId" <= $2 - and "updateId" >= $3 -order by - "updateId" asc - --- SyncRepository.getAlbumUserUpserts -select - "album_users"."albumsId" as "albumId", - "album_users"."usersId" as "userId", - "album_users"."role", - "album_users"."updateId" -from - "albums_shared_users_users" as "album_users" -where - "album_users"."updatedAt" < now() - interval '1 millisecond' - and "album_users"."albumsId" in ( - select - "id" - from - "albums" - where - "ownerId" = $1 - union - ( - select - "albumUsers"."albumsId" as "id" - from - "albums_shared_users_users" as "albumUsers" - where - "albumUsers"."usersId" = $2 - ) - ) -order by - "album_users"."updateId" asc - --- SyncRepository.getAlbumAssetsBackfill -select - "assets"."id", - "assets"."ownerId", - "assets"."originalFileName", - "assets"."thumbhash", - "assets"."checksum", - "assets"."fileCreatedAt", - "assets"."fileModifiedAt", - "assets"."localDateTime", - "assets"."type", - "assets"."deletedAt", - "assets"."isFavorite", - "assets"."visibility", - "assets"."duration", - "assets"."updateId" -from - "assets" - inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "assets"."id" -where - "album_assets"."albumsId" = $1 - and "assets"."updatedAt" < now() - interval '1 millisecond' - and "assets"."updateId" <= $2 - and "assets"."updateId" >= $3 -order by - "assets"."updateId" asc - --- SyncRepository.getAlbumAssetsUpserts -select - "assets"."id", - "assets"."ownerId", - "assets"."originalFileName", - "assets"."thumbhash", - "assets"."checksum", - "assets"."fileCreatedAt", - "assets"."fileModifiedAt", - "assets"."localDateTime", - "assets"."type", - "assets"."deletedAt", - "assets"."isFavorite", - "assets"."visibility", - "assets"."duration", - "assets"."updateId" -from - "assets" - inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "assets"."id" - inner join "albums" on "albums"."id" = "album_assets"."albumsId" - left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId" -where - "assets"."updatedAt" < now() - interval '1 millisecond' - and ( - "albums"."ownerId" = $1 - or "album_users"."usersId" = $2 - ) -order by - "assets"."updateId" asc - --- SyncRepository.getAlbumToAssetBackfill -select - "album_assets"."assetsId" as "assetId", - "album_assets"."albumsId" as "albumId", - "album_assets"."updateId" -from - "albums_assets_assets" as "album_assets" -where - "album_assets"."albumsId" = $1 - and "album_assets"."updatedAt" < now() - interval '1 millisecond' - and "album_assets"."updateId" <= $2 - and "album_assets"."updateId" >= $3 -order by - "album_assets"."updateId" asc - --- SyncRepository.getAlbumToAssetUpserts -select - "album_assets"."assetsId" as "assetId", - "album_assets"."albumsId" as "albumId", - "album_assets"."updateId" -from - "albums_assets_assets" as "album_assets" - inner join "albums" on "albums"."id" = "album_assets"."albumsId" - left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId" -where - "album_assets"."updatedAt" < now() - interval '1 millisecond' - and ( - "albums"."ownerId" = $1 - or "album_users"."usersId" = $2 - ) -order by - "album_assets"."updateId" asc - --- SyncRepository.getAlbumAssetExifsBackfill -select - "exif"."assetId", - "exif"."description", - "exif"."exifImageWidth", - "exif"."exifImageHeight", - "exif"."fileSizeInByte", - "exif"."orientation", - "exif"."dateTimeOriginal", - "exif"."modifyDate", - "exif"."timeZone", - "exif"."latitude", - "exif"."longitude", - "exif"."projectionType", - "exif"."city", - "exif"."state", - "exif"."country", - "exif"."make", - "exif"."model", - "exif"."lensModel", - "exif"."fNumber", - "exif"."focalLength", - "exif"."iso", - "exif"."exposureTime", - "exif"."profileDescription", - "exif"."rating", - "exif"."fps", - "exif"."updateId" -from - "exif" - inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "exif"."assetId" -where - "album_assets"."albumsId" = $1 - and "exif"."updatedAt" < now() - interval '1 millisecond' - and "exif"."updateId" <= $2 - and "exif"."updateId" >= $3 -order by - "exif"."updateId" asc - --- SyncRepository.getAlbumAssetExifsUpserts -select - "exif"."assetId", - "exif"."description", - "exif"."exifImageWidth", - "exif"."exifImageHeight", - "exif"."fileSizeInByte", - "exif"."orientation", - "exif"."dateTimeOriginal", - "exif"."modifyDate", - "exif"."timeZone", - "exif"."latitude", - "exif"."longitude", - "exif"."projectionType", - "exif"."city", - "exif"."state", - "exif"."country", - "exif"."make", - "exif"."model", - "exif"."lensModel", - "exif"."fNumber", - "exif"."focalLength", - "exif"."iso", - "exif"."exposureTime", - "exif"."profileDescription", - "exif"."rating", - "exif"."fps", - "exif"."updateId" -from - "exif" - inner join "albums_assets_assets" as "album_assets" on "album_assets"."assetsId" = "exif"."assetId" - inner join "albums" on "albums"."id" = "album_assets"."albumsId" - left join "albums_shared_users_users" as "album_users" on "album_users"."albumsId" = "album_assets"."albumsId" -where - "exif"."updatedAt" < now() - interval '1 millisecond' - and ( - "albums"."ownerId" = $1 - or "album_users"."usersId" = $2 - ) -order by - "exif"."updateId" asc - --- SyncRepository.getMemoryUpserts -select - "id", - "createdAt", - "updatedAt", + "name", + "email", "deletedAt", - "ownerId", - "type", - "data", - "isSaved", - "memoryAt", - "seenAt", - "showAt", - "hideAt", "updateId" from - "memories" + "users" where - "ownerId" = $1 - and "updatedAt" < now() - interval '1 millisecond' + "updatedAt" < now() - interval '1 millisecond' order by "updateId" asc - --- SyncRepository.getMemoryDeletes -select - "id", - "memoryId" -from - "memories_audit" -where - "userId" = $1 - and "deletedAt" < now() - interval '1 millisecond' -order by - "id" asc - --- SyncRepository.getMemoryAssetUpserts -select - "memoriesId" as "memoryId", - "assetsId" as "assetId", - "updateId" -from - "memories_assets_assets" -where - "memoriesId" in ( - select - "id" - from - "memories" - where - "ownerId" = $1 - ) - and "updatedAt" < now() - interval '1 millisecond' -order by - "updateId" asc - --- SyncRepository.getMemoryAssetDeletes -select - "id", - "memoryId", - "assetId" -from - "memory_assets_audit" -where - "memoryId" in ( - select - "id" - from - "memories" - where - "ownerId" = $1 - ) - and "deletedAt" < now() - interval '1 millisecond' -order by - "id" asc diff --git a/server/src/repositories/index.ts b/server/src/repositories/index.ts index 879385f9fd..a01b46f3bd 100644 --- a/server/src/repositories/index.ts +++ b/server/src/repositories/index.ts @@ -34,6 +34,7 @@ import { SessionRepository } from 'src/repositories/session.repository'; import { SharedLinkRepository } from 'src/repositories/shared-link.repository'; import { StackRepository } from 'src/repositories/stack.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncCheckpointRepository } from 'src/repositories/sync-checkpoint.repository'; import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { TagRepository } from 'src/repositories/tag.repository'; @@ -81,6 +82,7 @@ export const repositories = [ StackRepository, StorageRepository, SyncRepository, + SyncCheckpointRepository, SystemMetadataRepository, TagRepository, TelemetryRepository, diff --git a/server/src/repositories/sync-checkpoint.repository.ts b/server/src/repositories/sync-checkpoint.repository.ts new file mode 100644 index 0000000000..aae0da0890 --- /dev/null +++ b/server/src/repositories/sync-checkpoint.repository.ts @@ -0,0 +1,41 @@ +import { Injectable } from '@nestjs/common'; +import { Insertable, Kysely } from 'kysely'; +import { InjectKysely } from 'nestjs-kysely'; +import { DB, SessionSyncCheckpoints } from 'src/db'; +import { DummyValue, GenerateSql } from 'src/decorators'; +import { SyncEntityType } from 'src/enum'; + +@Injectable() +export class SyncCheckpointRepository { + constructor(@InjectKysely() private db: Kysely) {} + + @GenerateSql({ params: [DummyValue.UUID] }) + getAll(sessionId: string) { + return this.db + .selectFrom('session_sync_checkpoints') + .select(['type', 'ack']) + .where('sessionId', '=', sessionId) + .execute(); + } + + upsertAll(items: Insertable[]) { + return this.db + .insertInto('session_sync_checkpoints') + .values(items) + .onConflict((oc) => + oc.columns(['sessionId', 'type']).doUpdateSet((eb) => ({ + ack: eb.ref('excluded.ack'), + })), + ) + .execute(); + } + + @GenerateSql({ params: [DummyValue.UUID] }) + deleteAll(sessionId: string, types?: SyncEntityType[]) { + return this.db + .deleteFrom('session_sync_checkpoints') + .where('sessionId', '=', sessionId) + .$if(!!types, (qb) => qb.where('type', 'in', types!)) + .execute(); + } +} diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index db616f78cc..2b1157492d 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -1,10 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { Insertable, Kysely, SelectQueryBuilder, sql } from 'kysely'; +import { Kysely, SelectQueryBuilder, sql } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; import { columns } from 'src/database'; -import { DB, SessionSyncCheckpoints } from 'src/db'; +import { DB } from 'src/db'; import { DummyValue, GenerateSql } from 'src/decorators'; -import { SyncEntityType } from 'src/enum'; import { SyncAck } from 'src/types'; type AuditTables = @@ -28,194 +27,78 @@ type UpsertTables = @Injectable() export class SyncRepository { - constructor(@InjectKysely() private db: Kysely) {} + album: AlbumSync; + albumAsset: AlbumAssetSync; + albumAssetExif: AlbumAssetExifSync; + albumToAsset: AlbumToAssetSync; + albumUser: AlbumUserSync; + asset: AssetSync; + assetExif: AssetExifSync; + memory: MemorySync; + memoryToAsset: MemoryToAssetSync; + partner: PartnerSync; + partnerAsset: PartnerAssetsSync; + partnerAssetExif: PartnerAssetExifsSync; + user: UserSync; - @GenerateSql({ params: [DummyValue.UUID] }) - getCheckpoints(sessionId: string) { - return this.db - .selectFrom('session_sync_checkpoints') - .select(['type', 'ack']) - .where('sessionId', '=', sessionId) - .execute(); + constructor(@InjectKysely() private db: Kysely) { + this.album = new AlbumSync(this.db); + this.albumAsset = new AlbumAssetSync(this.db); + this.albumAssetExif = new AlbumAssetExifSync(this.db); + this.albumToAsset = new AlbumToAssetSync(this.db); + this.albumUser = new AlbumUserSync(this.db); + this.asset = new AssetSync(this.db); + this.assetExif = new AssetExifSync(this.db); + this.memory = new MemorySync(this.db); + this.memoryToAsset = new MemoryToAssetSync(this.db); + this.partner = new PartnerSync(this.db); + this.partnerAsset = new PartnerAssetsSync(this.db); + this.partnerAssetExif = new PartnerAssetExifsSync(this.db); + this.user = new UserSync(this.db); + } +} + +class BaseSync { + constructor(protected db: Kysely) {} + + protected auditTableFilters, D>( + qb: SelectQueryBuilder, + ack?: SyncAck, + ) { + const builder = qb as SelectQueryBuilder; + return builder + .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) + .orderBy('id', 'asc') as SelectQueryBuilder; } - upsertCheckpoints(items: Insertable[]) { - return this.db - .insertInto('session_sync_checkpoints') - .values(items) - .onConflict((oc) => - oc.columns(['sessionId', 'type']).doUpdateSet((eb) => ({ - ack: eb.ref('excluded.ack'), - })), - ) - .execute(); - } - - @GenerateSql({ params: [DummyValue.UUID] }) - deleteCheckpoints(sessionId: string, types?: SyncEntityType[]) { - return this.db - .deleteFrom('session_sync_checkpoints') - .where('sessionId', '=', sessionId) - .$if(!!types, (qb) => qb.where('type', 'in', types!)) - .execute(); - } - - @GenerateSql({ params: [], stream: true }) - getUserUpserts(ack?: SyncAck) { - return this.db - .selectFrom('users') - .select(['id', 'name', 'email', 'deletedAt', 'updateId']) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [], stream: true }) - getUserDeletes(ack?: SyncAck) { - return this.db - .selectFrom('users_audit') - .select(['id', 'userId']) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getPartnerUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('partners') - .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) - .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getPartnerDeletes(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('partners_audit') - .select(['id', 'sharedById', 'sharedWithId']) - .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAssetUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('assets') - .select(columns.syncAsset) - .select('assets.updateId') - .where('ownerId', '=', userId) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); + protected upsertTableFilters, D>( + qb: SelectQueryBuilder, + ack?: SyncAck, + ) { + const builder = qb as SelectQueryBuilder; + return builder + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) + .orderBy('updateId', 'asc') as SelectQueryBuilder; } +} +class AlbumSync extends BaseSync { @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) - getPartnerBackfill(userId: string, afterCreateId?: string) { + getCreatedAfter(userId: string, afterCreateId?: string) { return this.db - .selectFrom('partners') - .select(['sharedById', 'createId']) - .where('sharedWithId', '=', userId) + .selectFrom('albums_shared_users_users') + .select(['albumsId as id', 'createId']) + .where('usersId', '=', userId) .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) .where('createdAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy('partners.createId', 'asc') + .orderBy('createId', 'asc') .execute(); } - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getPartnerAssetsBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('assets') - .select(columns.syncAsset) - .select('assets.updateId') - .where('ownerId', '=', partnerId) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) - .orderBy('updateId', 'asc') - .stream(); - } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getPartnerAssetsUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('assets') - .select(columns.syncAsset) - .select('assets.updateId') - .where('ownerId', 'in', (eb) => - eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), - ) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAssetDeletes(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('assets_audit') - .select(['id', 'assetId']) - .where('ownerId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getPartnerAssetDeletes(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('assets_audit') - .select(['id', 'assetId']) - .where('ownerId', 'in', (eb) => - eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), - ) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAssetExifsUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('exif') - .select(columns.syncAssetExif) - .select('exif.updateId') - .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getPartnerAssetExifsBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('exif') - .select(columns.syncAssetExif) - .select('exif.updateId') - .innerJoin('assets', 'assets.id', 'exif.assetId') - .where('assets.ownerId', '=', partnerId) - .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('exif.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!)) - .orderBy('exif.updateId', 'asc') - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('exif') - .select(columns.syncAssetExif) - .select('exif.updateId') - .where('assetId', 'in', (eb) => - eb - .selectFrom('assets') - .select('id') - .where('ownerId', 'in', (eb) => - eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), - ), - ) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumDeletes(userId: string, ack?: SyncAck) { + getDeletes(userId: string, ack?: SyncAck) { return this.db .selectFrom('albums_audit') .select(['id', 'albumId']) @@ -225,7 +108,7 @@ export class SyncRepository { } @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumUpserts(userId: string, ack?: SyncAck) { + getUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('albums') .distinctOn(['albums.id', 'albums.updateId']) @@ -248,9 +131,90 @@ export class SyncRepository { ]) .stream(); } +} + +class AlbumAssetSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('assets') + .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'assets.id') + .select(columns.syncAsset) + .select('assets.updateId') + .where('album_assets.albumsId', '=', albumId) + .where('assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('assets.updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('assets.updateId', '>=', afterUpdateId!)) + .orderBy('assets.updateId', 'asc') + .stream(); + } @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumToAssetDeletes(userId: string, ack?: SyncAck) { + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('assets') + .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'assets.id') + .select(columns.syncAsset) + .select('assets.updateId') + .where('assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('assets.updateId', '>', ack!.updateId)) + .orderBy('assets.updateId', 'asc') + .innerJoin('albums', 'albums.id', 'album_assets.albumsId') + .leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId') + .where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) + .stream(); + } +} + +class AlbumAssetExifSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('exif') + .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'exif.assetId') + .select(columns.syncAssetExif) + .select('exif.updateId') + .where('album_assets.albumsId', '=', albumId) + .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('exif.updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!)) + .orderBy('exif.updateId', 'asc') + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('exif') + .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'exif.assetId') + .select(columns.syncAssetExif) + .select('exif.updateId') + .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId)) + .orderBy('exif.updateId', 'asc') + .innerJoin('albums', 'albums.id', 'album_assets.albumsId') + .leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId') + .where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) + .stream(); + } +} + +class AlbumToAssetSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('albums_assets_assets as album_assets') + .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) + .where('album_assets.albumsId', '=', albumId) + .where('album_assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('album_assets.updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!)) + .orderBy('album_assets.updateId', 'asc') + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { return this.db .selectFrom('album_assets_audit') .select(['id', 'assetId', 'albumId']) @@ -277,7 +241,37 @@ export class SyncRepository { } @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumUserDeletes(userId: string, ack?: SyncAck) { + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('albums_assets_assets as album_assets') + .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) + .where('album_assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('album_assets.updateId', '>', ack!.updateId)) + .orderBy('album_assets.updateId', 'asc') + .innerJoin('albums', 'albums.id', 'album_assets.albumsId') + .leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId') + .where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) + .stream(); + } +} + +class AlbumUserSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('albums_shared_users_users as album_users') + .select(columns.syncAlbumUser) + .select('album_users.updateId') + .where('albumsId', '=', albumId) + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) + .orderBy('updateId', 'asc') + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { return this.db .selectFrom('album_users_audit') .select(['id', 'userId', 'albumId']) @@ -303,34 +297,8 @@ export class SyncRepository { .stream(); } - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) - getAlbumBackfill(userId: string, afterCreateId?: string) { - return this.db - .selectFrom('albums_shared_users_users') - .select(['albumsId as id', 'createId']) - .where('usersId', '=', userId) - .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) - .where('createdAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy('createId', 'asc') - .execute(); - } - - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getAlbumUsersBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('albums_shared_users_users as album_users') - .select(columns.syncAlbumUser) - .select('album_users.updateId') - .where('albumsId', '=', albumId) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) - .orderBy('updateId', 'asc') - .stream(); - } - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumUserUpserts(userId: string, ack?: SyncAck) { + getUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('albums_shared_users_users as album_users') .select(columns.syncAlbumUser) @@ -358,98 +326,57 @@ export class SyncRepository { ) .stream(); } +} - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getAlbumAssetsBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { +class AssetSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('assets_audit') + .select(['id', 'assetId']) + .where('ownerId', '=', userId) + .$call((qb) => this.auditTableFilters(qb, ack)) + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('assets') - .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'assets.id') .select(columns.syncAsset) .select('assets.updateId') - .where('album_assets.albumsId', '=', albumId) - .where('assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('assets.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('assets.updateId', '>=', afterUpdateId!)) - .orderBy('assets.updateId', 'asc') + .where('ownerId', '=', userId) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } +} +class AssetExifSync extends BaseSync { @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumAssetsUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('assets') - .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'assets.id') - .select(columns.syncAsset) - .select('assets.updateId') - .where('assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('assets.updateId', '>', ack!.updateId)) - .orderBy('assets.updateId', 'asc') - .innerJoin('albums', 'albums.id', 'album_assets.albumsId') - .leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId') - .where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getAlbumToAssetBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { - return this.db - .selectFrom('albums_assets_assets as album_assets') - .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) - .where('album_assets.albumsId', '=', albumId) - .where('album_assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('album_assets.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!)) - .orderBy('album_assets.updateId', 'asc') - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumToAssetUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('albums_assets_assets as album_assets') - .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) - .where('album_assets.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('album_assets.updateId', '>', ack!.updateId)) - .orderBy('album_assets.updateId', 'asc') - .innerJoin('albums', 'albums.id', 'album_assets.albumsId') - .leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId') - .where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) - getAlbumAssetExifsBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + getUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('exif') - .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'exif.assetId') .select(columns.syncAssetExif) .select('exif.updateId') - .where('album_assets.albumsId', '=', albumId) - .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('exif.updateId', '<=', beforeUpdateId) - .$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!)) - .orderBy('exif.updateId', 'asc') + .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId)) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } +} +class MemorySync extends BaseSync { @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getAlbumAssetExifsUpserts(userId: string, ack?: SyncAck) { + getDeletes(userId: string, ack?: SyncAck) { return this.db - .selectFrom('exif') - .innerJoin('albums_assets_assets as album_assets', 'album_assets.assetsId', 'exif.assetId') - .select(columns.syncAssetExif) - .select('exif.updateId') - .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId)) - .orderBy('exif.updateId', 'asc') - .innerJoin('albums', 'albums.id', 'album_assets.albumsId') - .leftJoin('albums_shared_users_users as album_users', 'album_users.albumsId', 'album_assets.albumsId') - .where((eb) => eb.or([eb('albums.ownerId', '=', userId), eb('album_users.usersId', '=', userId)])) + .selectFrom('memories_audit') + .select(['id', 'memoryId']) + .where('userId', '=', userId) + .$call((qb) => this.auditTableFilters(qb, ack)) .stream(); } @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getMemoryUpserts(userId: string, ack?: SyncAck) { + getUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('memories') .select([ @@ -471,30 +398,11 @@ export class SyncRepository { .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } +} +class MemoryToAssetSync extends BaseSync { @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getMemoryDeletes(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('memories_audit') - .select(['id', 'memoryId']) - .where('userId', '=', userId) - .$call((qb) => this.auditTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getMemoryAssetUpserts(userId: string, ack?: SyncAck) { - return this.db - .selectFrom('memories_assets_assets') - .select(['memoriesId as memoryId', 'assetsId as assetId']) - .select('updateId') - .where('memoriesId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId)) - .$call((qb) => this.upsertTableFilters(qb, ack)) - .stream(); - } - - @GenerateSql({ params: [DummyValue.UUID], stream: true }) - getMemoryAssetDeletes(userId: string, ack?: SyncAck) { + getDeletes(userId: string, ack?: SyncAck) { return this.db .selectFrom('memory_assets_audit') .select(['id', 'memoryId', 'assetId']) @@ -503,22 +411,144 @@ export class SyncRepository { .stream(); } - private auditTableFilters, D>(qb: SelectQueryBuilder, ack?: SyncAck) { - const builder = qb as SelectQueryBuilder; - return builder - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .orderBy('id', 'asc') as SelectQueryBuilder; - } - - private upsertTableFilters, D>( - qb: SelectQueryBuilder, - ack?: SyncAck, - ) { - const builder = qb as SelectQueryBuilder; - return builder - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .orderBy('updateId', 'asc') as SelectQueryBuilder; + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('memories_assets_assets') + .select(['memoriesId as memoryId', 'assetsId as assetId']) + .select('updateId') + .where('memoriesId', 'in', (eb) => eb.selectFrom('memories').select('id').where('ownerId', '=', userId)) + .$call((qb) => this.upsertTableFilters(qb, ack)) + .stream(); + } +} + +class PartnerSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) + getCreatedAfter(userId: string, afterCreateId?: string) { + return this.db + .selectFrom('partners') + .select(['sharedById', 'createId']) + .where('sharedWithId', '=', userId) + .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) + .where('createdAt', '<', sql.raw("now() - interval '1 millisecond'")) + .orderBy('partners.createId', 'asc') + .execute(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('partners_audit') + .select(['id', 'sharedById', 'sharedWithId']) + .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) + .$call((qb) => this.auditTableFilters(qb, ack)) + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('partners') + .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) + .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) + .$call((qb) => this.upsertTableFilters(qb, ack)) + .stream(); + } +} + +class PartnerAssetsSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('assets') + .select(columns.syncAsset) + .select('assets.updateId') + .where('ownerId', '=', partnerId) + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) + .orderBy('updateId', 'asc') + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getDeletes(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('assets_audit') + .select(['id', 'assetId']) + .where('ownerId', 'in', (eb) => + eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), + ) + .$call((qb) => this.auditTableFilters(qb, ack)) + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('assets') + .select(columns.syncAsset) + .select('assets.updateId') + .where('ownerId', 'in', (eb) => + eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), + ) + .$call((qb) => this.upsertTableFilters(qb, ack)) + .stream(); + } +} + +class PartnerAssetExifsSync extends BaseSync { + @GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) + getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { + return this.db + .selectFrom('exif') + .select(columns.syncAssetExif) + .select('exif.updateId') + .innerJoin('assets', 'assets.id', 'exif.assetId') + .where('assets.ownerId', '=', partnerId) + .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .where('exif.updateId', '<=', beforeUpdateId) + .$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!)) + .orderBy('exif.updateId', 'asc') + .stream(); + } + + @GenerateSql({ params: [DummyValue.UUID], stream: true }) + getUpserts(userId: string, ack?: SyncAck) { + return this.db + .selectFrom('exif') + .select(columns.syncAssetExif) + .select('exif.updateId') + .where('assetId', 'in', (eb) => + eb + .selectFrom('assets') + .select('id') + .where('ownerId', 'in', (eb) => + eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), + ), + ) + .$call((qb) => this.upsertTableFilters(qb, ack)) + .stream(); + } +} + +class UserSync extends BaseSync { + @GenerateSql({ params: [], stream: true }) + getDeletes(ack?: SyncAck) { + return this.db + .selectFrom('users_audit') + .select(['id', 'userId']) + .$call((qb) => this.auditTableFilters(qb, ack)) + .stream(); + } + + @GenerateSql({ params: [], stream: true }) + getUpserts(ack?: SyncAck) { + return this.db + .selectFrom('users') + .select(['id', 'name', 'email', 'deletedAt', 'updateId']) + .$call((qb) => this.upsertTableFilters(qb, ack)) + .stream(); } } diff --git a/server/src/services/base.service.ts b/server/src/services/base.service.ts index 3e2128ac2c..2f0e272883 100644 --- a/server/src/services/base.service.ts +++ b/server/src/services/base.service.ts @@ -41,6 +41,7 @@ import { SessionRepository } from 'src/repositories/session.repository'; import { SharedLinkRepository } from 'src/repositories/shared-link.repository'; import { StackRepository } from 'src/repositories/stack.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncCheckpointRepository } from 'src/repositories/sync-checkpoint.repository'; import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { TagRepository } from 'src/repositories/tag.repository'; @@ -91,6 +92,7 @@ export const BASE_SERVICE_DEPENDENCIES = [ StackRepository, StorageRepository, SyncRepository, + SyncCheckpointRepository, SystemMetadataRepository, TagRepository, TelemetryRepository, @@ -142,6 +144,7 @@ export class BaseService { protected stackRepository: StackRepository, protected storageRepository: StorageRepository, protected syncRepository: SyncRepository, + protected syncCheckpointRepository: SyncCheckpointRepository, protected systemMetadataRepository: SystemMetadataRepository, protected tagRepository: TagRepository, protected telemetryRepository: TelemetryRepository, diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 7e97f07370..62ec1f02a8 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -81,7 +81,7 @@ export class SyncService extends BaseService { return throwSessionRequired(); } - return this.syncRepository.getCheckpoints(sessionId); + return this.syncCheckpointRepository.getAll(sessionId); } async setAcks(auth: AuthDto, dto: SyncAckSetDto) { @@ -102,7 +102,7 @@ export class SyncService extends BaseService { checkpoints[type] = { sessionId, type, ack }; } - await this.syncRepository.upsertCheckpoints(Object.values(checkpoints)); + await this.syncCheckpointRepository.upsertAll(Object.values(checkpoints)); } async deleteAcks(auth: AuthDto, dto: SyncAckDeleteDto) { @@ -111,7 +111,7 @@ export class SyncService extends BaseService { return throwSessionRequired(); } - await this.syncRepository.deleteCheckpoints(sessionId, dto.types); + await this.syncCheckpointRepository.deleteAll(sessionId, dto.types); } async stream(auth: AuthDto, response: Writable, dto: SyncStreamDto) { @@ -120,7 +120,7 @@ export class SyncService extends BaseService { return throwSessionRequired(); } - const checkpoints = await this.syncRepository.getCheckpoints(sessionId); + const checkpoints = await this.syncCheckpointRepository.getAll(sessionId); const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)])); const handlers: Record Promise> = { [SyncRequestType.UsersV1]: () => this.syncUsersV1(response, checkpointMap), @@ -149,13 +149,13 @@ export class SyncService extends BaseService { private async syncUsersV1(response: Writable, checkpointMap: CheckpointMap) { const deleteType = SyncEntityType.UserDeleteV1; - const deletes = this.syncRepository.getUserDeletes(checkpointMap[deleteType]); + const deletes = this.syncRepository.user.getDeletes(checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.UserV1; - const upserts = this.syncRepository.getUserUpserts(checkpointMap[upsertType]); + const upserts = this.syncRepository.user.getUpserts(checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -163,13 +163,13 @@ export class SyncService extends BaseService { private async syncPartnersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { const deleteType = SyncEntityType.PartnerDeleteV1; - const deletes = this.syncRepository.getPartnerDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.partner.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.PartnerV1; - const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partner.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -177,13 +177,13 @@ export class SyncService extends BaseService { private async syncAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { const deleteType = SyncEntityType.AssetDeleteV1; - const deletes = this.syncRepository.getAssetDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.asset.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AssetV1; - const upserts = this.syncRepository.getAssetUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.asset.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } @@ -196,14 +196,14 @@ export class SyncService extends BaseService { sessionId: string, ) { const deleteType = SyncEntityType.PartnerAssetDeleteV1; - const deletes = this.syncRepository.getPartnerAssetDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.partnerAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.PartnerAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.PartnerAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -216,7 +216,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.getPartnerAssetsBackfill(partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerAsset.getBackfill(partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { @@ -236,7 +236,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.getPartnerAssetsUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } @@ -244,7 +244,7 @@ export class SyncService extends BaseService { private async syncAssetExifsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { const upsertType = SyncEntityType.AssetExifV1; - const upserts = this.syncRepository.getAssetExifsUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.assetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -258,7 +258,7 @@ export class SyncService extends BaseService { ) { const backfillType = SyncEntityType.PartnerAssetExifBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, backfillCheckpoint?.updateId); + const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.PartnerAssetExifV1; const upsertCheckpoint = checkpointMap[upsertType]; @@ -272,7 +272,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.getPartnerAssetExifsBackfill(partner.sharedById, startId, endId); + const backfill = this.syncRepository.partnerAssetExif.getBackfill(partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [partner.createId, updateId], data }); @@ -288,7 +288,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.getPartnerAssetExifsUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.partnerAssetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -296,13 +296,13 @@ export class SyncService extends BaseService { private async syncAlbumsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { const deleteType = SyncEntityType.AlbumDeleteV1; - const deletes = this.syncRepository.getAlbumDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.album.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.AlbumV1; - const upserts = this.syncRepository.getAlbumUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.album.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -310,14 +310,14 @@ export class SyncService extends BaseService { private async syncAlbumUsersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { const deleteType = SyncEntityType.AlbumUserDeleteV1; - const deletes = this.syncRepository.getAlbumUserDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.albumUser.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.AlbumUserBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.AlbumUserV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -330,7 +330,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.getAlbumUsersBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumUser.getBackfill(album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -346,7 +346,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.getAlbumUserUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumUser.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -355,7 +355,7 @@ export class SyncService extends BaseService { private async syncAlbumAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { const backfillType = SyncEntityType.AlbumAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.AlbumAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -368,7 +368,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.getAlbumAssetsBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumAsset.getBackfill(album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) }); @@ -384,7 +384,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.getAlbumAssetsUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); } @@ -398,7 +398,7 @@ export class SyncService extends BaseService { ) { const backfillType = SyncEntityType.AlbumAssetExifBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.AlbumAssetExifV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -411,7 +411,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.getAlbumAssetExifsBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumAssetExif.getBackfill(album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -427,7 +427,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.getAlbumAssetExifsUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumAssetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -440,14 +440,14 @@ export class SyncService extends BaseService { sessionId: string, ) { const deleteType = SyncEntityType.AlbumToAssetDeleteV1; - const deletes = this.syncRepository.getAlbumToAssetDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.albumToAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const backfillType = SyncEntityType.AlbumToAssetBackfillV1; const backfillCheckpoint = checkpointMap[backfillType]; - const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId); + const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const upsertType = SyncEntityType.AlbumToAssetV1; const upsertCheckpoint = checkpointMap[upsertType]; if (upsertCheckpoint) { @@ -460,7 +460,7 @@ export class SyncService extends BaseService { } const startId = getStartId(createId, backfillCheckpoint); - const backfill = this.syncRepository.getAlbumToAssetBackfill(album.id, startId, endId); + const backfill = this.syncRepository.albumToAsset.getBackfill(album.id, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, ids: [createId, updateId], data }); @@ -476,7 +476,7 @@ export class SyncService extends BaseService { }); } - const upserts = this.syncRepository.getAlbumToAssetUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.albumToAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -484,13 +484,13 @@ export class SyncService extends BaseService { private async syncMemoriesV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { const deleteType = SyncEntityType.MemoryDeleteV1; - const deletes = this.syncRepository.getMemoryDeletes(auth.user.id, checkpointMap[SyncEntityType.MemoryDeleteV1]); + const deletes = this.syncRepository.memory.getDeletes(auth.user.id, checkpointMap[SyncEntityType.MemoryDeleteV1]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.MemoryV1; - const upserts = this.syncRepository.getMemoryUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.memory.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -498,13 +498,13 @@ export class SyncService extends BaseService { private async syncMemoryAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { const deleteType = SyncEntityType.MemoryToAssetDeleteV1; - const deletes = this.syncRepository.getMemoryAssetDeletes(auth.user.id, checkpointMap[deleteType]); + const deletes = this.syncRepository.memoryToAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); for await (const { id, ...data } of deletes) { send(response, { type: deleteType, ids: [id], data }); } const upsertType = SyncEntityType.MemoryToAssetV1; - const upserts = this.syncRepository.getMemoryAssetUpserts(auth.user.id, checkpointMap[upsertType]); + const upserts = this.syncRepository.memoryToAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); for await (const { updateId, ...data } of upserts) { send(response, { type: upsertType, ids: [updateId], data }); } @@ -512,7 +512,7 @@ export class SyncService extends BaseService { private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) { const { type, sessionId, createId } = item; - await this.syncRepository.upsertCheckpoints([ + await this.syncCheckpointRepository.upsertAll([ { type, sessionId, diff --git a/server/test/medium.factory.ts b/server/test/medium.factory.ts index 8c113a13cf..922c5d3fdb 100644 --- a/server/test/medium.factory.ts +++ b/server/test/medium.factory.ts @@ -26,6 +26,7 @@ import { PersonRepository } from 'src/repositories/person.repository'; import { SearchRepository } from 'src/repositories/search.repository'; import { SessionRepository } from 'src/repositories/session.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncCheckpointRepository } from 'src/repositories/sync-checkpoint.repository'; import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { UserRepository } from 'src/repositories/user.repository'; @@ -202,7 +203,11 @@ export class MediumTestContext { export class SyncTestContext extends MediumTestContext { constructor(database: Kysely) { - super(SyncService, { database, real: [SyncRepository, SessionRepository], mock: [LoggingRepository] }); + super(SyncService, { + database, + real: [SyncRepository, SyncCheckpointRepository, SessionRepository], + mock: [LoggingRepository], + }); } async syncStream(auth: AuthDto, types: SyncRequestType[]) { @@ -239,6 +244,7 @@ const newRealRepository = (key: ClassConstructor, db: Kysely): T => { case SearchRepository: case SessionRepository: case SyncRepository: + case SyncCheckpointRepository: case SystemMetadataRepository: case UserRepository: case VersionHistoryRepository: { @@ -282,6 +288,7 @@ const newMockRepository = (key: ClassConstructor) => { case PersonRepository: case SessionRepository: case SyncRepository: + case SyncCheckpointRepository: case SystemMetadataRepository: case UserRepository: case VersionHistoryRepository: { diff --git a/server/test/utils.ts b/server/test/utils.ts index 0e07e603b5..0b4c743802 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -47,6 +47,7 @@ import { SessionRepository } from 'src/repositories/session.repository'; import { SharedLinkRepository } from 'src/repositories/shared-link.repository'; import { StackRepository } from 'src/repositories/stack.repository'; import { StorageRepository } from 'src/repositories/storage.repository'; +import { SyncCheckpointRepository } from 'src/repositories/sync-checkpoint.repository'; import { SyncRepository } from 'src/repositories/sync.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { TagRepository } from 'src/repositories/tag.repository'; @@ -217,6 +218,7 @@ export type ServiceOverrides = { stack: StackRepository; storage: StorageRepository; sync: SyncRepository; + syncCheckpoint: SyncCheckpointRepository; systemMetadata: SystemMetadataRepository; tag: TagRepository; telemetry: TelemetryRepository; @@ -287,6 +289,7 @@ export const newTestService = ( stack: automock(StackRepository), storage: newStorageRepositoryMock(), sync: automock(SyncRepository), + syncCheckpoint: automock(SyncCheckpointRepository), systemMetadata: newSystemMetadataRepositoryMock(), // systemMetadata: automock(SystemMetadataRepository, { strict: false }), // eslint-disable-next-line no-sparse-arrays @@ -336,6 +339,7 @@ export const newTestService = ( overrides.stack || (mocks.stack as As), overrides.storage || (mocks.storage as As), overrides.sync || (mocks.sync as As), + overrides.syncCheckpoint || (mocks.syncCheckpoint as As), overrides.systemMetadata || (mocks.systemMetadata as As), overrides.tag || (mocks.tag as As), overrides.telemetry || (mocks.telemetry as unknown as TelemetryRepository),