|
|
@@ -27,6 +27,8 @@ type Broker struct {
|
|
|
port int32
|
|
|
stopper chan bool
|
|
|
responses chan []byte
|
|
|
+ listener net.Listener
|
|
|
+ t *testing.T
|
|
|
}
|
|
|
|
|
|
// Port is the kernel-select TCP port the broker is listening on.
|
|
|
@@ -41,14 +43,86 @@ func (b *Broker) Close() {
|
|
|
<-b.stopper
|
|
|
}
|
|
|
|
|
|
+func (b *Broker) serverLoop() {
|
|
|
+ defer close(b.stopper)
|
|
|
+ conn, err := b.listener.Accept()
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ conn.Close()
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ reqHeader := make([]byte, 4)
|
|
|
+ resHeader := make([]byte, 8)
|
|
|
+ for response := range b.responses {
|
|
|
+ _, err := io.ReadFull(conn, reqHeader)
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ conn.Close()
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ body := make([]byte, binary.BigEndian.Uint32(reqHeader))
|
|
|
+ if len(body) < 10 {
|
|
|
+ b.t.Error("Kafka request too short.")
|
|
|
+ conn.Close()
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ _, err = io.ReadFull(conn, body)
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ conn.Close()
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if response == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
|
|
|
+ binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
|
|
|
+ _, err = conn.Write(resHeader)
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ conn.Close()
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ _, err = conn.Write(response)
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ conn.Close()
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ err = conn.Close()
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ b.listener.Close()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ err = b.listener.Close()
|
|
|
+ if err != nil {
|
|
|
+ b.t.Error(err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// NewBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
|
|
|
// If an error occurs it is simply logged to the testing.T and the broker exits.
|
|
|
func NewBroker(t *testing.T, responses chan []byte) *Broker {
|
|
|
- ln, err := net.Listen("tcp", "localhost:0")
|
|
|
+ var err error
|
|
|
+
|
|
|
+ broker := new(Broker)
|
|
|
+ broker.stopper = make(chan bool)
|
|
|
+ broker.responses = responses
|
|
|
+
|
|
|
+ broker.listener, err = net.Listen("tcp", "localhost:0")
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- _, portStr, err := net.SplitHostPort(ln.Addr().String())
|
|
|
+ _, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
@@ -56,74 +130,9 @@ func NewBroker(t *testing.T, responses chan []byte) *Broker {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- broker := new(Broker)
|
|
|
broker.port = int32(tmp)
|
|
|
- broker.stopper = make(chan bool)
|
|
|
- broker.responses = responses
|
|
|
- go func() {
|
|
|
- defer close(broker.stopper)
|
|
|
- conn, err := ln.Accept()
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- conn.Close()
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- reqHeader := make([]byte, 4)
|
|
|
- resHeader := make([]byte, 8)
|
|
|
- for response := range responses {
|
|
|
- _, err := io.ReadFull(conn, reqHeader)
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- conn.Close()
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- body := make([]byte, binary.BigEndian.Uint32(reqHeader))
|
|
|
- if len(body) < 10 {
|
|
|
- t.Error("Kafka request too short.")
|
|
|
- conn.Close()
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- _, err = io.ReadFull(conn, body)
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- conn.Close()
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- if response == nil {
|
|
|
- continue
|
|
|
- }
|
|
|
- binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
|
|
|
- binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
|
|
|
- _, err = conn.Write(resHeader)
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- conn.Close()
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- _, err = conn.Write(response)
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- conn.Close()
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- }
|
|
|
- err = conn.Close()
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- ln.Close()
|
|
|
- return
|
|
|
- }
|
|
|
- err = ln.Close()
|
|
|
- if err != nil {
|
|
|
- t.Error(err)
|
|
|
- return
|
|
|
- }
|
|
|
- }()
|
|
|
+
|
|
|
+ go broker.serverLoop()
|
|
|
+
|
|
|
return broker
|
|
|
}
|