浏览代码

Merge pull request #577 from Shopify/kafka_09_protocol

Implement Kafka 0.9 protocol additions
Willem van Bergen 10 年之前
父节点
当前提交
159e9990b0

+ 26 - 0
describe_groups_request.go

@@ -0,0 +1,26 @@
+package sarama
+
+type DescribeGroupsRequest struct {
+	Groups []string
+}
+
+func (r *DescribeGroupsRequest) encode(pe packetEncoder) error {
+	return pe.putStringArray(r.Groups)
+}
+
+func (r *DescribeGroupsRequest) decode(pd packetDecoder) (err error) {
+	r.Groups, err = pd.getStringArray()
+	return
+}
+
+func (r *DescribeGroupsRequest) key() int16 {
+	return 15
+}
+
+func (r *DescribeGroupsRequest) version() int16 {
+	return 0
+}
+
+func (r *DescribeGroupsRequest) AddGroup(group string) {
+	r.Groups = append(r.Groups, group)
+}

+ 34 - 0
describe_groups_request_test.go

@@ -0,0 +1,34 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyDescribeGroupsRequest = []byte{0, 0, 0, 0}
+
+	singleDescribeGroupsRequest = []byte{
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name: foo
+	}
+
+	doubleDescribeGroupsRequest = []byte{
+		0, 0, 0, 2, // 2 groups
+		0, 3, 'f', 'o', 'o', // group name: foo
+		0, 3, 'b', 'a', 'r', // group name: foo
+	}
+)
+
+func TestDescribeGroupsRequest(t *testing.T) {
+	var request *DescribeGroupsRequest
+
+	request = new(DescribeGroupsRequest)
+	testRequest(t, "no groups", request, emptyDescribeGroupsRequest)
+
+	request = new(DescribeGroupsRequest)
+	request.AddGroup("foo")
+	testRequest(t, "one group", request, singleDescribeGroupsRequest)
+
+	request = new(DescribeGroupsRequest)
+	request.AddGroup("foo")
+	request.AddGroup("bar")
+	testRequest(t, "two groups", request, doubleDescribeGroupsRequest)
+}

+ 162 - 0
describe_groups_response.go

