|
|
@@ -4,6 +4,7 @@ import (
|
|
|
"encoding/binary"
|
|
|
"io"
|
|
|
"net"
|
|
|
+ "sarama/types"
|
|
|
"strconv"
|
|
|
"testing"
|
|
|
)
|
|
|
@@ -79,7 +80,7 @@ func FakeKafkaServer(t *testing.T, responses <-chan []byte) (int32, <-chan bool,
|
|
|
if response == nil {
|
|
|
continue
|
|
|
}
|
|
|
- binary.BigEndian.PutUint32(resHeader, uint32(len(response)))
|
|
|
+ binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
|
|
|
binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
|
|
|
_, err = conn.Write(resHeader)
|
|
|
if err != nil {
|
|
|
@@ -171,7 +172,7 @@ func TestBrokerID(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func TestBrokerConnectClose(t *testing.T) {
|
|
|
+func TestSimpleBrokerCommunication(t *testing.T) {
|
|
|
responses := make(chan []byte)
|
|
|
port, done, err := FakeKafkaServer(t, responses)
|
|
|
if err != nil {
|
|
|
@@ -184,8 +185,43 @@ func TestBrokerConnectClose(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+
|
|
|
+ for _, tt := range brokerTestTable {
|
|
|
+ go func() {
|
|
|
+ responses <- tt.response
|
|
|
+ }()
|
|
|
+ tt.runner(t, broker)
|
|
|
+ }
|
|
|
+
|
|
|
err = broker.Close()
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+var brokerTestTable = []struct {
|
|
|
+ response []byte
|
|
|
+ runner func(*testing.T, *Broker)
|
|
|
+}{
|
|
|
+ {[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
|
|
|
+ func(t *testing.T, broker *Broker) {
|
|
|
+ request := MetadataRequest{}
|
|
|
+ _, err := broker.GetMetadata("clientID", &request)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ }},
|
|
|
+
|
|
|
+ {nil,
|
|
|
+ func(t *testing.T, broker *Broker) {
|
|
|
+ request := ProduceRequest{}
|
|
|
+ request.RequiredAcks = types.NO_RESPONSE
|
|
|
+ response, err := broker.Produce("clientID", &request)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ if response != nil {
|
|
|
+ t.Error("Produce request with NO_RESPONSE got a response!")
|
|
|
+ }
|
|
|
+ }},
|
|
|
+}
|