update live stream management

This commit is contained in:
Luke Pulverenti
2016-10-07 11:08:13 -04:00
parent d22b7817a4
commit 50e6686987
53 changed files with 487 additions and 1078 deletions

View File

@@ -377,10 +377,10 @@ namespace MediaBrowser.Server.Implementations.EntryPoints.Notifications
DisposeLibraryUpdateTimer();
}
if (items.Count == 1)
{
var item = items.First();
items = items.Take(10).ToList();
foreach (var item in items)
{
var notification = new NotificationRequest
{
NotificationType = NotificationType.NewLibraryContent.ToString()
@@ -388,17 +388,6 @@ namespace MediaBrowser.Server.Implementations.EntryPoints.Notifications
notification.Variables["Name"] = GetItemName(item);
await SendNotification(notification).ConfigureAwait(false);
}
else
{
var notification = new NotificationRequest
{
NotificationType = NotificationType.NewLibraryContentMultiple.ToString()
};
notification.Variables["ItemCount"] = items.Count.ToString(CultureInfo.InvariantCulture);
await SendNotification(notification).ConfigureAwait(false);
}
}

View File

@@ -94,12 +94,12 @@ namespace MediaBrowser.Server.Implementations.HttpServer
// The Markdown feature causes slow startup times (5 mins+) on cold boots for some users
// Custom format allows images
HostConfig.Instance.EnableFeatures = Feature.Csv | Feature.Html | Feature.Json | Feature.Jsv | Feature.Metadata | Feature.Xml | Feature.CustomFormat;
HostConfig.Instance.EnableFeatures = Feature.Html | Feature.Json | Feature.CustomFormat;
container.Adapter = _containerAdapter;
Plugins.RemoveAll(x => x is NativeTypesFeature);
Plugins.Add(new SwaggerFeature());
//Plugins.Add(new SwaggerFeature());
Plugins.Add(new CorsFeature(allowedHeaders: "Content-Type, Authorization, Range, X-MediaBrowser-Token, X-Emby-Authorization"));
//Plugins.Add(new AuthFeature(() => new AuthUserSession(), new IAuthProvider[] {
@@ -546,8 +546,10 @@ namespace MediaBrowser.Server.Implementations.HttpServer
}
}
}
throw new NotImplementedException("Cannot execute handler: " + handler + " at PathInfo: " + httpReq.PathInfo);
else
{
httpRes.Close();
}
}
/// <summary>

View File

