mirror of
https://github.com/immich-app/immich.git
synced 2026-06-04 13:55:19 -04:00
f1f8142409
wire hash cancellation await cleanup remove forced kill add regression tests abort sync requests fix cleanup ordering in teardown exit isolate test background sync test sigabrt crash cleanup
218 lines
8.2 KiB
Dart
218 lines
8.2 KiB
Dart
import 'dart:async';
|
|
|
|
import 'package:drift/drift.dart';
|
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
|
import 'package:immich_mobile/constants/constants.dart';
|
|
import 'package:immich_mobile/domain/models/asset/asset_metadata.model.dart';
|
|
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
|
import 'package:immich_mobile/extensions/platform_extensions.dart';
|
|
import 'package:immich_mobile/infrastructure/entities/local_asset.entity.dart';
|
|
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
|
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
|
import 'package:immich_mobile/platform/native_sync_api.g.dart';
|
|
import 'package:immich_mobile/providers/api.provider.dart';
|
|
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
|
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
|
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
|
import 'package:immich_mobile/providers/server_info.provider.dart';
|
|
import 'package:immich_mobile/providers/user.provider.dart';
|
|
import 'package:logging/logging.dart';
|
|
// ignore: import_rule_openapi
|
|
import 'package:openapi/api.dart' hide AssetVisibility;
|
|
|
|
Future<void> syncCloudIds(ProviderContainer ref) async {
|
|
if (!CurrentPlatform.isIOS) {
|
|
return;
|
|
}
|
|
final logger = Logger('migrateCloudIds');
|
|
|
|
final db = ref.read(driftProvider);
|
|
// Populate cloud IDs for local assets that don't have one yet
|
|
await _populateCloudIds(db);
|
|
|
|
final serverInfo = await ref.read(serverInfoProvider.notifier).getServerInfo();
|
|
final canUpdateMetadata = serverInfo.serverVersion.isAtLeast(major: 2, minor: 4);
|
|
if (!canUpdateMetadata) {
|
|
logger.fine('Server version does not support asset metadata updates. Skipping cloudId migration.');
|
|
return;
|
|
}
|
|
final canBulkUpdateMetadata = serverInfo.serverVersion.isAtLeast(major: 2, minor: 5);
|
|
|
|
// Wait for remote sync to complete, so we have up-to-date asset metadata entries
|
|
try {
|
|
await ref.read(syncStreamServiceProvider).sync();
|
|
} catch (e, s) {
|
|
logger.fine('Failed to complete remote sync before cloudId migration.', e, s);
|
|
return;
|
|
}
|
|
|
|
// Fetch the mapping for backed up assets that have a cloud ID locally but do not have a cloud ID on the server
|
|
final currentUser = ref.read(currentUserProvider);
|
|
if (currentUser == null) {
|
|
logger.warning('Current user is null. Aborting cloudId migration.');
|
|
return;
|
|
}
|
|
|
|
final assetApi = ref.read(apiServiceProvider).assetsApi;
|
|
final cancellation = ref.read(cancellationProvider);
|
|
|
|
// Process cloud IDs in paginated batches
|
|
await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger, cancellation);
|
|
}
|
|
|
|
Future<void> _processCloudIdMappingsInBatches(
|
|
Drift drift,
|
|
String userId,
|
|
AssetsApi assetsApi,
|
|
bool canBulkUpdate,
|
|
Logger logger,
|
|
Completer<void> cancellation,
|
|
) async {
|
|
const pageSize = 20000;
|
|
String? lastLocalId;
|
|
final seenRemoteAssetIds = <String>{};
|
|
|
|
while (true) {
|
|
if (cancellation.isCompleted) {
|
|
logger.warning('Cloud ID migration cancelled. Stopping batch processing.');
|
|
break;
|
|
}
|
|
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId);
|
|
if (mappings.isEmpty) {
|
|
break;
|
|
}
|
|
|
|
final items = <AssetMetadataBulkUpsertItemDto>[];
|
|
for (final mapping in mappings) {
|
|
if (seenRemoteAssetIds.add(mapping.remoteAssetId)) {
|
|
items.add(
|
|
AssetMetadataBulkUpsertItemDto(
|
|
assetId: mapping.remoteAssetId,
|
|
key: kMobileMetadataKey,
|
|
value: Map<String, Object>.from(
|
|
RemoteAssetMobileAppMetadata(
|
|
cloudId: mapping.localAsset.cloudId,
|
|
createdAt: mapping.localAsset.createdAt.toIso8601String(),
|
|
adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(),
|
|
latitude: mapping.localAsset.latitude?.toString(),
|
|
longitude: mapping.localAsset.longitude?.toString(),
|
|
).toJson(),
|
|
),
|
|
),
|
|
);
|
|
} else {
|
|
logger.fine('Duplicate remote asset ID found: ${mapping.remoteAssetId}. Skipping duplicate entry.');
|
|
}
|
|
}
|
|
|
|
if (items.isNotEmpty) {
|
|
if (canBulkUpdate) {
|
|
await _bulkUpdateCloudIds(assetsApi, items, cancellation.future);
|
|
} else {
|
|
await _sequentialUpdateCloudIds(assetsApi, items, cancellation);
|
|
}
|
|
}
|
|
|
|
lastLocalId = mappings.last.localAsset.id;
|
|
if (mappings.length < pageSize) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
Future<void> _sequentialUpdateCloudIds(
|
|
AssetsApi assetsApi,
|
|
List<AssetMetadataBulkUpsertItemDto> items,
|
|
Completer<void> cancellation,
|
|
) async {
|
|
for (final item in items) {
|
|
if (cancellation.isCompleted) {
|
|
break;
|
|
}
|
|
final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value);
|
|
try {
|
|
await assetsApi.updateAssetMetadata(
|
|
item.assetId,
|
|
AssetMetadataUpsertDto(items: [upsertItem]),
|
|
abortTrigger: cancellation.future,
|
|
);
|
|
} catch (error, stack) {
|
|
Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack);
|
|
}
|
|
}
|
|
}
|
|
|
|
Future<void> _bulkUpdateCloudIds(
|
|
AssetsApi assetsApi,
|
|
List<AssetMetadataBulkUpsertItemDto> items,
|
|
Future<void> abortTrigger,
|
|
) async {
|
|
try {
|
|
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items), abortTrigger: abortTrigger);
|
|
} catch (error, stack) {
|
|
Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack);
|
|
}
|
|
}
|
|
|
|
Future<void> _populateCloudIds(Drift drift) async {
|
|
final query = drift.localAssetEntity.selectOnly()
|
|
..addColumns([drift.localAssetEntity.id])
|
|
..where(drift.localAssetEntity.iCloudId.isNull());
|
|
final ids = await query.map((row) => row.read(drift.localAssetEntity.id)!).get();
|
|
final cloudMapping = <String, String>{};
|
|
final cloudIds = await NativeSyncApi().getCloudIdForAssetIds(ids);
|
|
for (int i = 0; i < cloudIds.length; i++) {
|
|
final cloudIdResult = cloudIds[i];
|
|
if (cloudIdResult.cloudId != null) {
|
|
cloudMapping[cloudIdResult.assetId] = cloudIdResult.cloudId!;
|
|
} else {
|
|
Logger('migrateCloudIds').fine(
|
|
"Cannot fetch cloudId for asset with id: ${cloudIdResult.assetId}. Error: ${cloudIdResult.error ?? "unknown"}",
|
|
);
|
|
}
|
|
}
|
|
await DriftLocalAlbumRepository(drift).updateCloudMapping(cloudMapping);
|
|
}
|
|
|
|
typedef _CloudIdMapping = ({String remoteAssetId, LocalAsset localAsset});
|
|
|
|
Future<List<_CloudIdMapping>> _fetchCloudIdMappings(Drift drift, String userId, int limit, String? lastLocalId) async {
|
|
final query =
|
|
drift.localAssetEntity.select().join([
|
|
innerJoin(
|
|
drift.remoteAssetEntity,
|
|
drift.localAssetEntity.checksum.equalsExp(drift.remoteAssetEntity.checksum),
|
|
),
|
|
leftOuterJoin(
|
|
drift.remoteAssetCloudIdEntity,
|
|
drift.remoteAssetEntity.id.equalsExp(drift.remoteAssetCloudIdEntity.assetId),
|
|
useColumns: false,
|
|
),
|
|
])
|
|
..where(
|
|
// Only select assets that have a local cloud ID but either no remote cloud ID or a mismatched eTag
|
|
drift.localAssetEntity.iCloudId.isNotNull() &
|
|
drift.remoteAssetEntity.ownerId.equals(userId) &
|
|
// Skip locked assets as we cannot update them without unlocking first
|
|
drift.remoteAssetEntity.visibility.isNotValue(AssetVisibility.locked.index) &
|
|
(drift.remoteAssetCloudIdEntity.cloudId.isNull() |
|
|
drift.remoteAssetCloudIdEntity.adjustmentTime.isNotExp(drift.localAssetEntity.adjustmentTime) |
|
|
drift.remoteAssetCloudIdEntity.latitude.isNotExp(drift.localAssetEntity.latitude) |
|
|
drift.remoteAssetCloudIdEntity.longitude.isNotExp(drift.localAssetEntity.longitude) |
|
|
drift.remoteAssetCloudIdEntity.createdAt.isNotExp(drift.localAssetEntity.createdAt)),
|
|
)
|
|
..orderBy([OrderingTerm.asc(drift.localAssetEntity.id)])
|
|
..limit(limit);
|
|
|
|
if (lastLocalId != null) {
|
|
query.where(drift.localAssetEntity.id.isBiggerThanValue(lastLocalId));
|
|
}
|
|
|
|
return query.map((row) {
|
|
return (
|
|
remoteAssetId: row.read(drift.remoteAssetEntity.id)!,
|
|
localAsset: row.readTable(drift.localAssetEntity).toDto(),
|
|
);
|
|
}).get();
|
|
}
|