Browse Source

Further mock broker improvements

- A set of response builders were introduced to construct
  responses based on a particular request parameters.
- mockbroker.SetHandlerByMap that provides a neat way of
  defining a request handler that use mockresponse builders.
- consumer_test.go was rewritten using the new test primitives.
Maxim Vladimirsky 10 years ago
parent
commit
b3fa33fa0c
4 changed files with 884 additions and 477 deletions
  1. 1 1
      Makefile
  2. 634 475
      consumer_test.go
  3. 17 1
      mockbroker_test.go
  4. 232 0
      mockresponses_test.go

+ 1 - 1
Makefile

@@ -1,7 +1,7 @@
 default: fmt vet errcheck test
 
 test:
-	go test -v -timeout 60s -race ./...
+	go test -v -timeout 2m -race ./...
 
 vet:
 	go vet ./...

+ 634 - 475
consumer_test.go

@@ -1,38 +1,37 @@
 package sarama
 
 import (
-	"log"
-	"os"
-	"os/signal"
 	"sync"
 	"testing"
 	"time"
 )
 
-func TestConsumerOffsetManual(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+var testMsg = StringEncoder("Foo")
 
-	offsetResponseNewest := new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 0, 2345)
-	leader.Returns(offsetResponseNewest)
-
-	offsetResponseOldest := new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
-	leader.Returns(offsetResponseOldest)
+// If a particular offset is provided then messages are consumed starting from
+// that offset.
+func TestConsumerOffsetManual(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
 
+	mockFetchResponse := newMockFetchResponse(t, 1)
 	for i := 0; i < 10; i++ {
-		fetchResponse := new(FetchResponse)
-		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
-		leader.Returns(fetchResponse)
+		mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
 	}
 
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, 2345),
+		"FetchRequest": mockFetchResponse,
+	})
+
+	// When
+	master, err := NewConsumer([]string{broker0.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -41,14 +40,12 @@ func TestConsumerOffsetManual(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	seedBroker.Close()
 
+	// Then: messages starting from offset 1234 are consumed.
 	for i := 0; i < 10; i++ {
 		select {
 		case message := <-consumer.Messages():
-			if message.Offset != int64(i+1234) {
-				t.Error("Incorrect message offset!")
-			}
+			assertMessageOffset(t, message, int64(i+1234))
 		case err := <-consumer.Errors():
 			t.Error(err)
 		}
@@ -56,195 +53,491 @@ func TestConsumerOffsetManual(t *testing.T) {
 
 	safeClose(t, consumer)
 	safeClose(t, master)
-	leader.Close()
 }
 
+// If `OffsetNewest` is passed as the initial offset then the first consumed
+// message is indeed corresponds to the offset that broker claims to be the
+// newest in his metadata response.
 func TestConsumerOffsetNewest(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetNewest, 10).
+			SetOffset("my_topic", 0, OffsetOldest, 7),
+		"FetchRequest": newMockFetchResponse(t, 1).
+			SetMessage("my_topic", 0, 9, testMsg).
+			SetMessage("my_topic", 0, 10, testMsg).
+			SetMessage("my_topic", 0, 11, testMsg).
+			SetHighWaterMark("my_topic", 0, 14),
+	})
+
+	master, err := NewConsumer([]string{broker0.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	offsetResponseNewest := new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
-	leader.Returns(offsetResponseNewest)
+	// Then
+	assertMessageOffset(t, <-consumer.Messages(), 10)
+	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
+		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
+	}
 
-	offsetResponseOldest := new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
-	leader.Returns(offsetResponseOldest)
+	safeClose(t, consumer)
+	safeClose(t, master)
+}
 
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
-	block := fetchResponse.GetBlock("my_topic", 0)
-	block.HighWaterMarkOffset = 14
-	leader.Returns(fetchResponse)
+// It is possible to close a partition consumer and create the same anew.
+func TestConsumerRecreate(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, 1000),
+		"FetchRequest": newMockFetchResponse(t, 1).
+			SetMessage("my_topic", 0, 10, testMsg),
+	})
+
+	c, err := NewConsumer([]string{broker0.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	pc, err := c.ConsumePartition("my_topic", 0, 10)
 	if err != nil {
 		t.Fatal(err)
 	}
-	seedBroker.Close()
+	assertMessageOffset(t, <-pc.Messages(), 10)
 
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
+	// When
+	safeClose(t, pc)
+	pc, err = c.ConsumePartition("my_topic", 0, 10)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	msg := <-consumer.Messages()
+	// Then
+	assertMessageOffset(t, <-pc.Messages(), 10)
+
+	safeClose(t, pc)
+	safeClose(t, c)
+}
+
+// An attempt to consume the same partition twice should fail.
+func TestConsumerDuplicate(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, 1000),
+		"FetchRequest": newMockFetchResponse(t, 1),
+	})
 
-	// we deliver one message, so it should be one higher than we return in the OffsetResponse
-	if msg.Offset != 10 {
-		t.Error("Latest message offset not fetched correctly:", msg.Offset)
+	config := NewConfig()
+	config.ChannelBufferSize = 0
+	c, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
 	}
 
-	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
-		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
+	pc1, err := c.ConsumePartition("my_topic", 0, 0)
+	if err != nil {
+		t.Fatal(err)
 	}
 
-	leader.Close()
-	safeClose(t, consumer)
-	safeClose(t, master)
+	// When
+	pc2, err := c.ConsumePartition("my_topic", 0, 0)
 
