Skip to content

Commit

Permalink
Fix ClusterCacheTracker memory leak
Browse files Browse the repository at this point in the history
Decouple the code that creates uncached client from the code that
creates the cached client. This way, we don't start the cache until
the cached client is created.

This avoids scenarios where the `ClusterCacheTracker` cache is started
when only an uncached client is needed.

There is an existing code path, described in the original issue, where
the `ClusterCacheTracker` cache is started when only an uncached client
is needed. The cache is re-created later, and the initial cache is
still running continuously in the background.

Signed-off-by: Ionut Balutoiu <ionut@balutoiu.com>
  • Loading branch information
ionutbalutoiu committed Oct 12, 2023
1 parent a82c340 commit 36c0b2c
Showing 1 changed file with 56 additions and 24 deletions.
80 changes: 56 additions & 24 deletions controllers/remote/cluster_cache_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -295,8 +296,8 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
return nil, errors.Wrapf(err, "error fetching REST client config for remote cluster %q", cluster.String())
}

// Create a client and a cache for the cluster.
c, uncachedClient, cache, err := t.createClient(ctx, config, cluster, indexes)
// Create an uncached client for the cluster.
uncachedClient, err := t.createUncachedClient(ctx, config, cluster)
if err != nil {
return nil, err
}
Expand All @@ -308,6 +309,9 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
return nil, err
}

var c client.Client
var cache *stoppableCache

// If the controller runs on the workload cluster, access the apiserver directly by using the
// CA and Host from the in-cluster configuration.
if runningOnCluster {
Expand All @@ -321,13 +325,19 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
config.CAFile = inClusterConfig.CAFile
config.Host = inClusterConfig.Host

// Create a new client and overwrite the previously created client.
c, _, cache, err = t.createClient(ctx, config, cluster, indexes)
// Create a client and a cache for the cluster, using the in-cluster config.
c, cache, err = t.createCachedClient(ctx, config, cluster, indexes)
if err != nil {
return nil, errors.Wrap(err, "error creating client for self-hosted cluster")
}
log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with in-cluster service %q", cluster.String(), config.Host))
} else {
// Create a client and a cache for the cluster.
c, cache, err = t.createCachedClient(ctx, config, cluster, indexes)
if err != nil {
return nil, err
}

log.Info(fmt.Sprintf("Creating cluster accessor for cluster %q with the regular apiserver endpoint %q", cluster.String(), config.Host))
}

Expand Down Expand Up @@ -374,26 +384,58 @@ func (t *ClusterCacheTracker) runningOnWorkloadCluster(ctx context.Context, c cl
return t.controllerPodMetadata.UID == pod.UID, nil
}

// createClient creates a cached client, and uncached client and a mapper based on a rest.Config.
func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, client.Client, *stoppableCache, error) {
// createHttpClientAndMapper creates a http client and a dynamic rest mapper for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createHttpClientAndMapper(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (*http.Client, meta.RESTMapper, error) {
// Create a http client for the cluster.
httpClient, err := rest.HTTPClientFor(config)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating http client", cluster.String())
}

// Create a mapper for it
mapper, err := apiutil.NewDynamicRESTMapper(config, httpClient)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating dynamic rest mapper", cluster.String())
}

// Verify if we can get a rest mapping from the workload cluster apiserver.
// Note: This also checks if the apiserver is up in general. We do this already here
// to avoid further effort creating a cache and a client and to produce a clearer error message.
_, err = mapper.RESTMapping(corev1.SchemeGroupVersion.WithKind("Node").GroupKind(), corev1.SchemeGroupVersion.Version)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error getting rest mapping", cluster.String())
}

return httpClient, mapper, nil
}

// createUncachedClient creates an uncached client for the given cluster, based on the rest.Config.
func (t *ClusterCacheTracker) createUncachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey) (client.Client, error) {
// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHttpClientAndMapper(ctx, config, cluster)
if err != nil {
return nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
}

// Create the uncached client for the remote cluster
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}

return uncachedClient, nil
}

// createCachedClient creates a cached client for the given cluster, based on a rest.Config.
func (t *ClusterCacheTracker) createCachedClient(ctx context.Context, config *rest.Config, cluster client.ObjectKey, indexes []Index) (client.Client, *stoppableCache, error) {
// Create a http client and a mapper for the cluster.
httpClient, mapper, err := t.createHttpClientAndMapper(ctx, config, cluster)
if err != nil {
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
}

// Create the cache for the remote cluster
Expand All @@ -404,7 +446,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
}
remoteCache, err := cache.New(config, cacheOptions)
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q: error creating cache", cluster.String())
}

cacheCtx, cacheCtxCancel := context.WithCancel(ctx)
Expand All @@ -417,7 +459,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con

for _, index := range indexes {
if err := cache.IndexField(ctx, index.Object, index.Field, index.ExtractValue); err != nil {
return nil, nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
return nil, nil, errors.Wrapf(err, "error adding index for field %q to cache for remote cluster %q", index.Field, cluster.String())
}
}

Expand All @@ -433,19 +475,9 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
},
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating client for remote cluster %q", cluster.String())
return nil, nil, errors.Wrapf(err, "error creating cached client for remote cluster %q", cluster.String())
}

// Create an uncached client. This is used in `runningOnWorkloadCluster` to ensure we don't continuously cache
// pods in the client.
uncachedClient, err := client.New(config, client.Options{
Scheme: t.scheme,
Mapper: mapper,
HTTPClient: httpClient,
})
if err != nil {
return nil, nil, nil, errors.Wrapf(err, "error creating uncached client for remote cluster %q", cluster.String())
}
// Start the cache!!!
go cache.Start(cacheCtx) //nolint:errcheck

Expand All @@ -454,7 +486,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
defer cacheSyncCtxCancel()
if !cache.WaitForCacheSync(cacheSyncCtx) {
cache.Stop()
return nil, nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
return nil, nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, cacheCtx.Err())
}

// Wrap the cached client with a client that sets timeouts on all Get and List calls
Expand All @@ -471,7 +503,7 @@ func (t *ClusterCacheTracker) createClient(ctx context.Context, config *rest.Con
httpClient: httpClient,
})

return cachedClient, uncachedClient, cache, nil
return cachedClient, cache, nil
}

// deleteAccessor stops a clusterAccessor's cache and removes the clusterAccessor from the tracker.
Expand Down

0 comments on commit 36c0b2c

Please sign in to comment.