Browse Source

converted the last of the old mockbroker test

Burke Libbey 12 years ago
parent
commit
37608277e6
3 changed files with 60 additions and 235 deletions
  1. 5 142
      broker_test.go
  2. 42 93
      client_test.go
  3. 13 0
      mockbroker/mockbroker.go

+ 5 - 142
broker_test.go

@@ -1,147 +1,11 @@
 package sarama
 
 import (
-	"encoding/binary"
 	"fmt"
-	"io"
-	"net"
-	"strconv"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
-	"time"
 )
 
-// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
-// accepts a single connection. It reads Kafka requests from that connection and returns each response
-// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
-// the server sleeps for 250ms instead of reading a request).
-//
-// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
-// waiting for a response, the test panics.
-//
-// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
-// automatically as a convenience.
-type MockBroker struct {
-	port      int32
-	stopper   chan bool
-	responses chan []byte
-	listener  net.Listener
-	t         *testing.T
-}
-
-func (b *MockBroker) Port() int32 {
-	return b.port
-}
-
-func (b *MockBroker) Addr() string {
-	return b.listener.Addr().String()
-}
-
-// Close closes the response channel originally provided, then waits to make sure
-// that all requests/responses matched up before exiting.
-func (b *MockBroker) Close() {
-	close(b.responses)
-	<-b.stopper
-}
-
-func (b *MockBroker) serverLoop() {
-	defer close(b.stopper)
-	conn, err := b.listener.Accept()
-	if err != nil {
-		b.t.Error(err)
-		conn.Close()
-		b.listener.Close()
-		return
-	}
-	reqHeader := make([]byte, 4)
-	resHeader := make([]byte, 8)
-	for response := range b.responses {
-		if response == nil {
-			time.Sleep(250 * time.Millisecond)
-			continue
-		}
-		_, err := io.ReadFull(conn, reqHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
-		if len(body) < 10 {
-			b.t.Error("Kafka request too short.")
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = io.ReadFull(conn, body)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		if len(response) == 0 {
-			continue
-		}
-		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
-		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
-		_, err = conn.Write(resHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = conn.Write(response)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-	}
-	err = conn.Close()
-	if err != nil {
-		b.t.Error(err)
-		b.listener.Close()
-		return
-	}
-	err = b.listener.Close()
-	if err != nil {
-		b.t.Error(err)
-		return
-	}
-}
-
-// NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
-// If an error occurs it is simply logged to the testing.T and the broker exits.
-func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
-	var err error
-
-	broker := new(MockBroker)
-	broker.stopper = make(chan bool)
-	broker.responses = responses
-	broker.t = t
-
-	broker.listener, err = net.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatal(err)
-	}
-	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
-	if err != nil {
-		t.Fatal(err)
-	}
-	tmp, err := strconv.ParseInt(portStr, 10, 32)
-	if err != nil {
-		t.Fatal(err)
-	}
-	broker.port = int32(tmp)
-
-	go broker.serverLoop()
-
-	return broker
-}
-
 func ExampleBroker() error {
 	broker := NewBroker("localhost:9092")
 	err := broker.Open(4)
@@ -179,11 +43,10 @@ func TestBrokerAccessors(t *testing.T) {
 }
 
 func TestSimpleBrokerCommunication(t *testing.T) {
-	responses := make(chan []byte)
-	mockBroker := NewMockBroker(t, responses)
-	defer mockBroker.Close()
+	mb := mockbroker.New(t, 0)
+	defer mb.Close()
 
-	broker := NewBroker(mockBroker.Addr())
+	broker := NewBroker(mb.Addr())
 	err := broker.Open(4)
 	if err != nil {
 		t.Fatal(err)
@@ -191,7 +54,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 
 	go func() {
 		for _, tt := range brokerTestTable {
-			responses <- tt.response
+			mb.ExpectBytes(tt.response)
 		}
 	}()
 	for _, tt := range brokerTestTable {

+ 42 - 93
client_test.go

@@ -1,80 +1,57 @@
 package sarama
 
 import (
-	"encoding/binary"
+	"github.com/Shopify/sarama/mockbroker"
 	"testing"
 )
 
 func TestSimpleClient(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	defer mockBroker.Close()
 
-	// Only one response needed, an empty metadata response
-	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
+	mb := mockbroker.New(t, 1)
 
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+	mb.ExpectMetadataRequest()
+
+	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client.Close()
+	defer client.Close()
+	defer mb.Close()
 }
 
 func TestClientExtraBrokers(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+
+	mb1 := mockbroker.New(t, 1)
+	mb2 := mockbroker.New(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client.Close()
+	defer client.Close()
+	defer mb1.Close()
+	defer mb2.Close()
 }
 
 func TestClientMetadata(t *testing.T) {
-	responses := make(chan []byte, 1)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, make(chan []byte))
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x05,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x05,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-
-	client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
+
+	mb1 := mockbroker.New(t, 1)
+	mb5 := mockbroker.New(t, 5)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb5).
+		AddTopicPartition("my_topic", 0, mb5.BrokerID())
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
+	defer mb1.Close()
+	defer mb5.Close()
 
 	topics, err := client.Topics()
 	if err != nil {
@@ -99,50 +76,22 @@ func TestClientMetadata(t *testing.T) {
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
-	responses := make(chan []byte, 1)
-	extraResponses := make(chan []byte, 2)
-	mockBroker := NewMockBroker(t, responses)
-	mockExtra := NewMockBroker(t, extraResponses)
-	defer mockBroker.Close()
-	defer mockExtra.Close()
-
-	// return the extra mock as another available broker
-	response := []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0xaa,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
-	responses <- response
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x05,
-		0x00, 0x00, 0x00, 0x0e,
-		0xFF, 0xFF, 0xFF, 0xFF,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-	extraResponses <- []byte{
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x0b,
-		0x00, 0x00, 0x00, 0xaa,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00}
-
-	client, err := NewClient("clientID", []string{mockBroker.Addr()}, &ClientConfig{MetadataRetries: 1})
+	mb1 := mockbroker.New(t, 1)
+	mb5 := mockbroker.New(t, 5)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb5)
+
+	mb5.ExpectMetadataRequest().
+		AddTopicPartition("my_topic", 0xb, 5)
+
+	client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1})
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
+	defer mb1.Close()
+	defer mb5.Close()
 
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
@@ -154,7 +103,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	tst, err := client.Leader("my_topic", 0xb)
 	if err != nil {
 		t.Error(err)
-	} else if tst.ID() != 0xaa {
+	} else if tst.ID() != 5 {
 		t.Error("Leader for my_topic had incorrect ID.")
 	}
 

+ 13 - 0
mockbroker/mockbroker.go

@@ -45,6 +45,16 @@ func (b *MockBroker) Addr() string {
 	return b.listener.Addr().String()
 }
 
+type rawExpectation []byte
+
+func (r rawExpectation) ResponseBytes() []byte {
+	return r
+}
+
+func (b *MockBroker) ExpectBytes(bytes []byte) {
+	b.expectations <- rawExpectation(bytes)
+}
+
 func (b *MockBroker) Close() {
 	if b.expecting {
 		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d!", b.BrokerID())
@@ -80,6 +90,9 @@ func (b *MockBroker) serverLoop() (ok bool) {
 			return b.serverError(err, conn)
 		}
 		response := expectation.ResponseBytes()
+		if len(response) == 0 {
+			continue
+		}
 
 		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
 		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))