
consumer: don't block on undrained partitions

If a partitionConsumer fills up and is not being drained (or is draining
slowly), remove its subscription until it can proceed again, so that it does
not block other partitions which may still be making progress.
Evan Huus, 10 years ago
Commit 292f3b0aa1
3 changed files with 39 additions and 11 deletions
  1. config.go (+9 -1)
  2. consumer.go (+29 -10)
  3. consumer_test.go (+1 -0)

config.go (+9 -1)

@@ -105,6 +105,11 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
+		// The maximum amount of time the consumer expects the user to take to process a message. If writing to the Messages channel
+		// takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the
+		// Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
+		MaxProcessingTime time.Duration
+
 		// Return specifies what channels will be populated. If they are set to true, you must read from
 		// them to prevent deadlock.
 		Return struct {
@@ -147,6 +152,7 @@ func NewConfig() *Config {
 	c.Consumer.Fetch.Default = 32768
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
 
 	c.ChannelBufferSize = 256
@@ -239,7 +245,9 @@ func (c *Config) Validate() error {
 	case c.Consumer.Fetch.Max < 0:
 		return ConfigurationError("Consumer.Fetch.Max must be >= 0")
 	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
-		return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
+		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
+	case c.Consumer.MaxProcessingTime <= 0:
+		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	}
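For context, this is how the new knob would be set from application code. A minimal sketch, assuming the library's import path at the time of this commit (github.com/Shopify/sarama):

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // import path assumed for this era of the library
)

func main() {
	config := sarama.NewConfig()

	// Allow each message up to 500ms to be written to the Messages channel.
	// With the default ChannelBufferSize of 256, the total grace time before
	// the partition's subscription is dropped is 256 * 500ms.
	config.Consumer.MaxProcessingTime = 500 * time.Millisecond

	// Validate rejects a non-positive MaxProcessingTime, per the new check above.
	if err := config.Validate(); err != nil {
		log.Fatal(err)
	}
}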

consumer.go (+29 -10)

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -278,6 +279,8 @@ type partitionConsumer struct {
 	highWaterMarkOffset int64
 }
 
+var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
+
 func (child *partitionConsumer) sendError(err error) {
 	cErr := &ConsumerError{
 		Topic:     child.topic,
@@ -403,11 +406,22 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
 
+feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
-		for _, msg := range msgs {
-			child.messages <- msg
+		for i, msg := range msgs {
+			select {
+			case child.messages <- msg:
+			case <-time.After(child.conf.Consumer.MaxProcessingTime):
+				child.responseResult = errTimedOut
+				child.broker.acks.Done()
+				for _, msg = range msgs[i:] {
+					child.messages <- msg
+				}
+				child.broker.input <- child
+				continue feederLoop
+			}
 		}
 
 		child.broker.acks.Done()
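The core of the change is the pattern in this hunk: attempt each send with a deadline, and on timeout release the broker (acks.Done plus re-queuing on broker.input) before finishing delivery at leisure. A standalone sketch of that idiom, using hypothetical names rather than sarama's types:

package main

import (
	"fmt"
	"time"
)

// feed tries each send with a deadline; on timeout it reports the stall
// (where the real code hands the broker back) and then finishes the batch
// with plain blocking sends.
func feed(out chan<- int, batch []int, maxWait time.Duration) (stalled bool) {
	for i, v := range batch {
		select {
		case out <- v:
		case <-time.After(maxWait):
			stalled = true
			for _, v = range batch[i:] {
				out <- v // deliver the remainder without a deadline
			}
			return stalled
		}
	}
	return false
}

func main() {
	out := make(chan int, 1) // tiny buffer, like a small ChannelBufferSize
	done := make(chan struct{})
	go func() {
		for v := range out {
			time.Sleep(50 * time.Millisecond) // a slow consumer
			fmt.Println("processed", v)
		}
		close(done)
	}()

	if feed(out, []int{1, 2, 3, 4}, 10*time.Millisecond) {
		fmt.Println("feeder stalled; the real consumer would drop the subscription here")
	}
	close(out)
	<-done
}

Note that the feeder still delivers every message it has parsed, in order; only the *fetching* of new messages pauses while the user catches up.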
@@ -596,32 +610,37 @@ func (bc *brokerConsumer) handleResponses() {
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			switch child.responseResult {
+			result := child.responseResult
+			child.responseResult = nil
+
+			switch result {
 			case nil:
 				break
+			case errTimedOut:
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
+					bc.broker.ID(), child.topic, child.partition)
+				delete(bc.subscriptions, child)
 			case ErrOffsetOutOfRange:
 				// there's no point in retrying this it will just fail the same way again
 				// shut it down and force the user to choose what to do
-				child.sendError(child.responseResult)
-				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult)
+				child.sendError(result)
+				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
 				close(child.trigger)
 				delete(bc.subscriptions, child)
 			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
 				// not an error, but does need redispatching
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			default:
 				// dunno, tell the user and try redispatching
-				child.sendError(child.responseResult)
+				child.sendError(result)
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			}
-
-			child.responseResult = nil
 		}
 	}
 }
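One subtlety in the hunk above: responseResult is now snapshotted and cleared before the switch, rather than cleared after it. Presumably this is because the errTimedOut path hands the child back via broker.input, after which the feeder can publish a fresh result; a trailing child.responseResult = nil could then wipe it out. The idiom in isolation, with hypothetical names:

package sketch

// dispatch snapshots and clears a shared result slot before acting on it.
// Clearing up front means any work re-queued inside the switch can publish
// a fresh result without a trailing "*slot = nil" wiping it out.
func dispatch(slot *error, requeue func()) {
	result := *slot
	*slot = nil

	switch result {
	case nil:
		// success: nothing to do
	default:
		requeue() // may eventually store a new value in *slot
	}
}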

consumer_test.go (+1 -0)

@@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
 	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
 	leader.Returns(fetchResponse)
+	leader.Returns(fetchResponse)
 
 	safeClose(t, c1)
 	safeClose(t, c0)