From 0c8a3367661e13a945992398046b456759324c80 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 16 Jul 2024 11:21:52 +0300 Subject: [PATCH] Kafka Tx: include poll-time ConsumerGroupMetadata with the KafkaRecord --- .../revapi.json | 16 +++++- .../api/IncomingKafkaRecordBatchMetadata.java | 13 ++++- .../api/IncomingKafkaRecordMetadata.java | 13 ++++- .../messaging/kafka/IncomingKafkaRecord.java | 8 ++- .../kafka/IncomingKafkaRecordBatch.java | 9 ++- .../messaging/kafka/i18n/KafkaExceptions.java | 4 ++ .../messaging/kafka/i18n/KafkaLogging.java | 3 + .../kafka/impl/KafkaRecordHelper.java | 18 ++++++ .../impl/KafkaRecordStreamSubscription.java | 7 +++ .../transactions/KafkaTransactionsImpl.java | 20 ++++++- .../ExactlyOnceProcessingTest.java | 57 +++++++++++++++++++ 11 files changed, 156 insertions(+), 12 deletions(-) diff --git a/smallrye-reactive-messaging-kafka-api/revapi.json b/smallrye-reactive-messaging-kafka-api/revapi.json index 7ec71d275b..0296239eaf 100644 --- a/smallrye-reactive-messaging-kafka-api/revapi.json +++ b/smallrye-reactive-messaging-kafka-api/revapi.json @@ -24,7 +24,19 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ { + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata::(org.apache.kafka.clients.consumer.ConsumerRecords, java.lang.String, int, java.util.Map)", + "new": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata::(org.apache.kafka.clients.consumer.ConsumerRecords, java.lang.String, int, java.util.Map, int)", + "justification": "Added consumer group generation id to the constructor" + }, + { + "code": "java.method.numberOfParametersChanged", + "old": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata::(org.apache.kafka.clients.consumer.ConsumerRecord, java.lang.String, int)", + "new": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata::(org.apache.kafka.clients.consumer.ConsumerRecord, java.lang.String, int, int)", + "justification": "Added consumer group generation id to the constructor" + } + ] } }, { "extension" : "revapi.reporter.json", @@ -43,4 +55,4 @@ "minCriticality" : "documented", "output" : "out" } -} ] \ No newline at end of file +} ] diff --git a/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java b/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java index a2422fd548..4e46221252 100644 --- a/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java +++ b/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java @@ -23,18 +23,20 @@ public class IncomingKafkaRecordBatchMetadata { private final String channel; private final int index; private final Map offsets; + private final int consumerGroupGenerationId; public IncomingKafkaRecordBatchMetadata(ConsumerRecords records, String channel, int index, - Map offsets) { + Map offsets, int consumerGroupGenerationId) { this.records = records; this.channel = channel; this.index = index; this.offsets = Collections.unmodifiableMap(offsets); + this.consumerGroupGenerationId = consumerGroupGenerationId; } public IncomingKafkaRecordBatchMetadata(ConsumerRecords records, String channel, Map offsets) { - this(records, channel, -1, offsets); + this(records, channel, -1, offsets, -1); } /** @@ -72,4 +74,11 @@ public Map getOffsets() { public int getConsumerIndex() { return index; } + + /** + * @return the consumer group metadata generation id at the time of polling this record + */ + public int getConsumerGroupGenerationId() { + return consumerGroupGenerationId; + } } diff --git a/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordMetadata.java b/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordMetadata.java index ea7fd648f9..be98384186 100644 --- a/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordMetadata.java +++ b/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordMetadata.java @@ -22,20 +22,22 @@ public class IncomingKafkaRecordMetadata implements KafkaMessageMetadata record, String channel, int index) { + public IncomingKafkaRecordMetadata(ConsumerRecord record, String channel, int index, int consumerGroupGenerationId) { this.record = record; this.channel = channel; this.index = index; + this.consumerGroupGenerationId = consumerGroupGenerationId; } public IncomingKafkaRecordMetadata(ConsumerRecord record, String channel) { - this(record, channel, -1); + this(record, channel, -1, -1); } /** @@ -125,4 +127,11 @@ public String getChannel() { public int getConsumerIndex() { return index; } + + /** + * @return the consumer group metadata generation id at the time of polling this record + */ + public int getConsumerGroupGenerationId() { + return consumerGroupGenerationId; + } } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java index 6425943ade..ad275393b3 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java @@ -14,6 +14,7 @@ import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; +import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordHelper; import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper; import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; @@ -34,7 +35,8 @@ public IncomingKafkaRecord(ConsumerRecord record, boolean cloudEventEnabled, boolean tracingEnabled) { this.commitHandler = commitHandler; - this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index); + int generationId = KafkaRecordHelper.extractGenerationIdFrom(record); + this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index, generationId); ArrayList meta = new ArrayList<>(); meta.add(this.kafkaMetadata); @@ -102,6 +104,10 @@ public long getOffset() { return kafkaMetadata.getOffset(); } + public int getConsumerGroupGenerationId() { + return kafkaMetadata.getConsumerGroupGenerationId(); + } + @Override public Metadata getMetadata() { return metadata; diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java index 26dba7c095..b8794bf125 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java @@ -45,8 +45,13 @@ public IncomingKafkaRecordBatch(ConsumerRecords records, String channel, i this.incomingRecords = Collections.unmodifiableList(incomingRecords); this.latestOffsetRecords = Collections.unmodifiableMap(latestOffsetRecords); Map offsets = new HashMap<>(); - latestOffsetRecords.forEach((e, r) -> offsets.put(e, new OffsetAndMetadata(r.getOffset()))); - this.metadata = captureContextMetadata(new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets)); + int generationId = -1; + for (var entry : latestOffsetRecords.entrySet()) { + generationId = entry.getValue().getConsumerGroupGenerationId(); + offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().getOffset())); + } + this.metadata = captureContextMetadata( + new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets, generationId)); } @Override diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaExceptions.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaExceptions.java index 929f0fb66c..72d7c1b0d9 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaExceptions.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaExceptions.java @@ -93,4 +93,8 @@ public interface KafkaExceptions { @Message(id = 18024, value = "Invalid Kafka incoming configuration for channel `%s`, `assign-seek` portion `%s` is invalid. If topic portion is not present, a single `topic` configuration is needed.") IllegalArgumentException invalidAssignSeekTopic(String channel, String assignSeek); + + @Message(id = 18025, value = "Partition rebalance during exactly-once processing for channel `%s`: current consumer group metadata: %s, generation id for message: %s") + IllegalStateException exactlyOnceProcessingRebalance(String channel, String groupMetadata, String generationId); + } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java index dbde15c4c9..80e8f2eccb 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java @@ -353,4 +353,7 @@ void delayedRetryTopic(String channel, Collection retryTopics, int maxRe @Message(id = 18283, value = "Failure from channel `%s` request/reply consumer for topic `%s`") void requestReplyConsumerFailure(String channel, String replyTopic, @Cause Throwable throwable); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 18284, value = "Transaction commit failed for channel, aborting the transaction") + void transactionCommitFailed(@Cause Throwable throwable); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordHelper.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordHelper.java index ba8f345963..82e99c4a3d 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordHelper.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordHelper.java @@ -4,6 +4,8 @@ import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -14,6 +16,8 @@ public class KafkaRecordHelper { + public static final String SMALLRYE_GENERATION_ID = "__smallrye_generationId"; + public static Headers getHeaders(OutgoingKafkaRecordMetadata om, IncomingKafkaRecordMetadata im, RuntimeKafkaSinkConfiguration configuration) { @@ -47,4 +51,18 @@ public static Headers getHeaders(OutgoingKafkaRecordMetadata om, public static boolean isNotBlank(String s) { return s != null && !s.trim().isEmpty(); } + + public static void addGenerationIdToHeaders(ConsumerRecord record, ConsumerGroupMetadata metadata) { + record.headers().add(SMALLRYE_GENERATION_ID, Integer.toString(metadata.generationId()).getBytes()); + } + + public static int extractGenerationIdFrom(ConsumerRecord record) { + Header header = record.headers().lastHeader(SMALLRYE_GENERATION_ID); + if (header != null) { + record.headers().remove(SMALLRYE_GENERATION_ID); + return Integer.parseInt(new String(header.value())); + } + return -1; + } + } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java index 522213671c..f4b8401b06 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaRecordStreamSubscription.java @@ -10,6 +10,8 @@ import java.util.function.UnaryOperator; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import io.smallrye.mutiny.Uni; @@ -88,6 +90,11 @@ public KafkaRecordStreamSubscription( if (log.isTraceEnabled()) { log.tracef("Adding %s messages to the queue", cr.count()); } + // Called on the polling thread + ConsumerGroupMetadata metadata = client.unwrap().groupMetadata(); + for (ConsumerRecord r : cr) { + KafkaRecordHelper.addGenerationIdToHeaders(r, metadata); + } enqueueFunction.accept(cr, queue); return cr; }) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java index 5362611c38..b43d9735a5 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/transactions/KafkaTransactionsImpl.java @@ -78,6 +78,7 @@ public Uni withTransaction(Message message, Function offsets; + int generationId; Optional batchMetadata = message .getMetadata(IncomingKafkaRecordBatchMetadata.class); @@ -85,18 +86,19 @@ public Uni withTransaction(Message message, Function metadata = batchMetadata.get(); channel = metadata.getChannel(); + generationId = metadata.getConsumerGroupGenerationId(); offsets = metadata.getOffsets().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1))); } else if (recordMetadata.isPresent()) { IncomingKafkaRecordMetadata metadata = recordMetadata.get(); channel = metadata.getChannel(); offsets = new HashMap<>(); + generationId = metadata.getConsumerGroupGenerationId(); offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()), new OffsetAndMetadata(metadata.getOffset() + 1)); } else { throw KafkaExceptions.ex.noKafkaMetadataFound(message); } - List> consumers = clientService.getConsumers(channel); if (consumers.isEmpty()) { throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel); @@ -107,8 +109,20 @@ public Uni withTransaction(Message message, Function( /* before commit */ - consumer.consumerGroupMetadata() - .chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)), + consumer.consumerGroupMetadata().chain(groupMetadata -> { + // if the generationId is the same, we can send the offsets to tx + if (groupMetadata.generationId() == generationId) { + // stay on the polling thread + producer.unwrap().sendOffsetsToTransaction(offsets, groupMetadata); + return Uni.createFrom().voidItem(); + } else { + // abort the transaction if the generationId is different, + // after abort will set the consumer position to the last committed positions + return Uni.createFrom().failure( + KafkaExceptions.ex.exactlyOnceProcessingRebalance(channel, groupMetadata.toString(), + String.valueOf(generationId))); + } + }), r -> Uni.createFrom().item(r), VOID_UNI, /* after abort */ diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/transactions/ExactlyOnceProcessingTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/transactions/ExactlyOnceProcessingTest.java index 7c0437789e..8efcee9d97 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/transactions/ExactlyOnceProcessingTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/transactions/ExactlyOnceProcessingTest.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -14,6 +15,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.serialization.IntegerDeserializer; @@ -21,10 +23,12 @@ import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.OnOverflow; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.kafka.KafkaRecord; +import io.smallrye.reactive.messaging.kafka.TestTags; import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask; @@ -65,13 +69,21 @@ public static class ExactlyOnceProcessor { @Channel("transactional-producer") KafkaTransactions transaction; + List processed = new CopyOnWriteArrayList<>(); + @Incoming("exactly-once-consumer") Uni process(KafkaRecord record) { return transaction.withTransaction(record, emitter -> { emitter.send(KafkaRecord.of(record.getKey(), record.getPayload())); + processed.add(record.getPayload()); return Uni.createFrom().voidItem(); }); } + + public List getProcessed() { + return processed; + } + } @Test @@ -116,6 +128,51 @@ void testExactlyOnceProcessorWithProcessingErrorWithMultiplePartitions() { await().until(() -> !healthCenter.getLiveness().isOk()); } + @Test + @Tag(TestTags.SLOW) + void testExactlyOnceProcessorWithWithMultiplePartitions() throws InterruptedException { + inTopic = companion.topics().createAndWait(Uuid.randomUuid().toString(), 3); + outTopic = companion.topics().createAndWait(Uuid.randomUuid().toString(), 3); + int numberOfRecords = 10000; + MapBasedConfig config = new MapBasedConfig(producerConfig()); + config.putAll(consumerConfig()); + runApplication(config, ExactlyOnceProcessor.class); + + companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(inTopic, i % 3, "k" + i, i), numberOfRecords); + + HealthCenter healthCenter = get(HealthCenter.class); + await().until(() -> healthCenter.getLiveness().isOk()); + + Thread.sleep(1000); + + List processed = new CopyOnWriteArrayList<>(); + + try (var toClose = companion.processTransactional(Set.of(inTopic), + companion.consumeIntegers() + .withOffsetReset(OffsetResetStrategy.EARLIEST) + .withProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1) + .withOnPartitionsAssigned(partitions -> System.out.println(partitions + " assigned")) + .withGroupId("my-consumer"), + companion.produceIntegers() + .withTransactionalId("tx-producer-1") + .withProp("acks", "all"), + c -> { + processed.add(c.value()); + return new ProducerRecord<>(outTopic, c.value()); + })) { + List> committed = companion.consumeIntegers() + .withProp(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") + .fromTopics(outTopic, numberOfRecords) + .awaitCompletion(Duration.ofMinutes(3)) + .getRecords(); + + assertThat(committed) + .extracting(ConsumerRecord::value) + .doesNotHaveDuplicates() + .containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList())); + } + } + private KafkaMapBasedConfig producerConfig() { return kafkaConfig("mp.messaging.outgoing.transactional-producer") .with("topic", outTopic)