From 8fc1c74765a2a8af2d71556e9d7a08b8fc9bdc2d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 13 Jun 2022 14:12:34 -0700 Subject: [PATCH 01/25] Add the aggtor package --- sdk/metric/internal/aggtor/aggregation.go | 106 ++++++++++++++++++++++ sdk/metric/internal/aggtor/aggregator.go | 24 +++++ sdk/metric/internal/aggtor/doc.go | 20 ++++ sdk/metric/internal/aggtor/drop.go | 29 ++++++ sdk/metric/internal/aggtor/histogram.go | 40 ++++++++ sdk/metric/internal/aggtor/lastvalue.go | 39 ++++++++ sdk/metric/internal/aggtor/number.go | 87 ++++++++++++++++++ sdk/metric/internal/aggtor/sum.go | 40 ++++++++ 8 files changed, 385 insertions(+) create mode 100644 sdk/metric/internal/aggtor/aggregation.go create mode 100644 sdk/metric/internal/aggtor/aggregator.go create mode 100644 sdk/metric/internal/aggtor/doc.go create mode 100644 sdk/metric/internal/aggtor/drop.go create mode 100644 sdk/metric/internal/aggtor/histogram.go create mode 100644 sdk/metric/internal/aggtor/lastvalue.go create mode 100644 sdk/metric/internal/aggtor/number.go create mode 100644 sdk/metric/internal/aggtor/sum.go diff --git a/sdk/metric/internal/aggtor/aggregation.go b/sdk/metric/internal/aggtor/aggregation.go new file mode 100644 index 00000000000..fb722ea39b0 --- /dev/null +++ b/sdk/metric/internal/aggtor/aggregation.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +import ( + "errors" + "fmt" +) + +// Aggregation is a single data point in a timeseries that summarizes +// measurements made during a time span. +type Aggregation struct { + // Timestamp defines the time the last measurement was made. If zero, no + // measurements were made for this time span. The time is represented as a + // unix timestamp with nanosecond precision. + Timestamp uint64 + + // Value is the summarization of the measurements made. + Value value +} + +var errIncompatible = errors.New("incompatible aggregation") + +// Fold combines other into a. +func (a Aggregation) Fold(other Aggregation) error { + if other.Timestamp > a.Timestamp { + a.Timestamp = other.Timestamp + } + return a.Value.fold(other.Value) +} + +type value interface { + // fold combines other into the value. It will return an errIncompatible + // if other is not a compatible type with value. + fold(other value) error +} + +// SingleValue summarizes a set of measurements as a single numeric value. +type SingleValue[N int64 | float64] struct { + Value N +} + +func (v SingleValue[N]) fold(other value) error { + o, ok := other.(SingleValue[N]) + if !ok { + return fmt.Errorf("%w: value types %T and %T", errIncompatible, v, other) + } + v.Value += o.Value + return nil +} + +// HistogramValue summarizes a set of measurements as a histogram. +type HistogramValue struct { + Bounds []float64 + Counts []uint64 + Sum float64 + Min, Max float64 +} + +func (v HistogramValue) fold(other value) error { + o, ok := other.(HistogramValue) + if !ok { + return fmt.Errorf("%w: value types %T and %T", errIncompatible, v, other) + } + if !sliceEqual[float64](v.Bounds, o.Bounds) || len(o.Counts) != len(v.Counts) { + return fmt.Errorf("%w: different histogram binning", errIncompatible) + } + v.Sum += o.Sum + for i, c := range o.Counts { + v.Counts[i] += c + } + if o.Min < v.Min { + v.Min = o.Min + } + if o.Max > v.Max { + v.Max = o.Max + } + return nil +} + +func sliceEqual[T comparable](a, b []T) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true +} diff --git a/sdk/metric/internal/aggtor/aggregator.go b/sdk/metric/internal/aggtor/aggregator.go new file mode 100644 index 00000000000..576424f1f2e --- /dev/null +++ b/sdk/metric/internal/aggtor/aggregator.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +// Aggregator forms an aggregation from a collection of recorded measurements. +type Aggregator[N int64 | float64] interface { + Record(value N) + Aggregate() Aggregation +} diff --git a/sdk/metric/internal/aggtor/doc.go b/sdk/metric/internal/aggtor/doc.go new file mode 100644 index 00000000000..77b97fb7c13 --- /dev/null +++ b/sdk/metric/internal/aggtor/doc.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +// Package aggtor provides types and functionality to accumulate and aggregate +// measurements. +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" diff --git a/sdk/metric/internal/aggtor/drop.go b/sdk/metric/internal/aggtor/drop.go new file mode 100644 index 00000000000..f37169ac173 --- /dev/null +++ b/sdk/metric/internal/aggtor/drop.go @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +// dropAgg drops all recorded data and returns an empty Aggregation. +type dropAgg[N int64 | float64] struct{} + +// NewDrop returns an Aggregator that drops all recorded data and returns an +// empty Aggregation. +func NewDrop[N int64 | float64]() Aggregator[N] { return &dropAgg[N]{} } + +func (s *dropAgg[N]) Record(value N) {} + +func (s *dropAgg[N]) Aggregate() Aggregation { return Aggregation{} } diff --git a/sdk/metric/internal/aggtor/histogram.go b/sdk/metric/internal/aggtor/histogram.go new file mode 100644 index 00000000000..ee1f6dbe6b2 --- /dev/null +++ b/sdk/metric/internal/aggtor/histogram.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +// histogramAgg summarizes a set of measurements as an histogram with +// explicitly defined buckets. +type histogramAgg[N int64 | float64] struct { + // TODO: implement. +} + +// NewHistogram returns an Aggregator that summarizes a set of measurements as +// an histogram. The zero value will be used as the start value for all the +// buckets of new Aggregations. +func NewHistogram[N int64 | float64](zero Number[N], bounds []float64, recordMinMax bool) Aggregator[N] { + return &histogramAgg[N]{} +} + +func (s *histogramAgg[N]) Record(value N) { + // TODO: implement. +} + +func (s *histogramAgg[N]) Aggregate() Aggregation { + // TODO: implement. + return Aggregation{Value: HistogramValue{ /* TODO: calculate. */ }} +} diff --git a/sdk/metric/internal/aggtor/lastvalue.go b/sdk/metric/internal/aggtor/lastvalue.go new file mode 100644 index 00000000000..ef7fac60f9b --- /dev/null +++ b/sdk/metric/internal/aggtor/lastvalue.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +// lastValueAgg summarizes a set of measurements as the last one made. +type lastValueAgg[N int64 | float64] struct { + // TODO: implement. +} + +// NewLastValue returns an Aggregator that summarizes a set of measurements as +// the last one made. The zero value will be used as the start value for all +// new Aggregations. +func NewLastValue[N int64 | float64](zero Number[N]) Aggregator[N] { + return &lastValueAgg[N]{} +} + +func (s *lastValueAgg[N]) Record(value N) { + // TODO: implement. +} + +func (s *lastValueAgg[N]) Aggregate() Aggregation { + // TODO: implement. + return Aggregation{Value: SingleValue[N]{ /* TODO: calculate */ }} +} diff --git a/sdk/metric/internal/aggtor/number.go b/sdk/metric/internal/aggtor/number.go new file mode 100644 index 00000000000..99cc6c9c291 --- /dev/null +++ b/sdk/metric/internal/aggtor/number.go @@ -0,0 +1,87 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +import ( + "math" + "sync/atomic" +) + +type Number[N int64 | float64] interface { + // Store value atomically. + Store(value N) + + // Add value atomically. + Add(value N) + + // Load returns the stored value. + Load() N + + // Clone creates a new Number from the current value. + Clone() Number[N] +} + +type Int64 struct { + value *int64 +} + +var _ Number[int64] = Int64{} + +func NewInt64(v int64) Int64 { + return Int64{value: &v} +} + +func (v Int64) Store(value int64) { atomic.StoreInt64(v.value, value) } +func (v Int64) Add(value int64) { atomic.AddInt64(v.value, value) } +func (v Int64) Load() int64 { return atomic.LoadInt64(v.value) } +func (v Int64) Clone() Number[int64] { + return NewInt64(v.Load()) +} + +type Float64 struct { + value *uint64 +} + +var _ Number[float64] = Float64{} + +func NewFloat64(v float64) Float64 { + u := math.Float64bits(v) + return Float64{value: &u} +} + +func (v Float64) Store(value float64) { + atomic.StoreUint64(v.value, math.Float64bits(value)) +} + +func (v Float64) Add(value float64) { + for { + old := atomic.LoadUint64(v.value) + sum := math.Float64bits(math.Float64frombits(old) + value) + if atomic.CompareAndSwapUint64(v.value, old, sum) { + return + } + } +} + +func (v Float64) Load() float64 { + return math.Float64frombits(atomic.LoadUint64(v.value)) +} + +func (v Float64) Clone() Number[float64] { + return NewFloat64(v.Load()) +} diff --git a/sdk/metric/internal/aggtor/sum.go b/sdk/metric/internal/aggtor/sum.go new file mode 100644 index 00000000000..67e9f926430 --- /dev/null +++ b/sdk/metric/internal/aggtor/sum.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" + +// sumAgg summarizes a set of measurements as their arithmetic sum. +type sumAgg[N int64 | float64] struct { + // TODO: implement. +} + +// NewSum returns an Aggregator that summarizes a set of +// measurements as their arithmetic sum. The zero value will be used as the +// start value for all new Aggregations. +func NewSum[N int64 | float64](zero Number[N]) Aggregator[N] { + // TODO: implement. + return &sumAgg[N]{} +} + +func (s *sumAgg[N]) Record(value N) { + // TODO: implement. +} + +func (s *sumAgg[N]) Aggregate() Aggregation { + // TODO: implement. + return Aggregation{Value: SingleValue[N]{ /* TODO: calculate */ }} +} From dc1d912728de74feff3cdb5c8ee59b37ccebfd78 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 13 Jun 2022 14:16:49 -0700 Subject: [PATCH 02/25] Restrict to Go 1.18 --- sdk/metric/config.go | 4 ++-- sdk/metric/config_test.go | 4 ++-- sdk/metric/export/data.go | 4 ++-- sdk/metric/exporter.go | 4 ++-- sdk/metric/go.mod | 2 +- sdk/metric/go.sum | 1 - sdk/metric/instrumentkind.go | 4 ++-- sdk/metric/manual_reader.go | 4 ++-- sdk/metric/manual_reader_test.go | 4 ++-- sdk/metric/meter.go | 4 ++-- sdk/metric/meter_test.go | 4 ++-- sdk/metric/periodic_reader.go | 4 ++-- sdk/metric/periodic_reader_test.go | 4 ++-- sdk/metric/provider.go | 4 ++-- sdk/metric/provider_test.go | 4 ++-- sdk/metric/reader.go | 4 ++-- sdk/metric/reader_test.go | 4 ++-- sdk/metric/temporality.go | 4 ++-- sdk/metric/view/instrument.go | 4 ++-- sdk/metric/view/view.go | 4 ++-- 20 files changed, 37 insertions(+), 38 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 258526dbe3a..fcf7024ab95 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index c2b91e8f559..bad1df033be 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric diff --git a/sdk/metric/export/data.go b/sdk/metric/export/data.go index 750294b2218..9ac934d5390 100644 --- a/sdk/metric/export/data.go +++ b/sdk/metric/export/data.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 // TODO: NOTE this is a temporary space, it may be moved following the // discussion of #2813, or #2841 diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index 52f39292555..d62838c9b15 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index 07226ebfd9e..77e8d8111b9 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -1,6 +1,6 @@ module go.opentelemetry.io/otel/sdk/metric -go 1.17 +go 1.18 require ( github.com/go-logr/logr v1.2.3 diff --git a/sdk/metric/go.sum b/sdk/metric/go.sum index ac3360c6fee..2e2aed63d24 100644 --- a/sdk/metric/go.sum +++ b/sdk/metric/go.sum @@ -6,7 +6,6 @@ github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/sdk/metric/instrumentkind.go b/sdk/metric/instrumentkind.go index 8174eee5ef3..20d94fcc43a 100644 --- a/sdk/metric/instrumentkind.go +++ b/sdk/metric/instrumentkind.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 97a11dfdfcd..636db3d982c 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/manual_reader_test.go b/sdk/metric/manual_reader_test.go index 61b9ec74291..14c5d0765f3 100644 --- a/sdk/metric/manual_reader_test.go +++ b/sdk/metric/manual_reader_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 3b052ae864e..daf892dd1dc 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 3fdfbdcb344..38aa7ae28d9 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 83ef273a341..55f1a2b8999 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index ae5e40f2a20..e33e28a26e7 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 8276d278f26..98d0faffc22 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go index dca401fe3b8..20eb60590a9 100644 --- a/sdk/metric/provider_test.go +++ b/sdk/metric/provider_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 2af077bb4bf..0db7fcd255c 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index cb6a1761805..b47493124a7 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric/reader" diff --git a/sdk/metric/temporality.go b/sdk/metric/temporality.go index 289b151606e..fa896748287 100644 --- a/sdk/metric/temporality.go +++ b/sdk/metric/temporality.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package metric // import "go.opentelemetry.io/otel/sdk/metric" diff --git a/sdk/metric/view/instrument.go b/sdk/metric/view/instrument.go index e0a7774b481..b17205c3ac4 100644 --- a/sdk/metric/view/instrument.go +++ b/sdk/metric/view/instrument.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package view // import "go.opentelemetry.io/otel/sdk/metric/view" diff --git a/sdk/metric/view/view.go b/sdk/metric/view/view.go index f8190a3906a..7598ce15ab5 100644 --- a/sdk/metric/view/view.go +++ b/sdk/metric/view/view.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.17 -// +build go1.17 +//go:build go1.18 +// +build go1.18 package view // import "go.opentelemetry.io/otel/sdk/metric/view" From f9e8a49170dfdae5d984e6c715faf8fe6d90233b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 13 Jun 2022 19:42:59 -0700 Subject: [PATCH 03/25] Add missing build block to view_test.go --- sdk/metric/view/view_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/metric/view/view_test.go b/sdk/metric/view/view_test.go index ffd4095fea1..10696ff00a6 100644 --- a/sdk/metric/view/view_test.go +++ b/sdk/metric/view/view_test.go @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build go1.18 +// +build go1.18 + package view import ( From 59df1f71458d972998e840886625a6526579a8eb Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 13 Jun 2022 19:45:06 -0700 Subject: [PATCH 04/25] Comment Aggregator iface --- sdk/metric/internal/aggtor/aggregator.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/metric/internal/aggtor/aggregator.go b/sdk/metric/internal/aggtor/aggregator.go index 576424f1f2e..7e5174ec360 100644 --- a/sdk/metric/internal/aggtor/aggregator.go +++ b/sdk/metric/internal/aggtor/aggregator.go @@ -19,6 +19,9 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" // Aggregator forms an aggregation from a collection of recorded measurements. type Aggregator[N int64 | float64] interface { + // Record includes value in the aggregation. Record(value N) + + // Aggregate returns an aggregation of all recorded values. Aggregate() Aggregation } From 0a700fe596642a758df49eec3d61197fcfb113f0 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 13 Jun 2022 19:51:23 -0700 Subject: [PATCH 05/25] Use Go 1.18 as the default ci version --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index da3fcf1b71f..aa2bd8d3fac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ env: # Path to where test results will be saved. TEST_RESULTS: /tmp/test-results # Default minimum version of Go to support. - DEFAULT_GO_VERSION: 1.17 + DEFAULT_GO_VERSION: 1.18 jobs: lint: runs-on: ubuntu-latest From a5c5834a939f825fc4e0b10a28b0a4ce249b19f9 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 15 Jun 2022 12:36:21 -0700 Subject: [PATCH 06/25] Update Aggregator iface from feedback --- sdk/metric/internal/aggtor/aggregation.go | 8 ++++++++ sdk/metric/internal/aggtor/aggregator.go | 10 ++++++---- sdk/metric/internal/aggtor/doc.go | 3 +-- sdk/metric/internal/aggtor/drop.go | 6 ++++-- sdk/metric/internal/aggtor/histogram.go | 12 +++++++++--- sdk/metric/internal/aggtor/lastvalue.go | 12 +++++++++--- sdk/metric/internal/aggtor/sum.go | 12 +++++++++--- 7 files changed, 46 insertions(+), 17 deletions(-) diff --git a/sdk/metric/internal/aggtor/aggregation.go b/sdk/metric/internal/aggtor/aggregation.go index fb722ea39b0..8b038e5ccd7 100644 --- a/sdk/metric/internal/aggtor/aggregation.go +++ b/sdk/metric/internal/aggtor/aggregation.go @@ -20,6 +20,8 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" import ( "errors" "fmt" + + "go.opentelemetry.io/otel/attribute" ) // Aggregation is a single data point in a timeseries that summarizes @@ -30,6 +32,9 @@ type Aggregation struct { // unix timestamp with nanosecond precision. Timestamp uint64 + // Attributes are the unique dimensions Value describes. + Attributes *attribute.Set + // Value is the summarization of the measurements made. Value value } @@ -41,6 +46,9 @@ func (a Aggregation) Fold(other Aggregation) error { if other.Timestamp > a.Timestamp { a.Timestamp = other.Timestamp } + if !a.Attributes.Equals(other.Attributes) { + return fmt.Errorf("%w: attributes not equal", errIncompatible) + } return a.Value.fold(other.Value) } diff --git a/sdk/metric/internal/aggtor/aggregator.go b/sdk/metric/internal/aggtor/aggregator.go index 7e5174ec360..32dd9a5f7d1 100644 --- a/sdk/metric/internal/aggtor/aggregator.go +++ b/sdk/metric/internal/aggtor/aggregator.go @@ -17,11 +17,13 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +import "go.opentelemetry.io/otel/attribute" + // Aggregator forms an aggregation from a collection of recorded measurements. type Aggregator[N int64 | float64] interface { - // Record includes value in the aggregation. - Record(value N) + // Record includes value scoped by attr in the aggregation. + Record(value N, attr *attribute.Set) - // Aggregate returns an aggregation of all recorded values. - Aggregate() Aggregation + // Aggregate returns aggregations of all recorded values. + Aggregate() []Aggregation } diff --git a/sdk/metric/internal/aggtor/doc.go b/sdk/metric/internal/aggtor/doc.go index 77b97fb7c13..8690de1449c 100644 --- a/sdk/metric/internal/aggtor/doc.go +++ b/sdk/metric/internal/aggtor/doc.go @@ -15,6 +15,5 @@ //go:build go1.18 // +build go1.18 -// Package aggtor provides types and functionality to accumulate and aggregate -// measurements. +// Package aggtor provides types and functionality to aggregate measurements. package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" diff --git a/sdk/metric/internal/aggtor/drop.go b/sdk/metric/internal/aggtor/drop.go index f37169ac173..8095d2ec404 100644 --- a/sdk/metric/internal/aggtor/drop.go +++ b/sdk/metric/internal/aggtor/drop.go @@ -17,6 +17,8 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +import "go.opentelemetry.io/otel/attribute" + // dropAgg drops all recorded data and returns an empty Aggregation. type dropAgg[N int64 | float64] struct{} @@ -24,6 +26,6 @@ type dropAgg[N int64 | float64] struct{} // empty Aggregation. func NewDrop[N int64 | float64]() Aggregator[N] { return &dropAgg[N]{} } -func (s *dropAgg[N]) Record(value N) {} +func (s *dropAgg[N]) Record(N, *attribute.Set) {} -func (s *dropAgg[N]) Aggregate() Aggregation { return Aggregation{} } +func (s *dropAgg[N]) Aggregate() []Aggregation { return nil } diff --git a/sdk/metric/internal/aggtor/histogram.go b/sdk/metric/internal/aggtor/histogram.go index ee1f6dbe6b2..e8ac2c83929 100644 --- a/sdk/metric/internal/aggtor/histogram.go +++ b/sdk/metric/internal/aggtor/histogram.go @@ -17,6 +17,8 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +import "go.opentelemetry.io/otel/attribute" + // histogramAgg summarizes a set of measurements as an histogram with // explicitly defined buckets. type histogramAgg[N int64 | float64] struct { @@ -30,11 +32,15 @@ func NewHistogram[N int64 | float64](zero Number[N], bounds []float64, recordMin return &histogramAgg[N]{} } -func (s *histogramAgg[N]) Record(value N) { +func (s *histogramAgg[N]) Record(value N, attr *attribute.Set) { // TODO: implement. } -func (s *histogramAgg[N]) Aggregate() Aggregation { +func (s *histogramAgg[N]) Aggregate() []Aggregation { // TODO: implement. - return Aggregation{Value: HistogramValue{ /* TODO: calculate. */ }} + return []Aggregation{ + { + Value: HistogramValue{ /* TODO: calculate. */ }, + }, + } } diff --git a/sdk/metric/internal/aggtor/lastvalue.go b/sdk/metric/internal/aggtor/lastvalue.go index ef7fac60f9b..dd9f8be1416 100644 --- a/sdk/metric/internal/aggtor/lastvalue.go +++ b/sdk/metric/internal/aggtor/lastvalue.go @@ -17,6 +17,8 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +import "go.opentelemetry.io/otel/attribute" + // lastValueAgg summarizes a set of measurements as the last one made. type lastValueAgg[N int64 | float64] struct { // TODO: implement. @@ -29,11 +31,15 @@ func NewLastValue[N int64 | float64](zero Number[N]) Aggregator[N] { return &lastValueAgg[N]{} } -func (s *lastValueAgg[N]) Record(value N) { +func (s *lastValueAgg[N]) Record(value N, attr *attribute.Set) { // TODO: implement. } -func (s *lastValueAgg[N]) Aggregate() Aggregation { +func (s *lastValueAgg[N]) Aggregate() []Aggregation { // TODO: implement. - return Aggregation{Value: SingleValue[N]{ /* TODO: calculate */ }} + return []Aggregation{ + { + Value: SingleValue[N]{ /* TODO: calculate */ }, + }, + } } diff --git a/sdk/metric/internal/aggtor/sum.go b/sdk/metric/internal/aggtor/sum.go index 67e9f926430..3103b62de79 100644 --- a/sdk/metric/internal/aggtor/sum.go +++ b/sdk/metric/internal/aggtor/sum.go @@ -17,6 +17,8 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +import "go.opentelemetry.io/otel/attribute" + // sumAgg summarizes a set of measurements as their arithmetic sum. type sumAgg[N int64 | float64] struct { // TODO: implement. @@ -30,11 +32,15 @@ func NewSum[N int64 | float64](zero Number[N]) Aggregator[N] { return &sumAgg[N]{} } -func (s *sumAgg[N]) Record(value N) { +func (s *sumAgg[N]) Record(value N, attr *attribute.Set) { // TODO: implement. } -func (s *sumAgg[N]) Aggregate() Aggregation { +func (s *sumAgg[N]) Aggregate() []Aggregation { // TODO: implement. - return Aggregation{Value: SingleValue[N]{ /* TODO: calculate */ }} + return []Aggregation{ + { + Value: SingleValue[N]{ /* TODO: calculate */ }, + }, + } } From aa8b3012a474fc59f823a4c9300cde25a1c8e427 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 10:53:59 -0700 Subject: [PATCH 07/25] Accept hist conf --- sdk/metric/internal/aggtor/histogram.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/aggtor/histogram.go b/sdk/metric/internal/aggtor/histogram.go index e8ac2c83929..239a1d94025 100644 --- a/sdk/metric/internal/aggtor/histogram.go +++ b/sdk/metric/internal/aggtor/histogram.go @@ -17,7 +17,10 @@ package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" -import "go.opentelemetry.io/otel/attribute" +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/aggregation" +) // histogramAgg summarizes a set of measurements as an histogram with // explicitly defined buckets. @@ -28,7 +31,7 @@ type histogramAgg[N int64 | float64] struct { // NewHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. The zero value will be used as the start value for all the // buckets of new Aggregations. -func NewHistogram[N int64 | float64](zero Number[N], bounds []float64, recordMinMax bool) Aggregator[N] { +func NewHistogram[N int64 | float64](zero Number[N], cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { return &histogramAgg[N]{} } From f390e41c9a267980d39eec05b571cd88bfcbc671 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 10:58:03 -0700 Subject: [PATCH 08/25] Flatten aggtor into just internal --- sdk/metric/internal/{aggtor => }/aggregation.go | 2 +- sdk/metric/internal/{aggtor => }/aggregator.go | 2 +- sdk/metric/internal/{aggtor => }/doc.go | 6 ++++-- sdk/metric/internal/{aggtor => }/drop.go | 2 +- sdk/metric/internal/{aggtor => }/histogram.go | 2 +- sdk/metric/internal/{aggtor => }/lastvalue.go | 2 +- sdk/metric/internal/{aggtor => }/number.go | 2 +- sdk/metric/internal/{aggtor => }/sum.go | 2 +- 8 files changed, 11 insertions(+), 9 deletions(-) rename sdk/metric/internal/{aggtor => }/aggregation.go (97%) rename sdk/metric/internal/{aggtor => }/aggregator.go (92%) rename sdk/metric/internal/{aggtor => }/doc.go (69%) rename sdk/metric/internal/{aggtor => }/drop.go (93%) rename sdk/metric/internal/{aggtor => }/histogram.go (94%) rename sdk/metric/internal/{aggtor => }/lastvalue.go (94%) rename sdk/metric/internal/{aggtor => }/number.go (96%) rename sdk/metric/internal/{aggtor => }/sum.go (94%) diff --git a/sdk/metric/internal/aggtor/aggregation.go b/sdk/metric/internal/aggregation.go similarity index 97% rename from sdk/metric/internal/aggtor/aggregation.go rename to sdk/metric/internal/aggregation.go index 8b038e5ccd7..3e9d6cebfc8 100644 --- a/sdk/metric/internal/aggtor/aggregation.go +++ b/sdk/metric/internal/aggregation.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( "errors" diff --git a/sdk/metric/internal/aggtor/aggregator.go b/sdk/metric/internal/aggregator.go similarity index 92% rename from sdk/metric/internal/aggtor/aggregator.go rename to sdk/metric/internal/aggregator.go index 32dd9a5f7d1..64ab4d9a059 100644 --- a/sdk/metric/internal/aggtor/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" diff --git a/sdk/metric/internal/aggtor/doc.go b/sdk/metric/internal/doc.go similarity index 69% rename from sdk/metric/internal/aggtor/doc.go rename to sdk/metric/internal/doc.go index 8690de1449c..059a38f5bd1 100644 --- a/sdk/metric/internal/aggtor/doc.go +++ b/sdk/metric/internal/doc.go @@ -15,5 +15,7 @@ //go:build go1.18 // +build go1.18 -// Package aggtor provides types and functionality to aggregate measurements. -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +// Package internal provides types and functionality used to aggregate and +// cycle the state of metric measurements made by the SDK. These types and +// functionality are meant only for internal SDK use. +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" diff --git a/sdk/metric/internal/aggtor/drop.go b/sdk/metric/internal/drop.go similarity index 93% rename from sdk/metric/internal/aggtor/drop.go rename to sdk/metric/internal/drop.go index 8095d2ec404..b145c3518c0 100644 --- a/sdk/metric/internal/aggtor/drop.go +++ b/sdk/metric/internal/drop.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" diff --git a/sdk/metric/internal/aggtor/histogram.go b/sdk/metric/internal/histogram.go similarity index 94% rename from sdk/metric/internal/aggtor/histogram.go rename to sdk/metric/internal/histogram.go index 239a1d94025..bf64af2c42a 100644 --- a/sdk/metric/internal/aggtor/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( "go.opentelemetry.io/otel/attribute" diff --git a/sdk/metric/internal/aggtor/lastvalue.go b/sdk/metric/internal/lastvalue.go similarity index 94% rename from sdk/metric/internal/aggtor/lastvalue.go rename to sdk/metric/internal/lastvalue.go index dd9f8be1416..cd7663e092a 100644 --- a/sdk/metric/internal/aggtor/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" diff --git a/sdk/metric/internal/aggtor/number.go b/sdk/metric/internal/number.go similarity index 96% rename from sdk/metric/internal/aggtor/number.go rename to sdk/metric/internal/number.go index 99cc6c9c291..a6f4654b323 100644 --- a/sdk/metric/internal/aggtor/number.go +++ b/sdk/metric/internal/number.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( "math" diff --git a/sdk/metric/internal/aggtor/sum.go b/sdk/metric/internal/sum.go similarity index 94% rename from sdk/metric/internal/aggtor/sum.go rename to sdk/metric/internal/sum.go index 3103b62de79..a57b0254e89 100644 --- a/sdk/metric/internal/aggtor/sum.go +++ b/sdk/metric/internal/sum.go @@ -15,7 +15,7 @@ //go:build go1.18 // +build go1.18 -package aggtor // import "go.opentelemetry.io/otel/sdk/metric/internal/aggtor" +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" From 778c10aa3dac935ae2b3e1eb9e1cbf88dd161006 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 13:08:26 -0700 Subject: [PATCH 09/25] Add Cycler interface Separate the duties of aggregation and maintaining state across aggregation periods. --- sdk/metric/internal/aggregation.go | 3 ++ sdk/metric/internal/aggregator.go | 16 +++++-- sdk/metric/internal/cycler.go | 69 ++++++++++++++++++++++++++++++ sdk/metric/internal/drop.go | 4 +- sdk/metric/internal/histogram.go | 4 +- sdk/metric/internal/lastvalue.go | 4 +- sdk/metric/internal/sum.go | 4 +- 7 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 sdk/metric/internal/cycler.go diff --git a/sdk/metric/internal/aggregation.go b/sdk/metric/internal/aggregation.go index 3e9d6cebfc8..414e2fa0505 100644 --- a/sdk/metric/internal/aggregation.go +++ b/sdk/metric/internal/aggregation.go @@ -27,6 +27,9 @@ import ( // Aggregation is a single data point in a timeseries that summarizes // measurements made during a time span. type Aggregation struct { + // TODO: Replace this with the export.Aggregation type once #2961 is + // merged. + // Timestamp defines the time the last measurement was made. If zero, no // measurements were made for this time span. The time is represented as a // unix timestamp with nanosecond precision. diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 64ab4d9a059..2f2b11dc83a 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -20,10 +20,18 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" // Aggregator forms an aggregation from a collection of recorded measurements. +// Aggregators are use with Cyclers to collect and produce metrics from +// instrument measurements. Aggregators handle the collection (and +// aggregation) of measurements, while Cyclers handle how those aggregated +// measurements are combined and then produced to the telemetry pipeline. type Aggregator[N int64 | float64] interface { - // Record includes value scoped by attr in the aggregation. - Record(value N, attr *attribute.Set) + // Aggregate records the measurement, scoped by attr, and aggregates it + // into an aggregation. + Aggregate(measurement N, attr *attribute.Set) - // Aggregate returns aggregations of all recorded values. - Aggregate() []Aggregation + // flush clears aggregations that have been recorded. The Aggregator + // resets itself for a new aggregation period when called, it does not + // carry forward any state. If aggregation periods need to be combined it + // is the callers responsibility to achieve this. + flush() []Aggregation } diff --git a/sdk/metric/internal/cycler.go b/sdk/metric/internal/cycler.go new file mode 100644 index 00000000000..599280f8c6c --- /dev/null +++ b/sdk/metric/internal/cycler.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" + +// Cycler cycles aggregation periods. It will handle any state and progression +// of one period to the next based on the temporality of the cycling. +type Cycler interface { + // Cycle returns an []Aggregation for the current period. If the cycler + // merges state from previous periods into the current, the []Aggregation + // returned reflects this. + Cycle() []Aggregation + + // TODO: Replace the return type with []export.Aggregation once #2961 is + // merged. +} + +// deltaCylcer cycles aggregation periods by returning the aggregation +// produces from that period only. No state is maintained from one period to +// the next. +type deltaCylcer[N int64 | float64] struct { + aggregator Aggregator[N] +} + +func NewDeltaCylcer[N int64 | float64](a Aggregator[N]) Cycler { + return deltaCylcer[N]{aggregator: a} +} + +func (c deltaCylcer[N]) Cycle() []Aggregation { + return c.aggregator.flush() +} + +// cumulativeCylcer cycles aggregation periods by returning the cumulative +// aggregation from its start time until the current period. +type cumulativeCylcer[N int64 | float64] struct { + // TODO: implement a cumulative storing field. + aggregator Aggregator[N] +} + +func NewCumulativeCylcer[N int64 | float64](a Aggregator[N]) Cycler { + c := cumulativeCylcer[N]{aggregator: a} + + // TODO: Initialize a new cumulative storage. + + return c +} + +func (c cumulativeCylcer[N]) Cycle() []Aggregation { + // TODO: Update cumulative storage of aggregations and return them. + + // FIXME: currently this returns a delta representation of the + // aggregation. When the cumulative storage is complete it should return a + // cumulative representation. + return c.aggregator.flush() +} diff --git a/sdk/metric/internal/drop.go b/sdk/metric/internal/drop.go index b145c3518c0..23d32676fba 100644 --- a/sdk/metric/internal/drop.go +++ b/sdk/metric/internal/drop.go @@ -26,6 +26,6 @@ type dropAgg[N int64 | float64] struct{} // empty Aggregation. func NewDrop[N int64 | float64]() Aggregator[N] { return &dropAgg[N]{} } -func (s *dropAgg[N]) Record(N, *attribute.Set) {} +func (s *dropAgg[N]) Aggregate(N, *attribute.Set) {} -func (s *dropAgg[N]) Aggregate() []Aggregation { return nil } +func (s *dropAgg[N]) flush() []Aggregation { return nil } diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index bf64af2c42a..5c4fdd4c2e2 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -35,11 +35,11 @@ func NewHistogram[N int64 | float64](zero Number[N], cfg aggregation.ExplicitBuc return &histogramAgg[N]{} } -func (s *histogramAgg[N]) Record(value N, attr *attribute.Set) { +func (s *histogramAgg[N]) Aggregate(value N, attr *attribute.Set) { // TODO: implement. } -func (s *histogramAgg[N]) Aggregate() []Aggregation { +func (s *histogramAgg[N]) flush() []Aggregation { // TODO: implement. return []Aggregation{ { diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index cd7663e092a..1e49456c368 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -31,11 +31,11 @@ func NewLastValue[N int64 | float64](zero Number[N]) Aggregator[N] { return &lastValueAgg[N]{} } -func (s *lastValueAgg[N]) Record(value N, attr *attribute.Set) { +func (s *lastValueAgg[N]) Aggregate(value N, attr *attribute.Set) { // TODO: implement. } -func (s *lastValueAgg[N]) Aggregate() []Aggregation { +func (s *lastValueAgg[N]) flush() []Aggregation { // TODO: implement. return []Aggregation{ { diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index a57b0254e89..06702bd4c38 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -32,11 +32,11 @@ func NewSum[N int64 | float64](zero Number[N]) Aggregator[N] { return &sumAgg[N]{} } -func (s *sumAgg[N]) Record(value N, attr *attribute.Set) { +func (s *sumAgg[N]) Aggregate(value N, attr *attribute.Set) { // TODO: implement. } -func (s *sumAgg[N]) Aggregate() []Aggregation { +func (s *sumAgg[N]) flush() []Aggregation { // TODO: implement. return []Aggregation{ { From bd9c9634e892469fa6984df6c6fa581aa4c49f28 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 13:09:51 -0700 Subject: [PATCH 10/25] Remove build flags for doc.go --- sdk/metric/internal/doc.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/metric/internal/doc.go b/sdk/metric/internal/doc.go index 059a38f5bd1..e1aa11ab2e1 100644 --- a/sdk/metric/internal/doc.go +++ b/sdk/metric/internal/doc.go @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:build go1.18 -// +build go1.18 - // Package internal provides types and functionality used to aggregate and // cycle the state of metric measurements made by the SDK. These types and // functionality are meant only for internal SDK use. From 816f3dc491d87eb47f38a3dc64c75961ab8dba81 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 13:11:08 -0700 Subject: [PATCH 11/25] Clarify Cycler documentation --- sdk/metric/internal/cycler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/cycler.go b/sdk/metric/internal/cycler.go index 599280f8c6c..d8bbaeae9cd 100644 --- a/sdk/metric/internal/cycler.go +++ b/sdk/metric/internal/cycler.go @@ -17,8 +17,8 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" -// Cycler cycles aggregation periods. It will handle any state and progression -// of one period to the next based on the temporality of the cycling. +// Cycler cycles aggregation periods. It will handle any state progression +// from one period to the next based on the temporality of the cycling. type Cycler interface { // Cycle returns an []Aggregation for the current period. If the cycler // merges state from previous periods into the current, the []Aggregation From 0d7923c7506cc1b7ea16f0e2f6693034b5bc64eb Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 13:15:19 -0700 Subject: [PATCH 12/25] Remove aggregation fold logic --- sdk/metric/internal/aggregation.go | 64 ++---------------------------- 1 file changed, 4 insertions(+), 60 deletions(-) diff --git a/sdk/metric/internal/aggregation.go b/sdk/metric/internal/aggregation.go index 414e2fa0505..71ad88cbca3 100644 --- a/sdk/metric/internal/aggregation.go +++ b/sdk/metric/internal/aggregation.go @@ -18,9 +18,6 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( - "errors" - "fmt" - "go.opentelemetry.io/otel/attribute" ) @@ -42,38 +39,16 @@ type Aggregation struct { Value value } -var errIncompatible = errors.New("incompatible aggregation") - -// Fold combines other into a. -func (a Aggregation) Fold(other Aggregation) error { - if other.Timestamp > a.Timestamp { - a.Timestamp = other.Timestamp - } - if !a.Attributes.Equals(other.Attributes) { - return fmt.Errorf("%w: attributes not equal", errIncompatible) - } - return a.Value.fold(other.Value) -} - type value interface { - // fold combines other into the value. It will return an errIncompatible - // if other is not a compatible type with value. - fold(other value) error + private() } -// SingleValue summarizes a set of measurements as a single numeric value. +// SingleValue summarizes a set of measurements as a single value. type SingleValue[N int64 | float64] struct { Value N } -func (v SingleValue[N]) fold(other value) error { - o, ok := other.(SingleValue[N]) - if !ok { - return fmt.Errorf("%w: value types %T and %T", errIncompatible, v, other) - } - v.Value += o.Value - return nil -} +func (SingleValue[N]) private() {} // HistogramValue summarizes a set of measurements as a histogram. type HistogramValue struct { @@ -83,35 +58,4 @@ type HistogramValue struct { Min, Max float64 } -func (v HistogramValue) fold(other value) error { - o, ok := other.(HistogramValue) - if !ok { - return fmt.Errorf("%w: value types %T and %T", errIncompatible, v, other) - } - if !sliceEqual[float64](v.Bounds, o.Bounds) || len(o.Counts) != len(v.Counts) { - return fmt.Errorf("%w: different histogram binning", errIncompatible) - } - v.Sum += o.Sum - for i, c := range o.Counts { - v.Counts[i] += c - } - if o.Min < v.Min { - v.Min = o.Min - } - if o.Max > v.Max { - v.Max = o.Max - } - return nil -} - -func sliceEqual[T comparable](a, b []T) bool { - if len(a) != len(b) { - return false - } - for i, v := range a { - if v != b[i] { - return false - } - } - return true -} +func (HistogramValue) private() {} From 0287a06a05ff39751ecfb916ad3b569f3d4be4f7 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 14:08:20 -0700 Subject: [PATCH 13/25] Rename Number to Atomic --- sdk/metric/internal/{number.go => atomic.go} | 15 ++++++++------- sdk/metric/internal/histogram.go | 2 +- sdk/metric/internal/lastvalue.go | 2 +- sdk/metric/internal/sum.go | 2 +- 4 files changed, 11 insertions(+), 10 deletions(-) rename sdk/metric/internal/{number.go => atomic.go} (84%) diff --git a/sdk/metric/internal/number.go b/sdk/metric/internal/atomic.go similarity index 84% rename from sdk/metric/internal/number.go rename to sdk/metric/internal/atomic.go index a6f4654b323..4a554974df5 100644 --- a/sdk/metric/internal/number.go +++ b/sdk/metric/internal/atomic.go @@ -22,7 +22,8 @@ import ( "sync/atomic" ) -type Number[N int64 | float64] interface { +// Atomic provides atomic access to a generic value type. +type Atomic[N int64 | float64] interface { // Store value atomically. Store(value N) @@ -32,15 +33,15 @@ type Number[N int64 | float64] interface { // Load returns the stored value. Load() N - // Clone creates a new Number from the current value. - Clone() Number[N] + // Clone creates an independent copy of the current value. + Clone() Atomic[N] } type Int64 struct { value *int64 } -var _ Number[int64] = Int64{} +var _ Atomic[int64] = Int64{} func NewInt64(v int64) Int64 { return Int64{value: &v} @@ -49,7 +50,7 @@ func NewInt64(v int64) Int64 { func (v Int64) Store(value int64) { atomic.StoreInt64(v.value, value) } func (v Int64) Add(value int64) { atomic.AddInt64(v.value, value) } func (v Int64) Load() int64 { return atomic.LoadInt64(v.value) } -func (v Int64) Clone() Number[int64] { +func (v Int64) Clone() Atomic[int64] { return NewInt64(v.Load()) } @@ -57,7 +58,7 @@ type Float64 struct { value *uint64 } -var _ Number[float64] = Float64{} +var _ Atomic[float64] = Float64{} func NewFloat64(v float64) Float64 { u := math.Float64bits(v) @@ -82,6 +83,6 @@ func (v Float64) Load() float64 { return math.Float64frombits(atomic.LoadUint64(v.value)) } -func (v Float64) Clone() Number[float64] { +func (v Float64) Clone() Atomic[float64] { return NewFloat64(v.Load()) } diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index 5c4fdd4c2e2..8bf73838b5d 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -31,7 +31,7 @@ type histogramAgg[N int64 | float64] struct { // NewHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. The zero value will be used as the start value for all the // buckets of new Aggregations. -func NewHistogram[N int64 | float64](zero Number[N], cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { +func NewHistogram[N int64 | float64](zero Atomic[N], cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { return &histogramAgg[N]{} } diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 1e49456c368..74278d3679d 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -27,7 +27,7 @@ type lastValueAgg[N int64 | float64] struct { // NewLastValue returns an Aggregator that summarizes a set of measurements as // the last one made. The zero value will be used as the start value for all // new Aggregations. -func NewLastValue[N int64 | float64](zero Number[N]) Aggregator[N] { +func NewLastValue[N int64 | float64](zero Atomic[N]) Aggregator[N] { return &lastValueAgg[N]{} } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 06702bd4c38..543c66390b0 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -27,7 +27,7 @@ type sumAgg[N int64 | float64] struct { // NewSum returns an Aggregator that summarizes a set of // measurements as their arithmetic sum. The zero value will be used as the // start value for all new Aggregations. -func NewSum[N int64 | float64](zero Number[N]) Aggregator[N] { +func NewSum[N int64 | float64](zero Atomic[N]) Aggregator[N] { // TODO: implement. return &sumAgg[N]{} } From 36fad53be18efb3ed965ff67cae3a3ca9710d801 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 14:29:43 -0700 Subject: [PATCH 14/25] Add tests for Atomic impls --- sdk/metric/internal/atomic.go | 6 +++ sdk/metric/internal/atomic_test.go | 64 ++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 sdk/metric/internal/atomic_test.go diff --git a/sdk/metric/internal/atomic.go b/sdk/metric/internal/atomic.go index 4a554974df5..2ceeabf0f25 100644 --- a/sdk/metric/internal/atomic.go +++ b/sdk/metric/internal/atomic.go @@ -37,6 +37,9 @@ type Atomic[N int64 | float64] interface { Clone() Atomic[N] } +// Int64 is an int64 implementation of an Atomic. +// +// An Int64 must not be copied. type Int64 struct { value *int64 } @@ -54,6 +57,9 @@ func (v Int64) Clone() Atomic[int64] { return NewInt64(v.Load()) } +// Float64 is a float64 implementation of an Atomic. +// +// An Float64 must not be copied. type Float64 struct { value *uint64 } diff --git a/sdk/metric/internal/atomic_test.go b/sdk/metric/internal/atomic_test.go new file mode 100644 index 00000000000..67449e8c06f --- /dev/null +++ b/sdk/metric/internal/atomic_test.go @@ -0,0 +1,64 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const routines = 5 + +func testAtomic[N int64 | float64](t *testing.T, a Atomic[N]) { + n, clone := a.Load(), a.Clone() + require.Equal(t, n, clone.Load(), "Clone() did not copy value") + + clone.Add(1) + assert.Equal(t, n, a.Load(), "Clone() returned original") + assert.Equal(t, n+1, clone.Load()) + + clone.Store(n) + assert.Equal(t, n, clone.Load()) +} + +func TestInt64(t *testing.T) { + var wg sync.WaitGroup + wg.Add(routines) + for i := int64(0); i < routines; i++ { + go func(n int64) { + defer wg.Done() + testAtomic[int64](t, NewInt64(n)) + }(i) + } + wg.Wait() +} + +func TestFloat64(t *testing.T) { + var wg sync.WaitGroup + wg.Add(routines) + for i := 0; i < routines; i++ { + go func(n float64) { + defer wg.Done() + testAtomic[float64](t, NewFloat64(n)) + }(float64(i)) + } + wg.Wait() +} From 47082f00bc67487c68743032b64223054868d758 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 14:31:25 -0700 Subject: [PATCH 15/25] Remove unneeded Atomic implementation Add back when filling in structures. --- sdk/metric/internal/atomic.go | 42 +------------------- sdk/metric/internal/atomic_test.go | 64 ------------------------------ 2 files changed, 1 insertion(+), 105 deletions(-) delete mode 100644 sdk/metric/internal/atomic_test.go diff --git a/sdk/metric/internal/atomic.go b/sdk/metric/internal/atomic.go index 2ceeabf0f25..e3b8bc7c938 100644 --- a/sdk/metric/internal/atomic.go +++ b/sdk/metric/internal/atomic.go @@ -19,22 +19,11 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import ( "math" - "sync/atomic" ) // Atomic provides atomic access to a generic value type. type Atomic[N int64 | float64] interface { - // Store value atomically. - Store(value N) - - // Add value atomically. - Add(value N) - - // Load returns the stored value. - Load() N - - // Clone creates an independent copy of the current value. - Clone() Atomic[N] + // TODO: Add needed atomic methods. } // Int64 is an int64 implementation of an Atomic. @@ -50,13 +39,6 @@ func NewInt64(v int64) Int64 { return Int64{value: &v} } -func (v Int64) Store(value int64) { atomic.StoreInt64(v.value, value) } -func (v Int64) Add(value int64) { atomic.AddInt64(v.value, value) } -func (v Int64) Load() int64 { return atomic.LoadInt64(v.value) } -func (v Int64) Clone() Atomic[int64] { - return NewInt64(v.Load()) -} - // Float64 is a float64 implementation of an Atomic. // // An Float64 must not be copied. @@ -70,25 +52,3 @@ func NewFloat64(v float64) Float64 { u := math.Float64bits(v) return Float64{value: &u} } - -func (v Float64) Store(value float64) { - atomic.StoreUint64(v.value, math.Float64bits(value)) -} - -func (v Float64) Add(value float64) { - for { - old := atomic.LoadUint64(v.value) - sum := math.Float64bits(math.Float64frombits(old) + value) - if atomic.CompareAndSwapUint64(v.value, old, sum) { - return - } - } -} - -func (v Float64) Load() float64 { - return math.Float64frombits(atomic.LoadUint64(v.value)) -} - -func (v Float64) Clone() Atomic[float64] { - return NewFloat64(v.Load()) -} diff --git a/sdk/metric/internal/atomic_test.go b/sdk/metric/internal/atomic_test.go deleted file mode 100644 index 67449e8c06f..00000000000 --- a/sdk/metric/internal/atomic_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build go1.18 -// +build go1.18 - -package internal - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const routines = 5 - -func testAtomic[N int64 | float64](t *testing.T, a Atomic[N]) { - n, clone := a.Load(), a.Clone() - require.Equal(t, n, clone.Load(), "Clone() did not copy value") - - clone.Add(1) - assert.Equal(t, n, a.Load(), "Clone() returned original") - assert.Equal(t, n+1, clone.Load()) - - clone.Store(n) - assert.Equal(t, n, clone.Load()) -} - -func TestInt64(t *testing.T) { - var wg sync.WaitGroup - wg.Add(routines) - for i := int64(0); i < routines; i++ { - go func(n int64) { - defer wg.Done() - testAtomic[int64](t, NewInt64(n)) - }(i) - } - wg.Wait() -} - -func TestFloat64(t *testing.T) { - var wg sync.WaitGroup - wg.Add(routines) - for i := 0; i < routines; i++ { - go func(n float64) { - defer wg.Done() - testAtomic[float64](t, NewFloat64(n)) - }(float64(i)) - } - wg.Wait() -} From 86061ccca45a105d44cafa500f89340835bdf74d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 14:32:47 -0700 Subject: [PATCH 16/25] Fix article in Float64 docs --- sdk/metric/internal/atomic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/internal/atomic.go b/sdk/metric/internal/atomic.go index e3b8bc7c938..9f004fdd921 100644 --- a/sdk/metric/internal/atomic.go +++ b/sdk/metric/internal/atomic.go @@ -41,7 +41,7 @@ func NewInt64(v int64) Int64 { // Float64 is a float64 implementation of an Atomic. // -// An Float64 must not be copied. +// A Float64 must not be copied. type Float64 struct { value *uint64 } From fc0f30efa3da1c394ab3c9b4c8b8a01a146a6ad3 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 15:41:25 -0700 Subject: [PATCH 17/25] Remove Atomic This is an implementation detail. --- sdk/metric/internal/atomic.go | 54 -------------------------------- sdk/metric/internal/histogram.go | 2 +- sdk/metric/internal/lastvalue.go | 2 +- sdk/metric/internal/sum.go | 2 +- 4 files changed, 3 insertions(+), 57 deletions(-) delete mode 100644 sdk/metric/internal/atomic.go diff --git a/sdk/metric/internal/atomic.go b/sdk/metric/internal/atomic.go deleted file mode 100644 index 9f004fdd921..00000000000 --- a/sdk/metric/internal/atomic.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build go1.18 -// +build go1.18 - -package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" - -import ( - "math" -) - -// Atomic provides atomic access to a generic value type. -type Atomic[N int64 | float64] interface { - // TODO: Add needed atomic methods. -} - -// Int64 is an int64 implementation of an Atomic. -// -// An Int64 must not be copied. -type Int64 struct { - value *int64 -} - -var _ Atomic[int64] = Int64{} - -func NewInt64(v int64) Int64 { - return Int64{value: &v} -} - -// Float64 is a float64 implementation of an Atomic. -// -// A Float64 must not be copied. -type Float64 struct { - value *uint64 -} - -var _ Atomic[float64] = Float64{} - -func NewFloat64(v float64) Float64 { - u := math.Float64bits(v) - return Float64{value: &u} -} diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index 8bf73838b5d..5338550f429 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -31,7 +31,7 @@ type histogramAgg[N int64 | float64] struct { // NewHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. The zero value will be used as the start value for all the // buckets of new Aggregations. -func NewHistogram[N int64 | float64](zero Atomic[N], cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { +func NewHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { return &histogramAgg[N]{} } diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 74278d3679d..93ad99d6edb 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -27,7 +27,7 @@ type lastValueAgg[N int64 | float64] struct { // NewLastValue returns an Aggregator that summarizes a set of measurements as // the last one made. The zero value will be used as the start value for all // new Aggregations. -func NewLastValue[N int64 | float64](zero Atomic[N]) Aggregator[N] { +func NewLastValue[N int64 | float64]() Aggregator[N] { return &lastValueAgg[N]{} } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 543c66390b0..7ad46459df9 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -27,7 +27,7 @@ type sumAgg[N int64 | float64] struct { // NewSum returns an Aggregator that summarizes a set of // measurements as their arithmetic sum. The zero value will be used as the // start value for all new Aggregations. -func NewSum[N int64 | float64](zero Atomic[N]) Aggregator[N] { +func NewSum[N int64 | float64]() Aggregator[N] { // TODO: implement. return &sumAgg[N]{} } From 658f4a5a6596ce1c65d19578eb5e7f0a89e9c395 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 21 Jun 2022 19:30:26 -0700 Subject: [PATCH 18/25] Add aggregator_example_test --- .../internal/aggregator_example_test.go | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 sdk/metric/internal/aggregator_example_test.go diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go new file mode 100644 index 00000000000..9b61be9a9d6 --- /dev/null +++ b/sdk/metric/internal/aggregator_example_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package internal + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/sdk/metric/aggregation" +) + +type meter struct { + // When a reader initiates a collection, the meter would collect + // aggregations from each of these cyclers. In this process they will + // progress the aggregation period of each instrument's aggregator. + cyclers []Cycler +} + +func (m *meter) SyncInt64() syncint64.InstrumentProvider { + // The same would be done for all the other instrument providers. + return (*syncInt64Provider)(m) +} + +type syncInt64Provider meter + +func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Counter, error) { + // This is an example of how a synchronous int64 provider would create an + // aggregator and cycler for a new counter. At this point the provider + // would determine the aggregation and temporality to used based on the + // Reader and View configuration. Assume here these are determined to be a + // cumulative sum. + + aggregator := NewSum[int64]() + count := inst{agg: aggregator} + + cycler := NewCumulativeCylcer(aggregator) + p.cyclers = append(p.cyclers, cycler) + + return count, nil +} + +func (p *syncInt64Provider) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) { + // This is an example of how a synchronous int64 provider would create an + // aggregator and cycler for a new up-down counter. At this point the + // provider would determine the aggregation and temporality to used based + // on the Reader and View configuration. Assume here these are determined + // to be a delta last-value. + + aggregator := NewLastValue[int64]() + upDownCount := inst{agg: aggregator} + + cycler := NewDeltaCylcer(aggregator) + p.cyclers = append(p.cyclers, cycler) + + return upDownCount, nil +} + +func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) { + // This is an example of how a synchronous int64 provider would create an + // aggregator and cycler for a new histogram. At this point the provider + // would determine the aggregation and temporality to used based on the + // Reader and View configuration. Assume here these are determined to be a + // delta explicit-bucket histogram. + + aggregator := NewHistogram[int64](aggregation.ExplicitBucketHistogram{ + Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, + NoMinMax: false, + }) + hist := inst{agg: aggregator} + + cycler := NewDeltaCylcer(aggregator) + p.cyclers = append(p.cyclers, cycler) + + return hist, nil +} + +// inst is a generalized int64 synchronous counter, up-down counter, and +// histogram used for demonstration purposes only. +type inst struct { + instrument.Synchronous + + agg Aggregator[int64] +} + +func (inst) Add(context.Context, int64, ...attribute.KeyValue) {} +func (inst) Record(context.Context, int64, ...attribute.KeyValue) {} + +func Example() { + m := meter{} + provider := m.SyncInt64() + + count, _ := provider.Counter("counter example") + fmt.Printf("counter aggregator: %T\n", count.(inst).agg) + + upDownCount, _ := provider.UpDownCounter("up-down counter example") + fmt.Printf("up-down counter aggregator: %T\n", upDownCount.(inst).agg) + + hist, _ := provider.UpDownCounter("histogram example") + fmt.Printf("histogram aggregator: %T\n", hist.(inst).agg) + + fmt.Printf("meter cyclers: %T{%T, %T, %T}\n", m.cyclers, m.cyclers[0], m.cyclers[1], m.cyclers[2]) + + // Output: + // counter aggregator: *internal.sumAgg[int64] + // up-down counter aggregator: *internal.lastValueAgg[int64] + // histogram aggregator: *internal.lastValueAgg[int64] + // meter cyclers: []internal.Cycler{internal.cumulativeCylcer[int64], internal.deltaCylcer[int64], internal.deltaCylcer[int64]} +} From 02488a235352aa110e4eb01c2f093a5373c9ec1d Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 22 Jun 2022 08:19:45 -0700 Subject: [PATCH 19/25] Fix hist example --- sdk/metric/internal/aggregator_example_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go index 9b61be9a9d6..6b76092919c 100644 --- a/sdk/metric/internal/aggregator_example_test.go +++ b/sdk/metric/internal/aggregator_example_test.go @@ -113,7 +113,7 @@ func Example() { upDownCount, _ := provider.UpDownCounter("up-down counter example") fmt.Printf("up-down counter aggregator: %T\n", upDownCount.(inst).agg) - hist, _ := provider.UpDownCounter("histogram example") + hist, _ := provider.Histogram("histogram example") fmt.Printf("histogram aggregator: %T\n", hist.(inst).agg) fmt.Printf("meter cyclers: %T{%T, %T, %T}\n", m.cyclers, m.cyclers[0], m.cyclers[1], m.cyclers[2]) @@ -121,6 +121,6 @@ func Example() { // Output: // counter aggregator: *internal.sumAgg[int64] // up-down counter aggregator: *internal.lastValueAgg[int64] - // histogram aggregator: *internal.lastValueAgg[int64] + // histogram aggregator: *internal.histogramAgg[int64] // meter cyclers: []internal.Cycler{internal.cumulativeCylcer[int64], internal.deltaCylcer[int64], internal.deltaCylcer[int64]} } From c266e46cf6c55d17961fa05aadbde2dd36a56150 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 22 Jun 2022 08:41:47 -0700 Subject: [PATCH 20/25] Add issue numbers to all TODO and FIXME --- sdk/metric/internal/aggregation.go | 4 ++-- sdk/metric/internal/cycler.go | 12 ++++++------ sdk/metric/internal/histogram.go | 8 ++++---- sdk/metric/internal/lastvalue.go | 8 ++++---- sdk/metric/internal/sum.go | 10 +++++----- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sdk/metric/internal/aggregation.go b/sdk/metric/internal/aggregation.go index 71ad88cbca3..4149f2672cb 100644 --- a/sdk/metric/internal/aggregation.go +++ b/sdk/metric/internal/aggregation.go @@ -24,8 +24,8 @@ import ( // Aggregation is a single data point in a timeseries that summarizes // measurements made during a time span. type Aggregation struct { - // TODO: Replace this with the export.Aggregation type once #2961 is - // merged. + // TODO(#2968): Replace this with the export.Aggregation type once #2961 + // is merged. // Timestamp defines the time the last measurement was made. If zero, no // measurements were made for this time span. The time is represented as a diff --git a/sdk/metric/internal/cycler.go b/sdk/metric/internal/cycler.go index d8bbaeae9cd..e37a855b24c 100644 --- a/sdk/metric/internal/cycler.go +++ b/sdk/metric/internal/cycler.go @@ -25,8 +25,8 @@ type Cycler interface { // returned reflects this. Cycle() []Aggregation - // TODO: Replace the return type with []export.Aggregation once #2961 is - // merged. + // TODO(#2968): Replace the return type with []export.Aggregation once + // #2961 is merged. } // deltaCylcer cycles aggregation periods by returning the aggregation @@ -47,22 +47,22 @@ func (c deltaCylcer[N]) Cycle() []Aggregation { // cumulativeCylcer cycles aggregation periods by returning the cumulative // aggregation from its start time until the current period. type cumulativeCylcer[N int64 | float64] struct { - // TODO: implement a cumulative storing field. + // TODO(#2969): implement a cumulative storing field. aggregator Aggregator[N] } func NewCumulativeCylcer[N int64 | float64](a Aggregator[N]) Cycler { c := cumulativeCylcer[N]{aggregator: a} - // TODO: Initialize a new cumulative storage. + // TODO(#2969): Initialize a new cumulative storage. return c } func (c cumulativeCylcer[N]) Cycle() []Aggregation { - // TODO: Update cumulative storage of aggregations and return them. + // TODO(#2969): Update cumulative storage of aggregations and return them. - // FIXME: currently this returns a delta representation of the + // FIXME(#2969): currently this returns a delta representation of the // aggregation. When the cumulative storage is complete it should return a // cumulative representation. return c.aggregator.flush() diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index 5338550f429..5b0dea85fc3 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -25,7 +25,7 @@ import ( // histogramAgg summarizes a set of measurements as an histogram with // explicitly defined buckets. type histogramAgg[N int64 | float64] struct { - // TODO: implement. + // TODO(#2970): implement. } // NewHistogram returns an Aggregator that summarizes a set of measurements as @@ -36,14 +36,14 @@ func NewHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Ag } func (s *histogramAgg[N]) Aggregate(value N, attr *attribute.Set) { - // TODO: implement. + // TODO(#2970): implement. } func (s *histogramAgg[N]) flush() []Aggregation { - // TODO: implement. + // TODO(#2970): implement. return []Aggregation{ { - Value: HistogramValue{ /* TODO: calculate. */ }, + Value: HistogramValue{ /* TODO(#2970): calculate. */ }, }, } } diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 93ad99d6edb..2756cfbef9f 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -21,7 +21,7 @@ import "go.opentelemetry.io/otel/attribute" // lastValueAgg summarizes a set of measurements as the last one made. type lastValueAgg[N int64 | float64] struct { - // TODO: implement. + // TODO(#2971): implement. } // NewLastValue returns an Aggregator that summarizes a set of measurements as @@ -32,14 +32,14 @@ func NewLastValue[N int64 | float64]() Aggregator[N] { } func (s *lastValueAgg[N]) Aggregate(value N, attr *attribute.Set) { - // TODO: implement. + // TODO(#2971): implement. } func (s *lastValueAgg[N]) flush() []Aggregation { - // TODO: implement. + // TODO(#2971): implement. return []Aggregation{ { - Value: SingleValue[N]{ /* TODO: calculate */ }, + Value: SingleValue[N]{ /* TODO(#2971): calculate */ }, }, } } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 7ad46459df9..7210e10a6a8 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -21,26 +21,26 @@ import "go.opentelemetry.io/otel/attribute" // sumAgg summarizes a set of measurements as their arithmetic sum. type sumAgg[N int64 | float64] struct { - // TODO: implement. + // TODO(#2972): implement. } // NewSum returns an Aggregator that summarizes a set of // measurements as their arithmetic sum. The zero value will be used as the // start value for all new Aggregations. func NewSum[N int64 | float64]() Aggregator[N] { - // TODO: implement. + // TODO(#2972): implement. return &sumAgg[N]{} } func (s *sumAgg[N]) Aggregate(value N, attr *attribute.Set) { - // TODO: implement. + // TODO(#2972): implement. } func (s *sumAgg[N]) flush() []Aggregation { - // TODO: implement. + // TODO(#2972): implement. return []Aggregation{ { - Value: SingleValue[N]{ /* TODO: calculate */ }, + Value: SingleValue[N]{ /* TODO(#2972): calculate */ }, }, } } From 3057ac24ac39ceff849c28531407321c0d3fcbb8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 22 Jun 2022 09:31:08 -0700 Subject: [PATCH 21/25] Remove zero parameter comment --- sdk/metric/internal/histogram.go | 3 +-- sdk/metric/internal/lastvalue.go | 3 +-- sdk/metric/internal/sum.go | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index 5b0dea85fc3..90d442c8d84 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -29,8 +29,7 @@ type histogramAgg[N int64 | float64] struct { } // NewHistogram returns an Aggregator that summarizes a set of measurements as -// an histogram. The zero value will be used as the start value for all the -// buckets of new Aggregations. +// an histogram. func NewHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { return &histogramAgg[N]{} } diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 2756cfbef9f..467eeae4c18 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -25,8 +25,7 @@ type lastValueAgg[N int64 | float64] struct { } // NewLastValue returns an Aggregator that summarizes a set of measurements as -// the last one made. The zero value will be used as the start value for all -// new Aggregations. +// the last one made. func NewLastValue[N int64 | float64]() Aggregator[N] { return &lastValueAgg[N]{} } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 7210e10a6a8..0fb0eebfeb4 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -25,8 +25,7 @@ type sumAgg[N int64 | float64] struct { } // NewSum returns an Aggregator that summarizes a set of -// measurements as their arithmetic sum. The zero value will be used as the -// start value for all new Aggregations. +// measurements as their arithmetic sum. func NewSum[N int64 | float64]() Aggregator[N] { // TODO(#2972): implement. return &sumAgg[N]{} From 6b39c546587550b435a923e2ff4b0d513b8f7fab Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 23 Jun 2022 14:06:37 -0700 Subject: [PATCH 22/25] Combine the cycler into the aggregators --- sdk/metric/internal/aggregator.go | 12 +-- .../internal/aggregator_example_test.go | 74 +++++++++---------- sdk/metric/internal/cycler.go | 69 ----------------- sdk/metric/internal/drop.go | 2 +- sdk/metric/internal/histogram.go | 58 +++++++++++---- sdk/metric/internal/lastvalue.go | 16 ++-- sdk/metric/internal/sum.go | 58 +++++++++++---- 7 files changed, 135 insertions(+), 154 deletions(-) delete mode 100644 sdk/metric/internal/cycler.go diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 2f2b11dc83a..63489f1ae8b 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -20,18 +20,12 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" // Aggregator forms an aggregation from a collection of recorded measurements. -// Aggregators are use with Cyclers to collect and produce metrics from -// instrument measurements. Aggregators handle the collection (and -// aggregation) of measurements, while Cyclers handle how those aggregated -// measurements are combined and then produced to the telemetry pipeline. type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. Aggregate(measurement N, attr *attribute.Set) - // flush clears aggregations that have been recorded. The Aggregator - // resets itself for a new aggregation period when called, it does not - // carry forward any state. If aggregation periods need to be combined it - // is the callers responsibility to achieve this. - flush() []Aggregation + // Aggregations retuns a slice of Aggregation, one per each attribute set + // used to scope measurement aggregation, and ends an aggregation cycle. + Aggregations() []Aggregation } diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go index 6b76092919c..04db712016a 100644 --- a/sdk/metric/internal/aggregator_example_test.go +++ b/sdk/metric/internal/aggregator_example_test.go @@ -29,9 +29,9 @@ import ( type meter struct { // When a reader initiates a collection, the meter would collect - // aggregations from each of these cyclers. In this process they will + // aggregations from each of these functions. In this process they will // progress the aggregation period of each instrument's aggregator. - cyclers []Cycler + aggregationFuncs []func() []Aggregation } func (m *meter) SyncInt64() syncint64.InstrumentProvider { @@ -43,51 +43,55 @@ type syncInt64Provider meter func (p *syncInt64Provider) Counter(string, ...instrument.Option) (syncint64.Counter, error) { // This is an example of how a synchronous int64 provider would create an - // aggregator and cycler for a new counter. At this point the provider - // would determine the aggregation and temporality to used based on the - // Reader and View configuration. Assume here these are determined to be a + // aggregator for a new counter. At this point the provider would + // determine the aggregation and temporality to used based on the Reader + // and View configuration. Assume here these are determined to be a // cumulative sum. - aggregator := NewSum[int64]() - count := inst{agg: aggregator} + aggregator := NewCumulativeSum[int64]() + count := inst{aggregateFunc: aggregator.Aggregate} - cycler := NewCumulativeCylcer(aggregator) - p.cyclers = append(p.cyclers, cycler) + p.aggregationFuncs = append(p.aggregationFuncs, aggregator.Aggregations) + + fmt.Printf("using %T aggregator for counter\n", aggregator) return count, nil } func (p *syncInt64Provider) UpDownCounter(string, ...instrument.Option) (syncint64.UpDownCounter, error) { // This is an example of how a synchronous int64 provider would create an - // aggregator and cycler for a new up-down counter. At this point the - // provider would determine the aggregation and temporality to used based - // on the Reader and View configuration. Assume here these are determined - // to be a delta last-value. + // aggregator for a new up-down counter. At this point the provider would + // determine the aggregation and temporality to used based on the Reader + // and View configuration. Assume here these are determined to be a + // last-value aggregation (the temporality does not affect the produced + // aggregations). aggregator := NewLastValue[int64]() - upDownCount := inst{agg: aggregator} + upDownCount := inst{aggregateFunc: aggregator.Aggregate} + + p.aggregationFuncs = append(p.aggregationFuncs, aggregator.Aggregations) - cycler := NewDeltaCylcer(aggregator) - p.cyclers = append(p.cyclers, cycler) + fmt.Printf("using %T aggregator for up-down counter\n", aggregator) return upDownCount, nil } func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.Histogram, error) { // This is an example of how a synchronous int64 provider would create an - // aggregator and cycler for a new histogram. At this point the provider - // would determine the aggregation and temporality to used based on the - // Reader and View configuration. Assume here these are determined to be a - // delta explicit-bucket histogram. + // aggregator for a new histogram. At this point the provider would + // determine the aggregation and temporality to used based on the Reader + // and View configuration. Assume here these are determined to be a delta + // explicit-bucket histogram. - aggregator := NewHistogram[int64](aggregation.ExplicitBucketHistogram{ + aggregator := NewDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{ Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, NoMinMax: false, }) - hist := inst{agg: aggregator} + hist := inst{aggregateFunc: aggregator.Aggregate} + + p.aggregationFuncs = append(p.aggregationFuncs, aggregator.Aggregations) - cycler := NewDeltaCylcer(aggregator) - p.cyclers = append(p.cyclers, cycler) + fmt.Printf("using %T aggregator for histogram\n", aggregator) return hist, nil } @@ -97,7 +101,7 @@ func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.H type inst struct { instrument.Synchronous - agg Aggregator[int64] + aggregateFunc func(int64, *attribute.Set) } func (inst) Add(context.Context, int64, ...attribute.KeyValue) {} @@ -107,20 +111,12 @@ func Example() { m := meter{} provider := m.SyncInt64() - count, _ := provider.Counter("counter example") - fmt.Printf("counter aggregator: %T\n", count.(inst).agg) - - upDownCount, _ := provider.UpDownCounter("up-down counter example") - fmt.Printf("up-down counter aggregator: %T\n", upDownCount.(inst).agg) - - hist, _ := provider.Histogram("histogram example") - fmt.Printf("histogram aggregator: %T\n", hist.(inst).agg) - - fmt.Printf("meter cyclers: %T{%T, %T, %T}\n", m.cyclers, m.cyclers[0], m.cyclers[1], m.cyclers[2]) + provider.Counter("counter example") + provider.UpDownCounter("up-down counter example") + provider.Histogram("histogram example") // Output: - // counter aggregator: *internal.sumAgg[int64] - // up-down counter aggregator: *internal.lastValueAgg[int64] - // histogram aggregator: *internal.histogramAgg[int64] - // meter cyclers: []internal.Cycler{internal.cumulativeCylcer[int64], internal.deltaCylcer[int64], internal.deltaCylcer[int64]} + // using *internal.cumulativeSum[int64] aggregator for counter + // using *internal.lastValue[int64] aggregator for up-down counter + // using *internal.deltaHistogram[int64] aggregator for histogram } diff --git a/sdk/metric/internal/cycler.go b/sdk/metric/internal/cycler.go deleted file mode 100644 index e37a855b24c..00000000000 --- a/sdk/metric/internal/cycler.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build go1.18 -// +build go1.18 - -package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" - -// Cycler cycles aggregation periods. It will handle any state progression -// from one period to the next based on the temporality of the cycling. -type Cycler interface { - // Cycle returns an []Aggregation for the current period. If the cycler - // merges state from previous periods into the current, the []Aggregation - // returned reflects this. - Cycle() []Aggregation - - // TODO(#2968): Replace the return type with []export.Aggregation once - // #2961 is merged. -} - -// deltaCylcer cycles aggregation periods by returning the aggregation -// produces from that period only. No state is maintained from one period to -// the next. -type deltaCylcer[N int64 | float64] struct { - aggregator Aggregator[N] -} - -func NewDeltaCylcer[N int64 | float64](a Aggregator[N]) Cycler { - return deltaCylcer[N]{aggregator: a} -} - -func (c deltaCylcer[N]) Cycle() []Aggregation { - return c.aggregator.flush() -} - -// cumulativeCylcer cycles aggregation periods by returning the cumulative -// aggregation from its start time until the current period. -type cumulativeCylcer[N int64 | float64] struct { - // TODO(#2969): implement a cumulative storing field. - aggregator Aggregator[N] -} - -func NewCumulativeCylcer[N int64 | float64](a Aggregator[N]) Cycler { - c := cumulativeCylcer[N]{aggregator: a} - - // TODO(#2969): Initialize a new cumulative storage. - - return c -} - -func (c cumulativeCylcer[N]) Cycle() []Aggregation { - // TODO(#2969): Update cumulative storage of aggregations and return them. - - // FIXME(#2969): currently this returns a delta representation of the - // aggregation. When the cumulative storage is complete it should return a - // cumulative representation. - return c.aggregator.flush() -} diff --git a/sdk/metric/internal/drop.go b/sdk/metric/internal/drop.go index 23d32676fba..8a52910ae6f 100644 --- a/sdk/metric/internal/drop.go +++ b/sdk/metric/internal/drop.go @@ -28,4 +28,4 @@ func NewDrop[N int64 | float64]() Aggregator[N] { return &dropAgg[N]{} } func (s *dropAgg[N]) Aggregate(N, *attribute.Set) {} -func (s *dropAgg[N]) flush() []Aggregation { return nil } +func (s *dropAgg[N]) Aggregations() []Aggregation { return nil } diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index 90d442c8d84..f25e9aa7430 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -22,27 +22,59 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregation" ) -// histogramAgg summarizes a set of measurements as an histogram with +// histogram summarizes a set of measurements as an histogram with // explicitly defined buckets. -type histogramAgg[N int64 | float64] struct { +type histogram[N int64 | float64] struct { // TODO(#2970): implement. } -// NewHistogram returns an Aggregator that summarizes a set of measurements as -// an histogram. -func NewHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { - return &histogramAgg[N]{} +func (s *histogram[N]) Aggregate(value N, attr *attribute.Set) { + // TODO(#2970): implement. +} + +// NewDeltaHistogram returns an Aggregator that summarizes a set of +// measurements as an histogram. Each histogram is scoped by attributes and +// the aggregation cycle the measurements were made in. +// +// Each aggregation cycle is treated independently. When the returned +// Aggregator's Aggregations method is called it will reset all histogram +// counts to zero. +func NewDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { + return &deltaHistogram[N]{} +} + +// deltaHistogram summarizes a set of measurements made in a single +// aggregation cycle as an histogram with explicitly defined buckets. +type deltaHistogram[N int64 | float64] struct { + histogram[N] + + // TODO(#2970): implement. } -func (s *histogramAgg[N]) Aggregate(value N, attr *attribute.Set) { +func (s *deltaHistogram[N]) Aggregations() []Aggregation { + // TODO(#2970): implement. + return nil +} + +// NewCumulativeHistogram returns an Aggregator that summarizes a set of +// measurements as an histogram. Each histogram is scoped by attributes. +// +// Each aggregation cycle builds from the previous, the histogram counts are +// the bucketed counts of all values aggregated since the returned Aggregator +// was created. +func NewCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] { + return &cumulativeHistogram[N]{} +} + +// cumulativeHistogram summarizes a set of measurements made over all +// aggregation cycles as an histogram with explicitly defined buckets. +type cumulativeHistogram[N int64 | float64] struct { + histogram[N] + // TODO(#2970): implement. } -func (s *histogramAgg[N]) flush() []Aggregation { +func (s *cumulativeHistogram[N]) Aggregations() []Aggregation { // TODO(#2970): implement. - return []Aggregation{ - { - Value: HistogramValue{ /* TODO(#2970): calculate. */ }, - }, - } + return nil } diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index 467eeae4c18..b4e5fa51590 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -19,26 +19,22 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" -// lastValueAgg summarizes a set of measurements as the last one made. -type lastValueAgg[N int64 | float64] struct { +// lastValue summarizes a set of measurements as the last one made. +type lastValue[N int64 | float64] struct { // TODO(#2971): implement. } // NewLastValue returns an Aggregator that summarizes a set of measurements as // the last one made. func NewLastValue[N int64 | float64]() Aggregator[N] { - return &lastValueAgg[N]{} + return &lastValue[N]{} } -func (s *lastValueAgg[N]) Aggregate(value N, attr *attribute.Set) { +func (s *lastValue[N]) Aggregate(value N, attr *attribute.Set) { // TODO(#2971): implement. } -func (s *lastValueAgg[N]) flush() []Aggregation { +func (s *lastValue[N]) Aggregations() []Aggregation { // TODO(#2971): implement. - return []Aggregation{ - { - Value: SingleValue[N]{ /* TODO(#2971): calculate */ }, - }, - } + return nil } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 0fb0eebfeb4..6315bf70d44 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -19,27 +19,59 @@ package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" import "go.opentelemetry.io/otel/attribute" -// sumAgg summarizes a set of measurements as their arithmetic sum. -type sumAgg[N int64 | float64] struct { +// sum summarizes a set of measurements as their arithmetic sum. +type sum[N int64 | float64] struct { // TODO(#2972): implement. } -// NewSum returns an Aggregator that summarizes a set of -// measurements as their arithmetic sum. -func NewSum[N int64 | float64]() Aggregator[N] { +func (s *sum[N]) Aggregate(value N, attr *attribute.Set) { // TODO(#2972): implement. - return &sumAgg[N]{} } -func (s *sumAgg[N]) Aggregate(value N, attr *attribute.Set) { +// NewDeltaSum returns an Aggregator that summarizes a set of measurements as +// their arithmetic sum. Each sum is scoped by attributes and the aggregation +// cycle the measurements were made in. +// +// Each aggregation cycle is treated independently. When the returned +// Aggregator's Aggregations method is called it will reset all sums to zero. +func NewDeltaSum[N int64 | float64]() Aggregator[N] { + // TODO(#2972): implement. + return &deltaSum[N]{} +} + +// deltaSum summarizes a set of measurements made in a single aggregation +// cycle as their arithmetic sum. +type deltaSum[N int64 | float64] struct { + sum[N] + + // TODO(#2972): implement. +} + +func (s *deltaSum[N]) Aggregations() []Aggregation { + // TODO(#2972): implement. + return nil +} + +// NewCumulativeSum returns an Aggregator that summarizes a set of +// measurements as their arithmetic sum. Each sum is scoped by attributes. +// +// Each aggregation cycle builds from the previous, the sums are the +// arithmetic sum of all values aggregated since the returned Aggregator was +// created. +func NewCumulativeSum[N int64 | float64]() Aggregator[N] { + // TODO(#2972): implement. + return &cumulativeSum[N]{} +} + +// cumulativeSum summarizes a set of measurements made over all aggregation +// cycles as their arithmetic sum. +type cumulativeSum[N int64 | float64] struct { + sum[N] + // TODO(#2972): implement. } -func (s *sumAgg[N]) flush() []Aggregation { +func (s *cumulativeSum[N]) Aggregations() []Aggregation { // TODO(#2972): implement. - return []Aggregation{ - { - Value: SingleValue[N]{ /* TODO(#2972): calculate */ }, - }, - } + return nil } From 02edebdab0549422c259c31de4019558b00916e5 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 23 Jun 2022 14:07:10 -0700 Subject: [PATCH 23/25] Remove the drop aggregator --- sdk/metric/internal/drop.go | 31 ------------------------------- 1 file changed, 31 deletions(-) delete mode 100644 sdk/metric/internal/drop.go diff --git a/sdk/metric/internal/drop.go b/sdk/metric/internal/drop.go deleted file mode 100644 index 8a52910ae6f..00000000000 --- a/sdk/metric/internal/drop.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build go1.18 -// +build go1.18 - -package internal // import "go.opentelemetry.io/otel/sdk/metric/internal" - -import "go.opentelemetry.io/otel/attribute" - -// dropAgg drops all recorded data and returns an empty Aggregation. -type dropAgg[N int64 | float64] struct{} - -// NewDrop returns an Aggregator that drops all recorded data and returns an -// empty Aggregation. -func NewDrop[N int64 | float64]() Aggregator[N] { return &dropAgg[N]{} } - -func (s *dropAgg[N]) Aggregate(N, *attribute.Set) {} - -func (s *dropAgg[N]) Aggregations() []Aggregation { return nil } From c7cfda1b820e6468a42eedba9e2f8e33a6724c95 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 23 Jun 2022 14:12:13 -0700 Subject: [PATCH 24/25] Fix lint --- sdk/metric/internal/aggregator.go | 2 +- sdk/metric/internal/aggregator_example_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 63489f1ae8b..089f9c9fdea 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -25,7 +25,7 @@ type Aggregator[N int64 | float64] interface { // into an aggregation. Aggregate(measurement N, attr *attribute.Set) - // Aggregations retuns a slice of Aggregation, one per each attribute set + // Aggregations returns a slice of Aggregation, one per each attribute set // used to scope measurement aggregation, and ends an aggregation cycle. Aggregations() []Aggregation } diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go index 04db712016a..34f59c01d97 100644 --- a/sdk/metric/internal/aggregator_example_test.go +++ b/sdk/metric/internal/aggregator_example_test.go @@ -111,9 +111,9 @@ func Example() { m := meter{} provider := m.SyncInt64() - provider.Counter("counter example") - provider.UpDownCounter("up-down counter example") - provider.Histogram("histogram example") + _, _ = provider.Counter("counter example") + _, _ = provider.UpDownCounter("up-down counter example") + _, _ = provider.Histogram("histogram example") // Output: // using *internal.cumulativeSum[int64] aggregator for counter From 3d0d54ee09eee0d7407831576b5ab9be3ad30c85 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 24 Jun 2022 08:18:48 -0700 Subject: [PATCH 25/25] Use attribute.Set instead of ptr to it --- sdk/metric/internal/aggregation.go | 2 +- sdk/metric/internal/aggregator.go | 2 +- sdk/metric/internal/aggregator_example_test.go | 2 +- sdk/metric/internal/histogram.go | 2 +- sdk/metric/internal/lastvalue.go | 2 +- sdk/metric/internal/sum.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/metric/internal/aggregation.go b/sdk/metric/internal/aggregation.go index 4149f2672cb..f161d2debbf 100644 --- a/sdk/metric/internal/aggregation.go +++ b/sdk/metric/internal/aggregation.go @@ -33,7 +33,7 @@ type Aggregation struct { Timestamp uint64 // Attributes are the unique dimensions Value describes. - Attributes *attribute.Set + Attributes attribute.Set // Value is the summarization of the measurements made. Value value diff --git a/sdk/metric/internal/aggregator.go b/sdk/metric/internal/aggregator.go index 089f9c9fdea..df8b59e84bb 100644 --- a/sdk/metric/internal/aggregator.go +++ b/sdk/metric/internal/aggregator.go @@ -23,7 +23,7 @@ import "go.opentelemetry.io/otel/attribute" type Aggregator[N int64 | float64] interface { // Aggregate records the measurement, scoped by attr, and aggregates it // into an aggregation. - Aggregate(measurement N, attr *attribute.Set) + Aggregate(measurement N, attr attribute.Set) // Aggregations returns a slice of Aggregation, one per each attribute set // used to scope measurement aggregation, and ends an aggregation cycle. diff --git a/sdk/metric/internal/aggregator_example_test.go b/sdk/metric/internal/aggregator_example_test.go index 34f59c01d97..47fe2253818 100644 --- a/sdk/metric/internal/aggregator_example_test.go +++ b/sdk/metric/internal/aggregator_example_test.go @@ -101,7 +101,7 @@ func (p *syncInt64Provider) Histogram(string, ...instrument.Option) (syncint64.H type inst struct { instrument.Synchronous - aggregateFunc func(int64, *attribute.Set) + aggregateFunc func(int64, attribute.Set) } func (inst) Add(context.Context, int64, ...attribute.KeyValue) {} diff --git a/sdk/metric/internal/histogram.go b/sdk/metric/internal/histogram.go index f25e9aa7430..6931c01f344 100644 --- a/sdk/metric/internal/histogram.go +++ b/sdk/metric/internal/histogram.go @@ -28,7 +28,7 @@ type histogram[N int64 | float64] struct { // TODO(#2970): implement. } -func (s *histogram[N]) Aggregate(value N, attr *attribute.Set) { +func (s *histogram[N]) Aggregate(value N, attr attribute.Set) { // TODO(#2970): implement. } diff --git a/sdk/metric/internal/lastvalue.go b/sdk/metric/internal/lastvalue.go index b4e5fa51590..986a2313ad0 100644 --- a/sdk/metric/internal/lastvalue.go +++ b/sdk/metric/internal/lastvalue.go @@ -30,7 +30,7 @@ func NewLastValue[N int64 | float64]() Aggregator[N] { return &lastValue[N]{} } -func (s *lastValue[N]) Aggregate(value N, attr *attribute.Set) { +func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) { // TODO(#2971): implement. } diff --git a/sdk/metric/internal/sum.go b/sdk/metric/internal/sum.go index 6315bf70d44..6d01183fb03 100644 --- a/sdk/metric/internal/sum.go +++ b/sdk/metric/internal/sum.go @@ -24,7 +24,7 @@ type sum[N int64 | float64] struct { // TODO(#2972): implement. } -func (s *sum[N]) Aggregate(value N, attr *attribute.Set) { +func (s *sum[N]) Aggregate(value N, attr attribute.Set) { // TODO(#2972): implement. }