Skip to content

Commit

Permalink
Fix tiflash's rolling update (pingcap#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgksgf authored Nov 20, 2024
1 parent a0edfb2 commit 4e6cc71
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pkg/controllers/tiflash/tasks/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (t *TaskStatus) Sync(ctx task.Context[ReconcileContext]) task.Result {
rtx.Healthy = false
} else if statefulset.IsPodRunningAndReady(&pod) && !rtx.PodTerminating && rtx.StoreState == v1alpha1.StoreStateServing {
rtx.Healthy = true
if rtx.TiFlash.Status.CurrentRevision != rtx.TiFlash.Labels[appsv1.ControllerRevisionHashLabelKey] {
rtx.TiFlash.Status.CurrentRevision = rtx.TiFlash.Labels[appsv1.ControllerRevisionHashLabelKey]
if rtx.TiFlash.Status.CurrentRevision != pod.Labels[appsv1.ControllerRevisionHashLabelKey] {
rtx.TiFlash.Status.CurrentRevision = pod.Labels[appsv1.ControllerRevisionHashLabelKey]
needUpdate = true
}
} else {
Expand Down
9 changes: 1 addition & 8 deletions pkg/controllers/tiflashgroup/tasks/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,7 @@ func (t *TaskUpdater) Sync(ctx task.Context[ReconcileContext]) task.Result {

update, outdated, needUpdate := action.NeedUpdate(t.Logger, rtx.Peers, updateRevision.Name)
if needUpdate {
updateName, outdatedName := []string{}, []string{}
for _, n := range update {
updateName = append(updateName, n.Name)
}
for _, n := range outdated {
outdatedName = append(outdatedName, n.Name)
}
t.Logger.Info("TiFlashGroup needs to be updated", "update", updateName, "outdated", outdatedName)
t.Logger.Info("TiFlashGroup needs to be updated", "update", len(update), "outdated", len(outdated))
outdatedSet := NewTiFlashSet(t.Client, rtx.TiFlashGroup, outdated, rtx.UpdateRevision)
updateSet := NewTiFlashSet(t.Client, rtx.TiFlashGroup, update, rtx.UpdateRevision)
updater := action.NewUpdater(updateSet, outdatedSet)
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/tiflashgroup/tasks/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewTiFlashSet(c client.Client, flashg *v1alpha1.TiFlashGroup, tiflashes []*
c: c,
flashg: flashg,
nameToTiFlash: make(map[string]*v1alpha1.TiFlash, len(tiflashes)),
rev: rev,
}
for _, p := range flashg.Spec.SchedulePolicies {
switch p.Type {
Expand Down
14 changes: 10 additions & 4 deletions tests/e2e/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import (
)

const (
createClusterTimeout = 15 * time.Minute
createClusterTimeout = 10 * time.Minute
createClusterPolling = 5 * time.Second

deleteClusterTimeout = 10 * time.Minute
Expand Down Expand Up @@ -585,7 +585,7 @@ var _ = Describe("TiDB Cluster", func() {
pdg := data.NewPDGroup(ns.Name, "pd", tc.Name, ptr.To(int32(3)), nil)
kvg := data.NewTiKVGroup(ns.Name, "tikv", tc.Name, ptr.To(int32(3)), nil)
dbg := data.NewTiDBGroup(ns.Name, "tidb", tc.Name, ptr.To(int32(1)), nil)
flashg := data.NewTiFlashGroup(ns.Name, "flash", tc.Name, ptr.To(int32(3)), nil)
flashg := data.NewTiFlashGroup(ns.Name, "flash", tc.Name, ptr.To(int32(2)), nil)
Expect(k8sClient.Create(ctx, pdg)).To(Succeed())
Expect(k8sClient.Create(ctx, kvg)).To(Succeed())
Expect(k8sClient.Create(ctx, dbg)).To(Succeed())
Expand All @@ -595,7 +595,6 @@ var _ = Describe("TiDB Cluster", func() {
Eventually(func(g Gomega) {
_, ready := utiltidb.IsClusterReady(k8sClient, tc.Name, tc.Namespace)
g.Expect(ready).To(BeTrue())

g.Expect(utiltidb.AreAllInstancesReady(k8sClient, pdg,
[]*v1alpha1.TiKVGroup{kvg}, []*v1alpha1.TiDBGroup{dbg}, []*v1alpha1.TiFlashGroup{flashg})).To(Succeed())
}).WithTimeout(createClusterTimeout).WithPolling(createClusterPolling).Should(Succeed())
Expand Down Expand Up @@ -626,8 +625,15 @@ var _ = Describe("TiDB Cluster", func() {
if !isPod {
continue
}
var t time.Time
switch event.Type {
case watch.Added:
t = pod.CreationTimestamp.Time
case watch.Deleted:
t = pod.DeletionTimestamp.Time
}
if event.Type == watch.Added || event.Type == watch.Deleted {
GinkgoWriter.Printf("Pod %s/%s event type: %v at %v\n", pod.Namespace, pod.Name, event.Type, time.Now())
GinkgoWriter.Printf("Pod %s/%s was %s at %v\n", pod.Namespace, pod.Name, string(event.Type), t)
eventSlice = append(eventSlice, event)
}
}
Expand Down
28 changes: 25 additions & 3 deletions tests/e2e/utils/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package k8s

import (
"os"
"slices"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -36,9 +38,8 @@ func LoadConfig() (*rest.Config, error) {

// CheckRollingRestartLogic checks if the rolling restart logic is correctly followed
// based on the provided list of Kubernetes watch events.
//
// The function first checks for pod creation events and then verifies the restart
// events to ensure that the rolling restart logic is adhered to.
// The function first sorts the events by time, then checks for pod creation events and
// verifies the restart events to ensure that the rolling restart logic is adhered to.
// An example of a rolling restart event sequence is like:
// 1. pod1 added
// 2. pod2 added
Expand All @@ -51,6 +52,17 @@ func CheckRollingRestartLogic(events []watch.Event) bool {
return false
}

// Sort events by time, since the events are not guaranteed to be in order
slices.SortFunc(events, func(e1, e2 watch.Event) int {
t1, t2 := getTimeFromEvent(e1), getTimeFromEvent(e2)
if t1.Before(t2) {
return -1
} else if t1.After(t2) {
return 1
}
return 0
})

podCreated := make(map[string]bool)
restartEvents := make([]watch.Event, 0, len(events))

Expand Down Expand Up @@ -99,3 +111,13 @@ func CheckRollingRestartLogic(events []watch.Event) bool {
}
return true
}

// getTimeFromEvent returns the timestamp used to order a pod watch event:
// the pod's creation timestamp for Added events and its deletion timestamp
// for Deleted events.
//
// It returns the zero time.Time for any other event type, when the event
// object is not a *corev1.Pod, or when a Deleted pod carries no deletion
// timestamp, instead of panicking on a failed assertion or a nil pointer.
func getTimeFromEvent(e watch.Event) time.Time {
	pod, ok := e.Object.(*corev1.Pod)
	if !ok || pod == nil {
		return time.Time{}
	}

	switch e.Type {
	case watch.Added:
		return pod.CreationTimestamp.Time
	case watch.Deleted:
		// DeletionTimestamp is a pointer and may be unset; guard before dereferencing.
		if pod.DeletionTimestamp != nil {
			return pod.DeletionTimestamp.Time
		}
	}
	return time.Time{}
}
90 changes: 62 additions & 28 deletions tests/e2e/utils/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,74 +15,108 @@
package k8s

import (
"math/rand"
"testing"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

func makeTestEvent(podName string, t watch.EventType) watch.Event {
func makeTestEvent(podName string, t watch.EventType, time time.Time) watch.Event {
return watch.Event{
Type: t,
Object: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Name: podName,
CreationTimestamp: metav1.Time{Time: time},
DeletionTimestamp: &metav1.Time{Time: time},
},
},
}
}

// fixedUnixTime is the base Unix timestamp, in seconds, that all test event
// times are offset from.
const fixedUnixTime = 1257894000

// getTestTime returns the instant that lies s seconds after fixedUnixTime.
func getTestTime(s int) time.Time {
	offset := int64(s)
	seconds := fixedUnixTime + offset
	return time.Unix(seconds, 0)
}

// shuffleEvents randomly permutes events in place and returns the same slice.
//
// The caller's slice is modified; the return value is provided only so the
// call can be used inline as an argument.
func shuffleEvents(events []watch.Event) []watch.Event {
	// rand.Shuffle performs a uniform Fisher-Yates shuffle, replacing the
	// hand-rolled swap loop.
	rand.Shuffle(len(events), func(i, j int) {
		events[i], events[j] = events[j], events[i]
	})
	return events
}

func TestCheckRollingRestartLogic(t *testing.T) {
tests := []struct {
name string
events []watch.Event
want bool
name string
eventsBeforeShuffle []watch.Event
want bool
}{
{
name: "happy path",
events: []watch.Event{
makeTestEvent("pod1", watch.Added),
makeTestEvent("pod2", watch.Added),
makeTestEvent("pod3", watch.Added),
makeTestEvent("pod2", watch.Deleted),
makeTestEvent("pod2", watch.Added),
makeTestEvent("pod3", watch.Deleted),
makeTestEvent("pod3", watch.Added),
makeTestEvent("pod1", watch.Deleted),
makeTestEvent("pod1", watch.Added),
eventsBeforeShuffle: []watch.Event{
makeTestEvent("pod1", watch.Added, getTestTime(1)),
makeTestEvent("pod2", watch.Added, getTestTime(2)),
makeTestEvent("pod3", watch.Added, getTestTime(3)),

makeTestEvent("pod2", watch.Deleted, getTestTime(4)),
makeTestEvent("pod2", watch.Added, getTestTime(5)),
makeTestEvent("pod3", watch.Deleted, getTestTime(6)),
makeTestEvent("pod3", watch.Added, getTestTime(7)),
makeTestEvent("pod1", watch.Deleted, getTestTime(8)),
makeTestEvent("pod1", watch.Added, getTestTime(9)),
},
want: true,
},
{
name: "Empty events",
events: []watch.Event{},
want: false,
name: "Empty events",
eventsBeforeShuffle: []watch.Event{},
want: false,
},
{
name: "only add events",
events: []watch.Event{
makeTestEvent("pod1", watch.Added),
makeTestEvent("pod2", watch.Added),
makeTestEvent("pod3", watch.Added),
eventsBeforeShuffle: []watch.Event{
makeTestEvent("pod1", watch.Added, getTestTime(1)),
makeTestEvent("pod2", watch.Added, getTestTime(2)),
makeTestEvent("pod3", watch.Added, getTestTime(3)),
},
want: false,
},
{
name: "Alternating delete and add events",
events: []watch.Event{
makeTestEvent("pod1", watch.Deleted),
makeTestEvent("pod1", watch.Added),
makeTestEvent("pod2", watch.Deleted),
makeTestEvent("pod2", watch.Added),
eventsBeforeShuffle: []watch.Event{
makeTestEvent("pod1", watch.Added, getTestTime(1)),
makeTestEvent("pod1", watch.Deleted, getTestTime(2)),
makeTestEvent("pod2", watch.Added, getTestTime(3)),
makeTestEvent("pod2", watch.Deleted, getTestTime(4)),
},
want: false,
},
{
name: "two pods are deleted at the same time",
eventsBeforeShuffle: []watch.Event{
makeTestEvent("pod2", watch.Added, getTestTime(1)),
makeTestEvent("pod1", watch.Added, getTestTime(2)),
makeTestEvent("pod3", watch.Added, getTestTime(3)),
makeTestEvent("pod1", watch.Deleted, getTestTime(4)),
makeTestEvent("pod3", watch.Deleted, getTestTime(4)),
makeTestEvent("pod1", watch.Added, getTestTime(5)),
makeTestEvent("pod3", watch.Added, getTestTime(5)),
makeTestEvent("pod2", watch.Deleted, getTestTime(6)),
makeTestEvent("pod2", watch.Added, getTestTime(7)),
},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := CheckRollingRestartLogic(tt.events); got != tt.want {
if got := CheckRollingRestartLogic(shuffleEvents(tt.eventsBeforeShuffle)); got != tt.want {
t.Errorf("CheckRollingRestartLogic() = %v, want %v", got, tt.want)
}
})
Expand Down
3 changes: 3 additions & 0 deletions tests/e2e/utils/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ func AreAllPDHealthy(cli client.Client, pdg *v1alpha1.PDGroup) error {
return fmt.Errorf("pd %s/%s replicas %d not equal to %d", pdg.Namespace, pdg.Name, len(pdList.Items), *pdg.Spec.Replicas)
}
for _, pd := range pdList.Items {
if !meta.IsStatusConditionPresentAndEqual(pd.Status.Conditions, v1alpha1.PDCondInitialized, metav1.ConditionTrue) {
return fmt.Errorf("pd %s/%s is not initialized", pd.Namespace, pd.Name)
}
if !meta.IsStatusConditionPresentAndEqual(pd.Status.Conditions, v1alpha1.PDCondHealth, metav1.ConditionTrue) {
return fmt.Errorf("pd %s/%s is not healthy", pd.Namespace, pd.Name)
}
Expand Down

0 comments on commit 4e6cc71

Please sign in to comment.