immich/mobile/lib/domain/services/sync_stream.service.dart
shenlong ea3a14ed25
feat(mobile): add album asset sync (#19522)
* feat(mobile): add album asset sync

* add SyncAlbumToAssetDeleteV1 to openapi-spec

* update delete queries to use where in statements

* clear remote album when clear remote data

* fix: bad merge

* fix: bad merge

* fix: _SyncAckV1 return type

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: wuzihao051119 <wuzihao051119@outlook.com>
Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
2025-06-26 08:50:39 -05:00

154 lines
5.4 KiB
Dart

import 'dart:async';
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_api.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
class SyncStreamService {
final Logger _logger = Logger('SyncStreamService');
final SyncApiRepository _syncApiRepository;
final SyncStreamRepository _syncStreamRepository;
final bool Function()? _cancelChecker;
SyncStreamService({
required SyncApiRepository syncApiRepository,
required SyncStreamRepository syncStreamRepository,
bool Function()? cancelChecker,
}) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository,
_cancelChecker = cancelChecker;
bool get isCancelled => _cancelChecker?.call() ?? false;
Future<void> sync() => _syncApiRepository.streamChanges(_handleEvents);
Future<void> _handleEvents(List<SyncEvent> events, Function() abort) async {
List<SyncEvent> items = [];
for (final event in events) {
if (isCancelled) {
_logger.warning("Sync stream cancelled");
abort();
return;
}
if (event.type != items.firstOrNull?.type) {
await _processBatch(items);
}
items.add(event);
}
await _processBatch(items);
}
Future<void> _processBatch(List<SyncEvent> batch) async {
if (batch.isEmpty) {
return;
}
final type = batch.first.type;
await _handleSyncData(type, batch.map((e) => e.data));
await _syncApiRepository.ack([batch.last.ack]);
batch.clear();
}
Future<void> _handleSyncData(
SyncEntityType type,
Iterable<Object> data,
) async {
_logger.fine("Processing sync data for $type of length ${data.length}");
switch (type) {
case SyncEntityType.userV1:
return _syncStreamRepository.updateUsersV1(data.cast());
case SyncEntityType.userDeleteV1:
return _syncStreamRepository.deleteUsersV1(data.cast());
case SyncEntityType.partnerV1:
return _syncStreamRepository.updatePartnerV1(data.cast());
case SyncEntityType.partnerDeleteV1:
return _syncStreamRepository.deletePartnerV1(data.cast());
case SyncEntityType.assetV1:
return _syncStreamRepository.updateAssetsV1(data.cast());
case SyncEntityType.assetDeleteV1:
return _syncStreamRepository.deleteAssetsV1(data.cast());
case SyncEntityType.assetExifV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast());
case SyncEntityType.partnerAssetV1:
return _syncStreamRepository.updateAssetsV1(
data.cast(),
debugLabel: 'partner',
);
case SyncEntityType.partnerAssetBackfillV1:
return _syncStreamRepository.updateAssetsV1(
data.cast(),
debugLabel: 'partner backfill',
);
case SyncEntityType.partnerAssetDeleteV1:
return _syncStreamRepository.deleteAssetsV1(
data.cast(),
debugLabel: "partner",
);
case SyncEntityType.partnerAssetExifV1:
return _syncStreamRepository.updateAssetsExifV1(
data.cast(),
debugLabel: 'partner',
);
case SyncEntityType.partnerAssetExifBackfillV1:
return _syncStreamRepository.updateAssetsExifV1(
data.cast(),
debugLabel: 'partner backfill',
);
case SyncEntityType.albumV1:
return _syncStreamRepository.updateAlbumsV1(data.cast());
case SyncEntityType.albumDeleteV1:
return _syncStreamRepository.deleteAlbumsV1(data.cast());
case SyncEntityType.albumUserV1:
return _syncStreamRepository.updateAlbumUsersV1(data.cast());
case SyncEntityType.albumUserBackfillV1:
return _syncStreamRepository.updateAlbumUsersV1(
data.cast(),
debugLabel: 'backfill',
);
case SyncEntityType.albumUserDeleteV1:
return _syncStreamRepository.deleteAlbumUsersV1(data.cast());
case SyncEntityType.albumAssetV1:
return _syncStreamRepository.updateAssetsV1(
data.cast(),
debugLabel: 'album',
);
case SyncEntityType.albumAssetBackfillV1:
return _syncStreamRepository.updateAssetsV1(
data.cast(),
debugLabel: 'album backfill',
);
case SyncEntityType.albumAssetExifV1:
return _syncStreamRepository.updateAssetsExifV1(
data.cast(),
debugLabel: 'album',
);
case SyncEntityType.albumAssetExifBackfillV1:
return _syncStreamRepository.updateAssetsExifV1(
data.cast(),
debugLabel: 'album backfill',
);
case SyncEntityType.albumToAssetV1:
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast());
case SyncEntityType.albumToAssetBackfillV1:
return _syncStreamRepository.updateAlbumToAssetsV1(
data.cast(),
debugLabel: 'backfill',
);
case SyncEntityType.albumToAssetDeleteV1:
return _syncStreamRepository.deleteAlbumToAssetsV1(data.cast());
// No-op. SyncAckV1 entities are checkpoints in the sync stream
// to acknowledge that the client has processed all the backfill events
case SyncEntityType.syncAckV1:
return;
default:
_logger.warning("Unknown sync data type: $type");
}
}
}