From 1bb9ce77be2959a780cc8a38feaaa97fbbe15b71 Mon Sep 17 00:00:00 2001
From: Lionel Villard
Date: Thu, 12 Jan 2023 16:00:15 -0500
Subject: [PATCH] add e2e test

---
 pkg/cliplugins/workload/plugin/sync_test.go |  6 +++
 pkg/cliplugins/workload/plugin/syncer.yaml  |  6 +++
 pkg/syncer/spec/dns/dns_process.go          | 12 +++++-
 pkg/syncer/spec/dns/dns_process_test.go     | 30 ++++++++-----
 pkg/syncer/spec/dns/networkpolicy_dns.yaml  | 14 +++++-
 pkg/syncer/spec/dns/resources.go            | 22 +++++++++-
 test/e2e/syncer/dns/dns_test.go             | 48 +++++++++++++++++++++
 7 files changed, 124 insertions(+), 14 deletions(-)

diff --git a/pkg/cliplugins/workload/plugin/sync_test.go b/pkg/cliplugins/workload/plugin/sync_test.go
index fd27ad1b1bf7..f5f09f6a5d6b 100644
--- a/pkg/cliplugins/workload/plugin/sync_test.go
+++ b/pkg/cliplugins/workload/plugin/sync_test.go
@@ -60,6 +60,12 @@ rules:
   - "list"
   - "watch"
   - "delete"
+- apiGroups:
+  - ""
+  resources:
+  - endpoints
+  verbs:
+  - "get"
 - apiGroups:
   - "apiextensions.k8s.io"
   resources:
diff --git a/pkg/cliplugins/workload/plugin/syncer.yaml b/pkg/cliplugins/workload/plugin/syncer.yaml
index 25bd500416bb..1eca47fe6190 100644
--- a/pkg/cliplugins/workload/plugin/syncer.yaml
+++ b/pkg/cliplugins/workload/plugin/syncer.yaml
@@ -34,6 +34,12 @@ rules:
   - "list"
   - "watch"
   - "delete"
+- apiGroups:
+  - ""
+  resources:
+  - endpoints
+  verbs:
+  - "get"
 - apiGroups:
   - "apiextensions.k8s.io"
   resources:
diff --git a/pkg/syncer/spec/dns/dns_process.go b/pkg/syncer/spec/dns/dns_process.go
index 7a4cc6b83fb0..4eab7e7f8630 100644
--- a/pkg/syncer/spec/dns/dns_process.go
+++ b/pkg/syncer/spec/dns/dns_process.go
@@ -18,6 +18,7 @@ package dns

 import (
 	"context"
+	"errors"
 	"sync"

 	"github.com/kcp-dev/logicalcluster/v3"
@@ -247,9 +248,18 @@ func (d *DNSProcessor) processService(ctx context.Context, name string) error {
 func (d *DNSProcessor) processNetworkPolicy(ctx context.Context, name string) error {
 	logger := klog.FromContext(ctx)

+	var kubeEndpoints *corev1.Endpoints
 	_, err := d.networkPolicyLister.NetworkPolicies(d.dnsNamespace).Get(name)
 	if apierrors.IsNotFound(err) {
-		expected := MakeNetworkPolicy(name, d.dnsNamespace, d.syncTargetKey)
+		kubeEndpoints, err = d.downstreamKubeClient.CoreV1().Endpoints("default").Get(ctx, "kubernetes", metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+		if len(kubeEndpoints.Subsets) == 0 || len(kubeEndpoints.Subsets[0].Addresses) == 0 {
+			return errors.New("missing kubernetes API endpoints")
+		}
+
+		expected := MakeNetworkPolicy(name, d.dnsNamespace, d.syncTargetKey, &kubeEndpoints.Subsets[0])
 		_, err = d.downstreamKubeClient.NetworkingV1().NetworkPolicies(d.dnsNamespace).Create(ctx, expected, metav1.CreateOptions{})
 		if err == nil {
 			logger.Info("NetworkPolicy created")
diff --git a/pkg/syncer/spec/dns/dns_process_test.go b/pkg/syncer/spec/dns/dns_process_test.go
index 1f0961786394..139b15300ed3 100644
--- a/pkg/syncer/spec/dns/dns_process_test.go
+++ b/pkg/syncer/spec/dns/dns_process_test.go
@@ -46,6 +46,7 @@ var (
 	roleBindingGVR    = schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "rolebindings"}
 	serviceGVR        = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
 	deploymentGVR     = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
+	endpointGVR       = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}
 	networkPolicyGVR  = schema.GroupVersionResource{Group: "networking.k8s.io", Version: "v1", Resource: "networkpolicies"}
 )

@@ -96,7 +97,7 @@ func TestDNSProcess(t *testing.T) {
 				MakeService(dnsID, dnsns),
 				MakeDeployment(dnsID, dnsns, "dnsimage"),
 				endpoints(dnsID, dnsns, "8.8.8.8"),
-				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
+				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
 			},
 			expectReady:   true,
 			expectActions: []clienttesting.Action{},
@@ -111,7 +112,7 @@ func TestDNSProcess(t *testing.T) {
 				MakeService(dnsID, dnsns),
 				MakeDeployment(dnsID, dnsns, "dnsimage"),
 				endpoints(dnsID, dnsns, "8.8.8.8"),
-				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
+				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
 			},
 			expectReady: false,
 			expectActions: []clienttesting.Action{
@@ -121,7 +122,9 @@ func TestDNSProcess(t *testing.T) {
 			dnsImage:    "newdnsimage",
 		},
 		"endpoint does not exist, no DNS objects": {
-			resources:   []runtime.Object{},
+			resources: []runtime.Object{
+				endpoints("kubernetes", "default", "10.0.0.0"),
+			},
 			expectReady: false,
 			expectActions: []clienttesting.Action{
 				clienttesting.NewCreateAction(serviceAccountGVR, dnsns, MakeServiceAccount(dnsID, dnsns)),
@@ -129,7 +132,10 @@ func TestDNSProcess(t *testing.T) {
 				clienttesting.NewCreateAction(roleBindingGVR, dnsns, MakeRoleBinding(dnsID, dnsns)),
 				clienttesting.NewCreateAction(deploymentGVR, dnsns, MakeDeployment(dnsID, dnsns, "dnsimage")),
 				clienttesting.NewCreateAction(serviceGVR, dnsns, MakeService(dnsID, dnsns)),
-				clienttesting.NewCreateAction(networkPolicyGVR, dnsns, MakeNetworkPolicy(dnsID, dnsns, syncTargetKey)),
+				clienttesting.NewGetAction(endpointGVR, "default", "kubernetes"),
+				clienttesting.NewCreateAction(networkPolicyGVR, dnsns, MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{
+					Addresses: []corev1.EndpointAddress{{IP: "10.0.0.0"}},
+				})),
 			},
 			initialized: true,
 			dnsImage:    "dnsimage",
@@ -141,7 +147,7 @@ func TestDNSProcess(t *testing.T) {
 				MakeRoleBinding(dnsID, dnsns),
 				MakeService(dnsID, dnsns),
 				MakeDeployment(dnsID, dnsns, "dnsimage"),
-				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
+				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
 			},
 			expectReady:   false,
 			expectActions: []clienttesting.Action{},
@@ -155,7 +161,7 @@ func TestDNSProcess(t *testing.T) {
 				MakeRoleBinding(dnsID, dnsns),
 				MakeService(dnsID, dnsns),
 				MakeDeployment(dnsID, dnsns, "dnsimage"),
-				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
+				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
 			},
 			expectReady:   false,
 			expectActions: []clienttesting.Action{},
@@ -169,7 +175,7 @@ func TestDNSProcess(t *testing.T) {
 				MakeRoleBinding(dnsID, dnsns),
 				MakeService(dnsID, dnsns),
 				MakeDeployment(dnsID, dnsns, "dnsimage"),
-				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey),
+				MakeNetworkPolicy(dnsID, dnsns, syncTargetKey, &corev1.EndpointSubset{}),
 			},
 			expectReady: false,
 			expectActions: []clienttesting.Action{
@@ -278,10 +284,12 @@ func endpoints(name, namespace, ip string) *corev1.Endpoints {
 	}
 	if ip != "" {
 		endpoint.Subsets = []corev1.EndpointSubset{
-			{Addresses: []corev1.EndpointAddress{
-				{
-					IP: ip,
-				}}},
+			{
+				Addresses: []corev1.EndpointAddress{
+					{
+						IP: ip,
+					}},
+			},
 		}
 	}
 	return endpoint
diff --git a/pkg/syncer/spec/dns/networkpolicy_dns.yaml b/pkg/syncer/spec/dns/networkpolicy_dns.yaml
index 34e1cd06cc39..cb5a5db38a57 100644
--- a/pkg/syncer/spec/dns/networkpolicy_dns.yaml
+++ b/pkg/syncer/spec/dns/networkpolicy_dns.yaml
@@ -14,7 +14,7 @@ spec:
   - from:
     - namespaceSelector:
         matchLabels:
-          internal.workload.kcp.dev/cluster: Cluster
+          internal.workload.kcp.io/cluster: Cluster
     ports:
     - protocol: TCP
       port: 5353
@@ -26,8 +26,20 @@ spec:
     - namespaceSelector:
         matchLabels:
           kubernetes.io/metadata.name: kube-system
+    - podSelector:
+        matchLabels:
+          k8s-app: kube-dns
     ports:
     - protocol: TCP
       port: 53
     - protocol: UDP
       port: 53
+  # Give access to the API server to watch its associated configmap
+  - to:
+    # one ipBlock per IP (dynamically filled)
+    - ipBlock:
+        cidr: APIServerIP
+    ports:
+    - protocol: TCP
+      port: 6443
+
diff --git a/pkg/syncer/spec/dns/resources.go b/pkg/syncer/spec/dns/resources.go
index a0ad83d7cec5..e77b8d5f6093 100644
--- a/pkg/syncer/spec/dns/resources.go
+++ b/pkg/syncer/spec/dns/resources.go
@@ -27,6 +27,7 @@ import (
 	rbacv1 "k8s.io/api/rbac/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/yaml"

 	workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
@@ -109,7 +110,7 @@ func MakeService(name, namespace string) *corev1.Service {
 	return service
 }

-func MakeNetworkPolicy(name, namespace, cluster string) *networkingv1.NetworkPolicy {
+func MakeNetworkPolicy(name, namespace, cluster string, kubeEndpoints *corev1.EndpointSubset) *networkingv1.NetworkPolicy {
 	np := networkPolicyTemplate.DeepCopy()

 	np.Name = name
@@ -117,6 +118,25 @@ func MakeNetworkPolicy(name, namespace, cluster string) *networkingv1.NetworkPolicy {
 	np.Spec.PodSelector.MatchLabels["app"] = name
 	np.Spec.Ingress[0].From[0].NamespaceSelector.MatchLabels[workloadv1alpha1.InternalDownstreamClusterLabel] = cluster

+	to := make([]networkingv1.NetworkPolicyPeer, len(kubeEndpoints.Addresses))
+	for i, endpoint := range kubeEndpoints.Addresses {
+		to[i] = networkingv1.NetworkPolicyPeer{
+			IPBlock: &networkingv1.IPBlock{
+				CIDR: endpoint.IP + "/32",
+			},
+		}
+	}
+	np.Spec.Egress[1].To = to
+
+	ports := make([]networkingv1.NetworkPolicyPort, len(kubeEndpoints.Ports))
+	for i, port := range kubeEndpoints.Ports {
+		pport := intstr.FromInt(int(port.Port))
+		ports[i].Port = &pport
+		pprotocol := port.Protocol
+		ports[i].Protocol = &pprotocol
+	}
+	np.Spec.Egress[1].Ports = ports
+
 	return np
 }

diff --git a/test/e2e/syncer/dns/dns_test.go b/test/e2e/syncer/dns/dns_test.go
index 6ab3ac78f9ef..7096b494ee7c 100644
--- a/test/e2e/syncer/dns/dns_test.go
+++ b/test/e2e/syncer/dns/dns_test.go
@@ -30,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/util/retry"

 	"github.com/kcp-dev/kcp/test/e2e/framework"
 	"github.com/kcp-dev/kcp/test/e2e/syncer/dns/workspace1"
@@ -91,6 +92,18 @@ func TestDNSResolution(t *testing.T) {
 	downstreamWS2NS1 := syncer.DownstreamNamespaceFor(t, logicalcluster.Name(workloadWorkspace2.Spec.Cluster), "dns-ws2-ns1")
 	t.Logf("Downstream namespace 1 in workspace 2 is %s", downstreamWS2NS1)

+	t.Log("Checking network policies have been created")
+	framework.Eventually(t, func() (success bool, reason string) {
+		np, err := downstreamKubeClient.NetworkingV1().NetworkPolicies(syncer.SyncerID).List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return false, fmt.Sprintf("error while getting network policies: %v\n", err)
+		}
+		if len(np.Items) != 2 {
+			return false, fmt.Sprintf("expecting 2 network policies, got: %d\n", len(np.Items))
+		}
+		return true, ""
+	}, wait.ForeverTestTimeout, time.Millisecond*500, "Network policies haven't been created")
+
 	t.Log("Checking fully qualified DNS name resolves")
 	framework.Eventually(t, checkLogs(ctx, t, downstreamKubeClient, downstreamWS1NS1, "ping-fully-qualified", "PING svc.dns-ws1-ns1.svc.cluster.local ("),
 		wait.ForeverTestTimeout, time.Millisecond*500, "Service name was not resolved")
@@ -106,6 +119,41 @@ func TestDNSResolution(t *testing.T) {
 	t.Log("Checking DNS name does not resolve across workspaces")
 	framework.Eventually(t, checkLogs(ctx, t, downstreamKubeClient, downstreamWS2NS1, "ping-fully-qualified-fail", "ping: bad"),
 		wait.ForeverTestTimeout, time.Millisecond*500, "Service name was resolved")
+
+	t.Log("Change ping-fully-qualified deployment DNS config to use workspace 2 nameserver")
+	dnsServices, err := downstreamKubeClient.CoreV1().Services(syncer.SyncerID).List(ctx, metav1.ListOptions{})
+	require.NoError(t, err)
+	require.True(t, len(dnsServices.Items) >= 2)
+
+	deployment, err := downstreamKubeClient.AppsV1().Deployments(downstreamWS1NS1).Get(ctx, "ping-fully-qualified", metav1.GetOptions{})
+	require.NoError(t, err)
+
+	existingDNSIP := deployment.Spec.Template.Spec.DNSConfig.Nameservers[0]
+	newDNSIP := ""
+	for _, svc := range dnsServices.Items {
+		if strings.HasPrefix(svc.Name, "kcp-dns-") {
+			if svc.Spec.ClusterIP != existingDNSIP {
+				newDNSIP = svc.Spec.ClusterIP
+				break
+			}
+		}
+	}
+	require.NotEmpty(t, newDNSIP, "could not find another DNS service")
+	deployment.Spec.Template.Spec.DNSConfig.Nameservers[0] = newDNSIP
+	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		deployment, err := downstreamKubeClient.AppsV1().Deployments(downstreamWS1NS1).Get(ctx, "ping-fully-qualified", metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		deployment.Spec.Template.Spec.DNSConfig.Nameservers[0] = newDNSIP
+		_, err = downstreamKubeClient.AppsV1().Deployments(downstreamWS1NS1).Update(ctx, deployment, metav1.UpdateOptions{})
+		return err
+	})
+	require.NoError(t, err)
+
+	framework.Eventually(t, checkLogs(ctx, t, downstreamKubeClient, downstreamWS1NS1, "ping-fully-qualified", "ping: bad"),
+		wait.ForeverTestTimeout, time.Millisecond*500, "Service name was still not resolved")
 }

 func checkLogs(ctx context.Context, t *testing.T, downstreamKubeClient *kubernetes.Clientset, downstreamNamespace, containerName, expectedPrefix string) func() (success bool, reason string) {
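For reference, here is a minimal sketch (not part of the patch) of what the updated MakeNetworkPolicy produces when given a kubernetes API endpoint subset. The IP, port, and object names below are made up, and the import path is assumed from the file layout above; it mirrors what processNetworkPolicy does at runtime when it reads the default/kubernetes Endpoints object so the DNS pod can reach the API server to watch its configmap.

```go
// Illustrative sketch only: exercising the patched MakeNetworkPolicy in isolation.
// All values are hypothetical; the import path is assumed from the kcp repo layout.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
)

func main() {
	// Stand-in for the first subset of the default/kubernetes Endpoints object
	// that the syncer reads on the downstream cluster.
	subset := &corev1.EndpointSubset{
		Addresses: []corev1.EndpointAddress{{IP: "10.96.0.1"}},                        // hypothetical API server IP
		Ports:     []corev1.EndpointPort{{Port: 6443, Protocol: corev1.ProtocolTCP}}, // hypothetical API server port
	}

	np := dns.MakeNetworkPolicy("kcp-dns-example", "kcp-syncer-example", "sync-target-key", subset)

	// The second egress rule of the template is rewritten with one /32 ipBlock per
	// API server address plus the matching port, replacing the APIServerIP placeholder.
	fmt.Println(np.Spec.Egress[1].To[0].IPBlock.CIDR)       // 10.96.0.1/32
	fmt.Println(np.Spec.Egress[1].Ports[0].Port.IntValue()) // 6443
}
```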