Skip to content

Commit

Permalink
[processor/metricstransformprocessor]: Support median aggregation type
Browse files Browse the repository at this point in the history
Signed-off-by: odubajDT <ondrej.dubaj@dynatrace.com>
  • Loading branch information
odubajDT committed Jul 18, 2024
1 parent 2997a90 commit fe9b1f4
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 346 deletions.
27 changes: 27 additions & 0 deletions .chloggen/aggregation-metricstransformprocessor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: metricstransformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adds the 'median' aggregation type to the Metrics Transform Processor. Also uses the refactored aggregation business logic from internal/core package."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [16224]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 6 additions & 2 deletions processor/metricstransformprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ processors:
# new_name specifies the updated name of the metric; if action is insert or combine, new_name is required
new_name: <new_metric_name_inserted>
# aggregation_type defines how combined data points will be aggregated; if action is combine, aggregation_type is required
aggregation_type: {sum, mean, min, max, count}
aggregation_type: {sum, mean, min, max, count, median}
# submatch_case specifies the case that should be used when adding label values based on regexp submatches when performing a combine action; leave blank to use the submatch value as is
submatch_case: {lower, upper}
# operations contain a list of operations that will be performed on the resulting metric(s)
Expand All @@ -106,7 +106,7 @@ processors:
# label_set contains a list of labels that will remain after aggregation; if action is aggregate_labels, label_set is required
label_set: [labels...]
# aggregation_type defines how data points will be aggregated; if action is aggregate_labels or aggregate_label_values, aggregation_type is required
aggregation_type: {sum, mean, min, max, count}
aggregation_type: {sum, mean, min, max, count, median}
# experimental_scale specifies the scalar to apply to values
experimental_scale: <scalar>
# value_actions contain a list of operations that will be performed on the selected label
Expand Down Expand Up @@ -273,6 +273,8 @@ operations:
aggregation_type: sum
```
**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes.

### Aggregate label values
```yaml
# aggregate data points with state label value slab_reclaimable & slab_unreclaimable using summation into slab
Expand All @@ -286,6 +288,8 @@ operations:
aggregation_type: sum
```

**NOTE:** Only the `sum` aggregation function is supported for histogram and exponential histogram datatypes.

### Combine metrics
```yaml
# convert a set of metrics for each http_method into a single metric with an http_method label, i.e.
Expand Down
38 changes: 4 additions & 34 deletions processor/metricstransformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package metricstransformprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor"

import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"

