misc fixes

This commit is contained in:
shenlong-tanwen 2025-04-16 22:47:41 +05:30
parent b72a91a8da
commit 04e6e5641e
5 changed files with 34 additions and 12 deletions

View File

@ -9,6 +9,19 @@ import 'package:logging/logging.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
import 'package:worker_manager/worker_manager.dart'; import 'package:worker_manager/worker_manager.dart';
const _kSyncTypeOrder = [
SyncEntityType.userDeleteV1,
SyncEntityType.userV1,
SyncEntityType.partnerDeleteV1,
SyncEntityType.partnerV1,
SyncEntityType.assetDeleteV1,
SyncEntityType.assetV1,
SyncEntityType.assetExifV1,
SyncEntityType.partnerAssetDeleteV1,
SyncEntityType.partnerAssetV1,
SyncEntityType.partnerAssetExifV1,
];
class SyncStreamService { class SyncStreamService {
final Logger _logger = Logger('SyncStreamService'); final Logger _logger = Logger('SyncStreamService');
@ -102,7 +115,12 @@ class SyncStreamService {
final eventsMap = events.groupListsBy((event) => event.type); final eventsMap = events.groupListsBy((event) => event.type);
final Map<SyncEntityType, String> acks = {}; final Map<SyncEntityType, String> acks = {};
for (final entry in eventsMap.entries) { for (final type in _kSyncTypeOrder) {
final data = eventsMap[type];
if (data == null) {
continue;
}
if (_cancelChecker?.call() ?? false) { if (_cancelChecker?.call() ?? false) {
_logger.info("Sync cancelled, stopping stream"); _logger.info("Sync cancelled, stopping stream");
mutex?.complete(); mutex?.complete();
@ -117,9 +135,6 @@ class SyncStreamService {
return; return;
} }
final type = entry.key;
final data = entry.value;
if (data.isEmpty) { if (data.isEmpty) {
_logger.warning("Received empty sync events for $type"); _logger.warning("Received empty sync events for $type");
continue; continue;

View File

@ -11,12 +11,17 @@ class BackgroundSyncManager {
BackgroundSyncManager(); BackgroundSyncManager();
void cancel() { Future<void> cancel() {
final futures = <Future>[];
if (_userSyncTask != null) {
futures.add(_userSyncTask!.future);
}
_userSyncTask?.cancel(); _userSyncTask?.cancel();
_userSyncTask = null; _userSyncTask = null;
return Future.wait(futures);
} }
Future<void> syncUsers() async { Future<void> syncUsers() {
if (_userSyncTask != null) { if (_userSyncTask != null) {
return _userSyncTask!.future; return _userSyncTask!.future;
} }
@ -27,5 +32,6 @@ class BackgroundSyncManager {
_userSyncTask!.whenComplete(() { _userSyncTask!.whenComplete(() {
_userSyncTask = null; _userSyncTask = null;
}); });
return _userSyncTask!.future;
} }
} }

View File

@ -120,9 +120,10 @@ class AuthService {
/// - Asset ETag /// - Asset ETag
/// ///
/// All deletions are executed in parallel using [Future.wait]. /// All deletions are executed in parallel using [Future.wait].
Future<void> clearLocalData() { Future<void> clearLocalData() async {
_backgroundSyncManager.cancel(); // Cancel any ongoing background sync operations before clearing data
return Future.wait([ await _backgroundSyncManager.cancel();
await Future.wait([
_authRepository.clearLocalData(), _authRepository.clearLocalData(),
Store.delete(StoreKey.currentUser), Store.delete(StoreKey.currentUser),
Store.delete(StoreKey.accessToken), Store.delete(StoreKey.accessToken),

View File

@ -275,7 +275,7 @@ void main() {
bool handlerStarted = false; bool handlerStarted = false;
// Make handler wait so we can cancel it mid-flight // Make handler wait so we can cancel it mid-flight
when(() => mockSyncStreamRepo.updateUsersV1(any())) when(() => mockSyncStreamRepo.deleteUsersV1(any()))
.thenAnswer((_) async { .thenAnswer((_) async {
handlerStarted = true; handlerStarted = true;
await processingCompleter await processingCompleter

View File

@ -120,7 +120,7 @@ void main() {
group('logout', () { group('logout', () {
test('Should logout user', () async { test('Should logout user', () async {
when(() => authApiRepository.logout()).thenAnswer((_) async => {}); when(() => authApiRepository.logout()).thenAnswer((_) async => {});
when(() => backgroundSyncManager.cancel()).thenAnswer((_) => {}); when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {});
when(() => authRepository.clearLocalData()) when(() => authRepository.clearLocalData())
.thenAnswer((_) => Future.value(null)); .thenAnswer((_) => Future.value(null));
@ -134,7 +134,7 @@ void main() {
test('Should clear local data even on server error', () async { test('Should clear local data even on server error', () async {
when(() => authApiRepository.logout()) when(() => authApiRepository.logout())
.thenThrow(Exception('Server error')); .thenThrow(Exception('Server error'));
when(() => backgroundSyncManager.cancel()).thenAnswer((_) => {}); when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {});
when(() => authRepository.clearLocalData()) when(() => authRepository.clearLocalData())
.thenAnswer((_) => Future.value(null)); .thenAnswer((_) => Future.value(null));