Skip to content

Commit

Permalink
Add Badger OTEL exporter (#2269)
Browse files Browse the repository at this point in the history
* Add Badger OTEL exporter

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* Assert dir contains

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* store singleton badger factory in badger exporter

Signed-off-by: Pavol Loffay <ploffay@redhat.com>

* lint

Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay authored Jun 3, 2020
1 parent 6f3a169 commit e504fb9
Show file tree
Hide file tree
Showing 15 changed files with 420 additions and 66 deletions.
4 changes: 4 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/receiver/zipkinreceiver"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/badger"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/grpcplugin"
Expand Down Expand Up @@ -145,6 +146,9 @@ func createExporters(component ComponentType, storageTypes string, factories con
case "memory":
mem := factories.Exporters[memory.TypeStr].CreateDefaultConfig()
exporters[memory.TypeStr] = mem
case "badger":
badg := factories.Exporters[badger.TypeStr].CreateDefaultConfig()
exporters[badger.TypeStr] = badg
case "cassandra":
cass := factories.Exporters[cassandra.TypeStr].CreateDefaultConfig()
exporters[cassandra.TypeStr] = cass
Expand Down
8 changes: 8 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/service/defaultcomponents"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/badger"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/grpcplugin"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/zipkinreceiver"
storageBadger "github.com/jaegertracing/jaeger/plugin/storage/badger"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
storageGrpc "github.com/jaegertracing/jaeger/plugin/storage/grpc"
Expand Down Expand Up @@ -70,6 +72,11 @@ func Components(v *viper.Viper) config.Factories {
return opts
}}
memoryExp := memory.NewFactory(v)
badgerExp := badger.NewFactory(func() *storageBadger.Options {
opts := badger.DefaultOptions()
opts.InitFromViper(v)
return opts
})
kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *ingesterApp.Options {
opts := kafkaRec.DefaultOptions()
opts.InitFromViper(v)
Expand All @@ -82,6 +89,7 @@ func Components(v *viper.Viper) config.Factories {
factories.Exporters[esExp.Type()] = esExp
factories.Exporters[grpcExp.Type()] = grpcExp
factories.Exporters[memoryExp.Type()] = memoryExp
factories.Exporters[badgerExp.Type()] = badgerExp
factories.Receivers[kafkaRec.Type()] = kafkaRec

jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory)
Expand Down
2 changes: 2 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/badger"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/grpcplugin"
Expand All @@ -43,6 +44,7 @@ func TestComponents(t *testing.T) {
assert.IsType(t, &elasticsearch.Factory{}, factories.Exporters[elasticsearch.TypeStr])
assert.IsType(t, &grpcplugin.Factory{}, factories.Exporters[grpcplugin.TypeStr])
assert.IsType(t, &memory.Factory{}, factories.Exporters[memory.TypeStr])
assert.IsType(t, &badger.Factory{}, factories.Exporters[badger.TypeStr])
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
assert.IsType(t, &kafkaRec.Factory{}, factories.Receivers[kafkaRec.TypeStr])
Expand Down
27 changes: 27 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/badger/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badger

import (
"go.opentelemetry.io/collector/config/configmodels"

"github.com/jaegertracing/jaeger/plugin/storage/badger"
)

// Config holds configuration of Jaeger Badger exporter/storage.
type Config struct {
badger.Options `mapstructure:",squash"`
configmodels.ExporterSettings `mapstructure:",squash"`
}
77 changes: 77 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/badger/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badger

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config"

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
)

func TestDefaultConfig(t *testing.T) {
factory := NewFactory(func() *badger.Options {
opts := DefaultOptions()
v, _ := jConfig.Viperize(opts.AddFlags)
opts.InitFromViper(v)
return opts
})
defaultCfg := factory.CreateDefaultConfig().(*Config)
opts := defaultCfg.Options.GetPrimary()
assert.Contains(t, opts.KeyDirectory, "/data/keys")
assert.Contains(t, opts.ValueDirectory, "/data/values")
assert.Equal(t, true, opts.Ephemeral)
assert.Equal(t, false, opts.ReadOnly)
assert.Equal(t, false, opts.SyncWrites)
assert.Equal(t, false, opts.Truncate)
assert.Equal(t, time.Second*10, opts.MetricsUpdateInterval)
assert.Equal(t, time.Minute*5, opts.MaintenanceInterval)
assert.Equal(t, time.Hour*72, opts.SpanStoreTTL)
}

func TestLoadConfigAndFlags(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags)
err = c.ParseFlags([]string{"--badger.directory-key=bar"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
require.NoError(t, err)

factory := NewFactory(func() *badger.Options {
opts := DefaultOptions()
opts.InitFromViper(v)
require.Equal(t, "bar", opts.GetPrimary().KeyDirectory)
return opts
})

factories.Exporters[TypeStr] = factory
colConfig, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, colConfig)

cfg := colConfig.Exporters[TypeStr].(*Config)
assert.Equal(t, TypeStr, cfg.Name())
assert.Equal(t, "key", cfg.GetPrimary().KeyDirectory)
}
131 changes: 131 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/badger/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badger

import (
"context"
"fmt"
"sync"

"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage"
)

const (
// TypeStr defines type of the Badger exporter.
TypeStr = "jaeger_badger"
)

// OptionsFactory returns initialized badger.Options structure.
type OptionsFactory func() *badger.Options

// DefaultOptions creates Badger options supported by this exporter.
func DefaultOptions() *badger.Options {
return badger.NewOptions("badger")
}

// Factory is the factory for Jaeger Cassandra exporter.
type Factory struct {
mutex *sync.Mutex
optionsFactory OptionsFactory
}

// NewFactory creates new Factory instance.
func NewFactory(optionsFactory OptionsFactory) *Factory {
return &Factory{
optionsFactory: optionsFactory,
mutex: &sync.Mutex{},
}
}

var _ component.ExporterFactory = (*Factory)(nil)

// singleton instance of the factory
// the badger exporter factory always returns this instance
// the singleton instance is shared between OTEL collector and query service
var instance storage.Factory

// GetFactory returns singleton instance of the storage factory.
func GetFactory() storage.Factory {
return instance
}

// Type gets the type of exporter.
func (f Factory) Type() configmodels.Type {
return TypeStr
}

// CreateDefaultConfig returns default configuration of Factory.
// This function implements OTEL component.ExporterFactoryBase interface.
func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := f.optionsFactory()
return &Config{
Options: *opts,
ExporterSettings: configmodels.ExporterSettings{
TypeVal: TypeStr,
NameVal: TypeStr,
},
}
}

// CreateTraceExporter creates Jaeger Cassandra trace exporter.
// This function implements OTEL component.ExporterFactory interface.
func (f Factory) CreateTraceExporter(
_ context.Context,
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
factory, err := f.createStorageFactory(params, cfg)
if err != nil {
return nil, err
}
return exporter.NewSpanWriterExporter(cfg, factory)
}

// CreateMetricsExporter is not implemented.
// This function implements OTEL component.ExporterFactory interface.
func (f Factory) CreateMetricsExporter(
_ context.Context,
_ component.ExporterCreateParams,
_ configmodels.Exporter,
) (component.MetricsExporter, error) {
return nil, configerror.ErrDataTypeIsNotSupported
}

func (f Factory) createStorageFactory(params component.ExporterCreateParams, cfg configmodels.Exporter) (storage.Factory, error) {
config, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("could not cast configuration to %s", TypeStr)
}
f.mutex.Lock()
if instance != nil {
return instance, nil
}
factory := badger.NewFactory()
factory.InitFromOptions(config.Options)
err := factory.Initialize(metrics.NullFactory, params.Logger)
if err != nil {
return nil, err
}
instance = factory
f.mutex.Unlock()
return factory, nil
}
69 changes: 69 additions & 0 deletions cmd/opentelemetry-collector/app/exporter/badger/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package badger

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configmodels"
"go.uber.org/zap"

jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
)

func TestCreateTraceExporter(t *testing.T) {
v, _ := jConfig.Viperize(DefaultOptions().AddFlags)
opts := DefaultOptions()
opts.InitFromViper(v)
factory := NewFactory(func() *badger.Options {
return opts
})
exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{Logger: zap.NewNop()}, factory.CreateDefaultConfig())
require.NoError(t, err)
assert.NotNil(t, exporter)
}

func TestCreateTraceExporter_NilConfig(t *testing.T) {
factory := Factory{}
exporter, err := factory.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, nil)
require.Nil(t, exporter)
assert.Contains(t, err.Error(), "could not cast configuration to jaeger_badger")
}

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory(DefaultOptions)
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsExporter(t *testing.T) {
f := NewFactory(DefaultOptions)
mReceiver, err := f.CreateMetricsExporter(context.Background(), component.ExporterCreateParams{}, f.CreateDefaultConfig())
assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported)
assert.Nil(t, mReceiver)
}

func TestType(t *testing.T) {
factory := Factory{}
assert.Equal(t, configmodels.Type(TypeStr), factory.Type())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
receivers:
examplereceiver:

processors:
exampleprocessor:

exporters:
jaeger_badger:
ephemeral: false
directory_key: key

service:
pipelines:
traces:
receivers: [examplereceiver]
processors: [exampleprocessor]
exporters: [jaeger_badger]
Loading

0 comments on commit e504fb9

Please sign in to comment.