Tunnel: Validate namespace/pod at the syncer side
jmprusi committed Mar 2, 2023
1 parent 771cfc8 commit 585ee3c
Showing 5 changed files with 400 additions and 11 deletions.
pkg/syncer/syncer.go (9 changes: 8 additions & 1 deletion)
@@ -406,7 +406,14 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i

// Start tunneler for POD access
if kcpfeatures.DefaultFeatureGate.Enabled(kcpfeatures.SyncerTunnel) {
go startSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName)
StartSyncerTunnel(ctx, upstreamConfig, downstreamConfig, logicalcluster.From(syncTarget), cfg.SyncTargetName, cfg.SyncTargetUID, func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
informers, _ := ddsifForDownstream.Informers()
informer, ok := informers[gvr]
if !ok {
return nil, fmt.Errorf("failed to get informer for gvr: %s", gvr)
}
return informer.Lister(), nil
})
}

StartHeartbeat(ctx, kcpSyncTargetClient, cfg.SyncTargetName, cfg.SyncTargetUID)
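For reference, the getDownstreamLister argument added above is a ResourceListerFunc: a closure that resolves a GVR to a lister backed by the syncer's downstream informers. A minimal sketch of how such a closure can be built from a plain dynamic shared informer factory (listerFuncFor and factory are illustrative names, not part of this change; the caller must start the factory and wait for cache sync):

package example

import (
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/client-go/dynamic/dynamicinformer"
    "k8s.io/client-go/tools/cache"

    "github.com/kcp-dev/kcp/pkg/syncer"
)

// listerFuncFor adapts a dynamic shared informer factory into the
// ResourceListerFunc that StartSyncerTunnel now expects.
func listerFuncFor(factory dynamicinformer.DynamicSharedInformerFactory) syncer.ResourceListerFunc {
    return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
        // ForResource lazily creates an informer for the requested GVR;
        // it must be started and synced before requests are served.
        return factory.ForResource(gvr).Lister(), nil
    }
}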
pkg/syncer/tunneler.go (167 changes: 161 additions & 6 deletions)
@@ -18,23 +18,48 @@ package syncer

import (
"context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"time"

"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/server/requestinfo"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/tunneler"
)

// startSyncerTunnel blocks until the context is cancelled trying to establish a tunnel against the specified target.
func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName string) {
var (
errorScheme = runtime.NewScheme()
errorCodecs = serializer.NewCodecFactory(errorScheme)
)

func init() {
errorScheme.AddUnversionedTypes(metav1.Unversioned,
&metav1.Status{},
)
}

type ResourceListerFunc func(gvr schema.GroupVersionResource) (cache.GenericLister, error)

// StartSyncerTunnel spawns a goroutine that repeatedly tries to establish a tunnel against the specified target until the context is cancelled.
func StartSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, syncTargetWorkspace logicalcluster.Name, syncTargetName, syncTargetUID string, getDownstreamLister ResourceListerFunc) {
// connect to create the reverse tunnels
var (
initBackoff = 5 * time.Second
@@ -49,16 +74,16 @@ func startSyncerTunnel(ctx context.Context, upstream, downstream *rest.Config, s
backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
logger := klog.FromContext(ctx)

wait.BackoffUntil(func() {
go wait.BackoffUntil(func() {
logger.V(5).Info("starting tunnel")
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName)
err := startTunneler(ctx, upstream, downstream, syncTargetWorkspace, syncTargetName, syncTargetUID, getDownstreamLister)
if err != nil {
logger.Error(err, "failed to create tunnel")
}
}, backoffMgr, sliding, ctx.Done())
}

func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName string) error {
func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncTargetClusterName logicalcluster.Name, syncTargetName, syncTargetUID string, getDownstreamLister ResourceListerFunc) error {
logger := klog.FromContext(ctx)

// syncer --> kcp
@@ -85,6 +110,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
if err != nil {
return err
}

proxy.Transport = clientDownstream.Transport

// create the reverse connection
@@ -109,7 +135,7 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
defer l.Close()

// reverse proxy the request coming from the reverse connection to the p-cluster apiserver
server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: proxy}
server := &http.Server{ReadHeaderTimeout: 30 * time.Second, Handler: withPodAccessCheck(proxy, getDownstreamLister, syncTargetClusterName, syncTargetName, syncTargetUID)}
defer server.Close()

logger.V(2).Info("serving on reverse connection")
@@ -126,3 +152,132 @@ func startTunneler(ctx context.Context, upstream, downstream *rest.Config, syncT
logger.V(2).Info("stop serving on reverse connection")
return err
}

func withPodAccessCheck(handler http.Handler, getDownstreamLister ResourceListerFunc, synctargetClusterName logicalcluster.Name, synctargetName, syncTargetUID string) http.HandlerFunc {
namespaceGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}
podGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}

return func(w http.ResponseWriter, req *http.Request) {
resolver := requestinfo.NewKCPRequestInfoResolver()
requestInfo, err := resolver.NewRequestInfo(req)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("could not resolve RequestInfo: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure the request targets a pod subresource and carries all required information; otherwise reject it.
if requestInfo.Resource != "pods" || requestInfo.Subresource == "" || requestInfo.Name == "" || requestInfo.Namespace == "" {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), requestInfo.Name, fmt.Errorf("only pod subresources are allowed")),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure the request targets a pod in a namespace owned by this syncer; otherwise reject it.
downstreamNamespaceName := requestInfo.Namespace

nsInformer, err := getDownstreamLister(namespaceGVR)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting downstream namespace lister: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

obj, err := nsInformer.Get(downstreamNamespaceName)
if errors.IsNotFound(err) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %s does not exist", downstreamNamespaceName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting namespace resource: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

downstreamNs, ok := obj.(*unstructured.Unstructured)
if !ok {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("namespace resource should be *unstructured.Unstructured but was: %T", obj)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure the referenced downstream namespace locator is correct and owned by this syncer.
if locator, ok, err := shared.LocatorFromAnnotations(downstreamNs.GetAnnotations()); ok {
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting locator from namespace: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if locator.SyncTarget.Name != synctargetName || string(locator.SyncTarget.UID) != syncTargetUID || locator.SyncTarget.ClusterName != string(synctargetClusterName) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(namespaceGVR.GroupResource(), downstreamNamespaceName, fmt.Errorf("namespace %q is not owned by this syncer", downstreamNamespaceName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
} else {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("locator not found on namespace: %q", downstreamNamespaceName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}

// Ensure Pod is in Upsynced state.
podName := requestInfo.Name
podInformer, err := getDownstreamLister(podGVR)
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting pod lister: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
obj, err = podInformer.ByNamespace(downstreamNamespaceName).Get(podName)
if errors.IsNotFound(err) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), podName, fmt.Errorf("pod %q does not exist", podName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if err != nil {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("error while getting pod resource: %w", err)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
downstreamPod, ok := obj.(*unstructured.Unstructured)
if !ok {
responsewriters.ErrorNegotiated(
errors.NewInternalError(fmt.Errorf("pod resource should be *unstructured.Unstructured but was: %T", obj)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
if downstreamPod.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+workloadv1alpha1.ToSyncTargetKey(synctargetClusterName, synctargetName)] != string(workloadv1alpha1.ResourceStateUpsync) {
responsewriters.ErrorNegotiated(
errors.NewForbidden(podGVR.GroupResource(), podName, fmt.Errorf("pod %q is not in upsynced state", podName)),
errorCodecs, schema.GroupVersion{}, w, req,
)
return
}
handler.ServeHTTP(w, req)
}
}
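The final check above keys off the per-sync-target state label that the syncer maintains on downstream resources: a request is proxied only when the downstream pod carries that label with the Upsync state value. A minimal sketch of how the label key and expected value are composed (upsyncLabel is an illustrative helper; the cluster and sync target names come from the syncer's configuration):

package example

import (
    "github.com/kcp-dev/logicalcluster/v3"

    workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
)

// upsyncLabel returns the label key and value that withPodAccessCheck expects
// on a downstream pod before proxying its subresources. The key has the form
// <ClusterResourceStateLabelPrefix><syncTargetKey>.
func upsyncLabel(clusterName logicalcluster.Name, syncTargetName string) (string, string) {
    key := workloadv1alpha1.ClusterResourceStateLabelPrefix + workloadv1alpha1.ToSyncTargetKey(clusterName, syncTargetName)
    return key, string(workloadv1alpha1.ResourceStateUpsync)
}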
pkg/tunneler/podsubresourceproxy_handler.go (1 change: 0 additions & 1 deletion)
@@ -263,7 +263,6 @@ func (tn *tunneler) Proxy(clusterName logicalcluster.Name, syncerName string, rw
}

proxy := httputil.NewSingleHostReverseProxy(target)
// director := proxy.Director
proxy.Transport = &http.Transport{
Proxy: nil, // no proxies
DialContext: d.Dial, // use a reverse connection
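For context, the Proxy method shown in this hunk uses the standard Go pattern of a single-host reverse proxy whose transport dials through an already established reverse connection rather than opening new TCP connections. A minimal sketch of that pattern (newTunnelProxy and its dial parameter are illustrative; the actual dialer is provided by the tunneler's reverse-connection machinery):

package example

import (
    "context"
    "net"
    "net/http"
    "net/http/httputil"
    "net/url"
)

// newTunnelProxy builds a reverse proxy that forwards every request to target,
// reusing the reverse connection exposed by dial instead of dialing out directly.
func newTunnelProxy(target *url.URL, dial func(ctx context.Context, network, addr string) (net.Conn, error)) *httputil.ReverseProxy {
    proxy := httputil.NewSingleHostReverseProxy(target)
    proxy.Transport = &http.Transport{
        Proxy:       nil,  // no HTTP proxies in the path
        DialContext: dial, // tunnel through the reverse connection
    }
    return proxy
}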
test/e2e/framework/syncer.go (49 changes: 48 additions & 1 deletion)
@@ -41,8 +41,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
kubernetesinformers "k8s.io/client-go/informers"
kubernetesclient "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@@ -56,6 +59,7 @@ import (
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/syncer"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
)
@@ -570,7 +574,50 @@ type appliedSyncerFixture struct {
SyncerVirtualWorkspaceConfig *rest.Config
UpsyncerVirtualWorkspaceConfig *rest.Config

stopHeartBeat context.CancelFunc
stopHeartBeat context.CancelFunc
stopSyncerTunnel context.CancelFunc
}

func (sf *appliedSyncerFixture) StartSyncerTunnel(t *testing.T) *StartedSyncerFixture {
t.Helper()
ctx, cancelFunc := context.WithCancel(context.Background())
t.Cleanup(cancelFunc)
sf.stopSyncerTunnel = cancelFunc

downstreamClient, err := dynamic.NewForConfig(sf.SyncerConfig.DownstreamConfig)
require.NoError(t, err)

downstreamInformer := dynamicinformer.NewDynamicSharedInformerFactory(downstreamClient, 10*time.Hour)
downstreamInformer.Start(ctx.Done())

podGvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}
namespaceGvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"}
informers := make(map[schema.GroupVersionResource]kubernetesinformers.GenericInformer)

// Bootstrap the pod and namespace informers so they are ready to use during tests.
informers[podGvr] = downstreamInformer.ForResource(podGvr)
indexers.AddIfNotPresentOrDie(informers[podGvr].Informer().GetIndexer(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
informers[namespaceGvr] = downstreamInformer.ForResource(namespaceGvr)

syncer.StartSyncerTunnel(ctx, sf.SyncerConfig.UpstreamConfig, sf.SyncerConfig.DownstreamConfig, sf.SyncTargetClusterName, sf.SyncerConfig.SyncTargetName, sf.SyncerConfig.SyncTargetUID, func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
if _, ok := informers[gvr]; !ok {
return nil, fmt.Errorf("no informer for %v", gvr)
}
return informers[gvr].Lister(), nil
})
startedSyncer := &StartedSyncerFixture{
sf,
}

return startedSyncer
}

// StopSyncerTunnel stops the syncer tunnel; the syncer closes the reverse connection and
// pod subresources become unavailable.
func (sf *StartedSyncerFixture) StopSyncerTunnel(t *testing.T) {
t.Helper()

sf.stopSyncerTunnel()
}

// StartHeartBeat starts the Heartbeat keeper to maintain
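A minimal sketch of the lifecycle the new fixture methods enable in a test; how the appliedSyncerFixture is produced is elided here and assumed to follow the framework's usual syncer setup (runTunnelLifecycle is an illustrative helper placed in the framework package, since appliedSyncerFixture is unexported):

package framework

import "testing"

// runTunnelLifecycle starts the syncer tunnel for an already applied syncer
// fixture, leaves room for exercising pod subresources, and then stops it.
func runTunnelLifecycle(t *testing.T, applied *appliedSyncerFixture) {
    t.Helper()

    started := applied.StartSyncerTunnel(t)

    // ... exercise pod subresources (logs, exec) through the tunnel here ...

    // Closing the tunnel drops the reverse connection; pod subresources are
    // no longer reachable afterwards.
    started.StopSyncerTunnel(t)
}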