package kafka import ( "encoding/binary" "sarama/mock" "testing" ) func TestSimpleConsumer(t *testing.T) { masterResponses := make(chan []byte, 1) extraResponses := make(chan []byte) mockBroker := mock.NewBroker(t, masterResponses) mockExtra := mock.NewBroker(t, extraResponses) defer mockBroker.Close() defer mockExtra.Close() // return the extra mock as another available broker response := []byte{ 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port())) masterResponses <- response go func() { for i := 0; i < 10; i++ { msg := []byte{ 0x00, 0x00, 0x00, 0x01, 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x1C, // messageSet 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, // message 0x23, 0x96, 0x4a, 0xf7, // CRC 0x00, 0x00, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x02, 0x00, 0xEE} binary.BigEndian.PutUint64(msg[35:], uint64(i)) extraResponses <- msg } extraResponses <- []byte{ 0x00, 0x00, 0x00, 0x01, 0x00, 0x07, 'm', 'y', 'T', 'o', 'p', 'i', 'c', 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} }() client, err := NewClient("clientID", "localhost", mockBroker.Port()) if err != nil { t.Fatal(err) } consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup") if err != nil { t.Fatal(err) } for i := 0; i < 10; i++ { select { case msg := <-consumer.Messages(): if msg.Offset != int64(i) { t.Error("Incorrect message offset!") } case err := <-consumer.Errors(): t.Error(err) } } consumer.Close() client.Close() }