mirror of
https://github.com/jellyfin/jellyfin.git
synced 2026-04-07 19:02:11 +01:00
Merge pull request #6538 from cvium/livetv_oh_no
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
|
||||
using System;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MediaBrowser.Controller.Library;
|
||||
@@ -41,6 +42,11 @@ namespace Emby.Server.Implementations.Library
|
||||
return _closeFn();
|
||||
}
|
||||
|
||||
public Stream GetStream()
|
||||
{
|
||||
throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public Task Open(CancellationToken openCancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
|
||||
@@ -587,13 +587,6 @@ namespace Emby.Server.Implementations.Library
|
||||
mediaSource.InferTotalBitrate();
|
||||
}
|
||||
|
||||
public Task<IDirectStreamProvider> GetDirectStreamProviderByUniqueId(string uniqueId, CancellationToken cancellationToken)
|
||||
{
|
||||
var info = _openStreams.FirstOrDefault(i => i.Value != null && string.Equals(i.Value.UniqueId, uniqueId, StringComparison.OrdinalIgnoreCase));
|
||||
|
||||
return Task.FromResult(info.Value as IDirectStreamProvider);
|
||||
}
|
||||
|
||||
public async Task<LiveStreamResponse> OpenLiveStream(LiveStreamRequest request, CancellationToken cancellationToken)
|
||||
{
|
||||
var result = await OpenLiveStreamInternal(request, cancellationToken).ConfigureAwait(false);
|
||||
@@ -602,7 +595,8 @@ namespace Emby.Server.Implementations.Library
|
||||
|
||||
public async Task<MediaSourceInfo> GetLiveStreamMediaInfo(string id, CancellationToken cancellationToken)
|
||||
{
|
||||
var liveStreamInfo = await GetLiveStreamInfo(id, cancellationToken).ConfigureAwait(false);
|
||||
// TODO probably shouldn't throw here but it is kept for "backwards compatibility"
|
||||
var liveStreamInfo = GetLiveStreamInfo(id) ?? throw new ResourceNotFoundException();
|
||||
|
||||
var mediaSource = liveStreamInfo.MediaSource;
|
||||
|
||||
@@ -771,18 +765,19 @@ namespace Emby.Server.Implementations.Library
|
||||
mediaSource.InferTotalBitrate(true);
|
||||
}
|
||||
|
||||
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken)
|
||||
public Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken)
|
||||
{
|
||||
if (string.IsNullOrEmpty(id))
|
||||
{
|
||||
throw new ArgumentNullException(nameof(id));
|
||||
}
|
||||
|
||||
var info = await GetLiveStreamInfo(id, cancellationToken).ConfigureAwait(false);
|
||||
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info.MediaSource, info as IDirectStreamProvider);
|
||||
// TODO probably shouldn't throw here but it is kept for "backwards compatibility"
|
||||
var info = GetLiveStreamInfo(id) ?? throw new ResourceNotFoundException();
|
||||
return Task.FromResult(new Tuple<MediaSourceInfo, IDirectStreamProvider>(info.MediaSource, info as IDirectStreamProvider));
|
||||
}
|
||||
|
||||
private Task<ILiveStream> GetLiveStreamInfo(string id, CancellationToken cancellationToken)
|
||||
public ILiveStream GetLiveStreamInfo(string id)
|
||||
{
|
||||
if (string.IsNullOrEmpty(id))
|
||||
{
|
||||
@@ -791,12 +786,16 @@ namespace Emby.Server.Implementations.Library
|
||||
|
||||
if (_openStreams.TryGetValue(id, out ILiveStream info))
|
||||
{
|
||||
return Task.FromResult(info);
|
||||
}
|
||||
else
|
||||
{
|
||||
return Task.FromException<ILiveStream>(new ResourceNotFoundException());
|
||||
return info;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public ILiveStream GetLiveStreamInfoByUniqueId(string uniqueId)
|
||||
{
|
||||
return _openStreams.Values.FirstOrDefault(stream => string.Equals(uniqueId, stream?.UniqueId, StringComparison.OrdinalIgnoreCase));
|
||||
}
|
||||
|
||||
public async Task<MediaSourceInfo> GetLiveStream(string id, CancellationToken cancellationToken)
|
||||
|
||||
@@ -5,6 +5,7 @@ using System.IO;
|
||||
using System.Net.Http;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Jellyfin.Api.Helpers;
|
||||
using MediaBrowser.Common.Net;
|
||||
using MediaBrowser.Controller.Library;
|
||||
using MediaBrowser.Model.Dto;
|
||||
@@ -50,16 +51,23 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
|
||||
{
|
||||
onStarted();
|
||||
|
||||
_logger.LogInformation("Copying recording stream to file {0}", targetFile);
|
||||
_logger.LogInformation("Copying recording to file {FilePath}", targetFile);
|
||||
|
||||
// The media source is infinite so we need to handle stopping ourselves
|
||||
using var durationToken = new CancellationTokenSource(duration);
|
||||
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, durationToken.Token);
|
||||
var linkedCancellationToken = cancellationTokenSource.Token;
|
||||
|
||||
await directStreamProvider.CopyToAsync(output, cancellationTokenSource.Token).ConfigureAwait(false);
|
||||
await using var fileStream = new ProgressiveFileStream(directStreamProvider.GetStream());
|
||||
await _streamHelper.CopyToAsync(
|
||||
fileStream,
|
||||
output,
|
||||
IODefaults.CopyToBufferSize,
|
||||
1000,
|
||||
linkedCancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_logger.LogInformation("Recording completed to file {0}", targetFile);
|
||||
_logger.LogInformation("Recording completed: {FilePath}", targetFile);
|
||||
}
|
||||
|
||||
private async Task RecordFromMediaSource(MediaSourceInfo mediaSource, string targetFile, TimeSpan duration, Action onStarted, CancellationToken cancellationToken)
|
||||
|
||||
@@ -156,11 +156,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
|
||||
await taskCompletionSource.Task.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public string GetFilePath()
|
||||
{
|
||||
return TempFilePath;
|
||||
}
|
||||
|
||||
private async Task StartStreaming(UdpClient udpClient, HdHomerunManager hdHomerunManager, IPAddress remoteAddress, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
|
||||
{
|
||||
using (udpClient)
|
||||
@@ -184,7 +179,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
|
||||
EnableStreamSharing = false;
|
||||
}
|
||||
|
||||
await DeleteTempFiles(new List<string> { TempFilePath }).ConfigureAwait(false);
|
||||
await DeleteTempFiles(TempFilePath).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private async Task CopyTo(UdpClient udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
|
||||
|
||||
@@ -3,10 +3,8 @@
|
||||
#pragma warning disable CS1591
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MediaBrowser.Common.Configuration;
|
||||
@@ -97,6 +95,18 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Stream GetStream()
|
||||
{
|
||||
var stream = GetInputStream(TempFilePath, AsyncFile.UseAsyncIO);
|
||||
bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10;
|
||||
if (seekFile)
|
||||
{
|
||||
TrySeek(stream, -20000);
|
||||
}
|
||||
|
||||
return stream;
|
||||
}
|
||||
|
||||
protected FileStream GetInputStream(string path, bool allowAsyncFileRead)
|
||||
=> new FileStream(
|
||||
path,
|
||||
@@ -106,112 +116,29 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
|
||||
IODefaults.FileStreamBufferSize,
|
||||
allowAsyncFileRead ? FileOptions.SequentialScan | FileOptions.Asynchronous : FileOptions.SequentialScan);
|
||||
|
||||
public Task DeleteTempFiles()
|
||||
{
|
||||
return DeleteTempFiles(GetStreamFilePaths());
|
||||
}
|
||||
|
||||
protected async Task DeleteTempFiles(IEnumerable<string> paths, int retryCount = 0)
|
||||
protected async Task DeleteTempFiles(string path, int retryCount = 0)
|
||||
{
|
||||
if (retryCount == 0)
|
||||
{
|
||||
Logger.LogInformation("Deleting temp files {0}", paths);
|
||||
Logger.LogInformation("Deleting temp file {FilePath}", path);
|
||||
}
|
||||
|
||||
var failedFiles = new List<string>();
|
||||
|
||||
foreach (var path in paths)
|
||||
try
|
||||
{
|
||||
if (!File.Exists(path))
|
||||
FileSystem.DeleteFile(path);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "Error deleting file {FilePath}", path);
|
||||
if (retryCount <= 40)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
FileSystem.DeleteFile(path);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "Error deleting file {path}", path);
|
||||
failedFiles.Add(path);
|
||||
await Task.Delay(500).ConfigureAwait(false);
|
||||
await DeleteTempFiles(path, retryCount + 1).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (failedFiles.Count > 0 && retryCount <= 40)
|
||||
{
|
||||
await Task.Delay(500).ConfigureAwait(false);
|
||||
await DeleteTempFiles(failedFiles, retryCount + 1).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
protected virtual List<string> GetStreamFilePaths()
|
||||
{
|
||||
return new List<string> { TempFilePath };
|
||||
}
|
||||
|
||||
public async Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
|
||||
{
|
||||
using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, LiveStreamCancellationTokenSource.Token);
|
||||
cancellationToken = linkedCancellationTokenSource.Token;
|
||||
|
||||
bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10;
|
||||
|
||||
var nextFileInfo = GetNextFile(null);
|
||||
var nextFile = nextFileInfo.file;
|
||||
var isLastFile = nextFileInfo.isLastFile;
|
||||
|
||||
var allowAsync = AsyncFile.UseAsyncIO;
|
||||
while (!string.IsNullOrEmpty(nextFile))
|
||||
{
|
||||
var emptyReadLimit = isLastFile ? EmptyReadLimit : 1;
|
||||
|
||||
await CopyFile(nextFile, seekFile, emptyReadLimit, allowAsync, stream, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
seekFile = false;
|
||||
nextFileInfo = GetNextFile(nextFile);
|
||||
nextFile = nextFileInfo.file;
|
||||
isLastFile = nextFileInfo.isLastFile;
|
||||
}
|
||||
|
||||
Logger.LogInformation("Live Stream ended.");
|
||||
}
|
||||
|
||||
private (string file, bool isLastFile) GetNextFile(string currentFile)
|
||||
{
|
||||
var files = GetStreamFilePaths();
|
||||
|
||||
if (string.IsNullOrEmpty(currentFile))
|
||||
{
|
||||
return (files[^1], true);
|
||||
}
|
||||
|
||||
var nextIndex = files.FindIndex(i => string.Equals(i, currentFile, StringComparison.OrdinalIgnoreCase)) + 1;
|
||||
|
||||
var isLastFile = nextIndex == files.Count - 1;
|
||||
|
||||
return (files.ElementAtOrDefault(nextIndex), isLastFile);
|
||||
}
|
||||
|
||||
private async Task CopyFile(string path, bool seekFile, int emptyReadLimit, bool allowAsync, Stream stream, CancellationToken cancellationToken)
|
||||
{
|
||||
using (var inputStream = GetInputStream(path, allowAsync))
|
||||
{
|
||||
if (seekFile)
|
||||
{
|
||||
TrySeek(inputStream, -20000);
|
||||
}
|
||||
|
||||
await StreamHelper.CopyToAsync(
|
||||
inputStream,
|
||||
stream,
|
||||
IODefaults.CopyToBufferSize,
|
||||
emptyReadLimit,
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private void TrySeek(FileStream stream, long offset)
|
||||
private void TrySeek(Stream stream, long offset)
|
||||
{
|
||||
if (!stream.CanSeek)
|
||||
{
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
#pragma warning disable CS1591
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.IO;
|
||||
using System.Net.Http;
|
||||
@@ -55,39 +54,26 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
|
||||
Directory.CreateDirectory(Path.GetDirectoryName(TempFilePath));
|
||||
|
||||
var typeName = GetType().Name;
|
||||
Logger.LogInformation("Opening " + typeName + " Live stream from {0}", url);
|
||||
Logger.LogInformation("Opening {StreamType} Live stream from {Url}", typeName, url);
|
||||
|
||||
// Response stream is disposed manually.
|
||||
var response = await _httpClientFactory.CreateClient(NamedClient.Default)
|
||||
.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, CancellationToken.None)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
var extension = "ts";
|
||||
var requiresRemux = false;
|
||||
|
||||
var contentType = response.Content.Headers.ContentType?.ToString() ?? string.Empty;
|
||||
if (contentType.IndexOf("matroska", StringComparison.OrdinalIgnoreCase) != -1)
|
||||
if (contentType.Contains("matroska", StringComparison.OrdinalIgnoreCase)
|
||||
|| contentType.Contains("mp4", StringComparison.OrdinalIgnoreCase)
|
||||
|| contentType.Contains("dash", StringComparison.OrdinalIgnoreCase)
|
||||
|| contentType.Contains("mpegURL", StringComparison.OrdinalIgnoreCase)
|
||||
|| contentType.Contains("text/", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
requiresRemux = true;
|
||||
}
|
||||
else if (contentType.IndexOf("mp4", StringComparison.OrdinalIgnoreCase) != -1 ||
|
||||
contentType.IndexOf("dash", StringComparison.OrdinalIgnoreCase) != -1 ||
|
||||
contentType.IndexOf("mpegURL", StringComparison.OrdinalIgnoreCase) != -1 ||
|
||||
contentType.IndexOf("text/", StringComparison.OrdinalIgnoreCase) != -1)
|
||||
{
|
||||
requiresRemux = true;
|
||||
// Close the stream without any sharing features
|
||||
response.Dispose();
|
||||
return;
|
||||
}
|
||||
|
||||
// Close the stream without any sharing features
|
||||
if (requiresRemux)
|
||||
{
|
||||
using (response)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
SetTempFilePath(extension);
|
||||
SetTempFilePath("ts");
|
||||
|
||||
var taskCompletionSource = new TaskCompletionSource<bool>();
|
||||
|
||||
@@ -117,16 +103,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
|
||||
|
||||
if (!taskCompletionSource.Task.Result)
|
||||
{
|
||||
Logger.LogWarning("Zero bytes copied from stream {0} to {1} but no exception raised", GetType().Name, TempFilePath);
|
||||
Logger.LogWarning("Zero bytes copied from stream {StreamType} to {FilePath} but no exception raised", GetType().Name, TempFilePath);
|
||||
throw new EndOfStreamException(string.Format(CultureInfo.InvariantCulture, "Zero bytes copied from stream {0}", GetType().Name));
|
||||
}
|
||||
}
|
||||
|
||||
public string GetFilePath()
|
||||
{
|
||||
return TempFilePath;
|
||||
}
|
||||
|
||||
private Task StartStreaming(HttpResponseMessage response, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.Run(
|
||||
@@ -134,7 +115,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
|
||||
{
|
||||
try
|
||||
{
|
||||
Logger.LogInformation("Beginning {0} stream to {1}", GetType().Name, TempFilePath);
|
||||
Logger.LogInformation("Beginning {StreamType} stream to {FilePath}", GetType().Name, TempFilePath);
|
||||
using var message = response;
|
||||
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
|
||||
await using var fileStream = new FileStream(TempFilePath, FileMode.Create, FileAccess.Write, FileShare.Read, IODefaults.FileStreamBufferSize, AsyncFile.UseAsyncIO);
|
||||
@@ -147,19 +128,19 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
|
||||
}
|
||||
catch (OperationCanceledException ex)
|
||||
{
|
||||
Logger.LogInformation("Copying of {0} to {1} was canceled", GetType().Name, TempFilePath);
|
||||
Logger.LogInformation("Copying of {StreamType} to {FilePath} was canceled", GetType().Name, TempFilePath);
|
||||
openTaskCompletionSource.TrySetException(ex);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "Error copying live stream {0} to {1}.", GetType().Name, TempFilePath);
|
||||
Logger.LogError(ex, "Error copying live stream {StreamType} to {FilePath}", GetType().Name, TempFilePath);
|
||||
openTaskCompletionSource.TrySetException(ex);
|
||||
}
|
||||
|
||||
openTaskCompletionSource.TrySetResult(false);
|
||||
|
||||
EnableStreamSharing = false;
|
||||
await DeleteTempFiles(new List<string> { TempFilePath }).ConfigureAwait(false);
|
||||
await DeleteTempFiles(TempFilePath).ConfigureAwait(false);
|
||||
},
|
||||
CancellationToken.None);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user