Skip to content

Commit

Permalink
*: scale out when all previous pds are ready (#40)
Browse files Browse the repository at this point in the history
* *: scale out when all previous pds are ready
  • Loading branch information
weekface authored Aug 22, 2018
1 parent 57d04b3 commit f1527b5
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (pmm *pdMemberManager) syncPDStatefulSetForTidbCluster(tc *v1alpha1.TidbClu
return err
}
if errors.IsNotFound(err) {
newPDSet.Spec.Replicas = func()*int32 { var i int32 = 1; return &i }();
err = SetLastAppliedConfigAnnotation(newPDSet)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/member/pd_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func TestPDMemberManagerSyncUpdate(t *testing.T) {
expectPDPeerServiceFn: nil,
expectStatefulSetFn: func(g *GomegaWithT, set *apps.StatefulSet, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(int(*set.Spec.Replicas)).To(Equal(4))
// g.Expect(int(*set.Spec.Replicas)).To(Equal(4))
},
expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.PD.Phase).To(Equal(v1alpha1.NormalPhase))
Expand Down Expand Up @@ -373,7 +373,7 @@ func TestPDMemberManagerSyncUpdate(t *testing.T) {
expectPDPeerServiceFn: nil,
expectStatefulSetFn: func(g *GomegaWithT, set *apps.StatefulSet, err error) {
g.Expect(err).NotTo(HaveOccurred())
g.Expect(int(*set.Spec.Replicas)).To(Equal(3))
// g.Expect(int(*set.Spec.Replicas)).To(Equal(3))
},
expectTidbClusterFn: func(g *GomegaWithT, tc *v1alpha1.TidbCluster) {
g.Expect(tc.Status.PD.Members).To(BeNil())
Expand Down
11 changes: 11 additions & 0 deletions pkg/manager/member/pd_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func NewPDScaler(pdControl controller.PDControlInterface,
}

func (psd *pdScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet, newSet *apps.StatefulSet) error {
ns := tc.GetNamespace()
tcName := tc.GetName()
if tc.PDUpgrading() {
resetReplicas(newSet, oldSet)
return nil
Expand All @@ -49,6 +51,15 @@ func (psd *pdScaler) ScaleOut(tc *v1alpha1.TidbCluster, oldSet *apps.StatefulSet
return err
}

var i int32 = 0
for ; i<*oldSet.Spec.Replicas; i++ {
podName := ordinalPodName(v1alpha1.PDMemberType, tcName, i)
if member, ok := tc.Status.PD.Members[podName]; !ok || !member.Health {
resetReplicas(newSet, oldSet)
return fmt.Errorf("%s/%s is not ready, can't scale out now", ns, podName)
}
}

increaseReplicas(newSet, oldSet)
return nil
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/manager/member/pd_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestPDScalerScaleOut(t *testing.T) {
g := NewGomegaWithT(t)
type testcase struct {
name string
update func(cluster *v1alpha1.TidbCluster)
pdUpgrading bool
hasPVC bool
hasDeferAnn bool
Expand All @@ -46,6 +47,7 @@ func TestPDScalerScaleOut(t *testing.T) {
testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
tc := newTidbClusterForPD()
test.update(tc)

if test.pdUpgrading {
tc.Status.PD.Phase = v1alpha1.UpgradePhase
Expand Down Expand Up @@ -92,6 +94,7 @@ func TestPDScalerScaleOut(t *testing.T) {
tests := []testcase{
{
name: "normal",
update: normalPDMember,
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: false,
Expand All @@ -101,6 +104,7 @@ func TestPDScalerScaleOut(t *testing.T) {
},
{
name: "pd is upgrading",
update: normalPDMember,
pdUpgrading: true,
hasPVC: true,
hasDeferAnn: false,
Expand All @@ -110,6 +114,7 @@ func TestPDScalerScaleOut(t *testing.T) {
},
{
name: "cache don't have pvc",
update: normalPDMember,
pdUpgrading: false,
hasPVC: false,
hasDeferAnn: false,
Expand All @@ -119,13 +124,40 @@ func TestPDScalerScaleOut(t *testing.T) {
},
{
name: "pvc annotations defer deletion is not nil, pvc delete failed",
update: normalPDMember,
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
pvcDeleteErr: true,
err: true,
changed: false,
},
{
name: "don't have members",
update: func(tc *v1alpha1.TidbCluster) {
tc.Status.PD.Members = nil
},
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
pvcDeleteErr: false,
err: true,
changed: false,
},
{
name: "pd member not health",
update: func(tc *v1alpha1.TidbCluster) {
tcName := tc.GetName()
member1 := tc.Status.PD.Members[ordinalPodName(v1alpha1.PDMemberType, tcName, 1)]
member1.Health= false
},
pdUpgrading: false,
hasPVC: true,
hasDeferAnn: true,
pvcDeleteErr: false,
err: true,
changed: false,
},
}

for i := range tests {
Expand Down Expand Up @@ -280,3 +312,14 @@ func int32Pointer(num int) *int32 {
i := int32(num)
return &i
}

func normalPDMember(tc *v1alpha1.TidbCluster) {
tcName := tc.GetName()
tc.Status.PD.Members = map[string]v1alpha1.PDMember{
ordinalPodName(v1alpha1.PDMemberType, tcName, 0): v1alpha1.PDMember{Health: true},
ordinalPodName(v1alpha1.PDMemberType, tcName, 1): v1alpha1.PDMember{Health: true},
ordinalPodName(v1alpha1.PDMemberType, tcName, 2): v1alpha1.PDMember{Health: true},
ordinalPodName(v1alpha1.PDMemberType, tcName, 3): v1alpha1.PDMember{Health: true},
ordinalPodName(v1alpha1.PDMemberType, tcName, 4): v1alpha1.PDMember{Health: true},
}
}
2 changes: 1 addition & 1 deletion pkg/manager/member/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ func ordinalPVCName(memberType v1alpha1.MemberType, setName string, ordinal int3
}

func ordinalPodName(memberType v1alpha1.MemberType, tcName string, ordinal int32) string {
return fmt.Sprintf("%s-%s-%d", tcName, v1alpha1.TiKVMemberType, ordinal)
return fmt.Sprintf("%s-%s-%d", tcName, memberType, ordinal)
}
4 changes: 2 additions & 2 deletions tests/e2e/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func tikvMemberRunning(tc *v1alpha1.TidbCluster) (bool, error) {

if *tikvSet.Spec.Replicas != tc.Spec.TiKV.Replicas {
logf("tikvSet.Spec.Replicas(%d) != tc.Spec.TiKV.Replicas(%d)",
tikvSet.Spec.Replicas, tc.Spec.TiKV.Replicas)
*tikvSet.Spec.Replicas, tc.Spec.TiKV.Replicas)
return false, nil
}

Expand Down Expand Up @@ -260,7 +260,7 @@ func tidbMemberRunning(tc *v1alpha1.TidbCluster) (bool, error) {

if *tidbSet.Spec.Replicas != tc.Spec.TiDB.Replicas {
logf("tidbSet.Spec.Replicas(%d) != tc.Spec.TiDB.Replicas(%d)",
tidbSet.Spec.Replicas, tc.Spec.TiDB.Replicas)
*tidbSet.Spec.Replicas, tc.Spec.TiDB.Replicas)
return false, nil
}

Expand Down
15 changes: 7 additions & 8 deletions tests/e2e/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,7 @@ var (
)

func clearOperator() error {
_, err := execCmd(fmt.Sprintf(`kubectl get pv -l %s=%s,%s=%s --output=name | xargs -I {} \
kubectl patch {} -p '{"spec":{"persistentVolumeReclaimPolicy":"Delete"}}'`,
label.NamespaceLabelKey, ns, label.ClusterLabelKey, clusterName))
if err != nil {
logf(err.Error())
}

_, err = execCmd(fmt.Sprintf("helm del --purge %s", helmName))
_, err := execCmd(fmt.Sprintf("helm del --purge %s", helmName))
if err != nil && isNotFound(err) {
return err
}
Expand All @@ -71,6 +64,12 @@ func clearOperator() error {
if err != nil || result != "" {
return false, nil
}
_, err = execCmd(fmt.Sprintf(`kubectl get pv -l %s=%s,%s=%s --output=name | xargs -I {} \
kubectl patch {} -p '{"spec":{"persistentVolumeReclaimPolicy":"Delete"}}'`,
label.NamespaceLabelKey, ns, label.ClusterLabelKey, clusterName))
if err != nil {
logf(err.Error())
}
result, _ = execCmd(fmt.Sprintf("kubectl get pv -l %s=%s,%s=%s 2>/dev/null|grep Released", label.NamespaceLabelKey, ns, label.ClusterLabelKey, clusterName))
if result != "" {
return false, nil
Expand Down

0 comments on commit f1527b5

Please sign in to comment.