diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index a69410c8f..7ef4366e2 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -39,11 +39,14 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.config.ClickHouseDefaults; import com.clickhouse.client.http.ClickHouseHttpProto; +import com.clickhouse.client.http.config.ClickHouseHttpOption; import com.clickhouse.data.ClickHouseColumn; import com.clickhouse.data.ClickHouseFormat; import com.clickhouse.data.format.BinaryStreamUtils; +import org.apache.hc.client5.http.ConnectTimeoutException; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ConnectionRequestTimeoutException; import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.NoHttpResponseException; import org.slf4j.Logger; @@ -220,17 +223,17 @@ public Builder() { public Builder addEndpoint(String endpoint) { try { URL endpointURL = new java.net.URL(endpoint); - if (!(endpointURL.getProtocol().equalsIgnoreCase("https") || - endpointURL.getProtocol().equalsIgnoreCase("http"))) { + + if (endpointURL.getProtocol().equalsIgnoreCase("https")) { + addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), true); + } else if (endpointURL.getProtocol().equalsIgnoreCase("http")) { + addEndpoint(Protocol.HTTP, endpointURL.getHost(), endpointURL.getPort(), false); + } else { throw new IllegalArgumentException("Only HTTP and HTTPS protocols are supported"); } } catch (java.net.MalformedURLException e) { throw new IllegalArgumentException("Endpoint should be a valid URL string", e); } - if (endpoint.startsWith("https://")) { - this.configuration.put(ClickHouseClientOption.SSL.getKey(), "true"); - } - this.endpoints.add(endpoint); return this; } @@ -252,7 +255,7 @@ public Builder addEndpoint(Protocol protocol, String host, int port, boolean sec this.configuration.put(ClickHouseClientOption.SSL.getKey(), "true"); } String endpoint = String.format("%s%s://%s:%d", protocol.toString().toLowerCase(), secure ? "s": "", host, port); - this.addEndpoint(endpoint); + this.endpoints.add(endpoint); return this; } @@ -302,7 +305,15 @@ public Builder setAccessToken(String accessToken) { return this; } - // SOCKET SETTINGS + /** + * Configures client to use build-in connection pool + * @param enable - if connection pool should be enabled + * @return + */ + public Builder enableConnectionPool(boolean enable) { + this.configuration.put("connection_pool_enabled", String.valueOf(enable)); + return this; + } /** * Default connection timeout in milliseconds. Timeout is applied to establish a connection. @@ -324,6 +335,72 @@ public Builder setConnectTimeout(long timeout, ChronoUnit unit) { return this.setConnectTimeout(Duration.of(timeout, unit).toMillis()); } + /** + * Set timeout for waiting a free connection from a pool when all connections are leased. + * This configuration is important when need to fail fast in high concurrent scenarios. + * Default is 10 s. + * @param timeout - connection timeout in milliseconds + * @param unit - time unit + */ + public Builder setConnectionRequestTimeout(long timeout, ChronoUnit unit) { + this.configuration.put("connection_request_timeout", String.valueOf(Duration.of(timeout, unit).toMillis())); + return this; + } + + /** + * Sets the maximum number of connections that can be opened at the same time to a single server. Limit is not + * a hard stop. It is done to prevent threads stuck inside a connection pool waiting for a connection. + * Default is 10. It is recommended to set a higher value for a high concurrent applications. It will let + * more threads to get a connection and execute a query. + * + * @param maxConnections - maximum number of connections + */ + public Builder setMaxConnections(int maxConnections) { + this.configuration.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), String.valueOf(maxConnections)); + return this; + } + + /** + * Sets how long any connection would be considered as active and able for a lease. + * After this time connection will be marked for sweep and will not be returned from a pool. + * Has more effect than keep-alive timeout. + * @param timeout - time in unit + * @param unit - time unit + * @return + */ + public Builder setConnectionTTL(long timeout, ChronoUnit unit) { + this.configuration.put(ClickHouseClientOption.CONNECTION_TTL.getKey(), String.valueOf(Duration.of(timeout, unit).toMillis())); + return this; + } + + /** + * Sets keep alive timeout for a connection to override server value. If set to -1 then server value will be used. + * Default is -1. + * Doesn't override connection TTL value. + * {@see Client#setConnectionTTL} + * @param timeout - time in unit + * @param unit - time unit + * @return + */ + public Builder setKeepAliveTimeout(long timeout, ChronoUnit unit) { + this.configuration.put(ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT.getKey(), String.valueOf(Duration.of(timeout, unit).toMillis())); + return this; + } + + /** + * Sets strategy of how connections are reuse. + * Default is {@link ConnectionReuseStrategy#FIFO} to evenly distribute load between them. + * + * @param strategy - strategy for connection reuse + * @return + */ + public Builder setConnectionReuseStrategy(ConnectionReuseStrategy strategy) { + this.configuration.put("connection_reuse_strategy", strategy.name()); + return this; + } + + // SOCKET SETTINGS + /** * Default socket timeout in milliseconds. Timeout is applied to read and write operations. * @@ -485,8 +562,8 @@ public Builder setProxyCredentials(String user, String pass) { * @param timeUnit * @return */ - public Builder setExecutionTimeout(long timeout, TimeUnit timeUnit) { - this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(timeUnit.toMillis(timeout))); + public Builder setExecutionTimeout(long timeout, ChronoUnit timeUnit) { + this.configuration.put(ClickHouseClientOption.MAX_EXECUTION_TIME.getKey(), String.valueOf(Duration.of(timeout, timeUnit).toMillis())); return this; } @@ -719,6 +796,26 @@ private Map setDefaults(Map userConfig) { userConfig.put(ClickHouseClientOption.ASYNC.getKey(), "false"); } + if (!userConfig.containsKey(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey())) { + userConfig.put(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), "10"); + } + + if (!userConfig.containsKey("connection_request_timeout")) { + userConfig.put("connection_request_timeout", "10000"); + } + + if (!userConfig.containsKey("connection_reuse_strategy")) { + userConfig.put("connection_reuse_strategy", ConnectionReuseStrategy.FIFO.name()); + } + + if (!userConfig.containsKey("connection_pool_enabled")) { + userConfig.put("connection_pool_enabled", "true"); + } + + if (!userConfig.containsKey("connection_ttl")) { + userConfig.put("connection_ttl", "-1"); + } + return userConfig; } } @@ -1212,6 +1309,8 @@ public CompletableFuture query(String sqlQuery, Map configuration) { this.httpClient = createHttpClient(); RequestConfig.Builder reqConfBuilder = RequestConfig.custom(); - MapUtils.applyLong(chConfiguration, ClickHouseClientOption.CONNECTION_TIMEOUT.getKey(), - (t) -> reqConfBuilder.setConnectionRequestTimeout(t, TimeUnit.MILLISECONDS)); + MapUtils.applyLong(chConfiguration, "connection_request_timeout", + (t) -> reqConfBuilder + .setConnectionRequestTimeout(t, TimeUnit.MILLISECONDS)); this.baseRequestConfig = reqConfBuilder.build(); @@ -95,29 +90,19 @@ public HttpAPIClientHelper(Map configuration) { LOG.info("client compression: {}, server compression: {}, http compression: {}", usingClientCompression, usingServerCompression, useHttpCompression); } - public CloseableHttpClient createHttpClient() { - - // Top Level builders - HttpClientBuilder clientBuilder = HttpClientBuilder.create(); - - - // Socket configuration - SocketConfig.Builder soCfgBuilder = SocketConfig.custom(); - MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_TIMEOUT.getKey(), - (t) -> soCfgBuilder.setSoTimeout(t, TimeUnit.MILLISECONDS)); - MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_RCVBUF.getKey(), - soCfgBuilder::setRcvBufSize); - MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_SNDBUF.getKey(), - soCfgBuilder::setSndBufSize); - - - // Connection manager - PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create(); - + /** + * Creates or returns default SSL context. + * @return SSLContext + */ + public SSLContext createSSLContext() { + SSLContext sslContext; + try { + sslContext = SSLContext.getDefault(); + } catch (NoSuchAlgorithmException e) { + throw new ClientException("Failed to create default SSL context", e); + } ClickHouseSslContextProvider sslContextProvider = ClickHouseSslContextProvider.getProvider(); - String trustStorePath = chConfiguration.get(ClickHouseClientOption.TRUST_STORE.getKey()); - SSLContext sslContext = null; if (trustStorePath != null ) { try { sslContext = sslContextProvider.getSslContextFromKeyStore( @@ -142,12 +127,67 @@ public CloseableHttpClient createHttpClient() { throw new ClientMisconfigurationException("Failed to create SSL context from certificates", e); } } - if (sslContext !=null) { - connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext)); - } + return sslContext; + } + + private long CONNECTION_INACTIVITY_CHECK = 5000L; + + private ConnectionConfig createConnectionConfig() { + ConnectionConfig.Builder connConfig = ConnectionConfig.custom(); + connConfig.setTimeToLive(MapUtils.getLong(chConfiguration, ClickHouseClientOption.CONNECTION_TTL.getKey()), + TimeUnit.MILLISECONDS); + connConfig.setConnectTimeout(MapUtils.getLong(chConfiguration, ClickHouseClientOption.CONNECTION_TIMEOUT.getKey()), + TimeUnit.MILLISECONDS); + connConfig.setValidateAfterInactivity(CONNECTION_INACTIVITY_CHECK, TimeUnit.MILLISECONDS); // non-configurable for now + + return connConfig.build(); + } + + private HttpClientConnectionManager basicConnectionManager(SSLContext sslContext, SocketConfig socketConfig) { + RegistryBuilder registryBuilder = RegistryBuilder.create(); + registryBuilder.register("http", PlainConnectionSocketFactory.getSocketFactory()); + registryBuilder.register("https", new SSLConnectionSocketFactory(sslContext)); - connMgrBuilder.setDefaultSocketConfig(soCfgBuilder.build()); - clientBuilder.setConnectionManager(connMgrBuilder.build()); + + BasicHttpClientConnectionManager connManager = new BasicHttpClientConnectionManager(registryBuilder.build()); + connManager.setConnectionConfig(createConnectionConfig()); + connManager.setSocketConfig(socketConfig); + + return connManager; + } + + private HttpClientConnectionManager poolConnectionManager(SSLContext sslContext, SocketConfig socketConfig) { + PoolingHttpClientConnectionManagerBuilder connMgrBuilder = PoolingHttpClientConnectionManagerBuilder.create() + .setConnPoolPolicy(PoolReusePolicy.LIFO) + .setPoolConcurrencyPolicy(PoolConcurrencyPolicy.STRICT); + + connMgrBuilder.setDefaultConnectionConfig(createConnectionConfig()); + connMgrBuilder.setMaxConnTotal(Integer.MAX_VALUE); // as we do not know how many routes we will have + MapUtils.applyInt(chConfiguration, ClickHouseHttpOption.MAX_OPEN_CONNECTIONS.getKey(), + connMgrBuilder::setMaxConnPerRoute); + + + connMgrBuilder.setSSLSocketFactory(new SSLConnectionSocketFactory(sslContext)); + connMgrBuilder.setDefaultSocketConfig(socketConfig); + return connMgrBuilder.build(); + } + + public CloseableHttpClient createHttpClient() { + + // Top Level builders + HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + SSLContext sslContext = createSSLContext(); + + // Socket configuration + SocketConfig.Builder soCfgBuilder = SocketConfig.custom(); + MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_TIMEOUT.getKey(), + (t) -> soCfgBuilder.setSoTimeout(t, TimeUnit.MILLISECONDS)); + MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_RCVBUF.getKey(), + soCfgBuilder::setRcvBufSize); + MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_SNDBUF.getKey(), + soCfgBuilder::setSndBufSize); + MapUtils.applyInt(chConfiguration, ClickHouseClientOption.SOCKET_LINGER.getKey(), + (v) -> soCfgBuilder.setSoLinger(v, TimeUnit.SECONDS)); // Proxy String proxyHost = chConfiguration.get(ClickHouseClientOption.PROXY_HOST.getKey()); @@ -173,6 +213,19 @@ public CloseableHttpClient createHttpClient() { .equalsIgnoreCase("false")) { clientBuilder.disableCookieManagement(); } + SocketConfig socketConfig = soCfgBuilder.build(); + + // Connection manager + boolean isConnectionPooling = MapUtils.getFlag(chConfiguration, "connection_pool_enabled"); + if (isConnectionPooling) { + clientBuilder.setConnectionManager(poolConnectionManager(sslContext, socketConfig)); + } else { + clientBuilder.setConnectionManager(basicConnectionManager(sslContext, socketConfig)); + } + long keepAliveTimeout = MapUtils.getLong(chConfiguration, ClickHouseHttpOption.KEEP_ALIVE_TIMEOUT.getKey()); + if (keepAliveTimeout > 0) { + clientBuilder.setKeepAliveStrategy((response, context) -> TimeValue.ofMilliseconds(keepAliveTimeout)); + } return clientBuilder.build(); } @@ -196,20 +249,17 @@ public Exception readError(ClassicHttpResponse httpResponse) { public ClassicHttpResponse executeRequest(ClickHouseNode server, Map requestConfig, IOCallback writeCallback) throws IOException { -// HttpHost target = new HttpHost("https", server.getHost(), server.getPort()); - URI uri; try { URIBuilder uriBuilder = new URIBuilder(server.getBaseUri()); addQueryParams(uriBuilder, chConfiguration, requestConfig); - uri = uriBuilder.build(); + uri = uriBuilder.normalizeSyntax().build(); } catch (URISyntaxException e) { throw new RuntimeException(e); } HttpPost req = new HttpPost(uri); addHeaders(req, chConfiguration, requestConfig); - RequestConfig httpReqConfig = RequestConfig.copy(baseRequestConfig) .build(); req.setConfig(httpReqConfig); @@ -246,11 +296,7 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map map, String key) { return 0; } + public static long getLong(Map map, String key) { + String val = map.get(key); + if (val != null) { + try { + return Long.parseLong(val); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for key " + key + ": " + val, e); + } + } + return 0; + } + public static boolean getFlag(Map map, String key) { String val = map.get(key); if (val == null) { diff --git a/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java b/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java new file mode 100644 index 000000000..8691ab91e --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/ConnectionManagementTests.java @@ -0,0 +1,166 @@ +package com.clickhouse.client; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ConnectionInitiationException; +import com.clickhouse.client.api.enums.ProxyType; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.QueryResponse; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.common.Slf4jNotifier; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import com.github.tomakehurst.wiremock.http.trafficlistener.WiremockNetworkTrafficListener; +import org.apache.hc.core5.http.ConnectionRequestTimeoutException; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.net.URIBuilder; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.net.Socket; +import java.nio.ByteBuffer; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +public class ConnectionManagementTests extends BaseIntegrationTest{ + + + @Test(groups = {"integration"},dataProvider = "testConnectionTTLProvider") + @SuppressWarnings("java:S2925") + public void testConnectionTTL(Long connectionTtl, Long keepAlive, int openSockets) throws Exception { + if (isCloud()) { + return; // skip cloud tests because of wiremock proxy. TODO: fix it + } + ClickHouseNode server = getServer(ClickHouseProtocol.HTTP); + + int proxyPort = new Random().nextInt(1000) + 10000; + System.out.println("proxyPort: " + proxyPort); + ConnectionCounterListener connectionCounter = new ConnectionCounterListener(); + WireMockServer proxy = new WireMockServer(WireMockConfiguration + .options().port(proxyPort) + .networkTrafficListener(connectionCounter) + .notifier(new Slf4jNotifier(true))); + proxy.start(); + URIBuilder targetURI = new URIBuilder(server.getBaseUri()) + .setPath(""); + proxy.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse().proxiedFrom(targetURI.build().toString())).build()); + + Client.Builder clientBuilder = new Client.Builder() + .addEndpoint(server.getBaseUri()) + .setUsername("default") + .setPassword(getPassword()) + .useNewImplementation(true) + .addProxy(ProxyType.HTTP, "localhost", proxyPort); + if (connectionTtl != null) { + clientBuilder.setConnectionTTL(connectionTtl, ChronoUnit.MILLIS); + } + if (keepAlive != null) { + clientBuilder.setKeepAliveTimeout(keepAlive, ChronoUnit.MILLIS); + } + + try (Client client = clientBuilder.build()) { + List resp = client.queryAll("select 1"); + Assert.assertEquals(resp.stream().findFirst().get().getString(1), "1"); + + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + Assert.fail("Unexpected exception", e); + } + + resp = client.queryAll("select 1"); + Assert.assertEquals(resp.stream().findFirst().get().getString(1), "1"); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Unexpected exception", e); + } finally { + Assert.assertEquals(connectionCounter.opened.get(), openSockets); + proxy.stop(); + } + } + + @DataProvider(name = "testConnectionTTLProvider") + public static Object[][] testConnectionTTLProvider() { + return new Object[][] { + { 1000L, null, 2 }, + { 2000L, null, 1 }, + { null, 2000L, 1 }, + { null, 500L, 2 }, + { 1000L, 0L, 2 }, + { 1000L, 3000L, 2} + }; + } + + private static class ConnectionCounterListener implements WiremockNetworkTrafficListener { + + private AtomicInteger opened = new AtomicInteger(0); + private AtomicInteger closed = new AtomicInteger(0); + + @Override + public void opened(Socket socket) { + opened.incrementAndGet(); + } + + @Override + public void incoming(Socket socket, ByteBuffer bytes) { + // ignore + } + + @Override + public void outgoing(Socket socket, ByteBuffer bytes) { + // ignore + } + + @Override + public void closed(Socket socket) { + closed.incrementAndGet(); + } + } + + @Test(groups = {"integration"}) + public void testConnectionRequestTimeout() { + + int serverPort = new Random().nextInt(1000) + 10000; + System.out.println("proxyPort: " + serverPort); + ConnectionCounterListener connectionCounter = new ConnectionCounterListener(); + WireMockServer proxy = new WireMockServer(WireMockConfiguration + .options().port(serverPort) + .networkTrafficListener(connectionCounter) + .notifier(new Slf4jNotifier(true))); + proxy.start(); + proxy.addStubMapping(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse().withFixedDelay(5000) + .withStatus(HttpStatus.SC_NOT_FOUND)).build()); + + Client.Builder clientBuilder = new Client.Builder() + .addEndpoint("http://localhost:" + serverPort) + .setUsername("default") + .setPassword(getPassword()) + .useNewImplementation(true) + .setMaxConnections(1) + .setOption(ClickHouseClientOption.ASYNC.getKey(), "true") + .setSocketTimeout(10000, ChronoUnit.MILLIS) + .setConnectionRequestTimeout(5, ChronoUnit.MILLIS); + + try (Client client = clientBuilder.build()) { + CompletableFuture f1 = client.query("select 1"); + CompletableFuture f2 = client.query("select 1"); + f2.get(); + } catch (ExecutionException e) { + e.printStackTrace(); + Assert.assertTrue(e.getCause() instanceof ConnectionInitiationException); + Assert.assertTrue(e.getCause().getCause() instanceof ConnectionRequestTimeoutException); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Unexpected exception", e); + } finally { + proxy.stop(); + } + } +}