diff --git a/pkg/syncer/upsync/upsync_controller.go b/pkg/syncer/upsync/upsync_controller.go index b9c8a40eb93c..da414386f86d 100644 --- a/pkg/syncer/upsync/upsync_controller.go +++ b/pkg/syncer/upsync/upsync_controller.go @@ -24,6 +24,7 @@ import ( "github.com/go-logr/logr" kcpdynamic "github.com/kcp-dev/client-go/dynamic" + "github.com/kcp-dev/client-go/dynamic/dynamicinformer" ddsif "github.com/kcp-dev/kcp/pkg/informer" "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/syncer/shared" @@ -53,6 +54,7 @@ type Controller struct { upstreamClient kcpdynamic.ClusterInterface downstreamClient dynamic.Interface downstreamNamespaceLister cache.GenericLister + downstreamInformer dynamicinformer.DynamicSharedInformerFactory getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error) @@ -62,18 +64,18 @@ type Controller struct { syncTargetKey string } -type Keysource uint -type UpdateType uint +type Keysource string +type UpdateType string const ( - Upstream Keysource = iota - Downstream + Upstream Keysource = "Upstream" + Downstream Keysource = "Downstream" ) const ( - SpecUpdate UpdateType = iota - StatusUpdate - MetadataUpdate + SpecUpdate UpdateType = "Spec" + StatusUpdate UpdateType = "Status" + MetadataUpdate UpdateType = "Meta" ) // Upstream resource key generation @@ -108,7 +110,27 @@ type queueKey struct { // Differentiate between upstream and downstream keys keysource Keysource // Update type - updateType []UpdateType + updateType UpdateType +} + +func updateTypeSlicetoString(update []UpdateType) UpdateType { + str := "" + for i, updateType := range update { + if i != 0 { + str += "," + } + str += string(updateType) + } + return UpdateType(str) +} + +func updateTypeStringtoSlice(update UpdateType) []UpdateType { + str := strings.Split(string(update), ",") + updateType := make([]UpdateType, len(str)) + for i, item := range str { + updateType[i] = UpdateType(item) + } + return updateType } func getUpdateType(oldObj *unstructured.Unstructured, newObj *unstructured.Unstructured) []UpdateType { @@ -144,11 +166,13 @@ func (c *Controller) AddToQueue(gvr schema.GroupVersionResource, obj interface{} return } logging.WithQueueKey(logger, key).V(2).Info("queueing GVR", "gvr", gvr.String()) + updateTypeHashed := updateTypeSlicetoString(updateType) c.queue.Add( queueKey{ - gvr: gvr, - key: key, - keysource: keysource, + gvr: gvr, + key: key, + keysource: keysource, + updateType: updateTypeHashed, }, ) } @@ -251,12 +275,14 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { keySource = "Upstream" } - logger := logging.WithQueueKey(klog.FromContext(ctx), qk.key).WithValues("gvr", qk.gvr.String(), "Key for:", keySource) + logger := logging.WithQueueKey(klog.FromContext(ctx), qk.key).WithValues("gvr", qk.gvr.String(), "source", keySource) ctx = klog.NewContext(ctx, logger) logger.V(1).Info("Processing key") defer c.queue.Done(key) - if err := c.process(ctx, qk.gvr, qk.key, qk.keysource == Upstream, updateType); err != nil { + updateTypeArray := updateTypeStringtoSlice(updateType) + + if err := c.process(ctx, qk.gvr, qk.key, qk.keysource == Upstream, updateTypeArray); err != nil { runtime.HandleError(fmt.Errorf("%s failed to upsync %q, err: %w", controllerName, key, err)) c.queue.AddRateLimited(key) return true diff --git a/pkg/syncer/upsync/upsync_process.go b/pkg/syncer/upsync/upsync_process.go index 21c280f84594..89baaf04de5f 100644 --- a/pkg/syncer/upsync/upsync_process.go +++ b/pkg/syncer/upsync/upsync_process.go @@ -18,6 +18,7 @@ package upsync import ( "context" + "errors" "fmt" "github.com/kcp-dev/kcp/pkg/syncer/shared" @@ -84,7 +85,7 @@ func (c *Controller) processUpstreamResource(ctx context.Context, gvr schema.Gro return err } } else { - _, err := c.downstreamClient.Resource(gvr).Get(context.TODO(), upstreamName, metav1.GetOptions{}) + _, err := c.downstreamInformer.ForResource(gvr).Lister().Get(upstreamName) if err != nil && k8serror.IsNotFound(err) { // Downstream namespace not present; assume object is not present as well and return upstream object // Prune resource upstream @@ -137,6 +138,7 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G var nsObj runtime.Object if nsObj, err = c.downstreamNamespaceLister.Get(downstreamNamespace); err != nil { logger.Error(err, "Error getting downstream Namespace from downstream namespace lister", "ns", downstreamNamespace) + return err } nsMeta, ok := nsObj.(metav1.Object) if !ok { @@ -151,15 +153,16 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G if !locatorExists || upstreamLocator == nil { return nil } - downstreamObject, err = c.downstreamClient.Resource(gvr).Namespace(downstreamNamespace).Get(ctx, downstreamName, metav1.GetOptions{}) + downstreamObject, err = c.downstreamInformer.ForResource(gvr).Lister().ByNamespace(downstreamNamespace).Get(downstreamName) if err != nil { logger.Error(err, "Could not find the resource downstream") } - } - if downstreamNamespace == "" { - downstreamObject, err = c.downstreamClient.Resource(gvr).Get(ctx, downstreamName, metav1.GetOptions{}) - if err != nil { + } else { + downstreamObject, err = c.downstreamInformer.ForResource(gvr).Lister().Get(downstreamName) + if err != nil && k8serror.IsNotFound(err) { logger.Error(err, "Could not find the resource downstream") + // Todo: Perform cleanup on the upstream resource + return nil } objMeta, ok := downstreamObject.(metav1.Object) if !ok { @@ -170,7 +173,8 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G } if !locatorExists || upstreamLocator == nil { - return nil + logger.Error(err, "locator not found in the resource") + return errors.New("locator not found in the downstream resource") } } @@ -199,6 +203,7 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G } c.updateResourceContent(ctx, gvr, upstreamWorkspace, upstreamNamespace, resource, downstreamResource, "spec") c.updateResourceContent(ctx, gvr, upstreamWorkspace, upstreamNamespace, resource, downstreamResource, "status") + return nil } else { resourceVersionUpstream = unstructuredUpstreamResource.GetAnnotations()[ResourceVersionAnnotation] } diff --git a/pkg/syncer/upsync/upsync_process_test.go b/pkg/syncer/upsync/upsync_process_test.go index 0bcb20f3e124..b01ef440734c 100644 --- a/pkg/syncer/upsync/upsync_process_test.go +++ b/pkg/syncer/upsync/upsync_process_test.go @@ -82,7 +82,7 @@ func TestUpsyncerprocess(t *testing.T) { isUpstream bool updateType []UpdateType }{ - "StatusSyncer upsyncs namespaced resources": { + "Upsyncer upsyncs namespaced resources": { upstreamLogicalCluster: "root:org:ws", fromNamespace: namespace("test", "", map[string]string{ "internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5", @@ -100,9 +100,7 @@ func TestUpsyncerprocess(t *testing.T) { toResources: []runtime.Object{}, resourceToProcessName: "test-pvc", syncTargetName: "us-west1", - expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", - Version: "v1", - Resource: "persistentvolumeclaims"}, "test", "test-pvc")}, + expectActionsOnFrom: []clienttesting.Action{}, expectActionsOnTo: []kcptesting.Action{ // kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "test", "test-pvc"), kcptesting.NewCreateAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "test", toUnstructured(t, getPVC("test-pvc", "test", "", map[string]string{ @@ -123,7 +121,7 @@ func TestUpsyncerprocess(t *testing.T) { isUpstream: false, updateType: []UpdateType{MetadataUpdate, SpecUpdate}, }, - "StatusSyncer upsyncs cluster-wide resources": { + "Upsyncer upsyncs cluster-wide resources": { upstreamLogicalCluster: "root:org:ws", fromNamespace: nil, gvr: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, @@ -137,7 +135,7 @@ func TestUpsyncerprocess(t *testing.T) { toResources: []runtime.Object{}, resourceToProcessName: "test-pv", syncTargetName: "us-west1", - expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, "", "test-pv")}, + expectActionsOnFrom: []clienttesting.Action{}, expectActionsOnTo: []kcptesting.Action{ // kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", "test-pv"), kcptesting.NewCreateAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", toUnstructured(t, getPV(getPVC("test", "test", "", map[string]string{ @@ -179,7 +177,7 @@ func TestUpsyncerprocess(t *testing.T) { }, resourceToProcessName: "test-pvc", syncTargetName: "us-west1", - expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, "test", "test-pvc")}, + expectActionsOnFrom: []clienttesting.Action{}, expectActionsOnTo: []kcptesting.Action{ // kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "test", "test-pvc"), kcptesting.NewUpdateSubresourceAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "metadata", "test", @@ -212,7 +210,7 @@ func TestUpsyncerprocess(t *testing.T) { }, resourceToProcessName: "test-pv", syncTargetName: "us-west1", - expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, "", "test-pv")}, + expectActionsOnFrom: []clienttesting.Action{}, expectActionsOnTo: []kcptesting.Action{ // kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", "test-pv"), kcptesting.NewUpdateSubresourceAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "metadata", "", toUnstructured(t, getPV(getPVC("test", "test", "", map[string]string{ @@ -270,9 +268,7 @@ func TestUpsyncerprocess(t *testing.T) { }, resourceToProcessName: "test-pv", syncTargetName: "us-west1", - expectActionsOnFrom: []clienttesting.Action{ - clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, "", "test-pv"), - }, + expectActionsOnFrom: []clienttesting.Action{}, expectActionsOnTo: []kcptesting.Action{ kcptesting.NewDeleteAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", "test-pv"), },