Skip to content

Commit

Permalink
reconciler/cache/reconciler: simplify and generalize
Browse files Browse the repository at this point in the history
Co-authored-by: Lukasz Szaszkiewicz <lukasz.szaszkiewicz@gmail.com>
Co-authored-by: Stefan Schimanski <sttts@redhat.com>
  • Loading branch information
3 people committed Jan 12, 2023
1 parent ea32ecd commit 19e4bde
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 776 deletions.
194 changes: 87 additions & 107 deletions pkg/reconciler/cache/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
kcpkubernetesinformers "github.com/kcp-dev/client-go/informers"
kcprbaclisters "github.com/kcp-dev/client-go/listers/rbac/v1"

rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -35,14 +35,12 @@ import (
"k8s.io/klog/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1"
corev1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/core/v1alpha1"
tenancyv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
)
Expand All @@ -60,92 +58,79 @@ const (
func NewController(
shardName string,
dynamicCacheClient kcpdynamic.ClusterInterface,
dynamicLocalClient kcpdynamic.ClusterInterface,
localKcpInformers kcpinformers.SharedInformerFactory,
globalKcpInformers kcpinformers.SharedInformerFactory,
localKubeInformers kcpkubernetesinformers.SharedInformerFactory,
globalKubeInformers kcpkubernetesinformers.SharedInformerFactory,
) (*controller, error) {
c := &controller{
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicKcpCacheClient: dynamicCacheClient,
dynamicKcpLocalClient: dynamicLocalClient,

localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(),
localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(),
localShardLister: localKcpInformers.Core().V1alpha1().Shards().Lister(),
localWorkspaceTypeLister: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Lister(),
localClusterRoleLister: localKubeInformers.Rbac().V1().ClusterRoles().Lister(),
localClusterRoleBindingLister: localKubeInformers.Rbac().V1().ClusterRoleBindings().Lister(),

globalAPIExportIndexer: globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
globalAPIResourceSchemaIndexer: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
globalShardIndexer: globalKcpInformers.Core().V1alpha1().Shards().Informer().GetIndexer(),
globalWorkspaceTypeIndexer: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(),
globalClusterRoleIndexer: globalKubeInformers.Rbac().V1().ClusterRoles().Informer().GetIndexer(),
globalClusterRoleBindingIndexer: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(),
}

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,

gvrs: map[schema.GroupVersionResource]replicatedGVR{
apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"): {
kind: "APIExport",
local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(),
},
apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): {
kind: "APIResourceSchema",
local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
},
corev1alpha1.SchemeGroupVersion.WithResource("shards"): {
kind: "Shard",
local: localKcpInformers.Core().V1alpha1().Shards().Informer(),
global: globalKcpInformers.Core().V1alpha1().Shards().Informer(),
},
tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"): {
kind: "WorkspaceType",
local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
},
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): {
kind: "ClusterRole",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(),
global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(),
},
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): {
kind: "ClusterRoleBinding",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
},
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Core().V1alpha1().Shards().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKubeInformers.Rbac().V1().ClusterRoles().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")))
globalKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")))

localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas")))
globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas")))

localKcpInformers.Core().V1alpha1().Shards().Informer().AddEventHandler(c.objectInformerEventHandler(corev1alpha1.SchemeGroupVersion.WithResource("shards")))
globalKcpInformers.Core().V1alpha1().Shards().Informer().AddEventHandler(c.objectInformerEventHandler(corev1alpha1.SchemeGroupVersion.WithResource("shards")))

localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().AddEventHandler(c.objectInformerEventHandler(tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes")))
globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().AddEventHandler(c.objectInformerEventHandler(tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes")))

localKubeInformers.Rbac().V1().ClusterRoles().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterroles")))
globalKubeInformers.Rbac().V1().ClusterRoles().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterroles")))
}

localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")))
globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")))
for gvr, info := range c.gvrs {
indexers.AddIfNotPresentOrDie(
info.global.GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

// shadow gvr to get the right value in the closure
gvr := gvr

info.local.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) },
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
})

info.global.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) },
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj, gvr) },
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) },
})
}

return c, nil
}
Expand All @@ -156,7 +141,17 @@ func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResou
runtime.HandleError(err)
return
}
gvrKey := fmt.Sprintf("%v::%v", gvr.String(), key)
gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key)
c.queue.Add(gvrKey)
}

func (c *controller) enqueueCacheObject(obj interface{}, gvr schema.GroupVersionResource) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key)
c.queue.Add(gvrKey)
}

Expand Down Expand Up @@ -202,32 +197,17 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool {
return true
}

func (c *controller) objectInformerEventHandler(gvr schema.GroupVersionResource) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) },
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
}
}

type controller struct {
shardName string
queue workqueue.RateLimitingInterface

dynamicKcpCacheClient kcpdynamic.ClusterInterface
dynamicKcpLocalClient kcpdynamic.ClusterInterface

localAPIExportLister apisv1alpha1listers.APIExportClusterLister
localAPIResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister
localShardLister corev1alpha1listers.ShardClusterLister
localWorkspaceTypeLister tenancyv1alpha1listers.WorkspaceTypeClusterLister
localClusterRoleLister kcprbaclisters.ClusterRoleClusterLister
localClusterRoleBindingLister kcprbaclisters.ClusterRoleBindingClusterLister

globalAPIExportIndexer cache.Indexer
globalAPIResourceSchemaIndexer cache.Indexer
globalShardIndexer cache.Indexer
globalWorkspaceTypeIndexer cache.Indexer
globalClusterRoleIndexer cache.Indexer
globalClusterRoleBindingIndexer cache.Indexer
dynamicCacheClient kcpdynamic.ClusterInterface

gvrs map[schema.GroupVersionResource]replicatedGVR
}

type replicatedGVR struct {
kind string
filter func(u *unstructured.Unstructured) bool
global, local cache.SharedIndexInformer
}
Loading

0 comments on commit 19e4bde

Please sign in to comment.