Skip to content

Commit

Permalink
add support for Observation propagation in kotlin gRPC coroutine server
Browse files Browse the repository at this point in the history
This commit contains changes that allow to capture current observation when using kotlin gRPC coroutine server.

In order to properly propagate current Observation to a coroutine server method, we need to propagate Observation as a context element by extending `CoroutineContextServerInterceptor`.

Moreover, current Observation needs to be somehow allowed to capture by `CoroutineContextServerInterceptor`. To do this, we need to open current Observation scope inside `ObservationGrpcServerInterceptor`. It is important to keep these two interceptors in a proper order - first, we need to open current Observation scope, and later create a context element based on it.

Fixes: micrometer-metrics#4218
  • Loading branch information
Aleksander Brzozowski committed Oct 27, 2023
1 parent e70cc86 commit 005fdf6
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 2 deletions.
1 change: 1 addition & 0 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def VERSIONS = [
libs.grpcStubs,
libs.grpcAlts,
libs.grpcTestingProto,
libs.grpcKotlinStub,
libs.gmetric4j,
libs.prometheusClient,
libs.prometheusPushgateway,
Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ehcache3 = "3.10.8"
gmetric4j = "1.0.10"
google-cloud-monitoring = "3.29.0"
grpc = "1.58.0"
grpcKotlin = "1.4.0"
guava = "32.1.2-jre"
guice = "5.1.0"
h2 = "2.2.224"
Expand Down Expand Up @@ -113,6 +114,7 @@ grpcServices = { module = "io.grpc:grpc-services", version.ref = "grpc" }
grpcStubs = { module = "io.grpc:grpc-stubs", version.ref = "grpc" }
grpcAlts = { module = "io.grpc:grpc-alts", version.ref = "grpc" }
grpcTestingProto = { module = "io.grpc:grpc-testing-proto", version.ref = "grpc" }
grpcKotlinStub = { module = "io.grpc:grpc-kotlin-stub", version.ref = "grpcKotlin" }
guava = { module = "com.google.guava:guava", version.ref = "guava" }
guice = { module = "com.google.inject:guice", version.ref = "guice" }
h2 = { module = "com.h2database:h2", version.ref = "h2" }
Expand Down
1 change: 1 addition & 0 deletions micrometer-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ dependencies {
optionalApi 'org.apache.tomcat.embed:tomcat-embed-core'
optionalApi 'org.glassfish.jersey.core:jersey-server'
optionalApi 'io.grpc:grpc-api'
optionalApi 'io.grpc:grpc-kotlin-stub'
optionalApi 'io.netty:netty-transport'
// jakarta JMS
optionalApi 'jakarta.jms:jakarta.jms-api'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,

ObservationGrpcServerCall<ReqT, RespT> serverCall = new ObservationGrpcServerCall<>(call, observation);

try {
try (Observation.Scope scope = observation.openScope()) {
Listener<ReqT> result = next.startCall(serverCall, headers);
return new ObservationGrpcServerCallListener<>(result, observation);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.core.instrument.kotlin

import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.kotlin.CoroutineContextServerInterceptor
import io.micrometer.observation.ObservationRegistry
import kotlin.coroutines.CoroutineContext

/**
* This interceptor is meant to propagate observation context to a kotlin coroutine gRPC server method.
*
* Usage:
*
* ```
* val server = ServerBuilder.forPort(8080)
* .intercept(ObservationCoroutineContextServerInterceptor(observationRegistry))
* .intercept(ObservationGrpcServerInterceptor(observationRegistry))
* .build();
* server.start()
* ```
*
* Please remember that order of interceptors matters, and it has to be the same as it is in the example above.
*
* @since ??? TODO should we set this property?
*/
class ObservationCoroutineContextServerInterceptor(
private val observationRegistry: ObservationRegistry,
) : CoroutineContextServerInterceptor() {
override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
return observationRegistry.asContextElement()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
Expand Down Expand Up @@ -81,13 +85,15 @@ class GrpcObservationTest {

ObservationGrpcClientInterceptor clientInterceptor;

ObservationRegistry observationRegistry;

@BeforeEach
void setUp() {
serverHandler = new ContextAndEventHoldingObservationHandler<>(GrpcServerObservationContext.class);
clientHandler = new ContextAndEventHoldingObservationHandler<>(GrpcClientObservationContext.class);
observationRegistry = ObservationRegistry.create();

MeterRegistry meterRegistry = new SimpleMeterRegistry();
ObservationRegistry observationRegistry = ObservationRegistry.create();
observationRegistry.observationConfig()
.observationHandler(new ObservationTextPublisher())
.observationHandler(new DefaultMeterObservationHandler(meterRegistry))
Expand Down Expand Up @@ -433,6 +439,60 @@ public void onCompleted() {

}

@Nested
class WithObservationAwareInterceptor {

private static class ObservationAwareServerInterceptor implements ServerInterceptor {

Observation lastObservation;

private final ObservationRegistry observationRegistry;

private ObservationAwareServerInterceptor(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
this.lastObservation = observationRegistry.getCurrentObservation();
return next.startCall(call, headers);
}

}

ObservationAwareServerInterceptor scopeAwareServerInterceptor;

@BeforeEach
void setCustomInterceptor() throws Exception {
scopeAwareServerInterceptor = new ObservationAwareServerInterceptor(observationRegistry);

EchoService echoService = new EchoService();
server = InProcessServerBuilder.forName("sample")
.addService(echoService)
.intercept(scopeAwareServerInterceptor)
.intercept(serverInterceptor)
.build();
server.start();

channel = InProcessChannelBuilder.forName("sample").intercept(clientInterceptor).build();
}

@Test
void observationShouldBeCapturedByInterceptor() {
SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(channel);

SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage("Hello").build();
stub.unaryRpc(request);

assertThat(scopeAwareServerInterceptor.lastObservation).isNotNull().satisfies((observation -> {
assertThat(observation.getContext().getContextualName())
.isEqualTo("grpc.testing.SimpleService/UnaryRpc");
}));
}

}

// perform server context verification on basic information
void verifyServerContext(String serviceName, String methodName, String contextualName, MethodType methodType) {
assertThat(serverHandler.getContext()).isNotNull().satisfies((serverContext) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.micrometer.core.instrument.kotlin.binder.grpc

import io.grpc.ManagedChannel
import io.grpc.MethodDescriptor
import io.grpc.Server
import io.grpc.ServerServiceDefinition
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
import io.grpc.kotlin.AbstractCoroutineServerImpl
import io.grpc.kotlin.ServerCalls
import io.grpc.stub.annotations.RpcMethod
import io.grpc.testing.protobuf.SimpleRequest
import io.grpc.testing.protobuf.SimpleResponse
import io.grpc.testing.protobuf.SimpleServiceGrpc
import io.micrometer.core.instrument.binder.grpc.ObservationGrpcServerInterceptor
import io.micrometer.core.instrument.kotlin.ObservationCoroutineContextServerInterceptor
import io.micrometer.observation.Observation
import io.micrometer.observation.ObservationRegistry
import io.micrometer.observation.ObservationTextPublisher
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.ThrowingConsumer
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test

class GrpcCoroutinesTest {

val observationRegistry: ObservationRegistry = ObservationRegistry.create()
val echoServiceCoroutine: EchoServiceCoroutine = EchoServiceCoroutine(observationRegistry)
lateinit var server: Server
lateinit var channel: ManagedChannel

@BeforeEach
fun setUp() {
server = InProcessServerBuilder.forName("sample")
.intercept(ObservationCoroutineContextServerInterceptor(observationRegistry))
.intercept(ObservationGrpcServerInterceptor(observationRegistry))
.addService(echoServiceCoroutine)
.build()
server.start()
channel = InProcessChannelBuilder.forName("sample").build()
}

@AfterEach
fun cleanUp() {
channel.shutdownNow()
server.shutdownNow()
}

@Test
fun `unary rpc should propagate observation`() {
val stub = SimpleServiceGrpc.newBlockingStub(channel)
val request = SimpleRequest.newBuilder()
.setRequestMessage("hello")
.build()
observationRegistry.observationConfig()
.observationHandler(ObservationTextPublisher())

stub.unaryRpc(request)

assertThat<Observation>(echoServiceCoroutine.lastObservation).isNotNull()
.satisfies(
ThrowingConsumer { observation: Observation ->
assertThat(observation.getContext().contextualName)
.isEqualTo("grpc.testing.SimpleService/UnaryRpc")
},
)
}

// This service has the same rpc method that the one defined in SimpleServiceGrpc
class EchoServiceCoroutine(private val observationRegistry: ObservationRegistry) : AbstractCoroutineServerImpl() {

var lastObservation: Observation? = null

@RpcMethod(
fullMethodName = "${SimpleServiceGrpc.SERVICE_NAME}/UnaryRpc",
requestType = SimpleRequest::class,
responseType = SimpleResponse::class,
methodType = MethodDescriptor.MethodType.UNARY,
)
fun unaryRpc(request: SimpleRequest): SimpleResponse {
lastObservation = observationRegistry.currentObservation
return SimpleResponse.newBuilder()
.setResponseMessage(request.getRequestMessage())
.build()
}

override fun bindService(): ServerServiceDefinition {
return ServerServiceDefinition.builder(SimpleServiceGrpc.SERVICE_NAME)
.addMethod(
ServerCalls.unaryServerMethodDefinition(
context = context,
descriptor = SimpleServiceGrpc.getUnaryRpcMethod(),
implementation = ::unaryRpc,
),
).build()
}
}
}

0 comments on commit 005fdf6

Please sign in to comment.