Skip to content

Commit

Permalink
Amend apiexportendpointslice reconciliation to filter endpoints by pa…
Browse files Browse the repository at this point in the history
…rtition

Signed-off-by: Frederic Giloux <fgiloux@redhat.com>
  • Loading branch information
fgiloux committed Dec 15, 2022
1 parent 917c3cc commit dc435b3
Show file tree
Hide file tree
Showing 6 changed files with 278 additions and 38 deletions.
9 changes: 9 additions & 0 deletions pkg/apis/apis/v1alpha1/types_apiexportendpointslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ func (in *APIExportEndpointSlice) SetConditions(conditions conditionsv1alpha1.Co
// These are valid conditions of APIExportEndpointSlice in addition to
// APIExportValid and related reasons defined with the APIBinding type.
const (
// PartitionValid is a condition for APIExportEndpointSlice that reflects the validity of the referenced Partition.
PartitionValid conditionsv1alpha1.ConditionType = "PartitionValid"

APIExportEndpointSliceURLsReady conditionsv1alpha1.ConditionType = "EndpointURLsReady"

// PartitionInvalidReferenceReason is a reason for the PartitionValid condition of APIExportEndpointSlice that the
// Partition reference is invalid.
PartitionInvalidReferenceReason = "PartitionInvalidReference"
// PartitionNotFoundReason is a reason for the PartitionValid condition that the referenced Partition is not found.
PartitionNotFoundReason = "PartitionNotFound"
)

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
topologyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/topology/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
apisv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/apis/v1alpha1"
apisinformers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/apis/v1alpha1"
Expand All @@ -55,6 +56,7 @@ func NewController(
apiExportEndpointSliceClusterInformer apisinformers.APIExportEndpointSliceClusterInformer,
clusterWorkspaceShardClusterInformer tenancyinformers.ClusterWorkspaceShardClusterInformer,
apiExportClusterInformer apisinformers.APIExportClusterInformer,
partitionClusterInformer topoplogyinformers.PartitionClusterInformer,
kcpClusterClient kcpclientset.ClusterInterface,
) (*controller, error) {
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
Expand All @@ -65,15 +67,22 @@ func NewController(
listAPIExportEndpointSlices: func() ([]*apisv1alpha1.APIExportEndpointSlice, error) {
return apiExportEndpointSliceClusterInformer.Lister().List(labels.Everything())
},
listClusterWorkspaceShards: func() ([]*tenancyv1alpha1.ClusterWorkspaceShard, error) {
return clusterWorkspaceShardClusterInformer.Lister().List(labels.Everything())
listClusterWorkspaceShards: func(selector labels.Selector) ([]*tenancyv1alpha1.ClusterWorkspaceShard, error) {
return clusterWorkspaceShardClusterInformer.Lister().List(selector)
},
getAPIExportEndpointSlice: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error) {
return apiExportEndpointSliceClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getAPIExport: func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExport, error) {
return apiExportClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getPartition: func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error) {
return partitionClusterInformer.Lister().Cluster(clusterName).Get(name)
},
getAPIExportEndpointSlicesByPartition: func(key string) ([]interface{}, error) {
return apiExportEndpointSliceClusterInformer.Informer().GetIndexer().ByIndex(indexAPIExportEndpointSlicesByPartition, key)
},

commit: committer.NewCommitter[*APIExportEndpointSlice, Patcher, *APIExportEndpointSliceSpec, *APIExportEndpointSliceStatus](kcpClusterClient.ApisV1alpha1().APIExportEndpointSlices()),
}

Expand Down Expand Up @@ -103,6 +112,25 @@ func NewController(
},
)

partitionClusterInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.enqueuePartition(obj)
},
UpdateFunc: func(_, newObj interface{}) {
c.enqueuePartition(newObj)
},
DeleteFunc: func(obj interface{}) {
c.enqueuePartition(obj)
},
},
)

if err := apiExportEndpointSliceClusterInformer.Informer().AddIndexers(cache.Indexers{
indexAPIExportEndpointSlicesByPartition: indexAPIExportEndpointSlicesByPartitionFunc,
}); err != nil {
return nil, fmt.Errorf("error adding partition indexes for APIExportEndpointSlices: %w", err)
}
return c, nil
}

