Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove some IAsyncResult implementations from System.Net.Mail #82644

Merged
merged 1 commit into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 60 additions & 175 deletions src/libraries/System.Net.Mail/src/System/Net/Base64Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
using System.IO;
using System.Net.Mime;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace System.Net
{
Expand Down Expand Up @@ -61,24 +63,11 @@ internal WriteStateInfoBase WriteState
}
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
ValidateBufferArguments(buffer, offset, count);

var result = new ReadAsyncResult(this, buffer, offset, count, callback, state);
result.Read();
return result;
}

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
ValidateBufferArguments(buffer, offset, count);

var result = new WriteAsyncResult(this, buffer, offset, count, callback, state);
result.Write();
return result;
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state);

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count, CancellationToken.None), callback, state);

public override void Close()
{
Expand Down Expand Up @@ -156,28 +145,31 @@ internal int EncodeBytes(byte[] buffer, int offset, int count, bool dontDeferFin

public string GetEncodedString() => _encoder.GetEncodedString();

public override int EndRead(IAsyncResult asyncResult)
{
ArgumentNullException.ThrowIfNull(asyncResult);
public override int EndRead(IAsyncResult asyncResult) =>
TaskToAsyncResult.End<int>(asyncResult);

return ReadAsyncResult.End(asyncResult);
}
public override void EndWrite(IAsyncResult asyncResult) =>
TaskToAsyncResult.End(asyncResult);

public override void EndWrite(IAsyncResult asyncResult)
public override void Flush()
{
ArgumentNullException.ThrowIfNull(asyncResult);
if (_writeState != null && WriteState.Length > 0)
{
FlushInternal();
}

WriteAsyncResult.End(asyncResult);
base.Flush();
}

public override void Flush()
public override async Task FlushAsync(CancellationToken cancellationToken)
{
if (_writeState != null && WriteState.Length > 0)
{
FlushInternal();
await base.WriteAsync(WriteState.Buffer.AsMemory(0, WriteState.Length), cancellationToken).ConfigureAwait(false);
WriteState.Reset();
}

base.Flush();
await base.FlushAsync(cancellationToken).ConfigureAwait(false);
}

private void FlushInternal()
Expand Down Expand Up @@ -212,6 +204,36 @@ public override int Read(byte[] buffer, int offset, int count)
}
}
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateBufferArguments(buffer, offset, count);
return ReadAsyncCore(buffer, offset, count, cancellationToken);

async Task<int> ReadAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
while (true)
{
// read data from the underlying stream
int read = await base.ReadAsync(buffer.AsMemory(offset, count), cancellationToken).ConfigureAwait(false);

// if the underlying stream returns 0 then there
// is no more data - ust return 0.
if (read == 0)
{
return 0;
}

// while decoding, we may end up not having
// any bytes to return pending additional data
// from the underlying stream.
read = DecodeBytes(buffer, offset, read);
if (read > 0)
{
return read;
}
}
}
}

public override void Write(byte[] buffer, int offset, int count)
{
Expand All @@ -235,167 +257,30 @@ public override void Write(byte[] buffer, int offset, int count)
}
}

private sealed class ReadAsyncResult : LazyAsyncResult
{
private readonly Base64Stream _parent;
private readonly byte[] _buffer;
private readonly int _offset;
private readonly int _count;
private int _read;

private static readonly AsyncCallback s_onRead = OnRead;

internal ReadAsyncResult(Base64Stream parent, byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) : base(null, state, callback)
{
_parent = parent;
_buffer = buffer;
_offset = offset;
_count = count;
}

private bool CompleteRead(IAsyncResult result)
{
_read = _parent.BaseStream.EndRead(result);

// if the underlying stream returns 0 then there
// is no more data - ust return 0.
if (_read == 0)
{
InvokeCallback();
return true;
}

// while decoding, we may end up not having
// any bytes to return pending additional data
// from the underlying stream.
_read = _parent.DecodeBytes(_buffer, _offset, _read);
if (_read > 0)
{
InvokeCallback();
return true;
}

return false;
}

internal void Read()
{
while (true)
{
IAsyncResult result = _parent.BaseStream.BeginRead(_buffer, _offset, _count, s_onRead, this);
if (!result.CompletedSynchronously || CompleteRead(result))
{
break;
}
}
}

private static void OnRead(IAsyncResult result)
{
if (!result.CompletedSynchronously)
{
ReadAsyncResult thisPtr = (ReadAsyncResult)result.AsyncState!;
try
{
if (!thisPtr.CompleteRead(result))
{
thisPtr.Read();
}
}
catch (Exception e)
{
if (thisPtr.IsCompleted)
{
throw;
}
thisPtr.InvokeCallback(e);
}
}
}

internal static int End(IAsyncResult result)
{
ReadAsyncResult thisPtr = (ReadAsyncResult)result;
thisPtr.InternalWaitForCompletion();
return thisPtr._read;
}
}

