From ff40ce1221ce8cdee8ff71f0f0e023b089628f66 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 3 Jul 2024 18:36:55 -0700 Subject: [PATCH 1/2] Update integration test --- .../metrics/OpenTelemetrySanityTest.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java index 38afc1f127d18..31e600f3aa812 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/metrics/OpenTelemetrySanityTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.tests.integration.metrics; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.awaitility.Awaitility.waitAtMost; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -37,7 +39,6 @@ import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; import org.apache.pulsar.tests.integration.topologies.PulsarTestBase; -import org.awaitility.Awaitility; import org.testng.annotations.Test; public class OpenTelemetrySanityTest { @@ -71,17 +72,17 @@ public void testOpenTelemetryMetricsOtlpExport() throws Exception { // TODO: Validate cluster name and service version are present once // /~https://github.com/open-telemetry/opentelemetry-java/issues/6108 is solved. var metricName = "queueSize_ratio"; // Sent automatically by the OpenTelemetry SDK. - Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { var metrics = getMetricsFromPrometheus( openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT); return !metrics.findByNameAndLabels(metricName, "job", PulsarBrokerOpenTelemetry.SERVICE_NAME).isEmpty(); }); - Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { var metrics = getMetricsFromPrometheus( openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT); return !metrics.findByNameAndLabels(metricName, "job", PulsarProxyOpenTelemetry.SERVICE_NAME).isEmpty(); }); - Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { + waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { var metrics = getMetricsFromPrometheus( openTelemetryCollectorContainer, OpenTelemetryCollectorContainer.PROMETHEUS_EXPORTER_PORT); return !metrics.findByNameAndLabels(metricName, "job", PulsarWorkerOpenTelemetry.SERVICE_NAME).isEmpty(); @@ -120,30 +121,34 @@ public void testOpenTelemetryMetricsPrometheusExport() throws Exception { pulsarCluster.start(); pulsarCluster.setupFunctionWorkers(PulsarTestBase.randomName(), FunctionRuntimeType.PROCESS, 1); - var metricName = "target_info"; // Sent automatically by the OpenTelemetry SDK. - Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { - var metrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort); - return !metrics.findByNameAndLabels(metricName, + var targetInfoMetricName = "target_info"; // Sent automatically by the OpenTelemetry SDK. + var cpuCountMetricName = "jvm_cpu_count"; // Configured by the OpenTelemetryService. + waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName, "pulsar_broker_topic_producer_count"}; + var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getBroker(0), prometheusExporterPort); + assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric, Pair.of("pulsar_cluster", clusterName), Pair.of("service_name", PulsarBrokerOpenTelemetry.SERVICE_NAME), Pair.of("service_version", PulsarVersion.getVersion()), - Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty(); + Pair.of("host_name", pulsarCluster.getBroker(0).getHostname())).isEmpty()); }); - Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { - var metrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort); - return !metrics.findByNameAndLabels(metricName, + waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName}; + var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getProxy(), prometheusExporterPort); + assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric, Pair.of("pulsar_cluster", clusterName), Pair.of("service_name", PulsarProxyOpenTelemetry.SERVICE_NAME), Pair.of("service_version", PulsarVersion.getVersion()), - Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty(); + Pair.of("host_name", pulsarCluster.getProxy().getHostname())).isEmpty()); }); - Awaitility.waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).until(() -> { - var metrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort); - return !metrics.findByNameAndLabels(metricName, + waitAtMost(90, TimeUnit.SECONDS).ignoreExceptions().pollInterval(1, TimeUnit.SECONDS).untilAsserted(() -> { + var expectedMetrics = new String[] {targetInfoMetricName, cpuCountMetricName}; + var actualMetrics = getMetricsFromPrometheus(pulsarCluster.getAnyWorker(), prometheusExporterPort); + assertThat(expectedMetrics).allMatch(expectedMetric -> !actualMetrics.findByNameAndLabels(expectedMetric, Pair.of("pulsar_cluster", clusterName), Pair.of("service_name", PulsarWorkerOpenTelemetry.SERVICE_NAME), Pair.of("service_version", PulsarVersion.getVersion()), - Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty(); + Pair.of("host_name", pulsarCluster.getAnyWorker().getHostname())).isEmpty()); }); } From aebc1324fa9db8c6308ae07b406ea3bcd7dcc604 Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 3 Jul 2024 14:15:56 -0700 Subject: [PATCH 2/2] Allow pulsar.cluster attribute to be copied to Prometheus labels --- .../opentelemetry/OpenTelemetryService.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java index b32d353eb5ae7..e6c6d95273e0e 100644 --- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java +++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryService.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.exporter.prometheus.PrometheusHttpServer; import io.opentelemetry.instrumentation.runtimemetrics.java17.RuntimeMetrics; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk; @@ -97,6 +98,20 @@ public OpenTelemetryService(String clusterName, return resource.merge(resourceBuilder.build()); }); + sdkBuilder.addMetricReaderCustomizer((metricReader, configProperties) -> { + if (metricReader instanceof PrometheusHttpServer prometheusHttpServer) { + // At this point, the server is already started. We need to close it and create a new one with the + // correct resource attributes filter. + prometheusHttpServer.close(); + + // Allow all resource attributes to be exposed. + return prometheusHttpServer.toBuilder() + .setAllowedResourceAttributesFilter(s -> true) + .build(); + } + return metricReader; + }); + if (builderCustomizer != null) { builderCustomizer.accept(sdkBuilder); }