diff --git a/go.mod b/go.mod index 732698964e..3509b5330b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module sigs.k8s.io/karpenter -go 1.23.6 +go 1.24.0 require ( github.com/Pallinder/go-randomdata v1.2.0 diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 75f2b816fc..5b0cc4c2ba 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -23,6 +23,7 @@ import ( "math/rand" "strings" + "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" "github.com/docker/docker/pkg/namesgenerator" "github.com/samber/lo" @@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) { } // Return the hard-coded instance types. -func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) { +func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) { return c.instanceTypes, nil } diff --git a/kwok/cloudprovider/helpers.go b/kwok/cloudprovider/helpers.go index 764d6e268e..397eb51da3 100644 --- a/kwok/cloudprovider/helpers.go +++ b/kwok/cloudprovider/helpers.go @@ -186,11 +186,11 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType { Requirements: requirements, Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering { return cloudprovider.Offering{ + ReservationManager: off.Offering.ReservationManager, Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement { return scheduling.NewRequirement(req.Key, req.Operator, req.Values...) 
})...), - Price: off.Offering.Price, - Available: off.Offering.Available, + Price: off.Offering.Price, } }), Capacity: options.Resources, diff --git a/kwok/tools/gen_instance_types.go b/kwok/tools/gen_instance_types.go index fd4aa90c83..2a330605c4 100644 --- a/kwok/tools/gen_instance_types.go +++ b/kwok/tools/gen_instance_types.go @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions { corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}}, }, Offering: cloudprovider.Offering{ - Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price), Available: true, + Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price), }, }) } diff --git a/pkg/apis/v1/labels.go b/pkg/apis/v1/labels.go index 9364936dfe..a00d67ecdc 100644 --- a/pkg/apis/v1/labels.go +++ b/pkg/apis/v1/labels.go @@ -33,6 +33,7 @@ const ( ArchitectureArm64 = "arm64" CapacityTypeSpot = "spot" CapacityTypeOnDemand = "on-demand" + CapacityTypeReserved = "reserved" ) // Karpenter specific domains and labels diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 946a56ac3f..19b231b8ce 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" @@ -40,6 +41,10 @@ import ( "sigs.k8s.io/karpenter/pkg/utils/resources" ) +func init() { + v1.WellKnownLabels = v1.WellKnownLabels.Insert(v1alpha1.LabelReservationID) +} + var _ cloudprovider.CloudProvider = (*CloudProvider)(nil) type CloudProvider struct { @@ -61,14 +66,17 @@ type CloudProvider struct { Drifted cloudprovider.DriftReason NodeClassGroupVersionKind []schema.GroupVersionKind RepairPolicy []cloudprovider.RepairPolicy + + ReservationManagerProvider *ReservationManagerProvider } func NewCloudProvider() *CloudProvider { return &CloudProvider{ - AllowedCreateCalls: math.MaxInt, - CreatedNodeClaims: map[string]*v1.NodeClaim{}, - InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{}, - ErrorsForNodePool: map[string]error{}, + AllowedCreateCalls: math.MaxInt, + CreatedNodeClaims: map[string]*v1.NodeClaim{}, + InstanceTypesForNodePool: map[string][]*cloudprovider.InstanceType{}, + ErrorsForNodePool: map[string]error{}, + ReservationManagerProvider: NewReservationManagerProvider(), } } @@ -102,6 +110,7 @@ func (c *CloudProvider) Reset() { TolerationDuration: 30 * time.Minute, }, } + c.ReservationManagerProvider.Reset() } func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) { @@ -139,14 +148,19 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v labels[key] = requirement.Values()[0] } } - // Find Offering - for _, o := range instanceType.Offerings.Available() { - if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) { - labels[corev1.LabelTopologyZone] = o.Requirements.Get(corev1.LabelTopologyZone).Any() - labels[v1.CapacityTypeLabelKey] = o.Requirements.Get(v1.CapacityTypeLabelKey).Any() - break + // Find offering, prioritizing reserved instances + offering := func() cloudprovider.Offering { + offerings := instanceType.Offerings.Available().Compatible(reqs) + lo.Must0(len(offerings) != 0, "created nodeclaim with no available offerings") + if reservedOfferings := offerings.WithCapacityType(v1.CapacityTypeReserved); 
len(reservedOfferings) != 0 { + c.ReservationManagerProvider.create(reservedOfferings[0].Requirements.Get(v1alpha1.LabelReservationID).Any()) + return reservedOfferings[0] } - } + return offerings[0] + }() + labels[corev1.LabelTopologyZone] = offering.Requirements.Get(corev1.LabelTopologyZone).Any() + labels[v1.CapacityTypeLabelKey] = offering.Requirements.Get(v1.CapacityTypeLabelKey).Any() + created := &v1.NodeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: nodeClaim.Name, @@ -189,7 +203,8 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) { }), nil } -func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) { +// Note: this fake implementation does **not** support availability snapshots. The burden of testing snapshot support should be on the cloudprovider implementation. +func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) { if np != nil { if err, ok := c.ErrorsForNodePool[np.Name]; ok { return nil, err @@ -200,7 +215,23 @@ func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([] } } if c.InstanceTypes != nil { - return c.InstanceTypes, nil + return lo.Map(c.InstanceTypes, func(it *cloudprovider.InstanceType, _ int) *cloudprovider.InstanceType { + for i := range it.Offerings { + if it.Offerings[i].Requirements.Get(v1.CapacityTypeLabelKey).Any() != v1.CapacityTypeReserved { + continue + } + lo.Must0( + it.Offerings[i].Requirements.Has(v1alpha1.LabelReservationID), + "reserved offering for instance type %s must define requirement for label %s", + it.Name, + v1alpha1.LabelReservationID, + ) + reservationID := it.Offerings[i].Requirements.Get(v1alpha1.LabelReservationID).Any() + it.Offerings[i].ReservationManager = c.ReservationManagerProvider.ReservationManager(reservationID, opts...) 
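+				// A reserved offering only remains available while the fake provider has capacity left for its reservation ID.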
+ it.Offerings[i].Available = c.ReservationManagerProvider.Capacity(reservationID) > 0 + } + return it + }), nil } return []*cloudprovider.InstanceType{ NewInstanceType(InstanceTypeOptions{ diff --git a/pkg/cloudprovider/fake/instancetype.go b/pkg/cloudprovider/fake/instancetype.go index 73e1cf8157..f5ae29c53d 100644 --- a/pkg/cloudprovider/fake/instancetype.go +++ b/pkg/cloudprovider/fake/instancetype.go @@ -65,26 +65,46 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq } if len(options.Offerings) == 0 { options.Offerings = []cloudprovider.Offering{ - {Requirements: scheduling.NewLabelRequirements(map[string]string{ - v1.CapacityTypeLabelKey: "spot", - corev1.LabelTopologyZone: "test-zone-1", - }), Price: PriceFromResources(options.Resources), Available: true}, - {Requirements: scheduling.NewLabelRequirements(map[string]string{ - v1.CapacityTypeLabelKey: "spot", - corev1.LabelTopologyZone: "test-zone-2", - }), Price: PriceFromResources(options.Resources), Available: true}, - {Requirements: scheduling.NewLabelRequirements(map[string]string{ - v1.CapacityTypeLabelKey: "on-demand", - corev1.LabelTopologyZone: "test-zone-1", - }), Price: PriceFromResources(options.Resources), Available: true}, - {Requirements: scheduling.NewLabelRequirements(map[string]string{ - v1.CapacityTypeLabelKey: "on-demand", - corev1.LabelTopologyZone: "test-zone-2", - }), Price: PriceFromResources(options.Resources), Available: true}, - {Requirements: scheduling.NewLabelRequirements(map[string]string{ - v1.CapacityTypeLabelKey: "on-demand", - corev1.LabelTopologyZone: "test-zone-3", - }), Price: PriceFromResources(options.Resources), Available: true}, + { + Available: true, + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: "spot", + corev1.LabelTopologyZone: "test-zone-1", + }), + Price: PriceFromResources(options.Resources), + }, + { + Available: true, + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: "spot", + corev1.LabelTopologyZone: "test-zone-2", + }), + Price: PriceFromResources(options.Resources), + }, + { + Available: true, + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: "on-demand", + corev1.LabelTopologyZone: "test-zone-1", + }), + Price: PriceFromResources(options.Resources), + }, + { + Available: true, + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: "on-demand", + corev1.LabelTopologyZone: "test-zone-2", + }), + Price: PriceFromResources(options.Resources), + }, + { + Available: true, + Requirements: scheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: "on-demand", + corev1.LabelTopologyZone: "test-zone-3", + }), + Price: PriceFromResources(options.Resources), + }, } } if len(options.Architecture) == 0 { @@ -153,12 +173,12 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType { price := PriceFromResources(opts.Resources) opts.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: ct, corev1.LabelTopologyZone: zone, }), - Price: price, - Available: true, + Price: price, }, } instanceTypes = append(instanceTypes, NewInstanceType(opts)) diff --git a/pkg/cloudprovider/fake/reservationmanager.go b/pkg/cloudprovider/fake/reservationmanager.go new file mode 100644 index 0000000000..3410e57227 --- /dev/null +++ b/pkg/cloudprovider/fake/reservationmanager.go @@ -0,0 +1,105 @@ 
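+// reservationmanager.go adds a fake, in-memory reservation manager for tests: capacity is tracked per reservation ID
+// and snapshotted per availability-snapshot UUID so that repeated GetInstanceTypes calls within a single scheduling
+// loop observe a consistent view of reserved-offering availability.
+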
+package fake
+
+import (
+	"maps"
+
+	"github.com/awslabs/operatorpkg/option"
+	"github.com/samber/lo"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"sigs.k8s.io/karpenter/pkg/cloudprovider"
+)
+
+type ReservationManagerProvider struct {
+	snapshots map[types.UID]*snapshot
+	capacity  map[string]int // map[reservation id]remaining capacity
+}
+
+type snapshot struct {
+	reservations map[string]sets.Set[string] // map[reserver id (e.g. nodeclaim hostname)]set[reservation id]
+	capacity     map[string]int
+}
+
+func NewReservationManagerProvider() *ReservationManagerProvider {
+	return &ReservationManagerProvider{
+		snapshots: map[types.UID]*snapshot{},
+		capacity:  map[string]int{},
+	}
+}
+
+// SetCapacity sets the total number of instances available for a given reservationID. This value will be decremented
+// internally each time an instance is launched for the given reservationID.
+func (p *ReservationManagerProvider) SetCapacity(reservationID string, capacity int) {
+	p.capacity[reservationID] = capacity
+}
+
+// Capacity returns the number of instances currently available for the given reservationID.
+func (p *ReservationManagerProvider) Capacity(reservationID string) int {
+	return p.capacity[reservationID]
+}
+
+// create decrements the availability for the given reservationID by one.
+func (p *ReservationManagerProvider) create(reservationID string) {
+	lo.Must0(p.capacity[reservationID] > 0, "created an instance with an offering with no availability")
+	p.capacity[reservationID] -= 1
+}
+
+// getSnapshot returns an existing snapshot, if one exists for the given UUID, or creates a new one.
+func (p *ReservationManagerProvider) getSnapshot(uuid *types.UID) *snapshot {
+	if uuid != nil {
+		if snapshot, ok := p.snapshots[*uuid]; ok {
+			return snapshot
+		}
+	}
+	snapshot := &snapshot{
+		reservations: map[string]sets.Set[string]{},
+		capacity:     map[string]int{},
+	}
+	maps.Copy(snapshot.capacity, p.capacity)
+	if uuid != nil {
+		p.snapshots[*uuid] = snapshot
+	}
+	return snapshot
+}
+
+func (p *ReservationManagerProvider) Reset() {
+	*p = *NewReservationManagerProvider()
+}
+
+func (p *ReservationManagerProvider) ReservationManager(reservationID string, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) cloudprovider.ReservationManager {
+	return snapshotAdapter{
+		snapshot:      p.getSnapshot(option.Resolve(opts...).AvailabilitySnapshotUUID),
+		reservationID: reservationID,
+	}
+}
+
+type snapshotAdapter struct {
+	*snapshot
+	reservationID string
+}
+
+func (a snapshotAdapter) Reserve(reservationID string) bool {
+	if reservations, ok := a.reservations[reservationID]; ok && reservations.Has(a.reservationID) {
+		return true
+	}
+	if a.capacity[a.reservationID] > 0 {
+		reservations, ok := a.reservations[reservationID]
+		if !ok {
+			reservations = sets.New[string]()
+			a.reservations[reservationID] = reservations
+		}
+		reservations.Insert(a.reservationID)
+		a.capacity[a.reservationID] -= 1
+		return true
+	}
+	return false
+}
+
+func (a snapshotAdapter) Release(reservationID string) {
+	if reservations, ok := a.reservations[reservationID]; ok && reservations.Has(a.reservationID) {
+		reservations.Delete(a.reservationID)
+	}
+}
diff --git a/pkg/cloudprovider/metrics/cloudprovider.go b/pkg/cloudprovider/metrics/cloudprovider.go
index 79ae60b028..8abcd2f2da 100644
--- a/pkg/cloudprovider/metrics/cloudprovider.go
+++ b/pkg/cloudprovider/metrics/cloudprovider.go
@@ -20,6 +20,7 @@ import (
 	"context"
 
 	opmetrics "github.com/awslabs/operatorpkg/metrics"
+	"github.com/awslabs/operatorpkg/option"
"github.com/prometheus/client_golang/prometheus" crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) { return nodeClaims, err } -func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) { +func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) { method := "GetInstanceTypes" defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))() instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool) diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index a6a6bd324e..01585bec19 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -25,9 +25,11 @@ import ( "sync" "time" + "github.com/awslabs/operatorpkg/option" "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" @@ -69,7 +71,7 @@ type CloudProvider interface { // Availability of types or zone may vary by nodepool or over time. Regardless of // availability, the GetInstanceTypes method should always return all instance types, // even those with no offerings available. - GetInstanceTypes(context.Context, *v1.NodePool) ([]*InstanceType, error) + GetInstanceTypes(context.Context, *v1.NodePool, ...option.Function[GetInstanceTypeOptions]) ([]*InstanceType, error) // IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements // it is tied to. IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error) @@ -83,6 +85,18 @@ type CloudProvider interface { GetSupportedNodeClasses() []status.Object } +type GetInstanceTypeOptions struct { + AvailabilitySnapshotUUID *types.UID +} + +// GetInstanceTypes calls made with the same snapshot ID should have a consistent view of offering availability. This +// is crucial for offerings with capacity type "reserved" since cross-nodepool offerings may share availability. +func WithAvailabilitySnapshotUUID(uuid types.UID) option.Function[GetInstanceTypeOptions] { + return func(opts *GetInstanceTypeOptions) { + opts.AvailabilitySnapshotUUID = lo.ToPtr(uuid) + } +} + // InstanceType describes the properties of a potential node (either concrete attributes of an instance of this type // or supported options in the case of arrays) type InstanceType struct { @@ -226,6 +240,15 @@ func (its InstanceTypes) Truncate(requirements scheduling.Requirements, maxItems return truncatedInstanceTypes, nil } +func (its InstanceTypes) Difference(other InstanceTypes) InstanceTypes { + names := sets.New(lo.Map(other, func(it *InstanceType, _ int) string { + return it.Name + })...) + return lo.Reject(its, func(it *InstanceType, _ int) bool { + return names.Has(it.Name) + }) +} + type InstanceTypeOverhead struct { // KubeReserved returns the default resources allocated to kubernetes system daemons by default KubeReserved corev1.ResourceList @@ -239,20 +262,56 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList { return resources.Merge(i.KubeReserved, i.SystemReserved, i.EvictionThreshold) } +// ReservationManager is used to track the availability of a reserved offering over the course of a scheduling +// simulation. 
+type ReservationManager interface {
+	// Reserve takes a unique identifier for a reservation, and returns a boolean indicating if the reservation was
+	// successful. Reserve should be idempotent, i.e. multiple calls with the same reservation ID should only count for a
+	// single reservation.
+	Reserve(string) bool
+	// Release takes a unique identifier for a reservation, and should discard any matching reservations. If no
+	// reservations exist for the given id, Release should be a no-op.
+	Release(string)
+}
+
 // An Offering describes where an InstanceType is available to be used, with the expectation that its properties
 // may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
 // these properties are captured with labels in Requirements.
-// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone
+// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone.
 type Offering struct {
+	// ReservationManager is used for tracking availability of reserved offerings over the course of a scheduling loop.
+	// It must be non-nil for offerings with capacity type "reserved", but may be nil otherwise.
+	ReservationManager
+
 	Requirements scheduling.Requirements
 	Price        float64
-	// Available is added so that Offerings can return all offerings that have ever existed for an instance type,
-	// so we can get historical pricing data for calculating savings in consolidation
-	Available bool
+	Available    bool
 }
 
 type Offerings []Offering
 
+// WithCapacityType filters the offerings by the provided capacity type.
+func (ofs Offerings) WithCapacityType(capacityType string) Offerings {
+	return lo.Filter(ofs, func(o Offering, _ int) bool {
+		return o.Requirements.Get(v1.CapacityTypeLabelKey).Any() == capacityType
+	})
+}
+
+// Reserve attempts to reserve each offering for the given reservation id, returning the subset of offerings that were
+// successfully reserved.
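+// Reservations are idempotent with respect to the id: an offering already reserved for that id is returned again
+// without consuming additional capacity (see ReservationManager.Reserve).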
+func (ofs Offerings) Reserve(id string) Offerings { + return lo.Filter(ofs, func(o Offering, _ int) bool { + return o.Reserve(id) + }) +} + +func (ofs Offerings) Release(id string) { + for i := range ofs { + ofs[i].Release(id) + } +} + // Available filters the available offerings from the returned offerings func (ofs Offerings) Available() Offerings { return lo.Filter(ofs, func(o Offering, _ int) bool { @@ -260,6 +319,17 @@ func (ofs Offerings) Available() Offerings { }) } +func (ofs Offerings) PartitionCompatible(reqs scheduling.Requirements) (compatible Offerings, incompatible Offerings) { + for _, o := range ofs { + if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) { + compatible = append(compatible, o) + } else { + incompatible = append(incompatible, o) + } + } + return compatible, incompatible +} + // Compatible returns the offerings based on the passed requirements func (ofs Offerings) Compatible(reqs scheduling.Requirements) Offerings { return lo.Filter(ofs, func(offering Offering, _ int) bool { diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go index 9506873470..b01c19d922 100644 --- a/pkg/controllers/disruption/consolidation_test.go +++ b/pkg/controllers/disruption/consolidation_test.go @@ -2050,9 +2050,9 @@ var _ = Describe("Consolidation", func() { Name: "current-on-demand", Offerings: []cloudprovider.Offering{ { + Available: false, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.5, - Available: false, }, }, }) @@ -2060,19 +2060,19 @@ var _ = Describe("Consolidation", func() { Name: "potential-spot-replacement", Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 1.0, - Available: true, }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}), Price: 0.2, - Available: true, }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}), Price: 0.4, - Available: true, }, }, }) @@ -2134,9 +2134,9 @@ var _ = Describe("Consolidation", func() { Name: "current-on-demand", Offerings: []cloudprovider.Offering{ { + Available: false, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.5, - Available: false, }, }, }) @@ -2144,24 +2144,24 @@ var _ = Describe("Consolidation", func() { Name: "on-demand-replacement", Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.6, - Available: true, }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1b"}), Price: 0.6, - Available: true, }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}), Price: 0.2, - Available: true, }, { + 
Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}), Price: 0.3, - Available: true, }, }, }) diff --git a/pkg/controllers/disruption/drift_test.go b/pkg/controllers/disruption/drift_test.go index fcf5ee1c9d..31d2157949 100644 --- a/pkg/controllers/disruption/drift_test.go +++ b/pkg/controllers/disruption/drift_test.go @@ -775,9 +775,9 @@ var _ = Describe("Drift", func() { Name: "current-on-demand", Offerings: []cloudprovider.Offering{ { + Available: false, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.5, - Available: false, }, }, }) @@ -785,9 +785,9 @@ var _ = Describe("Drift", func() { Name: "replacement-on-demand", Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.3, - Available: true, }, }, Resources: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("3")}, diff --git a/pkg/controllers/disruption/helpers.go b/pkg/controllers/disruption/helpers.go index 85654fe597..2170922a60 100644 --- a/pkg/controllers/disruption/helpers.go +++ b/pkg/controllers/disruption/helpers.go @@ -33,7 +33,7 @@ import ( disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" "sigs.k8s.io/karpenter/pkg/controllers/provisioning" - pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" + "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling" "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" @@ -48,7 +48,7 @@ var errCandidateDeleting = fmt.Errorf("candidate is deleting") //nolint:gocyclo func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *state.Cluster, provisioner *provisioning.Provisioner, candidates ...*Candidate, -) (pscheduling.Results, error) { +) (scheduling.Results, error) { candidateNames := sets.NewString(lo.Map(candidates, func(t *Candidate, i int) string { return t.Name() })...) nodes := cluster.Nodes() deletingNodes := nodes.Deleting() @@ -62,33 +62,45 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * if _, ok := lo.Find(deletingNodes, func(n *state.StateNode) bool { return candidateNames.Has(n.Name()) }); ok { - return pscheduling.Results{}, errCandidateDeleting + return scheduling.Results{}, errCandidateDeleting } // We get the pods that are on nodes that are deleting deletingNodePods, err := deletingNodes.ReschedulablePods(ctx, kubeClient) if err != nil { - return pscheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err) + return scheduling.Results{}, fmt.Errorf("failed to get pods from deleting nodes, %w", err) } // start by getting all pending pods pods, err := provisioner.GetPendingPods(ctx) if err != nil { - return pscheduling.Results{}, fmt.Errorf("determining pending pods, %w", err) + return scheduling.Results{}, fmt.Errorf("determining pending pods, %w", err) } for _, n := range candidates { pods = append(pods, n.reschedulablePods...) } pods = append(pods, deletingNodePods...) 
- scheduler, err := provisioner.NewScheduler(log.IntoContext(ctx, operatorlogging.NopLogger), pods, stateNodes) + scheduler, err := provisioner.NewScheduler( + log.IntoContext(ctx, operatorlogging.NopLogger), + pods, + stateNodes, + // ReservedOfferingModeFallback is used for the following reasons: + // - For consolidation, we're only going to accept a decision if it lowers the cost of the cluster, and if it only + // requires a single additional nodeclaim. It doesn't matter in this scenario if we fallback. + // - For drift, fallback is required to ensure progress. Progress is only ensured with strict if multiple scheduling + // loops are allowed to proceed, but we need to ensure all pods on the drifted node are scheduled within a single + // iteration. This may result in non-ideal instance choices, but the alternative is deadlock. + // See issue TODO for more details. + scheduling.ReservedOfferingModeFallback, + ) if err != nil { - return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err) + return scheduling.Results{}, fmt.Errorf("creating scheduler, %w", err) } deletingNodePodKeys := lo.SliceToMap(deletingNodePods, func(p *corev1.Pod) (client.ObjectKey, interface{}) { return client.ObjectKeyFromObject(p), nil }) - results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(pscheduling.MaxInstanceTypes) + results := scheduler.Solve(log.IntoContext(ctx, operatorlogging.NopLogger), pods).TruncateInstanceTypes(scheduling.MaxInstanceTypes) for _, n := range results.ExistingNodes { // We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should // tell us if we can schedule to them or not; however, if these nodes are managed, we will still schedule to them @@ -100,6 +112,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * // If the pod is on a deleting node, we assume one of two things has already happened: // 1. The node was manually terminated, at which the provisioning controller has scheduled or is scheduling a node // for the pod. + // TODO: clarify this point, not clear to me // 2. The node was chosen for a previous disruption command, we assume that the uninitialized node will come up // for this command, and we assume it will be successful. If it is not successful, the node will become // not terminating, and we will no longer need to consider these pods. 
@@ -115,10 +128,10 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * // UninitializedNodeError tracks a special pod error for disruption where pods schedule to a node // that hasn't been initialized yet, meaning that we can't be confident to make a disruption decision based off of it type UninitializedNodeError struct { - *pscheduling.ExistingNode + *scheduling.ExistingNode } -func NewUninitializedNodeError(node *pscheduling.ExistingNode) *UninitializedNodeError { +func NewUninitializedNodeError(node *scheduling.ExistingNode) *UninitializedNodeError { return &UninitializedNodeError{ExistingNode: node} } diff --git a/pkg/controllers/disruption/suite_test.go b/pkg/controllers/disruption/suite_test.go index 42cd1afd38..78c999755d 100644 --- a/pkg/controllers/disruption/suite_test.go +++ b/pkg/controllers/disruption/suite_test.go @@ -471,9 +471,9 @@ var _ = Describe("Disruption Taints", func() { Name: "current-on-demand", Offerings: []cloudprovider.Offering{ { + Available: false, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 1.5, - Available: false, }, }, }) @@ -481,19 +481,19 @@ var _ = Describe("Disruption Taints", func() { Name: "spot-replacement", Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 1.0, - Available: true, }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1b"}), Price: 0.2, - Available: true, }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1c"}), Price: 0.4, - Available: true, }, }, }) diff --git a/pkg/controllers/nodeclaim/disruption/suite_test.go b/pkg/controllers/nodeclaim/disruption/suite_test.go index 23fbc1641b..3eb2c29f5b 100644 --- a/pkg/controllers/nodeclaim/disruption/suite_test.go +++ b/pkg/controllers/nodeclaim/disruption/suite_test.go @@ -79,20 +79,20 @@ var _ = BeforeEach(func() { OperatingSystems: sets.New(string(corev1.Linux)), Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a", }), - Price: fake.PriceFromResources(resources), - Available: true, + Price: fake.PriceFromResources(resources), }, { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a", }), - Price: fake.PriceFromResources(resources), - Available: true, + Price: fake.PriceFromResources(resources), }, }, }) diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index ef2b5b52f5..81e63b086f 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -31,6 +31,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -213,7 +214,12 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, pods []*corev1. 
var ErrNodePoolsNotFound = errors.New("no nodepools found") //nolint:gocyclo -func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stateNodes []*state.StateNode) (*scheduler.Scheduler, error) { +func (p *Provisioner) NewScheduler( + ctx context.Context, + pods []*corev1.Pod, + stateNodes []*state.StateNode, + reservedOfferingMode scheduler.ReservedOfferingMode, +) (*scheduler.Scheduler, error) { nodePools, err := nodepoolutils.ListManaged(ctx, p.kubeClient, p.cloudProvider) if err != nil { return nil, fmt.Errorf("listing nodepools, %w", err) @@ -229,6 +235,8 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat return nil, ErrNodePoolsNotFound } + schedulerID := uuid.NewUUID() + // nodeTemplates generated from NodePools are ordered by weight // since they are stored within a slice and scheduling // will always attempt to schedule on the first nodeTemplate @@ -237,7 +245,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat instanceTypes := map[string][]*cloudprovider.InstanceType{} for _, np := range nodePools { // Get instance type options - its, err := p.cloudProvider.GetInstanceTypes(ctx, np) + its, err := p.cloudProvider.GetInstanceTypes(ctx, np, cloudprovider.WithAvailabilitySnapshotUUID(schedulerID)) if err != nil { log.FromContext(ctx).WithValues("NodePool", klog.KRef("", np.Name)).Error(err, "skipping, unable to resolve instance types") continue @@ -261,7 +269,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat if err != nil { return nil, fmt.Errorf("getting daemon pods, %w", err) } - return scheduler.NewScheduler(ctx, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil + return scheduler.NewScheduler(ctx, schedulerID, p.kubeClient, nodePools, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock, reservedOfferingMode), nil } func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { @@ -299,7 +307,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { return scheduler.Results{}, nil } log.FromContext(ctx).V(1).WithValues("pending-pods", len(pendingPods), "deleting-pods", len(deletingNodePods)).Info("computing scheduling decision for provisionable pod(s)") - s, err := p.NewScheduler(ctx, pods, nodes.Active()) + s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.ReservedOfferingModeStrict) if err != nil { if errors.Is(err, ErrNodePoolsNotFound) { log.FromContext(ctx).Info("no nodepools found") diff --git a/pkg/controllers/provisioning/scheduling/instance_selection_test.go b/pkg/controllers/provisioning/scheduling/instance_selection_test.go index cfeffb23a0..95188e3e2d 100644 --- a/pkg/controllers/provisioning/scheduling/instance_selection_test.go +++ b/pkg/controllers/provisioning/scheduling/instance_selection_test.go @@ -608,8 +608,16 @@ var _ = Describe("Instance Type Selection", func() { corev1.ResourceMemory: resource.MustParse("1Gi"), }, Offerings: []cloudprovider.Offering{ - {Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 1.0, Available: true}, - {Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.2, Available: true}, + { + Available: true, + Requirements: 
scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 1.0, + }, + { + Available: true, + Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 0.2, + }, }, }), fake.NewInstanceType(fake.InstanceTypeOptions{ @@ -621,8 +629,16 @@ var _ = Describe("Instance Type Selection", func() { corev1.ResourceMemory: resource.MustParse("1Gi"), }, Offerings: []cloudprovider.Offering{ - {Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 1.3, Available: true}, - {Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}), Price: 0.1, Available: true}, + { + Available: true, + Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 1.3, + }, + { + Available: true, + Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1a"}), + Price: 0.1, + }, }, }), } @@ -657,9 +673,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -673,9 +689,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceType(opts1)) @@ -735,9 +751,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -751,9 +767,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } opts3 := fake.InstanceTypeOptions{ @@ -767,9 +783,9 @@ var _ = Describe("Instance Type Selection", func() { } opts3.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.2, - Available: true, }, } @@ -831,9 +847,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := 
fake.InstanceTypeOptions{ @@ -847,9 +863,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceTypeWithCustomRequirement(opts1, scheduler.NewRequirement(instanceGeneration, corev1.NodeSelectorOpIn, "2"))) @@ -920,9 +936,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -936,9 +952,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } opts3 := fake.InstanceTypeOptions{ @@ -952,9 +968,9 @@ var _ = Describe("Instance Type Selection", func() { } opts3.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.2, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceTypeWithCustomRequirement(opts1, scheduler.NewRequirement(instanceGeneration, corev1.NodeSelectorOpIn, "2"))) @@ -1015,9 +1031,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -1031,9 +1047,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.2, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceTypeWithCustomRequirement(opts1, scheduler.NewRequirement(instanceGeneration, corev1.NodeSelectorOpIn, "2"))) @@ -1084,9 +1100,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -1100,9 +1116,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } opts3 := fake.InstanceTypeOptions{ @@ -1116,9 +1132,9 @@ var _ = Describe("Instance Type Selection", func() { } opts3.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: 
scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.2, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceType(opts1)) @@ -1186,9 +1202,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -1202,9 +1218,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } opts3 := fake.InstanceTypeOptions{ @@ -1218,9 +1234,9 @@ var _ = Describe("Instance Type Selection", func() { } opts3.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.2, - Available: true, }, } opts4 := fake.InstanceTypeOptions{ @@ -1234,9 +1250,9 @@ var _ = Describe("Instance Type Selection", func() { } opts4.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.2, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceTypeWithCustomRequirement(opts1, scheduler.NewRequirement(instanceGeneration, corev1.NodeSelectorOpIn, "2"))) @@ -1332,9 +1348,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -1348,9 +1364,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceType(opts1)) @@ -1407,9 +1423,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 0.52, - Available: true, }, } opts2 := fake.InstanceTypeOptions{ @@ -1423,9 +1439,9 @@ var _ = Describe("Instance Type Selection", func() { } opts2.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}), Price: 1.0, - Available: true, }, } instanceTypes = append(instanceTypes, fake.NewInstanceType(opts1)) @@ -1492,9 +1508,9 @@ var _ = Describe("Instance Type Selection", func() { } opts1.Offerings = []cloudprovider.Offering{ { + Available: true, 
 			Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}),
 			Price:        0.52,
-			Available:    true,
 		},
 	}
 	opts2 := fake.InstanceTypeOptions{
@@ -1508,9 +1524,9 @@
 	}
 	opts2.Offerings = []cloudprovider.Offering{
 		{
+			Available:    true,
 			Requirements: scheduler.NewLabelRequirements(map[string]string{v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1-spot"}),
 			Price:        1.0,
-			Available:    true,
 		},
 	}
 	instanceTypes = append(instanceTypes, fake.NewInstanceType(opts1))
diff --git a/pkg/controllers/provisioning/scheduling/nodeclaim.go b/pkg/controllers/provisioning/scheduling/nodeclaim.go
index c2bbcb90cf..5e57831d65 100644
--- a/pkg/controllers/provisioning/scheduling/nodeclaim.go
+++ b/pkg/controllers/provisioning/scheduling/nodeclaim.go
@@ -17,14 +17,21 @@ limitations under the License.
 package scheduling
 
 import (
+	"context"
+	"errors"
 	"fmt"
 	"strings"
 	"sync/atomic"
 
 	"github.com/samber/lo"
-	v1 "k8s.io/api/core/v1"
+	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 	"sigs.k8s.io/karpenter/pkg/cloudprovider"
 	"sigs.k8s.io/karpenter/pkg/scheduling"
 	"sigs.k8s.io/karpenter/pkg/utils/resources"
@@ -35,36 +42,93 @@
 type NodeClaim struct {
 	NodeClaimTemplate
 
-	Pods            []*v1.Pod
-	topology        *Topology
-	hostPortUsage   *scheduling.HostPortUsage
-	daemonResources v1.ResourceList
-	hostname        string
+	Pods                 []*corev1.Pod
+	reservedOfferings    map[string]cloudprovider.Offerings
+	reservedOfferingMode ReservedOfferingMode
+	topology             *Topology
+	hostPortUsage        *scheduling.HostPortUsage
+	daemonResources      corev1.ResourceList
+	hostname             string
+}
+
+type NodePoolLimitsExceededError struct {
+	nodePool string
+}
+
+func (e NodePoolLimitsExceededError) Error() string {
+	return fmt.Sprintf("all available instance types exceed limits for nodepool: %q", e.nodePool)
+}
+
+// ReservedOfferingError indicates a NodeClaim couldn't be created or a pod couldn't be added to an existing NodeClaim
+// due to constraints on its reserved offerings.
+type ReservedOfferingError struct {
+	error
+}
+
+func NewReservedOfferingError(err error) ReservedOfferingError {
+	return ReservedOfferingError{error: err}
+}
+
+func IsReservedOfferingError(err error) bool {
+	roe := &ReservedOfferingError{}
+	return errors.As(err, roe)
+}
 
 var nodeID int64
 
-func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daemonResources v1.ResourceList, instanceTypes []*cloudprovider.InstanceType) *NodeClaim {
+func NewNodeClaimForPod(
+	ctx context.Context,
+	nodeClaimTemplate *NodeClaimTemplate,
+	topology *Topology,
+	daemonResources corev1.ResourceList,
+	remainingResources corev1.ResourceList,
+	pod *corev1.Pod,
+	podData *PodData,
+	reservedOfferingMode ReservedOfferingMode,
+) (*NodeClaim, error) {
+	// Ensure we don't consider instance types which would exceed the limits of the NodePool
+	instanceTypes := filterByRemainingResources(nodeClaimTemplate.InstanceTypeOptions, remainingResources)
+	if len(instanceTypes) == 0 {
+		return nil, NodePoolLimitsExceededError{nodePool: nodeClaimTemplate.NodePoolName}
+	} else if len(nodeClaimTemplate.InstanceTypeOptions) != len(instanceTypes) {
+		log.FromContext(ctx).V(1).WithValues(
+			"NodePool", klog.KRef("", nodeClaimTemplate.NodePoolName),
+		).Info(fmt.Sprintf(
+			"%d out of %d instance types were excluded because they would breach limits",
+			len(nodeClaimTemplate.InstanceTypeOptions)-len(instanceTypes),
+			len(nodeClaimTemplate.InstanceTypeOptions),
+		))
+	}
+	// Copy the template, and add hostname
 	hostname := fmt.Sprintf("hostname-placeholder-%04d", atomic.AddInt64(&nodeID, 1))
-	topology.Register(v1.LabelHostname, hostname)
+	topology.Register(corev1.LabelHostname, hostname)
 	template := *nodeClaimTemplate
 	template.Requirements = scheduling.NewRequirements()
 	template.Requirements.Add(nodeClaimTemplate.Requirements.Values()...)
-	template.Requirements.Add(scheduling.NewRequirement(v1.LabelHostname, v1.NodeSelectorOpIn, hostname))
+	template.Requirements.Add(scheduling.NewRequirement(corev1.LabelHostname, corev1.NodeSelectorOpIn, hostname))
 	template.InstanceTypeOptions = instanceTypes
 	template.Spec.Resources.Requests = daemonResources
-
-	return &NodeClaim{
+	nodeClaim := &NodeClaim{
 		NodeClaimTemplate:    template,
 		hostPortUsage:        scheduling.NewHostPortUsage(),
 		topology:             topology,
 		daemonResources:      daemonResources,
 		hostname:             hostname,
+		reservedOfferings:    map[string]cloudprovider.Offerings{},
+		reservedOfferingMode: reservedOfferingMode,
+	}
+	if err := nodeClaim.Add(pod, podData); err != nil {
+		return nil, fmt.Errorf(
+			"incompatible with nodepool %q, daemonset overhead=%s, %w",
+			nodeClaimTemplate.NodePoolName,
+			resources.String(daemonResources),
+			err,
+		)
 	}
+	return nodeClaim, nil
 }
 
-func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
+func (n *NodeClaim) Add(pod *corev1.Pod, podData *PodData) error {
 	// Check Taints
 	if err := scheduling.Taints(n.Spec.Taints).ToleratesPod(pod); err != nil {
 		return err
@@ -103,6 +167,11 @@ func (n *NodeClaim) Add(pod *v1.Pod, podData *PodData) error {
 		return err
 	}
 
+	reservedOfferings, offeringsToRelease, err := n.reserveOfferings(remaining, nodeClaimRequirements)
+	if err != nil {
+		return err
+	}
+
 	// Update node
 	n.Pods = append(n.Pods, pod)
 	n.InstanceTypeOptions = remaining
@@ -110,11 +179,74 @@
 	n.Requirements = nodeClaimRequirements
 	n.topology.Record(pod, n.NodeClaim.Spec.Taints, nodeClaimRequirements, scheduling.AllowUndefinedWellKnownLabels)
 	n.hostPortUsage.Add(pod, hostPorts)
+	n.reservedOfferings = reservedOfferings
+	for _, o := range offeringsToRelease {
+		o.Release(n.hostname)
+	}
 	return nil
 }
 
+// reserveOfferings handles the reservation of `karpenter.sh/capacity-type: reserved` offerings, returning the reserved
+// offerings in a map from instance type name to offerings. Additionally, it returns a slice of offerings to release:
+// offerings previously reserved for this nodeclaim which are no longer compatible with it. These should not be
+// released until we're ready to persist the changes to the nodeclaim.
+// nolint:gocyclo
+func (n *NodeClaim) reserveOfferings(
+	instanceTypes []*cloudprovider.InstanceType,
+	nodeClaimRequirements scheduling.Requirements,
+) (reservedOfferings map[string]cloudprovider.Offerings, offeringsToRelease []cloudprovider.Offering, err error) {
+	compatibleReservedInstanceTypes := sets.New[string]()
+	reservedOfferings = map[string]cloudprovider.Offerings{}
+	for _, it := range instanceTypes {
+		offerings := it.Offerings.Available().WithCapacityType(v1.CapacityTypeReserved)
+		if len(offerings) == 0 {
+			continue
+		}
+		compatible, incompatible := offerings.PartitionCompatible(nodeClaimRequirements)
+		if len(compatible) != 0 {
+			compatibleReservedInstanceTypes.Insert(it.Name)
+		}
+		offeringsToRelease = append(offeringsToRelease, incompatible...)
+		// The reserved set is a superset of n.reservedOfferings minus the offerings to release. Since reservation is
+		// idempotent, any reservations made in previous iterations will succeed again, so the result is at least an equal
+		// set. It may contain additional elements, since offerings released by other nodeclaims in previous iterations are
+		// freed up to be reserved for this nodeclaim. We want to expand the set of reserved offerings whenever possible to
+		// maximize flexibility and binpacking potential.
+		if reserved := compatible.Reserve(n.hostname); len(reserved) != 0 {
+			reservedOfferings[it.Name] = reserved
+		}
+	}
+	if n.reservedOfferingMode == ReservedOfferingModeStrict {
+		// If an instance type with a compatible reserved offering exists, but we failed to make any reservations, we should
+		// fail. We include this check due to the pessimistic nature of instance reservation - we have to reserve each potential
+		// offering for every nodeclaim, since we don't know what we'll launch with. Without this check we would fall back to
+		// on-demand / spot even when there's sufficient reserved capacity. This check means we may fail to schedule the pod
+		// during this scheduling simulation, but with the possibility of success on subsequent simulations.
+		// Note: this can occur both on initial NodeClaim creation and when adding a pod to an existing NodeClaim.
+		if len(compatibleReservedInstanceTypes) != 0 && len(reservedOfferings) == 0 {
+			return nil, nil, NewReservedOfferingError(fmt.Errorf("one or more instance types with compatible reserved offerings are available, but could not be reserved"))
+		}
+		// If the nodeclaim previously had compatible reserved offerings, but the additional requirements filtered those out,
+		// we should fail to add the pod to this nodeclaim.
+		if len(n.reservedOfferings) != 0 && len(reservedOfferings) == 0 {
+			return nil, nil, NewReservedOfferingError(fmt.Errorf("satisfying updated nodeclaim constraints would remove all compatible reserved offering options"))
+		}
+	}
+	// Ensure we release any offerings for instance types which are no longer compatible with nodeClaimRequirements
+	for instanceName, offerings := range n.reservedOfferings {
+		if compatibleReservedInstanceTypes.Has(instanceName) {
+			continue
+		}
+		offeringsToRelease = append(offeringsToRelease, offerings...)
+	}
+	return reservedOfferings, offeringsToRelease, nil
+}
+
 func (n *NodeClaim) Destroy() {
-	n.topology.Unregister(v1.LabelHostname, n.hostname)
+	n.topology.Unregister(corev1.LabelHostname, n.hostname)
+	for _, ofs := range n.reservedOfferings {
+		ofs.Release(n.hostname)
+	}
 }
 
 // FinalizeScheduling is called once all scheduling has completed and allows the node to perform any cleanup
@@ -122,7 +254,7 @@ func (n *NodeClaim) Destroy() {
 func (n *NodeClaim) FinalizeScheduling() {
 	// We need nodes to have hostnames for topology purposes, but we don't want to pass that node name on to consumers
 	// of the node as it will be displayed in error messages
-	delete(n.Requirements, v1.LabelHostname)
+	delete(n.Requirements, corev1.LabelHostname)
 }
 
 func (n *NodeClaim) RemoveInstanceTypeOptionsByPriceAndMinValues(reqs scheduling.Requirements, maxPrice float64) (*NodeClaim, error) {
@@ -169,9 +301,9 @@ type InstanceTypeFilterError struct {
 	requirements scheduling.Requirements
 	// We capture podRequests here since when a pod can't schedule due to requests, it's because the pod
 	// was on its own on the simulated Node and exceeded the available resources for any instance type for this NodePool
-	podRequests v1.ResourceList
+	podRequests corev1.ResourceList
 	// We capture daemonRequests since this contributes to the resources that are required to schedule to this NodePool
-	daemonRequests v1.ResourceList
+	daemonRequests corev1.ResourceList
 }
 
 //nolint:gocyclo
@@ -227,8 +359,10 @@ func (e InstanceTypeFilterError) Error() string {
 	return fmt.Sprintf("no instance type met the requirements/resources/offering tuple, requirements=%s, resources=%s", e.requirements, resources.String(resources.Merge(e.daemonRequests, e.podRequests)))
 }
 
+// TODO: when performing the compatibility check, we must also filter by reserved offerings.
+//
 //nolint:gocyclo
-func filterInstanceTypesByRequirements(instanceTypes []*cloudprovider.InstanceType, requirements scheduling.Requirements, podRequests, daemonRequests, totalRequests v1.ResourceList) (cloudprovider.InstanceTypes, error) {
+func filterInstanceTypesByRequirements(instanceTypes []*cloudprovider.InstanceType, requirements scheduling.Requirements, podRequests, daemonRequests, totalRequests corev1.ResourceList) (cloudprovider.InstanceTypes, error) {
 	// We hold the results of our scheduling simulation inside of this InstanceTypeFilterError struct
 	// to reduce the CPU load of having to generate the error string for a failed scheduling simulation
 	err := InstanceTypeFilterError{
@@ -296,6 +430,6 @@ func compatible(instanceType *cloudprovider.InstanceType, requirements schedulin
 	return instanceType.Requirements.Intersects(requirements) == nil
 }
 
-func fits(instanceType *cloudprovider.InstanceType, requests v1.ResourceList) bool {
+func fits(instanceType *cloudprovider.InstanceType, requests corev1.ResourceList) bool {
 	return resources.Fits(requests, instanceType.Allocatable())
 }
diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go
index 398942c536..860f011db2 100644
--- a/pkg/controllers/provisioning/scheduling/scheduler.go
+++ b/pkg/controllers/provisioning/scheduling/scheduler.go
@@ -27,7 +27,6 @@ import (
 	"go.uber.org/multierr"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/clock"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -44,10 +43,41 @@ import (
 	"sigs.k8s.io/karpenter/pkg/utils/resources"
 )
 
-func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1.NodePool,
context.Context, kubeClient client.Client, nodePools []*v1.NodePool, - cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology, - instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*corev1.Pod, - recorder events.Recorder, clock clock.Clock) *Scheduler { +type ReservedOfferingMode int + +// TODO: Evaluate if another mode should be created for drift. The problem with strict is that it assumes we can run +// multiple scheduling loops to make progress, but if scheduling all pods from the drifted node in a single iteration +// requires fallback, we're at a stalemate. This makes strict a non-starter for drift IMO. +// On the other hand, fallback will result in non-ideal launches when there's constrained capacity. This should be +// rectified by consolidation, but if we can be "right" the at initial launch that would be preferable. +// One potential improvement is a "preferences" type strategy, where we attempt to schedule the pod without fallback +// first. This is an improvement over the current fallback strategy since it ensures all new nodeclaims are attempted, +// before then attempting all nodepools, but it still doesn't address the case when offerings are reserved pessimistically. +// I don't believe there's a solution to this short of the max-flow based instance selection algorithm, which has its own +// drawbacks. +const ( + // ReservedOfferingModeStrict indicates that the scheduler should fail to add a pod to a nodeclaim if doing so would + // prevent it from scheduling to reserved capacity, when it would have otherwise. + ReservedOfferingModeStrict ReservedOfferingMode = iota + // ReservedOfferingModeFallbackAlways indicates to the scheduler that the addition of a pod to a nodeclaim which + // results in all potential reserved offerings being filtered out is allowed (e.g. on-demand / spot fallback). 
+ ReservedOfferingModeFallback +) + +func NewScheduler( + ctx context.Context, + uuid types.UID, + kubeClient client.Client, + nodePools []*v1.NodePool, + cluster *state.Cluster, + stateNodes []*state.StateNode, + topology *Topology, + instanceTypes map[string][]*cloudprovider.InstanceType, + daemonSetPods []*corev1.Pod, + recorder events.Recorder, + clock clock.Clock, + reservedOfferingMode ReservedOfferingMode, +) *Scheduler { // if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint // during preference relaxation @@ -71,7 +101,7 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1 return nct, true }) s := &Scheduler{ - id: uuid.NewUUID(), + uuid: uuid, kubeClient: kubeClient, nodeClaimTemplates: templates, topology: topology, @@ -83,7 +113,8 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodePools []*v1 remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) { return np.Name, corev1.ResourceList(np.Spec.Limits) }), - clock: clock, + clock: clock, + reservedOfferingMode: reservedOfferingMode, } s.calculateExistingNodeClaims(stateNodes, daemonSetPods) return s @@ -96,19 +127,20 @@ type PodData struct { } type Scheduler struct { - id types.UID // Unique UUID attached to this scheduling loop - newNodeClaims []*NodeClaim - existingNodes []*ExistingNode - nodeClaimTemplates []*NodeClaimTemplate - remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool - daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList + uuid types.UID // Unique UUID attached to this scheduling loop + newNodeClaims []*NodeClaim + existingNodes []*ExistingNode + nodeClaimTemplates []*NodeClaimTemplate + remainingResources map[string]corev1.ResourceList // (NodePool name) -> remaining resources for that NodePool + daemonOverhead map[*NodeClaimTemplate]corev1.ResourceList cachedPodData map[types.UID]*PodData // (Pod Namespace/Name) -> pre-computed data for pods to avoid re-computation and memory usage - preferences *Preferences - topology *Topology - cluster *state.Cluster - recorder events.Recorder - kubeClient client.Client - clock clock.Clock + preferences *Preferences + topology *Topology + cluster *state.Cluster + recorder events.Recorder + kubeClient client.Client + clock clock.Clock + reservedOfferingMode ReservedOfferingMode } // Results contains the results of the scheduling operation @@ -231,8 +263,8 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { lastLogTime := s.clock.Now() batchSize := len(q.pods) for { - UnfinishedWorkSeconds.Set(s.clock.Since(startTime).Seconds(), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}) - QueueDepth.Set(float64(len(q.pods)), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}) + UnfinishedWorkSeconds.Set(s.clock.Since(startTime).Seconds(), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.uuid)}) + QueueDepth.Set(float64(len(q.pods)), map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.uuid)}) if s.clock.Since(lastLogTime) > time.Minute { log.FromContext(ctx).WithValues("pods-scheduled", batchSize-len(q.pods), "pods-remaining", len(q.pods), "existing-nodes", len(s.existingNodes), "simulated-nodes", len(s.newNodeClaims), "duration", 
s.clock.Since(startTime).Truncate(time.Second), "scheduling-id", string(s.id)).Info("computing pod scheduling...") @@ -261,7 +293,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { s.updateCachedPodData(pod) } } - UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)}) + UnfinishedWorkSeconds.Delete(map[string]string{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.uuid)}) for _, m := range s.newNodeClaims { m.FinalizeScheduling() } @@ -269,7 +301,9 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results { return Results{ NewNodeClaims: s.newNodeClaims, ExistingNodes: s.existingNodes, - PodErrors: errors, + PodErrors: lo.OmitBy(errors, func(_ *corev1.Pod, err error) bool { + return IsReservedOfferingError(err) + }), } } @@ -309,25 +343,21 @@ func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error { // Create new node var errs error for _, nodeClaimTemplate := range s.nodeClaimTemplates { - instanceTypes := nodeClaimTemplate.InstanceTypeOptions - // if limits have been applied to the nodepool, ensure we filter instance types to avoid violating those limits - if remaining, ok := s.remainingResources[nodeClaimTemplate.NodePoolName]; ok { - instanceTypes = filterByRemainingResources(instanceTypes, remaining) - if len(instanceTypes) == 0 { - errs = multierr.Append(errs, fmt.Errorf("all available instance types exceed limits for nodepool: %q", nodeClaimTemplate.NodePoolName)) - continue - } else if len(nodeClaimTemplate.InstanceTypeOptions) != len(instanceTypes) { - log.FromContext(ctx).V(1).WithValues("NodePool", klog.KRef("", nodeClaimTemplate.NodePoolName)).Info(fmt.Sprintf("%d out of %d instance types were excluded because they would breach limits", - len(nodeClaimTemplate.InstanceTypeOptions)-len(instanceTypes), len(nodeClaimTemplate.InstanceTypeOptions))) + nodeClaim, err := NewNodeClaimForPod( + ctx, + nodeClaimTemplate, + s.topology, + s.daemonOverhead[nodeClaimTemplate], + s.remainingResources[nodeClaimTemplate.NodePoolName], + pod, + s.cachedPodData[pod.UID], + s.reservedOfferingMode, + ) + if err != nil { + if nodeClaim != nil { + nodeClaim.Destroy() } - } - nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes) - if err := nodeClaim.Add(pod, s.cachedPodData[pod.UID]); err != nil { - nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim - errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w", - nodeClaimTemplate.NodePoolName, - resources.String(s.daemonOverhead[nodeClaimTemplate]), - err)) + errs = multierr.Append(errs, err) continue } // we will launch this nodeClaim and need to track its maximum possible resource usage against our remaining resources diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index 13b9e368e1..939c7fe303 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -162,10 +162,20 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { b.Fatalf("creating topology, %s", err) } - scheduler := scheduling.NewScheduler(ctx, client, []*v1.NodePool{nodePool}, - cluster, nil, topology, - map[string][]*cloudprovider.InstanceType{nodePool.Name: 
instanceTypes}, nil, - events.NewRecorder(&record.FakeRecorder{}), clock) + scheduler := scheduling.NewScheduler( + ctx, + uuid.NewUUID(), + client, + []*v1.NodePool{nodePool}, + cluster, + nil, + topology, + map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, + nil, + events.NewRecorder(&record.FakeRecorder{}), + clock, + scheduling.ReservedOfferingModeStrict, + ) b.ResetTimer() // Pack benchmark diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index 79054a1733..535266bb91 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -130,7 +130,7 @@ var _ = Context("Scheduling", func() { NodeSelectorRequirement: corev1.NodeSelectorRequirement{ Key: v1.CapacityTypeLabelKey, Operator: corev1.NodeSelectorOpIn, - Values: []string{v1.CapacityTypeSpot, v1.CapacityTypeOnDemand}, + Values: []string{v1.CapacityTypeSpot, v1.CapacityTypeOnDemand, v1.CapacityTypeReserved}, }, }, }, @@ -1758,12 +1758,12 @@ var _ = Context("Scheduling", func() { }, Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: pscheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a", }), - Price: 3.00, - Available: true, + Price: 3.00, }, }, }), @@ -1775,12 +1775,12 @@ var _ = Context("Scheduling", func() { }, Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: pscheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a", }), - Price: 2.00, - Available: true, + Price: 2.00, }, }, }), @@ -1792,12 +1792,12 @@ var _ = Context("Scheduling", func() { }, Offerings: []cloudprovider.Offering{ { + Available: true, Requirements: pscheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: v1.CapacityTypeOnDemand, corev1.LabelTopologyZone: "test-zone-1a", }), - Price: 1.00, - Available: true, + Price: 1.00, }, }, }), @@ -3668,7 +3668,7 @@ var _ = Context("Scheduling", func() { }, }, }) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value - s, err := prov.NewScheduler(ctx, pods, nil) + s, err := prov.NewScheduler(ctx, pods, nil, scheduling.ReservedOfferingModeStrict) Expect(err).To(BeNil()) var wg sync.WaitGroup @@ -3740,7 +3740,7 @@ var _ = Context("Scheduling", func() { }, }, }) // Create 1000 pods which should take long enough to schedule that we should be able to read the queueDepth metric with a value - s, err := prov.NewScheduler(ctx, pods, nil) + s, err := prov.NewScheduler(ctx, pods, nil, scheduling.ReservedOfferingModeStrict) Expect(err).To(BeNil()) s.Solve(injection.WithControllerName(ctx, "provisioner"), pods) @@ -3778,6 +3778,98 @@ var _ = Context("Scheduling", func() { Expect(lo.FromPtr(m.Histogram.SampleCount)).To(BeNumerically("==", val+3)) }) }) + + Describe("Reserved Instance Types", func() { + BeforeEach(func() { + cloudProvider.InstanceTypes = []*cloudprovider.InstanceType{ + fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "large-instance-type", + Resources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("6"), + corev1.ResourceMemory: resource.MustParse("6Gi"), + }, + }), + fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "medium-instance-type", + Resources: map[corev1.ResourceName]resource.Quantity{ + 
corev1.ResourceCPU: resource.MustParse("3"), + corev1.ResourceMemory: resource.MustParse("3Gi"), + }, + }), + fake.NewInstanceType(fake.InstanceTypeOptions{ + Name: "small-instance-type", + Resources: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("2Gi"), + }, + }), + } + reservedInstanceTypes := []*cloudprovider.InstanceType{cloudProvider.InstanceTypes[1], cloudProvider.InstanceTypes[2]} + for _, it := range reservedInstanceTypes { + it.Offerings = append(it.Offerings, cloudprovider.Offering{ + Available: true, + Requirements: pscheduling.NewLabelRequirements(map[string]string{ + v1.CapacityTypeLabelKey: v1.CapacityTypeReserved, + corev1.LabelTopologyZone: "test-zone-1", + v1alpha1.LabelReservationID: fmt.Sprintf("r-%s", it.Name), + }), + Price: fake.PriceFromResources(it.Capacity) / 10000.0, + }) + } + }) + FIt("shouldn't fallback to on-demand or spot when compatible reserved offerings are available", func() { + // With the pessimistic nature of scheduling reservations, we'll only be able to provision one instance per loop if a + // nodeclaim is comaptible with both instance types + cloudProvider.ReservationManagerProvider.SetCapacity("r-small-instance-type", 1) + cloudProvider.ReservationManagerProvider.SetCapacity("r-medium-instance-type", 1) + ExpectApplied(ctx, env.Client, nodePool) + + pods := lo.Times(3, func(_ int) *corev1.Pod { + return test.UnschedulablePod(test.PodOptions{ + ResourceRequirements: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + // Ensures that this can fit on both small and medium, but two can't fit on medium + corev1.ResourceCPU: resource.MustParse("1800m"), + }, + }, + }) + }) + + // All pods won't be able to fit on a single small or medium instance, but we're not going to create a large instance + // since that would involve falling back to on-demand or spot. Instead, we'll schedule a single pod this loop. We + // can't schedule all three because we don't know what instance type will be selected in the launch flow, so the + // single nodeclaim reserves both the small and medium offerings. + bindings := ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) + Expect(len(bindings)).To(Equal(1)) + node := lo.Values(bindings)[0].Node + Expect(node.Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, v1.CapacityTypeReserved)) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "small-instance-type")) + + pods = lo.Filter(pods, func(p *corev1.Pod, _ int) bool { + return bindings.Get(p) == nil + }) + + // Again, we'll only be able to schedule a single pod + bindings = ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) + Expect(len(bindings)).To(Equal(1)) + node = lo.Values(bindings)[0].Node + Expect(node.Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, v1.CapacityTypeReserved)) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "medium-instance-type")) + + pods = lo.Filter(pods, func(p *corev1.Pod, _ int) bool { + return bindings.Get(p) == nil + }) + + bindings = ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pods...) 
+ Expect(len(bindings)).To(Equal(1)) + node = lo.Values(bindings)[0].Node + Expect(node.Labels).To(HaveKeyWithValue(v1.CapacityTypeLabelKey, Not(Equal(v1.CapacityTypeReserved)))) + Expect(node.Labels).To(HaveKeyWithValue(corev1.LabelInstanceTypeStable, "small-instance-type")) + }) + // It("should fallback to on-demand or spot when no compatible reserved offerings are available", func() { + // + // }) + }) }) // nolint:gocyclo diff --git a/pkg/controllers/provisioning/suite_test.go b/pkg/controllers/provisioning/suite_test.go index 1174ef973d..111754c246 100644 --- a/pkg/controllers/provisioning/suite_test.go +++ b/pkg/controllers/provisioning/suite_test.go @@ -2259,12 +2259,12 @@ func AddInstanceResources(instanceTypes []*cloudprovider.InstanceType, resources price := fake.PriceFromResources(opts.Resources) opts.Offerings = []cloudprovider.Offering{ { + Available: true, Requirements: scheduling.NewLabelRequirements(map[string]string{ v1.CapacityTypeLabelKey: v1.CapacityTypeSpot, corev1.LabelTopologyZone: "test-zone-1", }), - Price: price, - Available: true, + Price: price, }, } diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index 49fec1c314..1cf68a5f34 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -43,6 +43,7 @@ type FeatureGates struct { SpotToSpotConsolidation bool NodeRepair bool + ReservedCapacity bool } // Options contains all CLI flags / env vars for karpenter-core. It adheres to the options.Injectable interface. @@ -98,7 +99,7 @@ func (o *Options) AddFlags(fs *FlagSet) { fs.StringVar(&o.LogErrorOutputPaths, "log-error-output-paths", env.WithDefaultString("LOG_ERROR_OUTPUT_PATHS", "stderr"), "Optional comma separated paths for logging error output") fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.") fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.") - fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "NodeRepair=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: SpotToSpotConsolidation") + fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "ReservedCapacity=false,NodeRepair=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: ReservedCapacity, NodeRepair, and SpotToSpotConsolidation") } func (o *Options) Parse(fs *FlagSet, args ...string) error { diff --git a/pkg/test/v1alpha1/labels.go b/pkg/test/v1alpha1/labels.go new file mode 100644 index 0000000000..89bb1ca10c --- /dev/null +++ b/pkg/test/v1alpha1/labels.go @@ -0,0 +1,21 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +const ( + LabelReservationID = Group + "/reservation-id" +)
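Solve() strips reserved-offering errors out of PodErrors via IsReservedOfferingError, and the strict-mode checks in nodeclaim.go construct them with NewReservedOfferingError, but neither helper's definition is part of this excerpt. A minimal sketch, assuming a simple error-wrapping type in the scheduling package:

package scheduling

import "errors"

// reservedOfferingError marks scheduling failures that are expected to resolve in a later
// scheduling simulation once reservations settle; Solve() filters these out of PodErrors.
type reservedOfferingError struct {
	error
}

func (e reservedOfferingError) Unwrap() error { return e.error }

func NewReservedOfferingError(err error) error {
	return reservedOfferingError{error: err}
}

func IsReservedOfferingError(err error) bool {
	var roErr reservedOfferingError
	return errors.As(err, &roErr)
}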
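The new test exercises cloudProvider.ReservationManagerProvider only through SetCapacity and Reset, while the nodeclaim code relies on hostname-keyed Reserve/Release being idempotent; the fake's implementation itself is not included here. A rough sketch under the assumption that it tracks a fixed capacity and the set of reserving hostnames per reservation ID:

package fake

import "sync"

// ReservationManagerProvider is a sketch of in-memory reservation tracking for the fake cloud
// provider: each reservation ID has a fixed capacity, and Reserve is idempotent per hostname so
// repeated scheduling iterations don't double-count a nodeclaim's reservation.
type ReservationManagerProvider struct {
	mu       sync.Mutex
	capacity map[string]int                 // reservation ID -> number of instances available
	holders  map[string]map[string]struct{} // reservation ID -> hostnames holding a reservation
}

func NewReservationManagerProvider() *ReservationManagerProvider {
	return &ReservationManagerProvider{
		capacity: map[string]int{},
		holders:  map[string]map[string]struct{}{},
	}
}

func (p *ReservationManagerProvider) SetCapacity(reservationID string, capacity int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.capacity[reservationID] = capacity
}

func (p *ReservationManagerProvider) Reset() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.capacity = map[string]int{}
	p.holders = map[string]map[string]struct{}{}
}

// Reserve succeeds if the hostname already holds a reservation (idempotent) or if capacity
// remains; this mirrors the "reservation is idempotent" assumption in the nodeclaim comments.
func (p *ReservationManagerProvider) Reserve(reservationID, hostname string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	h, ok := p.holders[reservationID]
	if !ok {
		h = map[string]struct{}{}
		p.holders[reservationID] = h
	}
	if _, held := h[hostname]; held {
		return true
	}
	if len(h) >= p.capacity[reservationID] {
		return false
	}
	h[hostname] = struct{}{}
	return true
}

// Release frees a hostname's reservation so other nodeclaims can claim it in later iterations.
func (p *ReservationManagerProvider) Release(reservationID, hostname string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.holders[reservationID], hostname)
}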
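The TODO on ReservedOfferingMode leaves open how callers should pick a mode; this diff only wires the strict mode into the tests. As one possible wiring, not shown in the diff, the provisioner could key off the new ReservedCapacity feature gate; reservedOfferingMode below is a hypothetical helper:

package provisioning

import (
	"context"

	"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
	"sigs.k8s.io/karpenter/pkg/operator/options"
)

// reservedOfferingMode maps the ReservedCapacity feature gate onto the scheduler's mode: strict
// while the gate is on, since strict relies on making progress over multiple scheduling loops,
// and plain fallback otherwise.
func reservedOfferingMode(ctx context.Context) scheduling.ReservedOfferingMode {
	if options.FromContext(ctx).FeatureGates.ReservedCapacity {
		return scheduling.ReservedOfferingModeStrict
	}
	return scheduling.ReservedOfferingModeFallback
}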
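On the options change, the gate is read from the --feature-gates flag (or the FEATURE_GATES environment variable) and defaults to off. A small usage sketch; the FlagSet construction here is an assumption of how the options package's wrapper type is built, while the flag name and value string come from the AddFlags change above:

package main

import (
	"flag"
	"fmt"

	"sigs.k8s.io/karpenter/pkg/operator/options"
)

func main() {
	opts := &options.Options{}
	// Assumed wiring: wrap a standard flag.FlagSet in the options package's FlagSet type.
	fs := &options.FlagSet{FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError)}
	opts.AddFlags(fs)
	if err := opts.Parse(fs, "--feature-gates", "ReservedCapacity=true"); err != nil {
		panic(err)
	}
	fmt.Println(opts.FeatureGates.ReservedCapacity) // expected: true
}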