Skip to content

Commit

Permalink
Fixes after the rebase on the Syncer refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: David Festal <dfestal@redhat.com>
  • Loading branch information
davidfestal committed Jan 10, 2023
1 parent 1e0ceaa commit 93c244c
Show file tree
Hide file tree
Showing 4 changed files with 343 additions and 251 deletions.
2 changes: 1 addition & 1 deletion pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
}

logger.Info("Creating resource upsyncer")
upSyncer, err := upsync.NewUpSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamUpsyncerClusterClient, downstreamDynamicClient, ddsifForUpstreamSyncer, ddsifForUpstreamUpsyncer, ddsifForDownstream, syncTarget.GetUID())
upSyncer, err := upsync.NewUpSyncer(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, syncTargetKey, upstreamUpsyncerClusterClient, downstreamDynamicClient, ddsifForUpstreamUpsyncer, ddsifForDownstream, syncTarget.GetUID())
if err != nil {
return err
}
Expand Down
77 changes: 39 additions & 38 deletions pkg/syncer/upsync/upsync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/logicalcluster/v3"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -41,6 +40,8 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
corev1 "k8s.io/api/core/v1"
)

const (
Expand All @@ -57,6 +58,7 @@ type Controller struct {
downstreamInformer dynamicinformer.DynamicSharedInformerFactory

getDownstreamLister func(gvr schema.GroupVersionResource) (cache.GenericLister, error)
getUpstreamUpsyncerLister func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error)

syncTargetName string
syncTargetWorkspace logicalcluster.Name
Expand All @@ -82,13 +84,7 @@ const (
// Add the cluster name/namespace#cluster-name
func getKey(obj interface{}, keySource Keysource) (string, error) {
if keySource == Upstream {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return key, err
}
clusterName := logicalcluster.From(obj.(metav1.Object)).String()
keyWithClusterName := key + "#" + clusterName
return keyWithClusterName, nil
return kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
}
return cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
}
Expand Down Expand Up @@ -180,7 +176,6 @@ func (c *Controller) AddToQueue(gvr schema.GroupVersionResource, obj interface{}
func NewUpSyncer(syncerLogger logr.Logger, syncTargetWorkspace logicalcluster.Name,
syncTargetName, syncTargetKey string,
upstreamClient kcpdynamic.ClusterInterface, downstreamClient dynamic.Interface,
ddsifForUpstreamSyncer *ddsif.DiscoveringDynamicSharedInformerFactory,
ddsifForUpstreamUpyncer *ddsif.DiscoveringDynamicSharedInformerFactory,
ddsifForDownstream *ddsif.GenericDiscoveringDynamicSharedInformerFactory[cache.SharedIndexInformer, cache.GenericLister, informers.GenericInformer],
syncTargetUID types.UID) (*Controller, error) {
Expand All @@ -199,46 +194,52 @@ func NewUpSyncer(syncerLogger logr.Logger, syncTargetWorkspace logicalcluster.Na
}
return informer.Lister(), nil
},

downstreamUpsyncInformer: downstreamUpsyncInformer,
getUpstreamUpsyncerLister: func(gvr schema.GroupVersionResource) (kcpcache.GenericClusterLister, error) {
informers, notSynced := ddsifForUpstreamUpyncer.Informers()
informer, ok := informers[gvr]
if !ok {
if shared.ContainsGVR(notSynced, gvr) {
return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr)
}
return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
}
return informer.Lister(), nil
},
syncTargetName: syncTargetKey,
syncTargetWorkspace: syncTargetWorkspace,
syncTargetUID: syncTargetUID,
syncTargetKey: syncTargetKey,
upstreamInformer: upstreamInformer,
}
logger := logging.WithReconciler(syncerLogger, controllerName)

downstreamUpsyncInformer.AddDownstreamEventHandler(
func(gvr schema.GroupVersionResource) cache.ResourceEventHandler {
logger.V(2).Info("Set up downstream resources informer", "gvr", gvr.String())
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
UpdateFunc: func(oldObj, newObj interface{}) {
updateType := getUpdateType(oldObj.(*unstructured.Unstructured), newObj.(*unstructured.Unstructured))
if len(updateType) > 0 {
c.AddToQueue(gvr, newObj, logger, Downstream, updateType)
}
},
DeleteFunc: func(obj interface{}) {
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
namespaceGVR := corev1.SchemeGroupVersion.WithResource("namespaces")

ddsifForDownstream.AddEventHandler(ddsif.GVREventHandlerFuncs{
AddFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
if gvr == namespaceGVR {
return
}
},
)

downstreamUpsyncInformer.AddUpstreamEventHandler(
func(gvr schema.GroupVersionResource) cache.ResourceEventHandler {
logger.V(2).Info("Set up upstream resources informer", "gvr", gvr.String())
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.AddToQueue(gvr, obj, logger, Upstream, []UpdateType{})
},
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
UpdateFunc: func(gvr schema.GroupVersionResource, oldObj, newObj interface{}) {
updateType := getUpdateType(oldObj.(*unstructured.Unstructured), newObj.(*unstructured.Unstructured))
if len(updateType) > 0 {
c.AddToQueue(gvr, newObj, logger, Downstream, updateType)
}
},
)
DeleteFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
// TODO(davidfestal): do we want to extract the namespace from where the resource was deleted,
// as done in the SpecController ?
c.AddToQueue(gvr, obj, logger, Downstream, []UpdateType{})
},
})

ddsifForUpstreamUpyncer.AddEventHandler(ddsif.GVREventHandlerFuncs{
AddFunc: func(gvr schema.GroupVersionResource, obj interface{}) {
c.AddToQueue(gvr, obj, logger, Upstream, []UpdateType{})
},
})
return c, nil
}

Expand Down
Loading

0 comments on commit 93c244c

Please sign in to comment.