refactor: remove duplicate where clause and orderBy statements in sync queries

This commit is contained in:
Zack Pollard 2025-03-07 12:25:22 +00:00 committed by Alex
parent 423a570949
commit 3d24230269
No known key found for this signature in database
GPG Key ID: 53CD082B3A5E1082
2 changed files with 141 additions and 48 deletions

View File

@ -1,11 +1,14 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Insertable, Kysely, sql } from 'kysely'; import { Insertable, Kysely, SelectQueryBuilder, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely'; import { InjectKysely } from 'nestjs-kysely';
import { columns } from 'src/database'; import { columns } from 'src/database';
import { DB, SessionSyncCheckpoints } from 'src/db'; import { DB, SessionSyncCheckpoints } from 'src/db';
import { SyncEntityType } from 'src/enum'; import { SyncEntityType } from 'src/enum';
import { SyncAck } from 'src/types'; import { SyncAck } from 'src/types';
type auditTables = 'users_audit' | 'partners_audit' | 'assets_audit';
type upsertTables = 'users' | 'partners' | 'assets' | 'exif';
@Injectable() @Injectable()
export class SyncRepository { export class SyncRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {} constructor(@InjectKysely() private db: Kysely<DB>) {}
@ -42,9 +45,7 @@ export class SyncRepository {
return this.db return this.db
.selectFrom('users') .selectFrom('users')
.select(['id', 'name', 'email', 'deletedAt', 'updateId']) .select(['id', 'name', 'email', 'deletedAt', 'updateId'])
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) .$call((qb) => this.upsertTableFilters(qb, ack))
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['updateId asc'])
.stream(); .stream();
} }
@ -52,9 +53,7 @@ export class SyncRepository {
return this.db return this.db
.selectFrom('users_audit') .selectFrom('users_audit')
.select(['id', 'userId']) .select(['id', 'userId'])
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .$call((qb) => this.auditTableFilters(qb, ack))
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['id asc'])
.stream(); .stream();
} }
@ -63,9 +62,7 @@ export class SyncRepository {
.selectFrom('partners') .selectFrom('partners')
.select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId']) .select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'")) .$call((qb) => this.upsertTableFilters(qb, ack))
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.orderBy(['updateId asc'])
.stream(); .stream();
} }
@ -74,9 +71,7 @@ export class SyncRepository {
.selectFrom('partners_audit') .selectFrom('partners_audit')
.select(['id', 'sharedById', 'sharedWithId']) .select(['id', 'sharedById', 'sharedWithId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)])) .where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .$call((qb) => this.auditTableFilters(qb, ack))
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['id asc'])
.stream(); .stream();
} }
@ -85,9 +80,7 @@ export class SyncRepository {
.selectFrom('assets') .selectFrom('assets')
.select(columns.syncAsset) .select(columns.syncAsset)
.where('ownerId', '=', userId) .where('ownerId', '=', userId)
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) .$call((qb) => this.upsertTableFilters(qb, ack))
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['updateId asc'])
.stream(); .stream();
} }
@ -98,9 +91,7 @@ export class SyncRepository {
.where('ownerId', 'in', (eb) => .where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
) )
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId)) .$call((qb) => this.upsertTableFilters(qb, ack))
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['updateId asc'])
.stream(); .stream();
} }
@ -109,9 +100,7 @@ export class SyncRepository {
.selectFrom('assets_audit') .selectFrom('assets_audit')
.select(['id', 'assetId']) .select(['id', 'assetId'])
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.where('ownerId', '=', userId) .$call((qb) => this.auditTableFilters(qb, ack))
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['id asc'])
.stream(); .stream();
} }
@ -122,35 +111,51 @@ export class SyncRepository {
.where('ownerId', 'in', (eb) => .where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
) )
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId)) .$call((qb) => this.auditTableFilters(qb, ack))
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['id asc'])
.stream(); .stream();
} }
getAssetExifsUpserts(userId: string, ack?: SyncAck) { getAssetExifsUpserts(userId: string, ack?: SyncAck) {
return this.db return this.db
.selectFrom('exif') .selectFrom('exif')
.innerJoin('assets', 'assets.id', 'exif.assetId')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
.where('assets.ownerId', '=', userId) .where('assetId', 'in', (eb) => eb.selectFrom('assets').select('id').where('ownerId', '=', userId))
.$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId)) .$call((qb) => this.upsertTableFilters(qb, ack))
.where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['exif.updateId asc'])
.stream(); .stream();
} }
getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) { getPartnerAssetExifsUpserts(userId: string, ack?: SyncAck) {
return this.db return this.db
.selectFrom('exif') .selectFrom('exif')
.innerJoin('assets', 'assets.id', 'exif.assetId')
.select(columns.syncAssetExif) .select(columns.syncAssetExif)
.where('assets.ownerId', 'in', (eb) => .where('assetId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId), eb
.selectFrom('assets')
.select('id')
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
),
) )
.$if(!!ack, (qb) => qb.where('exif.updateId', '>', ack!.updateId)) .$call((qb) => this.upsertTableFilters(qb, ack))
.where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy(['exif.updateId asc'])
.stream(); .stream();
} }
private auditTableFilters<T extends keyof Pick<DB, auditTables>, D>(qb: SelectQueryBuilder<DB, T, D>, ack?: SyncAck) {
const builder = qb as SelectQueryBuilder<DB, auditTables, D>;
return builder
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.orderBy(['id asc']) as SelectQueryBuilder<DB, T, D>;
}
private upsertTableFilters<T extends keyof Pick<DB, upsertTables>, D>(
qb: SelectQueryBuilder<DB, T, D>,
ack?: SyncAck,
) {
const builder = qb as SelectQueryBuilder<DB, upsertTables, D>;
return builder
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.orderBy(['updateId asc']) as SelectQueryBuilder<DB, T, D>;
}
} }

