From f1f814240944220d913ca45318b2ce5bea5d2f0e Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Thu, 28 May 2026 21:24:06 -0400 Subject: [PATCH] event-based cancellation wire hash cancellation await cleanup remove forced kill add regression tests abort sync requests fix cleanup ordering in teardown exit isolate test background sync test sigabrt crash cleanup --- .../background_sync_teardown_test.dart | 154 +++++++++++++++++ .../test_utils/fake_immich_server.dart | 115 ++++++++++++ .../Runner/Background/BackgroundWorker.swift | 10 +- .../services/background_worker.service.dart | 20 +-- mobile/lib/domain/services/hash.service.dart | 14 +- .../domain/services/local_sync.service.dart | 18 ++ mobile/lib/domain/services/log.service.dart | 12 +- mobile/lib/domain/services/store.service.dart | 6 + .../services/sync_linked_album.service.dart | 16 +- .../domain/services/sync_stream.service.dart | 23 ++- mobile/lib/domain/utils/background_sync.dart | 56 ++---- .../lib/domain/utils/migrate_cloud_ids.dart | 38 +++- .../repositories/sync_api.repository.dart | 3 +- .../infrastructure/cancel.provider.dart | 7 +- .../infrastructure/sync.provider.dart | 4 +- .../drift_album_api_repository.dart | 10 +- mobile/lib/utils/isolate.dart | 66 +++---- mobile/lib/utils/isolate_worker.dart | 163 ++++++++++++++++++ mobile/lib/wm_executor.dart | 140 ++++----------- .../services/sync_stream_service_test.dart | 32 ++-- .../test/unit/utils/isolate_worker_test.dart | 23 +++ 21 files changed, 666 insertions(+), 264 deletions(-) create mode 100644 mobile/integration_test/background_sync_teardown_test.dart create mode 100644 mobile/integration_test/test_utils/fake_immich_server.dart create mode 100644 mobile/lib/utils/isolate_worker.dart create mode 100644 mobile/test/unit/utils/isolate_worker_test.dart diff --git a/mobile/integration_test/background_sync_teardown_test.dart b/mobile/integration_test/background_sync_teardown_test.dart new file mode 100644 index 0000000000..0f125b7fcc --- /dev/null +++ b/mobile/integration_test/background_sync_teardown_test.dart @@ -0,0 +1,154 @@ +import 'dart:async'; + +import 'package:drift/drift.dart' show Value; +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/domain/models/store.model.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; +import 'package:immich_mobile/entities/store.entity.dart'; +import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:immich_mobile/main.dart' as app; +import 'package:immich_mobile/services/api.service.dart'; +import 'package:immich_mobile/utils/bootstrap.dart'; +import 'package:immich_mobile/wm_executor.dart'; +import 'package:integration_test/integration_test.dart'; +import 'package:openapi/api.dart'; + +import 'test_utils/fake_immich_server.dart'; + +void main() { + final binding = IntegrationTestWidgetsFlutterBinding.ensureInitialized(); + // These tests do real I/O without pumping a widget tree, so disable the fake async clock + binding.framePolicy = LiveTestWidgetsFlutterBindingFramePolicy.fullyLive; + + late Drift drift; + late FakeImmichServer server; + + setUpAll(() async { + await app.initApp(); + (drift, _) = await Bootstrap.initDomain(); + }); + + setUp(() async { + await workerManagerPatch.init(dynamicSpawning: true); + server = await FakeImmichServer.start(); + await ApiService().resolveAndSetEndpoint(server.endpoint); + await drift.delete(drift.userEntity).go(); + await Store.delete(StoreKey.syncMigrationStatus); + }); + + tearDown(() async { + await workerManagerPatch.dispose(); + await server.close(); + await Store.delete(StoreKey.serverEndpoint); + await Store.delete(StoreKey.syncMigrationStatus); + }); + + void sendUser(SyncStream stream, String id, String name) { + stream.send( + type: SyncEntityType.userV1.value, + data: SyncUserV1( + id: id, + name: name, + email: '$id@test.com', + hasProfileImage: false, + deletedAt: null, + profileChangedAt: DateTime.utc(2025), + ).toJson(), + ack: id, + ); + } + + Future dbReadable() async { + try { + await drift.customSelect('SELECT 1').get().timeout(const Duration(seconds: 5)); + return true; + } catch (_) { + return false; + } + } + + Future userCount() async => (await drift.select(drift.userEntity).get()).length; + + // Starts a remote sync and resolves once its /sync/stream request is open. + Future<(Future, SyncStream)> startSync() async { + final sync = BackgroundSyncManager().syncRemote(); + final stream = await server.streamOpened.timeout( + const Duration(seconds: 30), + onTimeout: () => fail('sync isolate never opened /sync/stream'), + ); + return (sync, stream); + } + + testWidgets('a full sync ingests streamed events into the shared DB', (tester) async { + expect(await userCount(), 0); + + final (sync, stream) = await startSync(); + + sendUser(stream, 'u1', 'Alice'); + sendUser(stream, 'u2', 'Bob'); + await stream.close(); + + final result = await sync.timeout( + const Duration(seconds: 30), + onTimeout: () => fail('sync did not complete after the stream ended'), + ); + expect(result, isTrue); + expect(await userCount(), 2); + expect(server.ackRequests, greaterThan(0)); + }); + + testWidgets('disposing the pool during an in-flight sync drains promptly', (tester) async { + final (sync, _) = await startSync(); + + final sw = Stopwatch()..start(); + await workerManagerPatch.dispose().timeout( + const Duration(seconds: 15), + onTimeout: () => fail('dispose() hung — worker did not drain and exit'), + ); + expect(sw.elapsed, lessThan(const Duration(seconds: 10)), reason: 'abort-driven, not socket-timeout bound'); + + expect(await sync.timeout(const Duration(seconds: 5), onTimeout: () => false), isFalse); + }); + + testWidgets('tearing down a worker blocked mid-write leaves the DB usable', (tester) async { + final (sync, stream) = await startSync(); + + // Hold an exclusive write transaction so the worker's write is blocked. The lock is taken only + // after the stream opens to avoid blocking the worker's own startup DB reads. + final releaseTxn = Completer(); + final txnHeld = Completer(); + final txn = drift.transaction(() async { + await drift.into(drift.userEntity).insert( + UserEntityCompanion.insert( + id: 'holder', + name: 'holder', + email: 'holder@test.com', + hasProfileImage: const Value(false), + profileChangedAt: Value(DateTime.utc(2025)), + ), + ); + txnHeld.complete(); + await releaseTxn.future; + }); + await txnHeld.future; + + sendUser(stream, 'u1', 'Alice'); + await stream.close(); + + // dispose() can only finish once the worker unwinds, which is blocked on the + // lock — so start it, release the lock, then await completion. + final disposed = workerManagerPatch.dispose(); + releaseTxn.complete(); + await txn; + await disposed.timeout( + const Duration(seconds: 15), + onTimeout: () => fail('dispose() hung after releasing the write lock'), + ); + await sync.timeout(const Duration(seconds: 5), onTimeout: () => false); + + expect(await dbReadable(), isTrue); + final users = await drift.select(drift.userEntity).get(); + expect(users.map((u) => u.id), contains('holder')); + }); +} diff --git a/mobile/integration_test/test_utils/fake_immich_server.dart b/mobile/integration_test/test_utils/fake_immich_server.dart new file mode 100644 index 0000000000..c434f83bc5 --- /dev/null +++ b/mobile/integration_test/test_utils/fake_immich_server.dart @@ -0,0 +1,115 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +/// A dummy localhost server that implements only the endpoints that remote-sync touches. +class FakeImmichServer { + FakeImmichServer._(this._server, this.version); + + final HttpServer _server; + final (int, int, int) version; + + final Completer _streamOpened = Completer(); + + int ackRequests = 0; + + String get endpoint => 'http://${_server.address.host}:${_server.port}/api'; + + /// Resolves when the sync isolate opens `POST /sync/stream`. + Future get streamOpened => _streamOpened.future; + + static Future start({(int, int, int) version = (3, 0, 0)}) async { + final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); + final fake = FakeImmichServer._(server, version); + fake._listen(); + return fake; + } + + void _listen() { + // A connection torn down mid-write during teardown is expected + _server.listen((request) => unawaited(_route(request).catchError((_) {}))); + } + + Future _route(HttpRequest request) async { + final method = request.method; + final path = request.uri.path; + + if (method == 'GET' && path == '/api/server/ping') { + return _respondJson(request, {'res': 'pong'}); + } + if (method == 'GET' && path == '/api/server/version') { + final (major, minor, patch) = version; + return _respondJson(request, {'major': major, 'minor': minor, 'patch': patch}); + } + if (path == '/api/sync/ack') { + if (method != 'DELETE') { + ackRequests++; + } + return _respondEmpty(request); + } + if (method == 'POST' && path == '/api/sync/stream') { + return _openSyncStream(request); + } + return _respondEmpty(request, status: HttpStatus.notFound); + } + + Future _openSyncStream(HttpRequest request) async { + await request.drain(); + request.response + ..statusCode = HttpStatus.ok + ..headers.contentType = ContentType('application', 'jsonlines+json') + ..contentLength = -1 // chunked: stays open to stream incrementally + ..bufferOutput = false; + // Flush headers so the client's send() resolves and enters its read loop. + await request.response.flush(); + if (!_streamOpened.isCompleted) { + _streamOpened.complete(SyncStream._(request.response)); + } + } + + Future _respondJson(HttpRequest request, Object body) async { + await request.drain(); + request.response + ..statusCode = HttpStatus.ok + ..headers.contentType = ContentType.json + ..write(jsonEncode(body)); + await request.response.close(); + } + + Future _respondEmpty(HttpRequest request, {int status = HttpStatus.ok}) async { + await request.drain(); + request.response.statusCode = status; + await request.response.close(); + } + + Future close() async { + if (_streamOpened.isCompleted) { + await (await _streamOpened.future).close(); + } + await _server.close(force: true); + } +} + +/// Handle to the open `/sync/stream` response: push jsonlines events, then end. +class SyncStream { + SyncStream._(this._response); + + final HttpResponse _response; + bool _closed = false; + + /// [data] should be a Sync*V1 DTO's `toJson()` so the parser's `fromJson` round-trips it. + void send({required String type, required Object data, required String ack}) { + if (_closed) { + return; + } + _response.write('${jsonEncode({'type': type, 'data': data, 'ack': ack})}\n'); + } + + Future close() async { + if (_closed) { + return; + } + _closed = true; + await _response.close(); + } +} diff --git a/mobile/ios/Runner/Background/BackgroundWorker.swift b/mobile/ios/Runner/Background/BackgroundWorker.swift index c5b5e1778a..ad583065f0 100644 --- a/mobile/ios/Runner/Background/BackgroundWorker.swift +++ b/mobile/ios/Runner/Background/BackgroundWorker.swift @@ -121,8 +121,8 @@ class BackgroundWorker: BackgroundWorkerBgHostApi { /** * Cancels the currently running background task, either due to timeout or external request. - * Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure - * the completion handler is eventually called even if Flutter doesn't respond. + * Only tears down the engine after Dart confirms it's drained. If Dart overruns iOS's grace window, + * the expiration handler still calls setTaskCompleted and iOS suspends us. */ func close() { if isComplete { @@ -132,12 +132,6 @@ class BackgroundWorker: BackgroundWorkerBgHostApi { flutterApi?.cancel { result in self.complete(success: false) } - - // Fallback safety mechanism: ensure completion is called within 2 seconds - // This prevents the background task from hanging indefinitely if Flutter doesn't respond - Timer.scheduledTimer(withTimeInterval: 2, repeats: false) { _ in - self.complete(success: false) - } } diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index d28f7ff14b..eadcf7c9db 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -188,20 +188,14 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { if (!_cancellationToken.isCompleted) { _cancellationToken.complete(); } - final cleanupFutures = [ - nativeSyncApi?.cancelHashing(), - workerManagerPatch.dispose().catchError((_) async { - // Discard any errors on the dispose call - return; - }), - LogService.I.dispose(), - Store.dispose(), - backgroundSyncManager?.cancel(), - _drift.optimize(allTables: true), - ]; - - await Future.wait(cleanupFutures.nonNulls); + // Workers share one sqlite connection, so DB teardown must wait until every worker has stopped using it. + await Future.wait([ + if (backgroundSyncManager != null) backgroundSyncManager.cancel(), + if (nativeSyncApi != null) nativeSyncApi.cancelHashing(), + ]); + await workerManagerPatch.dispose().catchError((_) async {}); + await Future.wait([LogService.I.dispose(), Store.dispose(), _drift.optimize(allTables: true)]); await _drift.close(); await _driftLogger.close(); diff --git a/mobile/lib/domain/services/hash.service.dart b/mobile/lib/domain/services/hash.service.dart index e2938a79ad..18b9d35b3b 100644 --- a/mobile/lib/domain/services/hash.service.dart +++ b/mobile/lib/domain/services/hash.service.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:flutter/services.dart'; import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/domain/models/album/local_album.model.dart'; @@ -17,7 +19,7 @@ class HashService { final DriftLocalAssetRepository _localAssetRepository; final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository; final NativeSyncApi _nativeSyncApi; - final bool Function()? _cancelChecker; + final Completer? _cancellation; final _log = Logger('HashService'); HashService({ @@ -25,11 +27,15 @@ class HashService { required this._localAssetRepository, required this._trashedLocalAssetRepository, required this._nativeSyncApi, - this._cancelChecker, + this._cancellation, int? batchSize, - }) : _batchSize = batchSize ?? kBatchHashFileLimit; + }) : _batchSize = batchSize ?? kBatchHashFileLimit { + // Stop the in-flight native hash call promptly on cancellation; the loops + // below also observe [isCancelled] to bail between batches. + _cancellation?.future.then((_) => _nativeSyncApi.cancelHashing()); + } - bool get isCancelled => _cancelChecker?.call() ?? false; + bool get isCancelled => _cancellation?.isCompleted ?? false; Future hashAssets() async { _log.info("Starting hashing of assets"); diff --git a/mobile/lib/domain/services/local_sync.service.dart b/mobile/lib/domain/services/local_sync.service.dart index 77ded0ba4d..889b21c071 100644 --- a/mobile/lib/domain/services/local_sync.service.dart +++ b/mobile/lib/domain/services/local_sync.service.dart @@ -25,6 +25,7 @@ class LocalSyncService { final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository; final AssetMediaRepository _assetMediaRepository; final IPermissionRepository _permissionRepository; + final Completer? _cancellation; final Logger _log = Logger("DeviceSyncService"); LocalSyncService({ @@ -34,8 +35,11 @@ class LocalSyncService { required this._trashedLocalAssetRepository, required this._assetMediaRepository, required this._permissionRepository, + this._cancellation, }); + bool get _isCancelled => _cancellation?.isCompleted ?? false; + Future sync({bool full = false}) async { final Stopwatch stopwatch = Stopwatch()..start(); try { @@ -81,6 +85,10 @@ class LocalSyncService { // detect album deletions from the native side if (CurrentPlatform.isAndroid) { for (final album in dbAlbums) { + if (_isCancelled) { + _log.warning("Local sync cancelled. Stopped processing albums."); + return; + } final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id); await _localAlbumRepository.syncDeletes(album.id, deviceIds); } @@ -91,6 +99,10 @@ class LocalSyncService { // does not include changes for cloud albums. final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums(); for (final album in cloudAlbums) { + if (_isCancelled) { + _log.warning("Local sync cancelled. Stopped processing cloud albums."); + return; + } final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id); if (dbAlbum == null) { _log.warning("Cloud album ${album.name} not found in local database. Skipping sync."); @@ -135,6 +147,9 @@ class LocalSyncService { } Future addAlbum(LocalAlbum album) async { + if (_isCancelled) { + return; + } try { _log.fine("Adding device album ${album.name}"); @@ -162,6 +177,9 @@ class LocalSyncService { // The deviceAlbum is ignored since we are going to refresh it anyways FutureOr updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum) async { + if (_isCancelled) { + return false; + } try { _log.fine("Syncing device album ${dbAlbum.name}"); diff --git a/mobile/lib/domain/services/log.service.dart b/mobile/lib/domain/services/log.service.dart index 216f030b12..b612b3ce91 100644 --- a/mobile/lib/domain/services/log.service.dart +++ b/mobile/lib/domain/services/log.service.dart @@ -112,10 +112,16 @@ class LogService { return _flushBuffer(); } - Future dispose() { + Future dispose() async { _flushTimer?.cancel(); - _logSubscription.cancel(); - return _flushBuffer(); + _flushTimer = null; + await _logSubscription.cancel(); + await _flushBuffer(); + // Allow a subsequent init() (e.g. when a worker isolate is reused) to + // create a fresh instance instead of returning this disposed one. + if (identical(_instance, this)) { + _instance = null; + } } Future _flushBuffer() async { diff --git a/mobile/lib/domain/services/store.service.dart b/mobile/lib/domain/services/store.service.dart index 16ed64e6d3..758622a43b 100644 --- a/mobile/lib/domain/services/store.service.dart +++ b/mobile/lib/domain/services/store.service.dart @@ -54,7 +54,13 @@ class StoreService { /// Disposes the store and cancels the subscription. To reuse the store call init() again Future dispose() async { await _storeUpdateSubscription?.cancel(); + _storeUpdateSubscription = null; _cache.clear(); + // Allow a subsequent init() (e.g. when a worker isolate is reused) to + // create a fresh instance instead of returning this disposed one. + if (identical(_instance, this)) { + _instance = null; + } } /// Returns the cached value for [key], or `null` diff --git a/mobile/lib/domain/services/sync_linked_album.service.dart b/mobile/lib/domain/services/sync_linked_album.service.dart index 3bc76083b8..ddcd6721d7 100644 --- a/mobile/lib/domain/services/sync_linked_album.service.dart +++ b/mobile/lib/domain/services/sync_linked_album.service.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/album/local_album.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; @@ -5,6 +7,7 @@ import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/remote_album.repository.dart'; import 'package:immich_mobile/providers/infrastructure/album.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/store.provider.dart'; import 'package:immich_mobile/repositories/drift_album_api_repository.dart'; import 'package:immich_mobile/utils/debug_print.dart'; @@ -16,6 +19,7 @@ final syncLinkedAlbumServiceProvider = Provider( ref.watch(remoteAlbumRepository), ref.watch(driftAlbumApiRepositoryProvider), ref.watch(storeServiceProvider), + cancellation: ref.watch(cancellationProvider), ), ); @@ -24,13 +28,15 @@ class SyncLinkedAlbumService { final DriftRemoteAlbumRepository _remoteAlbumRepository; final DriftAlbumApiRepository _albumApiRepository; final StoreService _storeService; + final Completer? _cancellation; SyncLinkedAlbumService( this._localAlbumRepository, this._remoteAlbumRepository, this._albumApiRepository, - this._storeService, - ); + this._storeService, { + this._cancellation, + }); final _log = Logger("SyncLinkedAlbumService"); @@ -55,7 +61,11 @@ class SyncLinkedAlbumService { final assetIds = await _remoteAlbumRepository.getLinkedAssetIds(userId, localAlbum.id, linkedRemoteAlbumId); _log.fine("Syncing ${assetIds.length} assets to remote album: ${remoteAlbum.name}"); if (assetIds.isNotEmpty) { - final album = await _albumApiRepository.addAssets(remoteAlbum.id, assetIds); + final album = await _albumApiRepository.addAssets( + remoteAlbum.id, + assetIds, + abortTrigger: _cancellation?.future, + ); await _remoteAlbumRepository.addAssets(remoteAlbum.id, album.added); } }), diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 200dca2418..08109b25d3 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -38,7 +38,7 @@ class SyncStreamService { final IPermissionRepository _permissionRepository; final SyncMigrationRepository _syncMigrationRepository; final ApiService _api; - final bool Function()? _cancelChecker; + final Completer? _cancellation; SyncStreamService({ required this._syncApiRepository, @@ -49,10 +49,10 @@ class SyncStreamService { required this._permissionRepository, required this._syncMigrationRepository, required this._api, - this._cancelChecker, + this._cancellation, }); - bool get isCancelled => _cancelChecker?.call() ?? false; + bool get isCancelled => _cancellation?.isCompleted ?? false; Future sync() async { _logger.info("Remote sync request for user"); @@ -80,10 +80,15 @@ class SyncStreamService { _handleEvents, serverVersion: serverSemVer, onReset: () => shouldReset = true, + abortSignal: _cancellation?.future, ); if (shouldReset) { _logger.info("Resetting sync state as requested by server"); - await _syncApiRepository.streamChanges(_handleEvents, serverVersion: serverSemVer); + await _syncApiRepository.streamChanges( + _handleEvents, + serverVersion: serverSemVer, + abortSignal: _cancellation?.future, + ); } previousLength = migrations.length; @@ -318,7 +323,7 @@ class SyncStreamService { } Future handleWsAssetUploadReadyV1Batch(List batchData) async { - if (batchData.isEmpty) { + if (batchData.isEmpty || isCancelled) { return; } @@ -361,7 +366,7 @@ class SyncStreamService { } Future handleWsAssetUploadReadyV2Batch(List batchData) async { - if (batchData.isEmpty) { + if (batchData.isEmpty || isCancelled) { return; } @@ -404,6 +409,9 @@ class SyncStreamService { } Future handleWsAssetEditReadyV1(dynamic data) async { + if (isCancelled) { + return; + } _logger.info('Processing AssetEditReadyV1 event'); try { @@ -444,6 +452,9 @@ class SyncStreamService { } Future handleWsAssetEditReadyV2(dynamic data) async { + if (isCancelled) { + return; + } _logger.info('Processing AssetEditReadyV2 event'); try { diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart index 030e77cd54..82f397d9b6 100644 --- a/mobile/lib/domain/utils/background_sync.dart +++ b/mobile/lib/domain/utils/background_sync.dart @@ -50,53 +50,27 @@ class BackgroundSyncManager { }); Future cancel() async { - final futures = []; - - if (_syncTask != null) { - futures.add(_syncTask!.future); + final tasks = [ + _syncTask, + _syncWebsocketTask, + _cloudIdSyncTask, + _linkedAlbumSyncTask, + _deviceAlbumSyncTask, + _hashTask, + ]; + final futures = [ + for (final task in tasks) + if (task != null) task.future, + ]; + for (final task in tasks) { + task?.cancel(); } - _syncTask?.cancel(); _syncTask = null; - - if (_syncWebsocketTask != null) { - futures.add(_syncWebsocketTask!.future); - } - _syncWebsocketTask?.cancel(); _syncWebsocketTask = null; - - if (_cloudIdSyncTask != null) { - futures.add(_cloudIdSyncTask!.future); - } - _cloudIdSyncTask?.cancel(); _cloudIdSyncTask = null; - - if (_linkedAlbumSyncTask != null) { - futures.add(_linkedAlbumSyncTask!.future); - } - _linkedAlbumSyncTask?.cancel(); _linkedAlbumSyncTask = null; - - try { - await Future.wait(futures); - } on CanceledError { - // Ignore cancellation errors - } - } - - Future cancelLocal() async { - final futures = []; - - if (_hashTask != null) { - futures.add(_hashTask!.future); - } - _hashTask?.cancel(); - _hashTask = null; - - if (_deviceAlbumSyncTask != null) { - futures.add(_deviceAlbumSyncTask!.future); - } - _deviceAlbumSyncTask?.cancel(); _deviceAlbumSyncTask = null; + _hashTask = null; try { await Future.wait(futures); diff --git a/mobile/lib/domain/utils/migrate_cloud_ids.dart b/mobile/lib/domain/utils/migrate_cloud_ids.dart index 32188b4838..efef6e8327 100644 --- a/mobile/lib/domain/utils/migrate_cloud_ids.dart +++ b/mobile/lib/domain/utils/migrate_cloud_ids.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:drift/drift.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; @@ -9,6 +11,7 @@ import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart'; import 'package:immich_mobile/platform/native_sync_api.g.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/sync.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; @@ -51,9 +54,10 @@ Future syncCloudIds(ProviderContainer ref) async { } final assetApi = ref.read(apiServiceProvider).assetsApi; + final cancellation = ref.read(cancellationProvider); // Process cloud IDs in paginated batches - await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger); + await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger, cancellation); } Future _processCloudIdMappingsInBatches( @@ -62,12 +66,17 @@ Future _processCloudIdMappingsInBatches( AssetsApi assetsApi, bool canBulkUpdate, Logger logger, + Completer cancellation, ) async { const pageSize = 20000; String? lastLocalId; final seenRemoteAssetIds = {}; while (true) { + if (cancellation.isCompleted) { + logger.warning('Cloud ID migration cancelled. Stopping batch processing.'); + break; + } final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId); if (mappings.isEmpty) { break; @@ -98,9 +107,9 @@ Future _processCloudIdMappingsInBatches( if (items.isNotEmpty) { if (canBulkUpdate) { - await _bulkUpdateCloudIds(assetsApi, items); + await _bulkUpdateCloudIds(assetsApi, items, cancellation.future); } else { - await _sequentialUpdateCloudIds(assetsApi, items); + await _sequentialUpdateCloudIds(assetsApi, items, cancellation); } } @@ -111,20 +120,35 @@ Future _processCloudIdMappingsInBatches( } } -Future _sequentialUpdateCloudIds(AssetsApi assetsApi, List items) async { +Future _sequentialUpdateCloudIds( + AssetsApi assetsApi, + List items, + Completer cancellation, +) async { for (final item in items) { + if (cancellation.isCompleted) { + break; + } final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value); try { - await assetsApi.updateAssetMetadata(item.assetId, AssetMetadataUpsertDto(items: [upsertItem])); + await assetsApi.updateAssetMetadata( + item.assetId, + AssetMetadataUpsertDto(items: [upsertItem]), + abortTrigger: cancellation.future, + ); } catch (error, stack) { Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack); } } } -Future _bulkUpdateCloudIds(AssetsApi assetsApi, List items) async { +Future _bulkUpdateCloudIds( + AssetsApi assetsApi, + List items, + Future abortTrigger, +) async { try { - await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items)); + await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items), abortTrigger: abortTrigger); } catch (error, stack) { Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack); } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index d9d262e64f..0a7d7d6473 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -29,6 +29,7 @@ class SyncApiRepository { Function()? onReset, int batchSize = kSyncEventBatchSize, http.Client? httpClient, + Future? abortSignal, }) async { final stopwatch = Stopwatch()..start(); final client = httpClient ?? NetworkRepository.client; @@ -36,7 +37,7 @@ class SyncApiRepository { final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'}; - final request = http.Request('POST', Uri.parse(endpoint)); + final request = http.AbortableRequest('POST', Uri.parse(endpoint), abortTrigger: abortSignal); request.headers.addAll(headers); request.body = jsonEncode( SyncStreamDto( diff --git a/mobile/lib/providers/infrastructure/cancel.provider.dart b/mobile/lib/providers/infrastructure/cancel.provider.dart index 6851861e1a..9d4a6790f2 100644 --- a/mobile/lib/providers/infrastructure/cancel.provider.dart +++ b/mobile/lib/providers/infrastructure/cancel.provider.dart @@ -1,8 +1,9 @@ +import 'dart:async'; + import 'package:hooks_riverpod/hooks_riverpod.dart'; -/// Provider holding a boolean function that returns true when cancellation is requested. -/// A computation running in the isolate uses the function to implement cooperative cancellation. -final cancellationProvider = Provider( +/// Holds the isolate's cancellation signal. +final cancellationProvider = Provider>( // This will be overridden in the isolate's container. // Throwing ensures it's not used without an override. (ref) => throw UnimplementedError( diff --git a/mobile/lib/providers/infrastructure/sync.provider.dart b/mobile/lib/providers/infrastructure/sync.provider.dart index 75c8e09326..700b51f12d 100644 --- a/mobile/lib/providers/infrastructure/sync.provider.dart +++ b/mobile/lib/providers/infrastructure/sync.provider.dart @@ -26,7 +26,7 @@ final syncStreamServiceProvider = Provider( permissionRepository: ref.watch(permissionRepositoryProvider), syncMigrationRepository: ref.watch(syncMigrationRepositoryProvider), api: ref.watch(apiServiceProvider), - cancelChecker: ref.watch(cancellationProvider), + cancellation: ref.watch(cancellationProvider), ), ); @@ -42,6 +42,7 @@ final localSyncServiceProvider = Provider( assetMediaRepository: ref.watch(assetMediaRepositoryProvider), permissionRepository: ref.watch(permissionRepositoryProvider), nativeSyncApi: ref.watch(nativeSyncApiProvider), + cancellation: ref.watch(cancellationProvider), ), ); @@ -51,5 +52,6 @@ final hashServiceProvider = Provider( localAssetRepository: ref.watch(localAssetRepository), nativeSyncApi: ref.watch(nativeSyncApiProvider), trashedLocalAssetRepository: ref.watch(trashedLocalAssetRepository), + cancellation: ref.watch(cancellationProvider), ), ); diff --git a/mobile/lib/repositories/drift_album_api_repository.dart b/mobile/lib/repositories/drift_album_api_repository.dart index a0c7a3732a..1f08564698 100644 --- a/mobile/lib/repositories/drift_album_api_repository.dart +++ b/mobile/lib/repositories/drift_album_api_repository.dart @@ -41,8 +41,14 @@ class DriftAlbumApiRepository extends ApiRepository { return (removed: removed, failed: failed); } - Future<({List added, List failed})> addAssets(String albumId, Iterable assetIds) async { - final response = await checkNull(_api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList()))); + Future<({List added, List failed})> addAssets( + String albumId, + Iterable assetIds, { + Future? abortTrigger, + }) async { + final response = await checkNull( + _api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList()), abortTrigger: abortTrigger), + ); final List added = [], failed = []; for (final dto in response) { if (dto.success) { diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart index 20b56d4875..ab3b19b78f 100644 --- a/mobile/lib/utils/isolate.dart +++ b/mobile/lib/utils/isolate.dart @@ -8,10 +8,9 @@ import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; -import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/wm_executor.dart'; import 'package:logging/logging.dart'; -import 'package:worker_manager/worker_manager.dart'; +import 'package:worker_manager/worker_manager.dart' show Cancelable; class InvalidIsolateUsageException implements Exception { const InvalidIsolateUsageException(); @@ -30,50 +29,27 @@ Cancelable runInIsolateGentle({ throw const InvalidIsolateUsageException(); } - return workerManagerPatch.executeGentle((cancelledChecker) async { - T? result; - await runZonedGuarded( - () async { - BackgroundIsolateBinaryMessenger.ensureInitialized(token); - DartPluginRegistrant.ensureInitialized(); + return workerManagerPatch.executeGentle((onCancel) async { + BackgroundIsolateBinaryMessenger.ensureInitialized(token); + DartPluginRegistrant.ensureInitialized(); - final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false); - final ref = ProviderContainer( - overrides: [ - cancellationProvider.overrideWithValue(cancelledChecker), - driftProvider.overrideWith(driftOverride(drift)), - ], - ); - - Logger log = Logger("IsolateLogger"); - - try { - result = await computation(ref); - } on CanceledError { - log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}"); - } catch (error, stack) { - log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack); - } finally { - try { - ref.dispose(); - - await Store.dispose(); - await LogService.I.dispose(); - await logDb.close(); - await drift.close(); - } catch (error, stack) { - dPrint(() => "Error closing resources in isolate: $error, $stack"); - } finally { - ref.dispose(); - // Delay to ensure all resources are released - await Future.delayed(const Duration(seconds: 2)); - } - } - }, - (error, stack) { - dPrint(() => "Error in isolate $debugLabel zone: $error, $stack"); - }, + final log = Logger("IsolateLogger"); + final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false); + final ref = ProviderContainer( + overrides: [cancellationProvider.overrideWithValue(onCancel), driftProvider.overrideWith(driftOverride(drift))], ); - return result; + + try { + return await computation(ref); + } catch (error, stack) { + log.severe("Error in runInIsolateGentle${debugLabel == null ? '' : ' for $debugLabel'}", error, stack); + return null; + } finally { + ref.dispose(); + await Store.dispose(); + await LogService.I.dispose(); + await logDb.close(); + await drift.close(); + } }); } diff --git a/mobile/lib/utils/isolate_worker.dart b/mobile/lib/utils/isolate_worker.dart new file mode 100644 index 0000000000..60048c2c81 --- /dev/null +++ b/mobile/lib/utils/isolate_worker.dart @@ -0,0 +1,163 @@ +// Forked from worker_manager's `WorkerImpl` (src/worker/worker_io.dart): a +// `CancelRequest` completes the computation's [Completer] (so it can await +// cancellation and unwind) instead of flipping a polled flag, and [shutdown] +// lets the isolate drain and exit on its own rather than force-killing it. Only +// the gentle-with-cancellation path immich uses is kept. +// +// ignore_for_file: implementation_imports + +import 'dart:async'; +import 'dart:isolate'; + +import 'package:worker_manager/src/scheduling/task.dart'; +import 'package:worker_manager/src/worker/cancel_request.dart'; +import 'package:worker_manager/src/worker/result.dart'; + +/// A worker computation that receives a [Completer] which completes on +/// cancellation: await its future to react promptly, or read `isCompleted`. +typedef GentleExecution = FutureOr Function(Completer onCancel); + +class _Shutdown { + const _Shutdown(); +} + +class IsolateWorker { + IsolateWorker(); + + Isolate? _isolate; + RawReceivePort? _receivePort; + SendPort? _sendPort; + Completer? _sendPortReceived; + Completer? _result; + + String? taskId; + + bool get initialized => _sendPortReceived?.isCompleted ?? false; + + bool get initializing { + final sendPortReceived = _sendPortReceived; + return sendPortReceived != null && !sendPortReceived.isCompleted; + } + + Future initialize() async { + final sendPortReceived = _sendPortReceived = Completer(); + final receivePort = _receivePort = RawReceivePort(); + receivePort.handler = (Object message) { + if (message is SendPort) { + _sendPort = message; + sendPortReceived.complete(); + } else if (message is ResultSuccess) { + _result?.complete(message.value); + _afterTask(); + } else if (message is ResultError) { + _result?.completeError(message.error, message.stackTrace); + _afterTask(); + } + }; + _isolate = await Isolate.spawn(_isolateEntry, receivePort.sendPort, errorsAreFatal: false); + await sendPortReceived.future; + } + + Future work(Task task) async { + taskId = task.id; + final result = _result = Completer(); + _sendPort!.send(task.execution); + return await (result.future as Future); + } + + /// Cancels the current task without retiring the worker. + void cancelGentle() => _sendPort?.send(CancelRequest()); + + /// Cancels any in-flight task and awaits the isolate exiting on its own — no + /// force-kill, so `finally` blocks and native cleanup always run. + /// + /// Detaches the slot up front so a concurrent [initialize] can revive it + /// without colliding (revival installs fresh ports while this drains the ones + /// it captured locally). A revived worker is always idle, so the still-live + /// receive-port handler can't misroute a result. + Future shutdown() async { + final sendPortReceived = _sendPortReceived; + if (sendPortReceived != null && !sendPortReceived.isCompleted) { + await sendPortReceived.future; + } + + final isolate = _isolate; + final receivePort = _receivePort; + final sendPort = _sendPort; + if (isolate == null || receivePort == null || sendPort == null) { + return; + } + _isolate = null; + _sendPort = null; + _sendPortReceived = null; + // Not _result: an in-flight task still delivers it before exiting; nulling + // here would drop that and hang work()'s caller. + + final exited = Completer(); + final exitPort = RawReceivePort(); + exitPort.handler = (_) { + if (!exited.isCompleted) { + exited.complete(); + } + exitPort.close(); + }; + isolate.addOnExitListener(exitPort.sendPort); + sendPort.send(const _Shutdown()); + await exited.future; + receivePort.close(); + } + + void _afterTask() { + taskId = null; + _result = null; + } + + static void _isolateEntry(SendPort sendPort) { + final receivePort = RawReceivePort(); + sendPort.send(receivePort.sendPort); + // One task at a time, so a single completer suffices; null between tasks. + Completer? onCancel; + void cancel() { + if (onCancel?.isCompleted == false) { + onCancel!.complete(); + } + } + + var shuttingDown = false; + var running = false; + receivePort.handler = (message) async { + if (message is _Shutdown) { + shuttingDown = true; + cancel(); + if (!running) { + Isolate.exit(); + } + return; + } + if (message is CancelRequest) { + cancel(); + return; + } + final execution = message as GentleExecution; + onCancel = Completer(); + running = true; + Result result; + try { + result = ResultSuccess(await execution(onCancel!)); + } catch (error, stackTrace) { + result = ResultError(error, stackTrace); + } finally { + onCancel = null; + running = false; + } + if (shuttingDown) { + // An isolate that has used platform channels can't exit on its own (Flutter's BackgroundIsolateBinaryMessenger + // opens an undisposable port), so closing our ports isn't enough. Isolate.exit delivers the result as its final + // message and terminates. It's abrupt (skips pending finally/microtasks) but safe here: the computation and its + // `finally` are already done and there's no await before this, so nothing pending is skipped. + Isolate.exit(sendPort, result); + } + sendPort.send(result); + }; + } +} diff --git a/mobile/lib/wm_executor.dart b/mobile/lib/wm_executor.dart index 2eb31fe300..e873c5f76d 100644 --- a/mobile/lib/wm_executor.dart +++ b/mobile/lib/wm_executor.dart @@ -6,8 +6,8 @@ import 'dart:math'; import 'package:collection/collection.dart'; import 'package:flutter/foundation.dart'; +import 'package:immich_mobile/utils/isolate_worker.dart'; import 'package:worker_manager/src/number_of_processors/processors_io.dart'; -import 'package:worker_manager/src/worker/worker.dart'; import 'package:worker_manager/worker_manager.dart'; final workerManagerPatch = _Executor(); @@ -16,6 +16,13 @@ final workerManagerPatch = _Executor(); const _minId = -9007199254740992; const _maxId = 9007199254740992; +class _GentleTask extends Task implements Gentle { + @override + final GentleExecution execution; + + _GentleTask({required super.id, required super.completer, required super.workPriority, required this.execution}); +} + class Mixinable { late final itSelf = this as T; } @@ -51,13 +58,13 @@ mixin _ExecutorLogger on Mixinable<_Executor> { class _Executor extends Mixinable<_Executor> with _ExecutorLogger { final _queue = PriorityQueue(); - final _pool = []; + final _pool = []; var _nextTaskId = _minId; var _dynamicSpawning = false; var _isolatesCount = numberOfProcessors; @visibleForTesting - UnmodifiableListView get pool => UnmodifiableListView(_pool); + UnmodifiableListView get pool => UnmodifiableListView(_pool); @override Future init({int? isolatesCount, bool? dynamicSpawning}) async { @@ -80,117 +87,37 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger { @override Future dispose() async { _queue.clear(); - for (final worker in _pool) { - if (worker.initialized || worker.initializing) { - worker.kill(); - } - } + final shutdown = _pool.map((worker) => worker.shutdown()).toList(growable: false); _pool.clear(); + await Future.wait(shutdown); super.dispose(); } - Cancelable execute(Execute execution, {WorkPriority priority = WorkPriority.immediately}) { - return _createCancelable(execution: execution, priority: priority); - } - - Cancelable executeNow(ExecuteGentle execution) { - final task = TaskGentle( - id: "", - workPriority: WorkPriority.immediately, - execution: execution, - completer: Completer(), - ); - - Future run() async { - try { - final result = await execution(() => task.canceled); - task.complete(result, null, null); - } catch (error, st) { - task.complete(null, error, st); - } - } - - run(); - return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); - } - - Cancelable executeWithPort( - ExecuteWithPort execution, { - WorkPriority priority = WorkPriority.immediately, - required void Function(T value) onMessage, - }) { - return _createCancelable( - execution: execution, - priority: priority, - onMessage: (message) => onMessage(message as T), - ); - } - - Cancelable executeGentle(ExecuteGentle execution, {WorkPriority priority = WorkPriority.immediately}) { - return _createCancelable(execution: execution, priority: priority); - } - - Cancelable executeGentleWithPort( - ExecuteGentleWithPort execution, { - WorkPriority priority = WorkPriority.immediately, - required void Function(T value) onMessage, - }) { - return _createCancelable( - execution: execution, - priority: priority, - onMessage: (message) => onMessage(message as T), - ); - } - - void _createWorkers() { - for (var i = 0; i < _isolatesCount; i++) { - _pool.add(Worker()); - } - } - - Future _initializeWorkers() async { - await Future.wait(_pool.map((e) => e.initialize())); - } - - Cancelable _createCancelable({ - required Function execution, - WorkPriority priority = WorkPriority.immediately, - void Function(Object value)? onMessage, - }) { + /// Runs [execution] on a worker isolate; its [Completer] completes when the + /// returned [Cancelable] is cancelled. + Cancelable executeGentle(GentleExecution execution, {WorkPriority priority = WorkPriority.immediately}) { if (_nextTaskId + 1 == _maxId) { _nextTaskId = _minId; } final id = _nextTaskId.toString(); _nextTaskId++; - late final Task task; - final completer = Completer(); - if (execution is ExecuteWithPort) { - task = TaskWithPort( - id: id, - workPriority: priority, - execution: execution, - completer: completer, - onMessage: onMessage!, - ); - } else if (execution is ExecuteGentle) { - task = TaskGentle(id: id, workPriority: priority, execution: execution, completer: completer); - } else if (execution is ExecuteGentleWithPort) { - task = TaskGentleWithPort( - id: id, - workPriority: priority, - execution: execution, - completer: completer, - onMessage: onMessage!, - ); - } else if (execution is Execute) { - task = TaskRegular(id: id, workPriority: priority, execution: execution, completer: completer); - } + final task = _GentleTask(id: id, workPriority: priority, execution: execution, completer: Completer()); _queue.add(task); _schedule(); logTaskAdded(task.id); return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); } + void _createWorkers() { + for (var i = 0; i < _isolatesCount; i++) { + _pool.add(IsolateWorker()); + } + } + + Future _initializeWorkers() async { + await Future.wait(_pool.map((e) => e.initialize())); + } + Future _ensureWorkersInitialized() async { if (_pool.isEmpty) { _createWorkers(); @@ -240,7 +167,9 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger { ) .whenComplete(() { if (_dynamicSpawning && _queue.isEmpty) { - availableWorker.kill(); + // Retire the idle worker; shutdown() nulls its fields so the husk + // stays pooled and is revived by initialize() if work arrives. + unawaited(availableWorker.shutdown()); } _schedule(); }); @@ -250,15 +179,8 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger { void _cancel(Task task) { task.cancel(); _queue.remove(task); - final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id); - if (task is Gentle) { - targetWorker?.cancelGentle(); - } else { - targetWorker?.kill(); - if (!_dynamicSpawning) { - targetWorker?.initialize(); - } - } + // All tasks are gentle: signal cancellation; the worker unwinds on its own. + _pool.firstWhereOrNull((worker) => worker.taskId == task.id)?.cancelGentle(); super._cancel(task); } } diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index ef29997e0b..e6db92b31a 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -36,13 +36,6 @@ class _AbortCallbackWrapper { class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {} -class _CancellationWrapper { - const _CancellationWrapper(); - - bool call() => false; -} - -class _MockCancellationWrapper extends Mock implements _CancellationWrapper {} void main() { late SyncStreamService sut; @@ -94,9 +87,13 @@ void main() { when(() => mockAbortCallbackWrapper()).thenReturn(false); - when(() => mockSyncApiRepo.streamChanges(any(), serverVersion: any(named: 'serverVersion'))).thenAnswer(( - invocation, - ) async { + when( + () => mockSyncApiRepo.streamChanges( + any(), + serverVersion: any(named: 'serverVersion'), + abortSignal: any(named: 'abortSignal'), + ), + ).thenAnswer((invocation) async { handleEventsCallback = invocation.positionalArguments.first; }); @@ -105,6 +102,7 @@ void main() { any(), onReset: any(named: 'onReset'), serverVersion: any(named: 'serverVersion'), + abortSignal: any(named: 'abortSignal'), ), ).thenAnswer((invocation) async { handleEventsCallback = invocation.positionalArguments.first; @@ -233,8 +231,7 @@ void main() { }); test("aborts and stops processing if cancelled during iteration", () async { - final cancellationChecker = _MockCancellationWrapper(); - when(() => cancellationChecker()).thenReturn(false); + final cancellation = Completer(); sut = SyncStreamService( syncApiRepository: mockSyncApiRepo, @@ -243,7 +240,7 @@ void main() { trashedLocalAssetRepository: mockTrashedLocalAssetRepo, assetMediaRepository: mockAssetMediaRepo, permissionRepository: mockPermissionRepo, - cancelChecker: cancellationChecker.call, + cancellation: cancellation, api: mockApi, syncMigrationRepository: mockSyncMigrationRepo, ); @@ -252,7 +249,7 @@ void main() { final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1]; when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async { - when(() => cancellationChecker()).thenReturn(true); + cancellation.complete(); }); await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call); @@ -267,8 +264,7 @@ void main() { }); test("aborts and stops processing if cancelled before processing batch", () async { - final cancellationChecker = _MockCancellationWrapper(); - when(() => cancellationChecker()).thenReturn(false); + final cancellation = Completer(); final processingCompleter = Completer(); bool handler1Started = false; @@ -284,7 +280,7 @@ void main() { trashedLocalAssetRepository: mockTrashedLocalAssetRepo, assetMediaRepository: mockAssetMediaRepo, permissionRepository: mockPermissionRepo, - cancelChecker: cancellationChecker.call, + cancellation: cancellation, api: mockApi, syncMigrationRepository: mockSyncMigrationRepo, ); @@ -303,7 +299,7 @@ void main() { expect(handler1Started, isTrue); // Signal cancellation while handler 1 is waiting - when(() => cancellationChecker()).thenReturn(true); + cancellation.complete(); await pumpEventQueue(); processingCompleter.complete(); diff --git a/mobile/test/unit/utils/isolate_worker_test.dart b/mobile/test/unit/utils/isolate_worker_test.dart new file mode 100644 index 0000000000..5b79395f52 --- /dev/null +++ b/mobile/test/unit/utils/isolate_worker_test.dart @@ -0,0 +1,23 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/wm_executor.dart'; + +void main() { + tearDown(workerManagerPatch.dispose); + + test('dispose() drains a cancelled task and delivers its result', () async { + await workerManagerPatch.init(isolatesCount: 1, dynamicSpawning: false); + + final task = workerManagerPatch.executeGentle((onCancel) async { + await onCancel.future; + return 'drained'; + }); + + await workerManagerPatch.dispose(); + + expect( + await task.timeout(const Duration(seconds: 5)), + 'drained', + reason: 'the worker must finish and return its result, not be killed mid-task', + ); + }); +}