|
|
@@ -0,0 +1,56 @@
|
|
|
+package kafka
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/binary"
|
|
|
+ "sarama/mock"
|
|
|
+ "testing"
|
|
|
+)
|
|
|
+
|
|
|
+func TestSimpleConsumer(t *testing.T) {
|
|
|
+ masterResponses := make(chan []byte, 1)
|
|
|
+ extraResponses := make(chan []byte, 2)
|
|
|
+ mockBroker := mock.NewBroker(t, masterResponses)
|
|
|
+ mockExtra := mock.NewBroker(t, extraResponses)
|
|
|
+ defer mockBroker.Close()
|
|
|
+ defer mockExtra.Close()
|
|
|
+
|
|
|
+ // return the extra mock as another available broker
|
|
|
+ response := []byte{
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00,
|
|
|
+ 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x00}
|
|
|
+ binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
|
|
|
+ masterResponses <- response
|
|
|
+ extraResponses <- nil
|
|
|
+ extraResponses <- []byte{
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c',
|
|
|
+ 0x00, 0x00, 0x00, 0x01,
|
|
|
+ 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
|
|
+ 0x00, 0x00, 0x00, 0x00}
|
|
|
+
|
|
|
+ client, err := NewClient("clientID", "localhost", mockBroker.Port())
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup")
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ consumer.Close()
|
|
|
+ client.Close()
|
|
|
+}
|