Skip to content

Commit

Permalink
Fix build failure
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jul 13, 2022
1 parent 8d6ce58 commit 8b8415f
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ public abstract class ClickHouseInputStream extends InputStream {
* @param compressionLevel compression level
* @return non-null wrapped input stream
*/
static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input, int bufferSize, Runnable postCloseAction,
ClickHouseCompression compression, int compressionLevel) {
public static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input, int bufferSize,
Runnable postCloseAction, ClickHouseCompression compression, int compressionLevel) {
final ClickHouseInputStream chInput;
if (compression == null || compression == ClickHouseCompression.NONE) {
chInput = new WrappedInputStream(file, input, bufferSize, postCloseAction);
chInput = input != EmptyInputStream.INSTANCE && input instanceof ClickHouseInputStream
? (ClickHouseInputStream) input
: new WrappedInputStream(file, input, bufferSize, postCloseAction);
} else {
switch (compression) {
case GZIP:
Expand Down Expand Up @@ -485,7 +487,7 @@ public static File save(File file, InputStream in, int bufferSize, int timeout,
tmp = File.createTempFile("chc", "data");
tmp.deleteOnExit();
} catch (IOException e) {
throw new IllegalStateException("Failed to create temp file", e);
throw new UncheckedIOException("Failed to create temp file", e);
}
}
CompletableFuture<File> data = CompletableFuture.supplyAsync(() -> {
Expand All @@ -509,7 +511,9 @@ public static File save(File file, InputStream in, int bufferSize, int timeout,
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof UncheckedIOException) {
cause = ((UncheckedIOException) cause).getCause();
throw ((UncheckedIOException) cause);
} else if (cause instanceof IOException) {
throw new UncheckedIOException((IOException) cause);
}
throw new IllegalStateException(cause);
}
Expand Down
29 changes: 22 additions & 7 deletions clickhouse-grpc-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
<!-- necessary for Java 9+ -->
<dependency>
<groupId>org.apache.tomcat</groupId>
Expand Down Expand Up @@ -119,6 +119,10 @@
<pattern>okio</pattern>
<shadedPattern>${shade.base}.okio</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache</pattern>
<shadedPattern>${shade.base}.apache</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>${shade.base}.grpc</shadedPattern>
Expand Down Expand Up @@ -153,7 +157,8 @@
<exclude>android/**</exclude>
<exclude>google/**</exclude>
<exclude>javax/**</exclude>
<exclude>org/**</exclude>
<exclude>org/checkerframework/**</exclude>
<exclude>org/codehaus/**</exclude>
<exclude>**/module-info.class</exclude>
<exclude>META-INF/MANIFEST.MF</exclude>
<exclude>META-INF/maven/**</exclude>
Expand All @@ -179,6 +184,10 @@
<pattern>com.google</pattern>
<shadedPattern>${shade.base}.google</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache</pattern>
<shadedPattern>${shade.base}.apache</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>${shade.base}.grpc</shadedPattern>
Expand Down Expand Up @@ -211,7 +220,8 @@
<exclude>io/grpc/okhttp/**</exclude>
<exclude>javax/**</exclude>
<exclude>okio/**</exclude>
<exclude>org/**</exclude>
<exclude>org/checkerframework/**</exclude>
<exclude>org/codehaus/**</exclude>
<exclude>**/module-info.class</exclude>
<exclude>META-INF/MANIFEST.MF</exclude>
<exclude>META-INF/maven/**</exclude>
Expand Down Expand Up @@ -245,6 +255,10 @@
<pattern>okio</pattern>
<shadedPattern>${shade.base}.okio</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache</pattern>
<shadedPattern>${shade.base}.apache</shadedPattern>
</relocation>
<relocation>
<pattern>io.grpc</pattern>
<shadedPattern>${shade.base}.grpc</shadedPattern>
Expand Down Expand Up @@ -275,7 +289,8 @@
<exclude>google/**</exclude>
<exclude>io/grpc/netty/**</exclude>
<exclude>javax/**</exclude>
<exclude>org/**</exclude>
<exclude>org/checkerframework/**</exclude>
<exclude>org/codehaus/**</exclude>
<exclude>**/module-info.class</exclude>
<exclude>META-INF/MANIFEST.MF</exclude>
<exclude>META-INF/maven/**</exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.CompletionException;

import com.clickhouse.client.AbstractClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
Expand All @@ -29,7 +30,7 @@ public class ClickHouseHttpClient extends AbstractClient<ClickHouseHttpConnectio
protected boolean checkConnection(ClickHouseHttpConnection connection, ClickHouseNode requestServer,
ClickHouseNode currentServer, ClickHouseRequest<?> request) {
// return false to suggest creating a new connection
return connection != null && connection.isReusable() && requestServer.equals(currentServer);
return connection != null && connection.isReusable() && requestServer.isSameEndpoint(currentServer);
}

@Override
Expand Down Expand Up @@ -98,8 +99,18 @@ protected ClickHouseResponse send(ClickHouseRequest<?> sealedRequest) throws Cli
}

log.debug("Query: %s", sql);
ClickHouseHttpResponse httpResponse = conn.post(sql, sealedRequest.getInputStream().orElse(null),
sealedRequest.getExternalTables(), null);
ClickHouseConfig config = sealedRequest.getConfig();
final ClickHouseHttpResponse httpResponse;
if (conn.isReusable()) {
ClickHouseNode server = sealedRequest.getServer();
httpResponse = conn.post(sql, sealedRequest.getInputStream().orElse(null),
sealedRequest.getExternalTables(),
ClickHouseHttpConnection.buildUrl(server.getBaseUri(), sealedRequest),
ClickHouseHttpConnection.createDefaultHeaders(config, server), config);
} else {
httpResponse = conn.post(sql, sealedRequest.getInputStream().orElse(null),
sealedRequest.getExternalTables(), null, null, config);
}
return ClickHouseStreamResponse.of(httpResponse.getConfig(sealedRequest), httpResponse.getInputStream(),
sealedRequest.getSettings(), null, httpResponse.summary);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import com.clickhouse.client.http.config.ClickHouseHttpOption;

public abstract class ClickHouseHttpConnection implements AutoCloseable {
private static StringBuilder appendQueryParameter(StringBuilder builder, String key, String value) {
return builder.append(urlEncode(key, StandardCharsets.UTF_8)).append('=')
.append(urlEncode(value, StandardCharsets.UTF_8)).append('&');
}

static String urlEncode(String str, Charset charset) {
if (charset == null) {
charset = StandardCharsets.UTF_8;
Expand All @@ -39,11 +44,6 @@ static String urlEncode(String str, Charset charset) {
}
}

private static StringBuilder appendQueryParameter(StringBuilder builder, String key, String value) {
return builder.append(urlEncode(key, StandardCharsets.UTF_8)).append('=')
.append(urlEncode(value, StandardCharsets.UTF_8)).append('&');
}

static String buildQueryParams(ClickHouseRequest<?> request) {
if (request == null) {
return "";
Expand Down Expand Up @@ -146,26 +146,7 @@ static String buildUrl(String baseUrl, ClickHouseRequest<?> request) {
return builder.toString();
}

protected final ClickHouseConfig config;
protected final ClickHouseNode server;
protected final Map<String, String> defaultHeaders;

protected final ClickHouseOutputStream output;

protected final String url;

protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest<?> request) {
if (server == null || request == null) {
throw new IllegalArgumentException("Non-null server and request are required");
}

this.config = request.getConfig();
this.server = server;

this.output = request.getOutputStream().orElse(null);

this.url = buildUrl(server.getBaseUri(), request);

protected static Map<String, String> createDefaultHeaders(ClickHouseConfig config, ClickHouseNode server) {
Map<String, String> map = new LinkedHashMap<>();
// add customer headers
map.putAll(ClickHouseUtils.getKeyValuePairs((String) config.getOption(ClickHouseHttpOption.CUSTOM_HEADERS)));
Expand Down Expand Up @@ -201,8 +182,25 @@ protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest<?> r
&& config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) {
map.put("Content-Encoding", config.getRequestCompressAlgorithm().encoding());
}
return map;
}

this.defaultHeaders = Collections.unmodifiableMap(map);
protected final ClickHouseConfig config;
protected final ClickHouseNode server;
protected final ClickHouseOutputStream output;
protected final String url;
protected final Map<String, String> defaultHeaders;

protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest<?> request) {
if (server == null || request == null) {
throw new IllegalArgumentException("Non-null server and request are required");
}

this.config = request.getConfig();
this.server = server;
this.output = request.getOutputStream().orElse(null);
this.url = buildUrl(server.getBaseUri(), request);
this.defaultHeaders = Collections.unmodifiableMap(createDefaultHeaders(config, server));
}

protected void closeQuietly() {
Expand Down Expand Up @@ -231,11 +229,13 @@ protected String getBaseUrl() {
* Creates a merged map.
*
* @param requestHeaders request headers
* @return
* @return non-null merged headers
*/
protected Map<String, String> mergeHeaders(Map<String, String> requestHeaders) {
if (requestHeaders == null || requestHeaders.isEmpty()) {
return defaultHeaders;
} else if (isReusable()) {
return requestHeaders;
}

Map<String, String> merged = new LinkedHashMap<>();
Expand All @@ -256,13 +256,15 @@ protected Map<String, String> mergeHeaders(Map<String, String> requestHeaders) {
* @param query non-blank query
* @param data optionally input stream for batch updating
* @param tables optionally external tables for query
* @param url optionally url
* @param headers optionally request headers
* @param config optionally configuration
* @return response
* @throws IOException when error occured posting request and/or server failed
* to respond
*/
protected abstract ClickHouseHttpResponse post(String query, InputStream data, List<ClickHouseExternalTable> tables,
Map<String, String> headers) throws IOException;
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException;

/**
* Checks whether the connection is reusable or not. This method will be called
Expand All @@ -287,36 +289,36 @@ protected boolean isReusable() {
public abstract boolean ping(int timeout);

public ClickHouseHttpResponse update(String query) throws IOException {
return post(query, null, null, null);
return post(query, null, null, null, null, null);
}

public ClickHouseHttpResponse update(String query, Map<String, String> headers) throws IOException {
return post(query, null, null, headers);
return post(query, null, null, null, headers, null);
}

public ClickHouseHttpResponse update(String query, InputStream data) throws IOException {
return post(query, data, null, null);
return post(query, data, null, null, null, null);
}

public ClickHouseHttpResponse update(String query, InputStream data, Map<String, String> headers)
throws IOException {
return post(query, data, null, headers);
return post(query, data, null, null, headers, null);
}

public ClickHouseHttpResponse query(String query) throws IOException {
return post(query, null, null, null);
return post(query, null, null, null, null, null);
}

public ClickHouseHttpResponse query(String query, Map<String, String> headers) throws IOException {
return post(query, null, null, headers);
return post(query, null, null, null, headers, null);
}

public ClickHouseHttpResponse query(String query, List<ClickHouseExternalTable> tables) throws IOException {
return post(query, null, tables, null);
return post(query, null, tables, null, null, null);
}

public ClickHouseHttpResponse query(String query, List<ClickHouseExternalTable> tables, Map<String, String> headers)
throws IOException {
return post(query, null, tables, headers);
return post(query, null, tables, null, headers, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ protected boolean isReusable() {

@Override
protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHouseExternalTable> tables,
Map<String, String> headers) throws IOException {
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException {
Charset charset = StandardCharsets.US_ASCII;
byte[] boundary = null;
if (tables != null && !tables.isEmpty()) {
Expand Down
Loading

0 comments on commit 8b8415f

Please sign in to comment.