Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keep MsQuicConnection alive when streams are pending #52800

Merged
merged 9 commits into from
Jun 10, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ internal static class MsQuicStatusCodes
internal static uint InternalError => OperatingSystem.IsWindows() ? Windows.InternalError : Posix.InternalError;
internal static uint InvalidState => OperatingSystem.IsWindows() ? Windows.InvalidState : Posix.InvalidState;
internal static uint HandshakeFailure => OperatingSystem.IsWindows() ? Windows.HandshakeFailure : Posix.HandshakeFailure;
internal static uint UserCanceled => OperatingSystem.IsWindows() ? Windows.UserCanceled : Posix.UserCanceled;

// TODO return better error messages here.
public static string GetError(uint status) => OperatingSystem.IsWindows() ? Windows.GetError(status) : Posix.GetError(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal sealed class MsQuicConnection : QuicConnectionProvider

private readonly State _state = new State();
private GCHandle _stateHandle;
private bool _disposed;
private int _disposed;

private IPEndPoint? _localEndPoint;
private readonly EndPoint _remoteEndPoint;
Expand All @@ -53,6 +53,8 @@ internal sealed class State

public bool Connected;
public long AbortErrorCode = -1;
public int StreamCount;
wfurt marked this conversation as resolved.
Show resolved Hide resolved
private IntPtr _closingGCHandle;

// Queue for accepted streams.
// Backlog limit is managed by MsQuic so it can be unbounded here.
Expand All @@ -61,6 +63,61 @@ internal sealed class State
SingleReader = true,
SingleWriter = true,
});

public void RemoveStream(MsQuicStream stream)
{
bool closing;
lock (this)
{
StreamCount--;
wfurt marked this conversation as resolved.
Show resolved Hide resolved
closing = _closingGCHandle != IntPtr.Zero && StreamCount == 0;
}

if (closing)
{
Handle?.Dispose();
try
{
GCHandle gcHandle = GCHandle.FromIntPtr(_closingGCHandle);
wfurt marked this conversation as resolved.
Show resolved Hide resolved
Debug.Assert(gcHandle.IsAllocated);
if (gcHandle.IsAllocated) gcHandle.Free();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (gcHandle.IsAllocated) gcHandle.Free();
gcHandle.Free();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this generally safe? We have other places with identical check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsAllocated just checks whether the GCHandle is not IntPtr.Zero. _closingGCHandle != IntPtr.Zero above does that already, so it should not be needed here.

}
catch (Exception ex)
wfurt marked this conversation as resolved.
Show resolved Hide resolved
{
if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, $"Failed to restore GCHandle from ptr: {ex.Message}");
}
}
}

public bool TryAddStream(SafeMsQuicStreamHandle streamHandle, QUIC_STREAM_OPEN_FLAGS flags)
{
lock (this)
{
if (_closingGCHandle != IntPtr.Zero)
{
return false;
}

var stream = new MsQuicStream(this, streamHandle, flags);
// once stream is created, it will call RemoveStream on disposal.
StreamCount++;
if (AcceptQueue.Writer.TryWrite(stream))
{
return true;
}
else
{
stream.Dispose();
return false;
}
}
}

// This is called under lock from connection dispose
public void SetClosing(GCHandle handle)
{
_closingGCHandle = GCHandle.ToIntPtr(handle);
}
}

// constructor for inbound connections
Expand Down Expand Up @@ -198,9 +255,13 @@ private static uint HandleEventShutdownComplete(State state, ref ConnectionEvent
private static uint HandleEventNewStream(State state, ref ConnectionEvent connectionEvent)
{
var streamHandle = new SafeMsQuicStreamHandle(connectionEvent.Data.PeerStreamStarted.Stream);
var stream = new MsQuicStream(state, streamHandle, connectionEvent.Data.PeerStreamStarted.Flags);
if (!state.TryAddStream(streamHandle, connectionEvent.Data.PeerStreamStarted.Flags))
{
// This will call StreamCloseDelegate and free the stream.
// We will return Success to the MsQuic to prevent double free.
streamHandle.Dispose();
}

state.AcceptQueue.Writer.TryWrite(stream);
return MsQuicStatusCodes.Success;
}

Expand Down Expand Up @@ -332,13 +393,25 @@ internal override async ValueTask<QuicStreamProvider> AcceptStreamAsync(Cancella
internal override QuicStreamProvider OpenUnidirectionalStream()
{
ThrowIfDisposed();
return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
lock (_state)
{
var stream = new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.UNIDIRECTIONAL);
// once stream is created, it will call RemoveStream on disposal.
_state.StreamCount++;
wfurt marked this conversation as resolved.
Show resolved Hide resolved
return stream;
}
}

internal override QuicStreamProvider OpenBidirectionalStream()
{
ThrowIfDisposed();
return new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE);
lock (_state)
{
var stream = new MsQuicStream(_state, QUIC_STREAM_OPEN_FLAGS.NONE);
// once stream is created, it will call RemoveStream on disposal.
_state.StreamCount++;
return stream;
}
}

