diff --git a/test/e2e/reconciler/cache/replication_test.go b/test/e2e/reconciler/cache/replication_test.go index dc7af6eabbf..bf7ceb7c794 100644 --- a/test/e2e/reconciler/cache/replication_test.go +++ b/test/e2e/reconciler/cache/replication_test.go @@ -18,6 +18,7 @@ package cache import ( "context" + "encoding/json" "fmt" "path" "path/filepath" @@ -31,8 +32,11 @@ import ( "github.com/kcp-dev/logicalcluster/v2" "github.com/stretchr/testify/require" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" apimachineryerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" @@ -60,109 +64,205 @@ type testScenario struct { // scenarios all test scenarios that will be run against in-process and standalone cache server var scenarios = []testScenario{ {"TestReplicateAPIExport", replicateAPIExportScenario}, + {"TestReplicateAPIExportNegative", replicateAPIExportNegativeScenario}, + {"TestReplicateAPIResourceSchema", replicateAPIResourceSchemaScenario}, + {"TestReplicateAPIResourceSchemaNegative", replicateAPIResourceSchemaNegativeScenario}, } -// replicateAPIExportScenario tests if an APIExport is propagated to the cache server. +// replicateAPIResourceSchemaScenario tests if an APIResourceSchema is propagated to the cache server. // The test exercises creation, modification and removal of the APIExport object. -func replicateAPIExportScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { +func replicateAPIResourceSchemaScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { org := framework.NewOrganizationFixture(t, server) cluster := framework.NewWorkspaceFixture(t, server, org, framework.WithShardConstraints(tenancyv1alpha1.ShardConstraints{Name: "root"})) + resourceName := "today.sheriffs.wild.wild.west" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "APIResourceSchema", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} - t.Logf("Create an APIExport in %s workspace on the root shard", cluster) - apifixtures.CreateSheriffsSchemaAndExport(ctx, t, cluster, kcpShardClusterClient.Cluster(cluster), "wild.wild.west", "testing replication to the cache server") - var cachedWildAPIExport *apisv1alpha1.APIExport - var wildAPIExport *apisv1alpha1.APIExport - var err error - t.Logf("Get %s/%s APIExport from the root shard and the cache server for comparison", cluster, "wild.wild.west") - framework.Eventually(t, func() (bool, string) { - wildAPIExport, err = kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(ctx, "wild.wild.west", metav1.GetOptions{}) - if err != nil { - return false, err.Error() - } - cachedWildAPIExport, err = cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), wildAPIExport.Name, metav1.GetOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - return false, err.Error() - } - return false, err.Error() - } - t.Logf("Verify if both the orignal APIExport and replicated are the same except %s annotation and ResourceVersion after creation", genericapirequest.AnnotationKey) - if _, found := cachedWildAPIExport.Annotations[genericapirequest.AnnotationKey]; !found { - t.Fatalf("replicated APIExport root/%s/%s, doesn't have %s annotation", cluster, cachedWildAPIExport.Name, genericapirequest.AnnotationKey) + t.Logf("Create source APIResourceSchema %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + apifixtures.CreateSheriffsSchemaAndExport(ctx, t, cluster, kcpShardClusterClient.Cluster(cluster), "wild.wild.west", "testing replication to the cache server") + return nil + }) + t.Logf("Verify that the source APIResourceSchema %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + // not that since the spec of an APIResourceSchema is immutable we are limited to changing some metadata + t.Logf("Change some metadata on source APIResourceSchema %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + if err := scenario.ChangeMetadataFor(res); err != nil { + return err } - delete(cachedWildAPIExport.Annotations, genericapirequest.AnnotationKey) - if diff := cmp.Diff(cachedWildAPIExport, wildAPIExport, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")); len(diff) > 0 { - return false, fmt.Sprintf("replicated APIExport root/%s/%s is different that the original", cluster, wildAPIExport.Name) + apiResSchema, ok := res.(*apisv1alpha1.APIResourceSchema) + if !ok { + return fmt.Errorf("%T is not *APIResourceSchema", res) } - return true, "" - }, wait.ForeverTestTimeout, 400*time.Millisecond) - - t.Logf("Verify that a spec update on %s/%s APIExport is propagated to the cached object", cluster, "wild.wild.west") - verifyAPIExportUpdate(ctx, t, cluster, kcpShardClusterClient, cacheKcpClusterClient, func(apiExport *apisv1alpha1.APIExport) { - apiExport.Spec.LatestResourceSchemas = append(apiExport.Spec.LatestResourceSchemas, "foo.bar") + _, err := kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Update(ctx, apiResSchema, metav1.UpdateOptions{}) + return err }) - t.Logf("Verify that a metadata update on %s/%s APIExport is propagated ot the cached object", cluster, "wild.wild.west") - verifyAPIExportUpdate(ctx, t, cluster, kcpShardClusterClient, cacheKcpClusterClient, func(apiExport *apisv1alpha1.APIExport) { - if apiExport.Annotations == nil { - apiExport.Annotations = map[string]string{} + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Verify that deleting source APIResourceSchema %s/%s leads to removal of the cached object", cluster, resourceName) + scenario.DeleteSourceResourceAndVerify(ctx, t, cluster) +} + +// replicateAPIResourceSchemaNegativeScenario checks if modified or even deleted cached APIResourceSchema will be reconciled to match the original object +func replicateAPIResourceSchemaNegativeScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { + org := framework.NewOrganizationFixture(t, server) + cluster := framework.NewWorkspaceFixture(t, server, org, framework.WithShardConstraints(tenancyv1alpha1.ShardConstraints{Name: "root"})) + resourceName := "juicy.mangodbs.db.io" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "APIResourceSchema", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source APIResourceSchema %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + schema := &apisv1alpha1.APIResourceSchema{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + }, + Spec: apisv1alpha1.APIResourceSchemaSpec{ + Group: "db.io", + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "mangodbs", + Singular: "mangodb", + Kind: "MangoDB", + ListKind: "MangoDBList", + }, + Scope: "Namespaced", + Versions: []apisv1alpha1.APIResourceVersion{ + { + Name: "v1", + Served: true, + Storage: true, + Schema: runtime.RawExtension{ + Raw: func() []byte { + ret, err := json.Marshal(&apiextensionsv1.JSONSchemaProps{ + Type: "object", + Description: "the best db out there", + }) + if err != nil { + panic(err) + } + + return ret + }(), + }, + }, + }, + }, } - apiExport.Annotations["testAnnotation"] = "testAnnotationValue" + _, err := kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Create(ctx, schema, metav1.CreateOptions{}) + return err }) + t.Logf("Verify that the source APIResourceSchema %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) - t.Logf("Verify that deleting %s/%s APIExport leads to removal of the cached object", cluster, "wild.wild.west") - err = kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(ctx, "wild.wild.west", metav1.DeleteOptions{}) - require.NoError(t, err) - framework.Eventually(t, func() (bool, string) { - _, err := cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), wildAPIExport.Name, metav1.GetOptions{}) - if errors.IsNotFound(err) { - return true, "" + t.Logf("Delete cached APIResourceSchema %s/%s and check if it was brought back by the replication controller", cluster, resourceName) + scenario.DeleteCachedResource(ctx, t, cluster) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Update cahced APIResourceSchema %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { + cachedSchema, ok := res.(*apisv1alpha1.APIResourceSchema) + if !ok { + return fmt.Errorf("%T is not *APIResourceSchema", res) } - if err != nil { - return false, err.Error() + // since the spec of an APIResourceSchema is immutable + // let's modify some metadata + if cachedSchema.Labels == nil { + cachedSchema.Labels = map[string]string{} } - return false, fmt.Sprintf("replicated APIExport root/%s/%s wasn't removed", cluster, cachedWildAPIExport.Name) - }, wait.ForeverTestTimeout, 400*time.Millisecond) + cachedSchema.Labels["foo"] = "bar" + _, err := cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Update(cacheclient.WithShardInContext(ctx, shard.New("root")), cachedSchema, metav1.UpdateOptions{}) + return err + }) + t.Logf("Verify that the cached APIResourceSchema %s/%s was brought back by the replication controller after an update", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) } -func verifyAPIExportUpdate(ctx context.Context, t *testing.T, cluster logicalcluster.Name, kcpRootShardClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface, changeApiExportFn func(*apisv1alpha1.APIExport)) { - var wildAPIExport *apisv1alpha1.APIExport - var updatedWildAPIExport *apisv1alpha1.APIExport - var err error - framework.Eventually(t, func() (bool, string) { - wildAPIExport, err = kcpRootShardClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(ctx, "wild.wild.west", metav1.GetOptions{}) - if err != nil { - return false, err.Error() +// replicateAPIExportScenario tests if an APIExport is propagated to the cache server. +// The test exercises creation, modification and removal of the APIExport object. +func replicateAPIExportScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { + org := framework.NewOrganizationFixture(t, server) + cluster := framework.NewWorkspaceFixture(t, server, org, framework.WithShardConstraints(tenancyv1alpha1.ShardConstraints{Name: "root"})) + resourceName := "wild.wild.west" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "APIExport", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source APIExport %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + apifixtures.CreateSheriffsSchemaAndExport(ctx, t, cluster, kcpShardClusterClient.Cluster(cluster), "wild.wild.west", "testing replication to the cache server") + return nil + }) + t.Logf("Verify that the source APIExport %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Change the spec on source APIExport %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + apiExport, ok := res.(*apisv1alpha1.APIExport) + if !ok { + return fmt.Errorf("%T is not *APIExport", res) } - changeApiExportFn(wildAPIExport) - updatedWildAPIExport, err = kcpRootShardClient.Cluster(cluster).ApisV1alpha1().APIExports().Update(ctx, wildAPIExport, metav1.UpdateOptions{}) - if err != nil { - if !errors.IsConflict(err) { - return false, fmt.Sprintf("unknow error while updating the cached %s/%s/%s APIExport, err: %s", "root", cluster, "wild.wild.west", err.Error()) - } - return false, err.Error() // try again + apiExport.Spec.LatestResourceSchemas = append(apiExport.Spec.LatestResourceSchemas, "foo.bar") + _, err := kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Update(ctx, apiExport, metav1.UpdateOptions{}) + return err + }) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Change some metadata on source APIExport %s/%s and verify if updates were propagated to the cached object", cluster, resourceName) + scenario.UpdateSourceResource(ctx, t, cluster, func(res runtime.Object) error { + if err := scenario.ChangeMetadataFor(res); err != nil { + return err } - return true, "" - }, wait.ForeverTestTimeout, 400*time.Millisecond) - t.Logf("Get root/%s/%s APIExport from the cache server", cluster, "wild.wild.west") - framework.Eventually(t, func() (bool, string) { - cachedWildAPIExport, err := cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), wildAPIExport.Name, metav1.GetOptions{}) - if err != nil { - return false, err.Error() + apiExport, ok := res.(*apisv1alpha1.APIExport) + if !ok { + return fmt.Errorf("%T is not *APIExport", res) } - t.Logf("Verify if both the orignal APIExport and replicated are the same except %s annotation and ResourceVersion after an update to the spec", genericapirequest.AnnotationKey) - if _, found := cachedWildAPIExport.Annotations[genericapirequest.AnnotationKey]; !found { - return false, fmt.Sprintf("replicated APIExport root/%s/%s, doesn't have %s annotation", cluster, cachedWildAPIExport.Name, genericapirequest.AnnotationKey) + _, err := kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Update(ctx, apiExport, metav1.UpdateOptions{}) + return err + }) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Verify that deleting source APIExport %s/%s leads to removal of the cached object", cluster, resourceName) + scenario.DeleteSourceResourceAndVerify(ctx, t, cluster) +} + +// replicateAPIExportNegativeScenario checks if modified or even deleted cached APIExport will be reconciled to match the original object +func replicateAPIExportNegativeScenario(ctx context.Context, t *testing.T, server framework.RunningServer, kcpShardClusterClient clientset.ClusterInterface, cacheKcpClusterClient clientset.ClusterInterface) { + org := framework.NewOrganizationFixture(t, server) + cluster := framework.NewWorkspaceFixture(t, server, org, framework.WithShardConstraints(tenancyv1alpha1.ShardConstraints{Name: "root"})) + resourceName := "mangodb" + scenario := &replicateResourceScenario{resourceName: resourceName, resourceKind: "APIExport", server: server, kcpShardClusterClient: kcpShardClusterClient, cacheKcpClusterClient: cacheKcpClusterClient} + + t.Logf("Create source APIExport %s/%s on the root shard for replication", cluster, resourceName) + scenario.CreateSourceResource(t, func() error { + export := &apisv1alpha1.APIExport{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + }, } - delete(cachedWildAPIExport.Annotations, genericapirequest.AnnotationKey) - if diff := cmp.Diff(cachedWildAPIExport, updatedWildAPIExport, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")); len(diff) > 0 { - return false, fmt.Sprintf("replicated APIExport root/%s/%s is different that the original, diff: %s", cluster, wildAPIExport.Name, diff) + _, err := kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Create(ctx, export, metav1.CreateOptions{}) + return err + }) + t.Logf("Verify that the source APIExport %s/%s was replicated to the cache server", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Delete cached APIExport %s/%s and check if it was brought back by the replication controller", cluster, resourceName) + scenario.DeleteCachedResource(ctx, t, cluster) + scenario.VerifyReplication(ctx, t, cluster) + + t.Logf("Update cahced APIExport %s/%s so that it differs from the source resource", cluster, scenario.resourceName) + scenario.UpdateCachedResource(ctx, t, cluster, func(res runtime.Object) error { + cachedExport, ok := res.(*apisv1alpha1.APIExport) + if !ok { + return fmt.Errorf("%T is not *APIExport", res) } - return true, "" - }, wait.ForeverTestTimeout, 400*time.Millisecond) + cachedExport.Spec.LatestResourceSchemas = append(cachedExport.Spec.LatestResourceSchemas, "foo") + _, err := cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Update(cacheclient.WithShardInContext(ctx, shard.New("root")), cachedExport, metav1.UpdateOptions{}) + return err + }) + t.Logf("Verify that the cached APIExport %s/%s was brought back by the replication controller after an update", cluster, resourceName) + scenario.VerifyReplication(ctx, t, cluster) } -// TestAllAgainstInProcessCacheServer runs all test scenarios against a cache server that runs with a kcp server -func TestAllAgainstInProcessCacheServer(t *testing.T) { +// TestCacheServerInProcess runs all test scenarios against a cache server that runs with a kcp server +func TestCacheServerInProcess(t *testing.T) { t.Parallel() framework.Suite(t, "control-plane") @@ -185,14 +285,13 @@ func TestAllAgainstInProcessCacheServer(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(tt *testing.T) { - tt.Parallel() scenario.work(ctx, tt, server, kcpRootShardClient, cacheKcpClusterClient) }) } } -// TestAllScenariosAgainstStandaloneCacheServer runs all test scenarios against a standalone cache server -func TestAllScenariosAgainstStandaloneCacheServer(t *testing.T) { +// TestCacheServerStandalone runs all test scenarios against a standalone cache server +func TestCacheServerStandalone(t *testing.T) { t.Parallel() framework.Suite(t, "control-plane") @@ -273,7 +372,6 @@ func TestAllScenariosAgainstStandaloneCacheServer(t *testing.T) { for _, scenario := range scenarios { t.Run(scenario.name, func(tt *testing.T) { - tt.Parallel() scenario.work(ctx, tt, server, kcpRootShardClient, cacheKcpClusterClient) }) } @@ -286,3 +384,150 @@ func cacheClientRoundTrippersFor(cfg *rest.Config) *rest.Config { kcpclienthelper.SetMultiClusterRoundTripper(cacheClientRT) return cacheClientRT } + +// replicateResourceScenario an auxiliary struct that is used by all test scenarios defined in this pkg +type replicateResourceScenario struct { + resourceName string + resourceKind string + + server framework.RunningServer + kcpShardClusterClient clientset.ClusterInterface + cacheKcpClusterClient clientset.ClusterInterface +} + +func (b *replicateResourceScenario) CreateSourceResource(t *testing.T, createSourceResource func() error) { + require.NoError(t, createSourceResource()) +} + +func (b *replicateResourceScenario) UpdateSourceResource(ctx context.Context, t *testing.T, cluster logicalcluster.Name, updater func(runtime.Object) error) { + b.resourceUpdateHelper(ctx, t, cluster, b.getSourceResourceHelper, updater) +} + +func (b *replicateResourceScenario) UpdateCachedResource(ctx context.Context, t *testing.T, cluster logicalcluster.Name, updater func(runtime.Object) error) { + b.resourceUpdateHelper(ctx, t, cluster, b.getCachedResourceHelper, updater) +} + +func (b *replicateResourceScenario) DeleteSourceResourceAndVerify(ctx context.Context, t *testing.T, cluster logicalcluster.Name) { + require.NoError(t, b.deleteSourceResourceHelper(ctx, cluster)) + framework.Eventually(t, func() (bool, string) { + _, err := b.getCachedResourceHelper(ctx, cluster) + if errors.IsNotFound(err) { + return true, "" + } + if err != nil { + return false, err.Error() + } + return false, fmt.Sprintf("replicated %s %s/%s wasn't removed", b.resourceKind, cluster, b.resourceName) + }, wait.ForeverTestTimeout, 100*time.Millisecond) +} + +func (b *replicateResourceScenario) DeleteCachedResource(ctx context.Context, t *testing.T, cluster logicalcluster.Name) { + err := b.deleteCachedResource(ctx, cluster) + require.NoError(t, err) +} + +func (b *replicateResourceScenario) VerifyReplication(ctx context.Context, t *testing.T, cluster logicalcluster.Name) { + b.verifyResourceReplicationHelper(ctx, t, cluster) +} + +func (b *replicateResourceScenario) ChangeMetadataFor(originalResource runtime.Object) error { + originalResourceMeta, err := meta.Accessor(originalResource) + if err != nil { + return err + } + annotations := originalResourceMeta.GetAnnotations() + if annotations == nil { + annotations = map[string]string{} + } + annotations["testAnnotation"] = "testAnnotationValue" + originalResourceMeta.SetAnnotations(annotations) + return nil +} + +func (b *replicateResourceScenario) resourceUpdateHelper(ctx context.Context, t *testing.T, cluster logicalcluster.Name, resourceGetter func(ctx context.Context, cluster logicalcluster.Name) (runtime.Object, error), resourceUpdater func(runtime.Object) error) { + framework.Eventually(t, func() (bool, string) { + resource, err := resourceGetter(ctx, cluster) + if err != nil { + return false, err.Error() + } + err = resourceUpdater(resource) + if err != nil { + if !errors.IsConflict(err) { + return false, fmt.Sprintf("unknow error while updating the cached %s/%s/%s, err: %s", b.resourceKind, cluster, b.resourceName, err.Error()) + } + return false, err.Error() // try again + } + return true, "" + }, wait.ForeverTestTimeout, 100*time.Millisecond) +} + +func (b *replicateResourceScenario) verifyResourceReplicationHelper(ctx context.Context, t *testing.T, cluster logicalcluster.Name) { + t.Helper() + t.Logf("Get %s %s/%s from the root shard and the cache server for comparison", b.resourceKind, cluster, b.resourceName) + framework.Eventually(t, func() (bool, string) { + originalResource, err := b.getSourceResourceHelper(ctx, cluster) + if err != nil { + return false, err.Error() + } + cachedResource, err := b.getCachedResourceHelper(ctx, cluster) + if err != nil { + if !errors.IsNotFound(err) { + return true, err.Error() + } + return false, err.Error() + } + t.Logf("Compare if both the orignal and replicated resources (%s %s/%s) are the same except %s annotation and ResourceVersion", b.resourceKind, cluster, b.resourceName, genericapirequest.AnnotationKey) + cachedResourceMeta, err := meta.Accessor(cachedResource) + if err != nil { + return false, err.Error() + } + if _, found := cachedResourceMeta.GetAnnotations()[genericapirequest.AnnotationKey]; !found { + t.Fatalf("replicated %s root|%s/%s, doesn't have %s annotation", b.resourceKind, cluster, cachedResourceMeta.GetName(), genericapirequest.AnnotationKey) + } + delete(cachedResourceMeta.GetAnnotations(), genericapirequest.AnnotationKey) + if diff := cmp.Diff(cachedResource, originalResource, cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion")); len(diff) > 0 { + return false, fmt.Sprintf("replicated %s root|%s/%s is different that the original", b.resourceKind, cluster, cachedResourceMeta.GetName()) + } + return true, "" + }, wait.ForeverTestTimeout, 100*time.Millisecond) +} + +func (b *replicateResourceScenario) getSourceResourceHelper(ctx context.Context, cluster logicalcluster.Name) (runtime.Object, error) { + switch b.resourceKind { + case "APIExport": + return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(ctx, b.resourceName, metav1.GetOptions{}) + case "APIResourceSchema": + return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Get(ctx, b.resourceName, metav1.GetOptions{}) + } + return nil, fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) +} + +func (b *replicateResourceScenario) getCachedResourceHelper(ctx context.Context, cluster logicalcluster.Name) (runtime.Object, error) { + switch b.resourceKind { + case "APIExport": + return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) + case "APIResourceSchema": + return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Get(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.GetOptions{}) + } + return nil, fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) +} + +func (b *replicateResourceScenario) deleteSourceResourceHelper(ctx context.Context, cluster logicalcluster.Name) error { + switch b.resourceKind { + case "APIExport": + return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) + case "APIResourceSchema": + return b.kcpShardClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Delete(ctx, b.resourceName, metav1.DeleteOptions{}) + } + return fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) +} + +func (b *replicateResourceScenario) deleteCachedResource(ctx context.Context, cluster logicalcluster.Name) error { + switch b.resourceKind { + case "APIExport": + return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIExports().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) + case "APIResourceSchema": + return b.cacheKcpClusterClient.Cluster(cluster).ApisV1alpha1().APIResourceSchemas().Delete(cacheclient.WithShardInContext(ctx, shard.New("root")), b.resourceName, metav1.DeleteOptions{}) + } + return fmt.Errorf("unable to get a REST client for an uknown %s Kind", b.resourceKind) +}