feat: handle SyncResetV1 (#20732)

* feat: handle SyncResetV1

* auto retry on reset and handle SyncCompleteV1

* fix tests

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: Alex <alex.tran1502@gmail.com>
This commit is contained in:
shenlong 2025-09-08 20:18:26 +05:30 committed by GitHub
parent 5acf909235
commit fd4c2acde8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 122 additions and 80 deletions

View File

@ -23,54 +23,17 @@ class SyncStreamService {
bool get isCancelled => _cancelChecker?.call() ?? false; bool get isCancelled => _cancelChecker?.call() ?? false;
Future<void> sync() { Future<void> sync() async {
_logger.info("Remote sync request for user"); _logger.info("Remote sync request for user");
// Start the sync stream and handle events // Start the sync stream and handle events
return _syncApiRepository.streamChanges(_handleEvents); bool shouldReset = false;
} await _syncApiRepository.streamChanges(_handleEvents, onReset: () => shouldReset = true);
if (shouldReset) {
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async { await _syncApiRepository.streamChanges(_handleEvents);
if (batchData.isEmpty) return;
_logger.info('Processing batch of ${batchData.length} AssetUploadReadyV1 events');
final List<SyncAssetV1> assets = [];
final List<SyncAssetExifV1> exifs = [];
try {
for (final data in batchData) {
if (data is! Map<String, dynamic>) {
continue;
}
final payload = data;
final assetData = payload['asset'];
final exifData = payload['exif'];
if (assetData == null || exifData == null) {
continue;
}
final asset = SyncAssetV1.fromJson(assetData);
final exif = SyncAssetExifV1.fromJson(exifData);
if (asset != null && exif != null) {
assets.add(asset);
exifs.add(exif);
} }
} }
if (assets.isNotEmpty && exifs.isNotEmpty) { Future<void> _handleEvents(List<SyncEvent> events, Function() abort, Function() reset) async {
await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-batch');
await _syncStreamRepository.updateAssetsExifV1(exifs, debugLabel: 'websocket-batch');
_logger.info('Successfully processed ${assets.length} assets in batch');
}
} catch (error, stackTrace) {
_logger.severe("Error processing AssetUploadReadyV1 websocket batch events", error, stackTrace);
}
}
Future<void> _handleEvents(List<SyncEvent> events, Function() abort) async {
List<SyncEvent> items = []; List<SyncEvent> items = [];
for (final event in events) { for (final event in events) {
if (isCancelled) { if (isCancelled) {
@ -83,6 +46,10 @@ class SyncStreamService {
await _processBatch(items); await _processBatch(items);
} }
if (event.type == SyncEntityType.syncResetV1) {
reset();
}
items.add(event); items.add(event);
} }
@ -159,6 +126,12 @@ class SyncStreamService {
// to acknowledge that the client has processed all the backfill events // to acknowledge that the client has processed all the backfill events
case SyncEntityType.syncAckV1: case SyncEntityType.syncAckV1:
return; return;
// No-op. SyncCompleteV1 is used to signal the completion of the sync process
case SyncEntityType.syncCompleteV1:
return;
// Request to reset the client state. Clear everything related to remote entities
case SyncEntityType.syncResetV1:
return _syncStreamRepository.reset();
case SyncEntityType.memoryV1: case SyncEntityType.memoryV1:
return _syncStreamRepository.updateMemoriesV1(data.cast()); return _syncStreamRepository.updateMemoriesV1(data.cast());
case SyncEntityType.memoryDeleteV1: case SyncEntityType.memoryDeleteV1:
@ -193,4 +166,45 @@ class SyncStreamService {
_logger.warning("Unknown sync data type: $type"); _logger.warning("Unknown sync data type: $type");
} }
} }
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) return;
_logger.info('Processing batch of ${batchData.length} AssetUploadReadyV1 events');
final List<SyncAssetV1> assets = [];
final List<SyncAssetExifV1> exifs = [];
try {
for (final data in batchData) {
if (data is! Map<String, dynamic>) {
continue;
}
final payload = data;
final assetData = payload['asset'];
final exifData = payload['exif'];
if (assetData == null || exifData == null) {
continue;
}
final asset = SyncAssetV1.fromJson(assetData);
final exif = SyncAssetExifV1.fromJson(exifData);
if (asset != null && exif != null) {
assets.add(asset);
exifs.add(exif);
}
}
if (assets.isNotEmpty && exifs.isNotEmpty) {
await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-batch');
await _syncStreamRepository.updateAssetsExifV1(exifs, debugLabel: 'websocket-batch');
_logger.info('Successfully processed ${assets.length} assets in batch');
}
} catch (error, stackTrace) {
_logger.severe("Error processing AssetUploadReadyV1 websocket batch events", error, stackTrace);
}
}
} }

