Browse Source

Consumer: reduce timer allocations

Rather than allocating a new timer with `time.After` on every message we
consume, allocate one for the `responseFeeder` and just keep resetting it.
Thanks to @Tevic for suggesting this approach.

Fixes #707.
Evan Huus 9 years ago
parent
commit
9d159b4cf0
2 changed files with 6 additions and 1 deletions
  1. 2 0
      CHANGELOG.md
  2. 4 1
      consumer.go

+ 2 - 0
CHANGELOG.md

@@ -25,6 +25,8 @@ Improvements:
    ([#634](https://github.com/Shopify/sarama/pull/634)).
  - Pre-allocate decoding errors, greatly reducing heap usage and GC time against
    misbehaving brokers ([#690](https://github.com/Shopify/sarama/pull/690)).
+ - Re-use consumer expiry timers, removing one allocation per consumed message
+   ([#707](https://github.com/Shopify/sarama/pull/707)).
 
 Bug Fixes:
  - Actually default the client ID to "sarama" like we say we do

+ 4 - 1
consumer.go

@@ -413,15 +413,18 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
+	expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
 
 feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
 		for i, msg := range msgs {
+			expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
+
 			select {
 			case child.messages <- msg:
-			case <-time.After(child.conf.Consumer.MaxProcessingTime):
+			case <-expiryTimer.C:
 				child.responseResult = errTimedOut
 				child.broker.acks.Done()
 				for _, msg = range msgs[i:] {