feat: new mobile asset sync

This commit is contained in:
Alex 2025-03-07 14:47:30 -06:00
parent a230b40a58
commit eb64c6b04e
No known key found for this signature in database
GPG Key ID: 53CD082B3A5E1082
5 changed files with 145 additions and 47 deletions

View File

@ -6,4 +6,6 @@ abstract interface class ISyncApiRepository {
Stream<List<SyncEvent>> watchUserSyncEvent();
Stream<List<SyncEvent>> watchAssetSyncEvent();
Stream<List<SyncEvent>> watchExifSyncEvent();
}

View File

@ -10,26 +10,26 @@ class SyncStreamService {
SyncStreamService(this._syncApiRepository);
StreamSubscription? _userSyncSubscription;
StreamSubscription? _assetSyncSubscription;
StreamSubscription? _exifSyncSubscription;
bool _isUserSyncing = false;
bool _isAssetSyncing = false;
bool _isExifSyncing = false;
void syncUsers() {
_userSyncSubscription =
_syncApiRepository.watchUserSyncEvent().listen((events) async {
if (_isUserSyncing) {
return;
}
_isUserSyncing = true;
_userSyncSubscription = _syncApiRepository.watchUserSyncEvent().listen(
(events) async {
for (final event in events) {
if (event.data is SyncUserV1) {
final data = event.data as SyncUserV1;
debugPrint("User Update: $data");
// final user = await _userRepository.get(data.id);
// if (user == null) {
// continue;
// }
// user.name = data.name;
// user.email = data.email;
// user.updatedAt = DateTime.now();
// await _userRepository.update(user);
// await _syncApiRepository.ack(event.ack);
}
@ -40,32 +40,87 @@ class SyncStreamService {
// await _syncApiRepository.ack(event.ack);
}
}
});
await _syncApiRepository.ack(events.last.ack);
},
onDone: () {
_isUserSyncing = false;
},
onError: (_) {
_isUserSyncing = false;
},
);
}
void syncAssets() {
if (_isAssetSyncing) {
debugPrint("Asset syncing already in progress");
return;
}
_isAssetSyncing = true;
int eventCount = 0;
_syncApiRepository.watchAssetSyncEvent().listen((events) async {
_assetSyncSubscription = _syncApiRepository.watchAssetSyncEvent().listen(
(events) async {
eventCount += events.length;
debugPrint("Asset events: $eventCount");
for (final event in events) {
if (event.data is SyncAssetV1) {
final data = event.data as SyncAssetV1;
// debugPrint("Asset Update: $data");
// final data = event.data as SyncAssetV1;
// await _syncApiRepository.ack(event.ack);
}
if (event.data is SyncAssetDeleteV1) {
final data = event.data as SyncAssetDeleteV1;
// final data = event.data as SyncAssetDeleteV1;
// debugPrint("Asset delete: $data");
// await _syncApiRepository.ack(event.ack);
}
}
});
await _syncApiRepository.ack(events.last.ack);
},
onDone: () {
_isAssetSyncing = false;
},
onError: (_) {
_isAssetSyncing = false;
},
);
}
void syncExif() {
if (_isExifSyncing) {
debugPrint("EXIF syncing already in progress");
return;
}
_isExifSyncing = true;
int eventCount = 0;
_exifSyncSubscription = _syncApiRepository.watchExifSyncEvent().listen(
(events) async {
eventCount += events.length;
debugPrint("exif events: $eventCount");
for (final event in events) {
if (event.data is SyncAssetExifV1) {
// final data = event.data as SyncAssetExifV1;
// await _syncApiRepository.ack(event.ack);
}
}
await _syncApiRepository.ack(events.last.ack);
},
onDone: () {
_isExifSyncing = false;
},
onError: (_) {
_isExifSyncing = false;
},
);
}
Future<void> dispose() async {
await _userSyncSubscription?.cancel();
await _assetSyncSubscription?.cancel();
await _exifSyncSubscription?.cancel();
}
}

