Publish WatchStatus changes to rabbitmq

This commit is contained in:
Zoe Roux 2024-03-17 19:25:28 +01:00
parent cbb05ac977
commit b6f9c050e1
No known key found for this signature in database
5 changed files with 124 additions and 31 deletions

View File

@ -30,7 +30,6 @@ namespace Kyoo.Abstractions.Controllers;
public interface IWatchStatusRepository public interface IWatchStatusRepository
{ {
public delegate Task ResourceEventHandler<T>(T resource); public delegate Task ResourceEventHandler<T>(T resource);
public delegate Task WatchStatusDeletedEventHandler(Guid resourceId, Guid userId);
Task<ICollection<IWatchlist>> GetAll( Task<ICollection<IWatchlist>> GetAll(
Filter<IWatchlist>? filter = default, Filter<IWatchlist>? filter = default,
@ -54,10 +53,6 @@ public interface IWatchStatusRepository
Task DeleteMovieStatus(Guid movieId, Guid userId); Task DeleteMovieStatus(Guid movieId, Guid userId);
static event WatchStatusDeletedEventHandler OnMovieStatusDeletedHandler;
protected static Task OnMovieStatusDeleted(Guid movieId, Guid userId) =>
OnMovieStatusDeletedHandler?.Invoke(movieId, userId) ?? Task.CompletedTask;
Task<ShowWatchStatus?> GetShowStatus(Guid showId, Guid userId); Task<ShowWatchStatus?> GetShowStatus(Guid showId, Guid userId);
Task<ShowWatchStatus?> SetShowStatus(Guid showId, Guid userId, WatchStatus status); Task<ShowWatchStatus?> SetShowStatus(Guid showId, Guid userId, WatchStatus status);
@ -68,10 +63,6 @@ public interface IWatchStatusRepository
Task DeleteShowStatus(Guid showId, Guid userId); Task DeleteShowStatus(Guid showId, Guid userId);
static event WatchStatusDeletedEventHandler OnShowStatusDeletedHandler;
protected static Task OnShowStatusDeleted(Guid showId, Guid userId) =>
OnShowStatusDeletedHandler?.Invoke(showId, userId) ?? Task.CompletedTask;
Task<EpisodeWatchStatus?> GetEpisodeStatus(Guid episodeId, Guid userId); Task<EpisodeWatchStatus?> GetEpisodeStatus(Guid episodeId, Guid userId);
/// <param name="watchedTime">Where the user has stopped watching. Only usable if Status /// <param name="watchedTime">Where the user has stopped watching. Only usable if Status
@ -89,9 +80,4 @@ public interface IWatchStatusRepository
OnEpisodeStatusChangedHandler?.Invoke(obj) ?? Task.CompletedTask; OnEpisodeStatusChangedHandler?.Invoke(obj) ?? Task.CompletedTask;
Task DeleteEpisodeStatus(Guid episodeId, Guid userId); Task DeleteEpisodeStatus(Guid episodeId, Guid userId);
static event WatchStatusDeletedEventHandler OnEpisodeStatusDeletedHandler;
protected static Task OnEpisodeStatusDeleted(Guid episodeId, Guid userId) =>
OnEpisodeStatusDeletedHandler?.Invoke(episodeId, userId) ?? Task.CompletedTask;
} }

View File

@ -46,13 +46,27 @@ public enum WatchStatus
/// The user has not started watching this but plans to. /// The user has not started watching this but plans to.
/// </summary> /// </summary>
Planned, Planned,
/// <summary>
/// The watch status was deleted and can not be retrived again.
/// </summary>
Deleted,
} }
public interface IWatchStatus
{
/// <summary>
/// Has the user started watching, is it planned?
/// </summary>
public WatchStatus Status { get; set; }
}
/// <summary> /// <summary>
/// Metadata of what an user as started/planned to watch. /// Metadata of what an user as started/planned to watch.
/// </summary> /// </summary>
[SqlFirstColumn(nameof(UserId))] [SqlFirstColumn(nameof(UserId))]
public class MovieWatchStatus : IAddedDate public class MovieWatchStatus : IAddedDate, IWatchStatus
{ {
/// <summary> /// <summary>
/// The ID of the user that started watching this episode. /// The ID of the user that started watching this episode.
@ -107,7 +121,7 @@ public class MovieWatchStatus : IAddedDate
} }
[SqlFirstColumn(nameof(UserId))] [SqlFirstColumn(nameof(UserId))]
public class EpisodeWatchStatus : IAddedDate public class EpisodeWatchStatus : IAddedDate, IWatchStatus
{ {
/// <summary> /// <summary>
/// The ID of the user that started watching this episode. /// The ID of the user that started watching this episode.
@ -162,7 +176,7 @@ public class EpisodeWatchStatus : IAddedDate
} }
[SqlFirstColumn(nameof(UserId))] [SqlFirstColumn(nameof(UserId))]
public class ShowWatchStatus : IAddedDate public class ShowWatchStatus : IAddedDate, IWatchStatus
{ {
/// <summary> /// <summary>
/// The ID of the user that started watching this episode. /// The ID of the user that started watching this episode.

View File

@ -38,7 +38,7 @@ public class WatchStatusRepository(
IRepository<Movie> movies, IRepository<Movie> movies,
DbConnection db, DbConnection db,
SqlVariableContext context SqlVariableContext context
) : IWatchStatusRepository ) : IWatchStatusRepository
{ {
/// <summary> /// <summary>
/// If the watch percent is below this value, don't consider the item started. /// If the watch percent is below this value, don't consider the item started.
@ -275,7 +275,15 @@ public class WatchStatusRepository(
await database await database
.MovieWatchStatus.Where(x => x.MovieId == movieId && x.UserId == userId) .MovieWatchStatus.Where(x => x.MovieId == movieId && x.UserId == userId)
.ExecuteDeleteAsync(); .ExecuteDeleteAsync();
await IWatchStatusRepository.OnMovieStatusDeleted(movieId, userId); await IWatchStatusRepository.OnMovieStatusChanged(
new()
{
UserId = userId,
MovieId = movieId,
AddedDate = DateTime.UtcNow,
Status = WatchStatus.Deleted,
}
);
} }
/// <inheritdoc /> /// <inheritdoc />
@ -419,7 +427,15 @@ public class WatchStatusRepository(
await database await database
.EpisodeWatchStatus.Where(x => x.Episode.ShowId == showId && x.UserId == userId) .EpisodeWatchStatus.Where(x => x.Episode.ShowId == showId && x.UserId == userId)
.ExecuteDeleteAsync(); .ExecuteDeleteAsync();
await IWatchStatusRepository.OnShowStatusDeleted(showId, userId); await IWatchStatusRepository.OnShowStatusChanged(
new()
{
UserId = userId,
ShowId = showId,
AddedDate = DateTime.UtcNow,
Status = WatchStatus.Deleted,
}
);
} }
/// <inheritdoc /> /// <inheritdoc />
@ -490,6 +506,14 @@ public class WatchStatusRepository(
await database await database
.EpisodeWatchStatus.Where(x => x.EpisodeId == episodeId && x.UserId == userId) .EpisodeWatchStatus.Where(x => x.EpisodeId == episodeId && x.UserId == userId)
.ExecuteDeleteAsync(); .ExecuteDeleteAsync();
await IWatchStatusRepository.OnEpisodeStatusDeleted(episodeId, userId); await IWatchStatusRepository.OnEpisodeStatusChanged(
new()
{
UserId = userId,
EpisodeId = episodeId,
AddedDate = DateTime.UtcNow,
Status = WatchStatus.Deleted,
}
);
} }
} }

