diff --git a/mobile/lib/pages/backup/drift_backup.page.dart b/mobile/lib/pages/backup/drift_backup.page.dart index 2e7c3e946c..c24bc1774f 100644 --- a/mobile/lib/pages/backup/drift_backup.page.dart +++ b/mobile/lib/pages/backup/drift_backup.page.dart @@ -97,7 +97,8 @@ class _DriftBackupPageState extends ConsumerState { } Future stopBackup() async { - await backupNotifier.cancel(); + // await backupNotifier.cancel(); + await backupNotifier.stopBackup(); } return Scaffold( diff --git a/mobile/lib/providers/backup/drift_backup.provider.dart b/mobile/lib/providers/backup/drift_backup.provider.dart index f52fc654f2..76dd96ec25 100644 --- a/mobile/lib/providers/backup/drift_backup.provider.dart +++ b/mobile/lib/providers/backup/drift_backup.provider.dart @@ -1,9 +1,11 @@ -// ignore_for_file: public_member_api_docs, sort_constructors_first import 'dart:async'; import 'package:background_downloader/background_downloader.dart'; +import 'package:cancellation_token_http/http.dart'; import 'package:collection/collection.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:logging/logging.dart'; + import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/domain/models/album/local_album.model.dart'; import 'package:immich_mobile/domain/models/asset/base_asset.model.dart'; @@ -13,7 +15,6 @@ import 'package:immich_mobile/providers/infrastructure/asset.provider.dart'; import 'package:immich_mobile/providers/user.provider.dart'; import 'package:immich_mobile/services/upload.service.dart'; import 'package:immich_mobile/utils/debug_print.dart'; -import 'package:logging/logging.dart'; class EnqueueStatus { final int enqueueCount; @@ -114,6 +115,7 @@ class DriftBackupState { final BackupError error; final Map uploadItems; + final CancellationToken? cancelToken; const DriftBackupState({ required this.totalCount, @@ -122,10 +124,11 @@ class DriftBackupState { required this.processingCount, required this.enqueueCount, required this.enqueueTotalCount, - required this.isCanceling, required this.isSyncing, - required this.uploadItems, + required this.isCanceling, this.error = BackupError.none, + required this.uploadItems, + this.cancelToken, }); DriftBackupState copyWith({ @@ -135,10 +138,11 @@ class DriftBackupState { int? processingCount, int? enqueueCount, int? enqueueTotalCount, - bool? isCanceling, bool? isSyncing, - Map? uploadItems, + bool? isCanceling, BackupError? error, + Map? uploadItems, + CancellationToken? cancelToken, }) { return DriftBackupState( totalCount: totalCount ?? this.totalCount, @@ -147,16 +151,17 @@ class DriftBackupState { processingCount: processingCount ?? this.processingCount, enqueueCount: enqueueCount ?? this.enqueueCount, enqueueTotalCount: enqueueTotalCount ?? this.enqueueTotalCount, - isCanceling: isCanceling ?? this.isCanceling, isSyncing: isSyncing ?? this.isSyncing, - uploadItems: uploadItems ?? this.uploadItems, + isCanceling: isCanceling ?? this.isCanceling, error: error ?? this.error, + uploadItems: uploadItems ?? this.uploadItems, + cancelToken: cancelToken ?? this.cancelToken, ); } @override String toString() { - return 'DriftBackupState(totalCount: $totalCount, backupCount: $backupCount, remainderCount: $remainderCount, processingCount: $processingCount, enqueueCount: $enqueueCount, enqueueTotalCount: $enqueueTotalCount, isCanceling: $isCanceling, isSyncing: $isSyncing, uploadItems: $uploadItems, error: $error)'; + return 'DriftBackupState(totalCount: $totalCount, backupCount: $backupCount, remainderCount: $remainderCount, processingCount: $processingCount, enqueueCount: $enqueueCount, enqueueTotalCount: $enqueueTotalCount, isSyncing: $isSyncing, isCanceling: $isCanceling, error: $error, uploadItems: $uploadItems, cancelToken: $cancelToken)'; } @override @@ -170,10 +175,11 @@ class DriftBackupState { other.processingCount == processingCount && other.enqueueCount == enqueueCount && other.enqueueTotalCount == enqueueTotalCount && - other.isCanceling == isCanceling && other.isSyncing == isSyncing && + other.isCanceling == isCanceling && + other.error == error && mapEquals(other.uploadItems, uploadItems) && - other.error == error; + other.cancelToken == cancelToken; } @override @@ -184,10 +190,11 @@ class DriftBackupState { processingCount.hashCode ^ enqueueCount.hashCode ^ enqueueTotalCount.hashCode ^ - isCanceling.hashCode ^ isSyncing.hashCode ^ + isCanceling.hashCode ^ + error.hashCode ^ uploadItems.hashCode ^ - error.hashCode; + cancelToken.hashCode; } } @@ -352,13 +359,68 @@ class DriftBackupNotifier extends StateNotifier { Future startBackup(String userId) { state = state.copyWith(error: BackupError.none); - return _uploadService.startBackup(userId, _updateEnqueueCount); + // return _uploadService.startBackup(userId, _updateEnqueueCount); + + final cancelToken = CancellationToken(); + state = state.copyWith(cancelToken: cancelToken); + + return _uploadService.startForegroundUpload( + userId, + cancelToken, + _handleForegroundBackupProgress, + _handleForegroundBackupSuccess, + _handleForegroundBackupError, + ); } - void _updateEnqueueCount(EnqueueStatus status) { - state = state.copyWith(enqueueCount: status.enqueueCount, enqueueTotalCount: status.totalCount); + Future stopBackup() async { + state.cancelToken?.cancel(); + state = state.copyWith(cancelToken: null, uploadItems: {}); } + void _handleForegroundBackupProgress(String localAssetId, int bytes, int totalBytes) { + final progress = totalBytes > 0 ? bytes / totalBytes : 0.0; + final currentItem = state.uploadItems[localAssetId]; + if (currentItem != null) { + state = state.copyWith( + uploadItems: { + ...state.uploadItems, + localAssetId: currentItem.copyWith(progress: progress, fileSize: totalBytes), + }, + ); + } else { + state = state.copyWith( + uploadItems: { + ...state.uploadItems, + localAssetId: DriftUploadStatus( + taskId: localAssetId, + filename: localAssetId, + progress: progress, + fileSize: totalBytes, + networkSpeedAsString: '', + ), + }, + ); + } + } + + void _handleForegroundBackupSuccess(String localAssetId, String remoteAssetId) { + state = state.copyWith(backupCount: state.backupCount + 1, remainderCount: state.remainderCount - 1); + + Future.delayed(const Duration(milliseconds: 1000), () { + _removeUploadItem(localAssetId); + }); + } + + void _handleForegroundBackupError(String errorMessage) { + _logger.severe("Upload failed: $errorMessage"); + // Here you can update the state to reflect the error if needed + } + + // void _updateEnqueueCount(EnqueueStatus status) { + // state = state.copyWith(enqueueCount: status.enqueueCount, enqueueTotalCount: status.totalCount); + // } + Future cancel() async { dPrint(() => "Canceling backup tasks..."); state = state.copyWith(enqueueCount: 0, enqueueTotalCount: 0, isCanceling: true, error: BackupError.none); diff --git a/mobile/lib/repositories/upload.repository.dart b/mobile/lib/repositories/upload.repository.dart index 38f2c22cf2..c693adb592 100644 --- a/mobile/lib/repositories/upload.repository.dart +++ b/mobile/lib/repositories/upload.repository.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:io'; @@ -140,4 +141,155 @@ class UploadRepository { } } } + + /// Upload a single asset with progress tracking + Future uploadSingleAsset({ + required File file, + required String originalFileName, + required Map headers, + required Map fields, + required Client httpClient, + required CancellationToken cancelToken, + required void Function(int bytes, int totalBytes) onProgress, + }) async { + return _uploadFile( + file: file, + originalFileName: originalFileName, + headers: headers, + fields: fields, + httpClient: httpClient, + cancelToken: cancelToken, + onProgress: onProgress, + logContext: 'asset', + ); + } + + /// Upload live photo video part and return the video asset ID + Future uploadLivePhotoVideo({ + required File livePhotoFile, + required String originalFileName, + required Map headers, + required Map fields, + required Client httpClient, + required CancellationToken cancelToken, + required void Function(int bytes, int totalBytes) onProgress, + }) async { + final result = await _uploadFile( + file: livePhotoFile, + originalFileName: originalFileName, + headers: headers, + fields: fields, + httpClient: httpClient, + cancelToken: cancelToken, + onProgress: onProgress, + logContext: 'livePhoto video', + ); + + if (result.isSuccess && result.remoteAssetId != null) { + return result.remoteAssetId; + } + + return null; + } + + /// Internal method to upload a file to the server + Future _uploadFile({ + required File file, + required String originalFileName, + required Map headers, + required Map fields, + required Client httpClient, + required CancellationToken cancelToken, + required void Function(int bytes, int totalBytes) onProgress, + required String logContext, + }) async { + final String savedEndpoint = Store.get(StoreKey.serverEndpoint); + final Logger logger = Logger('UploadRepository'); + + try { + final fileStream = file.openRead(); + final assetRawUploadData = MultipartFile("assetData", fileStream, file.lengthSync(), filename: originalFileName); + + final baseRequest = CustomMultipartRequest('POST', Uri.parse('$savedEndpoint/assets'), onProgress: onProgress); + + baseRequest.headers.addAll(headers); + baseRequest.fields.addAll(fields); + baseRequest.files.add(assetRawUploadData); + + final response = await httpClient.send(baseRequest, cancellationToken: cancelToken); + final responseBody = jsonDecode(await response.stream.bytesToString()); + + if (![200, 201].contains(response.statusCode)) { + final error = responseBody; + final errorMessage = error['message'] ?? error['error']; + + logger.warning( + "Error(${error['statusCode']}) uploading $logContext | $originalFileName | ${error['error']}", + ); + + return UploadResult.error(statusCode: response.statusCode, errorMessage: errorMessage); + } + + return UploadResult.success(remoteAssetId: responseBody['id'] as String); + } on CancelledException { + logger.warning("Upload $logContext was cancelled"); + return UploadResult.cancelled(); + } catch (error, stackTrace) { + logger.warning("Error uploading $logContext: ${error.toString()}: $stackTrace"); + return UploadResult.error(errorMessage: error.toString()); + } + } +} + +/// Result of an upload operation +class UploadResult { + final bool isSuccess; + final bool isCancelled; + final String? remoteAssetId; + final String? errorMessage; + final int? statusCode; + + const UploadResult({ + required this.isSuccess, + required this.isCancelled, + this.remoteAssetId, + this.errorMessage, + this.statusCode, + }); + + factory UploadResult.success({required String remoteAssetId}) { + return UploadResult(isSuccess: true, isCancelled: false, remoteAssetId: remoteAssetId); + } + + factory UploadResult.error({String? errorMessage, int? statusCode}) { + return UploadResult(isSuccess: false, isCancelled: false, errorMessage: errorMessage, statusCode: statusCode); + } + + factory UploadResult.cancelled() { + return const UploadResult(isSuccess: false, isCancelled: true); + } +} + +/// Custom MultipartRequest with progress tracking +class CustomMultipartRequest extends MultipartRequest { + CustomMultipartRequest(super.method, super.url, {required this.onProgress}); + + final void Function(int bytes, int totalBytes) onProgress; + + @override + ByteStream finalize() { + final byteStream = super.finalize(); + final total = contentLength; + var bytes = 0; + + final t = StreamTransformer.fromHandlers( + handleData: (List data, EventSink> sink) { + bytes += data.length; + onProgress.call(bytes, total); + sink.add(data); + }, + ); + final stream = byteStream.transform(t); + return ByteStream(stream); + } } diff --git a/mobile/lib/services/upload.service.dart b/mobile/lib/services/upload.service.dart index e8e98562f7..de502dc018 100644 --- a/mobile/lib/services/upload.service.dart +++ b/mobile/lib/services/upload.service.dart @@ -188,6 +188,170 @@ class UploadService { } } + Future startForegroundUpload( + String userId, + CancellationToken cancelToken, + void Function(String localAssetId, int bytes, int totalBytes) onProgress, + void Function(String localAssetId, String remoteAssetId) onSuccess, + void Function(String errorMessage) onError, + ) async { + const concurrentUploads = 3; + final httpClients = List.generate(concurrentUploads, (_) => Client()); + + await _storageRepository.clearCache(); + + shouldAbortQueuingTasks = false; + + final candidates = await _backupRepository.getCandidates(userId); + if (candidates.isEmpty) { + return; + } + + try { + int clientIndex = 0; + + for (int i = 0; i < candidates.length; i += concurrentUploads) { + if (shouldAbortQueuingTasks || cancelToken.isCancelled) { + break; + } + + final batch = candidates.skip(i).take(concurrentUploads).toList(); + final uploadFutures = >[]; + + for (final asset in batch) { + final httpClient = httpClients[clientIndex % concurrentUploads]; + clientIndex++; + + uploadFutures.add( + _uploadSingleAsset( + asset, + httpClient, + cancelToken, + (bytes, totalBytes) => onProgress(asset.localId!, bytes, totalBytes), + onSuccess, + onError, + ), + ); + } + + await Future.wait(uploadFutures); + + if (shouldAbortQueuingTasks) { + break; + } + } + } finally { + for (final client in httpClients) { + client.close(); + } + } + } + + Future _uploadSingleAsset( + LocalAsset asset, + Client httpClient, + CancellationToken cancelToken, + void Function(int bytes, int totalBytes) onProgress, + void Function(String localAssetId, String remoteAssetId) onSuccess, + void Function(String errorMessage) onError, + ) async { + File? file; + File? livePhotoFile; + + try { + final entity = await _storageRepository.getAssetEntityForAsset(asset); + if (entity == null) { + return; + } + + file = await _storageRepository.getFileForAsset(asset.id); + if (file == null) { + return; + } + + // For live photos, get the motion video file + if (entity.isLivePhoto) { + livePhotoFile = await _storageRepository.getMotionFileForAsset(asset); + if (livePhotoFile == null) { + _logger.warning("Failed to obtain motion part of the livePhoto - ${asset.name}"); + } + } + + final originalFileName = entity.isLivePhoto ? p.setExtension(asset.name, p.extension(file.path)) : asset.name; + final deviceId = Store.get(StoreKey.deviceId); + + final headers = ApiService.getRequestHeaders(); + final fields = { + 'deviceAssetId': asset.localId!, + 'deviceId': deviceId, + 'fileCreatedAt': asset.createdAt.toUtc().toIso8601String(), + 'fileModifiedAt': asset.updatedAt.toUtc().toIso8601String(), + 'isFavorite': asset.isFavorite.toString(), + 'duration': asset.duration.toString(), + }; + + // Upload live photo video first if available + String? livePhotoVideoId; + if (entity.isLivePhoto && livePhotoFile != null) { + final livePhotoTitle = p.setExtension(originalFileName, p.extension(livePhotoFile.path)); + livePhotoVideoId = await _uploadRepository.uploadLivePhotoVideo( + livePhotoFile: livePhotoFile, + originalFileName: livePhotoTitle, + headers: headers, + fields: fields, + httpClient: httpClient, + cancelToken: cancelToken, + onProgress: onProgress, + ); + } + + // Add livePhotoVideoId to fields if available + if (livePhotoVideoId != null) { + fields['livePhotoVideoId'] = livePhotoVideoId; + } + + final result = await _uploadRepository.uploadSingleAsset( + file: file, + originalFileName: originalFileName, + headers: headers, + fields: fields, + httpClient: httpClient, + cancelToken: cancelToken, + onProgress: onProgress, + ); + + if (result.isSuccess && result.remoteAssetId != null) { + onSuccess(asset.localId!, result.remoteAssetId!); + } else if (result.isCancelled) { + dPrint(() => "Backup was cancelled by the user"); + shouldAbortQueuingTasks = true; + } else if (result.errorMessage != null) { + dPrint( + () => + "Error(${result.statusCode}) uploading ${asset.localId} | $originalFileName | Created on ${asset.createdAt} | ${result.errorMessage}", + ); + + onError(result.errorMessage!); + + if (result.errorMessage == "Quota has been exceeded!") { + shouldAbortQueuingTasks = true; + } + } + } catch (error, stackTrace) { + dPrint(() => "Error backup asset: ${error.toString()}: $stackTrace"); + onError(error.toString()); + } finally { + if (Platform.isIOS) { + try { + await file?.delete(); + await livePhotoFile?.delete(); + } catch (e) { + dPrint(() => "ERROR deleting file: ${e.toString()}"); + } + } + } + } + /// Cancel all ongoing uploads and reset the upload queue /// /// Return the number of left over tasks in the queue