Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor/deltatocumulative: golden tests #35562

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.22.0
require (
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae
Expand All @@ -21,6 +20,8 @@ require (
go.opentelemetry.io/otel/trace v1.31.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/tools v0.25.0
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand Down Expand Up @@ -50,13 +51,12 @@ require (
go.opentelemetry.io/collector/processor/processorprofiles v0.111.1-0.20241008154146-ea48c09c31ae // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
Expand Down
10 changes: 6 additions & 4 deletions processor/deltatocumulativeprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

112 changes: 112 additions & 0 deletions processor/deltatocumulativeprocessor/internal/testar/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// testar is a textual archive (based on [golang.org/x/tools/txtar]) to define
// test fixtures.
//
// Archive data is read into struct fields, optionally calling parsers for field
// types other than string or []byte:
//
// type T struct {
// Literal string `testar:"file1"`
// Parsed int `testar:"file2,myparser"`
// }
//
// var into T
// err := Read(data, &into)
//
// See [Read] and [Parser] for examples.
package testar // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testar"

import (
"fmt"
"io/fs"
"reflect"
"strings"

"golang.org/x/tools/txtar"
)

// Read archive data into the fields of struct *T
func Read[T any](data []byte, into *T, parsers ...Format) error {
ar := txtar.Parse(data)
return Decode(ar, into, parsers...)
}

func ReadFile[T any](file string, into *T, parsers ...Format) error {
ar, err := txtar.ParseFile(file)
if err != nil {
return err
}
return Decode(ar, into, parsers...)
}

func Decode[T any](ar *txtar.Archive, into *T, parsers ...Format) error {
arfs, err := txtar.FS(ar)
if err != nil {
return err
}

pv := reflect.ValueOf(into)
if pv.Kind() != reflect.Pointer {
return fmt.Errorf("into must be pointer")
}
sv := pv.Elem()

for i := range sv.NumField() {
f := sv.Type().Field(i)
tag := f.Tag.Get("testar")
if tag == "" {
continue
}

name, format, _ := strings.Cut(tag, ",")
data, err := fs.ReadFile(arfs, name)
if err != nil {
return fmt.Errorf("%s: %w", name, err)
}

err = formats(parsers).Parse(format, data, sv.Field(i).Addr().Interface())
if err != nil {
return fmt.Errorf("%s: %w", name, err)
}
}
return nil
}

type formats []Format

func (fmts formats) Parse(name string, data []byte, into any) error {
if name == "" {
return LiteralParser(data, into)
}

for _, f := range fmts {
if f.name == name {
return f.parse(data, into)
}
}
return fmt.Errorf("no such format: %q", name)
}

type Format struct {
name string
parse func(file []byte, into any) error
}

func Parser(name string, fn func(data []byte, into any) error) Format {
return Format{name: name, parse: fn}
}

// LiteralParser sets data unaltered into a []byte or string
func LiteralParser(data []byte, into any) error {
switch ptr := into.(type) {
case *[]byte:
*ptr = append([]byte(nil), data...)
case *string:
*ptr = string(data)
default:
return fmt.Errorf("pass *[]byte, *string or use a parser. got %T", into)
}
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package testar

import (
"fmt"
"strconv"
"strings"
)

func ExampleRead() {
data := []byte(`
-- foo --
hello

-- bar --
world
`)

var into struct {
Foo string `testar:"foo"`
Bar []byte `testar:"bar"`
}

_ = Read(data, &into)
fmt.Printf("foo: %T(%q)\n", into.Foo, into.Foo)
fmt.Printf("bar: %T(%q)\n", into.Bar, into.Bar)

// Output:
// foo: string("hello\n\n")
// bar: []uint8("world\n")
}

func ExampleParser() {
data := []byte(`
-- foobar --
377927
`)

var into struct {
Foobar int `testar:"foobar,atoi"`
}

_ = Read(data, &into, Parser("atoi", func(file []byte, into any) error {
n, err := strconv.Atoi(strings.TrimSpace(string(file)))
if err != nil {
return err
}
*(into.(*int)) = n
return nil
}))

fmt.Printf("foobar: %T(%d)\n", into.Foobar, into.Foobar)

// Output:
// foobar: int(377927)
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,23 +68,35 @@ func (m Metric[P]) Stream() (streams.Ident, P) {
}

func Resource() pcommon.Resource {
return ResourceN(10)
}

func ResourceN(n int) pcommon.Resource {
res := pcommon.NewResource()
for i := 0; i < 10; i++ {
res.Attributes().PutStr(randStr(), randStr())
}
Attributes(n).MoveTo(res.Attributes())
return res
}

func Scope() pcommon.InstrumentationScope {
return ScopeN(3)
}

func ScopeN(n int) pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName(randStr())
scope.SetVersion(randStr())
for i := 0; i < 3; i++ {
scope.Attributes().PutStr(randStr(), randStr())
}
Attributes(n).MoveTo(scope.Attributes())
return scope
}

func Attributes(n int) pcommon.Map {
m := pcommon.NewMap()
for i := 0; i < n; i++ {
m.PutStr(randStr(), randStr())
}
return m
}

func randStr() string {
return strconv.FormatInt(randInt(), 16)
}
Expand Down
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
acc, err := func() (data.Number, error) {
if !ok {
// new stream: there is no existing aggregation, so start new with current dp
return dp, nil
return dp.Clone(), nil
}
// tracked stream: add incoming delta dp to existing cumulative aggregation
return acc, delta.AccumulateInto(acc, dp)
Expand Down
Loading
Loading