Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Adding ClusterWorkspaceShard to the resources stored in the cache server #2381

Merged
merged 2 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/en/main/concepts/cache-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Out of the box, the server supports the following resources:

- `apiresourceschemas`
- `apiexports`
- `clusterworkspaceshards`

All those resources are represented as CustomResourceDefinitions and
stored in `system:cache:server` shard under `system:system-crds` cluster.
Expand Down
10 changes: 7 additions & 3 deletions pkg/cache/server/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ const SystemCacheServerShard = "system:cache:server"

func Bootstrap(ctx context.Context, apiExtensionsClusterClient kcpapiextensionsclientset.ClusterInterface) error {
crds := []*apiextensionsv1.CustomResourceDefinition{}
for _, resource := range []string{"apiresourceschemas", "apiexports"} {
for _, gr := range []struct{ group, resource string }{
{"apis.kcp.dev", "apiresourceschemas"},
{"apis.kcp.dev", "apiexports"},
{"tenancy.kcp.dev", "clusterworkspaceshards"},
} {
crd := &apiextensionsv1.CustomResourceDefinition{}
if err := configcrds.Unmarshal(fmt.Sprintf("apis.kcp.dev_%s.yaml", resource), crd); err != nil {
panic(fmt.Errorf("failed to unmarshal %v resource: %w", resource, err))
if err := configcrds.Unmarshal(fmt.Sprintf("%s_%s.yaml", gr.group, gr.resource), crd); err != nil {
panic(fmt.Errorf("failed to unmarshal %v resource: %w", gr, err))
}
for i := range crd.Spec.Versions {
v := &crd.Spec.Versions[i]
Expand Down
80 changes: 53 additions & 27 deletions pkg/reconciler/cache/replication/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ import (
"k8s.io/klog/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
cacheclient "github.com/kcp-dev/kcp/pkg/cache/client"
"github.com/kcp-dev/kcp/pkg/cache/client/shard"
kcpinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions"
apisv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/apis/v1alpha1"
tenancyv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/tenancy/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
)

Expand All @@ -46,8 +49,6 @@ const (

// NewController returns a new replication controller.
//
// The replication controller copies objects of defined resources that have the "internal.sharding.kcp.dev/replicate" annotation to the cache server.
//
// The replicated object will be placed under the same cluster as the original object.
// In addition to that, all replicated objects will be placed under the shard taken from the shardName argument.
// For example: shards/{shardName}/clusters/{clusterName}/apis/apis.kcp.dev/v1alpha1/apiexports
Expand All @@ -56,34 +57,49 @@ func NewController(
dynamicCacheClient kcpdynamic.ClusterInterface,
dynamicLocalClient kcpdynamic.ClusterInterface,
localKcpInformers kcpinformers.SharedInformerFactory,
cacheKcpInformers kcpinformers.SharedInformerFactory,
globalKcpInformers kcpinformers.SharedInformerFactory,
) (*controller, error) {
c := &controller{
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,
dynamicLocalClient: dynamicLocalClient,
localApiExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(),
localApiResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(),
cacheApiExportsIndexer: cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cacheApiResourceSchemaIndexer: cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
shardName: shardName,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
dynamicCacheClient: dynamicCacheClient,
dynamicLocalClient: dynamicLocalClient,
localAPIExportLister: localKcpInformers.Apis().V1alpha1().APIExports().Lister(),
localAPIResourceSchemaLister: localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Lister(),
localClusterWorkspaceShardLister: localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Lister(),
globalAPIExportIndexer: globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
globalAPIResourceSchemaIndexer: globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
globalClusterWorkspaceShardIndexer: globalKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(),
}

if err := cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddIndexers(cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
}); err != nil {
return nil, err
}
if err := cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddIndexers(cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
}); err != nil {
return nil, err
}
indexers.AddIfNotPresentOrDie(
globalKcpInformers.Apis().V1alpha1().APIExports().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

indexers.AddIfNotPresentOrDie(
globalKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().GetIndexer(),
cache.Indexers{
ByShardAndLogicalClusterAndNamespaceAndName: IndexByShardAndLogicalClusterAndNamespace,
},
)

localKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler())
localKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler())
cacheKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler())
cacheKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler())
localKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler())
globalKcpInformers.Apis().V1alpha1().APIExports().Informer().AddEventHandler(c.apiExportInformerEventHandler())
globalKcpInformers.Apis().V1alpha1().APIResourceSchemas().Informer().AddEventHandler(c.apiResourceSchemaInformerEventHandler())
globalKcpInformers.Tenancy().V1alpha1().ClusterWorkspaceShards().Informer().AddEventHandler(c.clusterWorkspaceShardInformerEventHandler())

return c, nil
}

