Skip to content

Commit

Permalink
5074 Fix the WebSocket exception handling strategy
Browse files Browse the repository at this point in the history
The ConnectionBase exception handler was not set when `webSocket.exceptionHandler(...)` is called. This was leading `ConnectionBase` to consider the exception as uncaught and to print it, ignoring the exception handler set at the WebSocketImplBase level.

Moreover, this WebSocket handler was also called after the call to `ConnectionBase#handleException(...)`.

So I decided to stick on the `ConnectionBase` exception handler for the WebSocketImplBase.

The replacement of `Http1xConnectionBase conn` with `Http1xConnectionBase<?> conn` was needed otherwise the compiler complains.

Signed-off-by: Nils Renaud <renaud.nils@gmail.com>
  • Loading branch information
NilsRenaud authored and vietj committed Jan 24, 2024
1 parent 52b7e89 commit 9cfa7cb
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1229,18 +1229,13 @@ protected void handleIdle(IdleStateEvent event) {
}

@Override
protected void handleException(Throwable e) {
public void handleException(Throwable e) {
super.handleException(e);
WebSocketImpl ws;
LinkedHashSet<Stream> allStreams = new LinkedHashSet<>();
synchronized (this) {
ws = webSocket;
allStreams.addAll(requests);
allStreams.addAll(responses);
}
if (ws != null) {
ws.handleException(e);
}
for (Stream stream : allStreams) {
stream.handleException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public Http1xConnectionBase exceptionHandler(Handler<Throwable> handler) {
return (Http1xConnectionBase) super.exceptionHandler(handler);
}

@Override
public void handleException(Throwable t) {
super.handleException(t);
}

@Override
public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData) {
throw new UnsupportedOperationException("HTTP/1.x connections don't support GOAWAY");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,13 +521,11 @@ protected void handleClosed() {
}

@Override
protected void handleException(Throwable t) {
public void handleException(Throwable t) {
super.handleException(t);
Http1xServerRequest responseInProgress;
Http1xServerRequest requestInProgress;
ServerWebSocketImpl ws;
synchronized (this) {
ws = this.webSocket;
requestInProgress = this.requestInProgress;
responseInProgress = this.responseInProgress;
if (METRICS_ENABLED && metrics != null) {
Expand All @@ -540,9 +538,6 @@ protected void handleException(Throwable t) {
if (responseInProgress != null && responseInProgress != requestInProgress) {
responseInProgress.handleException(t);
}
if (ws != null) {
ws.context.execute(v -> ws.handleException(t));
}
}

@Override
Expand Down
22 changes: 4 additions & 18 deletions src/main/java/io/vertx/core/http/impl/WebSocketImplBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public abstract class WebSocketImplBase<S extends WebSocketBase> implements WebS
private FrameAggregator frameAggregator;
private Handler<Buffer> pongHandler;
private Handler<Void> drainHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;
private Handler<Void> endHandler;
protected final Http1xConnectionBase conn;
Expand Down Expand Up @@ -503,13 +502,12 @@ protected void handleClose(boolean graceful) {
Handler<Throwable> exceptionHandler;
synchronized (conn) {
closeHandler = this.closeHandler;
exceptionHandler = this.exceptionHandler;
exceptionHandler = conn::handleException;
binaryConsumer = this.binaryHandlerRegistration;
textConsumer = this.textHandlerRegistration;
this.binaryHandlerRegistration = null;
this.textHandlerRegistration = null;
this.closeHandler = null;
this.exceptionHandler = null;
}
if (binaryConsumer != null) {
binaryConsumer.unregister();
Expand Down Expand Up @@ -747,14 +745,7 @@ void handleWritabilityChanged(boolean writable) {
}

void handleException(Throwable t) {
Handler<Throwable> handler;
synchronized (conn) {
handler = this.exceptionHandler;
if (handler == null) {
return;
}
}
context.dispatch(t, handler);
conn.handleException(t);
}

void handleConnectionClosed() {
Expand Down Expand Up @@ -814,13 +805,8 @@ private Handler<Void> endHandler() {

@Override
public S exceptionHandler(Handler<Throwable> handler) {
synchronized (conn) {
if (handler != null) {
checkClosed();
}
this.exceptionHandler = handler;
return (S) this;
}
conn.exceptionHandler(handler);
return (S) this;
}

@Override
Expand Down
55 changes: 53 additions & 2 deletions src/test/java/io/vertx/core/http/WebSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class WebSocketTest extends VertxTestBase {

private static final String TEST_REASON = "I'm moving away!";
private static final short TEST_STATUS_CODE = (short)1001;
private static final short INVALID_STATUS_CODE = (short)1004;

private WebSocketClient client;
private HttpServer server;
Expand Down Expand Up @@ -3160,7 +3161,9 @@ public void testServerConnectionClose(int timeout) {
.webSocketHandler(ws -> {
long now = System.currentTimeMillis();
ws.endHandler(v -> fail());
ws.exceptionHandler(ignore -> complete());
ws.exceptionHandler(ignore -> {
complete();
});
ws.closeHandler(v -> {
long elapsed = System.currentTimeMillis() - now;
assertTrue(timeout <= elapsed && elapsed < 5000);
Expand Down Expand Up @@ -3889,4 +3892,52 @@ public void testConnect() throws Exception {

}));
await();
}}
}

@Test
public void testServerWebSocketExceptionHandlerIsCalled() {
waitFor(2);
server = vertx.createHttpServer(new HttpServerOptions())
.exceptionHandler(t -> fail())
.connectionHandler(connection -> connection.exceptionHandler(t -> fail()))
.webSocketHandler(ws -> {
ws.endHandler(v -> fail());
ws.closeHandler(v -> complete());
ws.exceptionHandler(t -> complete());
})
.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(s ->
vertx.createWebSocketClient()
.connect(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/")
.onSuccess(ws -> {
ws.close(INVALID_STATUS_CODE);
})));
await();
}

@Test
public void testClientWebSocketExceptionHandlerIsCalled() {
waitFor(3);
server = vertx.createHttpServer(new HttpServerOptions())
.webSocketHandler(ws -> ws.close(INVALID_STATUS_CODE))
.listen(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, onSuccess(s ->
vertx.httpClientBuilder()
.withConnectHandler(conn -> conn.exceptionHandler(t -> fail()))
.build()
.webSocket(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/")
.onSuccess(ws -> {
ws.exceptionHandler(t -> {
// This exception handler is called 2 times:
// - From VertxHandler#exceptionCaught > Http1xClientConnection#handleException
// with io.netty.handler.codec.http.websocketx.CorruptedWebSocketFrameException: Invalid close frame getStatus code: 1004
// - From VertxHandler#channelInactive > Http1xClientConnection#handleClosed
// with io.vertx.core.http.HttpClosedException: Connection was closed
// Both are technically true though.
complete();
});
ws.endHandler(v -> fail());
ws.closeHandler(v -> complete());
})));
await();
}

}

0 comments on commit 9cfa7cb

Please sign in to comment.