Skip to content

Commit

Permalink
✨ Implement PV and PVC syncers
Browse files Browse the repository at this point in the history
This implements two new syncers as laid out in
/~https://github.com/kcp-dev/kcp/issues/1981.

Fixes: /~https://github.com/kcp-dev/kcp/issues/1981

Co-authored-by: David Festal <dfestal@redhat.com>
Signed-off-by: Sébastien Han <seb@redhat.com>
  • Loading branch information
leseb and davidfestal committed Feb 27, 2023
1 parent 771cfc8 commit d5f4cdf
Show file tree
Hide file tree
Showing 17 changed files with 2,132 additions and 30 deletions.
1 change: 0 additions & 1 deletion pkg/crdpuller/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"k8s.io/kube-openapi/pkg/util/proto"
"k8s.io/kube-openapi/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/genericcontrolplanescheme"
_ "k8s.io/kubernetes/pkg/genericcontrolplane/apis/install"
)

type schemaPuller struct {
Expand Down
11 changes: 9 additions & 2 deletions pkg/features/kcp_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const (
//
// Enable reverse tunnels to the downstream clusters through the syncers.
SyncerTunnel featuregate.Feature = "KCPSyncerTunnel"

// owner: @leseb
// alpha: v0.11
//
// Enable PersistentVolume and PersistentVolumeClaim storage controllers in the Syncer.
SyncerStorage featuregate.Feature = "KCPSyncerStorage"
)

// DefaultFeatureGate exposes the upstream feature gate, but with our gate setting applied.
Expand Down Expand Up @@ -96,8 +102,9 @@ func (f *kcpFeatureGate) Type() string {
// in the generic control plane code. To add a new feature, define a key for it above and add it
// here. The features will be available throughout Kubernetes binaries.
var defaultGenericControlPlaneFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
LocationAPI: {Default: true, PreRelease: featuregate.Alpha},
SyncerTunnel: {Default: false, PreRelease: featuregate.Alpha},
LocationAPI: {Default: true, PreRelease: featuregate.Alpha},
SyncerTunnel: {Default: false, PreRelease: featuregate.Alpha},
SyncerStorage: {Default: false, PreRelease: featuregate.Alpha},

// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:
Expand Down
28 changes: 28 additions & 0 deletions pkg/indexers/indexers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package indexers

import (
"encoding/json"
"fmt"
"strings"

Expand Down Expand Up @@ -45,6 +46,8 @@ const (
ByLogicalClusterPath = "ByLogicalClusterPath"
// ByLogicalClusterPathAndName indexes by logical cluster path and object name, if the annotation exists.
ByLogicalClusterPathAndName = "ByLogicalClusterPathAndName"
// NamespaceLocatorIndexName is the name of the index that allows you to filter by namespace locator.
NamespaceLocatorIndexName = "ns-locator"
)

// IndexBySyncerFinalizerKey indexes by syncer finalizer label keys.
Expand Down Expand Up @@ -142,3 +145,28 @@ func ByPathAndName[T runtime.Object](groupResource schema.GroupResource, indexer
}
return objs[0].(T), nil
}

// IndexByNamespaceLocator is a cache.IndexFunc that indexes namespaces by the namespaceLocator annotation.
func IndexByNamespaceLocator(obj interface{}) ([]string, error) {
metaObj, ok := obj.(metav1.Object)
if !ok {
return nil, fmt.Errorf("obj is supposed to be a metav1.Object, but is %T", obj)
}

if loc, found, err := syncershared.LocatorFromAnnotations(metaObj.GetAnnotations()); err != nil {
return nil, fmt.Errorf("failed to get locator from annotations: %w", err)
} else if !found {
return nil, nil
} else {
nsLocByte, err := json.Marshal(loc)
if err != nil {
return nil, fmt.Errorf("failed to marshal locator %#v: %w", loc, err)
}
return []string{NamespaceLocatorIndexKey(nsLocByte)}, nil
}
}

// NamespaceLocatorIndexKey formats the index key for a namespace locator.
func NamespaceLocatorIndexKey(namespaceLocator []byte) string {
return string(namespaceLocator)
}
2 changes: 1 addition & 1 deletion pkg/syncer/indexers/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
ByNamespaceLocatorIndexName = "syncer-spec-ByNamespaceLocator"
ByNamespaceLocatorIndexName = "syncer-ByNamespaceLocator"
)

// indexByNamespaceLocator is a cache.IndexFunc that indexes namespaces by the namespaceLocator annotation.
Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/shared/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
type NamespaceLocator struct {
SyncTarget SyncTargetLocator `json:"syncTarget"`
ClusterName logicalcluster.Name `json:"cluster,omitempty"`
Namespace string `json:"namespace"`
Namespace string `json:"namespace,omitempty"`
}

type SyncTargetLocator struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/syncer/status/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
if newUnstrob.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}
if !deepEqualFinalizersAndStatus(oldUnstrob, newUnstrob) {
if !notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(oldUnstrob, newUnstrob) {
c.AddToQueue(gvr, newUnstrob, logger)
}
},
Expand Down
29 changes: 29 additions & 0 deletions pkg/syncer/status/status_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ import (
workloadcliplugin "github.com/kcp-dev/kcp/pkg/cliplugins/workload/plugin"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/syncer/storage"
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
)

var deepEqualFinalizersAndStatusFunc = deepEqualFinalizersAndStatus
var hasDelayStatusSyncingAnnotationChangedFunc = hasDelayStatusSyncingAnnotationChanged

