diff --git a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs index 2292d86a4a..171047e65f 100644 --- a/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs +++ b/Emby.Server.Implementations/HttpServer/WebSocketConnection.cs @@ -94,6 +94,9 @@ namespace Emby.Server.Implementations.HttpServer /// The last activity date. public DateTime LastActivityDate { get; private set; } + /// + public DateTime LastKeepAliveDate { get; set; } + /// /// Gets the id. /// @@ -158,11 +161,6 @@ namespace Emby.Server.Implementations.HttpServer return; } - if (OnReceive == null) - { - return; - } - try { var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage)); @@ -174,7 +172,15 @@ namespace Emby.Server.Implementations.HttpServer Connection = this }; - OnReceive(info); + if (info.MessageType.Equals("KeepAlive", StringComparison.Ordinal)) + { + SendKeepAliveResponse(); + } + + if (OnReceive != null) + { + OnReceive(info); + } } catch (Exception ex) { @@ -233,6 +239,15 @@ namespace Emby.Server.Implementations.HttpServer return _socket.SendAsync(text, true, cancellationToken); } + private Task SendKeepAliveResponse() + { + LastKeepAliveDate = DateTime.UtcNow; + return SendAsync(new WebSocketMessage + { + MessageType = "KeepAlive" + }, CancellationToken.None); + } + /// public void Dispose() { diff --git a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs index 930f2d35d3..d8e02ef395 100644 --- a/Emby.Server.Implementations/Session/SessionWebSocketListener.cs +++ b/Emby.Server.Implementations/Session/SessionWebSocketListener.cs @@ -1,8 +1,13 @@ using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Net.WebSockets; +using System.Threading; using System.Threading.Tasks; using MediaBrowser.Controller.Net; using MediaBrowser.Controller.Session; using MediaBrowser.Model.Events; +using MediaBrowser.Model.Net; using MediaBrowser.Model.Serialization; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Logging; @@ -14,6 +19,21 @@ namespace Emby.Server.Implementations.Session /// public class SessionWebSocketListener : IWebSocketListener, IDisposable { + /// + /// The timeout in seconds after which a WebSocket is considered to be lost. + /// + public readonly int WebSocketLostTimeout = 60; + + /// + /// The timer factor; controls the frequency of the timer. + /// + public readonly double TimerFactor = 0.2; + + /// + /// The ForceKeepAlive factor; controls when a ForceKeepAlive is sent. + /// + public readonly double ForceKeepAliveFactor = 0.75; + /// /// The _session manager /// @@ -31,6 +51,15 @@ namespace Emby.Server.Implementations.Session private readonly IHttpServer _httpServer; + /// + /// The KeepAlive timer. + /// + private Timer _keepAliveTimer; + + /// + /// The WebSocket watchlist. + /// + private readonly ConcurrentDictionary _webSockets = new ConcurrentDictionary(); /// /// Initializes a new instance of the class. @@ -55,6 +84,7 @@ namespace Emby.Server.Implementations.Session if (session != null) { EnsureController(session, e.Argument); + KeepAliveWebSocket(e.Argument); } else { @@ -82,6 +112,7 @@ namespace Emby.Server.Implementations.Session public void Dispose() { _httpServer.WebSocketConnected -= _serverManager_WebSocketConnected; + StopKeepAliveTimer(); } /// @@ -99,5 +130,130 @@ namespace Emby.Server.Implementations.Session var controller = (WebSocketController)controllerInfo.Item1; controller.AddWebSocket(connection); } + + /// + /// Called when a WebSocket is closed. + /// + /// The WebSocket. + /// The event arguments. + private void _webSocket_Closed(object sender, EventArgs e) + { + var webSocket = (IWebSocketConnection) sender; + webSocket.Closed -= _webSocket_Closed; + _webSockets.TryRemove(webSocket, out _); + } + + /// + /// Adds a WebSocket to the KeepAlive watchlist. + /// + /// The WebSocket to monitor. + private async void KeepAliveWebSocket(IWebSocketConnection webSocket) + { + _webSockets.TryAdd(webSocket, 0); + webSocket.Closed += _webSocket_Closed; + webSocket.LastKeepAliveDate = DateTime.UtcNow; + + // Notify WebSocket about timeout + try + { + await SendForceKeepAlive(webSocket); + } + catch (WebSocketException exception) + { + _logger.LogDebug(exception, "Error sending ForceKeepAlive message to WebSocket."); + } + + StartKeepAliveTimer(); + } + + /// + /// Starts the KeepAlive timer. + /// + private void StartKeepAliveTimer() + { + if (_keepAliveTimer == null) + { + _keepAliveTimer = new Timer( + KeepAliveSockets, + null, + TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor), + TimeSpan.FromSeconds(WebSocketLostTimeout * TimerFactor) + ); + } + } + + /// + /// Stops the KeepAlive timer. + /// + private void StopKeepAliveTimer() + { + if (_keepAliveTimer != null) + { + _keepAliveTimer.Dispose(); + _keepAliveTimer = null; + } + + foreach (var pair in _webSockets) + { + pair.Key.Closed -= _webSocket_Closed; + } + } + + /// + /// Checks status of KeepAlive of WebSockets. + /// + /// The state. + private async void KeepAliveSockets(object state) + { + var inactive = _webSockets.Keys.Where(i => + { + var elapsed = (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds; + return (elapsed > WebSocketLostTimeout * ForceKeepAliveFactor) && (elapsed < WebSocketLostTimeout); + }); + var lost = _webSockets.Keys.Where(i => (DateTime.UtcNow - i.LastKeepAliveDate).TotalSeconds >= WebSocketLostTimeout); + + if (inactive.Any()) + { + _logger.LogDebug("Sending ForceKeepAlive message to {0} WebSockets.", inactive.Count()); + } + + foreach (var webSocket in inactive) + { + try + { + await SendForceKeepAlive(webSocket); + } + catch (WebSocketException exception) + { + _logger.LogDebug(exception, "Error sending ForceKeepAlive message to WebSocket."); + lost.Append(webSocket); + } + } + + if (lost.Any()) + { + // TODO: handle lost webSockets + _logger.LogDebug("Lost {0} WebSockets.", lost.Count()); + } + + if (!_webSockets.Any()) + { + StopKeepAliveTimer(); + } + } + + /// + /// Sends a ForceKeepAlive message to a WebSocket. + /// + /// The WebSocket. + /// Task. + private Task SendForceKeepAlive(IWebSocketConnection webSocket) + { + return webSocket.SendAsync(new WebSocketMessage + { + MessageType = "ForceKeepAlive", + Data = WebSocketLostTimeout + }, CancellationToken.None); + } } } diff --git a/MediaBrowser.Controller/Net/IWebSocketConnection.cs b/MediaBrowser.Controller/Net/IWebSocketConnection.cs index 31eb7ccb75..fb766ab57f 100644 --- a/MediaBrowser.Controller/Net/IWebSocketConnection.cs +++ b/MediaBrowser.Controller/Net/IWebSocketConnection.cs @@ -26,6 +26,12 @@ namespace MediaBrowser.Controller.Net /// The last activity date. DateTime LastActivityDate { get; } + /// + /// Gets or sets the date of last Keeplive received. + /// + /// The date of last Keeplive received. + public DateTime LastKeepAliveDate { get; set; } + /// /// Gets or sets the URL. ///