@@ -683,29 +683,6 @@ namespace MediaBrowser.Server.Implementations.HttpServer
}
}
/// <summary>
/// Gets the error result.
/// </summary>
/// <param name="statusCode">The status code.</param>
/// <param name="errorMessage">The error message.</param>
/// <param name="responseHeaders">The response headers.</param>
/// <returns>System.Object.</returns>
public void ThrowError(int statusCode, string errorMessage, IDictionary<string, string> responseHeaders = null)
{
var error = new HttpError
{
Status = statusCode,
ErrorCode = errorMessage
};
if (responseHeaders != null)
{
AddResponseHeaders(error, responseHeaders);
}
throw error;
}
public object GetAsyncStreamWriter(IAsyncStreamSource streamSource)
{
return new AsyncStreamWriter(streamSource);

View File

@@ -1,240 +0,0 @@
using MediaBrowser.Common.Events;
using MediaBrowser.Controller.Net;
using MediaBrowser.Model.Logging;
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WebSocketMessageType = MediaBrowser.Model.Net.WebSocketMessageType;
using WebSocketState = MediaBrowser.Model.Net.WebSocketState;
namespace MediaBrowser.Server.Implementations.HttpServer
{
/// <summary>
/// Class NativeWebSocket
/// </summary>
public class NativeWebSocket : IWebSocket
{
/// <summary>
/// The logger
/// </summary>
private readonly ILogger _logger;
public event EventHandler<EventArgs> Closed;
/// <summary>
/// Gets or sets the web socket.
/// </summary>
/// <value>The web socket.</value>
private System.Net.WebSockets.WebSocket WebSocket { get; set; }
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
/// <summary>
/// Initializes a new instance of the <see cref="NativeWebSocket" /> class.
/// </summary>
/// <param name="socket">The socket.</param>
/// <param name="logger">The logger.</param>
/// <exception cref="System.ArgumentNullException">socket</exception>
public NativeWebSocket(WebSocket socket, ILogger logger)
{
if (socket == null)
{
throw new ArgumentNullException("socket");
}
if (logger == null)
{
throw new ArgumentNullException("logger");
}
_logger = logger;
WebSocket = socket;
Receive();
}
/// <summary>
/// Gets or sets the state.
/// </summary>
/// <value>The state.</value>
public WebSocketState State
{
get
{
WebSocketState commonState;
if (!Enum.TryParse(WebSocket.State.ToString(), true, out commonState))
{
_logger.Warn("Unrecognized WebSocketState: {0}", WebSocket.State.ToString());
}
return commonState;
}
}
/// <summary>
/// Receives this instance.
/// </summary>
private async void Receive()
{
while (true)
{
byte[] bytes;
try
{
bytes = await ReceiveBytesAsync(_cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
break;
}
catch (WebSocketException ex)
{
_logger.ErrorException("Error receiving web socket message", ex);
break;
}
if (bytes == null)
{
// Connection closed
EventHelper.FireEventIfNotNull(Closed, this, EventArgs.Empty, _logger);
break;
}
if (OnReceiveBytes != null)
{
OnReceiveBytes(bytes);
}
}
}
/// <summary>
/// Receives the async.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task{WebSocketMessageInfo}.</returns>
/// <exception cref="System.Net.WebSockets.WebSocketException">Connection closed</exception>
private async Task<byte[]> ReceiveBytesAsync(CancellationToken cancellationToken)
{
var bytes = new byte[4096];
var buffer = new ArraySegment<byte>(bytes);
var result = await WebSocket.ReceiveAsync(buffer, cancellationToken).ConfigureAwait(false);
if (result.CloseStatus.HasValue)
{
_logger.Info("Web socket connection closed by client. Reason: {0}", result.CloseStatus.Value);
return null;
}
return buffer.Array;
}
/// <summary>
/// Sends the async.
/// </summary>
/// <param name="bytes">The bytes.</param>
/// <param name="type">The type.</param>
/// <param name="endOfMessage">if set to <c>true</c> [end of message].</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns>
public Task SendAsync(byte[] bytes, WebSocketMessageType type, bool endOfMessage, CancellationToken cancellationToken)
{
System.Net.WebSockets.WebSocketMessageType nativeType;
if (!Enum.TryParse(type.ToString(), true, out nativeType))
{
_logger.Warn("Unrecognized WebSocketMessageType: {0}", type.ToString());
}
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
return WebSocket.SendAsync(new ArraySegment<byte>(bytes), nativeType, true, linkedTokenSource.Token);
}
public Task SendAsync(byte[] bytes, bool endOfMessage, CancellationToken cancellationToken)
{
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
return WebSocket.SendAsync(new ArraySegment<byte>(bytes), System.Net.WebSockets.WebSocketMessageType.Binary, true, linkedTokenSource.Token);
}
public Task SendAsync(string text, bool endOfMessage, CancellationToken cancellationToken)
{
var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
var bytes = Encoding.UTF8.GetBytes(text);
return WebSocket.SendAsync(new ArraySegment<byte>(bytes), System.Net.WebSockets.WebSocketMessageType.Text, true, linkedTokenSource.Token);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
}
/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose(bool dispose)
{
if (dispose)
{
_cancellationTokenSource.Cancel();
WebSocket.Dispose();
}
}
/// <summary>
/// Gets or sets the receive action.
/// </summary>
/// <value>The receive action.</value>
public Action<byte[]> OnReceiveBytes { get; set; }
/// <summary>
/// Gets or sets the on receive.
/// </summary>
/// <value>The on receive.</value>
public Action<string> OnReceive { get; set; }
/// <summary>
/// The _supports native web socket
/// </summary>
private static bool? _supportsNativeWebSocket;
/// <summary>
/// Gets a value indicating whether [supports web sockets].
/// </summary>
/// <value><c>true</c> if [supports web sockets]; otherwise, <c>false</c>.</value>
public static bool IsSupported
{
get
{
if (!_supportsNativeWebSocket.HasValue)
{
try
{
new ClientWebSocket();
_supportsNativeWebSocket = true;
}
catch (PlatformNotSupportedException)
{
_supportsNativeWebSocket = false;
}
}
return _supportsNativeWebSocket.Value;
}
}
}
}

View File

@@ -191,15 +191,6 @@ namespace MediaBrowser.Server.Implementations.HttpServer
}
}
}
catch (IOException)
{
throw;
}
catch (Exception ex)
{
_logger.ErrorException("Error in range request writer", ex);
throw;
}
finally
{
if (OnComplete != null)
@@ -251,15 +242,6 @@ namespace MediaBrowser.Server.Implementations.HttpServer
}
}
}
catch (IOException ex)
{
throw;
}
catch (Exception ex)
{
_logger.ErrorException("Error in range request writer", ex);
throw;
}
finally
{
if (OnComplete != null)

View File

@@ -81,20 +81,12 @@ namespace MediaBrowser.Server.Implementations.HttpServer.SocketSharp
public void Write(string text)
{
try
{
var bOutput = System.Text.Encoding.UTF8.GetBytes(text);
response.ContentLength64 = bOutput.Length;
var bOutput = System.Text.Encoding.UTF8.GetBytes(text);
response.ContentLength64 = bOutput.Length;
var outputStream = response.OutputStream;
outputStream.Write(bOutput, 0, bOutput.Length);
Close();
}
catch (Exception ex)
{
_logger.ErrorException("Could not WriteTextToResponse: " + ex.Message, ex);
throw;
}
var outputStream = response.OutputStream;
outputStream.Write(bOutput, 0, bOutput.Length);
Close();
}
public void Close()

View File

@@ -2463,7 +2463,7 @@ namespace MediaBrowser.Server.Implementations.Library
public IEnumerable<Video> FindTrailers(BaseItem owner, List<FileSystemMetadata> fileSystemChildren, IDirectoryService directoryService)
{
var files = owner.IsInMixedFolder ? new List<FileSystemMetadata>() : fileSystemChildren.Where(i => i.IsDirectory)
var files = owner.DetectIsInMixedFolder() ? new List<FileSystemMetadata>() : fileSystemChildren.Where(i => i.IsDirectory)
.Where(i => string.Equals(i.Name, BaseItem.TrailerFolderName, StringComparison.OrdinalIgnoreCase))
.SelectMany(i => _fileSystem.GetFiles(i.FullName, false))
.ToList();

View File

@@ -7,6 +7,7 @@ using System;
using System.IO;
using System.Linq;
using CommonIO;
using MediaBrowser.Controller.Configuration;
namespace MediaBrowser.Server.Implementations.Library.Resolvers.Audio
{
@@ -18,12 +19,14 @@ namespace MediaBrowser.Server.Implementations.Library.Resolvers.Audio
private readonly ILogger _logger;
private readonly IFileSystem _fileSystem;
private readonly ILibraryManager _libraryManager;
private readonly IServerConfigurationManager _config;
public MusicArtistResolver(ILogger logger, IFileSystem fileSystem, ILibraryManager libraryManager)
public MusicArtistResolver(ILogger logger, IFileSystem fileSystem, ILibraryManager libraryManager, IServerConfigurationManager config)
{
_logger = logger;
_fileSystem = fileSystem;
_libraryManager = libraryManager;
_config = config;
}
/// <summary>
@@ -67,6 +70,19 @@ namespace MediaBrowser.Server.Implementations.Library.Resolvers.Audio
return null;
}
if (args.IsDirectory)
{
if (args.ContainsFileSystemEntryByName("artist.nfo"))
{
return new MusicArtist();
}
}
if (_config.Configuration.EnableSimpleArtistDetection)
{
return null;
}
var directoryService = args.DirectoryService;
var albumResolver = new MusicAlbumResolver(_logger, _fileSystem, _libraryManager);

View File

@@ -48,12 +48,6 @@ namespace MediaBrowser.Server.Implementations.Library.Resolvers.Movies
string collectionType,
IDirectoryService directoryService)
{
if (parent != null && parent.Path != null && parent.Path.IndexOf("disney", StringComparison.OrdinalIgnoreCase) != -1)
{
var b = true;
var a = b;
}
var result = ResolveMultipleInternal(parent, files, collectionType, directoryService);
if (result != null)
@@ -213,26 +207,22 @@ namespace MediaBrowser.Server.Implementations.Library.Resolvers.Movies
// Find movies with their own folders
if (args.IsDirectory)
{
var files = args.FileSystemChildren
.Where(i => !LibraryManager.IgnoreFile(i, args.Parent))
.ToList();
if (string.Equals(collectionType, CollectionType.MusicVideos, StringComparison.OrdinalIgnoreCase))
{
return FindMovie<MusicVideo>(args.Path, args.Parent, files, args.DirectoryService, collectionType);
return null;
}
if (string.Equals(collectionType, CollectionType.HomeVideos, StringComparison.OrdinalIgnoreCase))
{
return FindMovie<Video>(args.Path, args.Parent, files, args.DirectoryService, collectionType);
return null;
}
if (string.IsNullOrEmpty(collectionType))
{
// Owned items should just use the plain video type
// Owned items will be caught by the plain video resolver
if (args.Parent == null)
{
return FindMovie<Video>(args.Path, args.Parent, files, args.DirectoryService, collectionType);
return null;
}
if (args.HasParent<Series>())
@@ -240,11 +230,21 @@ namespace MediaBrowser.Server.Implementations.Library.Resolvers.Movies
return null;
}
return FindMovie<Movie>(args.Path, args.Parent, files, args.DirectoryService, collectionType);
{
var files = args.FileSystemChildren
.Where(i => !LibraryManager.IgnoreFile(i, args.Parent))
.ToList();
return FindMovie<Movie>(args.Path, args.Parent, files, args.DirectoryService, collectionType);
}
}
if (string.Equals(collectionType, CollectionType.Movies, StringComparison.OrdinalIgnoreCase))
{
var files = args.FileSystemChildren
.Where(i => !LibraryManager.IgnoreFile(i, args.Parent))
.ToList();
return FindMovie<Movie>(args.Path, args.Parent, files, args.DirectoryService, collectionType);
}

View File

@@ -61,16 +61,7 @@ namespace MediaBrowser.Server.Implementations.Library
foreach (var key in keys)
{
try
{
await Repository.SaveUserData(userId, key, userData, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.ErrorException("Error saving user data", ex);
throw;
}
await Repository.SaveUserData(userId, key, userData, cancellationToken).ConfigureAwait(false);
}
var cacheKey = GetCacheKey(userId, item.Id);
@@ -107,18 +98,7 @@ namespace MediaBrowser.Server.Implementations.Library
cancellationToken.ThrowIfCancellationRequested();
try
{
await Repository.SaveAllUserData(userId, userData, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.ErrorException("Error saving user data", ex);
throw;
}
await Repository.SaveAllUserData(userId, userData, cancellationToken).ConfigureAwait(false);
}
/// <summary>

View File

@@ -841,23 +841,39 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(result.Item2, result.Item1 as IDirectStreamProvider);
}
private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, int consumerId, bool enableStreamSharing)
private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, bool enableStreamSharing)
{
var json = _jsonSerializer.SerializeToString(mediaSource);
mediaSource = _jsonSerializer.DeserializeFromString<MediaSourceInfo>(json);
mediaSource.Id = Guid.NewGuid().ToString("N") + "_" + mediaSource.Id;
if (mediaSource.DateLiveStreamOpened.HasValue && enableStreamSharing)
{
var ticks = (DateTime.UtcNow - mediaSource.DateLiveStreamOpened.Value).Ticks - TimeSpan.FromSeconds(10).Ticks;
ticks = Math.Max(0, ticks);
mediaSource.Path += "?t=" + ticks.ToString(CultureInfo.InvariantCulture) + "&s=" + mediaSource.DateLiveStreamOpened.Value.Ticks.ToString(CultureInfo.InvariantCulture);
}
//if (mediaSource.DateLiveStreamOpened.HasValue && enableStreamSharing)
//{
// var ticks = (DateTime.UtcNow - mediaSource.DateLiveStreamOpened.Value).Ticks - TimeSpan.FromSeconds(10).Ticks;
// ticks = Math.Max(0, ticks);
// mediaSource.Path += "?t=" + ticks.ToString(CultureInfo.InvariantCulture) + "&s=" + mediaSource.DateLiveStreamOpened.Value.Ticks.ToString(CultureInfo.InvariantCulture);
//}
return mediaSource;
}
public async Task<LiveStream> GetLiveStream(string uniqueId)
{
await _liveStreamsSemaphore.WaitAsync().ConfigureAwait(false);
try
{
return _liveStreams.Values
.FirstOrDefault(i => string.Equals(i.UniqueId, uniqueId, StringComparison.OrdinalIgnoreCase));
}
finally
{
_liveStreamsSemaphore.Release();
}
}
private async Task<Tuple<LiveStream, MediaSourceInfo, ITunerHost>> GetChannelStreamInternal(string channelId, string streamId, CancellationToken cancellationToken)
{
_logger.Info("Streaming Channel " + channelId);
@@ -872,7 +888,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
_logger.Info("Live stream {0} consumer count is now {1}", streamId, result.ConsumerCount);
var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.ConsumerCount - 1, result.EnableStreamSharing);
var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.EnableStreamSharing);
_liveStreamsSemaphore.Release();
return new Tuple<LiveStream, MediaSourceInfo, ITunerHost>(result, openedMediaSource, result.TunerHost);
}
@@ -885,7 +901,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
{
result = await hostInstance.GetChannelStream(channelId, streamId, cancellationToken).ConfigureAwait(false);
var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, 0, result.EnableStreamSharing);
var openedMediaSource = CloneMediaSource(result.OpenedMediaSource, result.EnableStreamSharing);
_liveStreams[openedMediaSource.Id] = result;
@@ -1542,6 +1558,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
if (timer.IsKids)
{
AddGenre(timer.Genres, "Kids");
AddGenre(timer.Genres, "Children");
}
if (timer.IsNews)
{

View File

@@ -223,8 +223,6 @@ namespace MediaBrowser.Server.Implementations.LiveTv
return result.Items.FirstOrDefault();
}
private readonly SemaphoreSlim _liveStreamSemaphore = new SemaphoreSlim(1, 1);
public async Task<MediaSourceInfo> GetRecordingStream(string id, CancellationToken cancellationToken)
{
var info = await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false);
@@ -284,80 +282,65 @@ namespace MediaBrowser.Server.Implementations.LiveTv
private async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken)
{
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
if (string.Equals(id, mediaSourceId, StringComparison.OrdinalIgnoreCase))
{
mediaSourceId = null;
}
try
MediaSourceInfo info;
bool isVideo;
ILiveTvService service;
IDirectStreamProvider directStreamProvider = null;
if (isChannel)
{
MediaSourceInfo info;
bool isVideo;
ILiveTvService service;
IDirectStreamProvider directStreamProvider = null;
var channel = GetInternalChannel(id);
isVideo = channel.ChannelType == ChannelType.TV;
service = GetService(channel);
_logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
if (isChannel)
var supportsManagedStream = service as ISupportsDirectStreamProvider;
if (supportsManagedStream != null)
{
var channel = GetInternalChannel(id);
isVideo = channel.ChannelType == ChannelType.TV;
service = GetService(channel);
_logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
var supportsManagedStream = service as ISupportsDirectStreamProvider;
if (supportsManagedStream != null)
{
var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
info = streamInfo.Item1;
directStreamProvider = streamInfo.Item2;
}
else
{
info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
}
info.RequiresClosing = true;
if (info.RequiresClosing)
{
var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
info.LiveStreamId = idPrefix + info.Id;
}
var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
info = streamInfo.Item1;
directStreamProvider = streamInfo.Item2;
}
else
{
var recording = await GetInternalRecording(id, cancellationToken).ConfigureAwait(false);
isVideo = !string.Equals(recording.MediaType, MediaType.Audio, StringComparison.OrdinalIgnoreCase);
service = GetService(recording);
_logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.ExternalId);
info = await service.GetRecordingStream(recording.ExternalId, null, cancellationToken).ConfigureAwait(false);
info.RequiresClosing = true;
if (info.RequiresClosing)
{
var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
info.LiveStreamId = idPrefix + info.Id;
}
info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
}
info.RequiresClosing = true;
_logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info));
Normalize(info, service, isVideo);
if (info.RequiresClosing)
{
var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider);
info.LiveStreamId = idPrefix + info.Id;
}
}
catch (Exception ex)
else
{
_logger.ErrorException("Error getting channel stream", ex);
var recording = await GetInternalRecording(id, cancellationToken).ConfigureAwait(false);
isVideo = !string.Equals(recording.MediaType, MediaType.Audio, StringComparison.OrdinalIgnoreCase);
service = GetService(recording);
throw;
}
finally
{
_liveStreamSemaphore.Release();
_logger.Info("Opening recording stream from {0}, external recording Id: {1}", service.Name, recording.ExternalId);
info = await service.GetRecordingStream(recording.ExternalId, null, cancellationToken).ConfigureAwait(false);
info.RequiresClosing = true;
if (info.RequiresClosing)
{
var idPrefix = service.GetType().FullName.GetMD5().ToString("N") + "_";
info.LiveStreamId = idPrefix + info.Id;
}
}
_logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info));
Normalize(info, service, isVideo);
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider);
}
private void Normalize(MediaSourceInfo mediaSource, ILiveTvService service, bool isVideo)
@@ -2560,35 +2543,20 @@ namespace MediaBrowser.Server.Implementations.LiveTv
public async Task CloseLiveStream(string id)
{
await _liveStreamSemaphore.WaitAsync().ConfigureAwait(false);
var parts = id.Split(new[] { '_' }, 2);
try
var service = _services.FirstOrDefault(i => string.Equals(i.GetType().FullName.GetMD5().ToString("N"), parts[0], StringComparison.OrdinalIgnoreCase));
if (service == null)
{
var parts = id.Split(new[] { '_' }, 2);
var service = _services.FirstOrDefault(i => string.Equals(i.GetType().FullName.GetMD5().ToString("N"), parts[0], StringComparison.OrdinalIgnoreCase));
if (service == null)
{
throw new ArgumentException("Service not found.");
}
id = parts[1];
_logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id);
await service.CloseLiveStream(id, CancellationToken.None).ConfigureAwait(false);
throw new ArgumentException("Service not found.");
}
catch (Exception ex)
{
_logger.ErrorException("Error closing live stream", ex);
throw;
}
finally
{
_liveStreamSemaphore.Release();
}
id = parts[1];
_logger.Info("Closing live stream from {0}, stream Id: {1}", service.Name, id);
await service.CloseLiveStream(id, CancellationToken.None).ConfigureAwait(false);
}
public GuideInfo GetGuideInfo()

