فهرست منبع

Move the consumer's channel send slightly

Prep for unblocking consumers that are not being drained
Evan Huus 11 سال پیش
والد
کامیت
7e4b74b20d
1فایلهای تغییر یافته به همراه18 افزوده شده و 12 حذف شده
  1. 18 12
      consumer.go

+ 18 - 12
consumer.go

@@ -401,8 +401,15 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 }
 }
 
 
 func (child *partitionConsumer) responseFeeder() {
 func (child *partitionConsumer) responseFeeder() {
+	var msgs []*ConsumerMessage
+
 	for response := range child.feeder {
 	for response := range child.feeder {
-		child.responseResult = child.handleResponse(response)
+		msgs, child.responseResult = child.parseResponse(response)
+
+		for _, msg := range msgs {
+			child.messages <- msg
+		}
+
 		child.broker.acks.Done()
 		child.broker.acks.Done()
 	}
 	}
 
 
@@ -410,14 +417,14 @@ func (child *partitionConsumer) responseFeeder() {
 	close(child.errors)
 	close(child.errors)
 }
 }
 
 
-func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
+func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
 	block := response.GetBlock(child.topic, child.partition)
 	block := response.GetBlock(child.topic, child.partition)
 	if block == nil {
 	if block == nil {
-		return ErrIncompleteResponse
+		return nil, ErrIncompleteResponse
 	}
 	}
 
 
 	if block.Err != ErrNoError {
 	if block.Err != ErrNoError {
-		return block.Err
+		return nil, block.Err
 	}
 	}
 
 
 	if len(block.MsgSet.Messages) == 0 {
 	if len(block.MsgSet.Messages) == 0 {
@@ -436,7 +443,7 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 			}
 			}
 		}
 		}
 
 
-		return nil
+		return nil, nil
 	}
 	}
 
 
 	// we got messages, reset our fetch size in case it was increased for a previous request
 	// we got messages, reset our fetch size in case it was increased for a previous request
@@ -444,8 +451,8 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
 
 
 	incomplete := false
 	incomplete := false
-	atLeastOne := false
 	prelude := true
 	prelude := true
+	var messages []*ConsumerMessage
 	for _, msgBlock := range block.MsgSet.Messages {
 	for _, msgBlock := range block.MsgSet.Messages {
 
 
 		for _, msg := range msgBlock.Messages() {
 		for _, msg := range msgBlock.Messages() {
@@ -455,14 +462,13 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 			prelude = false
 			prelude = false
 
 
 			if msg.Offset >= child.offset {
 			if msg.Offset >= child.offset {
-				atLeastOne = true
-				child.messages <- &ConsumerMessage{
+				messages = append(messages, &ConsumerMessage{
 					Topic:     child.topic,
 					Topic:     child.topic,
 					Partition: child.partition,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,
 					Key:       msg.Msg.Key,
 					Value:     msg.Msg.Value,
 					Value:     msg.Msg.Value,
 					Offset:    msg.Offset,
 					Offset:    msg.Offset,
-				}
+				})
 				child.offset = msg.Offset + 1
 				child.offset = msg.Offset + 1
 			} else {
 			} else {
 				incomplete = true
 				incomplete = true
@@ -471,10 +477,10 @@ func (child *partitionConsumer) handleResponse(response *FetchResponse) error {
 
 
 	}
 	}
 
 
-	if incomplete || !atLeastOne {
-		return ErrIncompleteResponse
+	if incomplete || len(messages) == 0 {
+		return nil, ErrIncompleteResponse
 	}
 	}
-	return nil
+	return messages, nil
 }
 }
 
 
 // brokerConsumer
 // brokerConsumer