Skip to content

Commit

Permalink
support direct upload/download in http client
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jul 18, 2022
1 parent b1b4cc1 commit d1a97b1
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ClickHouseInputStream asInputStream() {
try {
return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()),
(int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null,
getCompressionAlgorithm(), getCompressionLevel());
ClickHouseCompression.NONE, getCompressionLevel());
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
Expand All @@ -87,7 +87,7 @@ public ClickHouseOutputStream asOutputStream() {
try {
return ClickHouseOutputStream.wrap(this, new FileOutputStream(getFile()),
(int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null,
getCompressionAlgorithm(), getCompressionLevel());
ClickHouseCompression.NONE, getCompressionLevel());
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public static ClickHouseInputStream of(ClickHouseFile file, int bufferSize, Runn
}
try {
return wrap(file, new FileInputStream(file.getFile()), bufferSize, postCloseAction,
file.getCompressionAlgorithm(), file.getCompressionLevel());
ClickHouseCompression.NONE, file.getCompressionLevel());
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Run
}
try {
return wrap(file, new FileOutputStream(file.getFile()), bufferSize, postCloseAction,
file.getCompressionAlgorithm(), file.getCompressionLevel());
ClickHouseCompression.NONE, file.getCompressionLevel());
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -24,6 +29,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.clickhouse.client.ClickHouseClientBuilder.Agent;
import com.clickhouse.client.config.ClickHouseBufferingMode;
Expand Down Expand Up @@ -119,6 +126,16 @@ protected Object[][] getCompressionMatrix() {
return array;
}

/**
 * Supplies every combination of the two boolean switches consumed by the
 * file-based tests: {@code gzipCompressed} first, {@code useOneLiner} second.
 *
 * @return four rows in the order (true, true), (true, false), (false, true),
 *         (false, false)
 */
@DataProvider(name = "fileProcessMatrix")
protected Object[][] getFileProcessMatrix() {
    final boolean[] switches = { true, false };
    final Object[][] matrix = new Object[switches.length * switches.length][];
    int index = 0;
    for (boolean gzipCompressed : switches) {
        for (boolean useOneLiner : switches) {
            matrix[index++] = new Object[] { gzipCompressed, useOneLiner };
        }
    }
    return matrix;
}

@DataProvider(name = "renameMethods")
protected Object[][] getRenameMethods() {
return new Object[][] {
Expand Down Expand Up @@ -1035,6 +1052,47 @@ public void testDump() throws Exception {
Files.delete(temp);
}

/**
 * Dumps a ten-row query result straight into a temporary file wrapped as a
 * {@link ClickHouseFile} and checks the CSV content read back from disk.
 *
 * @param gzipCompressed whether the file is declared as GZIP-compressed (and
 *                       therefore read back through a {@link GZIPInputStream})
 * @param useOneLiner    whether to use {@code ClickHouseClient.dump(...)}
 *                       instead of the request builder with {@code output(...)}
 * @throws Exception when the query fails or the file cannot be read
 */
@Test(dataProvider = "fileProcessMatrix", groups = "integration")
public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throws Exception {
    ClickHouseNode server = getServer();
    if (server.getProtocol() != ClickHouseProtocol.HTTP) {
        throw new SkipException("Skip as only http implementation works well");
    }

    File file = File.createTempFile("chc", ".data");
    // guard the whole scenario so the temp file is removed even when the dump
    // itself fails, not only when reading the result back fails
    try {
        ClickHouseFile wrappedFile = ClickHouseFile.of(file,
                gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0,
                ClickHouseFormat.CSV);
        String query = "select number, if(number % 2 = 0, null, toString(number)) str from numbers(10)";
        if (useOneLiner) {
            ClickHouseClient.dump(server, query, wrappedFile).get();
        } else {
            try (ClickHouseClient client = getClient();
                    ClickHouseResponse response = client.connect(server).query(query).output(wrappedFile)
                            .executeAndWait()) {
                // nothing to consume: the result is written to wrappedFile
            }
        }

        // expected CSV: even numbers carry a null string (\N), odd ones a quoted string
        StringBuilder expected = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            expected.append(i).append(',');
            if (i % 2 == 0) {
                expected.append("\\N");
            } else {
                expected.append('"').append(i).append('"');
            }
            expected.append('\n');
        }

        try (InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(file))
                : new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            ClickHouseInputStream.pipe(in, out, 512);
            String content = new String(out.toByteArray(), StandardCharsets.US_ASCII);
            Assert.assertEquals(content, expected.toString());
        }
    } finally {
        // best-effort cleanup; a failed delete must not mask the test outcome
        file.delete();
    }
}

@Test(groups = { "integration" })
public void testCustomLoad() throws Exception {
ClickHouseNode server = getServer();
Expand Down Expand Up @@ -1117,6 +1175,66 @@ public void testLoadCsv() throws Exception {
}
}

/**
 * Writes five CSV rows into a temporary file, loads that file into
 * {@code test_load_file} through a {@link ClickHouseFile}, and verifies the
 * table content row by row.
 *
 * @param gzipCompressed whether the file is written through a
 *                       {@link GZIPOutputStream} and declared as GZIP
 * @param useOneLiner    whether to use {@code ClickHouseClient.load(...)}
 *                       instead of the request builder with {@code data(...)}
 * @throws Exception when writing the file, loading it, or querying fails
 */
@Test(dataProvider = "fileProcessMatrix", groups = "integration")
public void testLoadFile(boolean gzipCompressed, boolean useOneLiner) throws Exception {
    ClickHouseNode server = getServer();
    if (server.getProtocol() != ClickHouseProtocol.HTTP) {
        throw new SkipException("Skip as only http implementation works well");
    }

    Object[][] data = new Object[][] {
            { 1, "12345" },
            { 2, "23456" },
            { 3, "\\N" }, // written unquoted, asserted below to come back as null
            { 4, "x" },
            { 5, "y" },
    };

    File file = File.createTempFile("chc", ".data");
    // guard everything after file creation so the temp file never leaks
    try {
        try (OutputStream out = gzipCompressed ? new GZIPOutputStream(new FileOutputStream(file))
                : new FileOutputStream(file)) {
            for (int i = 0; i < data.length; i++) {
                // separate rows with \n, leaving no trailing newline after the last row
                if (i > 0) {
                    out.write(10);
                }
                out.write((data[i][0] + "," + data[i][1]).getBytes(StandardCharsets.US_ASCII));
            }
        }

        ClickHouseClient.send(server, "drop table if exists test_load_file",
                "create table test_load_file(a Int32, b Nullable(String))engine=Memory").get();
        ClickHouseFile wrappedFile = ClickHouseFile.of(file,
                gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0,
                ClickHouseFormat.CSV);
        if (useOneLiner) {
            ClickHouseClient
                    .load(server, "test_load_file", wrappedFile)
                    .get();
        } else {
            try (ClickHouseClient client = getClient();
                    ClickHouseResponse response = client.connect(server).write().table("test_load_file")
                            .data(wrappedFile).executeAndWait()) {
                // nothing to consume for an insert
            }
        }

        try (ClickHouseClient client = getClient();
                ClickHouseResponse response = client.connect(server)
                        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                        .query("select * from test_load_file order by a").executeAndWait()) {
            int row = 0;
            for (ClickHouseRecord r : response.records()) {
                Assert.assertEquals(r.getValue(0).asObject(), data[row][0]);
                if (row == 2) {
                    Assert.assertNull(r.getValue(1).asObject());
                } else {
                    Assert.assertEquals(r.getValue(1).asObject(), data[row][1]);
                }
                row++;
            }
            // without this check an empty table would pass the loop above vacuously
            Assert.assertEquals(row, data.length);
        }
    } finally {
        // best-effort cleanup; a failed delete must not mask the test outcome
        file.delete();
    }
}

@Test(groups = { "integration" })
public void testLoadRawData() throws Exception {
ClickHouseNode server = getServer();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.clickhouse.client.http;

import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
Expand All @@ -17,6 +16,7 @@
import com.clickhouse.client.ClickHouseCompression;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseInputStream;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseOutputStream;
import com.clickhouse.client.ClickHouseRequest;
Expand Down Expand Up @@ -59,15 +59,22 @@ static String buildQueryParams(ClickHouseRequest<?> request) {
appendQueryParameter(builder, cp.getKey(), cp.getValue());
}

if (config.isResponseCompressed()) {
// request server to compress response
appendQueryParameter(builder, "compress", "1");
}
if (config.isRequestCompressed()) {
ClickHouseInputStream chIn = request.getInputStream().orElse(null);
if (chIn != null && chIn.getUnderlyingFile().isAvailable()) {
appendQueryParameter(builder, "query", request.getStatements().get(0));
} else if (config.isRequestCompressed()) {
// inform server that client's request is compressed
appendQueryParameter(builder, "decompress", "1");
}

ClickHouseOutputStream chOut = request.getOutputStream().orElse(null);
if (chOut != null && chOut.getUnderlyingFile().isAvailable()) {
appendQueryParameter(builder, "enable_http_compression", "1");
} else if (config.isResponseCompressed()) {
// request server to compress response
appendQueryParameter(builder, "compress", "1");
}

Map<String, Object> settings = request.getSettings();
List<String> stmts = request.getStatements(false);
String settingKey = "max_execution_time";
Expand Down Expand Up @@ -263,8 +270,9 @@ protected Map<String, String> mergeHeaders(Map<String, String> requestHeaders) {
* @throws IOException when an error occurred while posting the request and/or
* the server failed to respond
*/
protected abstract ClickHouseHttpResponse post(String query, InputStream data, List<ClickHouseExternalTable> tables,
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException;
protected abstract ClickHouseHttpResponse post(String query, ClickHouseInputStream data,
List<ClickHouseExternalTable> tables, String url, Map<String, String> headers, ClickHouseConfig config)
throws IOException;

/**
* Checks whether the connection is reusable or not. This method will be called
Expand Down Expand Up @@ -296,11 +304,11 @@ public ClickHouseHttpResponse update(String query, Map<String, String> headers)
return post(query, null, null, null, headers, null);
}

public ClickHouseHttpResponse update(String query, InputStream data) throws IOException {
public ClickHouseHttpResponse update(String query, ClickHouseInputStream data) throws IOException {
return post(query, data, null, null, null, null);
}

public ClickHouseHttpResponse update(String query, InputStream data, Map<String, String> headers)
public ClickHouseHttpResponse update(String query, ClickHouseInputStream data, Map<String, String> headers)
throws IOException {
return post(query, data, null, null, headers, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ private ClickHouseHttpResponse buildResponse() throws IOException {
ClickHouseConfig c = config;
ClickHouseFormat format = c.getFormat();
TimeZone timeZone = c.getServerTimeZone();
boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable();
boolean hasQueryResult = false;
// queryId, format and timeZone are only available for queries
if (!ClickHouseChecker.isNullOrEmpty(queryId)) {
Expand Down Expand Up @@ -102,8 +103,9 @@ private ClickHouseHttpResponse buildResponse() throws IOException {
action = null;
}
return new ClickHouseHttpResponse(this,
hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action)
: ClickHouseClient.getResponseInputStream(c, source, action),
hasOutputFile ? ClickHouseInputStream.of(source, c.getReadBufferSize(), action)
: (hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action)
: ClickHouseClient.getResponseInputStream(c, source, action)),
displayName, queryId, summary, format, timeZone);
}

Expand Down Expand Up @@ -202,7 +204,7 @@ protected boolean isReusable() {
}

@Override
protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHouseExternalTable> tables,
protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables,
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException {
Charset charset = StandardCharsets.US_ASCII;
byte[] boundary = null;
Expand All @@ -216,16 +218,19 @@ protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHo
setHeaders(conn, headers);

ClickHouseConfig c = config;
final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable();
final boolean hasInput = data != null || boundary != null;
if (hasInput) {
conn.setChunkedStreamingMode(config.getRequestChunkSize());
} else {
// TODO conn.setFixedLengthStreamingMode(contentLength);
}
try (ClickHouseOutputStream out = hasInput
? ClickHouseClient.getAsyncRequestOutputStream(config, conn.getOutputStream(), null) // latch::countDown)
: ClickHouseClient.getRequestOutputStream(c, conn.getOutputStream(), null)) {
byte[] sqlBytes = sql.getBytes(StandardCharsets.UTF_8);
try (ClickHouseOutputStream out = hasFile
? ClickHouseOutputStream.of(conn.getOutputStream(), config.getWriteBufferSize())
: (hasInput
? ClickHouseClient.getAsyncRequestOutputStream(config, conn.getOutputStream(), null) // latch::countDown)
: ClickHouseClient.getRequestOutputStream(c, conn.getOutputStream(), null))) {
byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(StandardCharsets.UTF_8);
if (boundary != null) {
byte[] linePrefix = new byte[] { '\r', '\n', '-', '-' };
byte[] lineSuffix = new byte[] { '\r', '\n' };
Expand Down Expand Up @@ -268,7 +273,7 @@ protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHo
out.writeBytes(sqlBytes);
if (data != null && data.available() > 0) {
// append \n
if (sqlBytes[sqlBytes.length - 1] != (byte) '\n') {
if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') {
out.write(10);
}
ClickHouseInputStream.pipe(data, out, c.getWriteBufferSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseInputStream;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseOutputStream;
import com.clickhouse.client.ClickHousePipedOutputStream;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseSslContextProvider;
Expand Down Expand Up @@ -98,6 +99,7 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, HttpRespon
: timeZone;
}

boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable();
final InputStream source;
final Runnable action;
if (output != null) {
Expand All @@ -117,8 +119,9 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, HttpRespon
}

return new ClickHouseHttpResponse(this,
ClickHouseInputStream.wrap(null, source, config.getReadBufferSize(), action,
config.getResponseCompressAlgorithm(), config.getResponseCompressLevel()),
hasOutputFile ? ClickHouseInputStream.of(source, config.getReadBufferSize(), action)
: ClickHouseInputStream.wrap(null, source, config.getReadBufferSize(), action,
config.getResponseCompressAlgorithm(), config.getResponseCompressLevel()),
displayName, queryId, summary, format, timeZone);
}

Expand Down Expand Up @@ -194,7 +197,8 @@ private CompletableFuture<HttpResponse<InputStream>> postRequest(HttpRequest req
}

private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.Builder reqBuilder, String boundary,
String sql, InputStream data, List<ClickHouseExternalTable> tables) throws IOException {
String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables) throws IOException {
final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable();
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config,
null);
reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream));
Expand Down Expand Up @@ -228,12 +232,14 @@ private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.B
writer.write("\r\n--" + boundary + "--\r\n");
writer.flush();
} else {
writer.write(sql);
writer.flush();
if (!hasFile) {
writer.write(sql);
writer.flush();
}

if (data.available() > 0) {
// append \n
if (sql.charAt(sql.length() - 1) != '\n') {
if (!hasFile && sql.charAt(sql.length() - 1) != '\n') {
stream.write(10);
}

Expand Down Expand Up @@ -281,7 +287,7 @@ private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.B
}

@Override
protected ClickHouseHttpResponse post(String sql, InputStream data, List<ClickHouseExternalTable> tables,
protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List<ClickHouseExternalTable> tables,
String url, Map<String, String> headers, ClickHouseConfig config) throws IOException {
ClickHouseConfig c = config == null ? this.config : config;
HttpRequest.Builder reqBuilder = HttpRequest.newBuilder()
Expand Down
Loading

0 comments on commit d1a97b1

Please sign in to comment.