import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'package:background_downloader/background_downloader.dart'; import 'package:flutter/material.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/domain/models/asset/base_asset.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/infrastructure/repositories/backup.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/storage.repository.dart'; import 'package:immich_mobile/providers/backup/drift_backup.provider.dart'; import 'package:immich_mobile/providers/infrastructure/asset.provider.dart'; import 'package:immich_mobile/providers/infrastructure/storage.provider.dart'; import 'package:immich_mobile/repositories/upload.repository.dart'; import 'package:immich_mobile/services/api.service.dart'; import 'package:path/path.dart' as p; final uploadServiceProvider = Provider((ref) { final service = UploadService( ref.watch(uploadRepositoryProvider), ref.watch(backupRepositoryProvider), ref.watch(storageRepositoryProvider), ref.watch(localAssetRepository), ); ref.onDispose(service.dispose); return service; }); class UploadService { UploadService( this._uploadRepository, this._backupRepository, this._storageRepository, this._localAssetRepository, ) { _uploadRepository.onUploadStatus = _onUploadCallback; _uploadRepository.onTaskProgress = _onTaskProgressCallback; } final UploadRepository _uploadRepository; final DriftBackupRepository _backupRepository; final StorageRepository _storageRepository; final DriftLocalAssetRepository _localAssetRepository; final StreamController _taskStatusController = StreamController.broadcast(); final StreamController _taskProgressController = StreamController.broadcast(); Stream get taskStatusStream => _taskStatusController.stream; Stream get taskProgressStream => _taskProgressController.stream; bool shouldAbortQueuingTasks = false; void _onTaskProgressCallback(TaskProgressUpdate update) { if (!_taskProgressController.isClosed) { _taskProgressController.add(update); } } void _onUploadCallback(TaskStatusUpdate update) { if (!_taskStatusController.isClosed) { _taskStatusController.add(update); } _handleTaskStatusUpdate(update); } void dispose() { _taskStatusController.close(); _taskProgressController.close(); } void enqueueTasks(List tasks) { _uploadRepository.enqueueBackgroundAll(tasks); } Future> getActiveTasks(String group) { return _uploadRepository.getActiveTasks(group); } Future getBackupTotalCount() { return _backupRepository.getTotalCount(); } Future getBackupRemainderCount(String userId) { return _backupRepository.getRemainderCount(userId); } Future getBackupFinishedCount(String userId) { return _backupRepository.getBackupCount(userId); } Future manualBackup(List localAssets) async { List tasks = []; for (final asset in localAssets) { final task = await _getUploadTask( asset, group: kManualUploadGroup, priority: 1, // High priority after upload motion photo part ); if (task != null) { tasks.add(task); } } if (tasks.isNotEmpty) { enqueueTasks(tasks); } } /// Find backup candidates /// Build the upload tasks /// Enqueue the tasks Future startBackup( String userId, void Function(EnqueueStatus status) onEnqueueTasks, ) async { shouldAbortQueuingTasks = false; final candidates = await _backupRepository.getCandidates(userId); if (candidates.isEmpty) { return; } const batchSize = 100; int count = 0; for (int i = 0; i < candidates.length; i += batchSize) { if (shouldAbortQueuingTasks) { break; } final batch = candidates.skip(i).take(batchSize).toList(); List tasks = []; for (final asset in batch) { final task = await _getUploadTask(asset); if (task != null) { tasks.add(task); } } if (tasks.isNotEmpty && !shouldAbortQueuingTasks) { count += tasks.length; enqueueTasks(tasks); onEnqueueTasks( EnqueueStatus( enqueueCount: count, totalCount: candidates.length, ), ); } } } /// Cancel all ongoing uploads and reset the upload queue /// /// Return the number of left over tasks in the queue Future cancelBackup() async { shouldAbortQueuingTasks = true; await _uploadRepository.reset(kBackupGroup); await _uploadRepository.deleteDatabaseRecords(kBackupGroup); final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup); return activeTasks.length; } Future resumeBackup() { return _uploadRepository.start(); } void _handleTaskStatusUpdate(TaskStatusUpdate update) { switch (update.status) { case TaskStatus.complete: _handleLivePhoto(update); break; default: break; } } Future _handleLivePhoto(TaskStatusUpdate update) async { try { if (update.task.metaData.isEmpty || update.task.metaData == '') { return; } final metadata = UploadTaskMetadata.fromJson(update.task.metaData); if (!metadata.isLivePhotos) { return; } if (update.responseBody == null || update.responseBody!.isEmpty) { return; } final response = jsonDecode(update.responseBody!); final localAsset = await _localAssetRepository.getById(metadata.localAssetId); if (localAsset == null) { return; } final uploadTask = await _getLivePhotoUploadTask( localAsset, response['id'] as String, ); if (uploadTask == null) { return; } enqueueTasks([uploadTask]); } catch (error, stackTrace) { debugPrint("Error handling live photo upload task: $error $stackTrace"); } } Future _getUploadTask( LocalAsset asset, { String group = kBackupGroup, int? priority, }) async { final entity = await _storageRepository.getAssetEntityForAsset(asset); if (entity == null) { return null; } File? file; /// iOS LivePhoto has two files: a photo and a video. /// They are uploaded separately, with video file being upload first, then returned with the assetId /// The assetId is then used as a metadata for the photo file upload task. /// /// We implement two separate upload groups for this, the normal one for the video file /// and the higher priority group for the photo file because the video file is already uploaded. /// /// The cancel operation will only cancel the video group (normal group), the photo group will not /// be touched, as the video file is already uploaded. if (entity.isLivePhoto) { file = await _storageRepository.getMotionFileForAsset(asset); } else { file = await _storageRepository.getFileForAsset(asset.id); } if (file == null) { return null; } final originalFileName = entity.isLivePhoto ? p.setExtension( asset.name, p.extension(file.path), ) : asset.name; String metadata = UploadTaskMetadata( localAssetId: asset.id, isLivePhotos: entity.isLivePhoto, livePhotoVideoId: '', ).toJson(); return buildUploadTask( file, originalFileName: originalFileName, deviceAssetId: asset.id, metadata: metadata, group: group, priority: priority, ); } Future _getLivePhotoUploadTask( LocalAsset asset, String livePhotoVideoId, ) async { final entity = await _storageRepository.getAssetEntityForAsset(asset); if (entity == null) { return null; } final file = await _storageRepository.getFileForAsset(asset.id); if (file == null) { return null; } final fields = { 'livePhotoVideoId': livePhotoVideoId, }; return buildUploadTask( file, originalFileName: asset.name, deviceAssetId: asset.id, fields: fields, group: kBackupLivePhotoGroup, priority: 0, // Highest priority to get upload immediately ); } Future buildUploadTask( File file, { required String group, Map? fields, String? originalFileName, String? deviceAssetId, String? metadata, int? priority, }) async { final serverEndpoint = Store.get(StoreKey.serverEndpoint); final url = Uri.parse('$serverEndpoint/assets').toString(); final headers = ApiService.getRequestHeaders(); final deviceId = Store.get(StoreKey.deviceId); final (baseDirectory, directory, filename) = await Task.split(filePath: file.path); final stats = await file.stat(); final fileCreatedAt = stats.changed; final fileModifiedAt = stats.modified; final fieldsMap = { 'filename': originalFileName ?? filename, 'deviceAssetId': deviceAssetId ?? '', 'deviceId': deviceId, 'fileCreatedAt': fileCreatedAt.toUtc().toIso8601String(), 'fileModifiedAt': fileModifiedAt.toUtc().toIso8601String(), 'isFavorite': 'false', 'duration': '0', if (fields != null) ...fields, }; return UploadTask( taskId: deviceAssetId, displayName: originalFileName ?? filename, httpRequestMethod: 'POST', url: url, headers: headers, filename: filename, fields: fieldsMap, baseDirectory: baseDirectory, directory: directory, fileField: 'assetData', metaData: metadata ?? '', group: group, priority: priority ?? 5, updates: Updates.statusAndProgress, retries: 3, ); } } class UploadTaskMetadata { final String localAssetId; final bool isLivePhotos; final String livePhotoVideoId; const UploadTaskMetadata({ required this.localAssetId, required this.isLivePhotos, required this.livePhotoVideoId, }); UploadTaskMetadata copyWith({ String? localAssetId, bool? isLivePhotos, String? livePhotoVideoId, }) { return UploadTaskMetadata( localAssetId: localAssetId ?? this.localAssetId, isLivePhotos: isLivePhotos ?? this.isLivePhotos, livePhotoVideoId: livePhotoVideoId ?? this.livePhotoVideoId, ); } Map toMap() { return { 'localAssetId': localAssetId, 'isLivePhotos': isLivePhotos, 'livePhotoVideoId': livePhotoVideoId, }; } factory UploadTaskMetadata.fromMap(Map map) { return UploadTaskMetadata( localAssetId: map['localAssetId'] as String, isLivePhotos: map['isLivePhotos'] as bool, livePhotoVideoId: map['livePhotoVideoId'] as String, ); } String toJson() => json.encode(toMap()); factory UploadTaskMetadata.fromJson(String source) => UploadTaskMetadata.fromMap(json.decode(source) as Map); @override String toString() => 'UploadTaskMetadata(localAssetId: $localAssetId, isLivePhotos: $isLivePhotos, livePhotoVideoId: $livePhotoVideoId)'; @override bool operator ==(covariant UploadTaskMetadata other) { if (identical(this, other)) return true; return other.localAssetId == localAssetId && other.isLivePhotos == isLivePhotos && other.livePhotoVideoId == livePhotoVideoId; } @override int get hashCode => localAssetId.hashCode ^ isLivePhotos.hashCode ^ livePhotoVideoId.hashCode; }