diff options
| author | Marc Brooks <IDisposable@gmail.com> | 2026-05-28 13:31:13 -0500 |
|---|---|---|
| committer | Marc Brooks <IDisposable@gmail.com> | 2026-05-28 13:31:13 -0500 |
| commit | 645ae6bb99671ec8bd87c6cb78e6fa3d77063c55 (patch) | |
| tree | f3e4e3ec47b150471f3a7629064515c3727f472d | |
| parent | 175232329612ea8bfc268519e21f6c372e79eea7 (diff) | |
Use ReadAtLeastAsync to handle short-reads.
Seeks to beginning of streams if CanSeek is true.
Added remarks about stream position.
Add test coverage for short-reads.
Fix fast-path tests to actually test the fast path.
Also fix class comment.
| -rw-r--r-- | src/Jellyfin.Extensions/StreamExtensions.cs | 38 | ||||
| -rw-r--r-- | tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs | 235 |
2 files changed, 259 insertions, 14 deletions
diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs index fb3fd2eac1..15b44d8f40 100644 --- a/src/Jellyfin.Extensions/StreamExtensions.cs +++ b/src/Jellyfin.Extensions/StreamExtensions.cs @@ -11,7 +11,7 @@ using System.Threading.Tasks; namespace Jellyfin.Extensions { /// <summary> - /// Class BaseExtensions. + /// Extension methods for the <see cref="Stream"/> class. /// </summary> public static class StreamExtensions { @@ -74,7 +74,11 @@ namespace Jellyfin.Extensions /// <param name="cancellationToken">The token to monitor for cancellation requests.</param> /// <returns>True if the stream and file are identical; otherwise false.</returns> /// <exception cref="ArgumentException"><paramref name="stream"/> does not support seeking.</exception> - public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken) + /// <remarks> + /// The entire stream is compared against the file from the beginning (the position is reset to 0 on entry) + /// and restored to its original value after the call. + /// </remarks> + public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(stream); ArgumentException.ThrowIfNullOrEmpty(path); @@ -114,11 +118,31 @@ namespace Jellyfin.Extensions /// <param name="b">The second stream to compare.</param> /// <param name="cancellationToken">The token to monitor for cancellation requests.</param> /// <returns>True if the streams are identical; otherwise false.</returns> - public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken) + /// <remarks> + /// Seekable streams are compared from the beginning (their position is reset to 0 on entry). + /// Non-seekable streams are compared from their current read position. Stream positions are not + /// restored after the call. + /// </remarks> + public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken = default) { ArgumentNullException.ThrowIfNull(a); ArgumentNullException.ThrowIfNull(b); + if (ReferenceEquals(a, b)) + { + return true; + } + + if (a.CanSeek) + { + a.Position = 0; + } + + if (b.CanSeek) + { + b.Position = 0; + } + if (a.CanSeek && b.CanSeek && b.Length != a.Length) { return false; @@ -145,9 +169,9 @@ namespace Jellyfin.Extensions var memoryB = bufferB.AsMemory(); int offset = 0; int bytesRead; - while ((bytesRead = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false)) > 0) + while ((bytesRead = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)) > 0) { - if (!segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead])) + if (offset + bytesRead > segmentA.Count || !segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead])) { return false; } @@ -174,8 +198,8 @@ namespace Jellyfin.Extensions { cancellationToken.ThrowIfCancellationRequested(); - var bytesReadA = await a.ReadAsync(memoryA, cancellationToken).ConfigureAwait(false); - var bytesReadB = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false); + var bytesReadA = await a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false); + var bytesReadB = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false); if (bytesReadA != bytesReadB) { diff --git a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs index f7efee1e6c..cdbf2f8b1d 100644 --- a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs +++ b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs @@ -2,7 +2,6 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; -using Jellyfin.Extensions; using Xunit; namespace Jellyfin.Extensions.Tests; @@ -65,8 +64,12 @@ public class StreamExtensionsTests } } - [Fact] - public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch() + // Both publiclyVisible values are exercised so the test runs once under the fast path + // (TryGetBuffer succeeds) and once under the slow path (TryGetBuffer returns false). + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch(bool publiclyVisible) { var cancellationToken = TestContext.Current.CancellationToken; var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); @@ -75,7 +78,7 @@ public class StreamExtensionsTests try { - await using var stream = new MemoryStream(bytes); + await using var stream = CreateMemoryStream(bytes, publiclyVisible); stream.Position = 3; var result = await stream.IsFileIdenticalAsync(path, cancellationToken); @@ -89,8 +92,10 @@ public class StreamExtensionsTests } } - [Fact] - public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch(bool publiclyVisible) { var cancellationToken = TestContext.Current.CancellationToken; var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName()); @@ -98,7 +103,7 @@ public class StreamExtensionsTests try { - await using var stream = new MemoryStream(new byte[] { 10, 20, 30, 40, 50 }); + await using var stream = CreateMemoryStream(new byte[] { 10, 20, 30, 40, 50 }, publiclyVisible); stream.Position = 2; var result = await stream.IsFileIdenticalAsync(path, cancellationToken); @@ -112,6 +117,96 @@ public class StreamExtensionsTests } } + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsStreamIdenticalAsync_BothMemoryStreams_NonZeroPositions_SeeksToStart(bool publiclyVisible) + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible); + await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible); + a.Position = 3; + b.Position = 1; + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsStreamIdenticalAsync_MemoryStreamPairedWithSeekableNonMemoryStream_NonZeroPositions_SeeksToStart(bool publiclyVisible) + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible); + await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + a.Position = 2; + b.Position = 3; + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsStreamIdenticalAsync_NonMemoryStreamPairedWithMemoryStream_Swaps_ReturnsTrue(bool publiclyVisible) + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_BothSeekableNonMemoryStreams_NonZeroPositions_SeeksToStart() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 }); + a.Position = 1; + b.Position = 2; + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_NonSeekableShortReads_Identical_ReturnsTrue() + { + var cancellationToken = TestContext.Current.CancellationToken; + var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + await using var a = new ShortReadingNonSeekableStream(data, maxReadSize: 3); + await using var b = new ShortReadingNonSeekableStream(data, maxReadSize: 5); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.True(result); + } + + [Fact] + public async Task IsStreamIdenticalAsync_NonSeekableShortReads_DifferentLengths_ReturnsFalse() + { + var cancellationToken = TestContext.Current.CancellationToken; + await using var a = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4 }, maxReadSize: 3); + await using var b = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4, 5 }, maxReadSize: 5); + + var result = await a.IsStreamIdenticalAsync(b, cancellationToken); + + Assert.False(result); + } + + private static MemoryStream CreateMemoryStream(byte[] data, bool publiclyVisible) + => publiclyVisible + ? new MemoryStream(data, 0, data.Length, writable: false, publiclyVisible: true) + : new MemoryStream(data); + private sealed class NonSeekableReadStream : Stream { private readonly Stream _inner; @@ -173,4 +268,130 @@ public class StreamExtensionsTests await base.DisposeAsync(); } } + + private sealed class SeekableNonMemoryStream : Stream + { + private readonly MemoryStream _inner; + + public SeekableNonMemoryStream(byte[] data) + { + _inner = new MemoryStream(data, writable: false); + } + + public override bool CanRead => true; + + public override bool CanSeek => true; + + public override bool CanWrite => false; + + public override long Length => _inner.Length; + + public override long Position + { + get => _inner.Position; + set => _inner.Position = value; + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + => _inner.Read(buffer, offset, count); + + public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) + => _inner.ReadAsync(buffer, cancellationToken); + + public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask(); + + public override long Seek(long offset, SeekOrigin origin) + => _inner.Seek(offset, origin); + + public override void SetLength(long value) + => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync(); + await base.DisposeAsync(); + } + } + + private sealed class ShortReadingNonSeekableStream : Stream + { + private readonly Stream _inner; + private readonly int _maxReadSize; + + public ShortReadingNonSeekableStream(byte[] data, int maxReadSize) + { + _inner = new MemoryStream(data, writable: false); + _maxReadSize = maxReadSize; + } + + public override bool CanRead => true; + + public override bool CanSeek => false; + + public override bool CanWrite => false; + + public override long Length => throw new NotSupportedException(); + + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + => _inner.Read(buffer, offset, Math.Min(count, _maxReadSize)); + + public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) + => _inner.ReadAsync(buffer[..Math.Min(buffer.Length, _maxReadSize)], cancellationToken); + + public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + => _inner.ReadAsync(buffer.AsMemory(offset, Math.Min(count, _maxReadSize)), cancellationToken).AsTask(); + + public override long Seek(long offset, SeekOrigin origin) + => throw new NotSupportedException(); + + public override void SetLength(long value) + => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + => throw new NotSupportedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync(); + await base.DisposeAsync(); + } + } } |
