fix: use a consistent point in time for all updates in a single sync

This commit is contained in:
Zack Pollard 2025-08-05 00:58:34 +01:00
parent 220ee82c18
commit 7724277416
6 changed files with 374 additions and 321 deletions

View File

@ -13,3 +13,7 @@ where
delete from "session_sync_checkpoint" delete from "session_sync_checkpoint"
where where
"sessionId" = $1 "sessionId" = $1
-- SyncCheckpointRepository.getNow
select
immich_uuid_v7 (now() - interval '1 millisecond') as "nowId"

View File

@ -9,7 +9,7 @@ from
where where
"usersId" = $1 "usersId" = $1
and "createId" >= $2 and "createId" >= $2
and "createdAt" < now() - interval '1 millisecond' and "createId" < $3
order by order by
"createId" asc "createId" asc
@ -21,7 +21,7 @@ from
"album_audit" "album_audit"
where where
"userId" = $1 "userId" = $1
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -41,10 +41,10 @@ from
"album" "album"
left join "album_user" as "album_users" on "album"."id" = "album_users"."albumsId" left join "album_user" as "album_users" on "album"."id" = "album_users"."albumsId"
where where
"album"."updatedAt" < now() - interval '1 millisecond' "album"."updateId" < $1
and ( and (
"album"."ownerId" = $1 "album"."ownerId" = $2
or "album_users"."usersId" = $2 or "album_users"."usersId" = $3
) )
order by order by
"album"."updateId" asc "album"."updateId" asc
@ -67,20 +67,21 @@ select
"asset"."livePhotoVideoId", "asset"."livePhotoVideoId",
"asset"."stackId", "asset"."stackId",
"asset"."libraryId", "asset"."libraryId",
"asset"."updateId" "album_asset"."updateId"
from from
"asset" "album_asset"
inner join "album_asset" on "album_asset"."assetsId" = "asset"."id" inner join "asset" on "asset"."id" = "album_asset"."assetsId"
where where
"album_asset"."albumsId" = $1 "album_asset"."albumsId" = $1
and "asset"."updatedAt" < now() - interval '1 millisecond' and "album_asset"."updateId" < $2
and "asset"."updateId" <= $2 and "album_asset"."updateId" <= $3
and "asset"."updateId" >= $3 and "album_asset"."updateId" >= $4
order by order by
"asset"."updateId" asc "album_asset"."updateId" asc
-- SyncRepository.albumAsset.getUpserts -- SyncRepository.albumAsset.getCreates
select select
"album_asset"."updateId",
"asset"."id", "asset"."id",
"asset"."ownerId", "asset"."ownerId",
"asset"."originalFileName", "asset"."originalFileName",
@ -96,21 +97,20 @@ select
"asset"."duration", "asset"."duration",
"asset"."livePhotoVideoId", "asset"."livePhotoVideoId",
"asset"."stackId", "asset"."stackId",
"asset"."libraryId", "asset"."libraryId"
"asset"."updateId"
from from
"asset" "album_asset"
inner join "album_asset" on "album_asset"."assetsId" = "asset"."id" inner join "asset" on "asset"."id" = "album_asset"."assetsId"
inner join "album" on "album"."id" = "album_asset"."albumsId" inner join "album" on "album"."id" = "album_asset"."albumsId"
left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId"
where where
"asset"."updatedAt" < now() - interval '1 millisecond' "album_asset"."updateId" < $1
and ( and (
"album"."ownerId" = $1 "album"."ownerId" = $2
or "album_user"."usersId" = $2 or "album_user"."usersId" = $3
) )
order by order by
"asset"."updateId" asc "album_asset"."updateId" asc
-- SyncRepository.albumAssetExif.getBackfill -- SyncRepository.albumAssetExif.getBackfill
select select
@ -145,9 +145,9 @@ from
inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId" inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId"
where where
"album_asset"."albumsId" = $1 "album_asset"."albumsId" = $1
and "asset_exif"."updatedAt" < now() - interval '1 millisecond' and "asset_exif"."updateId" < $2
and "asset_exif"."updateId" <= $2 and "asset_exif"."updateId" <= $3
and "asset_exif"."updateId" >= $3 and "asset_exif"."updateId" >= $4
order by order by
"asset_exif"."updateId" asc "asset_exif"."updateId" asc
@ -185,10 +185,10 @@ from
inner join "album" on "album"."id" = "album_asset"."albumsId" inner join "album" on "album"."id" = "album_asset"."albumsId"
left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId"
where where
"asset_exif"."updatedAt" < now() - interval '1 millisecond' "asset_exif"."updateId" < $1
and ( and (
"album"."ownerId" = $1 "album"."ownerId" = $2
or "album_user"."usersId" = $2 or "album_user"."usersId" = $3
) )
order by order by
"asset_exif"."updateId" asc "asset_exif"."updateId" asc
@ -202,9 +202,9 @@ from
"album_asset" as "album_assets" "album_asset" as "album_assets"
where where
"album_assets"."albumsId" = $1 "album_assets"."albumsId" = $1
and "album_assets"."updatedAt" < now() - interval '1 millisecond' and "album_assets"."updateId" < $2
and "album_assets"."updateId" <= $2 and "album_assets"."updateId" <= $3
and "album_assets"."updateId" >= $3 and "album_assets"."updateId" >= $4
order by order by
"album_assets"."updateId" asc "album_assets"."updateId" asc
@ -233,7 +233,7 @@ where
"album_user"."usersId" = $2 "album_user"."usersId" = $2
) )
) )
and "deletedAt" < now() - interval '1 millisecond' and "id" < $3
order by order by
"id" asc "id" asc
@ -247,10 +247,10 @@ from
inner join "album" on "album"."id" = "album_asset"."albumsId" inner join "album" on "album"."id" = "album_asset"."albumsId"
left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId" left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId"
where where
"album_asset"."updatedAt" < now() - interval '1 millisecond' "album_asset"."updateId" < $1
and ( and (
"album"."ownerId" = $1 "album"."ownerId" = $2
or "album_user"."usersId" = $2 or "album_user"."usersId" = $3
) )
order by order by
"album_asset"."updateId" asc "album_asset"."updateId" asc
@ -265,9 +265,9 @@ from
"album_user" "album_user"
where where
"albumsId" = $1 "albumsId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
and "updateId" <= $2 and "updateId" <= $3
and "updateId" >= $3 and "updateId" >= $4
order by order by
"updateId" asc "updateId" asc
@ -296,7 +296,7 @@ where
"album_user"."usersId" = $2 "album_user"."usersId" = $2
) )
) )
and "deletedAt" < now() - interval '1 millisecond' and "id" < $3
order by order by
"id" asc "id" asc
@ -309,14 +309,14 @@ select
from from
"album_user" "album_user"
where where
"album_user"."updatedAt" < now() - interval '1 millisecond' "album_user"."updateId" < $1
and "album_user"."albumsId" in ( and "album_user"."albumsId" in (
select select
"id" "id"
from from
"album" "album"
where where
"ownerId" = $1 "ownerId" = $2
union union
( (
select select
@ -324,7 +324,7 @@ where
from from
"album_user" as "albumUsers" "album_user" as "albumUsers"
where where
"albumUsers"."usersId" = $2 "albumUsers"."usersId" = $3
) )
) )
order by order by
@ -338,7 +338,7 @@ from
"asset_audit" "asset_audit"
where where
"ownerId" = $1 "ownerId" = $1
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -365,7 +365,7 @@ from
"asset" "asset"
where where
"ownerId" = $1 "ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -408,7 +408,7 @@ where
where where
"ownerId" = $1 "ownerId" = $1
) )
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -421,7 +421,7 @@ from
left join "asset" on "asset"."id" = "asset_face_audit"."assetId" left join "asset" on "asset"."id" = "asset_face_audit"."assetId"
where where
"asset"."ownerId" = $1 "asset"."ownerId" = $1
and "asset_face_audit"."deletedAt" < now() - interval '1 millisecond' and "asset_face_audit"."id" < $2
order by order by
"asset_face_audit"."id" asc "asset_face_audit"."id" asc
@ -442,34 +442,11 @@ from
"asset_face" "asset_face"
left join "asset" on "asset"."id" = "asset_face"."assetId" left join "asset" on "asset"."id" = "asset_face"."assetId"
where where
"asset_face"."updatedAt" < now() - interval '1 millisecond' "asset_face"."updateId" < $1
and "asset"."ownerId" = $1 and "asset"."ownerId" = $2
order by order by
"asset_face"."updateId" asc "asset_face"."updateId" asc
-- SyncRepository.authUser.getUpserts
select
"id",
"name",
"email",
"avatarColor",
"deletedAt",
"updateId",
"profileImagePath",
"profileChangedAt",
"isAdmin",
"pinCode",
"oauthId",
"storageLabel",
"quotaSizeInBytes",
"quotaUsageInBytes"
from
"user"
where
"updatedAt" < now() - interval '1 millisecond'
order by
"updateId" asc
-- SyncRepository.memory.getDeletes -- SyncRepository.memory.getDeletes
select select
"id", "id",
@ -478,7 +455,7 @@ from
"memory_audit" "memory_audit"
where where
"userId" = $1 "userId" = $1
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -501,7 +478,7 @@ from
"memory" "memory"
where where
"ownerId" = $1 "ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -521,7 +498,7 @@ where
where where
"ownerId" = $1 "ownerId" = $1
) )
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -541,7 +518,7 @@ where
where where
"ownerId" = $1 "ownerId" = $1
) )
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -553,8 +530,7 @@ from
"partner" "partner"
where where
"sharedWithId" = $1 "sharedWithId" = $1
and "createId" >= $2 and "createId" < $2
and "createdAt" < now() - interval '1 millisecond'
order by order by
"partner"."createId" asc "partner"."createId" asc
@ -570,7 +546,7 @@ where
"sharedById" = $1 "sharedById" = $1
or "sharedWithId" = $2 or "sharedWithId" = $2
) )
and "deletedAt" < now() - interval '1 millisecond' and "id" < $3
order by order by
"id" asc "id" asc
@ -587,7 +563,7 @@ where
"sharedById" = $1 "sharedById" = $1
or "sharedWithId" = $2 or "sharedWithId" = $2
) )
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $3
order by order by
"updateId" asc "updateId" asc
@ -614,9 +590,9 @@ from
"asset" "asset"
where where
"ownerId" = $1 "ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
and "updateId" <= $2 and "updateId" <= $3
and "updateId" >= $3 and "updateId" >= $4
order by order by
"updateId" asc "updateId" asc
@ -635,7 +611,7 @@ where
where where
"sharedWithId" = $1 "sharedWithId" = $1
) )
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -669,7 +645,7 @@ where
where where
"sharedWithId" = $1 "sharedWithId" = $1
) )
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -706,9 +682,9 @@ from
inner join "asset" on "asset"."id" = "asset_exif"."assetId" inner join "asset" on "asset"."id" = "asset_exif"."assetId"
where where
"asset"."ownerId" = $1 "asset"."ownerId" = $1
and "asset_exif"."updatedAt" < now() - interval '1 millisecond' and "asset_exif"."updateId" < $2
and "asset_exif"."updateId" <= $2 and "asset_exif"."updateId" <= $3
and "asset_exif"."updateId" >= $3 and "asset_exif"."updateId" >= $4
order by order by
"asset_exif"."updateId" asc "asset_exif"."updateId" asc
@ -758,7 +734,7 @@ where
"sharedWithId" = $1 "sharedWithId" = $1
) )
) )
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -777,7 +753,7 @@ where
where where
"sharedWithId" = $1 "sharedWithId" = $1
) )
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -793,9 +769,9 @@ from
"stack" "stack"
where where
"ownerId" = $1 "ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
and "updateId" <= $2 and "updateId" <= $3
and "updateId" >= $3 and "updateId" >= $4
order by order by
"updateId" asc "updateId" asc
@ -818,7 +794,7 @@ where
where where
"sharedWithId" = $1 "sharedWithId" = $1
) )
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -830,7 +806,7 @@ from
"person_audit" "person_audit"
where where
"ownerId" = $1 "ownerId" = $1
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -851,7 +827,7 @@ from
"person" "person"
where where
"ownerId" = $1 "ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -863,7 +839,7 @@ from
"stack_audit" "stack_audit"
where where
"userId" = $1 "userId" = $1
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -879,7 +855,7 @@ from
"stack" "stack"
where where
"ownerId" = $1 "ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc
@ -890,7 +866,7 @@ select
from from
"user_audit" "user_audit"
where where
"deletedAt" < now() - interval '1 millisecond' "id" < $1
order by order by
"id" asc "id" asc
@ -907,7 +883,7 @@ select
from from
"user" "user"
where where
"updatedAt" < now() - interval '1 millisecond' "updateId" < $1
order by order by
"updateId" asc "updateId" asc
@ -920,7 +896,7 @@ from
"user_metadata_audit" "user_metadata_audit"
where where
"userId" = $1 "userId" = $1
and "deletedAt" < now() - interval '1 millisecond' and "id" < $2
order by order by
"id" asc "id" asc
@ -934,6 +910,6 @@ from
"user_metadata" "user_metadata"
where where
"userId" = $1 "userId" = $1
and "updatedAt" < now() - interval '1 millisecond' and "updateId" < $2
order by order by
"updateId" asc "updateId" asc

