Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add Upsync controller #2214

Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions pkg/syncer/spec/spec_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
Expand Down Expand Up @@ -175,6 +176,18 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
if gvr == namespaceGVR {
return
}
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting key for type %T: %w", obj, err))
Expand All @@ -188,7 +201,6 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
logger.V(3).Info("processing delete event")

var nsLocatorHolder *unstructured.Unstructured
var ok bool
// Handle namespaced resources
if namespace != "" {
// Use namespace lister
Expand All @@ -214,11 +226,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
}
} else {
// The nsLocatorHolder is in the resource itself for cluster-scoped resources.
nsLocatorHolder, ok = obj.(*unstructured.Unstructured)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
return
}
nsLocatorHolder = unstrObj
}
logger = logging.WithObject(logger, nsLocatorHolder)

Expand Down
26 changes: 25 additions & 1 deletion pkg/syncer/status/status_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

workloadv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/workload/v1alpha1"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
Expand Down Expand Up @@ -116,6 +117,15 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
if gvr == namespaceGVR {
return
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
runtime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
davidfestal marked this conversation as resolved.
Show resolved Hide resolved
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

c.AddToQueue(gvr, obj, logger)
},
UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
Expand All @@ -124,7 +134,9 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
}
oldUnstrob := oldObj.(*unstructured.Unstructured)
newUnstrob := newObj.(*unstructured.Unstructured)

if newUnstrob.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}
if !deepEqualFinalizersAndStatus(oldUnstrob, newUnstrob) {
c.AddToQueue(gvr, newUnstrob, logger)
}
Expand All @@ -133,6 +145,18 @@ func NewStatusSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalclus
if gvr == namespaceGVR {
return
}
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = d.Obj
}
unstrObj, ok := obj.(*unstructured.Unstructured)
if !ok {
runtime.HandleError(fmt.Errorf("resource should be a *unstructured.Unstructured, but was %T", unstrObj))
return
}
davidfestal marked this conversation as resolved.
Show resolved Hide resolved
if unstrObj.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
return
}

c.AddToQueue(gvr, obj, logger)
},
})
Expand Down
5 changes: 5 additions & 0 deletions pkg/syncer/status/status_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func (c *Controller) process(ctx context.Context, gvr schema.GroupVersionResourc
if !ok {
return fmt.Errorf("object to synchronize is expected to be Unstructured, but is %T", obj)
}
if u.GetLabels()[workloadv1alpha1.ClusterResourceStateLabelPrefix+c.syncTargetKey] == string(workloadv1alpha1.ResourceStateUpsync) {
logger.V(4).Info("do not update the status in upstream, since the downstream resource is in Upsync mode")
return nil
}

return c.updateStatusInUpstream(ctx, gvr, upstreamLister, upstreamNamespace, upstreamName, upstreamClusterName, u)
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/kcp-dev/kcp/pkg/syncer/resourcesync"
"github.com/kcp-dev/kcp/pkg/syncer/spec"
"github.com/kcp-dev/kcp/pkg/syncer/status"
"github.com/kcp-dev/kcp/pkg/syncer/upsync"
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
)

Expand Down Expand Up @@ -291,6 +292,12 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
return err
}

logger.Info("Creating resource upsyncer")
davidfestal marked this conversation as resolved.
Show resolved Hide resolved
upSyncer, err := upsync.NewUpSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamUpsyncerClusterClient, downstreamDynamicClient, ddsifForUpstreamUpsyncer, ddsifForDownstream, syncTarget.GetUID())
if err != nil {
return err
}

// Start and sync informer factories
var cacheSyncsForAlwaysRequiredGVRs []cache.InformerSynced
for _, alwaysRequired := range []string{"secrets", "namespaces"} {
Expand Down Expand Up @@ -326,7 +333,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
go syncTargetGVRSource.Start(ctx, 1)
go specSyncer.Start(ctx, numSyncerThreads)
go statusSyncer.Start(ctx, numSyncerThreads)

go upSyncer.Start(ctx, numSyncerThreads)
go downstreamNamespaceController.Start(ctx, numSyncerThreads)

// Create and start GVR-specific controllers through controller managers
Expand Down
Loading