|
|
@@ -1,6 +1,7 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
+ "log"
|
|
|
"testing"
|
|
|
"time"
|
|
|
)
|
|
|
@@ -36,7 +37,7 @@ func TestNewOffsetManager(t *testing.T) {
|
|
|
|
|
|
func initPOM(t *testing.T) PartitionOffsetManager {
|
|
|
config := NewConfig()
|
|
|
- config.Metadata.Retry.Max = 0
|
|
|
+ config.Metadata.Retry.Max = 1
|
|
|
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
|
|
|
|
broker = newMockBroker(t, 1)
|
|
|
@@ -44,7 +45,8 @@ func initPOM(t *testing.T) PartitionOffsetManager {
|
|
|
|
|
|
seedMeta := new(MetadataResponse)
|
|
|
seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
|
|
|
- // seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
|
|
|
+ seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
|
|
|
+ seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
|
|
|
broker.Returns(seedMeta)
|
|
|
|
|
|
var err error
|
|
|
@@ -118,15 +120,42 @@ func TestSetOffset(t *testing.T) {
|
|
|
coordinator.Close()
|
|
|
}
|
|
|
|
|
|
-func TestCloseErrors(t *testing.T) {
|
|
|
+// This test is not passing
|
|
|
+func TestCommitErr(t *testing.T) {
|
|
|
+ log.Println("TestCommitErr")
|
|
|
+ log.Println("=============")
|
|
|
pom := initPOM(t)
|
|
|
- // pom.errors = "foo"
|
|
|
+
|
|
|
+ ocResponse := new(OffsetCommitResponse)
|
|
|
+ // ocResponse.Errors
|
|
|
+ ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
+ ocResponse.AddError("my_topic", 1, ErrNoError)
|
|
|
+ coordinator.Returns(ocResponse)
|
|
|
+
|
|
|
+ // ocResponse2 := new(OffsetCommitResponse)
|
|
|
+ // // ocResponse.Errors
|
|
|
+ // ocResponse2.AddError("my_topic", 1, ErrNoError)
|
|
|
+ // coordinator.Returns(ocResponse2)
|
|
|
+
|
|
|
+ freshCoordinator := newMockBroker(t, 3)
|
|
|
+
|
|
|
+ // For RefreshCoordinator()
|
|
|
+ coordinatorResponse3 := new(ConsumerMetadataResponse)
|
|
|
+ coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
|
|
|
+ coordinatorResponse3.CoordinatorHost = "127.0.0.1"
|
|
|
+ coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
|
|
|
+ broker.Returns(coordinatorResponse3)
|
|
|
+
|
|
|
+ ocResponse2 := new(OffsetCommitResponse)
|
|
|
+ // ocResponse.Errors
|
|
|
+ // ocResponse2.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
+ ocResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
+ freshCoordinator.Returns(ocResponse2)
|
|
|
+
|
|
|
+ pom.SetOffset(100, "modified_meta")
|
|
|
+
|
|
|
err := pom.Close()
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
-
|
|
|
- testClient.Close()
|
|
|
- broker.Close()
|
|
|
- coordinator.Close()
|
|
|
}
|