immich/mobile/lib/providers/backup/drift_backup.provider.dart
Alex 03a13828e1
chore: refactor upload service (#20130)
* chore: refactor upload service

* fix: cancel upload queue on logout (#20131)

* fix: cancel upload on logout

* fix: test

---------

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
Co-authored-by: Alex <alex.tran1502@gmail.com>

---------

Co-authored-by: shenlong <139912620+shenlong-tanwen@users.noreply.github.com>
Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
2025-07-25 10:09:32 -05:00

397 lines
11 KiB
Dart

// ignore_for_file: public_member_api_docs, sort_constructors_first
import 'dart:async';
import 'dart:convert';
import 'package:background_downloader/background_downloader.dart';
import 'package:collection/collection.dart';
import 'package:flutter/widgets.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/services/upload.service.dart';
/// Immutable pair of counters describing an enqueue operation: how many items
/// have been enqueued ([enqueueCount]) out of how many in total ([totalCount]).
class EnqueueStatus {
  /// Creates a status from the two required counters.
  const EnqueueStatus({
    required this.enqueueCount,
    required this.totalCount,
  });

  /// Enqueued-item counter.
  final int enqueueCount;

  /// Total-item counter.
  final int totalCount;

  /// Returns a new [EnqueueStatus] in which every non-null argument replaces
  /// the corresponding field; omitted (or null) arguments keep the old value.
  EnqueueStatus copyWith({int? enqueueCount, int? totalCount}) {
    final nextEnqueueCount = enqueueCount ?? this.enqueueCount;
    final nextTotalCount = totalCount ?? this.totalCount;
    return EnqueueStatus(
      enqueueCount: nextEnqueueCount,
      totalCount: nextTotalCount,
    );
  }

  @override
  String toString() {
    return 'EnqueueStatus(enqueueCount: $enqueueCount, totalCount: $totalCount)';
  }
}
/// Immutable value object describing one upload task: its id, file name,
/// progress value, file size, a pre-formatted network speed string and an
/// optional failure flag.
///
/// Instances compare by value and can be round-tripped through a map or a
/// JSON string.
class DriftUploadStatus {
  /// Identifier of the upload task this status belongs to.
  final String taskId;

  /// Name of the file being uploaded.
  final String filename;

  /// Progress value of the task.
  final double progress;

  /// File size of the upload.
  final int fileSize;

  /// Network speed, already formatted as text.
  final String networkSpeedAsString;

  /// Whether the task failed; `null` when no failure has been recorded.
  final bool? isFailed;

  const DriftUploadStatus({
    required this.taskId,
    required this.filename,
    required this.progress,
    required this.fileSize,
    required this.networkSpeedAsString,
    this.isFailed,
  });

  /// Returns a copy in which every non-null argument replaces the matching
  /// field.
  ///
  /// Because `null` means "keep the current value", [isFailed] cannot be reset
  /// back to `null` through this method.
  DriftUploadStatus copyWith({
    String? taskId,
    String? filename,
    double? progress,
    int? fileSize,
    String? networkSpeedAsString,
    bool? isFailed,
  }) {
    return DriftUploadStatus(
      taskId: taskId ?? this.taskId,
      filename: filename ?? this.filename,
      progress: progress ?? this.progress,
      fileSize: fileSize ?? this.fileSize,
      networkSpeedAsString: networkSpeedAsString ?? this.networkSpeedAsString,
      isFailed: isFailed ?? this.isFailed,
    );
  }

  @override
  String toString() {
    return 'DriftUploadStatus(taskId: $taskId, filename: $filename, progress: $progress, fileSize: $fileSize, networkSpeedAsString: $networkSpeedAsString, isFailed: $isFailed)';
  }

  /// Value equality over all fields.
  ///
  /// Accepts any [Object] so that comparing against an unrelated type yields
  /// `false` instead of failing a runtime parameter type check.
  @override
  bool operator ==(Object other) {
    if (identical(this, other)) return true;

    return other is DriftUploadStatus &&
        other.taskId == taskId &&
        other.filename == filename &&
        other.progress == progress &&
        other.fileSize == fileSize &&
        other.networkSpeedAsString == networkSpeedAsString &&
        other.isFailed == isFailed;
  }

  /// Hash over the same fields that [operator ==] compares.
  @override
  int get hashCode => Object.hash(
        taskId,
        filename,
        progress,
        fileSize,
        networkSpeedAsString,
        isFailed,
      );

  /// Converts this status into a JSON-encodable map keyed by field name.
  Map<String, dynamic> toMap() {
    return <String, dynamic>{
      'taskId': taskId,
      'filename': filename,
      'progress': progress,
      'fileSize': fileSize,
      'networkSpeedAsString': networkSpeedAsString,
      'isFailed': isFailed,
    };
  }

  /// Builds a status from a map using the keys written by [toMap].
  ///
  /// Numeric fields are read as [num] and converted, because a JSON decoder
  /// yields an `int` for whole numbers such as `1`, which cannot be cast
  /// directly to `double`.
  ///
  /// Throws a [TypeError] when a required key is missing or has the wrong type.
  factory DriftUploadStatus.fromMap(Map<String, dynamic> map) {
    return DriftUploadStatus(
      taskId: map['taskId'] as String,
      filename: map['filename'] as String,
      progress: (map['progress'] as num).toDouble(),
      fileSize: (map['fileSize'] as num).toInt(),
      networkSpeedAsString: map['networkSpeedAsString'] as String,
      isFailed: map['isFailed'] as bool?,
    );
  }

  /// Serializes this status to a JSON string.
  String toJson() => json.encode(toMap());

  /// Parses a JSON object string, as produced by [toJson], into a status.
  factory DriftUploadStatus.fromJson(String source) =>
      DriftUploadStatus.fromMap(json.decode(source) as Map<String, dynamic>);
}
/// Immutable snapshot of the backup state held by the backup notifier:
/// overall counters, enqueue counters, a cancellation flag and the upload
/// items keyed by task id.
class DriftBackupState {
  /// Total-count value of the backup.
  final int totalCount;

  /// Count of items already backed up.
  final int backupCount;

  /// Count of items still remaining.
  final int remainderCount;

  /// Enqueued-item counter of the current enqueue operation.
  final int enqueueCount;

  /// Total-item counter of the current enqueue operation.
  final int enqueueTotalCount;

  /// Whether a cancellation is currently in progress.
  final bool isCanceling;

  /// Upload items keyed by task id.
  final Map<String, DriftUploadStatus> uploadItems;

  const DriftBackupState({
    required this.totalCount,
    required this.backupCount,
    required this.remainderCount,
    required this.enqueueCount,
    required this.enqueueTotalCount,
    required this.isCanceling,
    required this.uploadItems,
  });

  /// Returns a copy in which every non-null argument replaces the matching
  /// field; omitted (or null) arguments keep the current value.
  DriftBackupState copyWith({
    int? totalCount,
    int? backupCount,
    int? remainderCount,
    int? enqueueCount,
    int? enqueueTotalCount,
    bool? isCanceling,
    Map<String, DriftUploadStatus>? uploadItems,
  }) {
    return DriftBackupState(
      totalCount: totalCount ?? this.totalCount,
      backupCount: backupCount ?? this.backupCount,
      remainderCount: remainderCount ?? this.remainderCount,
      enqueueCount: enqueueCount ?? this.enqueueCount,
      enqueueTotalCount: enqueueTotalCount ?? this.enqueueTotalCount,
      isCanceling: isCanceling ?? this.isCanceling,
      uploadItems: uploadItems ?? this.uploadItems,
    );
  }

  @override
  String toString() {
    return 'DriftBackupState(totalCount: $totalCount, backupCount: $backupCount, remainderCount: $remainderCount, enqueueCount: $enqueueCount, enqueueTotalCount: $enqueueTotalCount, isCanceling: $isCanceling, uploadItems: $uploadItems)';
  }

  /// Value equality: scalar fields are compared directly and [uploadItems] is
  /// compared by content.
  ///
  /// Accepts any [Object] so that comparing against an unrelated type yields
  /// `false` instead of failing a runtime parameter type check.
  @override
  bool operator ==(Object other) {
    if (identical(this, other)) return true;

    return other is DriftBackupState &&
        other.totalCount == totalCount &&
        other.backupCount == backupCount &&
        other.remainderCount == remainderCount &&
        other.enqueueCount == enqueueCount &&
        other.enqueueTotalCount == enqueueTotalCount &&
        other.isCanceling == isCanceling &&
        const DeepCollectionEquality().equals(other.uploadItems, uploadItems);
  }

  /// Hash consistent with [operator ==].
  ///
  /// [uploadItems] is hashed by content rather than by map identity, so two
  /// states holding equal but distinct maps produce the same hash code.
  @override
  int get hashCode => Object.hash(
        totalCount,
        backupCount,
        remainderCount,
        enqueueCount,
        enqueueTotalCount,
        isCanceling,
        const DeepCollectionEquality().hash(uploadItems),
      );
}
/// Provider exposing an [ExpBackupNotifier] together with its
/// [DriftBackupState].
///
/// The notifier is constructed with the service obtained by watching
/// `uploadServiceProvider`.
final driftBackupProvider =
    StateNotifierProvider<ExpBackupNotifier, DriftBackupState>((ref) {
  final uploadService = ref.watch(uploadServiceProvider);
  return ExpBackupNotifier(uploadService);
});
/// State notifier that keeps a [DriftBackupState] in sync with the upload
/// service.
///
/// It listens to the service's task status and task progress streams, keeps
/// per-task upload items up to date, and offers entry points to query, start,
/// cancel and resume a backup.
class ExpBackupNotifier extends StateNotifier<DriftBackupState> {
  /// Creates the notifier with an all-zero, empty state and subscribes to the
  /// status and progress streams of [_uploadService].
  ExpBackupNotifier(
    this._uploadService,
  ) : super(
          const DriftBackupState(
            totalCount: 0,
            backupCount: 0,
            remainderCount: 0,
            enqueueCount: 0,
            enqueueTotalCount: 0,
            isCanceling: false,
            uploadItems: {},
          ),
        ) {
    // Keep the subscriptions so that dispose() can actually cancel them;
    // otherwise the listeners would outlive this notifier.
    _statusSubscription =
        _uploadService.taskStatusStream.listen(_handleTaskStatusUpdate);
    _progressSubscription =
        _uploadService.taskProgressStream.listen(_handleTaskProgressUpdate);
  }

  final UploadService _uploadService;
  StreamSubscription<TaskStatusUpdate>? _statusSubscription;
  StreamSubscription<TaskProgressUpdate>? _progressSubscription;

  /// Removes the upload item for [taskId] from the state, if present.
  void _removeUploadItem(String taskId) {
    if (state.uploadItems.containsKey(taskId)) {
      final updatedItems =
          Map<String, DriftUploadStatus>.from(state.uploadItems);
      updatedItems.remove(taskId);
      state = state.copyWith(uploadItems: updatedItems);
    }
  }

  /// Applies a task status change to the state.
  ///
  /// * complete: for tasks in [kBackupGroup], bumps `backupCount` and lowers
  ///   `remainderCount`; a tracked item is removed after a one second delay.
  /// * failed: marks the tracked item as failed (untracked tasks are ignored).
  /// * canceled: removes the tracked item.
  /// * any other status is ignored.
  void _handleTaskStatusUpdate(TaskStatusUpdate update) {
    final taskId = update.task.taskId;

    switch (update.status) {
      case TaskStatus.complete:
        if (update.task.group == kBackupGroup) {
          state = state.copyWith(
            backupCount: state.backupCount + 1,
            remainderCount: state.remainderCount - 1,
          );
        }

        // Remove the completed task from the upload items after a short delay.
        if (state.uploadItems.containsKey(taskId)) {
          Future.delayed(const Duration(milliseconds: 1000), () {
            // The notifier may have been disposed while the timer was pending;
            // touching `state` after disposal is an error.
            if (mounted) {
              _removeUploadItem(taskId);
            }
          });
        }
        break;

      case TaskStatus.failed:
        final currentItem = state.uploadItems[taskId];
        if (currentItem == null) {
          return;
        }

        state = state.copyWith(
          uploadItems: {
            ...state.uploadItems,
            taskId: currentItem.copyWith(
              isFailed: true,
            ),
          },
        );
        break;

      case TaskStatus.canceled:
        _removeUploadItem(taskId);
        break;

      default:
        break;
    }
  }

  /// Applies a task progress update to the state.
  ///
  /// A progress value equal to [kUploadStatusCanceled] removes the item (and
  /// never creates one). Otherwise an existing item is updated, or a new item
  /// is created for a task that is not tracked yet.
  void _handleTaskProgressUpdate(TaskProgressUpdate update) {
    final taskId = update.task.taskId;
    final filename = update.task.displayName;
    final progress = update.progress;

    // Handle cancellation first so that a cancel signal for an untracked task
    // does not add a phantom entry to the upload list.
    if (progress == kUploadStatusCanceled) {
      _removeUploadItem(taskId);
      return;
    }

    final currentItem = state.uploadItems[taskId];
    if (currentItem != null) {
      state = state.copyWith(
        uploadItems: {
          ...state.uploadItems,
          // Only overwrite size and speed when the update carries an expected
          // file size; otherwise keep the previously stored values.
          taskId: update.hasExpectedFileSize
              ? currentItem.copyWith(
                  progress: progress,
                  fileSize: update.expectedFileSize,
                  networkSpeedAsString: update.networkSpeedAsString,
                )
              : currentItem.copyWith(
                  progress: progress,
                ),
        },
      );
      return;
    }

    state = state.copyWith(
      uploadItems: {
        ...state.uploadItems,
        taskId: DriftUploadStatus(
          taskId: taskId,
          filename: filename,
          progress: progress,
          fileSize: update.expectedFileSize,
          networkSpeedAsString: update.networkSpeedAsString,
        ),
      },
    );
  }

  /// Fetches the total, finished and remainder counts from the upload service
  /// concurrently and stores them in the state.
  Future<void> getBackupStatus(String userId) async {
    final [totalCount, backupCount, remainderCount] = await Future.wait([
      _uploadService.getBackupTotalCount(),
      _uploadService.getBackupFinishedCount(userId),
      _uploadService.getBackupRemainderCount(userId),
    ]);

    state = state.copyWith(
      totalCount: totalCount,
      backupCount: backupCount,
      remainderCount: remainderCount,
    );
  }

  /// Asks the upload service to start a backup for [userId]; enqueue progress
  /// reported by the service is mirrored into the state.
  Future<void> startBackup(String userId) {
    return _uploadService.startBackup(userId, _updateEnqueueCount);
  }

  /// Copies the counters of [status] into the state's enqueue counters.
  void _updateEnqueueCount(EnqueueStatus status) {
    state = state.copyWith(
      enqueueCount: status.enqueueCount,
      enqueueTotalCount: status.totalCount,
    );
  }

  /// Cancels the backup, retrying until the upload service reports that no
  /// tasks are left.
  ///
  /// While cancelling, the enqueue counters are reset and `isCanceling` is
  /// true; once finished, `isCanceling` is cleared and all upload items are
  /// dropped.
  Future<void> cancel() async {
    // Iterative retry: avoids building an unbounded chain of nested async
    // calls when many rounds are needed.
    while (true) {
      debugPrint("Canceling backup tasks...");
      state = state.copyWith(
        enqueueCount: 0,
        enqueueTotalCount: 0,
        isCanceling: true,
      );

      final activeTaskCount = await _uploadService.cancelBackup();
      if (activeTaskCount <= 0) {
        break;
      }

      debugPrint(
        "$activeTaskCount tasks left, continuing to cancel...",
      );
    }

    debugPrint("All tasks canceled successfully.");
    // Clear all upload items when cancellation is complete
    state = state.copyWith(
      isCanceling: false,
      uploadItems: {},
    );
  }

  /// Resumes an existing backup queue, or starts a new one when the upload
  /// service reports no active tasks in [kBackupGroup].
  Future<void> handleBackupResume(String userId) async {
    debugPrint("handleBackupResume");
    final tasks = await _uploadService.getActiveTasks(kBackupGroup);
    debugPrint("Found ${tasks.length} tasks");

    if (tasks.isEmpty) {
      // Start a new backup queue; there is nothing to resume in this case.
      debugPrint("Start a new backup queue");
      await startBackup(userId);
      return;
    }

    debugPrint("Tasks to resume: ${tasks.length}");
    await _uploadService.resumeBackup();
  }

  /// Cancels both stream subscriptions before disposing the notifier.
  @override
  void dispose() {
    _statusSubscription?.cancel();
    _progressSubscription?.cancel();
    super.dispose();
  }
}