diff --git a/.chloggen/fix-onmessage.yaml b/.chloggen/fix-onmessage.yaml new file mode 100644 index 000000000000..8ed7500ed069 --- /dev/null +++ b/.chloggen/fix-onmessage.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Ensure the Supervisor processes all fields in a ServerToAgent message. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34349] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3ef418a68f14..e20fb7808526 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -1180,16 +1180,17 @@ func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.Telemet func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false - if msg.RemoteConfig != nil { - configChanged = configChanged || s.processRemoteConfigMessage(msg.RemoteConfig) + + if msg.AgentIdentification != nil { + configChanged = s.processAgentIdentificationMessage(msg.AgentIdentification) || configChanged } - if msg.OwnMetricsConnSettings != nil { - configChanged = configChanged || s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings) + if msg.RemoteConfig != nil { + configChanged = s.processRemoteConfigMessage(msg.RemoteConfig) || configChanged } - if msg.AgentIdentification != nil { - configChanged = configChanged || s.processAgentIdentificationMessage(msg.AgentIdentification) + if msg.OwnMetricsConnSettings != nil { + configChanged = s.processOwnMetricsConnSettingsMessage(ctx, msg.OwnMetricsConnSettings) || configChanged } // Update the agent config if any messages have touched the config @@ -1299,7 +1300,14 @@ func (s *Supervisor) processAgentIdentificationMessage(msg *protobufs.AgentIdent s.logger.Error("Failed to send agent description to OpAMP server") } - return true + // Need to recalculate the Agent config so that the new agent identification is included in it. + configChanged, err := s.composeMergedConfig(s.remoteConfig) + if err != nil { + s.logger.Error("Error composing merged config with new instance ID", zap.Error(err)) + return false + } + + return configChanged } func (s *Supervisor) persistentStateFile() string { diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index 1af3653e05cc..3fdd1372b175 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -104,10 +104,12 @@ func Test_onMessage(t *testing.T) { persistentState: &persistentState{InstanceID: initialID}, agentDescription: agentDesc, agentConfigOwnMetricsSection: &atomic.Value{}, + mergedConfig: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentHealthCheckEndpoint: "localhost:8000", opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())), } + require.NoError(t, s.createTemplates()) s.onMessage(context.Background(), &types.MessageData{ AgentIdentification: &protobufs.AgentIdentification{ @@ -131,9 +133,11 @@ func Test_onMessage(t *testing.T) { persistentState: &persistentState{InstanceID: testUUID}, agentDescription: agentDesc, agentConfigOwnMetricsSection: &atomic.Value{}, + mergedConfig: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentHealthCheckEndpoint: "localhost:8000", } + require.NoError(t, s.createTemplates()) s.onMessage(context.Background(), &types.MessageData{ AgentIdentification: &protobufs.AgentIdentification{ @@ -175,6 +179,7 @@ func Test_onMessage(t *testing.T) { hasNewConfig: make(chan struct{}, 1), persistentState: &persistentState{InstanceID: testUUID}, agentConfigOwnMetricsSection: &atomic.Value{}, + mergedConfig: &atomic.Value{}, effectiveConfig: &atomic.Value{}, agentConn: agentConnAtomic, agentHealthCheckEndpoint: "localhost:8000", @@ -231,6 +236,61 @@ func Test_onMessage(t *testing.T) { require.True(t, gotMessage, "Message was not sent to agent") }) + t.Run("Processes all ServerToAgent fields", func(t *testing.T) { + agentDesc := &atomic.Value{} + agentDesc.Store(&protobufs.AgentDescription{ + NonIdentifyingAttributes: []*protobufs.KeyValue{ + { + Key: "runtime.type", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: "test", + }, + }, + }, + }, + }) + initialID := uuid.MustParse("018fee23-4a51-7303-a441-73faed7d9deb") + newID := uuid.MustParse("018fef3f-14a8-73ef-b63e-3b96b146ea38") + s := Supervisor{ + logger: zap.NewNop(), + pidProvider: defaultPIDProvider{}, + config: config.Supervisor{}, + hasNewConfig: make(chan struct{}, 1), + persistentState: &persistentState{InstanceID: initialID}, + agentDescription: agentDesc, + agentConfigOwnMetricsSection: &atomic.Value{}, + mergedConfig: &atomic.Value{}, + effectiveConfig: &atomic.Value{}, + agentHealthCheckEndpoint: "localhost:8000", + opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())), + } + require.NoError(t, s.createTemplates()) + + s.onMessage(context.Background(), &types.MessageData{ + AgentIdentification: &protobufs.AgentIdentification{ + NewInstanceUid: newID[:], + }, + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": { + Body: []byte(""), + }, + }, + }, + }, + OwnMetricsConnSettings: &protobufs.TelemetryConnectionSettings{ + DestinationEndpoint: "http://localhost:4318", + }, + }) + + require.Equal(t, newID, s.persistentState.InstanceID) + t.Log(s.mergedConfig.Load()) + require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics") + require.Contains(t, s.mergedConfig.Load(), newID.String()) + require.Contains(t, s.mergedConfig.Load(), "runtime.type: test") + }) } func Test_handleAgentOpAMPMessage(t *testing.T) {