Merge pull request #2692 from ozangunalp/kafka_tx_fix
Improve failure handling for Kafka transactions
ozangunalp authored Jul 18, 2024
2 parents 18ede8f + 0c8a336 commit c1e4e23
Showing 13 changed files with 172 additions and 14 deletions.
16 changes: 14 additions & 2 deletions smallrye-reactive-messaging-kafka-api/revapi.json
@@ -24,7 +24,19 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [ {
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecords<K, T>, java.lang.String, int, java.util.Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata>)",
"new": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecords<K, T>, java.lang.String, int, java.util.Map<org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata>, int)",
"justification": "Added consumer group generation id to the constructor"
},
{
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecord<K, T>, java.lang.String, int)",
"new": "method void io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata<K, T>::<init>(org.apache.kafka.clients.consumer.ConsumerRecord<K, T>, java.lang.String, int, int)",
"justification": "Added consumer group generation id to the constructor"
}
]
}
}, {
"extension" : "revapi.reporter.json",
@@ -43,4 +55,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
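For context, the generation id these revapi entries record comes from Kafka's ConsumerGroupMetadata and increments on every rebalance of the consumer group. A minimal standalone sketch of where the value originates, assuming a hypothetical broker, group, and topic:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class GenerationIdProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
            props.put("group.id", "my-group");                // hypothetical group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("my-topic"));      // hypothetical topic
                consumer.poll(Duration.ofMillis(100));        // join the group
                ConsumerGroupMetadata metadata = consumer.groupMetadata();
                // Increments on each rebalance; -1 while no generation has been joined yet.
                System.out.println("generation id: " + metadata.generationId());
            }
        }
    }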
File: IncomingKafkaRecordBatchMetadata.java
@@ -23,18 +23,20 @@ public class IncomingKafkaRecordBatchMetadata<K, T> {
private final String channel;
private final int index;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final int consumerGroupGenerationId;

public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel, int index,
-            Map<TopicPartition, OffsetAndMetadata> offsets) {
+            Map<TopicPartition, OffsetAndMetadata> offsets, int consumerGroupGenerationId) {
this.records = records;
this.channel = channel;
this.index = index;
this.offsets = Collections.unmodifiableMap(offsets);
this.consumerGroupGenerationId = consumerGroupGenerationId;
}

public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel,
Map<TopicPartition, OffsetAndMetadata> offsets) {
-        this(records, channel, -1, offsets);
+        this(records, channel, -1, offsets, -1);
}

/**
@@ -72,4 +74,11 @@ public Map<TopicPartition, OffsetAndMetadata> getOffsets() {
public int getConsumerIndex() {
return index;
}

/**
* @return the consumer group metadata generation id at the time of polling this record
*/
public int getConsumerGroupGenerationId() {
return consumerGroupGenerationId;
}
}
File: IncomingKafkaRecordMetadata.java
@@ -22,20 +22,22 @@ public class IncomingKafkaRecordMetadata<K, T> implements KafkaMessageMetadata<K
private volatile Headers headers;
private final String channel;
private final int index;
private final int consumerGroupGenerationId;

/**
* Constructor
*
* @param record the underlying record received from Kafka
*/
-    public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record, String channel, int index) {
+    public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record, String channel, int index, int consumerGroupGenerationId) {
this.record = record;
this.channel = channel;
this.index = index;
this.consumerGroupGenerationId = consumerGroupGenerationId;
}

public IncomingKafkaRecordMetadata(ConsumerRecord<K, T> record, String channel) {
-        this(record, channel, -1);
+        this(record, channel, -1, -1);
}

/**
@@ -125,4 +127,11 @@ public String getChannel() {
public int getConsumerIndex() {
return index;
}

/**
* @return the consumer group metadata generation id at the time of polling this record
*/
public int getConsumerGroupGenerationId() {
return consumerGroupGenerationId;
}
}
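The accessor added above (and its batch counterpart) lets application code see which consumer group generation a record was polled in. A sketch of reading it from an incoming message, with a hypothetical channel name and payload type:

    import java.util.concurrent.CompletionStage;
    import jakarta.enterprise.context.ApplicationScoped;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import org.eclipse.microprofile.reactive.messaging.Message;
    import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;

    @ApplicationScoped
    public class GenerationIdInspector {

        @Incoming("my-channel") // hypothetical channel
        public CompletionStage<Void> consume(Message<String> message) {
            message.getMetadata(IncomingKafkaRecordMetadata.class)
                    // -1 means no generation id was captured for this record
                    .ifPresent(md -> System.out.println("polled in generation " + md.getConsumerGroupGenerationId()));
            return message.ack();
        }
    }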
File: IncomingKafkaRecord.java
@@ -14,6 +14,7 @@
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordHelper;
import io.smallrye.reactive.messaging.kafka.impl.ce.KafkaCloudEventHelper;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
@@ -34,7 +35,8 @@ public IncomingKafkaRecord(ConsumerRecord<K, T> record,
boolean cloudEventEnabled,
boolean tracingEnabled) {
this.commitHandler = commitHandler;
-        this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index);
+        int generationId = KafkaRecordHelper.extractGenerationIdFrom(record);
+        this.kafkaMetadata = new IncomingKafkaRecordMetadata<>(record, channel, index, generationId);

ArrayList<Object> meta = new ArrayList<>();
meta.add(this.kafkaMetadata);
@@ -102,6 +104,10 @@ public long getOffset() {
return kafkaMetadata.getOffset();
}

public int getConsumerGroupGenerationId() {
return kafkaMetadata.getConsumerGroupGenerationId();
}

@Override
public Metadata getMetadata() {
return metadata;
File: IncomingKafkaRecordBatch.java
@@ -45,8 +45,13 @@ public IncomingKafkaRecordBatch(ConsumerRecords<K, T> records, String channel, i
this.incomingRecords = Collections.unmodifiableList(incomingRecords);
this.latestOffsetRecords = Collections.unmodifiableMap(latestOffsetRecords);
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        latestOffsetRecords.forEach((e, r) -> offsets.put(e, new OffsetAndMetadata(r.getOffset())));
-        this.metadata = captureContextMetadata(new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets));
+        int generationId = -1;
+        for (var entry : latestOffsetRecords.entrySet()) {
+            generationId = entry.getValue().getConsumerGroupGenerationId();
+            offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().getOffset()));
+        }
+        this.metadata = captureContextMetadata(
+                new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets, generationId));
}

@Override
File: KafkaExceptions.java
@@ -93,4 +93,8 @@ public interface KafkaExceptions {

@Message(id = 18024, value = "Invalid Kafka incoming configuration for channel `%s`, `assign-seek` portion `%s` is invalid. If topic portion is not present, a single `topic` configuration is needed.")
IllegalArgumentException invalidAssignSeekTopic(String channel, String assignSeek);

@Message(id = 18025, value = "Partition rebalance during exactly-once processing for channel `%s`: current consumer group metadata: %s, generation id for message: %s")
IllegalStateException exactlyOnceProcessingRebalance(String channel, String groupMetadata, String generationId);

}
File: KafkaLogging.java
@@ -353,4 +353,7 @@ void delayedRetryTopic(String channel, Collection<String> retryTopics, int maxRe
@Message(id = 18283, value = "Failure from channel `%s` request/reply consumer for topic `%s`")
void requestReplyConsumerFailure(String channel, String replyTopic, @Cause Throwable throwable);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 18284, value = "Transaction commit failed for channel, aborting the transaction")
void transactionCommitFailed(@Cause Throwable throwable);
}
File: KafkaRecordHelper.java
@@ -4,6 +4,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -14,6 +16,8 @@

public class KafkaRecordHelper {

public static final String SMALLRYE_GENERATION_ID = "__smallrye_generationId";

public static Headers getHeaders(OutgoingKafkaRecordMetadata<?> om,
IncomingKafkaRecordMetadata<?, ?> im,
RuntimeKafkaSinkConfiguration configuration) {
@@ -47,4 +51,18 @@ public static Headers getHeaders(OutgoingKafkaRecordMetadata<?> om,
public static boolean isNotBlank(String s) {
return s != null && !s.trim().isEmpty();
}

public static void addGenerationIdToHeaders(ConsumerRecord<?, ?> record, ConsumerGroupMetadata metadata) {
record.headers().add(SMALLRYE_GENERATION_ID, Integer.toString(metadata.generationId()).getBytes());
}

public static int extractGenerationIdFrom(ConsumerRecord<?, ?> record) {
Header header = record.headers().lastHeader(SMALLRYE_GENERATION_ID);
if (header != null) {
record.headers().remove(SMALLRYE_GENERATION_ID);
return Integer.parseInt(new String(header.value()));
}
return -1;
}

}
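Since the generation id travels with each record as a synthetic header, the helper pair above can be exercised in isolation. A round-trip sketch with hypothetical topic, offset, and group values:

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import io.smallrye.reactive.messaging.kafka.impl.KafkaRecordHelper;

    public class GenerationIdHeaderRoundTrip {
        public static void main(String[] args) {
            ConsumerRecord<String, String> record = new ConsumerRecord<>("orders", 0, 42L, "k", "v");
            ConsumerGroupMetadata metadata = new ConsumerGroupMetadata("orders-group", 7, "member-1", Optional.empty());

            // On the polling thread: stash the current generation id in a __smallrye_generationId header.
            KafkaRecordHelper.addGenerationIdToHeaders(record, metadata);

            // When building the incoming metadata: read the header back and strip it from the record.
            int generationId = KafkaRecordHelper.extractGenerationIdFrom(record);
            System.out.println("extracted generation id: " + generationId); // prints 7
        }
    }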
File: KafkaRecordStreamSubscription.java
@@ -10,6 +10,8 @@
import java.util.function.UnaryOperator;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import io.smallrye.mutiny.Uni;
@@ -88,6 +90,11 @@ public KafkaRecordStreamSubscription(
if (log.isTraceEnabled()) {
log.tracef("Adding %s messages to the queue", cr.count());
}
// Called on the polling thread
ConsumerGroupMetadata metadata = client.unwrap().groupMetadata();
for (ConsumerRecord<K, V> r : cr) {
KafkaRecordHelper.addGenerationIdToHeaders(r, metadata);
}
enqueueFunction.accept(cr, queue);
return cr;
})
File: ReactiveKafkaConsumer.java
@@ -58,6 +58,7 @@ public class ReactiveKafkaConsumer<K, V> implements io.smallrye.reactive.messagi
private final KafkaRecordStream<K, V> stream;
private final KafkaRecordBatchStream<K, V> batchStream;
private final Map<String, Object> kafkaConfiguration;
private final AtomicReference<ConsumerGroupMetadata> consumerGroupMetadataRef = new AtomicReference<>();

public ReactiveKafkaConsumer(KafkaConnectorIncomingConfiguration config,
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers,
@@ -143,6 +144,13 @@ public void setRebalanceListener(KafkaConsumerRebalanceListener listener, KafkaC
}
}

public void setCachedConsumerGroupMetadata() {
Consumer<K, V> consumer = consumerRef.get();
if (consumer != null) {
consumerGroupMetadataRef.set(consumer.groupMetadata());
}
}

public String getConsumerGroup() {
return consumerGroup;
}
@@ -318,7 +326,8 @@ public Uni<Void> resume() {
@Override
@CheckReturnValue
public Uni<ConsumerGroupMetadata> consumerGroupMetadata() {
-        return runOnPollingThread((Function<Consumer<K, V>, ConsumerGroupMetadata>) Consumer::groupMetadata);
+        return Uni.createFrom().item(consumerGroupMetadataRef::get)
+                .onItem().ifNull().switchTo(() -> runOnPollingThread((Consumer<K, V> c) -> c.groupMetadata()));
}

@Override
File: WrappedConsumerRebalanceListener.java
@@ -48,6 +48,7 @@ public WrappedConsumerRebalanceListener(String consumerGroup,
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.executingConsumerRevokedRebalanceListener(consumerGroup);
try {
reactiveKafkaConsumer.setCachedConsumerGroupMetadata();
reactiveKafkaConsumer.removeFromQueueRecordsFromTopicPartitions(partitions);
commitHandler.partitionsRevoked(partitions);
if (listener != null) {
@@ -63,6 +64,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
try {
reactiveKafkaConsumer.setCachedConsumerGroupMetadata();
if (reactiveKafkaConsumer.isPaused()) {
reactiveKafkaConsumer.unwrap().pause(partitions);
}
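Taken together, the two files above implement a cache-then-fallback scheme: KafkaConsumer is single-threaded, so groupMetadata() normally requires a hop to the polling thread; since rebalance callbacks already run on that thread, refreshing the cached ConsumerGroupMetadata there lets consumerGroupMetadata() usually answer without the hop. A compressed, hypothetical sketch of the same pattern outside the SmallRye codebase:

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    // Mirrors consumerGroupMetadataRef: refreshed from the rebalance listener,
    // read cheaply everywhere else, with an expensive fallback while still empty.
    final class CachedValue<T> {
        private final AtomicReference<T> ref = new AtomicReference<>();
        private final Supplier<T> fallback; // e.g. a hop to the polling thread

        CachedValue(Supplier<T> fallback) {
            this.fallback = fallback;
        }

        void refresh(T value) { // called where the value is known to be fresh
            ref.set(value);
        }

        T get() {
            T cached = ref.get();
            return cached != null ? cached : fallback.get();
        }
    }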
File: KafkaTransactions implementation
@@ -78,25 +78,27 @@ public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmit
try {
String channel;
Map<TopicPartition, OffsetAndMetadata> offsets;
int generationId;

Optional<IncomingKafkaRecordBatchMetadata> batchMetadata = message
.getMetadata(IncomingKafkaRecordBatchMetadata.class);
Optional<IncomingKafkaRecordMetadata> recordMetadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
if (batchMetadata.isPresent()) {
IncomingKafkaRecordBatchMetadata<?, ?> metadata = batchMetadata.get();
channel = metadata.getChannel();
generationId = metadata.getConsumerGroupGenerationId();
offsets = metadata.getOffsets().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset() + 1)));
} else if (recordMetadata.isPresent()) {
IncomingKafkaRecordMetadata<?, ?> metadata = recordMetadata.get();
channel = metadata.getChannel();
offsets = new HashMap<>();
generationId = metadata.getConsumerGroupGenerationId();
offsets.put(TopicPartitions.getTopicPartition(metadata.getTopic(), metadata.getPartition()),
new OffsetAndMetadata(metadata.getOffset() + 1));
} else {
throw KafkaExceptions.ex.noKafkaMetadataFound(message);
}

List<KafkaConsumer<Object, Object>> consumers = clientService.getConsumers(channel);
if (consumers.isEmpty()) {
throw KafkaExceptions.ex.unableToFindConsumerForChannel(channel);
@@ -107,8 +109,20 @@ public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmit
if (currentTransaction == null) {
return new Transaction<R>(
/* before commit */
-                        consumer.consumerGroupMetadata()
-                                .chain(groupMetadata -> producer.sendOffsetsToTransaction(offsets, groupMetadata)),
+                        consumer.consumerGroupMetadata().chain(groupMetadata -> {
+                            // if the generationId is the same, we can send the offsets to the transaction
+                            if (groupMetadata.generationId() == generationId) {
+                                // stay on the polling thread
+                                producer.unwrap().sendOffsetsToTransaction(offsets, groupMetadata);
+                                return Uni.createFrom().voidItem();
+                            } else {
+                                // abort the transaction if the generationId is different;
+                                // the abort resets the consumer position to the last committed offsets
+                                return Uni.createFrom().failure(
+                                        KafkaExceptions.ex.exactlyOnceProcessingRebalance(channel, groupMetadata.toString(),
+                                                String.valueOf(generationId)));
+                            }
+                        }),
r -> Uni.createFrom().item(r),
VOID_UNI,
/* after abort */
Expand Down Expand Up @@ -179,7 +193,10 @@ private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> wo
.onCancellation().call(() -> abort())
// when there was no exception,
// commit or rollback the transaction
-                .call(() -> abort ? abort() : commit())
+                .call(() -> abort ? abort() : commit().onFailure().recoverWithUni(throwable -> {
+                    KafkaLogging.log.transactionCommitFailed(throwable);
+                    return abort();
+                }))
// finally, call after commit or after abort callbacks
.onFailure().recoverWithUni(throwable -> afterAbort.apply(throwable))
.onItem().transformToUni(result -> afterCommit.apply(result));
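The code path above is what exactly-once processing hits: KafkaTransactions.withTransaction(message, work) commits the consumed offsets inside the producer transaction, and with this change a rebalance between polling and committing aborts the transaction (via exactlyOnceProcessingRebalance) rather than committing offsets for a generation the consumer no longer owns. A minimal processor sketch following the SmallRye exactly-once pattern, with hypothetical channel names and payload types:

    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.inject.Inject;
    import org.eclipse.microprofile.reactive.messaging.Channel;
    import org.eclipse.microprofile.reactive.messaging.Incoming;
    import io.smallrye.mutiny.Uni;
    import io.smallrye.reactive.messaging.kafka.KafkaRecord;
    import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;

    @ApplicationScoped
    public class ExactlyOnceProcessor {

        @Inject
        @Channel("tx-out") // hypothetical channel backed by a transactional producer
        KafkaTransactions<Integer> transaction;

        @Incoming("in") // hypothetical incoming channel
        Uni<Void> process(KafkaRecord<String, Integer> record) {
            // Offsets of `record` are sent to the transaction before commit; a generation id
            // mismatch at that point now fails the commit and aborts the transaction.
            return transaction.withTransaction(record, emitter -> {
                emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1));
                return Uni.createFrom().voidItem();
            });
        }
    }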