Skip to content

Commit

Permalink
Replace archive span reader-writer with trace reader-writer
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Emonueje Ebenezer <eebenezer949@gmail.com>
  • Loading branch information
ekefan committed Jan 17, 2025
1 parent 7f16f49 commit 01b1a05
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 29 deletions.
6 changes: 4 additions & 2 deletions plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
archiveSpanReader, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
s.ArchiveTraceReader = v1adapter.NewTraceReader(archiveSpanReader)
archiveSpanWriter, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
s.ArchiveTraceWriter = v1adapter.NewTraceWriter(archiveSpanWriter)
s.SamplingStore, err = f.CreateSamplingStore(0)
require.NoError(t, err)
s.initializeDependencyReaderAndWriter(t, f)
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
archiveSpanReader, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
s.ArchiveTraceReader = v1adapter.NewTraceReader(archiveSpanReader)
archiveSpanWriter, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
s.ArchiveTraceWriter = v1adapter.NewTraceWriter(archiveSpanWriter)

dependencyReader, err := f.CreateDependencyReader()
require.NoError(t, err)
Expand Down
7 changes: 4 additions & 3 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
archiveSpanReader, err := f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
s.ArchiveTraceReader = v1adapter.NewTraceReader(archiveSpanReader)
archiveSpanWriter, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)

s.ArchiveTraceWriter = v1adapter.NewTraceWriter(archiveSpanWriter)
// TODO DependencyWriter is not implemented in grpc store

s.CleanUp = s.cleanUp
Expand Down
47 changes: 27 additions & 20 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
samplemodel "github.com/jaegertracing/jaeger/storage/samplingstore/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
Expand All @@ -44,14 +43,14 @@ var fixtures embed.FS
// Some implementations may declare multiple tests, with different settings,
// and RunAll() under different conditions.
type StorageIntegration struct {
TraceWriter tracestore.Writer
TraceReader tracestore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter dependencystore.Writer
DependencyReader depstore.Reader
SamplingStore samplingstore.Store
Fixtures []*QueryFixtures
TraceWriter tracestore.Writer
TraceReader tracestore.Reader
ArchiveTraceReader tracestore.Reader
ArchiveTraceWriter tracestore.Writer
DependencyWriter dependencystore.Writer
DependencyReader depstore.Reader
SamplingStore samplingstore.Store
Fixtures []*QueryFixtures

// TODO: remove this after all storage backends return spanKind from GetOperations
GetOperationsMissingSpanKind bool
Expand Down Expand Up @@ -190,25 +189,33 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
}
defer s.cleanUp(t)
tID := model.NewTraceID(uint64(11), uint64(22))
expected := &model.Span{
OperationName: "archive_span",
StartTime: time.Now().Add(-time.Hour * 72 * 5).Truncate(time.Microsecond),
TraceID: tID,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
expected := &model.Trace{
Spans: []*model.Span{
{
OperationName: "archive_span",
StartTime: time.Now().Add(-time.Hour * 72 * 5).Truncate(time.Microsecond),
TraceID: tID,
SpanID: model.NewSpanID(55),
References: []model.SpanRef{},
Process: model.NewProcess("archived_service", model.KeyValues{}),
},
},
}

require.NoError(t, s.ArchiveSpanWriter.WriteSpan(context.Background(), expected))
require.NoError(t, s.ArchiveTraceWriter.WriteTraces(context.Background(), v1adapter.V1TraceToOtelTrace(expected)))

var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.ArchiveSpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: tID})
return err == nil && len(actual.Spans) == 1
iterTraces := s.ArchiveTraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: tID.ToOTELTraceID()})
traces, err := v1adapter.V1TracesFromSeq2(iterTraces)
if len(traces) > 0 {
actual = traces[0]
}
return err == nil && len(actual.Spans) >= len(expected.Spans)
})
require.True(t, found)
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
CompareTraces(t, expected, actual)
}

func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) {
s.SamplingStore = memory.NewSamplingStore(2)
s.TraceReader = v1adapter.NewTraceReader(store)
s.TraceWriter = v1adapter.NewTraceWriter(store)
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore
s.ArchiveTraceReader = v1adapter.NewTraceReader(archiveStore)
s.ArchiveTraceWriter = v1adapter.NewTraceWriter(archiveStore)

// TODO DependencyWriter is not implemented in memory store

Expand Down

0 comments on commit 01b1a05

Please sign in to comment.