Use sync.Cond for packetio.Buffer #307

Closed
wants to merge 1 commit
110 changes: 78 additions & 32 deletions deadline/deadline.go
@@ -21,54 +21,53 @@

var _ context.Context = (*Deadline)(nil)

// Deadline signals updatable deadline timer.
// Also, it implements context.Context.
type Deadline struct {
mu sync.RWMutex
timer timer
done chan struct{}
deadline time.Time
state deadlineState
pending uint8
// VerifyFunc checks that the deadline handler call is still valid, i.e.
// SetDeadline has not been called concurrently.
type VerifyFunc func() bool

// Scheduler is a utility for building deadline handlers. Scheduler is not safe
// for concurrent access.
type Scheduler struct {
f func(VerifyFunc)
timer timer
state deadlineState
pending uint8
}

// New creates new deadline timer.
func New() *Deadline {
return &Deadline{
done: make(chan struct{}),
// NewScheduler creates a Scheduler with the supplied deadline handler function.
// The handler is called once the scheduled deadline is reached. The VerifyFunc
// must be called exactly once and cannot be called concurrently with calls to
// other Scheduler functions.
func NewScheduler(f func(VerifyFunc)) *Scheduler {
return &Scheduler{
f: f,
}
}

func (d *Deadline) timeout() {
d.mu.Lock()
func (d *Scheduler) verify() bool {
if d.pending--; d.pending != 0 || d.state != deadlineStarted {
d.mu.Unlock()
return
return false

Codecov (codecov/patch) check warning on deadline/deadline.go#L49: added line was not covered by tests.
}

d.state = deadlineExceeded
done := d.done
d.mu.Unlock()
return true
}

close(done)
func (d *Scheduler) timeout() {
d.f(d.verify)
}

// Set new deadline. Zero value means no deadline.
func (d *Deadline) Set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()
// SetDeadline schedules the function to be called. If t is zero the function
// is unscheduled. The returned reset value is true when the previous deadline
// was exceeded. The returned exceeded value is true when t is in the past.
func (d *Scheduler) SetDeadline(t time.Time) (reset, exceeded bool) {
reset = d.state == deadlineExceeded

if d.state == deadlineStarted && d.timer.Stop() {
d.pending--
}

d.deadline = t
d.pending++

if d.state == deadlineExceeded {
d.done = make(chan struct{})
}

if t.IsZero() {
d.pending--
d.state = deadlineStopped
@@ -87,7 +86,54 @@

d.pending--
d.state = deadlineExceeded
close(d.done)
return reset, true
}

// DeadlineExceeded returns true when the last set deadline has passed
func (d *Scheduler) DeadlineExceeded() bool {
return d.state == deadlineExceeded
}

// Deadline signals updatable deadline timer.
// Also, it implements context.Context.
type Deadline struct {
mu sync.RWMutex
done chan struct{}
deadline time.Time
scheduler *Scheduler
}

// New creates new deadline timer.
func New() *Deadline {
d := &Deadline{
done: make(chan struct{}),
}
d.scheduler = NewScheduler(d.handleDeadline)
return d
}

func (d *Deadline) handleDeadline(verify VerifyFunc) {
d.mu.Lock()
defer d.mu.Unlock()
if verify() {
close(d.done)
}
}

// Set new deadline. Zero value means no deadline.
func (d *Deadline) Set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

d.deadline = t

reset, exceeded := d.scheduler.SetDeadline(t)
if reset {
d.done = make(chan struct{})
}
if exceeded {
close(d.done)
}
}

// Done receives deadline signal.
@@ -102,7 +148,7 @@
func (d *Deadline) Err() error {
d.mu.RLock()
defer d.mu.RUnlock()
if d.state == deadlineExceeded {
if d.scheduler.DeadlineExceeded() {
return context.DeadlineExceeded
}
return nil
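For reference, a minimal sketch of how a caller might drive the new Scheduler API from outside this diff. The waiter type, its field names, and the import path are illustrative assumptions, not part of the change; it simply mirrors the lock-then-verify pattern used by Deadline above.

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/pion/transport/deadline" // adjust the import path to the module version in use
)

// waiter is a hypothetical wrapper that turns Scheduler callbacks into a
// broadcast on a sync.Cond.
type waiter struct {
	cond      sync.Cond
	expired   bool
	scheduler *deadline.Scheduler
}

func newWaiter() *waiter {
	w := &waiter{cond: sync.Cond{L: &sync.Mutex{}}}
	// The handler runs when a scheduled deadline fires. verify is called
	// exactly once, under the same lock that guards SetDeadline calls, so a
	// concurrent SetDeadline cannot race with the timeout.
	w.scheduler = deadline.NewScheduler(func(verify deadline.VerifyFunc) {
		w.cond.L.Lock()
		defer w.cond.L.Unlock()
		if verify() {
			w.expired = true
			w.cond.Broadcast()
		}
	})
	return w
}

func (w *waiter) setDeadline(t time.Time) {
	w.cond.L.Lock()
	defer w.cond.L.Unlock()
	// reset reports that a previously exceeded deadline is being replaced;
	// exceeded reports that t is already in the past.
	reset, exceeded := w.scheduler.SetDeadline(t)
	if reset {
		w.expired = false
	}
	if exceeded {
		w.expired = true
		w.cond.Broadcast()
	}
}

func (w *waiter) wait() {
	w.cond.L.Lock()
	defer w.cond.L.Unlock()
	for !w.expired {
		w.cond.Wait()
	}
}

func main() {
	w := newWaiter()
	w.setDeadline(time.Now().Add(50 * time.Millisecond))
	w.wait() // blocks until the deadline handler broadcasts
	fmt.Println("deadline fired")
}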
93 changes: 48 additions & 45 deletions packetio/buffer.go
@@ -28,7 +28,8 @@ const (
// Buffer allows writing packets to an intermediate buffer, which can then be read from.
// This is very similar to bytes.Buffer but avoids combining multiple writes into a single read.
type Buffer struct {
mutex sync.Mutex
cond sync.Cond
readDeadline *deadline.Scheduler

// this is a circular buffer. If head <= tail, then the useful
// data is in the interval [head, tail[. If tail < head, then
@@ -38,13 +39,10 @@ type Buffer struct {
data []byte
head, tail int

notify chan struct{}
closed bool

count int
limitCount, limitSize int

readDeadline *deadline.Deadline
}

const (
@@ -55,10 +53,11 @@

// NewBuffer creates a new Buffer.
func NewBuffer() *Buffer {
return &Buffer{
notify: make(chan struct{}, 1),
readDeadline: deadline.New(),
b := &Buffer{
cond: sync.Cond{L: &sync.Mutex{}},
}
b.readDeadline = deadline.NewScheduler(b.handleReadDeadline)
return b
}

// available returns true if the buffer is large enough to fit a packet
@@ -128,24 +127,24 @@ func (b *Buffer) Write(packet []byte) (int, error) {
return 0, errPacketTooBig
}

b.mutex.Lock()
b.cond.L.Lock()

if b.closed {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, io.ErrClosedPipe
}

if (b.limitCount > 0 && b.count >= b.limitCount) ||
(b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, ErrFull
}

// grow the buffer until the packet fits
for !b.available(len(packet)) {
err := b.grow()
if err != nil {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, err
}
}
@@ -172,11 +171,9 @@ func (b *Buffer) Write(packet []byte) (int, error) {
}
b.count++

select {
case b.notify <- struct{}{}:
default:
}
b.mutex.Unlock()
b.cond.L.Unlock()

b.cond.Signal()

return len(packet), nil
}
@@ -186,15 +183,12 @@ func (b *Buffer) Write(packet []byte) (int, error) {
// Returns io.ErrShortBuffer if the packet is too small to copy the Write.
// Returns io.EOF if the buffer is closed.
func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
// Return immediately if the deadline is already exceeded.
select {
case <-b.readDeadline.Done():
return 0, &netError{ErrTimeout, true, true}
default:
}

b.cond.L.Lock()
for {
b.mutex.Lock()
if b.readDeadline.DeadlineExceeded() {
b.cond.L.Unlock()
return 0, &netError{ErrTimeout, true, true}
}

if b.head != b.tail {
// decode the packet size
@@ -238,7 +232,7 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
}

b.count--
b.mutex.Unlock()
b.cond.L.Unlock()

if copied < count {
return copied, io.ErrShortBuffer
@@ -247,58 +241,53 @@
}

if b.closed {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, io.EOF
}
b.mutex.Unlock()

select {
case <-b.readDeadline.Done():
return 0, &netError{ErrTimeout, true, true}
case <-b.notify:
}
b.cond.Wait()
}
}

// Close the buffer, unblocking any pending reads.
// Data in the buffer can still be read; Read will return io.EOF only when empty.
func (b *Buffer) Close() (err error) {
b.mutex.Lock()
b.cond.L.Lock()

if b.closed {
b.mutex.Unlock()
b.cond.L.Unlock()
return nil
}

b.closed = true
close(b.notify)
b.mutex.Unlock()
b.cond.Broadcast()
b.cond.L.Unlock()

return nil
}

// Count returns the number of packets in the buffer.
func (b *Buffer) Count() int {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()
return b.count
}

// SetLimitCount controls the maximum number of packets that can be buffered.
// Causes Write to return ErrFull when this limit is reached.
// A zero value will disable this limit.
func (b *Buffer) SetLimitCount(limit int) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()

b.limitCount = limit
}

// Size returns the total byte size of packets in the buffer, including
// a small amount of administrative overhead.
func (b *Buffer) Size() int {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()

return b.size()
}
@@ -319,15 +308,29 @@ func (b *Buffer) size() int {
// When packetioSizeHardLimit build tag is set, SetLimitSize exceeding
// the hard limit will be silently discarded.
func (b *Buffer) SetLimitSize(limit int) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()

b.limitSize = limit
}

func (b *Buffer) handleReadDeadline(verify deadline.VerifyFunc) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
if verify() {
b.cond.Broadcast()
}
}

// SetReadDeadline sets the deadline for the Read operation.
// Setting to zero means no deadline.
func (b *Buffer) SetReadDeadline(t time.Time) error {
b.readDeadline.Set(t)
b.cond.L.Lock()
defer b.cond.L.Unlock()

if _, exceeded := b.readDeadline.SetDeadline(t); exceeded {
b.cond.Broadcast()
}

return nil
}
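For reference, a hedged usage sketch of the packetio.Buffer behavior this change preserves: a Read blocked in cond.Wait is woken by a Write, and a deadline in the past makes Read return a timeout immediately. The example program and import path are assumptions for illustration, not part of the diff.

package main

import (
	"errors"
	"fmt"
	"net"
	"time"

	"github.com/pion/transport/packetio" // adjust the import path to the module version in use
)

func main() {
	buf := packetio.NewBuffer()

	// A Write made while a reader is blocked wakes it via cond.Signal.
	go func() {
		time.Sleep(50 * time.Millisecond)
		if _, err := buf.Write([]byte("hello")); err != nil {
			panic(err)
		}
	}()

	packet := make([]byte, 1500)
	n, err := buf.Read(packet) // waits on the cond until the Write above lands
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes: %s\n", n, packet[:n])

	// A deadline in the past means the next Read reports a timeout instead
	// of blocking; the deadline handler broadcasts to wake any waiters.
	if err := buf.SetReadDeadline(time.Now().Add(-time.Second)); err != nil {
		panic(err)
	}
	var netErr net.Error
	if _, err := buf.Read(packet); errors.As(err, &netErr) && netErr.Timeout() {
		fmt.Println("read timed out as expected")
	}
}

One design note on the Write path above: the mutex is released before Signal is called. sync.Cond allows signaling without holding the lock, and doing so after unlocking avoids waking a reader only for it to block again on the still-held mutex.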
10 changes: 10 additions & 0 deletions packetio/buffer_test.go
@@ -43,6 +43,16 @@ func TestBuffer(t *testing.T) {
}
assert.Equal(0, n)

// Future deadline
err = buffer.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
assert.NoError(err)
time.Sleep(200 * time.Millisecond)
n, err = buffer.Read(packet)
if !errors.As(err, &e) || !e.Timeout() {
t.Errorf("Unexpected error: %v", err)
}
assert.Equal(0, n)

// Reset deadline
err = buffer.SetReadDeadline(time.Time{})
assert.NoError(err)