Expand All @@ -114,17 +142,19 @@ type Resource = committer.Resource[*APIExportEndpointSliceSpec, *APIExportEndpoi
type CommitFunc = func(context.Context, *Resource, *Resource) error

// controller reconciles APIExportEndpointSlices. It ensures that the shard endpoints are populated
// in the status of every APIExportEndpointSlices.
// in the status of every APIExportEndpointSlices.
type controller struct {
queue workqueue.RateLimitingInterface

// kcpClusterClient is cluster aware and used to communicate with kcp API servers
kcpClusterClient kcpclientset.ClusterInterface

listClusterWorkspaceShards func() ([]*tenancyv1alpha1.ClusterWorkspaceShard, error)
listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExport func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExport, error)
listClusterWorkspaceShards func(selector labels.Selector) ([]*tenancyv1alpha1.ClusterWorkspaceShard, error)
listAPIExportEndpointSlices func() ([]*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExportEndpointSlice func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExportEndpointSlice, error)
getAPIExport func(clusterName logicalcluster.Name, name string) (*apisv1alpha1.APIExport, error)
getPartition func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error)
getAPIExportEndpointSlicesByPartition func(key string) ([]interface{}, error)

commit CommitFunc
}
Expand Down Expand Up @@ -163,6 +193,32 @@ func (c *controller) enqueueAllAPIExportEndpointSlices(clusterWorkspaceShard int
}
}

// enqueuePartition maps a Partition to APIExportEndpointSlices for enqueuing
func (c *controller) enqueuePartition(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}

objectsForPartition, err := c.getAPIExportEndpointSlicesByPartition(key)
if err != nil {
runtime.HandleError(err)
return
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*topologyv1alpha1.Partition))
logging.WithQueueKey(logger, key).V(2).Info("queuing APIExportEndpointSlices because Partition changed")
for _, obj := range objectsForPartition {
slice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice)
if !ok {
runtime.HandleError(fmt.Errorf("obj is supposed to be an APIExportEndpointSlice, but is %T", obj))
return
}
c.enqueueAPIExportEndpointSlice(slice)
}
}

// Start starts the controller, which stops when ctx.Done() is closed.
func (c *controller) Start(ctx context.Context, numThreads int) {
defer runtime.HandleCrash()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ import (

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
tenancyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/tenancy/v1alpha1"
conditionsv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/apis/conditions/v1alpha1"
"github.com/kcp-dev/kcp/pkg/apis/third_party/conditions/util/conditions"
topologyv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/topology/v1alpha1"
)

func TestReconcile(t *testing.T) {
tests := map[string]struct {
keyMissing bool
apiExportMissing bool
partitionMissing bool
apiExportHasInvalidRef bool
listClusterWorkspaceShardsError error
errorReason string
Expand All @@ -47,27 +50,38 @@ func TestReconcile(t *testing.T) {
wantAPIExportEndpointSliceURLsError bool
wantAPIExportEndpointSliceURLsReady bool
wantAPIExportValid bool
wantPartitionValid bool
wantAPIExportNotValid bool
wantPartitionNotValid bool
}{
"error listing clusterworkspaceshards": {
listClusterWorkspaceShardsError: errors.New("foo"),
wantError: true,
wantAPIExportEndpointSliceURLsError: true,
},
"APIExportValid set to false when apiExport is missing": {
"APIExportValid set to false when APIExport is missing": {
apiExportMissing: true,
errorReason: apisv1alpha1.APIExportNotFoundReason,
wantAPIExportNotValid: true,
},
"APIExportValid set to false when an invalid reference to an apiExport is provided": {
apiExportHasInvalidRef: true,
wantError: true,
errorReason: apisv1alpha1.InternalErrorReason,
wantAPIExportNotValid: true,
"APIExportValid set to false when an invalid reference to an APIExport is provided": {
apiExportHasInvalidRef: true,
wantError: true,
errorReason: apisv1alpha1.InternalErrorReason,
wantAPIExportNotValid: true,
wantAPIExportEndpointSliceURLsError: true,
},
"PartitionValid set to false when the Partition is missing": {
partitionMissing: true,
wantError: true,
errorReason: apisv1alpha1.PartitionInvalidReferenceReason,
wantPartitionNotValid: true,
wantAPIExportEndpointSliceURLsError: true,
},
"APIExportEndpointSliceURLs set when no issue": {
wantAPIExportEndpointSliceURLsReady: true,
wantAPIExportValid: true,
wantPartitionValid: true,
},
}

Expand All @@ -78,7 +92,7 @@ func TestReconcile(t *testing.T) {

c := &controller{

listClusterWorkspaceShards: func() ([]*tenancyv1alpha1.ClusterWorkspaceShard, error) {
listClusterWorkspaceShards: func(selector labels.Selector) ([]*tenancyv1alpha1.ClusterWorkspaceShard, error) {
if tc.listClusterWorkspaceShardsError != nil {
return nil, tc.listClusterWorkspaceShardsError
}
Expand All @@ -89,21 +103,13 @@ func TestReconcile(t *testing.T) {
Annotations: map[string]string{
logicalcluster.AnnotationKey: "root:org:ws",
},
Name: "shard1",
},
Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{
ExternalURL: "https://server-1.kcp.dev/",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
logicalcluster.AnnotationKey: "root:org:ws",
Labels: map[string]string{
"region": "Europe",
},
Name: "shard2",
Name: "shard1",
},
Spec: tenancyv1alpha1.ClusterWorkspaceShardSpec{
ExternalURL: "https://server-2.kcp.dev/",
VirtualWorkspaceURL: "https://server-1.kcp.dev/",
},
},
}, nil
Expand All @@ -124,6 +130,27 @@ func TestReconcile(t *testing.T) {
}, nil
}
},
getPartition: func(clusterName logicalcluster.Name, name string) (*topologyv1alpha1.Partition, error) {
if tc.partitionMissing {
return nil, apierrors.NewNotFound(topologyv1alpha1.Resource("Partition"), name)
} else {
return &topologyv1alpha1.Partition{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
logicalcluster.AnnotationKey: "root:org:ws",
},
Name: "my-partition",
},
Spec: topologyv1alpha1.PartitionSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"region": "Europe",
},
},
},
}, nil
}
},
}

