fix: sync new album asset exif added to a shared album

This commit is contained in:
Zack Pollard 2025-08-05 11:17:45 +01:00
parent 56d3a494e9
commit 246be0086e
6 changed files with 275 additions and 60 deletions

View File

@ -675,6 +675,9 @@ export enum SyncEntityType {
AlbumAssetExifCreateV1 = 'AlbumAssetExifCreateV1', // album-to-asset table joined to asset-exif table, updateId from album-to-asset, filter out assets owned by you
// If exif isn't created straight away that might cause a problem with the exif createV1, maybe we just send an empty object if it doesn't exist yet?
// We would ack this on album-to-asset table so that would work ok for future syncs
// If exif didn't exist we would send nothing as it would be filtered out when joining from album-to-asset table to the exif table
// Therefore there would be no create event for that asset and it would be skipped forever
// However when the exif is created, we would send an update event for that exif, as the updateId would be new and we no longer filter out exif where createdAt == updatedAt
AlbumAssetExifUpdateV1 = 'AlbumAssetExifUpdateV1', // asset-exif table, updateId from asset-exif table, filter out assets owned by you
AlbumAssetExifV1 = 'AlbumAssetExifV1',
AlbumAssetExifBackfillV1 = 'AlbumAssetExifBackfillV1',

View File

@ -139,20 +139,21 @@ select
"asset_exif"."profileDescription",
"asset_exif"."rating",
"asset_exif"."fps",
"asset_exif"."updateId"
"album_asset"."updateId"
from
"asset_exif"
inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId"
"album_asset"
inner join "asset_exif" on "asset_exif"."assetId" = "album_asset"."assetsId"
where
"album_asset"."albumsId" = $1
and "asset_exif"."updateId" < $2
and "asset_exif"."updateId" <= $3
and "asset_exif"."updateId" >= $4
and "album_asset"."updateId" < $2
and "album_asset"."updateId" <= $3
and "album_asset"."updateId" >= $4
order by
"asset_exif"."updateId" asc
"album_asset"."updateId" asc
-- SyncRepository.albumAssetExif.getUpserts
-- SyncRepository.albumAssetExif.getCreates
select
"album_asset"."updateId",
"asset_exif"."assetId",
"asset_exif"."description",
"asset_exif"."exifImageWidth",
@ -177,21 +178,20 @@ select
"asset_exif"."exposureTime",
"asset_exif"."profileDescription",
"asset_exif"."rating",
"asset_exif"."fps",
"asset_exif"."updateId"
"asset_exif"."fps"
from
"asset_exif"
inner join "album_asset" on "album_asset"."assetsId" = "asset_exif"."assetId"
"album_asset"
inner join "asset_exif" on "asset_exif"."assetId" = "album_asset"."assetsId"
inner join "album" on "album"."id" = "album_asset"."albumsId"
left join "album_user" on "album_user"."albumsId" = "album_asset"."albumsId"
where
"asset_exif"."updateId" < $1
"album_asset"."updateId" < $1
and (
"album"."ownerId" = $2
or "album_user"."usersId" = $3
)
order by
"asset_exif"."updateId" asc
"album_asset"."updateId" asc
-- SyncRepository.albumToAsset.getBackfill
select

View File

@ -216,25 +216,26 @@ class AlbumAssetExifSync extends BaseSync {
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('asset_exif')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId')
.selectFrom('album_asset')
.innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId')
.select(columns.syncAssetExif)
.select('asset_exif.updateId')
.select('album_asset.updateId')
.where('album_asset.albumsId', '=', albumId)
.where('asset_exif.updateId', '<', nowId)
.where('asset_exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!))
.orderBy('asset_exif.updateId', 'asc')
.where('album_asset.updateId', '<', nowId)
.where('album_asset.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!))
.orderBy('album_asset.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) {
return this.db
.selectFrom('asset_exif')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId')
.select(columns.syncAssetExif)
.select('asset_exif.updateId')
.where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send exif updates for assets that the client already knows about
.where('asset_exif.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_exif.updateId', '>', ack!.updateId))
.orderBy('asset_exif.updateId', 'asc')
@ -243,6 +244,22 @@ class AlbumAssetExifSync extends BaseSync {
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.stream();
}
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_asset')
.select('album_asset.updateId')
.innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId')
.select(columns.syncAssetExif)
.innerJoin('album', 'album.id', 'album_asset.albumsId')
.leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId')
.where('album_asset.updateId', '<', nowId)
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId))
.orderBy('album_asset.updateId', 'asc')
.stream();
}
}
class AlbumToAssetSync extends BaseSync {

View File

@ -465,10 +465,12 @@ export class SyncService extends BaseService {
const backfillType = SyncEntityType.AlbumAssetExifBackfillV1;
const backfillCheckpoint = checkpointMap[backfillType];
const albums = await this.syncRepository.album.getCreatedAfter(options, backfillCheckpoint?.updateId);
const upsertType = SyncEntityType.AlbumAssetExifV1;
const upsertCheckpoint = checkpointMap[upsertType];
if (upsertCheckpoint) {
const endId = upsertCheckpoint.updateId;
const updateType = SyncEntityType.AlbumAssetExifUpdateV1;
const createType = SyncEntityType.AlbumAssetExifCreateV1;
const upsertCheckpoint = checkpointMap[updateType];
const createCheckpoint = checkpointMap[createType];
if (createCheckpoint) {
const endId = createCheckpoint.updateId;
for (const album of albums) {
const createId = album.createId;
@ -493,9 +495,26 @@ export class SyncService extends BaseService {
});
}
const upserts = this.syncRepository.albumAssetExif.getUpserts(options, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) {
send(response, { type: upsertType, ids: [updateId], data });
if (createCheckpoint) {
const updates = this.syncRepository.albumAssetExif.getUpdates(options, createCheckpoint, upsertCheckpoint);
for await (const { updateId, ...data } of updates) {
send(response, { type: updateType, ids: [updateId], data });
}
}
const creates = this.syncRepository.albumAssetExif.getCreates(options, createCheckpoint);
let first = true;
for await (const { updateId, ...data } of creates) {
if (first) {
send(response, {
type: SyncEntityType.SyncAckV1,
data: {},
ackType: SyncEntityType.AlbumAssetExifUpdateV1,
ids: [options.nowId],
});
first = false;
}
send(response, { type: createType, ids: [updateId], data });
}
}

View File

@ -1,5 +1,6 @@
import { Kysely } from 'kysely';
import { AlbumUserRole, SyncEntityType, SyncRequestType } from 'src/enum';
import { AssetRepository } from 'src/repositories/asset.repository';
import { DB } from 'src/schema';
import { SyncTestContext } from 'test/medium.factory';
import { factory } from 'test/small.factory';
@ -13,6 +14,18 @@ const setup = async (db?: Kysely<DB>) => {
return { auth, user, session, ctx };
};
const updateSyncAck = {
ack: expect.stringContaining(SyncEntityType.AlbumAssetExifUpdateV1),
data: {},
type: SyncEntityType.SyncAckV1,
};
const backfillSyncAck = {
ack: expect.stringContaining(SyncEntityType.AlbumAssetExifBackfillV1),
data: {},
type: SyncEntityType.SyncAckV1,
};
beforeAll(async () => {
defaultDatabase = await getKyselyDB();
});
@ -28,8 +41,8 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => {
await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor });
const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(response).toHaveLength(1);
expect(response).toEqual([
updateSyncAck,
{
ack: expect.any(String),
data: {
@ -59,9 +72,10 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => {
state: null,
timeZone: null,
},
type: SyncEntityType.AlbumAssetExifV1,
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
expect(response).toHaveLength(2);
await ctx.syncAckAll(auth, response);
await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toEqual([]);
@ -75,7 +89,7 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => {
await ctx.newAlbumAsset({ albumId: album.id, assetId: asset.id });
await expect(ctx.syncStream(auth, [SyncRequestType.AssetExifsV1])).resolves.toHaveLength(1);
await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toHaveLength(1);
await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toHaveLength(2);
});
it('should not sync album asset exif for unrelated user', async () => {
@ -97,55 +111,46 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => {
it('should backfill album assets exif when a user shares an album with you', async () => {
const { auth, ctx } = await setup();
const { user: user2 } = await ctx.newUser();
const { asset: asset1Owner } = await ctx.newAsset({ ownerId: auth.user.id });
await ctx.newExif({ assetId: asset1Owner.id, make: 'asset1Owner' });
await wait(2);
const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id });
const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id });
const { asset: asset1User2 } = await ctx.newAsset({ ownerId: user2.id });
await ctx.newExif({ assetId: asset1User2.id, make: 'asset1User2' });
await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset1User2.id });
await wait(2);
const { asset: asset2User2 } = await ctx.newAsset({ ownerId: user2.id });
await ctx.newExif({ assetId: asset2User2.id, make: 'asset2User2' });
await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset2User2.id });
await wait(2);
await ctx.newAlbumAsset({ albumId: album1.id, assetId: asset2User2.id });
await wait(2);
const { asset: asset3User2 } = await ctx.newAsset({ ownerId: user2.id });
await ctx.newAlbumAsset({ albumId: album2.id, assetId: asset3User2.id });
await ctx.newExif({ assetId: asset3User2.id, make: 'asset3User2' });
const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id });
await ctx.newAlbumAsset({ albumId: album1.id, assetId: asset2User2.id });
await wait(2);
await ctx.newAlbumUser({ albumId: album1.id, userId: auth.user.id, role: AlbumUserRole.Editor });
const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(response).toHaveLength(1);
expect(response).toEqual([
updateSyncAck,
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: asset2User2.id,
}),
type: SyncEntityType.AlbumAssetExifV1,
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
expect(response).toHaveLength(2);
// ack initial album asset exif sync
await ctx.syncAckAll(auth, response);
// create a second album
const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id });
await Promise.all(
[asset1User2.id, asset2User2.id, asset3User2.id, asset1Owner.id].map((assetId) =>
ctx.newAlbumAsset({ albumId: album2.id, assetId }),
),
);
await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor });
// should backfill the album user
const newResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(newResponse).toEqual([
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: asset1Owner.id,
}),
type: SyncEntityType.AlbumAssetExifBackfillV1,
},
{
ack: expect.any(String),
data: expect.objectContaining({
@ -160,21 +165,194 @@ describe(SyncRequestType.AlbumAssetExifsV1, () => {
}),
type: SyncEntityType.AlbumAssetExifBackfillV1,
},
{
ack: expect.stringContaining(SyncEntityType.AlbumAssetExifBackfillV1),
data: {},
type: SyncEntityType.SyncAckV1,
},
backfillSyncAck,
updateSyncAck,
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: asset3User2.id,
}),
type: SyncEntityType.AlbumAssetExifV1,
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
expect(newResponse).toHaveLength(5);
await ctx.syncAckAll(auth, newResponse);
await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toEqual([]);
});
it('should sync old asset exif when a user adds them to an album they share you', async () => {
const { auth, ctx } = await setup();
const { user: user2 } = await ctx.newUser();
const { asset: firstAsset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'firstAsset' });
await ctx.newExif({ assetId: firstAsset.id, make: 'firstAsset' });
const { asset: secondAsset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'secondAsset' });
await ctx.newExif({ assetId: secondAsset.id, make: 'secondAsset' });
const { asset: album1Asset } = await ctx.newAsset({ ownerId: user2.id, originalFileName: 'album1Asset' });
await ctx.newExif({ assetId: album1Asset.id, make: 'album1Asset' });
const { album: album1 } = await ctx.newAlbum({ ownerId: user2.id });
const { album: album2 } = await ctx.newAlbum({ ownerId: user2.id });
await ctx.newAlbumAsset({ albumId: album2.id, assetId: firstAsset.id });
await wait(2);
await ctx.newAlbumAsset({ albumId: album1.id, assetId: album1Asset.id });
await ctx.newAlbumUser({ albumId: album1.id, userId: auth.user.id, role: AlbumUserRole.Editor });
const firstAlbumResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(firstAlbumResponse).toEqual([
updateSyncAck,
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: album1Asset.id,
}),
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
expect(firstAlbumResponse).toHaveLength(2);
await ctx.syncAckAll(auth, firstAlbumResponse);
await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor });
const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(response).toEqual([
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: firstAsset.id,
}),
type: SyncEntityType.AlbumAssetExifBackfillV1,
},
backfillSyncAck,
]);
expect(response).toHaveLength(2);
// ack initial album asset sync
await ctx.syncAckAll(auth, response);
await ctx.newAlbumAsset({ albumId: album2.id, assetId: secondAsset.id });
await wait(2);
// should backfill the new asset even though it's older than the first asset
const newResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(newResponse).toEqual([
updateSyncAck,
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: secondAsset.id,
}),
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
expect(newResponse).toHaveLength(2);
await ctx.syncAckAll(auth, newResponse);
await expect(ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1])).resolves.toEqual([]);
});
it('should sync asset exif updates for an album shared with you', async () => {
const { auth, ctx } = await setup();
const { user: user2 } = await ctx.newUser();
const { asset } = await ctx.newAsset({ ownerId: user2.id });
await ctx.newExif({ assetId: asset.id, make: 'asset' });
const { album } = await ctx.newAlbum({ ownerId: user2.id });
await wait(2);
await ctx.newAlbumAsset({ albumId: album.id, assetId: asset.id });
await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor });
const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(response).toHaveLength(2);
expect(response).toEqual([
updateSyncAck,
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: asset.id,
}),
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
await ctx.syncAckAll(auth, response);
// update the asset
const assetRepository = ctx.get(AssetRepository);
await assetRepository.upsertExif({
assetId: asset.id,
city: 'New City',
});
const updateResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(updateResponse).toHaveLength(1);
expect(updateResponse).toEqual([
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: asset.id,
city: 'New City',
}),
type: SyncEntityType.AlbumAssetExifUpdateV1,
},
]);
});
it('should sync delayed asset exif creates for an album shared with you', async () => {
const { auth, ctx } = await setup();
const { user: user2 } = await ctx.newUser();
const { asset: assetWithExif } = await ctx.newAsset({ ownerId: user2.id });
await ctx.newExif({ assetId: assetWithExif.id, make: 'assetWithExif' });
const { asset: assetDelayedExif } = await ctx.newAsset({ ownerId: user2.id });
const { album } = await ctx.newAlbum({ ownerId: user2.id });
const { asset: newerAsset } = await ctx.newAsset({ ownerId: user2.id });
await ctx.newExif({ assetId: newerAsset.id, make: 'newerAsset' });
await ctx.newAlbumAsset({ albumId: album.id, assetId: assetWithExif.id });
await wait(2);
await ctx.newAlbumAsset({ albumId: album.id, assetId: assetDelayedExif.id });
await wait(2);
await ctx.newAlbumAsset({ albumId: album.id, assetId: newerAsset.id });
await ctx.newAlbumUser({ albumId: album.id, userId: auth.user.id, role: AlbumUserRole.Editor });
const response = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(response).toEqual([
updateSyncAck,
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: assetWithExif.id,
}),
type: SyncEntityType.AlbumAssetExifCreateV1,
},
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: newerAsset.id,
}),
type: SyncEntityType.AlbumAssetExifCreateV1,
},
]);
expect(response).toHaveLength(3);
await ctx.syncAckAll(auth, response);
// update the asset
const assetRepository = ctx.get(AssetRepository);
await assetRepository.upsertExif({
assetId: assetDelayedExif.id,
city: 'Delayed Exif',
});
const updateResponse = await ctx.syncStream(auth, [SyncRequestType.AlbumAssetExifsV1]);
expect(updateResponse).toEqual([
{
ack: expect.any(String),
data: expect.objectContaining({
assetId: assetDelayedExif.id,
city: 'Delayed Exif',
}),
type: SyncEntityType.AlbumAssetExifUpdateV1,
},
]);
expect(updateResponse).toHaveLength(1);
});
});

View File

@ -148,8 +148,6 @@ describe(SyncRequestType.AlbumAssetsV1, () => {
// ack initial album asset sync
await ctx.syncAckAll(auth, response);
// create a second album
await ctx.newAlbumUser({ albumId: album2.id, userId: auth.user.id, role: AlbumUserRole.Editor });
// should backfill the album user