Merge pull request #1943 from MediaBrowser/dev

Dev
This commit is contained in:
Luke 2016-07-15 13:19:29 -04:00 committed by GitHub
commit 090d79e8c4
10 changed files with 149 additions and 312 deletions

View File

@ -13,6 +13,7 @@ using ServiceStack.Web;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using CommonIO; using CommonIO;
@ -336,17 +337,19 @@ namespace MediaBrowser.Api.Playback.Progressive
state.Dispose(); state.Dispose();
} }
var result = new ProgressiveStreamWriter(outputPath, Logger, FileSystem, job); var outputHeaders = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
result.Options["Content-Type"] = contentType; outputHeaders["Content-Type"] = contentType;
// Add the response headers to the result object // Add the response headers to the result object
foreach (var item in responseHeaders) foreach (var item in responseHeaders)
{ {
result.Options[item.Key] = item.Value; outputHeaders[item.Key] = item.Value;
} }
return result; Func<Stream,Task> streamWriter = stream => new ProgressiveFileCopier(FileSystem, job, Logger).StreamFile(outputPath, stream);
return ResultFactory.GetAsyncStreamWriter(streamWriter, outputHeaders);
} }
finally finally
{ {

View File

@ -48,21 +48,19 @@ namespace MediaBrowser.Api.Playback.Progressive
/// <param name="responseStream">The response stream.</param> /// <param name="responseStream">The response stream.</param>
public void WriteTo(Stream responseStream) public void WriteTo(Stream responseStream)
{ {
WriteToInternal(responseStream); var task = WriteToAsync(responseStream);
Task.WaitAll(task);
} }
/// <summary> /// <summary>
/// Writes to async. /// Writes to.
/// </summary> /// </summary>
/// <param name="responseStream">The response stream.</param> /// <param name="responseStream">The response stream.</param>
/// <returns>Task.</returns> public async Task WriteToAsync(Stream responseStream)
private void WriteToInternal(Stream responseStream)
{ {
try try
{ {
var task = new ProgressiveFileCopier(_fileSystem, _job, Logger).StreamFile(Path, responseStream); await new ProgressiveFileCopier(_fileSystem, _job, Logger).StreamFile(Path, responseStream).ConfigureAwait(false);
Task.WaitAll(task);
} }
catch (IOException) catch (IOException)
{ {

View File

@ -28,6 +28,8 @@ namespace MediaBrowser.Controller.Net
/// <returns>System.Object.</returns> /// <returns>System.Object.</returns>
object GetResult(object content, string contentType, IDictionary<string,string> responseHeaders = null); object GetResult(object content, string contentType, IDictionary<string,string> responseHeaders = null);
object GetAsyncStreamWriter(Func<Stream,Task> streamWriter, IDictionary<string, string> responseHeaders = null);
/// <summary> /// <summary>
/// Gets the optimized result. /// Gets the optimized result.
/// </summary> /// </summary>

View File

@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using ServiceStack;
using ServiceStack.Web;
namespace MediaBrowser.Server.Implementations.HttpServer
{
public class AsyncStreamWriterFunc : IStreamWriter, IAsyncStreamWriter, IHasOptions
{
/// <summary>
/// Gets or sets the source stream.
/// </summary>
/// <value>The source stream.</value>
private Func<Stream, Task> Writer { get; set; }
/// <summary>
/// Gets the options.
/// </summary>
/// <value>The options.</value>
public IDictionary<string, string> Options { get; private set; }
public Action OnComplete { get; set; }
public Action OnError { get; set; }
/// <summary>
/// Initializes a new instance of the <see cref="StreamWriter" /> class.
/// </summary>
public AsyncStreamWriterFunc(Func<Stream, Task> writer, IDictionary<string, string> headers)
{
Writer = writer;
if (headers == null)
{
headers = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
}
Options = headers;
}
/// <summary>
/// Writes to.
/// </summary>
/// <param name="responseStream">The response stream.</param>
public void WriteTo(Stream responseStream)
{
var task = Writer(responseStream);
Task.WaitAll(task);
}
public async Task WriteToAsync(Stream responseStream)
{
await Writer(responseStream).ConfigureAwait(false);
}
}
}

View File

@ -699,5 +699,10 @@ namespace MediaBrowser.Server.Implementations.HttpServer
throw error; throw error;
} }
public object GetAsyncStreamWriter(Func<Stream, Task> streamWriter, IDictionary<string, string> responseHeaders = null)
{
return new AsyncStreamWriterFunc(streamWriter, responseHeaders);
}
} }
} }

