From ab2fd4d6a1f3a0dbb581961c4a38096d53b2fc8f Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Wed, 5 Jun 2019 16:07:49 +0800 Subject: [PATCH] store/tikv: support single statement rollback for pessimistic transaction (#10654) (#10721) --- executor/adapter.go | 54 +++++++++++-------- executor/errors.go | 2 + executor/executor.go | 32 +++++------- session/pessimistic_test.go | 35 +++++++++++++ session/tidb.go | 6 +-- store/mockstore/mocktikv/cluster.go | 35 ++++++++++++- store/mockstore/mocktikv/errors.go | 13 ++++- store/mockstore/mocktikv/mvcc.go | 16 +++--- store/mockstore/mocktikv/mvcc_leveldb.go | 66 +++++++++++++++++++++--- store/mockstore/mocktikv/rpc.go | 45 +++++++++++++++- store/tikv/2pc.go | 42 +++++++++++++-- store/tikv/backoff.go | 2 + store/tikv/error.go | 12 +++++ store/tikv/tikvrpc/tikvrpc.go | 25 ++++++++- store/tikv/txn.go | 61 +++++++++++++++++----- util/deadlock/deadlock.go | 6 +-- util/testkit/testkit.go | 11 ++++ 17 files changed, 379 insertions(+), 84 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index e172812bdda11..853eefe4e2e77 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" @@ -427,40 +428,51 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E if err == nil { return nil, nil } - if !terror.ErrorEqual(kv.ErrWriteConflict, err) { + txnCtx := a.Ctx.GetSessionVars().TxnCtx + var newForUpdateTS uint64 + if deadlock, ok := errors.Cause(err).(*tikv.ErrDeadlock); ok { + if !deadlock.IsRetryable { + return nil, ErrDeadlock + } + logutil.Logger(ctx).Info("single statement deadlock, retry statement", + zap.Uint64("txn", txnCtx.StartTS), + zap.Uint64("lockTS", deadlock.LockTs), + zap.Binary("lockKey", deadlock.LockKey), + zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash)) + } else if terror.ErrorEqual(kv.ErrWriteConflict, err) { + conflictCommitTS := extractConflictCommitTS(err.Error()) + if conflictCommitTS == 0 { + logutil.Logger(ctx).Warn("failed to extract conflictCommitTS from a conflict error") + } + forUpdateTS := txnCtx.GetForUpdateTS() + logutil.Logger(ctx).Info("pessimistic write conflict, retry statement", + zap.Uint64("txn", txnCtx.StartTS), + zap.Uint64("forUpdateTS", forUpdateTS), + zap.Uint64("conflictCommitTS", conflictCommitTS)) + if conflictCommitTS > forUpdateTS { + newForUpdateTS = conflictCommitTS + } + } else { return nil, err } if a.retryCount >= config.GetGlobalConfig().PessimisticTxn.MaxRetryCount { return nil, errors.New("pessimistic lock retry limit reached") } a.retryCount++ - conflictCommitTS := extractConflictCommitTS(err.Error()) - if conflictCommitTS == 0 { - logutil.Logger(ctx).Warn("failed to extract conflictCommitTS from a conflict error") - } - sctx := a.Ctx - txnCtx := sctx.GetSessionVars().TxnCtx - forUpdateTS := txnCtx.GetForUpdateTS() - logutil.Logger(ctx).Info("pessimistic write conflict, retry statement", - zap.Uint64("txn", txnCtx.StartTS), - zap.Uint64("forUpdateTS", forUpdateTS), - zap.Uint64("conflictCommitTS", conflictCommitTS)) - if conflictCommitTS > txnCtx.GetForUpdateTS() { - txnCtx.SetForUpdateTS(conflictCommitTS) - } else { - ts, err1 := sctx.GetStore().GetOracle().GetTimestamp(ctx) - if err1 != nil { - return nil, err1 + if newForUpdateTS == 0 { + newForUpdateTS, 
err = a.Ctx.GetStore().GetOracle().GetTimestamp(ctx) + if err != nil { + return nil, err } - txnCtx.SetForUpdateTS(ts) } + txnCtx.SetForUpdateTS(newForUpdateTS) e, err := a.buildExecutor() if err != nil { return nil, err } // Rollback the statement change before retry it. - sctx.StmtRollback() - sctx.GetSessionVars().StmtCtx.ResetForRetry() + a.Ctx.StmtRollback() + a.Ctx.GetSessionVars().StmtCtx.ResetForRetry() if err = e.Open(ctx); err != nil { return nil, err diff --git a/executor/errors.go b/executor/errors.go index b7f8ce2ebf19f..c1d426ef71535 100644 --- a/executor/errors.go +++ b/executor/errors.go @@ -51,6 +51,7 @@ var ( ErrBadDB = terror.ClassExecutor.New(mysql.ErrBadDB, mysql.MySQLErrName[mysql.ErrBadDB]) ErrWrongObject = terror.ClassExecutor.New(mysql.ErrWrongObject, mysql.MySQLErrName[mysql.ErrWrongObject]) ErrRoleNotGranted = terror.ClassPrivilege.New(mysql.ErrRoleNotGranted, mysql.MySQLErrName[mysql.ErrRoleNotGranted]) + ErrDeadlock = terror.ClassExecutor.New(mysql.ErrLockDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock]) ) func init() { @@ -67,6 +68,7 @@ func init() { mysql.ErrTableaccessDenied: mysql.ErrTableaccessDenied, mysql.ErrBadDB: mysql.ErrBadDB, mysql.ErrWrongObject: mysql.ErrWrongObject, + mysql.ErrLockDeadlock: mysql.ErrLockDeadlock, } terror.ErrClassToMySQLCodes[terror.ClassExecutor] = tableMySQLErrCodes } diff --git a/executor/executor.go b/executor/executor.go index bcc8465c38af5..2b5cbc8acc83c 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -637,6 +637,7 @@ type SelectLockExec struct { baseExecutor Lock ast.SelectLockType + keys []kv.Key } // Open implements the Executor Open interface. @@ -670,29 +671,24 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.RecordBatch) error if len(e.Schema().TblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate { return nil } + if req.NumRows() != 0 { + iter := chunk.NewIterator4Chunk(req.Chunk) + for id, cols := range e.Schema().TblID2Handle { + for _, col := range cols { + for row := iter.Begin(); row != iter.End(); row = iter.Next() { + e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index))) + } + } + } + return nil + } + // Lock keys only once when finished fetching all results. txn, err := e.ctx.Txn(true) if err != nil { return err } - keys := make([]kv.Key, 0, req.NumRows()) - iter := chunk.NewIterator4Chunk(req.Chunk) forUpdateTS := e.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() - for id, cols := range e.Schema().TblID2Handle { - for _, col := range cols { - keys = keys[:0] - for row := iter.Begin(); row != iter.End(); row = iter.Next() { - keys = append(keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index))) - } - if len(keys) == 0 { - continue - } - err = txn.LockKeys(ctx, forUpdateTS, keys...) - if err != nil { - return err - } - } - } - return nil + return txn.LockKeys(ctx, forUpdateTS, e.keys...) 
} // LimitExec represents limit executor diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 1cc2c9ddd4b56..5632fa62c5de2 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -27,6 +27,8 @@ import ( "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" ) @@ -194,3 +196,36 @@ func (s *testPessimisticSuite) TestDeadlock(c *C) { c.Assert(int(e.Code()), Equals, mysql.ErrLockDeadlock) syncCh <- struct{}{} } + +func (s *testPessimisticSuite) TestSingleStatementRollback(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + + tk.MustExec("drop table if exists pessimistic") + tk.MustExec("create table single_statement (id int primary key, v int)") + tk.MustExec("insert into single_statement values (1, 1), (2, 1), (3, 1), (4, 1)") + tblID := tk.GetTableID("single_statement") + s.cluster.SplitTable(s.mvccStore, tblID, 2) + region1Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, 1)) + region1, _ := s.cluster.GetRegionByKey(region1Key) + region1ID := region1.Id + region2Key := codec.EncodeBytes(nil, tablecodec.EncodeRowKeyWithHandle(tblID, 3)) + region2, _ := s.cluster.GetRegionByKey(region2Key) + region2ID := region2.Id + + syncCh := make(chan bool) + go func() { + tk2.MustExec("begin pessimistic") + <-syncCh + s.cluster.ScheduleDelay(tk2.Se.GetSessionVars().TxnCtx.StartTS, region2ID, time.Millisecond*3) + tk2.MustExec("update single_statement set v = v + 1") + tk2.MustExec("commit") + <-syncCh + }() + tk.MustExec("begin pessimistic") + syncCh <- true + s.cluster.ScheduleDelay(tk.Se.GetSessionVars().TxnCtx.StartTS, region1ID, time.Millisecond*3) + tk.MustExec("update single_statement set v = v + 1") + tk.MustExec("commit") + syncCh <- true +} diff --git a/session/tidb.go b/session/tidb.go index 0863a1fc3f63c..2618be8cf4b9e 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -163,9 +163,8 @@ func finishStmt(ctx context.Context, sctx sessionctx.Context, se *session, sessV if !sessVars.InTxn() { logutil.Logger(context.Background()).Info("rollbackTxn for ddl/autocommit error.") se.RollbackTxn(ctx) - } else if se.txn.Valid() && se.txn.IsPessimistic() && strings.Contains(meetsErr.Error(), "deadlock") { + } else if se.txn.Valid() && se.txn.IsPessimistic() && executor.ErrDeadlock.Equal(meetsErr) { logutil.Logger(context.Background()).Info("rollbackTxn for deadlock error", zap.Uint64("txn", se.txn.StartTS())) - meetsErr = errDeadlock se.RollbackTxn(ctx) } return meetsErr @@ -328,18 +327,15 @@ func IsQuery(sql string) bool { var ( errForUpdateCantRetry = terror.ClassSession.New(codeForUpdateCantRetry, mysql.MySQLErrName[mysql.ErrForUpdateCantRetry]) - errDeadlock = terror.ClassSession.New(codeDeadlock, mysql.MySQLErrName[mysql.ErrLockDeadlock]) ) const ( codeForUpdateCantRetry terror.ErrCode = mysql.ErrForUpdateCantRetry - codeDeadlock terror.ErrCode = mysql.ErrLockDeadlock ) func init() { sessionMySQLErrCodes := map[terror.ErrCode]uint16{ codeForUpdateCantRetry: mysql.ErrForUpdateCantRetry, - codeDeadlock: mysql.ErrLockDeadlock, } terror.ErrClassToMySQLCodes[terror.ClassSession] = sessionMySQLErrCodes } diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go index 7e6c0270ebc4f..88b3195c8ffa4 100644 --- 
a/store/mockstore/mocktikv/cluster.go +++ b/store/mockstore/mocktikv/cluster.go @@ -18,6 +18,7 @@ import ( "context" "math" "sync" + "time" "github.com/golang/protobuf/proto" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -41,14 +42,24 @@ type Cluster struct { id uint64 stores map[uint64]*Store regions map[uint64]*Region + + // delayEvents is used to control the execution sequence of rpc requests for test. + delayEvents map[delayKey]time.Duration + delayMu sync.Mutex +} + +type delayKey struct { + startTS uint64 + regionID uint64 } // NewCluster creates an empty cluster. It needs to be bootstrapped before // providing service. func NewCluster() *Cluster { return &Cluster{ - stores: make(map[uint64]*Store), - regions: make(map[uint64]*Region), + stores: make(map[uint64]*Store), + regions: make(map[uint64]*Region), + delayEvents: make(map[delayKey]time.Duration), } } @@ -347,6 +358,26 @@ func (c *Cluster) SplitKeys(mvccStore MVCCStore, start, end kv.Key, count int) { c.splitRange(mvccStore, NewMvccKey(start), NewMvccKey(end), count) } +// ScheduleDelay schedules a delay event for a transaction on a region. +func (c *Cluster) ScheduleDelay(startTS, regionID uint64, dur time.Duration) { + c.delayMu.Lock() + c.delayEvents[delayKey{startTS: startTS, regionID: regionID}] = dur + c.delayMu.Unlock() +} + +func (c *Cluster) handleDelay(startTS, regionID uint64) { + key := delayKey{startTS: startTS, regionID: regionID} + c.delayMu.Lock() + dur, ok := c.delayEvents[key] + if ok { + delete(c.delayEvents, key) + } + c.delayMu.Unlock() + if ok { + time.Sleep(dur) + } +} + func (c *Cluster) splitRange(mvccStore MVCCStore, start, end MvccKey, count int) { c.Lock() defer c.Unlock() diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index eb986eeb1e456..5f10ac74d7e6f 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -61,7 +61,7 @@ func (e ErrAlreadyCommitted) Error() string { return "txn already committed" } -// ErrConflict is turned when the commitTS of key in the DB is greater than startTS. +// ErrConflict is returned when the commitTS of key in the DB is greater than startTS. type ErrConflict struct { StartTS uint64 ConflictTS uint64 @@ -71,3 +71,14 @@ type ErrConflict struct { func (e *ErrConflict) Error() string { return "write conflict" } + +// ErrDeadlock is returned when deadlock error is detected. 
+type ErrDeadlock struct { + LockTS uint64 + LockKey []byte + DealockKeyHash uint64 +} + +func (e *ErrDeadlock) Error() string { + return "deadlock" +} diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 9607819f018e9..06e9f04d9cfd6 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -42,11 +42,12 @@ type mvccValue struct { } type mvccLock struct { - startTS uint64 - primary []byte - value []byte - op kvrpcpb.Op - ttl uint64 + startTS uint64 + primary []byte + value []byte + op kvrpcpb.Op + ttl uint64 + forUpdateTS uint64 } type mvccEntry struct { @@ -66,6 +67,7 @@ func (l *mvccLock) MarshalBinary() ([]byte, error) { mh.WriteSlice(&buf, l.value) mh.WriteNumber(&buf, l.op) mh.WriteNumber(&buf, l.ttl) + mh.WriteNumber(&buf, l.forUpdateTS) return buf.Bytes(), errors.Trace(mh.err) } @@ -78,6 +80,7 @@ func (l *mvccLock) UnmarshalBinary(data []byte) error { mh.ReadSlice(buf, &l.value) mh.ReadNumber(buf, &l.op) mh.ReadNumber(buf, &l.ttl) + mh.ReadNumber(buf, &l.forUpdateTS) return errors.Trace(mh.err) } @@ -429,7 +432,8 @@ type MVCCStore interface { ReverseScan(startKey, endKey []byte, limit int, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair BatchGet(ks [][]byte, startTS uint64, isoLevel kvrpcpb.IsolationLevel) []Pair PessimisticLock(mutations []*kvrpcpb.Mutation, primary []byte, startTS, forUpdateTS uint64, ttl uint64) []error - Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error + PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error + Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS, ttl uint64) []error Commit(keys [][]byte, startTS, commitTS uint64) error Rollback(keys [][]byte, startTS uint64) error Cleanup(key []byte, startTS uint64) error diff --git a/store/mockstore/mocktikv/mvcc_leveldb.go b/store/mockstore/mocktikv/mvcc_leveldb.go index c13446d569c9a..b58c5f7a3cf31 100644 --- a/store/mockstore/mocktikv/mvcc_leveldb.go +++ b/store/mockstore/mocktikv/mvcc_leveldb.go @@ -520,7 +520,7 @@ func (mvcc *MVCCLevelDB) PessimisticLock(mutations []*kvrpcpb.Mutation, primary return errs } if err := mvcc.db.Write(batch, nil); err != nil { - return nil + return []error{err} } return errs @@ -544,7 +544,11 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation if dec.lock.startTS != startTS { errDeadlock := mvcc.deadlockDetector.Detect(startTS, dec.lock.startTS, farm.Fingerprint64(mutation.Key)) if errDeadlock != nil { - return errDeadlock + return &ErrDeadlock{ + LockKey: mutation.Key, + LockTS: dec.lock.startTS, + DealockKeyHash: errDeadlock.KeyHash, + } } return dec.lock.lockErr(mutation.Key) } @@ -555,10 +559,11 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation } lock := mvccLock{ - startTS: startTS, - primary: primary, - op: kvrpcpb.Op_PessimisticLock, - ttl: ttl, + startTS: startTS, + primary: primary, + op: kvrpcpb.Op_PessimisticLock, + ttl: ttl, + forUpdateTS: forUpdateTS, } writeKey := mvccEncode(mutation.Key, lockVer) writeValue, err := lock.MarshalBinary() @@ -570,6 +575,53 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation return nil } +// PessimisticRollback implements the MVCCStore interface. 
+func (mvcc *MVCCLevelDB) PessimisticRollback(keys [][]byte, startTS, forUpdateTS uint64) []error { + mvcc.mu.Lock() + defer mvcc.mu.Unlock() + + anyError := false + batch := &leveldb.Batch{} + errs := make([]error, 0, len(keys)) + for _, key := range keys { + err := pessimisticRollbackKey(mvcc.db, batch, key, startTS, forUpdateTS) + errs = append(errs, err) + if err != nil { + anyError = true + } + } + if anyError { + return errs + } + if err := mvcc.db.Write(batch, nil); err != nil { + return []error{err} + } + return errs +} + +func pessimisticRollbackKey(db *leveldb.DB, batch *leveldb.Batch, key []byte, startTS, forUpdateTS uint64) error { + startKey := mvccEncode(key, lockVer) + iter := newIterator(db, &util.Range{ + Start: startKey, + }) + defer iter.Release() + + dec := lockDecoder{ + expectKey: key, + } + ok, err := dec.Decode(iter) + if err != nil { + return errors.Trace(err) + } + if ok { + lock := dec.lock + if lock.op == kvrpcpb.Op_PessimisticLock && lock.startTS == startTS && lock.forUpdateTS <= forUpdateTS { + batch.Delete(startKey) + } + } + return nil +} + // Prewrite implements the MVCCStore interface. func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, startTS uint64, ttl uint64) []error { mvcc.mu.Lock() @@ -607,7 +659,7 @@ func (mvcc *MVCCLevelDB) Prewrite(mutations []*kvrpcpb.Mutation, primary []byte, return errs } if err := mvcc.db.Write(batch, nil); err != nil { - return nil + return []error{err} } return errs diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 23d65f7267a85..078ea063b3025 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -76,6 +76,15 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { }, } } + if dead, ok := errors.Cause(err).(*ErrDeadlock); ok { + return &kvrpcpb.KeyError{ + Deadlock: &kvrpcpb.Deadlock{ + LockTs: dead.LockTS, + LockKey: dead.LockKey, + DeadlockKeyHash: dead.DealockKeyHash, + }, + } + } if retryable, ok := errors.Cause(err).(ErrRetryable); ok { return &kvrpcpb.KeyError{ Retryable: retryable.Error(), @@ -292,15 +301,42 @@ func (h *rpcHandler) handleKvPrewrite(req *kvrpcpb.PrewriteRequest) *kvrpcpb.Pre func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest) *kvrpcpb.PessimisticLockResponse { for _, m := range req.Mutations { if !h.checkKeyInRegion(m.Key) { - panic("KvPrewrite: key not in region") + panic("KvPessimisticLock: key not in region") } } + startTS := req.StartVersion + regionID := req.Context.RegionId + h.cluster.handleDelay(startTS, regionID) errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl()) + + // TODO: remove this when implement sever side wait. 
+ h.simulateServerSideWaitLock(errs) return &kvrpcpb.PessimisticLockResponse{ Errors: convertToKeyErrors(errs), } } +func (h *rpcHandler) simulateServerSideWaitLock(errs []error) { + for _, err := range errs { + if _, ok := err.(*ErrLocked); ok { + time.Sleep(time.Millisecond * 5) + break + } + } +} + +func (h *rpcHandler) handleKvPessimisticRollback(req *kvrpcpb.PessimisticRollbackRequest) *kvrpcpb.PessimisticRollbackResponse { + for _, key := range req.Keys { + if !h.checkKeyInRegion(key) { + panic("KvPessimisticRollback: key not in region") + } + } + errs := h.mvccStore.PessimisticRollback(req.Keys, req.StartVersion, req.ForUpdateTs) + return &kvrpcpb.PessimisticRollbackResponse{ + Errors: convertToKeyErrors(errs), + } +} + func (h *rpcHandler) handleKvCommit(req *kvrpcpb.CommitRequest) *kvrpcpb.CommitResponse { for _, k := range req.Keys { if !h.checkKeyInRegion(k) { @@ -663,6 +699,13 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R return resp, nil } resp.PessimisticLock = handler.handleKvPessimisticLock(r) + case tikvrpc.CmdPessimisticRollback: + r := req.PessimisticRollback + if err := handler.checkRequest(reqCtx, r.Size()); err != nil { + resp.PessimisticRollback = &kvrpcpb.PessimisticRollbackResponse{RegionError: err} + return resp, nil + } + resp.PessimisticRollback = handler.handleKvPessimisticRollback(r) case tikvrpc.CmdCommit: failpoint.Inject("rpcCommitResult", func(val failpoint.Value) { switch val.(string) { diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 1f1de71f66b1e..eb8cc64df7ee3 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -45,6 +45,7 @@ const ( actionCommit actionCleanup actionPessimisticLock + actionPessimisticRollback ) var ( @@ -67,6 +68,8 @@ func (ca twoPhaseCommitAction) String() string { return "cleanup" case actionPessimisticLock: return "pessimistic_lock" + case actionPessimisticRollback: + return "pessimistic_rollback" } return "unknown" } @@ -375,6 +378,8 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm singleBatchActionFunc = c.cleanupSingleBatch case actionPessimisticLock: singleBatchActionFunc = c.pessimisticLockSingleBatch + case actionPessimisticRollback: + singleBatchActionFunc = c.pessimisticRollbackSingleBatch } if len(batches) == 1 { e := singleBatchActionFunc(bo, batches[0]) @@ -618,7 +623,7 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc return errors.Trace(conditionPair.Err()) } if deadlock := keyErr.Deadlock; deadlock != nil { - return errors.New("deadlock") + return &ErrDeadlock{Deadlock: deadlock} } // Extract lock from key error @@ -628,16 +633,41 @@ func (c *twoPhaseCommitter) pessimisticLockSingleBatch(bo *Backoffer, batch batc } locks = append(locks, lock) } - msBeforeExpired, err := c.store.lockResolver.ResolveLocks(bo, locks) + _, err = c.store.lockResolver.ResolveLocks(bo, locks) if err != nil { return errors.Trace(err) } - if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) + // Because we already waited on tikv, no need to Backoff here. 
+ } +} + +func (c *twoPhaseCommitter) pessimisticRollbackSingleBatch(bo *Backoffer, batch batchKeys) error { + req := &tikvrpc.Request{ + Type: tikvrpc.CmdPessimisticRollback, + PessimisticRollback: &pb.PessimisticRollbackRequest{ + StartVersion: c.startTS, + ForUpdateTs: c.forUpdateTS, + Keys: batch.keys, + }, + } + for { + resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) + if err != nil { + return errors.Trace(err) + } + regionErr, err := resp.GetRegionError() + if err != nil { + return errors.Trace(err) + } + if regionErr != nil { + err = bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) if err != nil { return errors.Trace(err) } + err = c.pessimisticRollbackKeys(bo, batch.keys) + return errors.Trace(err) } + return nil } } @@ -811,6 +841,10 @@ func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, keys [][]byte) er return c.doActionOnKeys(bo, actionPessimisticLock, keys) } +func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticRollback, keys) +} + func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { err := c.execute(ctx) if err != nil { diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index be374e3fc5145..cd555eb4a2a08 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -214,6 +214,8 @@ const ( scatterRegionBackoff = 20000 waitScatterRegionFinishBackoff = 120000 locateRegionMaxBackoff = 20000 + pessimisticLockMaxBackoff = 10000 + pessimisticRollbackMaxBackoff = 10000 ) // CommitMaxBackoff is max sleep time of the 'commit' command diff --git a/store/tikv/error.go b/store/tikv/error.go index 357aa0f76cb07..5e775218a9f5e 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -15,6 +15,7 @@ package tikv import ( "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" ) @@ -39,6 +40,17 @@ var ( ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) ) +// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface. +// It also marks if the deadlock is retryable. 
+type ErrDeadlock struct { + *kvrpcpb.Deadlock + IsRetryable bool +} + +func (d *ErrDeadlock) Error() string { + return d.Deadlock.String() +} + func init() { tikvMySQLErrCodes := map[terror.ErrCode]uint16{ mysql.ErrTiKVServerTimeout: mysql.ErrTiKVServerTimeout, diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index e62a153410a61..8ee060fdd49b4 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -45,6 +45,7 @@ const ( CmdGC CmdDeleteRange CmdPessimisticLock + CmdPessimisticRollback CmdRawGet CmdType = 256 + iota CmdRawBatchGet @@ -77,6 +78,8 @@ func (t CmdType) String() string { return "Prewrite" case CmdPessimisticLock: return "PessimisticLock" + case CmdPessimisticRollback: + return "PessimisticRollback" case CmdCommit: return "Commit" case CmdCleanup: @@ -134,7 +137,6 @@ type Request struct { Get *kvrpcpb.GetRequest Scan *kvrpcpb.ScanRequest Prewrite *kvrpcpb.PrewriteRequest - PessimisticLock *kvrpcpb.PessimisticLockRequest Commit *kvrpcpb.CommitRequest Cleanup *kvrpcpb.CleanupRequest BatchGet *kvrpcpb.BatchGetRequest @@ -157,6 +159,9 @@ type Request struct { MvccGetByStartTs *kvrpcpb.MvccGetByStartTsRequest SplitRegion *kvrpcpb.SplitRegionRequest + PessimisticLock *kvrpcpb.PessimisticLockRequest + PessimisticRollback *kvrpcpb.PessimisticRollbackRequest + DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest } @@ -205,6 +210,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_Coprocessor{Coprocessor: req.Cop}} case CmdPessimisticLock: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticLock{PessimisticLock: req.PessimisticLock}} + case CmdPessimisticRollback: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PessimisticRollback{PessimisticRollback: req.PessimisticRollback}} } return nil } @@ -224,7 +231,6 @@ type Response struct { Get *kvrpcpb.GetResponse Scan *kvrpcpb.ScanResponse Prewrite *kvrpcpb.PrewriteResponse - PessimisticLock *kvrpcpb.PessimisticLockResponse Commit *kvrpcpb.CommitResponse Cleanup *kvrpcpb.CleanupResponse BatchGet *kvrpcpb.BatchGetResponse @@ -248,6 +254,9 @@ type Response struct { MvccGetByStartTS *kvrpcpb.MvccGetByStartTsResponse SplitRegion *kvrpcpb.SplitRegionResponse + PessimisticLock *kvrpcpb.PessimisticLockResponse + PessimisticRollback *kvrpcpb.PessimisticRollbackResponse + DebugGetRegionProperties *debugpb.GetRegionPropertiesResponse } @@ -296,6 +305,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) *Resp return &Response{Type: CmdCop, Cop: res.Coprocessor} case *tikvpb.BatchCommandsResponse_Response_PessimisticLock: return &Response{Type: CmdPessimisticLock, PessimisticLock: res.PessimisticLock} + case *tikvpb.BatchCommandsResponse_Response_PessimisticRollback: + return &Response{Type: CmdPessimisticRollback, PessimisticRollback: res.PessimisticRollback} } return nil } @@ -326,6 +337,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.Prewrite.Context = ctx case CmdPessimisticLock: req.PessimisticLock.Context = ctx + case CmdPessimisticRollback: + req.PessimisticRollback.Context = ctx case CmdCommit: req.Commit.Context = ctx case CmdCleanup: @@ -398,6 +411,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { resp.PessimisticLock = &kvrpcpb.PessimisticLockResponse{ RegionError: e, } + case 
CmdPessimisticRollback: + resp.PessimisticRollback = &kvrpcpb.PessimisticRollbackResponse{ + RegionError: e, + } case CmdCommit: resp.Commit = &kvrpcpb.CommitResponse{ RegionError: e, @@ -504,6 +521,8 @@ func (resp *Response) GetRegionError() (*errorpb.Error, error) { e = resp.Scan.GetRegionError() case CmdPessimisticLock: e = resp.PessimisticLock.GetRegionError() + case CmdPessimisticRollback: + e = resp.PessimisticRollback.GetRegionError() case CmdPrewrite: e = resp.Prewrite.GetRegionError() case CmdCommit: @@ -572,6 +591,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Prewrite, err = client.KvPrewrite(ctx, req.Prewrite) case CmdPessimisticLock: resp.PessimisticLock, err = client.KvPessimisticLock(ctx, req.PessimisticLock) + case CmdPessimisticRollback: + resp.PessimisticRollback, err = client.KVPessimisticRollback(ctx, req.PessimisticRollback) case CmdCommit: resp.Commit, err = client.KvCommit(ctx, req.Commit) case CmdCleanup: diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 3f2faccaef183..c6dbd2fe12e10 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/dgryski/go-farm" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" @@ -319,17 +320,16 @@ func (txn *tikvTxn) close() { } func (txn *tikvTxn) Rollback() error { + if !txn.valid { + return kv.ErrInvalidTxn + } // Clean up pessimistic lock. if txn.IsPessimistic() && txn.committer != nil { - err := txn.rollbackPessimisticLock() + err := txn.rollbackPessimisticLocks() if err != nil { logutil.Logger(context.Background()).Error(err.Error()) } } - - if !txn.valid { - return kv.ErrInvalidTxn - } txn.close() logutil.Logger(context.Background()).Debug("[kv] rollback txn", zap.Uint64("txnStartTS", txn.StartTS())) tikvTxnCmdCountWithRollback.Inc() @@ -337,16 +337,11 @@ func (txn *tikvTxn) Rollback() error { return nil } -func (txn *tikvTxn) rollbackPessimisticLock() error { - c := txn.committer - if err := c.initKeysAndMutations(); err != nil { - return errors.Trace(err) - } - if len(c.keys) == 0 { +func (txn *tikvTxn) rollbackPessimisticLocks() error { + if len(txn.lockKeys) == 0 { return nil } - - return c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), c.keys) + return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys) } func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv.Key) error { @@ -370,7 +365,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv txn.committer.primaryKey = keys[0] } - bo := NewBackoffer(ctx, prewriteMaxBackoff).WithVars(txn.vars) + bo := NewBackoffer(ctx, pessimisticLockMaxBackoff).WithVars(txn.vars) keys1 := make([][]byte, len(keys)) for i, key := range keys { keys1[i] = key @@ -381,6 +376,14 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys1) == 1 err := txn.committer.pessimisticLockKeys(bo, keys1) if err != nil { + wg := txn.asyncPessimisticRollback(ctx, keys1) + if dl, ok := errors.Cause(err).(*ErrDeadlock); ok && hashInKeys(dl.DeadlockKeyHash, keys) { + dl.IsRetryable = true + // Wait for the pessimistic rollback to finish before we retry the statement. + wg.Wait() + // Sleep a little, wait for the other transaction that blocked by this transaction to acquire the lock. 
+ time.Sleep(time.Millisecond * 5) + } return err } } @@ -393,6 +396,36 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, forUpdateTS uint64, keys ...kv return nil } +func (txn *tikvTxn) asyncPessimisticRollback(ctx context.Context, keys [][]byte) *sync.WaitGroup { + // Clone a new committer for execute in background. + committer := &twoPhaseCommitter{ + store: txn.committer.store, + connID: txn.committer.connID, + startTS: txn.committer.startTS, + forUpdateTS: txn.committer.forUpdateTS, + primaryKey: txn.committer.primaryKey, + } + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + err := committer.pessimisticRollbackKeys(NewBackoffer(ctx, pessimisticRollbackMaxBackoff), keys) + if err != nil { + logutil.Logger(ctx).Warn("[kv] pessimisticRollback failed.", zap.Error(err)) + } + wg.Done() + }() + return wg +} + +func hashInKeys(deadlockKeyHash uint64, keys []kv.Key) bool { + for _, key := range keys { + if farm.Fingerprint64(key) == deadlockKeyHash { + return true + } + } + return false +} + func (txn *tikvTxn) IsReadOnly() bool { return !txn.dirty } diff --git a/util/deadlock/deadlock.go b/util/deadlock/deadlock.go index 921c3aa542807..5f0d781427f4f 100644 --- a/util/deadlock/deadlock.go +++ b/util/deadlock/deadlock.go @@ -42,11 +42,11 @@ func NewDetector() *Detector { // ErrDeadlock is returned when deadlock is detected. type ErrDeadlock struct { - keyHash uint64 + KeyHash uint64 } func (e *ErrDeadlock) Error() string { - return fmt.Sprintf("deadlock(%d)", e.keyHash) + return fmt.Sprintf("deadlock(%d)", e.KeyHash) } // Detect detects deadlock for the sourceTxn on a locked key. @@ -67,7 +67,7 @@ func (d *Detector) doDetect(sourceTxn, waitForTxn uint64) *ErrDeadlock { } for _, nextTarget := range list.txns { if nextTarget.txn == sourceTxn { - return &ErrDeadlock{keyHash: nextTarget.keyHash} + return &ErrDeadlock{KeyHash: nextTarget.keyHash} } if err := d.doDetect(sourceTxn, nextTarget.txn); err != nil { return err diff --git a/util/testkit/testkit.go b/util/testkit/testkit.go index c9e58d0cf9f58..6b93d900409f8 100644 --- a/util/testkit/testkit.go +++ b/util/testkit/testkit.go @@ -23,6 +23,8 @@ import ( "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/util/sqlexec" @@ -262,3 +264,12 @@ func (tk *TestKit) ResultSetToResultWithCtx(ctx context.Context, rs sqlexec.Reco func Rows(args ...string) [][]interface{} { return testutil.RowsWithSep(" ", args...) } + +// GetTableID gets table ID by name. +func (tk *TestKit) GetTableID(tableName string) int64 { + dom := domain.GetDomain(tk.Se) + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr(tableName)) + tk.c.Assert(err, check.IsNil) + return tbl.Meta().ID +}
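
Two illustrative sketches follow; they are not part of the patch and use made-up names, but they condense the decision logic the diff adds. This first one shows when a failed pessimistic lock leads to a statement-level retry: a deadlock is treated as retryable only if the deadlocked key hash matches one of the keys the current statement tried to lock (hashInKeys in store/tikv/txn.go), and a write conflict advances the statement's forUpdateTS to the conflicting commit TS, or to a fresh timestamp when the conflict TS is not newer (handlePessimisticLockError in executor/adapter.go). deadlockInfo, retryableDeadlock, and nextForUpdateTS are illustrative names only, not identifiers from the TiDB code.

package main

import (
	"fmt"

	farm "github.com/dgryski/go-farm"
)

// deadlockInfo stands in for the deadlock information TiKV reports back,
// so this sketch stays self-contained.
type deadlockInfo struct {
	DeadlockKeyHash uint64
}

// retryableDeadlock restates the idea behind hashInKeys: the deadlock can be
// resolved by rolling back only the current statement when the key that closed
// the wait cycle is one of the keys this statement tried to lock.
func retryableDeadlock(dl deadlockInfo, stmtKeys [][]byte) bool {
	for _, k := range stmtKeys {
		if farm.Fingerprint64(k) == dl.DeadlockKeyHash {
			return true
		}
	}
	return false
}

// nextForUpdateTS sketches the write-conflict path in handlePessimisticLockError:
// retry with the conflicting commit TS if it is newer than the statement's current
// forUpdateTS, otherwise use a fresh timestamp (fetched from PD in the real code).
func nextForUpdateTS(currentForUpdateTS, conflictCommitTS, freshTS uint64) uint64 {
	if conflictCommitTS > currentForUpdateTS {
		return conflictCommitTS
	}
	return freshTS
}

func main() {
	stmtKeys := [][]byte{[]byte("t\x80\x00\x00\x00\x00\x00\x00\x01")}
	dl := deadlockInfo{DeadlockKeyHash: farm.Fingerprint64(stmtKeys[0])}
	fmt.Println(retryableDeadlock(dl, stmtKeys)) // true: only the statement is rolled back and retried
	fmt.Println(nextForUpdateTS(10, 12, 15))     // 12: reuse the conflicting commit TS
}

Under these rules a retryable deadlock rolls back only the statement's own pessimistic locks (asynchronously, via the new PessimisticRollback command) and re-runs the statement, while a non-retryable deadlock surfaces ErrDeadlock (MySQL error 1213) and finishStmt rolls back the whole transaction.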
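
The second sketch restates the guard applied by the new mock-store rollback path (pessimisticRollbackKey in mvcc_leveldb.go). Because asyncPessimisticRollback runs in the background while the statement retries with a newer forUpdateTS, the lock.forUpdateTS <= forUpdateTS check appears to be what keeps a late rollback from deleting locks the retried statement has already re-acquired. pessimisticLock and shouldRemove below are illustrative names only.

package main

import "fmt"

// pessimisticLock mirrors the few mvccLock fields the rollback check reads.
type pessimisticLock struct {
	isPessimistic bool
	startTS       uint64
	forUpdateTS   uint64
}

// shouldRemove restates the condition in pessimisticRollbackKey: a rollback request
// removes only this transaction's pessimistic locks whose forUpdateTS is not newer
// than the request's, so a lock re-acquired by a retried statement (which runs with
// a larger forUpdateTS) survives a rollback issued for the failed attempt.
func shouldRemove(l pessimisticLock, startTS, forUpdateTS uint64) bool {
	return l.isPessimistic && l.startTS == startTS && l.forUpdateTS <= forUpdateTS
}

func main() {
	old := pessimisticLock{isPessimistic: true, startTS: 100, forUpdateTS: 105}
	renewed := pessimisticLock{isPessimistic: true, startTS: 100, forUpdateTS: 110}
	fmt.Println(shouldRemove(old, 100, 105))     // true: lock from the failed attempt is cleaned up
	fmt.Println(shouldRemove(renewed, 100, 105)) // false: lock taken by the retried statement is kept
}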