Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/signaltometrics]Add collector telemetry as resource attrib… #37117

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/singlewriter-signaltometrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: signaltometrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds resource attributes based on telemetry settings to the connector to ensure single writer

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35930]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 6 additions & 5 deletions connector/signaltometricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
)

type signalToMetrics struct {
next consumer.Metrics
logger *zap.Logger
next consumer.Metrics
collectorInstanceInfo *model.CollectorInstanceInfo
logger *zap.Logger

spanMetricDefs []model.MetricDef[ottlspan.TransformContext]
dpMetricDefs []model.MetricDef[ottldatapoint.TransformContext]
Expand Down Expand Up @@ -75,7 +76,7 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces)
}
}

filteredResAttrs := md.FilterResourceAttributes(resourceAttrs)
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, 1); err != nil {
return err
}
Expand Down Expand Up @@ -104,7 +105,7 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
metrics := scopeMetric.Metrics()
metric := metrics.At(k)
for _, md := range sm.dpMetricDefs {
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs)
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
aggregate := func(dp any, dpAttrs pcommon.Map) error {
// The transform context is created from original attributes so that the
// OTTL expressions are also applied on the original attributes.
Expand Down Expand Up @@ -230,7 +231,7 @@ func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) erro
continue
}
}
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs)
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1); err != nil {
return err
}
Expand Down
15 changes: 12 additions & 3 deletions connector/signaltometricsconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ func createTracesToMetrics(
}

return &signalToMetrics{
logger: set.Logger,
logger: set.Logger,
collectorInstanceInfo: model.NewCollectorInstanceInfo(
set.TelemetrySettings,
),
next: nextConsumer,
spanMetricDefs: metricDefs,
}, nil
Expand Down Expand Up @@ -85,7 +88,10 @@ func createMetricsToMetrics(
}

return &signalToMetrics{
logger: set.Logger,
logger: set.Logger,
collectorInstanceInfo: model.NewCollectorInstanceInfo(
set.TelemetrySettings,
),
next: nextConsumer,
dpMetricDefs: metricDefs,
}, nil
Expand Down Expand Up @@ -113,7 +119,10 @@ func createLogsToMetrics(
}

return &signalToMetrics{
logger: set.Logger,
logger: set.Logger,
collectorInstanceInfo: model.NewCollectorInstanceInfo(
set.TelemetrySettings,
),
next: nextConsumer,
logMetricDefs: metricDefs,
}, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model"

import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.26.0"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/metadata"
)

var prefix = metadata.Type.String()

// CollectorInstanceInfo holds the attributes that could uniquely identify
// the current collector instance. These attributes are initialized from the
// telemetry settings. The CollectorInstanceInfo can copy these attributes,
// with a given prefix, to a provided map.
type CollectorInstanceInfo struct {
size int
serviceInstanceID string
serviceName string
serviceNamespace string
}

func NewCollectorInstanceInfo(
set component.TelemetrySettings,
) *CollectorInstanceInfo {
var info CollectorInstanceInfo
set.Resource.Attributes().Range(func(k string, v pcommon.Value) bool {
switch k {
case semconv.AttributeServiceInstanceID:
if str := v.Str(); str != "" {
info.serviceInstanceID = str
info.size++
}
case semconv.AttributeServiceName:
if str := v.Str(); str != "" {
info.serviceName = str
info.size++
}
case semconv.AttributeServiceNamespace:
if str := v.Str(); str != "" {
info.serviceNamespace = str
info.size++
}
}
return true
})
return &info
}

// Size returns the max number of attributes that defines a collector's
// instance information. Can be used to presize the attributes.
func (info CollectorInstanceInfo) Size() int {
return info.size
}

func (info CollectorInstanceInfo) Copy(to pcommon.Map) {
to.EnsureCapacity(info.Size())
if info.serviceInstanceID != "" {
to.PutStr(keyWithPrefix(semconv.AttributeServiceInstanceID), info.serviceInstanceID)
}
if info.serviceName != "" {
to.PutStr(keyWithPrefix(semconv.AttributeServiceName), info.serviceName)
}
if info.serviceNamespace != "" {
to.PutStr(keyWithPrefix(semconv.AttributeServiceNamespace), info.serviceNamespace)
}
}

func keyWithPrefix(key string) string {
return prefix + "." + key
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package model

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.26.0"
)

