Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Numalogic original payload support #34

Merged
merged 11 commits into from
Jun 7, 2024
29 changes: 18 additions & 11 deletions prometheus-pusher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (p *prometheusSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.
failed = failed.Append(sinksdk.ResponseFailure(datum.ID(), "failed to push the metrics"))
}
var pls []PrometheusPayload
var prometheusPayload PrometheusPayload
var prometheusPayloads []*PrometheusPayload
for _, payloadMsg := range payloads {
p.metrics.IncreaseTotalPushed()
if p.enableMsgTransformer {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a feature flag to support univariate payload and multivariate numalogic payload

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just temporary right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. once we moved to Multivariant. we can remove this code

Expand All @@ -125,21 +125,27 @@ func (p *prometheusSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.
p.metrics.IncreaseTotalSkipped()
return failed
}
prometheusPayload = *opl.ConvertToPrometheusPayload(p.metricsName)
prometheusPayloads = opl.ConvertToPrometheusPayload(p.metricsName)
for _, prometheusPayload := range prometheusPayloads {
prometheusPayload.mergeLabels(p.labels)
if len(p.excludeLabels) > 0 {
prometheusPayload.excludeLabels(p.excludeLabels)
}
pls = append(pls, *prometheusPayload)
}
} else {
var prometheusPayload PrometheusPayload
err := json.Unmarshal([]byte(payloadMsg), &prometheusPayload)
if !p.skipFailed && err != nil {
p.metrics.IncreaseTotalSkipped()
return failed
}
prometheusPayload.mergeLabels(p.labels)
if len(p.excludeLabels) > 0 {
prometheusPayload.excludeLabels(p.excludeLabels)
}
pls = append(pls, prometheusPayload)
}
prometheusPayload.mergeLabels(p.labels)

if len(p.excludeLabels) > 0 {
prometheusPayload.excludeLabels(p.excludeLabels)
}

pls = append(pls, prometheusPayload)
}
err := p.push(pls)
if err != nil {
Expand Down Expand Up @@ -215,8 +221,9 @@ func main() {
}
}

ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels, ignoreMetricsTs: ignoreMetricsTs,
metricsName: metricName, enableMsgTransformer: enableMsgTransformer}
ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels,
ignoreMetricsTs: ignoreMetricsTs, metricsName: metricName, enableMsgTransformer: enableMsgTransformer}

ps.metrics = NewMetricsServer(labels)
go ps.metrics.startMetricServer(metricPort)
ps.logger.Infof("Metrics publisher initialized with port=%d", metricPort)
Expand Down
23 changes: 19 additions & 4 deletions prometheus-pusher/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type OriginalPayload struct {
Metadata map[string]interface{} `json:"metadata"`
}

