aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarc Brooks <IDisposable@gmail.com>2026-05-28 13:31:13 -0500
committerMarc Brooks <IDisposable@gmail.com>2026-05-28 13:31:13 -0500
commit645ae6bb99671ec8bd87c6cb78e6fa3d77063c55 (patch)
treef3e4e3ec47b150471f3a7629064515c3727f472d
parent175232329612ea8bfc268519e21f6c372e79eea7 (diff)
Use ReadAtLeastAsync to handle short-reads.
Seeks to beginning of streams if CanSeek is true. Added remarks about stream position. Add test coverage for short-reads. Fix fast-path tests to actually test the fast path. Also fix class comment.
-rw-r--r--src/Jellyfin.Extensions/StreamExtensions.cs38
-rw-r--r--tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs235
2 files changed, 259 insertions, 14 deletions
diff --git a/src/Jellyfin.Extensions/StreamExtensions.cs b/src/Jellyfin.Extensions/StreamExtensions.cs
index fb3fd2eac1..15b44d8f40 100644
--- a/src/Jellyfin.Extensions/StreamExtensions.cs
+++ b/src/Jellyfin.Extensions/StreamExtensions.cs
@@ -11,7 +11,7 @@ using System.Threading.Tasks;
namespace Jellyfin.Extensions
{
/// <summary>
- /// Class BaseExtensions.
+ /// Extension methods for the <see cref="Stream"/> class.
/// </summary>
public static class StreamExtensions
{
@@ -74,7 +74,11 @@ namespace Jellyfin.Extensions
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>True if the stream and file are identical; otherwise false.</returns>
/// <exception cref="ArgumentException"><paramref name="stream"/> does not support seeking.</exception>
- public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken)
+ /// <remarks>
+ /// The entire stream is compared against the file from the beginning (the position is reset to 0 on entry)
+ /// and restored to its original value after the call.
+ /// </remarks>
+ public static async Task<bool> IsFileIdenticalAsync(this Stream stream, string path, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(stream);
ArgumentException.ThrowIfNullOrEmpty(path);
@@ -114,11 +118,31 @@ namespace Jellyfin.Extensions
/// <param name="b">The second stream to compare.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>True if the streams are identical; otherwise false.</returns>
- public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken)
+ /// <remarks>
+ /// Seekable streams are compared from the beginning (their position is reset to 0 on entry).
+ /// Non-seekable streams are compared from their current read position. Stream positions are not
+ /// restored after the call.
+ /// </remarks>
+ public static async Task<bool> IsStreamIdenticalAsync(this Stream a, Stream b, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(a);
ArgumentNullException.ThrowIfNull(b);
+ if (ReferenceEquals(a, b))
+ {
+ return true;
+ }
+
+ if (a.CanSeek)
+ {
+ a.Position = 0;
+ }
+
+ if (b.CanSeek)
+ {
+ b.Position = 0;
+ }
+
if (a.CanSeek && b.CanSeek && b.Length != a.Length)
{
return false;
@@ -145,9 +169,9 @@ namespace Jellyfin.Extensions
var memoryB = bufferB.AsMemory();
int offset = 0;
int bytesRead;
- while ((bytesRead = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false)) > 0)
+ while ((bytesRead = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false)) > 0)
{
- if (!segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead]))
+ if (offset + bytesRead > segmentA.Count || !segmentA.AsSpan(offset, bytesRead).SequenceEqual(memoryB.Span[..bytesRead]))
{
return false;
}
@@ -174,8 +198,8 @@ namespace Jellyfin.Extensions
{
cancellationToken.ThrowIfCancellationRequested();
- var bytesReadA = await a.ReadAsync(memoryA, cancellationToken).ConfigureAwait(false);
- var bytesReadB = await b.ReadAsync(memoryB, cancellationToken).ConfigureAwait(false);
+ var bytesReadA = await a.ReadAtLeastAsync(memoryA, memoryA.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
+ var bytesReadB = await b.ReadAtLeastAsync(memoryB, memoryB.Length, throwOnEndOfStream: false, cancellationToken).ConfigureAwait(false);
if (bytesReadA != bytesReadB)
{
diff --git a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs
index f7efee1e6c..cdbf2f8b1d 100644
--- a/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs
+++ b/tests/Jellyfin.Extensions.Tests/StreamExtensionsTests.cs
@@ -2,7 +2,6 @@ using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
-using Jellyfin.Extensions;
using Xunit;
namespace Jellyfin.Extensions.Tests;
@@ -65,8 +64,12 @@ public class StreamExtensionsTests
}
}
- [Fact]
- public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch()
+ // Both publiclyVisible values are exercised so the test runs once under the fast path
+ // (TryGetBuffer succeeds) and once under the slow path (TryGetBuffer returns false).
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task IsFileIdenticalAsync_UsesStartOfStreamAndRestoresPosition_OnMatch(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
@@ -75,7 +78,7 @@ public class StreamExtensionsTests
try
{
- await using var stream = new MemoryStream(bytes);
+ await using var stream = CreateMemoryStream(bytes, publiclyVisible);
stream.Position = 3;
var result = await stream.IsFileIdenticalAsync(path, cancellationToken);
@@ -89,8 +92,10 @@ public class StreamExtensionsTests
}
}
- [Fact]
- public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch()
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task IsFileIdenticalAsync_RestoresPosition_OnMismatch(bool publiclyVisible)
{
var cancellationToken = TestContext.Current.CancellationToken;
var path = Path.Join(Path.GetTempPath(), Path.GetRandomFileName());
@@ -98,7 +103,7 @@ public class StreamExtensionsTests
try
{
- await using var stream = new MemoryStream(new byte[] { 10, 20, 30, 40, 50 });
+ await using var stream = CreateMemoryStream(new byte[] { 10, 20, 30, 40, 50 }, publiclyVisible);
stream.Position = 2;
var result = await stream.IsFileIdenticalAsync(path, cancellationToken);
@@ -112,6 +117,96 @@ public class StreamExtensionsTests
}
}
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task IsStreamIdenticalAsync_BothMemoryStreams_NonZeroPositions_SeeksToStart(bool publiclyVisible)
+ {
+ var cancellationToken = TestContext.Current.CancellationToken;
+ await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible);
+ await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4, 5 }, publiclyVisible);
+ a.Position = 3;
+ b.Position = 1;
+
+ var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
+
+ Assert.True(result);
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task IsStreamIdenticalAsync_MemoryStreamPairedWithSeekableNonMemoryStream_NonZeroPositions_SeeksToStart(bool publiclyVisible)
+ {
+ var cancellationToken = TestContext.Current.CancellationToken;
+ await using var a = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible);
+ await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
+ a.Position = 2;
+ b.Position = 3;
+
+ var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
+
+ Assert.True(result);
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task IsStreamIdenticalAsync_NonMemoryStreamPairedWithMemoryStream_Swaps_ReturnsTrue(bool publiclyVisible)
+ {
+ var cancellationToken = TestContext.Current.CancellationToken;
+ await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
+ await using var b = CreateMemoryStream(new byte[] { 1, 2, 3, 4 }, publiclyVisible);
+
+ var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
+
+ Assert.True(result);
+ }
+
+ [Fact]
+ public async Task IsStreamIdenticalAsync_BothSeekableNonMemoryStreams_NonZeroPositions_SeeksToStart()
+ {
+ var cancellationToken = TestContext.Current.CancellationToken;
+ await using var a = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
+ await using var b = new SeekableNonMemoryStream(new byte[] { 1, 2, 3, 4 });
+ a.Position = 1;
+ b.Position = 2;
+
+ var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
+
+ Assert.True(result);
+ }
+
+ [Fact]
+ public async Task IsStreamIdenticalAsync_NonSeekableShortReads_Identical_ReturnsTrue()
+ {
+ var cancellationToken = TestContext.Current.CancellationToken;
+ var data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+ await using var a = new ShortReadingNonSeekableStream(data, maxReadSize: 3);
+ await using var b = new ShortReadingNonSeekableStream(data, maxReadSize: 5);
+
+ var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
+
+ Assert.True(result);
+ }
+
+ [Fact]
+ public async Task IsStreamIdenticalAsync_NonSeekableShortReads_DifferentLengths_ReturnsFalse()
+ {
+ var cancellationToken = TestContext.Current.CancellationToken;
+ await using var a = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4 }, maxReadSize: 3);
+ await using var b = new ShortReadingNonSeekableStream(new byte[] { 1, 2, 3, 4, 5 }, maxReadSize: 5);
+
+ var result = await a.IsStreamIdenticalAsync(b, cancellationToken);
+
+ Assert.False(result);
+ }
+
+ private static MemoryStream CreateMemoryStream(byte[] data, bool publiclyVisible)
+ => publiclyVisible
+ ? new MemoryStream(data, 0, data.Length, writable: false, publiclyVisible: true)
+ : new MemoryStream(data);
+
private sealed class NonSeekableReadStream : Stream
{
private readonly Stream _inner;
@@ -173,4 +268,130 @@ public class StreamExtensionsTests
await base.DisposeAsync();
}
}
+
+ private sealed class SeekableNonMemoryStream : Stream
+ {
+ private readonly MemoryStream _inner;
+
+ public SeekableNonMemoryStream(byte[] data)
+ {
+ _inner = new MemoryStream(data, writable: false);
+ }
+
+ public override bool CanRead => true;
+
+ public override bool CanSeek => true;
+
+ public override bool CanWrite => false;
+
+ public override long Length => _inner.Length;
+
+ public override long Position
+ {
+ get => _inner.Position;
+ set => _inner.Position = value;
+ }
+
+ public override void Flush()
+ {
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ => _inner.Read(buffer, offset, count);
+
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ => _inner.ReadAsync(buffer, cancellationToken);
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ => _inner.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
+
+ public override long Seek(long offset, SeekOrigin origin)
+ => _inner.Seek(offset, origin);
+
+ public override void SetLength(long value)
+ => throw new NotSupportedException();
+
+ public override void Write(byte[] buffer, int offset, int count)
+ => throw new NotSupportedException();
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _inner.Dispose();
+ }
+
+ base.Dispose(disposing);
+ }
+
+ public override async ValueTask DisposeAsync()
+ {
+ await _inner.DisposeAsync();
+ await base.DisposeAsync();
+ }
+ }
+
+ private sealed class ShortReadingNonSeekableStream : Stream
+ {
+ private readonly Stream _inner;
+ private readonly int _maxReadSize;
+
+ public ShortReadingNonSeekableStream(byte[] data, int maxReadSize)
+ {
+ _inner = new MemoryStream(data, writable: false);
+ _maxReadSize = maxReadSize;
+ }
+
+ public override bool CanRead => true;
+
+ public override bool CanSeek => false;
+
+ public override bool CanWrite => false;
+
+ public override long Length => throw new NotSupportedException();
+
+ public override long Position
+ {
+ get => throw new NotSupportedException();
+ set => throw new NotSupportedException();
+ }
+
+ public override void Flush()
+ {
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ => _inner.Read(buffer, offset, Math.Min(count, _maxReadSize));
+
+ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
+ => _inner.ReadAsync(buffer[..Math.Min(buffer.Length, _maxReadSize)], cancellationToken);
+
+ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
+ => _inner.ReadAsync(buffer.AsMemory(offset, Math.Min(count, _maxReadSize)), cancellationToken).AsTask();
+
+ public override long Seek(long offset, SeekOrigin origin)
+ => throw new NotSupportedException();
+
+ public override void SetLength(long value)
+ => throw new NotSupportedException();
+
+ public override void Write(byte[] buffer, int offset, int count)
+ => throw new NotSupportedException();
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ {
+ _inner.Dispose();
+ }
+
+ base.Dispose(disposing);
+ }
+
+ public override async ValueTask DisposeAsync()
+ {
+ await _inner.DisposeAsync();
+ await base.DisposeAsync();
+ }
+ }
}