e2e/syncer-fixture: use constructor pattern
sttts committed Aug 1, 2022
1 parent bc620be commit abe4772
Showing 9 changed files with 126 additions and 146 deletions.
test/e2e/framework/fixture.go (130 changes: 70 additions & 60 deletions)
@@ -41,7 +41,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
@@ -366,70 +365,83 @@ func NewWorkspaceFixture(t *testing.T, server RunningServer, orgClusterName logi
return wsClusterName
}

-// SyncerFixture configures a syncer fixture. Its `Start` method does the work of starting a syncer.
-type SyncerFixture struct {
-ResourcesToSync sets.String
-UpstreamServer RunningServer
-WorkspaceClusterName logicalcluster.Name
-SyncTargetLogicalClusterName logicalcluster.Name
-SyncTargetName string
-SyncTargetUID types.UID
-InstallCRDs func(config *rest.Config, isLogicalCluster bool)
-}
-
-// SetDefaults ensures a valid configuration even if not all values are explicitly provided.
-func (sf *SyncerFixture) setDefaults() {
-// Default configuration to avoid tests having to be exaustive
-if len(sf.SyncTargetName) == 0 {
-// This only needs to vary when more than one syncer need to be tested in a workspace
-sf.SyncTargetName = "pcluster-01"
-}
-if len(sf.SyncTargetUID) == 0 {
-sf.SyncTargetUID = types.UID("syncTargetUID")
-}
-if sf.SyncTargetLogicalClusterName.Empty() {
-sf.SyncTargetLogicalClusterName = sf.WorkspaceClusterName
-}
-if sf.ResourcesToSync == nil {
-// resources-to-sync is additive to the core set of resources so not providing any
-// values means default types will still be synced.
-sf.ResourcesToSync = sets.NewString()
-}
-}
+type SyncerOption func(t *testing.T, fs *syncerFixture)
+
+func NewSyncerFixture(t *testing.T, server RunningServer, clusterName logicalcluster.Name, opts ...SyncerOption) *syncerFixture {
+sf := &syncerFixture{
+upstreamServer: server,
+workspaceClusterName: clusterName,
+syncTargetClusterName: clusterName,
+syncTargetName: "psyncer-01",
+}
+for _, opt := range opts {
+opt(t, sf)
+}
+return sf
+}
+
+// syncerFixture configures a syncer fixture. Its `Start` method does the work of starting a syncer.
+type syncerFixture struct {
+upstreamServer RunningServer
+
+workspaceClusterName logicalcluster.Name
+
+syncTargetClusterName logicalcluster.Name
+syncTargetName string
+
+extraResourcesToSync []string
+prepareDownstream func(config *rest.Config, isFakePCluster bool)
+}
+
+func WithSyncTarget(clusterName logicalcluster.Name, name string) SyncerOption {
+return func(t *testing.T, sf *syncerFixture) {
+sf.syncTargetClusterName = clusterName
+sf.syncTargetName = name
+}
+}
+
+func WithExtraResources(resources ...string) SyncerOption {
+return func(t *testing.T, sf *syncerFixture) {
+sf.extraResourcesToSync = append(sf.extraResourcesToSync, resources...)
+}
+}
+
+func WithDownstreamPreparation(prepare func(config *rest.Config, isFakePCluster bool)) SyncerOption {
+return func(t *testing.T, sf *syncerFixture) {
+sf.prepareDownstream = prepare
+}
+}

// Start starts a new syncer against the given upstream kcp workspace. Whether the syncer run
// in-process or deployed on a pcluster will depend whether --pcluster-kubeconfig and
// --syncer-image are supplied to the test invocation.
-func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
-sf.setDefaults()
-
+func (sf *syncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// Write the upstream logical cluster config to disk for the workspace plugin
-upstreamRawConfig, err := sf.UpstreamServer.RawConfig()
+upstreamRawConfig, err := sf.upstreamServer.RawConfig()
require.NoError(t, err)
-_, kubeconfigPath := WriteLogicalClusterConfig(t, upstreamRawConfig, sf.WorkspaceClusterName, "base")
+_, kubeconfigPath := WriteLogicalClusterConfig(t, upstreamRawConfig, sf.workspaceClusterName, "base")

useDeployedSyncer := len(TestConfig.PClusterKubeconfig()) > 0

syncerImage := TestConfig.SyncerImage()
if useDeployedSyncer {
require.NotZero(t, len(syncerImage), "--syncer-image must be specified if testing with a deployed syncer")
} else {
-// The image needs to be a non-empty string for the plugin command but the value
-// doesn't matter if not deploying a syncer.
+// The image needs to be a non-empty string for the plugin command but the value doesn't matter if not deploying a syncer.
syncerImage = "not-a-valid-image"
}

// Run the plugin command to enable the syncer and collect the resulting yaml
t.Logf("Configuring workspace %s for syncing", sf.WorkspaceClusterName)
t.Logf("Configuring workspace %s for syncing", sf.workspaceClusterName)
pluginArgs := []string{
"workload",
"sync",
-sf.SyncTargetName,
+sf.syncTargetName,
"--syncer-image", syncerImage,
"--output-file", "-",
"--qps", "-1",
}
-for _, resource := range sf.ResourcesToSync.List() {
+for _, resource := range sf.extraResourcesToSync {
pluginArgs = append(pluginArgs, "--resources", resource)
}
syncerYAML := RunKcpCliPlugin(t, kubeconfigPath, pluginArgs)
@@ -451,17 +463,17 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
// The syncer will target a logical cluster that is a peer to the current workspace. A
// logical server provides as a lightweight approximation of a pcluster for tests that
// don't need to validate running workloads or interaction with kube controllers.
-parentClusterName, ok := sf.WorkspaceClusterName.Parent()
-require.True(t, ok, "%s does not have a parent", sf.WorkspaceClusterName)
-downstreamServer := NewFakeWorkloadServer(t, sf.UpstreamServer, parentClusterName)
+parentClusterName, ok := sf.workspaceClusterName.Parent()
+require.True(t, ok, "%s does not have a parent", sf.workspaceClusterName)
+downstreamServer := NewFakeWorkloadServer(t, sf.upstreamServer, parentClusterName)
downstreamConfig = downstreamServer.BaseConfig(t)
downstreamKubeconfigPath = downstreamServer.KubeconfigPath()
}

-if sf.InstallCRDs != nil {
+if sf.prepareDownstream != nil {
// Attempt crd installation to ensure the downstream server has an api surface
// compatible with the test.
-sf.InstallCRDs(downstreamConfig, !useDeployedSyncer)
+sf.prepareDownstream(downstreamConfig, !useDeployedSyncer)
}

// Apply the yaml output from the plugin to the downstream server
@@ -478,7 +490,7 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
defer cancelFn()

t.Logf("Collecting imported resource info: %s", artifactDir)
-upstreamCfg := sf.UpstreamServer.BaseConfig(t)
+upstreamCfg := sf.upstreamServer.BaseConfig(t)

gather := func(client dynamic.Interface, gvr schema.GroupVersionResource) {
resourceClient := client.Resource(gvr)
Expand All @@ -492,7 +504,7 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {

for i := range list.Items {
item := list.Items[i]
-sf.UpstreamServer.Artifact(t, func() (runtime.Object, error) {
+sf.upstreamServer.Artifact(t, func() (runtime.Object, error) {
return &item, nil
})
}
@@ -504,11 +516,11 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
downstreamDynamic, err := dynamic.NewForConfig(downstreamConfig)
require.NoError(t, err, "error creating downstream dynamic client")

-gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("apiresourceimports"))
-gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("negotiatedapiresources"))
-gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), corev1.SchemeGroupVersion.WithResource("namespaces"))
+gather(upstreamDynamic.Cluster(sf.workspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("apiresourceimports"))
+gather(upstreamDynamic.Cluster(sf.workspaceClusterName), apiresourcev1alpha1.SchemeGroupVersion.WithResource("negotiatedapiresources"))
+gather(upstreamDynamic.Cluster(sf.workspaceClusterName), corev1.SchemeGroupVersion.WithResource("namespaces"))
gather(downstreamDynamic, corev1.SchemeGroupVersion.WithResource("namespaces"))
-gather(upstreamDynamic.Cluster(sf.WorkspaceClusterName), appsv1.SchemeGroupVersion.WithResource("deployments"))
+gather(upstreamDynamic.Cluster(sf.workspaceClusterName), appsv1.SchemeGroupVersion.WithResource("deployments"))
gather(downstreamDynamic, appsv1.SchemeGroupVersion.WithResource("deployments"))
})

@@ -527,7 +539,7 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
break
}
}
require.NotEmpty(t, syncerID, "failed to extract syncer ID from yaml produced by plugin:\n%s", string(syncerYAML))
require.NotEmpty(t, syncerID, "failed to extract syncer namespace from yaml produced by plugin:\n%s", string(syncerYAML))

syncerConfig := syncerConfigFromCluster(t, downstreamConfig, syncerID, syncerID)

@@ -570,7 +582,7 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
return
}

t.Logf("Deleting syncer resources for logical cluster %q, sync target %q", sf.WorkspaceClusterName, syncerConfig.SyncTargetName)
t.Logf("Deleting syncer resources for logical cluster %q, sync target %q", sf.workspaceClusterName, syncerConfig.SyncTargetName)
err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, syncerID, metav1.DeleteOptions{})
if err != nil {
t.Errorf("failed to delete Namespace %q: %v", syncerID, err)
@@ -584,27 +596,25 @@ func (sf SyncerFixture) Start(t *testing.T) *StartedSyncerFixture {
t.Errorf("failed to delete ClusterRole %q: %v", syncerID, err)
}

t.Logf("Deleting synced resources for logical cluster %q, sync target %q", sf.WorkspaceClusterName, syncerConfig.SyncTargetName)
t.Logf("Deleting synced resources for logical cluster %s, sync target %s|%s", sf.workspaceClusterName, syncerConfig.SyncTargetWorkspace, syncerConfig.SyncTargetName)
namespaces, err := downstreamKubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
t.Errorf("failed to list namespaces: %v", err)
}
for _, ns := range namespaces.Items {
locator, exists, err := shared.LocatorFromAnnotations(ns.Annotations)
-if err != nil {
-t.Logf("failed to retrieve locator from ns %q: %v", ns.Name, err)
-continue
+require.NoError(t, err, "failed to extract locator from namespace %s", ns.Name)
+if !exists {
+continue // Not a kcp-synced namespace
}
-if !exists || locator == nil {
-// Not a kcp-synced namespace
-continue
+if locator.Workspace != sf.workspaceClusterName {
+continue // Not a namespace synced from this upstream workspace
}
-if locator.Workspace.String() != syncerConfig.SyncTargetWorkspace.String() {
-// Not a namespace synced by this syncer
-continue
+if locator.SyncTarget.Workspace != syncerConfig.SyncTargetWorkspace.String() ||
+locator.SyncTarget.Name != syncerConfig.SyncTargetName {
+continue // Not a namespace synced by this syncer
}
-err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{})
-if err != nil {
+if err = downstreamKubeClient.CoreV1().Namespaces().Delete(ctx, ns.Name, metav1.DeleteOptions{}); err != nil {
t.Logf("failed to delete Namespace %q: %v", ns.Name, err)
}
}
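For orientation, here is a minimal sketch (not part of this commit) of how a test is expected to build the fixture after this change. Only NewSyncerFixture, the With* options, Start, and SyncerConfig are taken from the diff above; the package path and the surrounding server/workspace setup helpers are assumptions based on the rest of the e2e framework.

package example

import (
	"testing"

	"k8s.io/client-go/rest"

	"github.com/kcp-dev/kcp/test/e2e/framework"
)

func TestSyncerFixtureUsage(t *testing.T) {
	server := framework.SharedKcpServer(t)                         // assumed helper returning a framework.RunningServer
	orgClusterName := framework.NewOrganizationFixture(t, server)  // assumed organization fixture
	wsClusterName := framework.NewWorkspaceFixture(t, server, orgClusterName)

	// Build the fixture with the new constructor and functional options, then start the syncer.
	syncerFixture := framework.NewSyncerFixture(t, server, wsClusterName,
		framework.WithSyncTarget(wsClusterName, "my-sync-target"),
		framework.WithExtraResources("services"),
		framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) {
			// Install any CRDs the test needs into the downstream cluster here.
		}),
	).Start(t)

	// The started fixture exposes the effective syncer configuration, e.g. the sync target name.
	_ = syncerFixture.SyncerConfig.SyncTargetName
}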
test/e2e/reconciler/cluster/controller_test.go (12 changes: 4 additions & 8 deletions)
@@ -31,7 +31,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
kubernetesclient "k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -176,19 +175,16 @@ func TestClusterController(t *testing.T) {
sourceKubeClient := sourceKubeClusterClient.Cluster(wsClusterName)
sourceWildwestClient := sourceWildwestClusterClient.Cluster(wsClusterName)

-syncerFixture := framework.SyncerFixture{
-ResourcesToSync: sets.NewString("cowboys.wildwest.dev"),
-UpstreamServer: source,
-WorkspaceClusterName: wsClusterName,
-InstallCRDs: func(config *rest.Config, isLogicalCluster bool) {
+syncerFixture := framework.NewSyncerFixture(t, source, wsClusterName,
+framework.WithExtraResources("cowboys.wildwest.dev"),
+framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) {
// Always install the crd regardless of whether the target is
// logical or not since cowboys is not a native type.
sinkCrdClient, err := apiextensionsclientset.NewForConfig(config)
require.NoError(t, err)
t.Log("Installing test CRDs into sink cluster...")
fixturewildwest.Create(t, sinkCrdClient.ApiextensionsV1().CustomResourceDefinitions(), metav1.GroupResource{Group: wildwest.GroupName, Resource: "cowboys"})
-},
-}.Start(t)
+})).Start(t)

sinkWildwestClient, err := wildwestclientset.NewForConfig(syncerFixture.DownstreamConfig)
require.NoError(t, err)
test/e2e/reconciler/ingress/controller_test.go (14 changes: 6 additions & 8 deletions)
@@ -161,12 +161,10 @@ func TestIngressController(t *testing.T) {
sourceKcpClient := sourceKcpClusterClient.Cluster(clusterName)

t.Logf("Deploy syncer")
-syncerFixture := framework.SyncerFixture{
-ResourcesToSync: sets.NewString("ingresses.networking.k8s.io", "services"),
-UpstreamServer: source,
-WorkspaceClusterName: clusterName,
-InstallCRDs: func(config *rest.Config, isLogicalCluster bool) {
-if !isLogicalCluster {
+syncerFixture := framework.NewSyncerFixture(t, source, clusterName,
+framework.WithExtraResources("ingresses.networking.k8s.io", "services"),
+framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) {
+if !isFakePCluster {
// Only need to install services and ingresses in a logical cluster
return
}
@@ -178,8 +176,8 @@ func TestIngressController(t *testing.T) {
metav1.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"},
)
require.NoError(t, err)
-},
-}.Start(t)
+}),
+).Start(t)

t.Log("Wait for \"kubernetes\" apiexport")
require.Eventually(t, func() bool {
test/e2e/reconciler/namespace/controller_test.go (5 changes: 1 addition & 4 deletions)
@@ -91,10 +91,7 @@ func TestNamespaceScheduler(t *testing.T) {
// TODO(marun) Extract the heartbeater out of the syncer for reuse in a test fixture. The namespace
// controller just needs ready clusters which can be accomplished without a syncer by having the
// heartbeater update the sync target so the heartbeat controller can set the cluster ready.
-syncerFixture := framework.SyncerFixture{
-UpstreamServer: server,
-WorkspaceClusterName: server.clusterName,
-}.Start(t)
+syncerFixture := framework.NewSyncerFixture(t, server, server.clusterName).Start(t)
syncTargetName := syncerFixture.SyncerConfig.SyncTargetName

t.Log("Wait for \"kubernetes\" apiexport")
test/e2e/reconciler/scheduling/controller_test.go (16 changes: 7 additions & 9 deletions)
@@ -72,13 +72,11 @@ func TestScheduling(t *testing.T) {

syncTargetName := fmt.Sprintf("synctarget-%d", +rand.Intn(1000000))
t.Logf("Creating a SyncTarget and syncer in %s", negotiationClusterName)
-syncerFixture := framework.SyncerFixture{
-ResourcesToSync: sets.NewString("services"),
-UpstreamServer: source,
-WorkspaceClusterName: negotiationClusterName,
-SyncTargetName: syncTargetName,
-InstallCRDs: func(config *rest.Config, isLogicalCluster bool) {
-if !isLogicalCluster {
+syncerFixture := framework.NewSyncerFixture(t, source, negotiationClusterName,
+framework.WithExtraResources("services"),
+framework.WithSyncTarget(negotiationClusterName, syncTargetName),
+framework.WithDownstreamPreparation(func(config *rest.Config, isFakePCluster bool) {
+if !isFakePCluster {
// Only need to install services and ingresses in a logical cluster
return
}
@@ -89,8 +87,8 @@ func TestScheduling(t *testing.T) {
metav1.GroupResource{Group: "core.k8s.io", Resource: "services"},
)
require.NoError(t, err)
-},
-}.Start(t)
+}),
+).Start(t)

t.Logf("Wait for APIResourceImports to show up in the negotiation workspace")
require.Eventually(t, func() bool {
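Across the call sites above, the fixture struct itself is now unexported, so tests outside the framework package can only configure it through the exported constructor and SyncerOption helpers; any future knob gets added as another With* function alongside the ones introduced in fixture.go. A hypothetical sketch (not part of this commit) of such an option, which would live inside test/e2e/framework:

func WithSyncTargetName(name string) SyncerOption {
	return func(t *testing.T, sf *syncerFixture) {
		// Override only the sync target name, keeping the defaulted sync target workspace.
		sf.syncTargetName = name
	}
}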