Kafka Tx: include poll-time ConsumerGroupMetadata with the KafkaRecord
ozangunalp committed Jul 17, 2024
1 parent 61a492b commit 0c8a336
Showing 11 changed files with 156 additions and 12 deletions.
smallrye-reactive-messaging-kafka-api/revapi.json (16 changes: 14 additions & 2 deletions)
@@ -24,7 +24,19 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [ {
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecords<K, T>, java.lang.String, int, java.util.Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata>)",
"new": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecords<K, T>, java.lang.String, int, java.util.Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata>, int)",
"justification": "Added consumer group generation id to the constructor"
},
{
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecord<K, T>, java.lang.String, int)",
"new": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecord<K, T>, java.lang.String, int, int)",
"justification": "Added consumer group generation id to the constructor"
}
]
}
}, {
"extension" : "revapi.reporter.json",
@@ -43,4 +55,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
…/IncomingKafkaRecordBatchMetadata.java
@@ -23,18 +23,20 @@ public class IncomingKafkaRecordBatchMetadata<K, T> {
private final String channel;
private final int index;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final int consumerGroupGenerationId;

public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel, int index,
Map<TopicPartition, OffsetAndMetadata> offsets) {
Map<TopicPartition, OffsetAndMetadata> offsets, int consumerGroupGenerationId) {
this.records = records;
this.channel = channel;
this.index = index;
this.offsets = Collections.unmodifiableMap(offsets);
this.consumerGroupGenerationId = consumerGroupGenerationId;
}

public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel,
Map<TopicPartition, OffsetAndMetadata> offsets) {
this(records, channel, -1, offsets);
this(records, channel, -1, offsets, -1);
}

/**
@@ -72,4 +74,11 @@ public Map<TopicPartition, OffsetAndMetadata> getOffsets() {
public int getConsumerIndex() {
return index;
}

/**
* @return the consumer group metadata generation id at the time of polling these records
*/
public int getConsumerGroupGenerationId() {
return consumerGroupGenerationId;
}
}
…/IncomingKafkaRecordMetadata.java
@@ -22,20 +22,22 @@ public class IncomingKafkaRecordMetadata<K, T> implements KafkaMessageMetadata<K
private volatile Headers headers;
private final String channel;
private final int index;
private final int consumerGroupGenerationId;

/**
* Constructor
*
* @param record the underlying record received from Kafka
*/
public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record, String channel, int index) {
public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record, String channel, int index, int consumerGroupGenerationId) {
this.record = record;
this.channel = channel;
this.index = index;
this.consumerGroupGenerationId = consumerGroupGenerationId;
}

public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record, String channel) {
this(record, channel, -1);
this(record, channel, -1, -1);
}

/**
@@ -125,4 +127,11 @@ public String getChannel() {
public int getConsumerIndex() {
return index;
}

/**
* @return the consumer group metadata generation id at the time of polling this record
*/
public int getConsumerGroupGenerationId() {
return consumerGroupGenerationId;
}
}
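Both the batch and the single-record metadata now expose the same accessor. As a minimal usage sketch (the channel name and payload type are hypothetical, and imports are omitted), downstream code can read the generation id captured at poll time, where -1 means it was not captured (for example, when the legacy constructors are used):

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> message) {
        int generationId = message.getMetadata(IncomingKafkaRecordMetadata.class)
                .map(IncomingKafkaRecordMetadata::getConsumerGroupGenerationId)
                .orElse(-1); // -1: generation id not captured at poll time
        return message.ack();
    }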
…/IncomingKafkaRecord.java
@@ -14,6 +14,7 @@
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordHelper;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
@@ -34,7 +35,8 @@ public IncomingKafkaRecord(ConsumerRecord<K, T> record,
boolean cloudEventEnabled,
boolean tracingEnabled) {
this.commitHandler = commitHandler;
this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index);
int generationId = KafkaRecordHelper.extractGenerationIdFrom(record);
this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index, generationId);

ArrayList<Object> meta = new ArrayList<>();
meta.add(this.kafkaMetadata);
Expand Down Expand Up @@ -102,6 +104,10 @@ public long getOffset() {
return kafkaMetadata.getOffset();
}

public int getConsumerGroupGenerationId() {
return kafkaMetadata.getConsumerGroupGenerationId();
}

@Override
public Metadata getMetadata() {
return metadata;
…/IncomingKafkaRecordBatch.java
@@ -45,8 +45,13 @@ public IncomingKafkaRecordBatch(ConsumerRecords<K, T> records, String channel, i
this.incomingRecords = Collections.unmodifiableList(incomingRecords);
this.latestOffsetRecords = Collections.unmodifiableMap(latestOffsetRecords);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
latestOffsetRecords.forEach((e, r) -> offsets.put(e, new OffsetAndMetadata(r.getOffset())));
this.metadata = captureContextMetadata(new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets));
int generationId = -1;
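// all records of a single poll are stamped with the same generation id on the polling thread,
// so reading it from any of the latest-offset records covers the whole batch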
for (var entry : latestOffsetRecords.entrySet()) {
generationId = entry.getValue().getConsumerGroupGenerationId();
offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().getOffset()));
}
this.metadata = captureContextMetadata(
new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets, generationId));
}

@Override
…/KafkaExceptions.java
@@ -93,4 +93,8 @@ public interface KafkaExceptions {

@Message(id = 18024, value = "Invalid Kafka incoming configuration for channel `%s`, `assign-seek` portion `%s` is invalid. If topic portion is not present, a single `topic` configuration is needed.")
IllegalArgumentException invalidAssignSeekTopic(String channel, String assignSeek);

@Message(id = 18025, value = "Partition rebalance during exactly-once processing for channel `%s`: current consumer group metadata: %s, generation id for message: %s")
IllegalStateException exactlyOnceProcessingRebalance(String channel, String groupMetadata, String generationId);

}
…/KafkaLogging.java
@@ -353,4 +353,7 @@ void delayedRetryTopic(String channel, Collection<String> retryTopics, int maxRe
@Message(id = 18283, value = "Failure from channel `%s` request/reply consumer for topic `%s`")
void requestReplyConsumerFailure(String channel, String replyTopic, @Cause Throwable throwable);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 18284, value = "Transaction commit failed, aborting the transaction")
void transactionCommitFailed(@Cause Throwable throwable);
}
…/KafkaRecordHelper.java
@@ -4,6 +4,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -14,6 +16,8 @@

public class KafkaRecordHelper {

public static final String SMALLRYE_GENERATION_ID = "__smallrye_generationId";

public static Headers getHeaders(OutgoingKafkaRecordMetadata<?> om,
IncomingKafkaRecordMetadata<?, ?> im,
RuntimeKafkaSinkConfiguration configuration) {
@@ -47,4 +51,18 @@ public static Headers getHeaders(OutgoingKafkaRecordMetadata<?> om,
public static boolean isNotBlank(String s) {
return s != null && !s.trim().isEmpty();
}

public static void addGenerationIdToHeaders(ConsumerRecord<?, ?> record, ConsumerGroupMetadata metadata) {
record.headers().add(SMALLRYE_GENERATION_ID, Integer.toString(metadata.generationId()).getBytes());
}

public static int extractGenerationIdFrom(ConsumerRecord<?, ?> record) {
Header header = record.headers().lastHeader(SMALLRYE_GENERATION_ID);
if (header != null) {
record.headers().remove(SMALLRYE_GENERATION_ID);
return Integer.parseInt(new String(header.value()));
}
return -1;
}

}
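A quick round trip through the helper illustrates the intent (a sketch with arbitrary values; imports are omitted, and the four-argument ConsumerGroupMetadata constructor is public in kafka-clients). Note that extraction removes the internal header again, so it never leaks to application code:

    ConsumerRecord<String, Integer> record = new ConsumerRecord<>("orders", 0, 42L, "key", 1);
    ConsumerGroupMetadata metadata = new ConsumerGroupMetadata("orders-group", 7, "member-1", Optional.empty());

    KafkaRecordHelper.addGenerationIdToHeaders(record, metadata);
    int generationId = KafkaRecordHelper.extractGenerationIdFrom(record); // 7, and the header is removed
    int secondRead = KafkaRecordHelper.extractGenerationIdFrom(record); // -1, header already consumed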
…/KafkaRecordStreamSubscription.java
@@ -10,6 +10,8 @@
import java.util.function.UnaryOperator;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import io.smallrye.mutiny.Uni;
@@ -88,6 +90,11 @@ public KafkaRecordStreamSubscription(
if (log.isTraceEnabled()) {
log.tracef("Adding %s messages to the queue", cr.count());
}
// Called on the polling thread
ConsumerGroupMetadata metadata = client.unwrap().groupMetadata();
for (ConsumerRecord<K, V> r : cr) {
KafkaRecordHelper.addGenerationIdToHeaders(r, metadata);
}
enqueueFunction.accept(cr, queue);
return cr;
})
…/KafkaTransactionsImpl.java
@@ -78,25 +78,27 @@ public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmit
try {
String channel;
Map<TopicPartition, OffsetAndMetadata> offsets;
int generationId;

Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message
.getMetadata(IncomingKafkaRecordBatchMetadata.class);
Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
if (batchMetadata.isPresent()) {
IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
channel = metadata.getChannel();
generationId = metadata.getConsumerGroupGenerationId();
offsets = metadata.getOffsets().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
} else if (recordMetadata.isPresent()) {
IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
channel = metadata.getChannel();
offsets = new HashMap<>();
generationId = metadata.getConsumerGroupGenerationId();
offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
new OffsetAndMetadata(metadata.getOffset() + 1));
} else {
throw KafkaExceptions.ex.noKafkaMetadataFound(message);
}

List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
if (consumers.isEmpty()) {
throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
@@ -107,8 +109,20 @@ public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmit
if (currentTransaction == null) {
return new Transaction<R>(
/* before commit */
consumer.consumerGroupMetadata()
.chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
consumer.consumerGroupMetadata().chain(groupMetadata -> {
// if the generationId is unchanged since the poll, the offsets can be sent to the transaction
if (groupMetadata.generationId() == generationId) {
// stay on the polling thread
producer.unwrap().sendOffsetsToTransaction(offsets, groupMetadata);
return Uni.createFrom().voidItem();
} else {
// a different generationId means a rebalance happened since the records were polled,
// so fail and abort the transaction; the after-abort handler then resets the
// consumer position to the last committed offsets
return Uni.createFrom().failure(
KafkaExceptions.ex.exactlyOnceProcessingRebalance(channel, groupMetadata.toString(),
String.valueOf(generationId)));
}
}),
r -> Uni.createFrom().item(r),
VOID_UNI,
/* after abort */
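The check above guards the consume-transform-produce loop in the spirit of KIP-447 zombie fencing: offsets polled under one group generation must not be committed under another. A rough equivalent with plain kafka-clients (a sketch only; topic names and types are illustrative, and the consumer/producer setup, including initTransactions(), is omitted):

    ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(500));
    int polledGenerationId = consumer.groupMetadata().generationId(); // captured on the poll thread

    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, Integer> r : records) {
            producer.send(new ProducerRecord<>("out-topic", r.value()));
            offsets.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1));
        }
        ConsumerGroupMetadata current = consumer.groupMetadata();
        if (current.generationId() != polledGenerationId) {
            // a rebalance happened in between: the partitions may already be owned by
            // another member, so committing these offsets could duplicate processing
            throw new IllegalStateException("rebalanced during processing");
        }
        producer.sendOffsetsToTransaction(offsets, current);
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        // the caller must then rewind the consumer to the last committed offsets,
        // which is exactly what the after-abort handler of this class does
    }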
… (exactly-once processing test class)
@@ -5,6 +5,7 @@

import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -14,17 +15,20 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.TestTags;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;
import io.smallrye.reactive.messaging.kafka.companion.ConsumerTask;
@@ -65,13 +69,21 @@ public static class ExactlyOnceProcessor {
@Channel("transactional-producer")
KafkaTransactions<Integer> transaction;

List<Integer> processed = new CopyOnWriteArrayList<>();

@Incoming("exactly-once-consumer")
Uni<Void> process(KafkaRecord<String, Integer> record) {
return transaction.withTransaction(record, emitter -> {
emitter.send(KafkaRecord.of(record.getKey(), record.getPayload()));
processed.add(record.getPayload());
return Uni.createFrom().voidItem();
});
}

public List<Integer> getProcessed() {
return processed;
}

}

@Test
Expand Down Expand Up @@ -116,6 +128,51 @@ void testExactlyOnceProcessorWithProcessingErrorWithMultiplePartitions() {
await().until(() -> !healthCenter.getLiveness().isOk());
}

@Test
@Tag(TestTags.SLOW)
void testExactlyOnceProcessorWithWithMultiplePartitions() throws InterruptedException {
inTopic = companion.topics().createAndWait(Uuid.randomUuid().toString(), 3);
outTopic = companion.topics().createAndWait(Uuid.randomUuid().toString(), 3);
int numberOfRecords = 10000;
MapBasedConfig config = new MapBasedConfig(producerConfig());
config.putAll(consumerConfig());
runApplication(config, ExactlyOnceProcessor.class);

companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(inTopic, i % 3, "k" + i, i), numberOfRecords);

HealthCenter healthCenter = get(HealthCenter.class);
await().until(() -> healthCenter.getLiveness().isOk());

Thread.sleep(1000);

List<Integer> processed = new CopyOnWriteArrayList<>();

try (var toClose = companion.processTransactional(Set.of(inTopic),
companion.consumeIntegers()
.withOffsetReset(OffsetResetStrategy.EARLIEST)
.withProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1)
.withOnPartitionsAssigned(partitions -> System.out.println(partitions + " assigned"))
.withGroupId("my-consumer"),
companion.produceIntegers()
.withTransactionalId("tx-producer-1")
.withProp("acks", "all"),
c -> {
processed.add(c.value());
return new ProducerRecord<>(outTopic, c.value());
})) {
List<ConsumerRecord<String, Integer>> committed = companion.consumeIntegers()
.withProp(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.fromTopics(outTopic, numberOfRecords)
.awaitCompletion(Duration.ofMinutes(3))
.getRecords();

assertThat(committed)
.extracting(ConsumerRecord::value)
.doesNotHaveDuplicates()
.containsAll(IntStream.range(0, numberOfRecords).boxed().collect(Collectors.toList()));
}
}

private KafkaMapBasedConfig producerConfig() {
return kafkaConfig("mp.messaging.outgoing.transactional-producer")
.with("topic", outTopic)
