Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: shuffle disruption candidates by nodepool #2035

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 120 additions & 2 deletions pkg/controllers/disruption/singlenodeconsolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package disruption
import (
"context"
"fmt"
"sort"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"
Expand All @@ -33,10 +34,94 @@ const SingleNodeConsolidationType = "single"
// SingleNodeConsolidation is the consolidation controller that performs single-node consolidation.
type SingleNodeConsolidation struct {
	consolidation
	// nodePoolsTimedOut tracks which nodepools were not fully considered due to timeout
	// in a previous ComputeCommand run; these are prioritized on the next pass and the
	// map is reset once every nodepool has been seen.
	// NOTE(review): not mutex-guarded — assumes ComputeCommand is never called
	// concurrently on the same instance; confirm with the controller wiring.
	nodePoolsTimedOut map[string]bool
}

// NewSingleNodeConsolidation constructs a SingleNodeConsolidation around the
// given consolidation helper, with an empty timed-out-nodepool tracker.
func NewSingleNodeConsolidation(consolidation consolidation) *SingleNodeConsolidation {
	s := &SingleNodeConsolidation{consolidation: consolidation}
	s.nodePoolsTimedOut = map[string]bool{}
	return s
}

// groupCandidatesByNodePool buckets candidates by their nodepool name.
// It returns the buckets plus the nodepool names in first-seen order, so
// callers can iterate deterministically over the map.
func (s *SingleNodeConsolidation) groupCandidatesByNodePool(candidates []*Candidate) (map[string][]*Candidate, []string) {
	byPool := make(map[string][]*Candidate)
	order := []string{}

	for _, c := range candidates {
		name := c.nodePool.Name
		if _, seen := byPool[name]; !seen {
			order = append(order, name)
		}
		byPool[name] = append(byPool[name], c)
	}
	return byPool, order
}

// sortNodePoolsByTimeout reorders nodePoolNames in place so that nodepools
// that timed out in previous consolidation runs come first, while preserving
// the incoming relative order within each group.
func (s *SingleNodeConsolidation) sortNodePoolsByTimeout(ctx context.Context, nodePoolNames []string) {
	// Log the timed out nodepools that we're prioritizing.
	timedOutNodePools := make([]string, 0, len(s.nodePoolsTimedOut))
	for np := range s.nodePoolsTimedOut {
		timedOutNodePools = append(timedOutNodePools, np)
	}
	if len(timedOutNodePools) > 0 {
		// BUG FIX: logr's Info takes a message plus key/value pairs, not a
		// printf format string — the original "%v" was never substituted.
		log.FromContext(ctx).V(1).Info(fmt.Sprintf("prioritizing nodepools that have not yet been considered due to timeouts in previous runs: %v", timedOutNodePools))
	}

	// BUG FIX: the original used sort.Slice with a `return i < j` tiebreak to
	// "keep original order", but sort.Slice is unstable and the comparator's
	// indices refer to the partially permuted slice, so that tiebreak does not
	// preserve the original ordering. SliceStable does, with a comparator that
	// only orders timed-out pools ahead of the rest.
	sort.SliceStable(nodePoolNames, func(i, j int) bool {
		return s.nodePoolsTimedOut[nodePoolNames[i]] && !s.nodePoolsTimedOut[nodePoolNames[j]]
	})
}

// shuffleCandidates interweaves candidates round-robin across nodepools: the
// first candidate of every nodepool (in nodePoolNames order), then the second
// of every nodepool, and so on. Relative order within a nodepool is preserved.
func (s *SingleNodeConsolidation) shuffleCandidates(nodePoolCandidates map[string][]*Candidate, nodePoolNames []string) []*Candidate {
	// One pass to size the output exactly and find the longest bucket,
	// avoiding repeated append growth.
	total := 0
	maxCandidatesPerNodePool := 0
	for _, nodePoolName := range nodePoolNames {
		n := len(nodePoolCandidates[nodePoolName])
		total += n
		if n > maxCandidatesPerNodePool {
			maxCandidatesPerNodePool = n
		}
	}

	// Interweave candidates from different nodepools.
	result := make([]*Candidate, 0, total)
	for i := range maxCandidatesPerNodePool {
		for _, nodePoolName := range nodePoolNames {
			if candidates := nodePoolCandidates[nodePoolName]; i < len(candidates) {
				result = append(result, candidates[i])
			}
		}
	}

	return result
}

// sortCandidates orders candidates by ascending disruption cost, then
// interweaves them round-robin across nodepools, with nodepools that timed
// out in previous runs considered first.
func (s *SingleNodeConsolidation) sortCandidates(candidates []*Candidate) []*Candidate {
	// Base ordering: cheapest-to-disrupt first.
	sort.Slice(candidates, func(a, b int) bool {
		return candidates[a].disruptionCost < candidates[b].disruptionCost
	})

	byPool, poolOrder := s.groupCandidatesByNodePool(candidates)

	// NOTE(review): a fresh background context is used here because the
	// signature takes no ctx — only affects logging inside the sort.
	s.sortNodePoolsByTimeout(context.Background(), poolOrder)

	return s.shuffleCandidates(byPool, poolOrder)
}

// ComputeCommand generates a disruption command given candidates
Expand All @@ -53,8 +138,17 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
constrainedByBudgets := false

// binary search to find the maximum number of NodeClaims we can terminate
nodePoolsSeen := make(map[string]bool)

allNodePools := make(map[string]bool)
for _, candidate := range candidates {
allNodePools[candidate.nodePool.Name] = true
}

for i, candidate := range candidates {
// Track that we've considered this nodepool
nodePoolsSeen[candidate.nodePool.Name] = true

// If the disruption budget doesn't allow this candidate to be disrupted,
// continue to the next candidate. We don't need to decrement any budget
// counter since single node consolidation commands can only have one candidate.
Expand All @@ -71,6 +165,14 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if s.clock.Now().After(timeout) {
ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))

// Mark all nodepools that we haven't seen yet as timed out
for _, c := range candidates[i:] {
if !nodePoolsSeen[c.nodePool.Name] {
s.nodePoolsTimedOut[c.nodePool.Name] = true
}
}

return Command{}, scheduling.Results{}, nil
}
// compute a possible consolidation option
Expand All @@ -82,6 +184,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if cmd.Decision() == NoOpDecision {
continue
}
// might have some edge cases where if there is an error, we should remove the nodepool from the list of "seen" nodepools
if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
log.FromContext(ctx).V(1).WithValues(cmd.LogValues()...).Info("abandoning single-node consolidation attempt due to pod churn, command is no longer valid")
Expand All @@ -91,6 +194,21 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
}
return cmd, results, nil
}

// Check if we've considered all nodepools
allNodePoolsConsidered := true
for nodePool := range allNodePools {
if !nodePoolsSeen[nodePool] {
allNodePoolsConsidered = false
break
}
}

// If we've considered all nodepools, reset the timed out nodepools
if allNodePoolsConsidered {
s.nodePoolsTimedOut = make(map[string]bool)
}

if !constrainedByBudgets {
// if there are no candidates because of a budget, don't mark
// as consolidated, as it's possible it should be consolidatable
Expand Down