Add aggregation package and reader/view options #2958

Merged · 19 commits · Jun 21, 2022
166 changes: 166 additions & 0 deletions sdk/metric/aggregation/aggregation.go
@@ -0,0 +1,166 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

// Package aggregation contains configuration types that define the
// aggregation operation used to summarize recorded measurements.
package aggregation // import "go.opentelemetry.io/otel/sdk/metric/aggregation"

import (
"errors"
"fmt"
)

// errAgg is wrapped by misconfigured aggregations.
var errAgg = errors.New("aggregation")

// Aggregation is the aggregation used to summarize recorded measurements.
type Aggregation interface {
// private attempts to ensure no user-defined Aggregations are allowed. The
// OTel specification does not currently allow user-defined Aggregations.
private()

// Copy returns a deep copy of the Aggregation.
Copy() Aggregation

// Err returns an error for any misconfigured Aggregation.
Err() error
}

// Drop is an aggregation that drops all recorded data.
type Drop struct{} // Drop has no parameters.

var _ Aggregation = Drop{}

func (Drop) private() {}

// Copy returns a deep copy of d.
func (d Drop) Copy() Aggregation { return d }

// Err returns an error for any misconfiguration. A Drop aggregation has no
// parameters and cannot be misconfigured, therefore this always returns nil.
func (Drop) Err() error { return nil }

// Default is an aggregation that uses the default instrument kind selection
// mapping to select another aggregation. A metric reader can be configured to
// make an aggregation selection based on instrument kind that differs from
// the default. This aggregation ensures the default is used.
//
// See the "go.opentelemetry.io/otel/sdk/metric".DefaultAggregationSelector
// for information about the default instrument kind selection mapping.
type Default struct{} // Default has no parameters.

var _ Aggregation = Default{}

func (Default) private() {}

// Copy returns a deep copy of d.
func (d Default) Copy() Aggregation { return d }

// Err returns an error for any misconfiguration. A Default aggregation has no
// parameters and cannot be misconfigured, therefore this always returns nil.
func (Default) Err() error { return nil }

// Sum is an aggregation that summarizes a set of measurements as their
// arithmetic sum.
type Sum struct{} // Sum has no parameters.

var _ Aggregation = Sum{}

func (Sum) private() {}

// Copy returns a deep copy of s.
func (s Sum) Copy() Aggregation { return s }

// Err returns an error for any misconfiguration. A Sum aggregation has no
// parameters and cannot be misconfigured, therefore this always returns nil.
func (Sum) Err() error { return nil }

// LastValue is an aggregation that summarizes a set of measurements as the
// last one made.
type LastValue struct{} // LastValue has no parameters.

var _ Aggregation = LastValue{}

func (LastValue) private() {}

// Copy returns a deep copy of l.
func (l LastValue) Copy() Aggregation { return l }

// Err returns an error for any misconfiguration. A LastValue aggregation has
// no parameters and cannot be misconfigured, therefore this always returns
// nil.
func (LastValue) Err() error { return nil }

// ExplicitBucketHistogram is an aggregation that summarizes a set of
// measurements as a histogram with explicitly defined buckets.
type ExplicitBucketHistogram struct {
// Boundaries are the increasing bucket boundary values. Boundary values
// define bucket upper bounds. Buckets are exclusive of their lower
// boundary and inclusive of their upper bound (except at positive
// infinity). A measurement falls into the smallest-numbered bucket whose
// upper boundary is greater than or equal to the measurement; measurements
// above the largest boundary fall into the final bucket. As an example,
// boundaries defined as:
//
// []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}
//
// Will define these buckets:
//
// (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0],
// (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0],
// (500.0, 1000.0], (1000.0, +∞)
Boundaries []float64
// NoMinMax indicates whether the min and max of the distribution should be
// omitted. By default (false), these extremes are recorded.
NoMinMax bool
}

var _ Aggregation = ExplicitBucketHistogram{}

func (ExplicitBucketHistogram) private() {}

// errHist is returned by misconfigured ExplicitBucketHistograms.
var errHist = fmt.Errorf("%w: explicit bucket histogram", errAgg)

// Err returns an error for any misconfiguration.
func (h ExplicitBucketHistogram) Err() error {
if len(h.Boundaries) <= 1 {
return nil
}

// Check boundaries are monotonic.
i := h.Boundaries[0]
for _, j := range h.Boundaries[1:] {
if i >= j {
return fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, h.Boundaries)
}
i = j
}

return nil
}

// Copy returns a deep copy of h.
func (h ExplicitBucketHistogram) Copy() Aggregation {
b := make([]float64, len(h.Boundaries))
for i, v := range h.Boundaries {
b[i] = v
}
return ExplicitBucketHistogram{
Boundaries: b,
NoMinMax: h.NoMinMax,
}
}
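For context, a custom selector built from these aggregation types looks roughly like the sketch below. Only the selector signature, func(InstrumentKind) aggregation.Aggregation, is taken from the reader changes later in this diff; the metric.InstrumentKindHistogram constant name and the bucket values are assumptions for illustration.

package example

import (
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

// selectAggregation returns a custom bucket layout for histogram instruments
// and defers to the SDK's default mapping for every other instrument kind.
// metric.InstrumentKindHistogram is an assumed constant name.
func selectAggregation(kind metric.InstrumentKind) aggregation.Aggregation {
	if kind == metric.InstrumentKindHistogram {
		return aggregation.ExplicitBucketHistogram{
			Boundaries: []float64{0, 10, 100, 1000},
		}
	}
	return aggregation.Default{}
}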
70 changes: 70 additions & 0 deletions sdk/metric/aggregation/aggregation_test.go
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package aggregation

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAggregationErr(t *testing.T) {
t.Run("DropOperation", func(t *testing.T) {
assert.NoError(t, Drop{}.Err())
})

t.Run("SumOperation", func(t *testing.T) {
assert.NoError(t, Sum{}.Err())
})

t.Run("LastValueOperation", func(t *testing.T) {
assert.NoError(t, LastValue{}.Err())
})

t.Run("ExplicitBucketHistogramOperation", func(t *testing.T) {
assert.NoError(t, ExplicitBucketHistogram{}.Err())

assert.NoError(t, ExplicitBucketHistogram{
Boundaries: []float64{0},
NoMinMax: true,
}.Err())

assert.NoError(t, ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
}.Err())
})

t.Run("NonmonotonicHistogramBoundaries", func(t *testing.T) {
assert.ErrorIs(t, ExplicitBucketHistogram{
Boundaries: []float64{2, 1},
}.Err(), errAgg)

assert.ErrorIs(t, ExplicitBucketHistogram{
Boundaries: []float64{0, 1, 2, 1, 3, 4},
}.Err(), errAgg)
})
}

func TestExplicitBucketHistogramDeepCopy(t *testing.T) {
const orig = 0.0
b := []float64{orig}
h := ExplicitBucketHistogram{Boundaries: b}
cpH := h.Copy().(ExplicitBucketHistogram)
b[0] = orig + 1
assert.Equal(t, orig, cpH.Boundaries[0], "changing the underlying slice data should not affect the copy")
}
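The Err contract pinned down by these tests can also be used by callers to validate boundaries before handing an aggregation to a reader or view. A minimal sketch using only the API shown above:

package example

import (
	"fmt"

	"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

// validatedHistogram returns the aggregation only if its boundaries are
// strictly increasing, surfacing the same wrapped error the SDK reports.
func validatedHistogram(bounds []float64) (aggregation.Aggregation, error) {
	h := aggregation.ExplicitBucketHistogram{Boundaries: bounds}
	if err := h.Err(); err != nil {
		return nil, fmt.Errorf("invalid histogram configuration: %w", err)
	}
	return h, nil
}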
6 changes: 6 additions & 0 deletions sdk/metric/config_test.go
@@ -25,20 +25,26 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/export"
"go.opentelemetry.io/otel/sdk/resource"
)

type reader struct {
producer producer
temporalityFunc func(InstrumentKind) Temporality
aggregationFunc AggregationSelector
collectFunc func(context.Context) (export.Metrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)

func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) temporality(kind InstrumentKind) Temporality { return r.temporalityFunc(kind) }
func (r *reader) Collect(ctx context.Context) (export.Metrics, error) { return r.collectFunc(ctx) }
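A hypothetical test (not part of this PR) showing how the stub's new aggregationFunc field could be exercised, assuming AggregationSelector is defined as func(InstrumentKind) aggregation.Aggregation as the usage above implies; it would live in package metric alongside config_test.go, which already imports testing, assert, and aggregation.

func TestStubReaderAggregation(t *testing.T) {
	// The stub forwards to aggregationFunc, so a fixed return value should
	// come back unchanged regardless of the instrument kind passed in.
	want := aggregation.Drop{}
	r := &reader{
		aggregationFunc: func(InstrumentKind) aggregation.Aggregation { return want },
	}
	var kind InstrumentKind // zero value; the stub ignores it
	assert.Equal(t, want, r.aggregation(kind))
}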
10 changes: 10 additions & 0 deletions sdk/metric/manual_reader.go
@@ -24,6 +24,7 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/export"
)

@@ -34,6 +35,7 @@ type manualReader struct {
shutdownOnce sync.Once

temporalitySelector func(InstrumentKind) Temporality
aggregationSelector AggregationSelector
}

// Compile time check the manualReader implements Reader.
@@ -44,6 +46,7 @@ func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
}

@@ -62,6 +65,11 @@ func (mr *manualReader) temporality(kind InstrumentKind) Temporality {
return mr.temporalitySelector(kind)
}

// aggregation returns the Aggregation to use for kind.
func (mr *manualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return mr.aggregationSelector(kind)
}

// ForceFlush is a no-op, it always returns nil.
func (mr *manualReader) ForceFlush(context.Context) error {
return nil
@@ -103,12 +111,14 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
// manualReaderConfig contains configuration options for a ManualReader.
type manualReaderConfig struct {
temporalitySelector func(InstrumentKind) Temporality
aggregationSelector AggregationSelector
}

// newManualReaderConfig returns a manualReaderConfig configured with options.
func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig {
cfg := manualReaderConfig{
temporalitySelector: defaultTemporalitySelector,
aggregationSelector: DefaultAggregationSelector,
}
for _, opt := range opts {
cfg = opt.applyManual(cfg)
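For reference, this is roughly how the manual reader is expected to be configured once the selector plumbing lands. WithAggregationSelector is an assumed option name (the reader/view options are added elsewhere in this PR, not in this file); NewManualReader and DefaultAggregationSelector do appear in the diff.

package example

import "go.opentelemetry.io/otel/sdk/metric"

func newManualReaders() (metric.Reader, metric.Reader) {
	// With no options the reader falls back to DefaultAggregationSelector,
	// mirroring the default set in newManualReaderConfig above.
	def := metric.NewManualReader()

	// Passing a selector explicitly; WithAggregationSelector is assumed to
	// exist and to apply to manual readers.
	explicit := metric.NewManualReader(
		metric.WithAggregationSelector(metric.DefaultAggregationSelector),
	)
	return def, explicit
}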
10 changes: 10 additions & 0 deletions sdk/metric/periodic_reader.go
@@ -26,6 +26,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/export"
)

@@ -40,6 +41,7 @@ type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
temporalitySelector func(InstrumentKind) Temporality
aggregationSelector AggregationSelector
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
@@ -49,6 +51,7 @@ func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfi
interval: defaultInterval,
timeout: defaultTimeout,
temporalitySelector: defaultTemporalitySelector,
aggregationSelector: DefaultAggregationSelector,
}
for _, o := range options {
c = o.applyPeriodic(c)
@@ -117,6 +120,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,

temporalitySelector: conf.temporalitySelector,
aggregationSelector: conf.aggregationSelector,
}

r.wg.Add(1)
@@ -137,6 +141,7 @@ type periodicReader struct {
exporter Exporter

temporalitySelector func(InstrumentKind) Temporality
aggregationSelector AggregationSelector

wg sync.WaitGroup
cancel context.CancelFunc
@@ -184,6 +189,11 @@ func (r *periodicReader) temporality(kind InstrumentKind) Temporality {
return r.temporalitySelector(kind)
}

// aggregation returns the Aggregation to use for kind.
func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return r.aggregationSelector(kind)
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
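And the periodic reader equivalent, again hedged: WithAggregationSelector and InstrumentKindHistogram are assumed names rather than part of this diff, and exp stands in for whatever metric.Exporter the application configures.

package example

import (
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

// newPeriodicReader attaches a custom aggregation selector to a periodic
// reader; every non-histogram instrument keeps the default aggregation.
func newPeriodicReader(exp metric.Exporter) metric.Reader {
	selector := func(kind metric.InstrumentKind) aggregation.Aggregation {
		if kind == metric.InstrumentKindHistogram { // assumed constant name
			return aggregation.ExplicitBucketHistogram{
				Boundaries: []float64{0, 100, 1000},
			}
		}
		return aggregation.Default{}
	}
	// WithAggregationSelector is assumed to exist and to apply to periodic
	// readers as well.
	return metric.NewPeriodicReader(exp, metric.WithAggregationSelector(selector))
}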