-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[storage] Remove distinction between primary and archive
storage intefaces
#6567
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #6567 +/- ##
==========================================
- Coverage 96.24% 96.16% -0.08%
==========================================
Files 373 372 -1
Lines 21406 21198 -208
==========================================
- Hits 20602 20385 -217
- Misses 612 618 +6
- Partials 192 195 +3
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
|
||
func (f *Factory) getArchiveClient() es.Client { | ||
if c := f.archiveClient.Load(); c != nil { | ||
if c := f.client.Load(); c != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(not for this PR) I am curious if we still need this indirection. The primary reason it was added was to support TLS cert reload. But we don't do file-watcher-based reloads anymore, so nothing will ever trigger resetting of the client, and the cert rotation should be supported via OTEL's configtls indirection (as confirmed by the unit tests where we simulate rotation of certs). One this PR is merged let's open a good-first-issue ticket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, maybe we still use file-watcher for password file, then we'd keep this.
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
archive
storage in jaeger-v1archive
storage intefaces
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
Signed-off-by: Mahad Zaryab <mahadzaryab1@gmail.com>
@@ -138,10 +138,18 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q | |||
return qOpts, nil | |||
} | |||
|
|||
type archiveInitializer interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since this is used as part of (potentially) public API BuildQueryServiceOptions
, and since you only need one function, a better approach would be accepting a parameter of public type
type InitArchiveStorageFn func(logger *zap.Logger) (spanstore.Reader, spanstore.Writer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's also easier to test with a function
@@ -114,7 +114,9 @@ type Configuration struct { | |||
// UseReadWriteAliases, if set to true, will use read and write aliases for indices. | |||
// Use this option with Elasticsearch rollover API. It requires an external component | |||
// to create aliases before startup and then performing its management. | |||
UseReadWriteAliases bool `mapstructure:"use_aliases"` | |||
UseReadWriteAliases bool `mapstructure:"use_aliases"` | |||
ReadAliasSuffix string `mapstructure:"-"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should add comment why these are not exposed via mapstructure:
|
||
// DefaultConfigurable is an interface that can be implement by some storage implementations | ||
// to provide a way to inherit configuration settings from another factory. | ||
type DefaultConfigurable interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inheritable?
) | ||
|
||
var ( // interface comformance checks | ||
_ storage.Factory = (*Factory)(nil) | ||
_ storage.Purger = (*Factory)(nil) | ||
_ storage.ArchiveFactory = (*Factory)(nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add Inheritable?
|
||
primaryClient atomic.Pointer[es.Client] | ||
archiveClient atomic.Pointer[es.Client] | ||
client atomic.Pointer[es.Client] | ||
|
||
watchers []*fswatcher.FSWatcher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this still need to be a list, or just exactly one watcher? If one, and it's for a password, I would rename it to pwdFileWatcher
errs = append(errs, err) | ||
} | ||
} | ||
} | ||
for _, storageType := range f.SpanWriterTypes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason why this shouldn't simply be looping through f.factories
and f.archiveFactories
and closing them all? Seems weird that it depends on external list of types, considering that some types are added implicitly. Unless the same storage can be added more than once.
if role == "archive" { | ||
if primaryFactory, ok := f.factories[kind]; ok { | ||
if dc, ok := factory.(plugin.DefaultConfigurable); ok { | ||
dc.InheritSettingsFrom(primaryFactory) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the order here correct, i.e. inheriting settings after calling Initialize
?
if dc, ok := factory.(plugin.DefaultConfigurable); ok { | ||
dc.InheritSettingsFrom(primaryFactory) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -72,14 +67,6 @@ func (c *GRPCClient) StreamingSpanWriter() spanstore.Writer { | |||
return newStreamingSpanWriter(c.streamWriterClient) | |||
} | |||
|
|||
func (c *GRPCClient) ArchiveSpanReader() spanstore.Reader { | |||
return &archiveReader{client: c.archiveReaderClient} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we deleting these internal archiveReader/archiveWriter types?
@@ -52,7 +52,7 @@ func NewGRPCHandler(impl *GRPCHandlerStorageImpl) *GRPCHandler { | |||
// NewGRPCHandler creates a handler given implementations grouped by plugin services. | |||
func NewGRPCHandlerWithPlugins( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this? I don't see anything using this, I was expecting the remote-storage to utilize this framework, but if it doesn't then we can delete it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(maybe in a prequel PR)
"--cassandra-archive.servers=127.0.0.1", | ||
"--cassandra-archive.basic.allowed-authenticators=org.apache.cassandra.auth.PasswordAuthenticator", | ||
"--cassandra-archive.password=" + password, | ||
"--cassandra-archive.username=" + username, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we drop these? The inheritance should take care of them.
af := s.initializeESFactory(t, []string{ | ||
"--es-archive.enabled=true", | ||
fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), | ||
fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be dropped, for inheritance?
err := command.ParseFlags(s.flags) | ||
require.NoError(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err := command.ParseFlags(s.flags) | |
require.NoError(t, err) | |
require.NoError(t, command.ParseFlags(s.flags)) |
f := initFactory() | ||
af := initFactory() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so this creates a gRPC proxy pointing to the same remote storage port? I.e. not really testing two separate storage namespaces? Maybe we can fix in the follow-up.
impl.ArchiveSpanWriter = func() spanstore.Writer { return qOpts.ArchiveSpanWriter } | ||
ar, aw := f.InitArchiveStorage(logger) | ||
impl.ArchiveSpanReader = func() spanstore.Reader { return ar } | ||
impl.ArchiveSpanWriter = func() spanstore.Writer { return aw } | ||
|
||
handler := shared.NewGRPCHandler(impl) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing Remote Storage API v1 has dedicated methods for WriteSpan and ArchiveSpan, exposed on the same gRPC service / port. Do we actually want that? Because the grpc storage client no longer knows about primary/archive, so it has only one SpanWriter that always calls WriteSpan
gRPC method.
Which problem is this PR solving?
Description of the changes
storage.ArchiveFactory
by refactoring all the storage implementations to remove the distinction between a primary and archive interface. Note that the concept of archive storage remains the same within Jaeger, we just now use the same interface to handle both primary and archive storages.How was this change tested?
Checklist
jaeger
:make lint test
jaeger-ui
:npm run lint
andnpm run test