diff --git a/core/src/main/java/io/github/zero88/jooqx/JooqxResultCollector.java b/core/src/main/java/io/github/zero88/jooqx/JooqxResultCollector.java index d2ddfca4..7d8cbc87 100644 --- a/core/src/main/java/io/github/zero88/jooqx/JooqxResultCollector.java +++ b/core/src/main/java/io/github/zero88/jooqx/JooqxResultCollector.java @@ -1,5 +1,8 @@ package io.github.zero88.jooqx; +import java.util.List; +import java.util.stream.Collector; + import org.jetbrains.annotations.NotNull; import org.jooq.DSLContext; @@ -30,4 +33,19 @@ static JooqxResultCollector create() { @NotNull SQLResultAdapter adapter, @NotNull DSLContext dslContext, @NotNull DataTypeMapperRegistry registry); + /** + * Create collector + * + * @param adapter SQL result adapter + * @param dslContext dsl context + * @param registry data type mapper registry + * @param Type of intermediate row, might be jOOQ record or custom type + * @param Type of expectation result + * @return the collector + */ + @GenIgnore + @NotNull Collector, RESULT> collector(@NotNull SQLResultAdapter adapter, + @NotNull DSLContext dslContext, + @NotNull DataTypeMapperRegistry registry); + } diff --git a/core/src/main/java/io/github/zero88/jooqx/JooqxSQLImpl.java b/core/src/main/java/io/github/zero88/jooqx/JooqxSQLImpl.java index ed16240d..68c350e4 100644 --- a/core/src/main/java/io/github/zero88/jooqx/JooqxSQLImpl.java +++ b/core/src/main/java/io/github/zero88/jooqx/JooqxSQLImpl.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collector; @@ -77,12 +78,20 @@ protected Tuple doConvert(Map> params, DataTypeMapperRegistry r static class ReactiveSQLRC implements JooqxResultCollector { @Override - public @NotNull RESULT collect(@NotNull RowSet resultSet, + public @NotNull RESULT collect(@NotNull RowSet rs, @NotNull SQLResultAdapter adapter, @NotNull DSLContext dslContext, @NotNull DataTypeMapperRegistry registry) { - return adapter.collect( - collect(resultSet, dslContext, registry, adapter.recordFactory(), adapter.strategy())); + return adapter.collect(collect(rs, dslContext, registry, adapter.recordFactory(), adapter.strategy())); + } + + @Override + public @NotNull Collector, RESULT> collector( + @NotNull SQLResultAdapter adapter, @NotNull DSLContext dsl, + @NotNull DataTypeMapperRegistry registry) { + return Collector.of(ArrayList::new, + (rows, row) -> rows.add(rowToRecord(row, dsl, registry, adapter.recordFactory())), + (rows1, rows2) -> rows2, rows -> collect(adapter, rows)); } @NotNull @@ -92,36 +101,41 @@ protected final List collect(@NotNull RowSet resultSet, @NotNull @NotNull SelectStrategy strategy) { final List records = new ArrayList<>(); final RowIterator iterator = resultSet.iterator(); - final Function fn = row -> IntStream.range(0, row.size()) - .mapToObj(i -> recordFactory.lookup(row.getColumnName(i), i)) - .filter(Objects::nonNull) - .collect(rowToRecord(row, dsl, registry, recordFactory)); if (strategy == SelectStrategy.MANY) { - iterator.forEachRemaining(row -> records.add(fn.apply(row))); + iterator.forEachRemaining(row -> records.add(rowToRecord(row, dsl, registry, recordFactory))); } else if (iterator.hasNext()) { final Row row = iterator.next(); if (iterator.hasNext()) { throw new TooManyRowsException(); } - records.add(fn.apply(row)); + records.add(rowToRecord(row, dsl, registry, recordFactory)); } return records; } - private Collector rowToRecord(@NotNull Row row, - @NotNull DSLContext dsl, - @NotNull DataTypeMapperRegistry registry, - RecordFactory recordFactory) { - BiFunction getValue = (f, r) -> { + private ROW rowToRecord(@NotNull Row row, @NotNull DSLContext dsl, + @NotNull DataTypeMapperRegistry registry, + RecordFactory recordFactory) { + final Function getValue = (f) -> { try { - return r.getValue(f.field().getName()); + return registry.toUserType(f.field(), row.getValue(f.field().getName())); } catch (NoSuchElementException e) { - return r.getValue(f.colNo()); + return registry.toUserType(f.field(), row.getValue(f.colNo())); } }; - return Collector.of(() -> recordFactory.create(dsl), - (rec, f) -> rec.set(f.field(), registry.toUserType(f.field(), getValue.apply(f, row))), - (rec1, rec2) -> rec2, recordFactory::map); + final BiConsumer accumulator = (rec, f) -> rec.set(f.field(), getValue.apply(f)); + return IntStream.range(0, row.size()) + .mapToObj(i -> recordFactory.lookup(row.getColumnName(i), i)) + .filter(Objects::nonNull) + .collect(Collector.of(() -> recordFactory.create(dsl), accumulator, (r1, r2) -> r2, + recordFactory::map)); + } + + private static RESULT collect(@NotNull SQLResultAdapter adapter, List rows) { + if (adapter.strategy() == SelectStrategy.FIRST_ONE && rows.size() > 1) { + throw new TooManyRowsException(); + } + return adapter.collect(rows); } } @@ -268,8 +282,9 @@ private void tweakDSLSetting() { @Override public Future<@Nullable R> execute(@NotNull Query query, @NotNull SQLResultAdapter adapter) { return sqlClient().preparedQuery(preparedQuery().sql(dsl().configuration(), query)) + .collecting(resultCollector().collector(adapter, dsl(), typeMapperRegistry())) .execute(preparedQuery().bindValues(query, typeMapperRegistry())) - .map(rs -> resultCollector().collect(rs, adapter, dsl(), typeMapperRegistry())) + .map(SqlResult::value) .otherwise(errorConverter()::reThrowError); } diff --git a/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx2Test.java b/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx2Test.java index 25d54d38..37ac1c16 100644 --- a/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx2Test.java +++ b/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx2Test.java @@ -38,7 +38,7 @@ public void tearUp(Vertx vertx, VertxTestContext ctx) { void test_query(VertxTestContext ctx) { final Books table = schema().BOOKS; Checkpoint cp = ctx.checkpoint(); - jooqxRx2.rxExecute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> { + jooqxRx2.rxExecute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> { ctx.verify(() -> Assertions.assertEquals(7, recs.size())); cp.flag(); }, ctx::failNow); diff --git a/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx3Test.java b/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx3Test.java index a8956fec..ad39a582 100644 --- a/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx3Test.java +++ b/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARx3Test.java @@ -38,7 +38,7 @@ public void tearUp(Vertx vertx, VertxTestContext ctx) { void test_query(VertxTestContext ctx) { final Books table = schema().BOOKS; Checkpoint cp = ctx.checkpoint(); - jooqxRx3.rxExecute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> { + jooqxRx3.rxExecute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe(recs -> { ctx.verify(() -> Assertions.assertEquals(7, recs.size())); cp.flag(); }, ctx::failNow); diff --git a/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARxMutinyTest.java b/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARxMutinyTest.java index fc995524..261a0a28 100644 --- a/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARxMutinyTest.java +++ b/integtest/pg/src/test/java/io/github/zero88/integtest/jooqx/pg/jooq/PgReARxMutinyTest.java @@ -37,12 +37,10 @@ public void tearUp(Vertx vertx, VertxTestContext ctx) { void test_query(VertxTestContext ctx) { final Books table = schema().BOOKS; Checkpoint cp = ctx.checkpoint(); - jooqxMutiny.execute(jooqx.dsl().selectFrom(table), DSLAdapter.fetchJsonRecords(table)) - .subscribe() - .with(recs -> { - ctx.verify(() -> Assertions.assertEquals(7, recs.size())); - cp.flag(); - }, ctx::failNow); + jooqxMutiny.execute(dsl -> dsl.selectFrom(table), DSLAdapter.fetchJsonRecords(table)).subscribe().with(recs -> { + ctx.verify(() -> Assertions.assertEquals(7, recs.size())); + cp.flag(); + }, ctx::failNow); } }