Expand All @@ -95,6 +111,10 @@ func (c *controller) enqueueAPIResourceSchema(obj interface{}) {
c.enqueueObject(obj, apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"))
}

func (c *controller) enqueueClusterWorkspaceShard(obj interface{}) {
c.enqueueObject(obj, tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"))
}

func (c *controller) enqueueObject(obj interface{}, gvr schema.GroupVersionResource) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
Expand Down Expand Up @@ -155,6 +175,10 @@ func (c *controller) apiResourceSchemaInformerEventHandler() cache.ResourceEvent
return objectInformerEventHandler(c.enqueueAPIResourceSchema)
}

func (c *controller) clusterWorkspaceShardInformerEventHandler() cache.ResourceEventHandler {
return objectInformerEventHandler(c.enqueueClusterWorkspaceShard)
}

func objectInformerEventHandler(enqueueObject func(obj interface{})) cache.ResourceEventHandler {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { enqueueObject(obj) },
Expand All @@ -170,9 +194,11 @@ type controller struct {
dynamicCacheClient kcpdynamic.ClusterInterface
dynamicLocalClient kcpdynamic.ClusterInterface

localApiExportLister apisv1alpha1listers.APIExportClusterLister
localApiResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister
localAPIExportLister apisv1alpha1listers.APIExportClusterLister
localAPIResourceSchemaLister apisv1alpha1listers.APIResourceSchemaClusterLister
localClusterWorkspaceShardLister tenancyv1alpha1listers.ClusterWorkspaceShardClusterLister

cacheApiExportsIndexer cache.Indexer
cacheApiResourceSchemaIndexer cache.Indexer
globalAPIExportIndexer cache.Indexer
globalAPIResourceSchemaIndexer cache.Indexer
globalClusterWorkspaceShardIndexer cache.Indexer
}
20 changes: 16 additions & 4 deletions pkg/reconciler/cache/replication/replication_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/client-go/tools/cache"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
)

func (c *controller) reconcile(ctx context.Context, gvrKey string) error {
Expand All @@ -47,21 +48,32 @@ func (c *controller) reconcile(ctx context.Context, gvrKey string) error {
apisv1alpha1.SchemeGroupVersion.WithResource("apiexports"),
apisv1alpha1.SchemeGroupVersion.WithKind("APIExport"),
func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) {
return retrieveCacheObject(&gvr, c.cacheApiExportsIndexer, c.shardName, cluster, namespace, name)
return retrieveCacheObject(&gvr, c.globalAPIExportIndexer, c.shardName, cluster, namespace, name)
},
func(cluster logicalcluster.Name, _, name string) (interface{}, error) {
return c.localApiExportLister.Cluster(cluster).Get(name)
return c.localAPIExportLister.Cluster(cluster).Get(name)
fgiloux marked this conversation as resolved.
Show resolved Hide resolved
})
case apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas").String():
return c.reconcileObject(ctx,
keyParts[1],
apisv1alpha1.SchemeGroupVersion.WithResource("apiresourceschemas"),
apisv1alpha1.SchemeGroupVersion.WithKind("APIResourceSchema"),
func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) {
return retrieveCacheObject(&gvr, c.cacheApiResourceSchemaIndexer, c.shardName, cluster, namespace, name)
return retrieveCacheObject(&gvr, c.globalAPIResourceSchemaIndexer, c.shardName, cluster, namespace, name)
},
func(cluster logicalcluster.Name, _, name string) (interface{}, error) {
return c.localApiResourceSchemaLister.Cluster(cluster).Get(name)
return c.localAPIResourceSchemaLister.Cluster(cluster).Get(name)
})
case tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards").String():
return c.reconcileObject(ctx,
keyParts[1],
tenancyv1alpha1.SchemeGroupVersion.WithResource("clusterworkspaceshards"),
tenancyv1alpha1.SchemeGroupVersion.WithKind("ClusterWorkspaceShard"),
func(gvr schema.GroupVersionResource, cluster logicalcluster.Name, namespace, name string) (interface{}, error) {
return retrieveCacheObject(&gvr, c.globalClusterWorkspaceShardIndexer, c.shardName, cluster, namespace, name)
},
func(cluster logicalcluster.Name, _, name string) (interface{}, error) {
return c.localClusterWorkspaceShardLister.Cluster(cluster).Get(name)
})
default:
return fmt.Errorf("unsupported resource %v", keyParts[0])
Expand Down
Loading