View File

@@ -9,6 +9,7 @@ using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Model.Serialization;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
@@ -139,7 +140,14 @@ namespace MediaBrowser.Server.Implementations.LiveTv
try
{
await AddMediaInfo(stream, isAudio, cancellationToken).ConfigureAwait(false);
if (stream.MediaStreams.Any(i => i.Index != -1))
{
await AddMediaInfo(stream, isAudio, cancellationToken).ConfigureAwait(false);
}
else
{
await AddMediaInfoWithProbe(stream, isAudio, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
@@ -208,10 +216,12 @@ namespace MediaBrowser.Server.Implementations.LiveTv
}
}
private async Task AddMediaInfoInternal(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken)
private async Task AddMediaInfoWithProbe(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken)
{
var originalRuntime = mediaSource.RunTimeTicks;
var now = DateTime.UtcNow;
var info = await _mediaEncoder.GetMediaInfo(new MediaInfoRequest
{
InputPath = mediaSource.Path,
@@ -221,6 +231,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv
}, cancellationToken).ConfigureAwait(false);
_logger.Info("Live tv media info probe took {0} seconds", (DateTime.UtcNow - now).TotalSeconds.ToString(CultureInfo.InvariantCulture));
mediaSource.Bitrate = info.Bitrate;
mediaSource.Container = info.Container;
mediaSource.Formats = info.Formats;
@@ -272,6 +284,9 @@ namespace MediaBrowser.Server.Implementations.LiveTv
videoStream.BitRate = 1000000;
}
}
// This is coming up false and preventing stream copy
videoStream.IsAVC = null;
}
// Try to estimate this

