rewrite with worker_manager

This commit is contained in:
shenlong-tanwen 2025-04-16 17:33:03 +05:30
parent 92210b3bbb
commit c427f5204b
13 changed files with 185 additions and 508 deletions

View File

@ -5,24 +5,24 @@ import 'dart:async';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/utils/cancel.exception.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
import 'package:worker_manager/worker_manager.dart';
class SyncStreamService { class SyncStreamService {
final Logger _logger = Logger('SyncStreamService'); final Logger _logger = Logger('SyncStreamService');
final ISyncApiRepository _syncApiRepository; final ISyncApiRepository _syncApiRepository;
final ISyncStreamRepository _syncStreamRepository; final ISyncStreamRepository _syncStreamRepository;
final Completer<bool>? _cancelCompleter; final bool Function()? _cancelChecker;
SyncStreamService({ SyncStreamService({
required ISyncApiRepository syncApiRepository, required ISyncApiRepository syncApiRepository,
required ISyncStreamRepository syncStreamRepository, required ISyncStreamRepository syncStreamRepository,
Completer<bool>? cancelCompleter, bool Function()? cancelChecker,
}) : _syncApiRepository = syncApiRepository, }) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository, _syncStreamRepository = syncStreamRepository,
_cancelCompleter = cancelCompleter; _cancelChecker = cancelChecker;
Future<bool> _handleSyncData( Future<bool> _handleSyncData(
SyncEntityType type, SyncEntityType type,
@ -61,7 +61,7 @@ class SyncStreamService {
return false; return false;
} }
Future<void> _syncEvent(List<SyncRequestType> types) async { Future<void> _syncEvent(List<SyncRequestType> types) {
_logger.info("Syncing Events: $types"); _logger.info("Syncing Events: $types");
final streamCompleter = Completer(); final streamCompleter = Completer();
bool shouldComplete = false; bool shouldComplete = false;
@ -70,101 +70,111 @@ class SyncStreamService {
// before the events are processed and also that events are processed sequentially // before the events are processed and also that events are processed sequentially
Completer? mutex; Completer? mutex;
StreamSubscription? subscription; StreamSubscription? subscription;
subscription = _syncApiRepository.getSyncEvents(types).listen( try {
(events) async { subscription = _syncApiRepository.getSyncEvents(types).listen(
if (events.isEmpty) { (events) async {
_logger.warning("Received empty sync events"); if (events.isEmpty) {
return; _logger.warning("Received empty sync events");
} return;
// If previous events are still being processed, wait for them to finish
if (mutex != null) {
await mutex!.future;
}
if (_cancelCompleter?.isCompleted ?? false) {
_logger.info("Sync cancelled, stopping stream");
subscription?.cancel();
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
const CancelException(),
StackTrace.current,
);
} }
return;
}
// Take control of the mutex and process the events // If previous events are still being processed, wait for them to finish
mutex = Completer(); if (mutex != null) {
await mutex!.future;
}
try { if (_cancelChecker?.call() ?? false) {
final eventsMap = events.groupListsBy((event) => event.type); _logger.info("Sync cancelled, stopping stream");
final Map<SyncEntityType, String> acks = {}; subscription?.cancel();
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
CanceledError(),
StackTrace.current,
);
}
return;
}
for (final entry in eventsMap.entries) { // Take control of the mutex and process the events
if (_cancelCompleter?.isCompleted ?? false) { mutex = Completer();
_logger.info("Sync cancelled, stopping stream");
mutex?.complete(); try {
mutex = null; final eventsMap = events.groupListsBy((event) => event.type);
if (!streamCompleter.isCompleted) { final Map<SyncEntityType, String> acks = {};
streamCompleter.completeError(
const CancelException(), for (final entry in eventsMap.entries) {
StackTrace.current, if (_cancelChecker?.call() ?? false) {
); _logger.info("Sync cancelled, stopping stream");
mutex?.complete();
mutex = null;
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
CanceledError(),
StackTrace.current,
);
}
return;
} }
return; final type = entry.key;
final data = entry.value;
if (data.isEmpty) {
_logger.warning("Received empty sync events for $type");
continue;
}
if (await _handleSyncData(type, data.map((e) => e.data))) {
// ignore: avoid-unsafe-collection-methods
acks[type] = data.last.ack;
} else {
_logger.warning("Failed to handle sync events for $type");
}
} }
final type = entry.key; if (acks.isNotEmpty) {
final data = entry.value; await _syncApiRepository.ack(acks.values.toList());
if (data.isEmpty) {
_logger.warning("Received empty sync events for $type");
continue;
}
if (await _handleSyncData(type, data.map((e) => e.data))) {
// ignore: avoid-unsafe-collection-methods
acks[type] = data.last.ack;
} else {
_logger.warning("Failed to handle sync events for $type");
} }
_logger.info("$types events processed");
} catch (error, stack) {
_logger.warning("Error handling sync events", error, stack);
} finally {
mutex?.complete();
mutex = null;
} }
if (acks.isNotEmpty) { if (shouldComplete) {
await _syncApiRepository.ack(acks.values.toList()); _logger.info("Sync done, completing stream");
if (!streamCompleter.isCompleted) streamCompleter.complete();
} }
_logger.info("$types events processed"); },
} catch (error, stack) { onError: (error, stack) {
_logger.warning("Error handling sync events", error, stack); _logger.warning("Error in sync stream for $types", error, stack);
} finally { // Do not proceed if the stream errors
mutex?.complete(); if (!streamCompleter.isCompleted) {
mutex = null; // ignore: avoid-missing-completer-stack-trace
} streamCompleter.completeError(error, stack);
}
if (shouldComplete) { },
_logger.info("Sync done, completing stream"); onDone: () {
if (!streamCompleter.isCompleted) streamCompleter.complete(); _logger.info("$types stream done");
} if (mutex == null && !streamCompleter.isCompleted) {
}, streamCompleter.complete();
onError: (error, stack) { } else {
_logger.warning("Error in sync stream for $types", error, stack); // Marks the stream as done but does not complete the completer
// Do not proceed if the stream errors // until the events are processed
if (!streamCompleter.isCompleted) streamCompleter.complete(); shouldComplete = true;
}, }
onDone: () { },
_logger.info("$types stream done"); );
if (mutex == null && !streamCompleter.isCompleted) { } catch (error, stack) {
streamCompleter.complete(); _logger.severe("Error starting sync stream", error, stack);
} else { if (!streamCompleter.isCompleted) {
// Marks the stream as done but does not complete the completer streamCompleter.completeError(error, stack);
// until the events are processed }
shouldComplete = true; }
} return streamCompleter.future.whenComplete(() {
},
);
return await streamCompleter.future.whenComplete(() {
_logger.info("Sync stream completed"); _logger.info("Sync stream completed");
return subscription?.cancel(); return subscription?.cancel();
}); });

View File

@ -2,41 +2,30 @@
import 'dart:async'; import 'dart:async';
import 'package:async/async.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart';
import 'package:immich_mobile/utils/isolate.dart'; import 'package:immich_mobile/utils/isolate.dart';
import 'package:worker_manager/worker_manager.dart';
class BackgroundSyncManager { class BackgroundSyncManager {
CancelableOperation<void>? _userSyncFuture; Cancelable<void>? _userSyncTask;
BackgroundSyncManager(); BackgroundSyncManager();
Future<void> cancel() async { void cancel() {
await _userSyncFuture?.cancel(); _userSyncTask?.cancel();
_userSyncFuture = null; _userSyncTask = null;
} }
Future<void> syncUsers() async { Future<void> syncUsers() async {
if (_userSyncFuture != null) { if (_userSyncTask != null) {
return _userSyncFuture!.valueOrCancellation(); return _userSyncTask!.future;
} }
if (_userSyncFuture == null) { _userSyncTask = runInIsolateGentle(
final isolate = await IsolateManager.spawn( computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(),
computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(), );
onCancel: (ref) => ref.read(cancellationProvider).complete(true), _userSyncTask!.whenComplete(() {
); _userSyncTask = null;
_userSyncFuture = CancelableOperation.fromFuture( });
isolate.run(),
onCancel: () {
isolate.cancel();
_userSyncFuture = null;
},
);
return _userSyncFuture!
.valueOrCancellation()
.then((_) => _userSyncFuture = null);
}
} }
} }

View File

@ -1,8 +0,0 @@
class CancelException implements Exception {
final String message;
const CancelException([this.message = "Operation was cancelled."]);
@override
String toString() => "CancelException: $message";
}

View File

@ -11,6 +11,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/locales.dart'; import 'package:immich_mobile/constants/locales.dart';
import 'package:immich_mobile/extensions/build_context_extensions.dart'; import 'package:immich_mobile/extensions/build_context_extensions.dart';
import 'package:immich_mobile/generated/codegen_loader.g.dart';
import 'package:immich_mobile/providers/app_life_cycle.provider.dart'; import 'package:immich_mobile/providers/app_life_cycle.provider.dart';
import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provider.dart'; import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provider.dart';
import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart';
@ -31,13 +32,15 @@ import 'package:immich_mobile/utils/migration.dart';
import 'package:intl/date_symbol_data_local.dart'; import 'package:intl/date_symbol_data_local.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:timezone/data/latest.dart'; import 'package:timezone/data/latest.dart';
import 'package:immich_mobile/generated/codegen_loader.g.dart'; import 'package:worker_manager/worker_manager.dart';
void main() async { void main() async {
ImmichWidgetsBinding(); ImmichWidgetsBinding();
final db = await Bootstrap.initIsar(); final db = await Bootstrap.initIsar();
await Bootstrap.initDomain(db); await Bootstrap.initDomain(db);
await initApp(); await initApp();
// Warm-up isolate pool for worker manager
await workerManager.init(dynamicSpawning: true);
await migrateDatabaseIfNeeded(db); await migrateDatabaseIfNeeded(db);
HttpOverrides.global = HttpSSLCertOverride(); HttpOverrides.global = HttpSSLCertOverride();

View File

@ -1,10 +1,8 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:immich_mobile/domain/utils/background_sync.dart';
final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) { final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
final manager = BackgroundSyncManager(); final manager = BackgroundSyncManager();
ref.onDispose(() => unawaited(manager.cancel())); ref.onDispose(manager.cancel);
return manager; return manager;
}); });

