|
|
@@ -23,7 +23,7 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -65,7 +65,7 @@ func TestConsumerOffsetNewest(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -105,7 +105,7 @@ func TestConsumerRecreate(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -146,7 +146,7 @@ func TestConsumerDuplicate(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -190,7 +190,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
Logger.Printf(" STAGE 1")
|
|
|
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -244,7 +244,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
SetMessage("my_topic", 0, 124, testMsg),
|
|
|
})
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetBroker(broker1.Addr(), broker1.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker1.BrokerID()),
|
|
|
@@ -262,7 +262,7 @@ func TestConsumerInvalidTopic(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 100)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()),
|
|
|
})
|
|
|
|
|
|
@@ -289,7 +289,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 100)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -345,7 +345,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
fetchResponse := new(FetchResponse)
|
|
|
fetchResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -388,7 +388,7 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
fetchResponse2 := &FetchResponse{}
|
|
|
fetchResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -430,7 +430,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
fetchResponse2 := &FetchResponse{}
|
|
|
fetchResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|
|
|
@@ -470,7 +470,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
leader1 := NewMockBroker(t, 1)
|
|
|
|
|
|
seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(leader0.Addr(), leader0.BrokerID()).
|
|
|
SetBroker(leader1.Addr(), leader1.BrokerID()).
|
|
|
SetLeader("my_topic", 0, leader0.BrokerID()).
|
|
|
@@ -551,7 +551,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
|
|
|
// seed broker tells that the new partition 0 leader is leader1
|
|
|
seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetLeader("my_topic", 0, leader1.BrokerID()).
|
|
|
SetLeader("my_topic", 1, leader1.BrokerID()),
|
|
|
})
|
|
|
@@ -590,7 +590,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
|
|
|
// metadata assigns 0 to leader1 and 1 to leader0
|
|
|
seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetLeader("my_topic", 0, leader1.BrokerID()).
|
|
|
SetLeader("my_topic", 1, leader0.BrokerID()),
|
|
|
})
|
|
|
@@ -629,7 +629,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 0)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 1, broker0.BrokerID()),
|
|
|
@@ -678,7 +678,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
broker0Addr := broker0.Addr()
|
|
|
broker1 := NewMockBroker(t, 1)
|
|
|
|
|
|
- mockMetadataResponse := newMockMetadataResponse(t).
|
|
|
+ mockMetadataResponse := NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetBroker(broker1.Addr(), broker1.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()).
|
|
|
@@ -775,7 +775,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
|
|
|
// Given
|
|
|
broker0 := NewMockBroker(t, 2)
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"OffsetRequest": NewMockOffsetResponse(t).
|