func deepEqualFinalizersAndStatus(oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
newFinalizers := newUnstrob.GetFinalizers()
oldFinalizers := oldUnstrob.GetFinalizers()
Expand All @@ -51,6 +55,31 @@ func deepEqualFinalizersAndStatus(oldUnstrob, newUnstrob *unstructured.Unstructu
return equality.Semantic.DeepEqual(oldFinalizers, newFinalizers) && equality.Semantic.DeepEqual(oldStatus, newStatus)
}

func hasDelayStatusSyncingAnnotationChanged(oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
// If the annotation was added
if !hasDelayStatusSyncingAnnotation(oldUnstrob) && hasDelayStatusSyncingAnnotation(newUnstrob) {
return true
}

// Annotation was removed
if hasDelayStatusSyncingAnnotation(oldUnstrob) && !hasDelayStatusSyncingAnnotation(newUnstrob) {
return false
}

// Both object have the annotation, nothing changed
return hasDelayStatusSyncingAnnotation(oldUnstrob) && hasDelayStatusSyncingAnnotation(newUnstrob)
}

func notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
return !deepEqualFinalizersAndStatusFunc(oldUnstrob, newUnstrob) || hasDelayStatusSyncingAnnotationChangedFunc(oldUnstrob, newUnstrob)
}

func hasDelayStatusSyncingAnnotation(unstrob *unstructured.Unstructured) bool {
annotations := unstrob.GetAnnotations()
delayStatusSyncing, ok := annotations[storage.DelayStatusSyncing]
return ok && delayStatusSyncing == "true"
}

func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResource, key string) error {
logger := klog.FromContext(ctx)

Expand Down
84 changes: 84 additions & 0 deletions pkg/syncer/status/status_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/syncer/indexers"
"github.com/kcp-dev/kcp/pkg/syncer/storage"
)

var scheme *runtime.Scheme
Expand Down Expand Up @@ -812,3 +813,86 @@ func updateDeploymentAction(namespace string, object runtime.Object, subresource
Object: object,
}
}

func Test_hasDelayStatusSyncingAnnotation(t *testing.T) {
type args struct {
unstrob *unstructured.Unstructured
}
tests := []struct {
name string
args args
want bool
}{
{"no annotation", args{unstrob: &unstructured.Unstructured{}}, false},
{"annotation but incorrect content", args{unstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "foo"}}}}}, false},
{"annotation", args{unstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "true"}}}}}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := hasDelayStatusSyncingAnnotation(tt.args.unstrob); got != tt.want {
t.Errorf("hasDelayStatusSyncingAnnotation() = %v, want %v", got, tt.want)
}
})
}
}

func Test_hasDelayStatusSyncingAnnotationChanged(t *testing.T) {
type args struct {
oldUnstrob *unstructured.Unstructured
newUnstrob *unstructured.Unstructured
}
tests := []struct {
name string
args args
want bool
}{
{"no annotation", args{oldUnstrob: &unstructured.Unstructured{}, newUnstrob: &unstructured.Unstructured{}}, false},
{"annotation but incorrect content", args{oldUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "foo"}}}}, newUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "foo"}}}}}, false},
{"annotation is still there and hasn't changed", args{oldUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "true"}}}}, newUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "true"}}}}}, true},
{"annotation was removed", args{oldUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "true"}}}}, newUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{"foo": "true"}}}}}, false},
{"annotation was added", args{oldUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{"foo": "true"}}}}, newUnstrob: &unstructured.Unstructured{Object: map[string]interface{}{"metadata": map[string]interface{}{"annotations": map[string]interface{}{storage.DelayStatusSyncing: "true"}}}}}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := hasDelayStatusSyncingAnnotationChanged(tt.args.oldUnstrob, tt.args.newUnstrob); got != tt.want {
t.Errorf("hasDelayStatusSyncingAnnotationChanged() = %v, want %v", got, tt.want)
}
})
}
}

// Both called functions have good unit tests already so this is only testing the OR case.
func Test_notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(t *testing.T) {
t.Run("deepEqualFinalizersAndStatusFunc is true / hasDelayStatusSyncingAnnotationChangedFunc istrue", func(t *testing.T) {
deepEqualFinalizersAndStatusFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return true }
hasDelayStatusSyncingAnnotationChangedFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return true }
want := true
if got := notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(nil, nil); got != want {
t.Errorf("notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged() = %v, want %v", got, want)
}
})
t.Run("deepEqualFinalizersAndStatusFunc is false / hasDelayStatusSyncingAnnotationChangedFunc is false", func(t *testing.T) {
deepEqualFinalizersAndStatusFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return true }
hasDelayStatusSyncingAnnotationChangedFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return false }
want := false
if got := notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(nil, nil); got != want {
t.Errorf("notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged() = %v, want %v", got, want)
}
})
t.Run("deepEqualFinalizersAndStatusFunc is false / hasDelayStatusSyncingAnnotationChangedFunc is true", func(t *testing.T) {
deepEqualFinalizersAndStatusFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return false }
hasDelayStatusSyncingAnnotationChangedFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return true }
want := true
if got := notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(nil, nil); got != want {
t.Errorf("notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged() = %v, want %v", got, want)
}
})
t.Run("deepEqualFinalizersAndStatusFunc is true / hasDelayStatusSyncingAnnotationChangedFunc is true", func(t *testing.T) {
deepEqualFinalizersAndStatusFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return true }
hasDelayStatusSyncingAnnotationChangedFunc = func(oldObj, newObj *unstructured.Unstructured) bool { return true }
want := true
if got := notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged(nil, nil); got != want {
t.Errorf("notDeepEqualFinalizersAndStatusOrDelayStatusSyncingAnnotationChanged() = %v, want %v", got, want)
}
})
}
Loading

0 comments on commit d5f4cdf

Please sign in to comment.