Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: origin detection with container ID field (dogstatsd 1.2) #188

Merged
merged 4 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ build

# rbenv for pimpmychangelog
.ruby-version

# vscode
.vscode/*
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
89 changes: 89 additions & 0 deletions src/main/java/com/timgroup/statsd/ContainerID.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.timgroup.statsd;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A reader class that retrieves the current container ID parsed from a the cgroup file.
*
*/
public class ContainerID {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
private static final Path CGROUP_PATH = Paths.get("/proc/self/cgroup");
private static final String UUID_SOURCE = "[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}";
private static final String CONTAINER_SOURCE = "[0-9a-f]{64}";
private static final String TASK_SOURCE = "[0-9a-f]{32}-\\d+";
private static final Pattern LINE_RE = Pattern.compile("(\\d+):([^:]*):(.+)$");
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
private static final Pattern CONTAINER_RE =
Pattern.compile(
"(?:.+)?(" + UUID_SOURCE + "|" + CONTAINER_SOURCE + "|" + TASK_SOURCE + ")(?:.scope)?$");
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved

private static final ContainerID INSTANCE;

public String containerID;

public String getContainerID() {
return containerID;
}

public void setContainerID(String containerID) {
this.containerID = containerID;
}

static {
INSTANCE = ContainerID.read(CGROUP_PATH);
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
}

public static ContainerID get() {
return INSTANCE;
}

static ContainerID read(Path path) {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
final String content;
try {
content = new String(Files.readAllBytes(path));
} catch (final IOException e) {
return new ContainerID();
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
}

if (content.isEmpty()) {
return new ContainerID();
}

return parse(content);
}

/**
* Parses a Cgroup file content and returns the corresponding container ID.
*
* @param cgroupsContent
* Cgroup file content
*/
public static ContainerID parse(final String cgroupsContent) {
final ContainerID containerID = new ContainerID();
final String[] lines = cgroupsContent.split("\n");
for (final String line : lines) {
final Matcher matcher = LINE_RE.matcher(line);

if (!matcher.matches()) {
continue;
}

final String path = matcher.group(3);
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
final String[] pathParts = path.split("/");

if (pathParts.length >= 1) {
final Matcher containerIDMatcher = CONTAINER_RE.matcher(pathParts[pathParts.length - 1]);
if (containerIDMatcher.matches()) {
containerID.setContainerID(containerIDMatcher.group(1));
return containerID;
}
}
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
}

return containerID;
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected Message(String aspect, Message.Type type, String[] tags) {
* @param builder
* StringBuilder the text representation will be written to.
*/
abstract void writeTo(StringBuilder builder);
abstract void writeTo(StringBuilder builder, String containerID);

/**
* Aggregate message.
Expand Down
87 changes: 80 additions & 7 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -54,6 +57,7 @@ public class NonBlockingStatsDClient implements StatsDClient {
public static final String DD_NAMED_PIPE_ENV_VAR = "DD_DOGSTATSD_PIPE_NAME";
public static final String DD_ENTITY_ID_ENV_VAR = "DD_ENTITY_ID";
private static final String ENTITY_ID_TAG_NAME = "dd.internal.entity_id" ;
public static final String ORIGIN_DETECTION_ENABLED_ENV_VAR = "DD_ORIGIN_DETECTION_ENABLED";

enum Literal {
SERVICE,
Expand Down Expand Up @@ -87,6 +91,7 @@ String tag() {
public static final boolean DEFAULT_ENABLE_TELEMETRY = true;

public static final boolean DEFAULT_ENABLE_AGGREGATION = true;
public static final boolean DEFAULT_ENABLE_ORIGIN_DETECTION = true;

public static final String CLIENT_TAG = "client:java";
public static final String CLIENT_VERSION_TAG = "client_version:";
Expand Down Expand Up @@ -214,6 +219,21 @@ protected static String format(ThreadLocal<NumberFormat> formatter, Number value
* Aggregation flush interval integer, in milliseconds. 0 disables aggregation.
* @param aggregationShards
* Aggregation flush interval integer, in milliseconds. 0 disables aggregation.
* @param containerID
* Allows passing the container ID, this will be used by the Agent to enrich
* metrics with container tags.
* This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0.
* When configured, the provided container ID is prioritized over the container ID discovered
* via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored.
* @param originDetectionEnabled
* Enable/disable the client origin detection.
* This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0.
* When enabled, the client tries to discover its container ID and sends it to the Agent
* to enrich the metrics with container tags.
* Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false
* The client tries to read the container ID by parsing the file /proc/self/cgroup.
* This is not supported on Windows.
* The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID.
* @throws StatsDClientException
* if the client could not be started
*/
Expand All @@ -222,7 +242,8 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
final Callable<SocketAddress> telemetryAddressLookup, final int timeout, final int bufferSize,
final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers,
final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval,
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory)
final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory,
String containerID, final boolean originDetectionEnabled)
throws StatsDClientException {

if ((prefix != null) && (!prefix.isEmpty())) {
Expand All @@ -245,7 +266,7 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
}
}
// Support "dd.internal.entity_id" internal tag.
updateTagsWithEntityID(costantPreTags, entityID);
final boolean hasEntityID = updateTagsWithEntityID(costantPreTags, entityID);
for (final Literal literal : Literal.values()) {
final String envVal = literal.envVal();
if (envVal != null && !envVal.trim().isEmpty()) {
Expand All @@ -259,6 +280,13 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
costantPreTags.toArray(new String[costantPreTags.size()]), null, new StringBuilder()).toString();
}
costantPreTags = null;
// Origin detection
if (hasEntityID) {
containerID = null;
} else {
boolean originEnabled = isOriginDetectionEnabled(containerID, originDetectionEnabled, hasEntityID);
containerID = getContainerID(containerID, originEnabled);
}
}

