update live streams

This commit is contained in:
Luke Pulverenti
2016-10-05 03:15:29 -04:00
parent c4e137e6cf
commit b9cacd8076
35 changed files with 295 additions and 627 deletions

View File

@@ -30,7 +30,7 @@ namespace MediaBrowser.Server.Implementations.Channels
return Task.FromResult<IEnumerable<MediaSourceInfo>>(new List<MediaSourceInfo>());
}
public Task<MediaSourceInfo> OpenMediaSource(string openToken, CancellationToken cancellationToken)
public Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> OpenMediaSource(string openToken, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

View File

@@ -43,6 +43,7 @@ using MediaBrowser.Server.Implementations.Library.Resolvers;
using MoreLinq;
using SortOrder = MediaBrowser.Model.Entities.SortOrder;
using VideoResolver = MediaBrowser.Naming.Video.VideoResolver;
using MediaBrowser.Common.Configuration;
namespace MediaBrowser.Server.Implementations.Library
{
@@ -1900,6 +1901,24 @@ namespace MediaBrowser.Server.Implementations.Library
options.EnableInternetProviders = ConfigurationManager.Configuration.EnableInternetProviders;
}
if (options.SchemaVersion < 2)
{
var chapterOptions = ConfigurationManager.GetConfiguration<ChapterOptions>("chapters");
options.ExtractChapterImagesDuringLibraryScan = chapterOptions.ExtractDuringLibraryScan;
if (collectionFolder != null)
{
if (string.Equals(collectionFolder.CollectionType, "movies", StringComparison.OrdinalIgnoreCase))
{
options.EnableChapterImageExtraction = chapterOptions.EnableMovieChapterImageExtraction;
}
else if (string.Equals(collectionFolder.CollectionType, CollectionType.TvShows, StringComparison.OrdinalIgnoreCase))
{
options.EnableChapterImageExtraction = chapterOptions.EnableEpisodeChapterImageExtraction;
}
}
}
return options;
}

View File

@@ -367,7 +367,9 @@ namespace MediaBrowser.Server.Implementations.Library
var tuple = GetProvider(request.OpenToken);
var provider = tuple.Item1;
var mediaSource = await provider.OpenMediaSource(tuple.Item2, cancellationToken).ConfigureAwait(false);
var mediaSourceTuple = await provider.OpenMediaSource(tuple.Item2, cancellationToken).ConfigureAwait(false);
var mediaSource = mediaSourceTuple.Item1;
if (string.IsNullOrWhiteSpace(mediaSource.LiveStreamId))
{
@@ -381,8 +383,10 @@ namespace MediaBrowser.Server.Implementations.Library
Date = DateTime.UtcNow,
EnableCloseTimer = enableAutoClose,
Id = mediaSource.LiveStreamId,
MediaSource = mediaSource
MediaSource = mediaSource,
DirectStreamProvider = mediaSourceTuple.Item2
};
_openStreams[mediaSource.LiveStreamId] = info;
if (enableAutoClose)
@@ -414,7 +418,7 @@ namespace MediaBrowser.Server.Implementations.Library
}
}
public async Task<MediaSourceInfo> GetLiveStream(string id, CancellationToken cancellationToken)
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(id))
{
@@ -430,7 +434,7 @@ namespace MediaBrowser.Server.Implementations.Library
LiveStreamInfo info;
if (_openStreams.TryGetValue(id, out info))
{
return info.MediaSource;
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info.MediaSource, info.DirectStreamProvider);
}
else
{
@@ -443,6 +447,12 @@ namespace MediaBrowser.Server.Implementations.Library
}
}
public async Task<MediaSourceInfo> GetLiveStream(string id, CancellationToken cancellationToken)
{
var result = await GetLiveStreamWithDirectStreamProvider(id, cancellationToken).ConfigureAwait(false);
return result.Item1;
}
public async Task PingLiveStream(string id, CancellationToken cancellationToken)
{
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -630,6 +640,7 @@ namespace MediaBrowser.Server.Implementations.Library
public string Id;
public bool Closed;
public MediaSourceInfo MediaSource;
public IDirectStreamProvider DirectStreamProvider;
}
}
}

View File

