Skip to content
This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

Permalink
Cardinality detection and limiting implementation (#186)
Browse files Browse the repository at this point in the history
Cardinality detection and limiting implementation
  • Loading branch information
Sergey Rustamov authored Jan 30, 2020
1 parent 795e333 commit 32a8cc3
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package com.spotify.ffwd.statistics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
Expand All @@ -32,6 +33,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SemanticCoreStatistics implements CoreStatistics {
private static final int HISTOGRAM_TTL_MINUTES = 2;
Expand Down Expand Up @@ -84,28 +86,45 @@ public void reportMetricsDroppedByFilter(int dropped) {
@Override
public OutputManagerStatistics newOutputManager() {
final MetricId m = metric.tagged("component", "output-manager");
final AtomicLong metricsCardinality = new AtomicLong();

return new OutputManagerStatistics() {
private final Meter sentMetrics =
registry.meter(m.tagged("what", "sent-metrics", "unit", "metric"));
private final Meter metricsDroppedByFilter =
registry.meter(m.tagged("what", "metrics-dropped-by-filter", "unit", "metric"));
private final Meter metricsDroppedByRateLimit =
registry.meter(m.tagged("what", "metrics-dropped-by-ratelimit", "unit", "metric"));
private final Counter sentMetrics =
registry.counter(m.tagged("what", "sent-metrics", "unit", "metric"));
private final Counter metricsDroppedByFilter =
registry.counter(m.tagged("what", "metrics-dropped-by-filter", "unit", "metric"));
private final Counter metricsDroppedByRateLimit =
registry.counter(m.tagged("what", "metrics-dropped-by-ratelimit", "unit", "metric"));
private final Counter metricsDroppedByCardinalityLimit =
registry.counter(m.tagged("what", "metrics-dropped-by-cardlimit", "unit", "metric"));

// Gauge exposing the most recently reported cardinality; it reads the AtomicLong that
// reportMetricsCardinality(long) updates. Declared as Gauge<Long> rather than the raw type
// so the value type is checked by the compiler.
private final Gauge<Long> metricsCardinalityMetric =
    registry.register(m.tagged("what", "metrics-cardinality"),
        (Gauge<Long>) metricsCardinality::get);

@Override
public void reportSentMetrics(int sent) {
sentMetrics.mark(sent);
sentMetrics.inc(sent);
}

@Override
public void reportMetricsDroppedByFilter(int dropped) {
metricsDroppedByFilter.mark(dropped);
metricsDroppedByFilter.inc(dropped);
}

@Override
public void reportMetricsDroppedByRateLimit(int dropped) {
metricsDroppedByRateLimit.mark(dropped);
metricsDroppedByRateLimit.inc(dropped);
}

// Adds the number of dropped metrics to the "metrics-dropped-by-cardlimit" counter.
@Override
public void reportMetricsDroppedByCardinalityLimit(int dropped) {
metricsDroppedByCardinalityLimit.inc(dropped);
}

// Stores the latest cardinality in the AtomicLong that backs the "metrics-cardinality" gauge.
@Override
public void reportMetricsCardinality(long cardinality) {
metricsCardinality.set(cardinality);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,15 @@ public void testConfMixedPluginsEnabled() {
public void testConfigFromEnvVars() {
  // Configuration is supplied purely through environment variables (no config file).
  environmentVariables.set("FFWD_TTL", "100");
  environmentVariables.set("FFWD_OUTPUT_RATELIMIT", "1000");
  environmentVariables.set("FFWD_OUTPUT_CARDINALITYLIMIT", "10000");
  environmentVariables.set("FFWD_OUTPUT_CARDINALITYTTL", "3000000");

  final CoreOutputManager manager = getOutputManager(null);

  final Long expectedRateLimit = Long.valueOf(1000);
  final Long expectedCardinalityLimit = Long.valueOf(10000);
  final Long expectedSwapPeriod = Long.valueOf(3000000);

  assertEquals(100, manager.getTtl());
  assertEquals(expectedRateLimit, manager.getRateLimit());
  assertEquals(expectedCardinalityLimit, manager.getCardinalityLimit());
  assertEquals(expectedSwapPeriod, manager.getHLLPRefreshPeriodLimit());
}

@Test
Expand All @@ -112,6 +116,8 @@ public void testMergeOrder() {

assertEquals(100, outputManager.getTtl());
assertEquals("jimjam", outputManager.getHost());
assertEquals(Long.valueOf(10000), outputManager.getCardinalityLimit());
assertEquals(Long.valueOf(555555), outputManager.getHLLPRefreshPeriodLimit());
}

private CoreOutputManager getOutputManager(final Path configPath) {
Expand Down
5 changes: 4 additions & 1 deletion agent/src/test/resources/basic-settings.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
ttl: 50
host: jimjam
host: jimjam
output:
cardinalitylimit: 10000
cardinalityttl: 555555
2 changes: 2 additions & 0 deletions api/src/main/java/com/spotify/ffwd/model/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import java.util.Map;
import java.util.Optional;
import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@EqualsAndHashCode(of = {"commonTags", "commonResource"})
public class Batch {
private final Map<String, String> commonTags;
private final Map<String, String> commonResource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,22 @@ default void reportMetricsDroppedByFilter(int dropped) {
*/
default void reportMetricsDroppedByRateLimit(int dropped) {
}

/**
 * Report that the given number of metrics were dropped due to a cardinality limit.
 * <p>
 * Dropped metrics are <em>not</em> sent to output plugins.
 * <p>
 * The default implementation does nothing.
 *
 * @param dropped The number of dropped metrics.
 */
default void reportMetricsDroppedByCardinalityLimit(int dropped) {
}

/**
 * Report the current cardinality of metrics sent to output plugins.
 * <p>
 * The default implementation does nothing.
 *
 * @param cardinality The current cardinality of the metrics sent.
 */
default void reportMetricsCardinality(long cardinality) {
}
}
23 changes: 23 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<artifactId>snappy-java</artifactId>
</dependency>

<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
</dependency>

<!-- testing -->
<dependency>
<groupId>junit</groupId>
Expand All @@ -99,5 +104,23 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
106 changes: 96 additions & 10 deletions core/src/main/java/com/spotify/ffwd/output/CoreOutputManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.spotify.ffwd.output;

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
Expand All @@ -37,8 +38,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.isomorphism.util.TokenBucket;
Expand All @@ -51,8 +56,11 @@ public class CoreOutputManager implements OutputManager {
private static final String HOST = "host";
private static final Logger log = LoggerFactory.getLogger(CoreOutputManager.class);
private static final String[] KEYS_NEVER_TO_DROP = {"ffwd-java", "ffwd-java.ffwd-java"};
private static final int HYPER_LOG_LOG_PLUS_PRECISION_NORMAL = 14;
private static final int HYPER_LOG_LOG_PLUS_PRECISION_SPARSE = 25;

private final TokenBucket rateLimiter;
private final Long cardinalityLimit;

@Inject
private List<PluginSink> sinks;
Expand Down Expand Up @@ -113,18 +121,35 @@ public long getTtl() {
@Inject
private Filter filter;

private AtomicReference<HyperLogLogPlus> hyperLog;
private AtomicLong hyperLogSwapTS;
private AtomicBoolean hyperLogSwapLock;
private Long hyperLogLogPlusSwapPeriodMS;

/**
 * Returns the capacity of the rate limiter, or {@code null} when no rate limiter exists.
 */
public final Long getRateLimit() {
  if (rateLimiter != null) {
    return rateLimiter.getCapacity();
  }
  return null;
}

/**
 * Returns the cardinality limit held by this manager; the constructor leaves it
 * {@code null} when the injected limit is absent or not positive.
 */
public final Long getCardinalityLimit() {
return cardinalityLimit;
}

/**
 * Returns the HyperLogLogPlus swap period in milliseconds; the constructor falls back to
 * 3,600,000 ms when no value is injected.
 */
public final Long getHLLPRefreshPeriodLimit() {
return hyperLogLogPlusSwapPeriodMS;
}

@Inject
CoreOutputManager(@Named("rateLimit") @Nullable Integer rateLimit) {
CoreOutputManager(@Named("rateLimit") @Nullable Integer rateLimit,
@Named("cardinalityLimit") @Nullable Long cardinalityLimit,
@Named("hyperLogLogPlusSwapPeriodMS") @Nullable Long hyperLogLogPlusSwapPeriodMS) {

if (rateLimit != null && rateLimit > 0) {
// Create a rate limiter with a configurable QPS, and
// tick every half second to reduce the delay between refills.
log.info("Initializing rate limiting: {} per second", rateLimit);
rateLimiter = TokenBuckets.builder()
.withCapacity(rateLimit)
.withInitialTokens(rateLimit)
Expand All @@ -133,6 +158,23 @@ public final Long getRateLimit() {
} else {
rateLimiter = null;
}

hyperLog = new AtomicReference<>(new HyperLogLogPlus(
HYPER_LOG_LOG_PLUS_PRECISION_NORMAL, HYPER_LOG_LOG_PLUS_PRECISION_SPARSE));
hyperLogSwapTS = new AtomicLong(System.currentTimeMillis());
hyperLogSwapLock = new AtomicBoolean(false);
this.hyperLogLogPlusSwapPeriodMS =
Optional.ofNullable(hyperLogLogPlusSwapPeriodMS).orElse(3_600_000L);

if (cardinalityLimit != null && cardinalityLimit > 0) {
// Use cardinalityLimit to limit cardinality
log.info("Initializing cardinality limit: {}", cardinalityLimit);
log.info("Initializing HyperLogLogPlus swap time: {} ms",
this.hyperLogLogPlusSwapPeriodMS);
this.cardinalityLimit = cardinalityLimit;
} else {
this.cardinalityLimit = null;
}
}

@Override
Expand All @@ -155,9 +197,10 @@ public void sendMetric(Metric metric) {

debug.inspectMetric(DEBUG_ID, filtered);

if (!rateLimitAllowed(1, metric.getKey())) {
log.debug("Dropping a metric due to rate limiting");
statistics.reportMetricsDroppedByRateLimit(1);
hyperLog.get().offer(metric.hashCode());
statistics.reportMetricsCardinality(hyperLog.get().cardinality());

if (isDroppable(1, metric.getKey())) {
return;
}

Expand All @@ -176,9 +219,10 @@ public void sendBatch(Batch batch) {

int batchSize = batch.getPoints().size();

if (batchSize > 0 && !rateLimitAllowed(batchSize, batch.getPoints().get(0).getKey())) {
log.debug("Dropping {} metrics due to rate limiting", batchSize);
statistics.reportMetricsDroppedByRateLimit(batchSize);
hyperLog.get().offer(batch.hashCode());
statistics.reportMetricsCardinality(hyperLog.get().cardinality());

if (isDroppable(batchSize, batch.getPoints().get(0).getKey())) {
return;
}

Expand Down Expand Up @@ -207,9 +251,8 @@ public AsyncFuture<Void> stop() {
return async.collectAndDiscard(futures);
}

private boolean rateLimitAllowed(int permits, String key) {
if (rateLimiter == null
|| Arrays.asList(KEYS_NEVER_TO_DROP).contains(key)) {
private boolean rateLimitAllowed(int permits) {
if (rateLimiter == null) {
return true;
}
try {
Expand All @@ -220,6 +263,49 @@ private boolean rateLimitAllowed(int permits, String key) {
}
}

/**
 * Checks the supplied cardinality against the configured limit.
 *
 * @param currentCardinality the cardinality value to test
 * @return {@code true} when no limit is configured or the value does not exceed the limit
 */
private boolean cardinalityLimitAllowed(long currentCardinality) {
  if (cardinalityLimit == null) {
    return true;
  }
  return currentCardinality <= cardinalityLimit;
}

/**
 * Resets the cardinality estimate by replacing the HyperLogLogPlus instance once more than
 * {@code hyperLogLogPlusSwapPeriodMS} milliseconds have passed since the previous swap.
 * <p>
 * {@code hyperLogSwapLock} is used as a try-lock so that only one caller performs the swap;
 * callers that do not obtain it return immediately without waiting.
 */
private void swapHyperLogLogPlus() {
  if (System.currentTimeMillis() - hyperLogSwapTS.get() <= hyperLogLogPlusSwapPeriodMS) {
    return;
  }

  // compareAndSet returns true only for the caller that flipped the flag from false to true.
  // compareAndExchange returns the previous value instead, i.e. false on success, which
  // inverted this guard and left the flag stuck at true.
  if (!hyperLogSwapLock.compareAndSet(false, true)) {
    return;
  }

  try {
    // Re-check while holding the flag: another caller may have completed a swap between the
    // first time check and the flag being acquired here.
    if (System.currentTimeMillis() - hyperLogSwapTS.get() > hyperLogLogPlusSwapPeriodMS) {
      hyperLog.set(new HyperLogLogPlus(
          HYPER_LOG_LOG_PLUS_PRECISION_NORMAL, HYPER_LOG_LOG_PLUS_PRECISION_SPARSE));
      hyperLogSwapTS.set(System.currentTimeMillis());
    }
  } finally {
    // Always release the flag, even if constructing the new estimator throws.
    hyperLogSwapLock.set(false);
  }
}

/**
 * Decides whether a single metric or a whole batch must be dropped.
 * <p>
 * Keys listed in {@code KEYS_NEVER_TO_DROP} are never dropped. For every other key the
 * checks run in this order:
 * <ol>
 *   <li>rate limit (only evaluated when {@code batchSize} is positive)</li>
 *   <li>cardinality limit</li>
 * </ol>
 * A drop is reported to the statistics; a cardinality drop additionally triggers
 * {@code swapHyperLogLogPlus()}.
 *
 * @param batchSize number of metrics under consideration
 * @param key metric key checked against the never-drop list
 * @return {@code true} when the metrics should be dropped
 */
private boolean isDroppable(final int batchSize, String key) {
  final boolean neverDrop = Arrays.asList(KEYS_NEVER_TO_DROP).contains(key);
  if (neverDrop) {
    return false;
  }

  if (batchSize > 0 && !rateLimitAllowed(batchSize)) {
    log.debug("Dropping {} metrics due to rate limiting", batchSize);
    statistics.reportMetricsDroppedByRateLimit(batchSize);
    return true;
  }

  if (cardinalityLimitAllowed(hyperLog.get().cardinality())) {
    return false;
  }

  log.debug(
      "Dropping {} metrics due to cardinality limiting; cardinality {}",
      batchSize,
      hyperLog.get().cardinality());
  statistics.reportMetricsDroppedByCardinalityLimit(batchSize);
  swapHyperLogLogPlus();
  return true;
}

/**
* Filter the provided Metric and complete fields.
*/
Expand Down
Loading

0 comments on commit 32a8cc3

Please sign in to comment.