Skip to content

Commit

Permalink
feat: reserved capacity support (#1911)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmdeal authored Feb 26, 2025
1 parent 82a7d80 commit a863104
Show file tree
Hide file tree
Showing 29 changed files with 1,392 additions and 267 deletions.
6 changes: 3 additions & 3 deletions designs/capacity-reservations.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ Karpenter doesn't currently support reasoning about this capacity type. Karpente
3. Karpenter should add logic to its scheduler to reason about this availability as an `int` -- ensuring that the scheduler never schedules more offerings of an instance type for a capacity type than are available
4. Karpenter should extend its CloudProvider [InstanceType](/~https://github.com/kubernetes-sigs/karpenter/blob/35d6197e38e64cd6abfef71a082aee80e38d09fd/pkg/cloudprovider/types.go#L75) struct to allow offerings to represent availability of an offering as an `int` rather than a `bool` -- allowing Cloud Providers to represent the constrained capacity of `reserved`
5. Karpenter should consolidate between `on-demand` and/or `spot` instance types to `reserved` when the capacity type is available
6. Karpenter should introduce a feature flag `FEATURE_FLAG=CapacityReservations` to gate this new feature in `ALPHA` when it's introduced
6. Karpenter should introduce a feature flag `FEATURE_FLAG=ReservedCapacity` to gate this new feature in `ALPHA` when it's introduced

### `karpenter.sh/capacity-type` API

_Note: Some excerpts taken from [`aws/karpenter-provider-aws` RFC](/~https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md#nodepool-api)._

This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then select to support only launching reserved node capacity and falling back between reserved capacity to on-demand (or even spot) capacity respectively.
This RFC proposes the addition of a new `karpenter.sh/capacity-type` label value, called `reserved`. A cluster admin could then select to support only launching reserved node capacity and falling back from reserved capacity to on-demand (or even spot) capacity respectively.

_Note: This option requires any applications (pods) that are using node selection on `karpenter.sh/capacity-type: "on-demand"` to expand their selection to include `reserved` or to update it to perform a `NotIn` node affinity on `karpenter.sh/capacity-type: spot`_

Expand Down Expand Up @@ -140,4 +140,4 @@ In practice, this means that if a user has two capacity reservation offerings av

## Appendix

1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: /~https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
1. AWS Cloud Provider's RFC for On-Demand Capacity Reservations: /~https://github.com/aws/karpenter-provider-aws/blob/main/designs/odcr.md
4 changes: 2 additions & 2 deletions kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ func (c CloudProvider) toNode(nodeClaim *v1.NodeClaim) (*corev1.Node, error) {

availableOfferings := it.Offerings.Available().Compatible(requirements)

offeringsByPrice := lo.GroupBy(availableOfferings, func(of cloudprovider.Offering) float64 { return of.Price })
offeringsByPrice := lo.GroupBy(availableOfferings, func(of *cloudprovider.Offering) float64 { return of.Price })
minOfferingPrice := lo.Min(lo.Keys(offeringsByPrice))
if cheapestOffering == nil || minOfferingPrice < cheapestOffering.Price {
cheapestOffering = lo.ToPtr(lo.Sample(offeringsByPrice[minOfferingPrice]))
cheapestOffering = lo.Sample(offeringsByPrice[minOfferingPrice])
instanceType = it
}
}
Expand Down
7 changes: 4 additions & 3 deletions kwok/cloudprovider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,14 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
return &cloudprovider.InstanceType{
Name: options.Name,
Requirements: requirements,
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
return cloudprovider.Offering{
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) *cloudprovider.Offering {
return &cloudprovider.Offering{
ReservationCapacity: off.ReservationCapacity,
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
})...),
Price: off.Offering.Price,
Available: off.Offering.Available,
Available: off.Available,
}
}),
Capacity: options.Resources,
Expand Down
2 changes: 1 addition & 1 deletion kwok/tools/gen_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
},
Offering: cloudprovider.Offering{
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
Available: true,
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
},
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ArchitectureArm64 = "arm64"
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
CapacityTypeReserved = "reserved"
)

// Karpenter specific domains and labels
Expand Down
43 changes: 35 additions & 8 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ import (
"sigs.k8s.io/karpenter/pkg/utils/resources"
)

// init registers the fake cloud provider's reservation-ID label with the core APIs:
// it inserts v1alpha1.LabelReservationID into the well-known label set and assigns it
// to cloudprovider.ReservationIDLabel, so reserved offerings created by this fake
// provider can be uniquely identified by the scheduler.
// NOTE(review): this mutates package-level state as an import side effect — acceptable
// for a test fake, but ordering-sensitive if another init also touches these globals.
func init() {
v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID)
cloudprovider.ReservationIDLabel = v1alpha1.LabelReservationID
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

type CloudProvider struct {
Expand Down Expand Up @@ -104,6 +109,7 @@ func (c *CloudProvider) Reset() {
}
}

