Skip to content

Commit

Permalink
feat(producer): add sync pool for channel reuse (#3109)
Browse files Browse the repository at this point in the history
Use a sync.Pool for the sync producer expectation channels to allow for re-use. 

Signed-off-by: k.torgaev <k.torgaev@tinkoff.ru>
  • Loading branch information
kasimtj authored Mar 2, 2025
1 parent 3c67885 commit c2e0d94
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package sarama

import "sync"

var expectationsPool = sync.Pool{
New: func() interface{} {
return make(chan *ProducerError, 1)
},
}

// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
Expand Down Expand Up @@ -110,32 +116,38 @@ func verifyProducerConfig(config *Config) error {
}

func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
expectation := make(chan *ProducerError, 1)
expectation := expectationsPool.Get().(chan *ProducerError)
msg.expectation = expectation
sp.producer.Input() <- msg

if pErr := <-expectation; pErr != nil {
pErr := <-expectation
msg.expectation = nil
expectationsPool.Put(expectation)
if pErr != nil {
return -1, -1, pErr.Err
}

return msg.Partition, msg.Offset, nil
}

func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
expectations := make(chan chan *ProducerError, len(msgs))
indices := make(chan int, len(msgs))
go func() {
for _, msg := range msgs {
expectation := make(chan *ProducerError, 1)
for i, msg := range msgs {
expectation := expectationsPool.Get().(chan *ProducerError)
msg.expectation = expectation
sp.producer.Input() <- msg
expectations <- expectation
indices <- i
}
close(expectations)
close(indices)
}()

var errors ProducerErrors
for expectation := range expectations {
if pErr := <-expectation; pErr != nil {
for i := range indices {
expectation := msgs[i].expectation
pErr := <-expectation
msgs[i].expectation = nil
expectationsPool.Put(expectation)
if pErr != nil {
errors = append(errors, pErr)
}
}
Expand Down

0 comments on commit c2e0d94

Please sign in to comment.