|
|
@@ -9,33 +9,40 @@ import (
|
|
|
)
|
|
|
|
|
|
// FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
|
|
|
-// It takes a testing.T as provided by the test framework, a channel of responses, and a done channel.
|
|
|
+// It takes a testing.T as provided by the test framework and a channel of responses to use.
|
|
|
// It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
|
|
|
// from that port and returns each provided response in order (if a response is nil, nothing is sent).
|
|
|
-// It returns the port on which it is listening or an error (if an error is returned, the goroutine is not spawned,
|
|
|
-// if an error occurs *in* the goroutine it is simply logged to the testing.T and the goroutine exits).
|
|
|
-// When the responses channel closes, it closes the done channel and exits. The calling test must read from the done
|
|
|
-// channel as its last step to ensure the number of requests and provided responses lined up correctly.
|
|
|
+// When the server is successfully created, it returns the port on which it is listening and a 'done' channel
|
|
|
+// which it will close when it exits. Otherwise it will return an error (if an error occurs *in* the goroutine it
|
|
|
+// is simply logged to the testing.T and the goroutine exits). There is also a StopFakeServer helper that leads to
|
|
|
+// this recommended pattern in tests:
|
|
|
//
|
|
|
-// When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
|
|
|
+// port, done, err := FakeKafkaServer(t, responses)
|
|
|
+// if err != nil {
|
|
|
+// t.Fatal(err)
|
|
|
+// }
|
|
|
+// defer StopFakeServer(responses, done)
|
|
|
+//
|
|
|
+// When running tests like this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
|
|
|
// waiting for a response, it automatically panics.
|
|
|
//
|
|
|
// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
|
|
|
// automatically as a convenience.
|
|
|
-func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (int32, error) {
|
|
|
+func FakeKafkaServer(t *testing.T, responses <-chan []byte) (int32, <-chan bool, error) {
|
|
|
ln, err := net.Listen("tcp", "localhost:0")
|
|
|
if err != nil {
|
|
|
- return 0, err
|
|
|
+ return 0, nil, err
|
|
|
}
|
|
|
_, portStr, err := net.SplitHostPort(ln.Addr().String())
|
|
|
if err != nil {
|
|
|
- return 0, err
|
|
|
+ return 0, nil, err
|
|
|
}
|
|
|
tmp, err := strconv.ParseInt(portStr, 10, 32)
|
|
|
if err != nil {
|
|
|
- return 0, err
|
|
|
+ return 0, nil, err
|
|
|
}
|
|
|
port := int32(tmp)
|
|
|
+ done := make(chan bool)
|
|
|
go func() {
|
|
|
defer close(done)
|
|
|
conn, err := ln.Accept()
|
|
|
@@ -101,7 +108,12 @@ func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (i
|
|
|
return
|
|
|
}
|
|
|
}()
|
|
|
- return port, nil
|
|
|
+ return port, done, nil
|
|
|
+}
|
|
|
+
|
|
|
+func StopFakeServer(responses chan []byte, done <-chan bool) {
|
|
|
+ close(responses)
|
|
|
+ <-done
|
|
|
}
|
|
|
|
|
|
func TestBrokerEquals(t *testing.T) {
|
|
|
@@ -160,24 +172,20 @@ func TestBrokerID(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestBrokerConnectClose(t *testing.T) {
|
|
|
- done := make(chan bool)
|
|
|
responses := make(chan []byte)
|
|
|
- port, err := FakeKafkaServer(t, responses, done)
|
|
|
+ port, done, err := FakeKafkaServer(t, responses)
|
|
|
if err != nil {
|
|
|
- t.Error(err)
|
|
|
- return
|
|
|
+ t.Fatal(err)
|
|
|
}
|
|
|
+ defer StopFakeServer(responses, done)
|
|
|
+
|
|
|
broker := NewBroker("localhost", port)
|
|
|
err = broker.Connect()
|
|
|
if err != nil {
|
|
|
- t.Error(err)
|
|
|
- return
|
|
|
+ t.Fatal(err)
|
|
|
}
|
|
|
err = broker.Close()
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
- return
|
|
|
}
|
|
|
- close(responses)
|
|
|
- <-done
|
|
|
}
|