diff --git a/test/e2e/framework/config.go b/test/e2e/framework/config.go
index 2d6853cf92e..c875788056e 100644
--- a/test/e2e/framework/config.go
+++ b/test/e2e/framework/config.go
@@ -17,11 +17,34 @@ limitations under the License.
 package framework
 
 import (
+	"context"
 	"errors"
 	"flag"
+	"fmt"
 	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/kcp-dev/logicalcluster/v2"
+	"github.com/stretchr/testify/require"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
+	"k8s.io/klog/v2"
+
+	tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
+	kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
 )
 
+func init() {
+	klog.InitFlags(flag.CommandLine)
+	if err := flag.Lookup("v").Value.Set("2"); err != nil {
+		panic(err)
+	}
+}
+
 type testConfig struct {
 	syncerImage         string
 	kcpTestImage        string
@@ -78,3 +101,32 @@ func registerFlags(c *testConfig) {
 	flag.StringVar(&c.kcpTestImage, "kcp-test-image", "", "The test image to use with the pcluster. Requires --pcluster-kubeconfig")
 	flag.BoolVar(&c.useDefaultKCPServer, "use-default-kcp-server", false, "Whether to use server configuration from .kcp/admin.kubeconfig.")
 }
+
+// WriteLogicalClusterConfig creates a logical cluster config for the given config and
+// cluster name and writes it to the test's artifact path. Useful for configuring the
+// workspace plugin with --kubeconfig.
+func WriteLogicalClusterConfig(t *testing.T, rawConfig clientcmdapi.Config, contextName string, clusterName logicalcluster.Name) (clientcmd.ClientConfig, string) {
+	logicalRawConfig := LogicalClusterRawConfig(rawConfig, clusterName, contextName)
+	artifactDir, err := CreateTempDirForTest(t, "artifacts")
+	require.NoError(t, err)
+	pathSafeClusterName := strings.ReplaceAll(clusterName.String(), ":", "_")
+	kubeconfigPath := filepath.Join(artifactDir, fmt.Sprintf("%s.kubeconfig", pathSafeClusterName))
+	err = clientcmd.WriteToFile(logicalRawConfig, kubeconfigPath)
+	require.NoError(t, err)
+	logicalConfig := clientcmd.NewNonInteractiveClientConfig(logicalRawConfig, logicalRawConfig.CurrentContext, &clientcmd.ConfigOverrides{}, nil)
+	return logicalConfig, kubeconfigPath
+}
+
+// ShardConfig returns a rest config that talks directly to the given shard.
+func ShardConfig(t *testing.T, kcpClusterClient kcpclientset.ClusterInterface, shardName string, cfg *rest.Config) *rest.Config {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	t.Cleanup(cancelFunc)
+
+	shard, err := kcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1().ClusterWorkspaceShards().Get(ctx, shardName, metav1.GetOptions{})
+	require.NoError(t, err)
+
+	shardCfg := rest.CopyConfig(cfg)
+	shardCfg.Host = shard.Spec.BaseURL
+
+	return shardCfg
+}
diff --git a/test/e2e/framework/fixture.go b/test/e2e/framework/fixture.go
deleted file mode 100644
index ed48cc7d4b0..00000000000
--- a/test/e2e/framework/fixture.go
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
-Copyright 2022 The KCP Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -package framework - -import ( - "bytes" - "context" - "flag" - "fmt" - "io" - "io/ioutil" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "sync" - "testing" - "time" - - "github.com/kcp-dev/logicalcluster/v2" - "github.com/stretchr/testify/require" - - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" - kubernetesclientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - "k8s.io/klog/v2" - "sigs.k8s.io/yaml" - - apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1" - tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" - conditionsapi "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/apis/conditions/v1alpha1" - "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions" - kcpclient "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" - kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" - workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin" - "github.com/kcp-dev/kcp/pkg/syncer" - "github.com/kcp-dev/kcp/pkg/syncer/shared" -) - -func init() { - klog.InitFlags(flag.CommandLine) - if err := flag.Lookup("v").Value.Set("2"); err != nil { - panic(err) - } -} - -// TestServerArgs returns the set of kcp args used to start a test -// server using the token auth file from the working tree. -func TestServerArgs() []string { - return TestServerArgsWithTokenAuthFile("test/e2e/framework/auth-tokens.csv") -} - -// TestServerArgsWithTokenAuthFile returns the set of kcp args used to -// start a test server with the given token auth file. -func TestServerArgsWithTokenAuthFile(tokenAuthFile string) []string { - return []string{ - "-v=4", - "--token-auth-file", tokenAuthFile, - } -} - -// KcpFixture manages the lifecycle of a set of kcp servers. -// -// Deprecated for use outside this package. Prefer PrivateKcpServer(). -type kcpFixture struct { - Servers map[string]RunningServer -} - -// PrivateKcpServer returns a new kcp server fixture managing a new -// server process that is not intended to be shared between tests. -func PrivateKcpServer(t *testing.T, args ...string) RunningServer { - serverName := "main" - f := newKcpFixture(t, kcpConfig{ - Name: serverName, - Args: args, - }) - return f.Servers[serverName] -} - -// SharedKcpServer returns a kcp server fixture intended to be shared -// between tests. A persistent server will be configured if -// `--kcp-kubeconfig` or `--use-default-kcp-server` is supplied to the test -// runner. Otherwise a test-managed server will be started. Only tests -// that are known to be hermetic are compatible with shared fixture. 
-func SharedKcpServer(t *testing.T) RunningServer { - serverName := "shared" - kubeconfig := TestConfig.KCPKubeconfig() - if len(kubeconfig) > 0 { - // Use a persistent server - - t.Logf("shared kcp server will target configuration %q", kubeconfig) - server, err := newPersistentKCPServer(serverName, kubeconfig, TestConfig.RootShardKubeconfig()) - require.NoError(t, err, "failed to create persistent server fixture") - return server - } - - // Use a test-provisioned server - // - // TODO(marun) Enable non-persistent fixture to be shared across - // tests. This will likely require composing tests into a suite that - // initializes the shared fixture before tests that rely on the - // fixture. - - tokenAuthFile := WriteTokenAuthFile(t) - f := newKcpFixture(t, kcpConfig{ - Name: serverName, - Args: TestServerArgsWithTokenAuthFile(tokenAuthFile), - }) - return f.Servers[serverName] -} - -// Deprecated for use outside this package. Prefer PrivateKcpServer(). -func newKcpFixture(t *testing.T, cfgs ...kcpConfig) *kcpFixture { - f := &kcpFixture{} - - artifactDir, dataDir, err := ScratchDirs(t) - require.NoError(t, err, "failed to create scratch dirs: %v", err) - - // Initialize servers from the provided configuration - var servers []*kcpServer - f.Servers = map[string]RunningServer{} - for _, cfg := range cfgs { - server, err := newKcpServer(t, cfg, artifactDir, dataDir) - require.NoError(t, err) - - servers = append(servers, server) - f.Servers[server.name] = server - } - - // Launch kcp servers and ensure they are ready before starting the test - start := time.Now() - t.Log("Starting kcp servers...") - wg := sync.WaitGroup{} - wg.Add(len(servers)) - for i, srv := range servers { - var opts []RunOption - if LogToConsoleEnvSet() || cfgs[i].LogToConsole { - opts = append(opts, WithLogStreaming) - } - if InProcessEnvSet() || cfgs[i].RunInProcess { - opts = append(opts, RunInProcess) - } - err := srv.Run(opts...) 
- require.NoError(t, err) - - // Wait for the server to become ready - go func(s *kcpServer, i int) { - defer wg.Done() - err := s.Ready(!cfgs[i].RunInProcess) - require.NoError(t, err, "kcp server %s never became ready: %v", s.name, err) - }(srv, i) - } - wg.Wait() - - if t.Failed() { - t.Fatal("Fixture setup failed: one or more servers did not become ready") - } - - t.Logf("Started kcp servers after %s", time.Since(start)) - - return f -} - -func InProcessEnvSet() bool { - inProcess, _ := strconv.ParseBool(os.Getenv("INPROCESS")) - return inProcess -} - -func LogToConsoleEnvSet() bool { - inProcess, _ := strconv.ParseBool(os.Getenv("LOG_TO_CONSOLE")) - return inProcess -} - -func preserveTestResources() bool { - return os.Getenv("PRESERVE") != "" -} - -func NewOrganizationFixture(t *testing.T, server RunningServer, options ...ClusterWorkspaceOption) (orgClusterName logicalcluster.Name) { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - cfg := server.BaseConfig(t) - clusterClient, err := kcpclientset.NewForConfig(cfg) - require.NoError(t, err, "failed to create kcp cluster client") - - tmpl := &tenancyv1alpha1.ClusterWorkspace{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "e2e-org-", - }, - Spec: tenancyv1alpha1.ClusterWorkspaceSpec{ - Type: tenancyv1alpha1.ClusterWorkspaceTypeReference{ - Name: "organization", - Path: "root", - }, - }, - } - for _, opt := range options { - opt(tmpl) - } - - // we are referring here to a ClusterWorkspaceType that may have just been created; if the admission controller - // does not have a fresh enough cache, our request will be denied as the admission controller does not know the - // type exists. Therefore, we can require.Eventually our way out of this problem. We expect users to create new - // types very infrequently, so we do not think this will be a serious UX issue in the product. 
- var org *tenancyv1alpha1.ClusterWorkspace - require.Eventually(t, func() bool { - var err error - org, err = clusterClient.TenancyV1alpha1().ClusterWorkspaces().Create(logicalcluster.WithCluster(ctx, tenancyv1alpha1.RootCluster), tmpl, metav1.CreateOptions{}) - if err != nil { - t.Logf("error creating org workspace under %s: %v", tenancyv1alpha1.RootCluster, err) - } - return err == nil - }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to create org workspace under %s", tenancyv1alpha1.RootCluster) - - t.Cleanup(func() { - if preserveTestResources() { - return - } - - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) - defer cancelFn() - - err := clusterClient.TenancyV1alpha1().ClusterWorkspaces().Delete(logicalcluster.WithCluster(ctx, tenancyv1alpha1.RootCluster), org.Name, metav1.DeleteOptions{}) - if apierrors.IsNotFound(err) { - return // ignore not found error - } - require.NoErrorf(t, err, "failed to delete organization workspace %s", org.Name) - }) - - Eventually(t, func() (bool, string) { - ws, err := clusterClient.TenancyV1alpha1().ClusterWorkspaces().Get(logicalcluster.WithCluster(ctx, tenancyv1alpha1.RootCluster), org.Name, metav1.GetOptions{}) - require.Falsef(t, apierrors.IsNotFound(err), "workspace %s was deleted", org.Name) - if err != nil { - t.Logf("failed to get workspace %s: %v", org.Name, err) - return false, "" - } - return ws.Status.Phase == tenancyv1alpha1.ClusterWorkspacePhaseReady, toYaml(t, ws.Status.Conditions) - }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for organization workspace %s to become ready", org.Name) - - clusterName := tenancyv1alpha1.RootCluster.Join(org.Name) - t.Logf("Created organization workspace %s", clusterName) - return clusterName -} - -func toYaml(t *testing.T, obj interface{}) string { - bs, err := yaml.Marshal(obj) - require.NoError(t, err) - return string(bs) -} - -type ClusterWorkspaceOption func(ws *tenancyv1alpha1.ClusterWorkspace) - -func WithShardConstraints(c tenancyv1alpha1.ShardConstraints) ClusterWorkspaceOption { - return func(ws *tenancyv1alpha1.ClusterWorkspace) { - ws.Spec.Shard = &c - } -} - -func WithType(path logicalcluster.Name, name tenancyv1alpha1.ClusterWorkspaceTypeName) ClusterWorkspaceOption { - return func(ws *tenancyv1alpha1.ClusterWorkspace) { - ws.Spec.Type = tenancyv1alpha1.ClusterWorkspaceTypeReference{ - Name: name, - Path: path.String(), - } - } -} - -func WithName(s string, formatArgs ...interface{}) ClusterWorkspaceOption { - return func(ws *tenancyv1alpha1.ClusterWorkspace) { - ws.Name = fmt.Sprintf(s, formatArgs...) 
- ws.GenerateName = "" - } -} - -func NewWorkspaceFixture(t *testing.T, server RunningServer, orgClusterName logicalcluster.Name, options ...ClusterWorkspaceOption) (clusterName logicalcluster.Name) { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - cfg := server.BaseConfig(t) - clusterClient, err := kcpclientset.NewClusterForConfig(cfg) - require.NoError(t, err, "failed to construct client for server") - - tmpl := &tenancyv1alpha1.ClusterWorkspace{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: "e2e-workspace-", - }, - Spec: tenancyv1alpha1.ClusterWorkspaceSpec{ - Type: tenancyv1alpha1.ClusterWorkspaceTypeReference{ - Name: tenancyv1alpha1.ClusterWorkspaceTypeName("universal"), - Path: "root", - }, - }, - } - for _, opt := range options { - opt(tmpl) - } - - // we are referring here to a ClusterWorkspaceType that may have just been created; if the admission controller - // does not have a fresh enough cache, our request will be denied as the admission controller does not know the - // type exists. Therefore, we can require.Eventually our way out of this problem. We expect users to create new - // types very infrequently, so we do not think this will be a serious UX issue in the product. - var ws *tenancyv1alpha1.ClusterWorkspace - require.Eventually(t, func() bool { - var err error - ws, err = clusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces().Create(ctx, tmpl, metav1.CreateOptions{}) - if err != nil { - t.Logf("error creating workspace under %s: %v", orgClusterName, err) - } - return err == nil - }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to create workspace under %s", orgClusterName) - - t.Cleanup(func() { - if preserveTestResources() { - return - } - - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) - defer cancelFn() - - err := clusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces().Delete(ctx, ws.Name, metav1.DeleteOptions{}) - if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) { - return // ignore not found and forbidden because this probably means the parent has been deleted - } - require.NoErrorf(t, err, "failed to delete workspace %s", ws.Name) - }) - - Eventually(t, func() (bool, string) { - ws, err := clusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces().Get(ctx, ws.Name, metav1.GetOptions{}) - require.Falsef(t, apierrors.IsNotFound(err), "workspace %s was deleted", ws.Name) - if err != nil { - t.Logf("failed to get workspace %s: %v", ws.Name, err) - return false, err.Error() - } - return ws.Status.Phase == tenancyv1alpha1.ClusterWorkspacePhaseReady, toYaml(t, ws) - }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for workspace %s to become ready", orgClusterName.Join(ws.Name)) - - wsClusterName := orgClusterName.Join(ws.Name) - t.Logf("Created %s workspace %s", ws.Spec.Type, wsClusterName) - return wsClusterName -} - -// SyncerFixture configures a syncer fixture. Its `Start` method does the work of starting a syncer. -type SyncerFixture struct { - ResourcesToSync sets.String - UpstreamServer RunningServer - WorkspaceClusterName logicalcluster.Name - SyncTargetLogicalClusterName logicalcluster.Name - SyncTargetName string - SyncTargetUID types.UID - InstallCRDs func(config *rest.Config, isLogicalCluster bool) -} - -// SetDefaults ensures a valid configuration even if not all values are explicitly provided. 
-func (sf *SyncerFixture) setDefaults() { - // Default configuration to avoid tests having to be exaustive - if len(sf.SyncTargetName) == 0 { - // This only needs to vary when more than one syncer need to be tested in a workspace - sf.SyncTargetName = "pcluster-01" - } - if len(sf.SyncTargetUID) == 0 { - sf.SyncTargetUID = types.UID("syncTargetUID") - } - if sf.SyncTargetLogicalClusterName.Empty() { - sf.SyncTargetLogicalClusterName = logicalcluster.New("org:ws:workload") - } - if sf.ResourcesToSync == nil { - // resources-to-sync is additive to the core set of resources so not providing any - // values means default types will still be synced. - sf.ResourcesToSync = sets.NewString() - } -} - -// Start starts a new syncer against the given upstream kcp workspace. Whether the syncer run -// in-process or deployed on a pcluster will depend whether --pcluster-kubeconfig and -// --syncer-image are supplied to the test invocation. -func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture { - sf.setDefaults() - - // Write the upstream logical cluster config to disk for the workspace plugin - upstreamRawConfig, err := sf.UpstreamServer.RawConfig() - require.NoError(t, err) - _, kubeconfigPath := WriteLogicalClusterConfig(t, upstreamRawConfig, sf.WorkspaceClusterName, "base") - - useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0 - - syncerImage := TestConfig.SyncerImage() - if useDeployedSyncer { - require.NotZero(t, len(syncerImage), "--syncer-image must be specified if testing with a deployed syncer") - } else { - // The image needs to be a non-empty string for the plugin command but the value - // doesn't matter if not deploying a syncer. - syncerImage = "not-a-valid-image" - } - - // Run the plugin command to enable the syncer and collect the resulting yaml - t.Logf("Configuring workspace %s for syncing", sf.WorkspaceClusterName) - pluginArgs := []string{ - "workload", - "sync", - sf.SyncTargetName, - "--syncer-image", syncerImage, - "--output-file", "-", - "--qps", "-1", - } - for _, resource := range sf.ResourcesToSync.List() { - pluginArgs = append(pluginArgs, "--resources", resource) - } - syncerYAML := RunKcpCliPlugin(t, kubeconfigPath, pluginArgs) - - var downstreamConfig *rest.Config - var downstreamKubeconfigPath string - if useDeployedSyncer { - // The syncer will target the pcluster identified by `--pcluster-kubeconfig`. - downstreamKubeconfigPath = TestConfig.PClusterKubeconfig() - fs, err := os.Stat(downstreamKubeconfigPath) - require.NoError(t, err) - require.NotZero(t, fs.Size(), "%s points to an empty file", downstreamKubeconfigPath) - rawConfig, err := clientcmd.LoadFromFile(downstreamKubeconfigPath) - require.NoError(t, err, "failed to load pcluster kubeconfig") - config := clientcmd.NewNonInteractiveClientConfig(*rawConfig, rawConfig.CurrentContext, nil, nil) - downstreamConfig, err = config.ClientConfig() - require.NoError(t, err) - } else { - // The syncer will target a logical cluster that is a peer to the current workspace. A - // logical server provides as a lightweight approximation of a pcluster for tests that - // don't need to validate running workloads or interaction with kube controllers. 
- parentClusterName, ok := sf.WorkspaceClusterName.Parent() - require.True(t, ok, "%s does not have a parent", sf.WorkspaceClusterName) - downstreamServer := NewFakeWorkloadServer(t, sf.UpstreamServer, parentClusterName) - downstreamConfig = downstreamServer.BaseConfig(t) - downstreamKubeconfigPath = downstreamServer.KubeconfigPath() - } - - if sf.InstallCRDs != nil { - // Attempt crd installation to ensure the downstream server has an api surface - // compatible with the test. - sf.InstallCRDs(downstreamConfig, !useDeployedSyncer) - } - - // Apply the yaml output from the plugin to the downstream server - KubectlApply(t, downstreamKubeconfigPath, syncerYAML) - - artifactDir, err := CreateTempDirForTest(t, "artifacts") - if err != nil { - t.Errorf("failed to create temp dir for syncer artifacts: %v", err) - } - - // collect both in deployed and in-process mode - t.Cleanup(func() { - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) - defer cancelFn() - - t.Logf("Collecting imported resource info: %s", artifactDir) - upstreamCfg := sf.UpstreamServer.BaseConfig(t) - - gather := func(client dynamic.Interface, gvr schema.GroupVersionResource) { - resourceClient := client.Resource(gvr) - - list, err := resourceClient.List(ctx, metav1.ListOptions{}) - if err != nil { - // Don't fail the test - t.Logf("Error gathering %s: %v", gvr, err) - return - } - - for i := range list.Items { - item := list.Items[i] - sf.UpstreamServer.Artifact(t, func() (runtime.Object, error) { - return &item, nil - }) - } - } - - upstreamDynamic, err := dynamic.NewClusterForConfig(upstreamCfg) - require.NoError(t, err, "error creating upstream dynamic client") - - downstreamDynamic, err := dynamic.NewForConfig(downstreamConfig) - require.NoError(t, err, "error creating downstream dynamic client") - - gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("apiresourceimports")) - gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("negotiatedapiresources")) - gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), corev1.SchemeGroupVersion.WithResource("namespaces")) - gather(downstreamDynamic, corev1.SchemeGroupVersion.WithResource("namespaces")) - gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), appsv1.SchemeGroupVersion.WithResource("deployments")) - gather(downstreamDynamic, appsv1.SchemeGroupVersion.WithResource("deployments")) - }) - - // Extract the configuration for an in-process syncer from the resources that were - // applied to the downstream server. This maximizes the parity between the - // configuration of a deployed and in-process syncer. 
- var syncerID string - for _, doc := range strings.Split(string(syncerYAML), "\n---\n") { - var manifest struct { - metav1.ObjectMeta `json:"metadata"` - } - err := yaml.Unmarshal([]byte(doc), &manifest) - require.NoError(t, err) - if manifest.Namespace != "" { - syncerID = manifest.Namespace - break - } - } - require.NotEmpty(t, syncerID, "failed to extract syncer ID from yaml produced by plugin:\n%s", string(syncerYAML)) - - syncerConfig := syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID) - - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - downstreamKubeClient, err := kubernetesclientset.NewForConfig(downstreamConfig) - require.NoError(t, err) - - if useDeployedSyncer { - t.Cleanup(func() { - ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) - defer cancelFn() - - // collect syncer logs - t.Logf("Collecting syncer pod logs") - func() { - t.Logf("Listing downstream pods in namespace %s", syncerID) - pods, err := downstreamKubeClient.CoreV1().Pods(syncerID).List(ctx, metav1.ListOptions{}) - if err != nil { - t.Logf("failed to list pods in %s: %v", syncerID, err) - return - } - - for _, pod := range pods.Items { - artifactPath := filepath.Join(artifactDir, fmt.Sprintf("syncer-%s-%s.log", syncerID, pod.Name)) - - t.Logf("Collecting downstream logs for pod %s/%s: %s", syncerID, pod.Name, artifactPath) - logs := Kubectl(t, downstreamKubeconfigPath, "-n", syncerID, "logs", pod.Name) - - err = ioutil.WriteFile(artifactPath, logs, 0644) - if err != nil { - t.Logf("failed to write logs for pod %s in %s to %s: %v", pod.Name, syncerID, artifactPath, err) - continue // not fatal - } - } - }() - - if preserveTestResources() { - return - } - - t.Logf("Deleting syncer resources for logical cluster %q, sync target %q", sf.WorkspaceClusterName, syncerConfig.SyncTargetName) - err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{}) - if err != nil { - t.Errorf("failed to delete Namespace %q: %v", syncerID, err) - } - err = downstreamKubeClient.RbacV1().ClusterRoleBindings().Delete(ctx, syncerID, metav1.DeleteOptions{}) - if err != nil { - t.Errorf("failed to delete ClusterRoleBinding %q: %v", syncerID, err) - } - err = downstreamKubeClient.RbacV1().ClusterRoles().Delete(ctx, syncerID, metav1.DeleteOptions{}) - if err != nil { - t.Errorf("failed to delete ClusterRole %q: %v", syncerID, err) - } - - t.Logf("Deleting synced resources for logical cluster %q, sync target %q", sf.WorkspaceClusterName, syncerConfig.SyncTargetName) - namespaces, err := downstreamKubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) - if err != nil { - t.Errorf("failed to list namespaces: %v", err) - } - for _, ns := range namespaces.Items { - locator, exists, err := shared.LocatorFromAnnotations(ns.Annotations) - if err != nil { - t.Logf("failed to retrieve locator from ns %q: %v", ns.Name, err) - continue - } - if !exists || locator == nil { - // Not a kcp-synced namespace - continue - } - if locator.Workspace.String() != syncerConfig.SyncTargetWorkspace.String() { - // Not a namespace synced by this syncer - continue - } - err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{}) - if err != nil { - t.Logf("failed to delete Namespace %q: %v", ns.Name, err) - } - } - }) - } else { - // Start an in-process syncer - err := syncer.StartSyncer(ctx, syncerConfig, 2, 5*time.Second) - require.NoError(t, err, "syncer failed to start") - } - - startedSyncer 
:= &StartedSyncerFixture{ - SyncerConfig: syncerConfig, - DownstreamConfig: downstreamConfig, - DownstreamKubeClient: downstreamKubeClient, - } - - // The sync target becoming ready indicates the syncer is healthy and has - // successfully sent a heartbeat to kcp. - startedSyncer.WaitForClusterReady(t, ctx) - - return startedSyncer -} - -// StartedSyncerFixture contains the configuration used to start a syncer and interact with its -// downstream cluster. -type StartedSyncerFixture struct { - SyncerConfig *syncer.SyncerConfig - - // Provide cluster-admin config and client for test purposes. The downstream config in - // SyncerConfig will be less privileged. - DownstreamConfig *rest.Config - DownstreamKubeClient kubernetes.Interface -} - -// WaitForClusterReady waits for the cluster to be ready with the given reason. -func (sf *StartedSyncerFixture) WaitForClusterReady(t *testing.T, ctx context.Context) { - cfg := sf.SyncerConfig - - kcpClusterClient, err := kcpclient.NewClusterForConfig(cfg.UpstreamConfig) - require.NoError(t, err) - kcpClient := kcpClusterClient.Cluster(cfg.SyncTargetWorkspace) - EventuallyReady(t, func() (conditions.Getter, error) { - return kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, cfg.SyncTargetName, metav1.GetOptions{}) - }, "Waiting for cluster %q condition %q", cfg.SyncTargetName, conditionsapi.ReadyCondition) - t.Logf("Cluster %q is %s", cfg.SyncTargetName, conditionsapi.ReadyCondition) -} - -// WriteLogicalClusterConfig creates a logical cluster config for the given config and -// cluster name and writes it to the test's artifact path. Useful for configuring the -// workspace plugin with --kubeconfig. -func WriteLogicalClusterConfig(t *testing.T, rawConfig clientcmdapi.Config, clusterName logicalcluster.Name, contextName string) (clientcmd.ClientConfig, string) { - logicalRawConfig := LogicalClusterRawConfig(rawConfig, clusterName, contextName) - artifactDir, err := CreateTempDirForTest(t, "artifacts") - require.NoError(t, err) - pathSafeClusterName := strings.ReplaceAll(clusterName.String(), ":", "_") - kubeconfigPath := filepath.Join(artifactDir, fmt.Sprintf("%s.kubeconfig", pathSafeClusterName)) - err = clientcmd.WriteToFile(logicalRawConfig, kubeconfigPath) - require.NoError(t, err) - logicalConfig := clientcmd.NewNonInteractiveClientConfig(logicalRawConfig, logicalRawConfig.CurrentContext, &clientcmd.ConfigOverrides{}, nil) - return logicalConfig, kubeconfigPath -} - -// syncerConfigFromCluster reads the configuration needed to start an in-process -// syncer from the resources applied to a cluster for a deployed syncer. 
-func syncerConfigFromCluster(t *testing.T, downstreamConfig *rest.Config, namespace, syncerID string) *syncer.SyncerConfig { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - downstreamKubeClient, err := kubernetesclientset.NewForConfig(downstreamConfig) - require.NoError(t, err) - - // Read the upstream kubeconfig from the syncer secret - secret, err := downstreamKubeClient.CoreV1().Secrets(namespace).Get(ctx, syncerID, metav1.GetOptions{}) - require.NoError(t, err) - upstreamConfigBytes := secret.Data[workloadcliplugin.SyncerSecretConfigKey] - require.NotEmpty(t, upstreamConfigBytes, "upstream config is required") - upstreamConfig, err := clientcmd.RESTConfigFromKubeConfig(upstreamConfigBytes) - require.NoError(t, err, "failed to load upstream config") - - // Read the arguments from the syncer deployment - deployment, err := downstreamKubeClient.AppsV1().Deployments(namespace).Get(ctx, syncerID, metav1.GetOptions{}) - require.NoError(t, err) - containers := deployment.Spec.Template.Spec.Containers - require.NotEmpty(t, containers, "expected at least one container in syncer deployment") - argMap, err := syncerArgsToMap(containers[0].Args) - require.NoError(t, err) - - require.NotEmpty(t, argMap["--sync-target-name"], "--sync-target-name is required") - syncTargetName := argMap["--sync-target-name"][0] - require.NotEmpty(t, syncTargetName, "a value for --sync-target-name is required") - - require.NotEmpty(t, argMap["--from-cluster"], "--sync-target-name is required") - fromCluster := argMap["--from-cluster"][0] - require.NotEmpty(t, fromCluster, "a value for --from-cluster is required") - kcpClusterName := logicalcluster.New(fromCluster) - - resourcesToSync := argMap["--resources"] - require.NotEmpty(t, fromCluster, "--resources is required") - - // Read the downstream token from the deployment's service account secret - var tokenSecret corev1.Secret - Eventually(t, func() (bool, string) { - secrets, err := downstreamKubeClient.CoreV1().Secrets(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - t.Errorf("failed to list secrets: %v", err) - return false, fmt.Sprintf("failed to list secrets downstream: %v", err) - } - for _, secret := range secrets.Items { - t.Logf("checking secret %s/%s for annotation %s=%s", secret.Namespace, secret.Name, corev1.ServiceAccountNameKey, syncerID) - if secret.Annotations[corev1.ServiceAccountNameKey] == syncerID { - tokenSecret = secret - return len(secret.Data["token"]) > 0, fmt.Sprintf("token secret %s/%s for service account %s found", namespace, secret.Name, syncerID) - } - } - return false, fmt.Sprintf("token secret for service account %s/%s not found", namespace, syncerID) - }, wait.ForeverTestTimeout/30, time.Millisecond*100, "token secret in namespace %q for syncer service account %q not found", namespace, syncerID) - token := tokenSecret.Data["token"] - require.NotEmpty(t, token, "token is required") - - // Compose a new downstream config that uses the token - downstreamConfigWithToken := ConfigWithToken(string(token), rest.CopyConfig(downstreamConfig)) - return &syncer.SyncerConfig{ - UpstreamConfig: upstreamConfig, - DownstreamConfig: downstreamConfigWithToken, - ResourcesToSync: sets.NewString(resourcesToSync...), - SyncTargetWorkspace: kcpClusterName, - SyncTargetName: syncTargetName, - } -} - -// syncerArgsToMap converts the cli argument list from a syncer deployment into a map -// keyed by flags. 
-func syncerArgsToMap(args []string) (map[string][]string, error) { - argMap := map[string][]string{} - for _, arg := range args { - argParts := strings.Split(arg, "=") - if len(argParts) != 2 { - return nil, fmt.Errorf("arg %q isn't of the expected form `=`", arg) - } - key, value := argParts[0], argParts[1] - if _, ok := argMap[key]; !ok { - argMap[key] = []string{value} - } else { - argMap[key] = append(argMap[key], value) - } - } - return argMap, nil -} - -// KcpCliPluginCommand returns the cli args to run the workspace plugin directly or -// via go run (depending on whether NO_GORUN is set). -func KcpCliPluginCommand() []string { - if NoGoRunEnvSet() { - return []string{"kubectl", "kcp"} - - } else { - cmdPath := filepath.Join(RepositoryDir(), "cmd", "kubectl-kcp") - return []string{"go", "run", cmdPath} - } -} - -// RunKcpCliPlugin runs the kcp workspace plugin with the provided subcommand and -// returns the combined stderr and stdout output. -func RunKcpCliPlugin(t *testing.T, kubeconfigPath string, subcommand []string) []byte { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - cmdParts := append(KcpCliPluginCommand(), subcommand...) - cmd := exec.CommandContext(ctx, cmdParts[0], cmdParts[1:]...) - - cmd.Env = os.Environ() - // TODO(marun) Consider configuring the workspace plugin with args instead of this env - cmd.Env = append(cmd.Env, fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath)) - - t.Logf("running: KUBECONFIG=%s %s", kubeconfigPath, strings.Join(cmdParts, " ")) - - var output, _, combined bytes.Buffer - var lock sync.Mutex - cmd.Stdout = split{a: locked{mu: &lock, w: &combined}, b: &output} - cmd.Stderr = locked{mu: &lock, w: &combined} - err := cmd.Run() - if err != nil { - t.Logf("kcp plugin output:\n%s", combined.String()) - } - require.NoError(t, err, "error running kcp plugin command") - return output.Bytes() -} - -type split struct { - a, b io.Writer -} - -func (w split) Write(p []byte) (int, error) { - w.a.Write(p) // nolint: errcheck - return w.b.Write(p) -} - -type locked struct { - mu *sync.Mutex - w io.Writer -} - -func (w locked) Write(p []byte) (int, error) { - w.mu.Lock() - defer w.mu.Unlock() - return w.w.Write(p) -} - -// KubectlApply runs kubectl apply -f with the supplied input piped to stdin and returns -// the combined stderr and stdout output. -func KubectlApply(t *testing.T, kubeconfigPath string, input []byte) []byte { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - cmdParts := []string{"kubectl", "--kubeconfig", kubeconfigPath, "apply", "-f", "-"} - cmd := exec.CommandContext(ctx, cmdParts[0], cmdParts[1:]...) - stdin, err := cmd.StdinPipe() - require.NoError(t, err) - _, err = stdin.Write(input) - require.NoError(t, err) - // Close to ensure kubectl doesn't keep waiting for input - err = stdin.Close() - require.NoError(t, err) - - t.Logf("running: %s", strings.Join(cmdParts, " ")) - - output, err := cmd.CombinedOutput() - if err != nil { - t.Logf("kubectl apply output:\n%s", output) - } - require.NoError(t, err) - - return output -} - -// Kubectl runs kubectl with the given arguments and returns the combined stderr and stdout. -func Kubectl(t *testing.T, kubeconfigPath string, args ...string) []byte { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - cmdParts := append([]string{"kubectl", "--kubeconfig", kubeconfigPath}, args...) - cmd := exec.CommandContext(ctx, cmdParts[0], cmdParts[1:]...) 
- t.Logf("running: %s", strings.Join(cmdParts, " ")) - - output, err := cmd.CombinedOutput() - if err != nil { - t.Logf("kubectl output:\n%s", output) - } - require.NoError(t, err) - - return output -} - -// ShardConfig returns a rest config that talk directly to the given shard. -func ShardConfig(t *testing.T, kcpClusterClient kcpclientset.ClusterInterface, shardName string, cfg *rest.Config) *rest.Config { - ctx, cancelFunc := context.WithCancel(context.Background()) - t.Cleanup(cancelFunc) - - shard, err := kcpClusterClient.Cluster(tenancyv1alpha1.RootCluster).TenancyV1alpha1().ClusterWorkspaceShards().Get(ctx, shardName, metav1.GetOptions{}) - require.NoError(t, err) - - shardCfg := rest.CopyConfig(cfg) - shardCfg.Host = shard.Spec.BaseURL - - return shardCfg -} diff --git a/test/e2e/framework/kcp.go b/test/e2e/framework/kcp.go index fe020e261cc..17123cd422b 100644 --- a/test/e2e/framework/kcp.go +++ b/test/e2e/framework/kcp.go @@ -59,6 +59,137 @@ import ( kubefixtures "github.com/kcp-dev/kcp/test/e2e/fixtures/kube" ) +// TestServerArgs returns the set of kcp args used to start a test +// server using the token auth file from the working tree. +func TestServerArgs() []string { + return TestServerArgsWithTokenAuthFile("test/e2e/framework/auth-tokens.csv") +} + +// TestServerArgsWithTokenAuthFile returns the set of kcp args used to +// start a test server with the given token auth file. +func TestServerArgsWithTokenAuthFile(tokenAuthFile string) []string { + return []string{ + "-v=4", + "--token-auth-file", tokenAuthFile, + } +} + +// KcpFixture manages the lifecycle of a set of kcp servers. +// +// Deprecated for use outside this package. Prefer PrivateKcpServer(). +type kcpFixture struct { + Servers map[string]RunningServer +} + +// PrivateKcpServer returns a new kcp server fixture managing a new +// server process that is not intended to be shared between tests. +func PrivateKcpServer(t *testing.T, args ...string) RunningServer { + serverName := "main" + f := newKcpFixture(t, kcpConfig{ + Name: serverName, + Args: args, + }) + return f.Servers[serverName] +} + +// SharedKcpServer returns a kcp server fixture intended to be shared +// between tests. A persistent server will be configured if +// `--kcp-kubeconfig` or `--use-default-kcp-server` is supplied to the test +// runner. Otherwise a test-managed server will be started. Only tests +// that are known to be hermetic are compatible with shared fixture. +func SharedKcpServer(t *testing.T) RunningServer { + serverName := "shared" + kubeconfig := TestConfig.KCPKubeconfig() + if len(kubeconfig) > 0 { + // Use a persistent server + + t.Logf("shared kcp server will target configuration %q", kubeconfig) + server, err := newPersistentKCPServer(serverName, kubeconfig, TestConfig.RootShardKubeconfig()) + require.NoError(t, err, "failed to create persistent server fixture") + return server + } + + // Use a test-provisioned server + // + // TODO(marun) Enable non-persistent fixture to be shared across + // tests. This will likely require composing tests into a suite that + // initializes the shared fixture before tests that rely on the + // fixture. + + tokenAuthFile := WriteTokenAuthFile(t) + f := newKcpFixture(t, kcpConfig{ + Name: serverName, + Args: TestServerArgsWithTokenAuthFile(tokenAuthFile), + }) + return f.Servers[serverName] +} + +// Deprecated for use outside this package. Prefer PrivateKcpServer(). 
+func newKcpFixture(t *testing.T, cfgs ...kcpConfig) *kcpFixture { + f := &kcpFixture{} + + artifactDir, dataDir, err := ScratchDirs(t) + require.NoError(t, err, "failed to create scratch dirs: %v", err) + + // Initialize servers from the provided configuration + var servers []*kcpServer + f.Servers = map[string]RunningServer{} + for _, cfg := range cfgs { + server, err := newKcpServer(t, cfg, artifactDir, dataDir) + require.NoError(t, err) + + servers = append(servers, server) + f.Servers[server.name] = server + } + + // Launch kcp servers and ensure they are ready before starting the test + start := time.Now() + t.Log("Starting kcp servers...") + wg := sync.WaitGroup{} + wg.Add(len(servers)) + for i, srv := range servers { + var opts []RunOption + if LogToConsoleEnvSet() || cfgs[i].LogToConsole { + opts = append(opts, WithLogStreaming) + } + if InProcessEnvSet() || cfgs[i].RunInProcess { + opts = append(opts, RunInProcess) + } + err := srv.Run(opts...) + require.NoError(t, err) + + // Wait for the server to become ready + go func(s *kcpServer, i int) { + defer wg.Done() + err := s.Ready(!cfgs[i].RunInProcess) + require.NoError(t, err, "kcp server %s never became ready: %v", s.name, err) + }(srv, i) + } + wg.Wait() + + if t.Failed() { + t.Fatal("Fixture setup failed: one or more servers did not become ready") + } + + t.Logf("Started kcp servers after %s", time.Since(start)) + + return f +} + +func InProcessEnvSet() bool { + inProcess, _ := strconv.ParseBool(os.Getenv("INPROCESS")) + return inProcess +} + +func LogToConsoleEnvSet() bool { + inProcess, _ := strconv.ParseBool(os.Getenv("LOG_TO_CONSOLE")) + return inProcess +} + +func preserveTestResources() bool { + return os.Getenv("PRESERVE") != "" +} + type RunningServer interface { Name() string KubeconfigPath() string @@ -644,7 +775,7 @@ func NewFakeWorkloadServer(t *testing.T, server RunningServer, org logicalcluste logicalClusterName := NewWorkspaceFixture(t, server, org) rawConfig, err := server.RawConfig() require.NoError(t, err, "failed to read config for server") - logicalConfig, kubeconfigPath := WriteLogicalClusterConfig(t, rawConfig, logicalClusterName, "base") + logicalConfig, kubeconfigPath := WriteLogicalClusterConfig(t, rawConfig, "base", logicalClusterName) fakeServer := &unmanagedKCPServer{ name: logicalClusterName.String(), cfg: logicalConfig, diff --git a/test/e2e/framework/kubectl.go b/test/e2e/framework/kubectl.go new file mode 100644 index 00000000000..232e7d97f8e --- /dev/null +++ b/test/e2e/framework/kubectl.go @@ -0,0 +1,136 @@ +/* +Copyright 2022 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +// KcpCliPluginCommand returns the cli args to run the workspace plugin directly or +// via go run (depending on whether NO_GORUN is set). 
+func KcpCliPluginCommand() []string { + if NoGoRunEnvSet() { + return []string{"kubectl", "kcp"} + + } else { + cmdPath := filepath.Join(RepositoryDir(), "cmd", "kubectl-kcp") + return []string{"go", "run", cmdPath} + } +} + +// RunKcpCliPlugin runs the kcp workspace plugin with the provided subcommand and +// returns the combined stderr and stdout output. +func RunKcpCliPlugin(t *testing.T, kubeconfigPath string, subcommand []string) []byte { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + cmdParts := append(KcpCliPluginCommand(), subcommand...) + cmd := exec.CommandContext(ctx, cmdParts[0], cmdParts[1:]...) + + cmd.Env = os.Environ() + // TODO(marun) Consider configuring the workspace plugin with args instead of this env + cmd.Env = append(cmd.Env, fmt.Sprintf("KUBECONFIG=%s", kubeconfigPath)) + + t.Logf("running: KUBECONFIG=%s %s", kubeconfigPath, strings.Join(cmdParts, " ")) + + var output, _, combined bytes.Buffer + var lock sync.Mutex + cmd.Stdout = split{a: locked{mu: &lock, w: &combined}, b: &output} + cmd.Stderr = locked{mu: &lock, w: &combined} + err := cmd.Run() + if err != nil { + t.Logf("kcp plugin output:\n%s", combined.String()) + } + require.NoError(t, err, "error running kcp plugin command") + return output.Bytes() +} + +// KubectlApply runs kubectl apply -f with the supplied input piped to stdin and returns +// the combined stderr and stdout output. +func KubectlApply(t *testing.T, kubeconfigPath string, input []byte) []byte { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + cmdParts := []string{"kubectl", "--kubeconfig", kubeconfigPath, "apply", "-f", "-"} + cmd := exec.CommandContext(ctx, cmdParts[0], cmdParts[1:]...) + stdin, err := cmd.StdinPipe() + require.NoError(t, err) + _, err = stdin.Write(input) + require.NoError(t, err) + // Close to ensure kubectl doesn't keep waiting for input + err = stdin.Close() + require.NoError(t, err) + + t.Logf("running: %s", strings.Join(cmdParts, " ")) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("kubectl apply output:\n%s", output) + } + require.NoError(t, err) + + return output +} + +// Kubectl runs kubectl with the given arguments and returns the combined stderr and stdout. +func Kubectl(t *testing.T, kubeconfigPath string, args ...string) []byte { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + cmdParts := append([]string{"kubectl", "--kubeconfig", kubeconfigPath}, args...) + cmd := exec.CommandContext(ctx, cmdParts[0], cmdParts[1:]...) + t.Logf("running: %s", strings.Join(cmdParts, " ")) + + output, err := cmd.CombinedOutput() + if err != nil { + t.Logf("kubectl output:\n%s", output) + } + require.NoError(t, err) + + return output +} + +type locked struct { + mu *sync.Mutex + w io.Writer +} + +func (w locked) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + return w.w.Write(p) +} + +type split struct { + a, b io.Writer +} + +func (w split) Write(p []byte) (int, error) { + w.a.Write(p) // nolint: errcheck + return w.b.Write(p) +} diff --git a/test/e2e/framework/syncer.go b/test/e2e/framework/syncer.go new file mode 100644 index 00000000000..58e613623b1 --- /dev/null +++ b/test/e2e/framework/syncer.go @@ -0,0 +1,437 @@ +/* +Copyright 2022 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package framework
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/kcp-dev/logicalcluster/v2"
+	"github.com/stretchr/testify/require"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+	kubernetesclientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"sigs.k8s.io/yaml"
+
+	apiresourcev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apiresource/v1alpha1"
+	conditionsapi "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/apis/conditions/v1alpha1"
+	"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"
+	kcpclient "github.com/kcp-dev/kcp/pkg/client/clientset/versioned"
+	workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin"
+	"github.com/kcp-dev/kcp/pkg/syncer"
+	"github.com/kcp-dev/kcp/pkg/syncer/shared"
+)
+
+type SyncerOption func(t *testing.T, fs *syncerFixture)
+
+func NewSyncerFixture(t *testing.T, server RunningServer, clusterName logicalcluster.Name, opts ...SyncerOption) *syncerFixture {
+	sf := &syncerFixture{
+		upstreamServer:        server,
+		workspaceClusterName:  clusterName,
+		syncTargetClusterName: clusterName,
+		syncTargetName:        "psyncer-01",
+	}
+	for _, opt := range opts {
+		opt(t, sf)
+	}
+	return sf
+}
+
+// syncerFixture configures a syncer fixture. Its `Start` method does the work of starting a syncer.
+type syncerFixture struct {
+	upstreamServer RunningServer
+
+	workspaceClusterName logicalcluster.Name
+
+	syncTargetClusterName logicalcluster.Name
+	syncTargetName        string
+
+	extraResourcesToSync []string
+	prepareDownstream    func(config *rest.Config, isFakePCluster bool)
+}
+
+func WithSyncTarget(clusterName logicalcluster.Name, name string) SyncerOption {
+	return func(t *testing.T, sf *syncerFixture) {
+		sf.syncTargetClusterName = clusterName
+		sf.syncTargetName = name
+	}
+}
+
+func WithExtraResources(resources ...string) SyncerOption {
+	return func(t *testing.T, sf *syncerFixture) {
+		sf.extraResourcesToSync = append(sf.extraResourcesToSync, resources...)
+	}
+}
+
+func WithDownstreamPreparation(prepare func(config *rest.Config, isFakePCluster bool)) SyncerOption {
+	return func(t *testing.T, sf *syncerFixture) {
+		sf.prepareDownstream = prepare
+	}
+}
+
+// Start starts a new syncer against the given upstream kcp workspace. Whether the syncer runs
+// in-process or is deployed on a pcluster depends on whether --pcluster-kubeconfig and
+// --syncer-image are supplied to the test invocation.
+func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
+	// Write the upstream logical cluster config to disk for the workspace plugin
+	upstreamRawConfig, err := sf.upstreamServer.RawConfig()
+	require.NoError(t, err)
+	_, kubeconfigPath := WriteLogicalClusterConfig(t, upstreamRawConfig, "base", sf.workspaceClusterName)
+
+	useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0
+
+	syncerImage := TestConfig.SyncerImage()
+	if useDeployedSyncer {
+		require.NotZero(t, len(syncerImage), "--syncer-image must be specified if testing with a deployed syncer")
+	} else {
+		// The image needs to be a non-empty string for the plugin command but the value doesn't matter if not deploying a syncer.
+		syncerImage = "not-a-valid-image"
+	}
+
+	// Run the plugin command to enable the syncer and collect the resulting yaml
+	t.Logf("Configuring workspace %s for syncing", sf.workspaceClusterName)
+	pluginArgs := []string{
+		"workload",
+		"sync",
+		sf.syncTargetName,
+		"--syncer-image", syncerImage,
+		"--output-file", "-",
+		"--qps", "-1",
+	}
+	for _, resource := range sf.extraResourcesToSync {
+		pluginArgs = append(pluginArgs, "--resources", resource)
+	}
+	syncerYAML := RunKcpCliPlugin(t, kubeconfigPath, pluginArgs)
+
+	var downstreamConfig *rest.Config
+	var downstreamKubeconfigPath string
+	if useDeployedSyncer {
+		// The syncer will target the pcluster identified by `--pcluster-kubeconfig`.
+		downstreamKubeconfigPath = TestConfig.PClusterKubeconfig()
+		fs, err := os.Stat(downstreamKubeconfigPath)
+		require.NoError(t, err)
+		require.NotZero(t, fs.Size(), "%s points to an empty file", downstreamKubeconfigPath)
+		rawConfig, err := clientcmd.LoadFromFile(downstreamKubeconfigPath)
+		require.NoError(t, err, "failed to load pcluster kubeconfig")
+		config := clientcmd.NewNonInteractiveClientConfig(*rawConfig, rawConfig.CurrentContext, nil, nil)
+		downstreamConfig, err = config.ClientConfig()
+		require.NoError(t, err)
+	} else {
+		// The syncer will target a logical cluster that is a peer to the current workspace. A
+		// logical server provides a lightweight approximation of a pcluster for tests that
+		// don't need to validate running workloads or interaction with kube controllers.
+		parentClusterName, ok := sf.workspaceClusterName.Parent()
+		require.True(t, ok, "%s does not have a parent", sf.workspaceClusterName)
+		downstreamServer := NewFakeWorkloadServer(t, sf.upstreamServer, parentClusterName)
+		downstreamConfig = downstreamServer.BaseConfig(t)
+		downstreamKubeconfigPath = downstreamServer.KubeconfigPath()
+	}
+
+	if sf.prepareDownstream != nil {
+		// Attempt crd installation to ensure the downstream server has an api surface
+		// compatible with the test.
+ sf.prepareDownstream(downstreamConfig, !useDeployedSyncer) + } + + // Apply the yaml output from the plugin to the downstream server + KubectlApply(t, downstreamKubeconfigPath, syncerYAML) + + artifactDir, err := CreateTempDirForTest(t, "artifacts") + if err != nil { + t.Errorf("failed to create temp dir for syncer artifacts: %v", err) + } + + // collect both in deployed and in-process mode + t.Cleanup(func() { + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) + defer cancelFn() + + t.Logf("Collecting imported resource info: %s", artifactDir) + upstreamCfg := sf.upstreamServer.BaseConfig(t) + + gather := func(client dynamic.Interface, gvr schema.GroupVersionResource) { + resourceClient := client.Resource(gvr) + + list, err := resourceClient.List(ctx, metav1.ListOptions{}) + if err != nil { + // Don't fail the test + t.Logf("Error gathering %s: %v", gvr, err) + return + } + + for i := range list.Items { + item := list.Items[i] + sf.upstreamServer.Artifact(t, func() (runtime.Object, error) { + return &item, nil + }) + } + } + + upstreamDynamic, err := dynamic.NewClusterForConfig(upstreamCfg) + require.NoError(t, err, "error creating upstream dynamic client") + + downstreamDynamic, err := dynamic.NewForConfig(downstreamConfig) + require.NoError(t, err, "error creating downstream dynamic client") + + gather(upstreamDynamic.Cluster(sf.workspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("apiresourceimports")) + gather(upstreamDynamic.Cluster(sf.workspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("negotiatedapiresources")) + gather(upstreamDynamic.Cluster(sf.workspaceClusterName), corev1.SchemeGroupVersion.WithResource("namespaces")) + gather(downstreamDynamic, corev1.SchemeGroupVersion.WithResource("namespaces")) + gather(upstreamDynamic.Cluster(sf.workspaceClusterName), appsv1.SchemeGroupVersion.WithResource("deployments")) + gather(downstreamDynamic, appsv1.SchemeGroupVersion.WithResource("deployments")) + }) + + // Extract the configuration for an in-process syncer from the resources that were + // applied to the downstream server. This maximizes the parity between the + // configuration of a deployed and in-process syncer. 
+ var syncerID string + for _, doc := range strings.Split(string(syncerYAML), "\n---\n") { + var manifest struct { + metav1.ObjectMeta `json:"metadata"` + } + err := yaml.Unmarshal([]byte(doc), &manifest) + require.NoError(t, err) + if manifest.Namespace != "" { + syncerID = manifest.Namespace + break + } + } + require.NotEmpty(t, syncerID, "failed to extract syncer namespace from yaml produced by plugin:\n%s", string(syncerYAML)) + + syncerConfig := syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID) + + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + downstreamKubeClient, err := kubernetesclientset.NewForConfig(downstreamConfig) + require.NoError(t, err) + + if useDeployedSyncer { + t.Cleanup(func() { + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) + defer cancelFn() + + // collect syncer logs + t.Logf("Collecting syncer pod logs") + func() { + t.Logf("Listing downstream pods in namespace %s", syncerID) + pods, err := downstreamKubeClient.CoreV1().Pods(syncerID).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Logf("failed to list pods in %s: %v", syncerID, err) + return + } + + for _, pod := range pods.Items { + artifactPath := filepath.Join(artifactDir, fmt.Sprintf("syncer-%s-%s.log", syncerID, pod.Name)) + + t.Logf("Collecting downstream logs for pod %s/%s: %s", syncerID, pod.Name, artifactPath) + logs := Kubectl(t, downstreamKubeconfigPath, "-n", syncerID, "logs", pod.Name) + + err = ioutil.WriteFile(artifactPath, logs, 0644) + if err != nil { + t.Logf("failed to write logs for pod %s in %s to %s: %v", pod.Name, syncerID, artifactPath, err) + continue // not fatal + } + } + }() + + if preserveTestResources() { + return + } + + t.Logf("Deleting syncer resources for logical cluster %q, sync target %q", sf.workspaceClusterName, syncerConfig.SyncTargetName) + err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("failed to delete Namespace %q: %v", syncerID, err) + } + err = downstreamKubeClient.RbacV1().ClusterRoleBindings().Delete(ctx, syncerID, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("failed to delete ClusterRoleBinding %q: %v", syncerID, err) + } + err = downstreamKubeClient.RbacV1().ClusterRoles().Delete(ctx, syncerID, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("failed to delete ClusterRole %q: %v", syncerID, err) + } + + t.Logf("Deleting synced resources for logical cluster %s, sync target %s|%s", sf.workspaceClusterName, syncerConfig.SyncTargetWorkspace, syncerConfig.SyncTargetName) + namespaces, err := downstreamKubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("failed to list namespaces: %v", err) + } + for _, ns := range namespaces.Items { + locator, exists, err := shared.LocatorFromAnnotations(ns.Annotations) + require.NoError(t, err, "failed to extract locator from namespace %s", ns.Name) + if !exists { + continue // Not a kcp-synced namespace + } + if locator.Workspace != sf.workspaceClusterName { + continue // Not a namespace synced from this upstream workspace + } + if locator.SyncTarget.Workspace != syncerConfig.SyncTargetWorkspace.String() || + locator.SyncTarget.Name != syncerConfig.SyncTargetName { + continue // Not a namespace synced by this syncer + } + if err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{}); err != nil { + t.Logf("failed to delete Namespace %q: %v", ns.Name, err) + } + } + 
}) + } else { + // Start an in-process syncer + err := syncer.StartSyncer(ctx, syncerConfig, 2, 5*time.Second) + require.NoError(t, err, "syncer failed to start") + } + + startedSyncer := &StartedSyncerFixture{ + SyncerConfig: syncerConfig, + DownstreamConfig: downstreamConfig, + DownstreamKubeClient: downstreamKubeClient, + } + + // The sync target becoming ready indicates the syncer is healthy and has + // successfully sent a heartbeat to kcp. + startedSyncer.WaitForClusterReady(t, ctx) + + return startedSyncer +} + +// StartedSyncerFixture contains the configuration used to start a syncer and interact with its +// downstream cluster. +type StartedSyncerFixture struct { + SyncerConfig *syncer.SyncerConfig + + // Provide cluster-admin config and client for test purposes. The downstream config in + // SyncerConfig will be less privileged. + DownstreamConfig *rest.Config + DownstreamKubeClient kubernetes.Interface +} + +// WaitForClusterReady waits for the fixture's sync target to report the Ready condition, +// which indicates the syncer has successfully heartbeated to kcp. +func (sf *StartedSyncerFixture) WaitForClusterReady(t *testing.T, ctx context.Context) { + cfg := sf.SyncerConfig + + kcpClusterClient, err := kcpclient.NewClusterForConfig(cfg.UpstreamConfig) + require.NoError(t, err) + kcpClient := kcpClusterClient.Cluster(cfg.SyncTargetWorkspace) + EventuallyReady(t, func() (conditions.Getter, error) { + return kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, cfg.SyncTargetName, metav1.GetOptions{}) + }, "Waiting for cluster %q condition %q", cfg.SyncTargetName, conditionsapi.ReadyCondition) + t.Logf("Cluster %q is %s", cfg.SyncTargetName, conditionsapi.ReadyCondition) +} + +// syncerConfigFromCluster reads the configuration needed to start an in-process +// syncer from the resources applied to a cluster for a deployed syncer.
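+// It reads the upstream kubeconfig from the syncer secret, the sync target name, cluster and +// resources from the syncer deployment's container args, and the downstream service account +// token from the secret annotated with the syncer's service account name.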
+func syncerConfigFromCluster(t *testing.T, downstreamConfig *rest.Config, namespace, syncerID string) *syncer.SyncerConfig { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + downstreamKubeClient, err := kubernetesclientset.NewForConfig(downstreamConfig) + require.NoError(t, err) + + // Read the upstream kubeconfig from the syncer secret + secret, err := downstreamKubeClient.CoreV1().Secrets(namespace).Get(ctx, syncerID, metav1.GetOptions{}) + require.NoError(t, err) + upstreamConfigBytes := secret.Data[workloadcliplugin.SyncerSecretConfigKey] + require.NotEmpty(t, upstreamConfigBytes, "upstream config is required") + upstreamConfig, err := clientcmd.RESTConfigFromKubeConfig(upstreamConfigBytes) + require.NoError(t, err, "failed to load upstream config") + + // Read the arguments from the syncer deployment + deployment, err := downstreamKubeClient.AppsV1().Deployments(namespace).Get(ctx, syncerID, metav1.GetOptions{}) + require.NoError(t, err) + containers := deployment.Spec.Template.Spec.Containers + require.NotEmpty(t, containers, "expected at least one container in syncer deployment") + argMap, err := syncerArgsToMap(containers[0].Args) + require.NoError(t, err) + + require.NotEmpty(t, argMap["--sync-target-name"], "--sync-target-name is required") + syncTargetName := argMap["--sync-target-name"][0] + require.NotEmpty(t, syncTargetName, "a value for --sync-target-name is required") + + require.NotEmpty(t, argMap["--from-cluster"], "--from-cluster is required") + fromCluster := argMap["--from-cluster"][0] + require.NotEmpty(t, fromCluster, "a value for --from-cluster is required") + kcpClusterName := logicalcluster.New(fromCluster) + + resourcesToSync := argMap["--resources"] + require.NotEmpty(t, resourcesToSync, "--resources is required") + + // Read the downstream token from the deployment's service account secret + var tokenSecret corev1.Secret + Eventually(t, func() (bool, string) { + secrets, err := downstreamKubeClient.CoreV1().Secrets(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("failed to list secrets: %v", err) + return false, fmt.Sprintf("failed to list secrets downstream: %v", err) + } + for _, secret := range secrets.Items { + t.Logf("checking secret %s/%s for annotation %s=%s", secret.Namespace, secret.Name, corev1.ServiceAccountNameKey, syncerID) + if secret.Annotations[corev1.ServiceAccountNameKey] == syncerID { + tokenSecret = secret + return len(secret.Data["token"]) > 0, fmt.Sprintf("token secret %s/%s for service account %s found", namespace, secret.Name, syncerID) + } + } + return false, fmt.Sprintf("token secret for service account %s/%s not found", namespace, syncerID) + }, wait.ForeverTestTimeout/30, time.Millisecond*100, "token secret in namespace %q for syncer service account %q not found", namespace, syncerID) + token := tokenSecret.Data["token"] + require.NotEmpty(t, token, "token is required") + + // Compose a new downstream config that uses the token + downstreamConfigWithToken := ConfigWithToken(string(token), rest.CopyConfig(downstreamConfig)) + return &syncer.SyncerConfig{ + UpstreamConfig: upstreamConfig, + DownstreamConfig: downstreamConfigWithToken, + ResourcesToSync: sets.NewString(resourcesToSync...), + SyncTargetWorkspace: kcpClusterName, + SyncTargetName: syncTargetName, + } +} + +// syncerArgsToMap converts the cli argument list from a syncer deployment into a map +// keyed by flags.
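+// For example, []string{"--from-cluster=root:org:ws", "--resources=services", "--resources=ingresses.networking.k8s.io"} +// yields map[string][]string{"--from-cluster": {"root:org:ws"}, "--resources": {"services", "ingresses.networking.k8s.io"}}.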
+func syncerArgsToMap(args []string) (map[string][]string, error) { + argMap := map[string][]string{} + for _, arg := range args { + argParts := strings.Split(arg, "=") + if len(argParts) != 2 { + return nil, fmt.Errorf("arg %q isn't of the expected form `key=value`", arg) + } + key, value := argParts[0], argParts[1] + if _, ok := argMap[key]; !ok { + argMap[key] = []string{value} + } else { + argMap[key] = append(argMap[key], value) + } + } + return argMap, nil +} diff --git a/test/e2e/framework/workspaces.go b/test/e2e/framework/workspaces.go new file mode 100644 index 00000000000..97221bc7226 --- /dev/null +++ b/test/e2e/framework/workspaces.go @@ -0,0 +1,199 @@ +/* +Copyright 2022 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package framework + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/kcp-dev/logicalcluster/v2" + "github.com/stretchr/testify/require" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/yaml" + + tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1" + kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned" +) + +type ClusterWorkspaceOption func(ws *tenancyv1alpha1.ClusterWorkspace) + +func WithShardConstraints(c tenancyv1alpha1.ShardConstraints) ClusterWorkspaceOption { + return func(ws *tenancyv1alpha1.ClusterWorkspace) { + ws.Spec.Shard = &c + } +} + +func WithType(path logicalcluster.Name, name tenancyv1alpha1.ClusterWorkspaceTypeName) ClusterWorkspaceOption { + return func(ws *tenancyv1alpha1.ClusterWorkspace) { + ws.Spec.Type = tenancyv1alpha1.ClusterWorkspaceTypeReference{ + Name: name, + Path: path.String(), + } + } +} + +func WithName(s string, formatArgs ...interface{}) ClusterWorkspaceOption { + return func(ws *tenancyv1alpha1.ClusterWorkspace) { + ws.Name = fmt.Sprintf(s, formatArgs...) + ws.GenerateName = "" + } +} + +func NewWorkspaceFixture(t *testing.T, server RunningServer, orgClusterName logicalcluster.Name, options ...ClusterWorkspaceOption) (clusterName logicalcluster.Name) { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + cfg := server.BaseConfig(t) + clusterClient, err := kcpclientset.NewClusterForConfig(cfg) + require.NoError(t, err, "failed to construct client for server") + + tmpl := &tenancyv1alpha1.ClusterWorkspace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "e2e-workspace-", + }, + Spec: tenancyv1alpha1.ClusterWorkspaceSpec{ + Type: tenancyv1alpha1.ClusterWorkspaceTypeReference{ + Name: tenancyv1alpha1.ClusterWorkspaceTypeName("universal"), + Path: "root", + }, + }, + } + for _, opt := range options { + opt(tmpl) + } + + // we are referring here to a ClusterWorkspaceType that may have just been created; if the admission controller + // does not have a fresh enough cache, our request will be denied as the admission controller does not know the + // type exists. Therefore, we can require.Eventually our way out of this problem.
We expect users to create new + // types very infrequently, so we do not think this will be a serious UX issue in the product. + var ws *tenancyv1alpha1.ClusterWorkspace + require.Eventually(t, func() bool { + var err error + ws, err = clusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces().Create(ctx, tmpl, metav1.CreateOptions{}) + if err != nil { + t.Logf("error creating workspace under %s: %v", orgClusterName, err) + } + return err == nil + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to create workspace under %s", orgClusterName) + + t.Cleanup(func() { + if preserveTestResources() { + return + } + + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) + defer cancelFn() + + err := clusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces().Delete(ctx, ws.Name, metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) || apierrors.IsForbidden(err) { + return // ignore not found and forbidden because this probably means the parent has been deleted + } + require.NoErrorf(t, err, "failed to delete workspace %s", ws.Name) + }) + + Eventually(t, func() (bool, string) { + ws, err := clusterClient.Cluster(orgClusterName).TenancyV1alpha1().ClusterWorkspaces().Get(ctx, ws.Name, metav1.GetOptions{}) + require.Falsef(t, apierrors.IsNotFound(err), "workspace %s was deleted", ws.Name) + if err != nil { + t.Logf("failed to get workspace %s: %v", ws.Name, err) + return false, err.Error() + } + return ws.Status.Phase == tenancyv1alpha1.ClusterWorkspacePhaseReady, toYaml(t, ws) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for workspace %s to become ready", orgClusterName.Join(ws.Name)) + + wsClusterName := orgClusterName.Join(ws.Name) + t.Logf("Created %s workspace %s", ws.Spec.Type, wsClusterName) + return wsClusterName +} + +func NewOrganizationFixture(t *testing.T, server RunningServer, options ...ClusterWorkspaceOption) (orgClusterName logicalcluster.Name) { + ctx, cancelFunc := context.WithCancel(context.Background()) + t.Cleanup(cancelFunc) + + cfg := server.BaseConfig(t) + clusterClient, err := kcpclientset.NewForConfig(cfg) + require.NoError(t, err, "failed to create kcp cluster client") + + tmpl := &tenancyv1alpha1.ClusterWorkspace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "e2e-org-", + }, + Spec: tenancyv1alpha1.ClusterWorkspaceSpec{ + Type: tenancyv1alpha1.ClusterWorkspaceTypeReference{ + Name: "organization", + Path: "root", + }, + }, + } + for _, opt := range options { + opt(tmpl) + } + + // we are referring here to a ClusterWorkspaceType that may have just been created; if the admission controller + // does not have a fresh enough cache, our request will be denied as the admission controller does not know the + // type exists. Therefore, we can require.Eventually our way out of this problem. We expect users to create new + // types very infrequently, so we do not think this will be a serious UX issue in the product. 
+ var org *tenancyv1alpha1.ClusterWorkspace + require.Eventually(t, func() bool { + var err error + org, err = clusterClient.TenancyV1alpha1().ClusterWorkspaces().Create(logicalcluster.WithCluster(ctx, tenancyv1alpha1.RootCluster), tmpl, metav1.CreateOptions{}) + if err != nil { + t.Logf("error creating org workspace under %s: %v", tenancyv1alpha1.RootCluster, err) + } + return err == nil + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to create org workspace under %s", tenancyv1alpha1.RootCluster) + + t.Cleanup(func() { + if preserveTestResources() { + return + } + + ctx, cancelFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) + defer cancelFn() + + err := clusterClient.TenancyV1alpha1().ClusterWorkspaces().Delete(logicalcluster.WithCluster(ctx, tenancyv1alpha1.RootCluster), org.Name, metav1.DeleteOptions{}) + if apierrors.IsNotFound(err) { + return // ignore not found error + } + require.NoErrorf(t, err, "failed to delete organization workspace %s", org.Name) + }) + + Eventually(t, func() (bool, string) { + ws, err := clusterClient.TenancyV1alpha1().ClusterWorkspaces().Get(logicalcluster.WithCluster(ctx, tenancyv1alpha1.RootCluster), org.Name, metav1.GetOptions{}) + require.Falsef(t, apierrors.IsNotFound(err), "workspace %s was deleted", org.Name) + if err != nil { + t.Logf("failed to get workspace %s: %v", org.Name, err) + return false, "" + } + return ws.Status.Phase == tenancyv1alpha1.ClusterWorkspacePhaseReady, toYaml(t, ws.Status.Conditions) + }, wait.ForeverTestTimeout, time.Millisecond*100, "failed to wait for organization workspace %s to become ready", org.Name) + + clusterName := tenancyv1alpha1.RootCluster.Join(org.Name) + t.Logf("Created organization workspace %s", clusterName) + return clusterName +} + +func toYaml(t *testing.T, obj interface{}) string { + bs, err := yaml.Marshal(obj) + require.NoError(t, err) + return string(bs) +} diff --git a/test/e2e/reconciler/cluster/controller_test.go b/test/e2e/reconciler/cluster/controller_test.go index adb35575151..95b49ab9d89 100644 --- a/test/e2e/reconciler/cluster/controller_test.go +++ b/test/e2e/reconciler/cluster/controller_test.go @@ -31,7 +31,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" kubernetesclient "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" @@ -176,19 +175,16 @@ func TestClusterController(t *testing.T) { sourceKubeClient := sourceKubeClusterClient.Cluster(wsClusterName) sourceWildwestClient := sourceWildwestClusterClient.Cluster(wsClusterName) - syncerFixture := framework.SyncerFixture{ - ResourcesToSync: sets.NewString("cowboys.wildwest.dev"), - UpstreamServer: source, - WorkspaceClusterName: wsClusterName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { + syncerFixture := framework.NewSyncerFixture(t, source, wsClusterName, + framework.WithExtraResources("cowboys.wildwest.dev"), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { // Always install the crd regardless of whether the target is // logical or not since cowboys is not a native type. 
sinkCrdClient, err := apiextensionsclientset.NewForConfig(config) require.NoError(t, err) t.Log("Installing test CRDs into sink cluster...") fixturewildwest.Create(t, sinkCrdClient.ApiextensionsV1().CustomResourceDefinitions(), metav1.GroupResource{Group: wildwest.GroupName, Resource: "cowboys"}) - }, - }.Start(t) + })).Start(t) sinkWildwestClient, err := wildwestclientset.NewForConfig(syncerFixture.DownstreamConfig) require.NoError(t, err) diff --git a/test/e2e/reconciler/ingress/controller_test.go b/test/e2e/reconciler/ingress/controller_test.go index 4f11eb797fd..8ecafe40961 100644 --- a/test/e2e/reconciler/ingress/controller_test.go +++ b/test/e2e/reconciler/ingress/controller_test.go @@ -161,12 +161,10 @@ func TestIngressController(t *testing.T) { sourceKcpClient := sourceKcpClusterClient.Cluster(clusterName) t.Logf("Deploy syncer") - syncerFixture := framework.SyncerFixture{ - ResourcesToSync: sets.NewString("ingresses.networking.k8s.io", "services"), - UpstreamServer: source, - WorkspaceClusterName: clusterName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { - if !isLogicalCluster { + syncerFixture := framework.NewSyncerFixture(t, source, clusterName, + framework.WithExtraResources("ingresses.networking.k8s.io", "services"), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { + if !isFakePCluster { // Only need to install services and ingresses in a logical cluster return } @@ -178,8 +176,8 @@ func TestIngressController(t *testing.T) { metav1.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, ) require.NoError(t, err) - }, - }.Start(t) + }), + ).Start(t) t.Log("Wait for \"kubernetes\" apiexport") require.Eventually(t, func() bool { diff --git a/test/e2e/reconciler/namespace/controller_test.go b/test/e2e/reconciler/namespace/controller_test.go index da47d58b36f..546c217443b 100644 --- a/test/e2e/reconciler/namespace/controller_test.go +++ b/test/e2e/reconciler/namespace/controller_test.go @@ -91,10 +91,7 @@ func TestNamespaceScheduler(t *testing.T) { // TODO(marun) Extract the heartbeater out of the syncer for reuse in a test fixture. The namespace // controller just needs ready clusters which can be accomplished without a syncer by having the // heartbeater update the sync target so the heartbeat controller can set the cluster ready. 
- syncerFixture := framework.SyncerFixture{ - UpstreamServer: server, - WorkspaceClusterName: server.clusterName, - }.Start(t) + syncerFixture := framework.NewSyncerFixture(t, server, server.clusterName).Start(t) syncTargetName := syncerFixture.SyncerConfig.SyncTargetName t.Log("Wait for \"kubernetes\" apiexport") diff --git a/test/e2e/reconciler/scheduling/controller_test.go b/test/e2e/reconciler/scheduling/controller_test.go index 89988e744a6..9ec36d4cc33 100644 --- a/test/e2e/reconciler/scheduling/controller_test.go +++ b/test/e2e/reconciler/scheduling/controller_test.go @@ -72,13 +72,11 @@ func TestScheduling(t *testing.T) { syncTargetName := fmt.Sprintf("synctarget-%d", +rand.Intn(1000000)) t.Logf("Creating a SyncTarget and syncer in %s", negotiationClusterName) - syncerFixture := framework.SyncerFixture{ - ResourcesToSync: sets.NewString("services"), - UpstreamServer: source, - WorkspaceClusterName: negotiationClusterName, - SyncTargetName: syncTargetName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { - if !isLogicalCluster { + syncerFixture := framework.NewSyncerFixture(t, source, negotiationClusterName, + framework.WithExtraResources("services"), + framework.WithSyncTarget(negotiationClusterName, syncTargetName), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { + if !isFakePCluster { // Only need to install services and ingresses in a logical cluster return } @@ -89,8 +87,8 @@ func TestScheduling(t *testing.T) { metav1.GroupResource{Group: "core.k8s.io", Resource: "services"}, ) require.NoError(t, err) - }, - }.Start(t) + }), + ).Start(t) t.Logf("Wait for APIResourceImports to show up in the negotiation workspace") require.Eventually(t, func() bool { diff --git a/test/e2e/reconciler/scheduling/multi_placements_test.go b/test/e2e/reconciler/scheduling/multi_placements_test.go index ded543dc382..cca306412cd 100644 --- a/test/e2e/reconciler/scheduling/multi_placements_test.go +++ b/test/e2e/reconciler/scheduling/multi_placements_test.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -68,13 +67,11 @@ func TestMultiPlacement(t *testing.T) { firstSyncTargetName := fmt.Sprintf("synctarget-%d", +rand.Intn(1000000)) t.Logf("Creating a SyncTarget and syncer in %s", locationClusterName) - firstSyncerFixture := framework.SyncerFixture{ - ResourcesToSync: sets.NewString("services"), - UpstreamServer: source, - WorkspaceClusterName: locationClusterName, - SyncTargetName: firstSyncTargetName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { - if !isLogicalCluster { + firstSyncerFixture := framework.NewSyncerFixture(t, source, locationClusterName, + framework.WithSyncTarget(locationClusterName, firstSyncTargetName), + framework.WithExtraResources("services"), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { + if !isFakePCluster { // Only need to install services and ingresses in a logical cluster return } @@ -85,18 +82,16 @@ func TestMultiPlacement(t *testing.T) { metav1.GroupResource{Group: "core.k8s.io", Resource: "services"}, ) require.NoError(t, err) - }, - }.Start(t) + }), + ).Start(t) secondSyncTargetName := fmt.Sprintf("synctarget-%d", +rand.Intn(1000000)) t.Logf("Creating a SyncTarget and syncer in %s", locationClusterName) - secondSyncerFixture := 
framework.SyncerFixture{ - ResourcesToSync: sets.NewString("services"), - UpstreamServer: source, - WorkspaceClusterName: locationClusterName, - SyncTargetName: secondSyncTargetName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { - if !isLogicalCluster { + secondSyncerFixture := framework.NewSyncerFixture(t, source, locationClusterName, + framework.WithExtraResources("services"), + framework.WithSyncTarget(locationClusterName, secondSyncTargetName), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { + if !isFakePCluster { // Only need to install services and ingresses in a logical cluster return } @@ -107,8 +102,8 @@ func TestMultiPlacement(t *testing.T) { metav1.GroupResource{Group: "core.k8s.io", Resource: "services"}, ) require.NoError(t, err) - }, - }.Start(t) + }), + ).Start(t) t.Log("Label synctarget") patchData1 := `{"metadata":{"labels":{"loc":"loc1"}}}` diff --git a/test/e2e/reconciler/scheduling/placement_scheduler_test.go b/test/e2e/reconciler/scheduling/placement_scheduler_test.go index 5f600fc0c22..c68b318145f 100644 --- a/test/e2e/reconciler/scheduling/placement_scheduler_test.go +++ b/test/e2e/reconciler/scheduling/placement_scheduler_test.go @@ -29,7 +29,6 @@ import ( apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -67,13 +66,11 @@ func TestPlacementUpdate(t *testing.T) { firstSyncTargetName := fmt.Sprintf("synctarget-%d", +rand.Intn(1000000)) t.Logf("Creating a SyncTarget and syncer in %s", locationClusterName) - syncerFixture := framework.SyncerFixture{ - ResourcesToSync: sets.NewString("services"), - UpstreamServer: source, - WorkspaceClusterName: locationClusterName, - SyncTargetName: firstSyncTargetName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { - if !isLogicalCluster { + syncerFixture := framework.NewSyncerFixture(t, source, locationClusterName, + framework.WithSyncTarget(locationClusterName, firstSyncTargetName), + framework.WithExtraResources("services"), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { + if !isFakePCluster { // Only need to install services and ingresses in a logical cluster return } @@ -84,8 +81,8 @@ func TestPlacementUpdate(t *testing.T) { metav1.GroupResource{Group: "core.k8s.io", Resource: "services"}, ) require.NoError(t, err) - }, - }.Start(t) + }), + ).Start(t) t.Log("Wait for \"default\" location") require.Eventually(t, func() bool { diff --git a/test/e2e/syncer/syncer_test.go b/test/e2e/syncer/syncer_test.go index 017cb68c102..23484ffd029 100644 --- a/test/e2e/syncer/syncer_test.go +++ b/test/e2e/syncer/syncer_test.go @@ -64,10 +64,7 @@ func TestSyncerLifecycle(t *testing.T) { // its sync target to go ready. This implicitly validates the syncer // heartbeating and the heartbeat controller setting the sync target ready in // response. 
- syncerFixture := framework.SyncerFixture{ - UpstreamServer: upstreamServer, - WorkspaceClusterName: wsClusterName, - }.Start(t) + syncerFixture := framework.NewSyncerFixture(t, upstreamServer, wsClusterName).Start(t) ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(cancelFunc) @@ -88,10 +85,10 @@ func TestSyncerLifecycle(t *testing.T) { downstreamKubeClient, err := kubernetesclientset.NewForConfig(syncerFixture.DownstreamConfig) require.NoError(t, err) - kcpClient, err := kcpclientset.NewForConfig(syncerFixture.SyncerConfig.UpstreamConfig) + upstreamKcpClient, err := kcpclientset.NewForConfig(syncerFixture.SyncerConfig.UpstreamConfig) require.NoError(t, err) - syncTarget, err := kcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, + syncTarget, err := upstreamKcpClient.WorkloadV1alpha1().SyncTargets().Get(ctx, syncerFixture.SyncerConfig.SyncTargetName, metav1.GetOptions{}, ) @@ -449,7 +446,7 @@ func TestSyncWorkload(t *testing.T) { // Write the upstream logical cluster config to disk for the workspace plugin upstreamRawConfig, err := upstreamServer.RawConfig() require.NoError(t, err) - _, kubeconfigPath := framework.WriteLogicalClusterConfig(t, upstreamRawConfig, wsClusterName, "base") + _, kubeconfigPath := framework.WriteLogicalClusterConfig(t, upstreamRawConfig, "base", wsClusterName) subCommand := []string{ "workload", @@ -482,7 +479,7 @@ func TestCordonUncordonDrain(t *testing.T) { // Write the upstream logical cluster config to disk for the workspace plugin upstreamRawConfig, err := upstreamServer.RawConfig() require.NoError(t, err) - _, kubeconfigPath := framework.WriteLogicalClusterConfig(t, upstreamRawConfig, wsClusterName, "base") + _, kubeconfigPath := framework.WriteLogicalClusterConfig(t, upstreamRawConfig, "base", wsClusterName) clients, err := clientset.NewClusterForConfig(upstreamCfg) require.NoError(t, err, "failed to construct client for server") @@ -492,10 +489,7 @@ func TestCordonUncordonDrain(t *testing.T) { // its sync target to go ready. This implicitly validates the syncer // heartbeating and the heartbeat controller setting the sync target ready in // response. 
- syncerFixture := framework.SyncerFixture{ - UpstreamServer: upstreamServer, - WorkspaceClusterName: wsClusterName, - }.Start(t) + syncerFixture := framework.NewSyncerFixture(t, upstreamServer, wsClusterName).Start(t) syncTargetName := syncerFixture.SyncerConfig.SyncTargetName ctx, cancelFunc := context.WithCancel(context.Background()) diff --git a/test/e2e/virtual/syncer/virtualworkspace_test.go b/test/e2e/virtual/syncer/virtualworkspace_test.go index 0d1eafb5c33..45229192a4d 100644 --- a/test/e2e/virtual/syncer/virtualworkspace_test.go +++ b/test/e2e/virtual/syncer/virtualworkspace_test.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/discovery" clientgodiscovery "k8s.io/client-go/discovery" @@ -568,13 +567,11 @@ func TestSyncerVirtualWorkspace(t *testing.T) { kubelikeWorkspace := framework.NewWorkspaceFixture(t, server, orgClusterName) t.Logf("Deploying syncer into workspace %s", kubelikeWorkspace) - _ = framework.SyncerFixture{ - ResourcesToSync: sets.NewString("ingresses.networking.k8s.io", "services"), - UpstreamServer: server, - WorkspaceClusterName: kubelikeWorkspace, - SyncTargetName: "kubelike", - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { - if !isLogicalCluster { + _ = framework.NewSyncerFixture(t, server, kubelikeWorkspace, + framework.WithSyncTarget(kubelikeWorkspace, "kubelike"), + framework.WithExtraResources("ingresses.networking.k8s.io", "services"), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { + if !isFakePCluster { // Only need to install services and ingresses in a logical cluster return } @@ -586,8 +583,8 @@ func TestSyncerVirtualWorkspace(t *testing.T) { metav1.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}, ) require.NoError(t, err) - }, - }.Start(t) + }), + ).Start(t) t.Log("Waiting for ingresses crd to be imported and available in the kubelike source cluster...") require.Eventually(t, func() bool { @@ -644,20 +641,18 @@ func TestSyncerVirtualWorkspace(t *testing.T) { wildwestSyncTargetName := fmt.Sprintf("wildwest-%d", +rand.Intn(1000000)) t.Logf("Deploying syncer into workspace %s", wildwestWorkspace) - _ = framework.SyncerFixture{ - ResourcesToSync: sets.NewString("cowboys.wildwest.dev"), - UpstreamServer: server, - WorkspaceClusterName: wildwestWorkspace, - SyncTargetName: wildwestSyncTargetName, - InstallCRDs: func(config *rest.Config, isLogicalCluster bool) { + _ = framework.NewSyncerFixture(t, server, wildwestWorkspace, + framework.WithExtraResources("cowboys.wildwest.dev"), + framework.WithSyncTarget(wildwestWorkspace, wildwestSyncTargetName), + framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) { // Always install the crd regardless of whether the target is // logical or not since cowboys is not a native type. sinkCrdClient, err := apiextensionsclientset.NewForConfig(config) require.NoError(t, err) t.Log("Installing test CRDs into sink cluster...") fixturewildwest.Create(t, sinkCrdClient.ApiextensionsV1().CustomResourceDefinitions(), metav1.GroupResource{Group: wildwest.GroupName, Resource: "cowboys"}) - }, - }.Start(t) + }), + ).Start(t) t.Log("Waiting for cowboys crd to be imported and available in the wildwest source workspace...") require.Eventually(t, func() bool {