feat: websocket background sync (#19888)

* feat: websocket background sync

* batch websocket

* pr feedback
This commit is contained in:
Alex 2025-07-15 09:38:28 -05:00 committed by GitHub
parent 0acbf1199a
commit 59e7754bdc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 116 additions and 1 deletions

View File

@ -31,6 +31,59 @@ class SyncStreamService {
return _syncApiRepository.streamChanges(_handleEvents); return _syncApiRepository.streamChanges(_handleEvents);
} }
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
if (batchData.isEmpty) return;
_logger.info(
'Processing batch of ${batchData.length} AssetUploadReadyV1 events',
);
final List<SyncAssetV1> assets = [];
final List<SyncAssetExifV1> exifs = [];
try {
for (final data in batchData) {
if (data is! Map<String, dynamic>) {
continue;
}
final payload = data;
final assetData = payload['asset'];
final exifData = payload['exif'];
if (assetData == null || exifData == null) {
continue;
}
final asset = SyncAssetV1.fromJson(assetData);
final exif = SyncAssetExifV1.fromJson(exifData);
if (asset != null && exif != null) {
assets.add(asset);
exifs.add(exif);
}
}
if (assets.isNotEmpty && exifs.isNotEmpty) {
await _syncStreamRepository.updateAssetsV1(
assets,
debugLabel: 'websocket-batch',
);
await _syncStreamRepository.updateAssetsExifV1(
exifs,
debugLabel: 'websocket-batch',
);
_logger.info('Successfully processed ${assets.length} assets in batch');
}
} catch (error, stackTrace) {
_logger.severe(
"Error processing AssetUploadReadyV1 websocket batch events",
error,
stackTrace,
);
}
}
Future<void> _handleEvents(List<SyncEvent> events, Function() abort) async { Future<void> _handleEvents(List<SyncEvent> events, Function() abort) async {
List<SyncEvent> items = []; List<SyncEvent> items = [];
for (final event in events) { for (final event in events) {

View File

@ -6,6 +6,7 @@ import 'package:worker_manager/worker_manager.dart';
class BackgroundSyncManager { class BackgroundSyncManager {
Cancelable<void>? _syncTask; Cancelable<void>? _syncTask;
Cancelable<void>? _syncWebsocketTask;
Cancelable<void>? _deviceAlbumSyncTask; Cancelable<void>? _deviceAlbumSyncTask;
Cancelable<void>? _hashTask; Cancelable<void>? _hashTask;
@ -20,6 +21,12 @@ class BackgroundSyncManager {
_syncTask?.cancel(); _syncTask?.cancel();
_syncTask = null; _syncTask = null;
if (_syncWebsocketTask != null) {
futures.add(_syncWebsocketTask!.future);
}
_syncWebsocketTask?.cancel();
_syncWebsocketTask = null;
return Future.wait(futures); return Future.wait(futures);
} }
@ -72,4 +79,19 @@ class BackgroundSyncManager {
_syncTask = null; _syncTask = null;
}); });
} }
Future<void> syncWebsocketBatch(List<dynamic> batchData) {
if (_syncWebsocketTask != null) {
return _syncWebsocketTask!.future;
}
_syncWebsocketTask = runInIsolateGentle(
computation: (ref) => ref
.read(syncStreamServiceProvider)
.handleWsAssetUploadReadyV1Batch(batchData),
);
return _syncWebsocketTask!.whenComplete(() {
_syncWebsocketTask = null;
});
}
} }

View File

@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
@ -10,6 +11,7 @@ import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/models/server_info/server_version.model.dart'; import 'package:immich_mobile/models/server_info/server_version.model.dart';
import 'package:immich_mobile/providers/asset.provider.dart'; import 'package:immich_mobile/providers/asset.provider.dart';
import 'package:immich_mobile/providers/auth.provider.dart'; import 'package:immich_mobile/providers/auth.provider.dart';
import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/providers/db.provider.dart'; import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/server_info.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart';
import 'package:immich_mobile/services/api.service.dart'; import 'package:immich_mobile/services/api.service.dart';
@ -106,6 +108,18 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
final Debouncer _debounce = final Debouncer _debounce =
Debouncer(interval: const Duration(milliseconds: 500)); Debouncer(interval: const Duration(milliseconds: 500));
final Debouncer _batchDebouncer = Debouncer(
interval: const Duration(seconds: 5),
maxWaitTime: const Duration(seconds: 10),
);
final List<dynamic> _batchedAssetUploadReady = [];
@override
void dispose() {
_batchDebouncer.dispose();
super.dispose();
}
/// Connects websocket to server unless already connected /// Connects websocket to server unless already connected
void connect() { void connect() {
if (state.isConnected) return; if (state.isConnected) return;
@ -171,6 +185,7 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
socket.on('on_asset_stack_update', _handleServerUpdates); socket.on('on_asset_stack_update', _handleServerUpdates);
socket.on('on_asset_hidden', _handleOnAssetHidden); socket.on('on_asset_hidden', _handleOnAssetHidden);
socket.on('on_new_release', _handleReleaseUpdates); socket.on('on_new_release', _handleReleaseUpdates);
socket.on('AssetUploadReadyV1', _handleSyncAssetUploadReady);
} catch (e) { } catch (e) {
debugPrint("[WEBSOCKET] Catch Websocket Error - ${e.toString()}"); debugPrint("[WEBSOCKET] Catch Websocket Error - ${e.toString()}");
} }
@ -180,6 +195,8 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
void disconnect() { void disconnect() {
debugPrint("Attempting to disconnect from websocket"); debugPrint("Attempting to disconnect from websocket");
_batchedAssetUploadReady.clear();
var socket = state.socket?.disconnect(); var socket = state.socket?.disconnect();
if (socket?.disconnected == true) { if (socket?.disconnected == true) {
@ -288,7 +305,7 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
} }
} }
void handlePendingChanges() async { Future<void> handlePendingChanges() async {
await _handlePendingUploaded(); await _handlePendingUploaded();
await _handlePendingDeletes(); await _handlePendingDeletes();
await _handlingPendingHidden(); await _handlingPendingHidden();
@ -347,6 +364,29 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
.read(serverInfoProvider.notifier) .read(serverInfoProvider.notifier)
.handleNewRelease(serverVersion, releaseVersion); .handleNewRelease(serverVersion, releaseVersion);
} }
void _handleSyncAssetUploadReady(dynamic data) {
_batchedAssetUploadReady.add(data);
_batchDebouncer.run(_processBatchedAssetUploadReady);
}
void _processBatchedAssetUploadReady() {
if (_batchedAssetUploadReady.isEmpty) {
return;
}
try {
unawaited(
_ref
.read(backgroundSyncProvider)
.syncWebsocketBatch(_batchedAssetUploadReady.toList()),
);
} catch (error) {
_log.severe("Error processing batched AssetUploadReadyV1 events: $error");
}
_batchedAssetUploadReady.clear();
}
} }
final websocketProvider = final websocketProvider =