View File

@@ -233,25 +233,6 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts
protected abstract Task<bool> IsAvailableInternal(TunerHostInfo tuner, string channelId, CancellationToken cancellationToken);
private async Task AddMediaInfo(LiveStream stream, bool isAudio, CancellationToken cancellationToken)
{
//await resourcePool.WaitAsync(cancellationToken).ConfigureAwait(false);
//try
//{
// await AddMediaInfoInternal(mediaSource, isAudio, cancellationToken).ConfigureAwait(false);
// // Leave the resource locked. it will be released upstream
//}
//catch (Exception)
//{
// // Release the resource if there's some kind of failure.
// resourcePool.Release();
// throw;
//}
}
protected abstract bool IsValidChannelId(string channelId);
protected LiveTvOptions GetConfiguration()

View File

@@ -105,7 +105,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
});
}
private Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>();
private readonly Dictionary<string, DiscoverResponse> _modelCache = new Dictionary<string, DiscoverResponse>();
private async Task<string> GetModelInfo(TunerHostInfo info, CancellationToken cancellationToken)
{
lock (_modelCache)
@@ -387,6 +387,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
id += "_" + url.GetMD5().ToString("N");
var enableLocalBuffer = EnableLocalBuffer();
var mediaSource = new MediaSourceInfo
{
Path = url,
@@ -420,8 +422,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
BufferMs = 0,
Container = "ts",
Id = id,
SupportsDirectPlay = false,
SupportsDirectStream = true,
SupportsDirectPlay = !enableLocalBuffer,
SupportsDirectStream = enableLocalBuffer,
SupportsTranscoding = true,
IsInfiniteStream = true
};
@@ -488,6 +490,11 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return channelId.StartsWith(ChannelIdPrefix, StringComparison.OrdinalIgnoreCase);
}
private bool EnableLocalBuffer()
{
return true;
}
protected override async Task<LiveStream> GetChannelStream(TunerHostInfo info, string channelId, string streamId, CancellationToken cancellationToken)
{
var profile = streamId.Split('_')[0];
@@ -502,25 +509,34 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var mediaSource = await GetMediaSource(info, hdhrId, profile).ConfigureAwait(false);
var liveStream = new HdHomerunLiveStream(mediaSource, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost);
if (info.AllowHWTranscoding)
if (EnableLocalBuffer())
{
var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false);
if ((model ?? string.Empty).IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1)
var liveStream = new HdHomerunLiveStream(mediaSource, streamId, _fileSystem, _httpClient, Logger, Config.ApplicationPaths, _appHost);
if (info.AllowHWTranscoding)
{
liveStream.EnableStreamSharing = !info.AllowHWTranscoding;
var model = await GetModelInfo(info, cancellationToken).ConfigureAwait(false);
if ((model ?? string.Empty).IndexOf("hdtc", StringComparison.OrdinalIgnoreCase) != -1)
{
liveStream.EnableStreamSharing = !info.AllowHWTranscoding;
}
else
{
liveStream.EnableStreamSharing = true;
}
}
else
{
liveStream.EnableStreamSharing = true;
}
return liveStream;
}
else
{
liveStream.EnableStreamSharing = true;
var liveStream = new LiveStream(mediaSource);
liveStream.EnableStreamSharing = false;
return liveStream;
}
return liveStream;
}
public async Task Validate(TunerHostInfo info)

