|
@@ -15,9 +15,9 @@ var testMsg = StringEncoder("Foo")
|
|
|
// that offset.
|
|
// that offset.
|
|
|
func TestConsumerOffsetManual(t *testing.T) {
|
|
func TestConsumerOffsetManual(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
|
|
|
|
|
- mockFetchResponse := newMockFetchResponse(t, 1)
|
|
|
|
|
|
|
+ mockFetchResponse := NewMockFetchResponse(t, 1)
|
|
|
for i := 0; i < 10; i++ {
|
|
for i := 0; i < 10; i++ {
|
|
|
mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
|
|
mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg)
|
|
|
}
|
|
}
|
|
@@ -26,7 +26,7 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 2345),
|
|
SetOffset("my_topic", 0, OffsetNewest, 2345),
|
|
|
"FetchRequest": mockFetchResponse,
|
|
"FetchRequest": mockFetchResponse,
|
|
@@ -63,15 +63,15 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
// newest in its metadata response.
|
|
// newest in its metadata response.
|
|
|
func TestConsumerOffsetNewest(t *testing.T) {
|
|
func TestConsumerOffsetNewest(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 10).
|
|
SetOffset("my_topic", 0, OffsetNewest, 10).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 7),
|
|
SetOffset("my_topic", 0, OffsetOldest, 7),
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1).
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1).
|
|
|
SetMessage("my_topic", 0, 9, testMsg).
|
|
SetMessage("my_topic", 0, 9, testMsg).
|
|
|
SetMessage("my_topic", 0, 10, testMsg).
|
|
SetMessage("my_topic", 0, 10, testMsg).
|
|
|
SetMessage("my_topic", 0, 11, testMsg).
|
|
SetMessage("my_topic", 0, 11, testMsg).
|
|
@@ -103,15 +103,15 @@ func TestConsumerOffsetNewest(t *testing.T) {
|
|
|
// It is possible to close a partition consumer and create the same anew.
|
|
// It is possible to close a partition consumer and create the same anew.
|
|
|
func TestConsumerRecreate(t *testing.T) {
|
|
func TestConsumerRecreate(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1).
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1).
|
|
|
SetMessage("my_topic", 0, 10, testMsg),
|
|
SetMessage("my_topic", 0, 10, testMsg),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
@@ -144,15 +144,15 @@ func TestConsumerRecreate(t *testing.T) {
|
|
|
// An attempt to consume the same partition twice should fail.
|
|
// An attempt to consume the same partition twice should fail.
|
|
|
func TestConsumerDuplicate(t *testing.T) {
|
|
func TestConsumerDuplicate(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
config := NewConfig()
|
|
config := NewConfig()
|
|
@@ -184,7 +184,7 @@ func TestConsumerDuplicate(t *testing.T) {
|
|
|
// specified by `Config.Consumer.Retry.Backoff`.
|
|
// specified by `Config.Consumer.Retry.Backoff`.
|
|
|
func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 100)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 100)
|
|
|
|
|
|
|
|
// Stage 1: my_topic/0 served by broker0
|
|
// Stage 1: my_topic/0 served by broker0
|
|
|
Logger.Printf(" STAGE 1")
|
|
Logger.Printf(" STAGE 1")
|
|
@@ -193,10 +193,10 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 123).
|
|
SetOffset("my_topic", 0, OffsetOldest, 123).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1).
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1).
|
|
|
SetMessage("my_topic", 0, 123, testMsg),
|
|
SetMessage("my_topic", 0, 123, testMsg),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
@@ -225,7 +225,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
|
|
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "FetchRequest": newMockWrapper(fetchResponse2),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockWrapper(fetchResponse2),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
|
|
if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
|
|
@@ -237,10 +237,10 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
|
|
|
|
|
Logger.Printf(" STAGE 3")
|
|
Logger.Printf(" STAGE 3")
|
|
|
|
|
|
|
|
- broker1 := newMockBroker(t, 101)
|
|
|
|
|
|
|
+ broker1 := NewMockBroker(t, 101)
|
|
|
|
|
|
|
|
broker1.SetHandlerByMap(map[string]MockResponse{
|
|
broker1.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1).
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1).
|
|
|
SetMessage("my_topic", 0, 124, testMsg),
|
|
SetMessage("my_topic", 0, 124, testMsg),
|
|
|
})
|
|
})
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
@@ -260,7 +260,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
|
|
|
|
|
func TestConsumerInvalidTopic(t *testing.T) {
|
|
func TestConsumerInvalidTopic(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 100)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 100)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()),
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()),
|
|
@@ -287,15 +287,15 @@ func TestConsumerInvalidTopic(t *testing.T) {
|
|
|
// the moment is closed.
|
|
// the moment is closed.
|
|
|
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 100)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 100)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 123).
|
|
SetOffset("my_topic", 0, OffsetOldest, 123).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000),
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1).
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1).
|
|
|
SetMessage("my_topic", 0, 123, testMsg),
|
|
SetMessage("my_topic", 0, 123, testMsg),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
@@ -322,7 +322,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
|
|
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "FetchRequest": newMockWrapper(fetchResponse2),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockWrapper(fetchResponse2),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
// When
|
|
// When
|
|
@@ -341,17 +341,17 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
// immediately closing its output channels.
|
|
// immediately closing its output channels.
|
|
|
func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
fetchResponse := new(FetchResponse)
|
|
fetchResponse := new(FetchResponse)
|
|
|
fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 7),
|
|
SetOffset("my_topic", 0, OffsetOldest, 7),
|
|
|
- "FetchRequest": newMockWrapper(fetchResponse),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockWrapper(fetchResponse),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
@@ -379,7 +379,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
// requested, then such messages are ignored.
|
|
// requested, then such messages are ignored.
|
|
|
func TestConsumerExtraOffsets(t *testing.T) {
|
|
func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
fetchResponse1 := &FetchResponse{}
|
|
fetchResponse1 := &FetchResponse{}
|
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
|
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)
|
|
@@ -391,10 +391,10 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 0),
|
|
SetOffset("my_topic", 0, OffsetOldest, 0),
|
|
|
- "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
@@ -422,7 +422,7 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
// strictly increasing!).
|
|
// strictly increasing!).
|
|
|
func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
fetchResponse1 := &FetchResponse{}
|
|
fetchResponse1 := &FetchResponse{}
|
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 5)
|
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
|
|
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 7)
|
|
@@ -433,10 +433,10 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 0),
|
|
SetOffset("my_topic", 0, OffsetOldest, 0),
|
|
|
- "FetchRequest": newMockSequence(fetchResponse1, fetchResponse2),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
master, err := NewConsumer([]string{broker0.Addr()}, nil)
|
|
@@ -465,9 +465,9 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
// leader and switches to it.
|
|
// leader and switches to it.
|
|
|
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
// initial setup
|
|
// initial setup
|
|
|
- seedBroker := newMockBroker(t, 10)
|
|
|
|
|
- leader0 := newMockBroker(t, 0)
|
|
|
|
|
- leader1 := newMockBroker(t, 1)
|
|
|
|
|
|
|
+ seedBroker := NewMockBroker(t, 10)
|
|
|
|
|
+ leader0 := NewMockBroker(t, 0)
|
|
|
|
|
+ leader1 := NewMockBroker(t, 1)
|
|
|
|
|
|
|
|
seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
@@ -477,18 +477,18 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
SetLeader("my_topic", 1, leader1.BrokerID()),
|
|
SetLeader("my_topic", 1, leader1.BrokerID()),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
- mockOffsetResponse1 := newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ mockOffsetResponse1 := NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
SetOffset("my_topic", 0, OffsetOldest, 0).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1000).
|
|
|
SetOffset("my_topic", 1, OffsetOldest, 0).
|
|
SetOffset("my_topic", 1, OffsetOldest, 0).
|
|
|
SetOffset("my_topic", 1, OffsetNewest, 1000)
|
|
SetOffset("my_topic", 1, OffsetNewest, 1000)
|
|
|
leader0.SetHandlerByMap(map[string]MockResponse{
|
|
leader0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"OffsetRequest": mockOffsetResponse1,
|
|
"OffsetRequest": mockOffsetResponse1,
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1),
|
|
|
})
|
|
})
|
|
|
leader1.SetHandlerByMap(map[string]MockResponse{
|
|
leader1.SetHandlerByMap(map[string]MockResponse{
|
|
|
"OffsetRequest": mockOffsetResponse1,
|
|
"OffsetRequest": mockOffsetResponse1,
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
// launch test goroutines
|
|
// launch test goroutines
|
|
@@ -535,7 +535,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
// * my_topic/0 -> leader0 serves 4 messages
|
|
// * my_topic/0 -> leader0 serves 4 messages
|
|
|
// * my_topic/1 -> leader1 serves 0 messages
|
|
// * my_topic/1 -> leader1 serves 0 messages
|
|
|
|
|
|
|
|
- mockFetchResponse := newMockFetchResponse(t, 1)
|
|
|
|
|
|
|
+ mockFetchResponse := NewMockFetchResponse(t, 1)
|
|
|
for i := 0; i < 4; i++ {
|
|
for i := 0; i < 4; i++ {
|
|
|
mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
|
|
mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
|
|
|
}
|
|
}
|
|
@@ -560,7 +560,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
fetchResponse := new(FetchResponse)
|
|
fetchResponse := new(FetchResponse)
|
|
|
fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
fetchResponse.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
leader0.SetHandlerByMap(map[string]MockResponse{
|
|
leader0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "FetchRequest": newMockWrapper(fetchResponse),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockWrapper(fetchResponse),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
time.Sleep(50 * time.Millisecond)
|
|
time.Sleep(50 * time.Millisecond)
|
|
@@ -570,7 +570,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
// * my_topic/1 -> leader1 server 8 messages
|
|
// * my_topic/1 -> leader1 server 8 messages
|
|
|
|
|
|
|
|
// leader1 provides 3 message on partition 0, and 8 messages on partition 1
|
|
// leader1 provides 3 message on partition 0, and 8 messages on partition 1
|
|
|
- mockFetchResponse2 := newMockFetchResponse(t, 2)
|
|
|
|
|
|
|
+ mockFetchResponse2 := NewMockFetchResponse(t, 2)
|
|
|
for i := 4; i < 7; i++ {
|
|
for i := 4; i < 7; i++ {
|
|
|
mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
|
|
mockFetchResponse2.SetMessage("my_topic", 0, int64(i), testMsg)
|
|
|
}
|
|
}
|
|
@@ -596,18 +596,18 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
// leader1 provides three more messages on partition0, says no longer leader of partition1
|
|
// leader1 provides three more messages on partition0, says no longer leader of partition1
|
|
|
- mockFetchResponse3 := newMockFetchResponse(t, 3).
|
|
|
|
|
|
|
+ mockFetchResponse3 := NewMockFetchResponse(t, 3).
|
|
|
SetMessage("my_topic", 0, int64(7), testMsg).
|
|
SetMessage("my_topic", 0, int64(7), testMsg).
|
|
|
SetMessage("my_topic", 0, int64(8), testMsg).
|
|
SetMessage("my_topic", 0, int64(8), testMsg).
|
|
|
SetMessage("my_topic", 0, int64(9), testMsg)
|
|
SetMessage("my_topic", 0, int64(9), testMsg)
|
|
|
fetchResponse4 := new(FetchResponse)
|
|
fetchResponse4 := new(FetchResponse)
|
|
|
fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
|
|
fetchResponse4.AddError("my_topic", 1, ErrNotLeaderForPartition)
|
|
|
leader1.SetHandlerByMap(map[string]MockResponse{
|
|
leader1.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "FetchRequest": newMockSequence(mockFetchResponse3, fetchResponse4),
|
|
|
|
|
|
|
+ "FetchRequest": NewMockSequence(mockFetchResponse3, fetchResponse4),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
// leader0 provides two messages on partition 1
|
|
// leader0 provides two messages on partition 1
|
|
|
- mockFetchResponse4 := newMockFetchResponse(t, 2)
|
|
|
|
|
|
|
+ mockFetchResponse4 := NewMockFetchResponse(t, 2)
|
|
|
for i := 8; i < 10; i++ {
|
|
for i := 8; i < 10; i++ {
|
|
|
mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
|
|
mockFetchResponse4.SetMessage("my_topic", 1, int64(i), testMsg)
|
|
|
}
|
|
}
|
|
@@ -627,18 +627,18 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
// read messages by the other consumer.
|
|
// read messages by the other consumer.
|
|
|
func TestConsumerInterleavedClose(t *testing.T) {
|
|
func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 1, broker0.BrokerID()),
|
|
SetLeader("my_topic", 1, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 1000).
|
|
SetOffset("my_topic", 0, OffsetOldest, 1000).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1100).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1100).
|
|
|
SetOffset("my_topic", 1, OffsetOldest, 2000).
|
|
SetOffset("my_topic", 1, OffsetOldest, 2000).
|
|
|
SetOffset("my_topic", 1, OffsetNewest, 2100),
|
|
SetOffset("my_topic", 1, OffsetNewest, 2100),
|
|
|
- "FetchRequest": newMockFetchResponse(t, 1).
|
|
|
|
|
|
|
+ "FetchRequest": NewMockFetchResponse(t, 1).
|
|
|
SetMessage("my_topic", 0, 1000, testMsg).
|
|
SetMessage("my_topic", 0, 1000, testMsg).
|
|
|
SetMessage("my_topic", 0, 1001, testMsg).
|
|
SetMessage("my_topic", 0, 1001, testMsg).
|
|
|
SetMessage("my_topic", 0, 1002, testMsg).
|
|
SetMessage("my_topic", 0, 1002, testMsg).
|
|
@@ -674,9 +674,9 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
- broker0 := newMockBroker(t, 0)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 0)
|
|
|
broker0Addr := broker0.Addr()
|
|
broker0Addr := broker0.Addr()
|
|
|
- broker1 := newMockBroker(t, 1)
|
|
|
|
|
|
|
+ broker1 := NewMockBroker(t, 1)
|
|
|
|
|
|
|
|
mockMetadataResponse := newMockMetadataResponse(t).
|
|
mockMetadataResponse := newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -684,13 +684,13 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 1, broker1.BrokerID())
|
|
SetLeader("my_topic", 1, broker1.BrokerID())
|
|
|
|
|
|
|
|
- mockOffsetResponse := newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ mockOffsetResponse := NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 1000).
|
|
SetOffset("my_topic", 0, OffsetOldest, 1000).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1100).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1100).
|
|
|
SetOffset("my_topic", 1, OffsetOldest, 2000).
|
|
SetOffset("my_topic", 1, OffsetOldest, 2000).
|
|
|
SetOffset("my_topic", 1, OffsetNewest, 2100)
|
|
SetOffset("my_topic", 1, OffsetNewest, 2100)
|
|
|
|
|
|
|
|
- mockFetchResponse := newMockFetchResponse(t, 1)
|
|
|
|
|
|
|
+ mockFetchResponse := NewMockFetchResponse(t, 1)
|
|
|
for i := 0; i < 10; i++ {
|
|
for i := 0; i < 10; i++ {
|
|
|
mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
|
|
mockFetchResponse.SetMessage("my_topic", 0, int64(1000+i), testMsg)
|
|
|
mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
|
|
mockFetchResponse.SetMessage("my_topic", 1, int64(2000+i), testMsg)
|
|
@@ -745,7 +745,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Bring broker0 back to service.
|
|
// Bring broker0 back to service.
|
|
|
- broker0 = newMockBrokerAddr(t, 0, broker0Addr)
|
|
|
|
|
|
|
+ broker0 = NewMockBrokerAddr(t, 0, broker0Addr)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"FetchRequest": mockFetchResponse,
|
|
"FetchRequest": mockFetchResponse,
|
|
|
})
|
|
})
|
|
@@ -773,12 +773,12 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
|
|
|
|
|
func TestConsumerOffsetOutOfRange(t *testing.T) {
|
|
func TestConsumerOffsetOutOfRange(t *testing.T) {
|
|
|
// Given
|
|
// Given
|
|
|
- broker0 := newMockBroker(t, 2)
|
|
|
|
|
|
|
+ broker0 := NewMockBroker(t, 2)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
- "OffsetRequest": newMockOffsetResponse(t).
|
|
|
|
|
|
|
+ "OffsetRequest": NewMockOffsetResponse(t).
|
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
SetOffset("my_topic", 0, OffsetNewest, 1234).
|
|
|
SetOffset("my_topic", 0, OffsetOldest, 2345),
|
|
SetOffset("my_topic", 0, OffsetOldest, 2345),
|
|
|
})
|
|
})
|