-	// We deliver one message, so it should be one higher than we return in the OffsetResponse.
-	// This way it is set correctly for the next FetchRequest.
-	if consumer.(*partitionConsumer).offset != 11 {
-		t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
+	// Then
+	if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
+		t.Fatal("A partition cannot be consumed twice at the same time")
 	}
+
+	safeClose(t, pc1)
+	safeClose(t, c)
 }
 
-func TestConsumerShutsDownOutOfRange(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
+// If consumer fails to refresh metadata it keeps retrying every with frequency
+// given in `Config.Consumer.Retry.Backoff`.
+func TestConsumerLeaderRefreshError(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 100)
+	defer broker0.Close()
+
+	// Stage 1: my_topic/0 served by broker0
+	Logger.Printf("    STAGE 1")
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 123).
+			SetOffset("my_topic", 0, OffsetNewest, 1000),
+		"FetchRequest": newMockFetchResponse(t, 1).
+			SetMessage("my_topic", 0, 123, testMsg),
+	})
+
+	config := NewConfig()
+	config.Net.ReadTimeout = 100 * time.Millisecond
+	config.Consumer.Retry.Backoff = 500 * time.Millisecond
+	config.Metadata.Retry.Max = 0
+	c, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
+	if err != nil {
+		t.Errorf("Failed to create a partition consumer, err=%v", err)
+	}
+
+	assertMessageOffset(t, <-pc.Messages(), 123)
+
+	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
+	// but the requests to retrieve metadata fail with network timeout.
+	Logger.Printf("    STAGE 2")
+
+	fetchResponse2 := &FetchResponse{}
+	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest": newMockWrapper(fetchResponse2),
+	})
 
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+	// Stage 3: finally the metadata returned by broker0 tells that broker1 is
+	// a new leader for my_topic/0. Consumption resumes.
 
-	offsetResponseNewest := new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
-	leader.Returns(offsetResponseNewest)
+	// Unfortunately consumer does not propagate `ErrNotLeaderForPartition`
+	// error to PartitionConsumer.Errors() channel. So there is no other way to
+	// synchronize here by sleep.
+	time.Sleep(300 * time.Millisecond)
+	Logger.Printf("    STAGE 3")
 