try {
Expand Down Expand Up @@ -308,6 +336,9 @@ private NonBlockingStatsDClient(final String prefix, final int queueSize, final
statsDProcessor.setTelemetry(this.telemetry);
statsDSender.setTelemetry(this.telemetry);

// set container ID
statsDProcessor.setContainerID(containerID);
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved

} catch (final Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
Expand Down Expand Up @@ -341,7 +372,8 @@ public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) thr
builder.bufferPoolSize, builder.processorWorkers, builder.senderWorkers,
builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval,
(builder.enableAggregation ? builder.aggregationFlushInterval : 0),
builder.aggregationShards, builder.threadFactory);
builder.aggregationShards, builder.threadFactory, builder.containerID,
builder.originDetectionEnabled);
}

protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler,
Expand Down Expand Up @@ -466,14 +498,17 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample
}

@Override
public final void writeTo(StringBuilder builder) {
public final void writeTo(StringBuilder builder, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
}
tagString(this.tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}
Expand Down Expand Up @@ -1019,7 +1054,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) {
@Override
public void recordEvent(final Event event, final String... eventTags) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") {
@Override public void writeTo(StringBuilder builder) {
@Override public void writeTo(StringBuilder builder, String containerID) {
final String title = escapeEventString(prefix + event.getTitle());
final String text = escapeEventString(event.getText());
builder.append(Message.Type.EVENT.toString())
Expand All @@ -1037,6 +1072,9 @@ public void recordEvent(final Event event, final String... eventTags) {

eventMap(event, builder);
tagString(eventTags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}
Expand Down Expand Up @@ -1071,7 +1109,7 @@ private int getUtf8Length(final String text) {
@Override
public void recordServiceCheckRun(final ServiceCheck sc) {
statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") {
@Override public void writeTo(StringBuilder sb) {
@Override public void writeTo(StringBuilder sb, String containerID) {
// see http://docs.datadoghq.com/guides/dogstatsd/#service-checks
sb.append(Message.Type.SERVICE_CHECK.toString())
.append("|")
Expand All @@ -1088,6 +1126,9 @@ public void recordServiceCheckRun(final ServiceCheck sc) {
if (sc.getMessage() != null) {
sb.append("|m:").append(sc.getEscapedMessage());
}
if (containerID != null && !containerID.isEmpty()) {
sb.append("|c:").append(containerID);
}

sb.append('\n');
}
Expand Down Expand Up @@ -1154,11 +1195,14 @@ protected void writeValue(StringBuilder builder) {
builder.append(getValue());
}

@Override protected final void writeTo(StringBuilder builder) {
@Override protected final void writeTo(StringBuilder builder, String containerID) {
builder.append(prefix).append(aspect).append(':');
writeValue(builder);
builder.append('|').append(type);
tagString(this.tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
}
Expand All @@ -1169,5 +1213,34 @@ protected boolean isInvalidSample(double sampleRate) {
return sampleRate != 1 && ThreadLocalRandom.current().nextDouble() > sampleRate;
}

protected boolean isOriginDetectionEnabled(String containerID, boolean originDetectionEnabled, boolean hasEntityID) {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
if (!originDetectionEnabled || hasEntityID || (containerID != null && !containerID.isEmpty())) {
// origin detection is explicitly disabled
// or DD_ENTITY_ID was found
// or a user-defined container ID was provided
return false;
}

final String value = System.getenv(ORIGIN_DETECTION_ENABLED_ENV_VAR);
if (value != null && !value.trim().isEmpty()) {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
Set<String> falseValues = new HashSet<String>(Arrays.asList("no", "false", "0", "n", "off"));
return !falseValues.contains(value.toLowerCase());
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
}

// DD_ORIGIN_DETECTION_ENABLED is not set
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
// default to true
return true;
}

protected String getContainerID(String containerID, boolean originDetectionEnabled) {
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved
if (containerID != null && !containerID.isEmpty()) {
return containerID;
}

if (originDetectionEnabled) {
return ContainerID.get().getContainerID();
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL;
public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS;
public boolean originDetectionEnabled = NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION;

public Callable<SocketAddress> addressLookup;
public Callable<SocketAddress> telemetryAddressLookup;
Expand All @@ -42,6 +43,7 @@ public class NonBlockingStatsDClientBuilder implements Cloneable {
public String prefix;
public String entityID;
public String[] constantTags;
public String containerID;

public StatsDClientErrorHandler errorHandler;
public ThreadFactory threadFactory;
Expand Down Expand Up @@ -173,6 +175,16 @@ public NonBlockingStatsDClientBuilder threadFactory(ThreadFactory val) {
return this;
}

public NonBlockingStatsDClientBuilder containerID(String val) {
containerID = val;
return this;
}

public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) {
originDetectionEnabled = val;
return this;
}

/**
* NonBlockingStatsDClient factory method.
* @return the built NonBlockingStatsDClient.
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/timgroup/statsd/StatsDProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public abstract class StatsDProcessor {
protected volatile boolean shutdown;
volatile boolean shutdownAgg;

protected String containerID;
ahmed-mez marked this conversation as resolved.
Show resolved Hide resolved

protected abstract class ProcessingTask implements Runnable {
protected StringBuilder builder = new StringBuilder();
protected CharBuffer buffer = CharBuffer.wrap(builder);
Expand Down Expand Up @@ -97,7 +99,7 @@ protected void processLoop() {
}

builder.setLength(0);
message.writeTo(builder);
message.writeTo(builder, containerID);
int lowerBoundSize = builder.length();

if (sendBuffer.capacity() < lowerBoundSize) {
Expand Down Expand Up @@ -215,6 +217,10 @@ public Telemetry getTelemetry() {
return telemetry;
}

public void setContainerID(final String containerID) {
this.containerID = containerID;
}

void shutdown(boolean blocking) throws InterruptedException {
shutdown = true;
aggregator.stop();
Expand Down
Loading