Skip to content

Commit

Permalink
override Stream.ReadAsync(Memory, CancellationToken)
Browse files Browse the repository at this point in the history
  • Loading branch information
smdn committed Feb 4, 2022
1 parent 0b6a41d commit 93e8860
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,66 +456,116 @@ CancellationToken cancellationToken
if (count == 0L)
return Task.FromResult(0); // do nothing

return ReadAsyncCore(
destination: buffer.AsMemory(offset, count),
cancellationToken: cancellationToken
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
).AsTask();
#else
);
#endif
}

#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
public override ValueTask<int> ReadAsync(
Memory<byte> buffer,
CancellationToken cancellationToken = default
)
{
CheckDisposed();

if (cancellationToken.IsCancellationRequested)
#if SYSTEM_THREADING_TASKS_VALUETASK_FROMCANCELED
return ValueTask.FromCanceled<int>(cancellationToken);
#else
#if SYSTEM_THREADING_TASKS_TASK_FROMCANCELED
return new(Task.FromCanceled<int>(cancellationToken));
#else
return new(new Task<int>(() => default, cancellationToken));
#endif
#endif

if (buffer.IsEmpty)
return new(0); // do nothing

return ReadAsyncCore(
destination: buffer,
offset: offset,
count: count,
cancellationToken: cancellationToken
);
}
#endif

