|
|
@@ -9,19 +9,19 @@ import (
|
|
|
)
|
|
|
|
|
|
// FakeKafkaServer is a mock helper for testing the Broker and other higher-level APIs.
|
|
|
-// It takes a testing.T as provided by the test framework, a set of responses, and a done channel.
|
|
|
+// It takes a testing.T as provided by the test framework, a channel of responses, and a done channel.
|
|
|
// It spawns a TCP server on a kernel-selected localhost port, then spawns a goroutine that reads Kafka requests
|
|
|
// from that port and returns each provided response in order (if a response is nil, nothing is sent).
|
|
|
// It returns the port on which it is listening or an error (if an error is returned, the goroutine is not spawned,
|
|
|
// if an error occurs *in* the goroutine it is simply logged to the testing.T and the goroutine exits).
|
|
|
-// When the goroutine finishes, it closes the done channel. The using test must read from the done channel as its
|
|
|
-// last step to ensure the number of requests and provided responses lined up correctly.
|
|
|
+// When the responses channel closes, it closes the done channel and exits. The calling test must read from the done
|
|
|
+// channel as its last step to ensure the number of requests and provided responses lined up correctly.
|
|
|
//
|
|
|
// When running tests with this, it is strongly recommended to specify a -timeout to `go test` so that if the test hangs
|
|
|
// waiting for a response, it automatically panics.
|
|
|
//
|
|
|
// It is not necessary to prefix message length to your response bytes, the server does that automatically as a convenience.
|
|
|
-func FakeKafkaServer(t *testing.T, responses [][]byte, done chan<- bool) (int32, error) {
|
|
|
+func FakeKafkaServer(t *testing.T, responses <-chan []byte, done chan<- bool) (int32, error) {
|
|
|
ln, err := net.Listen("tcp", "localhost:0")
|
|
|
if err != nil {
|
|
|
return 0, err
|
|
|
@@ -44,7 +44,7 @@ func FakeKafkaServer(t *testing.T, responses [][]byte, done chan<- bool) (int32,
|
|
|
ln.Close()
|
|
|
return
|
|
|
}
|
|
|
- for _, response := range responses {
|
|
|
+ for response := range responses {
|
|
|
header := make([]byte, 4)
|
|
|
_, err := io.ReadFull(conn, header)
|
|
|
if err != nil {
|
|
|
@@ -152,7 +152,8 @@ func TestBrokerID(t *testing.T) {
|
|
|
|
|
|
func TestBrokerConnectClose(t *testing.T) {
|
|
|
done := make(chan bool)
|
|
|
- port, err := FakeKafkaServer(t, [][]byte{}, done)
|
|
|
+ responses := make(chan []byte)
|
|
|
+ port, err := FakeKafkaServer(t, responses, done)
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
return
|
|
|
@@ -168,5 +169,6 @@ func TestBrokerConnectClose(t *testing.T) {
|
|
|
t.Error(err)
|
|
|
return
|
|
|
}
|
|
|
+ close(responses)
|
|
|
<-done
|
|
|
}
|