-	offsetResponseOldest := new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
-	leader.Returns(offsetResponseOldest)
+	broker1 := newMockBroker(t, 101)
+	defer broker1.Close()
 
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
-	leader.Returns(fetchResponse)
+	broker1.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest": newMockFetchResponse(t, 1).
+			SetMessage("my_topic", 0, 124, testMsg),
+	})
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetBroker(broker1.Addr(), broker1.BrokerID()).
+			SetLeader("my_topic", 0, broker1.BrokerID()),
+	})
 
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	assertMessageOffset(t, <-pc.Messages(), 124)
+
+	safeClose(t, pc)
+	safeClose(t, c)
+}
+
+func TestConsumerInvalidTopic(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 100)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()),
+	})
+
+	c, err := NewConsumer([]string{broker0.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// When
+	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
+
+	// Then
+	if pc != nil || err != ErrUnknownTopicOrPartition {
+		t.Errorf("Should fail with, err=%v", err)
+	}
+
+	safeClose(t, c)
+}
+
+// Nothing bad happens if a partition consumer that has no leader assigned at
+// the moment is closed.
+func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 100)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 123).
+			SetOffset("my_topic", 0, OffsetNewest, 1000),
+		"FetchRequest": newMockFetchResponse(t, 1).
+			SetMessage("my_topic", 0, 123, testMsg),
+	})
+
+	config := NewConfig()
+	config.Net.ReadTimeout = 100 * time.Millisecond
+	config.Consumer.Retry.Backoff = 100 * time.Millisecond
+	config.Metadata.Retry.Max = 0
+	c, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
+	if err != nil {
+		t.Errorf("Failed to create a partition consumer, err=%v", err)
+	}
+
+	assertMessageOffset(t, <-pc.Messages(), 123)
+
+	// broker0 says that it is no longer the leader for my_topic/0, but the
+	// requests to retrieve metadata fail with network timeout.
+	fetchResponse2 := &FetchResponse{}
+	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"FetchRequest": newMockWrapper(fetchResponse2),
+	})
+
+	// When
+
+	// Unfortunately consumer does not propagate `ErrNotLeaderForPartition`
+	// error to PartitionConsumer.Errors() channel. So there is no other way to
+	// synchronize here by sleep.
+	time.Sleep(200 * time.Millisecond)
+
+	// Then: the partition consumer can be closed without any problem.
+	safeClose(t, pc)
+	safeClose(t, c)
+}
+
+// If the initial offset passed on partition consumer creation is out of the
+// actual offset range for the partition, then the partition consumer stops
+// immediately closing its output channels.
+func TestConsumerShutsDownOutOfRange(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	broker0.SetHandler(func(req *request) (res encoder) {
+		switch reqBody := req.body.(type) {
+		case *MetadataRequest:
+			return newMockMetadataResponse(t).
+				SetBroker(broker0.Addr(), broker0.BrokerID()).
+				SetLeader("my_topic", 0, broker0.BrokerID()).
+				For(reqBody)
+		case *OffsetRequest:
+			return newMockOffsetResponse(t).
+				SetOffset("my_topic", 0, OffsetNewest, 1234).
+				SetOffset("my_topic", 0, OffsetOldest, 7).
+				For(reqBody)
+		case *FetchRequest:
+			fetchResponse := new(FetchResponse)
+			fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
+			return fetchResponse
+		}
+		return nil
+	})
+
+	master, err := NewConsumer([]string{broker0.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	seedBroker.Close()
 
+	// When
 	consumer, err := master.ConsumePartition("my_topic", 0, 101)
 	if err != nil {
 		t.Fatal(err)
 	}
 
+	// Then: consumer should shut down closing its messages and errors channels.
 	if _, ok := <-consumer.Messages(); ok {
 		t.Error("Expected the consumer to shut down")
 	}
+	safeClose(t, consumer)
 
-	leader.Close()
 	safeClose(t, master)
 }
 
-func TestConsumerFunnyOffsets(t *testing.T) {
-	// for topics that are compressed and/or compacted (different things!) we have to be
-	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
-	// possibly starting prior to the actual value we requested
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
+// If a fetch response contains messages with offsets that are smaller then
+// requested, then such messages are ignored.
+func TestConsumerExtraOffsets(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	called := 0
+	broker0.SetHandler(func(req *request) (res encoder) {
+		switch req.body.(type) {
+		case *MetadataRequest:
+			return newMockMetadataResponse(t).
+				SetBroker(broker0.Addr(), broker0.BrokerID()).
+				SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
+		case *OffsetRequest:
+			return newMockOffsetResponse(t).
+				SetOffset("my_topic", 0, OffsetNewest, 1234).
+				SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
+		case *FetchRequest:
+			fetchResponse := &FetchResponse{}
+			called++
+			if called > 1 {
+				fetchResponse.AddError("my_topic", 0, ErrNoError)
+				return fetchResponse
+			}
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
+			return fetchResponse
+		}
+		return nil
+	})
 
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+	master, err := NewConsumer([]string{broker0.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	offsetResponseNewest := new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
-	leader.Returns(offsetResponseNewest)
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 3)
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	offsetResponseOldest := new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
-	leader.Returns(offsetResponseOldest)
+	// Then: messages with offsets 1 and 2 are not returned even though they
+	// are present in the response.
+	assertMessageOffset(t, <-consumer.Messages(), 3)
+	assertMessageOffset(t, <-consumer.Messages(), 4)
 
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
-	leader.Returns(fetchResponse)
+	safeClose(t, consumer)
+	safeClose(t, master)
+}
 
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
-	leader.Returns(fetchResponse)
+// It is fine if offsets of fetched messages are not sequential (although
+// strictly increasing!).
+func TestConsumerNonSequentialOffsets(t *testing.T) {
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	called := 0
+	broker0.SetHandler(func(req *request) (res encoder) {
+		switch req.body.(type) {
+		case *MetadataRequest:
+			return newMockMetadataResponse(t).
+				SetBroker(broker0.Addr(), broker0.BrokerID()).
+				SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
+		case *OffsetRequest:
+			return newMockOffsetResponse(t).
+				SetOffset("my_topic", 0, OffsetNewest, 1234).
+				SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
+		case *FetchRequest:
+			called++
+			fetchResponse := &FetchResponse{}
+			if called > 1 {
+				fetchResponse.AddError("my_topic", 0, ErrNoError)
+				return fetchResponse
+			}
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
+			fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
+			return fetchResponse
+		}
+		return nil
+	})
 
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	master, err := NewConsumer([]string{broker0.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, 2)
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 3)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	if message := <-consumer.Messages(); message.Offset != 3 {
-		t.Error("Incorrect message offset!")
-	}
-
-	if message := <-consumer.Messages(); message.Offset != 5 {
-		t.Error("Incorrect message offset!")
-	}
+	// Then: messages with offsets 1 and 2 are not returned even though they
+	// are present in the response.
+	assertMessageOffset(t, <-consumer.Messages(), 5)
+	assertMessageOffset(t, <-consumer.Messages(), 7)
+	assertMessageOffset(t, <-consumer.Messages(), 11)
 
-	leader.Close()
-	seedBroker.Close()
 	safeClose(t, consumer)
 	safeClose(t, master)
 }
 
+// If leadership for a partition is changing then consumer resolves the new
+// leader and switches to it.
 func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	// initial setup
-	seedBroker := newMockBroker(t, 1)
-	leader0 := newMockBroker(t, 2)
-	leader1 := newMockBroker(t, 3)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
-	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+	seedBroker := newMockBroker(t, 10)
+	defer seedBroker.Close()
+	leader0 := newMockBroker(t, 0)
+	defer leader0.Close()
+	leader1 := newMockBroker(t, 1)
+	defer leader1.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(leader0.Addr(), leader0.BrokerID()).
+			SetBroker(leader1.Addr(), leader1.BrokerID()).
+			SetLeader("my_topic", 0, leader0.BrokerID()).
+			SetLeader("my_topic", 1, leader1.BrokerID()),
+	})
+
+	mockOffsetResponse1 := newMockOffsetResponse(t).
+		SetOffset("my_topic", 0, OffsetOldest, 0).
+		SetOffset("my_topic", 0, OffsetNewest, 1000).
+		SetOffset("my_topic", 1, OffsetOldest, 0).
+		SetOffset("my_topic", 1, OffsetNewest, 1000)
+	leader0.SetHandlerByMap(map[string]MockResponse{
+		"OffsetRequest": mockOffsetResponse1,
+		"FetchRequest":  newMockFetchResponse(t, 1),
+	})
+	leader1.SetHandlerByMap(map[string]MockResponse{
+		"OffsetRequest": mockOffsetResponse1,
+		"FetchRequest":  newMockFetchResponse(t, 1),
+	})
 
 	// launch test goroutines
 	config := NewConfig()
-	config.Consumer.Retry.Backoff = 0
+	config.Consumer.Retry.Backoff = 50
 	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	offsetResponseNewest0 := new(OffsetResponse)
-	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
-	leader0.Returns(offsetResponseNewest0)
-
-	offsetResponseOldest0 := new(OffsetResponse)
-	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
-	leader0.Returns(offsetResponseOldest0)
-
-	offsetResponseNewest1 := new(OffsetResponse)
-	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
-	leader1.Returns(offsetResponseNewest1)
-
-	offsetResponseOldest1 := new(OffsetResponse)
-	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
-	leader1.Returns(offsetResponseOldest1)
-
 	// we expect to end up (eventually) consuming exactly ten messages on each partition
 	var wg sync.WaitGroup
 	for i := int32(0); i < 2; i++ {
@@ -275,424 +568,290 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		}(i, consumer)
 	}
 
-	// leader0 provides first four messages on partition 0
-	fetchResponse := new(FetchResponse)
+	time.Sleep(50 * time.Millisecond)
+	Logger.Printf("    STAGE 1")
+	// Stage 1:
+	//   * my_topic/0 -> leader0 serves 4 messages
+	//   * my_topic/1 -> leader1 serves 0 messages
+
+	mockFetchResponse := newMockFetchResponse(t, 1)
 	for i := 0; i < 4; i++ {
-		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
 	}
-	leader0.Returns(fetchResponse)
+	leader0.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest": mockFetchResponse,
+	})
+
+	time.Sleep(50 * time.Millisecond)
+	Logger.Printf("    STAGE 2")
+	// Stage 2:
+	//   * leader0 says that it is no longer serving my_topic/0
+	//   * seedBroker tells that leader1 is serving my_topic/0 now
+
+	// seed broker tells that the new partition 0 leader is leader1
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetLeader("my_topic", 0, leader1.BrokerID()).
+			SetLeader("my_topic", 1, leader1.BrokerID()),
+	})
 
 	// leader0 says no longer leader of partition 0
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
-	leader0.Returns(fetchResponse)
+	leader0.SetHandler(func(req *request) (res encoder) {
+		switch req.body.(type) {
+		case *FetchRequest:
+			fetchResponse := new(FetchResponse)
+			fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
+			return fetchResponse
+		}
+		return nil
+	})
 
