From 3d24230269fd9b310f66eda2fa11dcd2a4093636 Mon Sep 17 00:00:00 2001 From: Zack Pollard Date: Fri, 7 Mar 2025 12:25:22 +0000 Subject: [PATCH] refactor: remove duplicate where clause and orderBy statements in sync queries --- server/src/repositories/sync.repository.ts | 77 ++++++------ server/test/medium/specs/sync.service.spec.ts | 112 ++++++++++++++++-- 2 files changed, 141 insertions(+), 48 deletions(-) diff --git a/server/src/repositories/sync.repository.ts b/server/src/repositories/sync.repository.ts index 14488be137..caaa584ac8 100644 --- a/server/src/repositories/sync.repository.ts +++ b/server/src/repositories/sync.repository.ts @@ -1,11 +1,14 @@ import { Injectable } from '@nestjs/common'; -import { Insertable, Kysely, sql } from 'kysely'; +import { Insertable, Kysely, SelectQueryBuilder, sql } from 'kysely'; import { InjectKysely } from 'nestjs-kysely'; import { columns } from 'src/database'; import { DB, SessionSyncCheckpoints } from 'src/db'; import { SyncEntityType } from 'src/enum'; import { SyncAck } from 'src/types'; +type auditTables = 'users_audit' | 'partners_audit' | 'assets_audit'; +type upsertTables = 'users' | 'partners' | 'assets' | 'exif'; + @Injectable() export class SyncRepository { constructor(@InjectKysely() private db: Kysely) {} @@ -42,9 +45,7 @@ export class SyncRepository { return this.db .selectFrom('users') .select(['id', 'name', 'email', 'deletedAt', 'updateId']) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['updateId asc']) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } @@ -52,9 +53,7 @@ export class SyncRepository { return this.db .selectFrom('users_audit') .select(['id', 'userId']) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['id asc']) + .$call((qb) => this.auditTableFilters(qb, ack)) .stream(); } @@ -63,9 +62,7 @@ export class SyncRepository { .selectFrom('partners') .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .orderBy(['updateId asc']) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } @@ -74,9 +71,7 @@ export class SyncRepository { .selectFrom('partners_audit') .select(['id', 'sharedById', 'sharedWithId']) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['id asc']) + .$call((qb) => this.auditTableFilters(qb, ack)) .stream(); } @@ -85,9 +80,7 @@ export class SyncRepository { .selectFrom('assets') .select(columns.syncAsset) .where('ownerId', '=', userId) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['updateId asc']) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } @@ -98,9 +91,7 @@ export class SyncRepository { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) - .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['updateId asc']) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } @@ -109,9 +100,7 @@ export class SyncRepository { .selectFrom('assets_audit') .select(['id', 'assetId']) .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .where('ownerId', '=', userId) - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['id asc']) + .$call((qb) => this.auditTableFilters(qb, ack)) .stream(); } @@ -122,35 +111,51 @@ export class SyncRepository { .where('ownerId', 'in', (eb) => eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), ) - .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) - .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['id asc']) + .$call((qb) => this.auditTableFilters(qb, ack)) .stream(); } getAssetExifsUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('exif') - .innerJoin('assets', 'assets.id', 'exif.assetId') .select(columns.syncAssetExif) - .where('assets.ownerId', '=', userId) - .$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId)) - .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['exif.updateId asc']) + .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId)) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) { return this.db .selectFrom('exif') - .innerJoin('assets', 'assets.id', 'exif.assetId') .select(columns.syncAssetExif) - .where('assets.ownerId', 'in', (eb) => - eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), + .where('assetId', 'in', (eb) => + eb + .selectFrom('assets') + .select('id') + .where('ownerId', 'in', (eb) => + eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), + ), ) - .$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId)) - .where('exif.updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) - .orderBy(['exif.updateId asc']) + .$call((qb) => this.upsertTableFilters(qb, ack)) .stream(); } + + private auditTableFilters, D>(qb: SelectQueryBuilder, ack?: SyncAck) { + const builder = qb as SelectQueryBuilder; + return builder + .where('deletedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) + .orderBy(['id asc']) as SelectQueryBuilder; + } + + private upsertTableFilters, D>( + qb: SelectQueryBuilder, + ack?: SyncAck, + ) { + const builder = qb as SelectQueryBuilder; + return builder + .where('updatedAt', '<', sql.raw("now() - interval '1 millisecond'")) + .$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) + .orderBy(['updateId asc']) as SelectQueryBuilder; + } } diff --git a/server/test/medium/specs/sync.service.spec.ts b/server/test/medium/specs/sync.service.spec.ts index a4a8c53982..574ddde93c 100644 --- a/server/test/medium/specs/sync.service.spec.ts +++ b/server/test/medium/specs/sync.service.spec.ts @@ -41,7 +41,7 @@ describe(SyncService.name, () => { expect(SYNC_TYPES_ORDER.length).toBe(Object.keys(SyncRequestType).length); }); - describe.concurrent('users', () => { + describe.concurrent(SyncEntityType.UserV1, () => { it('should detect and sync the first user', async () => { const { context, auth, sut, testSync } = await setup(); @@ -197,7 +197,7 @@ describe(SyncService.name, () => { }); }); - describe.concurrent('partners', () => { + describe.concurrent(SyncEntityType.PartnerV1, () => { it('should detect and sync the first partner', async () => { const { auth, context, sut, testSync } = await setup(); @@ -357,7 +357,7 @@ describe(SyncService.name, () => { ); }); - it('should not sync a partner for an unrelated user', async () => { + it('should not sync a partner or partner delete for an unrelated user', async () => { const { auth, context, testSync } = await setup(); const user2 = await context.createUser(); @@ -365,9 +365,11 @@ describe(SyncService.name, () => { await context.createPartner({ sharedById: user2.id, sharedWithId: user3.id }); - const response = await testSync(auth, [SyncRequestType.PartnersV1]); + expect(await testSync(auth, [SyncRequestType.PartnersV1])).toHaveLength(0); - expect(response).toHaveLength(0); + await context.partner.remove({ sharedById: user2.id, sharedWithId: user3.id }); + + expect(await testSync(auth, [SyncRequestType.PartnersV1])).toHaveLength(0); }); it('should not sync a partner delete after a user is deleted', async () => { @@ -377,13 +379,11 @@ describe(SyncService.name, () => { await context.createPartner({ sharedById: user2.id, sharedWithId: auth.user.id }); await context.user.delete({ id: user2.id }, true); - const response = await testSync(auth, [SyncRequestType.PartnersV1]); - - expect(response).toHaveLength(0); + expect(await testSync(auth, [SyncRequestType.PartnersV1])).toHaveLength(0); }); }); - describe.concurrent('assets', () => { + describe.concurrent(SyncEntityType.AssetV1, () => { it('should detect and sync the first asset', async () => { const { auth, context, sut, testSync } = await setup(); @@ -464,17 +464,22 @@ describe(SyncService.name, () => { expect(ackSyncResponse).toHaveLength(0); }); - it('should not sync an asset for an unrelated user', async () => { + it('should not sync an asset or asset delete for an unrelated user', async () => { const { auth, context, testSync } = await setup(); const user2 = await context.createUser(); + const session = TestFactory.session({ userId: user2.id }); + const auth2 = TestFactory.auth({ session, user: user2 }); const asset = TestFactory.asset({ ownerId: user2.id }); await context.createAsset(asset); - const response = await testSync(auth, [SyncRequestType.AssetsV1]); + expect(await testSync(auth2, [SyncRequestType.AssetsV1])).toHaveLength(1); + expect(await testSync(auth, [SyncRequestType.AssetsV1])).toHaveLength(0); - expect(response).toHaveLength(0); + await context.asset.remove(asset); + expect(await testSync(auth2, [SyncRequestType.AssetsV1])).toHaveLength(1); + expect(await testSync(auth, [SyncRequestType.AssetsV1])).toHaveLength(0); }); }); @@ -591,6 +596,40 @@ describe(SyncService.name, () => { await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); }); + + it('should not sync an asset or asset delete for own user', async () => { + const { auth, context, testSync } = await setup(); + + const user2 = await context.createUser(); + const asset = await context.createAsset({ ownerId: auth.user.id }); + const partner = { sharedById: user2.id, sharedWithId: auth.user.id }; + await context.partner.create(partner); + + await expect(testSync(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); + + await context.asset.remove(asset); + + await expect(testSync(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); + }); + + it('should not sync an asset or asset delete for unrelated user', async () => { + const { auth, context, testSync } = await setup(); + + const user2 = await context.createUser(); + const session = TestFactory.session({ userId: user2.id }); + const auth2 = TestFactory.auth({ session, user: user2 }); + const asset = await context.createAsset({ ownerId: user2.id }); + + await expect(testSync(auth2, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); + + await context.asset.remove(asset); + + await expect(testSync(auth2, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); + }); }); describe.concurrent(SyncRequestType.AssetExifsV1, () => { @@ -649,6 +688,24 @@ describe(SyncService.name, () => { expect(ackSyncResponse).toHaveLength(0); }); + + it('should only sync asset exif for own user', async () => { + const { auth, context, testSync } = await setup(); + + const user2 = await context.createUser(); + const session = TestFactory.session({ userId: user2.id }); + const auth2 = TestFactory.auth({ session, user: user2 }); + + await context.partner.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + const asset = TestFactory.asset({ ownerId: user2.id }); + const exif = { assetId: asset.id, make: 'Canon' }; + + await context.createAsset(asset); + await context.asset.upsertExif(exif); + + await expect(testSync(auth2, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(0); + }); }); describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { @@ -708,5 +765,36 @@ describe(SyncService.name, () => { expect(ackSyncResponse).toHaveLength(0); }); + + it('should not sync partner asset exif for own user', async () => { + const { auth, context, testSync } = await setup(); + + const user2 = await context.createUser(); + await context.partner.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + const asset = TestFactory.asset({ ownerId: auth.user.id }); + const exif = { assetId: asset.id, make: 'Canon' }; + await context.createAsset(asset); + await context.asset.upsertExif(exif); + + await expect(testSync(auth, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.PartnerAssetExifsV1])).resolves.toHaveLength(0); + }); + + it('should not sync partner asset exif for unrelated user', async () => { + const { auth, context, testSync } = await setup(); + + const user2 = await context.createUser(); + const user3 = await context.createUser(); + const session = TestFactory.session({ userId: user3.id }); + const authUser3 = TestFactory.auth({ session, user: user3 }); + await context.partner.create({ sharedById: user2.id, sharedWithId: auth.user.id }); + const asset = TestFactory.asset({ ownerId: user3.id }); + const exif = { assetId: asset.id, make: 'Canon' }; + await context.createAsset(asset); + await context.asset.upsertExif(exif); + + await expect(testSync(authUser3, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1); + await expect(testSync(auth, [SyncRequestType.PartnerAssetExifsV1])).resolves.toHaveLength(0); + }); }); });