瀏覽代碼

Merge pull request #1006 from SamiHiltunen/fetch-based-on-last-delta

partition consumer offset to last offset delta
Evan Huus 8 年之前
父節點
當前提交
b1433c2924
共有 4 個文件被更改,包括 38 次插入3 次删除
  1. 8 1
      consumer.go
  2. 16 2
      consumer_test.go
  3. 4 0
      errors.go
  4. 10 0
      fetch_response.go

+ 8 - 1
consumer.go

@@ -523,6 +523,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 	var messages []*ConsumerMessage
 	var incomplete bool
 	prelude := true
+	originalOffset := child.offset
 
 	for _, rec := range batch.Records {
 		offset := batch.FirstOffset + rec.OffsetDelta
@@ -547,9 +548,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 		}
 	}
 
-	if incomplete || len(messages) == 0 {
+	if incomplete {
 		return nil, ErrIncompleteResponse
 	}
+
+	child.offset = batch.FirstOffset + int64(batch.LastOffsetDelta) + 1
+	if child.offset <= originalOffset {
+		return nil, ErrConsumerOffsetNotAdvanced
+	}
+
 	return messages, nil
 }
 

+ 16 - 2
consumer_test.go

@@ -389,10 +389,12 @@ func TestConsumerExtraOffsets(t *testing.T) {
 	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
 	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
 	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
+	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
 	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
 	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
 		var offsetResponseVersion int16
 		cfg := NewConfig()
+		cfg.Consumer.Return.Errors = true
 		if fetchResponse1.Version >= 4 {
 			cfg.Version = V0_11_0_0
 			offsetResponseVersion = 1
@@ -426,8 +428,19 @@ func TestConsumerExtraOffsets(t *testing.T) {
 
 		// Then: messages with offsets 1 and 2 are not returned even though they
 		// are present in the response.
-		assertMessageOffset(t, <-consumer.Messages(), 3)
-		assertMessageOffset(t, <-consumer.Messages(), 4)
+		select {
+		case msg := <-consumer.Messages():
+			assertMessageOffset(t, msg, 3)
+		case err := <-consumer.Errors():
+			t.Fatal(err)
+		}
+
+		select {
+		case msg := <-consumer.Messages():
+			assertMessageOffset(t, msg, 4)
+		case err := <-consumer.Errors():
+			t.Fatal(err)
+		}
 
 		safeClose(t, consumer)
 		safeClose(t, master)
@@ -490,6 +503,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
 	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
 	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
 	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
+	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
 	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
 	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
 		var offsetResponseVersion int16

+ 4 - 0
errors.go

@@ -37,6 +37,10 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process
 // ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
 var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
 
+// ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing
+// a RecordBatch.
+var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
+
 // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 type PacketEncodingError struct {

+ 10 - 0
fetch_response.go

@@ -309,6 +309,16 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
 	batch.addRecord(rec)
 }
 
+func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
+	frb := r.getOrCreateBlock(topic, partition)
+	batch := frb.Records.recordBatch
+	if batch == nil {
+		batch = &RecordBatch{Version: 2}
+		frb.Records = newDefaultRecords(batch)
+	}
+	batch.LastOffsetDelta = offset
+}
+
 func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
 	frb := r.getOrCreateBlock(topic, partition)
 	frb.LastStableOffset = offset