batch db calls

This commit is contained in:
shenlong-tanwen 2025-04-15 04:33:18 +05:30
parent 70263c5351
commit 52cca7afa0
12 changed files with 437 additions and 162 deletions

View File

@ -35,6 +35,7 @@ linter:
analyzer: analyzer:
exclude: exclude:
- openapi/** - openapi/**
- build/**
- lib/generated_plugin_registrant.dart - lib/generated_plugin_registrant.dart
- lib/**/*.g.dart - lib/**/*.g.dart
- lib/**/*.drift.dart - lib/**/*.drift.dart
@ -147,7 +148,9 @@ dart_code_metrics:
- avoid-global-state - avoid-global-state
- avoid-inverted-boolean-checks - avoid-inverted-boolean-checks
- avoid-late-final-reassignment - avoid-late-final-reassignment
- avoid-local-functions - avoid-local-functions:
exclude:
- test/**.dart
- avoid-negated-conditions - avoid-negated-conditions
- avoid-nested-streams-and-futures - avoid-nested-streams-and-futures
- avoid-referencing-subclasses - avoid-referencing-subclasses

View File

@ -1,3 +1,4 @@
description: This file stores settings for Dart & Flutter DevTools. description: This file stores settings for Dart & Flutter DevTools.
documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states
extensions: extensions:
- drift: true

View File

@ -5,6 +5,9 @@ const double downloadFailed = -2;
// Number of log entries to retain on app start // Number of log entries to retain on app start
const int kLogTruncateLimit = 250; const int kLogTruncateLimit = 250;
// Sync
const int kSyncEventBatchSize = 5000;
// Hash batch limits // Hash batch limits
const int kBatchHashFileLimit = 128; const int kBatchHashFileLimit = 128;
const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB

View File

@ -2,9 +2,9 @@ import 'package:immich_mobile/domain/interfaces/db.interface.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
abstract interface class ISyncStreamRepository implements IDatabaseRepository { abstract interface class ISyncStreamRepository implements IDatabaseRepository {
Future<bool> updateUsersV1(SyncUserV1 data); Future<bool> updateUsersV1(Iterable<SyncUserV1> data);
Future<bool> deleteUsersV1(SyncUserDeleteV1 data); Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data);
Future<bool> updatePartnerV1(SyncPartnerV1 data); Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data);
Future<bool> deletePartnerV1(SyncPartnerDeleteV1 data); Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data);
} }

View File

@ -2,6 +2,7 @@
import 'dart:async'; import 'dart:async';
import 'package:collection/collection.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
@ -19,79 +20,122 @@ class SyncStreamService {
}) : _syncApiRepository = syncApiRepository, }) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository; _syncStreamRepository = syncStreamRepository;
Future<bool> _handleSyncData(
SyncEntityType type,
// ignore: avoid-dynamic // ignore: avoid-dynamic
Future<bool> _handleSyncData(dynamic data) async { Iterable<dynamic> data,
if (data is SyncPartnerV1) { ) async {
_logger.fine("SyncPartnerV1: $data"); if (data.isEmpty) {
return await _syncStreamRepository.updatePartnerV1(data); _logger.warning("Received empty sync data for $type");
return false;
} }
if (data is SyncUserV1) { _logger.fine("Processing sync data for $type of length ${data.length}");
_logger.fine("SyncUserV1: $data");
return await _syncStreamRepository.updateUsersV1(data); try {
if (type == SyncEntityType.partnerV1) {
return await _syncStreamRepository.updatePartnerV1(data.cast());
} }
if (data is SyncPartnerDeleteV1) { if (type == SyncEntityType.partnerDeleteV1) {
_logger.fine("SyncPartnerDeleteV1: $data"); return await _syncStreamRepository.deletePartnerV1(data.cast());
return await _syncStreamRepository.deletePartnerV1(data);
} }
if (data is SyncUserDeleteV1) { if (type == SyncEntityType.userV1) {
_logger.fine("SyncUserDeleteV1: $data"); return await _syncStreamRepository.updateUsersV1(data.cast());
return await _syncStreamRepository.deleteUsersV1(data);
} }
if (type == SyncEntityType.userDeleteV1) {
return await _syncStreamRepository.deleteUsersV1(data.cast());
}
} catch (error, stack) {
_logger.severe("Error processing sync data for $type", error, stack);
return false;
}
_logger.warning("Unknown sync data type: $type");
return false; return false;
} }
Future<void> _syncEvent(List<SyncRequestType> types) async { Future<void> _syncEvent(List<SyncRequestType> types) async {
_logger.info("Syncing Events: $types"); _logger.info("Syncing Events: $types");
final streamCompleter = Completer(); final streamCompleter = Completer();
bool shouldSkipOnDone = false; bool shouldComplete = false;
final subscription = _syncApiRepository.getSyncEvents(types).listen(
(events) async {
try {
Map<SyncEntityType, String> acks = {};
for (final event in events) {
// the onDone callback might fire before the events are processed // the onDone callback might fire before the events are processed
// the following flag ensures that the onDone callback is not called // the following flag ensures that the onDone callback is not called
// before the events are processed // before the events are processed and also that events are processed sequentially
shouldSkipOnDone = true; Completer? mutex;
if (await _handleSyncData(event.data)) { final subscription = _syncApiRepository.getSyncEvents(types).listen(
// Only retain the latest ack from each type (events) async {
acks[event.type] = event.ack; if (events.isEmpty) {
_logger.warning("Received empty sync events");
return;
}
// If previous events are still being processed, wait for them to finish
if (mutex != null) {
await mutex!.future;
}
// Take control of the mutex and process the events
mutex = Completer();
try {
final eventsMap = events.groupListsBy((event) => event.type);
final Map<SyncEntityType, String> acks = {};
for (final entry in eventsMap.entries) {
final type = entry.key;
final data = entry.value;
if (data.isEmpty) {
_logger.warning("Received empty sync events for $type");
continue;
}
if (await _handleSyncData(type, data.map((e) => e.data))) {
// ignore: avoid-unsafe-collection-methods
acks[type] = data.last.ack;
} else {
_logger.warning("Failed to handle sync events for $type");
} }
} }
if (acks.isNotEmpty) {
await _syncApiRepository.ack(acks.values.toList()); await _syncApiRepository.ack(acks.values.toList());
}
_logger.info("$types events processed"); _logger.info("$types events processed");
} catch (error, stack) { } catch (error, stack) {
_logger.warning("Error handling sync events", error, stack); _logger.warning("Error handling sync events", error, stack);
} finally {
mutex?.complete();
mutex = null;
}
if (shouldComplete) {
_logger.info("Sync done, completing stream");
if (!streamCompleter.isCompleted) streamCompleter.complete();
} }
streamCompleter.completeOnce();
}, },
onError: (error) { onError: (error, stack) {
_logger.warning("Error in sync stream for $types", error); _logger.warning("Error in sync stream for $types", error, stack);
streamCompleter.completeOnce(); // Do not proceed if the stream errors
if (!streamCompleter.isCompleted) streamCompleter.complete();
}, },
// onDone is required to be called in cases where the stream is empty
onDone: () { onDone: () {
_logger.info("$types stream done"); _logger.info("$types stream done");
if (!shouldSkipOnDone) { if (mutex == null && !streamCompleter.isCompleted) {
streamCompleter.completeOnce(); streamCompleter.complete();
} else {
// Marks the stream as done but does not complete the completer
// until the events are processed
shouldComplete = true;
} }
}, },
); );
return await streamCompleter.future.whenComplete(subscription.cancel); return await streamCompleter.future.whenComplete(subscription.cancel);
} }
Future<void> syncUsers() => _syncEvent([SyncRequestType.usersV1]); Future<void> syncUsers() =>
Future<void> syncPartners() => _syncEvent([SyncRequestType.partnersV1]); _syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]);
}
extension on Completer {
void completeOnce() {
if (!isCompleted) {
complete();
}
}
} }

View File

@ -35,5 +35,6 @@ extension DurationExtension on String {
} }
extension UUIDExtension on String { extension UUIDExtension on String {
Uint8List toUuidByte() => UuidParsing.parseAsByteList(this); Uint8List toUuidByte({bool shouldValidate = false}) =>
UuidParsing.parseAsByteList(this, validate: shouldValidate);
} }

View File

@ -2,6 +2,7 @@ import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'package:http/http.dart' as http; import 'package:http/http.dart' as http;
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/services/api.service.dart'; import 'package:immich_mobile/services/api.service.dart';
@ -11,7 +12,9 @@ import 'package:openapi/api.dart';
class SyncApiRepository implements ISyncApiRepository { class SyncApiRepository implements ISyncApiRepository {
final Logger _logger = Logger('SyncApiRepository'); final Logger _logger = Logger('SyncApiRepository');
final ApiService _api; final ApiService _api;
SyncApiRepository(this._api); final int _batchSize;
SyncApiRepository(this._api, {int batchSize = kSyncEventBatchSize})
: _batchSize = batchSize;
@override @override
Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type) { Stream<List<SyncEvent>> getSyncEvents(List<SyncRequestType> type) {
@ -23,10 +26,7 @@ class SyncApiRepository implements ISyncApiRepository {
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data)); return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
} }
Stream<List<SyncEvent>> _getSyncStream( Stream<List<SyncEvent>> _getSyncStream(SyncStreamDto dto) async* {
SyncStreamDto dto, {
int batchSize = 5000,
}) async* {
final client = http.Client(); final client = http.Client();
final endpoint = "${_api.apiClient.basePath}/sync/stream"; final endpoint = "${_api.apiClient.basePath}/sync/stream";
@ -64,7 +64,7 @@ class SyncApiRepository implements ISyncApiRepository {
previousChunk = parts.removeLast(); previousChunk = parts.removeLast();
lines.addAll(parts); lines.addAll(parts);
if (lines.length < batchSize) { if (lines.length < _batchSize) {
continue; continue;
} }
@ -90,7 +90,7 @@ class SyncApiRepository implements ISyncApiRepository {
final ack = jsonData['ack']; final ack = jsonData['ack'];
final converter = _kResponseMap[type]; final converter = _kResponseMap[type];
if (converter == null) { if (converter == null) {
_logger.warning("[_parseSyncReponse] Unknown type $type"); _logger.warning("[_parseSyncResponse] Unknown type $type");
continue; continue;
} }

View File

@ -15,11 +15,16 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
DriftSyncStreamRepository(super.db) : _db = db; DriftSyncStreamRepository(super.db) : _db = db;
@override @override
Future<bool> deleteUsersV1(SyncUserDeleteV1 data) async { Future<bool> deleteUsersV1(Iterable<SyncUserDeleteV1> data) async {
try { try {
await _db.managers.userEntity await _db.batch((batch) {
.filter((row) => row.id.equals(data.userId.toUuidByte())) for (final user in data) {
.delete(); batch.delete(
_db.userEntity,
UserEntityCompanion(id: Value(user.userId.toUuidByte())),
);
}
});
return true; return true;
} catch (e, s) { } catch (e, s) {
_logger.severe('Error while processing SyncUserDeleteV1', e, s); _logger.severe('Error while processing SyncUserDeleteV1', e, s);
@ -28,17 +33,22 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
} }
@override @override
Future<bool> updateUsersV1(SyncUserV1 data) async { Future<bool> updateUsersV1(Iterable<SyncUserV1> data) async {
try {
await _db.batch((batch) {
for (final user in data) {
final companion = UserEntityCompanion( final companion = UserEntityCompanion(
name: Value(data.name), name: Value(user.name),
email: Value(data.email), email: Value(user.email),
); );
try { batch.insert(
await _db.userEntity.insertOne( _db.userEntity,
companion.copyWith(id: Value(data.id.toUuidByte())), companion.copyWith(id: Value(user.id.toUuidByte())),
onConflict: DoUpdate((_) => companion), onConflict: DoUpdate((_) => companion),
); );
}
});
return true; return true;
} catch (e, s) { } catch (e, s) {
_logger.severe('Error while processing SyncUserV1', e, s); _logger.severe('Error while processing SyncUserV1', e, s);
@ -47,15 +57,19 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
} }
@override @override
Future<bool> deletePartnerV1(SyncPartnerDeleteV1 data) async { Future<bool> deletePartnerV1(Iterable<SyncPartnerDeleteV1> data) async {
try { try {
await _db.managers.partnerEntity await _db.batch((batch) {
.filter( for (final partner in data) {
(row) => batch.delete(
row.sharedById.id.equals(data.sharedById.toUuidByte()) & _db.partnerEntity,
row.sharedWithId.id.equals(data.sharedWithId.toUuidByte()), PartnerEntityCompanion(
) sharedById: Value(partner.sharedById.toUuidByte()),
.delete(); sharedWithId: Value(partner.sharedWithId.toUuidByte()),
),
);
}
});
return true; return true;
} catch (e, s) { } catch (e, s) {
_logger.severe('Error while processing SyncPartnerDeleteV1', e, s); _logger.severe('Error while processing SyncPartnerDeleteV1', e, s);
@ -64,18 +78,23 @@ class DriftSyncStreamRepository extends DriftDatabaseRepository
} }
@override @override
Future<bool> updatePartnerV1(SyncPartnerV1 data) async { Future<bool> updatePartnerV1(Iterable<SyncPartnerV1> data) async {
final companion =
PartnerEntityCompanion(inTimeline: Value(data.inTimeline));
try { try {
await _db.partnerEntity.insertOne( await _db.batch((batch) {
for (final partner in data) {
final companion =
PartnerEntityCompanion(inTimeline: Value(partner.inTimeline));
batch.insert(
_db.partnerEntity,
companion.copyWith( companion.copyWith(
sharedById: Value(data.sharedById.toUuidByte()), sharedById: Value(partner.sharedById.toUuidByte()),
sharedWithId: Value(data.sharedWithId.toUuidByte()), sharedWithId: Value(partner.sharedWithId.toUuidByte()),
), ),
onConflict: DoUpdate((_) => companion), onConflict: DoUpdate((_) => companion),
); );
}
});
return true; return true;
} catch (e, s) { } catch (e, s) {
_logger.severe('Error while processing SyncPartnerV1', e, s); _logger.severe('Error while processing SyncPartnerV1', e, s);

View File

@ -9,7 +9,6 @@ import 'package:immich_mobile/utils/isolate.dart';
class BackgroundSyncManager { class BackgroundSyncManager {
// This prevents multiple syncs from running at the same time // This prevents multiple syncs from running at the same time
final _userSyncCache = AsyncCache.ephemeral(); final _userSyncCache = AsyncCache.ephemeral();
final _partnerSyncCache = AsyncCache.ephemeral();
BackgroundSyncManager(); BackgroundSyncManager();
@ -18,10 +17,4 @@ class BackgroundSyncManager {
(ref) => ref.read(syncStreamServiceProvider).syncUsers(), (ref) => ref.read(syncStreamServiceProvider).syncUsers(),
), ),
); );
Future<void> syncPartners() => _partnerSyncCache.fetch(
() async => runInIsolate(
(ref) => ref.read(syncStreamServiceProvider).syncPartners(),
),
);
} }

View File

@ -1,8 +1,11 @@
// ignore_for_file: avoid-unnecessary-futures
import 'dart:async'; import 'dart:async';
import 'package:flutter_test/flutter_test.dart'; import 'package:flutter_test/flutter_test.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
@ -14,105 +17,314 @@ void main() {
late SyncStreamService sut; late SyncStreamService sut;
late ISyncStreamRepository mockSyncStreamRepo; late ISyncStreamRepository mockSyncStreamRepo;
late ISyncApiRepository mockSyncApiRepo; late ISyncApiRepository mockSyncApiRepo;
late StreamController<List<SyncEvent>> streamController;
successHandler(Invocation _) async => true;
failureHandler(Invocation _) async => false;
setUp(() { setUp(() {
mockSyncStreamRepo = MockSyncStreamRepository(); mockSyncStreamRepo = MockSyncStreamRepository();
mockSyncApiRepo = MockSyncApiRepository(); mockSyncApiRepo = MockSyncApiRepository();
streamController = StreamController<List<SyncEvent>>.broadcast();
sut = SyncStreamService( sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo, syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo, syncStreamRepository: mockSyncStreamRepo,
); );
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) // Default stream setup - emits one batch and closes
.thenAnswer((_) => Stream.value(SyncStreamStub.userEvents)); when(() => mockSyncApiRepo.getSyncEvents(any()))
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1])) .thenAnswer((_) => streamController.stream);
.thenAnswer((_) => Stream.value(SyncStreamStub.partnerEvents));
when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) => Future.value());
registerFallbackValue(SyncStreamStub.userV1Admin); // Default ack setup
when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {});
// Register fallbacks for mocktail verification
registerFallbackValue(<SyncUserV1>[]);
registerFallbackValue(<SyncPartnerV1>[]);
registerFallbackValue(<SyncUserDeleteV1>[]);
registerFallbackValue(<SyncPartnerDeleteV1>[]);
// Default successful repository calls
when(() => mockSyncStreamRepo.updateUsersV1(any())) when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer((_) => Future.value(true)); .thenAnswer(successHandler);
registerFallbackValue(SyncStreamStub.partnerV1); when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer(successHandler);
when(() => mockSyncStreamRepo.updatePartnerV1(any())) when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer((_) => Future.value(false)); .thenAnswer(successHandler);
registerFallbackValue(SyncStreamStub.userDeleteV1);
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) => Future.value(false));
registerFallbackValue(SyncStreamStub.partnerDeleteV1);
when(() => mockSyncStreamRepo.deletePartnerV1(any())) when(() => mockSyncStreamRepo.deletePartnerV1(any()))
.thenAnswer((_) => Future.value(true)); .thenAnswer(successHandler);
}); });
group("_syncEvent", () { tearDown(() async {
test("future completed on success", () async { await streamController.close();
});
// Helper to trigger sync and add events to the stream
Future<void> triggerSyncAndEmit(List<SyncEvent> events) async {
final future = sut.syncUsers(); // Start listening
await Future.delayed(Duration.zero); // Allow listener to attach
if (!streamController.isClosed) {
streamController.add(events);
await streamController.close(); // Close after emitting
}
await future; // Wait for processing to complete
}
group("SyncStreamService", () {
test(
"completes successfully when stream emits data and handlers succeed",
() async {
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
final future = triggerSyncAndEmit(events);
await expectLater(future, completes);
// Verify ack includes last ack from each successfully handled type
verify(
() => mockSyncApiRepo
.ack(any(that: containsAllInOrder(["5", "2", "4", "3"]))),
).called(1);
},
);
test("completes successfully when stream emits an error", () async {
when(() => mockSyncApiRepo.getSyncEvents(any()))
.thenAnswer((_) => Stream.error(Exception("Stream Error")));
// Should complete gracefully without throwing
await expectLater(sut.syncUsers(), completes); await expectLater(sut.syncUsers(), completes);
verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error
}); });
test("future completes on error from stream", () async { test("throws when initial getSyncEvents call fails", () async {
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) final apiException = Exception("API Error");
.thenAnswer((_) => Stream.error(Exception("Error"))); when(() => mockSyncApiRepo.getSyncEvents(any())).thenThrow(apiException);
await expectLater(sut.syncUsers(), completes); // Should rethrow the exception from the initial call
await expectLater(sut.syncUsers(), throwsA(apiException));
verifyNever(() => mockSyncApiRepo.ack(any()));
}); });
test("future throws on api exception", () { test(
when(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) "completes successfully when a repository handler throws an exception",
.thenThrow(Exception("Error")); () async {
expect(sut.syncUsers(), throwsA(isA<Exception>()));
});
test("future completes on repository exception", () {
when(() => mockSyncStreamRepo.updateUsersV1(any())) when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenThrow(Exception("Error")); .thenThrow(Exception("Repo Error"));
expect(sut.syncUsers(), completes); final events = [
}); ...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
// Should complete, but ack only for the successful types
await triggerSyncAndEmit(events);
// Only partner delete was successful by default setup
verify(() => mockSyncApiRepo.ack(["2", "4", "3"])).called(1);
},
);
test("sends ack for successful events", () async { test(
"completes successfully but sends no ack when all handlers fail",
() async {
when(() => mockSyncStreamRepo.updateUsersV1(any())) when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer((_) => Future.value(false)); .thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.deleteUsersV1(any())) when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) => Future.value(true)); .thenAnswer(failureHandler);
await sut.syncUsers(); when(() => mockSyncStreamRepo.updatePartnerV1(any()))
verify(() => mockSyncApiRepo.ack(["2"])).called(1); .thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.deletePartnerV1(any()))
.thenAnswer(failureHandler);
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
await triggerSyncAndEmit(events);
verifyNever(() => mockSyncApiRepo.ack(any()));
},
);
test("sends ack only for types where handler returns true", () async {
// Mock specific handlers: user update fails, user delete succeeds
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer(failureHandler);
when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer(successHandler);
// partner update fails, partner delete succeeds
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer(failureHandler);
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
await triggerSyncAndEmit(events);
// Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4")
verify(
() => mockSyncApiRepo.ack(any(that: containsAllInOrder(["2", "4"]))),
).called(1);
}); });
test("only sends the latest ack for events of same type", () async { test("does not process or ack when stream emits an empty list", () async {
await sut.syncUsers(); final future = sut.syncUsers();
verify(() => mockSyncApiRepo.ack(["5"])).called(1); streamController.add([]); // Emit empty list
await streamController.close();
await future; // Wait for completion
verifyNever(() => mockSyncStreamRepo.updateUsersV1(any()));
verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any()));
verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any()));
verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any()));
verifyNever(() => mockSyncApiRepo.ack(any()));
});
test("processes multiple batches sequentially using mutex", () async {
final completer1 = Completer<void>();
final completer2 = Completer<void>();
int callOrder = 0;
int handler1StartOrder = -1;
int handler2StartOrder = -1;
when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async {
handler1StartOrder = ++callOrder; // Record when handler 1 starts
await completer1.future; // Wait for external signal
return true;
});
when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer((_) async {
handler2StartOrder = ++callOrder; // Record when handler 2 starts
await completer2.future;
return true;
});
final batch1 = SyncStreamStub.userEvents;
final batch2 = SyncStreamStub.partnerEvents;
final syncFuture = sut.syncUsers();
streamController.add(batch1); // Emit first batch
// Ensure first batch processing starts
await Future.delayed(const Duration(milliseconds: 10));
expect(handler1StartOrder, 1, reason: "Handler 1 should start first");
streamController.add(batch2); // Emit second batch while first is waiting
await Future.delayed(const Duration(milliseconds: 10));
// Handler 2 should NOT have started yet because mutex is held by handler 1
expect(
handler2StartOrder,
-1,
reason: "Handler 2 should wait for Handler 1",
);
completer1.complete(); // Allow first handler to finish
// Allow second batch to start processing
await Future.delayed(const Duration(milliseconds: 10));
// Now handler 2 should start
expect(
handler2StartOrder,
2,
reason: "Handler 2 should start after Handler 1 finishes",
);
completer2.complete(); // Allow second handler to finish
await streamController.close(); // Close stream
await syncFuture; // Wait for overall completion
// Verify handlers were called and acks sent for both batches eventually
verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1);
verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1);
// Acks might be called separately or combined depending on timing, check count
verify(() => mockSyncApiRepo.ack(any())).called(2);
});
test("completes successfully when ack call throws an exception", () async {
when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error"));
final events = [
...SyncStreamStub.userEvents,
...SyncStreamStub.partnerEvents,
];
// Should still complete even if ack fails
await triggerSyncAndEmit(events);
verify(() => mockSyncApiRepo.ack(any()))
.called(1); // Verify ack was attempted
});
test("waits for processing to finish if onDone called early", () async {
final processingCompleter = Completer<void>();
bool handlerFinished = false;
when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async {
await processingCompleter.future; // Wait inside handler
handlerFinished = true;
return true;
});
final syncFuture = sut.syncUsers();
// Allow listener to attach
// This is necessary to ensure the stream is ready to receive events
await Future.delayed(Duration.zero);
streamController.add(SyncStreamStub.userEvents); // Emit batch
await Future.delayed(
const Duration(milliseconds: 10),
); // Ensure processing starts
await streamController
.close(); // Close stream (triggers onDone internally)
await Future.delayed(
const Duration(milliseconds: 10),
); // Give onDone a chance to fire
// At this point, onDone was called, but processing is blocked
expect(handlerFinished, isFalse);
processingCompleter.complete(); // Allow processing to finish
await syncFuture; // Now the main future should complete
expect(handlerFinished, isTrue);
verify(() => mockSyncApiRepo.ack(any())).called(1);
}); });
}); });
group("syncUsers", () { group("syncUsers", () {
test("calls _syncEvent with usersV1", () async { test("calls getSyncEvents with correct types", () async {
await sut.syncUsers(); // Need to close the stream for the future to complete
verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.usersV1])) final future = sut.syncUsers();
.called(1); await streamController.close();
}); await future;
test("calls _handleSyncData for each event", () async {
await sut.syncUsers();
verify(() => mockSyncStreamRepo.updateUsersV1(SyncStreamStub.userV1Admin))
.called(1);
verify( verify(
() => mockSyncStreamRepo.deleteUsersV1(SyncStreamStub.userDeleteV1), () => mockSyncApiRepo.getSyncEvents([
SyncRequestType.usersV1,
SyncRequestType.partnersV1,
]),
).called(1); ).called(1);
}); });
});
group("syncPartners", () { test("calls repository methods with correctly grouped data", () async {
test("calls _syncEvent with partnersV1", () async { final events = [
await sut.syncPartners(); ...SyncStreamStub.userEvents,
verify(() => mockSyncApiRepo.getSyncEvents([SyncRequestType.partnersV1])) ...SyncStreamStub.partnerEvents,
.called(1); ];
}); await triggerSyncAndEmit(events);
test("calls _handleSyncData for each event", () async { // Verify each handler was called with the correct list of data payloads
await sut.syncPartners();
verify( verify(
() => mockSyncStreamRepo.updatePartnerV1(SyncStreamStub.partnerV1), () => mockSyncStreamRepo.updateUsersV1(
[SyncStreamStub.userV1Admin, SyncStreamStub.userV1User],
),
).called(1); ).called(1);
verify( verify(
() => () => mockSyncStreamRepo.deleteUsersV1([SyncStreamStub.userDeleteV1]),
mockSyncStreamRepo.deletePartnerV1(SyncStreamStub.partnerDeleteV1), ).called(1);
verify(
() => mockSyncStreamRepo.updatePartnerV1([SyncStreamStub.partnerV1]),
).called(1);
verify(
() => mockSyncStreamRepo
.deletePartnerV1([SyncStreamStub.partnerDeleteV1]),
).called(1); ).called(1);
}); });
}); });

View File

@ -35,11 +35,11 @@ abstract final class SyncStreamStub {
sharedWithId: "4", sharedWithId: "4",
); );
static final partnerEvents = [ static final partnerEvents = [
SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"),
SyncEvent( SyncEvent(
type: SyncEntityType.partnerDeleteV1, type: SyncEntityType.partnerDeleteV1,
data: partnerDeleteV1, data: partnerDeleteV1,
ack: "4", ack: "4",
), ),
SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"),
]; ];
} }

View File

@ -29,4 +29,3 @@ class MockSearchApi extends Mock implements SearchApi {}
class MockAppSettingService extends Mock implements AppSettingsService {} class MockAppSettingService extends Mock implements AppSettingsService {}
class MockBackgroundService extends Mock implements BackgroundService {} class MockBackgroundService extends Mock implements BackgroundService {}