mirror of
https://github.com/jellyfin/jellyfin.git
synced 2026-06-02 13:58:29 +01:00
Use ReadAtLeastAsync to handle short-reads.
Seeks to beginning of streams if CanSeek is true. Added remarks about stream position. Add test coverage for short-reads. Fix fast-path tests to actually test the fast path. Also fix class comment.
This commit is contained in:
@@ -11,7 +11,7 @@ using System.Threading.Tasks;
|
||||
namespace Jellyfin.Extensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Class BaseExtensions.
|
||||
/// Extension methods for the <see cref="Stream"/> class.
|
||||
/// </summary>
|
||||
public static class StreamExtensions
|
||||
{
|
||||
@@ -74,7 +74,11 @@ namespace Jellyfin.Extensions
|
||||
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
|
||||
/// <returns>True if the stream and file are identical; otherwise false.</returns>
|
||||
/// <exception cref="ArgumentException"><paramref name="stream"/> does not support seeking.</exception>
|
||||
public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken)
|
||||
/// <remarks>
|
||||
/// The entire stream is compared against the file from the beginning (the position is reset to 0 on entry)
|
||||
/// and restored to its original value after the call.
|
||||
/// </remarks>
|
||||
public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(stream);
|
||||
ArgumentException.ThrowIfNullOrEmpty(path);
|
||||
@@ -114,11 +118,31 @@ namespace Jellyfin.Extensions
|
||||
/// <param name="b">The second stream to compare.</param>
|
||||
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
|
||||
/// <returns>True if the streams are identical; otherwise false.</returns>
|
||||
public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken)
|
||||
/// <remarks>
|
||||
/// Seekable streams are compared from the beginning (their position is reset to 0 on entry).
|
||||
/// Non-seekable streams are compared from their current read position. Stream positions are not
|
||||
/// restored after the call.
|
||||
/// </remarks>
|
||||
public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(a);
|
||||
ArgumentNullException.ThrowIfNull(b);
|
||||
|
||||
if (ReferenceEquals(a, b))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (a.CanSeek)
|
||||
{
|
||||
a.Position = 0;
|
||||
}
|
||||
|
||||
if (b.CanSeek)
|
||||
{
|
||||
b.Position = 0;
|
||||
}
|
||||
|
||||
if (a.CanSeek && b.CanSeek && b.Length != a.Length)
|
||||
{
|
||||
return false;
|
||||
@@ -145,9 +169,9 @@ namespace Jellyfin.Extensions
|
||||
var memoryB = bufferB.AsMemory();
|
||||
int offset = 0;
|
||||
int bytesRead;
|
||||
while ((bytesRead = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false)) > 0)
|
||||
while ((bytesRead = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)) > 0)
|
||||
{
|
||||
if (!segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead]))
|
||||
if (offset + bytesRead > segmentA.Count || !segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead]))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
@@ -174,8 +198,8 @@ namespace Jellyfin.Extensions
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
var bytesReadA = await a.ReadAsync(memoryA, cancellationToken).ConfigureAwait(false);
|
||||
var bytesReadB = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false);
|
||||
var bytesReadA = await a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
|
||||
var bytesReadB = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (bytesReadA != bytesReadB)
|
||||
{
|
||||
|
||||
@@ -2,7 +2,6 @@ using System;
|
||||
using System.IO;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Jellyfin.Extensions;
|
||||
using Xunit;
|
||||
|
||||
namespace Jellyfin.Extensions.Tests;
|
||||
@@ -65,8 +64,12 @@ public class StreamExtensionsTests
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch()
|
||||
// Both publiclyVisible values are exercised so the test runs once under the fast path
|
||||
// (TryGetBuffer succeeds) and once under the slow path (TryGetBuffer returns false).
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch(bool publiclyVisible)
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
|
||||
@@ -75,7 +78,7 @@ public class StreamExtensionsTests
|
||||
|
||||
try
|
||||
{
|
||||
await using var stream = new MemoryStream(bytes);
|
||||
await using var stream = CreateMemoryStream(bytes, publiclyVisible);
|
||||
stream.Position = 3;
|
||||
|
||||
var result = await stream.IsFileIdenticalAsync(path, cancellationToken);
|
||||
@@ -89,8 +92,10 @@ public class StreamExtensionsTests
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch()
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch(bool publiclyVisible)
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
|
||||
@@ -98,7 +103,7 @@ public class StreamExtensionsTests
|
||||
|
||||
try
|
||||
{
|
||||
await using var stream = new MemoryStream(new byte[] { 10, 20, 30, 40, 50 });
|
||||
await using var stream = CreateMemoryStream(new byte[] { 10, 20, 30, 40, 50 }, publiclyVisible);
|
||||
stream.Position = 2;
|
||||
|
||||
var result = await stream.IsFileIdenticalAsync(path, cancellationToken);
|
||||
@@ -112,6 +117,96 @@ public class StreamExtensionsTests
|
||||
}
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
public async Task IsStreamIdenticalAsync_BothMemoryStreams_NonZeroPositions_SeeksToStart(bool publiclyVisible)
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible);
|
||||
await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible);
|
||||
a.Position = 3;
|
||||
b.Position = 1;
|
||||
|
||||
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
|
||||
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
public async Task IsStreamIdenticalAsync_MemoryStreamPairedWithSeekableNonMemoryStream_NonZeroPositions_SeeksToStart(bool publiclyVisible)
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible);
|
||||
await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
|
||||
a.Position = 2;
|
||||
b.Position = 3;
|
||||
|
||||
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
|
||||
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(true)]
|
||||
[InlineData(false)]
|
||||
public async Task IsStreamIdenticalAsync_NonMemoryStreamPairedWithMemoryStream_Swaps_ReturnsTrue(bool publiclyVisible)
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
|
||||
await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible);
|
||||
|
||||
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
|
||||
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsStreamIdenticalAsync_BothSeekableNonMemoryStreams_NonZeroPositions_SeeksToStart()
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
|
||||
await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
|
||||
a.Position = 1;
|
||||
b.Position = 2;
|
||||
|
||||
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
|
||||
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsStreamIdenticalAsync_NonSeekableShortReads_Identical_ReturnsTrue()
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
|
||||
await using var a = new ShortReadingNonSeekableStream(data, maxReadSize: 3);
|
||||
await using var b = new ShortReadingNonSeekableStream(data, maxReadSize: 5);
|
||||
|
||||
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
|
||||
|
||||
Assert.True(result);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task IsStreamIdenticalAsync_NonSeekableShortReads_DifferentLengths_ReturnsFalse()
|
||||
{
|
||||
var cancellationToken = TestContext.Current.CancellationToken;
|
||||
await using var a = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4 }, maxReadSize: 3);
|
||||
await using var b = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4, 5 }, maxReadSize: 5);
|
||||
|
||||
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
|
||||
|
||||
Assert.False(result);
|
||||
}
|
||||
|
||||
private static MemoryStream CreateMemoryStream(byte[] data, bool publiclyVisible)
|
||||
=> publiclyVisible
|
||||
? new MemoryStream(data, 0, data.Length, writable: false, publiclyVisible: true)
|
||||
: new MemoryStream(data);
|
||||
|
||||
private sealed class NonSeekableReadStream : Stream
|
||||
{
|
||||
private readonly Stream _inner;
|
||||
@@ -173,4 +268,130 @@ public class StreamExtensionsTests
|
||||
await base.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class SeekableNonMemoryStream : Stream
|
||||
{
|
||||
private readonly MemoryStream _inner;
|
||||
|
||||
public SeekableNonMemoryStream(byte[] data)
|
||||
{
|
||||
_inner = new MemoryStream(data, writable: false);
|
||||
}
|
||||
|
||||
public override bool CanRead => true;
|
||||
|
||||
public override bool CanSeek => true;
|
||||
|
||||
public override bool CanWrite => false;
|
||||
|
||||
public override long Length => _inner.Length;
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get => _inner.Position;
|
||||
set => _inner.Position = value;
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
=> _inner.Read(buffer, offset, count);
|
||||
|
||||
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
=> _inner.ReadAsync(buffer, cancellationToken);
|
||||
|
||||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
=> _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
=> _inner.Seek(offset, origin);
|
||||
|
||||
public override void SetLength(long value)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
_inner.Dispose();
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
public override async ValueTask DisposeAsync()
|
||||
{
|
||||
await _inner.DisposeAsync();
|
||||
await base.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class ShortReadingNonSeekableStream : Stream
|
||||
{
|
||||
private readonly Stream _inner;
|
||||
private readonly int _maxReadSize;
|
||||
|
||||
public ShortReadingNonSeekableStream(byte[] data, int maxReadSize)
|
||||
{
|
||||
_inner = new MemoryStream(data, writable: false);
|
||||
_maxReadSize = maxReadSize;
|
||||
}
|
||||
|
||||
public override bool CanRead => true;
|
||||
|
||||
public override bool CanSeek => false;
|
||||
|
||||
public override bool CanWrite => false;
|
||||
|
||||
public override long Length => throw new NotSupportedException();
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get => throw new NotSupportedException();
|
||||
set => throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void Flush()
|
||||
{
|
||||
}
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
=> _inner.Read(buffer, offset, Math.Min(count, _maxReadSize));
|
||||
|
||||
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
=> _inner.ReadAsync(buffer[..Math.Min(buffer.Length, _maxReadSize)], cancellationToken);
|
||||
|
||||
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
=> _inner.ReadAsync(buffer.AsMemory(offset, Math.Min(count, _maxReadSize)), cancellationToken).AsTask();
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public override void SetLength(long value)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
=> throw new NotSupportedException();
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
_inner.Dispose();
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
public override async ValueTask DisposeAsync()
|
||||
{
|
||||
await _inner.DisposeAsync();
|
||||
await base.DisposeAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user