From 59e7754bdce044bc880806cc1f63187ff27a8fa8 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 15 Jul 2025 09:38:28 -0500 Subject: [PATCH] feat: websocket background sync (#19888) * feat: websocket background sync * batch websocket * pr feedback --- .../domain/services/sync_stream.service.dart | 53 +++++++++++++++++++ mobile/lib/domain/utils/background_sync.dart | 22 ++++++++ mobile/lib/providers/websocket.provider.dart | 42 ++++++++++++++- 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 48cf318a8b..6183865041 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -31,6 +31,59 @@ class SyncStreamService { return _syncApiRepository.streamChanges(_handleEvents); } + Future handleWsAssetUploadReadyV1Batch(List batchData) async { + if (batchData.isEmpty) return; + + _logger.info( + 'Processing batch of ${batchData.length} AssetUploadReadyV1 events', + ); + + final List assets = []; + final List exifs = []; + + try { + for (final data in batchData) { + if (data is! Map) { + continue; + } + + final payload = data; + final assetData = payload['asset']; + final exifData = payload['exif']; + + if (assetData == null || exifData == null) { + continue; + } + + final asset = SyncAssetV1.fromJson(assetData); + final exif = SyncAssetExifV1.fromJson(exifData); + + if (asset != null && exif != null) { + assets.add(asset); + exifs.add(exif); + } + } + + if (assets.isNotEmpty && exifs.isNotEmpty) { + await _syncStreamRepository.updateAssetsV1( + assets, + debugLabel: 'websocket-batch', + ); + await _syncStreamRepository.updateAssetsExifV1( + exifs, + debugLabel: 'websocket-batch', + ); + _logger.info('Successfully processed ${assets.length} assets in batch'); + } + } catch (error, stackTrace) { + _logger.severe( + "Error processing AssetUploadReadyV1 websocket batch events", + error, + stackTrace, + ); + } + } + Future _handleEvents(List events, Function() abort) async { List items = []; for (final event in events) { diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart index c8d2e2b624..c71f1a8315 100644 --- a/mobile/lib/domain/utils/background_sync.dart +++ b/mobile/lib/domain/utils/background_sync.dart @@ -6,6 +6,7 @@ import 'package:worker_manager/worker_manager.dart'; class BackgroundSyncManager { Cancelable? _syncTask; + Cancelable? _syncWebsocketTask; Cancelable? _deviceAlbumSyncTask; Cancelable? _hashTask; @@ -20,6 +21,12 @@ class BackgroundSyncManager { _syncTask?.cancel(); _syncTask = null; + if (_syncWebsocketTask != null) { + futures.add(_syncWebsocketTask!.future); + } + _syncWebsocketTask?.cancel(); + _syncWebsocketTask = null; + return Future.wait(futures); } @@ -72,4 +79,19 @@ class BackgroundSyncManager { _syncTask = null; }); } + + Future syncWebsocketBatch(List batchData) { + if (_syncWebsocketTask != null) { + return _syncWebsocketTask!.future; + } + + _syncWebsocketTask = runInIsolateGentle( + computation: (ref) => ref + .read(syncStreamServiceProvider) + .handleWsAssetUploadReadyV1Batch(batchData), + ); + return _syncWebsocketTask!.whenComplete(() { + _syncWebsocketTask = null; + }); + } } diff --git a/mobile/lib/providers/websocket.provider.dart b/mobile/lib/providers/websocket.provider.dart index 2600c6d8ee..d9db831776 100644 --- a/mobile/lib/providers/websocket.provider.dart +++ b/mobile/lib/providers/websocket.provider.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'package:collection/collection.dart'; @@ -10,6 +11,7 @@ import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/models/server_info/server_version.model.dart'; import 'package:immich_mobile/providers/asset.provider.dart'; import 'package:immich_mobile/providers/auth.provider.dart'; +import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; import 'package:immich_mobile/services/api.service.dart'; @@ -106,6 +108,18 @@ class WebsocketNotifier extends StateNotifier { final Debouncer _debounce = Debouncer(interval: const Duration(milliseconds: 500)); + final Debouncer _batchDebouncer = Debouncer( + interval: const Duration(seconds: 5), + maxWaitTime: const Duration(seconds: 10), + ); + final List _batchedAssetUploadReady = []; + + @override + void dispose() { + _batchDebouncer.dispose(); + super.dispose(); + } + /// Connects websocket to server unless already connected void connect() { if (state.isConnected) return; @@ -171,6 +185,7 @@ class WebsocketNotifier extends StateNotifier { socket.on('on_asset_stack_update', _handleServerUpdates); socket.on('on_asset_hidden', _handleOnAssetHidden); socket.on('on_new_release', _handleReleaseUpdates); + socket.on('AssetUploadReadyV1', _handleSyncAssetUploadReady); } catch (e) { debugPrint("[WEBSOCKET] Catch Websocket Error - ${e.toString()}"); } @@ -180,6 +195,8 @@ class WebsocketNotifier extends StateNotifier { void disconnect() { debugPrint("Attempting to disconnect from websocket"); + _batchedAssetUploadReady.clear(); + var socket = state.socket?.disconnect(); if (socket?.disconnected == true) { @@ -288,7 +305,7 @@ class WebsocketNotifier extends StateNotifier { } } - void handlePendingChanges() async { + Future handlePendingChanges() async { await _handlePendingUploaded(); await _handlePendingDeletes(); await _handlingPendingHidden(); @@ -347,6 +364,29 @@ class WebsocketNotifier extends StateNotifier { .read(serverInfoProvider.notifier) .handleNewRelease(serverVersion, releaseVersion); } + + void _handleSyncAssetUploadReady(dynamic data) { + _batchedAssetUploadReady.add(data); + _batchDebouncer.run(_processBatchedAssetUploadReady); + } + + void _processBatchedAssetUploadReady() { + if (_batchedAssetUploadReady.isEmpty) { + return; + } + + try { + unawaited( + _ref + .read(backgroundSyncProvider) + .syncWebsocketBatch(_batchedAssetUploadReady.toList()), + ); + } catch (error) { + _log.severe("Error processing batched AssetUploadReadyV1 events: $error"); + } + + _batchedAssetUploadReady.clear(); + } } final websocketProvider =