diff --git a/mobile/analysis_options.yaml b/mobile/analysis_options.yaml index a44e508088..226e198a04 100644 --- a/mobile/analysis_options.yaml +++ b/mobile/analysis_options.yaml @@ -89,10 +89,13 @@ custom_lint: allowed: # required / wanted - lib/repositories/*_api.repository.dart + - lib/infrastructure/repositories/*_api.repository.dart # acceptable exceptions for the time being - lib/entities/{album,asset,exif_info,user}.entity.dart # to convert DTOs to entities - lib/utils/{image_url_builder,openapi_patching}.dart # utils are fine - test/modules/utils/openapi_patching_test.dart # filename is self-explanatory... + - lib/domain/services/sync_stream.service.dart # Making sure to comply with the type from database + # refactor - lib/models/map/map_marker.model.dart - lib/models/server_info/server_{config,disk_info,features,version}.model.dart diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart new file mode 100644 index 0000000000..fb8f1aa46e --- /dev/null +++ b/mobile/lib/domain/interfaces/sync_api.interface.dart @@ -0,0 +1,7 @@ +import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; + +abstract interface class ISyncApiRepository { + Future ack(String data); + + Stream> watchUserSyncEvent(); +} diff --git a/mobile/lib/domain/models/sync/sync_event.model.dart b/mobile/lib/domain/models/sync/sync_event.model.dart new file mode 100644 index 0000000000..f4642d59cf --- /dev/null +++ b/mobile/lib/domain/models/sync/sync_event.model.dart @@ -0,0 +1,14 @@ +class SyncEvent { + // dynamic + final dynamic data; + + final String ack; + + SyncEvent({ + required this.data, + required this.ack, + }); + + @override + String toString() => 'SyncEvent(data: $data, ack: $ack)'; +} diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart new file mode 100644 index 0000000000..72e29b3677 --- /dev/null +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -0,0 +1,49 @@ +import 'dart:async'; + +import 'package:flutter/foundation.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:openapi/api.dart'; + +class SyncStreamService { + final ISyncApiRepository _syncApiRepository; + + SyncStreamService(this._syncApiRepository); + + StreamSubscription? _userSyncSubscription; + + void syncUsers() { + _userSyncSubscription = + _syncApiRepository.watchUserSyncEvent().listen((events) async { + for (final event in events) { + if (event.data is SyncUserV1) { + final data = event.data as SyncUserV1; + debugPrint("User Update: $data"); + + // final user = await _userRepository.get(data.id); + + // if (user == null) { + // continue; + // } + + // user.name = data.name; + // user.email = data.email; + // user.updatedAt = DateTime.now(); + + // await _userRepository.update(user); + // await _syncApiRepository.ack(event.ack); + } + + if (event.data is SyncUserDeleteV1) { + final data = event.data as SyncUserDeleteV1; + + debugPrint("User delete: $data"); + // await _syncApiRepository.ack(event.ack); + } + } + }); + } + + Future dispose() async { + await _userSyncSubscription?.cancel(); + } +} diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart new file mode 100644 index 0000000000..88a6838c44 --- /dev/null +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -0,0 +1,112 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:flutter/foundation.dart'; +import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; +import 'package:immich_mobile/services/api.service.dart'; +import 'package:openapi/api.dart'; +import 'package:http/http.dart' as http; + +class SyncApiRepository implements ISyncApiRepository { + final ApiService _api; + const SyncApiRepository(this._api); + + @override + Stream> watchUserSyncEvent() { + return _getSyncStream( + SyncStreamDto(types: [SyncRequestType.usersV1]), + ); + } + + @override + Future ack(String data) { + return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data])); + } + + Stream> _getSyncStream( + SyncStreamDto dto, { + int batchSize = 5000, + }) async* { + final client = http.Client(); + final endpoint = "${_api.apiClient.basePath}/sync/stream"; + + final headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/jsonlines+json', + }; + + final queryParams = []; + final headerParams = {}; + await _api.applyToParams(queryParams, headerParams); + headers.addAll(headerParams); + + final request = http.Request('POST', Uri.parse(endpoint)); + request.headers.addAll(headers); + request.body = jsonEncode(dto.toJson()); + + String previousChunk = ''; + List lines = []; + + try { + final response = await client.send(request); + + if (response.statusCode != 200) { + final errorBody = await response.stream.bytesToString(); + throw ApiException( + response.statusCode, + 'Failed to get sync stream: $errorBody', + ); + } + + await for (final chunk in response.stream.transform(utf8.decoder)) { + previousChunk += chunk; + final parts = previousChunk.split('\n'); + previousChunk = parts.removeLast(); + lines.addAll(parts); + + if (lines.length < batchSize) { + continue; + } + + yield await compute(_parseSyncResponse, lines); + lines.clear(); + } + } finally { + if (lines.isNotEmpty) { + yield await compute(_parseSyncResponse, lines); + } + client.close(); + } + } +} + +const _kResponseMap = { + SyncEntityType.userV1: SyncUserV1.fromJson, + SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson, +}; + +// Need to be outside of the class to be able to use compute +List _parseSyncResponse(List lines) { + final List data = []; + + for (var line in lines) { + try { + final jsonData = jsonDecode(line); + final type = SyncEntityType.fromJson(jsonData['type'])!; + final dataJson = jsonData['data']; + final ack = jsonData['ack']; + final converter = _kResponseMap[type]; + if (converter == null) { + debugPrint("[_parseSyncReponse] Unknown type $type"); + continue; + } + + data.add(SyncEvent(data: converter(dataJson), ack: ack)); + } catch (error, stack) { + debugPrint("[_parseSyncReponse] Error parsing json $error $stack"); + } + } + + return data; +} diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart new file mode 100644 index 0000000000..64f1a6cb05 --- /dev/null +++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart @@ -0,0 +1,24 @@ +import 'dart:async'; + +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/domain/services/sync_stream.service.dart'; +import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; +import 'package:immich_mobile/providers/api.provider.dart'; + +final syncStreamServiceProvider = Provider( + (ref) { + final instance = SyncStreamService( + ref.watch(syncApiRepositoryProvider), + ); + + ref.onDispose(() => unawaited(instance.dispose())); + + return instance; + }, +); + +final syncApiRepositoryProvider = Provider( + (ref) => SyncApiRepository( + ref.watch(apiServiceProvider), + ), +); diff --git a/mobile/lib/widgets/common/immich_app_bar.dart b/mobile/lib/widgets/common/immich_app_bar.dart index 7a42606797..7f97944cd5 100644 --- a/mobile/lib/widgets/common/immich_app_bar.dart +++ b/mobile/lib/widgets/common/immich_app_bar.dart @@ -10,6 +10,7 @@ import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/models/server_info/server_info.model.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/immich_logo_provider.dart'; +import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; import 'package:immich_mobile/routing/router.dart'; import 'package:immich_mobile/widgets/common/app_bar_dialog/app_bar_dialog.dart'; @@ -185,6 +186,12 @@ class ImmichAppBar extends ConsumerWidget implements PreferredSizeWidget { }, ), actions: [ + IconButton( + onPressed: () { + ref.read(syncStreamServiceProvider).syncUsers(); + }, + icon: const Icon(Icons.sync), + ), if (actions != null) ...actions!.map( (action) => Padding(