View File

@ -1,10 +1,8 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
/// Provider holding a boolean notifier that becomes true when cancellation is requested. /// Provider holding a boolean function that returns true when cancellation is requested.
/// A computation running in the isolate use the completer to implement cooperative cancellation. /// A computation running in the isolate uses the function to implement cooperative cancellation.
final cancellationProvider = Provider<Completer<bool>>( final cancellationProvider = Provider<bool Function()>(
// This will be overridden in the isolate's container. // This will be overridden in the isolate's container.
// Throwing ensures it's not used without an override. // Throwing ensures it's not used without an override.
(ref) => throw UnimplementedError( (ref) => throw UnimplementedError(

View File

@ -10,7 +10,7 @@ final syncStreamServiceProvider = Provider(
(ref) => SyncStreamService( (ref) => SyncStreamService(
syncApiRepository: ref.watch(syncApiRepositoryProvider), syncApiRepository: ref.watch(syncApiRepositoryProvider),
syncStreamRepository: ref.watch(syncStreamRepositoryProvider), syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
cancelCompleter: ref.watch(cancellationProvider), cancelChecker: ref.watch(cancellationProvider),
), ),
); );

View File

@ -121,8 +121,8 @@ class AuthService {
/// ///
/// All deletions are executed in parallel using [Future.wait]. /// All deletions are executed in parallel using [Future.wait].
Future<void> clearLocalData() { Future<void> clearLocalData() {
_backgroundSyncManager.cancel();
return Future.wait([ return Future.wait([
_backgroundSyncManager.cancel(),
_authRepository.clearLocalData(), _authRepository.clearLocalData(),
Store.delete(StoreKey.currentUser), Store.delete(StoreKey.currentUser),
Store.delete(StoreKey.accessToken), Store.delete(StoreKey.accessToken),

View File

@ -1,10 +1,6 @@
// ignore_for_file: avoid-missing-completer-stack-trace
import 'dart:async'; import 'dart:async';
import 'dart:isolate';
import 'dart:ui'; import 'dart:ui';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart'; import 'package:flutter/services.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart';
@ -12,390 +8,62 @@ import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:worker_manager/worker_manager.dart';
/// Exception thrown when IsolateManager is used incorrectly from a non-root isolate.
class InvalidIsolateUsageException implements Exception { class InvalidIsolateUsageException implements Exception {
const InvalidIsolateUsageException(); const InvalidIsolateUsageException();
@override @override
String toString() => String toString() =>
"IsolateManager should only be used from the root isolate"; "IsolateHelper should only be used from the root isolate";
} }
/// Exception thrown when an isolate computation is cancelled via [IsolateManager.cancel]. // !! Should be used only from the root isolate
class IsolateCancelledException implements Exception { Cancelable<T?> runInIsolateGentle<T>({
final String? debugLabel; required Future<T> Function(ProviderContainer ref) computation,
const IsolateCancelledException([this.debugLabel]); String? debugLabel,
}) {
@override final token = RootIsolateToken.instance;
String toString() => if (token == null) {
"Isolate computation${debugLabel == null ? '' : ' ($debugLabel)'} was cancelled."; throw const InvalidIsolateUsageException();
}
/// Signature for the computation function to be executed in the isolate.
/// It receives a Riverpod [ProviderContainer] for dependency injection.
typedef Computation<T> = FutureOr<T> Function(ProviderContainer ref);
enum _IsolateCommand {
cancel,
start,
}
/// Manages running a [Computation] in a separate isolate with Riverpod support
/// and optional cancellation.
///
/// Similar to [compute] or [Isolate.run], but provides:
/// - A pre-configured Riverpod [ProviderContainer] to the computation.
/// - An optional `onCancel` computation for cleanup.
/// - A `cancel()` method to attempt cancellation.
class IsolateManager<T> {
final SendPort _sendPort;
final ReceivePort _receivePort;
final Completer<T?> _completer = Completer<T?>();
final String? _debugLabel;
StreamSubscription? _subscription;
bool _isClosed = false;
bool _isRunning = false;
IsolateManager._(this._sendPort, this._receivePort, this._debugLabel) {
_subscription = _receivePort.listen(
_handleResponseFromIsolate,
onDone: () {
// If the port closes before the completer finishes, signal an error.
if (_completer.isCompleted) {
return;
}
_completer.completeError(
RemoteError(
"Isolate terminated unexpectedly${_debugLabel == null ? '' : ' ($_debugLabel)'}",
"",
),
StackTrace.empty,
);
_isClosed = true;
},
);
} }
/// Spawns a new isolate and prepares it to run the [computation]. return workerManager.executeGentle((cancelledChecker) async {
/// BackgroundIsolateBinaryMessenger.ensureInitialized(token);
/// - [computation]: The function to run in the isolate.
/// - [onCancel]: An optional function to run if `cancel()` is called before the computation completes.
/// - [debugLabel]: An optional label for debugging.
///
/// Must be called from the root isolate.
static Future<IsolateManager<T>> spawn<T>({
required Computation<T> computation,
Computation<T>? onCancel,
String? debugLabel,
}) async {
final Logger logger = Logger("IsolateManager");
// Port for receiving the [SendPort] from the new isolate.
final initPort = RawReceivePort();
// Completer to synchronize the establishment of the main communication channel.
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
if (initialMessage == null) {
// onExit handler message, isolate terminated without sending a SendPort.
initPort.close();
connection.completeError(
RemoteError(
"Isolate exited unexpectedly during initialization${debugLabel == null ? '' : ' ($debugLabel)'}",
"",
),
StackTrace.empty,
);
return;
}
// The first message should be the SendPort for commands.
final commandPort = initialMessage as SendPort;
connection.complete(
(ReceivePort.fromRawReceivePort(initPort), commandPort),
);
};
final token = RootIsolateToken.instance;
if (token == null) {
initPort.close(); // Clean up before throwing
throw const InvalidIsolateUsageException();
}
try {
await Isolate.spawn(
_IsolateRunner._execute,
_IsolateRunner(
computation: computation,
sendPort: initPort.sendPort,
token: token,
onCancel: onCancel,
debugLabel: debugLabel,
),
errorsAreFatal: true,
onExit: initPort.sendPort, // Send null on exit
onError: initPort.sendPort, // Errors during spawn sent here
debugName: debugLabel,
);
} catch (e, s) {
logger.warning(
"Failed to spawn isolate${debugLabel == null ? '' : ' - $debugLabel'}",
e,
s,
);
initPort.close();
rethrow;
}
try {
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return IsolateManager._(sendPort, receivePort, debugLabel);
} catch (e) {
// If connection.future completed with an error (e.g., isolate exited early)
logger.warning(
"Isolate connection failed${debugLabel == null ? '' : ' - $debugLabel'}",
e,
);
// initPort is closed by the handler in case of error/null message
rethrow;
}
}
/// Starts the computation in the isolate.
///
/// Returns a future that completes with the result of the computation,
/// or an error if the computation fails or is cancelled.
Future<T?> run() {
if (_isClosed) {
return Future.error(StateError("IsolateManager is already closed"));
}
if (_isRunning) {
return Future.error(
StateError("Isolate computation is already running"),
);
}
_isRunning = true;
_sendPort.send(_IsolateCommand.start);
return _completer.future;
}
/// Attempts to cancel the computation.
///
/// If the computation has not yet completed, this will cause the future
/// returned by [run] to complete with an [IsolateCancelledException].
/// An optional `onCancel` computation might be executed in the isolate.
///
/// Does nothing if the computation has already completed or the manager is closed.
void cancel() {
if (_isClosed || _completer.isCompleted) {
return;
}
if (!_isRunning) {
_close(IsolateCancelledException(_debugLabel));
return;
}
// If running, send cancel command. The isolate will handle sending back the error.
_sendPort.send(_IsolateCommand.cancel);
}
/// Closes communication channels and completes the future with the given error.
void _close([Object? error, StackTrace? stackTrace]) {
if (_isClosed) return;
_isClosed = true;
unawaited(_subscription?.cancel());
_receivePort.close();
if (!_completer.isCompleted) {
if (error == null) {
_completer.completeError(
StateError("IsolateManager closed without result or error."),
StackTrace.empty,
);
} else {
_completer.completeError(error, stackTrace);
}
}
}
/// Handles messages received from the isolate.
void _handleResponseFromIsolate(Object? response) {
if (_isClosed) return;
// Expect list: [result] or [error, stackTrace] or [IsolateCancelledException]
final list = response as List<Object?>;
Object? error;
StackTrace? stackTrace;
if (list.length == 2) {
error = list.firstOrNull;
final remoteStack = list.elementAtOrNull(1);
if (remoteStack is StackTrace) {
stackTrace = remoteStack;
} else if (error is String && remoteStack is String?) {
// Reconstruct RemoteError if possible
error = RemoteError(error, remoteStack ?? "");
stackTrace = (error as RemoteError).stackTrace;
}
} else if (list.length == 1) {
final result = list.firstOrNull;
if (result is IsolateCancelledException) {
error = result;
} else {
// Success case
if (!_completer.isCompleted) {
_completer.complete(result as T?);
}
_close();
return;
}
} else {
error = RemoteError(
"Invalid message format from isolate",
response.toString(),
);
}
// If we reached here, it's an error or cancellation
_close(error, stackTrace);
}
}
/// Internal helper class that runs within the isolate.
class _IsolateRunner<T> {
final Computation<T> _computation;
final Computation<T>? _onCancel;
final String? _debugLabel;
final RootIsolateToken _token;
final SendPort _sendPort; // Port to send results/errors back to main
ReceivePort? _receivePort; // Port to receive commands from main
bool _cancelled = false;
bool _computationStarted = false;
ProviderContainer? _ref; // Hold ref for cleanup
_IsolateRunner({
required Computation<T> computation,
required SendPort sendPort,
required RootIsolateToken token,
Computation<T>? onCancel,
String? debugLabel,
}) : _computation = computation,
_sendPort = sendPort,
_token = token,
_onCancel = onCancel,
_debugLabel = debugLabel;
/// Entry point for the isolate.
static void _execute(_IsolateRunner<Object?> runner) {
runner._start();
}
/// Initializes the isolate environment and listens for commands.
void _start() {
BackgroundIsolateBinaryMessenger.ensureInitialized(_token);
DartPluginRegistrant.ensureInitialized(); DartPluginRegistrant.ensureInitialized();
_receivePort = ReceivePort(); final db = await Bootstrap.initIsar();
_sendPort.send(_receivePort!.sendPort); await Bootstrap.initDomain(db, shouldBufferLogs: false);
final ref = ProviderContainer(
overrides: [
// TODO: Remove once isar is removed
dbProvider.overrideWithValue(db),
isarProvider.overrideWithValue(db),
cancellationProvider.overrideWithValue(cancelledChecker),
],
);
// Listen for commands from the main isolate. Logger log = Logger("IsolateLogger");
// ignore: avoid-passing-async-when-sync-expected
final _ = _receivePort!.listen((message) async {
if (_cancelled) return;
switch (message) {
case _IsolateCommand.cancel:
_cancelled = true;
// Run onCancel if computation hasn't started or finished
if (_onCancel == null) {
// Close the receive port and exit when no cleanup is needed
_sendPort.send([IsolateCancelledException(_debugLabel)]);
_cleanupAndExit();
} else {
await _runCleanup(_onCancel);
}
break;
case _IsolateCommand.start:
if (_computationStarted) return; // Ignore duplicate start
_computationStarted = true;
await _exec();
_cleanupAndExit();
break;
}
});
}
/// Executes the main computation.
Future<void> _exec() async {
Logger log = Logger("IsolateRunner");
try { try {
_ref = await _bootstrap(); return await computation(ref);
if (_cancelled) return; } on CanceledError {
log.warning(
final potentiallyAsyncResult = _computation(_ref!); "Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}",
final T result; );
if (potentiallyAsyncResult is Future<T>) {
result = await potentiallyAsyncResult;
} else {
result = potentiallyAsyncResult;
}
if (_cancelled) return;
_sendPort.send([result]);
} catch (error, stack) { } catch (error, stack) {
if (_cancelled) return;
log.severe( log.severe(
"Error in computation${_debugLabel == null ? '' : ' ($_debugLabel)'}", "Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}",
error, error,
stack, stack,
); );
_sendPort.send([error, stack]);
}
// Cleanup happens in _cleanupAndExit called by the listener
}
/// Executes the onCancel computation.
Future<void> _runCleanup(Computation<T> cleanupFunc) async {
Logger log = Logger("IsolateRunner");
try {
if (_ref == null) {
log.warning("IsolateRunner cleanup called without ref");
return;
}
await cleanupFunc(_ref!);
} catch (e, s) {
log.warning("Error during isolate onCancel cleanup", e, s);
// Don't send this error back, primary goal is cancellation signal
}
}
Future<ProviderContainer> _bootstrap() async {
final db = await Bootstrap.initIsar();
await Bootstrap.initDomain(db, shouldBufferLogs: false);
return ProviderContainer(
overrides: [
dbProvider.overrideWithValue(db),
isarProvider.overrideWithValue(db),
cancellationProvider.overrideWithValue(Completer()),
],
);
}
/// Closes resources and the receive port.
Future<void> _cleanupAndExit() async {
// Always close the db connections and dispose ref if created
try {
await _ref?.read(driftProvider).close();
await _ref?.read(isarProvider).close();
_ref?.dispose();
} catch (e) {
if (kDebugMode) {
print("Error during resource cleanup: $e");
}
} finally { } finally {
_receivePort?.close(); // Wait for the logs to flush
await Future.delayed(const Duration(seconds: 2));
// Always close the new db connection on Isolate end
ref.read(driftProvider).close();
ref.read(isarProvider).close();
} }
} return null;
});
} }

View File

@ -1933,6 +1933,14 @@ packages:
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.0.3" version: "0.0.3"
worker_manager:
dependency: "direct main"
description:
name: worker_manager
sha256: "086ed63e9b36266e851404ca90fd44e37c0f4c9bbf819e5f8d7c87f9741c0591"
url: "https://pub.dev"
source: hosted
version: "7.2.3"
xdg_directories: xdg_directories:
dependency: transitive dependency: transitive
description: description:

View File

@ -62,6 +62,7 @@ dependencies:
url_launcher: ^6.3.1 url_launcher: ^6.3.1
uuid: ^4.5.1 uuid: ^4.5.1
wakelock_plus: ^1.2.10 wakelock_plus: ^1.2.10
worker_manager: ^7.2.3
native_video_player: native_video_player:
git: git:

View File

@ -7,13 +7,21 @@ import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:immich_mobile/domain/utils/cancel.exception.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
import 'package:worker_manager/worker_manager.dart';
import '../../fixtures/sync_stream.stub.dart'; import '../../fixtures/sync_stream.stub.dart';
import '../../infrastructure/repository.mock.dart'; import '../../infrastructure/repository.mock.dart';
class _CancellationWrapper {
const _CancellationWrapper();
bool isCancelled() => false;
}
class _MockCancellationWrapper extends Mock implements _CancellationWrapper {}
void main() { void main() {
late SyncStreamService sut; late SyncStreamService sut;
late ISyncStreamRepository mockSyncStreamRepo; late ISyncStreamRepository mockSyncStreamRepo;
@ -96,7 +104,7 @@ void main() {
when(() => mockSyncApiRepo.getSyncEvents(any())) when(() => mockSyncApiRepo.getSyncEvents(any()))
.thenAnswer((_) => Stream.error(Exception("Stream Error"))); .thenAnswer((_) => Stream.error(Exception("Stream Error")));
// Should complete gracefully without throwing // Should complete gracefully without throwing
await expectLater(sut.syncUsers(), completes); await expectLater(sut.syncUsers(), throwsException);
verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error verifyNever(() => mockSyncApiRepo.ack(any())); // No ack on stream error
}); });
@ -252,13 +260,15 @@ void main() {
}); });
test( test(
"stops processing and ack when cancel completer is completed", "stops processing and ack when cancel checker is completed",
() async { () async {
final cancelCompleter = Completer<bool>(); final cancellationChecker = _MockCancellationWrapper();
when(() => cancellationChecker.isCancelled()).thenAnswer((_) => false);
sut = SyncStreamService( sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo, syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo, syncStreamRepository: mockSyncStreamRepo,
cancelCompleter: cancelCompleter, cancelChecker: cancellationChecker.isCancelled,
); );
final processingCompleter = Completer<void>(); final processingCompleter = Completer<void>();
@ -282,7 +292,7 @@ void main() {
expect(handlerStarted, isTrue, reason: "Handler should have started"); expect(handlerStarted, isTrue, reason: "Handler should have started");
cancelCompleter.complete(true); // Signal cancellation when(() => cancellationChecker.isCancelled()).thenAnswer((_) => true);
// Allow cancellation logic to propagate // Allow cancellation logic to propagate
await Future.delayed(const Duration(milliseconds: 10)); await Future.delayed(const Duration(milliseconds: 10));
@ -291,7 +301,7 @@ void main() {
// to ensure the cancellation logic itself isn't blocked by the handler. // to ensure the cancellation logic itself isn't blocked by the handler.
processingCompleter.complete(); processingCompleter.complete();
await expectLater(syncFuture, throwsA(isA<CancelException>())); await expectLater(syncFuture, throwsA(isA<CanceledError>()));
// Verify that ack was NOT called because processing was cancelled // Verify that ack was NOT called because processing was cancelled
verifyNever(() => mockSyncApiRepo.ack(any())); verifyNever(() => mockSyncApiRepo.ack(any()));

View File

@ -120,7 +120,7 @@ void main() {
group('logout', () { group('logout', () {
test('Should logout user', () async { test('Should logout user', () async {
when(() => authApiRepository.logout()).thenAnswer((_) async => {}); when(() => authApiRepository.logout()).thenAnswer((_) async => {});
when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => backgroundSyncManager.cancel()).thenAnswer((_) => {});
when(() => authRepository.clearLocalData()) when(() => authRepository.clearLocalData())
.thenAnswer((_) => Future.value(null)); .thenAnswer((_) => Future.value(null));
@ -134,7 +134,7 @@ void main() {
test('Should clear local data even on server error', () async { test('Should clear local data even on server error', () async {
when(() => authApiRepository.logout()) when(() => authApiRepository.logout())
.thenThrow(Exception('Server error')); .thenThrow(Exception('Server error'));
when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => backgroundSyncManager.cancel()).thenAnswer((_) => {});
when(() => authRepository.clearLocalData()) when(() => authRepository.clearLocalData())
.thenAnswer((_) => Future.value(null)); .thenAnswer((_) => Future.value(null));