From 35280b94cc3fc13a36a3b954ed548a655409f108 Mon Sep 17 00:00:00 2001 From: Jason Rasmussen Date: Tue, 17 Jun 2025 12:06:40 -0400 Subject: [PATCH] refactor: sync service (#19225) --- server/src/services/sync.service.ts | 454 ++++++++++++++-------------- server/src/utils/sync.ts | 15 +- 2 files changed, 235 insertions(+), 234 deletions(-) diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 4705fc8e1f..46937a570f 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -12,6 +12,8 @@ import { AssetFullSyncDto, SyncAckDeleteDto, SyncAckSetDto, + SyncAssetV1, + SyncItem, SyncStreamDto, } from 'src/dtos/sync.dto'; import { AssetVisibility, DatabaseAction, EntityType, Permission, SyncEntityType, SyncRequestType } from 'src/enum'; @@ -20,7 +22,35 @@ import { SyncAck } from 'src/types'; import { getMyPartnerIds } from 'src/utils/asset.util'; import { hexOrBufferToBase64 } from 'src/utils/bytes'; import { setIsEqual } from 'src/utils/set'; -import { fromAck, serialize, toAck } from 'src/utils/sync'; +import { fromAck, serialize, SerializeOptions, toAck } from 'src/utils/sync'; + +type CheckpointMap = Partial>; +type AssetLike = Omit & { + checksum: Buffer; + thumbhash: Buffer | null; +}; + +const COMPLETE_ID = 'complete'; + +const mapSyncAssetV1 = ({ checksum, thumbhash, ...data }: AssetLike): SyncAssetV1 => ({ + ...data, + checksum: hexOrBufferToBase64(checksum), + thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null, +}); + +const isEntityBackfillComplete = (entity: { createId: string }, checkpoint: SyncAck | undefined): boolean => + entity.createId === checkpoint?.updateId && checkpoint.extraId === COMPLETE_ID; + +const getStartId = (entity: { createId: string }, checkpoint: SyncAck | undefined): string | undefined => + checkpoint?.updateId === entity.createId ? checkpoint?.extraId : undefined; + +const send = (response: Writable, item: SerializeOptions) => { + response.write(serialize(item)); +}; + +const sendEntityBackfillCompleteAck = (response: Writable, ackType: SyncEntityType, id: string) => { + send(response, { type: SyncEntityType.SyncAckV1, data: {}, ackType, ids: [id, COMPLETE_ID] }); +}; const FULL_SYNC = { needsFullSync: true, deleted: [], upserted: [] }; export const SYNC_TYPES_ORDER = [ @@ -89,266 +119,48 @@ export class SyncService extends BaseService { } const checkpoints = await this.syncRepository.getCheckpoints(sessionId); - const checkpointMap: Partial> = Object.fromEntries( - checkpoints.map(({ type, ack }) => [type, fromAck(ack)]), - ); + const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)])); for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) { switch (type) { case SyncRequestType.UsersV1: { - const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]); - for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.UserDeleteV1, ids: [id], data })); - } - - const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]); - for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.UserV1, ids: [updateId], data })); - } - + await this.syncUsersV1(response, checkpointMap); break; } case SyncRequestType.PartnersV1: { - const deletes = this.syncRepository.getPartnerDeletes( - auth.user.id, - checkpointMap[SyncEntityType.PartnerDeleteV1], - ); - for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.PartnerDeleteV1, ids: [id], data })); - } - - const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[SyncEntityType.PartnerV1]); - for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.PartnerV1, ids: [updateId], data })); - } - + await this.syncPartnersV1(response, checkpointMap, auth); break; } case SyncRequestType.AssetsV1: { - const deletes = this.syncRepository.getAssetDeletes( - auth.user.id, - checkpointMap[SyncEntityType.AssetDeleteV1], - ); - for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.AssetDeleteV1, ids: [id], data })); - } - - const upserts = this.syncRepository.getAssetUpserts(auth.user.id, checkpointMap[SyncEntityType.AssetV1]); - for await (const { updateId, checksum, thumbhash, ...data } of upserts) { - response.write( - serialize({ - type: SyncEntityType.AssetV1, - ids: [updateId], - data: { - ...data, - checksum: hexOrBufferToBase64(checksum), - thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null, - }, - }), - ); - } - + await this.syncAssetsV1(response, checkpointMap, auth); break; } case SyncRequestType.PartnerAssetsV1: { - const deletes = this.syncRepository.getPartnerAssetDeletes( - auth.user.id, - checkpointMap[SyncEntityType.PartnerAssetDeleteV1], - ); - for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.PartnerAssetDeleteV1, ids: [id], data })); - } - - const checkpoint = checkpointMap[SyncEntityType.PartnerAssetBackfillV1]; - const partnerAssetCheckpoint = checkpointMap[SyncEntityType.PartnerAssetV1]; - - const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, checkpoint?.updateId); - - if (partnerAssetCheckpoint) { - for (const partner of partners) { - if (partner.createId === checkpoint?.updateId && checkpoint.extraId === 'complete') { - continue; - } - const partnerCheckpoint = checkpoint?.updateId === partner.createId ? checkpoint?.extraId : undefined; - const backfill = this.syncRepository.getPartnerAssetsBackfill( - partner.sharedById, - partnerCheckpoint, - partnerAssetCheckpoint.updateId, - ); - - for await (const { updateId, checksum, thumbhash, ...data } of backfill) { - response.write( - serialize({ - type: SyncEntityType.PartnerAssetBackfillV1, - ids: [updateId], - data: { - ...data, - checksum: hexOrBufferToBase64(checksum), - thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null, - }, - }), - ); - } - response.write( - serialize({ - type: SyncEntityType.SyncAckV1, - data: {}, - ackType: SyncEntityType.PartnerAssetBackfillV1, - ids: [partner.sharedById, 'complete'], - }), - ); - } - } else if (partners.length > 0) { - await this.syncRepository.upsertCheckpoints([ - { - type: SyncEntityType.PartnerAssetBackfillV1, - sessionId, - ack: toAck({ - type: SyncEntityType.PartnerAssetBackfillV1, - updateId: partners.at(-1)!.createId, - extraId: 'complete', - }), - }, - ]); - } - - const upserts = this.syncRepository.getPartnerAssetsUpserts( - auth.user.id, - checkpointMap[SyncEntityType.PartnerAssetV1], - ); - for await (const { updateId, checksum, thumbhash, ...data } of upserts) { - response.write( - serialize({ - type: SyncEntityType.PartnerAssetV1, - ids: [updateId], - data: { - ...data, - checksum: hexOrBufferToBase64(checksum), - thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null, - }, - }), - ); - } + await this.syncPartnerAssetsV1(response, checkpointMap, auth, sessionId); break; } case SyncRequestType.AssetExifsV1: { - const upserts = this.syncRepository.getAssetExifsUpserts( - auth.user.id, - checkpointMap[SyncEntityType.AssetExifV1], - ); - for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.AssetExifV1, ids: [updateId], data })); - } - + await this.syncAssetExifsV1(response, checkpointMap, auth); break; } case SyncRequestType.PartnerAssetExifsV1: { - const checkpoint = checkpointMap[SyncEntityType.PartnerAssetExifBackfillV1]; - const partnerAssetCheckpoint = checkpointMap[SyncEntityType.PartnerAssetExifV1]; - - const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, checkpoint?.updateId); - - if (partnerAssetCheckpoint) { - for (const partner of partners) { - if (partner.createId === checkpoint?.updateId && checkpoint.extraId === 'complete') { - continue; - } - const partnerCheckpoint = checkpoint?.updateId === partner.createId ? checkpoint?.extraId : undefined; - const backfill = this.syncRepository.getPartnerAssetExifsBackfill( - partner.sharedById, - partnerCheckpoint, - partnerAssetCheckpoint.updateId, - ); - - for await (const { updateId, ...data } of backfill) { - response.write( - serialize({ - type: SyncEntityType.PartnerAssetExifBackfillV1, - ids: [updateId], - data, - }), - ); - } - response.write( - serialize({ - type: SyncEntityType.SyncAckV1, - data: {}, - ackType: SyncEntityType.PartnerAssetExifBackfillV1, - ids: [partner.sharedById, 'complete'], - }), - ); - } - } else if (partners.length > 0) { - await this.syncRepository.upsertCheckpoints([ - { - type: SyncEntityType.PartnerAssetExifBackfillV1, - sessionId, - ack: toAck({ - type: SyncEntityType.PartnerAssetExifBackfillV1, - updateId: partners.at(-1)!.createId, - extraId: 'complete', - }), - }, - ]); - } - - const upserts = this.syncRepository.getPartnerAssetExifsUpserts( - auth.user.id, - checkpointMap[SyncEntityType.PartnerAssetExifV1], - ); - for await (const { updateId, ...data } of upserts) { - response.write( - serialize({ - type: SyncEntityType.PartnerAssetExifV1, - ids: [updateId], - data, - }), - ); - } - + await this.syncPartnerAssetExifsV1(response, checkpointMap, auth, sessionId); break; } case SyncRequestType.AlbumsV1: { - const deletes = this.syncRepository.getAlbumDeletes( - auth.user.id, - checkpointMap[SyncEntityType.AlbumDeleteV1], - ); - for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.AlbumDeleteV1, ids: [id], data })); - } - - const upserts = this.syncRepository.getAlbumUpserts(auth.user.id, checkpointMap[SyncEntityType.AlbumV1]); - for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.AlbumV1, ids: [updateId], data })); - } - + await this.syncAlbumsV1(response, checkpointMap, auth); break; } case SyncRequestType.AlbumUsersV1: { - const deletes = this.syncRepository.getAlbumUserDeletes( - auth.user.id, - checkpointMap[SyncEntityType.AlbumUserDeleteV1], - ); - for await (const { id, ...data } of deletes) { - response.write(serialize({ type: SyncEntityType.AlbumUserDeleteV1, ids: [id], data })); - } - - const upserts = this.syncRepository.getAlbumUserUpserts( - auth.user.id, - checkpointMap[SyncEntityType.AlbumUserV1], - ); - for await (const { updateId, ...data } of upserts) { - response.write(serialize({ type: SyncEntityType.AlbumUserV1, ids: [updateId], data })); - } - + await this.syncAlbumUsersV1(response, checkpointMap, auth); break; } @@ -362,6 +174,192 @@ export class SyncService extends BaseService { response.end(); } + private async syncUsersV1(response: Writable, checkpointMap: CheckpointMap) { + const deletes = this.syncRepository.getUserDeletes(checkpointMap[SyncEntityType.UserDeleteV1]); + for await (const { id, ...data } of deletes) { + send(response, { type: SyncEntityType.UserDeleteV1, ids: [id], data }); + } + + const upserts = this.syncRepository.getUserUpserts(checkpointMap[SyncEntityType.UserV1]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: SyncEntityType.UserV1, ids: [updateId], data }); + } + } + + private async syncPartnersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const deletes = this.syncRepository.getPartnerDeletes(auth.user.id, checkpointMap[SyncEntityType.PartnerDeleteV1]); + for await (const { id, ...data } of deletes) { + send(response, { type: SyncEntityType.PartnerDeleteV1, ids: [id], data }); + } + + const upserts = this.syncRepository.getPartnerUpserts(auth.user.id, checkpointMap[SyncEntityType.PartnerV1]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: SyncEntityType.PartnerV1, ids: [updateId], data }); + } + } + + private async syncAssetsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const deletes = this.syncRepository.getAssetDeletes(auth.user.id, checkpointMap[SyncEntityType.AssetDeleteV1]); + for await (const { id, ...data } of deletes) { + send(response, { type: SyncEntityType.AssetDeleteV1, ids: [id], data }); + } + + const upserts = this.syncRepository.getAssetUpserts(auth.user.id, checkpointMap[SyncEntityType.AssetV1]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: SyncEntityType.AssetV1, ids: [updateId], data: mapSyncAssetV1(data) }); + } + } + + private async syncPartnerAssetsV1( + response: Writable, + checkpointMap: CheckpointMap, + auth: AuthDto, + sessionId: string, + ) { + const backfillType = SyncEntityType.PartnerAssetBackfillV1; + const upsertType = SyncEntityType.PartnerAssetV1; + const deleteType = SyncEntityType.PartnerAssetDeleteV1; + + const backfillCheckpoint = checkpointMap[backfillType]; + const upsertCheckpoint = checkpointMap[upsertType]; + + const deletes = this.syncRepository.getPartnerAssetDeletes(auth.user.id, checkpointMap[deleteType]); + + for await (const { id, ...data } of deletes) { + send(response, { type: deleteType, ids: [id], data }); + } + + const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, backfillCheckpoint?.updateId); + + if (upsertCheckpoint) { + const endId = upsertCheckpoint.updateId; + + for (const partner of partners) { + if (isEntityBackfillComplete(partner, backfillCheckpoint)) { + continue; + } + + const startId = getStartId(partner, backfillCheckpoint); + const backfill = this.syncRepository.getPartnerAssetsBackfill(partner.sharedById, startId, endId); + + for await (const { updateId, ...data } of backfill) { + send(response, { + type: backfillType, + ids: [updateId], + data: mapSyncAssetV1(data), + }); + } + + sendEntityBackfillCompleteAck(response, backfillType, partner.sharedById); + } + } else if (partners.length > 0) { + await this.upsertBackfillCheckpoint({ + type: backfillType, + sessionId, + createId: partners.at(-1)!.createId, + }); + } + + const upserts = this.syncRepository.getPartnerAssetsUpserts(auth.user.id, checkpointMap[upsertType]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: upsertType, ids: [updateId], data: mapSyncAssetV1(data) }); + } + } + + private async syncAssetExifsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const upserts = this.syncRepository.getAssetExifsUpserts(auth.user.id, checkpointMap[SyncEntityType.AssetExifV1]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: SyncEntityType.AssetExifV1, ids: [updateId], data }); + } + } + + private async syncPartnerAssetExifsV1( + response: Writable, + checkpointMap: CheckpointMap, + auth: AuthDto, + sessionId: string, + ) { + const backfillType = SyncEntityType.PartnerAssetExifBackfillV1; + const upsertType = SyncEntityType.PartnerAssetExifV1; + + const backfillCheckpoint = checkpointMap[backfillType]; + const upsertCheckpoint = checkpointMap[upsertType]; + + const partners = await this.syncRepository.getPartnerBackfill(auth.user.id, backfillCheckpoint?.updateId); + + if (upsertCheckpoint) { + const endId = upsertCheckpoint.updateId; + + for (const partner of partners) { + if (isEntityBackfillComplete(partner, backfillCheckpoint)) { + continue; + } + + const startId = getStartId(partner, backfillCheckpoint); + const backfill = this.syncRepository.getPartnerAssetExifsBackfill(partner.sharedById, startId, endId); + + for await (const { updateId, ...data } of backfill) { + send(response, { type: backfillType, ids: [updateId], data }); + } + + sendEntityBackfillCompleteAck(response, backfillType, partner.sharedById); + } + } else if (partners.length > 0) { + await this.upsertBackfillCheckpoint({ + type: backfillType, + sessionId, + createId: partners.at(-1)!.createId, + }); + } + + const upserts = this.syncRepository.getPartnerAssetExifsUpserts(auth.user.id, checkpointMap[upsertType]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: upsertType, ids: [updateId], data }); + } + } + + private async syncAlbumsV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const deletes = this.syncRepository.getAlbumDeletes(auth.user.id, checkpointMap[SyncEntityType.AlbumDeleteV1]); + for await (const { id, ...data } of deletes) { + send(response, { type: SyncEntityType.AlbumDeleteV1, ids: [id], data }); + } + + const upserts = this.syncRepository.getAlbumUpserts(auth.user.id, checkpointMap[SyncEntityType.AlbumV1]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: SyncEntityType.AlbumV1, ids: [updateId], data }); + } + } + + private async syncAlbumUsersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) { + const deletes = this.syncRepository.getAlbumUserDeletes( + auth.user.id, + checkpointMap[SyncEntityType.AlbumUserDeleteV1], + ); + for await (const { id, ...data } of deletes) { + send(response, { type: SyncEntityType.AlbumUserDeleteV1, ids: [id], data }); + } + + const upserts = this.syncRepository.getAlbumUserUpserts(auth.user.id, checkpointMap[SyncEntityType.AlbumUserV1]); + for await (const { updateId, ...data } of upserts) { + send(response, { type: SyncEntityType.AlbumUserV1, ids: [updateId], data }); + } + } + + private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) { + const { type, sessionId, createId } = item; + await this.syncRepository.upsertCheckpoints([ + { + type, + sessionId, + ack: toAck({ + type, + updateId: createId, + extraId: COMPLETE_ID, + }), + }, + ]); + } + async getFullSync(auth: AuthDto, dto: AssetFullSyncDto): Promise { // mobile implementation is faster if this is a single id const userId = dto.userId || auth.user.id; diff --git a/server/src/utils/sync.ts b/server/src/utils/sync.ts index 893c94dfcd..82222708af 100644 --- a/server/src/utils/sync.ts +++ b/server/src/utils/sync.ts @@ -18,14 +18,17 @@ export const toAck = ({ type, updateId, extraId }: SyncAck) => export const mapJsonLine = (object: unknown) => JSON.stringify(object) + '\n'; +export type SerializeOptions = { + type: T; + data: Exact; + ids: [string] | [string, string]; + ackType?: SyncEntityType; +}; + export const serialize = ({ type, data, ids, ackType, -}: { - type: T; - data: Exact; - ids: [string] | [string, string]; - ackType?: SyncEntityType; -}) => mapJsonLine({ type, data, ack: toAck({ type: ackType ?? type, updateId: ids[0], extraId: ids[1] }) }); +}: SerializeOptions) => + mapJsonLine({ type, data, ack: toAck({ type: ackType ?? type, updateId: ids[0], extraId: ids[1] }) });