From 3739e869744f0f3e47842636a061630e60d843e3 Mon Sep 17 00:00:00 2001 From: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Date: Fri, 14 Mar 2025 11:06:42 +0530 Subject: [PATCH] feat: user & partner sync stream --- mobile/analysis_options.yaml | 2 + mobile/lib/constants/constants.dart | 2 + .../domain/interfaces/sync_api.interface.dart | 4 +- .../interfaces/sync_stream.interface.dart | 10 ++ .../domain/models/sync/sync_event.model.dart | 15 ++- .../domain/services/sync_stream.service.dart | 99 ++++++++++++++----- .../repositories/sync_api.repository.dart | 81 ++++++++------- .../repositories/sync_stream.repository.dart | 84 ++++++++++++++++ .../providers/app_life_cycle.provider.dart | 2 + .../providers/background_sync.provider.dart | 10 ++ .../providers/background_sync.provider.g.dart | 27 +++++ mobile/lib/providers/db.provider.dart | 5 + .../providers/infrastructure/db.provider.dart | 4 + .../infrastructure/db.provider.g.dart | 16 +++ .../infrastructure/sync_stream.provider.dart | 27 ++--- mobile/lib/utils/background_sync.dart | 58 +++++++++++ mobile/lib/utils/isolate.dart | 67 +++++++++++++ 17 files changed, 429 insertions(+), 84 deletions(-) create mode 100644 mobile/lib/domain/interfaces/sync_stream.interface.dart create mode 100644 mobile/lib/infrastructure/repositories/sync_stream.repository.dart create mode 100644 mobile/lib/providers/background_sync.provider.dart create mode 100644 mobile/lib/providers/background_sync.provider.g.dart create mode 100644 mobile/lib/utils/background_sync.dart create mode 100644 mobile/lib/utils/isolate.dart diff --git a/mobile/analysis_options.yaml b/mobile/analysis_options.yaml index 04f3145908..f082c9185d 100644 --- a/mobile/analysis_options.yaml +++ b/mobile/analysis_options.yaml @@ -92,6 +92,8 @@ custom_lint: allowed: # required / wanted - lib/repositories/*_api.repository.dart + - lib/domain/models/sync/sync_event.model.dart + - lib/{domain,infrastructure}/**/sync_stream.* - lib/infrastructure/repositories/*_api.repository.dart - lib/infrastructure/utils/*.converter.dart # acceptable exceptions for the time being diff --git a/mobile/lib/constants/constants.dart b/mobile/lib/constants/constants.dart index 83d540d54c..0c17beef00 100644 --- a/mobile/lib/constants/constants.dart +++ b/mobile/lib/constants/constants.dart @@ -5,5 +5,7 @@ const double downloadFailed = -2; // Number of log entries to retain on app start const int kLogTruncateLimit = 250; +// Duration for backgroun sync +const Duration kBackgroundSyncDuration = Duration(minutes: 1); const int kBatchHashFileLimit = 128; const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB diff --git a/mobile/lib/domain/interfaces/sync_api.interface.dart b/mobile/lib/domain/interfaces/sync_api.interface.dart index fb8f1aa46e..095448f9c8 100644 --- a/mobile/lib/domain/interfaces/sync_api.interface.dart +++ b/mobile/lib/domain/interfaces/sync_api.interface.dart @@ -1,7 +1,9 @@ import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; abstract interface class ISyncApiRepository { - Future ack(String data); + Future ack(List data); Stream> watchUserSyncEvent(); + + Stream> watchPartnerSyncEvent(); } diff --git a/mobile/lib/domain/interfaces/sync_stream.interface.dart b/mobile/lib/domain/interfaces/sync_stream.interface.dart new file mode 100644 index 0000000000..0b2b5879f0 --- /dev/null +++ b/mobile/lib/domain/interfaces/sync_stream.interface.dart @@ -0,0 +1,10 @@ +import 'package:immich_mobile/domain/interfaces/db.interface.dart'; +import 'package:openapi/api.dart'; + +abstract interface class ISyncStreamRepository implements IDatabaseRepository { + Future updateUsersV1(SyncUserV1 data); + Future deleteUsersV1(SyncUserDeleteV1 data); + + Future updatePartnerV1(SyncPartnerV1 data); + Future deletePartnerV1(SyncPartnerDeleteV1 data); +} diff --git a/mobile/lib/domain/models/sync/sync_event.model.dart b/mobile/lib/domain/models/sync/sync_event.model.dart index f4642d59cf..2ad8a75fe1 100644 --- a/mobile/lib/domain/models/sync/sync_event.model.dart +++ b/mobile/lib/domain/models/sync/sync_event.model.dart @@ -1,14 +1,13 @@ -class SyncEvent { - // dynamic - final dynamic data; +import 'package:openapi/api.dart'; +class SyncEvent { + final SyncEntityType type; + // ignore: avoid-dynamic + final dynamic data; final String ack; - SyncEvent({ - required this.data, - required this.ack, - }); + const SyncEvent({required this.type, required this.data, required this.ack}); @override - String toString() => 'SyncEvent(data: $data, ack: $ack)'; + String toString() => 'SyncEvent(type: $type, data: $data, ack: $ack)'; } diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 72e29b3677..ff946ec1e3 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -1,49 +1,96 @@ +// ignore_for_file: avoid-passing-async-when-sync-expected + import 'dart:async'; -import 'package:flutter/foundation.dart'; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; +import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; class SyncStreamService { - final ISyncApiRepository _syncApiRepository; + final Logger _logger = Logger('SyncStreamService'); - SyncStreamService(this._syncApiRepository); + final ISyncApiRepository _syncApiRepository; + final ISyncStreamRepository _syncStreamRepository; StreamSubscription? _userSyncSubscription; + Completer _userSyncCompleter = Completer(); - void syncUsers() { - _userSyncSubscription = - _syncApiRepository.watchUserSyncEvent().listen((events) async { - for (final event in events) { - if (event.data is SyncUserV1) { - final data = event.data as SyncUserV1; - debugPrint("User Update: $data"); + StreamSubscription? _partnerSyncSubscription; + Completer _partnerSyncCompleter = Completer(); - // final user = await _userRepository.get(data.id); + SyncStreamService({ + required ISyncApiRepository syncApiRepository, + required ISyncStreamRepository syncStreamRepository, + }) : _syncApiRepository = syncApiRepository, + _syncStreamRepository = syncStreamRepository; - // if (user == null) { - // continue; - // } + // ignore: avoid-dynamic + Future _handleSyncData(dynamic data) async { + if (data is SyncPartnerV1) { + _logger.fine("SyncPartnerV1: $data"); + return await _syncStreamRepository.updatePartnerV1(data); + } - // user.name = data.name; - // user.email = data.email; - // user.updatedAt = DateTime.now(); + if (data is SyncUserV1) { + _logger.fine("SyncUserV1: $data"); + return await _syncStreamRepository.updateUsersV1(data); + } - // await _userRepository.update(user); - // await _syncApiRepository.ack(event.ack); - } + if (data is SyncPartnerDeleteV1) { + _logger.fine("SyncPartnerDeleteV1: $data"); + return await _syncStreamRepository.deletePartnerV1(data); + } - if (event.data is SyncUserDeleteV1) { - final data = event.data as SyncUserDeleteV1; + if (data is SyncUserDeleteV1) { + _logger.fine("SyncUserDeleteV1: $data"); + return await _syncStreamRepository.deleteUsersV1(data); + } - debugPrint("User delete: $data"); - // await _syncApiRepository.ack(event.ack); - } + return false; + } + + Future _handleSyncEvents(List events) async { + Map acks = {}; + for (final event in events) { + if (await _handleSyncData(event.data)) { + // Only retain the latest ack from each type + acks[event.type] = event.ack; } - }); + } + await _syncApiRepository.ack(acks.values.toList()); + } + + Future syncUsers() async { + _logger.info("Syncing User Changes"); + _userSyncSubscription = _syncApiRepository.watchUserSyncEvent().listen( + _handleSyncEvents, + onDone: () { + _userSyncCompleter.complete(); + _userSyncCompleter = Completer(); + }, + ); + return await _userSyncCompleter.future; + } + + Future syncPartners() async { + _logger.info("Syncing Partner Changes"); + _partnerSyncSubscription = + _syncApiRepository.watchPartnerSyncEvent().listen( + _handleSyncEvents, + onDone: () { + _partnerSyncCompleter.complete(); + _partnerSyncCompleter = Completer(); + }, + ); + return await _partnerSyncCompleter.future; } Future dispose() async { await _userSyncSubscription?.cancel(); + _userSyncCompleter.complete(); + await _partnerSyncSubscription?.cancel(); + _partnerSyncCompleter.complete(); } } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index 88a6838c44..c2c33b6b05 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -1,27 +1,31 @@ import 'dart:async'; import 'dart:convert'; -import 'package:flutter/foundation.dart'; +import 'package:http/http.dart' as http; import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart'; import 'package:immich_mobile/domain/models/sync/sync_event.model.dart'; import 'package:immich_mobile/services/api.service.dart'; +import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; -import 'package:http/http.dart' as http; class SyncApiRepository implements ISyncApiRepository { + final Logger _logger = Logger('SyncApiRepository'); final ApiService _api; - const SyncApiRepository(this._api); + SyncApiRepository(this._api); @override Stream> watchUserSyncEvent() { - return _getSyncStream( - SyncStreamDto(types: [SyncRequestType.usersV1]), - ); + return _getSyncStream(SyncStreamDto(types: [SyncRequestType.usersV1])); } @override - Future ack(String data) { - return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data])); + Stream> watchPartnerSyncEvent() { + return _getSyncStream(SyncStreamDto(types: [SyncRequestType.partnersV1])); + } + + @override + Future ack(List data) { + return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data)); } Stream> _getSyncStream( @@ -31,7 +35,7 @@ class SyncApiRepository implements ISyncApiRepository { final client = http.Client(); final endpoint = "${_api.apiClient.basePath}/sync/stream"; - final headers = { + final headers = { 'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json', }; @@ -61,7 +65,8 @@ class SyncApiRepository implements ISyncApiRepository { await for (final chunk in response.stream.transform(utf8.decoder)) { previousChunk += chunk; - final parts = previousChunk.split('\n'); + // ignore: move-variable-outside-iteration + final parts = previousChunk.toString().split('\n'); previousChunk = parts.removeLast(); lines.addAll(parts); @@ -69,44 +74,46 @@ class SyncApiRepository implements ISyncApiRepository { continue; } - yield await compute(_parseSyncResponse, lines); + yield _parseSyncResponse(lines); lines.clear(); } } finally { if (lines.isNotEmpty) { - yield await compute(_parseSyncResponse, lines); + yield _parseSyncResponse(lines); } client.close(); } } + + List _parseSyncResponse(List lines) { + final List data = []; + + for (final line in lines) { + try { + final jsonData = jsonDecode(line); + final type = SyncEntityType.fromJson(jsonData['type'])!; + final dataJson = jsonData['data']; + final ack = jsonData['ack']; + final converter = _kResponseMap[type]; + if (converter == null) { + _logger.warning("[_parseSyncReponse] Unknown type $type"); + continue; + } + + data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack)); + } catch (error, stack) { + _logger.severe("[_parseSyncResponse] Error parsing json", error, stack); + } + } + + return data; + } } +// ignore: avoid-dynamic const _kResponseMap = { SyncEntityType.userV1: SyncUserV1.fromJson, SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson, + SyncEntityType.partnerV1: SyncPartnerV1.fromJson, + SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson, }; - -// Need to be outside of the class to be able to use compute -List _parseSyncResponse(List lines) { - final List data = []; - - for (var line in lines) { - try { - final jsonData = jsonDecode(line); - final type = SyncEntityType.fromJson(jsonData['type'])!; - final dataJson = jsonData['data']; - final ack = jsonData['ack']; - final converter = _kResponseMap[type]; - if (converter == null) { - debugPrint("[_parseSyncReponse] Unknown type $type"); - continue; - } - - data.add(SyncEvent(data: converter(dataJson), ack: ack)); - } catch (error, stack) { - debugPrint("[_parseSyncReponse] Error parsing json $error $stack"); - } - } - - return data; -} diff --git a/mobile/lib/infrastructure/repositories/sync_stream.repository.dart b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart new file mode 100644 index 0000000000..a765315452 --- /dev/null +++ b/mobile/lib/infrastructure/repositories/sync_stream.repository.dart @@ -0,0 +1,84 @@ +import 'package:drift/drift.dart'; +import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart'; +import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:logging/logging.dart'; +import 'package:openapi/api.dart'; + +class DriftSyncStreamRepository extends DriftDatabaseRepository + implements ISyncStreamRepository { + final Logger _logger = Logger('DriftSyncStreamRepository'); + final Drift _db; + + DriftSyncStreamRepository(super.db) : _db = db; + + @override + Future deleteUsersV1(SyncUserDeleteV1 data) async { + try { + await _db.managers.userEntity + .filter((row) => row.id.equals(data.userId)) + .delete(); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncUserDeleteV1', e, s); + return false; + } + } + + @override + Future updateUsersV1(SyncUserV1 data) async { + final companion = UserEntityCompanion( + name: Value(data.name), + email: Value(data.email), + ); + + try { + await _db.userEntity.insertOne( + companion.copyWith(id: Value(data.id)), + onConflict: DoUpdate((_) => companion), + ); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncUserV1', e, s); + return false; + } + } + + @override + Future deletePartnerV1(SyncPartnerDeleteV1 data) async { + try { + await _db.managers.partnerEntity + .filter( + (row) => + row.sharedById.id.equals(data.sharedById) & + row.sharedWithId.id.equals(data.sharedWithId), + ) + .delete(); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncPartnerDeleteV1', e, s); + return false; + } + } + + @override + Future updatePartnerV1(SyncPartnerV1 data) async { + final companion = + PartnerEntityCompanion(inTimeline: Value(data.inTimeline)); + + try { + await _db.partnerEntity.insertOne( + companion.copyWith( + sharedById: Value(data.sharedById), + sharedWithId: Value(data.sharedWithId), + ), + onConflict: DoUpdate((_) => companion), + ); + return true; + } catch (e, s) { + _logger.severe('Error while processing SyncUserV1', e, s); + return false; + } + } +} diff --git a/mobile/lib/providers/app_life_cycle.provider.dart b/mobile/lib/providers/app_life_cycle.provider.dart index 3ec7813e2f..e45fcb56f6 100644 --- a/mobile/lib/providers/app_life_cycle.provider.dart +++ b/mobile/lib/providers/app_life_cycle.provider.dart @@ -7,6 +7,7 @@ import 'package:immich_mobile/models/backup/backup_state.model.dart'; import 'package:immich_mobile/providers/album/album.provider.dart'; import 'package:immich_mobile/providers/asset.provider.dart'; import 'package:immich_mobile/providers/auth.provider.dart'; +import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/providers/backup/backup.provider.dart'; import 'package:immich_mobile/providers/backup/ios_background_settings.provider.dart'; import 'package:immich_mobile/providers/backup/manual_upload.provider.dart'; @@ -113,6 +114,7 @@ class AppLifeCycleNotifier extends StateNotifier { _ref.read(backupProvider.notifier).cancelBackup(); } _ref.read(websocketProvider.notifier).disconnect(); + _ref.read(backgroundSyncProvider).stop(); } LogService.I.flush(); diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart new file mode 100644 index 0000000000..7782885a04 --- /dev/null +++ b/mobile/lib/providers/background_sync.provider.dart @@ -0,0 +1,10 @@ +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/constants/constants.dart'; +import 'package:immich_mobile/utils/background_sync.dart'; +import 'package:riverpod_annotation/riverpod_annotation.dart'; + +part 'background_sync.provider.g.dart'; + +@Riverpod(keepAlive: true) +BackgroundSyncManager backgroundSync(Ref _) => + BackgroundSyncManager(duration: kBackgroundSyncDuration); diff --git a/mobile/lib/providers/background_sync.provider.g.dart b/mobile/lib/providers/background_sync.provider.g.dart new file mode 100644 index 0000000000..fc9f1f1528 --- /dev/null +++ b/mobile/lib/providers/background_sync.provider.g.dart @@ -0,0 +1,27 @@ +// GENERATED CODE - DO NOT MODIFY BY HAND + +part of 'background_sync.provider.dart'; + +// ************************************************************************** +// RiverpodGenerator +// ************************************************************************** + +String _$backgroundSyncHash() => r'c08b6499af18d3fedeb9bb6c4ac0833c656f30dd'; + +/// See also [backgroundSync]. +@ProviderFor(backgroundSync) +final backgroundSyncProvider = Provider.internal( + backgroundSync, + name: r'backgroundSyncProvider', + debugGetCreateSourceHash: const bool.fromEnvironment('dart.vm.product') + ? null + : _$backgroundSyncHash, + dependencies: null, + allTransitiveDependencies: null, +); + +@Deprecated('Will be removed in 3.0. Use Ref instead') +// ignore: unused_element +typedef BackgroundSyncRef = ProviderRef; +// ignore_for_file: type=lint +// ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member, deprecated_member_use_from_same_package diff --git a/mobile/lib/providers/db.provider.dart b/mobile/lib/providers/db.provider.dart index e03e037f36..cdcfbcf606 100644 --- a/mobile/lib/providers/db.provider.dart +++ b/mobile/lib/providers/db.provider.dart @@ -1,5 +1,10 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:isar/isar.dart'; +import 'package:riverpod_annotation/riverpod_annotation.dart'; // overwritten in main.dart due to async loading final dbProvider = Provider((_) => throw UnimplementedError()); + +@Riverpod(keepAlive: true) +Drift drift(Ref _) => Drift(); diff --git a/mobile/lib/providers/infrastructure/db.provider.dart b/mobile/lib/providers/infrastructure/db.provider.dart index 84010b3b96..5d052ddcd5 100644 --- a/mobile/lib/providers/infrastructure/db.provider.dart +++ b/mobile/lib/providers/infrastructure/db.provider.dart @@ -1,4 +1,5 @@ import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:isar/isar.dart'; import 'package:riverpod_annotation/riverpod_annotation.dart'; @@ -6,3 +7,6 @@ part 'db.provider.g.dart'; @Riverpod(keepAlive: true) Isar isar(Ref ref) => throw UnimplementedError('isar'); + +@Riverpod(keepAlive: true) +Drift drift(Ref _) => Drift(); diff --git a/mobile/lib/providers/infrastructure/db.provider.g.dart b/mobile/lib/providers/infrastructure/db.provider.g.dart index 33b330192f..5539886cd6 100644 --- a/mobile/lib/providers/infrastructure/db.provider.g.dart +++ b/mobile/lib/providers/infrastructure/db.provider.g.dart @@ -22,5 +22,21 @@ final isarProvider = Provider.internal( @Deprecated('Will be removed in 3.0. Use Ref instead') // ignore: unused_element typedef IsarRef = ProviderRef; +String _$driftHash() => r'21ceb1dbc569b877cb710387b350590e994fe64e'; + +/// See also [drift]. +@ProviderFor(drift) +final driftProvider = Provider.internal( + drift, + name: r'driftProvider', + debugGetCreateSourceHash: + const bool.fromEnvironment('dart.vm.product') ? null : _$driftHash, + dependencies: null, + allTransitiveDependencies: null, +); + +@Deprecated('Will be removed in 3.0. Use Ref instead') +// ignore: unused_element +typedef DriftRef = ProviderRef; // ignore_for_file: type=lint // ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member, deprecated_member_use_from_same_package diff --git a/mobile/lib/providers/infrastructure/sync_stream.provider.dart b/mobile/lib/providers/infrastructure/sync_stream.provider.dart index 64f1a6cb05..085661ad5c 100644 --- a/mobile/lib/providers/infrastructure/sync_stream.provider.dart +++ b/mobile/lib/providers/infrastructure/sync_stream.provider.dart @@ -3,22 +3,25 @@ import 'dart:async'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/services/sync_stream.service.dart'; import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart'; +import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; -final syncStreamServiceProvider = Provider( - (ref) { - final instance = SyncStreamService( - ref.watch(syncApiRepositoryProvider), - ); +final syncStreamServiceProvider = Provider((ref) { + final instance = SyncStreamService( + syncApiRepository: ref.watch(syncApiRepositoryProvider), + syncStreamRepository: ref.watch(syncStreamRepositoryProvider), + ); - ref.onDispose(() => unawaited(instance.dispose())); + ref.onDispose(() => unawaited(instance.dispose())); - return instance; - }, -); + return instance; +}); final syncApiRepositoryProvider = Provider( - (ref) => SyncApiRepository( - ref.watch(apiServiceProvider), - ), + (ref) => SyncApiRepository(ref.watch(apiServiceProvider)), +); + +final syncStreamRepositoryProvider = Provider( + (ref) => DriftSyncStreamRepository(ref.watch(driftProvider)), ); diff --git a/mobile/lib/utils/background_sync.dart b/mobile/lib/utils/background_sync.dart new file mode 100644 index 0000000000..327daa3171 --- /dev/null +++ b/mobile/lib/utils/background_sync.dart @@ -0,0 +1,58 @@ +// ignore_for_file: avoid-passing-async-when-sync-expected + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart'; +import 'package:immich_mobile/utils/isolate.dart'; +import 'package:logging/logging.dart'; + +class _SyncStreamDriver { + final _userSyncCache = AsyncCache.ephemeral(); + final _partnerSyncCache = AsyncCache.ephemeral(); + + Future syncUsers() => _userSyncCache.fetch( + () async => runInIsolate( + (ref) => ref.read(syncStreamServiceProvider).syncUsers(), + ), + ); + + Future syncPartners() => _partnerSyncCache.fetch( + () async => runInIsolate( + (ref) => ref.read(syncStreamServiceProvider).syncPartners(), + ), + ); +} + +class BackgroundSyncManager { + final Logger _logger = Logger('BackgroundSyncManager'); + Timer? _timer; + final Duration _duration; + // This allows us to keep synching in the background while allowing ondemand syncs + final _driver = _SyncStreamDriver(); + + BackgroundSyncManager({required Duration duration}) : _duration = duration; + + Timer _createTimer() { + return Timer.periodic(_duration, (timer) async { + _logger.info('Background sync started'); + await _driver.syncUsers(); + await _driver.syncPartners(); + _logger.info('Background sync completed'); + }); + } + + void start() { + _logger.info('Background sync enabled'); + _timer ??= _createTimer(); + } + + void stop() { + _logger.info('Background sync disabled'); + _timer?.cancel(); + _timer = null; + } + + Future syncUsers() => _driver.syncUsers(); + Future syncPartners() => _driver.syncPartners(); +} diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart new file mode 100644 index 0000000000..22c4f4e7fd --- /dev/null +++ b/mobile/lib/utils/isolate.dart @@ -0,0 +1,67 @@ +import 'dart:async'; +import 'dart:isolate'; +import 'dart:ui'; + +import 'package:flutter/material.dart'; +import 'package:flutter/services.dart'; +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/providers/db.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; +import 'package:immich_mobile/utils/bootstrap.dart'; +import 'package:logging/logging.dart'; + +class InvalidIsolateUsageException implements Exception { + const InvalidIsolateUsageException(); + + @override + String toString() => + "IsolateHelper should only be used from the root isolate"; +} + +// !! Should be used only from the root isolate +Future runInIsolate( + FutureOr Function(ProviderContainer ref) computation, { + String? debugLabel, +}) async { + final token = RootIsolateToken.instance; + if (token == null) { + throw const InvalidIsolateUsageException(); + } + + return await Isolate.run(() async { + BackgroundIsolateBinaryMessenger.ensureInitialized(token); + DartPluginRegistrant.ensureInitialized(); + + final db = await Bootstrap.initIsar(); + await Bootstrap.initDomain(db); + final ref = ProviderContainer( + overrides: [ + // TODO: Remove once isar is removed + dbProvider.overrideWithValue(db), + isarProvider.overrideWithValue(db), + ], + ); + + Logger log = Logger("IsolateLogger"); + + try { + final result = await computation(ref); + // Wait for isolate to end; i.e, logs to be flushed + await Future.delayed(Durations.short2); + return result; + } catch (error, stack) { + // Log the error and stack trace + log.severe( + "Error in runInIsolate${debugLabel == null ? '' : ' for $debugLabel'}", + error, + stack, + ); + } finally { + // Always close the new database connection on Isolate end + ref.read(driftProvider).close(); + ref.read(dbProvider).close(); + ref.read(isarProvider).close(); + } + return null; + }); +}