diff --git a/sync_producer.go b/sync_producer.go index 3119baa6d..f6876fbee 100644 --- a/sync_producer.go +++ b/sync_producer.go @@ -2,6 +2,12 @@ package sarama import "sync" +var expectationsPool = sync.Pool{ + New: func() interface{} { + return make(chan *ProducerError, 1) + }, +} + // SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct // broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer // to avoid leaks, it may not be garbage-collected automatically when it passes out of scope. @@ -110,11 +116,13 @@ func verifyProducerConfig(config *Config) error { } func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) { - expectation := make(chan *ProducerError, 1) + expectation := expectationsPool.Get().(chan *ProducerError) msg.expectation = expectation sp.producer.Input() <- msg - - if pErr := <-expectation; pErr != nil { + pErr := <-expectation + msg.expectation = nil + expectationsPool.Put(expectation) + if pErr != nil { return -1, -1, pErr.Err } @@ -122,20 +130,24 @@ func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offs } func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error { - expectations := make(chan chan *ProducerError, len(msgs)) + indices := make(chan int, len(msgs)) go func() { - for _, msg := range msgs { - expectation := make(chan *ProducerError, 1) + for i, msg := range msgs { + expectation := expectationsPool.Get().(chan *ProducerError) msg.expectation = expectation sp.producer.Input() <- msg - expectations <- expectation + indices <- i } - close(expectations) + close(indices) }() var errors ProducerErrors - for expectation := range expectations { - if pErr := <-expectation; pErr != nil { + for i := range indices { + expectation := msgs[i].expectation + pErr := <-expectation + msgs[i].expectation = nil + expectationsPool.Put(expectation) + if pErr != nil { errors = append(errors, pErr) } }