Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix cancel drop column ddl error #8545

Merged
merged 23 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,91 @@ LOOP:
s.tk.MustExec("drop table test_drop_index")
}

// TestCancelDropColumn tests cancel ddl job which type is drop column.
func (s *testDBSuite) TestCancelDropColumn(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.mustExec(c, "use test_db")
s.mustExec(c, "create database if not exists test_drop_column")
s.mustExec(c, "drop table if exists t")
s.mustExec(c, "create table t(c1 int, c2 int)")
defer s.mustExec(c, "drop table t;")
testCases := []struct {
jobState model.JobState
JobSchemaState model.SchemaState
cancelSucc bool
}{
{model.JobStateNone, model.StateNone, true},
{model.JobStateRunning, model.StateWriteOnly, false},
{model.JobStateRunning, model.StateDeleteOnly, false},
{model.JobStateRunning, model.StateDeleteReorganization, false},
}
var checkErr error
oldReorgWaitTimeout := ddl.ReorgWaitTimeout
ddl.ReorgWaitTimeout = 50 * time.Millisecond
defer func() { ddl.ReorgWaitTimeout = oldReorgWaitTimeout }()
hook := &ddl.TestDDLCallback{}
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropColumn && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
hookCtx := mock.NewContext()
hookCtx.Store = s.store
err := hookCtx.NewTxn(context.Background())
if err != nil {
checkErr = errors.Trace(err)
return
}
txn, err := hookCtx.Txn(true)
if err != nil {
checkErr = errors.Trace(err)
return
}
errs, err := admin.CancelJobs(txn, jobIDs)
if err != nil {
checkErr = errors.Trace(err)
return
}
if errs[0] != nil {
checkErr = errors.Trace(errs[0])
return
}
checkErr = txn.Commit(context.Background())
}
}
originalHook := s.dom.DDL().GetHook()
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
for i := range testCases {
testCase = &testCases[i]
s.mustExec(c, "alter table t add column c3 int")
rs, err := s.tk.Exec("alter table t drop column c3")
if rs != nil {
rs.Close()
}
var col1 *table.Column
t := s.testGetTable(c, "t")
for _, col := range t.Cols() {
if strings.EqualFold(col.Name.L, "c3") {
col1 = col
break
}
}
if testCase.cancelSucc {
c.Assert(checkErr, IsNil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
} else {
c.Assert(col1, IsNil)
c.Assert(err, IsNil)
c.Assert(checkErr, NotNil)
c.Assert(checkErr.Error(), Equals, admin.ErrCannotCancelDDLJob.GenWithStackByArgs(jobID).Error())
}
}
s.dom.DDL().(ddl.DDLForTest).SetHook(originalHook)
s.mustExec(c, "alter table t add column c3 int")
s.mustExec(c, "alter table t drop column c3")
}

func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) {
startTime := time.Now()
f := func() map[int64]struct{} {
Expand Down
38 changes: 38 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ func buildCancelJobTests(firstID int64) []testCancelJob {
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 7}, cancelRetErrs: noErrs, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionAddColumn, jobIDs: []int64{firstID + 8}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 8)}, cancelState: model.StatePublic, ddlRetErr: err},


{act: model.ActionDropColumn, jobIDs: []int64{firstID + 9}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 9)}, cancelState: model.StateDeleteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 10}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 10)}, cancelState: model.StateWriteOnly, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 11}, cancelRetErrs: []error{admin.ErrCancelFinishedDDLJob.GenWithStackByArgs(firstID + 11)}, cancelState: model.StateWriteReorganization, ddlRetErr: err},
{act: model.ActionDropColumn, jobIDs: []int64{firstID + 12}, cancelRetErrs: noErrs, cancelState: model.StatePublic, ddlRetErr: err},

// Test create table, watch out, table id will alloc a globalID.
{act: model.ActionCreateTable, jobIDs: []int64{firstID + 10}, cancelRetErrs: noErrs, cancelState: model.StateNone, ddlRetErr: err},
// Test create database, watch out, database id will alloc a globalID.
Expand Down Expand Up @@ -403,6 +409,18 @@ func (s *testDDLSuite) checkAddColumn(c *C, d *ddl, schemaID int64, tableID int6
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) checkDropColumn(c *C, d *ddl, schemaID int64, tableID int64, colName string, success bool) {
changedTable := testGetTable(c, d, schemaID, tableID)
found := true
for _, colInfo := range changedTable.Meta().Columns {
if colInfo.Name.O == colName {
found = false
break
}
}
c.Assert(found, Equals, success)
}

func (s *testDDLSuite) TestCancelJob(c *C) {
store := testCreateStore(c, "test_cancel_job")
defer store.Close()
Expand Down Expand Up @@ -527,6 +545,25 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkAddColumn(c, d, dbInfo.ID, tblInfo.ID, addingColName, true)


// for drop column.
test = &tests[9]
dropColName := "c2"
dropColumnArgs := []interface{}{model.NewCIStr(dropColName)}
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropColumn, dropColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)

test = &tests[10]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropColumn, dropColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)

test = &tests[11]
doDDLJobErrWithSchemaState(ctx, d, c, dbInfo.ID, tblInfo.ID, model.ActionDropColumn, dropColumnArgs, &cancelState)
c.Check(errors.ErrorStack(checkErr), Equals, "")
s.checkDropColumn(c, d, dbInfo.ID, tblInfo.ID, dropColName, false)

// for create table
tblInfo1 := testTableInfo(c, d, "t1", 2)
test = &tests[8]
Expand All @@ -541,6 +578,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
c.Check(checkErr, IsNil)
testCheckSchemaState(c, d, dbInfo1, model.StateNone)


}

func (s *testDDLSuite) TestIgnorableSpec(c *C) {
Expand Down
38 changes: 38 additions & 0 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,42 @@ func rollingbackAddColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
return ver, errCancelledDDLJob
}

func rollingbackDropColumn(t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
if err != nil {
return ver, errors.Trace(err)
}
var colName model.CIStr
err = job.DecodeArgs(&colName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil {
job.State = model.JobStateCancelled
ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
originalState := colInfo.State
switch colInfo.State {
// In the state of drop column `public -> write only -> delete only -> reorganization`,
// We can not rollback now, so just continue to drop column.
case model.StatePublic, model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization:
job.State = model.JobStateRunning
return ver, nil
default:
err = ErrInvalidTableState.GenWithStack("invalid table state %v", tblInfo.State)
}
job.Args = []interface{}{colInfo.Name}
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateRollbackDone, model.StatePublic, ver, tblInfo)
return ver, errCancelledDDLJob
}

func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfo(t, job, schemaID)
Expand Down Expand Up @@ -180,6 +216,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
switch job.Type {
case model.ActionAddColumn:
ver, err = rollingbackAddColumn(t, job)
case model.ActionDropColumn:
ver, err = rollingbackDropColumn(t, job)
case model.ActionAddIndex:
ver, err = rollingbackAddindex(w, d, t, job)
case model.ActionDropIndex:
Expand Down
6 changes: 6 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func isJobRollbackable(job *model.Job, id int64) error {
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
case model.ActionDropColumn:
if job.SchemaState == model.StatePublic || job.SchemaState == model.StateDeleteOnly ||
job.SchemaState == model.StateWriteOnly ||
job.SchemaState == model.StateDeleteReorganization {
return ErrCannotCancelDDLJob.GenWithStackByArgs(id)
}
case model.ActionDropSchema, model.ActionDropTable:
// To simplify the rollback logic, cannot be canceled in the following states.
if job.SchemaState == model.StateWriteOnly ||
Expand Down