Merge pull request #16936 from IDisposable/stable-nfo

Compare old file byte-by-byte to new stream
This commit is contained in:
Bond-009
2026-05-31 17:18:05 +02:00
committed by GitHub
3 changed files with 581 additions and 4 deletions

View File

@@ -198,15 +198,23 @@ namespace MediaBrowser.XbmcMetadata.Savers
cancellationToken.ThrowIfCancellationRequested();
await SaveToFileAsync(memoryStream, path).ConfigureAwait(false);
await SaveToFileAsync(memoryStream, path, cancellationToken).ConfigureAwait(false);
}
}
private async Task SaveToFileAsync(Stream stream, string path)
private async Task SaveToFileAsync(Stream stream, string path, CancellationToken cancellationToken)
{
var directory = Path.GetDirectoryName(path) ?? throw new ArgumentException($"Provided path ({path}) is not valid.", nameof(path));
Directory.CreateDirectory(directory);
// Compare byte-for-byte before proceeding.
if (File.Exists(path) && await stream.IsFileIdenticalAsync(path, cancellationToken).ConfigureAwait(false))
{
return; // Don't save since .nfo is unchanged.
}
stream.Position = 0;
// On Windows, saving the file will fail if the file is hidden or readonly
FileSystem.SetAttributes(path, false, false);
@@ -222,7 +230,7 @@ namespace MediaBrowser.XbmcMetadata.Savers
var filestream = new FileStream(path, fileStreamOptions);
await using (filestream.ConfigureAwait(false))
{
await stream.CopyToAsync(filestream).ConfigureAwait(false);
await stream.CopyToAsync(filestream, cancellationToken).ConfigureAwait(false);
}
if (ConfigurationManager.Configuration.SaveMetadataHidden)

View File

@@ -1,17 +1,22 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Jellyfin.Extensions
{
/// <summary>
/// Class BaseExtensions.
/// Extension methods for the <see cref="Stream"/> class.
/// </summary>
public static class StreamExtensions
{
private const int StreamComparisonBufferSize = 81920;
/// <summary>
/// Reads all lines in the <see cref="Stream" />.
/// </summary>
@@ -60,5 +65,172 @@ namespace Jellyfin.Extensions
yield return line;
}
}
/// <summary>
/// Determines whether a stream is identical to a file on disk.
/// </summary>
/// <param name="stream">The stream to compare.</param>
/// <param name="path">The file path to compare against.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>True if the stream and file are identical; otherwise false.</returns>
/// <exception cref="ArgumentException"><paramref name="stream"/> does not support seeking.</exception>
/// <remarks>
/// The entire stream is compared against the file from the beginning (the position is reset to 0 on entry)
/// and restored to its original value after the call.
/// </remarks>
public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(stream);
ArgumentException.ThrowIfNullOrEmpty(path);
if (!stream.CanSeek)
{
throw new ArgumentException("Stream must support seeking.", nameof(stream));
}
var originalPosition = stream.Position;
try
{
stream.Position = 0;
var existingFileStream = new FileStream(
path,
FileMode.Open,
FileAccess.Read,
FileShare.Read,
bufferSize: StreamComparisonBufferSize,
FileOptions.Asynchronous | FileOptions.SequentialScan);
await using (existingFileStream.ConfigureAwait(false))
{
return await stream.IsStreamIdenticalAsync(existingFileStream, cancellationToken).ConfigureAwait(false);
}
}
finally
{
stream.Position = originalPosition;
}
}
/// <summary>
/// Determines whether two streams are identical.
/// </summary>
/// <param name="a">The first stream to compare.</param>
/// <param name="b">The second stream to compare.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>True if the streams are identical; otherwise false.</returns>
/// <remarks>
/// Seekable streams are compared from the beginning (their position is reset to 0 on entry).
/// Non-seekable streams are compared from their current read position. Stream positions are not
/// restored after the call.
/// </remarks>
public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(a);
ArgumentNullException.ThrowIfNull(b);
if (ReferenceEquals(a, b))
{
return true;
}
if (a.CanSeek is var aCanSeek && aCanSeek)
{
a.Position = 0;
}
if (b.CanSeek is var bCanSeek && bCanSeek)
{
b.Position = 0;
}
if (aCanSeek && bCanSeek && b.Length != a.Length)
{
return false;
}
// MemoryStreams only unlock a fast path if their underlying buffer is exposed via TryGetBuffer.
var segmentA = a is MemoryStream streamA && streamA.TryGetBuffer(out var bufA) ? bufA : default;
var segmentB = b is MemoryStream streamB && streamB.TryGetBuffer(out var bufB) ? bufB : default;
// Fast path A: both streams expose buffers, compare segments directly
if (segmentA.Array is not null && segmentB.Array is not null)
{
return segmentA.AsSpan().SequenceEqual(segmentB.AsSpan());
}
if (segmentB.Array is not null) // && segmentA.Array is null guaranteed by previous check
{
// swap so that segmentA is the non-null one, compared to b we need only one fast path B
(segmentA, b) = (segmentB, a);
}
if (segmentA.Array is not null) // either a was non-null, or b was non-null and was swapped there
{
// Fast path B: only one stream exposed a buffer, compare against the other chunk-by-chunk
var bufferB = ArrayPool<byte>.Shared.Rent(StreamComparisonBufferSize);
try
{
var memoryB = bufferB.AsMemory();
int offset = 0;
int bytesRead;
while ((bytesRead = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)) > 0)
{
if (offset + bytesRead > segmentA.Count || !segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead]))
{
return false;
}
offset += bytesRead;
}
return offset == segmentA.Count;
}
finally
{
ArrayPool<byte>.Shared.Return(bufferB);
}
}
else
{
var bufferA = ArrayPool<byte>.Shared.Rent(StreamComparisonBufferSize);
var bufferB = ArrayPool<byte>.Shared.Rent(StreamComparisonBufferSize);
try
{
var memoryA = bufferA.AsMemory();
var memoryB = bufferB.AsMemory();
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
var taskA = a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).AsTask();
var taskB = b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).AsTask();
await Task.WhenAll(taskA, taskB).ConfigureAwait(false);
var bytesReadA = await taskA.ConfigureAwait(false);
var bytesReadB = await taskB.ConfigureAwait(false);
if (bytesReadA != bytesReadB)
{
return false;
}
if (bytesReadA == 0)
{
return true;
}
if (!memoryA.Span[..bytesReadA].SequenceEqual(memoryB.Span[..bytesReadB]))
{
return false;
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(bufferA);
ArrayPool<byte>.Shared.Return(bufferB);
}
}
}
}
}

