|
@@ -13,7 +13,6 @@ var testMsg = StringEncoder("Foo")
|
|
|
func TestConsumerOffsetManual(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
|
|
|
mockFetchResponse := newMockFetchResponse(t, 1)
|
|
|
for i := 0; i < 10; i++ {
|
|
@@ -53,16 +52,15 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
+
|
|
|
func TestConsumerOffsetNewest(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -96,14 +94,13 @@ func TestConsumerOffsetNewest(t *testing.T) {
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
|
func TestConsumerRecreate(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -138,14 +135,13 @@ func TestConsumerRecreate(t *testing.T) {
|
|
|
|
|
|
safeClose(t, pc)
|
|
|
safeClose(t, c)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
|
func TestConsumerDuplicate(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -178,14 +174,14 @@ func TestConsumerDuplicate(t *testing.T) {
|
|
|
|
|
|
safeClose(t, pc1)
|
|
|
safeClose(t, c)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 100)
|
|
|
- defer broker0.Close()
|
|
|
|
|
|
|
|
|
Logger.Printf(" STAGE 1")
|
|
@@ -203,7 +199,8 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
|
|
|
config := NewConfig()
|
|
|
config.Net.ReadTimeout = 100 * time.Millisecond
|
|
|
- config.Consumer.Retry.Backoff = 500 * time.Millisecond
|
|
|
+ config.Consumer.Retry.Backoff = 200 * time.Millisecond
|
|
|
+ config.Consumer.Return.Errors = true
|
|
|
config.Metadata.Retry.Max = 0
|
|
|
c, err := NewConsumer([]string{broker0.Addr()}, config)
|
|
|
if err != nil {
|
|
@@ -212,7 +209,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
|
|
|
pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
|
|
|
if err != nil {
|
|
|
- t.Errorf("Failed to create a partition consumer, err=%v", err)
|
|
|
+ t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
assertMessageOffset(t, <-pc.Messages(), 123)
|
|
@@ -228,17 +225,16 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
"FetchRequest": newMockWrapper(fetchResponse2),
|
|
|
})
|
|
|
|
|
|
+ if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
|
|
|
+ t.Errorf("Unexpected error: %v", consErr.Err)
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- time.Sleep(300 * time.Millisecond)
|
|
|
Logger.Printf(" STAGE 3")
|
|
|
|
|
|
broker1 := newMockBroker(t, 101)
|
|
|
- defer broker1.Close()
|
|
|
|
|
|
broker1.SetHandlerByMap(map[string]MockResponse{
|
|
|
"FetchRequest": newMockFetchResponse(t, 1).
|
|
@@ -255,13 +251,13 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
|
|
|
|
|
|
safeClose(t, pc)
|
|
|
safeClose(t, c)
|
|
|
+ broker1.Close()
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
func TestConsumerInvalidTopic(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 100)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()),
|
|
@@ -281,6 +277,7 @@ func TestConsumerInvalidTopic(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
safeClose(t, c)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
@@ -288,8 +285,6 @@ func TestConsumerInvalidTopic(t *testing.T) {
|
|
|
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 100)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -304,6 +299,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
config := NewConfig()
|
|
|
config.Net.ReadTimeout = 100 * time.Millisecond
|
|
|
config.Consumer.Retry.Backoff = 100 * time.Millisecond
|
|
|
+ config.Consumer.Return.Errors = true
|
|
|
config.Metadata.Retry.Max = 0
|
|
|
c, err := NewConsumer([]string{broker0.Addr()}, config)
|
|
|
if err != nil {
|
|
@@ -312,7 +308,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
|
|
|
pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
|
|
|
if err != nil {
|
|
|
- t.Errorf("Failed to create a partition consumer, err=%v", err)
|
|
|
+ t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
assertMessageOffset(t, <-pc.Messages(), 123)
|
|
@@ -323,22 +319,18 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)
|
|
|
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
- "MetadataRequest": newMockMetadataResponse(t).
|
|
|
- SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
|
- SetLeader("my_topic", 0, broker0.BrokerID()),
|
|
|
"FetchRequest": newMockWrapper(fetchResponse2),
|
|
|
})
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- time.Sleep(200 * time.Millisecond)
|
|
|
+ if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
|
|
|
+ t.Errorf("Unexpected error: %v", consErr.Err)
|
|
|
+ }
|
|
|
|
|
|
|
|
|
safeClose(t, pc)
|
|
|
safeClose(t, c)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
@@ -347,8 +339,6 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
|
|
|
func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandler(func(req *request) (res encoder) {
|
|
|
switch reqBody := req.body.(type) {
|
|
|
case *MetadataRequest:
|
|
@@ -387,6 +377,7 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
safeClose(t, consumer)
|
|
|
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
@@ -394,8 +385,6 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
|
|
|
func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
called := 0
|
|
|
broker0.SetHandler(func(req *request) (res encoder) {
|
|
|
switch req.body.(type) {
|
|
@@ -441,6 +430,7 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
@@ -448,8 +438,6 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
called := 0
|
|
|
broker0.SetHandler(func(req *request) (res encoder) {
|
|
|
switch req.body.(type) {
|
|
@@ -495,6 +483,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
|
|
@@ -502,11 +491,8 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
|
|
|
seedBroker := newMockBroker(t, 10)
|
|
|
- defer seedBroker.Close()
|
|
|
leader0 := newMockBroker(t, 0)
|
|
|
- defer leader0.Close()
|
|
|
leader1 := newMockBroker(t, 1)
|
|
|
- defer leader1.Close()
|
|
|
|
|
|
seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
@@ -665,6 +651,9 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
|
|
|
wg.Wait()
|
|
|
safeClose(t, master)
|
|
|
+ leader1.Close()
|
|
|
+ leader0.Close()
|
|
|
+ seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
|
|
@@ -673,8 +662,6 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 0)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -717,6 +704,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
safeClose(t, c1)
|
|
|
safeClose(t, c0)
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
@@ -820,8 +808,6 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
func TestConsumerOffsetOutOfRange(t *testing.T) {
|
|
|
|
|
|
broker0 := newMockBroker(t, 2)
|
|
|
- defer broker0.Close()
|
|
|
-
|
|
|
broker0.SetHandlerByMap(map[string]MockResponse{
|
|
|
"MetadataRequest": newMockMetadataResponse(t).
|
|
|
SetBroker(broker0.Addr(), broker0.BrokerID()).
|
|
@@ -848,6 +834,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
safeClose(t, master)
|
|
|
+ broker0.Close()
|
|
|
}
|
|
|
|
|
|
func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
|