Skip to content

Commit

Permalink
[#115] Use collector when executing query
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Aug 6, 2022
1 parent 197df55 commit 21b8553
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.github.zero88.jooqx;

import java.util.List;
import java.util.stream.Collector;

import org.jetbrains.annotations.NotNull;
import org.jooq.DSLContext;

Expand Down Expand Up @@ -30,4 +33,19 @@ static JooqxResultCollector create() {
@NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext, @NotNull DataTypeMapperRegistry registry);

/**
* Create collector
*
* @param adapter SQL result adapter
* @param dslContext dsl context
* @param registry data type mapper registry
* @param <ROW> Type of intermediate row, might be jOOQ record or custom type
* @param <RESULT> Type of expectation result
* @return the collector
*/
@GenIgnore
<ROW, RESULT> @NotNull Collector<Row, List<ROW>, RESULT> collector(@NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext,
@NotNull DataTypeMapperRegistry registry);

}
55 changes: 35 additions & 20 deletions core/src/main/java/io/github/zero88/jooqx/JooqxSQLImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
Expand Down Expand Up @@ -77,12 +78,20 @@ protected Tuple doConvert(Map<String, Param<?>> params, DataTypeMapperRegistry r
static class ReactiveSQLRC implements JooqxResultCollector {

@Override
public <ROW, RESULT> @NotNull RESULT collect(@NotNull RowSet<Row> resultSet,
public <ROW, RESULT> @NotNull RESULT collect(@NotNull RowSet<Row> rs,
@NotNull SQLResultAdapter<ROW, RESULT> adapter,
@NotNull DSLContext dslContext,
@NotNull DataTypeMapperRegistry registry) {
return adapter.collect(
collect(resultSet, dslContext, registry, adapter.recordFactory(), adapter.strategy()));
return adapter.collect(collect(rs, dslContext, registry, adapter.recordFactory(), adapter.strategy()));
}

@Override
public @NotNull <ROW, RESULT> Collector<Row, List<ROW>, RESULT> collector(
@NotNull SQLResultAdapter<ROW, RESULT> adapter, @NotNull DSLContext dsl,
@NotNull DataTypeMapperRegistry registry) {
return Collector.of(ArrayList::new,
(rows, row) -> rows.add(rowToRecord(row, dsl, registry, adapter.recordFactory())),
(rows1, rows2) -> rows2, rows -> collect(adapter, rows));
}

@NotNull
Expand All @@ -92,36 +101,41 @@ protected final <ROW> List<ROW> collect(@NotNull RowSet<Row> resultSet, @NotNull
@NotNull SelectStrategy strategy) {
final List<ROW> records = new ArrayList<>();
final RowIterator<Row> iterator = resultSet.iterator();
final Function<Row, ROW> fn = row -> IntStream.range(0, row.size())
.mapToObj(i -> recordFactory.lookup(row.getColumnName(i), i))
.filter(Objects::nonNull)
.collect(rowToRecord(row, dsl, registry, recordFactory));
if (strategy == SelectStrategy.MANY) {
iterator.forEachRemaining(row -> records.add(fn.apply(row)));
iterator.forEachRemaining(row -> records.add(rowToRecord(row, dsl, registry, recordFactory)));
} else if (iterator.hasNext()) {
final Row row = iterator.next();
if (iterator.hasNext()) {
throw new TooManyRowsException();
}
records.add(fn.apply(row));
records.add(rowToRecord(row, dsl, registry, recordFactory));
}
return records;
}

private <REC extends Record, ROW> Collector<FieldWrapper, REC, ROW> rowToRecord(@NotNull Row row,
@NotNull DSLContext dsl,
@NotNull DataTypeMapperRegistry registry,
RecordFactory<REC, ROW> recordFactory) {
BiFunction<FieldWrapper, Row, Object> getValue = (f, r) -> {
private <REC extends Record, ROW> ROW rowToRecord(@NotNull Row row, @NotNull DSLContext dsl,
@NotNull DataTypeMapperRegistry registry,
RecordFactory<REC, ROW> recordFactory) {
final Function<FieldWrapper, Object> getValue = (f) -> {
try {
return r.getValue(f.field().getName());
return registry.toUserType(f.field(), row.getValue(f.field().getName()));
} catch (NoSuchElementException e) {
return r.getValue(f.colNo());
return registry.toUserType(f.field(), row.getValue(f.colNo()));
}
};
return Collector.of(() -> recordFactory.create(dsl),
(rec, f) -> rec.set(f.field(), registry.toUserType(f.field(), getValue.apply(f, row))),
(rec1, rec2) -> rec2, recordFactory::map);
final BiConsumer<REC, FieldWrapper> accumulator = (rec, f) -> rec.set(f.field(), getValue.apply(f));
return IntStream.range(0, row.size())
.mapToObj(i -> recordFactory.lookup(row.getColumnName(i), i))
.filter(Objects::nonNull)
.collect(Collector.of(() -> recordFactory.create(dsl), accumulator, (r1, r2) -> r2,
recordFactory::map));
}

private static <ROW, RESULT> RESULT collect(@NotNull SQLResultAdapter<ROW, RESULT> adapter, List<ROW> rows) {
if (adapter.strategy() == SelectStrategy.FIRST_ONE && rows.size() > 1) {
throw new TooManyRowsException();
}
return adapter.collect(rows);
}

}
Expand Down Expand Up @@ -268,8 +282,9 @@ private void tweakDSLSetting() {
@Override
public <T, R> Future<@Nullable R> execute(@NotNull Query query, @NotNull SQLResultAdapter<T, R> adapter) {
return sqlClient().preparedQuery(preparedQuery().sql(dsl().configuration(), query))
.collecting(resultCollector().collector(adapter, dsl(), typeMapperRegistry()))
.execute(preparedQuery().bindValues(query, typeMapperRegistry()))
.map(rs -> resultCollector().collect(rs, adapter, dsl(), typeMapperRegistry()))
.map(SqlResult::value)
.otherwise(errorConverter()::reThrowError);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void tearUp(Vertx vertx, VertxTestContext ctx) {
void test_query(VertxTestContext ctx) {
final Books table = schema().BOOKS;
Checkpoint cp = ctx.checkpoint();
jooqxRx2.rxExecute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> {
jooqxRx2.rxExecute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> {
ctx.verify(() -> Assertions.assertEquals(7, recs.size()));
cp.flag();
}, ctx::failNow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void tearUp(Vertx vertx, VertxTestContext ctx) {
void test_query(VertxTestContext ctx) {
final Books table = schema().BOOKS;
Checkpoint cp = ctx.checkpoint();
jooqxRx3.rxExecute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> {
jooqxRx3.rxExecute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> {
ctx.verify(() -> Assertions.assertEquals(7, recs.size()));
cp.flag();
}, ctx::failNow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ public void tearUp(Vertx vertx, VertxTestContext ctx) {
void test_query(VertxTestContext ctx) {
final Books table = schema().BOOKS;
Checkpoint cp = ctx.checkpoint();
jooqxMutiny.execute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table))
.subscribe()
.with(recs -> {
ctx.verify(() -> Assertions.assertEquals(7, recs.size()));
cp.flag();
}, ctx::failNow);
jooqxMutiny.execute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe().with(recs -> {
ctx.verify(() -> Assertions.assertEquals(7, recs.size()));
cp.flag();
}, ctx::failNow);
}

}

0 comments on commit 21b8553

Please sign in to comment.