Skip to content

Commit

Permalink
optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
lmolkova committed Jan 4, 2024
1 parent 40c61e1 commit 4de4bff
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 19 deletions.
2 changes: 1 addition & 1 deletion sdk/template/azure-template-stress/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ WORKDIR /app
COPY --from=builder /stress/sdk/template/azure-template-stress/target .

# Configure monitoring
ARG APPLICATION_INSIGHTS_AGENT_VERSION=3.4.18
ARG APPLICATION_INSIGHTS_AGENT_VERSION=3.4.19
ARG AGENT_URL=https://github.com/microsoft/ApplicationInsights-Java/releases/download/${APPLICATION_INSIGHTS_AGENT_VERSION}/applicationinsights-agent-${APPLICATION_INSIGHTS_AGENT_VERSION}.jar
ADD ${AGENT_URL} ./applicationinsights-agent.jar

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
import com.azure.core.http.policy.UserAgentPolicy;
import com.azure.core.util.Context;
import com.azure.core.util.HttpClientOptions;
import com.azure.core.util.logging.ClientLogger;
import com.azure.sdk.template.stress.util.TelemetryHelper;
import reactor.core.publisher.Mono;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;

/**
Expand All @@ -35,22 +38,28 @@
public class HttpGet extends ScenarioBase<StressOptions> {
// there will be multiple instances of the scenario, so the telemetry helper and logger are static and shared
private final static TelemetryHelper TELEMETRY_HELPER = new TelemetryHelper(HttpGet.class);
private final static ClientLogger LOGGER = new ClientLogger(HttpGet.class);
private final HttpPipeline pipeline;

private final URL url;
public HttpGet(StressOptions options) {
super(options, TELEMETRY_HELPER);
pipeline = getPipelineBuilder().build();
try {
url = new URL(options.getServiceEndpoint());
} catch (MalformedURLException ex) {
throw LOGGER.logThrowableAsError(new IllegalArgumentException("'url' must be a valid URL.", ex));
}
}

@Override
public void run() {
TELEMETRY_HELPER.instrumentRun(ctx -> runInternal(ctx));
TELEMETRY_HELPER.instrumentRun(this::runInternal);
}

private void runInternal(Context context) {
HttpRequest request = new HttpRequest(HttpMethod.GET, options.getServiceEndpoint());
private void runInternal() {
HttpRequest request = new HttpRequest(HttpMethod.GET, url);
// no need to handle exceptions here, they will be handled (and recorded) by the telemetry helper
HttpResponse response = pipeline.sendSync(request, context);
HttpResponse response = pipeline.sendSync(request, Context.NONE);
response.buffer().close();
response.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class TelemetryHelper {
private final String scenarioName;
private final Meter meter;
private final DoubleHistogram runDuration;
private final Attributes commonAttributes;
private final Attributes canceledAttributes;

static {
// enables Micrometer metrics from Reactor schedulers, which allows monitoring of thread pool usage and starvation
Expand All @@ -46,14 +48,16 @@ public TelemetryHelper(Class<?> scenarioClass) {
this.runDuration = meter.histogramBuilder("test.run.duration")
.setUnit("s")
.build();
this.commonAttributes = Attributes.of(SCENARIO_NAME_ATTRIBUTE, scenarioName);
this.canceledAttributes = Attributes.of(SCENARIO_NAME_ATTRIBUTE, scenarioName, ERROR_TYPE_ATTRIBUTE, "cancelled");
}

@SuppressWarnings("try")
public void instrumentRun(Consumer<Context> oneRun) {
public void instrumentRun(Runnable oneRun) {
Instant start = Instant.now();
Span span = tracer.spanBuilder("run").startSpan();
try (Scope s = span.makeCurrent()) {
oneRun.accept(new Context(com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY, io.opentelemetry.context.Context.current()));
oneRun.run();
trackSuccess(start, span);
} catch (Throwable e) {
if (e.getMessage().contains("Timeout on blocking read") || e instanceof InterruptedException || e instanceof TimeoutException) {
Expand Down Expand Up @@ -82,18 +86,16 @@ private void trackSuccess(Instant start, Span span) {
logger.atInfo()
.log("run ended");

Attributes attributes = Attributes.of(SCENARIO_NAME_ATTRIBUTE, scenarioName);
runDuration.record((Instant.now().toEpochMilli() - start.toEpochMilli())/1000d, attributes, io.opentelemetry.context.Context.current().with(span));
runDuration.record((Instant.now().toEpochMilli() - start.toEpochMilli())/1000d, commonAttributes);
span.end();
}

private void trackCancellation(Instant start, Span span) {
logger.atWarning()
.addKeyValue("error.type", "cancelled")
.log("run ended");
Attributes attributes = Attributes.of(SCENARIO_NAME_ATTRIBUTE, scenarioName, ERROR_TYPE_ATTRIBUTE, "cancelled");

runDuration.record((Instant.now().toEpochMilli() - start.toEpochMilli())/1000d, attributes, io.opentelemetry.context.Context.current().with(span));
runDuration.record((Instant.now().toEpochMilli() - start.toEpochMilli())/1000d, canceledAttributes);
span.setAttribute(ERROR_TYPE_ATTRIBUTE, "cancelled");
span.setStatus(StatusCode.ERROR);
span.end();
Expand All @@ -111,7 +113,7 @@ private void trackFailure(Instant start, Throwable e, Span span) {
.log("run ended", unwrapped);

Attributes attributes = Attributes.of(SCENARIO_NAME_ATTRIBUTE, scenarioName, ERROR_TYPE_ATTRIBUTE, errorType);
runDuration.record((Instant.now().toEpochMilli() - start.toEpochMilli())/1000d, attributes, io.opentelemetry.context.Context.current().with(span));
runDuration.record((Instant.now().toEpochMilli() - start.toEpochMilli())/1000d, attributes);
span.end();
}

Expand All @@ -130,7 +132,7 @@ public void recordStart(StressOptions options) {
}

Span before = startSampledInSpan("before run");
before.setAttribute(AttributeKey.longKey("durationMin"), options.getDuration());
before.setAttribute(AttributeKey.longKey("durationSec"), options.getDuration());
before.setAttribute(AttributeKey.stringKey("scenarioName"), scenarioName);
before.setAttribute(AttributeKey.longKey("concurrency"), options.getParallel());
before.setAttribute(AttributeKey.stringKey("libraryPackageVersion"), libraryPackageVersion);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"sampling": {
"percentage": 1
"percentage": 0.01
},
"instrumentation": {
"azureSdk": {
"enabled": true
"enabled": false
},
"reactor": {
"enabled": false
},
"logging": {
"level": "INFO"
"level": "WARN"
}
},
"preview": {
Expand All @@ -30,4 +30,4 @@
]
}
}
}
}
2 changes: 2 additions & 0 deletions sdk/template/azure-template-stress/templates/job.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ spec:
export APPLICATIONINSIGHTS_ROLE_NAME={{ .Release.Name }}-{{ .Stress.BaseName }} &&
java -javaagent:applicationinsights-agent.jar \
-Dreactor.schedulers.defaultBoundedElasticSize={{ max 20 .Stress.concurrency }} \
-Dotel.instrumentation.okhttp.enabled=false \
-Dotel.instrumentation.http-url-connection.enabled=false \
-jar /app/azure-template-stress-1.0.0-beta.1-jar-with-dependencies.jar \
{{ .Stress.testScenario }} \
--parallel {{ .Stress.concurrency }} \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"type": 3,
"content": {
"version": "KqlItem/1.0",
"query": "let runId = \"{runId}\";\r\nlet roleName = strcat(\"java-template-\", runId);\r\nlet metrics = customMetrics\r\n| where timestamp >= {timeRange:start} and timestamp <= {timeRange:end}\r\n| where cloud_RoleName == roleName;\r\nlet testSpans = dependencies\r\n| where timestamp >= {timeRange:start} and timestamp <= {timeRange:end}\r\n| where cloud_RoleName == roleName;\r\nlet errors = metrics\r\n| where name == \"test.run.duration\"\r\n| extend errorType = tostring(customDimensions[\"error.type\"])\r\n| summarize error_by_type=sum(valueCount) by errorType\r\n| summarize test_errors=make_bag(bag_pack(errorType, error_by_type))\r\n| evaluate narrow();\r\nlet runs = metrics \r\n| where name == \"test.run.duration\" \r\n| summarize successful_runs=sumif(valueCount, customDimensions[\"error.type\"] == \"\"), total_runs=sum(valueCount)\r\n| evaluate narrow();\r\nlet parameters = testSpans \r\n| where name == \"before run\"\r\n| project params_pod=customDimensions[\"hostname\"], params_scenarioName=customDimensions[\"scenarioName\"], params_durationMin=customDimensions[\"durationMin\"], params_concurrency=customDimensions[\"concurrency\"], params_sync=customDimensions[\"sync\"], params_httpClient=customDimensions[\"httpClientProvider\"]\r\n| evaluate narrow();\r\nparameters \r\n| union runs, errors\r\n| project Property = Column, Value\r\n",
"query": "let runId = \"{runId}\";\r\nlet roleName = strcat(\"java-template-\", runId);\r\nlet metrics = customMetrics\r\n| where timestamp >= {timeRange:start} and timestamp <= {timeRange:end}\r\n| where cloud_RoleName == roleName;\r\nlet testSpans = dependencies\r\n| where timestamp >= {timeRange:start} and timestamp <= {timeRange:end}\r\n| where cloud_RoleName == roleName;\r\nlet errors = metrics\r\n| where name == \"test.run.duration\"\r\n| extend errorType = tostring(customDimensions[\"error.type\"])\r\n| summarize error_by_type=sum(valueCount) by errorType\r\n| summarize test_errors=make_bag(bag_pack(errorType, error_by_type))\r\n| evaluate narrow();\r\nlet runs = metrics \r\n| where name == \"test.run.duration\" \r\n| summarize successful_runs=sumif(valueCount, customDimensions[\"error.type\"] == \"\"), total_runs=sum(valueCount)\r\n| evaluate narrow();\r\nlet parameters = testSpans \r\n| where name == \"before run\"\r\n| project params_pod=customDimensions[\"hostname\"], params_scenarioName=customDimensions[\"scenarioName\"], params_durationSec=customDimensions[\"durationSec\"], params_concurrency=customDimensions[\"concurrency\"], params_sync=customDimensions[\"sync\"], params_httpClient=customDimensions[\"httpClientProvider\"]\r\n| evaluate narrow();\r\nparameters \r\n| union runs, errors\r\n| project Property = Column, Value\r\n",
"size": 0,
"showAnalytics": true,
"title": "Test summary",
Expand Down

0 comments on commit 4de4bff

Please sign in to comment.