View File

@@ -13,6 +13,7 @@ using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Server.Implementations.LiveTv.EmbyTV;
using System.Collections.Generic;
using System.Linq;
using MediaBrowser.Common.Extensions;
namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
@@ -26,8 +27,10 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource();
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
private readonly MulticastStream _multicastStream;
public HdHomerunLiveStream(MediaSourceInfo mediaSource, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost)
public HdHomerunLiveStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost)
: base(mediaSource)
{
_fileSystem = fileSystem;
@@ -35,6 +38,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_logger = logger;
_appPaths = appPaths;
_appHost = appHost;
OriginalStreamId = originalStreamId;
_multicastStream = new MulticastStream(_logger);
}
protected override async Task OpenInternal(CancellationToken openCancellationToken)
@@ -44,22 +49,18 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var mediaSource = OriginalMediaSource;
var url = mediaSource.Path;
var tempFile = Path.Combine(_appPaths.TranscodingTempPath, Guid.NewGuid().ToString("N") + ".ts");
Directory.CreateDirectory(Path.GetDirectoryName(tempFile));
_logger.Info("Opening HDHR Live stream from {0} to {1}", url, tempFile);
var output = _fileSystem.GetFileStream(tempFile, FileMode.Create, FileAccess.Write, FileShare.Read);
_logger.Info("Opening HDHR Live stream from {0}", url);
var taskCompletionSource = new TaskCompletionSource<bool>();
StartStreamingToTempFile(output, tempFile, url, taskCompletionSource, _liveStreamCancellationTokenSource.Token);
StartStreaming(url, taskCompletionSource, _liveStreamCancellationTokenSource.Token);
//OpenedMediaSource.Protocol = MediaProtocol.File;
//OpenedMediaSource.Path = tempFile;
//OpenedMediaSource.ReadAtNativeFramerate = true;
OpenedMediaSource.Path = _appHost.GetLocalApiUrl("localhost") + "/LiveTv/LiveStreamFiles/" + Path.GetFileNameWithoutExtension(tempFile) + "/stream.ts";
OpenedMediaSource.Path = _appHost.GetLocalApiUrl("localhost") + "/LiveTv/LiveStreamFiles/" + UniqueId + "/stream.ts";
OpenedMediaSource.Protocol = MediaProtocol.Http;
OpenedMediaSource.SupportsDirectPlay = false;
OpenedMediaSource.SupportsDirectStream = true;
@@ -78,178 +79,67 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return _liveStreamTaskCompletionSource.Task;
}
private async Task StartStreamingToTempFile(Stream outputStream, string tempFilePath, string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
private async Task StartStreaming(string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{
await Task.Run(async () =>
{
using (outputStream)
var isFirstAttempt = true;
while (!cancellationToken.IsCancellationRequested)
{
var isFirstAttempt = true;
while (!cancellationToken.IsCancellationRequested)
try
{
try
using (var response = await _httpClient.SendAsync(new HttpRequestOptions
{
using (var response = await _httpClient.SendAsync(new HttpRequestOptions
{
Url = url,
CancellationToken = cancellationToken,
BufferContent = false
Url = url,
CancellationToken = cancellationToken,
BufferContent = false
}, "GET").ConfigureAwait(false))
{
_logger.Info("Opened HDHR stream from {0}", url);
}, "GET").ConfigureAwait(false))
{
_logger.Info("Opened HDHR stream from {0}", url);
if (!cancellationToken.IsCancellationRequested)
if (!cancellationToken.IsCancellationRequested)
{
_logger.Info("Beginning multicastStream.CopyUntilCancelled");
Action onStarted = null;
if (isFirstAttempt)
{
_logger.Info("Beginning DirectRecorder.CopyUntilCancelled");
Action onStarted = null;
if (isFirstAttempt)
{
onStarted = () => ResolveWhenExists(openTaskCompletionSource, tempFilePath, cancellationToken);
}
await CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false);
onStarted = () => openTaskCompletionSource.TrySetResult(true);
}
await _multicastStream.CopyUntilCancelled(response.Content, onStarted, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
if (isFirstAttempt)
{
_logger.ErrorException("Error opening live stream:", ex);
openTaskCompletionSource.TrySetException(ex);
break;
}
catch (Exception ex)
{
if (isFirstAttempt)
{
_logger.ErrorException("Error opening live stream:", ex);
openTaskCompletionSource.TrySetException(ex);
break;
}
_logger.ErrorException("Error copying live stream, will reopen", ex);
}
isFirstAttempt = false;
_logger.ErrorException("Error copying live stream, will reopen", ex);
}
isFirstAttempt = false;
}
_liveStreamTaskCompletionSource.TrySetResult(true);
DeleteTempFile(tempFilePath);
}).ConfigureAwait(false);
}
private readonly List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>> _additionalStreams = new List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>>();
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
var taskCompletionSource = new TaskCompletionSource<bool>();
_additionalStreams.Add(new Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>(stream, cancellationToken, taskCompletionSource));
return taskCompletionSource.Task;
}
private void PopAdditionalStream(Tuple<Stream, CancellationToken, TaskCompletionSource<bool>> stream, Exception exception)
{
if (_additionalStreams.Remove(stream))
{
stream.Item3.TrySetException(exception);
}
}
private const int BufferSize = 81920;
private async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false);
onStarted = null;
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
await Task.Delay(100).ConfigureAwait(false);
}
}
}
private async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken)
{
byte[] buffer = new byte[bufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
var additionalStreams = _additionalStreams.ToList();
foreach (var additionalStream in additionalStreams)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
await additionalStream.Item1.WriteAsync(buffer, 0, bytesRead, additionalStream.Item2).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.ErrorException("Error writing HDHR data to stream", ex);
PopAdditionalStream(additionalStream, ex);
}
}
totalBytesRead += bytesRead;
if (onStarted != null)
{
onStarted();
}
onStarted = null;
}
return totalBytesRead;
}
private async void ResolveWhenExists(TaskCompletionSource<bool> taskCompletionSource, string file, CancellationToken cancellationToken)
{
while (!File.Exists(file) && !cancellationToken.IsCancellationRequested)
{
await Task.Delay(50).ConfigureAwait(false);
}
taskCompletionSource.TrySetResult(true);
}
private async void DeleteTempFile(string path)
{
for (var i = 0; i < 10; i++)
{
try
{
File.Delete(path);
return;
}
catch (FileNotFoundException)
{
return;
}
catch (DirectoryNotFoundException)
{
return;
}
catch (Exception ex)
{
_logger.ErrorException("Error deleting temp file {0}", ex, path);
}
await Task.Delay(1000).ConfigureAwait(false);
}
return _multicastStream.CopyToAsync(stream);
}
}
}

