package sarama

import (
	"log"
	"os"
	"os/signal"
	"reflect"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

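// testMsg is the value carried by every message served by the mock brokers in
// these tests.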
var testMsg = StringEncoder("Foo")

// If a particular offset is provided, then messages are consumed starting from
// that offset.
func TestConsumerOffsetManual(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)

	mockFetchResponse := NewMockFetchResponse(t, 1)
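	// Pre-populate the mock fetch response with ten messages at offsets 1234
	// through 1243.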
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 2345),
		"FetchRequest": mockFetchResponse,
	})

	// When
	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages starting from offset 1234 are consumed.
	for i := 0; i < 10; i++ {
		select {
		case message := <-consumer.Messages():
			assertMessageOffset(t, message, int64(i+1234))
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message indeed corresponds to the offset that the broker claims to be the
// newest in its metadata response.
func TestConsumerOffsetNewest(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 10).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 9, testMsg).
			SetMessage("my_topic", 0, 10, testMsg).
			SetMessage("my_topic", 0, 11, testMsg).
			SetHighWaterMark("my_topic", 0, 14),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	// Then
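	// OffsetNewest resolved to offset 10 at creation time, so the message at
	// offset 9 in the fetch response must be skipped.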
	assertMessageOffset(t, <-consumer.Messages(), 10)
	if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
		t.Errorf("Expected high water mark offset 14, found %d", hwmo)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

// It is possible to close a partition consumer and then create the same one anew.
func TestConsumerRecreate(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 10, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}
	assertMessageOffset(t, <-pc.Messages(), 10)

	// When
	safeClose(t, pc)
	pc, err = c.ConsumePartition("my_topic", 0, 10)
	if err != nil {
		t.Fatal(err)
	}

	// Then
	assertMessageOffset(t, <-pc.Messages(), 10)

	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}

// An attempt to consume the same partition twice should fail.
func TestConsumerDuplicate(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc1, err := c.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	// When
	pc2, err := c.ConsumePartition("my_topic", 0, 0)

	// Then
	if pc2 != nil || err != ConfigurationError("That topic/partition is already being consumed") {
		t.Fatal("A partition cannot be consumed twice at the same time")
	}

	safeClose(t, pc1)
	safeClose(t, c)
	broker0.Close()
}

func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
	// Given
	broker0 := NewMockBroker(t, 100)

	// Stage 1: my_topic/0 served by broker0
	Logger.Printf("    STAGE 1")

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// Stage 2: broker0 says that it is no longer the leader for my_topic/0,
	// but the requests to retrieve metadata fail with network timeout.
	Logger.Printf("    STAGE 2")

	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

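	// Both callers set Metadata.Retry.Max to 0, so the failed metadata refresh
	// is not retried internally and the consumer reports ErrOutOfBrokers.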
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Stage 3: finally the metadata returned by broker0 tells that broker1 is
	// a new leader for my_topic/0. Consumption resumes.

	Logger.Printf("    STAGE 3")

	broker1 := NewMockBroker(t, 101)

	broker1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 124, testMsg),
	})
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetBroker(broker1.Addr(), broker1.BrokerID()).
			SetLeader("my_topic", 0, broker1.BrokerID()),
	})

	assertMessageOffset(t, <-pc.Messages(), 124)

	safeClose(t, pc)
	safeClose(t, c)
	broker1.Close()
	broker0.Close()
}

// If the consumer fails to refresh metadata, it keeps retrying with the
// frequency specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 200 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0

	runConsumerLeaderRefreshErrorTestWithConfig(t, config)
}

func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
	var calls int32

	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
		atomic.AddInt32(&calls, 1)
		return 200 * time.Millisecond
	}
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0

	runConsumerLeaderRefreshErrorTestWithConfig(t, config)

	// We expect at least one call to our backoff function.
	if atomic.LoadInt32(&calls) == 0 {
		t.Error("Expected at least one call to the backoff function")
	}
}

func TestConsumerInvalidTopic(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()),
	})

	c, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)

	// Then
	if pc != nil || err != ErrUnknownTopicOrPartition {
		t.Errorf("Should fail with, err=%v", err)
	}

	safeClose(t, c)
	broker0.Close()
}

// Nothing bad happens if a partition consumer that has no leader assigned at
// the moment is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 100)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 123).
			SetOffset("my_topic", 0, OffsetNewest, 1000),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 123, testMsg),
	})

	config := NewConfig()
	config.Net.ReadTimeout = 100 * time.Millisecond
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.Consumer.Return.Errors = true
	config.Metadata.Retry.Max = 0
	c, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-pc.Messages(), 123)

	// broker0 says that it is no longer the leader for my_topic/0, but the
	// requests to retrieve metadata fail with network timeout.
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse2),
	})

	// When
	if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
		t.Errorf("Unexpected error: %v", consErr.Err)
	}

	// Then: the partition consumer can be closed without any problem.
	safeClose(t, pc)
	safeClose(t, c)
	broker0.Close()
}

// If the initial offset passed on partition consumer creation is out of the
// actual offset range for the partition, then the partition consumer stops
// immediately, closing its output channels.
func TestConsumerShutsDownOutOfRange(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 7),
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 101)
	if err != nil {
		t.Fatal(err)
	}

	// Then: the consumer should shut down, closing its messages and errors channels.
	if _, ok := <-consumer.Messages(); ok {
		t.Error("Expected the consumer to shut down")
	}
	safeClose(t, consumer)

	safeClose(t, master)
	broker0.Close()
}

// If a fetch response contains messages with offsets that are smaller than
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
	// Given
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Consumer.Return.Errors = true
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		fetchResponse2 := &FetchResponse{}
		fetchResponse2.Version = fetchResponse1.Version
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: messages with offsets 1 and 2 are not returned even though they
		// are present in the response.
		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 3)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		select {
		case msg := <-consumer.Messages():
			assertMessageOffset(t, msg, 4)
		case err := <-consumer.Errors():
			t.Fatal(err)
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}

// In some situations a broker may return a block containing only messages
// older than requested, even though more messages would be available if a
// higher offset was requested.
func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
	// Given
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)

	fetchResponse2 := &FetchResponse{Version: 4}
	fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)

	cfg := NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Version = V0_11_0_0

	broker0 := NewMockBroker(t, 0)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 2)
	if err != nil {
		t.Fatal(err)
	}

	select {
	case msg := <-consumer.Messages():
		assertMessageOffset(t, msg, 1000000)
	case err := <-consumer.Errors():
		t.Fatal(err)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
	// Given
	fetchResponse1 := &FetchResponse{Version: 4}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

	cfg := NewConfig()
	cfg.Version = V0_11_0_0

	broker0 := NewMockBroker(t, 0)
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.Version = 4
	fetchResponse2.AddError("my_topic", 0, ErrNoError)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumeMessageWithSessionIDs(t *testing.T) {
	// Given
	fetchResponse1 := &FetchResponse{Version: 7}
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
	fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

	cfg := NewConfig()
	cfg.Version = V1_1_0_0

	broker0 := NewMockBroker(t, 0)
	fetchResponse2 := &FetchResponse{}
	fetchResponse2.Version = 7
	fetchResponse2.AddError("my_topic", 0, ErrNoError)

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 0),
		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
	})

	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	assertMessageOffset(t, <-consumer.Messages(), 1)
	assertMessageOffset(t, <-consumer.Messages(), 2)

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()

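	// History()[3] is the second FetchRequest (after Metadata, Offset and the
	// first Fetch). Sarama sends the KIP-227 "no session" sentinels on every
	// fetch here, so SessionID 0 and SessionEpoch -1 are expected.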
	fetchReq := broker0.History()[3].Request.(*FetchRequest)
	if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 {
		t.Error("Expected session ID to be zero & Epoch to be -1")
	}
}

// It is fine if the offsets of fetched messages are not sequential (as long
// as they are strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
	// Given
	legacyFetchResponse := &FetchResponse{}
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
	legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
	newFetchResponse := &FetchResponse{Version: 4}
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
	newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
	newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
	newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
	for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
		var offsetResponseVersion int16
		cfg := NewConfig()
		if fetchResponse1.Version >= 4 {
			cfg.Version = V0_11_0_0
			offsetResponseVersion = 1
		}

		broker0 := NewMockBroker(t, 0)
		fetchResponse2 := &FetchResponse{Version: fetchResponse1.Version}
		fetchResponse2.AddError("my_topic", 0, ErrNoError)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		// When
		consumer, err := master.ConsumePartition("my_topic", 0, 3)
		if err != nil {
			t.Fatal(err)
		}

		// Then: messages with offsets 5, 7 and 11 are returned even though the
		// requested offset was 3 and the offsets are not sequential.
		assertMessageOffset(t, <-consumer.Messages(), 5)
		assertMessageOffset(t, <-consumer.Messages(), 7)
		assertMessageOffset(t, <-consumer.Messages(), 11)

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}

// If the leadership for a partition changes then the consumer resolves the
// new leader and switches to it.
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
	// initial setup
	seedBroker := NewMockBroker(t, 10)
	leader0 := NewMockBroker(t, 0)
	leader1 := NewMockBroker(t, 1)

	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(leader0.Addr(), leader0.BrokerID()).
			SetBroker(leader1.Addr(), leader1.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
			SetLeader("my_topic", 0, leader0.BrokerID()).
			SetLeader("my_topic", 1, leader1.BrokerID()),
	})

	mockOffsetResponse1 := NewMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 0).
		SetOffset("my_topic", 0, OffsetNewest, 1000).
		SetOffset("my_topic", 1, OffsetOldest, 0).
		SetOffset("my_topic", 1, OffsetNewest, 1000)
	leader0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse1,
		"FetchRequest":  NewMockFetchResponse(t, 1),
	})
	leader1.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse1,
		"FetchRequest":  NewMockFetchResponse(t, 1),
	})

	// launch test goroutines
	config := NewConfig()
	config.Consumer.Retry.Backoff = 50 * time.Millisecond
	master, err := NewConsumer([]string{seedBroker.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// we expect to end up (eventually) consuming exactly ten messages on each partition
	var wg sync.WaitGroup
	for i := int32(0); i < 2; i++ {
		consumer, err := master.ConsumePartition("my_topic", i, 0)
		if err != nil {
			t.Error(err)
		}

		go func(c PartitionConsumer) {
			for err := range c.Errors() {
				t.Error(err)
			}
		}(consumer)

		wg.Add(1)
		go func(partition int32, c PartitionConsumer) {
			for i := 0; i < 10; i++ {
				message := <-c.Messages()
				if message.Offset != int64(i) {
					t.Error("Incorrect message offset!", i, partition, message.Offset)
				}
				if message.Partition != partition {
					t.Error("Incorrect message partition!")
				}
			}
			safeClose(t, c)
			wg.Done()
		}(i, consumer)
	}

	time.Sleep(50 * time.Millisecond)
	Logger.Printf("    STAGE 1")
	// Stage 1:
	//   * my_topic/0 -> leader0 serves 4 messages
	//   * my_topic/1 -> leader1 serves 0 messages

	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 4; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
	}
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf("    STAGE 2")
	// Stage 2:
	//   * leader0 says that it is no longer serving my_topic/0
	//   * seedBroker says that leader1 is now serving my_topic/0

	// the seed broker announces that leader1 is the new leader of partition 0
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetLeader("my_topic", 0, leader1.BrokerID()).
			SetLeader("my_topic", 1, leader1.BrokerID()).
			SetBroker(leader0.Addr(), leader0.BrokerID()).
			SetBroker(leader1.Addr(), leader1.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
	})

	// leader0 says it is no longer the leader of partition 0
	fetchResponse := new(FetchResponse)
	fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf("    STAGE 3")
	// Stage 3:
	//   * my_topic/0 -> leader1 serves 3 messages
	//   * my_topic/1 -> leader1 serves 8 messages

	// leader1 provides 3 messages on partition 0, and 8 messages on partition 1
	mockFetchResponse2 := NewMockFetchResponse(t, 2)
	for i := 4; i < 7; i++ {
		mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
	}
	for i := 0; i < 8; i++ {
		mockFetchResponse2.SetMessage("my_topic", 1, int64(i), testMsg)
	}
	leader1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse2,
	})

	time.Sleep(50 * time.Millisecond)
	Logger.Printf("    STAGE 4")
	// Stage 4:
	//   * my_topic/0 -> leader1 serves 3 messages
	//   * my_topic/1 -> leader1 says that it is no longer the leader
	//   * seedBroker says that leader0 is the new leader for my_topic/1

	// metadata now assigns partition 0 to leader1 and partition 1 to leader0
	seedBroker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetLeader("my_topic", 0, leader1.BrokerID()).
			SetLeader("my_topic", 1, leader0.BrokerID()).
			SetBroker(leader0.Addr(), leader0.BrokerID()).
			SetBroker(leader1.Addr(), leader1.BrokerID()).
			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
	})

	// leader1 provides three more messages on partition 0 and says it is no longer the leader of partition 1
	mockFetchResponse3 := NewMockFetchResponse(t, 3).
		SetMessage("my_topic", 0, int64(7), testMsg).
		SetMessage("my_topic", 0, int64(8), testMsg).
		SetMessage("my_topic", 0, int64(9), testMsg)
	fetchResponse4 := new(FetchResponse)
	fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
	leader1.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
	})

	// leader0 provides two messages on partition 1
	mockFetchResponse4 := NewMockFetchResponse(t, 2)
	for i := 8; i < 10; i++ {
		mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
	}
	leader0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse4,
	})

	wg.Wait()
	safeClose(t, master)
	leader1.Close()
	leader0.Close()
	seedBroker.Close()
}

// When two partitions have the same broker as the leader, a full channel
// buffer on one partition consumer does not affect the ability of the other
// consumer to read messages.
func TestConsumerInterleavedClose(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()).
			SetLeader("my_topic", 1, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 1000).
			SetOffset("my_topic", 0, OffsetNewest, 1100).
			SetOffset("my_topic", 1, OffsetOldest, 2000).
			SetOffset("my_topic", 1, OffsetNewest, 2100),
		"FetchRequest": NewMockFetchResponse(t, 1).
			SetMessage("my_topic", 0, 1000, testMsg).
			SetMessage("my_topic", 0, 1001, testMsg).
			SetMessage("my_topic", 0, 1002, testMsg).
			SetMessage("my_topic", 1, 2000, testMsg),
	})

	config := NewConfig()
	config.ChannelBufferSize = 0
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then: we can read from partition 0 even if nobody reads from partition 1
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c0.Messages(), 1001)
	assertMessageOffset(t, <-c0.Messages(), 1002)

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
}

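// If a broker goes down while partition consumers hold references to it,
// consumption stalls for the affected partition but resumes once a broker
// comes back up at the same address, without disturbing the other partition.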
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
	broker0 := NewMockBroker(t, 0)
	broker0Addr := broker0.Addr()
	broker1 := NewMockBroker(t, 1)

	mockMetadataResponse := NewMockMetadataResponse(t).
		SetBroker(broker0.Addr(), broker0.BrokerID()).
		SetBroker(broker1.Addr(), broker1.BrokerID()).
		SetLeader("my_topic", 0, broker0.BrokerID()).
		SetLeader("my_topic", 1, broker1.BrokerID())

	mockOffsetResponse := NewMockOffsetResponse(t).
		SetOffset("my_topic", 0, OffsetOldest, 1000).
		SetOffset("my_topic", 0, OffsetNewest, 1100).
		SetOffset("my_topic", 1, OffsetOldest, 2000).
		SetOffset("my_topic", 1, OffsetNewest, 2100)

	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
		mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"OffsetRequest": mockOffsetResponse,
		"FetchRequest":  mockFetchResponse,
	})
	broker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": mockMetadataResponse,
		"OffsetRequest":   mockOffsetResponse,
		"FetchRequest":    mockFetchResponse,
	})

	config := NewConfig()
	config.Consumer.Return.Errors = true
	config.Consumer.Retry.Backoff = 100 * time.Millisecond
	config.ChannelBufferSize = 1
	master, err := NewConsumer([]string{broker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	c0, err := master.ConsumePartition("my_topic", 0, 1000)
	if err != nil {
		t.Fatal(err)
	}

	c1, err := master.ConsumePartition("my_topic", 1, 2000)
	if err != nil {
		t.Fatal(err)
	}

	// Read messages from both partitions to make sure that both brokers operate
	// normally.
	assertMessageOffset(t, <-c0.Messages(), 1000)
	assertMessageOffset(t, <-c1.Messages(), 2000)

	// Simulate broker shutdown. Note that the metadata response does not change,
	// i.e. the leadership does not move to another broker, so the partition
	// consumer will keep retrying to restore the connection with the broker.
	broker0.Close()

	// Make sure that while the partition/0 leader is down, consumer/partition/1
	// is capable of pulling messages from broker1.
	for i := 1; i < 7; i++ {
		offset := (<-c1.Messages()).Offset
		if offset != int64(2000+i) {
			t.Errorf("Expected offset %d from consumer/partition/1, got %d", int64(2000+i), offset)
		}
	}

	// Bring broker0 back to service.
	broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"FetchRequest": mockFetchResponse,
	})

	// Read the rest of messages from both partitions.
	for i := 7; i < 10; i++ {
		assertMessageOffset(t, <-c1.Messages(), int64(2000+i))
	}
	for i := 1; i < 10; i++ {
		assertMessageOffset(t, <-c0.Messages(), int64(1000+i))
	}

	select {
	case <-c0.Errors():
	default:
		t.Errorf("Partition consumer should have detected broker restart")
	}

	safeClose(t, c1)
	safeClose(t, c0)
	safeClose(t, master)
	broker0.Close()
	broker1.Close()
}

func TestConsumerOffsetOutOfRange(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 2)
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 2345),
	})
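	// Every offset requested below lies outside the advertised offset range,
	// so ConsumePartition must fail before any fetch request is issued.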

	master, err := NewConsumer([]string{broker0.Addr()}, nil)
	if err != nil {
		t.Fatal(err)
	}

	// When/Then
	if _, err := master.ConsumePartition("my_topic", 0, 0); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}
	if _, err := master.ConsumePartition("my_topic", 0, 3456); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}
	if _, err := master.ConsumePartition("my_topic", 0, -3); err != ErrOffsetOutOfRange {
		t.Fatal("Should return ErrOffsetOutOfRange, got:", err)
	}

	safeClose(t, master)
	broker0.Close()
}

func TestConsumerExpiryTicker(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)
	fetchResponse1 := &FetchResponse{}
	for i := 1; i <= 8; i++ {
		fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, int64(i))
	}
	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetNewest, 1234).
			SetOffset("my_topic", 0, OffsetOldest, 1),
		"FetchRequest": NewMockSequence(fetchResponse1),
	})

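	// MaxProcessingTime of 10ms together with an unbuffered messages channel
	// arms the expiry ticker; consuming each message within 2ms should keep
	// the subscription alive, so all eight messages are expected to arrive.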
	config := NewConfig()
	config.ChannelBufferSize = 0
	config.Consumer.MaxProcessingTime = 10 * time.Millisecond
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	// When
	consumer, err := master.ConsumePartition("my_topic", 0, 1)
	if err != nil {
		t.Fatal(err)
	}

	// Then: messages with offsets 1 through 8 are read
	for i := 1; i <= 8; i++ {
		assertMessageOffset(t, <-consumer.Messages(), int64(i))
		time.Sleep(2 * time.Millisecond)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumerTimestamps(t *testing.T) {
	now := time.Now().Truncate(time.Millisecond)
	type testMessage struct {
		key       Encoder
		offset    int64
		timestamp time.Time
	}
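	// Up to and including V0_10_0_0 the consumer reports the zero time as the
	// message timestamp. From V0_10_2_1 onwards it reports the producer-supplied
	// CreateTime, or the broker's append time (now) when logAppendTime is set.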
	for _, d := range []struct {
		kversion          KafkaVersion
		logAppendTime     bool
		messages          []testMessage
		expectedTimestamp []time.Time
	}{
		{MinVersion, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_9_0_0, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_10_0_0, false, []testMessage{
			{testMsg, 1, now},
			{testMsg, 2, now},
		}, []time.Time{{}, {}}},
		{V0_10_2_1, false, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
		{V0_10_2_1, true, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now, now}},
		{V0_11_0_0, false, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
		{V0_11_0_0, true, []testMessage{
			{testMsg, 1, now.Add(time.Second)},
			{testMsg, 2, now.Add(2 * time.Second)},
		}, []time.Time{now, now}},
	} {
		var fr *FetchResponse
		var offsetResponseVersion int16
		cfg := NewConfig()
		cfg.Version = d.kversion
		switch {
		case d.kversion.IsAtLeast(V0_11_0_0):
			offsetResponseVersion = 1
			fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
			for _, m := range d.messages {
				fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
			}
			fr.SetLastOffsetDelta("my_topic", 0, 2)
			fr.SetLastStableOffset("my_topic", 0, 2)
		case d.kversion.IsAtLeast(V0_10_1_0):
			offsetResponseVersion = 1
			fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
			for _, m := range d.messages {
				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
			}
		default:
			var version int16
			switch {
			case d.kversion.IsAtLeast(V0_10_0_0):
				version = 2
			case d.kversion.IsAtLeast(V0_9_0_0):
				version = 1
			}
			fr = &FetchResponse{Version: version}
			for _, m := range d.messages {
				fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
			}
		}

		broker0 := NewMockBroker(t, 0)
		broker0.SetHandlerByMap(map[string]MockResponse{
			"MetadataRequest": NewMockMetadataResponse(t).
				SetBroker(broker0.Addr(), broker0.BrokerID()).
				SetLeader("my_topic", 0, broker0.BrokerID()),
			"OffsetRequest": NewMockOffsetResponse(t).
				SetVersion(offsetResponseVersion).
				SetOffset("my_topic", 0, OffsetNewest, 1234).
				SetOffset("my_topic", 0, OffsetOldest, 0),
			"FetchRequest": NewMockSequence(fr),
		})

		master, err := NewConsumer([]string{broker0.Addr()}, cfg)
		if err != nil {
			t.Fatal(err)
		}

		consumer, err := master.ConsumePartition("my_topic", 0, 1)
		if err != nil {
			t.Fatal(err)
		}

		for i, ts := range d.expectedTimestamp {
			select {
			case msg := <-consumer.Messages():
				assertMessageOffset(t, msg, int64(i)+1)
				if msg.Timestamp != ts {
					t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
						d.kversion, d.logAppendTime, msg.Timestamp, ts)
				}
			case err := <-consumer.Errors():
				t.Fatal(err)
			}
		}

		safeClose(t, consumer)
		safeClose(t, master)
		broker0.Close()
	}
}

// When the isolation level is set to ReadCommitted, no uncommitted messages
// should be delivered on the Messages channel.
func TestExcludeUncommitted(t *testing.T) {
	// Given
	broker0 := NewMockBroker(t, 0)

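	// Producer 7's transaction starting at offset 1235 is aborted by the
	// control record at offset 1237, so in ReadCommitted mode the records at
	// offsets 1235 and 1236 must be dropped.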
	fetchResponse := &FetchResponse{
		Version: 4,
		Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
			AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
		}}},
	}
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true)   // committed msg
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true)   // uncommitted msg
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true)   // uncommitted msg
	fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true)   // committed msg

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetVersion(1).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 1237),
		"FetchRequest": NewMockWrapper(fetchResponse),
	})

	cfg := NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Version = V0_11_0_0
	cfg.Consumer.IsolationLevel = ReadCommitted

	// When
	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
	if err != nil {
		t.Fatal(err)
	}

	// Then: only the 2 committed messages are returned
	select {
	case message := <-consumer.Messages():
		assertMessageOffset(t, message, int64(1234))
	case err := <-consumer.Errors():
		t.Error(err)
	}
	select {
	case message := <-consumer.Messages():
		assertMessageOffset(t, message, int64(1238))
	case err := <-consumer.Errors():
		t.Error(err)
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
	if msg.Offset != expectedOffset {
		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
	}
}

// This example shows how to use the consumer to read messages
// from a single partition.
func ExampleConsumer() {
	consumer, err := NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := consumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			log.Fatalln(err)
		}
	}()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	consumed := 0
ConsumerLoop:
	for {
		select {
		case msg := <-partitionConsumer.Messages():
			log.Printf("Consumed message offset %d\n", msg.Offset)
			consumed++
		case <-signals:
			break ConsumerLoop
		}
	}

	log.Printf("Consumed: %d\n", consumed)
}

func Test_partitionConsumer_parseResponse(t *testing.T) {
	type args struct {
		response *FetchResponse
	}
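	// An empty FetchResponse normally signals an incomplete read, but a
	// non-zero ThrottleTime means the broker intentionally delayed the reply,
	// which must not be surfaced as an error.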
	tests := []struct {
		name    string
		args    args
		want    []*ConsumerMessage
		wantErr bool
	}{
		{
			name: "empty but throttled FetchResponse is not considered an error",
			args: args{
				response: &FetchResponse{
					ThrottleTime: time.Millisecond,
				},
			},
		},
		{
			name: "empty FetchResponse is considered an incomplete response by default",
			args: args{
				response: &FetchResponse{},
			},
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			child := &partitionConsumer{
				broker: &brokerConsumer{
					broker: &Broker{},
				},
				conf: &Config{},
			}
			got, err := child.parseResponse(tt.args.response)
			if (err != nil) != tt.wantErr {
				t.Errorf("partitionConsumer.parseResponse() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if !reflect.DeepEqual(got, tt.want) {
				t.Errorf("partitionConsumer.parseResponse() = %v, want %v", got, tt.want)
			}
		})
	}
}

func testConsumerInterceptor(
	t *testing.T,
	interceptors []ConsumerInterceptor,
	expectationFn func(*testing.T, int, *ConsumerMessage),
) {
	// Given
	broker0 := NewMockBroker(t, 0)

	mockFetchResponse := NewMockFetchResponse(t, 1)
	for i := 0; i < 10; i++ {
		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
	}

	broker0.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetBroker(broker0.Addr(), broker0.BrokerID()).
			SetLeader("my_topic", 0, broker0.BrokerID()),
		"OffsetRequest": NewMockOffsetResponse(t).
			SetOffset("my_topic", 0, OffsetOldest, 0).
			SetOffset("my_topic", 0, OffsetNewest, 0),
		"FetchRequest": mockFetchResponse,
	})
	config := NewConfig()
	config.Consumer.Interceptors = interceptors
	// When
	master, err := NewConsumer([]string{broker0.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}

	consumer, err := master.ConsumePartition("my_topic", 0, 0)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		select {
		case msg := <-consumer.Messages():
			expectationFn(t, i, msg)
		case err := <-consumer.Errors():
			t.Error(err)
		}
	}

	safeClose(t, consumer)
	safeClose(t, master)
	broker0.Close()
}

func TestConsumerInterceptors(t *testing.T) {
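	// appendInterceptor (defined elsewhere in the test suite) appends its
	// running counter to each intercepted message's value; a negative starting
	// counter makes it fail and leave the value untouched.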
	tests := []struct {
		name          string
		interceptors  []ConsumerInterceptor
		expectationFn func(*testing.T, int, *ConsumerMessage)
	}{
		{
			name:         "intercept messages",
			interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
				ev, _ := testMsg.Encode()
				expected := string(ev) + strconv.Itoa(i)
				v := string(msg.Value)
				if v != expected {
					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
				}
			},
		},
		{
			name:         "interceptor chain",
			interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
				ev, _ := testMsg.Encode()
				expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
				v := string(msg.Value)
				if v != expected {
					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
				}
			},
		},
		{
			name:         "interceptor chain with one interceptor failing",
			interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
				ev, _ := testMsg.Encode()
				expected := string(ev) + strconv.Itoa(i+1000)
				v := string(msg.Value)
				if v != expected {
					t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
				}
			},
		},
		{
			name:         "interceptor chain with all interceptors failing",
			interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
				ev, _ := testMsg.Encode()
				expected := string(ev)
				v := string(msg.Value)
				if v != expected {
					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
				}
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
	}
}