Skip to content

Commit

Permalink
Merge pull request #2609 from sttts/sttts-replication-controller-rewrite
Browse files Browse the repository at this point in the history
🌱 reconciler/cache/reconciler: simplify and generalize
  • Loading branch information
openshift-merge-robot authored Jan 12, 2023
2 parents aba9890 + 1a046b6 commit 31c76aa
Show file tree
Hide file tree
Showing 6 changed files with 400 additions and 776 deletions.
194 changes: 87 additions & 107 deletions pkg/reconciler/cache/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpdynamic "github.com/kcp-dev/client-go/dynamic"
kcpkubernetesinformers "github.com/kcp-dev/client-go/informers"
kcprbaclisters "github.com/kcp-dev/client-go/listers/rbac/v1"

rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -35,14 +35,12 @@ import (
"k8s.io/klog/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/core"
corev1alpha1 "github.com/kcp-dev/kcp/pkg/apis/core/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1"
corev1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/core/v1alpha1"
tenancyv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
)
Expand All @@ -60,92 +58,79 @@ const (
func NewController(
shardName string,
dynamicCacheClient kcpdynamic.ClusterInterface,
dynamicLocalClient kcpdynamic.ClusterInterface,
localKcpInformers kcpinformers.SharedInformerFactory,
globalKcpInformers kcpinformers.SharedInformerFactory,
localKubeInformers kcpkubernetesinformers.SharedInformerFactory,
globalKubeInformers kcpkubernetesinformers.SharedInformerFactory,
) (*controller, error) {
c := &controller{
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicKcpCacheClient: dynamicCacheClient,
dynamicKcpLocalClient: dynamicLocalClient,

localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(),
localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(),
localShardLister: localKcpInformers.Core().V1alpha1().Shards().Lister(),
localWorkspaceTypeLister: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Lister(),
localClusterRoleLister: localKubeInformers.Rbac().V1().ClusterRoles().Lister(),
localClusterRoleBindingLister: localKubeInformers.Rbac().V1().ClusterRoleBindings().Lister(),

globalAPIExportIndexer: globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
globalAPIResourceSchemaIndexer: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
globalShardIndexer: globalKcpInformers.Core().V1alpha1().Shards().Informer().GetIndexer(),
globalWorkspaceTypeIndexer: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(),
globalClusterRoleIndexer: globalKubeInformers.Rbac().V1().ClusterRoles().Informer().GetIndexer(),
globalClusterRoleBindingIndexer: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(),
}

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,

gvrs: map[schema.GroupVersionResource]replicatedGVR{
apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"): {
kind: "APIExport",
local: localKcpInformers.Apis().V1alpha1().APIExports().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIExports().Informer(),
},
apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"): {
kind: "APIResourceSchema",
local: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
global: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer(),
},
corev1alpha1.SchemeGroupVersion.WithResource("shards"): {
kind: "Shard",
local: localKcpInformers.Core().V1alpha1().Shards().Informer(),
global: globalKcpInformers.Core().V1alpha1().Shards().Informer(),
},
tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes"): {
kind: "WorkspaceType",
local: localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
global: globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer(),
},
rbacv1.SchemeGroupVersion.WithResource("clusterroles"): {
kind: "ClusterRole",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKubeInformers.Rbac().V1().ClusterRoles().Informer(),
global: globalKubeInformers.Rbac().V1().ClusterRoles().Informer(),
},
rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings"): {
kind: "ClusterRoleBinding",
filter: func(u *unstructured.Unstructured) bool {
return u.GetAnnotations()[core.ReplicateAnnotationKey] != ""
},
local: localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
global: globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer(),
},
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Core().V1alpha1().Shards().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKubeInformers.Rbac().V1().ClusterRoles().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")))
globalKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiexports")))

localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas")))
globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.objectInformerEventHandler(apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas")))

localKcpInformers.Core().V1alpha1().Shards().Informer().AddEventHandler(c.objectInformerEventHandler(corev1alpha1.SchemeGroupVersion.WithResource("shards")))
globalKcpInformers.Core().V1alpha1().Shards().Informer().AddEventHandler(c.objectInformerEventHandler(corev1alpha1.SchemeGroupVersion.WithResource("shards")))

localKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().AddEventHandler(c.objectInformerEventHandler(tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes")))
globalKcpInformers.Tenancy().V1alpha1().WorkspaceTypes().Informer().AddEventHandler(c.objectInformerEventHandler(tenancyv1alpha1.SchemeGroupVersion.WithResource("workspacetypes")))

localKubeInformers.Rbac().V1().ClusterRoles().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterroles")))
globalKubeInformers.Rbac().V1().ClusterRoles().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterroles")))
}

localKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")))
globalKubeInformers.Rbac().V1().ClusterRoleBindings().Informer().AddEventHandler(c.objectInformerEventHandler(rbacv1.SchemeGroupVersion.WithResource("clusterrolebindings")))
for gvr, info := range c.gvrs {
indexers.AddIfNotPresentOrDie(
info.global.GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

// shadow gvr to get the right value in the closure
gvr := gvr

info.local.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) },
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
})

info.global.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) },
UpdateFunc: func(_, obj interface{}) { c.enqueueCacheObject(obj, gvr) },
DeleteFunc: func(obj interface{}) { c.enqueueCacheObject(obj, gvr) },
})
}

return c, nil
}
Expand All @@ -156,7 +141,17 @@ func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResou
runtime.HandleError(err)
return
}
gvrKey := fmt.Sprintf("%v::%v", gvr.String(), key)
gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key)
c.queue.Add(gvrKey)
}

func (c *controller) enqueueCacheObject(obj interface{}, gvr schema.GroupVersionResource) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
gvrKey := fmt.Sprintf("%s.%s.%s::%s", gvr.Version, gvr.Resource, gvr.Group, key)
c.queue.Add(gvrKey)
}

Expand Down Expand Up @@ -202,32 +197,17 @@ func (c *controller) processNextWorkItem(ctx context.Context) bool {
return true
}

func (c *controller) objectInformerEventHandler(gvr schema.GroupVersionResource) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
UpdateFunc: func(_, obj interface{}) { c.enqueueObject(obj, gvr) },
DeleteFunc: func(obj interface{}) { c.enqueueObject(obj, gvr) },
}
}

type controller struct {
shardName string
queue workqueue.RateLimitingInterface

dynamicKcpCacheClient kcpdynamic.ClusterInterface
dynamicKcpLocalClient kcpdynamic.ClusterInterface

localAPIExportLister apisv1alpha1listers.APIExportClusterLister
localAPIResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister
localShardLister corev1alpha1listers.ShardClusterLister
localWorkspaceTypeLister tenancyv1alpha1listers.WorkspaceTypeClusterLister
localClusterRoleLister kcprbaclisters.ClusterRoleClusterLister
localClusterRoleBindingLister kcprbaclisters.ClusterRoleBindingClusterLister

globalAPIExportIndexer cache.Indexer
globalAPIResourceSchemaIndexer cache.Indexer
globalShardIndexer cache.Indexer
globalWorkspaceTypeIndexer cache.Indexer
globalClusterRoleIndexer cache.Indexer
globalClusterRoleBindingIndexer cache.Indexer
dynamicCacheClient kcpdynamic.ClusterInterface

gvrs map[schema.GroupVersionResource]replicatedGVR
}

type replicatedGVR struct {
kind string
filter func(u *unstructured.Unstructured) bool
global, local cache.SharedIndexInformer
}
Loading

0 comments on commit 31c76aa

Please sign in to comment.