diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index b3d97e0938..29c15bd915 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -5,7 +5,6 @@ import 'package:background_downloader/background_downloader.dart'; import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; -import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart'; import 'package:immich_mobile/platform/background_worker_api.g.dart'; @@ -24,6 +23,7 @@ import 'package:immich_mobile/utils/bootstrap.dart'; import 'package:immich_mobile/utils/http_ssl_options.dart'; import 'package:isar/isar.dart'; import 'package:logging/logging.dart'; +import 'package:worker_manager/worker_manager.dart'; class BackgroundWorkerFgService { final BackgroundWorkerFgHostApi _foregroundHostApi; @@ -42,8 +42,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { final Drift _drift; final DriftLogger _driftLogger; final BackgroundWorkerBgHostApi _backgroundHostApi; - final Logger _logger = Logger('BackgroundUploadBgService'); - late final IsolateLockManager _lockManager; + final Logger _logger = Logger('BackgroundWorkerBgService'); bool _isCleanedUp = false; @@ -59,7 +58,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { driftProvider.overrideWith(driftOverride(drift)), ], ); - _lockManager = IsolateLockManager(onCloseRequest: _cleanup); BackgroundWorkerFlutterApi.setUp(this); } @@ -67,41 +65,30 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { Future init() async { try { - await loadTranslations(); HttpSSLOptions.apply(applyNative: false); - await _ref.read(authServiceProvider).setOpenApiServiceEndpoint(); - // Initialize the file downloader - await FileDownloader().configure( - globalConfig: [ - // maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3 - (Config.holdingQueue, (6, 6, 3)), - // On Android, if files are larger than 256MB, run in foreground service - (Config.runInForegroundIfFileLargerThan, 256), - ], - ); - await FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false); - await FileDownloader().trackTasks(); + await Future.wait([ + loadTranslations(), + workerManager.init(dynamicSpawning: true), + _ref.read(authServiceProvider).setOpenApiServiceEndpoint(), + // Initialize the file downloader + FileDownloader().configure( + globalConfig: [ + // maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3 + (Config.holdingQueue, (6, 6, 3)), + // On Android, if files are larger than 256MB, run in foreground service + (Config.runInForegroundIfFileLargerThan, 256), + ], + ), + FileDownloader().trackTasksInGroup(kDownloadGroupLivePhoto, markDownloadedComplete: false), + FileDownloader().trackTasks(), + _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess(), + ]); + configureFileDownloaderNotifications(); - await _ref.read(fileMediaRepositoryProvider).enableBackgroundAccess(); - // Notify the host that the background upload service has been initialized and is ready to use - debugPrint("Acquiring background worker lock"); - if (await _lockManager.acquireLock().timeout( - const Duration(seconds: 5), - onTimeout: () { - _lockManager.cancel(); - return false; - }, - )) { - _logger.info("Acquired background worker lock"); - await _backgroundHostApi.onInitialized(); - return; - } - - _logger.warning("Failed to acquire background worker lock"); - await _cleanup(); - await _backgroundHostApi.close(); + // Notify the host that the background worker service has been initialized and is ready to use + _backgroundHostApi.onInitialized(); } catch (error, stack) { _logger.severe("Failed to initialize background worker", error, stack); _backgroundHostApi.close(); @@ -170,6 +157,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { _isCleanedUp = true; _logger.info("Cleaning up background worker"); final cleanupFutures = [ + workerManager.dispose(), _drift.close(), _driftLogger.close(), _ref.read(backgroundSyncProvider).cancel(), @@ -180,8 +168,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { cleanupFutures.add(_isar.close()); } _ref.dispose(); - _lockManager.releaseLock(); - await Future.wait(cleanupFutures); _logger.info("Background worker resources cleaned up"); } catch (error, stack) { @@ -191,22 +177,29 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { Future _handleBackup({bool processBulk = true}) async { if (!_isBackupEnabled) { + _logger.info("[_handleBackup 1] Backup is disabled. Skipping backup routine"); return; } + _logger.info("[_handleBackup 2] Enqueuing assets for backup from the background service"); + final currentUser = _ref.read(currentUserProvider); if (currentUser == null) { + _logger.warning("[_handleBackup 3] No current user found. Skipping backup from background"); return; } if (processBulk) { + _logger.info("[_handleBackup 4] Resume backup from background"); return _ref.read(driftBackupProvider.notifier).handleBackupResume(currentUser.id); } final activeTask = await _ref.read(uploadServiceProvider).getActiveTasks(currentUser.id); if (activeTask.isNotEmpty) { + _logger.info("[_handleBackup 5] Resuming backup for active tasks from background"); await _ref.read(uploadServiceProvider).resumeBackup(); } else { + _logger.info("[_handleBackup 6] Starting serial backup for new tasks from background"); await _ref.read(uploadServiceProvider).startBackupSerial(currentUser.id); } } diff --git a/mobile/lib/domain/utils/isolate_lock_manager.dart b/mobile/lib/domain/utils/isolate_lock_manager.dart deleted file mode 100644 index 37de649204..0000000000 --- a/mobile/lib/domain/utils/isolate_lock_manager.dart +++ /dev/null @@ -1,235 +0,0 @@ -import 'dart:isolate'; -import 'dart:ui'; - -import 'package:flutter/foundation.dart'; -import 'package:logging/logging.dart'; - -const String kIsolateLockManagerPort = "immich://isolate_mutex"; - -enum _LockStatus { active, released } - -class _IsolateRequest { - const _IsolateRequest(); -} - -class _HeartbeatRequest extends _IsolateRequest { - // Port for the receiver to send replies back - final SendPort sendPort; - - const _HeartbeatRequest(this.sendPort); - - Map toJson() { - return {'type': 'heartbeat', 'sendPort': sendPort}; - } -} - -class _CloseRequest extends _IsolateRequest { - const _CloseRequest(); - - Map toJson() { - return {'type': 'close'}; - } -} - -class _IsolateResponse { - const _IsolateResponse(); -} - -class _HeartbeatResponse extends _IsolateResponse { - final _LockStatus status; - - const _HeartbeatResponse(this.status); - - Map toJson() { - return {'type': 'heartbeat', 'status': status.index}; - } -} - -typedef OnCloseLockHolderRequest = void Function(); - -class IsolateLockManager { - final String _portName; - bool _hasLock = false; - ReceivePort? _receivePort; - final OnCloseLockHolderRequest? _onCloseRequest; - final Set _waitingIsolates = {}; - // Token object - a new one is created for each acquisition attempt - Object? _currentAcquisitionToken; - - IsolateLockManager({String? portName, OnCloseLockHolderRequest? onCloseRequest}) - : _portName = portName ?? kIsolateLockManagerPort, - _onCloseRequest = onCloseRequest; - - Future acquireLock() async { - if (_hasLock) { - Logger('BackgroundWorkerLockManager').warning("WARNING: [acquireLock] called more than once"); - return true; - } - - // Create a new token - this invalidates any previous attempt - final token = _currentAcquisitionToken = Object(); - - final ReceivePort rp = _receivePort = ReceivePort(_portName); - final SendPort sp = rp.sendPort; - - while (!IsolateNameServer.registerPortWithName(sp, _portName)) { - // This attempt was superseded by a newer one in the same isolate - if (_currentAcquisitionToken != token) { - return false; - } - - await _lockReleasedByHolder(token); - } - - _hasLock = true; - rp.listen(_onRequest); - return true; - } - - Future _lockReleasedByHolder(Object token) async { - SendPort? holder = IsolateNameServer.lookupPortByName(_portName); - debugPrint("Found lock holder: $holder"); - if (holder == null) { - // No holder, try and acquire lock - return; - } - - final ReceivePort tempRp = ReceivePort(); - final SendPort tempSp = tempRp.sendPort; - final bs = tempRp.asBroadcastStream(); - - try { - while (true) { - // Send a heartbeat request with the send port to receive reply from the holder - - debugPrint("Sending heartbeat request to lock holder"); - holder.send(_HeartbeatRequest(tempSp).toJson()); - dynamic answer = await bs.first.timeout(const Duration(seconds: 3), onTimeout: () => null); - - debugPrint("Received heartbeat response from lock holder: $answer"); - // This attempt was superseded by a newer one in the same isolate - if (_currentAcquisitionToken != token) { - break; - } - - if (answer == null) { - // Holder failed, most likely killed without calling releaseLock - // Check if a different waiting isolate took the lock - if (holder == IsolateNameServer.lookupPortByName(_portName)) { - // No, remove the stale lock - IsolateNameServer.removePortNameMapping(_portName); - } - break; - } - - // Unknown message type received for heartbeat request. Try again - _IsolateResponse? response = _parseResponse(answer); - if (response == null || response is! _HeartbeatResponse) { - break; - } - - if (response.status == _LockStatus.released) { - // Holder has released the lock - break; - } - - // If the _LockStatus is active, we check again if the task completed - // by sending a released messaged again, if not, send a new heartbeat again - - // Check if the holder completed its task after the heartbeat - answer = await bs.first.timeout( - const Duration(seconds: 3), - onTimeout: () => const _HeartbeatResponse(_LockStatus.active).toJson(), - ); - - response = _parseResponse(answer); - if (response is _HeartbeatResponse && response.status == _LockStatus.released) { - break; - } - } - } catch (e) { - // Timeout or error - } finally { - tempRp.close(); - } - return; - } - - _IsolateRequest? _parseRequest(dynamic msg) { - if (msg is! Map) { - return null; - } - - return switch (msg['type']) { - 'heartbeat' => _HeartbeatRequest(msg['sendPort']), - 'close' => const _CloseRequest(), - _ => null, - }; - } - - _IsolateResponse? _parseResponse(dynamic msg) { - if (msg is! Map) { - return null; - } - - return switch (msg['type']) { - 'heartbeat' => _HeartbeatResponse(_LockStatus.values[msg['status']]), - _ => null, - }; - } - - // Executed in the isolate with the lock - void _onRequest(dynamic msg) { - final request = _parseRequest(msg); - if (request == null) { - return; - } - - if (request is _HeartbeatRequest) { - // Add the send port to the list of waiting isolates - _waitingIsolates.add(request.sendPort); - request.sendPort.send(const _HeartbeatResponse(_LockStatus.active).toJson()); - return; - } - - if (request is _CloseRequest) { - _onCloseRequest?.call(); - return; - } - } - - void releaseLock() { - if (_hasLock) { - IsolateNameServer.removePortNameMapping(_portName); - - // Notify waiting isolates - for (final port in _waitingIsolates) { - port.send(const _HeartbeatResponse(_LockStatus.released).toJson()); - } - _waitingIsolates.clear(); - - _hasLock = false; - } - - _receivePort?.close(); - _receivePort = null; - } - - void cancel() { - if (_hasLock) { - return; - } - - debugPrint("Cancelling ongoing acquire lock attempts"); - // Create a new token to invalidate ongoing acquire lock attempts - _currentAcquisitionToken = Object(); - } - - void requestHolderToClose() { - if (_hasLock) { - return; - } - - IsolateNameServer.lookupPortByName(_portName)?.send(const _CloseRequest().toJson()); - } -} diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 9066c5bfc7..4f74c30e3b 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -17,9 +17,9 @@ import 'package:immich_mobile/extensions/build_context_extensions.dart'; import 'package:immich_mobile/generated/codegen_loader.g.dart'; import 'package:immich_mobile/providers/app_life_cycle.provider.dart'; import 'package:immich_mobile/providers/asset_viewer/share_intent_upload.provider.dart'; -import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/platform.provider.dart'; import 'package:immich_mobile/providers/locale_provider.dart'; import 'package:immich_mobile/providers/routes.provider.dart'; import 'package:immich_mobile/providers/theme.provider.dart'; @@ -205,9 +205,9 @@ class ImmichAppState extends ConsumerState with WidgetsBindingObserve // needs to be delayed so that EasyLocalization is working if (Store.isBetaTimelineEnabled) { ref.read(backgroundServiceProvider).disableService(); - ref.read(driftBackgroundUploadFgService).enable(); + ref.read(backgroundWorkerFgServiceProvider).enable(); } else { - ref.read(driftBackgroundUploadFgService).disable(); + ref.read(backgroundWorkerFgServiceProvider).disable(); ref.read(backgroundServiceProvider).resumeServiceIfEnabled(); } }); diff --git a/mobile/lib/pages/common/change_experience.page.dart b/mobile/lib/pages/common/change_experience.page.dart index ffdba1fb71..8779eecd7f 100644 --- a/mobile/lib/pages/common/change_experience.page.dart +++ b/mobile/lib/pages/common/change_experience.page.dart @@ -13,6 +13,7 @@ import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/backup/manual_upload.provider.dart'; import 'package:immich_mobile/providers/gallery_permission.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/platform.provider.dart'; import 'package:immich_mobile/providers/infrastructure/readonly_mode.provider.dart'; import 'package:immich_mobile/providers/websocket.provider.dart'; import 'package:immich_mobile/services/background.service.dart'; @@ -79,7 +80,7 @@ class _ChangeExperiencePageState extends ConsumerState { ref.read(readonlyModeProvider.notifier).setReadonlyMode(false); await migrateStoreToIsar(ref.read(isarProvider), ref.read(driftProvider)); await ref.read(backgroundServiceProvider).resumeServiceIfEnabled(); - await ref.read(driftBackgroundUploadFgService).disable(); + await ref.read(backgroundWorkerFgServiceProvider).disable(); } await IsarStoreRepository(ref.read(isarProvider)).upsert(StoreKey.betaTimeline, widget.switchingToBeta); diff --git a/mobile/lib/pages/common/splash_screen.page.dart b/mobile/lib/pages/common/splash_screen.page.dart index f41cf317bf..c64d6fe80f 100644 --- a/mobile/lib/pages/common/splash_screen.page.dart +++ b/mobile/lib/pages/common/splash_screen.page.dart @@ -2,10 +2,8 @@ import 'package:auto_route/auto_route.dart'; import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; -import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/providers/auth.provider.dart'; -import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/gallery_permission.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; @@ -23,23 +21,14 @@ class SplashScreenPage extends StatefulHookConsumerWidget { class SplashScreenPageState extends ConsumerState { final log = Logger("SplashScreenPage"); - @override void initState() { super.initState(); - final lockManager = ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)); - - lockManager.requestHolderToClose(); - lockManager - .acquireLock() - .timeout(const Duration(seconds: 5)) - .whenComplete( - () => ref - .read(authProvider.notifier) - .setOpenApiServiceEndpoint() - .then(logConnectionInfo) - .whenComplete(() => resumeSession()), - ); + ref + .read(authProvider.notifier) + .setOpenApiServiceEndpoint() + .then(logConnectionInfo) + .whenComplete(() => resumeSession()); } void logConnectionInfo(String? endpoint) { diff --git a/mobile/lib/providers/app_life_cycle.provider.dart b/mobile/lib/providers/app_life_cycle.provider.dart index 18b7c3464a..cfe10a472d 100644 --- a/mobile/lib/providers/app_life_cycle.provider.dart +++ b/mobile/lib/providers/app_life_cycle.provider.dart @@ -2,7 +2,6 @@ import 'dart:async'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/services/log.service.dart'; -import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/providers/album/album.provider.dart'; @@ -139,27 +138,10 @@ class AppLifeCycleNotifier extends StateNotifier { Future _handleBetaTimelineResume() async { _ref.read(backupProvider.notifier).cancelBackup(); - final lockManager = _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)); // Give isolates time to complete any ongoing database transactions await Future.delayed(const Duration(milliseconds: 500)); - lockManager.requestHolderToClose(); - - // Add timeout to prevent deadlock on lock acquisition - try { - await lockManager.acquireLock().timeout( - const Duration(seconds: 10), - onTimeout: () { - _log.warning("Lock acquisition timed out, proceeding without lock"); - throw TimeoutException("Lock acquisition timed out", const Duration(seconds: 10)); - }, - ); - } catch (e) { - _log.warning("Failed to acquire lock: $e"); - return; - } - final backgroundManager = _ref.read(backgroundSyncProvider); final isAlbumLinkedSyncEnable = _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.syncAlbums); final isEnableBackup = _ref.read(appSettingsServiceProvider).getSetting(AppSettingsEnum.enableBackup); @@ -186,7 +168,6 @@ class AppLifeCycleNotifier extends StateNotifier { } finally { // Ensure lock is released even if operations fail try { - lockManager.releaseLock(); _log.info("Lock released after background sync operations"); } catch (lockError) { _log.warning("Failed to release lock after error: $lockError"); @@ -241,28 +222,6 @@ class AppLifeCycleNotifier extends StateNotifier { if (_ref.read(backupProvider.notifier).backupProgress != BackUpProgressEnum.manualInProgress) { _ref.read(backupProvider.notifier).cancelBackup(); } - } else { - final backgroundManager = _ref.read(backgroundSyncProvider); - - // Cancel operations with extended timeout to allow database transactions to complete - try { - await Future.wait([ - backgroundManager.cancel().timeout(const Duration(seconds: 10)), - backgroundManager.cancelLocal().timeout(const Duration(seconds: 10)), - ]).timeout(const Duration(seconds: 15)); - - // Give additional time for isolates to clean up database connections - await Future.delayed(const Duration(milliseconds: 1000)); - } catch (e) { - _log.warning("Timeout during background cancellation: $e"); - } - - // Always release the lock, even if cancellation failed - try { - _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock(); - } catch (e) { - _log.warning("Failed to release lock on pause: $e"); - } } _ref.read(websocketProvider.notifier).disconnect(); @@ -290,7 +249,6 @@ class AppLifeCycleNotifier extends StateNotifier { } catch (_) {} if (Store.isBetaTimelineEnabled) { - _ref.read(isolateLockManagerProvider(kIsolateLockManagerPort)).releaseLock(); return; } diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart index 1981c45fb1..e6e83b64df 100644 --- a/mobile/lib/providers/background_sync.provider.dart +++ b/mobile/lib/providers/background_sync.provider.dart @@ -1,6 +1,5 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/utils/background_sync.dart'; -import 'package:immich_mobile/domain/utils/isolate_lock_manager.dart'; import 'package:immich_mobile/providers/sync_status.provider.dart'; final backgroundSyncProvider = Provider((ref) { @@ -19,7 +18,3 @@ final backgroundSyncProvider = Provider((ref) { ref.onDispose(manager.cancel); return manager; }); - -final isolateLockManagerProvider = Provider.family((ref, name) { - return IsolateLockManager(portName: name); -}); diff --git a/mobile/lib/providers/backup/backup.provider.dart b/mobile/lib/providers/backup/backup.provider.dart index 6035e53e5d..76cb383465 100644 --- a/mobile/lib/providers/backup/backup.provider.dart +++ b/mobile/lib/providers/backup/backup.provider.dart @@ -6,7 +6,6 @@ import 'package:flutter/foundation.dart'; import 'package:flutter/widgets.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; -import 'package:immich_mobile/domain/services/background_worker.service.dart'; import 'package:immich_mobile/entities/album.entity.dart'; import 'package:immich_mobile/entities/backup_album.entity.dart'; import 'package:immich_mobile/entities/store.entity.dart'; @@ -18,7 +17,6 @@ import 'package:immich_mobile/models/backup/current_upload_asset.model.dart'; import 'package:immich_mobile/models/backup/error_upload_asset.model.dart'; import 'package:immich_mobile/models/backup/success_upload_asset.model.dart'; import 'package:immich_mobile/models/server_info/server_disk_info.model.dart'; -import 'package:immich_mobile/platform/background_worker_api.g.dart'; import 'package:immich_mobile/providers/app_life_cycle.provider.dart'; import 'package:immich_mobile/providers/auth.provider.dart'; import 'package:immich_mobile/providers/backup/error_backup_list.provider.dart'; @@ -36,8 +34,6 @@ import 'package:logging/logging.dart'; import 'package:permission_handler/permission_handler.dart'; import 'package:photo_manager/photo_manager.dart' show PMProgressHandler; -final driftBackgroundUploadFgService = Provider((ref) => BackgroundWorkerFgService(BackgroundWorkerFgHostApi())); - final backupProvider = StateNotifierProvider((ref) { return BackupNotifier( ref.watch(backupServiceProvider), diff --git a/mobile/lib/providers/infrastructure/platform.provider.dart b/mobile/lib/providers/infrastructure/platform.provider.dart index 6469624c09..05901a4fec 100644 --- a/mobile/lib/providers/infrastructure/platform.provider.dart +++ b/mobile/lib/providers/infrastructure/platform.provider.dart @@ -1,7 +1,11 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/domain/services/background_worker.service.dart'; +import 'package:immich_mobile/platform/background_worker_api.g.dart'; import 'package:immich_mobile/platform/native_sync_api.g.dart'; import 'package:immich_mobile/platform/thumbnail_api.g.dart'; +final backgroundWorkerFgServiceProvider = Provider((_) => BackgroundWorkerFgService(BackgroundWorkerFgHostApi())); + final nativeSyncApiProvider = Provider((_) => NativeSyncApi()); final thumbnailApi = ThumbnailApi();