Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ChannelMode and sampling rate for extended aggregation #187

Merged
merged 1 commit into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 63 additions & 76 deletions statsd/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,59 +11,9 @@ type (
countsMap map[string]*countMetric
gaugesMap map[string]*gaugeMetric
setsMap map[string]*setMetric
bufferedMetricMap map[string]*histogramMetric
bufferedMetricMap map[string]*bufferedMetric
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I'm wondering if we should consider sync.Map? It's not totally clear to me, but we might get better performance. On paper a sync.Map might offer better performance when used as follows:

(1) when the entry for a given key is only ever written once but read many times, as in caches that only grow,

(2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys.

I think we have a bit of (1) in that we create the context once and then sample over and over, but it might not fit the description fully. We don't have super high concurrency for (2), and the sets of keys would not be disjoint, so I'm not sure that apples. In any case, might be worth giving it thought to decide if we should keep the current implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good point. I'll do that in a different PR !

)

// bufferedMetricContexts represent the contexts for Histograms, Distributions
// and Timing. Since those 3 metric types behave the same way and are sampled
// with the same type they're represented by the same class.
type bufferedMetricContexts struct {
nbContext int32
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Lock()
values := bc.values
bc.values = bufferedMetricMap{}
bc.mutex.Unlock()

for _, d := range values {
metrics = append(metrics, d.flushUnsafe())
}
atomic.AddInt32(&bc.nbContext, int32(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string) error {
context, stringTags := getContextAndTags(name, tags)
bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
bc.mutex.RUnlock()

bc.mutex.Lock()
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()
return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
return atomic.SwapInt32(&bc.nbContext, 0)
}

type aggregator struct {
nbContextGauge int32
nbContextCount int32
Expand All @@ -81,9 +31,15 @@ type aggregator struct {
timings bufferedMetricContexts

closed chan struct{}
exited chan struct{}

client *Client

// aggregator implements ChannelMode mechanism to receive histograms,
// distributions and timings. Since they need sampling they need to
// lock for random. When using both ChannelMode and ExtendedAggregation
// we don't want goroutine to fight over the lock.
inputMetrics chan metric
stopChannelMode chan struct{}
}

type aggregatorMetrics struct {
Expand All @@ -98,15 +54,15 @@ type aggregatorMetrics struct {

func newAggregator(c *Client) *aggregator {
return &aggregator{
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
exited: make(chan struct{}),
client: c,
counts: countsMap{},
gauges: gaugesMap{},
sets: setsMap{},
histograms: newBufferedContexts(newHistogramMetric),
distributions: newBufferedContexts(newDistributionMetric),
timings: newBufferedContexts(newTimingMetric),
closed: make(chan struct{}),
stopChannelMode: make(chan struct{}),
}
}

Expand All @@ -117,25 +73,49 @@ func (a *aggregator) start(flushInterval time.Duration) {
for {
select {
case <-ticker.C:
a.sendMetrics()
a.flush()
case <-a.closed:
close(a.exited)
return
}
}
}()
}

func (a *aggregator) sendMetrics() {
for _, m := range a.flushMetrics() {
a.client.send(m)
}
func (a *aggregator) startReceivingMetric(bufferSize int) {
a.inputMetrics = make(chan metric, bufferSize)
go a.pullMetric()
}

func (a *aggregator) stopReceivingMetric() {
a.stopChannelMode <- struct{}{}
}

func (a *aggregator) stop() {
close(a.closed)
<-a.exited
a.sendMetrics()
a.closed <- struct{}{}
}

func (a *aggregator) pullMetric() {
for {
select {
case m := <-a.inputMetrics:
switch m.metricType {
case histogram:
a.histogram(m.name, m.fvalue, m.tags, m.rate)
case distribution:
a.distribution(m.name, m.fvalue, m.tags, m.rate)
case timing:
a.timing(m.name, m.fvalue, m.tags, m.rate)
}
case <-a.stopChannelMode:
return
}
}
}

func (a *aggregator) flush() {
for _, m := range a.flushMetrics() {
a.client.sendBlocking(m)
}
}

func (a *aggregator) flushTelemetryMetrics() *aggregatorMetrics {
Expand Down Expand Up @@ -258,14 +238,21 @@ func (a *aggregator) set(name string, value string, tags []string) error {
return nil
}

func (a *aggregator) histogram(name string, value float64, tags []string) error {
return a.histograms.sample(name, value, tags)
// Only histograms, distributions and timings are sampled with a rate since we
// only pack them in on message instead of aggregating them. Discarding the
// sample rate will have impacts on the CPU and memory usage of the Agent.

// type alias for Client.sendToAggregator
type bufferedMetricSampleFunc func(name string, value float64, tags []string, rate float64) error

func (a *aggregator) histogram(name string, value float64, tags []string, rate float64) error {
return a.histograms.sample(name, value, tags, rate)
}

func (a *aggregator) distribution(name string, value float64, tags []string) error {
return a.distributions.sample(name, value, tags)
func (a *aggregator) distribution(name string, value float64, tags []string, rate float64) error {
return a.distributions.sample(name, value, tags, rate)
}

func (a *aggregator) timing(name string, value float64, tags []string) error {
return a.timings.sample(name, value, tags)
func (a *aggregator) timing(name string, value float64, tags []string, rate float64) error {
return a.timings.sample(name, value, tags, rate)
}
30 changes: 15 additions & 15 deletions statsd/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ func TestAggregatorSample(t *testing.T) {
assert.Len(t, a.sets, 1)
assert.Contains(t, a.sets, "setTest:tag1,tag2")

a.histogram("histogramTest", 21, tags)
a.histogram("histogramTest", 21, tags, 1)
assert.Len(t, a.histograms.values, 1)
assert.Contains(t, a.histograms.values, "histogramTest:tag1,tag2")

a.distribution("distributionTest", 21, tags)
a.distribution("distributionTest", 21, tags, 1)
assert.Len(t, a.distributions.values, 1)
assert.Contains(t, a.distributions.values, "distributionTest:tag1,tag2")

a.timing("timingTest", 21, tags)
a.timing("timingTest", 21, tags, 1)
assert.Len(t, a.timings.values, 1)
assert.Contains(t, a.timings.values, "timingTest:tag1,tag2")
}
Expand All @@ -63,17 +63,17 @@ func TestAggregatorFlush(t *testing.T) {
a.set("setTest1", "value2", tags)
a.set("setTest2", "value1", tags)

a.histogram("histogramTest1", 21, tags)
a.histogram("histogramTest1", 22, tags)
a.histogram("histogramTest2", 23, tags)
a.histogram("histogramTest1", 21, tags, 1)
a.histogram("histogramTest1", 22, tags, 1)
a.histogram("histogramTest2", 23, tags, 1)

a.distribution("distributionTest1", 21, tags)
a.distribution("distributionTest1", 22, tags)
a.distribution("distributionTest2", 23, tags)
a.distribution("distributionTest1", 21, tags, 1)
a.distribution("distributionTest1", 22, tags, 1)
a.distribution("distributionTest2", 23, tags, 1)

a.timing("timingTest1", 21, tags)
a.timing("timingTest1", 22, tags)
a.timing("timingTest2", 23, tags)
a.timing("timingTest1", 21, tags, 1)
a.timing("timingTest1", 22, tags, 1)
a.timing("timingTest2", 23, tags, 1)

metrics := a.flushMetrics()

Expand Down Expand Up @@ -210,9 +210,9 @@ func TestAggregatorFlushConcurrency(t *testing.T) {
a.gauge("gaugeTest1", 21, tags)
a.count("countTest1", 21, tags)
a.set("setTest1", "value1", tags)
a.histogram("histogramTest1", 21, tags)
a.distribution("distributionTest1", 21, tags)
a.timing("timingTest1", 21, tags)
a.histogram("histogramTest1", 21, tags, 1)
a.distribution("distributionTest1", 21, tags, 1)
a.timing("timingTest1", 21, tags, 1)
}()
}

Expand Down
75 changes: 75 additions & 0 deletions statsd/buffered_metric_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package statsd

import (
"math/rand"
"sync"
"sync/atomic"
"time"
)

// bufferedMetricContexts represent the contexts for Histograms, Distributions
// and Timing. Since those 3 metric types behave the same way and are sampled
// with the same type they're represented by the same class.
type bufferedMetricContexts struct {
nbContext int32
mutex sync.RWMutex
values bufferedMetricMap
newMetric func(string, float64, string) *bufferedMetric

// Each bufferedMetricContexts uses its own random source and random
// lock to prevent goroutines from contending for the lock on the
// "math/rand" package-global random source (e.g. calls like
// "rand.Float64()" must acquire a shared lock to get the next
// pseudorandom number).
random *rand.Rand
randomLock sync.Mutex
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
return bufferedMetricContexts{
values: bufferedMetricMap{},
newMetric: newMetric,
// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
// very similar values. That's fine for seeding the worker-specific random
// source because we just need an evenly distributed stream of float values.
// Do not use this random source for cryptographic randomness.
random: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
bc.mutex.Lock()
values := bc.values
bc.values = bufferedMetricMap{}
bc.mutex.Unlock()

for _, d := range values {
metrics = append(metrics, d.flushUnsafe())
}
atomic.AddInt32(&bc.nbContext, int32(len(values)))
return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
if !shouldSample(rate, bc.random, &bc.randomLock) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know there's no easy way around this, I'm a little concerned about all this locking here. We have one lock per metric type which is still a pretty coarse granularity. Can't really think of a good workaround.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true. Also it's the same overhead as the default configuration right now. When using the default setting (ie mutext mode), we have the same lock and random. So the overhead should be the same with and without extended aggregation.

return nil
}

context, stringTags := getContextAndTags(name, tags)
bc.mutex.RLock()
if v, found := bc.values[context]; found {
v.sample(value)
bc.mutex.RUnlock()
return nil
}
bc.mutex.RUnlock()

bc.mutex.Lock()
bc.values[context] = bc.newMetric(name, value, stringTags)
bc.mutex.Unlock()
return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
return atomic.SwapInt32(&bc.nbContext, 0)
}
4 changes: 4 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package statsd

import (
"fmt"
"math"
"strings"
"time"
Expand Down Expand Up @@ -199,6 +200,9 @@ func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
// WithBufferShardCount sets the BufferShardCount option.
func WithBufferShardCount(bufferShardCount int) Option {
return func(o *Options) error {
if bufferShardCount < 1 {
return fmt.Errorf("BufferShardCount must be a positive integer")
}
o.BufferShardCount = bufferShardCount
return nil
}
Expand Down
Loading