|
|
@@ -209,6 +209,7 @@ func TestSetOffset(t *testing.T) {
|
|
|
func TestCommitErr(t *testing.T) {
|
|
|
pom := initPOM(t)
|
|
|
|
|
|
+ // Error on one partition
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
ocResponse.AddError("my_topic", 1, ErrNoError)
|
|
|
@@ -223,10 +224,46 @@ func TestCommitErr(t *testing.T) {
|
|
|
CoordinatorPort: newCoordinator.Port(),
|
|
|
})
|
|
|
|
|
|
+ // Nothing in response.Errors at all
|
|
|
ocResponse2 := new(OffsetCommitResponse)
|
|
|
- ocResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
newCoordinator.Returns(ocResponse2)
|
|
|
|
|
|
+ // For RefreshCoordinator()
|
|
|
+ broker.Returns(&ConsumerMetadataResponse{
|
|
|
+ CoordinatorID: newCoordinator.BrokerID(),
|
|
|
+ CoordinatorHost: "127.0.0.1",
|
|
|
+ CoordinatorPort: newCoordinator.Port(),
|
|
|
+ })
|
|
|
+
|
|
|
+ // Error on the wrong partition for this pom
|
|
|
+ ocResponse3 := new(OffsetCommitResponse)
|
|
|
+ ocResponse3.AddError("my_topic", 1, ErrNoError)
|
|
|
+ newCoordinator.Returns(ocResponse3)
|
|
|
+
|
|
|
+ // For RefreshCoordinator()
|
|
|
+ broker.Returns(&ConsumerMetadataResponse{
|
|
|
+ CoordinatorID: newCoordinator.BrokerID(),
|
|
|
+ CoordinatorHost: "127.0.0.1",
|
|
|
+ CoordinatorPort: newCoordinator.Port(),
|
|
|
+ })
|
|
|
+
|
|
|
+ // ErrUnknownTopicOrPartition/ErrNotLeaderForPartition/ErrLeaderNotAvailable block
|
|
|
+ ocResponse4 := new(OffsetCommitResponse)
|
|
|
+ ocResponse4.AddError("my_topic", 0, ErrUnknownTopicOrPartition)
|
|
|
+ newCoordinator.Returns(ocResponse4)
|
|
|
+
|
|
|
+ // For RefreshCoordinator()
|
|
|
+ broker.Returns(&ConsumerMetadataResponse{
|
|
|
+ CoordinatorID: newCoordinator.BrokerID(),
|
|
|
+ CoordinatorHost: "127.0.0.1",
|
|
|
+ CoordinatorPort: newCoordinator.Port(),
|
|
|
+ })
|
|
|
+
|
|
|
+ // Normal error response
|
|
|
+ ocResponse5 := new(OffsetCommitResponse)
|
|
|
+ ocResponse5.AddError("my_topic", 0, ErrNoError)
|
|
|
+ newCoordinator.Returns(ocResponse5)
|
|
|
+
|
|
|
pom.SetOffset(100, "modified_meta")
|
|
|
|
|
|
err := pom.Close()
|