From 222dfef9355703133b985beaf49319edf3c78bcf Mon Sep 17 00:00:00 2001 From: chen zhengwei Date: Fri, 20 Nov 2020 16:15:28 +0800 Subject: [PATCH 1/5] Add elasticsearch index date format configuration Signed-off-by: Chen Zhengwei Signed-off-by: chen zhengwei --- .../elasticsearchexporter/config_test.go | 3 +- .../elasticsearchexporter/integration_test.go | 9 +++-- .../elasticsearchexporter/spanstore.go | 4 +- .../elasticsearchexporter/spanstore_test.go | 4 +- .../elasticsearchexporter/storagefactory.go | 4 +- .../app/internal/esutil/index_name.go | 14 +++---- .../app/internal/esutil/index_name_test.go | 20 +++++----- .../es/esdependencyreader/dependency_store.go | 38 +++++++++---------- .../dependency_store_test.go | 15 ++++---- .../reader/es/esspanreader/span_reader.go | 5 ++- pkg/es/config/config.go | 7 ++++ plugin/storage/es/dependencystore/storage.go | 36 +++++++++--------- .../es/dependencystore/storage_test.go | 22 +++++------ plugin/storage/es/esCleaner.py | 20 ++++++++-- plugin/storage/es/factory.go | 5 ++- plugin/storage/es/options.go | 27 ++++++++++++- plugin/storage/es/options_test.go | 23 +++++++++++ plugin/storage/es/spanstore/index_utils.go | 4 +- plugin/storage/es/spanstore/reader.go | 25 ++++++------ plugin/storage/es/spanstore/reader_test.go | 19 +++++----- plugin/storage/es/spanstore/writer.go | 7 ++-- plugin/storage/es/spanstore/writer_test.go | 23 +++++------ .../storage/integration/elasticsearch_test.go | 3 +- 23 files changed, 211 insertions(+), 126 deletions(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go index f8a12a5cdb5..4014818d187 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go @@ -48,7 +48,7 @@ func TestLoadConfigAndFlags(t *testing.T) { require.NoError(t, err) v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag) - err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--config-file=./testdata/jaeger-config.yaml"}) + err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--es.index-date-separator=-", "--config-file=./testdata/jaeger-config.yaml"}) require.NoError(t, err) err = flags.TryLoadConfigFile(v) @@ -74,6 +74,7 @@ func TestLoadConfigAndFlags(t *testing.T) { assert.Equal(t, []string{"someUrl"}, esCfg.Servers) assert.Equal(t, true, esCfg.CreateIndexTemplates) assert.Equal(t, "staging", esCfg.IndexPrefix) + assert.Equal(t, "2006-01-02", esCfg.IndexDateLayout) assert.Equal(t, int64(100), esCfg.NumShards) assert.Equal(t, "user", esCfg.Username) assert.Equal(t, "pass", esCfg.Password) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 18274db2222..2d28d20efe6 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -45,6 +45,7 @@ const ( esHostPort = host + ":" + esPort esURL = "http://" + esHostPort indexPrefix = "integration-test" + indexDateLayout = "2006-01-02" tagKeyDeDotChar = "@" maxSpanAge = time.Hour * 72 numShards = 5 @@ -91,8 +92,9 @@ func (s *IntegrationTest) esCleanUp(allTagsAsFields bool) error { func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { cfg := config.Configuration{ - Servers: []string{esURL}, - IndexPrefix: indexPrefix, + Servers: []string{esURL}, + IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, Tags: config.TagsAsFields{ AllAsFields: allTagsAsFields, }, @@ -118,6 +120,7 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { } reader := esspanreader.NewEsSpanReader(elasticsearchClient, s.logger, esspanreader.Config{ IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, TagDotReplacement: tagKeyDeDotChar, MaxSpanAge: maxSpanAge, MaxDocCount: defaultMaxDocCount, @@ -125,7 +128,7 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { s.SpanReader = reader depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion) - depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount) + depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount) if err := depStore.CreateTemplates(depMapping); err != nil { return nil } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go index b8905eae28b..6b8e4503792 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go @@ -78,8 +78,8 @@ func newEsSpanWriter(params config.Configuration, logger *zap.Logger, archive bo logger: logger, nameTag: tag.Insert(storagemetrics.TagExporterName(), name), client: client, - spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, alias, archive), - serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, alias, archive), + spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive), + serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive), translator: esmodeltranslator.NewTranslator(params.Tags.AllAsFields, tagsKeysAsFields, params.GetTagDotReplacement()), isArchive: archive, serviceCache: cache.NewLRUWithOptions( diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go index 5a48da5c1bd..0a544a9450b 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go @@ -147,8 +147,8 @@ func TestWriteSpans(t *testing.T) { w := esSpanWriter{ logger: zap.NewNop(), client: esClient, - spanIndexName: esutil.NewIndexNameProvider("span", "", esutil.AliasNone, false), - serviceIndexName: esutil.NewIndexNameProvider("service", "", esutil.AliasNone, false), + spanIndexName: esutil.NewIndexNameProvider("span", "", "2006-01-02", esutil.AliasNone, false), + serviceIndexName: esutil.NewIndexNameProvider("service", "", "2006-01-02", esutil.AliasNone, false), serviceCache: cache.NewLRU(1), nameTag: tag.Insert(storagemetrics.TagExporterName(), "name"), } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go index 75f8b0ac506..b74001c6682 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go @@ -88,6 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) { Archive: false, UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), MaxSpanAge: cfg.GetMaxSpanAge(), MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), @@ -101,7 +102,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error if err != nil { return nil, err } - return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil + return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetIndexDateLayout(), cfg.GetMaxDocCount()), nil } // CreateArchiveSpanReader creates archive spanstore.Reader @@ -115,6 +116,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) { Archive: true, UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), MaxSpanAge: cfg.GetMaxSpanAge(), MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), diff --git a/cmd/opentelemetry/app/internal/esutil/index_name.go b/cmd/opentelemetry/app/internal/esutil/index_name.go index 1ef07ad5642..7b08c396de4 100644 --- a/cmd/opentelemetry/app/internal/esutil/index_name.go +++ b/cmd/opentelemetry/app/internal/esutil/index_name.go @@ -16,8 +16,6 @@ package esutil import "time" -const indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 - // Alias is used to configure the kind of index alias type Alias string @@ -33,11 +31,12 @@ const ( // IndexNameProvider creates standard index names from dates type IndexNameProvider struct { index string + dateLayout string useSingleIndex bool } // NewIndexNameProvider constructs a new IndexNameProvider -func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) IndexNameProvider { +func NewIndexNameProvider(index, prefix, layout string, alias Alias, archive bool) IndexNameProvider { if prefix != "" { index = prefix + "-" + index } @@ -53,6 +52,7 @@ func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) Index } return IndexNameProvider{ index: index, + dateLayout: layout, useSingleIndex: archive || (alias != AliasNone), } } @@ -63,12 +63,12 @@ func (n IndexNameProvider) IndexNameRange(start, end time.Time) []string { return []string{n.index} } var indices []string - firstIndex := n.index + start.UTC().Format(indexDateFormat) - currentIndex := n.index + end.UTC().Format(indexDateFormat) + firstIndex := n.index + start.UTC().Format(n.dateLayout) + currentIndex := n.index + end.UTC().Format(n.dateLayout) for currentIndex != firstIndex { indices = append(indices, currentIndex) end = end.Add(-24 * time.Hour) - currentIndex = n.index + end.UTC().Format(indexDateFormat) + currentIndex = n.index + end.UTC().Format(n.dateLayout) } indices = append(indices, firstIndex) return indices @@ -79,6 +79,6 @@ func (n IndexNameProvider) IndexName(date time.Time) string { if n.useSingleIndex { return n.index } - spanDate := date.UTC().Format(indexDateFormat) + spanDate := date.UTC().Format(n.dateLayout) return n.index + spanDate } diff --git a/cmd/opentelemetry/app/internal/esutil/index_name_test.go b/cmd/opentelemetry/app/internal/esutil/index_name_test.go index 90cec2c7b20..37f4d289ae2 100644 --- a/cmd/opentelemetry/app/internal/esutil/index_name_test.go +++ b/cmd/opentelemetry/app/internal/esutil/index_name_test.go @@ -31,33 +31,33 @@ func TestIndexNames(t *testing.T) { }{ { name: "index prefix", - nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false), indices: []string{"production-myindex-0001-01-01"}, }, { name: "multiple dates", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false), indices: []string{"myindex-2020-08-30", "myindex-2020-08-29", "myindex-2020-08-28"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), }, { name: "use aliases", - nameProvider: NewIndexNameProvider("myindex", "", AliasRead, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, false), indices: []string{"myindex-read"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), }, { name: "use archive", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true), indices: []string{"myindex-archive"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), }, { name: "use archive alias", - nameProvider: NewIndexNameProvider("myindex", "", AliasRead, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, true), indices: []string{"myindex-archive-read"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), @@ -81,30 +81,30 @@ func TestIndexName(t *testing.T) { }{ { name: "index prefix", - nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false), index: "production-myindex-0001-01-01", }, { name: "no prefix", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false), index: "myindex-2020-08-28", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, { name: "use aliases", - nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, false), index: "myindex-write", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, { name: "use archive", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true), index: "myindex-archive", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, { name: "use archive alias", - nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, true), index: "myindex-archive-write", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index 174a54b2780..97bc3b5e6e3 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -35,31 +35,31 @@ const ( dependencyIndexBaseName = "jaeger-dependencies" timestampField = "timestamp" - - indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 ) // DependencyStore defines Elasticsearch dependency store. type DependencyStore struct { - client esclient.ElasticsearchClient - logger *zap.Logger - indexPrefix string - maxDocCount int + client esclient.ElasticsearchClient + logger *zap.Logger + indexPrefix string + indexDateLayout string + maxDocCount int } var _ dependencystore.Reader = (*DependencyStore)(nil) var _ dependencystore.Writer = (*DependencyStore)(nil) // NewDependencyStore creates dependency store. -func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { +func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore { if indexPrefix != "" { indexPrefix += "-" } return &DependencyStore{ - client: client, - logger: logger, - indexPrefix: indexPrefix + dependencyIndexBaseName + "-", - maxDocCount: maxDocCount, + client: client, + logger: logger, + indexPrefix: indexPrefix + dependencyIndexBaseName + "-", + indexDateLayout: indexDateLayout, + maxDocCount: maxDocCount, } } @@ -78,14 +78,14 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D if err != nil { return err } - return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, ts), dependencyType) + return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, r.indexDateLayout, ts), dependencyType) } // GetDependencies implements dependencystore.Reader func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { searchBody := getSearchBody(endTs, lookback, r.maxDocCount) - indices := dailyIndices(r.indexPrefix, endTs, lookback) + indices := dailyIndices(r.indexPrefix, r.indexDateLayout, endTs, lookback) response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...) if err != nil { return nil, err @@ -114,18 +114,18 @@ func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esc } } -func indexWithDate(indexNamePrefix string, date time.Time) string { - return indexNamePrefix + date.UTC().Format(indexDateFormat) +func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string { + return indexNamePrefix + date.UTC().Format(indexDateLayout) } -func dailyIndices(prefix string, ts time.Time, lookback time.Duration) []string { +func dailyIndices(prefix, format string, ts time.Time, lookback time.Duration) []string { var indices []string - firstIndex := indexWithDate(prefix, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, ts) + firstIndex := indexWithDate(prefix, format, ts.Add(-lookback)) + currentIndex := indexWithDate(prefix, format, ts) for currentIndex != firstIndex { indices = append(indices, currentIndex) ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, ts) + currentIndex = indexWithDate(prefix, format, ts) } return append(indices, firstIndex) } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go index ac19e02b5c0..32cc7dca7b6 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go @@ -37,7 +37,7 @@ const defaultMaxDocCount = 10_000 func TestCreateTemplates(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) template := "template" err := store.CreateTemplates(template) require.NoError(t, err) @@ -48,7 +48,7 @@ func TestCreateTemplates(t *testing.T) { func TestWriteDependencies(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}} tsNow := time.Now() err := store.WriteDependencies(tsNow, dependencies) @@ -87,7 +87,7 @@ func TestGetDependencies(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.NoError(t, err) assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{ @@ -109,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Contains(t, err.Error(), "invalid character") assert.Nil(t, dependencies) @@ -120,7 +120,7 @@ func TestGetDependencies_err_client(t *testing.T) { client := &mockClient{ searchErr: searchErr, } - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) tsNow := time.Now() dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Error(t, err) @@ -150,11 +150,12 @@ func TestSearchBody(t *testing.T) { } func TestIndexWithDate(t *testing.T) { - assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC))) + assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", "2006-01-02", + time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC))) } func TestDailyIndices(t *testing.T) { - indices := dailyIndices("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour) + indices := dailyIndices("foo-", "2006-01-02", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour) assert.Equal(t, []string{"foo-2020-09-30", "foo-2020-09-29"}, indices) } diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go index 1a209b288f4..1d8f645117c 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go @@ -71,6 +71,7 @@ type Config struct { Archive bool UseReadWriteAliases bool IndexPrefix string + IndexDateLayout string MaxSpanAge time.Duration MaxDocCount int TagDotReplacement string @@ -89,8 +90,8 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co maxSpanAge: config.MaxSpanAge, maxDocCount: config.MaxDocCount, converter: dbmodel.NewToDomain(config.TagDotReplacement), - spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, alias, config.Archive), - serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, alias, config.Archive), + spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive), + serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive), } } diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 861f7033356..9edaac60efb 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -59,6 +59,7 @@ type Configuration struct { BulkActions int `mapstructure:"-"` BulkFlushInterval time.Duration `mapstructure:"-"` IndexPrefix string `mapstructure:"index_prefix"` + IndexDateLayout string `mapstructure:"index_date_layout"` Tags TagsAsFields `mapstructure:"tags_as_fields"` Enabled bool `mapstructure:"-"` TLS tlscfg.Options `mapstructure:"tls"` @@ -89,6 +90,7 @@ type ClientBuilder interface { GetMaxSpanAge() time.Duration GetMaxDocCount() int GetIndexPrefix() string + GetIndexDateLayout() string GetTagsFilePath() string GetAllTagsAsFields() bool GetTagDotReplacement() string @@ -260,6 +262,11 @@ func (c *Configuration) GetIndexPrefix() string { return c.IndexPrefix } +// GetIndexDateLayout returns index date layout +func (c *Configuration) GetIndexDateLayout() string { + return c.IndexDateLayout +} + // GetTagsFilePath returns a path to file containing tag keys func (c *Configuration) GetTagsFilePath() string { return c.Tags.File diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 46bfd6b5d30..64ea820c6fe 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -37,29 +37,31 @@ const ( // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - client es.Client - logger *zap.Logger - indexPrefix string - maxDocCount int + client es.Client + logger *zap.Logger + indexPrefix string + indexDateLayout string + maxDocCount int } // NewDependencyStore returns a DependencyStore -func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { +func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore { var prefix string if indexPrefix != "" { prefix = indexPrefix + "-" } return &DependencyStore{ - client: client, - logger: logger, - indexPrefix: prefix + dependencyIndex, - maxDocCount: maxDocCount, + client: client, + logger: logger, + indexPrefix: prefix + dependencyIndex, + indexDateLayout: indexDateLayout, + maxDocCount: maxDocCount, } } // WriteDependencies implements dependencystore.Writer#WriteDependencies. func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { - indexName := indexWithDate(s.indexPrefix, ts) + indexName := indexWithDate(s.indexPrefix, s.indexDateLayout, ts) s.writeDependencies(indexName, ts, dependencies) return nil } @@ -82,7 +84,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - indices := getIndices(s.indexPrefix, endTs, lookback) + indices := getIndices(s.indexPrefix, s.indexDateLayout, endTs, lookback) searchResult, err := s.client.Search(indices...). Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). @@ -109,18 +111,18 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query { return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs) } -func getIndices(prefix string, ts time.Time, lookback time.Duration) []string { +func getIndices(prefix, dateLayout string, ts time.Time, lookback time.Duration) []string { var indices []string - firstIndex := indexWithDate(prefix, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, ts) + firstIndex := indexWithDate(prefix, dateLayout, ts.Add(-lookback)) + currentIndex := indexWithDate(prefix, dateLayout, ts) for currentIndex != firstIndex { indices = append(indices, currentIndex) ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, ts) + currentIndex = indexWithDate(prefix, dateLayout, ts) } return append(indices, firstIndex) } -func indexWithDate(indexNamePrefix string, date time.Time) string { - return indexNamePrefix + date.UTC().Format("2006-01-02") +func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string { + return indexNamePrefix + date.UTC().Format(indexDateLayout) } diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index a4558f80be0..3465752d2e1 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -43,14 +43,14 @@ type depStorageTest struct { storage *DependencyStore } -func withDepStorage(indexPrefix string, maxDocCount int, fn func(r *depStorageTest)) { +func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn func(r *depStorageTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() r := &depStorageTest{ client: client, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(client, logger, indexPrefix, maxDocCount), + storage: NewDependencyStore(client, logger, indexPrefix, indexDateLayout, maxDocCount), } fn(r) } @@ -69,7 +69,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, defaultMaxDocCount) + r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, "2006-01-02", defaultMaxDocCount) assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) } } @@ -90,9 +90,9 @@ func TestWriteDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage("", defaultMaxDocCount, func(r *depStorageTest) { + withDepStorage("", "2006-01-02", defaultMaxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) - indexName := indexWithDate("", fixedTime) + indexName := indexWithDate("", "2006-01-02", fixedTime) writeService := &mocks.IndexService{} r.client.On("Index").Return(writeService) @@ -165,7 +165,7 @@ func TestGetDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage(testCase.indexPrefix, testCase.maxDocCount, func(r *depStorageTest) { + withDepStorage(testCase.indexPrefix, "2006-01-02", testCase.maxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) searchService := &mocks.SearchService{} @@ -208,28 +208,28 @@ func TestGetIndices(t *testing.T) { prefix string }{ { - expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))}, + expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))}, lookback: 23 * time.Hour, prefix: "", }, { - expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))}, + expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))}, lookback: 13 * time.Hour, prefix: "", }, { - expected: []string{indexWithDate("foo:", fixedTime)}, + expected: []string{indexWithDate("foo:", "2006-01-02", fixedTime)}, lookback: 1 * time.Hour, prefix: "foo:", }, { - expected: []string{indexWithDate("foo-", fixedTime)}, + expected: []string{indexWithDate("foo-", "2006-01-02", fixedTime)}, lookback: 0, prefix: "foo-", }, } for _, testCase := range testCases { - assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback)) + assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, "2006-01-02", fixedTime, testCase.lookback)) } } diff --git a/plugin/storage/es/esCleaner.py b/plugin/storage/es/esCleaner.py index b3fd8357b1d..4b83f8d2db0 100755 --- a/plugin/storage/es/esCleaner.py +++ b/plugin/storage/es/esCleaner.py @@ -15,6 +15,7 @@ def main(): print('HOSTNAME ... specifies which Elasticsearch hosts URL to search and delete indices from.') print('TIMEOUT ... number of seconds to wait for master node response.'.format(TIMEOUT)) print('INDEX_PREFIX ... specifies index prefix.') + print('INDEX_DATE_SEPARATOR ... specifies index date separator.') print('ARCHIVE ... specifies whether to remove archive indices (only works for rollover) (default false).') print('ROLLOVER ... specifies whether to remove indices created by rollover (default false).') print('ES_USERNAME ... The username required by Elasticsearch.') @@ -33,6 +34,7 @@ def main(): prefix = os.getenv("INDEX_PREFIX", '') if prefix != '': prefix += '-' + separator = os.getenv("INDEX_DATE_SEPARATOR", '') if str2bool(os.getenv("ARCHIVE", 'false')): filter_archive_indices_rollover(ilo, prefix) @@ -40,7 +42,7 @@ def main(): if str2bool(os.getenv("ROLLOVER", 'false')): filter_main_indices_rollover(ilo, prefix) else: - filter_main_indices(ilo, prefix) + filter_main_indices(ilo, prefix, separator) empty_list(ilo, 'No indices to delete') @@ -51,12 +53,22 @@ def main(): delete_indices.do_action() -def filter_main_indices(ilo, prefix): - ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-\d{4}-\d{2}-\d{2}") +def filter_main_indices(ilo, prefix, separator): + date_regex = "\d{4}-\d{2}-\d{2}" + time_string = "%Y-%m-%d" + if separator != "": + if separator == "none": + date_regex = "\d{4}\d{2}\d{2}" + time_string = "%Y%m%d" + else: + date_regex = "\d{4}" + separator + "\d{2}" + separator + "\d{2}" + time_string = "%Y" + separator + "%m" + separator + "%d" + + ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-" + date_regex) empty_list(ilo, "No indices to delete") # This excludes archive index as we use source='name' # source `creation_date` would include archive index - ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1])) + ilo.filter_by_age(source='name', direction='older', timestring=time_string, unit='days', unit_count=int(sys.argv[1])) def filter_main_indices_rollover(ilo, prefix): diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index b41724db7b0..41dcf9c9970 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -111,7 +111,8 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), f.primaryConfig.GetMaxDocCount()) + reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), + f.primaryConfig.GetIndexDateLayout(), f.primaryConfig.GetMaxDocCount()) return reader, nil } @@ -145,6 +146,7 @@ func createSpanReader( MaxDocCount: cfg.GetMaxDocCount(), MaxSpanAge: cfg.GetMaxSpanAge(), IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, @@ -171,6 +173,7 @@ func createSpanWriter( Logger: logger, MetricsFactory: mFactory, IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), AllTagsAsFields: cfg.GetAllTagsAsFields(), TagKeysAsFields: tags, TagDotReplacement: cfg.GetTagDotReplacement(), diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index c575befc8a4..68f1c16da43 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -17,6 +17,7 @@ package es import ( "flag" + "fmt" "math" "strings" "time" @@ -45,6 +46,7 @@ const ( suffixBulkFlushInterval = ".bulk.flush-interval" suffixTimeout = ".timeout" suffixIndexPrefix = ".index-prefix" + suffixIndexDateSeparator = ".index-date-separator" suffixTagsAsFields = ".tags-as-fields" suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include" @@ -58,8 +60,9 @@ const ( // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window - defaultMaxDocCount = 10_000 - defaultServerURL = "http://127.0.0.1:9200" + defaultMaxDocCount = 10_000 + defaultServerURL = "http://127.0.0.1:9200" + defaultIndexDateLayout = "2006-01-02" // date format for index e.g. 2020-01-20 ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -210,6 +213,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixIndexPrefix, nsConfig.IndexPrefix, "Optional prefix of Jaeger indices. For example \"production\" creates \"production-jaeger-*\".") + flagSet.String( + nsConfig.namespace+suffixIndexDateSeparator, + "", + "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields, @@ -281,6 +288,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) + cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator)) cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude) cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile) @@ -325,3 +333,18 @@ func (opt *Options) Get(namespace string) *config.Configuration { func stripWhiteSpace(str string) string { return strings.Replace(str, " ", "", -1) } + +func initDateLayout(separator string) string { + var dateLayout string + if separator != "" { + switch separator { + case "none": + dateLayout = "20060102" + default: + dateLayout = fmt.Sprintf("2006%s01%s02", separator, separator) + } + } else { + dateLayout = defaultIndexDateLayout + } + return dateLayout +} diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index a7148a82e68..f25c25eeae9 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -55,10 +55,12 @@ func TestOptionsWithFlags(t *testing.T) { "--es.max-span-age=48h", "--es.num-shards=20", "--es.num-replicas=10", + "--es.index-date-separator=none", // a couple overrides "--es.aux.server-urls=3.3.3.3, 4.4.4.4", "--es.aux.max-span-age=24h", "--es.aux.num-replicas=10", + "--es.aux.index-date-separator=.", "--es.tls.enabled=true", "--es.tls.skip-host-verify=true", "--es.tags-as-fields.all=true", @@ -81,6 +83,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) + assert.Equal(t, "20060102", primary.IndexDateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) @@ -94,6 +97,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", aux.Tags.DotReplacement) assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) + assert.Equal(t, "2006.01.02", aux.IndexDateLayout) } func TestMaxNumSpansUsage(t *testing.T) { @@ -148,3 +152,22 @@ func TestMaxDocCount(t *testing.T) { }) } } + +func TestDateLayout(t *testing.T) { + testCases := []struct { + name string + separator string + wantLayout string + }{ + {"default", "", "2006-01-02"}, + {"crossbar", "-", "2006-01-02"}, + {"normal separator", ".", "2006.01.02"}, + {"none separator", "none", "20060102"}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dateLayout := initDateLayout(tc.separator) + assert.Equal(t, dateLayout, tc.wantLayout) + }) + } +} diff --git a/plugin/storage/es/spanstore/index_utils.go b/plugin/storage/es/spanstore/index_utils.go index b3497a3bdca..a9d4674aa5e 100644 --- a/plugin/storage/es/spanstore/index_utils.go +++ b/plugin/storage/es/spanstore/index_utils.go @@ -19,8 +19,8 @@ import ( ) // returns index name with date -func indexWithDate(indexPrefix string, date time.Time) string { - spanDate := date.UTC().Format("2006-01-02") +func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string { + spanDate := date.UTC().Format(indexDateLayout) return indexPrefix + spanDate } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 5c7176d514e..ca6e3d249de 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -97,6 +97,7 @@ type SpanReader struct { serviceOperationStorage *ServiceOperationStorage spanIndexPrefix string serviceIndexPrefix string + indexDateLayout string spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn sourceFn sourceFn @@ -111,6 +112,7 @@ type SpanReaderParams struct { MaxDocCount int MetricsFactory metrics.Factory IndexPrefix string + IndexDateLayout string TagDotReplacement string Archive bool UseReadWriteAliases bool @@ -125,6 +127,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), + indexDateLayout: p.IndexDateLayout, spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), sourceFn: getSourceFn(p.Archive, p.MaxDocCount), @@ -132,7 +135,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { } } -type timeRangeIndexFn func(indexName string, startTime time.Time, endTime time.Time) []string +type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime time.Time, endTime time.Time) []string type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource @@ -144,12 +147,12 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { } else { archivePrefix = archiveIndexSuffix } - return func(indexName string, startTime time.Time, endTime time.Time) []string { + return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { return []string{archiveIndex(indexName, archivePrefix)} } } if useReadWriteAliases { - return func(indices string, startTime time.Time, endTime time.Time) []string { + return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { return []string{indices + "read"} } } @@ -171,14 +174,14 @@ func getSourceFn(archive bool, maxDocCount int) sourceFn { } // timeRangeIndices returns the array of indices that we need to query, based on query params -func timeRangeIndices(indexName string, startTime time.Time, endTime time.Time) []string { +func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { var indices []string - firstIndex := indexWithDate(indexName, startTime) - currentIndex := indexWithDate(indexName, endTime) + firstIndex := indexWithDate(indexName, indexDateLayout, startTime) + currentIndex := indexWithDate(indexName, indexDateLayout, endTime) for currentIndex != firstIndex { indices = append(indices, currentIndex) endTime = endTime.Add(-24 * time.Hour) - currentIndex = indexWithDate(indexName, endTime) + currentIndex = indexWithDate(indexName, indexDateLayout, endTime) } indices = append(indices, firstIndex) return indices @@ -241,7 +244,7 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.indexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } @@ -253,7 +256,7 @@ func (s *SpanReader) GetOperations( span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.indexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err @@ -326,7 +329,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) + indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[model.TraceID]uint64) @@ -512,7 +515,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra // } aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax) + jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax) searchService := s.client.Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 997579d19e4..de841663016 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -176,7 +176,7 @@ func TestSpanReaderIndices(t *testing.T) { } for _, testCase := range testCases { r := NewSpanReader(testCase.params) - actual := r.timeRangeIndices(r.spanIndexPrefix, date, date) + actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date) assert.Equal(t, []string{testCase.index}, actual) } } @@ -435,6 +435,7 @@ func TestSpanReaderFindIndices(t *testing.T) { today := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC) yesterday := today.AddDate(0, 0, -1) twoDaysAgo := today.AddDate(0, 0, -2) + dateLayout := "2006-01-02" testCases := []struct { startTime time.Time @@ -445,30 +446,30 @@ func TestSpanReaderFindIndices(t *testing.T) { startTime: today.Add(-time.Millisecond), endTime: today, expected: []string{ - indexWithDate(spanIndex, today), + indexWithDate(spanIndex, dateLayout, today), }, }, { startTime: today.Add(-13 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, today), - indexWithDate(spanIndex, yesterday), + indexWithDate(spanIndex, dateLayout, today), + indexWithDate(spanIndex, dateLayout, yesterday), }, }, { startTime: today.Add(-48 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, today), - indexWithDate(spanIndex, yesterday), - indexWithDate(spanIndex, twoDaysAgo), + indexWithDate(spanIndex, dateLayout, today), + indexWithDate(spanIndex, dateLayout, yesterday), + indexWithDate(spanIndex, dateLayout, twoDaysAgo), }, }, } withSpanReader(func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.timeRangeIndices(spanIndex, testCase.startTime, testCase.endTime) + actual := r.reader.timeRangeIndices(spanIndex, dateLayout, testCase.startTime, testCase.endTime) assert.EqualValues(t, testCase.expected, actual) } }) @@ -476,7 +477,7 @@ func TestSpanReaderFindIndices(t *testing.T) { func TestSpanReader_indexWithDate(t *testing.T) { withSpanReader(func(r *spanReaderTest) { - actual := indexWithDate(spanIndex, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) + actual := indexWithDate(spanIndex, "2006-01-02", time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) assert.Equal(t, "jaeger-span-1995-04-21", actual) }) } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index c7cf714e660..74be940a308 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -57,6 +57,7 @@ type SpanWriterParams struct { Logger *zap.Logger MetricsFactory metrics.Factory IndexPrefix string + IndexDateLayout string AllTagsAsFields bool TagKeysAsFields []string TagDotReplacement string @@ -82,7 +83,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, ), spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix), + spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix, p.IndexDateLayout), } } @@ -102,7 +103,7 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string) error // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) -func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix string) spanAndServiceIndexFn { +func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, dateLayout string) spanAndServiceIndexFn { if prefix != "" { prefix += indexPrefixSeparator } @@ -123,7 +124,7 @@ func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix string) } } return func(date time.Time) (string, string) { - return indexWithDate(spanIndexPrefix, date), indexWithDate(serviceIndexPrefix, date) + return indexWithDate(spanIndexPrefix, dateLayout, date), indexWithDate(serviceIndexPrefix, dateLayout, date) } } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 1e1292c33e0..1ac8070c5aa 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -50,7 +50,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory}), + writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexDateLayout: "2006-01-02"}), } fn(w) } @@ -62,31 +62,32 @@ func TestSpanWriterIndices(t *testing.T) { logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) date := time.Now() - dateFormat := date.UTC().Format("2006-01-02") + layout := "2006-01-02" + dateFormat := date.UTC().Format(layout) testCases := []struct { indices []string params SpanWriterParams }{ {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", Archive: false}, + IndexPrefix: "", IndexDateLayout: layout, Archive: false}, indices: []string{spanIndex + dateFormat, serviceIndex + dateFormat}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", UseReadWriteAliases: true}, + IndexPrefix: "", IndexDateLayout: layout, UseReadWriteAliases: true}, indices: []string{spanIndex + "write", serviceIndex + "write"}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: false}, + IndexPrefix: "foo:", IndexDateLayout: layout, Archive: false}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat, "foo:" + indexPrefixSeparator + serviceIndex + dateFormat}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", UseReadWriteAliases: true}, + IndexPrefix: "foo:", IndexDateLayout: layout, UseReadWriteAliases: true}, indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", Archive: true}, + IndexPrefix: "", IndexDateLayout: layout, Archive: true}, indices: []string{spanIndex + archiveIndexSuffix, ""}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: true}, + IndexPrefix: "foo:", IndexDateLayout: layout, Archive: true}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true}, + IndexPrefix: "foo:", IndexDateLayout: layout, Archive: true, UseReadWriteAliases: true}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}}, } for _, testCase := range testCases { @@ -254,8 +255,8 @@ func TestSpanIndexName(t *testing.T) { span := &model.Span{ StartTime: date, } - spanIndexName := indexWithDate(spanIndex, span.StartTime) - serviceIndexName := indexWithDate(serviceIndex, span.StartTime) + spanIndexName := indexWithDate(spanIndex, "2006-01-02", span.StartTime) + serviceIndexName := indexWithDate(serviceIndex, "2006-01-02", span.StartTime) assert.Equal(t, "jaeger-span-1995-04-21", spanIndexName) assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName) } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 301f68a3a3c..7eb1df8bac1 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -44,6 +44,7 @@ const ( queryHostPort = host + ":" + queryPort queryURL = "http://" + queryHostPort indexPrefix = "integration-test" + indexDateLayout = "2006-01-02" tagKeyDeDotChar = "@" maxSpanAge = time.Hour * 72 defaultMaxDocCount = 10_000 @@ -132,7 +133,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro Archive: archive, MaxDocCount: defaultMaxDocCount, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, defaultMaxDocCount) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) err = dependencyStore.CreateTemplates(depMapping) if err != nil { From 07d71039eba3fd156302ae47b85eede28c1b202d Mon Sep 17 00:00:00 2001 From: chen zhengwei Date: Mon, 23 Nov 2020 14:55:53 +0800 Subject: [PATCH 2/5] Set default separator to '-', remove 'none' config Signed-off-by: Chen Zhengwei Signed-off-by: chen zhengwei --- plugin/storage/es/esCleaner.py | 13 +++---------- plugin/storage/es/options.go | 24 +++++++----------------- plugin/storage/es/options_test.go | 29 +++++++++++++++++------------ 3 files changed, 27 insertions(+), 39 deletions(-) diff --git a/plugin/storage/es/esCleaner.py b/plugin/storage/es/esCleaner.py index 4b83f8d2db0..336704cf03d 100755 --- a/plugin/storage/es/esCleaner.py +++ b/plugin/storage/es/esCleaner.py @@ -34,7 +34,7 @@ def main(): prefix = os.getenv("INDEX_PREFIX", '') if prefix != '': prefix += '-' - separator = os.getenv("INDEX_DATE_SEPARATOR", '') + separator = os.getenv("INDEX_DATE_SEPARATOR", '-') if str2bool(os.getenv("ARCHIVE", 'false')): filter_archive_indices_rollover(ilo, prefix) @@ -54,15 +54,8 @@ def main(): def filter_main_indices(ilo, prefix, separator): - date_regex = "\d{4}-\d{2}-\d{2}" - time_string = "%Y-%m-%d" - if separator != "": - if separator == "none": - date_regex = "\d{4}\d{2}\d{2}" - time_string = "%Y%m%d" - else: - date_regex = "\d{4}" + separator + "\d{2}" + separator + "\d{2}" - time_string = "%Y" + separator + "%m" + separator + "%d" + date_regex = "\d{4}" + separator + "\d{2}" + separator + "\d{2}" + time_string = "%Y" + separator + "%m" + separator + "%d" ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-" + date_regex) empty_list(ilo, "No indices to delete") diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 68f1c16da43..bba622d67e6 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -60,9 +60,10 @@ const ( // default number of documents to return from a query (elasticsearch allowed limit) // see search.max_buckets and index.max_result_window - defaultMaxDocCount = 10_000 - defaultServerURL = "http://127.0.0.1:9200" - defaultIndexDateLayout = "2006-01-02" // date format for index e.g. 2020-01-20 + defaultMaxDocCount = 10_000 + defaultServerURL = "http://127.0.0.1:9200" + // default separator for Elasticsearch index date layout. + defaultIndexDateSeparator = "-" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -215,8 +216,8 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { "Optional prefix of Jaeger indices. For example \"production\" creates \"production-jaeger-*\".") flagSet.String( nsConfig.namespace+suffixIndexDateSeparator, - "", - "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".") + defaultIndexDateSeparator, + "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \". This config is "+defaultIndexDateSeparator+" by default") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields, @@ -335,16 +336,5 @@ func stripWhiteSpace(str string) string { } func initDateLayout(separator string) string { - var dateLayout string - if separator != "" { - switch separator { - case "none": - dateLayout = "20060102" - default: - dateLayout = fmt.Sprintf("2006%s01%s02", separator, separator) - } - } else { - dateLayout = defaultIndexDateLayout - } - return dateLayout + return fmt.Sprintf("2006%s01%s02", separator, separator) } diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index f25c25eeae9..1f7762f8a0f 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -55,7 +55,7 @@ func TestOptionsWithFlags(t *testing.T) { "--es.max-span-age=48h", "--es.num-shards=20", "--es.num-replicas=10", - "--es.index-date-separator=none", + "--es.index-date-separator=-", // a couple overrides "--es.aux.server-urls=3.3.3.3, 4.4.4.4", "--es.aux.max-span-age=24h", @@ -83,7 +83,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) - assert.Equal(t, "20060102", primary.IndexDateLayout) + assert.Equal(t, "2006-01-02", primary.IndexDateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) @@ -153,21 +153,26 @@ func TestMaxDocCount(t *testing.T) { } } -func TestDateLayout(t *testing.T) { +func TestIndexDateSeparator(t *testing.T) { testCases := []struct { - name string - separator string - wantLayout string + name string + flags []string + wantDateLayout string }{ - {"default", "", "2006-01-02"}, - {"crossbar", "-", "2006-01-02"}, - {"normal separator", ".", "2006.01.02"}, - {"none separator", "none", "20060102"}, + {"not defined", []string{}, "2006-01-02"}, + {"empty string", []string{"--es.index-date-separator="}, "20060102"}, + {"normal separator", []string{"--es.index-date-separator=."}, "2006.01.02"}, + {"crossbar", []string{"--es.index-date-separator=-"}, "2006-01-02"}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - dateLayout := initDateLayout(tc.separator) - assert.Equal(t, dateLayout, tc.wantLayout) + opts := NewOptions("es") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags(tc.flags) + opts.InitFromViper(v) + + primary := opts.GetPrimary() + assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayout) }) } } From 4f722a1376f372935a83a2580cc3b38ba17b8bb6 Mon Sep 17 00:00:00 2001 From: chen zhengwei Date: Mon, 23 Nov 2020 21:20:24 +0800 Subject: [PATCH 3/5] Update options_test.go and separator usage. Signed-off-by: Chen Zhengwei Signed-off-by: chen zhengwei --- plugin/storage/es/options.go | 2 +- plugin/storage/es/options_test.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index bba622d67e6..43a354f2f39 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -217,7 +217,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.String( nsConfig.namespace+suffixIndexDateSeparator, defaultIndexDateSeparator, - "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \". This config is "+defaultIndexDateSeparator+" by default") + "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \". Default: '"+defaultIndexDateSeparator+"'") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields, diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 1f7762f8a0f..79a6c7d608a 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -55,7 +55,7 @@ func TestOptionsWithFlags(t *testing.T) { "--es.max-span-age=48h", "--es.num-shards=20", "--es.num-replicas=10", - "--es.index-date-separator=-", + "--es.index-date-separator=", // a couple overrides "--es.aux.server-urls=3.3.3.3, 4.4.4.4", "--es.aux.max-span-age=24h", @@ -83,7 +83,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) - assert.Equal(t, "2006-01-02", primary.IndexDateLayout) + assert.Equal(t, "20060102", primary.IndexDateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) @@ -159,10 +159,12 @@ func TestIndexDateSeparator(t *testing.T) { flags []string wantDateLayout string }{ - {"not defined", []string{}, "2006-01-02"}, - {"empty string", []string{"--es.index-date-separator="}, "20060102"}, - {"normal separator", []string{"--es.index-date-separator=."}, "2006.01.02"}, - {"crossbar", []string{"--es.index-date-separator=-"}, "2006-01-02"}, + {"not defined (default)", []string{}, "2006-01-02"}, + {"empty separator", []string{"--es.index-date-separator="}, "20060102"}, + {"dot separator", []string{"--es.index-date-separator=."}, "2006.01.02"}, + {"crossbar separator", []string{"--es.index-date-separator=-"}, "2006-01-02"}, + {"backslash separator", []string{"--es.index-date-separator=/"}, "2006/01/02"}, + {"multiple characters separator", []string{"--es.index-date-separator=' '"}, "2006' '01' '02"}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { From 7b4e2de44b1b6d0e3d2f9fa1e64b4b077946f474 Mon Sep 17 00:00:00 2001 From: chen zhengwei Date: Mon, 23 Nov 2020 22:01:19 +0800 Subject: [PATCH 4/5] Update options_test.go Signed-off-by: Chen Zhengwei Signed-off-by: chen zhengwei --- plugin/storage/es/options_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 79a6c7d608a..73bb1476e93 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -163,8 +163,8 @@ func TestIndexDateSeparator(t *testing.T) { {"empty separator", []string{"--es.index-date-separator="}, "20060102"}, {"dot separator", []string{"--es.index-date-separator=."}, "2006.01.02"}, {"crossbar separator", []string{"--es.index-date-separator=-"}, "2006-01-02"}, - {"backslash separator", []string{"--es.index-date-separator=/"}, "2006/01/02"}, - {"multiple characters separator", []string{"--es.index-date-separator=' '"}, "2006' '01' '02"}, + {"slash separator", []string{"--es.index-date-separator=/"}, "2006/01/02"}, + {"empty string with single quotes", []string{"--es.index-date-separator=''"}, "2006''01''02"}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { From d73a9e08e5c970532d3f4375e5867b09b366a17a Mon Sep 17 00:00:00 2001 From: chen zhengwei Date: Wed, 2 Dec 2020 13:47:16 +0800 Subject: [PATCH 5/5] Remove separator's default value description. Signed-off-by: Chen Zhengwei Signed-off-by: chen zhengwei --- plugin/storage/es/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 43a354f2f39..b0f212670d6 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -217,7 +217,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { flagSet.String( nsConfig.namespace+suffixIndexDateSeparator, defaultIndexDateSeparator, - "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \". Default: '"+defaultIndexDateSeparator+"'") + "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields,