|
|
@@ -342,24 +342,16 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- broker0.SetHandler(func(req *request) (res encoder) {
|
|
|
- switch reqBody := req.body.(type) {
|
|
|
- case *MetadataRequest:
|
|
|
- return newMockMetadataResponse(t).
|
|
|
- SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
- SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
|
- For(reqBody)
|
|
|
- case *OffsetRequest:
|
|
|
- return newMockOffsetResponse(t).
|
|
|
- SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
- SetOffset("my_topic", 0, OffsetOldest, 7).
|
|
|
- For(reqBody)
|
|
|
- case *FetchRequest:
|
|
|
- fetchResponse := new(FetchResponse)
|
|
|
- fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
- return fetchResponse
|
|
|
- }
|
|
|
- return nil
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
+ broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
+ SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
+ "OffsetRequest": newMockOffsetResponse(t).
|
|
|
+ SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
+ SetOffset("my_topic", 0, OffsetOldest, 7),
|
|
|
+ "FetchRequest": newMockWrapper(fetchResponse),
|
|
|
})
|
|
|
|
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
|
@@ -388,31 +380,21 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- called := 0
|
|
|
- broker0.SetHandler(func(req *request) (res encoder) {
|
|
|
- switch req.body.(type) {
|
|
|
- case *MetadataRequest:
|
|
|
- return newMockMetadataResponse(t).
|
|
|
- SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
- SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
|
|
|
- case *OffsetRequest:
|
|
|
- return newMockOffsetResponse(t).
|
|
|
- SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
- SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
|
|
|
- case *FetchRequest:
|
|
|
- fetchResponse := &FetchResponse{}
|
|
|
- called++
|
|
|
- if called > 1 {
|
|
|
- fetchResponse.AddError("my_topic", 0, ErrNoError)
|
|
|
- return fetchResponse
|
|
|
- }
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 1)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
|
|
|
- return fetchResponse
|
|
|
- }
|
|
|
- return nil
|
|
|
+ fetchResponse1 := &FetchResponse{}
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 3)
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 4)
|
|
|
+ fetchResponse2 := &FetchResponse{}
|
|
|
+ fetchResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
+ broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
+ SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
+ "OffsetRequest": newMockOffsetResponse(t).
|
|
|
+ SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
+ SetOffset("my_topic", 0, OffsetOldest, 0),
|
|
|
+ "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2),
|
|
|
})
|
|
|
|
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
|
@@ -441,30 +423,20 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- called := 0
|
|
|
- broker0.SetHandler(func(req *request) (res encoder) {
|
|
|
- switch req.body.(type) {
|
|
|
- case *MetadataRequest:
|
|
|
- return newMockMetadataResponse(t).
|
|
|
- SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
- SetLeader("my_topic", 0, broker0.BrokerID()).For(req.body)
|
|
|
- case *OffsetRequest:
|
|
|
- return newMockOffsetResponse(t).
|
|
|
- SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
- SetOffset("my_topic", 0, OffsetOldest, 0).For(req.body)
|
|
|
- case *FetchRequest:
|
|
|
- called++
|
|
|
- fetchResponse := &FetchResponse{}
|
|
|
- if called > 1 {
|
|
|
- fetchResponse.AddError("my_topic", 0, ErrNoError)
|
|
|
- return fetchResponse
|
|
|
- }
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
|
|
|
- return fetchResponse
|
|
|
- }
|
|
|
- return nil
|
|
|
+ fetchResponse1 := &FetchResponse{}
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
|
|
|
+ fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 11)
|
|
|
+ fetchResponse2 := &FetchResponse{}
|
|
|
+ fetchResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
+ broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
+ SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
+ "OffsetRequest": newMockOffsetResponse(t).
|
|
|
+ SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
+ SetOffset("my_topic", 0, OffsetOldest, 0),
|
|
|
+ "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2),
|
|
|
})
|
|
|
|
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
|
@@ -585,14 +557,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
})
|
|
|
|
|
|
// leader0 says no longer leader of partition 0
|
|
|
- leader0.SetHandler(func(req *request) (res encoder) {
|
|
|
- switch req.body.(type) {
|
|
|
- case *FetchRequest:
|
|
|
- fetchResponse := new(FetchResponse)
|
|
|
- fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
- return fetchResponse
|
|
|
- }
|
|
|
- return nil
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
+ leader0.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "FetchRequest": newMockWrapper(fetchResponse),
|
|
|
})
|
|
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
@@ -632,15 +600,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
SetMessage("my_topic", 0, int64(7), testMsg).
|
|
|
SetMessage("my_topic", 0, int64(8), testMsg).
|
|
|
SetMessage("my_topic", 0, int64(9), testMsg)
|
|
|
- leader1.SetHandler(func(req *request) (res encoder) {
|
|
|
- switch reqBody := req.body.(type) {
|
|
|
- case *FetchRequest:
|
|
|
- res := mockFetchResponse3.For(reqBody).(*FetchResponse)
|
|
|
- res.AddError("my_topic", 1, ErrNotLeaderForPartition)
|
|
|
- return res
|
|
|
-
|
|
|
- }
|
|
|
- return nil
|
|
|
+ fetchResponse4 := new(FetchResponse)
|
|
|
+ fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
|
|
|
+ leader1.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "FetchRequest": newMockSequence(mockFetchResponse3, fetchResponse4),
|
|
|
})
|
|
|
|
|
|
// leader0 provides two messages on partition 1
|