浏览代码

Improve consumer test to actually consume messages

Evan Huus 12 年之前
父节点
当前提交
927944d928
共有 1 个文件被更改,包括 43 次插入10 次删除
  1. 43 10
      kafka/consumer_test.go

+ 43 - 10
kafka/consumer_test.go

@@ -8,7 +8,7 @@ import (
 
 func TestSimpleConsumer(t *testing.T) {
 	masterResponses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
+	extraResponses := make(chan []byte)
 	mockBroker := mock.NewBroker(t, masterResponses)
 	mockExtra := mock.NewBroker(t, extraResponses)
 	defer mockBroker.Close()
@@ -31,15 +31,37 @@ func TestSimpleConsumer(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00}
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	masterResponses <- response
-	extraResponses <- nil
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
+	go func() {
+		for i:=0; i<10; i++ {
+			msg := []byte{
+				0x00, 0x00, 0x00, 0x01,
+				0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
+				0x00, 0x00, 0x00, 0x01,
+				0x00, 0x00, 0x00, 0x00,
+				0x00, 0x00,
+				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+				0x00, 0x00, 0x00, 0x1C,
+				// messageSet
+				0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+				0x00, 0x00, 0x00, 0x10,
+				// message
+				0x23, 0x96, 0x4a, 0xf7, // CRC
+				0x00,
+				0x00,
+				0xFF, 0xFF, 0xFF, 0xFF,
+				0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
+			binary.BigEndian.PutUint64(msg[35:], uint64(i))
+			extraResponses <- msg
+		}
+		extraResponses <- []byte{
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
+			0x00, 0x00, 0x00, 0x01,
+			0x00, 0x00, 0x00, 0x00,
+			0x00, 0x00,
+			0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+			0x00, 0x00, 0x00, 0x00}
+	}()
 
 	client, err := NewClient("clientID", "localhost", mockBroker.Port())
 	if err != nil {
@@ -51,6 +73,17 @@ func TestSimpleConsumer(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	for i:=0; i<10; i++ {
+		select {
+		case msg := <-consumer.Messages():
+			if msg.Offset != int64(i) {
+				t.Error("Incorrect message offset!")
+			}
+		case err := <-consumer.Errors():
+			t.Error(err)
+		}
+	}
+
 	consumer.Close()
 	client.Close()
 }