-	// metadata assigns both partitions to leader1
-	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
-	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
+	time.Sleep(50 * time.Millisecond)
+	Logger.Printf("    STAGE 3")
+	// Stage 3:
+	//   * my_topic/0 -> leader1 serves 3 messages
+	//   * my_topic/1 -> leader1 server 8 messages
 
-	// leader1 provides five messages on partition 1
-	fetchResponse = new(FetchResponse)
-	for i := 0; i < 5; i++ {
-		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+	// leader1 provides 3 message on partition 0, and 8 messages on partition 1
+	mockFetchResponse2 := newMockFetchResponse(t, 2)
+	for i := 4; i < 7; i++ {
+		mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
 	}
-	leader1.Returns(fetchResponse)
-
-	// leader1 provides three more messages on both partitions
-	fetchResponse = new(FetchResponse)
-	for i := 0; i < 3; i++ {
-		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
-		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
+	for i := 0; i < 8; i++ {
+		mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
 	}
-	leader1.Returns(fetchResponse)
+	leader1.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest": mockFetchResponse2,
+	})
 
-	// leader1 provides three more messages on partition0, says no longer leader of partition1
-	fetchResponse = new(FetchResponse)
-	for i := 0; i < 3; i++ {
-		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
-	}
-	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
-	leader1.Returns(fetchResponse)
+	time.Sleep(50 * time.Millisecond)
+	Logger.Printf("    STAGE 4")
+	// Stage 4:
+	//   * my_topic/0 -> leader1 serves 3 messages
+	//   * my_topic/1 -> leader1 tells that it is no longer the leader
+	//   * seedBroker tells that leader0 is a new leader for my_topic/1
 
 	// metadata assigns 0 to leader1 and 1 to leader0
-	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
-	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetLeader("my_topic", 0, leader1.BrokerID()).
+			SetLeader("my_topic", 1, leader0.BrokerID()),
+	})
+
+	// leader1 provides three more messages on partition0, says no longer leader of partition1
+	mockFetchResponse3 := newMockFetchResponse(t, 3).
+		SetMessage("my_topic", 0, int64(7), testMsg).
+		SetMessage("my_topic", 0, int64(8), testMsg).
+		SetMessage("my_topic", 0, int64(9), testMsg)
+	leader1.SetHandler(func(req *request) (res encoder) {
+		switch reqBody := req.body.(type) {
+		case *FetchRequest:
+			res := mockFetchResponse3.For(reqBody).(*FetchResponse)
+			res.AddError("my_topic", 1, ErrNotLeaderForPartition)
+			return res
+
+		}
+		return nil
+	})
 
 	// leader0 provides two messages on partition 1
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
-	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
-	leader0.Returns(fetchResponse)
-	time.Sleep(50 * time.Millisecond) // dumbest way to force a particular response ordering
-
-	leader1.Close()
-	leader0.Close()
+	mockFetchResponse4 := newMockFetchResponse(t, 2)
+	for i := 8; i < 10; i++ {
+		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
+	}
+	leader0.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest": mockFetchResponse4,
+	})
+
 	wg.Wait()
