Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture the SNS topic ARN under the 'messaging.destination.name' span attribute. #10096

Merged
merged 10 commits into from
Jan 3, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
}
Expand Down Expand Up @@ -465,6 +466,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
}
Expand All @@ -164,6 +165,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topicArn
}
}
span(1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,18 @@ final class AwsSdkInstrumenterFactory {
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();
private static final SnsAttributesExtractor snsAttributesExtractor = new SnsAttributesExtractor();

private static final List<AttributesExtractor<Request<?>, Response<?>>>
defaultAttributesExtractors = Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor);
defaultAttributesExtractors =
Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor, snsAttributesExtractor);
private static final List<AttributesExtractor<Request<?>, Response<?>>>
extendedAttributesExtractors =
Arrays.asList(
httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor);
httpAttributesExtractor,
rpcAttributesExtractor,
snsAttributesExtractor,
experimentalAttributesExtractor);
private static final AwsSdkSpanNameExtractor spanName = new AwsSdkSpanNameExtractor();

private final OpenTelemetry openTelemetry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ static String getTableName(Object request) {
return invokeOrNull(access.getTableName, request);
}

@Nullable
static String getTopicArn(Object request) {
RequestAccess access = REQUEST_ACCESSORS.get(request.getClass());
return invokeOrNull(access.getTopicArn, request);
}

@Nullable
static String getTargetArn(Object request) {
RequestAccess access = REQUEST_ACCESSORS.get(request.getClass());
return invokeOrNull(access.getTargetArn, request);
}

