Skip to content

Commit

Permalink
[chore] [exporter/elasticsearch] Extract mapping hints into an intern…
Browse files Browse the repository at this point in the history
…al elasticsearch module (#37235)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This is the first (or the first after several attempts canceled for
various reasons) attempt to split data serialization/encoding into
multiple packages, so we can separate the module per encoding type.

See
#37207
as the previous attempt which was canceled due to another refactoring
that required more ground splitting first.

---------

Co-authored-by: Tim Rühsen <tim.ruehsen@gmx.de>
  • Loading branch information
dmathieu and rockdaboot authored Jan 22, 2025
1 parent 49d5b89 commit 7ce882e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearch // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"

import (
"slices"

"go.opentelemetry.io/collector/pdata/pcommon"
)

const (
MappingHintsAttrKey = "elasticsearch.mapping.hints"
)

type MappingHint string

const (
HintAggregateMetricDouble MappingHint = "aggregate_metric_double"
HintDocCount MappingHint = "_doc_count"
)

type MappingHintGetter struct {
hints []MappingHint
}

// NewMappingHintGetter creates a new MappingHintGetter
func NewMappingHintGetter(attr pcommon.Map) (g MappingHintGetter) {
v, ok := attr.Get(MappingHintsAttrKey)
if !ok || v.Type() != pcommon.ValueTypeSlice {
return
}
slice := v.Slice()
g.hints = slices.Grow(g.hints, slice.Len())
for i := range slice.Len() {
g.hints = append(g.hints, MappingHint(slice.At(i).Str()))
}
return
}

// HasMappingHint checks whether the getter contains the requested mapping hint
func (g MappingHintGetter) HasMappingHint(hint MappingHint) bool {
return slices.Contains(g.hints, hint)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package elasticsearchexporter
package elasticsearch

import (
"testing"
Expand All @@ -14,74 +14,74 @@ func TestHasHint(t *testing.T) {
tests := []struct {
name string
attrsFunc func() pcommon.Map
hint mappingHint
hint MappingHint
want bool
}{
{
name: "empty map",
attrsFunc: pcommon.NewMap,
hint: hintAggregateMetricDouble,
hint: HintAggregateMetricDouble,
want: false,
},
{
name: "bad type",
attrsFunc: func() pcommon.Map {
m := pcommon.NewMap()
m.PutBool(mappingHintsAttrKey, true)
m.PutBool(MappingHintsAttrKey, true)
return m
},
hint: hintAggregateMetricDouble,
hint: HintAggregateMetricDouble,
want: false,
},
{
name: "bad inner type",
attrsFunc: func() pcommon.Map {
m := pcommon.NewMap()
s := m.PutEmptySlice(mappingHintsAttrKey)
s := m.PutEmptySlice(MappingHintsAttrKey)
s.AppendEmpty().SetBool(true)
return m
},
hint: hintAggregateMetricDouble,
hint: HintAggregateMetricDouble,
want: false,
},
{
name: "hit",
attrsFunc: func() pcommon.Map {
m := pcommon.NewMap()
s := m.PutEmptySlice(mappingHintsAttrKey)
s.AppendEmpty().SetStr(string(hintAggregateMetricDouble))
s := m.PutEmptySlice(MappingHintsAttrKey)
s.AppendEmpty().SetStr(string(HintAggregateMetricDouble))
return m
},
hint: hintAggregateMetricDouble,
hint: HintAggregateMetricDouble,
want: true,
},
{
name: "hit 2nd",
attrsFunc: func() pcommon.Map {
m := pcommon.NewMap()
s := m.PutEmptySlice(mappingHintsAttrKey)
s.AppendEmpty().SetStr(string(hintDocCount))
s.AppendEmpty().SetStr(string(hintAggregateMetricDouble))
s := m.PutEmptySlice(MappingHintsAttrKey)
s.AppendEmpty().SetStr(string(HintDocCount))
s.AppendEmpty().SetStr(string(HintAggregateMetricDouble))
return m
},
hint: hintAggregateMetricDouble,
hint: HintAggregateMetricDouble,
want: true,
},
{
name: "miss",
attrsFunc: func() pcommon.Map {
m := pcommon.NewMap()
s := m.PutEmptySlice(mappingHintsAttrKey)
s.AppendEmpty().SetStr(string(hintDocCount))
s := m.PutEmptySlice(MappingHintsAttrKey)
s.AppendEmpty().SetStr(string(HintDocCount))
return m
},
hint: hintAggregateMetricDouble,
hint: HintAggregateMetricDouble,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, newMappingHintGetter(tt.attrsFunc()).HasMappingHint(tt.hint))
assert.Equal(t, tt.want, NewMappingHintGetter(tt.attrsFunc()).HasMappingHint(tt.hint))
})
}
}
42 changes: 0 additions & 42 deletions exporter/elasticsearchexporter/mapping_hint.go

This file was deleted.

29 changes: 15 additions & 14 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
semconv "go.opentelemetry.io/collector/semconv/v1.22.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/exphistogram"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/traceutil"
Expand Down Expand Up @@ -101,7 +102,7 @@ type dataPoint interface {
Value() (pcommon.Value, error)
DynamicTemplate(pmetric.Metric) string
DocCount() uint64
HasMappingHint(mappingHint) bool
HasMappingHint(elasticsearch.MappingHint) bool
Metric() pmetric.Metric
}

