// Kyoo - A portable and vast media library solution. // Copyright (c) Kyoo. // // See AUTHORS.md and LICENSE file in the project root for full license information. // // Kyoo is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // any later version. // // Kyoo is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with Kyoo. If not, see . using System; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; using Autofac.Features.Metadata; using Autofac.Features.OwnedInstances; using JetBrains.Annotations; using Kyoo.Abstractions.Controllers; using Kyoo.Abstractions.Models.Attributes; using Kyoo.Abstractions.Models.Exceptions; using Kyoo.Core.Models.Options; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Kyoo.Core.Controllers { /// /// A service to handle long running tasks and a background runner. /// /// Task will be queued, only one can run simultaneously. public class TaskManager : BackgroundService, ITaskManager { /// /// The class representing task under this jurisdiction. /// private class ManagedTask { /// /// The metadata for this task (the slug, and other useful information). /// public TaskMetadataAttribute Metadata { get; set; } /// /// The function used to create the task object. /// public Func> Factory { get; init; } /// /// The next scheduled date for this task /// public DateTime ScheduledDate { get; set; } } /// /// A class representing a task inside the list. /// private class QueuedTask { /// /// The task currently queued. /// public ManagedTask Task { get; init; } /// /// The progress reporter that this task should use. /// public IProgress ProgressReporter { get; init; } /// /// The arguments to give to run the task with. /// public Dictionary Arguments { get; init; } /// /// A token informing the task that it should be cancelled or not. /// public CancellationToken? CancellationToken { get; init; } } /// /// The configuration instance used to get schedule information /// private readonly IOptionsMonitor _options; /// /// The logger instance. /// private readonly ILogger _logger; /// /// The list of tasks and their next scheduled run. /// private readonly List _tasks; /// /// The queue of tasks that should be run as soon as possible. /// private readonly Queue _queuedTasks = new(); /// /// The cancellation token used to cancel the running task when the runner should shutdown. /// private readonly CancellationTokenSource _taskToken = new(); /// /// The currently running task. /// private (TaskMetadataAttribute, ITask)? _runningTask; /// /// Create a new . /// /// The list of tasks to manage with their metadata /// The configuration to load schedule information. /// The logger. public TaskManager(IEnumerable>, TaskMetadataAttribute>> tasks, IOptionsMonitor options, ILogger logger) { _options = options; _logger = logger; _tasks = tasks.Select(x => new ManagedTask { Factory = x.Value, Metadata = x.Metadata, ScheduledDate = _GetNextTaskDate(x.Metadata.Slug) }).ToList(); if (_tasks.Any()) _logger.LogTrace("Task manager initiated with: {Tasks}", _tasks.Select(x => x.Metadata.Name)); else _logger.LogInformation("Task manager initiated without any tasks"); } /// /// Triggered when the application host is ready to start the service. /// /// Start the runner in another thread. /// Indicates that the start process has been aborted. /// A representing the asynchronous operation. public override Task StartAsync(CancellationToken cancellationToken) { Task.Run(() => base.StartAsync(cancellationToken), CancellationToken.None); return Task.CompletedTask; } /// public override Task StopAsync(CancellationToken cancellationToken) { _taskToken.Cancel(); return base.StopAsync(cancellationToken); } /// /// The runner that will host tasks and run queued tasks. /// /// A token to stop the runner /// A representing the asynchronous operation. protected override async Task ExecuteAsync(CancellationToken cancellationToken) { _EnqueueStartupTasks(); while (!cancellationToken.IsCancellationRequested) { if (_queuedTasks.Any()) { QueuedTask task = _queuedTasks.Dequeue(); try { await _RunTask(task.Task, task.ProgressReporter, task.Arguments, task.CancellationToken); } catch (TaskFailedException ex) { _logger.LogWarning("The task \"{Task}\" failed: {Message}", task.Task.Metadata.Name, ex.Message); } catch (Exception e) { _logger.LogError(e, "An unhandled exception occured while running the task {Task}", task.Task.Metadata.Name); } } else { await Task.Delay(1000, cancellationToken); _QueueScheduledTasks(); } } } /// /// Parse parameters, inject a task and run it. /// /// The task to run /// A progress reporter to know the percentage of completion of the task. /// The arguments to pass to the function /// An optional cancellation token that will be passed to the task. /// /// If the number of arguments is invalid, if an argument can't be converted or if the task finds the argument /// invalid. /// private async Task _RunTask(ManagedTask task, [NotNull] IProgress progress, Dictionary arguments, CancellationToken? cancellationToken = null) { using (_logger.BeginScope("Task: {Task}", task.Metadata.Name)) { await using Owned taskObj = task.Factory.Invoke(); ICollection all = taskObj.Value.GetParameters(); _runningTask = (task.Metadata, taskObj.Value); ICollection invalids = arguments.Keys .Where(x => all.All(y => x != y.Name)) .ToArray(); if (invalids.Any()) { throw new ArgumentException($"{string.Join(", ", invalids)} are " + $"invalid arguments for the task {task.Metadata.Name}"); } TaskParameters args = new(all .Select(x => { object value = arguments .FirstOrDefault(y => string.Equals(y.Key, x.Name, StringComparison.OrdinalIgnoreCase)) .Value; if (value == null && x.IsRequired) { throw new ArgumentException($"The argument {x.Name} is required to run " + $"{task.Metadata.Name} but it was not specified."); } return x.CreateValue(value ?? x.DefaultValue); })); _logger.LogInformation("Task starting: {Task} ({Parameters})", task.Metadata.Name, args.ToDictionary(x => x.Name, x => x.As())); CancellationToken token = cancellationToken != null ? CancellationTokenSource.CreateLinkedTokenSource(_taskToken.Token, cancellationToken.Value).Token : _taskToken.Token; await taskObj.Value.Run(args, progress, token); _logger.LogInformation("Task finished: {Task}", task.Metadata.Name); _runningTask = null; } } /// /// Start tasks that are scheduled for start. /// private void _QueueScheduledTasks() { IEnumerable tasksToQueue = _tasks.Where(x => x.ScheduledDate <= DateTime.Now) .Select(x => x.Metadata.Slug); foreach (string task in tasksToQueue) { _logger.LogDebug("Queuing task scheduled for running: {Task}", task); StartTask(task, new Progress(), new Dictionary()); } } /// /// Queue startup tasks with respect to the priority rules. /// private void _EnqueueStartupTasks() { IEnumerable startupTasks = _tasks .Where(x => x.Metadata.RunOnStartup) .OrderByDescending(x => x.Metadata.Priority) .Select(x => x.Metadata.Slug); foreach (string task in startupTasks) StartTask(task, new Progress(), new Dictionary()); } /// public void StartTask(string taskSlug, IProgress progress, Dictionary arguments = null, CancellationToken? cancellationToken = null) { arguments ??= new Dictionary(); int index = _tasks.FindIndex(x => x.Metadata.Slug == taskSlug); if (index == -1) throw new ItemNotFoundException($"No task found with the slug {taskSlug}"); _queuedTasks.Enqueue(new QueuedTask { Task = _tasks[index], ProgressReporter = progress, Arguments = arguments, CancellationToken = cancellationToken }); _tasks[index].ScheduledDate = _GetNextTaskDate(taskSlug); } /// public void StartTask(IProgress progress, Dictionary arguments = null, CancellationToken? cancellationToken = null) where T : ITask { TaskMetadataAttribute metadata = typeof(T).GetCustomAttribute(); if (metadata == null) throw new ArgumentException($"No metadata found on the given task (type: {typeof(T).Name})."); StartTask(metadata.Slug, progress, arguments, cancellationToken); } /// /// Get the next date of the execution of the given task. /// /// The slug of the task /// The next date. private DateTime _GetNextTaskDate(string taskSlug) { if (_options.CurrentValue.Scheduled.TryGetValue(taskSlug, out TimeSpan delay)) return DateTime.Now + delay; return DateTime.MaxValue; } /// public ICollection<(TaskMetadataAttribute, ITask)> GetRunningTasks() { return _runningTask == null ? ArraySegment<(TaskMetadataAttribute, ITask)>.Empty : new[] { _runningTask.Value }; } /// public ICollection GetAllTasks() { return _tasks.Select(x => x.Metadata).ToArray(); } } }