Skip to content

Commit

Permalink
Implement "split" metric action
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan committed Jul 7, 2022
1 parent 886822b commit 2b45e48
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 126 deletions.
1 change: 0 additions & 1 deletion .idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 17 additions & 17 deletions schema/ast/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,30 @@ type VersionOfMetrics struct {
}

type MetricTranslationAction struct {
RenameMetrics map[types.MetricName]types.MetricName `yaml:"rename_metrics"`
RenameLabels *LabelMapForMetrics `yaml:"rename_labels"`
AddLabels *LabelMapForMetrics `yaml:"add_labels"`
DuplicateLabels *LabelMapForMetrics `yaml:"duplicate_labels"`
Split *SplitMetric `yaml:"split"`
Merge *MergeMetric `yaml:"merge"`
ToDelta []types.MetricName `yaml:"to_delta"`
RenameMetrics map[types.MetricName]types.MetricName `yaml:"rename_metrics"`
RenameLabels *AttributeMapForMetrics `yaml:"rename_attributes"`
AddAttributes *AttributeMapForMetrics `yaml:"add_attributes"`
DuplicateAttributes *AttributeMapForMetrics `yaml:"duplicate_attributes"`
Split *SplitMetric `yaml:"split"`
Merge *MergeMetric `yaml:"merge"`
ToDelta []types.MetricName `yaml:"to_delta"`
}