@Nullable
private static String invokeOrNull(@Nullable MethodHandle method, Object obj) {
if (method == null) {
Expand All @@ -67,13 +79,17 @@ private static String invokeOrNull(@Nullable MethodHandle method, Object obj) {
@Nullable private final MethodHandle getQueueName;
@Nullable private final MethodHandle getStreamName;
@Nullable private final MethodHandle getTableName;
@Nullable private final MethodHandle getTopicArn;
@Nullable private final MethodHandle getTargetArn;

private RequestAccess(Class<?> clz) {
getBucketName = findAccessorOrNull(clz, "getBucketName");
getQueueUrl = findAccessorOrNull(clz, "getQueueUrl");
getQueueName = findAccessorOrNull(clz, "getQueueName");
getStreamName = findAccessorOrNull(clz, "getStreamName");
getTableName = findAccessorOrNull(clz, "getTableName");
getTopicArn = findAccessorOrNull(clz, "getTopicArn");
getTargetArn = findAccessorOrNull(clz, "getTargetArn");
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil;
import io.opentelemetry.semconv.SemanticAttributes;
import javax.annotation.Nullable;

public class SnsAttributesExtractor implements AttributesExtractor<Request<?>, Response<?>> {
@Override
public void onStart(AttributesBuilder attributes, Context parentContext, Request<?> request) {
String destination = findMessageDestination(request.getOriginalRequest());
AttributesExtractorUtil.internalSet(
attributes, SemanticAttributes.MESSAGING_DESTINATION_NAME, destination);
}

/*
* Attempt to discover the destination of the SNS message by first checking for a topic ARN and
* falling back to the target ARN. If neither is found null is returned.
*/
private static String findMessageDestination(AmazonWebServiceRequest request) {
String destination = RequestAccess.getTopicArn(request);
if (destination != null) {
return destination;
}
return RequestAccess.getTargetArn(request);
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
Request<?> request,
@Nullable Response<?> response,
@Nullable Throwable error) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.amazonaws.services.rds.AmazonRDSClientBuilder
import com.amazonaws.services.rds.model.DeleteOptionGroupRequest
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.services.sns.model.PublishRequest
import io.opentelemetry.api.trace.Span
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
Expand Down Expand Up @@ -154,6 +156,26 @@ abstract class AbstractAws1ClientTest extends InstrumentationSpecification {
</ResponseMetadata>
</DeleteOptionGroupResponse>
"""
"SNS" | "Publish" | "POST" | "d74b8436-ae13-5ab4-a9ff-ce54dfea72a0" | AmazonSNSClientBuilder.standard() | { c -> c.publish(new PublishRequest().withMessage("somemessage").withTopicArn("somearn")) } | ["$SemanticAttributes.MESSAGING_DESTINATION_NAME": "somearn"] | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>567910cd-659e-55d4-8ccb-5aaf14679dc0</MessageId>
</PublishResult>
<ResponseMetadata>
<RequestId>d74b8436-ae13-5ab4-a9ff-ce54dfea72a0</RequestId>
</ResponseMetadata>
</PublishResponse>
"""
"SNS" | "Publish" | "POST" | "d74b8436-ae13-5ab4-a9ff-ce54dfea72a0" | AmazonSNSClientBuilder.standard() | { c -> c.publish(new PublishRequest().withMessage("somemessage").withTargetArn("somearn")) } | ["$SemanticAttributes.MESSAGING_DESTINATION_NAME": "somearn"] | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>567910cd-659e-55d4-8ccb-5aaf14679dc0</MessageId>
</PublishResult>
<ResponseMetadata>
<RequestId>d74b8436-ae13-5ab4-a9ff-ce54dfea72a0</RequestId>
</ResponseMetadata>
</PublishResponse>
"""
}

def "send #operation request to closed port"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.DYNAMODB;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.KINESIS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.S3;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.SNS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.SQS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.response;
Expand All @@ -30,6 +31,7 @@ enum AwsSdkRequest {
// generic requests
DynamoDbRequest(DYNAMODB, "DynamoDbRequest"),
S3Request(S3, "S3Request"),
SnsRequest(SNS, "SnsRequest"),
SqsRequest(SQS, "SqsRequest"),
KinesisRequest(KINESIS, "KinesisRequest"),
// specific requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;

import io.opentelemetry.semconv.SemanticAttributes;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -15,7 +16,13 @@ enum AwsSdkRequestType {
S3(request("aws.bucket.name", "Bucket")),
SQS(request("aws.queue.url", "QueueUrl"), request("aws.queue.name", "QueueName")),
KINESIS(request("aws.stream.name", "StreamName")),
DYNAMODB(request("aws.table.name", "TableName"));
DYNAMODB(request("aws.table.name", "TableName")),
SNS(
/*
* Only one of TopicArn and TargetArn are permitted on an SNS request.
*/
request(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), "TargetArn"),
laurit marked this conversation as resolved.
Show resolved Hide resolved
request(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), "TopicArn"));

// Wrapping in unmodifiableMap
@SuppressWarnings("ImmutableEnumChecker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.CreateBucketRequest
import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.PublishRequest
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
Expand Down Expand Up @@ -84,10 +86,10 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
setup:
configureSdkClient(builder)
def client = builder
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()

if (body instanceof Closure) {
server.enqueue(body.call())
Expand Down Expand Up @@ -142,6 +144,8 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
} else if (service == "Kinesis") {
"aws.stream.name" "somestream"
} else if (service == "Sns") {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somearn"
}
}
}
Expand All @@ -156,6 +160,26 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
"S3" | "CreateBucket" | "PUT" | "UNKNOWN" | s3ClientBuilder() | { c -> c.createBucket(CreateBucketRequest.builder().bucket("somebucket").build()) } | ""
"S3" | "GetObject" | "GET" | "UNKNOWN" | s3ClientBuilder() | { c -> c.getObject(GetObjectRequest.builder().bucket("somebucket").key("somekey").build()) } | ""
"Kinesis" | "DeleteStream" | "POST" | "UNKNOWN" | KinesisClient.builder() | { c -> c.deleteStream(DeleteStreamRequest.builder().streamName("somestream").build()) } | ""
"Sns" | "Publish" | "POST" | "d74b8436-ae13-5ab4-a9ff-ce54dfea72a0" | SnsClient.builder() | { c -> c.publish(PublishRequest.builder().message("somemessage").topicArn("somearn").build()) } | """
laurit marked this conversation as resolved.
Show resolved Hide resolved
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>567910cd-659e-55d4-8ccb-5aaf14679dc0</MessageId>
</PublishResult>
<ResponseMetadata>
<RequestId>d74b8436-ae13-5ab4-a9ff-ce54dfea72a0</RequestId>
</ResponseMetadata>
</PublishResponse>
"""
"Sns" | "Publish" | "POST" | "d74b8436-ae13-5ab4-a9ff-ce54dfea72a0" | SnsClient.builder() | { c -> c.publish(PublishRequest.builder().message("somemessage").targetArn("somearn").build()) } | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>567910cd-659e-55d4-8ccb-5aaf14679dc0</MessageId>
</PublishResult>
<ResponseMetadata>
<RequestId>d74b8436-ae13-5ab4-a9ff-ce54dfea72a0</RequestId>
</ResponseMetadata>
</PublishResponse>
"""
"Sqs" | "CreateQueue" | "POST" | "7a62c49f-347e-4fc4-9331-6e8e7a96aa73" | SqsClient.builder() | { c -> c.createQueue(CreateQueueRequest.builder().queueName("somequeue").build()) } | {
if (!Boolean.getBoolean("testLatestDeps")) {
def content = """
Expand All @@ -172,9 +196,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody("").build()) } | {
Expand All @@ -199,9 +223,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Ec2" | "AllocateAddress" | "POST" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2Client.builder() | { c -> c.allocateAddress() } | """
Expand All @@ -223,10 +247,10 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
setup:
configureSdkClient(builder)
def client = builder
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()

if (body instanceof Closure) {
server.enqueue(body.call())
Expand Down Expand Up @@ -280,6 +304,8 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
} else if (service == "Kinesis") {
"aws.stream.name" "somestream"
} else if (service == "Sns") {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somearn"
}
}
}
Expand Down Expand Up @@ -322,9 +348,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsAsyncClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody("").build()) } | {
Expand All @@ -349,9 +375,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Ec2" | "AllocateAddress" | "POST" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2AsyncClient.builder() | { c -> c.allocateAddress() } | """
Expand All @@ -366,7 +392,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
<ResponseMetadata><RequestId>0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99</RequestId></ResponseMetadata>
</DeleteOptionGroupResponse>
"""
"Sns" | "Publish" | "POST" | "f187a3c1-376f-11df-8963-01868b7c937a" | SnsAsyncClient.builder() | { SnsAsyncClient c -> c.publish(r -> r.message("hello")) } | """
"Sns" | "Publish" | "POST" | "f187a3c1-376f-11df-8963-01868b7c937a" | SnsAsyncClient.builder() | { SnsAsyncClient c -> c.publish(r -> r.message("hello").topicArn("somearn")) } | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>94f20ce6-13c5-43a0-9a9e-ca52d816e90b</MessageId>
Expand All @@ -387,13 +413,13 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
server.enqueue(HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofMillis(5000)))
server.enqueue(HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofMillis(5000)))
def builder = S3Client.builder()
.overrideConfiguration(createOverrideConfigurationBuilder()
.retryPolicy(RetryPolicy.builder().numRetries(1).build())
.build())
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.httpClientBuilder(ApacheHttpClient.builder().socketTimeout(Duration.ofMillis(50)))
.overrideConfiguration(createOverrideConfigurationBuilder()
.retryPolicy(RetryPolicy.builder().numRetries(1).build())
.build())
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.httpClientBuilder(ApacheHttpClient.builder().socketTimeout(Duration.ofMillis(50)))

if (Boolean.getBoolean("testLatestDeps")) {
builder.forcePathStyle(true)
Expand Down
Loading