From 9456d7168f64a30513922f8077f0a61c8b751d2e Mon Sep 17 00:00:00 2001 From: JPVenson Date: Wed, 4 Jun 2025 00:15:04 +0300 Subject: [PATCH] Add partition helper (#14039) --- .../Migrations/Routines/MoveExtractedFiles.cs | 52 ++--- .../ProgressablePartitionReporting.cs | 55 +++++ .../QueryPartitionHelpers.cs | 215 ++++++++++++++++++ 3 files changed, 293 insertions(+), 29 deletions(-) create mode 100644 src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs create mode 100644 src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs diff --git a/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs b/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs index 8b4abdfe59..38952eec96 100644 --- a/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs +++ b/Jellyfin.Server/Migrations/Routines/MoveExtractedFiles.cs @@ -8,13 +8,14 @@ using System.IO; using System.Linq; using System.Security.Cryptography; using System.Text; +using System.Threading; +using System.Threading.Tasks; using Jellyfin.Data.Enums; using Jellyfin.Database.Implementations; using Jellyfin.Database.Implementations.Entities; using MediaBrowser.Common.Configuration; using MediaBrowser.Common.Extensions; using MediaBrowser.Controller.IO; -using MediaBrowser.Model.Entities; using MediaBrowser.Model.IO; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; @@ -25,9 +26,7 @@ namespace Jellyfin.Server.Migrations.Routines; /// Migration to move extracted files to the new directories. /// [JellyfinMigration("2025-04-20T21:00:00", nameof(MoveExtractedFiles))] -#pragma warning disable CS0618 // Type or member is obsolete -public class MoveExtractedFiles : IMigrationRoutine -#pragma warning restore CS0618 // Type or member is obsolete +public class MoveExtractedFiles : IAsyncMigrationRoutine { private readonly IApplicationPaths _appPaths; private readonly ILogger _logger; @@ -62,10 +61,10 @@ public class MoveExtractedFiles : IMigrationRoutine private string AttachmentCachePath => Path.Combine(_appPaths.DataPath, "attachments"); /// - public void Perform() + public async Task PerformAsync(CancellationToken cancellationToken) { const int Limit = 5000; - int itemCount = 0, offset = 0; + int itemCount = 0; var sw = Stopwatch.StartNew(); @@ -76,32 +75,27 @@ public class MoveExtractedFiles : IMigrationRoutine // Make sure directories exist Directory.CreateDirectory(SubtitleCachePath); Directory.CreateDirectory(AttachmentCachePath); - do + + await foreach (var result in context.BaseItems + .Include(e => e.MediaStreams!.Where(s => s.StreamType == MediaStreamTypeEntity.Subtitle && !s.IsExternal)) + .Where(b => b.MediaType == MediaType.Video.ToString() && !b.IsVirtualItem && !b.IsFolder) + .Select(b => new + { + b.Id, + b.Path, + b.MediaStreams + }) + .OrderBy(e => e.Id) + .WithPartitionProgress((partition) => _logger.LogInformation("Checked: {Count} - Moved: {Items} - Time: {Time}", partition * Limit, itemCount, sw.Elapsed)) + .PartitionEagerAsync(Limit, cancellationToken) + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) { - var results = context.BaseItems - .Include(e => e.MediaStreams!.Where(s => s.StreamType == MediaStreamTypeEntity.Subtitle && !s.IsExternal)) - .Where(b => b.MediaType == MediaType.Video.ToString() && !b.IsVirtualItem && !b.IsFolder) - .OrderBy(e => e.Id) - .Skip(offset) - .Take(Limit) - .Select(b => new Tuple?>(b.Id, b.Path, b.MediaStreams)).ToList(); - - foreach (var result in results) + if (MoveSubtitleAndAttachmentFiles(result.Id, result.Path, result.MediaStreams, context)) { - if (MoveSubtitleAndAttachmentFiles(result.Item1, result.Item2, result.Item3, context)) - { - itemCount++; - } + itemCount++; } - - offset += Limit; - if (offset > records) - { - offset = records; - } - - _logger.LogInformation("Checked: {Count} - Moved: {Items} - Time: {Time}", offset, itemCount, sw.Elapsed); - } while (offset < records); + } _logger.LogInformation("Moved files for {Count} items in {Time}", itemCount, sw.Elapsed); diff --git a/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs b/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs new file mode 100644 index 0000000000..7654dd3c51 --- /dev/null +++ b/src/Jellyfin.Database/Jellyfin.Database.Implementations/ProgressablePartitionReporting.cs @@ -0,0 +1,55 @@ +using System; +using System.Diagnostics; +using System.Linq; + +namespace Jellyfin.Database.Implementations; + +/// +/// Wrapper for progress reporting on Partition helpers. +/// +/// The entity to load. +public class ProgressablePartitionReporting +{ + private readonly IOrderedQueryable _source; + + private readonly Stopwatch _partitionTime = new(); + + private readonly Stopwatch _itemTime = new(); + + internal ProgressablePartitionReporting(IOrderedQueryable source) + { + _source = source; + } + + internal Action? OnBeginItem { get; set; } + + internal Action? OnBeginPartition { get; set; } + + internal Action? OnEndItem { get; set; } + + internal Action? OnEndPartition { get; set; } + + internal IOrderedQueryable Source => _source; + + internal void BeginItem(TEntity entity, int iteration, int itemIndex) + { + _itemTime.Restart(); + OnBeginItem?.Invoke(entity, iteration, itemIndex); + } + + internal void BeginPartition(int iteration) + { + _partitionTime.Restart(); + OnBeginPartition?.Invoke(iteration); + } + + internal void EndItem(TEntity entity, int iteration, int itemIndex) + { + OnEndItem?.Invoke(entity, iteration, itemIndex, _itemTime.Elapsed); + } + + internal void EndPartition(int iteration) + { + OnEndPartition?.Invoke(iteration, _partitionTime.Elapsed); + } +} diff --git a/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs b/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs new file mode 100644 index 0000000000..bb66bddca3 --- /dev/null +++ b/src/Jellyfin.Database/Jellyfin.Database.Implementations/QueryPartitionHelpers.cs @@ -0,0 +1,215 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.EntityFrameworkCore; + +namespace Jellyfin.Database.Implementations; + +/// +/// Contains helpers to partition EFCore queries. +/// +public static class QueryPartitionHelpers +{ + /// + /// Adds a callback to any directly following calls of Partition for every partition thats been invoked. + /// + /// The entity to load. + /// The source query. + /// The callback invoked for partition before enumerating items. + /// The callback invoked for partition after enumerating items. + /// A queryable that can be used to partition. + public static ProgressablePartitionReporting WithPartitionProgress(this IOrderedQueryable query, Action? beginPartition = null, Action? endPartition = null) + { + var progressable = new ProgressablePartitionReporting(query); + progressable.OnBeginPartition = beginPartition; + progressable.OnEndPartition = endPartition; + return progressable; + } + + /// + /// Adds a callback to any directly following calls of Partition for every item thats been invoked. + /// + /// The entity to load. + /// The source query. + /// The callback invoked for each item before processing. + /// The callback invoked for each item after processing. + /// A queryable that can be used to partition. + public static ProgressablePartitionReporting WithItemProgress(this IOrderedQueryable query, Action? beginItem = null, Action? endItem = null) + { + var progressable = new ProgressablePartitionReporting(query); + progressable.OnBeginItem = beginItem; + progressable.OnEndItem = endItem; + return progressable; + } + + /// + /// Adds a callback to any directly following calls of Partition for every partition thats been invoked. + /// + /// The entity to load. + /// The source query. + /// The callback invoked for partition before enumerating items. + /// The callback invoked for partition after enumerating items. + /// A queryable that can be used to partition. + public static ProgressablePartitionReporting WithPartitionProgress(this ProgressablePartitionReporting progressable, Action? beginPartition = null, Action? endPartition = null) + { + progressable.OnBeginPartition = beginPartition; + progressable.OnEndPartition = endPartition; + return progressable; + } + + /// + /// Adds a callback to any directly following calls of Partition for every item thats been invoked. + /// + /// The entity to load. + /// The source query. + /// The callback invoked for each item before processing. + /// The callback invoked for each item after processing. + /// A queryable that can be used to partition. + public static ProgressablePartitionReporting WithItemProgress(this ProgressablePartitionReporting progressable, Action? beginItem = null, Action? endItem = null) + { + progressable.OnBeginItem = beginItem; + progressable.OnEndItem = endItem; + return progressable; + } + + /// + /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the database as its requested. + /// + /// The entity to load. + /// The source query. + /// The number of elements to load per partition. + /// The cancelation token. + /// A enumerable representing the whole of the query. + public static async IAsyncEnumerable PartitionAsync(this ProgressablePartitionReporting partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (var item in partitionInfo.Source.PartitionAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false)) + { + yield return item; + } + } + + /// + /// Enumerates the source query by loading the entities in partitions directly into memory. + /// + /// The entity to load. + /// The source query. + /// The number of elements to load per partition. + /// The cancelation token. + /// A enumerable representing the whole of the query. + public static async IAsyncEnumerable PartitionEagerAsync(this ProgressablePartitionReporting partitionInfo, int partitionSize, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + await foreach (var item in partitionInfo.Source.PartitionEagerAsync(partitionSize, partitionInfo, cancellationToken).ConfigureAwait(false)) + { + yield return item; + } + } + + /// + /// Enumerates the source query by loading the entities in partitions in a lazy manner reading each item from the database as its requested. + /// + /// The entity to load. + /// The source query. + /// The number of elements to load per partition. + /// Reporting helper. + /// The cancelation token. + /// A enumerable representing the whole of the query. + public static async IAsyncEnumerable PartitionAsync( + this IOrderedQueryable query, + int partitionSize, + ProgressablePartitionReporting? progressablePartition = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var iterator = 0; + int itemCounter; + do + { + progressablePartition?.BeginPartition(iterator); + itemCounter = 0; + await foreach (var item in query + .Skip(partitionSize * iterator) + .Take(partitionSize) + .AsAsyncEnumerable() + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + progressablePartition?.BeginItem(item, iterator, itemCounter); + yield return item; + progressablePartition?.EndItem(item, iterator, itemCounter); + itemCounter++; + } + + progressablePartition?.EndPartition(iterator); + iterator++; + } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested); + } + + /// + /// Enumerates the source query by loading the entities in partitions directly into memory. + /// + /// The entity to load. + /// The source query. + /// The number of elements to load per partition. + /// Reporting helper. + /// The cancelation token. + /// A enumerable representing the whole of the query. + public static async IAsyncEnumerable PartitionEagerAsync( + this IOrderedQueryable query, + int partitionSize, + ProgressablePartitionReporting? progressablePartition = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var iterator = 0; + int itemCounter; + var items = ArrayPool.Shared.Rent(partitionSize); + try + { + do + { + progressablePartition?.BeginPartition(iterator); + itemCounter = 0; + await foreach (var item in query + .Skip(partitionSize * iterator) + .Take(partitionSize) + .AsAsyncEnumerable() + .WithCancellation(cancellationToken) + .ConfigureAwait(false)) + { + items[itemCounter++] = item; + } + + for (int i = 0; i < itemCounter; i++) + { + progressablePartition?.BeginItem(items[i], iterator, itemCounter); + yield return items[i]; + progressablePartition?.EndItem(items[i], iterator, itemCounter); + } + + progressablePartition?.EndPartition(iterator); + iterator++; + } while (itemCounter == partitionSize && !cancellationToken.IsCancellationRequested); + } + finally + { + ArrayPool.Shared.Return(items); + } + } + + /// + /// Adds an Index to the enumeration of the async enumerable. + /// + /// The entity to load. + /// The source query. + /// The source list with an index added. + public static async IAsyncEnumerable<(TEntity Item, int Index)> WithIndex(this IAsyncEnumerable query) + { + var index = 0; + await foreach (var item in query.ConfigureAwait(false)) + { + yield return (item, index++); + } + } +}