From 7b68e655ca614c809dd35ea603bce71166d394fc Mon Sep 17 00:00:00 2001 From: Saravanan Balasubramanian <33908564+sarabala1979@users.noreply.github.com> Date: Fri, 7 Jun 2024 11:09:36 -0700 Subject: [PATCH] Numalogic original payload support (#34) Signed-off-by: sarabala1979 --- prometheus-pusher/main.go | 29 ++++++++----- prometheus-pusher/payload.go | 23 ++++++++-- prometheus-pusher/payload_test.go | 72 ++++++++++++++++++------------- 3 files changed, 80 insertions(+), 44 deletions(-) diff --git a/prometheus-pusher/main.go b/prometheus-pusher/main.go index 91dc277..22e989d 100644 --- a/prometheus-pusher/main.go +++ b/prometheus-pusher/main.go @@ -115,7 +115,7 @@ func (p *prometheusSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk. failed = failed.Append(sinksdk.ResponseFailure(datum.ID(), "failed to push the metrics")) } var pls []PrometheusPayload - var prometheusPayload PrometheusPayload + var prometheusPayloads []*PrometheusPayload for _, payloadMsg := range payloads { p.metrics.IncreaseTotalPushed() if p.enableMsgTransformer { @@ -125,21 +125,27 @@ func (p *prometheusSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk. p.metrics.IncreaseTotalSkipped() return failed } - prometheusPayload = *opl.ConvertToPrometheusPayload(p.metricsName) + prometheusPayloads = opl.ConvertToPrometheusPayload(p.metricsName) + for _, prometheusPayload := range prometheusPayloads { + prometheusPayload.mergeLabels(p.labels) + if len(p.excludeLabels) > 0 { + prometheusPayload.excludeLabels(p.excludeLabels) + } + pls = append(pls, *prometheusPayload) + } } else { + var prometheusPayload PrometheusPayload err := json.Unmarshal([]byte(payloadMsg), &prometheusPayload) if !p.skipFailed && err != nil { p.metrics.IncreaseTotalSkipped() return failed } + prometheusPayload.mergeLabels(p.labels) + if len(p.excludeLabels) > 0 { + prometheusPayload.excludeLabels(p.excludeLabels) + } + pls = append(pls, prometheusPayload) } - prometheusPayload.mergeLabels(p.labels) - - if len(p.excludeLabels) > 0 { - prometheusPayload.excludeLabels(p.excludeLabels) - } - - pls = append(pls, prometheusPayload) } err := p.push(pls) if err != nil { @@ -215,8 +221,9 @@ func main() { } } - ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels, ignoreMetricsTs: ignoreMetricsTs, - metricsName: metricName, enableMsgTransformer: enableMsgTransformer} + ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels, + ignoreMetricsTs: ignoreMetricsTs, metricsName: metricName, enableMsgTransformer: enableMsgTransformer} + ps.metrics = NewMetricsServer(labels) go ps.metrics.startMetricServer(metricPort) ps.logger.Infof("Metrics publisher initialized with port=%d", metricPort) diff --git a/prometheus-pusher/payload.go b/prometheus-pusher/payload.go index 2bbb7fb..dde2be9 100644 --- a/prometheus-pusher/payload.go +++ b/prometheus-pusher/payload.go @@ -51,7 +51,8 @@ type OriginalPayload struct { Metadata map[string]interface{} `json:"metadata"` } -func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *PrometheusPayload { +func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) []*PrometheusPayload { + payloads := make([]*PrometheusPayload, 0) value, err := strconv.ParseFloat(fmt.Sprintf("%.4f", op.UnifiedAnomaly), 64) if err != nil { @@ -74,7 +75,7 @@ func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *Promet if namespace == nil { namespace = "" } - payload := &PrometheusPayload{ + payloads = append(payloads, &PrometheusPayload{ Name: metricName, TimestampMs: op.Timestamp, Namespace: namespace.(string), @@ -82,7 +83,21 @@ func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *Promet Type: "Gauge", Value: value, Labels: labels, + }) + for name, metricVal := range op.Data { + value, err := strconv.ParseFloat(fmt.Sprintf("%.4f", metricVal), 64) + if err != nil { + value = 0 + } + payloads = append(payloads, &PrometheusPayload{ + Name: fmt.Sprintf("%s_anomaly", name), + TimestampMs: op.Timestamp, + Namespace: namespace.(string), + Subsystem: "none", + Type: "Gauge", + Value: value, + Labels: labels, + }) } - return payload - + return payloads } diff --git a/prometheus-pusher/payload_test.go b/prometheus-pusher/payload_test.go index 6ded201..45b2348 100644 --- a/prometheus-pusher/payload_test.go +++ b/prometheus-pusher/payload_test.go @@ -8,16 +8,30 @@ import ( ) func TestConvertToPrometheusPayload(t *testing.T) { - JsonStr := `{"uuid":"35e0dc4603c845c9b999f5f669c64606","config_id":"test","composite_keys":["test_namespace","test_app","597b5bd8cc"],"timestamp":1701201827,"unified_anomaly":1.2,"data":{"namespace_app_rollouts_http_request_error_rate":null},"metadata":{"model_version":0,"artifact_versions":{"MinMaxScaler":"0","LSTMAE":"0","StdDevThreshold":"0"},"app":"test-app","intuit_alert":"true","namespace":"test-namespace","numalogic":"true","prometheus":"k8s-prometheus","rollouts_pod_template_hash":"597b5bd8cc"}}` + JsonStr := `{"uuid":"35e0dc4603c845c9b999f5f669c64606","config_id":"test","composite_keys":["test_namespace","test_app","597b5bd8cc"],"timestamp":1701201827,"unified_anomaly":1.2,"data":{"namespace_app_rollouts_cpu_utilization":0.517299409015888,"namespace_app_rollouts_http_request_error_rate":0.517299409015888,"namespace_app_rollouts_memory_utilization":0.517299409015888,"namespace_app_rollouts_http_requests_latency":0.517299409015888},"metadata":{"model_version":0,"artifact_versions":{"MinMaxScaler":"0","LSTMAE":"0","StdDevThreshold":"0"},"app":"test-app","intuit_alert":"true","namespace":"test-namespace","numalogic":"true","prometheus":"k8s-prometheus","rollouts_pod_template_hash":"597b5bd8cc"}}` + var origiObj OriginalPayload err := json.Unmarshal([]byte(JsonStr), &origiObj) assert.NoError(t, err) prometheusPayload := origiObj.ConvertToPrometheusPayload("test") - assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827)) - assert.Equal(t, prometheusPayload.Value, 1.2) - assert.Equal(t, prometheusPayload.Labels["app"], "test-app") - assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace") - assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc") + assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827)) + assert.Equal(t, prometheusPayload[0].Value, 1.2) + assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app") + assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace") + assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc") + + payloadJson, err := json.Marshal(prometheusPayload) + assert.NoError(t, err) + assert.Equal(t, prometheusPayload[1].TimestampMs, int64(1701201827)) + assert.Equal(t, 0.5173, prometheusPayload[1].Value) + assert.Contains(t, string(payloadJson), "namespace_app_rollouts_cpu_utilization_anomaly") + assert.Contains(t, string(payloadJson), "namespace_app_rollouts_memory_utilization_anomaly") + assert.Contains(t, string(payloadJson), "namespace_app_rollouts_http_request_error_rate_anomaly") + assert.Contains(t, string(payloadJson), "namespace_app_rollouts_http_requests_latency_anomaly") + assert.Equal(t, prometheusPayload[1].Labels["app"], "test-app") + assert.Equal(t, prometheusPayload[1].Labels["namespace"], "test-namespace") + assert.Equal(t, prometheusPayload[1].Labels["rollouts_pod_template_hash"], "597b5bd8cc") + } func TestMergePrometheusLabelPayload(t *testing.T) { @@ -27,13 +41,13 @@ func TestMergePrometheusLabelPayload(t *testing.T) { assert.NoError(t, err) prometheusPayload := origiObj.ConvertToPrometheusPayload("test") labels := map[string]string{"label1": "value1", "label2": "value1"} - prometheusPayload.mergeLabels(labels) - assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827)) - assert.Equal(t, prometheusPayload.Value, 1.2) - assert.Equal(t, prometheusPayload.Labels["app"], "test-app") - assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace") - assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc") - assert.Equal(t, prometheusPayload.Labels["label1"], "value1") + prometheusPayload[0].mergeLabels(labels) + assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827)) + assert.Equal(t, prometheusPayload[0].Value, 1.2) + assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app") + assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace") + assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc") + assert.Equal(t, prometheusPayload[0].Labels["label1"], "value1") } func TestExcludePrometheusLabelPayload(t *testing.T) { @@ -44,21 +58,21 @@ func TestExcludePrometheusLabelPayload(t *testing.T) { prometheusPayload := origiObj.ConvertToPrometheusPayload("test") labels := map[string]string{"label1": "value1", "label2": "value1"} exlabels := []string{"label1", "numalogic"} - prometheusPayload.mergeLabels(labels) - prometheusPayload.excludeLabels(exlabels) - assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827)) - assert.Equal(t, prometheusPayload.Value, 1.2) - assert.Equal(t, prometheusPayload.Labels["app"], "test-app") - assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace") - assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc") - assert.NotContains(t, prometheusPayload.Labels, "label1") - assert.NotContains(t, prometheusPayload.Labels, "numalogic") + prometheusPayload[0].mergeLabels(labels) + prometheusPayload[0].excludeLabels(exlabels) + assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827)) + assert.Equal(t, prometheusPayload[0].Value, 1.2) + assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app") + assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace") + assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc") + assert.NotContains(t, prometheusPayload[0].Labels, "label1") + assert.NotContains(t, prometheusPayload[0].Labels, "numalogic") - prometheusPayload.excludeLabels(nil) - prometheusPayload.excludeLabels(strings.Split("", ",")) - assert.Equal(t, prometheusPayload.Labels["app"], "test-app") - assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace") - assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc") - assert.NotContains(t, prometheusPayload.Labels, "label1") - assert.NotContains(t, prometheusPayload.Labels, "numalogic") + prometheusPayload[0].excludeLabels(nil) + prometheusPayload[0].excludeLabels(strings.Split("", ",")) + assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app") + assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace") + assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc") + assert.NotContains(t, prometheusPayload[0].Labels, "label1") + assert.NotContains(t, prometheusPayload[0].Labels, "numalogic") }