View File

@ -18,7 +18,8 @@ class SyncApiRepository {
} }
Future<void> streamChanges( Future<void> streamChanges(
Function(List<SyncEvent>, Function() abort) onData, { Future<void> Function(List<SyncEvent>, Function() abort, Function() reset) onData, {
Function()? onReset,
int batchSize = kSyncEventBatchSize, int batchSize = kSyncEventBatchSize,
http.Client? httpClient, http.Client? httpClient,
}) async { }) async {
@ -69,6 +70,8 @@ class SyncApiRepository {
shouldAbort = true; shouldAbort = true;
} }
final reset = onReset ?? () {};
try { try {
final response = await client.send(request); final response = await client.send(request);
@ -91,12 +94,12 @@ class SyncApiRepository {
continue; continue;
} }
await onData(_parseLines(lines), abort); await onData(_parseLines(lines), abort, reset);
lines.clear(); lines.clear();
} }
if (lines.isNotEmpty && !shouldAbort) { if (lines.isNotEmpty && !shouldAbort) {
await onData(_parseLines(lines), abort); await onData(_parseLines(lines), abort, reset);
} }
} catch (error, stack) { } catch (error, stack) {
_logger.severe("Error processing stream", error, stack); _logger.severe("Error processing stream", error, stack);
@ -156,7 +159,8 @@ const _kResponseMap = <SyncEntityType, Function(Object)>{
SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson, SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson,
SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson, SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson,
SyncEntityType.albumToAssetDeleteV1: SyncAlbumToAssetDeleteV1.fromJson, SyncEntityType.albumToAssetDeleteV1: SyncAlbumToAssetDeleteV1.fromJson,
SyncEntityType.syncAckV1: _SyncAckV1.fromJson, SyncEntityType.syncAckV1: _SyncEmptyDto.fromJson,
SyncEntityType.syncResetV1: _SyncEmptyDto.fromJson,
SyncEntityType.memoryV1: SyncMemoryV1.fromJson, SyncEntityType.memoryV1: SyncMemoryV1.fromJson,
SyncEntityType.memoryDeleteV1: SyncMemoryDeleteV1.fromJson, SyncEntityType.memoryDeleteV1: SyncMemoryDeleteV1.fromJson,
SyncEntityType.memoryToAssetV1: SyncMemoryAssetV1.fromJson, SyncEntityType.memoryToAssetV1: SyncMemoryAssetV1.fromJson,
@ -172,8 +176,9 @@ const _kResponseMap = <SyncEntityType, Function(Object)>{
SyncEntityType.personDeleteV1: SyncPersonDeleteV1.fromJson, SyncEntityType.personDeleteV1: SyncPersonDeleteV1.fromJson,
SyncEntityType.assetFaceV1: SyncAssetFaceV1.fromJson, SyncEntityType.assetFaceV1: SyncAssetFaceV1.fromJson,
SyncEntityType.assetFaceDeleteV1: SyncAssetFaceDeleteV1.fromJson, SyncEntityType.assetFaceDeleteV1: SyncAssetFaceDeleteV1.fromJson,
SyncEntityType.syncCompleteV1: _SyncEmptyDto.fromJson,
}; };
class _SyncAckV1 { class _SyncEmptyDto {
static _SyncAckV1? fromJson(dynamic _) => _SyncAckV1(); static _SyncEmptyDto? fromJson(dynamic _) => _SyncEmptyDto();
} }

View File

@ -29,6 +29,36 @@ class SyncStreamRepository extends DriftDatabaseRepository {
SyncStreamRepository(super.db) : _db = db; SyncStreamRepository(super.db) : _db = db;
Future<void> reset() async {
_logger.fine("SyncResetV1 received. Resetting remote entities");
try {
await _db.exclusively(() async {
// foreign_keys PRAGMA is no-op within transactions
// https://www.sqlite.org/pragma.html#pragma_foreign_keys
await _db.customStatement('PRAGMA foreign_keys = OFF');
await transaction(() async {
await _db.assetFaceEntity.deleteAll();
await _db.memoryAssetEntity.deleteAll();
await _db.memoryEntity.deleteAll();
await _db.partnerEntity.deleteAll();
await _db.personEntity.deleteAll();
await _db.remoteAlbumAssetEntity.deleteAll();
await _db.remoteAlbumEntity.deleteAll();
await _db.remoteAlbumUserEntity.deleteAll();
await _db.remoteAssetEntity.deleteAll();
await _db.remoteExifEntity.deleteAll();
await _db.stackEntity.deleteAll();
await _db.userEntity.deleteAll();
await _db.userMetadataEntity.deleteAll();
});
await _db.customStatement('PRAGMA foreign_keys = ON');
});
} catch (error, stack) {
_logger.severe('Error: SyncResetV1', error, stack);
rethrow;
}
}
Future<void> deleteUsersV1(Iterable<SyncUserDeleteV1> data) async { Future<void> deleteUsersV1(Iterable<SyncUserDeleteV1> data) async {
try { try {
await _db.userEntity.deleteWhere((row) => row.id.isIn(data.map((e) => e.userId))); await _db.userEntity.deleteWhere((row) => row.id.isIn(data.map((e) => e.userId)));

View File

@ -1,6 +1,5 @@
import 'dart:convert'; import 'dart:convert';
import 'package:drift/drift.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/models/store.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/entities/album.entity.dart'; import 'package:immich_mobile/entities/album.entity.dart';
@ -10,6 +9,7 @@ import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/infrastructure/entities/exif.entity.dart'; import 'package:immich_mobile/infrastructure/entities/exif.entity.dart';
import 'package:immich_mobile/infrastructure/entities/user.entity.dart'; import 'package:immich_mobile/infrastructure/entities/user.entity.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart';
import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
@ -25,25 +25,7 @@ class AuthRepository extends DatabaseRepository {
const AuthRepository(super.db, this._drift); const AuthRepository(super.db, this._drift);
Future<void> clearLocalData() async { Future<void> clearLocalData() async {
// Drift deletions - child entities first (those with foreign keys) await SyncStreamRepository(_drift).reset();
await Future.wait([
_drift.memoryAssetEntity.deleteAll(),
_drift.remoteAlbumAssetEntity.deleteAll(),
_drift.remoteAlbumUserEntity.deleteAll(),
_drift.remoteExifEntity.deleteAll(),
_drift.userMetadataEntity.deleteAll(),
_drift.partnerEntity.deleteAll(),
_drift.stackEntity.deleteAll(),
_drift.assetFaceEntity.deleteAll(),
]);
// Drift deletions - parent entities
await Future.wait([
_drift.memoryEntity.deleteAll(),
_drift.personEntity.deleteAll(),
_drift.remoteAlbumEntity.deleteAll(),
_drift.remoteAssetEntity.deleteAll(),
_drift.userEntity.deleteAll(),
]);
return db.writeTxn(() { return db.writeTxn(() {
return Future.wait([ return Future.wait([

View File

@ -30,8 +30,9 @@ void main() {
late SyncStreamService sut; late SyncStreamService sut;
late SyncStreamRepository mockSyncStreamRepo; late SyncStreamRepository mockSyncStreamRepo;
late SyncApiRepository mockSyncApiRepo; late SyncApiRepository mockSyncApiRepo;
late Function(List<SyncEvent>, Function()) handleEventsCallback; late Future<void> Function(List<SyncEvent>, Function(), Function()) handleEventsCallback;
late _MockAbortCallbackWrapper mockAbortCallbackWrapper; late _MockAbortCallbackWrapper mockAbortCallbackWrapper;
late _MockAbortCallbackWrapper mockResetCallbackWrapper;
successHandler(Invocation _) async => true; successHandler(Invocation _) async => true;
@ -39,6 +40,7 @@ void main() {
mockSyncStreamRepo = MockSyncStreamRepository(); mockSyncStreamRepo = MockSyncStreamRepository();
mockSyncApiRepo = MockSyncApiRepository(); mockSyncApiRepo = MockSyncApiRepository();
mockAbortCallbackWrapper = _MockAbortCallbackWrapper(); mockAbortCallbackWrapper = _MockAbortCallbackWrapper();
mockResetCallbackWrapper = _MockAbortCallbackWrapper();
when(() => mockAbortCallbackWrapper()).thenReturn(false); when(() => mockAbortCallbackWrapper()).thenReturn(false);
@ -46,6 +48,10 @@ void main() {
handleEventsCallback = invocation.positionalArguments.first; handleEventsCallback = invocation.positionalArguments.first;
}); });
when(() => mockSyncApiRepo.streamChanges(any(), onReset: any(named: 'onReset'))).thenAnswer((invocation) async {
handleEventsCallback = invocation.positionalArguments.first;
});
when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {}); when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {});
when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer(successHandler); when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer(successHandler);
@ -86,7 +92,7 @@ void main() {
Future<void> simulateEvents(List<SyncEvent> events) async { Future<void> simulateEvents(List<SyncEvent> events) async {
await sut.sync(); await sut.sync();
await handleEventsCallback(events, mockAbortCallbackWrapper.call); await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call);
} }
group("SyncStreamService - _handleEvents", () { group("SyncStreamService - _handleEvents", () {
@ -156,7 +162,7 @@ void main() {
when(() => cancellationChecker()).thenReturn(true); when(() => cancellationChecker()).thenReturn(true);
}); });
await handleEventsCallback(events, mockAbortCallbackWrapper.call); await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call);
verify(() => mockSyncStreamRepo.deleteUsersV1(any())).called(1); verify(() => mockSyncStreamRepo.deleteUsersV1(any())).called(1);
verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); verifyNever(() => mockSyncStreamRepo.updateUsersV1(any()));
@ -188,7 +194,11 @@ void main() {
final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1]; final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1];
final processingFuture = handleEventsCallback(events, mockAbortCallbackWrapper.call); final processingFuture = handleEventsCallback(
events,
mockAbortCallbackWrapper.call,
mockResetCallbackWrapper.call,
);
await pumpEventQueue(); await pumpEventQueue();
expect(handler1Started, isTrue); expect(handler1Started, isTrue);

View File

@ -63,7 +63,9 @@ void main() {
} }
}); });
Future<void> streamChanges(Function(List<SyncEvent>, Function() abort) onDataCallback) { Future<void> streamChanges(
Future<void> Function(List<SyncEvent>, Function() abort, Function() reset) onDataCallback,
) {
return sut.streamChanges(onDataCallback, batchSize: testBatchSize, httpClient: mockHttpClient); return sut.streamChanges(onDataCallback, batchSize: testBatchSize, httpClient: mockHttpClient);
} }
@ -72,7 +74,7 @@ void main() {
bool abortWasCalledInCallback = false; bool abortWasCalledInCallback = false;
List<SyncEvent> receivedEventsBatch1 = []; List<SyncEvent> receivedEventsBatch1 = [];
onDataCallback(List<SyncEvent> events, Function() abort) { Future<void> onDataCallback(List<SyncEvent> events, Function() abort, Function() _) async {
onDataCallCount++; onDataCallCount++;
if (onDataCallCount == 1) { if (onDataCallCount == 1) {
receivedEventsBatch1 = events; receivedEventsBatch1 = events;
@ -116,7 +118,7 @@ void main() {
int onDataCallCount = 0; int onDataCallCount = 0;
bool abortWasCalledInCallback = false; bool abortWasCalledInCallback = false;
onDataCallback(List<SyncEvent> events, Function() abort) { Future<void> onDataCallback(List<SyncEvent> events, Function() abort, Function() _) async {
onDataCallCount++; onDataCallCount++;
if (onDataCallCount == 1) { if (onDataCallCount == 1) {
abort(); abort();
@ -158,7 +160,7 @@ void main() {
List<SyncEvent> receivedEventsBatch1 = []; List<SyncEvent> receivedEventsBatch1 = [];
List<SyncEvent> receivedEventsBatch2 = []; List<SyncEvent> receivedEventsBatch2 = [];
onDataCallback(List<SyncEvent> events, Function() _) { Future<void> onDataCallback(List<SyncEvent> events, Function() _, Function() __) async {
onDataCallCount++; onDataCallCount++;
if (onDataCallCount == 1) { if (onDataCallCount == 1) {
receivedEventsBatch1 = events; receivedEventsBatch1 = events;
@ -202,7 +204,7 @@ void main() {
final streamError = Exception("Network Error"); final streamError = Exception("Network Error");
int onDataCallCount = 0; int onDataCallCount = 0;
onDataCallback(List<SyncEvent> events, Function() _) { Future<void> onDataCallback(List<SyncEvent> events, Function() _, Function() __) async {
onDataCallCount++; onDataCallCount++;
} }
@ -229,8 +231,7 @@ void main() {
when(() => mockStreamedResponse.stream).thenAnswer((_) => http.ByteStream(errorBodyController.stream)); when(() => mockStreamedResponse.stream).thenAnswer((_) => http.ByteStream(errorBodyController.stream));
int onDataCallCount = 0; int onDataCallCount = 0;
Future<void> onDataCallback(List<SyncEvent> events, Function() _, Function() __) async {
onDataCallback(List<SyncEvent> events, Function() _) {
onDataCallCount++; onDataCallCount++;
} }