@@ -0,0 +1,162 @@
+package sarama
+
+type DescribeGroupsResponse struct {
+	Groups []*GroupDescription
+}
+
+func (r *DescribeGroupsResponse) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(r.Groups)); err != nil {
+		return err
+	}
+
+	for _, groupDescription := range r.Groups {
+		if err := groupDescription.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *DescribeGroupsResponse) decode(pd packetDecoder) (err error) {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Groups = make([]*GroupDescription, n)
+	for i := 0; i < n; i++ {
+		r.Groups[i] = new(GroupDescription)
+		if err := r.Groups[i].decode(pd); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+type GroupDescription struct {
+	Err          KError
+	GroupId      string
+	State        string
+	ProtocolType string
+	Protocol     string
+	Members      map[string]*GroupMemberDescription
+}
+
+func (gd *GroupDescription) encode(pe packetEncoder) error {
+	pe.putInt16(int16(gd.Err))
+
+	if err := pe.putString(gd.GroupId); err != nil {
+		return err
+	}
+	if err := pe.putString(gd.State); err != nil {
+		return err
+	}
+	if err := pe.putString(gd.ProtocolType); err != nil {
+		return err
+	}
+	if err := pe.putString(gd.Protocol); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(gd.Members)); err != nil {
+		return err
+	}
+
+	for memberId, groupMemberDescription := range gd.Members {
+		if err := pe.putString(memberId); err != nil {
+			return err
+		}
+		if err := groupMemberDescription.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (gd *GroupDescription) decode(pd packetDecoder) (err error) {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		gd.Err = KError(kerr)
+	}
+
+	if gd.GroupId, err = pd.getString(); err != nil {
+		return
+	}
+	if gd.State, err = pd.getString(); err != nil {
+		return
+	}
+	if gd.ProtocolType, err = pd.getString(); err != nil {
+		return
+	}
+	if gd.Protocol, err = pd.getString(); err != nil {
+		return
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	gd.Members = make(map[string]*GroupMemberDescription)
+	for i := 0; i < n; i++ {
+		memberId, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		gd.Members[memberId] = new(GroupMemberDescription)
+		if err := gd.Members[memberId].decode(pd); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+type GroupMemberDescription struct {
+	ClientId         string
+	ClientHost       string
+	MemberMetadata   []byte
+	MemberAssignment []byte
+}
+
+func (gmd *GroupMemberDescription) encode(pe packetEncoder) error {
+	if err := pe.putString(gmd.ClientId); err != nil {
+		return err
+	}
+	if err := pe.putString(gmd.ClientHost); err != nil {
+		return err
+	}
+	if err := pe.putBytes(gmd.MemberMetadata); err != nil {
+		return err
+	}
+	if err := pe.putBytes(gmd.MemberAssignment); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (gmd *GroupMemberDescription) decode(pd packetDecoder) (err error) {
+	if gmd.ClientId, err = pd.getString(); err != nil {
+		return
+	}
+	if gmd.ClientHost, err = pd.getString(); err != nil {
+		return
+	}
+	if gmd.MemberMetadata, err = pd.getBytes(); err != nil {
+		return
+	}
+	if gmd.MemberAssignment, err = pd.getBytes(); err != nil {
+		return
+	}
+
+	return nil
+}

+ 91 - 0
describe_groups_response_test.go

@@ -0,0 +1,91 @@
+package sarama
+
+import (
+	"reflect"
+	"testing"
+)
+
+var (
+	describeGroupsResponseEmpty = []byte{
+		0, 0, 0, 0, // no groups
+	}
+
+	describeGroupsResponsePopulated = []byte{
+		0, 0, 0, 2, // 2 groups
+
+		0, 0, // no error
+		0, 3, 'f', 'o', 'o', // Group ID
+		0, 3, 'b', 'a', 'r', // State
+		0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // ConsumerProtocol type
+		0, 3, 'b', 'a', 'z', // Protocol name
+		0, 0, 0, 1, // 1 member
+		0, 2, 'i', 'd', // Member ID
+		0, 6, 's', 'a', 'r', 'a', 'm', 'a', // Client ID
+		0, 9, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't', // Client Host
+		0, 0, 0, 3, 0x01, 0x02, 0x03, // MemberMetadata
+		0, 0, 0, 3, 0x04, 0x05, 0x06, // MemberAssignment
+
+		0, 30, // ErrGroupAuthorizationFailed
+		0, 0,
+		0, 0,
+		0, 0,
+		0, 0,
+		0, 0, 0, 0,
+	}
+)
+
+func TestDescribeGroupsResponse(t *testing.T) {
+	var response *DescribeGroupsResponse
+
+	response = new(DescribeGroupsResponse)
+	testDecodable(t, "empty", response, describeGroupsResponseEmpty)
+	if len(response.Groups) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = new(DescribeGroupsResponse)
+	testDecodable(t, "populated", response, describeGroupsResponsePopulated)
+	if len(response.Groups) != 2 {
+		t.Error("Expected two groups")
+	}
+
+	group0 := response.Groups[0]
+	if group0.Err != ErrNoError {
+		t.Error("Unxpected groups[0].Err, found", group0.Err)
+	}
+	if group0.GroupId != "foo" {
+		t.Error("Unxpected groups[0].GroupId, found", group0.GroupId)
+	}
+	if group0.State != "bar" {
+		t.Error("Unxpected groups[0].State, found", group0.State)
+	}
+	if group0.ProtocolType != "consumer" {
+		t.Error("Unxpected groups[0].ProtocolType, found", group0.ProtocolType)
+	}
+	if group0.Protocol != "baz" {
+		t.Error("Unxpected groups[0].Protocol, found", group0.Protocol)
+	}
+	if len(group0.Members) != 1 {
+		t.Error("Unxpected groups[0].Members, found", group0.Members)
+	}
+	if group0.Members["id"].ClientId != "sarama" {
+		t.Error("Unxpected groups[0].Members[id].ClientId, found", group0.Members["id"].ClientId)
+	}
+	if group0.Members["id"].ClientHost != "localhost" {
+		t.Error("Unxpected groups[0].Members[id].ClientHost, found", group0.Members["id"].ClientHost)
+	}
+	if !reflect.DeepEqual(group0.Members["id"].MemberMetadata, []byte{0x01, 0x02, 0x03}) {
+		t.Error("Unxpected groups[0].Members[id].MemberMetadata, found", group0.Members["id"].MemberMetadata)
+	}
+	if !reflect.DeepEqual(group0.Members["id"].MemberAssignment, []byte{0x04, 0x05, 0x06}) {
+		t.Error("Unxpected groups[0].Members[id].MemberAssignment, found", group0.Members["id"].MemberAssignment)
+	}
+
+	group1 := response.Groups[1]
+	if group1.Err != ErrGroupAuthorizationFailed {
+		t.Error("Unxpected groups[1].Err, found", group0.Err)
+	}
+	if len(group1.Members) != 0 {
+		t.Error("Unxpected groups[1].Members, found", group0.Members)
+	}
+}

+ 33 - 0
errors.go

@@ -92,6 +92,17 @@ const (
 	ErrMessageSetSizeTooLarge          KError = 18
 	ErrNotEnoughReplicas               KError = 19
 	ErrNotEnoughReplicasAfterAppend    KError = 20
+	ErrInvalidRequiredAcks             KError = 21
+	ErrIllegalGeneration               KError = 22
+	ErrInconsistentGroupProtocol       KError = 23
+	ErrInvalidGroupId                  KError = 24
+	ErrUnknownMemberId                 KError = 25
+	ErrInvalidSessionTimeout           KError = 26
+	ErrRebalanceInProgress             KError = 27
+	ErrInvalidCommitOffsetSize         KError = 28
+	ErrTopicAuthorizationFailed        KError = 29
+	ErrGroupAuthorizationFailed        KError = 30
+	ErrClusterAuthorizationFailed      KError = 31
 )
 
 func (err KError) Error() string {
@@ -140,6 +151,28 @@ func (err KError) Error() string {
 		return "kafka server: Messages are rejected since there are fewer in-sync replicas than required."
 	case ErrNotEnoughReplicasAfterAppend:
 		return "kafka server: Messages are written to the log, but to fewer in-sync replicas than required."
+	case ErrInvalidRequiredAcks:
+		return "kafka server: The number of required acks is invalid (should be either -1, 0, or 1)."
+	case ErrIllegalGeneration:
+		return "kafka server: The provided generation id is not the current generation."
+	case ErrInconsistentGroupProtocol:
+		return "kafka server: The provider group protocol type is incompatible with the other members."
+	case ErrInvalidGroupId:
+		return "kafka server: The provided group id was empty."
+	case ErrUnknownMemberId:
+		return "kafka server: The provided member is not known in the current generation."
+	case ErrInvalidSessionTimeout:
+		return "kafka server: The provided session timeout is outside the allowed range."
+	case ErrRebalanceInProgress:
+		return "kafka server: A rebalance for the group is in progress. Please re-join the group."
+	case ErrInvalidCommitOffsetSize:
+		return "kafka server: The provided commit metadata was too large."
+	case ErrTopicAuthorizationFailed:
+		return "kafka server: The client is not authorized to access this topic."
+	case ErrGroupAuthorizationFailed:
+		return "kafka server: The client is not authorized to access this group."
+	case ErrClusterAuthorizationFailed:
+		return "kafka server: The client is not authorized to send this request type."
 	}
 
 	return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)

+ 43 - 0
heartbeat_request.go

@@ -0,0 +1,43 @@
+package sarama
+
+type HeartbeatRequest struct {
+	GroupId      string
+	GenerationId int32
+	MemberId     string
+}
+
+func (r *HeartbeatRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(r.GroupId); err != nil {
+		return err
+	}
+
+	pe.putInt32(r.GenerationId)
+
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *HeartbeatRequest) decode(pd packetDecoder) (err error) {
+	if r.GroupId, err = pd.getString(); err != nil {
+		return
+	}
+	if r.GenerationId, err = pd.getInt32(); err != nil {
+		return
+	}
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	return nil
+}
+
+func (r *HeartbeatRequest) key() int16 {
+	return 12
+}
+
+func (r *HeartbeatRequest) version() int16 {
+	return 0
+}

+ 21 - 0
heartbeat_request_test.go

@@ -0,0 +1,21 @@
+package sarama
+
+import "testing"
+
+var (
+	basicHeartbeatRequest = []byte{
+		0, 3, 'f', 'o', 'o', // Group ID
+		0x00, 0x01, 0x02, 0x03, // Generatiuon ID
+		0, 3, 'b', 'a', 'z', // Member ID
+	}
+)
+
+func TestHeartbeatRequest(t *testing.T) {
+	var request *HeartbeatRequest
+
+	request = new(HeartbeatRequest)
+	request.GroupId = "foo"
+	request.GenerationId = 66051
+	request.MemberId = "baz"
+	testRequest(t, "basic", request, basicHeartbeatRequest)
+}

+ 20 - 0
heartbeat_response.go

@@ -0,0 +1,20 @@
+package sarama
+
+type HeartbeatResponse struct {
+	Err KError
+}
+
+func (r *HeartbeatResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	return nil
+}
+
+func (r *HeartbeatResponse) decode(pd packetDecoder) error {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	return nil
+}

+ 18 - 0
heartbeat_response_test.go

@@ -0,0 +1,18 @@
+package sarama
+
+import "testing"
+
+var (
+	heartbeatResponseNoError = []byte{
+		0x00, 0x00}
+)
+
+func TestHeartbeatResponse(t *testing.T) {
+	var response *HeartbeatResponse
+
+	response = new(HeartbeatResponse)
+	testDecodable(t, "no error", response, heartbeatResponseNoError)
+	if response.Err != ErrNoError {
+		t.Error("Decoding error failed: no error expected but found", response.Err)
+	}
+}

+ 94 - 0
join_group_request.go

@@ -0,0 +1,94 @@
+package sarama
+
+type JoinGroupRequest struct {
+	GroupId        string
+	SessionTimeout int32
+	MemberId       string
+	ProtocolType   string
+	GroupProtocols map[string][]byte
+}
+
+func (r *JoinGroupRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(r.GroupId); err != nil {
+		return err
+	}
+	pe.putInt32(r.SessionTimeout)
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+	if err := pe.putString(r.ProtocolType); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
+		return err
+	}
+	for name, metadata := range r.GroupProtocols {
+		if err := pe.putString(name); err != nil {
+			return err
+		}
+		if err := pe.putBytes(metadata); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *JoinGroupRequest) decode(pd packetDecoder) (err error) {
+	if r.GroupId, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.SessionTimeout, err = pd.getInt32(); err != nil {
+		return
+	}
+
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.ProtocolType, err = pd.getString(); err != nil {
+		return
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.GroupProtocols = make(map[string][]byte)
+	for i := 0; i < n; i++ {
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		metadata, err := pd.getBytes()
+		if err != nil {
+			return err
+		}
+
+		r.GroupProtocols[name] = metadata
+	}
+
+	return nil
+}
+
+func (r *JoinGroupRequest) key() int16 {
+	return 11
+}
+
+func (r *JoinGroupRequest) version() int16 {
+	return 0
+}
+
+func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
+	if r.GroupProtocols == nil {
+		r.GroupProtocols = make(map[string][]byte)
+	}
+
+	r.GroupProtocols[name] = metadata
+}

+ 41 - 0
join_group_request_test.go

@@ -0,0 +1,41 @@
+package sarama
+
+import "testing"
+
+var (
+	joinGroupRequestNoProtocols = []byte{
+		0, 9, 'T', 'e', 's', 't', 'G', 'r', 'o', 'u', 'p', // Group ID
+		0, 0, 0, 100, // Session timeout
+		0, 0, // Member ID
+		0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // Protocol Type
+		0, 0, 0, 0, // 0 protocol groups
+	}
+
+	joinGroupRequestOneProtocol = []byte{
+		0, 9, 'T', 'e', 's', 't', 'G', 'r', 'o', 'u', 'p', // Group ID
+		0, 0, 0, 100, // Session timeout
+		0, 11, 'O', 'n', 'e', 'P', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Member ID
+		0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // Protocol Type
+		0, 0, 0, 1, // 1 group protocol
+		0, 3, 'o', 'n', 'e', // Protocol name
+		0, 0, 0, 3, 0x01, 0x02, 0x03, // protocol metadata
+	}
+)
+
+func TestJoinGroupRequest(t *testing.T) {
+	var request *JoinGroupRequest
+
+	request = new(JoinGroupRequest)
+	request.GroupId = "TestGroup"
+	request.SessionTimeout = 100
+	request.ProtocolType = "consumer"
+	testRequest(t, "no protocols", request, joinGroupRequestNoProtocols)
+
+	request = new(JoinGroupRequest)
+	request.GroupId = "TestGroup"
+	request.SessionTimeout = 100
+	request.MemberId = "OneProtocol"
+	request.ProtocolType = "consumer"
+	request.AddGroupProtocol("one", []byte{0x01, 0x02, 0x03})
+	testRequest(t, "one protocol", request, joinGroupRequestOneProtocol)
+}

+ 90 - 0
join_group_response.go

@@ -0,0 +1,90 @@
+package sarama
+
+type JoinGroupResponse struct {
+	Err           KError
+	GenerationId  int32
+	GroupProtocol string
+	LeaderId      string
+	MemberId      string
+	Members       map[string][]byte
+}
+
+func (r *JoinGroupResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	pe.putInt32(r.GenerationId)
+
+	if err := pe.putString(r.GroupProtocol); err != nil {
+		return err
+	}
+	if err := pe.putString(r.LeaderId); err != nil {
+		return err
+	}
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(r.Members)); err != nil {
+		return err
+	}
+
+	for memberId, memberMetadata := range r.Members {
+		if err := pe.putString(memberId); err != nil {
+			return err
+		}
+
+		if err := pe.putBytes(memberMetadata); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *JoinGroupResponse) decode(pd packetDecoder) (err error) {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	if r.GenerationId, err = pd.getInt32(); err != nil {
+		return
+	}
+
+	if r.GroupProtocol, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.LeaderId, err = pd.getString(); err != nil {
+		return
+	}
+
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.Members = make(map[string][]byte)
+	for i := 0; i < n; i++ {
+		memberId, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		memberMetadata, err := pd.getBytes()
+		if err != nil {
+			return err
+		}
+
+		r.Members[memberId] = memberMetadata
+	}
+
+	return nil
+}

+ 98 - 0
join_group_response_test.go

@@ -0,0 +1,98 @@
+package sarama
+
+import (
+	"reflect"
+	"testing"
+)
+
+var (
+	joinGroupResponseNoError = []byte{
+		0x00, 0x00, // No error
+		0x00, 0x01, 0x02, 0x03, // Generation ID
+		0, 8, 'p', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Protocol name chosen
+		0, 3, 'f', 'o', 'o', // Leader ID
+		0, 3, 'b', 'a', 'r', // Member ID
+		0, 0, 0, 0, // No member info
+	}
+
+	joinGroupResponseWithError = []byte{
+		0, 23, // Error: inconsistent group protocol
+		0x00, 0x00, 0x00, 0x00, // Generation ID
+		0, 0, // Protocol name chosen
+		0, 0, // Leader ID
+		0, 0, // Member ID
+		0, 0, 0, 0, // No member info
+	}
+
+	joinGroupResponseLeader = []byte{
+		0x00, 0x00, // No error
+		0x00, 0x01, 0x02, 0x03, // Generation ID
+		0, 8, 'p', 'r', 'o', 't', 'o', 'c', 'o', 'l', // Protocol name chosen
+		0, 3, 'f', 'o', 'o', // Leader ID
+		0, 3, 'f', 'o', 'o', // Member ID == Leader ID
+		0, 0, 0, 1, // 1 member
+		0, 3, 'f', 'o', 'o', // Member ID
+		0, 0, 0, 3, 0x01, 0x02, 0x03, // Member metadata
+	}
+)
+
+func TestJoinGroupResponse(t *testing.T) {
+	var response *JoinGroupResponse
+
+	response = new(JoinGroupResponse)
+	testDecodable(t, "no error", response, joinGroupResponseNoError)
+	if response.Err != ErrNoError {
+		t.Error("Decoding Err failed: no error expected but found", response.Err)
+	}
+	if response.GenerationId != 66051 {
+		t.Error("Decoding GenerationId failed, found:", response.GenerationId)
+	}
+	if response.LeaderId != "foo" {
+		t.Error("Decoding LeaderId failed, found:", response.LeaderId)
+	}
+	if response.MemberId != "bar" {
+		t.Error("Decoding MemberId failed, found:", response.MemberId)
+	}
+	if len(response.Members) != 0 {
+		t.Error("Decoding Members failed, found:", response.Members)
+	}
+
+	response = new(JoinGroupResponse)
+	testDecodable(t, "with error", response, joinGroupResponseWithError)
+	if response.Err != ErrInconsistentGroupProtocol {
+		t.Error("Decoding Err failed: ErrInconsistentGroupProtocol expected but found", response.Err)
+	}
+	if response.GenerationId != 0 {
+		t.Error("Decoding GenerationId failed, found:", response.GenerationId)
+	}
+	if response.LeaderId != "" {
+		t.Error("Decoding LeaderId failed, found:", response.LeaderId)
+	}
+	if response.MemberId != "" {
+		t.Error("Decoding MemberId failed, found:", response.MemberId)
+	}
+	if len(response.Members) != 0 {
+		t.Error("Decoding Members failed, found:", response.Members)
+	}
+
+	response = new(JoinGroupResponse)
+	testDecodable(t, "with error", response, joinGroupResponseLeader)
+	if response.Err != ErrNoError {
+		t.Error("Decoding Err failed: ErrNoError expected but found", response.Err)
+	}
+	if response.GenerationId != 66051 {
+		t.Error("Decoding GenerationId failed, found:", response.GenerationId)
+	}
+	if response.LeaderId != "foo" {
+		t.Error("Decoding LeaderId failed, found:", response.LeaderId)
+	}
+	if response.MemberId != "foo" {
+		t.Error("Decoding MemberId failed, found:", response.MemberId)
+	}
+	if len(response.Members) != 1 {
+		t.Error("Decoding Members failed, found:", response.Members)
+	}
+	if !reflect.DeepEqual(response.Members["foo"], []byte{0x01, 0x02, 0x03}) {
+		t.Error("Decoding foo member failed, found:", response.Members["foo"])
+	}
+}

+ 36 - 0
leave_group_request.go

@@ -0,0 +1,36 @@
+package sarama
+
+type LeaveGroupRequest struct {
+	GroupId  string
+	MemberId string
+}
+
+func (r *LeaveGroupRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(r.GroupId); err != nil {
+		return err
+	}
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *LeaveGroupRequest) decode(pd packetDecoder) (err error) {
+	if r.GroupId, err = pd.getString(); err != nil {
+		return
+	}
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	return nil
+}
+
+func (r *LeaveGroupRequest) key() int16 {
+	return 13
+}
+
+func (r *LeaveGroupRequest) version() int16 {
+	return 0
+}

+ 19 - 0
leave_group_request_test.go

@@ -0,0 +1,19 @@
+package sarama
+
+import "testing"
+
+var (
+	basicLeaveGroupRequest = []byte{
+		0, 3, 'f', 'o', 'o',
+		0, 3, 'b', 'a', 'r',
+	}
+)
+
+func TestLeaveGroupRequest(t *testing.T) {
+	var request *LeaveGroupRequest
+
+	request = new(LeaveGroupRequest)
+	request.GroupId = "foo"
+	request.MemberId = "bar"
+	testRequest(t, "basic", request, basicLeaveGroupRequest)
+}

+ 20 - 0
leave_group_response.go

@@ -0,0 +1,20 @@
+package sarama
+
+type LeaveGroupResponse struct {
+	Err KError
+}
+
+func (r *LeaveGroupResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	return nil
+}
+
+func (r *LeaveGroupResponse) decode(pd packetDecoder) (err error) {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	return nil
+}

+ 24 - 0
leave_group_response_test.go

@@ -0,0 +1,24 @@
+package sarama
+
+import "testing"
+
+var (
+	leaveGroupResponseNoError   = []byte{0x00, 0x00}
+	leaveGroupResponseWithError = []byte{0, 25}
+)
+
+func TestLeaveGroupResponse(t *testing.T) {
+	var response *LeaveGroupResponse
+
+	response = new(LeaveGroupResponse)
+	testDecodable(t, "no error", response, leaveGroupResponseNoError)
+	if response.Err != ErrNoError {
+		t.Error("Decoding error failed: no error expected but found", response.Err)
+	}
+
+	response = new(LeaveGroupResponse)
+	testDecodable(t, "with error", response, leaveGroupResponseWithError)
+	if response.Err != ErrUnknownMemberId {
+		t.Error("Decoding error failed: ErrUnknownMemberId expected but found", response.Err)
+	}
+}

+ 20 - 0
list_groups_request.go

@@ -0,0 +1,20 @@
+package sarama
+
+type ListGroupsRequest struct {
+}
+
+func (r *ListGroupsRequest) encode(pe packetEncoder) error {
+	return nil
+}
+
+func (r *ListGroupsRequest) decode(pd packetDecoder) (err error) {
+	return nil
+}
+
+func (r *ListGroupsRequest) key() int16 {
+	return 16
+}
+
+func (r *ListGroupsRequest) version() int16 {
+	return 0
+}

+ 7 - 0
list_groups_request_test.go

@@ -0,0 +1,7 @@
+package sarama
+
+import "testing"
+
+func TestListGroupsRequest(t *testing.T) {
+	testRequest(t, "ListGroupsRequest", &ListGroupsRequest{}, []byte{})
+}

+ 56 - 0
list_groups_response.go

@@ -0,0 +1,56 @@
+package sarama
+
+type ListGroupsResponse struct {
+	Err    KError
+	Groups map[string]string
+}
+
+func (r *ListGroupsResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+
+	if err := pe.putArrayLength(len(r.Groups)); err != nil {
+		return err
+	}
+	for groupId, protocolType := range r.Groups {
+		if err := pe.putString(groupId); err != nil {
+			return err
+		}
+		if err := pe.putString(protocolType); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *ListGroupsResponse) decode(pd packetDecoder) error {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.Groups = make(map[string]string)
+	for i := 0; i < n; i++ {
+		groupId, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		protocolType, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		r.Groups[groupId] = protocolType
+	}
+
+	return nil
+}

+ 58 - 0
list_groups_response_test.go

@@ -0,0 +1,58 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	listGroupsResponseEmpty = []byte{
+		0, 0, // no error
+		0, 0, 0, 0, // no groups
+	}
+
+	listGroupsResponseError = []byte{
+		0, 31, // no error
+		0, 0, 0, 0, // ErrClusterAuthorizationFailed
+	}
+
+	listGroupsResponseWithConsumer = []byte{
+		0, 0, // no error
+		0, 0, 0, 1, // 1 group
+		0, 3, 'f', 'o', 'o', // group name
+		0, 8, 'c', 'o', 'n', 's', 'u', 'm', 'e', 'r', // protocol type
+	}
+)
+
+func TestListGroupsResponse(t *testing.T) {
+	var response *ListGroupsResponse
+
+	response = new(ListGroupsResponse)
+	testDecodable(t, "no error", response, listGroupsResponseEmpty)
+	if response.Err != ErrNoError {
+		t.Error("Expected no gerror, found:", response.Err)
+	}
+	if len(response.Groups) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = new(ListGroupsResponse)
+	testDecodable(t, "no error", response, listGroupsResponseError)
+	if response.Err != ErrClusterAuthorizationFailed {
+		t.Error("Expected no gerror, found:", response.Err)
+	}
+	if len(response.Groups) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = new(ListGroupsResponse)
+	testDecodable(t, "no error", response, listGroupsResponseWithConsumer)
+	if response.Err != ErrNoError {
+		t.Error("Expected no gerror, found:", response.Err)
+	}
+	if len(response.Groups) != 1 {
+		t.Error("Expected one group")
+	}
+	if response.Groups["foo"] != "consumer" {
+		t.Error("Expected foo group to use consumer protocol")
+	}
+}

+ 1 - 0
packet_decoder.go

@@ -16,6 +16,7 @@ type packetDecoder interface {
 	getString() (string, error)
 	getInt32Array() ([]int32, error)
 	getInt64Array() ([]int64, error)
+	getStringArray() ([]string, error)
 
 	// Subsets
 	remaining() int

+ 1 - 0
packet_encoder.go

@@ -15,6 +15,7 @@ type packetEncoder interface {
 	putBytes(in []byte) error
 	putRawBytes(in []byte) error
 	putString(in string) error
+	putStringArray(in []string) error
 	putInt32Array(in []int32) error
 	putInt64Array(in []int64) error
 

+ 15 - 0
prep_encoder.go

@@ -66,6 +66,21 @@ func (pe *prepEncoder) putString(in string) error {
 	return nil
 }
 
+func (pe *prepEncoder) putStringArray(in []string) error {
+	err := pe.putArrayLength(len(in))
+	if err != nil {
+		return err
+	}
+
+	for _, str := range in {
+		if err := pe.putString(str); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (pe *prepEncoder) putInt32Array(in []int32) error {
 	err := pe.putArrayLength(len(in))
 	if err != nil {

+ 27 - 0
real_decoder.go

@@ -181,6 +181,33 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
 	return ret, nil
 }
 
+func (rd *realDecoder) getStringArray() ([]string, error) {
+	if rd.remaining() < 4 {
+		rd.off = len(rd.raw)
+		return nil, ErrInsufficientData
+	}
+	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+
+	if n == 0 {
+		return nil, nil
+	}
+
+	if n < 0 {
+		return nil, PacketDecodingError{"invalid array length"}
+	}
+
+	ret := make([]string, n)
+	for i := range ret {
+		if str, err := rd.getString(); err != nil {
+			return nil, err
+		} else {
+			ret[i] = str
+		}
+	}
+	return ret, nil
+}
+
 // subsets
 
 func (rd *realDecoder) remaining() int {

+ 15 - 0
real_encoder.go

@@ -61,6 +61,21 @@ func (re *realEncoder) putString(in string) error {
 	return nil
 }
 
+func (re *realEncoder) putStringArray(in []string) error {
+	err := re.putArrayLength(len(in))
+	if err != nil {
+		return err
+	}
+
+	for _, val := range in {
+		if err := re.putString(val); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func (re *realEncoder) putInt32Array(in []int32) error {
 	err := re.putArrayLength(len(in))
 	if err != nil {

+ 12 - 0
request.go

@@ -95,6 +95,18 @@ func allocateBody(key, version int16) requestBody {
 		return &OffsetFetchRequest{}
 	case 10:
 		return &ConsumerMetadataRequest{}
+	case 11:
+		return &JoinGroupRequest{}
+	case 12:
+		return &HeartbeatRequest{}
+	case 13:
+		return &LeaveGroupRequest{}
+	case 14:
+		return &SyncGroupRequest{}
+	case 15:
+		return &DescribeGroupsRequest{}
+	case 16:
+		return &ListGroupsRequest{}
 	}
 	return nil
 }

+ 2 - 2
request_test.go

@@ -48,7 +48,7 @@ func testRequest(t *testing.T, name string, rb requestBody, expected []byte) {
 	if err != nil {
 		t.Error(err)
 	} else if !bytes.Equal(packet[headerSize:], expected) {
-		t.Error("Encoding", name, "failed\ngot ", packet, "\nwant", expected)
+		t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected)
 	}
 	// Decoder request
 	decoded, err := decodeRequest(bytes.NewReader(packet))
@@ -57,7 +57,7 @@ func testRequest(t *testing.T, name string, rb requestBody, expected []byte) {
 	} else if decoded.correlationID != 123 || decoded.clientID != "foo" {
 		t.Errorf("Decoded header is not valid: %v", decoded)
 	} else if !reflect.DeepEqual(rb, decoded.body) {
-		t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded)
+		t.Errorf("Decoded request does not match the encoded one\nencoded: %v\ndecoded: %v", rb, decoded.body)
 	}
 }
 

+ 86 - 0
sync_group_request.go

@@ -0,0 +1,86 @@
+package sarama
+
+type SyncGroupRequest struct {
+	GroupId          string
+	GenerationId     int32
+	MemberId         string
+	GroupAssignments map[string][]byte
+}
+
+func (r *SyncGroupRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(r.GroupId); err != nil {
+		return err
+	}
+
+	pe.putInt32(r.GenerationId)
+
+	if err := pe.putString(r.MemberId); err != nil {
+		return err
+	}
+
+	if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil {
+		return err
+	}
+	for memberId, memberAssignment := range r.GroupAssignments {
+		if err := pe.putString(memberId); err != nil {
+			return err
+		}
+		if err := pe.putBytes(memberAssignment); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *SyncGroupRequest) decode(pd packetDecoder) (err error) {
+	if r.GroupId, err = pd.getString(); err != nil {
+		return
+	}
+	if r.GenerationId, err = pd.getInt32(); err != nil {
+		return
+	}
+	if r.MemberId, err = pd.getString(); err != nil {
+		return
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == 0 {
+		return nil
+	}
+
+	r.GroupAssignments = make(map[string][]byte)
+	for i := 0; i < n; i++ {
+		memberId, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		memberAssignment, err := pd.getBytes()
+		if err != nil {
+			return err
+		}
+
+		r.GroupAssignments[memberId] = memberAssignment
+	}
+
+	return nil
+}
+
+func (r *SyncGroupRequest) key() int16 {
+	return 14
+}
+
+func (r *SyncGroupRequest) version() int16 {
+	return 0
+}
+
+func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
+	if r.GroupAssignments == nil {
+		r.GroupAssignments = make(map[string][]byte)
+	}
+
+	r.GroupAssignments[memberId] = memberAssignment
+}

+ 38 - 0
sync_group_request_test.go

@@ -0,0 +1,38 @@
+package sarama
+
+import "testing"
+
+var (
+	emptySyncGroupRequest = []byte{
+		0, 3, 'f', 'o', 'o', // Group ID
+		0x00, 0x01, 0x02, 0x03, // Generation ID
+		0, 3, 'b', 'a', 'z', // Member ID
+		0, 0, 0, 0, // no assignments
+	}
+
+	populatedSyncGroupRequest = []byte{
+		0, 3, 'f', 'o', 'o', // Group ID
+		0x00, 0x01, 0x02, 0x03, // Generation ID
+		0, 3, 'b', 'a', 'z', // Member ID
+		0, 0, 0, 1, // one assignment
+		0, 3, 'b', 'a', 'z', // Member ID
+		0, 0, 0, 3, 'f', 'o', 'o', // Member assignment
+	}
+)
+
+func TestSyncGroupRequest(t *testing.T) {
+	var request *SyncGroupRequest
+
+	request = new(SyncGroupRequest)
+	request.GroupId = "foo"
+	request.GenerationId = 66051
+	request.MemberId = "baz"
+	testRequest(t, "empty", request, emptySyncGroupRequest)
+
+	request = new(SyncGroupRequest)
+	request.GroupId = "foo"
+	request.GenerationId = 66051
+	request.MemberId = "baz"
+	request.AddGroupAssignment("baz", []byte("foo"))
+	testRequest(t, "populated", request, populatedSyncGroupRequest)
+}

+ 22 - 0
sync_group_response.go

@@ -0,0 +1,22 @@
+package sarama
+
+type SyncGroupResponse struct {
+	Err              KError
+	MemberAssignment []byte
+}
+
+func (r *SyncGroupResponse) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.Err))
+	return pe.putBytes(r.MemberAssignment)
+}
+
+func (r *SyncGroupResponse) decode(pd packetDecoder) (err error) {
+	if kerr, err := pd.getInt16(); err != nil {
+		return err
+	} else {
+		r.Err = KError(kerr)
+	}
+
+	r.MemberAssignment, err = pd.getBytes()
+	return
+}

+ 40 - 0
sync_group_response_test.go

@@ -0,0 +1,40 @@
+package sarama
+
+import (
+	"reflect"
+	"testing"
+)
+
+var (
+	syncGroupResponseNoError = []byte{
+		0x00, 0x00, // No error
+		0, 0, 0, 3, 0x01, 0x02, 0x03, // Member assignment data
+	}
+
+	syncGroupResponseWithError = []byte{
+		0, 27, // ErrRebalanceInProgress
+		0, 0, 0, 0, // No member assignment data
+	}
+)
+
+func TestSyncGroupResponse(t *testing.T) {
+	var response *SyncGroupResponse
+
+	response = new(SyncGroupResponse)
+	testDecodable(t, "no error", response, syncGroupResponseNoError)
+	if response.Err != ErrNoError {
+		t.Error("Decoding Err failed: no error expected but found", response.Err)
+	}
+	if !reflect.DeepEqual(response.MemberAssignment, []byte{0x01, 0x02, 0x03}) {
+		t.Error("Decoding MemberAssignment failed, found:", response.MemberAssignment)
+	}
+
+	response = new(SyncGroupResponse)
+	testDecodable(t, "no error", response, syncGroupResponseWithError)
+	if response.Err != ErrRebalanceInProgress {
+		t.Error("Decoding Err failed: ErrRebalanceInProgress expected but found", response.Err)
+	}
+	if !reflect.DeepEqual(response.MemberAssignment, []byte{}) {
+		t.Error("Decoding MemberAssignment failed, found:", response.MemberAssignment)
+	}
+}