-	seedBroker.Close()
 	safeClose(t, master)
 }
 
+// When two partitions have the same broker as the leader, if one partition
+// consumer channel buffer is full then that does not affect the ability to
+// read messages by the other consumer.
 func TestConsumerInterleavedClose(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+	// Given
+	broker0 := newMockBroker(t, 0)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()).
+			SetLeader("my_topic", 1, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 1000).
+			SetOffset("my_topic", 0, OffsetNewest, 1100).
+			SetOffset("my_topic", 1, OffsetOldest, 2000).
+			SetOffset("my_topic", 1, OffsetNewest, 2100),
+		"FetchRequest": newMockFetchResponse(t, 1).
+			SetMessage("my_topic", 0, 1000, testMsg).
+			SetMessage("my_topic", 0, 1001, testMsg).
+			SetMessage("my_topic", 0, 1002, testMsg).
+			SetMessage("my_topic", 1, 2000, testMsg),
+	})
 
 	config := NewConfig()
 	config.ChannelBufferSize = 0
-	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
+	master, err := NewConsumer([]string{broker0.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	offsetResponseNewest0 := new(OffsetResponse)
-	offsetResponseNewest0.AddTopicPartition("my_topic", 0, 1234)
-	leader.Returns(offsetResponseNewest0)
-
-	offsetResponseOldest0 := new(OffsetResponse)
-	offsetResponseOldest0.AddTopicPartition("my_topic", 0, 0)
-	leader.Returns(offsetResponseOldest0)
-
-	c0, err := master.ConsumePartition("my_topic", 0, 0)
+	c0, err := master.ConsumePartition("my_topic", 0, 1000)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
-	leader.Returns(fetchResponse)
-	time.Sleep(50 * time.Millisecond)
-
-	offsetResponseNewest1 := new(OffsetResponse)
-	offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
-	leader.Returns(offsetResponseNewest1)
-
-	offsetResponseOldest1 := new(OffsetResponse)
-	offsetResponseOldest1.AddTopicPartition("my_topic", 1, 0)
-	leader.Returns(offsetResponseOldest1)
-
-	c1, err := master.ConsumePartition("my_topic", 1, 0)
+	c1, err := master.ConsumePartition("my_topic", 1, 2000)
 	if err != nil {
 		t.Fatal(err)
 	}
-	<-c0.Messages()
 
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
-	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
-	leader.Returns(fetchResponse)
-	leader.Returns(fetchResponse)
+	// When/Then: we can read from partition 0 even if nobody reads from partition 1
+	assertMessageOffset(t, <-c0.Messages(), 1000)
+	assertMessageOffset(t, <-c0.Messages(), 1001)
+	assertMessageOffset(t, <-c0.Messages(), 1002)
 
 	safeClose(t, c1)
 	safeClose(t, c0)
 	safeClose(t, master)
-	leader.Close()
-	seedBroker.Close()
 }
 
 func TestConsumerBounceWithReferenceOpen(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
-	leaderAddr := leader.Addr()
-	tmp := newMockBroker(t, 3)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
+	broker0 := newMockBroker(t, 0)
+	broker0Addr := broker0.Addr()
+	broker1 := newMockBroker(t, 1)
+
+	mockMetadataResponse := newMockMetadataResponse(t).
+		SetBroker(broker0.Addr(), broker0.BrokerID()).
+		SetBroker(broker1.Addr(), broker1.BrokerID()).
+		SetLeader("my_topic", 0, broker0.BrokerID()).
+		SetLeader("my_topic", 1, broker1.BrokerID())
+
+	mockOffsetResponse := newMockOffsetResponse(t).
+		SetOffset("my_topic", 0, OffsetOldest, 1000).
+		SetOffset("my_topic", 0, OffsetNewest, 1100).
+		SetOffset("my_topic", 1, OffsetOldest, 2000).
+		SetOffset("my_topic", 1, OffsetNewest, 2100)
+
+	mockFetchResponse := newMockFetchResponse(t, 1)
+	for i := 0; i < 10; i++ {
+		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
+		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
+	}
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"OffsetRequest": mockOffsetResponse,
+		"FetchRequest":  mockFetchResponse,
+	})
+	broker1.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": mockMetadataResponse,
+		"OffsetRequest":   mockOffsetResponse,
+		"FetchRequest":    mockFetchResponse,
+	})
 
 	config := NewConfig()
 	config.Consumer.Return.Errors = true
