Skip to content

Commit

Permalink
concurrency/lock: Adds context and outercancel locks (#115)
Browse files Browse the repository at this point in the history
* concurrency/lock: Adds context and outercancel locks

Adds context lock which will cancel and return an error to Lock & RLock
if the given context cancels before the lock is achieved.

Adds outercancel lock which will cancel all RLocks in progress if the
outer Lock is called.

Signed-off-by: joshvanl <me@joshvanl.dev>

* lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* Lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* Fix tests

Signed-off-by: joshvanl <me@joshvanl.dev>

* lint

Signed-off-by: joshvanl <me@joshvanl.dev>

* lint

Signed-off-by: joshvanl <me@joshvanl.dev>

---------

Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL authored Feb 24, 2025
1 parent 6271c8b commit 39c4bf5
Show file tree
Hide file tree
Showing 4 changed files with 565 additions and 0 deletions.
62 changes: 62 additions & 0 deletions concurrency/lock/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lock

import (
"context"
"sync"
)

// Context is a ready write mutex lock where Locking can return early with an
// error if the context is done. No error response means the lock is acquired.
type Context struct {
lock sync.RWMutex
locked chan struct{}
}

func NewContext() *Context {
return &Context{
locked: make(chan struct{}, 1),
}
}

func (c *Context) Lock(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case c.locked <- struct{}{}:
c.lock.Lock()
return nil
}
}

func (c *Context) Unlock() {
c.lock.Unlock()
<-c.locked
}

func (c *Context) RLock(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case c.locked <- struct{}{}:
c.lock.RLock()
return nil
}
}

func (c *Context) RUnlock() {
c.lock.RUnlock()
<-c.locked
}
81 changes: 81 additions & 0 deletions concurrency/lock/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lock

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_Context(t *testing.T) {
tests := map[string]struct {
name string
action func(l *Context) error
expectError bool
}{
"Successful Lock": {
action: func(l *Context) error {
return l.Lock(context.Background())
},
expectError: false,
},
"Lock with Context Timeout": {
action: func(l *Context) error {
l.Lock(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
return l.Lock(ctx)
},
expectError: true,
},
"Successful RLock": {
action: func(l *Context) error {
return l.RLock(context.Background())
},
expectError: false,
},
"RLock with Context Timeout": {
action: func(l *Context) error {
l.Lock(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
return l.RLock(ctx)
},
expectError: true,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()

l := NewContext()

done := make(chan error)
go func() {
done <- test.action(l)
}()

select {
case err := <-done:
assert.Equal(t, (err != nil), test.expectError, "unexpected error, expected error: %v, got: %v", test.expectError, err)
case <-time.After(time.Second):
t.Errorf("test timed out")
}
})
}
}
199 changes: 199 additions & 0 deletions concurrency/lock/outercancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lock

import (
"context"
"errors"
"sync"
"time"

"github.com/dapr/kit/concurrency/fifo"
)

var errLockClosed = errors.New("lock closed")

type hold struct {
writeLock bool
rctx context.Context
respCh chan *holdresp
}

type holdresp struct {
rctx context.Context
cancel context.CancelFunc
err error
}

type OuterCancel struct {
ch chan *hold
cancelErr error
gracefulTimeout time.Duration

lock chan struct{}

wg sync.WaitGroup
rcancelLock sync.Mutex
rcancelx uint64
rcancels map[uint64]context.CancelFunc

closeCh chan struct{}
shutdownLock *fifo.Mutex
}

func NewOuterCancel(cancelErr error, gracefulTimeout time.Duration) *OuterCancel {
return &OuterCancel{
lock: make(chan struct{}, 1),
ch: make(chan *hold, 1),
rcancels: make(map[uint64]context.CancelFunc),
closeCh: make(chan struct{}),
shutdownLock: fifo.New(),
cancelErr: cancelErr,
gracefulTimeout: gracefulTimeout,
}
}

func (o *OuterCancel) Run(ctx context.Context) {
defer func() {
o.rcancelLock.Lock()
defer o.rcancelLock.Unlock()

for _, cancel := range o.rcancels {
go cancel()
}
}()

go func() {
<-ctx.Done()
close(o.closeCh)
}()

for {
select {
case <-o.closeCh:
return
case h := <-o.ch:
o.handleHold(h)
}
}
}

func (o *OuterCancel) handleHold(h *hold) {
if h.rctx != nil {
select {
case o.lock <- struct{}{}:
case <-h.rctx.Done():
h.respCh <- &holdresp{err: h.rctx.Err()}
return
}
} else {
o.lock <- struct{}{}
}

o.rcancelLock.Lock()

if h.writeLock {
for _, cancel := range o.rcancels {
go cancel()
}
o.rcancelx = 0
o.rcancelLock.Unlock()
o.wg.Wait()

h.respCh <- &holdresp{cancel: func() { <-o.lock }}

return
}

o.wg.Add(1)
var done bool
doneCh := make(chan bool)
rctx, cancel := context.WithCancelCause(h.rctx)
i := o.rcancelx

rcancel := func() {
o.rcancelLock.Lock()
if !done {
close(doneCh)
cancel(o.cancelErr)
delete(o.rcancels, i)
o.wg.Done()
done = true
}
o.rcancelLock.Unlock()
}

rcancelGrace := func() {
select {
case <-time.After(o.gracefulTimeout):
case <-o.closeCh:
case <-doneCh:
}
rcancel()
}

o.rcancels[i] = rcancelGrace
o.rcancelx++

o.rcancelLock.Unlock()

h.respCh <- &holdresp{rctx: rctx, cancel: rcancel}

<-o.lock
}

func (o *OuterCancel) Lock() context.CancelFunc {
h := hold{
writeLock: true,
respCh: make(chan *holdresp, 1),
}

select {
case <-o.closeCh:
o.shutdownLock.Lock()
return o.shutdownLock.Unlock
case o.ch <- &h:
}

select {
case <-o.closeCh:
o.shutdownLock.Lock()
return o.shutdownLock.Unlock
case resp := <-h.respCh:
return resp.cancel
}
}

func (o *OuterCancel) RLock(ctx context.Context) (context.Context, context.CancelFunc, error) {
h := hold{
writeLock: false,
rctx: ctx,
respCh: make(chan *holdresp, 1),
}

select {
case <-o.closeCh:
return nil, nil, errLockClosed
case <-ctx.Done():
return nil, nil, ctx.Err()
case o.ch <- &h:
}

select {
case <-o.closeCh:
return nil, nil, errLockClosed
case resp := <-h.respCh:
return resp.rctx, resp.cancel, resp.err
}
}
Loading

0 comments on commit 39c4bf5

Please sign in to comment.