From ef7f921ba50f3a2ef2849d2145a49fcddfcf35af Mon Sep 17 00:00:00 2001 From: Frederic Giloux Date: Thu, 17 Nov 2022 16:15:02 +0100 Subject: [PATCH] Adding ClusterWorkspaceShard to the resources stored in the cache server Signed-off-by: Frederic Giloux --- docs/content/en/main/concepts/cache-server.md | 1 + pkg/cache/server/bootstrap/bootstrap.go | 8 +- .../replication/replication_controller.go | 50 ++++-- .../replication/replication_reconcile.go | 20 ++- .../replication/replication_reconcile_test.go | 166 +++++++++--------- test/e2e/reconciler/cache/replication_test.go | 123 ++++++++++++- 6 files changed, 256 insertions(+), 112 deletions(-) diff --git a/docs/content/en/main/concepts/cache-server.md b/docs/content/en/main/concepts/cache-server.md index 1e47c965fcb2..655fa81fede8 100644 --- a/docs/content/en/main/concepts/cache-server.md +++ b/docs/content/en/main/concepts/cache-server.md @@ -77,6 +77,7 @@ Out of the box, the server supports the following resources: - `apiresourceschemas` - `apiexports` +- `clusterworkspaceshards` All those resources are represented as CustomResourceDefinitions and stored in `system:cache:server` shard under `system:system-crds` cluster. diff --git a/pkg/cache/server/bootstrap/bootstrap.go b/pkg/cache/server/bootstrap/bootstrap.go index 66f10739c7b7..5defbe93bf1a 100644 --- a/pkg/cache/server/bootstrap/bootstrap.go +++ b/pkg/cache/server/bootstrap/bootstrap.go @@ -43,9 +43,13 @@ const SystemCacheServerShard = "system:cache:server" func Bootstrap(ctx context.Context, apiExtensionsClusterClient kcpapiextensionsclientset.ClusterInterface) error { crds := []*apiextensionsv1.CustomResourceDefinition{} - for _, resource := range []string{"apiresourceschemas", "apiexports"} { + for _, resource := range []struct{ g, n string }{ + {"apis.kcp.dev", "apiresourceschemas"}, + {"apis.kcp.dev", "apiexports"}, + {"tenancy.kcp.dev", "clusterworkspaceshards"}, + } { crd := &apiextensionsv1.CustomResourceDefinition{} - if err := configcrds.Unmarshal(fmt.Sprintf("apis.kcp.dev_%s.yaml", resource), crd); err != nil { + if err := configcrds.Unmarshal(fmt.Sprintf("%s_%s.yaml", resource.g, resource.n), crd); err != nil { panic(fmt.Errorf("failed to unmarshal %v resource: %w", resource, err)) } for i := range crd.Spec.Versions { diff --git a/pkg/reconciler/cache/replication/replication_controller.go b/pkg/reconciler/cache/replication/replication_controller.go index 9265fe790278..a0f6ee484b07 100644 --- a/pkg/reconciler/cache/replication/replication_controller.go +++ b/pkg/reconciler/cache/replication/replication_controller.go @@ -32,10 +32,13 @@ import ( "k8s.io/klog/v2" apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" cacheclient "github.com/kcp-dev/kcp/pkg/cache/client" "github.com/kcp-dev/kcp/pkg/cache/client/shard" kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions" apislisters "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1" + tenancyclisters "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1" + "github.com/kcp-dev/kcp/pkg/logging" ) @@ -46,8 +49,6 @@ const ( // NewController returns a new replication controller. // -// The replication controller copies objects of defined resources that have the "internal.sharding.kcp.dev/replicate" annotation to the cache server. -// // The replicated object will be placed under the same cluster as the original object. // In addition to that, all replicated objects will be placed under the shard taken from the shardName argument. // For example: shards/{shardName}/clusters/{clusterName}/apis/apis.kcp.dev/v1alpha1/apiexports @@ -59,14 +60,16 @@ func NewController( cacheKcpInformers kcpinformers.SharedInformerFactory, ) (*controller, error) { c := &controller{ - shardName: shardName, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), - dynamicCacheClient: dynamicCacheClient, - dynamicLocalClient: dynamicLocalClient, - localApiExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), - localApiResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), - cacheApiExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), - cacheApiResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), + shardName: shardName, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + dynamicCacheClient: dynamicCacheClient, + dynamicLocalClient: dynamicLocalClient, + localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(), + localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(), + localClusterWorkspaceShardLister: localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Lister(), + cacheAPIExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(), + cacheAPIResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(), + cacheClusterWorkspaceShardIndexer: cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(), } if err := cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddIndexers(cache.Indexers{ @@ -80,10 +83,19 @@ func NewController( return nil, err } + if err := cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddIndexers(cache.Indexers{ + ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace, + }); err != nil { + return nil, err + } + localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) + localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler()) cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler()) + cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler()) + return c, nil } @@ -95,6 +107,10 @@ func (c *controller) enqueueAPIResourceSchema(obj interface{}) { c.enqueueObject(obj, apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas")) } +func (c *controller) enqueueClusterWorkspaceShard(obj interface{}) { + c.enqueueObject(obj, tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards")) +} + func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResource) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { @@ -155,6 +171,10 @@ func (c *controller) apiResourceSchemaInformerEventHandler() cache.ResourceEvent return objectInformerEventHandler(c.enqueueAPIResourceSchema) } +func (c *controller) clusterWorkspaceShardInformerEventHandler() cache.ResourceEventHandler { + return objectInformerEventHandler(c.enqueueClusterWorkspaceShard) +} + func objectInformerEventHandler(enqueueObject func(obj interface{})) cache.ResourceEventHandler { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { enqueueObject(obj) }, @@ -170,9 +190,11 @@ type controller struct { dynamicCacheClient kcpdynamic.ClusterInterface dynamicLocalClient kcpdynamic.ClusterInterface - localApiExportLister apislisters.APIExportLister - localApiResourceSchemaLister apislisters.APIResourceSchemaLister + localAPIExportLister apislisters.APIExportLister + localAPIResourceSchemaLister apislisters.APIResourceSchemaLister + localClusterWorkspaceShardLister tenancyclisters.ClusterWorkspaceShardLister - cacheApiExportsIndexer cache.Indexer - cacheApiResourceSchemaIndexer cache.Indexer + cacheAPIExportsIndexer cache.Indexer + cacheAPIResourceSchemaIndexer cache.Indexer + cacheClusterWorkspaceShardIndexer cache.Indexer } diff --git a/pkg/reconciler/cache/replication/replication_reconcile.go b/pkg/reconciler/cache/replication/replication_reconcile.go index 87c43954dadc..be6297683eca 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile.go +++ b/pkg/reconciler/cache/replication/replication_reconcile.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/tools/cache" apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1" + tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" ) func (c *controller) reconcile(ctx context.Context, gvrKey string) error { @@ -47,10 +48,10 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"), apisv1alpha1.SchemeGroupVersion.WithKind("APIExport"), func(gvr schema.GroupVersionResource, cluster, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheApiExportsIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.cacheAPIExportsIndexer, c.shardName, cluster, namespace, name) }, func(key string) (interface{}, error) { - return c.localApiExportLister.Get(key) + return c.localAPIExportLister.Get(key) }) case apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas").String(): return c.reconcileObject(ctx, @@ -58,10 +59,21 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error { apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"), apisv1alpha1.SchemeGroupVersion.WithKind("APIResourceSchema"), func(gvr schema.GroupVersionResource, cluster, namespace, name string) (interface{}, error) { - return retrieveCacheObject(&gvr, c.cacheApiResourceSchemaIndexer, c.shardName, cluster, namespace, name) + return retrieveCacheObject(&gvr, c.cacheAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name) }, func(key string) (interface{}, error) { - return c.localApiResourceSchemaLister.Get(key) + return c.localAPIResourceSchemaLister.Get(key) + }) + case tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards").String(): + return c.reconcileObject(ctx, + keyParts[1], + tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"), + tenancyv1alpha1.SchemeGroupVersion.WithKind("ClusterWorkspaceShard"), + func(gvr schema.GroupVersionResource, cluster, namespace, name string) (interface{}, error) { + return retrieveCacheObject(&gvr, c.cacheClusterWorkspaceShardIndexer, c.shardName, cluster, namespace, name) + }, + func(key string) (interface{}, error) { + return c.localClusterWorkspaceShardLister.Get(key) }) default: return fmt.Errorf("unsupported resource %v", keyParts[0]) diff --git a/pkg/reconciler/cache/replication/replication_reconcile_test.go b/pkg/reconciler/cache/replication/replication_reconcile_test.go index ece4cb0ebf70..10c92b0431d5 100644 --- a/pkg/reconciler/cache/replication/replication_reconcile_test.go +++ b/pkg/reconciler/cache/replication/replication_reconcile_test.go @@ -47,49 +47,49 @@ func init() { func TestReconcileAPIExports(t *testing.T) { scenarios := []struct { name string - initialLocalApiExports []runtime.Object - initialCacheApiExports []runtime.Object - initCacheFakeClientWithInitialApiExports bool + initialLocalAPIExports []runtime.Object + initialCacheAPIExports []runtime.Object + initCacheFakeClientWithInitialAPIExports bool reconcileKey string validateFunc func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) }{ { name: "case 1: creation of the object in the cache server", - initialLocalApiExports: []runtime.Object{newAPIExport("foo")}, + initialLocalAPIExports: []runtime.Object{newAPIExport("foo")}, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatal("unexpected REST calls were made to the localDynamicClient") } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("create", "apiexports") { createAction := action.(kcptesting.CreateAction) if createAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", createAction.GetCluster()) } - createdUnstructuredApiExport := createAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + createdUnstructuredAPIExport := createAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(createdUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected ApiExport was creaetd:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected APIExport was created:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't created") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't created") } }, }, { name: "case 2: cached object is removed when local object was removed", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { t := metav1.NewTime(time.Now()) apiExport := newAPIExport("foo") @@ -98,14 +98,14 @@ func TestReconcileAPIExports(t *testing.T) { return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatal("unexpected REST calls were made to the localDynamicClient") } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("delete", "apiexports") { deleteAction := action.(kcptesting.DeleteAction) @@ -115,23 +115,23 @@ func TestReconcileAPIExports(t *testing.T) { if deleteAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't deleted") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't deleted") } }, }, { name: "case 2: cached object is removed when local object was not found", - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { - wasCacheApiExportDeletionValidated := false - wasCacheApiExportRetrievalValidated := false + wasCacheAPIExportDeletionValidated := false + wasCacheAPIExportRetrievalValidated := false for _, action := range localClientActions { if action.Matches("get", "apiexports") { getAction := action.(kcptesting.GetAction) @@ -139,14 +139,14 @@ func TestReconcileAPIExports(t *testing.T) { ts.Fatalf("wrong cluster = %s was targeted for localDynamicClient", getAction.GetCluster()) } if getAction.GetName() != "foo" { - ts.Fatalf("unexpected ApiExport was retrieved = %s, expected = %s", getAction.GetName(), "foo") + ts.Fatalf("unexpected APIExport was retrieved = %s, expected = %s", getAction.GetName(), "foo") } - wasCacheApiExportRetrievalValidated = true + wasCacheAPIExportRetrievalValidated = true break } } - if !wasCacheApiExportRetrievalValidated { - ts.Errorf("before deleting an ApiExport the controller should live GET it") + if !wasCacheAPIExportRetrievalValidated { + ts.Errorf("before deleting an APIExport the controller should live GET it") } for _, action := range cacheClientActions { if action.Matches("delete", "apiexports") { @@ -157,141 +157,141 @@ func TestReconcileAPIExports(t *testing.T) { if deleteAction.GetName() != "foo" { ts.Fatalf("unexpected APIExport was removed = %v, expected = %v", deleteAction.GetName(), "foo") } - wasCacheApiExportDeletionValidated = true + wasCacheAPIExportDeletionValidated = true break } } - if !wasCacheApiExportDeletionValidated { - ts.Errorf("an ApiExport on the cache sever wasn't deleted") + if !wasCacheAPIExportDeletionValidated { + ts.Errorf("an APIExport on the cache sever wasn't deleted") } }, }, { name: "case 3: update, metadata mismatch", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { apiExport := newAPIExport("foo") apiExport.Labels["fooLabel"] = "fooLabelVal" return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatal("unexpected REST calls were made to the localDynamicClient") } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) if updateAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } - updatedUnstructuredApiExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - expectedApiExport.Labels["fooLabel"] = "fooLabelVal" - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected update to the ApiExport:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + expectedAPIExport.Labels["fooLabel"] = "fooLabelVal" + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't updated") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't updated") } }, }, { name: "case 3: update, spec changed", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { apiExport := newAPIExport("foo") apiExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatal("unexpected REST calls were made to the localDynamicClient") } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) if updateAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } - updatedUnstructuredApiExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - expectedApiExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected update to the ApiExport:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + expectedAPIExport.Spec.PermissionClaims = []apisv1alpha1.PermissionClaim{{GroupResource: apisv1alpha1.GroupResource{}, IdentityHash: "abc"}} + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't updated") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't updated") } }, }, { name: "case 3: update, status changed", - initialLocalApiExports: []runtime.Object{ + initialLocalAPIExports: []runtime.Object{ func() *apisv1alpha1.APIExport { apiExport := newAPIExport("foo") apiExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} return apiExport }(), }, - initialCacheApiExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, - initCacheFakeClientWithInitialApiExports: true, + initialCacheAPIExports: []runtime.Object{newAPIExportWithShardAnnotation("foo")}, + initCacheFakeClientWithInitialAPIExports: true, reconcileKey: fmt.Sprintf("%s::root|foo", apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")), validateFunc: func(ts *testing.T, cacheClientActions []kcptesting.Action, localClientActions []kcptesting.Action) { if len(localClientActions) != 0 { ts.Fatal("unexpected REST calls were made to the localDynamicClient") } - wasCacheApiExportValidated := false + wasCacheAPIExportValidated := false for _, action := range cacheClientActions { if action.Matches("update", "apiexports") { updateAction := action.(kcptesting.UpdateAction) if updateAction.GetCluster().String() != "root" { ts.Fatalf("wrong cluster = %s was targeted for cacheDynamicClient", updateAction.GetCluster()) } - updatedUnstructuredApiExport := updateAction.GetObject().(*unstructured.Unstructured) - cacheApiExportFromUnstructured := &apisv1alpha1.APIExport{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredApiExport.Object, cacheApiExportFromUnstructured); err != nil { + updatedUnstructuredAPIExport := updateAction.GetObject().(*unstructured.Unstructured) + cacheAPIExportFromUnstructured := &apisv1alpha1.APIExport{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(updatedUnstructuredAPIExport.Object, cacheAPIExportFromUnstructured); err != nil { ts.Fatalf("failed to convert unstructured to APIExport: %v", err) } - expectedApiExport := newAPIExportWithShardAnnotation("foo") - expectedApiExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} - if !equality.Semantic.DeepEqual(cacheApiExportFromUnstructured, expectedApiExport) { - ts.Errorf("unexpected update to the ApiExport:\n%s", cmp.Diff(cacheApiExportFromUnstructured, expectedApiExport)) + expectedAPIExport := newAPIExportWithShardAnnotation("foo") + expectedAPIExport.Status.VirtualWorkspaces = []apisv1alpha1.VirtualWorkspace{{URL: "https://acme.dev"}} + if !equality.Semantic.DeepEqual(cacheAPIExportFromUnstructured, expectedAPIExport) { + ts.Errorf("unexpected update to the APIExport:\n%s", cmp.Diff(cacheAPIExportFromUnstructured, expectedAPIExport)) } - wasCacheApiExportValidated = true + wasCacheAPIExportValidated = true break } } - if !wasCacheApiExportValidated { - ts.Errorf("an ApiExport on the cache sever wasn't updated") + if !wasCacheAPIExportValidated { + ts.Errorf("an APIExport on the cache sever wasn't updated") } }, }, @@ -299,22 +299,22 @@ func TestReconcileAPIExports(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(tt *testing.T) { target := &controller{shardName: "amber"} - localApiExportIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for _, obj := range scenario.initialLocalApiExports { - if err := localApiExportIndexer.Add(obj); err != nil { + localAPIExportIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + for _, obj := range scenario.initialLocalAPIExports { + if err := localAPIExportIndexer.Add(obj); err != nil { tt.Error(err) } } - target.localApiExportLister = apislisters.NewAPIExportLister(localApiExportIndexer) - target.cacheApiExportsIndexer = cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) - for _, obj := range scenario.initialCacheApiExports { - if err := target.cacheApiExportsIndexer.Add(obj); err != nil { + target.localAPIExportLister = apislisters.NewAPIExportLister(localAPIExportIndexer) + target.cacheAPIExportsIndexer = cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace}) + for _, obj := range scenario.initialCacheAPIExports { + if err := target.cacheAPIExportsIndexer.Add(obj); err != nil { tt.Error(err) } } fakeCacheDynamicClient := kcpfakedynamic.NewSimpleDynamicClient(scheme, func() []runtime.Object { - if scenario.initCacheFakeClientWithInitialApiExports { - return scenario.initialCacheApiExports + if scenario.initCacheFakeClientWithInitialAPIExports { + return scenario.initialCacheAPIExports } return []runtime.Object{} }()...) diff --git a/test/e2e/reconciler/cache/replication_test.go b/test/e2e/reconciler/cache/replication_test.go index c784dfd8395f..ba117fb8e9db 100644 --- a/test/e2e/reconciler/cache/replication_test.go +++ b/test/e2e/reconciler/cache/replication_test.go @@ -57,10 +57,12 @@ var scenarios = []testScenario{ {"TestReplicateAPIExportNegative", replicateAPIExportNegativeScenario}, {"TestReplicateAPIResourceSchema", replicateAPIResourceSchemaScenario}, {"TestReplicateAPIResourceSchemaNegative", replicateAPIResourceSchemaNegativeScenario}, + {"TestReplicateClusterWorkspaceShard", replicateClusterWorkspaceShardScenario}, + {"TestReplicateClusterWorkspaceShardNegative", replicateClusterWorkspaceShardNegativeScenario}, } // replicateAPIResourceSchemaScenario tests if an APIResourceSchema is propagated to the cache server. -// The test exercises creation, modification and removal of the APIExport object. +// The test exercises creation, modification and removal of the APIResourceSchema object. func replicateAPIResourceSchemaScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { org := framework.NewOrganizationFixture(t, server) cluster := framework.NewWorkspaceFixture(t, server, org, framework.WithShardConstraints(tenancyv1alpha1.ShardConstraints{Name: "root"})) @@ -75,7 +77,7 @@ func replicateAPIResourceSchemaScenario(ctx context.Context, t *testing.T, serve t.Logf("Verify that the source APIResourceSchema %s/%s was replicated to the cache server", cluster, resourceName) scenario.VerifyReplication(ctx, t, cluster) - // not that since the spec of an APIResourceSchema is immutable we are limited to changing some metadata + // note that since the spec of an APIResourceSchema is immutable we are limited to changing some metadata t.Logf("Change some metadata on source APIResourceSchema %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { if err := scenario.ChangeMetadataFor(res); err != nil { @@ -237,7 +239,7 @@ func replicateAPIExportNegativeScenario(ctx context.Context, t *testing.T, serve scenario.DeleteCachedResource(ctx, t, cluster) scenario.VerifyReplication(ctx, t, cluster) - t.Logf("Update cahced APIExport %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + t.Logf("Update cached APIExport %s/%s so that it differs from the source resource", cluster, scenario.resourceName) scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { cachedExport, ok := res.(*apisv1alpha1.APIExport) if !ok { @@ -251,6 +253,101 @@ func replicateAPIExportNegativeScenario(ctx context.Context, t *testing.T, serve scenario.VerifyReplication(ctx, t, cluster) } +// replicateClusterWorkspaceShardScenario tests if a ClusterWorkspaceShard is propagated to the cache server. +// The test exercises creation, modification and removal of the ClusterWorkspaceShard object. +func replicateClusterWorkspaceShardScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { + // The ClusterWorkspaceShard API is per default only available in the root workspace + cluster := tenancyv1alpha1.RootCluster + resourceName := "sheriffs.wild.wild.west" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "ClusterWorkspaceShard", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source ClusterWorkspaceShard %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + cws := &tenancyv1alpha1.ClusterWorkspaceShard{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + }, + Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{ + BaseURL: "https://base.kcp.test.dev", + }, + } + _, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Create(ctx, cws, metav1.CreateOptions{}) + return err + }) + t.Logf("Verify that the source ClusterWorkspaceShard %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Change the spec on source ClusterWorkspaceShard %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + cws, ok := res.(*tenancyv1alpha1.ClusterWorkspaceShard) + if !ok { + return fmt.Errorf("%T is not *ClusterWorkspaceShard", res) + } + cws.Spec.BaseURL = "https://kcp.test.dev" + _, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Update(ctx, cws, metav1.UpdateOptions{}) + return err + }) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Change some metadata on source ClusterWorkspaceShard %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + if err := scenario.ChangeMetadataFor(res); err != nil { + return err + } + cws, ok := res.(*tenancyv1alpha1.ClusterWorkspaceShard) + if !ok { + return fmt.Errorf("%T is not *ClusterWorkspaceShard", res) + } + _, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Update(ctx, cws, metav1.UpdateOptions{}) + return err + }) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Verify that deleting source ClusterWorkspaceShard %s/%s leads to removal of the cached object", cluster, resourceName) + scenario.DeleteSourceResourceAndVerify(ctx, t, cluster) +} + +// replicateClusterWorkspaceShardNegativeScenario checks if modified or even deleted cached ClusterWorkspaceShard will be reconciled to match the original object +func replicateClusterWorkspaceShardNegativeScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { + // The ClusterWorkspaceShard API is per default only available in the root workspace + cluster := tenancyv1alpha1.RootCluster + resourceName := "negative.sheriffs.wild.wild.west" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "ClusterWorkspaceShard", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source ClusterWorkspaceShard %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + cws := &tenancyv1alpha1.ClusterWorkspaceShard{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + }, + Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{ + BaseURL: "https://base.kcp.test.dev", + }, + } + _, err := kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Create(ctx, cws, metav1.CreateOptions{}) + return err + }) + t.Logf("Verify that the source ClusterWorkspaceShard %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Delete cached ClusterWorkspaceShard %s/%s and check if it was brought back by the replication controller", cluster, resourceName) + scenario.DeleteCachedResource(ctx, t, cluster) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Update cached ClusterWorkspaceShard %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { + cachedCws, ok := res.(*tenancyv1alpha1.ClusterWorkspaceShard) + if !ok { + return fmt.Errorf("%T is not *ClusterWorkspaceShard", res) + } + cachedCws.Spec.BaseURL = "https://base2.kcp.test.dev" + _, err := cacheKcpClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Update(cacheclient.WithShardInContext(ctx, shard.New("root")), cachedCws, metav1.UpdateOptions{}) + return err + }) + t.Logf("Verify that the cached ClusterWorkspaceShard %s/%s was brought back by the replication controller after an update", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) +} + // TestCacheServerInProcess runs all test scenarios against a cache server that runs with a kcp server func TestCacheServerInProcess(t *testing.T) { t.Parallel() @@ -413,7 +510,7 @@ func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context. } return false, err.Error() } - t.Logf("Compare if both the orignal and replicated resources (%s %s/%s) are the same except %s annotation and ResourceVersion", b.resourceKind, cluster, b.resourceName, genericapirequest.AnnotationKey) + t.Logf("Compare if both the original and replicated resources (%s %s/%s) are the same except %s annotation and ResourceVersion", b.resourceKind, cluster, b.resourceName, genericapirequest.AnnotationKey) cachedResourceMeta, err := meta.Accessor(cachedResource) if err != nil { return false, err.Error() @@ -423,7 +520,7 @@ func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context. } delete(cachedResourceMeta.GetAnnotations(), genericapirequest.AnnotationKey) if diff := cmp.Diff(cachedResource, originalResource, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")); len(diff) > 0 { - return false, fmt.Sprintf("replicated %s root|%s/%s is different that the original", b.resourceKind, cluster, cachedResourceMeta.GetName()) + return false, fmt.Sprintf("replicated %s root|%s/%s is different from the original", b.resourceKind, cluster, cachedResourceMeta.GetName()) } return true, "" }, wait.ForeverTestTimeout, 100*time.Millisecond) @@ -435,8 +532,10 @@ func (b *replicateResourceScenario) getSourceResourceHelper(ctx context.Context, return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(ctx, b.resourceName, metav1.GetOptions{}) case "APIResourceSchema": return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Get(ctx, b.resourceName, metav1.GetOptions{}) + case "ClusterWorkspaceShard": + return b.kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Get(ctx, b.resourceName, metav1.GetOptions{}) } - return nil, fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return nil, fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } func (b *replicateResourceScenario) getCachedResourceHelper(ctx context.Context, cluster logicalcluster.Name) (runtime.Object, error) { @@ -445,8 +544,10 @@ func (b *replicateResourceScenario) getCachedResourceHelper(ctx context.Context, return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) case "APIResourceSchema": return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) + case "ClusterWorkspaceShard": + return b.cacheKcpClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) } - return nil, fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return nil, fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } func (b *replicateResourceScenario) deleteSourceResourceHelper(ctx context.Context, cluster logicalcluster.Name) error { @@ -455,8 +556,10 @@ func (b *replicateResourceScenario) deleteSourceResourceHelper(ctx context.Conte return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) case "APIResourceSchema": return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) + case "ClusterWorkspaceShard": + return b.kcpShardClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) } - return fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) } func (b *replicateResourceScenario) deleteCachedResource(ctx context.Context, cluster logicalcluster.Name) error { @@ -465,6 +568,8 @@ func (b *replicateResourceScenario) deleteCachedResource(ctx context.Context, cl return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) case "APIResourceSchema": return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) + case "ClusterWorkspaceShard": + return b.cacheKcpClusterClient.Cluster(cluster).TenancyV1alpha1().ClusterWorkspaceShards().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) } - return fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) + return fmt.Errorf("unable to get a REST client for an unknown %s Kind", b.resourceKind) }