-	config.Consumer.Retry.Backoff = 0
-	config.ChannelBufferSize = 0
-	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
+	config.Consumer.Retry.Backoff = 100 * time.Millisecond
+	config.ChannelBufferSize = 1
+	master, err := NewConsumer([]string{broker1.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	offsetResponseNewest := new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
-	leader.Returns(offsetResponseNewest)
-
-	offsetResponseOldest := new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
-	leader.Returns(offsetResponseOldest)
-
-	c0, err := master.ConsumePartition("my_topic", 0, 0)
+	c0, err := master.ConsumePartition("my_topic", 0, 1000)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	offsetResponseNewest = new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
-	tmp.Returns(offsetResponseNewest)
-
-	offsetResponseOldest = new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
-	tmp.Returns(offsetResponseOldest)
-
-	c1, err := master.ConsumePartition("my_topic", 1, 0)
+	c1, err := master.ConsumePartition("my_topic", 1, 2000)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	//redirect partition 1 back to main leader
-	metadataResponse = new(MetadataResponse)
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
-	fetchResponse := new(FetchResponse)
-	fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
-	tmp.Returns(fetchResponse)
-	time.Sleep(5 * time.Millisecond)
-
-	// now send one message to each partition to make sure everything is primed
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
-	fetchResponse.AddError("my_topic", 1, ErrNoError)
-	leader.Returns(fetchResponse)
-	<-c0.Messages()
-
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddError("my_topic", 0, ErrNoError)
-	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
-	leader.Returns(fetchResponse)
-	<-c1.Messages()
-
-	// bounce the broker
-	leader.Close()
-	leader = newMockBrokerAddr(t, 2, leaderAddr)
-
-	// unblock one of the two (it doesn't matter which)
-	select {
-	case <-c0.Errors():
-	case <-c1.Errors():
+	// read messages from both partition to make sure that both brokers operate
+	// normally.
+	assertMessageOffset(t, <-c0.Messages(), 1000)
+	assertMessageOffset(t, <-c1.Messages(), 2000)
+
+	// Simulate broker shutdown. Note that metadata response does not change,
+	// that is the leadership does not move to another broker. So partition
+	// consumer will keep retrying to restore the connection with the broker.
+	broker0.Close()
+
+	// Make sure that while the partition/0 leader is down, consumer/partition/1
+	// is capable of pulling messages from broker1.
+	for i := 1; i < 7; i++ {
+		offset := (<-c1.Messages()).Offset
+		if offset != int64(2000+i) {
+			t.Errorf("Expected offset %d from consumer/partition/1", int64(2000+i))
+		}
 	}
-	// send it back to the same broker
-	seedBroker.Returns(metadataResponse)
 
-	fetchResponse = new(FetchResponse)
-	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
-	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
-	leader.Returns(fetchResponse)
+	// Bring broker0 back to service.
+	broker0 = newMockBrokerAddr(t, 0, broker0Addr)
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"FetchRequest": mockFetchResponse,
+	})
 
-	time.Sleep(5 * time.Millisecond)
+	// Read the rest of messages from both partitions.
+	for i := 7; i < 10; i++ {
+		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
+	}
+	for i := 1; i < 10; i++ {
+		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
+	}
 
-	// unblock the other one
 	select {
 	case <-c0.Errors():
-	case <-c1.Errors():
+	default:
+		t.Errorf("Partition consumer should have detected broker restart")
 	}
 
-	select {
-	case <-c0.Messages():
-	case <-c1.Messages():
-	}
-
-	leader.Close()
-	seedBroker.Close()
-	wg := sync.WaitGroup{}
-	wg.Add(2)
-	go func() {
-		_ = c0.Close()
-		wg.Done()
-	}()
-	go func() {
-		_ = c1.Close()
-		wg.Done()
-	}()
-	wg.Wait()
+	safeClose(t, c1)
+	safeClose(t, c0)
 	safeClose(t, master)
-	tmp.Close()
+	broker0.Close()
+	broker1.Close()
 }
 
 func TestConsumerOffsetOutOfRange(t *testing.T) {
-	seedBroker := newMockBroker(t, 1)
-	leader := newMockBroker(t, 2)
-
-	metadataResponse := new(MetadataResponse)
-	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	seedBroker.Returns(metadataResponse)
-
-	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
+	// Given
+	broker0 := newMockBroker(t, 2)
+	defer broker0.Close()
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": newMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": newMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetNewest, 1234).
+			SetOffset("my_topic", 0, OffsetOldest, 2345),
+	})
+
+	master, err := NewConsumer([]string{broker0.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	seedBroker.Close()
-
-	offsetResponseNewest := new(OffsetResponse)
-	offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
 
-	offsetResponseOldest := new(OffsetResponse)
-	offsetResponseOldest.AddTopicPartition("my_topic", 0, 2345)
-
-	leader.Returns(offsetResponseNewest)
-	leader.Returns(offsetResponseOldest)
+	// When/Then
 	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
 		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
 	}
-
-	leader.Returns(offsetResponseNewest)
-	leader.Returns(offsetResponseOldest)
 	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
 		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
 	}
-
-	leader.Returns(offsetResponseNewest)
-	leader.Returns(offsetResponseOldest)
 	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
 		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
 	}
 
-	leader.Close()
 	safeClose(t, master)
 }
 
