Skip to content

Commit

Permalink
Numalogic original payload support (#34)
Browse files Browse the repository at this point in the history
Signed-off-by: sarabala1979 <sarabala1979@gmail.com>
  • Loading branch information
sarabala1979 authored Jun 7, 2024
1 parent 213ee50 commit 7b68e65
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 44 deletions.
29 changes: 18 additions & 11 deletions prometheus-pusher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (p *prometheusSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.
failed = failed.Append(sinksdk.ResponseFailure(datum.ID(), "failed to push the metrics"))
}
var pls []PrometheusPayload
var prometheusPayload PrometheusPayload
var prometheusPayloads []*PrometheusPayload
for _, payloadMsg := range payloads {
p.metrics.IncreaseTotalPushed()
if p.enableMsgTransformer {
Expand All @@ -125,21 +125,27 @@ func (p *prometheusSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.
p.metrics.IncreaseTotalSkipped()
return failed
}
prometheusPayload = *opl.ConvertToPrometheusPayload(p.metricsName)
prometheusPayloads = opl.ConvertToPrometheusPayload(p.metricsName)
for _, prometheusPayload := range prometheusPayloads {
prometheusPayload.mergeLabels(p.labels)
if len(p.excludeLabels) > 0 {
prometheusPayload.excludeLabels(p.excludeLabels)
}
pls = append(pls, *prometheusPayload)
}
} else {
var prometheusPayload PrometheusPayload
err := json.Unmarshal([]byte(payloadMsg), &prometheusPayload)
if !p.skipFailed && err != nil {
p.metrics.IncreaseTotalSkipped()
return failed
}
prometheusPayload.mergeLabels(p.labels)
if len(p.excludeLabels) > 0 {
prometheusPayload.excludeLabels(p.excludeLabels)
}
pls = append(pls, prometheusPayload)
}
prometheusPayload.mergeLabels(p.labels)

if len(p.excludeLabels) > 0 {
prometheusPayload.excludeLabels(p.excludeLabels)
}

pls = append(pls, prometheusPayload)
}
err := p.push(pls)
if err != nil {
Expand Down Expand Up @@ -215,8 +221,9 @@ func main() {
}
}

ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels, ignoreMetricsTs: ignoreMetricsTs,
metricsName: metricName, enableMsgTransformer: enableMsgTransformer}
ps := prometheusSink{logger: logger, skipFailed: skipFailed, labels: labels, excludeLabels: excludeLabels,
ignoreMetricsTs: ignoreMetricsTs, metricsName: metricName, enableMsgTransformer: enableMsgTransformer}

ps.metrics = NewMetricsServer(labels)
go ps.metrics.startMetricServer(metricPort)
ps.logger.Infof("Metrics publisher initialized with port=%d", metricPort)
Expand Down
23 changes: 19 additions & 4 deletions prometheus-pusher/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type OriginalPayload struct {
Metadata map[string]interface{} `json:"metadata"`
}

