immich/mobile/lib/services/upload.service.dart
Alex 03a13828e1
chore: refactor upload service (#20130)
* chore: refactor upload service

* fix: cancel upload queue on logout (#20131)

* fix: cancel upload on logout

* fix: test

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: Alex <alex.tran1502@gmail.com>

---------

Co-authored-by: shenlong <139912620+shenlong-tanwen@users.noreply.github.com>
Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
2025-07-25 10:09:32 -05:00

415 lines
12 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:background_downloader/background_downloader.dart';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/infrastructure/repositories/backup.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/local_asset.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/storage.repository.dart';
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
import 'package:immich_mobile/providers/infrastructure/asset.provider.dart';
import 'package:immich_mobile/providers/infrastructure/storage.provider.dart';
import 'package:immich_mobile/repositories/upload.repository.dart';
import 'package:immich_mobile/services/api.service.dart';
import 'package:path/path.dart' as p;
final uploadServiceProvider = Provider((ref) {
final service = UploadService(
ref.watch(uploadRepositoryProvider),
ref.watch(backupRepositoryProvider),
ref.watch(storageRepositoryProvider),
ref.watch(localAssetRepository),
);
ref.onDispose(service.dispose);
return service;
});
class UploadService {
UploadService(
this._uploadRepository,
this._backupRepository,
this._storageRepository,
this._localAssetRepository,
) {
_uploadRepository.onUploadStatus = _onUploadCallback;
_uploadRepository.onTaskProgress = _onTaskProgressCallback;
}
final UploadRepository _uploadRepository;
final DriftBackupRepository _backupRepository;
final StorageRepository _storageRepository;
final DriftLocalAssetRepository _localAssetRepository;
final StreamController<TaskStatusUpdate> _taskStatusController = StreamController<TaskStatusUpdate>.broadcast();
final StreamController<TaskProgressUpdate> _taskProgressController = StreamController<TaskProgressUpdate>.broadcast();
Stream<TaskStatusUpdate> get taskStatusStream => _taskStatusController.stream;
Stream<TaskProgressUpdate> get taskProgressStream => _taskProgressController.stream;
bool shouldAbortQueuingTasks = false;
void _onTaskProgressCallback(TaskProgressUpdate update) {
if (!_taskProgressController.isClosed) {
_taskProgressController.add(update);
}
}
void _onUploadCallback(TaskStatusUpdate update) {
if (!_taskStatusController.isClosed) {
_taskStatusController.add(update);
}
_handleTaskStatusUpdate(update);
}
void dispose() {
_taskStatusController.close();
_taskProgressController.close();
}
void enqueueTasks(List<UploadTask> tasks) {
_uploadRepository.enqueueBackgroundAll(tasks);
}
Future<List<Task>> getActiveTasks(String group) {
return _uploadRepository.getActiveTasks(group);
}
Future<int> getBackupTotalCount() {
return _backupRepository.getTotalCount();
}
Future<int> getBackupRemainderCount(String userId) {
return _backupRepository.getRemainderCount(userId);
}
Future<int> getBackupFinishedCount(String userId) {
return _backupRepository.getBackupCount(userId);
}
Future<void> manualBackup(List<LocalAsset> localAssets) async {
List<UploadTask> tasks = [];
for (final asset in localAssets) {
final task = await _getUploadTask(
asset,
group: kManualUploadGroup,
priority: 1, // High priority after upload motion photo part
);
if (task != null) {
tasks.add(task);
}
}
if (tasks.isNotEmpty) {
enqueueTasks(tasks);
}
}
/// Find backup candidates
/// Build the upload tasks
/// Enqueue the tasks
Future<void> startBackup(
String userId,
void Function(EnqueueStatus status) onEnqueueTasks,
) async {
shouldAbortQueuingTasks = false;
final candidates = await _backupRepository.getCandidates(userId);
if (candidates.isEmpty) {
return;
}
const batchSize = 100;
int count = 0;
for (int i = 0; i < candidates.length; i += batchSize) {
if (shouldAbortQueuingTasks) {
break;
}
final batch = candidates.skip(i).take(batchSize).toList();
List<UploadTask> tasks = [];
for (final asset in batch) {
final task = await _getUploadTask(asset);
if (task != null) {
tasks.add(task);
}
}
if (tasks.isNotEmpty && !shouldAbortQueuingTasks) {
count += tasks.length;
enqueueTasks(tasks);
onEnqueueTasks(
EnqueueStatus(
enqueueCount: count,
totalCount: candidates.length,
),
);
}
}
}
/// Cancel all ongoing uploads and reset the upload queue
///
/// Return the number of left over tasks in the queue
Future<int> cancelBackup() async {
shouldAbortQueuingTasks = true;
await _uploadRepository.reset(kBackupGroup);
await _uploadRepository.deleteDatabaseRecords(kBackupGroup);
final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup);
return activeTasks.length;
}
Future<void> resumeBackup() {
return _uploadRepository.start();
}
void _handleTaskStatusUpdate(TaskStatusUpdate update) {
switch (update.status) {
case TaskStatus.complete:
_handleLivePhoto(update);
break;
default:
break;
}
}
Future<void> _handleLivePhoto(TaskStatusUpdate update) async {
try {
if (update.task.metaData.isEmpty || update.task.metaData == '') {
return;
}
final metadata = UploadTaskMetadata.fromJson(update.task.metaData);
if (!metadata.isLivePhotos) {
return;
}
if (update.responseBody == null || update.responseBody!.isEmpty) {
return;
}
final response = jsonDecode(update.responseBody!);
final localAsset = await _localAssetRepository.getById(metadata.localAssetId);
if (localAsset == null) {
return;
}
final uploadTask = await _getLivePhotoUploadTask(
localAsset,
response['id'] as String,
);
if (uploadTask == null) {
return;
}
enqueueTasks([uploadTask]);
} catch (error, stackTrace) {
debugPrint("Error handling live photo upload task: $error $stackTrace");
}
}
Future<UploadTask?> _getUploadTask(
LocalAsset asset, {
String group = kBackupGroup,
int? priority,
}) async {
final entity = await _storageRepository.getAssetEntityForAsset(asset);
if (entity == null) {
return null;
}
File? file;
/// iOS LivePhoto has two files: a photo and a video.
/// They are uploaded separately, with video file being upload first, then returned with the assetId
/// The assetId is then used as a metadata for the photo file upload task.
///
/// We implement two separate upload groups for this, the normal one for the video file
/// and the higher priority group for the photo file because the video file is already uploaded.
///
/// The cancel operation will only cancel the video group (normal group), the photo group will not
/// be touched, as the video file is already uploaded.
if (entity.isLivePhoto) {
file = await _storageRepository.getMotionFileForAsset(asset);
} else {
file = await _storageRepository.getFileForAsset(asset.id);
}
if (file == null) {
return null;
}
final originalFileName = entity.isLivePhoto
? p.setExtension(
asset.name,
p.extension(file.path),
)
: asset.name;
String metadata = UploadTaskMetadata(
localAssetId: asset.id,
isLivePhotos: entity.isLivePhoto,
livePhotoVideoId: '',
).toJson();
return buildUploadTask(
file,
originalFileName: originalFileName,
deviceAssetId: asset.id,
metadata: metadata,
group: group,
priority: priority,
);
}
Future<UploadTask?> _getLivePhotoUploadTask(
LocalAsset asset,
String livePhotoVideoId,
) async {
final entity = await _storageRepository.getAssetEntityForAsset(asset);
if (entity == null) {
return null;
}
final file = await _storageRepository.getFileForAsset(asset.id);
if (file == null) {
return null;
}
final fields = {
'livePhotoVideoId': livePhotoVideoId,
};
return buildUploadTask(
file,
originalFileName: asset.name,
deviceAssetId: asset.id,
fields: fields,
group: kBackupLivePhotoGroup,
priority: 0, // Highest priority to get upload immediately
);
}
Future<UploadTask> buildUploadTask(
File file, {
required String group,
Map<String, String>? fields,
String? originalFileName,
String? deviceAssetId,
String? metadata,
int? priority,
}) async {
final serverEndpoint = Store.get(StoreKey.serverEndpoint);
final url = Uri.parse('$serverEndpoint/assets').toString();
final headers = ApiService.getRequestHeaders();
final deviceId = Store.get(StoreKey.deviceId);
final (baseDirectory, directory, filename) = await Task.split(filePath: file.path);
final stats = await file.stat();
final fileCreatedAt = stats.changed;
final fileModifiedAt = stats.modified;
final fieldsMap = {
'filename': originalFileName ?? filename,
'deviceAssetId': deviceAssetId ?? '',
'deviceId': deviceId,
'fileCreatedAt': fileCreatedAt.toUtc().toIso8601String(),
'fileModifiedAt': fileModifiedAt.toUtc().toIso8601String(),
'isFavorite': 'false',
'duration': '0',
if (fields != null) ...fields,
};
return UploadTask(
taskId: deviceAssetId,
displayName: originalFileName ?? filename,
httpRequestMethod: 'POST',
url: url,
headers: headers,
filename: filename,
fields: fieldsMap,
baseDirectory: baseDirectory,
directory: directory,
fileField: 'assetData',
metaData: metadata ?? '',
group: group,
priority: priority ?? 5,
updates: Updates.statusAndProgress,
retries: 3,
);
}
}
class UploadTaskMetadata {
final String localAssetId;
final bool isLivePhotos;
final String livePhotoVideoId;
const UploadTaskMetadata({
required this.localAssetId,
required this.isLivePhotos,
required this.livePhotoVideoId,
});
UploadTaskMetadata copyWith({
String? localAssetId,
bool? isLivePhotos,
String? livePhotoVideoId,
}) {
return UploadTaskMetadata(
localAssetId: localAssetId ?? this.localAssetId,
isLivePhotos: isLivePhotos ?? this.isLivePhotos,
livePhotoVideoId: livePhotoVideoId ?? this.livePhotoVideoId,
);
}
Map<String, dynamic> toMap() {
return <String, dynamic>{
'localAssetId': localAssetId,
'isLivePhotos': isLivePhotos,
'livePhotoVideoId': livePhotoVideoId,
};
}
factory UploadTaskMetadata.fromMap(Map<String, dynamic> map) {
return UploadTaskMetadata(
localAssetId: map['localAssetId'] as String,
isLivePhotos: map['isLivePhotos'] as bool,
livePhotoVideoId: map['livePhotoVideoId'] as String,
);
}
String toJson() => json.encode(toMap());
factory UploadTaskMetadata.fromJson(String source) =>
UploadTaskMetadata.fromMap(json.decode(source) as Map<String, dynamic>);
@override
String toString() =>
'UploadTaskMetadata(localAssetId: $localAssetId, isLivePhotos: $isLivePhotos, livePhotoVideoId: $livePhotoVideoId)';
@override
bool operator ==(covariant UploadTaskMetadata other) {
if (identical(this, other)) return true;
return other.localAssetId == localAssetId &&
other.isLivePhotos == isLivePhotos &&
other.livePhotoVideoId == livePhotoVideoId;
}
@override
int get hashCode => localAssetId.hashCode ^ isLivePhotos.hashCode ^ livePhotoVideoId.hashCode;
}