View File

@@ -0,0 +1,397 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace Jellyfin.Extensions.Tests;
public class StreamExtensionsTests
{
[Fact]
public async Task IsStreamIdenticalAsync_SeekableDifferentLengths_ReturnsFalse()
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = new MemoryStream(new byte[] { 1, 2, 3 });
await using var b = new MemoryStream(new byte[] { 1, 2, 3, 4 });
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.False(result);
}
[Fact]
public async Task IsStreamIdenticalAsync_NonSeekableIdenticalStreams_ReturnsTrue()
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 });
await using var b = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 });
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.True(result);
}
[Fact]
public async Task IsStreamIdenticalAsync_NonSeekableDifferentStreams_ReturnsFalse()
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 });
await using var b = new NonSeekableReadStream(new byte[] { 1, 2, 9, 4 });
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.False(result);
}
[Fact]
public async Task IsFileIdenticalAsync_NonSeekableStream_ThrowsArgumentException()
{
var cancellationToken = TestContext.Current.CancellationToken;
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
await File.WriteAllBytesAsync(path, new byte[] { 1, 2, 3, 4 }, cancellationToken);
try
{
await using var stream = new NonSeekableReadStream(new byte[] { 1, 2, 3, 4 });
await Assert.ThrowsAsync<ArgumentException>(async () =>
await stream.IsFileIdenticalAsync(path, cancellationToken));
}
finally
{
File.Delete(path);
}
}
// Both publiclyVisible values are exercised so the test runs once under the fast path
// (TryGetBuffer succeeds) and once under the slow path (TryGetBuffer returns false).
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
var bytes = new byte[] { 10, 20, 30, 40, 50 };
await File.WriteAllBytesAsync(path, bytes, cancellationToken);
try
{
await using var stream = CreateMemoryStream(bytes, publiclyVisible);
stream.Position = 3;
var result = await stream.IsFileIdenticalAsync(path, cancellationToken);
Assert.True(result);
Assert.Equal(3, stream.Position);
}
finally
{
File.Delete(path);
}
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
await File.WriteAllBytesAsync(path, new byte[] { 10, 20, 30, 40, 99 }, cancellationToken);
try
{
await using var stream = CreateMemoryStream(new byte[] { 10, 20, 30, 40, 50 }, publiclyVisible);
stream.Position = 2;
var result = await stream.IsFileIdenticalAsync(path, cancellationToken);
Assert.False(result);
Assert.Equal(2, stream.Position);
}
finally
{
File.Delete(path);
}
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task IsStreamIdenticalAsync_BothMemoryStreams_NonZeroPositions_SeeksToStart(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible);
await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible);
a.Position = 3;
b.Position = 1;
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.True(result);
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task IsStreamIdenticalAsync_MemoryStreamPairedWithSeekableNonMemoryStream_NonZeroPositions_SeeksToStart(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible);
await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
a.Position = 2;
b.Position = 3;
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.True(result);
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task IsStreamIdenticalAsync_NonMemoryStreamPairedWithMemoryStream_Swaps_ReturnsTrue(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible);
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.True(result);
}
[Fact]
public async Task IsStreamIdenticalAsync_BothSeekableNonMemoryStreams_NonZeroPositions_SeeksToStart()
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
a.Position = 1;
b.Position = 2;
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.True(result);
}
[Fact]
public async Task IsStreamIdenticalAsync_NonSeekableShortReads_Identical_ReturnsTrue()
{
var cancellationToken = TestContext.Current.CancellationToken;
var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
await using var a = new ShortReadingNonSeekableStream(data, maxReadSize: 3);
await using var b = new ShortReadingNonSeekableStream(data, maxReadSize: 5);
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.True(result);
}
[Fact]
public async Task IsStreamIdenticalAsync_NonSeekableShortReads_DifferentLengths_ReturnsFalse()
{
var cancellationToken = TestContext.Current.CancellationToken;
await using var a = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4 }, maxReadSize: 3);
await using var b = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4, 5 }, maxReadSize: 5);
var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
Assert.False(result);
}
private static MemoryStream CreateMemoryStream(byte[] data, bool publiclyVisible)
=> publiclyVisible
? new MemoryStream(data, 0, data.Length, writable: false, publiclyVisible: true)
: new MemoryStream(data);
private sealed class NonSeekableReadStream : Stream
{
private readonly Stream _inner;
public NonSeekableReadStream(byte[] data)
{
_inner = new MemoryStream(data, writable: false);
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
=> _inner.Read(buffer, offset, count);
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
=> _inner.ReadAsync(buffer, cancellationToken);
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
public override long Seek(long offset, SeekOrigin origin)
=> throw new NotSupportedException();
public override void SetLength(long value)
=> throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
if (disposing)
{
_inner.Dispose();
}
base.Dispose(disposing);
}
public override async ValueTask DisposeAsync()
{
await _inner.DisposeAsync();
await base.DisposeAsync();
}
}
private sealed class SeekableNonMemoryStream : Stream
{
private readonly MemoryStream _inner;
public SeekableNonMemoryStream(byte[] data)
{
_inner = new MemoryStream(data, writable: false);
}
public override bool CanRead => true;
public override bool CanSeek => true;
public override bool CanWrite => false;
public override long Length => _inner.Length;
public override long Position
{
get => _inner.Position;
set => _inner.Position = value;
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
=> _inner.Read(buffer, offset, count);
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
=> _inner.ReadAsync(buffer, cancellationToken);
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
public override long Seek(long offset, SeekOrigin origin)
=> _inner.Seek(offset, origin);
public override void SetLength(long value)
=> throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
if (disposing)
{
_inner.Dispose();
}
base.Dispose(disposing);
}
public override async ValueTask DisposeAsync()
{
await _inner.DisposeAsync();
await base.DisposeAsync();
}
}
private sealed class ShortReadingNonSeekableStream : Stream
{
private readonly Stream _inner;
private readonly int _maxReadSize;
public ShortReadingNonSeekableStream(byte[] data, int maxReadSize)
{
_inner = new MemoryStream(data, writable: false);
_maxReadSize = maxReadSize;
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
=> _inner.Read(buffer, offset, Math.Min(count, _maxReadSize));
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
=> _inner.ReadAsync(buffer[..Math.Min(buffer.Length, _maxReadSize)], cancellationToken);
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> _inner.ReadAsync(buffer.AsMemory(offset, Math.Min(count, _maxReadSize)), cancellationToken).AsTask();
public override long Seek(long offset, SeekOrigin origin)
=> throw new NotSupportedException();
public override void SetLength(long value)
=> throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count)
=> throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
if (disposing)
{
_inner.Dispose();
}
base.Dispose(disposing);
}
public override async ValueTask DisposeAsync()
{
await _inner.DisposeAsync();
await base.DisposeAsync();
}
}
}