Browse Source

Merge pull request #1096 from RussellLuo/add-support-for-delete-groups

Add support for DeleteGroups
Evan Huus 7 years ago
parent
commit
3c763ff04e
7 changed files with 250 additions and 16 deletions
  1. 11 0
      broker.go
  2. 46 16
      broker_test.go
  3. 30 0
      delete_groups_request.go
  4. 34 0
      delete_groups_request_test.go
  5. 70 0
      delete_groups_response.go
  6. 57 0
      delete_groups_response_test.go
  7. 2 0
      request.go

+ 11 - 0
broker.go

@@ -539,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
 
 	return response, nil
 }
+
+func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
+	response := new(DeleteGroupsResponse)
+
+	if err := b.sendAndReceive(request, response); err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 46 - 16
broker_test.go

@@ -71,7 +71,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 		// Set the broker id in order to validate local broker metrics
 		broker.id = 0
 		conf := NewConfig()
-		conf.Version = V0_10_0_0
+		conf.Version = tt.version
 		err := broker.Open(conf)
 		if err != nil {
 			t.Fatal(err)
@@ -97,11 +97,13 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 
 // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
 var brokerTestTable = []struct {
+	version  KafkaVersion
 	name     string
 	response []byte
 	runner   func(*testing.T, *Broker)
 }{
-	{"MetadataRequest",
+	{V0_10_0_0,
+		"MetadataRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := MetadataRequest{}
@@ -114,7 +116,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"ConsumerMetadataRequest",
+	{V0_10_0_0,
+		"ConsumerMetadataRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ConsumerMetadataRequest{}
@@ -127,7 +130,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"ProduceRequest (NoResponse)",
+	{V0_10_0_0,
+		"ProduceRequest (NoResponse)",
 		[]byte{},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
@@ -141,7 +145,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"ProduceRequest (WaitForLocal)",
+	{V0_10_0_0,
+		"ProduceRequest (WaitForLocal)",
 		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
@@ -155,7 +160,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"FetchRequest",
+	{V0_10_0_0,
+		"FetchRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := FetchRequest{}
@@ -168,7 +174,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"OffsetFetchRequest",
+	{V0_10_0_0,
+		"OffsetFetchRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetFetchRequest{}
@@ -181,7 +188,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"OffsetCommitRequest",
+	{V0_10_0_0,
+		"OffsetCommitRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetCommitRequest{}
@@ -194,7 +202,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"OffsetRequest",
+	{V0_10_0_0,
+		"OffsetRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetRequest{}
@@ -207,7 +216,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"JoinGroupRequest",
+	{V0_10_0_0,
+		"JoinGroupRequest",
 		[]byte{0x00, 0x17, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := JoinGroupRequest{}
@@ -220,7 +230,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"SyncGroupRequest",
+	{V0_10_0_0,
+		"SyncGroupRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := SyncGroupRequest{}
@@ -233,7 +244,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"LeaveGroupRequest",
+	{V0_10_0_0,
+		"LeaveGroupRequest",
 		[]byte{0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := LeaveGroupRequest{}
@@ -246,7 +258,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"HeartbeatRequest",
+	{V0_10_0_0,
+		"HeartbeatRequest",
 		[]byte{0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := HeartbeatRequest{}
@@ -259,7 +272,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"ListGroupsRequest",
+	{V0_10_0_0,
+		"ListGroupsRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ListGroupsRequest{}
@@ -272,7 +286,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"DescribeGroupsRequest",
+	{V0_10_0_0,
+		"DescribeGroupsRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := DescribeGroupsRequest{}
@@ -285,7 +300,8 @@ var brokerTestTable = []struct {
 			}
 		}},
 
-	{"ApiVersionsRequest",
+	{V0_10_0_0,
+		"ApiVersionsRequest",
 		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ApiVersionsRequest{}
@@ -297,6 +313,20 @@ var brokerTestTable = []struct {
 				t.Error("ApiVersions request got no response!")
 			}
 		}},
+
+	{V1_1_0_0,
+		"DeleteGroupsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := DeleteGroupsRequest{}
+			response, err := broker.DeleteGroups(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("DeleteGroups request got no response!")
+			}
+		}},
 }
 
 func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {

+ 30 - 0
delete_groups_request.go

@@ -0,0 +1,30 @@
+package sarama
+
+type DeleteGroupsRequest struct {
+	Groups []string
+}
+
+func (r *DeleteGroupsRequest) encode(pe packetEncoder) error {
+	return pe.putStringArray(r.Groups)
+}
+
+func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Groups, err = pd.getStringArray()
+	return
+}
+
+func (r *DeleteGroupsRequest) key() int16 {
+	return 42
+}
+
+func (r *DeleteGroupsRequest) version() int16 {
+	return 0
+}
+
+func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion {
+	return V1_1_0_0
+}
+
+func (r *DeleteGroupsRequest) AddGroup(group string) {
+	r.Groups = append(r.Groups, group)
+}

+ 34 - 0
delete_groups_request_test.go

@@ -0,0 +1,34 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyDeleteGroupsRequest = []byte{0, 0, 0, 0}
+
+	singleDeleteGroupsRequest = []byte{
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name: foo
+	}
+
+	doubleDeleteGroupsRequest = []byte{
+		0, 0, 0, 2, // 2 groups
+		0, 3, 'f', 'o', 'o', // group name: foo
+		0, 3, 'b', 'a', 'r', // group name: foo
+	}
+)
+
+func TestDeleteGroupsRequest(t *testing.T) {
+	var request *DeleteGroupsRequest
+
+	request = new(DeleteGroupsRequest)
+	testRequest(t, "no groups", request, emptyDeleteGroupsRequest)
+
+	request = new(DeleteGroupsRequest)
+	request.AddGroup("foo")
+	testRequest(t, "one group", request, singleDeleteGroupsRequest)
+
+	request = new(DeleteGroupsRequest)
+	request.AddGroup("foo")
+	request.AddGroup("bar")
+	testRequest(t, "two groups", request, doubleDeleteGroupsRequest)
+}

+ 70 - 0
delete_groups_response.go

@@ -0,0 +1,70 @@
+package sarama
+
+import (
+	"time"
+)
+
+type DeleteGroupsResponse struct {
+	ThrottleTime    time.Duration
+	GroupErrorCodes map[string]KError
+}
+
+func (r *DeleteGroupsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
+
+	if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil {
+		return err
+	}
+	for groupID, errorCode := range r.GroupErrorCodes {
+		if err := pe.putString(groupID); err != nil {
+			return err
+		}
+		pe.putInt16(int16(errorCode))
+	}
+
+	return nil
+}
+
+func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.GroupErrorCodes = make(map[string]KError, n)
+	for i := 0; i < n; i++ {
+		groupID, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		errorCode, err := pd.getInt16()
+		if err != nil {
+			return err
+		}
+
+		r.GroupErrorCodes[groupID] = KError(errorCode)
+	}
+
+	return nil
+}
+
+func (r *DeleteGroupsResponse) key() int16 {
+	return 42
+}
+
+func (r *DeleteGroupsResponse) version() int16 {
+	return 0
+}
+
+func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
+	return V1_1_0_0
+}

+ 57 - 0
delete_groups_response_test.go

@@ -0,0 +1,57 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	emptyDeleteGroupsResponse = []byte{
+		0, 0, 0, 0, // does not violate any quota
+		0, 0, 0, 0, // no groups
+	}
+
+	errorDeleteGroupsResponse = []byte{
+		0, 0, 0, 0, // does not violate any quota
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name
+		0, 31, // error ErrClusterAuthorizationFailed
+	}
+
+	noErrorDeleteGroupsResponse = []byte{
+		0, 0, 0, 0, // does not violate any quota
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name
+		0, 0, // no error
+	}
+)
+
+func TestDeleteGroupsResponse(t *testing.T) {
+	var response *DeleteGroupsResponse
+
+	response = new(DeleteGroupsResponse)
+	testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0)
+	if response.ThrottleTime != 0 {
+		t.Error("Expected no violation")
+	}
+	if len(response.GroupErrorCodes) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = new(DeleteGroupsResponse)
+	testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0)
+	if response.ThrottleTime != 0 {
+		t.Error("Expected no violation")
+	}
+	if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed {
+		t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
+	}
+
+	response = new(DeleteGroupsResponse)
+	testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0)
+	if response.ThrottleTime != 0 {
+		t.Error("Expected no violation")
+	}
+	if response.GroupErrorCodes["foo"] != ErrNoError {
+		t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
+	}
+}

+ 2 - 0
request.go

@@ -142,6 +142,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &AlterConfigsRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
+	case 42:
+		return &DeleteGroupsRequest{}
 	}
 	return nil
 }