diff --git a/mobile/analysis_options.yaml b/mobile/analysis_options.yaml index 04f3145908..854f852e3c 100644 --- a/mobile/analysis_options.yaml +++ b/mobile/analysis_options.yaml @@ -35,6 +35,7 @@ linter: analyzer: exclude: - openapi/** + - build/** - lib/generated_plugin_registrant.dart - lib/**/*.g.dart - lib/**/*.drift.dart @@ -92,6 +93,9 @@ custom_lint: allowed: # required / wanted - lib/repositories/*_api.repository.dart + - lib/domain/models/sync_event.model.dart + - lib/{domain,infrastructure}/**/sync_stream.* + - lib/{domain,infrastructure}/**/sync_api.* - lib/infrastructure/repositories/*_api.repository.dart - lib/infrastructure/utils/*.converter.dart # acceptable exceptions for the time being @@ -144,7 +148,9 @@ dart_code_metrics: - avoid-global-state - avoid-inverted-boolean-checks - avoid-late-final-reassignment - - avoid-local-functions + - avoid-local-functions: + exclude: + - test/**.dart - avoid-negated-conditions - avoid-nested-streams-and-futures - avoid-referencing-subclasses diff --git a/mobile/devtools_options.yaml b/mobile/devtools_options.yaml index fa0b357c4f..f592d85a9b 100644 --- a/mobile/devtools_options.yaml +++ b/mobile/devtools_options.yaml @@ -1,3 +1,4 @@ description: This file stores settings for Dart & Flutter DevTools. documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states extensions: + - drift: true \ No newline at end of file diff --git a/mobile/lib/constants/constants.dart b/mobile/lib/constants/constants.dart index 83d540d54c..a91e0a715d 100644 --- a/mobile/lib/constants/constants.dart +++ b/mobile/lib/constants/constants.dart @@ -5,5 +5,9 @@ const double downloadFailed = -2; // Number of log entries to retain on app start const int kLogTruncateLimit = 250; +// Sync +const int kSyncEventBatchSize = 5000; + +// Hash batch limits const int kBatchHashFileLimit = 128; const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart index fb8f1aa46e..44e22c5894 100644 --- a/mobile/lib/domain/interfaces/sync_api.interface.dart +++ b/mobile/lib/domain/interfaces/sync_api.interface.dart @@ -1,7 +1,8 @@ -import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; +import 'package:immich_mobile/domain/models/sync_event.model.dart'; +import 'package:openapi/api.dart'; abstract interface class ISyncApiRepository { - Future ack(String data); + Future ack(List data); - Stream> watchUserSyncEvent(); + Stream> getSyncEvents(List type); } diff --git a/mobile/lib/domain/interfaces/sync_stream.interface.dart b/mobile/lib/domain/interfaces/sync_stream.interface.dart new file mode 100644 index 0000000000..f9c52d7ee0 --- /dev/null +++ b/mobile/lib/domain/interfaces/sync_stream.interface.dart @@ -0,0 +1,10 @@ +import 'package:immich_mobile/domain/interfaces/db.interface.dart'; +import 'package:openapi/api.dart'; + +abstract interface class ISyncStreamRepository implements IDatabaseRepository { + Future updateUsersV1(Iterable data); + Future deleteUsersV1(Iterable data); + + Future updatePartnerV1(Iterable data); + Future deletePartnerV1(Iterable data); +} diff --git a/mobile/lib/domain/models/sync/sync_event.model.dart b/mobile/lib/domain/models/sync/sync_event.model.dart deleted file mode 100644 index f4642d59cf..0000000000 --- a/mobile/lib/domain/models/sync/sync_event.model.dart +++ /dev/null @@ -1,14 +0,0 @@ -class SyncEvent { - // dynamic - final dynamic data; - - final String ack; - - SyncEvent({ - required this.data, - required this.ack, - }); - - @override - String toString() => 'SyncEvent(data: $data, ack: $ack)'; -} diff --git a/mobile/lib/domain/models/sync_event.model.dart b/mobile/lib/domain/models/sync_event.model.dart new file mode 100644 index 0000000000..2ad8a75fe1 --- /dev/null +++ b/mobile/lib/domain/models/sync_event.model.dart @@ -0,0 +1,13 @@ +import 'package:openapi/api.dart'; + +class SyncEvent { + final SyncEntityType type; + // ignore: avoid-dynamic + final dynamic data; + final String ack; + + const SyncEvent({required this.type, required this.data, required this.ack}); + + @override + String toString() => 'SyncEvent(type: $type, data: $data, ack: $ack)'; +} diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 72e29b3677..8d7d87e35e 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -1,49 +1,200 @@ +// ignore_for_file: avoid-passing-async-when-sync-expected + import 'dart:async'; -import 'package:flutter/foundation.dart'; +import 'package:collection/collection.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; +import 'package:worker_manager/worker_manager.dart'; + +const _kSyncTypeOrder = [ + SyncEntityType.userDeleteV1, + SyncEntityType.userV1, + SyncEntityType.partnerDeleteV1, + SyncEntityType.partnerV1, + SyncEntityType.assetDeleteV1, + SyncEntityType.assetV1, + SyncEntityType.assetExifV1, + SyncEntityType.partnerAssetDeleteV1, + SyncEntityType.partnerAssetV1, + SyncEntityType.partnerAssetExifV1, +]; class SyncStreamService { + final Logger _logger = Logger('SyncStreamService'); + final ISyncApiRepository _syncApiRepository; + final ISyncStreamRepository _syncStreamRepository; + final bool Function()? _cancelChecker; - SyncStreamService(this._syncApiRepository); + SyncStreamService({ + required ISyncApiRepository syncApiRepository, + required ISyncStreamRepository syncStreamRepository, + bool Function()? cancelChecker, + }) : _syncApiRepository = syncApiRepository, + _syncStreamRepository = syncStreamRepository, + _cancelChecker = cancelChecker; - StreamSubscription? _userSyncSubscription; + Future _handleSyncData( + SyncEntityType type, + // ignore: avoid-dynamic + Iterable data, + ) async { + if (data.isEmpty) { + _logger.warning("Received empty sync data for $type"); + return false; + } - void syncUsers() { - _userSyncSubscription = - _syncApiRepository.watchUserSyncEvent().listen((events) async { - for (final event in events) { - if (event.data is SyncUserV1) { - final data = event.data as SyncUserV1; - debugPrint("User Update: $data"); + _logger.fine("Processing sync data for $type of length ${data.length}"); - // final user = await _userRepository.get(data.id); - - // if (user == null) { - // continue; - // } - - // user.name = data.name; - // user.email = data.email; - // user.updatedAt = DateTime.now(); - - // await _userRepository.update(user); - // await _syncApiRepository.ack(event.ack); - } - - if (event.data is SyncUserDeleteV1) { - final data = event.data as SyncUserDeleteV1; - - debugPrint("User delete: $data"); - // await _syncApiRepository.ack(event.ack); - } + try { + if (type == SyncEntityType.partnerV1) { + return await _syncStreamRepository.updatePartnerV1(data.cast()); } + + if (type == SyncEntityType.partnerDeleteV1) { + return await _syncStreamRepository.deletePartnerV1(data.cast()); + } + + if (type == SyncEntityType.userV1) { + return await _syncStreamRepository.updateUsersV1(data.cast()); + } + + if (type == SyncEntityType.userDeleteV1) { + return await _syncStreamRepository.deleteUsersV1(data.cast()); + } + } catch (error, stack) { + _logger.severe("Error processing sync data for $type", error, stack); + return false; + } + + _logger.warning("Unknown sync data type: $type"); + return false; + } + + Future _syncEvent(List types) { + _logger.info("Syncing Events: $types"); + final streamCompleter = Completer(); + bool shouldComplete = false; + // the onDone callback might fire before the events are processed + // the following flag ensures that the onDone callback is not called + // before the events are processed and also that events are processed sequentially + Completer? mutex; + StreamSubscription? subscription; + try { + subscription = _syncApiRepository.getSyncEvents(types).listen( + (events) async { + if (events.isEmpty) { + _logger.warning("Received empty sync events"); + return; + } + + // If previous events are still being processed, wait for them to finish + if (mutex != null) { + await mutex!.future; + } + + if (_cancelChecker?.call() ?? false) { + _logger.info("Sync cancelled, stopping stream"); + subscription?.cancel(); + if (!streamCompleter.isCompleted) { + streamCompleter.completeError( + CanceledError(), + StackTrace.current, + ); + } + return; + } + + // Take control of the mutex and process the events + mutex = Completer(); + + try { + final eventsMap = events.groupListsBy((event) => event.type); + final Map acks = {}; + + for (final type in _kSyncTypeOrder) { + final data = eventsMap[type]; + if (data == null) { + continue; + } + + if (_cancelChecker?.call() ?? false) { + _logger.info("Sync cancelled, stopping stream"); + mutex?.complete(); + mutex = null; + if (!streamCompleter.isCompleted) { + streamCompleter.completeError( + CanceledError(), + StackTrace.current, + ); + } + + return; + } + + if (data.isEmpty) { + _logger.warning("Received empty sync events for $type"); + continue; + } + + if (await _handleSyncData(type, data.map((e) => e.data))) { + // ignore: avoid-unsafe-collection-methods + acks[type] = data.last.ack; + } else { + _logger.warning("Failed to handle sync events for $type"); + } + } + + if (acks.isNotEmpty) { + await _syncApiRepository.ack(acks.values.toList()); + } + _logger.info("$types events processed"); + } catch (error, stack) { + _logger.warning("Error handling sync events", error, stack); + } finally { + mutex?.complete(); + mutex = null; + } + + if (shouldComplete) { + _logger.info("Sync done, completing stream"); + if (!streamCompleter.isCompleted) streamCompleter.complete(); + } + }, + onError: (error, stack) { + _logger.warning("Error in sync stream for $types", error, stack); + // Do not proceed if the stream errors + if (!streamCompleter.isCompleted) { + // ignore: avoid-missing-completer-stack-trace + streamCompleter.completeError(error, stack); + } + }, + onDone: () { + _logger.info("$types stream done"); + if (mutex == null && !streamCompleter.isCompleted) { + streamCompleter.complete(); + } else { + // Marks the stream as done but does not complete the completer + // until the events are processed + shouldComplete = true; + } + }, + ); + } catch (error, stack) { + _logger.severe("Error starting sync stream", error, stack); + if (!streamCompleter.isCompleted) { + streamCompleter.completeError(error, stack); + } + } + return streamCompleter.future.whenComplete(() { + _logger.info("Sync stream completed"); + return subscription?.cancel(); }); } - Future dispose() async { - await _userSyncSubscription?.cancel(); - } + Future syncUsers() => + _syncEvent([SyncRequestType.usersV1, SyncRequestType.partnersV1]); } diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart new file mode 100644 index 0000000000..0bd456f0bb --- /dev/null +++ b/mobile/lib/domain/utils/background_sync.dart @@ -0,0 +1,37 @@ +// ignore_for_file: avoid-passing-async-when-sync-expected + +import 'dart:async'; + +import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; +import 'package:immich_mobile/utils/isolate.dart'; +import 'package:worker_manager/worker_manager.dart'; + +class BackgroundSyncManager { + Cancelable? _userSyncTask; + + BackgroundSyncManager(); + + Future cancel() { + final futures = []; + if (_userSyncTask != null) { + futures.add(_userSyncTask!.future); + } + _userSyncTask?.cancel(); + _userSyncTask = null; + return Future.wait(futures); + } + + Future syncUsers() { + if (_userSyncTask != null) { + return _userSyncTask!.future; + } + + _userSyncTask = runInIsolateGentle( + computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), + ); + _userSyncTask!.whenComplete(() { + _userSyncTask = null; + }); + return _userSyncTask!.future; + } +} diff --git a/mobile/lib/extensions/string_extensions.dart b/mobile/lib/extensions/string_extensions.dart index 67411013ee..73c8c2d34c 100644 --- a/mobile/lib/extensions/string_extensions.dart +++ b/mobile/lib/extensions/string_extensions.dart @@ -1,3 +1,7 @@ +import 'dart:typed_data'; + +import 'package:uuid/parsing.dart'; + extension StringExtension on String { String capitalize() { return split(" ") @@ -29,3 +33,8 @@ extension DurationExtension on String { return int.parse(this); } } + +extension UUIDExtension on String { + Uint8List toUuidByte({bool shouldValidate = false}) => + UuidParsing.parseAsByteList(this, validate: shouldValidate); +} diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index 88a6838c44..a26b867df6 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -1,37 +1,36 @@ import 'dart:async'; import 'dart:convert'; -import 'package:flutter/foundation.dart'; -import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; -import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; -import 'package:immich_mobile/services/api.service.dart'; -import 'package:openapi/api.dart'; import 'package:http/http.dart' as http; +import 'package:immich_mobile/constants/constants.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/models/sync_event.model.dart'; +import 'package:immich_mobile/services/api.service.dart'; +import 'package:logging/logging.dart'; +import 'package:openapi/api.dart'; class SyncApiRepository implements ISyncApiRepository { + final Logger _logger = Logger('SyncApiRepository'); final ApiService _api; - const SyncApiRepository(this._api); + final int _batchSize; + SyncApiRepository(this._api, {int batchSize = kSyncEventBatchSize}) + : _batchSize = batchSize; @override - Stream> watchUserSyncEvent() { - return _getSyncStream( - SyncStreamDto(types: [SyncRequestType.usersV1]), - ); + Stream> getSyncEvents(List type) { + return _getSyncStream(SyncStreamDto(types: type)); } @override - Future ack(String data) { - return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data])); + Future ack(List data) { + return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data)); } - Stream> _getSyncStream( - SyncStreamDto dto, { - int batchSize = 5000, - }) async* { + Stream> _getSyncStream(SyncStreamDto dto) async* { final client = http.Client(); final endpoint = "${_api.apiClient.basePath}/sync/stream"; - final headers = { + final headers = { 'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json', }; @@ -61,52 +60,54 @@ class SyncApiRepository implements ISyncApiRepository { await for (final chunk in response.stream.transform(utf8.decoder)) { previousChunk += chunk; - final parts = previousChunk.split('\n'); + final parts = previousChunk.toString().split('\n'); previousChunk = parts.removeLast(); lines.addAll(parts); - if (lines.length < batchSize) { + if (lines.length < _batchSize) { continue; } - yield await compute(_parseSyncResponse, lines); + yield _parseSyncResponse(lines); lines.clear(); } } finally { if (lines.isNotEmpty) { - yield await compute(_parseSyncResponse, lines); + yield _parseSyncResponse(lines); } client.close(); } } + + List _parseSyncResponse(List lines) { + final List data = []; + + for (final line in lines) { + try { + final jsonData = jsonDecode(line); + final type = SyncEntityType.fromJson(jsonData['type'])!; + final dataJson = jsonData['data']; + final ack = jsonData['ack']; + final converter = _kResponseMap[type]; + if (converter == null) { + _logger.warning("[_parseSyncResponse] Unknown type $type"); + continue; + } + + data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack)); + } catch (error, stack) { + _logger.severe("[_parseSyncResponse] Error parsing json", error, stack); + } + } + + return data; + } } +// ignore: avoid-dynamic const _kResponseMap = { SyncEntityType.userV1: SyncUserV1.fromJson, SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson, + SyncEntityType.partnerV1: SyncPartnerV1.fromJson, + SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson, }; - -// Need to be outside of the class to be able to use compute -List _parseSyncResponse(List lines) { - final List data = []; - - for (var line in lines) { - try { - final jsonData = jsonDecode(line); - final type = SyncEntityType.fromJson(jsonData['type'])!; - final dataJson = jsonData['data']; - final ack = jsonData['ack']; - final converter = _kResponseMap[type]; - if (converter == null) { - debugPrint("[_parseSyncReponse] Unknown type $type"); - continue; - } - - data.add(SyncEvent(data: converter(dataJson), ack: ack)); - } catch (error, stack) { - debugPrint("[_parseSyncReponse] Error parsing json $error $stack"); - } - } - - return data; -} diff --git a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart new file mode 100644 index 0000000000..a947a9a66b --- /dev/null +++ b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart @@ -0,0 +1,104 @@ +import 'package:drift/drift.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/extensions/string_extensions.dart'; +import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:logging/logging.dart'; +import 'package:openapi/api.dart'; + +class DriftSyncStreamRepository extends DriftDatabaseRepository + implements ISyncStreamRepository { + final Logger _logger = Logger('DriftSyncStreamRepository'); + final Drift _db; + + DriftSyncStreamRepository(super.db) : _db = db; + + @override + Future deleteUsersV1(Iterable data) async { + try { + await _db.batch((batch) { + for (final user in data) { + batch.delete( + _db.userEntity, + UserEntityCompanion(id: Value(user.userId.toUuidByte())), + ); + } + }); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncUserDeleteV1', e, s); + return false; + } + } + + @override + Future updateUsersV1(Iterable data) async { + try { + await _db.batch((batch) { + for (final user in data) { + final companion = UserEntityCompanion( + name: Value(user.name), + email: Value(user.email), + ); + + batch.insert( + _db.userEntity, + companion.copyWith(id: Value(user.id.toUuidByte())), + onConflict: DoUpdate((_) => companion), + ); + } + }); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncUserV1', e, s); + return false; + } + } + + @override + Future deletePartnerV1(Iterable data) async { + try { + await _db.batch((batch) { + for (final partner in data) { + batch.delete( + _db.partnerEntity, + PartnerEntityCompanion( + sharedById: Value(partner.sharedById.toUuidByte()), + sharedWithId: Value(partner.sharedWithId.toUuidByte()), + ), + ); + } + }); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncPartnerDeleteV1', e, s); + return false; + } + } + + @override + Future updatePartnerV1(Iterable data) async { + try { + await _db.batch((batch) { + for (final partner in data) { + final companion = + PartnerEntityCompanion(inTimeline: Value(partner.inTimeline)); + + batch.insert( + _db.partnerEntity, + companion.copyWith( + sharedById: Value(partner.sharedById.toUuidByte()), + sharedWithId: Value(partner.sharedWithId.toUuidByte()), + ), + onConflict: DoUpdate((_) => companion), + ); + } + }); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncPartnerV1', e, s); + return false; + } + } +} diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 1a434aa359..73af81d69d 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -11,6 +11,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/locales.dart'; import 'package:immich_mobile/extensions/build_context_extensions.dart'; +import 'package:immich_mobile/generated/codegen_loader.g.dart'; import 'package:immich_mobile/providers/app_life_cycle.provider.dart'; import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart'; @@ -31,13 +32,15 @@ import 'package:immich_mobile/utils/migration.dart'; import 'package:intl/date_symbol_data_local.dart'; import 'package:logging/logging.dart'; import 'package:timezone/data/latest.dart'; -import 'package:immich_mobile/generated/codegen_loader.g.dart'; +import 'package:worker_manager/worker_manager.dart'; void main() async { ImmichWidgetsBinding(); final db = await Bootstrap.initIsar(); await Bootstrap.initDomain(db); await initApp(); + // Warm-up isolate pool for worker manager + await workerManager.init(dynamicSpawning: true); await migrateDatabaseIfNeeded(db); HttpOverrides.global = HttpSSLCertOverride(); diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart new file mode 100644 index 0000000000..83d103bb3b --- /dev/null +++ b/mobile/lib/providers/background_sync.provider.dart @@ -0,0 +1,8 @@ +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; + +final backgroundSyncProvider = Provider((ref) { + final manager = BackgroundSyncManager(); + ref.onDispose(manager.cancel); + return manager; +}); diff --git a/mobile/lib/providers/infrastructure/cancel.provider.dart b/mobile/lib/providers/infrastructure/cancel.provider.dart new file mode 100644 index 0000000000..6851861e1a --- /dev/null +++ b/mobile/lib/providers/infrastructure/cancel.provider.dart @@ -0,0 +1,12 @@ +import 'package:hooks_riverpod/hooks_riverpod.dart'; + +/// Provider holding a boolean function that returns true when cancellation is requested. +/// A computation running in the isolate uses the function to implement cooperative cancellation. +final cancellationProvider = Provider( + // This will be overridden in the isolate's container. + // Throwing ensures it's not used without an override. + (ref) => throw UnimplementedError( + "cancellationProvider must be overridden in the isolate's ProviderContainer and not to be used in the root isolate", + ), + name: 'cancellationProvider', +); diff --git a/mobile/lib/providers/infrastructure/db.provider.dart b/mobile/lib/providers/infrastructure/db.provider.dart index 84010b3b96..4eefbc556c 100644 --- a/mobile/lib/providers/infrastructure/db.provider.dart +++ b/mobile/lib/providers/infrastructure/db.provider.dart @@ -1,4 +1,7 @@ +import 'dart:async'; + import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:isar/isar.dart'; import 'package:riverpod_annotation/riverpod_annotation.dart'; @@ -6,3 +9,9 @@ part 'db.provider.g.dart'; @Riverpod(keepAlive: true) Isar isar(Ref ref) => throw UnimplementedError('isar'); + +final driftProvider = Provider((ref) { + final drift = Drift(); + ref.onDispose(() => unawaited(drift.close())); + return drift; +}); diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart index 64f1a6cb05..e313982a30 100644 --- a/mobile/lib/providers/infrastructure/sync_stream.provider.dart +++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart @@ -1,24 +1,23 @@ -import 'dart:async'; - import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; +import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; final syncStreamServiceProvider = Provider( - (ref) { - final instance = SyncStreamService( - ref.watch(syncApiRepositoryProvider), - ); - - ref.onDispose(() => unawaited(instance.dispose())); - - return instance; - }, + (ref) => SyncStreamService( + syncApiRepository: ref.watch(syncApiRepositoryProvider), + syncStreamRepository: ref.watch(syncStreamRepositoryProvider), + cancelChecker: ref.watch(cancellationProvider), + ), ); final syncApiRepositoryProvider = Provider( - (ref) => SyncApiRepository( - ref.watch(apiServiceProvider), - ), + (ref) => SyncApiRepository(ref.watch(apiServiceProvider)), +); + +final syncStreamRepositoryProvider = Provider( + (ref) => DriftSyncStreamRepository(ref.watch(driftProvider)), ); diff --git a/mobile/lib/services/auth.service.dart b/mobile/lib/services/auth.service.dart index 20fa62dc4b..ec053c078b 100644 --- a/mobile/lib/services/auth.service.dart +++ b/mobile/lib/services/auth.service.dart @@ -3,12 +3,14 @@ import 'dart:io'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/interfaces/auth.interface.dart'; import 'package:immich_mobile/interfaces/auth_api.interface.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; import 'package:immich_mobile/models/auth/login_response.model.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/repositories/auth.repository.dart'; import 'package:immich_mobile/repositories/auth_api.repository.dart'; import 'package:immich_mobile/services/api.service.dart'; @@ -22,6 +24,7 @@ final authServiceProvider = Provider( ref.watch(authRepositoryProvider), ref.watch(apiServiceProvider), ref.watch(networkServiceProvider), + ref.watch(backgroundSyncProvider), ), ); @@ -30,6 +33,7 @@ class AuthService { final IAuthRepository _authRepository; final ApiService _apiService; final NetworkService _networkService; + final BackgroundSyncManager _backgroundSyncManager; final _log = Logger("AuthService"); @@ -38,6 +42,7 @@ class AuthService { this._authRepository, this._apiService, this._networkService, + this._backgroundSyncManager, ); /// Validates the provided server URL by resolving and setting the endpoint. @@ -115,8 +120,10 @@ class AuthService { /// - Asset ETag /// /// All deletions are executed in parallel using [Future.wait]. - Future clearLocalData() { - return Future.wait([ + Future clearLocalData() async { + // Cancel any ongoing background sync operations before clearing data + await _backgroundSyncManager.cancel(); + await Future.wait([ _authRepository.clearLocalData(), Store.delete(StoreKey.currentUser), Store.delete(StoreKey.accessToken), diff --git a/mobile/lib/utils/bootstrap.dart b/mobile/lib/utils/bootstrap.dart index 570752c6d9..26f3b49242 100644 --- a/mobile/lib/utils/bootstrap.dart +++ b/mobile/lib/utils/bootstrap.dart @@ -48,11 +48,15 @@ abstract final class Bootstrap { ); } - static Future initDomain(Isar db) async { + static Future initDomain( + Isar db, { + bool shouldBufferLogs = true, + }) async { await StoreService.init(storeRepository: IsarStoreRepository(db)); await LogService.init( logRepository: IsarLogRepository(db), storeRepository: IsarStoreRepository(db), + shouldBuffer: shouldBufferLogs, ); } } diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart new file mode 100644 index 0000000000..cfbb1b544f --- /dev/null +++ b/mobile/lib/utils/isolate.dart @@ -0,0 +1,69 @@ +import 'dart:async'; +import 'dart:ui'; + +import 'package:flutter/services.dart'; +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/providers/db.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; +import 'package:immich_mobile/utils/bootstrap.dart'; +import 'package:logging/logging.dart'; +import 'package:worker_manager/worker_manager.dart'; + +class InvalidIsolateUsageException implements Exception { + const InvalidIsolateUsageException(); + + @override + String toString() => + "IsolateHelper should only be used from the root isolate"; +} + +// !! Should be used only from the root isolate +Cancelable runInIsolateGentle({ + required Future Function(ProviderContainer ref) computation, + String? debugLabel, +}) { + final token = RootIsolateToken.instance; + if (token == null) { + throw const InvalidIsolateUsageException(); + } + + return workerManager.executeGentle((cancelledChecker) async { + BackgroundIsolateBinaryMessenger.ensureInitialized(token); + DartPluginRegistrant.ensureInitialized(); + + final db = await Bootstrap.initIsar(); + await Bootstrap.initDomain(db, shouldBufferLogs: false); + final ref = ProviderContainer( + overrides: [ + // TODO: Remove once isar is removed + dbProvider.overrideWithValue(db), + isarProvider.overrideWithValue(db), + cancellationProvider.overrideWithValue(cancelledChecker), + ], + ); + + Logger log = Logger("IsolateLogger"); + + try { + return await computation(ref); + } on CanceledError { + log.warning( + "Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}", + ); + } catch (error, stack) { + log.severe( + "Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", + error, + stack, + ); + } finally { + // Wait for the logs to flush + await Future.delayed(const Duration(seconds: 2)); + // Always close the new db connection on Isolate end + ref.read(driftProvider).close(); + ref.read(isarProvider).close(); + } + return null; + }); +} diff --git a/mobile/pubspec.lock b/mobile/pubspec.lock index 3731832296..235b3f71c3 100644 --- a/mobile/pubspec.lock +++ b/mobile/pubspec.lock @@ -1806,7 +1806,7 @@ packages: source: hosted version: "3.1.4" uuid: - dependency: transitive + dependency: "direct main" description: name: uuid sha256: a5be9ef6618a7ac1e964353ef476418026db906c4facdedaa299b7a2e71690ff @@ -1933,6 +1933,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.0.3" + worker_manager: + dependency: "direct main" + description: + name: worker_manager + sha256: "086ed63e9b36266e851404ca90fd44e37c0f4c9bbf819e5f8d7c87f9741c0591" + url: "https://pub.dev" + source: hosted + version: "7.2.3" xdg_directories: dependency: transitive description: diff --git a/mobile/pubspec.yaml b/mobile/pubspec.yaml index 03c39810f6..fdd91e1f87 100644 --- a/mobile/pubspec.yaml +++ b/mobile/pubspec.yaml @@ -60,7 +60,9 @@ dependencies: thumbhash: 0.1.0+1 timezone: ^0.9.4 url_launcher: ^6.3.1 + uuid: ^4.5.1 wakelock_plus: ^1.2.10 + worker_manager: ^7.2.3 native_video_player: git: diff --git a/mobile/test/domain/service.mock.dart b/mobile/test/domain/service.mock.dart index 53a173fc28..97a3f30294 100644 --- a/mobile/test/domain/service.mock.dart +++ b/mobile/test/domain/service.mock.dart @@ -1,7 +1,10 @@ import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/domain/services/user.service.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:mocktail/mocktail.dart'; class MockStoreService extends Mock implements StoreService {} class MockUserService extends Mock implements UserService {} + +class MockBackgroundSyncManager extends Mock implements BackgroundSyncManager {} diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart new file mode 100644 index 0000000000..e1d8e6987f --- /dev/null +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -0,0 +1,443 @@ +// ignore_for_file: avoid-unnecessary-futures, avoid-async-call-in-sync-function + +import 'dart:async'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/domain/models/sync_event.model.dart'; +import 'package:immich_mobile/domain/services/sync_stream.service.dart'; +import 'package:mocktail/mocktail.dart'; +import 'package:openapi/api.dart'; +import 'package:worker_manager/worker_manager.dart'; + +import '../../fixtures/sync_stream.stub.dart'; +import '../../infrastructure/repository.mock.dart'; + +class _CancellationWrapper { + const _CancellationWrapper(); + + bool isCancelled() => false; +} + +class _MockCancellationWrapper extends Mock implements _CancellationWrapper {} + +void main() { + late SyncStreamService sut; + late ISyncStreamRepository mockSyncStreamRepo; + late ISyncApiRepository mockSyncApiRepo; + late StreamController> streamController; + + successHandler(Invocation _) async => true; + failureHandler(Invocation _) async => false; + + setUp(() { + mockSyncStreamRepo = MockSyncStreamRepository(); + mockSyncApiRepo = MockSyncApiRepository(); + streamController = StreamController>.broadcast(); + + sut = SyncStreamService( + syncApiRepository: mockSyncApiRepo, + syncStreamRepository: mockSyncStreamRepo, + ); + + // Default stream setup - emits one batch and closes + when(() => mockSyncApiRepo.getSyncEvents(any())) + .thenAnswer((_) => streamController.stream); + + // Default ack setup + when(() => mockSyncApiRepo.ack(any())).thenAnswer((_) async => {}); + + // Register fallbacks for mocktail verification + registerFallbackValue([]); + registerFallbackValue([]); + registerFallbackValue([]); + registerFallbackValue([]); + + // Default successful repository calls + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer(successHandler); + when(() => mockSyncStreamRepo.deletePartnerV1(any())) + .thenAnswer(successHandler); + }); + + tearDown(() async { + if (!streamController.isClosed) { + await streamController.close(); + } + }); + + // Helper to trigger sync and add events to the stream + Future triggerSyncAndEmit(List events) async { + final future = sut.syncUsers(); // Start listening + await Future.delayed(Duration.zero); // Allow listener to attach + if (!streamController.isClosed) { + streamController.add(events); + await streamController.close(); // Close after emitting + } + await future; // Wait for processing to complete + } + + group("SyncStreamService", () { + test( + "completes successfully when stream emits data and handlers succeed", + () async { + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + final future = triggerSyncAndEmit(events); + await expectLater(future, completes); + // Verify ack includes last ack from each successfully handled type + verify( + () => + mockSyncApiRepo.ack(any(that: containsAll(["5", "2", "4", "3"]))), + ).called(1); + }, + ); + + test("completes successfully when stream emits an error", () async { + when(() => mockSyncApiRepo.getSyncEvents(any())) + .thenAnswer((_) => Stream.error(Exception("Stream Error"))); + // Should complete gracefully without throwing + await expectLater(sut.syncUsers(), throwsException); + verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error + }); + + test("throws when initial getSyncEvents call fails", () async { + final apiException = Exception("API Error"); + when(() => mockSyncApiRepo.getSyncEvents(any())).thenThrow(apiException); + // Should rethrow the exception from the initial call + await expectLater(sut.syncUsers(), throwsA(apiException)); + verifyNever(() => mockSyncApiRepo.ack(any())); + }); + + test( + "completes successfully when a repository handler throws an exception", + () async { + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenThrow(Exception("Repo Error")); + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + // Should complete, but ack only for the successful types + await triggerSyncAndEmit(events); + // Only partner delete was successful by default setup + verify(() => mockSyncApiRepo.ack(["2", "4", "3"])).called(1); + }, + ); + + test( + "completes successfully but sends no ack when all handlers fail", + () async { + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.deletePartnerV1(any())) + .thenAnswer(failureHandler); + + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + await triggerSyncAndEmit(events); + verifyNever(() => mockSyncApiRepo.ack(any())); + }, + ); + + test("sends ack only for types where handler returns true", () async { + // Mock specific handlers: user update fails, user delete succeeds + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer(failureHandler); + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer(successHandler); + // partner update fails, partner delete succeeds + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer(failureHandler); + + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + await triggerSyncAndEmit(events); + + // Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4") + verify(() => mockSyncApiRepo.ack(any(that: containsAll(["2", "4"])))) + .called(1); + }); + + test("does not process or ack when stream emits an empty list", () async { + final future = sut.syncUsers(); + streamController.add([]); // Emit empty list + await streamController.close(); + await future; // Wait for completion + + verifyNever(() => mockSyncStreamRepo.updateUsersV1(any())); + verifyNever(() => mockSyncStreamRepo.deleteUsersV1(any())); + verifyNever(() => mockSyncStreamRepo.updatePartnerV1(any())); + verifyNever(() => mockSyncStreamRepo.deletePartnerV1(any())); + verifyNever(() => mockSyncApiRepo.ack(any())); + }); + + test("processes multiple batches sequentially using mutex", () async { + final completer1 = Completer(); + final completer2 = Completer(); + int callOrder = 0; + int handler1StartOrder = -1; + int handler2StartOrder = -1; + int handler1Calls = 0; + int handler2Calls = 0; + + when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { + handler1Calls++; + handler1StartOrder = ++callOrder; + await completer1.future; + return true; + }); + when(() => mockSyncStreamRepo.updatePartnerV1(any())) + .thenAnswer((_) async { + handler2Calls++; + handler2StartOrder = ++callOrder; + await completer2.future; + return true; + }); + + final batch1 = SyncStreamStub.userEvents; + final batch2 = SyncStreamStub.partnerEvents; + + final syncFuture = sut.syncUsers(); + await pumpEventQueue(); + + streamController.add(batch1); + await pumpEventQueue(); + // Small delay to ensure the first handler starts + await Future.delayed(const Duration(milliseconds: 20)); + + expect(handler1StartOrder, 1, reason: "Handler 1 should start first"); + expect(handler1Calls, 1); + + streamController.add(batch2); + await pumpEventQueue(); + // Small delay + await Future.delayed(const Duration(milliseconds: 20)); + + expect(handler2StartOrder, -1, reason: "Handler 2 should wait"); + expect(handler2Calls, 0); + + completer1.complete(); + await pumpEventQueue(times: 40); + // Small delay to ensure the second handler starts + await Future.delayed(const Duration(milliseconds: 20)); + + expect(handler2StartOrder, 2, reason: "Handler 2 should start after H1"); + expect(handler2Calls, 1); + + completer2.complete(); + await pumpEventQueue(times: 40); + // Small delay before closing the stream + await Future.delayed(const Duration(milliseconds: 20)); + + if (!streamController.isClosed) { + await streamController.close(); + } + await pumpEventQueue(times: 40); + // Small delay to ensure the sync completes + await Future.delayed(const Duration(milliseconds: 20)); + + await syncFuture; + + verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1); + verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1); + verify(() => mockSyncApiRepo.ack(any())).called(2); + }); + + test( + "stops processing and ack when cancel checker is completed", + () async { + final cancellationChecker = _MockCancellationWrapper(); + when(() => cancellationChecker.isCancelled()).thenAnswer((_) => false); + + sut = SyncStreamService( + syncApiRepository: mockSyncApiRepo, + syncStreamRepository: mockSyncStreamRepo, + cancelChecker: cancellationChecker.isCancelled, + ); + + final processingCompleter = Completer(); + bool handlerStarted = false; + + // Make handler wait so we can cancel it mid-flight + when(() => mockSyncStreamRepo.deleteUsersV1(any())) + .thenAnswer((_) async { + handlerStarted = true; + await processingCompleter + .future; // Wait indefinitely until test completes it + return true; + }); + + final syncFuture = sut.syncUsers(); + await pumpEventQueue(times: 30); + + streamController.add(SyncStreamStub.userEvents); + // Ensure processing starts + await Future.delayed(const Duration(milliseconds: 10)); + + expect(handlerStarted, isTrue, reason: "Handler should have started"); + + when(() => cancellationChecker.isCancelled()).thenAnswer((_) => true); + + // Allow cancellation logic to propagate + await Future.delayed(const Duration(milliseconds: 10)); + + // Complete the handler's completer after cancellation signal + // to ensure the cancellation logic itself isn't blocked by the handler. + processingCompleter.complete(); + + await expectLater(syncFuture, throwsA(isA())); + + // Verify that ack was NOT called because processing was cancelled + verifyNever(() => mockSyncApiRepo.ack(any())); + }, + ); + + test("completes successfully when ack call throws an exception", () async { + when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error")); + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + + // Should still complete even if ack fails + await triggerSyncAndEmit(events); + verify(() => mockSyncApiRepo.ack(any())) + .called(1); // Verify ack was attempted + }); + + test("waits for processing to finish if onDone called early", () async { + final processingCompleter = Completer(); + bool handlerFinished = false; + + when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { + await processingCompleter.future; // Wait inside handler + handlerFinished = true; + return true; + }); + + final syncFuture = sut.syncUsers(); + // Allow listener to attach + // This is necessary to ensure the stream is ready to receive events + await Future.delayed(Duration.zero); + + streamController.add(SyncStreamStub.userEvents); // Emit batch + await Future.delayed( + const Duration(milliseconds: 10), + ); // Ensure processing starts + + await streamController + .close(); // Close stream (triggers onDone internally) + await Future.delayed( + const Duration(milliseconds: 10), + ); // Give onDone a chance to fire + + // At this point, onDone was called, but processing is blocked + expect(handlerFinished, isFalse); + + processingCompleter.complete(); // Allow processing to finish + await syncFuture; // Now the main future should complete + + expect(handlerFinished, isTrue); + verify(() => mockSyncApiRepo.ack(any())).called(1); + }); + + test("processes events in the defined _kSyncTypeOrder", () async { + final future = sut.syncUsers(); + await pumpEventQueue(); + if (!streamController.isClosed) { + final events = [ + SyncEvent( + type: SyncEntityType.partnerV1, + data: SyncStreamStub.partnerV1, + ack: "1", + ), // Should be processed last + SyncEvent( + type: SyncEntityType.userV1, + data: SyncStreamStub.userV1Admin, + ack: "2", + ), // Should be processed second + SyncEvent( + type: SyncEntityType.partnerDeleteV1, + data: SyncStreamStub.partnerDeleteV1, + ack: "3", + ), // Should be processed third + SyncEvent( + type: SyncEntityType.userDeleteV1, + data: SyncStreamStub.userDeleteV1, + ack: "4", + ), // Should be processed first + ]; + + streamController.add(events); + await streamController.close(); + } + await future; + + verifyInOrder([ + () => mockSyncStreamRepo.deleteUsersV1(any()), + () => mockSyncStreamRepo.updateUsersV1(any()), + () => mockSyncStreamRepo.deletePartnerV1(any()), + () => mockSyncStreamRepo.updatePartnerV1(any()), + // Verify ack happens after all processing + () => mockSyncApiRepo.ack(any()), + ]); + }); + }); + + group("syncUsers", () { + test("calls getSyncEvents with correct types", () async { + // Need to close the stream for the future to complete + final future = sut.syncUsers(); + await streamController.close(); + await future; + + verify( + () => mockSyncApiRepo.getSyncEvents([ + SyncRequestType.usersV1, + SyncRequestType.partnersV1, + ]), + ).called(1); + }); + + test("calls repository methods with correctly grouped data", () async { + final events = [ + ...SyncStreamStub.userEvents, + ...SyncStreamStub.partnerEvents, + ]; + await triggerSyncAndEmit(events); + + // Verify each handler was called with the correct list of data payloads + verify( + () => mockSyncStreamRepo.updateUsersV1( + [SyncStreamStub.userV1Admin, SyncStreamStub.userV1User], + ), + ).called(1); + verify( + () => mockSyncStreamRepo.deleteUsersV1([SyncStreamStub.userDeleteV1]), + ).called(1); + verify( + () => mockSyncStreamRepo.updatePartnerV1([SyncStreamStub.partnerV1]), + ).called(1); + verify( + () => mockSyncStreamRepo + .deletePartnerV1([SyncStreamStub.partnerDeleteV1]), + ).called(1); + }); + }); +} diff --git a/mobile/test/fixtures/sync_stream.stub.dart b/mobile/test/fixtures/sync_stream.stub.dart new file mode 100644 index 0000000000..781e63a2bb --- /dev/null +++ b/mobile/test/fixtures/sync_stream.stub.dart @@ -0,0 +1,45 @@ +import 'package:immich_mobile/domain/models/sync_event.model.dart'; +import 'package:openapi/api.dart'; + +abstract final class SyncStreamStub { + static final userV1Admin = SyncUserV1( + deletedAt: DateTime(2020), + email: "admin@admin", + id: "1", + name: "Admin", + ); + static final userV1User = SyncUserV1( + deletedAt: DateTime(2021), + email: "user@user", + id: "2", + name: "User", + ); + static final userDeleteV1 = SyncUserDeleteV1(userId: "2"); + static final userEvents = [ + SyncEvent(type: SyncEntityType.userV1, data: userV1Admin, ack: "1"), + SyncEvent( + type: SyncEntityType.userDeleteV1, + data: userDeleteV1, + ack: "2", + ), + SyncEvent(type: SyncEntityType.userV1, data: userV1User, ack: "5"), + ]; + + static final partnerV1 = SyncPartnerV1( + inTimeline: true, + sharedById: "1", + sharedWithId: "2", + ); + static final partnerDeleteV1 = SyncPartnerDeleteV1( + sharedById: "3", + sharedWithId: "4", + ); + static final partnerEvents = [ + SyncEvent( + type: SyncEntityType.partnerDeleteV1, + data: partnerDeleteV1, + ack: "4", + ), + SyncEvent(type: SyncEntityType.partnerV1, data: partnerV1, ack: "3"), + ]; +} diff --git a/mobile/test/infrastructure/repository.mock.dart b/mobile/test/infrastructure/repository.mock.dart index 192858adff..c4a5680f71 100644 --- a/mobile/test/infrastructure/repository.mock.dart +++ b/mobile/test/infrastructure/repository.mock.dart @@ -1,6 +1,8 @@ import 'package:immich_mobile/domain/interfaces/device_asset.interface.dart'; import 'package:immich_mobile/domain/interfaces/log.interface.dart'; import 'package:immich_mobile/domain/interfaces/store.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/user.interface.dart'; import 'package:immich_mobile/domain/interfaces/user_api.interface.dart'; import 'package:mocktail/mocktail.dart'; @@ -14,5 +16,9 @@ class MockUserRepository extends Mock implements IUserRepository {} class MockDeviceAssetRepository extends Mock implements IDeviceAssetRepository {} +class MockSyncStreamRepository extends Mock implements ISyncStreamRepository {} + // API Repos class MockUserApiRepository extends Mock implements IUserApiRepository {} + +class MockSyncApiRepository extends Mock implements ISyncApiRepository {} diff --git a/mobile/test/service.mocks.dart b/mobile/test/service.mocks.dart index e1b8df40a3..87a8c01cf0 100644 --- a/mobile/test/service.mocks.dart +++ b/mobile/test/service.mocks.dart @@ -29,4 +29,3 @@ class MockSearchApi extends Mock implements SearchApi {} class MockAppSettingService extends Mock implements AppSettingsService {} class MockBackgroundService extends Mock implements BackgroundService {} - diff --git a/mobile/test/services/auth.service_test.dart b/mobile/test/services/auth.service_test.dart index e4f011d940..4ada98a6c9 100644 --- a/mobile/test/services/auth.service_test.dart +++ b/mobile/test/services/auth.service_test.dart @@ -8,6 +8,7 @@ import 'package:isar/isar.dart'; import 'package:mocktail/mocktail.dart'; import 'package:openapi/api.dart'; +import '../domain/service.mock.dart'; import '../repository.mocks.dart'; import '../service.mocks.dart'; import '../test_utils.dart'; @@ -18,6 +19,7 @@ void main() { late MockAuthRepository authRepository; late MockApiService apiService; late MockNetworkService networkService; + late MockBackgroundSyncManager backgroundSyncManager; late Isar db; setUp(() async { @@ -25,12 +27,14 @@ void main() { authRepository = MockAuthRepository(); apiService = MockApiService(); networkService = MockNetworkService(); + backgroundSyncManager = MockBackgroundSyncManager(); sut = AuthService( authApiRepository, authRepository, apiService, networkService, + backgroundSyncManager, ); registerFallbackValue(Uri()); @@ -116,24 +120,28 @@ void main() { group('logout', () { test('Should logout user', () async { when(() => authApiRepository.logout()).thenAnswer((_) async => {}); + when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => authRepository.clearLocalData()) .thenAnswer((_) => Future.value(null)); await sut.logout(); verify(() => authApiRepository.logout()).called(1); + verify(() => backgroundSyncManager.cancel()).called(1); verify(() => authRepository.clearLocalData()).called(1); }); test('Should clear local data even on server error', () async { when(() => authApiRepository.logout()) .thenThrow(Exception('Server error')); + when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => authRepository.clearLocalData()) .thenAnswer((_) => Future.value(null)); await sut.logout(); verify(() => authApiRepository.logout()).called(1); + verify(() => backgroundSyncManager.cancel()).called(1); verify(() => authRepository.clearLocalData()).called(1); }); });