fix: run reload within a mutex

This commit is contained in:
shenlong-tanwen 2025-07-08 23:08:06 +05:30
parent d03eb87058
commit 9ab0c604ea

View File

@ -108,6 +108,11 @@ class TimelineFactory {
class TimelineService { class TimelineService {
final TimelineAssetSource _assetSource; final TimelineAssetSource _assetSource;
final TimelineBucketSource _bucketSource; final TimelineBucketSource _bucketSource;
final AsyncMutex _mutex = AsyncMutex();
int _bufferOffset = 0;
List<BaseAsset> _buffer = [];
StreamSubscription? _bucketSubscription;
int _totalAssets = 0; int _totalAssets = 0;
int get totalAssets => _totalAssets; int get totalAssets => _totalAssets;
@ -117,24 +122,41 @@ class TimelineService {
}) : _assetSource = assetSource, }) : _assetSource = assetSource,
_bucketSource = bucketSource { _bucketSource = bucketSource {
_bucketSubscription = _bucketSource().listen((buckets) { _bucketSubscription = _bucketSource().listen((buckets) {
_totalAssets = _mutex.run(() async {
buckets.fold<int>(0, (acc, bucket) => acc + bucket.assetCount); final totalAssets =
unawaited(_reloadBucket()); buckets.fold<int>(0, (acc, bucket) => acc + bucket.assetCount);
if (totalAssets == 0) {
_bufferOffset = 0;
_buffer.clear();
// When the buffer is empty or the old bufferOffset is greater than the new total assets,
// we need to reset the buffer and load the first batch of assets.
} else {
final int offset;
final int count;
if (_bufferOffset >= totalAssets || _buffer.isEmpty) {
offset = 0;
count = kTimelineAssetLoadBatchSize;
} else {
offset = _bufferOffset;
count = math.min(
_buffer.length,
totalAssets - _bufferOffset,
);
}
_buffer = await _assetSource(offset, count);
_bufferOffset = offset;
}
EventStream.shared.emit(const TimelineReloadEvent());
// change the state's total assets count only after the buffer is reloaded
_totalAssets = totalAssets;
});
}); });
} }
final AsyncMutex _mutex = AsyncMutex();
int _bufferOffset = 0;
List<BaseAsset> _buffer = [];
StreamSubscription? _bucketSubscription;
Stream<List<Bucket>> Function() get watchBuckets => _bucketSource; Stream<List<Bucket>> Function() get watchBuckets => _bucketSource;
Future<void> _reloadBucket() => _mutex.run(() async {
_buffer = await _assetSource(_bufferOffset, _buffer.length);
EventStream.shared.emit(const TimelineReloadEvent());
});
Future<List<BaseAsset>> loadAssets(int index, int count) => Future<List<BaseAsset>> loadAssets(int index, int count) =>
_mutex.run(() => _loadAssets(index, count)); _mutex.run(() => _loadAssets(index, count));
@ -163,18 +185,20 @@ class TimelineService {
: (len > kTimelineAssetLoadBatchSize ? index : index + count - len), : (len > kTimelineAssetLoadBatchSize ? index : index + count - len),
); );
final assets = await _assetSource(start, len); _buffer = await _assetSource(start, len);
_buffer = assets;
_bufferOffset = start; _bufferOffset = start;
return getAssets(index, count); return getAssets(index, count);
} }
bool hasRange(int index, int count) => bool hasRange(int index, int count) =>
index >= _bufferOffset && index + count <= _bufferOffset + _buffer.length; index >= 0 &&
index < _totalAssets &&
index >= _bufferOffset &&
index + count <= _bufferOffset + _buffer.length &&
index + count <= _totalAssets;
List<BaseAsset> getAssets(int index, int count) { List<BaseAsset> getAssets(int index, int count) {
assert(index + count <= totalAssets);
if (!hasRange(index, count)) { if (!hasRange(index, count)) {
throw RangeError('TimelineService::getAssets Index out of range'); throw RangeError('TimelineService::getAssets Index out of range');
} }
@ -184,11 +208,13 @@ class TimelineService {
// Pre-cache assets around the given index for asset viewer // Pre-cache assets around the given index for asset viewer
Future<void> preCacheAssets(int index) => Future<void> preCacheAssets(int index) =>
_mutex.run(() => _loadAssets(index, 5)); _mutex.run(() => _loadAssets(index, math.min(5, _totalAssets - index)));
BaseAsset getAsset(int index) { BaseAsset getAsset(int index) {
if (!hasRange(index, 1)) { if (!hasRange(index, 1)) {
throw RangeError('TimelineService::getAsset Index out of range'); throw RangeError(
'TimelineService::getAsset Index $index not in buffer range [$_bufferOffset, ${_bufferOffset + _buffer.length})',
);
} }
return _buffer.elementAt(index - _bufferOffset); return _buffer.elementAt(index - _bufferOffset);
} }