Skip to content

Commit

Permalink
engine: remove implicit fallback (#518)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffmann@cloudflare.com>
Co-authored-by: Michael Hoffmann <mhoffmann@cloudflare.com>
  • Loading branch information
MichaHoffmann and Michael Hoffmann authored Feb 21, 2025
1 parent fab1185 commit 4230034
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 430 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The project is currently under active development.

## Roadmap

The engine intends to have full compatibility with the original engine used in Prometheus. Since implementing the full specification will take time, we aim to add support for most commonly used expressions while falling back to the original engine for operations that are not yet supported. This will allow us to have smaller and faster releases, and gather feedback on a regular basis. Instructions on using the engine will be added after we have enough confidence in its correctness.
The engine intends to have full compatibility with the original engine used in Prometheus. Since implementing the full specification will take time, we aim to add support for most commonly used expressions. Instructions on using the engine will be added after we have enough confidence in its correctness. If the engine encounters an expression it does not support, it returns an error that can be detected with `engine.IsUnimplemented(err)`; the calling code is expected to handle the fallback itself.

The following table shows operations which are currently supported by the engine

Expand Down
1 change: 0 additions & 1 deletion engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func BenchmarkSingleQuery(b *testing.B) {
query := "sum(rate(http_requests_total[2m]))"
opts := engine.Opts{
EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second},
DisableFallback: true,
SelectorBatchSize: 256,
}
b.ReportAllocs()
Expand Down
12 changes: 5 additions & 7 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,9 @@ func TestDistributedAggregations(t *testing.T) {
}

queries := []struct {
name string
query string
rangeStart time.Time
expectFallback bool
name string
query string
rangeStart time.Time
}{
{name: "binop with selector and constant series", query: `bar or on () vector(0)`},
{name: "binop with aggregation and constant series", query: `sum(bar) or on () vector(0)`},
Expand Down Expand Up @@ -226,7 +225,7 @@ func TestDistributedAggregations(t *testing.T) {
{name: "binary nested with constants", query: `(1 + 2) + (1 atan2 (-1 % -1))`},
{name: "binary nested with functions", query: `(1 + exp(vector(1))) + (1 atan2 (-1 % -1))`},
{name: "filtered selector interaction", query: `sum by (region) (bar{region="east"}) / sum by (region) (bar)`},
{name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true},
{name: "unsupported aggregation", query: `count_values("pod", bar)`},
{name: "absent_over_time for non-existing metric", query: `absent_over_time(foo[2m])`},
{name: "absent_over_time for existing metric", query: `absent_over_time(bar{pod="nginx-1"}[2m])`},
{name: "absent for non-existing metric", query: `absent(foo)`},
Expand All @@ -249,7 +248,7 @@ func TestDistributedAggregations(t *testing.T) {
{name: "query with @start() absolute timestamp", query: `sum(bar @ start())`},
{name: "query with @end() timestamp", query: `sum(bar @ end())`},
{name: "query with numeric timestamp", query: `sum(bar @ 140.000)`},
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`, expectFallback: true},
{name: "query with range and @end() timestamp", query: `sum(count_over_time(bar[1h] @ end()))`},
{name: `subquery with @end() timestamp`, query: `bar @ 100.000 - bar @ 150.000`},
}

Expand Down Expand Up @@ -306,7 +305,6 @@ func TestDistributedAggregations(t *testing.T) {
for _, queryOpts := range allQueryOpts {
ctx := context.Background()
distOpts := localOpts
distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
Expand Down
64 changes: 13 additions & 51 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"sort"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -27,6 +25,7 @@ import (
"github.com/thanos-io/promql-engine/execution/function"
"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/telemetry"
"github.com/thanos-io/promql-engine/execution/warnings"
"github.com/thanos-io/promql-engine/extlabels"
"github.com/thanos-io/promql-engine/logicalplan"
Expand All @@ -39,7 +38,7 @@ type QueryType int

// engineMetrics holds the Prometheus collectors the engine uses to report on
// its own query activity.
//
// NOTE(review): queries and totalQueries look like the before/after forms of
// the same "queries_total" metric in this diff view — confirm that only one of
// them is meant to remain.
type engineMetrics struct {
// currentQueries is a gauge; its help text describes it as the current number
// of queries being executed or waiting.
currentQueries prometheus.Gauge
// queries counts PromQL queries, partitioned by a "fallback" label.
queries *prometheus.CounterVec
// totalQueries counts PromQL queries as a single unlabelled counter.
totalQueries prometheus.Counter
}

const (
Expand All @@ -50,16 +49,16 @@ const (
stepsBatch = 10
)

// IsUnimplemented reports whether err matches parse.ErrNotSupportedExpr or
// parse.ErrNotImplemented, i.e. whether the query failed because it uses an
// expression or feature this engine does not handle.
func IsUnimplemented(err error) bool {
	if errors.Is(err, parse.ErrNotSupportedExpr) {
		return true
	}
	if errors.Is(err, parse.ErrNotImplemented) {
		return true
	}
	return false
}

type Opts struct {
promql.EngineOpts

// LogicalOptimizers are optimizers that are run if the value is not nil. If it is nil then the default optimizers are run. Default optimizer list is available in the logicalplan package.
LogicalOptimizers []logicalplan.Optimizer

// DisableFallback enables a mode where the engine returns an error if some expression or feature is not yet implemented
// in the new engine, instead of falling back to the Prometheus engine.
DisableFallback bool

// ExtLookbackDelta specifies what time range to use to determine valid previous sample for extended range functions.
// Defaults to 1 hour if not specified.
ExtLookbackDelta time.Duration
Expand All @@ -71,9 +70,6 @@ type Opts struct {
// This will default to false.
EnableXFunctions bool

// FallbackEngine
Engine promql.QueryEngine

// EnableAnalysis enables query analysis.
EnableAnalysis bool

Expand Down Expand Up @@ -177,23 +173,16 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
Help: "The current number of queries being executed or waiting.",
},
),
queries: promauto.With(opts.Reg).NewCounterVec(
totalQueries: promauto.With(opts.Reg).NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "queries_total",
Help: "Number of PromQL queries.",
}, []string{"fallback"},
},
),
}

var engine promql.QueryEngine
if opts.Engine == nil {
engine = promql.NewEngine(opts.EngineOpts)
} else {
engine = opts.Engine
}

decodingConcurrency := opts.DecodingConcurrency
if opts.DecodingConcurrency < 1 {
decodingConcurrency = runtime.GOMAXPROCS(0) / 2
Expand All @@ -208,13 +197,11 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine {
}

return &Engine{
prom: engine,
functions: functions,
scanners: scanners,
activeQueryTracker: queryTracker,

disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks,
disableFallback: opts.DisableFallback,

logger: opts.Logger,
lookbackDelta: opts.LookbackDelta,
Expand All @@ -240,13 +227,11 @@ var (
)

type Engine struct {
prom promql.QueryEngine
functions map[string]*parser.Function
scanners engstorage.Scanners
activeQueryTracker promql.QueryTracker

disableDuplicateLabelChecks bool
disableFallback bool

logger *slog.Logger
lookbackDelta time.Duration
Expand Down Expand Up @@ -290,14 +275,10 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
}
e.metrics.queries.WithLabelValues("false").Inc()
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()
return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
Expand Down Expand Up @@ -338,14 +319,10 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, root.String(), ts)
}
e.metrics.queries.WithLabelValues("false").Inc()
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
Expand Down Expand Up @@ -396,14 +373,10 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
}
e.metrics.queries.WithLabelValues("false").Inc()
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
Expand Down Expand Up @@ -442,14 +415,11 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable
ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, lplan.Root().String(), start, end, step)
}
e.metrics.queries.WithLabelValues("false").Inc()
if err != nil {
return nil, err
}
e.metrics.totalQueries.Inc()

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
Expand Down Expand Up @@ -516,14 +486,6 @@ func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Optio
return e.scanners, nil
}

// triggerFallback reports whether a query that failed with err should be
// retried on the fallback engine. It is always false when fallback is
// disabled; otherwise it is true only for errors matching
// parse.ErrNotSupportedExpr or parse.ErrNotImplemented.
func (e *Engine) triggerFallback(err error) bool {
	// Short-circuit on the flag first so the error chain is only inspected
	// when fallback is actually permitted.
	return !e.disableFallback &&
		(errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented))
}

type Query struct {
exec model.VectorOperator
opts promql.QueryOpts
Expand Down
Loading

0 comments on commit 4230034

Please sign in to comment.