View File

@@ -0,0 +1,96 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Logging;
namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts
{
public class MulticastStream
{
private readonly List<QueueStream> _outputStreams = new List<QueueStream>();
private const int BufferSize = 81920;
private CancellationToken _cancellationToken;
private readonly ILogger _logger;
public MulticastStream(ILogger logger)
{
_logger = logger;
}
public async Task CopyUntilCancelled(Stream source, Action onStarted, CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
while (!cancellationToken.IsCancellationRequested)
{
byte[] buffer = new byte[BufferSize];
var bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
if (bytesRead > 0)
{
byte[] copy = new byte[bytesRead];
Buffer.BlockCopy(buffer, 0, copy, 0, bytesRead);
List<QueueStream> streams = null;
lock (_outputStreams)
{
streams = _outputStreams.ToList();
}
foreach (var stream in streams)
{
stream.Queue(copy);
}
if (onStarted != null)
{
var onStartedCopy = onStarted;
onStarted = null;
Task.Run(onStartedCopy);
}
}
else
{
await Task.Delay(100).ConfigureAwait(false);
}
}
}
public Task CopyToAsync(Stream stream)
{
var result = new QueueStream(stream, _logger)
{
OnFinished = OnFinished
};
lock (_outputStreams)
{
_outputStreams.Add(result);
}
result.Start(_cancellationToken);
return result.TaskCompletion.Task;
}
public void RemoveOutputStream(QueueStream stream)
{
lock (_outputStreams)
{
_outputStreams.Remove(stream);
}
}
private void OnFinished(QueueStream queueStream)
{
RemoveOutputStream(queueStream);
}
}
}

