Browse Source

Add sleep capability to mock broker

Evan Huus 11 years ago
parent
commit
3e5f3e4441
2 changed files with 9 additions and 3 deletions
  1. 8 2
      mock/broker.go
  2. 1 1
      protocol/broker_test.go

+ 8 - 2
mock/broker.go

@@ -12,11 +12,13 @@ import (
 	"net"
 	"strconv"
 	"testing"
+	"time"
 )
 
 // Broker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
 // accepts a single connection. It reads Kafka requests from that connection and returns each response
-// from the channel provided at creation-time (if a response is nil, nothing is sent).
+// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
+// the server sleeps for 250ms instead of reading a request).
 //
 // When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
 // waiting for a response, the test panics.
@@ -55,6 +57,10 @@ func (b *Broker) serverLoop() {
 	reqHeader := make([]byte, 4)
 	resHeader := make([]byte, 8)
 	for response := range b.responses {
+		if response == nil {
+			time.Sleep(250 * time.Millisecond)
+			continue
+		}
 		_, err := io.ReadFull(conn, reqHeader)
 		if err != nil {
 			b.t.Error(err)
@@ -76,7 +82,7 @@ func (b *Broker) serverLoop() {
 			b.listener.Close()
 			return
 		}
-		if response == nil {
+		if len(response) == 0 {
 			continue
 		}
 		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))

+ 1 - 1
protocol/broker_test.go

@@ -122,7 +122,7 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{nil,
+	{[]byte{},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = types.NO_RESPONSE