private sealed class WriteAsyncResult : LazyAsyncResult
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
private static readonly AsyncCallback s_onWrite = OnWrite;

private readonly Base64Stream _parent;
private readonly byte[] _buffer;
private readonly int _offset;
private readonly int _count;
private int _written;
ValidateBufferArguments(buffer, offset, count);
return WriteAsyncCore(buffer, offset, count, cancellationToken);

internal WriteAsyncResult(Base64Stream parent, byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) : base(null, state, callback)
async Task WriteAsyncCore(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
_parent = parent;
_buffer = buffer;
_offset = offset;
_count = count;
}
int written = 0;

internal void Write()
{
// do not append a space when writing from a stream since this means
// it's writing the email body
while (true)
{
// do not append a space when writing from a stream since this means
// it's writing the email body
_written += _parent.EncodeBytes(_buffer, _offset + _written, _count - _written, false, false);
if (_written < _count)
written += EncodeBytes(buffer, offset + written, count - written, false, false);
if (written < count)
{
IAsyncResult result = _parent.BaseStream.BeginWrite(_parent.WriteState.Buffer, 0, _parent.WriteState.Length, s_onWrite, this);
if (!result.CompletedSynchronously)
{
break;
}
CompleteWrite(result);
await FlushAsync(cancellationToken).ConfigureAwait(false);
}
else
{
InvokeCallback();
break;
}
}
}

private void CompleteWrite(IAsyncResult result)
{
_parent.BaseStream.EndWrite(result);
_parent.WriteState.Reset();
}

private static void OnWrite(IAsyncResult result)
{
if (!result.CompletedSynchronously)
{
WriteAsyncResult thisPtr = (WriteAsyncResult)result.AsyncState!;
try
{
thisPtr.CompleteWrite(result);
thisPtr.Write();
}
catch (Exception e)
{
if (thisPtr.IsCompleted)
{
throw;
}
thisPtr.InvokeCallback(e);
}
}
}

internal static void End(IAsyncResult result)
{
WriteAsyncResult thisPtr = (WriteAsyncResult)result;
thisPtr.InternalWaitForCompletion();
Debug.Assert(thisPtr._written == thisPtr._count);
}
}

private sealed class ReadStateInfo
Expand Down
78 changes: 4 additions & 74 deletions src/libraries/System.Net.Mail/src/System/Net/BufferedReadStream.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -40,18 +39,11 @@ public override bool CanSeek
}
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
ReadAsyncResult result = new ReadAsyncResult(this, callback, state);
result.Read(buffer, offset, count);
return result;
}
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count, CancellationToken.None), callback, state);

public override int EndRead(IAsyncResult asyncResult)
{
int read = ReadAsyncResult.End(asyncResult);
return read;
}
public override int EndRead(IAsyncResult asyncResult) =>
TaskToAsyncResult.End<int>(asyncResult);

public override int Read(byte[] buffer, int offset, int count)
{
Expand Down Expand Up @@ -156,67 +148,5 @@ internal void Push(byte[] buffer, int offset, int count)

Buffer.BlockCopy(buffer, offset, _storedBuffer!, _storedOffset, count);
}

private sealed class ReadAsyncResult : LazyAsyncResult
{
private readonly BufferedReadStream _parent;
private int _read;
private static readonly AsyncCallback s_onRead = new AsyncCallback(OnRead);

internal ReadAsyncResult(BufferedReadStream parent, AsyncCallback? callback, object? state) : base(null, state, callback)
{
_parent = parent;
}

internal void Read(byte[] buffer, int offset, int count)
{
if (_parent._storedOffset < _parent._storedLength)
{
_read = Math.Min(count, _parent._storedLength - _parent._storedOffset);
Buffer.BlockCopy(_parent._storedBuffer!, _parent._storedOffset, buffer, offset, _read);
_parent._storedOffset += _read;
if (_read == count || !_parent._readMore)
{
InvokeCallback();
return;
}

count -= _read;
offset += _read;
}
IAsyncResult result = _parent.BaseStream.BeginRead(buffer, offset, count, s_onRead, this);
if (result.CompletedSynchronously)
{
_read += _parent.BaseStream.EndRead(result);
InvokeCallback();
}
}

internal static int End(IAsyncResult result)
{
ReadAsyncResult thisPtr = (ReadAsyncResult)result;
thisPtr.InternalWaitForCompletion();
return thisPtr._read;
}

private static void OnRead(IAsyncResult result)
{
if (!result.CompletedSynchronously)
{
ReadAsyncResult thisPtr = (ReadAsyncResult)result.AsyncState!;
try
{
thisPtr._read += thisPtr._parent.BaseStream.EndRead(result);
thisPtr.InvokeCallback();
}
catch (Exception e)
{
if (thisPtr.IsCompleted)
throw;
thisPtr.InvokeCallback(e);
}
}
}
}
}
}
Loading