View File

@@ -0,0 +1,93 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.Logging;
namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts
{
public class QueueStream
{
private readonly Stream _outputStream;
private readonly ConcurrentQueue<byte[]> _queue = new ConcurrentQueue<byte[]>();
private CancellationToken _cancellationToken;
public TaskCompletionSource<bool> TaskCompletion { get; private set; }
public Action<QueueStream> OnFinished { get; set; }
private readonly ILogger _logger;
public QueueStream(Stream outputStream, ILogger logger)
{
_outputStream = outputStream;
_logger = logger;
TaskCompletion = new TaskCompletionSource<bool>();
}
public void Queue(byte[] bytes)
{
_queue.Enqueue(bytes);
}
public void Start(CancellationToken cancellationToken)
{
_cancellationToken = cancellationToken;
Task.Run(StartInternal);
}
private byte[] Dequeue()
{
byte[] bytes;
if (_queue.TryDequeue(out bytes))
{
return bytes;
}
return null;
}
private async Task StartInternal()
{
var cancellationToken = _cancellationToken;
try
{
while (!cancellationToken.IsCancellationRequested)
{
var bytes = Dequeue();
if (bytes != null)
{
await _outputStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
}
else
{
await Task.Delay(50, cancellationToken).ConfigureAwait(false);
}
}
TaskCompletion.TrySetResult(true);
_logger.Debug("QueueStream complete");
}
catch (OperationCanceledException)
{
_logger.Debug("QueueStream cancelled");
TaskCompletion.TrySetCanceled();
}
catch (Exception ex)
{
_logger.ErrorException("Error in QueueStream", ex);
TaskCompletion.TrySetException(ex);
}
finally
{
if (OnFinished != null)
{
OnFinished(this);
}
}
}
}
}

View File

@@ -47,7 +47,6 @@
"NotificationOptionTaskFailed": "Scheduled task failure",
"NotificationOptionInstallationFailed": "Installation failure",
"NotificationOptionNewLibraryContent": "New content added",
"NotificationOptionNewLibraryContentMultiple": "New content added (multiple)",
"NotificationOptionCameraImageUploaded": "Camera image uploaded",
"NotificationOptionUserLockedOut": "User locked out",
"NotificationOptionServerRestartRequired": "Server restart required",

