From 04e745e0cc7486f1cf1465de0034e6cdb551426a Mon Sep 17 00:00:00 2001
From: Florian Bacher
Date: Tue, 21 Jan 2025 21:18:17 +0100
Subject: [PATCH] [processor/k8sattributes] Wait for ReplicaSet informer before starting pod informer (#37138)

#### Description

This PR is an alternative approach to the previous fix for #37056, which did not fully solve the issue of the deployment name not being added to a pod after the initial informer sync.

#### Link to tracking issue

Fixes #37056

#### Testing

Re-enabled the flaky E2E test.

---------

Signed-off-by: Florian Bacher
---
 .../k8sattributes-wait-for-informers.yaml     | 27 +++++++++++
 processor/k8sattributesprocessor/e2e_test.go  |  2 -
 .../internal/kube/client.go                   | 47 +++++++++++++------
 3 files changed, 60 insertions(+), 16 deletions(-)
 create mode 100644 .chloggen/k8sattributes-wait-for-informers.yaml

diff --git a/.chloggen/k8sattributes-wait-for-informers.yaml b/.chloggen/k8sattributes-wait-for-informers.yaml
new file mode 100644
index 000000000000..e0fc2dc978bb
--- /dev/null
+++ b/.chloggen/k8sattributes-wait-for-informers.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: k8sattributesprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Wait for the other informers to complete their initial sync before starting the pod informer
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37056]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
diff --git a/processor/k8sattributesprocessor/e2e_test.go b/processor/k8sattributesprocessor/e2e_test.go
index 147b968ec913..7de25e62e3b7 100644
--- a/processor/k8sattributesprocessor/e2e_test.go
+++ b/processor/k8sattributesprocessor/e2e_test.go
@@ -1099,8 +1099,6 @@ func TestE2E_NamespacedRBACNoPodIP(t *testing.T) {
 // make docker-otelcontribcol
 // KUBECONFIG=/tmp/kube-config-otelcol-e2e-testing kind load docker-image otelcontribcol:latest
 func TestE2E_ClusterRBACCollectorStartAfterTelemetryGen(t *testing.T) {
-	// TODO: Re-enable this test when the issue being tested here is fully solved: /~https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056
-	t.Skip("Skipping test as /~https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37056 is not fully solved yet")
 	testDir := filepath.Join("testdata", "e2e", "clusterrbac")
 
 	k8sClient, err := k8stest.NewK8sClient(testKubeConfig)
diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go
index 3e38cafeaaef..f2952046a56d 100644
--- a/processor/k8sattributesprocessor/internal/kube/client.go
+++ b/processor/k8sattributesprocessor/internal/kube/client.go
@@ -209,7 +209,6 @@ func New(
 // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
 func (c *WatchClient) Start() error {
 	synced := make([]cache.InformerSynced, 0)
-
 	// start the replicaSet informer first, as the replica sets need to be
 	// present at the time the pods are handled, to correctly establish the connection between pods and deployments
 	if c.Rules.DeploymentName || c.Rules.DeploymentUID {
@@ -225,18 +224,7 @@ func (c *WatchClient) Start() error {
 		go c.replicasetInformer.Run(c.stopCh)
 	}
 
-	reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    c.handlePodAdd,
-		UpdateFunc: c.handlePodUpdate,
-		DeleteFunc: c.handlePodDelete,
-	})
-	if err != nil {
-		return err
-	}
-	synced = append(synced, reg.HasSynced)
-	go c.informer.Run(c.stopCh)
-
-	reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	reg, err := c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    c.handleNamespaceAdd,
 		UpdateFunc: c.handleNamespaceUpdate,
 		DeleteFunc: c.handleNamespaceDelete,
@@ -260,13 +248,28 @@ func (c *WatchClient) Start() error {
 		go c.nodeInformer.Run(c.stopCh)
 	}
 
+	reg, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    c.handlePodAdd,
+		UpdateFunc: c.handlePodUpdate,
+		DeleteFunc: c.handlePodDelete,
+	})
+	if err != nil {
+		return err
+	}
+
+	// start the pod informer only after the other informers have finished their initial sync
+	go c.runInformerWithDependencies(c.informer, synced)
+
 	if c.waitForMetadata {
 		timeoutCh := make(chan struct{})
 		t := time.AfterFunc(c.waitForMetadataTimeout, func() {
 			close(timeoutCh)
 		})
 		defer t.Stop()
-		if !cache.WaitForCacheSync(timeoutCh, synced...) {
+		// Wait for the pod informer to complete its initial sync.
+		// The other informers will already be finished at this point, as the pod informer
+		// waits for them to finish before it starts running.
+		if !cache.WaitForCacheSync(timeoutCh, reg.HasSynced) {
 			return errors.New("failed to wait for caches to sync")
 		}
 	}
@@ -1123,6 +1126,22 @@ func (c *WatchClient) getReplicaSet(uid string) (*ReplicaSet, bool) {
 	return nil, false
 }
 
+// runInformerWithDependencies starts the given informer. The second argument is a list of other informers that should complete
+// their sync before the informer is started. This is necessary e.g. for the pod informer, which requires the replica set informer
+// to have finished so that the connection to the replicaset/deployment a pod belongs to can be established correctly.
+func (c *WatchClient) runInformerWithDependencies(informer cache.SharedInformer, dependencies []cache.InformerSynced) {
+	if len(dependencies) > 0 {
+		timeoutCh := make(chan struct{})
+		// TODO: the timeout is hard-coded for now; check if we should make this configurable
+		t := time.AfterFunc(5*time.Second, func() {
+			close(timeoutCh)
+		})
+		defer t.Stop()
+		cache.WaitForCacheSync(timeoutCh, dependencies...)
+	}
+	informer.Run(c.stopCh)
+}
+
 // ignoreDeletedFinalStateUnknown returns the object wrapped in
 // DeletedFinalStateUnknown. Useful in OnDelete resource event handlers that do
 // not need the additional context.
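
For readers who want to try the ordering pattern outside the processor, below is a minimal, self-contained sketch (not part of the patch) of the same idea: run a dependent informer only after the informers it relies on have synced, bounded by a timeout. The helper name `runInformerAfter` and the use of client-go's fake clientset are illustrative assumptions made for this sketch; the processor's actual helper is the `runInformerWithDependencies` function shown in the diff above.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

// runInformerAfter waits (up to timeout) for the given dependencies to sync,
// then runs the informer. It mirrors the helper added in the patch.
func runInformerAfter(informer cache.SharedInformer, stopCh <-chan struct{}, timeout time.Duration, deps ...cache.InformerSynced) {
	if len(deps) > 0 {
		timeoutCh := make(chan struct{})
		t := time.AfterFunc(timeout, func() { close(timeoutCh) })
		defer t.Stop()
		cache.WaitForCacheSync(timeoutCh, deps...)
	}
	informer.Run(stopCh)
}

func main() {
	// Fake clientset so the sketch runs without a real cluster.
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)

	rsInformer := factory.Apps().V1().ReplicaSets().Informer()
	podInformer := factory.Core().V1().Pods().Informer()

	stopCh := make(chan struct{})
	defer close(stopCh)

	// ReplicaSets first, so pod handlers could resolve owning deployments.
	go rsInformer.Run(stopCh)
	// Pods only once the ReplicaSet cache has synced (or the timeout fires).
	go runInformerAfter(podInformer, stopCh, 5*time.Second, rsInformer.HasSynced)

	if cache.WaitForCacheSync(stopCh, rsInformer.HasSynced, podInformer.HasSynced) {
		fmt.Println("all caches synced")
	}
}
```

Bounding the dependency wait with a timeout mirrors the TODO in the patch: a dependency that never syncs should delay, but not permanently block, the pod informer.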