Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix slowness in performance mode and failover not working when protocol is unsupported #995

Merged
merged 5 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions clickhouse-cli-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ This is a thin wrapper of ClickHouse native command-line client. It provides an
- native CLI client instead of pure Java implementation
- an example of implementing SPI defined in `clickhouse-client` module

Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [docker](https://docs.docker.com/get-docker/) must be installed prior to use. And it's important to understand that this module uses sub-process(in addition to threads) and file-based streaming, meaning 1) it's not as fast as native CLI client or pure Java implementation, although it's close in the case of dumping and loading data; and 2) it's not suitable for scenarios like dealing with many queries in short period of time.
Either [clickhouse](https://clickhouse.com/docs/en/interfaces/cli/) or [docker](https://docs.docker.com/get-docker/) must be installed prior to use. And it's important to understand that this module uses sub-process(in addition to threads) and file-based streaming, meaning 1) it's not as fast as native CLI client or pure Java implementation, although it's close in the case of dumping and loading data; and 2) it's not suitable for scenarios like dealing with many queries in short period of time.

## Limitations and Known Issues

- Only `max_result_rows` and `result_overflow_mode` two settings are currently supported
- Only `max_result_rows`, `result_overflow_mode` and `readonly` 3 settings are currently supported
- ClickHouseResponseSummary is always empty - see ClickHouse/ClickHouse#37241
- Session is not supported - see ClickHouse/ClickHouse#37308

Expand All @@ -28,10 +28,10 @@ Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [d
## Examples

```java
// make sure 'clickhouse-client' or 'docker' is in PATH before you start the program
// make sure 'clickhouse' or 'docker' is in PATH before you start the program
// alternatively, configure CLI path in either Java system property or environment variable, for examples:
// CHC_CLICKHOUSE_CLI_PATH=/path/to/clickhouse-client CHC_DOCKER_CLI_PATH=/path/to/docker java MyProgram
// java -Dchc_clickhouse_cli_path=/path/to/clickhouse-client -Dchc_docker_cli_path=/path/to/docker MyProgram
// CHC_CLICKHOUSE_CLI_PATH=/path/to/clickhouse CHC_DOCKER_CLI_PATH=/path/to/docker java MyProgram
// java -Dchc_clickhouse_cli_path=/path/to/clickhouse -Dchc_docker_cli_path=/path/to/docker MyProgram

// clickhouse-cli-client uses TCP protocol
ClickHouseProtocol preferredProtocol = ClickHouseProtocol.TCP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -93,7 +94,7 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
cli = DEFAULT_DOCKER_CLI_PATH;
}
if (!check(timeout, cli, DEFAULT_CLI_ARG_VERSION)) {
throw new IllegalStateException("Docker command-line is not available: " + cli);
throw new UncheckedIOException(new ConnectException("Docker command-line is not available: " + cli));
} else {
commands.add(cli);
}
Expand All @@ -111,7 +112,7 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
DEFAULT_CLI_ARG_VERSION)
&& !check(timeout, cli, "run", "--rm", "--name", str, "-v", hostDir + ':' + containerDir,
"-d", img, "tail", "-f", "/dev/null")) {
throw new IllegalStateException("Failed to start new container: " + str);
throw new UncheckedIOException(new ConnectException("Failed to start new container: " + str));
}
}
}
Expand All @@ -122,7 +123,7 @@ static void dockerCommand(ClickHouseConfig config, String hostDir, String contai
} else { // create new container for each query
if (!check(timeout, cli, "run", "--rm", img, DEFAULT_CLICKHOUSE_CLI_PATH, DEFAULT_CLIENT_OPTION,
DEFAULT_CLI_ARG_VERSION)) {
throw new IllegalStateException("Invalid ClickHouse docker image: " + img);
throw new UncheckedIOException(new ConnectException("Invalid ClickHouse docker image: " + img));
}
commands.add("run");
commands.add("--rm");
Expand Down Expand Up @@ -235,6 +236,10 @@ static Process startProcess(ClickHouseNode server, ClickHouseRequest<?> request)
if (value != null) {
commands.add("--result_overflow_mode=".concat(value.toString()));
}
value = settings.get("readonly");
if (value != null) {
commands.add("--readonly=".concat(value.toString()));
}
if ((boolean) config.getOption(ClickHouseCommandLineOption.USE_PROFILE_EVENTS)) {
commands.add("--print-profile-events");
commands.add("--profile-events-delay-ms=-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
public enum ClickHouseCommandLineOption implements ClickHouseOption {
/**
* ClickHouse native command-line client path. Empty value is treated as
* 'clickhouse-client'.
* 'clickhouse'.
*/
CLICKHOUSE_CLI_PATH("clickhouse_cli_path", "",
"ClickHouse native command-line client path, empty value is treated as 'clickhouse-client'"),
"ClickHouse native command-line client path, empty value is treated as 'clickhouse'"),
/**
* ClickHouse docker image. Empty value is treated as
* 'clickhouse/clickhouse-server'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ static ClickHouseInputStream getResponseInputStream(ClickHouseConfig config, Inp

/**
* Gets piped input stream for reading data from response asynchronously. When
* {@code config} is null or {@code config.isAsync()} is faluse, this method is
* {@code config} is null or {@code config.isAsync()} is false, this method is
* same as
* {@link #getResponseInputStream(ClickHouseConfig, InputStream, Runnable)}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.clickhouse.client;

import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -27,25 +30,36 @@
*/
public class ClickHouseClientBuilder {
/**
* Dummy client which is only used {@link Agent}.
* Dummy client which is only used by {@link Agent}.
*/
static class DummyClient implements ClickHouseClient {
static final ClickHouseConfig CONFIG = new ClickHouseConfig();
static final DummyClient INSTANCE = new DummyClient();
static final ClickHouseConfig DEFAULT_CONFIG = new ClickHouseConfig();

private final ClickHouseConfig config;

DummyClient() {
this(null);
}

DummyClient(ClickHouseConfig config) {
this.config = config != null ? config : DEFAULT_CONFIG;
}

@Override
public boolean accept(ClickHouseProtocol protocol) {
return true;
return false;
}

@Override
public CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request) {
return CompletableFuture.completedFuture(ClickHouseResponse.EMPTY);
CompletableFuture<ClickHouseResponse> future = new CompletableFuture<>();
future.completeExceptionally(new ConnectException("No client available"));
return future;
}

@Override
public ClickHouseConfig getConfig() {
return CONFIG;
return config;
}

@Override
Expand All @@ -55,7 +69,7 @@ public void close() {

@Override
public boolean ping(ClickHouseNode server, int timeout) {
return true;
return false;
}
}

Expand All @@ -68,8 +82,8 @@ static final class Agent implements ClickHouseClient {

private final AtomicReference<ClickHouseClient> client;

Agent(ClickHouseClient client) {
this.client = new AtomicReference<>(client != null ? client : DummyClient.INSTANCE);
Agent(ClickHouseClient client, ClickHouseConfig config) {
this.client = new AtomicReference<>(client != null ? client : new DummyClient(config));
}

ClickHouseClient getClient() {
Expand All @@ -90,25 +104,27 @@ boolean changeClient(ClickHouseClient currentClient, ClickHouseClient newClient)
return changed;
}

ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, Throwable cause, int times) {
ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, int times) {
for (int i = 1; i <= times; i++) {
log.debug("Failover %d of %d due to: %s", i, times, cause.getMessage());
log.debug("Failover %d of %d due to: %s", i, times, exception.getCause(), null);
ClickHouseNode current = sealedRequest.getServer();
ClickHouseNodeManager manager = current.manager.get();
if (manager == null) {
break;
}
ClickHouseNode next = manager.suggestNode(current, cause);
ClickHouseNode next = manager.suggestNode(current, exception);
if (next == current) {
log.debug("Cancel failover for same node returned from %s", manager.getPolicy());
break;
}
current.update(Status.FAULTY);
next = sealedRequest.changeServer(current, next);
if (next == current) {
log.debug("Cancel failover for no alternative of %s", current);
break;
}

log.info("Switching node from %s to %s due to: %s", current, next, cause.getMessage());
log.info("Switching node from %s to %s due to: %s", current, next, exception.getCause(), null);
final ClickHouseProtocol protocol = next.getProtocol();
final ClickHouseClient currentClient = client.get();
if (!currentClient.accept(protocol)) {
Expand All @@ -118,64 +134,70 @@ ClickHouseResponse failover(ClickHouseRequest<?> sealedRequest, Throwable cause,
.config(new ClickHouseConfig(currentClient.getConfig(), next.config))
.nodeSelector(ClickHouseNodeSelector.of(protocol)).build();
} catch (Exception e) {
cause = e;
continue;
exception = ClickHouseException.of(new ConnectException("No client available for " + next),
sealedRequest.getServer());
} finally {
if (newClient != null) {
boolean changed = changeClient(currentClient, newClient);
log.debug("Switching client from %s to %s: %s", currentClient, newClient, changed);
log.info("Switching client from %s to %s: %s", currentClient, newClient, changed);
if (changed) {
sealedRequest.resetCache();
}
}
}

if (newClient == null) {
continue;
}
}

try {
return sendOnce(sealedRequest);
} catch (Exception exp) {
cause = exp.getCause();
if (cause == null) {
cause = exp;
}
exception = ClickHouseException.of(exp.getCause() != null ? exp.getCause() : exp,
sealedRequest.getServer());
}
}

throw new CompletionException(cause);
throw new CompletionException(exception);
}

ClickHouseResponse retry(ClickHouseRequest<?> sealedRequest, Throwable cause, int times) {
ClickHouseResponse retry(ClickHouseRequest<?> sealedRequest, ClickHouseException exception, int times) {
for (int i = 1; i <= times; i++) {
log.debug("Retry %d of %d due to: %s", i, times, cause.getMessage());
log.debug("Retry %d of %d due to: %s", i, times, exception.getMessage());
// TODO retry idempotent query
if (cause instanceof ClickHouseException
&& ((ClickHouseException) cause).getErrorCode() == ClickHouseException.ERROR_NETWORK) {
if (exception.getErrorCode() == ClickHouseException.ERROR_NETWORK) {
log.info("Retry request on %s due to connection issue", sealedRequest.getServer());
try {
return sendOnce(sealedRequest);
} catch (Exception exp) {
cause = exp.getCause();
if (cause == null) {
cause = exp;
}
exception = ClickHouseException.of(exp.getCause() != null ? exp.getCause() : exp,
sealedRequest.getServer());
}
}
}

throw new CompletionException(cause);
throw new CompletionException(exception);
}

ClickHouseResponse handle(ClickHouseRequest<?> sealedRequest, Throwable cause) {
// in case there's any recoverable exception wrapped by UncheckedIOException
if (cause instanceof UncheckedIOException && cause.getCause() != null) {
cause = ((UncheckedIOException) cause).getCause();
}

log.debug("Handling %s(failover=%d, retry=%d)", cause, sealedRequest.getConfig().getFailover(),
sealedRequest.getConfig().getRetry());
try {
int times = sealedRequest.getConfig().getFailover();
if (times > 0) {
return failover(sealedRequest, cause, times);
return failover(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
}

// different from failover: 1) retry on the same node; 2) never retry on timeout
times = sealedRequest.getConfig().getRetry();
if (times > 0) {
return retry(sealedRequest, cause, times);
return retry(sealedRequest, ClickHouseException.of(cause, sealedRequest.getServer()), times);
}

throw new CompletionException(cause);
Expand All @@ -200,8 +222,8 @@ ClickHouseResponse sendOnce(ClickHouseRequest<?> sealedRequest) {
ClickHouseResponse send(ClickHouseRequest<?> sealedRequest) {
try {
return sendOnce(sealedRequest);
} catch (CompletionException e) {
return handle(sealedRequest, e.getCause());
} catch (Exception e) {
return handle(sealedRequest, e.getCause() != null ? e.getCause() : e);
}
}

Expand All @@ -228,9 +250,32 @@ public boolean ping(ClickHouseNode server, int timeout) {
@Override
public CompletableFuture<ClickHouseResponse> execute(ClickHouseRequest<?> request) {
final ClickHouseRequest<?> sealedRequest = request.seal();
final ClickHouseNode server = sealedRequest.getServer();
final ClickHouseProtocol protocol = server.getProtocol();
final ClickHouseClient currentClient = client.get();
if (!currentClient.accept(protocol)) {
ClickHouseClient newClient = null;
try {
newClient = ClickHouseClient.builder().agent(false)
.config(new ClickHouseConfig(currentClient.getConfig(), server.config))
.nodeSelector(ClickHouseNodeSelector.of(protocol)).build();
} catch (IllegalStateException e) {
// let it fail on execution phase
log.debug("Failed to find client for %s", server);
} finally {
if (newClient != null) {
boolean changed = changeClient(currentClient, newClient);
log.debug("Switching client from %s to %s: %s", currentClient, newClient, changed);
if (changed) {
sealedRequest.resetCache();
}
}
}
}
return sealedRequest.getConfig().isAsync()
? getClient().execute(sealedRequest)
.handle((r, t) -> t == null ? r : handle(sealedRequest, t.getCause()))
.handle((r, t) -> t == null ? r
: handle(sealedRequest, t.getCause() != null ? t.getCause() : t))
: CompletableFuture.completedFuture(send(sealedRequest));
}

Expand Down Expand Up @@ -339,26 +384,28 @@ public ClickHouseConfig getConfig() {
public ClickHouseClient build() {
ClickHouseClient client = null;

boolean noSelector = nodeSelector == null || nodeSelector == ClickHouseNodeSelector.EMPTY;
int counter = 0;
ClickHouseConfig conf = getConfig();
for (ClickHouseClient c : loadClients()) {
c.init(conf);
int counter = 0;
if (nodeSelector != null) {
for (ClickHouseClient c : loadClients()) {
c.init(conf);

counter++;
if (noSelector || nodeSelector.match(c)) {
client = c;
break;
counter++;
if (nodeSelector == ClickHouseNodeSelector.EMPTY || nodeSelector.match(c)) {
client = c;
break;
}
}
}

if (client == null) {
if (agent) {
return new Agent(client, conf);
} else if (client == null) {
throw new IllegalStateException(
ClickHouseUtils.format("No suitable ClickHouse client(out of %d) found in classpath for %s.",
counter, nodeSelector));
}

return agent ? new Agent(client) : client;
return client;
}

/**
Expand Down Expand Up @@ -475,7 +522,11 @@ public ClickHouseClientBuilder defaultCredentials(ClickHouseCredentials credenti
*/
public ClickHouseClientBuilder nodeSelector(ClickHouseNodeSelector nodeSelector) {
if (!ClickHouseChecker.nonNull(nodeSelector, "nodeSelector").equals(this.nodeSelector)) {
this.nodeSelector = nodeSelector;
this.nodeSelector = (nodeSelector.getPreferredProtocols().isEmpty() || nodeSelector.getPreferredProtocols()
.equals(Collections.singletonList(ClickHouseProtocol.ANY)))
&& nodeSelector.getPreferredTags().isEmpty()
? ClickHouseNodeSelector.EMPTY
: nodeSelector;
resetConfig();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,6 @@ public String toString() {
.append(checking.get()).append(", index=").append(index.get()).append(", lock=r")
.append(lock.getReadHoldCount()).append('w').append(lock.getWriteHoldCount()).append(", nodes=")
.append(nodes.size()).append(", faulty=").append(faultyNodes.size()).append(", policy=")
.append(policy.getClass().getSimpleName()).append(']').toString();
.append(policy.getClass().getSimpleName()).append("]@").append(hashCode()).toString();
}
}
Loading