Skip to content

Commit

Permalink
Add TaskScheduler option for Kafka metrics components
Browse files Browse the repository at this point in the history
Related to: micrometer-metrics/micrometer#4976

* Introduce a `KafkaMetricsSupport` to have a common API for the `MeterBinder` registration
* Rework `MicrometerConsumerListener`, `MicrometerProducerListener` and `KafkaStreamsMicrometerListener`
to extend the `KafkaMetricsSupport` which allows to minimize code duplication
* Expose ctors on those listeners based on the `TaskScheduler`
* Implement a simple `ScheduledExecutorServiceAdapter` to adapt a `TaskScheduler`
to the expected by the `KafkaMetrics` `ScheduledExecutorService`
  • Loading branch information
artembilan committed Oct 17, 2024
1 parent 704e253 commit 7230c23
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ When using `KafkaStreamsCustomizer` it is now possible to return a custom implem

[[x33-kafka-headers-for-batch-listeners]]
=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners

When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields.
If the `DeliveryAttemptAwareRetryListener` is set to error handler as retry listener, each `ConsumerRecord` has delivery attempt header.
For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[kafka-headers-for-batch-listener].
For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[Kafka Headers for Batch Listener].

[[x33-task-scheduler-for-kafka-metrics]]
=== Kafka Metrics Listeners and `TaskScheduler`

The `MicrometerProducerListener`, `MicrometerConsumerListener` and `KafkaStreamsMicrometerListener` can now be configured with a `TaskScheduler`.
See `KafkaMetrics` JavaDocs and xref:kafka/micrometer.adoc[Micrometer Support] for more information.
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;


import io.micrometer.core.instrument.ImmutableTag;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;

/**
* An abstract class to manage {@link io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics}.
*
* @param <C> the Kafka Client type.
*
* @author Artem Bilan
*
* @since 3.3
*/
public abstract class KafkaMetricsSupport<C> {

protected final MeterRegistry meterRegistry;

protected final List<Tag> tags;

@Nullable
protected final ScheduledExecutorService scheduler;

private final Map<String, MeterBinder> metrics = new HashMap<>();

/**
* Construct an instance with the provided registry.
*
* @param meterRegistry the registry.
*/
protected KafkaMetricsSupport(MeterRegistry meterRegistry) {
this(meterRegistry, Collections.emptyList());
}

/**
* Construct an instance with the provided {@link MeterRegistry} and {@link TaskScheduler}.
*
* @param meterRegistry the registry.
* @param taskScheduler the task scheduler.
*/
protected KafkaMetricsSupport(MeterRegistry meterRegistry, TaskScheduler taskScheduler) {
this(meterRegistry, Collections.emptyList(), taskScheduler);
}

/**
* Construct an instance with the provided {@link MeterRegistry} and tags.
*
* @param meterRegistry the registry.
* @param tags the tags.
*/
protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags) {
Assert.notNull(meterRegistry, "The 'meterRegistry' cannot be null");
this.meterRegistry = meterRegistry;
this.tags = tags;
this.scheduler = null;
}

/**
* Construct an instance with the provided {@link MeterRegistry}, tags and {@link TaskScheduler}.
*
* @param meterRegistry the registry.
* @param tags the tags.
* @param taskScheduler the task scheduler.
*/
protected KafkaMetricsSupport(MeterRegistry meterRegistry, List<Tag> tags, TaskScheduler taskScheduler) {
Assert.notNull(meterRegistry, "The 'meterRegistry' cannot be null");
Assert.notNull(taskScheduler, "The 'taskScheduler' cannot be null");
this.meterRegistry = meterRegistry;
this.tags = tags;
this.scheduler = obtainScheduledExecutorService(taskScheduler);
}

protected void clientAdded(String id, C client) {
if (!this.metrics.containsKey(id)) {
List<Tag> clientTags = new ArrayList<>(this.tags);
clientTags.add(new ImmutableTag("spring.id", id));
this.metrics.put(id, createClientMetrics(client, clientTags));
this.metrics.get(id).bindTo(this.meterRegistry);
}
}

protected MeterBinder createClientMetrics(C client, List<Tag> tags) {
if (client instanceof Consumer<?, ?> consumer) {
return createConsumerMetrics(consumer, tags);
}
else if (client instanceof Producer<?, ?> producer) {
return createProducerMetrics(producer, tags);
}
else if (client instanceof AdminClient admin) {
return createAdminMetrics(admin, tags);
}

throw new IllegalArgumentException("Unsupported client type: " + client.getClass());
}

