Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow-up on review comments for the bounded queue #2008

Merged
merged 2 commits into from
Jan 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S

func TestSpanProcessor(t *testing.T) {
w := &fakeSpanWriter{}
p := NewSpanProcessor(w).(*spanProcessor)
p := NewSpanProcessor(w, Options.QueueSize(1)).(*spanProcessor)
defer p.Stop()

res, err := p.ProcessSpans([]*model.Span{
Expand All @@ -229,6 +229,7 @@ func TestSpanProcessorErrors(t *testing.T) {
p := NewSpanProcessor(w,
Options.Logger(logger),
Options.ServiceMetrics(serviceMetrics),
Options.QueueSize(1),
).(*spanProcessor)

res, err := p.ProcessSpans([]*model.Span{
Expand Down
46 changes: 27 additions & 19 deletions pkg/queue/bounded_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"unsafe"

"github.com/uber/jaeger-lib/metrics"
uatomic "go.uber.org/atomic"
)

// BoundedQueue implements a producer-consumer exchange similar to a ring buffer queue,
Expand All @@ -30,14 +31,15 @@ import (
// channels, with a special Reaper goroutine that wakes up when the queue is full and consumers
// the items from the top of the queue until its size drops back to maxSize
type BoundedQueue struct {
size int32
onDroppedItem func(item interface{})
items *chan interface{}
stopCh chan struct{}
workers int
stopWG sync.WaitGroup
stopped int32
size *uatomic.Uint32
capacity *uatomic.Uint32
stopped *uatomic.Uint32
items *chan interface{}
onDroppedItem func(item interface{})
consumer func(item interface{})
workers int
stopCh chan struct{}
}

// NewBoundedQueue constructs the new queue of specified capacity, and with an optional
Expand All @@ -48,6 +50,9 @@ func NewBoundedQueue(capacity int, onDroppedItem func(item interface{})) *Bounde
onDroppedItem: onDroppedItem,
items: &queue,
stopCh: make(chan struct{}),
capacity: uatomic.NewUint32(uint32(capacity)),
stopped: uatomic.NewUint32(0),
size: uatomic.NewUint32(0),
}
}

Expand All @@ -60,14 +65,15 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
for i := 0; i < q.workers; i++ {
q.stopWG.Add(1)
startWG.Add(1)
go func(queue chan interface{}) {
go func() {
startWG.Done()
defer q.stopWG.Done()
queue := *q.items
for {
select {
case item, ok := <-queue:
if ok {
atomic.AddInt32(&q.size, -1)
q.size.Sub(1)
q.consumer(item)
} else {
// channel closed, finish worker
Expand All @@ -78,31 +84,30 @@ func (q *BoundedQueue) StartConsumers(num int, consumer func(item interface{}))
return
}
}
}(*q.items)
}()
}
startWG.Wait()
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *BoundedQueue) Produce(item interface{}) bool {
if atomic.LoadInt32(&q.stopped) != 0 {
if q.stopped.Load() != 0 {
q.onDroppedItem(item)
return false
}

// we might have two concurrent backing queues at the moment
// their combined size is stored in q.size, and their combined capacity
// should match the capacity of the new queue
if q.Size() >= q.Capacity() && q.Capacity() > 0 {
// current consumers of the queue (like tests) expect the queue capacity = 0 to work
// so, we don't drop items when the capacity is 0
if q.Size() >= q.Capacity() {
// note that all items will be dropped if the capacity is 0
q.onDroppedItem(item)
return false
}

select {
case *q.items <- item:
atomic.AddInt32(&q.size, 1)
q.size.Add(1)
return true
default:
// should not happen, as overflows should have been captured earlier
Expand All @@ -116,20 +121,20 @@ func (q *BoundedQueue) Produce(item interface{}) bool {
// Stop stops all consumers, as well as the length reporter if started,
// and releases the items channel. It blocks until all consumers have stopped.
func (q *BoundedQueue) Stop() {
atomic.StoreInt32(&q.stopped, 1) // disable producer
q.stopped.Store(1) // disable producer
close(q.stopCh)
q.stopWG.Wait()
close(*q.items)
}

// Size returns the current size of the queue
func (q *BoundedQueue) Size() int {
return int(atomic.LoadInt32(&q.size))
return int(q.size.Load())
}

// Capacity returns capacity of the queue
func (q *BoundedQueue) Capacity() int {
return cap(*q.items)
return int(q.capacity.Load())
}

// StartLengthReporting starts a timer-based goroutine that periodically reports
Expand All @@ -152,7 +157,7 @@ func (q *BoundedQueue) StartLengthReporting(reportPeriod time.Duration, gauge me

// Resize changes the capacity of the queue, returning whether the action was successful
func (q *BoundedQueue) Resize(capacity int) bool {
if capacity == cap(*q.items) {
if capacity == q.Capacity() {
// noop
return false
}
Expand All @@ -165,10 +170,13 @@ func (q *BoundedQueue) Resize(capacity int) bool {
swapped := atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&q.items)), unsafe.Pointer(q.items), unsafe.Pointer(&queue))
if swapped {
// start a new set of consumers, based on the information given previously
q.StartConsumers(q.workers, q.consumer)
q.StartConsumers(int(q.workers), q.consumer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q.workers is already int

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you spot it using a different linter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, just paying attention

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should plug your brain in the CI, as our linters didn't catch it.


// gracefully drain the existing queue
close(previous)

// update the capacity
q.capacity.Store(uint32(capacity))
}

return swapped
Expand Down
10 changes: 3 additions & 7 deletions pkg/queue/bounded_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func TestBoundedQueue(t *testing.T) {
_, g := mFact.Snapshot()
if g["size"] == 0 {
time.Sleep(time.Millisecond)
} else {
break
}
}

Expand Down Expand Up @@ -313,16 +315,10 @@ func TestZeroSize(t *testing.T) {
q := NewBoundedQueue(0, func(item interface{}) {
})

var wg sync.WaitGroup
wg.Add(1)
q.StartConsumers(1, func(item interface{}) {
wg.Done()
})

assert.True(t, q.Produce("a")) // in process
wg.Wait()

// if we didn't finish with a timeout, then we are good
assert.False(t, q.Produce("a")) // in process
}

func BenchmarkBoundedQueue(b *testing.B) {
Expand Down