Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fixes after the rebase on the Syncer refactoring #1

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
}

logger.Info("Creating resource upsyncer")
upSyncer, err := upsync.NewUpSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamUpsyncerClusterClient, downstreamDynamicClient, ddsifForUpstreamSyncer, ddsifForUpstreamUpsyncer, ddsifForDownstream, syncTarget.GetUID())
upSyncer, err := upsync.NewUpSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamUpsyncerClusterClient, downstreamDynamicClient, ddsifForUpstreamUpsyncer, ddsifForDownstream, syncTarget.GetUID())
if err != nil {
return err
}
Expand Down
118 changes: 63 additions & 55 deletions pkg/syncer/upsync/upsync_controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2022 The KCP Authors.
Copyright 2023 The KCP Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,14 +23,12 @@ import (
"time"

"github.com/go-logr/logr"
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
"github.com/kcp-dev/client-go/dynamic/dynamicinformer"
ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/logicalcluster/v3"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -41,22 +39,25 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

ddsif "github.com/kcp-dev/kcp/pkg/informer"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
)

const (
controllerName = "kcp-resource-upsycner"
controllerName = "kcp-resource-upsyncer"
labelKey = "kcp.resource.upsync"
syncValue = "sync"
)

type Controller struct {
queue workqueue.RateLimitingInterface
upstreamClient kcpdynamic.ClusterInterface
downstreamClient dynamic.Interface
downstreamNamespaceLister cache.GenericLister
downstreamInformer dynamicinformer.DynamicSharedInformerFactory
queue workqueue.RateLimitingInterface
upstreamClient kcpdynamic.ClusterInterface
downstreamClient dynamic.Interface

getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error)
getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error)
getUpstreamUpsyncerLister func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error)

syncTargetName string
syncTargetWorkspace logicalcluster.Name
Expand All @@ -82,13 +83,7 @@ const (
// Add the cluster name/namespace#cluster-name
func getKey(obj interface{}, keySource Keysource) (string, error) {
if keySource == Upstream {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return key, err
}
clusterName := logicalcluster.From(obj.(metav1.Object)).String()
keyWithClusterName := key + "#" + clusterName
return keyWithClusterName, nil
return kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
}
return cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
}
Expand All @@ -103,7 +98,7 @@ func ParseUpstreamKey(key string) (clusterName, namespace, name string, err erro
return clusterName, namespace, name, nil
}

//Queue handles keys for both upstream and downstream resources. Key Source identifies if it's a downstream or an upstream object
// Queue handles keys for both upstream and downstream resources. Key Source identifies if it's a downstream or an upstream object
type queueKey struct {
gvr schema.GroupVersionResource
key string
Expand Down Expand Up @@ -154,7 +149,7 @@ func getUpdateType(oldObj *unstructured.Unstructured, newObj *unstructured.Unstr
updatedValues = append(updatedValues, StatusUpdate)
}
if isMetadataUpdated {
updatedValues = append(updatedValues, StatusUpdate)
updatedValues = append(updatedValues, MetadataUpdate)
}
return updatedValues
}
Expand All @@ -180,7 +175,6 @@ func (c *Controller) AddToQueue(gvr schema.GroupVersionResource, obj interface{}
func NewUpSyncer(syncerLogger logr.Logger, syncTargetWorkspace logicalcluster.Name,
syncTargetName, syncTargetKey string,
upstreamClient kcpdynamic.ClusterInterface, downstreamClient dynamic.Interface,
ddsifForUpstreamSyncer *ddsif.DiscoveringDynamicSharedInformerFactory,
ddsifForUpstreamUpyncer *ddsif.DiscoveringDynamicSharedInformerFactory,
ddsifForDownstream *ddsif.GenericDiscoveringDynamicSharedInformerFactory[cache.SharedIndexInformer, cache.GenericLister, informers.GenericInformer],
syncTargetUID types.UID) (*Controller, error) {
Expand All @@ -199,46 +193,60 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetWorkspace logicalcluster.Na
}
return informer.Lister(), nil
},

downstreamUpsyncInformer: downstreamUpsyncInformer,
syncTargetName: syncTargetKey,
syncTargetWorkspace: syncTargetWorkspace,
syncTargetUID: syncTargetUID,
syncTargetKey: syncTargetKey,
upstreamInformer: upstreamInformer,
getUpstreamUpsyncerLister: func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error) {
informers, notSynced := ddsifForUpstreamUpyncer.Informers()
informer, ok := informers[gvr]
if !ok {
if shared.ContainsGVR(notSynced, gvr) {
return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr)
}
return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
}
return informer.Lister(), nil
},
syncTargetName: syncTargetKey,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, excuse me for interrupting.
I am very interesting to Upsync feature and am trying it on this branch.
Maybe, is this typo?
syncTargetName: syncTargetKey, => syncTargetName: syncTargetName,?
I'm facing an issue at /~https://github.com/davidfestal/kcp/blob/pr/bipuladh/2214/pkg/syncer/upsync/upsync_process.go#L60 that calculates the downstream namespace hash prefix but results in a different value from the actually used one.

syncTargetWorkspace: syncTargetWorkspace,
syncTargetUID: syncTargetUID,
syncTargetKey: syncTargetKey,
}
logger := logging.WithReconciler(syncerLogger, controllerName)

downstreamUpsyncInformer.AddDownstreamEventHandler(
func(gvr schema.GroupVersionResource) cache.ResourceEventHandler {
logger.V(2).Info("Set up downstream resources informer", "gvr", gvr.String())
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
UpdateFunc: func(oldObj, newObj interface{}) {
updateType := getUpdateType(oldObj.(*unstructured.Unstructured), newObj.(*unstructured.Unstructured))
if len(updateType) > 0 {
c.AddToQueue(gvr, newObj, logger, Downstream, updateType)
}
},
DeleteFunc: func(obj interface{}) {
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
namespaceGVR := corev1.SchemeGroupVersion.WithResource("namespaces")

ddsifForDownstream.AddEventHandler(ddsif.GVREventHandlerFuncs{
AddFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
return
}
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
)
UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
if gvr == namespaceGVR {
return
}
updateType := getUpdateType(oldObj.(*unstructured.Unstructured), newObj.(*unstructured.Unstructured))
if len(updateType) > 0 {
c.AddToQueue(gvr, newObj, logger, Downstream, updateType)
}
},
DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
return
}
// TODO(davidfestal): do we want to extract the namespace from where the resource was deleted,
// as done in the SpecController ?
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
})

downstreamUpsyncInformer.AddUpstreamEventHandler(
func(gvr schema.GroupVersionResource) cache.ResourceEventHandler {
logger.V(2).Info("Set up upstream resources informer", "gvr", gvr.String())
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.AddToQueue(gvr, obj, logger, Upstream, []UpdateType{})
},
ddsifForUpstreamUpyncer.AddEventHandler(ddsif.GVREventHandlerFuncs{
AddFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
return
}
c.AddToQueue(gvr, obj, logger, Upstream, []UpdateType{})
},
)
})
return c, nil
}

Expand Down
Loading