Skip to content

Commit

Permalink
Support holt_winters function
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
  • Loading branch information
SungJin1212 committed Feb 21, 2025
1 parent b3fe23d commit 32ef5d3
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
* [ENHANCEMENT] Add `compactor.auto-forget-delay` for compactor to auto forget compactors after X minutes without heartbeat. #6533
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [ENHANCEMENT] Update prometheus version to v3.1.0. #6583
* [ENHANCEMENT] StoreGateway: Emit more histogram buckets on the `cortex_querier_storegateway_refetches_per_query` metric. #6570
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to `cortex_rejected_queries_total` metric to track the rejected query count due to the # of tenant limits. #6569
Expand Down
95 changes: 95 additions & 0 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@ package integration

import (
"fmt"
"path"
"strconv"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

type versionsImagesFlags struct {
Expand Down Expand Up @@ -118,6 +122,97 @@ func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
}
}

// Test cortex which uses Prometheus v3.x can support holt_winters function
func TestCanSupportHoltWintersFunc(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

flags := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.tsdb.retention-period": "2h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)
// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path := path.Join(s.SharedDir(), "cortex-1")

flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
// Start Cortex replicas.
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

now := time.Now()
// Push some series to Cortex.
start := now.Add(-time.Minute * 120)
end := now
scrapeInterval := 30 * time.Second

numSeries := 10
numSamples := 240
serieses := make([]prompb.TimeSeries, numSeries)
lbls := make([]labels.Labels, numSeries)
for i := 0; i < numSeries; i++ {
series := e2e.GenerateSeriesWithSamples("test_series", start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "series", Value: strconv.Itoa(i)})
serieses[i] = series

builder := labels.NewBuilder(labels.EmptyLabels())
for _, lbl := range series.Labels {
builder.Set(lbl.Name, lbl.Value)
}
lbls[i] = builder.Labels()
}

res, err := c.Push(serieses)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// Test range query
query := "holt_winters(test_series[2h], 0.5, 0.5)"
rangeResult, err := c.QueryRange(query, start, end, scrapeInterval)
require.NoError(t, err)
matrix, ok := rangeResult.(model.Matrix)
require.True(t, ok)
require.True(t, matrix.Len() > 0)

// Test instant query
instantResult, err := c.Query(query, now)
require.NoError(t, err)
vector, ok := instantResult.(model.Vector)
require.True(t, ok)
require.True(t, vector.Len() > 0)
}

func blocksStorageFlagsWithFlushOnShutdown() map[string]string {
return mergeFlags(BlocksStorageFlags(), map[string]string{
"-blocks-storage.tsdb.flush-blocks-on-shutdown": "true",
Expand Down
7 changes: 7 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor

// set EnableExperimentalFunctions
parser.EnableExperimentalFunctions = cfg.EnablePromQLExperimentalFunctions
// The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (/~https://github.com/prometheus/prometheus/pull/14930)
// The cortex supports holt_winters for users using this function.
holtWinters := *parser.Functions["double_exponential_smoothing"]
holtWinters.Experimental = false
holtWinters.Name = "holt_winters"
parser.Functions["holt_winters"] = &holtWinters
promql.FunctionCalls["holt_winters"] = promql.FunctionCalls["double_exponential_smoothing"]

var queryEngine promql.QueryEngine
opts := promql.EngineOpts{
Expand Down
8 changes: 8 additions & 0 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -121,6 +122,13 @@ func NewQueryTripperware(

// set EnableExperimentalFunctions
parser.EnableExperimentalFunctions = enablePromQLExperimentalFunctions
// The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (/~https://github.com/prometheus/prometheus/pull/14930)
// The cortex supports holt_winters for users using this function.
holtWinters := *parser.Functions["double_exponential_smoothing"]
holtWinters.Experimental = false
holtWinters.Name = "holt_winters"
parser.Functions["holt_winters"] = &holtWinters
promql.FunctionCalls["holt_winters"] = promql.FunctionCalls["double_exponential_smoothing"]

// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Expand Down
1 change: 0 additions & 1 deletion pkg/querier/tripperware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (m mockMiddleware) Do(_ context.Context, req Request) (Response, error) {
}

func TestRoundTrip(t *testing.T) {
t.Parallel()
s := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("bar"))
Expand Down

0 comments on commit 32ef5d3

Please sign in to comment.