Pārlūkot izejas kodu

Feed consumer responses to the user asynchronously

Give each PartitionConsumer its own goroutine whose job is to feed responses to
the user, and have the brokerConsumer hand each fetch response to every
subscribed partition, then wait for all of them to acknowledge it.

This makes the consumer much more resilient to weird user consumption ordering,
as the old "one partition at a time" logic was very unforgiving.
Evan Huus 10 gadi atpakaļ
vecāks
revīzija
0e02258ac8
1 mainīts fails ar 51 papildinājumu un 29 dzēšanām
  1. 51 29
      consumer.go

+ 51 - 29
consumer.go

@@ -124,8 +124,9 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 		partition: partition,
 		partition: partition,
 		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
 		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
 		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
 		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
+		feeder:    make(chan *FetchResponse, 1),
 		trigger:   make(chan none, 1),
 		trigger:   make(chan none, 1),
-		dying:     make(chan none),
+		dying:     make(chan error, 1),
 		fetchSize: c.conf.Consumer.Fetch.Default,
 		fetchSize: c.conf.Consumer.Fetch.Default,
 	}
 	}
 
 
@@ -144,6 +145,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
 	}
 	}
 
 
 	go withRecover(child.dispatcher)
 	go withRecover(child.dispatcher)
+	go withRecover(child.responseFeeder)
 
 
 	child.broker = c.refBrokerConsumer(leader)
 	child.broker = c.refBrokerConsumer(leader)
 	child.broker.input <- child
 	child.broker.input <- child
@@ -259,10 +261,12 @@ type partitionConsumer struct {
 	topic     string
 	topic     string
 	partition int32
 	partition int32
 
 
-	broker         *brokerConsumer
-	messages       chan *ConsumerMessage
-	errors         chan *ConsumerError
-	trigger, dying chan none
+	broker   *brokerConsumer
+	messages chan *ConsumerMessage
+	errors   chan *ConsumerError
+	feeder   chan *FetchResponse
+	trigger  chan none
+	dying    chan error
 
 
 	fetchSize           int32
 	fetchSize           int32
 	offset              int64
 	offset              int64
@@ -306,8 +310,7 @@ func (child *partitionConsumer) dispatcher() {
 		child.consumer.unrefBrokerConsumer(child.broker)
 		child.consumer.unrefBrokerConsumer(child.broker)
 	}
 	}
 	child.consumer.removeChild(child)
 	child.consumer.removeChild(child)
-	close(child.messages)
-	close(child.errors)
+	close(child.feeder)
 }
 }
 
 
 func (child *partitionConsumer) dispatch() error {
 func (child *partitionConsumer) dispatch() error {
@@ -361,11 +364,11 @@ func (child *partitionConsumer) Errors() <-chan *ConsumerError {
 }
 }
 
 
 func (child *partitionConsumer) AsyncClose() {
 func (child *partitionConsumer) AsyncClose() {
-	// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
+	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
 	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
 	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
 	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
 	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
 	// also just close itself)
 	// also just close itself)
-	close(child.dying)
+	child.dying <- nil
 }
 }
 
 
 func (child *partitionConsumer) Close() error {
 func (child *partitionConsumer) Close() error {
@@ -392,6 +395,33 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 	return atomic.LoadInt64(&child.highWaterMarkOffset)
 	return atomic.LoadInt64(&child.highWaterMarkOffset)
 }
 }
 
 
+func (child *partitionConsumer) responseFeeder() { // goroutine body: handles every FetchResponse received on child.feeder until that channel is closed
+	for response := range child.feeder { // responses are sent on feeder by the brokerConsumer's subscription loop
+		switch err := child.handleResponse(response); err {
+		case nil:
+			break // nothing to report; only the ack after the switch is needed
+		case ErrOffsetOutOfRange:
+			// there's no point in retrying this; it will just fail the same way again,
+			// so shut it down and force the user to choose what to do
+			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
+			child.sendError(err)
+			child.AsyncClose() // sends nil on child.dying
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+			// these three are not fatal errors, but do require redispatching
+			child.dying <- err // a non-nil value on dying makes the broker fire child.trigger instead of closing it
+		default:
+			// dunno, tell the user and try redispatching
+			child.sendError(err)
+			child.dying <- err // NOTE(review): dying is buffered with capacity 1; confirm it cannot already be full here, or this send would block before the ack below
+		}
+
+		child.broker.acks.Done() // ack this response: the broker calls acks.Wait() after feeding all of its subscriptions
+	}
+
+	close(child.messages) // reached only after feeder has been closed and drained; the user-facing channels are closed here
+	close(child.errors)
+}
+
 func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 	block := response.GetBlock(child.topic, child.partition)
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
 	if block == nil {
@@ -468,6 +498,7 @@ type brokerConsumer struct {
 	newSubscriptions chan []*partitionConsumer
 	newSubscriptions chan []*partitionConsumer
 	wait             chan none
 	wait             chan none
 	subscriptions    map[*partitionConsumer]none
 	subscriptions    map[*partitionConsumer]none
+	acks             sync.WaitGroup
 	refs             int
 	refs             int
 }
 }
 
 
@@ -550,25 +581,11 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 			return
 			return
 		}
 		}
 
 
+		bc.acks.Add(len(bc.subscriptions))
 		for child := range bc.subscriptions {
 		for child := range bc.subscriptions {
-			if err := child.handleResponse(response); err != nil {
-				switch err {
-				case ErrOffsetOutOfRange:
-					// there's no point in retrying this it will just fail the same way again
-					// so shut it down and force the user to choose what to do
-					child.AsyncClose()
-					fallthrough
-				default:
-					child.sendError(err)
-					fallthrough
-				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
-					// these three are not fatal errors, but do require redispatching
-					child.trigger <- none{}
-					delete(bc.subscriptions, child)
-					Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
-				}
-			}
+			child.feeder <- response
 		}
 		}
+		bc.acks.Wait()
 	}
 	}
 }
 }
 
 
@@ -581,10 +598,15 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC
 
 
 	for child := range bc.subscriptions {
 	for child := range bc.subscriptions {
 		select {
 		select {
-		case <-child.dying:
-			close(child.trigger)
+		case err := <-child.dying:
+			if err == nil {
+				Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+				close(child.trigger)
+			} else {
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
+				child.trigger <- none{}
+			}
 			delete(bc.subscriptions, child)
 			delete(bc.subscriptions, child)
-			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
 		default:
 		default:
 		}
 		}
 	}
 	}