update live stream management

This commit is contained in:
Luke Pulverenti 2017-08-19 18:37:15 -04:00
parent 1ad990ad72
commit b783f317fe
6 changed files with 25 additions and 99 deletions

View File

@ -49,8 +49,8 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
_logger.Info("Copying recording stream to file {0}", targetFile); _logger.Info("Copying recording stream to file {0}", targetFile);
// The media source if infinite so we need to handle stopping ourselves // The media source if infinite so we need to handle stopping ourselves
var durationToken = new CancellationTokenSource(duration); //var durationToken = new CancellationTokenSource(duration);
cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, durationToken.Token).Token; //cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, durationToken.Token).Token;
await directStreamProvider.CopyToAsync(output, cancellationToken).ConfigureAwait(false); await directStreamProvider.CopyToAsync(output, cancellationToken).ConfigureAwait(false);
} }

View File

@ -39,22 +39,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
if (bytesRead > 0) if (bytesRead > 0)
{ {
var allStreams = _outputStreams.ToList(); foreach (var stream in _outputStreams)
//if (allStreams.Count == 1)
//{
// allStreams[0].Value.Write(buffer, 0, bytesRead);
//}
//else
{
//byte[] copy = new byte[bytesRead];
//Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead);
foreach (var stream in allStreams)
{ {
stream.Value.Queue(buffer, 0, bytesRead); stream.Value.Queue(buffer, 0, bytesRead);
} }
}
if (onStarted != null) if (onStarted != null)
{ {
@ -73,27 +61,21 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{ {
var result = new QueueStream(stream, _logger) var queueStream = new QueueStream(stream, _logger);
_outputStreams.TryAdd(queueStream.Id, queueStream);
try
{ {
OnFinished = OnFinished queueStream.Start(cancellationToken);
}; }
finally
_outputStreams.TryAdd(result.Id, result); {
_outputStreams.TryRemove(queueStream.Id, out queueStream);
result.Start(cancellationToken); GC.Collect();
return result.TaskCompletion.Task;
} }
public void RemoveOutputStream(QueueStream stream) return Task.FromResult(true);
{
QueueStream removed;
_outputStreams.TryRemove(stream.Id, out removed);
}
private void OnFinished(QueueStream queueStream)
{
RemoveOutputStream(queueStream);
} }
} }
} }

View File

@ -14,9 +14,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{ {
private readonly Stream _outputStream; private readonly Stream _outputStream;
private readonly BlockingCollection<Tuple<byte[], int, int>> _queue = new BlockingCollection<Tuple<byte[], int, int>>(); private readonly BlockingCollection<Tuple<byte[], int, int>> _queue = new BlockingCollection<Tuple<byte[], int, int>>();
public TaskCompletionSource<bool> TaskCompletion { get; private set; }
public Action<QueueStream> OnFinished { get; set; }
private readonly ILogger _logger; private readonly ILogger _logger;
public Guid Id = Guid.NewGuid(); public Guid Id = Guid.NewGuid();
@ -24,7 +22,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{ {
_outputStream = outputStream; _outputStream = outputStream;
_logger = logger; _logger = logger;
TaskCompletion = new TaskCompletionSource<bool>();
} }
public void Queue(byte[] bytes, int offset, int count) public void Queue(byte[] bytes, int offset, int count)
@ -33,69 +30,16 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
} }
public void Start(CancellationToken cancellationToken) public void Start(CancellationToken cancellationToken)
{
Task.Run(() => StartInternal(cancellationToken));
}
private void OnClosed()
{
GC.Collect();
if (OnFinished != null)
{
OnFinished(this);
}
}
public void Write(byte[] bytes, int offset, int count)
{
//return _outputStream.WriteAsync(bytes, offset, count, cancellationToken);
try
{
_outputStream.Write(bytes, offset, count);
}
catch (OperationCanceledException)
{
_logger.Debug("QueueStream cancelled");
TaskCompletion.TrySetCanceled();
OnClosed();
}
catch (Exception ex)
{
_logger.ErrorException("Error in QueueStream", ex);
TaskCompletion.TrySetException(ex);
OnClosed();
}
}
private void StartInternal(CancellationToken cancellationToken)
{
try
{ {
while (true) while (true)
{
foreach (var result in _queue.GetConsumingEnumerable())
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
foreach (var result in _queue.GetConsumingEnumerable())
{
_outputStream.Write(result.Item1, result.Item2, result.Item3); _outputStream.Write(result.Item1, result.Item2, result.Item3);
} }
} }
} }
catch (OperationCanceledException)
{
_logger.Debug("QueueStream cancelled");
TaskCompletion.TrySetCanceled();
}
catch (Exception ex)
{
_logger.ErrorException("Error in QueueStream", ex);
TaskCompletion.TrySetException(ex);
}
finally
{
OnClosed();
}
}
} }
} }

View File

@ -253,7 +253,7 @@ namespace MediaBrowser.XbmcMetadata.Parsers
int value; int value;
if (!string.IsNullOrWhiteSpace(tmdbId) && int.TryParse(tmdbId, NumberStyles.Any, CultureInfo.InvariantCulture, out value)) if (!string.IsNullOrWhiteSpace(tmdbId) && int.TryParse(tmdbId, NumberStyles.Any, CultureInfo.InvariantCulture, out value))
{ {
item.SetProviderId(MetadataProviders.Tmdb, tmdbId); item.SetProviderId(MetadataProviders.Tmdb, value.ToString(_usCulture));
} }
} }
@ -269,7 +269,7 @@ namespace MediaBrowser.XbmcMetadata.Parsers
int value; int value;
if (!string.IsNullOrWhiteSpace(tvdbId) && int.TryParse(tvdbId, NumberStyles.Any, CultureInfo.InvariantCulture, out value)) if (!string.IsNullOrWhiteSpace(tvdbId) && int.TryParse(tvdbId, NumberStyles.Any, CultureInfo.InvariantCulture, out value))
{ {
item.SetProviderId(MetadataProviders.Tvdb, tvdbId); item.SetProviderId(MetadataProviders.Tvdb, value.ToString(_usCulture));
} }
} }
} }

View File

@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2011/08/nuspec.xsd"> <package xmlns="http://schemas.microsoft.com/packaging/2011/08/nuspec.xsd">
<metadata> <metadata>
<id>MediaBrowser.Common</id> <id>MediaBrowser.Common</id>
<version>3.0.734</version> <version>3.0.735</version>
<title>Emby.Common</title> <title>Emby.Common</title>
<authors>Emby Team</authors> <authors>Emby Team</authors>
<owners>ebr,Luke,scottisafool</owners> <owners>ebr,Luke,scottisafool</owners>

View File

@ -2,7 +2,7 @@
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd"> <package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata> <metadata>
<id>MediaBrowser.Server.Core</id> <id>MediaBrowser.Server.Core</id>
<version>3.0.734</version> <version>3.0.735</version>
<title>Emby.Server.Core</title> <title>Emby.Server.Core</title>
<authors>Emby Team</authors> <authors>Emby Team</authors>
<owners>ebr,Luke,scottisafool</owners> <owners>ebr,Luke,scottisafool</owners>
@ -12,7 +12,7 @@
<description>Contains core components required to build plugins for Emby Server.</description> <description>Contains core components required to build plugins for Emby Server.</description>
<copyright>Copyright © Emby 2013</copyright> <copyright>Copyright © Emby 2013</copyright>
<dependencies> <dependencies>
<dependency id="MediaBrowser.Common" version="3.0.734" /> <dependency id="MediaBrowser.Common" version="3.0.735" />
</dependencies> </dependencies>
</metadata> </metadata>
<files> <files>