From 0e1be6ce30ae42f3efe0128069a18e11022dca5b Mon Sep 17 00:00:00 2001 From: JPVenson Date: Mon, 16 Jun 2025 00:21:11 +0300 Subject: [PATCH] Use proper scheduler that honors the parallel task limit (#14281) --- .../ApplicationHost.cs | 3 + .../Tasks/RefreshMediaLibraryTask.cs | 4 +- MediaBrowser.Controller/Entities/Folder.cs | 60 +--- .../ILimitedConcurrencyLibraryScheduler.cs | 23 ++ .../LimitedConcurrencyLibraryScheduler.cs | 316 ++++++++++++++++++ 5 files changed, 356 insertions(+), 50 deletions(-) create mode 100644 MediaBrowser.Controller/LibraryTaskScheduler/ILimitedConcurrencyLibraryScheduler.cs create mode 100644 MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs diff --git a/Emby.Server.Implementations/ApplicationHost.cs b/Emby.Server.Implementations/ApplicationHost.cs index 565d0f0c85..cbb0f6c565 100644 --- a/Emby.Server.Implementations/ApplicationHost.cs +++ b/Emby.Server.Implementations/ApplicationHost.cs @@ -62,6 +62,7 @@ using MediaBrowser.Controller.Entities; using MediaBrowser.Controller.Entities.TV; using MediaBrowser.Controller.IO; using MediaBrowser.Controller.Library; +using MediaBrowser.Controller.LibraryTaskScheduler; using MediaBrowser.Controller.LiveTv; using MediaBrowser.Controller.Lyrics; using MediaBrowser.Controller.MediaEncoding; @@ -552,6 +553,7 @@ namespace Emby.Server.Implementations serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); + serviceCollection.AddSingleton(); serviceCollection.AddSingleton(); @@ -650,6 +652,7 @@ namespace Emby.Server.Implementations CollectionFolder.ApplicationHost = this; Folder.UserViewManager = Resolve(); Folder.CollectionManager = Resolve(); + Folder.LimitedConcurrencyLibraryScheduler = Resolve(); Episode.MediaEncoder = Resolve(); UserView.TVSeriesManager = Resolve(); Video.RecordingsManager = Resolve(); diff --git a/Emby.Server.Implementations/ScheduledTasks/Tasks/RefreshMediaLibraryTask.cs b/Emby.Server.Implementations/ScheduledTasks/Tasks/RefreshMediaLibraryTask.cs index c96843199c..1865189d0a 100644 --- a/Emby.Server.Implementations/ScheduledTasks/Tasks/RefreshMediaLibraryTask.cs +++ b/Emby.Server.Implementations/ScheduledTasks/Tasks/RefreshMediaLibraryTask.cs @@ -54,12 +54,12 @@ public class RefreshMediaLibraryTask : IScheduledTask } /// - public Task ExecuteAsync(IProgress progress, CancellationToken cancellationToken) + public async Task ExecuteAsync(IProgress progress, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); progress.Report(0); - return ((LibraryManager)_libraryManager).ValidateMediaLibraryInternal(progress, cancellationToken); + await ((LibraryManager)_libraryManager).ValidateMediaLibraryInternal(progress, cancellationToken).ConfigureAwait(false); } } diff --git a/MediaBrowser.Controller/Entities/Folder.cs b/MediaBrowser.Controller/Entities/Folder.cs index e0c3b0a938..732a03a2ae 100644 --- a/MediaBrowser.Controller/Entities/Folder.cs +++ b/MediaBrowser.Controller/Entities/Folder.cs @@ -11,7 +11,6 @@ using System.Security; using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; using J2N.Collections.Generic.Extensions; using Jellyfin.Data; using Jellyfin.Data.Enums; @@ -25,6 +24,7 @@ using MediaBrowser.Controller.Dto; using MediaBrowser.Controller.Entities.Audio; using MediaBrowser.Controller.Entities.Movies; using MediaBrowser.Controller.Library; +using MediaBrowser.Controller.LibraryTaskScheduler; using MediaBrowser.Controller.Providers; using MediaBrowser.Model.Dto; using MediaBrowser.Model.IO; @@ -49,6 +49,8 @@ namespace MediaBrowser.Controller.Entities public static IUserViewManager UserViewManager { get; set; } + public static ILimitedConcurrencyLibraryScheduler LimitedConcurrencyLibraryScheduler { get; set; } + /// /// Gets or sets a value indicating whether this instance is root. /// @@ -598,51 +600,13 @@ namespace MediaBrowser.Controller.Entities /// Task. private async Task RunTasks(Func, Task> task, IList children, IProgress progress, CancellationToken cancellationToken) { - var childrenCount = children.Count; - var childrenProgress = new double[childrenCount]; - - void UpdateProgress() - { - progress.Report(childrenProgress.Average()); - } - - var fanoutConcurrency = ConfigurationManager.Configuration.LibraryScanFanoutConcurrency; - var parallelism = fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount; - - var actionBlock = new ActionBlock( - async i => - { - var innerProgress = new Progress(innerPercent => - { - // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls - var innerPercentRounded = Math.Round(innerPercent); - if (childrenProgress[i] != innerPercentRounded) - { - childrenProgress[i] = innerPercentRounded; - UpdateProgress(); - } - }); - - await task(children[i], innerProgress).ConfigureAwait(false); - - childrenProgress[i] = 100; - - UpdateProgress(); - }, - new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = parallelism, - CancellationToken = cancellationToken, - }); - - for (var i = 0; i < childrenCount; i++) - { - await actionBlock.SendAsync(i, cancellationToken).ConfigureAwait(false); - } - - actionBlock.Complete(); - - await actionBlock.Completion.ConfigureAwait(false); + await LimitedConcurrencyLibraryScheduler + .Enqueue( + children.ToArray(), + task, + progress, + cancellationToken) + .ConfigureAwait(false); } /// @@ -1008,7 +972,7 @@ namespace MediaBrowser.Controller.Entities items = CollapseBoxSetItemsIfNeeded(items, query, this, user, ConfigurationManager, CollectionManager); } - #pragma warning disable CA1309 +#pragma warning disable CA1309 if (!string.IsNullOrEmpty(query.NameStartsWithOrGreater)) { items = items.Where(i => string.Compare(query.NameStartsWithOrGreater, i.SortName, StringComparison.InvariantCultureIgnoreCase) < 1); @@ -1023,7 +987,7 @@ namespace MediaBrowser.Controller.Entities { items = items.Where(i => string.Compare(query.NameLessThan, i.SortName, StringComparison.InvariantCultureIgnoreCase) == 1); } - #pragma warning restore CA1309 +#pragma warning restore CA1309 // This must be the last filter if (!query.AdjacentTo.IsNullOrEmpty()) diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/ILimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/ILimitedConcurrencyLibraryScheduler.cs new file mode 100644 index 0000000000..e7460a2e65 --- /dev/null +++ b/MediaBrowser.Controller/LibraryTaskScheduler/ILimitedConcurrencyLibraryScheduler.cs @@ -0,0 +1,23 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Model.Configuration; + +namespace MediaBrowser.Controller.LibraryTaskScheduler; + +/// +/// Provides a shared scheduler to run library related tasks based on the . +/// +public interface ILimitedConcurrencyLibraryScheduler +{ + /// + /// Enqueues an action that will be invoked with the set data. + /// + /// The data Type. + /// The data. + /// The callback to process the data. + /// A progress reporter. + /// Stop token. + /// A task that finishes when all data has been processed by the worker. + Task Enqueue(T[] data, Func, Task> worker, IProgress progress, CancellationToken cancellationToken); +} diff --git a/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs new file mode 100644 index 0000000000..f1428d6b97 --- /dev/null +++ b/MediaBrowser.Controller/LibraryTaskScheduler/LimitedConcurrencyLibraryScheduler.cs @@ -0,0 +1,316 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MediaBrowser.Controller.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace MediaBrowser.Controller.LibraryTaskScheduler; + +/// +/// Provides Parallel action interface to process tasks with a set concurrency level. +/// +public sealed class LimitedConcurrencyLibraryScheduler : ILimitedConcurrencyLibraryScheduler, IAsyncDisposable +{ + private const int CleanupGracePeriod = 60; + private readonly IHostApplicationLifetime _hostApplicationLifetime; + private readonly ILogger _logger; + private readonly IServerConfigurationManager _serverConfigurationManager; + private readonly Dictionary _taskRunners = new(); + + private static readonly AsyncLocal _deadlockDetector = new(); + + /// + /// Gets used to lock all operations on the Tasks queue and creating workers. + /// + private readonly Lock _taskLock = new(); + + private readonly BlockingCollection _tasks = new(); + + private volatile int _workCounter; + private Task? _cleanupTask; + private bool _disposed; + + /// + /// Initializes a new instance of the class. + /// + /// The hosting lifetime. + /// The logger. + /// The server configuration manager. + public LimitedConcurrencyLibraryScheduler( + IHostApplicationLifetime hostApplicationLifetime, + ILogger logger, + IServerConfigurationManager serverConfigurationManager) + { + _hostApplicationLifetime = hostApplicationLifetime; + _logger = logger; + _serverConfigurationManager = serverConfigurationManager; + } + + private void ScheduleTaskCleanup() + { + lock (_taskLock) + { + if (_cleanupTask is not null) + { + _logger.LogDebug("Cleanup task already scheduled."); + // cleanup task is already running. + return; + } + + _cleanupTask = RunCleanupTask(); + } + + async Task RunCleanupTask() + { + _logger.LogDebug("Schedule cleanup task in {CleanupGracePerioid} sec.", CleanupGracePeriod); + await Task.Delay(TimeSpan.FromSeconds(CleanupGracePeriod)).ConfigureAwait(false); + if (_disposed) + { + _logger.LogDebug("Abort cleaning up, already disposed."); + return; + } + + lock (_taskLock) + { + if (_tasks.Count > 0 || _workCounter > 0) + { + _logger.LogDebug("Delay cleanup task, operations still running."); + // tasks are still there so its still in use. Reschedule cleanup task. + // we cannot just exit here and rely on the other invoker because there is a considerable timeframe where it could have already ended. + _cleanupTask = RunCleanupTask(); + return; + } + } + + _logger.LogDebug("Cleanup runners."); + foreach (var item in _taskRunners.ToArray()) + { + await item.Key.CancelAsync().ConfigureAwait(false); + _taskRunners.Remove(item.Key); + } + } + } + + private void Worker() + { + lock (_taskLock) + { + var fanoutConcurrency = _serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency; + var parallelism = (fanoutConcurrency > 0 ? fanoutConcurrency : Environment.ProcessorCount) - _taskRunners.Count; + _logger.LogDebug("Spawn {NumberRunners} new runners.", parallelism); + for (int i = 0; i < parallelism; i++) + { + var stopToken = new CancellationTokenSource(); + var combinedSource = CancellationTokenSource.CreateLinkedTokenSource(stopToken.Token, _hostApplicationLifetime.ApplicationStopping); + _taskRunners.Add( + combinedSource, + Task.Factory.StartNew( + ItemWorker, + (combinedSource, stopToken), + combinedSource.Token, + TaskCreationOptions.PreferFairness, + TaskScheduler.Default)); + } + } + } + + private async Task ItemWorker(object? obj) + { + var stopToken = ((CancellationTokenSource TaskStop, CancellationTokenSource GlobalStop))obj!; + _deadlockDetector.Value = stopToken.TaskStop; + try + { + foreach (var item in _tasks.GetConsumingEnumerable(stopToken.GlobalStop.Token)) + { + stopToken.GlobalStop.Token.ThrowIfCancellationRequested(); + try + { + var newWorkerLimit = Interlocked.Increment(ref _workCounter) > 0; + Debug.Assert(newWorkerLimit, "_workCounter > 0"); + _logger.LogDebug("Process new item '{Data}'.", item.Data); + await ProcessItem(item).ConfigureAwait(false); + } + finally + { + var newWorkerLimit = Interlocked.Decrement(ref _workCounter) >= 0; + Debug.Assert(newWorkerLimit, "_workCounter > 0"); + } + } + } + catch (OperationCanceledException) when (stopToken.TaskStop.IsCancellationRequested) + { + // thats how you do it, interupt the waiter thread. There is nothing to do here when it was on purpose. + } + finally + { + _logger.LogDebug("Cleanup Runner'."); + _deadlockDetector.Value = default!; + _taskRunners.Remove(stopToken.TaskStop); + stopToken.GlobalStop.Dispose(); + stopToken.TaskStop.Dispose(); + } + } + + private async Task ProcessItem(TaskQueueItem item) + { + try + { + if (item.CancellationToken.IsCancellationRequested) + { + // if item is cancelled, just skip it + return; + } + + await item.Worker(item.Data).ConfigureAwait(true); + } + catch (System.Exception ex) + { + _logger.LogError(ex, "Error while performing a library operation"); + } + finally + { + item.Progress.Report(100); + item.Done.SetResult(); + } + } + + /// + public async Task Enqueue(T[] data, Func, Task> worker, IProgress progress, CancellationToken cancellationToken) + { + if (_disposed) + { + return; + } + + if (data.Length == 0 || cancellationToken.IsCancellationRequested) + { + progress.Report(100); + return; + } + + _logger.LogDebug("Enqueue new Workset of {NoItems} items.", data.Length); + + TaskQueueItem[] workItems = null!; + + void UpdateProgress() + { + progress.Report(workItems.Select(e => e.ProgressValue).Average()); + } + + workItems = data.Select(item => + { + TaskQueueItem queueItem = null!; + return queueItem = new TaskQueueItem() + { + Data = item!, + Progress = new Progress(innerPercent => + { + // round the percent and only update progress if it changed to prevent excessive UpdateProgress calls + var innerPercentRounded = Math.Round(innerPercent); + if (queueItem.ProgressValue != innerPercentRounded) + { + queueItem.ProgressValue = innerPercentRounded; + UpdateProgress(); + } + }), + Worker = (val) => worker((T)val, queueItem.Progress), + CancellationToken = cancellationToken + }; + }).ToArray(); + + if (_serverConfigurationManager.Configuration.LibraryScanFanoutConcurrency == 1) + { + _logger.LogDebug("Process sequentially."); + try + { + foreach (var item in workItems) + { + await ProcessItem(item).ConfigureAwait(false); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // operation is cancelled. Do nothing. + } + + _logger.LogDebug("Process sequentially done."); + return; + } + + for (var i = 0; i < workItems.Length; i++) + { + var item = workItems[i]!; + _tasks.Add(item, CancellationToken.None); + } + + if (_deadlockDetector.Value is not null) + { + _logger.LogDebug("Nested invocation detected, process in-place."); + try + { + // we are in a nested loop. There is no reason to spawn a task here as that would just lead to deadlocks and no additional concurrency is achieved + while (workItems.Any(e => !e.Done.Task.IsCompleted) && _tasks.TryTake(out var item, 200, _deadlockDetector.Value.Token)) + { + await ProcessItem(item).ConfigureAwait(false); + } + } + catch (OperationCanceledException) when (_deadlockDetector.Value.IsCancellationRequested) + { + // operation is cancelled. Do nothing. + } + + _logger.LogDebug("process in-place done."); + } + else + { + Worker(); + _logger.LogDebug("Wait for {NoWorkers} to complete.", workItems.Length); + await Task.WhenAll([.. workItems.Select(f => f.Done.Task)]).ConfigureAwait(false); + _logger.LogDebug("{NoWorkers} completed.", workItems.Length); + ScheduleTaskCleanup(); + } + } + + /// + public async ValueTask DisposeAsync() + { + if (_disposed) + { + return; + } + + _disposed = true; + _tasks.CompleteAdding(); + foreach (var item in _taskRunners) + { + await item.Key.CancelAsync().ConfigureAwait(false); + } + + _tasks.Dispose(); + if (_cleanupTask is not null) + { + await _cleanupTask.ConfigureAwait(false); + _cleanupTask?.Dispose(); + } + } + + private class TaskQueueItem + { + public required object Data { get; init; } + + public double ProgressValue { get; set; } + + public required Func Worker { get; init; } + + public required IProgress Progress { get; init; } + + public TaskCompletionSource Done { get; } = new(); + + public CancellationToken CancellationToken { get; init; } + } +}