diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 5625635e49..f18f54258b 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -23,54 +23,17 @@ class SyncStreamService { bool get isCancelled => _cancelChecker?.call() ?? false; - Future sync() { + Future sync() async { _logger.info("Remote sync request for user"); // Start the sync stream and handle events - return _syncApiRepository.streamChanges(_handleEvents); - } - - Future handleWsAssetUploadReadyV1Batch(List batchData) async { - if (batchData.isEmpty) return; - - _logger.info('Processing batch of ${batchData.length} AssetUploadReadyV1 events'); - - final List assets = []; - final List exifs = []; - - try { - for (final data in batchData) { - if (data is! Map) { - continue; - } - - final payload = data; - final assetData = payload['asset']; - final exifData = payload['exif']; - - if (assetData == null || exifData == null) { - continue; - } - - final asset = SyncAssetV1.fromJson(assetData); - final exif = SyncAssetExifV1.fromJson(exifData); - - if (asset != null && exif != null) { - assets.add(asset); - exifs.add(exif); - } - } - - if (assets.isNotEmpty && exifs.isNotEmpty) { - await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-batch'); - await _syncStreamRepository.updateAssetsExifV1(exifs, debugLabel: 'websocket-batch'); - _logger.info('Successfully processed ${assets.length} assets in batch'); - } - } catch (error, stackTrace) { - _logger.severe("Error processing AssetUploadReadyV1 websocket batch events", error, stackTrace); + bool shouldReset = false; + await _syncApiRepository.streamChanges(_handleEvents, onReset: () => shouldReset = true); + if (shouldReset) { + await _syncApiRepository.streamChanges(_handleEvents); } } - Future _handleEvents(List events, Function() abort) async { + Future _handleEvents(List events, Function() abort, Function() reset) async { List items = []; for (final event in events) { if (isCancelled) { @@ -83,6 +46,10 @@ class SyncStreamService { await _processBatch(items); } + if (event.type == SyncEntityType.syncResetV1) { + reset(); + } + items.add(event); } @@ -159,6 +126,12 @@ class SyncStreamService { // to acknowledge that the client has processed all the backfill events case SyncEntityType.syncAckV1: return; + // No-op. SyncCompleteV1 is used to signal the completion of the sync process + case SyncEntityType.syncCompleteV1: + return; + // Request to reset the client state. Clear everything related to remote entities + case SyncEntityType.syncResetV1: + return _syncStreamRepository.reset(); case SyncEntityType.memoryV1: return _syncStreamRepository.updateMemoriesV1(data.cast()); case SyncEntityType.memoryDeleteV1: @@ -193,4 +166,45 @@ class SyncStreamService { _logger.warning("Unknown sync data type: $type"); } } + + Future handleWsAssetUploadReadyV1Batch(List batchData) async { + if (batchData.isEmpty) return; + + _logger.info('Processing batch of ${batchData.length} AssetUploadReadyV1 events'); + + final List assets = []; + final List exifs = []; + + try { + for (final data in batchData) { + if (data is! Map) { + continue; + } + + final payload = data; + final assetData = payload['asset']; + final exifData = payload['exif']; + + if (assetData == null || exifData == null) { + continue; + } + + final asset = SyncAssetV1.fromJson(assetData); + final exif = SyncAssetExifV1.fromJson(exifData); + + if (asset != null && exif != null) { + assets.add(asset); + exifs.add(exif); + } + } + + if (assets.isNotEmpty && exifs.isNotEmpty) { + await _syncStreamRepository.updateAssetsV1(assets, debugLabel: 'websocket-batch'); + await _syncStreamRepository.updateAssetsExifV1(exifs, debugLabel: 'websocket-batch'); + _logger.info('Successfully processed ${assets.length} assets in batch'); + } + } catch (error, stackTrace) { + _logger.severe("Error processing AssetUploadReadyV1 websocket batch events", error, stackTrace); + } + } } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index 2175e77e82..7af1141659 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -18,7 +18,8 @@ class SyncApiRepository { } Future streamChanges( - Function(List, Function() abort) onData, { + Future Function(List, Function() abort, Function() reset) onData, { + Function()? onReset, int batchSize = kSyncEventBatchSize, http.Client? httpClient, }) async { @@ -69,6 +70,8 @@ class SyncApiRepository { shouldAbort = true; } + final reset = onReset ?? () {}; + try { final response = await client.send(request); @@ -91,12 +94,12 @@ class SyncApiRepository { continue; } - await onData(_parseLines(lines), abort); + await onData(_parseLines(lines), abort, reset); lines.clear(); } if (lines.isNotEmpty && !shouldAbort) { - await onData(_parseLines(lines), abort); + await onData(_parseLines(lines), abort, reset); } } catch (error, stack) { _logger.severe("Error processing stream", error, stack); @@ -156,7 +159,8 @@ const _kResponseMap = { SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson, SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson, SyncEntityType.albumToAssetDeleteV1: SyncAlbumToAssetDeleteV1.fromJson, - SyncEntityType.syncAckV1: _SyncAckV1.fromJson, + SyncEntityType.syncAckV1: _SyncEmptyDto.fromJson, + SyncEntityType.syncResetV1: _SyncEmptyDto.fromJson, SyncEntityType.memoryV1: SyncMemoryV1.fromJson, SyncEntityType.memoryDeleteV1: SyncMemoryDeleteV1.fromJson, SyncEntityType.memoryToAssetV1: SyncMemoryAssetV1.fromJson, @@ -172,8 +176,9 @@ const _kResponseMap = { SyncEntityType.personDeleteV1: SyncPersonDeleteV1.fromJson, SyncEntityType.assetFaceV1: SyncAssetFaceV1.fromJson, SyncEntityType.assetFaceDeleteV1: SyncAssetFaceDeleteV1.fromJson, + SyncEntityType.syncCompleteV1: _SyncEmptyDto.fromJson, }; -class _SyncAckV1 { - static _SyncAckV1? fromJson(dynamic _) => _SyncAckV1(); +class _SyncEmptyDto { + static _SyncEmptyDto? fromJson(dynamic _) => _SyncEmptyDto(); } diff --git a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart index 52ffaabca9..0fed400083 100644 --- a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart @@ -29,6 +29,36 @@ class SyncStreamRepository extends DriftDatabaseRepository { SyncStreamRepository(super.db) : _db = db; + Future reset() async { + _logger.fine("SyncResetV1 received. Resetting remote entities"); + try { + await _db.exclusively(() async { + // foreign_keys PRAGMA is no-op within transactions + // https://www.sqlite.org/pragma.html#pragma_foreign_keys + await _db.customStatement('PRAGMA foreign_keys = OFF'); + await transaction(() async { + await _db.assetFaceEntity.deleteAll(); + await _db.memoryAssetEntity.deleteAll(); + await _db.memoryEntity.deleteAll(); + await _db.partnerEntity.deleteAll(); + await _db.personEntity.deleteAll(); + await _db.remoteAlbumAssetEntity.deleteAll(); + await _db.remoteAlbumEntity.deleteAll(); + await _db.remoteAlbumUserEntity.deleteAll(); + await _db.remoteAssetEntity.deleteAll(); + await _db.remoteExifEntity.deleteAll(); + await _db.stackEntity.deleteAll(); + await _db.userEntity.deleteAll(); + await _db.userMetadataEntity.deleteAll(); + }); + await _db.customStatement('PRAGMA foreign_keys = ON'); + }); + } catch (error, stack) { + _logger.severe('Error: SyncResetV1', error, stack); + rethrow; + } + } + Future deleteUsersV1(Iterable data) async { try { await _db.userEntity.deleteWhere((row) => row.id.isIn(data.map((e) => e.userId))); diff --git a/mobile/lib/repositories/auth.repository.dart b/mobile/lib/repositories/auth.repository.dart index 9d7748254d..ba978b0df0 100644 --- a/mobile/lib/repositories/auth.repository.dart +++ b/mobile/lib/repositories/auth.repository.dart @@ -1,6 +1,5 @@ import 'dart:convert'; -import 'package:drift/drift.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; import 'package:immich_mobile/entities/album.entity.dart'; @@ -10,6 +9,7 @@ import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/infrastructure/entities/exif.entity.dart'; import 'package:immich_mobile/infrastructure/entities/user.entity.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; @@ -25,25 +25,7 @@ class AuthRepository extends DatabaseRepository { const AuthRepository(super.db, this._drift); Future clearLocalData() async { - // Drift deletions - child entities first (those with foreign keys) - await Future.wait([ - _drift.memoryAssetEntity.deleteAll(), - _drift.remoteAlbumAssetEntity.deleteAll(), - _drift.remoteAlbumUserEntity.deleteAll(), - _drift.remoteExifEntity.deleteAll(), - _drift.userMetadataEntity.deleteAll(), - _drift.partnerEntity.deleteAll(), - _drift.stackEntity.deleteAll(), - _drift.assetFaceEntity.deleteAll(), - ]); - // Drift deletions - parent entities - await Future.wait([ - _drift.memoryEntity.deleteAll(), - _drift.personEntity.deleteAll(), - _drift.remoteAlbumEntity.deleteAll(), - _drift.remoteAssetEntity.deleteAll(), - _drift.userEntity.deleteAll(), - ]); + await SyncStreamRepository(_drift).reset(); return db.writeTxn(() { return Future.wait([ diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index 46e585faa0..0126b11e46 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -30,8 +30,9 @@ void main() { late SyncStreamService sut; late SyncStreamRepository mockSyncStreamRepo; late SyncApiRepository mockSyncApiRepo; - late Function(List, Function()) handleEventsCallback; + late Future Function(List, Function(), Function()) handleEventsCallback; late _MockAbortCallbackWrapper mockAbortCallbackWrapper; + late _MockAbortCallbackWrapper mockResetCallbackWrapper; successHandler(Invocation _) async => true; @@ -39,6 +40,7 @@ void main() { mockSyncStreamRepo = MockSyncStreamRepository(); mockSyncApiRepo = MockSyncApiRepository(); mockAbortCallbackWrapper = _MockAbortCallbackWrapper(); + mockResetCallbackWrapper = _MockAbortCallbackWrapper(); when(() => mockAbortCallbackWrapper()).thenReturn(false); @@ -46,6 +48,10 @@ void main() { handleEventsCallback = invocation.positionalArguments.first; }); + when(() => mockSyncApiRepo.streamChanges(any(), onReset: any(named: 'onReset'))).thenAnswer((invocation) async { + handleEventsCallback = invocation.positionalArguments.first; + }); + when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {}); when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer(successHandler); @@ -86,7 +92,7 @@ void main() { Future simulateEvents(List events) async { await sut.sync(); - await handleEventsCallback(events, mockAbortCallbackWrapper.call); + await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call); } group("SyncStreamService - _handleEvents", () { @@ -156,7 +162,7 @@ void main() { when(() => cancellationChecker()).thenReturn(true); }); - await handleEventsCallback(events, mockAbortCallbackWrapper.call); + await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call); verify(() => mockSyncStreamRepo.deleteUsersV1(any())).called(1); verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); @@ -188,7 +194,11 @@ void main() { final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1]; - final processingFuture = handleEventsCallback(events, mockAbortCallbackWrapper.call); + final processingFuture = handleEventsCallback( + events, + mockAbortCallbackWrapper.call, + mockResetCallbackWrapper.call, + ); await pumpEventQueue(); expect(handler1Started, isTrue); diff --git a/mobile/test/infrastructure/repositories/sync_api_repository_test.dart b/mobile/test/infrastructure/repositories/sync_api_repository_test.dart index d456b06f7c..7ce6da3c85 100644 --- a/mobile/test/infrastructure/repositories/sync_api_repository_test.dart +++ b/mobile/test/infrastructure/repositories/sync_api_repository_test.dart @@ -63,7 +63,9 @@ void main() { } }); - Future streamChanges(Function(List, Function() abort) onDataCallback) { + Future streamChanges( + Future Function(List, Function() abort, Function() reset) onDataCallback, + ) { return sut.streamChanges(onDataCallback, batchSize: testBatchSize, httpClient: mockHttpClient); } @@ -72,7 +74,7 @@ void main() { bool abortWasCalledInCallback = false; List receivedEventsBatch1 = []; - onDataCallback(List events, Function() abort) { + Future onDataCallback(List events, Function() abort, Function() _) async { onDataCallCount++; if (onDataCallCount == 1) { receivedEventsBatch1 = events; @@ -116,7 +118,7 @@ void main() { int onDataCallCount = 0; bool abortWasCalledInCallback = false; - onDataCallback(List events, Function() abort) { + Future onDataCallback(List events, Function() abort, Function() _) async { onDataCallCount++; if (onDataCallCount == 1) { abort(); @@ -158,7 +160,7 @@ void main() { List receivedEventsBatch1 = []; List receivedEventsBatch2 = []; - onDataCallback(List events, Function() _) { + Future onDataCallback(List events, Function() _, Function() __) async { onDataCallCount++; if (onDataCallCount == 1) { receivedEventsBatch1 = events; @@ -202,7 +204,7 @@ void main() { final streamError = Exception("Network Error"); int onDataCallCount = 0; - onDataCallback(List events, Function() _) { + Future onDataCallback(List events, Function() _, Function() __) async { onDataCallCount++; } @@ -229,8 +231,7 @@ void main() { when(() => mockStreamedResponse.stream).thenAnswer((_) => http.ByteStream(errorBodyController.stream)); int onDataCallCount = 0; - - onDataCallback(List events, Function() _) { + Future onDataCallback(List events, Function() _, Function() __) async { onDataCallCount++; }