View File

@ -16,17 +16,33 @@ class SyncApiRepository implements ISyncApiRepository {
Stream<List<SyncEvent>> watchUserSyncEvent() {
return _getSyncStream(
SyncStreamDto(types: [SyncRequestType.usersV1]),
methodName: 'watchUserSyncEvent',
);
}
@override
Stream<List<SyncEvent>> watchAssetSyncEvent() {
return _getSyncStream(
SyncStreamDto(types: [SyncRequestType.assetsV1]),
SyncStreamDto(
types: [SyncRequestType.assetsV1, SyncRequestType.partnerAssetsV1],
),
methodName: 'watchAssetSyncEvent',
);
}
@override
Stream<List<SyncEvent>> watchExifSyncEvent() {
return _getSyncStream(
SyncStreamDto(
types: [
SyncRequestType.assetExifsV1,
SyncRequestType.partnerAssetExifsV1,
],
),
methodName: 'watchExifSyncEvent',
);
}
@override
Future<void> ack(String data) {
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: [data]));
@ -34,7 +50,7 @@ class SyncApiRepository implements ISyncApiRepository {
Stream<List<SyncEvent>> _getSyncStream(
SyncStreamDto dto, {
int batchSize = 20000,
int batchSize = 10000,
String methodName = '',
}) async* {
final stopwatch = Stopwatch()..start();
@ -95,17 +111,33 @@ class SyncApiRepository implements ISyncApiRepository {
}
const _kResponseMap = <SyncEntityType, Function(dynamic)>{
/// user
SyncEntityType.userV1: SyncUserV1.fromJson,
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
/// partners
SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
/// assets
SyncEntityType.assetV1: SyncAssetV1.fromJson,
SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson,
SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson,
/// partners' assets
SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson,
SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson,
SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson,
/// album
};
// Need to be outside of the class to be able to use compute
List<SyncEvent> _parseSyncResponse(List<String> lines) {
final stopwatch = Stopwatch()..start();
final List<SyncEvent> data = [];
for (var line in lines) {
for (final line in lines) {
try {
final jsonData = jsonDecode(line);
final type = SyncEntityType.fromJson(jsonData['type'])!;
@ -113,15 +145,19 @@ List<SyncEvent> _parseSyncResponse(List<String> lines) {
final ack = jsonData['ack'];
final converter = _kResponseMap[type];
if (converter == null) {
debugPrint("[_parseSyncReponse] Unknown type $type");
debugPrint("[_parseSyncResponse] Unknown type $type");
continue;
}
data.add(SyncEvent(data: converter(dataJson), ack: ack));
} catch (error, stack) {
debugPrint("[_parseSyncReponse] Error parsing json $error $stack");
debugPrint("[_parseSyncResponse] Error parsing json $error $stack");
}
}
debugPrint(
"[_parseSyncResponse] Parsed ${data.length} events in ${stopwatch.elapsedMilliseconds}ms",
);
return data;
}

View File

@ -188,6 +188,7 @@ class ImmichAppBar extends ConsumerWidget implements PreferredSizeWidget {
actions: [
IconButton(
onPressed: () {
ref.read(syncStreamServiceProvider).syncExif();
ref.read(syncStreamServiceProvider).syncAssets();
},
icon: const Icon(Icons.sync),

View File

@ -165,9 +165,13 @@ class LoginForm extends HookConsumerWidget {
}
populateTestLoginInfo1() {
emailController.text = 'testuser@email.com';
passwordController.text = 'password';
serverEndpointController.text = 'http://10.1.15.216:2283/api';
emailController.text = 'admin@immich.cloud';
passwordController.text = 'kthxbye';
serverEndpointController.text =
'https://pr-16658.preview.internal.immich.cloud/';
// emailController.text = 'testuser@email.com';
// passwordController.text = 'password';
// serverEndpointController.text = 'http://10.1.15.216:2283/api';
}
login() async {