Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add instrumentation for rocketmq #2263

Merged
merged 57 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
535b95b
Merge pull request #1 from open-telemetry/main
addname Feb 12, 2021
0d1fbed
add rocketmq support
addname Feb 12, 2021
cb059ab
Merge branch 'main'
addname Feb 12, 2021
2f2cfcf
merge main
addname Feb 12, 2021
d3fd46b
Merge branch 'main'
addname Feb 15, 2021
bce0921
modify tests
addname Feb 16, 2021
924b2e3
modify tests
addname Feb 16, 2021
fb41154
modify tests
addname Feb 16, 2021
451d3a5
modify style
addname Feb 16, 2021
24d1d4a
modify style
addname Feb 16, 2021
fd9f3a4
modify style
addname Feb 16, 2021
06ea7bc
modify style
addname Feb 16, 2021
f40b086
modify tests
addname Feb 16, 2021
bad5c16
modify tests
addname Feb 16, 2021
73bbefb
modify tests
addname Feb 16, 2021
06175b2
modify tests
addname Feb 16, 2021
fb3dc3d
Merge branch 'main'
addname Feb 18, 2021
a7f764c
Merge branch 'main'
addname Feb 24, 2021
66617dc
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Feb 28, 2021
4094f4a
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
558acf7
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
310c268
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
c98b828
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
1ea63af
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
449b8e2
Use hooks to register in the iavaagent instrumentation
addname Feb 28, 2021
55aa9f2
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Mar 5, 2021
a92c9b3
Fix
addname Mar 6, 2021
b5ba2ca
Revert "Use hooks to register in the iavaagent instrumentation"
addname Mar 6, 2021
0887762
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Mar 6, 2021
48de0f7
Fix
addname Mar 6, 2021
ad8cbc7
Fix
addname Mar 6, 2021
f28eee6
Fix
addname Mar 6, 2021
81c3720
Fix
addname Mar 6, 2021
1c2c71f
Fix
addname Mar 6, 2021
0ed11a0
Fix
addname Mar 6, 2021
c32bc40
Fix
addname Mar 6, 2021
460092a
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Mar 9, 2021
0489ef9
Fix
addname Mar 9, 2021
963e012
Fix
addname Mar 9, 2021
00aa1fa
Fix
addname Mar 9, 2021
5419081
Fix
addname Mar 9, 2021
8419db8
Fix
addname Mar 9, 2021
aa2c7b5
Fix
addname Mar 9, 2021
8c606f6
Update settings.gradle
anuraaga Mar 9, 2021
4a00e83
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
11aa520
Fix
addname Mar 10, 2021
9ffde5d
Merge remote-tracking branch 'origin/issue#1916' into issue#1916
addname Mar 10, 2021
6ad79eb
Fix
addname Mar 10, 2021
24de0ca
Fix
addname Mar 10, 2021
3aac1a5
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
b4eb7c3
Fix
addname Mar 10, 2021
58801ed
Fix
addname Mar 10, 2021
398246b
Fix
addname Mar 10, 2021
5b45a6a
Fix
addname Mar 10, 2021
f873db7
Fix
addname Mar 10, 2021
c0c4c9d
Merge branch 'main' of /~https://github.com/open-telemetry/opentelemetr…
addname Mar 10, 2021
28f30fc
Fix
addname Mar 10, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply from: "$rootDir/gradle/instrumentation.gradle"

muzzle {
pass {
group = "org.apache.rocketmq"
module = 'rocketmq-client'
versions = "[4.8.0,)"
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
assertInverse = true
}
}

dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
implementation project(':instrumentation:rocketmq-client-4.8:library')
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')

}

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq-client.experimental-span-attributes=true"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.rocketmq.RocketMqTracing;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.SendMessageHook;

public final class RocketMqClientHooks {
private static final RocketMqTracing TRACING =
RocketMqTracing.newBuilder(GlobalOpenTelemetry.get())
.setPropagationEnabled(
Config.get()
.getBooleanProperty("otel.instrumentation.rocketmq-client.propagation", true))
.setCaptureExperimentalSpanAttributes(
Config.get()
.getBooleanProperty(
"otel.instrumentation.rocketmq-client.experimental-span-attributes", false))
.build();

public static final ConsumeMessageHook CONSUME_MESSAGE_HOOK =
TRACING.newTracingConsumeMessageHook();

public static final SendMessageHook SEND_MESSAGE_HOOK = TRACING.newTracingSendMessageHook();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;

public class RocketMqConsumerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqConsumerInstrumentation.class.getName() + "$AdviceStart");
}

