|
@@ -169,6 +169,64 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Consumer.Offsets.AutoCommit.Enable = false
|
|
|
+
|
|
|
+ om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
|
|
|
+ pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
|
|
|
+
|
|
|
+
|
|
|
+ timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
|
|
|
+
|
|
|
+ ocResponse := new(OffsetCommitResponse)
|
|
|
+ ocResponse.AddError("my_topic", 0, ErrNoError)
|
|
|
+ called := make(chan none)
|
|
|
+ handler := func(req *request) (res encoderWithHeader) {
|
|
|
+ close(called)
|
|
|
+ return ocResponse
|
|
|
+ }
|
|
|
+ coordinator.setHandler(handler)
|
|
|
+
|
|
|
+
|
|
|
+ expected := int64(1)
|
|
|
+ pom.ResetOffset(expected, "modified_meta")
|
|
|
+ _, _ = pom.NextOffset()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-called:
|
|
|
+
|
|
|
+ t.Errorf("Received request when AutoCommit is disabled")
|
|
|
+ case <-time.After(timeout):
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ called = make(chan none)
|
|
|
+
|
|
|
+ om.Commit()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-called:
|
|
|
+
|
|
|
+
|
|
|
+ case <-time.After(timeout):
|
|
|
+
|
|
|
+ t.Errorf("No request received for after waiting for %v", timeout)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ broker.Close()
|
|
|
+ coordinator.Close()
|
|
|
+
|
|
|
+
|
|
|
+ safeClose(t, om)
|
|
|
+ safeClose(t, pom)
|
|
|
+ safeClose(t, testClient)
|
|
|
+}
|
|
|
+
|
|
|
|
|
|
|
|
|
func TestOffsetManagerFetchInitialFail(t *testing.T) {
|