|
|
@@ -8,7 +8,20 @@ import (
|
|
|
"testing"
|
|
|
)
|
|
|
|
|
|
-func fakeTCPServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, error) {
|
|
|
+// FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
|
|
|
+// It takes a testing.T as provided by the test framework, a set of responses, and a done channel.
|
|
|
+// It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
|
|
|
+// from that port and returns each provided response in order (if a response is nil, nothing is sent).
|
|
|
+// It returns the port on which it is listening or an error (if an error is returned, the goroutine is not spawned,
|
|
|
+// if an error occurs *in* the goroutine it is simply logged to the testing.T and the goroutine exits).
|
|
|
+// When the goroutine finishes, it closes the done channel. The using test must read from the done channel as its
|
|
|
+// last step to ensure the number of requests and provided responses lined up correctly.
|
|
|
+//
|
|
|
+// When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
|
|
|
+// waiting for a response, it automatically panics.
|
|
|
+//
|
|
|
+// It is not necessary to prefix message length to your response bytes, the server does that automatically as a convenience.
|
|
|
+func FakeKafkaServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, error) {
|
|
|
ln, err := net.Listen("tcp", "localhost:0")
|
|
|
if err != nil {
|
|
|
return 0, err
|
|
|
@@ -23,12 +36,12 @@ func fakeTCPServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, e
|
|
|
}
|
|
|
port := int32(tmp)
|
|
|
go func() {
|
|
|
+ defer close(done)
|
|
|
conn, err := ln.Accept()
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
conn.Close()
|
|
|
ln.Close()
|
|
|
- done<- true
|
|
|
return
|
|
|
}
|
|
|
for _, response := range responses {
|
|
|
@@ -38,7 +51,6 @@ func fakeTCPServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, e
|
|
|
t.Error(err)
|
|
|
conn.Close()
|
|
|
ln.Close()
|
|
|
- done<- true
|
|
|
return
|
|
|
}
|
|
|
body := make([]byte, binary.BigEndian.Uint32(header))
|
|
|
@@ -47,7 +59,17 @@ func fakeTCPServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, e
|
|
|
t.Error(err)
|
|
|
conn.Close()
|
|
|
ln.Close()
|
|
|
- done<- true
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if response == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ binary.BigEndian.PutUint32(header, uint32(len(response)))
|
|
|
+ _, err = conn.Write(header)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ conn.Close()
|
|
|
+ ln.Close()
|
|
|
return
|
|
|
}
|
|
|
_, err = conn.Write(response)
|
|
|
@@ -55,7 +77,6 @@ func fakeTCPServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, e
|
|
|
t.Error(err)
|
|
|
conn.Close()
|
|
|
ln.Close()
|
|
|
- done<- true
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
@@ -63,16 +84,13 @@ func fakeTCPServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, e
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
ln.Close()
|
|
|
- done<- true
|
|
|
return
|
|
|
}
|
|
|
err = ln.Close()
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
- done<- true
|
|
|
return
|
|
|
}
|
|
|
- done<- true
|
|
|
}()
|
|
|
return port, nil
|
|
|
}
|
|
|
@@ -134,7 +152,7 @@ func TestBrokerID(t *testing.T) {
|
|
|
|
|
|
func TestBrokerConnectClose(t *testing.T) {
|
|
|
done := make(chan bool)
|
|
|
- port, err := fakeTCPServer(t, [][]byte{}, done)
|
|
|
+ port, err := FakeKafkaServer(t, [][]byte{}, done)
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
return
|