Fixups! review comments #1
bipuladh authored and davidfestal committed Jan 12, 2023
1 parent eb9a82c commit ce87666
Showing 3 changed files with 58 additions and 31 deletions.
52 changes: 39 additions & 13 deletions pkg/syncer/upsync/upsync_controller.go
@@ -24,6 +24,7 @@ import (

"github.com/go-logr/logr"
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
"github.com/kcp-dev/client-go/dynamic/dynamicinformer"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
@@ -53,6 +54,7 @@ type Controller struct {
upstreamClient kcpdynamic.ClusterInterface
downstreamClient dynamic.Interface
downstreamNamespaceLister cache.GenericLister
downstreamInformer dynamicinformer.DynamicSharedInformerFactory

getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error)

@@ -62,18 +64,18 @@ type Controller struct {
syncTargetKey string
}

type Keysource uint
type UpdateType uint
type Keysource string
type UpdateType string

const (
Upstream Keysource = iota
Downstream
Upstream Keysource = "Upstream"
Downstream Keysource = "Downstream"
)

const (
SpecUpdate UpdateType = iota
StatusUpdate
MetadataUpdate
SpecUpdate UpdateType = "Spec"
StatusUpdate UpdateType = "Status"
MetadataUpdate UpdateType = "Meta"
)

// Upstream resource key generation
@@ -108,7 +110,27 @@ type queueKey struct {
// Differentiate between upstream and downstream keys
keysource Keysource
// Update type
updateType []UpdateType
updateType UpdateType
}

func updateTypeSlicetoString(update []UpdateType) UpdateType {
str := ""
for i, updateType := range update {
if i != 0 {
str += ","
}
str += string(updateType)
}
return UpdateType(str)
}

func updateTypeStringtoSlice(update UpdateType) []UpdateType {
str := strings.Split(string(update), ",")
updateType := make([]UpdateType, len(str))
for i, item := range str {
updateType[i] = UpdateType(item)
}
return updateType
}

func getUpdateType(oldObj *unstructured.Unstructured, newObj *unstructured.Unstructured) []UpdateType {
@@ -144,11 +166,13 @@ func (c *Controller) AddToQueue(gvr schema.GroupVersionResource, obj interface{}
return
}
logging.WithQueueKey(logger, key).V(2).Info("queueing GVR", "gvr", gvr.String())
updateTypeHashed := updateTypeSlicetoString(updateType)
c.queue.Add(
queueKey{
gvr: gvr,
key: key,
keysource: keysource,
gvr: gvr,
key: key,
keysource: keysource,
updateType: updateTypeHashed,
},
)
}
@@ -251,12 +275,14 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
keySource = "Upstream"
}

logger := logging.WithQueueKey(klog.FromContext(ctx), qk.key).WithValues("gvr", qk.gvr.String(), "Key for:", keySource)
logger := logging.WithQueueKey(klog.FromContext(ctx), qk.key).WithValues("gvr", qk.gvr.String(), "source", keySource)
ctx = klog.NewContext(ctx, logger)
logger.V(1).Info("Processing key")
defer c.queue.Done(key)

if err := c.process(ctx, qk.gvr, qk.key, qk.keysource == Upstream, updateType); err != nil {
updateTypeArray := updateTypeStringtoSlice(updateType)

if err := c.process(ctx, qk.gvr, qk.key, qk.keysource == Upstream, updateTypeArray); err != nil {
runtime.HandleError(fmt.Errorf("%s failed to upsync %q, err: %w", controllerName, key, err))
c.queue.AddRateLimited(key)
return true
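For reference on the helpers added above: updateTypeSlicetoString packs a slice of update types into a single comma-separated value so it can sit inside the comparable queueKey struct, and updateTypeStringtoSlice unpacks it again when the key is processed. Below is a minimal, self-contained sketch of that round-trip; the type, constants, and helper bodies mirror the diff, while the main function and the printed results are illustrative only.

package main

import (
	"fmt"
	"strings"
)

// UpdateType mirrors the string-backed type introduced in this commit.
type UpdateType string

const (
	SpecUpdate     UpdateType = "Spec"
	StatusUpdate   UpdateType = "Status"
	MetadataUpdate UpdateType = "Meta"
)

// updateTypeSlicetoString joins the update types into one comma-separated
// value, making the queue key comparable and safe to hash.
func updateTypeSlicetoString(update []UpdateType) UpdateType {
	str := ""
	for i, u := range update {
		if i != 0 {
			str += ","
		}
		str += string(u)
	}
	return UpdateType(str)
}

// updateTypeStringtoSlice reverses the encoding when the key is dequeued.
func updateTypeStringtoSlice(update UpdateType) []UpdateType {
	parts := strings.Split(string(update), ",")
	out := make([]UpdateType, len(parts))
	for i, p := range parts {
		out[i] = UpdateType(p)
	}
	return out
}

func main() {
	encoded := updateTypeSlicetoString([]UpdateType{MetadataUpdate, SpecUpdate})
	fmt.Println(encoded)                          // Meta,Spec
	fmt.Println(updateTypeStringtoSlice(encoded)) // [Meta Spec]
}

Note that strings.Split("", ",") returns a one-element slice containing the empty string, so an empty encoded value decodes to []UpdateType{""} rather than an empty slice.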
19 changes: 12 additions & 7 deletions pkg/syncer/upsync/upsync_process.go
@@ -18,6 +18,7 @@ package upsync

import (
"context"
"errors"
"fmt"

"github.com/kcp-dev/kcp/pkg/syncer/shared"
@@ -84,7 +85,7 @@ func (c *Controller) processUpstreamResource(ctx context.Context, gvr schema.Gro
return err
}
} else {
_, err := c.downstreamClient.Resource(gvr).Get(context.TODO(), upstreamName, metav1.GetOptions{})
_, err := c.downstreamInformer.ForResource(gvr).Lister().Get(upstreamName)
if err != nil && k8serror.IsNotFound(err) {
// Downstream namespace not present; assume object is not present as well and return upstream object
// Prune resource upstream
@@ -137,6 +138,7 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G
var nsObj runtime.Object
if nsObj, err = c.downstreamNamespaceLister.Get(downstreamNamespace); err != nil {
logger.Error(err, "Error getting downstream Namespace from downstream namespace lister", "ns", downstreamNamespace)
return err
}
nsMeta, ok := nsObj.(metav1.Object)
if !ok {
@@ -151,15 +153,16 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G
if !locatorExists || upstreamLocator == nil {
return nil
}
downstreamObject, err = c.downstreamClient.Resource(gvr).Namespace(downstreamNamespace).Get(ctx, downstreamName, metav1.GetOptions{})
downstreamObject, err = c.downstreamInformer.ForResource(gvr).Lister().ByNamespace(downstreamNamespace).Get(downstreamName)
if err != nil {
logger.Error(err, "Could not find the resource downstream")
}
}
if downstreamNamespace == "" {
downstreamObject, err = c.downstreamClient.Resource(gvr).Get(ctx, downstreamName, metav1.GetOptions{})
if err != nil {
} else {
downstreamObject, err = c.downstreamInformer.ForResource(gvr).Lister().Get(downstreamName)
if err != nil && k8serror.IsNotFound(err) {
logger.Error(err, "Could not find the resource downstream")
// Todo: Perform cleanup on the upstream resource
return nil
}
objMeta, ok := downstreamObject.(metav1.Object)
if !ok {
Expand All @@ -170,7 +173,8 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G
}

if !locatorExists || upstreamLocator == nil {
return nil
logger.Error(err, "locator not found in the resource")
return errors.New("locator not found in the downstream resource")
}
}

@@ -199,6 +203,7 @@ func (c *Controller) processDownstreamResource(ctx context.Context, gvr schema.G
}
c.updateResourceContent(ctx, gvr, upstreamWorkspace, upstreamNamespace, resource, downstreamResource, "spec")
c.updateResourceContent(ctx, gvr, upstreamWorkspace, upstreamNamespace, resource, downstreamResource, "status")
return nil
} else {
resourceVersionUpstream = unstructuredUpstreamResource.GetAnnotations()[ResourceVersionAnnotation]
}
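For context on the lookups above, which replace direct downstreamClient Get calls with c.downstreamInformer.ForResource(gvr).Lister(): the listers serve reads from a shared informer cache instead of hitting the downstream API server on every reconcile. Below is a rough wiring sketch using the plain k8s.io/client-go dynamicinformer package for illustration; the controller itself imports the kcp-dev/client-go variant, and the config, GVR, namespace, and resource name here are placeholders.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/rest"
)

func main() {
	// Placeholder client setup; the real controller is handed its downstream client.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// One shared factory hands out informers and listers for any GVR.
	factory := dynamicinformer.NewDynamicSharedInformerFactory(client, 10*time.Minute)
	pvcGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}
	lister := factory.ForResource(pvcGVR).Lister()

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// Cache-backed read, equivalent in spirit to the
	// Lister().ByNamespace(ns).Get(name) calls in the diff.
	obj, err := lister.ByNamespace("test").Get("test-pvc")
	fmt.Println(obj, err)
}

ForResource must be called before Start for that informer to actually run, and WaitForCacheSync should complete before lister reads are trusted; a cold cache can report NotFound for objects that exist on the cluster.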
18 changes: 7 additions & 11 deletions pkg/syncer/upsync/upsync_process_test.go
@@ -82,7 +82,7 @@ func TestUpsyncerprocess(t *testing.T) {
isUpstream bool
updateType []UpdateType
}{
"StatusSyncer upsyncs namespaced resources": {
"Upsyncer upsyncs namespaced resources": {
upstreamLogicalCluster: "root:org:ws",
fromNamespace: namespace("test", "", map[string]string{
"internal.workload.kcp.dev/cluster": "2gzO8uuQmIoZ2FE95zoOPKtrtGGXzzjAvtl6q5",
@@ -100,9 +100,7 @@ func TestUpsyncerprocess(t *testing.T) {
toResources: []runtime.Object{},
resourceToProcessName: "test-pvc",
syncTargetName: "us-west1",
expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "",
Version: "v1",
Resource: "persistentvolumeclaims"}, "test", "test-pvc")},
expectActionsOnFrom: []clienttesting.Action{},
expectActionsOnTo: []kcptesting.Action{
// kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "test", "test-pvc"),
kcptesting.NewCreateAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "test", toUnstructured(t, getPVC("test-pvc", "test", "", map[string]string{
@@ -123,7 +121,7 @@ func TestUpsyncerprocess(t *testing.T) {
isUpstream: false,
updateType: []UpdateType{MetadataUpdate, SpecUpdate},
},
"StatusSyncer upsyncs cluster-wide resources": {
"Upsyncer upsyncs cluster-wide resources": {
upstreamLogicalCluster: "root:org:ws",
fromNamespace: nil,
gvr: schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"},
@@ -137,7 +135,7 @@ func TestUpsyncerprocess(t *testing.T) {
toResources: []runtime.Object{},
resourceToProcessName: "test-pv",
syncTargetName: "us-west1",
expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, "", "test-pv")},
expectActionsOnFrom: []clienttesting.Action{},
expectActionsOnTo: []kcptesting.Action{
// kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", "test-pv"),
kcptesting.NewCreateAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", toUnstructured(t, getPV(getPVC("test", "test", "", map[string]string{
@@ -179,7 +177,7 @@ func TestUpsyncerprocess(t *testing.T) {
},
resourceToProcessName: "test-pvc",
syncTargetName: "us-west1",
expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, "test", "test-pvc")},
expectActionsOnFrom: []clienttesting.Action{},
expectActionsOnTo: []kcptesting.Action{
// kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "test", "test-pvc"),
kcptesting.NewUpdateSubresourceAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}, logicalcluster.New("root:org:ws"), "metadata", "test",
@@ -212,7 +210,7 @@ func TestUpsyncerprocess(t *testing.T) {
},
resourceToProcessName: "test-pv",
syncTargetName: "us-west1",
expectActionsOnFrom: []clienttesting.Action{clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, "", "test-pv")},
expectActionsOnFrom: []clienttesting.Action{},
expectActionsOnTo: []kcptesting.Action{
// kcptesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", "test-pv"),
kcptesting.NewUpdateSubresourceAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "metadata", "", toUnstructured(t, getPV(getPVC("test", "test", "", map[string]string{
@@ -270,9 +268,7 @@ func TestUpsyncerprocess(t *testing.T) {
},
resourceToProcessName: "test-pv",
syncTargetName: "us-west1",
expectActionsOnFrom: []clienttesting.Action{
clienttesting.NewGetAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, "", "test-pv"),
},
expectActionsOnFrom: []clienttesting.Action{},
expectActionsOnTo: []kcptesting.Action{
kcptesting.NewDeleteAction(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumes"}, logicalcluster.New("root:org:ws"), "", "test-pv"),
},
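One consequence of the lister switch shows up in the test changes above: the expectActionsOnFrom entries that asserted a NewGetAction against the downstream fake client are now empty slices, because a read served from an informer cache never reaches the fake client and therefore records no action. A small sketch of that distinction, using the fake dynamic client from k8s.io/client-go; the PVC GVR and names are only for illustration.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	dynamicfake "k8s.io/client-go/dynamic/fake"
	"k8s.io/client-go/kubernetes/scheme"
)

func main() {
	pvcGVR := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"}

	// The fake dynamic client records every API call it serves as an action.
	client := dynamicfake.NewSimpleDynamicClient(scheme.Scheme)

	// A direct client Get is recorded; this is what the removed
	// clienttesting.NewGetAction expectations used to assert.
	_, _ = client.Resource(pvcGVR).Namespace("test").Get(context.TODO(), "test-pvc", metav1.GetOptions{})
	fmt.Println(len(client.Actions())) // 1

	// A lister read served from an informer cache never goes through the
	// client, so nothing is recorded and expectActionsOnFrom stays empty.
}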
