feat: add NodeRegistrationHealthy status condition to nodepool #1969

Open · wants to merge 1 commit into base: main
6 changes: 6 additions & 0 deletions kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -498,6 +498,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset.
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -496,6 +496,12 @@ spec:
- type
type: object
type: array
nodeClassObservedGeneration:
description: |-
NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset.
format: int64
type: integer
resources:
additionalProperties:
anyOf:
6 changes: 6 additions & 0 deletions pkg/apis/v1/nodepool_status.go
@@ -27,13 +27,19 @@ const (
ConditionTypeValidationSucceeded = "ValidationSucceeded"
// ConditionTypeNodeClassReady = "NodeClassReady" condition indicates that underlying nodeClass was resolved and is reporting as Ready
ConditionTypeNodeClassReady = "NodeClassReady"
// ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy" condition indicates whether a misconfiguration exists that is preventing successful node launches/registrations and requires manual investigation
ConditionTypeNodeRegistrationHealthy = "NodeRegistrationHealthy"
)

// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// Resources is the list of resources that have been provisioned.
// +optional
Resources v1.ResourceList `json:"resources,omitempty"`
// NodeClassObservedGeneration represents the observed generation of the referenced NodeClass. If this does not match
// the actual NodeClass generation, the NodeRegistrationHealthy status condition on the NodePool will be reset.
// +optional
NodeClassObservedGeneration int64 `json:"nodeClassObservedGeneration,omitempty"`
// Conditions contains signals for health and readiness
// +optional
Conditions []status.Condition `json:"conditions,omitempty"`
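The new status surface is easiest to see from the consumer side. The following is a minimal Go sketch that only reads the fields added in this diff (the NodeRegistrationHealthy condition and Status.NodeClassObservedGeneration); the package name, helper name, and the way the current NodeClass generation is passed in are illustrative assumptions, not part of this PR.

package example

import (
    "fmt"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// reportRegistrationHealth is a hypothetical helper: it inspects the NodeRegistrationHealthy
// condition and the NodeClassObservedGeneration field introduced by this change.
func reportRegistrationHealth(nodePool *v1.NodePool, nodeClassGeneration int64) {
    cond := nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy)
    switch {
    case cond == nil || cond.IsUnknown():
        fmt.Printf("NodePool %s: registration health not yet determined\n", nodePool.Name)
    case cond.IsTrue():
        fmt.Printf("NodePool %s: nodes are registering successfully\n", nodePool.Name)
    default:
        fmt.Printf("NodePool %s: registration unhealthy (%s: %s)\n", nodePool.Name, cond.Reason, cond.Message)
    }
    // Per the field description, a mismatch here means the condition was computed against an
    // older NodeClass revision and is about to be reset.
    if nodePool.Status.NodeClassObservedGeneration != nodeClassGeneration {
        fmt.Printf("NodePool %s: observed NodeClass generation %d is behind current generation %d\n",
            nodePool.Name, nodePool.Status.NodeClassObservedGeneration, nodeClassGeneration)
    }
}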
2 changes: 2 additions & 0 deletions pkg/controllers/controllers.go
@@ -50,6 +50,7 @@ import (
nodepoolcounter "sigs.k8s.io/karpenter/pkg/controllers/nodepool/counter"
nodepoolhash "sigs.k8s.io/karpenter/pkg/controllers/nodepool/hash"
nodepoolreadiness "sigs.k8s.io/karpenter/pkg/controllers/nodepool/readiness"
nodepoolregistrationhealth "sigs.k8s.io/karpenter/pkg/controllers/nodepool/registrationhealth"
nodepoolvalidation "sigs.k8s.io/karpenter/pkg/controllers/nodepool/validation"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/state"
@@ -88,6 +89,7 @@ func NewControllers(
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolregistrationhealth.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
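The registrationhealth controller wired in above is not itself shown in this part of the diff, so the following is only a sketch of its expected behavior, inferred from the NodeClassObservedGeneration field description: when the referenced NodeClass generation changes, reset NodeRegistrationHealthy to Unknown and record the newly observed generation. The package, function name, and signature are assumptions; only the accessors it calls appear in this PR.

package example

import (
    "context"

    "sigs.k8s.io/controller-runtime/pkg/client"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// resetRegistrationHealthOnNodeClassChange is a hypothetical stand-in for the core of the
// nodepool.registrationhealth reconciler added by this PR.
func resetRegistrationHealthOnNodeClassChange(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool, nodeClassGeneration int64) error {
    if nodePool.Status.NodeClassObservedGeneration == nodeClassGeneration {
        return nil // NodeClass unchanged; keep the current health signal
    }
    stored := nodePool.DeepCopy()
    // The previous verdict applied to an older NodeClass revision, so reset it to Unknown;
    // the liveness/registration paths will set it again for the new revision.
    nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
    nodePool.Status.NodeClassObservedGeneration = nodeClassGeneration
    // Optimistic lock for the same reason as in liveness.go and registration.go: a JSON merge
    // patch replaces the whole conditions list, so concurrent writers must conflict instead.
    return kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
}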
43 changes: 41 additions & 2 deletions pkg/controllers/nodeclaim/lifecycle/liveness.go
@@ -20,9 +20,14 @@ import (
"context"
"time"

"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/controller-runtime/pkg/log"

"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
@@ -51,6 +56,12 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
if ttl := registrationTTL - l.clock.Since(registered.LastTransitionTime.Time); ttl > 0 {
return reconcile.Result{RequeueAfter: ttl}, nil
}
if err := l.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
// Delete the NodeClaim if we believe the NodeClaim won't register since we haven't seen the node
if err := l.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -61,6 +72,34 @@ func (l *Liveness) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
})

return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=False
// on the NodePool if the nodeClaim fails to launch/register
func (l *Liveness) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
if nodePoolName != "" {
nodePool := &v1.NodePool{}
if err := l.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return client.IgnoreNotFound(err)
}
if nodePool.StatusConditions().Get(v1.ConditionTypeNodeRegistrationHealthy).IsUnknown() {
stored := nodePool.DeepCopy()
// If the nodeClaim failed to register within the registration TTL, set the NodeRegistrationHealthy status condition on the
// NodePool to False. If the launch itself failed, surface the launch failure reason and message from the nodeClaim instead.
if launchCondition := nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched); launchCondition.IsTrue() {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")
} else {
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, launchCondition.Reason, launchCondition.Message)
}
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := l.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}
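The MergeFromWithOptimisticLock comment above is the one concurrency-sensitive detail in this function, and the same pattern is reused in the registration path below. Here is a compressed, self-contained sketch of that patch pattern; the package and helper name are illustrative, but the patch options and condition setter are the ones used in this diff.

package example

import (
    "context"

    "k8s.io/apimachinery/pkg/api/errors"
    "sigs.k8s.io/controller-runtime/pkg/client"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// markRegistrationUnhealthy is a hypothetical helper that isolates the patch pattern used above.
func markRegistrationUnhealthy(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool) (requeue bool, err error) {
    stored := nodePool.DeepCopy() // snapshot before mutating status
    nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "RegistrationFailed", "Failed to register node")

    // A plain JSON merge patch (client.MergeFrom) would replace the entire conditions list,
    // silently dropping a concurrent writer's update. MergeFromWithOptimisticLock adds
    // metadata.resourceVersion to the patch, so that race surfaces as a Conflict instead,
    // which the reconciler handles by requeueing and retrying against a fresh object.
    if err := kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
        if errors.IsConflict(err) {
            return true, nil
        }
        return false, client.IgnoreNotFound(err)
    }
    return false, nil
}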
61 changes: 61 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/liveness_test.go
@@ -19,6 +19,9 @@ package lifecycle_test
import (
"time"

"github.com/awslabs/operatorpkg/status"

operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -78,6 +81,12 @@ var _ = Describe("Liveness", func() {
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
if isManagedNodeClaim {
ExpectNotFound(ctx, env.Client, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: "RegistrationFailed",
Message: "Failed to register node",
})
} else {
ExpectExists(ctx, env.Client, nodeClaim)
}
@@ -138,6 +147,58 @@ var _ = Describe("Liveness", func() {
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionFalse,
Reason: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Reason,
Message: nodeClaim.StatusConditions().Get(v1.ConditionTypeLaunched).Message,
})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not update NodeRegistrationHealthy status condition if it is already set to True", func() {
nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy)
nodeClaim := test.NodeClaim(v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
Spec: v1.NodeClaimSpec{
Resources: v1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("50Mi"),
corev1.ResourcePods: resource.MustParse("5"),
fake.ResourceGPUVendorA: resource.MustParse("1"),
},
},
},
})
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)

// NodeClaim registration failed, but we should not update the NodeRegistrationHealthy status condition if it is already True
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{Type: v1.ConditionTypeNodeRegistrationHealthy, Status: metav1.ConditionTrue})
ExpectFinalizersRemoved(ctx, env.Client, nodeClaim)
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() {
nodeClaim := test.NodeClaim()
cloudProvider.AllowedCreateCalls = 0 // Don't allow Create() calls to succeed
ExpectApplied(ctx, env.Client, nodeClaim)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

// If the node hasn't registered in the registration timeframe, then we deprovision the nodeClaim
fakeClock.Step(time.Minute * 20)
_ = ExpectObjectReconcileFailed(ctx, env.Client, nodeClaimController, nodeClaim)
30 changes: 30 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -20,6 +20,8 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
@@ -83,9 +85,37 @@ func (r *Registration) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
metrics.NodesCreatedTotal.Inc(map[string]string{
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
})
if err := r.updateNodePoolRegistrationHealth(ctx, nodeClaim); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

// updateNodePoolRegistrationHealth sets the NodeRegistrationHealthy=True
// on the NodePool if the nodeClaim that registered is owned by a NodePool
func (r *Registration) updateNodePoolRegistrationHealth(ctx context.Context, nodeClaim *v1.NodeClaim) error {
nodePoolName := nodeClaim.Labels[v1.NodePoolLabelKey]
if nodePoolName != "" {
nodePool := &v1.NodePool{}
if err := r.kubeClient.Get(ctx, types.NamespacedName{Name: nodePoolName}, nodePool); err != nil {
return client.IgnoreNotFound(err)
}
storedNodePool := nodePool.DeepCopy()
if nodePool.StatusConditions().SetTrue(v1.ConditionTypeNodeRegistrationHealthy) {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := r.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(storedNodePool, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
return err
}
}
}
return nil
}

func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error {
stored := node.DeepCopy()
controllerutil.AddFinalizer(node, v1.TerminationFinalizer)
49 changes: 49 additions & 0 deletions pkg/controllers/nodeclaim/lifecycle/registration_test.go
@@ -17,6 +17,10 @@ limitations under the License.
package lifecycle_test

import (
"time"

"github.com/awslabs/operatorpkg/status"
operatorpkg "github.com/awslabs/operatorpkg/test/expectations"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -55,6 +59,7 @@ var _ = Describe("Registration", func() {
})
}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetUnknown(v1.ConditionTypeNodeRegistrationHealthy)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
@@ -67,6 +72,10 @@
if isManagedNodeClaim {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
} else {
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsUnknown()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(""))
@@ -380,4 +389,44 @@ var _ = Describe("Registration", func() {
node = ExpectExists(ctx, env.Client, node)
Expect(node.Spec.Taints).To(HaveLen(0))
})
It("should add NodeRegistrationHealthy=true on the nodePool if registration succeeds and if it was previously false", func() {
nodeClaimOpts := []v1.NodeClaim{{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1.NodePoolLabelKey: nodePool.Name,
},
},
}}
nodeClaim := test.NodeClaim(nodeClaimOpts...)
nodePool.StatusConditions().SetFalse(v1.ConditionTypeNodeRegistrationHealthy, "unhealthy", "unhealthy")
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}})
ExpectApplied(ctx, env.Client, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
operatorpkg.ExpectStatusConditions(ctx, env.Client, 1*time.Minute, nodePool, status.Condition{
Type: v1.ConditionTypeNodeRegistrationHealthy,
Status: metav1.ConditionTrue,
})
})
It("should not block on updating NodeRegistrationHealthy status condition if nodeClaim is not owned by a nodePool", func() {
nodeClaim := test.NodeClaim()
ExpectApplied(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)

node := test.Node(test.NodeOptions{ProviderID: nodeClaim.Status.ProviderID, Taints: []corev1.Taint{v1.UnregisteredNoExecuteTaint}})
ExpectApplied(ctx, env.Client, node)
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)

nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeRegistered).IsTrue()).To(BeTrue())
Expect(nodeClaim.Status.NodeName).To(Equal(node.Name))
})
})
11 changes: 2 additions & 9 deletions pkg/controllers/nodepool/readiness/controller.go
@@ -19,9 +19,7 @@ package readiness
import (
"context"

"github.com/awslabs/operatorpkg/object"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -55,15 +53,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "nodepool.readiness")
stored := nodePool.DeepCopy()

nodeClass, ok := lo.Find(c.cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
})
if !ok {
// Ignore NodePools which aren't using a supported NodeClass.
nodeClass, err := nodepoolutils.GetNodeClass(ctx, c.kubeClient, nodePool, c.cloudProvider)
if nodeClass == nil {
return reconcile.Result{}, nil
}

err := c.kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass)
if client.IgnoreNotFound(err) != nil {
return reconcile.Result{}, err
}
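The readiness controller now delegates NodeClass resolution to nodepoolutils.GetNodeClass. That helper's body is not shown in this diff, so the sketch below only reconstructs roughly what it is expected to do from the inline logic it replaces (find the supported NodeClass kind, then fetch the referenced object); the package, function name, and exact nil/error semantics are assumptions.

package example

import (
    "context"

    "github.com/awslabs/operatorpkg/object"
    "github.com/awslabs/operatorpkg/status"
    "github.com/samber/lo"
    "sigs.k8s.io/controller-runtime/pkg/client"

    v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
    "sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// getNodeClass is a sketch of the helper's presumed behavior, reconstructed from the removed
// inline code above. A nil NodeClass signals that the NodePool should be ignored.
func getNodeClass(ctx context.Context, kubeClient client.Client, nodePool *v1.NodePool, cloudProvider cloudprovider.CloudProvider) (status.Object, error) {
    nodeClass, ok := lo.Find(cloudProvider.GetSupportedNodeClasses(), func(nc status.Object) bool {
        return object.GVK(nc).GroupKind() == nodePool.Spec.Template.Spec.NodeClassRef.GroupKind()
    })
    if !ok {
        // The NodePool references a NodeClass kind this cloud provider does not support.
        return nil, nil
    }
    if err := kubeClient.Get(ctx, client.ObjectKey{Name: nodePool.Spec.Template.Spec.NodeClassRef.Name}, nodeClass); err != nil {
        // The real helper may map NotFound differently; this sketch simply propagates the error.
        return nil, err
    }
    return nodeClass, nil
}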