Skip to content

Commit

Permalink
[processor/transform] Replace ParserCollection and add initial suppor…
Browse files Browse the repository at this point in the history
…t for context inference (open-telemetry#37272)
  • Loading branch information
edmocosta authored Jan 22, 2025
1 parent 6156ecb commit 4ebb7af
Show file tree
Hide file tree
Showing 14 changed files with 1,938 additions and 311 deletions.
27 changes: 27 additions & 0 deletions .chloggen/replace-pc-and-add-intial-contextinference-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Replace parser collection implementations with `ottl.ParserCollection` and add initial support for expressing statement's context via path names.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29017]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
56 changes: 55 additions & 1 deletion processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,63 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "bad_syntax_multi_signal"),
errorLen: 3,
},
{
id: component.NewIDWithName(metadata.Type, "structured_configuration_with_path_context"),
expected: &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{
{
Context: "span",
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
},
},
MetricStatements: []common.ContextStatements{
{
Context: "metric",
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
},
},
LogStatements: []common.ContextStatements{
{
Context: "log",
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "structured_configuration_with_inferred_context"),
expected: &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{
{
Statements: []string{
`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`,
`set(resource.attributes["name"], "bear")`,
},
},
},
MetricStatements: []common.ContextStatements{
{
Statements: []string{
`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`,
`set(resource.attributes["name"], "bear")`,
},
},
},
LogStatements: []common.ContextStatements{
{
Statements: []string{
`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`,
`set(resource.attributes["name"], "bear")`,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
t.Run(tt.id.Name(), func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
assert.NoError(t, err)

Expand Down
16 changes: 16 additions & 0 deletions processor/transformprocessor/internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"fmt"
"strings"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
)

var _ ottl.StatementsGetter = (*ContextStatements)(nil)

type ContextID string

const (
Expand Down Expand Up @@ -36,3 +40,15 @@ type ContextStatements struct {
Conditions []string `mapstructure:"conditions"`
Statements []string `mapstructure:"statements"`
}

func (c ContextStatements) GetStatements() []string {
return c.Statements
}

func toContextStatements(statements any) (*ContextStatements, error) {
contextStatements, ok := statements.(ContextStatements)
if !ok {
return nil, fmt.Errorf("invalid context statements type, expected: common.ContextStatements, got: %T", statements)
}
return &contextStatements, nil
}
100 changes: 41 additions & 59 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,37 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope"
)

var _ consumer.Logs = &logStatements{}
type LogsConsumer interface {
Context() ContextID
ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error
}

type logStatements struct {
ottl.StatementSequence[ottllog.TransformContext]
expr.BoolExpr[ottllog.TransformContext]
}

func (l logStatements) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: true,
}
func (l logStatements) Context() ContextID {
return Log
}

func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error {
for i := 0; i < ld.ResourceLogs().Len(); i++ {
rlogs := ld.ResourceLogs().At(i)
for j := 0; j < rlogs.ScopeLogs().Len(); j++ {
slogs := rlogs.ScopeLogs().At(j)
logs := slogs.LogRecords()
for k := 0; k < logs.Len(); k++ {
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs)
tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs, ottllog.WithCache(cache))
condition, err := l.BoolExpr.Eval(ctx, tCtx)
if err != nil {
return err
Expand All @@ -55,76 +54,59 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
return nil
}

type LogParserCollection struct {
parserCollection
logParser ottl.Parser[ottllog.TransformContext]
}
type LogParserCollection ottl.ParserCollection[LogsConsumer]

type LogParserCollectionOption func(*LogParserCollection) error
type LogParserCollectionOption ottl.ParserCollectionOption[LogsConsumer]

func WithLogParser(functions map[string]ottl.Factory[ottllog.TransformContext]) LogParserCollectionOption {
return func(lp *LogParserCollection) error {
logParser, err := ottllog.NewParser(functions, lp.settings)
return func(pc *ottl.ParserCollection[LogsConsumer]) error {
logParser, err := ottllog.NewParser(functions, pc.Settings, ottllog.EnablePathContextNames())
if err != nil {
return err
}
lp.logParser = logParser
return nil
return ottl.WithParserCollectionContext(ottllog.ContextName, &logParser, convertLogStatements)(pc)
}
}

func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption {
return func(lp *LogParserCollection) error {
lp.errorMode = errorMode
return nil
}
return LogParserCollectionOption(ottl.WithParserCollectionErrorMode[LogsConsumer](errorMode))
}

func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
rp, err := ottlresource.NewParser(ResourceFunctions(), settings)
pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{
withCommonContextParsers[LogsConsumer](),
}

for _, option := range options {
pcOptions = append(pcOptions, ottl.ParserCollectionOption[LogsConsumer](option))
}

pc, err := ottl.NewParserCollection(settings, pcOptions...)
if err != nil {
return nil, err
}
sp, err := ottlscope.NewParser(ScopeFunctions(), settings)

lpc := LogParserCollection(*pc)
return &lpc, nil
}

func convertLogStatements(pc *ottl.ParserCollection[LogsConsumer], _ *ottl.Parser[ottllog.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottllog.TransformContext]) (LogsConsumer, error) {
contextStatements, err := toContextStatements(statements)
if err != nil {
return nil, err
}
lpc := &LogParserCollection{
parserCollection: parserCollection{
settings: settings,
resourceParser: rp,
scopeParser: sp,
},
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardLogFuncs())
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}

for _, op := range options {
err := op(lpc)
if err != nil {
return nil, err
}
}

return lpc, nil
lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(pc.ErrorMode))
return logStatements{lStatements, globalExpr}, nil
}

func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) {
switch contextStatements.Context {
case Log:
parsedStatements, err := pc.logParser.ParseStatements(contextStatements.Statements)
if err != nil {
return nil, err
}
globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.parserCollection, filterottl.StandardLogFuncs())
if errGlobalBoolExpr != nil {
return nil, errGlobalBoolExpr
}
lStatements := ottllog.NewStatementSequence(parsedStatements, pc.settings, ottllog.WithStatementSequenceErrorMode(pc.errorMode))
return logStatements{lStatements, globalExpr}, nil
default:
statements, err := pc.parseCommonContextStatements(contextStatements)
if err != nil {
return nil, err
}
return statements, nil
func (lpc *LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (LogsConsumer, error) {
pc := ottl.ParserCollection[LogsConsumer](*lpc)
if contextStatements.Context != "" {
return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true)
}
return pc.ParseStatements(contextStatements)
}
Loading

0 comments on commit 4ebb7af

Please sign in to comment.