Skip to content

Commit

Permalink
First reconciliation of APIExportEndpointSlices, no filtering
Browse files Browse the repository at this point in the history
Signed-off-by: Frederic Giloux <fgiloux@redhat.com>
  • Loading branch information
fgiloux committed Dec 1, 2022
1 parent a2973eb commit 4868e4b
Show file tree
Hide file tree
Showing 8 changed files with 676 additions and 4 deletions.
5 changes: 3 additions & 2 deletions config/crds/apis.kcp.dev_apiexports.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,9 @@ spec:
APIExport. This value is immutable as soon as it is set.
type: string
virtualWorkspaces:
description: virtualWorkspaces contains all APIExport virtual workspace
URLs. This field will get deprecated in favor of APIExportEndpointSlice.
description: "virtualWorkspaces contains all APIExport virtual workspace
URLs. \n Deprecated: use APIExportEndpointSlice.status.endpoints
instead"
items:
properties:
url:
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/apis/v1alpha1/types_apiexport.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ type APIExportStatus struct {
Conditions conditionsv1alpha1.Conditions `json:"conditions,omitempty"`

// virtualWorkspaces contains all APIExport virtual workspace URLs.
// This field will get deprecated in favor of APIExportEndpointSlice.
//
// Deprecated: use APIExportEndpointSlice.status.endpoints instead
//
// +optional
VirtualWorkspaces []VirtualWorkspace `json:"virtualWorkspaces,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion pkg/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
Copyright 2022 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiexportendpointslice

import (
"context"
"fmt"
"time"

kcpcache "github.com/kcp-dev/apimachinery/pkg/cache"
"github.com/kcp-dev/logicalcluster/v2"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
apisv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/apis/v1alpha1"
apisinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/apis/v1alpha1"
tenancyinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/tenancy/v1alpha1"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
)

const (
ControllerName = "kcp-apiexportendpointslice"

DefaultIdentitySecretNamespace = "kcp-system"
)

// NewController returns a new controller for APIExportEndpointSlices.
// ClusterWorkspaceShards and APIExports are read from the cache server
func NewController(
apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer,
clusterWorkspaceShardClusterInformer tenancyinformers.ClusterWorkspaceShardClusterInformer,
apiExportClusterInformer apisinformers.APIExportClusterInformer,
kcpClusterClient kcpclientset.ClusterInterface,
) (*controller, error) {
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)

c := &controller{
queue: queue,
kcpClusterClient: kcpClusterClient,
listAPIExportEndpointSlices: func() ([]*apisv1alpha1.APIExportEndpointSlice, error) {
return apiExportEndpointSliceClusterInformer.Lister().List(labels.Everything())
},
listClusterWorkspaceShards: func() ([]*tenancyv1alpha1.ClusterWorkspaceShard, error) {
return clusterWorkspaceShardClusterInformer.Lister().List(labels.Everything())
},
getAPIExportEndpointSlice: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) {
return apiExportEndpointSliceClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getAPIExport: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExport, error) {
return apiExportClusterInformer.Lister().Cluster(clusterName).Get(name)
},
commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()),
}

apiExportEndpointSliceClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAPIExportEndpointSlice(obj)
},
UpdateFunc: func(_, newObj interface{}) {
c.enqueueAPIExportEndpointSlice(newObj)
},
DeleteFunc: func(obj interface{}) {
c.enqueueAPIExportEndpointSlice(obj)
},
})

clusterWorkspaceShardClusterInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueueAllAPIExportEndpointSlices(obj)
},
UpdateFunc: func(_, newObj interface{}) {
c.enqueueAllAPIExportEndpointSlices(newObj)
},
DeleteFunc: func(obj interface{}) {
c.enqueueAllAPIExportEndpointSlices(obj)
},
},
)

return c, nil
}

type APIExportEndpointSlice = apisv1alpha1.APIExportEndpointSlice
type APIExportEndpointSliceSpec = apisv1alpha1.APIExportEndpointSliceSpec
type APIExportEndpointSliceStatus = apisv1alpha1.APIExportEndpointSliceStatus
type Patcher = apisv1alpha1client.APIExportEndpointSliceInterface
type Resource = committer.Resource[*APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus]
type CommitFunc = func(context.Context, *Resource, *Resource) error

// controller reconciles APIExportEndpointSlices. It ensures that the shard endpoints are populated
// in the status of every APIExportEndpointSlices.
type controller struct {
queue workqueue.RateLimitingInterface

// kcpClusterClient is cluster aware and used to communicate with kcp API servers
kcpClusterClient kcpclientset.ClusterInterface

listClusterWorkspaceShards func() ([]*tenancyv1alpha1.ClusterWorkspaceShard, error)
listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExport func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExport, error)

commit CommitFunc
}

// enqueueAPIExportEndpointSlice enqueues an APIExportEndpointSlice.
func (c *controller) enqueueAPIExportEndpointSlice(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}

logger := logging.WithQueueKey(logging.WithReconciler(klog.Background(), ControllerName), key)
logger.V(4).Info("queueing APIExportEndpointSlice")
c.queue.Add(key)
}

// enqueueAllAPIExportEndpointSlices enqueues all APIExportEndpointSlices.
func (c *controller) enqueueAllAPIExportEndpointSlices(clusterWorkspaceShard interface{}) {
list, err := c.listAPIExportEndpointSlices()
if err != nil {
runtime.HandleError(err)
return
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), clusterWorkspaceShard.(*tenancyv1alpha1.ClusterWorkspaceShard))
for i := range list {
key, err := kcpcache.MetaClusterNamespaceKeyFunc(list[i])
if err != nil {
runtime.HandleError(err)
continue
}

logging.WithQueueKey(logger, key).V(2).Info("queuing APIExportEndpointSlice because ClusterWorkspaceShard changed")
c.queue.Add(key)
}
}

// Start starts the controller, which stops when ctx.Done() is closed.
func (c *controller) Start(ctx context.Context, numThreads int) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()

logger := logging.WithReconciler(klog.FromContext(ctx), ControllerName)
ctx = klog.NewContext(ctx, logger)
logger.Info("Starting controller")
defer logger.Info("Shutting down controller")

for i := 0; i < numThreads; i++ {
go wait.UntilWithContext(ctx, c.startWorker, time.Second)
}

<-ctx.Done()
}

func (c *controller) startWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}

func (c *controller) processNextWorkItem(ctx context.Context) bool {
// Wait until there is a new item in the working queue
k, quit := c.queue.Get()
if quit {
return false
}
key := k.(string)

logger := logging.WithQueueKey(klog.FromContext(ctx), key)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("processing key")

// No matter what, tell the queue we're done with this key, to unblock
// other workers.
defer c.queue.Done(key)

if err := c.process(ctx, key); err != nil {
runtime.HandleError(fmt.Errorf("%q controller failed to sync %q, err: %w", ControllerName, key, err))
c.queue.AddRateLimited(key)
return true
}
c.queue.Forget(key)
return true
}

func (c *controller) process(ctx context.Context, key string) error {
clusterName, _, name, err := kcpcache.SplitMetaClusterNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return nil
}
obj, err := c.getAPIExportEndpointSlice(clusterName, name)
if err != nil {
if errors.IsNotFound(err) {
return nil // object deleted before we handled it
}
return err
}

old := obj
obj = obj.DeepCopy()

logger := logging.WithObject(klog.FromContext(ctx), obj)
ctx = klog.NewContext(ctx, logger)

var errs []error
if err := c.reconcile(ctx, obj); err != nil {
errs = append(errs, err)
}

// Regardless of whether reconcile returned an error or not, always try to patch status if needed. Return the
// reconciliation error at the end.

// If the object being reconciled changed as a result, update it.
oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
newResource := &Resource{ObjectMeta: obj.ObjectMeta, Spec: &obj.Spec, Status: &obj.Status}
if err := c.commit(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
}

return utilerrors.NewAggregate(errs)
}
Loading

0 comments on commit 4868e4b

Please sign in to comment.