public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(
value = "defaultMQPushConsumerImpl",
declaringType = DefaultMQPushConsumer.class)
DefaultMQPushConsumerImpl defaultMqPushConsumerImpl) {
defaultMqPushConsumerImpl.registerConsumeMessageHook(
RocketMqClientHooks.CONSUME_MESSAGE_HOOK);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq-client", "rocketmq-client-4.8");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new RocketMqProducerInstrumentation(), new RocketMqConsumerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmq;

import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class RocketMqProducerInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.producer.DefaultMQProducer");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqProducerInstrumentation.class.getName() + "$AdviceStart");
}

public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(value = "defaultMQProducerImpl", declaringType = DefaultMQProducer.class)
DefaultMQProducerImpl defaultMqProducerImpl) {
defaultMqProducerImpl.registerSendMessageHook(RocketMqClientHooks.SEND_MESSAGE_HOOK);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq

import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer

class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {

@Override
void configureMQProducer(DefaultMQProducer producer) {
}

@Override
void configureMQPushConsumer(DefaultMQPushConsumer consumer) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"

dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')
}

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq-client.experimental-span-attributes=true"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

public final class ContextAndScope {
addname marked this conversation as resolved.
Show resolved Hide resolved
private final Context context;
private final Scope scope;

public ContextAndScope(Context context, Scope scope) {
this.context = context;
this.scope = scope;
}

public Context getContext() {
return context;
}

public void closeScope() {
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmq;

import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
import org.checkerframework.checker.nullness.qual.Nullable;

final class RocketMqConsumerTracer extends BaseTracer {

private boolean captureExperimentalSpanAttributes;
private boolean propagationEnabled;

RocketMqConsumerTracer(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
super(openTelemetry);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.propagationEnabled = propagationEnabled;
}

addname marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.javaagent.rocketmq-client";
}

Context startSpan(Context parentContext, List<MessageExt> msgs) {
if (msgs.size() == 1) {
SpanBuilder spanBuilder = startSpanBuilder(extractParent(msgs.get(0)), msgs.get(0));
return parentContext.with(spanBuilder.startSpan());
} else {
SpanBuilder spanBuilder =
spanBuilder(parentContext, "multiple_sources receive", CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive");
Context rootContext = withClientSpan(parentContext, spanBuilder.startSpan());
addname marked this conversation as resolved.
Show resolved Hide resolved
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
return rootContext;
}
}

private void createChildSpan(Context parentContext, MessageExt msg) {
SpanBuilder childSpanBuilder =
startSpanBuilder(parentContext, msg)
.addLink(Span.fromContext(extractParent(msg)).getSpanContext());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this particular case if the propagation is turned off you shouldn't add a link - it'll end up pointing to the parentContext anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed this spec issue we may be able to handle this in the SDK too open-telemetry/opentelemetry-specification#1492

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave this code as is, next SDK will handle invalid links for us

end(withClientSpan(parentContext, childSpanBuilder.startSpan()));
addname marked this conversation as resolved.
Show resolved Hide resolved
}

private SpanBuilder startSpanBuilder(Context parentContext, MessageExt msg) {
SpanBuilder spanBuilder =
spanBuilder(parentContext, spanNameOnConsume(msg), CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId())
.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
(long) msg.getBody().length);
onConsume(spanBuilder, msg);
return spanBuilder;
}

private Context extractParent(MessageExt msg) {
if (propagationEnabled) {
return extract(msg.getProperties(), GETTER);
} else {
return Context.current();
}
}

private void onConsume(SpanBuilder spanBuilder, MessageExt msg) {
if (captureExperimentalSpanAttributes) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId());
spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg));
}
}

private String spanNameOnConsume(MessageExt msg) {
return msg.getTopic() + " process";
}

@Nullable
private String getBrokerHost(MessageExt msg) {
if (msg.getStoreHost() != null) {
return msg.getStoreHost().toString().replace("/", "");
} else {
return null;
}
}
}
Loading