mirror of
https://github.com/jellyfin/jellyfin.git
synced 2025-07-09 03:04:24 -04:00
Add multiple options for internal locking (#14047)
This commit is contained in:
parent
9456d7168f
commit
a1d72deba2
@ -58,6 +58,7 @@
|
|||||||
<PackageVersion Include="prometheus-net.AspNetCore" Version="8.2.1" />
|
<PackageVersion Include="prometheus-net.AspNetCore" Version="8.2.1" />
|
||||||
<PackageVersion Include="prometheus-net.DotNetRuntime" Version="4.4.1" />
|
<PackageVersion Include="prometheus-net.DotNetRuntime" Version="4.4.1" />
|
||||||
<PackageVersion Include="prometheus-net" Version="8.2.1" />
|
<PackageVersion Include="prometheus-net" Version="8.2.1" />
|
||||||
|
<PackageVersion Include="Polly" Version="8.5.2" />
|
||||||
<PackageVersion Include="Serilog.AspNetCore" Version="9.0.0" />
|
<PackageVersion Include="Serilog.AspNetCore" Version="9.0.0" />
|
||||||
<PackageVersion Include="Serilog.Enrichers.Thread" Version="4.0.0" />
|
<PackageVersion Include="Serilog.Enrichers.Thread" Version="4.0.0" />
|
||||||
<PackageVersion Include="Serilog.Settings.Configuration" Version="9.0.0" />
|
<PackageVersion Include="Serilog.Settings.Configuration" Version="9.0.0" />
|
||||||
|
@ -3,6 +3,7 @@ using System.Collections.Generic;
|
|||||||
using System.Reflection;
|
using System.Reflection;
|
||||||
using Jellyfin.Database.Implementations;
|
using Jellyfin.Database.Implementations;
|
||||||
using Jellyfin.Database.Implementations.DbConfiguration;
|
using Jellyfin.Database.Implementations.DbConfiguration;
|
||||||
|
using Jellyfin.Database.Implementations.Locking;
|
||||||
using Jellyfin.Database.Providers.Sqlite;
|
using Jellyfin.Database.Providers.Sqlite;
|
||||||
using MediaBrowser.Common.Configuration;
|
using MediaBrowser.Common.Configuration;
|
||||||
using MediaBrowser.Controller.Configuration;
|
using MediaBrowser.Controller.Configuration;
|
||||||
@ -73,6 +74,7 @@ public static class ServiceCollectionExtensions
|
|||||||
efCoreConfiguration = new DatabaseConfigurationOptions()
|
efCoreConfiguration = new DatabaseConfigurationOptions()
|
||||||
{
|
{
|
||||||
DatabaseType = "Jellyfin-SQLite",
|
DatabaseType = "Jellyfin-SQLite",
|
||||||
|
LockingBehavior = DatabaseLockingBehaviorTypes.NoLock
|
||||||
};
|
};
|
||||||
configurationManager.SaveConfiguration("database", efCoreConfiguration);
|
configurationManager.SaveConfiguration("database", efCoreConfiguration);
|
||||||
}
|
}
|
||||||
@ -85,10 +87,25 @@ public static class ServiceCollectionExtensions
|
|||||||
|
|
||||||
serviceCollection.AddSingleton<IJellyfinDatabaseProvider>(providerFactory!);
|
serviceCollection.AddSingleton<IJellyfinDatabaseProvider>(providerFactory!);
|
||||||
|
|
||||||
|
switch (efCoreConfiguration.LockingBehavior)
|
||||||
|
{
|
||||||
|
case DatabaseLockingBehaviorTypes.NoLock:
|
||||||
|
serviceCollection.AddSingleton<IEntityFrameworkCoreLockingBehavior, NoLockBehavior>();
|
||||||
|
break;
|
||||||
|
case DatabaseLockingBehaviorTypes.Pessimistic:
|
||||||
|
serviceCollection.AddSingleton<IEntityFrameworkCoreLockingBehavior, PessimisticLockBehavior>();
|
||||||
|
break;
|
||||||
|
case DatabaseLockingBehaviorTypes.Optimistic:
|
||||||
|
serviceCollection.AddSingleton<IEntityFrameworkCoreLockingBehavior, OptimisticLockBehavior>();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
serviceCollection.AddPooledDbContextFactory<JellyfinDbContext>((serviceProvider, opt) =>
|
serviceCollection.AddPooledDbContextFactory<JellyfinDbContext>((serviceProvider, opt) =>
|
||||||
{
|
{
|
||||||
var provider = serviceProvider.GetRequiredService<IJellyfinDatabaseProvider>();
|
var provider = serviceProvider.GetRequiredService<IJellyfinDatabaseProvider>();
|
||||||
provider.Initialise(opt);
|
provider.Initialise(opt);
|
||||||
|
var lockingBehavior = serviceProvider.GetRequiredService<IEntityFrameworkCoreLockingBehavior>();
|
||||||
|
lockingBehavior.Initialise(opt);
|
||||||
});
|
});
|
||||||
|
|
||||||
return serviceCollection;
|
return serviceCollection;
|
||||||
|
@ -54,7 +54,7 @@ public class PeopleRepository(IDbContextFactory<JellyfinDbContext> dbProvider, I
|
|||||||
public IReadOnlyList<string> GetPeopleNames(InternalPeopleQuery filter)
|
public IReadOnlyList<string> GetPeopleNames(InternalPeopleQuery filter)
|
||||||
{
|
{
|
||||||
using var context = _dbProvider.CreateDbContext();
|
using var context = _dbProvider.CreateDbContext();
|
||||||
var dbQuery = TranslateQuery(context.Peoples.AsNoTracking(), context, filter);
|
var dbQuery = TranslateQuery(context.Peoples.AsNoTracking(), context, filter).Select(e => e.Name).Distinct();
|
||||||
|
|
||||||
// dbQuery = dbQuery.OrderBy(e => e.ListOrder);
|
// dbQuery = dbQuery.OrderBy(e => e.ListOrder);
|
||||||
if (filter.Limit > 0)
|
if (filter.Limit > 0)
|
||||||
@ -62,7 +62,7 @@ public class PeopleRepository(IDbContextFactory<JellyfinDbContext> dbProvider, I
|
|||||||
dbQuery = dbQuery.Take(filter.Limit);
|
dbQuery = dbQuery.Take(filter.Limit);
|
||||||
}
|
}
|
||||||
|
|
||||||
return dbQuery.Select(e => e.Name).ToArray();
|
return dbQuery.ToArray();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
@ -141,8 +141,13 @@ public class PeopleRepository(IDbContextFactory<JellyfinDbContext> dbProvider, I
|
|||||||
if (filter.User is not null && filter.IsFavorite.HasValue)
|
if (filter.User is not null && filter.IsFavorite.HasValue)
|
||||||
{
|
{
|
||||||
var personType = itemTypeLookup.BaseItemKindNames[BaseItemKind.Person];
|
var personType = itemTypeLookup.BaseItemKindNames[BaseItemKind.Person];
|
||||||
query = query
|
var oldQuery = query;
|
||||||
.Where(e => context.BaseItems.Any(b => b.Type == personType && b.Name == e.Name && b.UserData!.Any(u => u.IsFavorite == filter.IsFavorite && u.UserId.Equals(filter.User.Id))));
|
|
||||||
|
query = context.UserData
|
||||||
|
.Where(u => u.Item!.Type == personType && u.IsFavorite == filter.IsFavorite && u.UserId.Equals(filter.User.Id))
|
||||||
|
.Join(oldQuery, e => e.Item!.Name, e => e.Name, (item, person) => person)
|
||||||
|
.Distinct()
|
||||||
|
.AsNoTracking();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!filter.ItemId.IsEmpty())
|
if (!filter.ItemId.IsEmpty())
|
||||||
|
@ -9,4 +9,10 @@ public class DatabaseConfigurationOptions
|
|||||||
/// Gets or Sets the type of database jellyfin should use.
|
/// Gets or Sets the type of database jellyfin should use.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public required string DatabaseType { get; set; }
|
public required string DatabaseType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets or Sets the kind of locking behavior jellyfin should perform. Possible options are "NoLock", "Pessimistic", "Optimistic".
|
||||||
|
/// Defaults to "NoLock".
|
||||||
|
/// </summary>
|
||||||
|
public DatabaseLockingBehaviorTypes LockingBehavior { get; set; }
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,22 @@
|
|||||||
|
namespace Jellyfin.Database.Implementations.DbConfiguration;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines all possible methods for locking database access for concurrent queries.
|
||||||
|
/// </summary>
|
||||||
|
public enum DatabaseLockingBehaviorTypes
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Defines that no explicit application level locking for reads and writes should be done and only provider specific locking should be relied on.
|
||||||
|
/// </summary>
|
||||||
|
NoLock = 0,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines a behavior that always blocks all reads while any one write is done.
|
||||||
|
/// </summary>
|
||||||
|
Pessimistic = 1,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines that all writes should be attempted and when fail should be retried.
|
||||||
|
/// </summary>
|
||||||
|
Optimistic = 2
|
||||||
|
}
|
@ -24,6 +24,7 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Polly" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design">
|
<PackageReference Include="Microsoft.EntityFrameworkCore.Design">
|
||||||
<PrivateAssets>all</PrivateAssets>
|
<PrivateAssets>all</PrivateAssets>
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Data.Common;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using Jellyfin.Database.Implementations.Entities;
|
using Jellyfin.Database.Implementations.Entities;
|
||||||
using Jellyfin.Database.Implementations.Entities.Security;
|
using Jellyfin.Database.Implementations.Entities.Security;
|
||||||
using Jellyfin.Database.Implementations.Interfaces;
|
using Jellyfin.Database.Implementations.Interfaces;
|
||||||
|
using Jellyfin.Database.Implementations.Locking;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
namespace Jellyfin.Database.Implementations;
|
namespace Jellyfin.Database.Implementations;
|
||||||
@ -15,7 +20,8 @@ namespace Jellyfin.Database.Implementations;
|
|||||||
/// <param name="options">The database context options.</param>
|
/// <param name="options">The database context options.</param>
|
||||||
/// <param name="logger">Logger.</param>
|
/// <param name="logger">Logger.</param>
|
||||||
/// <param name="jellyfinDatabaseProvider">The provider for the database engine specific operations.</param>
|
/// <param name="jellyfinDatabaseProvider">The provider for the database engine specific operations.</param>
|
||||||
public class JellyfinDbContext(DbContextOptions<JellyfinDbContext> options, ILogger<JellyfinDbContext> logger, IJellyfinDatabaseProvider jellyfinDatabaseProvider) : DbContext(options)
|
/// <param name="entityFrameworkCoreLocking">The locking behavior.</param>
|
||||||
|
public class JellyfinDbContext(DbContextOptions<JellyfinDbContext> options, ILogger<JellyfinDbContext> logger, IJellyfinDatabaseProvider jellyfinDatabaseProvider, IEntityFrameworkCoreLockingBehavior entityFrameworkCoreLocking) : DbContext(options)
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets the <see cref="DbSet{TEntity}"/> containing the access schedules.
|
/// Gets the <see cref="DbSet{TEntity}"/> containing the access schedules.
|
||||||
@ -247,7 +253,50 @@ public class JellyfinDbContext(DbContextOptions<JellyfinDbContext> options, ILog
|
|||||||
public DbSet<TrackMetadata> TrackMetadata => Set<TrackMetadata>();*/
|
public DbSet<TrackMetadata> TrackMetadata => Set<TrackMetadata>();*/
|
||||||
|
|
||||||
/// <inheritdoc/>
|
/// <inheritdoc/>
|
||||||
public override int SaveChanges()
|
public override async Task<int> SaveChangesAsync(
|
||||||
|
bool acceptAllChangesOnSuccess,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
HandleConcurrencyToken();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var result = -1;
|
||||||
|
await entityFrameworkCoreLocking.OnSaveChangesAsync(this, async () =>
|
||||||
|
{
|
||||||
|
result = await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken).ConfigureAwait(false);
|
||||||
|
}).ConfigureAwait(false);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
logger.LogError(e, "Error trying to save changes.");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public override int SaveChanges(bool acceptAllChangesOnSuccess) // SaveChanges(bool) is beeing called by SaveChanges() with default to false.
|
||||||
|
{
|
||||||
|
HandleConcurrencyToken();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var result = -1;
|
||||||
|
entityFrameworkCoreLocking.OnSaveChanges(this, () =>
|
||||||
|
{
|
||||||
|
result = base.SaveChanges(acceptAllChangesOnSuccess);
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
logger.LogError(e, "Error trying to save changes.");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void HandleConcurrencyToken()
|
||||||
{
|
{
|
||||||
foreach (var saveEntity in ChangeTracker.Entries()
|
foreach (var saveEntity in ChangeTracker.Entries()
|
||||||
.Where(e => e.State == EntityState.Modified)
|
.Where(e => e.State == EntityState.Modified)
|
||||||
@ -256,16 +305,6 @@ public class JellyfinDbContext(DbContextOptions<JellyfinDbContext> options, ILog
|
|||||||
{
|
{
|
||||||
saveEntity.OnSavingChanges();
|
saveEntity.OnSavingChanges();
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
return base.SaveChanges();
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
logger.LogError(e, "Error trying to save changes.");
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace Jellyfin.Database.Implementations.Locking;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines a jellyfin locking behavior that can be configured.
|
||||||
|
/// </summary>
|
||||||
|
public interface IEntityFrameworkCoreLockingBehavior
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Provides access to the builder to setup any connection related locking behavior.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="optionsBuilder">The options builder.</param>
|
||||||
|
void Initialise(DbContextOptionsBuilder optionsBuilder);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Will be invoked when changes should be saved in the current locking behavior.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="context">The database context invoking the action.</param>
|
||||||
|
/// <param name="saveChanges">Callback for performing the actual save changes.</param>
|
||||||
|
void OnSaveChanges(JellyfinDbContext context, Action saveChanges);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Will be invoked when changes should be saved in the current locking behavior.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="context">The database context invoking the action.</param>
|
||||||
|
/// <param name="saveChanges">Callback for performing the actual save changes.</param>
|
||||||
|
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
|
||||||
|
Task OnSaveChangesAsync(JellyfinDbContext context, Func<Task> saveChanges);
|
||||||
|
}
|
@ -0,0 +1,41 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace Jellyfin.Database.Implementations.Locking;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Default lock behavior. Defines no explicit application locking behavior.
|
||||||
|
/// </summary>
|
||||||
|
public class NoLockBehavior : IEntityFrameworkCoreLockingBehavior
|
||||||
|
{
|
||||||
|
private readonly ILogger<NoLockBehavior> _logger;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of the <see cref="NoLockBehavior"/> class.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="logger">The Application logger.</param>
|
||||||
|
public NoLockBehavior(ILogger<NoLockBehavior> logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public void OnSaveChanges(JellyfinDbContext context, Action saveChanges)
|
||||||
|
{
|
||||||
|
saveChanges();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public void Initialise(DbContextOptionsBuilder optionsBuilder)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("The database locking mode has been set to: NoLock.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public async Task OnSaveChangesAsync(JellyfinDbContext context, Func<Task> saveChanges)
|
||||||
|
{
|
||||||
|
await saveChanges().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,137 @@
|
|||||||
|
using System;
|
||||||
|
using System.Data.Common;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Polly;
|
||||||
|
|
||||||
|
namespace Jellyfin.Database.Implementations.Locking;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines a locking mechanism that will retry any write operation for a few times.
|
||||||
|
/// </summary>
|
||||||
|
public class OptimisticLockBehavior : IEntityFrameworkCoreLockingBehavior
|
||||||
|
{
|
||||||
|
private readonly Policy _writePolicy;
|
||||||
|
private readonly AsyncPolicy _writeAsyncPolicy;
|
||||||
|
private readonly ILogger<OptimisticLockBehavior> _logger;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of the <see cref="OptimisticLockBehavior"/> class.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="logger">The application logger.</param>
|
||||||
|
public OptimisticLockBehavior(ILogger<OptimisticLockBehavior> logger)
|
||||||
|
{
|
||||||
|
TimeSpan[] sleepDurations = [
|
||||||
|
TimeSpan.FromMilliseconds(50),
|
||||||
|
TimeSpan.FromMilliseconds(50),
|
||||||
|
TimeSpan.FromMilliseconds(250),
|
||||||
|
TimeSpan.FromMilliseconds(150),
|
||||||
|
TimeSpan.FromMilliseconds(500),
|
||||||
|
TimeSpan.FromMilliseconds(500),
|
||||||
|
TimeSpan.FromSeconds(3)
|
||||||
|
];
|
||||||
|
_logger = logger;
|
||||||
|
_writePolicy = Policy.HandleInner<Exception>(e => e.Message.Contains("database is locked", StringComparison.InvariantCultureIgnoreCase)).WaitAndRetry(sleepDurations, RetryHandle);
|
||||||
|
_writeAsyncPolicy = Policy.HandleInner<Exception>(e => e.Message.Contains("database is locked", StringComparison.InvariantCultureIgnoreCase)).WaitAndRetryAsync(sleepDurations, RetryHandle);
|
||||||
|
|
||||||
|
void RetryHandle(Exception exception, TimeSpan timespan, int retryNo, Context context)
|
||||||
|
{
|
||||||
|
if (retryNo < sleepDurations.Length)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Operation failed retry {RetryNo}", retryNo);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogError(exception, "Operation failed retry {RetryNo}", retryNo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public void Initialise(DbContextOptionsBuilder optionsBuilder)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("The database locking mode has been set to: Optimistic.");
|
||||||
|
optionsBuilder.AddInterceptors(new RetryInterceptor(_writeAsyncPolicy, _writePolicy));
|
||||||
|
optionsBuilder.AddInterceptors(new TransactionLockingInterceptor(_writeAsyncPolicy, _writePolicy));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public void OnSaveChanges(JellyfinDbContext context, Action saveChanges)
|
||||||
|
{
|
||||||
|
_writePolicy.ExecuteAndCapture(saveChanges);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public async Task OnSaveChangesAsync(JellyfinDbContext context, Func<Task> saveChanges)
|
||||||
|
{
|
||||||
|
await _writeAsyncPolicy.ExecuteAndCaptureAsync(saveChanges).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TransactionLockingInterceptor : DbTransactionInterceptor
|
||||||
|
{
|
||||||
|
private readonly AsyncPolicy _asyncRetryPolicy;
|
||||||
|
private readonly Policy _retryPolicy;
|
||||||
|
|
||||||
|
public TransactionLockingInterceptor(AsyncPolicy asyncRetryPolicy, Policy retryPolicy)
|
||||||
|
{
|
||||||
|
_asyncRetryPolicy = asyncRetryPolicy;
|
||||||
|
_retryPolicy = retryPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<DbTransaction> TransactionStarting(DbConnection connection, TransactionStartingEventData eventData, InterceptionResult<DbTransaction> result)
|
||||||
|
{
|
||||||
|
return InterceptionResult<DbTransaction>.SuppressWithResult(_retryPolicy.Execute(() => connection.BeginTransaction(eventData.IsolationLevel)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<DbTransaction>> TransactionStartingAsync(DbConnection connection, TransactionStartingEventData eventData, InterceptionResult<DbTransaction> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return InterceptionResult<DbTransaction>.SuppressWithResult(await _asyncRetryPolicy.ExecuteAsync(async () => await connection.BeginTransactionAsync(eventData.IsolationLevel, cancellationToken).ConfigureAwait(false)).ConfigureAwait(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class RetryInterceptor : DbCommandInterceptor
|
||||||
|
{
|
||||||
|
private readonly AsyncPolicy _asyncRetryPolicy;
|
||||||
|
private readonly Policy _retryPolicy;
|
||||||
|
|
||||||
|
public RetryInterceptor(AsyncPolicy asyncRetryPolicy, Policy retryPolicy)
|
||||||
|
{
|
||||||
|
_asyncRetryPolicy = asyncRetryPolicy;
|
||||||
|
_retryPolicy = retryPolicy;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<int> NonQueryExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<int> result)
|
||||||
|
{
|
||||||
|
return InterceptionResult<int>.SuppressWithResult(_retryPolicy.Execute(command.ExecuteNonQuery));
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<int>> NonQueryExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return InterceptionResult<int>.SuppressWithResult(await _asyncRetryPolicy.ExecuteAsync(async () => await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false)).ConfigureAwait(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<object> ScalarExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<object> result)
|
||||||
|
{
|
||||||
|
return InterceptionResult<object>.SuppressWithResult(_retryPolicy.Execute(() => command.ExecuteScalar()!));
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<object>> ScalarExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<object> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return InterceptionResult<object>.SuppressWithResult((await _asyncRetryPolicy.ExecuteAsync(async () => await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false)!).ConfigureAwait(false))!);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<DbDataReader> ReaderExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result)
|
||||||
|
{
|
||||||
|
return InterceptionResult<DbDataReader>.SuppressWithResult(_retryPolicy.Execute(command.ExecuteReader));
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return InterceptionResult<DbDataReader>.SuppressWithResult(await _asyncRetryPolicy.ExecuteAsync(async () => await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false)).ConfigureAwait(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,296 @@
|
|||||||
|
#pragma warning disable MT1013 // Releasing lock without guarantee of execution
|
||||||
|
#pragma warning disable MT1012 // Acquiring lock without guarantee of releasing
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using System.Data;
|
||||||
|
using System.Data.Common;
|
||||||
|
using System.Runtime.CompilerServices;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace Jellyfin.Database.Implementations.Locking;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// A locking behavior that will always block any operation while a write is requested. Mimicks the old SqliteRepository behavior.
|
||||||
|
/// </summary>
|
||||||
|
public class PessimisticLockBehavior : IEntityFrameworkCoreLockingBehavior
|
||||||
|
{
|
||||||
|
private readonly ILogger<PessimisticLockBehavior> _logger;
|
||||||
|
private readonly ILoggerFactory _loggerFactory;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Initializes a new instance of the <see cref="PessimisticLockBehavior"/> class.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="logger">The application logger.</param>
|
||||||
|
/// <param name="loggerFactory">The logger factory.</param>
|
||||||
|
public PessimisticLockBehavior(ILogger<PessimisticLockBehavior> logger, ILoggerFactory loggerFactory)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_loggerFactory = loggerFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ReaderWriterLockSlim DatabaseLock { get; } = new(LockRecursionPolicy.SupportsRecursion);
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public void OnSaveChanges(JellyfinDbContext context, Action saveChanges)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterWrite(_logger))
|
||||||
|
{
|
||||||
|
saveChanges();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public void Initialise(DbContextOptionsBuilder optionsBuilder)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("The database locking mode has been set to: Pessimistic.");
|
||||||
|
optionsBuilder.AddInterceptors(new CommandLockingInterceptor(_loggerFactory.CreateLogger<CommandLockingInterceptor>()));
|
||||||
|
optionsBuilder.AddInterceptors(new TransactionLockingInterceptor(_loggerFactory.CreateLogger<TransactionLockingInterceptor>()));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc/>
|
||||||
|
public async Task OnSaveChangesAsync(JellyfinDbContext context, Func<Task> saveChanges)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterWrite(_logger))
|
||||||
|
{
|
||||||
|
await saveChanges().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class TransactionLockingInterceptor : DbTransactionInterceptor
|
||||||
|
{
|
||||||
|
private readonly ILogger _logger;
|
||||||
|
|
||||||
|
public TransactionLockingInterceptor(ILogger logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<DbTransaction> TransactionStarting(DbConnection connection, TransactionStartingEventData eventData, InterceptionResult<DbTransaction> result)
|
||||||
|
{
|
||||||
|
DbLock.BeginWriteLock(_logger);
|
||||||
|
|
||||||
|
return base.TransactionStarting(connection, eventData, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override ValueTask<InterceptionResult<DbTransaction>> TransactionStartingAsync(DbConnection connection, TransactionStartingEventData eventData, InterceptionResult<DbTransaction> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
DbLock.BeginWriteLock(_logger);
|
||||||
|
|
||||||
|
return base.TransactionStartingAsync(connection, eventData, result, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void TransactionCommitted(DbTransaction transaction, TransactionEndEventData eventData)
|
||||||
|
{
|
||||||
|
DbLock.EndWriteLock(_logger);
|
||||||
|
|
||||||
|
base.TransactionCommitted(transaction, eventData);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task TransactionCommittedAsync(DbTransaction transaction, TransactionEndEventData eventData, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
DbLock.EndWriteLock(_logger);
|
||||||
|
|
||||||
|
return base.TransactionCommittedAsync(transaction, eventData, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void TransactionFailed(DbTransaction transaction, TransactionErrorEventData eventData)
|
||||||
|
{
|
||||||
|
DbLock.EndWriteLock(_logger);
|
||||||
|
|
||||||
|
base.TransactionFailed(transaction, eventData);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task TransactionFailedAsync(DbTransaction transaction, TransactionErrorEventData eventData, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
DbLock.EndWriteLock(_logger);
|
||||||
|
|
||||||
|
return base.TransactionFailedAsync(transaction, eventData, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override void TransactionRolledBack(DbTransaction transaction, TransactionEndEventData eventData)
|
||||||
|
{
|
||||||
|
DbLock.EndWriteLock(_logger);
|
||||||
|
|
||||||
|
base.TransactionRolledBack(transaction, eventData);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override Task TransactionRolledBackAsync(DbTransaction transaction, TransactionEndEventData eventData, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
DbLock.EndWriteLock(_logger);
|
||||||
|
|
||||||
|
return base.TransactionRolledBackAsync(transaction, eventData, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds strict read/write locking.
|
||||||
|
/// </summary>
|
||||||
|
private sealed class CommandLockingInterceptor : DbCommandInterceptor
|
||||||
|
{
|
||||||
|
private readonly ILogger _logger;
|
||||||
|
|
||||||
|
public CommandLockingInterceptor(ILogger logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<int> NonQueryExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<int> result)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterWrite(_logger, command))
|
||||||
|
{
|
||||||
|
return InterceptionResult<int>.SuppressWithResult(command.ExecuteNonQuery());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<int>> NonQueryExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterWrite(_logger, command))
|
||||||
|
{
|
||||||
|
return InterceptionResult<int>.SuppressWithResult(await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<object> ScalarExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<object> result)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterRead(_logger))
|
||||||
|
{
|
||||||
|
return InterceptionResult<object>.SuppressWithResult(command.ExecuteScalar()!);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<object>> ScalarExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<object> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterRead(_logger))
|
||||||
|
{
|
||||||
|
return InterceptionResult<object>.SuppressWithResult((await command.ExecuteScalarAsync(cancellationToken).ConfigureAwait(false))!);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override InterceptionResult<DbDataReader> ReaderExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterRead(_logger))
|
||||||
|
{
|
||||||
|
return InterceptionResult<DbDataReader>.SuppressWithResult(command.ExecuteReader()!);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
using (DbLock.EnterRead(_logger))
|
||||||
|
{
|
||||||
|
return InterceptionResult<DbDataReader>.SuppressWithResult(await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class DbLock : IDisposable
|
||||||
|
{
|
||||||
|
private readonly Action? _action;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
private static readonly IDisposable _noLock = new DbLock(null) { _disposed = true };
|
||||||
|
private static (string Command, Guid Id, DateTimeOffset QueryDate, bool Printed) _blockQuery;
|
||||||
|
|
||||||
|
public DbLock(Action? action = null)
|
||||||
|
{
|
||||||
|
_action = action;
|
||||||
|
}
|
||||||
|
|
||||||
|
#pragma warning disable IDISP015 // Member should not return created and cached instance
|
||||||
|
public static IDisposable EnterWrite(ILogger logger, IDbCommand? command = null, [CallerMemberName] string? callerMemberName = null, [CallerLineNumber] int? callerNo = null)
|
||||||
|
#pragma warning restore IDISP015 // Member should not return created and cached instance
|
||||||
|
{
|
||||||
|
logger.LogTrace("Enter Write for {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
if (DatabaseLock.IsWriteLockHeld)
|
||||||
|
{
|
||||||
|
logger.LogTrace("Write Held {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
return _noLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
BeginWriteLock(logger, command, callerMemberName, callerNo);
|
||||||
|
return new DbLock(() =>
|
||||||
|
{
|
||||||
|
EndWriteLock(logger, callerMemberName, callerNo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#pragma warning disable IDISP015 // Member should not return created and cached instance
|
||||||
|
public static IDisposable EnterRead(ILogger logger, [CallerMemberName] string? callerMemberName = null, [CallerLineNumber] int? callerNo = null)
|
||||||
|
#pragma warning restore IDISP015 // Member should not return created and cached instance
|
||||||
|
{
|
||||||
|
logger.LogTrace("Enter Read {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
if (DatabaseLock.IsWriteLockHeld)
|
||||||
|
{
|
||||||
|
logger.LogTrace("Write Held {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
return _noLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
BeginReadLock(logger, callerMemberName, callerNo);
|
||||||
|
return new DbLock(() =>
|
||||||
|
{
|
||||||
|
ExitReadLock(logger, callerMemberName, callerNo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void BeginWriteLock(ILogger logger, IDbCommand? command = null, [CallerMemberName] string? callerMemberName = null, [CallerLineNumber] int? callerNo = null)
|
||||||
|
{
|
||||||
|
logger.LogTrace("Aquire Write {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
if (!DatabaseLock.TryEnterWriteLock(TimeSpan.FromMilliseconds(1000)))
|
||||||
|
{
|
||||||
|
var blockingQuery = _blockQuery;
|
||||||
|
if (!blockingQuery.Printed)
|
||||||
|
{
|
||||||
|
_blockQuery = (blockingQuery.Command, blockingQuery.Id, blockingQuery.QueryDate, true);
|
||||||
|
logger.LogInformation("QueryLock: {Id} --- {Query}", blockingQuery.Id, blockingQuery.Command);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.LogInformation("Query congestion detected: '{Id}' since '{Date}'", blockingQuery.Id, blockingQuery.QueryDate);
|
||||||
|
|
||||||
|
DatabaseLock.EnterWriteLock();
|
||||||
|
|
||||||
|
logger.LogInformation("Query congestion cleared: '{Id}' for '{Date}'", blockingQuery.Id, DateTimeOffset.Now - blockingQuery.QueryDate);
|
||||||
|
}
|
||||||
|
|
||||||
|
_blockQuery = (command?.CommandText ?? "Transaction", Guid.NewGuid(), DateTimeOffset.Now, false);
|
||||||
|
|
||||||
|
logger.LogTrace("Write Aquired {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void BeginReadLock(ILogger logger, [CallerMemberName] string? callerMemberName = null, [CallerLineNumber] int? callerNo = null)
|
||||||
|
{
|
||||||
|
logger.LogTrace("Aquire Write {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
DatabaseLock.EnterReadLock();
|
||||||
|
logger.LogTrace("Read Aquired {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void EndWriteLock(ILogger logger, [CallerMemberName] string? callerMemberName = null, [CallerLineNumber] int? callerNo = null)
|
||||||
|
{
|
||||||
|
logger.LogTrace("Release Write {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
DatabaseLock.ExitWriteLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void ExitReadLock(ILogger logger, [CallerMemberName] string? callerMemberName = null, [CallerLineNumber] int? callerNo = null)
|
||||||
|
{
|
||||||
|
logger.LogTrace("Release Read {Caller}:{Line}", callerMemberName, callerNo);
|
||||||
|
DatabaseLock.ExitReadLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_disposed = true;
|
||||||
|
if (_action is not null)
|
||||||
|
{
|
||||||
|
_action();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,5 @@
|
|||||||
using Jellyfin.Database.Implementations;
|
using Jellyfin.Database.Implementations;
|
||||||
|
using Jellyfin.Database.Implementations.Locking;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using Microsoft.EntityFrameworkCore.Design;
|
using Microsoft.EntityFrameworkCore.Design;
|
||||||
using Microsoft.Extensions.Logging.Abstractions;
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
@ -19,7 +20,8 @@ namespace Jellyfin.Database.Providers.Sqlite.Migrations
|
|||||||
return new JellyfinDbContext(
|
return new JellyfinDbContext(
|
||||||
optionsBuilder.Options,
|
optionsBuilder.Options,
|
||||||
NullLogger<JellyfinDbContext>.Instance,
|
NullLogger<JellyfinDbContext>.Instance,
|
||||||
new SqliteDatabaseProvider(null!, NullLogger<SqliteDatabaseProvider>.Instance));
|
new SqliteDatabaseProvider(null!, NullLogger<SqliteDatabaseProvider>.Instance),
|
||||||
|
new NoLockBehavior(NullLogger<NoLockBehavior>.Instance));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user