diff --git a/mobile/analysis_options.yaml b/mobile/analysis_options.yaml index 21595e3e10..854f852e3c 100644 --- a/mobile/analysis_options.yaml +++ b/mobile/analysis_options.yaml @@ -35,6 +35,7 @@ linter: analyzer: exclude: - openapi/** + - build/** - lib/generated_plugin_registrant.dart - lib/**/*.g.dart - lib/**/*.drift.dart @@ -147,7 +148,9 @@ dart_code_metrics: - avoid-global-state - avoid-inverted-boolean-checks - avoid-late-final-reassignment - - avoid-local-functions + - avoid-local-functions: + exclude: + - test/**.dart - avoid-negated-conditions - avoid-nested-streams-and-futures - avoid-referencing-subclasses diff --git a/mobile/devtools_options.yaml b/mobile/devtools_options.yaml index fa0b357c4f..f592d85a9b 100644 --- a/mobile/devtools_options.yaml +++ b/mobile/devtools_options.yaml @@ -1,3 +1,4 @@ description: This file stores settings for Dart & Flutter DevTools. documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states extensions: + - drift: true \ No newline at end of file diff --git a/mobile/lib/constants/constants.dart b/mobile/lib/constants/constants.dart index b084effd91..a91e0a715d 100644 --- a/mobile/lib/constants/constants.dart +++ b/mobile/lib/constants/constants.dart @@ -5,6 +5,9 @@ const double downloadFailed = -2; // Number of log entries to retain on app start const int kLogTruncateLimit = 250; +// Sync +const int kSyncEventBatchSize = 5000; + // Hash batch limits const int kBatchHashFileLimit = 128; const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB diff --git a/mobile/lib/domain/interfaces/sync_stream.interface.dart b/mobile/lib/domain/interfaces/sync_stream.interface.dart index 0b2b5879f0..f9c52d7ee0 100644 --- a/mobile/lib/domain/interfaces/sync_stream.interface.dart +++ b/mobile/lib/domain/interfaces/sync_stream.interface.dart @@ -2,9 +2,9 @@ import 'package:immich_mobile/domain/interfaces/db.interface.dart'; import 'package:openapi/api.dart'; abstract interface class ISyncStreamRepository implements IDatabaseRepository { - Future updateUsersV1(SyncUserV1 data); - Future deleteUsersV1(SyncUserDeleteV1 data); + Future updateUsersV1(Iterable data); + Future deleteUsersV1(Iterable data); - Future updatePartnerV1(SyncPartnerV1 data); - Future deletePartnerV1(SyncPartnerDeleteV1 data); + Future updatePartnerV1(Iterable data); + Future deletePartnerV1(Iterable data); } diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index e710fba1b4..57c8ec72fd 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -2,6 +2,7 @@ import 'dart:async'; +import 'package:collection/collection.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:logging/logging.dart'; @@ -19,79 +20,122 @@ class SyncStreamService { }) : _syncApiRepository = syncApiRepository, _syncStreamRepository = syncStreamRepository; - // ignore: avoid-dynamic - Future _handleSyncData(dynamic data) async { - if (data is SyncPartnerV1) { - _logger.fine("SyncPartnerV1: $data"); - return await _syncStreamRepository.updatePartnerV1(data); + Future _handleSyncData( + SyncEntityType type, + // ignore: avoid-dynamic + Iterable data, + ) async { + if (data.isEmpty) { + _logger.warning("Received empty sync data for $type"); + return false; } - if (data is SyncUserV1) { - _logger.fine("SyncUserV1: $data"); - return await _syncStreamRepository.updateUsersV1(data); - } - - if (data is SyncPartnerDeleteV1) { - _logger.fine("SyncPartnerDeleteV1: $data"); - return await _syncStreamRepository.deletePartnerV1(data); - } - - if (data is SyncUserDeleteV1) { - _logger.fine("SyncUserDeleteV1: $data"); - return await _syncStreamRepository.deleteUsersV1(data); + _logger.fine("Processing sync data for $type of length ${data.length}"); + + try { + if (type == SyncEntityType.partnerV1) { + return await _syncStreamRepository.updatePartnerV1(data.cast()); + } + + if (type == SyncEntityType.partnerDeleteV1) { + return await _syncStreamRepository.deletePartnerV1(data.cast()); + } + + if (type == SyncEntityType.userV1) { + return await _syncStreamRepository.updateUsersV1(data.cast()); + } + + if (type == SyncEntityType.userDeleteV1) { + return await _syncStreamRepository.deleteUsersV1(data.cast()); + } + } catch (error, stack) { + _logger.severe("Error processing sync data for $type", error, stack); + return false; } + _logger.warning("Unknown sync data type: $type"); return false; } Future _syncEvent(List types) async { _logger.info("Syncing Events: $types"); final streamCompleter = Completer(); - bool shouldSkipOnDone = false; + bool shouldComplete = false; + // the onDone callback might fire before the events are processed + // the following flag ensures that the onDone callback is not called + // before the events are processed and also that events are processed sequentially + Completer? mutex; final subscription = _syncApiRepository.getSyncEvents(types).listen( (events) async { + if (events.isEmpty) { + _logger.warning("Received empty sync events"); + return; + } + + // If previous events are still being processed, wait for them to finish + if (mutex != null) { + await mutex!.future; + } + + // Take control of the mutex and process the events + mutex = Completer(); + try { - Map acks = {}; - for (final event in events) { - // the onDone callback might fire before the events are processed - // the following flag ensures that the onDone callback is not called - // before the events are processed - shouldSkipOnDone = true; - if (await _handleSyncData(event.data)) { - // Only retain the latest ack from each type - acks[event.type] = event.ack; + final eventsMap = events.groupListsBy((event) => event.type); + final Map acks = {}; + + for (final entry in eventsMap.entries) { + final type = entry.key; + final data = entry.value; + + if (data.isEmpty) { + _logger.warning("Received empty sync events for $type"); + continue; + } + + if (await _handleSyncData(type, data.map((e) => e.data))) { + // ignore: avoid-unsafe-collection-methods + acks[type] = data.last.ack; + } else { + _logger.warning("Failed to handle sync events for $type"); } } - await _syncApiRepository.ack(acks.values.toList()); + + if (acks.isNotEmpty) { + await _syncApiRepository.ack(acks.values.toList()); + } _logger.info("$types events processed"); } catch (error, stack) { _logger.warning("Error handling sync events", error, stack); + } finally { + mutex?.complete(); + mutex = null; + } + + if (shouldComplete) { + _logger.info("Sync done, completing stream"); + if (!streamCompleter.isCompleted) streamCompleter.complete(); } - streamCompleter.completeOnce(); }, - onError: (error) { - _logger.warning("Error in sync stream for $types", error); - streamCompleter.completeOnce(); + onError: (error, stack) { + _logger.warning("Error in sync stream for $types", error, stack); + // Do not proceed if the stream errors + if (!streamCompleter.isCompleted) streamCompleter.complete(); }, - // onDone is required to be called in cases where the stream is empty onDone: () { _logger.info("$types stream done"); - if (!shouldSkipOnDone) { - streamCompleter.completeOnce(); + if (mutex == null && !streamCompleter.isCompleted) { + streamCompleter.complete(); + } else { + // Marks the stream as done but does not complete the completer + // until the events are processed + shouldComplete = true; } }, ); return await streamCompleter.future.whenComplete(subscription.cancel); } - Future syncUsers() => _syncEvent([SyncRequestType.usersV1]); - Future syncPartners() => _syncEvent([SyncRequestType.partnersV1]); -} - -extension on Completer { - void completeOnce() { - if (!isCompleted) { - complete(); - } - } + Future syncUsers() => + _syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]); } diff --git a/mobile/lib/extensions/string_extensions.dart b/mobile/lib/extensions/string_extensions.dart index 293618df0b..73c8c2d34c 100644 --- a/mobile/lib/extensions/string_extensions.dart +++ b/mobile/lib/extensions/string_extensions.dart @@ -35,5 +35,6 @@ extension DurationExtension on String { } extension UUIDExtension on String { - Uint8List toUuidByte() => UuidParsing.parseAsByteList(this); + Uint8List toUuidByte({bool shouldValidate = false}) => + UuidParsing.parseAsByteList(this, validate: shouldValidate); } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index 916d5f4740..a26b867df6 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'dart:convert'; import 'package:http/http.dart' as http; +import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/services/api.service.dart'; @@ -11,7 +12,9 @@ import 'package:openapi/api.dart'; class SyncApiRepository implements ISyncApiRepository { final Logger _logger = Logger('SyncApiRepository'); final ApiService _api; - SyncApiRepository(this._api); + final int _batchSize; + SyncApiRepository(this._api, {int batchSize = kSyncEventBatchSize}) + : _batchSize = batchSize; @override Stream> getSyncEvents(List type) { @@ -23,10 +26,7 @@ class SyncApiRepository implements ISyncApiRepository { return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data)); } - Stream> _getSyncStream( - SyncStreamDto dto, { - int batchSize = 5000, - }) async* { + Stream> _getSyncStream(SyncStreamDto dto) async* { final client = http.Client(); final endpoint = "${_api.apiClient.basePath}/sync/stream"; @@ -64,7 +64,7 @@ class SyncApiRepository implements ISyncApiRepository { previousChunk = parts.removeLast(); lines.addAll(parts); - if (lines.length < batchSize) { + if (lines.length < _batchSize) { continue; } @@ -90,7 +90,7 @@ class SyncApiRepository implements ISyncApiRepository { final ack = jsonData['ack']; final converter = _kResponseMap[type]; if (converter == null) { - _logger.warning("[_parseSyncReponse] Unknown type $type"); + _logger.warning("[_parseSyncResponse] Unknown type $type"); continue; } diff --git a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart index f85b50c220..a947a9a66b 100644 --- a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart @@ -15,11 +15,16 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository DriftSyncStreamRepository(super.db) : _db = db; @override - Future deleteUsersV1(SyncUserDeleteV1 data) async { + Future deleteUsersV1(Iterable data) async { try { - await _db.managers.userEntity - .filter((row) => row.id.equals(data.userId.toUuidByte())) - .delete(); + await _db.batch((batch) { + for (final user in data) { + batch.delete( + _db.userEntity, + UserEntityCompanion(id: Value(user.userId.toUuidByte())), + ); + } + }); return true; } catch (e, s) { _logger.severe('Error while processing SyncUserDeleteV1', e, s); @@ -28,17 +33,22 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository } @override - Future updateUsersV1(SyncUserV1 data) async { - final companion = UserEntityCompanion( - name: Value(data.name), - email: Value(data.email), - ); - + Future updateUsersV1(Iterable data) async { try { - await _db.userEntity.insertOne( - companion.copyWith(id: Value(data.id.toUuidByte())), - onConflict: DoUpdate((_) => companion), - ); + await _db.batch((batch) { + for (final user in data) { + final companion = UserEntityCompanion( + name: Value(user.name), + email: Value(user.email), + ); + + batch.insert( + _db.userEntity, + companion.copyWith(id: Value(user.id.toUuidByte())), + onConflict: DoUpdate((_) => companion), + ); + } + }); return true; } catch (e, s) { _logger.severe('Error while processing SyncUserV1', e, s); @@ -47,15 +57,19 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository } @override - Future deletePartnerV1(SyncPartnerDeleteV1 data) async { + Future deletePartnerV1(Iterable data) async { try { - await _db.managers.partnerEntity - .filter( - (row) => - row.sharedById.id.equals(data.sharedById.toUuidByte()) & - row.sharedWithId.id.equals(data.sharedWithId.toUuidByte()), - ) - .delete(); + await _db.batch((batch) { + for (final partner in data) { + batch.delete( + _db.partnerEntity, + PartnerEntityCompanion( + sharedById: Value(partner.sharedById.toUuidByte()), + sharedWithId: Value(partner.sharedWithId.toUuidByte()), + ), + ); + } + }); return true; } catch (e, s) { _logger.severe('Error while processing SyncPartnerDeleteV1', e, s); @@ -64,18 +78,23 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository } @override - Future updatePartnerV1(SyncPartnerV1 data) async { - final companion = - PartnerEntityCompanion(inTimeline: Value(data.inTimeline)); - + Future updatePartnerV1(Iterable data) async { try { - await _db.partnerEntity.insertOne( - companion.copyWith( - sharedById: Value(data.sharedById.toUuidByte()), - sharedWithId: Value(data.sharedWithId.toUuidByte()), - ), - onConflict: DoUpdate((_) => companion), - ); + await _db.batch((batch) { + for (final partner in data) { + final companion = + PartnerEntityCompanion(inTimeline: Value(partner.inTimeline)); + + batch.insert( + _db.partnerEntity, + companion.copyWith( + sharedById: Value(partner.sharedById.toUuidByte()), + sharedWithId: Value(partner.sharedWithId.toUuidByte()), + ), + onConflict: DoUpdate((_) => companion), + ); + } + }); return true; } catch (e, s) { _logger.severe('Error while processing SyncPartnerV1', e, s); diff --git a/mobile/lib/utils/background_sync.dart b/mobile/lib/utils/background_sync.dart index bfedbf2601..906a548cbe 100644 --- a/mobile/lib/utils/background_sync.dart +++ b/mobile/lib/utils/background_sync.dart @@ -9,7 +9,6 @@ import 'package:immich_mobile/utils/isolate.dart'; class BackgroundSyncManager { // This prevents multiple syncs from running at the same time final _userSyncCache = AsyncCache.ephemeral(); - final _partnerSyncCache = AsyncCache.ephemeral(); BackgroundSyncManager(); @@ -18,10 +17,4 @@ class BackgroundSyncManager { (ref) => ref.read(syncStreamServiceProvider).syncUsers(), ), ); - - Future syncPartners() => _partnerSyncCache.fetch( - () async => runInIsolate( - (ref) => ref.read(syncStreamServiceProvider).syncPartners(), - ), - ); } diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index e5cfbd0ccc..aa4ab9d896 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -1,8 +1,11 @@ +// ignore_for_file: avoid-unnecessary-futures + import 'dart:async'; import 'package:flutter_test/flutter_test.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:mocktail/mocktail.dart'; import 'package:openapi/api.dart'; @@ -14,105 +17,314 @@ void main() { late SyncStreamService sut; late ISyncStreamRepository mockSyncStreamRepo; late ISyncApiRepository mockSyncApiRepo; + late StreamController> streamController; + + successHandler(Invocation _) async => true; + failureHandler(Invocation _) async => false; setUp(() { mockSyncStreamRepo = MockSyncStreamRepository(); mockSyncApiRepo = MockSyncApiRepository(); + streamController = StreamController>.broadcast(); + sut = SyncStreamService( syncApiRepository: mockSyncApiRepo, syncStreamRepository: mockSyncStreamRepo, ); - when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) - .thenAnswer((_) => Stream.value(SyncStreamStub.userEvents)); - when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1])) - .thenAnswer((_) => Stream.value(SyncStreamStub.partnerEvents)); - when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) => Future.value()); + // Default stream setup - emits one batch and closes + when(() => mockSyncApiRepo.getSyncEvents(any())) + .thenAnswer((_) => streamController.stream); - registerFallbackValue(SyncStreamStub.userV1Admin); + // Default ack setup + when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {}); + + // Register fallbacks for mocktail verification + registerFallbackValue([]); + registerFallbackValue([]); + registerFallbackValue([]); + registerFallbackValue([]); + + // Default successful repository calls when(() => mockSyncStreamRepo.updateUsersV1(any())) - .thenAnswer((_) => Future.value(true)); - registerFallbackValue(SyncStreamStub.partnerV1); - when(() => mockSyncStreamRepo.updatePartnerV1(any())) - .thenAnswer((_) => Future.value(false)); - registerFallbackValue(SyncStreamStub.userDeleteV1); + .thenAnswer(successHandler); when(() => mockSyncStreamRepo.deleteUsersV1(any())) - .thenAnswer((_) => Future.value(false)); - registerFallbackValue(SyncStreamStub.partnerDeleteV1); + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer(successHandler); when(() => mockSyncStreamRepo.deletePartnerV1(any())) - .thenAnswer((_) => Future.value(true)); + .thenAnswer(successHandler); }); - group("_syncEvent", () { - test("future completed on success", () async { + tearDown(() async { + await streamController.close(); + }); + + // Helper to trigger sync and add events to the stream + Future triggerSyncAndEmit(List events) async { + final future = sut.syncUsers(); // Start listening + await Future.delayed(Duration.zero); // Allow listener to attach + if (!streamController.isClosed) { + streamController.add(events); + await streamController.close(); // Close after emitting + } + await future; // Wait for processing to complete + } + + group("SyncStreamService", () { + test( + "completes successfully when stream emits data and handlers succeed", + () async { + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + final future = triggerSyncAndEmit(events); + await expectLater(future, completes); + // Verify ack includes last ack from each successfully handled type + verify( + () => mockSyncApiRepo + .ack(any(that: containsAllInOrder(["5", "2", "4", "3"]))), + ).called(1); + }, + ); + + test("completes successfully when stream emits an error", () async { + when(() => mockSyncApiRepo.getSyncEvents(any())) + .thenAnswer((_) => Stream.error(Exception("Stream Error"))); + // Should complete gracefully without throwing await expectLater(sut.syncUsers(), completes); + verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error }); - test("future completes on error from stream", () async { - when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) - .thenAnswer((_) => Stream.error(Exception("Error"))); - await expectLater(sut.syncUsers(), completes); + test("throws when initial getSyncEvents call fails", () async { + final apiException = Exception("API Error"); + when(() => mockSyncApiRepo.getSyncEvents(any())).thenThrow(apiException); + // Should rethrow the exception from the initial call + await expectLater(sut.syncUsers(), throwsA(apiException)); + verifyNever(() => mockSyncApiRepo.ack(any())); }); - test("future throws on api exception", () { - when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) - .thenThrow(Exception("Error")); - expect(sut.syncUsers(), throwsA(isA())); - }); + test( + "completes successfully when a repository handler throws an exception", + () async { + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenThrow(Exception("Repo Error")); + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + // Should complete, but ack only for the successful types + await triggerSyncAndEmit(events); + // Only partner delete was successful by default setup + verify(() => mockSyncApiRepo.ack(["2", "4", "3"])).called(1); + }, + ); - test("future completes on repository exception", () { + test( + "completes successfully but sends no ack when all handlers fail", + () async { + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.deletePartnerV1(any())) + .thenAnswer(failureHandler); + + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + await triggerSyncAndEmit(events); + verifyNever(() => mockSyncApiRepo.ack(any())); + }, + ); + + test("sends ack only for types where handler returns true", () async { + // Mock specific handlers: user update fails, user delete succeeds when(() => mockSyncStreamRepo.updateUsersV1(any())) - .thenThrow(Exception("Error")); - expect(sut.syncUsers(), completes); - }); - - test("sends ack for successful events", () async { - when(() => mockSyncStreamRepo.updateUsersV1(any())) - .thenAnswer((_) => Future.value(false)); + .thenAnswer(failureHandler); when(() => mockSyncStreamRepo.deleteUsersV1(any())) - .thenAnswer((_) => Future.value(true)); - await sut.syncUsers(); - verify(() => mockSyncApiRepo.ack(["2"])).called(1); + .thenAnswer(successHandler); + // partner update fails, partner delete succeeds + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer(failureHandler); + + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + await triggerSyncAndEmit(events); + + // Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4") + verify( + () => mockSyncApiRepo.ack(any(that: containsAllInOrder(["2", "4"]))), + ).called(1); }); - test("only sends the latest ack for events of same type", () async { - await sut.syncUsers(); - verify(() => mockSyncApiRepo.ack(["5"])).called(1); + test("does not process or ack when stream emits an empty list", () async { + final future = sut.syncUsers(); + streamController.add([]); // Emit empty list + await streamController.close(); + await future; // Wait for completion + + verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); + verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any())); + verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any())); + verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any())); + verifyNever(() => mockSyncApiRepo.ack(any())); + }); + + test("processes multiple batches sequentially using mutex", () async { + final completer1 = Completer(); + final completer2 = Completer(); + int callOrder = 0; + int handler1StartOrder = -1; + int handler2StartOrder = -1; + + when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { + handler1StartOrder = ++callOrder; // Record when handler 1 starts + await completer1.future; // Wait for external signal + return true; + }); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer((_) async { + handler2StartOrder = ++callOrder; // Record when handler 2 starts + await completer2.future; + return true; + }); + + final batch1 = SyncStreamStub.userEvents; + final batch2 = SyncStreamStub.partnerEvents; + + final syncFuture = sut.syncUsers(); + streamController.add(batch1); // Emit first batch + + // Ensure first batch processing starts + await Future.delayed(const Duration(milliseconds: 10)); + expect(handler1StartOrder, 1, reason: "Handler 1 should start first"); + + streamController.add(batch2); // Emit second batch while first is waiting + + await Future.delayed(const Duration(milliseconds: 10)); + // Handler 2 should NOT have started yet because mutex is held by handler 1 + expect( + handler2StartOrder, + -1, + reason: "Handler 2 should wait for Handler 1", + ); + + completer1.complete(); // Allow first handler to finish + // Allow second batch to start processing + await Future.delayed(const Duration(milliseconds: 10)); + + // Now handler 2 should start + expect( + handler2StartOrder, + 2, + reason: "Handler 2 should start after Handler 1 finishes", + ); + + completer2.complete(); // Allow second handler to finish + await streamController.close(); // Close stream + await syncFuture; // Wait for overall completion + + // Verify handlers were called and acks sent for both batches eventually + verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1); + verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1); + // Acks might be called separately or combined depending on timing, check count + verify(() => mockSyncApiRepo.ack(any())).called(2); + }); + + test("completes successfully when ack call throws an exception", () async { + when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error")); + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + + // Should still complete even if ack fails + await triggerSyncAndEmit(events); + verify(() => mockSyncApiRepo.ack(any())) + .called(1); // Verify ack was attempted + }); + + test("waits for processing to finish if onDone called early", () async { + final processingCompleter = Completer(); + bool handlerFinished = false; + + when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { + await processingCompleter.future; // Wait inside handler + handlerFinished = true; + return true; + }); + + final syncFuture = sut.syncUsers(); + // Allow listener to attach + // This is necessary to ensure the stream is ready to receive events + await Future.delayed(Duration.zero); + + streamController.add(SyncStreamStub.userEvents); // Emit batch + await Future.delayed( + const Duration(milliseconds: 10), + ); // Ensure processing starts + + await streamController + .close(); // Close stream (triggers onDone internally) + await Future.delayed( + const Duration(milliseconds: 10), + ); // Give onDone a chance to fire + + // At this point, onDone was called, but processing is blocked + expect(handlerFinished, isFalse); + + processingCompleter.complete(); // Allow processing to finish + await syncFuture; // Now the main future should complete + + expect(handlerFinished, isTrue); + verify(() => mockSyncApiRepo.ack(any())).called(1); }); }); group("syncUsers", () { - test("calls _syncEvent with usersV1", () async { - await sut.syncUsers(); - verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) - .called(1); - }); + test("calls getSyncEvents with correct types", () async { + // Need to close the stream for the future to complete + final future = sut.syncUsers(); + await streamController.close(); + await future; - test("calls _handleSyncData for each event", () async { - await sut.syncUsers(); - verify(() => mockSyncStreamRepo.updateUsersV1(SyncStreamStub.userV1Admin)) - .called(1); verify( - () => mockSyncStreamRepo.deleteUsersV1(SyncStreamStub.userDeleteV1), + () => mockSyncApiRepo.getSyncEvents([ + SyncRequestType.usersV1, + SyncRequestType.partnersV1, + ]), ).called(1); }); - }); - group("syncPartners", () { - test("calls _syncEvent with partnersV1", () async { - await sut.syncPartners(); - verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1])) - .called(1); - }); + test("calls repository methods with correctly grouped data", () async { + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + await triggerSyncAndEmit(events); - test("calls _handleSyncData for each event", () async { - await sut.syncPartners(); + // Verify each handler was called with the correct list of data payloads verify( - () => mockSyncStreamRepo.updatePartnerV1(SyncStreamStub.partnerV1), + () => mockSyncStreamRepo.updateUsersV1( + [SyncStreamStub.userV1Admin, SyncStreamStub.userV1User], + ), ).called(1); verify( - () => - mockSyncStreamRepo.deletePartnerV1(SyncStreamStub.partnerDeleteV1), + () => mockSyncStreamRepo.deleteUsersV1([SyncStreamStub.userDeleteV1]), + ).called(1); + verify( + () => mockSyncStreamRepo.updatePartnerV1([SyncStreamStub.partnerV1]), + ).called(1); + verify( + () => mockSyncStreamRepo + .deletePartnerV1([SyncStreamStub.partnerDeleteV1]), ).called(1); }); }); diff --git a/mobile/test/fixtures/sync_stream.stub.dart b/mobile/test/fixtures/sync_stream.stub.dart index d52de43662..781e63a2bb 100644 --- a/mobile/test/fixtures/sync_stream.stub.dart +++ b/mobile/test/fixtures/sync_stream.stub.dart @@ -35,11 +35,11 @@ abstract final class SyncStreamStub { sharedWithId: "4", ); static final partnerEvents = [ - SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"), SyncEvent( type: SyncEntityType.partnerDeleteV1, data: partnerDeleteV1, ack: "4", ), + SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"), ]; } diff --git a/mobile/test/service.mocks.dart b/mobile/test/service.mocks.dart index e1b8df40a3..87a8c01cf0 100644 --- a/mobile/test/service.mocks.dart +++ b/mobile/test/service.mocks.dart @@ -29,4 +29,3 @@ class MockSearchApi extends Mock implements SearchApi {} class MockAppSettingService extends Mock implements AppSettingsService {} class MockBackgroundService extends Mock implements BackgroundService {} -