From be85cc8d7fa873343363b52c50c3139271c22394 Mon Sep 17 00:00:00 2001 From: Brandon Johnson Date: Tue, 16 Jul 2024 12:24:53 -0400 Subject: [PATCH] [cmd/opampsupervisor] Forward custom messages to/from agent (#33576) **Description:** * Forwards custom messages through the supervisor to/from the agent **Link to tracking Issue:** Closes #33575 **Testing:** * Tested against BindPlane * E2E tests aren't possible since there is no component using custom messages in contrib right now. * Added a couple unit tests --- ...visor-passthrough-custom-capabilities.yaml | 13 + cmd/opampsupervisor/supervisor/server.go | 62 +++-- cmd/opampsupervisor/supervisor/supervisor.go | 240 ++++++++++++---- .../supervisor/supervisor_test.go | 263 ++++++++++++++++++ 4 files changed, 496 insertions(+), 82 deletions(-) create mode 100644 .chloggen/feat_supervisor-passthrough-custom-capabilities.yaml diff --git a/.chloggen/feat_supervisor-passthrough-custom-capabilities.yaml b/.chloggen/feat_supervisor-passthrough-custom-capabilities.yaml new file mode 100644 index 000000000000..9b67b30c367d --- /dev/null +++ b/.chloggen/feat_supervisor-passthrough-custom-capabilities.yaml @@ -0,0 +1,13 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds support for forwarding custom messages to/from the agent" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33575] diff --git a/cmd/opampsupervisor/supervisor/server.go b/cmd/opampsupervisor/supervisor/server.go index 6e4c00697097..cd54f96336eb 100644 --- a/cmd/opampsupervisor/supervisor/server.go +++ b/cmd/opampsupervisor/supervisor/server.go @@ -13,34 +13,50 @@ import ( ) type flattenedSettings struct { - onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer) - onConnectingFunc func(request *http.Request) - endpoint string + onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer) + onConnectingFunc func(request *http.Request) (shouldConnect bool, rejectStatusCode int) + onConnectionCloseFunc func(conn serverTypes.Connection) + endpoint string } -func newServerSettings(fs flattenedSettings) server.StartSettings { +func (fs flattenedSettings) toServerSettings() server.StartSettings { return server.StartSettings{ Settings: server.Settings{ - Callbacks: server.CallbacksStruct{ - OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse { - if fs.onConnectingFunc != nil { - fs.onConnectingFunc(request) - } - return serverTypes.ConnectionResponse{ - Accept: true, - ConnectionCallbacks: server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { - if fs.onMessageFunc != nil { - fs.onMessageFunc(conn, message) - } - - return &protobufs.ServerToAgent{} - }, - }, - } - }, - }, + Callbacks: fs, }, ListenEndpoint: fs.endpoint, } } + +func (fs flattenedSettings) OnConnecting(request *http.Request) serverTypes.ConnectionResponse { + if fs.onConnectingFunc != nil { + shouldConnect, rejectStatusCode := fs.onConnectingFunc(request) + if !shouldConnect { + return serverTypes.ConnectionResponse{ + Accept: false, + HTTPStatusCode: rejectStatusCode, + } + } + } + + return serverTypes.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: fs, + } +} + +func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connection) {} + +func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if fs.onMessageFunc != nil { + fs.onMessageFunc(conn, message) + } + + return &protobufs.ServerToAgent{} +} + +func (fs flattenedSettings) OnConnectionClose(conn serverTypes.Connection) { + if fs.onConnectionCloseFunc != nil { + fs.onConnectionCloseFunc(conn) + } +} diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 36f6b77a7c48..3ef418a68f14 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -64,6 +64,8 @@ const ( agentConfigFilePath = "effective.yaml" ) +const maxBufferedCustomMessages = 10 + // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient // to work with an OpAMP Server. type Supervisor struct { @@ -92,6 +94,8 @@ type Supervisor struct { extraConfigTemplate *template.Template ownTelemetryTemplate *template.Template + agentConn *atomic.Value + // A config section to be added to the Collector's config to fetch its own metrics. // TODO: store this persistently so that when starting we can compose the effective // config correctly. @@ -120,6 +124,9 @@ type Supervisor struct { doneChan chan struct{} agentWG sync.WaitGroup + customMessageToServer chan *protobufs.CustomMessage + customMessageWG sync.WaitGroup + agentHasStarted bool agentStartHealthCheckAttempts int agentRestarting atomic.Bool @@ -142,6 +149,8 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { effectiveConfig: &atomic.Value{}, agentDescription: &atomic.Value{}, doneChan: make(chan struct{}), + customMessageToServer: make(chan *protobufs.CustomMessage, maxBufferedCustomMessages), + agentConn: &atomic.Value{}, } if err := s.createTemplates(); err != nil { return nil, err @@ -210,6 +219,12 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { s.runAgentProcess() }() + s.customMessageWG.Add(1) + go func() { + defer s.customMessageWG.Done() + s.forwardCustomMessagesToServerLoop() + }() + return s, nil } @@ -282,10 +297,11 @@ func (s *Supervisor) getBootstrapInfo() (err error) { // Start a one-shot server to get the Collector's agent description // using the Collector's OpAMP extension. - err = srv.Start(newServerSettings(flattenedSettings{ + err = srv.Start(flattenedSettings{ endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort), - onConnectingFunc: func(_ *http.Request) { + onConnectingFunc: func(_ *http.Request) (bool, int) { connected.Store(true) + return true, http.StatusOK }, onMessageFunc: func(_ serverTypes.Connection, message *protobufs.AgentToServer) { if message.AgentDescription != nil { @@ -316,7 +332,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) { done <- nil } }, - })) + }.toServerSettings()) if err != nil { return err } @@ -453,10 +469,20 @@ func (s *Supervisor) startOpAMPServer() error { s.logger.Debug("Starting OpAMP server...") - err = s.opampServer.Start(newServerSettings(flattenedSettings{ - endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort), + connected := &atomic.Bool{} + + err = s.opampServer.Start(flattenedSettings{ + endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort), + onConnectingFunc: func(_ *http.Request) (bool, int) { + // Only allow one agent to be connected the this server at a time. + alreadyConnected := connected.Swap(true) + return !alreadyConnected, http.StatusConflict + }, onMessageFunc: s.handleAgentOpAMPMessage, - })) + onConnectionCloseFunc: func(_ serverTypes.Connection) { + connected.Store(false) + }, + }.toServerSettings()) if err != nil { return err } @@ -466,11 +492,14 @@ func (s *Supervisor) startOpAMPServer() error { return nil } -func (s *Supervisor) handleAgentOpAMPMessage(_ serverTypes.Connection, message *protobufs.AgentToServer) { +func (s *Supervisor) handleAgentOpAMPMessage(conn serverTypes.Connection, message *protobufs.AgentToServer) { + s.agentConn.Store(conn) + s.logger.Debug("Received OpAMP message from the agent") if message.AgentDescription != nil { s.setAgentDescription(message.AgentDescription) } + if message.EffectiveConfig != nil { if cfg, ok := message.EffectiveConfig.GetConfigMap().GetConfigMap()[""]; ok { s.logger.Debug("Received effective config from agent") @@ -483,6 +512,51 @@ func (s *Supervisor) handleAgentOpAMPMessage(_ serverTypes.Connection, message * s.logger.Error("Got effective config message, but the instance config was not present. Ignoring effective config.") } } + + // Proxy client capabilities to server + if message.CustomCapabilities != nil { + err := s.opampClient.SetCustomCapabilities(message.CustomCapabilities) + if err != nil { + s.logger.Error("Failed to send custom capabilities to OpAMP server") + } + } + + // Proxy agent custom messages to server + if message.CustomMessage != nil { + select { + case s.customMessageToServer <- message.CustomMessage: + default: + s.logger.Warn( + "Buffer full, skipping forwarding custom message to server", + zap.String("capability", message.CustomMessage.Capability), + zap.String("type", message.CustomMessage.Type), + ) + } + } +} + +func (s *Supervisor) forwardCustomMessagesToServerLoop() { + for { + select { + case cm := <-s.customMessageToServer: + for { + sendingChan, err := s.opampClient.SendCustomMessage(cm) + switch { + case errors.Is(err, types.ErrCustomMessagePending): + s.logger.Debug("Custom message pending, waiting to send...") + <-sendingChan + continue + case err == nil: // OK + s.logger.Debug("Custom message forwarded to server.") + default: + s.logger.Error("Failed to send custom message to OpAMP server") + } + break + } + case <-s.doneChan: + return + } + } } // setAgentDescription sets the agent description, merging in any user-specified attributes from the supervisor configuration. @@ -1046,8 +1120,9 @@ func (s *Supervisor) Shutdown() { s.logger.Debug("Supervisor shutting down...") close(s.doneChan) - // Shutdown in order from producer to consumer (agent -> local OpAMP server -> client to remote OpAMP server). + // Shutdown in order from producer to consumer (agent -> customMessageForwarder -> local OpAMP server -> client to remote OpAMP server). s.agentWG.Wait() + s.customMessageWG.Wait() if s.opampServer != nil { s.logger.Debug("Stopping OpAMP server...") @@ -1106,65 +1181,18 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false if msg.RemoteConfig != nil { - if err := s.saveLastReceivedConfig(msg.RemoteConfig); err != nil { - s.logger.Error("Could not save last received remote config", zap.Error(err)) - } - s.remoteConfig = msg.RemoteConfig - s.logger.Debug("Received remote config from server", zap.String("hash", fmt.Sprintf("%x", s.remoteConfig.ConfigHash))) - - var err error - configChanged, err = s.composeMergedConfig(s.remoteConfig) - if err != nil { - s.logger.Error("Error composing merged config. Reporting failed remote config status.", zap.Error(err)) - err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ - LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, - ErrorMessage: err.Error(), - }) - if err != nil { - s.logger.Error("Could not report failed OpAMP remote config status", zap.Error(err)) - } - } else { - err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ - LastRemoteConfigHash: msg.RemoteConfig.ConfigHash, - Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, - }) - if err != nil { - s.logger.Error("Could not report applied OpAMP remote config status", zap.Error(err)) - } - } + configChanged = configChanged || s.processRemoteConfigMessage(msg.RemoteConfig) } if msg.OwnMetricsConnSettings != nil { - if err := s.saveLastReceivedOwnTelemetrySettings(msg.OwnMetricsConnSettings, lastRecvOwnMetricsConfigFile); err != nil { - s.logger.Error("Could not save last received own telemetry settings", zap.Error(err)) - } - configChanged = s.setupOwnMetrics(ctx, msg.OwnMetricsConnSettings) || configChanged + configChanged = configChanged || s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings) } if msg.AgentIdentification != nil { - newInstanceID, err := uuid.FromBytes(msg.AgentIdentification.NewInstanceUid) - if err != nil { - s.logger.Error("Failed to parse instance UUID", zap.Error(err)) - } else { - s.logger.Debug("Agent identity is changing", - zap.String("old_id", s.persistentState.InstanceID.String()), - zap.String("new_id", newInstanceID.String())) - - err = s.persistentState.SetInstanceID(newInstanceID) - if err != nil { - s.logger.Error("Failed to persist new instance ID, instance ID will revert on restart.", zap.String("new_id", newInstanceID.String()), zap.Error(err)) - } - - err = s.opampClient.SetAgentDescription(s.agentDescription.Load().(*protobufs.AgentDescription)) - if err != nil { - s.logger.Error("Failed to send agent description to OpAMP server") - } - - configChanged = true - } + configChanged = configChanged || s.processAgentIdentificationMessage(msg.AgentIdentification) } + // Update the agent config if any messages have touched the config if configChanged { err := s.opampClient.UpdateEffectiveConfig(ctx) if err != nil { @@ -1178,6 +1206,100 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { default: } } + + messageToAgent := &protobufs.ServerToAgent{ + InstanceUid: s.persistentState.InstanceID[:], + } + haveMessageForAgent := false + // Proxy server capabilities to opamp extension + if msg.CustomCapabilities != nil { + messageToAgent.CustomCapabilities = msg.CustomCapabilities + haveMessageForAgent = true + } + + // Proxy server messages to opamp extension + if msg.CustomMessage != nil { + messageToAgent.CustomMessage = msg.CustomMessage + haveMessageForAgent = true + } + + // Send any messages that need proxying to the agent. + if haveMessageForAgent { + conn, ok := s.agentConn.Load().(serverTypes.Connection) + if ok { + err := conn.Send(ctx, messageToAgent) + if err != nil { + s.logger.Error("Error forwarding message to agent from server", zap.Error(err)) + } + } + } +} + +// processRemoteConfigMessage processes an AgentRemoteConfig message, returning true if the agent config has changed. +func (s *Supervisor) processRemoteConfigMessage(msg *protobufs.AgentRemoteConfig) bool { + if err := s.saveLastReceivedConfig(msg); err != nil { + s.logger.Error("Could not save last received remote config", zap.Error(err)) + } + + s.remoteConfig = msg + s.logger.Debug("Received remote config from server", zap.String("hash", fmt.Sprintf("%x", s.remoteConfig.ConfigHash))) + + var err error + configChanged, err := s.composeMergedConfig(s.remoteConfig) + if err != nil { + s.logger.Error("Error composing merged config. Reporting failed remote config status.", zap.Error(err)) + err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ + LastRemoteConfigHash: msg.ConfigHash, + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, + ErrorMessage: err.Error(), + }) + if err != nil { + s.logger.Error("Could not report failed OpAMP remote config status", zap.Error(err)) + } + } else { + err = s.opampClient.SetRemoteConfigStatus(&protobufs.RemoteConfigStatus{ + LastRemoteConfigHash: msg.ConfigHash, + Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED, + }) + if err != nil { + s.logger.Error("Could not report applied OpAMP remote config status", zap.Error(err)) + } + } + + return configChanged +} + +// processOwnMetricsConnSettingsMessage processes a TelemetryConnectionSettings message, returning true if the agent config has changed. +func (s *Supervisor) processOwnMetricsConnSettingsMessage(ctx context.Context, msg *protobufs.TelemetryConnectionSettings) bool { + if err := s.saveLastReceivedOwnTelemetrySettings(msg, lastRecvOwnMetricsConfigFile); err != nil { + s.logger.Error("Could not save last received own telemetry settings", zap.Error(err)) + } + return s.setupOwnMetrics(ctx, msg) +} + +// processAgentIdentificationMessage processes an AgentIdentification message, returning true if the agent config has changed. +func (s *Supervisor) processAgentIdentificationMessage(msg *protobufs.AgentIdentification) bool { + newInstanceID, err := uuid.FromBytes(msg.NewInstanceUid) + if err != nil { + s.logger.Error("Failed to parse instance UUID", zap.Error(err)) + return false + } + + s.logger.Debug("Agent identity is changing", + zap.String("old_id", s.persistentState.InstanceID.String()), + zap.String("new_id", newInstanceID.String())) + + err = s.persistentState.SetInstanceID(newInstanceID) + if err != nil { + s.logger.Error("Failed to persist new instance ID, instance ID will revert on restart.", zap.String("new_id", newInstanceID.String()), zap.Error(err)) + } + + err = s.opampClient.SetAgentDescription(s.agentDescription.Load().(*protobufs.AgentDescription)) + if err != nil { + s.logger.Error("Failed to send agent description to OpAMP server") + } + + return true } func (s *Supervisor) persistentStateFile() string { diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index ea4b1bafb610..1af3653e05cc 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -6,14 +6,17 @@ package supervisor import ( "bytes" "context" + "net" "os" "sync/atomic" "testing" + "time" "github.com/google/uuid" "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + serverTypes "github.com/open-telemetry/opamp-go/server/types" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -140,6 +143,189 @@ func Test_onMessage(t *testing.T) { require.Equal(t, testUUID, s.persistentState.InstanceID) }) + + t.Run("CustomMessage - Custom message from server is forwarded to agent", func(t *testing.T) { + customMessage := &protobufs.CustomMessage{ + Capability: "teapot", + Type: "brew", + Data: []byte("chamomile"), + } + + testUUID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb") + gotMessage := false + var agentConn serverTypes.Connection = &mockConn{ + sendFunc: func(_ context.Context, message *protobufs.ServerToAgent) error { + require.Equal(t, &protobufs.ServerToAgent{ + InstanceUid: testUUID[:], + CustomMessage: customMessage, + }, message) + gotMessage = true + + return nil + }, + } + + agentConnAtomic := &atomic.Value{} + agentConnAtomic.Store(agentConn) + + s := Supervisor{ + logger: zap.NewNop(), + pidProvider: defaultPIDProvider{}, + config: config.Supervisor{}, + hasNewConfig: make(chan struct{}, 1), + persistentState: &persistentState{InstanceID: testUUID}, + agentConfigOwnMetricsSection: &atomic.Value{}, + effectiveConfig: &atomic.Value{}, + agentConn: agentConnAtomic, + agentHealthCheckEndpoint: "localhost:8000", + customMessageToServer: make(chan *protobufs.CustomMessage, 10), + doneChan: make(chan struct{}), + } + + s.onMessage(context.Background(), &types.MessageData{ + CustomMessage: customMessage, + }) + + require.True(t, gotMessage, "Message was not sent to agent") + }) + + t.Run("CustomCapabilities - Custom capabilities from server are forwarded to agent", func(t *testing.T) { + customCapabilities := &protobufs.CustomCapabilities{ + Capabilities: []string{"coffeemaker", "teapot"}, + } + testUUID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb") + gotMessage := false + var agentConn serverTypes.Connection = &mockConn{ + sendFunc: func(_ context.Context, message *protobufs.ServerToAgent) error { + require.Equal(t, &protobufs.ServerToAgent{ + InstanceUid: testUUID[:], + CustomCapabilities: customCapabilities, + }, message) + gotMessage = true + + return nil + }, + } + + agentConnAtomic := &atomic.Value{} + agentConnAtomic.Store(agentConn) + + s := Supervisor{ + logger: zap.NewNop(), + pidProvider: defaultPIDProvider{}, + config: config.Supervisor{}, + hasNewConfig: make(chan struct{}, 1), + persistentState: &persistentState{InstanceID: testUUID}, + agentConfigOwnMetricsSection: &atomic.Value{}, + effectiveConfig: &atomic.Value{}, + agentConn: agentConnAtomic, + agentHealthCheckEndpoint: "localhost:8000", + customMessageToServer: make(chan *protobufs.CustomMessage, 10), + doneChan: make(chan struct{}), + } + + s.onMessage(context.Background(), &types.MessageData{ + CustomCapabilities: customCapabilities, + }) + + require.True(t, gotMessage, "Message was not sent to agent") + }) + +} + +func Test_handleAgentOpAMPMessage(t *testing.T) { + t.Run("CustomMessage - Custom message from agent is forwarded to server", func(t *testing.T) { + customMessage := &protobufs.CustomMessage{ + Capability: "teapot", + Type: "brew", + Data: []byte("chamomile"), + } + + gotMessageChan := make(chan struct{}) + client := &mockOpAMPClient{ + sendCustomMessageFunc: func(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { + require.Equal(t, customMessage, message) + + close(gotMessageChan) + msgChan := make(chan struct{}, 1) + msgChan <- struct{}{} + return msgChan, nil + }, + } + + testUUID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb") + s := Supervisor{ + logger: zap.NewNop(), + pidProvider: defaultPIDProvider{}, + config: config.Supervisor{}, + hasNewConfig: make(chan struct{}, 1), + persistentState: &persistentState{InstanceID: testUUID}, + agentConfigOwnMetricsSection: &atomic.Value{}, + effectiveConfig: &atomic.Value{}, + agentConn: &atomic.Value{}, + opampClient: client, + agentHealthCheckEndpoint: "localhost:8000", + customMessageToServer: make(chan *protobufs.CustomMessage, 10), + doneChan: make(chan struct{}), + } + + loopDoneChan := make(chan struct{}) + go func() { + defer close(loopDoneChan) + s.forwardCustomMessagesToServerLoop() + }() + + s.handleAgentOpAMPMessage(&mockConn{}, &protobufs.AgentToServer{ + CustomMessage: customMessage, + }) + + select { + case <-gotMessageChan: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for custom message to send") + } + + close(s.doneChan) + + select { + case <-loopDoneChan: + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for forward loop to stop") + } + }) + + t.Run("CustomCapabilities - Custom capabilities from agent are forwarded to server", func(t *testing.T) { + customCapabilities := &protobufs.CustomCapabilities{ + Capabilities: []string{"coffeemaker", "teapot"}, + } + + client := &mockOpAMPClient{ + setCustomCapabilitiesFunc: func(caps *protobufs.CustomCapabilities) error { + require.Equal(t, customCapabilities, caps) + return nil + }, + } + + testUUID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb") + s := Supervisor{ + logger: zap.NewNop(), + pidProvider: defaultPIDProvider{}, + config: config.Supervisor{}, + hasNewConfig: make(chan struct{}, 1), + persistentState: &persistentState{InstanceID: testUUID}, + agentConfigOwnMetricsSection: &atomic.Value{}, + effectiveConfig: &atomic.Value{}, + agentConn: &atomic.Value{}, + opampClient: client, + agentHealthCheckEndpoint: "localhost:8000", + customMessageToServer: make(chan *protobufs.CustomMessage, 10), + doneChan: make(chan struct{}), + } + + s.handleAgentOpAMPMessage(&mockConn{}, &protobufs.AgentToServer{ + CustomCapabilities: customCapabilities, + }) + }) } type staticPIDProvider int @@ -147,3 +333,80 @@ type staticPIDProvider int func (s staticPIDProvider) PID() int { return int(s) } + +type mockOpAMPClient struct { + agentDesc *protobufs.AgentDescription + sendCustomMessageFunc func(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) + setCustomCapabilitiesFunc func(customCapabilities *protobufs.CustomCapabilities) error +} + +func (mockOpAMPClient) Start(_ context.Context, _ types.StartSettings) error { + return nil +} + +func (mockOpAMPClient) Stop(_ context.Context) error { + return nil +} + +func (m *mockOpAMPClient) SetAgentDescription(descr *protobufs.AgentDescription) error { + m.agentDesc = descr + return nil +} + +func (m mockOpAMPClient) AgentDescription() *protobufs.AgentDescription { + return m.agentDesc +} + +func (mockOpAMPClient) SetHealth(_ *protobufs.ComponentHealth) error { + return nil +} + +func (mockOpAMPClient) UpdateEffectiveConfig(_ context.Context) error { + return nil +} + +func (mockOpAMPClient) SetRemoteConfigStatus(_ *protobufs.RemoteConfigStatus) error { + return nil +} + +func (mockOpAMPClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error { + return nil +} + +func (mockOpAMPClient) RequestConnectionSettings(_ *protobufs.ConnectionSettingsRequest) error { + return nil +} + +func (m mockOpAMPClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error { + if m.setCustomCapabilitiesFunc != nil { + return m.setCustomCapabilitiesFunc(customCapabilities) + } + return nil +} + +func (m mockOpAMPClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { + if m.sendCustomMessageFunc != nil { + return m.sendCustomMessageFunc(message) + } + + msgChan := make(chan struct{}, 1) + msgChan <- struct{}{} + return msgChan, nil +} + +type mockConn struct { + sendFunc func(ctx context.Context, message *protobufs.ServerToAgent) error +} + +func (mockConn) Connection() net.Conn { + return nil +} +func (m mockConn) Send(ctx context.Context, message *protobufs.ServerToAgent) error { + if m.sendFunc != nil { + return m.sendFunc(ctx, message) + } + return nil +} +func (mockConn) Disconnect() error { + return nil +}