Skip to content

Commit

Permalink
Add instrumentation for rocketmq (#2263)
Browse files Browse the repository at this point in the history
* add rocketmq support

* merge main

* modify tests

* modify tests

* modify tests

* modify style

* modify style

* modify style

* modify style

* modify tests

* modify tests

* modify tests

* modify tests

* Use hooks to register in the iavaagent instrumentation

* Use hooks to register in the iavaagent instrumentation

* Use hooks to register in the iavaagent instrumentation

* Use hooks to register in the iavaagent instrumentation

* Use hooks to register in the iavaagent instrumentation

* Use hooks to register in the iavaagent instrumentation

* Fix

* Revert "Use hooks to register in the iavaagent instrumentation"

This reverts commit 558acf7

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Update settings.gradle

Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

Co-authored-by: Anuraag Agrawal <anuraaga@gmail.com>
Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
  • Loading branch information
3 people authored Mar 11, 2021
1 parent 8e5e453 commit ee66554
Show file tree
Hide file tree
Showing 22 changed files with 1,124 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = "org.apache.rocketmq"
module = 'rocketmq-client'
versions = "[4.8.0,)"
assertInverse = true
}
}

dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
implementation project(':instrumentation:rocketmq-client-4.8:library')
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')

}

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq-client.experimental-span-attributes=true"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.rocketmq.RocketMqTracing;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.SendMessageHook;

public final class RocketMqClientHooks {
private static final RocketMqTracing TRACING =
RocketMqTracing.newBuilder(GlobalOpenTelemetry.get())
.setPropagationEnabled(
Config.get()
.getBooleanProperty("otel.instrumentation.rocketmq-client.propagation", true))
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBooleanProperty(
"otel.instrumentation.rocketmq-client.experimental-span-attributes", false))
.build();

public static final ConsumeMessageHook CONSUME_MESSAGE_HOOK =
TRACING.newTracingConsumeMessageHook();

public static final SendMessageHook SEND_MESSAGE_HOOK = TRACING.newTracingSendMessageHook();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;

public class RocketMqConsumerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqConsumerInstrumentation.class.getName() + "$AdviceStart");
}

public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(
value = "defaultMQPushConsumerImpl",
declaringType = DefaultMQPushConsumer.class)
DefaultMQPushConsumerImpl defaultMqPushConsumerImpl) {
defaultMqPushConsumerImpl.registerConsumeMessageHook(
RocketMqClientHooks.CONSUME_MESSAGE_HOOK);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client", "rocketmq-client-4.8");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new RocketMqProducerInstrumentation(), new RocketMqConsumerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class RocketMqProducerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.producer.DefaultMQProducer");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqProducerInstrumentation.class.getName() + "$AdviceStart");
}

public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(value = "defaultMQProducerImpl", declaringType = DefaultMQProducer.class)
DefaultMQProducerImpl defaultMqProducerImpl) {
defaultMqProducerImpl.registerSendMessageHook(RocketMqClientHooks.SEND_MESSAGE_HOOK);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq

import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer

class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {

@Override
void configureMQProducer(DefaultMQProducer producer) {
}

@Override
void configureMQPushConsumer(DefaultMQPushConsumer consumer) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"

dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

final class ContextAndScope {
private final Context context;
private final Scope scope;

public ContextAndScope(Context context, Scope scope) {
this.context = context;
this.scope = scope;
}

public Context getContext() {
return context;
}

public void closeScope() {
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.checkerframework.checker.nullness.qual.Nullable;

final class RocketMqConsumerTracer extends BaseTracer {

private boolean captureExperimentalSpanAttributes;
private boolean propagationEnabled;

RocketMqConsumerTracer(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
super(openTelemetry);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.propagationEnabled = propagationEnabled;
}

@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.rocketmq-client";
}

Context startSpan(Context parentContext, List<MessageExt> msgs) {
if (msgs.size() == 1) {
SpanBuilder spanBuilder = startSpanBuilder(extractParent(msgs.get(0)), msgs.get(0));
return parentContext.with(spanBuilder.startSpan());
} else {
SpanBuilder spanBuilder =
spanBuilder(parentContext, "multiple_sources receive", CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive");
Context rootContext = parentContext.with(spanBuilder.startSpan());
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
return rootContext;
}
}

private void createChildSpan(Context parentContext, MessageExt msg) {
SpanBuilder childSpanBuilder =
startSpanBuilder(parentContext, msg)
.addLink(Span.fromContext(extractParent(msg)).getSpanContext());
end(parentContext.with(childSpanBuilder.startSpan()));
}

private SpanBuilder startSpanBuilder(Context parentContext, MessageExt msg) {
SpanBuilder spanBuilder =
spanBuilder(parentContext, spanNameOnConsume(msg), CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId())
.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
(long) msg.getBody().length);
onConsume(spanBuilder, msg);
return spanBuilder;
}

private Context extractParent(MessageExt msg) {
if (propagationEnabled) {
return extract(msg.getProperties(), GETTER);
} else {
return Context.current();
}
}

private void onConsume(SpanBuilder spanBuilder, MessageExt msg) {
if (captureExperimentalSpanAttributes) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId());
spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg));
}
}

private String spanNameOnConsume(MessageExt msg) {
return msg.getTopic() + " process";
}

@Nullable
private String getBrokerHost(MessageExt msg) {
if (msg.getStoreHost() != null) {
return msg.getStoreHost().toString().replace("/", "");
} else {
return null;
}
}
}
Loading

0 comments on commit ee66554

Please sign in to comment.