using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.WebSockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Common.Json;
using MediaBrowser.Controller.Net;
using MediaBrowser.Model.Net;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace Emby.Server.Implementations.HttpServer
{
/// <summary>
/// A single WebSocket connection: sends JSON-serialized messages and runs the receive loop for one socket.
/// </summary>
public class WebSocketConnection : IWebSocketConnection
{
/// <summary>
/// The logger supplied to the constructor.
/// </summary>
private readonly ILogger _logger;
/// <summary>
/// The JSON serializer options, obtained from <c>JsonDefaults.GetOptions()</c> in the constructor
/// and used when serializing outgoing messages.
/// </summary>
private readonly JsonSerializerOptions _jsonOptions;
/// <summary>
/// The underlying socket used for sending, receiving and closing.
/// </summary>
private readonly WebSocket _socket;
// NOTE(review): presumably tracks whether this connection has been disposed; the code that
// reads or writes this flag is not part of the section reviewed here - confirm.
private bool _disposed = false;
/// <summary>
/// Initializes a new instance of the <see cref="WebSocketConnection"/> class.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="socket">The socket.</param>
/// <param name="remoteEndPoint">The remote end point.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="socket"/>, <paramref name="remoteEndPoint"/> or <paramref name="logger"/> is <c>null</c>.
/// </exception>
public WebSocketConnection(ILogger logger, WebSocket socket, IPAddress remoteEndPoint)
{
    // Arguments are validated in the same order as before: socket, remoteEndPoint, logger.
    _socket = socket ?? throw new ArgumentNullException(nameof(socket));

    // The end point guard used to be inverted (`!= null`), which rejected every valid
    // end point and let a null one through. A missing end point is what must be rejected.
    RemoteEndPoint = remoteEndPoint ?? throw new ArgumentNullException(nameof(remoteEndPoint));

    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _jsonOptions = JsonDefaults.GetOptions();
}
/// <summary>
/// Occurs at the end of <see cref="ProcessAsync"/>, after the receive loop has finished and
/// immediately before the socket is disposed.
/// </summary>
public event EventHandler Closed;
/// <summary>
/// Gets the remote end point that was passed to the constructor.
/// </summary>
public IPAddress RemoteEndPoint { get; private set; }
/// <summary>
/// Gets or sets the receive action. While this is <c>null</c>, <c>ProcessInternal</c> returns
/// without reading the received message.
/// </summary>
/// <value>The receive action.</value>
// NOTE(review): `Func` appears here without type arguments; the delegate signature is not
// visible in this section - confirm against the full file.
public Func OnReceive { get; set; }
/// <summary>
/// Gets the last activity date. It is set to the current UTC time each time <c>ProcessInternal</c> runs.
/// </summary>
/// <value>The last activity date.</value>
public DateTime LastActivityDate { get; private set; }
/// <summary>
/// Gets or sets the URL.
/// </summary>
/// <value>The URL.</value>
public string Url { get; set; }
/// <summary>
/// Gets or sets the query string.
/// </summary>
/// <value>The query string.</value>
public IQueryCollection QueryString { get; set; }
/// <summary>
/// Gets the current state of the underlying socket.
/// </summary>
/// <value>The state.</value>
public WebSocketState State => _socket.State;
/// <summary>
/// Serializes a message to UTF-8 JSON and sends it over the socket as one complete text message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The task returned by the socket's send operation.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken)
{
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    // Serialize with the shared options so every outgoing message uses the same JSON settings.
    byte[] payload = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);

    return _socket.SendAsync(
        payload,
        WebSocketMessageType.Text,
        endOfMessage: true,
        cancellationToken: cancellationToken);
}
/// <summary>
/// Runs the receive loop for this connection. The loop ends when the socket is no longer open,
/// a close message is received, or the pipe reader has stopped reading. Afterwards the socket is
/// closed if it is still open, <see cref="Closed"/> is raised and the socket is disposed.
/// </summary>
/// <param name="cancellationToken">The cancellation token passed to the socket's receive and close calls.</param>
/// <returns>A task that completes once the loop has ended and the socket has been disposed.</returns>
public async Task ProcessAsync(CancellationToken cancellationToken = default)
{
    var pipe = new Pipe();
    var writer = pipe.Writer;

    try
    {
        ValueWebSocketReceiveResult receiveResult;
        do
        {
            // Allocate at least 512 bytes from the PipeWriter
            Memory<byte> memory = writer.GetMemory(512);
            receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);

            int bytesRead = receiveResult.Count;
            if (bytesRead == 0)
            {
                // Nothing to hand to the pipe. In a do/while, `continue` still evaluates the
                // loop condition, so an empty close message ends the loop.
                continue;
            }

            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);

            // Make the data available to the PipeReader
            FlushResult flushResult = await writer.FlushAsync().ConfigureAwait(false);
            if (flushResult.IsCompleted)
            {
                // The PipeReader stopped reading
                break;
            }

            if (receiveResult.EndOfMessage)
            {
                await ProcessInternal(pipe.Reader).ConfigureAwait(false);
            }
        }
        while (_socket.State == WebSocketState.Open && receiveResult.MessageType != WebSocketMessageType.Close);

        if (_socket.State == WebSocketState.Open)
        {
            await _socket.CloseAsync(
                WebSocketCloseStatus.NormalClosure,
                string.Empty, // REVIEW: human readable explanation as to why the connection is closed.
                cancellationToken).ConfigureAwait(false);
        }
    }
    finally
    {
        // Run the cleanup even when receiving, processing or closing throws (for example on an
        // aborted connection or cancellation); otherwise subscribers are never told the
        // connection ended and the socket is never disposed.
        Closed?.Invoke(this, EventArgs.Empty);
        _socket.Dispose();
    }
}
private async Task ProcessInternal(PipeReader reader)
{
LastActivityDate = DateTime.UtcNow;
if (OnReceive == null)
{
return;
}
try
{
var result = await reader.ReadAsync().ConfigureAwait(false);
if (!result.IsCompleted)
{
return;
}
WebSocketMessage