From 05cf9ea9b3c2578b8cba1f15468d73b974bf3519 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Fri, 21 Feb 2020 08:31:42 -0500 Subject: [PATCH 1/9] refactor mock plugin api --- cluster/job_example_test.go | 8 ++- cluster/job_test.go | 18 ++----- cluster/mock_plugin_api_test.go | 87 ++++++++++++++++++++++++++++++ cluster/mutex_test.go | 93 +++------------------------------ 4 files changed, 103 insertions(+), 103 deletions(-) create mode 100644 cluster/mock_plugin_api_test.go diff --git a/cluster/job_example_test.go b/cluster/job_example_test.go index 13593b5..585a792 100644 --- a/cluster/job_example_test.go +++ b/cluster/job_example_test.go @@ -1,10 +1,14 @@ package cluster -import "time" +import ( + "time" + + "github.com/mattermost/mattermost-server/v5/plugin" +) func ExampleSchedule() { // Use p.API from your plugin instead. - pluginAPI := NewMockMutexPluginAPI(nil) + pluginAPI := plugin.API(nil) callback := func() { // periodic work to do diff --git a/cluster/job_test.go b/cluster/job_test.go index 5aa955a..bc41d2d 100644 --- a/cluster/job_test.go +++ b/cluster/job_test.go @@ -10,19 +10,9 @@ import ( "github.com/stretchr/testify/require" ) -type MockJobPluginAPI struct { - *MockMutexPluginAPI -} - -func NewMockJobPluginAPI(t *testing.T) *MockJobPluginAPI { - return &MockJobPluginAPI{ - MockMutexPluginAPI: NewMockMutexPluginAPI(t), - } -} - func TestSchedule(t *testing.T) { t.Run("invalid interval", func(t *testing.T) { - mockPluginAPI := NewMockJobPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) job, err := Schedule(mockPluginAPI, "key", JobConfig{}, func() {}) require.Error(t, err, "must specify non-zero job config interval") @@ -30,7 +20,7 @@ func TestSchedule(t *testing.T) { }) t.Run("single-threaded", func(t *testing.T) { - mockPluginAPI := NewMockJobPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) count := new(int32) callback := func() { @@ -56,7 +46,7 @@ func TestSchedule(t *testing.T) { }) t.Run("multi-threaded, single job", func(t *testing.T) { - mockPluginAPI := NewMockJobPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) count := new(int32) callback := func() { @@ -97,7 +87,7 @@ func TestSchedule(t *testing.T) { }) t.Run("multi-threaded, multiple jobs", func(t *testing.T) { - mockPluginAPI := NewMockJobPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) countA := new(int32) callbackA := func() { diff --git a/cluster/mock_plugin_api_test.go b/cluster/mock_plugin_api_test.go new file mode 100644 index 0000000..c3907c9 --- /dev/null +++ b/cluster/mock_plugin_api_test.go @@ -0,0 +1,87 @@ +package cluster + +import ( + "bytes" + "sync" + "testing" + + "github.com/mattermost/mattermost-server/v5/model" +) + +type mockPluginAPI struct { + t *testing.T + + lock sync.Mutex + keyValues map[string][]byte + failing bool +} + +func newMockPluginAPI(t *testing.T) *mockPluginAPI { + return &mockPluginAPI{ + t: t, + keyValues: make(map[string][]byte), + } +} + +func (pluginAPI *mockPluginAPI) setFailing(failing bool) { + pluginAPI.lock.Lock() + defer pluginAPI.lock.Unlock() + + pluginAPI.failing = failing +} + +func (pluginAPI *mockPluginAPI) clear() { + pluginAPI.lock.Lock() + defer pluginAPI.lock.Unlock() + + for k := range pluginAPI.keyValues { + delete(pluginAPI.keyValues, k) + } +} + +func (pluginAPI *mockPluginAPI) KVGet(key string) ([]byte, *model.AppError) { + pluginAPI.lock.Lock() + defer pluginAPI.lock.Unlock() + + if pluginAPI.failing { + return nil, &model.AppError{Message: "fake error"} + } + + return pluginAPI.keyValues[key], nil +} + +func (pluginAPI *mockPluginAPI) KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) { + pluginAPI.lock.Lock() + defer pluginAPI.lock.Unlock() + + if pluginAPI.failing { + return false, &model.AppError{Message: "fake error"} + } + + if options.Atomic { + if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, options.OldValue) { + return false, nil + } + } + + if value == nil { + delete(pluginAPI.keyValues, key) + } else { + pluginAPI.keyValues[key] = value + } + + return true, nil +} + +func (pluginAPI *mockPluginAPI) LogError(msg string, keyValuePairs ...interface{}) { + if pluginAPI.t == nil { + return + } + + pluginAPI.t.Helper() + + params := []interface{}{msg} + params = append(params, keyValuePairs...) + + pluginAPI.t.Log(params...) +} diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index 7a5d827..f5f2fda 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -1,94 +1,13 @@ package cluster import ( - "bytes" - "sync" "testing" "time" - "github.com/mattermost/mattermost-server/v5/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -type MockMutexPluginAPI struct { - t *testing.T - - lock sync.Mutex - keyValues map[string][]byte - failing bool -} - -func NewMockMutexPluginAPI(t *testing.T) *MockMutexPluginAPI { - return &MockMutexPluginAPI{ - t: t, - keyValues: make(map[string][]byte), - } -} - -func (pluginAPI *MockMutexPluginAPI) setFailing(failing bool) { - pluginAPI.lock.Lock() - defer pluginAPI.lock.Unlock() - - pluginAPI.failing = failing -} - -func (pluginAPI *MockMutexPluginAPI) clear() { - pluginAPI.lock.Lock() - defer pluginAPI.lock.Unlock() - - for k := range pluginAPI.keyValues { - delete(pluginAPI.keyValues, k) - } -} - -func (pluginAPI *MockMutexPluginAPI) KVGet(key string) ([]byte, *model.AppError) { - pluginAPI.lock.Lock() - defer pluginAPI.lock.Unlock() - - if pluginAPI.failing { - return nil, &model.AppError{Message: "fake error"} - } - - return pluginAPI.keyValues[key], nil -} - -func (pluginAPI *MockMutexPluginAPI) KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) { - pluginAPI.lock.Lock() - defer pluginAPI.lock.Unlock() - - if pluginAPI.failing { - return false, &model.AppError{Message: "fake error"} - } - - if options.Atomic { - if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, options.OldValue) { - return false, nil - } - } - - if value == nil { - delete(pluginAPI.keyValues, key) - } else { - pluginAPI.keyValues[key] = value - } - - return true, nil -} - -func (pluginAPI *MockMutexPluginAPI) LogError(msg string, keyValuePairs ...interface{}) { - if pluginAPI.t == nil { - return - } - - pluginAPI.t.Helper() - - params := []interface{}{msg} - params = append(params, keyValuePairs...) - - pluginAPI.t.Log(params...) -} - func lock(t *testing.T, m *Mutex) { t.Helper() @@ -127,7 +46,7 @@ func unlock(t *testing.T, m *Mutex, panics bool) { func TestMutex(t *testing.T) { t.Run("successful lock/unlock cycle", func(t *testing.T) { - mockPluginAPI := NewMockMutexPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) m := NewMutex(mockPluginAPI, "key") lock(t, m) @@ -137,14 +56,14 @@ func TestMutex(t *testing.T) { }) t.Run("unlock when not locked", func(t *testing.T) { - mockPluginAPI := NewMockMutexPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) m := NewMutex(mockPluginAPI, "key") unlock(t, m, true) }) t.Run("blocking lock", func(t *testing.T) { - mockPluginAPI := NewMockMutexPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) m := NewMutex(mockPluginAPI, "key") lock(t, m) @@ -171,7 +90,7 @@ func TestMutex(t *testing.T) { }) t.Run("failed lock", func(t *testing.T) { - mockPluginAPI := NewMockMutexPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) m := NewMutex(mockPluginAPI, "key") @@ -199,7 +118,7 @@ func TestMutex(t *testing.T) { }) t.Run("failed unlock", func(t *testing.T) { - mockPluginAPI := NewMockMutexPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) m := NewMutex(mockPluginAPI, "key") lock(t, m) @@ -216,7 +135,7 @@ func TestMutex(t *testing.T) { }) t.Run("discrete keys", func(t *testing.T) { - mockPluginAPI := NewMockMutexPluginAPI(t) + mockPluginAPI := newMockPluginAPI(t) m1 := NewMutex(mockPluginAPI, "key1") lock(t, m1) From 80e2766891ff17c8c8fff493860c1542fc512cd1 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Sat, 22 Feb 2020 20:09:34 -0500 Subject: [PATCH 2/9] refactor wait constants --- cluster/mutex.go | 12 ------------ cluster/wait.go | 14 ++++++++++++++ 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/cluster/mutex.go b/cluster/mutex.go index 206720e..c060a2a 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -20,18 +20,6 @@ const ( // refreshInterval is the interval on which the mutex will be refreshed when locked refreshInterval = ttl / 2 - - // minWaitInterval is the minimum amount of time to wait between locking attempts - minWaitInterval = 1 * time.Second - - // maxWaitInterval is the maximum amount of time to wait between locking attempts - maxWaitInterval = 5 * time.Minute - - // pollWaitInterval is the usual time to wait between unsuccessful locking attempts - pollWaitInterval = 1 * time.Second - - // jitterWaitInterval is the amount of jitter to add when waiting to avoid thundering herds - jitterWaitInterval = minWaitInterval / 2 ) // MutexPluginAPI is the plugin API interface required to manage mutexes. diff --git a/cluster/wait.go b/cluster/wait.go index bee8df9..bf62b4a 100644 --- a/cluster/wait.go +++ b/cluster/wait.go @@ -5,6 +5,20 @@ import ( "time" ) +const ( + // minWaitInterval is the minimum amount of time to wait between locking attempts + minWaitInterval = 1 * time.Second + + // maxWaitInterval is the maximum amount of time to wait between locking attempts + maxWaitInterval = 5 * time.Minute + + // pollWaitInterval is the usual time to wait between unsuccessful locking attempts + pollWaitInterval = 1 * time.Second + + // jitterWaitInterval is the amount of jitter to add when waiting to avoid thundering herds + jitterWaitInterval = minWaitInterval / 2 +) + // nextWaitInterval determines how long to wait until the next lock retry. func nextWaitInterval(lastWaitInterval time.Duration, err error) time.Duration { nextWaitInterval := lastWaitInterval From f69119021cb6877c03d41891a5574ce3610a9ef3 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Fri, 21 Feb 2020 08:15:06 -0500 Subject: [PATCH 3/9] cluster: user-space key expiry --- cluster/job.go | 1 - cluster/job_test.go | 30 +++++-- cluster/mutex.go | 110 ++++++++++++++++++++++---- cluster/mutex_test.go | 176 +++++++++++++++++++++++++++++++++++++++--- 4 files changed, 285 insertions(+), 32 deletions(-) diff --git a/cluster/job.go b/cluster/job.go index d751abc..d5e414a 100644 --- a/cluster/job.go +++ b/cluster/job.go @@ -18,7 +18,6 @@ const ( // JobPluginAPI is the plugin API interface required to schedule jobs. type JobPluginAPI interface { MutexPluginAPI - KVGet(key string) ([]byte, *model.AppError) } // JobConfig defines the configuration of a scheduled job. diff --git a/cluster/job_test.go b/cluster/job_test.go index bc41d2d..e0988b3 100644 --- a/cluster/job_test.go +++ b/cluster/job_test.go @@ -6,20 +6,31 @@ import ( "testing" "time" + "github.com/mattermost/mattermost-server/v5/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestSchedule(t *testing.T) { + t.Parallel() + + makeKey := func() string { + return model.NewId() + } + t.Run("invalid interval", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - job, err := Schedule(mockPluginAPI, "key", JobConfig{}, func() {}) + job, err := Schedule(mockPluginAPI, makeKey(), JobConfig{}, func() {}) require.Error(t, err, "must specify non-zero job config interval") require.Nil(t, job) }) t.Run("single-threaded", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) count := new(int32) @@ -27,7 +38,7 @@ func TestSchedule(t *testing.T) { atomic.AddInt32(count, 1) } - job, err := Schedule(mockPluginAPI, "key", JobConfig{Interval: 100 * time.Millisecond}, callback) + job, err := Schedule(mockPluginAPI, makeKey(), JobConfig{Interval: 100 * time.Millisecond}, callback) require.NoError(t, err) require.NotNil(t, job) @@ -46,6 +57,8 @@ func TestSchedule(t *testing.T) { }) t.Run("multi-threaded, single job", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) count := new(int32) @@ -55,8 +68,10 @@ func TestSchedule(t *testing.T) { var jobs []*Job + key := makeKey() + for i := 0; i < 3; i++ { - job, err := Schedule(mockPluginAPI, "key", JobConfig{Interval: 100 * time.Millisecond}, callback) + job, err := Schedule(mockPluginAPI, key, JobConfig{Interval: 100 * time.Millisecond}, callback) require.NoError(t, err) require.NotNil(t, job) @@ -87,6 +102,8 @@ func TestSchedule(t *testing.T) { }) t.Run("multi-threaded, multiple jobs", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) countA := new(int32) @@ -99,15 +116,18 @@ func TestSchedule(t *testing.T) { atomic.AddInt32(countB, 1) } + keyA := makeKey() + keyB := makeKey() + var jobs []*Job for i := 0; i < 3; i++ { var key string var callback func() if i <= 1 { - key = "keyA" + key = keyA callback = callbackA } else { - key = "keyB" + key = keyB callback = callbackB } diff --git a/cluster/mutex.go b/cluster/mutex.go index c060a2a..82c39b3 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -1,6 +1,7 @@ package cluster import ( + "strconv" "sync" "time" @@ -24,6 +25,7 @@ const ( // MutexPluginAPI is the plugin API interface required to manage mutexes. type MutexPluginAPI interface { + KVGet(key string) ([]byte, *model.AppError) KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) LogError(msg string, keyValuePairs ...interface{}) } @@ -31,7 +33,10 @@ type MutexPluginAPI interface { // Mutex is similar to sync.Mutex, except usable by multiple plugin instances across a cluster. // // Internally, a mutex relies on an atomic key-value set operation as exposed by the Mattermost -// plugin API. +// plugin API. Note that it explicitly does not rely on the built-in support for key value expiry, +// since the implementation for same in the server was partially broken prior to v5.22 and thus +// unreliable for something like a mutex. Instead, we encode the desired expiry as the value of +// the mutex's key value and atomically delete when found to be expired. // // Mutexes with different names are unrelated. Mutexes with the same name from different plugins // are unrelated. Pick a unique name for each mutex your plugin requires. @@ -46,40 +51,109 @@ type Mutex struct { lock sync.Mutex stopRefresh chan bool refreshDone chan bool + + // lockExpiry tracks the expiration time of the lock when last locked. It is not guarded + // by a local mutex, since it is only read or written when the cluster lock is held. + lockExpiry time.Time } -// NewMutex creates a mutex with the given name. +// NewMutex creates a mutex with the given key name. +// +// Panics if key is empty. func NewMutex(pluginAPI MutexPluginAPI, key string) *Mutex { - key = mutexPrefix + key - return &Mutex{ pluginAPI: pluginAPI, - key: key, + key: makeLockKey(key), } } +// makeLockKey returns the prefixed key used to namespace mutex keys. +func makeLockKey(key string) string { + if len(key) == 0 { + panic("must specify valid mutex key") + } + + return mutexPrefix + key +} + +// makeLockValue returns the encoded lock value for the given expiry timestamp. +func makeLockValue(expiresAt time.Time) []byte { + return []byte(strconv.FormatInt(expiresAt.UnixNano(), 10)) +} + +// getLockValue decodes the given lock value into the expiry timestamp it potentially represents. +func getLockValue(valueBytes []byte) (time.Time, error) { + if len(valueBytes) == 0 { + return time.Time{}, nil + } + + value, err := strconv.ParseInt(string(valueBytes), 10, 64) + if err != nil { + return time.Time{}, errors.Wrap(err, "failed to parse mutex kv value") + } + + return time.Unix(0, value), nil +} + // lock makes a single attempt to atomically lock the mutex, returning true only if successful. func (m *Mutex) tryLock() (bool, error) { - ok, err := m.pluginAPI.KVSetWithOptions(m.key, []byte{1}, model.PluginKVSetOptions{ - Atomic: true, - OldValue: nil, // No existing key value. - ExpireInSeconds: int64(ttl / time.Second), + now := time.Now() + newLockExpiry := now.Add(ttl) + + ok, appErr := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ + Atomic: true, + OldValue: nil, // No existing key value. }) + if appErr != nil { + return false, errors.Wrap(appErr, "failed to set mutex kv") + } + + if ok { + m.lockExpiry = newLockExpiry + return true, nil + } + + // Check to see if the lock has expired. + valueBytes, appErr := m.pluginAPI.KVGet(m.key) + if appErr != nil { + return false, errors.Wrap(appErr, "failed to get mutex kv") + } + actualLockExpiry, err := getLockValue(valueBytes) if err != nil { - return false, errors.Wrap(err, "failed to set mutex kv") + return false, err + } + + // It might have already been deleted. + if actualLockExpiry.IsZero() { + return false, nil + } + + // It might still be valid. + if actualLockExpiry.After(now) { + return false, nil } - return ok, nil + // Atomically delete the expired lock and try again. + ok, appErr = m.pluginAPI.KVSetWithOptions(m.key, nil, model.PluginKVSetOptions{ + Atomic: true, + OldValue: valueBytes, + }) + if err != nil { + return false, errors.Wrap(err, "failed to delete mutex kv") + } + + return false, nil } // refreshLock rewrites the lock key value with a new expiry, returning true only if successful. -// -// Only call this while holding the lock. func (m *Mutex) refreshLock() error { - ok, err := m.pluginAPI.KVSetWithOptions(m.key, []byte{1}, model.PluginKVSetOptions{ - Atomic: true, - OldValue: []byte{1}, - ExpireInSeconds: int64(ttl / time.Second), + now := time.Now() + + newLockExpiry := now.Add(ttl) + + ok, err := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ + Atomic: true, + OldValue: makeLockValue(m.lockExpiry), }) if err != nil { return errors.Wrap(err, "failed to refresh mutex kv") @@ -87,6 +161,8 @@ func (m *Mutex) refreshLock() error { return errors.New("unexpectedly failed to refresh mutex kv") } + m.lockExpiry = newLockExpiry + return nil } diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index f5f2fda..ea9e008 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -4,21 +4,74 @@ import ( "testing" "time" + "github.com/mattermost/mattermost-server/v5/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func TestMakeLockKey(t *testing.T) { + t.Run("empty", func(t *testing.T) { + assert.Panics(t, func() { + makeLockKey("") + }) + }) + + t.Run("not-empty", func(t *testing.T) { + testCases := map[string]string{ + "key": mutexPrefix + "key", + "other": mutexPrefix + "other", + } + + for key, expected := range testCases { + actual := makeLockKey(key) + assert.Equal(t, expected, actual) + } + }) +} + +func TestLockValue(t *testing.T) { + t.Run("empty", func(t *testing.T) { + actual, err := getLockValue([]byte{}) + require.NoError(t, err) + require.True(t, actual.IsZero()) + }) + + t.Run("invalid", func(t *testing.T) { + actual, err := getLockValue([]byte("abc")) + require.Error(t, err) + require.True(t, actual.IsZero()) + }) + + t.Run("successful", func(t *testing.T) { + testCases := []time.Time{ + time.Now().Add(-15 * time.Second), + time.Now(), + time.Now().Add(15 * time.Second), + } + + for _, testCase := range testCases { + t.Run(testCase.Format("Mon Jan 2 15:04:05 -0700 MST 2006"), func(t *testing.T) { + actual, err := getLockValue(makeLockValue(testCase)) + require.NoError(t, err) + require.Equal(t, testCase.Truncate(0), actual.Truncate(0)) + }) + } + }) +} + func lock(t *testing.T, m *Mutex) { t.Helper() done := make(chan bool) go func() { + t.Helper() + defer close(done) m.Lock() }() select { - case <-time.After(1 * time.Second): + case <-time.After(2 * time.Second): require.Fail(t, "failed to lock mutex within 1 second") case <-done: } @@ -29,6 +82,8 @@ func unlock(t *testing.T, m *Mutex, panics bool) { done := make(chan bool) go func() { + t.Helper() + defer close(done) if panics { assert.Panics(t, m.Unlock) @@ -45,10 +100,18 @@ func unlock(t *testing.T, m *Mutex, panics bool) { } func TestMutex(t *testing.T) { + t.Parallel() + + makeKey := func() string { + return model.NewId() + } + t.Run("successful lock/unlock cycle", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, "key") + m := NewMutex(mockPluginAPI, makeKey()) lock(t, m) unlock(t, m, false) lock(t, m) @@ -56,16 +119,20 @@ func TestMutex(t *testing.T) { }) t.Run("unlock when not locked", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, "key") + m := NewMutex(mockPluginAPI, makeKey()) unlock(t, m, true) }) t.Run("blocking lock", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, "key") + m := NewMutex(mockPluginAPI, makeKey()) lock(t, m) done := make(chan bool) @@ -90,9 +157,11 @@ func TestMutex(t *testing.T) { }) t.Run("failed lock", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, "key") + m := NewMutex(mockPluginAPI, makeKey()) mockPluginAPI.setFailing(true) @@ -118,9 +187,11 @@ func TestMutex(t *testing.T) { }) t.Run("failed unlock", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, "key") + m := NewMutex(mockPluginAPI, makeKey()) lock(t, m) mockPluginAPI.setFailing(true) @@ -135,15 +206,17 @@ func TestMutex(t *testing.T) { }) t.Run("discrete keys", func(t *testing.T) { + t.Parallel() + mockPluginAPI := newMockPluginAPI(t) - m1 := NewMutex(mockPluginAPI, "key1") + m1 := NewMutex(mockPluginAPI, makeKey()) lock(t, m1) - m2 := NewMutex(mockPluginAPI, "key2") + m2 := NewMutex(mockPluginAPI, makeKey()) lock(t, m2) - m3 := NewMutex(mockPluginAPI, "key3") + m3 := NewMutex(mockPluginAPI, makeKey()) lock(t, m3) unlock(t, m1, false) @@ -154,4 +227,89 @@ func TestMutex(t *testing.T) { unlock(t, m2, false) unlock(t, m1, false) }) + + t.Run("expiring lock", func(t *testing.T) { + t.Parallel() + + mockPluginAPI := newMockPluginAPI(t) + + key := makeKey() + m := NewMutex(mockPluginAPI, key) + + // Simulate lock expiring in 5 seconds + now := time.Now() + ok, appErr := mockPluginAPI.KVSetWithOptions(mutexPrefix+key, makeLockValue(now.Add(5*time.Second)), model.PluginKVSetOptions{}) + require.Nil(t, appErr) + require.True(t, ok) + + done1 := make(chan bool) + go func() { + defer close(done1) + m.Lock() + }() + + done2 := make(chan bool) + go func() { + defer close(done2) + m.Lock() + }() + + select { + case <-time.After(1 * time.Second): + case <-done1: + require.Fail(t, "first goroutine should not have locked yet") + case <-done2: + require.Fail(t, "second goroutine should not have locked yet") + } + + select { + case <-time.After(4*time.Second + pollWaitInterval*2): + require.Fail(t, "some goroutine should have locked after expiry") + case <-done1: + m.Unlock() + select { + case <-done2: + case <-time.After(pollWaitInterval * 2): + require.Fail(t, "second goroutine should have locked") + } + + case <-done2: + m.Unlock() + select { + case <-done2: + case <-time.After(pollWaitInterval * 2): + require.Fail(t, "first goroutine should have locked") + } + } + }) + + t.Run("held lock does not expire", func(t *testing.T) { + t.Parallel() + + mockPluginAPI := newMockPluginAPI(t) + + m := NewMutex(mockPluginAPI, makeKey()) + + m.Lock() + + done := make(chan bool) + go func() { + defer close(done) + m.Lock() + }() + + select { + case <-time.After(ttl + pollWaitInterval*2): + case <-done: + require.Fail(t, "goroutine should not have locked") + } + + m.Unlock() + + select { + case <-time.After(pollWaitInterval * 2): + require.Fail(t, "goroutine should have locked after expiry") + case <-done: + } + }) } From cb37824227e392846977e4703d2a4dd313b0fb1c Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Mon, 24 Feb 2020 19:41:59 -0500 Subject: [PATCH 4/9] switch to v5.16 compatible APIs --- cluster/job.go | 5 +-- cluster/mock_plugin_api_test.go | 58 ++++++++++++++++++++++++--------- cluster/mutex.go | 20 ++++-------- cluster/mutex_test.go | 32 ++++++++++++++---- 4 files changed, 78 insertions(+), 37 deletions(-) diff --git a/cluster/job.go b/cluster/job.go index d5e414a..0066468 100644 --- a/cluster/job.go +++ b/cluster/job.go @@ -18,6 +18,7 @@ const ( // JobPluginAPI is the plugin API interface required to schedule jobs. type JobPluginAPI interface { MutexPluginAPI + KVSet(key string, value []byte) *model.AppError } // JobConfig defines the configuration of a scheduled job. @@ -101,8 +102,8 @@ func (j *Job) saveMetadata(metadata jobMetadata) error { return errors.Wrap(err, "failed to marshal data") } - ok, appErr := j.pluginAPI.KVSetWithOptions(j.key, data, model.PluginKVSetOptions{}) - if appErr != nil || !ok { + appErr := j.pluginAPI.KVSet(j.key, data) + if appErr != nil { return errors.Wrap(appErr, "failed to set data") } diff --git a/cluster/mock_plugin_api_test.go b/cluster/mock_plugin_api_test.go index c3907c9..8a145ef 100644 --- a/cluster/mock_plugin_api_test.go +++ b/cluster/mock_plugin_api_test.go @@ -30,27 +30,44 @@ func (pluginAPI *mockPluginAPI) setFailing(failing bool) { pluginAPI.failing = failing } -func (pluginAPI *mockPluginAPI) clear() { +func (pluginAPI *mockPluginAPI) KVGet(key string) ([]byte, *model.AppError) { pluginAPI.lock.Lock() defer pluginAPI.lock.Unlock() - for k := range pluginAPI.keyValues { - delete(pluginAPI.keyValues, k) + if pluginAPI.failing { + return nil, &model.AppError{Message: "fake error"} } + + return pluginAPI.keyValues[key], nil } -func (pluginAPI *mockPluginAPI) KVGet(key string) ([]byte, *model.AppError) { +func (pluginAPI *mockPluginAPI) KVSet(key string, value []byte) *model.AppError { pluginAPI.lock.Lock() defer pluginAPI.lock.Unlock() if pluginAPI.failing { - return nil, &model.AppError{Message: "fake error"} + return &model.AppError{Message: "fake error"} } - return pluginAPI.keyValues[key], nil + pluginAPI.keyValues[key] = value + + return nil +} + +func (pluginAPI *mockPluginAPI) KVDelete(key string) *model.AppError { + pluginAPI.lock.Lock() + defer pluginAPI.lock.Unlock() + + if pluginAPI.failing { + return &model.AppError{Message: "fake error"} + } + + delete(pluginAPI.keyValues, key) + + return nil } -func (pluginAPI *mockPluginAPI) KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) { +func (pluginAPI *mockPluginAPI) KVCompareAndSet(key string, oldValue []byte, value []byte) (bool, *model.AppError) { pluginAPI.lock.Lock() defer pluginAPI.lock.Unlock() @@ -58,18 +75,29 @@ func (pluginAPI *mockPluginAPI) KVSetWithOptions(key string, value []byte, optio return false, &model.AppError{Message: "fake error"} } - if options.Atomic { - if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, options.OldValue) { - return false, nil - } + if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, oldValue) { + return false, nil } - if value == nil { - delete(pluginAPI.keyValues, key) - } else { - pluginAPI.keyValues[key] = value + pluginAPI.keyValues[key] = value + + return true, nil +} + +func (pluginAPI *mockPluginAPI) KVCompareAndDelete(key string, oldValue []byte) (bool, *model.AppError) { + pluginAPI.lock.Lock() + defer pluginAPI.lock.Unlock() + + if pluginAPI.failing { + return false, &model.AppError{Message: "fake error"} } + if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, oldValue) { + return false, nil + } + + delete(pluginAPI.keyValues, key) + return true, nil } diff --git a/cluster/mutex.go b/cluster/mutex.go index 82c39b3..6fa1d77 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -26,7 +26,8 @@ const ( // MutexPluginAPI is the plugin API interface required to manage mutexes. type MutexPluginAPI interface { KVGet(key string) ([]byte, *model.AppError) - KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) + KVCompareAndSet(key string, oldValue, newValue []byte) (bool, *model.AppError) + KVCompareAndDelete(key string, oldValue []byte) (bool, *model.AppError) LogError(msg string, keyValuePairs ...interface{}) } @@ -100,10 +101,7 @@ func (m *Mutex) tryLock() (bool, error) { now := time.Now() newLockExpiry := now.Add(ttl) - ok, appErr := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ - Atomic: true, - OldValue: nil, // No existing key value. - }) + ok, appErr := m.pluginAPI.KVCompareAndSet(m.key, nil, makeLockValue(newLockExpiry)) if appErr != nil { return false, errors.Wrap(appErr, "failed to set mutex kv") } @@ -134,10 +132,7 @@ func (m *Mutex) tryLock() (bool, error) { } // Atomically delete the expired lock and try again. - ok, appErr = m.pluginAPI.KVSetWithOptions(m.key, nil, model.PluginKVSetOptions{ - Atomic: true, - OldValue: valueBytes, - }) + ok, appErr = m.pluginAPI.KVCompareAndDelete(m.key, valueBytes) if err != nil { return false, errors.Wrap(err, "failed to delete mutex kv") } @@ -151,10 +146,7 @@ func (m *Mutex) refreshLock() error { newLockExpiry := now.Add(ttl) - ok, err := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ - Atomic: true, - OldValue: makeLockValue(m.lockExpiry), - }) + ok, err := m.pluginAPI.KVCompareAndSet(m.key, makeLockValue(m.lockExpiry), makeLockValue(newLockExpiry)) if err != nil { return errors.Wrap(err, "failed to refresh mutex kv") } else if !ok { @@ -231,5 +223,5 @@ func (m *Mutex) Unlock() { m.lock.Unlock() // If an error occurs deleting, the mutex kv will still expire, allowing later retry. - _, _ = m.pluginAPI.KVSetWithOptions(m.key, nil, model.PluginKVSetOptions{}) + m.pluginAPI.KVCompareAndDelete(m.key, makeLockValue(m.lockExpiry)) } diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index ea9e008..09047ff 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -186,21 +186,42 @@ func TestMutex(t *testing.T) { } }) - t.Run("failed unlock", func(t *testing.T) { + t.Run("failed unlock, key deleted", func(t *testing.T) { t.Parallel() mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + key := makeKey() + m := NewMutex(mockPluginAPI, key) + lock(t, m) + + mockPluginAPI.setFailing(true) + + unlock(t, m, false) + + // Simulate expiry by deleting key + mockPluginAPI.setFailing(false) + mockPluginAPI.KVDelete(makeLockKey(key)) + + lock(t, m) + }) + + t.Run("failed unlock, key expired", func(t *testing.T) { + t.Parallel() + + mockPluginAPI := newMockPluginAPI(t) + + key := makeKey() + m := NewMutex(mockPluginAPI, key) lock(t, m) mockPluginAPI.setFailing(true) unlock(t, m, false) - // Simulate expiry - mockPluginAPI.clear() + // Simulate expiry by writing expired value mockPluginAPI.setFailing(false) + mockPluginAPI.KVSet(makeLockKey(key), makeLockValue(time.Now().Add(-1*time.Second))) lock(t, m) }) @@ -238,9 +259,8 @@ func TestMutex(t *testing.T) { // Simulate lock expiring in 5 seconds now := time.Now() - ok, appErr := mockPluginAPI.KVSetWithOptions(mutexPrefix+key, makeLockValue(now.Add(5*time.Second)), model.PluginKVSetOptions{}) + appErr := mockPluginAPI.KVSet(makeLockKey(key), makeLockValue(now.Add(5*time.Second))) require.Nil(t, appErr) - require.True(t, ok) done1 := make(chan bool) go func() { From c811115520191df802ea421ba4fc3a7d1800f8cd Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Mon, 2 Mar 2020 17:28:08 -0400 Subject: [PATCH 5/9] cluster: support LockWithContext Support locking use cases where the default blocking semantics are undesirable during teardown. --- cluster/mutex.go | 18 ++++++++++-- cluster/mutex_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/cluster/mutex.go b/cluster/mutex.go index 6fa1d77..1346025 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "strconv" "sync" "time" @@ -161,10 +162,23 @@ func (m *Mutex) refreshLock() error { // Lock locks m. If the mutex is already locked by any plugin instance, including the current one, // the calling goroutine blocks until the mutex can be locked. func (m *Mutex) Lock() { + m.LockWithContext(context.Background()) +} + +// Lock locks m unless the context is cancelled. If the mutex is already locked by any plugin +// instance, including the current one, the calling goroutine blocks until the mutex can be locked, +// or the context is cancelled. +// +// The mutex is locked only if a nil error is returned. +func (m *Mutex) LockWithContext(ctx context.Context) error { var waitInterval time.Duration for { - time.Sleep(waitInterval) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitInterval): + } locked, err := m.tryLock() if err != nil { @@ -200,7 +214,7 @@ func (m *Mutex) Lock() { m.refreshDone = done m.lock.Unlock() - return + return nil } } diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index 09047ff..eb98a34 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -1,6 +1,7 @@ package cluster import ( + "context" "testing" "time" @@ -332,4 +333,68 @@ func TestMutex(t *testing.T) { case <-done: } }) + + t.Run("with uncancelled context", func(t *testing.T) { + t.Parallel() + + mockPluginAPI := newMockPluginAPI(t) + + m := NewMutex(mockPluginAPI, makeKey()) + + m.Lock() + + ctx := context.Background() + done := make(chan bool) + go func() { + defer close(done) + err := m.LockWithContext(ctx) + require.Nil(t, err) + }() + + select { + case <-time.After(ttl + pollWaitInterval*2): + case <-done: + require.Fail(t, "goroutine should not have locked") + } + + m.Unlock() + + select { + case <-time.After(pollWaitInterval * 2): + require.Fail(t, "goroutine should have locked after unlock") + case <-done: + } + }) + + t.Run("with cancelled context", func(t *testing.T) { + t.Parallel() + + mockPluginAPI := newMockPluginAPI(t) + + m := NewMutex(mockPluginAPI, makeKey()) + + m.Lock() + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan bool) + go func() { + defer close(done) + err := m.LockWithContext(ctx) + require.NotNil(t, err) + }() + + select { + case <-time.After(ttl + pollWaitInterval*2): + case <-done: + require.Fail(t, "goroutine should not have locked") + } + + cancel() + + select { + case <-time.After(pollWaitInterval * 2): + require.Fail(t, "goroutine should have aborted after cancellation") + case <-done: + } + }) } From edf3953f3e4a532f003b5da20e41cc5232188abe Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Tue, 3 Mar 2020 14:53:42 -0400 Subject: [PATCH 6/9] Update cluster/mutex.go Co-Authored-By: Michael Kochell --- cluster/mutex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/mutex.go b/cluster/mutex.go index 1346025..ead7411 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -165,7 +165,7 @@ func (m *Mutex) Lock() { m.LockWithContext(context.Background()) } -// Lock locks m unless the context is cancelled. If the mutex is already locked by any plugin +// LockWithContext locks m unless the context is cancelled. If the mutex is already locked by any plugin // instance, including the current one, the calling goroutine blocks until the mutex can be locked, // or the context is cancelled. // From 2749a376020625c9864d3dd813f4ebfe88290feb Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Thu, 12 Mar 2020 15:51:04 -0300 Subject: [PATCH 7/9] Revert "switch to v5.16 compatible APIs" This reverts commit cb37824227e392846977e4703d2a4dd313b0fb1c. --- cluster/job.go | 5 ++- cluster/mock_plugin_api_test.go | 58 +++++++++------------------------ cluster/mutex.go | 20 ++++++++---- cluster/mutex_test.go | 32 ++++-------------- 4 files changed, 37 insertions(+), 78 deletions(-) diff --git a/cluster/job.go b/cluster/job.go index 0066468..d5e414a 100644 --- a/cluster/job.go +++ b/cluster/job.go @@ -18,7 +18,6 @@ const ( // JobPluginAPI is the plugin API interface required to schedule jobs. type JobPluginAPI interface { MutexPluginAPI - KVSet(key string, value []byte) *model.AppError } // JobConfig defines the configuration of a scheduled job. @@ -102,8 +101,8 @@ func (j *Job) saveMetadata(metadata jobMetadata) error { return errors.Wrap(err, "failed to marshal data") } - appErr := j.pluginAPI.KVSet(j.key, data) - if appErr != nil { + ok, appErr := j.pluginAPI.KVSetWithOptions(j.key, data, model.PluginKVSetOptions{}) + if appErr != nil || !ok { return errors.Wrap(appErr, "failed to set data") } diff --git a/cluster/mock_plugin_api_test.go b/cluster/mock_plugin_api_test.go index 8a145ef..c3907c9 100644 --- a/cluster/mock_plugin_api_test.go +++ b/cluster/mock_plugin_api_test.go @@ -30,44 +30,27 @@ func (pluginAPI *mockPluginAPI) setFailing(failing bool) { pluginAPI.failing = failing } -func (pluginAPI *mockPluginAPI) KVGet(key string) ([]byte, *model.AppError) { +func (pluginAPI *mockPluginAPI) clear() { pluginAPI.lock.Lock() defer pluginAPI.lock.Unlock() - if pluginAPI.failing { - return nil, &model.AppError{Message: "fake error"} + for k := range pluginAPI.keyValues { + delete(pluginAPI.keyValues, k) } - - return pluginAPI.keyValues[key], nil } -func (pluginAPI *mockPluginAPI) KVSet(key string, value []byte) *model.AppError { - pluginAPI.lock.Lock() - defer pluginAPI.lock.Unlock() - - if pluginAPI.failing { - return &model.AppError{Message: "fake error"} - } - - pluginAPI.keyValues[key] = value - - return nil -} - -func (pluginAPI *mockPluginAPI) KVDelete(key string) *model.AppError { +func (pluginAPI *mockPluginAPI) KVGet(key string) ([]byte, *model.AppError) { pluginAPI.lock.Lock() defer pluginAPI.lock.Unlock() if pluginAPI.failing { - return &model.AppError{Message: "fake error"} + return nil, &model.AppError{Message: "fake error"} } - delete(pluginAPI.keyValues, key) - - return nil + return pluginAPI.keyValues[key], nil } -func (pluginAPI *mockPluginAPI) KVCompareAndSet(key string, oldValue []byte, value []byte) (bool, *model.AppError) { +func (pluginAPI *mockPluginAPI) KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) { pluginAPI.lock.Lock() defer pluginAPI.lock.Unlock() @@ -75,29 +58,18 @@ func (pluginAPI *mockPluginAPI) KVCompareAndSet(key string, oldValue []byte, val return false, &model.AppError{Message: "fake error"} } - if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, oldValue) { - return false, nil + if options.Atomic { + if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, options.OldValue) { + return false, nil + } } - pluginAPI.keyValues[key] = value - - return true, nil -} - -func (pluginAPI *mockPluginAPI) KVCompareAndDelete(key string, oldValue []byte) (bool, *model.AppError) { - pluginAPI.lock.Lock() - defer pluginAPI.lock.Unlock() - - if pluginAPI.failing { - return false, &model.AppError{Message: "fake error"} + if value == nil { + delete(pluginAPI.keyValues, key) + } else { + pluginAPI.keyValues[key] = value } - if actualValue := pluginAPI.keyValues[key]; !bytes.Equal(actualValue, oldValue) { - return false, nil - } - - delete(pluginAPI.keyValues, key) - return true, nil } diff --git a/cluster/mutex.go b/cluster/mutex.go index ead7411..6060697 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -27,8 +27,7 @@ const ( // MutexPluginAPI is the plugin API interface required to manage mutexes. type MutexPluginAPI interface { KVGet(key string) ([]byte, *model.AppError) - KVCompareAndSet(key string, oldValue, newValue []byte) (bool, *model.AppError) - KVCompareAndDelete(key string, oldValue []byte) (bool, *model.AppError) + KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) LogError(msg string, keyValuePairs ...interface{}) } @@ -102,7 +101,10 @@ func (m *Mutex) tryLock() (bool, error) { now := time.Now() newLockExpiry := now.Add(ttl) - ok, appErr := m.pluginAPI.KVCompareAndSet(m.key, nil, makeLockValue(newLockExpiry)) + ok, appErr := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ + Atomic: true, + OldValue: nil, // No existing key value. + }) if appErr != nil { return false, errors.Wrap(appErr, "failed to set mutex kv") } @@ -133,7 +135,10 @@ func (m *Mutex) tryLock() (bool, error) { } // Atomically delete the expired lock and try again. - ok, appErr = m.pluginAPI.KVCompareAndDelete(m.key, valueBytes) + ok, appErr = m.pluginAPI.KVSetWithOptions(m.key, nil, model.PluginKVSetOptions{ + Atomic: true, + OldValue: valueBytes, + }) if err != nil { return false, errors.Wrap(err, "failed to delete mutex kv") } @@ -147,7 +152,10 @@ func (m *Mutex) refreshLock() error { newLockExpiry := now.Add(ttl) - ok, err := m.pluginAPI.KVCompareAndSet(m.key, makeLockValue(m.lockExpiry), makeLockValue(newLockExpiry)) + ok, err := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ + Atomic: true, + OldValue: makeLockValue(m.lockExpiry), + }) if err != nil { return errors.Wrap(err, "failed to refresh mutex kv") } else if !ok { @@ -237,5 +245,5 @@ func (m *Mutex) Unlock() { m.lock.Unlock() // If an error occurs deleting, the mutex kv will still expire, allowing later retry. - m.pluginAPI.KVCompareAndDelete(m.key, makeLockValue(m.lockExpiry)) + _, _ = m.pluginAPI.KVSetWithOptions(m.key, nil, model.PluginKVSetOptions{}) } diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index eb98a34..7becf85 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -187,42 +187,21 @@ func TestMutex(t *testing.T) { } }) - t.Run("failed unlock, key deleted", func(t *testing.T) { + t.Run("failed unlock", func(t *testing.T) { t.Parallel() mockPluginAPI := newMockPluginAPI(t) - key := makeKey() - m := NewMutex(mockPluginAPI, key) - lock(t, m) - - mockPluginAPI.setFailing(true) - - unlock(t, m, false) - - // Simulate expiry by deleting key - mockPluginAPI.setFailing(false) - mockPluginAPI.KVDelete(makeLockKey(key)) - - lock(t, m) - }) - - t.Run("failed unlock, key expired", func(t *testing.T) { - t.Parallel() - - mockPluginAPI := newMockPluginAPI(t) - - key := makeKey() - m := NewMutex(mockPluginAPI, key) + m := NewMutex(mockPluginAPI, makeKey()) lock(t, m) mockPluginAPI.setFailing(true) unlock(t, m, false) - // Simulate expiry by writing expired value + // Simulate expiry + mockPluginAPI.clear() mockPluginAPI.setFailing(false) - mockPluginAPI.KVSet(makeLockKey(key), makeLockValue(time.Now().Add(-1*time.Second))) lock(t, m) }) @@ -260,8 +239,9 @@ func TestMutex(t *testing.T) { // Simulate lock expiring in 5 seconds now := time.Now() - appErr := mockPluginAPI.KVSet(makeLockKey(key), makeLockValue(now.Add(5*time.Second))) + ok, appErr := mockPluginAPI.KVSetWithOptions(mutexPrefix+key, makeLockValue(now.Add(5*time.Second)), model.PluginKVSetOptions{}) require.Nil(t, appErr) + require.True(t, ok) done1 := make(chan bool) go func() { From 606bbfa807b4e71b55c1b756b28d1678eea81025 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Thu, 12 Mar 2020 16:28:35 -0300 Subject: [PATCH 8/9] Revert "cluster: user-space key expiry" This reverts commit f69119021cb6877c03d41891a5574ce3610a9ef3. --- cluster/job.go | 1 + cluster/mutex.go | 91 ++++---------------------------- cluster/mutex_test.go | 117 +----------------------------------------- 3 files changed, 13 insertions(+), 196 deletions(-) diff --git a/cluster/job.go b/cluster/job.go index d5e414a..d751abc 100644 --- a/cluster/job.go +++ b/cluster/job.go @@ -18,6 +18,7 @@ const ( // JobPluginAPI is the plugin API interface required to schedule jobs. type JobPluginAPI interface { MutexPluginAPI + KVGet(key string) ([]byte, *model.AppError) } // JobConfig defines the configuration of a scheduled job. diff --git a/cluster/mutex.go b/cluster/mutex.go index 6060697..dd24675 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -2,7 +2,6 @@ package cluster import ( "context" - "strconv" "sync" "time" @@ -26,7 +25,6 @@ const ( // MutexPluginAPI is the plugin API interface required to manage mutexes. type MutexPluginAPI interface { - KVGet(key string) ([]byte, *model.AppError) KVSetWithOptions(key string, value []byte, options model.PluginKVSetOptions) (bool, *model.AppError) LogError(msg string, keyValuePairs ...interface{}) } @@ -34,10 +32,7 @@ type MutexPluginAPI interface { // Mutex is similar to sync.Mutex, except usable by multiple plugin instances across a cluster. // // Internally, a mutex relies on an atomic key-value set operation as exposed by the Mattermost -// plugin API. Note that it explicitly does not rely on the built-in support for key value expiry, -// since the implementation for same in the server was partially broken prior to v5.22 and thus -// unreliable for something like a mutex. Instead, we encode the desired expiry as the value of -// the mutex's key value and atomically delete when found to be expired. +// plugin API. // // Mutexes with different names are unrelated. Mutexes with the same name from different plugins // are unrelated. Pick a unique name for each mutex your plugin requires. @@ -52,10 +47,6 @@ type Mutex struct { lock sync.Mutex stopRefresh chan bool refreshDone chan bool - - // lockExpiry tracks the expiration time of the lock when last locked. It is not guarded - // by a local mutex, since it is only read or written when the cluster lock is held. - lockExpiry time.Time } // NewMutex creates a mutex with the given key name. @@ -77,84 +68,26 @@ func makeLockKey(key string) string { return mutexPrefix + key } -// makeLockValue returns the encoded lock value for the given expiry timestamp. -func makeLockValue(expiresAt time.Time) []byte { - return []byte(strconv.FormatInt(expiresAt.UnixNano(), 10)) -} - -// getLockValue decodes the given lock value into the expiry timestamp it potentially represents. -func getLockValue(valueBytes []byte) (time.Time, error) { - if len(valueBytes) == 0 { - return time.Time{}, nil - } - - value, err := strconv.ParseInt(string(valueBytes), 10, 64) - if err != nil { - return time.Time{}, errors.Wrap(err, "failed to parse mutex kv value") - } - - return time.Unix(0, value), nil -} - // lock makes a single attempt to atomically lock the mutex, returning true only if successful. func (m *Mutex) tryLock() (bool, error) { - now := time.Now() - newLockExpiry := now.Add(ttl) - - ok, appErr := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ - Atomic: true, - OldValue: nil, // No existing key value. + ok, err := m.pluginAPI.KVSetWithOptions(m.key, []byte{1}, model.PluginKVSetOptions{ + Atomic: true, + OldValue: nil, // No existing key value. + ExpireInSeconds: int64(ttl / time.Second), }) - if appErr != nil { - return false, errors.Wrap(appErr, "failed to set mutex kv") - } - - if ok { - m.lockExpiry = newLockExpiry - return true, nil - } - - // Check to see if the lock has expired. - valueBytes, appErr := m.pluginAPI.KVGet(m.key) - if appErr != nil { - return false, errors.Wrap(appErr, "failed to get mutex kv") - } - actualLockExpiry, err := getLockValue(valueBytes) if err != nil { - return false, err - } - - // It might have already been deleted. - if actualLockExpiry.IsZero() { - return false, nil + return false, errors.Wrap(err, "failed to set mutex kv") } - // It might still be valid. - if actualLockExpiry.After(now) { - return false, nil - } - - // Atomically delete the expired lock and try again. - ok, appErr = m.pluginAPI.KVSetWithOptions(m.key, nil, model.PluginKVSetOptions{ - Atomic: true, - OldValue: valueBytes, - }) - if err != nil { - return false, errors.Wrap(err, "failed to delete mutex kv") - } - - return false, nil + return ok, nil } // refreshLock rewrites the lock key value with a new expiry, returning true only if successful. func (m *Mutex) refreshLock() error { - now := time.Now() - - newLockExpiry := now.Add(ttl) - - ok, err := m.pluginAPI.KVSetWithOptions(m.key, makeLockValue(newLockExpiry), model.PluginKVSetOptions{ - Atomic: true, - OldValue: makeLockValue(m.lockExpiry), + ok, err := m.pluginAPI.KVSetWithOptions(m.key, []byte{1}, model.PluginKVSetOptions{ + Atomic: true, + OldValue: []byte{1}, + ExpireInSeconds: int64(ttl / time.Second), }) if err != nil { return errors.Wrap(err, "failed to refresh mutex kv") @@ -162,8 +95,6 @@ func (m *Mutex) refreshLock() error { return errors.New("unexpectedly failed to refresh mutex kv") } - m.lockExpiry = newLockExpiry - return nil } diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index 7becf85..7a27158 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -30,36 +30,6 @@ func TestMakeLockKey(t *testing.T) { }) } -func TestLockValue(t *testing.T) { - t.Run("empty", func(t *testing.T) { - actual, err := getLockValue([]byte{}) - require.NoError(t, err) - require.True(t, actual.IsZero()) - }) - - t.Run("invalid", func(t *testing.T) { - actual, err := getLockValue([]byte("abc")) - require.Error(t, err) - require.True(t, actual.IsZero()) - }) - - t.Run("successful", func(t *testing.T) { - testCases := []time.Time{ - time.Now().Add(-15 * time.Second), - time.Now(), - time.Now().Add(15 * time.Second), - } - - for _, testCase := range testCases { - t.Run(testCase.Format("Mon Jan 2 15:04:05 -0700 MST 2006"), func(t *testing.T) { - actual, err := getLockValue(makeLockValue(testCase)) - require.NoError(t, err) - require.Equal(t, testCase.Truncate(0), actual.Truncate(0)) - }) - } - }) -} - func lock(t *testing.T, m *Mutex) { t.Helper() @@ -72,7 +42,7 @@ func lock(t *testing.T, m *Mutex) { }() select { - case <-time.After(2 * time.Second): + case <-time.After(1 * time.Second): require.Fail(t, "failed to lock mutex within 1 second") case <-done: } @@ -229,91 +199,6 @@ func TestMutex(t *testing.T) { unlock(t, m1, false) }) - t.Run("expiring lock", func(t *testing.T) { - t.Parallel() - - mockPluginAPI := newMockPluginAPI(t) - - key := makeKey() - m := NewMutex(mockPluginAPI, key) - - // Simulate lock expiring in 5 seconds - now := time.Now() - ok, appErr := mockPluginAPI.KVSetWithOptions(mutexPrefix+key, makeLockValue(now.Add(5*time.Second)), model.PluginKVSetOptions{}) - require.Nil(t, appErr) - require.True(t, ok) - - done1 := make(chan bool) - go func() { - defer close(done1) - m.Lock() - }() - - done2 := make(chan bool) - go func() { - defer close(done2) - m.Lock() - }() - - select { - case <-time.After(1 * time.Second): - case <-done1: - require.Fail(t, "first goroutine should not have locked yet") - case <-done2: - require.Fail(t, "second goroutine should not have locked yet") - } - - select { - case <-time.After(4*time.Second + pollWaitInterval*2): - require.Fail(t, "some goroutine should have locked after expiry") - case <-done1: - m.Unlock() - select { - case <-done2: - case <-time.After(pollWaitInterval * 2): - require.Fail(t, "second goroutine should have locked") - } - - case <-done2: - m.Unlock() - select { - case <-done2: - case <-time.After(pollWaitInterval * 2): - require.Fail(t, "first goroutine should have locked") - } - } - }) - - t.Run("held lock does not expire", func(t *testing.T) { - t.Parallel() - - mockPluginAPI := newMockPluginAPI(t) - - m := NewMutex(mockPluginAPI, makeKey()) - - m.Lock() - - done := make(chan bool) - go func() { - defer close(done) - m.Lock() - }() - - select { - case <-time.After(ttl + pollWaitInterval*2): - case <-done: - require.Fail(t, "goroutine should not have locked") - } - - m.Unlock() - - select { - case <-time.After(pollWaitInterval * 2): - require.Fail(t, "goroutine should have locked after expiry") - case <-done: - } - }) - t.Run("with uncancelled context", func(t *testing.T) { t.Parallel() From 27717b32488aa48debb506509efec856c2525c36 Mon Sep 17 00:00:00 2001 From: Jesse Hallam Date: Wed, 11 Mar 2020 11:12:44 -0300 Subject: [PATCH 9/9] do not panic in external API --- cluster/job.go | 7 ++++- cluster/mutex.go | 17 +++++++----- cluster/mutex_example_test.go | 5 +++- cluster/mutex_test.go | 51 ++++++++++++++++++++++++----------- 4 files changed, 57 insertions(+), 23 deletions(-) diff --git a/cluster/job.go b/cluster/job.go index d751abc..0c82671 100644 --- a/cluster/job.go +++ b/cluster/job.go @@ -58,10 +58,15 @@ func Schedule(pluginAPI JobPluginAPI, key string, config JobConfig, callback fun key = cronPrefix + key + mutex, err := NewMutex(pluginAPI, key) + if err != nil { + return nil, errors.Wrap(err, "failed to create job mutex") + } + job := &Job{ pluginAPI: pluginAPI, key: key, - mutex: NewMutex(pluginAPI, key), + mutex: mutex, config: config, callback: callback, stop: make(chan bool), diff --git a/cluster/mutex.go b/cluster/mutex.go index dd24675..858cdcb 100644 --- a/cluster/mutex.go +++ b/cluster/mutex.go @@ -52,20 +52,25 @@ type Mutex struct { // NewMutex creates a mutex with the given key name. // // Panics if key is empty. -func NewMutex(pluginAPI MutexPluginAPI, key string) *Mutex { +func NewMutex(pluginAPI MutexPluginAPI, key string) (*Mutex, error) { + key, err := makeLockKey(key) + if err != nil { + return nil, err + } + return &Mutex{ pluginAPI: pluginAPI, - key: makeLockKey(key), - } + key: key, + }, nil } // makeLockKey returns the prefixed key used to namespace mutex keys. -func makeLockKey(key string) string { +func makeLockKey(key string) (string, error) { if len(key) == 0 { - panic("must specify valid mutex key") + return "", errors.New("must specify valid mutex key") } - return mutexPrefix + key + return mutexPrefix + key, nil } // lock makes a single attempt to atomically lock the mutex, returning true only if successful. diff --git a/cluster/mutex_example_test.go b/cluster/mutex_example_test.go index 5f2d039..73bcbf9 100644 --- a/cluster/mutex_example_test.go +++ b/cluster/mutex_example_test.go @@ -9,7 +9,10 @@ func ExampleMutex() { // Use p.API from your plugin instead. pluginAPI := plugin.API(nil) - m := cluster.NewMutex(pluginAPI, "key") + m, err := cluster.NewMutex(pluginAPI, "key") + if err != nil { + panic(err) + } m.Lock() // critical section m.Unlock() diff --git a/cluster/mutex_test.go b/cluster/mutex_test.go index 7a27158..65e6a98 100644 --- a/cluster/mutex_test.go +++ b/cluster/mutex_test.go @@ -10,11 +10,29 @@ import ( "github.com/stretchr/testify/require" ) +func mustMakeLockKey(key string) string { + key, err := makeLockKey(key) + if err != nil { + panic(err) + } + + return key +} + +func mustNewMutex(pluginAPI MutexPluginAPI, key string) *Mutex { + m, err := NewMutex(pluginAPI, key) + if err != nil { + panic(err) + } + + return m +} + func TestMakeLockKey(t *testing.T) { - t.Run("empty", func(t *testing.T) { - assert.Panics(t, func() { - makeLockKey("") - }) + t.Run("fails when empty", func(t *testing.T) { + key, err := makeLockKey("") + assert.Error(t, err) + assert.Empty(t, key) }) t.Run("not-empty", func(t *testing.T) { @@ -24,7 +42,8 @@ func TestMakeLockKey(t *testing.T) { } for key, expected := range testCases { - actual := makeLockKey(key) + actual, err := makeLockKey(key) + require.NoError(t, err) assert.Equal(t, expected, actual) } }) @@ -82,7 +101,7 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + m := mustNewMutex(mockPluginAPI, makeKey()) lock(t, m) unlock(t, m, false) lock(t, m) @@ -94,7 +113,7 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + m := mustNewMutex(mockPluginAPI, makeKey()) unlock(t, m, true) }) @@ -103,7 +122,7 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + m := mustNewMutex(mockPluginAPI, makeKey()) lock(t, m) done := make(chan bool) @@ -132,7 +151,7 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + m := mustNewMutex(mockPluginAPI, makeKey()) mockPluginAPI.setFailing(true) @@ -162,7 +181,8 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + key := makeKey() + m := mustNewMutex(mockPluginAPI, key) lock(t, m) mockPluginAPI.setFailing(true) @@ -181,13 +201,13 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m1 := NewMutex(mockPluginAPI, makeKey()) + m1 := mustNewMutex(mockPluginAPI, makeKey()) lock(t, m1) - m2 := NewMutex(mockPluginAPI, makeKey()) + m2 := mustNewMutex(mockPluginAPI, makeKey()) lock(t, m2) - m3 := NewMutex(mockPluginAPI, makeKey()) + m3 := mustNewMutex(mockPluginAPI, makeKey()) lock(t, m3) unlock(t, m1, false) @@ -204,7 +224,8 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + key := makeKey() + m := mustNewMutex(mockPluginAPI, key) m.Lock() @@ -236,7 +257,7 @@ func TestMutex(t *testing.T) { mockPluginAPI := newMockPluginAPI(t) - m := NewMutex(mockPluginAPI, makeKey()) + m := mustNewMutex(mockPluginAPI, makeKey()) m.Lock()