diff --git a/server/src/queries/sync.repository.sql b/server/src/queries/sync.repository.sql index 4c4747e5da..26aaf014d9 100644 --- a/server/src/queries/sync.repository.sql +++ b/server/src/queries/sync.repository.sql @@ -130,7 +130,7 @@ from where "ownerId" = $1 and "updatedAt" < now() - interval '1 millisecond' - and "updateId" < $2 + and "updateId" <= $2 and "updateId" >= $3 order by "updateId" asc @@ -274,7 +274,7 @@ from where "assets"."ownerId" = $1 and "exif"."updatedAt" < now() - interval '1 millisecond' - and "exif"."updateId" < $2 + and "exif"."updateId" <= $2 and "exif"."updateId" >= $3 order by "exif"."updateId" asc @@ -418,7 +418,7 @@ from where "albumsId" = $1 and "updatedAt" < now() - interval '1 millisecond' - and "updateId" < $2 + and "updateId" <= $2 and "updateId" >= $3 order by "updateId" asc diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index dee419f334..341966c6fa 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -111,7 +111,7 @@ export class SyncRepository { .select(columns.syncAsset) .where('ownerId', '=', partnerId) .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('updateId', '<', beforeUpdateId) + .where('updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .orderBy('updateId', 'asc') .stream(); @@ -169,7 +169,7 @@ export class SyncRepository { .innerJoin('assets', 'assets.id', 'exif.assetId') .where('assets.ownerId', '=', partnerId) .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('exif.updateId', '<', beforeUpdateId) + .where('exif.updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!)) .orderBy('exif.updateId', 'asc') .stream(); @@ -273,7 +273,7 @@ export class SyncRepository { .select(columns.syncAlbumUser) .where('albumsId', '=', albumId) .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .where('updateId', '<', beforeUpdateId) + .where('updateId', '<=', beforeUpdateId) .$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!)) .orderBy('updateId', 'asc') .stream(); diff --git a/server/src/services/sync.service.ts b/server/src/services/sync.service.ts index 9021aa57e9..733dd9036a 100644 --- a/server/src/services/sync.service.ts +++ b/server/src/services/sync.service.ts @@ -38,11 +38,11 @@ const mapSyncAssetV1 = ({ checksum, thumbhash, ...data }: AssetLike): SyncAssetV thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null, }); -const isEntityBackfillComplete = (entity: { createId: string }, checkpoint: SyncAck | undefined): boolean => - entity.createId === checkpoint?.updateId && checkpoint.extraId === COMPLETE_ID; +const isEntityBackfillComplete = (createId: string, checkpoint: SyncAck | undefined): boolean => + createId === checkpoint?.updateId && checkpoint.extraId === COMPLETE_ID; -const getStartId = (entity: { createId: string }, checkpoint: SyncAck | undefined): string | undefined => - checkpoint?.updateId === entity.createId ? checkpoint?.extraId : undefined; +const getStartId = (createId: string, checkpoint: SyncAck | undefined): string | undefined => + createId === checkpoint?.updateId ? checkpoint?.extraId : undefined; const send = (response: Writable, item: SerializeOptions) => { response.write(serialize(item)); @@ -235,22 +235,23 @@ export class SyncService extends BaseService { const endId = upsertCheckpoint.updateId; for (const partner of partners) { - if (isEntityBackfillComplete(partner, backfillCheckpoint)) { + const createId = partner.createId; + if (isEntityBackfillComplete(createId, backfillCheckpoint)) { continue; } - const startId = getStartId(partner, backfillCheckpoint); + const startId = getStartId(createId, backfillCheckpoint); const backfill = this.syncRepository.getPartnerAssetsBackfill(partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { send(response, { type: backfillType, - ids: [updateId], + ids: [createId, updateId], data: mapSyncAssetV1(data), }); } - sendEntityBackfillCompleteAck(response, backfillType, partner.sharedById); + sendEntityBackfillCompleteAck(response, backfillType, createId); } } else if (partners.length > 0) { await this.upsertBackfillCheckpoint({ @@ -291,18 +292,19 @@ export class SyncService extends BaseService { const endId = upsertCheckpoint.updateId; for (const partner of partners) { - if (isEntityBackfillComplete(partner, backfillCheckpoint)) { + const createId = partner.createId; + if (isEntityBackfillComplete(createId, backfillCheckpoint)) { continue; } - const startId = getStartId(partner, backfillCheckpoint); + const startId = getStartId(createId, backfillCheckpoint); const backfill = this.syncRepository.getPartnerAssetExifsBackfill(partner.sharedById, startId, endId); for await (const { updateId, ...data } of backfill) { - send(response, { type: backfillType, ids: [updateId], data }); + send(response, { type: backfillType, ids: [partner.createId, updateId], data }); } - sendEntityBackfillCompleteAck(response, backfillType, partner.sharedById); + sendEntityBackfillCompleteAck(response, backfillType, partner.createId); } } else if (partners.length > 0) { await this.upsertBackfillCheckpoint({ @@ -350,18 +352,19 @@ export class SyncService extends BaseService { const endId = upsertCheckpoint.updateId; for (const album of albums) { - if (isEntityBackfillComplete(album, backfillCheckpoint)) { + const createId = album.createId; + if (isEntityBackfillComplete(createId, backfillCheckpoint)) { continue; } - const startId = getStartId(album, backfillCheckpoint); + const startId = getStartId(createId, backfillCheckpoint); const backfill = this.syncRepository.getAlbumUsersBackfill(album.id, startId, endId); for await (const { updateId, ...data } of backfill) { - send(response, { type: backfillType, ids: [updateId], data }); + send(response, { type: backfillType, ids: [createId, updateId], data }); } - sendEntityBackfillCompleteAck(response, backfillType, album.id); + sendEntityBackfillCompleteAck(response, backfillType, createId); } } else if (albums.length > 0) { await this.upsertBackfillCheckpoint({ diff --git a/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts b/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts index edf6eee564..c47c55ecdc 100644 --- a/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts +++ b/server/test/medium/specs/sync/sync-partner-asset-exif.spec.ts @@ -19,7 +19,7 @@ beforeAll(async () => { defaultDatabase = await getKyselyDB(); }); -describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { +describe(SyncRequestType.PartnerAssetExifsV1, () => { it('should detect and sync the first partner asset exif', async () => { const { auth, sut, getRepository, testSync } = await setup(); @@ -78,7 +78,6 @@ describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { await sut.setAcks(auth, { acks }); const ackSyncResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); - expect(ackSyncResponse).toHaveLength(0); }); @@ -196,6 +195,79 @@ describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { expect(finalAcks).toEqual([]); }); + it('should handle partners with users ids lower than a uuidv7', async () => { + const { auth, sut, getRepository, testSync } = await setup(); + + const userRepo = getRepository('user'); + const user2 = mediumFactory.userInsert({ id: '00d4c0af-7695-4cf2-85e6-415eeaf449cb' }); + const user3 = mediumFactory.userInsert({ id: '00e4c0af-7695-4cf2-85e6-415eeaf449cb' }); + await userRepo.create(user2); + await userRepo.create(user3); + + const assetRepo = getRepository('asset'); + const assetUser3 = mediumFactory.assetInsert({ ownerId: user3.id }); + await assetRepo.create(assetUser3); + await assetRepo.upsertExif({ assetId: assetUser3.id, make: 'assetUser3' }); + + await wait(2); + + const assetUser2 = mediumFactory.assetInsert({ ownerId: user2.id }); + await assetRepo.create(assetUser2); + await assetRepo.upsertExif({ assetId: assetUser2.id, make: 'assetUser2' }); + + const partnerRepo = getRepository('partner'); + await partnerRepo.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + + const response = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + + expect(response).toHaveLength(1); + expect(response).toEqual( + expect.arrayContaining([ + { + ack: expect.any(String), + data: expect.objectContaining({ + assetId: assetUser2.id, + }), + type: SyncEntityType.PartnerAssetExifV1, + }, + ]), + ); + + const acks = response.map(({ ack }) => ack); + await sut.setAcks(auth, { acks }); + + // This checks that our ack upsert is correct + const ackUpsertResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + expect(ackUpsertResponse).toEqual([]); + + await partnerRepo.create({ sharedById: user3.id, sharedWithId: auth.user.id }); + + const syncAckResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + expect(syncAckResponse).toHaveLength(2); + expect(syncAckResponse).toEqual( + expect.arrayContaining([ + { + ack: expect.stringMatching(new RegExp(`${SyncEntityType.PartnerAssetExifBackfillV1}\\|.+?\\|.+`)), + data: expect.objectContaining({ + assetId: assetUser3.id, + }), + type: SyncEntityType.PartnerAssetExifBackfillV1, + }, + { + ack: expect.stringContaining(SyncEntityType.PartnerAssetExifBackfillV1), + data: {}, + type: SyncEntityType.SyncAckV1, + }, + ]), + ); + + const syncAckResponseAcks = syncAckResponse.map(({ ack }) => ack); + await sut.setAcks(auth, { acks: [syncAckResponseAcks[1]] }); + + const finalResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); + expect(finalResponse).toEqual([]); + }); + it('should only backfill partner assets created prior to the current partner asset checkpoint', async () => { const { auth, sut, getRepository, testSync } = await setup(); @@ -210,13 +282,13 @@ describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { const assetUser2 = mediumFactory.assetInsert({ ownerId: user2.id }); const asset2User3 = mediumFactory.assetInsert({ ownerId: user3.id }); await assetRepo.create(assetUser3); - await assetRepo.upsertExif({ assetId: assetUser3.id, make: 'Canon' }); + await assetRepo.upsertExif({ assetId: assetUser3.id, make: 'assetUser3' }); await wait(2); await assetRepo.create(assetUser2); - await assetRepo.upsertExif({ assetId: assetUser2.id, make: 'Canon' }); + await assetRepo.upsertExif({ assetId: assetUser2.id, make: 'assetUser2' }); await wait(2); await assetRepo.create(asset2User3); - await assetRepo.upsertExif({ assetId: asset2User3.id, make: 'Canon' }); + await assetRepo.upsertExif({ assetId: asset2User3.id, make: 'asset2User3' }); const partnerRepo = getRepository('partner'); await partnerRepo.create({ sharedById: user2.id, sharedWithId: auth.user.id }); @@ -246,7 +318,7 @@ describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { expect(backfillResponse).toEqual( expect.arrayContaining([ { - ack: expect.any(String), + ack: expect.stringMatching(new RegExp(`${SyncEntityType.PartnerAssetExifBackfillV1}\\|.+?\\|.+`)), data: expect.objectContaining({ assetId: assetUser3.id, }), @@ -270,7 +342,6 @@ describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { const backfillAck = backfillResponse[1].ack; const partnerAssetAck = backfillResponse[2].ack; await sut.setAcks(auth, { acks: [backfillAck, partnerAssetAck] }); - const finalResponse = await testSync(auth, [SyncRequestType.PartnerAssetExifsV1]); const finalAcks = finalResponse.map(({ ack }) => ack);