Skip to content

Commit

Permalink
Adding ClusterWorkspaceShard to the resources stored in the cache server
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Giloux <fgiloux@redhat.com>
  • Loading branch information
fgiloux committed Nov 18, 2022
1 parent dc43097 commit ef7f921
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 112 deletions.
1 change: 1 addition & 0 deletions docs/content/en/main/concepts/cache-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Out of the box, the server supports the following resources:

- `apiresourceschemas`
- `apiexports`
- `clusterworkspaceshards`

All those resources are represented as CustomResourceDefinitions and
stored in `system:cache:server` shard under `system:system-crds` cluster.
Expand Down
8 changes: 6 additions & 2 deletions pkg/cache/server/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ const SystemCacheServerShard = "system:cache:server"

func Bootstrap(ctx context.Context, apiExtensionsClusterClient kcpapiextensionsclientset.ClusterInterface) error {
crds := []*apiextensionsv1.CustomResourceDefinition{}
for _, resource := range []string{"apiresourceschemas", "apiexports"} {
for _, resource := range []struct{ g, n string }{
{"apis.kcp.dev", "apiresourceschemas"},
{"apis.kcp.dev", "apiexports"},
{"tenancy.kcp.dev", "clusterworkspaceshards"},
} {
crd := &apiextensionsv1.CustomResourceDefinition{}
if err := configcrds.Unmarshal(fmt.Sprintf("apis.kcp.dev_%s.yaml", resource), crd); err != nil {
if err := configcrds.Unmarshal(fmt.Sprintf("%s_%s.yaml", resource.g, resource.n), crd); err != nil {
panic(fmt.Errorf("failed to unmarshal %v resource: %w", resource, err))
}
for i := range crd.Spec.Versions {
Expand Down
50 changes: 36 additions & 14 deletions pkg/reconciler/cache/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ import (
"k8s.io/klog/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
apislisters "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1"
tenancyclisters "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1"

"github.com/kcp-dev/kcp/pkg/logging"
)

Expand All @@ -46,8 +49,6 @@ const (

// NewController returns a new replication controller.
//
// The replication controller copies objects of defined resources that have the "internal.sharding.kcp.dev/replicate" annotation to the cache server.
//
// The replicated object will be placed under the same cluster as the original object.
// In addition to that, all replicated objects will be placed under the shard taken from the shardName argument.
// For example: shards/{shardName}/clusters/{clusterName}/apis/apis.kcp.dev/v1alpha1/apiexports
Expand All @@ -59,14 +60,16 @@ func NewController(
cacheKcpInformers kcpinformers.SharedInformerFactory,
) (*controller, error) {
c := &controller{
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,
dynamicLocalClient: dynamicLocalClient,
localApiExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(),
localApiResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(),
cacheApiExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cacheApiResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,
dynamicLocalClient: dynamicLocalClient,
localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(),
localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(),
localClusterWorkspaceShardLister: localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Lister(),
cacheAPIExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cacheAPIResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
cacheClusterWorkspaceShardIndexer: cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(),
}

if err := cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddIndexers(cache.Indexers{
Expand All @@ -80,10 +83,19 @@ func NewController(
return nil, err
}

if err := cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddIndexers(cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
}); err != nil {
return nil, err
}

localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler())
localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler())
localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler())
cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler())
cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler())
cacheKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler())

return c, nil
}

Expand All @@ -95,6 +107,10 @@ func (c *controller) enqueueAPIResourceSchema(obj interface{}) {
c.enqueueObject(obj, apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"))
}

func (c *controller) enqueueClusterWorkspaceShard(obj interface{}) {
c.enqueueObject(obj, tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"))
}

func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResource) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
Expand Down Expand Up @@ -155,6 +171,10 @@ func (c *controller) apiResourceSchemaInformerEventHandler() cache.ResourceEvent
return objectInformerEventHandler(c.enqueueAPIResourceSchema)
}

func (c *controller) clusterWorkspaceShardInformerEventHandler() cache.ResourceEventHandler {
return objectInformerEventHandler(c.enqueueClusterWorkspaceShard)
}

func objectInformerEventHandler(enqueueObject func(obj interface{})) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { enqueueObject(obj) },
Expand All @@ -170,9 +190,11 @@ type controller struct {
dynamicCacheClient kcpdynamic.ClusterInterface
dynamicLocalClient kcpdynamic.ClusterInterface

localApiExportLister apislisters.APIExportLister
localApiResourceSchemaLister apislisters.APIResourceSchemaLister
localAPIExportLister apislisters.APIExportLister
localAPIResourceSchemaLister apislisters.APIResourceSchemaLister
localClusterWorkspaceShardLister tenancyclisters.ClusterWorkspaceShardLister

cacheApiExportsIndexer cache.Indexer
cacheApiResourceSchemaIndexer cache.Indexer
cacheAPIExportsIndexer cache.Indexer
cacheAPIResourceSchemaIndexer cache.Indexer
cacheClusterWorkspaceShardIndexer cache.Indexer
}
20 changes: 16 additions & 4 deletions pkg/reconciler/cache/replication/replication_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/cache"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
)

func (c *controller) reconcile(ctx context.Context, gvrKey string) error {
Expand All @@ -47,21 +48,32 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error {
apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"),
apisv1alpha1.SchemeGroupVersion.WithKind("APIExport"),
func(gvr schema.GroupVersionResource, cluster, namespace, name string) (interface{}, error) {
return retrieveCacheObject(&gvr, c.cacheApiExportsIndexer, c.shardName, cluster, namespace, name)
return retrieveCacheObject(&gvr, c.cacheAPIExportsIndexer, c.shardName, cluster, namespace, name)
},
func(key string) (interface{}, error) {
return c.localApiExportLister.Get(key)
return c.localAPIExportLister.Get(key)
})
case apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas").String():
return c.reconcileObject(ctx,
keyParts[1],
apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"),
apisv1alpha1.SchemeGroupVersion.WithKind("APIResourceSchema"),
func(gvr schema.GroupVersionResource, cluster, namespace, name string) (interface{}, error) {
return retrieveCacheObject(&gvr, c.cacheApiResourceSchemaIndexer, c.shardName, cluster, namespace, name)
return retrieveCacheObject(&gvr, c.cacheAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name)
},
func(key string) (interface{}, error) {
return c.localApiResourceSchemaLister.Get(key)
return c.localAPIResourceSchemaLister.Get(key)
})
case tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards").String():
return c.reconcileObject(ctx,
keyParts[1],
tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"),
tenancyv1alpha1.SchemeGroupVersion.WithKind("ClusterWorkspaceShard"),
func(gvr schema.GroupVersionResource, cluster, namespace, name string) (interface{}, error) {
return retrieveCacheObject(&gvr, c.cacheClusterWorkspaceShardIndexer, c.shardName, cluster, namespace, name)
},
func(key string) (interface{}, error) {
return c.localClusterWorkspaceShardLister.Get(key)
})
default:
return fmt.Errorf("unsupported resource %v", keyParts[0])
Expand Down
Loading

0 comments on commit ef7f921

Please sign in to comment.