From 2612314ed7652a1379381e5b9068352a4af18fa5 Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Fri, 8 Nov 2024 22:09:11 -0500 Subject: [PATCH] [connector/routing] Support metric routing (#36236) --- .chloggen/routing-by-metrics.yaml | 27 +++ connector/routingconnector/README.md | 4 +- connector/routingconnector/config.go | 2 +- connector/routingconnector/config_test.go | 16 ++ .../internal/pmetricutil/metrics.go | 43 ++++ .../internal/pmetricutil/metrics_test.go | 144 ++++++++++++ .../internal/pmetricutiltest/metrics.go | 56 +++++ .../internal/pmetricutiltest/metrics_test.go | 50 +++++ connector/routingconnector/logs_test.go | 32 ++- connector/routingconnector/metrics.go | 10 + connector/routingconnector/metrics_test.go | 205 +++++++++++++++++- connector/routingconnector/router.go | 24 +- 12 files changed, 588 insertions(+), 25 deletions(-) create mode 100644 .chloggen/routing-by-metrics.yaml diff --git a/.chloggen/routing-by-metrics.yaml b/.chloggen/routing-by-metrics.yaml new file mode 100644 index 000000000000..070210e19a55 --- /dev/null +++ b/.chloggen/routing-by-metrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: routingconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add ability to route by metric context + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36236] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/connector/routingconnector/README.md b/connector/routingconnector/README.md index 6a45f230fb20..02ad40317832 100644 --- a/connector/routingconnector/README.md +++ b/connector/routingconnector/README.md @@ -33,7 +33,7 @@ If you are not already familiar with connectors, you may find it helpful to firs The following settings are available: - `table (required)`: the routing table for this connector. -- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `log`, and `request` are supported. +- `table.context (optional, default: resource)`: the [OTTL Context] in which the statement will be evaluated. Currently, only `resource`, `metric`, `log`, and `request` are supported. - `table.statement`: the routing condition provided as the [OTTL] statement. Required if `table.condition` is not provided. May not be used for `request` context. - `table.condition`: the routing condition provided as the [OTTL] condition. Required if `table.statement` is not provided. Required for `request` context. - `table.pipelines (required)`: the list of pipelines to use when the routing condition is met. @@ -43,7 +43,7 @@ The following settings are available: ### Limitations -- The `match_once` setting is only supported when using the `resource` context. If any routes use `log` or `request` context, `match_once` must be set to `true`. +- The `match_once` setting is only supported when using the `resource` context. If any routes use `metric`, `log` or `request` context, `match_once` must be set to `true`. - The `request` context requires use of the `condition` setting, and relies on a very limited grammar. Conditions must be in the form of `request["key"] == "value"` or `request["key"] != "value"`. (In the future, this grammar may be expanded to support more complex conditions.) ### Supported [OTTL] functions diff --git a/connector/routingconnector/config.go b/connector/routingconnector/config.go index f526ec460ab9..fb2f838474c7 100644 --- a/connector/routingconnector/config.go +++ b/connector/routingconnector/config.go @@ -77,7 +77,7 @@ func (c *Config) Validate() error { return err } fallthrough - case "log": // ok + case "metric", "log": // ok if !c.MatchOnce { return fmt.Errorf(`%q context is not supported with "match_once: false"`, item.Context) } diff --git a/connector/routingconnector/config_test.go b/connector/routingconnector/config_test.go index 0cd0456ec8af..b79eb4ee1bf3 100644 --- a/connector/routingconnector/config_test.go +++ b/connector/routingconnector/config_test.go @@ -218,6 +218,22 @@ func TestValidateConfig(t *testing.T) { }, error: "invalid context: invalid", }, + { + name: "metric context with match_once false", + config: &Config{ + MatchOnce: false, + Table: []RoutingTableItem{ + { + Context: "metric", + Statement: `route() where attributes["attr"] == "acme"`, + Pipelines: []pipeline.ID{ + pipeline.NewIDWithName(pipeline.SignalTraces, "otlp"), + }, + }, + }, + }, + error: `"metric" context is not supported with "match_once: false"`, + }, { name: "log context with match_once false", config: &Config{ diff --git a/connector/routingconnector/internal/pmetricutil/metrics.go b/connector/routingconnector/internal/pmetricutil/metrics.go index 1ca6d23b1ad7..58199dc02fe8 100644 --- a/connector/routingconnector/internal/pmetricutil/metrics.go +++ b/connector/routingconnector/internal/pmetricutil/metrics.go @@ -16,3 +16,46 @@ func MoveResourcesIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics) b return true }) } + +// MoveMetricsWithContextIf calls f sequentially for each Metric present in the first pmetric.Metrics. +// If f returns true, the element is removed from the first pmetric.Metrics and added to the second pmetric.Metrics. +// Notably, the Resource and Scope associated with the Metric are created in the second pmetric.Metrics only once. +// Resources or Scopes are removed from the original if they become empty. All ordering is preserved. +func MoveMetricsWithContextIf(from, to pmetric.Metrics, f func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool) { + rms := from.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + sms := rm.ScopeMetrics() + var rmCopy *pmetric.ResourceMetrics + for j := 0; j < sms.Len(); j++ { + sm := sms.At(j) + ms := sm.Metrics() + var smCopy *pmetric.ScopeMetrics + ms.RemoveIf(func(m pmetric.Metric) bool { + if !f(rm, sm, m) { + return false + } + if rmCopy == nil { + rmc := to.ResourceMetrics().AppendEmpty() + rmCopy = &rmc + rm.Resource().CopyTo(rmCopy.Resource()) + rmCopy.SetSchemaUrl(rm.SchemaUrl()) + } + if smCopy == nil { + smc := rmCopy.ScopeMetrics().AppendEmpty() + smCopy = &smc + sm.Scope().CopyTo(smCopy.Scope()) + smCopy.SetSchemaUrl(sm.SchemaUrl()) + } + m.CopyTo(smCopy.Metrics().AppendEmpty()) + return true + }) + } + sms.RemoveIf(func(sm pmetric.ScopeMetrics) bool { + return sm.Metrics().Len() == 0 + }) + } + rms.RemoveIf(func(rm pmetric.ResourceMetrics) bool { + return rm.ScopeMetrics().Len() == 0 + }) +} diff --git a/connector/routingconnector/internal/pmetricutil/metrics_test.go b/connector/routingconnector/internal/pmetricutil/metrics_test.go index 5b3d751c6826..8c23b4232246 100644 --- a/connector/routingconnector/internal/pmetricutil/metrics_test.go +++ b/connector/routingconnector/internal/pmetricutil/metrics_test.go @@ -80,3 +80,147 @@ func TestMoveResourcesIf(t *testing.T) { }) } } + +func TestMoveMetricsWithContextIf(t *testing.T) { + testCases := []struct { + name string + moveIf func(pmetric.ResourceMetrics, pmetric.ScopeMetrics, pmetric.Metric) bool + from pmetric.Metrics + to pmetric.Metrics + expectFrom pmetric.Metrics + expectTo pmetric.Metrics + }{ + { + name: "move_none", + moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool { + return false + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectTo: pmetric.NewMetrics(), + }, + { + name: "move_all", + moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool { + return true + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetric.NewMetrics(), + expectTo: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + }, + { + name: "move_all_from_one_resource", + moveIf: func(rl pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, _ pmetric.Metric) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceB" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + expectTo: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), + }, + { + name: "move_all_from_one_scope", + moveIf: func(rl pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceB" && sl.Scope().Name() == "scopeC" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + ), + pmetricutiltest.WithResource('B', + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + ), + ), + expectTo: pmetricutiltest.NewMetrics("B", "C", "EF", "GH"), + }, + { + name: "move_all_from_one_scope_in_each_resource", + moveIf: func(_ pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool { + return sl.Scope().Name() == "scopeD" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"), + expectTo: pmetricutiltest.NewMetrics("AB", "D", "EF", "GH"), + }, + { + name: "move_one", + moveIf: func(rl pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, m pmetric.Metric) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceA" && sl.Scope().Name() == "scopeD" && m.Name() == "metricF" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH")), + ), + pmetricutiltest.WithResource('B', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + ), + ), + expectTo: pmetricutiltest.NewMetrics("A", "D", "F", "GH"), + }, + { + name: "move_one_from_each_scope", + moveIf: func(_ pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, m pmetric.Metric) bool { + return m.Name() == "metricE" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"), + expectTo: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"), + }, + { + name: "move_one_from_each_scope_in_one_resource", + moveIf: func(rl pmetric.ResourceMetrics, _ pmetric.ScopeMetrics, m pmetric.Metric) bool { + rname, ok := rl.Resource().Attributes().Get("resourceName") + return ok && rname.AsString() == "resourceB" && m.Name() == "metricE" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetric.NewMetrics(), + expectFrom: pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + ), + pmetricutiltest.WithResource('B', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('F', "GH")), + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('F', "GH")), + ), + ), + expectTo: pmetricutiltest.NewMetrics("B", "CD", "E", "GH"), + }, + { + name: "move_some_to_preexisting", + moveIf: func(_ pmetric.ResourceMetrics, sl pmetric.ScopeMetrics, _ pmetric.Metric) bool { + return sl.Scope().Name() == "scopeD" + }, + from: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + to: pmetricutiltest.NewMetrics("1", "2", "3", "4"), + expectFrom: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"), + expectTo: pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('1', pmetricutiltest.WithScope('2', pmetricutiltest.WithMetric('3', "4"))), + pmetricutiltest.WithResource('A', pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH"))), + pmetricutiltest.WithResource('B', pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH"))), + ), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + pmetricutil.MoveMetricsWithContextIf(tt.from, tt.to, tt.moveIf) + assert.NoError(t, pmetrictest.CompareMetrics(tt.expectFrom, tt.from), "from not modified as expected") + assert.NoError(t, pmetrictest.CompareMetrics(tt.expectTo, tt.to), "to not as expected") + }) + } +} diff --git a/connector/routingconnector/internal/pmetricutiltest/metrics.go b/connector/routingconnector/internal/pmetricutiltest/metrics.go index ce8b2cb06d5e..a908e1638e63 100644 --- a/connector/routingconnector/internal/pmetricutiltest/metrics.go +++ b/connector/routingconnector/internal/pmetricutiltest/metrics.go @@ -43,3 +43,59 @@ func NewMetrics(resourceIDs, scopeIDs, metricIDs, dataPointIDs string) pmetric.M } return md } + +type Resource struct { + id byte + scopes []Scope +} + +type Scope struct { + id byte + metrics []Metric +} + +type Metric struct { + id byte + dataPoints string +} + +func WithResource(id byte, scopes ...Scope) Resource { + r := Resource{id: id} + r.scopes = append(r.scopes, scopes...) + return r +} + +func WithScope(id byte, metrics ...Metric) Scope { + s := Scope{id: id} + s.metrics = append(s.metrics, metrics...) + return s +} + +func WithMetric(id byte, dataPoints string) Metric { + return Metric{id: id, dataPoints: dataPoints} +} + +// NewMetricsFromOpts creates a pmetric.Metrics with the specified resources, scopes, metrics, +// and data points. The general idea is the same as NewMetrics, but this function allows for +// more flexibility in creating non-uniform structures. +func NewMetricsFromOpts(resources ...Resource) pmetric.Metrics { + md := pmetric.NewMetrics() + for _, resource := range resources { + r := md.ResourceMetrics().AppendEmpty() + r.Resource().Attributes().PutStr("resourceName", "resource"+string(resource.id)) + for _, scope := range resource.scopes { + s := r.ScopeMetrics().AppendEmpty() + s.Scope().SetName("scope" + string(scope.id)) + for _, metric := range scope.metrics { + m := s.Metrics().AppendEmpty() + m.SetName("metric" + string(metric.id)) + dps := m.SetEmptyGauge().DataPoints() + for i := 0; i < len(metric.dataPoints); i++ { + dp := dps.AppendEmpty() + dp.Attributes().PutStr("dpName", "dp"+string(metric.dataPoints[i])) + } + } + } + } + return md +} diff --git a/connector/routingconnector/internal/pmetricutiltest/metrics_test.go b/connector/routingconnector/internal/pmetricutiltest/metrics_test.go index 069a27f8282c..3be7405a1e14 100644 --- a/connector/routingconnector/internal/pmetricutiltest/metrics_test.go +++ b/connector/routingconnector/internal/pmetricutiltest/metrics_test.go @@ -18,6 +18,7 @@ func TestNewMetrics(t *testing.T) { t.Run("empty", func(t *testing.T) { expected := pmetric.NewMetrics() assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetrics("", "", "", ""))) + assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetricsFromOpts())) }) t.Run("simple", func(t *testing.T) { @@ -34,7 +35,15 @@ func TestNewMetrics(t *testing.T) { dp.Attributes().PutStr("dpName", "dpD") // resourceA.scopeB.metricC.dpD return md }() + fromOpts := pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('B', + pmetricutiltest.WithMetric('C', "D"), + ), + ), + ) assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetrics("A", "B", "C", "D"))) + assert.NoError(t, pmetrictest.CompareMetrics(expected, fromOpts)) }) t.Run("two_resources", func(t *testing.T) { @@ -60,7 +69,20 @@ func TestNewMetrics(t *testing.T) { dp.Attributes().PutStr("dpName", "dpE") // resourceB.scopeC.metricD.dpE return md }() + fromOpts := pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('C', + pmetricutiltest.WithMetric('D', "E"), + ), + ), + pmetricutiltest.WithResource('B', + pmetricutiltest.WithScope('C', + pmetricutiltest.WithMetric('D', "E"), + ), + ), + ) assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetrics("AB", "C", "D", "E"))) + assert.NoError(t, pmetrictest.CompareMetrics(expected, fromOpts)) }) t.Run("two_scopes", func(t *testing.T) { @@ -84,7 +106,18 @@ func TestNewMetrics(t *testing.T) { dp.Attributes().PutStr("dpName", "dpE") // resourceA.scopeC.metricD.dpE return md }() + fromOpts := pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('B', + pmetricutiltest.WithMetric('D', "E"), + ), + pmetricutiltest.WithScope('C', + pmetricutiltest.WithMetric('D', "E"), + ), + ), + ) assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetrics("A", "BC", "D", "E"))) + assert.NoError(t, pmetrictest.CompareMetrics(expected, fromOpts)) }) t.Run("two_metrics", func(t *testing.T) { @@ -106,7 +139,16 @@ func TestNewMetrics(t *testing.T) { dp.Attributes().PutStr("dpName", "dpE") // resourceA.scopeB.metricD.dpE return md }() + fromOpts := pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('B', + pmetricutiltest.WithMetric('C', "E"), + pmetricutiltest.WithMetric('D', "E"), + ), + ), + ) assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetrics("A", "B", "CD", "E"))) + assert.NoError(t, pmetrictest.CompareMetrics(expected, fromOpts)) }) t.Run("two_datapoints", func(t *testing.T) { @@ -125,6 +167,14 @@ func TestNewMetrics(t *testing.T) { dp.Attributes().PutStr("dpName", "dpE") // resourceA.scopeB.metricC.dpE return md }() + fromOpts := pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('B', + pmetricutiltest.WithMetric('C', "DE"), + ), + ), + ) assert.NoError(t, pmetrictest.CompareMetrics(expected, pmetricutiltest.NewMetrics("A", "B", "C", "DE"))) + assert.NoError(t, pmetrictest.CompareMetrics(expected, fromOpts)) }) } diff --git a/connector/routingconnector/logs_test.go b/connector/routingconnector/logs_test.go index 24747154c213..c0198fe16523 100644 --- a/connector/routingconnector/logs_test.go +++ b/connector/routingconnector/logs_test.go @@ -475,22 +475,20 @@ func TestLogsConnectorDetailed(t *testing.T) { isAcme := `request["X-Tenant"] == "acme"` - isAnyResource := `attributes["resourceName"] != nil` isResourceA := `attributes["resourceName"] == "resourceA"` isResourceB := `attributes["resourceName"] == "resourceB"` isResourceX := `attributes["resourceName"] == "resourceX"` isResourceY := `attributes["resourceName"] == "resourceY"` - isScopeC := `instrumentation_scope.name == "scopeC"` - isScopeD := `instrumentation_scope.name == "scopeD"` - - isAnyLog := `body != nil` isLogE := `body == "logE"` isLogF := `body == "logF"` isLogX := `body == "logX"` isLogY := `body == "logY"` - and, or := " and ", " or " + isScopeCFromLowerContext := `instrumentation_scope.name == "scopeC"` + isScopeDFromLowerContext := `instrumentation_scope.name == "scopeD"` + + isResourceBFromLowerContext := `resource.attributes["resourceName"] == "resourceB"` testCases := []struct { name string @@ -594,7 +592,7 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "resource/all_match_first_only", cfg: testConfig( - withRoute("resource", isAnyResource, idSink0), + withRoute("resource", "true", idSink0), withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), @@ -607,7 +605,7 @@ func TestLogsConnectorDetailed(t *testing.T) { name: "resource/all_match_last_only", cfg: testConfig( withRoute("resource", isResourceX, idSink0), - withRoute("resource", isAnyResource, idSink1), + withRoute("resource", "true", idSink1), withDefault(idSinkD), ), input: plogutiltest.NewLogs("AB", "CD", "EF"), @@ -618,8 +616,8 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "resource/all_match_only_once", cfg: testConfig( - withRoute("resource", isAnyResource, idSink0), - withRoute("resource", isResourceA+or+isResourceB, idSink1), + withRoute("resource", "true", idSink0), + withRoute("resource", isResourceA+" or "+isResourceB, idSink1), withDefault(idSinkD), ), input: plogutiltest.NewLogs("AB", "CD", "EF"), @@ -688,7 +686,7 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "log/all_match_first_only", cfg: testConfig( - withRoute("log", isAnyLog, idSink0), + withRoute("log", "true", idSink0), withRoute("log", isLogY, idSink1), withDefault(idSinkD), ), @@ -701,7 +699,7 @@ func TestLogsConnectorDetailed(t *testing.T) { name: "log/all_match_last_only", cfg: testConfig( withRoute("log", isLogX, idSink0), - withRoute("log", isAnyLog, idSink1), + withRoute("log", "true", idSink1), withDefault(idSinkD), ), input: plogutiltest.NewLogs("AB", "CD", "EF"), @@ -712,8 +710,8 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "log/all_match_only_once", cfg: testConfig( - withRoute("log", isAnyLog, idSink0), - withRoute("log", isLogE+or+isLogF, idSink1), + withRoute("log", "true", idSink0), + withRoute("log", isLogE+" or "+isLogF, idSink1), withDefault(idSinkD), ), input: plogutiltest.NewLogs("AB", "CD", "EF"), @@ -782,7 +780,7 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "log/with_resource_condition", cfg: testConfig( - withRoute("log", "resource."+isResourceB+and+isAnyLog, idSink0), + withRoute("log", isResourceBFromLowerContext, idSink0), withRoute("log", isLogY, idSink1), withDefault(idSinkD), ), @@ -794,7 +792,7 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "log/with_scope_condition", cfg: testConfig( - withRoute("log", isScopeC+and+isAnyLog, idSink0), + withRoute("log", isScopeCFromLowerContext, idSink0), withRoute("log", isLogY, idSink1), withDefault(idSinkD), ), @@ -806,7 +804,7 @@ func TestLogsConnectorDetailed(t *testing.T) { { name: "log/with_resource_and_scope_conditions", cfg: testConfig( - withRoute("log", "resource."+isResourceB+and+isScopeD+and+isAnyLog, idSink0), + withRoute("log", isResourceBFromLowerContext+" and "+isScopeDFromLowerContext, idSink0), withRoute("log", isLogY, idSink1), withDefault(idSinkD), ), diff --git a/connector/routingconnector/metrics.go b/connector/routingconnector/metrics.go index 8f25c586bf71..874d8c2d9887 100644 --- a/connector/routingconnector/metrics.go +++ b/connector/routingconnector/metrics.go @@ -15,6 +15,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/pmetricutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" ) @@ -88,6 +89,15 @@ func (c *metricsConnector) switchMetrics(ctx context.Context, md pmetric.Metrics return isMatch }, ) + case "metric": + pmetricutil.MoveMetricsWithContextIf(md, matchedMetrics, + func(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) bool { + mtx := ottlmetric.NewTransformContext(m, sm.Metrics(), sm.Scope(), rm.Resource(), sm, rm) + _, isMatch, err := route.metricStatement.Execute(ctx, mtx) + errs = errors.Join(errs, err) + return isMatch + }, + ) } if errs != nil { if c.config.ErrorMode == ottl.PropagateError { diff --git a/connector/routingconnector/metrics_test.go b/connector/routingconnector/metrics_test.go index 0fba4eabc748..f87a15ff613c 100644 --- a/connector/routingconnector/metrics_test.go +++ b/connector/routingconnector/metrics_test.go @@ -505,12 +505,21 @@ func TestMetricsConnectorDetailed(t *testing.T) { isAcme := `request["X-Tenant"] == "acme"` - isAnyResource := `attributes["resourceName"] != nil` isResourceA := `attributes["resourceName"] == "resourceA"` isResourceB := `attributes["resourceName"] == "resourceB"` isResourceX := `attributes["resourceName"] == "resourceX"` isResourceY := `attributes["resourceName"] == "resourceY"` + isMetricE := `name == "metricE"` + isMetricF := `name == "metricF"` + isMetricX := `name == "metricX"` + isMetricY := `name == "metricY"` + + isScopeCFromLowerContext := `instrumentation_scope.name == "scopeC"` + isScopeDFromLowerContext := `instrumentation_scope.name == "scopeD"` + + isResourceBFromLowerContext := `resource.attributes["resourceName"] == "resourceB"` + testCases := []struct { name string cfg *Config @@ -613,7 +622,7 @@ func TestMetricsConnectorDetailed(t *testing.T) { { name: "resource/all_match_first_only", cfg: testConfig( - withRoute("resource", isAnyResource, idSink0), + withRoute("resource", "true", idSink0), withRoute("resource", isResourceY, idSink1), withDefault(idSinkD), ), @@ -626,7 +635,7 @@ func TestMetricsConnectorDetailed(t *testing.T) { name: "resource/all_match_last_only", cfg: testConfig( withRoute("resource", isResourceX, idSink0), - withRoute("resource", isAnyResource, idSink1), + withRoute("resource", "true", idSink1), withDefault(idSinkD), ), input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), @@ -637,7 +646,7 @@ func TestMetricsConnectorDetailed(t *testing.T) { { name: "resource/all_match_only_once", cfg: testConfig( - withRoute("resource", isAnyResource, idSink0), + withRoute("resource", "true", idSink0), withRoute("resource", isResourceA+" or "+isResourceB, idSink1), withDefault(idSinkD), ), @@ -704,6 +713,168 @@ func TestMetricsConnectorDetailed(t *testing.T) { expectSink1: pmetric.Metrics{}, expectSinkD: pmetric.Metrics{}, }, + { + name: "metric/all_match_first_only", + cfg: testConfig( + withRoute("metric", "true", idSink0), + withRoute("metric", isMetricY, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetric.Metrics{}, + }, + { + name: "metric/all_match_last_only", + cfg: testConfig( + withRoute("metric", isMetricX, idSink0), + withRoute("metric", "true", idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSinkD: pmetric.Metrics{}, + }, + { + name: "metric/all_match_only_once", + cfg: testConfig( + withRoute("metric", "true", idSink0), + withRoute("metric", isMetricE+" or "+isMetricF, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetric.Metrics{}, + }, + { + name: "metric/each_matches_one", + cfg: testConfig( + withRoute("metric", isMetricE, idSink0), + withRoute("metric", isMetricF, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"), + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"), + expectSinkD: pmetric.Metrics{}, + }, + { + name: "metric/some_match_with_default", + cfg: testConfig( + withRoute("metric", isMetricX, idSink0), + withRoute("metric", isMetricF, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"), + expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"), + }, + { + name: "metric/some_match_without_default", + cfg: testConfig( + withRoute("metric", isMetricX, idSink0), + withRoute("metric", isMetricF, idSink1), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"), + expectSinkD: pmetric.Metrics{}, + }, + { + name: "metric/match_none_with_default", + cfg: testConfig( + withRoute("metric", isMetricX, idSink0), + withRoute("metric", isMetricY, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + }, + { + name: "metric/match_none_without_default", + cfg: testConfig( + withRoute("metric", isMetricX, idSink0), + withRoute("metric", isMetricY, idSink1), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetric.Metrics{}, + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetric.Metrics{}, + }, + { + name: "metric/with_resource_condition", + cfg: testConfig( + withRoute("metric", isResourceBFromLowerContext, idSink0), + withRoute("metric", isMetricY, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + }, + { + name: "metric/with_scope_condition", + cfg: testConfig( + withRoute("metric", isScopeCFromLowerContext, idSink0), + withRoute("metric", isMetricY, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "C", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetrics("AB", "D", "EF", "GH"), + }, + { + name: "metric/with_resource_and_scope_conditions", + cfg: testConfig( + withRoute("metric", isResourceBFromLowerContext+" and "+isScopeDFromLowerContext, idSink0), + withRoute("metric", isMetricY, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("B", "D", "EF", "GH"), + expectSink1: pmetric.Metrics{}, + expectSinkD: pmetricutiltest.NewMetricsFromOpts( + pmetricutiltest.WithResource('A', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + pmetricutiltest.WithScope('D', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + ), + pmetricutiltest.WithResource('B', + pmetricutiltest.WithScope('C', pmetricutiltest.WithMetric('E', "GH"), pmetricutiltest.WithMetric('F', "GH")), + ), + ), + }, + { + name: "mixed/match_resource_then_metrics", + cfg: testConfig( + withRoute("resource", isResourceA, idSink0), + withRoute("metric", isMetricE, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("A", "CD", "EF", "GH"), + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "E", "GH"), + expectSinkD: pmetricutiltest.NewMetrics("B", "CD", "F", "GH"), + }, + { + name: "mixed/match_metrics_then_resource", + cfg: testConfig( + withRoute("metric", isMetricE, idSink0), + withRoute("resource", isResourceB, idSink1), + withDefault(idSinkD), + ), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"), + expectSink1: pmetricutiltest.NewMetrics("B", "CD", "F", "GH"), + expectSinkD: pmetricutiltest.NewMetrics("A", "CD", "F", "GH"), + }, { name: "mixed/match_resource_then_grpc_request", cfg: testConfig( @@ -717,6 +888,19 @@ func TestMetricsConnectorDetailed(t *testing.T) { expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), expectSinkD: pmetric.Metrics{}, }, + { + name: "mixed/match_metrics_then_grpc_request", + cfg: testConfig( + withRoute("metric", isMetricF, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withGRPCMetadata(context.Background(), map[string]string{"X-Tenant": "acme"}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"), + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"), + expectSinkD: pmetric.Metrics{}, + }, { name: "mixed/match_resource_then_http_request", cfg: testConfig( @@ -730,6 +914,19 @@ func TestMetricsConnectorDetailed(t *testing.T) { expectSink1: pmetricutiltest.NewMetrics("B", "CD", "EF", "GH"), expectSinkD: pmetric.Metrics{}, }, + { + name: "mixed/match_metrics_then_http_request", + cfg: testConfig( + withRoute("metric", isMetricF, idSink0), + withRoute("request", isAcme, idSink1), + withDefault(idSinkD), + ), + ctx: withHTTPMetadata(context.Background(), map[string][]string{"X-Tenant": {"acme"}}), + input: pmetricutiltest.NewMetrics("AB", "CD", "EF", "GH"), + expectSink0: pmetricutiltest.NewMetrics("AB", "CD", "F", "GH"), + expectSink1: pmetricutiltest.NewMetrics("AB", "CD", "E", "GH"), + expectSinkD: pmetric.Metrics{}, + }, } for _, tt := range testCases { diff --git a/connector/routingconnector/router.go b/connector/routingconnector/router.go index 9114695bab67..01dd13143261 100644 --- a/connector/routingconnector/router.go +++ b/connector/routingconnector/router.go @@ -15,6 +15,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/internal/common" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" ) @@ -31,6 +32,7 @@ type consumerProvider[C any] func(...pipeline.ID) (C, error) type router[C any] struct { logger *zap.Logger resourceParser ottl.Parser[ottlresource.TransformContext] + metricParser ottl.Parser[ottlmetric.TransformContext] logParser ottl.Parser[ottllog.TransformContext] table []RoutingTableItem @@ -72,15 +74,18 @@ type routingItem[C any] struct { statementContext string requestCondition *requestCondition resourceStatement *ottl.Statement[ottlresource.TransformContext] + metricStatement *ottl.Statement[ottlmetric.TransformContext] logStatement *ottl.Statement[ottllog.TransformContext] } func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.TelemetrySettings) error { - var buildResource, buildLog bool + var buildResource, buildMetric, buildLog bool for _, item := range table { switch item.Context { case "", "resource": buildResource = true + case "metric": + buildMetric = true case "log": buildLog = true } @@ -98,6 +103,17 @@ func (r *router[C]) buildParsers(table []RoutingTableItem, settings component.Te errs = errors.Join(errs, err) } } + if buildMetric { + parser, err := ottlmetric.NewParser( + common.Functions[ottlmetric.TransformContext](), + settings, + ) + if err == nil { + r.metricParser = parser + } else { + errs = errors.Join(errs, err) + } + } if buildLog { parser, err := ottllog.NewParser( common.Functions[ottllog.TransformContext](), @@ -174,6 +190,12 @@ func (r *router[C]) registerRouteConsumers() (err error) { return err } route.resourceStatement = statement + case "metric": + statement, err := r.metricParser.ParseStatement(item.Statement) + if err != nil { + return err + } + route.metricStatement = statement case "log": statement, err := r.logParser.ParseStatement(item.Statement) if err != nil {