Selaa lähdekoodia

consumer: add unit test

also fix a bunch of other bugs found by the test
Evan Huus 11 vuotta sitten
vanhempi
commit
39a066ae3d
3 muutettua tiedostoa jossa 58 lisäystä ja 15 poistoa
  1. 10 9
      consumer.go
  2. 42 1
      consumer_test.go
  3. 6 5
      fetch_response.go

+ 10 - 9
consumer.go

@@ -263,8 +263,8 @@ func (c *Consumer) fetchMessages() {
 			fetchSize = c.config.DefaultFetchSize
 		}
 
+		atLeastOne := false
 		for _, msgBlock := range block.MsgSet.Messages {
-			atLeastOne := false
 			prelude := true
 
 			for _, msg := range msgBlock.Messages() {
@@ -293,14 +293,15 @@ func (c *Consumer) fetchMessages() {
 				}
 			}
 
-			if !atLeastOne {
-				select {
-				case <-c.stopper:
-					close(c.events)
-					close(c.done)
-					return
-				case c.events <- &ConsumerEvent{Topic: c.topic, Partition: c.partition, Err: IncompleteResponse}:
-				}
+		}
+
+		if !atLeastOne {
+			select {
+			case <-c.stopper:
+				close(c.events)
+				close(c.done)
+				return
+			case c.events <- &ConsumerEvent{Topic: c.topic, Partition: c.partition, Err: IncompleteResponse}:
 			}
 		}
 	}

+ 42 - 1
consumer_test.go

@@ -46,7 +46,7 @@ func TestSimpleConsumer(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		event := <-consumer.Events()
 		if event.Err != nil {
-			t.Error(err)
+			t.Error(event.Err)
 		}
 		if event.Offset != int64(i) {
 			t.Error("Incorrect message offset!")
@@ -124,6 +124,47 @@ func TestConsumerLatestOffset(t *testing.T) {
 	}
 }
 
+func TestConsumerPrelude(t *testing.T) {
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mdr := new(MetadataResponse)
+	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
+	mdr.AddTopicPartition("my_topic", 0, 2)
+	mb1.Returns(mdr)
+
+	fr := new(FetchResponse)
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
+	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
+	mb2.Returns(fr)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	config := NewConsumerConfig()
+	config.OffsetMethod = OffsetMethodManual
+	config.OffsetValue = 1
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+	defer mb1.Close()
+	defer mb2.Close()
+
+	event := <-consumer.Events()
+	if event.Err != nil {
+		t.Error(event.Err)
+	}
+	if event.Offset != 1 {
+		t.Error("Incorrect message offset!")
+	}
+}
+
 func ExampleConsumer() {
 	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
 	if err != nil {

+ 6 - 5
fetch_response.go

@@ -137,8 +137,11 @@ func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value En
 		partitions = make(map[int32]*FetchResponseBlock)
 		fr.Blocks[topic] = partitions
 	}
-	frb := new(FetchResponseBlock)
-	partitions[partition] = frb
+	frb, ok := partitions[partition]
+	if !ok {
+		frb = new(FetchResponseBlock)
+		partitions[partition] = frb
+	}
 	var kb []byte
 	var vb []byte
 	if key != nil {
@@ -147,9 +150,7 @@ func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value En
 	if value != nil {
 		vb, _ = value.Encode()
 	}
-	var msgSet MessageSet
 	msg := &Message{Key: kb, Value: vb}
 	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
-	msgSet.Messages = append(msgSet.Messages, msgBlock)
-	frb.MsgSet = msgSet
+	frb.MsgSet.Messages = append(frb.MsgSet.Messages, msgBlock)
 }