From 9ab0c604eabaa767276b5f60d6b11557a029b68a Mon Sep 17 00:00:00 2001 From: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> Date: Tue, 8 Jul 2025 23:08:06 +0530 Subject: [PATCH] fix: run reload within a mutex --- .../lib/domain/services/timeline.service.dart | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/mobile/lib/domain/services/timeline.service.dart b/mobile/lib/domain/services/timeline.service.dart index 02ed552e0e..1c66e76bdf 100644 --- a/mobile/lib/domain/services/timeline.service.dart +++ b/mobile/lib/domain/services/timeline.service.dart @@ -108,6 +108,11 @@ class TimelineFactory { class TimelineService { final TimelineAssetSource _assetSource; final TimelineBucketSource _bucketSource; + final AsyncMutex _mutex = AsyncMutex(); + int _bufferOffset = 0; + List _buffer = []; + StreamSubscription? _bucketSubscription; + int _totalAssets = 0; int get totalAssets => _totalAssets; @@ -117,24 +122,41 @@ class TimelineService { }) : _assetSource = assetSource, _bucketSource = bucketSource { _bucketSubscription = _bucketSource().listen((buckets) { - _totalAssets = - buckets.fold(0, (acc, bucket) => acc + bucket.assetCount); - unawaited(_reloadBucket()); + _mutex.run(() async { + final totalAssets = + buckets.fold(0, (acc, bucket) => acc + bucket.assetCount); + + if (totalAssets == 0) { + _bufferOffset = 0; + _buffer.clear(); + // When the buffer is empty or the old bufferOffset is greater than the new total assets, + // we need to reset the buffer and load the first batch of assets. + } else { + final int offset; + final int count; + if (_bufferOffset >= totalAssets || _buffer.isEmpty) { + offset = 0; + count = kTimelineAssetLoadBatchSize; + } else { + offset = _bufferOffset; + count = math.min( + _buffer.length, + totalAssets - _bufferOffset, + ); + } + _buffer = await _assetSource(offset, count); + _bufferOffset = offset; + } + + EventStream.shared.emit(const TimelineReloadEvent()); + // change the state's total assets count only after the buffer is reloaded + _totalAssets = totalAssets; + }); }); } - final AsyncMutex _mutex = AsyncMutex(); - int _bufferOffset = 0; - List _buffer = []; - StreamSubscription? _bucketSubscription; - Stream> Function() get watchBuckets => _bucketSource; - Future _reloadBucket() => _mutex.run(() async { - _buffer = await _assetSource(_bufferOffset, _buffer.length); - EventStream.shared.emit(const TimelineReloadEvent()); - }); - Future> loadAssets(int index, int count) => _mutex.run(() => _loadAssets(index, count)); @@ -163,18 +185,20 @@ class TimelineService { : (len > kTimelineAssetLoadBatchSize ? index : index + count - len), ); - final assets = await _assetSource(start, len); - _buffer = assets; + _buffer = await _assetSource(start, len); _bufferOffset = start; return getAssets(index, count); } bool hasRange(int index, int count) => - index >= _bufferOffset && index + count <= _bufferOffset + _buffer.length; + index >= 0 && + index < _totalAssets && + index >= _bufferOffset && + index + count <= _bufferOffset + _buffer.length && + index + count <= _totalAssets; List getAssets(int index, int count) { - assert(index + count <= totalAssets); if (!hasRange(index, count)) { throw RangeError('TimelineService::getAssets Index out of range'); } @@ -184,11 +208,13 @@ class TimelineService { // Pre-cache assets around the given index for asset viewer Future preCacheAssets(int index) => - _mutex.run(() => _loadAssets(index, 5)); + _mutex.run(() => _loadAssets(index, math.min(5, _totalAssets - index))); BaseAsset getAsset(int index) { if (!hasRange(index, 1)) { - throw RangeError('TimelineService::getAsset Index out of range'); + throw RangeError( + 'TimelineService::getAsset Index $index not in buffer range [$_bufferOffset, ${_bufferOffset + _buffer.length})', + ); } return _buffer.elementAt(index - _bufferOffset); }