diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java
index b899b64cc..2cb2d9c73 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java
@@ -68,17 +68,6 @@ default void setObject(int parameterIndex, Object x, int targetSqlType) throws S
         setObject(parameterIndex, x, targetSqlType, 0);
     }
 
-    @Override
-    default boolean execute() throws SQLException {
-        return executeQuery() != null;
-    }
-
-    @Override
-    default void addBatch(String sql) throws SQLException {
-        throw SqlExceptionUtils
-                .unsupportedError("addBatch(String) cannot be called in PreparedStatement or CallableStatement!");
-    }
-
     @Override
     default void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
         String s = null;
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java
index 10f7a7a98..da5b824be 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/SqlExceptionUtils.java
@@ -1,6 +1,7 @@
 package com.clickhouse.jdbc;
 
 import java.net.ConnectException;
+import java.sql.BatchUpdateException;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 
@@ -76,6 +77,53 @@ public static SQLException handle(Throwable e) {
         return new SQLException(cause);
     }
 
+    public static BatchUpdateException batchUpdateError(Throwable e, long[] updateCounts) {
+        if (e == null) {
+            return new BatchUpdateException("Something went wrong when performing batch update", SQL_STATE_CLIENT_ERROR,
+                    0, updateCounts, null);
+        } else if (e instanceof BatchUpdateException) {
+            return (BatchUpdateException) e;
+        } else if (e instanceof ClickHouseException) {
+            return batchUpdateError(e.getCause(), updateCounts);
+        } else if (e instanceof SQLException) {
+            SQLException sqlExp = (SQLException) e;
+            return new BatchUpdateException(sqlExp.getMessage(), sqlExp.getSQLState(), sqlExp.getErrorCode(),
+                    updateCounts, null);
+        }
+
+        Throwable cause = e.getCause();
+        if (e instanceof BatchUpdateException) {
+            return (BatchUpdateException) e;
+        } else if (cause instanceof ClickHouseException) {
+            return batchUpdateError(cause, updateCounts);
+        } else if (e instanceof SQLException) {
+            SQLException sqlExp = (SQLException) e;
+            return new BatchUpdateException(sqlExp.getMessage(), sqlExp.getSQLState(), sqlExp.getErrorCode(),
+                    updateCounts, null);
+        } else if (cause == null) {
+            cause = e;
+        }
+
+        return new BatchUpdateException("Unexpected error", SQL_STATE_SQL_ERROR, 0, updateCounts, cause);
+    }
+
+    public static SQLException emptyBatchError() {
+        return clientError("Please call addBatch method at least once before batch execution");
+    }
+
+    public static BatchUpdateException queryInBatchError(int[] updateCounts) {
+        return new BatchUpdateException("Query is not allowed in batch update", SQL_STATE_CLIENT_ERROR, updateCounts);
+    }
+
+    public static BatchUpdateException queryInBatchError(long[] updateCounts) {
+        return new BatchUpdateException("Query is not allowed in batch update", SQL_STATE_CLIENT_ERROR, 0, updateCounts,
+                null);
+    }
+
+    public static SQLException undeterminedExecutionError() {
+        return clientError("Please either call clearBatch() to clean up context first, or use executeBatch() 
instead"); + } + public static SQLException forCancellation(Exception e) { Throwable cause = e.getCause(); if (cause == null) { diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java new file mode 100644 index 000000000..5fe5cfaa9 --- /dev/null +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/AbstractPreparedStatement.java @@ -0,0 +1,153 @@ +package com.clickhouse.jdbc.internal; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.jdbc.SqlExceptionUtils; + +public abstract class AbstractPreparedStatement extends ClickHouseStatementImpl implements PreparedStatement { + protected AbstractPreparedStatement(ClickHouseConnectionImpl connection, ClickHouseRequest request, + int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + super(connection, request, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + protected abstract long[] executeAny(boolean asBatch) throws SQLException; + + @Override + public final void addBatch(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String, int) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql, int[] columnIndexes) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String, int[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final boolean execute(String sql, String[] columnNames) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "execute(String, String[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public long[] executeLargeBatch() throws SQLException { + return executeAny(true); + } + + @Override + public final long executeLargeUpdate(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String, int) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeLargeUpdate(String, int[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final long executeLargeUpdate(String sql, String[] columnNames) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + 
"executeLargeUpdate(String, String[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final ResultSet executeQuery(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeQuery(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate() throws SQLException { + return (int) executeLargeUpdate(); + } + + @Override + public final int executeUpdate(String sql) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpate(String) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpdate(String, int) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpdate(String, int[]) cannot be called in PreparedStatement or CallableStatement!"); + } + + @Override + public final int executeUpdate(String sql, String[] columnNames) throws SQLException { + ensureOpen(); + + throw SqlExceptionUtils + .unsupportedError( + "executeUpdate(String, String[]) cannot be called in PreparedStatement or CallableStatement!"); + } +} diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java index b75f72357..92d726d42 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java @@ -608,7 +608,7 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), ClickHouseColumn.parse(parsedStmt.getInput()), resultSetType, resultSetConcurrency, resultSetHoldability); - } else if (!parsedStmt.containsKeyword("SELECT") && + } else if (!parsedStmt.containsKeyword("SELECT") && !parsedStmt.hasValues() && (!parsedStmt.hasFormat() || clientRequest.getFormat().name().equals(parsedStmt.getFormat()))) { ps = new InputBasedPreparedStatement(this, clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java index 3ce069c90..d1a314ac4 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java @@ -7,6 +7,7 @@ import java.sql.SQLWarning; import java.sql.Statement; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -22,7 +23,6 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.config.ClickHouseOption; import com.clickhouse.client.data.ClickHouseExternalTable; -import com.clickhouse.client.data.ClickHouseSimpleResponse; import com.clickhouse.client.logging.Logger; import com.clickhouse.client.logging.LoggerFactory; import com.clickhouse.jdbc.ClickHouseConnection; @@ -43,6 +43,8 @@ 
public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt private final int resultSetConcurrency; private final int resultSetHoldability; + private final List batchStmts; + private boolean closed; private boolean closeOnCompletion; @@ -50,16 +52,15 @@ public class ClickHouseStatementImpl extends JdbcWrapper implements ClickHouseSt private boolean escapeScan; private int fetchSize; private int maxFieldSize; - private int maxRows; + private long maxRows; private boolean poolable; private volatile String queryId; private int queryTimeout; private ClickHouseResultSet currentResult; - private int currentUpdateCount; + private long currentUpdateCount; protected ClickHouseSqlStatement[] parsedStmts; - protected List batchStmts; private ClickHouseResponse getLastResponse(Map options, List tables, Map settings) throws SQLException { @@ -149,9 +150,8 @@ protected ClickHouseResponse executeStatement(ClickHouseSqlStatement stmt, protected int executeInsert(String sql, InputStream input) throws SQLException { ClickHouseResponseSummary summary = null; try (ClickHouseResponse resp = request.write().query(sql, queryId = connection.newQueryId()) - .format(ClickHouseFormat.RowBinary).data(input).execute() - .get()) { - updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp); + .format(ClickHouseFormat.RowBinary).data(input).execute().get(); + ResultSet rs = updateResult(new ClickHouseSqlStatement(sql, StatementType.INSERT), resp)) { summary = resp.getSummary(); } catch (InterruptedException e) { log.error("can not close stream: %s", e.getMessage()); @@ -191,22 +191,28 @@ protected ClickHouseSqlStatement parseSqlStatements(String sql) { return getLastStatement(); } + protected ClickHouseResultSet newEmptyResultSet() throws SQLException { + return new ClickHouseResultSet("", "", this, ClickHouseResponse.EMPTY); + } + protected ResultSet updateResult(ClickHouseSqlStatement stmt, ClickHouseResponse response) throws SQLException { ResultSet rs = null; if (stmt.isQuery() || !response.getColumns().isEmpty()) { - currentUpdateCount = -1; + currentUpdateCount = -1L; currentResult = new ClickHouseResultSet(stmt.getDatabaseOrDefault(getConnection().getCurrentDatabase()), stmt.getTable(), this, response); rs = currentResult; } else { - currentUpdateCount = response.getSummary().getUpdateCount(); - if (currentUpdateCount <= 0) { - currentUpdateCount = 1; + currentUpdateCount = response.getSummary().getWrittenRows(); + // FIXME apparently this is not always true + if (currentUpdateCount <= 0L) { + currentUpdateCount = 1L; } + currentResult = null; response.close(); } - return rs; + return rs == null ? 
newEmptyResultSet() : rs; } protected ClickHouseStatementImpl(ClickHouseConnectionImpl connection, ClickHouseRequest request, @@ -228,29 +234,34 @@ protected ClickHouseStatementImpl(ClickHouseConnectionImpl connection, ClickHous this.fetchSize = connection.getJdbcConfig().getFetchSize(); this.maxFieldSize = 0; - this.maxRows = 0; + this.maxRows = 0L; this.poolable = false; this.queryId = null; this.queryTimeout = 0; this.currentResult = null; - this.currentUpdateCount = -1; + this.currentUpdateCount = -1L; - this.batchStmts = new ArrayList<>(); + this.batchStmts = new LinkedList<>(); ClickHouseConfig c = request.getConfig(); - setMaxRows(c.getMaxResultRows()); + setLargeMaxRows(c.getMaxResultRows()); setQueryTimeout(c.getMaxExecutionTime()); } + @Override + public boolean execute(String sql) throws SQLException { + executeQuery(sql); + return currentResult != null; + } + @Override public ResultSet executeQuery(String sql) throws SQLException { ensureOpen(); - // forcibly disable extremes for ResultSet queries - // additionalDBParams = importAdditionalDBParameters(additionalDBParams); - // FIXME respect the value set in additionalDBParams? - // additionalDBParams.put(ClickHouseQueryParam.EXTREMES, "0"); + if (!batchStmts.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } parseSqlStatements(sql); @@ -268,8 +279,11 @@ public ResultSet executeQuery(String sql) throws SQLException { } @Override - public int executeUpdate(String sql) throws SQLException { + public long executeLargeUpdate(String sql) throws SQLException { ensureOpen(); + if (!batchStmts.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } parseSqlStatements(sql); @@ -280,7 +294,12 @@ public int executeUpdate(String sql) throws SQLException { throw SqlExceptionUtils.handle(e); } - return summary != null ? (int) summary.getWrittenRows() : 1; + return summary != null ? summary.getWrittenRows() : 1L; + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return (int) executeLargeUpdate(sql); } @Override @@ -310,21 +329,26 @@ public void setMaxFieldSize(int max) throws SQLException { } @Override - public int getMaxRows() throws SQLException { + public long getLargeMaxRows() throws SQLException { ensureOpen(); return maxRows; } @Override - public void setMaxRows(int max) throws SQLException { - if (max < 0) { + public int getMaxRows() throws SQLException { + return (int) getLargeMaxRows(); + } + + @Override + public void setLargeMaxRows(long max) throws SQLException { + if (max < 0L) { throw SqlExceptionUtils.clientError("Max rows cannot be set to negative number"); } ensureOpen(); if (this.maxRows != max) { - if (max == 0) { + if (max == 0L) { request.removeSetting("max_result_rows"); request.removeSetting("result_overflow_mode"); } else { @@ -335,6 +359,11 @@ public void setMaxRows(int max) throws SQLException { } } + @Override + public void setMaxRows(int max) throws SQLException { + setLargeMaxRows(max); + } + @Override public void setEscapeProcessing(boolean enable) throws SQLException { ensureOpen(); @@ -402,13 +431,6 @@ public void setCursorName(String name) throws SQLException { cursorName = name; } - @Override - public boolean execute(String sql) throws SQLException { - // currentResult is stored here. 
InputString and currentResult will be closed on - // this.close() - return executeQuery(sql) != null; - } - @Override public ResultSet getResultSet() throws SQLException { ensureOpen(); @@ -417,12 +439,17 @@ public ResultSet getResultSet() throws SQLException { } @Override - public int getUpdateCount() throws SQLException { + public long getLargeUpdateCount() throws SQLException { ensureOpen(); return currentUpdateCount; } + @Override + public int getUpdateCount() throws SQLException { + return (int) getLargeUpdateCount(); + } + @Override public boolean getMoreResults() throws SQLException { ensureOpen(); @@ -431,7 +458,7 @@ public boolean getMoreResults() throws SQLException { currentResult.close(); currentResult = null; } - currentUpdateCount = -1; + currentUpdateCount = -1L; return false; } @@ -504,29 +531,46 @@ public void addBatch(String sql) throws SQLException { public void clearBatch() throws SQLException { ensureOpen(); - this.batchStmts = new ArrayList<>(); + this.batchStmts.clear(); } @Override public int[] executeBatch() throws SQLException { + long[] largeUpdateCounts = executeLargeBatch(); + + int len = largeUpdateCounts.length; + int[] results = new int[len]; + for (int i = 0; i < len; i++) { + results[i] = (int) largeUpdateCounts[i]; + } + return results; + } + + @Override + public long[] executeLargeBatch() throws SQLException { ensureOpen(); + if (batchStmts.isEmpty()) { + throw SqlExceptionUtils.emptyBatchError(); + } boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int len = batchStmts.size(); - int[] results = new int[len]; + long[] results = new long[batchStmts.size()]; try { - for (int i = 0; i < len; i++) { - ClickHouseSqlStatement s = batchStmts.get(i); - try (ClickHouseResponse r = executeStatement(s, null, null, null)) { - updateResult(s, r); - results[i] = currentUpdateCount <= 0 ? 0 : currentUpdateCount; + int i = 0; + for (ClickHouseSqlStatement s : batchStmts) { + try (ClickHouseResponse r = executeStatement(s, null, null, null); ResultSet rs = updateResult(s, r)) { + if (currentResult != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } + results[i] = currentUpdateCount <= 0L ? 
0L : currentUpdateCount;
                 } catch (Exception e) {
+                    results[i] = EXECUTE_FAILED;
                     if (!continueOnError) {
-                        throw SqlExceptionUtils.handle(e);
+                        throw SqlExceptionUtils.batchUpdateError(e, results);
                     }
-
-                    results[i] = EXECUTE_FAILED;
-                    log.error("Faled to execute task %d of %d", i + 1, len, e);
+                    log.error("Failed to execute task %d of %d", i + 1, batchStmts.size(), e);
+                } finally {
+                    i++;
                 }
             }
         } finally {
@@ -559,8 +603,7 @@ public boolean getMoreResults(int current) throws SQLException {
     public ResultSet getGeneratedKeys() throws SQLException {
         ensureOpen();
 
-        return new ClickHouseResultSet(request.getConfig().getDatabase(), "unknown", this,
-                ClickHouseSimpleResponse.EMPTY);
+        return new ClickHouseResultSet(request.getConfig().getDatabase(), "unknown", this, ClickHouseResponse.EMPTY);
     }
 
     @Override
diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java
index 9fb3ba02c..f6e9d35bc 100644
--- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java
+++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
 import java.sql.Array;
 import java.sql.Date;
 import java.sql.ParameterMetaData;
@@ -33,7 +32,7 @@
 import com.clickhouse.jdbc.ClickHousePreparedStatement;
 import com.clickhouse.jdbc.SqlExceptionUtils;
 
-public class InputBasedPreparedStatement extends ClickHouseStatementImpl implements ClickHousePreparedStatement {
+public class InputBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement {
     private static final Logger log = LoggerFactory.getLogger(InputBasedPreparedStatement.class);
 
     private final Calendar defaultCalendar;
@@ -88,6 +87,56 @@ protected void ensureParams() throws SQLException {
         }
     }
 
+    @Override
+    protected long[] executeAny(boolean asBatch) throws SQLException {
+        ensureOpen();
+        boolean continueOnError = false;
+        if (asBatch) {
+            if (counter < 1) {
+                throw SqlExceptionUtils.emptyBatchError();
+            }
+            continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError();
+        } else {
+            if (counter != 0) {
+                throw SqlExceptionUtils.undeterminedExecutionError();
+            }
+            addBatch();
+        }
+
+        long[] results = new long[counter];
+        long rows = 0;
+        try {
+            stream.close();
+            rows = executeInsert(getRequest().getStatements(false).get(0), stream.getInput());
+            if (asBatch && getResultSet() != null) {
+                throw SqlExceptionUtils.queryInBatchError(results);
+            }
+            // FIXME grpc and tcp by default can provide accurate results
+            Arrays.fill(results, 1);
+        } catch (Exception e) {
+            // just a wild guess...
+ if (rows < 1) { + results[0] = EXECUTE_FAILED; + } else { + if (rows >= counter) { + rows = counter; + } + for (int i = 0, len = (int) rows - 1; i < len; i++) { + results[i] = 1; + } + results[(int) rows] = EXECUTE_FAILED; + } + if (!continueOnError) { + throw SqlExceptionUtils.batchUpdateError(e, results); + } + log.error("Failed to execute batch insert of %d records", counter + 1, e); + } finally { + clearBatch(); + } + + return results; + } + protected int toArrayIndex(int parameterIndex) throws SQLException { if (parameterIndex < 1 || parameterIndex > values.length) { throw SqlExceptionUtils.clientError(ClickHouseUtils @@ -99,16 +148,32 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { @Override public ResultSet executeQuery() throws SQLException { - throw SqlExceptionUtils.clientError("Input function can be only used for insertion not query"); + ensureParams(); + // log.warn("Input function can be only used for insertion not query"); + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } + + ResultSet rs = getResultSet(); + if (rs != null) { // should not happen + try { + rs.close(); + } catch (Exception e) { + // ignore + } + } + return newEmptyResultSet(); } @Override - public int executeUpdate() throws SQLException { + public long executeLargeUpdate() throws SQLException { ensureParams(); - addBatch(); - int row = getUpdateCount(); - return row > 0 ? row : 0; + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } + long row = getLargeUpdateCount(); + return row > 0L ? row : 0L; } @Override @@ -214,19 +279,12 @@ public void setObject(int parameterIndex, Object x) throws SQLException { public boolean execute() throws SQLException { ensureParams(); - addBatch(); - executeBatch(); + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Execution failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } return false; } - @Override - public void addBatch(String sql) throws SQLException { - ensureOpen(); - - throw SqlExceptionUtils - .unsupportedError("addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); - } - @Override public void addBatch() throws SQLException { ensureOpen(); @@ -249,33 +307,6 @@ public void addBatch() throws SQLException { clearParameters(); } - @Override - public int[] executeBatch() throws SQLException { - ensureOpen(); - - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int[] results = new int[counter]; - int result = 0; - try { - stream.close(); - result = executeInsert(getRequest().getStatements(false).get(0), stream.getInput()); - } catch (Exception e) { - if (!continueOnError) { - throw SqlExceptionUtils.handle(e); - } - - result = EXECUTE_FAILED; - log.error("Failed to execute batch insert of %d records", counter + 1, e); - } finally { - clearBatch(); - } - - // FIXME grpc and tcp by default can provides accurate result - Arrays.fill(results, 1); - - return results; - } - @Override public void clearBatch() throws SQLException { ensureOpen(); diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java index 0492b5f2a..047599cc7 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java +++ 
b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/SqlBasedPreparedStatement.java @@ -38,7 +38,7 @@ import com.clickhouse.jdbc.SqlExceptionUtils; import com.clickhouse.jdbc.parser.ClickHouseSqlStatement; -public class SqlBasedPreparedStatement extends ClickHouseStatementImpl implements ClickHousePreparedStatement { +public class SqlBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement { private static final Logger log = LoggerFactory.getLogger(SqlBasedPreparedStatement.class); private final Calendar defaultCalendar; @@ -116,38 +116,64 @@ protected void ensureParams() throws SQLException { } } - protected int[] executeBatch(boolean keepLastResponse) throws SQLException { + @Override + protected long[] executeAny(boolean asBatch) throws SQLException { ensureOpen(); + boolean continueOnError = false; + if (asBatch) { + if (counter < 1) { + throw SqlExceptionUtils.emptyBatchError(); + } + continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + } else { + if (counter != 0) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + addBatch(); + } - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int[] results = new int[counter]; + long[] results = new long[counter]; ClickHouseResponse r = null; if (builder.length() > 0) { // insert ... values - int result = 0; + long rows = 0L; try { r = executeStatement(builder.toString(), null, null, null); updateResult(parsedStmt, r); - long rows = r.getSummary().getWrittenRows(); - if (rows > 0 && rows != counter) { - log.warn("Expect %d rows being inserted but got %d", counter, rows); + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); } - - result = 1; + rows = r.getSummary().getWrittenRows(); + // no effective rows for update and delete, and the number for insertion is not + // accurate as well + // if (rows > 0L && rows != counter) { + // log.warn("Expect %d rows being inserted but only got %d", counter, rows); + // } + // FIXME needs to enhance http client before getting back to this + Arrays.fill(results, 1); } catch (Exception e) { + // just a wild guess... + if (rows < 1) { + results[0] = EXECUTE_FAILED; + } else { + if (rows >= counter) { + rows = counter; + } + for (int i = 0, len = (int) rows - 1; i < len; i++) { + results[i] = 1; + } + results[(int) rows] = EXECUTE_FAILED; + } + if (!continueOnError) { - throw SqlExceptionUtils.handle(e); + throw SqlExceptionUtils.batchUpdateError(e, results); } - // actually we don't know which ones failed - result = EXECUTE_FAILED; log.error("Failed to execute batch insertion of %d records", counter, e); } finally { - if (!keepLastResponse && r != null) { + if (asBatch && r != null) { r.close(); } clearBatch(); } - - Arrays.fill(results, result); } else { int index = 0; try { @@ -157,20 +183,21 @@ protected int[] executeBatch(boolean keepLastResponse) throws SQLException { try { r = executeStatement(builder.toString(), null, null, null); updateResult(parsedStmt, r); + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } int count = getUpdateCount(); results[index] = count > 0 ? 
count : 0; } catch (Exception e) { + results[index] = EXECUTE_FAILED; if (!continueOnError) { - throw SqlExceptionUtils.handle(e); + throw SqlExceptionUtils.batchUpdateError(e, results); } - results[index] = EXECUTE_FAILED; log.error("Failed to execute batch insert at %d of %d", index + 1, counter, e); } finally { index++; - if (!keepLastResponse || index < counter || results[index - 1] == EXECUTE_FAILED) { - if (r != null) { - r.close(); - } + if (asBatch && r != null) { + r.close(); } } } @@ -195,28 +222,21 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { public ResultSet executeQuery() throws SQLException { ensureParams(); - addBatch(); - int[] results = executeBatch(true); - for (int i = 0; i < results.length; i++) { - if (results[i] == EXECUTE_FAILED) { - throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); - } + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); } - return getResultSet(); + ResultSet rs = getResultSet(); + return rs == null ? newEmptyResultSet() : rs; } @Override - public int executeUpdate() throws SQLException { + public long executeLargeUpdate() throws SQLException { ensureParams(); - addBatch(); - int[] results = executeBatch(false); - for (int i = 0; i < results.length; i++) { - if (results[i] == EXECUTE_FAILED) { - throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); - } + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); } - return getUpdateCount(); + return getLargeUpdateCount(); } @Override @@ -376,8 +396,9 @@ public void setObject(int parameterIndex, Object x) throws SQLException { public boolean execute() throws SQLException { ensureParams(); - addBatch(); - executeBatch(true); + if (executeAny(false)[0] == EXECUTE_FAILED) { + throw new SQLException("Execution failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR); + } return getResultSet() != null; } @@ -413,11 +434,6 @@ public void addBatch() throws SQLException { clearParameters(); } - @Override - public int[] executeBatch() throws SQLException { - return executeBatch(false); - } - @Override public void clearBatch() throws SQLException { ensureOpen(); diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java index a2aa9c13f..edbd5d21a 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/TableBasedPreparedStatement.java @@ -26,7 +26,7 @@ import com.clickhouse.jdbc.SqlExceptionUtils; import com.clickhouse.jdbc.parser.ClickHouseSqlStatement; -public class TableBasedPreparedStatement extends ClickHouseStatementImpl implements ClickHousePreparedStatement { +public class TableBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement { private static final Logger log = LoggerFactory.getLogger(TableBasedPreparedStatement.class); private static final String ERROR_SET_TABLE = "Please use setObject(ClickHouseExternalTable) method instead"; @@ -69,6 +69,50 @@ protected void ensureParams() throws SQLException { } } + @Override + public long[] executeAny(boolean asBatch) throws SQLException { + ensureOpen(); + boolean continueOnError = false; + if (asBatch) { + if (batch.isEmpty()) { + throw 
SqlExceptionUtils.emptyBatchError(); + } + continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + } else { + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + addBatch(); + } + + long[] results = new long[batch.size()]; + int index = 0; + try { + String sql = getSql(); + for (List list : batch) { + try (ClickHouseResponse r = executeStatement(sql, null, list, null); + ResultSet rs = updateResult(parsedStmt, r)) { + if (asBatch && getResultSet() != null) { + throw SqlExceptionUtils.queryInBatchError(results); + } + long rows = getLargeUpdateCount(); + results[index] = rows > 0L ? rows : 0L; + } catch (Exception e) { + results[index] = EXECUTE_FAILED; + if (!continueOnError) { + throw SqlExceptionUtils.batchUpdateError(e, results); + } + log.error("Failed to execute batch insert at %d of %d", index + 1, batch.size(), e); + } + index++; + } + } finally { + clearBatch(); + } + + return results; + } + protected String getSql() { // why? because request can be modified so it might not always same as // parsedStmt.getSQL() @@ -87,18 +131,24 @@ protected int toArrayIndex(int parameterIndex) throws SQLException { @Override public ResultSet executeQuery() throws SQLException { ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getSql()); return updateResult(parsedStmt, executeStatement(stmt, null, Arrays.asList(values), null)); } @Override - public int executeUpdate() throws SQLException { + public long executeLargeUpdate() throws SQLException { ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } try (ClickHouseResponse r = executeStatement(getSql(), null, Arrays.asList(values), null)) { updateResult(parsedStmt, r); - return getUpdateCount(); + return getLargeUpdateCount(); } } @@ -171,18 +221,13 @@ public void setObject(int parameterIndex, Object x) throws SQLException { @Override public boolean execute() throws SQLException { ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } ClickHouseSqlStatement stmt = new ClickHouseSqlStatement(getSql()); - ClickHouseResponse r = executeStatement(stmt, null, Arrays.asList(values), null); - return updateResult(parsedStmt, r) != null; - } - - @Override - public void addBatch(String sql) throws SQLException { - ensureOpen(); - - throw SqlExceptionUtils - .unsupportedError("addBatch(String) cannot be called in PreparedStatement or CallableStatement!"); + updateResult(parsedStmt, executeStatement(stmt, null, Arrays.asList(values), null)); + return getResultSet() != null; } @Override @@ -198,38 +243,6 @@ public void addBatch() throws SQLException { clearParameters(); } - @Override - public int[] executeBatch() throws SQLException { - ensureOpen(); - - boolean continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); - int len = batch.size(); - int[] results = new int[len]; - int counter = 0; - try { - String sql = getSql(); - for (List list : batch) { - try (ClickHouseResponse r = executeStatement(sql, null, list, null)) { - updateResult(parsedStmt, r); - int rows = getUpdateCount(); - results[counter] = rows > 0 ? 
rows : 0; - } catch (Exception e) { - if (!continueOnError) { - throw SqlExceptionUtils.handle(e); - } - - results[counter] = EXECUTE_FAILED; - log.error("Failed to execute batch insert at %d of %d", counter + 1, len, e); - } - counter++; - } - } finally { - clearBatch(); - } - - return results; - } - @Override public void clearBatch() throws SQLException { ensureOpen(); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 5221226d2..ec9602eb1 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -3,6 +3,8 @@ import java.io.ByteArrayInputStream; import java.net.Inet4Address; import java.net.Inet6Address; +import java.sql.BatchUpdateException; +import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -22,6 +24,8 @@ import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseBitmap; import com.clickhouse.client.data.ClickHouseExternalTable; +import com.clickhouse.jdbc.internal.InputBasedPreparedStatement; +import com.clickhouse.jdbc.internal.SqlBasedPreparedStatement; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -35,6 +39,51 @@ private Object[][] getTypedParameters() { LocalDateTime.of(2021, 11, 2, 2, 3, 4) } } }; } + @DataProvider(name = "statementAndParams") + private Object[][] getStatementAndParameters() { + return new Object[][] { + // ddl + new Object[] { "ddl", "drop table if exists non_existing_table", SqlBasedPreparedStatement.class, false, + null, false }, + // query + new Object[] { "select1", "select 1", SqlBasedPreparedStatement.class, true, null, false }, + new Object[] { "select_param", "select ?", SqlBasedPreparedStatement.class, true, new String[] { "1" }, + false }, + // mutation + new Object[] { "insert_static", "insert into $table values(1)", + SqlBasedPreparedStatement.class, false, null, + false }, + new Object[] { "insert_table", "insert into $table", InputBasedPreparedStatement.class, false, + new String[] { "2" }, true }, + new Object[] { "insert_param", "insert into $table values(?)", SqlBasedPreparedStatement.class, + false, + new String[] { "3" }, true }, + new Object[] { "insert_input", "insert into $table select s from input('s String')", + InputBasedPreparedStatement.class, false, new String[] { "4" }, true }, + }; + } + + private void setParameters(PreparedStatement ps, String[] params) throws SQLException { + if (params != null) { + for (int i = 0; i < params.length; i++) { + ps.setString(i + 1, params[i]); + } + } + } + + private void checkTable(Statement stmt, String query, String[] results) throws SQLException { + if (results == null) { + return; + } + try (ResultSet rs = stmt.executeQuery(query)) { + for (int i = 0; i < results.length; i++) { + Assert.assertTrue(rs.next(), "Should have next row"); + Assert.assertEquals(rs.getString(1), results[i]); + } + Assert.assertFalse(rs.next(), "Should not have next row"); + } + } + @Test(groups = "integration") public void testReadWriteBinaryString() throws SQLException { Properties props = new Properties(); @@ -237,6 +286,34 @@ public void testReadWriteDateTime() throws SQLException { } } + @Test(groups = "integration") + public void testReadWriteDateTimeWithNanos() throws SQLException { + try 
(ClickHouseConnection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + stmt.execute("drop table if exists test_read_write_datetime_nanos;" + + "CREATE TABLE test_read_write_datetime_nanos (id UUID, date DateTime64(3)) ENGINE = MergeTree() ORDER BY (id, date)"); + UUID id = UUID.randomUUID(); + long value = 1617359745321000L; + Instant i = Instant.ofEpochMilli(value / 1000L); + LocalDateTime dt = LocalDateTime.ofInstant(i, conn.getServerTimeZone().toZoneId()); + try (PreparedStatement ps = conn + .prepareStatement("insert into test_read_write_datetime_nanos values(?,?)")) { + ps.setObject(1, id); + ps.setObject(2, dt); + // below works too but too slow + // ps.setTimestamp(2, new Timestamp(value / 1000L)); + ps.executeUpdate(); + } + + ResultSet rs = stmt.executeQuery("select * from test_read_write_datetime_nanos"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getObject(1), id); + Assert.assertEquals(rs.getObject(2), dt); + // rs.getString(2) will return "2021-04-02 03:35:45.321" + Assert.assertFalse(rs.next()); + } + } + @Test(groups = "integration") public void testReadWriteDateTimeWithClientTimeZone() throws SQLException { Properties props = new Properties(); @@ -478,8 +555,160 @@ public void testBatchQuery() throws SQLException { stmt.addBatch(); stmt.setInt(1, 2); stmt.addBatch(); - int[] results = stmt.executeBatch(); - Assert.assertEquals(results, new int[] { 0, 0 }); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + } + } + + @Test(dataProvider = "statementAndParams", groups = "integration") + public void testExecuteWithOrWithoutParameters(String tableSuffix, String query, Class clazz, + boolean hasResultSet, String[] params, boolean checkTable) throws SQLException { + String tableName = "test_execute_ps_" + tableSuffix; + query = query.replace("$table", tableName); + Properties props = new Properties(); + try (Connection conn = newConnection(props); Statement stmt = conn.createStatement()) { + Assert.assertFalse(stmt.execute("drop table if exists " + tableName + + "; create table " + tableName + "(s String)engine=Memory"), "Should not have result set"); + + try (PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + + // executeQuery + setParameters(ps, params); + Assert.assertNotNull(ps.executeQuery(), "executeQuery should never return null result set"); + if (hasResultSet) { + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // execute + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + if (hasResultSet) { + Assert.assertTrue(ps.execute(), "Should have result set"); + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertFalse(ps.execute(), "Should not have result set"); + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update 
count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // executeLargeUpdate + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + Assert.assertEquals(ps.executeLargeUpdate(), ps.getLargeUpdateCount()); + if (hasResultSet) { + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // executeUpdate + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + Assert.assertEquals(ps.executeUpdate(), ps.getUpdateCount()); + if (hasResultSet) { + Assert.assertNotNull(ps.getResultSet(), "Should have result set"); + Assert.assertEquals(ps.getUpdateCount(), -1); + Assert.assertEquals(ps.getLargeUpdateCount(), -1L); + } else { + Assert.assertNull(ps.getResultSet(), "Should not have result set"); + Assert.assertTrue(ps.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(ps.getLargeUpdateCount() >= 0L, "Should have update count"); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + } + + // executeLargeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + try (PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertThrows(SQLException.class, () -> ps.executeLargeBatch()); + } else { + Assert.assertEquals(ps.executeLargeBatch(), new long[] { 1L }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + } + + // executeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + try (PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertThrows(SQLException.class, () -> ps.executeBatch()); + } else { + Assert.assertEquals(ps.executeBatch(), new int[] { 1 }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + } + } + + props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true"); + try (Connection conn = newConnection(props); + Statement stmt = conn.createStatement(); + PreparedStatement ps = conn.prepareStatement(query)) { + Assert.assertEquals(ps.getClass(), clazz); + + // executeLargeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> 
ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertEquals(ps.executeLargeBatch(), new long[] { Statement.EXECUTE_FAILED }); + } else { + Assert.assertEquals(ps.executeLargeBatch(), new long[] { 1L }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); + + // executeBatch + Assert.assertFalse(stmt.execute("truncate table " + tableName), "Should not have result set"); + setParameters(ps, params); + ps.addBatch(); + Assert.assertThrows(SQLException.class, () -> ps.execute()); + Assert.assertThrows(SQLException.class, () -> ps.executeQuery()); + Assert.assertThrows(SQLException.class, () -> ps.executeUpdate()); + if (hasResultSet) { + Assert.assertEquals(ps.executeBatch(), new int[] { Statement.EXECUTE_FAILED }); + } else { + Assert.assertEquals(ps.executeBatch(), new int[] { 1 }); + } + if (checkTable) + checkTable(stmt, "select * from " + tableName, params); } } diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index c09f3a156..83fef9807 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -1,6 +1,7 @@ package com.clickhouse.jdbc; import java.sql.Array; +import java.sql.BatchUpdateException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -96,8 +97,8 @@ public void testLogComment() throws SQLException { String sql = "-- select something " + uuid + "\nselect 12345"; stmt.execute(sql + "; system flush logs;"); ResultSet rs = stmt.executeQuery( - "select distinct query, type from system.query_log where log_comment = 'select something " + uuid - + "'"); + "select distinct query from system.query_log where type = 'QueryStart' and log_comment = 'select something " + + uuid + "'"); Assert.assertTrue(rs.next()); Assert.assertEquals(rs.getString(1), sql); Assert.assertFalse(rs.next()); @@ -122,21 +123,22 @@ public void testMutation() throws SQLException { Assert.assertEquals(conn.createStatement().executeUpdate("update test_mutation set b = 22 where b = 1"), 0); Assert.assertThrows(SQLException.class, - () -> stmt.executeUpdate("update non_exist_table set value=1 where key=1")); + () -> stmt.executeUpdate("update non_existing_table set value=1 where key=1")); - stmt.addBatch("select 1"); - stmt.addBatch("select * from non_exist_table"); - stmt.addBatch("select 2"); + stmt.addBatch("insert into test_mutation values('1',1)"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_mutation values('2',2)"); Assert.assertThrows(SQLException.class, () -> stmt.executeBatch()); } props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true"); try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement()) { - stmt.addBatch("select 1"); - stmt.addBatch("select * from non_exist_table"); stmt.addBatch("insert into test_mutation values('a',1)"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_mutation values('b',2)"); stmt.addBatch("select 2"); - Assert.assertEquals(stmt.executeBatch(), new int[] { 0, ClickHouseStatement.EXECUTE_FAILED, 1, 0 }); + Assert.assertEquals(stmt.executeBatch(), + new int[] { 1, Statement.EXECUTE_FAILED, 1, Statement.EXECUTE_FAILED 
}); } } @@ -174,7 +176,8 @@ public void testAsyncInsert() throws SQLException { + "INSERT INTO test_async_insert VALUES(1, 'a'); " + "select * from test_async_insert"); ResultSet rs = stmt.getResultSet(); - Assert.assertFalse(rs.next()); + Assert.assertFalse(rs.next(), + "Server was probably busy at that time, so the row was inserted before your query"); } } @@ -205,6 +208,187 @@ public void testCancelQuery() throws Exception { } } + @Test(groups = "integration") + public void testExecute() throws SQLException { + try (Connection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + // ddl + Assert.assertFalse(stmt.execute("drop table if exists non_existing_table"), "Should have no result set"); + Assert.assertEquals(stmt.getResultSet(), null); + Assert.assertTrue(stmt.getUpdateCount() >= 0, "Should have update count"); + // query + Assert.assertTrue(stmt.execute("select 1"), "Should have result set"); + ResultSet rs = stmt.getResultSet(); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertFalse(rs.next(), "Should have only one record"); + // mixed usage + stmt.addBatch("drop table if exists non_existing_table"); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("drop table if exists non_existing_table")); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("select 2")); + stmt.clearBatch(); + Assert.assertFalse(stmt.execute("drop table if exists non_existing_table"), "Should have no result set"); + Assert.assertEquals(stmt.getResultSet(), null); + Assert.assertTrue(stmt.getUpdateCount() >= 0, "Should have update count"); + Assert.assertTrue(stmt.execute("select 2"), "Should have result set"); + rs = stmt.getResultSet(); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertFalse(rs.next(), "Should have only one record"); + } + } + + @Test(groups = "integration") + public void testExecuteBatch() throws SQLException { + Properties props = new Properties(); + try (Connection conn = newConnection(props); Statement stmt = conn.createStatement()) { + Assert.assertThrows(SQLException.class, () -> stmt.executeBatch()); + stmt.addBatch("select 1"); + // mixed usage + Assert.assertThrows(SQLException.class, () -> stmt.execute("select 2")); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("select 2")); + Assert.assertThrows(SQLException.class, + () -> stmt.executeLargeUpdate("drop table if exists non_existing_table")); + Assert.assertThrows(SQLException.class, + () -> stmt.executeUpdate("drop table if exists non_existing_table")); + // query in batch + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + stmt.addBatch("select 1"); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeLargeBatch()); + + Assert.assertFalse(stmt.execute("drop table if exists test_execute_batch; " + + "create table test_execute_batch(a Int32, b String)engine=Memory"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("insert into test_execute_batch values(3,'3')"); + Assert.assertEquals(stmt.executeBatch(), new int[] { 1, 1, 1 }); + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("insert into test_execute_batch 
values(2,'2')"); + stmt.addBatch("insert into test_execute_batch values(3,'3')"); + Assert.assertEquals(stmt.executeLargeBatch(), new long[] { 1L, 1L, 1L }); + + try (ResultSet rs = stmt.executeQuery("select * from test_execute_batch order by a")) { + int count = 0; + while (rs.next()) { + count++; + Assert.assertEquals(rs.getInt(1), count); + Assert.assertEquals(rs.getString(2), String.valueOf(count)); + } + Assert.assertEquals(count, 3); + } + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeBatch()); + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + Assert.assertThrows(BatchUpdateException.class, () -> stmt.executeLargeBatch()); + } + + props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true"); + try (Connection conn = newConnection(props); Statement stmt = conn.createStatement()) { + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("drop table non_existing_table"); + Assert.assertEquals(stmt.executeBatch(), + new int[] { 1, Statement.EXECUTE_FAILED, 1, Statement.EXECUTE_FAILED }); + + Assert.assertFalse(stmt.execute("truncate table test_execute_batch"), "Should not have result set"); + stmt.addBatch("insert into test_execute_batch values(1,'1')"); + stmt.addBatch("drop table non_existing_table"); + stmt.addBatch("insert into test_execute_batch values(2,'2')"); + stmt.addBatch("drop table non_existing_table"); + Assert.assertEquals(stmt.executeLargeBatch(), + new long[] { 1L, Statement.EXECUTE_FAILED, 1L, Statement.EXECUTE_FAILED }); + try (ResultSet rs = stmt.executeQuery("select * from test_execute_batch order by a")) { + int count = 0; + while (rs.next()) { + count++; + Assert.assertEquals(rs.getInt(1), count); + Assert.assertEquals(rs.getString(2), String.valueOf(count)); + } + Assert.assertEquals(count, 2); + } + } + } + + @Test(groups = "integration") + public void testExecuteQuery() throws SQLException { + try (Connection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("select 1"); + Assert.assertTrue(rs == stmt.getResultSet(), "Should be the exact same result set"); + Assert.assertEquals(stmt.getUpdateCount(), -1); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertFalse(rs.next(), "Should have only one record"); + + stmt.addBatch("select 1"); + Assert.assertThrows(SQLException.class, () -> stmt.executeQuery("select 2")); + stmt.clearBatch(); + rs = stmt.executeQuery("select 2"); + Assert.assertTrue(rs == stmt.getResultSet(), "Should be the exact same result set"); + Assert.assertEquals(stmt.getUpdateCount(), -1); + Assert.assertTrue(rs.next(), "Should have one record"); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertFalse(rs.next(), "Should have only one record"); + 
+ // never return null result set + rs = stmt.executeQuery("drop table if exists non_existing_table"); + Assert.assertNotNull(rs, "Should never be null"); + Assert.assertNull(stmt.getResultSet(), "Should be null"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertFalse(rs.next(), "Should has no row"); + } + } + + @Test(groups = "integration") + public void testExecuteUpdate() throws SQLException { + try (Connection conn = newConnection(new Properties()); + Statement stmt = conn.createStatement()) { + Assert.assertFalse(stmt.execute("drop table if exists test_execute_query; " + + "create table test_execute_query(a Int32, b String)engine=Memory"), "Should not have result set"); + + Assert.assertTrue(stmt.executeUpdate("insert into test_execute_query values(1,'1')") >= 0, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + Assert.assertTrue(stmt.executeLargeUpdate("insert into test_execute_query values(1,'1')") >= 0L, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + + stmt.addBatch("select 1"); + Assert.assertThrows(SQLException.class, + () -> stmt.executeUpdate("insert into test_execute_query values(1,'1')")); + Assert.assertThrows(SQLException.class, + () -> stmt.executeLargeUpdate("insert into test_execute_query values(1,'1')")); + stmt.clearBatch(); + + Assert.assertTrue(stmt.executeUpdate("insert into test_execute_query values(2,'2')") >= 0, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + Assert.assertTrue(stmt.executeLargeUpdate("insert into test_execute_query values(2,'2')") >= 0, + "Should return value greater than or equal to zero"); + Assert.assertNull(stmt.getResultSet(), "Should have no result set"); + Assert.assertEquals(stmt.getUpdateCount(), 1); + Assert.assertEquals(stmt.getLargeUpdateCount(), 1L); + } + } + @Test(groups = "integration") public void testSimpleAggregateFunction() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties()); @@ -248,12 +432,14 @@ public void testWrapperObject() throws SQLException { } @Test(groups = "integration") - public void testQuery() throws SQLException { + public void testQuerySystemLog() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties())) { ClickHouseStatement stmt = conn.createStatement(); stmt.setMaxRows(10); + stmt.setLargeMaxRows(11L); ResultSet rs = stmt.executeQuery("select * from numbers(100)"); + int rows = 0; try (ResultSet colRs = conn.getMetaData().getColumns(null, "system", "query_log", "")) { while (colRs.next()) { continue; @@ -261,15 +447,16 @@ public void testQuery() throws SQLException { } while (rs.next()) { - continue; + rows++; } + Assert.assertEquals(rows, 11); // batch query - stmt.addBatch("select 1"); - stmt.addBatch("select 2"); - stmt.addBatch("select 3"); + stmt.addBatch("drop table if exists non_existing_table1"); + stmt.addBatch("drop table if exists non_existing_table2"); + stmt.addBatch("drop table if exists non_existing_table3"); int[] results = stmt.executeBatch(); - 
Assert.assertEquals(results, new int[] { 0, 0, 0 }); + Assert.assertEquals(results, new int[] { 1, 1, 1 }); } }
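
A minimal usage sketch (not part of the patch) of the revised batch semantics from a JDBC caller's point of view: executeBatch()/executeLargeBatch() now fail with BatchUpdateException on the first error unless continue-on-error is enabled, in which case failed statements are reported as Statement.EXECUTE_FAILED. The JDBC URL, database, and table name below are illustrative assumptions; JdbcConfig.PROP_CONTINUE_BATCH is referenced from the test changes above, and its accessibility outside the com.clickhouse.jdbc package is assumed.

import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Properties;

import com.clickhouse.jdbc.JdbcConfig;

public class BatchSemanticsSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:clickhouse://localhost:8123/default"; // assumed local server
        Properties props = new Properties();

        // Default behaviour: the batch stops at the first failing statement and
        // executeBatch() throws BatchUpdateException carrying per-statement counts.
        try (Connection conn = DriverManager.getConnection(url, props);
                Statement stmt = conn.createStatement()) {
            stmt.execute("create table if not exists test_batch_demo(a Int32) engine=Memory");
            stmt.addBatch("insert into test_batch_demo values(1)");
            stmt.addBatch("drop table non_existing_table"); // fails on the server
            stmt.addBatch("insert into test_batch_demo values(2)");
            try {
                stmt.executeBatch();
            } catch (BatchUpdateException e) {
                // counts collected up to the failure, e.g. { 1, EXECUTE_FAILED, 0 }
                long[] counts = e.getLargeUpdateCounts();
                System.out.println("batch aborted: " + e.getMessage() + ", counts=" + counts.length);
            }
        }

        // With continue-on-error enabled the whole batch runs and failures show up
        // as Statement.EXECUTE_FAILED entries instead of an exception.
        props.setProperty(JdbcConfig.PROP_CONTINUE_BATCH, "true");
        try (Connection conn = DriverManager.getConnection(url, props);
                Statement stmt = conn.createStatement()) {
            stmt.addBatch("insert into test_batch_demo values(3)");
            stmt.addBatch("drop table non_existing_table");
            long[] counts = stmt.executeLargeBatch(); // e.g. { 1L, Statement.EXECUTE_FAILED }
            for (long c : counts) {
                System.out.println(c == Statement.EXECUTE_FAILED ? "failed" : "updated " + c + " row(s)");
            }
        }
    }
}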