From deca45a008f1a99c230384873e6d32918803ece0 Mon Sep 17 00:00:00 2001 From: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Date: Tue, 15 Apr 2025 20:05:05 +0530 Subject: [PATCH] rewrite isolate manager --- .../domain/services/sync_stream.service.dart | 40 +- mobile/lib/domain/utils/background_sync.dart | 42 ++ mobile/lib/domain/utils/cancel.exception.dart | 8 + .../providers/background_sync.provider.dart | 14 +- .../infrastructure/cancel.provider.dart | 14 + .../infrastructure/sync_stream.provider.dart | 2 + mobile/lib/services/auth.service.dart | 6 + mobile/lib/utils/background_sync.dart | 20 - mobile/lib/utils/bootstrap.dart | 6 +- mobile/lib/utils/isolate.dart | 405 ++++++++++++++++-- mobile/test/domain/service.mock.dart | 3 + .../services/sync_stream_service_test.dart | 128 ++++-- mobile/test/services/auth.service_test.dart | 8 + 13 files changed, 598 insertions(+), 98 deletions(-) create mode 100644 mobile/lib/domain/utils/background_sync.dart create mode 100644 mobile/lib/domain/utils/cancel.exception.dart create mode 100644 mobile/lib/providers/infrastructure/cancel.provider.dart delete mode 100644 mobile/lib/utils/background_sync.dart diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 57c8ec72fd..265ba0b09f 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -5,6 +5,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/domain/utils/cancel.exception.dart'; import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; @@ -13,12 +14,15 @@ class SyncStreamService { final ISyncApiRepository _syncApiRepository; final ISyncStreamRepository _syncStreamRepository; + final Completer? _cancelCompleter; SyncStreamService({ required ISyncApiRepository syncApiRepository, required ISyncStreamRepository syncStreamRepository, + Completer? cancelCompleter, }) : _syncApiRepository = syncApiRepository, - _syncStreamRepository = syncStreamRepository; + _syncStreamRepository = syncStreamRepository, + _cancelCompleter = cancelCompleter; Future _handleSyncData( SyncEntityType type, @@ -65,7 +69,8 @@ class SyncStreamService { // the following flag ensures that the onDone callback is not called // before the events are processed and also that events are processed sequentially Completer? mutex; - final subscription = _syncApiRepository.getSyncEvents(types).listen( + StreamSubscription? subscription; + subscription = _syncApiRepository.getSyncEvents(types).listen( (events) async { if (events.isEmpty) { _logger.warning("Received empty sync events"); @@ -77,6 +82,18 @@ class SyncStreamService { await mutex!.future; } + if (_cancelCompleter?.isCompleted ?? false) { + _logger.info("Sync cancelled, stopping stream"); + subscription?.cancel(); + if (!streamCompleter.isCompleted) { + streamCompleter.completeError( + const CancelException(), + StackTrace.current, + ); + } + return; + } + // Take control of the mutex and process the events mutex = Completer(); @@ -85,6 +102,20 @@ class SyncStreamService { final Map acks = {}; for (final entry in eventsMap.entries) { + if (_cancelCompleter?.isCompleted ?? false) { + _logger.info("Sync cancelled, stopping stream"); + mutex?.complete(); + mutex = null; + if (!streamCompleter.isCompleted) { + streamCompleter.completeError( + const CancelException(), + StackTrace.current, + ); + } + + return; + } + final type = entry.key; final data = entry.value; @@ -133,7 +164,10 @@ class SyncStreamService { } }, ); - return await streamCompleter.future.whenComplete(subscription.cancel); + return await streamCompleter.future.whenComplete(() { + _logger.info("Sync stream completed"); + return subscription?.cancel(); + }); } Future syncUsers() => diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart new file mode 100644 index 0000000000..0016c1cabb --- /dev/null +++ b/mobile/lib/domain/utils/background_sync.dart @@ -0,0 +1,42 @@ +// ignore_for_file: avoid-passing-async-when-sync-expected + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; +import 'package:immich_mobile/utils/isolate.dart'; + +class BackgroundSyncManager { + CancelableOperation? _userSyncFuture; + + BackgroundSyncManager(); + + Future cancel() async { + await _userSyncFuture?.cancel(); + _userSyncFuture = null; + } + + Future syncUsers() async { + if (_userSyncFuture != null) { + return _userSyncFuture!.valueOrCancellation(); + } + + if (_userSyncFuture == null) { + final isolate = await IsolateManager.spawn( + computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), + onCancel: (ref) => ref.read(cancellationProvider).complete(true), + ); + _userSyncFuture = CancelableOperation.fromFuture( + isolate.run(), + onCancel: () { + isolate.cancel(); + _userSyncFuture = null; + }, + ); + return _userSyncFuture! + .valueOrCancellation() + .then((_) => _userSyncFuture = null); + } + } +} diff --git a/mobile/lib/domain/utils/cancel.exception.dart b/mobile/lib/domain/utils/cancel.exception.dart new file mode 100644 index 0000000000..702660b79c --- /dev/null +++ b/mobile/lib/domain/utils/cancel.exception.dart @@ -0,0 +1,8 @@ +class CancelException implements Exception { + final String message; + + const CancelException([this.message = "Operation was cancelled."]); + + @override + String toString() => "CancelException: $message"; +} diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart index 75efebddd8..dfc880c908 100644 --- a/mobile/lib/providers/background_sync.provider.dart +++ b/mobile/lib/providers/background_sync.provider.dart @@ -1,6 +1,10 @@ -import 'package:hooks_riverpod/hooks_riverpod.dart'; -import 'package:immich_mobile/utils/background_sync.dart'; +import 'dart:async'; -final backgroundSyncProvider = Provider( - (ref) => BackgroundSyncManager(), -); +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; + +final backgroundSyncProvider = Provider((ref) { + final manager = BackgroundSyncManager(); + ref.onDispose(() => unawaited(manager.cancel())); + return manager; +}); diff --git a/mobile/lib/providers/infrastructure/cancel.provider.dart b/mobile/lib/providers/infrastructure/cancel.provider.dart new file mode 100644 index 0000000000..37d0ad3180 --- /dev/null +++ b/mobile/lib/providers/infrastructure/cancel.provider.dart @@ -0,0 +1,14 @@ +import 'dart:async'; + +import 'package:hooks_riverpod/hooks_riverpod.dart'; + +/// Provider holding a boolean notifier that becomes true when cancellation is requested. +/// A computation running in the isolate use the completer to implement cooperative cancellation. +final cancellationProvider = Provider>( + // This will be overridden in the isolate's container. + // Throwing ensures it's not used without an override. + (ref) => throw UnimplementedError( + "cancellationProvider must be overridden in the isolate's ProviderContainer and not to be used in the root isolate", + ), + name: 'cancellationProvider', +); diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart index 2bd26bd933..6b182e3475 100644 --- a/mobile/lib/providers/infrastructure/sync_stream.provider.dart +++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart @@ -3,12 +3,14 @@ import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; final syncStreamServiceProvider = Provider( (ref) => SyncStreamService( syncApiRepository: ref.watch(syncApiRepositoryProvider), syncStreamRepository: ref.watch(syncStreamRepositoryProvider), + cancelCompleter: ref.watch(cancellationProvider), ), ); diff --git a/mobile/lib/services/auth.service.dart b/mobile/lib/services/auth.service.dart index 20fa62dc4b..36b73f980a 100644 --- a/mobile/lib/services/auth.service.dart +++ b/mobile/lib/services/auth.service.dart @@ -3,12 +3,14 @@ import 'dart:io'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/interfaces/auth.interface.dart'; import 'package:immich_mobile/interfaces/auth_api.interface.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; import 'package:immich_mobile/models/auth/login_response.model.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/repositories/auth.repository.dart'; import 'package:immich_mobile/repositories/auth_api.repository.dart'; import 'package:immich_mobile/services/api.service.dart'; @@ -22,6 +24,7 @@ final authServiceProvider = Provider( ref.watch(authRepositoryProvider), ref.watch(apiServiceProvider), ref.watch(networkServiceProvider), + ref.watch(backgroundSyncProvider), ), ); @@ -30,6 +33,7 @@ class AuthService { final IAuthRepository _authRepository; final ApiService _apiService; final NetworkService _networkService; + final BackgroundSyncManager _backgroundSyncManager; final _log = Logger("AuthService"); @@ -38,6 +42,7 @@ class AuthService { this._authRepository, this._apiService, this._networkService, + this._backgroundSyncManager, ); /// Validates the provided server URL by resolving and setting the endpoint. @@ -117,6 +122,7 @@ class AuthService { /// All deletions are executed in parallel using [Future.wait]. Future clearLocalData() { return Future.wait([ + _backgroundSyncManager.cancel(), _authRepository.clearLocalData(), Store.delete(StoreKey.currentUser), Store.delete(StoreKey.accessToken), diff --git a/mobile/lib/utils/background_sync.dart b/mobile/lib/utils/background_sync.dart deleted file mode 100644 index 906a548cbe..0000000000 --- a/mobile/lib/utils/background_sync.dart +++ /dev/null @@ -1,20 +0,0 @@ -// ignore_for_file: avoid-passing-async-when-sync-expected - -import 'dart:async'; - -import 'package:async/async.dart'; -import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; -import 'package:immich_mobile/utils/isolate.dart'; - -class BackgroundSyncManager { - // This prevents multiple syncs from running at the same time - final _userSyncCache = AsyncCache.ephemeral(); - - BackgroundSyncManager(); - - Future syncUsers() => _userSyncCache.fetch( - () => runInIsolate( - (ref) => ref.read(syncStreamServiceProvider).syncUsers(), - ), - ); -} diff --git a/mobile/lib/utils/bootstrap.dart b/mobile/lib/utils/bootstrap.dart index 570752c6d9..26f3b49242 100644 --- a/mobile/lib/utils/bootstrap.dart +++ b/mobile/lib/utils/bootstrap.dart @@ -48,11 +48,15 @@ abstract final class Bootstrap { ); } - static Future initDomain(Isar db) async { + static Future initDomain( + Isar db, { + bool shouldBufferLogs = true, + }) async { await StoreService.init(storeRepository: IsarStoreRepository(db)); await LogService.init( logRepository: IsarLogRepository(db), storeRepository: IsarStoreRepository(db), + shouldBuffer: shouldBufferLogs, ); } } diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart index 4cbd4e8bf0..7041b3d417 100644 --- a/mobile/lib/utils/isolate.dart +++ b/mobile/lib/utils/isolate.dart @@ -1,66 +1,401 @@ +// ignore_for_file: avoid-missing-completer-stack-trace + import 'dart:async'; import 'dart:isolate'; import 'dart:ui'; -import 'package:flutter/material.dart'; +import 'package:flutter/foundation.dart'; import 'package:flutter/services.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/providers/db.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:logging/logging.dart'; +/// Exception thrown when IsolateManager is used incorrectly from a non-root isolate. class InvalidIsolateUsageException implements Exception { const InvalidIsolateUsageException(); @override String toString() => - "IsolateHelper should only be used from the root isolate"; + "IsolateManager should only be used from the root isolate"; } -// !! Should be used only from the root isolate -Future runInIsolate( - FutureOr Function(ProviderContainer ref) computation, { - String? debugLabel, -}) async { - final token = RootIsolateToken.instance; - if (token == null) { - throw const InvalidIsolateUsageException(); +/// Exception thrown when an isolate computation is cancelled via [IsolateManager.cancel]. +class IsolateCancelledException implements Exception { + final String? debugLabel; + const IsolateCancelledException([this.debugLabel]); + + @override + String toString() => + "Isolate computation${debugLabel == null ? '' : ' ($debugLabel)'} was cancelled."; +} + +/// Signature for the computation function to be executed in the isolate. +/// It receives a Riverpod [ProviderContainer] for dependency injection. +typedef Computation = FutureOr Function(ProviderContainer ref); + +enum _IsolateCommand { + cancel, + start, +} + +/// Manages running a [Computation] in a separate isolate with Riverpod support +/// and optional cancellation. +/// +/// Similar to [compute] or [Isolate.run], but provides: +/// - A pre-configured Riverpod [ProviderContainer] to the computation. +/// - An optional `onCancel` computation for cleanup. +/// - A `cancel()` method to attempt cancellation. +class IsolateManager { + final SendPort _sendPort; + final ReceivePort _receivePort; + final Completer _completer = Completer(); + final String? _debugLabel; + StreamSubscription? _subscription; + bool _isClosed = false; + bool _isRunning = false; + + IsolateManager._(this._sendPort, this._receivePort, this._debugLabel) { + _subscription = _receivePort.listen( + _handleResponseFromIsolate, + onDone: () { + // If the port closes before the completer finishes, signal an error. + if (_completer.isCompleted) { + return; + } + + _completer.completeError( + RemoteError( + "Isolate terminated unexpectedly${_debugLabel == null ? '' : ' ($_debugLabel)'}", + "", + ), + StackTrace.empty, + ); + _isClosed = true; + }, + ); } - return await Isolate.run(() async { - BackgroundIsolateBinaryMessenger.ensureInitialized(token); - DartPluginRegistrant.ensureInitialized(); + /// Spawns a new isolate and prepares it to run the [computation]. + /// + /// - [computation]: The function to run in the isolate. + /// - [onCancel]: An optional function to run if `cancel()` is called before the computation completes. + /// - [debugLabel]: An optional label for debugging. + /// + /// Must be called from the root isolate. + static Future> spawn({ + required Computation computation, + Computation? onCancel, + String? debugLabel, + }) async { + final Logger logger = Logger("IsolateManager"); + // Port for receiving the [SendPort] from the new isolate. + final initPort = RawReceivePort(); + // Completer to synchronize the establishment of the main communication channel. + final connection = Completer<(ReceivePort, SendPort)>.sync(); - final db = await Bootstrap.initIsar(); - await Bootstrap.initDomain(db); - final ref = ProviderContainer( - overrides: [ - // TODO: Remove once isar is removed - dbProvider.overrideWithValue(db), - isarProvider.overrideWithValue(db), - ], - ); + initPort.handler = (initialMessage) { + if (initialMessage == null) { + // onExit handler message, isolate terminated without sending a SendPort. + initPort.close(); + connection.completeError( + RemoteError( + "Isolate exited unexpectedly during initialization${debugLabel == null ? '' : ' ($debugLabel)'}", + "", + ), + StackTrace.empty, + ); + return; + } + // The first message should be the SendPort for commands. + final commandPort = initialMessage as SendPort; + connection.complete( + (ReceivePort.fromRawReceivePort(initPort), commandPort), + ); + }; - Logger log = Logger("IsolateLogger"); + final token = RootIsolateToken.instance; + if (token == null) { + initPort.close(); // Clean up before throwing + throw const InvalidIsolateUsageException(); + } try { - final result = await computation(ref); - // Wait for isolate to end; i.e, logs to be flushed - await Future.delayed(Durations.short2); - return result; + await Isolate.spawn( + _IsolateRunner._execute, + _IsolateRunner( + computation: computation, + sendPort: initPort.sendPort, + token: token, + onCancel: onCancel, + debugLabel: debugLabel, + ), + errorsAreFatal: true, + onExit: initPort.sendPort, // Send null on exit + onError: initPort.sendPort, // Errors during spawn sent here + debugName: debugLabel, + ); + } catch (e, s) { + logger.warning( + "Failed to spawn isolate${debugLabel == null ? '' : ' - $debugLabel'}", + e, + s, + ); + initPort.close(); + rethrow; + } + + try { + final (ReceivePort receivePort, SendPort sendPort) = + await connection.future; + return IsolateManager._(sendPort, receivePort, debugLabel); + } catch (e) { + // If connection.future completed with an error (e.g., isolate exited early) + logger.warning( + "Isolate connection failed${debugLabel == null ? '' : ' - $debugLabel'}", + e, + ); + // initPort is closed by the handler in case of error/null message + rethrow; + } + } + + /// Starts the computation in the isolate. + /// + /// Returns a future that completes with the result of the computation, + /// or an error if the computation fails or is cancelled. + Future run() { + if (_isClosed) { + return Future.error(StateError("IsolateManager is already closed")); + } + if (_isRunning) { + return Future.error( + StateError("Isolate computation is already running"), + ); + } + _isRunning = true; + _sendPort.send(_IsolateCommand.start); + return _completer.future; + } + + /// Attempts to cancel the computation. + /// + /// If the computation has not yet completed, this will cause the future + /// returned by [run] to complete with an [IsolateCancelledException]. + /// An optional `onCancel` computation might be executed in the isolate. + /// + /// Does nothing if the computation has already completed or the manager is closed. + void cancel() { + if (_isClosed || _completer.isCompleted) { + return; + } + if (!_isRunning) { + _close(IsolateCancelledException(_debugLabel)); + return; + } + + // If running, send cancel command. The isolate will handle sending back the error. + _sendPort.send(_IsolateCommand.cancel); + } + + /// Closes communication channels and completes the future with the given error. + void _close([Object? error, StackTrace? stackTrace]) { + if (_isClosed) return; + _isClosed = true; + unawaited(_subscription?.cancel()); + _receivePort.close(); + if (!_completer.isCompleted) { + if (error == null) { + _completer.completeError( + StateError("IsolateManager closed without result or error."), + StackTrace.empty, + ); + } else { + _completer.completeError(error, stackTrace); + } + } + } + + /// Handles messages received from the isolate. + void _handleResponseFromIsolate(Object? response) { + if (_isClosed) return; + + // Expect list: [result] or [error, stackTrace] or [IsolateCancelledException] + final list = response as List; + Object? error; + StackTrace? stackTrace; + + if (list.length == 2) { + error = list.firstOrNull; + final remoteStack = list.elementAtOrNull(1); + if (remoteStack is StackTrace) { + stackTrace = remoteStack; + } else if (error is String && remoteStack is String?) { + // Reconstruct RemoteError if possible + error = RemoteError(error, remoteStack ?? ""); + stackTrace = (error as RemoteError).stackTrace; + } + } else if (list.length == 1) { + final result = list.firstOrNull; + if (result is IsolateCancelledException) { + error = result; + } else { + // Success case + if (!_completer.isCompleted) { + _completer.complete(result as T?); + } + _close(); + return; + } + } else { + error = RemoteError( + "Invalid message format from isolate", + response.toString(), + ); + } + + // If we reached here, it's an error or cancellation + _close(error, stackTrace); + } +} + +/// Internal helper class that runs within the isolate. +class _IsolateRunner { + final Computation _computation; + final Computation? _onCancel; + final String? _debugLabel; + + final RootIsolateToken _token; + final SendPort _sendPort; // Port to send results/errors back to main + ReceivePort? _receivePort; // Port to receive commands from main + bool _cancelled = false; + bool _computationStarted = false; + ProviderContainer? _ref; // Hold ref for cleanup + + _IsolateRunner({ + required Computation computation, + required SendPort sendPort, + required RootIsolateToken token, + Computation? onCancel, + String? debugLabel, + }) : _computation = computation, + _sendPort = sendPort, + _token = token, + _onCancel = onCancel, + _debugLabel = debugLabel; + + /// Entry point for the isolate. + static void _execute(_IsolateRunner runner) { + runner._start(); + } + + /// Initializes the isolate environment and listens for commands. + void _start() { + BackgroundIsolateBinaryMessenger.ensureInitialized(_token); + DartPluginRegistrant.ensureInitialized(); + + _receivePort = ReceivePort(); + _sendPort.send(_receivePort!.sendPort); + + // Listen for commands from the main isolate. + // ignore: avoid-passing-async-when-sync-expected + final _ = _receivePort!.listen((message) async { + if (_cancelled) return; + + switch (message) { + case _IsolateCommand.cancel: + _cancelled = true; + // Run onCancel if computation hasn't started or finished + if (_onCancel == null) { + // Close the receive port and exit when no cleanup is needed + _sendPort.send([IsolateCancelledException(_debugLabel)]); + _cleanupAndExit(); + } else { + await _runCleanup(_onCancel); + } + break; + case _IsolateCommand.start: + if (_computationStarted) return; // Ignore duplicate start + _computationStarted = true; + await _exec(); + _cleanupAndExit(); + break; + } + }); + } + + /// Executes the main computation. + Future _exec() async { + Logger log = Logger("IsolateRunner"); + try { + _ref = await _bootstrap(); + if (_cancelled) return; + + final potentiallyAsyncResult = _computation(_ref!); + final T result; + if (potentiallyAsyncResult is Future) { + result = await potentiallyAsyncResult; + } else { + result = potentiallyAsyncResult; + } + + if (_cancelled) return; + + _sendPort.send([result]); } catch (error, stack) { + if (_cancelled) return; log.severe( - "Error in runInIsolate${debugLabel == null ? '' : ' for $debugLabel'}", + "Error in computation${_debugLabel == null ? '' : ' ($_debugLabel)'}", error, stack, ); - } finally { - // Always close the new db connection on Isolate end - ref.read(driftProvider).close(); - ref.read(dbProvider).close(); - ref.read(isarProvider).close(); + _sendPort.send([error, stack]); } - return null; - }); + // Cleanup happens in _cleanupAndExit called by the listener + } + + /// Executes the onCancel computation. + Future _runCleanup(Computation cleanupFunc) async { + Logger log = Logger("IsolateRunner"); + try { + if (_ref == null) { + log.warning("IsolateRunner cleanup called without ref"); + return; + } + await cleanupFunc(_ref!); + } catch (e, s) { + log.warning("Error during isolate onCancel cleanup", e, s); + // Don't send this error back, primary goal is cancellation signal + } + } + + Future _bootstrap() async { + final db = await Bootstrap.initIsar(); + await Bootstrap.initDomain(db, shouldBufferLogs: false); + return ProviderContainer( + overrides: [ + dbProvider.overrideWithValue(db), + isarProvider.overrideWithValue(db), + cancellationProvider.overrideWithValue(Completer()), + ], + ); + } + + /// Closes resources and the receive port. + Future _cleanupAndExit() async { + // Always close the db connections and dispose ref if created + try { + await _ref?.read(driftProvider).close(); + await _ref?.read(isarProvider).close(); + _ref?.dispose(); + } catch (e) { + if (kDebugMode) { + print("Error during resource cleanup: $e"); + } + } finally { + _receivePort?.close(); + } + } } diff --git a/mobile/test/domain/service.mock.dart b/mobile/test/domain/service.mock.dart index 53a173fc28..97a3f30294 100644 --- a/mobile/test/domain/service.mock.dart +++ b/mobile/test/domain/service.mock.dart @@ -1,7 +1,10 @@ import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/domain/services/user.service.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:mocktail/mocktail.dart'; class MockStoreService extends Mock implements StoreService {} class MockUserService extends Mock implements UserService {} + +class MockBackgroundSyncManager extends Mock implements BackgroundSyncManager {} diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index aa4ab9d896..ff2d5b40e2 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -7,6 +7,7 @@ import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; +import 'package:immich_mobile/domain/utils/cancel.exception.dart'; import 'package:mocktail/mocktail.dart'; import 'package:openapi/api.dart'; @@ -57,7 +58,9 @@ void main() { }); tearDown(() async { - await streamController.close(); + if (!streamController.isClosed) { + await streamController.close(); + } }); // Helper to trigger sync and add events to the stream @@ -83,8 +86,8 @@ void main() { await expectLater(future, completes); // Verify ack includes last ack from each successfully handled type verify( - () => mockSyncApiRepo - .ack(any(that: containsAllInOrder(["5", "2", "4", "3"]))), + () => + mockSyncApiRepo.ack(any(that: containsAll(["5", "2", "4", "3"]))), ).called(1); }, ); @@ -159,9 +162,8 @@ void main() { await triggerSyncAndEmit(events); // Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4") - verify( - () => mockSyncApiRepo.ack(any(that: containsAllInOrder(["2", "4"]))), - ).called(1); + verify(() => mockSyncApiRepo.ack(any(that: containsAll(["2", "4"])))) + .called(1); }); test("does not process or ack when stream emits an empty list", () async { @@ -183,15 +185,19 @@ void main() { int callOrder = 0; int handler1StartOrder = -1; int handler2StartOrder = -1; + int handler1Calls = 0; + int handler2Calls = 0; when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { - handler1StartOrder = ++callOrder; // Record when handler 1 starts - await completer1.future; // Wait for external signal + handler1Calls++; + handler1StartOrder = ++callOrder; + await completer1.future; return true; }); when(() => mockSyncStreamRepo.updatePartnerV1(any())) .thenAnswer((_) async { - handler2StartOrder = ++callOrder; // Record when handler 2 starts + handler2Calls++; + handler2StartOrder = ++callOrder; await completer2.future; return true; }); @@ -200,44 +206,98 @@ void main() { final batch2 = SyncStreamStub.partnerEvents; final syncFuture = sut.syncUsers(); - streamController.add(batch1); // Emit first batch + await pumpEventQueue(); + + streamController.add(batch1); + await pumpEventQueue(); + // Small delay to ensure the first handler starts + await Future.delayed(const Duration(milliseconds: 20)); - // Ensure first batch processing starts - await Future.delayed(const Duration(milliseconds: 10)); expect(handler1StartOrder, 1, reason: "Handler 1 should start first"); + expect(handler1Calls, 1); - streamController.add(batch2); // Emit second batch while first is waiting + streamController.add(batch2); + await pumpEventQueue(); + // Small delay + await Future.delayed(const Duration(milliseconds: 20)); - await Future.delayed(const Duration(milliseconds: 10)); - // Handler 2 should NOT have started yet because mutex is held by handler 1 - expect( - handler2StartOrder, - -1, - reason: "Handler 2 should wait for Handler 1", - ); + expect(handler2StartOrder, -1, reason: "Handler 2 should wait"); + expect(handler2Calls, 0); - completer1.complete(); // Allow first handler to finish - // Allow second batch to start processing - await Future.delayed(const Duration(milliseconds: 10)); + completer1.complete(); + await pumpEventQueue(times: 40); + // Small delay to ensure the second handler starts + await Future.delayed(const Duration(milliseconds: 20)); - // Now handler 2 should start - expect( - handler2StartOrder, - 2, - reason: "Handler 2 should start after Handler 1 finishes", - ); + expect(handler2StartOrder, 2, reason: "Handler 2 should start after H1"); + expect(handler2Calls, 1); - completer2.complete(); // Allow second handler to finish - await streamController.close(); // Close stream - await syncFuture; // Wait for overall completion + completer2.complete(); + await pumpEventQueue(times: 40); + // Small delay before closing the stream + await Future.delayed(const Duration(milliseconds: 20)); + + if (!streamController.isClosed) { + await streamController.close(); + } + await pumpEventQueue(times: 40); + // Small delay to ensure the sync completes + await Future.delayed(const Duration(milliseconds: 20)); + + await syncFuture; - // Verify handlers were called and acks sent for both batches eventually verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1); verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1); - // Acks might be called separately or combined depending on timing, check count verify(() => mockSyncApiRepo.ack(any())).called(2); }); + test( + "stops processing and ack when cancel completer is completed", + () async { + final cancelCompleter = Completer(); + sut = SyncStreamService( + syncApiRepository: mockSyncApiRepo, + syncStreamRepository: mockSyncStreamRepo, + cancelCompleter: cancelCompleter, + ); + + final processingCompleter = Completer(); + bool handlerStarted = false; + + // Make handler wait so we can cancel it mid-flight + when(() => mockSyncStreamRepo.updateUsersV1(any())) + .thenAnswer((_) async { + handlerStarted = true; + await processingCompleter + .future; // Wait indefinitely until test completes it + return true; + }); + + final syncFuture = sut.syncUsers(); + await pumpEventQueue(times: 30); + + streamController.add(SyncStreamStub.userEvents); + // Ensure processing starts + await Future.delayed(const Duration(milliseconds: 10)); + + expect(handlerStarted, isTrue, reason: "Handler should have started"); + + cancelCompleter.complete(true); // Signal cancellation + + // Allow cancellation logic to propagate + await Future.delayed(const Duration(milliseconds: 10)); + + // Complete the handler's completer after cancellation signal + // to ensure the cancellation logic itself isn't blocked by the handler. + processingCompleter.complete(); + + await expectLater(syncFuture, throwsA(isA())); + + // Verify that ack was NOT called because processing was cancelled + verifyNever(() => mockSyncApiRepo.ack(any())); + }, + ); + test("completes successfully when ack call throws an exception", () async { when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error")); final events = [ diff --git a/mobile/test/services/auth.service_test.dart b/mobile/test/services/auth.service_test.dart index e4f011d940..4ada98a6c9 100644 --- a/mobile/test/services/auth.service_test.dart +++ b/mobile/test/services/auth.service_test.dart @@ -8,6 +8,7 @@ import 'package:isar/isar.dart'; import 'package:mocktail/mocktail.dart'; import 'package:openapi/api.dart'; +import '../domain/service.mock.dart'; import '../repository.mocks.dart'; import '../service.mocks.dart'; import '../test_utils.dart'; @@ -18,6 +19,7 @@ void main() { late MockAuthRepository authRepository; late MockApiService apiService; late MockNetworkService networkService; + late MockBackgroundSyncManager backgroundSyncManager; late Isar db; setUp(() async { @@ -25,12 +27,14 @@ void main() { authRepository = MockAuthRepository(); apiService = MockApiService(); networkService = MockNetworkService(); + backgroundSyncManager = MockBackgroundSyncManager(); sut = AuthService( authApiRepository, authRepository, apiService, networkService, + backgroundSyncManager, ); registerFallbackValue(Uri()); @@ -116,24 +120,28 @@ void main() { group('logout', () { test('Should logout user', () async { when(() => authApiRepository.logout()).thenAnswer((_) async => {}); + when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => authRepository.clearLocalData()) .thenAnswer((_) => Future.value(null)); await sut.logout(); verify(() => authApiRepository.logout()).called(1); + verify(() => backgroundSyncManager.cancel()).called(1); verify(() => authRepository.clearLocalData()).called(1); }); test('Should clear local data even on server error', () async { when(() => authApiRepository.logout()) .thenThrow(Exception('Server error')); + when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => authRepository.clearLocalData()) .thenAnswer((_) => Future.value(null)); await sut.logout(); verify(() => authApiRepository.logout()).called(1); + verify(() => backgroundSyncManager.cancel()).called(1); verify(() => authRepository.clearLocalData()).called(1); }); });