View File

@ -0,0 +1,39 @@
// Kyoo - A portable and vast media library solution.
// Copyright (c) Kyoo.
//
// See AUTHORS.md and LICENSE file in the project root for full license information.
//
// Kyoo is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// Kyoo is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Kyoo. If not, see <https://www.gnu.org/licenses/>.
using System.Text;
using System.Text.Json;
namespace Kyoo.RabbitMq;
public class Message
{
public string Action { get; set; }
public string Type { get; set; }
public object Value { get; set; }
public string AsRoutingKey()
{
return $"{Type}.{Action}";
}
public byte[] AsBytes()
{
return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(this));
}
}

View File

@ -16,8 +16,6 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Kyoo. If not, see <https://www.gnu.org/licenses/>. // along with Kyoo. If not, see <https://www.gnu.org/licenses/>.
using System.Text;
using System.Text.Json;
using Kyoo.Abstractions.Controllers; using Kyoo.Abstractions.Controllers;
using Kyoo.Abstractions.Models; using Kyoo.Abstractions.Models;
using RabbitMQ.Client; using RabbitMQ.Client;
@ -40,6 +38,16 @@ public class RabbitProducer
_ListenResourceEvents<Episode>("events.resource"); _ListenResourceEvents<Episode>("events.resource");
_ListenResourceEvents<Studio>("events.resource"); _ListenResourceEvents<Studio>("events.resource");
_ListenResourceEvents<User>("events.resource"); _ListenResourceEvents<User>("events.resource");
_channel.ExchangeDeclare("events.watched", ExchangeType.Topic);
IWatchStatusRepository.OnMovieStatusChangedHandler += _PublishWatchStatus<MovieWatchStatus>(
"movie"
);
IWatchStatusRepository.OnShowStatusChangedHandler += _PublishWatchStatus<ShowWatchStatus>(
"show"
);
IWatchStatusRepository.OnEpisodeStatusChangedHandler +=
_PublishWatchStatus<EpisodeWatchStatus>("episode");
} }
private void _ListenResourceEvents<T>(string exchange) private void _ListenResourceEvents<T>(string exchange)
@ -61,16 +69,38 @@ public class RabbitProducer
{ {
return (T resource) => return (T resource) =>
{ {
var message = new Message message =
{ new()
Action = action, {
Type = type, Action = action,
Resource = resource, Type = type,
}; Value = resource,
};
_channel.BasicPublish( _channel.BasicPublish(
exchange, exchange,
routingKey: $"{type}.{action}", routingKey: message.AsRoutingKey(),
body: Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)) body: message.AsBytes()
);
return Task.CompletedTask;
};
}
private IWatchStatusRepository.ResourceEventHandler<T> _PublishWatchStatus<T>(string resource)
where T : IWatchStatus
{
return (status) =>
{
Message message =
new()
{
Type = resource,
Action = status.Status.ToString().ToLowerInvariant(),
Value = status,
};
_channel.BasicPublish(
exchange: "events.watched",
routingKey: message.AsRoutingKey(),
body: message.AsBytes()
); );
return Task.CompletedTask; return Task.CompletedTask;
}; };