Ver código fonte

Simplify messageSet and recordBatch parsing

Maxim Vladimirskiy 7 anos atrás
pai
commit
973e75ca79
1 arquivos alterados com 25 adições e 52 exclusões
  1. 25 52
      consumer.go

+ 25 - 52
consumer.go

@@ -482,9 +482,6 @@ feederLoop:
 
 func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
 	var messages []*ConsumerMessage
-	var incomplete bool
-	prelude := true
-
 	for _, msgBlock := range msgSet.Messages {
 		for _, msg := range msgBlock.Messages() {
 			offset := msg.Offset
@@ -492,29 +489,22 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
 				offset += baseOffset
 			}
-			if prelude && offset < child.offset {
+			if offset < child.offset {
 				continue
 			}
-			prelude = false
-
-			if offset >= child.offset {
-				messages = append(messages, &ConsumerMessage{
-					Topic:          child.topic,
-					Partition:      child.partition,
-					Key:            msg.Msg.Key,
-					Value:          msg.Msg.Value,
-					Offset:         offset,
-					Timestamp:      msg.Msg.Timestamp,
-					BlockTimestamp: msgBlock.Msg.Timestamp,
-				})
-				child.offset = offset + 1
-			} else {
-				incomplete = true
-			}
+			messages = append(messages, &ConsumerMessage{
+				Topic:          child.topic,
+				Partition:      child.partition,
+				Key:            msg.Msg.Key,
+				Value:          msg.Msg.Value,
+				Offset:         offset,
+				Timestamp:      msg.Msg.Timestamp,
+				BlockTimestamp: msgBlock.Msg.Timestamp,
+			})
+			child.offset = offset + 1
 		}
 	}
-
-	if incomplete && len(messages) == 0 {
+	if len(messages) == 0 {
 		return nil, ErrIncompleteResponse
 	}
 	return messages, nil
@@ -522,42 +512,25 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 
 func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
 	var messages []*ConsumerMessage
-	var incomplete bool
-	prelude := true
-	originalOffset := child.offset
-
 	for _, rec := range batch.Records {
 		offset := batch.FirstOffset + rec.OffsetDelta
-		if prelude && offset < child.offset {
+		if offset < child.offset {
 			continue
 		}
-		prelude = false
-
-		if offset >= child.offset {
-			messages = append(messages, &ConsumerMessage{
-				Topic:     child.topic,
-				Partition: child.partition,
-				Key:       rec.Key,
-				Value:     rec.Value,
-				Offset:    offset,
-				Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
-				Headers:   rec.Headers,
-			})
-			child.offset = offset + 1
-		} else {
-			incomplete = true
-		}
-	}
-
-	if incomplete && len(messages) == 0 {
+		messages = append(messages, &ConsumerMessage{
+			Topic:     child.topic,
+			Partition: child.partition,
+			Key:       rec.Key,
+			Value:     rec.Value,
+			Offset:    offset,
+			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+			Headers:   rec.Headers,
+		})
+		child.offset = offset + 1
+	}
+	if len(messages) == 0 {
 		return nil, ErrIncompleteResponse
 	}
-
-	child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
-	if child.offset <= originalOffset {
-		return nil, ErrConsumerOffsetNotAdvanced
-	}
-
 	return messages, nil
 }