refactor: sync service and add tests

This commit is contained in:
shenlong-tanwen 2025-03-18 11:29:39 +05:30
parent 7246bcfc5b
commit 6baff23801
11 changed files with 226 additions and 102 deletions

View File

@ -4,7 +4,6 @@ import 'dart:async';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
@ -14,12 +13,6 @@ class SyncStreamService {
final ISyncApiRepository _syncApiRepository;
final ISyncStreamRepository _syncStreamRepository;
StreamSubscription? _userSyncSubscription;
Completer<void> _userSyncCompleter = Completer<void>();
StreamSubscription? _partnerSyncSubscription;
Completer<void> _partnerSyncCompleter = Completer<void>();
SyncStreamService({
required ISyncApiRepository syncApiRepository,
required ISyncStreamRepository syncStreamRepository,
@ -51,47 +44,46 @@ class SyncStreamService {
return false;
}
Future<void> _handleSyncEvents(List<SyncEvent> events) async {
Future<void> _syncEvent(List<SyncRequestType> types) async {
_logger.info("Syncing Events: $types");
final streamCompleter = Completer();
bool shouldSkipOnDone = false;
final subscription = _syncApiRepository.getSyncEvents(types).listen(
(events) async {
try {
Map<SyncEntityType, String> acks = {};
for (final event in events) {
// the onDone callback might fire before the events are processed
// the following flag ensures that the onDone callback is not called
// before the events are processed
shouldSkipOnDone = true;
if (await _handleSyncData(event.data)) {
// Only retain the latest ack from each type
acks[event.type] = event.ack;
}
}
await _syncApiRepository.ack(acks.values.toList());
} catch (error, stack) {
_logger.warning("Error handling sync events", error, stack);
}
Future<void> syncUsers() async {
_logger.info("Syncing User Changes");
_userSyncSubscription =
_syncApiRepository.getSyncEvents([SyncRequestType.usersV1]).listen(
_handleSyncEvents,
onDone: () {
_userSyncCompleter.complete();
_userSyncCompleter = Completer<void>();
streamCompleter.completeOnce();
},
onError: (_) => streamCompleter.completeOnce(),
// onDone is required to be called in cases where the stream is empty
onDone: () => shouldSkipOnDone ? null : streamCompleter.completeOnce,
);
return await _userSyncCompleter.future;
streamCompleter.future.whenComplete(subscription.cancel);
return await streamCompleter.future;
}
Future<void> syncPartners() async {
_logger.info("Syncing Partner Changes");
_partnerSyncSubscription =
_syncApiRepository.getSyncEvents([SyncRequestType.partnersV1]).listen(
_handleSyncEvents,
onDone: () {
_partnerSyncCompleter.complete();
_partnerSyncCompleter = Completer<void>();
},
);
return await _partnerSyncCompleter.future;
}
Future<void> syncUsers() => _syncEvent([SyncRequestType.usersV1]);
Future<void> syncPartners() => _syncEvent([SyncRequestType.partnersV1]);
}
Future<void> dispose() async {
await _userSyncSubscription?.cancel();
_userSyncCompleter.complete();
await _partnerSyncSubscription?.cancel();
_partnerSyncCompleter.complete();
extension on Completer {
void completeOnce() {
if (!isCompleted) {
complete();
}
}
}

View File

@ -7,7 +7,6 @@ import 'package:immich_mobile/models/backup/backup_state.model.dart';
import 'package:immich_mobile/providers/album/album.provider.dart';
import 'package:immich_mobile/providers/asset.provider.dart';
import 'package:immich_mobile/providers/auth.provider.dart';
import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/providers/backup/backup.provider.dart';
import 'package:immich_mobile/providers/backup/ios_background_settings.provider.dart';
import 'package:immich_mobile/providers/backup/manual_upload.provider.dart';
@ -114,7 +113,6 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
_ref.read(backupProvider.notifier).cancelBackup();
}
_ref.read(websocketProvider.notifier).disconnect();
_ref.read(backgroundSyncProvider).stop();
}
LogService.I.flush();

View File

@ -1,10 +1,7 @@
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/utils/background_sync.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
part 'background_sync.provider.g.dart';
@Riverpod(keepAlive: true)
BackgroundSyncManager backgroundSync(Ref _) =>
BackgroundSyncManager(duration: kBackgroundSyncDuration);
final backgroundSyncProvider = Provider<BackgroundSyncManager>(
(ref) => BackgroundSyncManager(duration: kBackgroundSyncDuration),
);

View File

@ -1,27 +0,0 @@
// GENERATED CODE - DO NOT MODIFY BY HAND
part of 'background_sync.provider.dart';
// **************************************************************************
// RiverpodGenerator
// **************************************************************************
String _$backgroundSyncHash() => r'c08b6499af18d3fedeb9bb6c4ac0833c656f30dd';
/// See also [backgroundSync].
@ProviderFor(backgroundSync)
final backgroundSyncProvider = Provider<BackgroundSyncManager>.internal(
backgroundSync,
name: r'backgroundSyncProvider',
debugGetCreateSourceHash: const bool.fromEnvironment('dart.vm.product')
? null
: _$backgroundSyncHash,
dependencies: null,
allTransitiveDependencies: null,
);
@Deprecated('Will be removed in 3.0. Use Ref instead')
// ignore: unused_element
typedef BackgroundSyncRef = ProviderRef<BackgroundSyncManager>;
// ignore_for_file: type=lint
// ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member, deprecated_member_use_from_same_package

View File

@ -1,10 +1,5 @@
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:isar/isar.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
// overwritten in main.dart due to async loading
final dbProvider = Provider<Isar>((_) => throw UnimplementedError());
@Riverpod(keepAlive: true)
Drift drift(Ref _) => Drift();

View File

@ -1,3 +1,5 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:isar/isar.dart';
@ -8,5 +10,8 @@ part 'db.provider.g.dart';
@Riverpod(keepAlive: true)
Isar isar(Ref ref) => throw UnimplementedError('isar');
@Riverpod(keepAlive: true)
Drift drift(Ref _) => Drift();
final driftProvider = Provider<Drift>((ref) {
final drift = Drift();
ref.onDispose(() => unawaited(drift.close()));
return drift;
});

View File

@ -1,5 +1,3 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
@ -7,16 +5,12 @@ import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository
import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
final syncStreamServiceProvider = Provider((ref) {
final instance = SyncStreamService(
final syncStreamServiceProvider = Provider(
(ref) => SyncStreamService(
syncApiRepository: ref.watch(syncApiRepositoryProvider),
syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
);
ref.onDispose(() => unawaited(instance.dispose()));
return instance;
});
),
);
final syncApiRepositoryProvider = Provider(
(ref) => SyncApiRepository(ref.watch(apiServiceProvider)),

View File

@ -27,12 +27,12 @@ class BackgroundSyncManager {
}
void start() {
_logger.info('Background sync enabled');
_logger.info('Starting Background sync');
_timer ??= _createTimer();
}
void stop() {
_logger.info('Background sync disabled');
_logger.info('Stopping Background sync');
_timer?.cancel();
_timer = null;
}

View File

@ -0,0 +1,119 @@
import 'dart:async';
import 'package:flutter_test/flutter_test.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart';
import '../../fixtures/sync_stream.stub.dart';
import '../../infrastructure/repository.mock.dart';
void main() {
late SyncStreamService sut;
late ISyncStreamRepository mockSyncStreamRepo;
late ISyncApiRepository mockSyncApiRepo;
setUp(() {
mockSyncStreamRepo = MockSyncStreamRepository();
mockSyncApiRepo = MockSyncApiRepository();
sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo,
);
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
.thenAnswer((_) => Stream.value(SyncStreamStub.userEvents));
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1]))
.thenAnswer((_) => Stream.value(SyncStreamStub.partnerEvents));
when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) => Future.value());
registerFallbackValue(SyncStreamStub.userV1Admin);
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer((_) => Future.value(true));
registerFallbackValue(SyncStreamStub.partnerV1);
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer((_) => Future.value(false));
registerFallbackValue(SyncStreamStub.userDeleteV1);
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) => Future.value(false));
registerFallbackValue(SyncStreamStub.partnerDeleteV1);
when(() => mockSyncStreamRepo.deletePartnerV1(any()))
.thenAnswer((_) => Future.value(true));
});
group("_syncEvent", () {
test("future completed on success", () async {
await expectLater(sut.syncUsers(), completes);
});
test("future completes on error from stream", () async {
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
.thenAnswer((_) => Stream.error(Exception("Error")));
await expectLater(sut.syncUsers(), completes);
});
test("future throws on api exception", () {
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
.thenThrow(Exception("Error"));
expect(sut.syncUsers(), throwsA(isA<Exception>()));
});
test("future completes on repository exception", () {
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenThrow(Exception("Error"));
expect(sut.syncUsers(), completes);
});
test("sends ack for successful events", () async {
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer((_) => Future.value(false));
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) => Future.value(true));
await sut.syncUsers();
verify(() => mockSyncApiRepo.ack(["2"])).called(1);
});
test("only sends the latest ack for events of same type", () async {
await sut.syncUsers();
verify(() => mockSyncApiRepo.ack(["5"])).called(1);
});
});
group("syncUsers", () {
test("calls _syncEvent with usersV1", () async {
await sut.syncUsers();
verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1]))
.called(1);
});
test("calls _handleSyncData for each event", () async {
await sut.syncUsers();
verify(() => mockSyncStreamRepo.updateUsersV1(SyncStreamStub.userV1Admin))
.called(1);
verify(
() => mockSyncStreamRepo.deleteUsersV1(SyncStreamStub.userDeleteV1),
).called(1);
});
});
group("syncPartners", () {
test("calls _syncEvent with partnersV1", () async {
await sut.syncPartners();
verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1]))
.called(1);
});
test("calls _handleSyncData for each event", () async {
await sut.syncPartners();
verify(
() => mockSyncStreamRepo.updatePartnerV1(SyncStreamStub.partnerV1),
).called(1);
verify(
() =>
mockSyncStreamRepo.deletePartnerV1(SyncStreamStub.partnerDeleteV1),
).called(1);
});
});
}

View File

@ -0,0 +1,45 @@
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
import 'package:openapi/api.dart';
abstract final class SyncStreamStub {
static final userV1Admin = SyncUserV1(
deletedAt: DateTime(2020),
email: "admin@admin",
id: "1",
name: "Admin",
);
static final userV1User = SyncUserV1(
deletedAt: DateTime(2021),
email: "user@user",
id: "2",
name: "User",
);
static final userDeleteV1 = SyncUserDeleteV1(userId: "2");
static final userEvents = [
SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"),
SyncEvent(
type: SyncEntityType.userDeleteV1,
data: userDeleteV1,
ack: "2",
),
SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"),
];
static final partnerV1 = SyncPartnerV1(
inTimeline: true,
sharedById: "1",
sharedWithId: "2",
);
static final partnerDeleteV1 = SyncPartnerDeleteV1(
sharedById: "3",
sharedWithId: "4",
);
static final partnerEvents = [
SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"),
SyncEvent(
type: SyncEntityType.partnerDeleteV1,
data: partnerDeleteV1,
ack: "4",
),
];
}

View File

@ -1,6 +1,8 @@
import 'package:immich_mobile/domain/interfaces/device_asset.interface.dart';
import 'package:immich_mobile/domain/interfaces/log.interface.dart';
import 'package:immich_mobile/domain/interfaces/store.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/interfaces/user.interface.dart';
import 'package:immich_mobile/domain/interfaces/user_api.interface.dart';
import 'package:mocktail/mocktail.dart';
@ -14,5 +16,9 @@ class MockUserRepository extends Mock implements IUserRepository {}
class MockDeviceAssetRepository extends Mock
implements IDeviceAssetRepository {}
class MockSyncStreamRepository extends Mock implements ISyncStreamRepository {}
// API Repos
class MockUserApiRepository extends Mock implements IUserApiRepository {}
class MockSyncApiRepository extends Mock implements ISyncApiRepository {}