Skip to content

Commit

Permalink
[#115] Use collector when executing batch and routine
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Aug 6, 2022
1 parent 21b8553 commit 541438f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 110 deletions.
23 changes: 21 additions & 2 deletions core/src/main/java/io/github/zero88/jooqx/JooqxBatchCollector.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,40 @@
package io.github.zero88.jooqx;

import java.util.ArrayList;
import java.util.List;

import org.jetbrains.annotations.NotNull;

import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlResult;

/**
* Represents for a collector that collects {@code Vert.x SQL batch result} to an expectation output
*
* @param <R> Type of each row in batch result
* @see JooqxResultCollector
* @since 2.0.0
*/
@VertxGen
public interface JooqxBatchCollector extends JooqxResultCollector, SQLBatchCollector<RowSet<Row>, RowSet<Row>> {
public interface JooqxBatchCollector<R>
extends JooqxResultCollector, SQLBatchCollector<RowSet<Row>, SqlResult<List<R>>> {

@Override
int batchResultSize(@NotNull RowSet<Row> batchResult);
default int batchResultSize(@NotNull SqlResult<List<R>> batchResult) {
return reduce(batchResult).size();
}

@GenIgnore
default List<R> reduce(SqlResult<List<R>> batchResult) {
final List<R> br = new ArrayList<>();
SqlResult<List<R>> res = batchResult;
do {
br.add(res.value().stream().findFirst().orElse(null));
} while ((res = res.next()) != null);
return br;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.github.zero88.jooqx.adapter.SQLResultAdapter;
import io.github.zero88.jooqx.datatype.DataTypeMapperRegistry;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
Expand All @@ -27,12 +26,6 @@ static JooqxResultCollector create() {
return new ReactiveSQLRC();
}

@Override
@GenIgnore(GenIgnore.PERMITTED_TYPE)
<ROW, RESULT> @Nullable RESULT collect(@NotNull RowSet<Row> resultSet,
@NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext, @NotNull DataTypeMapperRegistry registry);

/**
* Create collector
*
Expand All @@ -48,4 +41,22 @@ static JooqxResultCollector create() {
@NotNull DSLContext dslContext,
@NotNull DataTypeMapperRegistry registry);

/**
* Create result batch collector
*
* @param <R> Type of each row in batch result
* @return the batch collector
* @see JooqxBatchCollector
*/
default <R> JooqxBatchCollector<R> batchCollector() {
return new JooqxBatchCollector<R>() {
@Override
public @NotNull <ROW, RESULT> Collector<Row, List<ROW>, RESULT> collector(
@NotNull SQLResultAdapter<ROW, RESULT> adapter, @NotNull DSLContext dslContext,
@NotNull DataTypeMapperRegistry registry) {
return JooqxResultCollector.this.collector(adapter, dslContext, registry);
}
};
}

}
73 changes: 8 additions & 65 deletions core/src/main/java/io/github/zero88/jooqx/JooqxSQLImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -38,7 +39,6 @@
import io.vertx.core.Vertx;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowIterator;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlConnection;
Expand Down Expand Up @@ -77,14 +77,6 @@ protected Tuple doConvert(Map<String, Param<?>> params, DataTypeMapperRegistry r

static class ReactiveSQLRC implements JooqxResultCollector {

@Override
public <ROW, RESULT> @NotNull RESULT collect(@NotNull RowSet<Row> rs,
@NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext,
@NotNull DataTypeMapperRegistry registry) {
return adapter.collect(collect(rs, dslContext, registry, adapter.recordFactory(), adapter.strategy()));
}

@Override
public @NotNull <ROW, RESULT> Collector<Row, List<ROW>, RESULT> collector(
@NotNull SQLResultAdapter<ROW, RESULT> adapter, @NotNull DSLContext dsl,
Expand All @@ -94,28 +86,9 @@ static class ReactiveSQLRC implements JooqxResultCollector {
(rows1, rows2) -> rows2, rows -> collect(adapter, rows));
}

@NotNull
protected final <ROW> List<ROW> collect(@NotNull RowSet<Row> resultSet, @NotNull DSLContext dsl,
@NotNull DataTypeMapperRegistry registry,
@NotNull RecordFactory<? extends Record, ROW> recordFactory,
@NotNull SelectStrategy strategy) {
final List<ROW> records = new ArrayList<>();
final RowIterator<Row> iterator = resultSet.iterator();
if (strategy == SelectStrategy.MANY) {
iterator.forEachRemaining(row -> records.add(rowToRecord(row, dsl, registry, recordFactory)));
} else if (iterator.hasNext()) {
final Row row = iterator.next();
if (iterator.hasNext()) {
throw new TooManyRowsException();
}
records.add(rowToRecord(row, dsl, registry, recordFactory));
}
return records;
}

private <REC extends Record, ROW> ROW rowToRecord(@NotNull Row row, @NotNull DSLContext dsl,
@NotNull DataTypeMapperRegistry registry,
RecordFactory<REC, ROW> recordFactory) {
protected final <REC extends Record, ROW> ROW rowToRecord(@NotNull Row row, @NotNull DSLContext dsl,
@NotNull DataTypeMapperRegistry registry,
@NotNull RecordFactory<REC, ROW> recordFactory) {
final Function<FieldWrapper, Object> getValue = (f) -> {
try {
return registry.toUserType(f.field(), row.getValue(f.field().getName()));
Expand All @@ -141,38 +114,6 @@ private static <ROW, RESULT> RESULT collect(@NotNull SQLResultAdapter<ROW, RESUL
}


static final class ReactiveSQLBC extends ReactiveSQLRC implements JooqxBatchCollector {

@Override
public <ROW, RESULT> @NotNull RESULT collect(@NotNull RowSet<Row> batchResult,
@NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext,
@NotNull DataTypeMapperRegistry registry) {
final List<ROW> records = new ArrayList<>();
while (batchResult != null) {
final List<ROW> rows = collect(batchResult, dslContext, registry, adapter.recordFactory(),
SelectStrategy.FIRST_ONE);
if (!rows.isEmpty()) {
records.add(rows.get(0));
}
batchResult = batchResult.next();
}
return adapter.collect(records);
}

@Override
public int batchResultSize(@NotNull RowSet<Row> batchResult) {
final int[] count = new int[] { 0 };
while (batchResult != null) {
count[0]++;
batchResult = batchResult.next();
}
return count[0];
}

}


static class JooqxConnImpl extends JooqxImpl<SqlConnection> implements JooqxTx {

private final Jooqx delegate;
Expand Down Expand Up @@ -291,8 +232,9 @@ private void tweakDSLSetting() {
@Override
public Future<BatchResult> batch(@NotNull Query query, @NotNull BindBatchValues bindBatchValues) {
return sqlClient().preparedQuery(preparedQuery().sql(dsl().configuration(), query))
.collecting(Collectors.toList())
.executeBatch(preparedQuery().bindValues(query, bindBatchValues, typeMapperRegistry()))
.map(r -> new ReactiveSQLBC().batchResultSize(r))
.map(resultCollector().<Row>batchCollector()::batchResultSize)
.map(s -> BatchResult.create(bindBatchValues.size(), s))
.otherwise(errorConverter()::reThrowError);
}
Expand All @@ -302,8 +244,9 @@ public <T, R> Future<BatchReturningResult<R>> batchResult(@NotNull Query query,
@NotNull BindBatchValues bindBatchValues,
@NotNull SQLResultListAdapter<T, R> adapter) {
return sqlClient().preparedQuery(preparedQuery().sql(dsl().configuration(), query))
.collecting(resultCollector().collector(adapter, dsl(), typeMapperRegistry()))
.executeBatch(preparedQuery().bindValues(query, bindBatchValues, typeMapperRegistry()))
.map(rs -> new ReactiveSQLBC().collect(rs, adapter, dsl(), typeMapperRegistry()))
.map(resultCollector().<R>batchCollector()::reduce)
.map(rs -> BatchReturningResult.create(bindBatchValues.size(), rs))
.otherwise(errorConverter()::reThrowError);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,17 @@ static LegacySQLCollector create() {
@Override
int batchResultSize(@NotNull List<Integer> batchResult);

@Override
/**
* Collect result set to an expectation result that defines in SQL result adapter
*
* @param <ROW> the type of jOOQ record of the reduction operation
* @param <RESULT> the type of result after the reduction operation
* @param resultSet result set
* @return an expectation result
* @see SQLResultAdapter
* @see DataTypeMapperRegistry
* @since 2.0.0
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
<ROW, RESULT> @Nullable RESULT collect(@NotNull ResultSet resultSet, @NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext, @NotNull DataTypeMapperRegistry registry);
Expand Down
28 changes: 0 additions & 28 deletions core/src/main/java/io/github/zero88/jooqx/SQLResultCollector.java
Original file line number Diff line number Diff line change
@@ -1,39 +1,11 @@
package io.github.zero88.jooqx;

import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.github.zero88.jooqx.adapter.SQLResultAdapter;
import io.github.zero88.jooqx.adapter.SelectStrategy;
import io.github.zero88.jooqx.datatype.DataTypeMapperRegistry;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;

/**
* Represents for a collector that collects and transforms {@code Vert.x SQL result} to an expectation output
*
* @param <RS> Type of Vertx SQL result set
* @see LegacySQLCollector
* @see JooqxResultCollector
* @see JooqxBatchCollector
* @since 1.0.0
*/
public interface SQLResultCollector<RS> {

/**
* Collect result set to an expectation result that defines in SQL result adapter
*
* @param <ROW> the type of jOOQ record of the reduction operation
* @param <RESULT> the type of result after the reduction operation
* @param resultSet result set
* @return an expectation result
* @see SQLResultAdapter
* @see DataTypeMapperRegistry
* @since 2.0.0
*/
@Nullable <ROW, RESULT> RESULT collect(@NotNull RS resultSet, @NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext, @NotNull DataTypeMapperRegistry registry);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.vertx.core.Future;
import io.vertx.jdbcclient.SqlOutParam;
import io.vertx.sqlclient.SqlClient;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;

class JDBCRoutineExecutor<S extends SqlClient> extends RoutineExecutorDelegateImpl {
Expand Down Expand Up @@ -44,11 +45,11 @@ public <T> Future<RoutineResult> routineResult(@NotNull Routine<T> routine) {
@Override
public <T, X, R> Future<@Nullable R> routineResultSet(@NotNull Routine<T> routine,
@NotNull SQLResultAdapter<X, R> adapter) {
final Tuple bindValues = createBindValues(routine);
return jooqx().sqlClient()
.preparedQuery(jooqx().preparedQuery().routine(dsl().configuration(), routine))
.execute(bindValues)
.map(rs -> jooqx().resultCollector().collect(rs, adapter, dsl(), jooqx().typeMapperRegistry()))
.collecting(jooqx().resultCollector().collector(adapter, dsl(), jooqx().typeMapperRegistry()))
.execute(createBindValues(routine))
.map(SqlResult::value)
.otherwise(jooqx().errorConverter()::reThrowError);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ void test_batch_insert_and_returning_id(VertxTestContext ctx) {
final Handler<AsyncResult<BatchReturningResult<Record1<?>>>> asserter = ar -> ctx.verify(() -> {
final BatchReturningResult<Record1<?>> result = assertSuccess(ctx, ar);
final List<Record1<?>> records = result.getRecords();
System.out.println(records);
Assertions.assertEquals(2, result.getTotal());
Assertions.assertEquals(2, result.getSuccesses());
Assertions.assertEquals(9, result.getRecords().get(0).value1());
Assertions.assertEquals(10, result.getRecords().get(1).value1());
System.out.println(records);
jooqx.execute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table), ar2 -> {
Assertions.assertEquals(9, records.get(0).value1());
Assertions.assertEquals(10, records.get(1).value1());
jooqx.execute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table), ar2 -> {
assertResultSize(ctx, ar2, 10);
flag.flag();
});
Expand Down

0 comments on commit 541438f

Please sign in to comment.