func TestCollectorInstanceInfo(t *testing.T) {
for _, tc := range []struct {
name string
input component.TelemetrySettings
expected pcommon.Map
}{
{
name: "empty",
input: componenttest.NewNopTelemetrySettings(),
expected: pcommon.NewMap(),
},
{
name: "with_service_instance_id",
input: func() component.TelemetrySettings {
ts := componenttest.NewNopTelemetrySettings()
ts.Resource.Attributes().PutStr(semconv.AttributeServiceInstanceID, "627cc493-f310-47de-96bd-71410b7dec09")
return ts
}(),
expected: func() pcommon.Map {
m := pcommon.NewMap()
m.PutStr(
"signaltometrics."+semconv.AttributeServiceInstanceID,
"627cc493-f310-47de-96bd-71410b7dec09",
)
return m
}(),
},
{
name: "with_all_values",
input: func() component.TelemetrySettings {
ts := componenttest.NewNopTelemetrySettings()
ts.Resource.Attributes().PutStr(semconv.AttributeServiceInstanceID, "627cc493-f310-47de-96bd-71410b7dec09")
ts.Resource.Attributes().PutStr(semconv.AttributeServiceName, "signaltometrics")
ts.Resource.Attributes().PutStr(semconv.AttributeServiceNamespace, "test")
return ts
}(),
expected: func() pcommon.Map {
m := pcommon.NewMap()
m.PutStr(
"signaltometrics."+semconv.AttributeServiceInstanceID,
"627cc493-f310-47de-96bd-71410b7dec09",
)
m.PutStr(
"signaltometrics."+semconv.AttributeServiceName,
"signaltometrics",
)
m.PutStr(
"signaltometrics."+semconv.AttributeServiceNamespace,
"test",
)
return m
}(),
},
} {
t.Run(tc.name, func(t *testing.T) {
ci := NewCollectorInstanceInfo(tc.input)
require.NotNil(t, ci)

actual := pcommon.NewMap()
ci.Copy(actual)
assert.Equal(t, ci.Size(), actual.Len())
assertMapEquality(t, tc.expected, actual)
})
}
}

func assertMapEquality(t *testing.T, expected, actual pcommon.Map) bool {
t.Helper()

expectedRaw := expected.AsRaw()
actualRaw := actual.AsRaw()
return assert.True(
t, reflect.DeepEqual(expectedRaw, actualRaw),
"attributes don't match expected: %v, actual: %v",
expectedRaw, actualRaw,
)
}
6 changes: 4 additions & 2 deletions connector/signaltometricsconnector/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,19 @@ func (md *MetricDef[K]) FromMetricInfo(
// definition.
func (md *MetricDef[K]) FilterResourceAttributes(
attrs pcommon.Map,
collectorInfo *CollectorInstanceInfo,
) pcommon.Map {
var filteredAttributes pcommon.Map
switch {
case len(md.IncludeResourceAttributes) == 0:
filteredAttributes = pcommon.NewMap()
filteredAttributes.EnsureCapacity(attrs.Len())
filteredAttributes.EnsureCapacity(attrs.Len() + collectorInfo.Size())
attrs.CopyTo(filteredAttributes)
default:
expectedLen := len(md.IncludeResourceAttributes)
expectedLen := len(md.IncludeResourceAttributes) + collectorInfo.Size()
filteredAttributes = filterAttributes(attrs, md.IncludeResourceAttributes, expectedLen)
}
collectorInfo.Copy(filteredAttributes)
return filteredAttributes
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ resourceMetrics:
- key: resource.foo
value:
stringValue: foo
- key: signaltometrics.service.instance.id
value:
stringValue: 627cc493-f310-47de-96bd-71410b7dec09
- key: signaltometrics.service.name
value:
stringValue: signaltometrics
- key: signaltometrics.service.namespace
value:
stringValue: test
scopeMetrics:
- metrics:
- description: Logrecords as exponential histogram with log.duration from attributes
Expand Down Expand Up @@ -327,6 +336,15 @@ resourceMetrics:
- key: resource.foo
value:
stringValue: foo
- key: signaltometrics.service.instance.id
value:
stringValue: 627cc493-f310-47de-96bd-71410b7dec09
- key: signaltometrics.service.name
value:
stringValue: signaltometrics
- key: signaltometrics.service.namespace
value:
stringValue: test
scopeMetrics:
- metrics:
- description: Logrecords with resource attribute foo as exponential histogram with log.duration from attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ resourceMetrics:
- key: resource.foo
value:
stringValue: foo
- key: signaltometrics.service.instance.id
value:
stringValue: 627cc493-f310-47de-96bd-71410b7dec09
- key: signaltometrics.service.name
value:
stringValue: signaltometrics
- key: signaltometrics.service.namespace
value:
stringValue: test
scopeMetrics:
- metrics:
- description: Logrecords as histogram with log.duration from attributes
Expand Down Expand Up @@ -127,6 +136,15 @@ resourceMetrics:
- key: resource.foo
value:
stringValue: foo
- key: signaltometrics.service.instance.id
value:
stringValue: 627cc493-f310-47de-96bd-71410b7dec09
- key: signaltometrics.service.name
value:
stringValue: signaltometrics
- key: signaltometrics.service.namespace
value:
stringValue: test
scopeMetrics:
- metrics:
- description: Logrecords with resource attribute foo as histogram with log.duration from attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ resourceMetrics:
- key: resource.foo
value:
stringValue: foo
- key: signaltometrics.service.instance.id
value:
stringValue: 627cc493-f310-47de-96bd-71410b7dec09
- key: signaltometrics.service.name
value:
stringValue: signaltometrics
- key: signaltometrics.service.namespace
value:
stringValue: test
scopeMetrics:
- metrics:
- description: Count total number of log records
Expand Down Expand Up @@ -57,6 +66,15 @@ resourceMetrics:
- key: resource.foo
value:
stringValue: foo
- key: signaltometrics.service.instance.id
value:
stringValue: 627cc493-f310-47de-96bd-71410b7dec09
- key: signaltometrics.service.name
value:
stringValue: signaltometrics
- key: signaltometrics.service.namespace
value:
stringValue: test
scopeMetrics:
- metrics:
- description: Count total number of log records with resource attribute foo
Expand Down
Loading
Loading