func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *PrometheusPayload {
func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) []*PrometheusPayload {
payloads := make([]*PrometheusPayload, 0)

value, err := strconv.ParseFloat(fmt.Sprintf("%.4f", op.UnifiedAnomaly), 64)
if err != nil {
Expand All @@ -74,15 +75,29 @@ func (op *OriginalPayload) ConvertToPrometheusPayload(metricName string) *Promet
if namespace == nil {
namespace = ""
}
payload := &PrometheusPayload{
payloads = append(payloads, &PrometheusPayload{
Name: metricName,
TimestampMs: op.Timestamp,
Namespace: namespace.(string),
Subsystem: "none",
Type: "Gauge",
Value: value,
Labels: labels,
})
for name, metricVal := range op.Data {
value, err := strconv.ParseFloat(fmt.Sprintf("%.4f", metricVal), 64)
if err != nil {
value = 0
}
payloads = append(payloads, &PrometheusPayload{
Name: fmt.Sprintf("%s_anomaly", name),
TimestampMs: op.Timestamp,
Namespace: namespace.(string),
Subsystem: "none",
Type: "Gauge",
Value: value,
Labels: labels,
})
}
return payload

return payloads
}
72 changes: 43 additions & 29 deletions prometheus-pusher/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,30 @@ import (
)

func TestConvertToPrometheusPayload(t *testing.T) {
JsonStr := `{"uuid":"35e0dc4603c845c9b999f5f669c64606","config_id":"test","composite_keys":["test_namespace","test_app","597b5bd8cc"],"timestamp":1701201827,"unified_anomaly":1.2,"data":{"namespace_app_rollouts_http_request_error_rate":null},"metadata":{"model_version":0,"artifact_versions":{"MinMaxScaler":"0","LSTMAE":"0","StdDevThreshold":"0"},"app":"test-app","intuit_alert":"true","namespace":"test-namespace","numalogic":"true","prometheus":"k8s-prometheus","rollouts_pod_template_hash":"597b5bd8cc"}}`
JsonStr := `{"uuid":"35e0dc4603c845c9b999f5f669c64606","config_id":"test","composite_keys":["test_namespace","test_app","597b5bd8cc"],"timestamp":1701201827,"unified_anomaly":1.2,"data":{"namespace_app_rollouts_cpu_utilization":0.517299409015888,"namespace_app_rollouts_http_request_error_rate":0.517299409015888,"namespace_app_rollouts_memory_utilization":0.517299409015888,"namespace_app_rollouts_http_requests_latency":0.517299409015888},"metadata":{"model_version":0,"artifact_versions":{"MinMaxScaler":"0","LSTMAE":"0","StdDevThreshold":"0"},"app":"test-app","intuit_alert":"true","namespace":"test-namespace","numalogic":"true","prometheus":"k8s-prometheus","rollouts_pod_template_hash":"597b5bd8cc"}}`

var origiObj OriginalPayload
err := json.Unmarshal([]byte(JsonStr), &origiObj)
assert.NoError(t, err)
prometheusPayload := origiObj.ConvertToPrometheusPayload("test")
assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload.Value, 1.2)
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload[0].Value, 1.2)
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")

payloadJson, err := json.Marshal(prometheusPayload)
assert.NoError(t, err)
assert.Equal(t, prometheusPayload[1].TimestampMs, int64(1701201827))
assert.Equal(t, 0.5173, prometheusPayload[1].Value)
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_cpu_utilization_anomaly")
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_memory_utilization_anomaly")
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_http_request_error_rate_anomaly")
assert.Contains(t, string(payloadJson), "namespace_app_rollouts_http_requests_latency_anomaly")
assert.Equal(t, prometheusPayload[1].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[1].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[1].Labels["rollouts_pod_template_hash"], "597b5bd8cc")

}

func TestMergePrometheusLabelPayload(t *testing.T) {
Expand All @@ -27,13 +41,13 @@ func TestMergePrometheusLabelPayload(t *testing.T) {
assert.NoError(t, err)
prometheusPayload := origiObj.ConvertToPrometheusPayload("test")
labels := map[string]string{"label1": "value1", "label2": "value1"}
prometheusPayload.mergeLabels(labels)
assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload.Value, 1.2)
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.Equal(t, prometheusPayload.Labels["label1"], "value1")
prometheusPayload[0].mergeLabels(labels)
assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload[0].Value, 1.2)
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.Equal(t, prometheusPayload[0].Labels["label1"], "value1")
}

func TestExcludePrometheusLabelPayload(t *testing.T) {
Expand All @@ -44,21 +58,21 @@ func TestExcludePrometheusLabelPayload(t *testing.T) {
prometheusPayload := origiObj.ConvertToPrometheusPayload("test")
labels := map[string]string{"label1": "value1", "label2": "value1"}
exlabels := []string{"label1", "numalogic"}
prometheusPayload.mergeLabels(labels)
prometheusPayload.excludeLabels(exlabels)
assert.Equal(t, prometheusPayload.TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload.Value, 1.2)
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload.Labels, "label1")
assert.NotContains(t, prometheusPayload.Labels, "numalogic")
prometheusPayload[0].mergeLabels(labels)
prometheusPayload[0].excludeLabels(exlabels)
assert.Equal(t, prometheusPayload[0].TimestampMs, int64(1701201827))
assert.Equal(t, prometheusPayload[0].Value, 1.2)
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload[0].Labels, "label1")
assert.NotContains(t, prometheusPayload[0].Labels, "numalogic")

prometheusPayload.excludeLabels(nil)
prometheusPayload.excludeLabels(strings.Split("", ","))
assert.Equal(t, prometheusPayload.Labels["app"], "test-app")
assert.Equal(t, prometheusPayload.Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload.Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload.Labels, "label1")
assert.NotContains(t, prometheusPayload.Labels, "numalogic")
prometheusPayload[0].excludeLabels(nil)
prometheusPayload[0].excludeLabels(strings.Split("", ","))
assert.Equal(t, prometheusPayload[0].Labels["app"], "test-app")
assert.Equal(t, prometheusPayload[0].Labels["namespace"], "test-namespace")
assert.Equal(t, prometheusPayload[0].Labels["rollouts_pod_template_hash"], "597b5bd8cc")
assert.NotContains(t, prometheusPayload[0].Labels, "label1")
assert.NotContains(t, prometheusPayload[0].Labels, "numalogic")
}

0 comments on commit 7b68e65

Please sign in to comment.