diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go index f8a12a5cdb5..4014818d187 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/config_test.go @@ -48,7 +48,7 @@ func TestLoadConfigAndFlags(t *testing.T) { require.NoError(t, err) v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag) - err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--config-file=./testdata/jaeger-config.yaml"}) + err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--es.index-date-separator=-", "--config-file=./testdata/jaeger-config.yaml"}) require.NoError(t, err) err = flags.TryLoadConfigFile(v) @@ -74,6 +74,7 @@ func TestLoadConfigAndFlags(t *testing.T) { assert.Equal(t, []string{"someUrl"}, esCfg.Servers) assert.Equal(t, true, esCfg.CreateIndexTemplates) assert.Equal(t, "staging", esCfg.IndexPrefix) + assert.Equal(t, "2006-01-02", esCfg.IndexDateLayout) assert.Equal(t, int64(100), esCfg.NumShards) assert.Equal(t, "user", esCfg.Username) assert.Equal(t, "pass", esCfg.Password) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go index 18274db2222..2d28d20efe6 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/integration_test.go @@ -45,6 +45,7 @@ const ( esHostPort = host + ":" + esPort esURL = "http://" + esHostPort indexPrefix = "integration-test" + indexDateLayout = "2006-01-02" tagKeyDeDotChar = "@" maxSpanAge = time.Hour * 72 numShards = 5 @@ -91,8 +92,9 @@ func (s *IntegrationTest) esCleanUp(allTagsAsFields bool) error { func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { cfg := config.Configuration{ - Servers: []string{esURL}, - IndexPrefix: indexPrefix, + Servers: []string{esURL}, + IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, Tags: config.TagsAsFields{ AllAsFields: allTagsAsFields, }, @@ -118,6 +120,7 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { } reader := esspanreader.NewEsSpanReader(elasticsearchClient, s.logger, esspanreader.Config{ IndexPrefix: indexPrefix, + IndexDateLayout: indexDateLayout, TagDotReplacement: tagKeyDeDotChar, MaxSpanAge: maxSpanAge, MaxDocCount: defaultMaxDocCount, @@ -125,7 +128,7 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error { s.SpanReader = reader depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion) - depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount) + depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount) if err := depStore.CreateTemplates(depMapping); err != nil { return nil } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go index c4608e332b7..859dc5f983a 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go @@ -78,8 +78,8 @@ func newEsSpanWriter(params config.Configuration, logger *zap.Logger, archive bo logger: logger, nameTag: tag.Insert(storagemetrics.TagExporterName(), name), client: client, - spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, alias, archive), - serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, alias, archive), + spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive), + serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive), translator: esmodeltranslator.NewTranslator(params.Tags.AllAsFields, tagsKeysAsFields, params.GetTagDotReplacement()), isArchive: archive, serviceCache: cache.NewLRUWithOptions( diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go index ae3698b5b30..476b5f8650d 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore_test.go @@ -146,8 +146,8 @@ func TestWriteSpans(t *testing.T) { w := esSpanWriter{ logger: zap.NewNop(), client: esClient, - spanIndexName: esutil.NewIndexNameProvider("span", "", esutil.AliasNone, false), - serviceIndexName: esutil.NewIndexNameProvider("service", "", esutil.AliasNone, false), + spanIndexName: esutil.NewIndexNameProvider("span", "", "2006-01-02", esutil.AliasNone, false), + serviceIndexName: esutil.NewIndexNameProvider("service", "", "2006-01-02", esutil.AliasNone, false), serviceCache: cache.NewLRU(1), nameTag: tag.Insert(storagemetrics.TagExporterName(), "name"), } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go index 75f8b0ac506..b74001c6682 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/storagefactory.go @@ -88,6 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) { Archive: false, UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), MaxSpanAge: cfg.GetMaxSpanAge(), MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), @@ -101,7 +102,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error if err != nil { return nil, err } - return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil + return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetIndexDateLayout(), cfg.GetMaxDocCount()), nil } // CreateArchiveSpanReader creates archive spanstore.Reader @@ -115,6 +116,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) { Archive: true, UseReadWriteAliases: cfg.GetUseReadWriteAliases(), IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), MaxSpanAge: cfg.GetMaxSpanAge(), MaxDocCount: cfg.GetMaxDocCount(), TagDotReplacement: cfg.GetTagDotReplacement(), diff --git a/cmd/opentelemetry/app/internal/esutil/index_name.go b/cmd/opentelemetry/app/internal/esutil/index_name.go index 1ef07ad5642..7b08c396de4 100644 --- a/cmd/opentelemetry/app/internal/esutil/index_name.go +++ b/cmd/opentelemetry/app/internal/esutil/index_name.go @@ -16,8 +16,6 @@ package esutil import "time" -const indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 - // Alias is used to configure the kind of index alias type Alias string @@ -33,11 +31,12 @@ const ( // IndexNameProvider creates standard index names from dates type IndexNameProvider struct { index string + dateLayout string useSingleIndex bool } // NewIndexNameProvider constructs a new IndexNameProvider -func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) IndexNameProvider { +func NewIndexNameProvider(index, prefix, layout string, alias Alias, archive bool) IndexNameProvider { if prefix != "" { index = prefix + "-" + index } @@ -53,6 +52,7 @@ func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) Index } return IndexNameProvider{ index: index, + dateLayout: layout, useSingleIndex: archive || (alias != AliasNone), } } @@ -63,12 +63,12 @@ func (n IndexNameProvider) IndexNameRange(start, end time.Time) []string { return []string{n.index} } var indices []string - firstIndex := n.index + start.UTC().Format(indexDateFormat) - currentIndex := n.index + end.UTC().Format(indexDateFormat) + firstIndex := n.index + start.UTC().Format(n.dateLayout) + currentIndex := n.index + end.UTC().Format(n.dateLayout) for currentIndex != firstIndex { indices = append(indices, currentIndex) end = end.Add(-24 * time.Hour) - currentIndex = n.index + end.UTC().Format(indexDateFormat) + currentIndex = n.index + end.UTC().Format(n.dateLayout) } indices = append(indices, firstIndex) return indices @@ -79,6 +79,6 @@ func (n IndexNameProvider) IndexName(date time.Time) string { if n.useSingleIndex { return n.index } - spanDate := date.UTC().Format(indexDateFormat) + spanDate := date.UTC().Format(n.dateLayout) return n.index + spanDate } diff --git a/cmd/opentelemetry/app/internal/esutil/index_name_test.go b/cmd/opentelemetry/app/internal/esutil/index_name_test.go index 90cec2c7b20..37f4d289ae2 100644 --- a/cmd/opentelemetry/app/internal/esutil/index_name_test.go +++ b/cmd/opentelemetry/app/internal/esutil/index_name_test.go @@ -31,33 +31,33 @@ func TestIndexNames(t *testing.T) { }{ { name: "index prefix", - nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false), indices: []string{"production-myindex-0001-01-01"}, }, { name: "multiple dates", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false), indices: []string{"myindex-2020-08-30", "myindex-2020-08-29", "myindex-2020-08-28"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), }, { name: "use aliases", - nameProvider: NewIndexNameProvider("myindex", "", AliasRead, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, false), indices: []string{"myindex-read"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), }, { name: "use archive", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true), indices: []string{"myindex-archive"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), }, { name: "use archive alias", - nameProvider: NewIndexNameProvider("myindex", "", AliasRead, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, true), indices: []string{"myindex-archive-read"}, start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC), @@ -81,30 +81,30 @@ func TestIndexName(t *testing.T) { }{ { name: "index prefix", - nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false), index: "production-myindex-0001-01-01", }, { name: "no prefix", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false), index: "myindex-2020-08-28", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, { name: "use aliases", - nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, false), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, false), index: "myindex-write", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, { name: "use archive", - nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true), index: "myindex-archive", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, { name: "use archive alias", - nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, true), + nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, true), index: "myindex-archive-write", date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC), }, diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go index 174a54b2780..97bc3b5e6e3 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store.go @@ -35,31 +35,31 @@ const ( dependencyIndexBaseName = "jaeger-dependencies" timestampField = "timestamp" - - indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20 ) // DependencyStore defines Elasticsearch dependency store. type DependencyStore struct { - client esclient.ElasticsearchClient - logger *zap.Logger - indexPrefix string - maxDocCount int + client esclient.ElasticsearchClient + logger *zap.Logger + indexPrefix string + indexDateLayout string + maxDocCount int } var _ dependencystore.Reader = (*DependencyStore)(nil) var _ dependencystore.Writer = (*DependencyStore)(nil) // NewDependencyStore creates dependency store. -func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { +func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore { if indexPrefix != "" { indexPrefix += "-" } return &DependencyStore{ - client: client, - logger: logger, - indexPrefix: indexPrefix + dependencyIndexBaseName + "-", - maxDocCount: maxDocCount, + client: client, + logger: logger, + indexPrefix: indexPrefix + dependencyIndexBaseName + "-", + indexDateLayout: indexDateLayout, + maxDocCount: maxDocCount, } } @@ -78,14 +78,14 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D if err != nil { return err } - return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, ts), dependencyType) + return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, r.indexDateLayout, ts), dependencyType) } // GetDependencies implements dependencystore.Reader func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { searchBody := getSearchBody(endTs, lookback, r.maxDocCount) - indices := dailyIndices(r.indexPrefix, endTs, lookback) + indices := dailyIndices(r.indexPrefix, r.indexDateLayout, endTs, lookback) response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...) if err != nil { return nil, err @@ -114,18 +114,18 @@ func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esc } } -func indexWithDate(indexNamePrefix string, date time.Time) string { - return indexNamePrefix + date.UTC().Format(indexDateFormat) +func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string { + return indexNamePrefix + date.UTC().Format(indexDateLayout) } -func dailyIndices(prefix string, ts time.Time, lookback time.Duration) []string { +func dailyIndices(prefix, format string, ts time.Time, lookback time.Duration) []string { var indices []string - firstIndex := indexWithDate(prefix, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, ts) + firstIndex := indexWithDate(prefix, format, ts.Add(-lookback)) + currentIndex := indexWithDate(prefix, format, ts) for currentIndex != firstIndex { indices = append(indices, currentIndex) ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, ts) + currentIndex = indexWithDate(prefix, format, ts) } return append(indices, firstIndex) } diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go index ac69e56b6e5..7f7736a25b8 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go @@ -37,7 +37,7 @@ const defaultMaxDocCount = 10_000 func TestCreateTemplates(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) template := "template" err := store.CreateTemplates(template) require.NoError(t, err) @@ -48,7 +48,7 @@ func TestCreateTemplates(t *testing.T) { func TestWriteDependencies(t *testing.T) { client := &mockClient{} - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}} tsNow := time.Now() err := store.WriteDependencies(tsNow, dependencies) @@ -87,7 +87,7 @@ func TestGetDependencies(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.NoError(t, err) assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{ @@ -109,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) { }, }, } - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Error(t, err) require.Contains(t, err.Error(), "invalid character") @@ -121,7 +121,7 @@ func TestGetDependencies_err_client(t *testing.T) { client := &mockClient{ searchErr: searchErr, } - store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount) + store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount) tsNow := time.Now() dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour) require.Error(t, err) @@ -151,11 +151,12 @@ func TestSearchBody(t *testing.T) { } func TestIndexWithDate(t *testing.T) { - assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC))) + assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", "2006-01-02", + time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC))) } func TestDailyIndices(t *testing.T) { - indices := dailyIndices("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour) + indices := dailyIndices("foo-", "2006-01-02", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour) assert.Equal(t, []string{"foo-2020-09-30", "foo-2020-09-29"}, indices) } diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go index 1a209b288f4..1d8f645117c 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader.go @@ -71,6 +71,7 @@ type Config struct { Archive bool UseReadWriteAliases bool IndexPrefix string + IndexDateLayout string MaxSpanAge time.Duration MaxDocCount int TagDotReplacement string @@ -89,8 +90,8 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co maxSpanAge: config.MaxSpanAge, maxDocCount: config.MaxDocCount, converter: dbmodel.NewToDomain(config.TagDotReplacement), - spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, alias, config.Archive), - serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, alias, config.Archive), + spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive), + serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive), } } diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 861f7033356..9edaac60efb 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -59,6 +59,7 @@ type Configuration struct { BulkActions int `mapstructure:"-"` BulkFlushInterval time.Duration `mapstructure:"-"` IndexPrefix string `mapstructure:"index_prefix"` + IndexDateLayout string `mapstructure:"index_date_layout"` Tags TagsAsFields `mapstructure:"tags_as_fields"` Enabled bool `mapstructure:"-"` TLS tlscfg.Options `mapstructure:"tls"` @@ -89,6 +90,7 @@ type ClientBuilder interface { GetMaxSpanAge() time.Duration GetMaxDocCount() int GetIndexPrefix() string + GetIndexDateLayout() string GetTagsFilePath() string GetAllTagsAsFields() bool GetTagDotReplacement() string @@ -260,6 +262,11 @@ func (c *Configuration) GetIndexPrefix() string { return c.IndexPrefix } +// GetIndexDateLayout returns index date layout +func (c *Configuration) GetIndexDateLayout() string { + return c.IndexDateLayout +} + // GetTagsFilePath returns a path to file containing tag keys func (c *Configuration) GetTagsFilePath() string { return c.Tags.File diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 46bfd6b5d30..64ea820c6fe 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -37,29 +37,31 @@ const ( // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - client es.Client - logger *zap.Logger - indexPrefix string - maxDocCount int + client es.Client + logger *zap.Logger + indexPrefix string + indexDateLayout string + maxDocCount int } // NewDependencyStore returns a DependencyStore -func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore { +func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore { var prefix string if indexPrefix != "" { prefix = indexPrefix + "-" } return &DependencyStore{ - client: client, - logger: logger, - indexPrefix: prefix + dependencyIndex, - maxDocCount: maxDocCount, + client: client, + logger: logger, + indexPrefix: prefix + dependencyIndex, + indexDateLayout: indexDateLayout, + maxDocCount: maxDocCount, } } // WriteDependencies implements dependencystore.Writer#WriteDependencies. func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error { - indexName := indexWithDate(s.indexPrefix, ts) + indexName := indexWithDate(s.indexPrefix, s.indexDateLayout, ts) s.writeDependencies(indexName, ts, dependencies) return nil } @@ -82,7 +84,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - indices := getIndices(s.indexPrefix, endTs, lookback) + indices := getIndices(s.indexPrefix, s.indexDateLayout, endTs, lookback) searchResult, err := s.client.Search(indices...). Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). @@ -109,18 +111,18 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query { return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs) } -func getIndices(prefix string, ts time.Time, lookback time.Duration) []string { +func getIndices(prefix, dateLayout string, ts time.Time, lookback time.Duration) []string { var indices []string - firstIndex := indexWithDate(prefix, ts.Add(-lookback)) - currentIndex := indexWithDate(prefix, ts) + firstIndex := indexWithDate(prefix, dateLayout, ts.Add(-lookback)) + currentIndex := indexWithDate(prefix, dateLayout, ts) for currentIndex != firstIndex { indices = append(indices, currentIndex) ts = ts.Add(-24 * time.Hour) - currentIndex = indexWithDate(prefix, ts) + currentIndex = indexWithDate(prefix, dateLayout, ts) } return append(indices, firstIndex) } -func indexWithDate(indexNamePrefix string, date time.Time) string { - return indexNamePrefix + date.UTC().Format("2006-01-02") +func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string { + return indexNamePrefix + date.UTC().Format(indexDateLayout) } diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index a4558f80be0..3465752d2e1 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -43,14 +43,14 @@ type depStorageTest struct { storage *DependencyStore } -func withDepStorage(indexPrefix string, maxDocCount int, fn func(r *depStorageTest)) { +func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn func(r *depStorageTest)) { client := &mocks.Client{} logger, logBuffer := testutils.NewLogger() r := &depStorageTest{ client: client, logger: logger, logBuffer: logBuffer, - storage: NewDependencyStore(client, logger, indexPrefix, maxDocCount), + storage: NewDependencyStore(client, logger, indexPrefix, indexDateLayout, maxDocCount), } fn(r) } @@ -69,7 +69,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { } for _, testCase := range testCases { client := &mocks.Client{} - r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, defaultMaxDocCount) + r := NewDependencyStore(client, zap.NewNop(), testCase.prefix, "2006-01-02", defaultMaxDocCount) assert.Equal(t, testCase.expected+dependencyIndex, r.indexPrefix) } } @@ -90,9 +90,9 @@ func TestWriteDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage("", defaultMaxDocCount, func(r *depStorageTest) { + withDepStorage("", "2006-01-02", defaultMaxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) - indexName := indexWithDate("", fixedTime) + indexName := indexWithDate("", "2006-01-02", fixedTime) writeService := &mocks.IndexService{} r.client.On("Index").Return(writeService) @@ -165,7 +165,7 @@ func TestGetDependencies(t *testing.T) { }, } for _, testCase := range testCases { - withDepStorage(testCase.indexPrefix, testCase.maxDocCount, func(r *depStorageTest) { + withDepStorage(testCase.indexPrefix, "2006-01-02", testCase.maxDocCount, func(r *depStorageTest) { fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC) searchService := &mocks.SearchService{} @@ -208,28 +208,28 @@ func TestGetIndices(t *testing.T) { prefix string }{ { - expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))}, + expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))}, lookback: 23 * time.Hour, prefix: "", }, { - expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))}, + expected: []string{indexWithDate("", "2006-01-02", fixedTime), indexWithDate("", "2006-01-02", fixedTime.Add(-24*time.Hour))}, lookback: 13 * time.Hour, prefix: "", }, { - expected: []string{indexWithDate("foo:", fixedTime)}, + expected: []string{indexWithDate("foo:", "2006-01-02", fixedTime)}, lookback: 1 * time.Hour, prefix: "foo:", }, { - expected: []string{indexWithDate("foo-", fixedTime)}, + expected: []string{indexWithDate("foo-", "2006-01-02", fixedTime)}, lookback: 0, prefix: "foo-", }, } for _, testCase := range testCases { - assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback)) + assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, "2006-01-02", fixedTime, testCase.lookback)) } } diff --git a/plugin/storage/es/esCleaner.py b/plugin/storage/es/esCleaner.py index b3fd8357b1d..336704cf03d 100755 --- a/plugin/storage/es/esCleaner.py +++ b/plugin/storage/es/esCleaner.py @@ -15,6 +15,7 @@ def main(): print('HOSTNAME ... specifies which Elasticsearch hosts URL to search and delete indices from.') print('TIMEOUT ... number of seconds to wait for master node response.'.format(TIMEOUT)) print('INDEX_PREFIX ... specifies index prefix.') + print('INDEX_DATE_SEPARATOR ... specifies index date separator.') print('ARCHIVE ... specifies whether to remove archive indices (only works for rollover) (default false).') print('ROLLOVER ... specifies whether to remove indices created by rollover (default false).') print('ES_USERNAME ... The username required by Elasticsearch.') @@ -33,6 +34,7 @@ def main(): prefix = os.getenv("INDEX_PREFIX", '') if prefix != '': prefix += '-' + separator = os.getenv("INDEX_DATE_SEPARATOR", '-') if str2bool(os.getenv("ARCHIVE", 'false')): filter_archive_indices_rollover(ilo, prefix) @@ -40,7 +42,7 @@ def main(): if str2bool(os.getenv("ROLLOVER", 'false')): filter_main_indices_rollover(ilo, prefix) else: - filter_main_indices(ilo, prefix) + filter_main_indices(ilo, prefix, separator) empty_list(ilo, 'No indices to delete') @@ -51,12 +53,15 @@ def main(): delete_indices.do_action() -def filter_main_indices(ilo, prefix): - ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-\d{4}-\d{2}-\d{2}") +def filter_main_indices(ilo, prefix, separator): + date_regex = "\d{4}" + separator + "\d{2}" + separator + "\d{2}" + time_string = "%Y" + separator + "%m" + separator + "%d" + + ilo.filter_by_regex(kind='regex', value=prefix + "jaeger-(span|service|dependencies)-" + date_regex) empty_list(ilo, "No indices to delete") # This excludes archive index as we use source='name' # source `creation_date` would include archive index - ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1])) + ilo.filter_by_age(source='name', direction='older', timestring=time_string, unit='days', unit_count=int(sys.argv[1])) def filter_main_indices_rollover(ilo, prefix): diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index b41724db7b0..41dcf9c9970 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -111,7 +111,8 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), f.primaryConfig.GetMaxDocCount()) + reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(), + f.primaryConfig.GetIndexDateLayout(), f.primaryConfig.GetMaxDocCount()) return reader, nil } @@ -145,6 +146,7 @@ func createSpanReader( MaxDocCount: cfg.GetMaxDocCount(), MaxSpanAge: cfg.GetMaxSpanAge(), IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), TagDotReplacement: cfg.GetTagDotReplacement(), UseReadWriteAliases: cfg.GetUseReadWriteAliases(), Archive: archive, @@ -171,6 +173,7 @@ func createSpanWriter( Logger: logger, MetricsFactory: mFactory, IndexPrefix: cfg.GetIndexPrefix(), + IndexDateLayout: cfg.GetIndexDateLayout(), AllTagsAsFields: cfg.GetAllTagsAsFields(), TagKeysAsFields: tags, TagDotReplacement: cfg.GetTagDotReplacement(), diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index c575befc8a4..b0f212670d6 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -17,6 +17,7 @@ package es import ( "flag" + "fmt" "math" "strings" "time" @@ -45,6 +46,7 @@ const ( suffixBulkFlushInterval = ".bulk.flush-interval" suffixTimeout = ".timeout" suffixIndexPrefix = ".index-prefix" + suffixIndexDateSeparator = ".index-date-separator" suffixTagsAsFields = ".tags-as-fields" suffixTagsAsFieldsAll = suffixTagsAsFields + ".all" suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include" @@ -60,6 +62,8 @@ const ( // see search.max_buckets and index.max_result_window defaultMaxDocCount = 10_000 defaultServerURL = "http://127.0.0.1:9200" + // default separator for Elasticsearch index date layout. + defaultIndexDateSeparator = "-" ) // TODO this should be moved next to config.Configuration struct (maybe ./flags package) @@ -210,6 +214,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixIndexPrefix, nsConfig.IndexPrefix, "Optional prefix of Jaeger indices. For example \"production\" creates \"production-jaeger-*\".") + flagSet.String( + nsConfig.namespace+suffixIndexDateSeparator, + defaultIndexDateSeparator, + "Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".") flagSet.Bool( nsConfig.namespace+suffixTagsAsFieldsAll, nsConfig.Tags.AllAsFields, @@ -281,6 +289,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval) cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout) cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix) + cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator)) cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll) cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude) cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile) @@ -325,3 +334,7 @@ func (opt *Options) Get(namespace string) *config.Configuration { func stripWhiteSpace(str string) string { return strings.Replace(str, " ", "", -1) } + +func initDateLayout(separator string) string { + return fmt.Sprintf("2006%s01%s02", separator, separator) +} diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index a7148a82e68..73bb1476e93 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -55,10 +55,12 @@ func TestOptionsWithFlags(t *testing.T) { "--es.max-span-age=48h", "--es.num-shards=20", "--es.num-replicas=10", + "--es.index-date-separator=", // a couple overrides "--es.aux.server-urls=3.3.3.3, 4.4.4.4", "--es.aux.max-span-age=24h", "--es.aux.num-replicas=10", + "--es.aux.index-date-separator=.", "--es.tls.enabled=true", "--es.tls.skip-host-verify=true", "--es.tags-as-fields.all=true", @@ -81,6 +83,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", primary.Tags.DotReplacement) assert.Equal(t, "./file.txt", primary.Tags.File) assert.Equal(t, "test,tags", primary.Tags.Include) + assert.Equal(t, "20060102", primary.IndexDateLayout) aux := opts.Get("es.aux") assert.Equal(t, []string{"3.3.3.3", "4.4.4.4"}, aux.Servers) @@ -94,6 +97,7 @@ func TestOptionsWithFlags(t *testing.T) { assert.Equal(t, "!", aux.Tags.DotReplacement) assert.Equal(t, "./file.txt", aux.Tags.File) assert.Equal(t, "test,tags", aux.Tags.Include) + assert.Equal(t, "2006.01.02", aux.IndexDateLayout) } func TestMaxNumSpansUsage(t *testing.T) { @@ -148,3 +152,29 @@ func TestMaxDocCount(t *testing.T) { }) } } + +func TestIndexDateSeparator(t *testing.T) { + testCases := []struct { + name string + flags []string + wantDateLayout string + }{ + {"not defined (default)", []string{}, "2006-01-02"}, + {"empty separator", []string{"--es.index-date-separator="}, "20060102"}, + {"dot separator", []string{"--es.index-date-separator=."}, "2006.01.02"}, + {"crossbar separator", []string{"--es.index-date-separator=-"}, "2006-01-02"}, + {"slash separator", []string{"--es.index-date-separator=/"}, "2006/01/02"}, + {"empty string with single quotes", []string{"--es.index-date-separator=''"}, "2006''01''02"}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := NewOptions("es") + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags(tc.flags) + opts.InitFromViper(v) + + primary := opts.GetPrimary() + assert.Equal(t, tc.wantDateLayout, primary.IndexDateLayout) + }) + } +} diff --git a/plugin/storage/es/spanstore/index_utils.go b/plugin/storage/es/spanstore/index_utils.go index b3497a3bdca..a9d4674aa5e 100644 --- a/plugin/storage/es/spanstore/index_utils.go +++ b/plugin/storage/es/spanstore/index_utils.go @@ -19,8 +19,8 @@ import ( ) // returns index name with date -func indexWithDate(indexPrefix string, date time.Time) string { - spanDate := date.UTC().Format("2006-01-02") +func indexWithDate(indexPrefix, indexDateLayout string, date time.Time) string { + spanDate := date.UTC().Format(indexDateLayout) return indexPrefix + spanDate } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 5c7176d514e..ca6e3d249de 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -97,6 +97,7 @@ type SpanReader struct { serviceOperationStorage *ServiceOperationStorage spanIndexPrefix string serviceIndexPrefix string + indexDateLayout string spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn sourceFn sourceFn @@ -111,6 +112,7 @@ type SpanReaderParams struct { MaxDocCount int MetricsFactory metrics.Factory IndexPrefix string + IndexDateLayout string TagDotReplacement string Archive bool UseReadWriteAliases bool @@ -125,6 +127,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), + indexDateLayout: p.IndexDateLayout, spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), sourceFn: getSourceFn(p.Archive, p.MaxDocCount), @@ -132,7 +135,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { } } -type timeRangeIndexFn func(indexName string, startTime time.Time, endTime time.Time) []string +type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime time.Time, endTime time.Time) []string type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource @@ -144,12 +147,12 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { } else { archivePrefix = archiveIndexSuffix } - return func(indexName string, startTime time.Time, endTime time.Time) []string { + return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { return []string{archiveIndex(indexName, archivePrefix)} } } if useReadWriteAliases { - return func(indices string, startTime time.Time, endTime time.Time) []string { + return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string { return []string{indices + "read"} } } @@ -171,14 +174,14 @@ func getSourceFn(archive bool, maxDocCount int) sourceFn { } // timeRangeIndices returns the array of indices that we need to query, based on query params -func timeRangeIndices(indexName string, startTime time.Time, endTime time.Time) []string { +func timeRangeIndices(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string { var indices []string - firstIndex := indexWithDate(indexName, startTime) - currentIndex := indexWithDate(indexName, endTime) + firstIndex := indexWithDate(indexName, indexDateLayout, startTime) + currentIndex := indexWithDate(indexName, indexDateLayout, endTime) for currentIndex != firstIndex { indices = append(indices, currentIndex) endTime = endTime.Add(-24 * time.Hour) - currentIndex = indexWithDate(indexName, endTime) + currentIndex = indexWithDate(indexName, indexDateLayout, endTime) } indices = append(indices, firstIndex) return indices @@ -241,7 +244,7 @@ func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.indexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) } @@ -253,7 +256,7 @@ func (s *SpanReader) GetOperations( span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") defer span.Finish() currentTime := time.Now() - jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, currentTime.Add(-s.maxSpanAge), currentTime) + jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.indexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) if err != nil { return nil, err @@ -326,7 +329,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // Add an hour in both directions so that traces that straddle two indexes are retrieved. // i.e starts in one and ends in another. - indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) + indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) searchAfterTime := make(map[model.TraceID]uint64) @@ -512,7 +515,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra // } aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces) boolQuery := s.buildFindTraceIDsQuery(traceQuery) - jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax) + jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax) searchService := s.client.Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 997579d19e4..de841663016 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -176,7 +176,7 @@ func TestSpanReaderIndices(t *testing.T) { } for _, testCase := range testCases { r := NewSpanReader(testCase.params) - actual := r.timeRangeIndices(r.spanIndexPrefix, date, date) + actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date) assert.Equal(t, []string{testCase.index}, actual) } } @@ -435,6 +435,7 @@ func TestSpanReaderFindIndices(t *testing.T) { today := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC) yesterday := today.AddDate(0, 0, -1) twoDaysAgo := today.AddDate(0, 0, -2) + dateLayout := "2006-01-02" testCases := []struct { startTime time.Time @@ -445,30 +446,30 @@ func TestSpanReaderFindIndices(t *testing.T) { startTime: today.Add(-time.Millisecond), endTime: today, expected: []string{ - indexWithDate(spanIndex, today), + indexWithDate(spanIndex, dateLayout, today), }, }, { startTime: today.Add(-13 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, today), - indexWithDate(spanIndex, yesterday), + indexWithDate(spanIndex, dateLayout, today), + indexWithDate(spanIndex, dateLayout, yesterday), }, }, { startTime: today.Add(-48 * time.Hour), endTime: today, expected: []string{ - indexWithDate(spanIndex, today), - indexWithDate(spanIndex, yesterday), - indexWithDate(spanIndex, twoDaysAgo), + indexWithDate(spanIndex, dateLayout, today), + indexWithDate(spanIndex, dateLayout, yesterday), + indexWithDate(spanIndex, dateLayout, twoDaysAgo), }, }, } withSpanReader(func(r *spanReaderTest) { for _, testCase := range testCases { - actual := r.reader.timeRangeIndices(spanIndex, testCase.startTime, testCase.endTime) + actual := r.reader.timeRangeIndices(spanIndex, dateLayout, testCase.startTime, testCase.endTime) assert.EqualValues(t, testCase.expected, actual) } }) @@ -476,7 +477,7 @@ func TestSpanReaderFindIndices(t *testing.T) { func TestSpanReader_indexWithDate(t *testing.T) { withSpanReader(func(r *spanReaderTest) { - actual := indexWithDate(spanIndex, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) + actual := indexWithDate(spanIndex, "2006-01-02", time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)) assert.Equal(t, "jaeger-span-1995-04-21", actual) }) } diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index c7cf714e660..74be940a308 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -57,6 +57,7 @@ type SpanWriterParams struct { Logger *zap.Logger MetricsFactory metrics.Factory IndexPrefix string + IndexDateLayout string AllTagsAsFields bool TagKeysAsFields []string TagDotReplacement string @@ -82,7 +83,7 @@ func NewSpanWriter(p SpanWriterParams) *SpanWriter { }, ), spanConverter: dbmodel.NewFromDomain(p.AllTagsAsFields, p.TagKeysAsFields, p.TagDotReplacement), - spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix), + spanServiceIndex: getSpanAndServiceIndexFn(p.Archive, p.UseReadWriteAliases, p.IndexPrefix, p.IndexDateLayout), } } @@ -102,7 +103,7 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate string) error // spanAndServiceIndexFn returns names of span and service indices type spanAndServiceIndexFn func(spanTime time.Time) (string, string) -func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix string) spanAndServiceIndexFn { +func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix, dateLayout string) spanAndServiceIndexFn { if prefix != "" { prefix += indexPrefixSeparator } @@ -123,7 +124,7 @@ func getSpanAndServiceIndexFn(archive, useReadWriteAliases bool, prefix string) } } return func(date time.Time) (string, string) { - return indexWithDate(spanIndexPrefix, date), indexWithDate(serviceIndexPrefix, date) + return indexWithDate(spanIndexPrefix, dateLayout, date), indexWithDate(serviceIndexPrefix, dateLayout, date) } } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 1e1292c33e0..1ac8070c5aa 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -50,7 +50,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory}), + writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexDateLayout: "2006-01-02"}), } fn(w) } @@ -62,31 +62,32 @@ func TestSpanWriterIndices(t *testing.T) { logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) date := time.Now() - dateFormat := date.UTC().Format("2006-01-02") + layout := "2006-01-02" + dateFormat := date.UTC().Format(layout) testCases := []struct { indices []string params SpanWriterParams }{ {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", Archive: false}, + IndexPrefix: "", IndexDateLayout: layout, Archive: false}, indices: []string{spanIndex + dateFormat, serviceIndex + dateFormat}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", UseReadWriteAliases: true}, + IndexPrefix: "", IndexDateLayout: layout, UseReadWriteAliases: true}, indices: []string{spanIndex + "write", serviceIndex + "write"}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: false}, + IndexPrefix: "foo:", IndexDateLayout: layout, Archive: false}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat, "foo:" + indexPrefixSeparator + serviceIndex + dateFormat}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", UseReadWriteAliases: true}, + IndexPrefix: "foo:", IndexDateLayout: layout, UseReadWriteAliases: true}, indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "", Archive: true}, + IndexPrefix: "", IndexDateLayout: layout, Archive: true}, indices: []string{spanIndex + archiveIndexSuffix, ""}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: true}, + IndexPrefix: "foo:", IndexDateLayout: layout, Archive: true}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}}, {params: SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true}, + IndexPrefix: "foo:", IndexDateLayout: layout, Archive: true, UseReadWriteAliases: true}, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}}, } for _, testCase := range testCases { @@ -254,8 +255,8 @@ func TestSpanIndexName(t *testing.T) { span := &model.Span{ StartTime: date, } - spanIndexName := indexWithDate(spanIndex, span.StartTime) - serviceIndexName := indexWithDate(serviceIndex, span.StartTime) + spanIndexName := indexWithDate(spanIndex, "2006-01-02", span.StartTime) + serviceIndexName := indexWithDate(serviceIndex, "2006-01-02", span.StartTime) assert.Equal(t, "jaeger-span-1995-04-21", spanIndexName) assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName) } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 301f68a3a3c..7eb1df8bac1 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -44,6 +44,7 @@ const ( queryHostPort = host + ":" + queryPort queryURL = "http://" + queryHostPort indexPrefix = "integration-test" + indexDateLayout = "2006-01-02" tagKeyDeDotChar = "@" maxSpanAge = time.Hour * 72 defaultMaxDocCount = 10_000 @@ -132,7 +133,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro Archive: archive, MaxDocCount: defaultMaxDocCount, }) - dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, defaultMaxDocCount) + dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount) depMapping := es.GetDependenciesMappings(5, 1, client.GetVersion()) err = dependencyStore.CreateTemplates(depMapping) if err != nil {