internal override long GetRemoteAvailableUnidirectionalStreamCount()
Expand Down Expand Up @@ -488,16 +561,47 @@ public override void Dispose()
Dispose(false);
}

private async Task FlushAcceptQueue()
{
try {
// Writer may or may not be completed.
_state.AcceptQueue.Writer.Complete();
} catch { };
wfurt marked this conversation as resolved.
Show resolved Hide resolved

await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync())
await foreach (MsQuicStream item in _state.AcceptQueue.Reader.ReadAllAsync().ConfigureAwait(false))

{
item.Dispose();
}
}

private void Dispose(bool disposing)
{
if (_disposed)
int disposed = Interlocked.Exchange(ref _disposed, 1);
if (disposed == 1)
{
return;
}
wfurt marked this conversation as resolved.
Show resolved Hide resolved

_state?.Handle?.Dispose();
if (_stateHandle.IsAllocated) _stateHandle.Free();
_disposed = true;
lock (_state)
{
_state.Connection = null;
if (_state.StreamCount == 0)
{
_state!.Handle?.Dispose();
if (_stateHandle.IsAllocated) _stateHandle.Free();
}
else
{
// normally, _state would be rooted by 'this' and we would hold _stateHandle to prevent
// GC from moving _state as we handed pointer to it by msquic. It was handed to msquic and
// we may try to get _state from it in NativeCallbackHandler()
// At this point we are being disposed either explicitly or by finalizer but we still have active streams.
// To prevent issue, the handle will be transferred to state and it will be released when last stream is gone
_state.SetClosing(_stateHandle);
}
}

FlushAcceptQueue().GetAwaiter().GetResult();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Connections are IAsyncDisposable; we should make dispose do proper await there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you suggest. We still need to flush for synchronous Dispose(). Can we do what ever needs to be done as follow-up? I would like to get this this in to get verifications on the crashes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine.

}

// TODO: this appears abortive and will cause prior successfully shutdown and closed streams to drop data.
Expand All @@ -511,7 +615,7 @@ internal override ValueTask CloseAsync(long errorCode, CancellationToken cancell

private void ThrowIfDisposed()
{
if (_disposed)
if (_disposed == 1)
wfurt marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ObjectDisposedException(nameof(MsQuicStream));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ internal override async ValueTask<int> ReadAsync(Memory<byte> destination, Cance
{
shouldComplete = true;
}

state.ReadState = ReadState.Aborted;
}

Expand Down Expand Up @@ -555,6 +554,8 @@ private void Dispose(bool disposing)
Marshal.FreeHGlobal(_state.SendQuicBuffers);
if (_stateHandle.IsAllocated) _stateHandle.Free();
CleanupSendState(_state);
Debug.Assert(_state.ConnectionState != null);
_state.ConnectionState?.RemoveStream(this);

if (NetEventSource.Log.IsEnabled())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ await RunClientServer(
await (new[] { t1, t2 }).WhenAllOrAnyFailed(millisecondsTimeout: 1000000);
}

[ActiveIssue("/~https://github.com/dotnet/runtime/issues/52048")]
[Fact]
public async Task ManagedAVE_MinimalFailingTest()
{
Expand All @@ -369,6 +368,32 @@ async Task GetStreamIdWithoutStartWorks()
// TODO: stream that is opened by client but left unaccepted by server may cause AccessViolationException in its Finalizer
}

await GetStreamIdWithoutStartWorks().WaitAsync(TimeSpan.FromSeconds(15));

GC.Collect();
}

[Fact]
public async Task DisposingConnection_OK()
{
async Task GetStreamIdWithoutStartWorks()
{
using QuicListener listener = CreateQuicListener();
using QuicConnection clientConnection = CreateQuicConnection(listener.ListenEndPoint);

ValueTask clientTask = clientConnection.ConnectAsync();
using QuicConnection serverConnection = await listener.AcceptConnectionAsync();
await clientTask;

using QuicStream clientStream = clientConnection.OpenBidirectionalStream();
Assert.Equal(0, clientStream.StreamId);

// Dispose all connections before the streams;
clientConnection.Dispose();
serverConnection.Dispose();
listener.Dispose();
}

await GetStreamIdWithoutStartWorks();

GC.Collect();
Expand Down