diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 265ba0b09f..6b9ee9963f 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -5,24 +5,24 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; -import 'package:immich_mobile/domain/utils/cancel.exception.dart'; import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; +import 'package:worker_manager/worker_manager.dart'; class SyncStreamService { final Logger _logger = Logger('SyncStreamService'); final ISyncApiRepository _syncApiRepository; final ISyncStreamRepository _syncStreamRepository; - final Completer? _cancelCompleter; + final bool Function()? _cancelChecker; SyncStreamService({ required ISyncApiRepository syncApiRepository, required ISyncStreamRepository syncStreamRepository, - Completer? cancelCompleter, + bool Function()? cancelChecker, }) : _syncApiRepository = syncApiRepository, _syncStreamRepository = syncStreamRepository, - _cancelCompleter = cancelCompleter; + _cancelChecker = cancelChecker; Future _handleSyncData( SyncEntityType type, @@ -61,7 +61,7 @@ class SyncStreamService { return false; } - Future _syncEvent(List types) async { + Future _syncEvent(List types) { _logger.info("Syncing Events: $types"); final streamCompleter = Completer(); bool shouldComplete = false; @@ -70,101 +70,111 @@ class SyncStreamService { // before the events are processed and also that events are processed sequentially Completer? mutex; StreamSubscription? subscription; - subscription = _syncApiRepository.getSyncEvents(types).listen( - (events) async { - if (events.isEmpty) { - _logger.warning("Received empty sync events"); - return; - } - - // If previous events are still being processed, wait for them to finish - if (mutex != null) { - await mutex!.future; - } - - if (_cancelCompleter?.isCompleted ?? false) { - _logger.info("Sync cancelled, stopping stream"); - subscription?.cancel(); - if (!streamCompleter.isCompleted) { - streamCompleter.completeError( - const CancelException(), - StackTrace.current, - ); + try { + subscription = _syncApiRepository.getSyncEvents(types).listen( + (events) async { + if (events.isEmpty) { + _logger.warning("Received empty sync events"); + return; } - return; - } - // Take control of the mutex and process the events - mutex = Completer(); + // If previous events are still being processed, wait for them to finish + if (mutex != null) { + await mutex!.future; + } - try { - final eventsMap = events.groupListsBy((event) => event.type); - final Map acks = {}; + if (_cancelChecker?.call() ?? false) { + _logger.info("Sync cancelled, stopping stream"); + subscription?.cancel(); + if (!streamCompleter.isCompleted) { + streamCompleter.completeError( + CanceledError(), + StackTrace.current, + ); + } + return; + } - for (final entry in eventsMap.entries) { - if (_cancelCompleter?.isCompleted ?? false) { - _logger.info("Sync cancelled, stopping stream"); - mutex?.complete(); - mutex = null; - if (!streamCompleter.isCompleted) { - streamCompleter.completeError( - const CancelException(), - StackTrace.current, - ); + // Take control of the mutex and process the events + mutex = Completer(); + + try { + final eventsMap = events.groupListsBy((event) => event.type); + final Map acks = {}; + + for (final entry in eventsMap.entries) { + if (_cancelChecker?.call() ?? false) { + _logger.info("Sync cancelled, stopping stream"); + mutex?.complete(); + mutex = null; + if (!streamCompleter.isCompleted) { + streamCompleter.completeError( + CanceledError(), + StackTrace.current, + ); + } + + return; } - return; + final type = entry.key; + final data = entry.value; + + if (data.isEmpty) { + _logger.warning("Received empty sync events for $type"); + continue; + } + + if (await _handleSyncData(type, data.map((e) => e.data))) { + // ignore: avoid-unsafe-collection-methods + acks[type] = data.last.ack; + } else { + _logger.warning("Failed to handle sync events for $type"); + } } - final type = entry.key; - final data = entry.value; - - if (data.isEmpty) { - _logger.warning("Received empty sync events for $type"); - continue; - } - - if (await _handleSyncData(type, data.map((e) => e.data))) { - // ignore: avoid-unsafe-collection-methods - acks[type] = data.last.ack; - } else { - _logger.warning("Failed to handle sync events for $type"); + if (acks.isNotEmpty) { + await _syncApiRepository.ack(acks.values.toList()); } + _logger.info("$types events processed"); + } catch (error, stack) { + _logger.warning("Error handling sync events", error, stack); + } finally { + mutex?.complete(); + mutex = null; } - if (acks.isNotEmpty) { - await _syncApiRepository.ack(acks.values.toList()); + if (shouldComplete) { + _logger.info("Sync done, completing stream"); + if (!streamCompleter.isCompleted) streamCompleter.complete(); } - _logger.info("$types events processed"); - } catch (error, stack) { - _logger.warning("Error handling sync events", error, stack); - } finally { - mutex?.complete(); - mutex = null; - } - - if (shouldComplete) { - _logger.info("Sync done, completing stream"); - if (!streamCompleter.isCompleted) streamCompleter.complete(); - } - }, - onError: (error, stack) { - _logger.warning("Error in sync stream for $types", error, stack); - // Do not proceed if the stream errors - if (!streamCompleter.isCompleted) streamCompleter.complete(); - }, - onDone: () { - _logger.info("$types stream done"); - if (mutex == null && !streamCompleter.isCompleted) { - streamCompleter.complete(); - } else { - // Marks the stream as done but does not complete the completer - // until the events are processed - shouldComplete = true; - } - }, - ); - return await streamCompleter.future.whenComplete(() { + }, + onError: (error, stack) { + _logger.warning("Error in sync stream for $types", error, stack); + // Do not proceed if the stream errors + if (!streamCompleter.isCompleted) { + // ignore: avoid-missing-completer-stack-trace + streamCompleter.completeError(error, stack); + } + }, + onDone: () { + _logger.info("$types stream done"); + if (mutex == null && !streamCompleter.isCompleted) { + streamCompleter.complete(); + } else { + // Marks the stream as done but does not complete the completer + // until the events are processed + shouldComplete = true; + } + }, + ); + } catch (error, stack) { + _logger.severe("Error starting sync stream", error, stack); + if (!streamCompleter.isCompleted) { + streamCompleter.completeError(error, stack); + } + } + return streamCompleter.future.whenComplete(() { _logger.info("Sync stream completed"); return subscription?.cancel(); }); diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart index 0016c1cabb..dfa392b1fe 100644 --- a/mobile/lib/domain/utils/background_sync.dart +++ b/mobile/lib/domain/utils/background_sync.dart @@ -2,41 +2,30 @@ import 'dart:async'; -import 'package:async/async.dart'; -import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; import 'package:immich_mobile/utils/isolate.dart'; +import 'package:worker_manager/worker_manager.dart'; class BackgroundSyncManager { - CancelableOperation? _userSyncFuture; + Cancelable? _userSyncTask; BackgroundSyncManager(); - Future cancel() async { - await _userSyncFuture?.cancel(); - _userSyncFuture = null; + void cancel() { + _userSyncTask?.cancel(); + _userSyncTask = null; } Future syncUsers() async { - if (_userSyncFuture != null) { - return _userSyncFuture!.valueOrCancellation(); + if (_userSyncTask != null) { + return _userSyncTask!.future; } - if (_userSyncFuture == null) { - final isolate = await IsolateManager.spawn( - computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), - onCancel: (ref) => ref.read(cancellationProvider).complete(true), - ); - _userSyncFuture = CancelableOperation.fromFuture( - isolate.run(), - onCancel: () { - isolate.cancel(); - _userSyncFuture = null; - }, - ); - return _userSyncFuture! - .valueOrCancellation() - .then((_) => _userSyncFuture = null); - } + _userSyncTask = runInIsolateGentle( + computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), + ); + _userSyncTask!.whenComplete(() { + _userSyncTask = null; + }); } } diff --git a/mobile/lib/domain/utils/cancel.exception.dart b/mobile/lib/domain/utils/cancel.exception.dart deleted file mode 100644 index 702660b79c..0000000000 --- a/mobile/lib/domain/utils/cancel.exception.dart +++ /dev/null @@ -1,8 +0,0 @@ -class CancelException implements Exception { - final String message; - - const CancelException([this.message = "Operation was cancelled."]); - - @override - String toString() => "CancelException: $message"; -} diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 1a434aa359..73af81d69d 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -11,6 +11,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/locales.dart'; import 'package:immich_mobile/extensions/build_context_extensions.dart'; +import 'package:immich_mobile/generated/codegen_loader.g.dart'; import 'package:immich_mobile/providers/app_life_cycle.provider.dart'; import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart'; @@ -31,13 +32,15 @@ import 'package:immich_mobile/utils/migration.dart'; import 'package:intl/date_symbol_data_local.dart'; import 'package:logging/logging.dart'; import 'package:timezone/data/latest.dart'; -import 'package:immich_mobile/generated/codegen_loader.g.dart'; +import 'package:worker_manager/worker_manager.dart'; void main() async { ImmichWidgetsBinding(); final db = await Bootstrap.initIsar(); await Bootstrap.initDomain(db); await initApp(); + // Warm-up isolate pool for worker manager + await workerManager.init(dynamicSpawning: true); await migrateDatabaseIfNeeded(db); HttpOverrides.global = HttpSSLCertOverride(); diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart index dfc880c908..83d103bb3b 100644 --- a/mobile/lib/providers/background_sync.provider.dart +++ b/mobile/lib/providers/background_sync.provider.dart @@ -1,10 +1,8 @@ -import 'dart:async'; - import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/utils/background_sync.dart'; final backgroundSyncProvider = Provider((ref) { final manager = BackgroundSyncManager(); - ref.onDispose(() => unawaited(manager.cancel())); + ref.onDispose(manager.cancel); return manager; }); diff --git a/mobile/lib/providers/infrastructure/cancel.provider.dart b/mobile/lib/providers/infrastructure/cancel.provider.dart index 37d0ad3180..6851861e1a 100644 --- a/mobile/lib/providers/infrastructure/cancel.provider.dart +++ b/mobile/lib/providers/infrastructure/cancel.provider.dart @@ -1,10 +1,8 @@ -import 'dart:async'; - import 'package:hooks_riverpod/hooks_riverpod.dart'; -/// Provider holding a boolean notifier that becomes true when cancellation is requested. -/// A computation running in the isolate use the completer to implement cooperative cancellation. -final cancellationProvider = Provider>( +/// Provider holding a boolean function that returns true when cancellation is requested. +/// A computation running in the isolate uses the function to implement cooperative cancellation. +final cancellationProvider = Provider( // This will be overridden in the isolate's container. // Throwing ensures it's not used without an override. (ref) => throw UnimplementedError( diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart index 6b182e3475..e313982a30 100644 --- a/mobile/lib/providers/infrastructure/sync_stream.provider.dart +++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart @@ -10,7 +10,7 @@ final syncStreamServiceProvider = Provider( (ref) => SyncStreamService( syncApiRepository: ref.watch(syncApiRepositoryProvider), syncStreamRepository: ref.watch(syncStreamRepositoryProvider), - cancelCompleter: ref.watch(cancellationProvider), + cancelChecker: ref.watch(cancellationProvider), ), ); diff --git a/mobile/lib/services/auth.service.dart b/mobile/lib/services/auth.service.dart index 36b73f980a..4ffa9444c5 100644 --- a/mobile/lib/services/auth.service.dart +++ b/mobile/lib/services/auth.service.dart @@ -121,8 +121,8 @@ class AuthService { /// /// All deletions are executed in parallel using [Future.wait]. Future clearLocalData() { + _backgroundSyncManager.cancel(); return Future.wait([ - _backgroundSyncManager.cancel(), _authRepository.clearLocalData(), Store.delete(StoreKey.currentUser), Store.delete(StoreKey.accessToken), diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart index 7041b3d417..cfbb1b544f 100644 --- a/mobile/lib/utils/isolate.dart +++ b/mobile/lib/utils/isolate.dart @@ -1,10 +1,6 @@ -// ignore_for_file: avoid-missing-completer-stack-trace - import 'dart:async'; -import 'dart:isolate'; import 'dart:ui'; -import 'package:flutter/foundation.dart'; import 'package:flutter/services.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/providers/db.provider.dart'; @@ -12,390 +8,62 @@ import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:logging/logging.dart'; +import 'package:worker_manager/worker_manager.dart'; -/// Exception thrown when IsolateManager is used incorrectly from a non-root isolate. class InvalidIsolateUsageException implements Exception { const InvalidIsolateUsageException(); @override String toString() => - "IsolateManager should only be used from the root isolate"; + "IsolateHelper should only be used from the root isolate"; } -/// Exception thrown when an isolate computation is cancelled via [IsolateManager.cancel]. -class IsolateCancelledException implements Exception { - final String? debugLabel; - const IsolateCancelledException([this.debugLabel]); - - @override - String toString() => - "Isolate computation${debugLabel == null ? '' : ' ($debugLabel)'} was cancelled."; -} - -/// Signature for the computation function to be executed in the isolate. -/// It receives a Riverpod [ProviderContainer] for dependency injection. -typedef Computation = FutureOr Function(ProviderContainer ref); - -enum _IsolateCommand { - cancel, - start, -} - -/// Manages running a [Computation] in a separate isolate with Riverpod support -/// and optional cancellation. -/// -/// Similar to [compute] or [Isolate.run], but provides: -/// - A pre-configured Riverpod [ProviderContainer] to the computation. -/// - An optional `onCancel` computation for cleanup. -/// - A `cancel()` method to attempt cancellation. -class IsolateManager { - final SendPort _sendPort; - final ReceivePort _receivePort; - final Completer _completer = Completer(); - final String? _debugLabel; - StreamSubscription? _subscription; - bool _isClosed = false; - bool _isRunning = false; - - IsolateManager._(this._sendPort, this._receivePort, this._debugLabel) { - _subscription = _receivePort.listen( - _handleResponseFromIsolate, - onDone: () { - // If the port closes before the completer finishes, signal an error. - if (_completer.isCompleted) { - return; - } - - _completer.completeError( - RemoteError( - "Isolate terminated unexpectedly${_debugLabel == null ? '' : ' ($_debugLabel)'}", - "", - ), - StackTrace.empty, - ); - _isClosed = true; - }, - ); +// !! Should be used only from the root isolate +Cancelable runInIsolateGentle({ + required Future Function(ProviderContainer ref) computation, + String? debugLabel, +}) { + final token = RootIsolateToken.instance; + if (token == null) { + throw const InvalidIsolateUsageException(); } - /// Spawns a new isolate and prepares it to run the [computation]. - /// - /// - [computation]: The function to run in the isolate. - /// - [onCancel]: An optional function to run if `cancel()` is called before the computation completes. - /// - [debugLabel]: An optional label for debugging. - /// - /// Must be called from the root isolate. - static Future> spawn({ - required Computation computation, - Computation? onCancel, - String? debugLabel, - }) async { - final Logger logger = Logger("IsolateManager"); - // Port for receiving the [SendPort] from the new isolate. - final initPort = RawReceivePort(); - // Completer to synchronize the establishment of the main communication channel. - final connection = Completer<(ReceivePort, SendPort)>.sync(); - - initPort.handler = (initialMessage) { - if (initialMessage == null) { - // onExit handler message, isolate terminated without sending a SendPort. - initPort.close(); - connection.completeError( - RemoteError( - "Isolate exited unexpectedly during initialization${debugLabel == null ? '' : ' ($debugLabel)'}", - "", - ), - StackTrace.empty, - ); - return; - } - // The first message should be the SendPort for commands. - final commandPort = initialMessage as SendPort; - connection.complete( - (ReceivePort.fromRawReceivePort(initPort), commandPort), - ); - }; - - final token = RootIsolateToken.instance; - if (token == null) { - initPort.close(); // Clean up before throwing - throw const InvalidIsolateUsageException(); - } - - try { - await Isolate.spawn( - _IsolateRunner._execute, - _IsolateRunner( - computation: computation, - sendPort: initPort.sendPort, - token: token, - onCancel: onCancel, - debugLabel: debugLabel, - ), - errorsAreFatal: true, - onExit: initPort.sendPort, // Send null on exit - onError: initPort.sendPort, // Errors during spawn sent here - debugName: debugLabel, - ); - } catch (e, s) { - logger.warning( - "Failed to spawn isolate${debugLabel == null ? '' : ' - $debugLabel'}", - e, - s, - ); - initPort.close(); - rethrow; - } - - try { - final (ReceivePort receivePort, SendPort sendPort) = - await connection.future; - return IsolateManager._(sendPort, receivePort, debugLabel); - } catch (e) { - // If connection.future completed with an error (e.g., isolate exited early) - logger.warning( - "Isolate connection failed${debugLabel == null ? '' : ' - $debugLabel'}", - e, - ); - // initPort is closed by the handler in case of error/null message - rethrow; - } - } - - /// Starts the computation in the isolate. - /// - /// Returns a future that completes with the result of the computation, - /// or an error if the computation fails or is cancelled. - Future run() { - if (_isClosed) { - return Future.error(StateError("IsolateManager is already closed")); - } - if (_isRunning) { - return Future.error( - StateError("Isolate computation is already running"), - ); - } - _isRunning = true; - _sendPort.send(_IsolateCommand.start); - return _completer.future; - } - - /// Attempts to cancel the computation. - /// - /// If the computation has not yet completed, this will cause the future - /// returned by [run] to complete with an [IsolateCancelledException]. - /// An optional `onCancel` computation might be executed in the isolate. - /// - /// Does nothing if the computation has already completed or the manager is closed. - void cancel() { - if (_isClosed || _completer.isCompleted) { - return; - } - if (!_isRunning) { - _close(IsolateCancelledException(_debugLabel)); - return; - } - - // If running, send cancel command. The isolate will handle sending back the error. - _sendPort.send(_IsolateCommand.cancel); - } - - /// Closes communication channels and completes the future with the given error. - void _close([Object? error, StackTrace? stackTrace]) { - if (_isClosed) return; - _isClosed = true; - unawaited(_subscription?.cancel()); - _receivePort.close(); - if (!_completer.isCompleted) { - if (error == null) { - _completer.completeError( - StateError("IsolateManager closed without result or error."), - StackTrace.empty, - ); - } else { - _completer.completeError(error, stackTrace); - } - } - } - - /// Handles messages received from the isolate. - void _handleResponseFromIsolate(Object? response) { - if (_isClosed) return; - - // Expect list: [result] or [error, stackTrace] or [IsolateCancelledException] - final list = response as List; - Object? error; - StackTrace? stackTrace; - - if (list.length == 2) { - error = list.firstOrNull; - final remoteStack = list.elementAtOrNull(1); - if (remoteStack is StackTrace) { - stackTrace = remoteStack; - } else if (error is String && remoteStack is String?) { - // Reconstruct RemoteError if possible - error = RemoteError(error, remoteStack ?? ""); - stackTrace = (error as RemoteError).stackTrace; - } - } else if (list.length == 1) { - final result = list.firstOrNull; - if (result is IsolateCancelledException) { - error = result; - } else { - // Success case - if (!_completer.isCompleted) { - _completer.complete(result as T?); - } - _close(); - return; - } - } else { - error = RemoteError( - "Invalid message format from isolate", - response.toString(), - ); - } - - // If we reached here, it's an error or cancellation - _close(error, stackTrace); - } -} - -/// Internal helper class that runs within the isolate. -class _IsolateRunner { - final Computation _computation; - final Computation? _onCancel; - final String? _debugLabel; - - final RootIsolateToken _token; - final SendPort _sendPort; // Port to send results/errors back to main - ReceivePort? _receivePort; // Port to receive commands from main - bool _cancelled = false; - bool _computationStarted = false; - ProviderContainer? _ref; // Hold ref for cleanup - - _IsolateRunner({ - required Computation computation, - required SendPort sendPort, - required RootIsolateToken token, - Computation? onCancel, - String? debugLabel, - }) : _computation = computation, - _sendPort = sendPort, - _token = token, - _onCancel = onCancel, - _debugLabel = debugLabel; - - /// Entry point for the isolate. - static void _execute(_IsolateRunner runner) { - runner._start(); - } - - /// Initializes the isolate environment and listens for commands. - void _start() { - BackgroundIsolateBinaryMessenger.ensureInitialized(_token); + return workerManager.executeGentle((cancelledChecker) async { + BackgroundIsolateBinaryMessenger.ensureInitialized(token); DartPluginRegistrant.ensureInitialized(); - _receivePort = ReceivePort(); - _sendPort.send(_receivePort!.sendPort); + final db = await Bootstrap.initIsar(); + await Bootstrap.initDomain(db, shouldBufferLogs: false); + final ref = ProviderContainer( + overrides: [ + // TODO: Remove once isar is removed + dbProvider.overrideWithValue(db), + isarProvider.overrideWithValue(db), + cancellationProvider.overrideWithValue(cancelledChecker), + ], + ); - // Listen for commands from the main isolate. - // ignore: avoid-passing-async-when-sync-expected - final _ = _receivePort!.listen((message) async { - if (_cancelled) return; + Logger log = Logger("IsolateLogger"); - switch (message) { - case _IsolateCommand.cancel: - _cancelled = true; - // Run onCancel if computation hasn't started or finished - if (_onCancel == null) { - // Close the receive port and exit when no cleanup is needed - _sendPort.send([IsolateCancelledException(_debugLabel)]); - _cleanupAndExit(); - } else { - await _runCleanup(_onCancel); - } - break; - case _IsolateCommand.start: - if (_computationStarted) return; // Ignore duplicate start - _computationStarted = true; - await _exec(); - _cleanupAndExit(); - break; - } - }); - } - - /// Executes the main computation. - Future _exec() async { - Logger log = Logger("IsolateRunner"); try { - _ref = await _bootstrap(); - if (_cancelled) return; - - final potentiallyAsyncResult = _computation(_ref!); - final T result; - if (potentiallyAsyncResult is Future) { - result = await potentiallyAsyncResult; - } else { - result = potentiallyAsyncResult; - } - - if (_cancelled) return; - - _sendPort.send([result]); + return await computation(ref); + } on CanceledError { + log.warning( + "Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}", + ); } catch (error, stack) { - if (_cancelled) return; log.severe( - "Error in computation${_debugLabel == null ? '' : ' ($_debugLabel)'}", + "Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack, ); - _sendPort.send([error, stack]); - } - // Cleanup happens in _cleanupAndExit called by the listener - } - - /// Executes the onCancel computation. - Future _runCleanup(Computation cleanupFunc) async { - Logger log = Logger("IsolateRunner"); - try { - if (_ref == null) { - log.warning("IsolateRunner cleanup called without ref"); - return; - } - await cleanupFunc(_ref!); - } catch (e, s) { - log.warning("Error during isolate onCancel cleanup", e, s); - // Don't send this error back, primary goal is cancellation signal - } - } - - Future _bootstrap() async { - final db = await Bootstrap.initIsar(); - await Bootstrap.initDomain(db, shouldBufferLogs: false); - return ProviderContainer( - overrides: [ - dbProvider.overrideWithValue(db), - isarProvider.overrideWithValue(db), - cancellationProvider.overrideWithValue(Completer()), - ], - ); - } - - /// Closes resources and the receive port. - Future _cleanupAndExit() async { - // Always close the db connections and dispose ref if created - try { - await _ref?.read(driftProvider).close(); - await _ref?.read(isarProvider).close(); - _ref?.dispose(); - } catch (e) { - if (kDebugMode) { - print("Error during resource cleanup: $e"); - } } finally { - _receivePort?.close(); + // Wait for the logs to flush + await Future.delayed(const Duration(seconds: 2)); + // Always close the new db connection on Isolate end + ref.read(driftProvider).close(); + ref.read(isarProvider).close(); } - } + return null; + }); } diff --git a/mobile/pubspec.lock b/mobile/pubspec.lock index 16e23c403b..1e24807306 100644 --- a/mobile/pubspec.lock +++ b/mobile/pubspec.lock @@ -1933,6 +1933,14 @@ packages: url: "https://pub.dev" source: hosted version: "0.0.3" + worker_manager: + dependency: "direct main" + description: + name: worker_manager + sha256: "086ed63e9b36266e851404ca90fd44e37c0f4c9bbf819e5f8d7c87f9741c0591" + url: "https://pub.dev" + source: hosted + version: "7.2.3" xdg_directories: dependency: transitive description: diff --git a/mobile/pubspec.yaml b/mobile/pubspec.yaml index 0bd67523ef..f67db7d0ad 100644 --- a/mobile/pubspec.yaml +++ b/mobile/pubspec.yaml @@ -62,6 +62,7 @@ dependencies: url_launcher: ^6.3.1 uuid: ^4.5.1 wakelock_plus: ^1.2.10 + worker_manager: ^7.2.3 native_video_player: git: diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index ff2d5b40e2..01ba4ae68b 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -7,13 +7,21 @@ import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; -import 'package:immich_mobile/domain/utils/cancel.exception.dart'; import 'package:mocktail/mocktail.dart'; import 'package:openapi/api.dart'; +import 'package:worker_manager/worker_manager.dart'; import '../../fixtures/sync_stream.stub.dart'; import '../../infrastructure/repository.mock.dart'; +class _CancellationWrapper { + const _CancellationWrapper(); + + bool isCancelled() => false; +} + +class _MockCancellationWrapper extends Mock implements _CancellationWrapper {} + void main() { late SyncStreamService sut; late ISyncStreamRepository mockSyncStreamRepo; @@ -96,7 +104,7 @@ void main() { when(() => mockSyncApiRepo.getSyncEvents(any())) .thenAnswer((_) => Stream.error(Exception("Stream Error"))); // Should complete gracefully without throwing - await expectLater(sut.syncUsers(), completes); + await expectLater(sut.syncUsers(), throwsException); verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error }); @@ -252,13 +260,15 @@ void main() { }); test( - "stops processing and ack when cancel completer is completed", + "stops processing and ack when cancel checker is completed", () async { - final cancelCompleter = Completer(); + final cancellationChecker = _MockCancellationWrapper(); + when(() => cancellationChecker.isCancelled()).thenAnswer((_) => false); + sut = SyncStreamService( syncApiRepository: mockSyncApiRepo, syncStreamRepository: mockSyncStreamRepo, - cancelCompleter: cancelCompleter, + cancelChecker: cancellationChecker.isCancelled, ); final processingCompleter = Completer(); @@ -282,7 +292,7 @@ void main() { expect(handlerStarted, isTrue, reason: "Handler should have started"); - cancelCompleter.complete(true); // Signal cancellation + when(() => cancellationChecker.isCancelled()).thenAnswer((_) => true); // Allow cancellation logic to propagate await Future.delayed(const Duration(milliseconds: 10)); @@ -291,7 +301,7 @@ void main() { // to ensure the cancellation logic itself isn't blocked by the handler. processingCompleter.complete(); - await expectLater(syncFuture, throwsA(isA())); + await expectLater(syncFuture, throwsA(isA())); // Verify that ack was NOT called because processing was cancelled verifyNever(() => mockSyncApiRepo.ack(any())); diff --git a/mobile/test/services/auth.service_test.dart b/mobile/test/services/auth.service_test.dart index 4ada98a6c9..bbb54d98d5 100644 --- a/mobile/test/services/auth.service_test.dart +++ b/mobile/test/services/auth.service_test.dart @@ -120,7 +120,7 @@ void main() { group('logout', () { test('Should logout user', () async { when(() => authApiRepository.logout()).thenAnswer((_) async => {}); - when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); + when(() => backgroundSyncManager.cancel()).thenAnswer((_) => {}); when(() => authRepository.clearLocalData()) .thenAnswer((_) => Future.value(null)); @@ -134,7 +134,7 @@ void main() { test('Should clear local data even on server error', () async { when(() => authApiRepository.logout()) .thenThrow(Exception('Server error')); - when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); + when(() => backgroundSyncManager.cancel()).thenAnswer((_) => {}); when(() => authRepository.clearLocalData()) .thenAnswer((_) => Future.value(null));