View File

@ -1,285 +0,0 @@
using MediaBrowser.Controller.Net;
using MediaBrowser.Model.Logging;
using ServiceStack;
using ServiceStack.Host.HttpListener;
using ServiceStack.Web;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MediaBrowser.Server.Implementations.HttpServer.NetListener
{
public class HttpListenerServer : IHttpListener
{
private readonly ILogger _logger;
private HttpListener _listener;
private readonly ManualResetEventSlim _listenForNextRequest = new ManualResetEventSlim(false);
public Action<Exception, IRequest> ErrorHandler { get; set; }
public Action<WebSocketConnectEventArgs> WebSocketHandler { get; set; }
public Func<IHttpRequest, Uri, Task> RequestHandler { get; set; }
private readonly Action<string> _endpointListener;
public HttpListenerServer(ILogger logger, Action<string> endpointListener)
{
_logger = logger;
_endpointListener = endpointListener;
}
private List<string> UrlPrefixes { get; set; }
public void Start(IEnumerable<string> urlPrefixes)
{
UrlPrefixes = urlPrefixes.ToList();
if (_listener == null)
_listener = new HttpListener();
//HostContext.Config.HandlerFactoryPath = ListenerRequest.GetHandlerPathIfAny(UrlPrefixes.First());
foreach (var prefix in UrlPrefixes)
{
_logger.Info("Adding HttpListener prefix " + prefix);
_listener.Prefixes.Add(prefix);
}
_listener.Start();
Task.Factory.StartNew(Listen, TaskCreationOptions.LongRunning);
}
private bool IsListening
{
get { return _listener != null && _listener.IsListening; }
}
// Loop here to begin processing of new requests.
private void Listen()
{
while (IsListening)
{
if (_listener == null) return;
_listenForNextRequest.Reset();
try
{
_listener.BeginGetContext(ListenerCallback, _listener);
_listenForNextRequest.Wait();
}
catch (Exception ex)
{
_logger.Error("Listen()", ex);
return;
}
if (_listener == null) return;
}
}
// Handle the processing of a request in here.
private void ListenerCallback(IAsyncResult asyncResult)
{
_listenForNextRequest.Set();
var listener = asyncResult.AsyncState as HttpListener;
HttpListenerContext context;
if (listener == null) return;
var isListening = listener.IsListening;
try
{
if (!isListening)
{
_logger.Debug("Ignoring ListenerCallback() as HttpListener is no longer listening"); return;
}
// The EndGetContext() method, as with all Begin/End asynchronous methods in the .NET Framework,
// blocks until there is a request to be processed or some type of data is available.
context = listener.EndGetContext(asyncResult);
}
catch (Exception ex)
{
// You will get an exception when httpListener.Stop() is called
// because there will be a thread stopped waiting on the .EndGetContext()
// method, and again, that is just the way most Begin/End asynchronous
// methods of the .NET Framework work.
var errMsg = ex + ": " + IsListening;
_logger.Warn(errMsg);
return;
}
Task.Factory.StartNew(() => InitTask(context));
}
private void InitTask(HttpListenerContext context)
{
try
{
var task = this.ProcessRequestAsync(context);
task.ContinueWith(x => HandleError(x.Exception, context), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent);
if (task.Status == TaskStatus.Created)
{
task.RunSynchronously();
}
}
catch (Exception ex)
{
HandleError(ex, context);
}
}
private Task ProcessRequestAsync(HttpListenerContext context)
{
var request = context.Request;
LogHttpRequest(request);
if (request.IsWebSocketRequest)
{
return ProcessWebSocketRequest(context);
}
if (string.IsNullOrEmpty(context.Request.RawUrl))
return ((object)null).AsTaskResult();
var operationName = context.Request.GetOperationName();
var httpReq = GetRequest(context, operationName);
return RequestHandler(httpReq, request.Url);
}
/// <summary>
/// Processes the web socket request.
/// </summary>
/// <param name="ctx">The CTX.</param>
/// <returns>Task.</returns>
private async Task ProcessWebSocketRequest(HttpListenerContext ctx)
{
#if !__MonoCS__
try
{
var webSocketContext = await ctx.AcceptWebSocketAsync(null).ConfigureAwait(false);
if (WebSocketHandler != null)
{
WebSocketHandler(new WebSocketConnectEventArgs
{
WebSocket = new NativeWebSocket(webSocketContext.WebSocket, _logger),
Endpoint = ctx.Request.RemoteEndPoint.ToString()
});
}
}
catch (Exception ex)
{
_logger.ErrorException("AcceptWebSocketAsync error", ex);
ctx.Response.StatusCode = 500;
ctx.Response.Close();
}
#endif
}
private void HandleError(Exception ex, HttpListenerContext context)
{
var operationName = context.Request.GetOperationName();
var httpReq = GetRequest(context, operationName);
if (ErrorHandler != null)
{
ErrorHandler(ex, httpReq);
}
}
private static ListenerRequest GetRequest(HttpListenerContext httpContext, string operationName)
{
var req = new ListenerRequest(httpContext, operationName, RequestAttributes.None);
req.RequestAttributes = req.GetAttributes();
return req;
}
/// <summary>
/// Logs the HTTP request.
/// </summary>
/// <param name="request">The request.</param>
private void LogHttpRequest(HttpListenerRequest request)
{
var endpoint = request.LocalEndPoint;
if (endpoint != null)
{
var address = endpoint.ToString();
_endpointListener(address);
}
LogRequest(_logger, request);
}
/// <summary>
/// Logs the request.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="request">The request.</param>
private static void LogRequest(ILogger logger, HttpListenerRequest request)
{
var log = new StringBuilder();
var logHeaders = true;
if (logHeaders)
{
var headers = string.Join(",", request.Headers.AllKeys.Where(i => !string.Equals(i, "cookie", StringComparison.OrdinalIgnoreCase) && !string.Equals(i, "Referer", StringComparison.OrdinalIgnoreCase)).Select(k => k + "=" + request.Headers[k]));
log.AppendLine("Ip: " + request.RemoteEndPoint + ". Headers: " + headers);
}
var type = request.IsWebSocketRequest ? "Web Socket" : "HTTP " + request.HttpMethod;
logger.LogMultiline(type + " " + request.Url, LogSeverity.Debug, log);
}
public void Stop()
{
if (_listener != null)
{
foreach (var prefix in UrlPrefixes)
{
_listener.Prefixes.Remove(prefix);
}
_listener.Close();
}
}
public void Dispose()
{
Dispose(true);
}
private bool _disposed;
private readonly object _disposeLock = new object();
protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
lock (_disposeLock)
{
if (_disposed) return;
if (disposing)
{
Stop();
}
//release unmanaged resources here...
_disposed = true;
}
}
}
}

