feat: user & partner sync stream

This commit is contained in:
shenlong-tanwen 2025-03-14 11:06:42 +05:30
parent 734566db50
commit 3739e86974
17 changed files with 429 additions and 84 deletions

View File

@ -92,6 +92,8 @@ custom_lint:
allowed:
# required / wanted
- lib/repositories/*_api.repository.dart
- lib/domain/models/sync/sync_event.model.dart
- lib/{domain,infrastructure}/**/sync_stream.*
- lib/infrastructure/repositories/*_api.repository.dart
- lib/infrastructure/utils/*.converter.dart
# acceptable exceptions for the time being

View File

@ -5,5 +5,7 @@ const double downloadFailed = -2;
// Number of log entries to retain on app start
const int kLogTruncateLimit = 250;
// Duration for backgroun sync
const Duration kBackgroundSyncDuration = Duration(minutes: 1);
const int kBatchHashFileLimit = 128;
const int kBatchHashSizeLimit = 1024 * 1024 * 1024; // 1GB

View File

@ -1,7 +1,9 @@
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
abstract interface class ISyncApiRepository {
Future<void> ack(String data);
Future<void> ack(List<String> data);
Stream<List<SyncEvent>> watchUserSyncEvent();
Stream<List<SyncEvent>> watchPartnerSyncEvent();
}

View File

@ -0,0 +1,10 @@
import 'package:immich_mobile/domain/interfaces/db.interface.dart';
import 'package:openapi/api.dart';
abstract interface class ISyncStreamRepository implements IDatabaseRepository {
Future<bool> updateUsersV1(SyncUserV1 data);
Future<bool> deleteUsersV1(SyncUserDeleteV1 data);
Future<bool> updatePartnerV1(SyncPartnerV1 data);
Future<bool> deletePartnerV1(SyncPartnerDeleteV1 data);
}

View File

@ -1,14 +1,13 @@
class SyncEvent {
// dynamic
final dynamic data;
import 'package:openapi/api.dart';
class SyncEvent {
final SyncEntityType type;
// ignore: avoid-dynamic
final dynamic data;
final String ack;
SyncEvent({
required this.data,
required this.ack,
});
const SyncEvent({required this.type, required this.data, required this.ack});
@override
String toString() => 'SyncEvent(data: $data, ack: $ack)';
String toString() => 'SyncEvent(type: $type, data: $data, ack: $ack)';
}

View File

@ -1,49 +1,96 @@
// ignore_for_file: avoid-passing-async-when-sync-expected
import 'dart:async';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
class SyncStreamService {
final ISyncApiRepository _syncApiRepository;
final Logger _logger = Logger('SyncStreamService');
SyncStreamService(this._syncApiRepository);
final ISyncApiRepository _syncApiRepository;
final ISyncStreamRepository _syncStreamRepository;
StreamSubscription? _userSyncSubscription;
Completer<void> _userSyncCompleter = Completer<void>();
void syncUsers() {
_userSyncSubscription =
_syncApiRepository.watchUserSyncEvent().listen((events) async {
StreamSubscription? _partnerSyncSubscription;
Completer<void> _partnerSyncCompleter = Completer<void>();
SyncStreamService({
required ISyncApiRepository syncApiRepository,
required ISyncStreamRepository syncStreamRepository,
}) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository;
// ignore: avoid-dynamic
Future<bool> _handleSyncData(dynamic data) async {
if (data is SyncPartnerV1) {
_logger.fine("SyncPartnerV1: $data");
return await _syncStreamRepository.updatePartnerV1(data);
}
if (data is SyncUserV1) {
_logger.fine("SyncUserV1: $data");
return await _syncStreamRepository.updateUsersV1(data);
}
if (data is SyncPartnerDeleteV1) {
_logger.fine("SyncPartnerDeleteV1: $data");
return await _syncStreamRepository.deletePartnerV1(data);
}
if (data is SyncUserDeleteV1) {
_logger.fine("SyncUserDeleteV1: $data");
return await _syncStreamRepository.deleteUsersV1(data);
}
return false;
}
Future<void> _handleSyncEvents(List<SyncEvent> events) async {
Map<SyncEntityType, String> acks = {};
for (final event in events) {
if (event.data is SyncUserV1) {
final data = event.data as SyncUserV1;
debugPrint("User Update: $data");
// final user = await _userRepository.get(data.id);
// if (user == null) {
// continue;
// }
// user.name = data.name;
// user.email = data.email;
// user.updatedAt = DateTime.now();
// await _userRepository.update(user);
// await _syncApiRepository.ack(event.ack);
}
if (event.data is SyncUserDeleteV1) {
final data = event.data as SyncUserDeleteV1;
debugPrint("User delete: $data");
// await _syncApiRepository.ack(event.ack);
if (await _handleSyncData(event.data)) {
// Only retain the latest ack from each type
acks[event.type] = event.ack;
}
}
});
await _syncApiRepository.ack(acks.values.toList());
}
Future<void> syncUsers() async {
_logger.info("Syncing User Changes");
_userSyncSubscription = _syncApiRepository.watchUserSyncEvent().listen(
_handleSyncEvents,
onDone: () {
_userSyncCompleter.complete();
_userSyncCompleter = Completer<void>();
},
);
return await _userSyncCompleter.future;
}
Future<void> syncPartners() async {
_logger.info("Syncing Partner Changes");
_partnerSyncSubscription =
_syncApiRepository.watchPartnerSyncEvent().listen(
_handleSyncEvents,
onDone: () {
_partnerSyncCompleter.complete();
_partnerSyncCompleter = Completer<void>();
},
);
return await _partnerSyncCompleter.future;
}
Future<void> dispose() async {
await _userSyncSubscription?.cancel();
_userSyncCompleter.complete();
await _partnerSyncSubscription?.cancel();
_partnerSyncCompleter.complete();
}
}

View File

@ -1,27 +1,31 @@
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:http/http.dart' as http;
import 'package:immich_mobile/domain/interfaces/sync_api.interface.dart';
import 'package:immich_mobile/domain/models/sync/sync_event.model.dart';
import 'package:immich_mobile/services/api.service.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
import 'package:http/http.dart' as http;
class SyncApiRepository implements ISyncApiRepository {
final Logger _logger = Logger('SyncApiRepository');
final ApiService _api;
const SyncApiRepository(this._api);
SyncApiRepository(this._api);
@override
Stream<List<SyncEvent>> watchUserSyncEvent() {
return _getSyncStream(
SyncStreamDto(types: [SyncRequestType.usersV1]),
);
return _getSyncStream(SyncStreamDto(types: [SyncRequestType.usersV1]));
}
@override
Future<void> ack(String data) {
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data]));
Stream<List<SyncEvent>> watchPartnerSyncEvent() {
return _getSyncStream(SyncStreamDto(types: [SyncRequestType.partnersV1]));
}
@override
Future<void> ack(List<String> data) {
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
}
Stream<List<SyncEvent>> _getSyncStream(
@ -31,7 +35,7 @@ class SyncApiRepository implements ISyncApiRepository {
final client = http.Client();
final endpoint = "${_api.apiClient.basePath}/sync/stream";
final headers = <String, String>{
final headers = {
'Content-Type': 'application/json',
'Accept': 'application/jsonlines+json',
};
@ -61,7 +65,8 @@ class SyncApiRepository implements ISyncApiRepository {
await for (final chunk in response.stream.transform(utf8.decoder)) {
previousChunk += chunk;
final parts = previousChunk.split('\n');
// ignore: move-variable-outside-iteration
final parts = previousChunk.toString().split('\n');
previousChunk = parts.removeLast();
lines.addAll(parts);
@ -69,28 +74,21 @@ class SyncApiRepository implements ISyncApiRepository {
continue;
}
yield await compute(_parseSyncResponse, lines);
yield _parseSyncResponse(lines);
lines.clear();
}
} finally {
if (lines.isNotEmpty) {
yield await compute(_parseSyncResponse, lines);
yield _parseSyncResponse(lines);
}
client.close();
}
}
}
const _kResponseMap = <SyncEntityType, Function(dynamic)>{
SyncEntityType.userV1: SyncUserV1.fromJson,
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
};
// Need to be outside of the class to be able to use compute
List<SyncEvent> _parseSyncResponse(List<String> lines) {
final List<SyncEvent> data = [];
for (var line in lines) {
for (final line in lines) {
try {
final jsonData = jsonDecode(line);
final type = SyncEntityType.fromJson(jsonData['type'])!;
@ -98,15 +96,24 @@ List<SyncEvent> _parseSyncResponse(List<String> lines) {
final ack = jsonData['ack'];
final converter = _kResponseMap[type];
if (converter == null) {
debugPrint("[_parseSyncReponse] Unknown type $type");
_logger.warning("[_parseSyncReponse] Unknown type $type");
continue;
}
data.add(SyncEvent(data: converter(dataJson), ack: ack));
data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack));
} catch (error, stack) {
debugPrint("[_parseSyncReponse] Error parsing json $error $stack");
_logger.severe("[_parseSyncResponse] Error parsing json", error, stack);
}
}
return data;
}
}
// ignore: avoid-dynamic
const _kResponseMap = <SyncEntityType, Function(dynamic)>{
SyncEntityType.userV1: SyncUserV1.fromJson,
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
};

View File

@ -0,0 +1,84 @@
import 'package:drift/drift.dart';
import 'package:immich_mobile/domain/interfaces/sync_stream.interface.dart';
import 'package:immich_mobile/infrastructure/entities/partner.entity.drift.dart';
import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
class DriftSyncStreamRepository extends DriftDatabaseRepository
implements ISyncStreamRepository {
final Logger _logger = Logger('DriftSyncStreamRepository');
final Drift _db;
DriftSyncStreamRepository(super.db) : _db = db;
@override
Future<bool> deleteUsersV1(SyncUserDeleteV1 data) async {
try {
await _db.managers.userEntity
.filter((row) => row.id.equals(data.userId))
.delete();
return true;
} catch (e, s) {
_logger.severe('Error while processing SyncUserDeleteV1', e, s);
return false;
}
}
@override
Future<bool> updateUsersV1(SyncUserV1 data) async {
final companion = UserEntityCompanion(
name: Value(data.name),
email: Value(data.email),
);
try {
await _db.userEntity.insertOne(
companion.copyWith(id: Value(data.id)),
onConflict: DoUpdate((_) => companion),
);
return true;
} catch (e, s) {
_logger.severe('Error while processing SyncUserV1', e, s);
return false;
}
}
@override
Future<bool> deletePartnerV1(SyncPartnerDeleteV1 data) async {
try {
await _db.managers.partnerEntity
.filter(
(row) =>
row.sharedById.id.equals(data.sharedById) &
row.sharedWithId.id.equals(data.sharedWithId),
)
.delete();
return true;
} catch (e, s) {
_logger.severe('Error while processing SyncPartnerDeleteV1', e, s);
return false;
}
}
@override
Future<bool> updatePartnerV1(SyncPartnerV1 data) async {
final companion =
PartnerEntityCompanion(inTimeline: Value(data.inTimeline));
try {
await _db.partnerEntity.insertOne(
companion.copyWith(
sharedById: Value(data.sharedById),
sharedWithId: Value(data.sharedWithId),
),
onConflict: DoUpdate((_) => companion),
);
return true;
} catch (e, s) {
_logger.severe('Error while processing SyncUserV1', e, s);
return false;
}
}
}

View File

@ -7,6 +7,7 @@ import 'package:immich_mobile/models/backup/backup_state.model.dart';
import 'package:immich_mobile/providers/album/album.provider.dart';
import 'package:immich_mobile/providers/asset.provider.dart';
import 'package:immich_mobile/providers/auth.provider.dart';
import 'package:immich_mobile/providers/background_sync.provider.dart';
import 'package:immich_mobile/providers/backup/backup.provider.dart';
import 'package:immich_mobile/providers/backup/ios_background_settings.provider.dart';
import 'package:immich_mobile/providers/backup/manual_upload.provider.dart';
@ -113,6 +114,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
_ref.read(backupProvider.notifier).cancelBackup();
}
_ref.read(websocketProvider.notifier).disconnect();
_ref.read(backgroundSyncProvider).stop();
}
LogService.I.flush();

View File

@ -0,0 +1,10 @@
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/utils/background_sync.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
part 'background_sync.provider.g.dart';
@Riverpod(keepAlive: true)
BackgroundSyncManager backgroundSync(Ref _) =>
BackgroundSyncManager(duration: kBackgroundSyncDuration);

View File

@ -0,0 +1,27 @@
// GENERATED CODE - DO NOT MODIFY BY HAND
part of 'background_sync.provider.dart';
// **************************************************************************
// RiverpodGenerator
// **************************************************************************
String _$backgroundSyncHash() => r'c08b6499af18d3fedeb9bb6c4ac0833c656f30dd';
/// See also [backgroundSync].
@ProviderFor(backgroundSync)
final backgroundSyncProvider = Provider<BackgroundSyncManager>.internal(
backgroundSync,
name: r'backgroundSyncProvider',
debugGetCreateSourceHash: const bool.fromEnvironment('dart.vm.product')
? null
: _$backgroundSyncHash,
dependencies: null,
allTransitiveDependencies: null,
);
@Deprecated('Will be removed in 3.0. Use Ref instead')
// ignore: unused_element
typedef BackgroundSyncRef = ProviderRef<BackgroundSyncManager>;
// ignore_for_file: type=lint
// ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member, deprecated_member_use_from_same_package

View File

@ -1,5 +1,10 @@
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:isar/isar.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
// overwritten in main.dart due to async loading
final dbProvider = Provider<Isar>((_) => throw UnimplementedError());
@Riverpod(keepAlive: true)
Drift drift(Ref _) => Drift();

View File

@ -1,4 +1,5 @@
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:isar/isar.dart';
import 'package:riverpod_annotation/riverpod_annotation.dart';
@ -6,3 +7,6 @@ part 'db.provider.g.dart';
@Riverpod(keepAlive: true)
Isar isar(Ref ref) => throw UnimplementedError('isar');
@Riverpod(keepAlive: true)
Drift drift(Ref _) => Drift();

View File

@ -22,5 +22,21 @@ final isarProvider = Provider<Isar>.internal(
@Deprecated('Will be removed in 3.0. Use Ref instead')
// ignore: unused_element
typedef IsarRef = ProviderRef<Isar>;
String _$driftHash() => r'21ceb1dbc569b877cb710387b350590e994fe64e';
/// See also [drift].
@ProviderFor(drift)
final driftProvider = Provider<Drift>.internal(
drift,
name: r'driftProvider',
debugGetCreateSourceHash:
const bool.fromEnvironment('dart.vm.product') ? null : _$driftHash,
dependencies: null,
allTransitiveDependencies: null,
);
@Deprecated('Will be removed in 3.0. Use Ref instead')
// ignore: unused_element
typedef DriftRef = ProviderRef<Drift>;
// ignore_for_file: type=lint
// ignore_for_file: subtype_of_sealed_class, invalid_use_of_internal_member, invalid_use_of_visible_for_testing_member, deprecated_member_use_from_same_package

View File

@ -3,22 +3,25 @@ import 'dart:async';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/services/sync_stream.service.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
final syncStreamServiceProvider = Provider(
(ref) {
final syncStreamServiceProvider = Provider((ref) {
final instance = SyncStreamService(
ref.watch(syncApiRepositoryProvider),
syncApiRepository: ref.watch(syncApiRepositoryProvider),
syncStreamRepository: ref.watch(syncStreamRepositoryProvider),
);
ref.onDispose(() => unawaited(instance.dispose()));
return instance;
},
);
});
final syncApiRepositoryProvider = Provider(
(ref) => SyncApiRepository(
ref.watch(apiServiceProvider),
),
(ref) => SyncApiRepository(ref.watch(apiServiceProvider)),
);
final syncStreamRepositoryProvider = Provider(
(ref) => DriftSyncStreamRepository(ref.watch(driftProvider)),
);

View File

@ -0,0 +1,58 @@
// ignore_for_file: avoid-passing-async-when-sync-expected
import 'dart:async';
import 'package:async/async.dart';
import 'package:immich_mobile/providers/infrastructure/sync_stream.provider.dart';
import 'package:immich_mobile/utils/isolate.dart';
import 'package:logging/logging.dart';
class _SyncStreamDriver {
final _userSyncCache = AsyncCache.ephemeral();
final _partnerSyncCache = AsyncCache.ephemeral();
Future<void> syncUsers() => _userSyncCache.fetch(
() async => runInIsolate(
(ref) => ref.read(syncStreamServiceProvider).syncUsers(),
),
);
Future<void> syncPartners() => _partnerSyncCache.fetch(
() async => runInIsolate(
(ref) => ref.read(syncStreamServiceProvider).syncPartners(),
),
);
}
class BackgroundSyncManager {
final Logger _logger = Logger('BackgroundSyncManager');
Timer? _timer;
final Duration _duration;
// This allows us to keep synching in the background while allowing ondemand syncs
final _driver = _SyncStreamDriver();
BackgroundSyncManager({required Duration duration}) : _duration = duration;
Timer _createTimer() {
return Timer.periodic(_duration, (timer) async {
_logger.info('Background sync started');
await _driver.syncUsers();
await _driver.syncPartners();
_logger.info('Background sync completed');
});
}
void start() {
_logger.info('Background sync enabled');
_timer ??= _createTimer();
}
void stop() {
_logger.info('Background sync disabled');
_timer?.cancel();
_timer = null;
}
Future<void> syncUsers() => _driver.syncUsers();
Future<void> syncPartners() => _driver.syncPartners();
}

View File

@ -0,0 +1,67 @@
import 'dart:async';
import 'dart:isolate';
import 'dart:ui';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:logging/logging.dart';
class InvalidIsolateUsageException implements Exception {
const InvalidIsolateUsageException();
@override
String toString() =>
"IsolateHelper should only be used from the root isolate";
}
// !! Should be used only from the root isolate
Future<T?> runInIsolate<T>(
FutureOr<T> Function(ProviderContainer ref) computation, {
String? debugLabel,
}) async {
final token = RootIsolateToken.instance;
if (token == null) {
throw const InvalidIsolateUsageException();
}
return await Isolate.run(() async {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
DartPluginRegistrant.ensureInitialized();
final db = await Bootstrap.initIsar();
await Bootstrap.initDomain(db);
final ref = ProviderContainer(
overrides: [
// TODO: Remove once isar is removed
dbProvider.overrideWithValue(db),
isarProvider.overrideWithValue(db),
],
);
Logger log = Logger("IsolateLogger");
try {
final result = await computation(ref);
// Wait for isolate to end; i.e, logs to be flushed
await Future.delayed(Durations.short2);
return result;
} catch (error, stack) {
// Log the error and stack trace
log.severe(
"Error in runInIsolate${debugLabel == null ? '' : ' for $debugLabel'}",
error,
stack,
);
} finally {
// Always close the new database connection on Isolate end
ref.read(driftProvider).close();
ref.read(dbProvider).close();
ref.read(isarProvider).close();
}
return null;
});
}