const (
// includeFieldName is the mapstructure field name for Include field
includeFieldName = "include"
Expand Down Expand Up @@ -75,7 +77,7 @@ type transform struct {

// AggregationType specifies how to aggregate.
// REQUIRED only if Action is COMBINE.
AggregationType aggregationType `mapstructure:"aggregation_type"`
AggregationType aggregateutil.AggregationType `mapstructure:"aggregation_type"`

// SubmatchCase specifies what case to use for label values created from regexp submatches.
SubmatchCase submatchCase `mapstructure:"submatch_case"`
Expand Down Expand Up @@ -112,7 +114,7 @@ type Operation struct {
LabelSet []string `mapstructure:"label_set"`

// AggregationType specifies how to aggregate.
AggregationType aggregationType `mapstructure:"aggregation_type"`
AggregationType aggregateutil.AggregationType `mapstructure:"aggregation_type"`

// AggregatedValues is a list of label values to aggregate away.
AggregatedValues []string `mapstructure:"aggregated_values"`
Expand Down Expand Up @@ -216,38 +218,6 @@ func (oa operationAction) isValid() bool {
return false
}

// aggregationType is the enum to capture the three types of aggregation for the aggregation operation.
type aggregationType string

const (
// sum indicates taking the sum of the aggregated data.
sum aggregationType = "sum"

// mean indicates taking the mean of the aggregated data.
mean aggregationType = "mean"

// min indicates taking the minimum of the aggregated data.
min aggregationType = "min"

// max indicates taking the max of the aggregated data.
max aggregationType = "max"

// count indicates taking the count of the aggregated data.
count aggregationType = "count"
)

var aggregationTypes = []aggregationType{sum, mean, min, max, count}

func (at aggregationType) isValid() bool {
for _, aggregationType := range aggregationTypes {
if at == aggregationType {
return true
}
}

return false
}

// matchType is the enum to capture the two types of matching metric(s) that should have operations applied to them.
type matchType string

Expand Down
9 changes: 5 additions & 4 deletions processor/metricstransformprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor/internal/metadata"
)

Expand Down Expand Up @@ -88,8 +89,8 @@ func validateConfiguration(config *Config) error {
return fmt.Errorf("missing required field %q while %q is %v", groupResourceLabelsFieldName, actionFieldName, Group)
}

if transform.AggregationType != "" && !transform.AggregationType.isValid() {
return fmt.Errorf("%q must be in %q", aggregationTypeFieldName, aggregationTypes)
if transform.AggregationType != "" && !transform.AggregationType.IsValid() {
return fmt.Errorf("%q must be in %q", aggregationTypeFieldName, aggregateutil.AggregationTypes)
}

if transform.SubmatchCase != "" && !transform.SubmatchCase.isValid() {
Expand All @@ -114,8 +115,8 @@ func validateConfiguration(config *Config) error {
return fmt.Errorf("operation %v: missing required field %q while %q is %v", i+1, scaleFieldName, actionFieldName, scaleValue)
}

if op.AggregationType != "" && !op.AggregationType.isValid() {
return fmt.Errorf("operation %v: %q must be in %q", i+1, aggregationTypeFieldName, aggregationTypes)
if op.AggregationType != "" && !op.AggregationType.IsValid() {
return fmt.Errorf("operation %v: %q must be in %q", i+1, aggregationTypeFieldName, aggregateutil.AggregationTypes)
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions processor/metricstransformprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor/internal/metadata"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func TestCreateProcessors(t *testing.T) {
{
configName: "config_invalid_aggregationtype.yaml",
succeed: false,
errorMessage: fmt.Sprintf("%q must be in %q", aggregationTypeFieldName, aggregationTypes),
errorMessage: fmt.Sprintf("%q must be in %q", aggregationTypeFieldName, aggregateutil.AggregationTypes),
},
{
configName: "config_invalid_operation_action.yaml",
Expand All @@ -97,7 +98,7 @@ func TestCreateProcessors(t *testing.T) {
{
configName: "config_invalid_operation_aggregationtype.yaml",
succeed: false,
errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, aggregationTypeFieldName, aggregationTypes),
errorMessage: fmt.Sprintf("operation %v: %q must be in %q", 1, aggregationTypeFieldName, aggregateutil.AggregationTypes),
},
{
configName: "config_invalid_submatchcase.yaml",
Expand Down Expand Up @@ -221,14 +222,14 @@ func TestCreateProcessorsFilledData(t *testing.T) {
{
Action: aggregateLabels,
LabelSet: []string{"label1", "label2"},
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
{
Action: aggregateLabelValues,
Label: "label",
AggregatedValues: []string{"value1", "value2"},
NewValue: "new-value",
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
},
},
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestCreateProcessorsFilledData(t *testing.T) {
configOperation: Operation{
Action: aggregateLabels,
LabelSet: []string{"label1", "label2"},
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
labelSetMap: map[string]bool{
"label1": true,
Expand All @@ -278,7 +279,7 @@ func TestCreateProcessorsFilledData(t *testing.T) {
Label: "label",
AggregatedValues: []string{"value1", "value2"},
NewValue: "new-value",
AggregationType: sum,
AggregationType: aggregateutil.Sum,
},
aggregatedValuesSet: map[string]bool{
"value1": true,
Expand Down
3 changes: 3 additions & 0 deletions processor/metricstransformprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/metri
go 1.21.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.105.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.105.0
github.com/stretchr/testify v1.9.0
Expand Down Expand Up @@ -73,3 +74,5 @@ retract (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
)

type metricsTransformProcessor struct {
Expand All @@ -23,7 +25,7 @@ type internalTransform struct {
Action ConfigAction
NewName string
GroupResourceLabels map[string]string
AggregationType aggregationType
AggregationType aggregateutil.AggregationType
SubmatchCase submatchCase
Operations []internalOperation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/aggregateutil"
)

// extractAndRemoveMatchedMetrics extracts matched metrics from ms metric slice and returns a new slice.
Expand Down Expand Up @@ -440,6 +442,17 @@ func combine(transform internalTransform, metrics pmetric.MetricSlice) pmetric.M
return combinedMetric
}

// groupMetrics groups all the provided timeseries that will be aggregated together based on all the label values.
// Returns a map of grouped timeseries and the corresponding selected labels
// canBeCombined must be callled before.
func groupMetrics(metrics pmetric.MetricSlice, aggType aggregateutil.AggregationType, to pmetric.Metric) {
ag := aggregateutil.AggGroups{}
for i := 0; i < metrics.Len(); i++ {
aggregateutil.GroupDataPoints(metrics.At(i), &ag)
}
aggregateutil.MergeDataPoints(to, aggType, ag)
}

func copyMetricDetails(from, to pmetric.Metric) {
to.SetName(from.Name())
to.SetUnit(from.Unit())
Expand Down Expand Up @@ -541,7 +554,13 @@ func transformMetric(metric pmetric.Metric, transform internalTransform) bool {
updateLabelOp(metric, op, transform.MetricIncludeFilter)
case aggregateLabels:
if canChangeMetric {
aggregateLabelsOp(metric, op)
ops := []string{}
for k, v := range op.labelSetMap {
if v {
ops = append(ops, k)
}
}
aggregateLabelsOp(metric, ops, op.configOperation.AggregationType)
}
case aggregateLabelValues:
if canChangeMetric {
Expand Down
Loading

0 comments on commit fe9b1f4

Please sign in to comment.