Skip to content

Commit

Permalink
Merge pull request #2278 from ballerina-platform/idle-eviction-10.x
Browse files Browse the repository at this point in the history
[2201.10.x] Implement HTTP/2 IDLE based eviction
  • Loading branch information
TharmiganK authored Feb 18, 2025
2 parents cad8c96 + 0b6a077 commit 269a245
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 14 deletions.
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ This file contains all the notable changes done to the Ballerina HTTP package th
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## Added

- [Add idle based eviction for HTTP/2 connections](/~https://github.com/ballerina-platform/ballerina-library/issues/7309)

## [2.12.6] - 2025-02-14

### Changed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* {@code Http2ClientChannel} encapsulates the Channel associated with a particular connection.
Expand All @@ -62,6 +63,7 @@ public class Http2ClientChannel {
private Map<String, Http2DataEventListener> dataEventListeners;
private StreamCloseListener streamCloseListener;
private long timeSinceMarkedAsStale = 0;
private AtomicLong timeSinceMarkedAsIdle = new AtomicLong(0);
private AtomicBoolean isStale = new AtomicBoolean(false);

public Http2ClientChannel(Http2ConnectionManager http2ConnectionManager, Http2Connection connection,
Expand Down Expand Up @@ -293,6 +295,7 @@ private class StreamCloseListener extends Http2EventAdapter {
public void onStreamClosed(Http2Stream stream) {
// Channel is no longer exhausted, so we can return it back to the pool
http2ClientChannel.removeInFlightMessage(stream.id());
http2ConnectionManager.markClientChannelAsIdle(http2ClientChannel);
activeStreams.decrementAndGet();
http2ClientChannel.getDataEventListeners().
forEach(dataEventListener -> dataEventListener.onStreamClose(stream.id()));
Expand Down Expand Up @@ -349,4 +352,16 @@ void setTimeSinceMarkedAsStale(long timeSinceMarkedAsStale) {
long getTimeSinceMarkedAsStale() {
return timeSinceMarkedAsStale;
}

void setTimeSinceMarkedAsIdle(long timeSinceMarkedAsIdle) {
this.timeSinceMarkedAsIdle.set(timeSinceMarkedAsIdle);
}

long getTimeSinceMarkedAsIdle() {
return timeSinceMarkedAsIdle.get();
}

HttpRoute getHttpRoute() {
return httpRoute;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ public class Http2ConnectionManager {

private final Http2ChannelPool http2ChannelPool = new Http2ChannelPool();
private final BlockingQueue<Http2ClientChannel> http2StaleClientChannels = new LinkedBlockingQueue<>();
private final BlockingQueue<Http2ClientChannel> http2ClientChannels = new LinkedBlockingQueue<>();
private final PoolConfiguration poolConfiguration;

public Http2ConnectionManager(PoolConfiguration poolConfiguration) {
this.poolConfiguration = poolConfiguration;
initiateConnectionEvictionTask();
initiateStaleConnectionEvictionTask();
initiateIdleConnectionEvictionTask();
}

/**
Expand Down Expand Up @@ -171,7 +173,20 @@ void removeClosedChannelFromStalePool(Http2ClientChannel http2ClientChannel) {
}
}

private void initiateConnectionEvictionTask() {
void removeClosedChannelFromIdlePool(Http2ClientChannel http2ClientChannel) {
if (!http2ClientChannels.remove(http2ClientChannel)) {
logger.warn("Specified channel does not exist in the HTTP2 client channel list.");
}
}

void markClientChannelAsIdle(Http2ClientChannel http2ClientChannel) {
http2ClientChannel.setTimeSinceMarkedAsIdle(System.currentTimeMillis());
if (!http2ClientChannels.contains(http2ClientChannel)) {
http2ClientChannels.add(http2ClientChannel);
}
}

private void initiateStaleConnectionEvictionTask() {
Timer timer = new Timer(true);
TimerTask timerTask = new TimerTask() {
@Override
Expand All @@ -183,27 +198,49 @@ public void run() {
}
} else if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsStale()) >
poolConfiguration.getMinIdleTimeInStaleState()) {
http2ClientChannel.getInFlightMessages().forEach((streamId, outboundMsgHolder) -> {
Http2MessageStateContext messageStateContext =
outboundMsgHolder.getRequest().getHttp2MessageStateContext();
if (messageStateContext != null) {
messageStateContext.getSenderState().handleConnectionClose(outboundMsgHolder);
}
});
closeInFlightRequests(http2ClientChannel);
closeChannelAndEvict(http2ClientChannel);
}
});
}

public void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) {
removeClosedChannelFromStalePool(http2ClientChannel);
http2ClientChannel.getConnection().close(http2ClientChannel.getChannel().newPromise());
}
};
timer.schedule(timerTask, poolConfiguration.getTimeBetweenStaleEviction(),
poolConfiguration.getTimeBetweenStaleEviction());
}

private void initiateIdleConnectionEvictionTask() {
Timer timer = new Timer(true);
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
http2ClientChannels.forEach(http2ClientChannel -> {
if (poolConfiguration.getMinEvictableIdleTime() == -1) {
if (!http2ClientChannel.hasInFlightMessages()) {
closeChannelAndEvict(http2ClientChannel);
}
} else if ((System.currentTimeMillis() - http2ClientChannel.getTimeSinceMarkedAsIdle()) >
poolConfiguration.getMinEvictableIdleTime()) {
closeInFlightRequests(http2ClientChannel);
removeClientChannel(http2ClientChannel.getHttpRoute(), http2ClientChannel);
closeChannelAndEvict(http2ClientChannel);
}
});
}
};
timer.schedule(timerTask, poolConfiguration.getTimeBetweenEvictionRuns(),
poolConfiguration.getTimeBetweenEvictionRuns());
}

private static void closeInFlightRequests(Http2ClientChannel http2ClientChannel) {
http2ClientChannel.getInFlightMessages().forEach((streamId, outboundMsgHolder) -> {
Http2MessageStateContext messageStateContext =
outboundMsgHolder.getRequest().getHttp2MessageStateContext();
if (messageStateContext != null) {
messageStateContext.getSenderState().handleConnectionClose(outboundMsgHolder);
}
});
}

private Http2ChannelPool.PerRouteConnectionPool fetchPerRoutePool(HttpRoute httpRoute) {
String key = generateKey(httpRoute);
return this.http2ChannelPool.fetchPerRoutePool(key);
Expand All @@ -213,4 +250,9 @@ private String generateKey(HttpRoute httpRoute) {
return httpRoute.getScheme() + ":" + httpRoute.getHost() + ":" + httpRoute.getPort() + ":" +
httpRoute.getConfigHash();
}

private void closeChannelAndEvict(Http2ClientChannel http2ClientChannel) {
removeClosedChannelFromIdlePool(http2ClientChannel);
http2ClientChannel.getConnection().close(http2ClientChannel.getChannel().newPromise());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright (c) 2025, WSO2 LLC. (http://www.wso2.com).
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.ballerina.stdlib.http.transport.http2.connectionpool;

import io.ballerina.stdlib.http.transport.contract.Constants;
import io.ballerina.stdlib.http.transport.contract.HttpClientConnector;
import io.ballerina.stdlib.http.transport.contract.HttpConnectorListener;
import io.ballerina.stdlib.http.transport.contract.HttpWsConnectorFactory;
import io.ballerina.stdlib.http.transport.contract.ServerConnector;
import io.ballerina.stdlib.http.transport.contract.ServerConnectorFuture;
import io.ballerina.stdlib.http.transport.contract.config.ListenerConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.SenderConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.ServerBootstrapConfiguration;
import io.ballerina.stdlib.http.transport.contract.config.TransportsConfiguration;
import io.ballerina.stdlib.http.transport.contract.exceptions.ServerConnectorException;
import io.ballerina.stdlib.http.transport.contractimpl.DefaultHttpWsConnectorFactory;
import io.ballerina.stdlib.http.transport.contractimpl.listener.http2.Http2SourceHandler;
import io.ballerina.stdlib.http.transport.message.HttpCarbonMessage;
import io.ballerina.stdlib.http.transport.message.HttpCarbonResponse;
import io.ballerina.stdlib.http.transport.message.HttpConnectorUtil;
import io.ballerina.stdlib.http.transport.message.HttpMessageDataStreamer;
import io.ballerina.stdlib.http.transport.util.TestUtil;
import io.ballerina.stdlib.http.transport.util.client.http2.MessageGenerator;
import io.ballerina.stdlib.http.transport.util.client.http2.MessageSender;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.ballerina.stdlib.http.transport.contract.Constants.CHNL_HNDLR_CTX;
import static io.ballerina.stdlib.http.transport.contract.Constants.HTTP_2_0;
import static io.ballerina.stdlib.http.transport.util.TestUtil.HTTP_SCHEME;
import static io.ballerina.stdlib.http.transport.util.TestUtil.SERVER_CONNECTOR_PORT;
import static org.testng.Assert.assertNotNull;

public class ConnectionPoolEvictionTest {

private static final Logger LOG = LoggerFactory.getLogger(ConnectionPoolEvictionTest.class);

private HttpWsConnectorFactory httpWsConnectorFactory;
private ServerConnector serverConnector;

@BeforeClass
public void setup() {
httpWsConnectorFactory = new DefaultHttpWsConnectorFactory();
ListenerConfiguration listenerConfiguration = new ListenerConfiguration();
listenerConfiguration.setPort(SERVER_CONNECTOR_PORT);
listenerConfiguration.setScheme(Constants.HTTP_SCHEME);
listenerConfiguration.setVersion(Constants.HTTP_2_0);
serverConnector = httpWsConnectorFactory
.createServerConnector(new ServerBootstrapConfiguration(new HashMap<>()), listenerConfiguration);
ServerConnectorFuture serverConnectorFuture = serverConnector.start();
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration h2cSenderConfiguration = HttpConnectorUtil.getSenderConfiguration(transportsConfiguration,
Constants.HTTP_SCHEME);
h2cSenderConfiguration.setHttpVersion(Constants.HTTP_2_0);
serverConnectorFuture.setHttpConnectorListener(new Listener());
try {
serverConnectorFuture.sync();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for server connector to start");
}
}

@Test
public void testConnectionEvictionWithUpgrade() throws InterruptedException {
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration senderConfiguration = HttpConnectorUtil.getSenderConfiguration(transportsConfiguration,
Constants.HTTP_SCHEME);
senderConfiguration.getPoolConfiguration().setMinEvictableIdleTime(2000);
senderConfiguration.getPoolConfiguration().setTimeBetweenEvictionRuns(1000);
senderConfiguration.setHttpVersion(HTTP_2_0);
HttpClientConnector client = httpWsConnectorFactory.createHttpClientConnector(
HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration);
String firstId = getResponse(client);

Thread.sleep(500);
String secondId = getResponse(client);
Assert.assertEquals(firstId, secondId);

Thread.sleep(5000);
String thirdId = getResponse(client);
Assert.assertNotEquals(firstId, thirdId);
}

@Test
public void testConnectionEvictionWithPriorKnowledge() throws InterruptedException {
TransportsConfiguration transportsConfiguration = new TransportsConfiguration();
SenderConfiguration senderConfiguration = HttpConnectorUtil.getSenderConfiguration(transportsConfiguration,
Constants.HTTP_SCHEME);
senderConfiguration.getPoolConfiguration().setMinEvictableIdleTime(2000);
senderConfiguration.getPoolConfiguration().setTimeBetweenEvictionRuns(1000);
senderConfiguration.setHttpVersion(HTTP_2_0);
senderConfiguration.setForceHttp2(true);
HttpClientConnector client = httpWsConnectorFactory.createHttpClientConnector(
HttpConnectorUtil.getTransportProperties(transportsConfiguration), senderConfiguration);
String firstId = getResponse(client);

Thread.sleep(500);
String secondId = getResponse(client);
Assert.assertEquals(firstId, secondId);

Thread.sleep(5000);
String thirdId = getResponse(client);
Assert.assertNotEquals(firstId, thirdId);
}

private String getResponse(HttpClientConnector client) {
HttpCarbonMessage httpCarbonMessage = MessageGenerator.generateRequest(HttpMethod.GET, null,
SERVER_CONNECTOR_PORT, HTTP_SCHEME);
HttpCarbonMessage response = new MessageSender(client).sendMessage(httpCarbonMessage);
assertNotNull(response);
return TestUtil.getStringFromInputStream(new HttpMessageDataStreamer(response).getInputStream());
}

@AfterClass
public void cleanUp() throws ServerConnectorException {
try {
serverConnector.stop();
httpWsConnectorFactory.shutdown();
} catch (Exception e) {
LOG.warn("Resource clean up is interrupted", e);
}
}

static class Listener implements HttpConnectorListener {
private final ExecutorService executor = Executors.newSingleThreadExecutor();

@Override
public void onMessage(HttpCarbonMessage httpRequest) {
executor.execute(() -> {
try {
HttpVersion httpVersion = new HttpVersion(Constants.HTTP_VERSION_2_0, true);
HttpCarbonMessage httpResponse = new HttpCarbonResponse(new DefaultHttpResponse(httpVersion,
HttpResponseStatus.OK));
httpResponse.setHeader(HttpHeaderNames.CONTENT_TYPE.toString(), Constants.TEXT_PLAIN);
httpResponse.setHttpStatusCode(HttpResponseStatus.OK.code());
String id = ((Http2SourceHandler) ((ChannelHandlerContext) httpRequest
.getProperty(CHNL_HNDLR_CTX)).handler()).getChannelHandlerContext()
.channel().id().asLongText();

do {
HttpContent httpContent = httpRequest.getHttpContent();
if (httpContent instanceof LastHttpContent) {
break;
}
} while (true);

HttpContent httpContent = new DefaultLastHttpContent(Unpooled.wrappedBuffer(id.getBytes()));
httpResponse.addHttpContent(httpContent);
httpRequest.respond(httpResponse);
} catch (ServerConnectorException e) {
LOG.error("Error occurred during message notification: {}", e.getMessage());
}
});
}

@Override
public void onError(Throwable throwable) {}
}
}
1 change: 1 addition & 0 deletions native/src/test/resources/testng.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
<class name="io.ballerina.stdlib.http.transport.http2.clienttimeout.TimeoutDuringResponseReceive"/>
<class name="io.ballerina.stdlib.http.transport.http2.Http2WithHttp2ResetContent"/>
<class name="io.ballerina.stdlib.http.transport.http2.Http2WithPriorKnowledgeTestCase"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.ConnectionPoolEvictionTest"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.H2ConnectionPoolWithALPN"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.H2ConnectionPoolWithPriorKnowledge"/>
<class name="io.ballerina.stdlib.http.transport.http2.connectionpool.H2ConnectionPoolWithUpgrade"/>
Expand Down

0 comments on commit 269a245

Please sign in to comment.