//nolint:gocyclo
func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -121,9 +127,16 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
reqs := scheduling.NewNodeSelectorRequirementsWithMinValues(nodeClaim.Spec.Requirements...)
np := &v1.NodePool{ObjectMeta: metav1.ObjectMeta{Name: nodeClaim.Labels[v1.NodePoolLabelKey]}}
instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, np)), func(i *cloudprovider.InstanceType, _ int) bool {
return reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) &&
i.Offerings.Available().HasCompatible(reqs) &&
resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable())
if !reqs.IsCompatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
return false
}
if !i.Offerings.Available().HasCompatible(reqs) {
return false
}
if !resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable()) {
return false
}
return true
})
// Order instance types so that we get the cheapest instance types of the available offerings
sort.Slice(instanceTypes, func(i, j int) bool {
Expand All @@ -139,14 +152,28 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v
labels[key] = requirement.Values()[0]
}
}
// Find Offering
for _, o := range instanceType.Offerings.Available() {
if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
labels[corev1.LabelTopologyZone] = o.Requirements.Get(corev1.LabelTopologyZone).Any()
labels[v1.CapacityTypeLabelKey] = o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
// Find offering, prioritizing reserved instances
var offering *cloudprovider.Offering
offerings := instanceType.Offerings.Available().Compatible(reqs)
lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings")
for _, o := range offerings {
if o.CapacityType() == v1.CapacityTypeReserved {
o.ReservationCapacity -= 1
if o.ReservationCapacity == 0 {
o.Available = false
}
offering = o
break
}
}
if offering == nil {
offering = offerings[0]
}
// Propagate labels dictated by offering requirements - e.g. zone, capacity-type, and reservation-id
for _, req := range offering.Requirements {
labels[req.Key] = req.Any()
}

created := &v1.NodeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: nodeClaim.Name,
Expand Down
72 changes: 46 additions & 26 deletions pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,47 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
options.Resources[corev1.ResourcePods] = resource.MustParse("5")
}
if len(options.Offerings) == 0 {
options.Offerings = []cloudprovider.Offering{
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}), Price: PriceFromResources(options.Resources), Available: true},
options.Offerings = []*cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}),
Price: PriceFromResources(options.Resources),
},
}
}
if len(options.Architecture) == 0 {
Expand All @@ -97,10 +117,10 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
scheduling.NewRequirement(corev1.LabelInstanceTypeStable, corev1.NodeSelectorOpIn, options.Name),
scheduling.NewRequirement(corev1.LabelArchStable, corev1.NodeSelectorOpIn, options.Architecture),
scheduling.NewRequirement(corev1.LabelOSStable, corev1.NodeSelectorOpIn, sets.List(options.OperatingSystems)...),
scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o cloudprovider.Offering, _ int) string {
scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o *cloudprovider.Offering, _ int) string {
return o.Requirements.Get(corev1.LabelTopologyZone).Any()
})...),
scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o cloudprovider.Offering, _ int) string {
scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, lo.Map(options.Offerings.Available(), func(o *cloudprovider.Offering, _ int) string {
return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
})...),
scheduling.NewRequirement(LabelInstanceSize, corev1.NodeSelectorOpDoesNotExist),
Expand Down Expand Up @@ -151,14 +171,14 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
},
}
price := PriceFromResources(opts.Resources)
opts.Offerings = []cloudprovider.Offering{
opts.Offerings = []*cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: ct,
corev1.LabelTopologyZone: zone,
}),
Price: price,
Available: true,
Price: price,
},
}
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Expand Down
67 changes: 40 additions & 27 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ import (
var (
SpotRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
OnDemandRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeOnDemand))
ReservedRequirement = scheduling.NewRequirements(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeReserved))

// ReservationIDLabel is a label injected into a reserved offering's requirements which is used to uniquely identify a
// reservation. For example, a reservation could be shared across multiple NodePools, and the value encoded in this
// requirement is used to inform the scheduler that a reservation for one should affect the other.
ReservationIDLabel string
)

