Skip to content

Commit

Permalink
Tracing: end HTTP spans when body is fully consumed, add missing meth…
Browse files Browse the repository at this point in the history
…ods for OpenAI instrumentation (Azure#39381)

* Tracing fixes, add isRecording, add setAttribute(String, Object, stream reading fix
  • Loading branch information
lmolkova authored Mar 27, 2024
1 parent 15f7c58 commit b8a9d8c
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 60 deletions.
2 changes: 2 additions & 0 deletions sdk/core/azure-core-tracing-opentelemetry/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fixed unreliable HTTP span reporting when response is not closed.

### Other Changes

## 1.0.0-beta.44 (2024-03-01)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Context start(String spanName, StartSpanOptions options, Context context)
return startSuppressedSpan(context);
}
context = unsuppress(context);
if (spanKind == SpanKind.INTERNAL && !context.getData(CLIENT_METHOD_CALL_FLAG).isPresent()) {
if (isInternalOrClientSpan(spanKind) && !context.getData(CLIENT_METHOD_CALL_FLAG).isPresent()) {
context = context.addData(CLIENT_METHOD_CALL_FLAG, true);
}

Expand Down Expand Up @@ -184,6 +184,9 @@ private SpanBuilder createSpanBuilder(String spanName, StartSpanOptions options,
return spanBuilder;
}

/**
* {@inheritDoc}
*/
@Override
public void injectContext(BiConsumer<String, String> headerSetter, Context context) {
io.opentelemetry.context.Context otelContext = getTraceContextOrDefault(context, null);
Expand All @@ -192,6 +195,9 @@ public void injectContext(BiConsumer<String, String> headerSetter, Context conte
}
}

/**
* {@inheritDoc}
*/
@Override
public void setAttribute(String key, long value, Context context) {
Objects.requireNonNull(context, "'context' cannot be null");
Expand Down Expand Up @@ -235,6 +241,28 @@ public void setAttribute(String key, String value, Context context) {
}
}

/**
* {@inheritDoc}
*/
@Override
public void setAttribute(String key, Object value, Context context) {
Objects.requireNonNull(value, "'value' cannot be null");
Objects.requireNonNull(context, "'context' cannot be null");

if (!isEnabled) {
return;
}

final Span span = getSpanOrNull(context);
if (span == null) {
return;
}

if (span.isRecording()) {
OpenTelemetryUtils.addAttribute(span, key, value);
}
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -267,6 +295,24 @@ public Context extractContext(Function<String, String> headerGetter) {
return new Context(SPAN_CONTEXT_KEY, Span.fromContext(traceContext).getSpanContext());
}

/**
* {@inheritDoc}
*/
@Override
public boolean isRecording(Context context) {
Objects.requireNonNull(context, "'context' cannot be null");
if (!isEnabled) {
return false;
}

Span span = getSpanOrNull(context);
if (span != null) {
return span.isRecording();
}

return false;
}

private static class Getter implements TextMapGetter<Function<String, String>> {

public static final TextMapGetter<Function<String, String>> INSTANCE = new Getter();
Expand Down Expand Up @@ -420,4 +466,8 @@ private static TracerProvider getTracerProvider(TracingOptions options) {

return GlobalOpenTelemetry.getTracerProvider();
}

private static boolean isInternalOrClientSpan(SpanKind kind) {
return kind == SpanKind.INTERNAL || kind == SpanKind.CLIENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void openTelemetryHttpPolicyTest() {
request.setHeader(HttpHeaderName.USER_AGENT, "user-agent");

try (Scope scope = parentSpan.makeCurrent()) {
createHttpPipeline(azTracer).send(request, tracingContext).block().close();
createHttpPipeline(azTracer).send(request, tracingContext).block().getBodyAsBinaryData();
}
// Assert
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
Expand Down Expand Up @@ -181,8 +181,9 @@ public String getDescription() {
public void clientRequestIdIsStamped() {
try (Scope scope = tracer.spanBuilder("test").startSpan().makeCurrent()) {
HttpRequest request = new HttpRequest(HttpMethod.PUT, "https://httpbin.org/hello?there#otel");
HttpResponse response = createHttpPipeline(azTracer, new RequestIdPolicy()).send(request).block();
response.close();
HttpResponse response = createHttpPipeline(azTracer, new RequestIdPolicy()).send(request)
.flatMap(r -> r.getBodyAsByteArray().thenReturn(r))
.block();

// Assert
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
Expand Down Expand Up @@ -293,8 +294,8 @@ public void endStatusDependingOnStatusCode(int statusCode, StatusCode status) {

StepVerifier.create(pipeline.send(new HttpRequest(HttpMethod.GET, "http://localhost/hello")))
.assertNext(response -> {
response.getBodyAsByteArray().block();
assertEquals(statusCode, response.getStatusCode());
response.close();
})
.verifyComplete();

Expand Down Expand Up @@ -366,12 +367,10 @@ public void timeoutIsTraced() {
= new Context(PARENT_TRACE_CONTEXT_KEY, io.opentelemetry.context.Context.root().with(parentSpan))
.addData("az.namespace", "foo");

StepVerifier.create(pipeline.send(new HttpRequest(HttpMethod.GET, "http://localhost/hello"), tracingContext))
.assertNext(response -> {
StepVerifier.create(pipeline.send(new HttpRequest(HttpMethod.GET, "http://localhost/hello"), tracingContext)
.flatMap(r -> r.getBody().collectList().thenReturn(r))).assertNext(response -> {
assertEquals(200, response.getStatusCode());
response.close();
})
.verifyComplete();
}).verifyComplete();

List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
assertEquals(2, exportedSpans.size());
Expand All @@ -398,11 +397,8 @@ public void connectionErrorAfterResponseCodeIsTraced() {
.tracer(azTracer)
.build();

StepVerifier
.create(pipeline.send(new HttpRequest(HttpMethod.GET, "http://localhost/hello"), Context.NONE)
.flatMap(response -> response.getBodyAsInputStream().doFinally(i -> response.close())))
.expectError(IOException.class)
.verify();
StepVerifier.create(pipeline.send(new HttpRequest(HttpMethod.GET, "http://localhost/hello"), Context.NONE)
.flatMap(response -> response.getBodyAsInputStream())).expectError(IOException.class).verify();

List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
assertEquals(1, exportedSpans.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.Exceptions;

import java.io.IOException;
Expand Down Expand Up @@ -285,6 +286,28 @@ public void startWithRemoteParent() {
assertEquals(SpanKind.CONSUMER, spanData.getKind());
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testIsRecording(boolean isRecording) {
// Arrange
SpanContext remoteParent
= SpanContext.create(IdGenerator.random().generateTraceId(), IdGenerator.random().generateSpanId(),
isRecording ? TraceFlags.getSampled() : TraceFlags.getDefault(), TraceState.getDefault());
StartSpanOptions options = new StartSpanOptions(com.azure.core.util.tracing.SpanKind.CLIENT)
.setRemoteParent(new Context(SPAN_CONTEXT_KEY, remoteParent));

// Act
final Context span = openTelemetryTracer.start(METHOD_NAME, options, Context.NONE);

// Assert
assertEquals(isRecording, openTelemetryTracer.isRecording(span));
}

@Test
public void testIsRecordingNoSpan() {
assertFalse(openTelemetryTracer.isRecording(Context.NONE));
}

@Test
@SuppressWarnings("deprecation")
public void startSpanProcessKindSend() {
Expand Down Expand Up @@ -462,7 +485,6 @@ public void startConsumeSpanWitStartTimeInContext() {
@Test
@SuppressWarnings("deprecation")
public void startSpanOverloadNullPointerException() {

// Assert
assertThrows(NullPointerException.class, () -> openTelemetryTracer.start("", Context.NONE, null));
}
Expand All @@ -476,23 +498,19 @@ public void startSpanInvalid() {
new StartSpanOptions(com.azure.core.util.tracing.SpanKind.CONSUMER), Context.NONE));
assertThrows(NullPointerException.class, () -> openTelemetryTracer.start("span",
new StartSpanOptions(com.azure.core.util.tracing.SpanKind.CONSUMER), null));

}

@Test
@SuppressWarnings("deprecation")
public void addLinkTest() {
// Arrange
StartSpanOptions spanBuilder = new StartSpanOptions(com.azure.core.util.tracing.SpanKind.INTERNAL);
Span toLinkSpan = tracer.spanBuilder("new test span").startSpan();

Context linkContext = new Context(SPAN_CONTEXT_KEY, toLinkSpan.getSpanContext());
LinkData expectedLink = LinkData.create(toLinkSpan.getSpanContext());

StartSpanOptions startOptions
= new StartSpanOptions(com.azure.core.util.tracing.SpanKind.INTERNAL).addLink(new TracingLink(linkContext));
// Act
openTelemetryTracer.addLink(linkContext.addData(SPAN_BUILDER_KEY, spanBuilder));

Context span = openTelemetryTracer.start(METHOD_NAME, spanBuilder, Context.NONE);
Context span = openTelemetryTracer.start(METHOD_NAME, startOptions, Context.NONE);
ReadableSpan span1 = getSpan(span);

// Assert
Expand Down Expand Up @@ -886,6 +904,36 @@ public void startSpanWithAttributes() {
verifySpanAttributes(expectedAttributes, span.toSpanData().getAttributes());
}

@Test
public void spanAttributes() {
Map<String, Object> attributes = new HashMap<>();
attributes.put("S", "foo");
attributes.put("I", 1);
attributes.put("L", 10L);
attributes.put("D", 0.1d);
attributes.put("B", true);
attributes.put("S[]", new String[] { "foo" });
attributes.put("L[]", new long[] { 10L });
attributes.put("D[]", new double[] { 0.1d });
attributes.put("B[]", new boolean[] { true });
attributes.put("I[]", new int[] { 1 });
attributes.put("Complex", Collections.singletonMap("key", "value"));

Attributes expectedAttributes = Attributes.builder()
.put("S", "foo")
.put("L", 10L)
.put("I", 1)
.put("D", 0.1d)
.put("B", true)
.put("az.namespace", AZ_NAMESPACE_VALUE)
.build();

Context spanCtx = openTelemetryTracer.start(METHOD_NAME, tracingContext);
attributes.forEach((key, value) -> openTelemetryTracer.setAttribute(key, value, spanCtx));
final ReadableSpan span = getSpan(spanCtx);
verifySpanAttributes(expectedAttributes, span.toSpanData().getAttributes());
}

@Test
public void suppressNestedClientSpan() {
Context outer = openTelemetryTracer.start("outer", Context.NONE);
Expand All @@ -909,9 +957,9 @@ public void suppressNestedClientSpan() {
}

@Test
@SuppressWarnings("deprecation")
public void suppressNestedInterleavedClientSpan() {
Context outer = openTelemetryTracer.start("outer", Context.NONE, com.azure.core.util.tracing.ProcessKind.SEND);
Context outer = openTelemetryTracer.start("outer",
new StartSpanOptions(com.azure.core.util.tracing.SpanKind.CONSUMER), Context.NONE);

Context inner1NotSuppressed = openTelemetryTracer.start("innerSuppressed", outer);
Context inner2Suppressed = openTelemetryTracer.start("innerSuppressed", inner1NotSuppressed);
Expand Down Expand Up @@ -940,7 +988,7 @@ public void suppressNestedInterleavedClientSpanWithOptions() {

Context inner1Suppressed = openTelemetryTracer.start("innerSuppressed", outer);
Context inner1NotSuppressed = openTelemetryTracer.start("innerNotSuppressed",
new StartSpanOptions(com.azure.core.util.tracing.SpanKind.CLIENT), inner1Suppressed);
new StartSpanOptions(com.azure.core.util.tracing.SpanKind.PRODUCER), inner1Suppressed);
Context inner2Suppressed = openTelemetryTracer.start("innerSuppressed", inner1NotSuppressed);

openTelemetryTracer.end("ok", null, inner2Suppressed);
Expand Down Expand Up @@ -1009,7 +1057,7 @@ public static Stream<Arguments> spanKinds() {
Arguments.of(com.azure.core.util.tracing.SpanKind.CLIENT, com.azure.core.util.tracing.SpanKind.CLIENT,
false),
Arguments.of(com.azure.core.util.tracing.SpanKind.CLIENT, com.azure.core.util.tracing.SpanKind.INTERNAL,
false),
true),
Arguments.of(com.azure.core.util.tracing.SpanKind.CLIENT, com.azure.core.util.tracing.SpanKind.PRODUCER,
false),
Arguments.of(com.azure.core.util.tracing.SpanKind.CLIENT, com.azure.core.util.tracing.SpanKind.CONSUMER,
Expand Down
2 changes: 2 additions & 0 deletions sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Added new methods on `com.azure.core.util.tracing.Tracer` - `isRecording` and `addAttribute(String, Object, Context)`.

### Breaking Changes

### Bugs Fixed
Expand Down
Loading

0 comments on commit b8a9d8c

Please sign in to comment.