View File

@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Insertable, Kysely } from 'kysely'; import { Insertable, Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely'; import { InjectKysely } from 'nestjs-kysely';
import { DummyValue, GenerateSql } from 'src/decorators'; import { DummyValue, GenerateSql } from 'src/decorators';
import { SyncEntityType } from 'src/enum'; import { SyncEntityType } from 'src/enum';
@ -39,4 +39,13 @@ export class SyncCheckpointRepository {
.$if(!!types, (qb) => qb.where('type', 'in', types!)) .$if(!!types, (qb) => qb.where('type', 'in', types!))
.execute(); .execute();
} }
@GenerateSql()
getNow() {
return this.db
.selectNoFrom((eb) => [
eb.fn<string>('immich_uuid_v7', [sql.raw<Date>("now() - interval '1 millisecond'")]).as('nowId'),
])
.executeTakeFirstOrThrow();
}
} }

View File

@ -33,6 +33,11 @@ type UpsertTables =
| 'user_metadata' | 'user_metadata'
| 'asset_face'; | 'asset_face';
export type SyncQueryOptions = {
nowId: string;
userId: string;
};
@Injectable() @Injectable()
export class SyncRepository { export class SyncRepository {
album: AlbumSync; album: AlbumSync;
@ -81,21 +86,21 @@ export class SyncRepository {
class BaseSync { class BaseSync {
constructor(protected db: Kysely<DB>) {} constructor(protected db: Kysely<DB>) {}
protected auditTableFilters(ack?: SyncAck) { protected auditTableFilters(nowId: string, ack?: SyncAck) {
return <T extends keyof Pick<DB, AuditTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => { return <T extends keyof Pick<DB, AuditTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => {
const builder = qb as SelectQueryBuilder<DB, AuditTables, D>; const builder = qb as SelectQueryBuilder<DB, AuditTables, D>;
return builder return builder
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('id', '<', nowId)
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.orderBy('id', 'asc') as SelectQueryBuilder<DB, T, D>; .orderBy('id', 'asc') as SelectQueryBuilder<DB, T, D>;
}; };
} }
protected upsertTableFilters(ack?: SyncAck) { protected upsertTableFilters(nowId: string, ack?: SyncAck) {
return <T extends keyof Pick<DB, UpsertTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => { return <T extends keyof Pick<DB, UpsertTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => {
const builder = qb as SelectQueryBuilder<DB, UpsertTables, D>; const builder = qb as SelectQueryBuilder<DB, UpsertTables, D>;
return builder return builder
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.orderBy('updateId', 'asc') as SelectQueryBuilder<DB, T, D>; .orderBy('updateId', 'asc') as SelectQueryBuilder<DB, T, D>;
}; };
@ -103,34 +108,34 @@ class BaseSync {
} }
class AlbumSync extends BaseSync { class AlbumSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID] })
getCreatedAfter(userId: string, afterCreateId?: string) { getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) {
return this.db return this.db
.selectFrom('album_user') .selectFrom('album_user')
.select(['albumsId as id', 'createId']) .select(['albumsId as id', 'createId'])
.where('usersId', '=', userId) .where('usersId', '=', userId)
.$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!))
.where('createdAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('createId', '<', nowId)
.orderBy('createId', 'asc') .orderBy('createId', 'asc')
.execute(); .execute();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album_audit') .selectFrom('album_audit')
.select(['id', 'albumId']) .select(['id', 'albumId'])
.where('userId', '=', userId) .where('userId', '=', userId)
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album') .selectFrom('album')
.distinctOn(['album.id', 'album.updateId']) .distinctOn(['album.id', 'album.updateId'])
.where('album.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('album.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('album.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('album.updateId', '>', ack!.updateId))
.orderBy('album.updateId', 'asc') .orderBy('album.updateId', 'asc')
.leftJoin('album_user as album_users', 'album.id', 'album_users.albumsId') .leftJoin('album_user as album_users', 'album.id', 'album_users.albumsId')
@ -152,29 +157,32 @@ class AlbumSync extends BaseSync {
} }
class AlbumAssetSync extends BaseSync { class AlbumAssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db return this.db
.selectFrom('album_asset') .selectFrom('album_asset')
.innerJoin('asset', 'asset.id', 'album_asset.assetsId') .innerJoin('asset', 'asset.id', 'album_asset.assetsId')
.select(columns.syncAsset) .select(columns.syncAsset)
.select('album_asset.updateId') .select('album_asset.updateId')
.where('album_asset.albumsId', '=', albumId) .where('album_asset.albumsId', '=', albumId)
.where('album_asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('album_asset.updateId', '<', nowId)
.where('album_asset.updateId', '<=', beforeUpdateId) .where('album_asset.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!))
.orderBy('album_asset.updateId', 'asc') .orderBy('album_asset.updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpdates(userId: string, albumToAssetAck: SyncAck, ack?: SyncAck) { getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset') .selectFrom('asset')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset.id') .innerJoin('album_asset', 'album_asset.assetsId', 'asset.id')
.select(columns.syncAsset) .select(columns.syncAsset)
.select('asset.updateId') .select('asset.updateId')
.where('asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('asset.updateId', '<', nowId)
.where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send updates for assets that the client already knows about .where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send updates for assets that the client already knows about
.$if(!!ack, (qb) => qb.where('asset.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('asset.updateId', '>', ack!.updateId))
.orderBy('asset.updateId', 'asc') .orderBy('asset.updateId', 'asc')
@ -184,8 +192,8 @@ class AlbumAssetSync extends BaseSync {
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getCreates(userId: string, ack?: SyncAck) { getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album_asset') .selectFrom('album_asset')
.select('album_asset.updateId') .select('album_asset.updateId')
@ -193,7 +201,7 @@ class AlbumAssetSync extends BaseSync {
.select(columns.syncAsset) .select(columns.syncAsset)
.innerJoin('album', 'album.id', 'album_asset.albumsId') .innerJoin('album', 'album.id', 'album_asset.albumsId')
.leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId') .leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId')
.where('album_asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('album_asset.updateId', '<', nowId)
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)])) .where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId))
.orderBy('album_asset.updateId', 'asc') .orderBy('album_asset.updateId', 'asc')
@ -202,29 +210,32 @@ class AlbumAssetSync extends BaseSync {
} }
class AlbumAssetExifSync extends BaseSync { class AlbumAssetExifSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db return this.db
.selectFrom('asset_exif') .selectFrom('asset_exif')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId') .innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
.select('asset_exif.updateId') .select('asset_exif.updateId')
.where('album_asset.albumsId', '=', albumId) .where('album_asset.albumsId', '=', albumId)
.where('asset_exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('asset_exif.updateId', '<', nowId)
.where('asset_exif.updateId', '<=', beforeUpdateId) .where('asset_exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!))
.orderBy('asset_exif.updateId', 'asc') .orderBy('asset_exif.updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_exif') .selectFrom('asset_exif')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId') .innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
.select('asset_exif.updateId') .select('asset_exif.updateId')
.where('asset_exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('asset_exif.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_exif.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('asset_exif.updateId', '>', ack!.updateId))
.orderBy('asset_exif.updateId', 'asc') .orderBy('asset_exif.updateId', 'asc')
.innerJoin('album', 'album.id', 'album_asset.albumsId') .innerJoin('album', 'album.id', 'album_asset.albumsId')
@ -235,21 +246,29 @@ class AlbumAssetExifSync extends BaseSync {
} }
class AlbumToAssetSync extends BaseSync { class AlbumToAssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId, userId }: SyncQueryOptions,
albumId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db return this.db
.selectFrom('album_asset as album_assets') .selectFrom('album_asset as album_assets')
.select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId']) .select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId'])
.where('album_assets.albumsId', '=', albumId) .where('album_assets.albumsId', '=', albumId)
.where('album_assets.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('album_assets.updateId', '<', nowId)
.where('album_assets.updateId', '<=', beforeUpdateId) .where('album_assets.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!))
.orderBy('album_assets.updateId', 'asc') .orderBy('album_assets.updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album_asset_audit') .selectFrom('album_asset_audit')
.select(['id', 'assetId', 'albumId']) .select(['id', 'assetId', 'albumId'])
@ -271,16 +290,16 @@ class AlbumToAssetSync extends BaseSync {
), ),
), ),
) )
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album_asset') .selectFrom('album_asset')
.select(['album_asset.assetsId as assetId', 'album_asset.albumsId as albumId', 'album_asset.updateId']) .select(['album_asset.assetsId as assetId', 'album_asset.albumsId as albumId', 'album_asset.updateId'])
.where('album_asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('album_asset.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId))
.orderBy('album_asset.updateId', 'asc') .orderBy('album_asset.updateId', 'asc')
.innerJoin('album', 'album.id', 'album_asset.albumsId') .innerJoin('album', 'album.id', 'album_asset.albumsId')
@ -291,22 +310,25 @@ class AlbumToAssetSync extends BaseSync {
} }
class AlbumUserSync extends BaseSync { class AlbumUserSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db return this.db
.selectFrom('album_user') .selectFrom('album_user')
.select(columns.syncAlbumUser) .select(columns.syncAlbumUser)
.select('album_user.updateId') .select('album_user.updateId')
.where('albumsId', '=', albumId) .where('albumsId', '=', albumId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('updateId', '<', nowId)
.where('updateId', '<=', beforeUpdateId) .where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc') .orderBy('updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album_user_audit') .selectFrom('album_user_audit')
.select(['id', 'userId', 'albumId']) .select(['id', 'userId', 'albumId'])
@ -328,17 +350,17 @@ class AlbumUserSync extends BaseSync {
), ),
), ),
) )
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('album_user') .selectFrom('album_user')
.select(columns.syncAlbumUser) .select(columns.syncAlbumUser)
.select('album_user.updateId') .select('album_user.updateId')
.where('album_user.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('album_user.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('album_user.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('album_user.updateId', '>', ack!.updateId))
.orderBy('album_user.updateId', 'asc') .orderBy('album_user.updateId', 'asc')
.where((eb) => .where((eb) =>
@ -364,53 +386,53 @@ class AlbumUserSync extends BaseSync {
} }
class AssetSync extends BaseSync { class AssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_audit') .selectFrom('asset_audit')
.select(['id', 'assetId']) .select(['id', 'assetId'])
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset') .selectFrom('asset')
.select(columns.syncAsset) .select(columns.syncAsset)
.select('asset.updateId') .select('asset.updateId')
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class AuthUserSync extends BaseSync { class AuthUserSync extends BaseSync {
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [], stream: true })
getUpserts(ack?: SyncAck) { getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('user') .selectFrom('user')
.select(columns.syncUser) .select(columns.syncUser)
.select(['isAdmin', 'pinCode', 'oauthId', 'storageLabel', 'quotaSizeInBytes', 'quotaUsageInBytes']) .select(['isAdmin', 'pinCode', 'oauthId', 'storageLabel', 'quotaSizeInBytes', 'quotaUsageInBytes'])
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class PersonSync extends BaseSync { class PersonSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('person_audit') .selectFrom('person_audit')
.select(['id', 'personId']) .select(['id', 'personId'])
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('person') .selectFrom('person')
.select([ .select([
@ -427,27 +449,27 @@ class PersonSync extends BaseSync {
'faceAssetId', 'faceAssetId',
]) ])
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class AssetFaceSync extends BaseSync { class AssetFaceSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_face_audit') .selectFrom('asset_face_audit')
.select(['asset_face_audit.id', 'assetFaceId']) .select(['asset_face_audit.id', 'assetFaceId'])
.orderBy('asset_face_audit.id', 'asc') .orderBy('asset_face_audit.id', 'asc')
.leftJoin('asset', 'asset.id', 'asset_face_audit.assetId') .leftJoin('asset', 'asset.id', 'asset_face_audit.assetId')
.where('asset.ownerId', '=', userId) .where('asset.ownerId', '=', userId)
.where('asset_face_audit.deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('asset_face_audit.id', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_face_audit.id', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('asset_face_audit.id', '>', ack!.updateId))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_face') .selectFrom('asset_face')
.select([ .select([
@ -463,7 +485,7 @@ class AssetFaceSync extends BaseSync {
'sourceType', 'sourceType',
'asset_face.updateId', 'asset_face.updateId',
]) ])
.where('asset_face.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('asset_face.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_face.updateId', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('asset_face.updateId', '>', ack!.updateId))
.orderBy('asset_face.updateId', 'asc') .orderBy('asset_face.updateId', 'asc')
.leftJoin('asset', 'asset.id', 'asset_face.assetId') .leftJoin('asset', 'asset.id', 'asset_face.assetId')
@ -473,31 +495,31 @@ class AssetFaceSync extends BaseSync {
} }
class AssetExifSync extends BaseSync { class AssetExifSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_exif') .selectFrom('asset_exif')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
.select('asset_exif.updateId') .select('asset_exif.updateId')
.where('assetId', 'in', (eb) => eb.selectFrom('asset').select('id').where('ownerId', '=', userId)) .where('assetId', 'in', (eb) => eb.selectFrom('asset').select('id').where('ownerId', '=', userId))
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class MemorySync extends BaseSync { class MemorySync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('memory_audit') .selectFrom('memory_audit')
.select(['id', 'memoryId']) .select(['id', 'memoryId'])
.where('userId', '=', userId) .where('userId', '=', userId)
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('memory') .selectFrom('memory')
.select([ .select([
@ -516,97 +538,105 @@ class MemorySync extends BaseSync {
]) ])
.select('updateId') .select('updateId')
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class MemoryToAssetSync extends BaseSync { class MemoryToAssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('memory_asset_audit') .selectFrom('memory_asset_audit')
.select(['id', 'memoryId', 'assetId']) .select(['id', 'memoryId', 'assetId'])
.where('memoryId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId)) .where('memoryId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId))
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('memory_asset') .selectFrom('memory_asset')
.select(['memoriesId as memoryId', 'assetsId as assetId']) .select(['memoriesId as memoryId', 'assetsId as assetId'])
.select('updateId') .select('updateId')
.where('memoriesId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId)) .where('memoriesId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId))
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class PartnerSync extends BaseSync { class PartnerSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }] })
getCreatedAfter(userId: string, afterCreateId?: string) { getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) {
return this.db return this.db
.selectFrom('partner') .selectFrom('partner')
.select(['sharedById', 'createId']) .select(['sharedById', 'createId'])
.where('sharedWithId', '=', userId) .where('sharedWithId', '=', userId)
.$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!)) .$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!))
.where('createdAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('createId', '<', nowId)
.orderBy('partner.createId', 'asc') .orderBy('partner.createId', 'asc')
.execute(); .execute();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('partner_audit') .selectFrom('partner_audit')
.select(['id', 'sharedById', 'sharedWithId']) .select(['id', 'sharedById', 'sharedWithId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('partner') .selectFrom('partner')
.select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class PartnerAssetsSync extends BaseSync { class PartnerAssetsSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId }: SyncQueryOptions,
partnerId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db return this.db
.selectFrom('asset') .selectFrom('asset')
.select(columns.syncAsset) .select(columns.syncAsset)
.select('asset.updateId') .select('asset.updateId')
.where('ownerId', '=', partnerId) .where('ownerId', '=', partnerId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('updateId', '<', nowId)
.where('updateId', '<=', beforeUpdateId) .where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc') .orderBy('updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_audit') .selectFrom('asset_audit')
.select(['id', 'assetId']) .select(['id', 'assetId'])
.where('ownerId', 'in', (eb) => .where('ownerId', 'in', (eb) =>
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
) )
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset') .selectFrom('asset')
.select(columns.syncAsset) .select(columns.syncAsset)
@ -614,29 +644,37 @@ class PartnerAssetsSync extends BaseSync {
.where('ownerId', 'in', (eb) => .where('ownerId', 'in', (eb) =>
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
) )
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class PartnerAssetExifsSync extends BaseSync { class PartnerAssetExifsSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId }: SyncQueryOptions,
partnerId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db return this.db
.selectFrom('asset_exif') .selectFrom('asset_exif')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
.select('asset_exif.updateId') .select('asset_exif.updateId')
.innerJoin('asset', 'asset.id', 'asset_exif.assetId') .innerJoin('asset', 'asset.id', 'asset_exif.assetId')
.where('asset.ownerId', '=', partnerId) .where('asset.ownerId', '=', partnerId)
.where('asset_exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('asset_exif.updateId', '<', nowId)
.where('asset_exif.updateId', '<=', beforeUpdateId) .where('asset_exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!))
.orderBy('asset_exif.updateId', 'asc') .orderBy('asset_exif.updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('asset_exif') .selectFrom('asset_exif')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
@ -649,61 +687,69 @@ class PartnerAssetExifsSync extends BaseSync {
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
), ),
) )
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class StackSync extends BaseSync { class StackSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('stack_audit') .selectFrom('stack_audit')
.select(['id', 'stackId']) .select(['id', 'stackId'])
.where('userId', '=', userId) .where('userId', '=', userId)
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('stack') .selectFrom('stack')
.select(columns.syncStack) .select(columns.syncStack)
.select('updateId') .select('updateId')
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class PartnerStackSync extends BaseSync { class PartnerStackSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('stack_audit') .selectFrom('stack_audit')
.select(['id', 'stackId']) .select(['id', 'stackId'])
.where('userId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId)) .where('userId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId))
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true }) @GenerateSql({
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) { params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId }: SyncQueryOptions,
partnerId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db return this.db
.selectFrom('stack') .selectFrom('stack')
.select(columns.syncStack) .select(columns.syncStack)
.select('updateId') .select('updateId')
.where('ownerId', '=', partnerId) .where('ownerId', '=', partnerId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .where('updateId', '<', nowId)
.where('updateId', '<=', beforeUpdateId) .where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc') .orderBy('updateId', 'asc')
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('stack') .selectFrom('stack')
.select(columns.syncStack) .select(columns.syncStack)
@ -711,41 +757,41 @@ class PartnerStackSync extends BaseSync {
.where('ownerId', 'in', (eb) => .where('ownerId', 'in', (eb) =>
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId), eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
) )
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }
class UserSync extends BaseSync { class UserSync extends BaseSync {
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(ack?: SyncAck) { getDeletes({ nowId }: SyncQueryOptions, ack?: SyncAck) {
return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(ack)).stream(); return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(nowId, ack)).stream();
} }
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(ack?: SyncAck) { getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) {
return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(ack)).stream(); return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(nowId, ack)).stream();
} }
} }
class UserMetadataSync extends BaseSync { class UserMetadataSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes(userId: string, ack?: SyncAck) { getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('user_metadata_audit') .selectFrom('user_metadata_audit')
.select(['id', 'userId', 'key']) .select(['id', 'userId', 'key'])
.where('userId', '=', userId) .where('userId', '=', userId)
.$call(this.auditTableFilters(ack)) .$call(this.auditTableFilters(nowId, ack))
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.UUID], stream: true }) @GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts(userId: string, ack?: SyncAck) { getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db return this.db
.selectFrom('user_metadata') .selectFrom('user_metadata')
.select(['userId', 'key', 'value', 'updateId']) .select(['userId', 'key', 'value', 'updateId'])
.where('userId', '=', userId) .where('userId', '=', userId)
.$call(this.upsertTableFilters(ack)) .$call(this.upsertTableFilters(nowId, ack))
.stream(); .stream();
} }
} }

View File

@ -16,6 +16,7 @@ import {
SyncStreamDto, SyncStreamDto,
} from 'src/dtos/sync.dto'; } from 'src/dtos/sync.dto';
import { AssetVisibility, DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum'; import { AssetVisibility, DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum';
import { SyncQueryOptions } from 'src/repositories/sync.repository';
import { SessionSyncCheckpointTable } from 'src/schema/tables/sync-checkpoint.table'; import { SessionSyncCheckpointTable } from 'src/schema/tables/sync-checkpoint.table';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { SyncAck } from 'src/types'; import { SyncAck } from 'src/types';
@ -138,30 +139,34 @@ export class SyncService extends BaseService {
return; return;
} }
const { nowId } = await this.syncCheckpointRepository.getNow();
const options: SyncQueryOptions = { nowId, userId: auth.user.id };
const checkpoints = await this.syncCheckpointRepository.getAll(session.id); const checkpoints = await this.syncCheckpointRepository.getAll(session.id);
const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)])); const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)]));
const handlers: Record<SyncRequestType, () => Promise<void>> = { const handlers: Record<SyncRequestType, () => Promise<void>> = {
[SyncRequestType.AuthUsersV1]: () => this.syncAuthUsersV1(response, checkpointMap), [SyncRequestType.AuthUsersV1]: () => this.syncAuthUsersV1(options, response, checkpointMap),
[SyncRequestType.UsersV1]: () => this.syncUsersV1(response, checkpointMap), [SyncRequestType.UsersV1]: () => this.syncUsersV1(options, response, checkpointMap),
[SyncRequestType.PartnersV1]: () => this.syncPartnersV1(response, checkpointMap, auth), [SyncRequestType.PartnersV1]: () => this.syncPartnersV1(options, response, checkpointMap),
[SyncRequestType.AssetsV1]: () => this.syncAssetsV1(response, checkpointMap, auth), [SyncRequestType.AssetsV1]: () => this.syncAssetsV1(options, response, checkpointMap),
[SyncRequestType.AssetExifsV1]: () => this.syncAssetExifsV1(response, checkpointMap, auth), [SyncRequestType.AssetExifsV1]: () => this.syncAssetExifsV1(options, response, checkpointMap),
[SyncRequestType.PartnerAssetsV1]: () => this.syncPartnerAssetsV1(response, checkpointMap, auth, session.id), [SyncRequestType.PartnerAssetsV1]: () => this.syncPartnerAssetsV1(options, response, checkpointMap, session.id),
[SyncRequestType.PartnerAssetExifsV1]: () => [SyncRequestType.PartnerAssetExifsV1]: () =>
this.syncPartnerAssetExifsV1(response, checkpointMap, auth, session.id), this.syncPartnerAssetExifsV1(options, response, checkpointMap, session.id),
[SyncRequestType.AlbumsV1]: () => this.syncAlbumsV1(response, checkpointMap, auth), [SyncRequestType.AlbumsV1]: () => this.syncAlbumsV1(options, response, checkpointMap),
[SyncRequestType.AlbumUsersV1]: () => this.syncAlbumUsersV1(response, checkpointMap, auth, session.id), [SyncRequestType.AlbumUsersV1]: () => this.syncAlbumUsersV1(options, response, checkpointMap, session.id),
[SyncRequestType.AlbumAssetsV1]: () => this.syncAlbumAssetsV1(response, checkpointMap, auth, session.id), [SyncRequestType.AlbumAssetsV1]: () => this.syncAlbumAssetsV1(options, response, checkpointMap, session.id),
[SyncRequestType.AlbumToAssetsV1]: () => this.syncAlbumToAssetsV1(response, checkpointMap, auth, session.id), [SyncRequestType.AlbumToAssetsV1]: () => this.syncAlbumToAssetsV1(options, response, checkpointMap, session.id),
[SyncRequestType.AlbumAssetExifsV1]: () => this.syncAlbumAssetExifsV1(response, checkpointMap, auth, session.id), [SyncRequestType.AlbumAssetExifsV1]: () =>
[SyncRequestType.MemoriesV1]: () => this.syncMemoriesV1(response, checkpointMap, auth), this.syncAlbumAssetExifsV1(options, response, checkpointMap, session.id),
[SyncRequestType.MemoryToAssetsV1]: () => this.syncMemoryAssetsV1(response, checkpointMap, auth), [SyncRequestType.MemoriesV1]: () => this.syncMemoriesV1(options, response, checkpointMap),
[SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth), [SyncRequestType.MemoryToAssetsV1]: () => this.syncMemoryAssetsV1(options, response, checkpointMap),
[SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, session.id), [SyncRequestType.StacksV1]: () => this.syncStackV1(options, response, checkpointMap),
[SyncRequestType.PeopleV1]: () => this.syncPeopleV1(response, checkpointMap, auth), [SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(options, response, checkpointMap, session.id),
[SyncRequestType.AssetFacesV1]: async () => this.syncAssetFacesV1(response, checkpointMap, auth), [SyncRequestType.PeopleV1]: () => this.syncPeopleV1(options, response, checkpointMap),
[SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(response, checkpointMap, auth), [SyncRequestType.AssetFacesV1]: async () => this.syncAssetFacesV1(options, response, checkpointMap),
[SyncRequestType.UserMetadataV1]: () => this.syncUserMetadataV1(options, response, checkpointMap),
}; };
for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) {
@ -172,71 +177,71 @@ export class SyncService extends BaseService {
response.end(); response.end();
} }
private async syncAuthUsersV1(response: Writable, checkpointMap: CheckpointMap) { private async syncAuthUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const upsertType = SyncEntityType.AuthUserV1; const upsertType = SyncEntityType.AuthUserV1;
const upserts = this.syncRepository.authUser.getUpserts(checkpointMap[upsertType]); const upserts = this.syncRepository.authUser.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, profileImagePath, ...data } of upserts) { for await (const { updateId, profileImagePath, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } }); send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } });
} }
} }
private async syncUsersV1(response: Writable, checkpointMap: CheckpointMap) { private async syncUsersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.UserDeleteV1; const deleteType = SyncEntityType.UserDeleteV1;
const deletes = this.syncRepository.user.getDeletes(checkpointMap[deleteType]); const deletes = this.syncRepository.user.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.UserV1; const upsertType = SyncEntityType.UserV1;
const upserts = this.syncRepository.user.getUpserts(checkpointMap[upsertType]); const upserts = this.syncRepository.user.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, profileImagePath, ...data } of upserts) { for await (const { updateId, profileImagePath, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } }); send(response, { type: upsertType, ids: [updateId], data: { ...data, hasProfileImage: !!profileImagePath } });
} }
} }
private async syncPartnersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncPartnersV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.PartnerDeleteV1; const deleteType = SyncEntityType.PartnerDeleteV1;
const deletes = this.syncRepository.partner.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.partner.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.PartnerV1; const upsertType = SyncEntityType.PartnerV1;
const upserts = this.syncRepository.partner.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.partner.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.AssetDeleteV1; const deleteType = SyncEntityType.AssetDeleteV1;
const deletes = this.syncRepository.asset.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.asset.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.AssetV1; const upsertType = SyncEntityType.AssetV1;
const upserts = this.syncRepository.asset.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.asset.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) });
} }
} }
private async syncPartnerAssetsV1( private async syncPartnerAssetsV1(
options: SyncQueryOptions,
response: Writable, response: Writable,
checkpointMap: CheckpointMap, checkpointMap: CheckpointMap,
auth: AuthDto,
sessionId: string, sessionId: string,
) { ) {
const deleteType = SyncEntityType.PartnerAssetDeleteV1; const deleteType = SyncEntityType.PartnerAssetDeleteV1;
const deletes = this.syncRepository.partnerAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.partnerAsset.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const backfillType = SyncEntityType.PartnerAssetBackfillV1; const backfillType = SyncEntityType.PartnerAssetBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.PartnerAssetV1; const upsertType = SyncEntityType.PartnerAssetV1;
const upsertCheckpoint = checkpointMap[upsertType]; const upsertCheckpoint = checkpointMap[upsertType];
if (upsertCheckpoint) { if (upsertCheckpoint) {
@ -249,7 +254,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.partnerAsset.getBackfill(partner.sharedById, startId, endId); const backfill = this.syncRepository.partnerAsset.getBackfill(options, partner.sharedById, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { send(response, {
@ -269,29 +274,29 @@ export class SyncService extends BaseService {
}); });
} }
const upserts = this.syncRepository.partnerAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.partnerAsset.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) });
} }
} }
private async syncAssetExifsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncAssetExifsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const upsertType = SyncEntityType.AssetExifV1; const upsertType = SyncEntityType.AssetExifV1;
const upserts = this.syncRepository.assetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.assetExif.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncPartnerAssetExifsV1( private async syncPartnerAssetExifsV1(
options: SyncQueryOptions,
response: Writable, response: Writable,
checkpointMap: CheckpointMap, checkpointMap: CheckpointMap,
auth: AuthDto,
sessionId: string, sessionId: string,
) { ) {
const backfillType = SyncEntityType.PartnerAssetExifBackfillV1; const backfillType = SyncEntityType.PartnerAssetExifBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.PartnerAssetExifV1; const upsertType = SyncEntityType.PartnerAssetExifV1;
const upsertCheckpoint = checkpointMap[upsertType]; const upsertCheckpoint = checkpointMap[upsertType];
@ -305,7 +310,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.partnerAssetExif.getBackfill(partner.sharedById, startId, endId); const backfill = this.syncRepository.partnerAssetExif.getBackfill(options, partner.sharedById, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [partner.createId, updateId], data }); send(response, { type: backfillType, ids: [partner.createId, updateId], data });
@ -321,36 +326,41 @@ export class SyncService extends BaseService {
}); });
} }
const upserts = this.syncRepository.partnerAssetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.partnerAssetExif.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncAlbumsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncAlbumsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.AlbumDeleteV1; const deleteType = SyncEntityType.AlbumDeleteV1;
const deletes = this.syncRepository.album.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.album.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.AlbumV1; const upsertType = SyncEntityType.AlbumV1;
const upserts = this.syncRepository.album.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.album.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncAlbumUsersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { private async syncAlbumUsersV1(
options: SyncQueryOptions,
response: Writable,
checkpointMap: CheckpointMap,
sessionId: string,
) {
const deleteType = SyncEntityType.AlbumUserDeleteV1; const deleteType = SyncEntityType.AlbumUserDeleteV1;
const deletes = this.syncRepository.albumUser.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.albumUser.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const backfillType = SyncEntityType.AlbumUserBackfillV1; const backfillType = SyncEntityType.AlbumUserBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.AlbumUserV1; const upsertType = SyncEntityType.AlbumUserV1;
const upsertCheckpoint = checkpointMap[upsertType]; const upsertCheckpoint = checkpointMap[upsertType];
if (upsertCheckpoint) { if (upsertCheckpoint) {
@ -363,7 +373,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.albumUser.getBackfill(album.id, startId, endId); const backfill = this.syncRepository.albumUser.getBackfill(options, album.id, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data }); send(response, { type: backfillType, ids: [createId, updateId], data });
@ -379,16 +389,21 @@ export class SyncService extends BaseService {
}); });
} }
const upserts = this.syncRepository.albumUser.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.albumUser.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncAlbumAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { private async syncAlbumAssetsV1(
options: SyncQueryOptions,
response: Writable,
checkpointMap: CheckpointMap,
sessionId: string,
) {
const backfillType = SyncEntityType.AlbumAssetBackfillV1; const backfillType = SyncEntityType.AlbumAssetBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
const updateType = SyncEntityType.AlbumAssetUpdateV1; const updateType = SyncEntityType.AlbumAssetUpdateV1;
const createType = SyncEntityType.AlbumAssetCreateV1; const createType = SyncEntityType.AlbumAssetCreateV1;
const updateCheckpoint = checkpointMap[updateType]; const updateCheckpoint = checkpointMap[updateType];
@ -403,7 +418,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.albumAsset.getBackfill(album.id, startId, endId); const backfill = this.syncRepository.albumAsset.getBackfill(options, album.id, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) }); send(response, { type: backfillType, ids: [createId, updateId], data: mapSyncAssetV1(data) });
@ -420,13 +435,13 @@ export class SyncService extends BaseService {
} }
if (createCheckpoint) { if (createCheckpoint) {
const updates = this.syncRepository.albumAsset.getUpdates(auth.user.id, createCheckpoint, updateCheckpoint); const updates = this.syncRepository.albumAsset.getUpdates(options, createCheckpoint, updateCheckpoint);
for await (const { updateId, ...data } of updates) { for await (const { updateId, ...data } of updates) {
send(response, { type: updateType, ids: [updateId], data: mapSyncAssetV1(data) }); send(response, { type: updateType, ids: [updateId], data: mapSyncAssetV1(data) });
} }
} }
const creates = this.syncRepository.albumAsset.getCreates(auth.user.id, createCheckpoint); const creates = this.syncRepository.albumAsset.getCreates(options, createCheckpoint);
let first = true; let first = true;
for await (const { updateId, ...data } of creates) { for await (const { updateId, ...data } of creates) {
if (first) { if (first) {
@ -443,14 +458,14 @@ export class SyncService extends BaseService {
} }
private async syncAlbumAssetExifsV1( private async syncAlbumAssetExifsV1(
options: SyncQueryOptions,
response: Writable, response: Writable,
checkpointMap: CheckpointMap, checkpointMap: CheckpointMap,
auth: AuthDto,
sessionId: string, sessionId: string,
) { ) {
const backfillType = SyncEntityType.AlbumAssetExifBackfillV1; const backfillType = SyncEntityType.AlbumAssetExifBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.AlbumAssetExifV1; const upsertType = SyncEntityType.AlbumAssetExifV1;
const upsertCheckpoint = checkpointMap[upsertType]; const upsertCheckpoint = checkpointMap[upsertType];
if (upsertCheckpoint) { if (upsertCheckpoint) {
@ -463,7 +478,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.albumAssetExif.getBackfill(album.id, startId, endId); const backfill = this.syncRepository.albumAssetExif.getBackfill(options, album.id, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data }); send(response, { type: backfillType, ids: [createId, updateId], data });
@ -479,27 +494,27 @@ export class SyncService extends BaseService {
}); });
} }
const upserts = this.syncRepository.albumAssetExif.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.albumAssetExif.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncAlbumToAssetsV1( private async syncAlbumToAssetsV1(
options: SyncQueryOptions,
response: Writable, response: Writable,
checkpointMap: CheckpointMap, checkpointMap: CheckpointMap,
auth: AuthDto,
sessionId: string, sessionId: string,
) { ) {
const deleteType = SyncEntityType.AlbumToAssetDeleteV1; const deleteType = SyncEntityType.AlbumToAssetDeleteV1;
const deletes = this.syncRepository.albumToAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.albumToAsset.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const backfillType = SyncEntityType.AlbumToAssetBackfillV1; const backfillType = SyncEntityType.AlbumToAssetBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const albums = await this.syncRepository.album.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.AlbumToAssetV1; const upsertType = SyncEntityType.AlbumToAssetV1;
const upsertCheckpoint = checkpointMap[upsertType]; const upsertCheckpoint = checkpointMap[upsertType];
if (upsertCheckpoint) { if (upsertCheckpoint) {
@ -512,7 +527,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.albumToAsset.getBackfill(album.id, startId, endId); const backfill = this.syncRepository.albumToAsset.getBackfill(options, album.id, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [createId, updateId], data }); send(response, { type: backfillType, ids: [createId, updateId], data });
@ -528,64 +543,69 @@ export class SyncService extends BaseService {
}); });
} }
const upserts = this.syncRepository.albumToAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.albumToAsset.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncMemoriesV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncMemoriesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.MemoryDeleteV1; const deleteType = SyncEntityType.MemoryDeleteV1;
const deletes = this.syncRepository.memory.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.memory.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.MemoryV1; const upsertType = SyncEntityType.MemoryV1;
const upserts = this.syncRepository.memory.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.memory.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncMemoryAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncMemoryAssetsV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.MemoryToAssetDeleteV1; const deleteType = SyncEntityType.MemoryToAssetDeleteV1;
const deletes = this.syncRepository.memoryToAsset.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.memoryToAsset.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.MemoryToAssetV1; const upsertType = SyncEntityType.MemoryToAssetV1;
const upserts = this.syncRepository.memoryToAsset.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.memoryToAsset.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncStackV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncStackV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.StackDeleteV1; const deleteType = SyncEntityType.StackDeleteV1;
const deletes = this.syncRepository.stack.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.stack.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.StackV1; const upsertType = SyncEntityType.StackV1;
const upserts = this.syncRepository.stack.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.stack.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncPartnerStackV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) { private async syncPartnerStackV1(
options: SyncQueryOptions,
response: Writable,
checkpointMap: CheckpointMap,
sessionId: string,
) {
const deleteType = SyncEntityType.PartnerStackDeleteV1; const deleteType = SyncEntityType.PartnerStackDeleteV1;
const deletes = this.syncRepository.partnerStack.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.partnerStack.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const backfillType = SyncEntityType.PartnerStackBackfillV1; const backfillType = SyncEntityType.PartnerStackBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType]; const backfillCheckpoint = checkpointMap[backfillType];
const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId); const partners = await this.syncRepository.partner.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.PartnerStackV1; const upsertType = SyncEntityType.PartnerStackV1;
const upsertCheckpoint = checkpointMap[upsertType]; const upsertCheckpoint = checkpointMap[upsertType];
if (upsertCheckpoint) { if (upsertCheckpoint) {
@ -598,7 +618,7 @@ export class SyncService extends BaseService {
} }
const startId = getStartId(createId, backfillCheckpoint); const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.partnerStack.getBackfill(partner.sharedById, startId, endId); const backfill = this.syncRepository.partnerStack.getBackfill(options, partner.sharedById, startId, endId);
for await (const { updateId, ...data } of backfill) { for await (const { updateId, ...data } of backfill) {
send(response, { send(response, {
@ -618,50 +638,50 @@ export class SyncService extends BaseService {
}); });
} }
const upserts = this.syncRepository.partnerStack.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.partnerStack.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncPeopleV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncPeopleV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.PersonDeleteV1; const deleteType = SyncEntityType.PersonDeleteV1;
const deletes = this.syncRepository.people.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.people.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.PersonV1; const upsertType = SyncEntityType.PersonV1;
const upserts = this.syncRepository.people.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.people.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncAssetFacesV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncAssetFacesV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.AssetFaceDeleteV1; const deleteType = SyncEntityType.AssetFaceDeleteV1;
const deletes = this.syncRepository.assetFace.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.assetFace.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.AssetFaceV1; const upsertType = SyncEntityType.AssetFaceV1;
const upserts = this.syncRepository.assetFace.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.assetFace.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });
} }
} }
private async syncUserMetadataV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { private async syncUserMetadataV1(options: SyncQueryOptions, response: Writable, checkpointMap: CheckpointMap) {
const deleteType = SyncEntityType.UserMetadataDeleteV1; const deleteType = SyncEntityType.UserMetadataDeleteV1;
const deletes = this.syncRepository.userMetadata.getDeletes(auth.user.id, checkpointMap[deleteType]); const deletes = this.syncRepository.userMetadata.getDeletes(options, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) { for await (const { id, ...data } of deletes) {
send(response, { type: deleteType, ids: [id], data }); send(response, { type: deleteType, ids: [id], data });
} }
const upsertType = SyncEntityType.UserMetadataV1; const upsertType = SyncEntityType.UserMetadataV1;
const upserts = this.syncRepository.userMetadata.getUpserts(auth.user.id, checkpointMap[upsertType]); const upserts = this.syncRepository.userMetadata.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) { for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data }); send(response, { type: upsertType, ids: [updateId], data });

View File

@ -481,8 +481,6 @@ const personInsert = (person: Partial<Insertable<PersonTable>> & { ownerId: stri
name: person.name || 'Test Name', name: person.name || 'Test Name',
ownerId: person.ownerId || newUuid(), ownerId: person.ownerId || newUuid(),
thumbnailPath: person.thumbnailPath || '/path/to/thumbnail.jpg', thumbnailPath: person.thumbnailPath || '/path/to/thumbnail.jpg',
updatedAt: person.updatedAt || newDate(),
updateId: person.updateId || newUuid(),
}; };
return { return {
...defaults, ...defaults,