mirror of
https://github.com/immich-app/immich.git
synced 2025-07-31 15:08:44 -04:00
* chore: refactor upload service * fix: cancel upload queue on logout (#20131) * fix: cancel upload on logout * fix: test --------- Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Co-authored-by: Alex <alex.tran1502@gmail.com> --------- Co-authored-by: shenlong <139912620+shenlong-tanwen@users.noreply.github.com> Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
397 lines
11 KiB
Dart
397 lines
11 KiB
Dart
// ignore_for_file: public_member_api_docs, sort_constructors_first
|
|
import 'dart:async';
|
|
import 'dart:convert';
|
|
|
|
import 'package:background_downloader/background_downloader.dart';
|
|
import 'package:collection/collection.dart';
|
|
import 'package:flutter/widgets.dart';
|
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
|
|
|
import 'package:immich_mobile/constants/constants.dart';
|
|
import 'package:immich_mobile/services/upload.service.dart';
|
|
|
|
class EnqueueStatus {
|
|
final int enqueueCount;
|
|
final int totalCount;
|
|
|
|
const EnqueueStatus({
|
|
required this.enqueueCount,
|
|
required this.totalCount,
|
|
});
|
|
|
|
EnqueueStatus copyWith({
|
|
int? enqueueCount,
|
|
int? totalCount,
|
|
}) {
|
|
return EnqueueStatus(
|
|
enqueueCount: enqueueCount ?? this.enqueueCount,
|
|
totalCount: totalCount ?? this.totalCount,
|
|
);
|
|
}
|
|
|
|
@override
|
|
String toString() => 'EnqueueStatus(enqueueCount: $enqueueCount, totalCount: $totalCount)';
|
|
}
|
|
|
|
class DriftUploadStatus {
|
|
final String taskId;
|
|
final String filename;
|
|
final double progress;
|
|
final int fileSize;
|
|
final String networkSpeedAsString;
|
|
final bool? isFailed;
|
|
|
|
const DriftUploadStatus({
|
|
required this.taskId,
|
|
required this.filename,
|
|
required this.progress,
|
|
required this.fileSize,
|
|
required this.networkSpeedAsString,
|
|
this.isFailed,
|
|
});
|
|
|
|
DriftUploadStatus copyWith({
|
|
String? taskId,
|
|
String? filename,
|
|
double? progress,
|
|
int? fileSize,
|
|
String? networkSpeedAsString,
|
|
bool? isFailed,
|
|
}) {
|
|
return DriftUploadStatus(
|
|
taskId: taskId ?? this.taskId,
|
|
filename: filename ?? this.filename,
|
|
progress: progress ?? this.progress,
|
|
fileSize: fileSize ?? this.fileSize,
|
|
networkSpeedAsString: networkSpeedAsString ?? this.networkSpeedAsString,
|
|
isFailed: isFailed ?? this.isFailed,
|
|
);
|
|
}
|
|
|
|
@override
|
|
String toString() {
|
|
return 'DriftUploadStatus(taskId: $taskId, filename: $filename, progress: $progress, fileSize: $fileSize, networkSpeedAsString: $networkSpeedAsString, isFailed: $isFailed)';
|
|
}
|
|
|
|
@override
|
|
bool operator ==(covariant DriftUploadStatus other) {
|
|
if (identical(this, other)) return true;
|
|
|
|
return other.taskId == taskId &&
|
|
other.filename == filename &&
|
|
other.progress == progress &&
|
|
other.fileSize == fileSize &&
|
|
other.networkSpeedAsString == networkSpeedAsString &&
|
|
other.isFailed == isFailed;
|
|
}
|
|
|
|
@override
|
|
int get hashCode {
|
|
return taskId.hashCode ^
|
|
filename.hashCode ^
|
|
progress.hashCode ^
|
|
fileSize.hashCode ^
|
|
networkSpeedAsString.hashCode ^
|
|
isFailed.hashCode;
|
|
}
|
|
|
|
Map<String, dynamic> toMap() {
|
|
return <String, dynamic>{
|
|
'taskId': taskId,
|
|
'filename': filename,
|
|
'progress': progress,
|
|
'fileSize': fileSize,
|
|
'networkSpeedAsString': networkSpeedAsString,
|
|
'isFailed': isFailed,
|
|
};
|
|
}
|
|
|
|
factory DriftUploadStatus.fromMap(Map<String, dynamic> map) {
|
|
return DriftUploadStatus(
|
|
taskId: map['taskId'] as String,
|
|
filename: map['filename'] as String,
|
|
progress: map['progress'] as double,
|
|
fileSize: map['fileSize'] as int,
|
|
networkSpeedAsString: map['networkSpeedAsString'] as String,
|
|
isFailed: map['isFailed'] != null ? map['isFailed'] as bool : null,
|
|
);
|
|
}
|
|
|
|
String toJson() => json.encode(toMap());
|
|
|
|
factory DriftUploadStatus.fromJson(String source) =>
|
|
DriftUploadStatus.fromMap(json.decode(source) as Map<String, dynamic>);
|
|
}
|
|
|
|
class DriftBackupState {
|
|
final int totalCount;
|
|
final int backupCount;
|
|
final int remainderCount;
|
|
|
|
final int enqueueCount;
|
|
final int enqueueTotalCount;
|
|
|
|
final bool isCanceling;
|
|
|
|
final Map<String, DriftUploadStatus> uploadItems;
|
|
|
|
const DriftBackupState({
|
|
required this.totalCount,
|
|
required this.backupCount,
|
|
required this.remainderCount,
|
|
required this.enqueueCount,
|
|
required this.enqueueTotalCount,
|
|
required this.isCanceling,
|
|
required this.uploadItems,
|
|
});
|
|
|
|
DriftBackupState copyWith({
|
|
int? totalCount,
|
|
int? backupCount,
|
|
int? remainderCount,
|
|
int? enqueueCount,
|
|
int? enqueueTotalCount,
|
|
bool? isCanceling,
|
|
Map<String, DriftUploadStatus>? uploadItems,
|
|
}) {
|
|
return DriftBackupState(
|
|
totalCount: totalCount ?? this.totalCount,
|
|
backupCount: backupCount ?? this.backupCount,
|
|
remainderCount: remainderCount ?? this.remainderCount,
|
|
enqueueCount: enqueueCount ?? this.enqueueCount,
|
|
enqueueTotalCount: enqueueTotalCount ?? this.enqueueTotalCount,
|
|
isCanceling: isCanceling ?? this.isCanceling,
|
|
uploadItems: uploadItems ?? this.uploadItems,
|
|
);
|
|
}
|
|
|
|
@override
|
|
String toString() {
|
|
return 'DriftBackupState(totalCount: $totalCount, backupCount: $backupCount, remainderCount: $remainderCount, enqueueCount: $enqueueCount, enqueueTotalCount: $enqueueTotalCount, isCanceling: $isCanceling, uploadItems: $uploadItems)';
|
|
}
|
|
|
|
@override
|
|
bool operator ==(covariant DriftBackupState other) {
|
|
if (identical(this, other)) return true;
|
|
final mapEquals = const DeepCollectionEquality().equals;
|
|
|
|
return other.totalCount == totalCount &&
|
|
other.backupCount == backupCount &&
|
|
other.remainderCount == remainderCount &&
|
|
other.enqueueCount == enqueueCount &&
|
|
other.enqueueTotalCount == enqueueTotalCount &&
|
|
other.isCanceling == isCanceling &&
|
|
mapEquals(other.uploadItems, uploadItems);
|
|
}
|
|
|
|
@override
|
|
int get hashCode {
|
|
return totalCount.hashCode ^
|
|
backupCount.hashCode ^
|
|
remainderCount.hashCode ^
|
|
enqueueCount.hashCode ^
|
|
enqueueTotalCount.hashCode ^
|
|
isCanceling.hashCode ^
|
|
uploadItems.hashCode;
|
|
}
|
|
}
|
|
|
|
final driftBackupProvider = StateNotifierProvider<ExpBackupNotifier, DriftBackupState>((ref) {
|
|
return ExpBackupNotifier(
|
|
ref.watch(uploadServiceProvider),
|
|
);
|
|
});
|
|
|
|
class ExpBackupNotifier extends StateNotifier<DriftBackupState> {
|
|
ExpBackupNotifier(
|
|
this._uploadService,
|
|
) : super(
|
|
const DriftBackupState(
|
|
totalCount: 0,
|
|
backupCount: 0,
|
|
remainderCount: 0,
|
|
enqueueCount: 0,
|
|
enqueueTotalCount: 0,
|
|
isCanceling: false,
|
|
uploadItems: {},
|
|
),
|
|
) {
|
|
{
|
|
_uploadService.taskStatusStream.listen(_handleTaskStatusUpdate);
|
|
_uploadService.taskProgressStream.listen(_handleTaskProgressUpdate);
|
|
}
|
|
}
|
|
|
|
final UploadService _uploadService;
|
|
StreamSubscription<TaskStatusUpdate>? _statusSubscription;
|
|
StreamSubscription<TaskProgressUpdate>? _progressSubscription;
|
|
|
|
/// Remove upload item from state
|
|
void _removeUploadItem(String taskId) {
|
|
if (state.uploadItems.containsKey(taskId)) {
|
|
final updatedItems = Map<String, DriftUploadStatus>.from(state.uploadItems);
|
|
updatedItems.remove(taskId);
|
|
state = state.copyWith(uploadItems: updatedItems);
|
|
}
|
|
}
|
|
|
|
void _handleTaskStatusUpdate(TaskStatusUpdate update) {
|
|
final taskId = update.task.taskId;
|
|
|
|
switch (update.status) {
|
|
case TaskStatus.complete:
|
|
if (update.task.group == kBackupGroup) {
|
|
state = state.copyWith(
|
|
backupCount: state.backupCount + 1,
|
|
remainderCount: state.remainderCount - 1,
|
|
);
|
|
}
|
|
|
|
// Remove the completed task from the upload items
|
|
if (state.uploadItems.containsKey(taskId)) {
|
|
Future.delayed(const Duration(milliseconds: 1000), () {
|
|
_removeUploadItem(taskId);
|
|
});
|
|
}
|
|
|
|
case TaskStatus.failed:
|
|
final currentItem = state.uploadItems[taskId];
|
|
if (currentItem == null) {
|
|
return;
|
|
}
|
|
|
|
state = state.copyWith(
|
|
uploadItems: {
|
|
...state.uploadItems,
|
|
taskId: currentItem.copyWith(
|
|
isFailed: true,
|
|
),
|
|
},
|
|
);
|
|
break;
|
|
|
|
case TaskStatus.canceled:
|
|
_removeUploadItem(update.task.taskId);
|
|
break;
|
|
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
|
|
void _handleTaskProgressUpdate(TaskProgressUpdate update) {
|
|
final taskId = update.task.taskId;
|
|
final filename = update.task.displayName;
|
|
final progress = update.progress;
|
|
final currentItem = state.uploadItems[taskId];
|
|
if (currentItem != null) {
|
|
if (progress == kUploadStatusCanceled) {
|
|
_removeUploadItem(update.task.taskId);
|
|
return;
|
|
}
|
|
|
|
state = state.copyWith(
|
|
uploadItems: {
|
|
...state.uploadItems,
|
|
taskId: update.hasExpectedFileSize
|
|
? currentItem.copyWith(
|
|
progress: progress,
|
|
fileSize: update.expectedFileSize,
|
|
networkSpeedAsString: update.networkSpeedAsString,
|
|
)
|
|
: currentItem.copyWith(
|
|
progress: progress,
|
|
),
|
|
},
|
|
);
|
|
|
|
return;
|
|
}
|
|
|
|
state = state.copyWith(
|
|
uploadItems: {
|
|
...state.uploadItems,
|
|
taskId: DriftUploadStatus(
|
|
taskId: taskId,
|
|
filename: filename,
|
|
progress: progress,
|
|
fileSize: update.expectedFileSize,
|
|
networkSpeedAsString: update.networkSpeedAsString,
|
|
),
|
|
},
|
|
);
|
|
}
|
|
|
|
Future<void> getBackupStatus(String userId) async {
|
|
final [totalCount, backupCount, remainderCount] = await Future.wait([
|
|
_uploadService.getBackupTotalCount(),
|
|
_uploadService.getBackupFinishedCount(userId),
|
|
_uploadService.getBackupRemainderCount(userId),
|
|
]);
|
|
|
|
state = state.copyWith(
|
|
totalCount: totalCount,
|
|
backupCount: backupCount,
|
|
remainderCount: remainderCount,
|
|
);
|
|
}
|
|
|
|
Future<void> startBackup(String userId) {
|
|
return _uploadService.startBackup(userId, _updateEnqueueCount);
|
|
}
|
|
|
|
void _updateEnqueueCount(EnqueueStatus status) {
|
|
state = state.copyWith(
|
|
enqueueCount: status.enqueueCount,
|
|
enqueueTotalCount: status.totalCount,
|
|
);
|
|
}
|
|
|
|
Future<void> cancel() async {
|
|
debugPrint("Canceling backup tasks...");
|
|
state = state.copyWith(
|
|
enqueueCount: 0,
|
|
enqueueTotalCount: 0,
|
|
isCanceling: true,
|
|
);
|
|
|
|
final activeTaskCount = await _uploadService.cancelBackup();
|
|
|
|
if (activeTaskCount > 0) {
|
|
debugPrint(
|
|
"$activeTaskCount tasks left, continuing to cancel...",
|
|
);
|
|
await cancel();
|
|
} else {
|
|
debugPrint("All tasks canceled successfully.");
|
|
// Clear all upload items when cancellation is complete
|
|
state = state.copyWith(
|
|
isCanceling: false,
|
|
uploadItems: {},
|
|
);
|
|
}
|
|
}
|
|
|
|
Future<void> handleBackupResume(String userId) async {
|
|
debugPrint("handleBackupResume");
|
|
final tasks = await _uploadService.getActiveTasks(kBackupGroup);
|
|
debugPrint("Found ${tasks.length} tasks");
|
|
|
|
if (tasks.isEmpty) {
|
|
// Start a new backup queue
|
|
debugPrint("Start a new backup queue");
|
|
await startBackup(userId);
|
|
}
|
|
|
|
debugPrint("Tasks to resume: ${tasks.length}");
|
|
await _uploadService.resumeBackup();
|
|
}
|
|
|
|
@override
|
|
void dispose() {
|
|
_statusSubscription?.cancel();
|
|
_progressSubscription?.cancel();
|
|
super.dispose();
|
|
}
|
|
}
|