View File

@@ -161,7 +161,6 @@
<Compile Include="HttpServer\HttpListenerHost.cs" />
<Compile Include="HttpServer\HttpResultFactory.cs" />
<Compile Include="HttpServer\LoggerUtils.cs" />
<Compile Include="HttpServer\NativeWebSocket.cs" />
<Compile Include="HttpServer\RangeRequestWriter.cs" />
<Compile Include="HttpServer\ResponseFilter.cs" />
<Compile Include="HttpServer\Security\AuthService.cs" />
@@ -247,6 +246,8 @@
<Compile Include="LiveTv\ProgramImageProvider.cs" />
<Compile Include="LiveTv\RecordingImageProvider.cs" />
<Compile Include="LiveTv\RefreshChannelsScheduledTask.cs" />
<Compile Include="LiveTv\TunerHosts\MulticastStream.cs" />
<Compile Include="LiveTv\TunerHosts\QueueStream.cs" />
<Compile Include="LiveTv\TunerHosts\SatIp\ChannelScan.cs" />
<Compile Include="LiveTv\TunerHosts\SatIp\Rtcp\ReportBlock.cs" />
<Compile Include="LiveTv\TunerHosts\SatIp\Rtcp\RtcpAppPacket.cs" />

View File

@@ -89,13 +89,6 @@ namespace MediaBrowser.Server.Implementations.Notifications
Variables = new List<string>{"Name"}
},
new NotificationTypeInfo
{
Type = NotificationType.NewLibraryContentMultiple.ToString(),
DefaultTitle = "{ItemCount} new items have been added to your media library.",
Variables = new List<string>{"ItemCount"}
},
new NotificationTypeInfo
{
Type = NotificationType.AudioPlayback.ToString(),

View File

@@ -128,7 +128,7 @@ namespace MediaBrowser.Server.Implementations.Persistence
var cacheSize = _config.Configuration.SqliteCacheSize;
if (cacheSize <= 0)
{
cacheSize = Math.Min(Environment.ProcessorCount * 50000, 200000);
cacheSize = Math.Min(Environment.ProcessorCount * 50000, 100000);
}
var connection = await DbConnector.Connect(DbFilePath, false, false, 0 - cacheSize).ConfigureAwait(false);
@@ -2656,6 +2656,11 @@ namespace MediaBrowser.Server.Implementations.Persistence
}
}
if (query.SimilarTo != null)
{
whereClauses.Add("SimilarityScore > 0");
}
if (query.IsFolder.HasValue)
{
whereClauses.Add("IsFolder=@IsFolder");

View File

@@ -20,7 +20,7 @@ namespace MediaBrowser.Server.Implementations.ServerManager
public class WebSocketConnection : IWebSocketConnection
{
public event EventHandler<EventArgs> Closed;
/// <summary>
/// The _socket
/// </summary>
@@ -36,11 +36,6 @@ namespace MediaBrowser.Server.Implementations.ServerManager
/// </summary>
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
/// <summary>
/// The _send semaphore
/// </summary>
private readonly SemaphoreSlim _sendSemaphore = new SemaphoreSlim(1, 1);
/// <summary>
/// The logger
/// </summary>
@@ -210,7 +205,7 @@ namespace MediaBrowser.Server.Implementations.ServerManager
_logger.ErrorException("Error processing web socket message", ex);
}
}
/// <summary>
/// Sends a message asynchronously.
/// </summary>
@@ -237,7 +232,7 @@ namespace MediaBrowser.Server.Implementations.ServerManager
/// <param name="buffer">The buffer.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns>
public async Task SendAsync(byte[] buffer, CancellationToken cancellationToken)
public Task SendAsync(byte[] buffer, CancellationToken cancellationToken)
{
if (buffer == null)
{
@@ -246,33 +241,10 @@ namespace MediaBrowser.Server.Implementations.ServerManager
cancellationToken.ThrowIfCancellationRequested();
// Per msdn docs, attempting to send simultaneous messages will result in one failing.
// This should help us workaround that and ensure all messages get sent
await _sendSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _socket.SendAsync(buffer, true, cancellationToken);
}
catch (OperationCanceledException)
{
_logger.Info("WebSocket message to {0} was cancelled", RemoteEndPoint);
throw;
}
catch (Exception ex)
{
_logger.ErrorException("Error sending WebSocket message {0}", ex, RemoteEndPoint);
throw;
}
finally
{
_sendSemaphore.Release();
}
return _socket.SendAsync(buffer, true, cancellationToken);
}
public async Task SendAsync(string text, CancellationToken cancellationToken)
public Task SendAsync(string text, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(text))
{
@@ -281,30 +253,7 @@ namespace MediaBrowser.Server.Implementations.ServerManager
cancellationToken.ThrowIfCancellationRequested();
// Per msdn docs, attempting to send simultaneous messages will result in one failing.
// This should help us workaround that and ensure all messages get sent
await _sendSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _socket.SendAsync(text, true, cancellationToken);
}
catch (OperationCanceledException)
{
_logger.Info("WebSocket message to {0} was cancelled", RemoteEndPoint);
throw;
}
catch (Exception ex)
{
_logger.ErrorException("Error sending WebSocket message {0}", ex, RemoteEndPoint);
throw;
}
finally
{
_sendSemaphore.Release();
}
return _socket.SendAsync(text, true, cancellationToken);
}
/// <summary>