using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Jellyfin.Extensions
{
    /// <summary>
    /// Extension methods for the <see cref="Stream"/> class.
    /// </summary>
    public static class StreamExtensions
    {
        /// <summary>
        /// Number of bytes compared per chunk when streams have to be read incrementally.
        /// </summary>
        private const int StreamComparisonBufferSize = 81920;

        /// <summary>
        /// Reads all lines in the <see cref="Stream"/> using UTF-8.
        /// </summary>
        /// <param name="stream">The <see cref="Stream"/> to read from.</param>
        /// <returns>All lines in the stream.</returns>
        /// <remarks>
        /// The stream is wrapped in a <see cref="StreamReader"/> that is disposed before returning,
        /// which also closes <paramref name="stream"/>.
        /// </remarks>
        public static string[] ReadAllLines(this Stream stream)
            => ReadAllLines(stream, Encoding.UTF8);

        /// <summary>
        /// Reads all lines in the <see cref="Stream"/>.
        /// </summary>
        /// <param name="stream">The <see cref="Stream"/> to read from.</param>
        /// <param name="encoding">The character encoding to use.</param>
        /// <returns>All lines in the stream.</returns>
        /// <remarks>
        /// The stream is wrapped in a <see cref="StreamReader"/> that is disposed before returning,
        /// which also closes <paramref name="stream"/>.
        /// </remarks>
        public static string[] ReadAllLines(this Stream stream, Encoding encoding)
        {
            using StreamReader reader = new StreamReader(stream, encoding);
            return ReadAllLines(reader).ToArray();
        }

        /// <summary>
        /// Reads all lines in the <see cref="TextReader"/>.
        /// </summary>
        /// <param name="reader">The <see cref="TextReader"/> to read from.</param>
        /// <returns>All lines in the stream, produced lazily as the result is enumerated.</returns>
        public static IEnumerable<string> ReadAllLines(this TextReader reader)
        {
            string? line;
            while ((line = reader.ReadLine()) is not null)
            {
                yield return line;
            }
        }

        /// <summary>
        /// Reads all lines in the <see cref="TextReader"/>.
        /// </summary>
        /// <param name="reader">The <see cref="TextReader"/> to read from.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
        /// <returns>All lines in the stream, produced lazily as the result is enumerated.</returns>
        public static async IAsyncEnumerable<string> ReadAllLinesAsync(
            this TextReader reader,
            [EnumeratorCancellation] CancellationToken cancellationToken = default)
        {
            string? line;
            while ((line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false)) is not null)
            {
                yield return line;
            }
        }

        /// <summary>
        /// Determines whether a stream is identical to a file on disk.
        /// </summary>
        /// <param name="stream">The stream to compare.</param>
        /// <param name="path">The file path to compare against.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
        /// <returns>True if the stream and file are identical; otherwise false.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="stream"/> or <paramref name="path"/> is null.</exception>
        /// <exception cref="ArgumentException">
        /// <paramref name="path"/> is empty, or <paramref name="stream"/> does not support seeking.
        /// </exception>
        /// <remarks>
        /// The entire stream is compared against the file from the beginning (the position is reset to 0 on entry)
        /// and restored to its original value after the call.
        /// </remarks>
        public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken = default)
        {
            ArgumentNullException.ThrowIfNull(stream);
            ArgumentException.ThrowIfNullOrEmpty(path);

            if (!stream.CanSeek)
            {
                throw new ArgumentException("Stream must support seeking.", nameof(stream));
            }

            var originalPosition = stream.Position;
            try
            {
                stream.Position = 0;

                var existingFileStream = new FileStream(
                    path,
                    FileMode.Open,
                    FileAccess.Read,
                    FileShare.Read,
                    bufferSize: StreamComparisonBufferSize,
                    FileOptions.Asynchronous | FileOptions.SequentialScan);
                await using (existingFileStream.ConfigureAwait(false))
                {
                    return await stream.IsStreamIdenticalAsync(existingFileStream, cancellationToken).ConfigureAwait(false);
                }
            }
            finally
            {
                // Restore the caller's position even when opening or comparing throws.
                stream.Position = originalPosition;
            }
        }

        /// <summary>
        /// Determines whether two streams are identical.
        /// </summary>
        /// <param name="a">The first stream to compare.</param>
        /// <param name="b">The second stream to compare.</param>
        /// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
        /// <returns>True if the streams are identical; otherwise false.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="a"/> or <paramref name="b"/> is null.</exception>
        /// <remarks>
        /// Seekable streams are compared from the beginning (their position is reset to 0 on entry).
        /// Non-seekable streams are compared from their current read position. Stream positions are not
        /// restored after the call.
        /// </remarks>
        public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken = default)
        {
            ArgumentNullException.ThrowIfNull(a);
            ArgumentNullException.ThrowIfNull(b);

            if (ReferenceEquals(a, b))
            {
                return true;
            }

            bool aCanSeek = a.CanSeek;
            if (aCanSeek)
            {
                a.Position = 0;
            }

            bool bCanSeek = b.CanSeek;
            if (bCanSeek)
            {
                b.Position = 0;
            }

            // Cheap rejection: when both lengths are known, a mismatch settles the answer without reading.
            if (aCanSeek && bCanSeek && b.Length != a.Length)
            {
                return false;
            }

            // MemoryStreams only unlock a fast path if their underlying buffer is exposed via TryGetBuffer.
            var segmentA = a is MemoryStream streamA && streamA.TryGetBuffer(out var bufA) ? bufA : default;
            var segmentB = b is MemoryStream streamB && streamB.TryGetBuffer(out var bufB) ? bufB : default;
            bool hasSegmentA = segmentA.Array is not null;
            bool hasSegmentB = segmentB.Array is not null;

            // Fast path A: both streams expose buffers, compare segments directly.
            if (hasSegmentA && hasSegmentB)
            {
                return segmentA.AsSpan().SequenceEqual(segmentB.AsSpan());
            }

            // Fast path B: only one stream exposed a buffer, compare it against the other chunk-by-chunk.
            if (hasSegmentA)
            {
                return await IsBufferIdenticalToStreamAsync(segmentA, b, cancellationToken).ConfigureAwait(false);
            }

            if (hasSegmentB)
            {
                return await IsBufferIdenticalToStreamAsync(segmentB, a, cancellationToken).ConfigureAwait(false);
            }

            // Slow path: neither stream exposes a buffer, read both in lock-step.
            return await AreStreamsIdenticalChunkedAsync(a, b, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Compares an in-memory buffer against the remaining content of a stream, reading the stream in chunks.
        /// </summary>
        private static async Task<bool> IsBufferIdenticalToStreamAsync(ArraySegment<byte> expected, Stream stream, CancellationToken cancellationToken)
        {
            var rented = ArrayPool<byte>.Shared.Rent(StreamComparisonBufferSize);
            try
            {
                var chunk = rented.AsMemory();
                int offset = 0;
                int bytesRead;
                while ((bytesRead = await stream.ReadAtLeastAsync(chunk, chunk.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)) > 0)
                {
                    // Written as a subtraction so the bound check cannot overflow for buffers close to int.MaxValue.
                    if (bytesRead > expected.Count - offset
                        || !expected.AsSpan(offset, bytesRead).SequenceEqual(chunk.Span[..bytesRead]))
                    {
                        return false;
                    }

                    offset += bytesRead;
                }

                // The stream ended; it matches only if the whole buffer was consumed.
                return offset == expected.Count;
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(rented);
            }
        }

        /// <summary>
        /// Compares two streams by reading equally sized chunks from both until either differs or both end.
        /// </summary>
        private static async Task<bool> AreStreamsIdenticalChunkedAsync(Stream a, Stream b, CancellationToken cancellationToken)
        {
            var bufferA = ArrayPool<byte>.Shared.Rent(StreamComparisonBufferSize);
            var bufferB = ArrayPool<byte>.Shared.Rent(StreamComparisonBufferSize);
            try
            {
                // Rent only guarantees a minimum length, so the two arrays may differ in size.
                // Both windows are clamped to the same size; otherwise identical streams could
                // yield different chunk lengths and be reported as different.
                var memoryA = bufferA.AsMemory(0, StreamComparisonBufferSize);
                var memoryB = bufferB.AsMemory(0, StreamComparisonBufferSize);

                while (true)
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    var taskA = a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).AsTask();
                    var taskB = b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).AsTask();

                    // Wait for both reads so neither is still writing into its buffer when the
                    // finally block returns the buffers to the pool after a failure.
                    await Task.WhenAll(taskA, taskB).ConfigureAwait(false);

                    var bytesReadA = await taskA.ConfigureAwait(false);
                    var bytesReadB = await taskB.ConfigureAwait(false);

                    if (bytesReadA != bytesReadB)
                    {
                        return false;
                    }

                    if (bytesReadA == 0)
                    {
                        return true;
                    }

                    if (!memoryA.Span[..bytesReadA].SequenceEqual(memoryB.Span[..bytesReadB]))
                    {
                        return false;
                    }
                }
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(bufferA);
                ArrayPool<byte>.Shared.Return(bufferB);
            }
        }
    }
}