浏览代码

Fix block on control messages

Move the skip for those block to after the point where we've
parsed/incremented the offset so we don't get stuck on a response
containing *only* control messages.
Evan Huus 6 年之前
父节点
当前提交
807de26ba5
共有 1 个文件被更改,包括 3 次插入4 次删除
  1. 3 4
      consumer.go

+ 3 - 4
consumer.go

@@ -579,10 +579,6 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 	messages := []*ConsumerMessage{}
 	for _, records := range block.RecordsSet {
-		if control, err := records.isControl(); err != nil || control {
-			continue
-		}
-
 		switch records.recordsType {
 		case legacyRecords:
 			messageSetMessages, err := child.parseMessages(records.MsgSet)
@@ -596,6 +592,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 			if err != nil {
 				return nil, err
 			}
+			if control, err := records.isControl(); err != nil || control {
+				continue
+			}
 
 			messages = append(messages, recordBatchMessages...)
 		default: