From 93e69afd3c82f090282e150ac76eba5a7ffc8dc5 Mon Sep 17 00:00:00 2001 From: Frederic Giloux Date: Thu, 17 Nov 2022 16:15:02 +0100 Subject: [PATCH 1/2] Adding ClusterWorkspaceShard to the resources stored in the cache server Signed-off-by: Frederic Giloux --- docs/content/en/main/concepts/cache-server.md | 1 + pkg/cache/server/bootstrap/bootstrap.go | 10 +- .../replication/replication_controller.go | 49 ++++-- .../replication/replication_reconcile.go | 20 ++- .../replication/replication_reconcile_test.go | 166 +++++++++--------- test/e2e/reconciler/cache/replication_test.go | 131 ++++++++++++-- 6 files changed, 262 insertions(+), 115 deletions(-) diff --git a/docs/content/en/main/concepts/cache-server.md b/docs/content/en/main/concepts/cache-server.md index 0c91a61ff15..7b627fff6ca 100644 --- a/docs/content/en/main/concepts/cache-server.md +++ b/docs/content/en/main/concepts/cache-server.md @@ -77,6 +77,7 @@ Out of the box, the server supports the following resources: - `apiresourceschemas` - `apiexports` +- `clusterworkspaceshards` All those resources are represented as CustomResourceDefinitions and stored in `system:cache:server` shard under `system:system-crds` cluster. diff --git a/pkg/cache/server/bootstrap/bootstrap.go b/pkg/cache/server/bootstrap/bootstrap.go index 66f10739c7b..f986b6d5ecf 100644 --- a/pkg/cache/server/bootstrap/bootstrap.go +++ b/pkg/cache/server/bootstrap/bootstrap.go @@ -43,10 +43,14 @@ const SystemCacheServerShard = "system:cache:server" func Bootstrap(ctx context.Context, apiExtensionsClusterClient kcpapiextensionsclientset.ClusterInterface) error { crds := []*apiextensionsv1.CustomResourceDefinition{} - for _, resource := range []string{"apiresourceschemas", "apiexports"} { + for _, gr := range []struct{ group, resource string }{ + {"apis.kcp.dev", "apiresourceschemas"}, + {"apis.kcp.dev", "apiexports"}, + {"tenancy.kcp.dev", "clusterworkspaceshards"}, + } { crd := &apiextensionsv1.CustomResourceDefinition{} - if err := configcrds.Unmarshal(fmt.Sprintf("apis.kcp.dev_%s.yaml", resource), crd); err != nil { - panic(fmt.Errorf("failed to unmarshal %v resource: %w", resource, err)) + if err := configcrds.Unmarshal(fmt.Sprintf("%s_%s.yaml", gr.group, gr.resource), crd); err != nil { + panic(fmt.Errorf("failed to unmarshal %v resource: %w", gr, err)) } for i := range crd.Spec.Versions { v := &crd.Spec.Versions[i] diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index 8e6982e617a..7ab0d654dbe 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -32,10 +32,12 @@ import ( "k8s.io/klog/v2" apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" cacheclient "github.com/kcp-dev/kcp/pkg/cache/client" "github.com/kcp-dev/kcp/pkg/cache/client/shard" kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions" apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1" + tenancyv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1" "github.com/kcp-dev/kcp/pkg/logging" ) @@ -46,8 +48,6 @@ const ( // NewController returns a new replication controller. // -// The replication controller copies objects of defined resources that have the "internal.sharding.kcp.dev/replicate" annotation to the cache server. -// // The replicated object will be placed under the same cluster as the original object. // In addition to that, all replicated objects will be placed under the shard taken from the shardName argument. // For example: shards/{shardName}/clusters/{clusterName}/apis/apis.kcp.dev/v1alpha1/apiexports @@ -59,14 +59,16 @@ func NewController( cacheKcpInformers kcpinformers.SharedInformerFactory, ) (*controller, error) { c := &controller{ - shardName: shardName, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - dynamicCacheClient: dynamicCacheClient, - dynamicLocalClient: dynamicLocalClient, - localApiExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), - localApiResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), - cacheApiExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), - cacheApiResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), + shardName: shardName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + dynamicCacheClient: dynamicCacheClient, + dynamicLocalClient: dynamicLocalClient, + localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), + localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), + localClusterWorkspaceShardLister: localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Lister(), + cacheAPIExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), + cacheAPIResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), + cacheClusterWorkspaceShardIndexer: cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(), } if err := cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddIndexers(cache.Indexers{ @@ -80,10 +82,19 @@ func NewController( return nil, err } + if err := cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddIndexers(cache.Indexers{ + ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + }); err != nil { + return nil, err + } + localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) + localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) + cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) + return c, nil } @@ -95,6 +106,10 @@ func (c *controller) enqueueAPIResourceSchema(obj interface{}) { c.enqueueObject(obj, apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas")) } +func (c *controller) enqueueClusterWorkspaceShard(obj interface{}) { + c.enqueueObject(obj, tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards")) +} + func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResource) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { @@ -155,6 +170,10 @@ func (c *controller) apiResourceSchemaInformerEventHandler() cache.ResourceEvent return objectInformerEventHandler(c.enqueueAPIResourceSchema) } +func (c *controller) clusterWorkspaceShardInformerEventHandler() cache.ResourceEventHandler { + return objectInformerEventHandler(c.enqueueClusterWorkspaceShard) +} + func objectInformerEventHandler(enqueueObject func(obj interface{})) cache.ResourceEventHandler { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { enqueueObject(obj) }, @@ -170,9 +189,11 @@ type controller struct { dynamicCacheClient kcpdynamic.ClusterInterface dynamicLocalClient kcpdynamic.ClusterInterface - localApiExportLister apisv1alpha1listers.APIExportClusterLister - localApiResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister + localAPIExportLister apisv1alpha1listers.APIExportClusterLister + localAPIResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister + localClusterWorkspaceShardLister tenancyv1alpha1listers.ClusterWorkspaceShardClusterLister - cacheApiExportsIndexer cache.Indexer - cacheApiResourceSchemaIndexer cache.Indexer + cacheAPIExportsIndexer cache.Indexer + cacheAPIResourceSchemaIndexer cache.Indexer + cacheClusterWorkspaceShardIndexer cache.Indexer } diff --git a/pkg/reconciler/cache/replication/replication_reconcile.go b/pkg/reconciler/cache/replication/replication_reconcile.go index 0f50853472c..a4d2df805a3 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile.go +++ b/pkg/reconciler/cache/replication/replication_reconcile.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/tools/cache" apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" ) func (c *controller) reconcile(ctx context.Context, gvrKey string) error { @@ -47,10 +48,10 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"), apisv1alpha1.SchemeGroupVersion.WithKind("APIExport"), func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheApiExportsIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.cacheAPIExportsIndexer, c.shardName, cluster, namespace, name) }, func(cluster logicalcluster.Name, _, name string) (interface{}, error) { - return c.localApiExportLister.Cluster(cluster).Get(name) + return c.localAPIExportLister.Cluster(cluster).Get(name) }) case apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas").String(): return c.reconcileObject(ctx, @@ -58,10 +59,21 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"), apisv1alpha1.SchemeGroupVersion.WithKind("APIResourceSchema"), func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheApiResourceSchemaIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.cacheAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name) }, func(cluster logicalcluster.Name, _, name string) (interface{}, error) { - return c.localApiResourceSchemaLister.Cluster(cluster).Get(name) + return c.localAPIResourceSchemaLister.Cluster(cluster).Get(name) + }) + case tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards").String(): + return c.reconcileObject(ctx, + keyParts[1], + tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"), + tenancyv1alpha1.SchemeGroupVersion.WithKind("ClusterWorkspaceShard"), + func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { + return retrieveCacheObject(&gvr, c.cacheClusterWorkspaceShardIndexer, c.shardName, cluster, namespace, name) + }, + func(cluster logicalcluster.Name, _, name string) (interface{}, error) { + return c.localClusterWorkspaceShardLister.Cluster(cluster).Get(name) }) default: return fmt.Errorf("unsupported resource %v", keyParts[0]) diff --git a/pkg/reconciler/cache/replication/replication_reconcile_test.go b/pkg/reconciler/cache/replication/replication_reconcile_test.go index 5057eaa41a9..f0c6184fab3 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile_test.go +++ b/pkg/reconciler/cache/replication/replication_reconcile_test.go @@ -48,49 +48,49 @@ func init() { func TestReconcileAPIExports(t *testing.T) { scenarios := []struct { name string - initialLocalApiExports []runtime.Object - initialCacheApiExports []runtime.Object - initCacheFakeClientWithInitialApiExports bool + initialLocalAPIExports []runtime.Object + initialCacheAPIExports []runtime.Object + initCacheFakeClientWithInitialAPIExports bool reconcileKey string validateFunc func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) }{ { name: "case 1: creation of the object in the cache server", - initialLocalApiExports: []runtime.Object{newAPIExport("foo")}, + initialLocalAPIExports: []runtime.Object{newAPIExport("foo")}, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("create", "apiexports") { createAction := action.(kcptesting.CreateAction) if createAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", createAction.GetCluster()) } - createdUnstructuredApiExport := createAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + createdUnstructuredAPIExport := createAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected ApiExport was creaetd:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected APIExport was created:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't created") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't created") } }, }, { name: "case 2: cached object is removed when local object was removed", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { t := metav1.NewTime(time.Now()) apiExport := newAPIExport("foo") @@ -99,14 +99,14 @@ func TestReconcileAPIExports(t *testing.T) { return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("delete", "apiexports") { deleteAction := action.(kcptesting.DeleteAction) @@ -116,23 +116,23 @@ func TestReconcileAPIExports(t *testing.T) { if deleteAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't deleted") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't deleted") } }, }, { name: "case 2: cached object is removed when local object was not found", - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - wasCacheApiExportDeletionValidated := false - wasCacheApiExportRetrievalValidated := false + wasCacheAPIExportDeletionValidated := false + wasCacheAPIExportRetrievalValidated := false for _, action := range localClientActions { if action.Matches("get", "apiexports") { getAction := action.(kcptesting.GetAction) @@ -140,14 +140,14 @@ func TestReconcileAPIExports(t *testing.T) { ts.Fatalf("wrong cluster = %s was targeted for localDynamicClient", getAction.GetCluster()) } if getAction.GetName() != "foo" { - ts.Fatalf("unexpected ApiExport was retrieved = %s, expected = %s", getAction.GetName(), "foo") + ts.Fatalf("unexpected APIExport was retrieved = %s, expected = %s", getAction.GetName(), "foo") } - wasCacheApiExportRetrievalValidated = true + wasCacheAPIExportRetrievalValidated = true break } } - if !wasCacheApiExportRetrievalValidated { - ts.Errorf("before deleting an ApiExport the controller should live GET it") + if !wasCacheAPIExportRetrievalValidated { + ts.Errorf("before deleting an APIExport the controller should live GET it") } for _, action := range cacheClientActions { if action.Matches("delete", "apiexports") { @@ -158,141 +158,141 @@ func TestReconcileAPIExports(t *testing.T) { if deleteAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") } - wasCacheApiExportDeletionValidated = true + wasCacheAPIExportDeletionValidated = true break } } - if !wasCacheApiExportDeletionValidated { - ts.Errorf("an ApiExport on the cache sever wasn't deleted") + if !wasCacheAPIExportDeletionValidated { + ts.Errorf("an APIExport on the cache sever wasn't deleted") } }, }, { name: "case 3: update, metadata mismatch", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { apiExport := newAPIExport("foo") apiExport.Labels["fooLabel"] = "fooLabelVal" return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) if updateAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } - updatedUnstructuredApiExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - expectedApiExport.Labels["fooLabel"] = "fooLabelVal" - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected update to the ApiExport:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + expectedAPIExport.Labels["fooLabel"] = "fooLabelVal" + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't updated") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't updated") } }, }, { name: "case 3: update, spec changed", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { apiExport := newAPIExport("foo") apiExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) if updateAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } - updatedUnstructuredApiExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - expectedApiExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected update to the ApiExport:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + expectedAPIExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't updated") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't updated") } }, }, { name: "case 3: update, status changed", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { apiExport := newAPIExport("foo") apiExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) if updateAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } - updatedUnstructuredApiExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - expectedApiExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected update to the ApiExport:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + expectedAPIExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't updated") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't updated") } }, }, @@ -300,22 +300,22 @@ func TestReconcileAPIExports(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(tt *testing.T) { target := &controller{shardName: "amber"} - localApiExportIndexer := cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{}) - for _, obj := range scenario.initialLocalApiExports { - if err := localApiExportIndexer.Add(obj); err != nil { + localAPIExportIndexer := cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{}) + for _, obj := range scenario.initialLocalAPIExports { + if err := localAPIExportIndexer.Add(obj); err != nil { tt.Error(err) } } - target.localApiExportLister = apisv1alpha1listers.NewAPIExportClusterLister(localApiExportIndexer) - target.cacheApiExportsIndexer = cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) - for _, obj := range scenario.initialCacheApiExports { - if err := target.cacheApiExportsIndexer.Add(obj); err != nil { + target.localAPIExportLister = apisv1alpha1listers.NewAPIExportClusterLister(localAPIExportIndexer) + target.cacheAPIExportsIndexer = cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) + for _, obj := range scenario.initialCacheAPIExports { + if err := target.cacheAPIExportsIndexer.Add(obj); err != nil { tt.Error(err) } } fakeCacheDynamicClient := kcpfakedynamic.NewSimpleDynamicClient(scheme, func() []runtime.Object { - if scenario.initCacheFakeClientWithInitialApiExports { - return scenario.initialCacheApiExports + if scenario.initCacheFakeClientWithInitialAPIExports { + return scenario.initialCacheAPIExports } return []runtime.Object{} }()...) diff --git a/test/e2e/reconciler/cache/replication_test.go b/test/e2e/reconciler/cache/replication_test.go index 051e8b108a6..8bd4c0f3fbc 100644 --- a/test/e2e/reconciler/cache/replication_test.go +++ b/test/e2e/reconciler/cache/replication_test.go @@ -57,10 +57,12 @@ var scenarios = []testScenario{ {"TestReplicateAPIExportNegative", replicateAPIExportNegativeScenario}, {"TestReplicateAPIResourceSchema", replicateAPIResourceSchemaScenario}, {"TestReplicateAPIResourceSchemaNegative", replicateAPIResourceSchemaNegativeScenario}, + {"TestReplicateClusterWorkspaceShard", replicateClusterWorkspaceShardScenario}, + {"TestReplicateClusterWorkspaceShardNegative", replicateClusterWorkspaceShardNegativeScenario}, } // replicateAPIResourceSchemaScenario tests if an APIResourceSchema is propagated to the cache server. -// The test exercises creation, modification and removal of the APIExport object. +// The test exercises creation, modification and removal of the APIResourceSchema object. func replicateAPIResourceSchemaScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient kcpclientset.ClusterInterface, cacheKcpClusterClient kcpclientset.ClusterInterface) { org := framework.NewOrganizationFixture(t, server) cluster := framework.NewWorkspaceFixture(t, server, org, framework.WithShardConstraints(tenancyv1alpha1.ShardConstraints{Name: "root"})) @@ -75,7 +77,7 @@ func replicateAPIResourceSchemaScenario(ctx context.Context, t *testing.T, serve t.Logf("Verify that the source APIResourceSchema %s/%s was replicated to the cache server", cluster, resourceName) scenario.VerifyReplication(ctx, t, cluster) - // not that since the spec of an APIResourceSchema is immutable we are limited to changing some metadata + // note that since the spec of an APIResourceSchema is immutable we are limited to changing some metadata t.Logf("Change some metadata on source APIResourceSchema %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { if err := scenario.ChangeMetadataFor(res); err != nil { @@ -148,7 +150,7 @@ func replicateAPIResourceSchemaNegativeScenario(ctx context.Context, t *testing. scenario.DeleteCachedResource(ctx, t, cluster) scenario.VerifyReplication(ctx, t, cluster) - t.Logf("Update cahced APIResourceSchema %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + t.Logf("Update cached APIResourceSchema %s/%s so that it differs from the source resource", cluster, scenario.resourceName) scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { cachedSchema, ok := res.(*apisv1alpha1.APIResourceSchema) if !ok { @@ -237,7 +239,7 @@ func replicateAPIExportNegativeScenario(ctx context.Context, t *testing.T, serve scenario.DeleteCachedResource(ctx, t, cluster) scenario.VerifyReplication(ctx, t, cluster) - t.Logf("Update cahced APIExport %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + t.Logf("Update cached APIExport %s/%s so that it differs from the source resource", cluster, scenario.resourceName) scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { cachedExport, ok := res.(*apisv1alpha1.APIExport) if !ok { @@ -251,6 +253,105 @@ func replicateAPIExportNegativeScenario(ctx context.Context, t *testing.T, serve scenario.VerifyReplication(ctx, t, cluster) } +// replicateClusterWorkspaceShardScenario tests if a ClusterWorkspaceShard is propagated to the cache server. +// The test exercises creation, modification and removal of the ClusterWorkspaceShard object. +func replicateClusterWorkspaceShardScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient kcpclientset.ClusterInterface, cacheKcpClusterClient kcpclientset.ClusterInterface) { + // The ClusterWorkspaceShard API is per default only available in the root workspace + cluster := tenancyv1alpha1.RootCluster + resourceName := "test-shard" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "ClusterWorkspaceShard", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source ClusterWorkspaceShard %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + cws := &tenancyv1alpha1.ClusterWorkspaceShard{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: resourceName, + }, + Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{ + BaseURL: "https://base.kcp.test.dev", + }, + } + cws, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Create(ctx, cws, metav1.CreateOptions{}) + resourceName = cws.Name + scenario.resourceName = cws.Name + return err + }) + t.Logf("Verify that the source ClusterWorkspaceShard %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Change the spec on source ClusterWorkspaceShard %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + cws, ok := res.(*tenancyv1alpha1.ClusterWorkspaceShard) + if !ok { + return fmt.Errorf("%T is not *ClusterWorkspaceShard", res) + } + cws.Spec.BaseURL = "https://kcp.test.dev" + _, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Update(ctx, cws, metav1.UpdateOptions{}) + return err + }) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Change some metadata on source ClusterWorkspaceShard %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + if err := scenario.ChangeMetadataFor(res); err != nil { + return err + } + cws, ok := res.(*tenancyv1alpha1.ClusterWorkspaceShard) + if !ok { + return fmt.Errorf("%T is not *ClusterWorkspaceShard", res) + } + _, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Update(ctx, cws, metav1.UpdateOptions{}) + return err + }) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Verify that deleting source ClusterWorkspaceShard %s/%s leads to removal of the cached object", cluster, resourceName) + scenario.DeleteSourceResourceAndVerify(ctx, t, cluster) +} + +// replicateClusterWorkspaceShardNegativeScenario checks if modified or even deleted cached ClusterWorkspaceShard will be reconciled to match the original object +func replicateClusterWorkspaceShardNegativeScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient kcpclientset.ClusterInterface, cacheKcpClusterClient kcpclientset.ClusterInterface) { + // The ClusterWorkspaceShard API is per default only available in the root workspace + cluster := tenancyv1alpha1.RootCluster + resourceName := "test-shard" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "ClusterWorkspaceShard", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source ClusterWorkspaceShard %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + cws := &tenancyv1alpha1.ClusterWorkspaceShard{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: resourceName, + }, + Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{ + BaseURL: "https://base.kcp.test.dev", + }, + } + cws, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Create(ctx, cws, metav1.CreateOptions{}) + resourceName = cws.Name + scenario.resourceName = cws.Name + return err + }) + t.Logf("Verify that the source ClusterWorkspaceShard %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Delete cached ClusterWorkspaceShard %s/%s and check if it was brought back by the replication controller", cluster, resourceName) + scenario.DeleteCachedResource(ctx, t, cluster) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Update cached ClusterWorkspaceShard %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { + cachedCws, ok := res.(*tenancyv1alpha1.ClusterWorkspaceShard) + if !ok { + return fmt.Errorf("%T is not *ClusterWorkspaceShard", res) + } + cachedCws.Spec.BaseURL = "https://base2.kcp.test.dev" + _, err := cacheKcpClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Update(cacheclient.WithShardInContext(ctx, shard.New("root")), cachedCws, metav1.UpdateOptions{}) + return err + }) + t.Logf("Verify that the cached ClusterWorkspaceShard %s/%s was brought back by the replication controller after an update", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) +} + // TestCacheServerInProcess runs all test scenarios against a cache server that runs with a kcp server func TestCacheServerInProcess(t *testing.T) { t.Parallel() @@ -390,7 +491,7 @@ func (b *replicateResourceScenario) resourceUpdateHelper(ctx context.Context, t err = resourceUpdater(resource) if err != nil { if !errors.IsConflict(err) { - return false, fmt.Sprintf("unknow error while updating the cached %s/%s/%s, err: %s", b.resourceKind, cluster, b.resourceName, err.Error()) + return false, fmt.Sprintf("unknown error while updating the cached %s/%s/%s, err: %s", b.resourceKind, cluster, b.resourceName, err.Error()) } return false, err.Error() // try again } @@ -413,7 +514,7 @@ func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context. } return false, err.Error() } - t.Logf("Compare if both the orignal and replicated resources (%s %s/%s) are the same except %s annotation and ResourceVersion", b.resourceKind, cluster, b.resourceName, genericapirequest.AnnotationKey) + t.Logf("Compare if both the original and replicated resources (%s %s/%s) are the same except %s annotation and ResourceVersion", b.resourceKind, cluster, b.resourceName, genericapirequest.AnnotationKey) cachedResourceMeta, err := meta.Accessor(cachedResource) if err != nil { return false, err.Error() @@ -423,7 +524,7 @@ func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context. } delete(cachedResourceMeta.GetAnnotations(), genericapirequest.AnnotationKey) if diff := cmp.Diff(cachedResource, originalResource, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")); len(diff) > 0 { - return false, fmt.Sprintf("replicated %s root|%s/%s is different that the original", b.resourceKind, cluster, cachedResourceMeta.GetName()) + return false, fmt.Sprintf("replicated %s root|%s/%s is different from the original", b.resourceKind, cluster, cachedResourceMeta.GetName()) } return true, "" }, wait.ForeverTestTimeout, 100*time.Millisecond) @@ -435,8 +536,10 @@ func (b *replicateResourceScenario) getSourceResourceHelper(ctx context.Context, return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(ctx, b.resourceName, metav1.GetOptions{}) case "APIResourceSchema": return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Get(ctx, b.resourceName, metav1.GetOptions{}) + case "ClusterWorkspaceShard": + return b.kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Get(ctx, b.resourceName, metav1.GetOptions{}) } - return nil, fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return nil, fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } func (b *replicateResourceScenario) getCachedResourceHelper(ctx context.Context, cluster logicalcluster.Name) (runtime.Object, error) { @@ -445,8 +548,10 @@ func (b *replicateResourceScenario) getCachedResourceHelper(ctx context.Context, return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) case "APIResourceSchema": return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) + case "ClusterWorkspaceShard": + return b.cacheKcpClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) } - return nil, fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return nil, fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } func (b *replicateResourceScenario) deleteSourceResourceHelper(ctx context.Context, cluster logicalcluster.Name) error { @@ -455,8 +560,10 @@ func (b *replicateResourceScenario) deleteSourceResourceHelper(ctx context.Conte return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) case "APIResourceSchema": return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) + case "ClusterWorkspaceShard": + return b.kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) } - return fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } func (b *replicateResourceScenario) deleteCachedResource(ctx context.Context, cluster logicalcluster.Name) error { @@ -465,6 +572,8 @@ func (b *replicateResourceScenario) deleteCachedResource(ctx context.Context, cl return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) case "APIResourceSchema": return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) + case "ClusterWorkspaceShard": + return b.cacheKcpClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) } - return fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } From ee45c6579d098f7b4a4481e46c31409b82a4cbc2 Mon Sep 17 00:00:00 2001 From: Frederic Giloux Date: Wed, 23 Nov 2022 18:32:05 +0100 Subject: [PATCH 2/2] renaming "cache" prefix to "global" to avoid ambiguity between go-client cache and global resources (cache server) using indexers.AddIfNotPresentOrDie for consistency sake Signed-off-by: Frederic Giloux --- .../replication/replication_controller.go | 71 +++++++------- .../replication/replication_reconcile.go | 6 +- .../replication/replication_reconcile_test.go | 94 +++++++++---------- 3 files changed, 88 insertions(+), 83 deletions(-) diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index 7ab0d654dbe..fee1473f46a 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -38,6 +38,7 @@ import ( kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions" apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1" tenancyv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1" + "github.com/kcp-dev/kcp/pkg/indexers" "github.com/kcp-dev/kcp/pkg/logging" ) @@ -56,44 +57,48 @@ func NewController( dynamicCacheClient kcpdynamic.ClusterInterface, dynamicLocalClient kcpdynamic.ClusterInterface, localKcpInformers kcpinformers.SharedInformerFactory, - cacheKcpInformers kcpinformers.SharedInformerFactory, + globalKcpInformers kcpinformers.SharedInformerFactory, ) (*controller, error) { c := &controller{ - shardName: shardName, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - dynamicCacheClient: dynamicCacheClient, - dynamicLocalClient: dynamicLocalClient, - localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), - localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), - localClusterWorkspaceShardLister: localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Lister(), - cacheAPIExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), - cacheAPIResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), - cacheClusterWorkspaceShardIndexer: cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(), + shardName: shardName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + dynamicCacheClient: dynamicCacheClient, + dynamicLocalClient: dynamicLocalClient, + localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), + localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), + localClusterWorkspaceShardLister: localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Lister(), + globalAPIExportIndexer: globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), + globalAPIResourceSchemaIndexer: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), + globalClusterWorkspaceShardIndexer: globalKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(), } - if err := cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddIndexers(cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }); err != nil { - return nil, err - } - if err := cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddIndexers(cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }); err != nil { - return nil, err - } - - if err := cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddIndexers(cache.Indexers{ - ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, - }); err != nil { - return nil, err - } + indexers.AddIfNotPresentOrDie( + globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), + cache.Indexers{ + ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + }, + ) + + indexers.AddIfNotPresentOrDie( + globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), + cache.Indexers{ + ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + }, + ) + + indexers.AddIfNotPresentOrDie( + globalKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(), + cache.Indexers{ + ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + }, + ) localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) - cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) - cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) - cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) + globalKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) + globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) + globalKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) return c, nil } @@ -193,7 +198,7 @@ type controller struct { localAPIResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister localClusterWorkspaceShardLister tenancyv1alpha1listers.ClusterWorkspaceShardClusterLister - cacheAPIExportsIndexer cache.Indexer - cacheAPIResourceSchemaIndexer cache.Indexer - cacheClusterWorkspaceShardIndexer cache.Indexer + globalAPIExportIndexer cache.Indexer + globalAPIResourceSchemaIndexer cache.Indexer + globalClusterWorkspaceShardIndexer cache.Indexer } diff --git a/pkg/reconciler/cache/replication/replication_reconcile.go b/pkg/reconciler/cache/replication/replication_reconcile.go index a4d2df805a3..576f93483c0 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile.go +++ b/pkg/reconciler/cache/replication/replication_reconcile.go @@ -48,7 +48,7 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"), apisv1alpha1.SchemeGroupVersion.WithKind("APIExport"), func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheAPIExportsIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.globalAPIExportIndexer, c.shardName, cluster, namespace, name) }, func(cluster logicalcluster.Name, _, name string) (interface{}, error) { return c.localAPIExportLister.Cluster(cluster).Get(name) @@ -59,7 +59,7 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"), apisv1alpha1.SchemeGroupVersion.WithKind("APIResourceSchema"), func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.globalAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name) }, func(cluster logicalcluster.Name, _, name string) (interface{}, error) { return c.localAPIResourceSchemaLister.Cluster(cluster).Get(name) @@ -70,7 +70,7 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"), tenancyv1alpha1.SchemeGroupVersion.WithKind("ClusterWorkspaceShard"), func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheClusterWorkspaceShardIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.globalClusterWorkspaceShardIndexer, c.shardName, cluster, namespace, name) }, func(cluster logicalcluster.Name, _, name string) (interface{}, error) { return c.localClusterWorkspaceShardLister.Cluster(cluster).Get(name) diff --git a/pkg/reconciler/cache/replication/replication_reconcile_test.go b/pkg/reconciler/cache/replication/replication_reconcile_test.go index f0c6184fab3..3b8dac5bbdf 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile_test.go +++ b/pkg/reconciler/cache/replication/replication_reconcile_test.go @@ -49,7 +49,7 @@ func TestReconcileAPIExports(t *testing.T) { scenarios := []struct { name string initialLocalAPIExports []runtime.Object - initialCacheAPIExports []runtime.Object + initialGlobalAPIExports []runtime.Object initCacheFakeClientWithInitialAPIExports bool reconcileKey string validateFunc func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) @@ -62,7 +62,7 @@ func TestReconcileAPIExports(t *testing.T) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheAPIExportValidated := false + wasGlobalAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("create", "apiexports") { createAction := action.(kcptesting.CreateAction) @@ -70,20 +70,20 @@ func TestReconcileAPIExports(t *testing.T) { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", createAction.GetCluster()) } createdUnstructuredAPIExport := createAction.GetObject().(*unstructured.Unstructured) - cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { + globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } expectedAPIExport := newAPIExportWithShardAnnotation("foo") - if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { - ts.Errorf("unexpected APIExport was created:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) + if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected APIExport was created:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheAPIExportValidated = true + wasGlobalAPIExportValidated = true break } } - if !wasCacheAPIExportValidated { + if !wasGlobalAPIExportValidated { ts.Errorf("an APIExport on the cache sever wasn't created") } }, @@ -99,14 +99,14 @@ func TestReconcileAPIExports(t *testing.T) { return apiExport }(), }, - initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheAPIExportValidated := false + wasGlobalAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("delete", "apiexports") { deleteAction := action.(kcptesting.DeleteAction) @@ -116,23 +116,23 @@ func TestReconcileAPIExports(t *testing.T) { if deleteAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") } - wasCacheAPIExportValidated = true + wasGlobalAPIExportValidated = true break } } - if !wasCacheAPIExportValidated { + if !wasGlobalAPIExportValidated { ts.Errorf("an APIExport on the cache sever wasn't deleted") } }, }, { name: "case 2: cached object is removed when local object was not found", - initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - wasCacheAPIExportDeletionValidated := false - wasCacheAPIExportRetrievalValidated := false + wasGlobalAPIExportDeletionValidated := false + wasGlobalAPIExportRetrievalValidated := false for _, action := range localClientActions { if action.Matches("get", "apiexports") { getAction := action.(kcptesting.GetAction) @@ -142,11 +142,11 @@ func TestReconcileAPIExports(t *testing.T) { if getAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was retrieved = %s, expected = %s", getAction.GetName(), "foo") } - wasCacheAPIExportRetrievalValidated = true + wasGlobalAPIExportRetrievalValidated = true break } } - if !wasCacheAPIExportRetrievalValidated { + if !wasGlobalAPIExportRetrievalValidated { ts.Errorf("before deleting an APIExport the controller should live GET it") } for _, action := range cacheClientActions { @@ -158,11 +158,11 @@ func TestReconcileAPIExports(t *testing.T) { if deleteAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") } - wasCacheAPIExportDeletionValidated = true + wasGlobalAPIExportDeletionValidated = true break } } - if !wasCacheAPIExportDeletionValidated { + if !wasGlobalAPIExportDeletionValidated { ts.Errorf("an APIExport on the cache sever wasn't deleted") } }, @@ -176,14 +176,14 @@ func TestReconcileAPIExports(t *testing.T) { return apiExport }(), }, - initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheAPIExportValidated := false + wasGlobalAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) @@ -191,21 +191,21 @@ func TestReconcileAPIExports(t *testing.T) { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { + globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } expectedAPIExport := newAPIExportWithShardAnnotation("foo") expectedAPIExport.Labels["fooLabel"] = "fooLabelVal" - if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { - ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) + if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheAPIExportValidated = true + wasGlobalAPIExportValidated = true break } } - if !wasCacheAPIExportValidated { + if !wasGlobalAPIExportValidated { ts.Errorf("an APIExport on the cache sever wasn't updated") } }, @@ -219,14 +219,14 @@ func TestReconcileAPIExports(t *testing.T) { return apiExport }(), }, - initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheAPIExportValidated := false + wasGlobalAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) @@ -234,21 +234,21 @@ func TestReconcileAPIExports(t *testing.T) { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { + globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } expectedAPIExport := newAPIExportWithShardAnnotation("foo") expectedAPIExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} - if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { - ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) + if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheAPIExportValidated = true + wasGlobalAPIExportValidated = true break } } - if !wasCacheAPIExportValidated { + if !wasGlobalAPIExportValidated { ts.Errorf("an APIExport on the cache sever wasn't updated") } }, @@ -262,14 +262,14 @@ func TestReconcileAPIExports(t *testing.T) { return apiExport }(), }, - initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initialGlobalAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatalf("unexpected REST calls were made to the localDynamicClient: %#v", localClientActions) } - wasCacheAPIExportValidated := false + wasGlobalAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) @@ -277,21 +277,21 @@ func TestReconcileAPIExports(t *testing.T) { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { + globalAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, globalAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } expectedAPIExport := newAPIExportWithShardAnnotation("foo") expectedAPIExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} - if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { - ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) + if !equality.Semantic.DeepEqual(globalAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(globalAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheAPIExportValidated = true + wasGlobalAPIExportValidated = true break } } - if !wasCacheAPIExportValidated { + if !wasGlobalAPIExportValidated { ts.Errorf("an APIExport on the cache sever wasn't updated") } }, @@ -307,15 +307,15 @@ func TestReconcileAPIExports(t *testing.T) { } } target.localAPIExportLister = apisv1alpha1listers.NewAPIExportClusterLister(localAPIExportIndexer) - target.cacheAPIExportsIndexer = cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) - for _, obj := range scenario.initialCacheAPIExports { - if err := target.cacheAPIExportsIndexer.Add(obj); err != nil { + target.globalAPIExportIndexer = cache.NewIndexer(kcpcache.MetaClusterNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) + for _, obj := range scenario.initialGlobalAPIExports { + if err := target.globalAPIExportIndexer.Add(obj); err != nil { tt.Error(err) } } fakeCacheDynamicClient := kcpfakedynamic.NewSimpleDynamicClient(scheme, func() []runtime.Object { if scenario.initCacheFakeClientWithInitialAPIExports { - return scenario.initialCacheAPIExports + return scenario.initialGlobalAPIExports } return []runtime.Object{} }()...)