Browse Source

Merge pull request #1374 from dwi-di/consumer-group-offsets-nil

support ListConsumerGroupOffsets without topicPartition
Vlad Gorodetsky 6 years ago
parent
commit
d9dbc20e79
3 changed files with 16 additions and 3 deletions
  1. 3 1
      admin.go
  2. 1 1
      admin_test.go
  3. 12 1
      mockresponses.go

+ 3 - 1
admin.go

@@ -611,7 +611,9 @@ func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions m
 		partitions:    topicPartitions,
 	}
 
-	if ca.conf.Version.IsAtLeast(V0_8_2_2) {
+	if ca.conf.Version.IsAtLeast(V0_10_2_0) {
+		request.Version = 2
+	} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
 		request.Version = 1
 	}
 

+ 1 - 1
admin_test.go

@@ -822,7 +822,7 @@ func TestListConsumerGroupOffsets(t *testing.T) {
 	expectedOffset := int64(0)
 
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
-		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
+		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError).SetError(ErrNoError),
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetController(seedBroker.BrokerID()).
 			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),

+ 12 - 1
mockresponses.go

@@ -574,6 +574,7 @@ func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
 // MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
 type MockOffsetFetchResponse struct {
 	offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
+	error   KError
 	t       TestReporter
 }
 
@@ -599,15 +600,25 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 	return mr
 }
 
+func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
+	mr.error = kerror
+	return mr
+}
+
 func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*OffsetFetchRequest)
 	group := req.ConsumerGroup
-	res := &OffsetFetchResponse{}
+	res := &OffsetFetchResponse{Version: req.Version}
+
 	for topic, partitions := range mr.offsets[group] {
 		for partition, block := range partitions {
 			res.AddBlock(topic, partition, block)
 		}
 	}
+
+	if res.Version >= 2 {
+		res.Err = mr.error
+	}
 	return res
 }