From 03a13828e1f4634c040b330649ba8d9100fb6c61 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 25 Jul 2025 10:09:32 -0500 Subject: [PATCH] chore: refactor upload service (#20130) * chore: refactor upload service * fix: cancel upload queue on logout (#20131) * fix: cancel upload on logout * fix: test --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex --------- Co-authored-by: shenlong <139912620+shenlong-tanwen@users.noreply.github.com> Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> --- mobile/lib/main.dart | 2 +- .../lib/pages/backup/drift_backup.page.dart | 2 +- .../drift_backup_album_selection.page.dart | 2 +- .../share_intent_upload.provider.dart | 4 +- mobile/lib/providers/auth.provider.dart | 5 + .../backup/drift_backup.provider.dart | 38 +- .../infrastructure/action.provider.dart | 8 +- .../lib/repositories/upload.repository.dart | 49 ++- mobile/lib/services/auth.service.dart | 11 +- mobile/lib/services/drift_backup.service.dart | 316 --------------- mobile/lib/services/upload.service.dart | 368 +++++++++++++++--- mobile/test/domain/service.mock.dart | 6 + mobile/test/services/auth.service_test.dart | 22 +- 13 files changed, 438 insertions(+), 395 deletions(-) delete mode 100644 mobile/lib/services/drift_backup.service.dart diff --git a/mobile/lib/main.dart b/mobile/lib/main.dart index 5f0c5d42db..e52f298413 100644 --- a/mobile/lib/main.dart +++ b/mobile/lib/main.dart @@ -97,7 +97,7 @@ Future initApp() async { await FileDownloader().configure( // maxConcurrent: 6, maxConcurrentByHost(server):6, maxConcurrentByGroup: 3 - globalConfig: (Config.holdingQueue, (6, 6, 3)), + globalConfig: (Config.holdingQueue, (1000, 1000, 1000)), ); await FileDownloader().trackTasksInGroup( diff --git a/mobile/lib/pages/backup/drift_backup.page.dart b/mobile/lib/pages/backup/drift_backup.page.dart index 0ebb68883f..f691eb576b 100644 --- a/mobile/lib/pages/backup/drift_backup.page.dart +++ b/mobile/lib/pages/backup/drift_backup.page.dart @@ -40,7 +40,7 @@ class _DriftBackupPageState extends ConsumerState { } await ref.read(driftBackupProvider.notifier).getBackupStatus(currentUser.id); - await ref.read(driftBackupProvider.notifier).backup(currentUser.id); + await ref.read(driftBackupProvider.notifier).startBackup(currentUser.id); } Future stopBackup() async { diff --git a/mobile/lib/pages/backup/drift_backup_album_selection.page.dart b/mobile/lib/pages/backup/drift_backup_album_selection.page.dart index 8182798636..047bd26a9e 100644 --- a/mobile/lib/pages/backup/drift_backup_album_selection.page.dart +++ b/mobile/lib/pages/backup/drift_backup_album_selection.page.dart @@ -101,7 +101,7 @@ class _DriftBackupAlbumSelectionPageState extends ConsumerState((ref) { ref.watch(authServiceProvider), ref.watch(apiServiceProvider), ref.watch(userServiceProvider), + ref.watch(uploadServiceProvider), ref.watch(secureStorageServiceProvider), ref.watch(widgetServiceProvider), ); @@ -32,6 +34,7 @@ class AuthNotifier extends StateNotifier { final AuthService _authService; final ApiService _apiService; final UserService _userService; + final UploadService _uploadService; final SecureStorageService _secureStorageService; final WidgetService _widgetService; final _log = Logger("AuthenticationNotifier"); @@ -42,6 +45,7 @@ class AuthNotifier extends StateNotifier { this._authService, this._apiService, this._userService, + this._uploadService, this._secureStorageService, this._widgetService, ) : super( @@ -83,6 +87,7 @@ class AuthNotifier extends StateNotifier { await _widgetService.clearCredentials(); await _authService.logout(); + await _uploadService.cancelBackup(); } finally { await _cleanUp(); } diff --git a/mobile/lib/providers/backup/drift_backup.provider.dart b/mobile/lib/providers/backup/drift_backup.provider.dart index 42a4ae95f3..d352869651 100644 --- a/mobile/lib/providers/backup/drift_backup.provider.dart +++ b/mobile/lib/providers/backup/drift_backup.provider.dart @@ -8,7 +8,6 @@ import 'package:flutter/widgets.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; -import 'package:immich_mobile/services/drift_backup.service.dart'; import 'package:immich_mobile/services/upload.service.dart'; class EnqueueStatus { @@ -199,14 +198,12 @@ class DriftBackupState { final driftBackupProvider = StateNotifierProvider((ref) { return ExpBackupNotifier( - ref.watch(driftBackupServiceProvider), ref.watch(uploadServiceProvider), ); }); class ExpBackupNotifier extends StateNotifier { ExpBackupNotifier( - this._backupService, this._uploadService, ) : super( const DriftBackupState( @@ -225,7 +222,6 @@ class ExpBackupNotifier extends StateNotifier { } } - final DriftBackupService _backupService; final UploadService _uploadService; StreamSubscription? _statusSubscription; StreamSubscription? _progressSubscription; @@ -328,9 +324,9 @@ class ExpBackupNotifier extends StateNotifier { Future getBackupStatus(String userId) async { final [totalCount, backupCount, remainderCount] = await Future.wait([ - _backupService.getTotalCount(), - _backupService.getBackupCount(userId), - _backupService.getRemainderCount(userId), + _uploadService.getBackupTotalCount(), + _uploadService.getBackupFinishedCount(userId), + _uploadService.getBackupRemainderCount(userId), ]); state = state.copyWith( @@ -340,8 +336,8 @@ class ExpBackupNotifier extends StateNotifier { ); } - Future backup(String userId) { - return _backupService.backup(userId, _updateEnqueueCount); + Future startBackup(String userId) { + return _uploadService.startBackup(userId, _updateEnqueueCount); } void _updateEnqueueCount(EnqueueStatus status) { @@ -352,22 +348,22 @@ class ExpBackupNotifier extends StateNotifier { } Future cancel() async { + debugPrint("Canceling backup tasks..."); state = state.copyWith( enqueueCount: 0, enqueueTotalCount: 0, isCanceling: true, ); - await _backupService.cancel(); + final activeTaskCount = await _uploadService.cancelBackup(); - // Check if there are any tasks left in the queue - final tasks = await FileDownloader().allTasks(group: kBackupGroup); - - debugPrint("Tasks left to cancel: ${tasks.length}"); - - if (tasks.isNotEmpty) { + if (activeTaskCount > 0) { + debugPrint( + "$activeTaskCount tasks left, continuing to cancel...", + ); await cancel(); } else { + debugPrint("All tasks canceled successfully."); // Clear all upload items when cancellation is complete state = state.copyWith( isCanceling: false, @@ -377,14 +373,18 @@ class ExpBackupNotifier extends StateNotifier { } Future handleBackupResume(String userId) async { - final tasks = await FileDownloader().allTasks(group: kBackupGroup); + debugPrint("handleBackupResume"); + final tasks = await _uploadService.getActiveTasks(kBackupGroup); + debugPrint("Found ${tasks.length} tasks"); + if (tasks.isEmpty) { // Start a new backup queue - await backup(userId); + debugPrint("Start a new backup queue"); + await startBackup(userId); } debugPrint("Tasks to resume: ${tasks.length}"); - await FileDownloader().start(); + await _uploadService.resumeBackup(); } @override diff --git a/mobile/lib/providers/infrastructure/action.provider.dart b/mobile/lib/providers/infrastructure/action.provider.dart index 96c0c5eb18..899bd7858b 100644 --- a/mobile/lib/providers/infrastructure/action.provider.dart +++ b/mobile/lib/providers/infrastructure/action.provider.dart @@ -5,8 +5,8 @@ import 'package:immich_mobile/providers/infrastructure/asset_viewer/current_asse import 'package:immich_mobile/providers/timeline/multiselect.provider.dart'; import 'package:immich_mobile/providers/user.provider.dart'; import 'package:immich_mobile/services/action.service.dart'; -import 'package:immich_mobile/services/drift_backup.service.dart'; import 'package:immich_mobile/services/timeline.service.dart'; +import 'package:immich_mobile/services/upload.service.dart'; import 'package:logging/logging.dart'; import 'package:riverpod_annotation/riverpod_annotation.dart'; @@ -32,14 +32,14 @@ class ActionResult { class ActionNotifier extends Notifier { final Logger _logger = Logger('ActionNotifier'); late ActionService _service; - late DriftBackupService _backupService; + late UploadService _uploadService; ActionNotifier() : super(); @override void build() { + _uploadService = ref.watch(uploadServiceProvider); _service = ref.watch(actionServiceProvider); - _backupService = ref.watch(driftBackupServiceProvider); } List _getRemoteIdsForSource(ActionSource source) { @@ -366,7 +366,7 @@ class ActionNotifier extends Notifier { Future upload(ActionSource source) async { final assets = _getAssets(source).whereType().toList(); try { - await _backupService.manualBackup(assets); + await _uploadService.manualBackup(assets); return ActionResult(count: assets.length, success: true); } catch (error, stack) { _logger.severe('Failed manually upload assets', error, stack); diff --git a/mobile/lib/repositories/upload.repository.dart b/mobile/lib/repositories/upload.repository.dart index 6d75313a24..ec093c5297 100644 --- a/mobile/lib/repositories/upload.repository.dart +++ b/mobile/lib/repositories/upload.repository.dart @@ -1,4 +1,5 @@ import 'package:background_downloader/background_downloader.dart'; +import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; @@ -6,7 +7,6 @@ final uploadRepositoryProvider = Provider((ref) => UploadRepository()); class UploadRepository { void Function(TaskStatusUpdate)? onUploadStatus; - void Function(TaskProgressUpdate)? onTaskProgress; UploadRepository() { @@ -27,11 +27,11 @@ class UploadRepository { ); } - void enqueueAll(List tasks) { + void enqueueBackgroundAll(List tasks) { FileDownloader().enqueueAll(tasks); } - Future deleteAllTrackingRecords(String group) { + Future deleteDatabaseRecords(String group) { return FileDownloader().database.deleteAllRecords(group: group); } @@ -42,4 +42,47 @@ class UploadRepository { Future reset(String group) { return FileDownloader().reset(group: group); } + + /// Get a list of tasks that are ENQUEUED or RUNNING + Future> getActiveTasks(String group) { + return FileDownloader().allTasks(group: group); + } + + Future start() { + return FileDownloader().start(); + } + + Future getUploadInfo() async { + final [enqueuedTasks, runningTasks, canceledTasks, waitingTasks, pausedTasks] = await Future.wait([ + FileDownloader().database.allRecordsWithStatus( + TaskStatus.enqueued, + group: kBackupGroup, + ), + FileDownloader().database.allRecordsWithStatus( + TaskStatus.running, + group: kBackupGroup, + ), + FileDownloader().database.allRecordsWithStatus( + TaskStatus.canceled, + group: kBackupGroup, + ), + FileDownloader().database.allRecordsWithStatus( + TaskStatus.waitingToRetry, + group: kBackupGroup, + ), + FileDownloader().database.allRecordsWithStatus( + TaskStatus.paused, + group: kBackupGroup, + ), + ]); + + debugPrint(""" + Upload Info: + Enqueued: ${enqueuedTasks.length} + Running: ${runningTasks.length} + Canceled: ${canceledTasks.length} + Waiting: ${waitingTasks.length} + Paused: ${pausedTasks.length} + """); + } } diff --git a/mobile/lib/services/auth.service.dart b/mobile/lib/services/auth.service.dart index 0eec253ee1..e8c4c5e97e 100644 --- a/mobile/lib/services/auth.service.dart +++ b/mobile/lib/services/auth.service.dart @@ -8,10 +8,12 @@ import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; import 'package:immich_mobile/models/auth/login_response.model.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/app_settings.provider.dart'; import 'package:immich_mobile/providers/background_sync.provider.dart'; import 'package:immich_mobile/repositories/auth.repository.dart'; import 'package:immich_mobile/repositories/auth_api.repository.dart'; import 'package:immich_mobile/services/api.service.dart'; +import 'package:immich_mobile/services/app_settings.service.dart'; import 'package:immich_mobile/services/network.service.dart'; import 'package:logging/logging.dart'; import 'package:openapi/api.dart'; @@ -23,6 +25,7 @@ final authServiceProvider = Provider( ref.watch(apiServiceProvider), ref.watch(networkServiceProvider), ref.watch(backgroundSyncProvider), + ref.watch(appSettingsServiceProvider), ), ); @@ -32,7 +35,7 @@ class AuthService { final ApiService _apiService; final NetworkService _networkService; final BackgroundSyncManager _backgroundSyncManager; - + final AppSettingsService _appSettingsService; final _log = Logger("AuthService"); AuthService( @@ -41,6 +44,7 @@ class AuthService { this._apiService, this._networkService, this._backgroundSyncManager, + this._appSettingsService, ); /// Validates the provided server URL by resolving and setting the endpoint. @@ -106,6 +110,11 @@ class AuthService { await clearLocalData().catchError((error, stackTrace) { _log.severe("Error clearing local data", error, stackTrace); }); + + await _appSettingsService.setSetting( + AppSettingsEnum.enableBackup, + false, + ); } } diff --git a/mobile/lib/services/drift_backup.service.dart b/mobile/lib/services/drift_backup.service.dart deleted file mode 100644 index 5aba1794f6..0000000000 --- a/mobile/lib/services/drift_backup.service.dart +++ /dev/null @@ -1,316 +0,0 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:background_downloader/background_downloader.dart'; -import 'package:flutter/material.dart'; -import 'package:immich_mobile/constants/constants.dart'; -import 'package:immich_mobile/domain/models/asset/base_asset.model.dart'; -import 'package:immich_mobile/infrastructure/repositories/backup.repository.dart'; -import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart'; -import 'package:immich_mobile/infrastructure/repositories/storage.repository.dart'; -import 'package:immich_mobile/providers/backup/drift_backup.provider.dart'; -import 'package:immich_mobile/providers/infrastructure/asset.provider.dart'; -import 'package:immich_mobile/providers/infrastructure/storage.provider.dart'; -import 'package:immich_mobile/services/upload.service.dart'; -import 'package:logging/logging.dart'; -import 'package:path/path.dart' as p; -import 'package:riverpod_annotation/riverpod_annotation.dart'; - -final driftBackupServiceProvider = Provider( - (ref) => DriftBackupService( - ref.watch(backupRepositoryProvider), - ref.watch(storageRepositoryProvider), - ref.watch(uploadServiceProvider), - ref.watch(localAssetRepository), - ), -); - -// TODO: Rename to UploadService after removing Isar -class DriftBackupService { - DriftBackupService( - this._backupRepository, - this._storageRepository, - this._uploadService, - this._localAssetRepository, - ) { - _uploadService.taskStatusStream.listen(_handleTaskStatusUpdate); - } - - final DriftBackupRepository _backupRepository; - final StorageRepository _storageRepository; - final DriftLocalAssetRepository _localAssetRepository; - final UploadService _uploadService; - final _log = Logger("DriftBackupService"); - - bool shouldCancel = false; - - Future getTotalCount() { - return _backupRepository.getTotalCount(); - } - - Future getRemainderCount(String userId) { - return _backupRepository.getRemainderCount(userId); - } - - Future getBackupCount(String userId) { - return _backupRepository.getBackupCount(userId); - } - - Future manualBackup(List localAssets) async { - List tasks = []; - for (final asset in localAssets) { - final task = await _getUploadTask( - asset, - group: kManualUploadGroup, - priority: 1, // High priority after upload motion photo part - ); - if (task != null) { - tasks.add(task); - } - } - - if (tasks.isNotEmpty) { - _uploadService.enqueueTasks(tasks); - } - } - - Future backup( - String userId, - void Function(EnqueueStatus status) onEnqueueTasks, - ) async { - shouldCancel = false; - - final candidates = await _backupRepository.getCandidates(userId); - if (candidates.isEmpty) { - return; - } - - const batchSize = 100; - int count = 0; - for (int i = 0; i < candidates.length; i += batchSize) { - if (shouldCancel) { - break; - } - - final batch = candidates.skip(i).take(batchSize).toList(); - - List tasks = []; - for (final asset in batch) { - final task = await _getUploadTask(asset); - if (task != null) { - tasks.add(task); - } - } - - if (tasks.isNotEmpty && !shouldCancel) { - count += tasks.length; - _uploadService.enqueueTasks(tasks); - - onEnqueueTasks( - EnqueueStatus( - enqueueCount: count, - totalCount: candidates.length, - ), - ); - } - } - } - - void _handleTaskStatusUpdate(TaskStatusUpdate update) { - switch (update.status) { - case TaskStatus.complete: - _handleLivePhoto(update); - break; - - default: - break; - } - } - - Future _handleLivePhoto(TaskStatusUpdate update) async { - try { - if (update.task.metaData.isEmpty || update.task.metaData == '') { - return; - } - - final metadata = UploadTaskMetadata.fromJson(update.task.metaData); - if (!metadata.isLivePhotos) { - return; - } - - if (update.responseBody == null || update.responseBody!.isEmpty) { - return; - } - final response = jsonDecode(update.responseBody!); - - final localAsset = await _localAssetRepository.getById(metadata.localAssetId); - if (localAsset == null) { - return; - } - - final uploadTask = await _getLivePhotoUploadTask( - localAsset, - response['id'] as String, - ); - - if (uploadTask == null) { - return; - } - - _uploadService.enqueueTasks([uploadTask]); - } catch (error, stackTrace) { - _log.severe("Error handling live photo upload task", error, stackTrace); - debugPrint("Error handling live photo upload task: $error $stackTrace"); - } - } - - Future _getUploadTask( - LocalAsset asset, { - String group = kBackupGroup, - int? priority, - }) async { - final entity = await _storageRepository.getAssetEntityForAsset(asset); - if (entity == null) { - return null; - } - - File? file; - - /// iOS LivePhoto has two files: a photo and a video. - /// They are uploaded separately, with video file being upload first, then returned with the assetId - /// The assetId is then used as a metadata for the photo file upload task. - /// - /// We implement two separate upload groups for this, the normal one for the video file - /// and the higher priority group for the photo file because the video file is already uploaded. - /// - /// The cancel operation will only cancel the video group (normal group), the photo group will not - /// be touched, as the video file is already uploaded. - - if (entity.isLivePhoto) { - file = await _storageRepository.getMotionFileForAsset(asset); - } else { - file = await _storageRepository.getFileForAsset(asset.id); - } - - if (file == null) { - return null; - } - - final originalFileName = entity.isLivePhoto - ? p.setExtension( - asset.name, - p.extension(file.path), - ) - : asset.name; - - String metadata = UploadTaskMetadata( - localAssetId: asset.id, - isLivePhotos: entity.isLivePhoto, - livePhotoVideoId: '', - ).toJson(); - - return _uploadService.buildUploadTask( - file, - originalFileName: originalFileName, - deviceAssetId: asset.id, - metadata: metadata, - group: group, - priority: priority, - ); - } - - Future _getLivePhotoUploadTask( - LocalAsset asset, - String livePhotoVideoId, - ) async { - final entity = await _storageRepository.getAssetEntityForAsset(asset); - if (entity == null) { - return null; - } - - final file = await _storageRepository.getFileForAsset(asset.id); - if (file == null) { - return null; - } - - final fields = { - 'livePhotoVideoId': livePhotoVideoId, - }; - - return _uploadService.buildUploadTask( - file, - originalFileName: asset.name, - deviceAssetId: asset.id, - fields: fields, - group: kBackupLivePhotoGroup, - priority: 0, // Highest priority to get upload immediately - ); - } - - Future cancel() async { - shouldCancel = true; - await _uploadService.cancelAllForGroup(kBackupGroup); - } -} - -class UploadTaskMetadata { - final String localAssetId; - final bool isLivePhotos; - final String livePhotoVideoId; - - const UploadTaskMetadata({ - required this.localAssetId, - required this.isLivePhotos, - required this.livePhotoVideoId, - }); - - UploadTaskMetadata copyWith({ - String? localAssetId, - bool? isLivePhotos, - String? livePhotoVideoId, - }) { - return UploadTaskMetadata( - localAssetId: localAssetId ?? this.localAssetId, - isLivePhotos: isLivePhotos ?? this.isLivePhotos, - livePhotoVideoId: livePhotoVideoId ?? this.livePhotoVideoId, - ); - } - - Map toMap() { - return { - 'localAssetId': localAssetId, - 'isLivePhotos': isLivePhotos, - 'livePhotoVideoId': livePhotoVideoId, - }; - } - - factory UploadTaskMetadata.fromMap(Map map) { - return UploadTaskMetadata( - localAssetId: map['localAssetId'] as String, - isLivePhotos: map['isLivePhotos'] as bool, - livePhotoVideoId: map['livePhotoVideoId'] as String, - ); - } - - String toJson() => json.encode(toMap()); - - factory UploadTaskMetadata.fromJson(String source) => - UploadTaskMetadata.fromMap(json.decode(source) as Map); - - @override - String toString() => - 'UploadTaskMetadata(localAssetId: $localAssetId, isLivePhotos: $isLivePhotos, livePhotoVideoId: $livePhotoVideoId)'; - - @override - bool operator ==(covariant UploadTaskMetadata other) { - if (identical(this, other)) return true; - - return other.localAssetId == localAssetId && - other.isLivePhotos == isLivePhotos && - other.livePhotoVideoId == livePhotoVideoId; - } - - @override - int get hashCode => localAssetId.hashCode ^ isLivePhotos.hashCode ^ livePhotoVideoId.hashCode; -} diff --git a/mobile/lib/services/upload.service.dart b/mobile/lib/services/upload.service.dart index 398a402586..e8acf791a2 100644 --- a/mobile/lib/services/upload.service.dart +++ b/mobile/lib/services/upload.service.dart @@ -1,24 +1,51 @@ import 'dart:async'; +import 'dart:convert'; import 'dart:io'; import 'package:background_downloader/background_downloader.dart'; +import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:immich_mobile/constants/constants.dart'; +import 'package:immich_mobile/domain/models/asset/base_asset.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; import 'package:immich_mobile/entities/store.entity.dart'; +import 'package:immich_mobile/infrastructure/repositories/backup.repository.dart'; +import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart'; +import 'package:immich_mobile/infrastructure/repositories/storage.repository.dart'; +import 'package:immich_mobile/providers/backup/drift_backup.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/asset.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/storage.provider.dart'; import 'package:immich_mobile/repositories/upload.repository.dart'; import 'package:immich_mobile/services/api.service.dart'; -import 'package:path/path.dart'; +import 'package:path/path.dart' as p; final uploadServiceProvider = Provider((ref) { - final service = UploadService(ref.watch(uploadRepositoryProvider)); + final service = UploadService( + ref.watch(uploadRepositoryProvider), + ref.watch(backupRepositoryProvider), + ref.watch(storageRepositoryProvider), + ref.watch(localAssetRepository), + ); + ref.onDispose(service.dispose); return service; }); class UploadService { + UploadService( + this._uploadRepository, + this._backupRepository, + this._storageRepository, + this._localAssetRepository, + ) { + _uploadRepository.onUploadStatus = _onUploadCallback; + _uploadRepository.onTaskProgress = _onTaskProgressCallback; + } + final UploadRepository _uploadRepository; - void Function(TaskStatusUpdate)? onUploadStatus; - void Function(TaskProgressUpdate)? onTaskProgress; + final DriftBackupRepository _backupRepository; + final StorageRepository _storageRepository; + final DriftLocalAssetRepository _localAssetRepository; final StreamController _taskStatusController = StreamController.broadcast(); final StreamController _taskProgressController = StreamController.broadcast(); @@ -26,25 +53,19 @@ class UploadService { Stream get taskStatusStream => _taskStatusController.stream; Stream get taskProgressStream => _taskProgressController.stream; - UploadService( - this._uploadRepository, - ) { - _uploadRepository.onUploadStatus = _onUploadCallback; - _uploadRepository.onTaskProgress = _onTaskProgressCallback; - } + bool shouldAbortQueuingTasks = false; void _onTaskProgressCallback(TaskProgressUpdate update) { - onTaskProgress?.call(update); if (!_taskProgressController.isClosed) { _taskProgressController.add(update); } } void _onUploadCallback(TaskStatusUpdate update) { - onUploadStatus?.call(update); if (!_taskStatusController.isClosed) { _taskStatusController.add(update); } + _handleTaskStatusUpdate(update); } void dispose() { @@ -52,18 +73,234 @@ class UploadService { _taskProgressController.close(); } - Future cancelUpload(String id) { - return FileDownloader().cancelTaskWithId(id); - } - - Future cancelAllForGroup(String group) async { - await _uploadRepository.cancelAll(group); - await _uploadRepository.reset(group); - await _uploadRepository.deleteAllTrackingRecords(group); - } - void enqueueTasks(List tasks) { - _uploadRepository.enqueueAll(tasks); + _uploadRepository.enqueueBackgroundAll(tasks); + } + + Future> getActiveTasks(String group) { + return _uploadRepository.getActiveTasks(group); + } + + Future getBackupTotalCount() { + return _backupRepository.getTotalCount(); + } + + Future getBackupRemainderCount(String userId) { + return _backupRepository.getRemainderCount(userId); + } + + Future getBackupFinishedCount(String userId) { + return _backupRepository.getBackupCount(userId); + } + + Future manualBackup(List localAssets) async { + List tasks = []; + for (final asset in localAssets) { + final task = await _getUploadTask( + asset, + group: kManualUploadGroup, + priority: 1, // High priority after upload motion photo part + ); + if (task != null) { + tasks.add(task); + } + } + + if (tasks.isNotEmpty) { + enqueueTasks(tasks); + } + } + + /// Find backup candidates + /// Build the upload tasks + /// Enqueue the tasks + Future startBackup( + String userId, + void Function(EnqueueStatus status) onEnqueueTasks, + ) async { + shouldAbortQueuingTasks = false; + + final candidates = await _backupRepository.getCandidates(userId); + if (candidates.isEmpty) { + return; + } + + const batchSize = 100; + int count = 0; + for (int i = 0; i < candidates.length; i += batchSize) { + if (shouldAbortQueuingTasks) { + break; + } + + final batch = candidates.skip(i).take(batchSize).toList(); + + List tasks = []; + for (final asset in batch) { + final task = await _getUploadTask(asset); + if (task != null) { + tasks.add(task); + } + } + + if (tasks.isNotEmpty && !shouldAbortQueuingTasks) { + count += tasks.length; + enqueueTasks(tasks); + + onEnqueueTasks( + EnqueueStatus( + enqueueCount: count, + totalCount: candidates.length, + ), + ); + } + } + } + + /// Cancel all ongoing uploads and reset the upload queue + /// + /// Return the number of left over tasks in the queue + Future cancelBackup() async { + shouldAbortQueuingTasks = true; + + await _uploadRepository.reset(kBackupGroup); + await _uploadRepository.deleteDatabaseRecords(kBackupGroup); + + final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup); + return activeTasks.length; + } + + Future resumeBackup() { + return _uploadRepository.start(); + } + + void _handleTaskStatusUpdate(TaskStatusUpdate update) { + switch (update.status) { + case TaskStatus.complete: + _handleLivePhoto(update); + break; + + default: + break; + } + } + + Future _handleLivePhoto(TaskStatusUpdate update) async { + try { + if (update.task.metaData.isEmpty || update.task.metaData == '') { + return; + } + + final metadata = UploadTaskMetadata.fromJson(update.task.metaData); + if (!metadata.isLivePhotos) { + return; + } + + if (update.responseBody == null || update.responseBody!.isEmpty) { + return; + } + final response = jsonDecode(update.responseBody!); + + final localAsset = await _localAssetRepository.getById(metadata.localAssetId); + if (localAsset == null) { + return; + } + + final uploadTask = await _getLivePhotoUploadTask( + localAsset, + response['id'] as String, + ); + + if (uploadTask == null) { + return; + } + + enqueueTasks([uploadTask]); + } catch (error, stackTrace) { + debugPrint("Error handling live photo upload task: $error $stackTrace"); + } + } + + Future _getUploadTask( + LocalAsset asset, { + String group = kBackupGroup, + int? priority, + }) async { + final entity = await _storageRepository.getAssetEntityForAsset(asset); + if (entity == null) { + return null; + } + + File? file; + + /// iOS LivePhoto has two files: a photo and a video. + /// They are uploaded separately, with video file being upload first, then returned with the assetId + /// The assetId is then used as a metadata for the photo file upload task. + /// + /// We implement two separate upload groups for this, the normal one for the video file + /// and the higher priority group for the photo file because the video file is already uploaded. + /// + /// The cancel operation will only cancel the video group (normal group), the photo group will not + /// be touched, as the video file is already uploaded. + + if (entity.isLivePhoto) { + file = await _storageRepository.getMotionFileForAsset(asset); + } else { + file = await _storageRepository.getFileForAsset(asset.id); + } + + if (file == null) { + return null; + } + + final originalFileName = entity.isLivePhoto + ? p.setExtension( + asset.name, + p.extension(file.path), + ) + : asset.name; + + String metadata = UploadTaskMetadata( + localAssetId: asset.id, + isLivePhotos: entity.isLivePhoto, + livePhotoVideoId: '', + ).toJson(); + + return buildUploadTask( + file, + originalFileName: originalFileName, + deviceAssetId: asset.id, + metadata: metadata, + group: group, + priority: priority, + ); + } + + Future _getLivePhotoUploadTask( + LocalAsset asset, + String livePhotoVideoId, + ) async { + final entity = await _storageRepository.getAssetEntityForAsset(asset); + if (entity == null) { + return null; + } + + final file = await _storageRepository.getFileForAsset(asset.id); + if (file == null) { + return null; + } + + final fields = { + 'livePhotoVideoId': livePhotoVideoId, + }; + + return buildUploadTask( + file, + originalFileName: asset.name, + deviceAssetId: asset.id, + fields: fields, + group: kBackupLivePhotoGroup, + priority: 0, // Highest priority to get upload immediately + ); } Future buildUploadTask( @@ -74,26 +311,6 @@ class UploadService { String? deviceAssetId, String? metadata, int? priority, - }) async { - return _buildTask( - deviceAssetId ?? hash(file.path).toString(), - file, - fields: fields, - originalFileName: originalFileName, - metadata: metadata, - group: group, - priority: priority, - ); - } - - Future _buildTask( - String id, - File file, { - required String group, - Map? fields, - String? originalFileName, - String? metadata, - int? priority, }) async { final serverEndpoint = Store.get(StoreKey.serverEndpoint); final url = Uri.parse('$serverEndpoint/assets').toString(); @@ -106,7 +323,7 @@ class UploadService { final fileModifiedAt = stats.modified; final fieldsMap = { 'filename': originalFileName ?? filename, - 'deviceAssetId': id, + 'deviceAssetId': deviceAssetId ?? '', 'deviceId': deviceId, 'fileCreatedAt': fileCreatedAt.toUtc().toIso8601String(), 'fileModifiedAt': fileModifiedAt.toUtc().toIso8601String(), @@ -116,7 +333,7 @@ class UploadService { }; return UploadTask( - taskId: id, + taskId: deviceAssetId, displayName: originalFileName ?? filename, httpRequestMethod: 'POST', url: url, @@ -134,3 +351,64 @@ class UploadService { ); } } + +class UploadTaskMetadata { + final String localAssetId; + final bool isLivePhotos; + final String livePhotoVideoId; + + const UploadTaskMetadata({ + required this.localAssetId, + required this.isLivePhotos, + required this.livePhotoVideoId, + }); + + UploadTaskMetadata copyWith({ + String? localAssetId, + bool? isLivePhotos, + String? livePhotoVideoId, + }) { + return UploadTaskMetadata( + localAssetId: localAssetId ?? this.localAssetId, + isLivePhotos: isLivePhotos ?? this.isLivePhotos, + livePhotoVideoId: livePhotoVideoId ?? this.livePhotoVideoId, + ); + } + + Map toMap() { + return { + 'localAssetId': localAssetId, + 'isLivePhotos': isLivePhotos, + 'livePhotoVideoId': livePhotoVideoId, + }; + } + + factory UploadTaskMetadata.fromMap(Map map) { + return UploadTaskMetadata( + localAssetId: map['localAssetId'] as String, + isLivePhotos: map['isLivePhotos'] as bool, + livePhotoVideoId: map['livePhotoVideoId'] as String, + ); + } + + String toJson() => json.encode(toMap()); + + factory UploadTaskMetadata.fromJson(String source) => + UploadTaskMetadata.fromMap(json.decode(source) as Map); + + @override + String toString() => + 'UploadTaskMetadata(localAssetId: $localAssetId, isLivePhotos: $isLivePhotos, livePhotoVideoId: $livePhotoVideoId)'; + + @override + bool operator ==(covariant UploadTaskMetadata other) { + if (identical(this, other)) return true; + + return other.localAssetId == localAssetId && + other.isLivePhotos == isLivePhotos && + other.livePhotoVideoId == livePhotoVideoId; + } + + @override + int get hashCode => localAssetId.hashCode ^ isLivePhotos.hashCode ^ livePhotoVideoId.hashCode; +} diff --git a/mobile/test/domain/service.mock.dart b/mobile/test/domain/service.mock.dart index f4c5a32a4b..8293faf125 100644 --- a/mobile/test/domain/service.mock.dart +++ b/mobile/test/domain/service.mock.dart @@ -2,6 +2,8 @@ import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/domain/services/user.service.dart'; import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:immich_mobile/platform/native_sync_api.g.dart'; +import 'package:immich_mobile/services/app_settings.service.dart'; +import 'package:immich_mobile/services/upload.service.dart'; import 'package:mocktail/mocktail.dart'; class MockStoreService extends Mock implements StoreService {} @@ -11,3 +13,7 @@ class MockUserService extends Mock implements UserService {} class MockBackgroundSyncManager extends Mock implements BackgroundSyncManager {} class MockNativeSyncApi extends Mock implements NativeSyncApi {} + +class MockAppSettingsService extends Mock implements AppSettingsService {} + +class MockUploadService extends Mock implements UploadService {} diff --git a/mobile/test/services/auth.service_test.dart b/mobile/test/services/auth.service_test.dart index 3e9a02a2a5..c9b44fe28b 100644 --- a/mobile/test/services/auth.service_test.dart +++ b/mobile/test/services/auth.service_test.dart @@ -3,6 +3,7 @@ import 'package:flutter_test/flutter_test.dart'; import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/infrastructure/repositories/store.repository.dart'; import 'package:immich_mobile/models/auth/auxilary_endpoint.model.dart'; +import 'package:immich_mobile/services/app_settings.service.dart'; import 'package:immich_mobile/services/auth.service.dart'; import 'package:isar/isar.dart'; import 'package:mocktail/mocktail.dart'; @@ -20,6 +21,8 @@ void main() { late MockApiService apiService; late MockNetworkService networkService; late MockBackgroundSyncManager backgroundSyncManager; + late MockUploadService uploadService; + late MockAppSettingService appSettingsService; late Isar db; setUp(() async { @@ -28,6 +31,8 @@ void main() { apiService = MockApiService(); networkService = MockNetworkService(); backgroundSyncManager = MockBackgroundSyncManager(); + uploadService = MockUploadService(); + appSettingsService = MockAppSettingService(); sut = AuthService( authApiRepository, @@ -35,6 +40,7 @@ void main() { apiService, networkService, backgroundSyncManager, + appSettingsService, ); registerFallbackValue(Uri()); @@ -118,7 +124,13 @@ void main() { when(() => authApiRepository.logout()).thenAnswer((_) async => {}); when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => authRepository.clearLocalData()).thenAnswer((_) => Future.value(null)); - + when(() => uploadService.cancelBackup()).thenAnswer((_) => Future.value(1)); + when( + () => appSettingsService.setSetting( + AppSettingsEnum.enableBackup, + false, + ), + ).thenAnswer((_) => Future.value(null)); await sut.logout(); verify(() => authApiRepository.logout()).called(1); @@ -130,7 +142,13 @@ void main() { when(() => authApiRepository.logout()).thenThrow(Exception('Server error')); when(() => backgroundSyncManager.cancel()).thenAnswer((_) async => {}); when(() => authRepository.clearLocalData()).thenAnswer((_) => Future.value(null)); - + when(() => uploadService.cancelBackup()).thenAnswer((_) => Future.value(1)); + when( + () => appSettingsService.setSetting( + AppSettingsEnum.enableBackup, + false, + ), + ).thenAnswer((_) => Future.value(null)); await sut.logout(); verify(() => authApiRepository.logout()).called(1);