Expand Down Expand Up @@ -263,12 +264,12 @@ func (m *encodeModel) encodeMetrics(resource pcommon.Resource, resourceSchemaURL

type summaryDataPoint struct {
pmetric.SummaryDataPoint
mappingHintGetter
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func newSummaryDataPoint(metric pmetric.Metric, dp pmetric.SummaryDataPoint) summaryDataPoint {
return summaryDataPoint{SummaryDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric}
return summaryDataPoint{SummaryDataPoint: dp, MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()), metric: metric}
}

func (dp summaryDataPoint) Value() (pcommon.Value, error) {
Expand All @@ -295,16 +296,16 @@ func (dp summaryDataPoint) Metric() pmetric.Metric {

type exponentialHistogramDataPoint struct {
pmetric.ExponentialHistogramDataPoint
mappingHintGetter
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func newExponentialHistogramDataPoint(metric pmetric.Metric, dp pmetric.ExponentialHistogramDataPoint) exponentialHistogramDataPoint {
return exponentialHistogramDataPoint{ExponentialHistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric}
return exponentialHistogramDataPoint{ExponentialHistogramDataPoint: dp, MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()), metric: metric}
}

func (dp exponentialHistogramDataPoint) Value() (pcommon.Value, error) {
if dp.HasMappingHint(hintAggregateMetricDouble) {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
vm := pcommon.NewValueMap()
m := vm.Map()
m.PutDouble("sum", dp.Sum())
Expand All @@ -331,7 +332,7 @@ func (dp exponentialHistogramDataPoint) Value() (pcommon.Value, error) {
}

func (dp exponentialHistogramDataPoint) DynamicTemplate(_ pmetric.Metric) string {
if dp.HasMappingHint(hintAggregateMetricDouble) {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary"
}
return "histogram"
Expand All @@ -347,16 +348,16 @@ func (dp exponentialHistogramDataPoint) Metric() pmetric.Metric {

type histogramDataPoint struct {
pmetric.HistogramDataPoint
mappingHintGetter
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func newHistogramDataPoint(metric pmetric.Metric, dp pmetric.HistogramDataPoint) histogramDataPoint {
return histogramDataPoint{HistogramDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric}
return histogramDataPoint{HistogramDataPoint: dp, MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()), metric: metric}
}

func (dp histogramDataPoint) Value() (pcommon.Value, error) {
if dp.HasMappingHint(hintAggregateMetricDouble) {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
vm := pcommon.NewValueMap()
m := vm.Map()
m.PutDouble("sum", dp.Sum())
Expand All @@ -367,7 +368,7 @@ func (dp histogramDataPoint) Value() (pcommon.Value, error) {
}

func (dp histogramDataPoint) DynamicTemplate(_ pmetric.Metric) string {
if dp.HasMappingHint(hintAggregateMetricDouble) {
if dp.HasMappingHint(elasticsearch.HintAggregateMetricDouble) {
return "summary"
}
return "histogram"
Expand Down Expand Up @@ -431,12 +432,12 @@ func histogramToValue(dp pmetric.HistogramDataPoint) (pcommon.Value, error) {

type numberDataPoint struct {
pmetric.NumberDataPoint
mappingHintGetter
elasticsearch.MappingHintGetter
metric pmetric.Metric
}

func newNumberDataPoint(metric pmetric.Metric, dp pmetric.NumberDataPoint) numberDataPoint {
return numberDataPoint{NumberDataPoint: dp, mappingHintGetter: newMappingHintGetter(dp.Attributes()), metric: metric}
return numberDataPoint{NumberDataPoint: dp, MappingHintGetter: elasticsearch.NewMappingHintGetter(dp.Attributes()), metric: metric}
}

func (dp numberDataPoint) Value() (pcommon.Value, error) {
Expand Down Expand Up @@ -724,7 +725,7 @@ func metricOTelHash(dp dataPoint, unit string) uint32 {

hasher.Write([]byte(unit))

mapHashExcludeReservedAttrs(hasher, dp.Attributes(), mappingHintsAttrKey)
mapHashExcludeReservedAttrs(hasher, dp.Attributes(), elasticsearch.MappingHintsAttrKey)

return hasher.Sum32()
}
Expand Down
6 changes: 4 additions & 2 deletions exporter/elasticsearchexporter/pdata_serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/elasticsearch"
)

const tsLayout = "2006-01-02T15:04:05.000000000Z"
Expand Down Expand Up @@ -68,7 +70,7 @@ func serializeDataPoints(v *json.Visitor, dataPoints []dataPoint, validationErro
// TODO here's potential for more optimization by directly serializing the value instead of allocating a pcommon.Value
// the tradeoff is that this would imply a duplicated logic for the ECS mode
value, err := dp.Value()
if dp.HasMappingHint(hintDocCount) {
if dp.HasMappingHint(elasticsearch.HintDocCount) {
docCount = dp.DocCount()
}
if err != nil {
Expand Down Expand Up @@ -296,7 +298,7 @@ func writeAttributes(v *json.Visitor, attributes pcommon.Map, stringifyMapValues
_ = v.OnObjectStart(-1, structform.AnyType)
attributes.Range(func(k string, val pcommon.Value) bool {
switch k {
case dataStreamType, dataStreamDataset, dataStreamNamespace, mappingHintsAttrKey:
case dataStreamType, dataStreamDataset, dataStreamNamespace, elasticsearch.MappingHintsAttrKey:
return true
}
if isGeoAttribute(k, val) {
Expand Down

0 comments on commit 7ce882e

Please sign in to comment.