From 1a046b6dcf7efd3a7bbb03478e1a4787d7dfd45c Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Thu, 12 Jan 2023 13:01:24 +0100 Subject: [PATCH] reconciler/cache/reconciler: simplify and generalize Co-authored-by: Lukasz Szaszkiewicz Co-authored-by: Stefan Schimanski --- .../replication/replication_controller.go | 194 ++++---- .../replication/replication_reconcile.go | 300 ++++++------ .../replication/replication_reconcile_test.go | 456 +++++++----------- .../replication_reconcile_unstructured.go | 70 --- ...replication_reconcile_unstructured_test.go | 147 ------ pkg/server/controllers.go | 9 +- 6 files changed, 400 insertions(+), 776 deletions(-) diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index cd13f2bf1ad..bce2be1958d 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -24,9 +24,9 @@ import ( kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" kcpdynamic "github.com/kcp-dev/client-go/dynamic" kcpkubernetesinformers "github.com/kcp-dev/client-go/informers" - kcprbaclisters "github.com/kcp-dev/client-go/listers/rbac/v1" rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -35,14 +35,12 @@ import ( "k8s.io/klog/v2" apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + "github.com/kcp-dev/kcp/pkg/apis/core" corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1" tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" cacheclient "github.com/kcp-dev/kcp/pkg/cache/client" "github.com/kcp-dev/kcp/pkg/cache/client/shard" kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions" - apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1" - corev1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/core/v1alpha1" - tenancyv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1" "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" ) @@ -60,92 +58,79 @@ const ( func NewController( shardName string, dynamicCacheClient kcpdynamic.ClusterInterface, - dynamicLocalClient kcpdynamic.ClusterInterface, localKcpInformers kcpinformers.SharedInformerFactory, globalKcpInformers kcpinformers.SharedInformerFactory, localKubeInformers kcpkubernetesinformers.SharedInformerFactory, globalKubeInformers kcpkubernetesinformers.SharedInformerFactory, ) (*controller, error) { c := &controller{ - shardName: shardName, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - dynamicKcpCacheClient: dynamicCacheClient, - dynamicKcpLocalClient: dynamicLocalClient, - - localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), - localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), - localShardLister: localKcpInformers.Core().V1alpha1().Shards().Lister(), - localWorkspaceTypeLister: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Lister(), - localClusterRoleLister: localKubeInformers.Rbac().V1().ClusterRoles().Lister(), - localClusterRoleBindingLister: localKubeInformers.Rbac().V1().ClusterRoleBindings().Lister(), - - globalAPIExportIndexer: globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), - globalAPIResourceSchemaIndexer: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), - globalShardIndexer: globalKcpInformers.Core().V1alpha1().Shards().Informer().GetIndexer(), - globalWorkspaceTypeIndexer: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(), - globalClusterRoleIndexer: globalKubeInformers.Rbac().V1().ClusterRoles().Informer().GetIndexer(), - globalClusterRoleBindingIndexer: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(), - } - - indexers.AddIfNotPresentOrDie( - globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + shardName: shardName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + dynamicCacheClient: dynamicCacheClient, + + gvrs: map[schema.GroupVersionResource]replicatedGVR{ + apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"): { + kind: "APIExport", + local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(), + global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(), + }, + apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): { + kind: "APIResourceSchema", + local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), + global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(), + }, + corev1alpha1.SchemeGroupVersion.WithResource("shards"): { + kind: "Shard", + local: localKcpInformers.Core().V1alpha1().Shards().Informer(), + global: globalKcpInformers.Core().V1alpha1().Shards().Informer(), + }, + tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"): { + kind: "WorkspaceType", + local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(), + global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(), + }, + rbacv1.SchemeGroupVersion.WithResource("clusterroles"): { + kind: "ClusterRole", + filter: func(u *unstructured.Unstructured) bool { + return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" + }, + local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(), + global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(), + }, + rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): { + kind: "ClusterRoleBinding", + filter: func(u *unstructured.Unstructured) bool { + return u.GetAnnotations()[core.ReplicateAnnotationKey] != "" + }, + local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(), + global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(), + }, }, - ) - - indexers.AddIfNotPresentOrDie( - globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }, - ) - - indexers.AddIfNotPresentOrDie( - globalKcpInformers.Core().V1alpha1().Shards().Informer().GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }, - ) - - indexers.AddIfNotPresentOrDie( - globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }, - ) - - indexers.AddIfNotPresentOrDie( - globalKubeInformers.Rbac().V1().ClusterRoles().Informer().GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }, - ) - - indexers.AddIfNotPresentOrDie( - globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(), - cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }, - ) - - localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"))) - globalKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"))) - - localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"))) - globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"))) - - localKcpInformers.Core().V1alpha1().Shards().Informer().AddEventHandler(c.objectInformerEventHandler(corev1alpha1.SchemeGroupVersion.WithResource("shards"))) - globalKcpInformers.Core().V1alpha1().Shards().Informer().AddEventHandler(c.objectInformerEventHandler(corev1alpha1.SchemeGroupVersion.WithResource("shards"))) - - localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().AddEventHandler(c.objectInformerEventHandler(tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"))) - globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().AddEventHandler(c.objectInformerEventHandler(tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"))) - - localKubeInformers.Rbac().V1().ClusterRoles().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterroles"))) - globalKubeInformers.Rbac().V1().ClusterRoles().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterroles"))) + } - localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"))) - globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"))) + for gvr, info := range c.gvrs { + indexers.AddIfNotPresentOrDie( + info.global.GetIndexer(), + cache.Indexers{ + ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + }, + ) + + // shadow gvr to get the right value in the closure + gvr := gvr + + info.local.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) }, + UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) }, + DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) }, + }) + + info.global.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) }, + UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj, gvr) }, + DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) }, + }) + } return c, nil } @@ -156,7 +141,17 @@ func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResou runtime.HandleError(err) return } - gvrKey := fmt.Sprintf("%v::%v", gvr.String(), key) + gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key) + c.queue.Add(gvrKey) +} + +func (c *controller) enqueueCacheObject(obj interface{}, gvr schema.GroupVersionResource) { + key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(err) + return + } + gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key) c.queue.Add(gvrKey) } @@ -202,32 +197,17 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool { return true } -func (c *controller) objectInformerEventHandler(gvr schema.GroupVersionResource) cache.ResourceEventHandler { - return cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) }, - UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) }, - DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) }, - } -} - type controller struct { shardName string queue workqueue.RateLimitingInterface - dynamicKcpCacheClient kcpdynamic.ClusterInterface - dynamicKcpLocalClient kcpdynamic.ClusterInterface - - localAPIExportLister apisv1alpha1listers.APIExportClusterLister - localAPIResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister - localShardLister corev1alpha1listers.ShardClusterLister - localWorkspaceTypeLister tenancyv1alpha1listers.WorkspaceTypeClusterLister - localClusterRoleLister kcprbaclisters.ClusterRoleClusterLister - localClusterRoleBindingLister kcprbaclisters.ClusterRoleBindingClusterLister - - globalAPIExportIndexer cache.Indexer - globalAPIResourceSchemaIndexer cache.Indexer - globalShardIndexer cache.Indexer - globalWorkspaceTypeIndexer cache.Indexer - globalClusterRoleIndexer cache.Indexer - globalClusterRoleBindingIndexer cache.Indexer + dynamicCacheClient kcpdynamic.ClusterInterface + + gvrs map[schema.GroupVersionResource]replicatedGVR +} + +type replicatedGVR struct { + kind string + filter func(u *unstructured.Unstructured) bool + global, local cache.SharedIndexInformer } diff --git a/pkg/reconciler/cache/replication/replication_reconcile.go b/pkg/reconciler/cache/replication/replication_reconcile.go index daad9f0eb93..f25045c147f 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile.go +++ b/pkg/reconciler/cache/replication/replication_reconcile.go @@ -24,194 +24,180 @@ import ( kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" "github.com/kcp-dev/logicalcluster/v3" - rbacv1 "k8s.io/api/rbac/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" - - apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" - "github.com/kcp-dev/kcp/pkg/apis/core" - corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1" - tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" + "k8s.io/apimachinery/pkg/util/runtime" + genericrequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/klog/v2" ) func (c *controller) reconcile(ctx context.Context, gvrKey string) error { + // split apart the gvr from the key keyParts := strings.Split(gvrKey, "::") if len(keyParts) != 2 { return fmt.Errorf("incorrect key: %v, expected group.version.resource::key", gvrKey) } - switch keyParts[0] { - case apisv1alpha1.SchemeGroupVersion.WithResource("apiexports").String(): - return c.reconcileObject(ctx, - keyParts[1], - apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"), - apisv1alpha1.SchemeGroupVersion.WithKind("APIExport"), - func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.globalAPIExportIndexer, c.shardName, cluster, namespace, name) - }, - func(cluster logicalcluster.Name, _, name string) (interface{}, bool, error) { - obj, err := c.localAPIExportLister.Cluster(cluster).Get(name) - if err != nil { - return nil, false, err - } - return obj, true, nil - }) - case apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas").String(): - return c.reconcileObject(ctx, - keyParts[1], - apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"), - apisv1alpha1.SchemeGroupVersion.WithKind("APIResourceSchema"), - func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.globalAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name) - }, - func(cluster logicalcluster.Name, _, name string) (interface{}, bool, error) { - obj, err := c.localAPIResourceSchemaLister.Cluster(cluster).Get(name) - if err != nil { - return nil, false, err - } - return obj, true, nil - }) - case corev1alpha1.SchemeGroupVersion.WithResource("shards").String(): - return c.reconcileObject(ctx, - keyParts[1], - corev1alpha1.SchemeGroupVersion.WithResource("shards"), - corev1alpha1.SchemeGroupVersion.WithKind("Shard"), - func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.globalShardIndexer, c.shardName, cluster, namespace, name) - }, - func(cluster logicalcluster.Name, _, name string) (interface{}, bool, error) { - obj, err := c.localShardLister.Cluster(cluster).Get(name) - if err != nil { - return nil, false, err - } - return obj, true, nil - }) - case tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes").String(): - return c.reconcileObject(ctx, - keyParts[1], - tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"), - tenancyv1alpha1.SchemeGroupVersion.WithKind("WorkspaceType"), - func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.globalWorkspaceTypeIndexer, c.shardName, cluster, namespace, name) - }, - func(cluster logicalcluster.Name, _, name string) (interface{}, bool, error) { - obj, err := c.localWorkspaceTypeLister.Cluster(cluster).Get(name) - if err != nil { - return nil, false, err - } - return obj, true, nil - }) - case rbacv1.SchemeGroupVersion.WithResource("clusterroles").String(): - return c.reconcileObject(ctx, - keyParts[1], - rbacv1.SchemeGroupVersion.WithResource("clusterroles"), - rbacv1.SchemeGroupVersion.WithKind("ClusterRole"), - func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.globalClusterRoleIndexer, c.shardName, cluster, namespace, name) - }, - func(cluster logicalcluster.Name, _, name string) (interface{}, bool, error) { - obj, err := c.localClusterRoleLister.Cluster(cluster).Get(name) - if err != nil { - return nil, false, err - } - return obj, obj.Annotations[core.ReplicateAnnotationKey] != "", nil - }) - case rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings").String(): - return c.reconcileObject(ctx, - keyParts[1], - rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"), - rbacv1.SchemeGroupVersion.WithKind("ClusterRoleBinding"), - func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.globalClusterRoleBindingIndexer, c.shardName, cluster, namespace, name) - }, - func(cluster logicalcluster.Name, _, name string) (interface{}, bool, error) { - obj, err := c.localClusterRoleBindingLister.Cluster(cluster).Get(name) - if err != nil { - return nil, false, err - } - return obj, obj.Annotations[core.ReplicateAnnotationKey] != "", nil - }) - default: - return fmt.Errorf("unsupported resource %v", keyParts[0]) + gvrParts := strings.SplitN(keyParts[0], ".", 3) + gvr := schema.GroupVersionResource{Version: gvrParts[0], Resource: gvrParts[1], Group: gvrParts[2]} + key := keyParts[1] + + info := c.gvrs[gvr] + + r := &reconciler{ + shardName: c.shardName, + getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + key := kcpcache.ToClusterAwareKey(cluster.String(), namespace, name) + obj, exists, err := info.local.GetIndexer().GetByKey(key) + if !exists { + return nil, apierrors.NewNotFound(gvr.GroupResource(), name) + } else if err != nil { + return nil, err // necessary to avoid non-zero nil interface + } + + u, err := toUnstructured(obj) + if err != nil { + return nil, err + } + + if info.filter != nil && !info.filter(u) { + return nil, apierrors.NewNotFound(gvr.GroupResource(), name) + } + + if _, ok := obj.(*unstructured.Unstructured); ok { + u = u.DeepCopy() + } + u.SetKind(info.kind) + u.SetAPIVersion(gvr.GroupVersion().String()) + return u, nil + }, + getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + objs, err := info.global.GetIndexer().ByIndex(ByShardAndLogicalClusterAndNamespaceAndName, ShardAndLogicalClusterAndNamespaceKey(c.shardName, cluster, namespace, name)) + if err != nil { + return nil, err // necessary to avoid non-zero nil interface + } + if len(objs) == 0 { + return nil, apierrors.NewNotFound(gvr.GroupResource(), name) + } else if len(objs) > 1 { + return nil, fmt.Errorf("found multiple objects for %v|%v/%v", cluster, namespace, name) + } + + obj := objs[0] + + u, err := toUnstructured(obj) + if err != nil { + return nil, err + } + if _, ok := obj.(*unstructured.Unstructured); ok { + u = u.DeepCopy() + } + + u.SetKind(info.kind) + u.SetAPIVersion(gvr.GroupVersion().String()) + return u, nil + }, + createObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + return c.dynamicCacheClient.Cluster(cluster.Path()).Resource(gvr).Namespace(obj.GetNamespace()).Create(ctx, obj, metav1.CreateOptions{}) + }, + updateObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + return c.dynamicCacheClient.Cluster(cluster.Path()).Resource(gvr).Namespace(obj.GetNamespace()).Update(ctx, obj, metav1.UpdateOptions{}) + }, + deleteObject: func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error { + return c.dynamicCacheClient.Cluster(cluster.Path()).Resource(gvr).Namespace(ns).Delete(ctx, name, metav1.DeleteOptions{}) + }, } + return r.reconcile(ctx, key) } -// reconcileObject makes sure that the object under the given key from the local shard is replicated to the cache server. +type reconciler struct { + shardName string + + getLocalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) + getGlobalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) + + createObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + updateObject func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + deleteObject func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error +} + +// reconcile makes sure that the object under the given key from the local shard is replicated to the cache server. // the replication function handles the following cases: -// 1. creation of the object in the cache server when the cached object is not found by retrieveLocalObject -// 2. deletion of the object from the cache server when the original/local object was removed OR was not found by retrieveLocalObject +// 1. creation of the object in the cache server when the cached object is not found by getGlobalCopy +// 2. deletion of the object from the cache server when the original/local object was removed OR was not found by getLocalCopy // 3. modification of the cached object to match the original one when meta.annotations, meta.labels, spec or status are different -func (c *controller) reconcileObject(ctx context.Context, - key string, gvr schema.GroupVersionResource, gvk schema.GroupVersionKind, - retrieveCacheObject func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error), - retrieveLocalObject func(cluster logicalcluster.Name, namespace, name string) (interface{}, bool, error)) error { - cluster, namespace, name, err := kcpcache.SplitMetaClusterNamespaceKey(key) +func (r *reconciler) reconcile(ctx context.Context, key string) error { + logger := klog.FromContext(ctx).WithValues("reconcilerKey", key) + + clusterName, ns, name, err := kcpcache.SplitMetaClusterNamespaceKey(key) if err != nil { - return err + runtime.HandleError(err) + return nil } - cacheObject, err := retrieveCacheObject(gvr, cluster, namespace, name) - if err != nil && !errors.IsNotFound(err) { - return err + + localCopy, err := r.getLocalCopy(clusterName, ns, name) + if err != nil && !apierrors.IsNotFound(err) { + runtime.HandleError(err) + return nil } - localObject, replicate, err := retrieveLocalObject(cluster, namespace, name) - if err != nil && !errors.IsNotFound(err) { - return err + localExists := !apierrors.IsNotFound(err) + + globalCopy, err := r.getGlobalCopy(clusterName, ns, name) + if err != nil && !apierrors.IsNotFound(err) { + runtime.HandleError(err) + return nil } - if errors.IsNotFound(err) { - // issue a live GET to make sure the localObject was removed - _, err = c.dynamicKcpLocalClient.Cluster(cluster.Path()).Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{}) - if err == nil { - return fmt.Errorf("the informer used by this controller is stale, the following %s resource was found on the local server: %s/%s/%s but was missing from the informer", gvr, cluster, namespace, name) - } - if !errors.IsNotFound(err) { - return err + globalExists := !apierrors.IsNotFound(err) + + // local is gone or being deleted. Delete in cache. + if !localExists || !localCopy.GetDeletionTimestamp().IsZero() { + if !globalExists { + return nil } - } else if !replicate { - localObject = nil // like if it wasn't there - } - var unstructuredCacheObject *unstructured.Unstructured - var unstructuredLocalObject *unstructured.Unstructured - if cacheObject != nil { - unstructuredCacheObject, err = toUnstructured(cacheObject) - if err != nil { + // Object doesn't exist anymore, delete it from the global cache. + logger.V(2).WithValues("cluster", clusterName, "namespace", ns, "name", name).Info("Deleting object from global cache") + if err := r.deleteObject(ctx, clusterName, ns, name); err != nil && !apierrors.IsNotFound(err) { return err } - unstructuredCacheObject.SetKind(gvk.Kind) - unstructuredCacheObject.SetAPIVersion(gvr.GroupVersion().String()) + return nil } - if localObject != nil { - unstructuredLocalObject, err = toUnstructured(localObject) - if err != nil { - return err + + // local exists, global doesn't. Create in cache. + if !globalExists { + // TODO: in the future the original RV will have to be stored in an annotation (?) + // so that the clients that need to modify the original/local object can do it + localCopy.SetResourceVersion("") + annotations := localCopy.GetAnnotations() + if annotations == nil { + annotations = map[string]string{} } - unstructuredLocalObject.SetKind(gvk.Kind) - unstructuredLocalObject.SetAPIVersion(gvr.GroupVersion().String()) - } - if cluster.Empty() && localObject != nil { - metadata, err := meta.Accessor(localObject) - if err != nil { + annotations[genericrequest.AnnotationKey] = r.shardName + localCopy.SetAnnotations(annotations) + + logger.V(2).Info("Creating object in global cache") + _, err := r.createObject(ctx, clusterName, localCopy) + if err != nil && !apierrors.IsAlreadyExists(err) { return err } - cluster = logicalcluster.From(metadata) + return nil } - return c.reconcileUnstructuredObjects(ctx, cluster, &gvr, unstructuredCacheObject, unstructuredLocalObject) -} - -func retrieveCacheObject(gvr *schema.GroupVersionResource, cacheIndex cache.Indexer, shard string, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - cacheObjects, err := cacheIndex.ByIndex(ByShardAndLogicalClusterAndNamespaceAndName, ShardAndLogicalClusterAndNamespaceKey(shard, cluster, namespace, name)) + // update global copy and compare + metaChanged, err := ensureMeta(globalCopy, localCopy) if err != nil { - return nil, err + return err } - if len(cacheObjects) == 0 { - return nil, errors.NewNotFound(gvr.GroupResource(), name) + remainingChanged, err := ensureRemaining(globalCopy, localCopy) + if err != nil { + return err } - if len(cacheObjects) > 1 { - return nil, fmt.Errorf("expected to find only one instance of %s resource for the key %s, found %d", gvr, ShardAndLogicalClusterAndNamespaceKey(shard, cluster, namespace, name), len(cacheObjects)) + if !metaChanged && !remainingChanged { + logger.V(4).Info("Object is up to date") + return nil } - return cacheObjects[0], nil + + logger.V(2).Info("Updating object in global cache") + _, err = r.updateObject(ctx, clusterName, globalCopy) // no need for patch because there is only this actor + return err } diff --git a/pkg/reconciler/cache/replication/replication_reconcile_test.go b/pkg/reconciler/cache/replication/replication_reconcile_test.go index 4a4107427c0..8791ccf2de4 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile_test.go +++ b/pkg/reconciler/cache/replication/replication_reconcile_test.go @@ -18,346 +18,226 @@ package replication import ( "context" - "fmt" + "reflect" + "strings" "testing" "time" "github.com/google/go-cmp/cmp" - kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" "github.com/kcp-dev/logicalcluster/v3" - kcpfakedynamic "github.com/kcp-dev/client-go/third_party/k8s.io/client-go/dynamic/fake" - kcptesting "github.com/kcp-dev/client-go/third_party/k8s.io/client-go/testing" - "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - - apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" - apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/request" ) -var scheme *runtime.Scheme - -func init() { - scheme = runtime.NewScheme() - _ = apisv1alpha1.AddToScheme(scheme) -} +func TestReconcile(t *testing.T) { + gr := schema.GroupResource{Group: "example.com", Resource: "elephants"} + elephant := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "example.com/v1", + "kind": "Elephant", + "metadata": map[string]interface{}{ + "name": "dumbo", + "namespace": "zoo", + "resourceVersion": "42", + "uid": "47-11", + "kcp.dev/cluster": "root", + }, + "spec": map[string]interface{}{ + "color": "pink", + }, + "status": map[string]interface{}{ + "weight": float64(42), + }, + }, + } -func TestReconcileAPIExports(t *testing.T) { scenarios := []struct { - name string - initialLocalAPIExports []runtime.Object - initialGlobalAPIExports []runtime.Object - initCacheFakeClientWithInitialAPIExports bool - reconcileKey string - validateFunc func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) + name string + getLocalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) + getGlobalCopy func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) + + key string + + expectedCreate *unstructured.Unstructured + expectedDeleteName string + expectedDeleteNamespace string + expectedUpdate *unstructured.Unstructured + + expectedError string }{ { - name: "case 1: creation of the object in the cache server", - initialLocalAPIExports: []runtime.Object{newAPIExport("foo")}, - reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), - validateFunc: func(t *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - if len(localClientActions) != 0 { - t.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) - } - wasGlobalAPIExportValidated := false - for _, action := range cacheClientActions { - if action.Matches("create", "apiexports") { - createAction := action.(kcptesting.CreateAction) - if createAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", createAction.GetCluster()) - } - createdUnstructuredAPIExport := createAction.GetObject().(*unstructured.Unstructured) - globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { - t.Fatalf("failed to convert unstructured to APIExport: %v", err) - } - - expectedAPIExport := newAPIExportWithShardAnnotation("foo") - if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { - t.Errorf("unexpected APIExport was created:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) - } - wasGlobalAPIExportValidated = true - break - } - } - if !wasGlobalAPIExportValidated { - t.Errorf("an APIExport on the cache sever wasn't created") + name: "case 1: creation of the object in the cache server", + getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + if cluster.String() == "root" && namespace == "zoo" && name == "dumbo" { + return elephant.DeepCopy(), nil } + return nil, errors.NewNotFound(gr, name) }, + getGlobalCopy: getCopyNotFoundFunc, + key: "root|zoo/dumbo", + expectedCreate: WithShardName(WithoutResourceVersion(elephant.DeepCopy()), "root"), }, { name: "case 2: cached object is removed when local object was removed", - initialLocalAPIExports: []runtime.Object{ - func() *apisv1alpha1.APIExport { - t := metav1.NewTime(time.Now()) - apiExport := newAPIExport("foo") - apiExport.DeletionTimestamp = &t - apiExport.Finalizers = []string{"aFinalizer"} - return apiExport - }(), + getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithDeletionTimestamp(elephant.DeepCopy(), time.Now()), nil }, - initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialAPIExports: true, - reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), - validateFunc: func(t *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - if len(localClientActions) != 0 { - t.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) - } - wasGlobalAPIExportValidated := false - for _, action := range cacheClientActions { - if action.Matches("delete", "apiexports") { - deleteAction := action.(kcptesting.DeleteAction) - if deleteAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", deleteAction.GetCluster()) - } - if deleteAction.GetName() != "foo" { - t.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") - } - wasGlobalAPIExportValidated = true - break - } - } - if !wasGlobalAPIExportValidated { - t.Errorf("an APIExport on the cache sever wasn't deleted") - } + getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithResourceVersion(WithShardName(elephant.DeepCopy(), "shard-1"), "7"), nil }, + key: "root|zoo/dumbo", + expectedDeleteName: "dumbo", + expectedDeleteNamespace: "zoo", }, { - name: "case 2: cached object is removed when local object was not found", - initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialAPIExports: true, - reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), - validateFunc: func(t *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - wasGlobalAPIExportDeletionValidated := false - wasGlobalAPIExportRetrievalValidated := false - for _, action := range localClientActions { - if action.Matches("get", "apiexports") { - getAction := action.(kcptesting.GetAction) - if getAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for localDynamicClient", getAction.GetCluster()) - } - if getAction.GetName() != "foo" { - t.Fatalf("unexpected APIExport was retrieved = %s, expected = %s", getAction.GetName(), "foo") - } - wasGlobalAPIExportRetrievalValidated = true - break - } - } - if !wasGlobalAPIExportRetrievalValidated { - t.Errorf("before deleting an APIExport the controller should live GET it") - } - for _, action := range cacheClientActions { - if action.Matches("delete", "apiexports") { - deleteAction := action.(kcptesting.DeleteAction) - if deleteAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", deleteAction.GetCluster()) - } - if deleteAction.GetName() != "foo" { - t.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") - } - wasGlobalAPIExportDeletionValidated = true - break - } - } - if !wasGlobalAPIExportDeletionValidated { - t.Errorf("an APIExport on the cache sever wasn't deleted") - } + name: "case 2: cached object is removed when local object was not found", + getLocalCopy: getCopyNotFoundFunc, + getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return elephant.DeepCopy(), nil }, + key: "root|zoo/dumbo", + expectedDeleteName: "dumbo", + expectedDeleteNamespace: "zoo", }, { name: "case 3: update, metadata mismatch", - initialLocalAPIExports: []runtime.Object{ - func() *apisv1alpha1.APIExport { - apiExport := newAPIExport("foo") - apiExport.Labels["fooLabel"] = "fooLabelVal" - return apiExport - }(), + getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithLabel(elephant.DeepCopy(), "a", "b"), nil }, - initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialAPIExports: true, - reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), - validateFunc: func(t *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - if len(localClientActions) != 0 { - t.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) - } - wasGlobalAPIExportValidated := false - for _, action := range cacheClientActions { - if action.Matches("update", "apiexports") { - updateAction := action.(kcptesting.UpdateAction) - if updateAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) - } - updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) - globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { - t.Fatalf("failed to convert unstructured to APIExport: %v", err) - } - - expectedAPIExport := newAPIExportWithShardAnnotation("foo") - expectedAPIExport.Labels["fooLabel"] = "fooLabelVal" - if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { - t.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) - } - wasGlobalAPIExportValidated = true - break - } - } - if !wasGlobalAPIExportValidated { - t.Errorf("an APIExport on the cache sever wasn't updated") - } + getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithResourceVersion(elephant.DeepCopy(), "7"), nil }, + key: "root|zoo/dumbo", + expectedUpdate: WithLabel(WithResourceVersion(elephant.DeepCopy(), "7"), "a", "b"), }, { name: "case 3: update, spec changed", - initialLocalAPIExports: []runtime.Object{ - func() *apisv1alpha1.APIExport { - apiExport := newAPIExport("foo") - apiExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} - return apiExport - }(), + getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithChange(elephant.DeepCopy(), []string{"spec", "color"}, "blue"), nil }, - initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialAPIExports: true, - reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), - validateFunc: func(t *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - if len(localClientActions) != 0 { - t.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) - } - wasGlobalAPIExportValidated := false - for _, action := range cacheClientActions { - if action.Matches("update", "apiexports") { - updateAction := action.(kcptesting.UpdateAction) - if updateAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) - } - updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) - globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { - t.Fatalf("failed to convert unstructured to APIExport: %v", err) - } - - expectedAPIExport := newAPIExportWithShardAnnotation("foo") - expectedAPIExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} - if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { - t.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) - } - wasGlobalAPIExportValidated = true - break - } - } - if !wasGlobalAPIExportValidated { - t.Errorf("an APIExport on the cache sever wasn't updated") - } + getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithResourceVersion(elephant.DeepCopy(), "7"), nil }, + key: "root|zoo/dumbo", + expectedUpdate: WithChange(WithResourceVersion(elephant.DeepCopy(), "7"), []string{"spec", "color"}, "blue"), }, { name: "case 3: update, status changed", - initialLocalAPIExports: []runtime.Object{ - func() *apisv1alpha1.APIExport { - apiExport := newAPIExport("foo") - //nolint:staticcheck // SA1019 VirtualWorkspaces is deprecated but not removed yet - apiExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} - return apiExport - }(), + getLocalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithChange(elephant.DeepCopy(), []string{"status", "weight"}, "42.5"), nil }, - initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialAPIExports: true, - reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), - validateFunc: func(t *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - if len(localClientActions) != 0 { - t.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) - } - wasGlobalAPIExportValidated := false - for _, action := range cacheClientActions { - if action.Matches("update", "apiexports") { - updateAction := action.(kcptesting.UpdateAction) - if updateAction.GetCluster().String() != "root" { - t.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) - } - updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) - globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { - t.Fatalf("failed to convert unstructured to APIExport: %v", err) - } - - expectedAPIExport := newAPIExportWithShardAnnotation("foo") - //nolint:staticcheck // SA1019 VirtualWorkspaces is deprecated but not removed yet - expectedAPIExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} - if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { - t.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) - } - wasGlobalAPIExportValidated = true - break - } - } - if !wasGlobalAPIExportValidated { - t.Errorf("an APIExport on the cache sever wasn't updated") - } + getGlobalCopy: func(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return WithResourceVersion(elephant.DeepCopy(), "7"), nil }, + key: "root|zoo/dumbo", + expectedUpdate: WithChange(WithResourceVersion(elephant.DeepCopy(), "7"), []string{"status", "weight"}, "42.5"), }, } for _, scenario := range scenarios { t.Run(scenario.name, func(tt *testing.T) { - target := &controller{shardName: "amber"} - localAPIExportIndexer := cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{}) - for _, obj := range scenario.initialLocalAPIExports { - if err := localAPIExportIndexer.Add(obj); err != nil { - tt.Error(err) - } + var created *unstructured.Unstructured + var updated *unstructured.Unstructured + var deletedNamespace, deletedName string + + r := &reconciler{ + shardName: "root", + getLocalCopy: scenario.getLocalCopy, + getGlobalCopy: scenario.getGlobalCopy, + createObject: func(ctx context.Context, clusterName logicalcluster.Name, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + created = obj.DeepCopy() + return created, nil + }, + updateObject: func(ctx context.Context, cluster logicalcluster.Name, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + updated = obj.DeepCopy() + return updated, nil + }, + deleteObject: func(ctx context.Context, cluster logicalcluster.Name, ns, name string) error { + deletedNamespace = ns + deletedName = name + return nil + }, } - target.localAPIExportLister = apisv1alpha1listers.NewAPIExportClusterLister(localAPIExportIndexer) - target.globalAPIExportIndexer = cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) - for _, obj := range scenario.initialGlobalAPIExports { - if err := target.globalAPIExportIndexer.Add(obj); err != nil { - tt.Error(err) - } + + err := r.reconcile(context.Background(), scenario.key) + if scenario.expectedError != "" && err == nil { + tt.Fatalf("expected error %q, got nil", scenario.expectedError) + } else if scenario.expectedError == "" && err != nil { + tt.Fatalf("unexpected error: %v", err) + } else if scenario.expectedError != "" && err != nil && !strings.Contains(err.Error(), scenario.expectedError) { + tt.Fatalf("expected error %q, got %q", scenario.expectedError, err.Error()) } - fakeCacheDynamicClient := kcpfakedynamic.NewSimpleDynamicClient(scheme, func() []runtime.Object { - if scenario.initCacheFakeClientWithInitialAPIExports { - return scenario.initialGlobalAPIExports - } - return []runtime.Object{} - }()...) - target.dynamicKcpCacheClient = fakeCacheDynamicClient - fakeLocalDynamicClient := kcpfakedynamic.NewSimpleDynamicClient(scheme) - target.dynamicKcpLocalClient = fakeLocalDynamicClient - if err := target.reconcile(context.TODO(), scenario.reconcileKey); err != nil { - tt.Fatal(err) + + if scenario.expectedCreate != nil && created == nil { + tt.Fatalf("expected object to be created, but it was not") + } else if scenario.expectedCreate == nil && created != nil { + tt.Fatalf("expected object to not be created, but it was") + } else if !reflect.DeepEqual(scenario.expectedCreate, created) { + tt.Fatalf("expected created object to be %v, got %v, diff:\n%s", scenario.expectedCreate, created, cmp.Diff(scenario.expectedCreate, created)) + } + + if scenario.expectedUpdate != nil && updated == nil { + tt.Fatalf("expected object to be updated, but it was not") + } else if scenario.expectedUpdate == nil && updated != nil { + tt.Fatalf("expected object to not be updated, but it was") + } else if !reflect.DeepEqual(scenario.expectedUpdate, updated) { + tt.Fatalf("expected updated object to be %v, got %v, diff:\n%s", scenario.expectedUpdate, updated, cmp.Diff(scenario.expectedUpdate, updated)) } - if scenario.validateFunc != nil { - scenario.validateFunc(tt, fakeCacheDynamicClient.Actions(), fakeLocalDynamicClient.Actions()) + + if scenario.expectedDeleteName != deletedName { + tt.Fatalf("expected deleted name %q, got %q", scenario.expectedDeleteName, deletedName) + } + if scenario.expectedDeleteNamespace != deletedNamespace { + tt.Fatalf("expected deleted namespace %q, got %q", scenario.expectedDeleteNamespace, deletedNamespace) } }) } } -func newAPIExport(name string) *apisv1alpha1.APIExport { - return &apisv1alpha1.APIExport{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "apis.kcp.io/v1alpha1", - Kind: "APIExport", - }, - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{}, - Annotations: map[string]string{ - logicalcluster.AnnotationKey: "root", - }, - Name: name, - }, - Spec: apisv1alpha1.APIExportSpec{ - LatestResourceSchemas: []string{"lrs"}, - }, - Status: apisv1alpha1.APIExportStatus{ - IdentityHash: fmt.Sprintf("%s-identity", name), - }, +func WithResourceVersion(u *unstructured.Unstructured, rv string) *unstructured.Unstructured { + u.SetResourceVersion(rv) + + return u +} +func WithoutResourceVersion(u *unstructured.Unstructured) *unstructured.Unstructured { + unstructured.RemoveNestedField(u.Object, "metadata", "resourceVersion") + return u +} + +func WithDeletionTimestamp(u *unstructured.Unstructured, t time.Time) *unstructured.Unstructured { + ts := metav1.NewTime(t) + u.SetDeletionTimestamp(&ts) + return u +} + +func WithLabel(u *unstructured.Unstructured, key, value string) *unstructured.Unstructured { + labels := u.GetLabels() + if labels == nil { + labels = map[string]string{} } + labels[key] = value + u.SetLabels(labels) + return u +} + +func WithShardName(u *unstructured.Unstructured, name string) *unstructured.Unstructured { + ann := u.GetAnnotations() + if ann == nil { + ann = map[string]string{} + } + ann[request.AnnotationKey] = name + u.SetAnnotations(ann) + return u +} + +func WithChange(u *unstructured.Unstructured, path []string, value interface{}) *unstructured.Unstructured { + unstructured.SetNestedField(u.Object, value, path...) //nolint:errcheck + return u } -func newAPIExportWithShardAnnotation(name string) *apisv1alpha1.APIExport { - apiExport := newAPIExport(name) - apiExport.Annotations["kcp.io/shard"] = "amber" - return apiExport +func getCopyNotFoundFunc(cluster logicalcluster.Name, namespace, name string) (*unstructured.Unstructured, error) { + return nil, errors.NewNotFound(schema.GroupResource{Group: "example.com", Resource: "elephants"}, name) } diff --git a/pkg/reconciler/cache/replication/replication_reconcile_unstructured.go b/pkg/reconciler/cache/replication/replication_reconcile_unstructured.go index c852abf6c44..3de6daebbaf 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile_unstructured.go +++ b/pkg/reconciler/cache/replication/replication_reconcile_unstructured.go @@ -17,84 +17,14 @@ limitations under the License. package replication import ( - "context" "encoding/json" "fmt" "reflect" - "github.com/kcp-dev/logicalcluster/v3" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" genericrequest "k8s.io/apiserver/pkg/endpoints/request" ) -// reconcileUnstructuredObjects makes sure that the given cachedObject of the given GVR under the given key from the local shard is replicated to the cache server. -// -// this method handles the following cases: -// 1. creation of the object in the cache server -// happens when the cacheObject is nil -// 2. deletion of the object from the cache server -// happens when either of the following is true: -// - the localObject object is nil -// - the localObject was deleted -// 3. modification of the object to match the original/local object -// happens when either of the following is true: -// - the localObject's metadata doesn't match the cacheObject -// - the localObject's spec doesn't match the cacheObject -// - the localObject's status doesn't match the cacheObject -func (c *controller) reconcileUnstructuredObjects(ctx context.Context, cluster logicalcluster.Name, gvr *schema.GroupVersionResource, cacheObject *unstructured.Unstructured, localObject *unstructured.Unstructured) error { - if localObject == nil { - return c.handleObjectDeletion(ctx, cluster, gvr, cacheObject) - } - if localObject.GetDeletionTimestamp() != nil { - return c.handleObjectDeletion(ctx, cluster, gvr, cacheObject) - } - - if cacheObject == nil { - // TODO: in the future the original RV will have to be stored in an annotation (?) - // so that the clients that need to modify the original/local object can do it - localObject.SetResourceVersion("") - annotations := localObject.GetAnnotations() - if annotations == nil { - annotations = map[string]string{} - } - annotations[genericrequest.AnnotationKey] = c.shardName - localObject.SetAnnotations(annotations) - _, err := c.dynamicKcpCacheClient.Cluster(cluster.Path()).Resource(*gvr).Namespace(localObject.GetNamespace()).Create(ctx, localObject, metav1.CreateOptions{}) - return err - } - - metaChanged, err := ensureMeta(cacheObject, localObject) - if err != nil { - return err - } - remainingChanged, err := ensureRemaining(cacheObject, localObject) - if err != nil { - return err - } - if !metaChanged && !remainingChanged { - return nil - } - - if metaChanged || remainingChanged { - _, err := c.dynamicKcpCacheClient.Cluster(cluster.Path()).Resource(*gvr).Namespace(cacheObject.GetNamespace()).Update(ctx, cacheObject, metav1.UpdateOptions{}) - return err - } - return nil -} - -func (c *controller) handleObjectDeletion(ctx context.Context, cluster logicalcluster.Name, gvr *schema.GroupVersionResource, cacheObject *unstructured.Unstructured) error { - if cacheObject == nil { - return nil // the cached object already removed - } - if cacheObject.GetDeletionTimestamp() == nil { - return c.dynamicKcpCacheClient.Cluster(cluster.Path()).Resource(*gvr).Namespace(cacheObject.GetNamespace()).Delete(ctx, cacheObject.GetName(), metav1.DeleteOptions{}) - } - return nil -} - // ensureMeta changes unstructuredCacheObject's metadata to match unstructuredLocalObject's metadata except the ResourceVersion and the shard annotation fields. func ensureMeta(cacheObject *unstructured.Unstructured, localObject *unstructured.Unstructured) (changed bool, err error) { cacheObjMetaRaw, hasCacheObjMetaRaw, err := unstructured.NestedFieldNoCopy(cacheObject.Object, "metadata") diff --git a/pkg/reconciler/cache/replication/replication_reconcile_unstructured_test.go b/pkg/reconciler/cache/replication/replication_reconcile_unstructured_test.go index c6399080779..1beee584a9e 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile_unstructured_test.go +++ b/pkg/reconciler/cache/replication/replication_reconcile_unstructured_test.go @@ -17,15 +17,11 @@ limitations under the License. package replication import ( - "context" "reflect" "testing" - "time" "github.com/google/go-cmp/cmp" - kcpfakedynamic "github.com/kcp-dev/client-go/third_party/k8s.io/client-go/dynamic/fake" - kcptesting "github.com/kcp-dev/client-go/third_party/k8s.io/client-go/testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -419,146 +415,3 @@ func TestEnsureUnstructuredMeta(t *testing.T) { }) } } - -func TestHandleUnstructuredObjectDeletion(t *testing.T) { - scenarios := []struct { - name string - cacheObject *apisv1alpha1.APIExport - validateCacheObjectDeletion func(ts *testing.T, actions []kcptesting.Action) - }{ - { - name: "no-op", - }, - { - name: "DeletionTimestamp filed not set on cacheObject", - cacheObject: newAPIExport("foo"), - validateCacheObjectDeletion: func(t *testing.T, actions []kcptesting.Action) { - t.Helper() - - wasCacheApiExportValidated := false - for _, action := range actions { - if action.Matches("delete", "apiexports") { - deleteAction := action.(kcptesting.DeleteAction) - if deleteAction.GetName() != "foo" { - t.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") - } - wasCacheApiExportValidated = true - break - } - } - if !wasCacheApiExportValidated { - t.Errorf("an ApiExport on the cache sever wasn't deleted") - } - }, - }, - { - name: "no-op when DeletionTimestamp filed set", - cacheObject: func() *apisv1alpha1.APIExport { - t := metav1.NewTime(time.Now()) - apiExport := newAPIExport("foo") - apiExport.DeletionTimestamp = &t - return apiExport - }(), - validateCacheObjectDeletion: func(t *testing.T, actions []kcptesting.Action) { - t.Helper() - - if len(actions) > 0 { - t.Fatalf("didn't expect any API calls, got %v", actions) - } - }, - }, - { - name: "no-op when DeletionTimestamp filed and Finalizers are set", - cacheObject: func() *apisv1alpha1.APIExport { - t := metav1.NewTime(time.Now()) - apiExport := newAPIExport("foo") - apiExport.DeletionTimestamp = &t - apiExport.Finalizers = []string{"aFinalizer"} - return apiExport - }(), - validateCacheObjectDeletion: func(t *testing.T, actions []kcptesting.Action) { - t.Helper() - - if len(actions) > 0 { - t.Fatalf("didn't expect any API calls, got %v", actions) - } - }, - }, - } - - for _, scenario := range scenarios { - t.Run(scenario.name, func(tt *testing.T) { - var unstructuredCacheObject *unstructured.Unstructured - var err error - if scenario.cacheObject != nil { - unstructuredCacheObject, err = toUnstructured(scenario.cacheObject) - if err != nil { - tt.Fatal(err) - } - } - gvr := apisv1alpha1.SchemeGroupVersion.WithResource("apiexports") - target := &controller{} - fakeDynamicClient := kcpfakedynamic.NewSimpleDynamicClient(scheme, func() []runtime.Object { - if unstructuredCacheObject == nil { - return []runtime.Object{} - } - return []runtime.Object{unstructuredCacheObject} - }()...) - target.dynamicKcpCacheClient = fakeDynamicClient - - err = target.handleObjectDeletion(context.TODO(), "root", &gvr, unstructuredCacheObject) - if err != nil { - tt.Fatal(err) - } - if scenario.validateCacheObjectDeletion != nil { - scenario.validateCacheObjectDeletion(tt, fakeDynamicClient.Actions()) - } - }) - } -} - -// TestToUnstructured test if changing an unstructured obj won't change the original object. -func TestToUnstructured(t *testing.T) { - apiExport := newAPIExport("a1") - apiExport.Spec.MaximalPermissionPolicy = &apisv1alpha1.MaximalPermissionPolicy{Local: &apisv1alpha1.LocalAPIExportPolicy{}} - unstructuredApiExport, err := toUnstructured(apiExport) - if err != nil { - t.Fatal(err) - } - - // manipulate map (a reference type) - if err := unstructured.SetNestedField(unstructuredApiExport.Object, "valForNewAnnotation", "metadata", "annotations", "newAnnotation"); err != nil { - t.Fatal(err) - } - if newAnnotationVal := unstructuredApiExport.GetAnnotations()["newAnnotation"]; newAnnotationVal != "valForNewAnnotation" { - t.Fatalf("unexpected value %v for newAnnotation", newAnnotationVal) - } - - if _, hasNewAnnotation := apiExport.Annotations["newAnnotation"]; hasNewAnnotation { - t.Fatal("didn't expect changing unstructuredApiExport annotation will also change the original apiExport object") - } - - // manipulate string (a simple type) - if err := unstructured.SetNestedField(unstructuredApiExport.Object, "newName", "metadata", "name"); err != nil { - t.Fatal(err) - } - if unstructuredApiExport.GetName() != "newName" { - t.Fatalf("unexpected name %v", unstructuredApiExport.GetName()) - } - if apiExport.Name != "a1" { - t.Fatal("didn't expect changing unstructuredApiExport name will also change the original apiExport object") - } - - // manipulate pinter (a reference type) - unstructured.RemoveNestedField(unstructuredApiExport.Object, "spec", "maximalPermissionPolicy") - _, maximalPolicyFound, err := unstructured.NestedFieldNoCopy(unstructuredApiExport.Object, "spec", "maximalPermissionPolicy") - if err != nil { - t.Fatal(err) - } - if maximalPolicyFound { - t.Fatal("didn't expect to find spec.maximalPermissionPolicy") - } - if apiExport.Spec.MaximalPermissionPolicy == nil { - t.Fatal("apiExport.Spec.MaximalPermissionPolicy was removed") - } -} diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index f6e55d8a056..48b52baee3c 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -1352,13 +1352,8 @@ func (s *Server) installApiExportIdentityController(ctx context.Context, config } func (s *Server) installReplicationController(ctx context.Context, config *rest.Config, server *genericapiserver.GenericAPIServer) error { - config = rest.CopyConfig(config) - config = rest.AddUserAgent(config, replication.ControllerName) - dynamicLocalClient, err := kcpdynamic.NewForConfig(config) - if err != nil { - return err - } - controller, err := replication.NewController(s.Options.Extra.ShardName, s.CacheDynamicClient, dynamicLocalClient, s.KcpSharedInformerFactory, s.CacheKcpSharedInformerFactory, s.KubeSharedInformerFactory, s.CacheKubeSharedInformerFactory) + // TODO(sttts): set user agent + controller, err := replication.NewController(s.Options.Extra.ShardName, s.CacheDynamicClient, s.KcpSharedInformerFactory, s.CacheKcpSharedInformerFactory, s.KubeSharedInformerFactory, s.CacheKubeSharedInformerFactory) if err != nil { return err }