-// This example has the simplest use case of the consumer. It simply
-// iterates over the messages channel using a for/range loop. Because
-// a producer never stopsunless requested, a signal handler is registered
-// so we can trigger a clean shutdown of the consumer.
-func ExampleConsumer_for_loop() {
-	master, err := NewConsumer([]string{"localhost:9092"}, nil)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	defer func() {
-		if err := master.Close(); err != nil {
-			log.Fatalln(err)
-		}
-	}()
-
-	consumer, err := master.ConsumePartition("my_topic", 0, 0)
-	if err != nil {
-		log.Fatalln(err)
-	}
-
-	go func() {
-		// By default, the consumer will always keep going, unless we tell it to stop.
-		// In this case, we capture the SIGINT signal so we can tell the consumer to stop
-		signals := make(chan os.Signal, 1)
-		signal.Notify(signals, os.Interrupt)
-		<-signals
-		consumer.AsyncClose()
-	}()
-
-	msgCount := 0
-	for message := range consumer.Messages() {
-		log.Println(string(message.Value))
-		msgCount++
+func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
+	if msg.Offset != expectedOffset {
+		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
 	}
-	log.Println("Processed", msgCount, "messages.")
-}
-
-// This example shows how to use a consumer with a select statement
-// dealing with the different channels.
-func ExampleConsumer_select() {
-	config := NewConfig()
-	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
-
-	master, err := NewConsumer([]string{"localhost:9092"}, config)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	defer func() {
-		if err := master.Close(); err != nil {
-			log.Fatalln(err)
-		}
-	}()
-
-	consumer, err := master.ConsumePartition("my_topic", 0, 0)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	defer func() {
-		if err := consumer.Close(); err != nil {
-			log.Fatalln(err)
-		}
-	}()
-
-	msgCount := 0
-
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, os.Interrupt)
-
-consumerLoop:
-	for {
-		select {
-		case err := <-consumer.Errors():
-			log.Println(err)
-		case <-consumer.Messages():
-			msgCount++
-		case <-signals:
-			log.Println("Received interrupt")
-			break consumerLoop
-		}
-	}
-	log.Println("Processed", msgCount, "messages.")
-}
-
-// This example shows how to use a consumer with different goroutines
-// to read from the Messages and Errors channels.
-func ExampleConsumer_goroutines() {
-	config := NewConfig()
-	config.Consumer.Return.Errors = true // Handle errors manually instead of letting Sarama log them.
-
-	master, err := NewConsumer([]string{"localhost:9092"}, config)
-	if err != nil {
-		log.Fatalln(err)
-	}
-	defer func() {
-		if err := master.Close(); err != nil {
-			panic(err)
-		}
-	}()
-
-	consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest)
-	if err != nil {
-		log.Fatalln(err)
-	}
-
-	var (
-		wg       sync.WaitGroup
-		msgCount int
-	)
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for message := range consumer.Messages() {
-			log.Printf("Consumed message with offset %d", message.Offset)
-			msgCount++
-		}
-	}()
-
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for err := range consumer.Errors() {
-			log.Println(err)
-		}
-	}()
-
-	// Wait for an interrupt signal to trigger the shutdown
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, os.Interrupt)
-	<-signals
-	consumer.AsyncClose()
-
-	// Wait for the Messages and Errors channel to be fully drained.
-	wg.Wait()
-	log.Println("Processed", msgCount, "messages.")
 }

+ 17 - 1
mockbroker_test.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"reflect"
 	"strconv"
 	"sync"
 	"testing"
@@ -61,6 +62,17 @@ func (b *mockBroker) SetHandler(handler requestHandlerFunc) {
 	b.handlerMux.Unlock()
 }
 
+func (b *mockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
+	b.SetHandler(func(req *request) (res encoder) {
+		reqTypeName := reflect.TypeOf(req.body).Elem().Name()
+		mockResponse := handlerMap[reqTypeName]
+		if mockResponse == nil {
+			return nil
+		}
+		return mockResponse.For(req.body)
+	})
+}
+
 func (b *mockBroker) BrokerID() int32 {
 	return b.brokerID
 }
@@ -139,7 +151,11 @@ func (b *mockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup)
 		}
 
 		res := b.requestHandler()(req)
