update closing of streams

This commit is contained in:
Luke Pulverenti
2016-09-29 08:55:49 -04:00
parent f5d37ed659
commit 76c7bfcb67
36 changed files with 481 additions and 332 deletions

View File

@@ -355,7 +355,7 @@ namespace MediaBrowser.Server.Implementations.Library
.ToList();
}
private readonly ConcurrentDictionary<string, LiveStreamInfo> _openStreams = new ConcurrentDictionary<string, LiveStreamInfo>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, LiveStreamInfo> _openStreams = new Dictionary<string, LiveStreamInfo>(StringComparer.OrdinalIgnoreCase);
private readonly SemaphoreSlim _liveStreamSemaphore = new SemaphoreSlim(1, 1);
public async Task<LiveStreamResponse> OpenLiveStream(LiveStreamRequest request, bool enableAutoClose, CancellationToken cancellationToken)
@@ -383,7 +383,7 @@ namespace MediaBrowser.Server.Implementations.Library
Id = mediaSource.LiveStreamId,
MediaSource = mediaSource
};
_openStreams.AddOrUpdate(mediaSource.LiveStreamId, info, (key, i) => info);
_openStreams[mediaSource.LiveStreamId] = info;
if (enableAutoClose)
{
@@ -421,7 +421,7 @@ namespace MediaBrowser.Server.Implementations.Library
throw new ArgumentNullException("id");
}
_logger.Debug("Getting live stream {0}", id);
_logger.Debug("Getting already opened live stream {0}", id);
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -465,17 +465,16 @@ namespace MediaBrowser.Server.Implementations.Library
}
}
private async Task CloseLiveStreamWithProvider(IMediaSourceProvider provider, string streamId, CancellationToken cancellationToken)
private async Task CloseLiveStreamWithProvider(IMediaSourceProvider provider, string streamId)
{
_logger.Info("Closing live stream {0} with provider {1}", streamId, provider.GetType().Name);
try
{
await provider.CloseMediaSource(streamId, cancellationToken).ConfigureAwait(false);
await provider.CloseMediaSource(streamId).ConfigureAwait(false);
}
catch (NotImplementedException)
{
}
catch (Exception ex)
{
@@ -483,37 +482,35 @@ namespace MediaBrowser.Server.Implementations.Library
}
}
public async Task CloseLiveStream(string id, CancellationToken cancellationToken)
public async Task CloseLiveStream(string id)
{
if (string.IsNullOrWhiteSpace(id))
{
throw new ArgumentNullException("id");
}
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
await _liveStreamSemaphore.WaitAsync().ConfigureAwait(false);
try
{
LiveStreamInfo current;
if (_openStreams.TryGetValue(id, out current))
{
_openStreams.Remove(id);
current.Closed = true;
if (current.MediaSource.RequiresClosing)
{
var tuple = GetProvider(id);
await CloseLiveStreamWithProvider(tuple.Item1, tuple.Item2, cancellationToken).ConfigureAwait(false);
await CloseLiveStreamWithProvider(tuple.Item1, tuple.Item2).ConfigureAwait(false);
}
}
LiveStreamInfo removed;
if (_openStreams.TryRemove(id, out removed))
{
removed.Closed = true;
}
if (_openStreams.Count == 0)
{
StopCloseTimer();
if (_openStreams.Count == 0)
{
StopCloseTimer();
}
}
}
finally
@@ -565,10 +562,20 @@ namespace MediaBrowser.Server.Implementations.Library
private async void CloseTimerCallback(object state)
{
var infos = _openStreams
.Values
.Where(i => i.EnableCloseTimer && DateTime.UtcNow - i.Date > _openStreamMaxAge)
.ToList();
List<LiveStreamInfo> infos;
await _liveStreamSemaphore.WaitAsync().ConfigureAwait(false);
try
{
infos = _openStreams
.Values
.Where(i => i.EnableCloseTimer && DateTime.UtcNow - i.Date > _openStreamMaxAge)
.ToList();
}
finally
{
_liveStreamSemaphore.Release();
}
foreach (var info in infos)
{
@@ -576,7 +583,7 @@ namespace MediaBrowser.Server.Implementations.Library
{
try
{
await CloseLiveStream(info.Id, CancellationToken.None).ConfigureAwait(false);
await CloseLiveStream(info.Id).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -608,12 +615,10 @@ namespace MediaBrowser.Server.Implementations.Library
{
foreach (var key in _openStreams.Keys.ToList())
{
var task = CloseLiveStream(key, CancellationToken.None);
var task = CloseLiveStream(key);
Task.WaitAll(task);
}
_openStreams.Clear();
}
}
}