Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add implementation of Sum aggregators #3000

Merged
merged 42 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
405c377
Implement the sum aggregators
MrAlias Jul 7, 2022
a138c5f
Add unit tests for delta/cumulative sums
MrAlias Jul 7, 2022
4ba0c4c
Add benchmarks
MrAlias Jul 7, 2022
f4237c1
Merge sum tests into one
MrAlias Jul 7, 2022
1397f4c
Remove unused start time from cumulative sum
MrAlias Jul 7, 2022
31adb7f
Refactor benchmark tests
MrAlias Jul 7, 2022
3ad3c89
goimports
MrAlias Jul 8, 2022
4b78904
Move timestamp out of lock
MrAlias Jul 8, 2022
090fb3b
Refactor testing
MrAlias Jul 8, 2022
1166395
Fix spelling mistake
MrAlias Jul 8, 2022
0a01d06
Name param of expectFunc
MrAlias Jul 8, 2022
9e9a0ad
Reset delta sum to zero instead of delete
MrAlias Jul 8, 2022
2b1741b
Revert to deleting unused attr sets
MrAlias Jul 11, 2022
2399af3
Refactor testing to allow use across other aggs
MrAlias Jul 11, 2022
45b07ed
Add TODO to bound cumulative sum mem usage
MrAlias Jul 11, 2022
ee4ce7b
Fix misspelling
MrAlias Jul 11, 2022
f125d53
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 11, 2022
4311369
Unify aggregator benchmark code in aggregator_test
MrAlias Jul 11, 2022
02cdb3a
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 21, 2022
d483c2a
Use generic DataPoint value
MrAlias Jul 21, 2022
5f72d60
Fix assertion_fail_test.go
MrAlias Jul 22, 2022
b5732c0
Merge branch 'generic-metricdata' into sum-agg-impl
MrAlias Jul 22, 2022
17b4e8f
Use generic metricdata types
MrAlias Jul 22, 2022
e48ad68
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 25, 2022
dcbfd9a
Fix tests
MrAlias Jul 25, 2022
292898c
Fix benchmarks
MrAlias Jul 25, 2022
b911e60
Fix lint
MrAlias Jul 26, 2022
86971db
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 26, 2022
840ca2e
Update sum documentation
MrAlias Jul 26, 2022
bcf2fc5
Remove leftover encapsulating test run
MrAlias Jul 26, 2022
1fe3558
Use t.Cleanup to mock time
MrAlias Jul 26, 2022
16359dd
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 27, 2022
3c7a389
Consolidate expecter logic into funcs
MrAlias Jul 27, 2022
2745e1c
Move errNegVal closer to use
MrAlias Jul 27, 2022
c4ad8ac
Run the agg test
MrAlias Jul 27, 2022
e7b779f
Add tests for monotonic sum Aggregate err
MrAlias Jul 27, 2022
cfd7d87
Run make lint
MrAlias Jul 27, 2022
4db6b90
Make monotonic an arg of creation funcs
MrAlias Jul 27, 2022
f01d942
Remove Aggregate monotonic validation
MrAlias Jul 27, 2022
6fed240
Rename sum to valueMap
MrAlias Jul 27, 2022
b2486fa
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Jul 28, 2022
672e8b2
Merge branch 'new_sdk/main' into sum-agg-impl
MrAlias Aug 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Aggregation struct {
// Timestamp defines the time the last measurement was made. If zero, no
// measurements were made for this time span. The time is represented as a
// unix timestamp with nanosecond precision.
Timestamp uint64
Timestamp int64

// Attributes are the unique dimensions Value describes.
Attributes attribute.Set
Expand Down
65 changes: 50 additions & 15 deletions sdk/metric/internal/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,28 @@

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import "go.opentelemetry.io/otel/attribute"
import (
"sync"
"time"

"go.opentelemetry.io/otel/attribute"
)

// sum summarizes a set of measurements as their arithmetic sum.
type sum[N int64 | float64] struct {
// TODO(#2972): implement.
sync.Mutex

values map[attribute.Set]N
}
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

func newSum[N int64 | float64]() sum[N] {
return sum[N]{values: make(map[attribute.Set]N)}
}

func (s *sum[N]) Aggregate(value N, attr attribute.Set) {
// TODO(#2972): implement.
s.Lock()
s.values[attr] += value
s.Unlock()
}

// NewDeltaSum returns an Aggregator that summarizes a set of measurements as
Expand All @@ -35,21 +48,33 @@ func (s *sum[N]) Aggregate(value N, attr attribute.Set) {
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all sums to zero.
func NewDeltaSum[N int64 | float64]() Aggregator[N] {
// TODO(#2972): implement.
return &deltaSum[N]{}
return &deltaSum[N]{newSum[N]()}
}

// deltaSum summarizes a set of measurements made in a single aggregation
// cycle as their arithmetic sum.
type deltaSum[N int64 | float64] struct {
sum[N]

// TODO(#2972): implement.
}

func (s *deltaSum[N]) Aggregations() []Aggregation {
// TODO(#2972): implement.
return nil
now := time.Now().UnixNano()

s.Lock()
defer s.Unlock()

aggs := make([]Aggregation, 0, len(s.values))
var zero N
for attr, value := range s.values {
aggs = append(aggs, Aggregation{
Timestamp: now,
Attributes: attr,
Value: SingleValue[N]{Value: value},
})
s.values[attr] = zero
}

return aggs
}

// NewCumulativeSum returns an Aggregator that summarizes a set of
Expand All @@ -59,19 +84,29 @@ func (s *deltaSum[N]) Aggregations() []Aggregation {
// arithmetic sum of all values aggregated since the returned Aggregator was
// created.
func NewCumulativeSum[N int64 | float64]() Aggregator[N] {
// TODO(#2972): implement.
return &cumulativeSum[N]{}
return &cumulativeSum[N]{sum: newSum[N]()}
}

// cumulativeSum summarizes a set of measurements made over all aggregation
// cycles as their arithmetic sum.
type cumulativeSum[N int64 | float64] struct {
sum[N]

// TODO(#2972): implement.
}

func (s *cumulativeSum[N]) Aggregations() []Aggregation {
// TODO(#2972): implement.
return nil
now := time.Now().UnixNano()

s.Lock()
defer s.Unlock()

aggs := make([]Aggregation, 0, len(s.values))
for attr, value := range s.values {
aggs = append(aggs, Aggregation{
Timestamp: now,
Attributes: attr,
Value: SingleValue[N]{Value: value},
})
}

return aggs
}
221 changes: 221 additions & 0 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"

import (
"strconv"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/attribute"
)

const (
goroutines = 5
measurements = 30
cycles = 3
)

var (
alice = attribute.NewSet(attribute.String("user", "alice"), attribute.Bool("admin", true))
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))
)

func TestSum(t *testing.T) {
t.Run("Delta", func(t *testing.T) {
t.Run("Int64", testSum(NewDeltaSum[int64](), deltaExpecter[int64]))
t.Run("Float64", testSum(NewDeltaSum[float64](), deltaExpecter[float64]))
})

t.Run("Cumulative", func(t *testing.T) {
t.Run("Int64", testSum(NewCumulativeSum[int64](), cumulativeExpecter[int64]))
t.Run("Float64", testSum(NewCumulativeSum[float64](), cumulativeExpecter[float64]))
})
}

// expectFunc returns a function that will return a map of expected values of
// a cycle. Each call advances the cycle.
type expectFunc[N int64 | float64] func(increments map[attribute.Set]N) func() map[attribute.Set]N

func testSum[N int64 | float64](a Aggregator[N], expecter expectFunc[N]) func(*testing.T) {
increments := map[attribute.Set]N{alice: 1, bob: -1, carol: 2}
f := expecter(increments)
return func(t *testing.T) {
for i := 0; i < cycles; i++ {
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
for j := 0; j < measurements; j++ {
for attrs, n := range increments {
a.Aggregate(n, attrs)
}
}
}()
}
wg.Wait()

assertMap(t, f(), aggregationsToMap[N](a.Aggregations()))
}
}
}

