rewrite isolate manager

This commit is contained in:
shenlong-tanwen 2025-04-15 20:05:05 +05:30
parent 52cca7afa0
commit deca45a008
13 changed files with 598 additions and 98 deletions

View File

@ -5,6 +5,7 @@ import 'dart:async';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/utils/cancel.exception.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
@ -13,12 +14,15 @@ class SyncStreamService {
final ISyncApiRepository _syncApiRepository; final ISyncApiRepository _syncApiRepository;
final ISyncStreamRepository _syncStreamRepository; final ISyncStreamRepository _syncStreamRepository;
final Completer<bool>? _cancelCompleter;
SyncStreamService({ SyncStreamService({
required ISyncApiRepository syncApiRepository, required ISyncApiRepository syncApiRepository,
required ISyncStreamRepository syncStreamRepository, required ISyncStreamRepository syncStreamRepository,
Completer<bool>? cancelCompleter,
}) : _syncApiRepository = syncApiRepository, }) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository; _syncStreamRepository = syncStreamRepository,
_cancelCompleter = cancelCompleter;
Future<bool> _handleSyncData( Future<bool> _handleSyncData(
SyncEntityType type, SyncEntityType type,
@ -65,7 +69,8 @@ class SyncStreamService {
// the following flag ensures that the onDone callback is not called // the following flag ensures that the onDone callback is not called
// before the events are processed and also that events are processed sequentially // before the events are processed and also that events are processed sequentially
Completer? mutex; Completer? mutex;
final subscription = _syncApiRepository.getSyncEvents(types).listen( StreamSubscription? subscription;
subscription = _syncApiRepository.getSyncEvents(types).listen(
(events) async { (events) async {
if (events.isEmpty) { if (events.isEmpty) {
_logger.warning("Received empty sync events"); _logger.warning("Received empty sync events");
@ -77,6 +82,18 @@ class SyncStreamService {
await mutex!.future; await mutex!.future;
} }
if (_cancelCompleter?.isCompleted ?? false) {
_logger.info("Sync cancelled, stopping stream");
subscription?.cancel();
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
const CancelException(),
StackTrace.current,
);
}
return;
}
// Take control of the mutex and process the events // Take control of the mutex and process the events
mutex = Completer(); mutex = Completer();
@ -85,6 +102,20 @@ class SyncStreamService {
final Map<SyncEntityType, String> acks = {}; final Map<SyncEntityType, String> acks = {};
for (final entry in eventsMap.entries) { for (final entry in eventsMap.entries) {
if (_cancelCompleter?.isCompleted ?? false) {
_logger.info("Sync cancelled, stopping stream");
mutex?.complete();
mutex = null;
if (!streamCompleter.isCompleted) {
streamCompleter.completeError(
const CancelException(),
StackTrace.current,
);
}
return;
}
final type = entry.key; final type = entry.key;
final data = entry.value; final data = entry.value;
@ -133,7 +164,10 @@ class SyncStreamService {
} }
}, },
); );
return await streamCompleter.future.whenComplete(subscription.cancel); return await streamCompleter.future.whenComplete(() {
_logger.info("Sync stream completed");
return subscription?.cancel();
});
} }
Future<void> syncUsers() => Future<void> syncUsers() =>

View File

@ -0,0 +1,42 @@
// ignore_for_file: avoid-passing-async-when-sync-expected
import 'dart:async';
import 'package:async/async.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart';
import 'package:immich_mobile/utils/isolate.dart';
class BackgroundSyncManager {
CancelableOperation<void>? _userSyncFuture;
BackgroundSyncManager();
Future<void> cancel() async {
await _userSyncFuture?.cancel();
_userSyncFuture = null;
}
Future<void> syncUsers() async {
if (_userSyncFuture != null) {
return _userSyncFuture!.valueOrCancellation();
}
if (_userSyncFuture == null) {
final isolate = await IsolateManager.spawn(
computation: (ref) => ref.read(syncStreamServiceProvider).syncUsers(),
onCancel: (ref) => ref.read(cancellationProvider).complete(true),
);
_userSyncFuture = CancelableOperation.fromFuture(
isolate.run(),
onCancel: () {
isolate.cancel();
_userSyncFuture = null;
},
);
return _userSyncFuture!
.valueOrCancellation()
.then((_) => _userSyncFuture = null);
}
}
}

View File

@ -0,0 +1,8 @@
class CancelException implements Exception {
final String message;
const CancelException([this.message = "Operation was cancelled."]);
@override
String toString() => "CancelException: $message";
}

View File

@ -1,6 +1,10 @@
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'dart:async';
import 'package:immich_mobile/utils/background_sync.dart';
final backgroundSyncProvider = Provider<BackgroundSyncManager>( import 'package:hooks_riverpod/hooks_riverpod.dart';
(ref) => BackgroundSyncManager(), import 'package:immich_mobile/domain/utils/background_sync.dart';
);
final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
final manager = BackgroundSyncManager();
ref.onDispose(() => unawaited(manager.cancel()));
return manager;
});

View File

@ -0,0 +1,14 @@
import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart';
/// Provider holding a boolean notifier that becomes true when cancellation is requested.
/// A computation running in the isolate use the completer to implement cooperative cancellation.
final cancellationProvider = Provider<Completer<bool>>(
// This will be overridden in the isolate's container.
// Throwing ensures it's not used without an override.
(ref) => throw UnimplementedError(
"cancellationProvider must be overridden in the isolate's ProviderContainer and not to be used in the root isolate",
),
name: 'cancellationProvider',
);

View File

@ -3,12 +3,14 @@ import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
import 'package:immich_mobile/providers/api.provider.dart'; import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
final syncStreamServiceProvider = Provider( final syncStreamServiceProvider = Provider(
(ref) => SyncStreamService( (ref) => SyncStreamService(
syncApiRepository: ref.watch(syncApiRepositoryProvider), syncApiRepository: ref.watch(syncApiRepositoryProvider),
syncStreamRepository: ref.watch(syncStreamRepositoryProvider), syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
cancelCompleter: ref.watch(cancellationProvider),
), ),
); );

View File

@ -3,12 +3,14 @@ import 'dart:io';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/models/store.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/domain/utils/background_sync.dart';
import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/interfaces/auth.interface.dart'; import 'package:immich_mobile/interfaces/auth.interface.dart';
import 'package:immich_mobile/interfaces/auth_api.interface.dart'; import 'package:immich_mobile/interfaces/auth_api.interface.dart';
import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart';
import 'package:immich_mobile/models/auth/login_response.model.dart'; import 'package:immich_mobile/models/auth/login_response.model.dart';
import 'package:immich_mobile/providers/api.provider.dart'; import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/repositories/auth.repository.dart'; import 'package:immich_mobile/repositories/auth.repository.dart';
import 'package:immich_mobile/repositories/auth_api.repository.dart'; import 'package:immich_mobile/repositories/auth_api.repository.dart';
import 'package:immich_mobile/services/api.service.dart'; import 'package:immich_mobile/services/api.service.dart';
@ -22,6 +24,7 @@ final authServiceProvider = Provider(
ref.watch(authRepositoryProvider), ref.watch(authRepositoryProvider),
ref.watch(apiServiceProvider), ref.watch(apiServiceProvider),
ref.watch(networkServiceProvider), ref.watch(networkServiceProvider),
ref.watch(backgroundSyncProvider),
), ),
); );
@ -30,6 +33,7 @@ class AuthService {
final IAuthRepository _authRepository; final IAuthRepository _authRepository;
final ApiService _apiService; final ApiService _apiService;
final NetworkService _networkService; final NetworkService _networkService;
final BackgroundSyncManager _backgroundSyncManager;
final _log = Logger("AuthService"); final _log = Logger("AuthService");
@ -38,6 +42,7 @@ class AuthService {
this._authRepository, this._authRepository,
this._apiService, this._apiService,
this._networkService, this._networkService,
this._backgroundSyncManager,
); );
/// Validates the provided server URL by resolving and setting the endpoint. /// Validates the provided server URL by resolving and setting the endpoint.
@ -117,6 +122,7 @@ class AuthService {
/// All deletions are executed in parallel using [Future.wait]. /// All deletions are executed in parallel using [Future.wait].
Future<void> clearLocalData() { Future<void> clearLocalData() {
return Future.wait([ return Future.wait([
_backgroundSyncManager.cancel(),
_authRepository.clearLocalData(), _authRepository.clearLocalData(),
Store.delete(StoreKey.currentUser), Store.delete(StoreKey.currentUser),
Store.delete(StoreKey.accessToken), Store.delete(StoreKey.accessToken),

View File

@ -1,20 +0,0 @@
// ignore_for_file: avoid-passing-async-when-sync-expected
import 'dart:async';
import 'package:async/async.dart';
import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart';
import 'package:immich_mobile/utils/isolate.dart';
class BackgroundSyncManager {
// This prevents multiple syncs from running at the same time
final _userSyncCache = AsyncCache.ephemeral();
BackgroundSyncManager();
Future<void> syncUsers() => _userSyncCache.fetch(
() => runInIsolate(
(ref) => ref.read(syncStreamServiceProvider).syncUsers(),
),
);
}

View File

@ -48,11 +48,15 @@ abstract final class Bootstrap {
); );
} }
static Future<void> initDomain(Isar db) async { static Future<void> initDomain(
Isar db, {
bool shouldBufferLogs = true,
}) async {
await StoreService.init(storeRepository: IsarStoreRepository(db)); await StoreService.init(storeRepository: IsarStoreRepository(db));
await LogService.init( await LogService.init(
logRepository: IsarLogRepository(db), logRepository: IsarLogRepository(db),
storeRepository: IsarStoreRepository(db), storeRepository: IsarStoreRepository(db),
shouldBuffer: shouldBufferLogs,
); );
} }
} }

View File

@ -1,66 +1,401 @@
// ignore_for_file: avoid-missing-completer-stack-trace
import 'dart:async'; import 'dart:async';
import 'dart:isolate'; import 'dart:isolate';
import 'dart:ui'; import 'dart:ui';
import 'package:flutter/material.dart'; import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart'; import 'package:flutter/services.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:logging/logging.dart'; import 'package:logging/logging.dart';
/// Exception thrown when IsolateManager is used incorrectly from a non-root isolate.
class InvalidIsolateUsageException implements Exception { class InvalidIsolateUsageException implements Exception {
const InvalidIsolateUsageException(); const InvalidIsolateUsageException();
@override @override
String toString() => String toString() =>
"IsolateHelper should only be used from the root isolate"; "IsolateManager should only be used from the root isolate";
} }
// !! Should be used only from the root isolate /// Exception thrown when an isolate computation is cancelled via [IsolateManager.cancel].
Future<T?> runInIsolate<T>( class IsolateCancelledException implements Exception {
FutureOr<T> Function(ProviderContainer ref) computation, { final String? debugLabel;
const IsolateCancelledException([this.debugLabel]);
@override
String toString() =>
"Isolate computation${debugLabel == null ? '' : ' ($debugLabel)'} was cancelled.";
}
/// Signature for the computation function to be executed in the isolate.
/// It receives a Riverpod [ProviderContainer] for dependency injection.
typedef Computation<T> = FutureOr<T> Function(ProviderContainer ref);
enum _IsolateCommand {
cancel,
start,
}
/// Manages running a [Computation] in a separate isolate with Riverpod support
/// and optional cancellation.
///
/// Similar to [compute] or [Isolate.run], but provides:
/// - A pre-configured Riverpod [ProviderContainer] to the computation.
/// - An optional `onCancel` computation for cleanup.
/// - A `cancel()` method to attempt cancellation.
class IsolateManager<T> {
final SendPort _sendPort;
final ReceivePort _receivePort;
final Completer<T?> _completer = Completer<T?>();
final String? _debugLabel;
StreamSubscription? _subscription;
bool _isClosed = false;
bool _isRunning = false;
IsolateManager._(this._sendPort, this._receivePort, this._debugLabel) {
_subscription = _receivePort.listen(
_handleResponseFromIsolate,
onDone: () {
// If the port closes before the completer finishes, signal an error.
if (_completer.isCompleted) {
return;
}
_completer.completeError(
RemoteError(
"Isolate terminated unexpectedly${_debugLabel == null ? '' : ' ($_debugLabel)'}",
"",
),
StackTrace.empty,
);
_isClosed = true;
},
);
}
/// Spawns a new isolate and prepares it to run the [computation].
///
/// - [computation]: The function to run in the isolate.
/// - [onCancel]: An optional function to run if `cancel()` is called before the computation completes.
/// - [debugLabel]: An optional label for debugging.
///
/// Must be called from the root isolate.
static Future<IsolateManager<T>> spawn<T>({
required Computation<T> computation,
Computation<T>? onCancel,
String? debugLabel, String? debugLabel,
}) async { }) async {
final Logger logger = Logger("IsolateManager");
// Port for receiving the [SendPort] from the new isolate.
final initPort = RawReceivePort();
// Completer to synchronize the establishment of the main communication channel.
final connection = Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
if (initialMessage == null) {
// onExit handler message, isolate terminated without sending a SendPort.
initPort.close();
connection.completeError(
RemoteError(
"Isolate exited unexpectedly during initialization${debugLabel == null ? '' : ' ($debugLabel)'}",
"",
),
StackTrace.empty,
);
return;
}
// The first message should be the SendPort for commands.
final commandPort = initialMessage as SendPort;
connection.complete(
(ReceivePort.fromRawReceivePort(initPort), commandPort),
);
};
final token = RootIsolateToken.instance; final token = RootIsolateToken.instance;
if (token == null) { if (token == null) {
initPort.close(); // Clean up before throwing
throw const InvalidIsolateUsageException(); throw const InvalidIsolateUsageException();
} }
return await Isolate.run(() async { try {
BackgroundIsolateBinaryMessenger.ensureInitialized(token); await Isolate.spawn(
DartPluginRegistrant.ensureInitialized(); _IsolateRunner._execute,
_IsolateRunner(
final db = await Bootstrap.initIsar(); computation: computation,
await Bootstrap.initDomain(db); sendPort: initPort.sendPort,
final ref = ProviderContainer( token: token,
overrides: [ onCancel: onCancel,
// TODO: Remove once isar is removed debugLabel: debugLabel,
dbProvider.overrideWithValue(db), ),
isarProvider.overrideWithValue(db), errorsAreFatal: true,
], onExit: initPort.sendPort, // Send null on exit
onError: initPort.sendPort, // Errors during spawn sent here
debugName: debugLabel,
); );
} catch (e, s) {
Logger log = Logger("IsolateLogger"); logger.warning(
"Failed to spawn isolate${debugLabel == null ? '' : ' - $debugLabel'}",
e,
s,
);
initPort.close();
rethrow;
}
try { try {
final result = await computation(ref); final (ReceivePort receivePort, SendPort sendPort) =
// Wait for isolate to end; i.e, logs to be flushed await connection.future;
await Future.delayed(Durations.short2); return IsolateManager._(sendPort, receivePort, debugLabel);
return result; } catch (e) {
// If connection.future completed with an error (e.g., isolate exited early)
logger.warning(
"Isolate connection failed${debugLabel == null ? '' : ' - $debugLabel'}",
e,
);
// initPort is closed by the handler in case of error/null message
rethrow;
}
}
/// Starts the computation in the isolate.
///
/// Returns a future that completes with the result of the computation,
/// or an error if the computation fails or is cancelled.
Future<T?> run() {
if (_isClosed) {
return Future.error(StateError("IsolateManager is already closed"));
}
if (_isRunning) {
return Future.error(
StateError("Isolate computation is already running"),
);
}
_isRunning = true;
_sendPort.send(_IsolateCommand.start);
return _completer.future;
}
/// Attempts to cancel the computation.
///
/// If the computation has not yet completed, this will cause the future
/// returned by [run] to complete with an [IsolateCancelledException].
/// An optional `onCancel` computation might be executed in the isolate.
///
/// Does nothing if the computation has already completed or the manager is closed.
void cancel() {
if (_isClosed || _completer.isCompleted) {
return;
}
if (!_isRunning) {
_close(IsolateCancelledException(_debugLabel));
return;
}
// If running, send cancel command. The isolate will handle sending back the error.
_sendPort.send(_IsolateCommand.cancel);
}
/// Closes communication channels and completes the future with the given error.
void _close([Object? error, StackTrace? stackTrace]) {
if (_isClosed) return;
_isClosed = true;
unawaited(_subscription?.cancel());
_receivePort.close();
if (!_completer.isCompleted) {
if (error == null) {
_completer.completeError(
StateError("IsolateManager closed without result or error."),
StackTrace.empty,
);
} else {
_completer.completeError(error, stackTrace);
}
}
}
/// Handles messages received from the isolate.
void _handleResponseFromIsolate(Object? response) {
if (_isClosed) return;
// Expect list: [result] or [error, stackTrace] or [IsolateCancelledException]
final list = response as List<Object?>;
Object? error;
StackTrace? stackTrace;
if (list.length == 2) {
error = list.firstOrNull;
final remoteStack = list.elementAtOrNull(1);
if (remoteStack is StackTrace) {
stackTrace = remoteStack;
} else if (error is String && remoteStack is String?) {
// Reconstruct RemoteError if possible
error = RemoteError(error, remoteStack ?? "");
stackTrace = (error as RemoteError).stackTrace;
}
} else if (list.length == 1) {
final result = list.firstOrNull;
if (result is IsolateCancelledException) {
error = result;
} else {
// Success case
if (!_completer.isCompleted) {
_completer.complete(result as T?);
}
_close();
return;
}
} else {
error = RemoteError(
"Invalid message format from isolate",
response.toString(),
);
}
// If we reached here, it's an error or cancellation
_close(error, stackTrace);
}
}
/// Internal helper class that runs within the isolate.
class _IsolateRunner<T> {
final Computation<T> _computation;
final Computation<T>? _onCancel;
final String? _debugLabel;
final RootIsolateToken _token;
final SendPort _sendPort; // Port to send results/errors back to main
ReceivePort? _receivePort; // Port to receive commands from main
bool _cancelled = false;
bool _computationStarted = false;
ProviderContainer? _ref; // Hold ref for cleanup
_IsolateRunner({
required Computation<T> computation,
required SendPort sendPort,
required RootIsolateToken token,
Computation<T>? onCancel,
String? debugLabel,
}) : _computation = computation,
_sendPort = sendPort,
_token = token,
_onCancel = onCancel,
_debugLabel = debugLabel;
/// Entry point for the isolate.
static void _execute(_IsolateRunner<Object?> runner) {
runner._start();
}
/// Initializes the isolate environment and listens for commands.
void _start() {
BackgroundIsolateBinaryMessenger.ensureInitialized(_token);
DartPluginRegistrant.ensureInitialized();
_receivePort = ReceivePort();
_sendPort.send(_receivePort!.sendPort);
// Listen for commands from the main isolate.
// ignore: avoid-passing-async-when-sync-expected
final _ = _receivePort!.listen((message) async {
if (_cancelled) return;
switch (message) {
case _IsolateCommand.cancel:
_cancelled = true;
// Run onCancel if computation hasn't started or finished
if (_onCancel == null) {
// Close the receive port and exit when no cleanup is needed
_sendPort.send([IsolateCancelledException(_debugLabel)]);
_cleanupAndExit();
} else {
await _runCleanup(_onCancel);
}
break;
case _IsolateCommand.start:
if (_computationStarted) return; // Ignore duplicate start
_computationStarted = true;
await _exec();
_cleanupAndExit();
break;
}
});
}
/// Executes the main computation.
Future<void> _exec() async {
Logger log = Logger("IsolateRunner");
try {
_ref = await _bootstrap();
if (_cancelled) return;
final potentiallyAsyncResult = _computation(_ref!);
final T result;
if (potentiallyAsyncResult is Future<T>) {
result = await potentiallyAsyncResult;
} else {
result = potentiallyAsyncResult;
}
if (_cancelled) return;
_sendPort.send([result]);
} catch (error, stack) { } catch (error, stack) {
if (_cancelled) return;
log.severe( log.severe(
"Error in runInIsolate${debugLabel == null ? '' : ' for $debugLabel'}", "Error in computation${_debugLabel == null ? '' : ' ($_debugLabel)'}",
error, error,
stack, stack,
); );
_sendPort.send([error, stack]);
}
// Cleanup happens in _cleanupAndExit called by the listener
}
/// Executes the onCancel computation.
Future<void> _runCleanup(Computation<T> cleanupFunc) async {
Logger log = Logger("IsolateRunner");
try {
if (_ref == null) {
log.warning("IsolateRunner cleanup called without ref");
return;
}
await cleanupFunc(_ref!);
} catch (e, s) {
log.warning("Error during isolate onCancel cleanup", e, s);
// Don't send this error back, primary goal is cancellation signal
}
}
Future<ProviderContainer> _bootstrap() async {
final db = await Bootstrap.initIsar();
await Bootstrap.initDomain(db, shouldBufferLogs: false);
return ProviderContainer(
overrides: [
dbProvider.overrideWithValue(db),
isarProvider.overrideWithValue(db),
cancellationProvider.overrideWithValue(Completer()),
],
);
}
/// Closes resources and the receive port.
Future<void> _cleanupAndExit() async {
// Always close the db connections and dispose ref if created
try {
await _ref?.read(driftProvider).close();
await _ref?.read(isarProvider).close();
_ref?.dispose();
} catch (e) {
if (kDebugMode) {
print("Error during resource cleanup: $e");
}
} finally { } finally {
// Always close the new db connection on Isolate end _receivePort?.close();
ref.read(driftProvider).close(); }
ref.read(dbProvider).close();
ref.read(isarProvider).close();
} }
return null;
});
} }

View File

@ -1,7 +1,10 @@
import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/domain/services/store.service.dart';
import 'package:immich_mobile/domain/services/user.service.dart'; import 'package:immich_mobile/domain/services/user.service.dart';
import 'package:immich_mobile/domain/utils/background_sync.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
class MockStoreService extends Mock implements StoreService {} class MockStoreService extends Mock implements StoreService {}
class MockUserService extends Mock implements UserService {} class MockUserService extends Mock implements UserService {}
class MockBackgroundSyncManager extends Mock implements BackgroundSyncManager {}

View File

@ -7,6 +7,7 @@ import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart'; import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:immich_mobile/domain/utils/cancel.exception.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
@ -57,7 +58,9 @@ void main() {
}); });
tearDown(() async { tearDown(() async {
if (!streamController.isClosed) {
await streamController.close(); await streamController.close();
}
}); });
// Helper to trigger sync and add events to the stream // Helper to trigger sync and add events to the stream
@ -83,8 +86,8 @@ void main() {
await expectLater(future, completes); await expectLater(future, completes);
// Verify ack includes last ack from each successfully handled type // Verify ack includes last ack from each successfully handled type
verify( verify(
() => mockSyncApiRepo () =>
.ack(any(that: containsAllInOrder(["5", "2", "4", "3"]))), mockSyncApiRepo.ack(any(that: containsAll(["5", "2", "4", "3"]))),
).called(1); ).called(1);
}, },
); );
@ -159,9 +162,8 @@ void main() {
await triggerSyncAndEmit(events); await triggerSyncAndEmit(events);
// Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4") // Expect ack only for userDeleteV1 (ack: "2") and partnerDeleteV1 (ack: "4")
verify( verify(() => mockSyncApiRepo.ack(any(that: containsAll(["2", "4"]))))
() => mockSyncApiRepo.ack(any(that: containsAllInOrder(["2", "4"]))), .called(1);
).called(1);
}); });
test("does not process or ack when stream emits an empty list", () async { test("does not process or ack when stream emits an empty list", () async {
@ -183,15 +185,19 @@ void main() {
int callOrder = 0; int callOrder = 0;
int handler1StartOrder = -1; int handler1StartOrder = -1;
int handler2StartOrder = -1; int handler2StartOrder = -1;
int handler1Calls = 0;
int handler2Calls = 0;
when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async { when(() => mockSyncStreamRepo.updateUsersV1(any())).thenAnswer((_) async {
handler1StartOrder = ++callOrder; // Record when handler 1 starts handler1Calls++;
await completer1.future; // Wait for external signal handler1StartOrder = ++callOrder;
await completer1.future;
return true; return true;
}); });
when(() => mockSyncStreamRepo.updatePartnerV1(any())) when(() => mockSyncStreamRepo.updatePartnerV1(any()))
.thenAnswer((_) async { .thenAnswer((_) async {
handler2StartOrder = ++callOrder; // Record when handler 2 starts handler2Calls++;
handler2StartOrder = ++callOrder;
await completer2.future; await completer2.future;
return true; return true;
}); });
@ -200,44 +206,98 @@ void main() {
final batch2 = SyncStreamStub.partnerEvents; final batch2 = SyncStreamStub.partnerEvents;
final syncFuture = sut.syncUsers(); final syncFuture = sut.syncUsers();
streamController.add(batch1); // Emit first batch await pumpEventQueue();
streamController.add(batch1);
await pumpEventQueue();
// Small delay to ensure the first handler starts
await Future.delayed(const Duration(milliseconds: 20));
// Ensure first batch processing starts
await Future.delayed(const Duration(milliseconds: 10));
expect(handler1StartOrder, 1, reason: "Handler 1 should start first"); expect(handler1StartOrder, 1, reason: "Handler 1 should start first");
expect(handler1Calls, 1);
streamController.add(batch2); // Emit second batch while first is waiting streamController.add(batch2);
await pumpEventQueue();
// Small delay
await Future.delayed(const Duration(milliseconds: 20));
await Future.delayed(const Duration(milliseconds: 10)); expect(handler2StartOrder, -1, reason: "Handler 2 should wait");
// Handler 2 should NOT have started yet because mutex is held by handler 1 expect(handler2Calls, 0);
expect(
handler2StartOrder,
-1,
reason: "Handler 2 should wait for Handler 1",
);
completer1.complete(); // Allow first handler to finish completer1.complete();
// Allow second batch to start processing await pumpEventQueue(times: 40);
await Future.delayed(const Duration(milliseconds: 10)); // Small delay to ensure the second handler starts
await Future.delayed(const Duration(milliseconds: 20));
// Now handler 2 should start expect(handler2StartOrder, 2, reason: "Handler 2 should start after H1");
expect( expect(handler2Calls, 1);
handler2StartOrder,
2,
reason: "Handler 2 should start after Handler 1 finishes",
);
completer2.complete(); // Allow second handler to finish completer2.complete();
await streamController.close(); // Close stream await pumpEventQueue(times: 40);
await syncFuture; // Wait for overall completion // Small delay before closing the stream
await Future.delayed(const Duration(milliseconds: 20));
if (!streamController.isClosed) {
await streamController.close();
}
await pumpEventQueue(times: 40);
// Small delay to ensure the sync completes
await Future.delayed(const Duration(milliseconds: 20));
await syncFuture;
// Verify handlers were called and acks sent for both batches eventually
verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1); verify(() => mockSyncStreamRepo.updateUsersV1(any())).called(1);
verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1); verify(() => mockSyncStreamRepo.updatePartnerV1(any())).called(1);
// Acks might be called separately or combined depending on timing, check count
verify(() => mockSyncApiRepo.ack(any())).called(2); verify(() => mockSyncApiRepo.ack(any())).called(2);
}); });
test(
"stops processing and ack when cancel completer is completed",
() async {
final cancelCompleter = Completer<bool>();
sut = SyncStreamService(
syncApiRepository: mockSyncApiRepo,
syncStreamRepository: mockSyncStreamRepo,
cancelCompleter: cancelCompleter,
);
final processingCompleter = Completer<void>();
bool handlerStarted = false;
// Make handler wait so we can cancel it mid-flight
when(() => mockSyncStreamRepo.updateUsersV1(any()))
.thenAnswer((_) async {
handlerStarted = true;
await processingCompleter
.future; // Wait indefinitely until test completes it
return true;
});
final syncFuture = sut.syncUsers();
await pumpEventQueue(times: 30);
streamController.add(SyncStreamStub.userEvents);
// Ensure processing starts
await Future.delayed(const Duration(milliseconds: 10));
expect(handlerStarted, isTrue, reason: "Handler should have started");
cancelCompleter.complete(true); // Signal cancellation
// Allow cancellation logic to propagate
await Future.delayed(const Duration(milliseconds: 10));
// Complete the handler's completer after cancellation signal
// to ensure the cancellation logic itself isn't blocked by the handler.
processingCompleter.complete();
await expectLater(syncFuture, throwsA(isA<CancelException>()));
// Verify that ack was NOT called because processing was cancelled
verifyNever(() => mockSyncApiRepo.ack(any()));
},
);
test("completes successfully when ack call throws an exception", () async { test("completes successfully when ack call throws an exception", () async {
when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error")); when(() => mockSyncApiRepo.ack(any())).thenThrow(Exception("Ack Error"));
final events = [ final events = [

View File

@ -8,6 +8,7 @@ import 'package:isar/isar.dart';
import 'package:mocktail/mocktail.dart'; import 'package:mocktail/mocktail.dart';
import 'package:openapi/api.dart'; import 'package:openapi/api.dart';
import '../domain/service.mock.dart';
import '../repository.mocks.dart'; import '../repository.mocks.dart';
import '../service.mocks.dart'; import '../service.mocks.dart';
import '../test_utils.dart'; import '../test_utils.dart';
@ -18,6 +19,7 @@ void main() {
late MockAuthRepository authRepository; late MockAuthRepository authRepository;
late MockApiService apiService; late MockApiService apiService;
late MockNetworkService networkService; late MockNetworkService networkService;
late MockBackgroundSyncManager backgroundSyncManager;
late Isar db; late Isar db;
setUp(() async { setUp(() async {
@ -25,12 +27,14 @@ void main() {
authRepository = MockAuthRepository(); authRepository = MockAuthRepository();
apiService = MockApiService(); apiService = MockApiService();
networkService = MockNetworkService(); networkService = MockNetworkService();
backgroundSyncManager = MockBackgroundSyncManager();
sut = AuthService( sut = AuthService(
authApiRepository, authApiRepository,
authRepository, authRepository,
apiService, apiService,
networkService, networkService,
backgroundSyncManager,
); );
registerFallbackValue(Uri()); registerFallbackValue(Uri());
@ -116,24 +120,28 @@ void main() {
group('logout', () { group('logout', () {
test('Should logout user', () async { test('Should logout user', () async {
when(() => authApiRepository.logout()).thenAnswer((_) async => {}); when(() => authApiRepository.logout()).thenAnswer((_) async => {});
when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {});
when(() => authRepository.clearLocalData()) when(() => authRepository.clearLocalData())
.thenAnswer((_) => Future.value(null)); .thenAnswer((_) => Future.value(null));
await sut.logout(); await sut.logout();
verify(() => authApiRepository.logout()).called(1); verify(() => authApiRepository.logout()).called(1);
verify(() => backgroundSyncManager.cancel()).called(1);
verify(() => authRepository.clearLocalData()).called(1); verify(() => authRepository.clearLocalData()).called(1);
}); });
test('Should clear local data even on server error', () async { test('Should clear local data even on server error', () async {
when(() => authApiRepository.logout()) when(() => authApiRepository.logout())
.thenThrow(Exception('Server error')); .thenThrow(Exception('Server error'));
when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {});
when(() => authRepository.clearLocalData()) when(() => authRepository.clearLocalData())
.thenAnswer((_) => Future.value(null)); .thenAnswer((_) => Future.value(null));
await sut.logout(); await sut.logout();
verify(() => authApiRepository.logout()).called(1); verify(() => authApiRepository.logout()).called(1);
verify(() => backgroundSyncManager.cancel()).called(1);
verify(() => authRepository.clearLocalData()).called(1); verify(() => authRepository.clearLocalData()).called(1);
}); });
}); });