type LabelMapForMetrics struct {
type AttributeMapForMetrics struct {
ApplyToMetrics []types.MetricName `yaml:"apply_to_metrics"`
LabelMap map[string]string `yaml:"label_map"`
AttributeMap map[string]string `yaml:"label_map"`
}

type SplitMetric struct {
ApplyToMetric types.MetricName `yaml:"apply_to_metric"`
ByLabel string `yaml:"by_label"`
LabelsToMetrics map[types.LabelValue]types.MetricName `yaml:"labels_to_metrics"`
ApplyToMetric types.MetricName `yaml:"apply_to_metric"`
ByAttribute types.AttributeName `yaml:"by_attribute"`
AttributesToMetrics map[types.AttributeValue]types.MetricName `yaml:"attributes_to_metrics"`
}

type MergeMetric struct {
CreateMetric types.MetricName `yaml:"create_metric"`
ByLabel string `yaml:"by_label"`
LabelsForMetrics map[types.LabelValue]types.MetricName `yaml:"labels_for_metrics"`
CreateMetric types.MetricName `yaml:"create_metric"`
ByAttribute string `yaml:"by_attribute"`
AttributesForMetrics map[types.AttributeValue]types.MetricName `yaml:"attributes_for_metrics"`
}

type MetricSchema struct {
Expand All @@ -40,10 +40,10 @@ type MetricSchema struct {
ValueType string `yaml:"value_type"`
Temporality string
Monotonic bool
Labels map[string]LabelSchema
Attributes map[string]AttributesSchema
}

type LabelSchema struct {
type AttributesSchema struct {
Values []string
Description string
Required string
Expand Down
74 changes: 39 additions & 35 deletions schema/compiled/compiled_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
otlpmetric "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
otlpresource "github.com/open-telemetry/opentelemetry-proto/gen/go/resource/v1"
otlptrace "github.com/open-telemetry/opentelemetry-proto/gen/go/trace/v1"

"github.com/tigrannajaryan/telemetry-schema/schema/types"
)

Expand Down Expand Up @@ -39,24 +38,18 @@ func (acts ResourceActions) Apply(resource *otlpresource.Resource) error {
}

type MetricActions struct {
ByName map[types.MetricName][]MetricAction
OtherMetrics []MetricAction
Actions []MetricAction
}

func (acts MetricActions) Apply(metric *otlpmetric.Metric) error {
metricName := metric.MetricDescriptor.Name
actions, exists := acts.ByName[types.MetricName(metricName)]
if !exists {
actions = acts.OtherMetrics
}

for _, a := range actions {
err := a.Apply(metric)
func (acts MetricActions) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) {
for _, a := range acts.Actions {
var err error
metrics, err = a.Apply(metrics)
if err != nil {
return err
return metrics, err
}
}
return nil
return metrics, nil
}

type ResourceAction interface {
Expand All @@ -82,7 +75,7 @@ func (acts SpanActions) Apply(span *otlptrace.Span) error {
}

type MetricAction interface {
Apply(metric *otlpmetric.Metric) error
Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error)
}

//type LogRecordAction interface {
Expand All @@ -101,11 +94,15 @@ func (afv ActionsForVersions) Swap(i, j int) {
afv[i], afv[j] = afv[j], afv[i]
}

func (s *Schema) ConvertResourceToLatest(fromVersion types.TelemetryVersion, resource *otlpresource.Resource) error {
startIndex := sort.Search(len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
})
func (s *Schema) ConvertResourceToLatest(
fromVersion types.TelemetryVersion, resource *otlpresource.Resource,
) error {
startIndex := sort.Search(
len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
},
)
if startIndex > len(s.Versions) {
// Nothing to do
return nil
Expand All @@ -120,11 +117,15 @@ func (s *Schema) ConvertResourceToLatest(fromVersion types.TelemetryVersion, res
return nil
}

func (s *Schema) ConvertSpansToLatest(fromVersion types.TelemetryVersion, spans []*otlptrace.Span) error {
startIndex := sort.Search(len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
})
func (s *Schema) ConvertSpansToLatest(
fromVersion types.TelemetryVersion, spans []*otlptrace.Span,
) error {
startIndex := sort.Search(
len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
},
)
if startIndex > len(s.Versions) {
// Nothing to do
return nil
Expand All @@ -142,22 +143,25 @@ func (s *Schema) ConvertSpansToLatest(fromVersion types.TelemetryVersion, spans
return nil
}

func (s *Schema) ConvertMetricsToLatest(fromVersion types.TelemetryVersion, metrics []*otlpmetric.Metric) error {
startIndex := sort.Search(len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
})
func (s *Schema) ConvertMetricsToLatest(
fromVersion types.TelemetryVersion, metrics *[]*otlpmetric.Metric,
) error {
startIndex := sort.Search(
len(s.Versions), func(i int) bool {
// TODO: use proper semver comparison.
return s.Versions[i].VersionNum > fromVersion
},
)
if startIndex > len(s.Versions) {
// Nothing to do
return nil
}

for i := startIndex; i < len(s.Versions); i++ {
for j := 0; j < len(metrics); j++ {
metric := metrics[j]
if err := s.Versions[i].Metrics.Apply(metric); err != nil {
return err
}
var err error
*metrics, err = s.Versions[i].Metrics.Apply(*metrics)
if err != nil {
return err
}
}

Expand Down
110 changes: 89 additions & 21 deletions schema/compiled/metric_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import (

otlpcommon "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
otlpmetric "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"

"github.com/tigrannajaryan/telemetry-schema/schema/types"
)

type MetricRenameAction map[types.MetricName]types.MetricName

func (act MetricRenameAction) Apply(metric *otlpmetric.Metric) error {
newName, exists := act[types.MetricName(metric.MetricDescriptor.Name)]
if exists {
metric.MetricDescriptor.Name = string(newName)
func (act MetricRenameAction) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) {
for _, metric := range metrics {
newName, exists := act[types.MetricName(metric.MetricDescriptor.Name)]
if exists {
metric.MetricDescriptor.Name = string(newName)
}
}
return nil
return metrics, nil
}

type MetricLabelRenameAction struct {
Expand All @@ -26,27 +27,34 @@ type MetricLabelRenameAction struct {
LabelMap map[string]string
}

func (act MetricLabelRenameAction) Apply(metric *otlpmetric.Metric) error {
if len(act.ApplyOnlyToMetrics) > 0 {
if _, exists := act.ApplyOnlyToMetrics[types.MetricName(metric.MetricDescriptor.Name)]; !exists {
return nil
func (act MetricLabelRenameAction) Apply(metrics []*otlpmetric.Metric) (
[]*otlpmetric.Metric, error,
) {
var retErr error
for _, metric := range metrics {

if len(act.ApplyOnlyToMetrics) > 0 {
if _, exists := act.ApplyOnlyToMetrics[types.MetricName(metric.MetricDescriptor.Name)]; !exists {
continue
}
}
}

dt := metric.MetricDescriptor.Type
switch dt {
case otlpmetric.MetricDescriptor_INT64:
dps := metric.Int64DataPoints
for i := 0; i < len(dps); i++ {
dp := dps[i]
err := renameLabels(dp.Labels, act.LabelMap)
if err != nil {
return err
dt := metric.MetricDescriptor.Type
switch dt {
case otlpmetric.MetricDescriptor_INT64:
dps := metric.Int64DataPoints
for i := 0; i < len(dps); i++ {
dp := dps[i]
err := renameLabels(dp.Labels, act.LabelMap)
if err != nil {
retErr = err
}
}
}

}

return nil
return metrics, retErr
}

func renameLabels(labels []*otlpcommon.StringKeyValue, renameRules map[string]string) error {
Expand Down Expand Up @@ -74,3 +82,63 @@ func renameLabels(labels []*otlpcommon.StringKeyValue, renameRules map[string]st
}
return err
}

type MetricSplitAction struct {
// ApplyOnlyToMetrics limits which metrics this action should apply to. If empty then
// there is no limitation.
MetricName types.MetricName
AttributeName types.AttributeName
SplitMap map[types.AttributeValue]types.MetricName
}

func (act MetricSplitAction) Apply(metrics []*otlpmetric.Metric) ([]*otlpmetric.Metric, error) {
for i := 0; i < len(metrics); i++ {
metric := metrics[i]
if act.MetricName != types.MetricName(metric.MetricDescriptor.Name) {
continue
}

var outputMetrics []*otlpmetric.Metric
dt := metric.MetricDescriptor.Type
switch dt {
case otlpmetric.MetricDescriptor_INT64:
dps := metric.Int64DataPoints
for j := 0; j < len(dps); j++ {
dp := dps[j]
outputMetric := splitMetric(act.AttributeName, act.SplitMap, metric, dp)
outputMetrics = append(outputMetrics, outputMetric)
}
}

metrics = append(append(metrics[0:i], outputMetrics...), metrics[i+1:]...)
}

return metrics, nil
}

func splitMetric(
splitByAttr types.AttributeName,
splitRules map[types.AttributeValue]types.MetricName,
input *otlpmetric.Metric,
inputDp *otlpmetric.Int64DataPoint,
) *otlpmetric.Metric {
output := &otlpmetric.Metric{}
descr := *input.MetricDescriptor
output.MetricDescriptor = &descr

outputDp := *inputDp
outputDp.Labels = nil

for _, label := range inputDp.Labels {
if label.Key == string(splitByAttr) {
if convertTo, exists := splitRules[types.AttributeValue(label.Value)]; exists {
newMetricName := string(convertTo)
output.MetricDescriptor.Name = newMetricName
}
continue
}
outputDp.Labels = append(outputDp.Labels, label)
}
output.Int64DataPoints = []*otlpmetric.Int64DataPoint{&outputDp}
return output
}
Loading

0 comments on commit 2b45e48

Please sign in to comment.