Skip to content

Commit

Permalink
[processor/k8sattributes] Wait for ReplicaSet informer before startin…
Browse files Browse the repository at this point in the history
…g pod informer (#37138)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This PR is an alternative approach to the previous fix made for #37056,
which did not fully solve the issue of the deployment name not being
added to a pod after the initial informer sync

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #37056

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Re-enabled the flaky E2E test

---------

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
  • Loading branch information
bacherfl authored Jan 21, 2025
1 parent afbb05f commit 04e745e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 16 deletions.
27 changes: 27 additions & 0 deletions .chloggen/k8sattributes-wait-for-informers.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: k8sattributesprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Wait for the other informers to complete their initial sync before starting the pod informers

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37056]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 0 additions & 2 deletions processor/k8sattributesprocessor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,8 +1099,6 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) {
// make docker-otelcontribcol
// KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest
func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) {
// TODO: Re-enable this test when the issue being tested here is fully solved: /~https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056
t.Skip("Skipping test as /~https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056 is not fully solved yet")
testDir := filepath.Join("testdata", "e2e", "clusterrbac")

k8sClient, err := k8stest.NewK8sClient(testKubeConfig)
Expand Down
47 changes: 33 additions & 14 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ func New(
// Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
func (c *WatchClient) Start() error {
synced := make([]cache.InformerSynced, 0)

// start the replicaSet informer first, as the replica sets need to be
// present at the time the pods are handled, to correctly establish the connection between pods and deployments
if c.Rules.DeploymentName || c.Rules.DeploymentUID {
Expand All @@ -225,18 +224,7 @@ func (c *WatchClient) Start() error {
go c.replicasetInformer.Run(c.stopCh)
}

reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
DeleteFunc: c.handlePodDelete,
})
if err != nil {
return err
}
synced = append(synced, reg.HasSynced)
go c.informer.Run(c.stopCh)

reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
reg, err := c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleNamespaceAdd,
UpdateFunc: c.handleNamespaceUpdate,
DeleteFunc: c.handleNamespaceDelete,
Expand All @@ -260,13 +248,28 @@ func (c *WatchClient) Start() error {
go c.nodeInformer.Run(c.stopCh)
}

reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handlePodAdd,
UpdateFunc: c.handlePodUpdate,
DeleteFunc: c.handlePodDelete,
})
if err != nil {
return err
}

// start the podInformer with the prerequisite of the other informers to be finished first
go c.runInformerWithDependencies(c.informer, synced)

if c.waitForMetadata {
timeoutCh := make(chan struct{})
t := time.AfterFunc(c.waitForMetadataTimeout, func() {
close(timeoutCh)
})
defer t.Stop()
if !cache.WaitForCacheSync(timeoutCh, synced...) {
// Wait for the Pod informer to be completed.
// The other informers will already be finished at this point, as the pod informer
// waits for them be finished before it can run
if !cache.WaitForCacheSync(timeoutCh, reg.HasSynced) {
return errors.New("failed to wait for caches to sync")
}
}
Expand Down Expand Up @@ -1123,6 +1126,22 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
return nil, false
}

// runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete
// before the informer is started. This is necessary e.g. for the pod informer which requires the replica set informer
// to be finished to correctly establish the connection to the replicaset/deployment it belongs to.
func (c *WatchClient) runInformerWithDependencies(informer cache.SharedInformer, dependencies []cache.InformerSynced) {
if len(dependencies) > 0 {
timeoutCh := make(chan struct{})
// TODO hard coding the timeout for now, check if we should make this configurable
t := time.AfterFunc(5*time.Second, func() {
close(timeoutCh)
})
defer t.Stop()
cache.WaitForCacheSync(timeoutCh, dependencies...)
}
informer.Run(c.stopCh)
}

// ignoreDeletedFinalStateUnknown returns the object wrapped in
// DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do
// not need the additional context.
Expand Down

0 comments on commit 04e745e

Please sign in to comment.