|
|
@@ -5,13 +5,16 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-func initOffsetManager(t *testing.T) (om OffsetManager,
|
|
|
+func initOffsetManager(t *testing.T, retention time.Duration) (om OffsetManager,
|
|
|
testClient Client, broker, coordinator *MockBroker) {
|
|
|
|
|
|
config := NewConfig()
|
|
|
config.Metadata.Retry.Max = 1
|
|
|
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
|
config.Version = V0_9_0_0
|
|
|
+ if retention > 0 {
|
|
|
+ config.Consumer.Offsets.Retention = retention
|
|
|
+ }
|
|
|
|
|
|
broker = NewMockBroker(t, 1)
|
|
|
coordinator = NewMockBroker(t, 2)
|
|
|
@@ -87,7 +90,7 @@ func TestNewOffsetManager(t *testing.T) {
|
|
|
// Test recovery from ErrNotCoordinatorForConsumer
|
|
|
// on first fetchInitialOffset call
|
|
|
func TestOffsetManagerFetchInitialFail(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
|
|
|
// Error on first fetchInitialOffset call
|
|
|
responseBlock := OffsetFetchResponseBlock{
|
|
|
@@ -130,7 +133,7 @@ func TestOffsetManagerFetchInitialFail(t *testing.T) {
|
|
|
|
|
|
// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
|
|
|
func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
|
|
|
// Error on first fetchInitialOffset call
|
|
|
responseBlock := OffsetFetchResponseBlock{
|
|
|
@@ -163,7 +166,7 @@ func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
testClient.Config().Consumer.Offsets.Initial = OffsetOldest
|
|
|
|
|
|
// Kafka returns -1 if no offset has been stored for this partition yet.
|
|
|
@@ -185,7 +188,7 @@ func TestPartitionOffsetManagerInitialOffset(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerNextOffset(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
|
|
|
|
|
|
offset, meta := pom.NextOffset()
|
|
|
@@ -204,7 +207,7 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerResetOffset(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
@@ -230,9 +233,7 @@ func TestPartitionOffsetManagerResetOffset(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
- testClient.Config().Consumer.Offsets.Retention = time.Hour
|
|
|
-
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
@@ -268,7 +269,7 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
@@ -293,9 +294,7 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
- testClient.Config().Consumer.Offsets.Retention = time.Hour
|
|
|
-
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
@@ -330,7 +329,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestPartitionOffsetManagerCommitErr(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
|
|
|
|
|
|
// Error on one partition
|
|
|
@@ -394,7 +393,7 @@ func TestPartitionOffsetManagerCommitErr(t *testing.T) {
|
|
|
|
|
|
// Test of recovery from abort
|
|
|
func TestAbortPartitionOffsetManager(t *testing.T) {
|
|
|
- om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t, 0)
|
|
|
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
|
|
|
|
|
|
// this triggers an error in the CommitOffset request,
|