diff --git a/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle b/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle new file mode 100644 index 000000000000..b0b2ed33f6ba --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle @@ -0,0 +1,21 @@ +apply from: "$rootDir/gradle/instrumentation.gradle" + +muzzle { + pass { + group = "org.apache.rocketmq" + module = 'rocketmq-client' + versions = "[4.8.0,)" + assertInverse = true + } +} + +dependencies { + library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0' + implementation project(':instrumentation:rocketmq-client-4.8:library') + testImplementation project(':instrumentation:rocketmq-client-4.8:testing') + +} + +tasks.withType(Test) { + jvmArgs "-Dotel.instrumentation.rocketmq-client.experimental-span-attributes=true" +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java new file mode 100644 index 000000000000..eaebd89e0340 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmq; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.rocketmq.RocketMqTracing; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.hook.SendMessageHook; + +public final class RocketMqClientHooks { + private static final RocketMqTracing TRACING = + RocketMqTracing.newBuilder(GlobalOpenTelemetry.get()) + .setPropagationEnabled( + Config.get() + .getBooleanProperty("otel.instrumentation.rocketmq-client.propagation", true)) + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rocketmq-client.experimental-span-attributes", false)) + .build(); + + public static final ConsumeMessageHook CONSUME_MESSAGE_HOOK = + TRACING.newTracingConsumeMessageHook(); + + public static final SendMessageHook SEND_MESSAGE_HOOK = TRACING.newTracingSendMessageHook(); +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java new file mode 100644 index 000000000000..69b8fea08fac --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java @@ -0,0 +1,47 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmq; + +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; + +public class RocketMqConsumerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"); + } + + @Override + public Map, String> transformers() { + return singletonMap( + isMethod().and(named("start")).and(takesArguments(0)), + RocketMqConsumerInstrumentation.class.getName() + "$AdviceStart"); + } + + public static class AdviceStart { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.FieldValue( + value = "defaultMQPushConsumerImpl", + declaringType = DefaultMQPushConsumer.class) + DefaultMQPushConsumerImpl defaultMqPushConsumerImpl) { + defaultMqPushConsumerImpl.registerConsumeMessageHook( + RocketMqClientHooks.CONSUME_MESSAGE_HOOK); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java new file mode 100644 index 000000000000..9b8aeead195a --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmq; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.tooling.InstrumentationModule; +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class RocketMqInstrumentationModule extends InstrumentationModule { + public RocketMqInstrumentationModule() { + super("rocketmq-client", "rocketmq-client-4.8"); + } + + @Override + public List typeInstrumentations() { + return asList(new RocketMqProducerInstrumentation(), new RocketMqConsumerInstrumentation()); + } +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java new file mode 100644 index 000000000000..153ef6d2a836 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmq; + +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +public class RocketMqProducerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.producer.DefaultMQProducer"); + } + + @Override + public Map, String> transformers() { + return singletonMap( + isMethod().and(named("start")).and(takesArguments(0)), + RocketMqProducerInstrumentation.class.getName() + "$AdviceStart"); + } + + public static class AdviceStart { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.FieldValue(value = "defaultMQProducerImpl", declaringType = DefaultMQProducer.class) + DefaultMQProducerImpl defaultMqProducerImpl) { + defaultMqProducerImpl.registerSendMessageHook(RocketMqClientHooks.SEND_MESSAGE_HOOK); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy new file mode 100644 index 000000000000..478b03d22566 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq + +import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest +import io.opentelemetry.instrumentation.test.AgentTestTrait +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer +import org.apache.rocketmq.client.producer.DefaultMQProducer + +class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait { + + @Override + void configureMQProducer(DefaultMQProducer producer) { + } + + @Override + void configureMQPushConsumer(DefaultMQPushConsumer consumer) { + } +} \ No newline at end of file diff --git a/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle b/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle new file mode 100644 index 000000000000..e9ab4deda882 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle @@ -0,0 +1,6 @@ +apply from: "$rootDir/gradle/instrumentation-library.gradle" + +dependencies { + library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0' + testImplementation project(':instrumentation:rocketmq-client-4.8:testing') +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/ContextAndScope.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/ContextAndScope.java new file mode 100644 index 000000000000..aa67e1f4cde1 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/ContextAndScope.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +final class ContextAndScope { + private final Context context; + private final Scope scope; + + public ContextAndScope(Context context, Scope scope) { + this.context = context; + this.scope = scope; + } + + public Context getContext() { + return context; + } + + public void closeScope() { + scope.close(); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java new file mode 100644 index 000000000000..cad16785156d --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java @@ -0,0 +1,108 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER; +import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; +import org.apache.rocketmq.common.message.MessageExt; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class RocketMqConsumerTracer extends BaseTracer { + + private boolean captureExperimentalSpanAttributes; + private boolean propagationEnabled; + + RocketMqConsumerTracer( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean propagationEnabled) { + super(openTelemetry); + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.propagationEnabled = propagationEnabled; + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.javaagent.rocketmq-client"; + } + + Context startSpan(Context parentContext, List msgs) { + if (msgs.size() == 1) { + SpanBuilder spanBuilder = startSpanBuilder(extractParent(msgs.get(0)), msgs.get(0)); + return parentContext.with(spanBuilder.startSpan()); + } else { + SpanBuilder spanBuilder = + spanBuilder(parentContext, "multiple_sources receive", CONSUMER) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive"); + Context rootContext = parentContext.with(spanBuilder.startSpan()); + for (MessageExt message : msgs) { + createChildSpan(rootContext, message); + } + return rootContext; + } + } + + private void createChildSpan(Context parentContext, MessageExt msg) { + SpanBuilder childSpanBuilder = + startSpanBuilder(parentContext, msg) + .addLink(Span.fromContext(extractParent(msg)).getSpanContext()); + end(parentContext.with(childSpanBuilder.startSpan())); + } + + private SpanBuilder startSpanBuilder(Context parentContext, MessageExt msg) { + SpanBuilder spanBuilder = + spanBuilder(parentContext, spanNameOnConsume(msg), CONSUMER) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()) + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId()) + .setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + (long) msg.getBody().length); + onConsume(spanBuilder, msg); + return spanBuilder; + } + + private Context extractParent(MessageExt msg) { + if (propagationEnabled) { + return extract(msg.getProperties(), GETTER); + } else { + return Context.current(); + } + } + + private void onConsume(SpanBuilder spanBuilder, MessageExt msg) { + if (captureExperimentalSpanAttributes) { + spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags()); + spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId()); + spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset()); + spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg)); + } + } + + private String spanNameOnConsume(MessageExt msg) { + return msg.getTopic() + " process"; + } + + @Nullable + private String getBrokerHost(MessageExt msg) { + if (msg.getStoreHost() != null) { + return msg.getStoreHost().toString().replace("/", ""); + } else { + return null; + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java new file mode 100644 index 000000000000..607367f84773 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import static io.opentelemetry.api.trace.SpanKind.PRODUCER; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; + +final class RocketMqProducerTracer extends BaseTracer { + + private boolean captureExperimentalSpanAttributes; + + RocketMqProducerTracer(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { + super(openTelemetry); + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.javaagent.rocketmq-client"; + } + + Context startProducerSpan(Context parentContext, String addr, Message msg) { + SpanBuilder spanBuilder = spanBuilder(parentContext, spanNameOnProduce(msg), PRODUCER); + onProduce(spanBuilder, msg, addr); + return parentContext.with(spanBuilder.startSpan()); + } + + private void onProduce(SpanBuilder spanBuilder, Message msg, String addr) { + spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"); + spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"); + spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()); + if (captureExperimentalSpanAttributes) { + spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags()); + spanBuilder.setAttribute("messaging.rocketmq.broker_address", addr); + } + } + + public void afterProduce(Context context, SendResult sendResult) { + Span span = Span.fromContext(context); + span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, sendResult.getMsgId()); + if (captureExperimentalSpanAttributes) { + span.setAttribute("messaging.rocketmq.send_result", sendResult.getSendStatus().name()); + } + } + + private String spanNameOnProduce(Message msg) { + return msg.getTopic() + " send"; + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java new file mode 100644 index 000000000000..c29f7a20132e --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java @@ -0,0 +1,61 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.api.OpenTelemetry; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.hook.SendMessageHook; + +/** Entrypoint for tracing RocketMq producers or consumers. */ +public final class RocketMqTracing { + + /** Returns a new {@link RocketMqTracing} configured with the given {@link OpenTelemetry}. */ + public static RocketMqTracing create(OpenTelemetry openTelemetry) { + return newBuilder(openTelemetry).build(); + } + + /** + * Returns a new {@link RocketMqTracingBuilder} configured with the given {@link OpenTelemetry}. + */ + public static RocketMqTracingBuilder newBuilder(OpenTelemetry openTelemetry) { + return new RocketMqTracingBuilder(openTelemetry); + } + + private final boolean captureExperimentalSpanAttributes; + private final boolean propagationEnabled; + + private final RocketMqConsumerTracer rocketMqConsumerTracer; + private final RocketMqProducerTracer rocketMqProducerTracer; + + RocketMqTracing( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean propagationEnabled) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.propagationEnabled = propagationEnabled; + rocketMqConsumerTracer = + new RocketMqConsumerTracer( + openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); + rocketMqProducerTracer = + new RocketMqProducerTracer(openTelemetry, captureExperimentalSpanAttributes); + } + + /** + * Returns a new {@link ConsumeMessageHook} for use with methods like {@link + * org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#registerConsumeMessageHook(ConsumeMessageHook)}. + */ + public ConsumeMessageHook newTracingConsumeMessageHook() { + return new TracingConsumeMessageHookImpl(rocketMqConsumerTracer); + } + + /** + * Returns a new {@link SendMessageHook} for use with methods like {@link + * org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#registerSendMessageHook(SendMessageHook)}. + */ + public SendMessageHook newTracingSendMessageHook() { + return new TracingSendMessageHookImpl(rocketMqProducerTracer, propagationEnabled); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracingBuilder.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracingBuilder.java new file mode 100644 index 000000000000..faaa2407d4b8 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracingBuilder.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.api.OpenTelemetry; + +/** A builder of {@link RocketMqTracing}. */ +public final class RocketMqTracingBuilder { + + private final OpenTelemetry openTelemetry; + + private boolean captureExperimentalSpanAttributes; + private boolean propagationEnabled = true; + + RocketMqTracingBuilder(OpenTelemetry openTelemetry) { + this.openTelemetry = openTelemetry; + } + + /** + * Sets whether experimental attributes should be set to spans. These attributes may be changed or + * removed in the future, so only enable this if you know you do not require attributes filled by + * this instrumentation to be stable across versions + */ + public RocketMqTracingBuilder setCaptureExperimentalSpanAttributes( + boolean captureExperimentalSpanAttributes) { + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + return this; + } + + /** + * Sets whether the trace context should be written from producers / read from consumers for + * propagating through messaging. + */ + public RocketMqTracingBuilder setPropagationEnabled(boolean propagationEnabled) { + this.propagationEnabled = propagationEnabled; + return this; + } + + /** + * Returns a new {@link RocketMqTracing} with the settings of this {@link RocketMqTracingBuilder}. + */ + public RocketMqTracing build() { + return new RocketMqTracing( + openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java new file mode 100644 index 000000000000..c7fd9ed0a623 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.context.propagation.TextMapGetter; +import java.util.Map; + +final class TextMapExtractAdapter implements TextMapGetter> { + + public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); + + @Override + public Iterable keys(Map carrier) { + return carrier.keySet(); + } + + @Override + public String get(Map carrier, String key) { + return carrier.get(key); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java new file mode 100644 index 000000000000..556f60f32800 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.context.propagation.TextMapSetter; +import java.util.Map; + +final class TextMapInjectAdapter implements TextMapSetter> { + + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); + + @Override + public void set(Map carrier, String key, String value) { + carrier.put(key, value); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java new file mode 100644 index 000000000000..11b0775a02cc --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.context.Context; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; + +final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { + + private final RocketMqConsumerTracer tracer; + + TracingConsumeMessageHookImpl(RocketMqConsumerTracer tracer) { + this.tracer = tracer; + } + + @Override + public String hookName() { + return "OpenTelemetryConsumeMessageTraceHook"; + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + Context traceContext = tracer.startSpan(Context.current(), context.getMsgList()); + ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent()); + context.setMqTraceContext(contextAndScope); + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + if (context.getMqTraceContext() instanceof ContextAndScope) { + ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext(); + contextAndScope.closeScope(); + tracer.end(contextAndScope.getContext()); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java new file mode 100644 index 000000000000..8665c3c9f36a --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import static io.opentelemetry.instrumentation.rocketmq.TextMapInjectAdapter.SETTER; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; + +final class TracingSendMessageHookImpl implements SendMessageHook { + + private final RocketMqProducerTracer tracer; + private boolean propagationEnabled; + + TracingSendMessageHookImpl(RocketMqProducerTracer tracer, boolean propagationEnabled) { + this.tracer = tracer; + this.propagationEnabled = propagationEnabled; + } + + @Override + public String hookName() { + return "OpenTelemetrySendMessageTraceHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + if (context == null) { + return; + } + Context traceContext = + tracer.startProducerSpan(Context.current(), context.getBrokerAddr(), context.getMessage()); + if (propagationEnabled) { + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(traceContext, context.getMessage().getProperties(), SETTER); + } + ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent()); + context.setMqTraceContext(contextAndScope); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) { + return; + } + if (context.getMqTraceContext() instanceof ContextAndScope) { + ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext(); + tracer.afterProduce(contextAndScope.getContext(), context.getSendResult()); + contextAndScope.closeScope(); + tracer.end(contextAndScope.getContext()); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy new file mode 100644 index 000000000000..994c865fc161 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy @@ -0,0 +1,35 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq + +import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest +import io.opentelemetry.instrumentation.api.config.Config +import io.opentelemetry.instrumentation.test.LibraryTestTrait +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer +import org.apache.rocketmq.client.producer.DefaultMQProducer + +class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait { + + @Override + void configureMQProducer(DefaultMQProducer producer) { + producer.getDefaultMQProducerImpl().registerSendMessageHook(RocketMqTracing.newBuilder(openTelemetry) + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rocketmq-client.experimental-span-attributes", true)) + .build().newTracingSendMessageHook()) + } + + @Override + void configureMQPushConsumer(DefaultMQPushConsumer consumer) { + consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(RocketMqTracing.newBuilder(openTelemetry) + .setCaptureExperimentalSpanAttributes( + Config.get() + .getBooleanProperty( + "otel.instrumentation.rocketmq-client.experimental-span-attributes", true)) + .build().newTracingConsumeMessageHook()) + } +} \ No newline at end of file diff --git a/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle b/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle new file mode 100644 index 000000000000..ec6d24d3ce2a --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle @@ -0,0 +1,11 @@ +apply from: "$rootDir/gradle/java.gradle" + +dependencies { + api project(':testing-common') + api group: 'org.apache.rocketmq', name: 'rocketmq-test', version: '4.8.0' + + implementation deps.guava + implementation deps.groovy + implementation deps.opentelemetryApi + implementation deps.spock +} diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy new file mode 100644 index 000000000000..e260c597e659 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -0,0 +1,220 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetery.instrumentation.rocketmq + +import base.BaseConf +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer +import org.apache.rocketmq.client.producer.DefaultMQProducer +import org.apache.rocketmq.client.producer.SendCallback +import org.apache.rocketmq.client.producer.SendResult +import org.apache.rocketmq.common.message.Message +import org.apache.rocketmq.remoting.common.RemotingHelper +import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener +import spock.lang.Shared +import spock.lang.Unroll +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace + +@Unroll +abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { + + @Shared + DefaultMQProducer producer + + @Shared + DefaultMQPushConsumer consumer + + @Shared + DefaultMQPushConsumer batchConsumer + + @Shared + def sharedTopic + + @Shared + Message msg + + @Shared + def msgs = new ArrayList() + + abstract void configureMQProducer(DefaultMQProducer producer) + + abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer) + + def setupSpec() { + producer = BaseConf.getProducer(BaseConf.nsAddr) + configureMQProducer(producer) + } + + def "test rocketmq produce callback"() { + setup: + sharedTopic = BaseConf.initTopic() + msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) + when: + producer.send(msg, new SendCallback() { + @Override + void onSuccess(SendResult sendResult) { + } + + @Override + void onException(Throwable throwable) { + } + }) + then: + assertTraces(1) { + trace(0, 1) { + span(0) { + name sharedTopic + " send" + kind PRODUCER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + "messaging.rocketmq.tags" "TagA" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.send_result" "SEND_OK" + } + } + } + } + } + + def "test rocketmq produce and consume"() { + setup: + sharedTopic = BaseConf.initTopic() + msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) + configureMQPushConsumer(consumer) + when: + runUnderTrace("parent") { + producer.send(msg) + } + then: + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "parent") + span(1) { + name sharedTopic + " send" + kind PRODUCER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + "messaging.rocketmq.tags" "TagA" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.send_result" "SEND_OK" + } + } + span(2) { + name sharedTopic + " process" + kind CONSUMER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + "messaging.rocketmq.tags" "TagA" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.queue_id" Long + "messaging.rocketmq.queue_offset" Long + } + } + } + } + } + + def "test rocketmq produce and batch consume"() { + setup: + sharedTopic = BaseConf.initTopic() + Message msg1 = new Message(sharedTopic, "TagA", ("hello world a").getBytes()) + Message msg2 = new Message(sharedTopic, "TagB", ("hello world b").getBytes()) + msgs.add(msg1) + msgs.add(msg2) + batchConsumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) + batchConsumer.setConsumeMessageBatchMaxSize(2) + configureMQPushConsumer(batchConsumer) + when: + runUnderTrace("parent") { + producer.send(msgs) + } + then: + assertTraces(2) { + def itemStepSpan = null + + trace(0, 2) { + itemStepSpan = span(1) + + basicSpan(it, 0, "parent") + span(1) { + name sharedTopic + " send" + kind PRODUCER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.send_result" "SEND_OK" + } + } + } + + trace(1, 3) { + span(0) { + name "multiple_sources receive" + kind CONSUMER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } + } + span(1) { + name sharedTopic + " process" + kind CONSUMER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + "messaging.rocketmq.tags" "TagA" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.queue_id" Long + "messaging.rocketmq.queue_offset" Long + } + childOf span(0) + hasLink itemStepSpan + } + span(2) { + name sharedTopic + " process" + kind CONSUMER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + "messaging.rocketmq.tags" "TagB" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.queue_id" Long + "messaging.rocketmq.queue_offset" Long + } + childOf span(0) + hasLink itemStepSpan + } + } + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java new file mode 100644 index 000000000000..c84669a19eb2 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java @@ -0,0 +1,70 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package base; + +import java.util.UUID; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.MQVersion; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.test.listener.AbstractListener; +import org.apache.rocketmq.test.util.MQRandomUtils; +import org.apache.rocketmq.test.util.RandomUtil; + +public final class BaseConf { + public static final String nsAddr; + public static final String broker1Addr; + protected static String broker1Name; + protected static final String clusterName; + protected static final NamesrvController namesrvController; + protected static final BrokerController brokerController1; + + static { + System.setProperty( + RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); + namesrvController = IntegrationTestBase.createAndStartNamesrv(); + nsAddr = "localhost:" + namesrvController.getNettyServerConfig().getListenPort(); + brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr); + clusterName = brokerController1.getBrokerConfig().getBrokerClusterName(); + broker1Name = brokerController1.getBrokerConfig().getBrokerName(); + broker1Addr = "localhost:" + brokerController1.getNettyServerConfig().getListenPort(); + } + + private BaseConf() {} + + public static String initTopic() { + String topic = MQRandomUtils.getRandomTopic(); + IntegrationTestBase.initTopic(topic, nsAddr, clusterName); + return topic; + } + + public static DefaultMQPushConsumer getConsumer( + String nsAddr, String topic, String subExpression, AbstractListener listener) + throws MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup"); + consumer.setInstanceName(RandomUtil.getStringByUUID()); + consumer.setNamesrvAddr(nsAddr); + consumer.subscribe(topic, subExpression); + consumer.setMessageListener(listener); + consumer.start(); + return consumer; + } + + public static DefaultMQProducer getProducer(String ns) throws MQClientException { + DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); + producer.setInstanceName(UUID.randomUUID().toString()); + producer.setNamesrvAddr(ns); + producer.start(); + return producer; + } + + private static void deleteTempDir() { + IntegrationTestBase.deleteTempDir(); + } +} diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java new file mode 100644 index 000000000000..f58e58125789 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java @@ -0,0 +1,138 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package base; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.namesrv.NamesrvConfig; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.namesrv.NamesrvController; +import org.apache.rocketmq.remoting.netty.NettyClientConfig; +import org.apache.rocketmq.remoting.netty.NettyServerConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.test.util.MQAdmin; +import org.junit.Assert; + +public class IntegrationTestBase { + public static final InternalLogger logger = + InternalLoggerFactory.getLogger(IntegrationTestBase.class); + + protected static final String BROKER_NAME_PREFIX = "TestBrokerName_"; + protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0); + protected static final List TMPE_FILES = new ArrayList<>(); + protected static final List BROKER_CONTROLLERS = new ArrayList<>(); + protected static final List NAMESRV_CONTROLLERS = new ArrayList<>(); + protected static final int COMMIT_LOG_SIZE = 1024 * 1024 * 100; + protected static final int INDEX_NUM = 1000; + private static final AtomicInteger port = new AtomicInteger(40000); + + public static synchronized int nextPort() { + return port.addAndGet(random.nextInt(10) + 10); + } + + protected static final Random random = new Random(); + + private static String createTempDir() { + String path = null; + try { + File file = Files.createTempDirectory("opentelemetry-rocketmq-client-temp").toFile(); + TMPE_FILES.add(file); + path = file.getCanonicalPath(); + } catch (IOException e) { + e.printStackTrace(); + } + return path; + } + + public static void deleteTempDir() { + for (File file : TMPE_FILES) { + boolean deleted = file.delete(); + if (!deleted) { + file.deleteOnExit(); + } + } + } + + public static NamesrvController createAndStartNamesrv() { + String baseDir = createTempDir(); + Path kvConfigPath = Paths.get(baseDir, "namesrv", "kvConfig.json"); + Path namesrvPath = Paths.get(baseDir, "namesrv", "namesrv.properties"); + + NamesrvConfig namesrvConfig = new NamesrvConfig(); + NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); + + namesrvConfig.setKvConfigPath(kvConfigPath.toString()); + namesrvConfig.setConfigStorePath(namesrvPath.toString()); + + nameServerNettyServerConfig.setListenPort(nextPort()); + NamesrvController namesrvController = + new NamesrvController(namesrvConfig, nameServerNettyServerConfig); + try { + Assert.assertTrue(namesrvController.initialize()); + logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort()); + namesrvController.start(); + } catch (Exception e) { + logger.info("Name Server start failed", e); + } + NAMESRV_CONTROLLERS.add(namesrvController); + return namesrvController; + } + + public static BrokerController createAndStartBroker(String nsAddr) { + String baseDir = createTempDir(); + Path commitLogPath = Paths.get(baseDir, "commitlog"); + + BrokerConfig brokerConfig = new BrokerConfig(); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); + brokerConfig.setBrokerIP1("127.0.0.1"); + brokerConfig.setNamesrvAddr(nsAddr); + brokerConfig.setEnablePropertyFilter(true); + storeConfig.setStorePathRootDir(baseDir); + storeConfig.setStorePathCommitLog(commitLogPath.toString()); + storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE); + storeConfig.setMaxIndexNum(INDEX_NUM); + storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); + return createAndStartBroker(storeConfig, brokerConfig); + } + + public static BrokerController createAndStartBroker( + MessageStoreConfig storeConfig, BrokerConfig brokerConfig) { + NettyServerConfig nettyServerConfig = new NettyServerConfig(); + NettyClientConfig nettyClientConfig = new NettyClientConfig(); + nettyServerConfig.setListenPort(nextPort()); + storeConfig.setHaListenPort(nextPort()); + BrokerController brokerController = + new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig); + try { + Assert.assertTrue(brokerController.initialize()); + logger.info( + "Broker Start name:{} addr:{}", + brokerConfig.getBrokerName(), + brokerController.getBrokerAddr()); + brokerController.start(); + } catch (Throwable t) { + logger.error("Broker start failed, will exit", t); + System.exit(1); + } + BROKER_CONTROLLERS.add(brokerController); + return brokerController; + } + + public static void initTopic(String topic, String nsAddr, String clusterName) { + MQAdmin.createTopic(nsAddr, clusterName, topic, 20); + } +} diff --git a/settings.gradle b/settings.gradle index a4d9471c1ba3..a07470b8e91d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -251,6 +251,9 @@ include ':instrumentation:undertow:javaagent' include ':instrumentation:vertx-web-3.0' include ':instrumentation:vertx-reactive-3.5:javaagent' include ':instrumentation:wicket-8.0:javaagent' +include ':instrumentation:rocketmq-client-4.8:javaagent' +include ':instrumentation:rocketmq-client-4.8:library' +include ':instrumentation:rocketmq-client-4.8:testing' include ':instrumentation-core:servlet-2.2'