From eb64c6b04e11b725cca92367f2f08257dabe5e0e Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 7 Mar 2025 14:47:30 -0600 Subject: [PATCH] feat: new mobile asset sync --- .../domain/interfaces/sync_api.interface.dart | 2 + .../domain/services/sync_stream.service.dart | 133 +++++++++++++----- .../repositories/sync_api.repository.dart | 46 +++++- mobile/lib/widgets/common/immich_app_bar.dart | 1 + .../lib/widgets/forms/login/login_form.dart | 10 +- 5 files changed, 145 insertions(+), 47 deletions(-) diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart index c639b59a88..5999be1ba8 100644 --- a/mobile/lib/domain/interfaces/sync_api.interface.dart +++ b/mobile/lib/domain/interfaces/sync_api.interface.dart @@ -6,4 +6,6 @@ abstract interface class ISyncApiRepository { Stream> watchUserSyncEvent(); Stream> watchAssetSyncEvent(); + + Stream> watchExifSyncEvent(); } diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index db4b007ce2..270de28287 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -10,62 +10,117 @@ class SyncStreamService { SyncStreamService(this._syncApiRepository); StreamSubscription? _userSyncSubscription; + StreamSubscription? _assetSyncSubscription; + StreamSubscription? _exifSyncSubscription; + + bool _isUserSyncing = false; + bool _isAssetSyncing = false; + bool _isExifSyncing = false; void syncUsers() { - _userSyncSubscription = - _syncApiRepository.watchUserSyncEvent().listen((events) async { - for (final event in events) { - if (event.data is SyncUserV1) { - final data = event.data as SyncUserV1; - debugPrint("User Update: $data"); + if (_isUserSyncing) { + return; + } - // final user = await _userRepository.get(data.id); + _isUserSyncing = true; + _userSyncSubscription = _syncApiRepository.watchUserSyncEvent().listen( + (events) async { + for (final event in events) { + if (event.data is SyncUserV1) { + final data = event.data as SyncUserV1; + debugPrint("User Update: $data"); - // if (user == null) { - // continue; - // } + // await _syncApiRepository.ack(event.ack); + } - // user.name = data.name; - // user.email = data.email; - // user.updatedAt = DateTime.now(); + if (event.data is SyncUserDeleteV1) { + final data = event.data as SyncUserDeleteV1; - // await _userRepository.update(user); - // await _syncApiRepository.ack(event.ack); + debugPrint("User delete: $data"); + // await _syncApiRepository.ack(event.ack); + } } - - if (event.data is SyncUserDeleteV1) { - final data = event.data as SyncUserDeleteV1; - - debugPrint("User delete: $data"); - // await _syncApiRepository.ack(event.ack); - } - } - }); + await _syncApiRepository.ack(events.last.ack); + }, + onDone: () { + _isUserSyncing = false; + }, + onError: (_) { + _isUserSyncing = false; + }, + ); } void syncAssets() { + if (_isAssetSyncing) { + debugPrint("Asset syncing already in progress"); + return; + } + _isAssetSyncing = true; int eventCount = 0; - _syncApiRepository.watchAssetSyncEvent().listen((events) async { - eventCount += events.length; - debugPrint("Asset events: $eventCount"); - for (final event in events) { - if (event.data is SyncAssetV1) { - final data = event.data as SyncAssetV1; - // debugPrint("Asset Update: $data"); - // await _syncApiRepository.ack(event.ack); - } - if (event.data is SyncAssetDeleteV1) { - final data = event.data as SyncAssetDeleteV1; + _assetSyncSubscription = _syncApiRepository.watchAssetSyncEvent().listen( + (events) async { + eventCount += events.length; + debugPrint("Asset events: $eventCount"); + for (final event in events) { + if (event.data is SyncAssetV1) { + // final data = event.data as SyncAssetV1; + // await _syncApiRepository.ack(event.ack); + } - // debugPrint("Asset delete: $data"); - // await _syncApiRepository.ack(event.ack); + if (event.data is SyncAssetDeleteV1) { + // final data = event.data as SyncAssetDeleteV1; + + // debugPrint("Asset delete: $data"); + // await _syncApiRepository.ack(event.ack); + } } - } - }); + await _syncApiRepository.ack(events.last.ack); + }, + onDone: () { + _isAssetSyncing = false; + }, + onError: (_) { + _isAssetSyncing = false; + }, + ); + } + + void syncExif() { + if (_isExifSyncing) { + debugPrint("EXIF syncing already in progress"); + return; + } + + _isExifSyncing = true; + int eventCount = 0; + + _exifSyncSubscription = _syncApiRepository.watchExifSyncEvent().listen( + (events) async { + eventCount += events.length; + debugPrint("exif events: $eventCount"); + for (final event in events) { + if (event.data is SyncAssetExifV1) { + // final data = event.data as SyncAssetExifV1; + + // await _syncApiRepository.ack(event.ack); + } + } + await _syncApiRepository.ack(events.last.ack); + }, + onDone: () { + _isExifSyncing = false; + }, + onError: (_) { + _isExifSyncing = false; + }, + ); } Future dispose() async { await _userSyncSubscription?.cancel(); + await _assetSyncSubscription?.cancel(); + await _exifSyncSubscription?.cancel(); } } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index e486cb9a5d..206fd0917f 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -16,17 +16,33 @@ class SyncApiRepository implements ISyncApiRepository { Stream> watchUserSyncEvent() { return _getSyncStream( SyncStreamDto(types: [SyncRequestType.usersV1]), + methodName: 'watchUserSyncEvent', ); } @override Stream> watchAssetSyncEvent() { return _getSyncStream( - SyncStreamDto(types: [SyncRequestType.assetsV1]), + SyncStreamDto( + types: [SyncRequestType.assetsV1, SyncRequestType.partnerAssetsV1], + ), methodName: 'watchAssetSyncEvent', ); } + @override + Stream> watchExifSyncEvent() { + return _getSyncStream( + SyncStreamDto( + types: [ + SyncRequestType.assetExifsV1, + SyncRequestType.partnerAssetExifsV1, + ], + ), + methodName: 'watchExifSyncEvent', + ); + } + @override Future ack(String data) { return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data])); @@ -34,7 +50,7 @@ class SyncApiRepository implements ISyncApiRepository { Stream> _getSyncStream( SyncStreamDto dto, { - int batchSize = 20000, + int batchSize = 10000, String methodName = '', }) async* { final stopwatch = Stopwatch()..start(); @@ -95,17 +111,33 @@ class SyncApiRepository implements ISyncApiRepository { } const _kResponseMap = { + /// user SyncEntityType.userV1: SyncUserV1.fromJson, SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson, + + /// partners + SyncEntityType.partnerV1: SyncPartnerV1.fromJson, + SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson, + + /// assets SyncEntityType.assetV1: SyncAssetV1.fromJson, SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson, + SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson, + + /// partners' assets + SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson, + SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson, + SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson, + + /// album }; // Need to be outside of the class to be able to use compute List _parseSyncResponse(List lines) { + final stopwatch = Stopwatch()..start(); final List data = []; - for (var line in lines) { + for (final line in lines) { try { final jsonData = jsonDecode(line); final type = SyncEntityType.fromJson(jsonData['type'])!; @@ -113,15 +145,19 @@ List _parseSyncResponse(List lines) { final ack = jsonData['ack']; final converter = _kResponseMap[type]; if (converter == null) { - debugPrint("[_parseSyncReponse] Unknown type $type"); + debugPrint("[_parseSyncResponse] Unknown type $type"); continue; } data.add(SyncEvent(data: converter(dataJson), ack: ack)); } catch (error, stack) { - debugPrint("[_parseSyncReponse] Error parsing json $error $stack"); + debugPrint("[_parseSyncResponse] Error parsing json $error $stack"); } } + debugPrint( + "[_parseSyncResponse] Parsed ${data.length} events in ${stopwatch.elapsedMilliseconds}ms", + ); + return data; } diff --git a/mobile/lib/widgets/common/immich_app_bar.dart b/mobile/lib/widgets/common/immich_app_bar.dart index c89ee9e4ca..8f2144f272 100644 --- a/mobile/lib/widgets/common/immich_app_bar.dart +++ b/mobile/lib/widgets/common/immich_app_bar.dart @@ -188,6 +188,7 @@ class ImmichAppBar extends ConsumerWidget implements PreferredSizeWidget { actions: [ IconButton( onPressed: () { + ref.read(syncStreamServiceProvider).syncExif(); ref.read(syncStreamServiceProvider).syncAssets(); }, icon: const Icon(Icons.sync), diff --git a/mobile/lib/widgets/forms/login/login_form.dart b/mobile/lib/widgets/forms/login/login_form.dart index dc63841193..e5427f4bcc 100644 --- a/mobile/lib/widgets/forms/login/login_form.dart +++ b/mobile/lib/widgets/forms/login/login_form.dart @@ -165,9 +165,13 @@ class LoginForm extends HookConsumerWidget { } populateTestLoginInfo1() { - emailController.text = 'testuser@email.com'; - passwordController.text = 'password'; - serverEndpointController.text = 'http://10.1.15.216:2283/api'; + emailController.text = 'admin@immich.cloud'; + passwordController.text = 'kthxbye'; + serverEndpointController.text = + 'https://pr-16658.preview.internal.immich.cloud/'; + // emailController.text = 'testuser@email.com'; + // passwordController.text = 'password'; + // serverEndpointController.text = 'http://10.1.15.216:2283/api'; } login() async {