View File

@ -5,10 +5,12 @@ using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO; using System.IO;
using System.Net; using System.Net;
using System.Threading.Tasks;
using ServiceStack;
namespace MediaBrowser.Server.Implementations.HttpServer namespace MediaBrowser.Server.Implementations.HttpServer
{ {
public class RangeRequestWriter : IStreamWriter, IHttpResult public class RangeRequestWriter : IStreamWriter, IAsyncStreamWriter, IHttpResult
{ {
/// <summary> /// <summary>
/// Gets or sets the source stream. /// Gets or sets the source stream.
@ -168,16 +170,6 @@ namespace MediaBrowser.Server.Implementations.HttpServer
/// </summary> /// </summary>
/// <param name="responseStream">The response stream.</param> /// <param name="responseStream">The response stream.</param>
public void WriteTo(Stream responseStream) public void WriteTo(Stream responseStream)
{
WriteToInternal(responseStream);
}
/// <summary>
/// Writes to async.
/// </summary>
/// <param name="responseStream">The response stream.</param>
/// <returns>Task.</returns>
private void WriteToInternal(Stream responseStream)
{ {
try try
{ {
@ -237,6 +229,66 @@ namespace MediaBrowser.Server.Implementations.HttpServer
} }
} }
public async Task WriteToAsync(Stream responseStream)
{
try
{
// Headers only
if (IsHeadRequest)
{
return;
}
using (var source = SourceStream)
{
// If the requested range is "0-", we can optimize by just doing a stream copy
if (RangeEnd >= TotalContentLength - 1)
{
await source.CopyToAsync(responseStream, BufferSize).ConfigureAwait(false);
}
else
{
await CopyToInternalAsync(source, responseStream, RangeLength).ConfigureAwait(false);
}
}
}
catch (IOException ex)
{
throw;
}
catch (Exception ex)
{
_logger.ErrorException("Error in range request writer", ex);
throw;
}
finally
{
if (OnComplete != null)
{
OnComplete();
}
}
}
private async Task CopyToInternalAsync(Stream source, Stream destination, long copyLength)
{
var array = new byte[BufferSize];
int count;
while ((count = await source.ReadAsync(array, 0, array.Length).ConfigureAwait(false)) != 0)
{
var bytesToCopy = Math.Min(count, copyLength);
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToCopy)).ConfigureAwait(false);
copyLength -= bytesToCopy;
if (copyLength <= 0)
{
break;
}
}
}
public string ContentType { get; set; } public string ContentType { get; set; }
public IRequest RequestContext { get; set; } public IRequest RequestContext { get; set; }

View File

@ -12,7 +12,7 @@ namespace MediaBrowser.Server.Implementations.HttpServer
/// <summary> /// <summary>
/// Class StreamWriter /// Class StreamWriter
/// </summary> /// </summary>
public class StreamWriter : IStreamWriter, /*IAsyncStreamWriter,*/ IHasOptions public class StreamWriter : IStreamWriter, IAsyncStreamWriter, IHasOptions
{ {
private ILogger Logger { get; set; } private ILogger Logger { get; set; }

View File

@ -156,6 +156,7 @@
<Compile Include="EntryPoints\ServerEventNotifier.cs" /> <Compile Include="EntryPoints\ServerEventNotifier.cs" />
<Compile Include="EntryPoints\UserDataChangeNotifier.cs" /> <Compile Include="EntryPoints\UserDataChangeNotifier.cs" />
<Compile Include="FileOrganization\OrganizerScheduledTask.cs" /> <Compile Include="FileOrganization\OrganizerScheduledTask.cs" />
<Compile Include="HttpServer\AsyncStreamWriterFunc.cs" />
<Compile Include="HttpServer\IHttpListener.cs" /> <Compile Include="HttpServer\IHttpListener.cs" />
<Compile Include="HttpServer\Security\AuthorizationContext.cs" /> <Compile Include="HttpServer\Security\AuthorizationContext.cs" />
<Compile Include="HttpServer\ContainerAdapter.cs" /> <Compile Include="HttpServer\ContainerAdapter.cs" />
@ -757,9 +758,7 @@
<EmbeddedResource Include="Localization\iso6392.txt" /> <EmbeddedResource Include="Localization\iso6392.txt" />
<EmbeddedResource Include="Localization\Ratings\be.txt" /> <EmbeddedResource Include="Localization\Ratings\be.txt" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup />
<Folder Include="HttpServer\NetListener\" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.

View File

@ -4179,6 +4179,13 @@ namespace MediaBrowser.Server.Implementations.Persistence
throw new ArgumentNullException("values"); throw new ArgumentNullException("values");
} }
// Just in case there might be case-insensitive duplicates, strip them out now
var newValues = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
foreach (var pair in values)
{
newValues[pair.Key] = pair.Value;
}
CheckDisposed(); CheckDisposed();
// First delete // First delete
@ -4187,7 +4194,7 @@ namespace MediaBrowser.Server.Implementations.Persistence
_deleteProviderIdsCommand.ExecuteNonQuery(); _deleteProviderIdsCommand.ExecuteNonQuery();
foreach (var pair in values) foreach (var pair in newValues)
{ {
_saveProviderIdsCommand.GetParameter(0).Value = itemId; _saveProviderIdsCommand.GetParameter(0).Value = itemId;
_saveProviderIdsCommand.GetParameter(1).Value = pair.Key; _saveProviderIdsCommand.GetParameter(1).Value = pair.Key;