using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Kyoo.Models.Exceptions;
using Kyoo.Models.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Kyoo.Controllers
{
	/// <summary>
	/// A service to handle long running tasks and a background runner.
	/// </summary>
	/// <remarks>Task will be queued, only one can run simultaneously.</remarks>
	public class TaskManager : BackgroundService, ITaskManager
	{
		/// <summary>
		/// The service provider used to create the scope from which services are injected into tasks.
		/// </summary>
		private readonly IServiceProvider _provider;

		/// <summary>
		/// The configuration instance used to get schedule information.
		/// </summary>
		private readonly IOptionsMonitor<TaskOptions> _options;

		/// <summary>
		/// The logger instance.
		/// </summary>
		private readonly ILogger<TaskManager> _logger;

		/// <summary>
		/// The list of tasks and their next scheduled run.
		/// </summary>
		private readonly List<(ITask task, DateTime scheduledDate)> _tasks;

		/// <summary>
		/// The queue of tasks that should be run as soon as possible.
		/// </summary>
		private readonly Queue<(ITask, IProgress<float>, Dictionary<string, object>)> _queuedTasks = new();

		/// <summary>
		/// The currently running task, or <c>null</c> when the runner is idle.
		/// </summary>
		private ITask _runningTask;

		/// <summary>
		/// The cancellation token used to cancel the running task when the runner should shutdown.
		/// </summary>
		private readonly CancellationTokenSource _taskToken = new();


		/// <summary>
		/// Create a new <see cref="TaskManager"/>.
		/// </summary>
		/// <param name="tasks">The list of tasks to manage</param>
		/// <param name="provider">The service provider to request services for tasks</param>
		/// <param name="options">The configuration to load schedule information.</param>
		/// <param name="logger">The logger.</param>
		public TaskManager(IEnumerable<ITask> tasks,
			IServiceProvider provider,
			IOptionsMonitor<TaskOptions> options,
			ILogger<TaskManager> logger)
		{
			_provider = provider;
			_options = options;
			_logger = logger;
			// _options must be assigned before this line: GetNextTaskDate reads it.
			_tasks = tasks.Select(x => (x, GetNextTaskDate(x.Slug))).ToList();

			if (_tasks.Any())
				_logger.LogTrace("Task manager initiated with: {Tasks}", _tasks.Select(x => x.task.Name));
			else
				_logger.LogInformation("Task manager initiated without any tasks");
		}


		/// <summary>
		/// Triggered when the application host is ready to start the service.
		/// </summary>
		/// <remarks>Start the runner in another thread.</remarks>
		/// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
		/// <returns>An already completed task: the host does not wait for the runner to start.</returns>
		public override Task StartAsync(CancellationToken cancellationToken)
		{
			// The base implementation is deliberately not awaited so that the synchronous
			// part of ExecuteAsync does not run on the caller's thread.
			Task.Run(() => base.StartAsync(cancellationToken), CancellationToken.None);
			return Task.CompletedTask;
		}

		/// <inheritdoc />
		/// <remarks>The token handed to the running task is cancelled before the runner itself is stopped.</remarks>
		public override Task StopAsync(CancellationToken cancellationToken)
		{
			_taskToken.Cancel();
			return base.StopAsync(cancellationToken);
		}

		/// <summary>
		/// The runner that will host tasks and run queued tasks.
		/// </summary>
		/// <param name="cancellationToken">A token to stop the runner</param>
		/// <returns>A task that completes when the runner stops.</returns>
		protected override async Task ExecuteAsync(CancellationToken cancellationToken)
		{
			EnqueueStartupTasks();

			while (!cancellationToken.IsCancellationRequested)
			{
				if (_queuedTasks.Any())
				{
					(ITask task, IProgress<float> progress, Dictionary<string, object> args) = _queuedTasks.Dequeue();
					_runningTask = task;
					try
					{
						await RunTask(task, progress, args);
					}
					catch (TaskFailedException ex)
					{
						_logger.LogWarning("The task \"{Task}\" failed: {Message}", task.Name, ex.Message);
					}
					catch (Exception e)
					{
						_logger.LogError(e, "An unhandled exception occured while running the task {Task}", task.Name);
					}
					finally
					{
						// Whatever the outcome, the task is no longer running: without this reset
						// GetRunningTasks would keep reporting a task that has already ended.
						_runningTask = null;
					}
				}
				else
				{
					// Nothing queued: wait a second, then check whether a scheduled task is due.
					await Task.Delay(1000, cancellationToken);
					QueueScheduledTasks();
				}
			}
		}

		/// <summary>
		/// Parse parameters, inject a task and run it.
		/// </summary>
		/// <param name="task">The task to run</param>
		/// <param name="progress">A progress reporter to know the percentage of completion of the task.</param>
		/// <param name="arguments">The arguments to pass to the function</param>
		/// <exception cref="ArgumentException">
		/// If the number of arguments is invalid, if an argument can't be converted or if the task finds the argument
		/// invalid.
		/// </exception>
		private async Task RunTask(ITask task,
			[NotNull] IProgress<float> progress,
			Dictionary<string, object> arguments)
		{
			_logger.LogInformation("Task starting: {Task}", task.Name);

			ICollection<TaskParameter> all = task.GetParameters();

			// Reject every argument whose name matches none of the parameters declared by the task.
			ICollection<string> invalids = arguments.Keys
				.Where(x => all.All(y => x != y.Name))
				.ToArray();
			if (invalids.Any())
			{
				string invalidsStr = string.Join(", ", invalids);
				throw new ArgumentException($"{invalidsStr} are invalid arguments for the task {task.Name}");
			}

			TaskParameters args = new(all
				.Select(x =>
				{
					// FirstOrDefault yields a default KeyValuePair (null Value) when nothing matches.
					object value = arguments
						.FirstOrDefault(y => string.Equals(y.Key, x.Name, StringComparison.OrdinalIgnoreCase))
						.Value;
					if (value == null && x.IsRequired)
						throw new ArgumentException($"The argument {x.Name} is required to run {task.Name}" +
						                            " but it was not specified.");
					return x.CreateValue(value ?? x.DefaultValue);
				}));

			using IServiceScope scope = _provider.CreateScope();
			Helper.InjectServices(task, x => scope.ServiceProvider.GetRequiredService(x));
			try
			{
				await task.Run(args, progress, _taskToken.Token);
			}
			finally
			{
				// Always drop the injected services, even when the task throws: the scope they
				// come from is disposed when this method exits.
				Helper.InjectServices(task, _ => null);
			}
			_logger.LogInformation("Task finished: {Task}", task.Name);
		}

		/// <summary>
		/// Start tasks that are scheduled for start.
		/// </summary>
		private void QueueScheduledTasks()
		{
			// Snapshot the slugs first: StartTask replaces the matching entry of _tasks, and
			// List<T> documents that modifying an element invalidates running enumerators.
			string[] tasksToQueue = _tasks
				.Where(x => x.scheduledDate <= DateTime.Now)
				.Select(x => x.task.Slug)
				.ToArray();
			foreach (string task in tasksToQueue)
			{
				_logger.LogDebug("Queuing task scheduled for running: {Task}", task);
				StartTask(task, new Progress<float>(), new Dictionary<string, object>());
			}
		}

		/// <summary>
		/// Queue startup tasks with respect to the priority rules.
		/// </summary>
		/// <remarks>Tasks flagged as run-on-startup are queued by descending priority.</remarks>
		private void EnqueueStartupTasks()
		{
			IEnumerable<ITask> startupTasks = _tasks.Select(x => x.task)
				.Where(x => x.RunOnStartup)
				.OrderByDescending(x => x.Priority);
			foreach (ITask task in startupTasks)
				_queuedTasks.Enqueue((task, new Progress<float>(), new Dictionary<string, object>()));
		}

		/// <inheritdoc />
		/// <remarks>
		/// The task is appended to the queue and its next scheduled date is recomputed.
		/// The <paramref name="cancellationToken"/> is not used by this implementation.
		/// </remarks>
		/// <exception cref="ItemNotFoundException">If no managed task has the given slug.</exception>
		public void StartTask(string taskSlug,
			IProgress<float> progress,
			Dictionary<string, object> arguments = null,
			CancellationToken? cancellationToken = null)
		{
			arguments ??= new Dictionary<string, object>();

			int index = _tasks.FindIndex(x => x.task.Slug == taskSlug);
			if (index == -1)
				throw new ItemNotFoundException($"No task found with the slug {taskSlug}");
			_queuedTasks.Enqueue((_tasks[index].task, progress, arguments));
			_tasks[index] = (_tasks[index].task, GetNextTaskDate(taskSlug));
		}

		/// <inheritdoc />
		/// <remarks>A <typeparamref name="T"/> is instantiated only to read its slug.</remarks>
		public void StartTask<T>(IProgress<float> progress,
			Dictionary<string, object> arguments = null,
			CancellationToken? cancellationToken = null)
			where T : ITask, new()
		{
			StartTask(new T().Slug, progress, arguments, cancellationToken);
		}

		/// <summary>
		/// Get the next date of the execution of the given task.
		/// </summary>
		/// <param name="taskSlug">The slug of the task</param>
		/// <returns>
		/// The next date: now plus the configured delay, or <see cref="DateTime.MaxValue"/>
		/// when the configuration has no schedule for this slug.
		/// </returns>
		private DateTime GetNextTaskDate(string taskSlug)
		{
			if (_options.CurrentValue.Scheduled.TryGetValue(taskSlug, out TimeSpan delay))
				return DateTime.Now + delay;
			return DateTime.MaxValue;
		}

		/// <inheritdoc />
		/// <remarks>The collection is empty when no task is running.</remarks>
		public ICollection<ITask> GetRunningTasks()
		{
			// Read the field once so the null check and the returned element agree.
			ITask running = _runningTask;
			return running == null
				? Array.Empty<ITask>()
				: new[] { running };
		}

		/// <inheritdoc />
		public ICollection<ITask> GetAllTasks()
		{
			return _tasks.Select(x => x.task).ToArray();
		}
	}
}