type DriftReason string
Expand Down Expand Up @@ -242,27 +248,38 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList {
// An Offering describes where an InstanceType is available to be used, with the expectation that its properties
// may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
// these properties are captured with labels in Requirements.
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone.
type Offering struct {
Requirements scheduling.Requirements
Price float64
// Available is added so that Offerings can return all offerings that have ever existed for an instance type,
// so we can get historical pricing data for calculating savings in consolidation
Available bool
Requirements scheduling.Requirements
Price float64
Available bool
ReservationCapacity int
}

// CapacityType returns the offering's capacity-type requirement value
// (e.g. "spot", "on-demand", or "reserved").
func (o *Offering) CapacityType() string {
return o.Requirements.Get(v1.CapacityTypeLabelKey).Any()
}

// Zone returns the offering's topology zone requirement value
// (the corev1.LabelTopologyZone label).
func (o *Offering) Zone() string {
return o.Requirements.Get(corev1.LabelTopologyZone).Any()
}

// ReservationID returns the value of the cloud-provider-injected ReservationIDLabel
// requirement, which uniquely identifies the capacity reservation backing this
// offering (see the ReservationIDLabel declaration above for semantics).
func (o *Offering) ReservationID() string {
return o.Requirements.Get(ReservationIDLabel).Any()
}

type Offerings []Offering
type Offerings []*Offering

// Available filters the available offerings from the returned offerings
func (ofs Offerings) Available() Offerings {
return lo.Filter(ofs, func(o Offering, _ int) bool {
return lo.Filter(ofs, func(o *Offering, _ int) bool {
return o.Available
})
}

// Compatible returns the offerings based on the passed requirements
func (ofs Offerings) Compatible(reqs scheduling.Requirements) Offerings {
return lo.Filter(ofs, func(offering Offering, _ int) bool {
return lo.Filter(ofs, func(offering *Offering, _ int) bool {
return reqs.IsCompatible(offering.Requirements, scheduling.AllowUndefinedWellKnownLabels)
})
}
Expand All @@ -278,34 +295,30 @@ func (ofs Offerings) HasCompatible(reqs scheduling.Requirements) bool {
}

// Cheapest returns the cheapest offering from the returned offerings
func (ofs Offerings) Cheapest() Offering {
return lo.MinBy(ofs, func(a, b Offering) bool {
func (ofs Offerings) Cheapest() *Offering {
return lo.MinBy(ofs, func(a, b *Offering) bool {
return a.Price < b.Price
})
}

// MostExpensive returns the most expensive offering from the return offerings
func (ofs Offerings) MostExpensive() Offering {
return lo.MaxBy(ofs, func(a, b Offering) bool {
func (ofs Offerings) MostExpensive() *Offering {
return lo.MaxBy(ofs, func(a, b *Offering) bool {
return a.Price > b.Price
})
}

// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered
// on an instance type. If the instance type has a spot offering available, then it uses the spot offering
// to get the launch price; else, it uses the on-demand launch price
// WorstLaunchPrice gets the worst-case launch price from the offerings that are offered on an instance type. Only
// offerings for the capacity type we will launch with are considered. The following precedence order is used to
// determine which capacity type is used: reserved, spot, on-demand.
func (ofs Offerings) WorstLaunchPrice(reqs scheduling.Requirements) float64 {
// We prefer to launch spot offerings, so we will get the worst price based on the node requirements
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeSpot) {
spotOfferings := ofs.Compatible(reqs).Compatible(SpotRequirement)
if len(spotOfferings) > 0 {
return spotOfferings.MostExpensive().Price
}
}
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeOnDemand) {
onDemandOfferings := ofs.Compatible(reqs).Compatible(OnDemandRequirement)
if len(onDemandOfferings) > 0 {
return onDemandOfferings.MostExpensive().Price
for _, ctReqs := range []scheduling.Requirements{
ReservedRequirement,
SpotRequirement,
OnDemandRequirement,
} {
if compatOfs := ofs.Compatible(reqs).Compatible(ctReqs); len(compatOfs) != 0 {
return compatOfs.MostExpensive().Price
}
}
return math.MaxFloat64
Expand Down
13 changes: 11 additions & 2 deletions pkg/controllers/disruption/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
// We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
// assumption, that the spot variant will launch. We also need to add a requirement to the node to ensure that if
// spot capacity is insufficient we don't replace the node with a more expensive on-demand node. Instead the launch
// should fail and we'll just leave the node alone.
// should fail and we'll just leave the node alone. We don't need to do the same for reserved since the requirements
// are injected by the scheduler.
ctReq := results.NewNodeClaims[0].Requirements.Get(v1.CapacityTypeLabelKey)
if ctReq.Has(v1.CapacityTypeSpot) && ctReq.Has(v1.CapacityTypeOnDemand) {
results.NewNodeClaims[0].Requirements.Add(scheduling.NewRequirement(v1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, v1.CapacityTypeSpot))
Expand Down Expand Up @@ -307,8 +308,16 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
func getCandidatePrices(candidates []*Candidate) (float64, error) {
var price float64
for _, c := range candidates {
compatibleOfferings := c.instanceType.Offerings.Compatible(scheduling.NewLabelRequirements(c.StateNode.Labels()))
reqs := scheduling.NewLabelRequirements(c.StateNode.Labels())
compatibleOfferings := c.instanceType.Offerings.Compatible(reqs)
if len(compatibleOfferings) == 0 {
// It's expected that offerings may no longer exist for capacity reservations once a NodeClass stops selecting on
// them (or they are no longer considered for some other reason by the cloudprovider). By definition though,
// reserved capacity is free. By modeling it as free, consolidation won't be able to succeed, but the node should be
// disrupted via drift regardless.
if reqs.Get(v1.CapacityTypeLabelKey).Has(v1.CapacityTypeReserved) {
return 0.0, nil
}
return 0.0, fmt.Errorf("unable to determine offering for %s/%s/%s", c.instanceType.Name, c.capacityType, c.zone)
}
price += compatibleOfferings.Cheapest().Price
Expand Down
Loading

0 comments on commit a863104

Please sign in to comment.