Enhance mock broker

Maxim Vladimirsky, 9 years ago
commit da6bc48ffa
3 changed files with 154 additions and 73 deletions
  1. async_producer_test.go (+16 -9)
  2. consumer_test.go (+3 -7)
  3. mockbroker_test.go (+135 -57)

async_producer_test.go (+16 -9)

@@ -285,7 +285,10 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 	closeProducer(t, producer)
 }
 
+// If a Kafka broker becomes unavailable and then comes back into service, the
+// producer reconnects to it and continues sending messages.
 func TestAsyncProducerBrokerBounce(t *testing.T) {
+	// Given
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
 	leaderAddr := leader.Addr()
@@ -295,30 +298,34 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+
 	config := NewConfig()
-	config.Producer.Flush.Messages = 10
+	config.Producer.Flush.Messages = 1
 	config.Producer.Return.Successes = true
 	config.Producer.Retry.Backoff = 0
 	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	leader.Returns(prodSuccess)
+	expectResults(t, producer, 1, 0)
 
-	for i := 0; i < 10; i++ {
-		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
-	}
+	// When: the broker connection gets reset (network glitch, restart, you name it).
 	leader.Close()                               // producer should get EOF
 	leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
 	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
 
-	prodSuccess := new(ProduceResponse)
-	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	// Then: a produced message goes through the new broker connection.
+	producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	leader.Returns(prodSuccess)
-	expectResults(t, producer, 10, 0)
-	seedBroker.Close()
-	leader.Close()
+	expectResults(t, producer, 1, 0)
 
 	closeProducer(t, producer)
+	seedBroker.Close()
+	leader.Close()
 }
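
With Flush.Messages set to 1, each Input() send maps to exactly one ProduceRequest, so the test can interleave sends with leader.Returns calls. A minimal sketch of that per-message handshake, using only helpers already present in this file; the wrapper name sendAndCheckOne is hypothetical and assumes the sarama package test context:

// Hypothetical helper: push a single message and satisfy the one
// ProduceRequest it triggers on the mock leader. Assumes
// config.Producer.Flush.Messages == 1 so the flush happens immediately.
func sendAndCheckOne(t *testing.T, producer AsyncProducer, leader *mockBroker) {
	prodSuccess := new(ProduceResponse)
	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)

	producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder(TestMessage)}
	leader.Returns(prodSuccess) // queue the canned response for the pending request
	expectResults(t, producer, 1, 0)
}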
 
 func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {

consumer_test.go (+3 -7)

@@ -447,13 +447,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	}
 
 	//redirect partition 1 back to main leader
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
-	tmp.Returns(fetchResponse)
 	metadataResponse = new(MetadataResponse)
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
+	tmp.Returns(fetchResponse)
 	time.Sleep(5 * time.Millisecond)
 
 	// now send one message to each partition to make sure everything is primed
@@ -493,10 +493,6 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
 	case <-c0.Errors():
 	case <-c1.Errors():
 	}
-	// send it back to the same broker
-	seedBroker.Returns(metadataResponse)
-
-	time.Sleep(5 * time.Millisecond)
 
 	select {
 	case <-c0.Messages():

mockbroker_test.go (+135 -57)

@@ -1,39 +1,66 @@
 package sarama
 
 import (
+	"bytes"
 	"encoding/binary"
-	"errors"
+	"fmt"
 	"io"
 	"net"
 	"strconv"
+	"sync"
 	"testing"
 	"time"
+
+	"github.com/davecgh/go-spew/spew"
+)
+
+const (
+	expectationTimeout = 250 * time.Millisecond
 )
 
-// mockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
-// accepts a single connection. It reads Kafka requests from that connection and returns each response
-// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
-// the server sleeps for 250ms instead of reading a request).
+type requestHandlerFunc func(req *request) (res encoder)
+
+// mockBroker is a mock Kafka broker. It consists of a TCP server on a
+// kernel-selected localhost port that can accept many connections. It reads
+// Kafka requests from those connections and passes them to the user-specified
+// handler function (see SetHandler), which generates the respective responses.
+// If no handler has been explicitly specified, the broker returns responses
+// set via the Returns function in the exact order they were provided. (If a
+// response has a length of 0, nothing is sent and the client request will
+// time out.)
 //
-// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
-// waiting for a response, the test panics.
+// When running tests with one of these, it is strongly recommended to specify
+// a timeout to `go test` so that if the broker hangs waiting for a response,
+// the test panics.
 //
-// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
-// automatically as a convenience.
+// It is not necessary to prefix message length or correlation ID to your
+// response bytes; the server does that automatically as a convenience.
 type mockBroker struct {
 	brokerID     int32
 	port         int32
-	stopper      chan bool
+	closing      chan none
+	stopper      chan none
 	expectations chan encoder
 	listener     net.Listener
 	t            *testing.T
 	latency      time.Duration
+	handler      requestHandlerFunc
+	handlerMux   sync.Mutex
 }
 
 func (b *mockBroker) SetLatency(latency time.Duration) {
 	b.latency = latency
 }
 
+// SetHandler sets the specified function as the request handler. Whenever
+// the mock broker reads a request from the wire, it passes the request to the
+// function and sends back whatever the handler returns.
+func (b *mockBroker) SetHandler(handler requestHandlerFunc) {
+	b.handlerMux.Lock()
+	b.handler = handler
+	b.handlerMux.Unlock()
+}
+
 func (b *mockBroker) BrokerID() int32 {
 	return b.brokerID
 }
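
The handler-based mode introduced above bypasses the expectation queue entirely: whatever the handler returns is encoded and written back for every request the broker reads, and returning nil sends nothing (so the client request times out). A minimal sketch of wiring it up, assuming a mock leader broker as in the producer tests and the sarama package test context:

// Sketch only: answer every request this mock broker receives with the
// same canned produce response, regardless of the request type.
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
leader.SetHandler(func(req *request) encoder {
	Logger.Printf("mock leader got request with correlation id %d", req.correlationID)
	return prodSuccess
})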
@@ -47,80 +74,129 @@ func (b *mockBroker) Addr() string {
 }
 
 func (b *mockBroker) Close() {
+	close(b.expectations)
 	if len(b.expectations) > 0 {
-		b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
+		buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
+		for e := range b.expectations {
+			_, _ = buf.WriteString(spew.Sdump(e))
+		}
+		b.t.Error(buf.String())
 	}
-	close(b.expectations)
+	close(b.closing)
 	<-b.stopper
 }
 
-func (b *mockBroker) serverLoop() (ok bool) {
-	var (
-		err  error
-		conn net.Conn
-	)
-
+func (b *mockBroker) serverLoop() {
 	defer close(b.stopper)
-	if conn, err = b.listener.Accept(); err != nil {
-		return b.serverError(err, conn)
+	var err error
+	var conn net.Conn
+
+	go func() {
+		<-b.closing
+		safeClose(b.t, b.listener)
+	}()
+
+	wg := &sync.WaitGroup{}
+	i := 0
+	for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
+		wg.Add(1)
+		go b.handleRequests(conn, i, wg)
+		i++
 	}
-	reqHeader := make([]byte, 4)
+	wg.Wait()
+	Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
+}
+
+func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
+	defer wg.Done()
+	defer func() {
+		_ = conn.Close()
+	}()
+	Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
+	var err error
+
+	abort := make(chan none)
+	defer close(abort)
+	go func() {
+		select {
+		case <-b.closing:
+			_ = conn.Close()
+		case <-abort:
+		}
+	}()
+
 	resHeader := make([]byte, 8)
-	for expectation := range b.expectations {
-		_, err = io.ReadFull(conn, reqHeader)
+	for {
+		req, err := decodeRequest(conn)
 		if err != nil {
-			return b.serverError(err, conn)
-		}
-		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
-		if len(body) < 10 {
-			return b.serverError(errors.New("Kafka request too short."), conn)
-		}
-		if _, err = io.ReadFull(conn, body); err != nil {
-			return b.serverError(err, conn)
+			Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
+			b.serverError(err)
+			break
 		}
 
 		if b.latency > 0 {
 			time.Sleep(b.latency)
 		}
 
-		response, err := encode(expectation)
+		res := b.requestHandler()(req)
+		Logger.Printf("*** mockbroker/%d/%d: served %+v -> %+v", b.brokerID, idx, req, res)
+
+		encodedRes, err := encode(res)
 		if err != nil {
-			return false
+			b.serverError(err)
+			break
 		}
-		if len(response) == 0 {
+		if len(encodedRes) == 0 {
 			continue
 		}
 
-		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
-		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
+		binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
+		binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
 		if _, err = conn.Write(resHeader); err != nil {
-			return b.serverError(err, conn)
+			b.serverError(err)
+			break
 		}
-		if _, err = conn.Write(response); err != nil {
-			return b.serverError(err, conn)
+		if _, err = conn.Write(encodedRes); err != nil {
+			b.serverError(err)
+			break
 		}
 	}
-	if err = conn.Close(); err != nil {
-		return b.serverError(err, nil)
-	}
-	if err = b.listener.Close(); err != nil {
-		b.t.Error(err)
-		return false
-	}
-	return true
+	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
 }
 
-func (b *mockBroker) serverError(err error, conn net.Conn) bool {
-	b.t.Error(err)
-	if conn != nil {
-		if err := conn.Close(); err != nil {
-			b.t.Error(err)
+func (b *mockBroker) requestHandler() requestHandlerFunc {
+	b.handlerMux.Lock()
+	defer b.handlerMux.Unlock()
+	return b.handler
+}
+
+func (b *mockBroker) defaultRequestHandler(req *request) (res encoder) {
+	select {
+	case res, ok := <-b.expectations:
+		if !ok {
+			return nil
 		}
+		return res
+	case <-time.After(expectationTimeout):
+		return nil
 	}
-	if err := b.listener.Close(); err != nil {
-		b.t.Error(err)
+}
+
+func (b *mockBroker) serverError(err error) {
+	isConnectionClosedError := false
+	if _, ok := err.(*net.OpError); ok {
+		isConnectionClosedError = true
+	} else if err == io.EOF {
+		isConnectionClosedError = true
+	} else if err.Error() == "use of closed network connection" {
+		isConnectionClosedError = true
 	}
-	return false
+
+	if isConnectionClosedError {
+		return
+	}
+
+	b.t.Errorf(err.Error())
 }
 
 // newMockBroker launches a fake Kafka broker. It takes a *testing.T as provided by the
@@ -136,17 +212,19 @@ func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
 	var err error
 
 	broker := &mockBroker{
-		stopper:      make(chan bool),
+		closing:      make(chan none),
+		stopper:      make(chan none),
 		t:            t,
 		brokerID:     brokerID,
 		expectations: make(chan encoder, 512),
 	}
+	broker.handler = broker.defaultRequestHandler
 
 	broker.listener, err = net.Listen("tcp", addr)
 	if err != nil {
 		t.Fatal(err)
 	}
-	Logger.Printf("mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
+	Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
 	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
 	if err != nil {
 		t.Fatal(err)