Implement ChannelMode and sampling rate for extended aggregation #187
@@ -0,0 +1,75 @@

```go
package statsd

import (
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
)

// bufferedMetricContexts represents the contexts for Histograms, Distributions
// and Timing. Since those 3 metric types behave the same way and are sampled
// with the same type they're represented by the same class.
type bufferedMetricContexts struct {
	nbContext int32
	mutex     sync.RWMutex
	values    bufferedMetricMap
	newMetric func(string, float64, string) *bufferedMetric

	// Each bufferedMetricContexts uses its own random source and random
	// lock to prevent goroutines from contending for the lock on the
	// "math/rand" package-global random source (e.g. calls like
	// "rand.Float64()" must acquire a shared lock to get the next
	// pseudorandom number).
	random     *rand.Rand
	randomLock sync.Mutex
}

func newBufferedContexts(newMetric func(string, float64, string) *bufferedMetric) bufferedMetricContexts {
	return bufferedMetricContexts{
		values:    bufferedMetricMap{},
		newMetric: newMetric,
		// Note that calling "time.Now().UnixNano()" repeatedly quickly may return
		// very similar values. That's fine for seeding the worker-specific random
		// source because we just need an evenly distributed stream of float values.
		// Do not use this random source for cryptographic randomness.
		random: rand.New(rand.NewSource(time.Now().UnixNano())),
	}
}

func (bc *bufferedMetricContexts) flush(metrics []metric) []metric {
	bc.mutex.Lock()
	values := bc.values
	bc.values = bufferedMetricMap{}
	bc.mutex.Unlock()

	for _, d := range values {
		metrics = append(metrics, d.flushUnsafe())
	}
	atomic.AddInt32(&bc.nbContext, int32(len(values)))
	return metrics
}

func (bc *bufferedMetricContexts) sample(name string, value float64, tags []string, rate float64) error {
	if !shouldSample(rate, bc.random, &bc.randomLock) {
		return nil
	}

	context, stringTags := getContextAndTags(name, tags)
	bc.mutex.RLock()
	if v, found := bc.values[context]; found {
		v.sample(value)
		bc.mutex.RUnlock()
		return nil
	}
	bc.mutex.RUnlock()

	bc.mutex.Lock()
	bc.values[context] = bc.newMetric(name, value, stringTags)
	bc.mutex.Unlock()
	return nil
}

func (bc *bufferedMetricContexts) resetAndGetNbContext() int32 {
	return atomic.SwapInt32(&bc.nbContext, 0)
}
```

Review comment (on the `shouldSample` call):

> I know there's no easy way around this, but I'm a little concerned about all the locking here. We have one lock per metric type, which is still a pretty coarse granularity. I can't really think of a good workaround.

Reply:

> That's true. It's also the same overhead as the default configuration right now: when using the default setting (i.e. mutex mode), we have the same lock and random source. So the overhead should be the same with and without extended aggregation.
Review comment:

> So, I'm wondering if we should consider `sync.Map`? It's not totally clear to me, but we might get better performance. On paper, a `sync.Map` might offer better performance in two cases described in its documentation: (1) when the entry for a given key is written once but read many times, and (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. I think we have a bit of (1), in that we create the context once and then sample over and over, but it might not fit the description fully. We don't have super high concurrency for (2), and the sets of keys would not be disjoint, so I'm not sure that applies. In any case, it might be worth giving it thought to decide if we should keep the current implementation.

Reply:

> That's a really good point. I'll do that in a different PR!