func aggregationsToMap[N int64 | float64](a []Aggregation) map[attribute.Set]N {
m := make(map[attribute.Set]N)
for _, a := range a {
m[a.Attributes] = a.Value.(SingleValue[N]).Value
}
return m
}

// assertMap asserts expected equals actual. The testify assert.Equal function
// does not give clear error messages for maps, this attempts to do so.
func assertMap[N int64 | float64](t *testing.T, expected, actual map[attribute.Set]N) {
extra := make(map[attribute.Set]struct{})
for attr := range actual {
extra[attr] = struct{}{}
}

for attr, v := range expected {
name := attr.Encoded(attribute.DefaultEncoder())
t.Run(name, func(t *testing.T) {
require.Contains(t, actual, attr)
delete(extra, attr)
assert.Equal(t, v, actual[attr])
})
}

assert.Lenf(t, extra, 0, "unknown values added: %v", extra)
}

func deltaExpecter[N int64 | float64](incr map[attribute.Set]N) func() map[attribute.Set]N {
expect := make(map[attribute.Set]N, len(incr))
for actor, incr := range incr {
expect[actor] = incr * measurements * goroutines
}
return func() map[attribute.Set]N { return expect }
}

func cumulativeExpecter[N int64 | float64](incr map[attribute.Set]N) func() map[attribute.Set]N {
var cycle int
base := make(map[attribute.Set]N, len(incr))
for actor, incr := range incr {
base[actor] = incr * measurements * goroutines
}

expect := make(map[attribute.Set]N, len(incr))
return func() map[attribute.Set]N {
cycle++
for actor := range base {
expect[actor] = base[actor] * N(cycle)
}
return expect
}
}