-		Logger.Printf("*** mockbroker/%d/%d: served %+v -> %+v", b.brokerID, idx, req, res)
+		if res == nil {
+			Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, req)
+			continue
+		}
+		Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
 
 		encodedRes, err := encode(res)
 		if err != nil {

+ 232 - 0
mockresponses_test.go

@@ -0,0 +1,232 @@
+package sarama
+
+import (
+	"testing"
+)
+
+// MockResponse is a response builder interface it defines one method that
+// allows generating a response based on a request body.
+type MockResponse interface {
+	For(reqBody decoder) (res encoder)
+}
+
+type mockWrapper struct {
+	res encoder
+}
+
+func (mw *mockWrapper) For(reqBody decoder) (res encoder) {
+	return mw.res
+}
+
+func newMockWrapper(res encoder) *mockWrapper {
+	return &mockWrapper{res: res}
+}
+
+// mockMetadataResponse is a `MetadataResponse` builder.
+type mockMetadataResponse struct {
+	leaders map[string]map[int32]int32
+	brokers map[string]int32
+	t       *testing.T
+}
+
+func newMockMetadataResponse(t *testing.T) *mockMetadataResponse {
+	return &mockMetadataResponse{
+		leaders: make(map[string]map[int32]int32),
+		brokers: make(map[string]int32),
+		t:       t,
+	}
+}
+
+func (mmr *mockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *mockMetadataResponse {
+	partitions := mmr.leaders[topic]
+	if partitions == nil {
+		partitions = make(map[int32]int32)
+		mmr.leaders[topic] = partitions
+	}
+	partitions[partition] = brokerID
+	return mmr
+}
+
+func (mmr *mockMetadataResponse) SetBroker(addr string, brokerID int32) *mockMetadataResponse {
+	mmr.brokers[addr] = brokerID
+	return mmr
+}
+
+func (mor *mockMetadataResponse) For(reqBody decoder) encoder {
+	metadataRequest := reqBody.(*MetadataRequest)
+	metadataResponse := &MetadataResponse{}
+	for addr, brokerID := range mor.brokers {
+		metadataResponse.AddBroker(addr, brokerID)
+	}
+	if len(metadataRequest.Topics) == 0 {
+		for topic, partitions := range mor.leaders {
+			for partition, brokerID := range partitions {
+				metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
+			}
+		}
+		return metadataResponse
+	}
+	for _, topic := range metadataRequest.Topics {
+		for partition, brokerID := range mor.leaders[topic] {
+			metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
+		}
+	}
+	return metadataResponse
+}
+
+// mockOffsetResponse is an `OffsetResponse` builder.
+type mockOffsetResponse struct {
+	offsets map[string]map[int32]map[int64]int64
+	t       *testing.T
+}
+
+func newMockOffsetResponse(t *testing.T) *mockOffsetResponse {
+	return &mockOffsetResponse{
+		offsets: make(map[string]map[int32]map[int64]int64),
+		t:       t,
+	}
+}
+
+func (mor *mockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *mockOffsetResponse {
+	partitions := mor.offsets[topic]
+	if partitions == nil {
+		partitions = make(map[int32]map[int64]int64)
+		mor.offsets[topic] = partitions
+	}
+	times := partitions[partition]
+	if times == nil {
+		times = make(map[int64]int64)
+		partitions[partition] = times
+	}
+	times[time] = offset
+	return mor
+}
+
+func (mor *mockOffsetResponse) For(reqBody decoder) encoder {
+	offsetRequest := reqBody.(*OffsetRequest)
+	offsetResponse := &OffsetResponse{}
+	for topic, partitions := range offsetRequest.blocks {
+		for partition, block := range partitions {
+			offset := mor.getOffset(topic, partition, block.time)
+			offsetResponse.AddTopicPartition(topic, partition, offset)
+		}
+	}
+	return offsetResponse
+}
+
+func (mor *mockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
+	partitions := mor.offsets[topic]
+	if partitions == nil {
+		mor.t.Errorf("missing topic: %s", topic)
+	}
+	times := partitions[partition]
+	if times == nil {
+		mor.t.Errorf("missing partition: %d", partition)
+	}
+	offset, ok := times[time]
+	if !ok {
+		mor.t.Errorf("missing time: %d", time)
+	}
+	return offset
+}
+
+// mockFetchResponse is a `FetchResponse` builder.
+type mockFetchResponse struct {
+	messages       map[string]map[int32]map[int64]Encoder
+	highWaterMarks map[string]map[int32]int64
+	t              *testing.T
+	batchSize      int
+}
+
+func newMockFetchResponse(t *testing.T, batchSize int) *mockFetchResponse {
+	return &mockFetchResponse{
+		messages:       make(map[string]map[int32]map[int64]Encoder),
+		highWaterMarks: make(map[string]map[int32]int64),
+		t:              t,
+		batchSize:      batchSize,
+	}
+}
+
+func (mfr *mockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *mockFetchResponse {
+	partitions := mfr.messages[topic]
+	if partitions == nil {
+		partitions = make(map[int32]map[int64]Encoder)
+		mfr.messages[topic] = partitions
+	}
+	messages := partitions[partition]
+	if messages == nil {
+		messages = make(map[int64]Encoder)
+		partitions[partition] = messages
+	}
+	messages[offset] = msg
+	return mfr
+}
+
+func (mfr *mockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *mockFetchResponse {
+	partitions := mfr.highWaterMarks[topic]
+	if partitions == nil {
+		partitions = make(map[int32]int64)
+		mfr.highWaterMarks[topic] = partitions
+	}
+	partitions[partition] = offset
+	return mfr
+}
+
+func (mfr *mockFetchResponse) For(reqBody decoder) encoder {
+	fetchRequest := reqBody.(*FetchRequest)
+	res := &FetchResponse{}
+	for topic, partitions := range fetchRequest.blocks {
+		for partition, block := range partitions {
+			initialOffset := block.fetchOffset
+			offset := initialOffset
+			maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
+			for i := 0; i < mfr.batchSize && offset < maxOffset; {
+				msg := mfr.getMessage(topic, partition, offset)
+				if msg != nil {
+					res.AddMessage(topic, partition, nil, msg, offset)
+					i++
+				}
+				offset++
+			}
+			fb := res.GetBlock(topic, partition)
+			if fb == nil {
+				res.AddError(topic, partition, ErrNoError)
+				fb = res.GetBlock(topic, partition)
+			}
+			fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
+		}
+	}
+	return res
+}
+
+func (mfr *mockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
+	partitions := mfr.messages[topic]
+	if partitions == nil {
+		return nil
+	}
+	messages := partitions[partition]
+	if messages == nil {
+		return nil
+	}
+	return messages[offset]
+}
+
+func (mfr *mockFetchResponse) getMessageCount(topic string, partition int32) int {
+	partitions := mfr.messages[topic]
+	if partitions == nil {
+		return 0
+	}
+	messages := partitions[partition]
+	if messages == nil {
+		return 0
+	}
+	return len(messages)
+}
+
+func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
+	partitions := mfr.highWaterMarks[topic]
+	if partitions == nil {
+		return 0
+	}
+	return partitions[partition]
+}