
Merge pull request #485 from Shopify/dont-require-all-consumers-drained

Don't require all consumers drained
Evan Huus 10 years ago
parent
commit e1729d6285
3 changed files with 75 additions and 40 deletions
  1. config.go (+9 -1)
  2. consumer.go (+65 -39)
  3. consumer_test.go (+1 -0)

+ 9 - 1
config.go

@@ -105,6 +105,11 @@ type Config struct {
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
+		// The maximum amount of time the consumer expects a message to take to process for the user. If writing to the Messages
+		// channel takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that,
+		// since the Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
+		MaxProcessingTime time.Duration
+
 		// Return specifies what channels will be populated. If they are set to true, you must read from
 		// them to prevent deadlock.
 		Return struct {
@@ -147,6 +152,7 @@ func NewConfig() *Config {
 	c.Consumer.Fetch.Default = 32768
 	c.Consumer.Retry.Backoff = 2 * time.Second
 	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+	c.Consumer.MaxProcessingTime = 100 * time.Millisecond
 	c.Consumer.Return.Errors = false
 
 	c.ChannelBufferSize = 256
@@ -239,7 +245,9 @@ func (c *Config) Validate() error {
 	case c.Consumer.Fetch.Max < 0:
 		return ConfigurationError("Consumer.Fetch.Max must be >= 0")
 	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
-		return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
+		return ConfigurationError("Consumer.MaxWaitTime must be >= 1ms")
+	case c.Consumer.MaxProcessingTime <= 0:
+		return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
 	case c.Consumer.Retry.Backoff < 0:
 		return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
 	}
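
For context, a minimal sketch of how a caller might override the new default follows; the 500ms value and the immediate Validate() call are illustrative, not part of this change.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// Give slow message handlers more headroom before their partition is
	// paused. With the default ChannelBufferSize of 256, the effective grace
	// period is roughly 500ms * 256 before fetching stops for that partition.
	config.Consumer.MaxProcessingTime = 500 * time.Millisecond

	if err := config.Validate(); err != nil {
		log.Fatalln(err)
	}
}

Raising MaxProcessingTime trades fetch responsiveness for tolerance of slow handlers; with this change a stalled partition is paused and redispatched instead of holding up every other subscription on the same broker.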

+ 65 - 39
consumer.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -271,13 +272,15 @@ type partitionConsumer struct {
 	feeder   chan *FetchResponse
 
 	trigger, dying chan none
-	dispatchReason error
+	responseResult error
 
 	fetchSize           int32
 	offset              int64
 	highWaterMarkOffset int64
 }
 
+var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
+
 func (child *partitionConsumer) sendError(err error) {
 	cErr := &ConsumerError{
 		Topic:     child.topic,
@@ -401,23 +404,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 }
 
 func (child *partitionConsumer) responseFeeder() {
+	var msgs []*ConsumerMessage
+
+feederLoop:
 	for response := range child.feeder {
-		switch err := child.handleResponse(response); err {
-		case nil:
-			break
-		case ErrOffsetOutOfRange:
-			// there's no point in retrying this it will just fail the same way again
-			// so shut it down and force the user to choose what to do
-			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
-			child.sendError(err)
-			child.AsyncClose()
-		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
-			// these three are not fatal errors, but do require redispatching
-			child.dispatchReason = err
-		default:
-			// dunno, tell the user and try redispatching
-			child.sendError(err)
-			child.dispatchReason = err
+		msgs, child.responseResult = child.parseResponse(response)
+
+		for i, msg := range msgs {
+			select {
+			case child.messages <- msg:
+			case <-time.After(child.conf.Consumer.MaxProcessingTime):
+				child.responseResult = errTimedOut
+				child.broker.acks.Done()
+				for _, msg = range msgs[i:] {
+					child.messages <- msg
+				}
+				child.broker.input <- child
+				continue feederLoop
+			}
 		}
 
 		child.broker.acks.Done()
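
The new timeout handling is easier to follow outside the diff. Here is a standalone, simplified sketch of the same send-with-deadline pattern; the int messages and the feedBatch helper are stand-ins, not library code.

package main

import (
	"fmt"
	"time"
)

// feedBatch mimics the shape of responseFeeder's inner loop: offer each
// message to the user within maxProcessingTime; once the deadline is missed,
// deliver the rest of the batch with ordinary blocking sends and report
// failure so the caller can drop the partition from its fetch loop.
func feedBatch(messages chan<- int, batch []int, maxProcessingTime time.Duration) bool {
	for i, msg := range batch {
		select {
		case messages <- msg:
		case <-time.After(maxProcessingTime):
			for _, m := range batch[i:] {
				messages <- m // the user still receives everything, just with no deadline
			}
			return false
		}
	}
	return true
}

func main() {
	messages := make(chan int, 2) // buffered, like the Messages channel
	done := make(chan struct{})
	go func() {
		defer close(done)
		for m := range messages {
			time.Sleep(50 * time.Millisecond) // a deliberately slow consumer
			fmt.Println("processed", m)
		}
	}()

	ok := feedBatch(messages, []int{1, 2, 3, 4, 5}, 10*time.Millisecond)
	fmt.Println("met the deadline:", ok)
	close(messages)
	<-done
}
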
@@ -427,14 +431,14 @@ func (child *partitionConsumer) responseFeeder() {
 	close(child.errors)
 }
 
-func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
+func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
-		return ErrIncompleteResponse
+		return nil, ErrIncompleteResponse
 	}
 
 	if block.Err != ErrNoError {
-		return block.Err
+		return nil, block.Err
 	}
 
 	if len(block.MsgSet.Messages) == 0 {
@@ -453,7 +457,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 			}
 		}
 
-		return nil
+		return nil, nil
 	}
 
 	// we got messages, reset our fetch size in case it was increased for a previous request
@@ -461,8 +465,8 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
 
 	incomplete := false
-	atLeastOne := false
 	prelude := true
+	var messages []*ConsumerMessage
 	for _, msgBlock := range block.MsgSet.Messages {
 
 		for _, msg := range msgBlock.Messages() {
@@ -472,14 +476,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 			prelude = false
 
 			if msg.Offset >= child.offset {
-				atLeastOne = true
-				child.messages <- &ConsumerMessage{
+				messages = append(messages, &ConsumerMessage{
 					Topic:     child.topic,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,
 					Value:     msg.Msg.Value,
 					Offset:    msg.Offset,
-				}
+				})
 				child.offset = msg.Offset + 1
 			} else {
 				incomplete = true
@@ -488,10 +491,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 
 	}
 
-	if incomplete || !atLeastOne {
-		return ErrIncompleteResponse
+	if incomplete || len(messages) == 0 {
+		return nil, ErrIncompleteResponse
 	}
-	return nil
+	return messages, nil
 }
 
 // brokerConsumer
@@ -569,7 +572,10 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 
 	// the subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
 	for newSubscriptions := range bc.newSubscriptions {
-		bc.updateSubscriptionCache(newSubscriptions)
+		for _, child := range newSubscriptions {
+			bc.subscriptions[child] = none{}
+			Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+		}
 
 		if len(bc.subscriptions) == 0 {
 			// We're about to be shut down or we're about to receive more subscriptions.
@@ -591,16 +597,12 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 			child.feeder <- response
 		}
 		bc.acks.Wait()
+		bc.handleResponses()
 	}
 }
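
The handshake that makes the unsynchronized responseResult field safe is worth spelling out: each feeder writes its result before calling acks.Done(), and the broker loop only reads results after acks.Wait() returns. A minimal sketch of that pattern with a toy worker type (not library code):

package main

import (
	"fmt"
	"sync"
)

type worker struct {
	input  chan int
	result error // written by the worker before Done, read by the boss after Wait
}

func main() {
	var acks sync.WaitGroup
	workers := []*worker{{input: make(chan int)}, {input: make(chan int)}}

	for _, w := range workers {
		go func(w *worker) {
			for n := range w.input {
				_ = n
				w.result = nil // pretend n was processed successfully
				acks.Done()    // publish the result back to the boss
			}
		}(w)
	}

	for round := 0; round < 3; round++ {
		acks.Add(len(workers))
		for _, w := range workers {
			w.input <- round
		}
		acks.Wait() // all results are now safely visible
		for i, w := range workers {
			fmt.Printf("round %d worker %d result: %v\n", round, i, w.result)
		}
	}
}
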
 
-func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
-	// take new subscriptions, and abandon subscriptions that have been closed
-	for _, child := range newSubscriptions {
-		bc.subscriptions[child] = none{}
-		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
-	}
-
+func (bc *brokerConsumer) handleResponses() {
+	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
 	for child := range bc.subscriptions {
 		select {
 		case <-child.dying:
@@ -608,10 +610,34 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			if child.dispatchReason != nil {
+			result := child.responseResult
+			child.responseResult = nil
+
+			switch result {
+			case nil:
+				break
+			case errTimedOut:
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
+					bc.broker.ID(), child.topic, child.partition)
+				delete(bc.subscriptions, child)
+			case ErrOffsetOutOfRange:
+				// there's no point in retrying this; it will just fail the same way again,
+				// so shut it down and force the user to choose what to do
+				child.sendError(result)
+				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
+				close(child.trigger)
+				delete(bc.subscriptions, child)
+			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+				// not a fatal error, but does need redispatching
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+					bc.broker.ID(), child.topic, child.partition, result)
+				child.trigger <- none{}
+				delete(bc.subscriptions, child)
+			default:
+				// unrecognized error; tell the user and try redispatching
+				child.sendError(result)
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
-				child.dispatchReason = nil
+					bc.broker.ID(), child.topic, child.partition, result)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			}
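
From the user's side, the fatal and default cases above surface on the partition consumer's Errors channel once Consumer.Return.Errors is enabled. A usage sketch, assuming a placeholder broker address and topic:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true // surface ConsumerErrors instead of only logging them

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) // placeholder broker
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest) // placeholder topic
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for {
		select {
		case msg, ok := <-pc.Messages():
			if !ok {
				return // channel closes after the partition consumer shuts down
			}
			log.Printf("offset %d", msg.Offset) // process promptly, or raise MaxProcessingTime
		case cerr, ok := <-pc.Errors():
			if !ok {
				return
			}
			// e.g. ErrOffsetOutOfRange arrives here before the partition
			// consumer shuts itself down; other errors precede a redispatch.
			log.Println("consume error:", cerr)
		}
	}
}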

+ 1 - 0
consumer_test.go

@@ -390,6 +390,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
 	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
 	leader.Returns(fetchResponse)
+	leader.Returns(fetchResponse)
 
 	safeClose(t, c1)
 	safeClose(t, c0)