Jelajahi Sumber

Add support for DeleteGroups

Fixed #1095.
RussellLuo 6 tahun lalu
induk
melakukan
0f1c3e9b87

+ 11 - 0
broker.go

@@ -539,6 +539,17 @@ func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsRespon
 
 	return response, nil
 }
+
+func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) {
+	response := new(DeleteGroupsResponse)
+
+	if err := b.sendAndReceive(request, response); err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 13 - 0
broker_test.go

@@ -297,6 +297,19 @@ var brokerTestTable = []struct {
 				t.Error("ApiVersions request got no response!")
 			}
 		}},
+
+	{"DeleteGroupsRequest",
+		[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
+		func(t *testing.T, broker *Broker) {
+			request := DeleteGroupsRequest{}
+			response, err := broker.DeleteGroups(&request)
+			if err != nil {
+				t.Error(err)
+			}
+			if response == nil {
+				t.Error("DeleteGroups request got no response!")
+			}
+		}},
 }
 
 func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics brokerMetrics) {

+ 30 - 0
delete_groups_request.go

@@ -0,0 +1,30 @@
+package sarama
+
+type DeleteGroupsRequest struct {
+	Groups []string
+}
+
+func (r *DeleteGroupsRequest) encode(pe packetEncoder) error {
+	return pe.putStringArray(r.Groups)
+}
+
+func (r *DeleteGroupsRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Groups, err = pd.getStringArray()
+	return
+}
+
+func (r *DeleteGroupsRequest) key() int16 {
+	return 42
+}
+
+func (r *DeleteGroupsRequest) version() int16 {
+	return 0
+}
+
+func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion {
+	return V1_1_0_0
+}
+
+func (r *DeleteGroupsRequest) AddGroup(group string) {
+	r.Groups = append(r.Groups, group)
+}

+ 34 - 0
delete_groups_request_test.go

@@ -0,0 +1,34 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyDeleteGroupsRequest = []byte{0, 0, 0, 0}
+
+	singleDeleteGroupsRequest = []byte{
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name: foo
+	}
+
+	doubleDeleteGroupsRequest = []byte{
+		0, 0, 0, 2, // 2 groups
+		0, 3, 'f', 'o', 'o', // group name: foo
+		0, 3, 'b', 'a', 'r', // group name: foo
+	}
+)
+
+func TestDeleteGroupsRequest(t *testing.T) {
+	var request *DeleteGroupsRequest
+
+	request = new(DeleteGroupsRequest)
+	testRequest(t, "no groups", request, emptyDeleteGroupsRequest)
+
+	request = new(DeleteGroupsRequest)
+	request.AddGroup("foo")
+	testRequest(t, "one group", request, singleDeleteGroupsRequest)
+
+	request = new(DeleteGroupsRequest)
+	request.AddGroup("foo")
+	request.AddGroup("bar")
+	testRequest(t, "two groups", request, doubleDeleteGroupsRequest)
+}

+ 67 - 0
delete_groups_response.go

@@ -0,0 +1,67 @@
+package sarama
+
+type DeleteGroupsResponse struct {
+	ThrottleTimeMs  int32
+	GroupErrorCodes map[string]KError
+}
+
+func (r *DeleteGroupsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(r.ThrottleTimeMs)
+
+	if err := pe.putArrayLength(len(r.GroupErrorCodes)); err != nil {
+		return err
+	}
+	for groupID, errorCode := range r.GroupErrorCodes {
+		if err := pe.putString(groupID); err != nil {
+			return err
+		}
+		pe.putInt16(int16(errorCode))
+	}
+
+	return nil
+}
+
+func (r *DeleteGroupsResponse) decode(pd packetDecoder, version int16) error {
+	throttleTimeMs, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+
+	r.ThrottleTimeMs = throttleTimeMs
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.GroupErrorCodes = make(map[string]KError, n)
+	for i := 0; i < n; i++ {
+		groupID, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		errorCode, err := pd.getInt16()
+		if err != nil {
+			return err
+		}
+
+		r.GroupErrorCodes[groupID] = KError(errorCode)
+	}
+
+	return nil
+}
+
+func (r *DeleteGroupsResponse) key() int16 {
+	return 42
+}
+
+func (r *DeleteGroupsResponse) version() int16 {
+	return 0
+}
+
+func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
+	return V1_1_0_0
+}

+ 57 - 0
delete_groups_response_test.go

@@ -0,0 +1,57 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	emptyDeleteGroupsResponse = []byte{
+		0, 0, 0, 0, // does not violate any quota
+		0, 0, 0, 0, // no groups
+	}
+
+	errorDeleteGroupsResponse = []byte{
+		0, 0, 0, 0, // does not violate any quota
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name
+		0, 31, // error ErrClusterAuthorizationFailed
+	}
+
+	noErrorDeleteGroupsResponse = []byte{
+		0, 0, 0, 0, // does not violate any quota
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name
+		0, 0, // no error
+	}
+)
+
+func TestDeleteGroupsResponse(t *testing.T) {
+	var response *DeleteGroupsResponse
+
+	response = new(DeleteGroupsResponse)
+	testVersionDecodable(t, "empty", response, emptyDeleteGroupsResponse, 0)
+	if response.ThrottleTimeMs != 0 {
+		t.Error("Expected no violation")
+	}
+	if len(response.GroupErrorCodes) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = new(DeleteGroupsResponse)
+	testVersionDecodable(t, "error", response, errorDeleteGroupsResponse, 0)
+	if response.ThrottleTimeMs != 0 {
+		t.Error("Expected no violation")
+	}
+	if response.GroupErrorCodes["foo"] != ErrClusterAuthorizationFailed {
+		t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
+	}
+
+	response = new(DeleteGroupsResponse)
+	testVersionDecodable(t, "no error", response, noErrorDeleteGroupsResponse, 0)
+	if response.ThrottleTimeMs != 0 {
+		t.Error("Expected no violation")
+	}
+	if response.GroupErrorCodes["foo"] != ErrNoError {
+		t.Error("Expected error ErrClusterAuthorizationFailed, found:", response.GroupErrorCodes["foo"])
+	}
+}

+ 2 - 0
request.go

@@ -142,6 +142,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &AlterConfigsRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
+	case 42:
+		return &DeleteGroupsRequest{}
 	}
 	return nil
 }