From 6baff238015993fb7831c9506204e923e7b453ac Mon Sep 17 00:00:00 2001 From: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Date: Tue, 18 Mar 2025 11:29:39 +0530 Subject: [PATCH] refactor: sync service and add tests --- .../domain/services/sync_stream.service.dart | 88 ++++++------- .../providers/app_life_cycle.provider.dart | 2 - .../providers/background_sync.provider.dart | 9 +- .../providers/background_sync.provider.g.dart | 27 ---- mobile/lib/providers/db.provider.dart | 5 - .../providers/infrastructure/db.provider.dart | 9 +- .../infrastructure/sync_stream.provider.dart | 14 +-- mobile/lib/utils/background_sync.dart | 4 +- .../services/sync_stream_service_test.dart | 119 ++++++++++++++++++ mobile/test/fixtures/sync_stream.stub.dart | 45 +++++++ .../test/infrastructure/repository.mock.dart | 6 + 11 files changed, 226 insertions(+), 102 deletions(-) delete mode 100644 mobile/lib/providers/background_sync.provider.g.dart create mode 100644 mobile/test/domain/services/sync_stream_service_test.dart create mode 100644 mobile/test/fixtures/sync_stream.stub.dart diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index da0d156cdd..8169a5477c 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -4,7 +4,6 @@ import 'dart:async'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; -import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; @@ -14,12 +13,6 @@ class SyncStreamService { final ISyncApiRepository _syncApiRepository; final ISyncStreamRepository _syncStreamRepository; - StreamSubscription? _userSyncSubscription; - Completer _userSyncCompleter = Completer(); - - StreamSubscription? _partnerSyncSubscription; - Completer _partnerSyncCompleter = Completer(); - SyncStreamService({ required ISyncApiRepository syncApiRepository, required ISyncStreamRepository syncStreamRepository, @@ -51,47 +44,46 @@ class SyncStreamService { return false; } - Future _handleSyncEvents(List events) async { - Map acks = {}; - for (final event in events) { - if (await _handleSyncData(event.data)) { - // Only retain the latest ack from each type - acks[event.type] = event.ack; - } + Future _syncEvent(List types) async { + _logger.info("Syncing Events: $types"); + final streamCompleter = Completer(); + bool shouldSkipOnDone = false; + final subscription = _syncApiRepository.getSyncEvents(types).listen( + (events) async { + try { + Map acks = {}; + for (final event in events) { + // the onDone callback might fire before the events are processed + // the following flag ensures that the onDone callback is not called + // before the events are processed + shouldSkipOnDone = true; + if (await _handleSyncData(event.data)) { + // Only retain the latest ack from each type + acks[event.type] = event.ack; + } + } + await _syncApiRepository.ack(acks.values.toList()); + } catch (error, stack) { + _logger.warning("Error handling sync events", error, stack); + } + streamCompleter.completeOnce(); + }, + onError: (_) => streamCompleter.completeOnce(), + // onDone is required to be called in cases where the stream is empty + onDone: () => shouldSkipOnDone ? null : streamCompleter.completeOnce, + ); + streamCompleter.future.whenComplete(subscription.cancel); + return await streamCompleter.future; + } + + Future syncUsers() => _syncEvent([SyncRequestType.usersV1]); + Future syncPartners() => _syncEvent([SyncRequestType.partnersV1]); +} + +extension on Completer { + void completeOnce() { + if (!isCompleted) { + complete(); } - await _syncApiRepository.ack(acks.values.toList()); - } - - Future syncUsers() async { - _logger.info("Syncing User Changes"); - _userSyncSubscription = - _syncApiRepository.getSyncEvents([SyncRequestType.usersV1]).listen( - _handleSyncEvents, - onDone: () { - _userSyncCompleter.complete(); - _userSyncCompleter = Completer(); - }, - ); - return await _userSyncCompleter.future; - } - - Future syncPartners() async { - _logger.info("Syncing Partner Changes"); - _partnerSyncSubscription = - _syncApiRepository.getSyncEvents([SyncRequestType.partnersV1]).listen( - _handleSyncEvents, - onDone: () { - _partnerSyncCompleter.complete(); - _partnerSyncCompleter = Completer(); - }, - ); - return await _partnerSyncCompleter.future; - } - - Future dispose() async { - await _userSyncSubscription?.cancel(); - _userSyncCompleter.complete(); - await _partnerSyncSubscription?.cancel(); - _partnerSyncCompleter.complete(); } } diff --git a/mobile/lib/providers/app_life_cycle.provider.dart b/mobile/lib/providers/app_life_cycle.provider.dart index e45fcb56f6..3ec7813e2f 100644 --- a/mobile/lib/providers/app_life_cycle.provider.dart +++ b/mobile/lib/providers/app_life_cycle.provider.dart @@ -7,7 +7,6 @@ import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/providers/album/album.provider.dart'; import 'package:immich_mobile/providers/asset.provider.dart'; import 'package:immich_mobile/providers/auth.provider.dart'; -import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/backup/ios_background_settings.provider.dart'; import 'package:immich_mobile/providers/backup/manual_upload.provider.dart'; @@ -114,7 +113,6 @@ class AppLifeCycleNotifier extends StateNotifier { _ref.read(backupProvider.notifier).cancelBackup(); } _ref.read(websocketProvider.notifier).disconnect(); - _ref.read(backgroundSyncProvider).stop(); } LogService.I.flush(); diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart index 7782885a04..1c5dbc38b9 100644 --- a/mobile/lib/providers/background_sync.provider.dart +++ b/mobile/lib/providers/background_sync.provider.dart @@ -1,10 +1,7 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/utils/background_sync.dart'; -import 'package:riverpod_annotation/riverpod_annotation.dart'; -part 'background_sync.provider.g.dart'; - -@Riverpod(keepAlive: true) -BackgroundSyncManager backgroundSync(Ref _) => - BackgroundSyncManager(duration: kBackgroundSyncDuration); +final backgroundSyncProvider = Provider( + (ref) => BackgroundSyncManager(duration: kBackgroundSyncDuration), +); diff --git a/mobile/lib/providers/background_sync.provider.g.dart b/mobile/lib/providers/background_sync.provider.g.dart deleted file mode 100644 index fc9f1f1528..0000000000 --- a/mobile/lib/providers/background_sync.provider.g.dart +++ /dev/null @@ -1,27 +0,0 @@ -// GENERATED CODE - DO NOT MODIFY BY HAND - -part of 'background_sync.provider.dart'; - -// ************************************************************************** -// RiverpodGenerator -// ************************************************************************** - -String _$backgroundSyncHash() => r'c08b6499af18d3fedeb9bb6c4ac0833c656f30dd'; - -/// See also [backgroundSync]. -@ProviderFor(backgroundSync) -final backgroundSyncProvider = Provider.internal( - backgroundSync, - name: r'backgroundSyncProvider', - debugGetCreateSourceHash: const bool.fromEnvironment('dart.vm.product') - ? null - : _$backgroundSyncHash, - dependencies: null, - allTransitiveDependencies: null, -); - -@Deprecated('Will be removed in 3.0. Use Ref instead') -// ignore: unused_element -typedef BackgroundSyncRef = ProviderRef; -// ignore_for_file: type=lint -// ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member, deprecated_member_use_from_same_package diff --git a/mobile/lib/providers/db.provider.dart b/mobile/lib/providers/db.provider.dart index cdcfbcf606..e03e037f36 100644 --- a/mobile/lib/providers/db.provider.dart +++ b/mobile/lib/providers/db.provider.dart @@ -1,10 +1,5 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; -import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:isar/isar.dart'; -import 'package:riverpod_annotation/riverpod_annotation.dart'; // overwritten in main.dart due to async loading final dbProvider = Provider((_) => throw UnimplementedError()); - -@Riverpod(keepAlive: true) -Drift drift(Ref _) => Drift(); diff --git a/mobile/lib/providers/infrastructure/db.provider.dart b/mobile/lib/providers/infrastructure/db.provider.dart index 5d052ddcd5..4eefbc556c 100644 --- a/mobile/lib/providers/infrastructure/db.provider.dart +++ b/mobile/lib/providers/infrastructure/db.provider.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:isar/isar.dart'; @@ -8,5 +10,8 @@ part 'db.provider.g.dart'; @Riverpod(keepAlive: true) Isar isar(Ref ref) => throw UnimplementedError('isar'); -@Riverpod(keepAlive: true) -Drift drift(Ref _) => Drift(); +final driftProvider = Provider((ref) { + final drift = Drift(); + ref.onDispose(() => unawaited(drift.close())); + return drift; +}); diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart index 085661ad5c..2bd26bd933 100644 --- a/mobile/lib/providers/infrastructure/sync_stream.provider.dart +++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart @@ -1,5 +1,3 @@ -import 'dart:async'; - import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; @@ -7,16 +5,12 @@ import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository import 'package:immich_mobile/providers/api.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; -final syncStreamServiceProvider = Provider((ref) { - final instance = SyncStreamService( +final syncStreamServiceProvider = Provider( + (ref) => SyncStreamService( syncApiRepository: ref.watch(syncApiRepositoryProvider), syncStreamRepository: ref.watch(syncStreamRepositoryProvider), - ); - - ref.onDispose(() => unawaited(instance.dispose())); - - return instance; -}); + ), +); final syncApiRepositoryProvider = Provider( (ref) => SyncApiRepository(ref.watch(apiServiceProvider)), diff --git a/mobile/lib/utils/background_sync.dart b/mobile/lib/utils/background_sync.dart index cec8d4f17d..ead1c640ab 100644 --- a/mobile/lib/utils/background_sync.dart +++ b/mobile/lib/utils/background_sync.dart @@ -27,12 +27,12 @@ class BackgroundSyncManager { } void start() { - _logger.info('Background sync enabled'); + _logger.info('Starting Background sync'); _timer ??= _createTimer(); } void stop() { - _logger.info('Background sync disabled'); + _logger.info('Stopping Background sync'); _timer?.cancel(); _timer = null; } diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart new file mode 100644 index 0000000000..e5cfbd0ccc --- /dev/null +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -0,0 +1,119 @@ +import 'dart:async'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/domain/services/sync_stream.service.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:openapi/api.dart'; + +import '../../fixtures/sync_stream.stub.dart'; +import '../../infrastructure/repository.mock.dart'; + +void main() { + late SyncStreamService sut; + late ISyncStreamRepository mockSyncStreamRepo; + late ISyncApiRepository mockSyncApiRepo; + + setUp(() { + mockSyncStreamRepo = MockSyncStreamRepository(); + mockSyncApiRepo = MockSyncApiRepository(); + sut = SyncStreamService( + syncApiRepository: mockSyncApiRepo, + syncStreamRepository: mockSyncStreamRepo, + ); + + when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) + .thenAnswer((_) => Stream.value(SyncStreamStub.userEvents)); + when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1])) + .thenAnswer((_) => Stream.value(SyncStreamStub.partnerEvents)); + when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) => Future.value()); + + registerFallbackValue(SyncStreamStub.userV1Admin); + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer((_) => Future.value(true)); + registerFallbackValue(SyncStreamStub.partnerV1); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer((_) => Future.value(false)); + registerFallbackValue(SyncStreamStub.userDeleteV1); + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer((_) => Future.value(false)); + registerFallbackValue(SyncStreamStub.partnerDeleteV1); + when(() => mockSyncStreamRepo.deletePartnerV1(any())) + .thenAnswer((_) => Future.value(true)); + }); + + group("_syncEvent", () { + test("future completed on success", () async { + await expectLater(sut.syncUsers(), completes); + }); + + test("future completes on error from stream", () async { + when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) + .thenAnswer((_) => Stream.error(Exception("Error"))); + await expectLater(sut.syncUsers(), completes); + }); + + test("future throws on api exception", () { + when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) + .thenThrow(Exception("Error")); + expect(sut.syncUsers(), throwsA(isA())); + }); + + test("future completes on repository exception", () { + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenThrow(Exception("Error")); + expect(sut.syncUsers(), completes); + }); + + test("sends ack for successful events", () async { + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer((_) => Future.value(false)); + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer((_) => Future.value(true)); + await sut.syncUsers(); + verify(() => mockSyncApiRepo.ack(["2"])).called(1); + }); + + test("only sends the latest ack for events of same type", () async { + await sut.syncUsers(); + verify(() => mockSyncApiRepo.ack(["5"])).called(1); + }); + }); + + group("syncUsers", () { + test("calls _syncEvent with usersV1", () async { + await sut.syncUsers(); + verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) + .called(1); + }); + + test("calls _handleSyncData for each event", () async { + await sut.syncUsers(); + verify(() => mockSyncStreamRepo.updateUsersV1(SyncStreamStub.userV1Admin)) + .called(1); + verify( + () => mockSyncStreamRepo.deleteUsersV1(SyncStreamStub.userDeleteV1), + ).called(1); + }); + }); + + group("syncPartners", () { + test("calls _syncEvent with partnersV1", () async { + await sut.syncPartners(); + verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1])) + .called(1); + }); + + test("calls _handleSyncData for each event", () async { + await sut.syncPartners(); + verify( + () => mockSyncStreamRepo.updatePartnerV1(SyncStreamStub.partnerV1), + ).called(1); + verify( + () => + mockSyncStreamRepo.deletePartnerV1(SyncStreamStub.partnerDeleteV1), + ).called(1); + }); + }); +} diff --git a/mobile/test/fixtures/sync_stream.stub.dart b/mobile/test/fixtures/sync_stream.stub.dart new file mode 100644 index 0000000000..e6395483fa --- /dev/null +++ b/mobile/test/fixtures/sync_stream.stub.dart @@ -0,0 +1,45 @@ +import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; +import 'package:openapi/api.dart'; + +abstract final class SyncStreamStub { + static final userV1Admin = SyncUserV1( + deletedAt: DateTime(2020), + email: "admin@admin", + id: "1", + name: "Admin", + ); + static final userV1User = SyncUserV1( + deletedAt: DateTime(2021), + email: "user@user", + id: "2", + name: "User", + ); + static final userDeleteV1 = SyncUserDeleteV1(userId: "2"); + static final userEvents = [ + SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"), + SyncEvent( + type: SyncEntityType.userDeleteV1, + data: userDeleteV1, + ack: "2", + ), + SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"), + ]; + + static final partnerV1 = SyncPartnerV1( + inTimeline: true, + sharedById: "1", + sharedWithId: "2", + ); + static final partnerDeleteV1 = SyncPartnerDeleteV1( + sharedById: "3", + sharedWithId: "4", + ); + static final partnerEvents = [ + SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"), + SyncEvent( + type: SyncEntityType.partnerDeleteV1, + data: partnerDeleteV1, + ack: "4", + ), + ]; +} diff --git a/mobile/test/infrastructure/repository.mock.dart b/mobile/test/infrastructure/repository.mock.dart index 192858adff..c4a5680f71 100644 --- a/mobile/test/infrastructure/repository.mock.dart +++ b/mobile/test/infrastructure/repository.mock.dart @@ -1,6 +1,8 @@ import 'package:immich_mobile/domain/interfaces/device_asset.interface.dart'; import 'package:immich_mobile/domain/interfaces/log.interface.dart'; import 'package:immich_mobile/domain/interfaces/store.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/user.interface.dart'; import 'package:immich_mobile/domain/interfaces/user_api.interface.dart'; import 'package:mocktail/mocktail.dart'; @@ -14,5 +16,9 @@ class MockUserRepository extends Mock implements IUserRepository {} class MockDeviceAssetRepository extends Mock implements IDeviceAssetRepository {} +class MockSyncStreamRepository extends Mock implements ISyncStreamRepository {} + // API Repos class MockUserApiRepository extends Mock implements IUserApiRepository {} + +class MockSyncApiRepository extends Mock implements ISyncApiRepository {}