View File

@ -41,7 +41,7 @@ describe(SyncService.name, () => {
expect(SYNC_TYPES_ORDER.length).toBe(Object.keys(SyncRequestType).length); expect(SYNC_TYPES_ORDER.length).toBe(Object.keys(SyncRequestType).length);
}); });
describe.concurrent('users', () => { describe.concurrent(SyncEntityType.UserV1, () => {
it('should detect and sync the first user', async () => { it('should detect and sync the first user', async () => {
const { context, auth, sut, testSync } = await setup(); const { context, auth, sut, testSync } = await setup();
@ -197,7 +197,7 @@ describe(SyncService.name, () => {
}); });
}); });
describe.concurrent('partners', () => { describe.concurrent(SyncEntityType.PartnerV1, () => {
it('should detect and sync the first partner', async () => { it('should detect and sync the first partner', async () => {
const { auth, context, sut, testSync } = await setup(); const { auth, context, sut, testSync } = await setup();
@ -357,7 +357,7 @@ describe(SyncService.name, () => {
); );
}); });
it('should not sync a partner for an unrelated user', async () => { it('should not sync a partner or partner delete for an unrelated user', async () => {
const { auth, context, testSync } = await setup(); const { auth, context, testSync } = await setup();
const user2 = await context.createUser(); const user2 = await context.createUser();
@ -365,9 +365,11 @@ describe(SyncService.name, () => {
await context.createPartner({ sharedById: user2.id, sharedWithId: user3.id }); await context.createPartner({ sharedById: user2.id, sharedWithId: user3.id });
const response = await testSync(auth, [SyncRequestType.PartnersV1]); expect(await testSync(auth, [SyncRequestType.PartnersV1])).toHaveLength(0);
expect(response).toHaveLength(0); await context.partner.remove({ sharedById: user2.id, sharedWithId: user3.id });
expect(await testSync(auth, [SyncRequestType.PartnersV1])).toHaveLength(0);
}); });
it('should not sync a partner delete after a user is deleted', async () => { it('should not sync a partner delete after a user is deleted', async () => {
@ -377,13 +379,11 @@ describe(SyncService.name, () => {
await context.createPartner({ sharedById: user2.id, sharedWithId: auth.user.id }); await context.createPartner({ sharedById: user2.id, sharedWithId: auth.user.id });
await context.user.delete({ id: user2.id }, true); await context.user.delete({ id: user2.id }, true);
const response = await testSync(auth, [SyncRequestType.PartnersV1]); expect(await testSync(auth, [SyncRequestType.PartnersV1])).toHaveLength(0);
expect(response).toHaveLength(0);
}); });
}); });
describe.concurrent('assets', () => { describe.concurrent(SyncEntityType.AssetV1, () => {
it('should detect and sync the first asset', async () => { it('should detect and sync the first asset', async () => {
const { auth, context, sut, testSync } = await setup(); const { auth, context, sut, testSync } = await setup();
@ -464,17 +464,22 @@ describe(SyncService.name, () => {
expect(ackSyncResponse).toHaveLength(0); expect(ackSyncResponse).toHaveLength(0);
}); });
it('should not sync an asset for an unrelated user', async () => { it('should not sync an asset or asset delete for an unrelated user', async () => {
const { auth, context, testSync } = await setup(); const { auth, context, testSync } = await setup();
const user2 = await context.createUser(); const user2 = await context.createUser();
const session = TestFactory.session({ userId: user2.id });
const auth2 = TestFactory.auth({ session, user: user2 });
const asset = TestFactory.asset({ ownerId: user2.id }); const asset = TestFactory.asset({ ownerId: user2.id });
await context.createAsset(asset); await context.createAsset(asset);
const response = await testSync(auth, [SyncRequestType.AssetsV1]); expect(await testSync(auth2, [SyncRequestType.AssetsV1])).toHaveLength(1);
expect(await testSync(auth, [SyncRequestType.AssetsV1])).toHaveLength(0);
expect(response).toHaveLength(0); await context.asset.remove(asset);
expect(await testSync(auth2, [SyncRequestType.AssetsV1])).toHaveLength(1);
expect(await testSync(auth, [SyncRequestType.AssetsV1])).toHaveLength(0);
}); });
}); });
@ -591,6 +596,40 @@ describe(SyncService.name, () => {
await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0); await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0);
}); });
it('should not sync an asset or asset delete for own user', async () => {
const { auth, context, testSync } = await setup();
const user2 = await context.createUser();
const asset = await context.createAsset({ ownerId: auth.user.id });
const partner = { sharedById: user2.id, sharedWithId: auth.user.id };
await context.partner.create(partner);
await expect(testSync(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0);
await context.asset.remove(asset);
await expect(testSync(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0);
});
it('should not sync an asset or asset delete for unrelated user', async () => {
const { auth, context, testSync } = await setup();
const user2 = await context.createUser();
const session = TestFactory.session({ userId: user2.id });
const auth2 = TestFactory.auth({ session, user: user2 });
const asset = await context.createAsset({ ownerId: user2.id });
await expect(testSync(auth2, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0);
await context.asset.remove(asset);
await expect(testSync(auth2, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.PartnerAssetsV1])).resolves.toHaveLength(0);
});
}); });
describe.concurrent(SyncRequestType.AssetExifsV1, () => { describe.concurrent(SyncRequestType.AssetExifsV1, () => {
@ -649,6 +688,24 @@ describe(SyncService.name, () => {
expect(ackSyncResponse).toHaveLength(0); expect(ackSyncResponse).toHaveLength(0);
}); });
it('should only sync asset exif for own user', async () => {
const { auth, context, testSync } = await setup();
const user2 = await context.createUser();
const session = TestFactory.session({ userId: user2.id });
const auth2 = TestFactory.auth({ session, user: user2 });
await context.partner.create({ sharedById: user2.id, sharedWithId: auth.user.id });
const asset = TestFactory.asset({ ownerId: user2.id });
const exif = { assetId: asset.id, make: 'Canon' };
await context.createAsset(asset);
await context.asset.upsertExif(exif);
await expect(testSync(auth2, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(0);
});
}); });
describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => { describe.concurrent(SyncRequestType.PartnerAssetExifsV1, () => {
@ -708,5 +765,36 @@ describe(SyncService.name, () => {
expect(ackSyncResponse).toHaveLength(0); expect(ackSyncResponse).toHaveLength(0);
}); });
it('should not sync partner asset exif for own user', async () => {
const { auth, context, testSync } = await setup();
const user2 = await context.createUser();
await context.partner.create({ sharedById: user2.id, sharedWithId: auth.user.id });
const asset = TestFactory.asset({ ownerId: auth.user.id });
const exif = { assetId: asset.id, make: 'Canon' };
await context.createAsset(asset);
await context.asset.upsertExif(exif);
await expect(testSync(auth, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.PartnerAssetExifsV1])).resolves.toHaveLength(0);
});
it('should not sync partner asset exif for unrelated user', async () => {
const { auth, context, testSync } = await setup();
const user2 = await context.createUser();
const user3 = await context.createUser();
const session = TestFactory.session({ userId: user3.id });
const authUser3 = TestFactory.auth({ session, user: user3 });
await context.partner.create({ sharedById: user2.id, sharedWithId: auth.user.id });
const asset = TestFactory.asset({ ownerId: user3.id });
const exif = { assetId: asset.id, make: 'Canon' };
await context.createAsset(asset);
await context.asset.upsertExif(exif);
await expect(testSync(authUser3, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1);
await expect(testSync(auth, [SyncRequestType.PartnerAssetExifsV1])).resolves.toHaveLength(0);
});
}); });
}); });