func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *PrometheusPayload {
func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) []*PrometheusPayload {
payloads := make([]*PrometheusPayload, 0)

value, err := strconv.ParseFloat(fmt.Sprintf("%.4f", op.UnifiedAnomaly), 64)
if err != nil {
Expand All @@ -74,15 +75,29 @@ func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *Promet
if namespace == nil {
namespace = ""
}
payload := &PrometheusPayload{
payloads = append(payloads, &PrometheusPayload{
Name: metricName,
TimestampMs: op.Timestamp,
Namespace: namespace.(string),
Subsystem: "none",
Type: "Gauge",
Value: value,
Labels: labels,
})
for name, metricVal := range op.Data {
value, err := strconv.ParseFloat(fmt.Sprintf("%.4f", metricVal), 64)
if err != nil {
value = 0
}
payloads = append(payloads, &PrometheusPayload{
Name: fmt.Sprintf("%s_anomaly", name),
TimestampMs: op.Timestamp,
Namespace: namespace.(string),
Subsystem: "none",
Type: "Gauge",
Value: value,
Labels: labels,
})
}
return payload

return payloads
}
72 changes: 43 additions & 29 deletions prometheus-pusher/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,30 @@ import (
)

func TestConvertToPrometheusPayload(t *testing.T) {
JsonStr := `{"uuid":"35e0dc4603c845c9b999f5f669c64606","config_id":"test","composite_keys":["test_namespace","test_app","597b5bd8cc"],"timestamp":1701201827,"unified_anomaly":1.2,"data":{"namespace_app_rollouts_http_request_error_rate":null},"metadata":{"model_version":0,"artifact_versions":{"MinMaxScaler":"0","LSTMAE":"0","StdDevThreshold":"0"},"app":"test-app","intuit_alert":"true","namespace":"test-namespace","numalogic":"true","prometheus":"k8s-prometheus","rollouts_pod_template_hash":"597b5bd8cc"}}`
JsonStr := `{"uuid":"35e0dc4603c845c9b999f5f669c64606","config_id":"test","composite_keys":["test_namespace","test_app","597b5bd8cc"],"timestamp":1701201827,"unified_anomaly":1.2,"data":{"namespace_app_rollouts_cpu_utilization":0.517299409015888,"namespace_app_rollouts_http_request_error_rate":0.517299409015888,"namespace_app_rollouts_memory_utilization":0.517299409015888,"namespace_app_rollouts_http_requests_latency":0.517299409015888},"metadata":{"model_version":0,"artifact_versions":{"MinMaxScaler":"0","LSTMAE":"0","StdDevThreshold":"0"},"app":"test-app","intuit_alert":"true","namespace":"test-namespace","numalogic":"true","prometheus":"k8s-prometheus","rollouts_pod_template_hash":"597b5bd8cc"}}`

var origiObj OriginalPayload
err := json.Unmarshal([]byte(JsonStr), &origiObj)
assert.NoError(t, err)
prometheusPayload := origiObj.ConvertToPrometheusPayload("test")
assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload.Value, 1.2)
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload[0].Value, 1.2)
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")

payloadJson, err := json.Marshal(prometheusPayload)
assert.NoError(t, err)
assert.Equal(t, prometheusPayload[1].TimestampMs, int64(1701201827))
assert.Equal(t, 0.5173, prometheusPayload[1].Value)
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_cpu_utilization_anomaly")
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_memory_utilization_anomaly")
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_http_request_error_rate_anomaly")
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_http_requests_latency_anomaly")
assert.Equal(t, prometheusPayload[1].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[1].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[1].Labels["rollouts_pod_template_hash"], "597b5bd8cc")

}

func TestMergePrometheusLabelPayload(t *testing.T) {
Expand All @@ -27,13 +41,13 @@ func TestMergePrometheusLabelPayload(t *testing.T) {
assert.NoError(t, err)
prometheusPayload := origiObj.ConvertToPrometheusPayload("test")
labels := map[string]string{"label1": "value1", "label2": "value1"}
prometheusPayload.mergeLabels(labels)
assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload.Value, 1.2)
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.Equal(t, prometheusPayload.Labels["label1"], "value1")
prometheusPayload[0].mergeLabels(labels)
assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload[0].Value, 1.2)
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.Equal(t, prometheusPayload[0].Labels["label1"], "value1")
}

func TestExcludePrometheusLabelPayload(t *testing.T) {
Expand All @@ -44,21 +58,21 @@ func TestExcludePrometheusLabelPayload(t *testing.T) {
prometheusPayload := origiObj.ConvertToPrometheusPayload("test")
labels := map[string]string{"label1": "value1", "label2": "value1"}
exlabels := []string{"label1", "numalogic"}
prometheusPayload.mergeLabels(labels)
prometheusPayload.excludeLabels(exlabels)
assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload.Value, 1.2)
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload.Labels, "label1")
assert.NotContains(t, prometheusPayload.Labels, "numalogic")
prometheusPayload[0].mergeLabels(labels)
prometheusPayload[0].excludeLabels(exlabels)
assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload[0].Value, 1.2)
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload[0].Labels, "label1")
assert.NotContains(t, prometheusPayload[0].Labels, "numalogic")

prometheusPayload.excludeLabels(nil)
prometheusPayload.excludeLabels(strings.Split("", ","))
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload.Labels, "label1")
assert.NotContains(t, prometheusPayload.Labels, "numalogic")
prometheusPayload[0].excludeLabels(nil)
prometheusPayload[0].excludeLabels(strings.Split("", ","))
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload[0].Labels, "label1")
assert.NotContains(t, prometheusPayload[0].Labels, "numalogic")
}