Skip to content

Commit

Permalink
Kafka Consumer cache groupMetadata for accessing without going throug…
Browse files Browse the repository at this point in the history
…h polling thread
  • Loading branch information
ozangunalp committed Jul 17, 2024
1 parent bd181f0 commit 61a492b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class ReactiveKafkaConsumer<K, V> implements io.smallrye.reactive.messagi
private final KafkaRecordStream<K, V> stream;
private final KafkaRecordBatchStream<K, V> batchStream;
private final Map<String, Object> kafkaConfiguration;
private final AtomicReference<ConsumerGroupMetadata> consumerGroupMetadataRef = new AtomicReference<>();

public ReactiveKafkaConsumer(KafkaConnectorIncomingConfiguration config,
Instance<DeserializationFailureHandler<?>> deserializationFailureHandlers,
Expand Down Expand Up @@ -143,6 +144,13 @@ public void setRebalanceListener(KafkaConsumerRebalanceListener listener, KafkaC
}
}

public void setCachedConsumerGroupMetadata() {
Consumer<K, V> consumer = consumerRef.get();
if (consumer != null) {
consumerGroupMetadataRef.set(consumer.groupMetadata());
}
}

public String getConsumerGroup() {
return consumerGroup;
}
Expand Down Expand Up @@ -318,7 +326,8 @@ public Uni<Void> resume() {
@Override
@CheckReturnValue
public Uni<ConsumerGroupMetadata> consumerGroupMetadata() {
return runOnPollingThread((Function<Consumer<K, V>, ConsumerGroupMetadata>) Consumer::groupMetadata);
return Uni.createFrom().item(consumerGroupMetadataRef::get)
.onItem().ifNull().switchTo(() -> runOnPollingThread((Consumer<K, V> c) -> c.groupMetadata()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public WrappedConsumerRebalanceListener(String consumerGroup,
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.executingConsumerRevokedRebalanceListener(consumerGroup);
try {
reactiveKafkaConsumer.setCachedConsumerGroupMetadata();
reactiveKafkaConsumer.removeFromQueueRecordsFromTopicPartitions(partitions);
commitHandler.partitionsRevoked(partitions);
if (listener != null) {
Expand All @@ -63,6 +64,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
try {
reactiveKafkaConsumer.setCachedConsumerGroupMetadata();
if (reactiveKafkaConsumer.isPaused()) {
reactiveKafkaConsumer.unwrap().pause(partitions);
}
Expand Down

0 comments on commit 61a492b

Please sign in to comment.