From 961c6d3d547a1e3c6352129b45fffd1158571d15 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Fri, 22 May 2026 16:53:58 -0500 Subject: [PATCH 1/8] Compare old file byte-by-byte to new stream Don't overwrite if identical. --- .../Savers/BaseNfoSaver.cs | 76 ++++++++++++++++++- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs b/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs index ed32e6c76a..64daac68e3 100644 --- a/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs +++ b/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs @@ -198,15 +198,23 @@ namespace MediaBrowser.XbmcMetadata.Savers cancellationToken.ThrowIfCancellationRequested(); - await SaveToFileAsync(memoryStream, path).ConfigureAwait(false); + await SaveToFileAsync(memoryStream, path, cancellationToken).ConfigureAwait(false); } } - private async Task SaveToFileAsync(Stream stream, string path) + private async Task SaveToFileAsync(Stream stream, string path, CancellationToken cancellationToken) { var directory = Path.GetDirectoryName(path) ?? throw new ArgumentException($"Provided path ({path}) is not valid.", nameof(path)); Directory.CreateDirectory(directory); + // Compare byte-for-byte before proceeding. + if (File.Exists(path) && await IsFileIdenticalAsync(stream, path, cancellationToken).ConfigureAwait(false)) + { + return; // Don't save since .nfo is unchanged. + } + + stream.Position = 0; + // On Windows, saving the file will fail if the file is hidden or readonly FileSystem.SetAttributes(path, false, false); @@ -222,7 +230,7 @@ namespace MediaBrowser.XbmcMetadata.Savers var filestream = new FileStream(path, fileStreamOptions); await using (filestream.ConfigureAwait(false)) { - await stream.CopyToAsync(filestream).ConfigureAwait(false); + await stream.CopyToAsync(filestream, cancellationToken).ConfigureAwait(false); } if (ConfigurationManager.Configuration.SaveMetadataHidden) @@ -231,6 +239,68 @@ namespace MediaBrowser.XbmcMetadata.Savers } } + private static async Task IsFileIdenticalAsync(Stream stream, string path, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(stream); + ArgumentException.ThrowIfNullOrEmpty(path); + + if (!stream.CanSeek) + { + return false; + } + + const int BufferSize = 81920; + var originalPosition = stream.Position; + + try + { + stream.Position = 0; + + using var existingFileStream = new FileStream( + path, + FileMode.Open, + FileAccess.Read, + FileShare.Read, + bufferSize: BufferSize, + FileOptions.Asynchronous); + + if (existingFileStream.Length != stream.Length) + { + return false; + } + + var streamBuffer = new byte[BufferSize]; + var existingBuffer = new byte[BufferSize]; + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + var streamBytesRead = await stream.ReadAsync(streamBuffer.AsMemory(), cancellationToken).ConfigureAwait(false); + var existingBytesRead = await existingFileStream.ReadAsync(existingBuffer.AsMemory(), cancellationToken).ConfigureAwait(false); + + if (streamBytesRead != existingBytesRead) + { + return false; + } + + if (streamBytesRead == 0) + { + return true; + } + + if (!streamBuffer.AsSpan(0, streamBytesRead).SequenceEqual(existingBuffer.AsSpan(0, existingBytesRead))) + { + return false; + } + } + } + finally + { + stream.Position = originalPosition; + } + } + private void SetHidden(string path, bool hidden) { try From 02ca63cd13779dbff9971e10a7afd62d2634337b Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Tue, 26 May 2026 22:37:17 +0000 Subject: [PATCH 2/8] Moved IsFileIdenticalAsync & IsStreamIdenticalAsync to StreamExtensions. --- .../Savers/BaseNfoSaver.cs | 64 +------------ src/Jellyfin.Extensions/StreamExtensions.cs | 96 +++++++++++++++++++ 2 files changed, 97 insertions(+), 63 deletions(-) diff --git a/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs b/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs index 64daac68e3..78907a5e68 100644 --- a/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs +++ b/MediaBrowser.XbmcMetadata/Savers/BaseNfoSaver.cs @@ -208,7 +208,7 @@ namespace MediaBrowser.XbmcMetadata.Savers Directory.CreateDirectory(directory); // Compare byte-for-byte before proceeding. - if (File.Exists(path) && await IsFileIdenticalAsync(stream, path, cancellationToken).ConfigureAwait(false)) + if (File.Exists(path) && await stream.IsFileIdenticalAsync(path, cancellationToken).ConfigureAwait(false)) { return; // Don't save since .nfo is unchanged. } @@ -239,68 +239,6 @@ namespace MediaBrowser.XbmcMetadata.Savers } } - private static async Task IsFileIdenticalAsync(Stream stream, string path, CancellationToken cancellationToken) - { - ArgumentNullException.ThrowIfNull(stream); - ArgumentException.ThrowIfNullOrEmpty(path); - - if (!stream.CanSeek) - { - return false; - } - - const int BufferSize = 81920; - var originalPosition = stream.Position; - - try - { - stream.Position = 0; - - using var existingFileStream = new FileStream( - path, - FileMode.Open, - FileAccess.Read, - FileShare.Read, - bufferSize: BufferSize, - FileOptions.Asynchronous); - - if (existingFileStream.Length != stream.Length) - { - return false; - } - - var streamBuffer = new byte[BufferSize]; - var existingBuffer = new byte[BufferSize]; - - while (true) - { - cancellationToken.ThrowIfCancellationRequested(); - - var streamBytesRead = await stream.ReadAsync(streamBuffer.AsMemory(), cancellationToken).ConfigureAwait(false); - var existingBytesRead = await existingFileStream.ReadAsync(existingBuffer.AsMemory(), cancellationToken).ConfigureAwait(false); - - if (streamBytesRead != existingBytesRead) - { - return false; - } - - if (streamBytesRead == 0) - { - return true; - } - - if (!streamBuffer.AsSpan(0, streamBytesRead).SequenceEqual(existingBuffer.AsSpan(0, existingBytesRead))) - { - return false; - } - } - } - finally - { - stream.Position = originalPosition; - } - } - private void SetHidden(string path, bool hidden) { try diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index 0cfac384e3..fa019b0059 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -1,9 +1,12 @@ +using System; +using System.Buffers; using System.Collections.Generic; using System.IO; using System.Linq; using System.Runtime.CompilerServices; using System.Text; using System.Threading; +using System.Threading.Tasks; namespace Jellyfin.Extensions { @@ -12,6 +15,8 @@ namespace Jellyfin.Extensions /// public static class StreamExtensions { + private const int StreamComparisonBufferSize = 65536; + /// /// Reads all lines in the . /// @@ -60,5 +65,96 @@ namespace Jellyfin.Extensions yield return line; } } + + /// + /// Determines whether a stream is identical to a file on disk. + /// + /// The stream to compare. + /// The file path to compare against. + /// The token to monitor for cancellation requests. + /// True if the stream and file are identical; otherwise false. + public static async Task IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(stream); + ArgumentException.ThrowIfNullOrEmpty(path); + + if (!stream.CanSeek) + { + return false; + } + + var originalPosition = stream.Position; + try + { + stream.Position = 0; + + var existingFileStream = new FileStream( + path, + FileMode.Open, + FileAccess.Read, + FileShare.Read, + bufferSize: StreamComparisonBufferSize, + FileOptions.Asynchronous | FileOptions.SequentialScan); + await using (existingFileStream.ConfigureAwait(false)) + { + return await stream.IsStreamIdenticalAsync(existingFileStream, cancellationToken).ConfigureAwait(false); + } + } + finally + { + stream.Position = originalPosition; + } + } + + /// + /// Determines whether two streams are identical. + /// + /// The first stream to compare. + /// The second stream to compare. + /// The token to monitor for cancellation requests. + /// True if the streams are identical; otherwise false. + public static async Task IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(a); + ArgumentNullException.ThrowIfNull(b); + + if (b.Length != a.Length) + { + return false; + } + + var bufferA = ArrayPool.Shared.Rent(StreamComparisonBufferSize); + var bufferB = ArrayPool.Shared.Rent(StreamComparisonBufferSize); + try + { + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + var bytesReadA = await a.ReadAsync(bufferA.AsMemory(), cancellationToken).ConfigureAwait(false); + var bytesReadB = await b.ReadAsync(bufferB.AsMemory(), cancellationToken).ConfigureAwait(false); + + if (bytesReadA != bytesReadB) + { + return false; + } + + if (bytesReadA == 0) + { + return true; + } + + if (!bufferA.AsSpan(0, bytesReadA).SequenceEqual(bufferB.AsSpan(0, bytesReadB))) + { + return false; + } + } + } + finally + { + ArrayPool.Shared.Return(bufferA); + ArrayPool.Shared.Return(bufferB); + } + } } } From c449a933722980625640e56bfe5dbd746214b5a8 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Tue, 26 May 2026 23:11:01 +0000 Subject: [PATCH 3/8] Explicitly handle MemoryStream(s) --- src/Jellyfin.Extensions/StreamExtensions.cs | 91 ++++++++++++++++----- 1 file changed, 69 insertions(+), 22 deletions(-) diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index fa019b0059..ed3f6e665d 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -123,37 +123,84 @@ namespace Jellyfin.Extensions return false; } - var bufferA = ArrayPool.Shared.Rent(StreamComparisonBufferSize); - var bufferB = ArrayPool.Shared.Rent(StreamComparisonBufferSize); - try + // If b is MemoryStream but a is not, swap them to use fast path B + if (b is MemoryStream && a is not MemoryStream) { - while (true) + (a, b) = (b, a); + } + + if (a is MemoryStream ms_a) + { + var bufferA = ms_a.GetBuffer(); + + // Fast path A: if both streams are MemoryStreams, compare directly against each other + if (b is MemoryStream ms_b) { - cancellationToken.ThrowIfCancellationRequested(); + return bufferA.AsSpan(0, (int)ms_a.Length).SequenceEqual(ms_b.GetBuffer().AsSpan(0, (int)ms_b.Length)); + } - var bytesReadA = await a.ReadAsync(bufferA.AsMemory(), cancellationToken).ConfigureAwait(false); - var bytesReadB = await b.ReadAsync(bufferB.AsMemory(), cancellationToken).ConfigureAwait(false); - - if (bytesReadA != bytesReadB) + // Fast path B: only first stream is a MemoryStream, compare against second stream chunk-by-chunk + var bufferB = ArrayPool.Shared.Rent(StreamComparisonBufferSize); + try + { + var memoryB = bufferB.AsMemory(); + int offset = 0; + int bytesRead; + while ((bytesRead = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false)) > 0) { - return false; + cancellationToken.ThrowIfCancellationRequested(); + + if (!bufferA.AsSpan(offset, bytesRead).SequenceEqual(bufferB.AsSpan(0, bytesRead))) + { + return false; + } + + offset += bytesRead; } - if (bytesReadA == 0) - { - return true; - } - - if (!bufferA.AsSpan(0, bytesReadA).SequenceEqual(bufferB.AsSpan(0, bytesReadB))) - { - return false; - } + return offset == ms_a.Length; + } + finally + { + ArrayPool.Shared.Return(bufferB); } } - finally + else { - ArrayPool.Shared.Return(bufferA); - ArrayPool.Shared.Return(bufferB); + var bufferA = ArrayPool.Shared.Rent(StreamComparisonBufferSize); + var bufferB = ArrayPool.Shared.Rent(StreamComparisonBufferSize); + try + { + var memoryA = bufferA.AsMemory(); + var memoryB = bufferB.AsMemory(); + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + + var bytesReadA = await a.ReadAsync(memoryA, cancellationToken).ConfigureAwait(false); + var bytesReadB = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false); + + if (bytesReadA != bytesReadB) + { + return false; + } + + if (bytesReadA == 0) + { + return true; + } + + if (!bufferA.AsSpan(0, bytesReadA).SequenceEqual(bufferB.AsSpan(0, bytesReadB))) + { + return false; + } + } + } + finally + { + ArrayPool.Shared.Return(bufferA); + ArrayPool.Shared.Return(bufferB); + } } } } From aa2370e0212333d93ee250e9f2236f9d5bcb3d93 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Wed, 27 May 2026 19:53:31 -0500 Subject: [PATCH 4/8] Use TryGetBuffer() on MemoryStreams Also now throws if the streams are no CanSeek. --- src/Jellyfin.Extensions/StreamExtensions.cs | 32 +++++++++++++-------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index ed3f6e665d..56a66b885a 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -73,6 +73,7 @@ namespace Jellyfin.Extensions /// The file path to compare against. /// The token to monitor for cancellation requests. /// True if the stream and file are identical; otherwise false. + /// does not support seeking. public static async Task IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(stream); @@ -80,7 +81,7 @@ namespace Jellyfin.Extensions if (!stream.CanSeek) { - return false; + throw new ArgumentException("Stream must support seeking.", nameof(stream)); } var originalPosition = stream.Position; @@ -113,30 +114,39 @@ namespace Jellyfin.Extensions /// The second stream to compare. /// The token to monitor for cancellation requests. /// True if the streams are identical; otherwise false. + /// or does not support seeking. public static async Task IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(a); ArgumentNullException.ThrowIfNull(b); + if (!a.CanSeek) + { + throw new ArgumentException("Stream must support seeking.", nameof(a)); + } + + if (!b.CanSeek) + { + throw new ArgumentException("Stream must support seeking.", nameof(b)); + } + if (b.Length != a.Length) { return false; } - // If b is MemoryStream but a is not, swap them to use fast path B + // If b is MemoryStream but a is not, swap them to enable fast path B if (b is MemoryStream && a is not MemoryStream) { (a, b) = (b, a); } - if (a is MemoryStream ms_a) + if (a is MemoryStream streamA && streamA.TryGetBuffer(out var segmentA)) { - var bufferA = ms_a.GetBuffer(); - // Fast path A: if both streams are MemoryStreams, compare directly against each other - if (b is MemoryStream ms_b) + if (b is MemoryStream streamB && streamB.TryGetBuffer(out var segmentB)) { - return bufferA.AsSpan(0, (int)ms_a.Length).SequenceEqual(ms_b.GetBuffer().AsSpan(0, (int)ms_b.Length)); + return segmentA.AsSpan().SequenceEqual(segmentB.AsSpan()); } // Fast path B: only first stream is a MemoryStream, compare against second stream chunk-by-chunk @@ -148,9 +158,7 @@ namespace Jellyfin.Extensions int bytesRead; while ((bytesRead = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false)) > 0) { - cancellationToken.ThrowIfCancellationRequested(); - - if (!bufferA.AsSpan(offset, bytesRead).SequenceEqual(bufferB.AsSpan(0, bytesRead))) + if (!segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead])) { return false; } @@ -158,7 +166,7 @@ namespace Jellyfin.Extensions offset += bytesRead; } - return offset == ms_a.Length; + return offset == segmentA.Count; } finally { @@ -190,7 +198,7 @@ namespace Jellyfin.Extensions return true; } - if (!bufferA.AsSpan(0, bytesReadA).SequenceEqual(bufferB.AsSpan(0, bytesReadB))) + if (!memoryA.Span[..bytesReadA].SequenceEqual(memoryB.Span[..bytesReadB])) { return false; } From f12b666cbb1658fb9b98abe59270ee18a9e67085 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Wed, 27 May 2026 20:13:52 -0500 Subject: [PATCH 5/8] Remove IsStreamIdenticalAsync CanSeek requirement Now only uses for the Length mismatch. --- src/Jellyfin.Extensions/StreamExtensions.cs | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index 56a66b885a..fb3fd2eac1 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -15,7 +15,7 @@ namespace Jellyfin.Extensions /// public static class StreamExtensions { - private const int StreamComparisonBufferSize = 65536; + private const int StreamComparisonBufferSize = 81920; /// /// Reads all lines in the . @@ -114,23 +114,12 @@ namespace Jellyfin.Extensions /// The second stream to compare. /// The token to monitor for cancellation requests. /// True if the streams are identical; otherwise false. - /// or does not support seeking. public static async Task IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(a); ArgumentNullException.ThrowIfNull(b); - if (!a.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(a)); - } - - if (!b.CanSeek) - { - throw new ArgumentException("Stream must support seeking.", nameof(b)); - } - - if (b.Length != a.Length) + if (a.CanSeek && b.CanSeek && b.Length != a.Length) { return false; } From 175232329612ea8bfc268519e21f6c372e79eea7 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Wed, 27 May 2026 20:18:18 -0500 Subject: [PATCH 6/8] Add unit tests for new public methods. --- .../StreamExtensionsTests.cs | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs diff --git a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs new file mode 100644 index 0000000000..f7efee1e6c --- /dev/null +++ b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs @@ -0,0 +1,176 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Jellyfin.Extensions; +using Xunit; + +namespace Jellyfin.Extensions.Tests; + +public class StreamExtensionsTests +{ + [Fact] + public async Task IsStreamIdenticalAsync_SeekableDifferentLengths_ReturnsFalse() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new MemoryStream(new byte[] { 1, 2, 3 }); + await using var b = new MemoryStream(new byte[] { 1, 2, 3, 4 }); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.False(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_NonSeekableIdenticalStreams_ReturnsTrue() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 }); + await using var b = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 }); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_NonSeekableDifferentStreams_ReturnsFalse() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 }); + await using var b = new NonSeekableReadStream(new byte[] { 1, 2, 9, 4 }); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.False(result); + } + + [Fact] + public async Task IsFileIdenticalAsync_NonSeekableStream_ThrowsArgumentException() + { + var cancellationToken = TestContext.Current.CancellationToken; + var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); + await File.WriteAllBytesAsync(path, new byte[] { 1, 2, 3, 4 }, cancellationToken); + + try + { + await using var stream = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 }); + + await Assert.ThrowsAsync(async () => + await stream.IsFileIdenticalAsync(path, cancellationToken)); + } + finally + { + File.Delete(path); + } + } + + [Fact] + public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch() + { + var cancellationToken = TestContext.Current.CancellationToken; + var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); + var bytes = new byte[] { 10, 20, 30, 40, 50 }; + await File.WriteAllBytesAsync(path, bytes, cancellationToken); + + try + { + await using var stream = new MemoryStream(bytes); + stream.Position = 3; + + var result = await stream.IsFileIdenticalAsync(path, cancellationToken); + + Assert.True(result); + Assert.Equal(3, stream.Position); + } + finally + { + File.Delete(path); + } + } + + [Fact] + public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch() + { + var cancellationToken = TestContext.Current.CancellationToken; + var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); + await File.WriteAllBytesAsync(path, new byte[] { 10, 20, 30, 40, 99 }, cancellationToken); + + try + { + await using var stream = new MemoryStream(new byte[] { 10, 20, 30, 40, 50 }); + stream.Position = 2; + + var result = await stream.IsFileIdenticalAsync(path, cancellationToken); + + Assert.False(result); + Assert.Equal(2, stream.Position); + } + finally + { + File.Delete(path); + } + } + + private sealed class NonSeekableReadStream : Stream + { + private readonly Stream _inner; + + public NonSeekableReadStream(byte[] data) + { + _inner = new MemoryStream(data, writable: false); + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + => _inner.Read(buffer, offset, count); + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + => _inner.ReadAsync(buffer, cancellationToken); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); + + public override void SetLength(long value) + => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync(); + await base.DisposeAsync(); + } + } +} From 645ae6bb99671ec8bd87c6cb78e6fa3d77063c55 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Thu, 28 May 2026 13:31:13 -0500 Subject: [PATCH 7/8] Use ReadAtLeastAsync to handle short-reads. Seeks to beginning of streams if CanSeek is true. Added remarks about stream position. Add test coverage for short-reads. Fix fast-path tests to actually test the fast path. Also fix class comment. --- src/Jellyfin.Extensions/StreamExtensions.cs | 38 ++- .../StreamExtensionsTests.cs | 235 +++++++++++++++++- 2 files changed, 259 insertions(+), 14 deletions(-) diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index fb3fd2eac1..15b44d8f40 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -11,7 +11,7 @@ using System.Threading.Tasks; namespace Jellyfin.Extensions { /// - /// Class BaseExtensions. + /// Extension methods for the class. /// public static class StreamExtensions { @@ -74,7 +74,11 @@ namespace Jellyfin.Extensions /// The token to monitor for cancellation requests. /// True if the stream and file are identical; otherwise false. /// does not support seeking. - public static async Task IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken) + /// + /// The entire stream is compared against the file from the beginning (the position is reset to 0 on entry) + /// and restored to its original value after the call. + /// + public static async Task IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(stream); ArgumentException.ThrowIfNullOrEmpty(path); @@ -114,11 +118,31 @@ namespace Jellyfin.Extensions /// The second stream to compare. /// The token to monitor for cancellation requests. /// True if the streams are identical; otherwise false. - public static async Task IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken) + /// + /// Seekable streams are compared from the beginning (their position is reset to 0 on entry). + /// Non-seekable streams are compared from their current read position. Stream positions are not + /// restored after the call. + /// + public static async Task IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(a); ArgumentNullException.ThrowIfNull(b); + if (ReferenceEquals(a, b)) + { + return true; + } + + if (a.CanSeek) + { + a.Position = 0; + } + + if (b.CanSeek) + { + b.Position = 0; + } + if (a.CanSeek && b.CanSeek && b.Length != a.Length) { return false; @@ -145,9 +169,9 @@ namespace Jellyfin.Extensions var memoryB = bufferB.AsMemory(); int offset = 0; int bytesRead; - while ((bytesRead = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false)) > 0) + while ((bytesRead = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)) > 0) { - if (!segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead])) + if (offset + bytesRead > segmentA.Count || !segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead])) { return false; } @@ -174,8 +198,8 @@ namespace Jellyfin.Extensions { cancellationToken.ThrowIfCancellationRequested(); - var bytesReadA = await a.ReadAsync(memoryA, cancellationToken).ConfigureAwait(false); - var bytesReadB = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false); + var bytesReadA = await a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false); + var bytesReadB = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false); if (bytesReadA != bytesReadB) { diff --git a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs index f7efee1e6c..cdbf2f8b1d 100644 --- a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs +++ b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs @@ -2,7 +2,6 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; -using Jellyfin.Extensions; using Xunit; namespace Jellyfin.Extensions.Tests; @@ -65,8 +64,12 @@ public class StreamExtensionsTests } } - [Fact] - public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch() + // Both publiclyVisible values are exercised so the test runs once under the fast path + // (TryGetBuffer succeeds) and once under the slow path (TryGetBuffer returns false). + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch(bool publiclyVisible) { var cancellationToken = TestContext.Current.CancellationToken; var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); @@ -75,7 +78,7 @@ public class StreamExtensionsTests try { - await using var stream = new MemoryStream(bytes); + await using var stream = CreateMemoryStream(bytes, publiclyVisible); stream.Position = 3; var result = await stream.IsFileIdenticalAsync(path, cancellationToken); @@ -89,8 +92,10 @@ public class StreamExtensionsTests } } - [Fact] - public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch(bool publiclyVisible) { var cancellationToken = TestContext.Current.CancellationToken; var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); @@ -98,7 +103,7 @@ public class StreamExtensionsTests try { - await using var stream = new MemoryStream(new byte[] { 10, 20, 30, 40, 50 }); + await using var stream = CreateMemoryStream(new byte[] { 10, 20, 30, 40, 50 }, publiclyVisible); stream.Position = 2; var result = await stream.IsFileIdenticalAsync(path, cancellationToken); @@ -112,6 +117,96 @@ public class StreamExtensionsTests } } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsStreamIdenticalAsync_BothMemoryStreams_NonZeroPositions_SeeksToStart(bool publiclyVisible) + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible); + await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible); + a.Position = 3; + b.Position = 1; + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsStreamIdenticalAsync_MemoryStreamPairedWithSeekableNonMemoryStream_NonZeroPositions_SeeksToStart(bool publiclyVisible) + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible); + await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + a.Position = 2; + b.Position = 3; + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsStreamIdenticalAsync_NonMemoryStreamPairedWithMemoryStream_Swaps_ReturnsTrue(bool publiclyVisible) + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_BothSeekableNonMemoryStreams_NonZeroPositions_SeeksToStart() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + a.Position = 1; + b.Position = 2; + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_NonSeekableShortReads_Identical_ReturnsTrue() + { + var cancellationToken = TestContext.Current.CancellationToken; + var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + await using var a = new ShortReadingNonSeekableStream(data, maxReadSize: 3); + await using var b = new ShortReadingNonSeekableStream(data, maxReadSize: 5); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_NonSeekableShortReads_DifferentLengths_ReturnsFalse() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4 }, maxReadSize: 3); + await using var b = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4, 5 }, maxReadSize: 5); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.False(result); + } + + private static MemoryStream CreateMemoryStream(byte[] data, bool publiclyVisible) + => publiclyVisible + ? new MemoryStream(data, 0, data.Length, writable: false, publiclyVisible: true) + : new MemoryStream(data); + private sealed class NonSeekableReadStream : Stream { private readonly Stream _inner; @@ -173,4 +268,130 @@ public class StreamExtensionsTests await base.DisposeAsync(); } } + + private sealed class SeekableNonMemoryStream : Stream + { + private readonly MemoryStream _inner; + + public SeekableNonMemoryStream(byte[] data) + { + _inner = new MemoryStream(data, writable: false); + } + + public override bool CanRead => true; + + public override bool CanSeek => true; + + public override bool CanWrite => false; + + public override long Length => _inner.Length; + + public override long Position + { + get => _inner.Position; + set => _inner.Position = value; + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + => _inner.Read(buffer, offset, count); + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + => _inner.ReadAsync(buffer, cancellationToken); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + + public override long Seek(long offset, SeekOrigin origin) + => _inner.Seek(offset, origin); + + public override void SetLength(long value) + => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync(); + await base.DisposeAsync(); + } + } + + private sealed class ShortReadingNonSeekableStream : Stream + { + private readonly Stream _inner; + private readonly int _maxReadSize; + + public ShortReadingNonSeekableStream(byte[] data, int maxReadSize) + { + _inner = new MemoryStream(data, writable: false); + _maxReadSize = maxReadSize; + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + => _inner.Read(buffer, offset, Math.Min(count, _maxReadSize)); + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) + => _inner.ReadAsync(buffer[..Math.Min(buffer.Length, _maxReadSize)], cancellationToken); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _inner.ReadAsync(buffer.AsMemory(offset, Math.Min(count, _maxReadSize)), cancellationToken).AsTask(); + + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); + + public override void SetLength(long value) + => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync(); + await base.DisposeAsync(); + } + } } From 5c7ee6a6356917252063078fdb3ff331f897bf69 Mon Sep 17 00:00:00 2001 From: Marc Brooks Date: Fri, 29 May 2026 15:54:58 -0500 Subject: [PATCH 8/8] Improved resilience for fast-paths Use fast paths only if we can TryGetBuffer on MemoryStream using segment's Array. Reduce swap overhead for fast path B. Avoid multiple virtcalls by memoizing the CanSeeks. Overlap slow path stream async reads. --- src/Jellyfin.Extensions/StreamExtensions.cs | 38 +++++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index 15b44d8f40..36361c58e8 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -133,36 +133,40 @@ namespace Jellyfin.Extensions return true; } - if (a.CanSeek) + if (a.CanSeek is var aCanSeek && aCanSeek) { a.Position = 0; } - if (b.CanSeek) + if (b.CanSeek is var bCanSeek && bCanSeek) { b.Position = 0; } - if (a.CanSeek && b.CanSeek && b.Length != a.Length) + if (aCanSeek && bCanSeek && b.Length != a.Length) { return false; } - // If b is MemoryStream but a is not, swap them to enable fast path B - if (b is MemoryStream && a is not MemoryStream) + // MemoryStreams only unlock a fast path if their underlying buffer is exposed via TryGetBuffer. + var segmentA = a is MemoryStream streamA && streamA.TryGetBuffer(out var bufA) ? bufA : default; + var segmentB = b is MemoryStream streamB && streamB.TryGetBuffer(out var bufB) ? bufB : default; + + // Fast path A: both streams expose buffers, compare segments directly + if (segmentA.Array is not null && segmentB.Array is not null) { - (a, b) = (b, a); + return segmentA.AsSpan().SequenceEqual(segmentB.AsSpan()); } - if (a is MemoryStream streamA && streamA.TryGetBuffer(out var segmentA)) + if (segmentB.Array is not null) // && segmentA.Array is null guaranteed by previous check { - // Fast path A: if both streams are MemoryStreams, compare directly against each other - if (b is MemoryStream streamB && streamB.TryGetBuffer(out var segmentB)) - { - return segmentA.AsSpan().SequenceEqual(segmentB.AsSpan()); - } + // swap so that segmentA is the non-null one, compared to b we need only one fast path B + (segmentA, b) = (segmentB, a); + } - // Fast path B: only first stream is a MemoryStream, compare against second stream chunk-by-chunk + if (segmentA.Array is not null) // either a was non-null, or b was non-null and was swapped there + { + // Fast path B: only one stream exposed a buffer, compare against the other chunk-by-chunk var bufferB = ArrayPool.Shared.Rent(StreamComparisonBufferSize); try { @@ -198,8 +202,12 @@ namespace Jellyfin.Extensions { cancellationToken.ThrowIfCancellationRequested(); - var bytesReadA = await a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false); - var bytesReadB = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false); + var taskA = a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).AsTask(); + var taskB = b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).AsTask(); + await Task.WhenAll(taskA, taskB).ConfigureAwait(false); + + var bytesReadA = await taskA.ConfigureAwait(false); + var bytesReadB = await taskB.ConfigureAwait(false); if (bytesReadA != bytesReadB) {