private KafkaClientMetrics createConsumerMetrics(Consumer<?, ?> consumer, List<Tag> tags) {
return this.scheduler != null
? new KafkaClientMetrics(consumer, tags, this.scheduler)
: new KafkaClientMetrics(consumer, tags);
}

private KafkaClientMetrics createProducerMetrics(Producer<?, ?> producer, List<Tag> tags) {
return this.scheduler != null
? new KafkaClientMetrics(producer, tags, this.scheduler)
: new KafkaClientMetrics(producer, tags);
}

private KafkaClientMetrics createAdminMetrics(AdminClient adminClient, List<Tag> tags) {
return this.scheduler != null
? new KafkaClientMetrics(adminClient, tags, this.scheduler)
: new KafkaClientMetrics(adminClient, tags);
}

protected void clientRemoved(String id, C client) {
AutoCloseable removed = (AutoCloseable) this.metrics.remove(id);
if (removed != null) {
try {
removed.close();
}
catch (Exception ex) {
ReflectionUtils.rethrowRuntimeException(ex);
}
}
}

private static ScheduledExecutorService obtainScheduledExecutorService(TaskScheduler taskScheduler) {
if (taskScheduler instanceof ThreadPoolTaskScheduler threadPoolTaskScheduler) {
return threadPoolTaskScheduler.getScheduledExecutor();
}

return new ScheduledExecutorServiceAdapter(taskScheduler);
}

private static final class ScheduledExecutorServiceAdapter extends ScheduledThreadPoolExecutor {

private final TaskScheduler delegate;

private ScheduledExecutorServiceAdapter(TaskScheduler delegate) {
super(0);
this.delegate = delegate;
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {

return this.delegate.scheduleAtFixedRate(command,
Instant.now().plus(initialDelay, unit.toChronoUnit()),
Duration.of(period, unit.toChronoUnit()));
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,15 +16,14 @@

package org.springframework.kafka.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


import org.apache.kafka.clients.consumer.Consumer;

import io.micrometer.core.instrument.ImmutableTag;
import org.springframework.scheduling.TaskScheduler;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
Expand All @@ -36,16 +35,12 @@
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.5
* @author Artem Bilan
*
* @since 2.5
*/
public class MicrometerConsumerListener<K, V> implements ConsumerFactory.Listener<K, V> {

private final MeterRegistry meterRegistry;

private final List<Tag> tags;

private final Map<String, KafkaClientMetrics> metrics = new HashMap<>();
public class MicrometerConsumerListener<K, V> extends KafkaMetricsSupport<Consumer<K, V>>
implements ConsumerFactory.Listener<K, V> {

/**
* Construct an instance with the provided registry.
Expand All @@ -55,32 +50,44 @@ public MicrometerConsumerListener(MeterRegistry meterRegistry) {
this(meterRegistry, Collections.emptyList());
}

/**
* Construct an instance with the provided registry and task scheduler.
* @param meterRegistry the registry.
* @param taskScheduler the task scheduler.
* @since 3.3
*/
public MicrometerConsumerListener(MeterRegistry meterRegistry, TaskScheduler taskScheduler) {
this(meterRegistry, Collections.emptyList(), taskScheduler);
}

/**
* Construct an instance with the provided registry and tags.
* @param meterRegistry the registry.
* @param tags the tags.
*/
public MicrometerConsumerListener(MeterRegistry meterRegistry, List<Tag> tags) {
this.meterRegistry = meterRegistry;
this.tags = tags;
super(meterRegistry, tags);
}

/**
* Construct an instance with the provided registry, tags and task scheduler.
* @param meterRegistry the registry.
* @param tags the tags.
* @param taskScheduler the task scheduler.
* @since 3.3
*/
public MicrometerConsumerListener(MeterRegistry meterRegistry, List<Tag> tags, TaskScheduler taskScheduler) {
super(meterRegistry, tags, taskScheduler);
}

@Override
public synchronized void consumerAdded(String id, Consumer<K, V> consumer) {
if (!this.metrics.containsKey(id)) {
List<Tag> consumerTags = new ArrayList<>(this.tags);
consumerTags.add(new ImmutableTag("spring.id", id));
this.metrics.put(id, new KafkaClientMetrics(consumer, consumerTags));
this.metrics.get(id).bindTo(this.meterRegistry);
}
clientAdded(id, consumer);
}

@Override
public synchronized void consumerRemoved(String id, Consumer<K, V> consumer) {
KafkaClientMetrics removed = this.metrics.remove(id);
if (removed != null) {
removed.close();
}
clientRemoved(id, consumer);
}

}
Loading

0 comments on commit 7230c23

Please sign in to comment.