private async Task<int> ReadAsyncCore(
byte[] destination,
int offset,
int count,
private async
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
ValueTask<int>
#else
Task<int>
#endif
ReadAsyncCore(
Memory<byte> destination,
CancellationToken cancellationToken
)
{
if (count <= bufRemain) {
Buffer.BlockCopy(buffer, bufOffset, destination, offset, count);
bufOffset += count;
bufRemain -= count;
if (destination.Length <= bufRemain) {
buffer.AsSpan(bufOffset, destination.Length).CopyTo(destination.Span);
bufOffset += destination.Length;
bufRemain -= destination.Length;

return count;
return destination.Length;
}

var read = 0;

if (bufRemain != 0) {
Buffer.BlockCopy(buffer, bufOffset, destination, offset, bufRemain);
buffer.AsSpan(bufOffset, bufRemain).CopyTo(destination.Span);

read = bufRemain;
offset += bufRemain;
count -= bufRemain;

destination = destination.Slice(bufRemain);

bufRemain = 0;
}

// read from base stream
for (; ; ) {
if (count <= 0)
if (destination.IsEmpty)
break;

var r =
#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
await stream.ReadAsync(
destination.AsMemory(offset, count),
var r = await stream.ReadAsync(
destination,
cancellationToken
).ConfigureAwait(false);
#else
#pragma warning disable CA1835
await stream.ReadAsync(
destination,
offset,
count,
#pragma warning restore CA1835
#endif
byte[] readBuffer = null;
int r = 0;

try {
readBuffer = ArrayPool<byte>.Shared.Rent(destination.Length);

r = await stream.ReadAsync(
readBuffer,
0,
destination.Length,
cancellationToken
).ConfigureAwait(false);
}
finally {
if (readBuffer is not null) {
if (0 < r)
readBuffer.AsMemory(0, r).CopyTo(destination);

ArrayPool<byte>.Shared.Return(readBuffer);
}
}
#pragma warning restore CA1835
#endif

if (r <= 0)
break;

offset += r;
count -= r;
destination = destination.Slice(r);
read += r;
}

Expand Down
81 changes: 81 additions & 0 deletions tests/Smdn/Smdn.IO.Streams.LineOriented/LineOrientedStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,24 @@ public void TestRead_BufferEmpty(StreamType type)
Assert.AreEqual(data, buffer);
}

#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public async Task TestReadAsync_ToMemory_BufferEmpty(StreamType type)
{
var data = new byte[] {0x40, 0x41, Ascii.Octets.CR, Ascii.Octets.LF, 0x42, 0x43, 0x44, Ascii.Octets.CR, Ascii.Octets.LF, 0x45, 0x46, 0x47};
var stream = CreateStream(type, new MemoryStream(data), 8);

Memory<byte> buffer = new byte[12];

Assert.AreEqual(12L, await stream.ReadAsync(buffer));

Assert.AreEqual(12L, stream.Position, "Position");

Assert.That(buffer, Is.EqualTo(data.AsMemory()), nameof(buffer));
}
#endif

[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public void TestRead_LessThanBuffered(StreamType type)
Expand All @@ -272,6 +290,30 @@ public void TestRead_LessThanBuffered(StreamType type)
Assert.AreEqual(data.Slice(4, 4), buffer);
}

#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public async Task TestReadAsync_ToMemory_LessThanBuffered(StreamType type)
{
var data = new byte[] {0x40, 0x41, Ascii.Octets.CR, Ascii.Octets.LF, 0x42, 0x43, 0x44, Ascii.Octets.CR, Ascii.Octets.LF, 0x45, 0x46, 0x47};
var stream = CreateStream(type, new MemoryStream(data), 16);

var line = stream.ReadLine(true);

Assert.AreEqual(4L, stream.Position, "Position");

Assert.AreEqual(data.Slice(0, 4), line);

Memory<byte> buffer = new byte[4];

Assert.AreEqual(4, await stream.ReadAsync(buffer));

Assert.AreEqual(8L, stream.Position, "Position");

Assert.That(buffer, Is.EqualTo(data.AsMemory(4, 4)), nameof(buffer));
}
#endif

[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public void TestRead_LongerThanBuffered(StreamType type)
Expand All @@ -294,6 +336,30 @@ public void TestRead_LongerThanBuffered(StreamType type)
Assert.AreEqual(data.Slice(4, 8), buffer.Slice(0, 8));
}

#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public async Task TestReadAsync_ToMemory_LongerThanBuffered(StreamType type)
{
var data = new byte[] {0x40, 0x41, Ascii.Octets.CR, Ascii.Octets.LF, 0x42, 0x43, 0x44, Ascii.Octets.CR, Ascii.Octets.LF, 0x45, 0x46, 0x47};
var stream = CreateStream(type, new MemoryStream(data), 8);

var line = stream.ReadLine(true);

Assert.AreEqual(4L, stream.Position, "Position");

Assert.AreEqual(data.Slice(0, 4), line);

Memory<byte> buffer = new byte[10];

Assert.AreEqual(8, await stream.ReadAsync(buffer));

Assert.AreEqual(12L, stream.Position, "Position");

Assert.That(buffer.Slice(0, 8), Is.EqualTo(data.AsMemory(4, 8)), nameof(buffer));
}
#endif

[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public void TestReadToStream_LessThanBuffered(StreamType type)
Expand Down Expand Up @@ -368,6 +434,18 @@ public void TestReadAsync_LengthZero(StreamType type)
Assert.AreEqual(0L, stream.Position, "Position");
}

#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public async Task TestReadAsync_ToMemory_LengthZero(StreamType type)
{
var stream = CreateStream(type, new MemoryStream(), 8);

Assert.AreEqual(0L, await stream.ReadAsync(Memory<byte>.Empty));
Assert.AreEqual(0L, stream.Position, "Position");
}
#endif

[TestCase(StreamType.Strict)]
[TestCase(StreamType.Loose)]
public void TestReadAsync_CancelledToken(StreamType type)
Expand Down Expand Up @@ -548,6 +626,9 @@ public void TestClose(StreamType type)
Assert.Throws<ObjectDisposedException>(() => stream.ReadByte());
Assert.Throws<ObjectDisposedException>(() => stream.Read(buffer, 0, 8));
Assert.Throws<ObjectDisposedException>(() => stream.ReadAsync(buffer, 0, 8));
#if SYSTEM_IO_STREAM_READASYNC_MEMORY_OF_BYTE
Assert.ThrowsAsync<ObjectDisposedException>(async () => await stream.ReadAsync(Memory<byte>.Empty));
#endif
Assert.Throws<ObjectDisposedException>(() => stream.Read(Stream.Null, 8));
Assert.Throws<ObjectDisposedException>(() => stream.ReadAsync(Stream.Null, 8));
Assert.Throws<ObjectDisposedException>(() => stream.Flush());
Expand Down

0 comments on commit 93e8860

Please sign in to comment.