func testDeltaSumReset[N int64 | float64](a Aggregator[N]) func(*testing.T) {
return func(t *testing.T) {
expect := make(map[attribute.Set]N)
assertMap(t, expect, aggregationsToMap[N](a.Aggregations()))

a.Aggregate(1, alice)
expect[alice] = 1
assertMap(t, expect, aggregationsToMap[N](a.Aggregations()))

// The sum should be reset to zero once Aggregations is called.
expect[alice] = 0
assertMap(t, expect, aggregationsToMap[N](a.Aggregations()))

// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect[bob] = 1
assertMap(t, expect, aggregationsToMap[N](a.Aggregations()))
}
}

func TestDeltaSumReset(t *testing.T) {
t.Run("Int64", testDeltaSumReset(NewDeltaSum[int64]()))
t.Run("Float64", testDeltaSumReset(NewDeltaSum[float64]()))
}

var result []Aggregation

func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}

b.Run("Aggregate", func(b *testing.B) {
agg := factory()
b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
for _, attr := range attrs {
agg.Aggregate(1, attr)
}
}
assert.Len(b, agg.Aggregations(), count)
})

b.Run("Aggregations", func(b *testing.B) {
aggs := make([]Aggregator[N], b.N)
for n := range aggs {
a := factory()
for _, attr := range attrs {
a.Aggregate(1, attr)
}
aggs[n] = a
}

b.ReportAllocs()
b.ResetTimer()

for n := 0; n < b.N; n++ {
result = aggs[n].Aggregations()
}
})
}

func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkAggregatorN(b, factory, n)
})
}
}
}

func BenchmarkSum(b *testing.B) {
b.Run("Delta", func(b *testing.B) {
b.Run("Int64", benchmarkAggregator(NewDeltaSum[int64]))
b.Run("Float64", benchmarkAggregator(NewDeltaSum[float64]))
})
b.Run("Cumulative", func(b *testing.B) {
b.Run("Int64", benchmarkAggregator(NewCumulativeSum[int64]))
b.Run("Float64", benchmarkAggregator(NewCumulativeSum[float64]))
})
}