@@ -36,7 +36,7 @@ using Microsoft.Win32;
namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
{
public class EmbyTV : ILiveTvService, ISupportsNewTimerIds, IDisposable
public class EmbyTV : ILiveTvService, ISupportsDirectStreamProvider, ISupportsNewTimerIds, IDisposable
{
private readonly IApplicationHost _appHpst;
private readonly ILogger _logger;
@@ -828,10 +828,17 @@ namespace MediaBrowser.Server.Implementations.LiveTv.EmbyTV
private readonly Dictionary<string, LiveStream> _liveStreams = new Dictionary<string, LiveStream>();
public async Task<MediaSourceInfo> GetChannelStream(string channelId, string streamId, CancellationToken cancellationToken)
{
var result = await GetChannelStreamWithDirectStreamProvider(channelId, streamId, cancellationToken).ConfigureAwait(false);
return result.Item1;
}
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetChannelStreamWithDirectStreamProvider(string channelId, string streamId, CancellationToken cancellationToken)
{
var result = await GetChannelStreamInternal(channelId, streamId, cancellationToken).ConfigureAwait(false);
return result.Item2;
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(result.Item2, result.Item1 as IDirectStreamProvider);
}
private MediaSourceInfo CloneMediaSource(MediaSourceInfo mediaSource, int consumerId, bool enableStreamSharing)

View File

@@ -227,10 +227,12 @@ namespace MediaBrowser.Server.Implementations.LiveTv
public async Task<MediaSourceInfo> GetRecordingStream(string id, CancellationToken cancellationToken)
{
return await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false);
var info = await GetLiveStream(id, null, false, cancellationToken).ConfigureAwait(false);
return info.Item1;
}
public async Task<MediaSourceInfo> GetChannelStream(string id, string mediaSourceId, CancellationToken cancellationToken)
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetChannelStream(string id, string mediaSourceId, CancellationToken cancellationToken)
{
return await GetLiveStream(id, mediaSourceId, true, cancellationToken).ConfigureAwait(false);
}
@@ -280,7 +282,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv
return _services.FirstOrDefault(i => string.Equals(i.Name, name, StringComparison.OrdinalIgnoreCase));
}
private async Task<MediaSourceInfo> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken)
private async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStream(string id, string mediaSourceId, bool isChannel, CancellationToken cancellationToken)
{
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
@@ -294,6 +296,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv
MediaSourceInfo info;
bool isVideo;
ILiveTvService service;
IDirectStreamProvider directStreamProvider = null;
if (isChannel)
{
@@ -301,7 +304,18 @@ namespace MediaBrowser.Server.Implementations.LiveTv
isVideo = channel.ChannelType == ChannelType.TV;
service = GetService(channel);
_logger.Info("Opening channel stream from {0}, external channel Id: {1}", service.Name, channel.ExternalId);
info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
var supportsManagedStream = service as ISupportsDirectStreamProvider;
if (supportsManagedStream != null)
{
var streamInfo = await supportsManagedStream.GetChannelStreamWithDirectStreamProvider(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
info = streamInfo.Item1;
directStreamProvider = streamInfo.Item2;
}
else
{
info = await service.GetChannelStream(channel.ExternalId, mediaSourceId, cancellationToken).ConfigureAwait(false);
}
info.RequiresClosing = true;
if (info.RequiresClosing)
@@ -332,7 +346,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv
_logger.Info("Live stream info: {0}", _jsonSerializer.SerializeToString(info));
Normalize(info, service, isVideo);
return info;
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info, directStreamProvider);
}
catch (Exception ex)
{
@@ -1881,11 +1895,11 @@ namespace MediaBrowser.Server.Implementations.LiveTv
{
if (query.IsScheduled.Value)
{
timers = timers.Where(i => i.Item1.Status == RecordingStatus.New || i.Item1.Status == RecordingStatus.Scheduled);
timers = timers.Where(i => i.Item1.Status == RecordingStatus.New);
}
else
{
timers = timers.Where(i => !(i.Item1.Status == RecordingStatus.New || i.Item1.Status == RecordingStatus.Scheduled));
timers = timers.Where(i => !(i.Item1.Status == RecordingStatus.New));
}
}

View File

@@ -116,17 +116,20 @@ namespace MediaBrowser.Server.Implementations.LiveTv
return list;
}
public async Task<MediaSourceInfo> OpenMediaSource(string openToken, CancellationToken cancellationToken)
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> OpenMediaSource(string openToken, CancellationToken cancellationToken)
{
MediaSourceInfo stream;
MediaSourceInfo stream = null;
const bool isAudio = false;
var keys = openToken.Split(new[] { StreamIdDelimeter }, 3);
var mediaSourceId = keys.Length >= 3 ? keys[2] : null;
IDirectStreamProvider directStreamProvider = null;
if (string.Equals(keys[0], typeof(LiveTvChannel).Name, StringComparison.OrdinalIgnoreCase))
{
stream = await _liveTvManager.GetChannelStream(keys[1], mediaSourceId, cancellationToken).ConfigureAwait(false);
var info = await _liveTvManager.GetChannelStream(keys[1], mediaSourceId, cancellationToken).ConfigureAwait(false);
stream = info.Item1;
directStreamProvider = info.Item2;
}
else
{
@@ -142,7 +145,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv
_logger.ErrorException("Error probing live tv stream", ex);
}
return stream;
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(stream, directStreamProvider);
}
private async Task AddMediaInfo(MediaSourceInfo mediaSource, bool isAudio, CancellationToken cancellationToken)

View File

@@ -6,14 +6,16 @@ using CommonIO;
using MediaBrowser.Common.Net;
using MediaBrowser.Controller;
using MediaBrowser.Controller.LiveTv;
using MediaBrowser.Controller.Library;
using MediaBrowser.Model.Dto;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Server.Implementations.LiveTv.EmbyTV;
using System.Collections.Generic;
namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
public class HdHomerunLiveStream : LiveStream
public class HdHomerunLiveStream : LiveStream, IDirectStreamProvider
{
private readonly ILogger _logger;
private readonly IHttpClient _httpClient;
@@ -106,7 +108,7 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
onStarted = () => ResolveWhenExists(openTaskCompletionSource, tempFilePath, cancellationToken);
}
await DirectRecorder.CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false);
await CopyUntilCancelled(response.Content, outputStream, onStarted, cancellationToken).ConfigureAwait(false);
}
}
}
@@ -137,6 +139,79 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}).ConfigureAwait(false);
}
private List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>> _additionalStreams = new List<Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>>();
public Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
var taskCompletionSource = new TaskCompletionSource<bool>();
_additionalStreams.Add(new Tuple<Stream, CancellationToken, TaskCompletionSource<bool>>(stream, cancellationToken, taskCompletionSource));
return taskCompletionSource.Task;
}
private void PopAdditionalStream(Tuple<Stream, CancellationToken, TaskCompletionSource<bool>> stream, Exception exception)
{
if (_additionalStreams.Remove(stream))
{
stream.Item3.TrySetException(exception);
}
}
private const int BufferSize = 81920;
private async Task CopyUntilCancelled(Stream source, Stream target, Action onStarted, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var bytesRead = await CopyToAsyncInternal(source, target, BufferSize, onStarted, cancellationToken).ConfigureAwait(false);
onStarted = null;
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if (bytesRead == 0)
{
await Task.Delay(100).ConfigureAwait(false);
}
}
}
private async Task<int> CopyToAsyncInternal(Stream source, Stream destination, Int32 bufferSize, Action onStarted, CancellationToken cancellationToken)
{
byte[] buffer = new byte[bufferSize];
int bytesRead;
int totalBytesRead = 0;
while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken).ConfigureAwait(false);
foreach (var additionalStream in _additionalStreams)
{
cancellationToken.ThrowIfCancellationRequested();
try
{
await additionalStream.Item1.WriteAsync(buffer, 0, bytesRead, additionalStream.Item2).ConfigureAwait(false);
}
catch (Exception ex)
{
PopAdditionalStream(additionalStream, ex);
}
}
totalBytesRead += bytesRead;
if (onStarted != null)
{
onStarted();
}
onStarted = null;
}
return totalBytesRead;
}
private async void ResolveWhenExists(TaskCompletionSource<bool> taskCompletionSource, string file, CancellationToken cancellationToken)
{
while (!File.Exists(file) && !cancellationToken.IsCancellationRequested)

View File

@@ -99,7 +99,7 @@ namespace MediaBrowser.Server.Implementations.Sync
// Do not use a pipe here because Roku http requests to the server will fail, without any explicit error message.
private const string StreamIdDelimeterString = "_";
public async Task<MediaSourceInfo> OpenMediaSource(string openToken, CancellationToken cancellationToken)
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> OpenMediaSource(string openToken, CancellationToken cancellationToken)
{
var openKeys = openToken.Split(new[] { StreamIdDelimeterString[0] }, 3);
@@ -137,7 +137,7 @@ namespace MediaBrowser.Server.Implementations.Sync
mediaSource.Protocol = dynamicInfo.Protocol;
mediaSource.RequiredHttpHeaders = dynamicInfo.RequiredHttpHeaders;
return mediaSource;
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(mediaSource, null);
}
private void SetStaticMediaSourceInfo(LocalItem item, MediaSourceInfo mediaSource)