From d1a97b1658f9fd9eaa911f9b21fc978387383002 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Mon, 18 Jul 2022 20:59:45 +0800 Subject: [PATCH] support direct upload/download in http client --- .../com/clickhouse/client/ClickHouseFile.java | 4 +- .../client/ClickHouseInputStream.java | 2 +- .../client/ClickHouseOutputStream.java | 2 +- .../client/ClientIntegrationTest.java | 118 ++++++++++++++++++ .../client/http/ClickHouseHttpConnection.java | 28 +++-- .../client/http/HttpUrlConnectionImpl.java | 21 ++-- .../client/http/HttpClientConnectionImpl.java | 20 +-- .../http/ClickHouseHttpConnectionTest.java | 7 +- 8 files changed, 170 insertions(+), 32 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java index 8af775f00..8887c476e 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java @@ -68,7 +68,7 @@ public ClickHouseInputStream asInputStream() { try { return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()), (int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null, - getCompressionAlgorithm(), getCompressionLevel()); + ClickHouseCompression.NONE, getCompressionLevel()); } catch (FileNotFoundException e) { throw new IllegalArgumentException(e); } @@ -87,7 +87,7 @@ public ClickHouseOutputStream asOutputStream() { try { return ClickHouseOutputStream.wrap(this, new FileOutputStream(getFile()), (int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null, - getCompressionAlgorithm(), getCompressionLevel()); + ClickHouseCompression.NONE, getCompressionLevel()); } catch (FileNotFoundException e) { throw new IllegalArgumentException(e); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java index 2fec61da4..d7967e117 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java @@ -165,7 +165,7 @@ public static ClickHouseInputStream of(ClickHouseFile file, int bufferSize, Runn } try { return wrap(file, new FileInputStream(file.getFile()), bufferSize, postCloseAction, - file.getCompressionAlgorithm(), file.getCompressionLevel()); + ClickHouseCompression.NONE, file.getCompressionLevel()); } catch (FileNotFoundException e) { throw new IllegalArgumentException(e); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java index 33bd4c2ac..8968e44da 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java @@ -87,7 +87,7 @@ public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Run } try { return wrap(file, new FileOutputStream(file.getFile()), bufferSize, postCloseAction, - file.getCompressionAlgorithm(), file.getCompressionLevel()); + ClickHouseCompression.NONE, file.getCompressionLevel()); } catch (FileNotFoundException e) { throw new IllegalArgumentException(e); } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index a801e36ba..4635b3381 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -2,9 +2,14 @@ import java.io.BufferedReader; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -24,6 +29,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import com.clickhouse.client.ClickHouseClientBuilder.Agent; import com.clickhouse.client.config.ClickHouseBufferingMode; @@ -119,6 +126,16 @@ protected Object[][] getCompressionMatrix() { return array; } + @DataProvider(name = "fileProcessMatrix") + protected Object[][] getFileProcessMatrix() { + return new Object[][] { + { true, true }, + { true, false }, + { false, true }, + { false, false }, + }; + } + @DataProvider(name = "renameMethods") protected Object[][] getRenameMethods() { return new Object[][] { @@ -1035,6 +1052,47 @@ public void testDump() throws Exception { Files.delete(temp); } + @Test(dataProvider = "fileProcessMatrix", groups = "integration") + public void testDumpFile(boolean gzipCompressed, boolean useOneLiner) throws Exception { + ClickHouseNode server = getServer(); + if (server.getProtocol() != ClickHouseProtocol.HTTP) { + throw new SkipException("Skip as only http implementation works well"); + } + + File file = File.createTempFile("chc", ".data"); + ClickHouseFile wrappedFile = ClickHouseFile.of(file, + gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0, + ClickHouseFormat.CSV); + String query = "select number, if(number % 2 = 0, null, toString(number)) str from numbers(10)"; + if (useOneLiner) { + ClickHouseClient.dump(server, query, wrappedFile).get(); + } else { + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = client.connect(server).query(query).output(wrappedFile) + .executeAndWait()) { + // ignore + } + } + try (InputStream in = gzipCompressed ? new GZIPInputStream(new FileInputStream(file)) + : new FileInputStream(file); ByteArrayOutputStream out = new ByteArrayOutputStream()) { + ClickHouseInputStream.pipe(in, out, 512); + String content = new String(out.toByteArray(), StandardCharsets.US_ASCII); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < 10; i++) { + builder.append(i).append(','); + if (i % 2 == 0) { + builder.append("\\N"); + } else { + builder.append('"').append(i).append('"'); + } + builder.append('\n'); + } + Assert.assertEquals(content, builder.toString()); + } finally { + file.delete(); + } + } + @Test(groups = { "integration" }) public void testCustomLoad() throws Exception { ClickHouseNode server = getServer(); @@ -1117,6 +1175,66 @@ public void testLoadCsv() throws Exception { } } + @Test(dataProvider = "fileProcessMatrix", groups = "integration") + public void testLoadFile(boolean gzipCompressed, boolean useOneLiner) throws Exception { + ClickHouseNode server = getServer(); + if (server.getProtocol() != ClickHouseProtocol.HTTP) { + throw new SkipException("Skip as only http implementation works well"); + } + + File file = File.createTempFile("chc", ".data"); + Object[][] data = new Object[][] { + { 1, "12345" }, + { 2, "23456" }, + { 3, "\\N" }, + { 4, "x" }, + { 5, "y" }, + }; + try (OutputStream out = gzipCompressed ? new GZIPOutputStream(new FileOutputStream(file)) + : new FileOutputStream(file)) { + for (Object[] row : data) { + out.write((row[0] + "," + row[1]).getBytes(StandardCharsets.US_ASCII)); + if ((int) row[0] != 5) { + out.write(10); + } + } + out.flush(); + } + + ClickHouseClient.send(server, "drop table if exists test_load_file", + "create table test_load_file(a Int32, b Nullable(String))engine=Memory").get(); + ClickHouseFile wrappedFile = ClickHouseFile.of(file, + gzipCompressed ? ClickHouseCompression.GZIP : ClickHouseCompression.NONE, 0, + ClickHouseFormat.CSV); + if (useOneLiner) { + ClickHouseClient + .load(server, "test_load_file", wrappedFile) + .get(); + } else { + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = client.connect(server).write().table("test_load_file") + .data(wrappedFile).executeAndWait()) { + // ignore + } + } + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .query("select * from test_load_file order by a").executeAndWait()) { + int row = 0; + for (ClickHouseRecord r : response.records()) { + Assert.assertEquals(r.getValue(0).asObject(), data[row][0]); + if (row == 2) { + Assert.assertNull(r.getValue(1).asObject()); + } else { + Assert.assertEquals(r.getValue(1).asObject(), data[row][1]); + } + row++; + } + } finally { + file.delete(); + } + } + @Test(groups = { "integration" }) public void testLoadRawData() throws Exception { ClickHouseNode server = getServer(); diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index 64e73abe0..436e93c53 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -1,7 +1,6 @@ package com.clickhouse.client.http; import java.io.IOException; -import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.Charset; @@ -17,6 +16,7 @@ import com.clickhouse.client.ClickHouseCompression; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseRequest; @@ -59,15 +59,22 @@ static String buildQueryParams(ClickHouseRequest request) { appendQueryParameter(builder, cp.getKey(), cp.getValue()); } - if (config.isResponseCompressed()) { - // request server to compress response - appendQueryParameter(builder, "compress", "1"); - } - if (config.isRequestCompressed()) { + ClickHouseInputStream chIn = request.getInputStream().orElse(null); + if (chIn != null && chIn.getUnderlyingFile().isAvailable()) { + appendQueryParameter(builder, "query", request.getStatements().get(0)); + } else if (config.isRequestCompressed()) { // inform server that client's request is compressed appendQueryParameter(builder, "decompress", "1"); } + ClickHouseOutputStream chOut = request.getOutputStream().orElse(null); + if (chOut != null && chOut.getUnderlyingFile().isAvailable()) { + appendQueryParameter(builder, "enable_http_compression", "1"); + } else if (config.isResponseCompressed()) { + // request server to compress response + appendQueryParameter(builder, "compress", "1"); + } + Map settings = request.getSettings(); List stmts = request.getStatements(false); String settingKey = "max_execution_time"; @@ -263,8 +270,9 @@ protected Map mergeHeaders(Map requestHeaders) { * @throws IOException when error occured posting request and/or server failed * to respond */ - protected abstract ClickHouseHttpResponse post(String query, InputStream data, List tables, - String url, Map headers, ClickHouseConfig config) throws IOException; + protected abstract ClickHouseHttpResponse post(String query, ClickHouseInputStream data, + List tables, String url, Map headers, ClickHouseConfig config) + throws IOException; /** * Checks whether the connection is reusable or not. This method will be called @@ -296,11 +304,11 @@ public ClickHouseHttpResponse update(String query, Map headers) return post(query, null, null, null, headers, null); } - public ClickHouseHttpResponse update(String query, InputStream data) throws IOException { + public ClickHouseHttpResponse update(String query, ClickHouseInputStream data) throws IOException { return post(query, data, null, null, null, null); } - public ClickHouseHttpResponse update(String query, InputStream data, Map headers) + public ClickHouseHttpResponse update(String query, ClickHouseInputStream data, Map headers) throws IOException { return post(query, data, null, null, headers, null); } diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java index 86fe4b1b7..41ef9a7b7 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java @@ -73,6 +73,7 @@ private ClickHouseHttpResponse buildResponse() throws IOException { ClickHouseConfig c = config; ClickHouseFormat format = c.getFormat(); TimeZone timeZone = c.getServerTimeZone(); + boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable(); boolean hasQueryResult = false; // queryId, format and timeZone are only available for queries if (!ClickHouseChecker.isNullOrEmpty(queryId)) { @@ -102,8 +103,9 @@ private ClickHouseHttpResponse buildResponse() throws IOException { action = null; } return new ClickHouseHttpResponse(this, - hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action) - : ClickHouseClient.getResponseInputStream(c, source, action), + hasOutputFile ? ClickHouseInputStream.of(source, c.getReadBufferSize(), action) + : (hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action) + : ClickHouseClient.getResponseInputStream(c, source, action)), displayName, queryId, summary, format, timeZone); } @@ -202,7 +204,7 @@ protected boolean isReusable() { } @Override - protected ClickHouseHttpResponse post(String sql, InputStream data, List tables, + protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List tables, String url, Map headers, ClickHouseConfig config) throws IOException { Charset charset = StandardCharsets.US_ASCII; byte[] boundary = null; @@ -216,16 +218,19 @@ protected ClickHouseHttpResponse post(String sql, InputStream data, List 0) { // append \n - if (sqlBytes[sqlBytes.length - 1] != (byte) '\n') { + if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') { out.write(10); } ClickHouseInputStream.pipe(data, out, c.getWriteBufferSize()); diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java index 499c7bfa4..5418fd0c1 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java @@ -7,6 +7,7 @@ import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHousePipedOutputStream; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseSslContextProvider; @@ -98,6 +99,7 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, HttpRespon : timeZone; } + boolean hasOutputFile = output != null && output.getUnderlyingFile().isAvailable(); final InputStream source; final Runnable action; if (output != null) { @@ -117,8 +119,9 @@ private ClickHouseHttpResponse buildResponse(ClickHouseConfig config, HttpRespon } return new ClickHouseHttpResponse(this, - ClickHouseInputStream.wrap(null, source, config.getReadBufferSize(), action, - config.getResponseCompressAlgorithm(), config.getResponseCompressLevel()), + hasOutputFile ? ClickHouseInputStream.of(source, config.getReadBufferSize(), action) + : ClickHouseInputStream.wrap(null, source, config.getReadBufferSize(), action, + config.getResponseCompressAlgorithm(), config.getResponseCompressLevel()), displayName, queryId, summary, format, timeZone); } @@ -194,7 +197,8 @@ private CompletableFuture> postRequest(HttpRequest req } private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.Builder reqBuilder, String boundary, - String sql, InputStream data, List tables) throws IOException { + String sql, ClickHouseInputStream data, List tables) throws IOException { + final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable(); ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null); reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream)); @@ -228,12 +232,14 @@ private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.B writer.write("\r\n--" + boundary + "--\r\n"); writer.flush(); } else { - writer.write(sql); - writer.flush(); + if (!hasFile) { + writer.write(sql); + writer.flush(); + } if (data.available() > 0) { // append \n - if (sql.charAt(sql.length() - 1) != '\n') { + if (!hasFile && sql.charAt(sql.length() - 1) != '\n') { stream.write(10); } @@ -281,7 +287,7 @@ private ClickHouseHttpResponse postString(ClickHouseConfig config, HttpRequest.B } @Override - protected ClickHouseHttpResponse post(String sql, InputStream data, List tables, + protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List tables, String url, Map headers, ClickHouseConfig config) throws IOException { ClickHouseConfig c = config == null ? this.config : config; HttpRequest.Builder reqBuilder = HttpRequest.newBuilder() diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java index 71e9bae4d..8bb67d6ff 100644 --- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpConnectionTest.java @@ -1,13 +1,13 @@ package com.clickhouse.client.http; import java.io.IOException; -import java.io.InputStream; import java.util.List; import java.util.Map; import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseRequest; @@ -24,8 +24,9 @@ protected SimpleHttpConnection(ClickHouseNode server, ClickHouseRequest reque } @Override - protected ClickHouseHttpResponse post(String query, InputStream data, List tables, - String url, Map headers, ClickHouseConfig config) throws IOException { + protected ClickHouseHttpResponse post(String query, ClickHouseInputStream data, + List tables, String url, Map headers, ClickHouseConfig config) + throws IOException { return null; }