mirror of
https://github.com/immich-app/immich.git
synced 2026-05-31 03:45:19 -04:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5fefd13ac8 | |||
| f1f8142409 | |||
| 206992605e |
+66
-34
@@ -542,16 +542,17 @@ private open class MessagesPigeonCodec : StandardMessageCodec() {
|
|||||||
|
|
||||||
/** Generated interface from Pigeon that represents a handler of messages from Flutter. */
|
/** Generated interface from Pigeon that represents a handler of messages from Flutter. */
|
||||||
interface NativeSyncApi {
|
interface NativeSyncApi {
|
||||||
fun shouldFullSync(): Boolean
|
fun shouldFullSync(callback: (Result<Boolean>) -> Unit)
|
||||||
fun getMediaChanges(): SyncDelta
|
fun getMediaChanges(callback: (Result<SyncDelta>) -> Unit)
|
||||||
fun checkpointSync()
|
fun checkpointSync()
|
||||||
fun clearSyncCheckpoint()
|
fun clearSyncCheckpoint()
|
||||||
fun getAssetIdsForAlbum(albumId: String): List<String>
|
fun getAssetIdsForAlbum(albumId: String, callback: (Result<List<String>>) -> Unit)
|
||||||
fun getAlbums(): List<PlatformAlbum>
|
fun getAlbums(callback: (Result<List<PlatformAlbum>>) -> Unit)
|
||||||
fun getAssetsCountSince(albumId: String, timestamp: Long): Long
|
fun getAssetsCountSince(albumId: String, timestamp: Long): Long
|
||||||
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List<PlatformAsset>
|
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?, callback: (Result<List<PlatformAsset>>) -> Unit)
|
||||||
fun hashAssets(assetIds: List<String>, allowNetworkAccess: Boolean, callback: (Result<List<HashResult>>) -> Unit)
|
fun hashAssets(assetIds: List<String>, allowNetworkAccess: Boolean, callback: (Result<List<HashResult>>) -> Unit)
|
||||||
fun cancelHashing()
|
fun cancelHashing()
|
||||||
|
fun cancelSync()
|
||||||
fun getTrashedAssets(): Map<String, List<PlatformAsset>>
|
fun getTrashedAssets(): Map<String, List<PlatformAsset>>
|
||||||
fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result<Boolean>) -> Unit)
|
fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result<Boolean>) -> Unit)
|
||||||
fun getCloudIdForAssetIds(assetIds: List<String>): List<CloudIdResult>
|
fun getCloudIdForAssetIds(assetIds: List<String>): List<CloudIdResult>
|
||||||
@@ -570,27 +571,33 @@ interface NativeSyncApi {
|
|||||||
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync$separatedMessageChannelSuffix", codec)
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync$separatedMessageChannelSuffix", codec)
|
||||||
if (api != null) {
|
if (api != null) {
|
||||||
channel.setMessageHandler { _, reply ->
|
channel.setMessageHandler { _, reply ->
|
||||||
val wrapped: List<Any?> = try {
|
api.shouldFullSync{ result: Result<Boolean> ->
|
||||||
listOf(api.shouldFullSync())
|
val error = result.exceptionOrNull()
|
||||||
} catch (exception: Throwable) {
|
if (error != null) {
|
||||||
MessagesPigeonUtils.wrapError(exception)
|
reply.reply(MessagesPigeonUtils.wrapError(error))
|
||||||
|
} else {
|
||||||
|
val data = result.getOrNull()
|
||||||
|
reply.reply(MessagesPigeonUtils.wrapResult(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
reply.reply(wrapped)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
channel.setMessageHandler(null)
|
channel.setMessageHandler(null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
run {
|
run {
|
||||||
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges$separatedMessageChannelSuffix", codec, taskQueue)
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges$separatedMessageChannelSuffix", codec)
|
||||||
if (api != null) {
|
if (api != null) {
|
||||||
channel.setMessageHandler { _, reply ->
|
channel.setMessageHandler { _, reply ->
|
||||||
val wrapped: List<Any?> = try {
|
api.getMediaChanges{ result: Result<SyncDelta> ->
|
||||||
listOf(api.getMediaChanges())
|
val error = result.exceptionOrNull()
|
||||||
} catch (exception: Throwable) {
|
if (error != null) {
|
||||||
MessagesPigeonUtils.wrapError(exception)
|
reply.reply(MessagesPigeonUtils.wrapError(error))
|
||||||
|
} else {
|
||||||
|
val data = result.getOrNull()
|
||||||
|
reply.reply(MessagesPigeonUtils.wrapResult(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
reply.reply(wrapped)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
channel.setMessageHandler(null)
|
channel.setMessageHandler(null)
|
||||||
@@ -629,32 +636,38 @@ interface NativeSyncApi {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
run {
|
run {
|
||||||
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum$separatedMessageChannelSuffix", codec, taskQueue)
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum$separatedMessageChannelSuffix", codec)
|
||||||
if (api != null) {
|
if (api != null) {
|
||||||
channel.setMessageHandler { message, reply ->
|
channel.setMessageHandler { message, reply ->
|
||||||
val args = message as List<Any?>
|
val args = message as List<Any?>
|
||||||
val albumIdArg = args[0] as String
|
val albumIdArg = args[0] as String
|
||||||
val wrapped: List<Any?> = try {
|
api.getAssetIdsForAlbum(albumIdArg) { result: Result<List<String>> ->
|
||||||
listOf(api.getAssetIdsForAlbum(albumIdArg))
|
val error = result.exceptionOrNull()
|
||||||
} catch (exception: Throwable) {
|
if (error != null) {
|
||||||
MessagesPigeonUtils.wrapError(exception)
|
reply.reply(MessagesPigeonUtils.wrapError(error))
|
||||||
|
} else {
|
||||||
|
val data = result.getOrNull()
|
||||||
|
reply.reply(MessagesPigeonUtils.wrapResult(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
reply.reply(wrapped)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
channel.setMessageHandler(null)
|
channel.setMessageHandler(null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
run {
|
run {
|
||||||
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums$separatedMessageChannelSuffix", codec, taskQueue)
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums$separatedMessageChannelSuffix", codec)
|
||||||
if (api != null) {
|
if (api != null) {
|
||||||
channel.setMessageHandler { _, reply ->
|
channel.setMessageHandler { _, reply ->
|
||||||
val wrapped: List<Any?> = try {
|
api.getAlbums{ result: Result<List<PlatformAlbum>> ->
|
||||||
listOf(api.getAlbums())
|
val error = result.exceptionOrNull()
|
||||||
} catch (exception: Throwable) {
|
if (error != null) {
|
||||||
MessagesPigeonUtils.wrapError(exception)
|
reply.reply(MessagesPigeonUtils.wrapError(error))
|
||||||
|
} else {
|
||||||
|
val data = result.getOrNull()
|
||||||
|
reply.reply(MessagesPigeonUtils.wrapResult(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
reply.reply(wrapped)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
channel.setMessageHandler(null)
|
channel.setMessageHandler(null)
|
||||||
@@ -679,18 +692,21 @@ interface NativeSyncApi {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
run {
|
run {
|
||||||
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum$separatedMessageChannelSuffix", codec, taskQueue)
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum$separatedMessageChannelSuffix", codec)
|
||||||
if (api != null) {
|
if (api != null) {
|
||||||
channel.setMessageHandler { message, reply ->
|
channel.setMessageHandler { message, reply ->
|
||||||
val args = message as List<Any?>
|
val args = message as List<Any?>
|
||||||
val albumIdArg = args[0] as String
|
val albumIdArg = args[0] as String
|
||||||
val updatedTimeCondArg = args[1] as Long?
|
val updatedTimeCondArg = args[1] as Long?
|
||||||
val wrapped: List<Any?> = try {
|
api.getAssetsForAlbum(albumIdArg, updatedTimeCondArg) { result: Result<List<PlatformAsset>> ->
|
||||||
listOf(api.getAssetsForAlbum(albumIdArg, updatedTimeCondArg))
|
val error = result.exceptionOrNull()
|
||||||
} catch (exception: Throwable) {
|
if (error != null) {
|
||||||
MessagesPigeonUtils.wrapError(exception)
|
reply.reply(MessagesPigeonUtils.wrapError(error))
|
||||||
|
} else {
|
||||||
|
val data = result.getOrNull()
|
||||||
|
reply.reply(MessagesPigeonUtils.wrapResult(data))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
reply.reply(wrapped)
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
channel.setMessageHandler(null)
|
channel.setMessageHandler(null)
|
||||||
@@ -733,6 +749,22 @@ interface NativeSyncApi {
|
|||||||
channel.setMessageHandler(null)
|
channel.setMessageHandler(null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
run {
|
||||||
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync$separatedMessageChannelSuffix", codec)
|
||||||
|
if (api != null) {
|
||||||
|
channel.setMessageHandler { _, reply ->
|
||||||
|
val wrapped: List<Any?> = try {
|
||||||
|
api.cancelSync()
|
||||||
|
listOf(null)
|
||||||
|
} catch (exception: Throwable) {
|
||||||
|
MessagesPigeonUtils.wrapError(exception)
|
||||||
|
}
|
||||||
|
reply.reply(wrapped)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
channel.setMessageHandler(null)
|
||||||
|
}
|
||||||
|
}
|
||||||
run {
|
run {
|
||||||
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$separatedMessageChannelSuffix", codec, taskQueue)
|
val channel = BasicMessageChannel<Any?>(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$separatedMessageChannelSuffix", codec, taskQueue)
|
||||||
if (api != null) {
|
if (api != null) {
|
||||||
|
|||||||
@@ -4,7 +4,11 @@ import android.content.Context
|
|||||||
|
|
||||||
|
|
||||||
class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), NativeSyncApi {
|
class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), NativeSyncApi {
|
||||||
override fun shouldFullSync(): Boolean {
|
override fun shouldFullSync(callback: (Result<Boolean>) -> Unit) {
|
||||||
|
runSync(callback) { shouldFullSync() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun shouldFullSync(): Boolean {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -18,7 +22,11 @@ class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), Na
|
|||||||
// No-op for Android 10 and below
|
// No-op for Android 10 and below
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getMediaChanges(): SyncDelta {
|
override fun getMediaChanges(callback: (Result<SyncDelta>) -> Unit) {
|
||||||
|
runSync(callback) { getMediaChanges() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getMediaChanges(): SyncDelta {
|
||||||
throw IllegalStateException("Method not supported on this Android version.")
|
throw IllegalStateException("Method not supported on this Android version.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import android.os.Bundle
|
|||||||
import android.provider.MediaStore
|
import android.provider.MediaStore
|
||||||
import androidx.annotation.RequiresApi
|
import androidx.annotation.RequiresApi
|
||||||
import androidx.annotation.RequiresExtension
|
import androidx.annotation.RequiresExtension
|
||||||
|
import kotlinx.coroutines.currentCoroutineContext
|
||||||
|
import kotlinx.coroutines.ensureActive
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
|
|
||||||
@RequiresApi(Build.VERSION_CODES.Q)
|
@RequiresApi(Build.VERSION_CODES.Q)
|
||||||
@@ -35,7 +37,11 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun shouldFullSync(): Boolean =
|
override fun shouldFullSync(callback: (Result<Boolean>) -> Unit) {
|
||||||
|
runSync(callback) { shouldFullSync() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun shouldFullSync(): Boolean =
|
||||||
MediaStore.getVersion(ctx) != prefs.getString(SHARED_PREF_MEDIA_STORE_VERSION_KEY, null)
|
MediaStore.getVersion(ctx) != prefs.getString(SHARED_PREF_MEDIA_STORE_VERSION_KEY, null)
|
||||||
|
|
||||||
override fun checkpointSync() {
|
override fun checkpointSync() {
|
||||||
@@ -49,7 +55,11 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getMediaChanges(): SyncDelta {
|
override fun getMediaChanges(callback: (Result<SyncDelta>) -> Unit) {
|
||||||
|
runSync(callback) { getMediaChanges() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun getMediaChanges(): SyncDelta {
|
||||||
val genMap = getSavedGenerationMap()
|
val genMap = getSavedGenerationMap()
|
||||||
val currentVolumes = MediaStore.getExternalVolumeNames(ctx)
|
val currentVolumes = MediaStore.getExternalVolumeNames(ctx)
|
||||||
val changed = mutableListOf<PlatformAsset>()
|
val changed = mutableListOf<PlatformAsset>()
|
||||||
@@ -58,6 +68,7 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na
|
|||||||
var hasChanges = genMap.keys != currentVolumes
|
var hasChanges = genMap.keys != currentVolumes
|
||||||
|
|
||||||
for (volume in currentVolumes) {
|
for (volume in currentVolumes) {
|
||||||
|
currentCoroutineContext().ensureActive()
|
||||||
val currentGen = MediaStore.getGeneration(ctx, volume)
|
val currentGen = MediaStore.getGeneration(ctx, volume)
|
||||||
val storedGen = genMap[volume] ?: 0
|
val storedGen = genMap[volume] ?: 0
|
||||||
if (currentGen <= storedGen) {
|
if (currentGen <= storedGen) {
|
||||||
|
|||||||
@@ -45,12 +45,14 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
|
|||||||
private val ctx: Context = context.applicationContext
|
private val ctx: Context = context.applicationContext
|
||||||
|
|
||||||
private var hashTask: Job? = null
|
private var hashTask: Job? = null
|
||||||
|
private var syncJob: Job? = null
|
||||||
private val mediaTrashDelegate = MediaTrashDelegate(ctx)
|
private val mediaTrashDelegate = MediaTrashDelegate(ctx)
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
private const val MAX_CONCURRENT_HASH_OPERATIONS = 16
|
private const val MAX_CONCURRENT_HASH_OPERATIONS = 16
|
||||||
private val hashSemaphore = Semaphore(MAX_CONCURRENT_HASH_OPERATIONS)
|
private val hashSemaphore = Semaphore(MAX_CONCURRENT_HASH_OPERATIONS)
|
||||||
private const val HASHING_CANCELLED_CODE = "HASH_CANCELLED"
|
private const val HASHING_CANCELLED_CODE = "HASH_CANCELLED"
|
||||||
|
private const val SYNC_CANCELLED_CODE = "SYNC_CANCELLED"
|
||||||
|
|
||||||
// MediaStore.Files.FileColumns.SPECIAL_FORMAT — S Extensions 21+
|
// MediaStore.Files.FileColumns.SPECIAL_FORMAT — S Extensions 21+
|
||||||
// https://developer.android.com/reference/android/provider/MediaStore.Files.FileColumns#SPECIAL_FORMAT
|
// https://developer.android.com/reference/android/provider/MediaStore.Files.FileColumns#SPECIAL_FORMAT
|
||||||
@@ -295,7 +297,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
|
|||||||
return PlatformAssetPlaybackStyle.IMAGE
|
return PlatformAssetPlaybackStyle.IMAGE
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getAlbums(): List<PlatformAlbum> {
|
fun getAlbums(callback: (Result<List<PlatformAlbum>>) -> Unit) {
|
||||||
|
runSync(callback) { getAlbums() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private suspend fun getAlbums(): List<PlatformAlbum> {
|
||||||
val albums = mutableListOf<PlatformAlbum>()
|
val albums = mutableListOf<PlatformAlbum>()
|
||||||
val albumsCount = mutableMapOf<String, Int>()
|
val albumsCount = mutableMapOf<String, Int>()
|
||||||
|
|
||||||
@@ -322,6 +328,7 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
|
|||||||
cursor.getColumnIndexOrThrow(MediaStore.Files.FileColumns.DATE_MODIFIED)
|
cursor.getColumnIndexOrThrow(MediaStore.Files.FileColumns.DATE_MODIFIED)
|
||||||
|
|
||||||
while (cursor.moveToNext()) {
|
while (cursor.moveToNext()) {
|
||||||
|
currentCoroutineContext().ensureActive()
|
||||||
val id = cursor.getString(bucketIdColumn)
|
val id = cursor.getString(bucketIdColumn)
|
||||||
|
|
||||||
val count = albumsCount.getOrDefault(id, 0)
|
val count = albumsCount.getOrDefault(id, 0)
|
||||||
@@ -342,7 +349,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
|
|||||||
.sortedBy { it.id }
|
.sortedBy { it.id }
|
||||||
}
|
}
|
||||||
|
|
||||||
fun getAssetIdsForAlbum(albumId: String): List<String> {
|
fun getAssetIdsForAlbum(albumId: String, callback: (Result<List<String>>) -> Unit) {
|
||||||
|
runSync(callback) { getAssetIdsForAlbum(albumId) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getAssetIdsForAlbum(albumId: String): List<String> {
|
||||||
val projection = arrayOf(MediaStore.MediaColumns._ID)
|
val projection = arrayOf(MediaStore.MediaColumns._ID)
|
||||||
|
|
||||||
return getCursor(
|
return getCursor(
|
||||||
@@ -366,7 +377,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
|
|||||||
)?.use { cursor -> cursor.count.toLong() } ?: 0L
|
)?.use { cursor -> cursor.count.toLong() } ?: 0L
|
||||||
|
|
||||||
|
|
||||||
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List<PlatformAsset> {
|
fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?, callback: (Result<List<PlatformAsset>>) -> Unit) {
|
||||||
|
runSync(callback) { getAssetsForAlbum(albumId, updatedTimeCond) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List<PlatformAsset> {
|
||||||
var selection = "$BUCKET_SELECTION AND $MEDIA_SELECTION"
|
var selection = "$BUCKET_SELECTION AND $MEDIA_SELECTION"
|
||||||
val selectionArgs = mutableListOf(albumId, *MEDIA_SELECTION_ARGS)
|
val selectionArgs = mutableListOf(albumId, *MEDIA_SELECTION_ARGS)
|
||||||
|
|
||||||
@@ -451,6 +466,24 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa
|
|||||||
hashTask = null
|
hashTask = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun cancelSync() {
|
||||||
|
syncJob?.cancel()
|
||||||
|
syncJob = null
|
||||||
|
}
|
||||||
|
|
||||||
|
protected fun <T> runSync(callback: (Result<T>) -> Unit, work: suspend () -> T) {
|
||||||
|
syncJob?.cancel()
|
||||||
|
syncJob = CoroutineScope(Dispatchers.IO).launch {
|
||||||
|
try {
|
||||||
|
completeWhenActive(callback, Result.success(work()))
|
||||||
|
} catch (e: CancellationException) {
|
||||||
|
completeWhenActive(callback, Result.failure(FlutterError(SYNC_CANCELLED_CODE, "Sync cancelled", null)))
|
||||||
|
} catch (e: Exception) {
|
||||||
|
completeWhenActive(callback, Result.failure(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result<Boolean>) -> Unit) {
|
fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result<Boolean>) -> Unit) {
|
||||||
mediaTrashDelegate.restoreFromTrashById(mediaId, type) { completeWhenActive(callback, it) }
|
mediaTrashDelegate.restoreFromTrashById(mediaId, type) { completeWhenActive(callback, it) }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,154 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:drift/drift.dart' show Value;
|
||||||
|
import 'package:flutter_test/flutter_test.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/store.model.dart';
|
||||||
|
import 'package:immich_mobile/domain/utils/background_sync.dart';
|
||||||
|
import 'package:immich_mobile/entities/store.entity.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||||
|
import 'package:immich_mobile/main.dart' as app;
|
||||||
|
import 'package:immich_mobile/services/api.service.dart';
|
||||||
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
|
import 'package:immich_mobile/wm_executor.dart';
|
||||||
|
import 'package:integration_test/integration_test.dart';
|
||||||
|
import 'package:openapi/api.dart';
|
||||||
|
|
||||||
|
import 'test_utils/fake_immich_server.dart';
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
final binding = IntegrationTestWidgetsFlutterBinding.ensureInitialized();
|
||||||
|
// These tests do real I/O without pumping a widget tree, so disable the fake async clock
|
||||||
|
binding.framePolicy = LiveTestWidgetsFlutterBindingFramePolicy.fullyLive;
|
||||||
|
|
||||||
|
late Drift drift;
|
||||||
|
late FakeImmichServer server;
|
||||||
|
|
||||||
|
setUpAll(() async {
|
||||||
|
await app.initApp();
|
||||||
|
(drift, _) = await Bootstrap.initDomain();
|
||||||
|
});
|
||||||
|
|
||||||
|
setUp(() async {
|
||||||
|
await workerManagerPatch.init(dynamicSpawning: true);
|
||||||
|
server = await FakeImmichServer.start();
|
||||||
|
await ApiService().resolveAndSetEndpoint(server.endpoint);
|
||||||
|
await drift.delete(drift.userEntity).go();
|
||||||
|
await Store.delete(StoreKey.syncMigrationStatus);
|
||||||
|
});
|
||||||
|
|
||||||
|
tearDown(() async {
|
||||||
|
await workerManagerPatch.dispose();
|
||||||
|
await server.close();
|
||||||
|
await Store.delete(StoreKey.serverEndpoint);
|
||||||
|
await Store.delete(StoreKey.syncMigrationStatus);
|
||||||
|
});
|
||||||
|
|
||||||
|
void sendUser(SyncStream stream, String id, String name) {
|
||||||
|
stream.send(
|
||||||
|
type: SyncEntityType.userV1.value,
|
||||||
|
data: SyncUserV1(
|
||||||
|
id: id,
|
||||||
|
name: name,
|
||||||
|
email: '$id@test.com',
|
||||||
|
hasProfileImage: false,
|
||||||
|
deletedAt: null,
|
||||||
|
profileChangedAt: DateTime.utc(2025),
|
||||||
|
).toJson(),
|
||||||
|
ack: id,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<bool> dbReadable() async {
|
||||||
|
try {
|
||||||
|
await drift.customSelect('SELECT 1').get().timeout(const Duration(seconds: 5));
|
||||||
|
return true;
|
||||||
|
} catch (_) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<int> userCount() async => (await drift.select(drift.userEntity).get()).length;
|
||||||
|
|
||||||
|
// Starts a remote sync and resolves once its /sync/stream request is open.
|
||||||
|
Future<(Future<bool>, SyncStream)> startSync() async {
|
||||||
|
final sync = BackgroundSyncManager().syncRemote();
|
||||||
|
final stream = await server.streamOpened.timeout(
|
||||||
|
const Duration(seconds: 30),
|
||||||
|
onTimeout: () => fail('sync isolate never opened /sync/stream'),
|
||||||
|
);
|
||||||
|
return (sync, stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
testWidgets('a full sync ingests streamed events into the shared DB', (tester) async {
|
||||||
|
expect(await userCount(), 0);
|
||||||
|
|
||||||
|
final (sync, stream) = await startSync();
|
||||||
|
|
||||||
|
sendUser(stream, 'u1', 'Alice');
|
||||||
|
sendUser(stream, 'u2', 'Bob');
|
||||||
|
await stream.close();
|
||||||
|
|
||||||
|
final result = await sync.timeout(
|
||||||
|
const Duration(seconds: 30),
|
||||||
|
onTimeout: () => fail('sync did not complete after the stream ended'),
|
||||||
|
);
|
||||||
|
expect(result, isTrue);
|
||||||
|
expect(await userCount(), 2);
|
||||||
|
expect(server.ackRequests, greaterThan(0));
|
||||||
|
});
|
||||||
|
|
||||||
|
testWidgets('disposing the pool during an in-flight sync drains promptly', (tester) async {
|
||||||
|
final (sync, _) = await startSync();
|
||||||
|
|
||||||
|
final sw = Stopwatch()..start();
|
||||||
|
await workerManagerPatch.dispose().timeout(
|
||||||
|
const Duration(seconds: 15),
|
||||||
|
onTimeout: () => fail('dispose() hung — worker did not drain and exit'),
|
||||||
|
);
|
||||||
|
expect(sw.elapsed, lessThan(const Duration(seconds: 10)), reason: 'abort-driven, not socket-timeout bound');
|
||||||
|
|
||||||
|
expect(await sync.timeout(const Duration(seconds: 5), onTimeout: () => false), isFalse);
|
||||||
|
});
|
||||||
|
|
||||||
|
testWidgets('tearing down a worker blocked mid-write leaves the DB usable', (tester) async {
|
||||||
|
final (sync, stream) = await startSync();
|
||||||
|
|
||||||
|
// Hold an exclusive write transaction so the worker's write is blocked. The lock is taken only
|
||||||
|
// after the stream opens to avoid blocking the worker's own startup DB reads.
|
||||||
|
final releaseTxn = Completer<void>();
|
||||||
|
final txnHeld = Completer<void>();
|
||||||
|
final txn = drift.transaction(() async {
|
||||||
|
await drift.into(drift.userEntity).insert(
|
||||||
|
UserEntityCompanion.insert(
|
||||||
|
id: 'holder',
|
||||||
|
name: 'holder',
|
||||||
|
email: 'holder@test.com',
|
||||||
|
hasProfileImage: const Value(false),
|
||||||
|
profileChangedAt: Value(DateTime.utc(2025)),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
txnHeld.complete();
|
||||||
|
await releaseTxn.future;
|
||||||
|
});
|
||||||
|
await txnHeld.future;
|
||||||
|
|
||||||
|
sendUser(stream, 'u1', 'Alice');
|
||||||
|
await stream.close();
|
||||||
|
|
||||||
|
// dispose() can only finish once the worker unwinds, which is blocked on the
|
||||||
|
// lock — so start it, release the lock, then await completion.
|
||||||
|
final disposed = workerManagerPatch.dispose();
|
||||||
|
releaseTxn.complete();
|
||||||
|
await txn;
|
||||||
|
await disposed.timeout(
|
||||||
|
const Duration(seconds: 15),
|
||||||
|
onTimeout: () => fail('dispose() hung after releasing the write lock'),
|
||||||
|
);
|
||||||
|
await sync.timeout(const Duration(seconds: 5), onTimeout: () => false);
|
||||||
|
|
||||||
|
expect(await dbReadable(), isTrue);
|
||||||
|
final users = await drift.select(drift.userEntity).get();
|
||||||
|
expect(users.map((u) => u.id), contains('holder'));
|
||||||
|
});
|
||||||
|
}
|
||||||
@@ -0,0 +1,115 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:convert';
|
||||||
|
import 'dart:io';
|
||||||
|
|
||||||
|
/// A dummy localhost server that implements only the endpoints that remote-sync touches.
|
||||||
|
class FakeImmichServer {
|
||||||
|
FakeImmichServer._(this._server, this.version);
|
||||||
|
|
||||||
|
final HttpServer _server;
|
||||||
|
final (int, int, int) version;
|
||||||
|
|
||||||
|
final Completer<SyncStream> _streamOpened = Completer<SyncStream>();
|
||||||
|
|
||||||
|
int ackRequests = 0;
|
||||||
|
|
||||||
|
String get endpoint => 'http://${_server.address.host}:${_server.port}/api';
|
||||||
|
|
||||||
|
/// Resolves when the sync isolate opens `POST /sync/stream`.
|
||||||
|
Future<SyncStream> get streamOpened => _streamOpened.future;
|
||||||
|
|
||||||
|
static Future<FakeImmichServer> start({(int, int, int) version = (3, 0, 0)}) async {
|
||||||
|
final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0);
|
||||||
|
final fake = FakeImmichServer._(server, version);
|
||||||
|
fake._listen();
|
||||||
|
return fake;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _listen() {
|
||||||
|
// A connection torn down mid-write during teardown is expected
|
||||||
|
_server.listen((request) => unawaited(_route(request).catchError((_) {})));
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _route(HttpRequest request) async {
|
||||||
|
final method = request.method;
|
||||||
|
final path = request.uri.path;
|
||||||
|
|
||||||
|
if (method == 'GET' && path == '/api/server/ping') {
|
||||||
|
return _respondJson(request, {'res': 'pong'});
|
||||||
|
}
|
||||||
|
if (method == 'GET' && path == '/api/server/version') {
|
||||||
|
final (major, minor, patch) = version;
|
||||||
|
return _respondJson(request, {'major': major, 'minor': minor, 'patch': patch});
|
||||||
|
}
|
||||||
|
if (path == '/api/sync/ack') {
|
||||||
|
if (method != 'DELETE') {
|
||||||
|
ackRequests++;
|
||||||
|
}
|
||||||
|
return _respondEmpty(request);
|
||||||
|
}
|
||||||
|
if (method == 'POST' && path == '/api/sync/stream') {
|
||||||
|
return _openSyncStream(request);
|
||||||
|
}
|
||||||
|
return _respondEmpty(request, status: HttpStatus.notFound);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _openSyncStream(HttpRequest request) async {
|
||||||
|
await request.drain<void>();
|
||||||
|
request.response
|
||||||
|
..statusCode = HttpStatus.ok
|
||||||
|
..headers.contentType = ContentType('application', 'jsonlines+json')
|
||||||
|
..contentLength = -1 // chunked: stays open to stream incrementally
|
||||||
|
..bufferOutput = false;
|
||||||
|
// Flush headers so the client's send() resolves and enters its read loop.
|
||||||
|
await request.response.flush();
|
||||||
|
if (!_streamOpened.isCompleted) {
|
||||||
|
_streamOpened.complete(SyncStream._(request.response));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _respondJson(HttpRequest request, Object body) async {
|
||||||
|
await request.drain<void>();
|
||||||
|
request.response
|
||||||
|
..statusCode = HttpStatus.ok
|
||||||
|
..headers.contentType = ContentType.json
|
||||||
|
..write(jsonEncode(body));
|
||||||
|
await request.response.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _respondEmpty(HttpRequest request, {int status = HttpStatus.ok}) async {
|
||||||
|
await request.drain<void>();
|
||||||
|
request.response.statusCode = status;
|
||||||
|
await request.response.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> close() async {
|
||||||
|
if (_streamOpened.isCompleted) {
|
||||||
|
await (await _streamOpened.future).close();
|
||||||
|
}
|
||||||
|
await _server.close(force: true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle to the open `/sync/stream` response: push jsonlines events, then end.
|
||||||
|
class SyncStream {
|
||||||
|
SyncStream._(this._response);
|
||||||
|
|
||||||
|
final HttpResponse _response;
|
||||||
|
bool _closed = false;
|
||||||
|
|
||||||
|
/// [data] should be a Sync*V1 DTO's `toJson()` so the parser's `fromJson` round-trips it.
|
||||||
|
void send({required String type, required Object data, required String ack}) {
|
||||||
|
if (_closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_response.write('${jsonEncode({'type': type, 'data': data, 'ack': ack})}\n');
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> close() async {
|
||||||
|
if (_closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_closed = true;
|
||||||
|
await _response.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -121,8 +121,8 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Cancels the currently running background task, either due to timeout or external request.
|
* Cancels the currently running background task, either due to timeout or external request.
|
||||||
* Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure
|
* Only tears down the engine after Dart confirms it's drained. If Dart overruns iOS's grace window,
|
||||||
* the completion handler is eventually called even if Flutter doesn't respond.
|
* the expiration handler still calls setTaskCompleted and iOS suspends us.
|
||||||
*/
|
*/
|
||||||
func close() {
|
func close() {
|
||||||
if isComplete {
|
if isComplete {
|
||||||
@@ -132,12 +132,6 @@ class BackgroundWorker: BackgroundWorkerBgHostApi {
|
|||||||
flutterApi?.cancel { result in
|
flutterApi?.cancel { result in
|
||||||
self.complete(success: false)
|
self.complete(success: false)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fallback safety mechanism: ensure completion is called within 2 seconds
|
|
||||||
// This prevents the background task from hanging indefinitely if Flutter doesn't respond
|
|
||||||
Timer.scheduledTimer(withTimeInterval: 2, repeats: false) { _ in
|
|
||||||
self.complete(success: false)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Generated
+58
-42
@@ -526,16 +526,17 @@ class MessagesPigeonCodec: FlutterStandardMessageCodec, @unchecked Sendable {
|
|||||||
|
|
||||||
/// Generated protocol from Pigeon that represents a handler of messages from Flutter.
|
/// Generated protocol from Pigeon that represents a handler of messages from Flutter.
|
||||||
protocol NativeSyncApi {
|
protocol NativeSyncApi {
|
||||||
func shouldFullSync() throws -> Bool
|
func shouldFullSync(completion: @escaping (Result<Bool, Error>) -> Void)
|
||||||
func getMediaChanges() throws -> SyncDelta
|
func getMediaChanges(completion: @escaping (Result<SyncDelta, Error>) -> Void)
|
||||||
func checkpointSync() throws
|
func checkpointSync() throws
|
||||||
func clearSyncCheckpoint() throws
|
func clearSyncCheckpoint() throws
|
||||||
func getAssetIdsForAlbum(albumId: String) throws -> [String]
|
func getAssetIdsForAlbum(albumId: String, completion: @escaping (Result<[String], Error>) -> Void)
|
||||||
func getAlbums() throws -> [PlatformAlbum]
|
func getAlbums(completion: @escaping (Result<[PlatformAlbum], Error>) -> Void)
|
||||||
func getAssetsCountSince(albumId: String, timestamp: Int64) throws -> Int64
|
func getAssetsCountSince(albumId: String, timestamp: Int64) throws -> Int64
|
||||||
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset]
|
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?, completion: @escaping (Result<[PlatformAsset], Error>) -> Void)
|
||||||
func hashAssets(assetIds: [String], allowNetworkAccess: Bool, completion: @escaping (Result<[HashResult], Error>) -> Void)
|
func hashAssets(assetIds: [String], allowNetworkAccess: Bool, completion: @escaping (Result<[HashResult], Error>) -> Void)
|
||||||
func cancelHashing() throws
|
func cancelHashing() throws
|
||||||
|
func cancelSync() throws
|
||||||
func getTrashedAssets() throws -> [String: [PlatformAsset]]
|
func getTrashedAssets() throws -> [String: [PlatformAsset]]
|
||||||
func restoreFromTrashById(mediaId: String, type: Int64, completion: @escaping (Result<Bool, Error>) -> Void)
|
func restoreFromTrashById(mediaId: String, type: Int64, completion: @escaping (Result<Bool, Error>) -> Void)
|
||||||
func getCloudIdForAssetIds(assetIds: [String]) throws -> [CloudIdResult]
|
func getCloudIdForAssetIds(assetIds: [String]) throws -> [CloudIdResult]
|
||||||
@@ -555,26 +556,28 @@ class NativeSyncApiSetup {
|
|||||||
let shouldFullSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
let shouldFullSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
if let api = api {
|
if let api = api {
|
||||||
shouldFullSyncChannel.setMessageHandler { _, reply in
|
shouldFullSyncChannel.setMessageHandler { _, reply in
|
||||||
do {
|
api.shouldFullSync { result in
|
||||||
let result = try api.shouldFullSync()
|
switch result {
|
||||||
reply(wrapResult(result))
|
case .success(let res):
|
||||||
} catch {
|
reply(wrapResult(res))
|
||||||
reply(wrapError(error))
|
case .failure(let error):
|
||||||
|
reply(wrapError(error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
shouldFullSyncChannel.setMessageHandler(nil)
|
shouldFullSyncChannel.setMessageHandler(nil)
|
||||||
}
|
}
|
||||||
let getMediaChangesChannel = taskQueue == nil
|
let getMediaChangesChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
|
||||||
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
|
|
||||||
if let api = api {
|
if let api = api {
|
||||||
getMediaChangesChannel.setMessageHandler { _, reply in
|
getMediaChangesChannel.setMessageHandler { _, reply in
|
||||||
do {
|
api.getMediaChanges { result in
|
||||||
let result = try api.getMediaChanges()
|
switch result {
|
||||||
reply(wrapResult(result))
|
case .success(let res):
|
||||||
} catch {
|
reply(wrapResult(res))
|
||||||
reply(wrapError(error))
|
case .failure(let error):
|
||||||
|
reply(wrapError(error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -606,33 +609,33 @@ class NativeSyncApiSetup {
|
|||||||
} else {
|
} else {
|
||||||
clearSyncCheckpointChannel.setMessageHandler(nil)
|
clearSyncCheckpointChannel.setMessageHandler(nil)
|
||||||
}
|
}
|
||||||
let getAssetIdsForAlbumChannel = taskQueue == nil
|
let getAssetIdsForAlbumChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
|
||||||
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
|
|
||||||
if let api = api {
|
if let api = api {
|
||||||
getAssetIdsForAlbumChannel.setMessageHandler { message, reply in
|
getAssetIdsForAlbumChannel.setMessageHandler { message, reply in
|
||||||
let args = message as! [Any?]
|
let args = message as! [Any?]
|
||||||
let albumIdArg = args[0] as! String
|
let albumIdArg = args[0] as! String
|
||||||
do {
|
api.getAssetIdsForAlbum(albumId: albumIdArg) { result in
|
||||||
let result = try api.getAssetIdsForAlbum(albumId: albumIdArg)
|
switch result {
|
||||||
reply(wrapResult(result))
|
case .success(let res):
|
||||||
} catch {
|
reply(wrapResult(res))
|
||||||
reply(wrapError(error))
|
case .failure(let error):
|
||||||
|
reply(wrapError(error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
getAssetIdsForAlbumChannel.setMessageHandler(nil)
|
getAssetIdsForAlbumChannel.setMessageHandler(nil)
|
||||||
}
|
}
|
||||||
let getAlbumsChannel = taskQueue == nil
|
let getAlbumsChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
|
||||||
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
|
|
||||||
if let api = api {
|
if let api = api {
|
||||||
getAlbumsChannel.setMessageHandler { _, reply in
|
getAlbumsChannel.setMessageHandler { _, reply in
|
||||||
do {
|
api.getAlbums { result in
|
||||||
let result = try api.getAlbums()
|
switch result {
|
||||||
reply(wrapResult(result))
|
case .success(let res):
|
||||||
} catch {
|
reply(wrapResult(res))
|
||||||
reply(wrapError(error))
|
case .failure(let error):
|
||||||
|
reply(wrapError(error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -656,19 +659,19 @@ class NativeSyncApiSetup {
|
|||||||
} else {
|
} else {
|
||||||
getAssetsCountSinceChannel.setMessageHandler(nil)
|
getAssetsCountSinceChannel.setMessageHandler(nil)
|
||||||
}
|
}
|
||||||
let getAssetsForAlbumChannel = taskQueue == nil
|
let getAssetsForAlbumChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
|
||||||
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
|
|
||||||
if let api = api {
|
if let api = api {
|
||||||
getAssetsForAlbumChannel.setMessageHandler { message, reply in
|
getAssetsForAlbumChannel.setMessageHandler { message, reply in
|
||||||
let args = message as! [Any?]
|
let args = message as! [Any?]
|
||||||
let albumIdArg = args[0] as! String
|
let albumIdArg = args[0] as! String
|
||||||
let updatedTimeCondArg: Int64? = nilOrValue(args[1])
|
let updatedTimeCondArg: Int64? = nilOrValue(args[1])
|
||||||
do {
|
api.getAssetsForAlbum(albumId: albumIdArg, updatedTimeCond: updatedTimeCondArg) { result in
|
||||||
let result = try api.getAssetsForAlbum(albumId: albumIdArg, updatedTimeCond: updatedTimeCondArg)
|
switch result {
|
||||||
reply(wrapResult(result))
|
case .success(let res):
|
||||||
} catch {
|
reply(wrapResult(res))
|
||||||
reply(wrapError(error))
|
case .failure(let error):
|
||||||
|
reply(wrapError(error))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -707,6 +710,19 @@ class NativeSyncApiSetup {
|
|||||||
} else {
|
} else {
|
||||||
cancelHashingChannel.setMessageHandler(nil)
|
cancelHashingChannel.setMessageHandler(nil)
|
||||||
}
|
}
|
||||||
|
let cancelSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
|
if let api = api {
|
||||||
|
cancelSyncChannel.setMessageHandler { _, reply in
|
||||||
|
do {
|
||||||
|
try api.cancelSync()
|
||||||
|
reply(wrapResult(nil))
|
||||||
|
} catch {
|
||||||
|
reply(wrapError(error))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cancelSyncChannel.setMessageHandler(nil)
|
||||||
|
}
|
||||||
let getTrashedAssetsChannel = taskQueue == nil
|
let getTrashedAssetsChannel = taskQueue == nil
|
||||||
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec)
|
||||||
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
|
: FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue)
|
||||||
|
|||||||
@@ -39,6 +39,9 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
private static let hashCancelledCode = "HASH_CANCELLED"
|
private static let hashCancelledCode = "HASH_CANCELLED"
|
||||||
private static let hashCancelled = Result<[HashResult], Error>.failure(PigeonError(code: hashCancelledCode, message: "Hashing cancelled", details: nil))
|
private static let hashCancelled = Result<[HashResult], Error>.failure(PigeonError(code: hashCancelledCode, message: "Hashing cancelled", details: nil))
|
||||||
|
|
||||||
|
private var syncTask: Task<Void?, Error>?
|
||||||
|
private static let syncCancelledCode = "SYNC_CANCELLED"
|
||||||
|
private static let syncCancelled = PigeonError(code: syncCancelledCode, message: "Sync cancelled", details: nil)
|
||||||
|
|
||||||
init(with defaults: UserDefaults = .standard) {
|
init(with defaults: UserDefaults = .standard) {
|
||||||
self.defaults = defaults
|
self.defaults = defaults
|
||||||
@@ -71,7 +74,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
saveChangeToken(token: PHPhotoLibrary.shared().currentChangeToken)
|
saveChangeToken(token: PHPhotoLibrary.shared().currentChangeToken)
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldFullSync() -> Bool {
|
func shouldFullSync(completion: @escaping (Result<Bool, Error>) -> Void) {
|
||||||
|
runSync(completion) { $0.shouldFullSync() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private func shouldFullSync() -> Bool {
|
||||||
guard #available(iOS 16, *),
|
guard #available(iOS 16, *),
|
||||||
PHPhotoLibrary.authorizationStatus(for: .readWrite) == .authorized,
|
PHPhotoLibrary.authorizationStatus(for: .readWrite) == .authorized,
|
||||||
let storedToken = getChangeToken() else {
|
let storedToken = getChangeToken() else {
|
||||||
@@ -87,12 +94,17 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAlbums() throws -> [PlatformAlbum] {
|
func getAlbums(completion: @escaping (Result<[PlatformAlbum], Error>) -> Void) {
|
||||||
|
runSync(completion) { try $0.getAlbums() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private func getAlbums() throws -> [PlatformAlbum] {
|
||||||
var albums: [PlatformAlbum] = []
|
var albums: [PlatformAlbum] = []
|
||||||
|
|
||||||
albumTypes.forEach { type in
|
for type in albumTypes {
|
||||||
let collections = PHAssetCollection.fetchAssetCollections(with: type, subtype: .any, options: nil)
|
let collections = PHAssetCollection.fetchAssetCollections(with: type, subtype: .any, options: nil)
|
||||||
for i in 0..<collections.count {
|
for i in 0..<collections.count {
|
||||||
|
try Task.checkCancellation()
|
||||||
let album = collections.object(at: i)
|
let album = collections.object(at: i)
|
||||||
|
|
||||||
// Ignore recovered album
|
// Ignore recovered album
|
||||||
@@ -126,7 +138,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
return albums.sorted { $0.id < $1.id }
|
return albums.sorted { $0.id < $1.id }
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMediaChanges() throws -> SyncDelta {
|
func getMediaChanges(completion: @escaping (Result<SyncDelta, Error>) -> Void) {
|
||||||
|
runSync(completion) { try $0.getMediaChanges() }
|
||||||
|
}
|
||||||
|
|
||||||
|
private func getMediaChanges() throws -> SyncDelta {
|
||||||
guard #available(iOS 16, *) else {
|
guard #available(iOS 16, *) else {
|
||||||
throw PigeonError(code: "UNSUPPORTED_OS", message: "This feature requires iOS 16 or later.", details: nil)
|
throw PigeonError(code: "UNSUPPORTED_OS", message: "This feature requires iOS 16 or later.", details: nil)
|
||||||
}
|
}
|
||||||
@@ -146,51 +162,49 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
return SyncDelta(hasChanges: false, updates: [], deletes: [], assetAlbums: [:])
|
return SyncDelta(hasChanges: false, updates: [], deletes: [], assetAlbums: [:])
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
let changes = try PHPhotoLibrary.shared().fetchPersistentChanges(since: storedToken)
|
||||||
let changes = try PHPhotoLibrary.shared().fetchPersistentChanges(since: storedToken)
|
|
||||||
|
var updatedAssets: Set<AssetWrapper> = []
|
||||||
|
var deletedAssets: Set<String> = []
|
||||||
|
|
||||||
|
for change in changes {
|
||||||
|
try Task.checkCancellation()
|
||||||
|
guard let details = try? change.changeDetails(for: PHObjectType.asset) else { continue }
|
||||||
|
|
||||||
var updatedAssets: Set<AssetWrapper> = []
|
let updated = details.updatedLocalIdentifiers.union(details.insertedLocalIdentifiers)
|
||||||
var deletedAssets: Set<String> = []
|
deletedAssets.formUnion(details.deletedLocalIdentifiers)
|
||||||
|
|
||||||
for change in changes {
|
if (updated.isEmpty) { continue }
|
||||||
guard let details = try? change.changeDetails(for: PHObjectType.asset) else { continue }
|
|
||||||
|
let options = PHFetchOptions()
|
||||||
|
options.includeHiddenAssets = false
|
||||||
|
let result = PHAsset.fetchAssets(withLocalIdentifiers: Array(updated), options: options)
|
||||||
|
for i in 0..<result.count {
|
||||||
|
let asset = result.object(at: i)
|
||||||
|
|
||||||
let updated = details.updatedLocalIdentifiers.union(details.insertedLocalIdentifiers)
|
// Asset wrapper only uses the id for comparison. Multiple change can contain the same asset, skip duplicate changes
|
||||||
deletedAssets.formUnion(details.deletedLocalIdentifiers)
|
let predicate = PlatformAsset(
|
||||||
|
id: asset.localIdentifier,
|
||||||
if (updated.isEmpty) { continue }
|
name: "",
|
||||||
|
type: 0,
|
||||||
let options = PHFetchOptions()
|
durationMs: 0,
|
||||||
options.includeHiddenAssets = false
|
orientation: 0,
|
||||||
let result = PHAsset.fetchAssets(withLocalIdentifiers: Array(updated), options: options)
|
isFavorite: false,
|
||||||
for i in 0..<result.count {
|
playbackStyle: .unknown
|
||||||
let asset = result.object(at: i)
|
)
|
||||||
|
if (updatedAssets.contains(AssetWrapper(with: predicate))) {
|
||||||
// Asset wrapper only uses the id for comparison. Multiple change can contain the same asset, skip duplicate changes
|
continue
|
||||||
let predicate = PlatformAsset(
|
|
||||||
id: asset.localIdentifier,
|
|
||||||
name: "",
|
|
||||||
type: 0,
|
|
||||||
durationMs: 0,
|
|
||||||
orientation: 0,
|
|
||||||
isFavorite: false,
|
|
||||||
playbackStyle: .unknown
|
|
||||||
)
|
|
||||||
if (updatedAssets.contains(AssetWrapper(with: predicate))) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
let domainAsset = AssetWrapper(with: asset.toPlatformAsset())
|
|
||||||
updatedAssets.insert(domainAsset)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let domainAsset = AssetWrapper(with: asset.toPlatformAsset())
|
||||||
|
updatedAssets.insert(domainAsset)
|
||||||
}
|
}
|
||||||
|
|
||||||
let updates = Array(updatedAssets.map { $0.asset })
|
|
||||||
return SyncDelta(hasChanges: true, updates: updates, deletes: Array(deletedAssets), assetAlbums: buildAssetAlbumsMap(assets: updates))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let updates = Array(updatedAssets.map { $0.asset })
|
||||||
|
return SyncDelta(hasChanges: true, updates: updates, deletes: Array(deletedAssets), assetAlbums: buildAssetAlbumsMap(assets: updates))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private func buildAssetAlbumsMap(assets: Array<PlatformAsset>) -> [String: [String]] {
|
private func buildAssetAlbumsMap(assets: Array<PlatformAsset>) -> [String: [String]] {
|
||||||
guard !assets.isEmpty else {
|
guard !assets.isEmpty else {
|
||||||
return [:]
|
return [:]
|
||||||
@@ -213,7 +227,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
return albumAssets
|
return albumAssets
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAssetIdsForAlbum(albumId: String) throws -> [String] {
|
func getAssetIdsForAlbum(albumId: String, completion: @escaping (Result<[String], Error>) -> Void) {
|
||||||
|
runSync(completion) { try $0.getAssetIdsForAlbum(albumId: albumId) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private func getAssetIdsForAlbum(albumId: String) throws -> [String] {
|
||||||
let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil)
|
let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil)
|
||||||
guard let album = collections.firstObject else {
|
guard let album = collections.firstObject else {
|
||||||
return []
|
return []
|
||||||
@@ -223,9 +241,14 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
let options = PHFetchOptions()
|
let options = PHFetchOptions()
|
||||||
options.includeHiddenAssets = false
|
options.includeHiddenAssets = false
|
||||||
let assets = getAssetsFromAlbum(in: album, options: options)
|
let assets = getAssetsFromAlbum(in: album, options: options)
|
||||||
assets.enumerateObjects { (asset, _, _) in
|
assets.enumerateObjects { (asset, _, stop) in
|
||||||
|
if Task.isCancelled {
|
||||||
|
stop.pointee = true
|
||||||
|
return
|
||||||
|
}
|
||||||
ids.append(asset.localIdentifier)
|
ids.append(asset.localIdentifier)
|
||||||
}
|
}
|
||||||
|
try Task.checkCancellation()
|
||||||
return ids
|
return ids
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -243,7 +266,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
return Int64(assets.count)
|
return Int64(assets.count)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] {
|
func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?, completion: @escaping (Result<[PlatformAsset], Error>) -> Void) {
|
||||||
|
runSync(completion) { try $0.getAssetsForAlbum(albumId: albumId, updatedTimeCond: updatedTimeCond) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] {
|
||||||
let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil)
|
let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil)
|
||||||
guard let album = collections.firstObject else {
|
guard let album = collections.firstObject else {
|
||||||
return []
|
return []
|
||||||
@@ -262,9 +289,14 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var assets: [PlatformAsset] = []
|
var assets: [PlatformAsset] = []
|
||||||
result.enumerateObjects { (asset, _, _) in
|
result.enumerateObjects { (asset, _, stop) in
|
||||||
|
if Task.isCancelled {
|
||||||
|
stop.pointee = true
|
||||||
|
return
|
||||||
|
}
|
||||||
assets.append(asset.toPlatformAsset())
|
assets.append(asset.toPlatformAsset())
|
||||||
}
|
}
|
||||||
|
try Task.checkCancellation()
|
||||||
return assets
|
return assets
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -324,6 +356,31 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin {
|
|||||||
hashTask = nil
|
hashTask = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func cancelSync() {
|
||||||
|
syncTask?.cancel()
|
||||||
|
syncTask = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
private func runSync<T>(
|
||||||
|
_ completion: @escaping (Result<T, Error>) -> Void,
|
||||||
|
_ work: @escaping (NativeSyncApiImpl) throws -> T
|
||||||
|
) {
|
||||||
|
syncTask?.cancel()
|
||||||
|
syncTask = Task { [weak self] in
|
||||||
|
guard let self else { return nil }
|
||||||
|
let result: Result<T, Error>
|
||||||
|
do {
|
||||||
|
result = .success(try work(self))
|
||||||
|
} catch is CancellationError {
|
||||||
|
result = .failure(Self.syncCancelled)
|
||||||
|
} catch {
|
||||||
|
result = .failure(error)
|
||||||
|
}
|
||||||
|
self.completeWhenActive(for: completion, with: result)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private func hashAsset(_ asset: PHAsset, allowNetworkAccess: Bool) async -> HashResult? {
|
private func hashAsset(_ asset: PHAsset, allowNetworkAccess: Bool) async -> HashResult? {
|
||||||
class RequestRef {
|
class RequestRef {
|
||||||
var id: PHAssetResourceDataRequestID?
|
var id: PHAssetResourceDataRequestID?
|
||||||
|
|||||||
@@ -188,20 +188,14 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||||||
if (!_cancellationToken.isCompleted) {
|
if (!_cancellationToken.isCompleted) {
|
||||||
_cancellationToken.complete();
|
_cancellationToken.complete();
|
||||||
}
|
}
|
||||||
final cleanupFutures = [
|
|
||||||
nativeSyncApi?.cancelHashing(),
|
|
||||||
workerManagerPatch.dispose().catchError((_) async {
|
|
||||||
// Discard any errors on the dispose call
|
|
||||||
return;
|
|
||||||
}),
|
|
||||||
LogService.I.dispose(),
|
|
||||||
Store.dispose(),
|
|
||||||
|
|
||||||
backgroundSyncManager?.cancel(),
|
// Workers share one sqlite connection, so DB teardown must wait until every worker has stopped using it.
|
||||||
_drift.optimize(allTables: true),
|
await Future.wait([
|
||||||
];
|
if (backgroundSyncManager != null) backgroundSyncManager.cancel(),
|
||||||
|
if (nativeSyncApi != null) nativeSyncApi.cancelHashing(),
|
||||||
await Future.wait(cleanupFutures.nonNulls);
|
]);
|
||||||
|
await workerManagerPatch.dispose().catchError((_) async {});
|
||||||
|
await Future.wait([LogService.I.dispose(), Store.dispose(), _drift.optimize(allTables: true)]);
|
||||||
await _drift.close();
|
await _drift.close();
|
||||||
await _driftLogger.close();
|
await _driftLogger.close();
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
import 'package:flutter/services.dart';
|
import 'package:flutter/services.dart';
|
||||||
import 'package:immich_mobile/constants/constants.dart';
|
import 'package:immich_mobile/constants/constants.dart';
|
||||||
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
||||||
@@ -17,7 +19,7 @@ class HashService {
|
|||||||
final DriftLocalAssetRepository _localAssetRepository;
|
final DriftLocalAssetRepository _localAssetRepository;
|
||||||
final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository;
|
final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository;
|
||||||
final NativeSyncApi _nativeSyncApi;
|
final NativeSyncApi _nativeSyncApi;
|
||||||
final bool Function()? _cancelChecker;
|
final Completer<void>? _cancellation;
|
||||||
final _log = Logger('HashService');
|
final _log = Logger('HashService');
|
||||||
|
|
||||||
HashService({
|
HashService({
|
||||||
@@ -25,11 +27,15 @@ class HashService {
|
|||||||
required this._localAssetRepository,
|
required this._localAssetRepository,
|
||||||
required this._trashedLocalAssetRepository,
|
required this._trashedLocalAssetRepository,
|
||||||
required this._nativeSyncApi,
|
required this._nativeSyncApi,
|
||||||
this._cancelChecker,
|
this._cancellation,
|
||||||
int? batchSize,
|
int? batchSize,
|
||||||
}) : _batchSize = batchSize ?? kBatchHashFileLimit;
|
}) : _batchSize = batchSize ?? kBatchHashFileLimit {
|
||||||
|
// Stop the in-flight native hash call promptly on cancellation; the loops
|
||||||
|
// below also observe [isCancelled] to bail between batches.
|
||||||
|
_cancellation?.future.then((_) => _nativeSyncApi.cancelHashing().onError(_log.warning));
|
||||||
|
}
|
||||||
|
|
||||||
bool get isCancelled => _cancelChecker?.call() ?? false;
|
bool get isCancelled => _cancellation?.isCompleted ?? false;
|
||||||
|
|
||||||
Future<void> hashAssets() async {
|
Future<void> hashAssets() async {
|
||||||
_log.info("Starting hashing of assets");
|
_log.info("Starting hashing of assets");
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import 'dart:async';
|
|||||||
|
|
||||||
import 'package:collection/collection.dart';
|
import 'package:collection/collection.dart';
|
||||||
import 'package:flutter/foundation.dart';
|
import 'package:flutter/foundation.dart';
|
||||||
|
import 'package:flutter/services.dart';
|
||||||
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/store.model.dart';
|
import 'package:immich_mobile/domain/models/store.model.dart';
|
||||||
@@ -17,6 +18,8 @@ import 'package:immich_mobile/utils/datetime_helpers.dart';
|
|||||||
import 'package:immich_mobile/utils/diff.dart';
|
import 'package:immich_mobile/utils/diff.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
|
|
||||||
|
const String _kSyncCancelledCode = "SYNC_CANCELLED";
|
||||||
|
|
||||||
class LocalSyncService {
|
class LocalSyncService {
|
||||||
final DriftLocalAlbumRepository _localAlbumRepository;
|
final DriftLocalAlbumRepository _localAlbumRepository;
|
||||||
// ignore: unused_field
|
// ignore: unused_field
|
||||||
@@ -25,6 +28,7 @@ class LocalSyncService {
|
|||||||
final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository;
|
final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository;
|
||||||
final AssetMediaRepository _assetMediaRepository;
|
final AssetMediaRepository _assetMediaRepository;
|
||||||
final IPermissionRepository _permissionRepository;
|
final IPermissionRepository _permissionRepository;
|
||||||
|
final Completer<void>? _cancellation;
|
||||||
final Logger _log = Logger("DeviceSyncService");
|
final Logger _log = Logger("DeviceSyncService");
|
||||||
|
|
||||||
LocalSyncService({
|
LocalSyncService({
|
||||||
@@ -34,7 +38,12 @@ class LocalSyncService {
|
|||||||
required this._trashedLocalAssetRepository,
|
required this._trashedLocalAssetRepository,
|
||||||
required this._assetMediaRepository,
|
required this._assetMediaRepository,
|
||||||
required this._permissionRepository,
|
required this._permissionRepository,
|
||||||
});
|
this._cancellation,
|
||||||
|
}) {
|
||||||
|
_cancellation?.future.then((_) => _nativeSyncApi.cancelSync().onError(_log.warning));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get _isCancelled => _cancellation?.isCompleted ?? false;
|
||||||
|
|
||||||
Future<void> sync({bool full = false}) async {
|
Future<void> sync({bool full = false}) async {
|
||||||
final Stopwatch stopwatch = Stopwatch()..start();
|
final Stopwatch stopwatch = Stopwatch()..start();
|
||||||
@@ -81,6 +90,10 @@ class LocalSyncService {
|
|||||||
// detect album deletions from the native side
|
// detect album deletions from the native side
|
||||||
if (CurrentPlatform.isAndroid) {
|
if (CurrentPlatform.isAndroid) {
|
||||||
for (final album in dbAlbums) {
|
for (final album in dbAlbums) {
|
||||||
|
if (_isCancelled) {
|
||||||
|
_log.warning("Local sync cancelled. Stopped processing albums.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
|
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
|
||||||
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
|
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
|
||||||
}
|
}
|
||||||
@@ -91,6 +104,10 @@ class LocalSyncService {
|
|||||||
// does not include changes for cloud albums.
|
// does not include changes for cloud albums.
|
||||||
final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums();
|
final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums();
|
||||||
for (final album in cloudAlbums) {
|
for (final album in cloudAlbums) {
|
||||||
|
if (_isCancelled) {
|
||||||
|
_log.warning("Local sync cancelled. Stopped processing cloud albums.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id);
|
final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id);
|
||||||
if (dbAlbum == null) {
|
if (dbAlbum == null) {
|
||||||
_log.warning("Cloud album ${album.name} not found in local database. Skipping sync.");
|
_log.warning("Cloud album ${album.name} not found in local database. Skipping sync.");
|
||||||
@@ -102,6 +119,12 @@ class LocalSyncService {
|
|||||||
await _mapIosCloudIds(newAssets);
|
await _mapIosCloudIds(newAssets);
|
||||||
}
|
}
|
||||||
await _nativeSyncApi.checkpointSync();
|
await _nativeSyncApi.checkpointSync();
|
||||||
|
} on PlatformException catch (e, s) {
|
||||||
|
if (e.code == _kSyncCancelledCode) {
|
||||||
|
_log.warning("Local sync cancelled");
|
||||||
|
} else {
|
||||||
|
_log.severe("Error performing device sync", e, s);
|
||||||
|
}
|
||||||
} catch (e, s) {
|
} catch (e, s) {
|
||||||
_log.severe("Error performing device sync", e, s);
|
_log.severe("Error performing device sync", e, s);
|
||||||
} finally {
|
} finally {
|
||||||
@@ -129,12 +152,21 @@ class LocalSyncService {
|
|||||||
await _nativeSyncApi.checkpointSync();
|
await _nativeSyncApi.checkpointSync();
|
||||||
stopwatch.stop();
|
stopwatch.stop();
|
||||||
_log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms");
|
_log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms");
|
||||||
|
} on PlatformException catch (e, s) {
|
||||||
|
if (e.code == _kSyncCancelledCode) {
|
||||||
|
_log.warning("Full device sync cancelled");
|
||||||
|
} else {
|
||||||
|
_log.severe("Error performing full device sync", e, s);
|
||||||
|
}
|
||||||
} catch (e, s) {
|
} catch (e, s) {
|
||||||
_log.severe("Error performing full device sync", e, s);
|
_log.severe("Error performing full device sync", e, s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> addAlbum(LocalAlbum album) async {
|
Future<void> addAlbum(LocalAlbum album) async {
|
||||||
|
if (_isCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
_log.fine("Adding device album ${album.name}");
|
_log.fine("Adding device album ${album.name}");
|
||||||
|
|
||||||
@@ -162,6 +194,9 @@ class LocalSyncService {
|
|||||||
|
|
||||||
// The deviceAlbum is ignored since we are going to refresh it anyways
|
// The deviceAlbum is ignored since we are going to refresh it anyways
|
||||||
FutureOr<bool> updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum) async {
|
FutureOr<bool> updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum) async {
|
||||||
|
if (_isCancelled) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
_log.fine("Syncing device album ${dbAlbum.name}");
|
_log.fine("Syncing device album ${dbAlbum.name}");
|
||||||
|
|
||||||
|
|||||||
@@ -112,10 +112,16 @@ class LogService {
|
|||||||
return _flushBuffer();
|
return _flushBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> dispose() {
|
Future<void> dispose() async {
|
||||||
_flushTimer?.cancel();
|
_flushTimer?.cancel();
|
||||||
_logSubscription.cancel();
|
_flushTimer = null;
|
||||||
return _flushBuffer();
|
await _logSubscription.cancel();
|
||||||
|
await _flushBuffer();
|
||||||
|
// Allow a subsequent init() (e.g. when a worker isolate is reused) to
|
||||||
|
// create a fresh instance instead of returning this disposed one.
|
||||||
|
if (identical(_instance, this)) {
|
||||||
|
_instance = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> _flushBuffer() async {
|
Future<void> _flushBuffer() async {
|
||||||
|
|||||||
@@ -192,43 +192,30 @@ class RemoteAlbumService {
|
|||||||
required UserDto uploader,
|
required UserDto uploader,
|
||||||
required AlbumAssetCandidates candidates,
|
required AlbumAssetCandidates candidates,
|
||||||
UploadCallbacks uploadCallbacks = const UploadCallbacks(),
|
UploadCallbacks uploadCallbacks = const UploadCallbacks(),
|
||||||
|
Completer<void>? cancelToken,
|
||||||
}) async {
|
}) async {
|
||||||
int addedCount = 0;
|
int addedCount = 0;
|
||||||
if (candidates.remoteAssetIds.isNotEmpty) {
|
if (candidates.remoteAssetIds.isNotEmpty) {
|
||||||
addedCount += await addAssets(albumId: albumId, assetIds: candidates.remoteAssetIds);
|
addedCount += await addAssets(albumId: albumId, assetIds: candidates.remoteAssetIds);
|
||||||
}
|
}
|
||||||
if (candidates.localAssetsToUpload.isNotEmpty) {
|
if (candidates.localAssetsToUpload.isNotEmpty) {
|
||||||
addedCount += await _uploadAndAddLocals(albumId, uploader, candidates.localAssetsToUpload, uploadCallbacks);
|
addedCount += await _uploadAndAddLocals(
|
||||||
|
albumId,
|
||||||
|
uploader,
|
||||||
|
candidates.localAssetsToUpload,
|
||||||
|
uploadCallbacks,
|
||||||
|
cancelToken,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
return addedCount;
|
return addedCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates an album, seeding it with already-remote asset IDs, then uploads
|
|
||||||
/// local-only assets and links each one as it finishes.
|
|
||||||
Future<RemoteAlbum> createAlbumWithAssets({
|
|
||||||
required String title,
|
|
||||||
required UserDto owner,
|
|
||||||
String? description,
|
|
||||||
AlbumAssetCandidates candidates = const AlbumAssetCandidates(remoteAssetIds: [], localAssetsToUpload: []),
|
|
||||||
UploadCallbacks uploadCallbacks = const UploadCallbacks(),
|
|
||||||
}) async {
|
|
||||||
final album = await createAlbum(
|
|
||||||
title: title,
|
|
||||||
owner: owner,
|
|
||||||
description: description,
|
|
||||||
assetIds: candidates.remoteAssetIds,
|
|
||||||
);
|
|
||||||
if (candidates.localAssetsToUpload.isNotEmpty) {
|
|
||||||
await _uploadAndAddLocals(album.id, owner, candidates.localAssetsToUpload, uploadCallbacks);
|
|
||||||
}
|
|
||||||
return album;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<int> _uploadAndAddLocals(
|
Future<int> _uploadAndAddLocals(
|
||||||
String albumId,
|
String albumId,
|
||||||
UserDto uploader,
|
UserDto uploader,
|
||||||
List<LocalAsset> localAssets,
|
List<LocalAsset> localAssets,
|
||||||
UploadCallbacks userCallbacks,
|
UploadCallbacks userCallbacks,
|
||||||
|
Completer<void>? cancelToken,
|
||||||
) async {
|
) async {
|
||||||
int addedCount = 0;
|
int addedCount = 0;
|
||||||
final pendingAdds = <Future<void>>[];
|
final pendingAdds = <Future<void>>[];
|
||||||
@@ -258,7 +245,7 @@ class RemoteAlbumService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pendingAdds.add(
|
pendingAdds.add(
|
||||||
_linkUploadedAssetToAlbum(albumId, remoteId, uploader, source)
|
linkUploadedAssetToAlbum(albumId, remoteId, uploader, source)
|
||||||
.then<void>((added) {
|
.then<void>((added) {
|
||||||
addedCount += added;
|
addedCount += added;
|
||||||
})
|
})
|
||||||
@@ -269,7 +256,7 @@ class RemoteAlbumService {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
await _uploadService.uploadManual(localAssets, callbacks: wrappedCallbacks);
|
await _uploadService.uploadManual(localAssets, callbacks: wrappedCallbacks, cancelToken: cancelToken);
|
||||||
await Future.wait(pendingAdds);
|
await Future.wait(pendingAdds);
|
||||||
return addedCount;
|
return addedCount;
|
||||||
}
|
}
|
||||||
@@ -288,7 +275,7 @@ class RemoteAlbumService {
|
|||||||
/// `remote_asset_entity` row from the local source so the FK-protected
|
/// `remote_asset_entity` row from the local source so the FK-protected
|
||||||
/// junction insert succeeds. Sync overwrites the placeholder later with
|
/// junction insert succeeds. Sync overwrites the placeholder later with
|
||||||
/// the authoritative server data.
|
/// the authoritative server data.
|
||||||
Future<int> _linkUploadedAssetToAlbum(String albumId, String remoteId, UserDto uploader, LocalAsset source) async {
|
Future<int> linkUploadedAssetToAlbum(String albumId, String remoteId, UserDto uploader, LocalAsset source) async {
|
||||||
final result = await _albumApiRepository.addAssets(albumId, [remoteId]);
|
final result = await _albumApiRepository.addAssets(albumId, [remoteId]);
|
||||||
if (result.added.isEmpty) {
|
if (result.added.isEmpty) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
@@ -54,7 +54,13 @@ class StoreService {
|
|||||||
/// Disposes the store and cancels the subscription. To reuse the store call init() again
|
/// Disposes the store and cancels the subscription. To reuse the store call init() again
|
||||||
Future<void> dispose() async {
|
Future<void> dispose() async {
|
||||||
await _storeUpdateSubscription?.cancel();
|
await _storeUpdateSubscription?.cancel();
|
||||||
|
_storeUpdateSubscription = null;
|
||||||
_cache.clear();
|
_cache.clear();
|
||||||
|
// Allow a subsequent init() (e.g. when a worker isolate is reused) to
|
||||||
|
// create a fresh instance instead of returning this disposed one.
|
||||||
|
if (identical(_instance, this)) {
|
||||||
|
_instance = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the cached value for [key], or `null`
|
/// Returns the cached value for [key], or `null`
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/store.model.dart';
|
import 'package:immich_mobile/domain/models/store.model.dart';
|
||||||
@@ -5,6 +7,7 @@ import 'package:immich_mobile/domain/services/store.service.dart';
|
|||||||
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
||||||
import 'package:immich_mobile/infrastructure/repositories/remote_album.repository.dart';
|
import 'package:immich_mobile/infrastructure/repositories/remote_album.repository.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/store.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/store.provider.dart';
|
||||||
import 'package:immich_mobile/repositories/drift_album_api_repository.dart';
|
import 'package:immich_mobile/repositories/drift_album_api_repository.dart';
|
||||||
import 'package:immich_mobile/utils/debug_print.dart';
|
import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
@@ -16,6 +19,7 @@ final syncLinkedAlbumServiceProvider = Provider(
|
|||||||
ref.watch(remoteAlbumRepository),
|
ref.watch(remoteAlbumRepository),
|
||||||
ref.watch(driftAlbumApiRepositoryProvider),
|
ref.watch(driftAlbumApiRepositoryProvider),
|
||||||
ref.watch(storeServiceProvider),
|
ref.watch(storeServiceProvider),
|
||||||
|
cancellation: ref.watch(cancellationProvider),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -24,13 +28,15 @@ class SyncLinkedAlbumService {
|
|||||||
final DriftRemoteAlbumRepository _remoteAlbumRepository;
|
final DriftRemoteAlbumRepository _remoteAlbumRepository;
|
||||||
final DriftAlbumApiRepository _albumApiRepository;
|
final DriftAlbumApiRepository _albumApiRepository;
|
||||||
final StoreService _storeService;
|
final StoreService _storeService;
|
||||||
|
final Completer<void>? _cancellation;
|
||||||
|
|
||||||
SyncLinkedAlbumService(
|
SyncLinkedAlbumService(
|
||||||
this._localAlbumRepository,
|
this._localAlbumRepository,
|
||||||
this._remoteAlbumRepository,
|
this._remoteAlbumRepository,
|
||||||
this._albumApiRepository,
|
this._albumApiRepository,
|
||||||
this._storeService,
|
this._storeService, {
|
||||||
);
|
this._cancellation,
|
||||||
|
});
|
||||||
|
|
||||||
final _log = Logger("SyncLinkedAlbumService");
|
final _log = Logger("SyncLinkedAlbumService");
|
||||||
|
|
||||||
@@ -55,7 +61,11 @@ class SyncLinkedAlbumService {
|
|||||||
final assetIds = await _remoteAlbumRepository.getLinkedAssetIds(userId, localAlbum.id, linkedRemoteAlbumId);
|
final assetIds = await _remoteAlbumRepository.getLinkedAssetIds(userId, localAlbum.id, linkedRemoteAlbumId);
|
||||||
_log.fine("Syncing ${assetIds.length} assets to remote album: ${remoteAlbum.name}");
|
_log.fine("Syncing ${assetIds.length} assets to remote album: ${remoteAlbum.name}");
|
||||||
if (assetIds.isNotEmpty) {
|
if (assetIds.isNotEmpty) {
|
||||||
final album = await _albumApiRepository.addAssets(remoteAlbum.id, assetIds);
|
final album = await _albumApiRepository.addAssets(
|
||||||
|
remoteAlbum.id,
|
||||||
|
assetIds,
|
||||||
|
abortTrigger: _cancellation?.future,
|
||||||
|
);
|
||||||
await _remoteAlbumRepository.addAssets(remoteAlbum.id, album.added);
|
await _remoteAlbumRepository.addAssets(remoteAlbum.id, album.added);
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ class SyncStreamService {
|
|||||||
final IPermissionRepository _permissionRepository;
|
final IPermissionRepository _permissionRepository;
|
||||||
final SyncMigrationRepository _syncMigrationRepository;
|
final SyncMigrationRepository _syncMigrationRepository;
|
||||||
final ApiService _api;
|
final ApiService _api;
|
||||||
final bool Function()? _cancelChecker;
|
final Completer<void>? _cancellation;
|
||||||
|
|
||||||
SyncStreamService({
|
SyncStreamService({
|
||||||
required this._syncApiRepository,
|
required this._syncApiRepository,
|
||||||
@@ -49,10 +49,10 @@ class SyncStreamService {
|
|||||||
required this._permissionRepository,
|
required this._permissionRepository,
|
||||||
required this._syncMigrationRepository,
|
required this._syncMigrationRepository,
|
||||||
required this._api,
|
required this._api,
|
||||||
this._cancelChecker,
|
this._cancellation,
|
||||||
});
|
});
|
||||||
|
|
||||||
bool get isCancelled => _cancelChecker?.call() ?? false;
|
bool get isCancelled => _cancellation?.isCompleted ?? false;
|
||||||
|
|
||||||
Future<bool> sync() async {
|
Future<bool> sync() async {
|
||||||
_logger.info("Remote sync request for user");
|
_logger.info("Remote sync request for user");
|
||||||
@@ -80,10 +80,15 @@ class SyncStreamService {
|
|||||||
_handleEvents,
|
_handleEvents,
|
||||||
serverVersion: serverSemVer,
|
serverVersion: serverSemVer,
|
||||||
onReset: () => shouldReset = true,
|
onReset: () => shouldReset = true,
|
||||||
|
abortSignal: _cancellation?.future,
|
||||||
);
|
);
|
||||||
if (shouldReset) {
|
if (shouldReset) {
|
||||||
_logger.info("Resetting sync state as requested by server");
|
_logger.info("Resetting sync state as requested by server");
|
||||||
await _syncApiRepository.streamChanges(_handleEvents, serverVersion: serverSemVer);
|
await _syncApiRepository.streamChanges(
|
||||||
|
_handleEvents,
|
||||||
|
serverVersion: serverSemVer,
|
||||||
|
abortSignal: _cancellation?.future,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
previousLength = migrations.length;
|
previousLength = migrations.length;
|
||||||
@@ -318,7 +323,7 @@ class SyncStreamService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
|
Future<void> handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) async {
|
||||||
if (batchData.isEmpty) {
|
if (batchData.isEmpty || isCancelled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -361,7 +366,7 @@ class SyncStreamService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> handleWsAssetUploadReadyV2Batch(List<dynamic> batchData) async {
|
Future<void> handleWsAssetUploadReadyV2Batch(List<dynamic> batchData) async {
|
||||||
if (batchData.isEmpty) {
|
if (batchData.isEmpty || isCancelled) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -404,6 +409,9 @@ class SyncStreamService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> handleWsAssetEditReadyV1(dynamic data) async {
|
Future<void> handleWsAssetEditReadyV1(dynamic data) async {
|
||||||
|
if (isCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
_logger.info('Processing AssetEditReadyV1 event');
|
_logger.info('Processing AssetEditReadyV1 event');
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -444,6 +452,9 @@ class SyncStreamService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Future<void> handleWsAssetEditReadyV2(dynamic data) async {
|
Future<void> handleWsAssetEditReadyV2(dynamic data) async {
|
||||||
|
if (isCancelled) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
_logger.info('Processing AssetEditReadyV2 event');
|
_logger.info('Processing AssetEditReadyV2 event');
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -50,53 +50,27 @@ class BackgroundSyncManager {
|
|||||||
});
|
});
|
||||||
|
|
||||||
Future<void> cancel() async {
|
Future<void> cancel() async {
|
||||||
final futures = <Future>[];
|
final tasks = [
|
||||||
|
_syncTask,
|
||||||
if (_syncTask != null) {
|
_syncWebsocketTask,
|
||||||
futures.add(_syncTask!.future);
|
_cloudIdSyncTask,
|
||||||
|
_linkedAlbumSyncTask,
|
||||||
|
_deviceAlbumSyncTask,
|
||||||
|
_hashTask,
|
||||||
|
];
|
||||||
|
final futures = [
|
||||||
|
for (final task in tasks)
|
||||||
|
if (task != null) task.future,
|
||||||
|
];
|
||||||
|
for (final task in tasks) {
|
||||||
|
task?.cancel();
|
||||||
}
|
}
|
||||||
_syncTask?.cancel();
|
|
||||||
_syncTask = null;
|
_syncTask = null;
|
||||||
|
|
||||||
if (_syncWebsocketTask != null) {
|
|
||||||
futures.add(_syncWebsocketTask!.future);
|
|
||||||
}
|
|
||||||
_syncWebsocketTask?.cancel();
|
|
||||||
_syncWebsocketTask = null;
|
_syncWebsocketTask = null;
|
||||||
|
|
||||||
if (_cloudIdSyncTask != null) {
|
|
||||||
futures.add(_cloudIdSyncTask!.future);
|
|
||||||
}
|
|
||||||
_cloudIdSyncTask?.cancel();
|
|
||||||
_cloudIdSyncTask = null;
|
_cloudIdSyncTask = null;
|
||||||
|
|
||||||
if (_linkedAlbumSyncTask != null) {
|
|
||||||
futures.add(_linkedAlbumSyncTask!.future);
|
|
||||||
}
|
|
||||||
_linkedAlbumSyncTask?.cancel();
|
|
||||||
_linkedAlbumSyncTask = null;
|
_linkedAlbumSyncTask = null;
|
||||||
|
|
||||||
try {
|
|
||||||
await Future.wait(futures);
|
|
||||||
} on CanceledError {
|
|
||||||
// Ignore cancellation errors
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<void> cancelLocal() async {
|
|
||||||
final futures = <Future>[];
|
|
||||||
|
|
||||||
if (_hashTask != null) {
|
|
||||||
futures.add(_hashTask!.future);
|
|
||||||
}
|
|
||||||
_hashTask?.cancel();
|
|
||||||
_hashTask = null;
|
|
||||||
|
|
||||||
if (_deviceAlbumSyncTask != null) {
|
|
||||||
futures.add(_deviceAlbumSyncTask!.future);
|
|
||||||
}
|
|
||||||
_deviceAlbumSyncTask?.cancel();
|
|
||||||
_deviceAlbumSyncTask = null;
|
_deviceAlbumSyncTask = null;
|
||||||
|
_hashTask = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Future.wait(futures);
|
await Future.wait(futures);
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
import 'package:drift/drift.dart';
|
import 'package:drift/drift.dart';
|
||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/constants.dart';
|
import 'package:immich_mobile/constants/constants.dart';
|
||||||
@@ -9,6 +11,7 @@ import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
|||||||
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
||||||
import 'package:immich_mobile/platform/native_sync_api.g.dart';
|
import 'package:immich_mobile/platform/native_sync_api.g.dart';
|
||||||
import 'package:immich_mobile/providers/api.provider.dart';
|
import 'package:immich_mobile/providers/api.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
||||||
import 'package:immich_mobile/providers/server_info.provider.dart';
|
import 'package:immich_mobile/providers/server_info.provider.dart';
|
||||||
@@ -51,9 +54,10 @@ Future<void> syncCloudIds(ProviderContainer ref) async {
|
|||||||
}
|
}
|
||||||
|
|
||||||
final assetApi = ref.read(apiServiceProvider).assetsApi;
|
final assetApi = ref.read(apiServiceProvider).assetsApi;
|
||||||
|
final cancellation = ref.read(cancellationProvider);
|
||||||
|
|
||||||
// Process cloud IDs in paginated batches
|
// Process cloud IDs in paginated batches
|
||||||
await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger);
|
await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger, cancellation);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> _processCloudIdMappingsInBatches(
|
Future<void> _processCloudIdMappingsInBatches(
|
||||||
@@ -62,12 +66,17 @@ Future<void> _processCloudIdMappingsInBatches(
|
|||||||
AssetsApi assetsApi,
|
AssetsApi assetsApi,
|
||||||
bool canBulkUpdate,
|
bool canBulkUpdate,
|
||||||
Logger logger,
|
Logger logger,
|
||||||
|
Completer<void> cancellation,
|
||||||
) async {
|
) async {
|
||||||
const pageSize = 20000;
|
const pageSize = 20000;
|
||||||
String? lastLocalId;
|
String? lastLocalId;
|
||||||
final seenRemoteAssetIds = <String>{};
|
final seenRemoteAssetIds = <String>{};
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (cancellation.isCompleted) {
|
||||||
|
logger.warning('Cloud ID migration cancelled. Stopping batch processing.');
|
||||||
|
break;
|
||||||
|
}
|
||||||
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId);
|
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId);
|
||||||
if (mappings.isEmpty) {
|
if (mappings.isEmpty) {
|
||||||
break;
|
break;
|
||||||
@@ -98,9 +107,9 @@ Future<void> _processCloudIdMappingsInBatches(
|
|||||||
|
|
||||||
if (items.isNotEmpty) {
|
if (items.isNotEmpty) {
|
||||||
if (canBulkUpdate) {
|
if (canBulkUpdate) {
|
||||||
await _bulkUpdateCloudIds(assetsApi, items);
|
await _bulkUpdateCloudIds(assetsApi, items, cancellation.future);
|
||||||
} else {
|
} else {
|
||||||
await _sequentialUpdateCloudIds(assetsApi, items);
|
await _sequentialUpdateCloudIds(assetsApi, items, cancellation);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,20 +120,35 @@ Future<void> _processCloudIdMappingsInBatches(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> _sequentialUpdateCloudIds(AssetsApi assetsApi, List<AssetMetadataBulkUpsertItemDto> items) async {
|
Future<void> _sequentialUpdateCloudIds(
|
||||||
|
AssetsApi assetsApi,
|
||||||
|
List<AssetMetadataBulkUpsertItemDto> items,
|
||||||
|
Completer<void> cancellation,
|
||||||
|
) async {
|
||||||
for (final item in items) {
|
for (final item in items) {
|
||||||
|
if (cancellation.isCompleted) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value);
|
final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value);
|
||||||
try {
|
try {
|
||||||
await assetsApi.updateAssetMetadata(item.assetId, AssetMetadataUpsertDto(items: [upsertItem]));
|
await assetsApi.updateAssetMetadata(
|
||||||
|
item.assetId,
|
||||||
|
AssetMetadataUpsertDto(items: [upsertItem]),
|
||||||
|
abortTrigger: cancellation.future,
|
||||||
|
);
|
||||||
} catch (error, stack) {
|
} catch (error, stack) {
|
||||||
Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack);
|
Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> _bulkUpdateCloudIds(AssetsApi assetsApi, List<AssetMetadataBulkUpsertItemDto> items) async {
|
Future<void> _bulkUpdateCloudIds(
|
||||||
|
AssetsApi assetsApi,
|
||||||
|
List<AssetMetadataBulkUpsertItemDto> items,
|
||||||
|
Future<void> abortTrigger,
|
||||||
|
) async {
|
||||||
try {
|
try {
|
||||||
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items));
|
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items), abortTrigger: abortTrigger);
|
||||||
} catch (error, stack) {
|
} catch (error, stack) {
|
||||||
Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack);
|
Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ class SyncApiRepository {
|
|||||||
Function()? onReset,
|
Function()? onReset,
|
||||||
int batchSize = kSyncEventBatchSize,
|
int batchSize = kSyncEventBatchSize,
|
||||||
http.Client? httpClient,
|
http.Client? httpClient,
|
||||||
|
Future<void>? abortSignal,
|
||||||
}) async {
|
}) async {
|
||||||
final stopwatch = Stopwatch()..start();
|
final stopwatch = Stopwatch()..start();
|
||||||
final client = httpClient ?? NetworkRepository.client;
|
final client = httpClient ?? NetworkRepository.client;
|
||||||
@@ -36,7 +37,7 @@ class SyncApiRepository {
|
|||||||
|
|
||||||
final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'};
|
final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'};
|
||||||
|
|
||||||
final request = http.Request('POST', Uri.parse(endpoint));
|
final request = http.AbortableRequest('POST', Uri.parse(endpoint), abortTrigger: abortSignal);
|
||||||
request.headers.addAll(headers);
|
request.headers.addAll(headers);
|
||||||
request.body = jsonEncode(
|
request.body = jsonEncode(
|
||||||
SyncStreamDto(
|
SyncStreamDto(
|
||||||
|
|||||||
+14
@@ -635,6 +635,20 @@ class NativeSyncApi {
|
|||||||
_extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true);
|
_extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<void> cancelSync() async {
|
||||||
|
final pigeonVar_channelName =
|
||||||
|
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync$pigeonVar_messageChannelSuffix';
|
||||||
|
final pigeonVar_channel = BasicMessageChannel<Object?>(
|
||||||
|
pigeonVar_channelName,
|
||||||
|
pigeonChannelCodec,
|
||||||
|
binaryMessenger: pigeonVar_binaryMessenger,
|
||||||
|
);
|
||||||
|
final Future<Object?> pigeonVar_sendFuture = pigeonVar_channel.send(null);
|
||||||
|
final pigeonVar_replyList = await pigeonVar_sendFuture as List<Object?>?;
|
||||||
|
|
||||||
|
_extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true);
|
||||||
|
}
|
||||||
|
|
||||||
Future<Map<String, List<PlatformAsset>>> getTrashedAssets() async {
|
Future<Map<String, List<PlatformAsset>>> getTrashedAssets() async {
|
||||||
final pigeonVar_channelName =
|
final pigeonVar_channelName =
|
||||||
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$pigeonVar_messageChannelSuffix';
|
'dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$pigeonVar_messageChannelSuffix';
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import 'package:immich_mobile/presentation/widgets/action_buttons/base_action_bu
|
|||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/unarchive_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/unarchive_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/providers/asset_viewer/asset_viewer.provider.dart';
|
import 'package:immich_mobile/providers/asset_viewer/asset_viewer.provider.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/action.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
||||||
import 'package:immich_mobile/providers/routes.provider.dart';
|
import 'package:immich_mobile/providers/routes.provider.dart';
|
||||||
import 'package:immich_mobile/widgets/common/immich_toast.dart';
|
import 'package:immich_mobile/widgets/common/immich_toast.dart';
|
||||||
@@ -142,13 +143,18 @@ class _AddActionButtonState extends ConsumerState<AddActionButton> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final addedCount = await ref.read(remoteAlbumProvider.notifier).addAssets(album.id, [latest.remoteId!]);
|
final result = await ref.read(actionProvider.notifier).addToAlbum(ActionSource.viewer, album);
|
||||||
|
|
||||||
if (!context.mounted) {
|
if (!context.mounted) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (addedCount == 0) {
|
if (!result.success) {
|
||||||
|
ImmichToast.show(context: context, msg: 'scaffold_body_error_occurred'.tr(), toastType: ToastType.error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.count == 0) {
|
||||||
ImmichToast.show(
|
ImmichToast.show(
|
||||||
context: context,
|
context: context,
|
||||||
msg: 'add_to_album_bottom_sheet_already_exists'.tr(namedArgs: {'album': album.name}),
|
msg: 'add_to_album_bottom_sheet_already_exists'.tr(namedArgs: {'album': album.name}),
|
||||||
@@ -159,7 +165,7 @@ class _AddActionButtonState extends ConsumerState<AddActionButton> {
|
|||||||
msg: 'add_to_album_bottom_sheet_added'.tr(namedArgs: {'album': album.name}),
|
msg: 'add_to_album_bottom_sheet_added'.tr(namedArgs: {'album': album.name}),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Invalidate using the asset's remote ID to refresh the "Appears in" list
|
// Refresh the "Appears in" list on the asset's info panel.
|
||||||
ref.invalidate(albumsContainingAssetProvider(latest.remoteId!));
|
ref.invalidate(albumsContainingAssetProvider(latest.remoteId!));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import 'package:flutter/material.dart';
|
|||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/enums.dart';
|
import 'package:immich_mobile/constants/enums.dart';
|
||||||
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
|
||||||
import 'package:immich_mobile/extensions/build_context_extensions.dart';
|
import 'package:immich_mobile/extensions/build_context_extensions.dart';
|
||||||
import 'package:immich_mobile/extensions/theme_extensions.dart';
|
import 'package:immich_mobile/extensions/theme_extensions.dart';
|
||||||
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
||||||
@@ -746,12 +745,10 @@ class AddToAlbumHeader extends ConsumerWidget {
|
|||||||
@override
|
@override
|
||||||
Widget build(BuildContext context, WidgetRef ref) {
|
Widget build(BuildContext context, WidgetRef ref) {
|
||||||
Future<void> onCreateAlbum() async {
|
Future<void> onCreateAlbum() async {
|
||||||
|
final selectedAssets = ref.read(multiSelectProvider).selectedAssets;
|
||||||
final newAlbum = await ref
|
final newAlbum = await ref
|
||||||
.read(remoteAlbumProvider.notifier)
|
.read(remoteAlbumProvider.notifier)
|
||||||
.createAlbum(
|
.createAlbumWithAssets(title: "Untitled Album", assets: selectedAssets);
|
||||||
title: "Untitled Album",
|
|
||||||
assetIds: ref.read(multiSelectProvider).selectedAssets.map((e) => (e as RemoteAsset).id).toList(),
|
|
||||||
);
|
|
||||||
|
|
||||||
if (newAlbum == null) {
|
if (newAlbum == null) {
|
||||||
ImmichToast.show(context: context, toastType: ToastType.error, msg: 'errors.failed_to_create_album'.tr());
|
ImmichToast.show(context: context, toastType: ToastType.error, msg: 'errors.failed_to_create_album'.tr());
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import 'package:immich_mobile/extensions/build_context_extensions.dart';
|
|||||||
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/images/thumbnail.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/images/thumbnail.widget.dart';
|
||||||
import 'package:immich_mobile/providers/album/pending_album_uploads.provider.dart';
|
import 'package:immich_mobile/providers/album/pending_album_uploads.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/backup/asset_upload_progress.provider.dart';
|
||||||
|
|
||||||
/// Pinned banner sliver that surfaces in-flight album uploads directly under
|
/// Pinned banner sliver that surfaces in-flight album uploads directly under
|
||||||
/// the album app bar. Renders nothing while the queue is empty. Tapping the
|
/// the album app bar. Renders nothing while the queue is empty. Tapping the
|
||||||
@@ -165,6 +166,8 @@ class _PendingUploadsSheet extends ConsumerWidget {
|
|||||||
}
|
}
|
||||||
|
|
||||||
final failedCount = pending.where((p) => p.failed).length;
|
final failedCount = pending.where((p) => p.failed).length;
|
||||||
|
final inFlightCount = pending.length - failedCount;
|
||||||
|
final canAbort = inFlightCount > 0 && ref.watch(manualUploadCancelTokenProvider) != null;
|
||||||
|
|
||||||
return SafeArea(
|
return SafeArea(
|
||||||
child: Padding(
|
child: Padding(
|
||||||
@@ -183,7 +186,21 @@ class _PendingUploadsSheet extends ConsumerWidget {
|
|||||||
style: context.textTheme.titleMedium,
|
style: context.textTheme.titleMedium,
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
if (failedCount > 0)
|
if (canAbort)
|
||||||
|
TextButton.icon(
|
||||||
|
onPressed: () {
|
||||||
|
final cancelToken = ref.read(manualUploadCancelTokenProvider);
|
||||||
|
if (cancelToken != null && !cancelToken.isCompleted) {
|
||||||
|
cancelToken.complete();
|
||||||
|
}
|
||||||
|
ref.read(manualUploadCancelTokenProvider.notifier).state = null;
|
||||||
|
ref.read(pendingAlbumUploadsProvider(albumId).notifier).clear();
|
||||||
|
},
|
||||||
|
icon: const Icon(Icons.stop_circle_outlined, size: 18),
|
||||||
|
label: Text('cancel'.t(context: context)),
|
||||||
|
style: TextButton.styleFrom(foregroundColor: context.colorScheme.error),
|
||||||
|
)
|
||||||
|
else if (failedCount > 0)
|
||||||
TextButton.icon(
|
TextButton.icon(
|
||||||
onPressed: () => ref.read(pendingAlbumUploadsProvider(albumId).notifier).clearFailed(),
|
onPressed: () => ref.read(pendingAlbumUploadsProvider(albumId).notifier).clearFailed(),
|
||||||
icon: const Icon(Icons.clear_rounded, size: 18),
|
icon: const Icon(Icons.clear_rounded, size: 18),
|
||||||
|
|||||||
@@ -3,9 +3,7 @@ import 'package:flutter/material.dart';
|
|||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/enums.dart';
|
import 'package:immich_mobile/constants/enums.dart';
|
||||||
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
|
||||||
import 'package:immich_mobile/domain/models/setting.model.dart';
|
import 'package:immich_mobile/domain/models/setting.model.dart';
|
||||||
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/advanced_info_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/advanced_info_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/archive_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/archive_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/bulk_tag_assets_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/bulk_tag_assets_action_button.widget.dart';
|
||||||
@@ -25,7 +23,7 @@ import 'package:immich_mobile/presentation/widgets/action_buttons/unstack_action
|
|||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/upload_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/upload_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/bottom_sheet/base_bottom_sheet.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/bottom_sheet/base_bottom_sheet.widget.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/action.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/setting.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/setting.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/user_metadata.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/user_metadata.provider.dart';
|
||||||
import 'package:immich_mobile/providers/server_info.provider.dart';
|
import 'package:immich_mobile/providers/server_info.provider.dart';
|
||||||
@@ -63,37 +61,23 @@ class _GeneralBottomSheetState extends ConsumerState<GeneralBottomSheet> {
|
|||||||
userMetadataPreferencesProvider.select((value) => value.valueOrNull?.tagsEnabled ?? false),
|
userMetadataPreferencesProvider.select((value) => value.valueOrNull?.tagsEnabled ?? false),
|
||||||
);
|
);
|
||||||
|
|
||||||
Future<void> addAssetsToAlbum(RemoteAlbum album) async {
|
Future<void> addToAlbum(RemoteAlbum album) async {
|
||||||
final selectedAssets = multiselect.selectedAssets;
|
final result = await ref.read(actionProvider.notifier).addToAlbum(ActionSource.timeline, album);
|
||||||
if (selectedAssets.isEmpty) {
|
|
||||||
|
if (!context.mounted) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final remoteAssets = selectedAssets.whereType<RemoteAsset>();
|
if (!result.success) {
|
||||||
final addedCount = await ref
|
ImmichToast.show(context: context, msg: 'scaffold_body_error_occurred'.tr(), toastType: ToastType.error);
|
||||||
.read(remoteAlbumProvider.notifier)
|
return;
|
||||||
.addAssets(album.id, remoteAssets.map((e) => e.id).toList());
|
|
||||||
|
|
||||||
if (selectedAssets.length != remoteAssets.length) {
|
|
||||||
ImmichToast.show(
|
|
||||||
context: context,
|
|
||||||
msg: 'add_to_album_bottom_sheet_some_local_assets'.t(context: context),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
ImmichToast.show(
|
||||||
if (addedCount != remoteAssets.length) {
|
context: context,
|
||||||
ImmichToast.show(
|
msg: result.count == 0
|
||||||
context: context,
|
? 'add_to_album_bottom_sheet_already_exists'.tr(namedArgs: {'album': album.name})
|
||||||
msg: 'add_to_album_bottom_sheet_already_exists'.tr(namedArgs: {"album": album.name}),
|
: 'add_to_album_bottom_sheet_added'.tr(namedArgs: {'album': album.name}),
|
||||||
);
|
);
|
||||||
} else {
|
|
||||||
ImmichToast.show(
|
|
||||||
context: context,
|
|
||||||
msg: 'add_to_album_bottom_sheet_added'.tr(namedArgs: {"album": album.name}),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
ref.read(multiSelectProvider.notifier).reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> onKeyboardExpand() {
|
Future<void> onKeyboardExpand() {
|
||||||
@@ -131,12 +115,10 @@ class _GeneralBottomSheetState extends ConsumerState<GeneralBottomSheet> {
|
|||||||
const DeleteLocalActionButton(source: ActionSource.timeline),
|
const DeleteLocalActionButton(source: ActionSource.timeline),
|
||||||
if (multiselect.onlyLocal) const UploadActionButton(source: ActionSource.timeline),
|
if (multiselect.onlyLocal) const UploadActionButton(source: ActionSource.timeline),
|
||||||
],
|
],
|
||||||
slivers: multiselect.hasRemote
|
slivers: [
|
||||||
? [
|
const AddToAlbumHeader(),
|
||||||
const AddToAlbumHeader(),
|
AlbumSelector(onAlbumSelected: addToAlbum, onKeyboardExpanded: onKeyboardExpand),
|
||||||
AlbumSelector(onAlbumSelected: addAssetsToAlbum, onKeyboardExpanded: onKeyboardExpand),
|
],
|
||||||
]
|
|
||||||
: [],
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,25 +1,78 @@
|
|||||||
|
import 'package:easy_localization/easy_localization.dart';
|
||||||
import 'package:flutter/material.dart';
|
import 'package:flutter/material.dart';
|
||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/enums.dart';
|
import 'package:immich_mobile/constants/enums.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/delete_local_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/delete_local_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/share_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/share_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/upload_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/upload_action_button.widget.dart';
|
||||||
|
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/bottom_sheet/base_bottom_sheet.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/bottom_sheet/base_bottom_sheet.widget.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/action.provider.dart';
|
||||||
|
import 'package:immich_mobile/widgets/common/immich_toast.dart';
|
||||||
|
|
||||||
class LocalAlbumBottomSheet extends ConsumerWidget {
|
class LocalAlbumBottomSheet extends ConsumerStatefulWidget {
|
||||||
const LocalAlbumBottomSheet({super.key});
|
const LocalAlbumBottomSheet({super.key});
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Widget build(BuildContext context, WidgetRef ref) {
|
ConsumerState<LocalAlbumBottomSheet> createState() => _LocalAlbumBottomSheetState();
|
||||||
return const BaseBottomSheet(
|
}
|
||||||
|
|
||||||
|
class _LocalAlbumBottomSheetState extends ConsumerState<LocalAlbumBottomSheet> {
|
||||||
|
late final DraggableScrollableController sheetController;
|
||||||
|
|
||||||
|
@override
|
||||||
|
void initState() {
|
||||||
|
super.initState();
|
||||||
|
sheetController = DraggableScrollableController();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void dispose() {
|
||||||
|
sheetController.dispose();
|
||||||
|
super.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Widget build(BuildContext context) {
|
||||||
|
Future<void> addToAlbum(RemoteAlbum album) async {
|
||||||
|
final result = await ref.read(actionProvider.notifier).addToAlbum(ActionSource.timeline, album);
|
||||||
|
|
||||||
|
if (!context.mounted) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!result.success) {
|
||||||
|
ImmichToast.show(context: context, msg: 'scaffold_body_error_occurred'.tr(), toastType: ToastType.error);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ImmichToast.show(
|
||||||
|
context: context,
|
||||||
|
msg: result.count == 0
|
||||||
|
? 'add_to_album_bottom_sheet_already_exists'.tr(namedArgs: {'album': album.name})
|
||||||
|
: 'add_to_album_bottom_sheet_added'.tr(namedArgs: {'album': album.name}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> onKeyboardExpand() {
|
||||||
|
return sheetController.animateTo(0.85, duration: const Duration(milliseconds: 200), curve: Curves.easeInOut);
|
||||||
|
}
|
||||||
|
|
||||||
|
return BaseBottomSheet(
|
||||||
|
controller: sheetController,
|
||||||
initialChildSize: 0.25,
|
initialChildSize: 0.25,
|
||||||
maxChildSize: 0.4,
|
maxChildSize: 0.85,
|
||||||
shouldCloseOnMinExtent: false,
|
shouldCloseOnMinExtent: false,
|
||||||
actions: [
|
actions: const [
|
||||||
ShareActionButton(source: ActionSource.timeline),
|
ShareActionButton(source: ActionSource.timeline),
|
||||||
DeleteLocalActionButton(source: ActionSource.timeline),
|
DeleteLocalActionButton(source: ActionSource.timeline),
|
||||||
UploadActionButton(source: ActionSource.timeline),
|
UploadActionButton(source: ActionSource.timeline),
|
||||||
],
|
],
|
||||||
|
slivers: [
|
||||||
|
const AddToAlbumHeader(),
|
||||||
|
AlbumSelector(onAlbumSelected: addToAlbum, onKeyboardExpanded: onKeyboardExpand),
|
||||||
|
],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+16
-21
@@ -2,7 +2,6 @@ import 'package:flutter/material.dart';
|
|||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/enums.dart';
|
import 'package:immich_mobile/constants/enums.dart';
|
||||||
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
|
||||||
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
import 'package:immich_mobile/extensions/translate_extensions.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/archive_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/archive_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/delete_local_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/delete_local_action_button.widget.dart';
|
||||||
@@ -21,7 +20,7 @@ import 'package:immich_mobile/presentation/widgets/action_buttons/trash_action_b
|
|||||||
import 'package:immich_mobile/presentation/widgets/action_buttons/unstack_action_button.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/action_buttons/unstack_action_button.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/album/album_selector.widget.dart';
|
||||||
import 'package:immich_mobile/presentation/widgets/bottom_sheet/base_bottom_sheet.widget.dart';
|
import 'package:immich_mobile/presentation/widgets/bottom_sheet/base_bottom_sheet.widget.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/action.provider.dart';
|
||||||
import 'package:immich_mobile/providers/server_info.provider.dart';
|
import 'package:immich_mobile/providers/server_info.provider.dart';
|
||||||
import 'package:immich_mobile/providers/timeline/multiselect.provider.dart';
|
import 'package:immich_mobile/providers/timeline/multiselect.provider.dart';
|
||||||
import 'package:immich_mobile/providers/user.provider.dart';
|
import 'package:immich_mobile/providers/user.provider.dart';
|
||||||
@@ -56,29 +55,28 @@ class _RemoteAlbumBottomSheetState extends ConsumerState<RemoteAlbumBottomSheet>
|
|||||||
final isTrashEnable = ref.watch(serverInfoProvider.select((state) => state.serverFeatures.trash));
|
final isTrashEnable = ref.watch(serverInfoProvider.select((state) => state.serverFeatures.trash));
|
||||||
final ownsAlbum = ref.watch(currentUserProvider)?.id == widget.album.ownerId;
|
final ownsAlbum = ref.watch(currentUserProvider)?.id == widget.album.ownerId;
|
||||||
|
|
||||||
Future<void> addAssetsToAlbum(RemoteAlbum album) async {
|
Future<void> addToAlbum(RemoteAlbum album) async {
|
||||||
final selectedAssets = multiselect.selectedAssets;
|
final result = await ref.read(actionProvider.notifier).addToAlbum(ActionSource.timeline, album);
|
||||||
if (selectedAssets.isEmpty) {
|
|
||||||
|
if (!context.mounted) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final addedCount = await ref
|
if (!result.success) {
|
||||||
.read(remoteAlbumProvider.notifier)
|
|
||||||
.addAssets(album.id, selectedAssets.map((e) => (e as RemoteAsset).id).toList());
|
|
||||||
|
|
||||||
if (addedCount != selectedAssets.length) {
|
|
||||||
ImmichToast.show(
|
ImmichToast.show(
|
||||||
context: context,
|
context: context,
|
||||||
msg: 'add_to_album_bottom_sheet_already_exists'.t(context: context, args: {"album": album.name}),
|
msg: 'scaffold_body_error_occurred'.t(context: context),
|
||||||
);
|
toastType: ToastType.error,
|
||||||
} else {
|
|
||||||
ImmichToast.show(
|
|
||||||
context: context,
|
|
||||||
msg: 'add_to_album_bottom_sheet_added'.t(context: context, args: {"album": album.name}),
|
|
||||||
);
|
);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ref.read(multiSelectProvider.notifier).reset();
|
ImmichToast.show(
|
||||||
|
context: context,
|
||||||
|
msg: result.count == 0
|
||||||
|
? 'add_to_album_bottom_sheet_already_exists'.t(context: context, args: {"album": album.name})
|
||||||
|
: 'add_to_album_bottom_sheet_added'.t(context: context, args: {"album": album.name}),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> onKeyboardExpand() {
|
Future<void> onKeyboardExpand() {
|
||||||
@@ -118,10 +116,7 @@ class _RemoteAlbumBottomSheetState extends ConsumerState<RemoteAlbumBottomSheet>
|
|||||||
SetAlbumCoverActionButton(source: ActionSource.timeline, albumId: widget.album.id),
|
SetAlbumCoverActionButton(source: ActionSource.timeline, albumId: widget.album.id),
|
||||||
],
|
],
|
||||||
slivers: ownsAlbum
|
slivers: ownsAlbum
|
||||||
? [
|
? [const AddToAlbumHeader(), AlbumSelector(onAlbumSelected: addToAlbum, onKeyboardExpanded: onKeyboardExpand)]
|
||||||
const AddToAlbumHeader(),
|
|
||||||
AlbumSelector(onAlbumSelected: addAssetsToAlbum, onKeyboardExpanded: onKeyboardExpand),
|
|
||||||
]
|
|
||||||
: null,
|
: null,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -67,6 +67,11 @@ class AlbumPendingUploadsNotifier extends AutoDisposeFamilyNotifier<List<Pending
|
|||||||
_syncKeepAlive();
|
_syncKeepAlive();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void clear() {
|
||||||
|
state = const [];
|
||||||
|
_syncKeepAlive();
|
||||||
|
}
|
||||||
|
|
||||||
void _syncKeepAlive() {
|
void _syncKeepAlive() {
|
||||||
if (state.isEmpty) {
|
if (state.isEmpty) {
|
||||||
_keepAliveLink?.close();
|
_keepAliveLink?.close();
|
||||||
|
|||||||
@@ -5,12 +5,15 @@ import 'package:background_downloader/background_downloader.dart';
|
|||||||
import 'package:flutter/material.dart';
|
import 'package:flutter/material.dart';
|
||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/enums.dart';
|
import 'package:immich_mobile/constants/enums.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/album/album.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
||||||
import 'package:immich_mobile/domain/models/asset_edit.model.dart';
|
import 'package:immich_mobile/domain/models/asset_edit.model.dart';
|
||||||
import 'package:immich_mobile/domain/services/asset.service.dart';
|
import 'package:immich_mobile/domain/services/asset.service.dart';
|
||||||
|
import 'package:immich_mobile/domain/services/remote_album.service.dart';
|
||||||
import 'package:immich_mobile/models/download/livephotos_medatada.model.dart';
|
import 'package:immich_mobile/models/download/livephotos_medatada.model.dart';
|
||||||
import 'package:immich_mobile/providers/asset_viewer/asset_viewer.provider.dart';
|
import 'package:immich_mobile/providers/asset_viewer/asset_viewer.provider.dart';
|
||||||
import 'package:immich_mobile/providers/backup/asset_upload_progress.provider.dart';
|
import 'package:immich_mobile/providers/backup/asset_upload_progress.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/asset.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/asset.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/asset_viewer/asset.provider.dart' show assetExifProvider;
|
import 'package:immich_mobile/providers/infrastructure/asset_viewer/asset.provider.dart' show assetExifProvider;
|
||||||
import 'package:immich_mobile/providers/infrastructure/tag.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/tag.provider.dart';
|
||||||
@@ -373,6 +376,52 @@ class ActionNotifier extends Notifier<void> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<ActionResult> addToAlbum(ActionSource source, RemoteAlbum album) async {
|
||||||
|
final selected = _getAssets(source).toList(growable: false);
|
||||||
|
if (selected.isEmpty) {
|
||||||
|
return const ActionResult(count: 0, success: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
final candidates = RemoteAlbumService.categorizeCandidates(selected);
|
||||||
|
final remoteIds = candidates.remoteAssetIds;
|
||||||
|
final localAssets = candidates.localAssetsToUpload;
|
||||||
|
final albumNotifier = ref.read(remoteAlbumProvider.notifier);
|
||||||
|
|
||||||
|
int addedRemote = 0;
|
||||||
|
if (remoteIds.isNotEmpty) {
|
||||||
|
try {
|
||||||
|
addedRemote = await albumNotifier.addAssets(album.id, remoteIds);
|
||||||
|
} catch (error, stack) {
|
||||||
|
_logger.severe('Failed to add assets to album ${album.id}', error, stack);
|
||||||
|
return ActionResult(count: 0, success: false, error: error.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep the selection available for retry if the remote add fails. Once the
|
||||||
|
// album mutation succeeds, clear timeline selection so upload overlays can render.
|
||||||
|
if (source == ActionSource.timeline) {
|
||||||
|
ref.read(multiSelectProvider.notifier).reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (localAssets.isEmpty) {
|
||||||
|
return ActionResult(count: addedRemote, success: true);
|
||||||
|
}
|
||||||
|
|
||||||
|
final uploadResult = await upload(
|
||||||
|
source,
|
||||||
|
assets: localAssets,
|
||||||
|
onAssetUploaded: (asset, remoteId) async {
|
||||||
|
await albumNotifier.linkUploadedAssetToAlbum(album.id, asset, remoteId);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
return ActionResult(
|
||||||
|
count: addedRemote + uploadResult.count,
|
||||||
|
success: uploadResult.success,
|
||||||
|
error: uploadResult.error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
Future<ActionResult> removeFromAlbum(ActionSource source, String albumId) async {
|
Future<ActionResult> removeFromAlbum(ActionSource source, String albumId) async {
|
||||||
final ids = _getRemoteIdsForSource(source);
|
final ids = _getRemoteIdsForSource(source);
|
||||||
try {
|
try {
|
||||||
@@ -495,8 +544,16 @@ class ActionNotifier extends Notifier<void> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<ActionResult> upload(ActionSource source, {List<LocalAsset>? assets}) async {
|
Future<ActionResult> upload(
|
||||||
|
ActionSource source, {
|
||||||
|
List<LocalAsset>? assets,
|
||||||
|
FutureOr<void> Function(LocalAsset asset, String remoteId)? onAssetUploaded,
|
||||||
|
}) async {
|
||||||
final assetsToUpload = assets ?? _getAssets(source).whereType<LocalAsset>().toList();
|
final assetsToUpload = assets ?? _getAssets(source).whereType<LocalAsset>().toList();
|
||||||
|
final assetById = {for (final a in assetsToUpload) a.id: a};
|
||||||
|
final uploadedAssetIds = <String>{};
|
||||||
|
final failedAssetIds = <String>{};
|
||||||
|
final postUploadTasks = <Future<void>>[];
|
||||||
|
|
||||||
final progressNotifier = ref.read(assetUploadProgressProvider.notifier);
|
final progressNotifier = ref.read(assetUploadProgressProvider.notifier);
|
||||||
final cancelToken = Completer<void>();
|
final cancelToken = Completer<void>();
|
||||||
@@ -518,16 +575,43 @@ class ActionNotifier extends Notifier<void> {
|
|||||||
},
|
},
|
||||||
onSuccess: (localAssetId, remoteAssetId) {
|
onSuccess: (localAssetId, remoteAssetId) {
|
||||||
progressNotifier.remove(localAssetId);
|
progressNotifier.remove(localAssetId);
|
||||||
|
uploadedAssetIds.add(localAssetId);
|
||||||
|
final asset = assetById[localAssetId];
|
||||||
|
final callback = onAssetUploaded;
|
||||||
|
if (asset != null && callback != null) {
|
||||||
|
postUploadTasks.add(
|
||||||
|
Future.sync(() => callback(asset, remoteAssetId)).catchError((Object error, StackTrace stack) {
|
||||||
|
failedAssetIds.add(localAssetId);
|
||||||
|
progressNotifier.setError(localAssetId);
|
||||||
|
_logger.warning('Post-upload callback failed for $localAssetId', error, stack);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
onError: (localAssetId, errorMessage) {
|
onError: (localAssetId, errorMessage) {
|
||||||
|
failedAssetIds.add(localAssetId);
|
||||||
progressNotifier.setError(localAssetId);
|
progressNotifier.setError(localAssetId);
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
return ActionResult(count: assetsToUpload.length, success: true);
|
|
||||||
|
await Future.wait(postUploadTasks);
|
||||||
|
final successCount = uploadedAssetIds.difference(failedAssetIds).length;
|
||||||
|
final isSuccess = successCount == assetsToUpload.length && failedAssetIds.isEmpty;
|
||||||
|
|
||||||
|
return ActionResult(
|
||||||
|
count: successCount,
|
||||||
|
success: isSuccess,
|
||||||
|
error: isSuccess ? null : 'Failed to upload ${assetsToUpload.length - successCount} assets',
|
||||||
|
);
|
||||||
} catch (error, stack) {
|
} catch (error, stack) {
|
||||||
_logger.severe('Failed manually upload assets', error, stack);
|
_logger.severe('Failed manually upload assets', error, stack);
|
||||||
return ActionResult(count: assetsToUpload.length, success: false, error: error.toString());
|
|
||||||
|
return ActionResult(
|
||||||
|
count: uploadedAssetIds.difference(failedAssetIds).length,
|
||||||
|
success: false,
|
||||||
|
error: error.toString(),
|
||||||
|
);
|
||||||
} finally {
|
} finally {
|
||||||
ref.read(manualUploadCancelTokenProvider.notifier).state = null;
|
ref.read(manualUploadCancelTokenProvider.notifier).state = null;
|
||||||
Future.delayed(const Duration(seconds: 2), () {
|
Future.delayed(const Duration(seconds: 2), () {
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
|
|
||||||
/// Provider holding a boolean function that returns true when cancellation is requested.
|
/// Holds the isolate's cancellation signal.
|
||||||
/// A computation running in the isolate uses the function to implement cooperative cancellation.
|
final cancellationProvider = Provider<Completer<void>>(
|
||||||
final cancellationProvider = Provider<bool Function()>(
|
|
||||||
// This will be overridden in the isolate's container.
|
// This will be overridden in the isolate's container.
|
||||||
// Throwing ensures it's not used without an override.
|
// Throwing ensures it's not used without an override.
|
||||||
(ref) => throw UnimplementedError(
|
(ref) => throw UnimplementedError(
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import 'package:immich_mobile/domain/services/remote_album.service.dart';
|
|||||||
import 'package:immich_mobile/models/albums/album_search.model.dart';
|
import 'package:immich_mobile/models/albums/album_search.model.dart';
|
||||||
import 'package:immich_mobile/providers/album/album_sort_by_options.provider.dart';
|
import 'package:immich_mobile/providers/album/album_sort_by_options.provider.dart';
|
||||||
import 'package:immich_mobile/providers/album/pending_album_uploads.provider.dart';
|
import 'package:immich_mobile/providers/album/pending_album_uploads.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/backup/asset_upload_progress.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
||||||
import 'package:immich_mobile/providers/user.provider.dart';
|
import 'package:immich_mobile/providers/user.provider.dart';
|
||||||
import 'package:immich_mobile/services/foreground_upload.service.dart';
|
import 'package:immich_mobile/services/foreground_upload.service.dart';
|
||||||
@@ -207,6 +208,22 @@ class RemoteAlbumNotifier extends Notifier<RemoteAlbumState> {
|
|||||||
return added;
|
return added;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Links a freshly-uploaded local asset to an album using its new remote ID,
|
||||||
|
/// upserting a placeholder remote asset row so the local DB join survives
|
||||||
|
/// until the next sync catches up.
|
||||||
|
Future<int> linkUploadedAssetToAlbum(String albumId, LocalAsset source, String remoteId) async {
|
||||||
|
final currentUser = ref.read(currentUserProvider);
|
||||||
|
if (currentUser == null) {
|
||||||
|
throw Exception('User not logged in');
|
||||||
|
}
|
||||||
|
|
||||||
|
final added = await _remoteAlbumService.linkUploadedAssetToAlbum(albumId, remoteId, currentUser, source);
|
||||||
|
if (added > 0) {
|
||||||
|
await _refreshAlbumInState(albumId);
|
||||||
|
}
|
||||||
|
return added;
|
||||||
|
}
|
||||||
|
|
||||||
/// Adds a heterogeneous asset selection to an album. Already-remote assets
|
/// Adds a heterogeneous asset selection to an album. Already-remote assets
|
||||||
/// are linked immediately; local-only assets are queued in
|
/// are linked immediately; local-only assets are queued in
|
||||||
/// [pendingAlbumUploadsProvider] (so the album page can show them with
|
/// [pendingAlbumUploadsProvider] (so the album page can show them with
|
||||||
@@ -221,11 +238,18 @@ class RemoteAlbumNotifier extends Notifier<RemoteAlbumState> {
|
|||||||
final pendingNotifier = ref.read(pendingAlbumUploadsProvider(albumId).notifier);
|
final pendingNotifier = ref.read(pendingAlbumUploadsProvider(albumId).notifier);
|
||||||
pendingNotifier.enqueue(candidates.localAssetsToUpload);
|
pendingNotifier.enqueue(candidates.localAssetsToUpload);
|
||||||
|
|
||||||
|
Completer<void>? cancelToken;
|
||||||
|
if (candidates.localAssetsToUpload.isNotEmpty) {
|
||||||
|
cancelToken = Completer<void>();
|
||||||
|
ref.read(manualUploadCancelTokenProvider.notifier).state = cancelToken;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final added = await _remoteAlbumService.addAssetsToAlbum(
|
final added = await _remoteAlbumService.addAssetsToAlbum(
|
||||||
albumId: albumId,
|
albumId: albumId,
|
||||||
uploader: currentUser,
|
uploader: currentUser,
|
||||||
candidates: candidates,
|
candidates: candidates,
|
||||||
|
cancelToken: cancelToken,
|
||||||
uploadCallbacks: UploadCallbacks(
|
uploadCallbacks: UploadCallbacks(
|
||||||
onProgress: (localAssetId, _, bytes, totalBytes) {
|
onProgress: (localAssetId, _, bytes, totalBytes) {
|
||||||
final progress = totalBytes > 0 ? bytes / totalBytes : 0.0;
|
final progress = totalBytes > 0 ? bytes / totalBytes : 0.0;
|
||||||
@@ -245,6 +269,15 @@ class RemoteAlbumNotifier extends Notifier<RemoteAlbumState> {
|
|||||||
}
|
}
|
||||||
_logger.severe('Failed to add assets to album $albumId', error, stack);
|
_logger.severe('Failed to add assets to album $albumId', error, stack);
|
||||||
rethrow;
|
rethrow;
|
||||||
|
} finally {
|
||||||
|
if (cancelToken != null) {
|
||||||
|
if (cancelToken.isCompleted) {
|
||||||
|
pendingNotifier.clear();
|
||||||
|
}
|
||||||
|
if (ref.read(manualUploadCancelTokenProvider) == cancelToken) {
|
||||||
|
ref.read(manualUploadCancelTokenProvider.notifier).state = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ final syncStreamServiceProvider = Provider(
|
|||||||
permissionRepository: ref.watch(permissionRepositoryProvider),
|
permissionRepository: ref.watch(permissionRepositoryProvider),
|
||||||
syncMigrationRepository: ref.watch(syncMigrationRepositoryProvider),
|
syncMigrationRepository: ref.watch(syncMigrationRepositoryProvider),
|
||||||
api: ref.watch(apiServiceProvider),
|
api: ref.watch(apiServiceProvider),
|
||||||
cancelChecker: ref.watch(cancellationProvider),
|
cancellation: ref.watch(cancellationProvider),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -42,6 +42,7 @@ final localSyncServiceProvider = Provider(
|
|||||||
assetMediaRepository: ref.watch(assetMediaRepositoryProvider),
|
assetMediaRepository: ref.watch(assetMediaRepositoryProvider),
|
||||||
permissionRepository: ref.watch(permissionRepositoryProvider),
|
permissionRepository: ref.watch(permissionRepositoryProvider),
|
||||||
nativeSyncApi: ref.watch(nativeSyncApiProvider),
|
nativeSyncApi: ref.watch(nativeSyncApiProvider),
|
||||||
|
cancellation: ref.watch(cancellationProvider),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -51,5 +52,6 @@ final hashServiceProvider = Provider(
|
|||||||
localAssetRepository: ref.watch(localAssetRepository),
|
localAssetRepository: ref.watch(localAssetRepository),
|
||||||
nativeSyncApi: ref.watch(nativeSyncApiProvider),
|
nativeSyncApi: ref.watch(nativeSyncApiProvider),
|
||||||
trashedLocalAssetRepository: ref.watch(trashedLocalAssetRepository),
|
trashedLocalAssetRepository: ref.watch(trashedLocalAssetRepository),
|
||||||
|
cancellation: ref.watch(cancellationProvider),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -41,8 +41,14 @@ class DriftAlbumApiRepository extends ApiRepository {
|
|||||||
return (removed: removed, failed: failed);
|
return (removed: removed, failed: failed);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<({List<String> added, List<String> failed})> addAssets(String albumId, Iterable<String> assetIds) async {
|
Future<({List<String> added, List<String> failed})> addAssets(
|
||||||
final response = await checkNull(_api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList())));
|
String albumId,
|
||||||
|
Iterable<String> assetIds, {
|
||||||
|
Future<void>? abortTrigger,
|
||||||
|
}) async {
|
||||||
|
final response = await checkNull(
|
||||||
|
_api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList()), abortTrigger: abortTrigger),
|
||||||
|
);
|
||||||
final List<String> added = [], failed = [];
|
final List<String> added = [], failed = [];
|
||||||
for (final dto in response) {
|
for (final dto in response) {
|
||||||
if (dto.success) {
|
if (dto.success) {
|
||||||
|
|||||||
@@ -8,10 +8,9 @@ import 'package:immich_mobile/entities/store.entity.dart';
|
|||||||
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
import 'package:immich_mobile/utils/debug_print.dart';
|
|
||||||
import 'package:immich_mobile/wm_executor.dart';
|
import 'package:immich_mobile/wm_executor.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
import 'package:worker_manager/worker_manager.dart' show Cancelable;
|
||||||
|
|
||||||
class InvalidIsolateUsageException implements Exception {
|
class InvalidIsolateUsageException implements Exception {
|
||||||
const InvalidIsolateUsageException();
|
const InvalidIsolateUsageException();
|
||||||
@@ -30,50 +29,27 @@ Cancelable<T?> runInIsolateGentle<T>({
|
|||||||
throw const InvalidIsolateUsageException();
|
throw const InvalidIsolateUsageException();
|
||||||
}
|
}
|
||||||
|
|
||||||
return workerManagerPatch.executeGentle((cancelledChecker) async {
|
return workerManagerPatch.executeGentle((onCancel) async {
|
||||||
T? result;
|
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
||||||
await runZonedGuarded(
|
DartPluginRegistrant.ensureInitialized();
|
||||||
() async {
|
|
||||||
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
|
||||||
DartPluginRegistrant.ensureInitialized();
|
|
||||||
|
|
||||||
final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false);
|
final log = Logger("IsolateLogger");
|
||||||
final ref = ProviderContainer(
|
final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false);
|
||||||
overrides: [
|
final ref = ProviderContainer(
|
||||||
cancellationProvider.overrideWithValue(cancelledChecker),
|
overrides: [cancellationProvider.overrideWithValue(onCancel), driftProvider.overrideWith(driftOverride(drift))],
|
||||||
driftProvider.overrideWith(driftOverride(drift)),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
|
|
||||||
Logger log = Logger("IsolateLogger");
|
|
||||||
|
|
||||||
try {
|
|
||||||
result = await computation(ref);
|
|
||||||
} on CanceledError {
|
|
||||||
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
|
|
||||||
} catch (error, stack) {
|
|
||||||
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
ref.dispose();
|
|
||||||
|
|
||||||
await Store.dispose();
|
|
||||||
await LogService.I.dispose();
|
|
||||||
await logDb.close();
|
|
||||||
await drift.close();
|
|
||||||
} catch (error, stack) {
|
|
||||||
dPrint(() => "Error closing resources in isolate: $error, $stack");
|
|
||||||
} finally {
|
|
||||||
ref.dispose();
|
|
||||||
// Delay to ensure all resources are released
|
|
||||||
await Future.delayed(const Duration(seconds: 2));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
(error, stack) {
|
|
||||||
dPrint(() => "Error in isolate $debugLabel zone: $error, $stack");
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
return result;
|
|
||||||
|
try {
|
||||||
|
return await computation(ref);
|
||||||
|
} catch (error, stack) {
|
||||||
|
log.severe("Error in runInIsolateGentle${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
ref.dispose();
|
||||||
|
await Store.dispose();
|
||||||
|
await LogService.I.dispose();
|
||||||
|
await logDb.close();
|
||||||
|
await drift.close();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,163 @@
|
|||||||
|
// Forked from worker_manager's `WorkerImpl` (src/worker/worker_io.dart): a
|
||||||
|
// `CancelRequest` completes the computation's [Completer] (so it can await
|
||||||
|
// cancellation and unwind) instead of flipping a polled flag, and [shutdown]
|
||||||
|
// lets the isolate drain and exit on its own rather than force-killing it. Only
|
||||||
|
// the gentle-with-cancellation path immich uses is kept.
|
||||||
|
//
|
||||||
|
// ignore_for_file: implementation_imports
|
||||||
|
|
||||||
|
import 'dart:async';
|
||||||
|
import 'dart:isolate';
|
||||||
|
|
||||||
|
import 'package:worker_manager/src/scheduling/task.dart';
|
||||||
|
import 'package:worker_manager/src/worker/cancel_request.dart';
|
||||||
|
import 'package:worker_manager/src/worker/result.dart';
|
||||||
|
|
||||||
|
/// A worker computation that receives a [Completer] which completes on
|
||||||
|
/// cancellation: await its future to react promptly, or read `isCompleted`.
|
||||||
|
typedef GentleExecution<R> = FutureOr<R> Function(Completer<void> onCancel);
|
||||||
|
|
||||||
|
class _Shutdown {
|
||||||
|
const _Shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
class IsolateWorker {
|
||||||
|
IsolateWorker();
|
||||||
|
|
||||||
|
Isolate? _isolate;
|
||||||
|
RawReceivePort? _receivePort;
|
||||||
|
SendPort? _sendPort;
|
||||||
|
Completer<void>? _sendPortReceived;
|
||||||
|
Completer? _result;
|
||||||
|
|
||||||
|
String? taskId;
|
||||||
|
|
||||||
|
bool get initialized => _sendPortReceived?.isCompleted ?? false;
|
||||||
|
|
||||||
|
bool get initializing {
|
||||||
|
final sendPortReceived = _sendPortReceived;
|
||||||
|
return sendPortReceived != null && !sendPortReceived.isCompleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> initialize() async {
|
||||||
|
final sendPortReceived = _sendPortReceived = Completer<void>();
|
||||||
|
final receivePort = _receivePort = RawReceivePort();
|
||||||
|
receivePort.handler = (Object message) {
|
||||||
|
if (message is SendPort) {
|
||||||
|
_sendPort = message;
|
||||||
|
sendPortReceived.complete();
|
||||||
|
} else if (message is ResultSuccess) {
|
||||||
|
_result?.complete(message.value);
|
||||||
|
_afterTask();
|
||||||
|
} else if (message is ResultError) {
|
||||||
|
_result?.completeError(message.error, message.stackTrace);
|
||||||
|
_afterTask();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
_isolate = await Isolate.spawn(_isolateEntry, receivePort.sendPort, errorsAreFatal: false);
|
||||||
|
await sendPortReceived.future;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<R> work<R>(Task<R> task) async {
|
||||||
|
taskId = task.id;
|
||||||
|
final result = _result = Completer();
|
||||||
|
_sendPort!.send(task.execution);
|
||||||
|
return await (result.future as Future<R>);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cancels the current task without retiring the worker.
|
||||||
|
void cancelGentle() => _sendPort?.send(CancelRequest());
|
||||||
|
|
||||||
|
/// Cancels any in-flight task and awaits the isolate exiting on its own — no
|
||||||
|
/// force-kill, so `finally` blocks and native cleanup always run.
|
||||||
|
///
|
||||||
|
/// Detaches the slot up front so a concurrent [initialize] can revive it
|
||||||
|
/// without colliding (revival installs fresh ports while this drains the ones
|
||||||
|
/// it captured locally). A revived worker is always idle, so the still-live
|
||||||
|
/// receive-port handler can't misroute a result.
|
||||||
|
Future<void> shutdown() async {
|
||||||
|
final sendPortReceived = _sendPortReceived;
|
||||||
|
if (sendPortReceived != null && !sendPortReceived.isCompleted) {
|
||||||
|
await sendPortReceived.future;
|
||||||
|
}
|
||||||
|
|
||||||
|
final isolate = _isolate;
|
||||||
|
final receivePort = _receivePort;
|
||||||
|
final sendPort = _sendPort;
|
||||||
|
if (isolate == null || receivePort == null || sendPort == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_isolate = null;
|
||||||
|
_sendPort = null;
|
||||||
|
_sendPortReceived = null;
|
||||||
|
// Not _result: an in-flight task still delivers it before exiting; nulling
|
||||||
|
// here would drop that and hang work()'s caller.
|
||||||
|
|
||||||
|
final exited = Completer<void>();
|
||||||
|
final exitPort = RawReceivePort();
|
||||||
|
exitPort.handler = (_) {
|
||||||
|
if (!exited.isCompleted) {
|
||||||
|
exited.complete();
|
||||||
|
}
|
||||||
|
exitPort.close();
|
||||||
|
};
|
||||||
|
isolate.addOnExitListener(exitPort.sendPort);
|
||||||
|
sendPort.send(const _Shutdown());
|
||||||
|
await exited.future;
|
||||||
|
receivePort.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
void _afterTask() {
|
||||||
|
taskId = null;
|
||||||
|
_result = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void _isolateEntry(SendPort sendPort) {
|
||||||
|
final receivePort = RawReceivePort();
|
||||||
|
sendPort.send(receivePort.sendPort);
|
||||||
|
// One task at a time, so a single completer suffices; null between tasks.
|
||||||
|
Completer<void>? onCancel;
|
||||||
|
void cancel() {
|
||||||
|
if (onCancel?.isCompleted == false) {
|
||||||
|
onCancel!.complete();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var shuttingDown = false;
|
||||||
|
var running = false;
|
||||||
|
receivePort.handler = (message) async {
|
||||||
|
if (message is _Shutdown) {
|
||||||
|
shuttingDown = true;
|
||||||
|
cancel();
|
||||||
|
if (!running) {
|
||||||
|
Isolate.exit();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (message is CancelRequest) {
|
||||||
|
cancel();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final execution = message as GentleExecution;
|
||||||
|
onCancel = Completer<void>();
|
||||||
|
running = true;
|
||||||
|
Result result;
|
||||||
|
try {
|
||||||
|
result = ResultSuccess(await execution(onCancel!));
|
||||||
|
} catch (error, stackTrace) {
|
||||||
|
result = ResultError(error, stackTrace);
|
||||||
|
} finally {
|
||||||
|
onCancel = null;
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
if (shuttingDown) {
|
||||||
|
// An isolate that has used platform channels can't exit on its own (Flutter's BackgroundIsolateBinaryMessenger
|
||||||
|
// opens an undisposable port), so closing our ports isn't enough. Isolate.exit delivers the result as its final
|
||||||
|
// message and terminates. It's abrupt (skips pending finally/microtasks) but safe here: the computation and its
|
||||||
|
// `finally` are already done and there's no await before this, so nothing pending is skipped.
|
||||||
|
Isolate.exit(sendPort, result);
|
||||||
|
}
|
||||||
|
sendPort.send(result);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
+31
-109
@@ -6,8 +6,8 @@ import 'dart:math';
|
|||||||
|
|
||||||
import 'package:collection/collection.dart';
|
import 'package:collection/collection.dart';
|
||||||
import 'package:flutter/foundation.dart';
|
import 'package:flutter/foundation.dart';
|
||||||
|
import 'package:immich_mobile/utils/isolate_worker.dart';
|
||||||
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
|
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
|
||||||
import 'package:worker_manager/src/worker/worker.dart';
|
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
import 'package:worker_manager/worker_manager.dart';
|
||||||
|
|
||||||
final workerManagerPatch = _Executor();
|
final workerManagerPatch = _Executor();
|
||||||
@@ -16,6 +16,13 @@ final workerManagerPatch = _Executor();
|
|||||||
const _minId = -9007199254740992;
|
const _minId = -9007199254740992;
|
||||||
const _maxId = 9007199254740992;
|
const _maxId = 9007199254740992;
|
||||||
|
|
||||||
|
class _GentleTask<R> extends Task<R> implements Gentle {
|
||||||
|
@override
|
||||||
|
final GentleExecution<R> execution;
|
||||||
|
|
||||||
|
_GentleTask({required super.id, required super.completer, required super.workPriority, required this.execution});
|
||||||
|
}
|
||||||
|
|
||||||
class Mixinable<T> {
|
class Mixinable<T> {
|
||||||
late final itSelf = this as T;
|
late final itSelf = this as T;
|
||||||
}
|
}
|
||||||
@@ -51,13 +58,13 @@ mixin _ExecutorLogger on Mixinable<_Executor> {
|
|||||||
|
|
||||||
class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
||||||
final _queue = PriorityQueue<Task>();
|
final _queue = PriorityQueue<Task>();
|
||||||
final _pool = <Worker>[];
|
final _pool = <IsolateWorker>[];
|
||||||
var _nextTaskId = _minId;
|
var _nextTaskId = _minId;
|
||||||
var _dynamicSpawning = false;
|
var _dynamicSpawning = false;
|
||||||
var _isolatesCount = numberOfProcessors;
|
var _isolatesCount = numberOfProcessors;
|
||||||
|
|
||||||
@visibleForTesting
|
@visibleForTesting
|
||||||
UnmodifiableListView<Worker> get pool => UnmodifiableListView(_pool);
|
UnmodifiableListView<IsolateWorker> get pool => UnmodifiableListView(_pool);
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
|
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
|
||||||
@@ -80,117 +87,37 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
|||||||
@override
|
@override
|
||||||
Future<void> dispose() async {
|
Future<void> dispose() async {
|
||||||
_queue.clear();
|
_queue.clear();
|
||||||
for (final worker in _pool) {
|
final shutdown = _pool.map((worker) => worker.shutdown()).toList(growable: false);
|
||||||
if (worker.initialized || worker.initializing) {
|
|
||||||
worker.kill();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_pool.clear();
|
_pool.clear();
|
||||||
|
await Future.wait(shutdown);
|
||||||
super.dispose();
|
super.dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
Cancelable<R> execute<R>(Execute<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
/// Runs [execution] on a worker isolate; its [Completer] completes when the
|
||||||
return _createCancelable<R>(execution: execution, priority: priority);
|
/// returned [Cancelable] is cancelled.
|
||||||
}
|
Cancelable<R> executeGentle<R>(GentleExecution<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
||||||
|
|
||||||
Cancelable<R> executeNow<R>(ExecuteGentle<R> execution) {
|
|
||||||
final task = TaskGentle<R>(
|
|
||||||
id: "",
|
|
||||||
workPriority: WorkPriority.immediately,
|
|
||||||
execution: execution,
|
|
||||||
completer: Completer<R>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
Future<void> run() async {
|
|
||||||
try {
|
|
||||||
final result = await execution(() => task.canceled);
|
|
||||||
task.complete(result, null, null);
|
|
||||||
} catch (error, st) {
|
|
||||||
task.complete(null, error, st);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
run();
|
|
||||||
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeWithPort<R, T>(
|
|
||||||
ExecuteWithPort<R> execution, {
|
|
||||||
WorkPriority priority = WorkPriority.immediately,
|
|
||||||
required void Function(T value) onMessage,
|
|
||||||
}) {
|
|
||||||
return _createCancelable<R>(
|
|
||||||
execution: execution,
|
|
||||||
priority: priority,
|
|
||||||
onMessage: (message) => onMessage(message as T),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeGentle<R>(ExecuteGentle<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
|
||||||
return _createCancelable<R>(execution: execution, priority: priority);
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeGentleWithPort<R, T>(
|
|
||||||
ExecuteGentleWithPort<R> execution, {
|
|
||||||
WorkPriority priority = WorkPriority.immediately,
|
|
||||||
required void Function(T value) onMessage,
|
|
||||||
}) {
|
|
||||||
return _createCancelable<R>(
|
|
||||||
execution: execution,
|
|
||||||
priority: priority,
|
|
||||||
onMessage: (message) => onMessage(message as T),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
void _createWorkers() {
|
|
||||||
for (var i = 0; i < _isolatesCount; i++) {
|
|
||||||
_pool.add(Worker());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<void> _initializeWorkers() async {
|
|
||||||
await Future.wait(_pool.map((e) => e.initialize()));
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> _createCancelable<R>({
|
|
||||||
required Function execution,
|
|
||||||
WorkPriority priority = WorkPriority.immediately,
|
|
||||||
void Function(Object value)? onMessage,
|
|
||||||
}) {
|
|
||||||
if (_nextTaskId + 1 == _maxId) {
|
if (_nextTaskId + 1 == _maxId) {
|
||||||
_nextTaskId = _minId;
|
_nextTaskId = _minId;
|
||||||
}
|
}
|
||||||
final id = _nextTaskId.toString();
|
final id = _nextTaskId.toString();
|
||||||
_nextTaskId++;
|
_nextTaskId++;
|
||||||
late final Task<R> task;
|
final task = _GentleTask<R>(id: id, workPriority: priority, execution: execution, completer: Completer<R>());
|
||||||
final completer = Completer<R>();
|
|
||||||
if (execution is ExecuteWithPort<R>) {
|
|
||||||
task = TaskWithPort<R>(
|
|
||||||
id: id,
|
|
||||||
workPriority: priority,
|
|
||||||
execution: execution,
|
|
||||||
completer: completer,
|
|
||||||
onMessage: onMessage!,
|
|
||||||
);
|
|
||||||
} else if (execution is ExecuteGentle<R>) {
|
|
||||||
task = TaskGentle<R>(id: id, workPriority: priority, execution: execution, completer: completer);
|
|
||||||
} else if (execution is ExecuteGentleWithPort<R>) {
|
|
||||||
task = TaskGentleWithPort<R>(
|
|
||||||
id: id,
|
|
||||||
workPriority: priority,
|
|
||||||
execution: execution,
|
|
||||||
completer: completer,
|
|
||||||
onMessage: onMessage!,
|
|
||||||
);
|
|
||||||
} else if (execution is Execute<R>) {
|
|
||||||
task = TaskRegular<R>(id: id, workPriority: priority, execution: execution, completer: completer);
|
|
||||||
}
|
|
||||||
_queue.add(task);
|
_queue.add(task);
|
||||||
_schedule();
|
_schedule();
|
||||||
logTaskAdded(task.id);
|
logTaskAdded(task.id);
|
||||||
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void _createWorkers() {
|
||||||
|
for (var i = 0; i < _isolatesCount; i++) {
|
||||||
|
_pool.add(IsolateWorker());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _initializeWorkers() async {
|
||||||
|
await Future.wait(_pool.map((e) => e.initialize()));
|
||||||
|
}
|
||||||
|
|
||||||
Future<void> _ensureWorkersInitialized() async {
|
Future<void> _ensureWorkersInitialized() async {
|
||||||
if (_pool.isEmpty) {
|
if (_pool.isEmpty) {
|
||||||
_createWorkers();
|
_createWorkers();
|
||||||
@@ -240,7 +167,9 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
|||||||
)
|
)
|
||||||
.whenComplete(() {
|
.whenComplete(() {
|
||||||
if (_dynamicSpawning && _queue.isEmpty) {
|
if (_dynamicSpawning && _queue.isEmpty) {
|
||||||
availableWorker.kill();
|
// Retire the idle worker; shutdown() nulls its fields so the husk
|
||||||
|
// stays pooled and is revived by initialize() if work arrives.
|
||||||
|
unawaited(availableWorker.shutdown());
|
||||||
}
|
}
|
||||||
_schedule();
|
_schedule();
|
||||||
});
|
});
|
||||||
@@ -250,15 +179,8 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
|||||||
void _cancel(Task task) {
|
void _cancel(Task task) {
|
||||||
task.cancel();
|
task.cancel();
|
||||||
_queue.remove(task);
|
_queue.remove(task);
|
||||||
final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id);
|
// All tasks are gentle: signal cancellation; the worker unwinds on its own.
|
||||||
if (task is Gentle) {
|
_pool.firstWhereOrNull((worker) => worker.taskId == task.id)?.cancelGentle();
|
||||||
targetWorker?.cancelGentle();
|
|
||||||
} else {
|
|
||||||
targetWorker?.kill();
|
|
||||||
if (!_dynamicSpawning) {
|
|
||||||
targetWorker?.initialize();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
super._cancel(task);
|
super._cancel(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -105,25 +105,26 @@ class CloudIdResult {
|
|||||||
|
|
||||||
@HostApi()
|
@HostApi()
|
||||||
abstract class NativeSyncApi {
|
abstract class NativeSyncApi {
|
||||||
|
@async
|
||||||
bool shouldFullSync();
|
bool shouldFullSync();
|
||||||
|
|
||||||
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
@async
|
||||||
SyncDelta getMediaChanges();
|
SyncDelta getMediaChanges();
|
||||||
|
|
||||||
void checkpointSync();
|
void checkpointSync();
|
||||||
|
|
||||||
void clearSyncCheckpoint();
|
void clearSyncCheckpoint();
|
||||||
|
|
||||||
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
@async
|
||||||
List<String> getAssetIdsForAlbum(String albumId);
|
List<String> getAssetIdsForAlbum(String albumId);
|
||||||
|
|
||||||
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
@async
|
||||||
List<PlatformAlbum> getAlbums();
|
List<PlatformAlbum> getAlbums();
|
||||||
|
|
||||||
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
||||||
int getAssetsCountSince(String albumId, int timestamp);
|
int getAssetsCountSince(String albumId, int timestamp);
|
||||||
|
|
||||||
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
@async
|
||||||
List<PlatformAsset> getAssetsForAlbum(String albumId, {int? updatedTimeCond});
|
List<PlatformAsset> getAssetsForAlbum(String albumId, {int? updatedTimeCond});
|
||||||
|
|
||||||
@async
|
@async
|
||||||
@@ -132,6 +133,8 @@ abstract class NativeSyncApi {
|
|||||||
|
|
||||||
void cancelHashing();
|
void cancelHashing();
|
||||||
|
|
||||||
|
void cancelSync();
|
||||||
|
|
||||||
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
@TaskQueue(type: TaskQueueType.serialBackgroundThread)
|
||||||
Map<String, List<PlatformAsset>> getTrashedAssets();
|
Map<String, List<PlatformAsset>> getTrashedAssets();
|
||||||
|
|
||||||
|
|||||||
@@ -36,13 +36,6 @@ class _AbortCallbackWrapper {
|
|||||||
|
|
||||||
class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {}
|
class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {}
|
||||||
|
|
||||||
class _CancellationWrapper {
|
|
||||||
const _CancellationWrapper();
|
|
||||||
|
|
||||||
bool call() => false;
|
|
||||||
}
|
|
||||||
|
|
||||||
class _MockCancellationWrapper extends Mock implements _CancellationWrapper {}
|
|
||||||
|
|
||||||
void main() {
|
void main() {
|
||||||
late SyncStreamService sut;
|
late SyncStreamService sut;
|
||||||
@@ -94,9 +87,13 @@ void main() {
|
|||||||
|
|
||||||
when(() => mockAbortCallbackWrapper()).thenReturn(false);
|
when(() => mockAbortCallbackWrapper()).thenReturn(false);
|
||||||
|
|
||||||
when(() => mockSyncApiRepo.streamChanges(any(), serverVersion: any(named: 'serverVersion'))).thenAnswer((
|
when(
|
||||||
invocation,
|
() => mockSyncApiRepo.streamChanges(
|
||||||
) async {
|
any(),
|
||||||
|
serverVersion: any(named: 'serverVersion'),
|
||||||
|
abortSignal: any(named: 'abortSignal'),
|
||||||
|
),
|
||||||
|
).thenAnswer((invocation) async {
|
||||||
handleEventsCallback = invocation.positionalArguments.first;
|
handleEventsCallback = invocation.positionalArguments.first;
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -105,6 +102,7 @@ void main() {
|
|||||||
any(),
|
any(),
|
||||||
onReset: any(named: 'onReset'),
|
onReset: any(named: 'onReset'),
|
||||||
serverVersion: any(named: 'serverVersion'),
|
serverVersion: any(named: 'serverVersion'),
|
||||||
|
abortSignal: any(named: 'abortSignal'),
|
||||||
),
|
),
|
||||||
).thenAnswer((invocation) async {
|
).thenAnswer((invocation) async {
|
||||||
handleEventsCallback = invocation.positionalArguments.first;
|
handleEventsCallback = invocation.positionalArguments.first;
|
||||||
@@ -233,8 +231,7 @@ void main() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test("aborts and stops processing if cancelled during iteration", () async {
|
test("aborts and stops processing if cancelled during iteration", () async {
|
||||||
final cancellationChecker = _MockCancellationWrapper();
|
final cancellation = Completer<void>();
|
||||||
when(() => cancellationChecker()).thenReturn(false);
|
|
||||||
|
|
||||||
sut = SyncStreamService(
|
sut = SyncStreamService(
|
||||||
syncApiRepository: mockSyncApiRepo,
|
syncApiRepository: mockSyncApiRepo,
|
||||||
@@ -243,7 +240,7 @@ void main() {
|
|||||||
trashedLocalAssetRepository: mockTrashedLocalAssetRepo,
|
trashedLocalAssetRepository: mockTrashedLocalAssetRepo,
|
||||||
assetMediaRepository: mockAssetMediaRepo,
|
assetMediaRepository: mockAssetMediaRepo,
|
||||||
permissionRepository: mockPermissionRepo,
|
permissionRepository: mockPermissionRepo,
|
||||||
cancelChecker: cancellationChecker.call,
|
cancellation: cancellation,
|
||||||
api: mockApi,
|
api: mockApi,
|
||||||
syncMigrationRepository: mockSyncMigrationRepo,
|
syncMigrationRepository: mockSyncMigrationRepo,
|
||||||
);
|
);
|
||||||
@@ -252,7 +249,7 @@ void main() {
|
|||||||
final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1];
|
final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1];
|
||||||
|
|
||||||
when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async {
|
when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async {
|
||||||
when(() => cancellationChecker()).thenReturn(true);
|
cancellation.complete();
|
||||||
});
|
});
|
||||||
|
|
||||||
await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call);
|
await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call);
|
||||||
@@ -267,8 +264,7 @@ void main() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
test("aborts and stops processing if cancelled before processing batch", () async {
|
test("aborts and stops processing if cancelled before processing batch", () async {
|
||||||
final cancellationChecker = _MockCancellationWrapper();
|
final cancellation = Completer<void>();
|
||||||
when(() => cancellationChecker()).thenReturn(false);
|
|
||||||
|
|
||||||
final processingCompleter = Completer<void>();
|
final processingCompleter = Completer<void>();
|
||||||
bool handler1Started = false;
|
bool handler1Started = false;
|
||||||
@@ -284,7 +280,7 @@ void main() {
|
|||||||
trashedLocalAssetRepository: mockTrashedLocalAssetRepo,
|
trashedLocalAssetRepository: mockTrashedLocalAssetRepo,
|
||||||
assetMediaRepository: mockAssetMediaRepo,
|
assetMediaRepository: mockAssetMediaRepo,
|
||||||
permissionRepository: mockPermissionRepo,
|
permissionRepository: mockPermissionRepo,
|
||||||
cancelChecker: cancellationChecker.call,
|
cancellation: cancellation,
|
||||||
api: mockApi,
|
api: mockApi,
|
||||||
syncMigrationRepository: mockSyncMigrationRepo,
|
syncMigrationRepository: mockSyncMigrationRepo,
|
||||||
);
|
);
|
||||||
@@ -303,7 +299,7 @@ void main() {
|
|||||||
expect(handler1Started, isTrue);
|
expect(handler1Started, isTrue);
|
||||||
|
|
||||||
// Signal cancellation while handler 1 is waiting
|
// Signal cancellation while handler 1 is waiting
|
||||||
when(() => cancellationChecker()).thenReturn(true);
|
cancellation.complete();
|
||||||
await pumpEventQueue();
|
await pumpEventQueue();
|
||||||
|
|
||||||
processingCompleter.complete();
|
processingCompleter.complete();
|
||||||
|
|||||||
@@ -0,0 +1,23 @@
|
|||||||
|
import 'package:flutter_test/flutter_test.dart';
|
||||||
|
import 'package:immich_mobile/wm_executor.dart';
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
tearDown(workerManagerPatch.dispose);
|
||||||
|
|
||||||
|
test('dispose() drains a cancelled task and delivers its result', () async {
|
||||||
|
await workerManagerPatch.init(isolatesCount: 1, dynamicSpawning: false);
|
||||||
|
|
||||||
|
final task = workerManagerPatch.executeGentle((onCancel) async {
|
||||||
|
await onCancel.future;
|
||||||
|
return 'drained';
|
||||||
|
});
|
||||||
|
|
||||||
|
await workerManagerPatch.dispose();
|
||||||
|
|
||||||
|
expect(
|
||||||
|
await task.timeout(const Duration(seconds: 5)),
|
||||||
|
'drained',
|
||||||
|
reason: 'the worker must finish and return its result, not be killed mid-task',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
@@ -152,13 +152,4 @@ export const Route = {
|
|||||||
// queues
|
// queues
|
||||||
queues: () => '/admin/queues',
|
queues: () => '/admin/queues',
|
||||||
viewQueue: ({ name }: { name: QueueName }) => `/admin/queues/${asQueueSlug(name)}`,
|
viewQueue: ({ name }: { name: QueueName }) => `/admin/queues/${asQueueSlug(name)}`,
|
||||||
|
|
||||||
// continue helper for ensuring same-origin URLs
|
|
||||||
continue: (url: string | null, fallback: string) => {
|
|
||||||
if (!url || !url.startsWith('/') || url.startsWith('//')) {
|
|
||||||
return fallback;
|
|
||||||
}
|
|
||||||
|
|
||||||
return url;
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -8,8 +8,7 @@ import type { PageLoad } from './$types';
|
|||||||
export const load = (async ({ parent, url }) => {
|
export const load = (async ({ parent, url }) => {
|
||||||
await parent();
|
await parent();
|
||||||
|
|
||||||
const continueUrl = Route.continue(url.searchParams.get('continue'), Route.photos());
|
const continueUrl = url.searchParams.get('continue') || Route.photos();
|
||||||
|
|
||||||
if (authManager.authenticated) {
|
if (authManager.authenticated) {
|
||||||
redirect(307, continueUrl);
|
redirect(307, continueUrl);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,7 +30,7 @@
|
|||||||
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 1000));
|
await new Promise((resolve) => setTimeout(resolve, 1000));
|
||||||
|
|
||||||
await goto(Route.continue(data.continueUrl, Route.photos()));
|
await goto(data.continueUrl);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
handleError(error, $t('wrong_pin_code'));
|
handleError(error, $t('wrong_pin_code'));
|
||||||
isBadPinCode = true;
|
isBadPinCode = true;
|
||||||
|
|||||||
Reference in New Issue
Block a user