apiExportEndpointSlice := &apisv1alpha1.APIExportEndpointSlice{
Expand All @@ -140,6 +167,7 @@ func TestReconcile(t *testing.T) {
ExportName: "my-export",
},
},
Partition: "my-partition",
},
}
err := c.reconcile(context.Background(), apiExportEndpointSlice)
Expand All @@ -162,6 +190,9 @@ func TestReconcile(t *testing.T) {

if tc.wantAPIExportEndpointSliceURLsReady {
requireConditionMatches(t, apiExportEndpointSlice, conditions.TrueCondition(apisv1alpha1.APIExportEndpointSliceURLsReady))
require.Equal(t, []apisv1alpha1.APIExportEndpoint{
{URL: "https://server-1.kcp.dev/services/apiexport/root:org:ws/my-export"},
}, apiExportEndpointSlice.Status.APIExportEndpoints)
}

if tc.wantAPIExportNotValid {
Expand All @@ -175,11 +206,28 @@ func TestReconcile(t *testing.T) {
)
}

if tc.wantPartitionNotValid {
requireConditionMatches(t, apiExportEndpointSlice,
conditions.FalseCondition(
apisv1alpha1.PartitionValid,
tc.errorReason,
conditionsv1alpha1.ConditionSeverityError,
"",
),
)
}

if tc.wantAPIExportValid {
requireConditionMatches(t, apiExportEndpointSlice,
conditions.TrueCondition(apisv1alpha1.APIExportValid),
)
}

if tc.wantPartitionValid {
requireConditionMatches(t, apiExportEndpointSlice,
conditions.TrueCondition(apisv1alpha1.PartitionValid),
)
}
})
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2022 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiexportendpointslice

import (
"fmt"

"github.com/kcp-dev/logicalcluster/v2"

apisv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/apis/v1alpha1"
"github.com/kcp-dev/kcp/pkg/client"
)

const indexAPIExportEndpointSlicesByPartition = "indexAPIExportEndpointSlicesByPartition"

// indexAPIExportEndpointSlicesByPartitionFunc is an index function that maps a Partition to the key for its
// spec.partition.
func indexAPIExportEndpointSlicesByPartitionFunc(obj interface{}) ([]string, error) {
slice, ok := obj.(*apisv1alpha1.APIExportEndpointSlice)
if !ok {
return []string{}, fmt.Errorf("obj is supposed to be an APIExportEndpointSlice, but is %T", obj)
}

if slice.Spec.Partition != "" {
clusterName := logicalcluster.From(slice)
if !ok {
// this will never happen due to validation
return []string{}, fmt.Errorf("cluster information missing")
}
key := client.ToClusterAwareKey(clusterName, slice.Spec.Partition)
return []string{key}, nil
}

return []string{}, nil
}
Loading

0 comments on commit dc435b3

Please sign in to comment.